conductor-checkpoint-start

Daniel Chalef 2025-11-01 18:26:56 -07:00
parent 20816c3c19
commit 6984f7a55f
5 changed files with 226 additions and 5 deletions

.cursor/worktrees.json Normal file

@@ -0,0 +1,5 @@
{
"setup-worktree": [
"make install"
]
}

SUMMARY_PIPELINE_STATE.md Normal file

@@ -0,0 +1,126 @@
# Graphiti Node Summary Pipelines
This document outlines how entity summaries are generated today and the pragmatic changes proposed to gate expensive LLM calls using novelty and recency heuristics.
## Current Execution Flow
```
+------------------+       +-------------------------------+
| Episode Ingest   | ----> | retrieve_episodes(last=3)     |
+------------------+       +-------------------------------+
         |                                 |
         v                                 v
+----------------------+       +---------------------------+
| extract_nodes        |       | extract_edges             |
| (LLM + reflexion)    |       | (LLM fact extraction)     |
+----------------------+       +---------------------------+
         |                                 |
         | dedupe candidates               |
         v                                 v
+--------------------------+   +---------------------------+
| resolve_extracted_nodes  |   | resolve_extracted_edges   |
| (deterministic + LLM)    |   | (LLM dedupe, invalidates) |
+--------------------------+   +---------------------------+
         |                                 |
         +----------- parallel ------------+
                          |
                          v
+-----------------------------------------+
| extract_attributes_from_nodes           |
|  - LLM attribute fill (optional)        |
|  - LLM summary refresh (always runs)    |
+-----------------------------------------+
```
### Current Data & Timing Characteristics
- **Previous context**: only the latest `EPISODE_WINDOW_LEN = 3` episodes are retrieved before any LLM call.
- **Fact availability**: raw `EntityEdge.fact` strings exist immediately after edge extraction; embeddings are produced inside `resolve_extracted_edges`, concurrently with summary generation.
- **Summary inputs**: the summary prompt only sees `node.summary`, node attributes, episode content, and the three-episode window. It does *not* observe resolved fact invalidations or final edge embeddings.
- **LLM usage**: every node that survives dedupe triggers the summary prompt, even when the episode carries little new information or merely repeats earlier content.
## Proposed Execution Flow
```
+------------------+       +-------------------------------+
| Episode Ingest   | ----> | retrieve_episodes(last=3)     |
+------------------+       +-------------------------------+
         |                                 |
         v                                 v
+----------------------+       +---------------------------+
| extract_nodes        |       | extract_edges             |
| (LLM + reflexion)    |       | (LLM fact extraction)     |
+----------------------+       +---------------------------+
         |                                 |
         | dedupe candidates               |
         v                                 v
+--------------------------+   +----------------------------+
| resolve_extracted_nodes  |   | build_node_deltas          |
| (deterministic + LLM)    |   |  - new facts per node      |
+--------------------------+   |  - embed facts upfront     |
         |                     |  - track candidate flips   |
         |                     +-------------+--------------+
         |                                   |
         |                                   v
         |                     +----------------------------+
         |                     | resolve_extracted_edges    |
         |                     | (uses prebuilt embeddings, |
         |                     |  updates NodeDelta state)  |
         |                     +-------------+--------------+
         |                                   |
         |                                   v
         |                     +----------------------------+
         |                     | summary_gate.should_refresh|
         |                     |  - fact hash drift         |
         |                     |  - embedding drift         |
         |                     |  - negation / invalidation |
         |                     |  - burst & staleness rules |
         |                     +------+---------------------+
         |                            |
         |             +--------------+----------------+
         |             |                               |
         |             v                               v
         |  +---------------------------+   +-----------------------------+
         |  | skip summary (log cause)  |   | extract_summary LLM call    |
         |  | update metadata only      |   | update summary & metadata   |
         |  +---------------------------+   +-----------------------------+
         |
         v
+-------------------------------+
| add_nodes_and_edges_bulk      |
+-------------------------------+
```
### Key Proposed Changes
1. **NodeDelta staging**
- Generate fact embeddings immediately after edge extraction (`create_entity_edge_embeddings`) and group fact deltas by target node.
- Record potential invalidations and episode timestamps within the delta so the summary gate has full context.
2. **Deterministic novelty checks**
- Maintain a stable hash of active facts (new facts minus invalidated). Skip summarisation when the hash matches the stored value.
- Compare pooled fact embeddings to the persisted summary embedding; trigger refresh only when cosine drift exceeds a tuned threshold.
- Force refresh whenever the delta indicates polarity changes (contradiction/negation cues).
3. **Recency & burst handling**
- Track recent episode timestamps per node in metadata. If multiple episodes arrive within a short window, accumulate deltas and defer the summary until the burst ends or a hard cap is reached.
- Enforce a staleness SLA (`last_summary_ts`) so long-lived nodes eventually refresh even if novelty remains low.
4. **Metadata persistence**
- Persist gate state on each entity (`_graphiti_meta`: `fact_hash`, `summary_embedding`, `last_summary_ts`, `_recent_episode_times`, `_burst_active`).
- Update metadata whether the summary runs or is skipped to keep the gating logic deterministic across ingests.
5. **Observability**
- Emit counters for `summary_skipped`, `summary_refreshed`, and reasons (unchanged hash, low drift, burst deferral, staleness). Sample a small percentage of skipped cases to validate heuristics.
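The `NodeDelta` staging in item 1 could be a small dataclass that accumulates per-node fact deltas between extraction and the summary gate. This is a minimal sketch; the field and method names are hypothetical, not the library's API.

```python
from dataclasses import dataclass, field


@dataclass
class NodeDelta:
    """Hypothetical staging record for one node's fact deltas in an ingest pass."""

    node_uuid: str
    new_facts: list[str] = field(default_factory=list)
    fact_embeddings: list[list[float]] = field(default_factory=list)
    invalidated_facts: list[str] = field(default_factory=list)
    episode_timestamps: list[float] = field(default_factory=list)
    has_negation: bool = False  # set when edge resolution flags a polarity flip

    def add_fact(self, fact: str, embedding: list[float], ts: float) -> None:
        # Facts and their embeddings are recorded together, upfront,
        # so the gate never has to wait on embedding generation.
        self.new_facts.append(fact)
        self.fact_embeddings.append(embedding)
        self.episode_timestamps.append(ts)


delta = NodeDelta(node_uuid='node-123')
delta.add_fact('Alice moved to Berlin', [0.1, 0.9], ts=1730500000.0)
```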
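The deterministic novelty checks in item 2 reduce to two cheap computations: a stable hash over the active fact set and a cosine-drift comparison against the stored summary embedding. The sketch below uses only the standard library; the separator byte and mean-pooling strategy are assumptions, not settled design choices.

```python
import hashlib
import math


def active_fact_hash(new_facts: set[str], invalidated: set[str]) -> str:
    # Stable hash over the sorted active fact set (new facts minus invalidated).
    active = sorted(new_facts - invalidated)
    return hashlib.sha256('\x1f'.join(active).encode('utf-8')).hexdigest()


def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0


def mean_pool(vectors: list[list[float]]) -> list[float]:
    # Naive mean pooling of fact embeddings into one vector.
    n = len(vectors)
    return [sum(col) / n for col in zip(*vectors)]


def embedding_drift(fact_embeddings: list[list[float]],
                    summary_embedding: list[float]) -> float:
    # Drift = 1 - cosine(pooled facts, persisted summary embedding);
    # refresh only when this exceeds a tuned threshold.
    return 1.0 - cosine(mean_pool(fact_embeddings), summary_embedding)
```

Because the hash is computed over the *active* set, an invalidation changes the hash just as a new fact does, so pure invalidations also wake the gate.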
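Item 3's recency and burst handling might look like the following; the window, hard cap, and SLA constants are illustrative tuning values, and the function name is hypothetical.

```python
from collections import deque

BURST_WINDOW_S = 300          # episodes closer than this are one burst (assumed tuning)
BURST_HARD_CAP = 10           # refresh anyway after this many deferred episodes
STALENESS_SLA_S = 7 * 86400   # force a refresh at least weekly


def burst_decision(recent_ts: deque, now: float, last_summary_ts: float) -> str:
    # Evict timestamps that have fallen out of the burst window.
    while recent_ts and now - recent_ts[0] > BURST_WINDOW_S:
        recent_ts.popleft()
    recent_ts.append(now)
    if now - last_summary_ts > STALENESS_SLA_S:
        return 'refresh'   # staleness SLA overrides any deferral
    if len(recent_ts) >= BURST_HARD_CAP:
        return 'refresh'   # hard cap reached mid-burst
    if len(recent_ts) > 1:
        return 'defer'     # burst in progress; keep accumulating deltas
    return 'evaluate'      # single episode; fall through to novelty checks
```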
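Item 4's metadata persistence could be a single helper that runs on both the skip and refresh paths. The `_graphiti_meta` keys mirror the proposal above; the helper name and attribute-dict shape are hypothetical.

```python
import time


def update_gate_metadata(node_attrs: dict, *, fact_hash: str,
                         summary_embedding: list[float] | None,
                         refreshed: bool) -> dict:
    # Persist gate state under _graphiti_meta whether or not the summary ran,
    # so the next ingest always sees a consistent snapshot.
    meta = dict(node_attrs.get('_graphiti_meta', {}))
    meta['fact_hash'] = fact_hash
    if refreshed:
        meta['last_summary_ts'] = time.time()
        if summary_embedding is not None:
            meta['summary_embedding'] = summary_embedding
    node_attrs['_graphiti_meta'] = meta
    return node_attrs
```

Note that the hash is updated even on skips; only the summary timestamp and embedding are gated on an actual refresh.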
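For item 5, a minimal in-process counter sketch; a real deployment would emit these to whatever metrics backend is in use, and the reason strings below are illustrative.

```python
from collections import Counter

gate_counters: Counter[str] = Counter()


def record_gate_outcome(refreshed: bool, reason: str) -> None:
    # reason examples: 'unchanged_hash', 'low_drift', 'burst_deferral', 'staleness'
    gate_counters['summary_refreshed' if refreshed else 'summary_skipped'] += 1
    gate_counters[f'reason:{reason}'] += 1
```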
## Implementation Snapshot
| Area | Current | Proposed |
| ---- | ------- | -------- |
| Summary trigger | Always per deduped node | Controlled by `summary_gate` using fact hash, embedding drift, negation, burst, staleness |
| Fact embeddings | Produced during edge resolution (parallel) | Produced immediately after extraction and reused downstream |
| Fact availability in summaries | Not available | Encapsulated in `NodeDelta` passed into summary gate |
| Metadata on node | Summary text + organic attributes | Summary text + `_graphiti_meta` (hash, embedding, timestamps, burst state) |
| Recency handling | None | Deque of recent episode timestamps + burst deferral |
| Negation detection | LLM-only inside edge resolution | Propagated into gate to force summary refresh |
These adjustments retain the existing inline execution model—no scheduled jobs—while reducing unnecessary LLM calls and improving determinism by grounding summaries in the same fact set that backs the graph.
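Putting the signals together, one plausible decision order for the gate is sketched below. The ordering (burst rules first, then negation, then the cheap hash short-circuit, then embedding drift) and the 0.15 drift threshold are assumptions for illustration, not a finalized policy.

```python
def should_refresh(*, fact_hash_unchanged: bool, has_negation: bool,
                   drift: float, burst: str,
                   drift_threshold: float = 0.15) -> tuple[bool, str]:
    # burst is 'refresh' | 'defer' | 'evaluate' from the recency rules.
    if burst == 'refresh':
        return True, 'staleness_or_burst_cap'
    if burst == 'defer':
        return False, 'burst_deferral'
    if has_negation:
        return True, 'negation'          # polarity flips always force a refresh
    if fact_hash_unchanged:
        return False, 'unchanged_hash'   # cheap short-circuit before embedding math
    if drift < drift_threshold:
        return False, 'low_drift'
    return True, 'embedding_drift'
```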

graphiti_core/driver/falkordb_driver.py

@@ -18,6 +18,7 @@ import asyncio
import datetime
import logging
from typing import TYPE_CHECKING, Any
import inspect
if TYPE_CHECKING:
from falkordb import Graph as FalkorGraph
@@ -101,12 +102,11 @@ class FalkorDriverSession(GraphDriverSession):
if isinstance(query, list):
for cypher, params in query:
params = convert_datetimes_to_strings(params)
await self.graph.query(str(cypher), params) # type: ignore[reportUnknownArgumentType]
await _await_graph_query(self.graph, str(cypher), params) # type: ignore[reportUnknownArgumentType]
else:
params = dict(kwargs)
params = convert_datetimes_to_strings(params)
await self.graph.query(str(query), params) # type: ignore[reportUnknownArgumentType]
# Works whether `graph.query` is native-async (server) or sync (Lite); the helper handles both
await _await_graph_query(self.graph, str(query), params) # type: ignore[reportUnknownArgumentType]
return None
@@ -122,6 +122,9 @@ class FalkorDriver(GraphDriver):
password: str | None = None,
falkor_db: FalkorDB | None = None,
database: str = 'default_db',
*,
lite: bool = False,
lite_db_path: str | None = None,
):
"""
Initialize the FalkorDB driver.
@@ -137,7 +140,23 @@ class FalkorDriver(GraphDriver):
# If a FalkorDB instance is provided, use it directly
self.client = falkor_db
else:
self.client = FalkorDB(host=host, port=port, username=username, password=password)
if lite:
# Lazy import to avoid mandatory dependency when not using Lite
try:
from redislite.falkordb_client import FalkorDB as LiteFalkorDB # type: ignore
except Exception as e: # broad to surface helpful message
raise ImportError(
'falkordblite is required for FalkorDB Lite. Install it with: '
'pip install falkordblite'
) from e
db_path = lite_db_path or '/tmp/falkordb.db'
lite_client = LiteFalkorDB(db_path)
self.client = _AsyncLiteClientAdapter(lite_client)
self._is_lite = True
else:
self.client = FalkorDB(host=host, port=port, username=username, password=password)
self._is_lite = False
self.fulltext_syntax = '@' # FalkorDB uses a redisearch-like syntax for fulltext queries see https://redis.io/docs/latest/develop/ai/search-and-query/query/full-text/
@@ -154,7 +173,7 @@ class FalkorDriver(GraphDriver):
params = convert_datetimes_to_strings(dict(kwargs))
try:
result = await graph.query(cypher_query_, params) # type: ignore[reportUnknownArgumentType]
result = await _await_graph_query(graph, cypher_query_, params) # type: ignore[reportUnknownArgumentType]
except Exception as e:
if 'already indexed' in str(e):
# check if index already exists
@@ -191,6 +210,9 @@ class FalkorDriver(GraphDriver):
await self.client.connection.aclose()
elif hasattr(self.client.connection, 'close'):
await self.client.connection.close()
else:
# Lite adapter exposes no-op aclose; nothing to do otherwise
pass
async def delete_all_indexes(self) -> None:
result = await self.execute_query('CALL db.indexes()')
@@ -329,3 +351,45 @@ class FalkorDriver(GraphDriver):
full_query = group_filter + ' (' + sanitized_query + ')'
return full_query
# -----------------
# Internal helpers
# -----------------
async def _await_graph_query(graph: Any, cypher: str, params: dict[str, Any] | None):
    """
    Await a graph.query call whether it's native-async or sync (Lite).
    """
    query_callable = getattr(graph, 'query')
    if inspect.iscoroutinefunction(query_callable):
        return await query_callable(cypher, params)
    # Sync path (Lite): run in a thread so the blocking call stays off the event loop.
    # Calling eagerly first and then again via to_thread would execute sync queries twice.
    result = await asyncio.to_thread(query_callable, cypher, params)
    # A sync wrapper may still hand back an awaitable; resolve it on the loop
    if inspect.isawaitable(result):
        return await result
    return result
class _AsyncLiteGraphAdapter:
    def __init__(self, sync_graph: Any):
        self._sync_graph = sync_graph

    async def query(self, cypher: str, params: dict[str, Any] | None = None):
        return await asyncio.to_thread(self._sync_graph.query, cypher, params)

    async def ro_query(self, cypher: str, params: dict[str, Any] | None = None):
        return await asyncio.to_thread(self._sync_graph.ro_query, cypher, params)

    async def delete(self):
        return await asyncio.to_thread(self._sync_graph.delete)


class _AsyncLiteClientAdapter:
    def __init__(self, sync_db: Any):
        self._sync_db = sync_db

    def select_graph(self, name: str):
        return _AsyncLiteGraphAdapter(self._sync_db.select_graph(name))

    async def aclose(self):
        # redislite does not expose an explicit close; rely on GC. No-op.
        return None

pyproject.toml

@@ -31,6 +31,7 @@ groq = ["groq>=0.2.0"]
google-genai = ["google-genai>=1.8.0"]
kuzu = ["kuzu>=0.11.3"]
falkordb = ["falkordb>=1.1.2,<2.0.0"]
falkordb-lite = ["falkordblite>=0.1.0"]
voyageai = ["voyageai>=0.2.3"]
neo4j-opensearch = ["boto3>=1.39.16", "opensearch-py>=3.0.0"]
sentence-transformers = ["sentence-transformers>=3.2.1"]
@@ -42,6 +43,7 @@ dev = [
"anthropic>=0.49.0",
"google-genai>=1.8.0",
"falkordb>=1.1.2,<2.0.0",
"falkordblite>=0.1.0",
"kuzu>=0.11.3",
"boto3>=1.39.16",
"opensearch-py>=3.0.0",


@@ -70,6 +70,30 @@ class TestFalkorDriver:
"""Test driver provider identification."""
assert self.driver.provider == GraphProvider.FALKORDB
def test_init_with_lite_uses_redislite_adapter(self):
"""Test initialization with lite=True uses redislite client adapter.
We don't require falkordb for this test as we patch the redislite import path.
"""
import sys
import types
# Create a fake redislite.falkordb_client module with FalkorDB class
fake_redislite = types.ModuleType('redislite')
fake_client_mod = types.ModuleType('redislite.falkordb_client')
fake_client_mod.FalkorDB = MagicMock()
with patch.dict(sys.modules, {
'redislite': fake_redislite,
'redislite.falkordb_client': fake_client_mod,
}):
from graphiti_core.driver.falkordb_driver import FalkorDriver as _FD
try:
_FD(lite=True, database='default_db')
except Exception as e:
pytest.fail(f'Lite initialization raised unexpectedly: {e}')
@unittest.skipIf(not HAS_FALKORDB, 'FalkorDB is not installed')
def test_get_graph_with_name(self):
"""Test _get_graph with specific graph name."""