環境等は最後のpom.xmlを参照してください。
Main
package com.example.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Spring Boot entry point of the batch application.
 */
@SpringBootApplication
public class BatchApplication {

    /**
     * Starts the Spring application using this class as the primary source.
     *
     * @param args command-line arguments handed over to Spring Boot
     */
    public static void main(String[] args) {
        new SpringApplication(BatchApplication.class).run(args);
    }
}
Config
package com.example.demo;
import javax.sql.DataSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
/**
 * Infrastructure configuration: provides the embedded database bean.
 */
@Configuration
public class BatchConfig {

    /** Classpath location of the Spring Batch schema script for H2. */
    private static final String BATCH_SCHEMA_SCRIPT =
            "classpath:/org/springframework/batch/core/schema-h2.sql";

    /**
     * Builds an embedded H2 database and runs the Spring Batch H2 schema script against it.
     *
     * @return the embedded data source
     */
    @Bean
    public DataSource jobRepositoryEmbeddedDataSource() {
        EmbeddedDatabaseBuilder databaseBuilder = new EmbeddedDatabaseBuilder();
        databaseBuilder.setType(EmbeddedDatabaseType.H2);
        databaseBuilder.addScript(BATCH_SCHEMA_SCRIPT);
        return databaseBuilder.build();
    }
}
Job
package com.example.demo.job;
import java.util.List;
import java.util.ArrayList;
import com.example.demo.job.decider.VerifyMessageDecider;
import com.example.demo.job.listener.JobListener;
import com.example.demo.job.tasklet.*;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
/**
 * Spring Batch job definition: receive a message, verify it, run an exclusive check,
 * delete the message, transform the data, reply, and then go back to receiving.
 */
@Configuration
@EnableBatchProcessing
public class TransformDataJob {
    @Autowired private ReceiveMessageTasklet receiveMessage;
    @Autowired private ExclusiveCheckTasklet exclusiveCheck;
    @Autowired private DeleteMessageTasklet deleteMessage;
    @Autowired private TransformDataTasklet transformData;
    @Autowired private ReplyResultTasklet replyResult;
    @Autowired private VerifyMessageDecider decider;
    @Autowired private JobBuilderFactory jobBuilderFactory;
    @Autowired private StepBuilderFactory stepBuilderFactory;

    /** Step that runs {@link ReceiveMessageTasklet}. */
    @Bean
    public Step receiveStep() {
        return stepBuilderFactory.get("receiveStep").tasklet(receiveMessage).build();
    }

    /** Step that runs {@link ExclusiveCheckTasklet}. */
    @Bean
    public Step checkStep() {
        return stepBuilderFactory.get("checkStep").tasklet(exclusiveCheck).build();
    }

    /** Step that runs {@link DeleteMessageTasklet}. */
    @Bean
    public Step cleanupStep() {
        return stepBuilderFactory.get("cleanupStep").tasklet(deleteMessage).build();
    }

    /** Step that runs {@link TransformDataTasklet}. */
    @Bean
    public Step transformStep() {
        return stepBuilderFactory.get("transformStep").tasklet(transformData).build();
    }

    /** Step that runs {@link ReplyResultTasklet}. */
    @Bean
    public Step replyStep() {
        return stepBuilderFactory.get("replyStep").tasklet(replyResult).build();
    }

    /**
     * Builds the looping processing flow.
     *
     * <p>Every conditional transition is introduced with its own {@code from(...)}.
     * In {@link FlowBuilder}, an {@code on(...)} chained directly after {@code to(x)}
     * is registered on {@code x}, not on the state named in the preceding
     * {@code from(...)}; repeating {@code from(...)} keeps each transition attached
     * to the step it is meant for.
     *
     * @return the flow used by {@link #job()}
     * @throws Exception declared for compatibility with the original signature
     */
    @Bean
    public Flow flow() throws Exception {
        return new FlowBuilder<Flow>("flow")
            // receiveStep: a message was received -> let the decider inspect it
            .from(receiveStep())
                .on(ExitStatus.COMPLETED.getExitCode()).to(decider)
            // receiveStep: no message -> receive again
            .from(receiveStep())
                .on(ExitStatus.FAILED.getExitCode()).to(receiveStep())
            // decider: stop was requested -> end the flow
            .from(decider)
                .on("STOP").end()
            // decider: no stop request -> run the exclusive check
            .from(decider)
                .on("CONTINUE").to(checkStep())
            // checkStep: exclusive check OK -> delete the message
            .from(checkStep())
                .on(ExitStatus.COMPLETED.getExitCode()).to(cleanupStep())
            // checkStep: exclusive check NG -> receive the next message
            .from(checkStep())
                .on(ExitStatus.FAILED.getExitCode()).to(receiveStep())
            // after cleanup: transform, reply, then loop back to receiving
            .from(cleanupStep())
                .next(transformStep())
                .next(replyStep())
                .next(receiveStep())
            .build();
    }

    /**
     * Split flow that runs {@link #flow()} twice on a {@link SimpleAsyncTaskExecutor}.
     *
     * <p>Intended for parallel processing, but the shared data gets mixed up between
     * the parallel flows, so it cannot be used as-is. It is not referenced by
     * {@link #job()}.
     *
     * @return the split flow
     * @throws Exception if {@link #flow()} fails
     */
    @Bean
    public Flow multiFlow() throws Exception {
        List<Flow> flowList = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            flowList.add(flow());
        }
        return new FlowBuilder<Flow>("multiFlow")
            .split(new SimpleAsyncTaskExecutor())
            .add(flowList.toArray(new Flow[0]))
            .build();
    }

    /**
     * Defines the job named {@code "job"}: run-id incrementer, job listener, and the
     * single-threaded {@link #flow()}.
     *
     * @return the job
     * @throws Exception if {@link #flow()} fails
     */
    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("job")
            .incrementer(new RunIdIncrementer())
            .listener(listener())
            .start(flow())
            .end()
            .build();
    }

    /** Listener registered on the job; see {@link JobListener}. */
    @Bean
    public JobExecutionListener listener() {
        return new JobListener();
    }
}
Listener
package com.example.demo.job.listener;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
/**
 * Job listener that prints a marker line to standard output when a job starts and ends.
 */
public class JobListener extends JobExecutionListenerSupport {

    /** Delegates to the superclass, then prints {@code "Job Start"}. */
    @Override
    public void beforeJob(JobExecution jobExecution) {
        super.beforeJob(jobExecution);
        report("Job Start");
    }

    /** Delegates to the superclass, then prints {@code "Job End"}. */
    @Override
    public void afterJob(JobExecution jobExecution) {
        super.afterJob(jobExecution);
        report("Job End");
    }

    /** Writes one marker line to standard output. */
    private static void report(String marker) {
        System.out.println(marker);
    }
}
Tasklet
ReceiveMessageTasklet
package com.example.demo.job.tasklet;
import java.util.List;
import com.amazonaws.services.sqs.model.Message;
import com.example.demo.aws.client.AmazonSQSClient;
import com.example.demo.job.share.SharedDataContainer;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Tasklet that asks the SQS client for messages and stores the first one in the
 * shared container.
 */
@Component
public class ReceiveMessageTasklet implements Tasklet {
    @Autowired AmazonSQSClient sqs;
    @Autowired SharedDataContainer share;

    /**
     * Receives messages and records the outcome as the step exit status:
     * {@code COMPLETED} when at least one message arrived (the first is stored in
     * {@code share.message}), {@code FAILED} when none arrived.
     *
     * @return always {@link RepeatStatus#FINISHED}
     */
    @Override
    public RepeatStatus execute(final StepContribution contribution, final ChunkContext chunkContext) throws Exception {
        System.out.println("ReceiveMessageTasklet");
        final List<Message> received = sqs.receive();
        if (received.isEmpty()) {
            // Nothing arrived: signal it through the exit status.
            contribution.setExitStatus(ExitStatus.FAILED);
            return RepeatStatus.FINISHED;
        }
        share.message = received.get(0);
        contribution.setExitStatus(ExitStatus.COMPLETED);
        return RepeatStatus.FINISHED;
    }
}
ExclusiveCheckTasklet
package com.example.demo.job.tasklet;
import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException;
import com.example.demo.aws.client.AmazonDynamoDBClient;
import com.example.demo.job.share.SharedDataContainer;
import com.example.demo.model.CommunicationData;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Tasklet that claims the record referenced by the current message by moving its
 * status from {@code "request"} to {@code "accept"}.
 */
@Component
public class ExclusiveCheckTasklet implements Tasklet {
    @Autowired AmazonDynamoDBClient ddb;
    @Autowired SharedDataContainer share;

    /**
     * Loads the {@link CommunicationData} whose key is the message body and tries to
     * mark it as accepted.
     *
     * <p>The exit status is {@code FAILED} when the record does not exist, when its
     * status is not {@code "request"}, or when the save raises
     * {@link ConditionalCheckFailedException}. Otherwise the saved record is stored
     * in {@code share.commData} and the exit status is {@code COMPLETED}.
     *
     * @return always {@link RepeatStatus#FINISHED}
     */
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        System.out.println("ExclusiveCheckTasklet");
        CommunicationData commData = ddb.mapper.load(CommunicationData.class, share.message.getBody());
        // load() yields null when no item matches the key; treat that as a failed
        // check instead of letting a NullPointerException escape.
        if (commData == null || !"request".equals(commData.getStatus())) {
            contribution.setExitStatus(ExitStatus.FAILED);
            return RepeatStatus.FINISHED;
        }
        commData.setStatus("accept");
        try {
            ddb.mapper.save(commData);
            share.commData = commData;
            contribution.setExitStatus(ExitStatus.COMPLETED);
        } catch (ConditionalCheckFailedException e) {
            // The conditional write was rejected: report the check as failed.
            contribution.setExitStatus(ExitStatus.FAILED);
        }
        return RepeatStatus.FINISHED;
    }
}
DeleteMessageTasklet
package com.example.demo.job.tasklet;
import com.example.demo.aws.client.AmazonSQSClient;
import com.example.demo.job.share.SharedDataContainer;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Tasklet that deletes the message currently held in the shared container.
 */
@Component
public class DeleteMessageTasklet implements Tasklet {
    @Autowired AmazonSQSClient sqs;
    @Autowired SharedDataContainer share;

    /**
     * Deletes {@code share.message} through the SQS client.
     *
     * @return always {@link RepeatStatus#FINISHED}
     */
    @Override
    public RepeatStatus execute(final StepContribution contribution, final ChunkContext chunkContext) throws Exception {
        System.out.println("DeleteMessageTasklet");
        removeCurrentMessage();
        return RepeatStatus.FINISHED;
    }

    /** Hands the shared message to the SQS client for deletion. */
    private void removeCurrentMessage() {
        sqs.delete(share.message);
    }
}
TransformDataTasklet
package com.example.demo.job.tasklet;
import com.example.demo.job.share.SharedDataContainer;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Tasklet that derives the reply data from the request data of the shared record.
 */
@Component
public class TransformDataTasklet implements Tasklet {
    @Autowired SharedDataContainer share;

    /**
     * Reads the request data from {@code share.commData}, processes it, and stores
     * the result as the reply data on the same record.
     *
     * @return always {@link RepeatStatus#FINISHED}
     */
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        System.out.println("TransformDataTasklet");
        share.commData.setReplyData(process(share.commData.getRequestData()));
        return RepeatStatus.FINISHED;
    }

    /** Processing step; currently returns the request data unchanged. */
    private static String process(String requestData) {
        return requestData;
    }
}
ReplyResultTasklet
package com.example.demo.job.tasklet;
import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException;
import com.example.demo.aws.client.AmazonDynamoDBClient;
import com.example.demo.job.share.SharedDataContainer;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Tasklet that marks the shared record as replied and saves it.
 */
@Component
public class ReplyResultTasklet implements Tasklet {
    @Autowired AmazonDynamoDBClient ddb;
    @Autowired SharedDataContainer share;

    /**
     * Sets the status of {@code share.commData} to {@code "reply"} and saves it.
     * The exit status is {@code COMPLETED} when the save succeeds and {@code FAILED}
     * when it raises {@link ConditionalCheckFailedException}.
     *
     * @return always {@link RepeatStatus#FINISHED}
     */
    @Override
    public RepeatStatus execute(final StepContribution contribution, final ChunkContext chunkContext) throws Exception {
        System.out.println("ReplyResultTasklet");
        share.commData.setStatus("reply");
        ExitStatus outcome;
        try {
            ddb.mapper.save(share.commData);
            outcome = ExitStatus.COMPLETED;
        } catch (final ConditionalCheckFailedException e) {
            // The conditional write was rejected.
            outcome = ExitStatus.FAILED;
        }
        contribution.setExitStatus(outcome);
        return RepeatStatus.FINISHED;
    }
}
Decider
package com.example.demo.job.decider;
import com.example.demo.job.share.SharedDataContainer;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Decider that checks whether the current message is a stop instruction.
 */
@Component
public class VerifyMessageDecider implements JobExecutionDecider {
    @Autowired SharedDataContainer share;

    /**
     * Returns status {@code "STOP"} when the body of {@code share.message} equals
     * {@code "stop"}, and {@code "CONTINUE"} otherwise.
     */
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        final boolean stopRequested = "stop".equals(share.message.getBody());
        final String decision = stopRequested ? "STOP" : "CONTINUE";
        return new FlowExecutionStatus(decision);
    }
}
ShareData
package com.example.demo.job.share;
import com.amazonaws.services.sqs.model.Message;
import com.example.demo.model.CommunicationData;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.stereotype.Component;
/**
 * Holder for the data handed from one tasklet to the next: the received message and
 * the record being processed.
 *
 * <p>NOTE(review): {@code @JobScope} beans are normally injected as class-based scoped
 * proxies, and direct field access on such a proxy is not delegated to the job-scoped
 * target. The public fields below may therefore live on the shared proxy rather than
 * per job execution. Confirm, and consider accessor methods.
 */
@Component
@JobScope
public class SharedDataContainer {

    /** Message most recently stored by the receiving tasklet. */
    public Message message;

    /** Record stored by the exclusive-check tasklet once it has been accepted. */
    public CommunicationData commData;
}
Model
package com.example.demo.model;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBAttribute;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBHashKey;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBTable;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBVersionAttribute;
/**
 * DynamoDB mapper model for the {@code CommunicationData} table.
 */
@DynamoDBTable(tableName = "CommunicationData")
public class CommunicationData {

    private String id;
    private String status;
    private String requestData;
    private String replyData;
    private Long version;

    /** Hash key, stored as attribute {@code Id}. */
    @DynamoDBHashKey(attributeName = "Id")
    public String getId() {
        return this.id;
    }

    public void setId(final String id) {
        this.id = id;
    }

    /** Processing status, stored as attribute {@code Status}. */
    @DynamoDBAttribute(attributeName = "Status")
    public String getStatus() {
        return this.status;
    }

    public void setStatus(final String status) {
        this.status = status;
    }

    /** Request payload, stored as attribute {@code RequestData}. */
    @DynamoDBAttribute(attributeName = "RequestData")
    public String getRequestData() {
        return this.requestData;
    }

    public void setRequestData(final String requestData) {
        this.requestData = requestData;
    }

    /** Reply payload, stored as attribute {@code ReplyData}. */
    @DynamoDBAttribute(attributeName = "ReplyData")
    public String getReplyData() {
        return this.replyData;
    }

    public void setReplyData(final String replyData) {
        this.replyData = replyData;
    }

    /** Version attribute, stored as attribute {@code Version}. */
    @DynamoDBVersionAttribute(attributeName = "Version")
    public Long getVersion() {
        return this.version;
    }

    public void setVersion(Long version) {
        this.version = version;
    }
}
AWS Client
DynamoDB
package com.example.demo.aws.client;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapper;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapperConfig;
import org.springframework.stereotype.Component;
/**
 * Spring component exposing a {@link DynamoDBMapper} for the AP_NORTHEAST_1 region.
 */
@Component
public class AmazonDynamoDBClient {

    /** Low-level DynamoDB client bound to the AP_NORTHEAST_1 region. */
    AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard()
        .withRegion(Regions.AP_NORTHEAST_1)
        .build();

    // https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/DynamoDBMapper.OptionalConfig.html
    /**
     * Mapper configuration.
     *
     * <p>The save behavior is {@code UPDATE} rather than {@code CLOBBER}: per the
     * mapper documentation linked above, {@code CLOBBER} disregards versioned field
     * constraints, which would disable the optimistic locking that
     * {@code @DynamoDBVersionAttribute} on {@link com.example.demo.model.CommunicationData}
     * and the callers' {@code ConditionalCheckFailedException} handling rely on.
     */
    DynamoDBMapperConfig mapperConfig = new DynamoDBMapperConfig.Builder()
        .withSaveBehavior(DynamoDBMapperConfig.SaveBehavior.UPDATE)
        .withConsistentReads(DynamoDBMapperConfig.ConsistentReads.CONSISTENT)
        .withTableNameOverride(null)
        .withPaginationLoadingStrategy(DynamoDBMapperConfig.PaginationLoadingStrategy.EAGER_LOADING)
        .build();

    /** Object mapper built from {@link #client} and {@link #mapperConfig}. */
    public DynamoDBMapper mapper = new DynamoDBMapper(client, mapperConfig);
}
SQS
package com.example.demo.aws.client;
import java.util.List;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import org.springframework.stereotype.Component;
/**
 * Spring component wrapping receive and delete operations on a single SQS queue.
 */
@Component
public class AmazonSQSClient {

    /**
     * Queue URL used for both receiving and deleting.
     * NOTE(review): empty in this listing; presumably it has to be replaced with the
     * real queue URL. Confirm.
     */
    String url = "";

    // https://docs.aws.amazon.com/ja_jp/sdk-for-java/v1/developer-guide/examples-sqs-message-queues.html
    /** SQS client bound to the AP_NORTHEAST_1 region. */
    AmazonSQS sqs = AmazonSQSClientBuilder.standard()
        .withRegion(Regions.AP_NORTHEAST_1)
        .build();

    /** Reusable receive request: wait time of 20 seconds, at most one message. */
    ReceiveMessageRequest request = new ReceiveMessageRequest()
        .withQueueUrl(url)
        .withWaitTimeSeconds(20)
        .withMaxNumberOfMessages(1);

    /**
     * Issues the prepared receive request.
     *
     * @return the messages contained in the receive result
     */
    public List<Message> receive() {
        final List<Message> received = sqs.receiveMessage(request).getMessages();
        return received;
    }

    /**
     * Deletes a message from the queue using its receipt handle.
     *
     * @param message the message to delete
     */
    public void delete(Message message) {
        final String receiptHandle = message.getReceiptHandle();
        sqs.deleteMessage(url, receiptHandle);
    }
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Maven build descriptor for the demo batch application. -->
<modelVersion>4.0.0</modelVersion>
<!-- Parent POM: spring-boot-starter-parent 2.3.4.RELEASE. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<!-- Java version property, set to 11. -->
<java.version>11</java.version>
</properties>
<!-- Imports the software.amazon.awssdk BOM (2.7.0); the software.amazon.awssdk
     artifacts declared below (sqs, dynamodb) carry no version of their own. -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>2.7.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!-- Spring Boot starters and tooling. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- H2 database driver. -->
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
</dependency>
<!-- Test dependencies; the JUnit vintage engine is excluded from the test starter. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<scope>test</scope>
</dependency>
<!-- AWS dependencies.
     NOTE(review): the Java sources accompanying this POM import com.amazonaws.*
     packages, while sqs and dynamodb below are software.amazon.awssdk artifacts.
     Confirm that the com.amazonaws classes are actually on the classpath
     (possibly only transitively via amazon-dynamodb-transactions). -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>dynamodb</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws.services.dynamodbv2</groupId>
<artifactId>amazon-dynamodb-transactions</artifactId>
<version>1.1.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
参考
の使用 AWS SDK for Java - AWS SDK for Java 1.x
のプログラミング情報 AWS SDK for Java
DynamoDBトランザクションライブラリをちょっぴり試してみた | DevelopersIO