diff --git a/cognee/api/v1/add/add.py b/cognee/api/v1/add/add.py
index 3c4d7b696..1c76f7a52 100644
--- a/cognee/api/v1/add/add.py
+++ b/cognee/api/v1/add/add.py
@@ -31,6 +31,7 @@ async def add(
     incremental_loading: bool = True,
     data_per_batch: Optional[int] = 20,
     loaders_config: dict[LoaderInterface, dict] = {},
+    fetchers_config: dict[str, Any] = {},
 ):
     """
     Add data to Cognee for knowledge graph processing.
@@ -179,6 +180,7 @@ async def add(
             dataset_id,
             preferred_loaders,
             loaders_config,
+            fetchers_config,
         ),
     ]
 
@@ -204,6 +206,7 @@ async def add(
         graph_db_config=graph_db_config,
         incremental_loading=incremental_loading,
         data_per_batch=data_per_batch,
+        fetchers_config=fetchers_config,
     ):
         pipeline_run_info = run_info
 
diff --git a/cognee/infrastructure/loaders/external/web_url_loader.py b/cognee/infrastructure/loaders/external/web_url_loader.py
index 1ecf82171..996f7dae6 100644
--- a/cognee/infrastructure/loaders/external/web_url_loader.py
+++ b/cognee/infrastructure/loaders/external/web_url_loader.py
@@ -2,8 +2,6 @@
 from cognee.infrastructure.loaders.LoaderInterface import LoaderInterface
 from typing import List
 from cognee.modules.ingestion.exceptions.exceptions import IngestionError
-from cognee.modules.ingestion import save_data_to_file
-from cognee.tasks.web_scraper.config import TavilyConfig, SoupCrawlerConfig
 from cognee.shared.logging_utils import get_logger
 
 logger = get_logger()
@@ -62,7 +60,7 @@ class WebUrlLoader(LoaderInterface):
         Load and process the file, returning standardized result.
 
         Args:
-            file_path: Path to the file to be processed
+            file_path: Path to the file to be processed (already saved by fetcher)
             file_stream: If file stream is provided it will be used to process file instead
             **kwargs: Additional loader-specific configuration
 
@@ -71,63 +69,5 @@ class WebUrlLoader(LoaderInterface):
 
         Raises:
             Exception: If file cannot be processed
         """
-        loaders_config = kwargs.get("loaders_config")
-        if not isinstance(loaders_config, dict):
-            raise IngestionError("loaders_config must be a valid dictionary")
-        web_url_loader_config = loaders_config.get(self.loader_name)
-        if not isinstance(web_url_loader_config, dict):
-            raise IngestionError(f"{self.loader_name} configuration must be a valid dictionary")
-
-        try:
-            from cognee.context_global_variables import tavily_config, soup_crawler_config
-            from cognee.tasks.web_scraper import fetch_page_content
-
-            tavily_dict = web_url_loader_config.get("tavily_config")
-            _tavily_config = TavilyConfig(**tavily_dict) if tavily_dict else None
-
-            soup_dict = web_url_loader_config.get("soup_config")
-            _soup_config = SoupCrawlerConfig(**soup_dict) if soup_dict else None
-
-            # Set global configs for downstream access
-            tavily_config.set(_tavily_config)
-            soup_crawler_config.set(_soup_config)
-
-            preferred_tool = "beautifulsoup" if _soup_config else "tavily"
-            if preferred_tool == "tavily" and _tavily_config is None:
-                raise IngestionError(
-                    message="TavilyConfig must be set on the ingestion context when fetching HTTP URLs without a SoupCrawlerConfig."
-                )
-            if preferred_tool == "beautifulsoup" and _soup_config is None:
-                raise IngestionError(
-                    message="SoupCrawlerConfig must be set on the ingestion context when using the BeautifulSoup scraper."
-                )
-
-            logger.info(f"Starting web URL crawling for: {file_path}")
-            logger.info(f"Using scraping tool: {preferred_tool}")
-
-            data = await fetch_page_content(
-                file_path,
-                preferred_tool=preferred_tool,
-                tavily_config=_tavily_config,
-                soup_crawler_config=_soup_config,
-            )
-
-            logger.info(f"Successfully fetched content from {len(data)} URL(s)")
-            logger.info("Processing and concatenating fetched content")
-
-            content = ""
-            for key, value in data.items():
-                content += f"{key}:\n{value}\n\n"
-
-            logger.info(f"Saving content to file (total size: {len(content)} characters)")
-            stored_path = await save_data_to_file(content)
-            logger.info(f"Successfully saved content to: {stored_path}")
-
-            return stored_path
-        except IngestionError:
-            raise
-        except Exception as e:
-            raise IngestionError(
-                message=f"Error ingesting webpage from URL {file_path}: {str(e)}"
-            ) from e
+        return file_path
diff --git a/cognee/modules/pipelines/operations/pipeline.py b/cognee/modules/pipelines/operations/pipeline.py
index e15e9e505..1e2b3aca5 100644
--- a/cognee/modules/pipelines/operations/pipeline.py
+++ b/cognee/modules/pipelines/operations/pipeline.py
@@ -20,6 +20,7 @@ from cognee.modules.pipelines.layers.resolve_authorized_user_datasets import (
 from cognee.modules.pipelines.layers.check_pipeline_run_qualification import (
     check_pipeline_run_qualification,
 )
+from typing import Any
 
 logger = get_logger("cognee.pipeline")
 
@@ -36,6 +37,7 @@ async def run_pipeline(
     graph_db_config: dict = None,
     incremental_loading: bool = False,
     data_per_batch: int = 20,
+    fetchers_config: dict[str, Any] = {},
 ):
     validate_pipeline_tasks(tasks)
     await setup_and_check_environment(vector_db_config, graph_db_config)
@@ -52,6 +54,7 @@ async def run_pipeline(
             context={"dataset": dataset},
             incremental_loading=incremental_loading,
             data_per_batch=data_per_batch,
+            fetchers_config=fetchers_config,
         ):
             yield run_info
 
@@ -65,6 +68,7 @@ async def run_pipeline_per_dataset(
     context: dict = None,
     incremental_loading=False,
     data_per_batch: int = 20,
+    fetchers_config: dict[str, Any] = {},
 ):
     # Will only be used if ENABLE_BACKEND_ACCESS_CONTROL is set to True
     await set_database_global_context_variables(dataset.id, dataset.owner_id)
@@ -80,7 +84,15 @@ async def run_pipeline_per_dataset(
         return
 
     pipeline_run = run_tasks(
-        tasks, dataset.id, data, user, pipeline_name, context, incremental_loading, data_per_batch
+        tasks,
+        dataset.id,
+        data,
+        user,
+        pipeline_name,
+        context,
+        incremental_loading,
+        data_per_batch,
+        fetchers_config,
     )
 
     async for pipeline_run_info in pipeline_run:
diff --git a/cognee/modules/pipelines/operations/run_tasks.py b/cognee/modules/pipelines/operations/run_tasks.py
index ecc2f647b..d11d87ddf 100644
--- a/cognee/modules/pipelines/operations/run_tasks.py
+++ b/cognee/modules/pipelines/operations/run_tasks.py
@@ -60,6 +60,7 @@ async def run_tasks(
     context: dict = None,
     incremental_loading: bool = False,
     data_per_batch: int = 20,
+    fetchers_config: dict[str, Any] = {},
 ):
     if not user:
         user = await get_default_user()
@@ -106,6 +107,7 @@ async def run_tasks(
                     context,
                     user,
                     incremental_loading,
+                    fetchers_config,
                 )
             )
             for data_item in data_batch
diff --git a/cognee/modules/pipelines/operations/run_tasks_data_item.py b/cognee/modules/pipelines/operations/run_tasks_data_item.py
index 0118e7976..9ddadd855 100644
--- a/cognee/modules/pipelines/operations/run_tasks_data_item.py
+++ b/cognee/modules/pipelines/operations/run_tasks_data_item.py
@@ -39,6 +39,7 @@ async def run_tasks_data_item_incremental(
     pipeline_run_id: str,
     context: Optional[Dict[str, Any]],
     user: User,
+    fetchers_config: dict[str, Any],
 ) -> AsyncGenerator[Dict[str, Any], None]:
     """
     Process a single data item with incremental loading support.
@@ -64,51 +65,36 @@ async def run_tasks_data_item_incremental(
 
     # If incremental_loading of data is set to True don't process documents already processed by pipeline
     # If data is being added to Cognee for the first time calculate the id of the data
-    try:
-        if not isinstance(data_item, Data):
-            file_path = await save_data_item_to_storage(data_item)
-            # Ingest data and add metadata
-            async with open_data_file(file_path) as file:
-                classified_data = ingestion.classify(file)
-                # data_id is the hash of file contents + owner id to avoid duplicate data
-                data_id = ingestion.identify(classified_data, user)
-        else:
-            # If data was already processed by Cognee get data id
-            data_id = data_item.id
+    if not isinstance(data_item, Data):
+        file_path = await save_data_item_to_storage(data_item, fetchers_config)
+        # Ingest data and add metadata
+        async with open_data_file(file_path) as file:
+            classified_data = ingestion.classify(file)
+            # data_id is the hash of file contents + owner id to avoid duplicate data
+            data_id = ingestion.identify(classified_data, user)
+    else:
+        # If data was already processed by Cognee get data id
+        data_id = data_item.id
 
-        # Check pipeline status, if Data already processed for pipeline before skip current processing
-        async with db_engine.get_async_session() as session:
-            data_point = (
-                await session.execute(select(Data).filter(Data.id == data_id))
-            ).scalar_one_or_none()
-            if data_point:
-                if (
-                    data_point.pipeline_status.get(pipeline_name, {}).get(str(dataset.id))
-                    == DataItemStatus.DATA_ITEM_PROCESSING_COMPLETED
-                ):
-                    yield {
-                        "run_info": PipelineRunAlreadyCompleted(
-                            pipeline_run_id=pipeline_run_id,
-                            dataset_id=dataset.id,
-                            dataset_name=dataset.name,
-                        ),
-                        "data_id": data_id,
-                    }
-                    return
-    except UnsupportedPathSchemeError as e:
-        logger.warning(f"data_item does not support incremental loading: {str(e)}")
-        # Fall back to regular processing since incremental loading is not supported
-        async for result in run_tasks_data_item_regular(
-            data_item=data_item,
-            dataset=dataset,
-            tasks=tasks,
-            pipeline_id=pipeline_id,
-            pipeline_run_id=pipeline_run_id,
-            context=context,
-            user=user,
-        ):
-            yield result
-            return
+    # Check pipeline status, if Data already processed for pipeline before skip current processing
+    async with db_engine.get_async_session() as session:
+        data_point = (
+            await session.execute(select(Data).filter(Data.id == data_id))
+        ).scalar_one_or_none()
+        if data_point:
+            if (
+                data_point.pipeline_status.get(pipeline_name, {}).get(str(dataset.id))
+                == DataItemStatus.DATA_ITEM_PROCESSING_COMPLETED
+            ):
+                yield {
+                    "run_info": PipelineRunAlreadyCompleted(
+                        pipeline_run_id=pipeline_run_id,
+                        dataset_id=dataset.id,
+                        dataset_name=dataset.name,
+                    ),
+                    "data_id": data_id,
+                }
+                return
 
     try:
         # Process data based on data_item and list of tasks
@@ -225,6 +211,7 @@ async def run_tasks_data_item(
     context: Optional[Dict[str, Any]],
     user: User,
     incremental_loading: bool,
+    fetchers_config: dict[str, Any] = {},
 ) -> Optional[Dict[str, Any]]:
     """
     Process a single data item, choosing between incremental and regular processing.
@@ -259,6 +246,7 @@ async def run_tasks_data_item(
             pipeline_run_id=pipeline_run_id,
             context=context,
             user=user,
+            fetchers_config=fetchers_config,
         ):
             pass
     else:
diff --git a/cognee/tasks/ingestion/ingest_data.py b/cognee/tasks/ingestion/ingest_data.py
index 233bb5f1c..84cd1f38b 100644
--- a/cognee/tasks/ingestion/ingest_data.py
+++ b/cognee/tasks/ingestion/ingest_data.py
@@ -31,6 +31,7 @@ async def ingest_data(
     dataset_id: UUID = None,
     preferred_loaders: List[str] = None,
     loaders_config: dict[LoaderInterface, dict] = {},
+    fetchers_config: dict[str, Any] = {},
 ):
     if not user:
         user = await get_default_user()
@@ -80,16 +81,10 @@ async def ingest_data(
     dataset_data_map = {str(data.id): True for data in dataset_data}
 
     for data_item in data:
-        try:
-            # Get file path of data item or create a file if it doesn't exist
-            original_file_path = await save_data_item_to_storage(data_item)
-            # Transform file path to be OS usable
-            actual_file_path = get_data_file_path(original_file_path)
-        except UnsupportedPathSchemeError:
-            # This data_item (e.g., HTTP/HTTPS URL) should be passed directly to the loader
-            # skip save_data_item_to_storage and get_data_file_path
-            actual_file_path = data_item
-            original_file_path = None  # we don't have an original file path
+        # Get file path of data item or create a file if it doesn't exist
+        original_file_path = await save_data_item_to_storage(data_item, fetchers_config)
+        # Transform file path to be OS usable
+        actual_file_path = get_data_file_path(original_file_path)
 
         # Store all input data as text files in Cognee data storage
         cognee_storage_file_path, loader_engine = await data_item_to_text_file(
@@ -99,26 +94,15 @@ async def ingest_data(
         )
 
         # Find metadata from original file
-        if original_file_path is not None:
-            # Standard flow: extract metadata from both original and stored files
-            async with open_data_file(original_file_path) as file:
-                classified_data = ingestion.classify(file)
-                data_id = ingestion.identify(classified_data, user)
-                original_file_metadata = classified_data.get_metadata()
+        # Standard flow: extract metadata from both original and stored files
+        async with open_data_file(original_file_path) as file:
+            classified_data = ingestion.classify(file)
+            data_id = ingestion.identify(classified_data, user)
+            original_file_metadata = classified_data.get_metadata()
 
-            async with open_data_file(cognee_storage_file_path) as file:
-                classified_data = ingestion.classify(file)
-                storage_file_metadata = classified_data.get_metadata()
-        else:
-            # Alternative flow (e.g., URLs): extract metadata once from stored file
-            async with open_data_file(cognee_storage_file_path) as file:
-                classified_data = ingestion.classify(file)
-                data_id = ingestion.identify(classified_data, user)
-                original_file_metadata = classified_data.get_metadata()
-                # Override file_path to be the actual data_item (e.g., URL) ?
-                original_file_metadata["file_path"] = actual_file_path
-                # Storage metadata is the same as original
-                storage_file_metadata = original_file_metadata.copy()
+        async with open_data_file(cognee_storage_file_path) as file:
+            classified_data = ingestion.classify(file)
+            storage_file_metadata = classified_data.get_metadata()
 
         from sqlalchemy import select
 
diff --git a/cognee/tasks/ingestion/save_data_item_to_storage.py b/cognee/tasks/ingestion/save_data_item_to_storage.py
index cf32477cb..d9b98268d 100644
--- a/cognee/tasks/ingestion/save_data_item_to_storage.py
+++ b/cognee/tasks/ingestion/save_data_item_to_storage.py
@@ -9,6 +9,8 @@ from cognee.modules.ingestion import save_data_to_file
 from cognee.shared.logging_utils import get_logger
 from pydantic_settings import BaseSettings, SettingsConfigDict
 
+from cognee.tasks.ingestion.data_fetchers.web_url_fetcher import WebUrlFetcher
+
 logger = get_logger()
 
 
@@ -22,7 +24,9 @@ class SaveDataSettings(BaseSettings):
 settings = SaveDataSettings()
 
 
-async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any]) -> str:
+async def save_data_item_to_storage(
+    data_item: Union[BinaryIO, str, Any], fetchers_config: dict[str, Any] = {}
+) -> str:
     if "llama_index" in str(type(data_item)):
         # Dynamic import is used because the llama_index module is optional.
         from .transform_data import get_data_from_llama_index
@@ -57,9 +61,8 @@ async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any]) -> str:
     if parsed_url.scheme == "s3":
         return data_item
     elif parsed_url.scheme == "http" or parsed_url.scheme == "https":
-        raise UnsupportedPathSchemeError(
-            message=f"HTTP/HTTPS URLs should be handled by loader, not by save_data_item_to_storage. Received: {data_item}"
-        )
+        fetcher = WebUrlFetcher()
+        return await fetcher.fetch(data_item, fetchers_config)
     # data is local file path
     elif parsed_url.scheme == "file":
         if settings.accept_local_file_path:
diff --git a/cognee/tests/integration/web_url_crawler/test_add.py b/cognee/tests/integration/web_url_crawler/test_add.py
index abd0d77ba..b45ed9139 100644
--- a/cognee/tests/integration/web_url_crawler/test_add.py
+++ b/cognee/tests/integration/web_url_crawler/test_add.py
@@ -1,22 +1,28 @@
+from sys import exc_info
 import pytest
 import cognee
+from cognee.modules.ingestion.exceptions.exceptions import IngestionError
 
 
 @pytest.mark.asyncio
-async def test_add_fails_when_preferred_loader_not_specified():
+async def test_add_fails_when_web_url_fetcher_config_not_specified():
     from cognee.shared.logging_utils import setup_logging, ERROR
 
     setup_logging(log_level=ERROR)
     await cognee.prune.prune_data()
     await cognee.prune.prune_system(metadata=True)
 
-    with pytest.raises(ValueError):
+    with pytest.raises(IngestionError) as excinfo:
         await cognee.add(
             "https://en.wikipedia.org/wiki/Large_language_model",
+            incremental_loading=False,
         )
+    assert excinfo.value.message.startswith(
+        "web_url_fetcher configuration must be a valid dictionary"
+    )
 
 
 @pytest.mark.asyncio
-async def test_add_succesfully_adds_url_when_preferred_loader_specified():
+async def test_add_succesfully_adds_url_when_fetcher_config_specified():
     await cognee.prune.prune_data()
     await cognee.prune.prune_system(metadata=True)
 
@@ -27,8 +33,8 @@ async def test_add_succesfully_adds_url_when_preferred_loader_specified():
         "paragraphs": {"selector": "p", "all": True},
     }
 
-    loaders_config = {
-        "web_url_loader": {
+    fetchers_config = {
+        "web_url_fetcher": {
             "soup_config": {
                 "max_depth": 1,
                 "follow_links": False,
@@ -40,8 +46,8 @@ async def test_add_succesfully_adds_url_when_preferred_loader_specified():
     try:
         await cognee.add(
             "https://en.wikipedia.org/wiki/Large_language_model",
-            preferred_loaders=["web_url_loader"],
-            loaders_config=loaders_config,
+            incremental_loading=False,
+            fetchers_config=fetchers_config,
         )
     except Exception as e:
         pytest.fail(f"Failed to add url: {e}")
@@ -59,8 +65,8 @@ async def test_add_with_incremental_loading_works():
         "paragraphs": {"selector": "p", "all": True},
     }
 
-    loaders_config = {
-        "web_url_loader": {
+    fetchers_config = {
+        "web_url_fetcher": {
             "soup_config": {
                 "max_depth": 1,
                 "follow_links": False,
@@ -71,9 +77,8 @@ async def test_add_with_incremental_loading_works():
     try:
         await cognee.add(
             "https://en.wikipedia.org/wiki/Large_language_model",
-            preferred_loaders=["web_url_loader"],
             incremental_loading=True,
-            loaders_config=loaders_config,
+            fetchers_config=fetchers_config,
         )
     except Exception as e:
         pytest.fail(f"Failed to add url: {e}")
@@ -91,8 +96,8 @@ async def test_add_without_incremental_loading_works():
         "paragraphs": {"selector": "p", "all": True},
     }
 
-    loaders_config = {
-        "web_url_loader": {
+    fetchers_config = {
+        "web_url_fetcher": {
             "soup_config": {
                 "max_depth": 1,
                 "follow_links": False,
@@ -103,9 +108,8 @@ async def test_add_without_incremental_loading_works():
     try:
         await cognee.add(
             "https://en.wikipedia.org/wiki/Large_language_model",
-            preferred_loaders=["web_url_loader"],
             incremental_loading=False,
-            loaders_config=loaders_config,
+            fetchers_config=fetchers_config,
         )
     except Exception as e:
         pytest.fail(f"Failed to add url: {e}")