Spring Batch Course Notes 14- Parallel Flow Job


When we have a scenario where we need to set up dependencies between steps and have the steps run in parallel once their dependencies are satisfied, we implement a parallel flow job.

  • Create the individual steps as per the requirement.
  • Create a new Flow object using the FlowBuilder class.
    • In the builder, use the .start, .next and .build methods to define the start of the flow and the steps associated with it.
    • Do this for each individual flow of the job.
  • Now group the individual flows of the job into a new composite Flow object.
    • Use the .split method to assign a new SimpleAsyncTaskExecutor.
    • Use the .add method to assign all the child flows that need to execute in parallel under the main parent flow.
    • Use .build to build the composite flow.
  • So in the job, instead of adding the steps, we now add the flows: use .start, .next and .end to describe the order of the different flows created, and call .build to build the job.
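The steps above can be sketched as below. This is a minimal sketch: the step beans step1, step2 and step3 and the jobBuilderFactory are assumed to already exist in your configuration, and all names are illustrative.

```java
// Assumes step1, step2, step3 and jobBuilderFactory are defined elsewhere in the config
Flow flow1 = new FlowBuilder<SimpleFlow>("flow1")
        .start(step1)
        .next(step2)   // step2 runs only after step1 completes
        .build();

Flow flow2 = new FlowBuilder<SimpleFlow>("flow2")
        .start(step3)
        .build();

// Composite flow: flow1 and flow2 run in parallel on separate threads
Flow splitFlow = new FlowBuilder<SimpleFlow>("splitFlow")
        .split(new SimpleAsyncTaskExecutor())
        .add(flow1, flow2)
        .build();

Job parallelFlowJob = jobBuilderFactory.get("parallelFlowJob")
        .start(splitFlow)
        .end()
        .build();
```

Within flow1, step2 still waits for step1, so step dependencies are preserved inside each flow while the flows themselves run in parallel.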

Spring Batch Course Notes 12- Multi Threaded Job


Multi-threading is where, instead of processing one chunk of records at a time, you process multiple chunks at a time. Let’s say you read data from a file, process it and write it into the database. By default, a single-threaded process will read N records, process them and write them into the database (assuming a chunk size of N). But with multi-threading, if 5 threads are created and each thread reads, processes and writes N records, then 5N records are processed in the time a single-threaded process takes to process N records.

One drawback of the multi-threaded process is that the order of the data written by the writer will not match the order in which the data was presented by the reader. Multi-threading is therefore typically used when writing data into a database, where the order of the written data is not important.

So in cases where the order of data is important Async Job should be explored instead of Multi-Threaded job.

  • Converting a single-threaded process into a multi-threaded process is easy.
  • Create a new ThreadPoolTaskExecutor object and set the core pool size and max pool size using the setCorePoolSize and setMaxPoolSize methods. Also call afterPropertiesSet to initialize the pool.
  • Assign the ThreadPoolTaskExecutor to the step in the StepBuilderFactory call using the .taskExecutor method and run the job.
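A minimal sketch of this step configuration, assuming the reader, processor, writer and stepBuilderFactory beans already exist; the pool sizes, chunk size and type names are illustrative:

```java
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(5);   // threads kept alive in the pool
taskExecutor.setMaxPoolSize(10);   // upper bound on concurrent threads
taskExecutor.afterPropertiesSet(); // initializes the underlying thread pool

Step multiThreadedStep = stepBuilderFactory.get("multiThreadedStep")
        .<InputRecord, OutputRecord>chunk(100)
        .reader(reader)        // the reader must be thread-safe in this setup
        .processor(processor)
        .writer(writer)
        .taskExecutor(taskExecutor)
        .build();
```

Note that the reader must be thread-safe for this to work correctly; a FlatFileItemReader, for example, is not thread-safe on its own and can be wrapped in a SynchronizedItemStreamReader.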

Spring Batch Course Notes 13- Async Job


In synchronous execution, each task waits for the previous task to finish before it runs. In asynchronous execution we can make the tasks run in parallel instead of one task waiting on the other.

  • Needs a POM dependency on spring-batch-integration.
  • Along with creating the ItemProcessor, create a new processor using the AsyncItemProcessor class.
    • Delegate the item processor object to the AsyncItemProcessor object using the setDelegate method.
    • Add a TaskExecutor using setTaskExecutor on the AsyncItemProcessor object. Setting it to a new SimpleAsyncTaskExecutor should do the trick.
    • Set the AsyncItemProcessor into the step.
  • Create a new object of the AsyncItemWriter class, which takes in the Futures produced by the AsyncItemProcessor and writes the data.
    • Delegate the custom ItemWriter object to the AsyncItemWriter object using the setDelegate method. The AsyncItemWriter unwraps each Future itself, so it does not need its own task executor.
    • Add the AsyncItemWriter into the .writer method of the StepBuilder instead of the custom ItemWriter.
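The wiring above can be sketched as follows; itemProcessor, itemWriter, reader and stepBuilderFactory are assumed to exist elsewhere, and the Input/Output types are placeholders:

```java
AsyncItemProcessor<Input, Output> asyncProcessor = new AsyncItemProcessor<>();
asyncProcessor.setDelegate(itemProcessor);                     // the real processing logic
asyncProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor()); // run each item on its own thread

AsyncItemWriter<Output> asyncWriter = new AsyncItemWriter<>();
asyncWriter.setDelegate(itemWriter); // unwraps each Future and hands the result to the real writer

// AsyncItemProcessor produces Future<Output>, so the chunk output type is Future<Output>
Step asyncStep = stepBuilderFactory.get("asyncStep")
        .<Input, Future<Output>>chunk(100)
        .reader(reader)
        .processor(asyncProcessor)
        .writer(asyncWriter)
        .build();
```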

The difference between async processing and multi-threaded processing is that multi-threaded processing runs whole chunks in parallel across threads, so the order of the records written is random. In async processing, only the processor work is fanned out to separate threads while the writer unwraps the results in reading order, so the order of the output is not impacted.

Spring Batch Course 11- Skip invalid Records


  • A skip policy is used to skip records that fail; otherwise the failures would cause the job to abort. The skip policy allows Spring Batch to complete the job for the records that are valid and skip the invalid ones.
  • We can set a SkipListener to handle the skipped records, either putting them in a file or into a database table so that the support team can look into the failures and recover/reprocess or fix the invalid records.
  • To enable skipping we need to do the following –
    • Add .faultTolerant() into the StepBuilder. This is to enable the fault tolerant policies like Skip, Retry and Restart.
    • Add .skip(Exception.class) after that to let the step know which exceptions need to be skipped.
    • Add .skipLimit(<<limit>>) to restrict the number of skipped records. If the number of skipped records exceeds this count then the step will abort.
    • We can also add .skipPolicy() to apply custom/out-of-box skip policies instead of .skipLimit.
      • new AlwaysSkipItemSkipPolicy() will skip all the items that are in error.
    • To handle skipped records add .listener(<<ListenerClassName>>) into the StepBuilder.
      • A custom skip listener can also be created where you can handle the read, write and process errors with custom events, like writing them to a file or a DB.
        • Create a new class for the custom skip listener implementing the SkipListener interface.
        • onSkipInRead — use this to specify how to handle read exceptions.
        • onSkipInProcess — use this to specify how to handle process exceptions.
        • onSkipInWrite — use this to specify how to handle write exceptions.
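A minimal sketch of the step and listener; reader, writer and stepBuilderFactory are assumed, and the exception type, skip limit and Input/Output types are illustrative:

```java
Step skipStep = stepBuilderFactory.get("skipStep")
        .<Input, Output>chunk(100)
        .reader(reader)
        .writer(writer)
        .faultTolerant()                    // enables skip/retry handling
        .skip(FlatFileParseException.class) // skip records that fail to parse
        .skipLimit(10)                      // abort the step past 10 skips
        .listener(new CustomSkipListener())
        .build();

public class CustomSkipListener implements SkipListener<Input, Output> {
    @Override
    public void onSkipInRead(Throwable t) {
        System.err.println("Skipped during read: " + t.getMessage());
    }

    @Override
    public void onSkipInProcess(Input item, Throwable t) {
        System.err.println("Skipped during process: " + item);
    }

    @Override
    public void onSkipInWrite(Output item, Throwable t) {
        System.err.println("Skipped during write: " + item);
    }
}
```

Instead of printing to STDERR, the listener methods could append the failed records to a file or insert them into an error table for the support team.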

Spring Batch – Get StepExecution details of the previous step in the tasklet that runs in the next step.


Usually when I need to print the StepExecution summary of my chunk-based step, I create a tasklet step immediately after the chunk-based step to print the main step’s summary: number of records processed, records skipped, records committed, etc.

I use the below approach to get the details of the step information of the previous step executed.

@Component
public class SummaryTaskletClass implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        JobExecution jobExecution = chunkContext
                                    .getStepContext()
                                    .getStepExecution()
                                    .getJobExecution();// Get the job execution from the chunkContext
        StepExecution stepExecution = null;
        // jobExecution.getStepExecutions() gives all the step executions under the job.
        // Loop through them and filter based on the step name you want to select.
        for (StepExecution stepDetail : jobExecution.getStepExecutions()) {
            if (stepDetail.getStepName().equals("PreviousStepName")) {
                stepExecution = stepDetail;
                break;
            }
        }
        if (stepExecution == null) { // guard against an NPE if the step has not run
            throw new IllegalStateException("Step 'PreviousStepName' has not executed");
        }
        System.out.println("********************************************************************************");
        System.out.println("**************          Job Run Statistics Summary                  ************");
        System.out.println("********************************************************************************");
        System.out.println("******* Load Start Time   : "+stepExecution.getStartTime());
        System.out.println("******* Load End Time     : "+stepExecution.getEndTime());
        System.out.println("******* Commit Count      : "+stepExecution.getCommitCount());
        System.out.println("******* Read Count        : "+stepExecution.getReadCount());
        System.out.println("******* Filter Count      : "+stepExecution.getFilterCount());
        System.out.println("******* Write Count       : "+stepExecution.getWriteCount());
        System.out.println("******* Read Skip Count   : "+stepExecution.getReadSkipCount());
        System.out.println("******* Write Skip Count  : "+stepExecution.getWriteSkipCount());
        System.out.println("******* Process Skip Count: "+stepExecution.getProcessSkipCount());
        System.out.println("******* Rollback Count    : "+stepExecution.getRollbackCount());
        System.out.println("******* Exit Code         : "+stepExecution.getExitStatus().getExitCode());
        System.out.println("******* Exit Message      : "+stepExecution.getExitStatus().getExitDescription());
        System.out.println("********************************************************************************");
        System.out.println("********************************************************************************");
        return RepeatStatus.FINISHED;
    }
}

Spring Batch – Job Repository Tables are not getting created


Below is the reference –>

https://stackoverflow.com/questions/59398894/spring-batch-job-repository-tables-are-not-getting-created-in-the-schema-i-confi

A new configuration class needs to be created which loads the SQL scripts for the job repository tables and executes them.

@Configuration
@Profile({"dev","prod"})
public class JobRepositorySchemaConfig {

    private final String JOB_REPO_SCHEMA = "classpath:batch_repo_schema.sql";

    @Autowired
    @Qualifier("secondDatasource")
    DataSource datasource;

    @Autowired
    WebApplicationContext webApplicationContext;

    @PostConstruct
    public void loadIfInMemory() throws Exception {
        Resource dropScript = webApplicationContext.getResource("classpath:/org/springframework/batch/core/schema-drop-hsqldb.sql");
        Resource createScript = webApplicationContext.getResource("classpath:/org/springframework/batch/core/schema-hsqldb.sql");
        try (Connection connection = datasource.getConnection()) { // one connection, closed when done
            ScriptUtils.executeSqlScript(connection, dropScript);
            ScriptUtils.executeSqlScript(connection, createScript);
        }
    }
}

Connecting to Oracle Database in Java using Wallets


Below is the reference —

https://stackoverflow.com/questions/7634196/what-is-correct-jdbc-url-syntax-if-oracle-wallets-are-used

I had missed adding the additional imports and setting oracle.net.wallet_location properly. I set the properties using the commands below and restarted the job to fix the issue.

System.setProperty("oracle.net.tns_admin",tnsAdminEnv);
System.setProperty("oracle.net.wallet_location",tnsAdminEnv);

Libraries imported –>

		<!-- https://mvnrepository.com/artifact/com.oracle.database.security/oraclepki -->
		<dependency>
			<groupId>com.oracle.database.security</groupId>
			<artifactId>oraclepki</artifactId>
			<version>23.2.0.0</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/com.oracle.database.security/osdt_cert -->
		<dependency>
			<groupId>com.oracle.database.security</groupId>
			<artifactId>osdt_cert</artifactId>
			<version>21.11.0.0</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/com.oracle.database.security/osdt_core -->
		<dependency>
			<groupId>com.oracle.database.security</groupId>
			<artifactId>osdt_core</artifactId>
			<version>21.11.0.0</version>
		</dependency>

How to stop a Spring Batch Command Line Application – When System.exit(1) called inside a validation bean fail to stop the application


I was building a command line application using Spring Batch and I wanted the application to stop running if the input parameters passed were invalid.

So I had a validation bean which validated the input parameters and called System.exit(1) after writing the error to the STDERR stream. But when I ran the application, the error message was printed yet the application kept running. When I debugged the code I saw that control never came out of the SpringApplication.run context.

So I needed to exit the context first, before calling System.exit to exit the application.

Get the context of the application when the SpringApplication.run is called in the main –

	public static void main(String[] args) {
		try {
			// Keep the ConfigurableApplicationContext in a variable so that it can be autowired in the validation class
			ConfigurableApplicationContext context = SpringApplication.run(CliApp.class, args);
		}
		catch (Exception ex) {
			System.exit(1); // Call System.exit() when the context fails
		}
	}

In the validation class, autowire the application context –

@Getter
@Setter
public class InputParameters {
    @Autowired
    private ConfigurableApplicationContext context;//Autowire the Context object
    private String sqlFile;

In the validation method of the class, call SpringApplication.exit(context) to fail the application. The resulting exception will be caught in main and System.exit(1) will be called to stop the program.

        catch (Exception ex){
            System.err.println("Error Message : "+ex.getMessage());
            System.err.println("Invalid Arguments : Please pass all  the mandatory arguments");

            SpringApplication.exit(context); //Closes the Spring Batch program and returns the control back to main with exception. Catch the exception and call System.exit to exit the application in error.
        }

Spring Batch Course Notes – 10 – Spring Out Of Box Item Writers


  • FlatFileItemWriter
    • Needs two things: a resource, which says where to write the file,
    • and a line aggregator, which defines how to map the data into a line.
    • writer.setHeaderCallback allows the writer to add a header row to the output flat file.
    • writer.setAppendAllowed will allow the writer to append the data to the file.
    • writer.setFooterCallback will allow the writer to add a footer row to the file once the data has been written.
  • XML ItemWriter
    • The Spring Object-to-XML mapping dependency is required.
    • The xstream dependency should also be added.
    • StaxEventItemWriter needs
      • a Resource,
      • a Marshaller,
      • and a RootTagName.
    • Add the @XmlElement tag to your model if the model property is different from the XML tag name. You also need to set the autodetect-annotations property to true.
  • JdbcBatchItemWriter
    • setDataSource
    • setSql
    • setItemPreparedStatementSetter
    • There is an alternate way to map the DB fields to the bean: using JdbcBatchItemWriterBuilder. With this we can create an item writer, set the SQL, and map the bean directly to the named parameters in the SQL using the beanMapped() method.
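A minimal sketch of two of the writers described above, assuming a hypothetical Customer bean with id and name properties, an existing dataSource bean, and an illustrative file path and table:

```java
// FlatFileItemWriter: a resource plus a line aggregator
FlatFileItemWriter<Customer> fileWriter = new FlatFileItemWriter<>();
fileWriter.setResource(new FileSystemResource("output/customers.csv"));
fileWriter.setHeaderCallback(writer -> writer.write("id,name")); // header row

DelimitedLineAggregator<Customer> aggregator = new DelimitedLineAggregator<>();
aggregator.setDelimiter(",");
BeanWrapperFieldExtractor<Customer> extractor = new BeanWrapperFieldExtractor<>();
extractor.setNames(new String[] {"id", "name"}); // bean properties, one per column
aggregator.setFieldExtractor(extractor);
fileWriter.setLineAggregator(aggregator);

// JdbcBatchItemWriter via the builder, using beanMapped() named parameters
JdbcBatchItemWriter<Customer> dbWriter = new JdbcBatchItemWriterBuilder<Customer>()
        .dataSource(dataSource)
        .sql("INSERT INTO customer (id, name) VALUES (:id, :name)")
        .beanMapped() // :id and :name are resolved from Customer's getters
        .build();
```

With beanMapped() there is no need to write an ItemPreparedStatementSetter by hand; the named parameters in the SQL are bound from the bean's properties.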

Set default dataSource for Spring Job Repository while using of multiple DataSources in Spring Batch


I used the below reference link when I got this error while using multiple dataSources in Spring Batch

https://stackoverflow.com/questions/25540502/use-of-multiple-datasources-in-spring-batch

Basically we need to define a BatchConfigurer bean backed by one of the data sources to fix the issue.

@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Bean
    BatchConfigurer configurer(@Qualifier("batchDataSource") DataSource dataSource) {
        return new DefaultBatchConfigurer(dataSource);
    }

...