Spring Batch・SQS・DynamoDBで非同期APIのバックエンド処理

環境等は最後のpom.xmlを参照してください。

Main

package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BatchApplication {
    public static void main(String[] args) {
        SpringApplication.run(BatchApplication.class, args);
    }
}

Config

package com.example.demo;

import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;

@Configuration
public class BatchConfig {
    @Bean
    public DataSource jobRepositoryEmbeddedDataSource(){
        return new EmbeddedDatabaseBuilder()
          .setType(EmbeddedDatabaseType.H2)
          .addScript("classpath:/org/springframework/batch/core/schema-h2.sql")
          .build();
    }
}

Job

package com.example.demo.job;

import java.util.List;
import java.util.ArrayList;

import com.example.demo.job.decider.VerifyMessageDecider;
import com.example.demo.job.listener.JobListener;
import com.example.demo.job.tasklet.*;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Configuration
@EnableBatchProcessing
public class TransformDataJob {

    @Autowired private ReceiveMessageTasklet receiveMessage;
    @Autowired private ExclusiveCheckTasklet exclusiveCheck;
    @Autowired private DeleteMessageTasklet deleteMessage;
    @Autowired private TransformDataTasklet transformData;
    @Autowired private ReplyResultTasklet replyResult;

    @Autowired private VerifyMessageDecider decider;

    @Autowired private JobBuilderFactory jobBuilderFactory;
    @Autowired private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Step receiveStep() {
        return stepBuilderFactory.get("receiveStep").tasklet(receiveMessage).build();
    }

    @Bean
    public Step checkStep() {
        return stepBuilderFactory.get("checkStep").tasklet(exclusiveCheck).build();
    }

    @Bean
    public Step cleanupStep() {
        return stepBuilderFactory.get("cleanupStep").tasklet(deleteMessage).build();
    }

    @Bean
    public Step transformStep() {
        return stepBuilderFactory.get("transformStep").tasklet(transformData).build();
    }

    @Bean
    public Step replyStep() {
        return stepBuilderFactory.get("replyStep").tasklet(replyResult).build();
    }

    @Bean
    public Flow flow() throws Exception {
        return new FlowBuilder<Flow>("flow")
            .from(receiveStep())
                .on(ExitStatus.COMPLETED.getExitCode()).to(decider)       // メッセージあり
                .on(ExitStatus.FAILED.getExitCode()).to(receiveStep())    // メッセージなし
            .from(decider)
                .on("STOP").end()                                         // 停止指示あり
                .on("CONTINUE").to(checkStep())                           // 停止指示なし
            .from(checkStep())
                .on(ExitStatus.COMPLETED.getExitCode()).to(cleanupStep()) // 排他チェックOK
                .on(ExitStatus.FAILED.getExitCode()).to(receiveStep())    // 排他チェックNG
            .from(cleanupStep())
            .next(transformStep())
            .next(replyStep())
            .next(receiveStep())
            .build();
    }

    // 多重処理させたかったがShareデータが混戦するためこのままでは使えない
    @Bean
    public Flow multiFlow() throws Exception {
        List<Flow> flowList = new ArrayList<Flow>();
        for(int i=0; i<2; i++) { flowList.add(flow()); }
        Flow[] flows = new Flow[flowList.size()];

        return new FlowBuilder<Flow>("multiFlow")
            .split(new SimpleAsyncTaskExecutor())
            .add(flowList.toArray(flows))
            .build();
    }

    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("job")
            .incrementer(new RunIdIncrementer())
            .listener(listener())
            .start(flow())
            .end()
            .build();
    }

    @Bean
    public JobExecutionListener listener() {
        return new JobListener();
    }
}

Listener


package com.example.demo.job.listener;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;

public class JobListener extends JobExecutionListenerSupport {

  @Override
  public void beforeJob(JobExecution jobExecution) {
    super.beforeJob(jobExecution);
    System.out.println("Job Start");
  } 

  @Override
  public void afterJob(JobExecution jobExecution) {
    super.afterJob(jobExecution);
    System.out.println("Job End");
  }

}

Tasklet

ReceiveMessageTasklet

package com.example.demo.job.tasklet;

import java.util.List;

import com.amazonaws.services.sqs.model.Message;

import com.example.demo.aws.client.AmazonSQSClient;
import com.example.demo.job.share.SharedDataContainer;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class ReceiveMessageTasklet implements Tasklet {

    @Autowired AmazonSQSClient sqs;
    @Autowired SharedDataContainer share;

    @Override
    public RepeatStatus execute(final StepContribution contribution, final ChunkContext chunkContext) throws Exception {
        System.out.println("ReceiveMessageTasklet");

        List<Message> messages = sqs.receive();

        if (messages.size() > 0) {
            share.message = messages.get(0);
            contribution.setExitStatus(ExitStatus.COMPLETED);
        } else {
            contribution.setExitStatus(ExitStatus.FAILED);
        }
        return RepeatStatus.FINISHED;
    }
}

ExclusiveCheckTasklet

package com.example.demo.job.tasklet;

import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException;

import com.example.demo.aws.client.AmazonDynamoDBClient;
import com.example.demo.job.share.SharedDataContainer;
import com.example.demo.model.CommunicationData;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class ExclusiveCheckTasklet implements Tasklet {

    @Autowired AmazonDynamoDBClient ddb;
    @Autowired SharedDataContainer share;

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        System.out.println("ExclusiveCheckTasklet");

        CommunicationData commData = ddb.mapper.load(CommunicationData.class, share.message.getBody());
        if (!"request".equals(commData.getStatus())) {
            contribution.setExitStatus(ExitStatus.FAILED);
            return RepeatStatus.FINISHED;
        }

        commData.setStatus("accept");

        try {
            ddb.mapper.save(commData);
            share.commData = commData;
            contribution.setExitStatus(ExitStatus.COMPLETED);
        } catch(ConditionalCheckFailedException e) {
            contribution.setExitStatus(ExitStatus.FAILED);
        }  
        return RepeatStatus.FINISHED;
    }  
}

DeleteMessageTasklet

package com.example.demo.job.tasklet;

import com.example.demo.aws.client.AmazonSQSClient;
import com.example.demo.job.share.SharedDataContainer;

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class DeleteMessageTasklet implements Tasklet {

    @Autowired AmazonSQSClient sqs;
    @Autowired SharedDataContainer share;

    @Override
    public RepeatStatus execute(final StepContribution contribution, final ChunkContext chunkContext) throws Exception {
        System.out.println("DeleteMessageTasklet");

        sqs.delete(share.message);
        return RepeatStatus.FINISHED;
    }
}

TransformDataTasklet

package com.example.demo.job.tasklet;

import com.example.demo.job.share.SharedDataContainer;

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TransformDataTasklet implements Tasklet {

    @Autowired SharedDataContainer share;

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        System.out.println("TransformDataTasklet");

        String requestData = share.commData.getRequestData();
        String replyData = requestData; // 加工
        share.commData.setReplyData(replyData);
        return RepeatStatus.FINISHED;
    }
}

ReplyResultTasklet

package com.example.demo.job.tasklet;

import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException;

import com.example.demo.aws.client.AmazonDynamoDBClient;
import com.example.demo.job.share.SharedDataContainer;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class ReplyResultTasklet implements Tasklet {

    @Autowired AmazonDynamoDBClient ddb;
    @Autowired SharedDataContainer share;

    @Override
    public RepeatStatus execute(final StepContribution contribution, final ChunkContext chunkContext) throws Exception {
        System.out.println("ReplyResultTasklet");

        share.commData.setStatus("reply");

        try {
            ddb.mapper.save(share.commData);
            contribution.setExitStatus(ExitStatus.COMPLETED);
        } catch(final ConditionalCheckFailedException e) {
            contribution.setExitStatus(ExitStatus.FAILED);
        }  
        return RepeatStatus.FINISHED;
    }  
}

Decider

package com.example.demo.job.decider;

import com.example.demo.job.share.SharedDataContainer;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class VerifyMessageDecider implements JobExecutionDecider {

    @Autowired SharedDataContainer share;

    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        if ("stop".equals(share.message.getBody())) {
            return new FlowExecutionStatus("STOP");
        } else {
            return new FlowExecutionStatus("CONTINUE");
        }
    };
}

ShareData

package com.example.demo.job.share;

import com.amazonaws.services.sqs.model.Message;
import com.example.demo.model.CommunicationData;

import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.stereotype.Component;

@Component @JobScope
public class SharedDataContainer {
    public Message message;
    public CommunicationData commData;
}

Model

package com.example.demo.model;

import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBAttribute;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBHashKey;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBTable;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBVersionAttribute;

@DynamoDBTable(tableName = "CommunicationData")
public class CommunicationData { 
    private String id;
    private String status;
    private String requestData;
    private String replyData;
    private Long version;

    @DynamoDBHashKey(attributeName = "Id")
    public String getId() { return id; }
    public void setId(final String id) { this.id = id; }

    @DynamoDBAttribute(attributeName="Status")
    public String getStatus() { return this.status; }
    public void setStatus(final String status) { this.status = status; }

    @DynamoDBAttribute(attributeName="RequestData")
    public String getRequestData() { return this.requestData; }
    public void setRequestData(final String requestData) { this.requestData = requestData; }

    @DynamoDBAttribute(attributeName="ReplyData")
    public String getReplyData() { return this.replyData; }
    public void setReplyData(final String replyData) { this.replyData = replyData; }

    @DynamoDBVersionAttribute(attributeName="Version")
    public Long getVersion() { return version; }
    public void setVersion(Long version) { this.version = version;}
}

AWS Client

DynamoDB

package com.example.demo.aws.client;

import com.amazonaws.regions.Regions;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapper;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapperConfig;

import org.springframework.stereotype.Component;

@Component
public class AmazonDynamoDBClient {
    AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard()
        .withRegion(Regions.AP_NORTHEAST_1)
        .build();

        // https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/DynamoDBMapper.OptionalConfig.html
    DynamoDBMapperConfig mapperConfig = new DynamoDBMapperConfig.Builder()
        .withSaveBehavior(DynamoDBMapperConfig.SaveBehavior.CLOBBER)
        .withConsistentReads(DynamoDBMapperConfig.ConsistentReads.CONSISTENT)
        .withTableNameOverride(null)
        .withPaginationLoadingStrategy(DynamoDBMapperConfig.PaginationLoadingStrategy.EAGER_LOADING)
        .build();

    public DynamoDBMapper mapper = new DynamoDBMapper(client, mapperConfig);
}

SQS

package com.example.demo.aws.client;

import java.util.List;

import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.Message;

import org.springframework.stereotype.Component;

@Component
public class AmazonSQSClient {
    String url = "";

    // https://docs.aws.amazon.com/ja_jp/sdk-for-java/v1/developer-guide/examples-sqs-message-queues.html
    AmazonSQS sqs = AmazonSQSClientBuilder.standard()
        .withRegion(Regions.AP_NORTHEAST_1)
        .build();

    ReceiveMessageRequest request = new ReceiveMessageRequest()
        .withQueueUrl(url)
        .withWaitTimeSeconds(20)
        .withMaxNumberOfMessages(1);

    public List<Message> receive() {
        return sqs.receiveMessage(request).getMessages();
    }

    public void delete(Message message) {
        sqs.deleteMessage(url, message.getReceiptHandle());
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>11</java.version>
    </properties>

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>software.amazon.awssdk</groupId>
          <artifactId>bom</artifactId>
          <version>2.7.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-validation</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
          <groupId>software.amazon.awssdk</groupId>
            <artifactId>sqs</artifactId>
        </dependency>

        <dependency>
          <groupId>software.amazon.awssdk</groupId>
            <artifactId>dynamodb</artifactId>
        </dependency>

        <dependency>
            <groupId>com.amazonaws.services.dynamodbv2</groupId>
            <artifactId>amazon-dynamodb-transactions</artifactId>
            <version>1.1.2</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

参考

の使用 AWS SDK for Java - AWS SDK for Java 1.x
のプログラミング情報 AWS SDK for Java
DynamoDBトランザクションライブラリをちょっぴり試してみた | DevelopersIO
よく訓練されたアップル信者、都元です。 AWSにおける代表的なKVS型データベース「DynamoDB」、皆様活用されておりますでしょうか。 RDBはACIDという特性が(略)。一方KVSではBASEという(略)。この辺り …
タイトルとURLをコピーしました