diff --git a/cognee/infrastructure/files/exceptions.py b/cognee/infrastructure/files/exceptions.py
index 351eaee9c..eb6efdbce 100644
--- a/cognee/infrastructure/files/exceptions.py
+++ b/cognee/infrastructure/files/exceptions.py
@@ -11,3 +11,15 @@ class FileContentHashingError(Exception):
         status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
     ):
         super().__init__(message, name, status_code)
+
+
+class UnsupportedPathSchemeError(Exception):
+    """Raised when a non-filesystem path scheme (like http://, https://) is passed to a function expecting filesystem paths."""
+
+    def __init__(
+        self,
+        message: str = "This function only supports filesystem paths (file:// or local paths), not HTTP/HTTPS URLs.",
+        name: str = "UnsupportedPathSchemeError",
+        status_code=status.HTTP_400_BAD_REQUEST,
+    ):
+        super().__init__(message, name, status_code)
diff --git a/cognee/infrastructure/files/utils/get_data_file_path.py b/cognee/infrastructure/files/utils/get_data_file_path.py
index 242d130a9..d67fc95a0 100644
--- a/cognee/infrastructure/files/utils/get_data_file_path.py
+++ b/cognee/infrastructure/files/utils/get_data_file_path.py
@@ -1,6 +1,8 @@
 import os
 from urllib.parse import urlparse
 
+from cognee.infrastructure.files.exceptions import UnsupportedPathSchemeError
+
 
 def get_data_file_path(file_path: str):
     # Check if this is a file URI BEFORE normalizing (which corrupts URIs)
@@ -39,7 +41,9 @@ def get_data_file_path(file_path: str):
         return normalized_url
 
     elif file_path.startswith(("http://", "https://")):
-        return file_path
+        raise UnsupportedPathSchemeError(
+            message=f"HTTP/HTTPS URLs are not supported by get_data_file_path(). Received: {file_path}"
+        )
 
     else:
         # Regular file path - normalize separators
diff --git a/cognee/tasks/ingestion/ingest_data.py b/cognee/tasks/ingestion/ingest_data.py
index 3fb161181..b742e474e 100644
--- a/cognee/tasks/ingestion/ingest_data.py
+++ b/cognee/tasks/ingestion/ingest_data.py
@@ -3,6 +3,7 @@ import inspect
 from uuid import UUID
 from typing import Union, BinaryIO, Any, List, Optional
 
+from cognee.infrastructure.files.exceptions import UnsupportedPathSchemeError
 from cognee.infrastructure.loaders import LoaderInterface
 import cognee.modules.ingestion as ingestion
 from cognee.infrastructure.databases.relational import get_relational_engine
@@ -79,11 +80,16 @@ async def ingest_data(
     dataset_data_map = {str(data.id): True for data in dataset_data}
 
     for data_item in data:
-        # Get file path of data item or create a file it doesn't exist
-        original_file_path = await save_data_item_to_storage(data_item)
-
-        # Transform file path to be OS usable
-        actual_file_path = get_data_file_path(original_file_path)
+        try:
+            # Get file path of data item or create a file if it doesn't exist
+            original_file_path = await save_data_item_to_storage(data_item)
+            # Transform file path to be OS usable
+            actual_file_path = get_data_file_path(original_file_path)
+        except UnsupportedPathSchemeError:
+            # This data_item (e.g., an HTTP/HTTPS URL) should be passed directly to the loader;
+            # skip save_data_item_to_storage and get_data_file_path
+            actual_file_path = data_item
+            original_file_path = None  # we don't have an original file path
 
         # Store all input data as text files in Cognee data storage
         cognee_storage_file_path, loader_engine = await data_item_to_text_file(
@@ -93,17 +99,26 @@
         )
 
         # Find metadata from original file
-        async with open_data_file(original_file_path) as file:
-            classified_data = ingestion.classify(file)
+        if original_file_path is not None:
+            # Standard flow: extract metadata from both original and stored files
+            async with open_data_file(original_file_path) as file:
+                classified_data = ingestion.classify(file)
+                data_id = ingestion.identify(classified_data, user)
+                original_file_metadata = classified_data.get_metadata()
 
-        # data_id is the hash of original file contents + owner id to avoid duplicate data
-        data_id = ingestion.identify(classified_data, user)
-        original_file_metadata = classified_data.get_metadata()
-
-        # Find metadata from Cognee data storage text file
-        async with open_data_file(cognee_storage_file_path) as file:
-            classified_data = ingestion.classify(file)
-            storage_file_metadata = classified_data.get_metadata()
+            async with open_data_file(cognee_storage_file_path) as file:
+                classified_data = ingestion.classify(file)
+                storage_file_metadata = classified_data.get_metadata()
+        else:
+            # Alternative flow (e.g., URLs): extract metadata once from stored file
+            async with open_data_file(cognee_storage_file_path) as file:
+                classified_data = ingestion.classify(file)
+                data_id = ingestion.identify(classified_data, user)
+                original_file_metadata = classified_data.get_metadata()
+            # Override file_path to be the actual data_item (e.g., URL)?
+            # original_file_metadata["file_path"] = actual_file_path
+            # Storage metadata is the same as original
+            # storage_file_metadata = original_file_metadata.copy()
 
         from sqlalchemy import select
 
diff --git a/cognee/tasks/ingestion/save_data_item_to_storage.py b/cognee/tasks/ingestion/save_data_item_to_storage.py
index 5761b19ba..cf32477cb 100644
--- a/cognee/tasks/ingestion/save_data_item_to_storage.py
+++ b/cognee/tasks/ingestion/save_data_item_to_storage.py
@@ -3,6 +3,7 @@ from pathlib import Path
 from urllib.parse import urlparse
 from typing import Union, BinaryIO, Any
 
+from cognee.infrastructure.files.exceptions import UnsupportedPathSchemeError
 from cognee.modules.ingestion.exceptions import IngestionError
 from cognee.modules.ingestion import save_data_to_file
 from cognee.shared.logging_utils import get_logger
@@ -56,7 +57,9 @@ async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any]) -> str
     if parsed_url.scheme == "s3":
         return data_item
     elif parsed_url.scheme == "http" or parsed_url.scheme == "https":
-        return data_item
+        raise UnsupportedPathSchemeError(
+            message=f"HTTP/HTTPS URLs should be handled by the loader, not by save_data_item_to_storage. Received: {data_item}"
+        )
     # data is local file path
     elif parsed_url.scheme == "file":
         if settings.accept_local_file_path:
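
Note (not part of the diff): a minimal sketch of the caller pattern the ingest_data.py hunk introduces, assuming the import paths added above; the helper name resolve_ingestion_path is hypothetical, and the real flow also awaits save_data_item_to_storage before falling back to the URL branch.

# Illustration only: branch on UnsupportedPathSchemeError the same way the
# try/except added to ingest_data.py does.
from cognee.infrastructure.files.exceptions import UnsupportedPathSchemeError
from cognee.infrastructure.files.utils.get_data_file_path import get_data_file_path


def resolve_ingestion_path(data_item: str):
    """Return (original_file_path, actual_file_path) for a single data item."""
    try:
        # Local paths and file:// URIs are normalized into an OS-usable path.
        return data_item, get_data_file_path(data_item)
    except UnsupportedPathSchemeError:
        # HTTP/HTTPS URLs go to the loader as-is; there is no original file on disk.
        return None, data_item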