Read Pipeline API

Advanced API

These are advanced APIs for power users who want to interact with individual retrieval stages directly. Most users should use MemoryClient.retrieve() instead, which orchestrates the full multi-signal pipeline automatically.

All read pipeline functions are exported from arandu.read.

from arandu.read import (
    run_read_pipeline,
    plan_retrieval, expand_query,
    retrieve_relevant_events, compute_pattern_signal,
    retrieve_graph_facts, spread_activation,
    compress_context, compress_broad_context,
    materialize_emotional_trends, get_emotional_summary_for_context,
    compute_dynamic_importance,
    generate_optimized_directives, check_directive_contradiction,
    effective_confidence, invalidate_directive_cache,
)

Pipeline Orchestrator

run_read_pipeline

Executes the full read pipeline: plan -> retrieve (multi-signal) -> rerank -> format.

Multi-signal retrieval runs semantic + keyword + graph in parallel via asyncio.gather(). The deterministic planner identifies entities and generates pattern queries for the keyword signal.

async def run_read_pipeline(
    session: AsyncSession,
    agent_id: str,
    query: str,
    llm: LLMProvider,
    embeddings: EmbeddingProvider,
    config: MemoryConfig,
    trace: PipelineTrace | None = None,
) -> ReadResult
Parameter Type Description
session AsyncSession Database session (caller manages transaction).
agent_id str Agent identifier.
query str The query to search memory for.
llm LLMProvider Injected LLM provider.
embeddings EmbeddingProvider Injected embedding provider.
config MemoryConfig Memory configuration.
trace PipelineTrace | None Optional pipeline trace for verbose mode. When provided, each stage records intermediate data.

Returns: ReadResult with facts (list of ScoredFact), context (prompt-ready string), total_candidates, and duration_ms.
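The parallel fan-out described above can be sketched with plain asyncio. The three stub retrievers below are illustrative stand-ins for the real semantic, keyword, and graph signals, and the max-score merge is an assumption, not the library's actual rerank logic:

```python
import asyncio

# Illustrative stand-ins for the three retrieval signals -- each returns
# (fact, score) candidates for the same query.
async def semantic_signal(query: str) -> list[tuple[str, float]]:
    return [("fact-a", 0.9)]

async def keyword_signal(query: str) -> list[tuple[str, float]]:
    return [("fact-b", 0.7)]

async def graph_signal(query: str) -> list[tuple[str, float]]:
    return [("fact-a", 0.6)]

async def multi_signal_retrieve(query: str) -> list[tuple[str, float]]:
    # Run all signals concurrently, as run_read_pipeline does via
    # asyncio.gather().
    results = await asyncio.gather(
        semantic_signal(query), keyword_signal(query), graph_signal(query)
    )
    # Merge candidates, keeping the best score seen per fact.
    merged: dict[str, float] = {}
    for signal in results:
        for fact, score in signal:
            merged[fact] = max(merged.get(fact, 0.0), score)
    return sorted(merged.items(), key=lambda kv: kv[1], reverse=True)

candidates = asyncio.run(multi_signal_retrieve("where does Alice work?"))
```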


Retrieval Planner

The retrieval planner is a deterministic function that analyzes the user query and decides the retrieval strategy before any search happens. It uses regex matching and schema lookups -- no LLM call is involved.

PatternQuery

A pattern-based query for keyword signal matching.

Field Type Description
entity_pattern str SQL LIKE pattern for entity_key matching.
attribute_filter str | None Optional attribute key filter (always None in V5).

RetrievalPlan

Output of the retrieval planner. V5 runs all signals (semantic, graph, keyword) in parallel.

Field Type Default Description
strategy str "multi_signal" "multi_signal" (default) or "skip".
entities list[str] [] Detected entity_keys for graph signal.
pattern_queries list[PatternQuery] [] Pattern queries for keyword signal.
similarity_query str | None None Query for semantic signal (always the original query -- no reformulation).
max_facts int 50 Budget per signal.
reason str "" Why this plan was chosen.
latency_ms float 0.0 Time spent planning.
as_of_range tuple[datetime, datetime] | None None Optional time-travel window.
broad_query bool False True for comprehensive queries.

plan_retrieval

Deterministic function that decides retrieval strategy using regex matching and schema lookups. No LLM call is made. Entity extraction is handled downstream by resolve_query_entities. Anaphora resolution (e.g., resolving "she" or "it" from conversation context) is the caller's responsibility -- the query should already contain resolved references.

def plan_retrieval(
    query_text: str,
    schema_prefixes: list[str],
) -> RetrievalPlan
Parameter Type Description
query_text str The user's query (anaphora should already be resolved by the caller).
schema_prefixes list[str] Known entity key prefixes from the agent's schema (e.g., ["person", "org"]).

Returns: RetrievalPlan with strategy, pattern queries, and query parameters.
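A minimal sketch of what deterministic planning looks like, assuming a simplified query format in which entity keys appear literally as `prefix:name` tokens; the real planner's heuristics are more involved:

```python
import re

# Hypothetical simplification: scan the query for tokens that look like
# known entity keys (schema_prefix:name) and emit SQL LIKE patterns for
# the keyword signal. No LLM call -- regex and schema lookups only.
def sketch_plan(query_text: str, schema_prefixes: list[str]) -> dict:
    pattern_queries: list[str] = []
    for prefix in schema_prefixes:
        # e.g. "person" matches "person:alice" mentioned in the query
        for name in re.findall(rf"\b{re.escape(prefix)}:(\w+)", query_text):
            pattern_queries.append(f"{prefix}:%{name}%")
    return {
        "strategy": "multi_signal",
        "pattern_queries": pattern_queries,
        "similarity_query": query_text,  # always the original query
    }

plan = sketch_plan("what changed for person:alice?", ["person", "org"])
```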


Query Expansion

Post-processes a RetrievalPlan with entity priming -- resolves entities mentioned in the query via the knowledge graph (aliases + relationships) and injects context terms.

ExpandedQuery

Field Type Description
primed_entities list[str] Entity keys discovered via alias + KG priming.
temporal_range tuple[datetime, datetime] | None Resolved date range (from the retrieval planner).
expanded_terms list[str] Additional context terms from entity facts.

expand_query

Expand a retrieval plan with entity priming. Fail-safe: any exception returns an empty ExpandedQuery.

async def expand_query(
    session: AsyncSession,
    agent_id: str,
    query: str,
    plan: RetrievalPlan,
    llm: object,
) -> ExpandedQuery
Parameter Type Description
session AsyncSession Database session.
agent_id str Agent identifier.
query str Original user query text.
plan RetrievalPlan RetrievalPlan from the retrieval planner.
llm object LLM provider (reserved for future use).

Returns: ExpandedQuery with primed entities, temporal range, and expanded terms.


Fact Retrieval

retrieve_relevant_events

Retrieve relevant events by embedding similarity + recency scoring.

async def retrieve_relevant_events(
    session: AsyncSession,
    agent_id: str,
    query_embedding: list[float],
    config: MemoryConfig,
    limit: int | None = None,
) -> list[dict[str, Any]]
Parameter Type Description
session AsyncSession Database session.
agent_id str Agent identifier.
query_embedding list[float] Query embedding vector.
config MemoryConfig Memory configuration.
limit int | None Max events to return.

Returns: List of event dicts with date, text, score, event_id.
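Similarity-plus-recency scoring can be sketched as a weighted blend of cosine similarity and an exponential recency decay. The 0.7/0.3 weights and the 30-day half-life below are assumptions for illustration, not the library's actual constants:

```python
import math
from datetime import datetime, timedelta, timezone

def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return dot / (na * nb) if na and nb else 0.0

# Hypothetical blend: similarity dominates, recency breaks ties between
# equally relevant events.
def score_event(query_emb, event_emb, event_date, now=None) -> float:
    now = now or datetime.now(timezone.utc)
    age_days = max((now - event_date).total_seconds() / 86400, 0.0)
    recency = 0.5 ** (age_days / 30)  # assumed 30-day half-life
    return 0.7 * cosine(query_emb, event_emb) + 0.3 * recency

now = datetime(2025, 1, 31, tzinfo=timezone.utc)
fresh = score_event([1.0, 0.0], [1.0, 0.0], now - timedelta(days=1), now)
stale = score_event([1.0, 0.0], [1.0, 0.0], now - timedelta(days=300), now)
```

With identical embeddings, the one-day-old event outscores the 300-day-old one purely on recency.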

compute_pattern_signal

Boost facts that have been recently confirmed (the pattern signal). Facts with a recent last_confirmed_at timestamp (set by NOOP decisions in the write pipeline) get a small additive score boost (up to 0.1).

def compute_pattern_signal(
    candidates: list[RetrievalCandidate],
) -> list[RetrievalCandidate]
Parameter Type Description
candidates list[RetrievalCandidate] Current ranked candidates.

Returns: Candidates with updated scores, sorted by final_score.
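One way to realize the capped additive boost is a linear decay from 0.1 down to zero over a confirmation window. The 30-day window and the linear shape are assumptions; only the 0.1 cap comes from the description above:

```python
from datetime import datetime, timedelta, timezone

# Illustrative sketch: recently confirmed facts get a small additive boost,
# capped at 0.1 and fading as the confirmation ages.
def pattern_boost(last_confirmed_at, now=None, window_days: float = 30.0) -> float:
    if last_confirmed_at is None:
        return 0.0
    now = now or datetime.now(timezone.utc)
    age_days = (now - last_confirmed_at).total_seconds() / 86400
    if age_days < 0 or age_days > window_days:
        return 0.0
    return 0.1 * (1 - age_days / window_days)  # linear decay to zero

now = datetime(2025, 6, 1, tzinfo=timezone.utc)
just_confirmed = pattern_boost(now, now)
long_ago = pattern_boost(now - timedelta(days=90), now)
```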


Graph Retrieval

BFS 2-hop traversal on the MemoryEntityRelationship knowledge graph with relevance pruning.

GraphRetrievalResult

Field Type Description
facts list[dict[str, Any]] Scored fact dicts with source="graph".
neighbor_keys list[str] Entity keys discovered via BFS.
edges_traversed int Total edges examined during BFS.
edges list[dict[str, Any]] Deduplicated edge dicts with display names.

retrieve_graph_facts

BFS 2-hop retrieval with composite scoring: edge_strength * recency * edge_recency * query_bonus.

async def retrieve_graph_facts(
    session: AsyncSession,
    agent_id: str,
    entity_keys: list[str],
    *,
    min_confidence: float = 0.3,
    as_of_start: datetime | None = None,
    as_of_end: datetime | None = None,
    broad_query: bool = False,
    max_facts: int | None = None,
    query_text: str = "",
    min_edge_strength: float = 0.5,
) -> GraphRetrievalResult
Parameter Type Default Description
session AsyncSession -- Database session.
agent_id str -- Agent identifier.
entity_keys list[str] -- Seed entity_keys to start BFS from.
min_confidence float 0.3 Minimum fact confidence threshold.
as_of_start datetime | None None Start of temporal window.
as_of_end datetime | None None End of temporal window.
broad_query bool False When True, allows expanded budget.
max_facts int | None None Override default limit (30).
query_text str "" Original query text for query_bonus scoring.
min_edge_strength float 0.5 Minimum edge strength for hop 2+ pruning.

Returns: GraphRetrievalResult with scored facts and graph metadata.
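The composite score edge_strength * recency * edge_recency * query_bonus can be sketched directly; the 90-day half-lives and the 1.5x bonus value below are assumptions chosen for illustration:

```python
# Hypothetical instantiation of the composite graph score. Each factor is
# in (0, 1] except query_bonus, which rewards entities the query mentions.
def graph_score(edge_strength: float, fact_age_days: float,
                edge_age_days: float, query_mentions_entity: bool) -> float:
    recency = 0.5 ** (fact_age_days / 90)        # assumed 90-day half-life
    edge_recency = 0.5 ** (edge_age_days / 90)   # same assumed half-life
    query_bonus = 1.5 if query_mentions_entity else 1.0  # assumed bonus
    return edge_strength * recency * edge_recency * query_bonus

mentioned = graph_score(0.9, 10, 10, True)
unmentioned = graph_score(0.9, 10, 10, False)
```

Multiplicative composition means a weak factor (a stale edge, a low-strength relationship) suppresses the whole score rather than being averaged away.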


Spreading Activation

Expands context from seed facts by following entity_key, cluster_id, and knowledge graph relationship links. Uses dynamic importance scoring with decay per hop.

SpreadingActivationResult

Field Type Description
candidates list[RetrievalCandidate] Expanded candidates from hop 1-2.
meta_observations list[Any] Relevant meta-observations referencing seed facts.
entities_explored list[str] Entity keys explored during spreading.
clusters_explored list[str] Cluster IDs explored during spreading.
hop1_count int Number of facts found in hop 1.
hop2_count int Number of facts found in hop 2.
kg_relationships_explored int Number of KG relationships traversed.

spread_activation

Expand context from seed facts via entity_key, cluster_id, and KG relationships (hop 1-2).

async def spread_activation(
    session: AsyncSession,
    agent_id: str,
    seed_fact_ids: list[str],
    config: MemoryConfig,
    *,
    seed_scores: dict[str, float] | None = None,
    allowed_keys: set[str] | None = None,
) -> list[RetrievalCandidate]
Parameter Type Description
session AsyncSession Database session.
agent_id str Agent identifier.
seed_fact_ids list[str] IDs of seed facts to expand from.
config MemoryConfig Memory configuration with spreading activation params.
seed_scores dict[str, float] | None Optional dict mapping seed fact ID to score.
allowed_keys set[str] | None Optional set of allowed attribute keys.

Returns: List of RetrievalCandidate objects from spreading activation. Fail-safe: returns empty list on error.


Context Compression

Builds a prompt-ready context string from scored facts, events, clusters, and meta-observations using a tiered system: Hot (Tier 1), Warm (Tier 2), Cold (Tier 3).

CompressedContext

Field Type Description
context_text str Final prompt-ready context string.
hot_count int Number of facts in hot tier (Tier 1).
warm_count int Number of facts in warm tier (Tier 2).
cold_count int Number of items in cold tier (Tier 3).
total_tokens int Estimated token count of context_text.

compress_context

Build tiered context text within token budget.

async def compress_context(
    facts: list[dict[str, Any]],
    events: list[dict[str, Any]],
    config: MemoryConfig,
    *,
    clusters: list[Any] | None = None,
    meta_observations: list[Any] | None = None,
    stale_keys: set[str] | None = None,
    stale_threshold_days: int = 90,
    now: datetime | None = None,
) -> CompressedContext
Parameter Type Description
facts list[dict] Scored fact dicts (must have score, fact, entity, attribute, value, date keys).
events list[dict] Event dicts with date and text keys.
config MemoryConfig Memory configuration with token budget and tier ratios.
clusters list | None Optional cluster objects.
meta_observations list | None Optional meta-observation objects.
stale_keys set[str] | None Attribute keys considered always-stale.
stale_threshold_days int Days after which a fact is stale (default 90).
now datetime | None Current timestamp (defaults to UTC now).

Returns: CompressedContext with tiered context text.
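The tiering can be sketched as a greedy fill: sort facts by score, then place each into the first tier with remaining budget. The 50/30/20 budget split and the 4-characters-per-token estimate are assumptions, not the library's configured ratios:

```python
# Hypothetical greedy tier assignment under a token budget.
def est_tokens(text: str) -> int:
    return max(len(text) // 4, 1)  # rough 4-chars-per-token estimate

def compress(facts: list[dict], token_budget: int = 100) -> dict:
    tiers = {"hot": [], "warm": [], "cold": []}
    budgets = {"hot": token_budget * 0.5, "warm": token_budget * 0.3,
               "cold": token_budget * 0.2}  # assumed 50/30/20 split
    used = {"hot": 0, "warm": 0, "cold": 0}
    for fact in sorted(facts, key=lambda f: f["score"], reverse=True):
        cost = est_tokens(fact["fact"])
        for tier in ("hot", "warm", "cold"):
            if used[tier] + cost <= budgets[tier]:
                tiers[tier].append(fact["fact"])
                used[tier] += cost
                break  # anything that fits nowhere is dropped
    return tiers

facts = [{"fact": f"fact number {i} about something", "score": 1.0 - i * 0.01}
         for i in range(30)]
tiers = compress(facts, token_budget=60)
```

Highest-scoring facts land in the hot tier; once budgets fill, remaining candidates are dropped entirely rather than overflowing the prompt.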

compress_broad_context

Build context for broad queries using clusters as primary unit.

async def compress_broad_context(
    cluster_facts: dict[str, list[dict[str, Any]]],
    clusters: list[Any],
    config: MemoryConfig,
    *,
    meta_observations: list[Any] | None = None,
    events: list[dict[str, Any]] | None = None,
) -> CompressedContext
Parameter Type Description
cluster_facts dict[str, list[dict]] Mapping of cluster_label to fact dicts.
clusters list[Any] Cluster objects with label, summary_text, fact_count.
config MemoryConfig Memory configuration.
meta_observations list | None Optional meta-observation objects.
events list[dict] | None Optional event dicts.

Returns: CompressedContext with cluster-first context text.


Emotional Trends

Materializes emotional trends from memory events and provides formatted summaries for injection into the retrieval context.

EmotionalTrendsResult

Field Type Description
emotion_counts dict[str, int] Mapping of emotion to occurrence count.
trend_direction str "increasing", "decreasing", or "stable".
dominant_emotion str | None Most frequent emotion, or None.
trigger_keywords list[str] Top keywords from high-intensity events.
avg_intensity float Average emotion intensity across events.
dominant_intensity float Average intensity of the dominant emotion.
dominant_energy str Predominant energy level (high/medium/low).
events_analyzed int Number of events analyzed.
observation_created bool Whether a meta-observation was created/updated.
observation_id str | None ID of the created/updated observation.

materialize_emotional_trends

Aggregate emotion data from events, detect trends, and materialize them as a meta-observation.

async def materialize_emotional_trends(
    session: AsyncSession,
    agent_id: str,
    config: MemoryConfig,
) -> EmotionalTrendsResult
Parameter Type Description
session AsyncSession Database session.
agent_id str Agent identifier.
config MemoryConfig Memory configuration with trend window and min events.

Returns: EmotionalTrendsResult with aggregated trend data.

get_emotional_summary_for_context

Return formatted emotional summary for injection into retrieval context. Returns None if no recent (7-day) active emotional trend exists.

async def get_emotional_summary_for_context(
    session: AsyncSession,
    agent_id: str,
) -> str | None

Returns: Formatted summary string, or None.


Dynamic Importance

compute_dynamic_importance

Compute dynamic importance score for a memory fact. Inspired by cognitive memory strength models.

Components:

  • retrieval_boost: log(1 + times_retrieved) -- saturates gradually
  • recency_of_use_boost: decays from last_retrieved_at (half-life 7 days)
  • correction_penalty: 0.8^n for each user correction
  • pattern_boost: 1.3x if fact is part of an active meta-observation

def compute_dynamic_importance(
    base_importance: float,
    times_retrieved: int,
    last_retrieved_at: datetime | None,
    user_correction_count: int,
    is_in_active_pattern: bool,
    now: datetime | None = None,
) -> float
Parameter Type Description
base_importance float Base importance score (typically 0.5).
times_retrieved int Number of times this fact has been retrieved.
last_retrieved_at datetime | None When the fact was last retrieved.
user_correction_count int Number of user corrections on this fact.
is_in_active_pattern bool Whether fact is part of an active meta-observation.
now datetime | None Current timestamp (defaults to UTC now).

Returns: Dynamic importance score, clamped to [0.05, 3.0].
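The listed components can be assembled into a sketch. The 0.8^n correction penalty, 1.3x pattern boost, 7-day half-life, and [0.05, 3.0] clamp come from the description above; the 0.2 weights on the retrieval and recency-of-use boosts are assumptions:

```python
import math
from datetime import datetime, timedelta, timezone

def sketch_importance(base_importance: float, times_retrieved: int,
                      last_retrieved_at, user_correction_count: int,
                      is_in_active_pattern: bool, now=None) -> float:
    now = now or datetime.now(timezone.utc)
    retrieval_boost = math.log(1 + times_retrieved)   # saturates gradually
    if last_retrieved_at is not None:
        days = (now - last_retrieved_at).total_seconds() / 86400
        recency_boost = 0.5 ** (days / 7)             # 7-day half-life
    else:
        recency_boost = 0.0
    score = base_importance + 0.2 * retrieval_boost + 0.2 * recency_boost
    score *= 0.8 ** user_correction_count             # correction penalty
    if is_in_active_pattern:
        score *= 1.3                                  # pattern boost
    return min(max(score, 0.05), 3.0)                 # clamp to [0.05, 3.0]

now = datetime(2025, 1, 1, tzinfo=timezone.utc)
hot = sketch_importance(0.5, 20, now - timedelta(days=1), 0, True, now)
cold = sketch_importance(0.5, 0, None, 3, False, now)
```

A frequently retrieved, recently used, pattern-linked fact ends up well above base importance, while a never-retrieved fact with three corrections sinks below it.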


Procedural Memory

A behavioral-directives system that compresses the agent persona and learned behavioral preferences into cohesive, LLM-optimized instruction blocks.

DirectiveBlock

Field Type Description
text str Cohesive behavioral instructions block.
directive_count int Number of active directives used.
cache_hit bool Whether this was served from cache.

ContradictionResult

Field Type Description
has_contradiction bool Whether a contradiction was found.
conflicting_directive str | None Title of the conflicting directive.
resolution str | None Explanation of how the contradiction was resolved.

generate_optimized_directives

Generate an LLM-optimized behavioral instructions block by integrating persona + learned directives.

async def generate_optimized_directives(
    session: AsyncSession,
    agent_id: str,
    llm_provider: LLMProvider,
    config: MemoryConfig,
    *,
    persona_text: str = "",
) -> DirectiveBlock
Parameter Type Description
session AsyncSession Database session.
agent_id str Agent identifier.
llm_provider LLMProvider Injected LLM provider.
config MemoryConfig Memory configuration.
persona_text str Optional persona description.

Returns: DirectiveBlock with generated text. Result is cached by hash of directive IDs + reinforcement counts. Fail-safe: returns empty DirectiveBlock on error.

check_directive_contradiction

Check a new directive against existing ones for contradictions. Uses embedding similarity as a pre-filter, then an LLM as the judge.

async def check_directive_contradiction(
    session: AsyncSession,
    agent_id: str,
    new_directive: str,
    embedding_provider: EmbeddingProvider,
    llm_provider: LLMProvider,
    *,
    similarity_threshold: float = 0.80,
) -> ContradictionResult
Parameter Type Description
session AsyncSession Database session.
agent_id str Agent identifier.
new_directive str Text of the new directive to check.
embedding_provider EmbeddingProvider Injected embedding provider.
llm_provider LLMProvider Injected LLM provider.
similarity_threshold float Minimum similarity to trigger LLM check (default 0.80).

Returns: ContradictionResult with check outcome. Fail-safe: returns no contradiction on error.
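The two-stage shape of this check can be sketched with the pre-filter alone: a cheap cosine comparison selects only directives above the similarity threshold, and just those few would be sent to the expensive LLM judge (stubbed out here). The helper names and embeddings are illustrative:

```python
import math

def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return dot / (na * nb) if na and nb else 0.0

# Hypothetical pre-filter: only directives whose embeddings clear the
# threshold are forwarded to the LLM judge, keeping LLM calls rare.
def find_llm_candidates(new_emb: list[float],
                        existing: list[tuple[str, list[float]]],
                        threshold: float = 0.80) -> list[str]:
    return [title for title, emb in existing
            if cosine(new_emb, emb) >= threshold]

existing = [("be terse", [1.0, 0.0]), ("use emoji", [0.0, 1.0])]
candidates = find_llm_candidates([0.95, 0.05], existing, threshold=0.80)
```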

effective_confidence

Apply temporal decay to directive confidence. Formula: base_confidence * 0.95^weeks.

def effective_confidence(
    base_confidence: float,
    created_at: datetime,
    now: datetime | None = None,
) -> float
Parameter Type Description
base_confidence float Original confidence value (0.0-1.0).
created_at datetime When the directive was created.
now datetime | None Current timestamp (defaults to UTC now).

Returns: Decayed confidence, floored at 0.10.
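The formula above transcribes directly into a sketch; everything here (the 0.95 weekly decay and the 0.10 floor) comes from the documentation, with only the helper name being hypothetical:

```python
from datetime import datetime, timedelta, timezone

# base_confidence * 0.95^weeks, floored at 0.10.
def sketch_effective_confidence(base_confidence: float, created_at,
                                now=None) -> float:
    now = now or datetime.now(timezone.utc)
    weeks = max((now - created_at).total_seconds() / (7 * 86400), 0.0)
    return max(base_confidence * 0.95 ** weeks, 0.10)

now = datetime(2025, 1, 1, tzinfo=timezone.utc)
fresh = sketch_effective_confidence(0.9, now, now)
year_old = sketch_effective_confidence(0.9, now - timedelta(weeks=52), now)
```

After a year (52 weeks), 0.9 * 0.95^52 falls below the floor, so the directive bottoms out at 0.10 rather than decaying to zero.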

invalidate_directive_cache

Manually invalidate the directive cache for an agent.

def invalidate_directive_cache(agent_id: str) -> None