Read Pipeline API¶
Advanced API
These are advanced APIs for power users who want to interact with individual retrieval stages directly. Most users should use MemoryClient.retrieve() instead, which orchestrates the full multi-signal pipeline automatically.
All read pipeline functions are exported from arandu.read.
```python
from arandu.read import (
    run_read_pipeline,
    plan_retrieval, expand_query,
    retrieve_relevant_events, compute_pattern_signal,
    retrieve_graph_facts, spread_activation,
    compress_context, compress_broad_context,
    materialize_emotional_trends, get_emotional_summary_for_context,
    compute_dynamic_importance,
    generate_optimized_directives, check_directive_contradiction,
    effective_confidence, invalidate_directive_cache,
)
```
Pipeline Orchestrator¶
run_read_pipeline¶
Executes the full read pipeline: plan -> retrieve (multi-signal) -> rerank -> format.
Multi-signal retrieval runs semantic + keyword + graph in parallel via asyncio.gather(). The deterministic planner identifies entities and generates pattern queries for the keyword signal.
```python
async def run_read_pipeline(
    session: AsyncSession,
    agent_id: str,
    query: str,
    llm: LLMProvider,
    embeddings: EmbeddingProvider,
    config: MemoryConfig,
    trace: PipelineTrace | None = None,
) -> ReadResult
```

| Parameter | Type | Description |
|---|---|---|
| `session` | `AsyncSession` | Database session (caller manages transaction). |
| `agent_id` | `str` | Agent identifier. |
| `query` | `str` | The query to search memory for. |
| `llm` | `LLMProvider` | Injected LLM provider. |
| `embeddings` | `EmbeddingProvider` | Injected embedding provider. |
| `config` | `MemoryConfig` | Memory configuration. |
| `trace` | `PipelineTrace \| None` | Optional pipeline trace for verbose mode. When provided, each stage records intermediate data. |
Returns: ReadResult with facts (list of ScoredFact), context (prompt-ready string), total_candidates, and duration_ms.
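The fan-out described above (semantic + keyword + graph in parallel) follows a standard `asyncio.gather()` pattern, sketched here with hypothetical stub signal functions in place of the real database-backed ones:

```python
import asyncio

# Hypothetical stand-ins for the three retrieval signals; the real
# functions take a session, agent_id, and per-signal query parameters.
async def semantic_signal(query: str) -> list[str]:
    return [f"semantic:{query}"]

async def keyword_signal(query: str) -> list[str]:
    return [f"keyword:{query}"]

async def graph_signal(query: str) -> list[str]:
    return [f"graph:{query}"]

async def gather_signals(query: str) -> list[str]:
    # All three signals run concurrently, as in run_read_pipeline.
    results = await asyncio.gather(
        semantic_signal(query),
        keyword_signal(query),
        graph_signal(query),
    )
    # Flatten the per-signal candidate lists into one pool for reranking.
    return [fact for signal in results for fact in signal]

candidates = asyncio.run(gather_signals("where does Ana work?"))
```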
Retrieval Planner¶
The retrieval planner is a deterministic function that analyzes the user query and decides the retrieval strategy before any search happens. It uses regex matching and schema lookups -- no LLM call is involved.
PatternQuery¶
A pattern-based query for keyword signal matching.
| Field | Type | Description |
|---|---|---|
| `entity_pattern` | `str` | SQL LIKE pattern for `entity_key` matching. |
| `attribute_filter` | `str \| None` | Optional attribute key filter (always `None` in V5). |
RetrievalPlan¶
Output of the retrieval planner. V5 runs all signals (semantic, graph, keyword) in parallel.

| Field | Type | Default | Description |
|---|---|---|---|
| `strategy` | `str` | `"multi_signal"` | `"multi_signal"` (default) or `"skip"`. |
| `entities` | `list[str]` | `[]` | Detected `entity_key`s for graph signal. |
| `pattern_queries` | `list[PatternQuery]` | `[]` | Pattern queries for keyword signal. |
| `similarity_query` | `str \| None` | `None` | Query for semantic signal (always the original query -- no reformulation). |
| `max_facts` | `int` | `50` | Budget per signal. |
| `reason` | `str` | `""` | Why this plan was chosen. |
| `latency_ms` | `float` | `0.0` | Time spent planning. |
| `as_of_range` | `tuple[datetime, datetime] \| None` | `None` | Optional time-travel window. |
| `broad_query` | `bool` | `False` | `True` for comprehensive queries. |
plan_retrieval¶
Deterministic function that decides retrieval strategy using regex matching and schema lookups. No LLM call is made. Entity extraction is handled downstream by resolve_query_entities. Anaphora resolution (e.g., resolving "she" or "it" from conversation context) is the caller's responsibility -- the query should already contain resolved references.
| Parameter | Type | Description |
|---|---|---|
| `query_text` | `str` | The user's query (anaphora should already be resolved by the caller). |
| `schema_prefixes` | `list[str]` | Known entity key prefixes from the agent's schema (e.g., `["person", "org"]`). |
Returns: RetrievalPlan with strategy, pattern queries, and query parameters.
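As an illustration of what a deterministic planner can look like, here is a minimal sketch; the regexes, stopword list, and `Plan` dataclass are hypothetical stand-ins, not arandu's actual heuristics or types:

```python
import re
from dataclasses import dataclass, field

@dataclass
class Plan:
    """Simplified stand-in for RetrievalPlan."""
    strategy: str = "multi_signal"
    pattern_queries: list[str] = field(default_factory=list)
    broad_query: bool = False
    reason: str = ""

# Hypothetical regex and stopwords -- illustrative only. The point is that
# planning is pure regex matching plus schema lookup, with no LLM call.
BROAD_RE = re.compile(r"\b(everything|all|summary|overview)\b", re.I)
STOP = {"tell", "me", "about", "the", "a", "an", "of", "is",
        "what", "where", "who", "when", "how", "does", "do"}

def plan(query_text: str, schema_prefixes: list[str]) -> Plan:
    p = Plan()
    p.broad_query = bool(BROAD_RE.search(query_text))
    tokens = re.findall(r"[a-z0-9_]+", query_text.lower())
    for prefix in schema_prefixes:
        for tok in tokens:
            if len(tok) >= 3 and tok not in STOP and tok != prefix:
                # SQL LIKE pattern over entity_key, e.g. "person:ana%"
                p.pattern_queries.append(f"{prefix}:{tok}%")
    p.reason = ("broad query" if p.broad_query
                else "entity patterns from schema prefixes")
    return p
```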
Query Expansion¶
Post-processes a RetrievalPlan with entity priming -- resolves entities mentioned in the query via the knowledge graph (aliases + relationships) and injects context terms.
ExpandedQuery¶
| Field | Type | Description |
|---|---|---|
| `primed_entities` | `list[str]` | Entity keys discovered via alias + KG priming. |
| `temporal_range` | `tuple[datetime, datetime] \| None` | Resolved date range (carried over from the retrieval plan). |
| `expanded_terms` | `list[str]` | Additional context terms from entity facts. |
expand_query¶
Expand a retrieval plan with entity priming. Fail-safe: any exception returns an empty ExpandedQuery.
```python
async def expand_query(
    session: AsyncSession,
    agent_id: str,
    query: str,
    plan: RetrievalPlan,
    llm: object,
) -> ExpandedQuery
```

| Parameter | Type | Description |
|---|---|---|
| `session` | `AsyncSession` | Database session. |
| `agent_id` | `str` | Agent identifier. |
| `query` | `str` | Original user query text. |
| `plan` | `RetrievalPlan` | RetrievalPlan from the retrieval planner. |
| `llm` | `object` | LLM provider (reserved for future use). |
Returns: ExpandedQuery with primed entities, temporal range, and expanded terms.
Fact Retrieval¶
retrieve_relevant_events¶
Retrieve relevant events by embedding similarity + recency scoring.
```python
async def retrieve_relevant_events(
    session: AsyncSession,
    agent_id: str,
    query_embedding: list[float],
    config: MemoryConfig,
    limit: int | None = None,
) -> list[dict[str, Any]]
```

| Parameter | Type | Description |
|---|---|---|
| `session` | `AsyncSession` | Database session. |
| `agent_id` | `str` | Agent identifier. |
| `query_embedding` | `list[float]` | Query embedding vector. |
| `config` | `MemoryConfig` | Memory configuration. |
| `limit` | `int \| None` | Max events to return. |
Returns: List of event dicts with date, text, score, event_id.
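The similarity + recency combination can be sketched as follows; the 0.7/0.3 weights and 30-day half-life are assumptions, not arandu's configured values:

```python
import math
from datetime import datetime, timedelta, timezone

def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0

# Hypothetical scoring: embedding similarity blended with an exponential
# recency decay. Weights and half-life are illustrative assumptions.
def score_event(query_emb, event_emb, event_date, now=None):
    now = now or datetime.now(timezone.utc)
    age_days = max((now - event_date).total_seconds() / 86400, 0.0)
    recency = 0.5 ** (age_days / 30)  # half-life of 30 days (assumed)
    return 0.7 * cosine(query_emb, event_emb) + 0.3 * recency
```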
compute_pattern_signal¶
Boost facts that have been recently confirmed (pattern signal). Facts with recent last_confirmed_at timestamps (confirmed via NOOP decisions in write) get a small additive score boost (up to 0.1).
| Parameter | Type | Description |
|---|---|---|
| `candidates` | `list[RetrievalCandidate]` | Current ranked candidates. |
Returns: Candidates with updated scores, sorted by final_score.
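A minimal sketch of such a confirmation boost, assuming a linear decay over a 30-day window (only the 0.1 cap comes from the description above):

```python
from datetime import datetime, timedelta, timezone

# Hedged sketch: recently confirmed facts get an additive bonus that
# decays linearly to zero. The 30-day window is an assumption; only the
# 0.1 cap is documented.
def confirmation_boost(last_confirmed_at, now=None, window_days=30):
    if last_confirmed_at is None:
        return 0.0
    now = now or datetime.now(timezone.utc)
    age_days = (now - last_confirmed_at).total_seconds() / 86400
    if age_days < 0 or age_days > window_days:
        return 0.0
    return 0.1 * (1 - age_days / window_days)
```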
Graph Retrieval¶
BFS 2-hop traversal on the MemoryEntityRelationship knowledge graph with relevance pruning.
GraphRetrievalResult¶
| Field | Type | Description |
|---|---|---|
| `facts` | `list[dict[str, Any]]` | Scored fact dicts with `source="graph"`. |
| `neighbor_keys` | `list[str]` | Entity keys discovered via BFS. |
| `edges_traversed` | `int` | Total edges examined during BFS. |
| `edges` | `list[dict[str, Any]]` | Deduplicated edge dicts with display names. |
retrieve_graph_facts¶
BFS 2-hop retrieval with composite scoring: edge_strength * recency * edge_recency * query_bonus.
```python
async def retrieve_graph_facts(
    session: AsyncSession,
    agent_id: str,
    entity_keys: list[str],
    *,
    min_confidence: float = 0.3,
    as_of_start: datetime | None = None,
    as_of_end: datetime | None = None,
    broad_query: bool = False,
    max_facts: int | None = None,
    query_text: str = "",
    min_edge_strength: float = 0.5,
) -> GraphRetrievalResult
```

| Parameter | Type | Default | Description |
|---|---|---|---|
| `session` | `AsyncSession` | -- | Database session. |
| `agent_id` | `str` | -- | Agent identifier. |
| `entity_keys` | `list[str]` | -- | Seed `entity_key`s to start BFS from. |
| `min_confidence` | `float` | `0.3` | Minimum fact confidence threshold. |
| `as_of_start` | `datetime \| None` | `None` | Start of temporal window. |
| `as_of_end` | `datetime \| None` | `None` | End of temporal window. |
| `broad_query` | `bool` | `False` | When `True`, allows expanded budget. |
| `max_facts` | `int \| None` | `None` | Override default limit (30). |
| `query_text` | `str` | `""` | Original query text for `query_bonus` scoring. |
| `min_edge_strength` | `float` | `0.5` | Minimum edge strength for hop 2+ pruning. |
Returns: GraphRetrievalResult with scored facts and graph metadata.
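The composite score can be sketched like this; only the multiplicative structure comes from the description above, while the decay constants and the 1.2x `query_bonus` factor are assumptions:

```python
# Sketch of the documented composite graph score:
# edge_strength * recency * edge_recency * query_bonus.
# Decay half-lives and the bonus factor are illustrative assumptions.
def graph_score(edge_strength, fact_age_days, edge_age_days,
                query_text, fact_value):
    recency = 0.5 ** (fact_age_days / 60)       # fact recency decay
    edge_recency = 0.5 ** (edge_age_days / 60)  # edge recency decay
    # Small multiplicative bonus when the fact value overlaps the query.
    overlap = (set(query_text.lower().split())
               & set(str(fact_value).lower().split()))
    query_bonus = 1.2 if overlap else 1.0
    return edge_strength * recency * edge_recency * query_bonus
```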
Spreading Activation¶
Expands context from seed facts by following entity_key, cluster_id, and knowledge graph relationship links. Uses dynamic importance scoring with decay per hop.
SpreadingActivationResult¶
| Field | Type | Description |
|---|---|---|
| `candidates` | `list[RetrievalCandidate]` | Expanded candidates from hop 1-2. |
| `meta_observations` | `list[Any]` | Relevant meta-observations referencing seed facts. |
| `entities_explored` | `list[str]` | Entity keys explored during spreading. |
| `clusters_explored` | `list[str]` | Cluster IDs explored during spreading. |
| `hop1_count` | `int` | Number of facts found in hop 1. |
| `hop2_count` | `int` | Number of facts found in hop 2. |
| `kg_relationships_explored` | `int` | Number of KG relationships traversed. |
spread_activation¶
Expand context from seed facts via entity_key, cluster_id, and KG relationships (hop 1-2).
```python
async def spread_activation(
    session: AsyncSession,
    agent_id: str,
    seed_fact_ids: list[str],
    config: MemoryConfig,
    *,
    seed_scores: dict[str, float] | None = None,
    allowed_keys: set[str] | None = None,
) -> list[RetrievalCandidate]
```

| Parameter | Type | Description |
|---|---|---|
| `session` | `AsyncSession` | Database session. |
| `agent_id` | `str` | Agent identifier. |
| `seed_fact_ids` | `list[str]` | IDs of seed facts to expand from. |
| `config` | `MemoryConfig` | Memory configuration with spreading activation params. |
| `seed_scores` | `dict[str, float] \| None` | Optional dict mapping seed fact ID to score. |
| `allowed_keys` | `set[str] \| None` | Optional set of allowed attribute keys. |
Returns: List of RetrievalCandidate objects from spreading activation. Fail-safe: returns empty list on error.
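The core mechanism (per-hop decay over linked nodes) can be sketched over a plain adjacency map; the 0.5 decay factor is an assumption, and the real function walks `entity_key`, `cluster_id`, and KG links in SQL rather than an in-memory dict:

```python
# Minimal sketch of spreading activation with per-hop decay over an
# adjacency map. Decay factor and graph representation are assumptions.
def spread(graph: dict[str, list[str]], seeds: dict[str, float],
           decay: float = 0.5, max_hops: int = 2) -> dict[str, float]:
    activation = dict(seeds)
    frontier = dict(seeds)
    for _ in range(max_hops):
        next_frontier: dict[str, float] = {}
        for node, score in frontier.items():
            for neighbor in graph.get(node, []):
                spread_score = score * decay
                # Keep the strongest activation seen for each node.
                if spread_score > activation.get(neighbor, 0.0):
                    activation[neighbor] = spread_score
                    next_frontier[neighbor] = spread_score
        frontier = next_frontier
    return activation
```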
Context Compression¶
Builds a prompt-ready context string from scored facts, events, clusters, and meta-observations using a tiered system: Hot (Tier 1), Warm (Tier 2), Cold (Tier 3).
CompressedContext¶
| Field | Type | Description |
|---|---|---|
| `context_text` | `str` | Final prompt-ready context string. |
| `hot_count` | `int` | Number of facts in hot tier (Tier 1). |
| `warm_count` | `int` | Number of facts in warm tier (Tier 2). |
| `cold_count` | `int` | Number of items in cold tier (Tier 3). |
| `total_tokens` | `int` | Estimated token count of `context_text`. |
compress_context¶
Build tiered context text within token budget.
```python
async def compress_context(
    facts: list[dict[str, Any]],
    events: list[dict[str, Any]],
    config: MemoryConfig,
    *,
    clusters: list[Any] | None = None,
    meta_observations: list[Any] | None = None,
    stale_keys: set[str] | None = None,
    stale_threshold_days: int = 90,
    now: datetime | None = None,
) -> CompressedContext
```

| Parameter | Type | Description |
|---|---|---|
| `facts` | `list[dict]` | Scored fact dicts (must have `score`, `fact`, `entity`, `attribute`, `value`, `date` keys). |
| `events` | `list[dict]` | Event dicts with `date` and `text` keys. |
| `config` | `MemoryConfig` | Memory configuration with token budget and tier ratios. |
| `clusters` | `list \| None` | Optional cluster objects. |
| `meta_observations` | `list \| None` | Optional meta-observation objects. |
| `stale_keys` | `set[str] \| None` | Attribute keys considered always-stale. |
| `stale_threshold_days` | `int` | Days after which a fact is stale (default 90). |
| `now` | `datetime \| None` | Current timestamp (defaults to UTC now). |
Returns: CompressedContext with tiered context text.
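A minimal sketch of the tiering idea: sort by score and fill each tier's share of the token budget greedily. The 50/30/20 split and 4-chars-per-token estimate are assumptions, not arandu's configured ratios:

```python
# Hedged sketch of tiered compression: the highest-scoring facts land in
# the hot tier until its budget share is spent, then spill to warm, then
# cold. Ratios and the token estimate are illustrative assumptions.
def tier_facts(facts, token_budget=1000, ratios=(0.5, 0.3, 0.2)):
    tiers = ([], [], [])  # hot, warm, cold
    budgets = [token_budget * r for r in ratios]
    tier = 0
    for fact in sorted(facts, key=lambda f: f["score"], reverse=True):
        cost = len(fact["text"]) / 4  # rough token estimate (assumed)
        while tier < 3 and budgets[tier] < cost:
            tier += 1  # this tier is full; spill to the next
        if tier == 3:
            break  # all budgets exhausted
        tiers[tier].append(fact)
        budgets[tier] -= cost
    return tiers
```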
compress_broad_context¶
Build context for broad queries using clusters as primary unit.
```python
async def compress_broad_context(
    cluster_facts: dict[str, list[dict[str, Any]]],
    clusters: list[Any],
    config: MemoryConfig,
    *,
    meta_observations: list[Any] | None = None,
    events: list[dict[str, Any]] | None = None,
) -> CompressedContext
```

| Parameter | Type | Description |
|---|---|---|
| `cluster_facts` | `dict[str, list[dict]]` | Mapping of `cluster_label` to fact dicts. |
| `clusters` | `list[Any]` | Cluster objects with `label`, `summary_text`, `fact_count`. |
| `config` | `MemoryConfig` | Memory configuration. |
| `meta_observations` | `list \| None` | Optional meta-observation objects. |
| `events` | `list[dict] \| None` | Optional event dicts. |
Returns: CompressedContext with cluster-first context text.
Emotional Trends¶
Materializes emotional trends from memory events and provides formatted summaries for injection into retrieval context.
EmotionalTrendsResult¶
| Field | Type | Description |
|---|---|---|
| `emotion_counts` | `dict[str, int]` | Mapping of emotion to occurrence count. |
| `trend_direction` | `str` | `"increasing"`, `"decreasing"`, or `"stable"`. |
| `dominant_emotion` | `str \| None` | Most frequent emotion, or `None`. |
| `trigger_keywords` | `list[str]` | Top keywords from high-intensity events. |
| `avg_intensity` | `float` | Average emotion intensity across events. |
| `dominant_intensity` | `float` | Average intensity of the dominant emotion. |
| `dominant_energy` | `str` | Predominant energy level (high/medium/low). |
| `events_analyzed` | `int` | Number of events analyzed. |
| `observation_created` | `bool` | Whether a meta-observation was created/updated. |
| `observation_id` | `str \| None` | ID of the created/updated observation. |
materialize_emotional_trends¶
Aggregate emotion data from events, detect trends, and materialize as a meta-observation.
```python
async def materialize_emotional_trends(
    session: AsyncSession,
    agent_id: str,
    config: MemoryConfig,
) -> EmotionalTrendsResult
```

| Parameter | Type | Description |
|---|---|---|
| `session` | `AsyncSession` | Database session. |
| `agent_id` | `str` | Agent identifier. |
| `config` | `MemoryConfig` | Memory configuration with trend window and min events. |
Returns: EmotionalTrendsResult with aggregated trend data.
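The aggregation step alone can be sketched as follows; the event shape is assumed, and the real function also detects trend direction and materializes a meta-observation in the database:

```python
from collections import Counter

# Sketch of the counting/averaging step only. Event dicts with "emotion"
# and "intensity" keys are an assumed shape for illustration.
def summarize_emotions(events: list[dict]) -> dict:
    counts = Counter(e["emotion"] for e in events if e.get("emotion"))
    dominant = counts.most_common(1)[0][0] if counts else None
    intensities = [e.get("intensity", 0.0) for e in events]
    avg = sum(intensities) / len(intensities) if intensities else 0.0
    return {"emotion_counts": dict(counts), "dominant_emotion": dominant,
            "avg_intensity": avg, "events_analyzed": len(events)}
```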
get_emotional_summary_for_context¶
Return formatted emotional summary for injection into retrieval context. Returns None if no recent (7-day) active emotional trend exists.
Returns: Formatted summary string, or None.
Dynamic Importance¶
compute_dynamic_importance¶
Compute dynamic importance score for a memory fact. Inspired by cognitive memory strength models.
Components:

- `retrieval_boost`: `log(1 + times_retrieved)` -- saturates gradually
- `recency_of_use_boost`: decays from `last_retrieved_at` (half-life 7 days)
- `correction_penalty`: `0.8^n` for each user correction
- `pattern_boost`: 1.3x if the fact is part of an active meta-observation

```python
def compute_dynamic_importance(
    base_importance: float,
    times_retrieved: int,
    last_retrieved_at: datetime | None,
    user_correction_count: int,
    is_in_active_pattern: bool,
    now: datetime | None = None,
) -> float
```
| Parameter | Type | Description |
|---|---|---|
| `base_importance` | `float` | Base importance score (typically 0.5). |
| `times_retrieved` | `int` | Number of times this fact has been retrieved. |
| `last_retrieved_at` | `datetime \| None` | When the fact was last retrieved. |
| `user_correction_count` | `int` | Number of user corrections on this fact. |
| `is_in_active_pattern` | `bool` | Whether fact is part of an active meta-observation. |
| `now` | `datetime \| None` | Current timestamp (defaults to UTC now). |
Returns: Dynamic importance score, clamped to [0.05, 3.0].
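The components above can be combined in a sketch like this. How arandu actually combines them is not fully specified here; this version assumes the boosts add onto the base while the penalty and pattern boost multiply, then clamps to the documented [0.05, 3.0] range:

```python
import math
from datetime import datetime, timezone

# Hedged sketch of the scoring components. The additive/multiplicative
# combination is an assumption; the individual terms follow the
# documented list (log boost, 7-day half-life, 0.8^n penalty, 1.3x boost).
def dynamic_importance(base, times_retrieved, last_retrieved_at,
                       corrections, in_active_pattern, now=None):
    now = now or datetime.now(timezone.utc)
    retrieval_boost = math.log(1 + times_retrieved)  # saturates gradually
    if last_retrieved_at is not None:
        age_days = (now - last_retrieved_at).total_seconds() / 86400
        recency_boost = 0.5 ** (age_days / 7)  # half-life of 7 days
    else:
        recency_boost = 0.0
    score = base + retrieval_boost + recency_boost
    score *= 0.8 ** corrections  # correction penalty
    if in_active_pattern:
        score *= 1.3             # pattern boost
    return max(0.05, min(3.0, score))
```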
Procedural Memory¶
A system of LLM-optimized behavioral directives that compresses the persona and learned behavioral preferences into cohesive instruction blocks.
DirectiveBlock¶
| Field | Type | Description |
|---|---|---|
| `text` | `str` | Cohesive behavioral instructions block. |
| `directive_count` | `int` | Number of active directives used. |
| `cache_hit` | `bool` | Whether this was served from cache. |
ContradictionResult¶
| Field | Type | Description |
|---|---|---|
| `has_contradiction` | `bool` | Whether a contradiction was found. |
| `conflicting_directive` | `str \| None` | Title of the conflicting directive. |
| `resolution` | `str \| None` | Explanation of how the contradiction was resolved. |
generate_optimized_directives¶
Generate an LLM-optimized behavioral instructions block by integrating persona + learned directives.
```python
async def generate_optimized_directives(
    session: AsyncSession,
    agent_id: str,
    llm_provider: LLMProvider,
    config: MemoryConfig,
    *,
    persona_text: str = "",
) -> DirectiveBlock
```

| Parameter | Type | Description |
|---|---|---|
| `session` | `AsyncSession` | Database session. |
| `agent_id` | `str` | Agent identifier. |
| `llm_provider` | `LLMProvider` | Injected LLM provider. |
| `config` | `MemoryConfig` | Memory configuration. |
| `persona_text` | `str` | Optional persona description. |
Returns: DirectiveBlock with generated text. Result is cached by hash of directive IDs + reinforcement counts. Fail-safe: returns empty DirectiveBlock on error.
check_directive_contradiction¶
Check a new directive against existing ones for contradictions. Uses embedding similarity as pre-filter, then LLM as judge.
```python
async def check_directive_contradiction(
    session: AsyncSession,
    agent_id: str,
    new_directive: str,
    embedding_provider: EmbeddingProvider,
    llm_provider: LLMProvider,
    *,
    similarity_threshold: float = 0.80,
) -> ContradictionResult
```

| Parameter | Type | Description |
|---|---|---|
| `session` | `AsyncSession` | Database session. |
| `agent_id` | `str` | Agent identifier. |
| `new_directive` | `str` | Text of the new directive to check. |
| `embedding_provider` | `EmbeddingProvider` | Injected embedding provider. |
| `llm_provider` | `LLMProvider` | Injected LLM provider. |
| `similarity_threshold` | `float` | Minimum similarity to trigger LLM check (default 0.80). |
Returns: ContradictionResult with check outcome. Fail-safe: returns no contradiction on error.
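The two-stage pattern (cheap embedding pre-filter gating an expensive LLM judge) can be sketched as follows; `judge` is a caller-supplied stand-in for the LLM call, not arandu's provider interface:

```python
import math

def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0

# Sketch: only directives that clear the similarity threshold reach the
# expensive judge. `existing` is a list of (title, embedding) pairs.
def check(new_emb, existing, judge, threshold=0.80):
    for title, emb in existing:
        if cosine(new_emb, emb) >= threshold:
            if judge(title):  # LLM-as-judge, called only on similar pairs
                return {"has_contradiction": True,
                        "conflicting_directive": title}
    return {"has_contradiction": False, "conflicting_directive": None}
```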
effective_confidence¶
Apply temporal decay to directive confidence. Formula: base_confidence * 0.95^weeks.
```python
def effective_confidence(
    base_confidence: float,
    created_at: datetime,
    now: datetime | None = None,
) -> float
```

| Parameter | Type | Description |
|---|---|---|
| `base_confidence` | `float` | Original confidence value (0.0-1.0). |
| `created_at` | `datetime` | When the directive was created. |
| `now` | `datetime \| None` | Current timestamp (defaults to UTC now). |
Returns: Decayed confidence, floored at 0.10.
invalidate_directive_cache¶
Manually invalidate the directive cache for a user.