|
此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Batch 文档 5.1.2! |
|
此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Batch 文档 5.1.2! |
一些 Batch Job 可以完全从 Spring Batch 中的现成组件组装。
例如,可以将 and 实现配置为
涵盖广泛的场景。但是,在大多数情况下,自定义代码必须为
写。应用程序开发人员的主要 API 入口点是 、 、 、 和各种侦听器接口。最简单的批处理
jobs 可以使用来自 Spring Batch 的现成 input ,但它通常是
在处理和写入中存在需要开发人员的自定义关注点
实现 OR .ItemReaderItemWriterTaskletItemReaderItemWriterItemReaderItemWriterItemProcessor
在本章中,我们提供了自定义业务逻辑中常见模式的一些示例。
这些示例主要包含侦听器接口。应该注意的是,如果合适,or 也可以实现侦听器接口。ItemReaderItemWriter
记录项处理和失败
一个常见的用例是需要对步骤中的错误进行特殊处理,逐项,
也许是登录到特殊通道或将记录插入数据库。一个
面向块(从 Step Factory Bean 创建)允许用户实现此用途
case 中带有一个简单的 for 错误 on 和一个 for
上的错误。以下代码片段说明了一个侦听器,该侦听器将两个 read
和写入失败:StepItemReadListenerreadItemWriteListenerwrite
public class ItemFailureLoggerListener extends ItemListenerSupport {
private static Log logger = LogFactory.getLog("item.error");
public void onReadError(Exception ex) {
logger.error("Encountered error on read", e);
}
public void onWriteError(Exception ex, List<? extends Object> items) {
logger.error("Encountered error on write", ex);
}
}
实施此侦听器后,必须将其注册到一个步骤中。
-
Java
-
XML
以下示例显示了如何向步骤 Java 注册侦听器:
@Bean
public Step simpleStep(JobRepository jobRepository) {
return new StepBuilder("simpleStep", jobRepository)
...
.listener(new ItemFailureLoggerListener())
.build();
}
以下示例演示如何在 XML 中注册一个步骤的侦听器:
<step id="simpleStep">
...
<listeners>
<listener>
<bean class="org.example...ItemFailureLoggerListener"/>
</listener>
</listeners>
</step>
如果你的侦听器在方法中执行任何操作,它必须位于
将要回滚的事务。如果您需要使用事务性
资源(例如数据库)中,请考虑添加声明性
transaction 传递给该方法(有关详细信息,请参见 Spring Core 参考指南),并为其
propagation 属性的值为 .onError()onError()REQUIRES_NEW |
如果你的侦听器在方法中执行任何操作,它必须位于
将要回滚的事务。如果您需要使用事务性
资源(例如数据库)中,请考虑添加声明性
transaction 传递给该方法(有关详细信息,请参见 Spring Core 参考指南),并为其
propagation 属性的值为 .onError()onError()REQUIRES_NEW |
出于业务原因手动停止作业
Spring Batch 通过接口提供了一种方法,但这是
真正供操作员使用,而不是供应用程序程序员使用。有时,它是
从企业内部停止任务执行更方便或更有意义
逻辑。stop()JobOperator
最简单的方法是抛出一个 (一个既没有重试的
无限期地跳过)。例如,可以使用自定义异常类型,如下所示
在以下示例中:RuntimeException
public class PoisonPillItemProcessor<T> implements ItemProcessor<T, T> {
@Override
public T process(T item) throws Exception {
if (isPoisonPill(item)) {
throw new PoisonPillException("Poison pill detected: " + item);
}
return item;
}
}
阻止步骤执行的另一种简单方法是从 返回 ,如以下示例所示:nullItemReader
public class EarlyCompletionItemReader implements ItemReader<T> {
private ItemReader<T> delegate;
public void setDelegate(ItemReader<T> delegate) { ... }
public T read() throws Exception {
T item = delegate.read();
if (isEndItem(item)) {
return null; // end the step here
}
return item;
}
}
前面的示例实际上依赖于存在默认实现的事实
的策略,当项目要
processed 是 。可以实施更复杂的完成策略,并且
注入 通过 .CompletionPolicynullStepSimpleStepFactoryBean
-
Java
-
XML
以下示例显示了如何将完成策略注入 Java 中的步骤:
@Bean
public Step simpleStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("simpleStep", jobRepository)
.<String, String>chunk(new SpecialCompletionPolicy(), transactionManager)
.reader(reader())
.writer(writer())
.build();
}
以下示例显示了如何将完成策略注入到 XML 中的步骤中:
<step id="simpleStep">
<tasklet>
<chunk reader="reader" writer="writer" commit-interval="10"
chunk-completion-policy="completionPolicy"/>
</tasklet>
</step>
<bean id="completionPolicy" class="org.example...SpecialCompletionPolicy"/>
另一种方法是在 中设置一个标志,该标志由框架中的实现在项目处理之间进行检查。实现这个
或者,我们需要访问当前的 ,这可以通过
实现 a 并将其注册到 .以下示例
显示设置标志的侦听器:StepExecutionStepStepExecutionStepListenerStep
public class CustomItemWriter extends ItemListenerSupport implements StepListener {
private StepExecution stepExecution;
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
public void afterRead(Object item) {
if (isPoisonPill(item)) {
stepExecution.setTerminateOnly();
}
}
}
设置标志后,默认行为是步骤引发 .此行为可以通过 .但是,唯一的选择是抛出或不抛出异常
所以这总是一个不正常的工作结局。JobInterruptedExceptionStepInterruptionPolicy
添加页脚记录
通常,在写入平面文件时,必须在
文件,在所有处理完成后。这可以使用 Spring Batch 提供的接口来实现。(及其对应的 ) 是 的可选属性,可以添加到项编写器中。FlatFileFooterCallbackFlatFileFooterCallbackFlatFileHeaderCallbackFlatFileItemWriter
-
Java
-
XML
下面的示例展示了如何在 Java 中使用 和 :FlatFileHeaderCallbackFlatFileFooterCallback
@Bean
public FlatFileItemWriter<String> itemWriter(Resource outputResource) {
return new FlatFileItemWriterBuilder<String>()
.name("itemWriter")
.resource(outputResource)
.lineAggregator(lineAggregator())
.headerCallback(headerCallback())
.footerCallback(footerCallback())
.build();
}
下面的示例演示如何在 XML 中使用 和 :FlatFileHeaderCallbackFlatFileFooterCallback
<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator" ref="lineAggregator"/>
<property name="headerCallback" ref="headerCallback" />
<property name="footerCallback" ref="footerCallback" />
</bean>
页脚回调接口只有一个方法,当页脚必须为 写入,如以下接口定义所示:
public interface FlatFileFooterCallback {
void writeFooter(Writer writer) throws IOException;
}
编写摘要页脚
涉及页脚记录的一个常见要求是在 output 进程,并将此信息附加到文件末尾。此页脚通常 用作文件的摘要或提供校验和。
例如,如果批处理作业正在将记录写入平面文件,并且存在
要求将所有 the 中的总金额放在页脚中,则
可以使用以下实现:TradeTradesItemWriter
public class TradeItemWriter implements ItemWriter<Trade>,
FlatFileFooterCallback {
private ItemWriter<Trade> delegate;
private BigDecimal totalAmount = BigDecimal.ZERO;
public void write(Chunk<? extends Trade> items) throws Exception {
BigDecimal chunkTotal = BigDecimal.ZERO;
for (Trade trade : items) {
chunkTotal = chunkTotal.add(trade.getAmount());
}
delegate.write(items);
// After successfully writing all items
totalAmount = totalAmount.add(chunkTotal);
}
public void writeFooter(Writer writer) throws IOException {
writer.write("Total Amount Processed: " + totalAmount);
}
public void setDelegate(ItemWriter delegate) {...}
}
这将存储一个值,该值会随着写入的每个项目而增加。处理完最后一个后,框架调用 ,这会将 放入文件中。请注意,方法
使用临时变量 ,该变量存储块中 amount 的总数。这样做是为了确保,如果方法中发生 skip,则 保持不变。只有在方法结束时,一旦我们保证不会引发异常,我们才会更新 .TradeItemWritertotalAmountamountTradeTradewriteFootertotalAmountwritechunkTotalTradewritetotalAmountwritetotalAmount
为了调用该方法,将 (which
implements ) 必须作为 .writeFooterTradeItemWriterFlatFileFooterCallbackFlatFileItemWriterfooterCallback
-
Java
-
XML
下面的示例展示了如何在 Java 中连接 :TradeItemWriter
@Bean
public TradeItemWriter tradeItemWriter() {
TradeItemWriter itemWriter = new TradeItemWriter();
itemWriter.setDelegate(flatFileItemWriter(null));
return itemWriter;
}
@Bean
public FlatFileItemWriter<String> flatFileItemWriter(Resource outputResource) {
return new FlatFileItemWriterBuilder<String>()
.name("itemWriter")
.resource(outputResource)
.lineAggregator(lineAggregator())
.footerCallback(tradeItemWriter())
.build();
}
下面的示例演示如何在 XML 中连接 :TradeItemWriter
<bean id="tradeItemWriter" class="..TradeItemWriter">
<property name="delegate" ref="flatFileItemWriter" />
</bean>
<bean id="flatFileItemWriter" class="org.spr...FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator" ref="lineAggregator"/>
<property name="footerCallback" ref="tradeItemWriter" />
</bean>
到目前为止,编写 the 的方式只有在
是不可重新启动的。这是因为该类是有状态的(因为它存储了 ),但 没有持久化到数据库中。因此,它
在重新启动时无法检索。为了使此类可重启,
该接口应与方法 和 一起实现,如以下示例所示:TradeItemWriterSteptotalAmounttotalAmountItemStreamopenupdate
public void open(ExecutionContext executionContext) {
if (executionContext.containsKey("total.amount") {
totalAmount = (BigDecimal) executionContext.get("total.amount");
}
}
public void update(ExecutionContext executionContext) {
executionContext.put("total.amount", totalAmount);
}
update 方法将 的最新版本存储到该对象保存到数据库之前。open 方法
从 中检索任何现有的 ,并将其用作
处理的起点,允许在重新启动时拾取
它在上次运行 时停止。totalAmountExecutionContexttotalAmountExecutionContextTradeItemWriterStep
驱动基于查询的 ItemReader
在关于 reader 和 writer 的章节中,使用 讨论了分页。许多数据库供应商(比如 DB2)都非常悲观 如果正在读取的表也需要由 在线应用程序的其他部分。此外,打开光标 大型数据集可能会导致某些供应商的数据库出现问题。因此,许多 项目更喜欢使用 'Driving Query' 方法来读取数据。这种方法有效 通过迭代 key,而不是需要返回的整个对象,作为 下图说明:
如您所见,上图中显示的示例使用与原来的 'FOO' 表相同的
在基于游标的示例中使用。但是,不是选择整行,而是只选择
在 SQL 语句中选择了 ID。因此,而不是返回对象
from ,则返回 an。然后,此数字可用于查询
'details' ,这是一个完整的对象,如下图所示:FOOreadIntegerFoo
应使用 An 来转换从 driving 查询中获取的 key
转换为一个完整的对象。现有的 DAO 可用于查询基于完整对象的
在键上。ItemProcessorFoo
多行记录
虽然平面文件通常每个记录都局限于单个 行,则文件可能具有跨多行且多个 格式。以下文件摘录显示了这种安排的示例:
HEA;0013100345;2007-02-15 NCU;Smith;Peter;;T;20014539;F BAD;;Oak Street 31/A;;Small Town;00235;IL;US FOT;2;2;267.34
以 'HEA' 开头的行和以 'FOT' 开头的行之间的所有内容都是 被视为 1 条记录。为了 正确处理这种情况:
-
必须读取 multi line record 作为一个组,以便可以将其传递给 intact。
ItemReaderItemWriter -
每种行类型可能需要以不同的方式进行标记。
因为一条记录跨越多行,而且我们可能不知道有多少行
有,必须小心始终阅读整个记录。为了
执行此操作时,应将自定义实现为 .ItemReaderItemReaderFlatFileItemReader
-
Java
-
XML
以下示例演示如何在 Java 中实现自定义:ItemReader
@Bean
public MultiLineTradeItemReader itemReader() {
MultiLineTradeItemReader itemReader = new MultiLineTradeItemReader();
itemReader.setDelegate(flatFileItemReader());
return itemReader;
}
@Bean
public FlatFileItemReader flatFileItemReader() {
FlatFileItemReader<Trade> reader = new FlatFileItemReaderBuilder<>()
.name("flatFileItemReader")
.resource(new ClassPathResource("data/iosample/input/multiLine.txt"))
.lineTokenizer(orderFileTokenizer())
.fieldSetMapper(orderFieldSetMapper())
.build();
return reader;
}
以下示例演示如何在 XML 中实现自定义:ItemReader
<bean id="itemReader" class="org.spr...MultiLineTradeItemReader">
<property name="delegate">
<bean class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="data/iosample/input/multiLine.txt" />
<property name="lineMapper">
<bean class="org.spr...DefaultLineMapper">
<property name="lineTokenizer" ref="orderFileTokenizer"/>
<property name="fieldSetMapper" ref="orderFieldSetMapper"/>
</bean>
</property>
</bean>
</property>
</bean>
确保每一行都正确地标记化,这对于
fixed-length input,则可以在
委托。请参阅 Reader 中的 FlatFileItemReader 和
作家章节了解更多详情。然后,委托读者使用 a 将 for each line 传递回换行。PatternMatchingCompositeLineTokenizerFlatFileItemReaderPassThroughFieldSetMapperFieldSetItemReader
-
Java
-
XML
以下示例说明如何确保每 X 行在 Java 中正确标记化:
@Bean
public PatternMatchingCompositeLineTokenizer orderFileTokenizer() {
PatternMatchingCompositeLineTokenizer tokenizer =
new PatternMatchingCompositeLineTokenizer();
Map<String, LineTokenizer> tokenizers = new HashMap<>(4);
tokenizers.put("HEA*", headerRecordTokenizer());
tokenizers.put("FOT*", footerRecordTokenizer());
tokenizers.put("NCU*", customerLineTokenizer());
tokenizers.put("BAD*", billingAddressLineTokenizer());
tokenizer.setTokenizers(tokenizers);
return tokenizer;
}
以下示例说明如何确保在 XML 中正确标记每一行:
<bean id="orderFileTokenizer" class="org.spr...PatternMatchingCompositeLineTokenizer">
<property name="tokenizers">
<map>
<entry key="HEA*" value-ref="headerRecordTokenizer" />
<entry key="FOT*" value-ref="footerRecordTokenizer" />
<entry key="NCU*" value-ref="customerLineTokenizer" />
<entry key="BAD*" value-ref="billingAddressLineTokenizer" />
</map>
</property>
</bean>
这个包装器必须能够识别记录的结尾,这样它才能持续地
调用,直到到达末尾。对于读取的每一行,
wrapper 构建要返回的项目。到达页脚后,项目可以
返回以传送到 和 ,如
以下示例:read()ItemProcessorItemWriter
private FlatFileItemReader<FieldSet> delegate;
public Trade read() throws Exception {
Trade t = null;
for (FieldSet line = null; (line = this.delegate.read()) != null;) {
String prefix = line.readString(0);
if (prefix.equals("HEA")) {
t = new Trade(); // Record must start with header
}
else if (prefix.equals("NCU")) {
Assert.notNull(t, "No header was found.");
t.setLast(line.readString(1));
t.setFirst(line.readString(2));
...
}
else if (prefix.equals("BAD")) {
Assert.notNull(t, "No header was found.");
t.setCity(line.readString(4));
t.setState(line.readString(6));
...
}
else if (prefix.equals("FOT")) {
return t; // Record must end with footer
}
}
Assert.isNull(t, "No 'END' was found.");
return null;
}
执行系统命令
许多批处理作业要求从批处理作业中调用外部命令。 这样的过程可以由调度程序单独启动,但 有关运行的常见元数据将丢失。此外,多步骤作业也将 还需要拆分为多个 Job。
由于这种需求非常普遍,因此 Spring Batch 提供了
调用 system 命令。Tasklet
-
Java
-
XML
以下示例演示如何在 Java 中调用外部命令:
@Bean
public SystemCommandTasklet tasklet() {
SystemCommandTasklet tasklet = new SystemCommandTasklet();
tasklet.setCommand("echo hello");
tasklet.setTimeout(5000);
return tasklet;
}
以下示例演示如何在 XML 中调用外部命令:
<bean class="org.springframework.batch.core.step.tasklet.SystemCommandTasklet">
<property name="command" value="echo hello" />
<!-- 5 second timeout for the command to complete -->
<property name="timeout" value="5000" />
</bean>
在未找到输入时处理步骤完成
在许多批处理方案中,在数据库或文件中找不到要处理的行不是
特殊。简单地认为 没有找到任何工作,并以 0 完成
items 已读取。Spring 中提供的所有实现都是开箱即用的
Batch 默认使用此方法。如果未写出任何内容,这可能会导致一些混淆
即使存在输入(如果文件命名错误或类似名称,通常会发生这种情况
问题出现)。因此,应检查元数据本身以确定如何
框架发现很多工作要处理。但是,如果未找到输入
被认为是例外的?在这种情况下,以编程方式检查元数据中是否有项目
处理并导致失败是最好的解决方案。因为这是一个常见的用例,
Spring Batch 提供了具有此功能的侦听器,如
的类定义 :StepItemReaderNoWorkFoundStepExecutionListener
public class NoWorkFoundStepExecutionListener extends StepExecutionListenerSupport {
public ExitStatus afterStep(StepExecution stepExecution) {
if (stepExecution.getReadCount() == 0) {
return ExitStatus.FAILED;
}
return null;
}
}
前面的代码在 'afterStep' 阶段检查 the 的属性,以确定是否未读取任何项目。如果那
,则返回退出代码,指示 应该失败。
否则,将返回 .StepExecutionListenerreadCountStepExecutionFAILEDStepnullStep
将数据传递给 Future Steps
将信息从一个步骤传递到另一个步骤通常很有用。这可以通过以下方式完成
这。问题是有两个:一个在级别,一个在级别。仅保留为
长为步长,而 仍贯穿整个 .上
另一方面,每次提交
块,而 则仅在每个 .ExecutionContextExecutionContextsStepJobStepExecutionContextJobExecutionContextJobStepExecutionContextStepJobExecutionContextStep
这种分离的结果是,所有数据都必须放在 执行 时。这样做可确保数据
在运行时妥善存放。如果数据存储到 ,
,则在执行过程中不会保留该 ID。如果失败,则该数据将丢失。StepExecutionContextStepStepJobExecutionContextStepStep
public class SavingItemWriter implements ItemWriter<Object> {
private StepExecution stepExecution;
public void write(Chunk<? extends Object> items) throws Exception {
// ...
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
stepContext.put("someKey", someObject);
}
@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
}
要使数据可用于 future ,必须在步骤完成后将其“提升”到 。Spring Batch 为此提供了。必须配置 Listener
与必须提升的数据相关的键。它可以
此外,还可以选择配置一个退出代码模式列表,该列表的升级
应该发生( 是默认值)。与所有侦听器一样,它必须注册
在 .StepsJobExecutionContextExecutionContextPromotionListenerExecutionContextCOMPLETEDStep
-
Java
-
XML
以下示例演示如何将步骤提升为 in Java:JobExecutionContext
@Bean
public Job job1(JobRepository jobRepository, Step step1, Step step2) {
return new JobBuilder("job1", jobRepository)
.start(step1)
.next(step2)
.build();
}
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(reader())
.writer(savingWriter())
.listener(promotionListener())
.build();
}
@Bean
public ExecutionContextPromotionListener promotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[] {"someKey"});
return listener;
}
以下示例演示如何将步骤提升为 in XML:JobExecutionContext
<job id="job1">
<step id="step1">
<tasklet>
<chunk reader="reader" writer="savingWriter" commit-interval="10"/>
</tasklet>
<listeners>
<listener ref="promotionListener"/>
</listeners>
</step>
<step id="step2">
...
</step>
</job>
<beans:bean id="promotionListener" class="org.spr....ExecutionContextPromotionListener">
<beans:property name="keys">
<list>
<value>someKey</value>
</list>
</beans:property>
</beans:bean>
最后,必须从 中检索保存的值,如下所示
在以下示例中:JobExecutionContext
public class RetrievingItemWriter implements ItemWriter<Object> {
private Object someObject;
public void write(Chunk<? extends Object> items) throws Exception {
// ...
}
@BeforeStep
public void retrieveInterstepData(StepExecution stepExecution) {
JobExecution jobExecution = stepExecution.getJobExecution();
ExecutionContext jobContext = jobExecution.getExecutionContext();
this.someObject = jobContext.get("someKey");
}
}