API do Write Pipeline¶
API Avançada
Estas são APIs avançadas para usuários que desejam interagir diretamente com estágios individuais do pipeline. A maioria dos usuários deve usar MemoryClient.write(), que orquestra o pipeline completo automaticamente.
Todas as funções do write pipeline são exportadas de arandu.write.
from arandu.write import (
classify_input, select_strategy, run_write_pipeline,
canonicalize_attribute_key, normalize_key, validate_proposed_key,
create_or_update_entity, get_entities_for_user, get_entity_by_key,
detect_and_record_corrections, is_user_correction,
get_pending, clear_pending, save_pending_execution, save_pending_selection,
)
Orquestrador do Pipeline¶
run_write_pipeline¶
Executa o pipeline completo de escrita. No fluxo padrão (extração informada): alias lookup -> pré-retrieval -> extração informada -> resolve -> upsert. No fallback (extração cega): extract -> resolve -> reconcile -> upsert.
async def run_write_pipeline(
session: AsyncSession,
agent_id: str,
message: str,
llm: LLMProvider,
embeddings: EmbeddingProvider,
config: MemoryConfig,
speaker_name: str,
source: str = "api",
recent_messages: list[str] | None = None,
trace: PipelineTrace | None = None,
) -> dict
| Parametro | Tipo | Descricao |
|---|---|---|
session |
AsyncSession |
Sessao do banco (o caller gerencia transacao/commit). |
agent_id |
str |
Identificador unico do agente. |
message |
str |
Texto da mensagem do usuario. |
llm |
LLMProvider |
Provider de LLM injetado. |
embeddings |
EmbeddingProvider |
Provider de embeddings injetado. |
config |
MemoryConfig |
Configuracao da memoria. |
speaker_name |
str |
Nome de quem esta falando a mensagem. Pronomes como "I", "me", "eu" sao resolvidos para a entidade do speaker (person:{speaker_slug}). |
source |
str |
Identificador do canal de origem (padrao "api"). |
recent_messages |
list[str] | None |
Contexto de conversacao opcional (ultimas N mensagens) para resolver pronomes e anaforas. |
Retorna: dict com chaves event_id, facts_added, facts_updated, facts_unchanged, facts_deleted, entities_resolved, duration_ms.
O pipeline cria um MemoryEvent imutavel primeiro (sobrevive mesmo se estagios posteriores falharem), depois executa o fluxo de extracao (informada por padrao, com fallback para extracao cega + reconciliacao), resolucao de entidades e upsert dentro de um savepoint para atomicidade. Quando a extracao informada esta habilitada (enable_informed_extraction=True, padrao), o pipeline faz alias lookup e pre-retrieval antes da chamada LLM, e o estagio de reconciliacao e eliminado pois o LLM decide NEW/UPDATE na fonte.
Estrategia de Extracao¶
Funcoes puras (sem LLM, sem DB) que classificam o texto de entrada e escolhem um modo de extracao baseado em heuristicas.
InputType¶
class InputType(str, Enum):
SHORT = "short" # < 500 caracteres
MEDIUM = "medium" # 500-2000 caracteres, nao estruturado
LONG = "long" # > 2000 caracteres, nao estruturado
STRUCTURED = "structured" # > 500 caracteres com headers/bullets/tabelas
ExtractionMode¶
InputClassification¶
Resultado da analise do texto de entrada.
| Campo | Tipo | Descricao |
|---|---|---|
input_type |
InputType |
Tipo de entrada classificado. |
char_count |
int |
Numero de caracteres. |
estimated_tokens |
int |
Contagem estimada de tokens (chars // 4). |
has_headers |
bool |
Se headers foram detectados. |
has_bullets |
bool |
Se bullet points foram detectados. |
has_tables |
bool |
Se tabelas foram detectadas. |
section_count |
int |
Numero de secoes de texto. |
line_count |
int |
Numero de linhas. |
ExtractionStrategy¶
Estrategia de extracao selecionada.
| Campo | Tipo | Descricao |
|---|---|---|
mode |
ExtractionMode |
Modo de extracao (single_shot ou chunked). |
reason |
str |
Motivo legivel da selecao. |
max_tokens_per_call |
int |
Maximo de tokens por chamada LLM. |
estimated_chunks |
int |
Numero esperado de chunks (1 para single-shot). |
chunk_context_hint |
str | None |
Dica sobre tipo de documento para modo chunked. |
classify_input¶
Classifica o texto de entrada usando heuristicas (sem chamada LLM).
| Parametro | Tipo | Descricao |
|---|---|---|
text |
str |
Texto de entrada para classificar. |
Retorna: InputClassification com features detectadas.
from arandu.write import classify_input, select_strategy
classification = classify_input("Minha esposa se chama Ana e moramos em Sao Paulo.")
print(classification.input_type) # InputType.SHORT
print(classification.char_count) # 50
select_strategy¶
Seleciona a estrategia de extracao a partir de um resultado de classificacao.
| Parametro | Tipo | Descricao |
|---|---|---|
classification |
InputClassification |
Resultado de classify_input(). |
Retorna: ExtractionStrategy com modo e parametros.
strategy = select_strategy(classification)
print(strategy.mode) # ExtractionMode.SINGLE_SHOT
print(strategy.estimated_chunks) # 1
Canonicalizacao de Chaves de Atributo¶
Pipeline: match exato -> alias -> variante pontuada -> sufixo -> catalogo aberto -> drop.
normalize_key¶
Normaliza uma chave de atributo bruta: lowercase, strip, espacos/hifens para pontos. Underscores sao preservados.
| Parametro | Tipo | Descricao |
|---|---|---|
raw |
str |
String bruta da chave de atributo. |
Retorna: String da chave normalizada.
from arandu.write import normalize_key
normalize_key("Personal Info") # "personal.info"
normalize_key("food_preference") # "food_preference"
validate_proposed_key¶
Valida se uma chave proposta atende as regras de nomenclatura.
| Parametro | Tipo | Descricao |
|---|---|---|
key |
str |
Chave normalizada para validar. |
extra_namespaces |
set[str] | None |
Namespaces adicionais fornecidos pelo deployer. |
Retorna: True se a chave e bem formada e esta em um namespace permitido.
canonicalize_attribute_key¶
Canonicaliza uma chave de atributo via catalogo, alias e estrategias de recuperacao. Funcao async que consulta o banco de dados.
async def canonicalize_attribute_key(
session: AsyncSession,
agent_id: str,
raw_key: str,
config: MemoryConfig,
) -> tuple[str | None, Literal["allow", "map", "propose", "drop"], dict[str, Any]]
| Parametro | Tipo | Descricao |
|---|---|---|
session |
AsyncSession |
Sessao do banco de dados. |
agent_id |
str |
Identificador do agente. |
raw_key |
str |
Chave bruta do atributo vinda da extracao. |
config |
MemoryConfig |
Configuracao da memoria. |
Retorna: Tupla de (canonical_key, action, metadata) onde action e um de "allow", "map", "propose" ou "drop".
Helpers de Entidades¶
Operacoes CRUD async para registros MemoryEntity usando upsert ON CONFLICT do PostgreSQL.
create_or_update_entity¶
Cria um MemoryEntity ou atualiza se ja existir.
async def create_or_update_entity(
session: AsyncSession,
agent_id: str,
canonical_key: str,
display_name: str | None = None,
entity_type: str = "other",
) -> MemoryEntity
| Parametro | Tipo | Descricao |
|---|---|---|
session |
AsyncSession |
Sessao do banco de dados. |
agent_id |
str |
Identificador do agente. |
canonical_key |
str |
Chave canonica da entidade. |
display_name |
str | None |
Nome de exibicao opcional. |
entity_type |
str |
Tipo da entidade (person, pet, place, etc.). Padrao "other". |
Retorna: O MemoryEntity criado ou atualizado.
get_entity_by_key¶
Obtem um unico MemoryEntity por agent_id e canonical_key.
async def get_entity_by_key(
session: AsyncSession,
agent_id: str,
canonical_key: str,
) -> MemoryEntity | None
Retorna: MemoryEntity ou None se nao encontrado.
get_entities_for_user¶
Lista todos os registros MemoryEntity de um usuario.
async def get_entities_for_user(
session: AsyncSession,
agent_id: str,
active_only: bool = True,
) -> list[MemoryEntity]
| Parametro | Tipo | Descricao |
|---|---|---|
session |
AsyncSession |
Sessao do banco de dados. |
agent_id |
str |
Identificador do agente. |
active_only |
bool |
Se True, retorna apenas entidades ativas. Padrao True. |
Retorna: Lista de registros MemoryEntity, ordenados por last_seen_at decrescente.
Deteccao de Correcoes¶
Detecta quando usuarios corrigem fatos da memoria comparando valores antigos vs novos para o mesmo attribute_key.
CorrectionResult¶
| Campo | Tipo | Descricao |
|---|---|---|
corrections_detected |
int |
Numero de correcoes encontradas. Padrao 0. |
corrected_keys |
list[str] |
Chaves de atributo que foram corrigidas. |
facts_corrected_ids |
list[str] |
IDs dos fatos antigos que foram corrigidos. |
is_user_correction¶
Verifica se um novo fato corrige um fato antigo (mesma chave, valor diferente).
| Parametro | Tipo | Descricao |
|---|---|---|
old_fact |
object |
O fato existente sendo substituido. |
new_fact |
object |
O novo fato que o substitui. |
Retorna: True se for uma correcao do usuario.
detect_and_record_corrections¶
Detecta supersedes com mudancas de valor e incrementa o contador de correcoes nos fatos antigos.
async def detect_and_record_corrections(
session: AsyncSession,
agent_id: str,
saved_facts: list[Any],
) -> CorrectionResult
| Parametro | Tipo | Descricao |
|---|---|---|
session |
AsyncSession |
Sessao do banco de dados. |
agent_id |
str |
Identificador do agente. |
saved_facts |
list[Any] |
Lista de objetos MemoryFact recem-salvos. |
Retorna: CorrectionResult com estatisticas de deteccao.
Operacoes Pendentes¶
Armazenamento em memoria para operacoes destrutivas pendentes com TTL de 5 minutos. O estado e por processo e perdido ao reiniciar.
save_pending_selection¶
Salva uma selecao pendente quando uma busca retornou resultados aguardando escolha do usuario.
def save_pending_selection(
agent_id: str,
intent: str,
transactions: list[Any],
confirmation_text: str,
edit_params: dict[str, Any] | None = None,
) -> None
| Parametro | Tipo | Descricao |
|---|---|---|
agent_id |
str |
Identificador do agente. |
intent |
str |
A intencao do usuario (delete, edit, etc.). |
transactions |
list[Any] |
Lista de transacoes candidatas. |
confirmation_text |
str |
Texto para mostrar ao usuario para confirmacao. |
edit_params |
dict | None |
Parametros opcionais para operacoes de edicao. |
save_pending_execution¶
Salva uma execucao pendente quando uma operacao destrutiva foi bloqueada.
def save_pending_execution(
agent_id: str,
tool_calls: list[Any],
search_result: str,
confirmation_text: str,
) -> None
| Parametro | Tipo | Descricao |
|---|---|---|
agent_id |
str |
Identificador do agente. |
tool_calls |
list[Any] |
Tool calls bloqueadas. |
search_result |
str |
Contexto da busca. |
confirmation_text |
str |
Texto para mostrar ao usuario para confirmacao. |
get_pending¶
Obtem operacao pendente se existir e nao tiver expirado (TTL de 5 minutos).
Retorna: Dict da operacao pendente, ou None se expirado/ausente.
clear_pending¶
Remove operacao pendente apos execucao ou cancelamento.