Spring Batch – Dynamically copy a table from one database to another


I needed to copy data from a table to a table in another database that has the same fields and data types. Below is the reference code I used for this, built with Spring Boot and Spring Batch.

The input SQL query file and the target table name are the input parameters here.

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;

import javax.sql.DataSource;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    /**
     * Step-scoped reader that executes the SQL query read from the file given
     * via the {@code --sqlFile} command-line option.
     *
     * <p>Declared as {@link JdbcCursorItemReader} rather than {@code ItemReader}:
     * when the return type hides the {@code ItemStream} interface behind a
     * step-scoped proxy, Spring Batch cannot register the reader as a stream and
     * never opens the cursor, failing with "Reader must be open before it can be
     * read". Exposing the concrete type fixes that.
     *
     * @param dataSource1 the source database
     * @param args        command-line arguments; {@code --sqlFile} is required
     * @throws IOException if the query file cannot be read
     */
    @Bean
    @StepScope
    public JdbcCursorItemReader<Map<String, Object>> reader(DataSource dataSource1, ApplicationArguments args) throws IOException {
        JdbcCursorItemReader<Map<String, Object>> reader = new JdbcCursorItemReader<>();
        reader.setDataSource(dataSource1);

        // Read the input SQL query from the specified text file; fail fast with a
        // clear message instead of an NPE when the option is missing.
        List<String> sqlFiles = args.getOptionValues("sqlFile");
        if (sqlFiles == null || sqlFiles.isEmpty()) {
            throw new IllegalArgumentException("Missing required argument --sqlFile=<path-to-query-file>");
        }
        // Explicit charset: pre-Java-18 the no-arg String(byte[]) uses the platform default.
        String sqlQuery = new String(Files.readAllBytes(Paths.get(sqlFiles.get(0))), StandardCharsets.UTF_8);

        reader.setSql(sqlQuery);
        reader.setRowMapper(new ColumnMapRowMapper());
        return reader;
    }

    /**
     * Processor configured with the target table name from {@code --outputTable}.
     */
    @Bean
    public ItemProcessor<Map<String, Object>, Map<String, Object>> processor(ApplicationArguments args) {
        List<String> tables = args.getOptionValues("outputTable");
        if (tables == null || tables.isEmpty()) {
            throw new IllegalArgumentException("Missing required argument --outputTable=<table-name>");
        }
        return new MyDataItemProcessor(tables.get(0));
    }

    /** Writer bound to the target database. */
    @Bean
    public ItemWriter<Map<String, Object>> writer(DataSource dataSource2) {
        return new MyDataItemWriter(dataSource2);
    }

    /** Chunk-oriented step: read 10 rows, process, write, repeat. */
    @Bean
    public Step step1(
            StepBuilderFactory stepBuilderFactory,
            ItemReader<Map<String, Object>> reader,
            ItemProcessor<Map<String, Object>, Map<String, Object>> processor,
            ItemWriter<Map<String, Object>> writer) {
        return stepBuilderFactory.get("step1")
                .<Map<String, Object>, Map<String, Object>>chunk(10)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    /** Single-step job; RunIdIncrementer lets the same job re-run with fresh parameters. */
    @Bean
    public Job importJob(JobBuilderFactory jobBuilderFactory, Step step1) {
        return jobBuilderFactory.get("importJob")
                .incrementer(new RunIdIncrementer())
                .flow(step1)
                .end()
                .build();
    }

    // NOTE(review): this entry point duplicates Application.main below; kept for
    // backward compatibility, but prefer launching via the Application class.
    public static void main(String[] args) {
        SpringApplication.run(BatchConfiguration.class, args);
    }
}

/**
 * Maps every column of the current row into a column-name -&gt; value map using
 * {@link ResultSetMetaData}, so the job works with any source query. The
 * original version hard-coded three columns, which defeated the purpose of
 * dynamically copying an arbitrary table.
 */
class ColumnMapRowMapper implements RowMapper<Map<String, Object>> {
    @Override
    public Map<String, Object> mapRow(ResultSet resultSet, int rowNum) throws SQLException {
        ResultSetMetaData metaData = resultSet.getMetaData();
        int columnCount = metaData.getColumnCount();
        // LinkedHashMap preserves the query's column order.
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 1; i <= columnCount; i++) {
            // getColumnLabel honours SQL aliases (SELECT x AS y) and falls back
            // to the column name when no alias is present.
            row.put(metaData.getColumnLabel(i), resultSet.getObject(i));
        }
        return row;
    }
}

/**
 * Pass-through processor that forwards each row to the writer.
 *
 * <p>Bug fix: the original returned a freshly created <em>empty</em> map, so
 * every row was silently discarded and nothing was ever copied. Rows are now
 * copied through unchanged; add column renames here if the source and target
 * column names differ.
 */
class MyDataItemProcessor implements ItemProcessor<Map<String, Object>, Map<String, Object>> {

    // Kept so renaming logic can be made table-specific later.
    private final String targetTableName;

    public MyDataItemProcessor(String targetTableName) {
        this.targetTableName = targetTableName;
    }

    @Override
    public Map<String, Object> process(Map<String, Object> item) throws Exception {
        // Defensive copy so downstream mutation cannot affect the reader's item.
        Map<String, Object> mappedItem = new LinkedHashMap<>(item);

        // Example rename if source/target columns differ:
        // mappedItem.put("target_column1", mappedItem.remove("source_column1"));

        return mappedItem;
    }
}

/**
 * Writes each row into the target table with a dynamically built, parameterized
 * INSERT via {@link NamedParameterJdbcTemplate}.
 *
 * <p>Bug fixes: the original {@code write} was a no-op, and the target table
 * name was hard-coded, ignoring the {@code --outputTable} argument. A
 * backward-compatible two-argument constructor now accepts the real table name.
 */
class MyDataItemWriter implements ItemWriter<Map<String, Object>> {

    private final NamedParameterJdbcTemplate jdbcTemplate;
    private final String targetTableName;

    /** Backward-compatible constructor; falls back to the original placeholder name. */
    public MyDataItemWriter(DataSource dataSource2) {
        this(dataSource2, "target_table_name");
    }

    /** Preferred constructor: pass the table name obtained from {@code --outputTable}. */
    public MyDataItemWriter(DataSource dataSource2, String targetTableName) {
        this.jdbcTemplate = new NamedParameterJdbcTemplate(dataSource2);
        this.targetTableName = targetTableName;
    }

    @Override
    public void write(List<? extends Map<String, Object>> items) throws Exception {
        for (Map<String, Object> item : items) {
            List<String> columns = new ArrayList<>(item.keySet());
            List<String> placeholders = new ArrayList<>(columns.size());
            for (String column : columns) {
                placeholders.add(":" + column);
            }
            // Values are bound as named parameters (no SQL injection via data).
            // NOTE(review): the table and column names themselves are concatenated
            // into the SQL; they come from the operator-supplied query and
            // --outputTable, not end users — validate them if that ever changes.
            String sql = "INSERT INTO " + targetTableName
                    + " (" + String.join(", ", columns) + ")"
                    + " VALUES (" + String.join(", ", placeholders) + ")";
            jdbcTemplate.update(sql, item);
        }
    }
}

// NOTE(review): a .java file may contain at most one public top-level class,
// and this snippet already declares the public BatchConfiguration above — so as
// written the file did not compile. Making this class package-private fixes the
// compile error; SpringApplication.run works fine with a non-public class. In a
// real project, move this class into its own Application.java and restore
// "public".
@SpringBootApplication
class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

In this code, we’ve added a main method in the Application class to run the Spring Boot application. We use ApplicationArguments to retrieve the command-line arguments for the input SQL query text file (--sqlFile) and the output table name (--outputTable). The input SQL query is read from the specified text file and passed to the item reader, while the output table name is passed to the item processor to determine the target table for the data. You can run this Spring Boot application by providing the required command-line arguments when executing the JAR file. For example:

java -jar your-app.jar --sqlFile=/path/to/sql/query.sql --outputTable=target_table_name

Replace your-app.jar, /path/to/sql/query.sql, and target_table_name with your actual JAR file name, SQL query file path, and target table name.