From 9d9969676f105d60e46c6bdf7d0b75a4b5f3c8bb Mon Sep 17 00:00:00 2001 From: Daulet Amirkhanov Date: Tue, 21 Oct 2025 15:49:12 +0100 Subject: [PATCH] Separate BeautifulSoup crawling from fetching --- cognee/context_global_variables.py | 6 - .../tasks/ingestion/data_fetchers/__init__.py | 8 - .../data_fetchers/data_fetcher_interface.py | 15 - .../data_fetchers/web_url_fetcher.py | 55 --- cognee/tasks/web_scraper/__init__.py | 2 + cognee/tasks/web_scraper/bs4_crawler.py | 441 +---------------- cognee/tasks/web_scraper/config.py | 2 +- .../tasks/web_scraper/default_url_crawler.py | 446 ++++++++++++++++++ cognee/tasks/web_scraper/utils.py | 71 ++- cognee/tasks/web_scraper/web_scraper_task.py | 10 +- .../tasks/web_scraping/web_scraping_test.py | 6 +- 11 files changed, 489 insertions(+), 573 deletions(-) delete mode 100644 cognee/tasks/ingestion/data_fetchers/__init__.py delete mode 100644 cognee/tasks/ingestion/data_fetchers/data_fetcher_interface.py delete mode 100644 cognee/tasks/ingestion/data_fetchers/web_url_fetcher.py create mode 100644 cognee/tasks/web_scraper/default_url_crawler.py diff --git a/cognee/context_global_variables.py b/cognee/context_global_variables.py index 388316359..aad53341a 100644 --- a/cognee/context_global_variables.py +++ b/cognee/context_global_variables.py @@ -7,18 +7,12 @@ from cognee.base_config import get_base_config from cognee.infrastructure.databases.utils import get_or_create_dataset_database from cognee.infrastructure.files.storage.config import file_storage_config from cognee.modules.users.methods import get_user -from cognee.tasks.web_scraper.config import SoupCrawlerConfig, TavilyConfig # Note: ContextVar allows us to use different graph db configurations in Cognee # for different async tasks, threads and processes vector_db_config = ContextVar("vector_db_config", default=None) graph_db_config = ContextVar("graph_db_config", default=None) session_user = ContextVar("session_user", default=None) -soup_crawler_config: ContextVar[SoupCrawlerConfig | None] = ContextVar( - "soup_crawler_config", default=None -) -tavily_config: ContextVar[TavilyConfig | None] = ContextVar("tavily_config", default=None) - async def set_session_user_context_variable(user): session_user.set(user) diff --git a/cognee/tasks/ingestion/data_fetchers/__init__.py b/cognee/tasks/ingestion/data_fetchers/__init__.py deleted file mode 100644 index 63530b427..000000000 --- a/cognee/tasks/ingestion/data_fetchers/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -__all__ = [] - -try: - from .web_url_fetcher import WebUrlFetcher - - __all__.append("WebUrlFetcher") -except ImportError: - pass diff --git a/cognee/tasks/ingestion/data_fetchers/data_fetcher_interface.py b/cognee/tasks/ingestion/data_fetchers/data_fetcher_interface.py deleted file mode 100644 index 9171e429d..000000000 --- a/cognee/tasks/ingestion/data_fetchers/data_fetcher_interface.py +++ /dev/null @@ -1,15 +0,0 @@ -from abc import ABC, abstractmethod -from typing import Any - - -class DataFetcherInterface(ABC): - @abstractmethod - def fetcher_name(self) -> str: - pass - - @abstractmethod - async def fetch(self, data_item_path: str) -> str: - """ - args: data_item_path - path to the data item - """ - pass diff --git a/cognee/tasks/ingestion/data_fetchers/web_url_fetcher.py b/cognee/tasks/ingestion/data_fetchers/web_url_fetcher.py deleted file mode 100644 index 949cb9b0a..000000000 --- a/cognee/tasks/ingestion/data_fetchers/web_url_fetcher.py +++ /dev/null @@ -1,55 +0,0 @@ -import os -from cognee.modules.ingestion import save_data_to_file 
-from cognee.tasks.ingestion.data_fetchers.data_fetcher_interface import DataFetcherInterface -from cognee.tasks.web_scraper.config import TavilyConfig, SoupCrawlerConfig -from cognee.shared.logging_utils import get_logger - -logger = get_logger() - - -class WebUrlFetcher(DataFetcherInterface): - def __init__(self): ... - - def fetcher_name(self): - return "web_url_fetcher" - - async def fetch(self, data_item_path: str): - from cognee.context_global_variables import tavily_config, soup_crawler_config - from cognee.tasks.web_scraper import fetch_page_content - - if os.getenv("TAVILY_API_KEY"): - _tavily_config = TavilyConfig() - _soup_config = None - preferred_tool = "tavily" - else: - _tavily_config = None - _soup_config = SoupCrawlerConfig() - preferred_tool = "beautifulsoup" - - tavily_config.set(_tavily_config) - soup_crawler_config.set(_soup_config) - - logger.info(f"Starting web URL crawling for: {data_item_path}") - logger.info(f"Using scraping tool: {preferred_tool}") - - data = await fetch_page_content( - data_item_path, - preferred_tool=preferred_tool, - ) - - logger.info(f"Successfully fetched content from URL {data_item_path}") - - # fetch_page_content returns a dict like {url: content} - # Extract the content string before saving - if isinstance(data, dict): - # Concatenate all URL contents (usually just one URL) - content = "" - for url, text in data.items(): - content += f"{url}:\n{text}\n\n" - logger.info( - f"Extracted content from {len(data)} URL(s), total size: {len(content)} characters" - ) - else: - content = data - - return await save_data_to_file(content) diff --git a/cognee/tasks/web_scraper/__init__.py b/cognee/tasks/web_scraper/__init__.py index d8e580fad..f4d6677c7 100644 --- a/cognee/tasks/web_scraper/__init__.py +++ b/cognee/tasks/web_scraper/__init__.py @@ -8,6 +8,7 @@ BeautifulSoup or Tavily, defining data models, and handling scraping configurati from .bs4_crawler import BeautifulSoupCrawler from .utils import fetch_page_content from .web_scraper_task import cron_web_scraper_task, web_scraper_task +from .default_url_crawler import DefaultUrlCrawler __all__ = [ @@ -15,4 +16,5 @@ __all__ = [ "fetch_page_content", "cron_web_scraper_task", "web_scraper_task", + "DefaultUrlCrawler", ] diff --git a/cognee/tasks/web_scraper/bs4_crawler.py b/cognee/tasks/web_scraper/bs4_crawler.py index 969058466..171a76633 100644 --- a/cognee/tasks/web_scraper/bs4_crawler.py +++ b/cognee/tasks/web_scraper/bs4_crawler.py @@ -5,32 +5,13 @@ from web pages using BeautifulSoup or Playwright for JavaScript-rendered pages. supports robots.txt handling, rate limiting, and custom extraction rules. """ -import asyncio -import time from typing import Union, List, Dict, Any, Optional -from urllib.parse import urlparse -from dataclasses import dataclass, field -from functools import lru_cache -import httpx +from dataclasses import dataclass from bs4 import BeautifulSoup from cognee.shared.logging_utils import get_logger logger = get_logger(__name__) -try: - from playwright.async_api import async_playwright -except ImportError: - logger.warning( - "Failed to import playwright, make sure to install using pip install playwright>=1.9.0" - ) - async_playwright = None - -try: - from protego import Protego -except ImportError: - logger.warning("Failed to import protego, make sure to install using pip install protego>=0.1") - Protego = None - @dataclass class ExtractionRule: @@ -51,21 +32,6 @@ class ExtractionRule: join_with: str = " " -@dataclass -class RobotsTxtCache: - """Cache for robots.txt data. 
- - Attributes: - protego: Parsed robots.txt object (Protego instance). - crawl_delay: Delay between requests (in seconds). - timestamp: Time when the cache entry was created. - """ - - protego: Any - crawl_delay: float - timestamp: float = field(default_factory=time.time) - - # TODO(daulet) refactor: This is no longer BeautifulSoup, rather just a crawler class BeautifulSoupCrawler: """Crawler for fetching and extracting web content using BeautifulSoup. @@ -84,333 +50,6 @@ class BeautifulSoupCrawler: robots_cache_ttl: Time-to-live for robots.txt cache in seconds. """ - def __init__( - self, - *, - concurrency: int = 5, - crawl_delay: float = 0.5, - max_crawl_delay: Optional[float] = 10.0, - timeout: float = 15.0, - max_retries: int = 2, - retry_delay_factor: float = 0.5, - headers: Optional[Dict[str, str]] = None, - robots_cache_ttl: float = 3600.0, - ): - """Initialize the BeautifulSoupCrawler. - - Args: - concurrency: Number of concurrent requests allowed. - crawl_delay: Minimum seconds between requests to the same domain. - max_crawl_delay: Maximum crawl delay to respect from robots.txt (None = no limit). - timeout: Per-request timeout in seconds. - max_retries: Number of retries for failed requests. - retry_delay_factor: Multiplier for exponential backoff on retries. - headers: HTTP headers for requests (defaults to User-Agent: Cognee-Scraper/1.0). - robots_cache_ttl: Time-to-live for robots.txt cache in seconds. - """ - self.concurrency = concurrency - self._sem = asyncio.Semaphore(concurrency) - self.crawl_delay = crawl_delay - self.max_crawl_delay = max_crawl_delay - self.timeout = timeout - self.max_retries = max_retries - self.retry_delay_factor = retry_delay_factor - self.headers = headers or {"User-Agent": "Cognee-Scraper/1.0"} - self.robots_cache_ttl = robots_cache_ttl - self._last_request_time_per_domain: Dict[str, float] = {} - self._robots_cache: Dict[str, RobotsTxtCache] = {} - self._client: Optional[httpx.AsyncClient] = None - self._robots_lock = asyncio.Lock() - - async def _ensure_client(self): - """Initialize the HTTP client if not already created.""" - if self._client is None: - self._client = httpx.AsyncClient(timeout=self.timeout, headers=self.headers) - - async def close(self): - """Close the HTTP client.""" - if self._client: - await self._client.aclose() - self._client = None - - async def __aenter__(self): - """Enter the context manager, initializing the HTTP client.""" - await self._ensure_client() - return self - - async def __aexit__(self, exc_type, exc_val, exc_tb): - """Exit the context manager, closing the HTTP client.""" - await self.close() - - @lru_cache(maxsize=1024) - def _domain_from_url(self, url: str) -> str: - """Extract the domain (netloc) from a URL. - - Args: - url: The URL to parse. - - Returns: - str: The domain (netloc) of the URL. - """ - try: - return urlparse(url).netloc - except Exception: - return url - - @lru_cache(maxsize=1024) - def _get_domain_root(self, url: str) -> str: - """Get the root URL (scheme and netloc) from a URL. - - Args: - url: The URL to parse. - - Returns: - str: The root URL (e.g., "https://example.com"). - """ - parsed = urlparse(url) - return f"{parsed.scheme}://{parsed.netloc}" - - async def _respect_rate_limit(self, url: str, crawl_delay: Optional[float] = None): - """Enforce rate limiting for requests to the same domain. - - Args: - url: The URL to check. - crawl_delay: Custom crawl delay in seconds (if any). 
- """ - domain = self._domain_from_url(url) - last = self._last_request_time_per_domain.get(domain) - delay = crawl_delay if crawl_delay is not None else self.crawl_delay - - if last is None: - self._last_request_time_per_domain[domain] = time.time() - return - - elapsed = time.time() - last - wait_for = delay - elapsed - if wait_for > 0: - logger.info( - f"Rate limiting: waiting {wait_for:.2f}s before requesting {url} (crawl_delay={delay}s from robots.txt)" - ) - await asyncio.sleep(wait_for) - logger.info(f"Rate limit wait completed for {url}") - self._last_request_time_per_domain[domain] = time.time() - - async def _get_robots_cache(self, domain_root: str) -> Optional[RobotsTxtCache]: - """Get cached robots.txt data if valid. - - Args: - domain_root: The root URL (e.g., "https://example.com"). - - Returns: - Optional[RobotsTxtCache]: Cached robots.txt data or None if expired or not found. - """ - if Protego is None: - return None - - cached = self._robots_cache.get(domain_root) - if cached and (time.time() - cached.timestamp) < self.robots_cache_ttl: - return cached - return None - - async def _fetch_and_cache_robots(self, domain_root: str) -> RobotsTxtCache: - """Fetch and cache robots.txt data. - - Args: - domain_root: The root URL (e.g., "https://example.com"). - - Returns: - RobotsTxtCache: Cached robots.txt data with crawl delay. - - Raises: - Exception: If fetching robots.txt fails. - """ - async with self._robots_lock: - cached = await self._get_robots_cache(domain_root) - if cached: - return cached - - robots_url = f"{domain_root}/robots.txt" - try: - await self._ensure_client() - await self._respect_rate_limit(robots_url, self.crawl_delay) - resp = await self._client.get(robots_url, timeout=5.0) - content = resp.text if resp.status_code == 200 else "" - except Exception as e: - logger.debug(f"Failed to fetch robots.txt from {domain_root}: {e}") - content = "" - - protego = Protego.parse(content) if content.strip() else None - agent = next((v for k, v in self.headers.items() if k.lower() == "user-agent"), "*") - - crawl_delay = self.crawl_delay - if protego: - delay = protego.crawl_delay(agent) or protego.crawl_delay("*") - if delay: - # Apply max_crawl_delay cap if configured - if self.max_crawl_delay is not None and delay > self.max_crawl_delay: - logger.warning( - f"robots.txt specifies crawl_delay={delay}s for {domain_root}, " - f"capping to max_crawl_delay={self.max_crawl_delay}s" - ) - crawl_delay = self.max_crawl_delay - else: - crawl_delay = delay - - cache_entry = RobotsTxtCache(protego=protego, crawl_delay=crawl_delay) - self._robots_cache[domain_root] = cache_entry - return cache_entry - - async def _is_url_allowed(self, url: str) -> bool: - """Check if a URL is allowed by robots.txt. - - Args: - url: The URL to check. - - Returns: - bool: True if the URL is allowed, False otherwise. - """ - if Protego is None: - return True - - try: - domain_root = self._get_domain_root(url) - cache = await self._get_robots_cache(domain_root) - if cache is None: - cache = await self._fetch_and_cache_robots(domain_root) - - if cache.protego is None: - return True - - agent = next((v for k, v in self.headers.items() if k.lower() == "user-agent"), "*") - return cache.protego.can_fetch(agent, url) or cache.protego.can_fetch("*", url) - except Exception as e: - logger.debug(f"Error checking robots.txt for {url}: {e}") - return True - - async def _get_crawl_delay(self, url: str) -> float: - """Get the crawl delay for a URL from robots.txt. - - Args: - url: The URL to check. 
- - Returns: - float: Crawl delay in seconds. - """ - if Protego is None: - return self.crawl_delay - - try: - domain_root = self._get_domain_root(url) - cache = await self._get_robots_cache(domain_root) - if cache is None: - cache = await self._fetch_and_cache_robots(domain_root) - return cache.crawl_delay - except Exception: - return self.crawl_delay - - async def _fetch_httpx(self, url: str) -> str: - """Fetch a URL using HTTPX with retries. - - Args: - url: The URL to fetch. - - Returns: - str: The HTML content of the page. - - Raises: - Exception: If all retry attempts fail. - """ - await self._ensure_client() - assert self._client is not None, "HTTP client not initialized" - - attempt = 0 - crawl_delay = await self._get_crawl_delay(url) - logger.info(f"Fetching URL with httpx (crawl_delay={crawl_delay}s): {url}") - - while True: - try: - await self._respect_rate_limit(url, crawl_delay) - resp = await self._client.get(url) - resp.raise_for_status() - logger.info( - f"Successfully fetched {url} (status={resp.status_code}, size={len(resp.text)} bytes)" - ) - return resp.text - except Exception as exc: - attempt += 1 - if attempt > self.max_retries: - logger.error(f"Fetch failed for {url} after {attempt} attempts: {exc}") - raise - - delay = self.retry_delay_factor * (2 ** (attempt - 1)) - logger.warning( - f"Retrying {url} after {delay:.2f}s (attempt {attempt}) due to {exc}" - ) - await asyncio.sleep(delay) - - async def _render_with_playwright( - self, url: str, js_wait: float = 1.0, timeout: Optional[float] = None - ) -> str: - """Fetch and render a URL using Playwright for JavaScript content. - - Args: - url: The URL to fetch. - js_wait: Seconds to wait for JavaScript to load. - timeout: Timeout for the request (in seconds, defaults to instance timeout). - - Returns: - str: The rendered HTML content. - - Raises: - RuntimeError: If Playwright is not installed. - Exception: If all retry attempts fail. - """ - if async_playwright is None: - raise RuntimeError( - "Playwright is not installed. Install with `pip install playwright` and run `playwright install`." - ) - - timeout_val = timeout or self.timeout - logger.info( - f"Rendering URL with Playwright (js_wait={js_wait}s, timeout={timeout_val}s): {url}" - ) - - attempt = 0 - while True: - try: - async with async_playwright() as p: - logger.info(f"Launching headless Chromium browser for {url}") - browser = await p.chromium.launch(headless=True) - try: - context = await browser.new_context() - page = await context.new_page() - logger.info(f"Navigating to {url} and waiting for network idle") - await page.goto( - url, - wait_until="networkidle", - timeout=int(timeout_val * 1000), - ) - if js_wait: - logger.info(f"Waiting {js_wait}s for JavaScript to execute") - await asyncio.sleep(js_wait) - content = await page.content() - logger.info( - f"Successfully rendered {url} with Playwright (size={len(content)} bytes)" - ) - return content - finally: - await browser.close() - except Exception as exc: - attempt += 1 - if attempt > self.max_retries: - logger.error(f"Playwright fetch failed for {url}: {exc}") - raise - backoff = self.retry_delay_factor * (2 ** (attempt - 1)) - logger.warning( - f"Retrying playwright fetch {url} after {backoff:.2f}s (attempt {attempt})" - ) - await asyncio.sleep(backoff) - def _normalize_rule(self, rule: Union[str, Dict[str, Any]]) -> ExtractionRule: """Normalize an extraction rule to an ExtractionRule dataclass. 
@@ -435,7 +74,7 @@ class BeautifulSoupCrawler: ) raise ValueError(f"Invalid extraction rule: {rule}") - def _extract_with_bs4(self, html: str, rule: ExtractionRule) -> str: + def extract(self, html: str, rule: ExtractionRule) -> str: """Extract content from HTML using BeautifulSoup or lxml XPath. Args: @@ -491,79 +130,3 @@ class BeautifulSoupCrawler: val = el.get(rule.attr) return (val or "").strip() return el.get_text(strip=True) - - async def fetch_urls( - self, - urls: Union[str, List[str]], - *, - use_playwright: bool = False, - playwright_js_wait: float = 0.8, - ) -> Dict[str, str]: - """Fetch and extract content from URLs using BeautifulSoup or Playwright. - - Args: - urls: A single URL, list of URLs, or dict mapping URLs to extraction rules. - extraction_rules: Default extraction rules for string or list URLs. - use_playwright: If True, use Playwright for JavaScript rendering. - playwright_js_wait: Seconds to wait for JavaScript to load. - join_all_matches: If True, extract all matching elements for each rule. - - Returns: - Dict[str, str]: A dictionary mapping URLs to their extracted content. - - Raises: - ValueError: If extraction_rules are missing when required or if urls is invalid. - Exception: If fetching or extraction fails. - """ - if isinstance(urls, str): - urls = [urls] - else: - raise ValueError(f"Invalid urls type: {type(urls)}") - - async def _task(url: str): - async with self._sem: - try: - logger.info(f"Processing URL: {url}") - - # Check robots.txt - allowed = await self._is_url_allowed(url) - if not allowed: - logger.warning(f"URL disallowed by robots.txt: {url}") - return url, "" - - logger.info(f"Robots.txt check passed for {url}") - - # Fetch HTML - if use_playwright: - logger.info( - f"Rendering {url} with Playwright (JS wait: {playwright_js_wait}s)" - ) - html = await self._render_with_playwright( - url, js_wait=playwright_js_wait, timeout=self.timeout - ) - else: - logger.info(f"Fetching {url} with httpx") - html = await self._fetch_httpx(url) - - logger.info(f"Successfully fetched HTML from {url} ({len(html)} bytes)") - - return url, html - - except Exception as e: - logger.error(f"Error processing {url}: {e}") - return url, "" - - logger.info(f"Creating {len(urls)} async tasks for concurrent fetching") - tasks = [asyncio.create_task(_task(u)) for u in urls] - results = {} - completed = 0 - total = len(tasks) - - for coro in asyncio.as_completed(tasks): - url, html = await coro - results[url] = html - completed += 1 - logger.info(f"Progress: {completed}/{total} URLs processed") - - logger.info(f"Completed fetching all {len(results)} URL(s)") - return results diff --git a/cognee/tasks/web_scraper/config.py b/cognee/tasks/web_scraper/config.py index ac470daa9..fcf22ab33 100644 --- a/cognee/tasks/web_scraper/config.py +++ b/cognee/tasks/web_scraper/config.py @@ -10,7 +10,7 @@ class TavilyConfig(BaseModel): timeout: Optional[int] = Field(default=10, ge=1, le=60) -class SoupCrawlerConfig(BaseModel): +class DefaultCrawlerConfig(BaseModel): concurrency: int = 5 crawl_delay: float = 0.5 max_crawl_delay: Optional[float] = ( diff --git a/cognee/tasks/web_scraper/default_url_crawler.py b/cognee/tasks/web_scraper/default_url_crawler.py new file mode 100644 index 000000000..d9d2ee922 --- /dev/null +++ b/cognee/tasks/web_scraper/default_url_crawler.py @@ -0,0 +1,446 @@ +import asyncio +from dataclasses import dataclass, field +from functools import lru_cache +import time +from typing import Any, Union, List, Dict, Optional +from urllib.parse import urlparse +import httpx 
+ +from cognee.shared.logging_utils import get_logger +from cognee.tasks.web_scraper.utils import UrlsToHtmls + +logger = get_logger() + +try: + from protego import Protego +except ImportError: + logger.warning("Failed to import protego, make sure to install using pip install protego>=0.1") + Protego = None + +try: + from playwright.async_api import async_playwright +except ImportError: + logger.warning( + "Failed to import playwright, make sure to install using pip install playwright>=1.9.0" + ) + async_playwright = None + + +@dataclass +class RobotsTxtCache: + """Cache for robots.txt data. + + Attributes: + protego: Parsed robots.txt object (Protego instance). + crawl_delay: Delay between requests (in seconds). + timestamp: Time when the cache entry was created. + """ + + protego: Any + crawl_delay: float + timestamp: float = field(default_factory=time.time) + + +class DefaultUrlCrawler: + def __init__( + self, + *, + concurrency: int = 5, + crawl_delay: float = 0.5, + max_crawl_delay: Optional[float] = 10.0, + timeout: float = 15.0, + max_retries: int = 2, + retry_delay_factor: float = 0.5, + headers: Optional[Dict[str, str]] = None, + robots_cache_ttl: float = 3600.0, + ): + """Initialize the BeautifulSoupCrawler. + + Args: + concurrency: Number of concurrent requests allowed. + crawl_delay: Minimum seconds between requests to the same domain. + max_crawl_delay: Maximum crawl delay to respect from robots.txt (None = no limit). + timeout: Per-request timeout in seconds. + max_retries: Number of retries for failed requests. + retry_delay_factor: Multiplier for exponential backoff on retries. + headers: HTTP headers for requests (defaults to User-Agent: Cognee-Scraper/1.0). + robots_cache_ttl: Time-to-live for robots.txt cache in seconds. + """ + self.concurrency = concurrency + self._sem = asyncio.Semaphore(concurrency) + self.crawl_delay = crawl_delay + self.max_crawl_delay = max_crawl_delay + self.timeout = timeout + self.max_retries = max_retries + self.retry_delay_factor = retry_delay_factor + self.headers = headers or {"User-Agent": "Cognee-Scraper/1.0"} + self.robots_cache_ttl = robots_cache_ttl + self._last_request_time_per_domain: Dict[str, float] = {} + self._robots_cache: Dict[str, RobotsTxtCache] = {} + self._client: Optional[httpx.AsyncClient] = None + self._robots_lock = asyncio.Lock() + + async def _ensure_client(self): + """Initialize the HTTP client if not already created.""" + if self._client is None: + self._client = httpx.AsyncClient(timeout=self.timeout, headers=self.headers) + + async def close(self): + """Close the HTTP client.""" + if self._client: + await self._client.aclose() + self._client = None + + async def __aenter__(self): + """Enter the context manager, initializing the HTTP client.""" + await self._ensure_client() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Exit the context manager, closing the HTTP client.""" + await self.close() + + @lru_cache(maxsize=1024) + def _domain_from_url(self, url: str) -> str: + """Extract the domain (netloc) from a URL. + + Args: + url: The URL to parse. + + Returns: + str: The domain (netloc) of the URL. + """ + try: + return urlparse(url).netloc + except Exception: + return url + + @lru_cache(maxsize=1024) + def _get_domain_root(self, url: str) -> str: + """Get the root URL (scheme and netloc) from a URL. + + Args: + url: The URL to parse. + + Returns: + str: The root URL (e.g., "https://example.com"). 
+ """ + parsed = urlparse(url) + return f"{parsed.scheme}://{parsed.netloc}" + + async def _respect_rate_limit(self, url: str, crawl_delay: Optional[float] = None): + """Enforce rate limiting for requests to the same domain. + + Args: + url: The URL to check. + crawl_delay: Custom crawl delay in seconds (if any). + """ + domain = self._domain_from_url(url) + last = self._last_request_time_per_domain.get(domain) + delay = crawl_delay if crawl_delay is not None else self.crawl_delay + + if last is None: + self._last_request_time_per_domain[domain] = time.time() + return + + elapsed = time.time() - last + wait_for = delay - elapsed + if wait_for > 0: + logger.info( + f"Rate limiting: waiting {wait_for:.2f}s before requesting {url} (crawl_delay={delay}s from robots.txt)" + ) + await asyncio.sleep(wait_for) + logger.info(f"Rate limit wait completed for {url}") + self._last_request_time_per_domain[domain] = time.time() + + async def _get_robots_cache(self, domain_root: str) -> Optional[RobotsTxtCache]: + """Get cached robots.txt data if valid. + + Args: + domain_root: The root URL (e.g., "https://example.com"). + + Returns: + Optional[RobotsTxtCache]: Cached robots.txt data or None if expired or not found. + """ + if Protego is None: + return None + + cached = self._robots_cache.get(domain_root) + if cached and (time.time() - cached.timestamp) < self.robots_cache_ttl: + return cached + return None + + async def _fetch_and_cache_robots(self, domain_root: str) -> RobotsTxtCache: + """Fetch and cache robots.txt data. + + Args: + domain_root: The root URL (e.g., "https://example.com"). + + Returns: + RobotsTxtCache: Cached robots.txt data with crawl delay. + + Raises: + Exception: If fetching robots.txt fails. + """ + async with self._robots_lock: + cached = await self._get_robots_cache(domain_root) + if cached: + return cached + + robots_url = f"{domain_root}/robots.txt" + try: + await self._ensure_client() + await self._respect_rate_limit(robots_url, self.crawl_delay) + resp = await self._client.get(robots_url, timeout=5.0) + content = resp.text if resp.status_code == 200 else "" + except Exception as e: + logger.debug(f"Failed to fetch robots.txt from {domain_root}: {e}") + content = "" + + protego = Protego.parse(content) if content.strip() else None + agent = next((v for k, v in self.headers.items() if k.lower() == "user-agent"), "*") + + crawl_delay = self.crawl_delay + if protego: + delay = protego.crawl_delay(agent) or protego.crawl_delay("*") + if delay: + # Apply max_crawl_delay cap if configured + if self.max_crawl_delay is not None and delay > self.max_crawl_delay: + logger.warning( + f"robots.txt specifies crawl_delay={delay}s for {domain_root}, " + f"capping to max_crawl_delay={self.max_crawl_delay}s" + ) + crawl_delay = self.max_crawl_delay + else: + crawl_delay = delay + + cache_entry = RobotsTxtCache(protego=protego, crawl_delay=crawl_delay) + self._robots_cache[domain_root] = cache_entry + return cache_entry + + async def _is_url_allowed(self, url: str) -> bool: + """Check if a URL is allowed by robots.txt. + + Args: + url: The URL to check. + + Returns: + bool: True if the URL is allowed, False otherwise. 
+ """ + if Protego is None: + return True + + try: + domain_root = self._get_domain_root(url) + cache = await self._get_robots_cache(domain_root) + if cache is None: + cache = await self._fetch_and_cache_robots(domain_root) + + if cache.protego is None: + return True + + agent = next((v for k, v in self.headers.items() if k.lower() == "user-agent"), "*") + return cache.protego.can_fetch(agent, url) or cache.protego.can_fetch("*", url) + except Exception as e: + logger.debug(f"Error checking robots.txt for {url}: {e}") + return True + + async def _get_crawl_delay(self, url: str) -> float: + """Get the crawl delay for a URL from robots.txt. + + Args: + url: The URL to check. + + Returns: + float: Crawl delay in seconds. + """ + if Protego is None: + return self.crawl_delay + + try: + domain_root = self._get_domain_root(url) + cache = await self._get_robots_cache(domain_root) + if cache is None: + cache = await self._fetch_and_cache_robots(domain_root) + return cache.crawl_delay + except Exception: + return self.crawl_delay + + async def _fetch_httpx(self, url: str) -> str: + """Fetch a URL using HTTPX with retries. + + Args: + url: The URL to fetch. + + Returns: + str: The HTML content of the page. + + Raises: + Exception: If all retry attempts fail. + """ + await self._ensure_client() + assert self._client is not None, "HTTP client not initialized" + + attempt = 0 + crawl_delay = await self._get_crawl_delay(url) + logger.info(f"Fetching URL with httpx (crawl_delay={crawl_delay}s): {url}") + + while True: + try: + await self._respect_rate_limit(url, crawl_delay) + resp = await self._client.get(url) + resp.raise_for_status() + logger.info( + f"Successfully fetched {url} (status={resp.status_code}, size={len(resp.text)} bytes)" + ) + return resp.text + except Exception as exc: + attempt += 1 + if attempt > self.max_retries: + logger.error(f"Fetch failed for {url} after {attempt} attempts: {exc}") + raise + + delay = self.retry_delay_factor * (2 ** (attempt - 1)) + logger.warning( + f"Retrying {url} after {delay:.2f}s (attempt {attempt}) due to {exc}" + ) + await asyncio.sleep(delay) + + async def _render_with_playwright( + self, url: str, js_wait: float = 1.0, timeout: Optional[float] = None + ) -> str: + """Fetch and render a URL using Playwright for JavaScript content. + + Args: + url: The URL to fetch. + js_wait: Seconds to wait for JavaScript to load. + timeout: Timeout for the request (in seconds, defaults to instance timeout). + + Returns: + str: The rendered HTML content. + + Raises: + RuntimeError: If Playwright is not installed. + Exception: If all retry attempts fail. + """ + if async_playwright is None: + raise RuntimeError( + "Playwright is not installed. Install with `pip install playwright` and run `playwright install`." 
+ ) + + timeout_val = timeout or self.timeout + logger.info( + f"Rendering URL with Playwright (js_wait={js_wait}s, timeout={timeout_val}s): {url}" + ) + + attempt = 0 + while True: + try: + async with async_playwright() as p: + logger.info(f"Launching headless Chromium browser for {url}") + browser = await p.chromium.launch(headless=True) + try: + context = await browser.new_context() + page = await context.new_page() + logger.info(f"Navigating to {url} and waiting for network idle") + await page.goto( + url, + wait_until="networkidle", + timeout=int(timeout_val * 1000), + ) + if js_wait: + logger.info(f"Waiting {js_wait}s for JavaScript to execute") + await asyncio.sleep(js_wait) + content = await page.content() + logger.info( + f"Successfully rendered {url} with Playwright (size={len(content)} bytes)" + ) + return content + finally: + await browser.close() + except Exception as exc: + attempt += 1 + if attempt > self.max_retries: + logger.error(f"Playwright fetch failed for {url}: {exc}") + raise + backoff = self.retry_delay_factor * (2 ** (attempt - 1)) + logger.warning( + f"Retrying playwright fetch {url} after {backoff:.2f}s (attempt {attempt})" + ) + await asyncio.sleep(backoff) + + async def fetch_urls( + self, + urls: Union[str, List[str]], + *, + use_playwright: bool = False, + playwright_js_wait: float = 0.8, + ) -> UrlsToHtmls: + """Fetch and extract content from URLs using BeautifulSoup or Playwright. + + Args: + urls: A single URL, list of URLs, or dict mapping URLs to extraction rules. + extraction_rules: Default extraction rules for string or list URLs. + use_playwright: If True, use Playwright for JavaScript rendering. + playwright_js_wait: Seconds to wait for JavaScript to load. + join_all_matches: If True, extract all matching elements for each rule. + + Returns: + Dict[str, str]: A dictionary mapping URLs to their extracted content. + + Raises: + ValueError: If extraction_rules are missing when required or if urls is invalid. + Exception: If fetching or extraction fails. 
+ """ + if isinstance(urls, str): + urls = [urls] + else: + raise ValueError(f"Invalid urls type: {type(urls)}") + + async def _task(url: str): + async with self._sem: + try: + logger.info(f"Processing URL: {url}") + + # Check robots.txt + allowed = await self._is_url_allowed(url) + if not allowed: + logger.warning(f"URL disallowed by robots.txt: {url}") + return url, "" + + logger.info(f"Robots.txt check passed for {url}") + + # Fetch HTML + if use_playwright: + logger.info( + f"Rendering {url} with Playwright (JS wait: {playwright_js_wait}s)" + ) + html = await self._render_with_playwright( + url, js_wait=playwright_js_wait, timeout=self.timeout + ) + else: + logger.info(f"Fetching {url} with httpx") + html = await self._fetch_httpx(url) + + logger.info(f"Successfully fetched HTML from {url} ({len(html)} bytes)") + + return url, html + + except Exception as e: + logger.error(f"Error processing {url}: {e}") + return url, "" + + logger.info(f"Creating {len(urls)} async tasks for concurrent fetching") + tasks = [asyncio.create_task(_task(u)) for u in urls] + results = {} + completed = 0 + total = len(tasks) + + for coro in asyncio.as_completed(tasks): + url, html = await coro + results[url] = html + completed += 1 + logger.info(f"Progress: {completed}/{total} URLs processed") + + logger.info(f"Completed fetching all {len(results)} URL(s)") + return results diff --git a/cognee/tasks/web_scraper/utils.py b/cognee/tasks/web_scraper/utils.py index 8b8bcc11f..0cbd355a3 100644 --- a/cognee/tasks/web_scraper/utils.py +++ b/cognee/tasks/web_scraper/utils.py @@ -4,19 +4,20 @@ This module provides functions to fetch and extract content from web pages, supp both BeautifulSoup for custom extraction rules and Tavily for API-based scraping. """ -from typing import Dict, List, Union, Optional, Literal -from cognee.context_global_variables import soup_crawler_config, tavily_config +import os +from re import L +from typing import List, Union, TypeAlias from cognee.shared.logging_utils import get_logger +from .default_url_crawler import DefaultUrlCrawler from .bs4_crawler import BeautifulSoupCrawler -from .config import TavilyConfig +from .config import DefaultCrawlerConfig, TavilyConfig logger = get_logger(__name__) +UrlsToHtmls: TypeAlias = dict[str, str] -async def fetch_page_content( - urls: Union[str, List[str]], - preferred_tool: Optional[Literal["tavily", "beautifulsoup"]] = "beautifulsoup", -) -> Dict[str, str]: + +async def fetch_page_content(urls: Union[str, List[str]]) -> UrlsToHtmls: """Fetch content from one or more URLs using the specified tool. This function retrieves web page content using either BeautifulSoup (with custom @@ -29,7 +30,7 @@ async def fetch_page_content( Defaults to "beautifulsoup". tavily_config: Configuration for Tavily API, including API key. Required if preferred_tool is "tavily". - soup_crawler_config: Configuration for BeautifulSoup crawler, including + default_crawler_config: Configuration for BeautifulSoup crawler, including extraction rules. Required if preferred_tool is "beautifulsoup" and extraction_rules are needed. @@ -44,51 +45,39 @@ async def fetch_page_content( installed. 
""" url_list = [urls] if isinstance(urls, str) else urls - logger.info(f"Starting to fetch content from {len(url_list)} URL(s) using {preferred_tool}") - _tavily_config = tavily_config.get() - _soup_crawler_config = soup_crawler_config.get() - - if preferred_tool == "tavily": - if not tavily_config or tavily_config.api_key is None: - raise ValueError("TAVILY_API_KEY must be set in TavilyConfig to use Tavily") - logger.info("Using Tavily API for content extraction") + if os.getenv("TAVILY_API_KEY"): + logger.info("Using Tavily API for url fetching") return await fetch_with_tavily(urls, tavily_config) + else: + logger.info("Using default crawler for content extraction") - if preferred_tool == "beautifulsoup": - try: - from bs4 import BeautifulSoup as _ # noqa: F401 - except ImportError: - logger.error( - "Failed to import bs4, make sure to install using pip install beautifulsoup4>=4.13.1" - ) - raise ImportError - if soup_crawler_config is None or soup_crawler_config.extraction_rules is None: - raise ValueError("soup_crawler_config must be provided when not using Tavily") + default_crawler_config = ( + DefaultCrawlerConfig() + ) # We've decided to use defaults, and configure through env vars as needed - logger.info("Using BeautifulSoup for content extraction") logger.info( - f"Initializing BeautifulSoup crawler with concurrency={soup_crawler_config.concurrency}, timeout={soup_crawler_config.timeout}s, max_crawl_delay={soup_crawler_config.max_crawl_delay}s" + f"Initializing BeautifulSoup crawler with concurrency={default_crawler_config.concurrency}, timeout={default_crawler_config.timeout}s, max_crawl_delay={default_crawler_config.max_crawl_delay}s" ) - crawler = BeautifulSoupCrawler( - concurrency=soup_crawler_config.concurrency, - crawl_delay=soup_crawler_config.crawl_delay, - max_crawl_delay=soup_crawler_config.max_crawl_delay, - timeout=soup_crawler_config.timeout, - max_retries=soup_crawler_config.max_retries, - retry_delay_factor=soup_crawler_config.retry_delay_factor, - headers=soup_crawler_config.headers, - robots_cache_ttl=soup_crawler_config.robots_cache_ttl, + crawler = DefaultUrlCrawler( + concurrency=default_crawler_config.concurrency, + crawl_delay=default_crawler_config.crawl_delay, + max_crawl_delay=default_crawler_config.max_crawl_delay, + timeout=default_crawler_config.timeout, + max_retries=default_crawler_config.max_retries, + retry_delay_factor=default_crawler_config.retry_delay_factor, + headers=default_crawler_config.headers, + robots_cache_ttl=default_crawler_config.robots_cache_ttl, ) try: logger.info( - f"Starting to crawl {len(url_list)} URL(s) with BeautifulSoup (use_playwright={soup_crawler_config.use_playwright})" + f"Starting to crawl {len(url_list)} URL(s) with BeautifulSoup (use_playwright={default_crawler_config.use_playwright})" ) results = await crawler.fetch_urls( urls, - use_playwright=soup_crawler_config.use_playwright, - playwright_js_wait=soup_crawler_config.playwright_js_wait, + use_playwright=default_crawler_config.use_playwright, + playwright_js_wait=default_crawler_config.playwright_js_wait, ) logger.info(f"Successfully fetched content from {len(results)} URL(s)") return results @@ -102,7 +91,7 @@ async def fetch_page_content( async def fetch_with_tavily( urls: Union[str, List[str]], tavily_config: TavilyConfig -) -> Dict[str, str]: +) -> UrlsToHtmls: """Fetch content from URLs using the Tavily API. 
Args: diff --git a/cognee/tasks/web_scraper/web_scraper_task.py b/cognee/tasks/web_scraper/web_scraper_task.py index 52154c6ef..2bade3719 100644 --- a/cognee/tasks/web_scraper/web_scraper_task.py +++ b/cognee/tasks/web_scraper/web_scraper_task.py @@ -19,7 +19,7 @@ from cognee.tasks.storage.index_graph_edges import index_graph_edges from cognee.modules.engine.operations.setup import setup from .models import WebPage, WebSite, ScrapingJob -from .config import SoupCrawlerConfig, TavilyConfig +from .config import DefaultCrawlerConfig, TavilyConfig from .utils import fetch_page_content try: @@ -47,7 +47,7 @@ async def cron_web_scraper_task( schedule: str = None, extraction_rules: dict = None, tavily_api_key: str = os.getenv("TAVILY_API_KEY"), - soup_crawler_config: SoupCrawlerConfig = None, + soup_crawler_config: DefaultCrawlerConfig = None, tavily_config: TavilyConfig = None, job_name: str = "scraping", ): @@ -121,7 +121,7 @@ async def web_scraper_task( schedule: str = None, extraction_rules: dict = None, tavily_api_key: str = os.getenv("TAVILY_API_KEY"), - soup_crawler_config: SoupCrawlerConfig = None, + soup_crawler_config: DefaultCrawlerConfig = None, tavily_config: TavilyConfig = None, job_name: str = None, ): @@ -341,7 +341,7 @@ def check_arguments(tavily_api_key, extraction_rules, tavily_config, soup_crawle soup_crawler_config: Configuration for BeautifulSoup crawler. Returns: - Tuple[SoupCrawlerConfig, TavilyConfig, str]: Configured soup_crawler_config, + Tuple[DefaultCrawlerConfig, TavilyConfig, str]: Configured soup_crawler_config, tavily_config, and preferred_tool ("tavily" or "beautifulsoup"). Raises: @@ -350,7 +350,7 @@ def check_arguments(tavily_api_key, extraction_rules, tavily_config, soup_crawle preferred_tool = "beautifulsoup" if extraction_rules and not soup_crawler_config: - soup_crawler_config = SoupCrawlerConfig(extraction_rules=extraction_rules) + soup_crawler_config = DefaultCrawlerConfig(extraction_rules=extraction_rules) if tavily_api_key: if not tavily_config: diff --git a/cognee/tests/tasks/web_scraping/web_scraping_test.py b/cognee/tests/tasks/web_scraping/web_scraping_test.py index bf66b5155..81c58ac8d 100644 --- a/cognee/tests/tasks/web_scraping/web_scraping_test.py +++ b/cognee/tests/tasks/web_scraping/web_scraping_test.py @@ -1,6 +1,6 @@ import asyncio import cognee -from cognee.tasks.web_scraper.config import SoupCrawlerConfig +from cognee.tasks.web_scraper.config import DefaultCrawlerConfig from cognee.tasks.web_scraper import cron_web_scraper_task @@ -14,7 +14,7 @@ async def test_web_scraping_using_bs4(): "authors": {"selector": ".quote small", "all": True}, } - soup_config = SoupCrawlerConfig( + soup_config = DefaultCrawlerConfig( concurrency=5, crawl_delay=0.5, timeout=15.0, @@ -47,7 +47,7 @@ async def test_web_scraping_using_bs4_and_incremental_loading(): url = "https://books.toscrape.com/" rules = {"titles": "article.product_pod h3 a", "prices": "article.product_pod p.price_color"} - soup_config = SoupCrawlerConfig( + soup_config = DefaultCrawlerConfig( concurrency=1, crawl_delay=0.1, timeout=10.0,
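
Usage sketch (illustrative only, not part of the diff): after this change, DefaultUrlCrawler owns fetching, robots.txt handling and per-domain rate limiting; BeautifulSoupCrawler is reduced to HTML extraction; and fetch_page_content() picks Tavily or the default crawler based on the TAVILY_API_KEY environment variable. The example URL, the string selector passed to the (private) _normalize_rule helper, and the assumption that TAVILY_API_KEY is unset are all illustrative choices, not values taken from the patch.

import asyncio

from cognee.tasks.web_scraper import (
    BeautifulSoupCrawler,
    DefaultUrlCrawler,
    fetch_page_content,
)


async def main():
    url = "https://quotes.toscrape.com/"  # example target, not from the patch

    # Low-level path: fetch raw HTML; robots.txt checks and per-domain
    # rate limiting happen inside the crawler.
    async with DefaultUrlCrawler(concurrency=2, timeout=15.0) as crawler:
        pages = await crawler.fetch_urls(url)  # returns {url: html}

    # Extraction path: BeautifulSoupCrawler now only extracts from HTML it is
    # given. A plain string rule is assumed to be treated as a CSS selector by
    # _normalize_rule (private helper, used here purely for illustration).
    extractor = BeautifulSoupCrawler()
    rule = extractor._normalize_rule(".quote .text")
    print(extractor.extract(pages[url], rule)[:200])

    # High-level path: with TAVILY_API_KEY unset, fetch_page_content() falls
    # back to DefaultUrlCrawler built from DefaultCrawlerConfig defaults.
    html_by_url = await fetch_page_content([url])
    print(list(html_by_url))


if __name__ == "__main__":
    asyncio.run(main())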