Spring Batch Course 11 – Skip Invalid Records


  • A skip policy is used to skip records that fail; otherwise they would cause the job to abort. The skip policy allows Spring Batch to complete the job for the valid records while skipping the invalid ones.
  • We can set a SkipListener to handle the skipped records, either putting them in a file or in a database table so that the support team can look into the failures and recover/reprocess or fix the invalid records.
  • To enable skipping we need to do the following (a minimal sketch follows this list) –
    • Add .faultTolerant() to the StepBuilder. This enables the fault-tolerance policies such as skip, retry and restart.
    • Add .skip(Exception.class) after that to tell the step which exceptions should be skipped.
    • Add .skipLimit(<<limit>>) to restrict the number of skipped records. If the number of skipped records exceeds this count, the step aborts.
    • We can also add .skipPolicy() to apply a custom or out-of-the-box skip policy instead of .skipLimit().
      • new AlwaysSkipItemSkipPolicy() will skip every item that is in error.
    • To handle the skipped records, add .listener(<<ListenerClassName>>) to the step builder.
      • A custom skip listener can be created to handle read, write and process errors, for example by writing the failed items to a file or a DB.
        • Create a new class for the custom skip listener implementing SkipListener.
        • onSkipInRead — use this to specify how to handle read exceptions.
        • onSkipInProcess — use this to specify how to handle process exceptions.
        • onSkipInWrite — use this to specify how to handle write exceptions.
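
Below is a minimal sketch of a fault-tolerant step with a custom skip listener. The step name, chunk size, item types and the choice of FlatFileParseException are assumptions for illustration:

@Bean
public Step importStep(StepBuilderFactory stepBuilderFactory,
                       ItemReader<String> reader,
                       ItemWriter<String> writer) {
    return stepBuilderFactory.get("importStep")
            .<String, String>chunk(100)
            .reader(reader)
            .writer(writer)
            .faultTolerant()                    // enable the skip/retry policies
            .skip(FlatFileParseException.class) // skip records that fail to parse
            .skipLimit(10)                      // abort the step if more than 10 records are skipped
            .listener(new MySkipListener())     // handle the skipped records
            .build();
}

// Custom skip listener that logs the skipped items; a real one could write them to a file or a DB
public class MySkipListener implements SkipListener<String, String> {
    @Override
    public void onSkipInRead(Throwable t) {
        System.err.println("Skipped during read: " + t.getMessage());
    }

    @Override
    public void onSkipInProcess(String item, Throwable t) {
        System.err.println("Skipped during process: " + item);
    }

    @Override
    public void onSkipInWrite(String item, Throwable t) {
        System.err.println("Skipped during write: " + item);
    }
}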

Spring Batch – Get StepExecution details of the previous step in the tasklet that runs in the next step.


Usually when I need to print the summary of a chunk-based step's StepExecution, I create a tasklet step immediately after the chunk-based step to print the statistics of the main step: number of records processed, records skipped, records committed, and so on.

I use the approach below to get the StepExecution details of the previously executed step.

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.stereotype.Component;

@Component
public class SummaryTaskletClass implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        // Get the job execution from the chunkContext
        JobExecution jobExecution = chunkContext
                                    .getStepContext()
                                    .getStepExecution()
                                    .getJobExecution();
        // jobExecution.getStepExecutions() returns all the step executions under the job.
        // Loop through them and filter on the name of the step you want to inspect.
        StepExecution stepExecution = null;
        for (StepExecution stepDetail : jobExecution.getStepExecutions()) {
            if (stepDetail.getStepName().equals("PreviousStepName")) {
                stepExecution = stepDetail;
                break;
            }
        }
        if (stepExecution == null) {
            throw new IllegalStateException("Step 'PreviousStepName' not found in this job execution");
        }
        System.out.println("********************************************************************************");
        System.out.println("**************          Job Run Statistics Summary                  ************");
        System.out.println("********************************************************************************");
        System.out.println("******* Load Start Time   : "+stepExecution.getStartTime());
        System.out.println("******* Load End Time     : "+stepExecution.getEndTime());
        System.out.println("******* Commit Count      : "+stepExecution.getCommitCount());
        System.out.println("******* Read Count        : "+stepExecution.getReadCount());
        System.out.println("******* Filter Count      : "+stepExecution.getFilterCount());
        System.out.println("******* Write Count       : "+stepExecution.getWriteCount());
        System.out.println("******* Read Skip Count   : "+stepExecution.getReadSkipCount());
        System.out.println("******* Write Skip Count  : "+stepExecution.getWriteSkipCount());
        System.out.println("******* Process Skip Count: "+stepExecution.getProcessSkipCount());
        System.out.println("******* Rollback Count    : "+stepExecution.getRollbackCount());
        System.out.println("******* Exit Code         : "+stepExecution.getExitStatus().getExitCode());
        System.out.println("******* Exit Message      : "+stepExecution.getExitStatus().getExitDescription());
        System.out.println("********************************************************************************");
        System.out.println("********************************************************************************");
        return RepeatStatus.FINISHED;
    }
}
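
For context, here is a hedged sketch of wiring this tasklet as the step that runs right after the chunk-based step (the job, step and bean names are assumptions):

@Bean
public Job summaryJob(JobBuilderFactory jobBuilderFactory,
                      StepBuilderFactory stepBuilderFactory,
                      Step chunkStep, // the chunk-based step, named "PreviousStepName"
                      SummaryTaskletClass summaryTasklet) {
    Step summaryStep = stepBuilderFactory.get("summaryStep")
            .tasklet(summaryTasklet)
            .build();
    return jobBuilderFactory.get("summaryJob")
            .start(chunkStep)
            .next(summaryStep) // prints the statistics of the previous step
            .build();
}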

Spring Batch – Job Repository Tables are not getting created


Below is the reference –

https://stackoverflow.com/questions/59398894/spring-batch-job-repository-tables-are-not-getting-created-in-the-schema-i-confi

A new configuration class needs to be created that loads the SQL scripts for the Job Repository tables and executes them against the batch data source.

import javax.annotation.PostConstruct;
import javax.sql.DataSource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.io.Resource;
import org.springframework.jdbc.datasource.init.ScriptUtils;
import org.springframework.web.context.WebApplicationContext;

@Configuration
@Profile({"dev", "prod"})
public class JobRepositorySchemaConfig {

    private final String JOB_REPO_SCHEMA = "classpath:batch_repo_schema.sql"; // custom schema script (not used below)

    @Autowired
    @Qualifier("secondDatasource")
    DataSource datasource;

    @Autowired
    WebApplicationContext webApplicationContext;

    @PostConstruct
    public void loadIfInMemory() throws Exception {
        // Run the drop script first, then recreate the Job Repository tables
        Resource dropScript = webApplicationContext.getResource("classpath:/org/springframework/batch/core/schema-drop-hsqldb.sql");
        Resource createScript = webApplicationContext.getResource("classpath:/org/springframework/batch/core/schema-hsqldb.sql");
        ScriptUtils.executeSqlScript(datasource.getConnection(), dropScript);
        ScriptUtils.executeSqlScript(datasource.getConnection(), createScript);
    }
}

Connecting to Oracle Database in Java using Wallets


Below is the reference —

https://stackoverflow.com/questions/7634196/what-is-correct-jdbc-url-syntax-if-oracle-wallets-are-used

I had missed adding the additional dependencies and setting oracle.net.wallet_location properly. I set the properties using the commands below and restarted the job to fix the issue.

System.setProperty("oracle.net.tns_admin",tnsAdminEnv);
System.setProperty("oracle.net.wallet_location",tnsAdminEnv);

Maven dependencies added –

		<!-- https://mvnrepository.com/artifact/com.oracle.database.security/oraclepki -->
		<dependency>
			<groupId>com.oracle.database.security</groupId>
			<artifactId>oraclepki</artifactId>
			<version>23.2.0.0</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/com.oracle.database.security/osdt_cert -->
		<dependency>
			<groupId>com.oracle.database.security</groupId>
			<artifactId>osdt_cert</artifactId>
			<version>21.11.0.0</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/com.oracle.database.security/osdt_core -->
		<dependency>
			<groupId>com.oracle.database.security</groupId>
			<artifactId>osdt_core</artifactId>
			<version>21.11.0.0</version>
		</dependency>

Solved : PKI-02010: Invalid MAC for Wallet. Wallet verification failed


Below is the reference –

https://nagulu1.rssing.com/chan-70145935/article11.html

As mentioned in the article, using special characters in the password causes this issue. I had provided the password as pa$$word1; changing it to Password12345 fixed the issue.
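
For reference, a sketch of recreating the wallet with a compliant password using orapki (the wallet path is a placeholder; check your Oracle client's orapki options before running):

orapki wallet create -wallet /path/to/wallet -pwd Password12345 -auto_login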

Answer multiple console prompts using .bat file


Below is the reference used –

https://stackoverflow.com/questions/61245350/answer-multiple-console-prompts-using-bat-file

I was able to solve the issue by piping the answers into the script:

@(  Echo username
    Echo password
) | createOracleWallet.bat

How to stop a Spring Batch Command Line Application – When System.exit(1) called inside a validation bean fails to stop the application


I was building a command line application using Spring Batch and I wanted the application to stop running if the input parameters passed were invalid.

So I had a validation bean that validated the input parameters and called System.exit(1) after writing the error to the STDERR stream. But when I ran the application, the error message was printed yet the application kept running. When I debugged the code I saw that control never came out of the SpringApplication.run context.

So I needed to exit the context first, before calling System.exit, to stop the application.

Get the context of the application when the SpringApplication.run is called in the main –

	public static void main(String[] args) {
		try {
			// Get the ConfigurableApplicationContext into a variable so that it can be autowired in the validation class
			ConfigurableApplicationContext context = SpringApplication.run(CliApp.class, args);
		}
		catch (Exception ex) {
			System.exit(1); // Call System.exit() when the context has failed
		}
	}

In the validation class, autowire the application context:

@Getter
@Setter
public class InputParameters {
    @Autowired
    private ConfigurableApplicationContext context; // Autowire the context object
    private String sqlFile;
    // ... other parameters and the validation method ...
}

In the validation method of the class, call SpringApplication.exit(context) to shut the application down. The resulting exception is caught in main, and System.exit(1) is called to stop the program.

        catch (Exception ex) {
            System.err.println("Error Message : " + ex.getMessage());
            System.err.println("Invalid Arguments : Please pass all the mandatory arguments");

            // Closes the Spring Batch program and returns control to main with an exception;
            // main catches it and calls System.exit(1) to exit the application in error.
            SpringApplication.exit(context);
        }

Spring Batch Course Notes – 10 – Spring Out-of-the-Box Item Writers


  • FlatFileItemWriter
    • Needs two things: a resource, which says where the file is written, and a line aggregator, which defines how to map an item into a line.
    • writer.setHeaderCallback allows the writer to add a header row to the output flat file.
    • writer.setAppendAllowed will allow the writer to append the data to the file.
    • writer.setFooterCallback will allow the writer to add a footer row to the file once the data has been written.
  • XML ItemWriter
    • The Spring Object/XML Mapping (spring-oxm) dependency is required.
    • The xstream dependency should also be added.
    • StaxEventItemWriter needs:
      • Resource
      • Marshaller
      • RootTagName
    • Add the @XmlElement annotation to your model if a model property is different from the XML tag name. You also need to set the autodetectAnnotations property of the marshaller to true.
  • JdbcBatchItemWriter
    • setDataSource
    • setSql
    • setItemPreparedStatementSetter
    • There is an alternate way to map the DB fields to the bean, and that is by using JdbcBatchItemWriterBuilder. Using it we can create an item writer, set the SQL, and map bean properties directly to the named parameters in the SQL with beanMapped(). Sketches of both writers follow this list.
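
As referenced above, here are minimal sketches of the two writer styles. The Person bean, its properties and the file path are assumptions for illustration:

// Classic FlatFileItemWriter configuration
@Bean
public FlatFileItemWriter<Person> fileWriter() {
    FlatFileItemWriter<Person> writer = new FlatFileItemWriter<>();
    writer.setResource(new FileSystemResource("output/persons.csv")); // where to write
    BeanWrapperFieldExtractor<Person> extractor = new BeanWrapperFieldExtractor<>();
    extractor.setNames(new String[]{"firstName", "lastName"});
    DelimitedLineAggregator<Person> aggregator = new DelimitedLineAggregator<>();
    aggregator.setDelimiter(",");
    aggregator.setFieldExtractor(extractor);
    writer.setLineAggregator(aggregator);                           // how to map an item to a line
    writer.setHeaderCallback(w -> w.write("first_name,last_name")); // header row
    writer.setAppendAllowed(true);                                  // append instead of overwrite
    return writer;
}

// Builder approach for JdbcBatchItemWriter using beanMapped()
@Bean
public JdbcBatchItemWriter<Person> jdbcWriter(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<Person>()
            .dataSource(dataSource)
            .sql("INSERT INTO person (first_name, last_name) VALUES (:firstName, :lastName)")
            .beanMapped() // the named parameters are mapped from the bean properties
            .build();
}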

Set a default DataSource for the Spring Job Repository when using multiple DataSources in Spring Batch


I used the below reference link when I got this error while using multiple dataSources in Spring Batch

https://stackoverflow.com/questions/25540502/use-of-multiple-datasources-in-spring-batch

Basically we need to define a BatchConfigurer bean backed by one of the data sources to fix the issue.

@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Bean
    BatchConfigurer configurer(@Qualifier("batchDataSource") DataSource dataSource) {
        return new DefaultBatchConfigurer(dataSource);
    }

    ...

Spring Batch – Dynamically map a table from one database to another


I needed to copy data from a table to a table in another database with the same columns and data types. Below is the code I used for this with Spring Boot and Spring Batch.

The input query file and the target table name are the input parameters here.

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;

import javax.sql.DataSource;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Bean
    @StepScope
    public ItemReader<Map<String, Object>> reader(DataSource dataSource1, ApplicationArguments args) throws IOException {
        JdbcCursorItemReader<Map<String, Object>> reader = new JdbcCursorItemReader<>();
        reader.setDataSource(dataSource1);

        // Read the input SQL query from the specified text file
        String queryTextFile = args.getOptionValues("sqlFile").get(0);
        String sqlQuery = new String(Files.readAllBytes(Paths.get(queryTextFile)));

        reader.setSql(sqlQuery);
        reader.setRowMapper(new ColumnMapRowMapper());
        return reader;
    }

    @Bean
    public ItemProcessor<Map<String, Object>, Map<String, Object>> processor(ApplicationArguments args) {
        String targetTableName = args.getOptionValues("outputTable").get(0);
        return new MyDataItemProcessor(targetTableName);
    }

    @Bean
    public ItemWriter<Map<String, Object>> writer(DataSource dataSource2) {
        return new MyDataItemWriter(dataSource2);
    }

    @Bean
    public Step step1(
            StepBuilderFactory stepBuilderFactory,
            ItemReader<Map<String, Object>> reader,
            ItemProcessor<Map<String, Object>, Map<String, Object>> processor,
            ItemWriter<Map<String, Object>> writer) {
        return stepBuilderFactory.get("step1")
                .<Map<String, Object>, Map<String, Object>>chunk(10)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    @Bean
    public Job importJob(JobBuilderFactory jobBuilderFactory, Step step1) {
        return jobBuilderFactory.get("importJob")
                .incrementer(new RunIdIncrementer())
                .flow(step1)
                .end()
                .build();
    }
}

class ColumnMapRowMapper implements RowMapper<Map<String, Object>> {
    @Override
    public Map<String, Object> mapRow(ResultSet resultSet, int rowNum) throws SQLException {
        Map<String, Object> row = new HashMap<>();
        row.put("source_column1", resultSet.getObject("column1"));
        row.put("source_column2", resultSet.getObject("column2"));
        row.put("source_column3", resultSet.getObject("column3"));
        // Add more columns as needed
        return row;
    }
}

class MyDataItemProcessor implements ItemProcessor<Map<String, Object>, Map<String, Object>> {

    private final String targetTableName;

    public MyDataItemProcessor(String targetTableName) {
        this.targetTableName = targetTableName;
    }

    @Override
    public Map<String, Object> process(Map<String, Object> item) throws Exception {
        Map<String, Object> mappedItem = new HashMap<>();

        // Implement your dynamic column mapping logic here
        // Example: mappedItem.put("target_column1", item.get("source_column1"));
        // ... Map other columns as needed

        return mappedItem;
    }
}

class MyDataItemWriter implements ItemWriter<Map<String, Object>> {

    private final NamedParameterJdbcTemplate jdbcTemplate;
    private final String targetTableName;

    public MyDataItemWriter(DataSource dataSource2) {
        this.jdbcTemplate = new NamedParameterJdbcTemplate(dataSource2);
        // Initialize the target table name here if needed
        this.targetTableName = "target_table_name";
    }

    @Override
    public void write(List<? extends Map<String, Object>> items) throws Exception {
        // Implement your custom write logic here
        // Example: for (Map<String, Object> item : items) {
        //     String sql = "INSERT INTO " + targetTableName + " (...) VALUES (...)";
        //     jdbcTemplate.update(sql, item);
        // }
    }
}

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

In this code:

  • We've added a main method in the Application class to run the Spring Boot application.
  • We use ApplicationArguments to retrieve the command line arguments for the input SQL query text file (--sqlFile) and the output table name (--outputTable).
  • The input SQL query is read from the specified text file and passed to the item reader.
  • The output table name is passed to the item processor to determine the target table for the data.

You can run this Spring Boot application by providing the required command line arguments when executing the JAR file. For example:

java -jar your-app.jar --sqlFile=/path/to/sql/query.sql --outputTable=target_table_name

Replace your-app.jar, /path/to/sql/query.sql, and target_table_name with your actual JAR file name, SQL query file path, and target table name.