From 95e735d3979aba36f72c7cc353c8641756fe2359 Mon Sep 17 00:00:00 2001 From: Daulet Amirkhanov Date: Tue, 21 Oct 2025 12:04:35 +0100 Subject: [PATCH] remove `fetchers_config`, use default configs for Tavily and BeautifulSoup --- cognee/api/v1/add/add.py | 3 -- .../modules/pipelines/operations/pipeline.py | 4 --- .../modules/pipelines/operations/run_tasks.py | 2 -- .../operations/run_tasks_data_item.py | 5 +-- .../data_fetchers/data_fetcher_interface.py | 2 +- .../data_fetchers/web_url_fetcher.py | 33 ++++++------------- cognee/tasks/ingestion/ingest_data.py | 3 +- .../ingestion/save_data_item_to_storage.py | 6 ++-- .../integration/web_url_crawler/test_add.py | 31 ----------------- examples/python/web_url_fetcher_example.py | 11 ------- 10 files changed, 15 insertions(+), 85 deletions(-) diff --git a/cognee/api/v1/add/add.py b/cognee/api/v1/add/add.py index 67da3047b..216911ec0 100644 --- a/cognee/api/v1/add/add.py +++ b/cognee/api/v1/add/add.py @@ -26,7 +26,6 @@ async def add( preferred_loaders: List[str] = None, incremental_loading: bool = True, data_per_batch: Optional[int] = 20, - fetchers_config: dict[str, Any] = {}, ): """ Add data to Cognee for knowledge graph processing. @@ -174,7 +173,6 @@ async def add( node_set, dataset_id, preferred_loaders, - fetchers_config, ), ] @@ -200,7 +198,6 @@ async def add( graph_db_config=graph_db_config, incremental_loading=incremental_loading, data_per_batch=data_per_batch, - fetchers_config=fetchers_config, ): pipeline_run_info = run_info diff --git a/cognee/modules/pipelines/operations/pipeline.py b/cognee/modules/pipelines/operations/pipeline.py index 1e2b3aca5..eb0ebe8bd 100644 --- a/cognee/modules/pipelines/operations/pipeline.py +++ b/cognee/modules/pipelines/operations/pipeline.py @@ -37,7 +37,6 @@ async def run_pipeline( graph_db_config: dict = None, incremental_loading: bool = False, data_per_batch: int = 20, - fetchers_config: dict[str, Any] = {}, ): validate_pipeline_tasks(tasks) await setup_and_check_environment(vector_db_config, graph_db_config) @@ -54,7 +53,6 @@ async def run_pipeline( context={"dataset": dataset}, incremental_loading=incremental_loading, data_per_batch=data_per_batch, - fetchers_config=fetchers_config, ): yield run_info @@ -68,7 +66,6 @@ async def run_pipeline_per_dataset( context: dict = None, incremental_loading=False, data_per_batch: int = 20, - fetchers_config: dict[str, Any] = {}, ): # Will only be used if ENABLE_BACKEND_ACCESS_CONTROL is set to True await set_database_global_context_variables(dataset.id, dataset.owner_id) @@ -92,7 +89,6 @@ async def run_pipeline_per_dataset( context, incremental_loading, data_per_batch, - fetchers_config, ) async for pipeline_run_info in pipeline_run: diff --git a/cognee/modules/pipelines/operations/run_tasks.py b/cognee/modules/pipelines/operations/run_tasks.py index d11d87ddf..ecc2f647b 100644 --- a/cognee/modules/pipelines/operations/run_tasks.py +++ b/cognee/modules/pipelines/operations/run_tasks.py @@ -60,7 +60,6 @@ async def run_tasks( context: dict = None, incremental_loading: bool = False, data_per_batch: int = 20, - fetchers_config: dict[str, Any] = {}, ): if not user: user = await get_default_user() @@ -107,7 +106,6 @@ async def run_tasks( context, user, incremental_loading, - fetchers_config, ) ) for data_item in data_batch diff --git a/cognee/modules/pipelines/operations/run_tasks_data_item.py b/cognee/modules/pipelines/operations/run_tasks_data_item.py index e445d323b..152e72d7f 100644 --- a/cognee/modules/pipelines/operations/run_tasks_data_item.py +++ b/cognee/modules/pipelines/operations/run_tasks_data_item.py @@ -38,7 +38,6 @@ async def run_tasks_data_item_incremental( pipeline_run_id: str, context: Optional[Dict[str, Any]], user: User, - fetchers_config: dict[str, Any], ) -> AsyncGenerator[Dict[str, Any], None]: """ Process a single data item with incremental loading support. @@ -65,7 +64,7 @@ async def run_tasks_data_item_incremental( # If incremental_loading of data is set to True don't process documents already processed by pipeline # If data is being added to Cognee for the first time calculate the id of the data if not isinstance(data_item, Data): - file_path = await save_data_item_to_storage(data_item, fetchers_config) + file_path = await save_data_item_to_storage(data_item) # Ingest data and add metadata async with open_data_file(file_path) as file: classified_data = ingestion.classify(file) @@ -210,7 +209,6 @@ async def run_tasks_data_item( context: Optional[Dict[str, Any]], user: User, incremental_loading: bool, - fetchers_config: dict[str, Any] = {}, ) -> Optional[Dict[str, Any]]: """ Process a single data item, choosing between incremental and regular processing. @@ -245,7 +243,6 @@ async def run_tasks_data_item( pipeline_run_id=pipeline_run_id, context=context, user=user, - fetchers_config=fetchers_config, ): pass else: diff --git a/cognee/tasks/ingestion/data_fetchers/data_fetcher_interface.py b/cognee/tasks/ingestion/data_fetchers/data_fetcher_interface.py index db8b8963b..9171e429d 100644 --- a/cognee/tasks/ingestion/data_fetchers/data_fetcher_interface.py +++ b/cognee/tasks/ingestion/data_fetchers/data_fetcher_interface.py @@ -8,7 +8,7 @@ class DataFetcherInterface(ABC): pass @abstractmethod - async def fetch(self, data_item_path: str, fetchers_config: dict[str, Any]) -> str: + async def fetch(self, data_item_path: str) -> str: """ args: data_item_path - path to the data item """ diff --git a/cognee/tasks/ingestion/data_fetchers/web_url_fetcher.py b/cognee/tasks/ingestion/data_fetchers/web_url_fetcher.py index f1e5dac91..3b90b51b1 100644 --- a/cognee/tasks/ingestion/data_fetchers/web_url_fetcher.py +++ b/cognee/tasks/ingestion/data_fetchers/web_url_fetcher.py @@ -1,8 +1,7 @@ +import os from cognee.modules.ingestion import save_data_to_file from cognee.tasks.ingestion.data_fetchers.data_fetcher_interface import DataFetcherInterface -from typing import Any from cognee.tasks.web_scraper.config import TavilyConfig, SoupCrawlerConfig -from cognee.modules.ingestion.exceptions.exceptions import IngestionError from cognee.shared.logging_utils import get_logger logger = get_logger() @@ -14,34 +13,22 @@ class WebUrlFetcher(DataFetcherInterface): def fetcher_name(self): return "web_url_fetcher" - async def fetch(self, data_item_path: str, fetchers_config: dict[str, Any]): + async def fetch(self, data_item_path: str): from cognee.context_global_variables import tavily_config, soup_crawler_config from cognee.tasks.web_scraper import fetch_page_content - web_url_fetcher_config = fetchers_config.get(self.fetcher_name()) - if not isinstance(web_url_fetcher_config, dict): - raise IngestionError(f"{self.fetcher_name()} configuration must be a valid dictionary") + if os.getenv("TAVILY_API_KEY"): + _tavily_config = TavilyConfig() + _soup_config = None + preferred_tool = "tavily" + else: + _tavily_config = None + _soup_config = SoupCrawlerConfig() + preferred_tool = "beautifulsoup" - tavily_dict = web_url_fetcher_config.get("tavily_config") - _tavily_config = TavilyConfig(**tavily_dict) if tavily_dict else None - - soup_dict = web_url_fetcher_config.get("soup_config") - _soup_config = SoupCrawlerConfig(**soup_dict) if soup_dict else None - - # Set global configs for downstream access tavily_config.set(_tavily_config) soup_crawler_config.set(_soup_config) - preferred_tool = "beautifulsoup" if _soup_config else "tavily" - if preferred_tool == "tavily" and _tavily_config is None: - raise IngestionError( - message="TavilyConfig must be set on the ingestion context when fetching HTTP URLs without a SoupCrawlerConfig." - ) - if preferred_tool == "beautifulsoup" and _soup_config is None: - raise IngestionError( - message="SoupCrawlerConfig must be set on the ingestion context when using the BeautifulSoup scraper." - ) - logger.info(f"Starting web URL crawling for: {data_item_path}") logger.info(f"Using scraping tool: {preferred_tool}") diff --git a/cognee/tasks/ingestion/ingest_data.py b/cognee/tasks/ingestion/ingest_data.py index e707f4d92..02987b893 100644 --- a/cognee/tasks/ingestion/ingest_data.py +++ b/cognee/tasks/ingestion/ingest_data.py @@ -28,7 +28,6 @@ async def ingest_data( node_set: Optional[List[str]] = None, dataset_id: UUID = None, preferred_loaders: List[str] = None, - fetchers_config: dict[str, Any] = {}, ): if not user: user = await get_default_user() @@ -79,7 +78,7 @@ async def ingest_data( for data_item in data: # Get file path of data item or create a file if it doesn't exist - original_file_path = await save_data_item_to_storage(data_item, fetchers_config) + original_file_path = await save_data_item_to_storage(data_item) # Transform file path to be OS usable actual_file_path = get_data_file_path(original_file_path) diff --git a/cognee/tasks/ingestion/save_data_item_to_storage.py b/cognee/tasks/ingestion/save_data_item_to_storage.py index c70ddb2de..453219f15 100644 --- a/cognee/tasks/ingestion/save_data_item_to_storage.py +++ b/cognee/tasks/ingestion/save_data_item_to_storage.py @@ -23,9 +23,7 @@ class SaveDataSettings(BaseSettings): settings = SaveDataSettings() -async def save_data_item_to_storage( - data_item: Union[BinaryIO, str, Any], fetchers_config: dict[str, Any] = {} -) -> str: +async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any]) -> str: if "llama_index" in str(type(data_item)): # Dynamic import is used because the llama_index module is optional. from .transform_data import get_data_from_llama_index @@ -61,7 +59,7 @@ async def save_data_item_to_storage( return data_item elif parsed_url.scheme == "http" or parsed_url.scheme == "https": fetcher = WebUrlFetcher() - return await fetcher.fetch(data_item, fetchers_config) + return await fetcher.fetch(data_item) # data is local file path elif parsed_url.scheme == "file": if settings.accept_local_file_path: diff --git a/cognee/tests/integration/web_url_crawler/test_add.py b/cognee/tests/integration/web_url_crawler/test_add.py index b45ed9139..a00ca9e0d 100644 --- a/cognee/tests/integration/web_url_crawler/test_add.py +++ b/cognee/tests/integration/web_url_crawler/test_add.py @@ -33,21 +33,10 @@ async def test_add_succesfully_adds_url_when_fetcher_config_specified(): "paragraphs": {"selector": "p", "all": True}, } - fetchers_config = { - "web_url_fetcher": { - "soup_config": { - "max_depth": 1, - "follow_links": False, - "extraction_rules": extraction_rules, - } - } - } - try: await cognee.add( "https://en.wikipedia.org/wiki/Large_language_model", incremental_loading=False, - fetchers_config=fetchers_config, ) except Exception as e: pytest.fail(f"Failed to add url: {e}") @@ -65,20 +54,10 @@ async def test_add_with_incremental_loading_works(): "paragraphs": {"selector": "p", "all": True}, } - fetchers_config = { - "web_url_fetcher": { - "soup_config": { - "max_depth": 1, - "follow_links": False, - "extraction_rules": extraction_rules, - } - } - } try: await cognee.add( "https://en.wikipedia.org/wiki/Large_language_model", incremental_loading=True, - fetchers_config=fetchers_config, ) except Exception as e: pytest.fail(f"Failed to add url: {e}") @@ -96,20 +75,10 @@ async def test_add_without_incremental_loading_works(): "paragraphs": {"selector": "p", "all": True}, } - fetchers_config = { - "web_url_fetcher": { - "soup_config": { - "max_depth": 1, - "follow_links": False, - "extraction_rules": extraction_rules, - } - } - } try: await cognee.add( "https://en.wikipedia.org/wiki/Large_language_model", incremental_loading=False, - fetchers_config=fetchers_config, ) except Exception as e: pytest.fail(f"Failed to add url: {e}") diff --git a/examples/python/web_url_fetcher_example.py b/examples/python/web_url_fetcher_example.py index 9ac099e16..2195a62c0 100644 --- a/examples/python/web_url_fetcher_example.py +++ b/examples/python/web_url_fetcher_example.py @@ -20,20 +20,9 @@ async def main(): "paragraphs": {"selector": "p", "all": True}, } - fetchers_config = { - "web_url_loader": { - "soup_config": { - "max_depth": 1, - "follow_links": False, - "extraction_rules": extraction_rules, - } - } - } - await cognee.add( "https://en.wikipedia.org/wiki/Large_language_model", incremental_loading=False, - fetchers_config=fetchers_config, ) await cognee.cognify()