Daniel Chalef 2025-12-09 16:52:51 +08:00 committed by GitHub
commit 0a586b588d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 342 additions and 5 deletions

.cursor/worktrees.json (Normal file, 5 additions)

@@ -0,0 +1,5 @@
{
"setup-worktree": [
"make install"
]
}

SUMMARY_PIPELINE_STATE.md (Normal file, 126 additions)

@@ -0,0 +1,126 @@
# Graphiti Node Summary Pipelines
This document outlines how entity summaries are generated today and the pragmatic changes proposed to gate expensive LLM calls using novelty and recency heuristics.
## Current Execution Flow
```
+------------------+ +-------------------------------+
| Episode Ingest | ----> | retrieve_episodes(last=3) |
+------------------+ +-------------------------------+
| |
v v
+----------------------+ +---------------------------+
| extract_nodes | | extract_edges |
| (LLM + reflexion) | | (LLM fact extraction) |
+----------------------+ +---------------------------+
| |
| dedupe candidates |
v v
+--------------------------+ +---------------------------+
| resolve_extracted_nodes | | resolve_extracted_edges |
| (deterministic + LLM) | | (LLM dedupe, invalidates)|
+--------------------------+ +---------------------------+
| |
+----------- parallel ----+
|
v
+-----------------------------------------+
| extract_attributes_from_nodes |
| - LLM attribute fill (optional) |
| - LLM summary refresh (always runs) |
+-----------------------------------------+
```
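The parallel fan-out in the diagram above (node and edge extraction running concurrently after episode retrieval) can be sketched with `asyncio.gather`. This is an illustrative sketch only; the stage functions below are simplified hypothetical stand-ins, not Graphiti's actual signatures.

```python
import asyncio


async def extract_nodes(episode: str) -> list[str]:
    # Stand-in for the LLM + reflexion node-extraction stage
    return [w for w in episode.split() if w.istitle()]


async def extract_edges(episode: str) -> list[str]:
    # Stand-in for LLM fact extraction
    return [f'fact:{episode[:10]}']


async def ingest(episode: str):
    # Node and edge extraction run concurrently, as in the diagram
    nodes, edges = await asyncio.gather(
        extract_nodes(episode),
        extract_edges(episode),
    )
    return nodes, edges


nodes, edges = asyncio.run(ingest('Alice met Bob at the summit'))
```

The two branches only rejoin at `extract_attributes_from_nodes`, which is why the summary prompt sees episode text but not the resolved edge state.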
### Current Data & Timing Characteristics
- **Previous context**: only the latest `EPISODE_WINDOW_LEN = 3` episodes are retrieved before any LLM call.
- **Fact availability**: raw `EntityEdge.fact` strings exist immediately after edge extraction; embeddings are produced inside `resolve_extracted_edges`, concurrently with summary generation.
- **Summary inputs**: the summary prompt only sees `node.summary`, node attributes, episode content, and the three-episode window. It does *not* observe resolved fact invalidations or final edge embeddings.
- **LLM usage**: every node that survives dedupe invokes the summary prompt, even when the episode is low-information or repeat content.
## Proposed Execution Flow
```
+------------------+ +-------------------------------+
| Episode Ingest | ----> | retrieve_episodes(last=3) |
+------------------+ +-------------------------------+
| |
v v
+----------------------+ +---------------------------+
| extract_nodes | | extract_edges |
| (LLM + reflexion) | | (LLM fact extraction) |
+----------------------+ +---------------------------+
| |
| dedupe candidates |
v v
+--------------------------+ +---------------------------+
| resolve_extracted_nodes | | build_node_deltas |
| (deterministic + LLM) | | - new facts per node |
+--------------------------+ | - embed facts upfront |
| | - track candidate flips |
| +-------------+-------------+
| |
| v
| +---------------------------+
| | resolve_extracted_edges |
| | (uses prebuilt embeddings|
| | updates NodeDelta state)|
| +-------------+-------------+
| |
| v
| +---------------------------+
| | summary_gate.should_refresh|
| | - fact hash drift |
| | - embedding drift |
| | - negation / invalidation |
| | - burst & staleness rules |
| +------+------+------------+
| | |
| | +------------------------------+
| | |
| v v
| +---------------------------+ +-----------------------------+
| | skip summary (log cause) | | extract_summary LLM call |
| | update metadata only | | update summary & metadata |
| +---------------------------+ +-----------------------------+
v
+-------------------------------+
| add_nodes_and_edges_bulk |
+-------------------------------+
```
### Key Proposed Changes
1. **NodeDelta staging**
- Generate fact embeddings immediately after edge extraction (`create_entity_edge_embeddings`) and group fact deltas by target node.
- Record potential invalidations and episode timestamps within the delta so the summary gate has full context.
2. **Deterministic novelty checks**
- Maintain a stable hash of active facts (new facts minus invalidated). Skip summarisation when the hash matches the stored value.
- Compare pooled fact embeddings to the persisted summary embedding; trigger refresh only when cosine drift exceeds a tuned threshold.
- Force refresh whenever the delta indicates polarity changes (contradiction/negation cues).
3. **Recency & burst handling**
- Track recent episode timestamps per node in metadata. If multiple episodes arrive within a short window, accumulate deltas and defer the summary until the burst ends or a hard cap is reached.
- Enforce a staleness SLA (`last_summary_ts`) so long-lived nodes eventually refresh even if novelty remains low.
4. **Metadata persistence**
- Persist gate state on each entity (`_graphiti_meta`: `fact_hash`, `summary_embedding`, `last_summary_ts`, `_recent_episode_times`, `_burst_active`).
- Update metadata whether the summary runs or is skipped to keep the gating logic deterministic across ingests.
5. **Observability**
- Emit counters for `summary_skipped`, `summary_refreshed`, and reasons (unchanged hash, low drift, burst deferral, staleness). Sample a small percentage of skipped cases to validate heuristics.
## Implementation Snapshot
| Area | Current | Proposed |
| ---- | ------- | -------- |
| Summary trigger | Always per deduped node | Controlled by `summary_gate` using fact hash, embedding drift, negation, burst, staleness |
| Fact embeddings | Produced during edge resolution (parallel) | Produced immediately after extraction and reused downstream |
| Fact availability in summaries | Not available | Encapsulated in `NodeDelta` passed into summary gate |
| Metadata on node | Summary text + organic attributes | Summary text + `_graphiti_meta` (hash, embedding, timestamps, burst state) |
| Recency handling | None | Deque of recent episode timestamps + burst deferral |
| Negation detection | LLM-only inside edge resolution | Propagated into gate to force summary refresh |
These adjustments retain the existing inline execution model—no scheduled jobs—while reducing unnecessary LLM calls and improving determinism by grounding summaries in the same fact set that backs the graph.

conductor-setup.sh (Executable file, 109 additions)

@@ -0,0 +1,109 @@
#!/bin/bash
set -e

echo "🚀 Setting up Graphiti workspace..."

# Check if uv is installed
if ! command -v uv &> /dev/null; then
    echo "❌ Error: 'uv' is not installed. Please install uv first:"
    echo "   curl -LsSf https://astral.sh/uv/install.sh | sh"
    exit 1
fi

# Check Python version
python_version=$(python3 --version 2>&1 | awk '{print $2}')
required_version="3.10"
if ! python3 -c "import sys; exit(0 if sys.version_info >= (3, 10) else 1)"; then
    echo "❌ Error: Python $required_version or higher is required (found: $python_version)"
    exit 1
fi
echo "✓ Prerequisites check passed"

# Copy necessary files from root repo
echo "📄 Copying project files from root repo..."
cp "$CONDUCTOR_ROOT_PATH/pyproject.toml" .
cp "$CONDUCTOR_ROOT_PATH/uv.lock" .
cp "$CONDUCTOR_ROOT_PATH/README.md" .
if [ -f "$CONDUCTOR_ROOT_PATH/pytest.ini" ]; then
    cp "$CONDUCTOR_ROOT_PATH/pytest.ini" .
fi
if [ -f "$CONDUCTOR_ROOT_PATH/conftest.py" ]; then
    cp "$CONDUCTOR_ROOT_PATH/conftest.py" .
fi
if [ -f "$CONDUCTOR_ROOT_PATH/py.typed" ]; then
    cp "$CONDUCTOR_ROOT_PATH/py.typed" .
fi

# Create symlinks to source code instead of copying
echo "🔗 Creating symlinks to source code..."
ln -sf "$CONDUCTOR_ROOT_PATH/graphiti_core" graphiti_core
ln -sf "$CONDUCTOR_ROOT_PATH/tests" tests
ln -sf "$CONDUCTOR_ROOT_PATH/examples" examples

# Install dependencies
echo "📦 Installing dependencies with uv..."
uv sync --frozen --extra dev

# Create workspace-specific Makefile
echo "📝 Creating workspace Makefile..."
cat > Makefile << 'EOF'
.PHONY: install format lint test all check

# Define variables - using virtualenv directly instead of uv run
PYTHON = .venv/bin/python
PYTEST = .venv/bin/pytest
RUFF = .venv/bin/ruff
PYRIGHT = .venv/bin/pyright

# Default target
all: format lint test

# Install dependencies
install:
	@echo "Dependencies already installed via conductor-setup.sh"
	@echo "Run './conductor-setup.sh' to reinstall"

# Format code
format:
	$(RUFF) check --select I --fix
	$(RUFF) format

# Lint code
lint:
	$(RUFF) check
	$(PYRIGHT) ./graphiti_core

# Run tests
test:
	DISABLE_FALKORDB=1 DISABLE_KUZU=1 DISABLE_NEPTUNE=1 $(PYTEST) -m "not integration"

# Run format, lint, and test
check: format lint test
EOF

# Handle environment variables
if [ -f "$CONDUCTOR_ROOT_PATH/.env" ]; then
    echo "🔗 Linking .env file from root repo..."
    ln -sf "$CONDUCTOR_ROOT_PATH/.env" .env
    echo "✓ Environment file linked"
else
    echo "⚠️  No .env file found in root repo"
    echo "   Copy $CONDUCTOR_ROOT_PATH/.env.example to $CONDUCTOR_ROOT_PATH/.env"
    echo "   and add your API keys, then rerun setup"
    exit 1
fi

# Check for required environment variable
if ! grep -q "OPENAI_API_KEY=.*[^[:space:]]" .env 2>/dev/null; then
    echo "⚠️  Warning: OPENAI_API_KEY not set in .env file"
    echo "   This is required for most Graphiti functionality"
fi

echo "✅ Workspace setup complete!"
echo ""
echo "Available commands:"
echo "  make test    - Run unit tests"
echo "  make lint    - Lint and type check code"
echo "  make format  - Format code with ruff"
echo "  make check   - Run all checks (format, lint, test)"

conductor.json (Normal file, 7 additions)

@@ -0,0 +1,7 @@
{
"scripts": {
"setup": "./conductor-setup.sh",
"run": "make test"
},
"runScriptMode": "nonconcurrent"
}


@@ -18,6 +1,7 @@ import asyncio
 import datetime
 import logging
 from typing import TYPE_CHECKING, Any
+import inspect
 
 if TYPE_CHECKING:
     from falkordb import Graph as FalkorGraph
@@ -102,12 +103,11 @@ class FalkorDriverSession(GraphDriverSession):
         if isinstance(query, list):
             for cypher, params in query:
                 params = convert_datetimes_to_strings(params)
-                await self.graph.query(str(cypher), params)  # type: ignore[reportUnknownArgumentType]
+                await _await_graph_query(self.graph, str(cypher), params)  # type: ignore[reportUnknownArgumentType]
         else:
             params = dict(kwargs)
             params = convert_datetimes_to_strings(params)
-            await self.graph.query(str(query), params)  # type: ignore[reportUnknownArgumentType]
-            # Assuming `graph.query` is async (ideal); otherwise, wrap in executor
+            await _await_graph_query(self.graph, str(query), params)  # type: ignore[reportUnknownArgumentType]
         return None
@@ -125,6 +125,9 @@ class FalkorDriver(GraphDriver):
         password: str | None = None,
         falkor_db: FalkorDB | None = None,
         database: str = 'default_db',
+        *,
+        lite: bool = False,
+        lite_db_path: str | None = None,
     ):
         """
         Initialize the FalkorDB driver.
@@ -147,7 +150,23 @@
             # If a FalkorDB instance is provided, use it directly
             self.client = falkor_db
         else:
-            self.client = FalkorDB(host=host, port=port, username=username, password=password)
+            if lite:
+                # Lazy import to avoid mandatory dependency when not using Lite
+                try:
+                    from redislite.falkordb_client import FalkorDB as LiteFalkorDB  # type: ignore
+                except Exception as e:  # broad to surface helpful message
+                    raise ImportError(
+                        'falkordblite is required for FalkorDB Lite. Install it with: '
+                        'pip install falkordblite'
+                    ) from e
+                db_path = lite_db_path or '/tmp/falkordb.db'
+                lite_client = LiteFalkorDB(db_path)
+                self.client = _AsyncLiteClientAdapter(lite_client)
+                self._is_lite = True
+            else:
+                self.client = FalkorDB(host=host, port=port, username=username, password=password)
+                self._is_lite = False
 
         # Schedule the indices and constraints to be built
         try:
@@ -172,7 +191,7 @@
         params = convert_datetimes_to_strings(dict(kwargs))
         try:
-            result = await graph.query(cypher_query_, params)  # type: ignore[reportUnknownArgumentType]
+            result = await _await_graph_query(graph, cypher_query_, params)  # type: ignore[reportUnknownArgumentType]
         except Exception as e:
             if 'already indexed' in str(e):
                 # check if index already exists
@@ -209,6 +228,9 @@
             await self.client.connection.aclose()
         elif hasattr(self.client.connection, 'close'):
             await self.client.connection.close()
+        else:
+            # Lite adapter exposes no-op aclose; nothing to do otherwise
+            pass
 
     async def delete_all_indexes(self) -> None:
         result = await self.execute_query('CALL db.indexes()')
@@ -360,3 +382,45 @@
         full_query = group_filter + ' (' + sanitized_query + ')'
         return full_query
+
+
+# -----------------
+# Internal helpers
+# -----------------
+async def _await_graph_query(graph: Any, cypher: str, params: dict[str, Any] | None):
+    """
+    Await a graph.query call whether it's native-async or sync (Lite).
+    """
+    query_callable = getattr(graph, 'query')
+    if inspect.iscoroutinefunction(query_callable):
+        # Native async client (or Lite adapter): await the coroutine directly
+        return await query_callable(cypher, params)
+    # Sync path: run in a thread to avoid blocking the event loop
+    return await asyncio.to_thread(query_callable, cypher, params)
+
+
+class _AsyncLiteGraphAdapter:
+    def __init__(self, sync_graph: Any):
+        self._sync_graph = sync_graph
+
+    async def query(self, cypher: str, params: dict[str, Any] | None = None):
+        return await asyncio.to_thread(self._sync_graph.query, cypher, params)
+
+    async def ro_query(self, cypher: str, params: dict[str, Any] | None = None):
+        return await asyncio.to_thread(self._sync_graph.ro_query, cypher, params)
+
+    async def delete(self):
+        return await asyncio.to_thread(self._sync_graph.delete)
+
+
+class _AsyncLiteClientAdapter:
+    def __init__(self, sync_db: Any):
+        self._sync_db = sync_db
+
+    def select_graph(self, name: str):
+        return _AsyncLiteGraphAdapter(self._sync_db.select_graph(name))
+
+    async def aclose(self):
+        # redislite does not expose explicit close; rely on GC. No-op.
+        return None
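The dispatch pattern used by `_await_graph_query` (await a coroutine method directly, off-load a sync method to a worker thread) can be illustrated standalone. `SyncGraph` and `AsyncGraph` below are hypothetical stand-ins, not FalkorDB classes.

```python
import asyncio
import inspect


class SyncGraph:
    """Hypothetical stand-in for the synchronous Lite graph client."""

    def query(self, cypher, params=None):
        return f'ran: {cypher}'


class AsyncGraph:
    """Hypothetical stand-in for the native async client."""

    async def query(self, cypher, params=None):
        return f'ran: {cypher}'


async def await_graph_query(graph, cypher, params=None):
    # Inspect the method instead of calling it first, so a sync query runs once
    fn = graph.query
    if inspect.iscoroutinefunction(fn):
        return await fn(cypher, params)
    # Sync client: run in a worker thread to keep the event loop responsive
    return await asyncio.to_thread(fn, cypher, params)


async def main():
    r1 = await await_graph_query(SyncGraph(), 'RETURN 1')
    r2 = await await_graph_query(AsyncGraph(), 'RETURN 1')
    return r1, r2


results = asyncio.run(main())
```

Callers such as the session `run` method can then treat both client flavors identically.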


@@ -31,6 +31,7 @@ groq = ["groq>=0.2.0"]
 google-genai = ["google-genai>=1.8.0"]
 kuzu = ["kuzu>=0.11.3"]
 falkordb = ["falkordb>=1.1.2,<2.0.0"]
+falkordb-lite = ["falkordblite>=0.1.0"]
 voyageai = ["voyageai>=0.2.3"]
 neo4j-opensearch = ["boto3>=1.39.16", "opensearch-py>=3.0.0"]
 sentence-transformers = ["sentence-transformers>=3.2.1"]
sentence-transformers = ["sentence-transformers>=3.2.1"]
@@ -42,6 +43,7 @@ dev = [
     "anthropic>=0.49.0",
     "google-genai>=1.8.0",
     "falkordb>=1.1.2,<2.0.0",
+    "falkordblite>=0.1.0",
     "kuzu>=0.11.3",
     "boto3>=1.39.16",
     "opensearch-py>=3.0.0",


@@ -70,6 +70,30 @@ class TestFalkorDriver:
         """Test driver provider identification."""
         assert self.driver.provider == GraphProvider.FALKORDB
 
+    def test_init_with_lite_uses_redislite_adapter(self):
+        """Test initialization with lite=True uses the redislite client adapter.
+
+        We don't require falkordb for this test as we patch the redislite import path.
+        """
+        import sys
+        import types
+
+        # Create a fake redislite.falkordb_client module with a FalkorDB class
+        fake_redislite = types.ModuleType('redislite')
+        fake_client_mod = types.ModuleType('redislite.falkordb_client')
+        fake_client_mod.FalkorDB = MagicMock()
+        with patch.dict(
+            sys.modules,
+            {
+                'redislite': fake_redislite,
+                'redislite.falkordb_client': fake_client_mod,
+            },
+        ):
+            from graphiti_core.driver.falkordb_driver import FalkorDriver as _FD
+
+            try:
+                _FD(lite=True, database='default_db')
+            except Exception as e:
+                pytest.fail(f'Lite initialization raised unexpectedly: {e}')
+
     @unittest.skipIf(not HAS_FALKORDB, 'FalkorDB is not installed')
     def test_get_graph_with_name(self):
         """Test _get_graph with specific graph name."""