KV Store Persistence
The EDK extends the IDK's KvStore abstraction with database-backed implementations. Store key-value data in PostgreSQL, MySQL, or SQLite with full multi-tenant and multi-scope isolation.
Overview
The database KV store provides:
- Multi-dialect support - PostgreSQL, MySQL, SQLite backends
- Multi-scope isolation - APP, TENANT, PRINCIPAL, SESSION partitioning
- TTL support - Automatic expiration of entries
- Namespace isolation - Type-safe namespaces with codecs
- IDK compatibility - Implements the same KvStore interface
Architecture
The database KV store implements the IDK's KvStoreListing interface and delegates persistence to a KvRepository.
Data Model
KvEntryEntity
Each key-value entry is stored as:
data class KvEntryEntity(
val id: String, // Unique entry ID
val storeId: String, // KV store identifier
val tenantId: String, // Tenant isolation
val principalId: String?, // User isolation (null for APP/TENANT)
val sessionId: String?, // Session isolation (null except SESSION scope)
val namespace: String, // Namespace isolation
val key: String, // Key within namespace
val value: ByteArray, // Serialized value
val expiresAt: Instant?, // Expiration time (null = never)
val createdAt: Instant,
val updatedAt: Instant
)
Partition Key Structure
The partition key provides multi-level isolation:
| Scope | Partition Key Components |
|---|---|
| APP | (storeId, tenantId, null, null, namespace, key) |
| TENANT | (storeId, tenantId, null, null, namespace, key) |
| PRINCIPAL | (storeId, tenantId, principalId, null, namespace, key) |
| SESSION | (storeId, tenantId, principalId, sessionId, namespace, key) |
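The table above can be read as a rule for which identifiers survive at each scope. Here is a minimal sketch of that rule, assuming hypothetical `SketchScope` and `SketchPartition` types (the EDK's own `PartitionKey` may be shaped differently) and leaving out `storeId`, `namespace`, and `key`, which are present at every scope:

```kotlin
// Hypothetical names for illustration; the EDK's own PartitionKey type may differ.
enum class SketchScope { APP, TENANT, PRINCIPAL, SESSION }

data class SketchPartition(
    val tenantId: String,
    val principalId: String?,
    val sessionId: String?
)

// Keep only the identifiers the scope partitions by and null out the rest,
// mirroring the partition key table.
fun partitionFor(
    scope: SketchScope,
    tenantId: String,
    principalId: String? = null,
    sessionId: String? = null
): SketchPartition = when (scope) {
    SketchScope.APP, SketchScope.TENANT ->
        SketchPartition(tenantId, null, null)
    SketchScope.PRINCIPAL ->
        SketchPartition(
            tenantId,
            requireNotNull(principalId) { "PRINCIPAL scope needs a principalId" },
            null
        )
    SketchScope.SESSION ->
        SketchPartition(
            tenantId,
            requireNotNull(principalId) { "SESSION scope needs a principalId" },
            requireNotNull(sessionId) { "SESSION scope needs a sessionId" }
        )
}
```

Nulling out the unused identifiers, rather than storing whatever the caller happens to pass, means two requests from different sessions of the same tenant resolve to the same TENANT-scoped row.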
Repository Interface
KvRepository
The core repository interface:
interface KvRepository {
// Read operations
suspend fun findEntry(
storeId: String,
tenantId: String,
principalId: String?,
sessionId: String?,
namespace: String,
key: String
): KvEntryEntity? // Returns null if expired or not found
suspend fun existsEntry(
storeId: String,
tenantId: String,
principalId: String?,
sessionId: String?,
namespace: String,
key: String
): Boolean // Excludes expired entries
suspend fun listKeys(
storeId: String,
tenantId: String,
principalId: String?,
sessionId: String?,
namespace: String
): List<String> // Non-expired keys only
suspend fun findAllEntries(
storeId: String,
tenantId: String,
principalId: String?,
sessionId: String?,
namespace: String
): List<KvEntryEntity> // Non-expired entries only
// Write operations
suspend fun upsertEntry(entity: KvEntryEntity)
suspend fun deleteEntry(
storeId: String,
tenantId: String,
principalId: String?,
sessionId: String?,
namespace: String,
key: String
): Boolean // Returns true if entry was deleted
suspend fun touchEntry(
storeId: String,
tenantId: String,
principalId: String?,
sessionId: String?,
namespace: String,
key: String,
newExpiresAt: Instant?
): Boolean // Update TTL without changing value
// Maintenance
suspend fun cleanupExpired(namespace: String?): Int // Returns count deleted
}
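The expiry semantics in the comments above (expired entries read as absent, `touchEntry` changes only the TTL, `cleanupExpired` removes rows physically) can be illustrated with an in-memory stand-in. This is a simplified sketch rather than the EDK implementation: the class and method names are hypothetical, the functions are not suspending, the scope identifiers are omitted, and times are epoch milliseconds. It also assumes that an entry is expired once `expiresAt <= now`, that touching an expired entry returns false, and that a null namespace means "all namespaces".

```kotlin
// Simplified, non-suspending stand-in for illustration only; names are hypothetical.
class SketchEntry(val value: ByteArray, val expiresAtMillis: Long?)

class InMemoryKvSketch(private val nowMillis: () -> Long) {
    // Composite key: (namespace, key). Scope identifiers are omitted for brevity.
    private val rows = HashMap<Pair<String, String>, SketchEntry>()

    private fun SketchEntry.isExpired(): Boolean =
        expiresAtMillis?.let { it <= nowMillis() } ?: false

    // Inserts or overwrites the row for (namespace, key).
    fun upsert(namespace: String, key: String, value: ByteArray, expiresAtMillis: Long?) {
        rows[namespace to key] = SketchEntry(value, expiresAtMillis)
    }

    // Expired rows are treated as absent even before they are physically removed.
    fun find(namespace: String, key: String): SketchEntry? =
        rows[namespace to key]?.takeUnless { it.isExpired() }

    // Lists only the keys of live entries in the namespace.
    fun listKeys(namespace: String): List<String> =
        rows.filter { (k, v) -> k.first == namespace && !v.isExpired() }
            .map { it.key.second }

    // Changes only the expiry; returns false when there is no live entry to touch.
    fun touch(namespace: String, key: String, newExpiresAtMillis: Long?): Boolean {
        val current = find(namespace, key) ?: return false
        rows[namespace to key] = SketchEntry(current.value, newExpiresAtMillis)
        return true
    }

    // Physically removes expired rows and returns how many were deleted.
    fun cleanupExpired(namespace: String?): Int {
        val doomed = rows.filter { (k, v) ->
            (namespace == null || k.first == namespace) && v.isExpired()
        }.keys
        doomed.forEach { rows.remove(it) }
        return doomed.size
    }
}
```

Injecting the clock as a function makes the expiry behaviour testable without waiting for real time to pass.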
DatabaseKvStore
The main KV store implementation:
class DatabaseKvStore(
private val config: KvStoreConfig,
private val repositoryProvider: () -> KvRepository,
private val partitionKey: PartitionKey // Contains scope identifiers
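) : KvStoreListing {
// Resolve the repository through the provider on each access
private val repository: KvRepository get() = repositoryProvider()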
override suspend fun <V : Any> put(
namespace: KvNamespace<V>,
key: String,
value: V,
ttl: Duration
): IdkResult<KvPutResult, IdkError> {
return try {
val encoded = namespace.codec.encode(value)
val now = Clock.System.now() // Single timestamp so createdAt, updatedAt and expiresAt agree
val expiresAt = if (ttl.isInfinite()) null
else now + ttl
val entity = KvEntryEntity(
id = UUID.randomUUID().toString(),
storeId = config.id,
tenantId = partitionKey.tenantId,
principalId = partitionKey.principalId,
sessionId = partitionKey.sessionId,
namespace = namespace.name,
key = key,
value = encoded,
expiresAt = expiresAt,
createdAt = now,
updatedAt = now
)
repository.upsertEntry(entity)
Ok(KvPutResult(
metadata = KvEntryMetadata(
createdAtEpochMillis = entity.createdAt.toEpochMilliseconds(),
expiresAtEpochMillis = entity.expiresAt?.toEpochMilliseconds()
)
))
} catch (e: Exception) {
Err(IdkError(
code = "KV_PUT_FAILED",
message = IdkError.Message(
i18nKey = "com.sphereon.kv.error.put-failed",
defaultMessage = "Failed to store value: ${e.message}"
)
))
}
}
override suspend fun <V : Any> get(
namespace: KvNamespace<V>,
key: String
): IdkResult<V?, IdkError> {
return try {
val entity = repository.findEntry(
storeId = config.id,
tenantId = partitionKey.tenantId,
principalId = partitionKey.principalId,
sessionId = partitionKey.sessionId,
namespace = namespace.name,
key = key
)
if (entity == null) {
Ok(null)
} else {
val decoded = namespace.codec.decode(entity.value)
Ok(decoded)
}
} catch (e: Exception) {
Err(IdkError(code = "KV_GET_FAILED", ...))
}
}
// ... other KvStore methods
}
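The TTL handling in `put` comes down to one rule: an infinite TTL means no expiry, and any other TTL is added to the current time. Here is a dependency-free sketch of that rule, assuming a hypothetical `expiresAtMillis` helper that uses epoch milliseconds in place of `Instant`. Rejecting negative TTLs is an extra guard in this sketch, not something the source code does.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes

// Hypothetical helper: epoch milliseconds stand in for Instant.
fun expiresAtMillis(nowMillis: Long, ttl: Duration): Long? {
    // Extra guard in this sketch: a negative TTL is almost certainly a caller bug.
    require(!ttl.isNegative()) { "ttl must not be negative" }
    // An infinite TTL maps to "never expires", stored as null.
    return if (ttl.isInfinite()) null else nowMillis + ttl.inWholeMilliseconds
}

// Example: a 30-minute TTL starting at epoch 0 expires at 1_800_000 ms.
val thirtyMinuteExpiry: Long? = expiresAtMillis(nowMillis = 0L, ttl = 30.minutes)
```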
Usage Examples
Basic Operations
import com.sphereon.data.store.kv.*
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import kotlin.time.Duration.Companion.hours
@Serializable
data class CacheEntry(
val data: String,
val cachedAt: Long
)
// Define a namespace with JSON codec
val cacheNamespace = KvNamespace(
name = "api.cache",
codec = KotlinxSerializationJsonKvCodec(
json = Json { ignoreUnknownKeys = true },
serializer = CacheEntry.serializer()
)
)
// Store a value with TTL
val result = kvStore.put(
namespace = cacheNamespace,
key = "user-profile-123",
value = CacheEntry(
data = """{"name": "Alice"}""",
cachedAt = System.currentTimeMillis()
),
ttl = 1.hours
)
when (result) {
is IdkResult.Success -> {
println("Stored, expires at: ${result.value.metadata.expiresAtEpochMillis}")
}
is IdkResult.Failure -> {
println("Failed: ${result.error.message}")
}
}
// Retrieve the value
val getResult = kvStore.get(cacheNamespace, "user-profile-123")
when (getResult) {
is IdkResult.Success -> {
val entry = getResult.value
if (entry != null) {
println("Cached data: ${entry.data}")
} else {
println("Not found or expired")
}
}
is IdkResult.Failure -> {
println("Error: ${getResult.error}")
}
}
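The namespace in this example pairs a name with a codec, and the codec turns typed values into the bytes stored in the `value` column and back. The sketch below shows that contract with hypothetical `SketchCodec` and `SketchNamespace` types. The IDK's real codec and namespace types may expose different members, and the hand-written `CounterCodec` only stands in for a serialization library.

```kotlin
// Hypothetical stand-ins; the IDK's own codec and namespace types may differ.
interface SketchCodec<V : Any> {
    fun encode(value: V): ByteArray
    fun decode(bytes: ByteArray): V
}

// Plain strings are stored as UTF-8 bytes.
object Utf8StringCodec : SketchCodec<String> {
    override fun encode(value: String): ByteArray = value.encodeToByteArray()
    override fun decode(bytes: ByteArray): String = bytes.decodeToString()
}

data class Counter(val name: String, val count: Int)

// Hand-written codec for a small record. The count goes first so that
// a name containing line breaks still decodes correctly.
object CounterCodec : SketchCodec<Counter> {
    override fun encode(value: Counter): ByteArray =
        "${value.count}\n${value.name}".encodeToByteArray()

    override fun decode(bytes: ByteArray): Counter {
        val (count, name) = bytes.decodeToString().split('\n', limit = 2)
        return Counter(name, count.toInt())
    }
}

// The type parameter ties a namespace name to exactly one value type,
// so a read from this namespace can only yield values of type V.
data class SketchNamespace<V : Any>(val name: String, val codec: SketchCodec<V>)
```

Because the value type travels with the namespace, passing a `String` where a `Counter` namespace is expected fails at compile time instead of producing undecodable bytes at runtime.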
Multi-Tenant Usage
@Singleton
class TenantCacheService @Inject constructor(
private val kvStoreService: KvStoreService
) {
private val cacheNamespace = KvNamespace(
name = "tenant.cache",
codec = StringKvCodec()
)
suspend fun getCachedValue(tenantId: String, key: String): String? {
// Get tenant-scoped KV store
val store = kvStoreService.getStoreForTenant(tenantId)
return store.get(cacheNamespace, key)
.getOrNull()
}
suspend fun setCachedValue(tenantId: String, key: String, value: String, ttl: Duration) {
val store = kvStoreService.getStoreForTenant(tenantId)
store.put(cacheNamespace, key, value, ttl)
.getOrThrow()
}
}
Session-Scoped Storage
@Singleton
class SessionDataService @Inject constructor(
private val kvStoreService: KvStoreService
) {
private val sessionNamespace = KvNamespace(
name = "session.data",
codec = KotlinxSerializationJsonKvCodec(
json = Json,
serializer = SessionData.serializer()
)
)
suspend fun getSessionData(sessionId: String): SessionData? {
// Get session-scoped KV store
val store = kvStoreService.getStoreForSession(sessionId)
return store.get(sessionNamespace, "current")
.getOrNull()
}
suspend fun setSessionData(sessionId: String, data: SessionData) {
val store = kvStoreService.getStoreForSession(sessionId)
// Fixed 30-minute TTL; keep it in line with your session timeout
store.put(sessionNamespace, "current", data, 30.minutes)
.getOrThrow()
}
}
Listing and Cleanup
// List all keys in namespace
val keysResult = kvStore.listKeys(cacheNamespace)
when (keysResult) {
is IdkResult.Success -> {
keysResult.value.forEach { key ->
println("Key: $key")
}
}
is IdkResult.Failure -> {
println("Failed to list keys: ${keysResult.error}")
}
}
// Get all entries with metadata
val entriesResult = kvStore.getAllEntries(cacheNamespace)
when (entriesResult) {
is IdkResult.Success -> {
entriesResult.value.forEach { (key, entry) ->
println("$key: ${entry.value}, expires: ${entry.metadata.expiresAtEpochMillis}")
}
}
is IdkResult.Failure -> {
println("Failed to load entries: ${entriesResult.error}")
}
}
// Clean up expired entries
val cleanupResult = kvStore.cleanupExpired(cacheNamespace)
when (cleanupResult) {
is IdkResult.Success -> {
println("Cleaned up ${cleanupResult.value} expired entries")
}
is IdkResult.Failure -> {
println("Cleanup failed: ${cleanupResult.error}")
}
}
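When several namespaces need cleaning, a single failure should not stop the rest of the sweep. A small sketch of that pattern follows. The helper names are hypothetical, and the `cleanup` parameter stands in for a call such as `kvStore.cleanupExpired(namespace)`, reduced here to "return the number of rows deleted or throw".

```kotlin
// Hypothetical helper. `cleanup` stands in for a call such as
// kvStore.cleanupExpired(namespace), reduced to "return rows deleted or throw".
fun sweepNamespaces(
    namespaces: List<String>,
    cleanup: (String) -> Int
): Map<String, Result<Int>> =
    // runCatching isolates each namespace, so one failure does not stop the sweep.
    namespaces.associateWith { ns -> runCatching { cleanup(ns) } }

// Sums the rows deleted by the namespaces that succeeded.
fun totalDeleted(report: Map<String, Result<Int>>): Int =
    report.values.mapNotNull { it.getOrNull() }.sum()

// Names of the namespaces whose cleanup threw.
fun failedNamespaces(report: Map<String, Result<Int>>): List<String> =
    report.filterValues { it.isFailure }.keys.toList()
```

Keeping the per-namespace outcome makes it easy to log or retry only the namespaces that failed.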
Database Schema
PostgreSQL
CREATE TABLE kv_entry (
id TEXT PRIMARY KEY,
store_id TEXT NOT NULL,
tenant_id TEXT NOT NULL,
principal_id TEXT,
session_id TEXT,
namespace TEXT NOT NULL,
key TEXT NOT NULL,
value BYTEA NOT NULL,
expires_at TIMESTAMP WITH TIME ZONE,
created_at TIMESTAMP WITH TIME ZONE NOT NULL,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL,
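-- Note: PostgreSQL treats NULLs as distinct in UNIQUE constraints by default, so rows with a
-- NULL principal_id or session_id are not deduplicated by this constraint alone
-- (PostgreSQL 15+ accepts UNIQUE NULLS NOT DISTINCT).
UNIQUE (store_id, tenant_id, principal_id, session_id, namespace, key)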
);
CREATE INDEX idx_kv_entry_lookup ON kv_entry(store_id, tenant_id, principal_id, session_id, namespace);
CREATE INDEX idx_kv_entry_expires ON kv_entry(expires_at) WHERE expires_at IS NOT NULL;
MySQL
Uses DATETIME(6) for timestamps and BLOB for value storage.
SQLite
Uses TEXT for timestamps and BLOB for value storage.
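The per-dialect differences described above come down to two column types. A compact sketch that captures them in one place follows. The `KvDialectSketch` enum is hypothetical and not part of the EDK; the type names are taken from the schema notes in this section.

```kotlin
// Column types taken from the schema notes in this section; the enum itself is hypothetical.
enum class KvDialectSketch(val timestampType: String, val binaryType: String) {
    POSTGRESQL("TIMESTAMP WITH TIME ZONE", "BYTEA"),
    MYSQL("DATETIME(6)", "BLOB"),
    SQLITE("TEXT", "BLOB");

    // Renders only the columns whose types vary by dialect.
    fun variableColumns(): String = """
        value $binaryType NOT NULL,
        expires_at $timestampType,
        created_at $timestampType NOT NULL,
        updated_at $timestampType NOT NULL
    """.trimIndent()
}
```

For example, `KvDialectSketch.SQLITE.variableColumns()` yields the four column definitions with `BLOB` and `TEXT` substituted.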
Configuration
Backend Identifiers
object DatabaseKvStoreBackends {
const val DATABASE = "database" // Generic database backend
const val POSTGRESQL = "postgresql" // PostgreSQL-specific
const val MYSQL = "mysql" // MySQL-specific
const val SQLITE = "sqlite" // SQLite-specific
}
Store Configuration
val config = KvStoreConfig(
id = "cache-store",
scopeBinding = KvStoreScopeBinding.TENANT, // Partition by tenant
backendId = DatabaseKvStoreBackends.POSTGRESQL,
enabled = true,
order = 0
)
Properties Configuration
sphereon:
kv:
stores:
cache:
backend: postgresql
scope-binding: TENANT
enabled: true
session:
backend: postgresql
scope-binding: SESSION
enabled: true
Scope Bindings
| Binding | Partition By | Use Case |
|---|---|---|
| APP | Application only | Global cache, feature flags |
| TENANT | Tenant ID | Tenant-specific data |
| PRINCIPAL_TENANT | Tenant + Principal | User data within tenant |
| SESSION | Tenant + Principal + Session | Session-specific state |
Best Practices
- Choose an appropriate scope binding - Use the most restrictive scope that meets your needs, so data stays properly isolated.
- Set meaningful TTLs - Always set a TTL for temporary data. Use Duration.INFINITE only for permanent data.
- Run periodic cleanup - Reads already skip expired entries, but the rows stay in the table until they are deleted. Schedule cleanupExpired() to remove them.
- Use typed namespaces - Define namespaces with proper codecs for type safety and consistent serialization.
- Handle errors with IdkResult - All operations return IdkResult; handle both the success and the failure case.
- Monitor storage growth - Track namespace sizes and implement cleanup policies for high-volume stores.