refactor ingest_data to accommodate non-FS data items

This commit is contained in:
Daulet Amirkhanov 2025-10-17 11:12:09 +01:00
parent 2e7ff0b01b
commit d0f3e224cb
4 changed files with 51 additions and 17 deletions

View file

@ -11,3 +11,15 @@ class FileContentHashingError(Exception):
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
): ):
super().__init__(message, name, status_code) super().__init__(message, name, status_code)
class UnsupportedPathSchemeError(Exception):
    """Raised when a non-filesystem path scheme (like http://, https://) is passed to a function expecting filesystem paths.

    Used by the ingestion pipeline as a control signal: callers catch this
    to route HTTP/HTTPS data items directly to a loader instead of the
    filesystem storage path (see ingest_data / save_data_item_to_storage).
    """

    def __init__(
        self,
        message: str = "This function only supports filesystem paths (file:// or local paths), not HTTP/HTTPS URLs.",
        name: str = "UnsupportedPathSchemeError",
        # NOTE(review): presumably an int HTTP status constant from
        # fastapi/starlette's `status` module — confirm against the
        # module-level import (not visible in this view).
        status_code=status.HTTP_400_BAD_REQUEST,
    ):
        # Mirrors the (message, name, status_code) argument order of the
        # sibling exception in this module (FileContentHashingError).
        super().__init__(message, name, status_code)

View file

@ -1,6 +1,8 @@
import os import os
from urllib.parse import urlparse from urllib.parse import urlparse
from cognee.infrastructure.files.exceptions import UnsupportedPathSchemeError
def get_data_file_path(file_path: str): def get_data_file_path(file_path: str):
# Check if this is a file URI BEFORE normalizing (which corrupts URIs) # Check if this is a file URI BEFORE normalizing (which corrupts URIs)
@ -39,7 +41,9 @@ def get_data_file_path(file_path: str):
return normalized_url return normalized_url
elif file_path.startswith(("http://", "https://")): elif file_path.startswith(("http://", "https://")):
return file_path raise UnsupportedPathSchemeError(
message=f"HTTP/HTTPS URLs are not supported by get_data_file_path(). Received: {file_path}"
)
else: else:
# Regular file path - normalize separators # Regular file path - normalize separators

View file

@ -3,6 +3,7 @@ import inspect
from uuid import UUID from uuid import UUID
from typing import Union, BinaryIO, Any, List, Optional from typing import Union, BinaryIO, Any, List, Optional
from cognee.infrastructure.files.exceptions import UnsupportedPathSchemeError
from cognee.infrastructure.loaders import LoaderInterface from cognee.infrastructure.loaders import LoaderInterface
import cognee.modules.ingestion as ingestion import cognee.modules.ingestion as ingestion
from cognee.infrastructure.databases.relational import get_relational_engine from cognee.infrastructure.databases.relational import get_relational_engine
@ -79,11 +80,16 @@ async def ingest_data(
dataset_data_map = {str(data.id): True for data in dataset_data} dataset_data_map = {str(data.id): True for data in dataset_data}
for data_item in data: for data_item in data:
# Get file path of data item or create a file it doesn't exist try:
original_file_path = await save_data_item_to_storage(data_item) # Get file path of data item or create a file if it doesn't exist
original_file_path = await save_data_item_to_storage(data_item)
# Transform file path to be OS usable # Transform file path to be OS usable
actual_file_path = get_data_file_path(original_file_path) actual_file_path = get_data_file_path(original_file_path)
except UnsupportedPathSchemeError:
# This data_item (e.g., HTTP/HTTPS URL) should be passed directly to the loader
# skip save_data_item_to_storage and get_data_file_path
actual_file_path = data_item
original_file_path = None # we don't have an original file path
# Store all input data as text files in Cognee data storage # Store all input data as text files in Cognee data storage
cognee_storage_file_path, loader_engine = await data_item_to_text_file( cognee_storage_file_path, loader_engine = await data_item_to_text_file(
@ -93,17 +99,26 @@ async def ingest_data(
) )
# Find metadata from original file # Find metadata from original file
async with open_data_file(original_file_path) as file: if original_file_path is not None:
classified_data = ingestion.classify(file) # Standard flow: extract metadata from both original and stored files
async with open_data_file(original_file_path) as file:
classified_data = ingestion.classify(file)
data_id = ingestion.identify(classified_data, user)
original_file_metadata = classified_data.get_metadata()
# data_id is the hash of original file contents + owner id to avoid duplicate data async with open_data_file(cognee_storage_file_path) as file:
data_id = ingestion.identify(classified_data, user) classified_data = ingestion.classify(file)
original_file_metadata = classified_data.get_metadata() storage_file_metadata = classified_data.get_metadata()
else:
# Find metadata from Cognee data storage text file # Alternative flow (e.g., URLs): extract metadata once from stored file
async with open_data_file(cognee_storage_file_path) as file: async with open_data_file(cognee_storage_file_path) as file:
classified_data = ingestion.classify(file) classified_data = ingestion.classify(file)
storage_file_metadata = classified_data.get_metadata() data_id = ingestion.identify(classified_data, user)
original_file_metadata = classified_data.get_metadata()
# Override file_path to be the actual data_item (e.g., URL) ?
# original_file_metadata["file_path"] = actual_file_path
# Storage metadata is the same as original
# storage_file_metadata = original_file_metadata.copy()
from sqlalchemy import select from sqlalchemy import select

View file

@ -3,6 +3,7 @@ from pathlib import Path
from urllib.parse import urlparse from urllib.parse import urlparse
from typing import Union, BinaryIO, Any from typing import Union, BinaryIO, Any
from cognee.infrastructure.files.exceptions import UnsupportedPathSchemeError
from cognee.modules.ingestion.exceptions import IngestionError from cognee.modules.ingestion.exceptions import IngestionError
from cognee.modules.ingestion import save_data_to_file from cognee.modules.ingestion import save_data_to_file
from cognee.shared.logging_utils import get_logger from cognee.shared.logging_utils import get_logger
@ -56,7 +57,9 @@ async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any]) -> str
if parsed_url.scheme == "s3": if parsed_url.scheme == "s3":
return data_item return data_item
elif parsed_url.scheme == "http" or parsed_url.scheme == "https": elif parsed_url.scheme == "http" or parsed_url.scheme == "https":
return data_item raise UnsupportedPathSchemeError(
message=f"HTTP/HTTPS URLs should be handled by loader, not by save_data_item_to_storage. Received: {data_item}"
)
# data is local file path # data is local file path
elif parsed_url.scheme == "file": elif parsed_url.scheme == "file":
if settings.accept_local_file_path: if settings.accept_local_file_path: