Separate BeautifulSoup crawling from fetching

This commit is contained in:
Daulet Amirkhanov 2025-10-21 15:49:12 +01:00
parent a7ff188018
commit 9d9969676f
11 changed files with 489 additions and 573 deletions

View file

@ -7,18 +7,12 @@ from cognee.base_config import get_base_config
from cognee.infrastructure.databases.utils import get_or_create_dataset_database
from cognee.infrastructure.files.storage.config import file_storage_config
from cognee.modules.users.methods import get_user
from cognee.tasks.web_scraper.config import SoupCrawlerConfig, TavilyConfig
# Note: ContextVar allows us to use different graph db configurations in Cognee
# for different async tasks, threads and processes
vector_db_config = ContextVar("vector_db_config", default=None)
graph_db_config = ContextVar("graph_db_config", default=None)
session_user = ContextVar("session_user", default=None)
soup_crawler_config: ContextVar[SoupCrawlerConfig | None] = ContextVar(
"soup_crawler_config", default=None
)
tavily_config: ContextVar[TavilyConfig | None] = ContextVar("tavily_config", default=None)
async def set_session_user_context_variable(user):
session_user.set(user)
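The note above is why these globals are ContextVars rather than plain module attributes; a minimal illustrative sketch (names hypothetical, not part of this commit) of how each async task keeps its own value:

import asyncio
from contextvars import ContextVar

# Hypothetical config slot, mirroring the ContextVars above
active_config: ContextVar[str | None] = ContextVar("active_config", default=None)

async def run_pipeline(config_name: str) -> str:
    active_config.set(config_name)   # visible only to this task
    await asyncio.sleep(0)           # yield so tasks interleave
    return active_config.get()       # still this task's own value

async def main():
    print(await asyncio.gather(run_pipeline("graph_db_A"), run_pipeline("graph_db_B")))
    # -> ['graph_db_A', 'graph_db_B']  (no cross-task leakage)

asyncio.run(main())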

View file

@ -1,8 +0,0 @@
__all__ = []
try:
from .web_url_fetcher import WebUrlFetcher
__all__.append("WebUrlFetcher")
except ImportError:
pass

View file

@ -1,15 +0,0 @@
from abc import ABC, abstractmethod
from typing import Any
class DataFetcherInterface(ABC):
@abstractmethod
def fetcher_name(self) -> str:
pass
@abstractmethod
async def fetch(self, data_item_path: str) -> str:
"""
args: data_item_path - path to the data item
"""
pass

View file

@ -1,55 +0,0 @@
import os
from cognee.modules.ingestion import save_data_to_file
from cognee.tasks.ingestion.data_fetchers.data_fetcher_interface import DataFetcherInterface
from cognee.tasks.web_scraper.config import TavilyConfig, SoupCrawlerConfig
from cognee.shared.logging_utils import get_logger
logger = get_logger()
class WebUrlFetcher(DataFetcherInterface):
def __init__(self): ...
def fetcher_name(self):
return "web_url_fetcher"
async def fetch(self, data_item_path: str):
from cognee.context_global_variables import tavily_config, soup_crawler_config
from cognee.tasks.web_scraper import fetch_page_content
if os.getenv("TAVILY_API_KEY"):
_tavily_config = TavilyConfig()
_soup_config = None
preferred_tool = "tavily"
else:
_tavily_config = None
_soup_config = SoupCrawlerConfig()
preferred_tool = "beautifulsoup"
tavily_config.set(_tavily_config)
soup_crawler_config.set(_soup_config)
logger.info(f"Starting web URL crawling for: {data_item_path}")
logger.info(f"Using scraping tool: {preferred_tool}")
data = await fetch_page_content(
data_item_path,
preferred_tool=preferred_tool,
)
logger.info(f"Successfully fetched content from URL {data_item_path}")
# fetch_page_content returns a dict like {url: content}
# Extract the content string before saving
if isinstance(data, dict):
# Concatenate all URL contents (usually just one URL)
content = ""
for url, text in data.items():
content += f"{url}:\n{text}\n\n"
logger.info(
f"Extracted content from {len(data)} URL(s), total size: {len(content)} characters"
)
else:
content = data
return await save_data_to_file(content)
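With WebUrlFetcher deleted, URL ingestion presumably calls fetch_page_content directly; a rough sketch of the simplified call path under that assumption (ingest_url is a hypothetical helper, not part of this commit):

from cognee.modules.ingestion import save_data_to_file
from cognee.tasks.web_scraper import fetch_page_content

async def ingest_url(url: str) -> str:
    # fetch_page_content now decides between Tavily and the default crawler on its own
    pages = await fetch_page_content(url)  # {url: html}
    content = "".join(f"{u}:\n{html}\n\n" for u, html in pages.items())
    return await save_data_to_file(content)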

View file

@ -8,6 +8,7 @@ BeautifulSoup or Tavily, defining data models, and handling scraping configurati
from .bs4_crawler import BeautifulSoupCrawler
from .utils import fetch_page_content
from .web_scraper_task import cron_web_scraper_task, web_scraper_task
from .default_url_crawler import DefaultUrlCrawler
__all__ = [
@ -15,4 +16,5 @@ __all__ = [
"fetch_page_content",
"cron_web_scraper_task",
"web_scraper_task",
"DefaultUrlCrawler",
]

View file

@ -5,32 +5,13 @@ from web pages using BeautifulSoup or Playwright for JavaScript-rendered pages.
supports robots.txt handling, rate limiting, and custom extraction rules.
"""
import asyncio
import time
from typing import Union, List, Dict, Any, Optional
from urllib.parse import urlparse
from dataclasses import dataclass, field
from functools import lru_cache
import httpx
from dataclasses import dataclass
from bs4 import BeautifulSoup
from cognee.shared.logging_utils import get_logger
logger = get_logger(__name__)
try:
from playwright.async_api import async_playwright
except ImportError:
logger.warning(
"Failed to import playwright, make sure to install using pip install playwright>=1.9.0"
)
async_playwright = None
try:
from protego import Protego
except ImportError:
logger.warning("Failed to import protego, make sure to install using pip install protego>=0.1")
Protego = None
@dataclass
class ExtractionRule:
@ -51,21 +32,6 @@ class ExtractionRule:
join_with: str = " "
@dataclass
class RobotsTxtCache:
"""Cache for robots.txt data.
Attributes:
protego: Parsed robots.txt object (Protego instance).
crawl_delay: Delay between requests (in seconds).
timestamp: Time when the cache entry was created.
"""
protego: Any
crawl_delay: float
timestamp: float = field(default_factory=time.time)
# TODO(daulet) refactor: This is no longer BeautifulSoup, rather just a crawler
class BeautifulSoupCrawler:
"""Crawler for fetching and extracting web content using BeautifulSoup.
@ -84,333 +50,6 @@ class BeautifulSoupCrawler:
robots_cache_ttl: Time-to-live for robots.txt cache in seconds.
"""
def __init__(
self,
*,
concurrency: int = 5,
crawl_delay: float = 0.5,
max_crawl_delay: Optional[float] = 10.0,
timeout: float = 15.0,
max_retries: int = 2,
retry_delay_factor: float = 0.5,
headers: Optional[Dict[str, str]] = None,
robots_cache_ttl: float = 3600.0,
):
"""Initialize the BeautifulSoupCrawler.
Args:
concurrency: Number of concurrent requests allowed.
crawl_delay: Minimum seconds between requests to the same domain.
max_crawl_delay: Maximum crawl delay to respect from robots.txt (None = no limit).
timeout: Per-request timeout in seconds.
max_retries: Number of retries for failed requests.
retry_delay_factor: Multiplier for exponential backoff on retries.
headers: HTTP headers for requests (defaults to User-Agent: Cognee-Scraper/1.0).
robots_cache_ttl: Time-to-live for robots.txt cache in seconds.
"""
self.concurrency = concurrency
self._sem = asyncio.Semaphore(concurrency)
self.crawl_delay = crawl_delay
self.max_crawl_delay = max_crawl_delay
self.timeout = timeout
self.max_retries = max_retries
self.retry_delay_factor = retry_delay_factor
self.headers = headers or {"User-Agent": "Cognee-Scraper/1.0"}
self.robots_cache_ttl = robots_cache_ttl
self._last_request_time_per_domain: Dict[str, float] = {}
self._robots_cache: Dict[str, RobotsTxtCache] = {}
self._client: Optional[httpx.AsyncClient] = None
self._robots_lock = asyncio.Lock()
async def _ensure_client(self):
"""Initialize the HTTP client if not already created."""
if self._client is None:
self._client = httpx.AsyncClient(timeout=self.timeout, headers=self.headers)
async def close(self):
"""Close the HTTP client."""
if self._client:
await self._client.aclose()
self._client = None
async def __aenter__(self):
"""Enter the context manager, initializing the HTTP client."""
await self._ensure_client()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Exit the context manager, closing the HTTP client."""
await self.close()
@lru_cache(maxsize=1024)
def _domain_from_url(self, url: str) -> str:
"""Extract the domain (netloc) from a URL.
Args:
url: The URL to parse.
Returns:
str: The domain (netloc) of the URL.
"""
try:
return urlparse(url).netloc
except Exception:
return url
@lru_cache(maxsize=1024)
def _get_domain_root(self, url: str) -> str:
"""Get the root URL (scheme and netloc) from a URL.
Args:
url: The URL to parse.
Returns:
str: The root URL (e.g., "https://example.com").
"""
parsed = urlparse(url)
return f"{parsed.scheme}://{parsed.netloc}"
async def _respect_rate_limit(self, url: str, crawl_delay: Optional[float] = None):
"""Enforce rate limiting for requests to the same domain.
Args:
url: The URL to check.
crawl_delay: Custom crawl delay in seconds (if any).
"""
domain = self._domain_from_url(url)
last = self._last_request_time_per_domain.get(domain)
delay = crawl_delay if crawl_delay is not None else self.crawl_delay
if last is None:
self._last_request_time_per_domain[domain] = time.time()
return
elapsed = time.time() - last
wait_for = delay - elapsed
if wait_for > 0:
logger.info(
f"Rate limiting: waiting {wait_for:.2f}s before requesting {url} (crawl_delay={delay}s from robots.txt)"
)
await asyncio.sleep(wait_for)
logger.info(f"Rate limit wait completed for {url}")
self._last_request_time_per_domain[domain] = time.time()
async def _get_robots_cache(self, domain_root: str) -> Optional[RobotsTxtCache]:
"""Get cached robots.txt data if valid.
Args:
domain_root: The root URL (e.g., "https://example.com").
Returns:
Optional[RobotsTxtCache]: Cached robots.txt data or None if expired or not found.
"""
if Protego is None:
return None
cached = self._robots_cache.get(domain_root)
if cached and (time.time() - cached.timestamp) < self.robots_cache_ttl:
return cached
return None
async def _fetch_and_cache_robots(self, domain_root: str) -> RobotsTxtCache:
"""Fetch and cache robots.txt data.
Args:
domain_root: The root URL (e.g., "https://example.com").
Returns:
RobotsTxtCache: Cached robots.txt data with crawl delay.
Raises:
Exception: If fetching robots.txt fails.
"""
async with self._robots_lock:
cached = await self._get_robots_cache(domain_root)
if cached:
return cached
robots_url = f"{domain_root}/robots.txt"
try:
await self._ensure_client()
await self._respect_rate_limit(robots_url, self.crawl_delay)
resp = await self._client.get(robots_url, timeout=5.0)
content = resp.text if resp.status_code == 200 else ""
except Exception as e:
logger.debug(f"Failed to fetch robots.txt from {domain_root}: {e}")
content = ""
protego = Protego.parse(content) if content.strip() else None
agent = next((v for k, v in self.headers.items() if k.lower() == "user-agent"), "*")
crawl_delay = self.crawl_delay
if protego:
delay = protego.crawl_delay(agent) or protego.crawl_delay("*")
if delay:
# Apply max_crawl_delay cap if configured
if self.max_crawl_delay is not None and delay > self.max_crawl_delay:
logger.warning(
f"robots.txt specifies crawl_delay={delay}s for {domain_root}, "
f"capping to max_crawl_delay={self.max_crawl_delay}s"
)
crawl_delay = self.max_crawl_delay
else:
crawl_delay = delay
cache_entry = RobotsTxtCache(protego=protego, crawl_delay=crawl_delay)
self._robots_cache[domain_root] = cache_entry
return cache_entry
async def _is_url_allowed(self, url: str) -> bool:
"""Check if a URL is allowed by robots.txt.
Args:
url: The URL to check.
Returns:
bool: True if the URL is allowed, False otherwise.
"""
if Protego is None:
return True
try:
domain_root = self._get_domain_root(url)
cache = await self._get_robots_cache(domain_root)
if cache is None:
cache = await self._fetch_and_cache_robots(domain_root)
if cache.protego is None:
return True
agent = next((v for k, v in self.headers.items() if k.lower() == "user-agent"), "*")
return cache.protego.can_fetch(agent, url) or cache.protego.can_fetch("*", url)
except Exception as e:
logger.debug(f"Error checking robots.txt for {url}: {e}")
return True
async def _get_crawl_delay(self, url: str) -> float:
"""Get the crawl delay for a URL from robots.txt.
Args:
url: The URL to check.
Returns:
float: Crawl delay in seconds.
"""
if Protego is None:
return self.crawl_delay
try:
domain_root = self._get_domain_root(url)
cache = await self._get_robots_cache(domain_root)
if cache is None:
cache = await self._fetch_and_cache_robots(domain_root)
return cache.crawl_delay
except Exception:
return self.crawl_delay
async def _fetch_httpx(self, url: str) -> str:
"""Fetch a URL using HTTPX with retries.
Args:
url: The URL to fetch.
Returns:
str: The HTML content of the page.
Raises:
Exception: If all retry attempts fail.
"""
await self._ensure_client()
assert self._client is not None, "HTTP client not initialized"
attempt = 0
crawl_delay = await self._get_crawl_delay(url)
logger.info(f"Fetching URL with httpx (crawl_delay={crawl_delay}s): {url}")
while True:
try:
await self._respect_rate_limit(url, crawl_delay)
resp = await self._client.get(url)
resp.raise_for_status()
logger.info(
f"Successfully fetched {url} (status={resp.status_code}, size={len(resp.text)} bytes)"
)
return resp.text
except Exception as exc:
attempt += 1
if attempt > self.max_retries:
logger.error(f"Fetch failed for {url} after {attempt} attempts: {exc}")
raise
delay = self.retry_delay_factor * (2 ** (attempt - 1))
logger.warning(
f"Retrying {url} after {delay:.2f}s (attempt {attempt}) due to {exc}"
)
await asyncio.sleep(delay)
async def _render_with_playwright(
self, url: str, js_wait: float = 1.0, timeout: Optional[float] = None
) -> str:
"""Fetch and render a URL using Playwright for JavaScript content.
Args:
url: The URL to fetch.
js_wait: Seconds to wait for JavaScript to load.
timeout: Timeout for the request (in seconds, defaults to instance timeout).
Returns:
str: The rendered HTML content.
Raises:
RuntimeError: If Playwright is not installed.
Exception: If all retry attempts fail.
"""
if async_playwright is None:
raise RuntimeError(
"Playwright is not installed. Install with `pip install playwright` and run `playwright install`."
)
timeout_val = timeout or self.timeout
logger.info(
f"Rendering URL with Playwright (js_wait={js_wait}s, timeout={timeout_val}s): {url}"
)
attempt = 0
while True:
try:
async with async_playwright() as p:
logger.info(f"Launching headless Chromium browser for {url}")
browser = await p.chromium.launch(headless=True)
try:
context = await browser.new_context()
page = await context.new_page()
logger.info(f"Navigating to {url} and waiting for network idle")
await page.goto(
url,
wait_until="networkidle",
timeout=int(timeout_val * 1000),
)
if js_wait:
logger.info(f"Waiting {js_wait}s for JavaScript to execute")
await asyncio.sleep(js_wait)
content = await page.content()
logger.info(
f"Successfully rendered {url} with Playwright (size={len(content)} bytes)"
)
return content
finally:
await browser.close()
except Exception as exc:
attempt += 1
if attempt > self.max_retries:
logger.error(f"Playwright fetch failed for {url}: {exc}")
raise
backoff = self.retry_delay_factor * (2 ** (attempt - 1))
logger.warning(
f"Retrying playwright fetch {url} after {backoff:.2f}s (attempt {attempt})"
)
await asyncio.sleep(backoff)
def _normalize_rule(self, rule: Union[str, Dict[str, Any]]) -> ExtractionRule:
"""Normalize an extraction rule to an ExtractionRule dataclass.
@ -435,7 +74,7 @@ class BeautifulSoupCrawler:
)
raise ValueError(f"Invalid extraction rule: {rule}")
def _extract_with_bs4(self, html: str, rule: ExtractionRule) -> str:
def extract(self, html: str, rule: ExtractionRule) -> str:
"""Extract content from HTML using BeautifulSoup or lxml XPath.
Args:
@ -491,79 +130,3 @@ class BeautifulSoupCrawler:
val = el.get(rule.attr)
return (val or "").strip()
return el.get_text(strip=True)
async def fetch_urls(
self,
urls: Union[str, List[str]],
*,
use_playwright: bool = False,
playwright_js_wait: float = 0.8,
) -> Dict[str, str]:
"""Fetch and extract content from URLs using BeautifulSoup or Playwright.
Args:
urls: A single URL, list of URLs, or dict mapping URLs to extraction rules.
extraction_rules: Default extraction rules for string or list URLs.
use_playwright: If True, use Playwright for JavaScript rendering.
playwright_js_wait: Seconds to wait for JavaScript to load.
join_all_matches: If True, extract all matching elements for each rule.
Returns:
Dict[str, str]: A dictionary mapping URLs to their extracted content.
Raises:
ValueError: If extraction_rules are missing when required or if urls is invalid.
Exception: If fetching or extraction fails.
"""
if isinstance(urls, str):
urls = [urls]
else:
raise ValueError(f"Invalid urls type: {type(urls)}")
async def _task(url: str):
async with self._sem:
try:
logger.info(f"Processing URL: {url}")
# Check robots.txt
allowed = await self._is_url_allowed(url)
if not allowed:
logger.warning(f"URL disallowed by robots.txt: {url}")
return url, ""
logger.info(f"Robots.txt check passed for {url}")
# Fetch HTML
if use_playwright:
logger.info(
f"Rendering {url} with Playwright (JS wait: {playwright_js_wait}s)"
)
html = await self._render_with_playwright(
url, js_wait=playwright_js_wait, timeout=self.timeout
)
else:
logger.info(f"Fetching {url} with httpx")
html = await self._fetch_httpx(url)
logger.info(f"Successfully fetched HTML from {url} ({len(html)} bytes)")
return url, html
except Exception as e:
logger.error(f"Error processing {url}: {e}")
return url, ""
logger.info(f"Creating {len(urls)} async tasks for concurrent fetching")
tasks = [asyncio.create_task(_task(u)) for u in urls]
results = {}
completed = 0
total = len(tasks)
for coro in asyncio.as_completed(tasks):
url, html = await coro
results[url] = html
completed += 1
logger.info(f"Progress: {completed}/{total} URLs processed")
logger.info(f"Completed fetching all {len(results)} URL(s)")
return results
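With fetch_urls removed from this class, BeautifulSoupCrawler is extraction-only; a minimal sketch of calling the renamed extract() on already-fetched HTML (assumes the class can still be constructed without arguments, and uses the dict rule form accepted by _normalize_rule):

from cognee.tasks.web_scraper import BeautifulSoupCrawler

crawler = BeautifulSoupCrawler()  # assumption: no-argument constructor still works after the refactor
html = "<html><body><h1 class='title'>Hello</h1></body></html>"
rule = crawler._normalize_rule({"selector": "h1.title"})  # dict rule form, as in the tests at the end of this diff
print(crawler.extract(html, rule))  # expected: "Hello"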

View file

@ -10,7 +10,7 @@ class TavilyConfig(BaseModel):
timeout: Optional[int] = Field(default=10, ge=1, le=60)
class SoupCrawlerConfig(BaseModel):
class DefaultCrawlerConfig(BaseModel):
concurrency: int = 5
crawl_delay: float = 0.5
max_crawl_delay: Optional[float] = (

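For reference, a hedged sketch of constructing the renamed config; the field values mirror the bs4 test at the bottom of this diff, and TavilyConfig defaults are assumed:

crawler_config = DefaultCrawlerConfig(
    concurrency=5,     # parallel requests
    crawl_delay=0.5,   # minimum seconds between hits to the same domain
    timeout=15.0,      # per-request timeout in seconds
)
tavily = TavilyConfig()  # timeout defaults to 10; api_key presumably comes from TAVILY_API_KEY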
View file

@ -0,0 +1,446 @@
import asyncio
from dataclasses import dataclass, field
from functools import lru_cache
import time
from typing import Any, Union, List, Dict, Optional
from urllib.parse import urlparse
import httpx
from cognee.shared.logging_utils import get_logger
# Defined locally instead of importing from .utils to avoid a circular import
# (utils imports DefaultUrlCrawler from this module).
UrlsToHtmls = dict[str, str]
logger = get_logger()
try:
from protego import Protego
except ImportError:
logger.warning("Failed to import protego, make sure to install using pip install protego>=0.1")
Protego = None
try:
from playwright.async_api import async_playwright
except ImportError:
logger.warning(
"Failed to import playwright, make sure to install using pip install playwright>=1.9.0"
)
async_playwright = None
@dataclass
class RobotsTxtCache:
"""Cache for robots.txt data.
Attributes:
protego: Parsed robots.txt object (Protego instance).
crawl_delay: Delay between requests (in seconds).
timestamp: Time when the cache entry was created.
"""
protego: Any
crawl_delay: float
timestamp: float = field(default_factory=time.time)
class DefaultUrlCrawler:
"""Fetch raw HTML over httpx or Playwright, honoring robots.txt rules and per-domain rate limits; extraction is left to BeautifulSoupCrawler."""
def __init__(
self,
*,
concurrency: int = 5,
crawl_delay: float = 0.5,
max_crawl_delay: Optional[float] = 10.0,
timeout: float = 15.0,
max_retries: int = 2,
retry_delay_factor: float = 0.5,
headers: Optional[Dict[str, str]] = None,
robots_cache_ttl: float = 3600.0,
):
"""Initialize the BeautifulSoupCrawler.
Args:
concurrency: Number of concurrent requests allowed.
crawl_delay: Minimum seconds between requests to the same domain.
max_crawl_delay: Maximum crawl delay to respect from robots.txt (None = no limit).
timeout: Per-request timeout in seconds.
max_retries: Number of retries for failed requests.
retry_delay_factor: Multiplier for exponential backoff on retries.
headers: HTTP headers for requests (defaults to User-Agent: Cognee-Scraper/1.0).
robots_cache_ttl: Time-to-live for robots.txt cache in seconds.
"""
self.concurrency = concurrency
self._sem = asyncio.Semaphore(concurrency)
self.crawl_delay = crawl_delay
self.max_crawl_delay = max_crawl_delay
self.timeout = timeout
self.max_retries = max_retries
self.retry_delay_factor = retry_delay_factor
self.headers = headers or {"User-Agent": "Cognee-Scraper/1.0"}
self.robots_cache_ttl = robots_cache_ttl
self._last_request_time_per_domain: Dict[str, float] = {}
self._robots_cache: Dict[str, RobotsTxtCache] = {}
self._client: Optional[httpx.AsyncClient] = None
self._robots_lock = asyncio.Lock()
async def _ensure_client(self):
"""Initialize the HTTP client if not already created."""
if self._client is None:
self._client = httpx.AsyncClient(timeout=self.timeout, headers=self.headers)
async def close(self):
"""Close the HTTP client."""
if self._client:
await self._client.aclose()
self._client = None
async def __aenter__(self):
"""Enter the context manager, initializing the HTTP client."""
await self._ensure_client()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Exit the context manager, closing the HTTP client."""
await self.close()
@lru_cache(maxsize=1024)
def _domain_from_url(self, url: str) -> str:
"""Extract the domain (netloc) from a URL.
Args:
url: The URL to parse.
Returns:
str: The domain (netloc) of the URL.
"""
try:
return urlparse(url).netloc
except Exception:
return url
@lru_cache(maxsize=1024)
def _get_domain_root(self, url: str) -> str:
"""Get the root URL (scheme and netloc) from a URL.
Args:
url: The URL to parse.
Returns:
str: The root URL (e.g., "https://example.com").
"""
parsed = urlparse(url)
return f"{parsed.scheme}://{parsed.netloc}"
async def _respect_rate_limit(self, url: str, crawl_delay: Optional[float] = None):
"""Enforce rate limiting for requests to the same domain.
Args:
url: The URL to check.
crawl_delay: Custom crawl delay in seconds (if any).
"""
domain = self._domain_from_url(url)
last = self._last_request_time_per_domain.get(domain)
delay = crawl_delay if crawl_delay is not None else self.crawl_delay
if last is None:
self._last_request_time_per_domain[domain] = time.time()
return
elapsed = time.time() - last
wait_for = delay - elapsed
if wait_for > 0:
logger.info(
f"Rate limiting: waiting {wait_for:.2f}s before requesting {url} (crawl_delay={delay}s from robots.txt)"
)
await asyncio.sleep(wait_for)
logger.info(f"Rate limit wait completed for {url}")
self._last_request_time_per_domain[domain] = time.time()
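# Worked example (illustrative): with crawl_delay=0.5, two back-to-back requests
# to the same domain make the second call see elapsed of roughly 0, so it sleeps
# about 0.5s before proceeding; a request to a different domain is not delayed,
# because timing is tracked per netloc in _last_request_time_per_domain.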
async def _get_robots_cache(self, domain_root: str) -> Optional[RobotsTxtCache]:
"""Get cached robots.txt data if valid.
Args:
domain_root: The root URL (e.g., "https://example.com").
Returns:
Optional[RobotsTxtCache]: Cached robots.txt data or None if expired or not found.
"""
if Protego is None:
return None
cached = self._robots_cache.get(domain_root)
if cached and (time.time() - cached.timestamp) < self.robots_cache_ttl:
return cached
return None
async def _fetch_and_cache_robots(self, domain_root: str) -> RobotsTxtCache:
"""Fetch and cache robots.txt data.
Args:
domain_root: The root URL (e.g., "https://example.com").
Returns:
RobotsTxtCache: Cached robots.txt data with crawl delay.
Raises:
Exception: If fetching robots.txt fails.
"""
async with self._robots_lock:
cached = await self._get_robots_cache(domain_root)
if cached:
return cached
robots_url = f"{domain_root}/robots.txt"
try:
await self._ensure_client()
await self._respect_rate_limit(robots_url, self.crawl_delay)
resp = await self._client.get(robots_url, timeout=5.0)
content = resp.text if resp.status_code == 200 else ""
except Exception as e:
logger.debug(f"Failed to fetch robots.txt from {domain_root}: {e}")
content = ""
protego = Protego.parse(content) if content.strip() else None
agent = next((v for k, v in self.headers.items() if k.lower() == "user-agent"), "*")
crawl_delay = self.crawl_delay
if protego:
delay = protego.crawl_delay(agent) or protego.crawl_delay("*")
if delay:
# Apply max_crawl_delay cap if configured
if self.max_crawl_delay is not None and delay > self.max_crawl_delay:
logger.warning(
f"robots.txt specifies crawl_delay={delay}s for {domain_root}, "
f"capping to max_crawl_delay={self.max_crawl_delay}s"
)
crawl_delay = self.max_crawl_delay
else:
crawl_delay = delay
cache_entry = RobotsTxtCache(protego=protego, crawl_delay=crawl_delay)
self._robots_cache[domain_root] = cache_entry
return cache_entry
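# Illustrative example: if https://example.com/robots.txt declares "Crawl-delay: 30"
# for our user agent, the default max_crawl_delay=10.0 caps the effective delay at
# 10s and logs a warning; if robots.txt is missing or has no crawl-delay directive,
# the configured crawl_delay (0.5s by default) is used instead.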
async def _is_url_allowed(self, url: str) -> bool:
"""Check if a URL is allowed by robots.txt.
Args:
url: The URL to check.
Returns:
bool: True if the URL is allowed, False otherwise.
"""
if Protego is None:
return True
try:
domain_root = self._get_domain_root(url)
cache = await self._get_robots_cache(domain_root)
if cache is None:
cache = await self._fetch_and_cache_robots(domain_root)
if cache.protego is None:
return True
agent = next((v for k, v in self.headers.items() if k.lower() == "user-agent"), "*")
return cache.protego.can_fetch(agent, url) or cache.protego.can_fetch("*", url)
except Exception as e:
logger.debug(f"Error checking robots.txt for {url}: {e}")
return True
async def _get_crawl_delay(self, url: str) -> float:
"""Get the crawl delay for a URL from robots.txt.
Args:
url: The URL to check.
Returns:
float: Crawl delay in seconds.
"""
if Protego is None:
return self.crawl_delay
try:
domain_root = self._get_domain_root(url)
cache = await self._get_robots_cache(domain_root)
if cache is None:
cache = await self._fetch_and_cache_robots(domain_root)
return cache.crawl_delay
except Exception:
return self.crawl_delay
async def _fetch_httpx(self, url: str) -> str:
"""Fetch a URL using HTTPX with retries.
Args:
url: The URL to fetch.
Returns:
str: The HTML content of the page.
Raises:
Exception: If all retry attempts fail.
"""
await self._ensure_client()
assert self._client is not None, "HTTP client not initialized"
attempt = 0
crawl_delay = await self._get_crawl_delay(url)
logger.info(f"Fetching URL with httpx (crawl_delay={crawl_delay}s): {url}")
while True:
try:
await self._respect_rate_limit(url, crawl_delay)
resp = await self._client.get(url)
resp.raise_for_status()
logger.info(
f"Successfully fetched {url} (status={resp.status_code}, size={len(resp.text)} bytes)"
)
return resp.text
except Exception as exc:
attempt += 1
if attempt > self.max_retries:
logger.error(f"Fetch failed for {url} after {attempt} attempts: {exc}")
raise
delay = self.retry_delay_factor * (2 ** (attempt - 1))
logger.warning(
f"Retrying {url} after {delay:.2f}s (attempt {attempt}) due to {exc}"
)
await asyncio.sleep(delay)
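# Illustrative retry schedule with the defaults max_retries=2, retry_delay_factor=0.5:
#   1st failure -> sleep 0.5 * 2**0 = 0.5s, then retry
#   2nd failure -> sleep 0.5 * 2**1 = 1.0s, then retry
#   3rd failure -> attempt (3) > max_retries (2), so the exception is re-raised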
async def _render_with_playwright(
self, url: str, js_wait: float = 1.0, timeout: Optional[float] = None
) -> str:
"""Fetch and render a URL using Playwright for JavaScript content.
Args:
url: The URL to fetch.
js_wait: Seconds to wait for JavaScript to load.
timeout: Timeout for the request (in seconds, defaults to instance timeout).
Returns:
str: The rendered HTML content.
Raises:
RuntimeError: If Playwright is not installed.
Exception: If all retry attempts fail.
"""
if async_playwright is None:
raise RuntimeError(
"Playwright is not installed. Install with `pip install playwright` and run `playwright install`."
)
timeout_val = timeout or self.timeout
logger.info(
f"Rendering URL with Playwright (js_wait={js_wait}s, timeout={timeout_val}s): {url}"
)
attempt = 0
while True:
try:
async with async_playwright() as p:
logger.info(f"Launching headless Chromium browser for {url}")
browser = await p.chromium.launch(headless=True)
try:
context = await browser.new_context()
page = await context.new_page()
logger.info(f"Navigating to {url} and waiting for network idle")
await page.goto(
url,
wait_until="networkidle",
timeout=int(timeout_val * 1000),
)
if js_wait:
logger.info(f"Waiting {js_wait}s for JavaScript to execute")
await asyncio.sleep(js_wait)
content = await page.content()
logger.info(
f"Successfully rendered {url} with Playwright (size={len(content)} bytes)"
)
return content
finally:
await browser.close()
except Exception as exc:
attempt += 1
if attempt > self.max_retries:
logger.error(f"Playwright fetch failed for {url}: {exc}")
raise
backoff = self.retry_delay_factor * (2 ** (attempt - 1))
logger.warning(
f"Retrying playwright fetch {url} after {backoff:.2f}s (attempt {attempt})"
)
await asyncio.sleep(backoff)
async def fetch_urls(
self,
urls: Union[str, List[str]],
*,
use_playwright: bool = False,
playwright_js_wait: float = 0.8,
) -> UrlsToHtmls:
"""Fetch and extract content from URLs using BeautifulSoup or Playwright.
Args:
urls: A single URL, list of URLs, or dict mapping URLs to extraction rules.
extraction_rules: Default extraction rules for string or list URLs.
use_playwright: If True, use Playwright for JavaScript rendering.
playwright_js_wait: Seconds to wait for JavaScript to load.
join_all_matches: If True, extract all matching elements for each rule.
Returns:
Dict[str, str]: A dictionary mapping URLs to their extracted content.
Raises:
ValueError: If extraction_rules are missing when required or if urls is invalid.
Exception: If fetching or extraction fails.
"""
if isinstance(urls, str):
urls = [urls]
elif not isinstance(urls, list):
raise ValueError(f"Invalid urls type: {type(urls)}")
async def _task(url: str):
async with self._sem:
try:
logger.info(f"Processing URL: {url}")
# Check robots.txt
allowed = await self._is_url_allowed(url)
if not allowed:
logger.warning(f"URL disallowed by robots.txt: {url}")
return url, ""
logger.info(f"Robots.txt check passed for {url}")
# Fetch HTML
if use_playwright:
logger.info(
f"Rendering {url} with Playwright (JS wait: {playwright_js_wait}s)"
)
html = await self._render_with_playwright(
url, js_wait=playwright_js_wait, timeout=self.timeout
)
else:
logger.info(f"Fetching {url} with httpx")
html = await self._fetch_httpx(url)
logger.info(f"Successfully fetched HTML from {url} ({len(html)} bytes)")
return url, html
except Exception as e:
logger.error(f"Error processing {url}: {e}")
return url, ""
logger.info(f"Creating {len(urls)} async tasks for concurrent fetching")
tasks = [asyncio.create_task(_task(u)) for u in urls]
results = {}
completed = 0
total = len(tasks)
for coro in asyncio.as_completed(tasks):
url, html = await coro
results[url] = html
completed += 1
logger.info(f"Progress: {completed}/{total} URLs processed")
logger.info(f"Completed fetching all {len(results)} URL(s)")
return results
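A minimal usage sketch of the new fetch-only crawler (the URL comes from the tests at the end of this diff; everything else uses the defaults shown above):

import asyncio
from cognee.tasks.web_scraper import DefaultUrlCrawler

async def demo():
    async with DefaultUrlCrawler(concurrency=2, crawl_delay=0.5) as crawler:
        pages = await crawler.fetch_urls("https://books.toscrape.com/")
    for url, html in pages.items():
        print(url, "->", len(html), "bytes of raw HTML")

asyncio.run(demo())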

View file

@ -4,19 +4,20 @@ This module provides functions to fetch and extract content from web pages, supp
both BeautifulSoup for custom extraction rules and Tavily for API-based scraping.
"""
from typing import Dict, List, Union, Optional, Literal
from cognee.context_global_variables import soup_crawler_config, tavily_config
import os
from typing import List, Union, TypeAlias
from cognee.shared.logging_utils import get_logger
from .default_url_crawler import DefaultUrlCrawler
from .bs4_crawler import BeautifulSoupCrawler
from .config import TavilyConfig
from .config import DefaultCrawlerConfig, TavilyConfig
logger = get_logger(__name__)
UrlsToHtmls: TypeAlias = dict[str, str]
async def fetch_page_content(
urls: Union[str, List[str]],
preferred_tool: Optional[Literal["tavily", "beautifulsoup"]] = "beautifulsoup",
) -> Dict[str, str]:
async def fetch_page_content(urls: Union[str, List[str]]) -> UrlsToHtmls:
"""Fetch content from one or more URLs using the specified tool.
This function retrieves web page content using either BeautifulSoup (with custom
@ -29,7 +30,7 @@ async def fetch_page_content(
Defaults to "beautifulsoup".
tavily_config: Configuration for Tavily API, including API key.
Required if preferred_tool is "tavily".
soup_crawler_config: Configuration for BeautifulSoup crawler, including
default_crawler_config: Configuration for the default crawler, including
extraction rules. Required if preferred_tool is "beautifulsoup" and
extraction_rules are needed.
@ -44,51 +45,39 @@ async def fetch_page_content(
installed.
"""
url_list = [urls] if isinstance(urls, str) else urls
logger.info(f"Starting to fetch content from {len(url_list)} URL(s) using {preferred_tool}")
_tavily_config = tavily_config.get()
_soup_crawler_config = soup_crawler_config.get()
if preferred_tool == "tavily":
if not tavily_config or tavily_config.api_key is None:
raise ValueError("TAVILY_API_KEY must be set in TavilyConfig to use Tavily")
logger.info("Using Tavily API for content extraction")
if os.getenv("TAVILY_API_KEY"):
logger.info("Using Tavily API for url fetching")
return await fetch_with_tavily(urls, TavilyConfig())
else:
logger.info("Using default crawler for content extraction")
if preferred_tool == "beautifulsoup":
try:
from bs4 import BeautifulSoup as _ # noqa: F401
except ImportError:
logger.error(
"Failed to import bs4, make sure to install using pip install beautifulsoup4>=4.13.1"
)
raise ImportError
if soup_crawler_config is None or soup_crawler_config.extraction_rules is None:
raise ValueError("soup_crawler_config must be provided when not using Tavily")
default_crawler_config = (
DefaultCrawlerConfig()
) # We've decided to use defaults, and configure through env vars as needed
logger.info("Using BeautifulSoup for content extraction")
logger.info(
f"Initializing BeautifulSoup crawler with concurrency={soup_crawler_config.concurrency}, timeout={soup_crawler_config.timeout}s, max_crawl_delay={soup_crawler_config.max_crawl_delay}s"
f"Initializing BeautifulSoup crawler with concurrency={default_crawler_config.concurrency}, timeout={default_crawler_config.timeout}s, max_crawl_delay={default_crawler_config.max_crawl_delay}s"
)
crawler = BeautifulSoupCrawler(
concurrency=soup_crawler_config.concurrency,
crawl_delay=soup_crawler_config.crawl_delay,
max_crawl_delay=soup_crawler_config.max_crawl_delay,
timeout=soup_crawler_config.timeout,
max_retries=soup_crawler_config.max_retries,
retry_delay_factor=soup_crawler_config.retry_delay_factor,
headers=soup_crawler_config.headers,
robots_cache_ttl=soup_crawler_config.robots_cache_ttl,
crawler = DefaultUrlCrawler(
concurrency=default_crawler_config.concurrency,
crawl_delay=default_crawler_config.crawl_delay,
max_crawl_delay=default_crawler_config.max_crawl_delay,
timeout=default_crawler_config.timeout,
max_retries=default_crawler_config.max_retries,
retry_delay_factor=default_crawler_config.retry_delay_factor,
headers=default_crawler_config.headers,
robots_cache_ttl=default_crawler_config.robots_cache_ttl,
)
try:
logger.info(
f"Starting to crawl {len(url_list)} URL(s) with BeautifulSoup (use_playwright={soup_crawler_config.use_playwright})"
f"Starting to crawl {len(url_list)} URL(s) with BeautifulSoup (use_playwright={default_crawler_config.use_playwright})"
)
results = await crawler.fetch_urls(
urls,
use_playwright=soup_crawler_config.use_playwright,
playwright_js_wait=soup_crawler_config.playwright_js_wait,
use_playwright=default_crawler_config.use_playwright,
playwright_js_wait=default_crawler_config.playwright_js_wait,
)
logger.info(f"Successfully fetched content from {len(results)} URL(s)")
return results
@ -102,7 +91,7 @@ async def fetch_page_content(
async def fetch_with_tavily(
urls: Union[str, List[str]], tavily_config: TavilyConfig
) -> Dict[str, str]:
) -> UrlsToHtmls:
"""Fetch content from URLs using the Tavily API.
Args:

View file

@ -19,7 +19,7 @@ from cognee.tasks.storage.index_graph_edges import index_graph_edges
from cognee.modules.engine.operations.setup import setup
from .models import WebPage, WebSite, ScrapingJob
from .config import SoupCrawlerConfig, TavilyConfig
from .config import DefaultCrawlerConfig, TavilyConfig
from .utils import fetch_page_content
try:
@ -47,7 +47,7 @@ async def cron_web_scraper_task(
schedule: str = None,
extraction_rules: dict = None,
tavily_api_key: str = os.getenv("TAVILY_API_KEY"),
soup_crawler_config: SoupCrawlerConfig = None,
soup_crawler_config: DefaultCrawlerConfig = None,
tavily_config: TavilyConfig = None,
job_name: str = "scraping",
):
@ -121,7 +121,7 @@ async def web_scraper_task(
schedule: str = None,
extraction_rules: dict = None,
tavily_api_key: str = os.getenv("TAVILY_API_KEY"),
soup_crawler_config: SoupCrawlerConfig = None,
soup_crawler_config: DefaultCrawlerConfig = None,
tavily_config: TavilyConfig = None,
job_name: str = None,
):
@ -341,7 +341,7 @@ def check_arguments(tavily_api_key, extraction_rules, tavily_config, soup_crawle
soup_crawler_config: Configuration for BeautifulSoup crawler.
Returns:
Tuple[SoupCrawlerConfig, TavilyConfig, str]: Configured soup_crawler_config,
Tuple[DefaultCrawlerConfig, TavilyConfig, str]: Configured soup_crawler_config,
tavily_config, and preferred_tool ("tavily" or "beautifulsoup").
Raises:
@ -350,7 +350,7 @@ def check_arguments(tavily_api_key, extraction_rules, tavily_config, soup_crawle
preferred_tool = "beautifulsoup"
if extraction_rules and not soup_crawler_config:
soup_crawler_config = SoupCrawlerConfig(extraction_rules=extraction_rules)
soup_crawler_config = DefaultCrawlerConfig(extraction_rules=extraction_rules)
if tavily_api_key:
if not tavily_config:

View file

@ -1,6 +1,6 @@
import asyncio
import cognee
from cognee.tasks.web_scraper.config import SoupCrawlerConfig
from cognee.tasks.web_scraper.config import DefaultCrawlerConfig
from cognee.tasks.web_scraper import cron_web_scraper_task
@ -14,7 +14,7 @@ async def test_web_scraping_using_bs4():
"authors": {"selector": ".quote small", "all": True},
}
soup_config = SoupCrawlerConfig(
soup_config = DefaultCrawlerConfig(
concurrency=5,
crawl_delay=0.5,
timeout=15.0,
@ -47,7 +47,7 @@ async def test_web_scraping_using_bs4_and_incremental_loading():
url = "https://books.toscrape.com/"
rules = {"titles": "article.product_pod h3 a", "prices": "article.product_pod p.price_color"}
soup_config = SoupCrawlerConfig(
soup_config = DefaultCrawlerConfig(
concurrency=1,
crawl_delay=0.1,
timeout=10.0,