Write Pipeline API¶
**Advanced API**

These are advanced APIs for power users who want to interact with individual pipeline stages directly. Most users should use `MemoryClient.write()` instead, which orchestrates the full pipeline automatically.
All write pipeline functions are exported from `arandu.write`.

```python
from arandu.write import (
    classify_input, select_strategy, run_write_pipeline,
    canonicalize_attribute_key, normalize_key, validate_proposed_key,
    create_or_update_entity, get_entities_for_user, get_entity_by_key,
    detect_and_record_corrections, is_user_correction,
    get_pending, clear_pending, save_pending_execution, save_pending_selection,
)
```
Pipeline Orchestrator¶
run_write_pipeline¶
Executes the full write pipeline. By default (when `enable_informed_extraction=True`), the pipeline runs: alias lookup -> pre-retrieval / profile load -> informed extraction -> resolve -> upsert. If informed extraction fails, it falls back to: extract -> resolve -> reconcile -> upsert. When `enable_informed_extraction=False`, it always uses the legacy path.
```python
async def run_write_pipeline(
    session: AsyncSession,
    agent_id: str,
    message: str,
    llm: LLMProvider,
    embeddings: EmbeddingProvider,
    config: MemoryConfig,
    speaker_name: str,
    source: str = "api",
    recent_messages: list[str] | None = None,
    trace: PipelineTrace | None = None,
) -> dict
```
| Parameter | Type | Description |
|---|---|---|
| `session` | `AsyncSession` | Database session (caller manages transaction/commit). |
| `agent_id` | `str` | Unique identifier for the agent. |
| `message` | `str` | The user's message text. |
| `llm` | `LLMProvider` | Injected LLM provider. |
| `embeddings` | `EmbeddingProvider` | Injected embedding provider. |
| `config` | `MemoryConfig` | Memory configuration. |
| `speaker_name` | `str` | Name of the person speaking the message. Pronouns like "I", "me", "eu" resolve to this speaker entity (`person:{speaker_slug}`). |
| `source` | `str` | Source channel identifier (default `"api"`). |
| `recent_messages` | `list[str] \| None` | Optional conversation context (last N messages) for resolving pronouns and anaphora. |
| `trace` | `PipelineTrace \| None` | Optional pipeline trace for verbose mode. When provided, each stage records intermediate data. |

Returns: `dict` with keys `event_id`, `facts_added`, `facts_updated`, `facts_unchanged`, `facts_deleted`, `entities_resolved`, `duration_ms`.
The pipeline creates an immutable `MemoryEvent` first (it survives even if later stages fail), then runs informed extraction (or fallback blind extraction), entity resolution, reconciliation, and upsert inside a savepoint for atomicity. Reconciliation always runs regardless of extraction mode: it compares new facts against existing ones and decides ADD/UPDATE/NOOP/DELETE. Entity profiles are persisted in the same transaction when returned by informed extraction.
Extraction Strategy¶
Pure functions (no LLM, no DB) that classify input text and choose an extraction mode based on heuristics.
InputType¶
```python
class InputType(str, Enum):
    SHORT = "short"            # < 500 chars
    MEDIUM = "medium"          # 500-2000 chars, unstructured
    LONG = "long"              # > 2000 chars, unstructured
    STRUCTURED = "structured"  # > 500 chars with headers/bullets/tables
```
ExtractionMode¶
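The definition is not shown here, but its members are referenced elsewhere on this page (`ExtractionMode.SINGLE_SHOT` in the `select_strategy` example, and the `single_shot`/`chunked` values in `ExtractionStrategy.mode`). A likely sketch, inferred from those references rather than the actual source:

```python
from enum import Enum

# Sketch of ExtractionMode inferred from the ExtractionStrategy fields and
# the select_strategy() example on this page; the real definition may differ.
class ExtractionMode(str, Enum):
    SINGLE_SHOT = "single_shot"  # one LLM call for the whole input
    CHUNKED = "chunked"          # split long/structured input into chunks
```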
InputClassification¶
Result of input text analysis.
| Field | Type | Description |
|---|---|---|
| `input_type` | `InputType` | Classified input type. |
| `char_count` | `int` | Number of characters. |
| `estimated_tokens` | `int` | Estimated token count (`chars // 4`). |
| `has_headers` | `bool` | Whether headers were detected. |
| `has_bullets` | `bool` | Whether bullet points were detected. |
| `has_tables` | `bool` | Whether tables were detected. |
| `section_count` | `int` | Number of text sections. |
| `line_count` | `int` | Number of lines. |
ExtractionStrategy¶
Selected extraction strategy.
| Field | Type | Description |
|---|---|---|
| `mode` | `ExtractionMode` | Extraction mode (`single_shot` or `chunked`). |
| `reason` | `str` | Human-readable reason for the selection. |
| `max_tokens_per_call` | `int` | Max tokens per LLM call. |
| `estimated_chunks` | `int` | Number of expected chunks (1 for single-shot). |
| `chunk_context_hint` | `str \| None` | Hint about the document type for chunked mode. |
classify_input¶
Classify input text using heuristics (no LLM call).
| Parameter | Type | Description |
|---|---|---|
| `text` | `str` | Input text to classify. |

Returns: `InputClassification` with detected features.
```python
from arandu.write import classify_input, select_strategy

classification = classify_input("My wife's name is Ana and we live in Sao Paulo.")
print(classification.input_type)  # InputType.SHORT
print(classification.char_count)  # 47
```
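The thresholds in `InputType` suggest the classifier roughly follows this shape. A simplified sketch only: the structure detectors below (header, bullet, and table patterns) are assumptions, not the library's actual heuristics.

```python
import re

# Simplified sketch of the classification heuristic implied by the InputType
# thresholds above. The detector regexes are illustrative assumptions.
def classify_sketch(text: str) -> str:
    has_structure = bool(
        re.search(r"^#{1,6} ", text, re.M)      # markdown-style headers
        or re.search(r"^\s*[-*] ", text, re.M)  # bullet points
        or re.search(r"\|.+\|", text)           # table rows
    )
    n = len(text)
    if n > 500 and has_structure:
        return "structured"
    if n < 500:
        return "short"
    return "medium" if n <= 2000 else "long"
```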
select_strategy¶
Select extraction strategy from a classification result.
| Parameter | Type | Description |
|---|---|---|
| `classification` | `InputClassification` | Result of `classify_input()`. |

Returns: `ExtractionStrategy` with mode and parameters.
```python
strategy = select_strategy(classification)
print(strategy.mode)              # ExtractionMode.SINGLE_SHOT
print(strategy.estimated_chunks)  # 1
```
Attribute Key Canonicalization¶
Pipeline: exact match -> alias -> dotted variant -> suffix -> open catalog -> drop.
normalize_key¶
Normalize a raw attribute key: lowercase, strip, spaces/hyphens to dots. Underscores are preserved.
| Parameter | Type | Description |
|---|---|---|
| `raw` | `str` | Raw attribute key string. |

Returns: Normalized key string.
```python
from arandu.write import normalize_key

normalize_key("Personal Info")    # "personal.info"
normalize_key("food_preference")  # "food_preference"
```
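The rule above ("lowercase, strip, spaces/hyphens to dots, underscores preserved") can be sketched as:

```python
# Sketch of the normalization rule described above; not the library's code.
def normalize_key_sketch(raw: str) -> str:
    key = raw.strip().lower()
    key = key.replace(" ", ".").replace("-", ".")  # spaces/hyphens -> dots
    return key  # underscores are intentionally preserved
```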
validate_proposed_key¶
Validate that a proposed key meets naming rules.
| Parameter | Type | Description |
|---|---|---|
| `key` | `str` | Normalized key to validate. |
| `extra_namespaces` | `set[str] \| None` | Optional deployer-provided namespaces to accept. |

Returns: `True` if the key is well-formed and in an allowed namespace.
canonicalize_attribute_key¶
Canonicalize an attribute key via catalog, alias, and recovery strategies. This is an async function that queries the database for registry lookups.
```python
async def canonicalize_attribute_key(
    session: AsyncSession,
    agent_id: str,
    raw_key: str,
    config: MemoryConfig,
) -> tuple[str | None, Literal["allow", "map", "propose", "drop"], dict[str, Any]]
```
| Parameter | Type | Description |
|---|---|---|
| `session` | `AsyncSession` | Database session. |
| `agent_id` | `str` | Agent identifier. |
| `raw_key` | `str` | Raw attribute key from extraction. |
| `config` | `MemoryConfig` | Memory configuration. |

Returns: Tuple of `(canonical_key, action, metadata)` where `action` is one of `"allow"`, `"map"`, `"propose"`, or `"drop"`.
Entity Helpers¶
Async CRUD operations for MemoryEntity records using PostgreSQL ON CONFLICT upsert.
create_or_update_entity¶
Create a MemoryEntity or update if it exists.
```python
async def create_or_update_entity(
    session: AsyncSession,
    agent_id: str,
    canonical_key: str,
    display_name: str | None = None,
    entity_type: str = "other",
) -> MemoryEntity
```
| Parameter | Type | Description |
|---|---|---|
| `session` | `AsyncSession` | Database session. |
| `agent_id` | `str` | Agent identifier. |
| `canonical_key` | `str` | Canonical entity key. |
| `display_name` | `str \| None` | Optional display name. |
| `entity_type` | `str` | Entity type (person, pet, place, etc.). Default `"other"`. |

Returns: The created or updated `MemoryEntity`.
get_entity_by_key¶
Get a single MemoryEntity by agent_id and canonical_key.
```python
async def get_entity_by_key(
    session: AsyncSession,
    agent_id: str,
    canonical_key: str,
) -> MemoryEntity | None
```

Returns: `MemoryEntity`, or `None` if not found.
get_entities_for_user¶
List all MemoryEntity records for a user.
```python
async def get_entities_for_user(
    session: AsyncSession,
    agent_id: str,
    active_only: bool = True,
) -> list[MemoryEntity]
```
| Parameter | Type | Description |
|---|---|---|
| `session` | `AsyncSession` | Database session. |
| `agent_id` | `str` | Agent identifier. |
| `active_only` | `bool` | If `True`, only return active entities. Default `True`. |

Returns: List of `MemoryEntity` records, ordered by `last_seen_at` descending.
Correction Detection¶
Detects when users correct memory facts by comparing old vs new values for the same attribute_key.
CorrectionResult¶
| Field | Type | Description |
|---|---|---|
| `corrections_detected` | `int` | Number of corrections found. Default 0. |
| `corrected_keys` | `list[str]` | Attribute keys that were corrected. |
| `facts_corrected_ids` | `list[str]` | IDs of old facts that were corrected. |
is_user_correction¶
Check if a new fact corrects an old fact (same key, different value).
| Parameter | Type | Description |
|---|---|---|
| `old_fact` | `object` | The existing fact being superseded. |
| `new_fact` | `object` | The new fact replacing it. |

Returns: `True` if this is a user correction.
detect_and_record_corrections¶
Detect supersede operations that changed a value, and increment the correction count on the superseded facts.
```python
async def detect_and_record_corrections(
    session: AsyncSession,
    agent_id: str,
    saved_facts: list[Any],
) -> CorrectionResult
```
| Parameter | Type | Description |
|---|---|---|
| `session` | `AsyncSession` | Database session. |
| `agent_id` | `str` | Agent identifier. |
| `saved_facts` | `list[Any]` | List of newly saved `MemoryFact` objects. |

Returns: `CorrectionResult` with detection stats.
Pending Operations¶
In-memory store for pending destructive operations with a 5-minute TTL. State is per-process and lost on restart.
save_pending_selection¶
Save a pending selection when a search returned results awaiting user choice.
```python
def save_pending_selection(
    agent_id: str,
    intent: str,
    transactions: list[Any],
    confirmation_text: str,
    edit_params: dict[str, Any] | None = None,
) -> None
```
| Parameter | Type | Description |
|---|---|---|
| `agent_id` | `str` | Agent identifier. |
| `intent` | `str` | The user's intent (delete, edit, etc.). |
| `transactions` | `list[Any]` | List of candidate transactions. |
| `confirmation_text` | `str` | Text to show the user for confirmation. |
| `edit_params` | `dict \| None` | Optional parameters for edit operations. |
save_pending_execution¶
Save a pending execution when a destructive operation was blocked.
```python
def save_pending_execution(
    agent_id: str,
    tool_calls: list[Any],
    search_result: str,
    confirmation_text: str,
) -> None
```
| Parameter | Type | Description |
|---|---|---|
| `agent_id` | `str` | Agent identifier. |
| `tool_calls` | `list[Any]` | Blocked tool calls. |
| `search_result` | `str` | Context from the search. |
| `confirmation_text` | `str` | Text to show the user for confirmation. |
get_pending¶
Get pending operation if it exists and hasn't expired (5-minute TTL).
Returns: Pending operation dict, or None if expired/absent.
clear_pending¶
Remove pending operation after execution or cancellation.