refactor: update web scraper configurations and simplify fetch logic

This commit is contained in:
Daulet Amirkhanov 2025-10-21 15:20:09 +01:00
parent 95e735d397
commit 5035c872a7
4 changed files with 25 additions and 63 deletions

View file

@ -7,14 +7,17 @@ from cognee.base_config import get_base_config
from cognee.infrastructure.databases.utils import get_or_create_dataset_database
from cognee.infrastructure.files.storage.config import file_storage_config
from cognee.modules.users.methods import get_user
from cognee.tasks.web_scraper.config import SoupCrawlerConfig, TavilyConfig
# Note: ContextVar allows us to use different graph db configurations in Cognee
# for different async tasks, threads and processes
vector_db_config = ContextVar("vector_db_config", default=None)
graph_db_config = ContextVar("graph_db_config", default=None)
session_user = ContextVar("session_user", default=None)
soup_crawler_config = ContextVar("soup_crawler_config", default=None)
tavily_config = ContextVar("tavily_config", default=None)
soup_crawler_config: ContextVar[SoupCrawlerConfig | None] = ContextVar(
"soup_crawler_config", default=None
)
tavily_config: ContextVar[TavilyConfig | None] = ContextVar("tavily_config", default=None)
async def set_session_user_context_variable(user):

View file

@ -35,8 +35,6 @@ class WebUrlFetcher(DataFetcherInterface):
data = await fetch_page_content(
data_item_path,
preferred_tool=preferred_tool,
soup_crawler_config=_soup_config,
tavily_config=_tavily_config,
)
logger.info(f"Successfully fetched content from URL {data_item_path}")

View file

@ -66,6 +66,7 @@ class RobotsTxtCache:
timestamp: float = field(default_factory=time.time)
# TODO(daulet) refactor: This is no longer BeautifulSoup, rather just a crawler
class BeautifulSoupCrawler:
"""Crawler for fetching and extracting web content using BeautifulSoup.
@ -491,14 +492,12 @@ class BeautifulSoupCrawler:
return (val or "").strip()
return el.get_text(strip=True)
async def fetch_with_bs4(
async def fetch_urls(
self,
urls: Union[str, List[str], Dict[str, Dict[str, Any]]],
extraction_rules: Optional[Dict[str, Any]] = None,
urls: Union[str, List[str]],
*,
use_playwright: bool = False,
playwright_js_wait: float = 0.8,
join_all_matches: bool = False,
) -> Dict[str, str]:
"""Fetch and extract content from URLs using BeautifulSoup or Playwright.
@ -516,38 +515,11 @@ class BeautifulSoupCrawler:
ValueError: If extraction_rules are missing when required or if urls is invalid.
Exception: If fetching or extraction fails.
"""
url_rules_map: Dict[str, Dict[str, Any]] = {}
if isinstance(urls, str):
if not extraction_rules:
raise ValueError("extraction_rules required when urls is a string")
url_rules_map[urls] = extraction_rules
elif isinstance(urls, list):
if not extraction_rules:
raise ValueError("extraction_rules required when urls is a list")
for url in urls:
url_rules_map[url] = extraction_rules
elif isinstance(urls, dict):
url_rules_map = urls
urls = [urls]
else:
raise ValueError(f"Invalid urls type: {type(urls)}")
logger.info(
f"Preparing to fetch {len(url_rules_map)} URL(s) with {len(extraction_rules) if extraction_rules else 0} extraction rule(s)"
)
normalized_url_rules: Dict[str, List[ExtractionRule]] = {}
for url, rules in url_rules_map.items():
normalized_rules = []
for _, rule in rules.items():
r = self._normalize_rule(rule)
if join_all_matches:
r.all = True
normalized_rules.append(r)
normalized_url_rules[url] = normalized_rules
logger.info(f"Normalized extraction rules for {len(normalized_url_rules)} URL(s)")
async def _task(url: str):
async with self._sem:
try:
@ -575,30 +547,21 @@ class BeautifulSoupCrawler:
logger.info(f"Successfully fetched HTML from {url} ({len(html)} bytes)")
# Extract content
pieces = []
for rule in normalized_url_rules[url]:
text = self._extract_with_bs4(html, rule)
if text:
pieces.append(text)
concatenated = " ".join(pieces).strip()
logger.info(f"Extracted {len(concatenated)} characters from {url}")
return url, concatenated
return url, html
except Exception as e:
logger.error(f"Error processing {url}: {e}")
return url, ""
logger.info(f"Creating {len(url_rules_map)} async tasks for concurrent fetching")
tasks = [asyncio.create_task(_task(u)) for u in url_rules_map.keys()]
logger.info(f"Creating {len(urls)} async tasks for concurrent fetching")
tasks = [asyncio.create_task(_task(u)) for u in urls]
results = {}
completed = 0
total = len(tasks)
for coro in asyncio.as_completed(tasks):
url, text = await coro
results[url] = text
url, html = await coro
results[url] = html
completed += 1
logger.info(f"Progress: {completed}/{total} URLs processed")

View file

@ -5,19 +5,17 @@ both BeautifulSoup for custom extraction rules and Tavily for API-based scraping
"""
from typing import Dict, List, Union, Optional, Literal
from cognee.context_global_variables import soup_crawler_config, tavily_config
from cognee.shared.logging_utils import get_logger
from .bs4_crawler import BeautifulSoupCrawler
from .config import TavilyConfig, SoupCrawlerConfig
from .config import TavilyConfig
logger = get_logger(__name__)
async def fetch_page_content(
urls: Union[str, List[str]],
*,
preferred_tool: Optional[Literal["tavily", "beautifulsoup"]] = "beautifulsoup",
tavily_config: Optional[TavilyConfig] = None,
soup_crawler_config: Optional[SoupCrawlerConfig] = None,
) -> Dict[str, str]:
"""Fetch content from one or more URLs using the specified tool.
@ -48,6 +46,9 @@ async def fetch_page_content(
url_list = [urls] if isinstance(urls, str) else urls
logger.info(f"Starting to fetch content from {len(url_list)} URL(s) using {preferred_tool}")
_tavily_config = tavily_config.get()
_soup_crawler_config = soup_crawler_config.get()
if preferred_tool == "tavily":
if not tavily_config or tavily_config.api_key is None:
raise ValueError("TAVILY_API_KEY must be set in TavilyConfig to use Tavily")
@ -62,11 +63,10 @@ async def fetch_page_content(
"Failed to import bs4, make sure to install using pip install beautifulsoup4>=4.13.1"
)
raise ImportError
if not soup_crawler_config or soup_crawler_config.extraction_rules is None:
raise ValueError("extraction_rules must be provided when not using Tavily")
if soup_crawler_config is None or soup_crawler_config.extraction_rules is None:
raise ValueError("soup_crawler_config must be provided when not using Tavily")
logger.info("Using BeautifulSoup for content extraction")
extraction_rules = soup_crawler_config.extraction_rules
logger.info(
f"Initializing BeautifulSoup crawler with concurrency={soup_crawler_config.concurrency}, timeout={soup_crawler_config.timeout}s, max_crawl_delay={soup_crawler_config.max_crawl_delay}s"
)
@ -85,12 +85,10 @@ async def fetch_page_content(
logger.info(
f"Starting to crawl {len(url_list)} URL(s) with BeautifulSoup (use_playwright={soup_crawler_config.use_playwright})"
)
results = await crawler.fetch_with_bs4(
results = await crawler.fetch_urls(
urls,
extraction_rules,
use_playwright=soup_crawler_config.use_playwright,
playwright_js_wait=soup_crawler_config.playwright_js_wait,
join_all_matches=soup_crawler_config.join_all_matches,
)
logger.info(f"Successfully fetched content from {len(results)} URL(s)")
return results
@ -103,7 +101,7 @@ async def fetch_page_content(
async def fetch_with_tavily(
urls: Union[str, List[str]], tavily_config: Optional[TavilyConfig] = None
urls: Union[str, List[str]], tavily_config: TavilyConfig
) -> Dict[str, str]:
"""Fetch content from URLs using the Tavily API.
@ -133,8 +131,8 @@ async def fetch_with_tavily(
f"Initializing Tavily client with extract_depth={extract_depth}, timeout={timeout}s"
)
client = AsyncTavilyClient(
api_key=tavily_config.api_key if tavily_config else None,
proxies=tavily_config.proxies if tavily_config else None,
api_key=tavily_config.api_key,
proxies=tavily_config.proxies,
)
logger.info(f"Sending extract request to Tavily API for {len(url_list)} URL(s)")