Pular para conteúdo

API do Write Pipeline

API Avançada

Estas são APIs avançadas para usuários que desejam interagir diretamente com estágios individuais do pipeline. A maioria dos usuários deve usar MemoryClient.write(), que orquestra o pipeline completo automaticamente.

Todas as funções do write pipeline são exportadas de arandu.write.

from arandu.write import (
    classify_input, select_strategy, run_write_pipeline,
    canonicalize_attribute_key, normalize_key, validate_proposed_key,
    create_or_update_entity, get_entities_for_user, get_entity_by_key,
    detect_and_record_corrections, is_user_correction,
    get_pending, clear_pending, save_pending_execution, save_pending_selection,
)

Orquestrador do Pipeline

run_write_pipeline

Executa o pipeline completo de escrita. No fluxo padrão (extração informada): alias lookup -> pré-retrieval -> extração informada -> resolve -> upsert. No fallback (extração cega): extract -> resolve -> reconcile -> upsert.

async def run_write_pipeline(
    session: AsyncSession,
    agent_id: str,
    message: str,
    llm: LLMProvider,
    embeddings: EmbeddingProvider,
    config: MemoryConfig,
    speaker_name: str,
    source: str = "api",
    recent_messages: list[str] | None = None,
    trace: PipelineTrace | None = None,
) -> dict
Parametro Tipo Descricao
session AsyncSession Sessao do banco (o caller gerencia transacao/commit).
agent_id str Identificador unico do agente.
message str Texto da mensagem do usuario.
llm LLMProvider Provider de LLM injetado.
embeddings EmbeddingProvider Provider de embeddings injetado.
config MemoryConfig Configuracao da memoria.
speaker_name str Nome de quem esta falando a mensagem. Pronomes como "I", "me", "eu" sao resolvidos para a entidade do speaker (person:{speaker_slug}).
source str Identificador do canal de origem (padrao "api").
recent_messages list[str] | None Contexto de conversacao opcional (ultimas N mensagens) para resolver pronomes e anaforas.

Retorna: dict com chaves event_id, facts_added, facts_updated, facts_unchanged, facts_deleted, entities_resolved, duration_ms.

O pipeline cria um MemoryEvent imutavel primeiro (sobrevive mesmo se estagios posteriores falharem), depois executa o fluxo de extracao (informada por padrao, com fallback para extracao cega + reconciliacao), resolucao de entidades e upsert dentro de um savepoint para atomicidade. Quando a extracao informada esta habilitada (enable_informed_extraction=True, padrao), o pipeline faz alias lookup e pre-retrieval antes da chamada LLM, e o estagio de reconciliacao e eliminado pois o LLM decide NEW/UPDATE na fonte.


Estrategia de Extracao

Funcoes puras (sem LLM, sem DB) que classificam o texto de entrada e escolhem um modo de extracao baseado em heuristicas.

InputType

class InputType(str, Enum):
    SHORT = "short"        # < 500 caracteres
    MEDIUM = "medium"      # 500-2000 caracteres, nao estruturado
    LONG = "long"          # > 2000 caracteres, nao estruturado
    STRUCTURED = "structured"  # > 500 caracteres com headers/bullets/tabelas

ExtractionMode

class ExtractionMode(str, Enum):
    SINGLE_SHOT = "single_shot"
    CHUNKED = "chunked"

InputClassification

Resultado da analise do texto de entrada.

Campo Tipo Descricao
input_type InputType Tipo de entrada classificado.
char_count int Numero de caracteres.
estimated_tokens int Contagem estimada de tokens (chars // 4).
has_headers bool Se headers foram detectados.
has_bullets bool Se bullet points foram detectados.
has_tables bool Se tabelas foram detectadas.
section_count int Numero de secoes de texto.
line_count int Numero de linhas.

ExtractionStrategy

Estrategia de extracao selecionada.

Campo Tipo Descricao
mode ExtractionMode Modo de extracao (single_shot ou chunked).
reason str Motivo legivel da selecao.
max_tokens_per_call int Maximo de tokens por chamada LLM.
estimated_chunks int Numero esperado de chunks (1 para single-shot).
chunk_context_hint str | None Dica sobre tipo de documento para modo chunked.

classify_input

Classifica o texto de entrada usando heuristicas (sem chamada LLM).

def classify_input(text: str) -> InputClassification
Parametro Tipo Descricao
text str Texto de entrada para classificar.

Retorna: InputClassification com features detectadas.

from arandu.write import classify_input, select_strategy

classification = classify_input("Minha esposa se chama Ana e moramos em Sao Paulo.")
print(classification.input_type)  # InputType.SHORT
print(classification.char_count)  # 50

select_strategy

Seleciona a estrategia de extracao a partir de um resultado de classificacao.

def select_strategy(classification: InputClassification) -> ExtractionStrategy
Parametro Tipo Descricao
classification InputClassification Resultado de classify_input().

Retorna: ExtractionStrategy com modo e parametros.

strategy = select_strategy(classification)
print(strategy.mode)             # ExtractionMode.SINGLE_SHOT
print(strategy.estimated_chunks) # 1

Canonicalizacao de Chaves de Atributo

Pipeline: match exato -> alias -> variante pontuada -> sufixo -> catalogo aberto -> drop.

normalize_key

Normaliza uma chave de atributo bruta: lowercase, strip, espacos/hifens para pontos. Underscores sao preservados.

def normalize_key(raw: str) -> str
Parametro Tipo Descricao
raw str String bruta da chave de atributo.

Retorna: String da chave normalizada.

from arandu.write import normalize_key

normalize_key("Personal Info")    # "personal.info"
normalize_key("food_preference")  # "food_preference"

validate_proposed_key

Valida se uma chave proposta atende as regras de nomenclatura.

def validate_proposed_key(
    key: str,
    extra_namespaces: set[str] | None = None,
) -> bool
Parametro Tipo Descricao
key str Chave normalizada para validar.
extra_namespaces set[str] | None Namespaces adicionais fornecidos pelo deployer.

Retorna: True se a chave e bem formada e esta em um namespace permitido.

canonicalize_attribute_key

Canonicaliza uma chave de atributo via catalogo, alias e estrategias de recuperacao. Funcao async que consulta o banco de dados.

async def canonicalize_attribute_key(
    session: AsyncSession,
    agent_id: str,
    raw_key: str,
    config: MemoryConfig,
) -> tuple[str | None, Literal["allow", "map", "propose", "drop"], dict[str, Any]]
Parametro Tipo Descricao
session AsyncSession Sessao do banco de dados.
agent_id str Identificador do agente.
raw_key str Chave bruta do atributo vinda da extracao.
config MemoryConfig Configuracao da memoria.

Retorna: Tupla de (canonical_key, action, metadata) onde action e um de "allow", "map", "propose" ou "drop".


Helpers de Entidades

Operacoes CRUD async para registros MemoryEntity usando upsert ON CONFLICT do PostgreSQL.

create_or_update_entity

Cria um MemoryEntity ou atualiza se ja existir.

async def create_or_update_entity(
    session: AsyncSession,
    agent_id: str,
    canonical_key: str,
    display_name: str | None = None,
    entity_type: str = "other",
) -> MemoryEntity
Parametro Tipo Descricao
session AsyncSession Sessao do banco de dados.
agent_id str Identificador do agente.
canonical_key str Chave canonica da entidade.
display_name str | None Nome de exibicao opcional.
entity_type str Tipo da entidade (person, pet, place, etc.). Padrao "other".

Retorna: O MemoryEntity criado ou atualizado.

get_entity_by_key

Obtem um unico MemoryEntity por agent_id e canonical_key.

async def get_entity_by_key(
    session: AsyncSession,
    agent_id: str,
    canonical_key: str,
) -> MemoryEntity | None

Retorna: MemoryEntity ou None se nao encontrado.

get_entities_for_user

Lista todos os registros MemoryEntity de um usuario.

async def get_entities_for_user(
    session: AsyncSession,
    agent_id: str,
    active_only: bool = True,
) -> list[MemoryEntity]
Parametro Tipo Descricao
session AsyncSession Sessao do banco de dados.
agent_id str Identificador do agente.
active_only bool Se True, retorna apenas entidades ativas. Padrao True.

Retorna: Lista de registros MemoryEntity, ordenados por last_seen_at decrescente.


Deteccao de Correcoes

Detecta quando usuarios corrigem fatos da memoria comparando valores antigos vs novos para o mesmo attribute_key.

CorrectionResult

Campo Tipo Descricao
corrections_detected int Numero de correcoes encontradas. Padrao 0.
corrected_keys list[str] Chaves de atributo que foram corrigidas.
facts_corrected_ids list[str] IDs dos fatos antigos que foram corrigidos.

is_user_correction

Verifica se um novo fato corrige um fato antigo (mesma chave, valor diferente).

def is_user_correction(old_fact: object, new_fact: object) -> bool
Parametro Tipo Descricao
old_fact object O fato existente sendo substituido.
new_fact object O novo fato que o substitui.

Retorna: True se for uma correcao do usuario.

detect_and_record_corrections

Detecta supersedes com mudancas de valor e incrementa o contador de correcoes nos fatos antigos.

async def detect_and_record_corrections(
    session: AsyncSession,
    agent_id: str,
    saved_facts: list[Any],
) -> CorrectionResult
Parametro Tipo Descricao
session AsyncSession Sessao do banco de dados.
agent_id str Identificador do agente.
saved_facts list[Any] Lista de objetos MemoryFact recem-salvos.

Retorna: CorrectionResult com estatisticas de deteccao.


Operacoes Pendentes

Armazenamento em memoria para operacoes destrutivas pendentes com TTL de 5 minutos. O estado e por processo e perdido ao reiniciar.

save_pending_selection

Salva uma selecao pendente quando uma busca retornou resultados aguardando escolha do usuario.

def save_pending_selection(
    agent_id: str,
    intent: str,
    transactions: list[Any],
    confirmation_text: str,
    edit_params: dict[str, Any] | None = None,
) -> None
Parametro Tipo Descricao
agent_id str Identificador do agente.
intent str A intencao do usuario (delete, edit, etc.).
transactions list[Any] Lista de transacoes candidatas.
confirmation_text str Texto para mostrar ao usuario para confirmacao.
edit_params dict | None Parametros opcionais para operacoes de edicao.

save_pending_execution

Salva uma execucao pendente quando uma operacao destrutiva foi bloqueada.

def save_pending_execution(
    agent_id: str,
    tool_calls: list[Any],
    search_result: str,
    confirmation_text: str,
) -> None
Parametro Tipo Descricao
agent_id str Identificador do agente.
tool_calls list[Any] Tool calls bloqueadas.
search_result str Contexto da busca.
confirmation_text str Texto para mostrar ao usuario para confirmacao.

get_pending

Obtem operacao pendente se existir e nao tiver expirado (TTL de 5 minutos).

def get_pending(agent_id: str) -> dict[str, Any] | None

Retorna: Dict da operacao pendente, ou None se expirado/ausente.

clear_pending

Remove operacao pendente apos execucao ou cancelamento.

def clear_pending(agent_id: str) -> None