Pular para conteúdo

API do Read Pipeline

API Avancada

Estas sao APIs avancadas para usuarios que desejam interagir diretamente com estagios individuais de retrieval. A maioria dos usuarios deve usar MemoryClient.retrieve(), que orquestra o pipeline multi-signal completo automaticamente.

Todas as funcoes do read pipeline sao exportadas de arandu.read.

from arandu.read import (
    run_read_pipeline,
    plan_retrieval, expand_query,
    retrieve_relevant_events, compute_pattern_signal,
    retrieve_graph_facts, spread_activation,
    compress_context, compress_broad_context,
    materialize_emotional_trends, get_emotional_summary_for_context,
    compute_dynamic_importance,
    generate_optimized_directives, check_directive_contradiction,
    effective_confidence, invalidate_directive_cache,
)

Orquestrador do Pipeline

run_read_pipeline

Executa o pipeline completo de leitura: agent -> retrieve (multi-signal) -> rerank -> format.

O retrieval multi-signal executa semantic + keyword + graph em paralelo via asyncio.gather(). O retrieval agent planeja a estrategia de retrieval de forma deterministica (sem LLM).

async def run_read_pipeline(
    session: AsyncSession,
    agent_id: str,
    query: str,
    llm: LLMProvider,
    embeddings: EmbeddingProvider,
    config: MemoryConfig,
    trace: PipelineTrace | None = None,
) -> ReadResult
Parametro Tipo Descricao
session AsyncSession Sessao do banco (o caller gerencia transacao).
agent_id str Identificador do agente.
query str A query para buscar na memoria.
llm LLMProvider Provider de LLM injetado.
embeddings EmbeddingProvider Provider de embeddings injetado.
config MemoryConfig Configuracao da memoria.

Retorna: ReadResult com facts (lista de ScoredFact), context (string pronta para prompt), total_candidates e duration_ms.


Retrieval Agent

O retrieval agent e um planejador deterministico (sem LLM) que analisa a query do usuario e decide a estrategia de retrieval antes de qualquer busca acontecer.

PatternQuery

Uma query baseada em padrao para matching de sinal de keyword.

Campo Tipo Descricao
entity_pattern str Padrao SQL LIKE para matching de entity_key.
attribute_filter str | None Filtro opcional de chave de atributo (sempre None no V5).

RetrievalPlan

Saida do retrieval agent. V5 executa todos os sinais (semantic, graph, keyword) em paralelo.

Campo Tipo Padrao Descricao
strategy str "multi_signal" "multi_signal" (padrao) ou "skip".
entities list[str] [] entity_keys detectadas para sinal de grafo.
pattern_queries list[PatternQuery] [] Pattern queries para sinal de keyword.
similarity_query str | None None Sempre a query original (sem reformulacao).
max_facts int 50 Budget por sinal.
reason str "" Motivo da escolha do plano.
latency_ms float 0.0 Tempo gasto no planejamento.
as_of_range tuple[datetime, datetime] | None None Janela temporal opcional (time-travel).
broad_query bool False True para queries abrangentes.

plan_retrieval

Planejador deterministico que decide a estrategia de retrieval com base em heuristicas (sem chamada LLM). A resolucao de entidades na query e feita por resolve_query_entities em etapa separada. Resolucao de anafora e responsabilidade do caller.

def plan_retrieval(
    query_text: str,
) -> RetrievalPlan
Parametro Tipo Descricao
query_text str A query do usuario.

Retorna: RetrievalPlan com estrategia e parametros de query.


Expansao de Query

Pos-processa um RetrievalPlan com entity priming -- resolve entidades mencionadas na query via knowledge graph (aliases + relacionamentos) e injeta termos de contexto.

ExpandedQuery

Campo Tipo Descricao
primed_entities list[str] Entity keys descobertas via alias + KG priming.
temporal_range tuple[datetime, datetime] | None Faixa de datas resolvida.
expanded_terms list[str] Termos de contexto adicionais dos fatos das entidades.

expand_query

Expande um plano de retrieval com entity priming. Fail-safe: qualquer excecao retorna um ExpandedQuery vazio.

async def expand_query(
    session: AsyncSession,
    agent_id: str,
    query: str,
    plan: RetrievalPlan,
    llm: object,
) -> ExpandedQuery
Parametro Tipo Descricao
session AsyncSession Sessao do banco de dados.
agent_id str Identificador do agente.
query str Texto original da query do usuario.
plan RetrievalPlan RetrievalPlan do retrieval agent.
llm object Provider de LLM (reservado para uso futuro).

Retorna: ExpandedQuery com entidades primadas, faixa temporal e termos expandidos.


Retrieval de Fatos

retrieve_relevant_events

Recupera eventos relevantes por similaridade de embedding + scoring de recencia.

async def retrieve_relevant_events(
    session: AsyncSession,
    agent_id: str,
    query_embedding: list[float],
    config: MemoryConfig,
    limit: int | None = None,
) -> list[dict[str, Any]]
Parametro Tipo Descricao
session AsyncSession Sessao do banco de dados.
agent_id str Identificador do agente.
query_embedding list[float] Vetor de embedding da query.
config MemoryConfig Configuracao da memoria.
limit int | None Maximo de eventos a retornar.

Retorna: Lista de dicts de evento com date, text, score, event_id.

compute_pattern_signal

Impulsiona fatos confirmados recentemente (sinal de padrão). Fatos com last_confirmed_at recente (confirmados via decisões NOOP no write) recebem um boost aditivo pequeno (até 0.1).

def compute_pattern_signal(
    candidates: list[RetrievalCandidate],
) -> list[RetrievalCandidate]
Parametro Tipo Descricao
candidates list[RetrievalCandidate] Candidatos ranqueados atuais.

Retorna: Candidatos com scores atualizados, ordenados por final_score.


Retrieval de Grafo

Travessia BFS de 2 saltos no knowledge graph MemoryEntityRelationship com poda de relevancia.

GraphRetrievalResult

Campo Tipo Descricao
facts list[dict[str, Any]] Dicts de fatos pontuados com source="graph".
neighbor_keys list[str] Entity keys descobertas via BFS.
edges_traversed int Total de arestas examinadas durante BFS.
edges list[dict[str, Any]] Dicts de arestas deduplicadas com nomes de exibicao.

retrieve_graph_facts

Retrieval BFS de 2 saltos com scoring composto: edge_strength * recency * edge_recency * query_bonus.

async def retrieve_graph_facts(
    session: AsyncSession,
    agent_id: str,
    entity_keys: list[str],
    *,
    min_confidence: float = 0.3,
    as_of_start: datetime | None = None,
    as_of_end: datetime | None = None,
    broad_query: bool = False,
    max_facts: int | None = None,
    query_text: str = "",
    min_edge_strength: float = 0.5,
) -> GraphRetrievalResult
Parametro Tipo Padrao Descricao
session AsyncSession -- Sessao do banco de dados.
agent_id str -- Identificador do agente.
entity_keys list[str] -- entity_keys semente para iniciar BFS.
min_confidence float 0.3 Threshold minimo de confianca do fato.
as_of_start datetime | None None Inicio da janela temporal.
as_of_end datetime | None None Fim da janela temporal.
broad_query bool False Quando True, permite budget expandido.
max_facts int | None None Override do limite padrao (30).
query_text str "" Texto original da query para scoring de query_bonus.
min_edge_strength float 0.5 Forca minima de aresta para poda no salto 2+.

Retorna: GraphRetrievalResult com fatos pontuados e metadados do grafo.


Spreading Activation

Expande contexto a partir de fatos semente seguindo links de entity_key, cluster_id e relacionamentos do knowledge graph. Usa scoring de importancia dinamica com decaimento por salto.

SpreadingActivationResult

Campo Tipo Descricao
candidates list[RetrievalCandidate] Candidatos expandidos dos saltos 1-2.
meta_observations list[Any] Meta-observacoes relevantes referenciando fatos semente.
entities_explored list[str] Entity keys exploradas durante spreading.
clusters_explored list[str] Cluster IDs explorados durante spreading.
hop1_count int Numero de fatos encontrados no salto 1.
hop2_count int Numero de fatos encontrados no salto 2.
kg_relationships_explored int Numero de relacionamentos KG percorridos.

spread_activation

Expande contexto a partir de fatos semente via entity_key, cluster_id e relacionamentos KG (saltos 1-2).

async def spread_activation(
    session: AsyncSession,
    agent_id: str,
    seed_fact_ids: list[str],
    config: MemoryConfig,
    *,
    seed_scores: dict[str, float] | None = None,
    allowed_keys: set[str] | None = None,
) -> list[RetrievalCandidate]
Parametro Tipo Descricao
session AsyncSession Sessao do banco de dados.
agent_id str Identificador do agente.
seed_fact_ids list[str] IDs dos fatos semente para expandir.
config MemoryConfig Configuracao da memoria com parametros de spreading activation.
seed_scores dict[str, float] | None Dict opcional mapeando ID do fato semente para score.
allowed_keys set[str] | None Conjunto opcional de chaves de atributo permitidas.

Retorna: Lista de objetos RetrievalCandidate do spreading activation. Fail-safe: retorna lista vazia em caso de erro.


Compressao de Contexto

Constroi uma string de contexto pronta para prompt a partir de fatos pontuados, eventos, clusters e meta-observacoes usando um sistema em camadas: Hot (Tier 1), Warm (Tier 2), Cold (Tier 3).

CompressedContext

Campo Tipo Descricao
context_text str String de contexto final pronta para prompt.
hot_count int Numero de fatos na camada hot (Tier 1).
warm_count int Numero de fatos na camada warm (Tier 2).
cold_count int Numero de itens na camada cold (Tier 3).
total_tokens int Contagem estimada de tokens do context_text.

compress_context

Constroi texto de contexto em camadas dentro do budget de tokens.

async def compress_context(
    facts: list[dict[str, Any]],
    events: list[dict[str, Any]],
    config: MemoryConfig,
    *,
    clusters: list[Any] | None = None,
    meta_observations: list[Any] | None = None,
    stale_keys: set[str] | None = None,
    stale_threshold_days: int = 90,
    now: datetime | None = None,
) -> CompressedContext
Parametro Tipo Descricao
facts list[dict] Dicts de fatos pontuados (devem ter chaves score, fact, entity, attribute, value, date).
events list[dict] Dicts de evento com chaves date e text.
config MemoryConfig Configuracao da memoria com budget de tokens e ratios de camada.
clusters list | None Objetos de cluster opcionais.
meta_observations list | None Objetos de meta-observacao opcionais.
stale_keys set[str] | None Chaves de atributo consideradas sempre obsoletas.
stale_threshold_days int Dias apos os quais um fato e considerado obsoleto (padrao 90).
now datetime | None Timestamp atual (padrao UTC now).

Retorna: CompressedContext com texto de contexto em camadas.

compress_broad_context

Constroi contexto para queries abrangentes usando clusters como unidade primaria.

async def compress_broad_context(
    cluster_facts: dict[str, list[dict[str, Any]]],
    clusters: list[Any],
    config: MemoryConfig,
    *,
    meta_observations: list[Any] | None = None,
    events: list[dict[str, Any]] | None = None,
) -> CompressedContext
Parametro Tipo Descricao
cluster_facts dict[str, list[dict]] Mapeamento de cluster_label para dicts de fatos.
clusters list[Any] Objetos de cluster com label, summary_text, fact_count.
config MemoryConfig Configuracao da memoria.
meta_observations list | None Objetos de meta-observacao opcionais.
events list[dict] | None Dicts de evento opcionais.

Retorna: CompressedContext com texto de contexto cluster-first.


Tendencias Emocionais

Materializa tendencias emocionais a partir de eventos de memoria e fornece sumarios formatados para injecao no contexto de retrieval.

EmotionalTrendsResult

Campo Tipo Descricao
emotion_counts dict[str, int] Mapeamento de emocao para contagem de ocorrencias.
trend_direction str "increasing", "decreasing" ou "stable".
dominant_emotion str | None Emocao mais frequente, ou None.
trigger_keywords list[str] Top keywords de eventos de alta intensidade.
avg_intensity float Intensidade emocional media entre eventos.
dominant_intensity float Intensidade media da emocao dominante.
dominant_energy str Nivel de energia predominante (high/medium/low).
events_analyzed int Numero de eventos analisados.
observation_created bool Se uma meta-observacao foi criada/atualizada.
observation_id str | None ID da observacao criada/atualizada.

Agrega dados de emocao de eventos, detecta tendencias e materializa como meta-observacao.

async def materialize_emotional_trends(
    session: AsyncSession,
    agent_id: str,
    config: MemoryConfig,
) -> EmotionalTrendsResult
Parametro Tipo Descricao
session AsyncSession Sessao do banco de dados.
agent_id str Identificador do agente.
config MemoryConfig Configuracao da memoria com janela de tendencia e minimo de eventos.

Retorna: EmotionalTrendsResult com dados de tendencia agregados.

get_emotional_summary_for_context

Retorna sumario emocional formatado para injecao no contexto de retrieval. Retorna None se nao existir tendencia emocional ativa recente (7 dias).

async def get_emotional_summary_for_context(
    session: AsyncSession,
    agent_id: str,
) -> str | None

Retorna: String de sumario formatada, ou None.


Importancia Dinamica

compute_dynamic_importance

Calcula score de importancia dinamica para um fato de memoria. Inspirado em modelos cognitivos de forca de memoria.

Componentes:

  • retrieval_boost: log(1 + times_retrieved) -- satura gradualmente
  • recency_of_use_boost: decai a partir de last_retrieved_at (meia-vida de 7 dias)
  • correction_penalty: 0.8^n para cada correcao do usuario
  • pattern_boost: 1.3x se o fato faz parte de uma meta-observacao ativa
def compute_dynamic_importance(
    base_importance: float,
    times_retrieved: int,
    last_retrieved_at: datetime | None,
    user_correction_count: int,
    is_in_active_pattern: bool,
    now: datetime | None = None,
) -> float
Parametro Tipo Descricao
base_importance float Score de importancia base (tipicamente 0.5).
times_retrieved int Numero de vezes que este fato foi recuperado.
last_retrieved_at datetime | None Quando o fato foi recuperado pela ultima vez.
user_correction_count int Numero de correcoes do usuario neste fato.
is_in_active_pattern bool Se o fato faz parte de uma meta-observacao ativa.
now datetime | None Timestamp atual (padrao UTC now).

Retorna: Score de importancia dinamica, limitado a [0.05, 3.0].


Memoria Procedimental

Sistema de diretivas comportamentais otimizado para LLM que comprime persona + preferencias comportamentais aprendidas em blocos de instrucao coesos.

DirectiveBlock

Campo Tipo Descricao
text str Bloco de instrucoes comportamentais coeso.
directive_count int Numero de diretivas ativas usadas.
cache_hit bool Se foi servido do cache.

ContradictionResult

Campo Tipo Descricao
has_contradiction bool Se uma contradicao foi encontrada.
conflicting_directive str | None Titulo da diretiva conflitante.
resolution str | None Explicacao de como a contradicao foi resolvida.

generate_optimized_directives

Gera um bloco de instrucoes comportamentais otimizado por LLM integrando persona + diretivas aprendidas.

async def generate_optimized_directives(
    session: AsyncSession,
    agent_id: str,
    llm_provider: LLMProvider,
    config: MemoryConfig,
    *,
    persona_text: str = "",
) -> DirectiveBlock
Parametro Tipo Descricao
session AsyncSession Sessao do banco de dados.
agent_id str Identificador do agente.
llm_provider LLMProvider Provider de LLM injetado.
config MemoryConfig Configuracao da memoria.
persona_text str Descricao de persona opcional.

Retorna: DirectiveBlock com texto gerado. Resultado e cacheado por hash dos IDs de diretivas + contagens de reforco. Fail-safe: retorna DirectiveBlock vazio em caso de erro.

check_directive_contradiction

Verifica uma nova diretiva contra existentes para contradicoes. Usa similaridade de embedding como pre-filtro, depois LLM como juiz.

async def check_directive_contradiction(
    session: AsyncSession,
    agent_id: str,
    new_directive: str,
    embedding_provider: EmbeddingProvider,
    llm_provider: LLMProvider,
    *,
    similarity_threshold: float = 0.80,
) -> ContradictionResult
Parametro Tipo Descricao
session AsyncSession Sessao do banco de dados.
agent_id str Identificador do agente.
new_directive str Texto da nova diretiva para verificar.
embedding_provider EmbeddingProvider Provider de embedding injetado.
llm_provider LLMProvider Provider de LLM injetado.
similarity_threshold float Similaridade minima para acionar verificacao LLM (padrao 0.80).

Retorna: ContradictionResult com resultado da verificacao. Fail-safe: retorna sem contradicao em caso de erro.

effective_confidence

Aplica decaimento temporal na confianca de diretivas. Formula: base_confidence * 0.95^semanas.

def effective_confidence(
    base_confidence: float,
    created_at: datetime,
    now: datetime | None = None,
) -> float
Parametro Tipo Descricao
base_confidence float Valor de confianca original (0.0-1.0).
created_at datetime Quando a diretiva foi criada.
now datetime | None Timestamp atual (padrao UTC now).

Retorna: Confianca com decaimento, piso em 0.10.

invalidate_directive_cache

Invalida manualmente o cache de diretivas de um usuario.

def invalidate_directive_cache(agent_id: str) -> None