From 5035c872a71ce77dc8bd564ebb16400ec5a5f3dd Mon Sep 17 00:00:00 2001
From: Daulet Amirkhanov
Date: Tue, 21 Oct 2025 15:20:09 +0100
Subject: [PATCH] refactor: update web scraper configurations and simplify fetch logic

---
 cognee/context_global_variables.py      |  7 ++-
 .../data_fetchers/web_url_fetcher.py    |  2 -
 cognee/tasks/web_scraper/bs4_crawler.py | 66 ++++++----------------
 cognee/tasks/web_scraper/utils.py       | 24 ++++----
 4 files changed, 31 insertions(+), 68 deletions(-)

diff --git a/cognee/context_global_variables.py b/cognee/context_global_variables.py
index 2ecf9b8d3..388316359 100644
--- a/cognee/context_global_variables.py
+++ b/cognee/context_global_variables.py
@@ -7,14 +7,17 @@ from cognee.base_config import get_base_config
 from cognee.infrastructure.databases.utils import get_or_create_dataset_database
 from cognee.infrastructure.files.storage.config import file_storage_config
 from cognee.modules.users.methods import get_user
+from cognee.tasks.web_scraper.config import SoupCrawlerConfig, TavilyConfig
 
 # Note: ContextVar allows us to use different graph db configurations in Cognee
 # for different async tasks, threads and processes
 vector_db_config = ContextVar("vector_db_config", default=None)
 graph_db_config = ContextVar("graph_db_config", default=None)
 session_user = ContextVar("session_user", default=None)
-soup_crawler_config = ContextVar("soup_crawler_config", default=None)
-tavily_config = ContextVar("tavily_config", default=None)
+soup_crawler_config: ContextVar[SoupCrawlerConfig | None] = ContextVar(
+    "soup_crawler_config", default=None
+)
+tavily_config: ContextVar[TavilyConfig | None] = ContextVar("tavily_config", default=None)
 
 
 async def set_session_user_context_variable(user):
diff --git a/cognee/tasks/ingestion/data_fetchers/web_url_fetcher.py b/cognee/tasks/ingestion/data_fetchers/web_url_fetcher.py
index 3b90b51b1..949cb9b0a 100644
--- a/cognee/tasks/ingestion/data_fetchers/web_url_fetcher.py
+++ b/cognee/tasks/ingestion/data_fetchers/web_url_fetcher.py
@@ -35,8 +35,6 @@ class WebUrlFetcher(DataFetcherInterface):
         data = await fetch_page_content(
             data_item_path,
             preferred_tool=preferred_tool,
-            soup_crawler_config=_soup_config,
-            tavily_config=_tavily_config,
         )
 
         logger.info(f"Successfully fetched content from URL {data_item_path}")
diff --git a/cognee/tasks/web_scraper/bs4_crawler.py b/cognee/tasks/web_scraper/bs4_crawler.py
index 400287e08..969058466 100644
--- a/cognee/tasks/web_scraper/bs4_crawler.py
+++ b/cognee/tasks/web_scraper/bs4_crawler.py
@@ -66,6 +66,7 @@ class RobotsTxtCache:
     timestamp: float = field(default_factory=time.time)
 
 
+# TODO(daulet) refactor: this class is no longer BeautifulSoup-specific; it is just a crawler
 class BeautifulSoupCrawler:
     """Crawler for fetching and extracting web content using BeautifulSoup.
 
@@ -491,14 +492,12 @@
             return (val or "").strip()
         return el.get_text(strip=True)
 
-    async def fetch_with_bs4(
+    async def fetch_urls(
         self,
-        urls: Union[str, List[str], Dict[str, Dict[str, Any]]],
-        extraction_rules: Optional[Dict[str, Any]] = None,
+        urls: Union[str, List[str]],
         *,
         use_playwright: bool = False,
         playwright_js_wait: float = 0.8,
-        join_all_matches: bool = False,
     ) -> Dict[str, str]:
-        """Fetch and extract content from URLs using BeautifulSoup or Playwright.
+        """Fetch raw HTML from URLs, optionally rendering JavaScript with Playwright.
 
@@ -516,38 +515,12 @@
-            ValueError: If extraction_rules are missing when required or if urls is invalid.
-            Exception: If fetching or extraction fails.
+            ValueError: If urls is neither a string nor a list of strings.
+            Exception: If fetching fails.
""" - url_rules_map: Dict[str, Dict[str, Any]] = {} - if isinstance(urls, str): - if not extraction_rules: - raise ValueError("extraction_rules required when urls is a string") - url_rules_map[urls] = extraction_rules - elif isinstance(urls, list): - if not extraction_rules: - raise ValueError("extraction_rules required when urls is a list") - for url in urls: - url_rules_map[url] = extraction_rules - elif isinstance(urls, dict): - url_rules_map = urls + urls = [urls] else: raise ValueError(f"Invalid urls type: {type(urls)}") - logger.info( - f"Preparing to fetch {len(url_rules_map)} URL(s) with {len(extraction_rules) if extraction_rules else 0} extraction rule(s)" - ) - - normalized_url_rules: Dict[str, List[ExtractionRule]] = {} - for url, rules in url_rules_map.items(): - normalized_rules = [] - for _, rule in rules.items(): - r = self._normalize_rule(rule) - if join_all_matches: - r.all = True - normalized_rules.append(r) - normalized_url_rules[url] = normalized_rules - - logger.info(f"Normalized extraction rules for {len(normalized_url_rules)} URL(s)") - async def _task(url: str): async with self._sem: try: @@ -575,30 +547,21 @@ class BeautifulSoupCrawler: logger.info(f"Successfully fetched HTML from {url} ({len(html)} bytes)") - # Extract content - pieces = [] - for rule in normalized_url_rules[url]: - text = self._extract_with_bs4(html, rule) - if text: - pieces.append(text) - - concatenated = " ".join(pieces).strip() - logger.info(f"Extracted {len(concatenated)} characters from {url}") - return url, concatenated + return url, html except Exception as e: logger.error(f"Error processing {url}: {e}") return url, "" - logger.info(f"Creating {len(url_rules_map)} async tasks for concurrent fetching") - tasks = [asyncio.create_task(_task(u)) for u in url_rules_map.keys()] + logger.info(f"Creating {len(urls)} async tasks for concurrent fetching") + tasks = [asyncio.create_task(_task(u)) for u in urls] results = {} completed = 0 total = len(tasks) for coro in asyncio.as_completed(tasks): - url, text = await coro - results[url] = text + url, html = await coro + results[url] = html completed += 1 logger.info(f"Progress: {completed}/{total} URLs processed") diff --git a/cognee/tasks/web_scraper/utils.py b/cognee/tasks/web_scraper/utils.py index a32b6848c..8b8bcc11f 100644 --- a/cognee/tasks/web_scraper/utils.py +++ b/cognee/tasks/web_scraper/utils.py @@ -5,19 +5,17 @@ both BeautifulSoup for custom extraction rules and Tavily for API-based scraping """ from typing import Dict, List, Union, Optional, Literal +from cognee.context_global_variables import soup_crawler_config, tavily_config from cognee.shared.logging_utils import get_logger from .bs4_crawler import BeautifulSoupCrawler -from .config import TavilyConfig, SoupCrawlerConfig +from .config import TavilyConfig logger = get_logger(__name__) async def fetch_page_content( urls: Union[str, List[str]], - *, preferred_tool: Optional[Literal["tavily", "beautifulsoup"]] = "beautifulsoup", - tavily_config: Optional[TavilyConfig] = None, - soup_crawler_config: Optional[SoupCrawlerConfig] = None, ) -> Dict[str, str]: """Fetch content from one or more URLs using the specified tool. 
@@ -48,6 +46,9 @@ async def fetch_page_content(
     url_list = [urls] if isinstance(urls, str) else urls
     logger.info(f"Starting to fetch content from {len(url_list)} URL(s) using {preferred_tool}")
 
+    tavily_config = tavily_config_var.get()
+    soup_crawler_config = soup_crawler_config_var.get()
+
     if preferred_tool == "tavily":
         if not tavily_config or tavily_config.api_key is None:
             raise ValueError("TAVILY_API_KEY must be set in TavilyConfig to use Tavily")
@@ -62,11 +63,10 @@ async def fetch_page_content(
             "Failed to import bs4, make sure to install using pip install beautifulsoup4>=4.13.1"
         )
         raise ImportError
-    if not soup_crawler_config or soup_crawler_config.extraction_rules is None:
-        raise ValueError("extraction_rules must be provided when not using Tavily")
+    if soup_crawler_config is None or soup_crawler_config.extraction_rules is None:
+        raise ValueError("soup_crawler_config must be provided when not using Tavily")
 
     logger.info("Using BeautifulSoup for content extraction")
-    extraction_rules = soup_crawler_config.extraction_rules
     logger.info(
         f"Initializing BeautifulSoup crawler with concurrency={soup_crawler_config.concurrency}, timeout={soup_crawler_config.timeout}s, max_crawl_delay={soup_crawler_config.max_crawl_delay}s"
     )
@@ -85,12 +85,10 @@ async def fetch_page_content(
     logger.info(
         f"Starting to crawl {len(url_list)} URL(s) with BeautifulSoup (use_playwright={soup_crawler_config.use_playwright})"
    )
-    results = await crawler.fetch_with_bs4(
+    results = await crawler.fetch_urls(
         urls,
-        extraction_rules,
         use_playwright=soup_crawler_config.use_playwright,
         playwright_js_wait=soup_crawler_config.playwright_js_wait,
-        join_all_matches=soup_crawler_config.join_all_matches,
     )
     logger.info(f"Successfully fetched content from {len(results)} URL(s)")
     return results
@@ -103,7 +101,7 @@ async def fetch_page_content(
 
 
 async def fetch_with_tavily(
-    urls: Union[str, List[str]], tavily_config: Optional[TavilyConfig] = None
+    urls: Union[str, List[str]], tavily_config: TavilyConfig
 ) -> Dict[str, str]:
     """Fetch content from URLs using the Tavily API.
 
@@ -133,8 +131,8 @@ async def fetch_with_tavily(
         f"Initializing Tavily client with extract_depth={extract_depth}, timeout={timeout}s"
     )
     client = AsyncTavilyClient(
-        api_key=tavily_config.api_key if tavily_config else None,
-        proxies=tavily_config.proxies if tavily_config else None,
+        api_key=tavily_config.api_key,
+        proxies=tavily_config.proxies,
     )
 
     logger.info(f"Sending extract request to Tavily API for {len(url_list)} URL(s)")
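
After this refactor, callers no longer thread scraper configs through fetch_page_content; they set the soup_crawler_config ContextVar (and, for Tavily, tavily_config) up front, and the fetchers resolve them internally. A minimal sketch of the new call pattern follows; SoupCrawlerConfig's constructor fields are assumed from the attributes referenced in utils.py above, and the extraction-rule shape is hypothetical.

# Minimal sketch of the post-refactor call pattern; SoupCrawlerConfig's
# constructor fields are assumed from the attributes referenced in utils.py,
# and the extraction rule shape below is hypothetical.
import asyncio

from cognee.context_global_variables import soup_crawler_config
from cognee.tasks.web_scraper.config import SoupCrawlerConfig
from cognee.tasks.web_scraper.utils import fetch_page_content


async def main() -> None:
    # Configs now travel via ContextVars, so they follow the current async
    # context instead of being passed through every call signature.
    soup_crawler_config.set(
        SoupCrawlerConfig(
            extraction_rules={"title": {"selector": "h1"}},  # hypothetical rule shape
            concurrency=5,
            timeout=30,
            use_playwright=False,
        )
    )
    # tavily_config can stay at its None default when preferred_tool is
    # "beautifulsoup"; fetch_page_content reads both ContextVars internally.
    results = await fetch_page_content(
        ["https://example.com"], preferred_tool="beautifulsoup"
    )
    for url, html in results.items():
        print(url, len(html))


asyncio.run(main())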