feat: web scraping connector task (#1501)

<!-- .github/pull_request_template.md -->

## Description
<!--
Please provide a clear, human-generated description of the changes in
this PR.
DO NOT use AI-generated descriptions. We want to understand your thought
process and reasoning.
-->

## Type of Change
<!-- Please check the relevant option -->
- [ ] New feature (non-breaking change that adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
- [ ] Documentation update

## Screenshots/Videos (if applicable)
<!-- Add screenshots or videos to help explain your changes -->

## Pre-submission Checklist
<!-- Please check all boxes that apply before submitting your PR -->
- [ ] **I have tested my changes thoroughly before submitting this PR**
- [ ] **This PR contains minimal changes necessary to address the
issue/feature**
- [ ] My code follows the project's coding standards and style
guidelines
- [ ] I have added tests that prove my fix is effective or that my
feature works
- [ ] I have added necessary documentation (if applicable)
- [ ] All new and existing tests pass
- [ ] I have searched existing PRs to ensure this change hasn't been
submitted already
- [ ] I have linked any relevant issues in the description
- [ ] My commits have clear and descriptive messages

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.
This commit is contained in:
Vasilije 2025-10-11 11:25:52 +02:00 committed by GitHub
commit 04a0998e5d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 1440 additions and 8 deletions

View file

@ -37,5 +37,4 @@ dev = [
allow-direct-references = true
[project.scripts]
cognee = "src:main"
cognee-mcp = "src:main_mcp"
cognee-mcp = "src:main"

View file

@ -1,5 +1,6 @@
from uuid import UUID
from typing import Union, BinaryIO, List, Optional
import os
from typing import Union, BinaryIO, List, Optional, Dict, Any
from cognee.modules.users.models import User
from cognee.modules.pipelines import Task, run_pipeline
@ -11,6 +12,12 @@ from cognee.modules.pipelines.layers.reset_dataset_pipeline_run_status import (
)
from cognee.modules.engine.operations.setup import setup
from cognee.tasks.ingestion import ingest_data, resolve_data_directories
from cognee.tasks.web_scraper.config import TavilyConfig, SoupCrawlerConfig
from cognee.context_global_variables import (
tavily_config as tavily,
soup_crawler_config as soup_crawler,
)
from urllib.parse import urlparse
async def add(
@ -23,12 +30,15 @@ async def add(
dataset_id: Optional[UUID] = None,
preferred_loaders: List[str] = None,
incremental_loading: bool = True,
extraction_rules: Optional[Dict[str, Any]] = None,
tavily_config: Optional[TavilyConfig] = None,
soup_crawler_config: Optional[SoupCrawlerConfig] = None,
):
"""
Add data to Cognee for knowledge graph processing.
This is the first step in the Cognee workflow - it ingests raw data and prepares it
for processing. The function accepts various data formats including text, files, and
for processing. The function accepts various data formats including text, files, urls and
binary streams, then stores them in a specified dataset for further processing.
Prerequisites:
@ -68,6 +78,7 @@ async def add(
- S3 path: "s3://my-bucket/documents/file.pdf"
- List of mixed types: ["text content", "/path/file.pdf", "file://doc.txt", file_handle]
- Binary file object: open("file.txt", "rb")
- url: A web link url (https or http)
dataset_name: Name of the dataset to store data in. Defaults to "main_dataset".
Create separate datasets to organize different knowledge domains.
user: User object for authentication and permissions. Uses default user if None.
@ -78,6 +89,9 @@ async def add(
vector_db_config: Optional configuration for vector database (for custom setups).
graph_db_config: Optional configuration for graph database (for custom setups).
dataset_id: Optional specific dataset UUID to use instead of dataset_name.
extraction_rules: Optional dictionary of rules (e.g., CSS selectors, XPath) for extracting specific content from web pages using BeautifulSoup
tavily_config: Optional configuration for Tavily API, including API key and extraction settings
soup_crawler_config: Optional configuration for BeautifulSoup crawler, specifying concurrency, crawl delay, and extraction rules.
Returns:
PipelineRunInfo: Information about the ingestion pipeline execution including:
@ -126,6 +140,21 @@ async def add(
# Add a single file
await cognee.add("/home/user/documents/analysis.pdf")
# Add a single url and bs4 extract ingestion method
extraction_rules = {
"title": "h1",
"description": "p",
"more_info": "a[href*='more-info']"
}
await cognee.add("https://example.com",extraction_rules=extraction_rules)
# Add a single url and tavily extract ingestion method
Make sure to set TAVILY_API_KEY = YOUR_TAVILY_API_KEY as a environment variable
await cognee.add("https://example.com")
# Add multiple urls
await cognee.add(["https://example.com","https://books.toscrape.com"])
```
Environment Variables:
@ -139,11 +168,38 @@ async def add(
- DEFAULT_USER_PASSWORD: Custom default user password
- VECTOR_DB_PROVIDER: "lancedb" (default), "chromadb", "pgvector"
- GRAPH_DATABASE_PROVIDER: "kuzu" (default), "neo4j"
- TAVILY_API_KEY: YOUR_TAVILY_API_KEY
"""
if not soup_crawler_config and extraction_rules:
soup_crawler_config = SoupCrawlerConfig(extraction_rules=extraction_rules)
if not tavily_config and os.getenv("TAVILY_API_KEY"):
tavily_config = TavilyConfig(api_key=os.getenv("TAVILY_API_KEY"))
soup_crawler.set(soup_crawler_config)
tavily.set(tavily_config)
http_schemes = {"http", "https"}
def _is_http_url(item: Union[str, BinaryIO]) -> bool:
return isinstance(item, str) and urlparse(item).scheme in http_schemes
if _is_http_url(data):
node_set = ["web_content"] if not node_set else node_set + ["web_content"]
elif isinstance(data, list) and any(_is_http_url(item) for item in data):
node_set = ["web_content"] if not node_set else node_set + ["web_content"]
tasks = [
Task(resolve_data_directories, include_subdirectories=True),
Task(ingest_data, dataset_name, user, node_set, dataset_id, preferred_loaders),
Task(
ingest_data,
dataset_name,
user,
node_set,
dataset_id,
preferred_loaders,
),
]
await setup()

View file

@ -12,6 +12,8 @@ from cognee.modules.users.methods import get_user
# for different async tasks, threads and processes
vector_db_config = ContextVar("vector_db_config", default=None)
graph_db_config = ContextVar("graph_db_config", default=None)
soup_crawler_config = ContextVar("soup_crawler_config", default=None)
tavily_config = ContextVar("tavily_config", default=None)
async def set_database_global_context_variables(dataset: Union[str, UUID], user_id: UUID):

View file

@ -7,6 +7,7 @@ from cognee.modules.ingestion.exceptions import IngestionError
from cognee.modules.ingestion import save_data_to_file
from cognee.shared.logging_utils import get_logger
from pydantic_settings import BaseSettings, SettingsConfigDict
from cognee.context_global_variables import tavily_config, soup_crawler_config
logger = get_logger()
@ -17,6 +18,13 @@ class SaveDataSettings(BaseSettings):
model_config = SettingsConfigDict(env_file=".env", extra="allow")
class HTMLContent(str):
def __new__(cls, value: str):
if not ("<" in value and ">" in value):
raise ValueError("Not valid HTML-like content")
return super().__new__(cls, value)
settings = SaveDataSettings()
@ -48,6 +56,39 @@ async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any]) -> str
# data is s3 file path
if parsed_url.scheme == "s3":
return data_item
elif parsed_url.scheme == "http" or parsed_url.scheme == "https":
# Validate URL by sending a HEAD request
try:
from cognee.tasks.web_scraper import fetch_page_content
tavily = tavily_config.get()
soup_crawler = soup_crawler_config.get()
preferred_tool = "beautifulsoup" if soup_crawler else "tavily"
if preferred_tool == "tavily" and tavily is None:
raise IngestionError(
message="TavilyConfig must be set on the ingestion context when fetching HTTP URLs without a SoupCrawlerConfig."
)
if preferred_tool == "beautifulsoup" and soup_crawler is None:
raise IngestionError(
message="SoupCrawlerConfig must be set on the ingestion context when using the BeautifulSoup scraper."
)
data = await fetch_page_content(
data_item,
preferred_tool=preferred_tool,
tavily_config=tavily,
soup_crawler_config=soup_crawler,
)
content = ""
for key, value in data.items():
content += f"{key}:\n{value}\n\n"
return await save_data_to_file(content)
except IngestionError:
raise
except Exception as e:
raise IngestionError(
message=f"Error ingesting webpage results of url {data_item}: {str(e)}"
)
# data is local file path
elif parsed_url.scheme == "file":

View file

@ -0,0 +1,18 @@
"""Web scraping module for cognee.
This module provides tools for scraping web content, managing scraping jobs, and storing
data in a graph database. It includes classes and functions for crawling web pages using
BeautifulSoup or Tavily, defining data models, and handling scraping configurations.
"""
from .bs4_crawler import BeautifulSoupCrawler
from .utils import fetch_page_content
from .web_scraper_task import cron_web_scraper_task, web_scraper_task
__all__ = [
"BeautifulSoupCrawler",
"fetch_page_content",
"cron_web_scraper_task",
"web_scraper_task",
]

View file

@ -0,0 +1,546 @@
"""BeautifulSoup-based web crawler for extracting content from web pages.
This module provides the BeautifulSoupCrawler class for fetching and extracting content
from web pages using BeautifulSoup or Playwright for JavaScript-rendered pages. It
supports robots.txt handling, rate limiting, and custom extraction rules.
"""
import asyncio
import time
from typing import Union, List, Dict, Any, Optional
from urllib.parse import urlparse
from dataclasses import dataclass, field
from functools import lru_cache
import httpx
from bs4 import BeautifulSoup
from cognee.shared.logging_utils import get_logger
logger = get_logger(__name__)
try:
from playwright.async_api import async_playwright
except ImportError:
logger.error(
"Failed to import playwright, make sure to install using pip install playwright>=1.9.0"
)
async_playwright = None
try:
from protego import Protego
except ImportError:
logger.error("Failed to import protego, make sure to install using pip install protego>=0.1")
Protego = None
@dataclass
class ExtractionRule:
"""Normalized extraction rule for web content.
Attributes:
selector: CSS selector for extraction (if any).
xpath: XPath expression for extraction (if any).
attr: HTML attribute to extract (if any).
all: If True, extract all matching elements; otherwise, extract first.
join_with: String to join multiple extracted elements.
"""
selector: Optional[str] = None
xpath: Optional[str] = None
attr: Optional[str] = None
all: bool = False
join_with: str = " "
@dataclass
class RobotsTxtCache:
"""Cache for robots.txt data.
Attributes:
protego: Parsed robots.txt object (Protego instance).
crawl_delay: Delay between requests (in seconds).
timestamp: Time when the cache entry was created.
"""
protego: Any
crawl_delay: float
timestamp: float = field(default_factory=time.time)
class BeautifulSoupCrawler:
"""Crawler for fetching and extracting web content using BeautifulSoup.
Supports asynchronous HTTP requests, Playwright for JavaScript rendering, robots.txt
compliance, and rate limiting. Extracts content using CSS selectors or XPath rules.
Attributes:
concurrency: Number of concurrent requests allowed.
crawl_delay: Minimum seconds between requests to the same domain.
timeout: Per-request timeout in seconds.
max_retries: Number of retries for failed requests.
retry_delay_factor: Multiplier for exponential backoff on retries.
headers: HTTP headers for requests (e.g., User-Agent).
robots_cache_ttl: Time-to-live for robots.txt cache in seconds.
"""
def __init__(
self,
*,
concurrency: int = 5,
crawl_delay: float = 0.5,
timeout: float = 15.0,
max_retries: int = 2,
retry_delay_factor: float = 0.5,
headers: Optional[Dict[str, str]] = None,
robots_cache_ttl: float = 3600.0,
):
"""Initialize the BeautifulSoupCrawler.
Args:
concurrency: Number of concurrent requests allowed.
crawl_delay: Minimum seconds between requests to the same domain.
timeout: Per-request timeout in seconds.
max_retries: Number of retries for failed requests.
retry_delay_factor: Multiplier for exponential backoff on retries.
headers: HTTP headers for requests (defaults to User-Agent: Cognee-Scraper/1.0).
robots_cache_ttl: Time-to-live for robots.txt cache in seconds.
"""
self.concurrency = concurrency
self._sem = asyncio.Semaphore(concurrency)
self.crawl_delay = crawl_delay
self.timeout = timeout
self.max_retries = max_retries
self.retry_delay_factor = retry_delay_factor
self.headers = headers or {"User-Agent": "Cognee-Scraper/1.0"}
self.robots_cache_ttl = robots_cache_ttl
self._last_request_time_per_domain: Dict[str, float] = {}
self._robots_cache: Dict[str, RobotsTxtCache] = {}
self._client: Optional[httpx.AsyncClient] = None
self._robots_lock = asyncio.Lock()
async def _ensure_client(self):
"""Initialize the HTTP client if not already created."""
if self._client is None:
self._client = httpx.AsyncClient(timeout=self.timeout, headers=self.headers)
async def close(self):
"""Close the HTTP client."""
if self._client:
await self._client.aclose()
self._client = None
async def __aenter__(self):
"""Enter the context manager, initializing the HTTP client."""
await self._ensure_client()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Exit the context manager, closing the HTTP client."""
await self.close()
@lru_cache(maxsize=1024)
def _domain_from_url(self, url: str) -> str:
"""Extract the domain (netloc) from a URL.
Args:
url: The URL to parse.
Returns:
str: The domain (netloc) of the URL.
"""
try:
return urlparse(url).netloc
except Exception:
return url
@lru_cache(maxsize=1024)
def _get_domain_root(self, url: str) -> str:
"""Get the root URL (scheme and netloc) from a URL.
Args:
url: The URL to parse.
Returns:
str: The root URL (e.g., "https://example.com").
"""
parsed = urlparse(url)
return f"{parsed.scheme}://{parsed.netloc}"
async def _respect_rate_limit(self, url: str, crawl_delay: Optional[float] = None):
"""Enforce rate limiting for requests to the same domain.
Args:
url: The URL to check.
crawl_delay: Custom crawl delay in seconds (if any).
"""
domain = self._domain_from_url(url)
last = self._last_request_time_per_domain.get(domain)
delay = crawl_delay if crawl_delay is not None else self.crawl_delay
if last is None:
self._last_request_time_per_domain[domain] = time.time()
return
elapsed = time.time() - last
wait_for = delay - elapsed
if wait_for > 0:
await asyncio.sleep(wait_for)
self._last_request_time_per_domain[domain] = time.time()
async def _get_robots_cache(self, domain_root: str) -> Optional[RobotsTxtCache]:
"""Get cached robots.txt data if valid.
Args:
domain_root: The root URL (e.g., "https://example.com").
Returns:
Optional[RobotsTxtCache]: Cached robots.txt data or None if expired or not found.
"""
if Protego is None:
return None
cached = self._robots_cache.get(domain_root)
if cached and (time.time() - cached.timestamp) < self.robots_cache_ttl:
return cached
return None
async def _fetch_and_cache_robots(self, domain_root: str) -> RobotsTxtCache:
"""Fetch and cache robots.txt data.
Args:
domain_root: The root URL (e.g., "https://example.com").
Returns:
RobotsTxtCache: Cached robots.txt data with crawl delay.
Raises:
Exception: If fetching robots.txt fails.
"""
async with self._robots_lock:
cached = await self._get_robots_cache(domain_root)
if cached:
return cached
robots_url = f"{domain_root}/robots.txt"
try:
await self._ensure_client()
await self._respect_rate_limit(robots_url, self.crawl_delay)
resp = await self._client.get(robots_url, timeout=5.0)
content = resp.text if resp.status_code == 200 else ""
except Exception as e:
logger.debug(f"Failed to fetch robots.txt from {domain_root}: {e}")
content = ""
protego = Protego.parse(content) if content.strip() else None
agent = next((v for k, v in self.headers.items() if k.lower() == "user-agent"), "*")
crawl_delay = self.crawl_delay
if protego:
delay = protego.crawl_delay(agent) or protego.crawl_delay("*")
crawl_delay = delay if delay else self.crawl_delay
cache_entry = RobotsTxtCache(protego=protego, crawl_delay=crawl_delay)
self._robots_cache[domain_root] = cache_entry
return cache_entry
async def _is_url_allowed(self, url: str) -> bool:
"""Check if a URL is allowed by robots.txt.
Args:
url: The URL to check.
Returns:
bool: True if the URL is allowed, False otherwise.
"""
if Protego is None:
return True
try:
domain_root = self._get_domain_root(url)
cache = await self._get_robots_cache(domain_root)
if cache is None:
cache = await self._fetch_and_cache_robots(domain_root)
if cache.protego is None:
return True
agent = next((v for k, v in self.headers.items() if k.lower() == "user-agent"), "*")
return cache.protego.can_fetch(agent, url) or cache.protego.can_fetch("*", url)
except Exception as e:
logger.debug(f"Error checking robots.txt for {url}: {e}")
return True
async def _get_crawl_delay(self, url: str) -> float:
"""Get the crawl delay for a URL from robots.txt.
Args:
url: The URL to check.
Returns:
float: Crawl delay in seconds.
"""
if Protego is None:
return self.crawl_delay
try:
domain_root = self._get_domain_root(url)
cache = await self._get_robots_cache(domain_root)
if cache is None:
cache = await self._fetch_and_cache_robots(domain_root)
return cache.crawl_delay
except Exception:
return self.crawl_delay
async def _fetch_httpx(self, url: str) -> str:
"""Fetch a URL using HTTPX with retries.
Args:
url: The URL to fetch.
Returns:
str: The HTML content of the page.
Raises:
Exception: If all retry attempts fail.
"""
await self._ensure_client()
assert self._client is not None, "HTTP client not initialized"
attempt = 0
crawl_delay = await self._get_crawl_delay(url)
while True:
try:
await self._respect_rate_limit(url, crawl_delay)
resp = await self._client.get(url)
resp.raise_for_status()
return resp.text
except Exception as exc:
attempt += 1
if attempt > self.max_retries:
logger.error(f"Fetch failed for {url} after {attempt} attempts: {exc}")
raise
delay = self.retry_delay_factor * (2 ** (attempt - 1))
logger.warning(
f"Retrying {url} after {delay:.2f}s (attempt {attempt}) due to {exc}"
)
await asyncio.sleep(delay)
async def _render_with_playwright(
self, url: str, js_wait: float = 1.0, timeout: Optional[float] = None
) -> str:
"""Fetch and render a URL using Playwright for JavaScript content.
Args:
url: The URL to fetch.
js_wait: Seconds to wait for JavaScript to load.
timeout: Timeout for the request (in seconds, defaults to instance timeout).
Returns:
str: The rendered HTML content.
Raises:
RuntimeError: If Playwright is not installed.
Exception: If all retry attempts fail.
"""
if async_playwright is None:
raise RuntimeError(
"Playwright is not installed. Install with `pip install playwright` and run `playwright install`."
)
attempt = 0
while True:
try:
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
try:
context = await browser.new_context()
page = await context.new_page()
await page.goto(
url,
wait_until="networkidle",
timeout=int((timeout or self.timeout) * 1000),
)
if js_wait:
await asyncio.sleep(js_wait)
return await page.content()
finally:
await browser.close()
except Exception as exc:
attempt += 1
if attempt > self.max_retries:
logger.error(f"Playwright fetch failed for {url}: {exc}")
raise
backoff = self.retry_delay_factor * (2 ** (attempt - 1))
logger.warning(
f"Retrying playwright fetch {url} after {backoff:.2f}s (attempt {attempt})"
)
await asyncio.sleep(backoff)
def _normalize_rule(self, rule: Union[str, Dict[str, Any]]) -> ExtractionRule:
"""Normalize an extraction rule to an ExtractionRule dataclass.
Args:
rule: A string (CSS selector) or dict with extraction parameters.
Returns:
ExtractionRule: Normalized extraction rule.
Raises:
ValueError: If the rule is invalid.
"""
if isinstance(rule, str):
return ExtractionRule(selector=rule)
if isinstance(rule, dict):
return ExtractionRule(
selector=rule.get("selector"),
xpath=rule.get("xpath"),
attr=rule.get("attr"),
all=bool(rule.get("all", False)),
join_with=rule.get("join_with", " "),
)
raise ValueError(f"Invalid extraction rule: {rule}")
def _extract_with_bs4(self, html: str, rule: ExtractionRule) -> str:
"""Extract content from HTML using BeautifulSoup or lxml XPath.
Args:
html: The HTML content to extract from.
rule: The extraction rule to apply.
Returns:
str: The extracted content.
Raises:
RuntimeError: If XPath is used but lxml is not installed.
"""
soup = BeautifulSoup(html, "html.parser")
if rule.xpath:
try:
from lxml import html as lxml_html
except ImportError:
raise RuntimeError(
"XPath requested but lxml is not available. Install lxml or use CSS selectors."
)
doc = lxml_html.fromstring(html)
nodes = doc.xpath(rule.xpath)
texts = []
for n in nodes:
if hasattr(n, "text_content"):
texts.append(n.text_content().strip())
else:
texts.append(str(n).strip())
return rule.join_with.join(t for t in texts if t)
if not rule.selector:
return ""
if rule.all:
nodes = soup.select(rule.selector)
pieces = []
for el in nodes:
if rule.attr:
val = el.get(rule.attr)
if val:
pieces.append(val.strip())
else:
text = el.get_text(strip=True)
if text:
pieces.append(text)
return rule.join_with.join(pieces).strip()
else:
el = soup.select_one(rule.selector)
if el is None:
return ""
if rule.attr:
val = el.get(rule.attr)
return (val or "").strip()
return el.get_text(strip=True)
async def fetch_with_bs4(
self,
urls: Union[str, List[str], Dict[str, Dict[str, Any]]],
extraction_rules: Optional[Dict[str, Any]] = None,
*,
use_playwright: bool = False,
playwright_js_wait: float = 0.8,
join_all_matches: bool = False,
) -> Dict[str, str]:
"""Fetch and extract content from URLs using BeautifulSoup or Playwright.
Args:
urls: A single URL, list of URLs, or dict mapping URLs to extraction rules.
extraction_rules: Default extraction rules for string or list URLs.
use_playwright: If True, use Playwright for JavaScript rendering.
playwright_js_wait: Seconds to wait for JavaScript to load.
join_all_matches: If True, extract all matching elements for each rule.
Returns:
Dict[str, str]: A dictionary mapping URLs to their extracted content.
Raises:
ValueError: If extraction_rules are missing when required or if urls is invalid.
Exception: If fetching or extraction fails.
"""
url_rules_map: Dict[str, Dict[str, Any]] = {}
if isinstance(urls, str):
if not extraction_rules:
raise ValueError("extraction_rules required when urls is a string")
url_rules_map[urls] = extraction_rules
elif isinstance(urls, list):
if not extraction_rules:
raise ValueError("extraction_rules required when urls is a list")
for url in urls:
url_rules_map[url] = extraction_rules
elif isinstance(urls, dict):
url_rules_map = urls
else:
raise ValueError(f"Invalid urls type: {type(urls)}")
normalized_url_rules: Dict[str, List[ExtractionRule]] = {}
for url, rules in url_rules_map.items():
normalized_rules = []
for _, rule in rules.items():
r = self._normalize_rule(rule)
if join_all_matches:
r.all = True
normalized_rules.append(r)
normalized_url_rules[url] = normalized_rules
async def _task(url: str):
async with self._sem:
try:
allowed = await self._is_url_allowed(url)
if not allowed:
logger.warning(f"URL disallowed by robots.txt: {url}")
return url, ""
if use_playwright:
html = await self._render_with_playwright(
url, js_wait=playwright_js_wait, timeout=self.timeout
)
else:
html = await self._fetch_httpx(url)
pieces = []
for rule in normalized_url_rules[url]:
text = self._extract_with_bs4(html, rule)
if text:
pieces.append(text)
concatenated = " ".join(pieces).strip()
return url, concatenated
except Exception as e:
logger.error(f"Error processing {url}: {e}")
return url, ""
tasks = [asyncio.create_task(_task(u)) for u in url_rules_map.keys()]
results = {}
for coro in asyncio.as_completed(tasks):
url, text = await coro
results[url] = text
return results

View file

@ -0,0 +1,24 @@
from pydantic import BaseModel, Field
from typing import Any, Dict, Optional, Literal
import os
class TavilyConfig(BaseModel):
api_key: Optional[str] = os.getenv("TAVILY_API_KEY")
extract_depth: Literal["basic", "advanced"] = "basic"
proxies: Optional[Dict[str, str]] = None
timeout: Optional[int] = Field(default=10, ge=1, le=60)
class SoupCrawlerConfig(BaseModel):
concurrency: int = 5
crawl_delay: float = 0.5
timeout: float = 15.0
max_retries: int = 2
retry_delay_factor: float = 0.5
headers: Optional[Dict[str, str]] = None
extraction_rules: Dict[str, Any]
use_playwright: bool = False
playwright_js_wait: float = 0.8
robots_cache_ttl: float = 3600.0
join_all_matches: bool = False

View file

@ -0,0 +1,46 @@
from cognee.infrastructure.engine import DataPoint
from typing import Optional, Dict, Any, List
from datetime import datetime
class WebPage(DataPoint):
"""Represents a scraped web page with metadata"""
name: Optional[str]
content: str
content_hash: str
scraped_at: datetime
last_modified: Optional[datetime]
status_code: int
content_type: str
page_size: int
extraction_rules: Dict[str, Any] # CSS selectors, XPath rules used
description: str
metadata: dict = {"index_fields": ["name", "description", "content"]}
class WebSite(DataPoint):
"""Represents a website or domain being scraped"""
name: str
base_url: str
robots_txt: Optional[str]
crawl_delay: float
last_crawled: datetime
page_count: int
scraping_config: Dict[str, Any]
description: str
metadata: dict = {"index_fields": ["name", "description"]}
class ScrapingJob(DataPoint):
"""Represents a scraping job configuration"""
name: str
urls: List[str]
schedule: Optional[str] # Cron-like schedule for recurring scrapes
status: str # "active", "paused", "completed", "failed"
last_run: Optional[datetime]
next_run: Optional[datetime]
description: str
metadata: dict = {"index_fields": ["name", "description"]}

View file

@ -0,0 +1,126 @@
"""Utilities for fetching web content using BeautifulSoup or Tavily.
This module provides functions to fetch and extract content from web pages, supporting
both BeautifulSoup for custom extraction rules and Tavily for API-based scraping.
"""
from typing import Dict, List, Union, Optional, Literal
from cognee.shared.logging_utils import get_logger
from .bs4_crawler import BeautifulSoupCrawler
from .config import TavilyConfig, SoupCrawlerConfig
logger = get_logger(__name__)
async def fetch_page_content(
urls: Union[str, List[str]],
*,
preferred_tool: Optional[Literal["tavily", "beautifulsoup"]] = "beautifulsoup",
tavily_config: Optional[TavilyConfig] = None,
soup_crawler_config: Optional[SoupCrawlerConfig] = None,
) -> Dict[str, str]:
"""Fetch content from one or more URLs using the specified tool.
This function retrieves web page content using either BeautifulSoup (with custom
extraction rules) or Tavily (API-based scraping). It handles single URLs or lists of
URLs and returns a dictionary mapping URLs to their extracted content.
Args:
urls: A single URL (str) or a list of URLs (List[str]) to scrape.
preferred_tool: The scraping tool to use ("tavily" or "beautifulsoup").
Defaults to "beautifulsoup".
tavily_config: Configuration for Tavily API, including API key.
Required if preferred_tool is "tavily".
soup_crawler_config: Configuration for BeautifulSoup crawler, including
extraction rules. Required if preferred_tool is "beautifulsoup" and
extraction_rules are needed.
Returns:
Dict[str, str]: A dictionary mapping each URL to its
extracted content (as a string for BeautifulSoup or a dict for Tavily).
Raises:
ValueError: If Tavily API key is missing when using Tavily, or if
extraction_rules are not provided when using BeautifulSoup.
ImportError: If required dependencies (beautifulsoup4 or tavily-python) are not
installed.
"""
if preferred_tool == "tavily":
if not tavily_config or tavily_config.api_key is None:
raise ValueError("TAVILY_API_KEY must be set in TavilyConfig to use Tavily")
return await fetch_with_tavily(urls, tavily_config)
if preferred_tool == "beautifulsoup":
try:
from bs4 import BeautifulSoup as _ # noqa: F401
except ImportError:
logger.error(
"Failed to import bs4, make sure to install using pip install beautifulsoup4>=4.13.1"
)
raise ImportError
if not soup_crawler_config or soup_crawler_config.extraction_rules is None:
raise ValueError("extraction_rules must be provided when not using Tavily")
extraction_rules = soup_crawler_config.extraction_rules
crawler = BeautifulSoupCrawler(
concurrency=soup_crawler_config.concurrency,
crawl_delay=soup_crawler_config.crawl_delay,
timeout=soup_crawler_config.timeout,
max_retries=soup_crawler_config.max_retries,
retry_delay_factor=soup_crawler_config.retry_delay_factor,
headers=soup_crawler_config.headers,
robots_cache_ttl=soup_crawler_config.robots_cache_ttl,
)
try:
results = await crawler.fetch_with_bs4(
urls,
extraction_rules,
use_playwright=soup_crawler_config.use_playwright,
playwright_js_wait=soup_crawler_config.playwright_js_wait,
join_all_matches=soup_crawler_config.join_all_matches,
)
return results
except Exception as e:
logger.error(f"Error fetching page content: {str(e)}")
raise
finally:
await crawler.close()
async def fetch_with_tavily(
urls: Union[str, List[str]], tavily_config: Optional[TavilyConfig] = None
) -> Dict[str, str]:
"""Fetch content from URLs using the Tavily API.
Args:
urls: A single URL (str) or a list of URLs (List[str]) to scrape.
Returns:
Dict[str, str]: A dictionary mapping each URL to its raw content as a string.
Raises:
ImportError: If tavily-python is not installed.
Exception: If the Tavily API request fails.
"""
try:
from tavily import AsyncTavilyClient
except ImportError:
logger.error(
"Failed to import tavily, make sure to install using pip install tavily-python>=0.7.0"
)
raise
client = AsyncTavilyClient(
api_key=tavily_config.api_key if tavily_config else None,
proxies=tavily_config.proxies if tavily_config else None,
)
results = await client.extract(
urls,
format="text",
extract_depth=tavily_config.extract_depth if tavily_config else "basic",
timeout=tavily_config.timeout if tavily_config else 10,
)
for failed_result in results.get("failed_results", []):
logger.warning(f"Failed to fetch {failed_result}")
return_results = {}
for result in results.get("results", []):
return_results[result["url"]] = result["raw_content"]
return return_results

View file

@ -0,0 +1,396 @@
"""Web scraping tasks for storing scraped data in a graph database.
This module provides functions to scrape web content, create or update WebPage, WebSite,
and ScrapingJob data points, and store them in a Kuzu graph database. It supports
scheduled scraping tasks and ensures that node updates preserve existing graph edges.
"""
import os
import hashlib
from datetime import datetime
from typing import Union, List
from urllib.parse import urlparse
from uuid import uuid5, NAMESPACE_OID
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.shared.logging_utils import get_logger
from cognee.tasks.storage.index_data_points import index_data_points
from cognee.tasks.storage.index_graph_edges import index_graph_edges
from cognee.modules.engine.operations.setup import setup
from .models import WebPage, WebSite, ScrapingJob
from .config import SoupCrawlerConfig, TavilyConfig
from .utils import fetch_page_content
try:
from apscheduler.triggers.cron import CronTrigger
from apscheduler.schedulers.asyncio import AsyncIOScheduler
except ImportError:
raise ImportError("Please install apscheduler by pip install APScheduler>=3.10")
logger = get_logger(__name__)
_scheduler = None
def get_scheduler():
global _scheduler
if _scheduler is None:
_scheduler = AsyncIOScheduler()
return _scheduler
async def cron_web_scraper_task(
url: Union[str, List[str]],
*,
schedule: str = None,
extraction_rules: dict = None,
tavily_api_key: str = os.getenv("TAVILY_API_KEY"),
soup_crawler_config: SoupCrawlerConfig = None,
tavily_config: TavilyConfig = None,
job_name: str = "scraping",
):
"""Schedule or run a web scraping task.
This function schedules a recurring web scraping task using APScheduler or runs it
immediately if no schedule is provided. It delegates to web_scraper_task for actual
scraping and graph storage.
Args:
url: A single URL or list of URLs to scrape.
schedule: A cron expression for scheduling (e.g., "0 0 * * *"). If None, runs immediately.
extraction_rules: Dictionary of extraction rules for BeautifulSoup (e.g., CSS selectors).
tavily_api_key: API key for Tavily. Defaults to TAVILY_API_KEY environment variable.
soup_crawler_config: Configuration for BeautifulSoup crawler.
tavily_config: Configuration for Tavily API.
job_name: Name of the scraping job. Defaults to "scraping".
Returns:
Any: The result of web_scraper_task if run immediately, or None if scheduled.
Raises:
ValueError: If the schedule is an invalid cron expression.
ImportError: If APScheduler is not installed.
"""
now = datetime.now()
job_name = job_name or f"scrape_{now.strftime('%Y%m%d_%H%M%S')}"
if schedule:
try:
trigger = CronTrigger.from_crontab(schedule)
except ValueError as e:
raise ValueError(f"Invalid cron string '{schedule}': {e}")
scheduler = get_scheduler()
scheduler.add_job(
web_scraper_task,
kwargs={
"url": url,
"schedule": schedule,
"extraction_rules": extraction_rules,
"tavily_api_key": tavily_api_key,
"soup_crawler_config": soup_crawler_config,
"tavily_config": tavily_config,
"job_name": job_name,
},
trigger=trigger,
id=job_name,
name=job_name or f"WebScraper_{uuid5(NAMESPACE_OID, name=job_name)}",
replace_existing=True,
)
if not scheduler.running:
scheduler.start()
return
# If no schedule, run immediately
logger.info(f"[{datetime.now()}] Running web scraper task immediately...")
return await web_scraper_task(
url=url,
schedule=schedule,
extraction_rules=extraction_rules,
tavily_api_key=tavily_api_key,
soup_crawler_config=soup_crawler_config,
tavily_config=tavily_config,
job_name=job_name,
)
async def web_scraper_task(
url: Union[str, List[str]],
*,
schedule: str = None,
extraction_rules: dict = None,
tavily_api_key: str = os.getenv("TAVILY_API_KEY"),
soup_crawler_config: SoupCrawlerConfig = None,
tavily_config: TavilyConfig = None,
job_name: str = None,
):
"""Scrape URLs and store data points in a Graph database.
This function scrapes content from the provided URLs, creates or updates WebPage,
WebSite, and ScrapingJob data points, and stores them in a Graph database.
Each data point includes a description field summarizing its attributes. It creates
'is_scraping' (ScrapingJob to WebSite) and 'is_part_of' (WebPage to WebSite)
relationships, preserving existing edges during node updates.
Args:
url: A single URL or list of URLs to scrape.
schedule: A cron expression for scheduling (e.g., "0 0 * * *"). If None, runs once.
extraction_rules: Dictionary of extraction rules for BeautifulSoup (e.g., CSS selectors).
tavily_api_key: API key for Tavily. Defaults to TAVILY_API_KEY environment variable.
soup_crawler_config: Configuration for BeautifulSoup crawler.
tavily_config: Configuration for Tavily API.
job_name: Name of the scraping job. Defaults to a timestamp-based name.
Returns:
Any: The graph data returned by the graph database.
Raises:
TypeError: If neither tavily_config nor soup_crawler_config is provided.
Exception: If fetching content or database operations fail.
"""
await setup()
graph_db = await get_graph_engine()
if isinstance(url, str):
url = [url]
soup_crawler_config, tavily_config, preferred_tool = check_arguments(
tavily_api_key, extraction_rules, tavily_config, soup_crawler_config
)
now = datetime.now()
job_name = job_name or f"scrape_{now.strftime('%Y%m%d_%H%M%S')}"
status = "active"
trigger = CronTrigger.from_crontab(schedule) if schedule else None
next_run = trigger.get_next_fire_time(None, now) if trigger else None
scraping_job_created = await graph_db.get_node(uuid5(NAMESPACE_OID, name=job_name))
# Create description for ScrapingJob
scraping_job_description = (
f"Scraping job: {job_name}\n"
f"URLs: {', '.join(url)}\n"
f"Status: {status}\n"
f"Schedule: {schedule}\n"
f"Last run: {now.strftime('%Y-%m-%d %H:%M:%S')}\n"
f"Next run: {next_run.strftime('%Y-%m-%d %H:%M:%S') if next_run else 'Not scheduled'}"
)
scraping_job = ScrapingJob(
id=uuid5(NAMESPACE_OID, name=job_name),
name=job_name,
urls=url,
status=status,
schedule=schedule,
last_run=now,
next_run=next_run,
description=scraping_job_description,
)
if scraping_job_created:
await graph_db.add_node(scraping_job) # Update existing scraping job
websites_dict = {}
webpages = []
# Fetch content
results = await fetch_page_content(
urls=url,
preferred_tool=preferred_tool,
tavily_config=tavily_config,
soup_crawler_config=soup_crawler_config,
)
for page_url, content in results.items():
parsed_url = urlparse(page_url)
domain = parsed_url.netloc
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
# Create or update WebSite
if base_url not in websites_dict:
# Create description for WebSite
website_description = (
f"Website: {domain}\n"
f"Base URL: {base_url}\n"
f"Last crawled: {now.strftime('%Y-%m-%d %H:%M:%S')}\n"
f"Page count: 1\n"
f"Scraping tool: {preferred_tool}\n"
f"Robots.txt: {'Available' if websites_dict.get(base_url, {}).get('robots_txt') else 'Not set'}\n"
f"Crawl delay: 0.5 seconds"
)
websites_dict[base_url] = WebSite(
id=uuid5(NAMESPACE_OID, name=domain),
name=domain,
base_url=base_url,
robots_txt=None,
crawl_delay=0.5,
last_crawled=now,
page_count=1,
scraping_config={
"extraction_rules": extraction_rules or {},
"tool": preferred_tool,
},
description=website_description,
)
if scraping_job_created:
await graph_db.add_node(websites_dict[base_url])
else:
websites_dict[base_url].page_count += 1
# Update description for existing WebSite
websites_dict[base_url].description = (
f"Website: {domain}\n"
f"Base URL: {base_url}\n"
f"Last crawled: {now.strftime('%Y-%m-%d %H:%M:%S')}\n"
f"Page count: {websites_dict[base_url].page_count}\n"
f"Scraping tool: {preferred_tool}\n"
f"Robots.txt: {'Available' if websites_dict[base_url].robots_txt else 'Not set'}\n"
f"Crawl delay: {websites_dict[base_url].crawl_delay} seconds"
)
if scraping_job_created:
await graph_db.add_node(websites_dict[base_url])
# Create WebPage
content_str = content if isinstance(content, str) else str(content)
content_hash = hashlib.sha256(content_str.encode("utf-8")).hexdigest()
content_preview = content_str[:500] + ("..." if len(content_str) > 500 else "")
# Create description for WebPage
webpage_description = (
f"Webpage: {parsed_url.path.lstrip('/') or 'Home'}\n"
f"URL: {page_url}\n"
f"Scraped at: {now.strftime('%Y-%m-%d %H:%M:%S')}\n"
f"Content: {content_preview}\n"
f"Content type: text\n"
f"Page size: {len(content_str)} bytes\n"
f"Status code: 200"
)
page_extraction_rules = extraction_rules
webpage = WebPage(
id=uuid5(NAMESPACE_OID, name=page_url),
name=page_url,
content=content_str,
content_hash=content_hash,
scraped_at=now,
last_modified=None,
status_code=200,
content_type="text/html",
page_size=len(content_str),
extraction_rules=page_extraction_rules or {},
description=webpage_description,
)
webpages.append(webpage)
scraping_job.status = "completed" if webpages else "failed"
# Update ScrapingJob description with final status
scraping_job.description = (
f"Scraping job: {job_name}\n"
f"URLs: {', '.join(url)}\n"
f"Status: {scraping_job.status}\n"
f"Last run: {now.strftime('%Y-%m-%d %H:%M:%S')}\n"
f"Next run: {next_run.strftime('%Y-%m-%d %H:%M:%S') if next_run else 'Not scheduled'}"
)
websites = list(websites_dict.values())
# Adding Nodes and Edges
node_mapping = {scraping_job.id: scraping_job}
edge_mapping = []
for website in websites:
node_mapping[website.id] = website
edge_mapping.append(
(
scraping_job.id,
website.id,
"is_scraping",
{
"source_node_id": scraping_job.id,
"target_node_id": website.id,
"relationship_name": "is_scraping",
},
)
)
for webpage in webpages:
node_mapping[webpage.id] = webpage
parsed_url = urlparse(webpage.name)
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
edge_mapping.append(
(
webpage.id,
websites_dict[base_url].id,
"is_part_of",
{
"source_node_id": webpage.id,
"target_node_id": websites_dict[base_url].id,
"relationship_name": "is_part_of",
},
)
)
await graph_db.add_nodes(list(node_mapping.values()))
await graph_db.add_edges(edge_mapping)
await index_data_points(list(node_mapping.values()))
await index_graph_edges()
return await graph_db.get_graph_data()
def check_arguments(tavily_api_key, extraction_rules, tavily_config, soup_crawler_config):
"""Validate and configure arguments for web_scraper_task.
Args:
tavily_api_key: API key for Tavily.
extraction_rules: Extraction rules for BeautifulSoup.
tavily_config: Configuration for Tavily API.
soup_crawler_config: Configuration for BeautifulSoup crawler.
Returns:
Tuple[SoupCrawlerConfig, TavilyConfig, str]: Configured soup_crawler_config,
tavily_config, and preferred_tool ("tavily" or "beautifulsoup").
Raises:
TypeError: If neither tavily_config nor soup_crawler_config is provided.
"""
preferred_tool = "beautifulsoup"
if extraction_rules and not soup_crawler_config:
soup_crawler_config = SoupCrawlerConfig(extraction_rules=extraction_rules)
if tavily_api_key:
if not tavily_config:
tavily_config = TavilyConfig(api_key=tavily_api_key)
else:
tavily_config.api_key = tavily_api_key
if not extraction_rules and not soup_crawler_config:
preferred_tool = "tavily"
if not tavily_config and not soup_crawler_config:
raise TypeError("Make sure you pass arguments for web_scraper_task")
return soup_crawler_config, tavily_config, preferred_tool
def get_path_after_base(base_url: str, url: str) -> str:
"""Extract the path after the base URL.
Args:
base_url: The base URL (e.g., "https://example.com").
url: The full URL to extract the path from.
Returns:
str: The path after the base URL, with leading slashes removed.
Raises:
ValueError: If the base URL and target URL are from different domains.
"""
parsed_base = urlparse(base_url)
parsed_url = urlparse(url)
# Ensure they have the same netloc (domain)
if parsed_base.netloc != parsed_url.netloc:
raise ValueError("Base URL and target URL are from different domains")
# Return everything after base_url path
base_path = parsed_base.path.rstrip("/")
full_path = parsed_url.path
if full_path.startswith(base_path):
return full_path[len(base_path) :].lstrip("/")
else:
return full_path.lstrip("/")

View file

@ -0,0 +1,172 @@
import asyncio
import cognee
from cognee.tasks.web_scraper.config import SoupCrawlerConfig
from cognee.tasks.web_scraper import cron_web_scraper_task
async def test_web_scraping_using_bs4():
await cognee.prune.prune_data()
await cognee.prune.prune_system()
url = "https://quotes.toscrape.com/"
rules = {
"quotes": {"selector": ".quote span.text", "all": True},
"authors": {"selector": ".quote small", "all": True},
}
soup_config = SoupCrawlerConfig(
concurrency=5,
crawl_delay=0.5,
timeout=15.0,
max_retries=2,
retry_delay_factor=0.5,
extraction_rules=rules,
use_playwright=False,
)
await cognee.add(
data=url,
soup_crawler_config=soup_config,
incremental_loading=False,
)
await cognee.cognify()
results = await cognee.search(
"Who said 'The world as we have created it is a process of our thinking. It cannot be changed without changing our thinking'?",
query_type=cognee.SearchType.GRAPH_COMPLETION,
)
assert "Albert Einstein" in results[0]
print("Test passed! Found Albert Einstein in scraped data.")
async def test_web_scraping_using_bs4_and_incremental_loading():
await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
url = "https://books.toscrape.com/"
rules = {"titles": "article.product_pod h3 a", "prices": "article.product_pod p.price_color"}
soup_config = SoupCrawlerConfig(
concurrency=1,
crawl_delay=0.1,
timeout=10.0,
max_retries=1,
retry_delay_factor=0.5,
extraction_rules=rules,
use_playwright=False,
structured=True,
)
await cognee.add(
data=url,
soup_crawler_config=soup_config,
incremental_loading=True,
)
await cognee.cognify()
results = await cognee.search(
"What is the price of 'A Light in the Attic' book?",
query_type=cognee.SearchType.GRAPH_COMPLETION,
)
assert "51.77" in results[0]
print("Test passed! Found 'A Light in the Attic' in scraped data.")
async def test_web_scraping_using_tavily():
await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
url = "https://quotes.toscrape.com/"
await cognee.add(
data=url,
incremental_loading=False,
)
await cognee.cognify()
results = await cognee.search(
"Who said 'The world as we have created it is a process of our thinking. It cannot be changed without changing our thinking'?",
query_type=cognee.SearchType.GRAPH_COMPLETION,
)
assert "Albert Einstein" in results[0]
print("Test passed! Found Albert Einstein in scraped data.")
async def test_web_scraping_using_tavily_and_incremental_loading():
await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
url = "https://quotes.toscrape.com/"
await cognee.add(
data=url,
incremental_loading=True,
)
await cognee.cognify()
results = await cognee.search(
"Who said 'The world as we have created it is a process of our thinking. It cannot be changed without changing our thinking'?",
query_type=cognee.SearchType.GRAPH_COMPLETION,
)
assert "Albert Einstein" in results[0]
print("Test passed! Found Albert Einstein in scraped data.")
# ---------- cron job tests ----------
async def test_cron_web_scraper():
await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
urls = ["https://quotes.toscrape.com/", "https://books.toscrape.com/"]
extraction_rules = {
"quotes": ".quote .text",
"authors": ".quote .author",
"titles": "article.product_pod h3 a",
"prices": "article.product_pod p.price_color",
}
# Run cron_web_scraper_task
await cron_web_scraper_task(
url=urls,
job_name="cron_scraping_job",
extraction_rules=extraction_rules,
)
results = await cognee.search(
"Who said 'The world as we have created it is a process of our thinking. It cannot be changed without changing our thinking'?",
query_type=cognee.SearchType.GRAPH_COMPLETION,
)
assert "Albert Einstein" in results[0]
results_books = await cognee.search(
"What is the price of 'A Light in the Attic' book?",
query_type=cognee.SearchType.GRAPH_COMPLETION,
)
assert "51.77" in results_books[0]
print("Cron job web_scraping test passed!")
async def main():
print("Starting BS4 incremental loading test...")
await test_web_scraping_using_bs4_and_incremental_loading()
print("Starting BS4 normal test...")
await test_web_scraping_using_bs4()
print("Starting Tavily incremental loading test...")
await test_web_scraping_using_tavily_and_incremental_loading()
print("Starting Tavily normal test...")
await test_web_scraping_using_tavily()
print("Starting cron job test...")
await test_cron_web_scraper()
if __name__ == "__main__":
asyncio.run(main())

@ -1 +0,0 @@
Subproject commit 130b84db9270734756d16918e5c86034777140fc

View file

@ -63,7 +63,14 @@ api=[]
distributed = [
"modal>=1.0.5,<2.0.0",
]
scraping = [
"tavily-python>=0.7.0",
"beautifulsoup4>=4.13.1",
"playwright>=1.9.0",
"lxml>=4.9.3,<5.0.0",
"protego>=0.1",
"APScheduler>=3.10.0,<=3.11.0"
]
neo4j = ["neo4j>=5.28.0,<6"]
neptune = ["langchain_aws>=0.2.22"]
postgres = [

2
uv.lock generated
View file

@ -8966,4 +8966,4 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/8e/e0/69a553d2047f9a2c7347caa225bb3a63b6d7704ad74610cb7823baa08ed7/zstandard-0.25.0-cp313-cp313-win32.whl", hash = "sha256:7030defa83eef3e51ff26f0b7bfb229f0204b66fe18e04359ce3474ac33cbc09", size = 436936, upload-time = "2025-09-14T22:17:52.658Z" },
{ url = "https://files.pythonhosted.org/packages/d9/82/b9c06c870f3bd8767c201f1edbdf9e8dc34be5b0fbc5682c4f80fe948475/zstandard-0.25.0-cp313-cp313-win_amd64.whl", hash = "sha256:1f830a0dac88719af0ae43b8b2d6aef487d437036468ef3c2ea59c51f9d55fd5", size = 506232, upload-time = "2025-09-14T22:17:50.402Z" },
{ url = "https://files.pythonhosted.org/packages/d4/57/60c3c01243bb81d381c9916e2a6d9e149ab8627c0c7d7abb2d73384b3c0c/zstandard-0.25.0-cp313-cp313-win_arm64.whl", hash = "sha256:85304a43f4d513f5464ceb938aa02c1e78c2943b29f44a750b48b25ac999a049", size = 462671, upload-time = "2025-09-14T22:17:51.533Z" },
]
]