Skip to main content
Version: v0.25.0 (Latest)

KV Store Persistence

The EDK extends the IDK's KvStore abstraction with database-backed implementations. Store key-value data in PostgreSQL, MySQL, or SQLite with full multi-tenant and multi-scope isolation.

Overview

The database KV store provides:

  • Multi-dialect support - PostgreSQL, MySQL, SQLite backends
  • Multi-scope isolation - APP, TENANT, PRINCIPAL, SESSION partitioning
  • TTL support - Automatic expiration of entries
  • Namespace isolation - Type-safe namespaces with codecs
  • IDK compatibility - Implements the same KvStore interface

Architecture

The database KV store implements the IDK's KvStoreListing interface:

KV Store Architecture

Data Model

KvEntryEntity

Each key-value entry is stored as:

data class KvEntryEntity(
val id: String, // Unique entry ID
val storeId: String, // KV store identifier
val tenantId: String, // Tenant isolation
val principalId: String?, // User isolation (null for APP/TENANT)
val sessionId: String?, // Session isolation (null except SESSION scope)
val namespace: String, // Namespace isolation
val key: String, // Key within namespace
val value: ByteArray, // Serialized value
val expiresAt: Instant?, // Expiration time (null = never)
val createdAt: Instant,
val updatedAt: Instant
)

Partition Key Structure

The partition key provides multi-level isolation:

ScopePartition Key Components
APP(storeId, tenantId, null, null, namespace, key)
TENANT(storeId, tenantId, null, null, namespace, key)
PRINCIPAL(storeId, tenantId, principalId, null, namespace, key)
SESSION(storeId, tenantId, principalId, sessionId, namespace, key)

Repository Interface

KvRepository

The core repository interface:

interface KvRepository {
// Read operations
suspend fun findEntry(
storeId: String,
tenantId: String,
principalId: String?,
sessionId: String?,
namespace: String,
key: String
): KvEntryEntity? // Returns null if expired or not found

suspend fun existsEntry(
storeId: String,
tenantId: String,
principalId: String?,
sessionId: String?,
namespace: String,
key: String
): Boolean // Excludes expired entries

suspend fun listKeys(
storeId: String,
tenantId: String,
principalId: String?,
sessionId: String?,
namespace: String
): List<String> // Non-expired keys only

suspend fun findAllEntries(
storeId: String,
tenantId: String,
principalId: String?,
sessionId: String?,
namespace: String
): List<KvEntryEntity> // Non-expired entries only

// Write operations
suspend fun upsertEntry(entity: KvEntryEntity)

suspend fun deleteEntry(
storeId: String,
tenantId: String,
principalId: String?,
sessionId: String?,
namespace: String,
key: String
): Boolean // Returns true if entry was deleted

suspend fun touchEntry(
storeId: String,
tenantId: String,
principalId: String?,
sessionId: String?,
namespace: String,
key: String,
newExpiresAt: Instant?
): Boolean // Update TTL without changing value

// Maintenance
suspend fun cleanupExpired(namespace: String?): Int // Returns count deleted
}

DatabaseKvStore

The main KV store implementation:

class DatabaseKvStore(
private val config: KvStoreConfig,
private val repositoryProvider: () -> KvRepository,
private val partitionKey: PartitionKey // Contains scope identifiers
) : KvStoreListing {

override suspend fun <V : Any> put(
namespace: KvNamespace<V>,
key: String,
value: V,
ttl: Duration
): IdkResult<KvPutResult, IdkError> {
return try {
val encoded = namespace.codec.encode(value)
val expiresAt = if (ttl.isInfinite()) null
else Clock.System.now() + ttl

val entity = KvEntryEntity(
id = UUID.randomUUID().toString(),
storeId = config.id,
tenantId = partitionKey.tenantId,
principalId = partitionKey.principalId,
sessionId = partitionKey.sessionId,
namespace = namespace.name,
key = key,
value = encoded,
expiresAt = expiresAt,
createdAt = Clock.System.now(),
updatedAt = Clock.System.now()
)

repository.upsertEntry(entity)

Ok(KvPutResult(
metadata = KvEntryMetadata(
createdAtEpochMillis = entity.createdAt.toEpochMilliseconds(),
expiresAtEpochMillis = entity.expiresAt?.toEpochMilliseconds()
)
))
} catch (e: Exception) {
Err(IdkError(
code = "KV_PUT_FAILED",
message = IdkError.Message(
i18nKey = "com.sphereon.kv.error.put-failed",
defaultMessage = "Failed to store value: ${e.message}"
)
))
}
}

override suspend fun <V : Any> get(
namespace: KvNamespace<V>,
key: String
): IdkResult<V?, IdkError> {
return try {
val entity = repository.findEntry(
storeId = config.id,
tenantId = partitionKey.tenantId,
principalId = partitionKey.principalId,
sessionId = partitionKey.sessionId,
namespace = namespace.name,
key = key
)

if (entity == null) {
Ok(null)
} else {
val decoded = namespace.codec.decode(entity.value)
Ok(decoded)
}
} catch (e: Exception) {
Err(IdkError(code = "KV_GET_FAILED", ...))
}
}

// ... other KvStore methods
}

Usage Examples

Basic Operations

import com.sphereon.data.store.kv.*
import kotlinx.serialization.Serializable
import kotlin.time.Duration.Companion.hours

@Serializable
data class CacheEntry(
val data: String,
val cachedAt: Long
)

// Define a namespace with JSON codec
val cacheNamespace = KvNamespace(
name = "api.cache",
codec = KotlinxSerializationJsonKvCodec(
json = Json { ignoreUnknownKeys = true },
serializer = CacheEntry.serializer()
)
)

// Store a value with TTL
val result = kvStore.put(
namespace = cacheNamespace,
key = "user-profile-123",
value = CacheEntry(
data = """{"name": "Alice"}""",
cachedAt = System.currentTimeMillis()
),
ttl = 1.hours
)

when (result) {
is IdkResult.Success -> {
println("Stored, expires at: ${result.value.metadata.expiresAtEpochMillis}")
}
is IdkResult.Failure -> {
println("Failed: ${result.error.message}")
}
}

// Retrieve the value
val getResult = kvStore.get(cacheNamespace, "user-profile-123")

when (getResult) {
is IdkResult.Success -> {
val entry = getResult.value
if (entry != null) {
println("Cached data: ${entry.data}")
} else {
println("Not found or expired")
}
}
is IdkResult.Failure -> {
println("Error: ${getResult.error}")
}
}

Multi-Tenant Usage

@Singleton
class TenantCacheService @Inject constructor(
private val kvStoreService: KvStoreService
) {
private val cacheNamespace = KvNamespace(
name = "tenant.cache",
codec = StringKvCodec()
)

suspend fun getCachedValue(tenantId: String, key: String): String? {
// Get tenant-scoped KV store
val store = kvStoreService.getStoreForTenant(tenantId)

return store.get(cacheNamespace, key)
.getOrNull()
}

suspend fun setCachedValue(tenantId: String, key: String, value: String, ttl: Duration) {
val store = kvStoreService.getStoreForTenant(tenantId)

store.put(cacheNamespace, key, value, ttl)
.getOrThrow()
}
}

Session-Scoped Storage

@Singleton
class SessionDataService @Inject constructor(
private val kvStoreService: KvStoreService
) {
private val sessionNamespace = KvNamespace(
name = "session.data",
codec = KotlinxSerializationJsonKvCodec(
json = Json,
serializer = SessionData.serializer()
)
)

suspend fun getSessionData(sessionId: String): SessionData? {
// Get session-scoped KV store
val store = kvStoreService.getStoreForSession(sessionId)

return store.get(sessionNamespace, "current")
.getOrNull()
}

suspend fun setSessionData(sessionId: String, data: SessionData) {
val store = kvStoreService.getStoreForSession(sessionId)

// Session data expires with session (30 minutes)
store.put(sessionNamespace, "current", data, 30.minutes)
.getOrThrow()
}
}

Listing and Cleanup

// List all keys in namespace
val keysResult = kvStore.listKeys(cacheNamespace)

when (keysResult) {
is IdkResult.Success -> {
keysResult.value.forEach { key ->
println("Key: $key")
}
}
}

// Get all entries with metadata
val entriesResult = kvStore.getAllEntries(cacheNamespace)

when (entriesResult) {
is IdkResult.Success -> {
entriesResult.value.forEach { (key, entry) ->
println("$key: ${entry.value}, expires: ${entry.metadata.expiresAtEpochMillis}")
}
}
}

// Clean up expired entries
val cleanupResult = kvStore.cleanupExpired(cacheNamespace)

when (cleanupResult) {
is IdkResult.Success -> {
println("Cleaned up ${cleanupResult.value} expired entries")
}
}

Database Schema

PostgreSQL

CREATE TABLE kv_entry (
id TEXT PRIMARY KEY,
store_id TEXT NOT NULL,
tenant_id TEXT NOT NULL,
principal_id TEXT,
session_id TEXT,
namespace TEXT NOT NULL,
key TEXT NOT NULL,
value BYTEA NOT NULL,
expires_at TIMESTAMP WITH TIME ZONE,
created_at TIMESTAMP WITH TIME ZONE NOT NULL,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL,
UNIQUE (store_id, tenant_id, principal_id, session_id, namespace, key)
);

CREATE INDEX idx_kv_entry_lookup ON kv_entry(store_id, tenant_id, principal_id, session_id, namespace);
CREATE INDEX idx_kv_entry_expires ON kv_entry(expires_at) WHERE expires_at IS NOT NULL;

MySQL

Uses DATETIME(6) for timestamps and BLOB for value storage.

SQLite

Uses TEXT for timestamps and BLOB for value storage.

Configuration

Backend Identifiers

object DatabaseKvStoreBackends {
const val DATABASE = "database" // Generic database backend
const val POSTGRESQL = "postgresql" // PostgreSQL-specific
const val MYSQL = "mysql" // MySQL-specific
const val SQLITE = "sqlite" // SQLite-specific
}

Store Configuration

val config = KvStoreConfig(
id = "cache-store",
scopeBinding = KvStoreScopeBinding.TENANT, // Partition by tenant
backendId = DatabaseKvStoreBackends.POSTGRESQL,
enabled = true,
order = 0
)

Properties Configuration

sphereon:
kv:
stores:
cache:
backend: postgresql
scope-binding: TENANT
enabled: true

session:
backend: postgresql
scope-binding: SESSION
enabled: true

Scope Bindings

BindingPartition ByUse Case
APPApplication onlyGlobal cache, feature flags
TENANTTenant IDTenant-specific data
PRINCIPAL_TENANTTenant + PrincipalUser data within tenant
SESSIONTenant + Principal + SessionSession-specific state

Best Practices

Choose appropriate scope binding - Use the most restrictive scope that meets your needs for proper data isolation.

Set meaningful TTLs - Always set TTL for temporary data. Use Duration.INFINITE only for permanent data.

Run periodic cleanup - Schedule cleanupExpired() to remove expired entries if your database doesn't support automatic TTL enforcement.

Use typed namespaces - Define namespaces with proper codecs for type safety and consistent serialization.

Handle errors with IdkResult - All operations return IdkResult; handle both success and failure cases.

Monitor storage growth - Track namespace sizes and implement cleanup policies for high-volume stores.