Batch 的域语言
对于任何有经验的批处理架构师来说,批处理的总体概念
Spring Batch 应该是熟悉和舒适的。有 “Jobs” 和 “Steps” 以及
开发人员提供的名为ItemReader和ItemWriter.然而
由于 Spring 模式、作、模板、回调和惯用语,因此有
以下机会:
-
对明确关注点分离的依从性显著提高。
-
清晰描述的架构层和作为接口提供的服务。
-
简单的默认实现,允许快速采用和易于使用 开箱即用。
-
显著增强了可扩展性。
下图是批处理参考体系结构的简化版本,该体系结构 已经使用了几十年。它概述了构成 batch processing 的 domain language 进行批处理。此体系结构框架是一个蓝图,它具有以下 已通过最近几代的数十年实施得到验证 平台(大型机上的 COBOL、Unix 上的 C,现在的 Java 任何地方)。JCL 和 COBOL 开发人员 可能和 C、C# 和 Java 开发人员一样熟悉这些概念。Spring Batch 提供了 layers、components 和 technical 的物理实现 服务常见于健壮、可维护的系统,用于解决 使用基础设施和扩展创建从简单到复杂的批处理应用程序 以满足非常复杂的加工需求。
上图突出显示了构成
Spring Batch 中。一个Job有一对多步骤,每个步骤都只有一个ItemReader,
一ItemProcessor和一个ItemWriter.需要启动一个作业(使用JobLauncher),并且需要将有关当前正在运行的进程的元数据存储在JobRepository).
工作
本节介绍与批处理作业概念相关的构造型。一个Job是一个
封装整个批处理过程的实体。与其他 Spring 一样
projects 中,一个Job与 XML 配置文件或基于 Java 的
配置。此配置可称为 “job configuration”。然而Job只是整个层次结构的顶部,如下图所示:
在 Spring Batch 中,Job只是一个容器Step实例。它结合了多个
在逻辑上属于流中并允许配置属性的步骤
global 到所有步骤,例如可重启性。作业配置包含:
-
作业的名称。
-
的定义和排序
Step实例。 -
作业是否可重新启动。
-
Java
-
XML
对于使用 Java 配置的用户,Spring Batch 提供了
这Job接口以SimpleJob类,这会创建一些标准的
功能Job.当使用基于 Java 的配置时,一组
builders 可用于实例化Job,如下所示
示例显示:
@Bean
public Job footballJob(JobRepository jobRepository) {
return new JobBuilder("footballJob", jobRepository)
.start(playerLoad())
.next(gameLoad())
.next(playerSummarization())
.build();
}
对于使用 XML 配置的用户, Spring Batch 提供了Job接口以SimpleJob类,这会创建一些标准的
功能Job.但是,批处理命名空间抽象化了
直接实例化它。相反,您可以使用<job>元素中,作为
以下示例显示:
<job id="footballJob">
<step id="playerload" next="gameLoad"/>
<step id="gameLoad" next="playerSummarization"/>
<step id="playerSummarization"/>
</job>
JobInstance 实例
一个JobInstance是指逻辑作业运行的概念。考虑一个批处理作业,该
应该在一天结束时运行一次,例如EndOfDay Job从前面
图。有一个EndOfDayjob 的Job必须是
单独跟踪。对于此 job,有一个逻辑JobInstance每天。
例如,有 1 月 1 日运行、1 月 2 日运行,依此类推。如果 1 月 1 日
run 第一次失败,第二天再次运行,它仍然是 1 月 1 日的运行。
(通常,这也与它正在处理的数据相对应,即 1 月
第 1 次运行处理 1 月 1 日的数据)。因此,每个JobInstance可以有多个
执行次数 (JobExecution将在本章后面更详细地讨论),并且只有
一JobInstance(对应于特定的Job并识别JobParameters) 可以
在给定时间运行。
的定义JobInstance与要加载的数据完全无关。
这完全取决于ItemReader实现来确定数据的加载方式。为
示例中,在EndOfDay场景中,数据上可能有一列指示effective date或schedule date数据所属的。所以,1 月 1 日运行
将仅加载第 1 次运行中的数据,而 1 月 2 日运行将仅使用
2nd. 因为这个决定可能是一个商业决策,所以它留给了ItemReader来决定。但是,使用相同的JobInstance确定是否
“状态”(即ExecutionContext,本章稍后将对此进行讨论)
使用以前的执行。使用新的JobInstance表示“从
beginning“,而使用现有实例通常意味着”从您离开的地方开始”
关闭”。
JobParameters (作业参数)
经过讨论JobInstance以及它与Job,自然而然地要问的问题
is: “How is one”JobInstance与另一个区别开来?答案是:JobParameters.一个JobParametersobject 包含一组用于启动批处理的参数
工作。它们可用于识别,甚至在运行期间用作参考数据,因为
下图显示:
在前面的示例中,有两个实例,一个用于 1 月 1 日,另一个用于
对于 1 月 2 日,实际上只有一个Job,但它有两个JobParameter对象:
一个以 Job 参数 01-01-2017 启动,另一个以
参数为 01-02-2017。因此,合约可以定义为:JobInstance = Job+ 识别JobParameters.这使开发人员能够有效地控制JobInstance定义,因为它们控制传入的参数。
并非所有作业参数都需要用于识别JobInstance.默认情况下,它们会这样做。但是,框架也允许提交
的Job的参数对JobInstance. |
JobExecution (任务执行)
一个JobExecution是指单次尝试运行 Job 的技术概念。一
执行可能以 failure 或 success 结束,但JobInstance对应于给定的
除非执行成功完成,否则不会认为执行已完成。
使用EndOfDay Job前面作为示例,请考虑JobInstance为
2017 年 1 月 1 日,第一次运行时失败。如果使用相同的
将作业参数标识为首次运行 (01-01-2017),则新增JobExecution是
创建。然而,仍然只有一个JobInstance.
一个Job定义作业是什么以及如何执行,而JobInstance是一个
Purely Organizational 对象将执行分组在一起,主要是为了启用正确的
restart semantics 的 Semantics 中。一个JobExecution,但是,它是
实际发生在运行期间,并且包含更多必须控制的属性
和 persisted,如下表所示:
财产 |
定义 |
|
一个 |
|
一个 |
|
一个 |
|
这 |
|
一个 |
|
一个 |
|
包含需要在 执行。 |
|
在执行 |
这些属性很重要,因为它们是持久的,可以完全用于
确定执行的状态。例如,如果EndOfDay01-01 的 Job 是
在晚上 9:00 执行,在 9:30 失败,则在批处理中创建以下条目
元数据表:
JOB_INST_ID |
JOB_NAME |
1 |
结束工作 |
JOB_EXECUTION_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
识别 |
1 |
日期 |
附表。日期 |
2017-01-01 |
真 |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
地位 |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
失败 |
| 为了清楚起见,可能已缩写或删除列名,并且 格式。 |
现在作业已失败,假设问题花了一整夜才出现
确定,因此 “Batch 窗口” 现在已关闭。进一步假设窗口
从晚上 9:00 开始,作业在 01-01 中再次启动,从中断处开始,然后
在 9:30 成功完成。因为现在是第二天,所以 01-02 作业必须为
运行,之后在 9:31 开始,并在正常时间完成
时间 10:30。没有要求一个JobInstance被踢出
另一个,除非两个 Job 有可能尝试访问相同的数据,
导致数据库级别的锁定问题。这完全取决于调度程序
确定Job应该运行。因为它们是分开的JobInstancesSpring
Batch 不会尝试阻止它们并发运行。(尝试运行
相同JobInstance而另一个已在运行会导致JobExecutionAlreadyRunningException被抛出)。现在应该有一个额外的条目
在这两个JobInstance和JobParameters表中的JobExecution表,如下表所示:
JOB_INST_ID |
JOB_NAME |
1 |
结束工作 |
2 |
结束工作 |
JOB_EXECUTION_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
识别 |
1 |
日期 |
附表。日期 |
2017-01-01 00:00:00 |
真 |
2 |
日期 |
附表。日期 |
2017-01-01 00:00:00 |
真 |
3 |
日期 |
附表。日期 |
2017-01-02 00:00:00 |
真 |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
地位 |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
失败 |
2 |
1 |
2017-01-02 21:00 |
2017-01-02 21:30 |
完成 |
3 |
2 |
2017-01-02 21:31 |
2017-01-02 22:29 |
完成 |
| 为了清楚起见,可能已缩写或删除列名,并且 格式。 |
步
一个Step是一个域对象,它封装了批处理的独立顺序阶段
工作。因此,每个Job完全由一个或多个步骤组成。一个Step包含
定义和控制实际批处理所需的所有信息。这
必然是一个模糊的描述,因为任何给定的Step位于
开发者编写Job.一个Step可以像
开发者的愿望。一个简单的Step可能会将数据从文件加载到数据库中,
需要很少或不需要代码(取决于使用的实现)。更复杂的Step可能具有作为处理的一部分应用的复杂业务规则。如
替换为Job一个Step有一个单独的StepExecution与唯一的JobExecution,如下图所示:
StepExecution 执行
一个StepExecution表示执行Step.新的StepExecution每次创建Step运行,类似于JobExecution.但是,如果步骤失败
执行,因为该步骤在失败之前,则不会保留任何执行。一个StepExecution仅在其Step实际上已启动。
Step执行由StepExecution类。每次执行
包含对其相应步骤的引用,并且JobExecution和交易相关
数据,例如提交和回滚计数以及开始和结束时间。此外,每个步骤
execution 包含一个ExecutionContext,其中包含开发人员需要
在批处理运行中保留,例如所需的统计信息或状态信息
重新启动。下表列出了StepExecution:
财产 |
定义 |
|
一个 |
|
一个 |
|
一个 |
|
这 |
|
包含需要在 执行。 |
|
已成功读取的项目数。 |
|
已成功写入的项目数。 |
|
已为此执行提交的事务数。 |
|
业务事务由 |
|
次数 |
|
次数 |
|
已被 |
|
次数 |
ExecutionContext
一ExecutionContext表示持久化的键/值对的集合,
由框架控制,为开发人员提供存储持久化
state 的 state 的StepExecutionobject 或JobExecution对象。(对于那些
熟悉的 Quartz,它与JobDataMap.)最好的使用示例是
便于重启。以平面文件输入为例,同时处理单个
行,框架会定期持久化ExecutionContext在提交点。行为
所以让我们ItemReader存储其状态,以防在运行期间发生致命错误
或者即使停电。所需要做的就是输入当前的行数
读入上下文,如下例所示,框架会执行
休息:
executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition());
使用EndOfDay示例中的Jobstereotypes 部分为例,假设有
是一步,loadData,将文件加载到数据库中。在第一次运行失败后,
元数据表将类似于以下示例:
JOB_INST_ID |
JOB_NAME |
1 |
结束工作 |
JOB_INST_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
1 |
日期 |
附表。日期 |
2017-01-01 |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
地位 |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
失败 |
STEP_EXEC_ID |
JOB_EXEC_ID |
STEP_NAME |
START_TIME |
END_TIME |
地位 |
1 |
1 |
loadData (加载数据) |
2017-01-01 21:00 |
2017-01-01 21:30 |
失败 |
STEP_EXEC_ID |
SHORT_CONTEXT |
1 |
{件数=40321} |
在上述情况下,Step运行 30 分钟并处理了 40,321 个“碎片”,其中
将表示文件中的行。此值在每次
commit 的 Commit 中,可以包含与ExecutionContext.在提交之前收到通知需要满足以下条件之一StepListenerimplementations(或ItemStream),将更详细地讨论
在本指南的后面部分。与前面的示例一样,假设Job是
第二天重新启动。重新启动时,ExecutionContext之
最后一次运行将从数据库中重构。当ItemReader打开时,它可以
检查它在上下文中是否有任何 stored 状态,并从那里初始化自身,
如下例所示:
if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
log.debug("Initializing for restart. Restart data is: " + executionContext);
long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));
LineReader reader = getReader();
Object record = "";
while (reader.getPosition() < lineCount && record != null) {
record = readLine();
}
}
在这种情况下,上述代码运行后,当前行为 40,322,让Step从上次中断的地方重新开始。您还可以使用ExecutionContext为
需要保留的有关运行本身的统计信息。例如,如果平面文件
包含跨多行存在的处理订单,则可能需要
store 已处理的订单数量(这与
行读取),以便可以在Step替换为总数
正文中处理的订单数。框架为开发人员处理存储 this作,
以正确地将其范围限定为单个JobInstance.这可能非常困难
了解是否存在ExecutionContext应该使用或不使用。例如,使用EndOfDay上面的示例,当 01-01 运行第二次再次启动时,
框架认识到它是相同的JobInstance和单个Step基础
拉取ExecutionContext并将其(作为StepExecution) 到Step本身。相反,对于 01-02 运行,框架
识别出它是一个不同的实例,因此必须将空上下文传递给Step.框架为
developer,以确保在正确的时间向他们提供状态。这也很重要
请注意,恰好有一个ExecutionContext存在每StepExecution在任何给定时间。
的客户端ExecutionContext应该小心,因为这会创建一个共享的
键空间。因此,在输入值时应小心,以确保没有数据
覆盖。但是,Step在上下文中绝对不存储任何数据,因此没有
对框架产生不利影响的方式。
请注意,至少有一个ExecutionContext每JobExecution并且每个StepExecution.例如,请考虑以下
代码片段:
ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
//ecStep does not equal ecJob
正如评论中所指出的,ecStep不等于ecJob.他们是两个不同的ExecutionContexts.范围限定为Step保存在Step,而作用域为 Job 的 Job 则保存在每个Step执行。
在ExecutionContext,则所有非临时条目都必须为Serializable.
执行上下文的正确序列化为步骤和作业的重启功能奠定了基础。
如果您使用的键或值本身不可序列化,则需要
采用定制的序列化方法。无法序列化执行上下文
可能会危及状态持久化过程,使失败的作业无法正确恢复。 |
JobRepository 仓库
JobRepository是前面提到的所有 stereotype 的持久化机制。
它为JobLauncher,Job和Step实现。当Job首次启动时,一个JobExecution是从存储库获取的。此外,在
执行过程,StepExecution和JobExecution实现是持久化的
通过将它们传递到存储库。
-
Java
-
XML
使用 Java 配置时,@EnableBatchProcessing注解提供了一个JobRepository作为自动配置的组件之一。
Spring Batch XML 命名空间支持配置JobRepository实例
使用<job-repository>标签,如下例所示:
<job-repository id="jobRepository"/>
作业Starters
JobLauncher表示用于启动Job使用一组给定的JobParameters,如下例所示:
public interface JobLauncher {
public JobExecution run(Job job, JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, JobRestartException,
JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}
预计 implementations 会获得有效的JobExecution从JobRepository并执行Job.
ItemReader
ItemReader是一个抽象,表示Step一
item 的 intent 值。当ItemReader已经用尽了它所能提供的物品,它
通过返回null.您可以找到有关ItemReaderinterface 及其
Readers And Writers 中的各种实现。
ItemWriter
ItemWriter是一个抽象,表示Step、一个批处理或块
的项目数。通常,ItemWriter不知道它应该输入的内容
receive next,并且只知道在其当前调用中传递的 Item。您可以找到更多
有关ItemWriter接口及其在 Readers And Writers 中的各种实现。
ItemProcessor
ItemProcessor是表示项的业务处理的抽象。
虽然ItemReader读取一个项目,而ItemWriter写入一项,ItemProcessor提供用于转换或应用其他业务处理的访问点。
如果在处理 Item 时确定 Item 无效,则返回null表示不应写出该项。您可以找到有关ItemProcessorReaders And Writers 中的界面。
Batch 命名空间
前面列出的许多域概念都需要在 Spring 中配置ApplicationContext.虽然上面有一些接口的实现,但您可以
在标准 bean 定义中使用,为了便于
配置,如下例所示:
<beans:beans xmlns="http://www.springframework.org/schema/batch"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/batch
https://www.springframework.org/schema/batch/spring-batch.xsd">
<job id="ioSampleJob">
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
</tasklet>
</step>
</job>
</beans:beans>