Write Pipeline API

Advanced API

These are advanced APIs for power users who want to interact with individual pipeline stages directly. Most users should use MemoryClient.write() instead, which orchestrates the full pipeline automatically.

All write pipeline functions are exported from arandu.write.

from arandu.write import (
    classify_input, select_strategy, run_write_pipeline,
    canonicalize_attribute_key, normalize_key, validate_proposed_key,
    create_or_update_entity, get_entities_for_user, get_entity_by_key,
    detect_and_record_corrections, is_user_correction,
    get_pending, clear_pending, save_pending_execution, save_pending_selection,
)

Pipeline Orchestrator

run_write_pipeline

Executes the full write pipeline. By default (when enable_informed_extraction=True), the pipeline runs: alias lookup -> pre-retrieval / profile load -> informed extraction -> resolve -> upsert. If informed extraction fails, it falls back to: extract -> resolve -> reconcile -> upsert. When enable_informed_extraction=False, it always uses the legacy path.

async def run_write_pipeline(
    session: AsyncSession,
    agent_id: str,
    message: str,
    llm: LLMProvider,
    embeddings: EmbeddingProvider,
    config: MemoryConfig,
    speaker_name: str,
    source: str = "api",
    recent_messages: list[str] | None = None,
    trace: PipelineTrace | None = None,
) -> dict
| Parameter | Type | Description |
| --- | --- | --- |
| session | AsyncSession | Database session (caller manages transaction/commit). |
| agent_id | str | Unique identifier for the agent. |
| message | str | The user's message text. |
| llm | LLMProvider | Injected LLM provider. |
| embeddings | EmbeddingProvider | Injected embedding provider. |
| config | MemoryConfig | Memory configuration. |
| speaker_name | str | Name of the person speaking the message. Pronouns like "I", "me", "eu" resolve to this speaker entity (person:{speaker_slug}). |
| source | str | Source channel identifier (default "api"). |
| recent_messages | list[str] \| None | Optional conversation context (last N messages) for resolving pronouns and anaphora. |
| trace | PipelineTrace \| None | Optional pipeline trace for verbose mode. When provided, each stage records intermediate data. |

Returns: dict with keys event_id, facts_added, facts_updated, facts_unchanged, facts_deleted, entities_resolved, duration_ms.

The pipeline creates an immutable MemoryEvent first (survives even if later stages fail), then runs informed extraction (or fallback blind extraction), entity resolution, reconciliation, and upsert inside a savepoint for atomicity. Reconciliation always runs regardless of extraction mode — it compares new facts against existing ones and decides ADD/UPDATE/NOOP/DELETE. Entity profiles are persisted in the same transaction when returned by informed extraction.
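The fallback behavior described above can be sketched as plain control flow. This is an illustration only, with stub extraction callables standing in for the real LLM-backed stages; it is not arandu's implementation.

```python
# Sketch of the fallback control flow described above (not arandu's actual
# implementation): the event is recorded first, then informed extraction is
# attempted, with blind extraction as the legacy fallback path.

def run_pipeline_sketch(message, informed_extract, blind_extract,
                        enable_informed_extraction=True):
    event = {"message": message}       # immutable event, recorded first
    facts, path = None, None
    if enable_informed_extraction:
        try:
            facts = informed_extract(message)
            path = "informed"
        except Exception:
            facts = None               # informed extraction failed
    if facts is None:                  # legacy path: extract -> reconcile
        facts = blind_extract(message)
        path = "legacy"
    return {"event": event, "facts": facts, "path": path}
```

Note that the event survives either way: it is created before any extraction is attempted.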


Extraction Strategy

Pure functions (no LLM, no DB) that classify input text and choose an extraction mode based on heuristics.

InputType

class InputType(str, Enum):
    SHORT = "short"        # < 500 chars
    MEDIUM = "medium"      # 500-2000 chars, unstructured
    LONG = "long"          # > 2000 chars, unstructured
    STRUCTURED = "structured"  # > 500 chars with headers/bullets/tables
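The thresholds in the comments above can be sketched as a pure heuristic. This is a simplified illustration; the structure check below is a stand-in for the library's real header/bullet/table detection.

```python
# Simplified illustration of the documented thresholds. The startswith check
# is an assumed stand-in for arandu's actual header/bullet/table detection.
def classify_sketch(text: str) -> str:
    n = len(text)
    structured = any(line.lstrip().startswith(("#", "-", "*", "|"))
                     for line in text.splitlines())
    if n < 500:
        return "short"        # < 500 chars
    if structured:
        return "structured"   # > 500 chars with headers/bullets/tables
    return "medium" if n <= 2000 else "long"
```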

ExtractionMode

class ExtractionMode(str, Enum):
    SINGLE_SHOT = "single_shot"
    CHUNKED = "chunked"

InputClassification

Result of input text analysis.

| Field | Type | Description |
| --- | --- | --- |
| input_type | InputType | Classified input type. |
| char_count | int | Number of characters. |
| estimated_tokens | int | Estimated token count (chars // 4). |
| has_headers | bool | Whether headers were detected. |
| has_bullets | bool | Whether bullet points were detected. |
| has_tables | bool | Whether tables were detected. |
| section_count | int | Number of text sections. |
| line_count | int | Number of lines. |

ExtractionStrategy

Selected extraction strategy.

| Field | Type | Description |
| --- | --- | --- |
| mode | ExtractionMode | Extraction mode (single_shot or chunked). |
| reason | str | Human-readable reason for the selection. |
| max_tokens_per_call | int | Max tokens per LLM call. |
| estimated_chunks | int | Number of expected chunks (1 for single-shot). |
| chunk_context_hint | str \| None | Hint about document type for chunked mode. |

classify_input

Classify input text using heuristics (no LLM call).

def classify_input(text: str) -> InputClassification
| Parameter | Type | Description |
| --- | --- | --- |
| text | str | Input text to classify. |

Returns: InputClassification with detected features.

from arandu.write import classify_input, select_strategy

classification = classify_input("My wife's name is Ana and we live in Sao Paulo.")
print(classification.input_type)  # InputType.SHORT
print(classification.char_count)  # 47

select_strategy

Select extraction strategy from a classification result.

def select_strategy(classification: InputClassification) -> ExtractionStrategy
| Parameter | Type | Description |
| --- | --- | --- |
| classification | InputClassification | Result of classify_input(). |

Returns: ExtractionStrategy with mode and parameters.

strategy = select_strategy(classification)
print(strategy.mode)             # ExtractionMode.SINGLE_SHOT
print(strategy.estimated_chunks) # 1
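A plausible selection rule can be sketched as follows. The mapping for medium, long, and structured inputs and the chunk size are assumptions for illustration; only the short-input case (single-shot, one chunk) is confirmed by the example above.

```python
# Hedged sketch of a plausible mode selection. The chunked mapping for
# long/structured inputs and the 1000-token chunk size are assumptions,
# not arandu's documented rule.
def select_mode_sketch(input_type: str, estimated_tokens: int) -> dict:
    if input_type in ("long", "structured"):
        chunks = max(1, estimated_tokens // 1000)   # assumed chunk size
        return {"mode": "chunked", "estimated_chunks": chunks}
    return {"mode": "single_shot", "estimated_chunks": 1}
```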

Attribute Key Canonicalization

Pipeline: exact match -> alias -> dotted variant -> suffix -> open catalog -> drop.

normalize_key

Normalize a raw attribute key: lowercase, strip, spaces/hyphens to dots. Underscores are preserved.

def normalize_key(raw: str) -> str
| Parameter | Type | Description |
| --- | --- | --- |
| raw | str | Raw attribute key string. |

Returns: Normalized key string.

from arandu.write import normalize_key

normalize_key("Personal Info")    # "personal.info"
normalize_key("food_preference")  # "food_preference"
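The documented rules are simple enough to restate as a self-contained sketch (not arandu's source, but equivalent to the behavior shown above):

```python
import re

# Self-contained sketch of the documented normalization rules: lowercase,
# strip, runs of spaces/hyphens collapse to a dot; underscores are preserved.
def normalize_key_sketch(raw: str) -> str:
    key = raw.strip().lower()
    return re.sub(r"[ \-]+", ".", key)
```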

validate_proposed_key

Validate that a proposed key meets naming rules.

def validate_proposed_key(
    key: str,
    extra_namespaces: set[str] | None = None,
) -> bool
| Parameter | Type | Description |
| --- | --- | --- |
| key | str | Normalized key to validate. |
| extra_namespaces | set[str] \| None | Optional deployer-provided namespaces to accept. |

Returns: True if key is well-formed and in an allowed namespace.

canonicalize_attribute_key

Canonicalize an attribute key via catalog, alias, and recovery strategies. This is an async function that queries the database for registry lookups.

async def canonicalize_attribute_key(
    session: AsyncSession,
    agent_id: str,
    raw_key: str,
    config: MemoryConfig,
) -> tuple[str | None, Literal["allow", "map", "propose", "drop"], dict[str, Any]]
| Parameter | Type | Description |
| --- | --- | --- |
| session | AsyncSession | Database session. |
| agent_id | str | Agent identifier. |
| raw_key | str | Raw attribute key from extraction. |
| config | MemoryConfig | Memory configuration. |

Returns: Tuple of (canonical_key, action, metadata) where action is one of "allow", "map", "propose", or "drop".
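The recovery chain can be illustrated with in-memory lookups. The dicts below are stand-ins for the database registry that the real async function queries, and the suffix and open-catalog steps are omitted for brevity:

```python
# In-memory sketch of the documented recovery chain:
# exact match -> alias -> dotted variant -> drop. The suffix and
# open-catalog steps are omitted; catalog/aliases stand in for the
# database registry queried by the real async function.
def canonicalize_sketch(raw_key, catalog, aliases):
    key = raw_key.strip().lower().replace(" ", ".").replace("-", ".")
    if key in catalog:
        return key, "allow"               # exact match
    if key in aliases:
        return aliases[key], "map"        # known alias
    dotted = key.replace("_", ".")
    if dotted in catalog:
        return dotted, "map"              # dotted variant
    return None, "drop"                   # no recovery succeeded
```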


Entity Helpers

Async CRUD operations for MemoryEntity records using PostgreSQL ON CONFLICT upsert.

create_or_update_entity

Create a MemoryEntity or update if it exists.

async def create_or_update_entity(
    session: AsyncSession,
    agent_id: str,
    canonical_key: str,
    display_name: str | None = None,
    entity_type: str = "other",
) -> MemoryEntity
| Parameter | Type | Description |
| --- | --- | --- |
| session | AsyncSession | Database session. |
| agent_id | str | Agent identifier. |
| canonical_key | str | Canonical entity key. |
| display_name | str \| None | Optional display name. |
| entity_type | str | Entity type (person, pet, place, etc.). Default "other". |

Returns: The created or updated MemoryEntity.
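The ON CONFLICT upsert pattern behind this helper can be shown with SQLite, which shares the syntax with PostgreSQL. The schema below is a simplified stand-in for the real MemoryEntity table, not arandu's actual DDL.

```python
import sqlite3

# Illustrates the ON CONFLICT upsert pattern used by create_or_update_entity,
# shown with SQLite (same syntax as PostgreSQL). The schema is a simplified
# stand-in for the real MemoryEntity table.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE memory_entity (
        agent_id TEXT, canonical_key TEXT, display_name TEXT,
        entity_type TEXT, PRIMARY KEY (agent_id, canonical_key))
""")

def upsert_entity(agent_id, canonical_key, display_name, entity_type="other"):
    conn.execute("""
        INSERT INTO memory_entity VALUES (?, ?, ?, ?)
        ON CONFLICT (agent_id, canonical_key)
        DO UPDATE SET display_name = excluded.display_name,
                      entity_type  = excluded.entity_type
    """, (agent_id, canonical_key, display_name, entity_type))

upsert_entity("a1", "person:ana", "Ana", "person")
upsert_entity("a1", "person:ana", "Ana Silva", "person")  # updates in place
```

The second call hits the (agent_id, canonical_key) conflict and updates the existing row instead of inserting a duplicate.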

get_entity_by_key

Get a single MemoryEntity by agent_id and canonical_key.

async def get_entity_by_key(
    session: AsyncSession,
    agent_id: str,
    canonical_key: str,
) -> MemoryEntity | None

Returns: MemoryEntity or None if not found.

get_entities_for_user

List all MemoryEntity records for a user.

async def get_entities_for_user(
    session: AsyncSession,
    agent_id: str,
    active_only: bool = True,
) -> list[MemoryEntity]
| Parameter | Type | Description |
| --- | --- | --- |
| session | AsyncSession | Database session. |
| agent_id | str | Agent identifier. |
| active_only | bool | If True, only return active entities. Default True. |

Returns: List of MemoryEntity records, ordered by last_seen_at descending.


Correction Detection

Detects when users correct memory facts by comparing old vs new values for the same attribute_key.

CorrectionResult

| Field | Type | Description |
| --- | --- | --- |
| corrections_detected | int | Number of corrections found. Default 0. |
| corrected_keys | list[str] | Attribute keys that were corrected. |
| facts_corrected_ids | list[str] | IDs of old facts that were corrected. |

is_user_correction

Check if a new fact corrects an old fact (same key, different value).

def is_user_correction(old_fact: object, new_fact: object) -> bool
| Parameter | Type | Description |
| --- | --- | --- |
| old_fact | object | The existing fact being superseded. |
| new_fact | object | The new fact replacing it. |

Returns: True if this is a user correction.
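The documented rule (same key, different value) can be restated as a two-line sketch, with plain dicts standing in for MemoryFact objects:

```python
# Sketch of the documented rule: a correction is the same attribute_key
# carrying a different value. Plain dicts stand in for MemoryFact objects.
def is_user_correction_sketch(old_fact: dict, new_fact: dict) -> bool:
    return (old_fact["attribute_key"] == new_fact["attribute_key"]
            and old_fact["value"] != new_fact["value"])
```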

detect_and_record_corrections

Detects supersede operations whose value changed and increments the correction count on the superseded facts.

async def detect_and_record_corrections(
    session: AsyncSession,
    agent_id: str,
    saved_facts: list[Any],
) -> CorrectionResult
| Parameter | Type | Description |
| --- | --- | --- |
| session | AsyncSession | Database session. |
| agent_id | str | Agent identifier. |
| saved_facts | list[Any] | List of newly saved MemoryFact objects. |

Returns: CorrectionResult with detection stats.


Pending Operations

In-memory store for pending destructive operations with a 5-minute TTL. State is per-process and lost on restart.

save_pending_selection

Save a pending selection when a search returned results awaiting user choice.

def save_pending_selection(
    agent_id: str,
    intent: str,
    transactions: list[Any],
    confirmation_text: str,
    edit_params: dict[str, Any] | None = None,
) -> None
| Parameter | Type | Description |
| --- | --- | --- |
| agent_id | str | Agent identifier. |
| intent | str | The user's intent (delete, edit, etc.). |
| transactions | list[Any] | List of candidate transactions. |
| confirmation_text | str | Text to show user for confirmation. |
| edit_params | dict \| None | Optional parameters for edit operations. |

save_pending_execution

Save a pending execution when a destructive operation was blocked.

def save_pending_execution(
    agent_id: str,
    tool_calls: list[Any],
    search_result: str,
    confirmation_text: str,
) -> None
| Parameter | Type | Description |
| --- | --- | --- |
| agent_id | str | Agent identifier. |
| tool_calls | list[Any] | Blocked tool calls. |
| search_result | str | Context from the search. |
| confirmation_text | str | Text to show user for confirmation. |

get_pending

Get pending operation if it exists and hasn't expired (5-minute TTL).

def get_pending(agent_id: str) -> dict[str, Any] | None

Returns: Pending operation dict, or None if expired/absent.

clear_pending

Remove pending operation after execution or cancellation.

def clear_pending(agent_id: str) -> None
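The TTL mechanics behind get_pending and clear_pending can be sketched as a per-process dict keyed by agent_id. This is an illustration of the expiry behavior, not arandu's store; the injectable clock exists only to make the sketch deterministic.

```python
import time

# Minimal sketch of the per-process pending store with 5-minute TTL expiry.
# State lives in a plain dict, so it is lost on process restart, matching
# the behavior documented above. The `now` parameter is an illustrative
# injectable clock, not part of arandu's API.
_PENDING: dict = {}
TTL_SECONDS = 300  # 5 minutes

def save_pending_sketch(agent_id, payload, now=None):
    _PENDING[agent_id] = ((now if now is not None else time.time()), payload)

def get_pending_sketch(agent_id, now=None):
    entry = _PENDING.get(agent_id)
    if entry is None:
        return None
    saved_at, payload = entry
    if (now if now is not None else time.time()) - saved_at > TTL_SECONDS:
        del _PENDING[agent_id]   # expired: behave as if absent
        return None
    return payload

def clear_pending_sketch(agent_id):
    _PENDING.pop(agent_id, None)
```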