Merge pull request #2417 from danielaskdd/neo4j-retry

Fix: Add Comprehensive Retry Mechanism for Neo4j Storage Operations
This commit is contained in:
Daniel.y 2025-11-25 02:03:48 +08:00 committed by GitHub
commit 2832a2ca7e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 61 additions and 1 deletions

View file

@ -44,6 +44,23 @@ config.read("config.ini", "utf-8")
logging.getLogger("neo4j").setLevel(logging.ERROR) logging.getLogger("neo4j").setLevel(logging.ERROR)
READ_RETRY_EXCEPTIONS = (
neo4jExceptions.ServiceUnavailable,
neo4jExceptions.TransientError,
neo4jExceptions.SessionExpired,
ConnectionResetError,
OSError,
AttributeError,
)
READ_RETRY = retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
retry=retry_if_exception_type(READ_RETRY_EXCEPTIONS),
reraise=True,
)
@final @final
@dataclass @dataclass
class Neo4JStorage(BaseGraphStorage): class Neo4JStorage(BaseGraphStorage):
@ -352,6 +369,7 @@ class Neo4JStorage(BaseGraphStorage):
# Neo4J handles persistence automatically # Neo4J handles persistence automatically
pass pass
@READ_RETRY
async def has_node(self, node_id: str) -> bool: async def has_node(self, node_id: str) -> bool:
""" """
Check if a node with the given label exists in the database Check if a node with the given label exists in the database
@ -385,6 +403,7 @@ class Neo4JStorage(BaseGraphStorage):
await result.consume() # Ensure results are consumed even on error await result.consume() # Ensure results are consumed even on error
raise raise
@READ_RETRY
async def has_edge(self, source_node_id: str, target_node_id: str) -> bool: async def has_edge(self, source_node_id: str, target_node_id: str) -> bool:
""" """
Check if an edge exists between two nodes Check if an edge exists between two nodes
@ -426,6 +445,7 @@ class Neo4JStorage(BaseGraphStorage):
await result.consume() # Ensure results are consumed even on error await result.consume() # Ensure results are consumed even on error
raise raise
@READ_RETRY
async def get_node(self, node_id: str) -> dict[str, str] | None: async def get_node(self, node_id: str) -> dict[str, str] | None:
"""Get node by its label identifier, return only node properties """Get node by its label identifier, return only node properties
@ -479,6 +499,7 @@ class Neo4JStorage(BaseGraphStorage):
) )
raise raise
@READ_RETRY
async def get_nodes_batch(self, node_ids: list[str]) -> dict[str, dict]: async def get_nodes_batch(self, node_ids: list[str]) -> dict[str, dict]:
""" """
Retrieve multiple nodes in one query using UNWIND. Retrieve multiple nodes in one query using UNWIND.
@ -515,6 +536,7 @@ class Neo4JStorage(BaseGraphStorage):
await result.consume() # Make sure to consume the result fully await result.consume() # Make sure to consume the result fully
return nodes return nodes
@READ_RETRY
async def node_degree(self, node_id: str) -> int: async def node_degree(self, node_id: str) -> int:
"""Get the degree (number of relationships) of a node with the given label. """Get the degree (number of relationships) of a node with the given label.
If multiple nodes have the same label, returns the degree of the first node. If multiple nodes have the same label, returns the degree of the first node.
@ -563,6 +585,7 @@ class Neo4JStorage(BaseGraphStorage):
) )
raise raise
@READ_RETRY
async def node_degrees_batch(self, node_ids: list[str]) -> dict[str, int]: async def node_degrees_batch(self, node_ids: list[str]) -> dict[str, int]:
""" """
Retrieve the degree for multiple nodes in a single query using UNWIND. Retrieve the degree for multiple nodes in a single query using UNWIND.
@ -621,6 +644,7 @@ class Neo4JStorage(BaseGraphStorage):
degrees = int(src_degree) + int(trg_degree) degrees = int(src_degree) + int(trg_degree)
return degrees return degrees
@READ_RETRY
async def edge_degrees_batch( async def edge_degrees_batch(
self, edge_pairs: list[tuple[str, str]] self, edge_pairs: list[tuple[str, str]]
) -> dict[tuple[str, str], int]: ) -> dict[tuple[str, str], int]:
@ -647,6 +671,7 @@ class Neo4JStorage(BaseGraphStorage):
edge_degrees[(src, tgt)] = degrees.get(src, 0) + degrees.get(tgt, 0) edge_degrees[(src, tgt)] = degrees.get(src, 0) + degrees.get(tgt, 0)
return edge_degrees return edge_degrees
@READ_RETRY
async def get_edge( async def get_edge(
self, source_node_id: str, target_node_id: str self, source_node_id: str, target_node_id: str
) -> dict[str, str] | None: ) -> dict[str, str] | None:
@ -734,6 +759,7 @@ class Neo4JStorage(BaseGraphStorage):
) )
raise raise
@READ_RETRY
async def get_edges_batch( async def get_edges_batch(
self, pairs: list[dict[str, str]] self, pairs: list[dict[str, str]]
) -> dict[tuple[str, str], dict]: ) -> dict[tuple[str, str], dict]:
@ -784,6 +810,7 @@ class Neo4JStorage(BaseGraphStorage):
await result.consume() await result.consume()
return edges_dict return edges_dict
@READ_RETRY
async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None: async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None:
"""Retrieves all edges (relationships) for a particular node identified by its label. """Retrieves all edges (relationships) for a particular node identified by its label.
@ -851,6 +878,7 @@ class Neo4JStorage(BaseGraphStorage):
) )
raise raise
@READ_RETRY
async def get_nodes_edges_batch( async def get_nodes_edges_batch(
self, node_ids: list[str] self, node_ids: list[str]
) -> dict[str, list[tuple[str, str]]]: ) -> dict[str, list[tuple[str, str]]]:

View file

@ -47,6 +47,7 @@ pytest = [
"pytest>=8.4.2", "pytest>=8.4.2",
"pytest-asyncio>=1.2.0", "pytest-asyncio>=1.2.0",
"pre-commit", "pre-commit",
"ruff",
] ]
api = [ api = [
@ -132,10 +133,11 @@ offline = [
] ]
evaluation = [ evaluation = [
# Test framework dependencies (for evaluation) # Test framework dependencies
"pytest>=8.4.2", "pytest>=8.4.2",
"pytest-asyncio>=1.2.0", "pytest-asyncio>=1.2.0",
"pre-commit", "pre-commit",
"ruff",
# RAG evaluation dependencies (RAGAS framework) # RAG evaluation dependencies (RAGAS framework)
"ragas>=0.3.7", "ragas>=0.3.7",
"datasets>=4.3.0", "datasets>=4.3.0",

30
uv.lock generated
View file

@ -2615,6 +2615,7 @@ evaluation = [
{ name = "pytest" }, { name = "pytest" },
{ name = "pytest-asyncio" }, { name = "pytest-asyncio" },
{ name = "ragas" }, { name = "ragas" },
{ name = "ruff" },
] ]
observability = [ observability = [
{ name = "langfuse" }, { name = "langfuse" },
@ -2700,6 +2701,7 @@ pytest = [
{ name = "pre-commit" }, { name = "pre-commit" },
{ name = "pytest" }, { name = "pytest" },
{ name = "pytest-asyncio" }, { name = "pytest-asyncio" },
{ name = "ruff" },
] ]
[package.metadata] [package.metadata]
@ -2778,6 +2780,8 @@ requires-dist = [
{ name = "qdrant-client", marker = "extra == 'offline-storage'", specifier = ">=1.11.0,<2.0.0" }, { name = "qdrant-client", marker = "extra == 'offline-storage'", specifier = ">=1.11.0,<2.0.0" },
{ name = "ragas", marker = "extra == 'evaluation'", specifier = ">=0.3.7" }, { name = "ragas", marker = "extra == 'evaluation'", specifier = ">=0.3.7" },
{ name = "redis", marker = "extra == 'offline-storage'", specifier = ">=5.0.0,<8.0.0" }, { name = "redis", marker = "extra == 'offline-storage'", specifier = ">=5.0.0,<8.0.0" },
{ name = "ruff", marker = "extra == 'evaluation'" },
{ name = "ruff", marker = "extra == 'pytest'" },
{ name = "setuptools" }, { name = "setuptools" },
{ name = "setuptools", marker = "extra == 'api'" }, { name = "setuptools", marker = "extra == 'api'" },
{ name = "tenacity" }, { name = "tenacity" },
@ -5637,6 +5641,32 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/3f/50/0a9e7e7afe7339bd5e36911f0ceb15fed51945836ed803ae5afd661057fd/rtree-1.4.1-py3-none-win_arm64.whl", hash = "sha256:3d46f55729b28138e897ffef32f7ce93ac335cb67f9120125ad3742a220800f0", size = 355253, upload-time = "2025-08-13T19:32:00.296Z" }, { url = "https://files.pythonhosted.org/packages/3f/50/0a9e7e7afe7339bd5e36911f0ceb15fed51945836ed803ae5afd661057fd/rtree-1.4.1-py3-none-win_arm64.whl", hash = "sha256:3d46f55729b28138e897ffef32f7ce93ac335cb67f9120125ad3742a220800f0", size = 355253, upload-time = "2025-08-13T19:32:00.296Z" },
] ]
[[package]]
name = "ruff"
version = "0.14.6"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/52/f0/62b5a1a723fe183650109407fa56abb433b00aa1c0b9ba555f9c4efec2c6/ruff-0.14.6.tar.gz", hash = "sha256:6f0c742ca6a7783a736b867a263b9a7a80a45ce9bee391eeda296895f1b4e1cc", size = 5669501, upload-time = "2025-11-21T14:26:17.903Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/67/d2/7dd544116d107fffb24a0064d41a5d2ed1c9d6372d142f9ba108c8e39207/ruff-0.14.6-py3-none-linux_armv6l.whl", hash = "sha256:d724ac2f1c240dbd01a2ae98db5d1d9a5e1d9e96eba999d1c48e30062df578a3", size = 13326119, upload-time = "2025-11-21T14:25:24.2Z" },
{ url = "https://files.pythonhosted.org/packages/36/6a/ad66d0a3315d6327ed6b01f759d83df3c4d5f86c30462121024361137b6a/ruff-0.14.6-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:9f7539ea257aa4d07b7ce87aed580e485c40143f2473ff2f2b75aee003186004", size = 13526007, upload-time = "2025-11-21T14:25:26.906Z" },
{ url = "https://files.pythonhosted.org/packages/a3/9d/dae6db96df28e0a15dea8e986ee393af70fc97fd57669808728080529c37/ruff-0.14.6-py3-none-macosx_11_0_arm64.whl", hash = "sha256:7f6007e55b90a2a7e93083ba48a9f23c3158c433591c33ee2e99a49b889c6332", size = 12676572, upload-time = "2025-11-21T14:25:29.826Z" },
{ url = "https://files.pythonhosted.org/packages/76/a4/f319e87759949062cfee1b26245048e92e2acce900ad3a909285f9db1859/ruff-0.14.6-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0a8e7b9d73d8728b68f632aa8e824ef041d068d231d8dbc7808532d3629a6bef", size = 13140745, upload-time = "2025-11-21T14:25:32.788Z" },
{ url = "https://files.pythonhosted.org/packages/95/d3/248c1efc71a0a8ed4e8e10b4b2266845d7dfc7a0ab64354afe049eaa1310/ruff-0.14.6-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:d50d45d4553a3ebcbd33e7c5e0fe6ca4aafd9a9122492de357205c2c48f00775", size = 13076486, upload-time = "2025-11-21T14:25:35.601Z" },
{ url = "https://files.pythonhosted.org/packages/a5/19/b68d4563fe50eba4b8c92aa842149bb56dd24d198389c0ed12e7faff4f7d/ruff-0.14.6-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:118548dd121f8a21bfa8ab2c5b80e5b4aed67ead4b7567790962554f38e598ce", size = 13727563, upload-time = "2025-11-21T14:25:38.514Z" },
{ url = "https://files.pythonhosted.org/packages/47/ac/943169436832d4b0e867235abbdb57ce3a82367b47e0280fa7b4eabb7593/ruff-0.14.6-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:57256efafbfefcb8748df9d1d766062f62b20150691021f8ab79e2d919f7c11f", size = 15199755, upload-time = "2025-11-21T14:25:41.516Z" },
{ url = "https://files.pythonhosted.org/packages/c9/b9/288bb2399860a36d4bb0541cb66cce3c0f4156aaff009dc8499be0c24bf2/ruff-0.14.6-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:ff18134841e5c68f8e5df1999a64429a02d5549036b394fafbe410f886e1989d", size = 14850608, upload-time = "2025-11-21T14:25:44.428Z" },
{ url = "https://files.pythonhosted.org/packages/ee/b1/a0d549dd4364e240f37e7d2907e97ee80587480d98c7799d2d8dc7a2f605/ruff-0.14.6-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:29c4b7ec1e66a105d5c27bd57fa93203637d66a26d10ca9809dc7fc18ec58440", size = 14118754, upload-time = "2025-11-21T14:25:47.214Z" },
{ url = "https://files.pythonhosted.org/packages/13/ac/9b9fe63716af8bdfddfacd0882bc1586f29985d3b988b3c62ddce2e202c3/ruff-0.14.6-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:167843a6f78680746d7e226f255d920aeed5e4ad9c03258094a2d49d3028b105", size = 13949214, upload-time = "2025-11-21T14:25:50.002Z" },
{ url = "https://files.pythonhosted.org/packages/12/27/4dad6c6a77fede9560b7df6802b1b697e97e49ceabe1f12baf3ea20862e9/ruff-0.14.6-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:16a33af621c9c523b1ae006b1b99b159bf5ac7e4b1f20b85b2572455018e0821", size = 14106112, upload-time = "2025-11-21T14:25:52.841Z" },
{ url = "https://files.pythonhosted.org/packages/6a/db/23e322d7177873eaedea59a7932ca5084ec5b7e20cb30f341ab594130a71/ruff-0.14.6-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:1432ab6e1ae2dc565a7eea707d3b03a0c234ef401482a6f1621bc1f427c2ff55", size = 13035010, upload-time = "2025-11-21T14:25:55.536Z" },
{ url = "https://files.pythonhosted.org/packages/a8/9c/20e21d4d69dbb35e6a1df7691e02f363423658a20a2afacf2a2c011800dc/ruff-0.14.6-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:4c55cfbbe7abb61eb914bfd20683d14cdfb38a6d56c6c66efa55ec6570ee4e71", size = 13054082, upload-time = "2025-11-21T14:25:58.625Z" },
{ url = "https://files.pythonhosted.org/packages/66/25/906ee6a0464c3125c8d673c589771a974965c2be1a1e28b5c3b96cb6ef88/ruff-0.14.6-py3-none-musllinux_1_2_i686.whl", hash = "sha256:efea3c0f21901a685fff4befda6d61a1bf4cb43de16da87e8226a281d614350b", size = 13303354, upload-time = "2025-11-21T14:26:01.816Z" },
{ url = "https://files.pythonhosted.org/packages/4c/58/60577569e198d56922b7ead07b465f559002b7b11d53f40937e95067ca1c/ruff-0.14.6-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:344d97172576d75dc6afc0e9243376dbe1668559c72de1864439c4fc95f78185", size = 14054487, upload-time = "2025-11-21T14:26:05.058Z" },
{ url = "https://files.pythonhosted.org/packages/67/0b/8e4e0639e4cc12547f41cb771b0b44ec8225b6b6a93393176d75fe6f7d40/ruff-0.14.6-py3-none-win32.whl", hash = "sha256:00169c0c8b85396516fdd9ce3446c7ca20c2a8f90a77aa945ba6b8f2bfe99e85", size = 13013361, upload-time = "2025-11-21T14:26:08.152Z" },
{ url = "https://files.pythonhosted.org/packages/fb/02/82240553b77fd1341f80ebb3eaae43ba011c7a91b4224a9f317d8e6591af/ruff-0.14.6-py3-none-win_amd64.whl", hash = "sha256:390e6480c5e3659f8a4c8d6a0373027820419ac14fa0d2713bd8e6c3e125b8b9", size = 14432087, upload-time = "2025-11-21T14:26:10.891Z" },
{ url = "https://files.pythonhosted.org/packages/a5/1f/93f9b0fad9470e4c829a5bb678da4012f0c710d09331b860ee555216f4ea/ruff-0.14.6-py3-none-win_arm64.whl", hash = "sha256:d43c81fbeae52cfa8728d8766bbf46ee4298c888072105815b392da70ca836b2", size = 13520930, upload-time = "2025-11-21T14:26:13.951Z" },
]
[[package]] [[package]]
name = "s3transfer" name = "s3transfer"
version = "0.14.0" version = "0.14.0"