此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Batch 文档 5.2.2! |
常见批处理模式
一些批处理作业可以纯粹由 Spring Batch 中的现成组件组装。
例如,ItemReader
和ItemWriter
实现可以配置为
涵盖广泛的场景。但是,对于大多数情况下,自定义代码必须是
写。应用程序开发人员的主要 API 入口点是Tasklet
这ItemReader
这ItemWriter
,以及各种侦听器接口。最简单的批次
作业可以使用来自 Spring Batch 的现成输入ItemReader
,但它通常是
在处理和编写中存在需要开发人员的自定义问题的情况
实现ItemWriter
或ItemProcessor
.
在本章中,我们提供了一些自定义业务逻辑中常见模式的示例。
这些示例主要以侦听器接口为特色。应该注意的是,一个ItemReader
或ItemWriter
如果适用,也可以实现侦听器接口。
记录项处理和失败
一个常见的用例是需要对步骤中的错误进行特殊处理,逐项处理,
也许是登录到特殊通道或将记录插入数据库。一个
面向块Step
(从步骤工厂 bean 创建)允许用户实现此使用
带有简单ItemReadListener
对于错误read
和ItemWriteListener
为
错误write
.以下代码片段演示了一个侦听器,该侦听器记录了读取
和写入失败:
public class ItemFailureLoggerListener extends ItemListenerSupport {
private static Log logger = LogFactory.getLog("item.error");
public void onReadError(Exception ex) {
logger.error("Encountered error on read", e);
}
public void onWriteError(Exception ex, List<? extends Object> items) {
logger.error("Encountered error on write", ex);
}
}
实现此侦听器后,必须使用步骤注册它。
-
Java
-
XML
以下示例显示了如何使用步骤 Java 注册侦听器:
@Bean
public Step simpleStep(JobRepository jobRepository) {
return new StepBuilder("simpleStep", jobRepository)
...
.listener(new ItemFailureLoggerListener())
.build();
}
以下示例演示如何使用 XML 中的步骤注册侦听器:
<step id="simpleStep">
...
<listeners>
<listener>
<bean class="org.example...ItemFailureLoggerListener"/>
</listener>
</listeners>
</step>
如果您的监听器在onError() 方法,它必须在
将要回滚的事务。如果您需要使用事务性
资源,例如数据库,在onError() 方法,请考虑添加声明式
transaction 添加到该方法(有关详细信息,请参阅 Spring Core 参考指南),并给出其
propagation 属性的值为REQUIRES_NEW . |
出于业务原因手动停止作业
Spring Batch 提供了一个stop()
方法通过JobOperator
接口,但这是
实际上供操作员而不是应用程序程序员使用。有时,它是
从业务内部停止作业执行更方便或更有意义
逻辑。
最简单的做法就是扔一个RuntimeException
(既不重试的
无限期或跳过)。例如,可以使用自定义异常类型,如图所示
在以下示例中:
public class PoisonPillItemProcessor<T> implements ItemProcessor<T, T> {
@Override
public T process(T item) throws Exception {
if (isPoisonPill(item)) {
throw new PoisonPillException("Poison pill detected: " + item);
}
return item;
}
}
停止执行步骤的另一种简单方法是返回null
从ItemReader
,如以下示例所示:
public class EarlyCompletionItemReader implements ItemReader<T> {
private ItemReader<T> delegate;
public void setDelegate(ItemReader<T> delegate) { ... }
public T read() throws Exception {
T item = delegate.read();
if (isEndItem(item)) {
return null; // end the step here
}
return item;
}
}
前面的例子实际上依赖于存在默认实现的事实
的CompletionPolicy
当项目
processed 是null
.可以实施更复杂的完成政策,并且
注入Step
通过SimpleStepFactoryBean
.
-
Java
-
XML
以下示例显示了如何将完成策略注入 Java 中的步骤:
@Bean
public Step simpleStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("simpleStep", jobRepository)
.<String, String>chunk(new SpecialCompletionPolicy(), transactionManager)
.reader(reader())
.writer(writer())
.build();
}
以下示例演示如何将完成策略注入 XML 中的步骤:
<step id="simpleStep">
<tasklet>
<chunk reader="reader" writer="writer" commit-interval="10"
chunk-completion-policy="completionPolicy"/>
</tasklet>
</step>
<bean id="completionPolicy" class="org.example...SpecialCompletionPolicy"/>
另一种方法是在StepExecution
,由Step
在项目处理之间在框架中的实现。要实现这一点
替代方案,我们需要访问当前的StepExecution
,这可以通过以下方式实现
实现一个StepListener
并将其注册到Step
.以下示例
显示设置标志的监听器:
public class CustomItemWriter extends ItemListenerSupport implements StepListener {
private StepExecution stepExecution;
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
public void afterRead(Object item) {
if (isPoisonPill(item)) {
stepExecution.setTerminateOnly();
}
}
}
设置标志后,默认行为是步骤抛出JobInterruptedException
.可以通过StepInterruptionPolicy
.但是,唯一的选择是抛出或不抛出异常,
所以这总是一份工作的不正常结局。
添加页脚记录
通常,写入平面文件时,必须将“页脚”记录附加到
文件,在所有处理完成后。这可以使用FlatFileFooterCallback
Spring Batch 提供的接口。这FlatFileFooterCallback
(及其对应的FlatFileHeaderCallback
) 是FlatFileItemWriter
,并且可以添加到项目编写器。
-
Java
-
XML
以下示例演示如何使用FlatFileHeaderCallback
和FlatFileFooterCallback
在 Java 中:
@Bean
public FlatFileItemWriter<String> itemWriter(Resource outputResource) {
return new FlatFileItemWriterBuilder<String>()
.name("itemWriter")
.resource(outputResource)
.lineAggregator(lineAggregator())
.headerCallback(headerCallback())
.footerCallback(footerCallback())
.build();
}
以下示例演示如何使用FlatFileHeaderCallback
和FlatFileFooterCallback
在 XML 中:
<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator" ref="lineAggregator"/>
<property name="headerCallback" ref="headerCallback" />
<property name="footerCallback" ref="footerCallback" />
</bean>
页脚回调接口只有一个方法,当页脚必须 写入,如以下接口定义所示:
public interface FlatFileFooterCallback {
void writeFooter(Writer writer) throws IOException;
}
编写摘要页脚
涉及页脚记录的一个常见要求是在 输出过程,并将此信息附加到文件末尾。这个页脚经常 用作文件的摘要或提供校验和。
例如,如果批处理作业正在写入Trade
记录到一个平面文件,并且有一个
要求所有Trades
放置在页脚中,则
以后ItemWriter
实现可以使用:
public class TradeItemWriter implements ItemWriter<Trade>,
FlatFileFooterCallback {
private ItemWriter<Trade> delegate;
private BigDecimal totalAmount = BigDecimal.ZERO;
public void write(Chunk<? extends Trade> items) throws Exception {
BigDecimal chunkTotal = BigDecimal.ZERO;
for (Trade trade : items) {
chunkTotal = chunkTotal.add(trade.getAmount());
}
delegate.write(items);
// After successfully writing all items
totalAmount = totalAmount.add(chunkTotal);
}
public void writeFooter(Writer writer) throws IOException {
writer.write("Total Amount Processed: " + totalAmount);
}
public void setDelegate(ItemWriter delegate) {...}
}
这TradeItemWriter
存储一个totalAmount
随着amount
从每个Trade
写的项目。在最后Trade
处理时,框架调用writeFooter
,它将totalAmount
到文件中。请注意,write
方法
使用临时变量,chunkTotal
,存储Trade
金额。这样做是为了确保,如果跳过发生在write
方法,则totalAmount
保持不变。它只是在write
方法,一旦我们保证不会抛出异常,我们就会更新totalAmount
.
为了writeFooter
方法,则TradeItemWriter
(其中
实现FlatFileFooterCallback
) 必须连接到FlatFileItemWriter
作为footerCallback
.
-
Java
-
XML
以下示例演示如何连接TradeItemWriter
在 Java 中:
@Bean
public TradeItemWriter tradeItemWriter() {
TradeItemWriter itemWriter = new TradeItemWriter();
itemWriter.setDelegate(flatFileItemWriter(null));
return itemWriter;
}
@Bean
public FlatFileItemWriter<String> flatFileItemWriter(Resource outputResource) {
return new FlatFileItemWriterBuilder<String>()
.name("itemWriter")
.resource(outputResource)
.lineAggregator(lineAggregator())
.footerCallback(tradeItemWriter())
.build();
}
以下示例演示如何连接TradeItemWriter
在 XML 中:
<bean id="tradeItemWriter" class="..TradeItemWriter">
<property name="delegate" ref="flatFileItemWriter" />
</bean>
<bean id="flatFileItemWriter" class="org.spr...FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator" ref="lineAggregator"/>
<property name="footerCallback" ref="tradeItemWriter" />
</bean>
方式TradeItemWriter
到目前为止,只有在
这Step
不可重新启动。这是因为该类是有状态的(因为它存储了totalAmount
),但totalAmount
不会持久化到数据库。因此,它
在重新启动时无法检索。为了使这个类可重启,
这ItemStream
接口应与方法open
和update
,如以下示例所示:
public void open(ExecutionContext executionContext) {
if (executionContext.containsKey("total.amount") {
totalAmount = (BigDecimal) executionContext.get("total.amount");
}
}
public void update(ExecutionContext executionContext) {
executionContext.put("total.amount", totalAmount);
}
update 方法存储最新版本的totalAmount
到ExecutionContext
就在该对象持久化到数据库之前。开放式方法
检索任何现有的totalAmount
从ExecutionContext
并将其用作
处理的起点,允许TradeItemWriter
在重启时拾取,其中
它离开了上一次Step
被运行。
驱动基于查询的 ItemReaders
在关于读取器和写入器的章节中,讨论了使用分页的数据库输入。许多数据库提供商(例如 DB2)都非常悲观锁定策略,如果正在读取的表也需要被在线应用程序的其他部分使用,则可能会导致问题。此外,在极度大型数据集上打开光标可能会导致某些提供商的数据库出现问题。因此,许多项目更喜欢使用“驱动查询”方法来读取数据。这种方法有效通过迭代键,而不是需要返回的整个对象,如下图说明:

如您所见,上图中显示的示例使用与在基于游标的示例中使用的“FOO”表。但是,不是选择整行,而是在 SQL 语句中仅选择了ID。因此,与其选择FOO
返回对象 从read
一Integer
返回。然后可以使用此数字查询'details',这是一个完整的Foo
对象,如下图所示:

一ItemProcessor
应该用于将从驱动查询中获取的密钥转换为完整的Foo
对象。 现有的 DAO 可用于查询基于键上的完整对象。
多行记录
虽然平面文件通常情况下,每条记录都仅限于单个行,但通常文件可能具有跨越多行的记录,其中多个 格式。 以下文件摘录显示了这种安排的一个例子:
HEA;0013100345;2007-02-15 NCU;Smith;Peter;;T;20014539;F BAD;;Oak Street 31/A;;Small Town;00235;IL;US FOT;2;2;267.34
以“HEA”开头的行和以“FOT”开头的行之间的所有内容都是被视为一条记录。为了正确处理这种情况,必须考虑一些因素:
-
不是一次读取一条记录,而是
ItemReader
必须将多行记录的每一行都读取为一组,以便可以将其传递给ItemWriter
完整。 -
每种行类型可能需要以不同的方式进行标记化。
因为一条记录跨越多行,而且因为我们可能不知道有多少行有,ItemReader
必须小心始终读取整个记录。为了这样做,自定义ItemReader
应该作为FlatFileItemReader
.
-
Java
-
XML
以下示例演示如何实现自定义ItemReader
在 Java 中:
@Bean
public MultiLineTradeItemReader itemReader() {
MultiLineTradeItemReader itemReader = new MultiLineTradeItemReader();
itemReader.setDelegate(flatFileItemReader());
return itemReader;
}
@Bean
public FlatFileItemReader flatFileItemReader() {
FlatFileItemReader<Trade> reader = new FlatFileItemReaderBuilder<>()
.name("flatFileItemReader")
.resource(new ClassPathResource("data/iosample/input/multiLine.txt"))
.lineTokenizer(orderFileTokenizer())
.fieldSetMapper(orderFieldSetMapper())
.build();
return reader;
}
以下示例演示如何实现自定义ItemReader
在 XML 中:
<bean id="itemReader" class="org.spr...MultiLineTradeItemReader">
<property name="delegate">
<bean class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="data/iosample/input/multiLine.txt" />
<property name="lineMapper">
<bean class="org.spr...DefaultLineMapper">
<property name="lineTokenizer" ref="orderFileTokenizer"/>
<property name="fieldSetMapper" ref="orderFieldSetMapper"/>
</bean>
</property>
</bean>
</property>
</bean>
确保每一行都正确标记,这对于
fixed-length 输入,则PatternMatchingCompositeLineTokenizer
可用于
委托FlatFileItemReader
.看FlatFileItemReader
在阅读器和
作家章节了解更多详情。然后,委托读取器使用PassThroughFieldSetMapper
交付一个FieldSet
对于返回换行的每一行ItemReader
.
-
Java
-
XML
以下示例显示了如何确保每一行在 Java 中都正确标记:
@Bean
public PatternMatchingCompositeLineTokenizer orderFileTokenizer() {
PatternMatchingCompositeLineTokenizer tokenizer =
new PatternMatchingCompositeLineTokenizer();
Map<String, LineTokenizer> tokenizers = new HashMap<>(4);
tokenizers.put("HEA*", headerRecordTokenizer());
tokenizers.put("FOT*", footerRecordTokenizer());
tokenizers.put("NCU*", customerLineTokenizer());
tokenizers.put("BAD*", billingAddressLineTokenizer());
tokenizer.setTokenizers(tokenizers);
return tokenizer;
}
以下示例演示如何确保每一行在 XML 中正确标记:
<bean id="orderFileTokenizer" class="org.spr...PatternMatchingCompositeLineTokenizer">
<property name="tokenizers">
<map>
<entry key="HEA*" value-ref="headerRecordTokenizer" />
<entry key="FOT*" value-ref="footerRecordTokenizer" />
<entry key="NCU*" value-ref="customerLineTokenizer" />
<entry key="BAD*" value-ref="billingAddressLineTokenizer" />
</map>
</property>
</bean>
此包装器必须能够识别记录的结尾,以便它可以连续
叫read()
在其委托上,直到到达终点。对于读取的每一行,
wrapper 应该构建要返回的项目。到达页脚后,项目可以
被退回交付给ItemProcessor
和ItemWriter
,如
以下示例:
private FlatFileItemReader<FieldSet> delegate;
public Trade read() throws Exception {
Trade t = null;
for (FieldSet line = null; (line = this.delegate.read()) != null;) {
String prefix = line.readString(0);
if (prefix.equals("HEA")) {
t = new Trade(); // Record must start with header
}
else if (prefix.equals("NCU")) {
Assert.notNull(t, "No header was found.");
t.setLast(line.readString(1));
t.setFirst(line.readString(2));
...
}
else if (prefix.equals("BAD")) {
Assert.notNull(t, "No header was found.");
t.setCity(line.readString(4));
t.setState(line.readString(6));
...
}
else if (prefix.equals("FOT")) {
return t; // Record must end with footer
}
}
Assert.isNull(t, "No 'END' was found.");
return null;
}
执行系统命令
许多批处理作业要求从批处理作业中调用外部命令。 这样的过程可以由调度程序单独启动,但 有关运行的通用元数据将丢失。此外,多步骤作业还将 也需要拆分为多个作业。
由于需求非常普遍,Spring Batch 提供了一个Tasklet
实现
调用系统命令。
-
Java
-
XML
以下示例显示了如何在 Java 中调用外部命令:
@Bean
public SystemCommandTasklet tasklet() {
SystemCommandTasklet tasklet = new SystemCommandTasklet();
tasklet.setCommand("echo hello");
tasklet.setTimeout(5000);
return tasklet;
}
以下示例演示如何在 XML 中调用外部命令:
<bean class="org.springframework.batch.core.step.tasklet.SystemCommandTasklet">
<property name="command" value="echo hello" />
<!-- 5 second timeout for the command to complete -->
<property name="timeout" value="5000" />
</bean>
在未找到输入时处理步骤完成
在许多批处理方案中,在数据库或文件中找不到要处理的行不是
特殊。这Step
只是认为没有找到任何工作并以 0 完成
项目已读。所有ItemReader
Spring 中开箱即用的实现
Batch 默认使用此方法。如果什么都没有写出来,这可能会导致一些混乱
即使存在输入(如果文件被错误命名或类似名称,通常会发生这种情况
问题出现)。因此,应检查元数据本身以确定如何
框架发现需要处理的大量工作。但是,如果找不到输入怎么办
被认为是例外的?在这种情况下,以编程方式检查元数据中没有项目
处理并导致故障是最好的解决方案。因为这是一个常见的用例,
Spring Batch 为侦听器提供了正是这个功能,如
的类定义NoWorkFoundStepExecutionListener
:
public class NoWorkFoundStepExecutionListener implements StepExecutionListener {
public ExitStatus afterStep(StepExecution stepExecution) {
if (stepExecution.getReadCount() == 0) {
return ExitStatus.FAILED;
}
return null;
}
}
前面的StepExecutionListener
检查readCount
属性的StepExecution
在“afterStep”阶段确定是否未读取任何项目。如果
是这样的,一个退出代码FAILED
返回,表示Step
应该失败。
否则null
返回,这不会影响Step
.
将数据传递到未来的步骤
将信息从一个步骤传递到另一个步骤通常很有用。这可以通过
这ExecutionContext
.问题是有两个ExecutionContexts
: 一个在Step
级别和一个在Job
水平。这Step
ExecutionContext
仅保留为
只要步长,而Job
ExecutionContext
始终保持在整体中Job
.上
另一方面,Step
ExecutionContext
每次Step
提交一个
块,而Job
ExecutionContext
仅在每个Step
.
这种分离的后果是所有数据都必须放在Step
ExecutionContext
而Step
正在执行。这样做可以确保数据
正确存储,而Step
运行。如果数据存储到Job
ExecutionContext
,
则在Step
执行。如果Step
失败,则该数据将丢失。
public class SavingItemWriter implements ItemWriter<Object> {
private StepExecution stepExecution;
public void write(Chunk<? extends Object> items) throws Exception {
// ...
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
stepContext.put("someKey", someObject);
}
@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
}
使数据可供将来使用Steps
,必须“提升”到Job
ExecutionContext
步骤完成后。Spring Batch 提供了ExecutionContextPromotionListener
为此目的。必须配置侦听器
与ExecutionContext
必须促进。它可以
此外,还可以选择配置升级的退出代码模式列表
应该发生 (COMPLETED
是默认值)。与所有监听器一样,必须注册
在Step
.
-
Java
-
XML
以下示例演示如何将步骤提升为Job
ExecutionContext
在 Java 中:
@Bean
public Job job1(JobRepository jobRepository, Step step1, Step step2) {
return new JobBuilder("job1", jobRepository)
.start(step1)
.next(step2)
.build();
}
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(reader())
.writer(savingWriter())
.listener(promotionListener())
.build();
}
@Bean
public ExecutionContextPromotionListener promotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[] {"someKey"});
return listener;
}
以下示例演示如何将步骤提升为Job
ExecutionContext
在 XML 中:
<job id="job1">
<step id="step1">
<tasklet>
<chunk reader="reader" writer="savingWriter" commit-interval="10"/>
</tasklet>
<listeners>
<listener ref="promotionListener"/>
</listeners>
</step>
<step id="step2">
...
</step>
</job>
<beans:bean id="promotionListener" class="org.spr....ExecutionContextPromotionListener">
<beans:property name="keys">
<list>
<value>someKey</value>
</list>
</beans:property>
</beans:bean>
最后,必须从Job
ExecutionContext
,如图所示
在以下示例中:
public class RetrievingItemWriter implements ItemWriter<Object> {
private Object someObject;
public void write(Chunk<? extends Object> items) throws Exception {
// ...
}
@BeforeStep
public void retrieveInterstepData(StepExecution stepExecution) {
JobExecution jobExecution = stepExecution.getJobExecution();
ExecutionContext jobContext = jobExecution.getExecutionContext();
this.someObject = jobContext.get("someKey");
}
}