Remove fetchers_config; use default configs for Tavily and BeautifulSoup

This commit is contained in:
Daulet Amirkhanov 2025-10-21 12:04:35 +01:00
parent abbbf88ad3
commit 95e735d397
10 changed files with 15 additions and 85 deletions

View file

@ -26,7 +26,6 @@ async def add(
preferred_loaders: List[str] = None, preferred_loaders: List[str] = None,
incremental_loading: bool = True, incremental_loading: bool = True,
data_per_batch: Optional[int] = 20, data_per_batch: Optional[int] = 20,
fetchers_config: dict[str, Any] = {},
): ):
""" """
Add data to Cognee for knowledge graph processing. Add data to Cognee for knowledge graph processing.
@ -174,7 +173,6 @@ async def add(
node_set, node_set,
dataset_id, dataset_id,
preferred_loaders, preferred_loaders,
fetchers_config,
), ),
] ]
@ -200,7 +198,6 @@ async def add(
graph_db_config=graph_db_config, graph_db_config=graph_db_config,
incremental_loading=incremental_loading, incremental_loading=incremental_loading,
data_per_batch=data_per_batch, data_per_batch=data_per_batch,
fetchers_config=fetchers_config,
): ):
pipeline_run_info = run_info pipeline_run_info = run_info

View file

@ -37,7 +37,6 @@ async def run_pipeline(
graph_db_config: dict = None, graph_db_config: dict = None,
incremental_loading: bool = False, incremental_loading: bool = False,
data_per_batch: int = 20, data_per_batch: int = 20,
fetchers_config: dict[str, Any] = {},
): ):
validate_pipeline_tasks(tasks) validate_pipeline_tasks(tasks)
await setup_and_check_environment(vector_db_config, graph_db_config) await setup_and_check_environment(vector_db_config, graph_db_config)
@ -54,7 +53,6 @@ async def run_pipeline(
context={"dataset": dataset}, context={"dataset": dataset},
incremental_loading=incremental_loading, incremental_loading=incremental_loading,
data_per_batch=data_per_batch, data_per_batch=data_per_batch,
fetchers_config=fetchers_config,
): ):
yield run_info yield run_info
@ -68,7 +66,6 @@ async def run_pipeline_per_dataset(
context: dict = None, context: dict = None,
incremental_loading=False, incremental_loading=False,
data_per_batch: int = 20, data_per_batch: int = 20,
fetchers_config: dict[str, Any] = {},
): ):
# Will only be used if ENABLE_BACKEND_ACCESS_CONTROL is set to True # Will only be used if ENABLE_BACKEND_ACCESS_CONTROL is set to True
await set_database_global_context_variables(dataset.id, dataset.owner_id) await set_database_global_context_variables(dataset.id, dataset.owner_id)
@ -92,7 +89,6 @@ async def run_pipeline_per_dataset(
context, context,
incremental_loading, incremental_loading,
data_per_batch, data_per_batch,
fetchers_config,
) )
async for pipeline_run_info in pipeline_run: async for pipeline_run_info in pipeline_run:

View file

@ -60,7 +60,6 @@ async def run_tasks(
context: dict = None, context: dict = None,
incremental_loading: bool = False, incremental_loading: bool = False,
data_per_batch: int = 20, data_per_batch: int = 20,
fetchers_config: dict[str, Any] = {},
): ):
if not user: if not user:
user = await get_default_user() user = await get_default_user()
@ -107,7 +106,6 @@ async def run_tasks(
context, context,
user, user,
incremental_loading, incremental_loading,
fetchers_config,
) )
) )
for data_item in data_batch for data_item in data_batch

View file

@ -38,7 +38,6 @@ async def run_tasks_data_item_incremental(
pipeline_run_id: str, pipeline_run_id: str,
context: Optional[Dict[str, Any]], context: Optional[Dict[str, Any]],
user: User, user: User,
fetchers_config: dict[str, Any],
) -> AsyncGenerator[Dict[str, Any], None]: ) -> AsyncGenerator[Dict[str, Any], None]:
""" """
Process a single data item with incremental loading support. Process a single data item with incremental loading support.
@ -65,7 +64,7 @@ async def run_tasks_data_item_incremental(
# If incremental_loading of data is set to True don't process documents already processed by pipeline # If incremental_loading of data is set to True don't process documents already processed by pipeline
# If data is being added to Cognee for the first time calculate the id of the data # If data is being added to Cognee for the first time calculate the id of the data
if not isinstance(data_item, Data): if not isinstance(data_item, Data):
file_path = await save_data_item_to_storage(data_item, fetchers_config) file_path = await save_data_item_to_storage(data_item)
# Ingest data and add metadata # Ingest data and add metadata
async with open_data_file(file_path) as file: async with open_data_file(file_path) as file:
classified_data = ingestion.classify(file) classified_data = ingestion.classify(file)
@ -210,7 +209,6 @@ async def run_tasks_data_item(
context: Optional[Dict[str, Any]], context: Optional[Dict[str, Any]],
user: User, user: User,
incremental_loading: bool, incremental_loading: bool,
fetchers_config: dict[str, Any] = {},
) -> Optional[Dict[str, Any]]: ) -> Optional[Dict[str, Any]]:
""" """
Process a single data item, choosing between incremental and regular processing. Process a single data item, choosing between incremental and regular processing.
@ -245,7 +243,6 @@ async def run_tasks_data_item(
pipeline_run_id=pipeline_run_id, pipeline_run_id=pipeline_run_id,
context=context, context=context,
user=user, user=user,
fetchers_config=fetchers_config,
): ):
pass pass
else: else:

View file

@ -8,7 +8,7 @@ class DataFetcherInterface(ABC):
pass pass
@abstractmethod @abstractmethod
async def fetch(self, data_item_path: str, fetchers_config: dict[str, Any]) -> str: async def fetch(self, data_item_path: str) -> str:
""" """
args: data_item_path - path to the data item args: data_item_path - path to the data item
""" """

View file

@ -1,8 +1,7 @@
import os
from cognee.modules.ingestion import save_data_to_file from cognee.modules.ingestion import save_data_to_file
from cognee.tasks.ingestion.data_fetchers.data_fetcher_interface import DataFetcherInterface from cognee.tasks.ingestion.data_fetchers.data_fetcher_interface import DataFetcherInterface
from typing import Any
from cognee.tasks.web_scraper.config import TavilyConfig, SoupCrawlerConfig from cognee.tasks.web_scraper.config import TavilyConfig, SoupCrawlerConfig
from cognee.modules.ingestion.exceptions.exceptions import IngestionError
from cognee.shared.logging_utils import get_logger from cognee.shared.logging_utils import get_logger
logger = get_logger() logger = get_logger()
@ -14,34 +13,22 @@ class WebUrlFetcher(DataFetcherInterface):
def fetcher_name(self): def fetcher_name(self):
return "web_url_fetcher" return "web_url_fetcher"
async def fetch(self, data_item_path: str, fetchers_config: dict[str, Any]): async def fetch(self, data_item_path: str):
from cognee.context_global_variables import tavily_config, soup_crawler_config from cognee.context_global_variables import tavily_config, soup_crawler_config
from cognee.tasks.web_scraper import fetch_page_content from cognee.tasks.web_scraper import fetch_page_content
web_url_fetcher_config = fetchers_config.get(self.fetcher_name()) if os.getenv("TAVILY_API_KEY"):
if not isinstance(web_url_fetcher_config, dict): _tavily_config = TavilyConfig()
raise IngestionError(f"{self.fetcher_name()} configuration must be a valid dictionary") _soup_config = None
preferred_tool = "tavily"
else:
_tavily_config = None
_soup_config = SoupCrawlerConfig()
preferred_tool = "beautifulsoup"
tavily_dict = web_url_fetcher_config.get("tavily_config")
_tavily_config = TavilyConfig(**tavily_dict) if tavily_dict else None
soup_dict = web_url_fetcher_config.get("soup_config")
_soup_config = SoupCrawlerConfig(**soup_dict) if soup_dict else None
# Set global configs for downstream access
tavily_config.set(_tavily_config) tavily_config.set(_tavily_config)
soup_crawler_config.set(_soup_config) soup_crawler_config.set(_soup_config)
preferred_tool = "beautifulsoup" if _soup_config else "tavily"
if preferred_tool == "tavily" and _tavily_config is None:
raise IngestionError(
message="TavilyConfig must be set on the ingestion context when fetching HTTP URLs without a SoupCrawlerConfig."
)
if preferred_tool == "beautifulsoup" and _soup_config is None:
raise IngestionError(
message="SoupCrawlerConfig must be set on the ingestion context when using the BeautifulSoup scraper."
)
logger.info(f"Starting web URL crawling for: {data_item_path}") logger.info(f"Starting web URL crawling for: {data_item_path}")
logger.info(f"Using scraping tool: {preferred_tool}") logger.info(f"Using scraping tool: {preferred_tool}")

View file

@ -28,7 +28,6 @@ async def ingest_data(
node_set: Optional[List[str]] = None, node_set: Optional[List[str]] = None,
dataset_id: UUID = None, dataset_id: UUID = None,
preferred_loaders: List[str] = None, preferred_loaders: List[str] = None,
fetchers_config: dict[str, Any] = {},
): ):
if not user: if not user:
user = await get_default_user() user = await get_default_user()
@ -79,7 +78,7 @@ async def ingest_data(
for data_item in data: for data_item in data:
# Get file path of data item or create a file if it doesn't exist # Get file path of data item or create a file if it doesn't exist
original_file_path = await save_data_item_to_storage(data_item, fetchers_config) original_file_path = await save_data_item_to_storage(data_item)
# Transform file path to be OS usable # Transform file path to be OS usable
actual_file_path = get_data_file_path(original_file_path) actual_file_path = get_data_file_path(original_file_path)

View file

@ -23,9 +23,7 @@ class SaveDataSettings(BaseSettings):
settings = SaveDataSettings() settings = SaveDataSettings()
async def save_data_item_to_storage( async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any]) -> str:
data_item: Union[BinaryIO, str, Any], fetchers_config: dict[str, Any] = {}
) -> str:
if "llama_index" in str(type(data_item)): if "llama_index" in str(type(data_item)):
# Dynamic import is used because the llama_index module is optional. # Dynamic import is used because the llama_index module is optional.
from .transform_data import get_data_from_llama_index from .transform_data import get_data_from_llama_index
@ -61,7 +59,7 @@ async def save_data_item_to_storage(
return data_item return data_item
elif parsed_url.scheme == "http" or parsed_url.scheme == "https": elif parsed_url.scheme == "http" or parsed_url.scheme == "https":
fetcher = WebUrlFetcher() fetcher = WebUrlFetcher()
return await fetcher.fetch(data_item, fetchers_config) return await fetcher.fetch(data_item)
# data is local file path # data is local file path
elif parsed_url.scheme == "file": elif parsed_url.scheme == "file":
if settings.accept_local_file_path: if settings.accept_local_file_path:

View file

@ -33,21 +33,10 @@ async def test_add_succesfully_adds_url_when_fetcher_config_specified():
"paragraphs": {"selector": "p", "all": True}, "paragraphs": {"selector": "p", "all": True},
} }
fetchers_config = {
"web_url_fetcher": {
"soup_config": {
"max_depth": 1,
"follow_links": False,
"extraction_rules": extraction_rules,
}
}
}
try: try:
await cognee.add( await cognee.add(
"https://en.wikipedia.org/wiki/Large_language_model", "https://en.wikipedia.org/wiki/Large_language_model",
incremental_loading=False, incremental_loading=False,
fetchers_config=fetchers_config,
) )
except Exception as e: except Exception as e:
pytest.fail(f"Failed to add url: {e}") pytest.fail(f"Failed to add url: {e}")
@ -65,20 +54,10 @@ async def test_add_with_incremental_loading_works():
"paragraphs": {"selector": "p", "all": True}, "paragraphs": {"selector": "p", "all": True},
} }
fetchers_config = {
"web_url_fetcher": {
"soup_config": {
"max_depth": 1,
"follow_links": False,
"extraction_rules": extraction_rules,
}
}
}
try: try:
await cognee.add( await cognee.add(
"https://en.wikipedia.org/wiki/Large_language_model", "https://en.wikipedia.org/wiki/Large_language_model",
incremental_loading=True, incremental_loading=True,
fetchers_config=fetchers_config,
) )
except Exception as e: except Exception as e:
pytest.fail(f"Failed to add url: {e}") pytest.fail(f"Failed to add url: {e}")
@ -96,20 +75,10 @@ async def test_add_without_incremental_loading_works():
"paragraphs": {"selector": "p", "all": True}, "paragraphs": {"selector": "p", "all": True},
} }
fetchers_config = {
"web_url_fetcher": {
"soup_config": {
"max_depth": 1,
"follow_links": False,
"extraction_rules": extraction_rules,
}
}
}
try: try:
await cognee.add( await cognee.add(
"https://en.wikipedia.org/wiki/Large_language_model", "https://en.wikipedia.org/wiki/Large_language_model",
incremental_loading=False, incremental_loading=False,
fetchers_config=fetchers_config,
) )
except Exception as e: except Exception as e:
pytest.fail(f"Failed to add url: {e}") pytest.fail(f"Failed to add url: {e}")

View file

@ -20,20 +20,9 @@ async def main():
"paragraphs": {"selector": "p", "all": True}, "paragraphs": {"selector": "p", "all": True},
} }
fetchers_config = {
"web_url_loader": {
"soup_config": {
"max_depth": 1,
"follow_links": False,
"extraction_rules": extraction_rules,
}
}
}
await cognee.add( await cognee.add(
"https://en.wikipedia.org/wiki/Large_language_model", "https://en.wikipedia.org/wiki/Large_language_model",
incremental_loading=False, incremental_loading=False,
fetchers_config=fetchers_config,
) )
await cognee.cognify() await cognee.cognify()