diff --git a/cognee/tasks/web_scraper/__init__.py b/cognee/tasks/web_scraper/__init__.py
index 8a142b515..d8e580fad 100644
--- a/cognee/tasks/web_scraper/__init__.py
+++ b/cognee/tasks/web_scraper/__init__.py
@@ -1,3 +1,10 @@
+"""Web scraping module for cognee.
+
+This module provides tools for scraping web content, managing scraping jobs, and storing
+data in a graph database. It includes classes and functions for crawling web pages using
+BeautifulSoup or Tavily, defining data models, and handling scraping configurations.
+"""
+
 from .bs4_crawler import BeautifulSoupCrawler
 from .utils import fetch_page_content
 from .web_scraper_task import cron_web_scraper_task, web_scraper_task
diff --git a/cognee/tasks/web_scraper/bs4_crawler.py b/cognee/tasks/web_scraper/bs4_crawler.py
index 57cfa46f8..7ccf06a3d 100644
--- a/cognee/tasks/web_scraper/bs4_crawler.py
+++ b/cognee/tasks/web_scraper/bs4_crawler.py
@@ -1,10 +1,16 @@
+"""BeautifulSoup-based web crawler for extracting content from web pages.
+
+This module provides the BeautifulSoupCrawler class for fetching and extracting content
+from web pages using BeautifulSoup or Playwright for JavaScript-rendered pages. It
+supports robots.txt handling, rate limiting, and custom extraction rules.
+"""
+
 import asyncio
 import time
 from typing import Union, List, Dict, Any, Optional
 from urllib.parse import urlparse
 from dataclasses import dataclass, field
 from functools import lru_cache
-
 import httpx
 from bs4 import BeautifulSoup
 from cognee.shared.logging_utils import get_logger
@@ -28,7 +34,15 @@ except ImportError:
 @dataclass
 class ExtractionRule:
-    """Normalized extraction rule"""
+    """Normalized extraction rule for web content.
+
+    Attributes:
+        selector: CSS selector for extraction (if any).
+        xpath: XPath expression for extraction (if any).
+        attr: HTML attribute to extract (if any).
+        all: If True, extract all matching elements; otherwise, extract first.
+        join_with: String to join multiple extracted elements.
+    """
 
     selector: Optional[str] = None
     xpath: Optional[str] = None
@@ -39,7 +53,13 @@ class RobotsTxtCache:
-    """Cache for robots.txt data"""
+    """Cache for robots.txt data.
+
+    Attributes:
+        protego: Parsed robots.txt object (Protego instance).
+        crawl_delay: Delay between requests (in seconds).
+        timestamp: Time when the cache entry was created.
+    """
 
     protego: Any
     crawl_delay: float
@@ -47,6 +67,21 @@ class BeautifulSoupCrawler:
+    """Crawler for fetching and extracting web content using BeautifulSoup.
+
+    Supports asynchronous HTTP requests, Playwright for JavaScript rendering, robots.txt
+    compliance, and rate limiting. Extracts content using CSS selectors or XPath rules.
+
+    Attributes:
+        concurrency: Number of concurrent requests allowed.
+        crawl_delay: Minimum seconds between requests to the same domain.
+        timeout: Per-request timeout in seconds.
+        max_retries: Number of retries for failed requests.
+        retry_delay_factor: Multiplier for exponential backoff on retries.
+        headers: HTTP headers for requests (e.g., User-Agent).
+        robots_cache_ttl: Time-to-live for robots.txt cache in seconds.
+    """
+
     def __init__(
         self,
         *,
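A minimal usage sketch of the class documented above, for reviewers; the URL and selector are illustrative rather than taken from the PR:

```python
import asyncio

from cognee.tasks.web_scraper import BeautifulSoupCrawler


async def main():
    # The crawler is an async context manager: the HTTP client is opened on
    # entry and closed again on exit (see __aenter__/__aexit__ below).
    async with BeautifulSoupCrawler(concurrency=2, crawl_delay=1.0) as crawler:
        results = await crawler.fetch_with_bs4(
            "https://example.com",             # hypothetical URL
            extraction_rules={"title": "h1"},  # field name -> CSS selector
        )
        print(results)  # {"https://example.com": "<extracted text>"}


asyncio.run(main())
```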
+ """ + def __init__( self, *, @@ -56,16 +91,18 @@ class BeautifulSoupCrawler: max_retries: int = 2, retry_delay_factor: float = 0.5, headers: Optional[Dict[str, str]] = None, - robots_cache_ttl: float = 3600.0, # Cache robots.txt for 1 hour + robots_cache_ttl: float = 3600.0, ): - """ - concurrency: number of concurrent requests allowed - crawl_delay: minimum seconds to wait between requests to the SAME domain - timeout: per-request timeout - max_retries: number of retries on network errors - retry_delay_factor: multiplier for exponential retry failure delay - headers: default headers for requests - robots_cache_ttl: TTL for robots.txt cache in seconds + """Initialize the BeautifulSoupCrawler. + + Args: + concurrency: Number of concurrent requests allowed. + crawl_delay: Minimum seconds between requests to the same domain. + timeout: Per-request timeout in seconds. + max_retries: Number of retries for failed requests. + retry_delay_factor: Multiplier for exponential backoff on retries. + headers: HTTP headers for requests (defaults to User-Agent: Cognee-Scraper/1.0). + robots_cache_ttl: Time-to-live for robots.txt cache in seconds. """ self.concurrency = concurrency self._sem = asyncio.Semaphore(concurrency) @@ -75,33 +112,42 @@ class BeautifulSoupCrawler: self.retry_delay_factor = retry_delay_factor self.headers = headers or {"User-Agent": "Cognee-Scraper/1.0"} self.robots_cache_ttl = robots_cache_ttl - self._last_request_time_per_domain: Dict[str, float] = {} self._robots_cache: Dict[str, RobotsTxtCache] = {} self._client: Optional[httpx.AsyncClient] = None self._robots_lock = asyncio.Lock() - # ---------- lifecycle helpers ---------- async def _ensure_client(self): + """Initialize the HTTP client if not already created.""" if self._client is None: self._client = httpx.AsyncClient(timeout=self.timeout, headers=self.headers) async def close(self): + """Close the HTTP client.""" if self._client: await self._client.aclose() self._client = None async def __aenter__(self): + """Enter the context manager, initializing the HTTP client.""" await self._ensure_client() return self async def __aexit__(self, exc_type, exc_val, exc_tb): + """Exit the context manager, closing the HTTP client.""" await self.close() - # ---------- rate limiting ---------- @staticmethod @lru_cache(maxsize=1024) def _domain_from_url(url: str) -> str: + """Extract the domain (netloc) from a URL. + + Args: + url: The URL to parse. + + Returns: + str: The domain (netloc) of the URL. + """ try: return urlparse(url).netloc except Exception: @@ -110,10 +156,24 @@ class BeautifulSoupCrawler: @staticmethod @lru_cache(maxsize=1024) def _get_domain_root(url: str) -> str: + """Get the root URL (scheme and netloc) from a URL. + + Args: + url: The URL to parse. + + Returns: + str: The root URL (e.g., "https://example.com"). + """ parsed = urlparse(url) return f"{parsed.scheme}://{parsed.netloc}" async def _respect_rate_limit(self, url: str, crawl_delay: Optional[float] = None): + """Enforce rate limiting for requests to the same domain. + + Args: + url: The URL to check. + crawl_delay: Custom crawl delay in seconds (if any). 
+ """ domain = self._domain_from_url(url) last = self._last_request_time_per_domain.get(domain) delay = crawl_delay if crawl_delay is not None else self.crawl_delay @@ -128,9 +188,15 @@ class BeautifulSoupCrawler: await asyncio.sleep(wait_for) self._last_request_time_per_domain[domain] = time.time() - # ----------- robots.txt handling ----------- async def _get_robots_cache(self, domain_root: str) -> Optional[RobotsTxtCache]: - """Get cached robots.txt data if valid""" + """Get cached robots.txt data if valid. + + Args: + domain_root: The root URL (e.g., "https://example.com"). + + Returns: + Optional[RobotsTxtCache]: Cached robots.txt data or None if expired or not found. + """ if Protego is None: return None @@ -140,9 +206,18 @@ class BeautifulSoupCrawler: return None async def _fetch_and_cache_robots(self, domain_root: str) -> RobotsTxtCache: - """Fetch and cache robots.txt data""" + """Fetch and cache robots.txt data. + + Args: + domain_root: The root URL (e.g., "https://example.com"). + + Returns: + RobotsTxtCache: Cached robots.txt data with crawl delay. + + Raises: + Exception: If fetching robots.txt fails. + """ async with self._robots_lock: - # Check again after acquiring lock cached = await self._get_robots_cache(domain_root) if cached: return cached @@ -170,6 +245,14 @@ class BeautifulSoupCrawler: return cache_entry async def _is_url_allowed(self, url: str) -> bool: + """Check if a URL is allowed by robots.txt. + + Args: + url: The URL to check. + + Returns: + bool: True if the URL is allowed, False otherwise. + """ if Protego is None: return True @@ -189,6 +272,14 @@ class BeautifulSoupCrawler: return True async def _get_crawl_delay(self, url: str) -> float: + """Get the crawl delay for a URL from robots.txt. + + Args: + url: The URL to check. + + Returns: + float: Crawl delay in seconds. + """ if Protego is None: return self.crawl_delay @@ -201,8 +292,18 @@ class BeautifulSoupCrawler: except Exception: return self.crawl_delay - # ---------- low-level fetchers ---------- async def _fetch_httpx(self, url: str) -> str: + """Fetch a URL using HTTPX with retries. + + Args: + url: The URL to fetch. + + Returns: + str: The HTML content of the page. + + Raises: + Exception: If all retry attempts fail. + """ await self._ensure_client() assert self._client is not None, "HTTP client not initialized" @@ -230,6 +331,20 @@ class BeautifulSoupCrawler: async def _render_with_playwright( self, url: str, js_wait: float = 1.0, timeout: Optional[float] = None ) -> str: + """Fetch and render a URL using Playwright for JavaScript content. + + Args: + url: The URL to fetch. + js_wait: Seconds to wait for JavaScript to load. + timeout: Timeout for the request (in seconds, defaults to instance timeout). + + Returns: + str: The rendered HTML content. + + Raises: + RuntimeError: If Playwright is not installed. + Exception: If all retry attempts fail. + """ if async_playwright is None: raise RuntimeError( "Playwright is not installed. Install with `pip install playwright` and run `playwright install`." @@ -263,10 +378,19 @@ class BeautifulSoupCrawler: ) await asyncio.sleep(backoff) - # ---------- extraction helpers ---------- @staticmethod def _normalize_rule(rule: Union[str, Dict[str, Any]]) -> ExtractionRule: - """Normalize extraction rule to ExtractionRule dataclass""" + """Normalize an extraction rule to an ExtractionRule dataclass. + + Args: + rule: A string (CSS selector) or dict with extraction parameters. + + Returns: + ExtractionRule: Normalized extraction rule. 
@@ -280,7 +404,18 @@
         raise ValueError(f"Invalid extraction rule: {rule}")

     def _extract_with_bs4(self, html: str, rule: ExtractionRule) -> str:
-        """Extract content using BeautifulSoup or lxml xpath"""
+        """Extract content from HTML using BeautifulSoup or lxml XPath.
+
+        Args:
+            html: The HTML content to extract from.
+            rule: The extraction rule to apply.
+
+        Returns:
+            str: The extracted content.
+
+        Raises:
+            RuntimeError: If XPath is used but lxml is not installed.
+        """
         soup = BeautifulSoup(html, "html.parser")

         if rule.xpath:
@@ -325,7 +460,6 @@
                 return (val or "").strip()
             return el.get_text(strip=True)

-    # ---------- public methods ----------
     async def fetch_with_bs4(
         self,
         urls: Union[str, List[str], Dict[str, Dict[str, Any]]],
@@ -335,23 +469,22 @@
         playwright_js_wait: float = 0.8,
         join_all_matches: bool = False,
     ) -> Dict[str, str]:
-        """
-        Fetch one or more URLs and extract text using BeautifulSoup (or lxml xpath).
+        """Fetch and extract content from URLs using BeautifulSoup or Playwright.

         Args:
-            urls: Can be:
-                - A single URL string
-                - A list of URLs (uses extraction_rules for all)
-                - A dict mapping URL -> extraction_rules (URL-specific rules)
-            extraction_rules: Default rules when urls is a string or list
-            use_playwright: Whether to use Playwright for JS rendering
-            playwright_js_wait: Wait time after page load for JS
-            join_all_matches: Force all rules to extract all matching elements
+            urls: A single URL, list of URLs, or dict mapping URLs to extraction rules.
+            extraction_rules: Default extraction rules for string or list URLs.
+            use_playwright: If True, use Playwright for JavaScript rendering.
+            playwright_js_wait: Seconds to wait for JavaScript to load.
+            join_all_matches: If True, extract all matching elements for each rule.

         Returns:
-            dict[url] -> concatenated string of extracted content
+            Dict[str, str]: A dictionary mapping URLs to their extracted content.
+
+        Raises:
+            ValueError: If extraction_rules are missing when required or if urls is invalid.
+            Exception: If fetching or extraction fails.
         """
-        # Handle different input formats
         url_rules_map: Dict[str, Dict[str, Any]] = {}

         if isinstance(urls, str):
@@ -364,12 +497,10 @@
             for url in urls:
                 url_rules_map[url] = extraction_rules
         elif isinstance(urls, dict):
-            # URL-specific rules
             url_rules_map = urls
         else:
             raise ValueError(f"Invalid urls type: {type(urls)}")

-        # Normalize all rules
         normalized_url_rules: Dict[str, List[ExtractionRule]] = {}
         for url, rules in url_rules_map.items():
             normalized_rules = []
@@ -388,7 +519,6 @@
                 logger.warning(f"URL disallowed by robots.txt: {url}")
                 return url, ""

-            # Fetch (rendered or not)
             if use_playwright:
                 html = await self._render_with_playwright(
                     url, js_wait=playwright_js_wait, timeout=self.timeout
@@ -396,7 +526,6 @@
             else:
                 html = await self._fetch_httpx(url)

-            # Extract content using URL-specific rules
             pieces = []
             for rule in normalized_url_rules[url]:
                 text = self._extract_with_bs4(html, rule)
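To close out this file, a sketch of the dict-form `urls` argument described in the docstring above, where each URL carries its own rules; the URLs, selectors, and rendering flag are illustrative:

```python
from cognee.tasks.web_scraper import BeautifulSoupCrawler

per_url_rules = {
    "https://example.com/blog": {"title": "h1", "body": "article p"},
    "https://example.com/docs": {"title": "h1", "toc": "nav li"},
}


async def scrape_docs() -> dict:
    async with BeautifulSoupCrawler() as crawler:
        # use_playwright=True renders JavaScript before extraction and needs the
        # optional playwright dependency noted in _render_with_playwright.
        return await crawler.fetch_with_bs4(per_url_rules, use_playwright=True)
```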
""" - # Handle different input formats url_rules_map: Dict[str, Dict[str, Any]] = {} if isinstance(urls, str): @@ -364,12 +497,10 @@ class BeautifulSoupCrawler: for url in urls: url_rules_map[url] = extraction_rules elif isinstance(urls, dict): - # URL-specific rules url_rules_map = urls else: raise ValueError(f"Invalid urls type: {type(urls)}") - # Normalize all rules normalized_url_rules: Dict[str, List[ExtractionRule]] = {} for url, rules in url_rules_map.items(): normalized_rules = [] @@ -388,7 +519,6 @@ class BeautifulSoupCrawler: logger.warning(f"URL disallowed by robots.txt: {url}") return url, "" - # Fetch (rendered or not) if use_playwright: html = await self._render_with_playwright( url, js_wait=playwright_js_wait, timeout=self.timeout @@ -396,7 +526,6 @@ class BeautifulSoupCrawler: else: html = await self._fetch_httpx(url) - # Extract content using URL-specific rules pieces = [] for rule in normalized_url_rules[url]: text = self._extract_with_bs4(html, rule) diff --git a/cognee/tasks/web_scraper/utils.py b/cognee/tasks/web_scraper/utils.py index c1b6ecb53..a14dbbd20 100644 --- a/cognee/tasks/web_scraper/utils.py +++ b/cognee/tasks/web_scraper/utils.py @@ -1,10 +1,13 @@ -from tavily import AsyncTavilyClient -from .bs4_crawler import BeautifulSoupCrawler -import os -from .config import TavilyConfig, SoupCrawlerConfig -from typing import Dict, Any, List, Union, Optional, Literal +"""Utilities for fetching web content using BeautifulSoup or Tavily. + +This module provides functions to fetch and extract content from web pages, supporting +both BeautifulSoup for custom extraction rules and Tavily for API-based scraping. +""" + +from typing import Dict, List, Union, Optional, Literal from cognee.shared.logging_utils import get_logger -import asyncio +from .bs4_crawler import BeautifulSoupCrawler +from .config import TavilyConfig, SoupCrawlerConfig logger = get_logger(__name__) @@ -16,20 +19,31 @@ async def fetch_page_content( tavily_config: Optional[TavilyConfig] = None, soup_crawler_config: Optional[SoupCrawlerConfig] = None, ) -> Dict[str, Union[str, Dict[str, str]]]: - """ - Fetch page content using Tavily API if TAVILY_API_KEY is set, - otherwise fetch using BeautifulSoupCrawler directly. + """Fetch content from one or more URLs using the specified tool. - Parameters: - urls: single URL or list of URLs - extraction_rules: dict mapping field names -> CSS selector or rule - use_playwright: whether to render JS (BeautifulSoupCrawler) - playwright_js_wait: seconds to wait for JS to load - join_all_matches: join all matching elements per rule - structured: if True, returns structured dict instead of concatenated string (based on extraction_rules field names) + This function retrieves web page content using either BeautifulSoup (with custom + extraction rules) or Tavily (API-based scraping). It handles single URLs or lists of + URLs and returns a dictionary mapping URLs to their extracted content. + + Args: + urls: A single URL (str) or a list of URLs (List[str]) to scrape. + preferred_tool: The scraping tool to use ("tavily" or "beautifulsoup"). + Defaults to "beautifulsoup". + tavily_config: Configuration for Tavily API, including API key. + Required if preferred_tool is "tavily". + soup_crawler_config: Configuration for BeautifulSoup crawler, including + extraction rules. Required if preferred_tool is "beautifulsoup" and + extraction_rules are needed. 
diff --git a/cognee/tasks/web_scraper/web_scraper_task.py b/cognee/tasks/web_scraper/web_scraper_task.py
index 9fb43c2cb..dc286500a 100644
--- a/cognee/tasks/web_scraper/web_scraper_task.py
+++ b/cognee/tasks/web_scraper/web_scraper_task.py
@@ -1,3 +1,10 @@
+"""Web scraping tasks for storing scraped data in a graph database.
+
+This module provides functions to scrape web content, create or update WebPage, WebSite,
+and ScrapingJob data points, and store them in a Kuzu graph database. It supports
+scheduled scraping tasks and ensures that node updates preserve existing graph edges.
+"""
+
 import os
 import hashlib
 from datetime import datetime
@@ -22,7 +29,7 @@ try:
     scheduler = BackgroundScheduler()
 except ImportError:
-    raise ImportError("Please install apscheduler by pip install APScheduler >=3.10")
+    raise ImportError("Please install apscheduler by pip install APScheduler>=3.10")

 logger = get_logger(__name__)

@@ -37,6 +44,28 @@ async def cron_web_scraper_task(
     tavily_config: TavilyConfig = None,
     job_name: str = "scraping",
 ):
+    """Schedule or run a web scraping task.
+
+    This function schedules a recurring web scraping task using APScheduler or runs it
+    immediately if no schedule is provided. It delegates to web_scraper_task for actual
+    scraping and graph storage.
+
+    Args:
+        url: A single URL or list of URLs to scrape.
+        schedule: A cron expression for scheduling (e.g., "0 0 * * *"). If None, runs immediately.
+        extraction_rules: Dictionary of extraction rules for BeautifulSoup (e.g., CSS selectors).
+        tavily_api_key: API key for Tavily. Defaults to TAVILY_API_KEY environment variable.
+        soup_crawler_config: Configuration for BeautifulSoup crawler.
+        tavily_config: Configuration for Tavily API.
+        job_name: Name of the scraping job. Defaults to "scraping".
+
+    Returns:
+        Any: The result of web_scraper_task if run immediately, or None if scheduled.
+
+    Raises:
+        ValueError: If the schedule is an invalid cron expression.
+        ImportError: If APScheduler is not installed.
+    """
     now = datetime.now()
     job_name = job_name or f"scrape_{now.strftime('%Y%m%d_%H%M%S')}"
     if schedule:
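A hedged sketch of the scheduled path documented above; the cron expression, URL, and rules are placeholders:

```python
from cognee.tasks.web_scraper import cron_web_scraper_task


async def schedule_nightly_scrape():
    # With a cron expression the task is registered with APScheduler; with
    # schedule=None it would run once immediately via web_scraper_task.
    await cron_web_scraper_task(
        url="https://example.com/news",
        schedule="0 0 * * *",                       # midnight, every day
        extraction_rules={"headline": "h2.title"},  # CSS selector rules
        job_name="nightly_news_scrape",
    )
```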
@@ -89,10 +118,29 @@ async def web_scraper_task(
     tavily_config: TavilyConfig = None,
     job_name: str = None,
 ):
-    """
-    Scrapes one or more URLs and returns WebPage, WebSite, and ScrapingJob data points.
-    Unique IDs are assigned to each WebPage, WebSite, and ScrapingJob.
-    Includes a description field summarizing other fields for each data point.
+    """Scrape URLs and store data points in a Kuzu graph database.
+
+    This function scrapes content from the provided URLs, creates or updates WebPage,
+    WebSite, and ScrapingJob data points, and stores them in a Kuzu graph database.
+    Each data point includes a description field summarizing its attributes. It creates
+    'is_scraping' (ScrapingJob to WebSite) and 'is_part_of' (WebPage to WebSite)
+    relationships, preserving existing edges during node updates.
+
+    Args:
+        url: A single URL or list of URLs to scrape.
+        schedule: A cron expression for scheduling (e.g., "0 0 * * *"). If None, runs once.
+        extraction_rules: Dictionary of extraction rules for BeautifulSoup (e.g., CSS selectors).
+        tavily_api_key: API key for Tavily. Defaults to TAVILY_API_KEY environment variable.
+        soup_crawler_config: Configuration for BeautifulSoup crawler.
+        tavily_config: Configuration for Tavily API.
+        job_name: Name of the scraping job. Defaults to a timestamp-based name.
+
+    Returns:
+        Any: The graph data returned by the graph database.
+
+    Raises:
+        TypeError: If neither tavily_config nor soup_crawler_config is provided.
+        Exception: If fetching content or database operations fail.
     """
     await setup()
     graph_db = await get_graph_engine()
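For contrast with the BeautifulSoup examples earlier, a sketch of a direct, Tavily-backed call; it assumes TavilyConfig exposes the api_key field read elsewhere in this task, and the URLs and job name are illustrative:

```python
import os

from cognee.tasks.web_scraper import web_scraper_task
from cognee.tasks.web_scraper.config import TavilyConfig


async def scrape_with_tavily():
    # check_arguments() is expected to select the Tavily path when a Tavily
    # config/key is supplied; the resulting WebPage/WebSite/ScrapingJob nodes
    # are stored in the Kuzu graph as described above.
    return await web_scraper_task(
        url=["https://example.com", "https://example.com/about"],
        tavily_config=TavilyConfig(api_key=os.environ["TAVILY_API_KEY"]),
        job_name="tavily_demo_job",
    )
```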
@@ -260,7 +308,7 @@
             base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
             edge_mapping.append(
                 (
-                    webpage.id,  # Corrected: WebPage is the source, WebSite is the target
+                    webpage.id,
                     websites_dict[base_url].id,
                     "is_part_of",
                     {
@@ -280,8 +328,20 @@ def check_arguments(tavily_api_key, extraction_rules, tavily_config, soup_crawle
-    """
-    Checking if the right argument are given, if not TypeError will be raised.
+    """Validate and configure arguments for web_scraper_task.
+
+    Args:
+        tavily_api_key: API key for Tavily.
+        extraction_rules: Extraction rules for BeautifulSoup.
+        tavily_config: Configuration for Tavily API.
+        soup_crawler_config: Configuration for BeautifulSoup crawler.
+
+    Returns:
+        Tuple[SoupCrawlerConfig, TavilyConfig, str]: Configured soup_crawler_config,
+            tavily_config, and preferred_tool ("tavily" or "beautifulsoup").
+
+    Raises:
+        TypeError: If neither tavily_config nor soup_crawler_config is provided.
     """

     preferred_tool = "beautifulsoup"
@@ -302,7 +362,19 @@
     return soup_crawler_config, tavily_config, preferred_tool


-def get_path_after_base(base_url, url):
+def get_path_after_base(base_url: str, url: str) -> str:
+    """Extract the path after the base URL.
+
+    Args:
+        base_url: The base URL (e.g., "https://example.com").
+        url: The full URL to extract the path from.
+
+    Returns:
+        str: The path after the base URL, with leading slashes removed.
+
+    Raises:
+        ValueError: If the base URL and target URL are from different domains.
+    """
     parsed_base = urlparse(base_url)
     parsed_url = urlparse(url)