add logging for crawling status; add cap to the crawl_delay from robots.txt

- Not advising use of the cap by default, but providing an option to configure it
This commit is contained in:
Daulet Amirkhanov 2025-10-17 16:51:36 +01:00
parent b9877f9e87
commit b5190c90f1
4 changed files with 116 additions and 7 deletions

View file

@ -4,6 +4,9 @@ from typing import List
from cognee.modules.ingestion.exceptions.exceptions import IngestionError
from cognee.modules.ingestion import save_data_to_file
from cognee.tasks.web_scraper.config import TavilyConfig, SoupCrawlerConfig
from cognee.shared.logging_utils import get_logger
logger = get_logger()
class WebUrlLoader(LoaderInterface):
@ -100,16 +103,26 @@ class WebUrlLoader(LoaderInterface):
message="SoupCrawlerConfig must be set on the ingestion context when using the BeautifulSoup scraper."
)
logger.info(f"Starting web URL crawling for: {file_path}")
logger.info(f"Using scraping tool: {preferred_tool}")
data = await fetch_page_content(
file_path,
preferred_tool=preferred_tool,
tavily_config=_tavily_config,
soup_crawler_config=_soup_config,
)
logger.info(f"Successfully fetched content from {len(data)} URL(s)")
logger.info("Processing and concatenating fetched content")
content = ""
for key, value in data.items():
content += f"{key}:\n{value}\n\n"
logger.info(f"Saving content to file (total size: {len(content)} characters)")
stored_path = await save_data_to_file(content)
logger.info(f"Successfully saved content to: {stored_path}")
return stored_path
except IngestionError:

View file

@ -75,6 +75,7 @@ class BeautifulSoupCrawler:
Attributes:
concurrency: Number of concurrent requests allowed.
crawl_delay: Minimum seconds between requests to the same domain.
max_crawl_delay: Maximum crawl delay to respect from robots.txt (None = no limit).
timeout: Per-request timeout in seconds.
max_retries: Number of retries for failed requests.
retry_delay_factor: Multiplier for exponential backoff on retries.
@ -87,6 +88,7 @@ class BeautifulSoupCrawler:
*,
concurrency: int = 5,
crawl_delay: float = 0.5,
max_crawl_delay: Optional[float] = 10.0,
timeout: float = 15.0,
max_retries: int = 2,
retry_delay_factor: float = 0.5,
@ -98,6 +100,7 @@ class BeautifulSoupCrawler:
Args:
concurrency: Number of concurrent requests allowed.
crawl_delay: Minimum seconds between requests to the same domain.
max_crawl_delay: Maximum crawl delay to respect from robots.txt (None = no limit).
timeout: Per-request timeout in seconds.
max_retries: Number of retries for failed requests.
retry_delay_factor: Multiplier for exponential backoff on retries.
@ -107,6 +110,7 @@ class BeautifulSoupCrawler:
self.concurrency = concurrency
self._sem = asyncio.Semaphore(concurrency)
self.crawl_delay = crawl_delay
self.max_crawl_delay = max_crawl_delay
self.timeout = timeout
self.max_retries = max_retries
self.retry_delay_factor = retry_delay_factor
@ -183,7 +187,11 @@ class BeautifulSoupCrawler:
elapsed = time.time() - last
wait_for = delay - elapsed
if wait_for > 0:
logger.info(
f"Rate limiting: waiting {wait_for:.2f}s before requesting {url} (crawl_delay={delay}s from robots.txt)"
)
await asyncio.sleep(wait_for)
logger.info(f"Rate limit wait completed for {url}")
self._last_request_time_per_domain[domain] = time.time()
async def _get_robots_cache(self, domain_root: str) -> Optional[RobotsTxtCache]:
@ -236,7 +244,16 @@ class BeautifulSoupCrawler:
crawl_delay = self.crawl_delay
if protego:
delay = protego.crawl_delay(agent) or protego.crawl_delay("*")
crawl_delay = delay if delay else self.crawl_delay
if delay:
# Apply max_crawl_delay cap if configured
if self.max_crawl_delay is not None and delay > self.max_crawl_delay:
logger.warning(
f"robots.txt specifies crawl_delay={delay}s for {domain_root}, "
f"capping to max_crawl_delay={self.max_crawl_delay}s"
)
crawl_delay = self.max_crawl_delay
else:
crawl_delay = delay
cache_entry = RobotsTxtCache(protego=protego, crawl_delay=crawl_delay)
self._robots_cache[domain_root] = cache_entry
@ -307,12 +324,16 @@ class BeautifulSoupCrawler:
attempt = 0
crawl_delay = await self._get_crawl_delay(url)
logger.info(f"Fetching URL with httpx (crawl_delay={crawl_delay}s): {url}")
while True:
try:
await self._respect_rate_limit(url, crawl_delay)
resp = await self._client.get(url)
resp.raise_for_status()
logger.info(
f"Successfully fetched {url} (status={resp.status_code}, size={len(resp.text)} bytes)"
)
return resp.text
except Exception as exc:
attempt += 1
@ -347,22 +368,35 @@ class BeautifulSoupCrawler:
raise RuntimeError(
"Playwright is not installed. Install with `pip install playwright` and run `playwright install`."
)
timeout_val = timeout or self.timeout
logger.info(
f"Rendering URL with Playwright (js_wait={js_wait}s, timeout={timeout_val}s): {url}"
)
attempt = 0
while True:
try:
async with async_playwright() as p:
logger.info(f"Launching headless Chromium browser for {url}")
browser = await p.chromium.launch(headless=True)
try:
context = await browser.new_context()
page = await context.new_page()
logger.info(f"Navigating to {url} and waiting for network idle")
await page.goto(
url,
wait_until="networkidle",
timeout=int((timeout or self.timeout) * 1000),
timeout=int(timeout_val * 1000),
)
if js_wait:
logger.info(f"Waiting {js_wait}s for JavaScript to execute")
await asyncio.sleep(js_wait)
return await page.content()
content = await page.content()
logger.info(
f"Successfully rendered {url} with Playwright (size={len(content)} bytes)"
)
return content
finally:
await browser.close()
except Exception as exc:
@ -498,6 +532,10 @@ class BeautifulSoupCrawler:
else:
raise ValueError(f"Invalid urls type: {type(urls)}")
logger.info(
f"Preparing to fetch {len(url_rules_map)} URL(s) with {len(extraction_rules) if extraction_rules else 0} extraction rule(s)"
)
normalized_url_rules: Dict[str, List[ExtractionRule]] = {}
for url, rules in url_rules_map.items():
normalized_rules = []
@ -508,21 +546,36 @@ class BeautifulSoupCrawler:
normalized_rules.append(r)
normalized_url_rules[url] = normalized_rules
logger.info(f"Normalized extraction rules for {len(normalized_url_rules)} URL(s)")
async def _task(url: str):
async with self._sem:
try:
logger.info(f"Processing URL: {url}")
# Check robots.txt
allowed = await self._is_url_allowed(url)
if not allowed:
logger.warning(f"URL disallowed by robots.txt: {url}")
return url, ""
logger.info(f"Robots.txt check passed for {url}")
# Fetch HTML
if use_playwright:
logger.info(
f"Rendering {url} with Playwright (JS wait: {playwright_js_wait}s)"
)
html = await self._render_with_playwright(
url, js_wait=playwright_js_wait, timeout=self.timeout
)
else:
logger.info(f"Fetching {url} with httpx")
html = await self._fetch_httpx(url)
logger.info(f"Successfully fetched HTML from {url} ({len(html)} bytes)")
# Extract content
pieces = []
for rule in normalized_url_rules[url]:
text = self._extract_with_bs4(html, rule)
@ -530,17 +583,24 @@ class BeautifulSoupCrawler:
pieces.append(text)
concatenated = " ".join(pieces).strip()
logger.info(f"Extracted {len(concatenated)} characters from {url}")
return url, concatenated
except Exception as e:
logger.error(f"Error processing {url}: {e}")
return url, ""
logger.info(f"Creating {len(url_rules_map)} async tasks for concurrent fetching")
tasks = [asyncio.create_task(_task(u)) for u in url_rules_map.keys()]
results = {}
completed = 0
total = len(tasks)
for coro in asyncio.as_completed(tasks):
url, text = await coro
results[url] = text
completed += 1
logger.info(f"Progress: {completed}/{total} URLs processed")
logger.info(f"Completed fetching all {len(results)} URL(s)")
return results

View file

@ -13,6 +13,9 @@ class TavilyConfig(BaseModel):
class SoupCrawlerConfig(BaseModel):
concurrency: int = 5
crawl_delay: float = 0.5
max_crawl_delay: Optional[float] = (
10.0 # Maximum crawl delay to respect from robots.txt (None = no limit)
)
timeout: float = 15.0
max_retries: int = 2
retry_delay_factor: float = 0.5

View file

@ -45,9 +45,13 @@ async def fetch_page_content(
ImportError: If required dependencies (beautifulsoup4 or tavily-python) are not
installed.
"""
url_list = [urls] if isinstance(urls, str) else urls
logger.info(f"Starting to fetch content from {len(url_list)} URL(s) using {preferred_tool}")
if preferred_tool == "tavily":
if not tavily_config or tavily_config.api_key is None:
raise ValueError("TAVILY_API_KEY must be set in TavilyConfig to use Tavily")
logger.info("Using Tavily API for content extraction")
return await fetch_with_tavily(urls, tavily_config)
if preferred_tool == "beautifulsoup":
@ -60,10 +64,17 @@ async def fetch_page_content(
raise ImportError
if not soup_crawler_config or soup_crawler_config.extraction_rules is None:
raise ValueError("extraction_rules must be provided when not using Tavily")
logger.info("Using BeautifulSoup for content extraction")
extraction_rules = soup_crawler_config.extraction_rules
logger.info(
f"Initializing BeautifulSoup crawler with concurrency={soup_crawler_config.concurrency}, timeout={soup_crawler_config.timeout}s, max_crawl_delay={soup_crawler_config.max_crawl_delay}s"
)
crawler = BeautifulSoupCrawler(
concurrency=soup_crawler_config.concurrency,
crawl_delay=soup_crawler_config.crawl_delay,
max_crawl_delay=soup_crawler_config.max_crawl_delay,
timeout=soup_crawler_config.timeout,
max_retries=soup_crawler_config.max_retries,
retry_delay_factor=soup_crawler_config.retry_delay_factor,
@ -71,6 +82,9 @@ async def fetch_page_content(
robots_cache_ttl=soup_crawler_config.robots_cache_ttl,
)
try:
logger.info(
f"Starting to crawl {len(url_list)} URL(s) with BeautifulSoup (use_playwright={soup_crawler_config.use_playwright})"
)
results = await crawler.fetch_with_bs4(
urls,
extraction_rules,
@ -78,11 +92,13 @@ async def fetch_page_content(
playwright_js_wait=soup_crawler_config.playwright_js_wait,
join_all_matches=soup_crawler_config.join_all_matches,
)
logger.info(f"Successfully fetched content from {len(results)} URL(s)")
return results
except Exception as e:
logger.error(f"Error fetching page content: {str(e)}")
raise
finally:
logger.info("Closing BeautifulSoup crawler")
await crawler.close()
@ -108,19 +124,36 @@ async def fetch_with_tavily(
"Failed to import tavily, make sure to install using pip install tavily-python>=0.7.0"
)
raise
url_list = [urls] if isinstance(urls, str) else urls
extract_depth = tavily_config.extract_depth if tavily_config else "basic"
timeout = tavily_config.timeout if tavily_config else 10
logger.info(
f"Initializing Tavily client with extract_depth={extract_depth}, timeout={timeout}s"
)
client = AsyncTavilyClient(
api_key=tavily_config.api_key if tavily_config else None,
proxies=tavily_config.proxies if tavily_config else None,
)
logger.info(f"Sending extract request to Tavily API for {len(url_list)} URL(s)")
results = await client.extract(
urls,
format="text",
extract_depth=tavily_config.extract_depth if tavily_config else "basic",
timeout=tavily_config.timeout if tavily_config else 10,
extract_depth=extract_depth,
timeout=timeout,
)
for failed_result in results.get("failed_results", []):
logger.warning(f"Failed to fetch {failed_result}")
failed_count = len(results.get("failed_results", []))
if failed_count > 0:
logger.warning(f"Tavily API failed to fetch {failed_count} URL(s)")
for failed_result in results.get("failed_results", []):
logger.warning(f"Failed to fetch {failed_result}")
return_results = {}
for result in results.get("results", []):
return_results[result["url"]] = result["raw_content"]
logger.info(f"Successfully fetched content from {len(return_results)} URL(s) via Tavily")
return return_results