# arandu — Complete Documentation
> This file contains the complete documentation for the arandu SDK, a long-term memory system for AI agents. Generated automatically from the source docs.
---
# arandu
**Long-term memory for AI agents.** Extract facts from conversations, resolve entities, reconcile knowledge over time, and retrieve relevant context - all backed by PostgreSQL and pgvector.
> *The name "Arandu" comes from the Guarani word meaning "wisdom acquired through experience" - literally "listening to time." Just as the Guarani concept describes knowledge built through lived experience, Arandu gives your AI agent the ability to accumulate, consolidate, and recall knowledge over time.*
---
## Why arandu?
Most AI agents are stateless. They forget everything between sessions. `arandu` gives your agent a persistent, structured memory that grows smarter over time:
- **Automatic fact extraction** - The write pipeline uses LLMs to extract entities, facts, and relationships from natural language.
- **Entity resolution** - Recognizes that "my wife Ana", "Ana", and "her" all refer to the same person, using a 3-phase resolver (exact → fuzzy → LLM).
- **Knowledge reconciliation** - Decides whether new information should ADD, UPDATE, or DELETE existing facts. No duplicates, no stale data.
- **Multi-signal retrieval** - Combines semantic search (pgvector), keyword matching, graph traversal, and recency scoring to find the most relevant facts.
- **Background maintenance** - Clustering, consolidation, and importance scoring keep memory organized and fresh - like how your brain consolidates during sleep.
- **Provider-agnostic** - Bring your own LLM and embedding provider via simple Python protocols. OpenAI and Anthropic (Claude) providers included.
## Installation
```bash
pip install arandu
```
With OpenAI support (recommended):
```bash
pip install arandu[openai]
```
### Requirements
- Python 3.11+
- PostgreSQL with the [pgvector](https://github.com/pgvector/pgvector) extension
## Quick Start
```python
import asyncio

from arandu import MemoryClient
from arandu.providers.openai import OpenAIProvider


async def main():
    # 1. Set up providers
    provider = OpenAIProvider(api_key="sk-...")

    # 2. Create client
    memory = MemoryClient(
        database_url="postgresql+psycopg://user:pass@localhost/mydb",
        llm=provider,
        embeddings=provider,
    )

    # 3. Initialize tables (idempotent)
    await memory.initialize()

    # 4. Write — extracts facts automatically
    result = await memory.write(
        agent_id="user_123",
        message="I live in São Paulo and work at Acme Corp as a backend engineer.",
        speaker_name="Rafael",  # required — resolves "I" to this speaker
    )
    print(f"Added {len(result.facts_added)} facts, resolved {len(result.entities_resolved)} entities")

    # 5. Retrieve — finds relevant context
    context = await memory.retrieve(
        agent_id="user_123",
        query="where does the user live and work?",
    )
    print(context.context)

    # 6. Cleanup
    await memory.close()


asyncio.run(main())
```
## How It Works
### Write Pipeline
```
Message → Extract (LLM) → Resolve Entities → Reconcile → Upsert
```
Every message goes through four stages: the LLM extracts structured facts, entities are resolved to canonical records, new facts are reconciled against existing knowledge, and decisions (ADD/UPDATE/NOOP/DELETE) are executed.
→ [Learn more about the Write Pipeline](concepts/write-pipeline.md)
### Read Pipeline
```
Query → Plan (deterministic) → Retrieve (semantic + keyword + graph) → Rerank → Format
```
Queries go through a deterministic planner that decides retrieval strategy (no LLM call), then three parallel signals are merged, optionally reranked, and compressed into a context string.
→ [Learn more about the Read Pipeline](concepts/read-pipeline.md)
### Background Jobs
```
Clustering → Consolidation → Importance Scoring → Summary Refresh
```
Periodic background jobs keep memory organized and fresh - like sleep-time processing in the brain.
→ [Learn more about Background Jobs](concepts/background-jobs.md)
## Architecture
`arandu` is designed around three principles:
1. **Protocol-based DI** - LLM and embedding providers are injected via `typing.Protocol`. No vendor lock-in.
2. **Fail-safe by default** - Every LLM call has timeouts and fallbacks. A failed extraction still logs the event. A failed reconciliation defaults to ADD.
3. **Composition over inheritance** - Small, focused modules composed into pipelines. No deep class hierarchies.
→ [Learn more about the Design Philosophy](concepts/design-philosophy.md)
## Next Steps
- **Getting Started** - Full setup guide: PostgreSQL, pgvector, first write and retrieve. → [Getting Started](getting-started.md)
- **Concepts** - Deep dive into how each pipeline works and why. → [Write Pipeline](concepts/write-pipeline.md)
---
# Getting Started
This guide walks you through setting up `arandu` from scratch: installing dependencies, configuring PostgreSQL with pgvector, writing your first facts, and retrieving them.
## Prerequisites
- **Python 3.11+**
- **PostgreSQL 15+** with the [pgvector](https://github.com/pgvector/pgvector) extension installed
- An **OpenAI API key** (or any LLM/embedding provider - see [Custom Providers](#custom-providers))
## Step 1: Install
```bash
pip install arandu[openai]
```
This installs the core SDK plus the bundled OpenAI provider. If you're using a different LLM provider, install just the core:
```bash
pip install arandu
```
## Step 2: Set Up PostgreSQL + pgvector
`arandu` stores facts, entities, and embeddings in PostgreSQL using the pgvector extension for vector similarity search.
### Option A: Docker (recommended for development)
```bash
docker run -d \
  --name memory-db \
  -e POSTGRES_USER=memory \
  -e POSTGRES_PASSWORD=memory \
  -e POSTGRES_DB=memory \
  -p 5432:5432 \
  pgvector/pgvector:pg16
```
The `pgvector/pgvector` image comes with the extension pre-installed. Your connection string will be:
`postgresql+psycopg://memory:memory@localhost:5432/memory`
> **psycopg vs psycopg2:** Arandu uses `psycopg` (async driver), **not** `psycopg2` (sync). Your connection string must start with `postgresql+psycopg://`, not `postgresql+psycopg2://`. Many Django/Flask tutorials use psycopg2 - make sure you're using the right one.
### Option B: Existing PostgreSQL
If you already have PostgreSQL running, enable the pgvector extension:
```sql
CREATE EXTENSION IF NOT EXISTS vector;
```
> **pgvector installation:** If you don't have pgvector installed on your server, follow the [pgvector installation guide](https://github.com/pgvector/pgvector#installation).
## Understanding Agent, Session, and Speaker
Every `write()` call uses three identifiers. Here's what each one does:
**`agent_id`** identifies whose memory this is. Think of it as a brain. One agent = one memory space. All facts, entities, and relationships live inside that agent's memory. If you have two chatbots, each one gets its own `agent_id` and they don't share memories.
**`speaker_name`** identifies who is talking. When someone says "I live in São Paulo", the SDK needs to know who "I" is. If the speaker is Rafael, "I" becomes "Rafael lives in São Paulo". Without `speaker_name`, the SDK doesn't know who "I" refers to and will raise a `ValueError`.
**`session_id`** tags the conversation context. It's optional (defaults to `"default"`). Use it when you want to track which conversation a message came from. For example, a support ticket, a therapy session, or a meeting.
> **Memory is NOT separated by session:** Changing `session_id` does **not** create a separate memory. All facts go into the same agent memory regardless of session. When you call `retrieve()` without a `session_id`, it searches everything the agent knows, across all sessions (you can pass a `session_id` to scope a retrieval to one conversation). The `session_id` is metadata on the event, not a partition key.
```python
await memory.write(
    agent_id="my-assistant",         # whose memory
    message="I live in São Paulo",
    speaker_name="Rafael",           # who is talking
    session_id="support-ticket-42",  # optional context tag
)
```
For a simple chatbot with one user, you only need `agent_id` and `speaker_name`. Add `session_id` when you want to track where a conversation happened.
> **Why three separate fields?:** A memory system needs to answer three questions: whose brain stores this? (agent), who said it? (speaker), and in what context? (session). Mixing them into a single identifier breaks down when two people talk to the same agent, or the same person has multiple conversations. Separating them keeps the model clean and flexible.
## Step 3: Initialize the Client
```python
import asyncio

from arandu import MemoryClient
from arandu.providers.openai import OpenAIProvider


async def main():
    # Create the LLM + embedding provider
    provider = OpenAIProvider(api_key="sk-...")

    # Create the memory client
    memory = MemoryClient(
        database_url="postgresql+psycopg://memory:memory@localhost:5432/memory",
        llm=provider,
        embeddings=provider,
    )

    # Create tables (safe to call multiple times)
    await memory.initialize()
    print("Memory initialized!")

    await memory.close()


asyncio.run(main())
```
> **Using Anthropic (Claude) instead of OpenAI:** Install with `pip install arandu[anthropic]`, then use Claude for LLM calls and OpenAI for embeddings (Anthropic doesn't offer an embeddings API):

```python
from arandu.providers.anthropic import AnthropicProvider
from arandu.providers.openai import OpenAIProvider

llm = AnthropicProvider(api_key="sk-ant-...")  # Claude for LLM calls
embeddings = OpenAIProvider(api_key="sk-...")  # OpenAI for embeddings only

memory = MemoryClient(
    database_url="postgresql+psycopg://...",
    llm=llm,
    embeddings=embeddings,
)
```
> **Using DeepSeek, Groq, or local models:** Any OpenAI-compatible endpoint works with `OpenAIProvider` - just set `base_url`:

```python
llm = OpenAIProvider(
    api_key="sk-...",
    model="deepseek-chat",
    base_url="https://api.deepseek.com/v1",
)
```
See the [Cookbook](cookbook.md) for more examples.
`initialize()` creates all required tables and indexes (including pgvector HNSW indexes). It's idempotent - safe to call on every startup.
> **About `agent_id`:** The `agent_id` is your **partitioning key**. Each agent_id gets its own isolated memory space - facts written for one agent are never returned for another. Think of it as a brain: one agent, one memory. Use any string (database ID, UUID, slug). The same agent_id must be used in both `write()` and `retrieve()` calls for the same agent.
> **About `session_id`:** The `session_id` identifies the **conversation context** (default: `"default"`). Think of it like a WhatsApp chat thread - same agent, different conversations. When not provided, all writes go to the `"default"` session.
> **About `speaker_name`:** The `speaker_name` identifies **who is speaking** the message. It is a **required** parameter in `write()`. Pronouns like "I", "me", "eu", "myself" automatically resolve to the speaker entity (`person:{speaker_slug}`). For example, if `speaker_name="Rafael"` and the message says "I live in São Paulo", the fact is attributed to Rafael - not to a generic `user:self`. Use the speaker's real name (e.g., `"Rafael"`, `"Ana"`).
## Step 4: Write Your First Facts
The `write()` method takes a natural language message and automatically:
1. Extracts entities, facts, and relationships using an LLM
2. Resolves entities to canonical records (deduplication)
3. Reconciles new facts against existing knowledge
4. Upserts the results into the database
```python
async def write_example(memory: MemoryClient):
    # First message
    result = await memory.write(
        agent_id="user_123",
        message="My name is Rafael and I live in São Paulo. I work at Acme Corp as a backend engineer.",
        speaker_name="Rafael",
    )

    print(f"Facts added: {len(result.facts_added)}")
    for fact in result.facts_added:
        print(f"  [{fact.entity_name}] {fact.fact_text} (confidence: {fact.confidence})")
    # Output:
    #   [Rafael] Lives in São Paulo (confidence: 0.95)
    #   [Rafael] Works at Acme Corp as a backend engineer (confidence: 0.95)
    #   [Acme Corp] Rafael works at Acme Corp (confidence: 0.95)

    print(f"Entities resolved: {len(result.entities_resolved)}")
    print(f"Duration: {result.duration_ms:.0f}ms")

    # Second message — the system recognizes "Rafael" and updates knowledge
    result = await memory.write(
        agent_id="user_123",
        message="I just moved to Rio de Janeiro. Still working at Acme though.",
        speaker_name="Rafael",
        session_id="onboarding",  # optional — defaults to "default"
    )
    print(f"Facts added: {len(result.facts_added)}")
    print(f"Facts updated: {len(result.facts_updated)}")  # "lives in São Paulo" → "lives in Rio"
```
### Understanding WriteResult
The `WriteResult` object tells you exactly what happened:
| Field | Type | Description |
|-------|------|-------------|
| `event_id` | `str` | Unique ID for this write event |
| `facts_added` | `list` | New facts created (ADD decisions) |
| `facts_updated` | `list` | Existing facts superseded (UPDATE decisions) |
| `facts_unchanged` | `list` | Facts confirmed but not changed (NOOP decisions) |
| `facts_deleted` | `list` | Facts retracted (DELETE decisions) |
| `entities_resolved` | `list` | Entities identified and resolved |
| `duration_ms` | `float` | Total pipeline duration |
| `success` | `bool` | Whether the pipeline completed without errors |
| `error` | `str \| None` | Error message if the pipeline failed internally |
## Step 5: Retrieve Context
The `retrieve()` method finds facts relevant to a query using multiple signals:
```python
async def retrieve_example(memory: MemoryClient):
    result = await memory.retrieve(
        agent_id="user_123",
        query="where does Rafael live and what does he do?",
    )

    # Option 1: Pre-formatted string — paste directly into your LLM prompt
    print(result.context)

    # Option 2: Individual scored facts — for programmatic access
    for fact in result.facts:
        print(f"  [{fact.score:.2f}] {fact.entity_name}: {fact.fact_text}")

    # Retrieve within a specific session (optional — omit to search all sessions)
    session_result = await memory.retrieve(
        agent_id="user_123",
        query="where does Rafael live?",
        session_id="onboarding",  # optional — defaults to searching all sessions
    )

    # With config adjustments (e.g., disable the reranker for faster results)
    fast_result = await memory.retrieve(
        agent_id="user_123",
        query="where does Rafael live?",
        config_overrides={"enable_reranker": False, "topk_facts": 5},
    )

    print(f"Total candidates evaluated: {result.total_candidates}")
    print(f"Duration: {result.duration_ms:.0f}ms")
```
> **`.context` vs `.facts`:** Use **`result.context`** when you just need a string to inject into an LLM prompt - it's pre-formatted with tier labels (CORE MEMORY, EXTENDED CONTEXT, etc.). Use **`result.facts`** when you need programmatic access to individual facts, scores, and metadata.
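For example, a minimal sketch of feeding `result.context` into a chat-completion payload. The `build_messages` helper and the prompt wording are illustrative, not part of the SDK:

```python
def build_messages(memory_context: str, user_question: str) -> list[dict]:
    """Assemble a chat payload that injects retrieved memory into the system prompt.

    `memory_context` is the string you'd get from `result.context`.
    """
    system = (
        "You are a helpful assistant. Use the following long-term memory "
        "about the user when relevant:\n\n" + memory_context
    )
    return [
        {"role": "system", "content": system},
        {"role": "user", "content": user_question},
    ]


msgs = build_messages("CORE MEMORY:\n- Rafael lives in Rio de Janeiro", "Where do I live?")
print(msgs[0]["role"])  # system
```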
### Per-request Config Overrides
You can override any `MemoryConfig` field for a single request without changing the client's default config:
```python
result = await memory.retrieve(
    agent_id="user_123",
    query="where does Rafael live?",
    config_overrides={
        "enable_reranker": False,
        "topk_facts": 5,
        "spreading_activation_hops": 0,
    },
)

# config_effective shows the actual config used for this request
print(result.config_effective)
```
Only the provided keys are overridden; all other fields inherit from the client's `MemoryConfig`.
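Semantically, this behaves like a shallow dict merge over the client's defaults. The sketch below uses an illustrative stand-in for `MemoryConfig` to show the merge rule, not the SDK's internal code:

```python
from dataclasses import asdict, dataclass


@dataclass
class Config:  # illustrative stand-in for MemoryConfig
    enable_reranker: bool = True
    topk_facts: int = 30
    min_similarity: float = 0.25


def effective_config(base: Config, overrides: dict) -> dict:
    """Shallow merge: overridden keys win, everything else inherits from base."""
    merged = asdict(base)
    merged.update(overrides)
    return merged


cfg = effective_config(Config(), {"enable_reranker": False, "topk_facts": 5})
print(cfg)
# {'enable_reranker': False, 'topk_facts': 5, 'min_similarity': 0.25}
```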
### Understanding RetrieveResult
| Field | Type | Description |
|-------|------|-------------|
| `facts` | `list[ScoredFact]` | Ranked facts with scores |
| `context` | `str` | Pre-formatted context string for LLM prompts |
| `total_candidates` | `int` | Total facts evaluated before ranking |
| `duration_ms` | `float` | Total pipeline duration |
| `config_effective` | `dict` | Effective config values used for this request |
Each `ScoredFact` contains:
| Field | Type | Description |
|-------|------|-------------|
| `fact_id` | `str` | Unique fact identifier |
| `entity_name` | `str` | Human-readable entity name |
| `attribute_key` | `str` | Fact category/attribute |
| `fact_text` | `str` | The fact content |
| `score` | `float` | Combined relevance score (0-1) |
| `scores` | `dict` | Breakdown by signal (semantic, recency, etc.) |
| `speaker` | `str \| None` | Who spoke the message this fact was extracted from |
## Step 6: Managing Facts
Beyond `write()` and `retrieve()`, the SDK provides CRUD operations for managing individual facts: fetching by ID, listing all facts, and deleting.
### Get a specific fact
```python
detail = await memory.get(agent_id="user_123", fact_id="some-uuid-here")

if detail:
    print(f"[{detail.entity_name}] {detail.fact_text}")
    print(f"  confidence: {detail.confidence}, importance: {detail.importance}")
    print(f"  created: {detail.created_at}")
else:
    print("Fact not found")
```
`get()` returns a `FactDetail` or `None`. It fetches any fact by ID — including facts that were soft-deleted by the reconciliation pipeline (i.e. superseded by a newer version). Use it for direct lookups when you have the ID.
### List all facts
```python
# First page (newest first)
facts = await memory.get_all(agent_id="user_123", limit=50, offset=0)
for fact in facts:
    print(f"[{fact.fact_id}] {fact.entity_name}: {fact.fact_text}")

# Next page
page2 = await memory.get_all(agent_id="user_123", limit=50, offset=50)
```
`get_all()` returns only **active** facts (`valid_to IS NULL`), ordered by `created_at` descending. Use `limit` and `offset` for pagination.
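If you need every fact, a small helper can walk the pages until a short page signals the end. This is a sketch: `fetch_page` stands in for a bound call like `memory.get_all(agent_id=..., limit=..., offset=...)`:

```python
import asyncio


async def iter_all_facts(fetch_page, page_size: int = 50):
    """Walk limit/offset pages until a short page signals the end.

    `fetch_page(limit, offset)` stands in for a call such as
    `memory.get_all(agent_id=..., limit=limit, offset=offset)`.
    """
    offset = 0
    while True:
        page = await fetch_page(limit=page_size, offset=offset)
        for fact in page:
            yield fact
        if len(page) < page_size:
            return
        offset += page_size


async def demo():
    data = list(range(7))  # pretend these are 7 facts

    async def fake_fetch(limit: int, offset: int):
        return data[offset:offset + limit]

    return [f async for f in iter_all_facts(fake_fetch, page_size=3)]


print(asyncio.run(demo()))  # [0, 1, 2, 3, 4, 5, 6]
```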
#### Filtering by entity
```python
# Only facts about Ana
ana_facts = await memory.get_all(agent_id="user_123", entity_keys=["person:ana"])
# Facts about Pedro OR Ana
facts = await memory.get_all(agent_id="user_123", entity_keys=["person:pedro", "person:ana"])
```
The `entity_keys` filter also works with `retrieve()`:
```python
# Semantic search scoped to facts about Ana
result = await memory.retrieve(
    agent_id="user_123",
    query="what does she do for work?",
    entity_keys=["person:ana"],
)
```
When `entity_keys` is provided, only facts linked to at least one of the specified entities are returned (OR logic). Without `entity_keys`, all facts are searched as before.
> **Aliases are resolved automatically:** `entity_keys` accepts both canonical keys (`person:pedro_menezes`) and aliases (`person:pedro` or just `pedro`). The SDK resolves aliases against the `memory_entity_aliases` table before filtering, so you don't need to know the canonical form. Any key that does not resolve is surfaced in `result.warnings` — `retrieve()` never returns "zero silently" because of a bad key.
```python
result = await memory.retrieve(
    agent_id="user_123",
    query="what does she do?",
    entity_keys=["person:pedro", "person:unknown"],
)
# result.facts    → filtered by Pedro's canonical key (alias resolved)
# result.warnings → ["entity_key 'person:unknown' not found (not canonical, no matching alias)"]
```
### Delete a fact
```python
deleted = await memory.delete(agent_id="user_123", fact_id="some-uuid-here")
print(f"Deleted: {deleted}") # True if found and removed, False otherwise
```
`delete()` performs a **hard delete** — the row is physically removed from the database. Associated entity links are removed automatically via cascade. This is the explicit user action ("I want this gone"); the pipeline's soft-delete via `valid_to` is a separate mechanism for reconciliation.
### Delete all facts
```python
count = await memory.delete_all(agent_id="user_123")
print(f"Deleted {count} facts")
```
`delete_all()` removes **every** fact belonging to the agent. Use with caution — this is irreversible. Intended for reset/debug scenarios.
### List entities
```python
entities = await memory.entities(agent_id="user_123", limit=50)
for entity in entities:
    print(f"[{entity.entity_type}] {entity.display_name} ({entity.fact_count} facts)")
    if entity.summary_text:
        print(f"  Summary: {entity.summary_text}")
```
`entities()` returns active entities ordered by `last_seen_at` descending. Use it to see what the agent knows about — people, places, organizations, etc.
### Understanding FactDetail
| Field | Type | Description |
|-------|------|-------------|
| `fact_id` | `str` | Unique fact identifier |
| `entity_name` | `str` | Human-readable entity name |
| `entity_key` | `str` | Canonical entity key |
| `entity_type` | `str` | Entity type (e.g. "person", "organization") |
| `attribute_key` | `str \| None` | Fact category/attribute |
| `fact_text` | `str` | The fact content |
| `category` | `str \| None` | Fact category |
| `confidence` | `float` | Confidence score (0-1) |
| `importance` | `float` | Importance score (0-1) |
| `valid_from` | `datetime \| None` | When the fact became valid |
| `created_at` | `datetime \| None` | When the fact was created |
| `source_context` | `str \| None` | Original context snippet |
| `speaker` | `str \| None` | Who spoke the message this fact was extracted from |
### Understanding EntityDetail
| Field | Type | Description |
|-------|------|-------------|
| `entity_id` | `str` | Unique entity identifier |
| `canonical_key` | `str` | Canonical entity key (e.g. "person:rafael") |
| `display_name` | `str` | Human-readable entity name |
| `entity_type` | `str` | Entity type (e.g. "person", "organization") |
| `summary_text` | `str \| None` | Auto-generated entity summary |
| `fact_count` | `int` | Number of facts linked to this entity |
| `importance_score` | `float \| None` | Computed importance score |
| `first_seen_at` | `datetime \| None` | When the entity was first mentioned |
| `last_seen_at` | `datetime \| None` | When the entity was last mentioned |
| `profile_text` | `str \| None` | Consolidated entity profile |
## Step 7: Configure (Optional)
Every aspect of the pipeline is configurable via `MemoryConfig`:
```python
from arandu import MemoryConfig
from arandu.providers.openai import OpenAIProvider

# Single provider for all LLM operations (extraction, reranker, etc.)
llm = OpenAIProvider(api_key="sk-...", model="gpt-4o")

config = MemoryConfig(
    # Tight timeout for real-time chat
    extraction_timeout_sec=15.0,
    # Tune retrieval
    topk_facts=30,
    min_similarity=0.25,
    enable_reranker=True,
    # Custom score weights (default: semantic=0.70, recency=0.20, importance=0.10)
    score_weights={
        "semantic": 0.60,
        "recency": 0.25,
        "importance": 0.15,
    },
    # Set the timezone for recency calculations
    timezone="America/Sao_Paulo",
)

memory = MemoryClient(
    database_url="postgresql+psycopg://memory:memory@localhost/memory",
    llm=llm,
    embeddings=llm,
    config=config,
)
```
All parameters have sensible defaults - you only need to override what matters for your use case.
## Step 8: Debugging with Verbose Mode
Pass `verbose=True` to `write()` or `retrieve()` to get a detailed trace of every pipeline step:
```python
result = await memory.write(
    agent_id="user_123",
    message="...",
    speaker_name="Rafael",
    verbose=True,
)

# Access the pipeline trace
if result.pipeline:
    for step in result.pipeline.steps:
        print(f"  {step.name}: {step.duration_ms:.1f}ms")
        print(f"  data: {step.data}")
```
The trace includes steps like `extraction`, `entity_resolution`, `reconciliation`, and `upsert`, each with timing and intermediate data. If the pipeline fails internally, an `error` step is added with the exception details - useful for diagnosing silent failures.
You can serialize the full trace with `result.pipeline.to_dict()`.
## Step 9: Cleanup
Always close the client when done to release database connections:
```python
await memory.close()
```
Or manage the client lifecycle explicitly with `try`/`finally`:

```python
memory = MemoryClient(...)
await memory.initialize()
try:
    ...  # use memory
finally:
    await memory.close()
```
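If you prefer `async with`, a thin wrapper works with any object exposing `initialize()` and `close()`. The `open_memory` helper below is a sketch, not something the SDK ships:

```python
from contextlib import asynccontextmanager


@asynccontextmanager
async def open_memory(client):
    """Initialize on entry, close on exit — even if the body raises."""
    await client.initialize()
    try:
        yield client
    finally:
        await client.close()
```

Usage would look like `async with open_memory(MemoryClient(...)) as memory: ...`.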
## Complete Example
Here's a full working example putting it all together:
```python
import asyncio

from arandu import MemoryClient
from arandu.providers.openai import OpenAIProvider


async def main():
    provider = OpenAIProvider(api_key="sk-...")
    memory = MemoryClient(
        database_url="postgresql+psycopg://memory:memory@localhost:5432/memory",
        llm=provider,
        embeddings=provider,
    )
    await memory.initialize()

    try:
        # Write some facts
        await memory.write(
            agent_id="user_123",
            message="I'm a software engineer living in Berlin. I love cycling and craft coffee.",
            speaker_name="Rafael",
        )
        await memory.write(
            agent_id="user_123",
            message="My girlfriend Ana is a designer. We adopted a cat named Pixel last month.",
            speaker_name="Rafael",
        )

        # Retrieve context
        result = await memory.retrieve(agent_id="user_123", query="tell me about this person")
        print(result.context)

        # Targeted retrieval
        result = await memory.retrieve(agent_id="user_123", query="who is Ana?")
        for fact in result.facts:
            print(f"  [{fact.score:.2f}] {fact.entity_name}: {fact.fact_text}")
    finally:
        await memory.close()


asyncio.run(main())
```
## Custom Providers
`arandu` uses Python protocols for dependency injection. You can bring any LLM or embedding provider by implementing two simple interfaces:
```python
from arandu.protocols import LLMProvider, LLMResult, EmbeddingProvider


class MyLLMProvider:
    async def complete(
        self,
        messages: list[dict],
        temperature: float = 0,
        response_format: dict | None = None,
        max_tokens: int | None = None,
    ) -> LLMResult:
        # Call your LLM here
        text = ...  # get the response text from your LLM
        return LLMResult(text=text, usage=None)


class MyEmbeddingProvider:
    async def embed(self, texts: list[str]) -> list[list[float]]:
        # Return embeddings for a batch of texts
        ...

    async def embed_one(self, text: str) -> list[float] | None:
        # Return the embedding for a single text
        ...
```
No inheritance required - just implement the methods with the right signatures.
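As a concrete (if toy) example, here's a deterministic embedding provider matching the protocol shape above: hash-based vectors, no network. It's purely illustrative and test-only — hashed vectors carry no semantic meaning, so real retrieval quality requires a real embedding model:

```python
import asyncio
import hashlib


class HashEmbeddingProvider:
    """Deterministic toy embeddings: same text → same vector. Test-only."""

    def __init__(self, dim: int = 8):
        self.dim = dim

    async def embed(self, texts: list[str]) -> list[list[float]]:
        return [await self.embed_one(t) for t in texts]

    async def embed_one(self, text: str) -> list[float]:
        digest = hashlib.sha256(text.encode("utf-8")).digest()
        # Map digest bytes to floats in [0, 1]; cycle the digest if dim > 32
        return [digest[i % len(digest)] / 255.0 for i in range(self.dim)]


async def demo():
    provider = HashEmbeddingProvider(dim=4)
    v1 = await provider.embed_one("hello")
    v2 = await provider.embed_one("hello")
    return v1 == v2


print(asyncio.run(demo()))  # True
```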
## Next Steps
- [**Write Pipeline**](concepts/write-pipeline.md) - Understand how facts are extracted, entities resolved, and knowledge reconciled
- [**Read Pipeline**](concepts/read-pipeline.md) - Learn how multi-signal retrieval finds the most relevant facts
- [**Data Types & Schema**](advanced/data-types.md) - Database schema reference (tables, columns, types) for direct SQL queries
- [**Background Jobs**](concepts/background-jobs.md) - Set up clustering, consolidation, and importance scoring
- [**Design Philosophy**](concepts/design-philosophy.md) - Explore the neuroscience-inspired architecture
---
# Write Pipeline
When you call `memory.write()`, the SDK reads a natural language message and automatically extracts who and what was mentioned, figures out if it's new or updated information, and stores it as structured, versioned facts - all in one call.
**You don't need to understand the internals to use it.** Just call `write()` and check `result.facts_added`. This page explains what happens under the hood for when you want to tune behavior or debug results.
```mermaid
flowchart LR
A["Message"] --> B["Alias Lookup +\nPre-retrieval"]
B --> C["Informed\nExtraction"]
C --> D["Resolve Entities"]
D --> E["Upsert"]
E --> F["WriteResult"]
```
## Overview
Every `memory.write(agent_id, message, speaker_name)` call runs these steps (see also the optional `occurred_at` parameter in [Configuration](../configuration.md#extraction)):
0. **Guard** - Empty messages return immediately. No event, no LLM call, no tokens consumed.
1. **Log the event** - The raw message is saved as an immutable audit trail (never modified or deleted).
2. **Detect emotion** - Classifies the message's emotion, intensity, and energy level.
3. **Alias lookup** - Scans the message for known entity names using word-boundary matching against the alias cache (no LLM call).
4. **Pre-retrieval / Profile load** - For each recognized entity, fetches existing knowledge: entity profiles (if available) or top-K existing facts via pgvector embedding similarity (no LLM call).
5. **Informed extraction** - A single LLM call receives the message, speaker context, and existing knowledge context. Returns entities, facts (with action NEW/UPDATE and importance category), and relations in one JSON response.
6. **Resolve entities** - Deduplicates mentions ("Ana", "my wife Ana", "Aninha") into a single canonical entity.
7. **Upsert** - Saves facts, entity links, relationships, and updated entity profiles to the database.
Each stage is independently fail-safe: if informed extraction fails, the pipeline falls back to the legacy blind extraction + reconciliation flow. If upsert fails for one fact, the others proceed normally.
> **Legacy fallback:** If informed extraction fails (LLM timeout, invalid JSON, etc.), the pipeline automatically falls back to the old flow: entity scan (1 LLM call) + fact extraction + relation extraction (2 concurrent LLM calls) + reconciliation (1 LLM call per ambiguous fact). This ensures no data is lost even when the new path encounters an error.
---
## Stage 1: Memory-Aware Extraction
**In plain English:** Before calling the LLM, the SDK checks what it already knows about the entities in the message. It loads existing entity profiles or recent facts, then sends everything to the LLM in a single call: "Here's the message, here's the speaker, here's when it was sent, and here's what we already know. Extract all the facts." The LLM returns structured data -- entities, facts, relations, and updated profiles -- and the system handles deduplication downstream.
The extraction stage uses a **memory-aware** approach (informed extraction) by default. Instead of extracting blindly, the pipeline first gathers existing knowledge and passes it as context, so the LLM can extract with full awareness of what is already stored. The LLM is instructed to **extract ALL factual information** from the message, including specific details like proper nouns, book titles, country names, dates, and numbers. Deduplication is handled downstream by the reconciliation step (ADD/UPDATE/NOOP/DELETE decisions), not by asking the LLM to filter. This approach maximizes recall -- the LLM captures everything, and the system decides what is new.
### How It Works (Informed Extraction -- Default)
Informed extraction runs 1 LLM call, preceded by two zero-cost lookup steps:
1. **Alias lookup** (no LLM) -- Scans the message for known entity names using word-boundary matching against the alias cache (`MemoryEntityAlias`). This identifies which entities the message is about before any LLM call.
2. **Pre-retrieval / Profile load** (no LLM) -- For each recognized entity, loads context:
- If the entity has a `profile_text` (see [Entity Profiles](#entity-profiles) below), the profile is used as context.
- Otherwise, fetches the top-K existing facts for that entity via pgvector embedding similarity.
- The total context is capped at `informed_extraction_context_budget_tokens` to avoid prompt bloat.
3. **Informed extraction** (1 LLM call) -- The LLM receives the message, speaker context, temporal context (the `occurred_at` timestamp or current time, used to resolve relative references like "yesterday" or "last week"), and existing knowledge context. It returns a single JSON with:
- **Entities** -- with aliases, as before.
- **Facts** -- each annotated with an `action` (NEW or UPDATE) and an `importance_category`.
- **Relations** -- between entities.
- **Updated profiles** -- concise entity summaries reflecting the new information (see [Entity Profiles](#entity-profiles)).
The LLM is instructed to extract ALL factual information from the message, preserving specific details (proper nouns, titles, place names, dates, numbers). Facts marked UPDATE indicate a change to existing knowledge; facts marked NEW are genuinely novel information. The reconciliation step (ADD/UPDATE/NOOP/DELETE) handles deduplication downstream, ensuring nothing is lost even when the LLM re-extracts something already known.
**Importance categories:** Each fact receives a semantic `importance_category` from the LLM, which is mapped to a numeric importance value via the `IMPORTANCE_CATEGORY_MAP`:
| Category | Importance | Example |
|----------|-----------|---------|
| `biographical_milestone` | High | "Graduated from MIT", "Got married" |
| `relationship_change` | High | "Started dating Ana", "Left Acme Corp" |
| `stable_preference` | Medium | "Prefers Python over Java" |
| `specific_event` | Medium | "Went to a concert last Friday" |
| `routine_activity` | Low | "Goes to the gym on Mondays" |
| `conversational` | Low | "Said they're tired today" |
This replaces the flat 0.5 default importance: facts are born with a semantically grounded importance score instead of being uniformly scored and waiting for background jobs to adjust them.
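The category-to-importance mapping amounts to a dict lookup with a fallback to the old flat default. The sketch below is illustrative only: the numeric values are assumptions (the table above specifies only High/Medium/Low), not arandu's actual `IMPORTANCE_CATEGORY_MAP` constants.

```python
# Illustrative sketch; the numeric values are assumptions, not
# arandu's real constants (the docs only specify High/Medium/Low).
IMPORTANCE_CATEGORY_MAP = {
    "biographical_milestone": 0.9,
    "relationship_change": 0.9,
    "stable_preference": 0.6,
    "specific_event": 0.6,
    "routine_activity": 0.3,
    "conversational": 0.3,
}

def importance_for(category: str, default: float = 0.5) -> float:
    """Map a semantic category to a numeric importance, falling back
    to the old flat 0.5 default for unknown categories."""
    return IMPORTANCE_CATEGORY_MAP.get(category, default)
```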
### Fallback: Blind Extraction (Legacy)
If informed extraction fails (LLM timeout, invalid JSON, rate limit), the pipeline falls back to the legacy flow:
1. **Entity scan** (1 LLM call) -- Identify all entities mentioned in the message
2. **Fact extraction + Relation extraction** (2 concurrent LLM calls) -- via `asyncio.gather()`
3. **Reconciliation** (see [Stage 3](#stage-3-reconciliation)) -- Compares each fact against existing knowledge
This fallback ensures no data is lost. The event is still logged, and the legacy path produces the same end result -- just with more LLM calls and without the duplicate-elimination benefits of informed extraction.
Relation extraction in the fallback includes an **automatic retry**: if the LLM returns 0 relations but 2+ entities were found, the SDK retries the relation call once before accepting an empty result. When `verbose=True`, the trace includes `relation_retry_triggered`.
**Subject-centric extraction:** Facts are extracted from the perspective of the primary subject only. "Carlos lives in Curitiba" is a fact about Carlos - the system does NOT also create "Curitiba is where Carlos lives" as a separate fact. The relationship `Carlos → lives_in → Curitiba` + entity links handle cross-entity retrieval.
**Semantic dedup:** After extraction (both informed and fallback), facts are compared pairwise by embedding cosine similarity. Near-duplicates (> 0.85 similarity) are removed, keeping the first occurrence. This eliminates cross-entity reformulations that the LLM sometimes produces despite prompt instructions.
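The dedup pass can be sketched in a few lines, assuming plain-list embeddings and the 0.85 default threshold (this is a standalone illustration, not the SDK's internal code):

```python
import math

def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm

def dedup_facts(facts, embeddings, threshold=0.85):
    """Remove near-duplicate facts pairwise, keeping the first occurrence."""
    kept, kept_vecs = [], []
    for fact, vec in zip(facts, embeddings):
        if all(cosine(vec, kv) <= threshold for kv in kept_vecs):
            kept.append(fact)
            kept_vecs.append(vec)
    return kept
```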
### Configuration
| Parameter | Default | Description |
|-----------|---------|-------------|
| `extraction_timeout_sec` | `30.0` | Timeout per LLM call |
| `enable_informed_extraction` | `True` | Enable memory-aware informed extraction. When `False`, always uses the legacy blind extraction + reconciliation flow |
| `informed_extraction_topk` | `10` | Number of existing facts to retrieve per entity during pre-retrieval (when no entity profile is available) |
| `informed_extraction_context_budget_tokens` | `800` | Maximum token budget for the existing knowledge context passed to the informed extraction LLM call |
### What Gets Extracted
For each message, the extraction stage produces:
- **Entities** - Named things: people, organizations, places, concepts, etc.
- **Facts** - Self-contained statements about entities in natural language (e.g., "Fernanda Lima is a software engineer", "Marcos Tavares lives in Porto Alegre"). Each fact text always includes the entity name - never just "is a software engineer" without a subject. Every relationship also generates a corresponding fact - so "Sarah is my wife" produces both a relation (`user → spouse_of → sarah`) and a fact ("Sarah is user's wife"). Duplicate facts (same subject + same text, ignoring punctuation) are automatically removed post-extraction.
- **Relations** - Connections between entities (e.g., "Rafael" → `works_at` → "Acme Corp"). Relations serve as graph edges for traversal; the paired fact makes the information searchable via text/embedding.
- **Updated profiles** (informed extraction only) - Concise entity summaries reflecting the new information. See [Entity Profiles](#entity-profiles).
Each fact includes a **confidence level**:
| Level | Score | Example |
|-------|-------|---------|
| Explicit statement | 0.95 | "I live in São Paulo" |
| Strong inference | 0.80 | "We went to the São Paulo office" (implies location) |
| Weak inference | 0.60 | Contextual implication |
| Speculation | 0.40 | Uncertain information |
> **How confidence works in practice:** Confidence is assigned by the LLM during extraction based on how the information was stated. Direct statements ("I live in SP") get high confidence; hedged statements ("I think maybe...") get lower confidence. You cannot set confidence directly - it's inferred. You can filter low-confidence facts at retrieval time using `min_confidence` in MemoryConfig (default 0.55).
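The retrieval-time filter reduces to a threshold check. A minimal sketch, assuming facts expose a `confidence` field and mirroring the 0.55 default (illustrative, not the SDK's internals):

```python
def filter_by_confidence(facts, min_confidence=0.55):
    """Drop facts below the confidence floor, as retrieval does when
    MemoryConfig.min_confidence is set (0.55 by default)."""
    return [f for f in facts if f["confidence"] >= min_confidence]
```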
> **Walkthrough: how informed extraction works:** **Message:** "Clara Rezende saiu da Vertix e foi pra Orion Tech como head de engenharia. O Thiago Nogueira a contratou pessoalmente."
**Step 1 - Alias lookup** (no LLM):
```
Scans message against alias cache.
Matches: "Clara Rezende" → person:clara_rezende, "Vertix" → organization:vertix
No match: "Orion Tech", "Thiago Nogueira" (new entities)
```
**Step 2 - Pre-retrieval / Profile load** (no LLM):
```
person:clara_rezende has profile_text → loaded as context
organization:vertix has no profile → top-10 facts fetched via pgvector
Existing context:
[Clara Rezende profile] "Software engineer, previously at Vertix since 2023."
[Vertix fact] "Vertix is a SaaS startup in Curitiba" (0.95)
```
**Step 3 - Informed extraction** (1 LLM call):
```
LLM receives: message + speaker context + existing knowledge
Returns:
Entities: [Clara Rezende (person), Vertix (organization), Orion Tech (organization), Thiago Nogueira (person)]
Facts:
[Clara Rezende] "Clara Rezende left Vertix" (0.95, action=NEW, category=relationship_change)
[Clara Rezende] "Clara Rezende joined Orion Tech as head of engineering" (0.95, action=NEW, category=biographical_milestone)
[Thiago Nogueira] "Thiago Nogueira personally hired Clara Rezende" (0.95, action=NEW, category=specific_event)
Relations:
Clara Rezende → former_employee_of → Vertix
Clara Rezende → works_at → Orion Tech
Thiago Nogueira → hired → Clara Rezende
Updated profiles:
Clara Rezende → "Software engineer. Left Vertix, joined Orion Tech as head of engineering."
```
Note: "Clara Rezende is a software engineer" was NOT re-extracted because the LLM saw it in the existing profile.
**Step 4 - Semantic dedup:**
No near-duplicates found (all facts are distinct). 3 facts pass through.
**Result:** 4 entities, 3 facts, 3 relations, 1 updated profile. Total: 1 LLM call (vs 3 in legacy mode).
> **Walkthrough: how fallback (blind) extraction works:** If informed extraction fails (e.g., LLM returns invalid JSON), the pipeline falls back to the legacy flow:
**Step 1 - Entity scan** (1 LLM call):
```
Entities: [Clara Rezende (person), Vertix (organization), Orion Tech (organization), Thiago Nogueira (person)]
```
**Step 2a - Fact extraction** (1 LLM call, all entities):
```
Facts:
[Clara Rezende] "Clara Rezende left Vertix" (0.95)
[Clara Rezende] "Clara Rezende joined Orion Tech as head of engineering" (0.95)
[Thiago Nogueira] "Thiago Nogueira personally hired Clara Rezende" (0.95)
```
**Step 2b - Relation extraction** (1 LLM call, concurrent with 2a):
```
Relations:
Clara Rezende → former_employee_of → Vertix
Clara Rezende → works_at → Orion Tech
Thiago Nogueira → hired → Clara Rezende
```
**Step 3 - Semantic dedup:**
No near-duplicates found (all facts are distinct). 3 facts pass through.
**Result:** 4 entities, 3 facts, 3 relations. Total: 3 LLM calls. Then proceeds to reconciliation.
### Alias Grouping & Subject Normalization
When the same entity is mentioned by multiple names in a single message (e.g., "my friend Guili (Guilherme Maturana)"), the extraction groups them into a **single entity with aliases** instead of creating duplicates.
The LLM is instructed to pick one canonical name (usually the most complete) and list the others as aliases:
```json
{
"entities": [
{"name": "Guilherme Maturana", "type": "person", "aliases": ["Guili"]}
]
}
```
After extraction, a **subject normalization** pass rewrites any fact or relation that references an alias to use the canonical name instead. Identity relations (e.g., `same_as` between an alias and its canonical name) are removed automatically since they become self-referencing after normalization.
This eliminates intra-message entity duplication at the source - before entity resolution even runs.
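Assuming facts and relations are plain dicts with `subject`/`source`/`target` fields, the normalization pass can be sketched as:

```python
def normalize_subjects(facts, relations, aliases):
    """Rewrite alias references to canonical names and drop identity
    relations that become self-referencing. Illustrative sketch only;
    the dict shapes are assumptions, not arandu's internal types."""
    canon = lambda name: aliases.get(name, name)
    for fact in facts:
        fact["subject"] = canon(fact["subject"])
    kept = []
    for rel in relations:
        src, tgt = canon(rel["source"]), canon(rel["target"])
        if rel["type"] == "same_as" and src == tgt:
            continue  # identity relation collapsed by normalization
        kept.append({**rel, "source": src, "target": tgt})
    return facts, kept
```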
### Entity Types
Entity types are **free-form strings** - the LLM chooses the most appropriate type for each entity. Common types include `person`, `organization`, `place`, `product`, `event`, `concept`, `pet`, but any descriptive type is accepted. Types are normalized to lowercase during entity resolution (e.g., `"Person"` → `"person"`, `"PRODUCT"` → `"product"`).
The extraction prompt instructs the LLM to classify types carefully - for example, cities are `place`, companies are `organization`, software products are `product`.
### Language of Generated Content
All LLM-generated content -- entity profiles, summaries, cluster summaries, meta-observations, and procedural directives -- is always produced in **English**, regardless of the language of the input message. This ensures consistency in internal representations across multilingual conversations.
**Facts remain in the original conversation language.** A message in Portuguese produces fact texts in Portuguese (e.g., "Pedro mora em Porto Alegre"), but the entity profile for Pedro will be written in English (e.g., "Software engineer based in Porto Alegre, married to Ana."). This is by design: facts are verbatim extractions from the conversation, while generated content is internal metadata that benefits from a single canonical language.
### Fail-safe Behavior
If an LLM call fails (timeout, invalid JSON, rate limit), the extraction returns an empty result rather than raising an exception. The event is still logged - no data is lost. The next message may capture the same information.
> **Detecting timeouts:** When extraction times out, the result is indistinguishable from "message had no extractable content" - 0 entities, 0 facts, no exception. To detect timeouts, compare the extraction `duration_ms` in the trace against your configured `extraction_timeout_sec`, or check for 0 entities despite a content-rich message.
> **Neuroscience parallel:** Informed extraction mirrors **encoding relative to prior knowledge** in human memory. We don't encode new experiences in a vacuum -- the brain's orienting response compares incoming stimuli against existing schemas before committing anything to long-term storage. When you hear something you already know, your hippocampus suppresses re-encoding (repetition suppression). When you hear something genuinely new or contradictory, encoding is enhanced (the novelty/mismatch signal). Informed extraction replicates this: the LLM sees what's already known and only extracts what's new or changed.
---
## Entity Profiles
Entity profiles are concise summaries (~100-300 tokens) that capture what the system knows about an entity. Each entity's `profile_text` is both **input** and **output** of the informed extraction stage, creating a feedback loop that keeps profiles current.
### How Profiles Work
- **Input to extraction:** When the informed extraction runs, entity profiles are loaded during the pre-retrieval step and injected into the LLM context. When a profile is available, it replaces the individual fact retrieval for that entity -- a single concise summary instead of N separate facts, saving prompt tokens and providing better context.
- **Output from extraction:** The LLM returns `updated_profiles` as part of its response. These reflect the entity's state after incorporating the new information from the message.
- **Persistence:** Updated profiles are saved to the `memory_entities` table (`profile_text` column) in the same database transaction as the facts and relations. The `profile_refreshed_at` timestamp is updated to track freshness.
- **Cold start / Seeding:** On the first message about an entity, there is no pre-existing profile. The informed extraction creates an initial (seed) profile from the facts it extracts. The write pipeline only **seeds** profiles for entities that do not already have one -- it never overwrites an existing profile. The authoritative source for comprehensive, up-to-date profiles is the [`consolidate_entity_profiles()` background job](background-jobs.md#entity-profile-consolidation), which reads ALL facts per entity and generates a thorough profile covering every major aspect.
### Profiles vs Summaries
Entity profiles (`profile_text`) and entity summaries (`summary_text`) coexist but serve different purposes:
| | Profile (`profile_text`) | Summary (`summary_text`) |
|---|---|---|
| **When updated** | During the write pipeline (synchronous) | By background jobs (asynchronous) |
| **Scope** | Concise, ~100-300 tokens | More comprehensive |
| **Used by** | Informed extraction (write pipeline only) | Background jobs, importance scoring |
| **Freshness** | Always reflects the latest write | May lag behind recent writes |
### Profiles and Retrieval
Entity profiles are **internal to the write pipeline only**. They are used as context for informed extraction (so the LLM knows what the system already knows about an entity), but they are NOT injected into retrieval output. The read pipeline formats facts, meta-observations, and events directly -- profiles do not appear in the context string returned by `retrieve()`.
---
## Stage 2: Entity Resolution
**In plain English:** When someone says "Ana", "my wife Ana", and "Aninha" in different messages, they're all talking about the same person. This stage figures that out and links everything to one canonical entity - so you don't end up with three separate "Ana" records in the database.
### Three-Phase Resolution
```mermaid
flowchart LR
A["Entity name"] --> B{"Exact match?"}
B -->|Yes| F["Resolved"]
B -->|No| C{"Fuzzy match?"}
C -->|"≥ 0.85"| F
C -->|"0.50–0.85"| D{"LLM decides"}
C -->|"< 0.50"| E["Create new entity"]
D -->|Match| F
D -->|No match| E
E --> F
```
**Phase 1: Exact match**
Checks the alias cache, entity slugs, and display names. Instant, no LLM call.
Includes **prefix/diminutive matching** for person entities: "Carol" matches "Carolina" (minimum 3 characters). Note: "Jo" will NOT match "João" (< 3 chars). "Bob" will match "Roberto" only if registered as an alias, not via prefix matching.
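The diminutive rule can be sketched as a lowercase prefix comparison with a minimum length (the function name `prefix_match` is illustrative, not an SDK API):

```python
def prefix_match(name: str, candidate: str, min_len: int = 3) -> bool:
    """Diminutive check for person entities: 'Carol' matches 'Carolina',
    but inputs shorter than min_len never match by prefix. Sketch only."""
    name, candidate = name.lower(), candidate.lower()
    if len(name) < min_len:
        return False  # e.g. "Jo" is too short to match "João"
    return candidate.startswith(name) or name.startswith(candidate)
```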
**Phase 2: Fuzzy match**
Uses embedding cosine similarity (in-memory) to find candidates:
- **≥ `fuzzy_threshold`** (default 0.85) - High confidence match, resolves directly
- **0.50 - `fuzzy_threshold`** - Ambiguous, forwards top-3 candidates to Phase 3 (LLM)
- **< 0.50** - No match, creates a new entity
Lowering `fuzzy_threshold` expands the fuzzy-resolve range and reduces LLM calls. For example, setting `fuzzy_threshold=0.50` eliminates the ambiguous range entirely - everything above 0.50 resolves directly.
Falls back to `difflib.SequenceMatcher` when embeddings are unavailable.
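Phase-2 routing reduces to two thresholds. A sketch (the return labels are illustrative, not SDK values):

```python
def fuzzy_decision(similarity: float, fuzzy_threshold: float = 0.85) -> str:
    """Route a candidate by embedding cosine similarity."""
    if similarity >= fuzzy_threshold:
        return "resolve"   # high confidence: resolves directly
    if similarity >= 0.50:
        return "llm"       # ambiguous: top-3 candidates go to Phase 3
    return "create"        # no match: a new entity is created
```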
**Phase 3: LLM fallback**
Sends ambiguous candidates to the injected `LLMProvider` for disambiguation. The LLM sees the entity name, the candidates, and decides which (if any) is a match.
> **Walkthrough: how entity resolution works:** **Message:** "Talked to Guili about the project. Guilherme said it's on track."
1. **Extract:** Two names found: "Guili" and "Guilherme"
2. **Phase 1 (exact):** "Guilherme" matches existing entity `person:guilherme_maturana`
3. **Phase 2 (fuzzy):** "Guili" has 0.87 cosine similarity with "Guilherme" → auto-resolves
4. **Result:** Both names resolve to the same entity. Alias "Guili" registered.
Next time "Guili" appears, Phase 1 catches it instantly via the alias cache - no fuzzy or LLM call needed.
### Special Cases
- **Speaker pronouns** - "I", "me", "eu", "myself" automatically resolve to the speaker entity (`person:{speaker_slug}`). For example, if `speaker_name="Rafael"`, these pronouns resolve to `person:rafael`.
- **Relationship terms** - "girlfriend", "brother", "amigo" resolve to the speaker entity when the bare word is the entity name (not "my girlfriend Ana" - there "Ana" is the entity). The match triggers when the entity name itself is a relationship term, not the full phrase.
- **Relational hints** - `"Carol (Rafael's girlfriend)"` strips the hint and forces `type="person"`
### Alias Registration
When a new alias is discovered (e.g., "Aninha" resolves to `person:ana`), it's registered in `MemoryEntityAlias` with **first-write-wins** semantics - concurrent writes won't create conflicting aliases. Aliases are scoped per `agent_id`: the same alias can map to different entities for different agents.
**Extraction-provided aliases** are also registered automatically: when entity resolution creates a new entity that has aliases from extraction (e.g., "Guili" for "Guilherme Maturana"), all aliases are registered in `MemoryEntityAlias` and added to the in-memory alias cache. This means subsequent entities in the same batch can immediately resolve via Phase 1 exact match - no fuzzy or LLM calls needed.
This creates two lines of defense against duplicates:
1. **Intra-message** - Alias grouping in extraction prevents duplicates within a single message
2. **Cross-message** - Registered aliases enable exact match in future messages (e.g., if message 1 creates "Guilherme Maturana" with alias "Guili", message 2 mentioning "Guili" resolves instantly via Phase 1)
### Entity Persistence
After entity resolution completes, the pipeline ensures **every resolved entity** has a row in the `memory_entities` table - not just newly created ones. Entities resolved via exact match, fuzzy match, or LLM disambiguation are also upserted using `ON CONFLICT DO UPDATE` (idempotent).
This is critical because background jobs (importance scoring, summary refresh, spreading activation) read from `memory_entities`. Without a row, these jobs are blind to the entity and can't operate on it.
The entity upsert is fail-safe: if one entity fails to persist (e.g., constraint violation), the others proceed normally and the pipeline continues.
### Configuration
| Parameter | Default | Description |
|-----------|---------|-------------|
| `fuzzy_threshold` | `0.85` | Cosine similarity threshold for direct fuzzy match |
| `enable_llm_resolution` | `True` | Whether to use LLM for ambiguous cases. When `False`, ambiguous candidates create a new entity instead of calling LLM. |
> **Model selection:** The LLM model used for entity resolution is determined by the `LLMProvider` you inject into `MemoryClient`. To use a different model for resolution vs. extraction, inject different providers.
> **Neuroscience parallel:** Entity resolution mirrors **associative memory** - the brain's ability to link new stimuli to existing representations. Hearing "Carol" activates the neural pattern for "Carolina" through pattern completion, just as fuzzy matching activates candidate entities through embedding similarity.
---
## Stage 3: Reconciliation
**In plain English:** If the user said "I live in São Paulo" last week and now says "I moved to Rio", the system needs to figure out that this is an update, not a second home. This stage compares each new fact against what's already stored and decides: is this new info? An update to something existing? Already known? Or a retraction?
> **Reconciliation always runs:** Reconciliation runs **unconditionally**, regardless of whether extraction was informed or blind. Informed extraction provides richer context that improves fact quality, but the decision of what to do with each fact (ADD, UPDATE, NOOP, DELETE) is always made by `reconcile_facts()`. This ensures contradictory facts are detected and invalidated.
### Decision Logic
For each extracted fact, the reconciler:
1. **Fetches existing facts** for the same entity
2. **Computes similarity** between the new fact and each existing fact (via embeddings)
3. **Decides the action**:
| Action | When | Example |
|--------|------|---------|
| **ADD** | New information, no similar existing fact (similarity < 0.50) | "speaks French" when no language fact exists |
| **UPDATE** | Supersedes an existing fact (similarity ≥ 0.50 and the LLM judges it a replacement) | "lives in Rio" supersedes "lives in São Paulo" |
| **NOOP** | Already known (high similarity) | "works at Acme" when this fact already exists |
| **DELETE** | Explicitly retracts a fact | "I no longer work at Acme" |
> **Walkthrough: how reconciliation decides:** **Scenario:** User previously said "Ricardo lives in São Paulo". Now says "Ricardo moved to Austin, Texas."
**Step 1 - Fetch existing facts for Ricardo:**
```
Existing: "Ricardo Gomes lives in São Paulo" (confidence: 0.95, active)
```
**Step 2 - Compute similarity:**
```
New fact: "Ricardo Gomes moved to Austin, Texas"
vs existing: "Ricardo Gomes lives in São Paulo"
Cosine similarity: 0.72 (both about Ricardo's location)
```
**Step 3 - Similarity ≥ 0.50 → slow path (LLM call):**
The LLM sees both facts and decides: this is an UPDATE. The user moved.
**Result:**
```
Old fact: "Ricardo Gomes lives in São Paulo" → valid_to = now, invalidated_at = now
New fact: "Ricardo Gomes moved to Austin, Texas" → supersedes_fact_id = old_fact.id
Relationship: ricardo → lives_in → sao_paulo → INVALIDATED (cascade)
New relationship: ricardo → lives_in → austin
```
If the similarity had been < 0.50 (e.g., "Ricardo likes jazz"), it would auto-ADD without an LLM call - fast path.
### Reconciliation Performance
- **Fast path (similarity < 0.50):** Auto-ADD without LLM call (~300ms). This is the common path for novel information.
- **Slow path (similarity ≥ 0.50):** LLM evaluates whether to ADD, UPDATE, DELETE, or NOOP (~2-3s). This requires an LLM call with full context.
Plan accordingly: bulk imports of new data are fast; updates to existing knowledge require LLM decision-making.
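The fast/slow dispatch above can be sketched as follows; `similarity_fn` and `llm_decide` are injected stand-ins for illustration, not arandu APIs:

```python
import asyncio

async def decide(new_fact, existing, similarity_fn, llm_decide):
    """Fast path: auto-ADD when nothing similar exists (no LLM call).
    Slow path: the LLM picks ADD/UPDATE/NOOP/DELETE with full context."""
    best = max((similarity_fn(new_fact, f) for f in existing), default=0.0)
    if best < 0.50:
        return "ADD"  # fast path, ~300ms
    return await llm_decide(new_fact, existing)  # slow path, ~2-3s
```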
> **UPDATE chains may branch:** The reconciliation LLM may choose ADD over UPDATE when it interprets new information as distinct rather than a replacement. For example, "I moved to BH" might create separate facts for "lives in BH" and "used to live in RJ" instead of a simple update chain. This preserves more information but may break the `supersedes_fact_id` chain. This is expected behavior - the LLM prioritizes information preservation.
### Fail-safe Behavior
If the reconciliation LLM call fails, the system defaults to **ADD** - it's better to have a near-duplicate than to lose information. The background consolidation jobs (clustering, deduplication) clean up duplicates later.
### Fact Versioning
Facts are versioned using temporal validity windows (`valid_from`, `valid_to`):
- **Active facts** have `valid_to = NULL`
- **Updated facts** get both `valid_to` and `invalidated_at` set, and a new fact is created with `supersedes_fact_id` pointing to the old one
- **Deleted facts** get both `valid_to` and `invalidated_at` set
This enables time-travel queries: you can ask what the system knew at any point in time.
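A time-travel check over the validity windows can be sketched in a few lines, assuming facts expose `valid_from`/`valid_to` as datetimes (a standalone illustration, not a query the SDK ships):

```python
from datetime import datetime, timezone

def facts_known_at(facts, when: datetime):
    """A fact was 'known' at `when` if its validity window covers it;
    valid_to=None means the fact is still active."""
    return [
        f for f in facts
        if f["valid_from"] <= when
        and (f["valid_to"] is None or when < f["valid_to"])
    ]
```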
> **Neuroscience parallel:** Reconciliation mirrors **reconsolidation** - the process by which retrieved memories become labile and can be modified. When you recall a memory ("lives in São Paulo") and encounter new information ("just moved to Rio"), the original memory is updated. The brain doesn't simply overwrite - it creates a new trace linked to the original, just as UPDATE creates a new fact with `supersedes_fact_id`.
---
## Stage 4: Upsert
**In plain English:** This is where the decisions from the previous stage are actually saved to the database. New facts are inserted, outdated facts are marked as superseded, and relationships between entities are created or strengthened. Everything runs inside a transaction - if one fact fails to save, the others still go through.
| Decision | Database action |
|----------|-----------------|
| ADD | Create new `MemoryFact` with embedding |
| UPDATE | Close old fact (`valid_to = now`), create new one with `supersedes_fact_id` |
| NOOP | Update `last_confirmed_at` on existing fact |
| DELETE | Close fact (`valid_to = now`, `invalidated_at = now`) |
### Fact-Entity Links
After each fact is persisted (ADD or UPDATE), the pipeline creates **entity links** connecting the fact to every entity it mentions - not just its primary subject. This enables cross-entity retrieval without duplicating facts.
For example, "Clara Rezende left Vertix" is stored **once** with `entity_key = person:clara_rezende` (primary subject). But entity links are created for both `person:clara_rezende` (primary) and `organization:vertix` (secondary). When you query about Vertix, the system finds this fact via the link - no duplicate fact needed.
Links are created by matching entity display names against the fact text (case-insensitive substring match). Very short names (< 3 characters) are skipped to avoid false positives. Link creation is fail-safe: if it fails, the fact persists normally.
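The link-creation heuristic can be sketched as follows, assuming an entity map of display names to entity keys (the dict shapes are illustrative):

```python
def build_entity_links(fact_text, primary_key, entity_map):
    """Link a fact to every entity whose display name appears in its
    text. Case-insensitive substring match; names under 3 characters
    are skipped to avoid false positives."""
    text = fact_text.lower()
    links = []
    for display_name, entity_key in entity_map.items():
        if len(display_name) < 3:
            continue  # too short: high false-positive risk
        if display_name.lower() in text:
            links.append({"entity_key": entity_key,
                          "is_primary": entity_key == primary_key})
    return links
```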
> **Walkthrough: how upsert + entity links work:** **Fact to persist:** "Clara Rezende joined Orion Tech as head of engineering" (decision: ADD)
**Step 1 - Create MemoryFact:**
```
id: fact_abc123
entity_key: person:clara_rezende (primary subject)
fact_text: "Clara Rezende joined Orion Tech as head of engineering"
confidence: 0.95
valid_from: now
valid_to: NULL
```
**Step 2 - Create entity links:**
```
Entity map has: {Clara Rezende → person:clara_rezende, Orion Tech → organization:orion_tech, ...}
Scan fact_text for entity display names:
"Clara Rezende" found → link (fact_abc123, person:clara_rezende, is_primary=true)
"Orion Tech" found → link (fact_abc123, organization:orion_tech, is_primary=false)
```
**Step 3 - Persist relationship:**
```
clara_rezende → works_at → orion_tech (strength: 0.8)
Evidence: fact_abc123 (matched because fact_text mentions both "Clara Rezende" and "Orion Tech")
```
**Result:** 1 fact, 2 entity links, 1 relationship. When someone asks "Who works at Orion Tech?", the system finds this fact via the `organization:orion_tech` link - without needing a separate fact about Orion Tech.
### Relationship Tracking
During upsert, extracted relationships are also persisted:
- Creates/updates `MemoryEntityRelationship` records
- Resolves source and target entities via the entity map
- **Strength reinforcement**: repeated relationships increase `strength` (initial: 0.8, reinforced up to 1.0 across multiple messages)
- **Self-referencing filter**: Relations where source and target resolve to the same entity (e.g., "caroline child_of caroline" after entity resolution) are silently filtered out. These typically arise from extraction artifacts or alias normalization.
- Uses `ON CONFLICT DO UPDATE` for idempotent upserts
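The reinforcement and self-referencing rules can be sketched as follows; the `boost` increment is an assumption (the docs specify only the 0.8 initial strength and the 1.0 cap):

```python
def upsert_relationship(rels, source, target, rel_type,
                        initial=0.8, boost=0.05):
    """Sketch of relationship upsert: repeated edges gain strength up
    to 1.0; self-referencing edges are silently filtered out."""
    if source == target:
        return rels  # self-referencing artifact: dropped
    key = (source, rel_type, target)
    if key in rels:
        rels[key] = min(1.0, rels[key] + boost)  # reinforce existing edge
    else:
        rels[key] = initial
    return rels
```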
> **Relationships are unidirectional:** Writing "Ana works at Acme" creates `ana → works_at → acme_corp`, but **not** `acme_corp → employs → ana`. This means graph retrieval starting from "Acme Corp" won't find Ana through relationships (but may still find her via semantic similarity). To create both directions, mention them explicitly: "Ana works at Acme. Acme has Ana as a data scientist."
#### Evidence Linkage & Cascade Invalidation
**The problem:** Without linkage between facts and relationships, contradictory edges accumulate. If a user says "I live in Curitiba" and later "I moved to São Paulo", the old relationship `user --[lives_in]--> curitiba` would remain active alongside the new one - polluting retrieval with stale context.
**The solution:** Each relationship is linked to the fact that supports it via `evidence_fact_id`. When that fact is superseded (UPDATE) or retracted (DELETE), the relationship is **automatically invalidated** - no manual cleanup needed.
How evidence linkage works:
1. After facts are persisted in the upsert stage, a heuristic match associates each relationship with a corresponding fact. For a relationship `(source, target)`, the matcher looks for facts whose `fact_text` mentions both entity names.
2. If multiple facts match, the one with the highest confidence is selected.
3. The matched fact's ID is stored as `evidence_fact_id` on the relationship.
When a fact is invalidated (via UPDATE or DELETE), **cascade invalidation** automatically sets `invalidated_at` and `valid_to` on all relationships that reference it. The [graph retrieval BFS](../advanced/read-api.md) already filters out invalidated relationships, so stale edges are immediately excluded from context.
```
User: "I live in Curitiba"
→ fact: "User lives in Curitiba" (fact_1)
→ rel: user --[lives_in]--> curitiba (evidence_fact_id = fact_1)
User: "I moved to São Paulo"
→ reconciliation: UPDATE fact_1 → fact_2 "User lives in São Paulo"
→ cascade: rel lives_in→curitiba is INVALIDATED (evidence_fact_id = fact_1)
→ new rel: user --[lives_in]--> sao_paulo (evidence_fact_id = fact_2)
```
> **Relationship types are dynamic:** The `rel_type` field accepts any descriptive `snake_case` string - not just a fixed set. Common types include `works_at`, `lives_in`, `family_of`, but the LLM may also produce types like `mentored_by` or `inspired_by`. See [Dynamic Relationship Types](../advanced/data-types.md#dynamic-relationship-types) for details on normalization and aliases.
#### Mirror Facts
Sometimes the LLM infers a relationship from context without extracting a corresponding fact. For example, "I'm going to Curitiba to visit my mom" implies `mom --[lives_in]--> curitiba`, but the LLM may only extract a fact about the user's trip - not about where mom lives. Without a fact, the relationship can't participate in cascade invalidation and isn't findable via semantic search.
To solve this, a **mirror fact** is automatically created as a fallback when no heuristic match is found. The mirror fact is a simple natural-language sentence generated from the relationship: `"{source_name} {rel_type} {target_name}"` (e.g., `"Mom lives in Curitiba"`).
**Mirror facts go through the canonical pipeline.** Starting in v0.11.7, mirror facts are not persisted directly — they are built as synthetic `ExtractedFact` objects and routed through the same `reconcile_facts → execute_upsert` path as facts extracted by the LLM. This guarantees:
- **Semantic dedup:** the reconciler catches near-duplicates like `"Pedro lives in Brazil"` vs `"Pedro lives in Brasil"` via embedding similarity + LLM reasoning — not just exact string match.
- **Speaker propagation:** the `speaker` field is populated on mirror facts via the same mechanism as regular facts, so provenance is preserved.
- **Single entry point:** the only way a `MemoryFact` is created in the write pipeline is through `_add_fact` (for ADD) or `_update_fact` (for versioned UPDATE). No parallel paths.
Mirror facts are marked with:
- `confidence = 0.60` (weak inference - lower priority in retrieval ranking)
- `source_context = "inferred_from_relation"` (allows filtering or downranking if needed)
> **Mirror facts may persist after source invalidation:** Mirror facts are not automatically invalidated when the source relationship is removed. They may persist as stale data. Applications should consider filtering by `source_context` when accuracy is critical.
The mirror fact's ID is used as the relationship's `evidence_fact_id`, so cascade invalidation works for inferred relationships too. When the reconciler decides `NOOP` (an equivalent fact already exists), the existing fact's ID is used as the evidence link instead — strengthening the evidence chain.
> **Reducing mirror facts:** The extraction prompts instruct the LLM to extract implicit facts alongside relationships (e.g., "my mom lives in Curitiba" as a fact, not just a relation). As LLM extraction improves, fewer mirror facts are needed - they're the safety net, not the primary mechanism.
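A sketch of the mirror-fact template; humanizing `rel_type` by replacing underscores with spaces is an assumption inferred from the "Mom lives in Curitiba" example, not confirmed SDK behavior:

```python
def mirror_fact_text(source_name: str, rel_type: str, target_name: str) -> str:
    """Render a relationship as a natural-language mirror fact,
    following the "{source_name} {rel_type} {target_name}" template."""
    return f"{source_name} {rel_type.replace('_', ' ')} {target_name}"
```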
### Transaction Safety
The entire write pipeline runs inside a database transaction. Individual fact upserts use **savepoints** (`session.begin_nested()`) so that a failure in one fact doesn't abort the entire batch:
```python
# If this fact fails, only this savepoint rolls back
async with session.begin_nested():
session.add(new_fact)
await session.flush()
```
The event record is created and flushed first, so it survives even if all subsequent stages fail.
---
## WriteResult
After the pipeline completes, you get a `WriteResult` with full observability:
```python
result = await memory.write(
agent_id="user_123",
message="...",
speaker_name="Rafael",
recent_messages=["previous message for pronoun resolution"], # optional
occurred_at=datetime(2025, 6, 15, tzinfo=UTC), # optional, for historical imports
)
# What happened
print(result.facts_added) # List of facts created
print(result.facts_updated) # List of facts superseded
print(result.facts_unchanged) # List of confirmed facts (NOOP decisions)
print(result.facts_deleted) # List of retracted facts (DELETE decisions)
print(result.entities_resolved) # List of resolved entities
print(result.duration_ms) # Total pipeline time
print(result.event_id) # Unique event ID for this write
print(result.tokens_used) # TokenUsage(input_tokens=..., output_tokens=..., total_tokens=...)
print(result.pipeline) # PipelineTrace (when verbose=True)
print(result.success) # True if pipeline completed without errors (always check this)
print(result.error) # Error message if pipeline failed (None on success)
```
### Trace Enrichment (verbose=True)
When `verbose=True`, the extraction step in `PipelineTrace` includes additional metadata:
| Field | Type | Description |
|-------|------|-------------|
| `relation_retry_triggered` | `bool` | Whether the automatic relation retry was used |
```python
result = await memory.write(agent_id, message, speaker_name="Rafael", verbose=True)
extraction_step = result.pipeline.steps[0] # "extraction"
print(extraction_step.data["relation_retry_triggered"]) # True/False
```
### Token Usage
`tokens_used` reports the total LLM tokens consumed across all calls in the pipeline (extraction, entity resolution, reconciliation). Useful for benchmarking and cost estimation.
```python
result = await memory.write(agent_id, message, speaker_name="Rafael")
print(result.tokens_used.input_tokens) # e.g. 1200
print(result.tokens_used.output_tokens) # e.g. 350
print(result.tokens_used.total_tokens) # e.g. 1550
```
> **Token tracking requires provider support:** `tokens_used` is populated from `LLMResult.usage` returned by your `LLMProvider`. The built-in `OpenAIProvider` reports usage automatically. Custom providers that return `LLMResult(text=..., usage=None)` will show zero tokens.
### Config Overrides
Override any `MemoryConfig` field for a single `write()` call without creating a new client:
```python
result = await memory.write(
agent_id="user_123",
message="...",
speaker_name="Rafael",
config_overrides={"extraction_timeout_sec": 60.0},
)
```
Only the provided keys are changed; all others inherit from the client config. Invalid keys emit a warning and are ignored. Type mismatches raise `ValueError`.
### Dry Run
Run extraction without persisting anything to the database:
```python
result = await memory.write(
agent_id="user_123",
message="I live in São Paulo with my wife Ana",
speaker_name="Rafael",
dry_run=True,
)
# result.facts_added contains what WOULD be extracted
# result.tokens_used shows cost of this extraction
# No event, no facts, no entities persisted
```
Useful for benchmarking: run the same message with `dry_run=True` and compare `tokens_used` across different configurations.
---
## Pipeline Diagram (Complete)
```mermaid
flowchart TD
MSG["User message"] --> EVT["Create MemoryEvent\n(immutable log + embedding)"]
EVT --> ALIAS["Alias Lookup\n(word-boundary match on alias cache)"]
ALIAS --> PRE["Pre-retrieval / Profile Load\n(pgvector top-K or entity profiles)"]
PRE --> INF{"Informed\nExtraction"}
INF -->|Success| DEDUP["Semantic Dedup\n(remove near-duplicates)"]
INF -->|Failure| FALLBACK["Fallback: Blind Extraction\n(Entity Scan → Facts + Relations)"]
FALLBACK --> DEDUP
DEDUP --> RES["Entity Resolution\n(exact → fuzzy → LLM)"]
RES --> REC{"Informed\npath?"}
REC -->|Yes| UPS["Upsert + Entity Links + Profiles\n(with savepoints)"]
REC -->|No: fallback| RECONCILE["Reconciliation\n(ADD / UPDATE / NOOP / DELETE)"]
RECONCILE --> UPS
UPS --> REL["Relationship Tracking\n(strength reinforcement)"]
REL --> WR["WriteResult"]
```
---
# Read Pipeline
When you call `memory.retrieve()`, the SDK searches everything it knows about an agent and returns the facts most relevant to your query - ranked, scored, and formatted as a string you can paste directly into an LLM prompt.
**You don't need to understand the internals to use it.** Just call `retrieve()` and use `result.context`. This page explains what happens under the hood for when you want to tune behavior or debug results.
```mermaid
flowchart LR
A["Query"] --> B["Plan"]
B --> C["Retrieve\n(3 signals)"]
C --> D["Enhance"]
D --> E["Rerank"]
E --> F["Format"]
F --> G["RetrieveResult"]
```
## Overview
Every `memory.retrieve(agent_id, query)` call runs five stages:
1. **Plan** - Figures out *what* to search for. Detects greetings, aggregation patterns, and broad queries. Entity resolution runs deterministically in parallel.
2. **Retrieve** - Searches for matching facts using three methods in parallel: meaning similarity, keyword matching, and relationship graph traversal.
3. **Enhance** - Expands context by following entity relationships to find related facts that weren't directly matched.
4. **Rerank** - An LLM re-evaluates the top results and reorders them by actual relevance to your query.
5. **Format** - Compresses the ranked facts into a clean, token-budgeted context string with facts, patterns, and conversation snippets.
---
## Stage 1: Deterministic Planner
**In plain English:** Before searching, the pipeline analyzes your query to figure out what kind of search to run. It detects greetings (skip), aggregation queries ("who are my friends?"), broad requests ("tell me everything"), and identifies which entities are mentioned. All of this is **deterministic** — same query always produces the same plan, zero LLM calls.
The planner produces a `RetrievalPlan` using regex pattern matching and schema lookups. No LLM is involved.
> **Why no LLM in the planner?:** Prior to v0.13.0, the planner called an LLM for query reformulation and entity extraction. This introduced **non-determinism**: the same query against the same memory could return different facts between runs, because cloud LLM APIs are not deterministic even at `temperature=0` (documented behavior — batching, fp16 rounding, GPU routing all introduce variation). v0.13.0 replaces the LLM planner with a fully deterministic implementation. The query goes straight to semantic search unchanged, and entity extraction is handled by a dedicated deterministic resolver.
### What the Planner Decides
| Field | Description | Example |
|-------|-------------|---------|
| `strategy` | Retrieval strategy | `"multi_signal"` (default) or `"skip"` (for greetings) |
| `similarity_query` | Query for semantic search (always the original) | `"where do I live?"` (passed through unchanged) |
| `pattern_queries` | SQL LIKE patterns for aggregation | `["person:%"]` (from "who are my friends?") |
| `broad_query` | Whether to expand graph scope | `true` for "tell me everything about..." |
| `reason` | Explanation of the strategy | `"deterministic"`, `"aggregation"`, `"broad"`, `"greeting"` |
### Entity Resolution
When you ask "Where does Carlos live?", the pipeline needs to figure out that "Carlos" means the entity `person:carlos` in the database. It uses two deterministic methods:
1. **Deterministic resolution (primary)** — Matches words in the query against known entity aliases (`MemoryEntityAlias`), display names (`MemoryEntity.display_name`), and entity_key slugs. Fast (< 10ms), reliable, zero LLM cost. For example, "Onde o Carlos mora?" deterministically resolves to `person:carlos` via slug match.
2. **Query expansion (alias priming)** — `expand_query()` resolves aliases and fetches 1-hop KG neighbors, adding related entities.
Both sources are **unified** before the graph gate. If either source identifies an entity, the graph traversal runs.
The trace step `"retrieval"` includes an `entities_sources` breakdown showing which entities came from each source (`deterministic`, `expansion`).
### Aggregation Detection
For queries like "who are my friends?" or "list my projects", the planner matches keywords against schema prefixes and generates SQL LIKE patterns (e.g., `person:%`). This only triggers when the prefix actually exists in the user's schema.
### Skip Strategy
For greetings and casual messages ("hi", "oi", "bom dia"), the planner returns `strategy: "skip"` via regex matching, short-circuiting the pipeline. No database queries, no LLM calls, instant response.
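A minimal sketch of this kind of regex gate. The patterns here are illustrative only; the SDK's actual greeting list is more extensive:

```python
import re

# Illustrative greeting patterns (English + Portuguese); not the SDK's real list.
_GREETING_RE = re.compile(
    r"^\s*(hi|hello|hey|oi|ol[aá]|bom dia|boa tarde|boa noite)[\s!.,]*$",
    re.IGNORECASE,
)

def plan_strategy(query: str) -> str:
    """Return 'skip' for pure greetings, 'multi_signal' otherwise."""
    return "skip" if _GREETING_RE.match(query) else "multi_signal"
```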
### Anaphora Resolution (Caller's Responsibility)
If your query contains pronouns ("Where does **she** live?"), the Arandu SDK does **not** resolve them. Pronoun resolution depends on conversation context (short-term memory), which is the caller's domain. Resolve pronouns before calling `retrieve()`:
```python
# The caller (your agent) resolves "she" → "Ana" using conversation context
resolved_query = "Where does Ana live?" # not "Where does she live?"
result = await memory.retrieve(agent_id="user_123", query=resolved_query)
```
> **Neuroscience parallel:** The planner mirrors **retrieval cues** in cognitive psychology. When you try to remember something, your brain doesn't do an exhaustive search — it uses contextual cues to narrow down the search space. The planner identifies entities and detects query patterns as cues that guide the retrieval signals.
> **Walkthrough: full query lifecycle:** **Query:** "Onde o Marcos Tavares mora?"
**Stage 1 — Planning (deterministic):**
```
Greeting check: no match → proceed
Aggregation check: no match → no pattern queries
Broad check: no match → broad_query = false
Deterministic resolver: "marcos tavares" → person:marcos_tavares (slug match)
similarity_query = "Onde o Marcos Tavares mora?" (original, unchanged)
```
**Stage 2 — Multi-signal retrieval (parallel):**
```
Semantic: embedding("Onde o Marcos Tavares mora?") → top match: "Marcos Tavares lives in Porto Alegre" (0.91)
Keyword: "marcos" + "tavares" → matches 4 facts about Marcos
Graph: BFS from person:marcos_tavares → finds facts via entity links + relationships
```
**Stage 3 — Enhancement:**
```
Spreading activation: from seed "lives in Porto Alegre" → finds related facts:
"Marcos Tavares is married to Carolina" (via entity hop)
"Carolina is an architect" (via 2-hop)
```
**Stage 4 — Reranking (multiplicative blend):**
```
"Marcos Tavares lives in Porto Alegre" → formula=0.91, reranker=1.0 → final=0.91
"Marcos Tavares is a product manager at Vertix" → formula=0.65, reranker=0.2 → final=0.28
"Carolina is an architect" → formula=0.30, reranker=0.0 → final=0.09 → filtered (< 0.15)
```
**Stage 5 — Formatting:**
```
Known facts:
- Marcos Tavares lives in Porto Alegre
```
**Result:** 1 highly relevant fact, 210ms, 800 tokens.
---
## Stage 2: Multi-Signal Retrieval
**In plain English:** The pipeline searches for relevant facts using three different methods at the same time - like searching by meaning, by exact words, and by connections between entities. This catches facts that any single method alone would miss.
Three independent signals run **in parallel** via `asyncio.gather()`, each finding candidates from a different angle:
```mermaid
flowchart TD
P["RetrievalPlan"] --> S["Semantic Search\n(pgvector cosine)"]
P --> K["Keyword Search\n(SQL ILIKE)"]
P --> G["Graph Traversal\n(BFS 2-hop)"]
S --> M["Merge & Rank\n(dedup + weighted scoring)"]
K --> M
G --> M
```
### Signal 1: Semantic Search
Uses pgvector cosine similarity to find facts whose embeddings are close to the query embedding.
- Embeds the query (passed through unchanged from the planner)
- Searches the `MemoryFact` table with HNSW index
- Returns top-N candidates above `min_similarity` threshold
- Filters: `agent_id`, active facts (`valid_to IS NULL`), confidence ≥ `min_confidence`
This is the primary signal - it finds facts that are **semantically similar** to the query, even if they don't share exact keywords.
### Signal 2: Keyword Search
SQL ILIKE matching on `fact_text` for exact or partial keyword hits.
- Extracts significant words (> 2 characters) from the query
- Matches against fact text (up to 5 keywords)
- Score = fraction of query words found in the fact
This complements semantic search by catching exact matches that embedding similarity might miss (e.g., proper nouns, technical terms, abbreviations).
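The keyword score described above can be sketched in Python. This is a simplification of the SDK's SQL-side ILIKE matching:

```python
def keyword_score(query: str, fact_text: str, max_keywords: int = 5) -> float:
    """Score = fraction of significant query words (> 2 chars) found in the fact."""
    words = [w.lower() for w in query.split() if len(w) > 2][:max_keywords]
    if not words:
        return 0.0
    text = fact_text.lower()
    hits = sum(1 for w in words if w in text)
    return hits / len(words)
```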
### Signal 3: Graph Retrieval
Traverses entity relationships to find facts connected to the query entities.
- Starts from entities identified by the planner
- BFS traversal up to 2 hops through `MemoryEntityRelationship`
- **Hop decay**: Hop 1 facts receive full score (1.0×). Hop 2 facts receive 0.5× penalty. This prevents distant facts from dominating the candidate pool.
- Facts are fetched via **entity links** (`MemoryFactEntityLink`), not just the primary `entity_key`. This means a fact "Clara left Vertix" (primary subject: Clara) is also found when querying about Vertix - because the fact has a secondary entity link to Vertix.
- Scoring formula: `edge_strength × recency_factor × edge_recency_factor × query_bonus × hop_decay`
- `query_bonus`: 1.5× when the entity name appears in the query text
- **Fallback**: if the entity links table is empty (pre-migration), retrieval falls back to direct `entity_key` matching
Graph retrieval excels at finding **contextual** facts. When you ask about a person, it also finds facts about their workplace, their relationships, and their projects.
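The graph scoring formula above combines into a single multiplication. A minimal sketch, using the factors and constants stated in the bullets (1.5× query bonus, 0.5× hop-2 decay):

```python
def graph_score(
    edge_strength: float,
    recency_factor: float,
    edge_recency_factor: float,
    hop: int,
    entity_in_query: bool,
) -> float:
    """Combine the graph-signal factors: strength, recency, bonus, hop decay."""
    hop_decay = 1.0 if hop == 1 else 0.5     # hop-2 facts are penalized
    query_bonus = 1.5 if entity_in_query else 1.0
    return edge_strength * recency_factor * edge_recency_factor * query_bonus * hop_decay
```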
> **Walkthrough: cross-entity retrieval via entity links:** **Query:** "O que aconteceu com a Vertix?"
**Without entity links (old behavior):**
```
Graph starts from organization:vertix
Searches MemoryFact WHERE entity_key = 'organization:vertix'
Finds: only facts where Vertix is the PRIMARY subject
Misses: "Clara Rezende left Vertix" (entity_key = person:clara_rezende)
```
**With entity links (current behavior):**
```
Graph starts from organization:vertix
Searches MemoryFactEntityLink WHERE entity_key = 'organization:vertix'
Finds fact_ids linked to Vertix, regardless of primary subject:
→ "Clara Rezende left Vertix" (primary: Clara, link: Vertix) ✅
→ "Vertix received Series A of R$ 20M" (primary: Vertix) ✅
→ "Ricardo Gomes is co-founder of Vertix" (primary: Ricardo, link: Vertix) ✅
→ "Vertix signed contract with Ambev" (primary: Vertix) ✅
```
**Result:** 4 facts found vs 2 without links. The query about Vertix surfaces facts from Clara, Ricardo, and the Ambev deal - all linked to Vertix but not primarily about Vertix.
### Merge & Rank
After all three signals return, results are merged:
1. **Deduplicate** by fact ID (same fact may appear in multiple signals)
2. **Apply recency decay** - Exponential decay with configurable half-life (`recency_half_life_days`, default 14)
3. **Apply confidence decay** - Older facts with lower confidence are penalized
4. **Compute combined score** - Weighted sum:
> **Reranker blends with these weights:** By default, `enable_reranker=True` - the LLM reranker uses a multiplicative blend with the formula score computed from these weights. The formula score remains important because the reranker can only dampen or boost it, never zero it out. Set `enable_reranker=False` to rely on these weights alone for final ranking.
```python
score = (
score_weights["semantic"] * semantic_score + # default 0.70
score_weights["recency"] * recency_score + # default 0.20
score_weights["importance"] * importance_score # default 0.10
)
```
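The recency decay in step 2 follows a standard exponential half-life curve - a minimal sketch, assuming the conventional half-life formula:

```python
def recency_score(age_days: float, half_life_days: float = 14.0) -> float:
    """Exponential decay: a fact loses half its recency score every half-life."""
    return 0.5 ** (age_days / half_life_days)
```

With the default 14-day half-life, a fact written today scores 1.0, a two-week-old fact 0.5, and a four-week-old fact 0.25.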
### Complete Score Breakdown
Each fact gets scored on multiple dimensions. You can inspect these in `fact.scores` to understand **why** a fact ranked where it did:
| Key | Source | Range | Description |
|-----|--------|-------|-------------|
| `semantic` | Semantic search | 0.0 - 1.0 | Cosine similarity between query and fact embeddings. Primary retrieval signal. |
| `keyword` | Keyword search | 0.0 - 1.0 | Fraction of query words found in the fact text. Complements semantic for exact matches. |
| `recency` | Merge & Rank | 0.0 - 1.0 | Exponential decay from `created_at`, half-life = `recency_half_life_days` (default 14). |
| `importance` | Dynamic importance | 0.0 - 1.0 | Raw importance value from the database. When informed extraction is active, new facts receive an initial importance based on their semantic `importance_category` (e.g., `biographical_milestone` gets a higher value than `conversational`). Otherwise, starts at 0.5. Evolves over time via the background importance job (retrieval frequency, recency of use, user corrections, pattern membership). |
| `confidence` | Merge & Rank | 0.0 - 1.0 | Effective confidence after temporal decay. Present in the `scores` dict for debugging, but NOT part of the weighted formula (`score_weights` only uses `semantic`, `recency`, `importance`). The base confidence is assigned by the LLM during extraction (typically 0.95 for assertive statements). It decays over time and is used as a filter (`min_confidence`). |
| `reranker` | Reranking | 0.0 - 1.0 | LLM-based relevance score. Only present when `enable_reranker=True`. Continuous float returned by the reranker LLM. |
Additional signals computed during enhancement (not in `score_weights` but affect final score):
| Key | Source | Description |
|-----|--------|-------------|
| `pattern` | Enhancement | Additive boost for facts with high `reinforcement_count` (up to +0.10). |
| `graph` | Graph traversal | Score from BFS 2-hop entity relationship traversal. |
### Configuration
| Parameter | Default | Description |
|-----------|---------|-------------|
| `topk_facts` | `20` | Maximum facts to return |
| `topk_events` | `8` | Maximum events to consider |
| `min_similarity` | `0.20` | Minimum cosine similarity for semantic results |
| `min_confidence` | `0.55` | Minimum fact confidence |
| `recency_half_life_days` | `14` | Half-life for recency decay |
| `score_weights` | See above | Weights for each scoring signal |
| `min_score` | `0.15` | Minimum final score for returned facts |
| `enable_reranker` | `True` | Whether to use LLM reranking |
> **Neuroscience parallel:** Multi-signal retrieval mirrors **spreading activation** in semantic networks (Collins & Loftus, 1975). When you think of "doctor", activation spreads to related concepts ("hospital", "medicine", "appointment") through associative links. Similarly, graph retrieval spreads from query entities along relationship edges, while semantic search activates facts through embedding proximity.
---
## Stage 3: Enhancement
**In plain English:** After finding the initial results, the pipeline follows connections to discover related facts. If you ask about a person, it might also pull in facts about their workplace, projects, or team - things you didn't directly ask about but that add useful context.
### Spreading Activation
Starting from the top-K seed facts, the pipeline expands context by following entity relationships:
- For each seed fact, find its entity's relationships
- Traverse relationships for N hops (`spreading_activation_hops`, default 2). Set to `0` to disable spreading entirely.
- Apply decay factor per hop (`spreading_decay_factor`, default 0.50). Hop 1 uses the factor directly; Hop 2 uses the factor squared (compounded decay).
- Return up to `spreading_facts_per_entity` additional facts per entity (default 3), applied in both Hop 1 and Hop 2.
This catches important context that wasn't directly matched. If you ask "what does Rafael do?", spreading activation might surface facts about his workplace, team, and projects.
> **When does spreading activation matter?:** Spreading has the most impact with **20+ entities** and cross-domain relationships (e.g., people → projects → clients → technologies). With small datasets (< 15 entities), the semantic, keyword, and graph signals already cover the full fact space - spreading may return candidates but they'll be deduplicated against existing results. The trace fields `spreading_candidates_returned` and `spreading_candidates_unique` let you confirm whether spreading is contributing new facts for your dataset.
### Pattern Signal
Facts with a high `reinforcement_count` (incremented by NOOP decisions in write) get an additive score boost:
- High reinforcement count → up to 0.10 extra score
- Captures frequently mentioned, well-established facts
### Configuration
| Parameter | Default | Description |
|-----------|---------|-------------|
| `spreading_activation_hops` | `2` | Maximum hops from seed facts. Set to `0` to disable spreading. |
| `spreading_decay_factor` | `0.50` | Score decay per hop. Hop 1 = factor, Hop 2 = factor² |
| `spreading_facts_per_entity` | `3` | Max facts fetched per entity in both Hop 1 and Hop 2 |
| `spreading_max_related_entities` | `5` | Max KG-related entities to explore in Hop 1 |
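The compounded hop decay reduces to a power of the decay factor. A minimal sketch:

```python
def spread_score(seed_score: float, hop: int, decay_factor: float = 0.50) -> float:
    """Score for a fact found `hop` hops from a seed fact.
    Hop 1 applies the factor once; hop 2 applies it squared."""
    return seed_score * decay_factor ** hop
```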
---
## Stage 4: Reranking (Optional)
**In plain English:** The previous stages find relevant facts, but their ranking is based on math (similarity scores, keyword overlap). The reranker asks an LLM: "Given what this person is asking, which of these facts are actually most useful?" This produces a smarter final ranking.
When `enable_reranker=True`, the top candidates are reranked by an LLM that considers query intent:
- The reranker evaluates **2× `topk_facts`** candidates (default: 40). This expanded pool ensures that semantically relevant facts beyond the initial top-20 reach the reranker. Increase `topk_facts` to expand reranker coverage.
- Respects the semantic meaning of the query (not just keyword overlap)
- Can promote facts that are indirectly relevant but important
- Graceful degradation: if the reranker fails or exceeds `reranker_timeout_sec` (default 5.0s), the original ranking is preserved
- Timeout is enforced via `asyncio.wait_for` - the LLM call is cancelled if it exceeds the configured timeout
- Uses the same LLM provider configured for the client (no separate provider needed)
The reranker is the most expensive stage but provides the highest quality improvement for complex queries.
### Configuration
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `reranker_weight` | float | `0.70` | Weight of the reranker score in the multiplicative blend |
| `min_reranker_score` | float | `0.10` | Minimum reranker score; facts below this are eliminated |
| `reranker_timeout_sec` | float | `5.0` | Timeout for the reranker LLM call (seconds) |
> **Reranker veto: `min_reranker_score`:** When `enable_reranker=True`, any fact that receives a reranker score **below** `min_reranker_score` (default `0.10`) is eliminated from results (final_score set to 0.0). This gives the reranker veto power over completely irrelevant facts - even if the formula score is high (e.g., graph BFS gives 0.80 to a distant, unrelated fact). When `enable_reranker=False`, this setting has no effect. Tune it: `config_overrides={"min_reranker_score": 0.05}` for more permissive results, `0.20` for stricter filtering.
> **Multiplicative blend scoring:** The reranker does NOT replace the formula score. It uses a **multiplicative blend**:
`final_score = formula_score × (floor + reranker_weight × reranker_score)`
where `floor = 1 - reranker_weight`. With the default `reranker_weight=0.70`, a fact with formula=0.9 and reranker=0.0 gets final = 0.9 × (0.30 + 0) = 0.27 (not 0.0). A fact with formula=0.9 and reranker=1.0 gets final = 0.9 × (0.30 + 0.70) = 0.90. The reranker can boost or dampen facts but **cannot zero out** a fact with strong retrieval signals through the blend alone. However, `min_reranker_score` IS an exception - facts scoring below it are set to 0.0 regardless of their formula score. The `scores` dict preserves both `formula` (pre-reranker) and `reranker` (LLM score) for debugging.
> **Walkthrough: how the reranker blend works:** **Query:** "Qual o time de futebol do Bruno Almeida?"
**Pre-reranker candidates (formula scores):**
```
1. "Bruno Almeida runs marathons" → formula = 0.45 (semantic: "sports" similarity)
2. "Bruno Almeida developed an ML model" → formula = 0.38
3. "Bruno Almeida works at Orion Tech" → formula = 0.35
```
**Reranker scores (LLM evaluation):**
```
1. "Bruno Almeida runs marathons" → reranker = 0.0 (marathon ≠ football)
2. "Bruno Almeida developed an ML model" → reranker = 0.0 (irrelevant)
3. "Bruno Almeida works at Orion Tech" → reranker = 0.0 (irrelevant)
```
**Multiplicative blend** (weight=0.70, floor=0.30):
```
1. final = 0.45 × (0.30 + 0.70 × 0.0) = 0.45 × 0.30 = 0.135 → filtered (< 0.15)
2. final = 0.38 × 0.30 = 0.114 → filtered
3. final = 0.35 × 0.30 = 0.105 → filtered
```
**Result:** 0 facts returned. Correct - there's no information about Bruno's football team in memory.
**Compare with a relevant query** - "O que o Bruno Almeida desenvolveu?":
```
"Bruno developed an ML model for fraud detection" → formula=0.92, reranker=1.0
final = 0.92 × (0.30 + 0.70 × 1.0) = 0.92 × 1.0 = 0.92 ✅
```
---
## Stage 5: Formatting
**In plain English:** The pipeline takes the ranked facts and organizes them into a clean, ready-to-use string for your LLM prompt. Facts come first as a bullet list, followed by observed patterns (meta-observations) and relevant conversation snippets - all within a token budget so you don't blow up your prompt. The format is designed for direct LLM consumption: no timestamps on facts, no entity prefixes, no confidence scores - just clean, readable information.
### Context Compression
Facts are organized within a token budget (`context_max_tokens`) into a clean format with three sections:
> **`context_max_tokens` is a proportional budget, not a hard cap:** The `context_max_tokens` parameter controls the **relative** size of the output context, but the actual token count may exceed the configured value. The pipeline guarantees a minimum context for core facts and uses the parameter as a proportional budget across tiers. Treat it as a target, not a strict limit. For example, setting `context_max_tokens=100` may produce ~240 tokens due to minimum guarantees for the hot tier.
| Section | Output Label | Budget | Content |
|---------|-------------|--------|---------|
| **Facts** | `Known facts:` | hot + warm budget (80%) | Clean bullet list of relevant facts, ordered by score. No timestamps, no entity prefixes. |
| **Patterns** | `Observed patterns:` | cold budget (20%), up to 3 | Meta-observation titles (insights, patterns, trends). |
| **Events** | `Relevant conversations:` | remaining budget + 400 token overflow | Recent conversation snippets, up to 300 chars each, with dates. |
### Configuration
| Parameter | Default | Description |
|-----------|---------|-------------|
| `context_max_tokens` | `2000` | Maximum tokens in formatted context |
| `hot_tier_ratio` | `0.50` | Share of budget for top facts |
| `warm_tier_ratio` | `0.30` | Share of budget for supporting facts |
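The proportional split implied by these ratios can be sketched as follows. The real tiering logic is more involved (minimum guarantees for the hot tier, event overflow), so treat this as the baseline arithmetic only:

```python
def tier_budgets(
    context_max_tokens: int = 2000,
    hot_tier_ratio: float = 0.50,
    warm_tier_ratio: float = 0.30,
) -> dict[str, int]:
    """Proportional budget split; the cold tier gets the remainder."""
    hot = int(context_max_tokens * hot_tier_ratio)
    warm = int(context_max_tokens * warm_tier_ratio)
    cold = context_max_tokens - hot - warm
    return {"hot": hot, "warm": warm, "cold": cold}
```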
### Output Format
The `context` string is formatted for direct injection into LLM prompts. The format is clean and free of internal metadata - no timestamps on facts, no entity prefixes, no confidence scores:
```
Known facts:
- Lives in Sao Paulo
- Works at Acme Corp as a backend engineer
- Wife's name is Ana
Observed patterns:
- Regularly discusses work-life balance topics
Relevant conversations:
- (2026-03-28) Hey, just wanted to share that I got promoted to tech lead!
- (2026-03-25) Had a great weekend at the beach with Ana...
```
---
## RetrieveResult
```python
result = await memory.retrieve(agent_id="user_123", query="...")
# Pre-formatted context (ready for LLM prompts)
print(result.context)
# Individual facts with scores
for fact in result.facts:
print(f"[{fact.score:.2f}] {fact.entity_name}: {fact.fact_text}")
print(f" Scores: {fact.scores}") # {"semantic": 0.85, "recency": 0.72, ...}
# Pipeline stats
print(f"Candidates evaluated: {result.total_candidates}")
print(f"Duration: {result.duration_ms:.0f}ms")
```
---
## Pipeline Diagram (Complete)
```mermaid
flowchart TD
Q["User query"] --> AG["Deterministic Planner\n(regex + schema)"]
AG -->|skip| SKIP["Return empty\n(greeting/casual)"]
AG -->|multi_signal| PAR["Parallel retrieval"]
PAR --> SEM["Semantic Search\n(pgvector cosine)"]
PAR --> KW["Keyword Search\n(SQL ILIKE)"]
PAR --> GR["Graph Traversal\n(BFS 2-hop)"]
SEM --> MERGE["Merge & Rank\n(dedup + weighted scoring)"]
KW --> MERGE
GR --> MERGE
MERGE --> SA["Spreading Activation\n(expand context along edges)"]
SA --> RR{"Reranker\nenabled?"}
RR -->|yes| RERANK["LLM Rerank"]
RR -->|no| FMT["Format & Compress"]
RERANK --> FMT
FMT --> RES["RetrieveResult"]
```
> **Neuroscience parallel:** The tiered compression (facts/patterns/events) mirrors **levels of activation** in working memory. In Cowan's embedded-process model, a small number of items are in the focus of attention (top-ranked facts), surrounded by activated long-term memory (patterns and meta-observations), with the rest of long-term memory available but not active (conversation snippets). The token budget acts as the capacity limit of working memory.
---
# Background Jobs
Background jobs improve memory quality over time. They run **separately** from `write()` and `retrieve()` - you schedule them yourself (every few hours, via cron, APScheduler, or a simple loop).
### Do I need them?
**For getting started: no.** The `write()` and `retrieve()` pipelines work without background jobs. Your agent will still extract facts, resolve entities, and return relevant context.
**For production: yes.** Without them, importance scores stay flat (0.5 for everything), entity summaries are never generated, patterns and contradictions go undetected, and retrieval quality degrades over time as the memory grows.
```mermaid
flowchart LR
A["Scheduler\n(periodic)"] --> B["Clustering"]
A --> C["Consolidation"]
A --> D["Importance\nScoring"]
A --> E["Summary\nRefresh"]
A --> F["Profile\nConsolidation"]
```
## Overview
`arandu` provides five categories of background jobs:
| Job | Purpose | Uses LLM? | Frequency |
|-----|---------|-----------|-----------|
| **Clustering** | Group related facts semantically | Yes (summaries) | Every 4-8 hours |
| **Consolidation** | Detect patterns, contradictions, trends | Yes | Every 4-8 hours |
| **Entity Profile Consolidation** | Rebuild comprehensive entity profiles from all facts | Yes | Every 4-8 hours |
| **Memify** | Convert episodic facts to procedural/semantic knowledge | Yes | Daily |
| **Sleep-time compute** | Score importance, refresh summaries, detect communities | Partially | Every 4-8 hours |
All jobs are exposed as async functions you can call directly or schedule with your preferred task runner (APScheduler, Celery, cron, etc.).
> **Neuroscience parallel:** Background jobs mirror **sleep-time processing** in the brain. During sleep, the brain consolidates memories, transfers information from hippocampus (short-term) to neocortex (long-term), prunes irrelevant connections, and strengthens important ones. These jobs perform the same operations on your agent's memory.
---
## Clustering
**In plain English:** Groups related facts together. Facts about someone's job, colleagues, and projects end up in one cluster. This makes retrieval more contextual - when you ask about someone's work, the system knows which facts are related.
### Fact Clustering
```python
from arandu import cluster_user_facts, ClusteringResult
result: ClusteringResult = await cluster_user_facts(
session=db_session,
agent_id="user_123",
embedding_provider=embedding_provider,
llm_provider=llm_provider,
config=memory_config,
)
```
> **Legacy function names:** Some background job functions retain "user" in their names (e.g., `cluster_user_facts`, `get_entities_for_user`). These accept `agent_id` as their parameter - the names are historical and will be aliased in a future version.
**How it works:**
1. Groups facts by `(entity_type, entity_key)` - facts about the same entity stay together
2. Generates a 2-3 sentence summary per cluster using an LLM
3. Computes and stores cluster embeddings for later community detection
4. Idempotent - updates existing clusters rather than creating duplicates
### Community Detection
```python
from arandu import detect_communities, CommunityDetectionResult
result: CommunityDetectionResult = await detect_communities(
    session=db_session,
    agent_id="user_123",
    embedding_provider=embedding_provider,
    llm_provider=llm_provider,
    config=memory_config,
)
```
**How it works:**
1. Compares cluster embeddings using cosine similarity
2. Groups clusters above `community_similarity_threshold` (default 0.75)
3. Creates `MemoryMetaObservation` records with type `"community_theme"`
4. Example: a "work" community might include clusters about colleagues, projects, and company facts
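The similarity grouping in steps 1-2 can be sketched as follows. This is an illustrative greedy pass, not the SDK's internal algorithm; `cosine` and `group_clusters` are hypothetical names:

```python
import math

def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two embedding vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

def group_clusters(
    embeddings: dict[str, list[float]],
    threshold: float = 0.75,  # community_similarity_threshold default
) -> list[set[str]]:
    """Greedily merge clusters whose embeddings exceed the threshold."""
    communities: list[set[str]] = []
    for cid, vec in embeddings.items():
        for community in communities:
            # Join the first community containing a sufficiently similar cluster
            if any(cosine(vec, embeddings[other]) >= threshold for other in community):
                community.add(cid)
                break
        else:
            communities.append({cid})
    return communities
```

Each resulting group would then back one `MemoryMetaObservation` of type `"community_theme"`.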
### Configuration
| Parameter | Default | Description |
|-----------|---------|-------------|
| `cluster_max_age_days` | `90` | Maximum age of facts to include in clustering |
| `community_similarity_threshold` | `0.75` | Cosine similarity threshold for grouping clusters |
> **Walkthrough: before and after clustering.** Before clustering, there are 12 facts about a user, ungrouped:
```
person:fernanda → "Fernanda works at Orion Tech"
person:fernanda → "Fernanda graduated from Unicamp"
person:fernanda → "Fernanda built the data pipeline"
person:bruno → "Bruno works at Orion Tech"
person:bruno → "Bruno developed the ML model"
person:bruno → "Bruno runs marathons"
person:marcos → "Marcos lives in Porto Alegre"
person:marcos → "Marcos is a product manager"
...
```
**After clustering:**
```
Cluster 1: "Orion Tech engineering team"
→ Fernanda works at Orion Tech
→ Bruno works at Orion Tech
→ Fernanda built the data pipeline
→ Bruno developed the ML model
Summary: "The Orion Tech data/ML team includes Fernanda (pipeline) and Bruno (ML model)"
Cluster 2: "Marcos's personal life"
→ Marcos lives in Porto Alegre
→ Marcos is married to Carolina
Summary: "Marcos lives in Porto Alegre with his wife Carolina"
```
**Impact on retrieval:** When someone asks "Tell me about the Orion Tech team", the cluster summary and its facts score higher because they're grouped together - the system understands they're related.
---
## Consolidation
**In plain English:** Looks across all recent facts and events to find bigger patterns: "This person mentions running every Monday" (pattern), "They said they live in SP but also in RJ" (contradiction), "Their mood has been improving lately" (trend). Stores these as meta-observations that enrich retrieval.
### Periodic Consolidation (L2)
```python
from arandu import run_consolidation, ConsolidationResult
result: ConsolidationResult = await run_consolidation(
    session=db_session,
    agent_id="user_123",
    llm_provider=llm_provider,
    config=memory_config,
)
```
**How it works:**
1. Analyzes events and facts over a lookback window (`consolidation_lookback_days`)
2. Detects patterns across facts:
- **Insights** - Emergent understanding from multiple facts
- **Patterns** - Repeated behaviors or preferences
- **Contradictions** - Conflicting facts that need resolution
- **Trends** - Changes over time
3. Generates `MemoryMetaObservation` records
4. Tags events with emotions (emotion, intensity, energy level)
### Profile Consolidation (L3)
```python
from arandu import run_profile_consolidation
await run_profile_consolidation(
    session=db_session,
    agent_id="user_123",
    llm_provider=llm_provider,
)
```
**How it works:**
1. Refreshes entity summaries via LLM - a higher-level view of each entity
2. Updates the overall profile overview
3. Triggered periodically (less frequently than L2)
### Configuration
| Parameter | Default | Description |
|-----------|---------|-------------|
| `consolidation_min_events` | `3` | Minimum events before running consolidation |
| `consolidation_lookback_days` | `7` | How far back to look for patterns |
> **Neuroscience parallel:** Consolidation mirrors the brain's **memory consolidation during sleep**. The hippocampus replays recent experiences, the neocortex detects patterns and integrates them into existing knowledge structures, and contradictions are flagged for resolution. L2 consolidation is analogous to slow-wave sleep (SWS) replay, while L3 profile consolidation is analogous to REM sleep's role in integrating memories into semantic knowledge.
---
## Entity Profile Consolidation
**In plain English:** The write pipeline seeds a thin profile when an entity is first seen, but it never overwrites an existing profile. This background job is the authoritative source for comprehensive, up-to-date entity profiles. It reads ALL active facts for each entity and generates a thorough profile covering every major aspect: identity, relationships, interests, activities, events, and preferences.
### Run Profile Consolidation
```python
from arandu import consolidate_entity_profiles, ProfileConsolidationResult
result: ProfileConsolidationResult = await consolidate_entity_profiles(
    session=db_session,
    agent_id="user_123",
    llm_provider=llm_provider,
    config=memory_config,
)
print(result.profiles_consolidated) # Number of profiles rebuilt
print(result.profiles_skipped) # Entities skipped (no facts or LLM error)
```
**How it works:**
1. Finds entities ordered by fact count (highest first), limited to 20 per run
2. Filters to entities whose profile is NULL or older than `summary_refresh_interval_days`
3. For each entity, loads up to 50 active facts (ordered by importance)
4. Sends all facts to the LLM with instructions to generate a comprehensive profile (100-300 tokens) covering ALL major aspects
5. Saves the profile to `memory_entities.profile_text` and updates `profile_refreshed_at`
6. Profiles are always generated in English, regardless of the language of the underlying facts
**Write pipeline vs background job:**
| | Write Pipeline | `consolidate_entity_profiles()` |
|---|---|---|
| **When** | During `write()` (synchronous) | Scheduled background job |
| **Scope** | Seeds thin profiles for NEW entities only | Rebuilds comprehensive profiles from ALL facts |
| **Overwrites** | Never overwrites existing profiles | Refreshes stale profiles |
| **Authority** | Initial seed only | Authoritative source for `profile_text` |
Entity profiles are used as context during informed extraction (write pipeline) to help the LLM understand what is already known about an entity. They are NOT injected into retrieval output.
### Configuration
| Parameter | Default | Description |
|-----------|---------|-------------|
| `summary_refresh_interval_days` | `7` | Days before a profile is considered stale and eligible for refresh |
---
## Memify
**In plain English:** Over time, specific details ("went to a Python meetup on March 5") become general knowledge ("regularly attends tech meetups"). Memify distills episodic facts into higher-level knowledge and prunes stale facts that haven't been mentioned in a while.
### Run Memify
```python
from arandu import run_memify, MemifyResult
result: MemifyResult = await run_memify(
    session=db_session,
    agent_id="user_123",
    llm_provider=llm_provider,
    embedding_provider=embedding_provider,
    config=memory_config,
)
```
**How it works:**
1. Groups related facts by entity and topic
2. Generates distilled summaries (procedural/semantic knowledge)
3. Checks vitality - facts mentioned recently are kept; stale facts may be deprecated
4. Merges similar procedures to prevent knowledge fragmentation
### Vitality Scoring
```python
from arandu import compute_vitality
# Synchronous, per-fact function — NOT async
score = compute_vitality(fact) # uses datetime.now(UTC)
score = compute_vitality(fact, now=ts) # custom timestamp
```
Vitality measures how "alive" a fact is based on:
- **Retrieval** (0.30) - Log-scale of times_retrieved
- **Recency** (0.25) - Exponential decay from last_retrieved_at (30-day half-life)
- **Confidence** (0.20) - Raw confidence value
- **Reinforcement** (0.15) - Bounded reinforcement count (up to 5)
Correction penalty: `0.8 ^ user_correction_count`. Superseded facts (valid_to is not None) return 0.0.
> **Neuroscience parallel:** Memify mirrors the **forgetting curve** described by Hermann Ebbinghaus (1885). Memories decay exponentially over time unless reinforced through retrieval practice. Facts with high vitality (frequently accessed) resist decay, while low-vitality facts gradually fade - just as the brain prunes synaptic connections for unused information.
---
## Sleep-Time Compute
**In plain English:** Three maintenance jobs that keep retrieval sharp: (1) score which entities matter most, (2) refresh entity summaries for the important ones, (3) detect communities of related entities. Importance scoring is pure SQL and community detection is pure graph computation (both cheap); only summary refresh makes LLM calls.
### Job 1: Entity Importance Scoring
```python
from arandu import compute_entity_importance, EntityImportanceResult
result: EntityImportanceResult = await compute_entity_importance(
    session=db_session,
    agent_id="user_123",
    config=memory_config,
)
```
Pure SQL computation (no LLM calls). Scores each entity from 0.0 to 1.0 using four normalized signals:
| Signal | Weight | Description |
|--------|--------|-------------|
| Fact density | 0.30 | Number of facts linked to the entity (via `MemoryFactEntityLink`). Includes facts where the entity is primary subject AND facts that merely mention it. |
| Recency | 0.25 | Exponential decay (30-day half-life) |
| Retrieval frequency | 0.25 | How often facts about this entity are retrieved |
| Relationship degree | 0.20 | Number of incoming + outgoing relationships |
The importance score is used as a signal in retrieval scoring and as a priority factor for summary refresh.
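The weighted combination can be sketched as below. The normalization caps (20 facts, 50 retrievals, 10 relationships) are assumptions for the sketch, not the SDK's actual constants:

```python
def entity_importance(
    fact_count: int,
    days_since_last_fact: float,
    retrievals: int,
    relationship_degree: int,
) -> float:
    """Illustrative weighted combination of the four signals in the table above."""
    density = min(fact_count / 20, 1.0)            # assumed cap of 20 facts
    recency = 0.5 ** (days_since_last_fact / 30)   # 30-day half-life
    retrieval = min(retrievals / 50, 1.0)          # assumed cap of 50 retrievals
    degree = min(relationship_degree / 10, 1.0)    # assumed cap of 10 edges
    return 0.30 * density + 0.25 * recency + 0.25 * retrieval + 0.20 * degree
```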
> **Walkthrough: how importance scoring changes retrieval.** Before importance scoring, all entities have the default importance of 0.5:
```
person:fernanda → importance: 0.50 (default)
person:marcos → importance: 0.50 (default)
organization:vertix → importance: 0.50 (default)
product:xgboost → importance: 0.50 (default)
```
**After importance scoring:**
```
person:fernanda → importance: 0.85 (12 facts, recent, high retrieval freq)
organization:vertix → importance: 0.78 (8 facts, many relationships)
person:marcos → importance: 0.55 (4 facts, moderate activity)
product:xgboost → importance: 0.25 (1 fact, mentioned once, no relationships)
```
**Impact on retrieval:** When two facts have similar semantic scores, the one about Fernanda (importance=0.85) ranks above the one about XGBoost (importance=0.25). This reflects the reality that Fernanda is a central entity in this user's memory while XGBoost is a peripheral detail.
**Impact on summary refresh:** Fernanda and Vertix get their summaries refreshed first (higher priority). XGBoost may never get a summary - it's too low priority.
### Job 2: Entity Summary Refresh
```python
from arandu import refresh_entity_summaries, SummaryRefreshResult
result: SummaryRefreshResult = await refresh_entity_summaries(
    session=db_session,
    agent_id="user_123",
    llm_provider=llm_provider,
    config=memory_config,
)
```
Refreshes stale entity summaries:
- **Stale condition**: `summary_text IS NULL` or last refresh > 7 days ago
- **Priority**: entities with higher `importance_score` refreshed first
- **Limit**: 10 entities per run (prevents timeout)
- Generates 2-3 sentence summaries from the entity's facts using an LLM
### Job 3: Entity Community Detection
```python
from arandu import detect_entity_communities
result = await detect_entity_communities(
    session=db_session,
    agent_id="user_123",
    config=memory_config,
)
```
Connected-component clustering on the entity relationship graph (no LLM calls):
1. Loads active entities and edges (strength >= 0.3)
2. Runs Union-Find (with path compression + union by rank) to find connected components
3. Groups entities into communities (components with >= 2 members)
4. Returns `{"communities_found": int, "total_entities_assigned": int}`
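The steps above can be sketched with a standard Union-Find; `find_communities` is an illustrative stand-alone version, not the SDK's internal code:

```python
def find_communities(
    entities: list[str],
    edges: list[tuple[str, str, float]],
    min_strength: float = 0.3,
) -> list[set[str]]:
    """Connected components via Union-Find (path compression + union by rank)."""
    parent = {e: e for e in entities}
    rank = {e: 0 for e in entities}

    def find(x: str) -> str:
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path compression
            x = parent[x]
        return x

    def union(a: str, b: str) -> None:
        ra, rb = find(a), find(b)
        if ra == rb:
            return
        if rank[ra] < rank[rb]:  # union by rank
            ra, rb = rb, ra
        parent[rb] = ra
        if rank[ra] == rank[rb]:
            rank[ra] += 1

    for src, dst, strength in edges:
        if strength >= min_strength:  # step 1: drop weak edges
            union(src, dst)

    groups: dict[str, set[str]] = {}
    for e in entities:
        groups.setdefault(find(e), set()).add(e)
    # step 3: a community needs at least 2 members
    return [g for g in groups.values() if len(g) >= 2]
```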
> **Neuroscience parallel:** Sleep-time compute mirrors **offline processing during sleep**. The brain doesn't just passively store memories during sleep - it actively reorganizes them. Importance scoring is analogous to the brain's process of **synaptic homeostasis** (Tononi & Cirelli), where strongly activated synapses are maintained while weakly activated ones are pruned. Summary refresh mirrors the formation of **gist memories** - compressed representations that capture the essence of detailed episodes.
---
## Scheduling
`arandu` doesn't include a scheduler - you bring your own. All background functions are simple async callables that can be integrated with any scheduling system.
### Example: APScheduler
```python
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from arandu import (
    cluster_user_facts,
    consolidate_entity_profiles,
    run_consolidation,
    compute_entity_importance,
    refresh_entity_summaries,
)

scheduler = AsyncIOScheduler()

async def maintenance_cycle():
    # get_session(), get_active_users(), llm, embeddings, and config are
    # supplied by your application - arandu doesn't provide them
    async with get_session() as session:
        for agent_id in await get_active_users(session):
            await compute_entity_importance(session, agent_id, config=config)
            await refresh_entity_summaries(session, agent_id, llm_provider=llm, config=config)
            await consolidate_entity_profiles(session, agent_id, llm_provider=llm, config=config)
            await cluster_user_facts(session, agent_id, embeddings, llm, config)
            await run_consolidation(session, agent_id, llm_provider=llm, config=config)

scheduler.add_job(maintenance_cycle, "interval", hours=4)
scheduler.start()
```
### Example: Simple Loop
```python
import asyncio
async def background_loop():
    while True:
        await maintenance_cycle()
        await asyncio.sleep(4 * 3600)  # every 4 hours
```
### Recommended Cadence
| Job | Frequency | Cost |
|-----|-----------|------|
| Entity importance | Every 4h | Cheap (SQL only) |
| Summary refresh | Every 4h | Moderate (LLM, limited to 10/run) |
| Profile consolidation | Every 4-8h | Moderate (LLM, limited to 20/run) |
| Clustering | Every 4-8h | Moderate (LLM for summaries) |
| Consolidation | Every 4-8h | Moderate (LLM for pattern detection) |
| Memify | Daily | Moderate (LLM for distillation) |
| Community detection | Daily | Moderate (LLM + embeddings) |
Run importance scoring first - its output is used by summary refresh to prioritize entities.
---
# Design Philosophy
`arandu` is designed around two foundations: **software engineering principles** that make it reliable and extensible, and **cognitive science models** that inform its architecture. This page covers both - the engineering decisions and the neuroscience parallels that inspired them.
---
## Engineering Principles
### Protocol-Based Dependency Injection
The SDK uses Python's `typing.Protocol` for all external dependencies (LLM, embeddings). No inheritance required - just implement the method signatures:
```python
@runtime_checkable
class LLMProvider(Protocol):
    async def complete(
        self,
        messages: list[dict],
        temperature: float = 0,
        response_format: dict | None = None,
        max_tokens: int | None = None,
    ) -> LLMResult: ...
```
**Why:** Vendor lock-in kills adoption. By using structural subtyping (duck typing), any LLM provider works without inheriting from a base class. The OpenAI provider is included for convenience, but you can swap in Anthropic, local models, or custom endpoints with zero SDK changes.
### Fail-Safe by Default
Every stage of the pipeline has fallback behavior:
| Stage | Failure | Fallback |
|-------|---------|----------|
| Informed Extraction | LLM timeout/error | Fall back to legacy blind extraction + reconciliation |
| Extraction (legacy) | LLM timeout/error | Return empty extraction; event still logged |
| Entity Resolution | LLM fallback fails | Create new entity (prefer duplicates over lost data) |
| Reconciliation | LLM error | Default to ADD |
| Reranking | Reranker fails | Keep original ranking |
| Background jobs | Any job fails | Other jobs proceed independently |
**Why:** In a production AI agent, memory is a supporting system - it should never crash the main flow. A degraded response (missing some context) is always better than an error.
### Composition Over Inheritance
The SDK has no abstract base classes, no deep class hierarchies. It's built from small, focused modules composed into pipelines:
- `write/extract.py` → `write/entity_resolution.py` → `write/reconcile.py` → `write/upsert.py`
- `read/retrieval_agent.py` (deterministic planner) → `read/retrieval.py` → `read/reranker.py`
**Why:** Each module has a single responsibility with clear inputs and outputs. You can understand, test, and replace any module independently. This follows the Unix philosophy: do one thing well.
### Savepoint-Based Transaction Safety
Write operations use database savepoints (`session.begin_nested()`) so that a failure in one fact doesn't abort the entire batch:
```python
async with session.begin_nested():
    # If this fails, only this savepoint rolls back
    session.add(new_fact)
    await session.flush()
```
**Why:** In a pipeline that processes multiple facts per message, atomic all-or-nothing transactions are too fragile. Savepoints give per-fact atomicity while keeping the outer transaction alive.
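Applied to a batch, the pattern gives per-fact isolation. This sketch assumes an `AsyncSession`-like object; `upsert_facts` is an illustrative name, not an SDK function:

```python
async def upsert_facts(session, facts) -> int:
    """Persist facts one by one; a failure rolls back only its own savepoint."""
    saved = 0
    for fact in facts:
        try:
            async with session.begin_nested():  # SAVEPOINT per fact
                session.add(fact)
                await session.flush()           # surface DB errors now
            saved += 1
        except Exception:
            # Only this fact's savepoint rolled back; the batch continues
            continue
    return saved
```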
---
## Neuroscience Parallels
The architecture of `arandu` draws from established models in cognitive neuroscience. Each parallel below maps a system component to its biological counterpart.
### Encoding: The Write Pipeline
**System:** Message → Alias Lookup → Pre-retrieval → Informed Extraction → Resolve → Upsert
**Brain:** Sensory input → Orienting response → Schema activation → Encoding → Association → Storage
When you experience something, your brain doesn't record a raw video. It encodes a **selective representation** - extracting salient features, linking them to existing knowledge, and storing the result in a form that can be retrieved later. The write pipeline does the same:
- **Alias lookup + pre-retrieval** is the orienting response: the brain compares incoming stimuli against existing schemas before committing anything to long-term storage. Known information triggers repetition suppression (lower neural firing), while novel or contradictory information triggers enhanced encoding (the novelty/mismatch signal from the hippocampus). Informed extraction replicates this - the LLM sees what's already known and only extracts what's genuinely new or changed.
- **Informed extraction** is perception-with-context: an LLM extracts all factual information from the raw message, guided by prior knowledge. Just as human encoding is shaped by what you already know (schema-dependent encoding), the LLM receives existing entity profiles and facts as context. The reconciliation step (ADD/UPDATE/NOOP/DELETE) handles deduplication downstream, maximizing recall of specific details.
- **Entity resolution** is association: linking new mentions to existing memory traces
- **Reconciliation** (fallback path) is reconsolidation: updating existing memories when new information arrives
- **Upsert** is storage: committing the processed trace to long-term memory
### Associative Memory: Entity Resolution
**System:** 3-phase resolution (exact → fuzzy → LLM)
**Brain:** Pattern completion in hippocampal-neocortical circuits
The brain doesn't store memories as isolated records - it stores them as patterns of activation across neural networks. When you encounter a partial cue ("Carol"), your brain completes the pattern to retrieve the full representation ("Carolina, my colleague from work").
Entity resolution mirrors this process:
- **Exact match** = direct retrieval (strong, well-established associations)
- **Fuzzy match** = pattern completion (partial cue activates the most similar existing pattern)
- **LLM fallback** = deliberate recall (conscious effort to disambiguate when automatic retrieval fails)
The **fuzzy threshold** (0.85) and **LLM fallback range** (0.50-0.85) model the brain's confidence gradient: strong matches are automatic, ambiguous matches require deliberation.
### Reconsolidation: Fact Reconciliation
**System:** ADD / UPDATE / NOOP / DELETE decisions
**Brain:** Memory reconsolidation (Nader, Schiller, & LeDoux, 2000)
When a memory is retrieved, it enters a **labile state** where it can be modified. This is reconsolidation - the brain's mechanism for updating memories with new information while preserving the original trace.
The reconciliation stage models this process:
- **NOOP** = retrieval without modification (memory confirmed, `last_confirmed_at` updated)
- **UPDATE** = reconsolidation (old memory superseded, new version created with provenance link via `supersedes_fact_id`)
- **ADD** = new encoding (no existing memory to reconsolidate)
- **DELETE** = active forgetting (explicit retraction, modeled by setting `invalidated_at`)
The fact versioning system (`valid_from`, `valid_to`, `supersedes_fact_id`) preserves the full history - just as the brain retains traces of original memories even after reconsolidation.
### Spreading Activation: Graph Retrieval
**System:** BFS 2-hop traversal with decay factor
**Brain:** Spreading activation in semantic networks (Collins & Loftus, 1975)
In Collins and Loftus's model, when a concept is activated (e.g., "fire engine"), activation spreads along associative links to related concepts ("red", "truck", "emergency"), with strength decreasing as distance increases.
Graph retrieval implements this directly:
- **Seed entities** from the query activate the starting nodes
- **Hop 1** activates direct neighbors (no pruning - all connections fire)
- **Hop 2** activates second-degree connections (pruned by `min_edge_strength`)
- **Decay factor** (0.50 per hop) models the attenuation of activation over distance
- **Edge strength** models the associative strength between concepts (reinforced by repeated co-mention)
The `query_bonus` (1.5×) for entities whose names appear in the query models **top-down priming** - when you explicitly mention an entity, its connections are more strongly activated.
### Sleep-Time Compute: Background Processing
**System:** Clustering, consolidation, importance scoring, summary refresh
**Brain:** Memory consolidation during sleep (Diekelmann & Born, 2010)
During sleep, the brain performs critical maintenance:
1. **Hippocampal replay** - Recent experiences are replayed in compressed form, transferring them from short-term (hippocampal) to long-term (neocortical) storage
2. **Synaptic homeostasis** - Strongly activated synapses are maintained while weakly activated ones are pruned (Tononi & Cirelli)
3. **Pattern detection** - The neocortex detects statistical regularities across episodes
4. **Gist extraction** - Detailed episodic memories are compressed into semantic knowledge
The background jobs map to these processes:
| Brain process | System job | Mechanism |
|---------------|-----------|-----------|
| Hippocampal replay | Consolidation | Reviews recent events, detects patterns and contradictions |
| Synaptic homeostasis | Importance scoring | Scores entities by density + recency + retrieval frequency + connectivity |
| Pattern detection | Community detection | Finds groups of related entities via graph analysis |
| Gist extraction | Summary refresh + Memify | Generates compressed summaries from detailed facts |
### Forgetting Curve: Vitality and Recency
**System:** Recency decay, vitality scoring, importance-based pruning
**Brain:** Ebbinghaus forgetting curve (1885)
Hermann Ebbinghaus demonstrated that memory retention decays exponentially over time, but each retrieval (practice) resets the curve and slows future decay. This is the **spacing effect** - the most robust finding in memory research.
`arandu` models this with:
- **Recency decay** - Exponential decay with configurable half-life (`recency_half_life_days`). Recent facts score higher. This models the basic forgetting curve.
- **Retrieval reinforcement** - Each NOOP decision (fact confirmed during write) updates `last_confirmed_at`, effectively "practicing" the fact and resetting its decay curve.
- **Vitality scoring** - Combines recency, confirmation recency (`last_confirmed_at`), and importance to determine how "alive" a fact is. Low-vitality facts are candidates for consolidation or pruning.
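The basic curve is a one-liner; the 30-day half-life used here is an assumption for illustration (the real value comes from `recency_half_life_days`):

```python
def recency_score(days_since_confirmed: float, half_life_days: float = 30.0) -> float:
    """Exponential decay: the score halves every half_life_days.
    A fact confirmed today (age 0) scores 1.0; each NOOP confirmation
    resets the clock, 're-practicing' the fact."""
    return 0.5 ** (days_since_confirmed / half_life_days)
```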
### Selective Attention: Reranking
**System:** LLM reranker on retrieval candidates
**Brain:** Selective attention (Broadbent, 1958; Treisman, 1964)
The brain doesn't process all sensory input equally - selective attention filters and prioritizes information based on current goals. The cocktail party effect demonstrates this: you can focus on one conversation in a noisy room by filtering out irrelevant signals.
The reranker acts as the attention filter:
- Raw retrieval signals (semantic, keyword, graph) produce a broad set of candidates - like the full sensory input
- The reranker evaluates each candidate against the query intent - like attentional selection
- Only the most relevant facts pass through to the context - like the attended signal
This is why the reranker uses an LLM (not just scoring heuristics): attention is goal-directed and requires understanding the **meaning** of both query and candidates.
### Working Memory: Context Budget
**System:** Token budget with facts/patterns/events sections
**Brain:** Working memory (Baddeley & Hitch, 1974; Cowan, 2001)
Working memory has a strict capacity limit - Cowan estimates 4±1 items can be held in the focus of attention simultaneously. The context budget models this constraint:
- **Token budget** = capacity limit (you can't send infinite context to an LLM)
- **Facts** (80% budget) = focus of attention (the most relevant facts for the current query, as a clean bullet list)
- **Patterns** (20% budget) = activated long-term memory (meta-observations and trends)
- **Events** (overflow) = peripheral activation (recent conversation snippets for episodic context)
This tiered approach ensures the LLM receives a focused, prioritized context rather than a noisy dump of everything the system knows. The format is clean and free of internal metadata - no timestamps on facts, no entity prefixes, no confidence scores.
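The budget split can be sketched as a greedy fill - `fill_context` is illustrative, and the `len // 4` token estimate mirrors the heuristic used by `classify_input`:

```python
def fill_context(facts: list[str], patterns: list[str], budget_tokens: int) -> list[str]:
    """Greedy fill: facts get 80% of the budget, patterns 20%; items are
    assumed pre-ranked best-first and dropped once their section is full."""
    def take(items: list[str], limit: int) -> list[str]:
        out, used = [], 0
        for item in items:
            cost = max(1, len(item) // 4)  # crude chars//4 token estimate
            if used + cost > limit:
                break
            out.append(item)
            used += cost
        return out
    return take(facts, int(budget_tokens * 0.8)) + take(patterns, int(budget_tokens * 0.2))
```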
---
## Summary Table
| System Component | Neuroscience Model | Key Reference |
|-----------------|-------------------|---------------|
| Write Pipeline | Encoding | - |
| Informed Extraction | Orienting response / Schema-dependent encoding | Sokolov (1963); Tulving & Kroll (1995) |
| Entity Profiles | Schema activation (write-time only) | Meyer & Schvaneveldt (1971) |
| Entity Resolution | Associative memory / Pattern completion | - |
| Reconciliation | Reconsolidation | Nader, Schiller, & LeDoux (2000) |
| Graph Retrieval | Spreading activation | Collins & Loftus (1975) |
| Recency Decay | Forgetting curve | Ebbinghaus (1885) |
| Background Jobs | Sleep consolidation | Diekelmann & Born (2010) |
| Importance Scoring | Synaptic homeostasis | Tononi & Cirelli (SHY) |
| Summary Refresh | Gist memory formation | - |
| Reranking | Selective attention | Broadbent (1958) |
| Context Budget | Working memory capacity | Baddeley & Hitch (1974); Cowan (2001) |
| Vitality/Reinforcement | Spacing effect | Ebbinghaus (1885) |
> **These are analogies, not claims:** The parallels above are architectural inspirations, not scientific claims. `arandu` is an engineering system, not a cognitive model. The brain is vastly more complex - these parallels highlight the design intuitions, not the biological mechanisms.
---
# Write Pipeline API
> **Advanced API:** These are advanced APIs for power users who want to interact with individual pipeline stages directly. Most users should use [`MemoryClient.write()`](../reference/index.md) instead, which orchestrates the full pipeline automatically.
All write pipeline functions are exported from `arandu.write`.
```python
from arandu.write import (
    classify_input, select_strategy, run_write_pipeline,
    canonicalize_attribute_key, normalize_key, validate_proposed_key,
    create_or_update_entity, get_entities_for_user, get_entity_by_key,
    detect_and_record_corrections, is_user_correction,
    get_pending, clear_pending, save_pending_execution, save_pending_selection,
)
```
---
## Pipeline Orchestrator
### run_write_pipeline
Executes the full write pipeline. By default (when `enable_informed_extraction=True`), the pipeline runs: **alias lookup** → **pre-retrieval / profile load** → **informed extraction** → **resolve** → **upsert**. If informed extraction fails, it falls back to: **extract** → **resolve** → **reconcile** → **upsert**. When `enable_informed_extraction=False`, it always uses the legacy path.
```python
async def run_write_pipeline(
    session: AsyncSession,
    agent_id: str,
    message: str,
    llm: LLMProvider,
    embeddings: EmbeddingProvider,
    config: MemoryConfig,
    speaker_name: str,
    source: str = "api",
    recent_messages: list[str] | None = None,
    trace: PipelineTrace | None = None,
) -> dict
```
| Parameter | Type | Description |
|-----------|------|-------------|
| `session` | `AsyncSession` | Database session (caller manages transaction/commit). |
| `agent_id` | `str` | Unique identifier for the agent. |
| `message` | `str` | The user's message text. |
| `llm` | `LLMProvider` | Injected LLM provider. |
| `embeddings` | `EmbeddingProvider` | Injected embedding provider. |
| `config` | `MemoryConfig` | Memory configuration. |
| `speaker_name` | `str` | Name of the person speaking the message. Pronouns like "I", "me", "eu" resolve to this speaker entity (`person:{speaker_slug}`). |
| `source` | `str` | Source channel identifier (default `"api"`). |
| `recent_messages` | `list[str] \| None` | Optional conversation context (last N messages) for resolving pronouns and anaphora. |
| `trace` | `PipelineTrace \| None` | Optional pipeline trace for verbose mode. When provided, each stage records intermediate data. |
**Returns:** `dict` with keys `event_id`, `facts_added`, `facts_updated`, `facts_unchanged`, `facts_deleted`, `entities_resolved`, `duration_ms`.
The pipeline creates an immutable `MemoryEvent` first (survives even if later stages fail), then runs informed extraction (or fallback blind extraction), entity resolution, reconciliation, and upsert inside a savepoint for atomicity. Reconciliation always runs regardless of extraction mode — it compares new facts against existing ones and decides ADD/UPDATE/NOOP/DELETE. Entity profiles are persisted in the same transaction when returned by informed extraction.
---
## Extraction Strategy
Pure functions (no LLM, no DB) that classify input text and choose an extraction mode based on heuristics.
### InputType
```python
class InputType(str, Enum):
    SHORT = "short"            # < 500 chars
    MEDIUM = "medium"          # 500-2000 chars, unstructured
    LONG = "long"              # > 2000 chars, unstructured
    STRUCTURED = "structured"  # > 500 chars with headers/bullets/tables
```
### ExtractionMode
```python
class ExtractionMode(str, Enum):
    SINGLE_SHOT = "single_shot"
    CHUNKED = "chunked"
```
### InputClassification
Result of input text analysis.
| Field | Type | Description |
|-------|------|-------------|
| `input_type` | `InputType` | Classified input type. |
| `char_count` | `int` | Number of characters. |
| `estimated_tokens` | `int` | Estimated token count (chars // 4). |
| `has_headers` | `bool` | Whether headers were detected. |
| `has_bullets` | `bool` | Whether bullet points were detected. |
| `has_tables` | `bool` | Whether tables were detected. |
| `section_count` | `int` | Number of text sections. |
| `line_count` | `int` | Number of lines. |
### ExtractionStrategy
Selected extraction strategy.
| Field | Type | Description |
|-------|------|-------------|
| `mode` | `ExtractionMode` | Extraction mode (single_shot or chunked). |
| `reason` | `str` | Human-readable reason for the selection. |
| `max_tokens_per_call` | `int` | Max tokens per LLM call. |
| `estimated_chunks` | `int` | Number of expected chunks (1 for single-shot). |
| `chunk_context_hint` | `str \| None` | Hint about document type for chunked mode. |
### classify_input
Classify input text using heuristics (no LLM call).
```python
def classify_input(text: str) -> InputClassification
```
| Parameter | Type | Description |
|-----------|------|-------------|
| `text` | `str` | Input text to classify. |
**Returns:** `InputClassification` with detected features.
```python
from arandu.write import classify_input, select_strategy
classification = classify_input("My wife's name is Ana and we live in Sao Paulo.")
print(classification.input_type) # InputType.SHORT
print(classification.char_count) # 47
```
### select_strategy
Select extraction strategy from a classification result.
```python
def select_strategy(classification: InputClassification) -> ExtractionStrategy
```
| Parameter | Type | Description |
|-----------|------|-------------|
| `classification` | `InputClassification` | Result of `classify_input()`. |
**Returns:** `ExtractionStrategy` with mode and parameters.
```python
strategy = select_strategy(classification)
print(strategy.mode) # ExtractionMode.SINGLE_SHOT
print(strategy.estimated_chunks) # 1
```
---
## Attribute Key Canonicalization
Pipeline: **exact match** -> **alias** -> **dotted variant** -> **suffix** -> **open catalog** -> **drop**.
### normalize_key
Normalize a raw attribute key: lowercase, strip, spaces/hyphens to dots. Underscores are preserved.
```python
def normalize_key(raw: str) -> str
```
| Parameter | Type | Description |
|-----------|------|-------------|
| `raw` | `str` | Raw attribute key string. |
**Returns:** Normalized key string.
```python
from arandu.write import normalize_key
normalize_key("Personal Info") # "personal.info"
normalize_key("food_preference") # "food_preference"
```
### validate_proposed_key
Validate that a proposed key meets naming rules.
```python
def validate_proposed_key(
key: str,
extra_namespaces: set[str] | None = None,
) -> bool
```
| Parameter | Type | Description |
|-----------|------|-------------|
| `key` | `str` | Normalized key to validate. |
| `extra_namespaces` | `set[str] | None` | Optional deployer-provided namespaces to accept. |
**Returns:** `True` if key is well-formed and in an allowed namespace.
### canonicalize_attribute_key
Canonicalize an attribute key via catalog, alias, and recovery strategies. This is an async function that queries the database for registry lookups.
```python
async def canonicalize_attribute_key(
session: AsyncSession,
agent_id: str,
raw_key: str,
config: MemoryConfig,
) -> tuple[str | None, Literal["allow", "map", "propose", "drop"], dict[str, Any]]
```
| Parameter | Type | Description |
|-----------|------|-------------|
| `session` | `AsyncSession` | Database session. |
| `agent_id` | `str` | Agent identifier. |
| `raw_key` | `str` | Raw attribute key from extraction. |
| `config` | `MemoryConfig` | Memory configuration. |
**Returns:** Tuple of `(canonical_key, action, metadata)` where action is one of `"allow"`, `"map"`, `"propose"`, or `"drop"`.
---
## Entity Helpers
Async CRUD operations for `MemoryEntity` records using PostgreSQL `ON CONFLICT` upsert.
### create_or_update_entity
Create a `MemoryEntity` or update if it exists.
```python
async def create_or_update_entity(
session: AsyncSession,
agent_id: str,
canonical_key: str,
display_name: str | None = None,
entity_type: str = "other",
) -> MemoryEntity
```
| Parameter | Type | Description |
|-----------|------|-------------|
| `session` | `AsyncSession` | Database session. |
| `agent_id` | `str` | Agent identifier. |
| `canonical_key` | `str` | Canonical entity key. |
| `display_name` | `str | None` | Optional display name. |
| `entity_type` | `str` | Entity type (person, pet, place, etc.). Default `"other"`. |
**Returns:** The created or updated `MemoryEntity`.
### get_entity_by_key
Get a single `MemoryEntity` by agent_id and canonical_key.
```python
async def get_entity_by_key(
session: AsyncSession,
agent_id: str,
canonical_key: str,
) -> MemoryEntity | None
```
**Returns:** `MemoryEntity` or `None` if not found.
### get_entities_for_user
List all `MemoryEntity` records for a user.
```python
async def get_entities_for_user(
session: AsyncSession,
agent_id: str,
active_only: bool = True,
) -> list[MemoryEntity]
```
| Parameter | Type | Description |
|-----------|------|-------------|
| `session` | `AsyncSession` | Database session. |
| `agent_id` | `str` | Agent identifier. |
| `active_only` | `bool` | If True, only return active entities. Default `True`. |
**Returns:** List of `MemoryEntity` records, ordered by `last_seen_at` descending.
---
## Correction Detection
Detects when users correct memory facts by comparing old vs new values for the same attribute_key.
### CorrectionResult
| Field | Type | Description |
|-------|------|-------------|
| `corrections_detected` | `int` | Number of corrections found. Default `0`. |
| `corrected_keys` | `list[str]` | Attribute keys that were corrected. |
| `facts_corrected_ids` | `list[str]` | IDs of old facts that were corrected. |
### is_user_correction
Check if a new fact corrects an old fact (same key, different value).
```python
def is_user_correction(old_fact: object, new_fact: object) -> bool
```
| Parameter | Type | Description |
|-----------|------|-------------|
| `old_fact` | `object` | The existing fact being superseded. |
| `new_fact` | `object` | The new fact replacing it. |
**Returns:** `True` if this is a user correction.
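The check can be pictured with a stand-in fact object. The real function operates on `MemoryFact` records, so the field names here are assumptions:

```python
from dataclasses import dataclass

@dataclass
class FactStub:
    attribute_key: str
    value: str

def is_correction_sketch(old: FactStub, new: FactStub) -> bool:
    """A correction is the same attribute key carrying a different value."""
    return old.attribute_key == new.attribute_key and old.value != new.value
```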
### detect_and_record_corrections
Detect supersedes with value changes and increment correction count on old facts.
```python
async def detect_and_record_corrections(
session: AsyncSession,
agent_id: str,
saved_facts: list[Any],
) -> CorrectionResult
```
| Parameter | Type | Description |
|-----------|------|-------------|
| `session` | `AsyncSession` | Database session. |
| `agent_id` | `str` | Agent identifier. |
| `saved_facts` | `list[Any]` | List of newly saved MemoryFact objects. |
**Returns:** `CorrectionResult` with detection stats.
---
## Pending Operations
In-memory store for pending destructive operations with a 5-minute TTL. State is per-process and lost on restart.
### save_pending_selection
Save a pending selection when a search returned results awaiting user choice.
```python
def save_pending_selection(
agent_id: str,
intent: str,
transactions: list[Any],
confirmation_text: str,
edit_params: dict[str, Any] | None = None,
) -> None
```
| Parameter | Type | Description |
|-----------|------|-------------|
| `agent_id` | `str` | Agent identifier. |
| `intent` | `str` | The user's intent (delete, edit, etc.). |
| `transactions` | `list[Any]` | List of candidate transactions. |
| `confirmation_text` | `str` | Text to show user for confirmation. |
| `edit_params` | `dict | None` | Optional parameters for edit operations. |
### save_pending_execution
Save a pending execution when a destructive operation was blocked.
```python
def save_pending_execution(
agent_id: str,
tool_calls: list[Any],
search_result: str,
confirmation_text: str,
) -> None
```
| Parameter | Type | Description |
|-----------|------|-------------|
| `agent_id` | `str` | Agent identifier. |
| `tool_calls` | `list[Any]` | Blocked tool calls. |
| `search_result` | `str` | Context from the search. |
| `confirmation_text` | `str` | Text to show user for confirmation. |
### get_pending
Get pending operation if it exists and hasn't expired (5-minute TTL).
```python
def get_pending(agent_id: str) -> dict[str, Any] | None
```
**Returns:** Pending operation dict, or `None` if expired/absent.
### clear_pending
Remove pending operation after execution or cancellation.
```python
def clear_pending(agent_id: str) -> None
```
---
# Read Pipeline API
> **Advanced API:** These are advanced APIs for power users who want to interact with individual retrieval stages directly. Most users should use [`MemoryClient.retrieve()`](../reference/index.md) instead, which orchestrates the full multi-signal pipeline automatically.
All read pipeline functions are exported from `arandu.read`.
```python
from arandu.read import (
run_read_pipeline,
plan_retrieval, expand_query,
retrieve_relevant_events, compute_pattern_signal,
retrieve_graph_facts, spread_activation,
compress_context, compress_broad_context,
materialize_emotional_trends, get_emotional_summary_for_context,
compute_dynamic_importance,
generate_optimized_directives, check_directive_contradiction,
effective_confidence, invalidate_directive_cache,
)
```
---
## Pipeline Orchestrator
### run_read_pipeline
Executes the full read pipeline: **plan** -> **retrieve (multi-signal)** -> **rerank** -> **format**.
Multi-signal retrieval runs semantic + keyword + graph in parallel via `asyncio.gather()`. The deterministic planner identifies entities and generates pattern queries for the keyword signal.
```python
async def run_read_pipeline(
session: AsyncSession,
agent_id: str,
query: str,
llm: LLMProvider,
embeddings: EmbeddingProvider,
config: MemoryConfig,
trace: PipelineTrace | None = None,
) -> ReadResult
```
| Parameter | Type | Description |
|-----------|------|-------------|
| `session` | `AsyncSession` | Database session (caller manages transaction). |
| `agent_id` | `str` | Agent identifier. |
| `query` | `str` | The query to search memory for. |
| `llm` | `LLMProvider` | Injected LLM provider. |
| `embeddings` | `EmbeddingProvider` | Injected embedding provider. |
| `config` | `MemoryConfig` | Memory configuration. |
| `trace` | `PipelineTrace | None` | Optional pipeline trace for verbose mode. When provided, each stage records intermediate data. |
**Returns:** `ReadResult` with `facts` (list of `ScoredFact`), `context` (prompt-ready string), `total_candidates`, and `duration_ms`.
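The parallel fan-out described above can be sketched with stub signal functions. The real retrievers hit PostgreSQL; these just return canned candidates to show the `asyncio.gather()` shape:

```python
import asyncio

async def semantic_signal(query: str) -> list[str]:
    return [f"semantic:{query}"]    # stub for pgvector similarity search

async def keyword_signal(query: str) -> list[str]:
    return [f"keyword:{query}"]     # stub for pattern-query matching

async def graph_signal(query: str) -> list[str]:
    return [f"graph:{query}"]       # stub for BFS graph traversal

async def multi_signal(query: str) -> list[str]:
    # All three signals run concurrently, mirroring the pipeline's gather
    results = await asyncio.gather(
        semantic_signal(query), keyword_signal(query), graph_signal(query)
    )
    return [fact for signal in results for fact in signal]
```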
---
## Retrieval Planner
The retrieval planner is a deterministic function that analyzes the user query and decides the retrieval strategy before any search happens. It uses regex matching and schema lookups -- no LLM call is involved.
### PatternQuery
A pattern-based query for keyword signal matching.
| Field | Type | Description |
|-------|------|-------------|
| `entity_pattern` | `str` | SQL LIKE pattern for entity_key matching. |
| `attribute_filter` | `str | None` | Optional attribute key filter (always `None` in V5). |
### RetrievalPlan
Output of the retrieval planner. V5 runs all signals (semantic, graph, keyword) in parallel.
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `strategy` | `str` | `"multi_signal"` | `"multi_signal"` (default) or `"skip"`. |
| `entities` | `list[str]` | `[]` | Detected entity_keys for graph signal. |
| `pattern_queries` | `list[PatternQuery]` | `[]` | Pattern queries for keyword signal. |
| `similarity_query` | `str | None` | `None` | Query for semantic signal (always the original query -- no reformulation). |
| `max_facts` | `int` | `50` | Budget per signal. |
| `reason` | `str` | `""` | Why this plan was chosen. |
| `latency_ms` | `float` | `0.0` | Time spent planning. |
| `as_of_range` | `tuple[datetime, datetime] | None` | `None` | Optional time-travel window. |
| `broad_query` | `bool` | `False` | True for comprehensive queries. |
### plan_retrieval
Deterministic function that decides retrieval strategy using regex matching and schema lookups. No LLM call is made. Entity extraction is handled downstream by `resolve_query_entities`. Anaphora resolution (e.g., resolving "she" or "it" from conversation context) is the caller's responsibility -- the query should already contain resolved references.
```python
def plan_retrieval(
query_text: str,
schema_prefixes: list[str],
) -> RetrievalPlan
```
| Parameter | Type | Description |
|-----------|------|-------------|
| `query_text` | `str` | The user's query (anaphora should already be resolved by the caller). |
| `schema_prefixes` | `list[str]` | Known entity key prefixes from the agent's schema (e.g., `["person", "org"]`). |
**Returns:** `RetrievalPlan` with strategy, pattern queries, and query parameters.
---
## Query Expansion
Post-processes a `RetrievalPlan` with entity priming -- resolves entities mentioned in the query via the knowledge graph (aliases + relationships) and injects context terms.
### ExpandedQuery
| Field | Type | Description |
|-------|------|-------------|
| `primed_entities` | `list[str]` | Entity keys discovered via alias + KG priming. |
| `temporal_range` | `tuple[datetime, datetime] | None` | Resolved date range (from the retrieval planner). |
| `expanded_terms` | `list[str]` | Additional context terms from entity facts. |
### expand_query
Expand a retrieval plan with entity priming. Fail-safe: any exception returns an empty `ExpandedQuery`.
```python
async def expand_query(
session: AsyncSession,
agent_id: str,
query: str,
plan: RetrievalPlan,
llm: object,
) -> ExpandedQuery
```
| Parameter | Type | Description |
|-----------|------|-------------|
| `session` | `AsyncSession` | Database session. |
| `agent_id` | `str` | Agent identifier. |
| `query` | `str` | Original user query text. |
| `plan` | `RetrievalPlan` | `RetrievalPlan` produced by `plan_retrieval()`. |
| `llm` | `object` | LLM provider (reserved for future use). |
**Returns:** `ExpandedQuery` with primed entities, temporal range, and expanded terms.
---
## Fact Retrieval
### retrieve_relevant_events
Retrieve relevant events by embedding similarity + recency scoring.
```python
async def retrieve_relevant_events(
session: AsyncSession,
agent_id: str,
query_embedding: list[float],
config: MemoryConfig,
limit: int | None = None,
) -> list[dict[str, Any]]
```
| Parameter | Type | Description |
|-----------|------|-------------|
| `session` | `AsyncSession` | Database session. |
| `agent_id` | `str` | Agent identifier. |
| `query_embedding` | `list[float]` | Query embedding vector. |
| `config` | `MemoryConfig` | Memory configuration. |
| `limit` | `int | None` | Max events to return. |
**Returns:** List of event dicts with `date`, `text`, `score`, `event_id`.
### compute_pattern_signal
Boost facts that have been recently confirmed (pattern signal). Facts with recent `last_confirmed_at` timestamps (confirmed via NOOP decisions in write) get a small additive score boost (up to 0.1).
```python
def compute_pattern_signal(
candidates: list[RetrievalCandidate],
) -> list[RetrievalCandidate]
```
| Parameter | Type | Description |
|-----------|------|-------------|
| `candidates` | `list[RetrievalCandidate]` | Current ranked candidates. |
**Returns:** Candidates with updated scores, sorted by `final_score`.
---
## Graph Retrieval
BFS 2-hop traversal on the `MemoryEntityRelationship` knowledge graph with relevance pruning.
### GraphRetrievalResult
| Field | Type | Description |
|-------|------|-------------|
| `facts` | `list[dict[str, Any]]` | Scored fact dicts with `source="graph"`. |
| `neighbor_keys` | `list[str]` | Entity keys discovered via BFS. |
| `edges_traversed` | `int` | Total edges examined during BFS. |
| `edges` | `list[dict[str, Any]]` | Deduplicated edge dicts with display names. |
### retrieve_graph_facts
BFS 2-hop retrieval with composite scoring: `edge_strength * recency * edge_recency * query_bonus`.
```python
async def retrieve_graph_facts(
session: AsyncSession,
agent_id: str,
entity_keys: list[str],
*,
min_confidence: float = 0.3,
as_of_start: datetime | None = None,
as_of_end: datetime | None = None,
broad_query: bool = False,
max_facts: int | None = None,
query_text: str = "",
min_edge_strength: float = 0.5,
) -> GraphRetrievalResult
```
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `session` | `AsyncSession` | -- | Database session. |
| `agent_id` | `str` | -- | Agent identifier. |
| `entity_keys` | `list[str]` | -- | Seed entity_keys to start BFS from. |
| `min_confidence` | `float` | `0.3` | Minimum fact confidence threshold. |
| `as_of_start` | `datetime | None` | `None` | Start of temporal window. |
| `as_of_end` | `datetime | None` | `None` | End of temporal window. |
| `broad_query` | `bool` | `False` | When True, allows expanded budget. |
| `max_facts` | `int | None` | `None` | Override default limit (30). |
| `query_text` | `str` | `""` | Original query text for query_bonus scoring. |
| `min_edge_strength` | `float` | `0.5` | Minimum edge strength for hop 2+ pruning. |
**Returns:** `GraphRetrievalResult` with scored facts and graph metadata.
---
## Spreading Activation
Expands context from seed facts by following `entity_key`, `cluster_id`, and knowledge graph relationship links. Uses dynamic importance scoring with decay per hop.
### SpreadingActivationResult
| Field | Type | Description |
|-------|------|-------------|
| `candidates` | `list[RetrievalCandidate]` | Expanded candidates from hop 1-2. |
| `meta_observations` | `list[Any]` | Relevant meta-observations referencing seed facts. |
| `entities_explored` | `list[str]` | Entity keys explored during spreading. |
| `clusters_explored` | `list[str]` | Cluster IDs explored during spreading. |
| `hop1_count` | `int` | Number of facts found in hop 1. |
| `hop2_count` | `int` | Number of facts found in hop 2. |
| `kg_relationships_explored` | `int` | Number of KG relationships traversed. |
### spread_activation
Expand context from seed facts via entity_key, cluster_id, and KG relationships (hop 1-2).
```python
async def spread_activation(
session: AsyncSession,
agent_id: str,
seed_fact_ids: list[str],
config: MemoryConfig,
*,
seed_scores: dict[str, float] | None = None,
allowed_keys: set[str] | None = None,
) -> list[RetrievalCandidate]
```
| Parameter | Type | Description |
|-----------|------|-------------|
| `session` | `AsyncSession` | Database session. |
| `agent_id` | `str` | Agent identifier. |
| `seed_fact_ids` | `list[str]` | IDs of seed facts to expand from. |
| `config` | `MemoryConfig` | Memory configuration with spreading activation params. |
| `seed_scores` | `dict[str, float] | None` | Optional dict mapping seed fact ID to score. |
| `allowed_keys` | `set[str] | None` | Optional set of allowed attribute keys. |
**Returns:** List of `RetrievalCandidate` objects from spreading activation. Fail-safe: returns empty list on error.
---
## Context Compression
Builds a prompt-ready context string from scored facts, events, clusters, and meta-observations using a tiered system: **Hot** (Tier 1), **Warm** (Tier 2), **Cold** (Tier 3).
### CompressedContext
| Field | Type | Description |
|-------|------|-------------|
| `context_text` | `str` | Final prompt-ready context string. |
| `hot_count` | `int` | Number of facts in hot tier (Tier 1). |
| `warm_count` | `int` | Number of facts in warm tier (Tier 2). |
| `cold_count` | `int` | Number of items in cold tier (Tier 3). |
| `total_tokens` | `int` | Estimated token count of context_text. |
### compress_context
Build tiered context text within token budget.
```python
async def compress_context(
facts: list[dict[str, Any]],
events: list[dict[str, Any]],
config: MemoryConfig,
*,
clusters: list[Any] | None = None,
meta_observations: list[Any] | None = None,
stale_keys: set[str] | None = None,
stale_threshold_days: int = 90,
now: datetime | None = None,
) -> CompressedContext
```
| Parameter | Type | Description |
|-----------|------|-------------|
| `facts` | `list[dict]` | Scored fact dicts (must have `score`, `fact`, `entity`, `attribute`, `value`, `date` keys). |
| `events` | `list[dict]` | Event dicts with `date` and `text` keys. |
| `config` | `MemoryConfig` | Memory configuration with token budget and tier ratios. |
| `clusters` | `list | None` | Optional cluster objects. |
| `meta_observations` | `list | None` | Optional meta-observation objects. |
| `stale_keys` | `set[str] | None` | Attribute keys considered always-stale. |
| `stale_threshold_days` | `int` | Days after which a fact is stale (default 90). |
| `now` | `datetime | None` | Current timestamp (defaults to UTC now). |
**Returns:** `CompressedContext` with tiered context text.
### compress_broad_context
Build context for broad queries using clusters as primary unit.
```python
async def compress_broad_context(
cluster_facts: dict[str, list[dict[str, Any]]],
clusters: list[Any],
config: MemoryConfig,
*,
meta_observations: list[Any] | None = None,
events: list[dict[str, Any]] | None = None,
) -> CompressedContext
```
| Parameter | Type | Description |
|-----------|------|-------------|
| `cluster_facts` | `dict[str, list[dict]]` | Mapping of cluster_label to fact dicts. |
| `clusters` | `list[Any]` | Cluster objects with `label`, `summary_text`, `fact_count`. |
| `config` | `MemoryConfig` | Memory configuration. |
| `meta_observations` | `list | None` | Optional meta-observation objects. |
| `events` | `list[dict] | None` | Optional event dicts. |
**Returns:** `CompressedContext` with cluster-first context text.
---
## Emotional Trends
Materializes emotional trends from memory events and provides formatted summaries for injection into retrieval context.
### EmotionalTrendsResult
| Field | Type | Description |
|-------|------|-------------|
| `emotion_counts` | `dict[str, int]` | Mapping of emotion to occurrence count. |
| `trend_direction` | `str` | `"increasing"`, `"decreasing"`, or `"stable"`. |
| `dominant_emotion` | `str | None` | Most frequent emotion, or None. |
| `trigger_keywords` | `list[str]` | Top keywords from high-intensity events. |
| `avg_intensity` | `float` | Average emotion intensity across events. |
| `dominant_intensity` | `float` | Average intensity of the dominant emotion. |
| `dominant_energy` | `str` | Predominant energy level (high/medium/low). |
| `events_analyzed` | `int` | Number of events analyzed. |
| `observation_created` | `bool` | Whether a meta-observation was created/updated. |
| `observation_id` | `str | None` | ID of the created/updated observation. |
### materialize_emotional_trends
Aggregate emotion data from events, detect trends, and materialize as a meta-observation.
```python
async def materialize_emotional_trends(
session: AsyncSession,
agent_id: str,
config: MemoryConfig,
) -> EmotionalTrendsResult
```
| Parameter | Type | Description |
|-----------|------|-------------|
| `session` | `AsyncSession` | Database session. |
| `agent_id` | `str` | Agent identifier. |
| `config` | `MemoryConfig` | Memory configuration with trend window and min events. |
**Returns:** `EmotionalTrendsResult` with aggregated trend data.
### get_emotional_summary_for_context
Return formatted emotional summary for injection into retrieval context. Returns `None` if no recent (7-day) active emotional trend exists.
```python
async def get_emotional_summary_for_context(
session: AsyncSession,
agent_id: str,
) -> str | None
```
**Returns:** Formatted summary string, or `None`.
---
## Dynamic Importance
### compute_dynamic_importance
Compute dynamic importance score for a memory fact. Inspired by cognitive memory strength models.
Components:
- **retrieval_boost**: `log(1 + times_retrieved)` -- saturates gradually
- **recency_of_use_boost**: decays from `last_retrieved_at` (half-life 7 days)
- **correction_penalty**: `0.8^n` for each user correction
- **pattern_boost**: 1.3x if fact is part of an active meta-observation
```python
def compute_dynamic_importance(
base_importance: float,
times_retrieved: int,
last_retrieved_at: datetime | None,
user_correction_count: int,
is_in_active_pattern: bool,
now: datetime | None = None,
) -> float
```
| Parameter | Type | Description |
|-----------|------|-------------|
| `base_importance` | `float` | Base importance score (typically 0.5). |
| `times_retrieved` | `int` | Number of times this fact has been retrieved. |
| `last_retrieved_at` | `datetime | None` | When the fact was last retrieved. |
| `user_correction_count` | `int` | Number of user corrections on this fact. |
| `is_in_active_pattern` | `bool` | Whether fact is part of an active meta-observation. |
| `now` | `datetime | None` | Current timestamp (defaults to UTC now). |
**Returns:** Dynamic importance score, clamped to `[0.05, 3.0]`.
---
## Procedural Memory
LLM-optimized behavioral directives system that compresses persona + learned behavioral preferences into cohesive instruction blocks.
### DirectiveBlock
| Field | Type | Description |
|-------|------|-------------|
| `text` | `str` | Cohesive behavioral instructions block. |
| `directive_count` | `int` | Number of active directives used. |
| `cache_hit` | `bool` | Whether this was served from cache. |
### ContradictionResult
| Field | Type | Description |
|-------|------|-------------|
| `has_contradiction` | `bool` | Whether a contradiction was found. |
| `conflicting_directive` | `str | None` | Title of the conflicting directive. |
| `resolution` | `str | None` | Explanation of how the contradiction was resolved. |
### generate_optimized_directives
Generate an LLM-optimized behavioral instructions block by integrating persona + learned directives.
```python
async def generate_optimized_directives(
session: AsyncSession,
agent_id: str,
llm_provider: LLMProvider,
config: MemoryConfig,
*,
persona_text: str = "",
) -> DirectiveBlock
```
| Parameter | Type | Description |
|-----------|------|-------------|
| `session` | `AsyncSession` | Database session. |
| `agent_id` | `str` | Agent identifier. |
| `llm_provider` | `LLMProvider` | Injected LLM provider. |
| `config` | `MemoryConfig` | Memory configuration. |
| `persona_text` | `str` | Optional persona description. |
**Returns:** `DirectiveBlock` with generated text. Result is cached by hash of directive IDs + reinforcement counts. Fail-safe: returns empty `DirectiveBlock` on error.
### check_directive_contradiction
Check a new directive against existing ones for contradictions. Uses embedding similarity as pre-filter, then LLM as judge.
```python
async def check_directive_contradiction(
session: AsyncSession,
agent_id: str,
new_directive: str,
embedding_provider: EmbeddingProvider,
llm_provider: LLMProvider,
*,
similarity_threshold: float = 0.80,
) -> ContradictionResult
```
| Parameter | Type | Description |
|-----------|------|-------------|
| `session` | `AsyncSession` | Database session. |
| `agent_id` | `str` | Agent identifier. |
| `new_directive` | `str` | Text of the new directive to check. |
| `embedding_provider` | `EmbeddingProvider` | Injected embedding provider. |
| `llm_provider` | `LLMProvider` | Injected LLM provider. |
| `similarity_threshold` | `float` | Minimum similarity to trigger LLM check (default 0.80). |
**Returns:** `ContradictionResult` with check outcome. Fail-safe: returns no contradiction on error.
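The embedding pre-filter stage can be sketched with plain cosine similarity. Only pairs at or above the 0.80 threshold would proceed to the LLM judge step; the LLM call itself is omitted here:

```python
import math

def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb)

def needs_llm_check(new_emb: list[float], existing_emb: list[float],
                    threshold: float = 0.80) -> bool:
    """Pre-filter: only semantically close directive pairs reach the LLM judge."""
    return cosine(new_emb, existing_emb) >= threshold
```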
### effective_confidence
Apply temporal decay to directive confidence. Formula: `base_confidence * 0.95^weeks`.
```python
def effective_confidence(
base_confidence: float,
created_at: datetime,
now: datetime | None = None,
) -> float
```
| Parameter | Type | Description |
|-----------|------|-------------|
| `base_confidence` | `float` | Original confidence value (0.0-1.0). |
| `created_at` | `datetime` | When the directive was created. |
| `now` | `datetime | None` | Current timestamp (defaults to UTC now). |
**Returns:** Decayed confidence, floored at 0.10.
### invalidate_directive_cache
Manually invalidate the directive cache for a user.
```python
def invalidate_directive_cache(agent_id: str) -> None
```
---
# Database Utilities
The `arandu.db` module provides low-level database setup functions. These are used internally by `MemoryClient` but are available for advanced use cases where you need direct control over the database engine and session lifecycle.
```python
from arandu.db import create_engine, create_session_factory, init_db
```
---
## create_engine
Create an async SQLAlchemy engine from a connection string.
Automatically converts `postgresql://` to `postgresql+psycopg://` if the async driver prefix is missing.
```python
def create_engine(database_url: str) -> AsyncEngine
```
| Parameter | Type | Description |
|-----------|------|-------------|
| `database_url` | `str` | PostgreSQL connection string. |
**Returns:** `AsyncEngine` instance.
```python
from arandu.db import create_engine
engine = create_engine("postgresql://user:pass@localhost:5432/mydb")
# Internally becomes: postgresql+psycopg://user:pass@localhost:5432/mydb
```
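The driver-prefix conversion is simple string handling; a sketch of the documented behaviour:

```python
def to_async_url(database_url: str) -> str:
    """Rewrite plain postgresql:// URLs to the async psycopg driver form."""
    if database_url.startswith("postgresql://"):
        return database_url.replace("postgresql://", "postgresql+psycopg://", 1)
    return database_url  # already has an explicit driver, leave untouched
```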
---
## create_session_factory
Create an async session factory bound to the given engine.
```python
def create_session_factory(engine: AsyncEngine) -> async_sessionmaker[AsyncSession]
```
| Parameter | Type | Description |
|-----------|------|-------------|
| `engine` | `AsyncEngine` | The async engine to bind sessions to. |
**Returns:** `async_sessionmaker[AsyncSession]` with `expire_on_commit=False`.
```python
from arandu.db import create_engine, create_session_factory
engine = create_engine("postgresql://user:pass@localhost:5432/mydb")
SessionFactory = create_session_factory(engine)
async with SessionFactory() as session:
# Use session for queries
...
```
---
## init_db
Create all memory tables in the consumer's database.
Uses `Base.metadata.create_all` -- safe to call multiple times, since it only creates tables that don't already exist. The function also ensures all SQLAlchemy model classes are registered before the tables are created.
```python
async def init_db(engine: AsyncEngine) -> None
```
| Parameter | Type | Description |
|-----------|------|-------------|
| `engine` | `AsyncEngine` | The async engine to create tables on. |
```python
from arandu.db import create_engine, init_db
engine = create_engine("postgresql://user:pass@localhost:5432/mydb")
await init_db(engine)
```
---
## Database Schema
The SDK defines its SQLAlchemy models in `arandu.models`. Key tables include:
| Table | Description |
|-------|-------------|
| `memory_events` | Immutable event records (user messages with embeddings). |
| `memory_facts` | Extracted facts with entity/attribute/value triples and embeddings. |
| `memory_entities` | Entity registry (people, places, pets, etc.). |
| `memory_entity_aliases` | Aliases for entity resolution. |
| `memory_entity_relationships` | Knowledge graph edges between entities. |
| `memory_clusters` | Semantic clusters of related facts. |
| `memory_meta_observations` | Detected patterns, insights, and behavioral preferences. |
| `memory_attribute_registry` | Custom attribute key registry per user. |
| `session_observations` | L1 session-level observations from the observer. |
All tables use UUID primary keys and include `agent_id` for multi-tenant isolation. The `memory_facts` and `memory_events` tables have `pgvector` embedding columns for semantic search.
> **Schema Management:** For production deployments, consider using Alembic migrations instead of `init_db()`. The `init_db()` function is convenient for development and testing but does not handle schema migrations for existing tables.
---
# Data Types Reference
This page documents all dataclasses, enums, and result types used across the write pipeline, read pipeline, and background jobs that are not covered in the main [API Reference](../reference/index.md).
---
## Write Pipeline Types
### InputType
```python
class InputType(str, Enum)
```
Input text classification types, determined by heuristics in `classify_input()`.
| Value | Description |
|-------|-------------|
| `SHORT` | Less than 500 characters. |
| `MEDIUM` | 500-2000 characters, unstructured. |
| `LONG` | More than 2000 characters, unstructured. |
| `STRUCTURED` | More than 500 characters with headers, bullets, or tables. |
### ExtractionMode
```python
class ExtractionMode(str, Enum)
```
| Value | Description |
|-------|-------------|
| `SINGLE_SHOT` | Single LLM call for extraction. |
| `CHUNKED` | Input is split into chunks, each processed separately. |
### InputClassification
```python
@dataclass
class InputClassification
```
Result of `classify_input()`. See [Write Pipeline API](write-api.md#inputclassification) for full field reference.
### ExtractionStrategy
```python
@dataclass
class ExtractionStrategy
```
Result of `select_strategy()`. See [Write Pipeline API](write-api.md#extractionstrategy) for full field reference.
### CorrectionResult
```python
@dataclass
class CorrectionResult
```
Result of correction detection.
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `corrections_detected` | `int` | `0` | Number of corrections found. |
| `corrected_keys` | `list[str]` | `[]` | Attribute keys that were corrected. |
| `facts_corrected_ids` | `list[str]` | `[]` | IDs of old facts that were corrected. |
---
## Read Pipeline Types
### ExpandedQuery
```python
@dataclass
class ExpandedQuery
```
Result of query expansion (entity priming).
| Field | Type | Description |
|-------|------|-------------|
| `primed_entities` | `list[str]` | Entity keys discovered via alias + KG priming. |
| `temporal_range` | `tuple[datetime, datetime] | None` | Resolved date range. |
| `expanded_terms` | `list[str]` | Additional context terms from entity facts. |
### PatternQuery
```python
@dataclass
class PatternQuery
```
A pattern-based query for keyword signal matching.
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `entity_pattern` | `str` | -- | SQL LIKE pattern for entity_key matching. |
| `attribute_filter` | `str \| None` | `None` | Optional attribute key filter. |
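A pattern such as `"person:%"` matches every entity in the `person` namespace. For local experimentation, SQL LIKE semantics can be emulated in Python with a small hypothetical helper (not part of the SDK):

```python
import re

def like_match(pattern: str, value: str) -> bool:
    """Emulate SQL LIKE: % matches any run of characters, _ matches exactly one."""
    regex = re.escape(pattern).replace("%", ".*").replace("_", ".")
    return re.fullmatch(regex, value) is not None

# e.g. a PatternQuery with entity_pattern="person:%" matches "person:ana"
```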
### RetrievalPlan
```python
@dataclass
class RetrievalPlan
```
Output of the retrieval agent LLM planner. See [Read Pipeline API](read-api.md#retrievalplan) for full field reference.
### GraphRetrievalResult
```python
@dataclass
class GraphRetrievalResult
```
Result of graph-based BFS 2-hop retrieval.
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `facts` | `list[dict[str, Any]]` | `[]` | Scored fact dicts with `source="graph"`. |
| `neighbor_keys` | `list[str]` | `[]` | Entity keys discovered via BFS. |
| `edges_traversed` | `int` | `0` | Total edges examined during BFS. |
| `edges` | `list[dict[str, Any]]` | `[]` | Deduplicated edge dicts with display names. |
### SpreadingActivationResult
```python
@dataclass
class SpreadingActivationResult
```
Result of spreading activation expansion.
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `candidates` | `list[RetrievalCandidate]` | `[]` | Expanded candidates from hops 1 and 2. |
| `meta_observations` | `list[Any]` | `[]` | Relevant meta-observations referencing seed facts. |
| `entities_explored` | `list[str]` | `[]` | Entity keys explored during spreading. |
| `clusters_explored` | `list[str]` | `[]` | Cluster IDs explored during spreading. |
| `hop1_count` | `int` | `0` | Number of facts found in hop 1. |
| `hop2_count` | `int` | `0` | Number of facts found in hop 2. |
| `kg_relationships_explored` | `int` | `0` | Number of KG relationships traversed. |
### CompressedContext
```python
@dataclass
class CompressedContext
```
Result of context compression (tiered hot/warm/cold).
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `context_text` | `str` | `""` | Final prompt-ready context string. |
| `hot_count` | `int` | `0` | Number of facts in hot tier (Tier 1). |
| `warm_count` | `int` | `0` | Number of facts in warm tier (Tier 2). |
| `cold_count` | `int` | `0` | Number of items in cold tier (Tier 3). |
| `total_tokens` | `int` | `0` | Estimated token count of context_text. |
### EmotionalTrendsResult
```python
@dataclass
class EmotionalTrendsResult
```
Result of emotional trend materialization.
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `emotion_counts` | `dict[str, int]` | `{}` | Mapping of emotion to occurrence count. |
| `trend_direction` | `str` | `"stable"` | `"increasing"`, `"decreasing"`, or `"stable"`. |
| `dominant_emotion` | `str \| None` | `None` | Most frequent emotion. |
| `trigger_keywords` | `list[str]` | `[]` | Top keywords from high-intensity events. |
| `avg_intensity` | `float` | `0.0` | Average emotion intensity. |
| `dominant_intensity` | `float` | `0.0` | Average intensity of the dominant emotion. |
| `dominant_energy` | `str` | `"medium"` | Predominant energy level. |
| `events_analyzed` | `int` | `0` | Number of events analyzed. |
| `observation_created` | `bool` | `False` | Whether a meta-observation was created/updated. |
| `observation_id` | `str \| None` | `None` | ID of the created/updated observation. |
### DirectiveBlock
```python
@dataclass
class DirectiveBlock
```
Result of directive generation (procedural memory).
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `text` | `str` | `""` | Cohesive behavioral instructions block. |
| `directive_count` | `int` | `0` | Number of active directives used. |
| `cache_hit` | `bool` | `False` | Whether this was served from cache. |
### ContradictionResult
```python
@dataclass
class ContradictionResult
```
Result of contradiction check between directives.
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `has_contradiction` | `bool` | `False` | Whether a contradiction was found. |
| `conflicting_directive` | `str \| None` | `None` | Title of the conflicting directive. |
| `resolution` | `str \| None` | `None` | Explanation of the resolution. |
---
## Background Job Result Types
### ClusteringResult
```python
@dataclass
class ClusteringResult
```
Result of fact clustering.
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `clusters_created` | `int` | `0` | Number of new clusters created. |
| `clusters_reinforced` | `int` | `0` | Number of existing clusters updated. |
| `summaries_generated` | `int` | `0` | Number of cluster summaries generated via LLM. |
| `facts_assigned` | `int` | `0` | Number of facts assigned to clusters. |
### CommunityDetectionResult
```python
@dataclass
class CommunityDetectionResult
```
Result of cross-entity community detection.
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `communities_created` | `int` | `0` | New community observations created. |
| `communities_reinforced` | `int` | `0` | Existing community observations reinforced. |
| `clusters_in_communities` | `int` | `0` | Total clusters assigned to communities. |
| `skipped` | `bool` | `False` | Whether detection was skipped. |
| `skip_reason` | `str \| None` | `None` | Reason for skipping. |
### ConsolidationResult
```python
@dataclass
class ConsolidationResult
```
Result of L2/L3 consolidation.
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `events_processed` | `int` | `0` | Number of events analyzed. |
| `observations_created` | `int` | `0` | New meta-observations created. |
| `observations_reinforced` | `int` | `0` | Existing observations reinforced. |
| `skipped` | `bool` | `False` | Whether consolidation was skipped. |
| `skip_reason` | `str \| None` | `None` | Reason for skipping. |
### MemifyResult
```python
@dataclass
class MemifyResult
```
Result of the memify pipeline (vitality scoring, staleness marking, edge management).
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `facts_scored` | `int` | `0` | Number of facts scored for vitality. |
| `facts_marked_stale` | `int` | `0` | Number of facts marked as stale. |
| `edges_reinforced` | `int` | `0` | Number of KG edges reinforced. |
| `merges_executed` | `int` | `0` | Number of entity merges executed. |
### EntityImportanceResult
```python
@dataclass
class EntityImportanceResult
```
Result of entity importance scoring (sleep-time compute).
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `entities_scored` | `int` | `0` | Number of entities scored. |
| `top_entities` | `list[tuple[str, float]]` | `[]` | Top entities by score (key, score) pairs. |
### SummaryRefreshResult
```python
@dataclass
class SummaryRefreshResult
```
Result of entity summary refresh (sleep-time compute).
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `summaries_refreshed` | `int` | `0` | Number of summaries generated. |
| `summaries_skipped` | `int` | `0` | Number of entities skipped. |
---
## Background Functions
### tag_event_emotion
Infer emotion, intensity, and energy from event text via LLM.
```python
async def tag_event_emotion(
    event_text: str,
    llm: LLMProvider,
) -> dict[str, Any] | None
```
| Parameter | Type | Description |
|-----------|------|-------------|
| `event_text` | `str` | Text to analyze. |
| `llm` | `LLMProvider` | Injected LLM provider. |
**Returns:** Dict with `emotion`, `intensity`, `energy` keys, or `None` on failure.
```python
from arandu.background import tag_event_emotion
result = await tag_event_emotion("I'm so happy today!", llm)
# {"emotion": "joy", "intensity": 0.85, "energy": "high"}
```
---
## Database Models
The SQLAlchemy models below define the persistence layer. They live in `arandu.models` and are useful for advanced queries executed directly against the database.
```mermaid
erDiagram
    MemoryEvent ||--o{ MemoryFact : "source_event_id"
    MemoryFact ||--o{ MemoryFactEntityLink : "fact_id"
    MemoryFact ||--o| MemoryCluster : "cluster_id"
    MemoryFact ||--o| MemoryFact : "supersedes_fact_id"
    MemoryEntity ||--o{ MemoryEntityAlias : "canonical_entity_key"
    MemoryEntity ||--o{ MemoryFactEntityLink : "entity_key"
    MemoryEntity ||--o{ MemoryEntityRelationship : "source/target"
    MemoryEntityRelationship ||--o| MemoryFact : "evidence_fact_id"
    MemoryFact }o--o| MemoryAttributeRegistry : "attribute_key"
    MemoryIntention }o--|| MemoryEvent : "agent_id"
    SessionObservation }o--|| MemoryEvent : "agent_id"
    MemoryEvent {
        UUID id PK
        Text agent_id
        DateTime occurred_at
        Text text
        Vector embedding_vec
    }
    MemoryFact {
        UUID id PK
        Text agent_id
        String entity_key
        Text fact_text
        Float confidence
        DateTime valid_from
        DateTime valid_to
        Vector embedding_vec
    }
    MemoryEntity {
        UUID id PK
        Text agent_id
        String canonical_key UK
        String display_name
        String entity_type
        Text summary_text
        Float importance_score
    }
    MemoryFactEntityLink {
        UUID id PK
        UUID fact_id FK
        String entity_key
        Boolean is_primary
        Text agent_id
    }
    MemoryEntityRelationship {
        UUID id PK
        Text agent_id
        String source_entity_key
        String target_entity_key
        String rel_type
        Float strength
        UUID evidence_fact_id FK
    }
    MemoryEntityAlias {
        UUID id PK
        Text agent_id
        String alias UK
        String canonical_entity_key
    }
    MemoryCluster {
        UUID id PK
        Text agent_id
        String label
        Text summary_text
        Vector embedding_vec
    }
    MemoryMetaObservation {
        UUID id PK
        Text agent_id
        String observation_type
        Text text
        Float confidence
    }
    MemoryAttributeRegistry {
        UUID id PK
        String key UK
        String status
        String value_type
        Integer seen_count
    }
    MemoryIntention {
        UUID id PK
        Text agent_id
        String trigger_type
        Text trigger_condition
        Text intended_action
        String status
    }
    SessionObservation {
        UUID id PK
        Text agent_id
        Text content
        String topic
        Boolean is_active
    }
```
### MemoryFact
Versioned fact ledger - stores structured facts with validity windows.
| Column | Type | Description |
|--------|------|-------------|
| `id` | `UUID` | Primary key. |
| `agent_id` | `Text` | Owner agent ID (any string: UUID, slug, numeric, etc.). |
| `entity_type` | `String` | Entity type (e.g. `"person"`). |
| `entity_key` | `String` | Canonical entity key (e.g. `"person:ana"`). |
| `entity_name` | `String?` | Human-readable entity name. |
| `attribute_key` | `String?` | Attribute key (e.g. `"occupation"`). |
| `fact_text` | `Text` | Natural-language fact sentence. |
| `category` | `String(50)?` | Fact category. |
| `confidence` | `Float` | Confidence score (default 0.8). |
| `importance` | `Float` | Importance score (default 0.5). |
| `is_sensitive` | `Boolean` | Whether the fact contains sensitive data. |
| `valid_from` | `DateTime` | Start of the validity window. |
| `valid_to` | `DateTime?` | End of validity (`NULL` = currently active). |
| `ttl_days` | `Integer?` | Optional time-to-live in days. |
| `source_event_id` | `UUID?` | FK to `MemoryEvent`. |
| `supersedes_fact_id` | `UUID?` | ID of the fact this one replaces. |
| `embedding_vec` | `Vector(1536)` | pgvector embedding for semantic search. |
| `vitality_score` | `Float?` | Sleep-time vitality score. |
| `is_stale` | `Boolean` | Whether marked stale by memify. |
| `cluster_id` | `UUID?` | FK to `MemoryCluster`. |
| `value_json` | `JSONB?` | Structured value (JSON) for the attribute. |
| `needs_confirmation` | `Boolean` | Whether the fact requires user confirmation. |
| `last_confirmed_at` | `DateTime?` | When the fact was last confirmed by the user. |
| `times_retrieved` | `Integer` | How many times this fact was retrieved. |
| `last_retrieved_at` | `DateTime?` | When the fact was last retrieved. |
| `user_correction_count` | `Integer` | Number of times the user corrected this fact. |
| `source_context` | `String(512)?` | Source context of the original input. |
| `agent_annotation` | `Text?` | Free-text annotation added by the agent. |
| `embedding` | `JSONB?` | Raw embedding as JSON (non-pgvector fallback). |
| `search_vector` | `TSVECTOR` | Full-text search index column. |
| `created_at` | `DateTime` | Row creation timestamp. |
| `ingested_at` | `DateTime` | Bi-temporal ingestion timestamp. |
| `invalidated_at` | `DateTime?` | When the fact was invalidated (bi-temporal). |
### MemoryEntity
First-class entity node in the knowledge graph.
| Column | Type | Description |
|--------|------|-------------|
| `id` | `UUID` | Primary key. |
| `agent_id` | `Text` | Owner agent ID (any string: UUID, slug, numeric, etc.). |
| `canonical_key` | `String(128)` | Unique canonical key (e.g. `"person:ana"`). |
| `display_name` | `String(256)?` | Human-readable display name. |
| `entity_type` | `String(32)` | Type (`"person"`, `"organization"`, etc.). |
| `summary_text` | `Text?` | LLM-generated entity summary. |
| `embedding_vec` | `Vector(1536)` | Entity embedding. |
| `fact_count` | `Integer` | Number of linked facts. |
| `importance_score` | `Float?` | Sleep-time importance score. |
| `is_active` | `Boolean` | Whether the entity is active. |
| `first_seen_at` | `DateTime` | When the entity was first observed. |
| `last_seen_at` | `DateTime` | When the entity was last observed. |
| `summary_refreshed_at` | `DateTime?` | When the entity summary was last refreshed. |
| `created_at` | `DateTime` | Row creation timestamp. |
Unique constraint: `(agent_id, canonical_key)`.
### MemoryEntityAlias
Maps alias names to canonical entity keys for entity resolution.
| Column | Type | Description |
|--------|------|-------------|
| `id` | `UUID` | Primary key. |
| `agent_id` | `Text` | Owner agent ID (any string: UUID, slug, numeric, etc.). |
| `alias` | `String` | Alias text (e.g. `"Ana"`). |
| `canonical_entity_key` | `String` | Target canonical key. |
| `canonical_entity_type` | `String` | Target entity type. |
| `created_at` | `DateTime` | Row creation timestamp. |
Unique constraint: `(agent_id, alias)`.
### MemoryEntityRelationship
Directed edge between two entities in the knowledge graph.
| Column | Type | Description |
|--------|------|-------------|
| `id` | `UUID` | Primary key. |
| `agent_id` | `Text` | Owner agent ID (any string: UUID, slug, numeric, etc.). |
| `source_entity_key` | `String(128)` | Source entity canonical key. |
| `target_entity_key` | `String(128)` | Target entity canonical key. |
| `rel_type` | `String(64)` | Relationship type (e.g. `"works_at"`, `"mentored_by"`). |
| `strength` | `Float` | Edge strength (default 0.8). |
| `evidence_fact_id` | `UUID?` | FK to the fact that evidences this edge. |
| `provenance` | `String(16)` | How the edge was created (`"rule"`, `"llm"`). |
| `valid_from` | `DateTime` | Start of validity. |
| `valid_to` | `DateTime?` | End of validity (`NULL` = active). |
| `created_at` | `DateTime` | Row creation timestamp. |
| `updated_at` | `DateTime` | Last update timestamp. |
| `last_used_at` | `DateTime?` | When the relationship was last used in retrieval. |
| `invalidated_at` | `DateTime?` | When the relationship was invalidated. |
Unique constraint: `(agent_id, source_entity_key, target_entity_key, rel_type)`.
#### Dynamic Relationship Types
The `rel_type` field accepts **any** short, descriptive `snake_case` string - it is not restricted to a fixed set. The extraction pipeline instructs the LLM to choose the most descriptive type for each relationship.
Common types (used as examples in the extraction prompt, not as restrictions):
`works_at`, `manages`, `reports_to`, `family_of`, `friend_of`, `partner_of`, `owns`, `lives_in`, `member_of`, `studies_at`, `works_with`
The LLM may also produce types like `mentored_by`, `inspired_by`, `competed_with`, or any other descriptive type.
**Normalization**: All relationship types are normalized via `normalize_rel_type()` before persistence:
- Lowercase + underscores (e.g. `"Mentored By"` → `"mentored_by"`)
- Known aliases are mapped to common types (e.g. `"boss"` → `"reports_to"`, `"spouse"` → `"partner_of"`)
- Unknown types pass through after sanitization
The `CANONICAL_REL_TYPES` set in `arandu.constants` is available as a **reference** for consumers who want to filter by known types, but it is not used as a validation filter.
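The normalization rules can be sketched as follows. This is an illustrative reimplementation of the documented behavior, not the SDK's `normalize_rel_type()`; the alias map holds only the examples given above:

```python
import re

# Example aliases from the docs; the SDK's full map may differ
COMMON_ALIASES = {"boss": "reports_to", "spouse": "partner_of"}

def normalize_rel_type_sketch(raw: str) -> str:
    """Lowercase, replace non-alphanumeric runs with underscores, then map known aliases."""
    key = re.sub(r"[^a-z0-9]+", "_", raw.strip().lower()).strip("_")
    return COMMON_ALIASES.get(key, key)
```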
See [Evidence Linkage & Cascade Invalidation](../concepts/write-pipeline.md#evidence-linkage-cascade-invalidation) for how relationships are linked to supporting facts and automatically cleaned up when facts change.
### MemoryEvent
Immutable event log - stores all user messages with embeddings.
| Column | Type | Description |
|--------|------|-------------|
| `id` | `UUID` | Primary key. |
| `agent_id` | `Text` | Owner agent ID (any string: UUID, slug, numeric, etc.). |
| `occurred_at` | `DateTime` | When the event happened. |
| `text` | `Text` | Event text content. |
| `source` | `String` | Origin (default `"api"`). |
| `importance` | `Float` | Importance score (default 0.5). |
| `embedding_vec` | `Vector(1536)` | Event embedding for retrieval. |
| `embedding` | `JSONB?` | Raw embedding as JSON (non-pgvector fallback). |
| `trace_json` | `JSONB?` | Trace/debug metadata for the event. |
| `created_at` | `DateTime` | Row creation timestamp. |
| `emotion_primary` | `String(32)?` | Primary emotion label. |
| `emotion_intensity` | `Float?` | Emotion intensity (0-1). |
| `energy_level` | `String(16)?` | Energy level (`"low"`, `"medium"`, `"high"`). |
### MemoryCluster
Semantic cluster grouping related facts for richer context.
| Column | Type | Description |
|--------|------|-------------|
| `id` | `UUID` | Primary key. |
| `agent_id` | `Text` | Owner agent ID (any string: UUID, slug, numeric, etc.). |
| `label` | `String(128)` | Cluster label. |
| `summary_text` | `Text?` | LLM-generated cluster summary. |
| `cluster_type` | `String(32)` | Cluster type (default `"auto"`). |
| `fact_count` | `Integer` | Number of facts in the cluster. |
| `importance` | `Float` | Cluster importance (default 0.5). |
| `embedding_vec` | `Vector(1536)` | Cluster embedding. |
| `is_active` | `Boolean` | Whether the cluster is active. |
| `last_updated_at` | `DateTime` | When the cluster was last updated. |
| `created_at` | `DateTime` | Row creation timestamp. |
### MemoryMetaObservation
Meta-observations derived from consolidation - patterns, insights, trends.
| Column | Type | Description |
|--------|------|-------------|
| `id` | `UUID` | Primary key. |
| `agent_id` | `Text` | Owner agent ID (any string: UUID, slug, numeric, etc.). |
| `observation_type` | `String(32)` | Type (`"pattern"`, `"trend"`, `"community"`, etc.). |
| `title` | `String(256)` | Short title. |
| `text` | `Text` | Full observation text. |
| `supporting_event_ids` | `JSONB` | List of supporting event UUIDs. |
| `supporting_fact_ids` | `JSONB` | List of supporting fact UUIDs. |
| `confidence` | `Float` | Confidence (default 0.7). |
| `importance` | `Float` | Importance (default 0.5). |
| `times_reinforced` | `Integer` | How many times this observation was reinforced. |
| `is_active` | `Boolean` | Whether the observation is active. |
| `embedding_vec` | `Vector(1536)` | Observation embedding. |
| `first_detected_at` | `DateTime` | When the observation was first detected. |
| `last_reinforced_at` | `DateTime` | When the observation was last reinforced. |
| `created_at` | `DateTime` | Row creation timestamp. |
### MemoryFactEntityLink
Cross-reference table linking each fact to ALL entities it mentions - enables cross-entity retrieval without fact duplication.
| Column | Type | Description |
|--------|------|-------------|
| `id` | `UUID` | Primary key. |
| `fact_id` | `UUID` | FK to `MemoryFact`. |
| `entity_key` | `String` | Entity canonical key (e.g. `"person:clara_rezende"`). |
| `is_primary` | `Boolean` | Whether this entity is the fact's primary subject. |
| `agent_id` | `Text` | Owner agent ID. |
Unique constraint: `(fact_id, entity_key)`.
Indexes: `(agent_id, entity_key)` for retrieval, `(fact_id)` for cascade deletes.
### MemoryAttributeRegistry
Registry for managing attribute keys - tracks proposed vs active keys.
| Column | Type | Description |
|--------|------|-------------|
| `id` | `UUID` | Primary key. |
| `key` | `String(64)` | Unique attribute key. |
| `status` | `String(20)` | `"proposed"` or `"active"`. |
| `value_type` | `String(20)` | Expected value type (default `"string"`). |
| `conflict_policy` | `String(20)` | How to handle conflicts (default `"supersede"`). |
| `ttl_days` | `Integer?` | Optional default TTL for facts with this key. |
| `seen_count` | `Integer` | How many times this key has been seen. |
| `proposed_by` | `String(20)` | Who proposed the key (`"llm"`, `"user"`). |
| `reason` | `Text?` | Why the key was proposed. |
| `first_seen_at` | `DateTime` | When the attribute key was first seen. |
| `last_seen_at` | `DateTime` | When the attribute key was last seen. |
| `example_raw_key` | `String(128)?` | Example of the raw key before normalization. |
| `created_at` | `DateTime` | Row creation timestamp. |
| `updated_at` | `DateTime?` | Last update timestamp. |
### MemoryIntention
Prospective memory - future intentions with time-based or event-based triggers.
| Column | Type | Description |
|--------|------|-------------|
| `id` | `UUID` | Primary key. |
| `agent_id` | `Text` | Owner agent ID (any string: UUID, slug, numeric, etc.). |
| `trigger_type` | `String(16)` | Trigger type (`"time"`, `"event"`, etc.). |
| `trigger_condition` | `Text` | Natural-language description of the trigger condition. |
| `intended_action` | `Text` | What the agent should do when triggered. |
| `due_date` | `DateTime?` | Optional due date for time-based triggers. |
| `status` | `String(16)` | Status (`"pending"`, `"triggered"`, `"fulfilled"`, `"expired"`). Default `"pending"`. |
| `trigger_embedding_vec` | `Vector(1536)` | Embedding of the trigger condition for semantic matching. |
| `source_context` | `String(32)?` | Source context identifier. |
| `outcome_note` | `Text?` | Note about the outcome after fulfillment. |
| `created_at` | `DateTime` | Row creation timestamp. |
| `triggered_at` | `DateTime?` | When the intention was triggered. |
| `fulfilled_at` | `DateTime?` | When the intention was fulfilled. |
### SessionObservation
Persistent session observations created by the LLM-driven Observer - captures in-session context that persists across turns.
| Column | Type | Description |
|--------|------|-------------|
| `id` | `UUID` | Primary key. |
| `agent_id` | `Text` | Owner agent ID (any string: UUID, slug, numeric, etc.). |
| `content` | `Text` | Observation content text. |
| `topic` | `String(64)?` | Topic tag for the observation. |
| `entities_mentioned` | `JSONB` | List of entity keys mentioned in the observation. |
| `created_at` | `DateTime` | Row creation timestamp. |
| `referenced_at` | `DateTime?` | When the observation was last referenced. |
| `relative_offset` | `String(64)?` | Relative time offset descriptor (e.g. `"2 messages ago"`). |
| `source_message_ids` | `JSONB` | List of source message IDs that originated this observation. |
| `is_active` | `Boolean` | Whether the observation is active. |
| `merged_into_id` | `UUID?` | ID of the observation this one was merged into. |
| `emotion_label` | `String(32)?` | Detected emotion label for the session context. |
| `embedding_vec` | `Vector(1536)` | Observation embedding for semantic retrieval. |
---
# Configuration Reference
All tuning parameters for the Arandu SDK live in `MemoryConfig`. Every field has a sensible default - override only what you need.
```python
from arandu.config import MemoryConfig
config = MemoryConfig(
    topk_facts=10,
    enable_reranker=False,
    min_score=0.15,
)
```
You can also override per-request via `config_overrides`:
```python
result = await memory.retrieve(
    agent_id="user_123",
    query="...",
    config_overrides={"topk_facts": 5, "enable_reranker": False},
)
```
---
## Extraction
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `extraction_timeout_sec` | `float` | `30.0` | Timeout per LLM call during extraction. |
## Entity Resolution
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `fuzzy_threshold` | `float` | `0.85` | Cosine similarity threshold for direct fuzzy match. Above this → auto-resolve. Below 0.50 → new entity. Between → LLM decides. |
| `enable_llm_resolution` | `bool` | `True` | Whether to use LLM for ambiguous entity matches. When `False`, ambiguous cases create new entities instead. |
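The three resolution bands described above can be sketched as a plain decision function (illustrative only; the 0.50 lower bound is taken from the `fuzzy_threshold` description):

```python
def resolution_band(similarity: float, fuzzy_threshold: float = 0.85) -> str:
    """Map a cosine similarity to the documented resolution decision."""
    if similarity >= fuzzy_threshold:
        return "auto_resolve"   # confident fuzzy match
    if similarity < 0.50:
        return "new_entity"     # clearly a different entity
    return "llm_decides"        # ambiguous band: defer to the LLM resolver
```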
## Retrieval
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `topk_facts` | `int` | `20` | Maximum facts returned by `retrieve()`. |
| `topk_events` | `int` | `8` | Maximum events included in context. |
| `event_max_scan` | `int` | `200` | Maximum events scanned for relevance. |
| `min_similarity` | `float` | `0.20` | Minimum cosine similarity for semantic search candidates. |
| `min_confidence` | `float` | `0.55` | Minimum confidence for facts to be considered. |
| `min_score` | `float` | `0.15` | Minimum final score for facts to be included in results. Set higher (e.g., `0.20`) to filter low-relevance facts. |
| `recency_half_life_days` | `int` | `14` | Half-life for recency decay scoring. |
## Reranker
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `enable_reranker` | `bool` | `True` | Enable LLM-based reranking. When enabled, the reranker uses a multiplicative blend with the formula score - `score_weights` still matter for the base score. |
| `reranker_timeout_sec` | `float` | `5.0` | Timeout for the reranker LLM call. Falls back to original ranking on timeout. |
| `reranker_weight` | `float` | `0.70` | Weight of reranker score in the multiplicative blend: `final = formula × (floor + w × reranker)` where `floor = 1 - w`. Higher = more reranker influence. Lower = more formula score influence. |
| `min_reranker_score` | `float` | `0.10` | Minimum reranker score for a fact to survive. Facts below this threshold are eliminated (final_score = 0.0), giving the reranker veto power over irrelevant facts. Only applies when `enable_reranker=True`. Set `0.05` for more permissive, `0.20` for stricter. |
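Putting the blend formula and the veto together, a minimal sketch of the documented scoring (not the SDK's implementation):

```python
def blended_score(
    formula_score: float,
    reranker_score: float,
    reranker_weight: float = 0.70,
    min_reranker_score: float = 0.10,
) -> float:
    """final = formula x (floor + w x reranker), with floor = 1 - w and a veto below min score."""
    if reranker_score < min_reranker_score:
        return 0.0  # veto: the reranker eliminates irrelevant facts outright
    floor = 1.0 - reranker_weight
    return formula_score * (floor + reranker_weight * reranker_score)
```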
## Score Weights
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `score_weights` | `dict` | `{"semantic": 0.70, "recency": 0.20, "importance": 0.10}` | Weights for hybrid ranking formula. Always affects the base formula score, which the reranker blends with multiplicatively. |
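A sketch of the base formula score using the default weights. The exponential half-life form of the recency signal is an assumption consistent with the `recency_half_life_days` description, not a guaranteed implementation detail:

```python
def base_score(
    semantic: float,
    age_days: float,
    importance: float,
    half_life_days: int = 14,
) -> float:
    """Weighted blend of semantic similarity, recency decay, and importance."""
    weights = {"semantic": 0.70, "recency": 0.20, "importance": 0.10}
    # Assumed recency model: the signal halves every half_life_days
    recency = 0.5 ** (age_days / half_life_days)
    return (
        weights["semantic"] * semantic
        + weights["recency"] * recency
        + weights["importance"] * importance
    )
```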
## Confidence
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `confidence_level_map` | `dict` | `{explicit: 0.95, strong: 0.80, weak: 0.60, speculation: 0.40}` | Confidence scores assigned during extraction. |
| `confidence_default` | `float` | `0.60` | Default confidence when LLM doesn't specify. |
## Spreading Activation
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `spreading_activation_hops` | `int` | `2` | Maximum hops from seed facts. Set to `0` to disable. |
| `spreading_decay_factor` | `float` | `0.50` | Score decay per hop. Hop 1 = factor, Hop 2 = factor². |
| `spreading_max_related_entities` | `int` | `5` | Max KG-related entities explored per hop. |
| `spreading_facts_per_entity` | `int` | `3` | Max facts fetched per entity in spreading. |
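The per-hop decay is straightforward to illustrate (a sketch of the documented decay, not SDK code): hop 1 scales a seed score by the factor, hop 2 by the factor squared.

```python
def hop_score(seed_score: float, hop: int, decay_factor: float = 0.50) -> float:
    """Scale a seed fact's score by decay_factor ** hop during spreading activation."""
    return seed_score * (decay_factor ** hop)
```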
## Context Compression
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `context_max_tokens` | `int` | `2000` | Proportional token budget for context compression. Not a hard cap. |
| `hot_tier_ratio` | `float` | `0.5` | Fraction of budget for top-scoring facts (full detail). |
| `warm_tier_ratio` | `float` | `0.3` | Fraction of budget for mid-scoring facts (summarized). |
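With the defaults above, the hot tier receives half the budget, the warm tier 30%, and the cold tier the remainder (20%). A sketch of the split, assuming the cold tier is simply the leftover budget:

```python
def tier_budgets(
    context_max_tokens: int = 2000,
    hot_tier_ratio: float = 0.5,
    warm_tier_ratio: float = 0.3,
) -> tuple[int, int, int]:
    """Split the proportional token budget into (hot, warm, cold) tiers."""
    hot = round(context_max_tokens * hot_tier_ratio)
    warm = round(context_max_tokens * warm_tier_ratio)
    cold = context_max_tokens - hot - warm  # cold tier takes the remainder
    return hot, warm, cold
```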
## Emotional Trends
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `emotional_trend_window_days` | `int` | `30` | Lookback window for emotional trend detection. |
| `emotional_trend_min_events` | `int` | `5` | Minimum events to compute a trend. |
## Clustering
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `cluster_max_age_days` | `int` | `90` | Maximum age of facts included in clustering. |
| `cluster_min_facts` | `int` | `2` | Minimum facts per cluster. |
| `community_similarity_threshold` | `float` | `0.75` | Cosine similarity threshold for grouping clusters into communities. |
| `community_min_clusters` | `int` | `2` | Minimum clusters to form a community. |
## Consolidation
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `consolidation_min_events` | `int` | `3` | Minimum events before running consolidation. |
| `consolidation_lookback_days` | `int` | `7` | How far back to look for patterns. |
## Sleep-Time Compute
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `importance_recency_halflife_days` | `int` | `30` | Half-life for importance score recency signal. |
| `summary_refresh_interval_days` | `int` | `7` | Entity summaries older than this are marked stale. |
## Memify
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `vitality_stale_threshold` | `float` | `0.2` | Vitality score below which facts are considered stale. |
| `memify_merge_similarity_threshold` | `float` | `0.90` | Threshold for merging similar procedural memories. |
## Procedural Memory
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `directive_max_tokens` | `int` | `300` | Max tokens for procedural directive generation. |
| `directive_cache_ttl_minutes` | `int` | `30` | TTL for directive cache. |
| `contradiction_similarity_threshold` | `float` | `0.80` | Threshold for detecting contradictions. |
## Locale
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `timezone` | `str` | `"UTC"` | IANA timezone for temporal resolution in retrieval. |
## Open Catalog (Extensions)
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `extra_attribute_keys` | `set[str]` | `set()` | Additional attribute keys accepted by the system. |
| `attribute_aliases` | `dict[str, str]` | `{}` | Aliases for attribute keys. |
| `extra_namespaces` | `set[str]` | `set()` | Additional entity namespaces. |
| `extra_self_references` | `frozenset[str]` | `frozenset()` | Additional terms that resolve to `user:self`. |
| `extra_relationship_hints` | `frozenset[str]` | `frozenset()` | Additional relationship hint patterns. |
## Limits
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `max_facts_per_event` | `int` | `100` | Maximum facts extracted per message (safety limit). |
| `embedding_dimensions` | `int` | `1536` | Embedding vector dimensions (must match your provider). |
---
# Database Schema
Arandu uses PostgreSQL with pgvector. All tables are created automatically by `memory.initialize()`. This page documents each table for debugging, querying, and understanding the data model.
---
## Core Tables
### memory_events
Immutable audit log. Every `write()` call creates one event.
| Column | Type | Description |
|--------|------|-------------|
| `id` | UUID | Primary key |
| `agent_id` | TEXT | Agent identifier |
| `content` | TEXT | Raw message text |
| `embedding` | VECTOR | Message embedding |
| `emotion` | VARCHAR | Detected emotion (joy, sadness, anger, etc.) |
| `emotion_intensity` | FLOAT | Emotion intensity 0.0 - 1.0 |
| `energy_level` | VARCHAR | high, medium, or low |
| `created_at` | TIMESTAMP | When the event was created |
### memory_facts
Versioned factual knowledge. Each fact is a self-contained natural language statement about an entity.
| Column | Type | Description |
|--------|------|-------------|
| `id` | UUID | Primary key |
| `agent_id` | TEXT | Agent identifier |
| `entity_type` | VARCHAR | Free-form entity type (person, organization, place, etc.) |
| `entity_key` | VARCHAR | Canonical entity key (e.g., `person:carlos`) |
| `entity_name` | VARCHAR | Display name of the entity |
| `attribute_key` | VARCHAR | Optional attribute category |
| `fact_text` | TEXT | The fact in natural language |
| `embedding` | VECTOR | Fact text embedding |
| `confidence` | FLOAT | Extraction confidence 0.0 - 1.0 |
| `importance` | FLOAT | Base importance score |
| `source_event_id` | UUID | FK to the event that created this fact |
| `supersedes_fact_id` | UUID | FK to the fact this one replaces (UPDATE chain) |
| `valid_from` | TIMESTAMP | When this fact became active |
| `valid_to` | TIMESTAMP | When this fact was superseded (NULL = active) |
| `invalidated_at` | TIMESTAMP | When explicitly invalidated |
| `is_stale` | BOOLEAN | Marked stale by memify |
| `last_confirmed_at` | TIMESTAMP | Last NOOP confirmation |
| `times_retrieved` | INT | Retrieval counter |
| `last_retrieved_at` | TIMESTAMP | Last retrieval time |
| `source_context` | VARCHAR | Origin marker (e.g., `inferred_from_relation` for mirror facts) |
| `cluster_id` | UUID | FK to cluster |
| `created_at` | TIMESTAMP | Row creation time |
### memory_fact_entity_links
Cross-entity links. Each fact is linked to ALL entities it mentions, not just its primary subject.
| Column | Type | Description |
|--------|------|-------------|
| `id` | UUID | Primary key |
| `fact_id` | UUID | FK to memory_facts (CASCADE delete) |
| `entity_key` | VARCHAR | Entity this fact is linked to |
| `is_primary` | BOOLEAN | True if this is the fact's primary subject |
| `agent_id` | TEXT | Agent identifier |
**Unique constraint:** `(fact_id, entity_key)` - one link per fact-entity pair.
**Indexes:** `(agent_id, entity_key)` for retrieval queries, `(fact_id)` for cascade operations.
### memory_entities
Canonical entity records. Created during entity resolution.
| Column | Type | Description |
|--------|------|-------------|
| `id` | UUID | Primary key |
| `agent_id` | TEXT | Agent identifier |
| `canonical_key` | VARCHAR | Unique key (e.g., `person:carlos`) |
| `display_name` | VARCHAR | Human-readable name |
| `entity_type` | VARCHAR | Free-form type string |
| `embedding_vec` | VECTOR | Entity name embedding |
| `summary_text` | TEXT | LLM-generated summary (from background jobs) |
| `profile_text` | TEXT | Living entity profile (~100-300 tokens). Updated synchronously by the write pipeline during informed extraction. Used as input context for subsequent extractions (replacing per-fact retrieval when available) and injected as "Tier 0" in retrieval context compression. Nullable -- `NULL` until the first write about this entity. Coexists with `summary_text` (background job). See [Entity Profiles](../concepts/write-pipeline.md#entity-profiles) |
| `profile_refreshed_at` | TIMESTAMP | When `profile_text` was last updated. Nullable -- `NULL` when no profile has been generated yet. Updated in the same transaction as fact upserts |
| `importance_score` | FLOAT | Computed importance 0.0 - 1.0 |
| `fact_count` | INT | Number of linked facts |
| `is_active` | BOOLEAN | Whether entity is active |
### memory_entity_aliases
Alias cache for fast exact-match entity resolution.
| Column | Type | Description |
|--------|------|-------------|
| `id` | UUID | Primary key |
| `agent_id` | TEXT | Agent identifier |
| `alias` | VARCHAR | Normalized alias text |
| `canonical_entity_key` | VARCHAR | Resolved entity key |
| `canonical_entity_type` | VARCHAR | Entity type |
**Unique constraint:** `(agent_id, alias)` - first-write-wins semantics.
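First-write-wins means a later write that produces the same alias does not overwrite the cached mapping. As an illustrative sketch (a plain dict standing in for the table, not the SDK's actual code), `dict.setdefault` has exactly this behavior:

```python
# A plain dict keyed on (agent_id, alias) standing in for memory_entity_aliases.
alias_cache: dict[tuple[str, str], str] = {}

def register_alias(agent_id: str, alias: str, entity_key: str) -> str:
    # setdefault stores the value only when the key is absent, which
    # mirrors the unique constraint's first-write-wins semantics.
    return alias_cache.setdefault((agent_id, alias.strip().lower()), entity_key)

register_alias("agent-1", "Ana", "person:ana")
register_alias("agent-1", "ana", "person:ana_silva")  # ignored: alias already taken
print(alias_cache[("agent-1", "ana")])  # person:ana
```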
### memory_entity_relationships
Knowledge graph edges between entities.
| Column | Type | Description |
|--------|------|-------------|
| `id` | UUID | Primary key |
| `agent_id` | TEXT | Agent identifier |
| `source_entity_key` | VARCHAR | Source entity |
| `target_entity_key` | VARCHAR | Target entity |
| `rel_type` | VARCHAR | Relationship type (snake_case, free-form) |
| `strength` | FLOAT | 0.0 - 1.0, reinforced on repetition |
| `evidence_fact_id` | UUID | FK to the fact supporting this relationship |
| `valid_from` | TIMESTAMP | When created |
| `valid_to` | TIMESTAMP | When invalidated (NULL = active) |
| `invalidated_at` | TIMESTAMP | Cascade invalidation timestamp |
**Unique constraint:** `(agent_id, source_entity_key, target_entity_key, rel_type)`.
> **Relationships are unidirectional:** `ana → works_at → acme` does NOT create `acme → employs → ana`. Graph retrieval traverses both directions, but the edge itself is one-way.
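A minimal sketch of what this means for traversal, using a hypothetical in-memory edge list in place of the table:

```python
# One-way edges, as rows in memory_entity_relationships would store them.
edges = [
    ("person:ana", "works_at", "organization:acme"),
    ("person:ana", "married_to", "person:carlos"),
]

def neighbors(entity_key: str) -> set[str]:
    """Collect neighbors by scanning both the source and target columns."""
    outgoing = {tgt for src, _, tgt in edges if src == entity_key}
    incoming = {src for src, _, tgt in edges if tgt == entity_key}
    return outgoing | incoming

# acme has no outgoing edge, yet traversal still reaches ana:
print(neighbors("organization:acme"))  # {'person:ana'}
```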
---
## Supporting Tables
### memory_clusters
Semantic fact clusters (created by background jobs).
| Column | Type | Description |
|--------|------|-------------|
| `id` | UUID | Primary key |
| `agent_id` | TEXT | Agent identifier |
| `entity_type` | VARCHAR | Cluster entity type |
| `entity_key` | VARCHAR | Cluster entity key |
| `summary` | TEXT | LLM-generated cluster summary |
| `embedding` | VECTOR | Cluster embedding |
| `created_at` | TIMESTAMP | Creation time |
### memory_meta_observations
Higher-order patterns detected by consolidation.
| Column | Type | Description |
|--------|------|-------------|
| `id` | UUID | Primary key |
| `agent_id` | TEXT | Agent identifier |
| `observation_type` | VARCHAR | Type: insight, pattern, contradiction, trend, entity_community |
| `content` | TEXT | Observation text |
| `supporting_fact_ids` | JSONB | Array of fact IDs supporting this observation |
| `is_active` | BOOLEAN | Whether still relevant |
| `created_at` | TIMESTAMP | Creation time |
### memory_attribute_registry
Tracks known attribute keys per agent.
| Column | Type | Description |
|--------|------|-------------|
| `id` | UUID | Primary key |
| `agent_id` | TEXT | Agent identifier |
| `attribute_key` | VARCHAR | Attribute key |
| `first_seen_at` | TIMESTAMP | When first used |
### memory_intentions
User intentions detected from events (experimental).
| Column | Type | Description |
|--------|------|-------------|
| `id` | UUID | Primary key |
| `agent_id` | TEXT | Agent identifier |
| `intention` | TEXT | Detected intention |
| `source_event_id` | UUID | Source event |
| `confidence` | FLOAT | Detection confidence |
| `created_at` | TIMESTAMP | Creation time |
---
# Building a Personal Assistant
This guide shows how to give an AI agent **human-like memory** using Arandu — so it remembers what it heard, what it said, who told it what, and what it did.
## The Problem
Think about how your own memory works. When you talk to someone:
- You remember **what they told you** — "Pedro told me he lives in Porto Alegre"
- You remember **what you said** — "I recommended a restaurant to him"
- You remember **what you learned about someone else** from the conversation — "Pedro mentioned his girlfriend Ana is a designer"
- You remember **what you did** — "I created a document for Pedro"
- When asked about any topic, you recall **everything you know**, regardless of who told you or when
An AI agent has none of this by default. Every session starts blank. The agent doesn't know what happened before, who said what, or what it did. It has no memory.
Arandu gives your agent the same kind of memory a human has. Every fact carries who it's about, who said it, and when — and retrieval works like human recall: ask about a topic, get everything relevant.
## Architecture
A personal assistant needs three layers of memory:
| Layer | Tool | What it stores |
|---|---|---|
| **Semantic memory** | **Arandu** | Facts about people, preferences, actions, knowledge |
| **Procedural memory** | System prompt / config | How the assistant should behave |
| **Episodic memory** | File system / database | Daily logs, task boards, structured records |
Arandu handles semantic memory — the "what I know" layer. It extracts facts from natural language, reconciles them with existing knowledge, and retrieves by relevance.
## Core Pattern: Record Both Sides
The key insight: **record both the user's messages and the assistant's responses**.
```python
# User said something → record with user's speaker_name
await memory.write(
agent_id="my-assistant",
message="My girlfriend Ana is a designer, she works at Stone",
speaker_name="Pedro",
)
# Extracted facts:
# "Ana is a designer" (entity: person:ana)
# "Ana works at Stone" (entity: person:ana, organization:stone)
# "Ana is Pedro's girlfriend" (entity: person:ana)
# Assistant responded → record with assistant's speaker_name
await memory.write(
agent_id="my-assistant",
message="Created the note 'Ana.md' in the vault and scheduled a birthday reminder for May 15",
speaker_name="Assistant",
)
# Extracted facts:
# "Assistant created note Ana.md in the vault" (entity: person:assistant)
# "Assistant scheduled birthday reminder for May 15" (entity: person:assistant)
```
The `speaker_name` resolves pronouns: "I created" with `speaker_name="Assistant"` becomes "Assistant created". Each fact carries `speaker` metadata showing who said it.
## Session Flow
```
┌─────────────────────────────────────────────────────┐
│ Session start │
│ │
│ 1. retrieve(query="recent context") │
│ → Recovers what the assistant knows and did │
│ → Inject into system prompt as context │
│ │
├─────────────────────────────────────────────────────┤
│ During conversation │
│ │
│ 2. User sends message │
│ → write(message=msg, speaker_name="Pedro") │
│ │
│ 3. Assistant processes and responds │
│ → retrieve(query=msg) to get relevant context │
│ → Generate response │
│ → write(message=response, speaker_name="Assist") │
│ │
├─────────────────────────────────────────────────────┤
│ Session end │
│ │
│ Nothing special. Everything was recorded during │
│ the conversation. Next session starts with retrieve.│
└─────────────────────────────────────────────────────┘
```
## Retrieval Examples
Once both sides are recorded, the assistant can answer questions about anyone and anything:
```python
# "What do I know about Ana?"
result = await memory.retrieve(agent_id="my-assistant", query="Ana")
# Returns ALL facts about Ana, regardless of who said them:
# - Ana is a designer (Pedro said)
# - Ana works at Stone (Pedro said)
# - Assistant created note Ana.md (Assistant did)
# "What did I do recently?"
result = await memory.retrieve(agent_id="my-assistant", query="what was done recently")
# Returns assistant's actions:
# - Created note Ana.md
# - Scheduled birthday reminder
# Scoped to a specific entity
result = await memory.retrieve(
agent_id="my-assistant",
query="work",
entity_keys=["person:ana"],
)
# Returns only work-related facts about Ana
```
`entity_keys` accepts aliases too — pass `"person:ana"` even if the canonical key is `person:ana_silva`, and the SDK resolves it via `memory_entity_aliases`. Keys that do not resolve are surfaced in `result.warnings` so the caller can distinguish "no matches" from "bad key":
```python
result = await memory.retrieve(
agent_id="my-assistant",
query="work",
entity_keys=["person:ana", "person:unknown"],
)
# result.warnings → ["entity_key 'person:unknown' not found (not canonical, no matching alias)"]
```
## Speaker Provenance
Every fact carries a `speaker` field showing who said the message it was extracted from. This is available in both `FactDetail` (from `get`/`get_all`) and `ScoredFact` (from `retrieve`):
```python
result = await memory.retrieve(agent_id="my-assistant", query="Stone")
for fact in result.facts:
print(f"[{fact.speaker}] {fact.fact_text}")
# Output:
# [Pedro] Ana works at Stone
# [Assistant] Stone is a Brazilian fintech, founded in 2012
```
Of the systems in the comparison table below, only Arandu persists speaker provenance natively.
## Managing Memory
The assistant can inspect and manage its own memory:
```python
# List all known entities
entities = await memory.entities(agent_id="my-assistant")
for e in entities:
print(f"[{e.entity_type}] {e.display_name} ({e.fact_count} facts)")
# [person] Pedro (12 facts)
# [person] Ana (5 facts)
# [organization] Stone (3 facts)
# [person] Assistant (8 facts)
# List facts about a specific entity
ana_facts = await memory.get_all(
agent_id="my-assistant",
entity_keys=["person:ana"],
)
# Delete a specific fact
await memory.delete(agent_id="my-assistant", fact_id="some-uuid")
# Reset all memory (use with caution)
await memory.delete_all(agent_id="my-assistant")
```
## Complete Example
A minimal personal assistant with memory:
```python
import asyncio
from arandu import MemoryClient
from arandu.providers.openai import OpenAIProvider
async def handle_message(
memory: MemoryClient,
agent_id: str,
user_msg: str,
speaker: str,
) -> str:
# 1. Record user message
await memory.write(
agent_id=agent_id,
message=user_msg,
speaker_name=speaker,
)
# 2. Retrieve relevant context
context = await memory.retrieve(agent_id=agent_id, query=user_msg)
# 3. Generate response (replace with your LLM call)
    llm = memory._llm  # reuse the SDK's provider (private attribute; inject your own client in real code)
response = await llm.complete(
messages=[
{
"role": "system",
"content": f"You are a personal assistant. Memory context:\n{context.context}",
},
{"role": "user", "content": user_msg},
]
)
# 4. Record assistant response
await memory.write(
agent_id=agent_id,
message=response.text,
speaker_name="Assistant",
)
return response.text
async def main():
provider = OpenAIProvider(api_key="sk-...")
memory = MemoryClient(
database_url="postgresql+psycopg://memory:memory@localhost:5432/memory",
llm=provider,
embeddings=provider,
)
await memory.initialize()
agent_id = "pedro-assistant"
try:
# Session start: check what happened before
recent = await memory.retrieve(agent_id=agent_id, query="recent context")
if recent.facts:
print("Resuming with context from previous sessions...")
# Simulate conversation
reply = await handle_message(
memory, agent_id,
"My girlfriend Ana is a designer at Stone",
speaker="Pedro",
)
print(f"Assistant: {reply}")
reply = await handle_message(
memory, agent_id,
"What do you know about Ana?",
speaker="Pedro",
)
print(f"Assistant: {reply}")
finally:
await memory.close()
if __name__ == "__main__":
    asyncio.run(main())
```
## How Arandu Compares
| Capability | Mem0 | Zep | Letta | Arandu |
|---|---|---|---|---|
| Fact extraction from text | Yes | Yes | No (manual) | Yes |
| Filter on retrieve | `user_id`/`agent_id` | `group_ids` | Block labels | `entity_keys` |
| Speaker provenance | No | Indirect (episode lookup) | No | **Yes (native)** |
| Entity graph | Optional | Yes | No | Yes |
| CRUD operations | Yes | Yes | Yes | Yes |
| Reconciliation (dedup) | Yes | Automatic | Manual | Yes (LLM-based) |
---
# Configuration
All memory system parameters are configured through a single `MemoryConfig` dataclass. Every parameter has a sensible default - override only what matters for your use case.
```python
from arandu import MemoryClient, MemoryConfig
config = MemoryConfig(
extraction_timeout_sec=15.0,
topk_facts=30,
enable_reranker=True,
)
memory = MemoryClient(
database_url="postgresql+psycopg://...",
llm=provider,
embeddings=provider,
config=config,
)
```
---
## Extraction
Parameters controlling how facts, entities, and relationships are extracted from messages.
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `extraction_timeout_sec` | `float` | `30.0` | Timeout per LLM call during extraction. On timeout, extraction returns an empty result (fail-safe) - no exception is raised. See [Fail-safe Behavior](#fail-safe-timeout) |
| `enable_informed_extraction` | `bool` | `True` | Enable memory-aware informed extraction. When `True` (default), the write pipeline performs alias lookup + pre-retrieval before a single LLM call that receives existing knowledge as context, eliminating duplicates at the source. When `False`, uses the legacy blind extraction flow (entity scan + fact extraction + relation extraction + reconciliation). Falls back to blind extraction automatically on failure. See [Write Pipeline - Memory-Aware Extraction](concepts/write-pipeline.md#stage-1-memory-aware-extraction) |
| `informed_extraction_topk` | `int` | `10` | Number of existing facts to retrieve per entity during the pre-retrieval step of informed extraction. Only used when the entity has no `profile_text` (profiles replace individual fact retrieval when available). Higher values give the LLM more context about existing knowledge but cost more prompt tokens |
| `informed_extraction_context_budget_tokens` | `int` | `800` | Maximum token budget for the existing knowledge context injected into the informed extraction LLM call. Distributed across all matched entities. Controls how much prior knowledge the LLM sees. Increase for entities with rich histories; decrease if prompt length is a concern |
### `occurred_at` Parameter (on `write()`)
The `write()` method accepts an optional `occurred_at` parameter (`datetime | None`, default: `None` which means "now"). This timestamp tells the extraction pipeline when the message was sent, and is used to resolve relative time references in the message text.
For example, if `occurred_at=datetime(2025, 6, 15)` and the message says "yesterday I went to the beach", the extraction resolves "yesterday" to June 14, 2025 and includes the absolute date in the fact text.
This is primarily useful for **historical imports** -- when you're ingesting past conversations and want the extraction to correctly resolve temporal references relative to when the message was originally sent, not when the import runs.
```python
from datetime import datetime, UTC
# Importing a historical conversation
result = await memory.write(
agent_id="user_123",
message="Yesterday I started my new job at Acme Corp",
speaker_name="Rafael",
occurred_at=datetime(2025, 3, 10, tzinfo=UTC), # resolves "yesterday" to March 9
)
```
**Tips:**
- The extraction model is determined by the `LLMProvider` you inject into `MemoryClient`. To use a cheaper model for extraction, inject a provider configured with that model
- Lower `extraction_timeout_sec` if you need faster responses at the cost of potentially missed extractions
- Set `enable_informed_extraction=False` to revert to the legacy extraction flow if your LLM provider struggles with the unified prompt (e.g., smaller models)
- Increase `informed_extraction_topk` (e.g., 20) for agents with many facts per entity where more context helps the LLM avoid duplicates
- Increase `informed_extraction_context_budget_tokens` (e.g., 1200) for complex domains where entities have extensive histories
---
## Entity Resolution
Parameters controlling how extracted entity mentions are resolved to canonical entity records.
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `fuzzy_threshold` | `float` | `0.85` | Cosine similarity threshold for direct fuzzy match. Score ≥ this value resolves directly; score between 0.50 and this value forwards to LLM; score < 0.50 creates new entity. Lowering this value expands the fuzzy-resolve range and reduces LLM calls |
| `enable_llm_resolution` | `bool` | `True` | Whether to use an LLM for ambiguous fuzzy matches (0.50 - `fuzzy_threshold` range). When `False`, ambiguous candidates create a new entity instead |
**Tips:**
- Lower `fuzzy_threshold` (e.g., 0.75) to be more aggressive in matching similar entity names - this shrinks the "ambiguous" range that requires LLM calls
- Set `enable_llm_resolution=False` to skip the LLM fallback for ambiguous matches (faster, but may create more duplicate entities)
- The LLM model for entity resolution and reconciliation is determined by the `LLMProvider` you inject into `MemoryClient`
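The three bands described above can be sketched as a small decision function (illustrative only; the 0.50 floor and band behavior come from the table above, not from the SDK's source):

```python
def resolution_strategy(
    similarity: float,
    fuzzy_threshold: float = 0.85,
    enable_llm_resolution: bool = True,
) -> str:
    """Map a cosine similarity score to an entity-resolution strategy."""
    if similarity >= fuzzy_threshold:
        return "fuzzy_match"  # resolve directly to the candidate
    if similarity >= 0.50:
        # Ambiguous band: defer to the LLM, or create a new entity if disabled.
        return "llm" if enable_llm_resolution else "new_entity"
    return "new_entity"

print(resolution_strategy(0.90))  # fuzzy_match
print(resolution_strategy(0.70))  # llm
print(resolution_strategy(0.70, enable_llm_resolution=False))  # new_entity
print(resolution_strategy(0.30))  # new_entity
```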
---
## Retrieval
Parameters controlling how facts are retrieved in response to queries.
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `topk_facts` | `int` | `20` | Maximum number of facts to return |
| `topk_events` | `int` | `8` | Maximum number of events to consider for context |
| `event_max_scan` | `int` | `200` | Maximum events to scan during retrieval |
| `min_similarity` | `float` | `0.20` | Minimum cosine similarity for semantic search results |
| `min_confidence` | `float` | `0.55` | Minimum fact confidence to include in retrieval results |
| `recency_half_life_days` | `int` | `14` | Half-life (in days) for exponential recency decay |
| `enable_reranker` | `bool` | `True` | Whether to use LLM reranking on retrieval results |
| `reranker_timeout_sec` | `float` | `5.0` | Timeout for reranker LLM calls |
> **`min_confidence` is a read-time filter only:** All facts are persisted during write regardless of their confidence score. Filtering happens during `memory.retrieve()`. This is by design: confidence can be adjusted over time via reinforcement (NOOP confirmations), and discarding facts at write-time would be irreversible.
**Tips:**
- Increase `topk_facts` (e.g., 50) for broader context at the cost of more noise
- Lower `min_similarity` (e.g., 0.10) to catch more distant semantic matches
- Increase `recency_half_life_days` (e.g., 30) if older facts should remain relevant longer
- Set `enable_reranker=False` for faster retrieval when precision is less critical
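The half-life decay is a standard exponential. Assuming the usual formulation (the SDK's exact formula is not shown here), a 14-day half-life behaves like this:

```python
import math

def recency_score(age_days: float, half_life_days: int = 14) -> float:
    """Exponential decay: the score halves every half_life_days."""
    return math.exp(-math.log(2) * age_days / half_life_days)

print(recency_score(0))             # 1.0
print(round(recency_score(14), 2))  # 0.5
print(round(recency_score(28), 2))  # 0.25
```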
---
## Score Weights
Weights for the hybrid ranking formula that combines multiple retrieval signals.
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `score_weights` | `dict` | `{"semantic": 0.70, "recency": 0.20, "importance": 0.10}` | Weights for each scoring signal (must sum to ~1.0) |
```python
config = MemoryConfig(
score_weights={
"semantic": 0.60, # reduce semantic, boost other signals
"recency": 0.25,
"importance": 0.15,
},
)
```
**Tips:**
- Increase `"recency"` weight for applications where freshness matters more than semantic relevance
- Increase `"importance"` weight to favor well-established entities and frequently mentioned facts
---
## Confidence
Parameters controlling confidence levels assigned to extracted facts.
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `confidence_level_map` | `dict` | `{"explicit_statement": 0.95, "strong_inference": 0.80, "weak_inference": 0.60, "speculation": 0.40}` | Mapping from confidence level names to numeric scores |
| `confidence_default` | `float` | `0.60` | Default confidence when the LLM doesn't specify a level |
---
## Spreading Activation
Parameters controlling how context expands from seed facts along entity relationships.
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `spreading_activation_hops` | `int` | `2` | Maximum number of relationship hops from seed facts |
| `spreading_decay_factor` | `float` | `0.50` | Score decay multiplier per hop (0.5 = halved each hop) |
| `spreading_max_related_entities` | `int` | `5` | Maximum related entities to follow per seed |
| `spreading_facts_per_entity` | `int` | `3` | Maximum facts to pull from each related entity |
---
## Context Compression
Parameters controlling how retrieved facts are compressed into the final context string.
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `context_max_tokens` | `int` | `2000` | Maximum tokens in the formatted context output |
| `hot_tier_ratio` | `float` | `0.50` | Share of token budget for highest-scored facts |
| `warm_tier_ratio` | `float` | `0.30` | Share of token budget for supporting facts |
The remaining budget (1 - hot - warm = 0.20) goes to the cold tier (background context).
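With the defaults, the token budget splits as follows (simple arithmetic shown for clarity; tier assignment itself happens inside the SDK):

```python
context_max_tokens = 2000
hot_tier_ratio = 0.50
warm_tier_ratio = 0.30

hot_budget = round(context_max_tokens * hot_tier_ratio)    # highest-scored facts
warm_budget = round(context_max_tokens * warm_tier_ratio)  # supporting facts
cold_budget = context_max_tokens - hot_budget - warm_budget  # background context

print(hot_budget, warm_budget, cold_budget)  # 1000 600 400
```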
---
## Emotional Trends
Parameters for detecting emotional patterns in user messages.
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `emotional_trend_window_days` | `int` | `30` | Window for analyzing emotional trends |
| `emotional_trend_min_events` | `int` | `5` | Minimum events required to detect a trend |
---
## Clustering
Parameters for the fact clustering background job.
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `cluster_max_age_days` | `int` | `90` | Maximum age of facts to include in clustering |
| `cluster_min_facts` | `int` | `2` | Minimum facts per cluster |
| `community_similarity_threshold` | `float` | `0.75` | Cosine similarity threshold for grouping clusters into communities |
| `community_min_clusters` | `int` | `2` | Minimum clusters to form a community |
---
## Consolidation
Parameters for the consolidation background job.
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `consolidation_min_events` | `int` | `3` | Minimum events before running consolidation |
| `consolidation_lookback_days` | `int` | `7` | How far back (in days) to look for patterns |
---
## Sleep-Time Compute
Parameters for background importance scoring and summary refresh.
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `importance_recency_halflife_days` | `int` | `30` | Half-life for recency signal in importance scoring |
| `summary_refresh_interval_days` | `int` | `7` | Days before an entity summary is considered stale |
---
## Memify
Parameters for the memify (episodic → procedural knowledge) background job.
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `vitality_stale_threshold` | `float` | `0.2` | Vitality score below which a fact is considered stale |
| `memify_merge_similarity_threshold` | `float` | `0.90` | Similarity threshold for merging similar procedures |
---
## Procedural Memory
Parameters for directive/procedural memory retrieval.
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `directive_max_tokens` | `int` | `300` | Maximum tokens for procedural directives |
| `directive_cache_ttl_minutes` | `int` | `30` | Cache TTL for directive lookups |
| `contradiction_similarity_threshold` | `float` | `0.80` | Threshold for detecting contradictory directives |
---
## Locale / Deployment
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `timezone` | `str` | `"UTC"` | IANA timezone for interpreting relative time references |
The `timezone` parameter affects how relative time references ("yesterday", "last week", "this morning") are interpreted during fact extraction and retrieval. All timestamps in the database are stored in **UTC** regardless of this setting.
For example: if `timezone="Asia/Tokyo"` and the user says "yesterday", the SDK interprets "yesterday" relative to Tokyo time (JST), not UTC.
---
## Open Catalog (Deployer Extensions)
Parameters for extending the built-in attribute catalog with custom entries.
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `extra_attribute_keys` | `set[str]` | `set()` | Additional attribute keys recognized during extraction |
| `attribute_aliases` | `dict[str, str]` | `{}` | Aliases for attribute keys (e.g., `{"hometown": "city"}`) |
| `extra_namespaces` | `set[str]` | `set()` | Additional entity namespaces beyond built-in types |
| `extra_self_references` | `frozenset[str]` | `frozenset()` | Additional words treated as self-references (e.g., `{"yo"}` for Spanish) |
| `extra_relationship_hints` | `frozenset[str]` | `frozenset()` | Additional relationship hint words for entity resolution |
---
## Limits
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `max_facts_per_event` | `int` | `100` | Maximum facts extracted from a single message |
| `embedding_dimensions` | `int` | `1536` | Dimensionality of embedding vectors (must match your provider) |
---
# Custom Providers
`arandu` uses Python protocols for dependency injection. You can use any LLM or embedding backend by implementing two simple interfaces - no inheritance required.
## Built-in Providers
The SDK includes two built-in providers:
| Provider | Install | LLM | Embeddings |
|----------|---------|-----|-----------|
| **OpenAI** | `pip install arandu[openai]` | ✅ GPT-4o, GPT-4o-mini, etc. | ✅ text-embedding-3-small, etc. |
| **Anthropic** | `pip install arandu[anthropic]` | ✅ Claude Sonnet, Opus, Haiku | ❌ Use OpenAI for embeddings |
```python
# OpenAI (LLM + embeddings in one provider)
from arandu.providers.openai import OpenAIProvider
provider = OpenAIProvider(api_key="sk-...")
memory = MemoryClient(database_url="...", llm=provider, embeddings=provider)
# Anthropic (Claude for LLM, OpenAI for embeddings)
from arandu.providers.anthropic import AnthropicProvider
from arandu.providers.openai import OpenAIProvider
llm = AnthropicProvider(api_key="sk-ant-...")
embeddings = OpenAIProvider(api_key="sk-...")
memory = MemoryClient(database_url="...", llm=llm, embeddings=embeddings)
```
### OpenAI-Compatible Providers
`OpenAIProvider` works with any API that follows the OpenAI chat completions format. Just set `base_url` to point at the provider's endpoint:
```python
from arandu.providers.openai import OpenAIProvider
# DeepSeek
llm = OpenAIProvider(api_key="sk-deepseek-...", model="deepseek-chat", base_url="https://api.deepseek.com/v1")
# Groq
llm = OpenAIProvider(api_key="gsk_...", model="llama-3.3-70b-versatile", base_url="https://api.groq.com/openai/v1")
# Together AI
llm = OpenAIProvider(api_key="tog_...", model="meta-llama/Llama-3.3-70B-Instruct-Turbo", base_url="https://api.together.xyz/v1")
# Fireworks AI
llm = OpenAIProvider(api_key="fw_...", model="accounts/fireworks/models/llama-v3p3-70b-instruct", base_url="https://api.fireworks.ai/inference/v1")
# Ollama (local)
llm = OpenAIProvider(api_key="ollama", model="llama3.1", base_url="http://localhost:11434/v1")
```
This covers LLM calls only. Embeddings still require OpenAI or a custom `EmbeddingProvider` since most of these providers don't offer an embedding API.
If the built-in providers cover your use case, you don't need to read the rest of this page.
---
## The Protocols
If you need a different provider (Ollama, LiteLLM, Groq, etc.), implement the protocols:
### LLMProvider
```python
from typing import Protocol
from arandu.protocols import LLMResult, TokenUsage
class LLMProvider(Protocol):
async def complete(
self,
messages: list[dict],
temperature: float = 0,
response_format: dict | None = None,
max_tokens: int | None = None,
) -> LLMResult: ...
```
| Parameter | Description |
|-----------|-------------|
| `messages` | List of message dicts with `"role"` and `"content"` keys (OpenAI format) |
| `temperature` | Sampling temperature (0 = deterministic) |
| `response_format` | Optional format spec (e.g., `{"type": "json_object"}`) |
| `max_tokens` | Optional maximum tokens for the response |
| **Returns** | `LLMResult(text="...", usage=TokenUsage(...))` |
> **JSON mode support:** The pipeline relies on JSON-mode responses (`response_format={"type": "json_object"}`).
If your backend doesn't support this natively, append a JSON instruction to the system prompt.
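One way to implement that fallback in a custom provider (an illustrative helper, not part of the SDK):

```python
def ensure_json_instruction(messages: list[dict]) -> list[dict]:
    """Append a JSON-only instruction to the system message, for backends
    without native response_format support."""
    instruction = "Respond with a single valid JSON object and nothing else."
    patched = [dict(m) for m in messages]  # shallow copies; inputs untouched
    for m in patched:
        if m.get("role") == "system":
            m["content"] = f"{m['content']}\n\n{instruction}"
            return patched
    # No system message present: prepend one.
    return [{"role": "system", "content": instruction}, *patched]

msgs = [
    {"role": "system", "content": "Extract facts."},
    {"role": "user", "content": "Ana works at Stone."},
]
print(ensure_json_instruction(msgs)[0]["content"])
```

Call it inside your `complete()` implementation whenever `response_format` requests `json_object` and the backend has no equivalent switch.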
### EmbeddingProvider
```python
from typing import Protocol
class EmbeddingProvider(Protocol):
async def embed(self, texts: list[str]) -> list[list[float]]: ...
async def embed_one(self, text: str) -> list[float] | None: ...
```
| Method | Description |
|--------|-------------|
| `embed(texts)` | Generate embeddings for a batch of texts. Returns one vector per input. |
| `embed_one(text)` | Generate embedding for a single text. Returns `None` if empty/invalid. |
> **Embedding dimensions:** The default `embedding_dimensions` is 1536 (OpenAI `text-embedding-3-small`).
If your provider uses different dimensions, set `MemoryConfig(embedding_dimensions=...)`.
---
## Example: Local Model Provider
For running with local models (e.g., via Ollama):
```python
import httpx

from arandu.protocols import LLMResult, TokenUsage

class OllamaProvider:
    """LLM + Embedding provider using a local Ollama server."""

    def __init__(
        self,
        base_url: str = "http://localhost:11434",
        model: str = "llama3.1",
        embedding_model: str = "nomic-embed-text",
    ) -> None:
        self._base_url = base_url
        self._model = model
        self._embedding_model = embedding_model
        self._client = httpx.AsyncClient(timeout=60.0)

    # -- LLMProvider --
    async def complete(
        self,
        messages: list[dict],
        temperature: float = 0,
        response_format: dict | None = None,
        max_tokens: int | None = None,
    ) -> LLMResult:
        payload: dict = {
            "model": self._model,
            "messages": messages,
            "stream": False,
            "options": {"temperature": temperature},
        }
        if response_format and response_format.get("type") == "json_object":
            payload["format"] = "json"
        response = await self._client.post(
            f"{self._base_url}/api/chat",
            json=payload,
        )
        response.raise_for_status()
        text = response.json()["message"]["content"]
        return LLMResult(text=text, usage=None)  # Ollama doesn't report usage

    # -- EmbeddingProvider --
    async def embed(self, texts: list[str]) -> list[list[float]]:
        results = []
        for text in texts:
            if not text.strip():
                continue
            response = await self._client.post(
                f"{self._base_url}/api/embed",
                json={"model": self._embedding_model, "input": text},
            )
            response.raise_for_status()
            results.append(response.json()["embeddings"][0])
        return results

    async def embed_one(self, text: str) -> list[float] | None:
        if not text or not text.strip():
            return None
        results = await self.embed([text])
        return results[0] if results else None
```
> **Embedding dimensions:** When using local models, configure the dimensions:
```python
config = MemoryConfig(
    embedding_dimensions=768,  # nomic-embed-text uses 768 dims
)
```
---
## Testing Your Provider
Verify your provider works before going to production:
```python
import asyncio

from arandu import MemoryClient, MemoryConfig

async def test_provider():
    provider = YourProvider(...)
    memory = MemoryClient(
        database_url="postgresql+psycopg://memory:memory@localhost/memory",
        llm=provider,
        embeddings=provider,
    )
    await memory.initialize()
    try:
        # Test write
        result = await memory.write(
            agent_id="test",
            message="Testing the provider. My name is Alice and I work at Acme.",
        )
        assert len(result.facts_added) > 0, "No facts extracted — check LLM responses"
        assert len(result.entities_resolved) > 0, "No entities resolved"
        print(f"Write OK: {len(result.facts_added)} facts, {len(result.entities_resolved)} entities")

        # Test retrieve
        context = await memory.retrieve(agent_id="test", query="who is Alice?")
        assert len(context.facts) > 0, "No facts retrieved — check embeddings"
        print(f"Retrieve OK: {len(context.facts)} facts found")
        print(f"Context: {context.context}")
    finally:
        await memory.close()

asyncio.run(test_provider())
```
## Key Requirements
1. **`LLMResult`** - `complete()` returns `LLMResult(text=..., usage=...)`, not `str`. If your backend doesn't report usage, pass `usage=None`.
2. **JSON mode** - The pipeline sends `response_format={"type": "json_object"}` frequently. Your provider must return valid JSON when this is set.
3. **Async** - Both protocols are async (`async def`). If your backend SDK is synchronous, wrap calls with `asyncio.to_thread()`.
4. **Empty/error handling** - `embed_one` returns `None` for empty input. `embed` returns `[]` for empty input.
5. **Timeout** - Add timeouts to your provider. The SDK sets timeouts on its side, but provider-level timeouts add safety.
6. **Embedding dimensions** - Set `MemoryConfig(embedding_dimensions=N)` to match your provider's output dimensions.
---
# Cookbook
Complete, copy-paste-ready examples for common use cases.
---
## Basic Usage
The simplest integration: write facts from user messages and retrieve context for responses.
```python
import asyncio

from arandu import MemoryClient
from arandu.providers.openai import OpenAIProvider

async def main():
    provider = OpenAIProvider(api_key="sk-...")
    memory = MemoryClient(
        database_url="postgresql+psycopg://memory:memory@localhost:5432/memory",
        llm=provider,
        embeddings=provider,
    )
    await memory.initialize()
    try:
        # Simulate a conversation
        messages = [
            "Hi, I'm Rafael. I'm a backend engineer at Acme Corp in São Paulo.",
            "My girlfriend Ana is a UX designer. We have a cat named Pixel.",
            "I've been learning Rust lately, mostly on weekends.",
            "Actually, I just moved to Rio de Janeiro. Still remote at Acme.",
        ]
        for msg in messages:
            result = await memory.write(agent_id="rafael", message=msg, speaker_name="Rafael")
            added = len(result.facts_added)
            updated = len(result.facts_updated)
            print(f"Write: +{added} facts, ~{updated} updates ({result.duration_ms:.0f}ms)")

        # Retrieve context for different queries
        queries = [
            "where does Rafael live?",
            "tell me about Rafael's relationships",
            "what are Rafael's hobbies?",
        ]
        for query in queries:
            result = await memory.retrieve(agent_id="rafael", query=query)
            print(f"\nQuery: {query}")
            print(f"Found {len(result.facts)} facts ({result.duration_ms:.0f}ms)")
            for fact in result.facts[:5]:
                print(f"  [{fact.score:.2f}] {fact.entity_name}: {fact.fact_text}")
    finally:
        await memory.close()

asyncio.run(main())
```
---
## Using Anthropic (Claude)
Use Claude as your LLM while keeping OpenAI for embeddings (Anthropic doesn't offer an embeddings API):
```python
import asyncio

from arandu import MemoryClient, MemoryConfig
from arandu.providers.anthropic import AnthropicProvider
from arandu.providers.openai import OpenAIProvider

async def main():
    # Claude for reasoning, OpenAI for embeddings
    llm = AnthropicProvider(api_key="sk-ant-...", model="claude-sonnet-4-20250514")
    embeddings = OpenAIProvider(api_key="sk-...")
    memory = MemoryClient(
        database_url="postgresql+psycopg://memory:memory@localhost/memory",
        llm=llm,
        embeddings=embeddings,
    )
    await memory.initialize()
    try:
        result = await memory.write(
            agent_id="demo",
            message="I love hiking in the mountains. Last weekend I went to Serra da Mantiqueira.",
            speaker_name="Rafael",
        )
        print(f"Extracted {len(result.facts_added)} facts using Claude")

        context = await memory.retrieve(agent_id="demo", query="outdoor activities")
        print(context.context)
    finally:
        await memory.close()

asyncio.run(main())
```
---
## Using Other LLM Providers
`OpenAIProvider` works with any provider that exposes an OpenAI-compatible API. Just change `api_key`, `model`, and `base_url`:
```python
from arandu import MemoryClient
from arandu.providers.openai import OpenAIProvider
# DeepSeek V3 (cheap, high quality)
llm = OpenAIProvider(
    api_key="sk-deepseek-...",
    model="deepseek-chat",
    base_url="https://api.deepseek.com/v1",
)

# Groq (fast inference)
llm = OpenAIProvider(
    api_key="gsk_...",
    model="llama-3.3-70b-versatile",
    base_url="https://api.groq.com/openai/v1",
)

# Together AI
llm = OpenAIProvider(
    api_key="tog_...",
    model="meta-llama/Llama-3.3-70B-Instruct-Turbo",
    base_url="https://api.together.xyz/v1",
)

# Fireworks AI
llm = OpenAIProvider(
    api_key="fw_...",
    model="accounts/fireworks/models/llama-v3p3-70b-instruct",
    base_url="https://api.fireworks.ai/inference/v1",
)

# Local model via Ollama
llm = OpenAIProvider(
    api_key="ollama",  # any non-empty string
    model="llama3.1",
    base_url="http://localhost:11434/v1",
)

# Use with Arandu (same as OpenAI)
memory = MemoryClient(
    database_url="postgresql+psycopg://...",
    llm=llm,
    embeddings=OpenAIProvider(api_key="sk-..."),  # embeddings still need OpenAI
)
```
> **Embeddings still require OpenAI (or a custom provider):** Most OpenAI-compatible providers don't offer embedding APIs. Use `OpenAIProvider` with your real OpenAI key for embeddings, or implement a custom `EmbeddingProvider` (see [Custom Providers](custom-providers.md)).
---
## Advanced Configuration (Retrieval Tuning)
Fine-tune retrieval for different use cases:
```python
import asyncio

from arandu import MemoryClient, MemoryConfig
from arandu.providers.openai import OpenAIProvider

async def main():
    # Single provider for all LLM operations (extraction, reranker, etc.)
    llm = OpenAIProvider(api_key="sk-...", model="gpt-4o")

    # Configuration for a chatbot that needs broad, fresh context
    config = MemoryConfig(
        # Extraction: tight timeout for real-time chat
        extraction_timeout_sec=15.0,
        # Retrieval: more results, favor recency
        topk_facts=40,
        min_similarity=0.15,  # cast a wider net
        recency_half_life_days=7,  # favor recent facts more aggressively
        # Score weights: boost recency for a fast-moving conversation
        score_weights={
            "semantic": 0.50,
            "recency": 0.35,
            "importance": 0.15,
        },
        # Reranker
        enable_reranker=True,
        # Context: larger budget for rich responses
        context_max_tokens=3000,
        # Spreading activation: wider context expansion
        spreading_activation_hops=3,
        spreading_max_related_entities=8,
        # Timezone for recency calculations
        timezone="America/Sao_Paulo",
    )
    memory = MemoryClient(
        database_url="postgresql+psycopg://memory:memory@localhost/memory",
        llm=llm,
        embeddings=llm,
        config=config,
    )
    await memory.initialize()
    try:
        # Write a series of messages
        await memory.write(agent_id="demo", message="I started a new job at TechCorp today!", speaker_name="Rafael")
        await memory.write(agent_id="demo", message="My manager's name is Sarah. She seems great.", speaker_name="Rafael")
        await memory.write(agent_id="demo", message="The office is in downtown with a nice view.", speaker_name="Rafael")

        # Retrieve with tuned settings
        result = await memory.retrieve(agent_id="demo", query="what's new with the user?")
        print(f"Retrieved {len(result.facts)} facts")
        print(f"Context ({len(result.context)} chars):")
        print(result.context)

        # Check individual scores to verify tuning
        for fact in result.facts:
            print(f"\n  [{fact.score:.3f}] {fact.fact_text}")
            print(f"  Scores: {fact.scores}")
    finally:
        await memory.close()

asyncio.run(main())
```
---
## Background Jobs Integration
Set up periodic maintenance to keep memory organized:
```python
import asyncio

from arandu import (
    MemoryClient,
    MemoryConfig,
    cluster_user_facts,
    compute_entity_importance,
    detect_communities,
    refresh_entity_summaries,
    run_consolidation,
    run_memify,
)
from arandu.db import create_engine, create_session_factory
from arandu.providers.openai import OpenAIProvider

async def run_maintenance(
    database_url: str,
    agent_ids: list[str],
    provider: OpenAIProvider,
    config: MemoryConfig,
) -> None:
    """Run all background maintenance jobs for a list of users."""
    engine = create_engine(database_url)
    session_factory = create_session_factory(engine)
    try:
        async with session_factory() as session:
            for agent_id in agent_ids:
                print(f"\n--- Maintenance for {agent_id} ---")

                # 1. Importance scoring (cheap, SQL-only)
                importance = await compute_entity_importance(session, agent_id, config)
                print(f"  Importance: scored {importance.entities_scored} entities")

                # 2. Summary refresh (moderate, LLM)
                summaries = await refresh_entity_summaries(
                    session, agent_id, provider, config
                )
                print(f"  Summaries: refreshed {summaries.summaries_refreshed}")

                # 3. Clustering (moderate, LLM)
                clusters = await cluster_user_facts(
                    session, agent_id, provider, provider, config
                )
                print(f"  Clustering: {clusters.clusters_created} clusters")

                # 4. Community detection
                communities = await detect_communities(
                    session, agent_id, provider, provider, config
                )
                print(f"  Communities: {communities.communities_created} created")

                # 5. Consolidation (moderate, LLM)
                consolidation = await run_consolidation(session, agent_id, provider, config)
                print(f"  Consolidation: {consolidation.observations_created} observations")

                # 6. Memify (moderate, LLM)
                memify = await run_memify(session, agent_id, provider, provider, config)
                print(f"  Memify: {memify.facts_scored} facts scored")
            await session.commit()
    finally:
        await engine.dispose()

async def main():
    provider = OpenAIProvider(api_key="sk-...")
    config = MemoryConfig()
    database_url = "postgresql+psycopg://memory:memory@localhost/memory"

    # Run once
    await run_maintenance(database_url, ["user_123", "user_456"], provider, config)

    # Or schedule with asyncio
    # while True:
    #     await run_maintenance(database_url, agent_ids, provider, config)
    #     await asyncio.sleep(4 * 3600)  # every 4 hours

asyncio.run(main())
```
---
## Multi-Agent Setup
Handle multiple agents with isolated memory spaces:
```python
import asyncio

from arandu import MemoryClient
from arandu.providers.openai import OpenAIProvider

async def main():
    provider = OpenAIProvider(api_key="sk-...")
    memory = MemoryClient(
        database_url="postgresql+psycopg://memory:memory@localhost/memory",
        llm=provider,
        embeddings=provider,
    )
    await memory.initialize()
    try:
        # Each agent has completely isolated memory
        await memory.write(
            agent_id="alice",
            message="I work at Google as a PM. I live in Mountain View.",
            speaker_name="Alice",
        )
        await memory.write(
            agent_id="bob",
            message="I'm a freelance designer based in Berlin.",
            speaker_name="Bob",
        )

        # Agent alice's context only shows alice's facts
        alice_ctx = await memory.retrieve(agent_id="alice", query="where do they work?")
        print("Alice:", alice_ctx.context)

        # Agent bob's context only shows bob's facts
        bob_ctx = await memory.retrieve(agent_id="bob", query="where do they work?")
        print("Bob:", bob_ctx.context)
    finally:
        await memory.close()

asyncio.run(main())
```
---
## Multi-Speaker (Therapy Session)
Two speakers write to the same agent in the same session - pronouns like "Eu" (Portuguese for "I") resolve to the correct speaker each time.
```python
import asyncio

from arandu import MemoryClient
from arandu.providers.openai import OpenAIProvider

async def main():
    provider = OpenAIProvider(api_key="sk-...")
    memory = MemoryClient(
        database_url="postgresql+psycopg://memory:memory@localhost/memory",
        llm=provider,
        embeddings=provider,
    )
    await memory.initialize()
    try:
        # Two speakers, same agent, same session
        await memory.write(
            agent_id="therapy_bot",
            # "I feel ignored by Carlos. He never listens to me."
            message="Eu me sinto ignorada pelo Carlos. Ele nunca me ouve.",
            speaker_name="Ana Silva",
            session_id="sessao_001",
        )
        await memory.write(
            agent_id="therapy_bot",
            # "I work 12 hours a day to support the family."
            message="Eu trabalho 12 horas por dia pra sustentar a família.",
            speaker_name="Carlos Silva",
            session_id="sessao_001",
        )

        # Retrieve — facts attributed to the correct speakers
        # Query: "How does Ana feel?"
        result = await memory.retrieve(agent_id="therapy_bot", query="Como a Ana se sente?")
        # Returns: "Ana Silva feels ignored by Carlos" (not Carlos's facts)
        for fact in result.facts:
            print(f"  [{fact.score:.2f}] {fact.entity_name}: {fact.fact_text}")
    finally:
        await memory.close()

asyncio.run(main())
```
---
## Multi-Session (Work + Personal)
Same speaker, same agent, different `session_id`s. Retrieving without a `session_id` searches all sessions.
```python
import asyncio

from arandu import MemoryClient
from arandu.providers.openai import OpenAIProvider

async def main():
    provider = OpenAIProvider(api_key="sk-...")
    memory = MemoryClient(
        database_url="postgresql+psycopg://memory:memory@localhost/memory",
        llm=provider,
        embeddings=provider,
    )
    await memory.initialize()
    try:
        # Work context
        await memory.write(
            agent_id="assistant",
            # "I need to deliver the report by Friday."
            message="Preciso entregar o relatório até sexta.",
            speaker_name="Marcos",
            session_id="work",
        )
        # Personal context
        await memory.write(
            agent_id="assistant",
            # "My mom is sick; I'll visit her on the weekend."
            message="Minha mãe tá doente, vou visitar ela no fim de semana.",
            speaker_name="Marcos",
            session_id="personal",
        )

        # Retrieve searches ALL sessions by default
        # Query: "What does Marcos need to do?"
        result = await memory.retrieve(agent_id="assistant", query="O que o Marcos precisa fazer?")
        # Returns facts from BOTH work and personal sessions
        for fact in result.facts:
            print(f"  [{fact.score:.2f}] {fact.entity_name}: {fact.fact_text}")
    finally:
        await memory.close()

asyncio.run(main())
```
---
## Customer Service Bot (Multiple Customers)
Same agent serving different customers - each customer's facts are isolated by `speaker_name`.
```python
import asyncio

from arandu import MemoryClient
from arandu.providers.openai import OpenAIProvider

async def main():
    provider = OpenAIProvider(api_key="sk-...")
    memory = MemoryClient(
        database_url="postgresql+psycopg://memory:memory@localhost/memory",
        llm=provider,
        embeddings=provider,
    )
    await memory.initialize()
    try:
        # Customer 1
        await memory.write(
            agent_id="support_bot",
            # "My order #1234 hasn't arrived."
            message="Meu pedido #1234 não chegou.",
            speaker_name="Maria Oliveira",
        )
        # Customer 2
        await memory.write(
            agent_id="support_bot",
            # "I want to exchange the product I bought yesterday."
            message="Quero trocar o produto que comprei ontem.",
            speaker_name="João Santos",
        )

        # Retrieve for a specific customer — the agent remembers each one separately
        # Query: "What is Maria's problem?"
        result = await memory.retrieve(agent_id="support_bot", query="Qual o problema da Maria?")
        for fact in result.facts:
            print(f"  [{fact.score:.2f}] {fact.entity_name}: {fact.fact_text}")
    finally:
        await memory.close()

asyncio.run(main())
```
---
# API Reference
Auto-generated from source code docstrings. For conceptual guides on how these components work together, see the [Concepts](../concepts/write-pipeline.md) section.
---
## Client
### MemoryClient
### WriteResult
### RetrieveResult
### FactDetail
### EntityDetail
### ScoredFact
### PipelineTrace
---
## Configuration
### MemoryConfig
---
## Protocols
### LLMProvider
### EmbeddingProvider
---
## Providers
### OpenAIProvider
### AnthropicProvider
Built-in LLM provider for Anthropic Claude models. Implements `LLMProvider` only - does not provide embeddings (use `OpenAIProvider` or another embedding provider alongside).
```python
from arandu.providers.anthropic import AnthropicProvider
llm = AnthropicProvider(
    api_key="sk-ant-...",  # Anthropic API key
    model="claude-sonnet-4-20250514",  # default model
    timeout=30.0,  # request timeout in seconds
    max_tokens=4096,  # default max tokens per response
)
```
Install: `pip install arandu[anthropic]`
**Key behaviors:**
- System messages are extracted from the messages list and passed via Anthropic's `system` parameter
- `response_format={"type": "json_object"}` appends a JSON instruction to the system prompt (Anthropic doesn't support this natively)
- Token usage is tracked via `LLMResult.usage`
- Markdown code fences are stripped automatically
---
## Exceptions
All SDK exceptions inherit from `MemoryError`. The SDK is **fail-safe by default** - most errors are caught internally and result in graceful degradation (empty results, logged warnings). These exceptions are raised only when explicitly documented.
```
MemoryError (base)
├── ExtractionError — LLM extraction failed
├── ResolutionError — entity resolution failed
├── ReconciliationError — fact reconciliation failed
├── RetrievalError — memory retrieval failed
└── UpsertError — database persistence failed
```
### MemoryError
Base exception for all arandu SDK errors. Catch this type for catch-all error handling.
```python
from arandu import MemoryError
try:
    result = await memory.write(agent_id, message, speaker_name="Rafael")
except MemoryError as e:
    print(f"SDK error: {e}")
    print(f"Caused by: {e.__cause__}")  # original exception, if any
```
### ExtractionError
Raised when fact/entity extraction from a message fails (LLM timeout, invalid JSON response, rate limit). In practice, the write pipeline catches this internally and returns an empty `WriteResult` with `success=True` - the event is still logged.
### ResolutionError
Raised when entity resolution fails (embedding provider down, LLM disambiguation timeout). The pipeline catches this internally - unresolved entities are created as new entities.
### ReconciliationError
Raised when fact reconciliation against existing knowledge fails (LLM call fails for similarity evaluation). The pipeline catches this internally - defaults to ADD (better to have a near-duplicate than lose information).
### RetrievalError
Raised when memory retrieval fails (semantic search error, reranker timeout). The read pipeline catches this internally - returns empty results rather than crashing.
### UpsertError
Raised when upserting facts or entities into the database fails (constraint violation, connection error). Individual fact upserts use savepoints - if one fact fails, the others proceed normally.
### Error handling example
```python
from arandu import MemoryError, ExtractionError, RetrievalError
# Write — fail-safe by default
result = await memory.write(agent_id, message, speaker_name="Rafael")
if not result.success:
    print(f"Write failed: {result.error}")
# No try/except needed — the pipeline handles errors internally
# Retrieve — also fail-safe
result = await memory.retrieve(agent_id, query)
# Empty facts = nothing found OR error occurred
# Check result.duration_ms to detect timeouts
```
---
## Background Functions
### Clustering
#### cluster_user_facts
#### detect_communities
### Consolidation
#### run_consolidation
#### run_profile_consolidation
### Memify
#### run_memify
#### compute_vitality
### Sleep-Time Compute
#### compute_entity_importance
#### refresh_entity_summaries
#### detect_entity_communities
---
## Result Dataclasses
### ClusteringResult
### CommunityDetectionResult
### ConsolidationResult
### MemifyResult
### EntityImportanceResult
### SummaryRefreshResult
---
## See Also: Advanced API
For documentation of internal pipeline functions, sub-module exports, and additional data types not covered here, see the **Advanced** section:
- [Write Pipeline API](../advanced/write-api.md) -- extraction strategy, canonicalization, entity helpers, correction detection, pending operations, and `run_write_pipeline()`.
- [Read Pipeline API](../advanced/read-api.md) -- retrieval agent, query expansion, graph retrieval, spreading activation, context compression, emotional trends, dynamic importance, procedural memory, and `run_read_pipeline()`.
- [Database Utilities](../advanced/database.md) -- `create_engine()`, `create_session_factory()`, `init_db()`, and schema overview.
- [Data Types Reference](../advanced/data-types.md) -- all enums, dataclasses, and result types across write, read, and background modules.
---