API do Read Pipeline¶
API Avancada
Estas sao APIs avancadas para usuarios que desejam interagir diretamente com estagios individuais de retrieval. A maioria dos usuarios deve usar MemoryClient.retrieve(), que orquestra o pipeline multi-signal completo automaticamente.
Todas as funcoes do read pipeline sao exportadas de arandu.read.
from arandu.read import (
run_read_pipeline,
plan_retrieval, expand_query,
retrieve_relevant_events, compute_pattern_signal,
retrieve_graph_facts, spread_activation,
compress_context, compress_broad_context,
materialize_emotional_trends, get_emotional_summary_for_context,
compute_dynamic_importance,
generate_optimized_directives, check_directive_contradiction,
effective_confidence, invalidate_directive_cache,
)
Orquestrador do Pipeline¶
run_read_pipeline¶
Executa o pipeline completo de leitura: agent -> retrieve (multi-signal) -> rerank -> format.
O retrieval multi-signal executa semantic + keyword + graph em paralelo via asyncio.gather(). O retrieval agent planeja a estrategia de retrieval de forma deterministica (sem LLM).
async def run_read_pipeline(
session: AsyncSession,
agent_id: str,
query: str,
llm: LLMProvider,
embeddings: EmbeddingProvider,
config: MemoryConfig,
trace: PipelineTrace | None = None,
) -> ReadResult
| Parametro | Tipo | Descricao |
|---|---|---|
session |
AsyncSession |
Sessao do banco (o caller gerencia transacao). |
agent_id |
str |
Identificador do agente. |
query |
str |
A query para buscar na memoria. |
llm |
LLMProvider |
Provider de LLM injetado. |
embeddings |
EmbeddingProvider |
Provider de embeddings injetado. |
config |
MemoryConfig |
Configuracao da memoria. |
Retorna: ReadResult com facts (lista de ScoredFact), context (string pronta para prompt), total_candidates e duration_ms.
Retrieval Agent¶
O retrieval agent e um planejador deterministico (sem LLM) que analisa a query do usuario e decide a estrategia de retrieval antes de qualquer busca acontecer.
PatternQuery¶
Uma query baseada em padrao para matching de sinal de keyword.
| Campo | Tipo | Descricao |
|---|---|---|
entity_pattern |
str |
Padrao SQL LIKE para matching de entity_key. |
attribute_filter |
str | None |
Filtro opcional de chave de atributo (sempre None no V5). |
RetrievalPlan¶
Saida do retrieval agent. V5 executa todos os sinais (semantic, graph, keyword) em paralelo.
| Campo | Tipo | Padrao | Descricao |
|---|---|---|---|
strategy |
str |
"multi_signal" |
"multi_signal" (padrao) ou "skip". |
entities |
list[str] |
[] |
entity_keys detectadas para sinal de grafo. |
pattern_queries |
list[PatternQuery] |
[] |
Pattern queries para sinal de keyword. |
similarity_query |
str | None |
None |
Sempre a query original (sem reformulacao). |
max_facts |
int |
50 |
Budget por sinal. |
reason |
str |
"" |
Motivo da escolha do plano. |
latency_ms |
float |
0.0 |
Tempo gasto no planejamento. |
as_of_range |
tuple[datetime, datetime] | None |
None |
Janela temporal opcional (time-travel). |
broad_query |
bool |
False |
True para queries abrangentes. |
plan_retrieval¶
Planejador deterministico que decide a estrategia de retrieval com base em heuristicas (sem chamada LLM). A resolucao de entidades na query e feita por resolve_query_entities em etapa separada. Resolucao de anafora e responsabilidade do caller.
| Parametro | Tipo | Descricao |
|---|---|---|
query_text |
str |
A query do usuario. |
Retorna: RetrievalPlan com estrategia e parametros de query.
Expansao de Query¶
Pos-processa um RetrievalPlan com entity priming -- resolve entidades mencionadas na query via knowledge graph (aliases + relacionamentos) e injeta termos de contexto.
ExpandedQuery¶
| Campo | Tipo | Descricao |
|---|---|---|
primed_entities |
list[str] |
Entity keys descobertas via alias + KG priming. |
temporal_range |
tuple[datetime, datetime] | None |
Faixa de datas resolvida. |
expanded_terms |
list[str] |
Termos de contexto adicionais dos fatos das entidades. |
expand_query¶
Expande um plano de retrieval com entity priming. Fail-safe: qualquer excecao retorna um ExpandedQuery vazio.
async def expand_query(
session: AsyncSession,
agent_id: str,
query: str,
plan: RetrievalPlan,
llm: object,
) -> ExpandedQuery
| Parametro | Tipo | Descricao |
|---|---|---|
session |
AsyncSession |
Sessao do banco de dados. |
agent_id |
str |
Identificador do agente. |
query |
str |
Texto original da query do usuario. |
plan |
RetrievalPlan |
RetrievalPlan do retrieval agent. |
llm |
object |
Provider de LLM (reservado para uso futuro). |
Retorna: ExpandedQuery com entidades primadas, faixa temporal e termos expandidos.
Retrieval de Fatos¶
retrieve_relevant_events¶
Recupera eventos relevantes por similaridade de embedding + scoring de recencia.
async def retrieve_relevant_events(
session: AsyncSession,
agent_id: str,
query_embedding: list[float],
config: MemoryConfig,
limit: int | None = None,
) -> list[dict[str, Any]]
| Parametro | Tipo | Descricao |
|---|---|---|
session |
AsyncSession |
Sessao do banco de dados. |
agent_id |
str |
Identificador do agente. |
query_embedding |
list[float] |
Vetor de embedding da query. |
config |
MemoryConfig |
Configuracao da memoria. |
limit |
int | None |
Maximo de eventos a retornar. |
Retorna: Lista de dicts de evento com date, text, score, event_id.
compute_pattern_signal¶
Impulsiona fatos confirmados recentemente (sinal de padrão). Fatos com last_confirmed_at recente (confirmados via decisões NOOP no write) recebem um boost aditivo pequeno (até 0.1).
| Parametro | Tipo | Descricao |
|---|---|---|
candidates |
list[RetrievalCandidate] |
Candidatos ranqueados atuais. |
Retorna: Candidatos com scores atualizados, ordenados por final_score.
Retrieval de Grafo¶
Travessia BFS de 2 saltos no knowledge graph MemoryEntityRelationship com poda de relevancia.
GraphRetrievalResult¶
| Campo | Tipo | Descricao |
|---|---|---|
facts |
list[dict[str, Any]] |
Dicts de fatos pontuados com source="graph". |
neighbor_keys |
list[str] |
Entity keys descobertas via BFS. |
edges_traversed |
int |
Total de arestas examinadas durante BFS. |
edges |
list[dict[str, Any]] |
Dicts de arestas deduplicadas com nomes de exibicao. |
retrieve_graph_facts¶
Retrieval BFS de 2 saltos com scoring composto: edge_strength * recency * edge_recency * query_bonus.
async def retrieve_graph_facts(
session: AsyncSession,
agent_id: str,
entity_keys: list[str],
*,
min_confidence: float = 0.3,
as_of_start: datetime | None = None,
as_of_end: datetime | None = None,
broad_query: bool = False,
max_facts: int | None = None,
query_text: str = "",
min_edge_strength: float = 0.5,
) -> GraphRetrievalResult
| Parametro | Tipo | Padrao | Descricao |
|---|---|---|---|
session |
AsyncSession |
-- | Sessao do banco de dados. |
agent_id |
str |
-- | Identificador do agente. |
entity_keys |
list[str] |
-- | entity_keys semente para iniciar BFS. |
min_confidence |
float |
0.3 |
Threshold minimo de confianca do fato. |
as_of_start |
datetime | None |
None |
Inicio da janela temporal. |
as_of_end |
datetime | None |
None |
Fim da janela temporal. |
broad_query |
bool |
False |
Quando True, permite budget expandido. |
max_facts |
int | None |
None |
Override do limite padrao (30). |
query_text |
str |
"" |
Texto original da query para scoring de query_bonus. |
min_edge_strength |
float |
0.5 |
Forca minima de aresta para poda no salto 2+. |
Retorna: GraphRetrievalResult com fatos pontuados e metadados do grafo.
Spreading Activation¶
Expande contexto a partir de fatos semente seguindo links de entity_key, cluster_id e relacionamentos do knowledge graph. Usa scoring de importancia dinamica com decaimento por salto.
SpreadingActivationResult¶
| Campo | Tipo | Descricao |
|---|---|---|
candidates |
list[RetrievalCandidate] |
Candidatos expandidos dos saltos 1-2. |
meta_observations |
list[Any] |
Meta-observacoes relevantes referenciando fatos semente. |
entities_explored |
list[str] |
Entity keys exploradas durante spreading. |
clusters_explored |
list[str] |
Cluster IDs explorados durante spreading. |
hop1_count |
int |
Numero de fatos encontrados no salto 1. |
hop2_count |
int |
Numero de fatos encontrados no salto 2. |
kg_relationships_explored |
int |
Numero de relacionamentos KG percorridos. |
spread_activation¶
Expande contexto a partir de fatos semente via entity_key, cluster_id e relacionamentos KG (saltos 1-2).
async def spread_activation(
session: AsyncSession,
agent_id: str,
seed_fact_ids: list[str],
config: MemoryConfig,
*,
seed_scores: dict[str, float] | None = None,
allowed_keys: set[str] | None = None,
) -> list[RetrievalCandidate]
| Parametro | Tipo | Descricao |
|---|---|---|
session |
AsyncSession |
Sessao do banco de dados. |
agent_id |
str |
Identificador do agente. |
seed_fact_ids |
list[str] |
IDs dos fatos semente para expandir. |
config |
MemoryConfig |
Configuracao da memoria com parametros de spreading activation. |
seed_scores |
dict[str, float] | None |
Dict opcional mapeando ID do fato semente para score. |
allowed_keys |
set[str] | None |
Conjunto opcional de chaves de atributo permitidas. |
Retorna: Lista de objetos RetrievalCandidate do spreading activation. Fail-safe: retorna lista vazia em caso de erro.
Compressao de Contexto¶
Constroi uma string de contexto pronta para prompt a partir de fatos pontuados, eventos, clusters e meta-observacoes usando um sistema em camadas: Hot (Tier 1), Warm (Tier 2), Cold (Tier 3).
CompressedContext¶
| Campo | Tipo | Descricao |
|---|---|---|
context_text |
str |
String de contexto final pronta para prompt. |
hot_count |
int |
Numero de fatos na camada hot (Tier 1). |
warm_count |
int |
Numero de fatos na camada warm (Tier 2). |
cold_count |
int |
Numero de itens na camada cold (Tier 3). |
total_tokens |
int |
Contagem estimada de tokens do context_text. |
compress_context¶
Constroi texto de contexto em camadas dentro do budget de tokens.
async def compress_context(
facts: list[dict[str, Any]],
events: list[dict[str, Any]],
config: MemoryConfig,
*,
clusters: list[Any] | None = None,
meta_observations: list[Any] | None = None,
stale_keys: set[str] | None = None,
stale_threshold_days: int = 90,
now: datetime | None = None,
) -> CompressedContext
| Parametro | Tipo | Descricao |
|---|---|---|
facts |
list[dict] |
Dicts de fatos pontuados (devem ter chaves score, fact, entity, attribute, value, date). |
events |
list[dict] |
Dicts de evento com chaves date e text. |
config |
MemoryConfig |
Configuracao da memoria com budget de tokens e ratios de camada. |
clusters |
list | None |
Objetos de cluster opcionais. |
meta_observations |
list | None |
Objetos de meta-observacao opcionais. |
stale_keys |
set[str] | None |
Chaves de atributo consideradas sempre obsoletas. |
stale_threshold_days |
int |
Dias apos os quais um fato e considerado obsoleto (padrao 90). |
now |
datetime | None |
Timestamp atual (padrao UTC now). |
Retorna: CompressedContext com texto de contexto em camadas.
compress_broad_context¶
Constroi contexto para queries abrangentes usando clusters como unidade primaria.
async def compress_broad_context(
cluster_facts: dict[str, list[dict[str, Any]]],
clusters: list[Any],
config: MemoryConfig,
*,
meta_observations: list[Any] | None = None,
events: list[dict[str, Any]] | None = None,
) -> CompressedContext
| Parametro | Tipo | Descricao |
|---|---|---|
cluster_facts |
dict[str, list[dict]] |
Mapeamento de cluster_label para dicts de fatos. |
clusters |
list[Any] |
Objetos de cluster com label, summary_text, fact_count. |
config |
MemoryConfig |
Configuracao da memoria. |
meta_observations |
list | None |
Objetos de meta-observacao opcionais. |
events |
list[dict] | None |
Dicts de evento opcionais. |
Retorna: CompressedContext com texto de contexto cluster-first.
Tendencias Emocionais¶
Materializa tendencias emocionais a partir de eventos de memoria e fornece sumarios formatados para injecao no contexto de retrieval.
EmotionalTrendsResult¶
| Campo | Tipo | Descricao |
|---|---|---|
emotion_counts |
dict[str, int] |
Mapeamento de emocao para contagem de ocorrencias. |
trend_direction |
str |
"increasing", "decreasing" ou "stable". |
dominant_emotion |
str | None |
Emocao mais frequente, ou None. |
trigger_keywords |
list[str] |
Top keywords de eventos de alta intensidade. |
avg_intensity |
float |
Intensidade emocional media entre eventos. |
dominant_intensity |
float |
Intensidade media da emocao dominante. |
dominant_energy |
str |
Nivel de energia predominante (high/medium/low). |
events_analyzed |
int |
Numero de eventos analisados. |
observation_created |
bool |
Se uma meta-observacao foi criada/atualizada. |
observation_id |
str | None |
ID da observacao criada/atualizada. |
materialize_emotional_trends¶
Agrega dados de emocao de eventos, detecta tendencias e materializa como meta-observacao.
async def materialize_emotional_trends(
session: AsyncSession,
agent_id: str,
config: MemoryConfig,
) -> EmotionalTrendsResult
| Parametro | Tipo | Descricao |
|---|---|---|
session |
AsyncSession |
Sessao do banco de dados. |
agent_id |
str |
Identificador do agente. |
config |
MemoryConfig |
Configuracao da memoria com janela de tendencia e minimo de eventos. |
Retorna: EmotionalTrendsResult com dados de tendencia agregados.
get_emotional_summary_for_context¶
Retorna sumario emocional formatado para injecao no contexto de retrieval. Retorna None se nao existir tendencia emocional ativa recente (7 dias).
Retorna: String de sumario formatada, ou None.
Importancia Dinamica¶
compute_dynamic_importance¶
Calcula score de importancia dinamica para um fato de memoria. Inspirado em modelos cognitivos de forca de memoria.
Componentes:
- retrieval_boost:
log(1 + times_retrieved)-- satura gradualmente - recency_of_use_boost: decai a partir de
last_retrieved_at(meia-vida de 7 dias) - correction_penalty:
0.8^npara cada correcao do usuario - pattern_boost: 1.3x se o fato faz parte de uma meta-observacao ativa
def compute_dynamic_importance(
base_importance: float,
times_retrieved: int,
last_retrieved_at: datetime | None,
user_correction_count: int,
is_in_active_pattern: bool,
now: datetime | None = None,
) -> float
| Parametro | Tipo | Descricao |
|---|---|---|
base_importance |
float |
Score de importancia base (tipicamente 0.5). |
times_retrieved |
int |
Numero de vezes que este fato foi recuperado. |
last_retrieved_at |
datetime | None |
Quando o fato foi recuperado pela ultima vez. |
user_correction_count |
int |
Numero de correcoes do usuario neste fato. |
is_in_active_pattern |
bool |
Se o fato faz parte de uma meta-observacao ativa. |
now |
datetime | None |
Timestamp atual (padrao UTC now). |
Retorna: Score de importancia dinamica, limitado a [0.05, 3.0].
Memoria Procedimental¶
Sistema de diretivas comportamentais otimizado para LLM que comprime persona + preferencias comportamentais aprendidas em blocos de instrucao coesos.
DirectiveBlock¶
| Campo | Tipo | Descricao |
|---|---|---|
text |
str |
Bloco de instrucoes comportamentais coeso. |
directive_count |
int |
Numero de diretivas ativas usadas. |
cache_hit |
bool |
Se foi servido do cache. |
ContradictionResult¶
| Campo | Tipo | Descricao |
|---|---|---|
has_contradiction |
bool |
Se uma contradicao foi encontrada. |
conflicting_directive |
str | None |
Titulo da diretiva conflitante. |
resolution |
str | None |
Explicacao de como a contradicao foi resolvida. |
generate_optimized_directives¶
Gera um bloco de instrucoes comportamentais otimizado por LLM integrando persona + diretivas aprendidas.
async def generate_optimized_directives(
session: AsyncSession,
agent_id: str,
llm_provider: LLMProvider,
config: MemoryConfig,
*,
persona_text: str = "",
) -> DirectiveBlock
| Parametro | Tipo | Descricao |
|---|---|---|
session |
AsyncSession |
Sessao do banco de dados. |
agent_id |
str |
Identificador do agente. |
llm_provider |
LLMProvider |
Provider de LLM injetado. |
config |
MemoryConfig |
Configuracao da memoria. |
persona_text |
str |
Descricao de persona opcional. |
Retorna: DirectiveBlock com texto gerado. Resultado e cacheado por hash dos IDs de diretivas + contagens de reforco. Fail-safe: retorna DirectiveBlock vazio em caso de erro.
check_directive_contradiction¶
Verifica uma nova diretiva contra existentes para contradicoes. Usa similaridade de embedding como pre-filtro, depois LLM como juiz.
async def check_directive_contradiction(
session: AsyncSession,
agent_id: str,
new_directive: str,
embedding_provider: EmbeddingProvider,
llm_provider: LLMProvider,
*,
similarity_threshold: float = 0.80,
) -> ContradictionResult
| Parametro | Tipo | Descricao |
|---|---|---|
session |
AsyncSession |
Sessao do banco de dados. |
agent_id |
str |
Identificador do agente. |
new_directive |
str |
Texto da nova diretiva para verificar. |
embedding_provider |
EmbeddingProvider |
Provider de embedding injetado. |
llm_provider |
LLMProvider |
Provider de LLM injetado. |
similarity_threshold |
float |
Similaridade minima para acionar verificacao LLM (padrao 0.80). |
Retorna: ContradictionResult com resultado da verificacao. Fail-safe: retorna sem contradicao em caso de erro.
effective_confidence¶
Aplica decaimento temporal na confianca de diretivas. Formula: base_confidence * 0.95^semanas.
def effective_confidence(
base_confidence: float,
created_at: datetime,
now: datetime | None = None,
) -> float
| Parametro | Tipo | Descricao |
|---|---|---|
base_confidence |
float |
Valor de confianca original (0.0-1.0). |
created_at |
datetime |
Quando a diretiva foi criada. |
now |
datetime | None |
Timestamp atual (padrao UTC now). |
Retorna: Confianca com decaimento, piso em 0.10.
invalidate_directive_cache¶
Invalida manualmente o cache de diretivas de um usuario.