此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Batch 文档 5.2.2! |
通过消息启动批处理作业
使用核心 Spring Batch API 启动批处理作业时,您可以 基本上有两个选择:
-
在命令行中,使用
CommandLineJobRunner
-
以编程方式,使用
JobOperator.start()
或JobLauncher.run()
例如,您可能希望使用CommandLineJobRunner
调用批处理作业时
使用 shell 脚本。或者,您可以使用JobOperator
直接(例如,当使用
Spring Batch 作为 Web 应用程序的一部分)。但是,呢
更复杂的用例?也许您需要轮询远程 (S)FTP
服务器来检索批处理作业或应用程序的数据
必须同时支持多个不同的数据源。为
例如,您不仅可能从 Web 接收数据文件,还可以从
FTP 和其他来源。也许输入文件的额外转换是
在调用 Spring Batch 之前需要。
因此,执行批处理作业会更强大
通过使用 Spring Integration 及其众多适配器。例如
您可以使用文件入站通道适配器来
监控文件系统中的目录并启动批处理作业
输入文件到达后立即。此外,您可以创建 Spring
使用多个不同适配器的集成流轻松
从多个源摄取批处理作业的数据
同时仅使用配置。实现所有这些
使用 Spring Integration 的场景很容易,因为它允许
解耦的、事件驱动的执行JobLauncher
.
Spring Batch Integration 提供了JobLaunchingMessageHandler
你可以的类
用于启动批处理作业。的输入JobLaunchingMessageHandler
由
Spring Integration 消息,其有效负载类型为JobLaunchRequest
.此类是Job
即将推出,并在JobParameters
那是
启动批处理作业所必需的。
下图显示了典型的 Spring Integration 启动批处理作业所需的消息流。EIP(企业集成模式)网站提供了消息传递图标及其描述的完整概述。

将文件转换为 JobLaunchRequest
以下示例将文件转换为JobLaunchRequest
:
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;
import java.io.File;
public class FileMessageToJobRequest {
private Job job;
private String fileParameterName;
public void setFileParameterName(String fileParameterName) {
this.fileParameterName = fileParameterName;
}
public void setJob(Job job) {
this.job = job;
}
@Transformer
public JobLaunchRequest toRequest(Message<File> message) {
JobParametersBuilder jobParametersBuilder =
new JobParametersBuilder();
jobParametersBuilder.addString(fileParameterName,
message.getPayload().getAbsolutePath());
return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}
}
JobExecution 响应
执行批处理作业时,JobExecution
instance 返回。您可以使用此
实例来确定执行的状态。如果
一个JobExecution
能够创建
成功,无论是否
或不实际执行成功。
关于如何JobExecution
实例是否返回取决于提供的TaskExecutor
.如果synchronous
(单线程)TaskExecutor
实现,则JobExecution
仅返回响应after
作业完成。使用asynchronous
TaskExecutor
这JobExecution
instance 被返回
马上。然后,您可以将id
之JobExecution
实例
(与JobExecution.getJobId()
) 并查询JobRepository
用于作业的更新状态
使用JobExplorer
.更多
信息,请参阅查询存储库。
Spring Batch 集成配置
考虑某人需要创建文件的情况inbound-channel-adapter
听
对于提供目录中的 CSV 文件,请将它们移交给转换器
(FileMessageToJobRequest
),通过作业启动网关启动作业,以及
记录JobExecution
使用logging-channel-adapter
.
-
Java
-
XML
以下示例显示了如何在 Java 中配置该常见情况:
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
fileMessageToJobRequest.setFileParameterName("input.file.name");
fileMessageToJobRequest.setJob(personJob());
return fileMessageToJobRequest;
}
@Bean
public JobLaunchingGateway jobLaunchingGateway() {
TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SyncTaskExecutor());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);
return jobLaunchingGateway;
}
@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
return IntegrationFlow.from(Files.inboundAdapter(new File("/tmp/myfiles")).
filter(new SimplePatternFileListFilter("*.csv")),
c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
transform(fileMessageToJobRequest()).
handle(jobLaunchingGateway).
log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
get();
}
以下示例显示了如何在 XML 中配置该常见情况:
<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>
<int-file:inbound-channel-adapter id="filePoller"
channel="inboundFileChannel"
directory="file:/tmp/myfiles/"
filename-pattern="*.csv">
<int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>
<int:transformer input-channel="inboundFileChannel"
output-channel="outboundJobRequestChannel">
<bean class="io.spring.sbi.FileMessageToJobRequest">
<property name="job" ref="personJob"/>
<property name="fileParameterName" value="input.file.name"/>
</bean>
</int:transformer>
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
reply-channel="jobLaunchReplyChannel"/>
<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>
示例 ItemReader 配置
现在我们正在轮询文件并启动作业,我们需要配置我们的 Spring
批ItemReader
(例如)使用在作业定义的位置找到的文件
名为“input.file.name”的参数,如以下 Bean 配置所示:
-
Java
-
XML
以下 Java 示例显示了必要的 bean 配置:
@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
...
FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
flatFileItemReader.setResource(new FileSystemResource(resource));
...
return flatFileItemReader;
}
以下 XML 示例显示了必要的 bean 配置:
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader"
scope="step">
<property name="resource" value="file://#{jobParameters['input.file.name']}"/>
...
</bean>
前面示例中的主要兴趣点是注入#{jobParameters['input.file.name']}
作为 Resource 属性值,并将ItemReader
豆
具有步骤范围。将 Bean 设置为具有步骤作用域可利用
后期绑定支持,允许访问jobParameters
变量。