Skip to content
AGNT

Backend · Core modules

Session store.

The Redis-backed chat history store. Every conversation is encrypted at rest with Fernet, scoped to a (platform, scope_id)tuple so Telegram and WhatsApp don't leak into each other, and caps at 20 messages in Redis with the last 12 fed into each LLM call. File: agnt-backend/app/core/session_store.py.

Constants

Four numbers define the session store's behaviour. All four live at the top of the file so tuning is a single-file operation.

pythonapp/core/session_store.py
CONV_TTL = 86400        # 24 hours
MAX_MESSAGES = 20       # Redis storage cap
RETURN_MESSAGES = 12    # sent into LLM context per turn
MAX_APPEND_RETRIES = 5
  • CONV_TTL = 86400— conversations expire after 24 hours of inactivity. The DB fallback handles cold reads beyond that.
  • MAX_MESSAGES = 20— hard cap in Redis. Older messages are trimmed on append to keep the blob small.
  • RETURN_MESSAGES = 12— the last 12 messages are fed into the LLM context per turn. The other 8 live in Redis for audit but don't influence new replies.
  • MAX_APPEND_RETRIES = 5— optimistic concurrency via Redis WATCH. If a parallel write hits the same key, the append retries up to five times before giving up.

Key format and injection protection

Keys are of the form conv:{platform}:{scope_id}. Both segments are sanitised to reject a literal colon — otherwise a user could set their display name to "alice:conv:bob"and read another user's session. Sanitisation happens by raising rather than replacing so callers can't silently drift past the check.

pythonapp/core/session_store.py — key builders
def _sanitize_scope(value: str) -> str:
    """Reject scope/platform values containing ':' to prevent Redis key injection."""
    if ":" in value:
        raise ValueError(f"Invalid scope value (contains ':'): {value!r}")
    return value


def _key(scope_id: str, platform: str = "default") -> str:
    """Build a platform-scoped Redis key for conversation history."""
    return f"conv:{_sanitize_scope(platform)}:{_sanitize_scope(scope_id)}"

Encryption at rest

The blob stored in Redis is a Fernet ciphertext of the JSON-encoded message list. The key comes from ENCRYPTION_KEY viaapp/core/crypto.py. If the backend boots without ENCRYPTION_KEYin any non-dev environment it hard-fails on startup — PII in plaintext is not acceptable outside local dev.

Legacy unencrypted blobs (from before the encryption rollout) get deleted on read and the caller sees an empty history, which the app handles gracefully by starting a new conversation.

DB fallback

On a Redis miss, get_history falls back to the conversation_snapshotstable. If a snapshot exists, it's decrypted, restored to the Redis cache with a fresh TTL, and returned. This means users who go quiet for more than a day can still resume a conversation the next morning without losing context.

pythonapp/core/session_store.py — get_history
async def get_history(scope_id: str, platform: str = "default") -> list[LLMMessage]:
    r = await get_redis()
    key = _key(scope_id, platform)
    raw = await r.get(key)
    if not raw:
        # Redis cache miss — try to load from DB
        try:
            from app.db.session import async_session
            from sqlalchemy import text
            async with async_session() as db:
                result = await db.execute(
                    text(
                        "SELECT messages_json FROM conversation_snapshots "
                        "WHERE scope_id = :sid AND platform = :plat"
                    ),
                    {"sid": scope_id, "plat": platform},
                )
                row = result.fetchone()
                if row and row[0]:
                    plaintext = decrypt(row[0] if isinstance(row[0], bytes) else row[0].encode())
                    messages = json.loads(plaintext)
                    # Restore to Redis cache
                    encrypted = encrypt(json.dumps(messages))
                    await r.set(key, encrypted, ex=CONV_TTL)
                    return [LLMMessage(**m) for m in messages]
        except Exception:
            logger.warning("DB fallback failed for conversation %s", key)
        return []
    try:
        plaintext = decrypt(raw if isinstance(raw, bytes) else raw.encode())
    except Exception:
        # Legacy unencrypted data or corrupted blob — clear and start fresh
        await r.delete(key)
        return []
    data = json.loads(plaintext)
    messages = [LLMMessage.model_validate(m) for m in data]
    return messages[-RETURN_MESSAGES:]

Append with optimistic concurrency

Writes use Redis WATCH + MULTIto handle concurrent appends from different webhook workers. If another process updates the key between the read and the write, the transaction aborts and retries up to five times. Failures past five retries log a warning and drop the write — rare in practice because conversations are inherently serial per user.

Wiping a session

Users can wipe their agent memory from the profile screen. That flow calls a helper which deletes both the Redis key and the conversation_snapshots row. The next message the user sends starts from scratch.

Related