Spring Boot Webflux DynamoDB Tutorial – In this tutorial we will integrate AWS DynamoDB with Spring Boot Webflux. Instead of using the default AWS sync client, which blocks the calling thread, we will use the async client together with Webflux.
We are going to create a REST API with basic CRUD operations on a Customer entity. The customer record will be stored in AWS DynamoDB. Also we will use Webflux to connect with DynamoDB.
Let’s start by bootstrapping a new Spring Boot project using start.spring.io. The following project settings were selected in start.spring.io.
Once generated, import the project in your favorite IDE.
Update the build.gradle file as follows. We have added the AWS SDK dynamodb module, version 2.10.40, through dependencyManagement. Also note that spring-boot-starter-webflux is on the classpath.
build.gradle
plugins {
    id 'org.springframework.boot' version '2.2.2.RELEASE'
    id 'io.spring.dependency-management' version '1.0.8.RELEASE'
    id 'java'
}

group = 'net.viralpatel'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

repositories {
    mavenCentral()
    maven {
        url 'https://s3-us-west-2.amazonaws.com/dynamodb-local/release'
    }
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    implementation 'software.amazon.awssdk:dynamodb'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
    testImplementation 'io.projectreactor:reactor-test'
}

dependencyManagement {
    imports {
        mavenBom 'software.amazon.awssdk:bom:2.10.40'
    }
}

test {
    useJUnitPlatform()
}
Add the following configuration to the application.yaml file. We have defined a couple of properties for the DynamoDB endpoint and the table name. Note that the endpoint points to our local DynamoDB. In production you would change this to point to the DynamoDB endpoint for your AWS region. We can also use Spring profiles to switch the values of these properties in different environments.
application.yaml
application:
  dynamodb:
    endpoint: http://localhost:8000
    customer_table: customers
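As a rough illustration of switching between the local endpoint and a regional one, here is a small JDK-only sketch. The class DynamoDbEndpoints and its methods are hypothetical helpers, not part of this project or the AWS SDK, and the URL pattern it builds (https://dynamodb.<region>.amazonaws.com) only covers the standard AWS regions.

```java
import java.net.URI;

// Hypothetical helper, for illustration only.
final class DynamoDbEndpoints {

    private DynamoDbEndpoints() {
    }

    // Regional endpoint for the standard AWS partition,
    // e.g. https://dynamodb.us-west-2.amazonaws.com
    static URI regional(String region) {
        return URI.create("https://dynamodb." + region + ".amazonaws.com");
    }

    // Prefer an explicitly configured endpoint (such as the local DynamoDB one);
    // fall back to the regional endpoint when nothing is configured.
    static URI resolve(String configuredEndpoint, String region) {
        if (configuredEndpoint != null && !configuredEndpoint.isBlank()) {
            return URI.create(configuredEndpoint.trim());
        }
        return regional(region);
    }
}
```

With the configuration above, resolve("http://localhost:8000", "us-west-2") keeps the local endpoint, while a blank value falls back to the regional URL. In a Spring application the same switch is usually done with profile-specific property files rather than code.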
Following is the directory structure for our REST API project. Note that the customer related classes are segregated in a customer package.
Before we proceed with the rest of the tutorial, we will set up a local DynamoDB instance where we can test our changes. Using a local instance instead of relying on an AWS environment speeds up the development process.
We will create the customers table in local DynamoDB. To download and run DynamoDB locally, follow the steps at https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DynamoDBLocal.DownloadingAndRunning.html
# Start DynamoDB Local (run from the directory where it was extracted)
java -Djava.library.path=./DynamoDBLocal_lib/ \
    -jar DynamoDBLocal.jar

# Create the customers table with customerId as the hash key
aws dynamodb create-table \
    --table-name customers \
    --attribute-definitions AttributeName=customerId,AttributeType=S \
    --key-schema AttributeName=customerId,KeyType=HASH \
    --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \
    --endpoint-url http://localhost:8000

# Verify that the table exists
aws dynamodb list-tables \
    --endpoint-url http://localhost:8000

Output:
{
    "TableNames": [
        "customers"
    ]
}
Let us start by defining the repository and the DynamoDB configuration used to access data in DynamoDB. As noted earlier, we will use DynamoDbAsyncClient to access DynamoDB.
The repository class contains basic CRUD methods to maintain the Customer entity. Note how we use dynamoDbAsyncClient to access DynamoDB through the GetItemRequest, PutItemRequest, DeleteItemRequest and ScanRequest APIs. We also map the CompletableFuture returned by DynamoDbAsyncClient to Reactor’s Mono class using Mono.fromCompletionStage.
CustomerRepository.java
package net.viralpatel.springbootwebfluxdynamodb.customer;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.*;

import java.util.Map;
import java.util.UUID;

@Repository
public class CustomerRepository {

    private final DynamoDbAsyncClient dynamoDbAsyncClient;
    private final String customerTable;

    public CustomerRepository(DynamoDbAsyncClient dynamoDbAsyncClient,
                              @Value("${application.dynamodb.customer_table}") String customerTable) {
        this.dynamoDbAsyncClient = dynamoDbAsyncClient;
        this.customerTable = customerTable;
    }

    // Note: a single scan call returns at most one page of results (up to 1 MB of data).
    public Flux<Customer> listCustomers() {
        ScanRequest scanRequest = ScanRequest.builder()
                .tableName(customerTable)
                .build();

        return Mono.fromCompletionStage(dynamoDbAsyncClient.scan(scanRequest))
                .map(scanResponse -> scanResponse.items())
                .map(CustomerMapper::fromList)
                .flatMapMany(Flux::fromIterable);
    }

    public Mono<Customer> createCustomer(Customer customer) {
        // The id is generated here; any id sent by the caller is overwritten.
        customer.setId(UUID.randomUUID().toString());
        PutItemRequest putItemRequest = PutItemRequest.builder()
                .tableName(customerTable)
                .item(CustomerMapper.toMap(customer))
                .build();

        return Mono.fromCompletionStage(dynamoDbAsyncClient.putItem(putItemRequest))
                .map(putItemResponse -> customer);
    }

    public Mono<String> deleteCustomer(String customerId) {
        DeleteItemRequest deleteItemRequest = DeleteItemRequest.builder()
                .tableName(customerTable)
                .key(Map.of("customerId", AttributeValue.builder().s(customerId).build()))
                .build();

        return Mono.fromCompletionStage(dynamoDbAsyncClient.deleteItem(deleteItemRequest))
                .map(deleteItemResponse -> customerId);
    }

    public Mono<Customer> getCustomer(String customerId) {
        GetItemRequest getItemRequest = GetItemRequest.builder()
                .tableName(customerTable)
                .key(Map.of("customerId", AttributeValue.builder().s(customerId).build()))
                .build();

        return Mono.fromCompletionStage(dynamoDbAsyncClient.getItem(getItemRequest))
                .map(getItemResponse -> getItemResponse.item())
                // No matching item yields an empty map; complete empty instead of
                // passing it to the mapper, which would throw a NullPointerException.
                .filter(item -> !item.isEmpty())
                .map(CustomerMapper::fromMap);
    }

    // putItem replaces the whole item, so the update overwrites every attribute.
    public Mono<String> updateCustomer(String customerId, Customer customer) {
        customer.setId(customerId);
        PutItemRequest putItemRequest = PutItemRequest.builder()
                .tableName(customerTable)
                .item(CustomerMapper.toMap(customer))
                .build();

        return Mono.fromCompletionStage(dynamoDbAsyncClient.putItem(putItemRequest))
                .map(updateItemResponse -> customerId);
    }
}
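One case worth keeping in mind with getItem: when no item matches the key, DynamoDB answers with an empty item rather than an error. The following is a JDK-only sketch of handling that case while mapping an asynchronous result. The fetchItem method is a hypothetical stand-in for the async client call, and a plain Map<String, String> stands in for the SDK's attribute map, so treat it as an illustration rather than the repository's actual code.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

final class AsyncLookupSketch {

    private AsyncLookupSketch() {
    }

    // Hypothetical stand-in for dynamoDbAsyncClient.getItem(...): completes asynchronously
    // with the stored attributes, or with an empty map when the key is unknown.
    static CompletableFuture<Map<String, String>> fetchItem(
            Map<String, Map<String, String>> table, String customerId) {
        return CompletableFuture.supplyAsync(() -> table.getOrDefault(customerId, Map.of()));
    }

    // Maps the async result without blocking the caller; an empty item
    // becomes Optional.empty() instead of being read as if it had attributes.
    static CompletableFuture<Optional<String>> findCustomerName(
            Map<String, Map<String, String>> table, String customerId) {
        return fetchItem(table, customerId)
                .thenApply(item -> item.isEmpty()
                        ? Optional.<String>empty()
                        : Optional.ofNullable(item.get("name")));
    }
}
```

In Reactor the same idea is usually expressed by filtering out the empty map, so that the Mono completes empty and the caller can decide what a missing customer means.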
In the following configuration class we create an instance of DynamoDbAsyncClient. Note how we map the endpoint from application.yaml using Spring’s @Value annotation.
DynamoDBConfig.java
package net.viralpatel.springbootwebfluxdynamodb.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;

import java.net.URI;

@Configuration
public class DynamoDBConfig {

    @Bean
    public DynamoDbAsyncClient dynamoDbAsyncClient(
            @Value("${application.dynamodb.endpoint}") String dynamoDBEndpoint) {
        return DynamoDbAsyncClient.builder()
                .endpointOverride(URI.create(dynamoDBEndpoint))
                .credentialsProvider(DefaultCredentialsProvider.builder().build())
                .build();
    }
}
Below is a utility class that maps a response from DynamoDB into our Customer entity class and vice versa. It is just boilerplate code.
CustomerMapper.java
package net.viralpatel.springbootwebfluxdynamodb.customer;

import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class CustomerMapper {

    public static List<Customer> fromList(List<Map<String, AttributeValue>> items) {
        return items.stream()
                .map(CustomerMapper::fromMap)
                .collect(Collectors.toList());
    }

    public static Customer fromMap(Map<String, AttributeValue> attributeValueMap) {
        Customer customer = new Customer();
        customer.setId(attributeValueMap.get("customerId").s());
        customer.setName(attributeValueMap.get("name").s());
        customer.setEmail(attributeValueMap.get("email").s());
        customer.setCity(attributeValueMap.get("city").s());
        return customer;
    }

    public static Map<String, AttributeValue> toMap(Customer customer) {
        return Map.of(
                "customerId", AttributeValue.builder().s(customer.getId()).build(),
                "name", AttributeValue.builder().s(customer.getName()).build(),
                "email", AttributeValue.builder().s(customer.getEmail()).build(),
                "city", AttributeValue.builder().s(customer.getCity()).build()
        );
    }
}
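The Customer entity itself is not listed in this post. Judging from the getters and setters the mapper calls, it is probably a plain bean along the following lines. This is a sketch inferred from that usage; the field names and the absence of any annotations are assumptions.

```java
// Sketch of the Customer entity, inferred from the accessors used by CustomerMapper.
// In the project this would be a public class in the customer package; the modifier
// and package declaration are omitted here to keep the sketch standalone.
class Customer {

    private String id;
    private String name;
    private String email;
    private String city;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getEmail() {
        return email;
    }

    public void setEmail(String email) {
        this.email = email;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }
}
```

Note that the id field is stored under the customerId attribute in DynamoDB, which is the hash key of the customers table.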
Next we define a Spring service class to abstract the repository from our routes layer (the controller layer, in the old world). Note that we call CustomerRepository from the service class and map the response into a ServerResponse with the appropriate HTTP status.
CustomerService.java
package net.viralpatel.springbootwebfluxdynamodb.customer;

import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

import java.net.URI;
import java.util.stream.Collectors;

@Service
public class CustomerService {

    private final CustomerRepository customerRepository;

    public CustomerService(CustomerRepository customerRepository) {
        this.customerRepository = customerRepository;
    }

    public Mono<ServerResponse> listCustomers(ServerRequest serverRequest) {
        return customerRepository.listCustomers()
                .collect(Collectors.toList())
                .flatMap(customers -> ServerResponse.ok().body(BodyInserters.fromValue(customers)));
    }

    public Mono<ServerResponse> createCustomer(ServerRequest serverRequest) {
        return serverRequest.bodyToMono(Customer.class)
                .flatMap(customer -> customerRepository.createCustomer(customer))
                // 201 Created, with the new resource's location in the Location header
                .flatMap(customer -> ServerResponse.created(URI.create("/customers/" + customer.getId())).build());
    }

    public Mono<ServerResponse> deleteCustomer(ServerRequest serverRequest) {
        String customerId = serverRequest.pathVariable("customerId");
        return customerRepository.deleteCustomer(customerId)
                .flatMap(deletedId -> ServerResponse.ok().build());
    }

    public Mono<ServerResponse> getCustomer(ServerRequest serverRequest) {
        String customerId = serverRequest.pathVariable("customerId");
        return customerRepository.getCustomer(customerId)
                .flatMap(customer -> ServerResponse.ok().body(BodyInserters.fromValue(customer)))
                // If the repository completes empty (no customer for this id), respond with 404.
                .switchIfEmpty(ServerResponse.notFound().build());
    }

    public Mono<ServerResponse> updateCustomer(ServerRequest serverRequest) {
        String customerId = serverRequest.pathVariable("customerId");
        return serverRequest.bodyToMono(Customer.class)
                .flatMap(customer -> customerRepository.updateCustomer(customerId, customer))
                .flatMap(updatedId -> ServerResponse.ok().build());
    }
}
Finally we glue everything together in Routes.java. In this class we use Spring Webflux RouterFunctions to define the route endpoints for the Customer REST API.
We define routes for the GET, POST, PUT and DELETE methods and invoke the appropriate CustomerService method for each.
Routes.java
package net.viralpatel.springbootwebfluxdynamodb;

import net.viralpatel.springbootwebfluxdynamodb.customer.CustomerService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.web.reactive.function.server.RequestPredicates.*;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

@Configuration
public class Routes {

    private final CustomerService customerService;

    public Routes(CustomerService customerService) {
        this.customerService = customerService;
    }

    @Bean
    RouterFunction<ServerResponse> customers() {
        return route(GET("/customers"), customerService::listCustomers)
                .andRoute(POST("/customers"), customerService::createCustomer)
                .andRoute(GET("/customers/{customerId}"), customerService::getCustomer)
                .andRoute(PUT("/customers/{customerId}"), customerService::updateCustomer)
                .andRoute(DELETE("/customers/{customerId}"), customerService::deleteCustomer);
    }
}
Build and execute the project using Gradle.
$ ./gradlew bootRun
Make sure your local instance of DynamoDB is up and running and the customers table is created before starting the project. If everything is set up correctly, the customer API should be able to communicate with DynamoDB.
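If you want a quick way to confirm that something is listening on the local DynamoDB port before starting the application, a small JDK-only check like the one below may help. The class name LocalDynamoCheck is hypothetical, and the check only verifies that a TCP connection can be opened; it does not verify that the customers table exists.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class LocalDynamoCheck {

    private LocalDynamoCheck() {
    }

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable
            return false;
        }
    }

    public static void main(String[] args) {
        // Port 8000 matches the endpoint configured in application.yaml
        boolean up = isReachable("localhost", 8000, 1000);
        System.out.println(up
                ? "Something is accepting connections on localhost:8000"
                : "Nothing is listening on localhost:8000; start DynamoDB Local first");
    }
}
```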
Open Postman and fire up the APIs.
Create new customer
List all customers
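If you would rather call the API from code than from Postman, the two requests above can be sketched with the HTTP client that ships with Java 11. This assumes the application runs on Spring Boot's default port 8080 and that the request body uses the name, email and city fields seen in the mapper; the class name CustomerApiClient and the sample values are made up for illustration.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class CustomerApiClient {

    private CustomerApiClient() {
    }

    // POST /customers with a JSON body
    static HttpRequest createCustomerRequest(String baseUrl, String customerJson) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/customers"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(customerJson))
                .build();
    }

    // GET /customers
    static HttpRequest listCustomersRequest(String baseUrl) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/customers"))
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        String baseUrl = "http://localhost:8080"; // assumption: default Spring Boot port
        HttpClient client = HttpClient.newHttpClient();

        // Sample payload; the field names are assumed from CustomerMapper
        String json = "{\"name\":\"John Doe\",\"email\":\"john@example.com\",\"city\":\"Sydney\"}";
        HttpResponse<String> created =
                client.send(createCustomerRequest(baseUrl, json), HttpResponse.BodyHandlers.ofString());
        // The service builds a "created" response, so the new URL is in the Location header
        System.out.println(created.statusCode() + " "
                + created.headers().firstValue("Location").orElse(""));

        HttpResponse<String> listed =
                client.send(listCustomersRequest(baseUrl), HttpResponse.BodyHandlers.ofString());
        System.out.println(listed.statusCode() + " " + listed.body());
    }
}
```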
Source code of this project is available on Github.
Github – spring-boot-webflux-dynamodb
Comments
Nice article!
While I agree that Docker is the way to go, I fail to see how it was easier to set up your example than using a local DynamoDB library - starting them up requires a one-liner in both cases but installing Docker on your local and build machines, plus adding a dependency in your build script vs just adding a dependency? Hmm, doesn't really add up :)
Also, why not use a dynamic free port instead of a fixed 8000 port?
Btw, if you're using a lot of AWS stuff, give localstack a try, great stuff for both integration testing and local development, bundles all the most frequently used AWS services into one library / container.
Thanks for the comments. I agree that sometimes it's just easier to use a local library like this one instead of Docker. However, Docker does make it easy to run any container, as starting an image and interacting with it becomes an easy ritual.
In this example, I am just using 8000 port so I don't have to change the config in my application.yaml to point DynamoDB client to it.
Also, thanks for suggesting LocalStack. I have been playing with it for a while and it really makes interacting with AWS locally easy. I will try to write a few articles on that.
Hi Viral,
Thanks for writing this blog. I have recently started exploring java and web flux.
In the current implementation, dynamo async client returns completable future and you are converting to Mono/flux. Both have different ways to handle concurrency/multithreading.
Reactor uses an event loop, with the number of threads determined by the machine configuration.
CompletableFuture can have any number of threads defined by configuration and it completes request on request per thread model.
Are we getting any real benefit from using webflux here?