Using DynamoDB with Kotlin + Spring Boot

Overview

  • This article summarizes table design and CRUD operations for DynamoDB in a Spring Boot-based project, utilizing the DynamoDB Enhanced Client library recommended by AWS.

Considerations for DynamoDB Table Design

  • DynamoDB, AWS's serverless NoSQL database, scales well for production workloads, but it comes with constraints, such as no support for joins, that call for a different design approach than a traditional RDBMS.

  • A table's primary key is either a Partition Key (PK) alone or a combination of a Partition Key and a Sort Key (SK). In a Query, the PK is hashed and must match exactly, while the SK supports range conditions such as <, >, BETWEEN, and begins_with. To query on attributes that are not part of the keys, you need either a full table Scan or a secondary index (LSI or GSI).

  • Create LSIs with caution in production. An LSI supports strongly consistent reads, but it can only be defined when the table is created (it cannot be added or removed later), and a table with an LSI limits each item collection, that is, all items sharing one partition key value, to 10 GB.

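  • GSIs add directly to storage and write costs, because writes to the base table are also written to every index they affect, so keep GSIs small. Setting the projection type to KEYS_ONLY when creating the index minimizes storage by including only the index key values and the table's partition and sort key values. (ALL copies every attribute of each indexed item, which can roughly double the storage for those items.)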
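To make the key semantics above concrete, here is a minimal in-memory sketch, not SDK code and not how DynamoDB is implemented internally. The partition key is an exact hash lookup, and each partition keeps its items ordered by sort key, which is why range and prefix conditions on the sort key are cheap. `InMemoryTable`, `queryBeginsWith`, and `queryBetween` are hypothetical names.

```kotlin
import java.util.TreeMap

// Hypothetical in-memory model of a table with a composite primary key (PK + SK).
// It only illustrates key semantics; it is not how the AWS SDK or DynamoDB works internally.
class InMemoryTable {
    // Partition key -> items of that partition, ordered by sort key (lexicographic, like type S)
    private val partitions = HashMap<String, TreeMap<String, Map<String, String>>>()

    fun put(pk: String, sk: String, attributes: Map<String, String>) {
        partitions.getOrPut(pk) { TreeMap() }[sk] = attributes
    }

    // PK must match exactly; SK is matched by prefix (like begins_with)
    fun queryBeginsWith(pk: String, skPrefix: String): List<String> {
        val partition = partitions[pk] ?: return emptyList()
        return partition.tailMap(skPrefix, true).keys.takeWhile { it.startsWith(skPrefix) }
    }

    // PK must match exactly; SK is matched by an inclusive range (like BETWEEN)
    fun queryBetween(pk: String, from: String, to: String): List<String> {
        val partition = partitions[pk] ?: return emptyList()
        return partition.subMap(from, true, to, true).keys.toList()
    }
}
```

A partial partition key such as "ROOM" finds nothing, because unlike the sort key the partition key supports no prefix or range matching.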

Considerations for Querying DynamoDB Tables

  • To retrieve multiple items in a single request from a list of Partition Key + Sort Key pairs, use BatchGetItem, essentially a batch version of GetItem (up to 100 items or 16 MB per request).

  • A Query selects items by the Partition Key and, optionally, a Sort Key condition. A Filter Expression can narrow the results further on the server side before they are returned to the client. However, the filter is applied after the items are read, so the amount of data read is the same with or without it: consumed read capacity, the 1 MB per-request read limit, and billing are all based on the items read, not the items returned.
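The following minimal model, which is not SDK code, shows why a Filter Expression does not reduce cost: the key condition decides what is read, Limit caps how many items are read, and the filter only trims the response. `queryThenFilter` and `QueryResult` are hypothetical names; `count` and `scannedCount` mirror the Count and ScannedCount fields of a Query response.

```kotlin
// Hypothetical result type: count = items returned, scannedCount = items read (and charged)
data class QueryResult<T>(val items: List<T>, val count: Int, val scannedCount: Int)

// Hypothetical model of a Query with a FilterExpression.
fun <T> queryThenFilter(
    partitionItems: List<T>,   // items matched by the key condition, in sort-key order
    limit: Int,                // like Query's Limit: caps items *read*, not items returned
    filter: (T) -> Boolean     // stands in for the FilterExpression
): QueryResult<T> {
    val read = partitionItems.take(limit)  // read capacity is consumed for all of these
    val returned = read.filter(filter)     // the filter only trims what is sent back
    return QueryResult(returned, count = returned.size, scannedCount = read.size)
}
```

With `limit = 3` and a filter that matches one item, `scannedCount` is 3 while `count` is 1, and a client paging through results can even receive an empty page that still cost read capacity.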

build.gradle.kts

  • Add the following content to your project's /build.gradle.kts:
val awsSdkVersion by extra { "2.25.11" }

dependencies {
    implementation("software.amazon.awssdk:dynamodb-enhanced:$awsSdkVersion")
    implementation("software.amazon.awssdk:url-connection-client:$awsSdkVersion")
}

DynamoDB Local Setup

  • If you want to develop in a safe, isolated local environment separate from your real DynamoDB tables, you can use the DynamoDB Local Docker image provided by AWS. The Docker Compose setup is as follows. (Clients such as the AWS CLI still sign their requests, so the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables must be set on your operating system, along with a default region; DynamoDB Local accepts any values.)
# Set up DynamoDB Local
$ nano docker-compose.yml
version: '3.8'
services:
  dynamodb-local:
    command: "-jar DynamoDBLocal.jar -sharedDb -dbPath ./data"
    image: "amazon/dynamodb-local:latest"
    container_name: dynamodb-local
    ports:
      - "8000:8000"
    volumes:
      - "./docker/dynamodb:/home/dynamodblocal/data"
    working_dir: /home/dynamodblocal
    restart: always

# Start DynamoDB Local
$ docker-compose up -d

Creating a DynamoDbEnhancedClient bean

  • Next, register a singleton DynamoDbEnhancedClient bean that @Repository beans will use to connect to DynamoDB.
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedClient
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.dynamodb.DynamoDbClient
import java.net.URI

@Configuration
class DynamoDbConfig {

    @Bean
    fun dynamoDbClient(): DynamoDbClient {

        // Set the HTTP client declared in build.gradle.kts explicitly in both environments
        val httpClient = UrlConnectionHttpClient.builder().build()

        return if (System.getenv("SPRING_PROFILES_ACTIVE") == "local") {
            // Connect to DynamoDB Local for the local development environment.
            // DynamoDB Local accepts any region and credentials, so fixed dummy values are used.
            DynamoDbClient
                .builder()
                .endpointOverride(URI.create("http://localhost:8000"))
                .region(Region.US_EAST_1)
                .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("local", "local")))
                .httpClient(httpClient)
                .build()
        } else {
            // Connect to DynamoDB for remote deployment environments;
            // region and credentials come from the SDK's default provider chains
            DynamoDbClient
                .builder()
                .httpClient(httpClient)
                .build()
        }
    }

    @Bean
    fun dynamoDbEnhancedClient(
        @Qualifier("dynamoDbClient") dynamoDbClient: DynamoDbClient
    ): DynamoDbEnhancedClient {

        return DynamoDbEnhancedClient.builder()
            .dynamoDbClient(dynamoDbClient)
            .build()
    }
}
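The configuration above compares the whole SPRING_PROFILES_ACTIVE value with "local", which misses a comma-separated value such as "local,debug". A minimal sketch of a more tolerant check follows; `isProfileActive` is a hypothetical helper, not a Spring API. (Annotating two separate @Bean methods with Spring's @Profile("local") and @Profile("!local") is the more conventional alternative.)

```kotlin
// Hypothetical helper: SPRING_PROFILES_ACTIVE may hold a comma-separated list such as "local,debug",
// so check each entry instead of comparing the whole string.
fun isProfileActive(activeProfiles: String?, profile: String): Boolean =
    activeProfiles
        ?.split(',')
        ?.any { it.trim() == profile }
        ?: false
```

Usage inside the bean method would be `isProfileActive(System.getenv("SPRING_PROFILES_ACTIVE"), "local")`.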

Creating a DynamoDB Table

  • Before designing the bean, create the table. It has a partition (HASH) key pk of type String that stores the chat room ID and a sort (RANGE) key sk of type String that stores the chat room message ID:
# Append --endpoint-url http://localhost:8000 to create the table in DynamoDB Local instead
$ aws dynamodb create-table \
    --table-name {table-name} \
    --attribute-definitions AttributeName=pk,AttributeType=S AttributeName=sk,AttributeType=S \
    --key-schema AttributeName=pk,KeyType=HASH AttributeName=sk,KeyType=RANGE \
    --billing-mode PAY_PER_REQUEST
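Because pk, sk, and the GSI key are composite strings, it helps to build and parse them in one place. The following is a minimal sketch; `MessageKeys` and its functions are hypothetical names that follow the ROOM#{room-id}, MESSAGE#{message-id}, and USER#{user-id}#ROOM#{room-id} formats used in this article's bean.

```kotlin
// Hypothetical key helpers for this article's single-table layout:
//   pk = ROOM#{roomId}, sk = MESSAGE#{messageId}, gsi1pk = USER#{userId}#ROOM#{roomId}
object MessageKeys {
    private const val ROOM = "ROOM#"
    private const val MESSAGE = "MESSAGE#"

    fun pk(roomId: Long): String = "$ROOM$roomId"
    fun sk(messageId: String): String = "$MESSAGE$messageId"
    fun gsi1pk(userId: String, roomId: Long): String = "USER#$userId#$ROOM$roomId"

    // Returns the room ID encoded in a pk, or null if the value is not a room key
    fun roomIdOf(pk: String): Long? =
        if (pk.startsWith(ROOM)) pk.removePrefix(ROOM).toLongOrNull() else null

    // Returns the message ID encoded in an sk, or null if the value is not a message key
    fun messageIdOf(sk: String): String? =
        if (sk.startsWith(MESSAGE)) sk.removePrefix(MESSAGE) else null
}
```

Keeping the prefixes in one object means a typo in a key format fails in one obvious place rather than silently producing keys that never match.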

DynamoDB Bean Design

  • Next, design the bean that maps to the physical table. It is kept deliberately simple and assumes a Single Table Design.
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.*
import java.time.Instant

@DynamoDbBean
data class MessageDynamoDbBean(

    // ROOM#{room.id}
    @get:DynamoDbPartitionKey
    @get:DynamoDbAttribute("pk")
    var pk: String = "",

    // MESSAGE#{message.id}
    @get:DynamoDbSortKey
    @get:DynamoDbAttribute("sk")
    var sk: String = "",

    // MESSAGE
    @get:DynamoDbAttribute("entityType")
    var entityType: String = "MESSAGE",

    @get:DynamoDbAttribute("message")
    var message: String = "",

    @get:DynamoDbAttribute("customData")
    @get:DynamoDbConvertedBy(DynamoDbStringMapToJsonAttributeConverter::class)
    var customData: Map<String, String?>? = null,

    @get:DynamoDbAttribute("userId")
    var userId: String = "",

    @get:DynamoDbAttribute("createdAt")
    var createdAt: Instant = Instant.now(), 

    @get:DynamoDbAttribute("lastModifiedAt")
    var lastModifiedAt: Instant = Instant.now(), 

    // USER#{user-id}#ROOM#{room-id}
    @get:DynamoDbSecondaryPartitionKey(indexNames = ["gsi-pk-user-id-room-id"])
    @get:DynamoDbAttribute("gsi1pk")
    var gsi1pk: String = ""
)
  • @DynamoDbAttribute can be used to map each attribute and field, allowing for different physical attribute names and field names.

  • In a Single Table Design, every bean stored in the same table should declare the shared pk, sk, and entityType fields.

  • @DynamoDbPartitionKey for partition key fields and @DynamoDbSortKey for sort key fields are specified to define key structures.

  • A GSI enables user-based message queries. Its key fields are marked with @DynamoDbSecondaryPartitionKey (and @DynamoDbSecondarySortKey, if the index also has a sort key), referencing the index name.

  • A custom converter, DynamoDbStringMapToJsonAttributeConverter, is used for storing Map<String, String?>? fields as JSON strings.

  • Instant fields holding timestamps are converted automatically by the Enhanced Client's built-in converter and stored as ISO 8601 strings in UTC (DynamoDB String type).
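One consequence of ISO 8601 string storage is worth knowing before putting a timestamp into a sort key. Assuming the stored string matches `Instant.toString()` (ISO_INSTANT), the fraction of a second is omitted when it is zero, so the string length varies, and lexicographic order, which is how DynamoDB compares String keys, can disagree with chronological order. The sketch below shows the effect and one fixed-width alternative; `isoInstantString` and `fixedWidthString` are hypothetical names.

```kotlin
import java.time.Instant
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter

// Instant.toString() (ISO_INSTANT) drops a zero fraction, so "…00Z" and "…00.500Z" differ in length
fun isoInstantString(instant: Instant): String = instant.toString()

// Hypothetical fixed-width UTC format (always 9 fraction digits): lexicographic order == time order
val fixedWidthFormatter: DateTimeFormatter =
    DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSSSSSSSS'Z'").withZone(ZoneOffset.UTC)

fun fixedWidthString(instant: Instant): String = fixedWidthFormatter.format(instant)
```

With the variable-length form, 00:00:00.500 sorts before 00:00:00 because '.' precedes 'Z'; the fixed-width form keeps both in time order.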

Custom Attribute Converter: DynamoDbStringMapToJsonAttributeConverter

  • How should a field of type Map<String, String?>? be stored in the bean? There are several options (DynamoDB's native Map type, for example), but if the field will never be part of a key or index, serializing it to JSON and storing it in a String attribute is a simple choice. Below is an AttributeConverter implementation that does this.
import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.core.JsonProcessingException
import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import software.amazon.awssdk.enhanced.dynamodb.AttributeConverter
import software.amazon.awssdk.enhanced.dynamodb.AttributeValueType
import software.amazon.awssdk.enhanced.dynamodb.EnhancedType
import software.amazon.awssdk.services.dynamodb.model.AttributeValue

class DynamoDbStringMapToJsonAttributeConverter : AttributeConverter<Map<String, String?>?> {

    // Serializes the map to a JSON string; a null map or a serialization failure becomes a DynamoDB NULL
    override fun transformFrom(input: Map<String, String?>?): AttributeValue {

        input ?: return AttributeValue.builder().nul(true).build()

        return try {
            AttributeValue
                .builder()
                .s(mapper.writeValueAsString(input))
                .build()
        } catch (e: JsonProcessingException) {
            AttributeValue
                .builder()
                .nul(true)
                .build()
        }
    }

    // Parses the JSON string back into a map; a NULL attribute or malformed JSON becomes null
    override fun transformTo(input: AttributeValue): Map<String, String?>? {

        // s() is null when the stored attribute is NULL rather than a String
        val json = input.s() ?: return null

        return try {
            mapper.readValue<Map<String, String?>>(json)
        } catch (e: JsonProcessingException) {
            null
        }
    }

    override fun type(): EnhancedType<Map<String, String?>?>? {

        return EnhancedType.mapOf(String::class.java, String::class.java)
    }

    override fun attributeValueType(): AttributeValueType {

        return AttributeValueType.S
    }

    companion object {

        private val mapper = jacksonObjectMapper().apply {
            setSerializationInclusion(JsonInclude.Include.ALWAYS)
            configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
            disable(SerializationFeature.FAIL_ON_EMPTY_BEANS, SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
            registerModules(JavaTimeModule())
        }
    }
}

CRUD: READ

  • Here's an example of listing and single item retrieval for the previously created bean.
import org.springframework.stereotype.Repository
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedClient
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbTable
import software.amazon.awssdk.enhanced.dynamodb.Key
import software.amazon.awssdk.enhanced.dynamodb.TableSchema
import software.amazon.awssdk.enhanced.dynamodb.model.QueryConditional
import software.amazon.awssdk.enhanced.dynamodb.model.QueryEnhancedRequest
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException
import com.fasterxml.jackson.annotation.JsonFormat

@Repository
class MessageDynamoDbRepository(
    private val dynamoDbEnhancedClient: DynamoDbEnhancedClient
) {
    // Create the table handle once; TableSchema.fromBean() introspects the bean class and is relatively expensive
    private val table: DynamoDbTable<MessageDynamoDbBean> = dynamoDbEnhancedClient.table(
        "{table-name}",
        TableSchema.fromBean(MessageDynamoDbBean::class.java)
    )

    // Returns up to `limit` messages before (LESS_THAN) or after (GREATER_THAN) messageId,
    // ordered by sort key ascending
    fun fetchAllByRoomIdAndMessageId(
        roomId: Long?,
        messageId: String? = "",
        sort: DynamoDbSort = DynamoDbSort.LESS_THAN,
        limit: Int = 100
    ): List<MessageDynamoDbBean> {

        roomId ?: return emptyList()

        val key = Key.builder()
            .partitionValue("ROOM#$roomId")
            .sortValue("MESSAGE#${messageId.orEmpty()}")
            .build()

        val queryConditional = when (sort) {
            DynamoDbSort.LESS_THAN -> QueryConditional.sortLessThan(key)
            DynamoDbSort.GREATER_THAN -> QueryConditional.sortGreaterThan(key)
        }

        val queryEnhancedRequest = QueryEnhancedRequest.builder()
            .queryConditional(queryConditional)
            // Read backward for LESS_THAN so the messages closest to messageId come first
            .scanIndexForward(sort.scanIndexForward)
            // Page size per request, so DynamoDB does not read more items than needed
            .limit(limit)
            .build()

        return table
            .query(queryEnhancedRequest)
            .items()
            .asSequence()
            .take(limit)
            .sortedBy { it.sk }
            .toList()
    }

    fun fetchOneByRoomIdAndMessageId(
        roomId: Long?,
        messageId: String?
    ): MessageDynamoDbBean? {

        roomId ?: return null
        messageId ?: return null

        val key = Key.builder()
            .partitionValue("ROOM#$roomId")
            .sortValue("MESSAGE#$messageId")
            .build()

        // GetItem fetches one item by its full primary key and returns null if it does not exist
        return try {
            table.getItem(key)
        } catch (ex: ResourceNotFoundException) {
            // Thrown when the table itself does not exist
            null
        }
    }
}

@JsonFormat(shape = JsonFormat.Shape.STRING)
enum class DynamoDbSort(val scanIndexForward: Boolean) {

    LESS_THAN(scanIndexForward = false),
    GREATER_THAN(scanIndexForward = true)
}
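The LESS_THAN case behaves like a chat client's "load older messages" request. A minimal in-memory sketch, using a TreeMap in place of one partition and a hypothetical `olderThan` function, shows why the read direction matters once a limit is applied.

```kotlin
import java.util.TreeMap

// Hypothetical model of "load older messages" paging within one partition.
// Reading backward (like scanIndexForward = false) from the cursor yields the newest
// `limit` messages older than the cursor; reversing restores chronological order for display.
fun olderThan(messagesBySk: TreeMap<String, String>, cursorSk: String, limit: Int): List<String> =
    messagesBySk
        .headMap(cursorSk, false)  // like sortLessThan(cursorSk)
        .descendingMap()           // like scanIndexForward = false
        .values
        .take(limit)               // stop after `limit` items
        .reversed()                // oldest-to-newest for the client
```

With five messages and the cursor at the fifth, a limit of 2 returns the third and fourth messages. Reading forward instead would return the first two messages in the room, which is why the scan direction has to follow the comparison.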

CRUD: BATCH READ

  • Multiple items can be retrieved by primary key with a single BatchGetItem command. A request accepts at most 100 keys and must not contain duplicate keys.
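import software.amazon.awssdk.enhanced.dynamodb.Key
import software.amazon.awssdk.enhanced.dynamodb.model.ReadBatch

fun fetchAllByRoomIdAndMessageIds(
    roomId: Long?,
    messageIds: List<String> = emptyList()
): List<MessageDynamoDbBean> {

    roomId ?: return emptyList()
    if (messageIds.isEmpty()) return emptyList()

    // BatchGetItem accepts at most 100 keys per request and rejects duplicate keys
    return messageIds.distinct().chunked(100).flatMap { chunk ->
        val readBatchBuilder = ReadBatch
            .builder(MessageDynamoDbBean::class.java)
            .mappedTableResource(table)

        chunk.forEach { messageId ->
            readBatchBuilder.addGetItem(
                Key.builder()
                    .partitionValue("ROOM#$roomId")
                    .sortValue("MESSAGE#$messageId")
                    .build()
            )
        }

        // resultsForTable() iterates over all result pages; item order is not guaranteed
        dynamoDbEnhancedClient
            .batchGetItem { it.addReadBatch(readBatchBuilder.build()) }
            .resultsForTable(table)
            .toList()
    }
}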
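Since BatchGetItem caps a request at 100 keys and rejects duplicate keys, a key list coming from callers usually needs preparing first. A minimal sketch with a hypothetical `batchGetChunks` helper:

```kotlin
// Hypothetical helper: de-duplicate keys first (BatchGetItem rejects duplicates),
// then split them into requests of at most 100 keys each.
fun <K> batchGetChunks(keys: List<K>, maxPerRequest: Int = 100): List<List<K>> {
    require(maxPerRequest in 1..100) { "BatchGetItem allows at most 100 keys per request" }
    return keys.distinct().chunked(maxPerRequest)
}
```

For 251 IDs containing one duplicate, this yields three requests of 100, 100, and 50 keys.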

CRUD: INSERT OR REPLACE

  • The PutItem command can be executed as follows. If an item with the same Primary Key already exists, it replaces the existing item with the new one without throwing any exceptions.
fun save(message: MessageDynamoDbBean) {
    table.putItem(message)
}

CRUD: INSERT OR UPDATE

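  • UpdateItem also creates the item if it does not exist. If the item exists, it writes only the attributes mapped by the bean and keeps any other attributes of the stored item, instead of replacing the entire item as PutItem does. Note that by default the Enhanced Client removes attributes whose bean value is null; set ignoreNulls on the update request to leave them untouched.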
fun update(message: MessageDynamoDbBean): MessageDynamoDbBean {
    return table.updateItem(message)
}
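To make the difference between PutItem and UpdateItem concrete, here is a minimal model over plain maps. `putItem` and `updateItem` are hypothetical functions, not the SDK methods, and a null value stands for a bean field that is null. The `ignoreNulls` flag mirrors the Enhanced Client option of the same name, which defaults to false (null attributes are removed).

```kotlin
// Hypothetical model of PutItem: the stored item is replaced by the non-null attributes of the bean.
fun putItem(item: Map<String, Any?>): Map<String, Any> =
    buildMap<String, Any> { for ((name, value) in item) if (value != null) put(name, value) }

// Hypothetical model of UpdateItem: only the bean's attributes are touched.
fun updateItem(existing: Map<String, Any>?, item: Map<String, Any?>, ignoreNulls: Boolean): Map<String, Any> {
    val result = existing.orEmpty().toMutableMap()  // attributes not in the bean are kept
    for ((name, value) in item) {
        when {
            value != null -> result[name] = value    // SET
            !ignoreNulls -> result.remove(name)      // REMOVE (Enhanced Client default)
            else -> { }                              // ignoreNulls = true: left untouched
        }
    }
    return result
}
```

In the model, an attribute the bean does not know about (such as a legacy flag) survives an update but not a put, and a null customData field deletes the stored value unless nulls are ignored.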

CRUD: Atomic Counter

  • Operations like counting website visitors or post and video views must be atomic. The Enhanced Client's atomic counter feature stores such values so that concurrent requests cannot overwrite each other's increments.
// import software.amazon.awssdk.enhanced.dynamodb.extensions.annotations.DynamoDbAtomicCounter
@get:DynamoDbAtomicCounter(startValue = 0, delta = 1)
@get:DynamoDbAttribute("totalVisitCount")
var totalVisitCount: Long = 0,
  • Annotating an attribute with @DynamoDbAtomicCounter makes it an atomic counter. A bean can declare more than one atomic counter.

  • startValue is the value the counter takes when the item is first created (default 0). delta is the amount added each time updateItem() runs (default 1); for example, 1 counts up by one and -1 counts down by one.

  • Because the attribute always changes by its fixed delta, a single counter cannot count up in one scenario and down in another. In that case, keep each direction in a separate counter attribute (or a separate item).

  • The counter changes only when DynamoDbTable#updateItem runs; the increment is evaluated by DynamoDB itself as part of the update, which is what makes it atomic.
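Why a server-side counter matters can be seen in a deterministic model of two clients incrementing the same value. The names below are hypothetical; the second function stands in for an increment that DynamoDB evaluates itself through an update expression, which is what the atomic counter relies on.

```kotlin
// Hypothetical stored item holding a counter
class CounterItem(var value: Long)

// Client-side read-modify-write: both clients read before either writes
fun clientSideIncrements(item: CounterItem): Long {
    val readByA = item.value
    val readByB = item.value
    item.value = readByA + 1  // client A writes its result
    item.value = readByB + 1  // client B overwrites it: A's increment is lost
    return item.value
}

// Server-side increment: each request is applied to the current stored value, so none is lost
fun serverSideIncrements(item: CounterItem, delta: Long = 1): Long {
    repeat(2) { item.value += delta }
    return item.value
}
```

Starting from 10, the client-side version ends at 11 and the server-side version at 12. Outside the annotation, a plain UpdateItem request with an ADD update expression can apply a different delta on each call, which is one way around the fixed-delta limitation.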

CRUD: BATCH INSERTS OR REPLACE

  • The BatchWriteItem command can be executed as follows. The logic has to account for two caveats.

  • First, a single batch request can contain at most 25 items; a larger request fails with an exception.

  • Second, a batch request does not throw an exception for items that fail. Instead, the batch response exposes them through unprocessedPutItemsForTable() and unprocessedDeleteItemsForTable(), and you write retry logic for those items.

  • Batch processing, although more complex to implement than PutItem commands, significantly speeds up handling large quantities of items. (For instance, creating 100 million items with an average size of 175.22 bytes in on-demand mode takes about 25 minutes.)

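import software.amazon.awssdk.enhanced.dynamodb.model.BatchWriteItemEnhancedRequest
import software.amazon.awssdk.enhanced.dynamodb.model.WriteBatch

fun saveAll(messages: List<MessageDynamoDbBean>) {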

    // Divide the list of items into maximum batches of 25
    messages.chunked(25).forEach { aChunkOfMessages ->
        val writeBatchBuilder = WriteBatch
            .builder(MessageDynamoDbBean::class.java)
            .mappedTableResource(table)

        aChunkOfMessages.forEach { message ->
            writeBatchBuilder.addPutItem(message)
        }

        val batchWriteItemEnhancedRequest: BatchWriteItemEnhancedRequest = BatchWriteItemEnhancedRequest
            .builder()
            .writeBatches(writeBatchBuilder.build())
            .build()

        val batchWriteResult = dynamoDbEnhancedClient.batchWriteItem(batchWriteItemEnhancedRequest)

        // Process deletion for items failed to delete
        batchWriteResult.unprocessedDeleteItemsForTable(table).forEach { key ->
            table.deleteItem(key)
        }

        // Process creation for items failed to create
        batchWriteResult.unprocessedPutItemsForTable(table).forEach { item ->
            table.putItem(item)
        }
    }
}
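The retry loop above re-sends each unprocessed item individually. An alternative, in line with AWS's general guidance for unprocessed batch items, is to re-submit them as a batch with exponential backoff. A minimal sketch, assuming a hypothetical `writeBatch` function that wraps BatchWriteItem and returns the unprocessed items; `batchWriteWithRetry` is likewise hypothetical.

```kotlin
// Hypothetical helper: writes items in chunks of 25 and re-submits unprocessed items with
// exponential backoff. `writeBatch` stands in for a BatchWriteItem call and returns the
// items reported as unprocessed; `sleep` is injectable so the logic can be tested.
fun <T> batchWriteWithRetry(
    items: List<T>,
    writeBatch: (List<T>) -> List<T>,
    maxAttempts: Int = 5,
    baseDelayMillis: Long = 50,
    sleep: (Long) -> Unit = { Thread.sleep(it) }
): List<T> {
    val failed = mutableListOf<T>()
    for (chunk in items.chunked(25)) {
        var pending = chunk
        var attempt = 0
        while (pending.isNotEmpty() && attempt < maxAttempts) {
            if (attempt > 0) sleep(baseDelayMillis shl (attempt - 1))  // 50, 100, 200, ... ms
            pending = writeBatch(pending)
            attempt++
        }
        failed += pending  // still unprocessed after maxAttempts
    }
    return failed
}
```

The returned list holds items that never succeeded, so the caller can log them or fall back to individual PutItem calls.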

CRUD: DELETE

  • The DeleteItem command can be executed as below. It deletes the item whose primary key matches the key fields of the parameter and returns the deleted item (or null if no such item existed).
fun delete(message: MessageDynamoDbBean): MessageDynamoDbBean? {
    // Only the key attributes (pk, sk) of the bean are used to identify the item
    return table.deleteItem(message)
}

CRUD Exception Handling: List of Automatically Retried Exceptions

  • The SDK automatically retries certain DynamoDB exceptions, up to a predefined maximum number of attempts. The exceptions that trigger this retry logic are listed below. (All belong to the software.amazon.awssdk.services.dynamodb.model package.)
# 400
ItemCollectionSizeLimitExceededException
LimitExceededException
ProvisionedThroughputExceededException
RequestLimitExceededException

# 500
InternalServerErrorException

CRUD Exception Handling: 400 ProvisionedThroughputExceededException

  • If a table uses the provisioned billing mode, exceeding its configured read or write throughput causes a 400 ProvisionedThroughputExceededException. Under the SDK's retry policy, a single failed request can take almost 60 seconds before it finally fails, which can be critical in production. Auto scaling does not help right away either, because it takes time to react, so this error is hard to avoid during sudden traffic spikes.

  • The simplest solution is to switch the table's billing mode to on-demand, as shown below. The update runs without downtime but can take a considerable amount of time, and at the time of writing, switching to on-demand was allowed only once per day.

$ aws dynamodb update-table --table-name {table-name} --billing-mode PAY_PER_REQUEST

CRUD Exception Handling: InternalServerErrorException

  • When performing CRUD operations in repository beans, you might encounter the software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException, indicating a 500 Internal Server Error. This exception suggests an internal issue within AWS, and not a fault in the application itself.
software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException: Internal server error (Service: DynamoDb, Status Code: 500, Request ID: 236OI2K4PJSJK206OMTMV9DQF3VV4KQNSO5AEMVJF66Q9ASUAAJG, Extended Request ID: null)
  • If a PutItem, UpdateItem, or DeleteItem operation throws this exception, the write may or may not have been applied. Check the item with GetItem after the exception, then decide whether a retry is necessary.

  • If a TransactWriteItems operation throws this exception, the transaction did not complete, so retry logic can run immediately. (Supplying a ClientRequestToken makes such retries idempotent.)
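The check-before-retry advice for single-item writes can be sketched as a small generic helper. `putWithVerification` and its parameters are hypothetical; in a real repository, `put` would wrap `table.putItem`, `readBack` would wrap a strongly consistent GetItem (so the read reflects a write that did land), and `isAmbiguousFailure` would test for InternalServerErrorException.

```kotlin
// Hypothetical helper: after an ambiguous failure (e.g. a 500 from PutItem), read the item back
// and retry only if the write is not visible. Returns true once the item is known to be stored.
fun <T> putWithVerification(
    item: T,
    put: (T) -> Unit,
    readBack: () -> T?,
    isAmbiguousFailure: (Exception) -> Boolean,
    maxAttempts: Int = 3
): Boolean {
    repeat(maxAttempts) {
        try {
            put(item)
            return true
        } catch (e: Exception) {
            if (!isAmbiguousFailure(e)) throw e  // other errors are not this helper's concern
            // The write may or may not have been applied; a read tells us which
            if (readBack() == item) return true
        }
    }
    return false
}
```

If the first attempt actually stored the item before failing, the read-back detects it and no duplicate write is sent.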

References and Further Reading