Added Documentation

Geoff-Robin 2025-10-06 04:00:15 +05:30
parent 0f64f6804d
commit 4d5146c802
4 changed files with 305 additions and 68 deletions

__init__.py

@@ -1,3 +1,10 @@
"""Web scraping module for cognee.
This module provides tools for scraping web content, managing scraping jobs, and storing
data in a graph database. It includes classes and functions for crawling web pages using
BeautifulSoup or Tavily, defining data models, and handling scraping configurations.
"""
from .bs4_crawler import BeautifulSoupCrawler
from .utils import fetch_page_content
from .web_scraper_task import cron_web_scraper_task, web_scraper_task
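For orientation, a minimal usage sketch of the exported helpers (not part of the commit). The package import path and the SoupCrawlerConfig(extraction_rules=...) constructor call are assumptions; utils.py below only shows that the config exposes an extraction_rules attribute.
import asyncio
# Import path is an assumption; these names are re-exported by this __init__.py.
from cognee.tasks.web_scraper import fetch_page_content
from cognee.tasks.web_scraper.config import SoupCrawlerConfig  # assumed path

async def main():
    # SoupCrawlerConfig(extraction_rules=...) is assumed, not shown in this diff.
    config = SoupCrawlerConfig(extraction_rules={"title": "h1", "body": "article p"})
    results = await fetch_page_content(
        "https://example.com",
        preferred_tool="beautifulsoup",
        soup_crawler_config=config,
    )
    print(results)  # {"https://example.com": "<extracted text>"}

asyncio.run(main())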

bs4_crawler.py

@@ -1,10 +1,16 @@
"""BeautifulSoup-based web crawler for extracting content from web pages.
This module provides the BeautifulSoupCrawler class for fetching and extracting content
from web pages using BeautifulSoup or Playwright for JavaScript-rendered pages. It
supports robots.txt handling, rate limiting, and custom extraction rules.
"""
import asyncio
import time
from typing import Union, List, Dict, Any, Optional
from urllib.parse import urlparse
from dataclasses import dataclass, field
from functools import lru_cache
import httpx
from bs4 import BeautifulSoup
from cognee.shared.logging_utils import get_logger
@@ -28,7 +34,15 @@ except ImportError:
@dataclass
class ExtractionRule:
"""Normalized extraction rule"""
"""Normalized extraction rule for web content.
Attributes:
selector: CSS selector for extraction (if any).
xpath: XPath expression for extraction (if any).
attr: HTML attribute to extract (if any).
all: If True, extract all matching elements; otherwise, extract only the first match.
join_with: String to join multiple extracted elements.
"""
selector: Optional[str] = None
xpath: Optional[str] = None
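An illustrative construction of the rule shapes documented above (not in the commit); the attr, all, and join_with keywords follow the attribute list in the docstring.
# Illustrative only: rules can target element text, an attribute, or an XPath result.
title_rule = ExtractionRule(selector="article h1")
links_rule = ExtractionRule(selector="a.more", attr="href", all=True, join_with=", ")
meta_rule = ExtractionRule(xpath="//meta[@name='description']/@content")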
@@ -39,7 +53,13 @@ class ExtractionRule:
@dataclass
class RobotsTxtCache:
"""Cache for robots.txt data"""
"""Cache for robots.txt data.
Attributes:
protego: Parsed robots.txt object (Protego instance).
crawl_delay: Delay between requests (in seconds).
timestamp: Time when the cache entry was created.
"""
protego: Any
crawl_delay: float
@@ -47,6 +67,21 @@ class RobotsTxtCache:
class BeautifulSoupCrawler:
"""Crawler for fetching and extracting web content using BeautifulSoup.
Supports asynchronous HTTP requests, Playwright for JavaScript rendering, robots.txt
compliance, and rate limiting. Extracts content using CSS selectors or XPath rules.
Attributes:
concurrency: Number of concurrent requests allowed.
crawl_delay: Minimum seconds between requests to the same domain.
timeout: Per-request timeout in seconds.
max_retries: Number of retries for failed requests.
retry_delay_factor: Multiplier for exponential backoff on retries.
headers: HTTP headers for requests (e.g., User-Agent).
robots_cache_ttl: Time-to-live for robots.txt cache in seconds.
"""
def __init__(
self,
*,
@@ -56,16 +91,18 @@ class BeautifulSoupCrawler:
max_retries: int = 2,
retry_delay_factor: float = 0.5,
headers: Optional[Dict[str, str]] = None,
robots_cache_ttl: float = 3600.0, # Cache robots.txt for 1 hour
robots_cache_ttl: float = 3600.0,
):
"""
concurrency: number of concurrent requests allowed
crawl_delay: minimum seconds to wait between requests to the SAME domain
timeout: per-request timeout
max_retries: number of retries on network errors
retry_delay_factor: multiplier for exponential retry failure delay
headers: default headers for requests
robots_cache_ttl: TTL for robots.txt cache in seconds
"""Initialize the BeautifulSoupCrawler.
Args:
concurrency: Number of concurrent requests allowed.
crawl_delay: Minimum seconds between requests to the same domain.
timeout: Per-request timeout in seconds.
max_retries: Number of retries for failed requests.
retry_delay_factor: Multiplier for exponential backoff on retries.
headers: HTTP headers for requests (defaults to User-Agent: Cognee-Scraper/1.0).
robots_cache_ttl: Time-to-live for robots.txt cache in seconds.
"""
self.concurrency = concurrency
self._sem = asyncio.Semaphore(concurrency)
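A minimal usage sketch (not part of the commit), based on the async context-manager and fetch_with_bs4 definitions in this diff; URLs and selectors are placeholders.
async def scrape_titles():
    # The crawler opens and closes its own httpx client via __aenter__/__aexit__.
    async with BeautifulSoupCrawler(concurrency=4, crawl_delay=1.0) as crawler:
        return await crawler.fetch_with_bs4(
            ["https://example.com", "https://example.org"],
            extraction_rules={"title": "h1", "paragraphs": {"selector": "p", "all": True}},
        )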
@@ -75,33 +112,42 @@ class BeautifulSoupCrawler:
self.retry_delay_factor = retry_delay_factor
self.headers = headers or {"User-Agent": "Cognee-Scraper/1.0"}
self.robots_cache_ttl = robots_cache_ttl
self._last_request_time_per_domain: Dict[str, float] = {}
self._robots_cache: Dict[str, RobotsTxtCache] = {}
self._client: Optional[httpx.AsyncClient] = None
self._robots_lock = asyncio.Lock()
# ---------- lifecycle helpers ----------
async def _ensure_client(self):
"""Initialize the HTTP client if not already created."""
if self._client is None:
self._client = httpx.AsyncClient(timeout=self.timeout, headers=self.headers)
async def close(self):
"""Close the HTTP client."""
if self._client:
await self._client.aclose()
self._client = None
async def __aenter__(self):
"""Enter the context manager, initializing the HTTP client."""
await self._ensure_client()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Exit the context manager, closing the HTTP client."""
await self.close()
# ---------- rate limiting ----------
@staticmethod
@lru_cache(maxsize=1024)
def _domain_from_url(url: str) -> str:
"""Extract the domain (netloc) from a URL.
Args:
url: The URL to parse.
Returns:
str: The domain (netloc) of the URL.
"""
try:
return urlparse(url).netloc
except Exception:
@@ -110,10 +156,24 @@ class BeautifulSoupCrawler:
@staticmethod
@lru_cache(maxsize=1024)
def _get_domain_root(url: str) -> str:
"""Get the root URL (scheme and netloc) from a URL.
Args:
url: The URL to parse.
Returns:
str: The root URL (e.g., "https://example.com").
"""
parsed = urlparse(url)
return f"{parsed.scheme}://{parsed.netloc}"
async def _respect_rate_limit(self, url: str, crawl_delay: Optional[float] = None):
"""Enforce rate limiting for requests to the same domain.
Args:
url: The URL to check.
crawl_delay: Custom crawl delay in seconds (if any).
"""
domain = self._domain_from_url(url)
last = self._last_request_time_per_domain.get(domain)
delay = crawl_delay if crawl_delay is not None else self.crawl_delay
@@ -128,9 +188,15 @@ class BeautifulSoupCrawler:
await asyncio.sleep(wait_for)
self._last_request_time_per_domain[domain] = time.time()
# ----------- robots.txt handling -----------
async def _get_robots_cache(self, domain_root: str) -> Optional[RobotsTxtCache]:
"""Get cached robots.txt data if valid"""
"""Get cached robots.txt data if valid.
Args:
domain_root: The root URL (e.g., "https://example.com").
Returns:
Optional[RobotsTxtCache]: Cached robots.txt data or None if expired or not found.
"""
if Protego is None:
return None
@@ -140,9 +206,18 @@ class BeautifulSoupCrawler:
return None
async def _fetch_and_cache_robots(self, domain_root: str) -> RobotsTxtCache:
"""Fetch and cache robots.txt data"""
"""Fetch and cache robots.txt data.
Args:
domain_root: The root URL (e.g., "https://example.com").
Returns:
RobotsTxtCache: Cached robots.txt data with crawl delay.
Raises:
Exception: If fetching robots.txt fails.
"""
async with self._robots_lock:
# Check again after acquiring lock
cached = await self._get_robots_cache(domain_root)
if cached:
return cached
@@ -170,6 +245,14 @@ class BeautifulSoupCrawler:
return cache_entry
async def _is_url_allowed(self, url: str) -> bool:
"""Check if a URL is allowed by robots.txt.
Args:
url: The URL to check.
Returns:
bool: True if the URL is allowed, False otherwise.
"""
if Protego is None:
return True
@@ -189,6 +272,14 @@ class BeautifulSoupCrawler:
return True
async def _get_crawl_delay(self, url: str) -> float:
"""Get the crawl delay for a URL from robots.txt.
Args:
url: The URL to check.
Returns:
float: Crawl delay in seconds.
"""
if Protego is None:
return self.crawl_delay
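For reference, a short sketch of the Protego calls these robots.txt helpers build on (illustrative, not the commit's code; the user-agent string mirrors the crawler's default header).
from protego import Protego

robots_txt = "User-agent: *\nDisallow: /private/\nCrawl-delay: 2"
rp = Protego.parse(robots_txt)
rp.can_fetch("https://example.com/private/page", "Cognee-Scraper/1.0")  # False
rp.crawl_delay("Cognee-Scraper/1.0")  # 2.0, used in place of the default crawl_delay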
@@ -201,8 +292,18 @@ class BeautifulSoupCrawler:
except Exception:
return self.crawl_delay
# ---------- low-level fetchers ----------
async def _fetch_httpx(self, url: str) -> str:
"""Fetch a URL using HTTPX with retries.
Args:
url: The URL to fetch.
Returns:
str: The HTML content of the page.
Raises:
Exception: If all retry attempts fail.
"""
await self._ensure_client()
assert self._client is not None, "HTTP client not initialized"
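The retry loop itself falls outside this hunk; below is a minimal sketch of the retry-with-backoff pattern the docstring describes, assuming retry_delay_factor scales an exponential backoff (not the commit's exact code).
# Sketch only: fetch with httpx, retrying on failure with exponential backoff.
for attempt in range(self.max_retries + 1):
    try:
        await self._respect_rate_limit(url, await self._get_crawl_delay(url))
        response = await self._client.get(url)
        response.raise_for_status()
        return response.text
    except Exception as exc:
        if attempt == self.max_retries:
            raise
        backoff = self.retry_delay_factor * (2**attempt)
        logger.warning(f"Fetch attempt {attempt + 1} failed for {url}: {exc}; retrying in {backoff:.1f}s")
        await asyncio.sleep(backoff)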
@@ -230,6 +331,20 @@ class BeautifulSoupCrawler:
async def _render_with_playwright(
self, url: str, js_wait: float = 1.0, timeout: Optional[float] = None
) -> str:
"""Fetch and render a URL using Playwright for JavaScript content.
Args:
url: The URL to fetch.
js_wait: Seconds to wait for JavaScript to load.
timeout: Timeout for the request (in seconds, defaults to instance timeout).
Returns:
str: The rendered HTML content.
Raises:
RuntimeError: If Playwright is not installed.
Exception: If all retry attempts fail.
"""
if async_playwright is None:
raise RuntimeError(
"Playwright is not installed. Install with `pip install playwright` and run `playwright install`."
@@ -263,10 +378,19 @@ class BeautifulSoupCrawler:
)
await asyncio.sleep(backoff)
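For context, the core Playwright calls such a renderer relies on (a standalone sketch, not the commit's code):
from playwright.async_api import async_playwright

async def render(url: str, js_wait: float = 1.0) -> str:
    # Launch a headless browser, load the page, wait for client-side JS, return HTML.
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page()
        await page.goto(url)
        await asyncio.sleep(js_wait)
        html = await page.content()
        await browser.close()
    return html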
# ---------- extraction helpers ----------
@staticmethod
def _normalize_rule(rule: Union[str, Dict[str, Any]]) -> ExtractionRule:
"""Normalize extraction rule to ExtractionRule dataclass"""
"""Normalize an extraction rule to an ExtractionRule dataclass.
Args:
rule: A string (CSS selector) or dict with extraction parameters.
Returns:
ExtractionRule: Normalized extraction rule.
Raises:
ValueError: If the rule is invalid.
"""
if isinstance(rule, str):
return ExtractionRule(selector=rule)
if isinstance(rule, dict):
@@ -280,7 +404,18 @@ class BeautifulSoupCrawler:
raise ValueError(f"Invalid extraction rule: {rule}")
def _extract_with_bs4(self, html: str, rule: ExtractionRule) -> str:
"""Extract content using BeautifulSoup or lxml xpath"""
"""Extract content from HTML using BeautifulSoup or lxml XPath.
Args:
html: The HTML content to extract from.
rule: The extraction rule to apply.
Returns:
str: The extracted content.
Raises:
RuntimeError: If XPath is used but lxml is not installed.
"""
soup = BeautifulSoup(html, "html.parser")
if rule.xpath:
@@ -325,7 +460,6 @@ class BeautifulSoupCrawler:
return (val or "").strip()
return el.get_text(strip=True)
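For reference, the BeautifulSoup and lxml calls this extraction path builds on (illustrative, not the commit's exact code):
html = "<article><h1>Title</h1><p>First</p><p>Second</p></article>"

soup = BeautifulSoup(html, "html.parser")                       # CSS selector route
texts = [el.get_text(strip=True) for el in soup.select("p")]    # ["First", "Second"]

from lxml import etree                                          # XPath route (requires lxml)
tree = etree.HTML(html)
tree.xpath("//h1/text()")                                       # ["Title"]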
# ---------- public methods ----------
async def fetch_with_bs4(
self,
urls: Union[str, List[str], Dict[str, Dict[str, Any]]],
@@ -335,23 +469,22 @@ class BeautifulSoupCrawler:
playwright_js_wait: float = 0.8,
join_all_matches: bool = False,
) -> Dict[str, str]:
"""
Fetch one or more URLs and extract text using BeautifulSoup (or lxml xpath).
"""Fetch and extract content from URLs using BeautifulSoup or Playwright.
Args:
urls: Can be:
- A single URL string
- A list of URLs (uses extraction_rules for all)
- A dict mapping URL -> extraction_rules (URL-specific rules)
extraction_rules: Default rules when urls is a string or list
use_playwright: Whether to use Playwright for JS rendering
playwright_js_wait: Wait time after page load for JS
join_all_matches: Force all rules to extract all matching elements
urls: A single URL, list of URLs, or dict mapping URLs to extraction rules.
extraction_rules: Default extraction rules for string or list URLs.
use_playwright: If True, use Playwright for JavaScript rendering.
playwright_js_wait: Seconds to wait for JavaScript to load.
join_all_matches: If True, extract all matching elements for each rule.
Returns:
dict[url] -> concatenated string of extracted content
Dict[str, str]: A dictionary mapping URLs to their extracted content.
Raises:
ValueError: If extraction_rules are missing when required or if urls is invalid.
Exception: If fetching or extraction fails.
"""
# Handle different input formats
url_rules_map: Dict[str, Dict[str, Any]] = {}
if isinstance(urls, str):
@@ -364,12 +497,10 @@ class BeautifulSoupCrawler:
for url in urls:
url_rules_map[url] = extraction_rules
elif isinstance(urls, dict):
# URL-specific rules
url_rules_map = urls
else:
raise ValueError(f"Invalid urls type: {type(urls)}")
# Normalize all rules
normalized_url_rules: Dict[str, List[ExtractionRule]] = {}
for url, rules in url_rules_map.items():
normalized_rules = []
@@ -388,7 +519,6 @@ class BeautifulSoupCrawler:
logger.warning(f"URL disallowed by robots.txt: {url}")
return url, ""
# Fetch (rendered or not)
if use_playwright:
html = await self._render_with_playwright(
url, js_wait=playwright_js_wait, timeout=self.timeout
@@ -396,7 +526,6 @@ class BeautifulSoupCrawler:
else:
html = await self._fetch_httpx(url)
# Extract content using URL-specific rules
pieces = []
for rule in normalized_url_rules[url]:
text = self._extract_with_bs4(html, rule)
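An illustrative call using the dict form, where each URL carries its own extraction rules (continues the crawler from the earlier sketch; not part of the commit):
results = await crawler.fetch_with_bs4(
    {
        "https://example.com/blog": {"headline": "h1", "tags": {"selector": ".tag", "all": True}},
        "https://example.com/docs": {"title": {"xpath": "//title/text()"}},
    },
)
# results: {"https://example.com/blog": "...", "https://example.com/docs": "..."}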

utils.py

@@ -1,10 +1,13 @@
from tavily import AsyncTavilyClient
from .bs4_crawler import BeautifulSoupCrawler
import os
from .config import TavilyConfig, SoupCrawlerConfig
from typing import Dict, Any, List, Union, Optional, Literal
"""Utilities for fetching web content using BeautifulSoup or Tavily.
This module provides functions to fetch and extract content from web pages, supporting
both BeautifulSoup for custom extraction rules and Tavily for API-based scraping.
"""
from typing import Dict, List, Union, Optional, Literal
from cognee.shared.logging_utils import get_logger
import asyncio
from .bs4_crawler import BeautifulSoupCrawler
from .config import TavilyConfig, SoupCrawlerConfig
logger = get_logger(__name__)
@@ -16,20 +19,31 @@ async def fetch_page_content(
tavily_config: Optional[TavilyConfig] = None,
soup_crawler_config: Optional[SoupCrawlerConfig] = None,
) -> Dict[str, Union[str, Dict[str, str]]]:
"""
Fetch page content using Tavily API if TAVILY_API_KEY is set,
otherwise fetch using BeautifulSoupCrawler directly.
"""Fetch content from one or more URLs using the specified tool.
Parameters:
urls: single URL or list of URLs
extraction_rules: dict mapping field names -> CSS selector or rule
use_playwright: whether to render JS (BeautifulSoupCrawler)
playwright_js_wait: seconds to wait for JS to load
join_all_matches: join all matching elements per rule
structured: if True, returns structured dict instead of concatenated string (based on extraction_rules field names)
This function retrieves web page content using either BeautifulSoup (with custom
extraction rules) or Tavily (API-based scraping). It handles single URLs or lists of
URLs and returns a dictionary mapping URLs to their extracted content.
Args:
urls: A single URL (str) or a list of URLs (List[str]) to scrape.
preferred_tool: The scraping tool to use ("tavily" or "beautifulsoup").
Defaults to "beautifulsoup".
tavily_config: Configuration for Tavily API, including API key.
Required if preferred_tool is "tavily".
soup_crawler_config: Configuration for BeautifulSoup crawler, including
extraction rules. Required if preferred_tool is "beautifulsoup" and
extraction_rules are needed.
Returns:
Dict mapping URL -> extracted string or structured dict
Dict[str, Union[str, Dict[str, str]]]: A dictionary mapping each URL to its
extracted content (as a string for BeautifulSoup or a dict for Tavily).
Raises:
ValueError: If Tavily API key is missing when using Tavily, or if
extraction_rules are not provided when using BeautifulSoup.
ImportError: If required dependencies (beautifulsoup4 or tavily-python) are not
installed.
"""
if preferred_tool == "tavily":
if tavily_config.api_key is None:
@@ -43,6 +57,7 @@ async def fetch_page_content(
logger.error(
"Failed to import bs4, make sure to install using pip install beautifulsoup4>=4.13.1"
)
raise
crawler = BeautifulSoupCrawler()
extraction_rules = soup_crawler_config.extraction_rules
if extraction_rules is None:
@@ -58,20 +73,34 @@ async def fetch_page_content(
return results
except Exception as e:
logger.error(f"Error fetching page content: {str(e)}")
raise
async def fetch_with_tavily(urls: Union[str, List[str]]) -> Dict[str, str]:
"""Fetch content from URLs using the Tavily API.
Args:
urls: A single URL (str) or a list of URLs (List[str]) to scrape.
Returns:
Dict[str, str]: A dictionary mapping each URL to its raw content as a string.
Raises:
ImportError: If tavily-python is not installed.
Exception: If the Tavily API request fails.
"""
try:
from tavily import AsyncTavilyClient
except ImportError:
logger.error(
"Failed to import tavily, make sure to install using pip install tavily-python>=0.7.0"
)
raise
client = AsyncTavilyClient()
results = await client.extract(urls)
for failed_result in results.get("failed_results", []):
logger.warning(f"Failed to fetch {failed_result}")
return_results = {}
for results in results.get("results", []):
return_results[results["url"]] = results["raw_content"]
for result in results.get("results", []):
return_results[result["url"]] = result["raw_content"]
return return_results
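A usage sketch of the Tavily path (not in the commit); TavilyConfig(api_key=...) is an assumption, since only the api_key attribute check appears above.
import os

# Inside an async function:
config = TavilyConfig(api_key=os.getenv("TAVILY_API_KEY"))  # constructor kwarg assumed
results = await fetch_page_content(
    ["https://example.com", "https://example.org"],
    preferred_tool="tavily",
    tavily_config=config,
)
# Each URL maps to the raw_content returned by AsyncTavilyClient.extract().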

web_scraper_task.py

@@ -1,3 +1,10 @@
"""Web scraping tasks for storing scraped data in a graph database.
This module provides functions to scrape web content, create or update WebPage, WebSite,
and ScrapingJob data points, and store them in a Kuzu graph database. It supports
scheduled scraping tasks and ensures that node updates preserve existing graph edges.
"""
import os
import hashlib
from datetime import datetime
@@ -22,7 +29,7 @@ try:
scheduler = BackgroundScheduler()
except ImportError:
raise ImportError("Please install apscheduler by pip install APScheduler >=3.10")
raise ImportError("Please install apscheduler by pip install APScheduler>=3.10")
logger = get_logger(__name__)
@@ -37,6 +44,28 @@ async def cron_web_scraper_task(
tavily_config: TavilyConfig = None,
job_name: str = "scraping",
):
"""Schedule or run a web scraping task.
This function schedules a recurring web scraping task using APScheduler or runs it
immediately if no schedule is provided. It delegates to web_scraper_task for actual
scraping and graph storage.
Args:
url: A single URL or list of URLs to scrape.
schedule: A cron expression for scheduling (e.g., "0 0 * * *"). If None, runs immediately.
extraction_rules: Dictionary of extraction rules for BeautifulSoup (e.g., CSS selectors).
tavily_api_key: API key for Tavily. Defaults to TAVILY_API_KEY environment variable.
soup_crawler_config: Configuration for BeautifulSoup crawler.
tavily_config: Configuration for Tavily API.
job_name: Name of the scraping job. Defaults to "scraping".
Returns:
Any: The result of web_scraper_task if run immediately, or None if scheduled.
Raises:
ValueError: If the schedule is an invalid cron expression.
ImportError: If APScheduler is not installed.
"""
now = datetime.now()
job_name = job_name or f"scrape_{now.strftime('%Y%m%d_%H%M%S')}"
if schedule:
@@ -89,10 +118,29 @@ async def web_scraper_task(
tavily_config: TavilyConfig = None,
job_name: str = None,
):
"""
Scrapes one or more URLs and returns WebPage, WebSite, and ScrapingJob data points.
Unique IDs are assigned to each WebPage, WebSite, and ScrapingJob.
Includes a description field summarizing other fields for each data point.
"""Scrape URLs and store data points in a Kuzu graph database.
This function scrapes content from the provided URLs, creates or updates WebPage,
WebSite, and ScrapingJob data points, and stores them in a Kuzu graph database.
Each data point includes a description field summarizing its attributes. It creates
'is_scraping' (ScrapingJob to WebSite) and 'is_part_of' (WebPage to WebSite)
relationships, preserving existing edges during node updates.
Args:
url: A single URL or list of URLs to scrape.
schedule: A cron expression for scheduling (e.g., "0 0 * * *"). If None, runs once.
extraction_rules: Dictionary of extraction rules for BeautifulSoup (e.g., CSS selectors).
tavily_api_key: API key for Tavily. Defaults to TAVILY_API_KEY environment variable.
soup_crawler_config: Configuration for BeautifulSoup crawler.
tavily_config: Configuration for Tavily API.
job_name: Name of the scraping job. Defaults to a timestamp-based name.
Returns:
Any: The graph data returned by the graph database.
Raises:
TypeError: If neither tavily_config nor soup_crawler_config is provided.
Exception: If fetching content or database operations fail.
"""
await setup()
graph_db = await get_graph_engine()
@@ -260,7 +308,7 @@ async def web_scraper_task(
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
edge_mapping.append(
(
webpage.id, # Corrected: WebPage is the source, WebSite is the target
webpage.id,
websites_dict[base_url].id,
"is_part_of",
{
@@ -280,8 +328,20 @@ async def web_scraper_task(
def check_arguments(tavily_api_key, extraction_rules, tavily_config, soup_crawler_config):
"""
Checking if the right argument are given, if not TypeError will be raised.
"""Validate and configure arguments for web_scraper_task.
Args:
tavily_api_key: API key for Tavily.
extraction_rules: Extraction rules for BeautifulSoup.
tavily_config: Configuration for Tavily API.
soup_crawler_config: Configuration for BeautifulSoup crawler.
Returns:
Tuple[SoupCrawlerConfig, TavilyConfig, str]: Configured soup_crawler_config,
tavily_config, and preferred_tool ("tavily" or "beautifulsoup").
Raises:
TypeError: If neither tavily_config nor soup_crawler_config is provided.
"""
preferred_tool = "beautifulsoup"
@@ -302,7 +362,19 @@ def check_arguments(tavily_api_key, extraction_rules, tavily_config, soup_crawle
return soup_crawler_config, tavily_config, preferred_tool
def get_path_after_base(base_url, url):
def get_path_after_base(base_url: str, url: str) -> str:
"""Extract the path after the base URL.
Args:
base_url: The base URL (e.g., "https://example.com").
url: The full URL to extract the path from.
Returns:
str: The path after the base URL, with leading slashes removed.
Raises:
ValueError: If the base URL and target URL are from different domains.
"""
parsed_base = urlparse(base_url)
parsed_url = urlparse(url)
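A worked example of the expected behavior, inferred from the docstring above (illustrative):
get_path_after_base("https://example.com", "https://example.com/docs/page")   # "docs/page"
get_path_after_base("https://example.com", "https://other.com/docs/page")     # raises ValueError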