diff --git a/cognee-mcp/pyproject.toml b/cognee-mcp/pyproject.toml index bc0ebeac5..e5b31ad46 100644 --- a/cognee-mcp/pyproject.toml +++ b/cognee-mcp/pyproject.toml @@ -37,5 +37,4 @@ dev = [ allow-direct-references = true [project.scripts] -cognee = "src:main" -cognee-mcp = "src:main_mcp" +cognee-mcp = "src:main" \ No newline at end of file diff --git a/cognee/api/v1/add/add.py b/cognee/api/v1/add/add.py index 1880735c2..c67afe7de 100644 --- a/cognee/api/v1/add/add.py +++ b/cognee/api/v1/add/add.py @@ -1,5 +1,6 @@ from uuid import UUID -from typing import Union, BinaryIO, List, Optional +import os +from typing import Union, BinaryIO, List, Optional, Dict, Any from cognee.modules.users.models import User from cognee.modules.pipelines import Task, run_pipeline @@ -11,6 +12,12 @@ from cognee.modules.pipelines.layers.reset_dataset_pipeline_run_status import ( ) from cognee.modules.engine.operations.setup import setup from cognee.tasks.ingestion import ingest_data, resolve_data_directories +from cognee.tasks.web_scraper.config import TavilyConfig, SoupCrawlerConfig +from cognee.context_global_variables import ( + tavily_config as tavily, + soup_crawler_config as soup_crawler, +) +from urllib.parse import urlparse async def add( @@ -23,12 +30,15 @@ async def add( dataset_id: Optional[UUID] = None, preferred_loaders: List[str] = None, incremental_loading: bool = True, + extraction_rules: Optional[Dict[str, Any]] = None, + tavily_config: Optional[TavilyConfig] = None, + soup_crawler_config: Optional[SoupCrawlerConfig] = None, ): """ Add data to Cognee for knowledge graph processing. This is the first step in the Cognee workflow - it ingests raw data and prepares it - for processing. The function accepts various data formats including text, files, and + for processing. The function accepts various data formats including text, files, urls and binary streams, then stores them in a specified dataset for further processing. 
Prerequisites: @@ -68,6 +78,7 @@ async def add( - S3 path: "s3://my-bucket/documents/file.pdf" - List of mixed types: ["text content", "/path/file.pdf", "file://doc.txt", file_handle] - Binary file object: open("file.txt", "rb") + - url: A web link url (https or http) dataset_name: Name of the dataset to store data in. Defaults to "main_dataset". Create separate datasets to organize different knowledge domains. user: User object for authentication and permissions. Uses default user if None. @@ -78,6 +89,9 @@ async def add( vector_db_config: Optional configuration for vector database (for custom setups). graph_db_config: Optional configuration for graph database (for custom setups). dataset_id: Optional specific dataset UUID to use instead of dataset_name. + extraction_rules: Optional dictionary of rules (e.g., CSS selectors, XPath) for extracting specific content from web pages using BeautifulSoup + tavily_config: Optional configuration for Tavily API, including API key and extraction settings + soup_crawler_config: Optional configuration for BeautifulSoup crawler, specifying concurrency, crawl delay, and extraction rules. 
Returns: PipelineRunInfo: Information about the ingestion pipeline execution including: @@ -126,6 +140,21 @@ async def add( # Add a single file await cognee.add("/home/user/documents/analysis.pdf") + + # Add a single URL, extracted with the BeautifulSoup (bs4) ingestion method + extraction_rules = { + "title": "h1", + "description": "p", + "more_info": "a[href*='more-info']" + } + await cognee.add("https://example.com", extraction_rules=extraction_rules) + + # Add a single URL, extracted with the Tavily ingestion method + # Make sure to set TAVILY_API_KEY=YOUR_TAVILY_API_KEY as an environment variable + await cognee.add("https://example.com") + + # Add multiple URLs + await cognee.add(["https://example.com","https://books.toscrape.com"]) ``` Environment Variables: @@ -139,11 +168,38 @@ async def add( - DEFAULT_USER_PASSWORD: Custom default user password - VECTOR_DB_PROVIDER: "lancedb" (default), "chromadb", "pgvector" - GRAPH_DATABASE_PROVIDER: "kuzu" (default), "neo4j" + - TAVILY_API_KEY: YOUR_TAVILY_API_KEY """ + + if not soup_crawler_config and extraction_rules: + soup_crawler_config = SoupCrawlerConfig(extraction_rules=extraction_rules) + if not tavily_config and os.getenv("TAVILY_API_KEY"): + tavily_config = TavilyConfig(api_key=os.getenv("TAVILY_API_KEY")) + + soup_crawler.set(soup_crawler_config) + tavily.set(tavily_config) + + http_schemes = {"http", "https"} + + def _is_http_url(item: Union[str, BinaryIO]) -> bool: + return isinstance(item, str) and urlparse(item).scheme in http_schemes + + if _is_http_url(data): + node_set = ["web_content"] if not node_set else node_set + ["web_content"] + elif isinstance(data, list) and any(_is_http_url(item) for item in data): + node_set = ["web_content"] if not node_set else node_set + ["web_content"] + tasks = [ Task(resolve_data_directories, include_subdirectories=True), - Task(ingest_data, dataset_name, user, node_set, dataset_id, preferred_loaders), + Task( + ingest_data, + dataset_name, + user, + node_set, + dataset_id, + preferred_loaders, + ), ] 
await setup() diff --git a/cognee/context_global_variables.py b/cognee/context_global_variables.py index f6642e3dc..500846e27 100644 --- a/cognee/context_global_variables.py +++ b/cognee/context_global_variables.py @@ -12,6 +12,8 @@ from cognee.modules.users.methods import get_user # for different async tasks, threads and processes vector_db_config = ContextVar("vector_db_config", default=None) graph_db_config = ContextVar("graph_db_config", default=None) +soup_crawler_config = ContextVar("soup_crawler_config", default=None) +tavily_config = ContextVar("tavily_config", default=None) async def set_database_global_context_variables(dataset: Union[str, UUID], user_id: UUID): diff --git a/cognee/tasks/ingestion/save_data_item_to_storage.py b/cognee/tasks/ingestion/save_data_item_to_storage.py index 34f70e917..105f17c0d 100644 --- a/cognee/tasks/ingestion/save_data_item_to_storage.py +++ b/cognee/tasks/ingestion/save_data_item_to_storage.py @@ -7,6 +7,7 @@ from cognee.modules.ingestion.exceptions import IngestionError from cognee.modules.ingestion import save_data_to_file from cognee.shared.logging_utils import get_logger from pydantic_settings import BaseSettings, SettingsConfigDict +from cognee.context_global_variables import tavily_config, soup_crawler_config logger = get_logger() @@ -17,6 +18,13 @@ class SaveDataSettings(BaseSettings): model_config = SettingsConfigDict(env_file=".env", extra="allow") +class HTMLContent(str): + def __new__(cls, value: str): + if not ("<" in value and ">" in value): + raise ValueError("Not valid HTML-like content") + return super().__new__(cls, value) + + settings = SaveDataSettings() @@ -48,6 +56,39 @@ async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any]) -> str # data is s3 file path if parsed_url.scheme == "s3": return data_item + elif parsed_url.scheme == "http" or parsed_url.scheme == "https": + # Web URL: fetch and extract page content via the configured scraper, then persist it + try: + from cognee.tasks.web_scraper import fetch_page_content + + tavily 
= tavily_config.get() + soup_crawler = soup_crawler_config.get() + preferred_tool = "beautifulsoup" if soup_crawler else "tavily" + if preferred_tool == "tavily" and tavily is None: + raise IngestionError( + message="TavilyConfig must be set on the ingestion context when fetching HTTP URLs without a SoupCrawlerConfig." + ) + if preferred_tool == "beautifulsoup" and soup_crawler is None: + raise IngestionError( + message="SoupCrawlerConfig must be set on the ingestion context when using the BeautifulSoup scraper." + ) + + data = await fetch_page_content( + data_item, + preferred_tool=preferred_tool, + tavily_config=tavily, + soup_crawler_config=soup_crawler, + ) + content = "" + for key, value in data.items(): + content += f"{key}:\n{value}\n\n" + return await save_data_to_file(content) + except IngestionError: + raise + except Exception as e: + raise IngestionError( + message=f"Error ingesting webpage results of url {data_item}: {str(e)}" + ) # data is local file path elif parsed_url.scheme == "file": diff --git a/cognee/tasks/web_scraper/__init__.py b/cognee/tasks/web_scraper/__init__.py new file mode 100644 index 000000000..d8e580fad --- /dev/null +++ b/cognee/tasks/web_scraper/__init__.py @@ -0,0 +1,18 @@ +"""Web scraping module for cognee. + +This module provides tools for scraping web content, managing scraping jobs, and storing +data in a graph database. It includes classes and functions for crawling web pages using +BeautifulSoup or Tavily, defining data models, and handling scraping configurations. 
+""" + +from .bs4_crawler import BeautifulSoupCrawler +from .utils import fetch_page_content +from .web_scraper_task import cron_web_scraper_task, web_scraper_task + + +__all__ = [ + "BeautifulSoupCrawler", + "fetch_page_content", + "cron_web_scraper_task", + "web_scraper_task", +] diff --git a/cognee/tasks/web_scraper/bs4_crawler.py b/cognee/tasks/web_scraper/bs4_crawler.py new file mode 100644 index 000000000..568127311 --- /dev/null +++ b/cognee/tasks/web_scraper/bs4_crawler.py @@ -0,0 +1,546 @@ +"""BeautifulSoup-based web crawler for extracting content from web pages. + +This module provides the BeautifulSoupCrawler class for fetching and extracting content +from web pages using BeautifulSoup or Playwright for JavaScript-rendered pages. It +supports robots.txt handling, rate limiting, and custom extraction rules. +""" + +import asyncio +import time +from typing import Union, List, Dict, Any, Optional +from urllib.parse import urlparse +from dataclasses import dataclass, field +from functools import lru_cache +import httpx +from bs4 import BeautifulSoup +from cognee.shared.logging_utils import get_logger + +logger = get_logger(__name__) + +try: + from playwright.async_api import async_playwright +except ImportError: + logger.error( + "Failed to import playwright, make sure to install using pip install playwright>=1.9.0" + ) + async_playwright = None + +try: + from protego import Protego +except ImportError: + logger.error("Failed to import protego, make sure to install using pip install protego>=0.1") + Protego = None + + +@dataclass +class ExtractionRule: + """Normalized extraction rule for web content. + + Attributes: + selector: CSS selector for extraction (if any). + xpath: XPath expression for extraction (if any). + attr: HTML attribute to extract (if any). + all: If True, extract all matching elements; otherwise, extract first. + join_with: String to join multiple extracted elements. 
+ """ + + selector: Optional[str] = None + xpath: Optional[str] = None + attr: Optional[str] = None + all: bool = False + join_with: str = " " + + +@dataclass +class RobotsTxtCache: + """Cache for robots.txt data. + + Attributes: + protego: Parsed robots.txt object (Protego instance). + crawl_delay: Delay between requests (in seconds). + timestamp: Time when the cache entry was created. + """ + + protego: Any + crawl_delay: float + timestamp: float = field(default_factory=time.time) + + +class BeautifulSoupCrawler: + """Crawler for fetching and extracting web content using BeautifulSoup. + + Supports asynchronous HTTP requests, Playwright for JavaScript rendering, robots.txt + compliance, and rate limiting. Extracts content using CSS selectors or XPath rules. + + Attributes: + concurrency: Number of concurrent requests allowed. + crawl_delay: Minimum seconds between requests to the same domain. + timeout: Per-request timeout in seconds. + max_retries: Number of retries for failed requests. + retry_delay_factor: Multiplier for exponential backoff on retries. + headers: HTTP headers for requests (e.g., User-Agent). + robots_cache_ttl: Time-to-live for robots.txt cache in seconds. + """ + + def __init__( + self, + *, + concurrency: int = 5, + crawl_delay: float = 0.5, + timeout: float = 15.0, + max_retries: int = 2, + retry_delay_factor: float = 0.5, + headers: Optional[Dict[str, str]] = None, + robots_cache_ttl: float = 3600.0, + ): + """Initialize the BeautifulSoupCrawler. + + Args: + concurrency: Number of concurrent requests allowed. + crawl_delay: Minimum seconds between requests to the same domain. + timeout: Per-request timeout in seconds. + max_retries: Number of retries for failed requests. + retry_delay_factor: Multiplier for exponential backoff on retries. + headers: HTTP headers for requests (defaults to User-Agent: Cognee-Scraper/1.0). + robots_cache_ttl: Time-to-live for robots.txt cache in seconds. 
+ """ + self.concurrency = concurrency + self._sem = asyncio.Semaphore(concurrency) + self.crawl_delay = crawl_delay + self.timeout = timeout + self.max_retries = max_retries + self.retry_delay_factor = retry_delay_factor + self.headers = headers or {"User-Agent": "Cognee-Scraper/1.0"} + self.robots_cache_ttl = robots_cache_ttl + self._last_request_time_per_domain: Dict[str, float] = {} + self._robots_cache: Dict[str, RobotsTxtCache] = {} + self._client: Optional[httpx.AsyncClient] = None + self._robots_lock = asyncio.Lock() + + async def _ensure_client(self): + """Initialize the HTTP client if not already created.""" + if self._client is None: + self._client = httpx.AsyncClient(timeout=self.timeout, headers=self.headers) + + async def close(self): + """Close the HTTP client.""" + if self._client: + await self._client.aclose() + self._client = None + + async def __aenter__(self): + """Enter the context manager, initializing the HTTP client.""" + await self._ensure_client() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Exit the context manager, closing the HTTP client.""" + await self.close() + + @lru_cache(maxsize=1024) + def _domain_from_url(self, url: str) -> str: + """Extract the domain (netloc) from a URL. + + Args: + url: The URL to parse. + + Returns: + str: The domain (netloc) of the URL. + """ + try: + return urlparse(url).netloc + except Exception: + return url + + @lru_cache(maxsize=1024) + def _get_domain_root(self, url: str) -> str: + """Get the root URL (scheme and netloc) from a URL. + + Args: + url: The URL to parse. + + Returns: + str: The root URL (e.g., "https://example.com"). + """ + parsed = urlparse(url) + return f"{parsed.scheme}://{parsed.netloc}" + + async def _respect_rate_limit(self, url: str, crawl_delay: Optional[float] = None): + """Enforce rate limiting for requests to the same domain. + + Args: + url: The URL to check. + crawl_delay: Custom crawl delay in seconds (if any). 
+ """ + domain = self._domain_from_url(url) + last = self._last_request_time_per_domain.get(domain) + delay = crawl_delay if crawl_delay is not None else self.crawl_delay + + if last is None: + self._last_request_time_per_domain[domain] = time.time() + return + + elapsed = time.time() - last + wait_for = delay - elapsed + if wait_for > 0: + await asyncio.sleep(wait_for) + self._last_request_time_per_domain[domain] = time.time() + + async def _get_robots_cache(self, domain_root: str) -> Optional[RobotsTxtCache]: + """Get cached robots.txt data if valid. + + Args: + domain_root: The root URL (e.g., "https://example.com"). + + Returns: + Optional[RobotsTxtCache]: Cached robots.txt data or None if expired or not found. + """ + if Protego is None: + return None + + cached = self._robots_cache.get(domain_root) + if cached and (time.time() - cached.timestamp) < self.robots_cache_ttl: + return cached + return None + + async def _fetch_and_cache_robots(self, domain_root: str) -> RobotsTxtCache: + """Fetch and cache robots.txt data. + + Args: + domain_root: The root URL (e.g., "https://example.com"). + + Returns: + RobotsTxtCache: Cached robots.txt data with crawl delay. + + Raises: + Exception: If fetching robots.txt fails. 
+ """ + async with self._robots_lock: + cached = await self._get_robots_cache(domain_root) + if cached: + return cached + + robots_url = f"{domain_root}/robots.txt" + try: + await self._ensure_client() + await self._respect_rate_limit(robots_url, self.crawl_delay) + resp = await self._client.get(robots_url, timeout=5.0) + content = resp.text if resp.status_code == 200 else "" + except Exception as e: + logger.debug(f"Failed to fetch robots.txt from {domain_root}: {e}") + content = "" + + protego = Protego.parse(content) if content.strip() else None + agent = next((v for k, v in self.headers.items() if k.lower() == "user-agent"), "*") + + crawl_delay = self.crawl_delay + if protego: + delay = protego.crawl_delay(agent) or protego.crawl_delay("*") + crawl_delay = delay if delay else self.crawl_delay + + cache_entry = RobotsTxtCache(protego=protego, crawl_delay=crawl_delay) + self._robots_cache[domain_root] = cache_entry + return cache_entry + + async def _is_url_allowed(self, url: str) -> bool: + """Check if a URL is allowed by robots.txt. + + Args: + url: The URL to check. + + Returns: + bool: True if the URL is allowed, False otherwise. + """ + if Protego is None: + return True + + try: + domain_root = self._get_domain_root(url) + cache = await self._get_robots_cache(domain_root) + if cache is None: + cache = await self._fetch_and_cache_robots(domain_root) + + if cache.protego is None: + return True + + agent = next((v for k, v in self.headers.items() if k.lower() == "user-agent"), "*") + return cache.protego.can_fetch(agent, url) or cache.protego.can_fetch("*", url) + except Exception as e: + logger.debug(f"Error checking robots.txt for {url}: {e}") + return True + + async def _get_crawl_delay(self, url: str) -> float: + """Get the crawl delay for a URL from robots.txt. + + Args: + url: The URL to check. + + Returns: + float: Crawl delay in seconds. 
+ """ + if Protego is None: + return self.crawl_delay + + try: + domain_root = self._get_domain_root(url) + cache = await self._get_robots_cache(domain_root) + if cache is None: + cache = await self._fetch_and_cache_robots(domain_root) + return cache.crawl_delay + except Exception: + return self.crawl_delay + + async def _fetch_httpx(self, url: str) -> str: + """Fetch a URL using HTTPX with retries. + + Args: + url: The URL to fetch. + + Returns: + str: The HTML content of the page. + + Raises: + Exception: If all retry attempts fail. + """ + await self._ensure_client() + assert self._client is not None, "HTTP client not initialized" + + attempt = 0 + crawl_delay = await self._get_crawl_delay(url) + + while True: + try: + await self._respect_rate_limit(url, crawl_delay) + resp = await self._client.get(url) + resp.raise_for_status() + return resp.text + except Exception as exc: + attempt += 1 + if attempt > self.max_retries: + logger.error(f"Fetch failed for {url} after {attempt} attempts: {exc}") + raise + + delay = self.retry_delay_factor * (2 ** (attempt - 1)) + logger.warning( + f"Retrying {url} after {delay:.2f}s (attempt {attempt}) due to {exc}" + ) + await asyncio.sleep(delay) + + async def _render_with_playwright( + self, url: str, js_wait: float = 1.0, timeout: Optional[float] = None + ) -> str: + """Fetch and render a URL using Playwright for JavaScript content. + + Args: + url: The URL to fetch. + js_wait: Seconds to wait for JavaScript to load. + timeout: Timeout for the request (in seconds, defaults to instance timeout). + + Returns: + str: The rendered HTML content. + + Raises: + RuntimeError: If Playwright is not installed. + Exception: If all retry attempts fail. + """ + if async_playwright is None: + raise RuntimeError( + "Playwright is not installed. Install with `pip install playwright` and run `playwright install`." 
+ ) + attempt = 0 + while True: + try: + async with async_playwright() as p: + browser = await p.chromium.launch(headless=True) + try: + context = await browser.new_context() + page = await context.new_page() + await page.goto( + url, + wait_until="networkidle", + timeout=int((timeout or self.timeout) * 1000), + ) + if js_wait: + await asyncio.sleep(js_wait) + return await page.content() + finally: + await browser.close() + except Exception as exc: + attempt += 1 + if attempt > self.max_retries: + logger.error(f"Playwright fetch failed for {url}: {exc}") + raise + backoff = self.retry_delay_factor * (2 ** (attempt - 1)) + logger.warning( + f"Retrying playwright fetch {url} after {backoff:.2f}s (attempt {attempt})" + ) + await asyncio.sleep(backoff) + + def _normalize_rule(self, rule: Union[str, Dict[str, Any]]) -> ExtractionRule: + """Normalize an extraction rule to an ExtractionRule dataclass. + + Args: + rule: A string (CSS selector) or dict with extraction parameters. + + Returns: + ExtractionRule: Normalized extraction rule. + + Raises: + ValueError: If the rule is invalid. + """ + if isinstance(rule, str): + return ExtractionRule(selector=rule) + if isinstance(rule, dict): + return ExtractionRule( + selector=rule.get("selector"), + xpath=rule.get("xpath"), + attr=rule.get("attr"), + all=bool(rule.get("all", False)), + join_with=rule.get("join_with", " "), + ) + raise ValueError(f"Invalid extraction rule: {rule}") + + def _extract_with_bs4(self, html: str, rule: ExtractionRule) -> str: + """Extract content from HTML using BeautifulSoup or lxml XPath. + + Args: + html: The HTML content to extract from. + rule: The extraction rule to apply. + + Returns: + str: The extracted content. + + Raises: + RuntimeError: If XPath is used but lxml is not installed. + """ + soup = BeautifulSoup(html, "html.parser") + + if rule.xpath: + try: + from lxml import html as lxml_html + except ImportError: + raise RuntimeError( + "XPath requested but lxml is not available. 
Install lxml or use CSS selectors." + ) + doc = lxml_html.fromstring(html) + nodes = doc.xpath(rule.xpath) + texts = [] + for n in nodes: + if hasattr(n, "text_content"): + texts.append(n.text_content().strip()) + else: + texts.append(str(n).strip()) + return rule.join_with.join(t for t in texts if t) + + if not rule.selector: + return "" + + if rule.all: + nodes = soup.select(rule.selector) + pieces = [] + for el in nodes: + if rule.attr: + val = el.get(rule.attr) + if val: + pieces.append(val.strip()) + else: + text = el.get_text(strip=True) + if text: + pieces.append(text) + return rule.join_with.join(pieces).strip() + else: + el = soup.select_one(rule.selector) + if el is None: + return "" + if rule.attr: + val = el.get(rule.attr) + return (val or "").strip() + return el.get_text(strip=True) + + async def fetch_with_bs4( + self, + urls: Union[str, List[str], Dict[str, Dict[str, Any]]], + extraction_rules: Optional[Dict[str, Any]] = None, + *, + use_playwright: bool = False, + playwright_js_wait: float = 0.8, + join_all_matches: bool = False, + ) -> Dict[str, str]: + """Fetch and extract content from URLs using BeautifulSoup or Playwright. + + Args: + urls: A single URL, list of URLs, or dict mapping URLs to extraction rules. + extraction_rules: Default extraction rules for string or list URLs. + use_playwright: If True, use Playwright for JavaScript rendering. + playwright_js_wait: Seconds to wait for JavaScript to load. + join_all_matches: If True, extract all matching elements for each rule. + + Returns: + Dict[str, str]: A dictionary mapping URLs to their extracted content. + + Raises: + ValueError: If extraction_rules are missing when required or if urls is invalid. + Exception: If fetching or extraction fails. 
+ """ + url_rules_map: Dict[str, Dict[str, Any]] = {} + + if isinstance(urls, str): + if not extraction_rules: + raise ValueError("extraction_rules required when urls is a string") + url_rules_map[urls] = extraction_rules + elif isinstance(urls, list): + if not extraction_rules: + raise ValueError("extraction_rules required when urls is a list") + for url in urls: + url_rules_map[url] = extraction_rules + elif isinstance(urls, dict): + url_rules_map = urls + else: + raise ValueError(f"Invalid urls type: {type(urls)}") + + normalized_url_rules: Dict[str, List[ExtractionRule]] = {} + for url, rules in url_rules_map.items(): + normalized_rules = [] + for _, rule in rules.items(): + r = self._normalize_rule(rule) + if join_all_matches: + r.all = True + normalized_rules.append(r) + normalized_url_rules[url] = normalized_rules + + async def _task(url: str): + async with self._sem: + try: + allowed = await self._is_url_allowed(url) + if not allowed: + logger.warning(f"URL disallowed by robots.txt: {url}") + return url, "" + + if use_playwright: + html = await self._render_with_playwright( + url, js_wait=playwright_js_wait, timeout=self.timeout + ) + else: + html = await self._fetch_httpx(url) + + pieces = [] + for rule in normalized_url_rules[url]: + text = self._extract_with_bs4(html, rule) + if text: + pieces.append(text) + + concatenated = " ".join(pieces).strip() + return url, concatenated + + except Exception as e: + logger.error(f"Error processing {url}: {e}") + return url, "" + + tasks = [asyncio.create_task(_task(u)) for u in url_rules_map.keys()] + results = {} + + for coro in asyncio.as_completed(tasks): + url, text = await coro + results[url] = text + + return results diff --git a/cognee/tasks/web_scraper/config.py b/cognee/tasks/web_scraper/config.py new file mode 100644 index 000000000..2ee43ed32 --- /dev/null +++ b/cognee/tasks/web_scraper/config.py @@ -0,0 +1,24 @@ +from pydantic import BaseModel, Field +from typing import Any, Dict, Optional, Literal 
+import os + + +class TavilyConfig(BaseModel): + api_key: Optional[str] = os.getenv("TAVILY_API_KEY") + extract_depth: Literal["basic", "advanced"] = "basic" + proxies: Optional[Dict[str, str]] = None + timeout: Optional[int] = Field(default=10, ge=1, le=60) + + +class SoupCrawlerConfig(BaseModel): + concurrency: int = 5 + crawl_delay: float = 0.5 + timeout: float = 15.0 + max_retries: int = 2 + retry_delay_factor: float = 0.5 + headers: Optional[Dict[str, str]] = None + extraction_rules: Dict[str, Any] + use_playwright: bool = False + playwright_js_wait: float = 0.8 + robots_cache_ttl: float = 3600.0 + join_all_matches: bool = False diff --git a/cognee/tasks/web_scraper/models.py b/cognee/tasks/web_scraper/models.py new file mode 100644 index 000000000..aed2db0ed --- /dev/null +++ b/cognee/tasks/web_scraper/models.py @@ -0,0 +1,46 @@ +from cognee.infrastructure.engine import DataPoint +from typing import Optional, Dict, Any, List +from datetime import datetime + + +class WebPage(DataPoint): + """Represents a scraped web page with metadata""" + + name: Optional[str] + content: str + content_hash: str + scraped_at: datetime + last_modified: Optional[datetime] + status_code: int + content_type: str + page_size: int + extraction_rules: Dict[str, Any] # CSS selectors, XPath rules used + description: str + metadata: dict = {"index_fields": ["name", "description", "content"]} + + +class WebSite(DataPoint): + """Represents a website or domain being scraped""" + + name: str + base_url: str + robots_txt: Optional[str] + crawl_delay: float + last_crawled: datetime + page_count: int + scraping_config: Dict[str, Any] + description: str + metadata: dict = {"index_fields": ["name", "description"]} + + +class ScrapingJob(DataPoint): + """Represents a scraping job configuration""" + + name: str + urls: List[str] + schedule: Optional[str] # Cron-like schedule for recurring scrapes + status: str # "active", "paused", "completed", "failed" + last_run: Optional[datetime] + next_run: 
Optional[datetime] + description: str + metadata: dict = {"index_fields": ["name", "description"]} diff --git a/cognee/tasks/web_scraper/utils.py b/cognee/tasks/web_scraper/utils.py new file mode 100644 index 000000000..6d094f423 --- /dev/null +++ b/cognee/tasks/web_scraper/utils.py @@ -0,0 +1,126 @@ +"""Utilities for fetching web content using BeautifulSoup or Tavily. + +This module provides functions to fetch and extract content from web pages, supporting +both BeautifulSoup for custom extraction rules and Tavily for API-based scraping. +""" + +from typing import Dict, List, Union, Optional, Literal +from cognee.shared.logging_utils import get_logger +from .bs4_crawler import BeautifulSoupCrawler +from .config import TavilyConfig, SoupCrawlerConfig + +logger = get_logger(__name__) + + +async def fetch_page_content( + urls: Union[str, List[str]], + *, + preferred_tool: Optional[Literal["tavily", "beautifulsoup"]] = "beautifulsoup", + tavily_config: Optional[TavilyConfig] = None, + soup_crawler_config: Optional[SoupCrawlerConfig] = None, +) -> Dict[str, str]: + """Fetch content from one or more URLs using the specified tool. + + This function retrieves web page content using either BeautifulSoup (with custom + extraction rules) or Tavily (API-based scraping). It handles single URLs or lists of + URLs and returns a dictionary mapping URLs to their extracted content. + + Args: + urls: A single URL (str) or a list of URLs (List[str]) to scrape. + preferred_tool: The scraping tool to use ("tavily" or "beautifulsoup"). + Defaults to "beautifulsoup". + tavily_config: Configuration for Tavily API, including API key. + Required if preferred_tool is "tavily". + soup_crawler_config: Configuration for BeautifulSoup crawler, including + extraction rules. Required if preferred_tool is "beautifulsoup" and + extraction_rules are needed. + + Returns: + Dict[str, str]: A dictionary mapping each URL to its + extracted content (as a string for BeautifulSoup or a dict for Tavily). 
+ + Raises: + ValueError: If Tavily API key is missing when using Tavily, or if + extraction_rules are not provided when using BeautifulSoup. + ImportError: If required dependencies (beautifulsoup4 or tavily-python) are not + installed. + """ + if preferred_tool == "tavily": + if not tavily_config or tavily_config.api_key is None: + raise ValueError("TAVILY_API_KEY must be set in TavilyConfig to use Tavily") + return await fetch_with_tavily(urls, tavily_config) + + if preferred_tool == "beautifulsoup": + try: + from bs4 import BeautifulSoup as _ # noqa: F401 + except ImportError: + logger.error( + "Failed to import bs4, make sure to install using pip install beautifulsoup4>=4.13.1" + ) + raise ImportError + if not soup_crawler_config or soup_crawler_config.extraction_rules is None: + raise ValueError("extraction_rules must be provided when not using Tavily") + extraction_rules = soup_crawler_config.extraction_rules + crawler = BeautifulSoupCrawler( + concurrency=soup_crawler_config.concurrency, + crawl_delay=soup_crawler_config.crawl_delay, + timeout=soup_crawler_config.timeout, + max_retries=soup_crawler_config.max_retries, + retry_delay_factor=soup_crawler_config.retry_delay_factor, + headers=soup_crawler_config.headers, + robots_cache_ttl=soup_crawler_config.robots_cache_ttl, + ) + try: + results = await crawler.fetch_with_bs4( + urls, + extraction_rules, + use_playwright=soup_crawler_config.use_playwright, + playwright_js_wait=soup_crawler_config.playwright_js_wait, + join_all_matches=soup_crawler_config.join_all_matches, + ) + return results + except Exception as e: + logger.error(f"Error fetching page content: {str(e)}") + raise + finally: + await crawler.close() + + +async def fetch_with_tavily( + urls: Union[str, List[str]], tavily_config: Optional[TavilyConfig] = None +) -> Dict[str, str]: + """Fetch content from URLs using the Tavily API. + + Args: + urls: A single URL (str) or a list of URLs (List[str]) to scrape. 
+ + Returns: + Dict[str, str]: A dictionary mapping each URL to its raw content as a string. + + Raises: + ImportError: If tavily-python is not installed. + Exception: If the Tavily API request fails. + """ + try: + from tavily import AsyncTavilyClient + except ImportError: + logger.error( + "Failed to import tavily, make sure to install using pip install tavily-python>=0.7.0" + ) + raise + client = AsyncTavilyClient( + api_key=tavily_config.api_key if tavily_config else None, + proxies=tavily_config.proxies if tavily_config else None, + ) + results = await client.extract( + urls, + format="text", + extract_depth=tavily_config.extract_depth if tavily_config else "basic", + timeout=tavily_config.timeout if tavily_config else 10, + ) + for failed_result in results.get("failed_results", []): + logger.warning(f"Failed to fetch {failed_result}") + return_results = {} + for result in results.get("results", []): + return_results[result["url"]] = result["raw_content"] + return return_results diff --git a/cognee/tasks/web_scraper/web_scraper_task.py b/cognee/tasks/web_scraper/web_scraper_task.py new file mode 100644 index 000000000..52154c6ef --- /dev/null +++ b/cognee/tasks/web_scraper/web_scraper_task.py @@ -0,0 +1,396 @@ +"""Web scraping tasks for storing scraped data in a graph database. + +This module provides functions to scrape web content, create or update WebPage, WebSite, +and ScrapingJob data points, and store them in a Kuzu graph database. It supports +scheduled scraping tasks and ensures that node updates preserve existing graph edges. 
_scheduler = None


def get_scheduler():
    """Return the process-wide AsyncIOScheduler, creating it lazily on first use."""
    global _scheduler
    if _scheduler is None:
        _scheduler = AsyncIOScheduler()
    return _scheduler


async def cron_web_scraper_task(
    url: Union[str, List[str]],
    *,
    schedule: str = None,
    extraction_rules: dict = None,
    tavily_api_key: Optional[str] = None,
    soup_crawler_config: SoupCrawlerConfig = None,
    tavily_config: TavilyConfig = None,
    job_name: str = "scraping",
):
    """Schedule or run a web scraping task.

    Schedules a recurring web scraping task with APScheduler, or runs it immediately
    when no schedule is given. Delegates to web_scraper_task for actual scraping and
    graph storage.

    Args:
        url: A single URL or list of URLs to scrape.
        schedule: A cron expression (e.g., "0 0 * * *"). If None, runs immediately.
        extraction_rules: Extraction rules for BeautifulSoup (e.g., CSS selectors).
        tavily_api_key: API key for Tavily. Falls back to the TAVILY_API_KEY
            environment variable, read at call time.
        soup_crawler_config: Configuration for the BeautifulSoup crawler.
        tavily_config: Configuration for the Tavily API.
        job_name: Name of the scraping job. Defaults to "scraping".

    Returns:
        Any: The result of web_scraper_task if run immediately, or None if scheduled.

    Raises:
        ValueError: If the schedule is an invalid cron expression.
        ImportError: If APScheduler is not installed.
    """
    # Resolve the env var at call time, not in the signature: a default of
    # os.getenv(...) is evaluated once at import and ignores later configuration.
    tavily_api_key = tavily_api_key or os.getenv("TAVILY_API_KEY")
    now = datetime.now()
    job_name = job_name or f"scrape_{now.strftime('%Y%m%d_%H%M%S')}"
    if schedule:
        try:
            trigger = CronTrigger.from_crontab(schedule)
        except ValueError as e:
            raise ValueError(f"Invalid cron string '{schedule}': {e}")

        scheduler = get_scheduler()
        scheduler.add_job(
            web_scraper_task,
            kwargs={
                "url": url,
                "schedule": schedule,
                "extraction_rules": extraction_rules,
                "tavily_api_key": tavily_api_key,
                "soup_crawler_config": soup_crawler_config,
                "tavily_config": tavily_config,
                "job_name": job_name,
            },
            trigger=trigger,
            id=job_name,
            # job_name is always truthy here (defaulted above), so use it directly.
            name=job_name,
            replace_existing=True,
        )
        if not scheduler.running:
            scheduler.start()
        return

    # No schedule: run the scrape once, right now.
    logger.info(f"[{datetime.now()}] Running web scraper task immediately...")
    return await web_scraper_task(
        url=url,
        schedule=schedule,
        extraction_rules=extraction_rules,
        tavily_api_key=tavily_api_key,
        soup_crawler_config=soup_crawler_config,
        tavily_config=tavily_config,
        job_name=job_name,
    )


async def web_scraper_task(
    url: Union[str, List[str]],
    *,
    schedule: str = None,
    extraction_rules: dict = None,
    tavily_api_key: Optional[str] = None,
    soup_crawler_config: SoupCrawlerConfig = None,
    tavily_config: TavilyConfig = None,
    job_name: str = None,
):
    """Scrape URLs and store data points in a graph database.

    Scrapes content from the provided URLs, creates or updates WebPage, WebSite, and
    ScrapingJob data points, and stores them in the configured graph database. Each
    data point carries a description summarizing its attributes. Creates 'is_scraping'
    (ScrapingJob -> WebSite) and 'is_part_of' (WebPage -> WebSite) relationships,
    preserving existing edges during node updates.

    Args:
        url: A single URL or list of URLs to scrape.
        schedule: A cron expression (e.g., "0 0 * * *"). If None, runs once.
        extraction_rules: Extraction rules for BeautifulSoup (e.g., CSS selectors).
        tavily_api_key: API key for Tavily. Falls back to the TAVILY_API_KEY
            environment variable, read at call time.
        soup_crawler_config: Configuration for the BeautifulSoup crawler.
        tavily_config: Configuration for the Tavily API.
        job_name: Name of the scraping job. Defaults to a timestamp-based name.

    Returns:
        Any: The graph data returned by the graph database.

    Raises:
        TypeError: If neither tavily_config nor soup_crawler_config is provided.
        Exception: If fetching content or database operations fail.
    """
    await setup()
    graph_db = await get_graph_engine()

    if isinstance(url, str):
        url = [url]

    # Read the env var at call time so late configuration is honored (see cron task).
    tavily_api_key = tavily_api_key or os.getenv("TAVILY_API_KEY")
    soup_crawler_config, tavily_config, preferred_tool = check_arguments(
        tavily_api_key, extraction_rules, tavily_config, soup_crawler_config
    )
    now = datetime.now()
    job_name = job_name or f"scrape_{now.strftime('%Y%m%d_%H%M%S')}"
    status = "active"
    trigger = CronTrigger.from_crontab(schedule) if schedule else None
    next_run = trigger.get_next_fire_time(None, now) if trigger else None
    scraping_job_created = await graph_db.get_node(uuid5(NAMESPACE_OID, name=job_name))

    # Human-readable summary stored on the ScrapingJob node.
    scraping_job_description = (
        f"Scraping job: {job_name}\n"
        f"URLs: {', '.join(url)}\n"
        f"Status: {status}\n"
        f"Schedule: {schedule}\n"
        f"Last run: {now.strftime('%Y-%m-%d %H:%M:%S')}\n"
        f"Next run: {next_run.strftime('%Y-%m-%d %H:%M:%S') if next_run else 'Not scheduled'}"
    )

    scraping_job = ScrapingJob(
        id=uuid5(NAMESPACE_OID, name=job_name),
        name=job_name,
        urls=url,
        status=status,
        schedule=schedule,
        last_run=now,
        next_run=next_run,
        description=scraping_job_description,
    )

    if scraping_job_created:
        await graph_db.add_node(scraping_job)  # Update existing scraping job

    websites_dict = {}
    webpages = []

    # Fetch content with the selected tool (Tavily or BeautifulSoup).
    results = await fetch_page_content(
        urls=url,
        preferred_tool=preferred_tool,
        tavily_config=tavily_config,
        soup_crawler_config=soup_crawler_config,
    )
    for page_url, content in results.items():
        parsed_url = urlparse(page_url)
        domain = parsed_url.netloc
        base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"

        if base_url not in websites_dict:
            # New WebSite node. robots_txt is always None at creation time, so the
            # description reports 'Not set' directly (the original dict lookup here
            # could never find an entry for a base_url that is not in the dict yet).
            website_description = (
                f"Website: {domain}\n"
                f"Base URL: {base_url}\n"
                f"Last crawled: {now.strftime('%Y-%m-%d %H:%M:%S')}\n"
                f"Page count: 1\n"
                f"Scraping tool: {preferred_tool}\n"
                f"Robots.txt: Not set\n"
                f"Crawl delay: 0.5 seconds"
            )

            websites_dict[base_url] = WebSite(
                id=uuid5(NAMESPACE_OID, name=domain),
                name=domain,
                base_url=base_url,
                robots_txt=None,
                crawl_delay=0.5,
                last_crawled=now,
                page_count=1,
                scraping_config={
                    "extraction_rules": extraction_rules or {},
                    "tool": preferred_tool,
                },
                description=website_description,
            )
            if scraping_job_created:
                await graph_db.add_node(websites_dict[base_url])
        else:
            websites_dict[base_url].page_count += 1
            # Refresh the description to reflect the incremented page count.
            websites_dict[base_url].description = (
                f"Website: {domain}\n"
                f"Base URL: {base_url}\n"
                f"Last crawled: {now.strftime('%Y-%m-%d %H:%M:%S')}\n"
                f"Page count: {websites_dict[base_url].page_count}\n"
                f"Scraping tool: {preferred_tool}\n"
                f"Robots.txt: {'Available' if websites_dict[base_url].robots_txt else 'Not set'}\n"
                f"Crawl delay: {websites_dict[base_url].crawl_delay} seconds"
            )
            if scraping_job_created:
                await graph_db.add_node(websites_dict[base_url])

        # Create the WebPage node for this URL.
        content_str = content if isinstance(content, str) else str(content)
        content_hash = hashlib.sha256(content_str.encode("utf-8")).hexdigest()
        content_preview = content_str[:500] + ("..." if len(content_str) > 500 else "")
        webpage_description = (
            f"Webpage: {parsed_url.path.lstrip('/') or 'Home'}\n"
            f"URL: {page_url}\n"
            f"Scraped at: {now.strftime('%Y-%m-%d %H:%M:%S')}\n"
            f"Content: {content_preview}\n"
            f"Content type: text\n"
            f"Page size: {len(content_str)} bytes\n"
            f"Status code: 200"
        )
        page_extraction_rules = extraction_rules
        webpage = WebPage(
            id=uuid5(NAMESPACE_OID, name=page_url),
            name=page_url,
            content=content_str,
            content_hash=content_hash,
            scraped_at=now,
            last_modified=None,
            status_code=200,
            content_type="text/html",
            page_size=len(content_str),
            extraction_rules=page_extraction_rules or {},
            description=webpage_description,
        )
        webpages.append(webpage)

    scraping_job.status = "completed" if webpages else "failed"
    # Update the ScrapingJob description with the final status.
    scraping_job.description = (
        f"Scraping job: {job_name}\n"
        f"URLs: {', '.join(url)}\n"
        f"Status: {scraping_job.status}\n"
        f"Last run: {now.strftime('%Y-%m-%d %H:%M:%S')}\n"
        f"Next run: {next_run.strftime('%Y-%m-%d %H:%M:%S') if next_run else 'Not scheduled'}"
    )

    websites = list(websites_dict.values())
    # Assemble nodes and edges for a single batched write.
    node_mapping = {scraping_job.id: scraping_job}
    edge_mapping = []

    for website in websites:
        node_mapping[website.id] = website
        edge_mapping.append(
            (
                scraping_job.id,
                website.id,
                "is_scraping",
                {
                    "source_node_id": scraping_job.id,
                    "target_node_id": website.id,
                    "relationship_name": "is_scraping",
                },
            )
        )
    for webpage in webpages:
        node_mapping[webpage.id] = webpage
        parsed_url = urlparse(webpage.name)
        base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
        edge_mapping.append(
            (
                webpage.id,
                websites_dict[base_url].id,
                "is_part_of",
                {
                    "source_node_id": webpage.id,
                    "target_node_id": websites_dict[base_url].id,
                    "relationship_name": "is_part_of",
                },
            )
        )

    await graph_db.add_nodes(list(node_mapping.values()))
    await graph_db.add_edges(edge_mapping)
    await index_data_points(list(node_mapping.values()))
    await index_graph_edges()

    return await graph_db.get_graph_data()


def check_arguments(tavily_api_key, extraction_rules, tavily_config, soup_crawler_config):
    """Validate and configure arguments for web_scraper_task.

    Args:
        tavily_api_key: API key for Tavily.
        extraction_rules: Extraction rules for BeautifulSoup.
        tavily_config: Configuration for the Tavily API.
        soup_crawler_config: Configuration for the BeautifulSoup crawler.

    Returns:
        Tuple[SoupCrawlerConfig, TavilyConfig, str]: Configured soup_crawler_config,
        tavily_config, and preferred_tool ("tavily" or "beautifulsoup").

    Raises:
        TypeError: If neither a Tavily nor a BeautifulSoup configuration can be built.
    """
    preferred_tool = "beautifulsoup"

    if extraction_rules and not soup_crawler_config:
        soup_crawler_config = SoupCrawlerConfig(extraction_rules=extraction_rules)

    if tavily_api_key:
        if not tavily_config:
            tavily_config = TavilyConfig(api_key=tavily_api_key)
        else:
            tavily_config.api_key = tavily_api_key
        # Tavily is preferred only when no BeautifulSoup setup was supplied.
        if not extraction_rules and not soup_crawler_config:
            preferred_tool = "tavily"

    if not tavily_config and not soup_crawler_config:
        raise TypeError(
            "web_scraper_task needs either a Tavily configuration (tavily_config / "
            "tavily_api_key) or a BeautifulSoup configuration (soup_crawler_config / "
            "extraction_rules)"
        )

    return soup_crawler_config, tavily_config, preferred_tool


def get_path_after_base(base_url: str, url: str) -> str:
    """Extract the path after the base URL.

    Args:
        base_url: The base URL (e.g., "https://example.com").
        url: The full URL to extract the path from.

    Returns:
        str: The path after the base URL, with leading slashes removed.

    Raises:
        ValueError: If the base URL and target URL are from different domains.
    """
    parsed_base = urlparse(base_url)
    parsed_url = urlparse(url)

    # Ensure they share the same netloc (domain).
    if parsed_base.netloc != parsed_url.netloc:
        raise ValueError("Base URL and target URL are from different domains")

    # Return everything after the base URL's path.
    base_path = parsed_base.path.rstrip("/")
    full_path = parsed_url.path

    if full_path.startswith(base_path):
        return full_path[len(base_path) :].lstrip("/")
    else:
        return full_path.lstrip("/")
# Shared search prompts and expected answers for the scraping test scenarios.
_QUOTE_QUERY = (
    "Who said 'The world as we have created it is a process of our thinking. "
    "It cannot be changed without changing our thinking'?"
)
_PRICE_QUERY = "What is the price of 'A Light in the Attic' book?"


async def test_web_scraping_using_bs4_and_incremental_loading():
    """Scrape books.toscrape.com with BeautifulSoup and incremental loading enabled."""
    await cognee.prune.prune_data()
    await cognee.prune.prune_system(metadata=True)

    crawler_settings = SoupCrawlerConfig(
        concurrency=1,
        crawl_delay=0.1,
        timeout=10.0,
        max_retries=1,
        retry_delay_factor=0.5,
        extraction_rules={
            "titles": "article.product_pod h3 a",
            "prices": "article.product_pod p.price_color",
        },
        use_playwright=False,
        structured=True,
    )

    await cognee.add(
        data="https://books.toscrape.com/",
        soup_crawler_config=crawler_settings,
        incremental_loading=True,
    )
    await cognee.cognify()

    answers = await cognee.search(
        _PRICE_QUERY,
        query_type=cognee.SearchType.GRAPH_COMPLETION,
    )
    assert "51.77" in answers[0]
    print("Test passed! Found 'A Light in the Attic' in scraped data.")


async def test_web_scraping_using_tavily():
    """Scrape quotes.toscrape.com via Tavily (no BeautifulSoup config supplied)."""
    await cognee.prune.prune_data()
    await cognee.prune.prune_system(metadata=True)

    await cognee.add(
        data="https://quotes.toscrape.com/",
        incremental_loading=False,
    )
    await cognee.cognify()

    answers = await cognee.search(
        _QUOTE_QUERY,
        query_type=cognee.SearchType.GRAPH_COMPLETION,
    )
    assert "Albert Einstein" in answers[0]
    print("Test passed! Found Albert Einstein in scraped data.")


async def test_web_scraping_using_tavily_and_incremental_loading():
    """Same as the Tavily test but with incremental loading enabled."""
    await cognee.prune.prune_data()
    await cognee.prune.prune_system(metadata=True)

    await cognee.add(
        data="https://quotes.toscrape.com/",
        incremental_loading=True,
    )
    await cognee.cognify()

    answers = await cognee.search(
        _QUOTE_QUERY,
        query_type=cognee.SearchType.GRAPH_COMPLETION,
    )
    assert "Albert Einstein" in answers[0]
    print("Test passed! Found Albert Einstein in scraped data.")


# ---------- cron job tests ----------
async def test_cron_web_scraper():
    """Run cron_web_scraper_task once (no schedule) over two sites and verify both."""
    await cognee.prune.prune_data()
    await cognee.prune.prune_system(metadata=True)

    await cron_web_scraper_task(
        url=["https://quotes.toscrape.com/", "https://books.toscrape.com/"],
        job_name="cron_scraping_job",
        extraction_rules={
            "quotes": ".quote .text",
            "authors": ".quote .author",
            "titles": "article.product_pod h3 a",
            "prices": "article.product_pod p.price_color",
        },
    )

    quote_answers = await cognee.search(
        _QUOTE_QUERY,
        query_type=cognee.SearchType.GRAPH_COMPLETION,
    )
    assert "Albert Einstein" in quote_answers[0]

    book_answers = await cognee.search(
        _PRICE_QUERY,
        query_type=cognee.SearchType.GRAPH_COMPLETION,
    )
    assert "51.77" in book_answers[0]

    print("Cron job web_scraping test passed!")


async def main():
    """Execute every scraping scenario in sequence."""
    print("Starting BS4 incremental loading test...")
    await test_web_scraping_using_bs4_and_incremental_loading()

    print("Starting BS4 normal test...")
    await test_web_scraping_using_bs4()

    print("Starting Tavily incremental loading test...")
    await test_web_scraping_using_tavily_and_incremental_loading()

    print("Starting Tavily normal test...")
    await test_web_scraping_using_tavily()

    print("Starting cron job test...")
    await test_cron_web_scraper()


if __name__ == "__main__":
    asyncio.run(main())
commit 130b84db9270734756d16918e5c86034777140fc diff --git a/pyproject.toml b/pyproject.toml index 3df57e1f5..4baf40e2f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -63,7 +63,14 @@ api=[] distributed = [ "modal>=1.0.5,<2.0.0", ] - +scraping = [ + "tavily-python>=0.7.0", + "beautifulsoup4>=4.13.1", + "playwright>=1.9.0", + "lxml>=4.9.3,<5.0.0", + "protego>=0.1", + "APScheduler>=3.10.0,<=3.11.0" +] neo4j = ["neo4j>=5.28.0,<6"] neptune = ["langchain_aws>=0.2.22"] postgres = [ diff --git a/uv.lock b/uv.lock index 570da9289..e280ca0dc 100644 --- a/uv.lock +++ b/uv.lock @@ -8966,4 +8966,4 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/8e/e0/69a553d2047f9a2c7347caa225bb3a63b6d7704ad74610cb7823baa08ed7/zstandard-0.25.0-cp313-cp313-win32.whl", hash = "sha256:7030defa83eef3e51ff26f0b7bfb229f0204b66fe18e04359ce3474ac33cbc09", size = 436936, upload-time = "2025-09-14T22:17:52.658Z" }, { url = "https://files.pythonhosted.org/packages/d9/82/b9c06c870f3bd8767c201f1edbdf9e8dc34be5b0fbc5682c4f80fe948475/zstandard-0.25.0-cp313-cp313-win_amd64.whl", hash = "sha256:1f830a0dac88719af0ae43b8b2d6aef487d437036468ef3c2ea59c51f9d55fd5", size = 506232, upload-time = "2025-09-14T22:17:50.402Z" }, { url = "https://files.pythonhosted.org/packages/d4/57/60c3c01243bb81d381c9916e2a6d9e149ab8627c0c7d7abb2d73384b3c0c/zstandard-0.25.0-cp313-cp313-win_arm64.whl", hash = "sha256:85304a43f4d513f5464ceb938aa02c1e78c2943b29f44a750b48b25ac999a049", size = 462671, upload-time = "2025-09-14T22:17:51.533Z" }, -] +] \ No newline at end of file