介绍

Spring Batch 作为 Spring 的子项目,是一款基于 Spring 的企业批处理框架。通过它可以构建出健壮的企业批处理应用。Spring Batch 不仅提供了统一的读写接口、丰富的任务处理方式、灵活的事务管理及并发处理,同时还支持日志、监控、任务重启与跳过等特性,大大简化了批处理应用开发,将开发人员从复杂的任务配置管理过程中解放出来,使他们可以更多地去关注核心的业务处理过程。

Spring Batch把批处理简化为Job和Job step两部分,在Job step中,把数据处理分为读数据(Reader)处理数据(Processor)、**写数据(Writer)**三个步骤,异常处理机制分为==跳过==、==重试==、==重启==三种,作业方式分为==多线程、并行、远程、分区==四种。开发者在开发过程中,大部分工作是根据业务要求编写Reader、Processor和Writer即可,提高了批处理开发的效率。同时Spring Batch本身也提供了很多默认的Reader和Writer,开箱即用。

官网详细介绍:https://spring.io/projects/spring-batch

优势

  • 丰富的开箱即用组件 开箱即用组件包括各种资源的读、写。读/写:支持文本文件读/写、XML文件读/写、数据库读/写、JMS队列读/写等。还提供作业仓库,作业调度器等基础设施,大大简化开发复杂度。
  • 面向chunk处理 支持多次读、一次写、避免多次对资源的写入,大幅提升批处理效率。
  • 事务管理能力 默认采用Spring提供的声明式事务管理模型,面向Chunk的操作支持事务管理,同时支持为每个tasklet操作设置细粒度的事务配置:隔离级别、传播行为、超时设置等。
  • 元数据管理 自动记录Job和Step的执行情况、包括成功、失败、失败的异常信息、执行次数、重试次数、跳过次数、执行时间等,方便后期的维护和查看。
  • 易监控的批处理应用 提供了灵活的监控模式,包括直接查看数据库、通过Spring Batch提供的API查看、JMX控制台查看等。其中还说到Spring Batch Admin,不过这个项目已不维护,改为用Spring Cloud Data Flow了。
  • 丰富的流程定义 支持顺序任务、条件分支任务、基于这两种任务可以组织复杂的任务流程。
  • 健壮的批处理应用 支持作业的跳过、重试、重启能力、避免因错误导致批处理作业的异常中断。
  • 易扩展的批处理应用 扩展机制包括多线程执行一个Step(Multithreaded step)、多线程并行执行多个Step(Parallelizing step)、远程执行作业(Remote chunking)、分区执行(partitioning step)。
  • 复用企业现有IT资产 提供多种Adapter能力,使得企业现有的服务可以方便集成到批处理应用中。避免重新开发、达到复用企业遗留的服务资产。

使用场景

  • 定期提交批处理任务
  • 并行批处理
  • 企业消息驱动处理
  • 大规模并行批处理
  • 失败后手动或定时重启
  • 按顺序处理依赖的任务(可扩展为工作流驱动的批处理)
  • 部分处理:跳过记录(例如,回滚时)
  • 批处理事务

原则与建议

当我们构建一个批处理的过程时,必须注意以下原则:

1、通常情况下,批处理的过程对系统和架构的设计要够要求比较高,因此尽可能的使用通用架构来处理批量数据处理,降低问题发生的可能性。Spring Batch是一个是一个轻量级的框架,适用于处理一些灵活并没有到海量的数据。

2、批处理应该尽可能的简单,尽量避免在单个批处理中去执行过于复杂的任务。我们可以将任务分成多个批处理或者多个步骤去实现。

3、保证数据处理和物理数据紧密相连。笼统的说就是我们在处理数据的过程中有很多步骤,在某些步骤执行完时应该就写入数据,而不是等所有都处理完。

4、尽可能减少系统资源的使用、尤其是耗费大量资源的IO以及跨服务器引用,尽量分配好数据处理的批次。

5、定期分析系统的IO使用情况、SQL语句的执行情况等,尽可能的减少不必要的IO操作。优化的原则有:

尽量在一次事物中对同一数据进行读取或写缓存。
一次事物中,尽可能在开始就读取所有需要使用的数据。
优化索引,观察SQL的执行情况,尽量使用主键索引,尽量避免全表扫描或过多的索引扫描。
SQL中的where尽可能通过主键查询。
6、不要在批处理中对相同的数据执行2次相同的操作。

7、对于批处理程序而言应该在批处理启动之前就分配足够的内存,以免处理的过程中去重新申请新的内存页。

8、对数据的完整性应该从最差的角度来考虑,每一步的处理都应该建立完备的数据校验。

9、对于数据的总量我们应该有一个和数据记录在数据结构的某个字段 上。

10、所有的批处理系统都需要进行压力测试。

11、如果整个批处理的过程是基于文件系统,在处理的过程中请切记完成文件的备份以及文件内容的校验。

通用策略

和软件开发的设计模式一样,批处理也有各种各样的现成模式可供参考。当一个开发(设计)人员开始执行批处理任务时,应该将业务逻辑拆分为一下的步骤或者板块分批执行:

  1. 数据转换:某个(某些)批处理的外部数据可能来自不同的外部系统或者外部提供者,这些数据的结构千差万别。在统一进行批量数据处理之前需要对这些数据进行转换,合并为一个统一的结构。因此在数据开始真正的执行业务处理之前,先要使用其他的方法或者一些批处理任务将这些数据转换为统一的格式。
  2. 数据校验:批处理是对大量数据进行处理,并且数据的来源千差万别,所以批处理的输入数据需要对数据的完整性性进行校验(比如校验字段数据是否缺失)。另外批处理输出的数据也需要进行合适的校验(例如处理了100条数据,校验100条数据是否校验成功)
  3. 提取数据:批处理的工作是逐条从数据库或目标文件读取记录(records),提取时可以通过一些规则从数据源中进行数据筛选。
  4. 数据实时更新处理:根据业务要求,对实时数据进行处理。某些时候一行数据记录的处理需要绑定在一个事物之下。
  5. 输出记录到标准的文档格式:数据处理完成之后需要根据格式写入到对应的外部数据系统中。

以上五个步骤是一个标准的数据批处理过程,Spring batch框架为业务实现提供了以上几个功能入口。

Spring Batch核心概念

Spring Batch在基础架构层,把任务抽象为Job和Step,一个Job由多个Step来完成,step就是每个job要执行的单个步骤。

1、Job:是一个接口,接口中定义了一个作业是怎么样执行的

2、JobInstance:是job的一次执行,一个JobInstance可重复执行,如果上一次执行失败下次执行的时候还会重新执行上次失败的job,每一次执行就是一个JobExceution

3、JobParameters:作为参数可以用来启动Job,并且可以用来标识不同的Job,运行时提供给JobInstance,jonExceution根据状态和参数决定下一次是否继续执行

4、JobExceution:每一次尝试执行一个Job的时候,我们就可以将其称为一个JobExceution,这个执行的结果可以为成功,也可以为失败,例如一个JobInstance执行失败了,下一次执行他传入的参数是上次执行的时间,他将会继续执行,这样始终执行的是一个JobInstance,而产生了两个JobExceution

5、Step:主要分为两块

(1)Tasklet:是一个任务单元,它是属于可以重复利用的东西。接口其中包含了一个唯一的方法execute();

(2)Chunk-based:chunk就是数据块,你需要定义多大的数据量是一个chunk。Chunk里面就是不断循环的一个流程,读数据,处理数据,然后写数据。Spring Batch会不断的循环这个流程,直到批处理数据完成。

  • itemReader:数据输入input:对于一个Step而言,每次读取一个条目;
  • itemProcessor:数据处理processing
  • ItemWriter:数据输出output:对于一个Step而言,每次根据设定输出批量一个条目;

6、StepExecution:一个Step的每一次尝试执行,都会创建一个StepExection,在一个Step实际开始执行的时候创建

7、ExecutionContext:执行上下文,代表的是一个key-value键值对的集合,可以被Spring框架进行在持久化管理,能够是开发人员存储持久化状态,每一个JobExecution以及每一个StepExecution的执行都会对应一个执行上下文(ExecutionContext);对于StepExecution在每一次提交点时就会保存一下执行上下文,而对于Job是在每一个StepExecution执行之间进行保存,例如,我们从Step1换到Step2是就会保存;

8、JobLauncher:接口,用于启动和加载Job,根据传入的参数进行启动,返回Job一次执行的情况

9、JobRepository:Job及Job的运行结果和状态、Step的运行结果和状态,都会保存在JobRepository中。

20220516141133.png

概念说明可见下表:

领域对象 描述
JobRepository 作业仓库,保存Job、Step执行过程中的状态及结果
JobLauncher 作业执行器,是执行Job的入口
Job 一个批处理任务,由一个或多个Step组成
Step 一个任务的具体的执行逻辑单位
Item 一条数据记录
ItemReader 从数据源读数据
ItemProcessor 对数据进行处理,如数据清洗、转换、过滤、校验等
ItemWriter 写入数据到指定目标
Chunk 给定数量的Item集合,如读取到chunk数量后,才进行写操作
Tasklet Step中具体执行逻辑,可重复执行

Spring Batch数据表

batch_job_instance:这张表能看到每次运行的job名字。

20220516141150.png

batch_job_execution:这张表能看到每次运行job的开始时间,结束时间,状态,以及失败后的错误消息是什么。

20220516141157.png

batch_step_execution:这张表你能看到更多关于step的详细信息。比如step的开始时间,结束时间,提交次数,读写次数,状态,以及失败后的错误信息等。

20220516141204.png

开始使用

@EnableBatchProcessing

==注意:== 启动类上要添加这个注解

启动任务的两种方式

配置文件设置不自动执行

spring:
  batch:
    job:
      # 默认自动执行定义的Job(true),改为false,需要jobLaucher.run执行
      enabled: false

定时任务启动

@Component
public class BatchTaskJob {

	@Autowired
    JobLauncher jobLauncher;

	@Autowired
    JobOperator jobOperator;

	@Resource(name="messagebatchinsertjob")
	private Job batchJob;

	/**
	 * 每天读取txt文件,
	 * @throws Exception
	 */
	//@Scheduled(cron = "0 0/1 * * * ?")
	@Scheduled(cron = "*/10 * * * * ?")
	public void job3() throws Exception {
		JobParameters jobParameter = new JobParametersBuilder()
						.addLong("time",System.currentTimeMillis()).toJobParameters()
		JobExecution run = jobLauncher.run(batchJob, jobParameter);
		run.getId();
	}

}

接口调用

@RestController
public class JobController {

    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    JobOperator jobOperator;

    @Resource(name="messagebatchinsertjob")
    private Job batchJob;

    @GetMapping("/runJob")
    public void runJob(@RequestParam("job1param") String job1param) throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
            			.addString("job1param",job1param).toJobParameters();
        //JobExecution run = jobLauncher.run(batchJob, new JobParametersBuilder()
        		//.addLong("time",System.currentTimeMillis()).toJobParameters());
        JobExecution run = jobLauncher.run(batchJob, jobParameters);
        run.getId();
    }

}

任务的类型

监听器

监听器 说明
JobExecutionListener 在 Job 开始之前(beforeJob)和之后(afterJob)触发
StepExecutionListener 在 Step 开始之前(beforeStep)和之后(afterStep)触发
ChunkListener 在 Chunk 开始之前(beforeChunk),之后(afterChunk)和错误后(afterChunkError)触发
ItemReadListener 在 Read 开始之前(beforeRead),之后(afterRead)和错误后(onReadError)触发
ItemProcessListener 在 Read 开始之前(beforeProcess),之后(afterProcess)和错误后(onProcessError)触发
ItemWriteListener 在 Read 开始之前(beforeWrite),之后(afterWrite)和错误后(onWriteError)触发
SkipListener 在 Read 开始之前(beforeWrite),之后(afterWrite)和错误后(onWriteError)触发

Listener Example

CustomStepListener.java

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;

public class CustomStepListener implements StepExecutionListener {

    @Override
    public void beforeStep(StepExecution stepExecution) {
        System.out.println("StepExecutionListener - beforeStep");
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        System.out.println("StepExecutionListener - afterStep");
        return null;
    }

}

CustomItemReaderListener.java

import org.springframework.batch.core.ItemReadListener;
import com.mkyong.Domain;

public class CustomItemReaderListener implements ItemReadListener<Domain> {
@Override
public void beforeRead() {
    System.out.println("ItemReadListener - beforeRead");
}

@Override
public void afterRead(Domain item) {
    System.out.println("ItemReadListener - afterRead");
}

@Override
public void onReadError(Exception ex) {
    System.out.println("ItemReadListener - onReadError");
}
    }

CustomItemWriterListener.java

import java.util.List;
import org.springframework.batch.core.ItemWriteListener;
import com.mkyong.Domain;

public class CustomItemWriterListener implements ItemWriteListener<Domain> {

    @Override
    public void beforeWrite(List<? extends Domain> items) {
        System.out.println("ItemWriteListener - beforeWrite");
    }

    @Override
    public void afterWrite(List<? extends Domain> items) {
        System.out.println("ItemWriteListener - afterWrite");
    }

    @Override
    public void onWriteError(Exception exception, List<? extends Domain> items) {
        System.out.println("ItemWriteListener - onWriteError");
    }

}