diff --git a/.github/workflows/basic_tests.yml b/.github/workflows/basic_tests.yml index 3f3e644a2..f89d031a6 100644 --- a/.github/workflows/basic_tests.yml +++ b/.github/workflows/basic_tests.yml @@ -123,6 +123,7 @@ jobs: uses: ./.github/actions/cognee_setup with: python-version: ${{ inputs.python-version }} + extra-dependencies: "scraping" - name: Run Integration Tests run: uv run pytest cognee/tests/integration/ diff --git a/cognee/api/v1/add/add.py b/cognee/api/v1/add/add.py index 0f14683f9..73a3081be 100644 --- a/cognee/api/v1/add/add.py +++ b/cognee/api/v1/add/add.py @@ -1,8 +1,5 @@ from uuid import UUID -import os -from typing import Union, BinaryIO, List, Optional, Dict, Any -from pydantic import BaseModel -from urllib.parse import urlparse +from typing import Union, BinaryIO, List, Optional, Any from cognee.modules.users.models import User from cognee.modules.pipelines import Task, run_pipeline from cognee.modules.pipelines.layers.resolve_authorized_user_dataset import ( @@ -17,16 +14,6 @@ from cognee.shared.logging_utils import get_logger logger = get_logger() -try: - from cognee.tasks.web_scraper.config import TavilyConfig, SoupCrawlerConfig - from cognee.context_global_variables import ( - tavily_config as tavily, - soup_crawler_config as soup_crawler, - ) -except ImportError: - logger.debug(f"Unable to import {str(ImportError)}") - pass - async def add( data: Union[BinaryIO, list[BinaryIO], str, list[str]], @@ -36,11 +23,8 @@ async def add( vector_db_config: dict = None, graph_db_config: dict = None, dataset_id: Optional[UUID] = None, - preferred_loaders: List[str] = None, + preferred_loaders: dict[str, dict[str, Any]] = None, incremental_loading: bool = True, - extraction_rules: Optional[Dict[str, Any]] = None, - tavily_config: Optional[BaseModel] = None, - soup_crawler_config: Optional[BaseModel] = None, data_per_batch: Optional[int] = 20, ): """ @@ -180,29 +164,6 @@ async def add( - TAVILY_API_KEY: YOUR_TAVILY_API_KEY """ - - try: - if not 
soup_crawler_config and extraction_rules: - soup_crawler_config = SoupCrawlerConfig(extraction_rules=extraction_rules) - if not tavily_config and os.getenv("TAVILY_API_KEY"): - tavily_config = TavilyConfig(api_key=os.getenv("TAVILY_API_KEY")) - - soup_crawler.set(soup_crawler_config) - tavily.set(tavily_config) - - http_schemes = {"http", "https"} - - def _is_http_url(item: Union[str, BinaryIO]) -> bool: - return isinstance(item, str) and urlparse(item).scheme in http_schemes - - if _is_http_url(data): - node_set = ["web_content"] if not node_set else node_set + ["web_content"] - elif isinstance(data, list) and any(_is_http_url(item) for item in data): - node_set = ["web_content"] if not node_set else node_set + ["web_content"] - except NameError: - logger.debug(f"Unable to import {str(ImportError)}") - pass - tasks = [ Task(resolve_data_directories, include_subdirectories=True), Task( diff --git a/cognee/api/v1/update/update.py b/cognee/api/v1/update/update.py index a421b3dc0..83b92c50f 100644 --- a/cognee/api/v1/update/update.py +++ b/cognee/api/v1/update/update.py @@ -1,5 +1,5 @@ from uuid import UUID -from typing import Union, BinaryIO, List, Optional +from typing import Union, BinaryIO, List, Optional, Any from cognee.modules.users.models import User from cognee.api.v1.delete import delete @@ -15,7 +15,7 @@ async def update( node_set: Optional[List[str]] = None, vector_db_config: dict = None, graph_db_config: dict = None, - preferred_loaders: List[str] = None, + preferred_loaders: dict[str, dict[str, Any]] = None, incremental_loading: bool = True, ): """ diff --git a/cognee/context_global_variables.py b/cognee/context_global_variables.py index 2ecf9b8d3..d52de4b4e 100644 --- a/cognee/context_global_variables.py +++ b/cognee/context_global_variables.py @@ -13,8 +13,6 @@ from cognee.modules.users.methods import get_user vector_db_config = ContextVar("vector_db_config", default=None) graph_db_config = ContextVar("graph_db_config", default=None) session_user = 
ContextVar("session_user", default=None) -soup_crawler_config = ContextVar("soup_crawler_config", default=None) -tavily_config = ContextVar("tavily_config", default=None) async def set_session_user_context_variable(user): diff --git a/cognee/infrastructure/loaders/LoaderEngine.py b/cognee/infrastructure/loaders/LoaderEngine.py index 6b62f7641..725f37b14 100644 --- a/cognee/infrastructure/loaders/LoaderEngine.py +++ b/cognee/infrastructure/loaders/LoaderEngine.py @@ -64,7 +64,9 @@ class LoaderEngine: return True def get_loader( - self, file_path: str, preferred_loaders: List[str] = None + self, + file_path: str, + preferred_loaders: dict[str, dict[str, Any]], ) -> Optional[LoaderInterface]: """ Get appropriate loader for a file. @@ -76,14 +78,21 @@ class LoaderEngine: Returns: LoaderInterface that can handle the file, or None if not found """ + from pathlib import Path file_info = filetype.guess(file_path) + path_extension = Path(file_path).suffix.lstrip(".") + # Try preferred loaders first if preferred_loaders: for loader_name in preferred_loaders: if loader_name in self._loaders: loader = self._loaders[loader_name] + # Try with path extension first (for text formats like html) + if loader.can_handle(extension=path_extension, mime_type=file_info.mime): + return loader + # Fall back to content-detected extension if loader.can_handle(extension=file_info.extension, mime_type=file_info.mime): return loader else: @@ -93,6 +102,10 @@ class LoaderEngine: for loader_name in self.default_loader_priority: if loader_name in self._loaders: loader = self._loaders[loader_name] + # Try with path extension first (for text formats like html) + if loader.can_handle(extension=path_extension, mime_type=file_info.mime): + return loader + # Fall back to content-detected extension if loader.can_handle(extension=file_info.extension, mime_type=file_info.mime): return loader else: @@ -105,7 +118,7 @@ class LoaderEngine: async def load_file( self, file_path: str, - preferred_loaders: 
Optional[List[str]] = None, + preferred_loaders: dict[str, dict[str, Any]] = None, **kwargs, ): """ @@ -113,7 +126,7 @@ class LoaderEngine: Args: file_path: Path to the file to be processed - preferred_loaders: List of preferred loader names to try first + preferred_loaders: Dict of loader names to their configurations **kwargs: Additional loader-specific configuration Raises: @@ -125,8 +138,16 @@ class LoaderEngine: raise ValueError(f"No loader found for file: {file_path}") logger.debug(f"Loading {file_path} with {loader.loader_name}") - # TODO: loading needs to be reworked to work with both file streams and file locations - return await loader.load(file_path, **kwargs) + + # Extract loader-specific config from preferred_loaders + loader_config = {} + if preferred_loaders and loader.loader_name in preferred_loaders: + loader_config = preferred_loaders[loader.loader_name] + + # Merge with any additional kwargs (kwargs take precedence) + merged_kwargs = {**loader_config, **kwargs} + + return await loader.load(file_path, **merged_kwargs) def get_available_loaders(self) -> List[str]: """ diff --git a/cognee/infrastructure/loaders/external/__init__.py b/cognee/infrastructure/loaders/external/__init__.py index 6bf9f9200..785338c09 100644 --- a/cognee/infrastructure/loaders/external/__init__.py +++ b/cognee/infrastructure/loaders/external/__init__.py @@ -27,3 +27,10 @@ try: __all__.append("AdvancedPdfLoader") except ImportError: pass + +try: + from .beautiful_soup_loader import BeautifulSoupLoader + + __all__.append("BeautifulSoupLoader") +except ImportError: + pass diff --git a/cognee/infrastructure/loaders/external/beautiful_soup_loader.py b/cognee/infrastructure/loaders/external/beautiful_soup_loader.py new file mode 100644 index 000000000..04954a228 --- /dev/null +++ b/cognee/infrastructure/loaders/external/beautiful_soup_loader.py @@ -0,0 +1,224 @@ +"""BeautifulSoup-based web crawler for extracting content from web pages. 
+
+This module provides the BeautifulSoupLoader class for extracting content from
+HTML files using BeautifulSoup CSS selector or lxml XPath extraction rules,
+with a plain-text fallback for files that contain no HTML markup.
+"""
+
+from typing import Union, Dict, Any, Optional, List
+from dataclasses import dataclass
+from bs4 import BeautifulSoup
+from cognee.infrastructure.loaders.LoaderInterface import LoaderInterface
+from cognee.shared.logging_utils import get_logger
+
+logger = get_logger(__name__)
+
+
+@dataclass
+class ExtractionRule:
+    """Normalized extraction rule for web content.
+
+    Attributes:
+        selector: CSS selector for extraction (if any).
+        xpath: XPath expression for extraction (if any).
+        attr: HTML attribute to extract (if any).
+        all: If True, extract all matching elements; otherwise, extract first.
+        join_with: String to join multiple extracted elements.
+    """
+
+    selector: Optional[str] = None
+    xpath: Optional[str] = None
+    attr: Optional[str] = None
+    all: bool = False
+    join_with: str = " "
+
+
+class BeautifulSoupLoader(LoaderInterface):
+    """Loader that extracts text content from HTML files using BeautifulSoup.
+
+    Applies user-supplied extraction rules (CSS selectors or lxml XPath
+    expressions) to the HTML, joins the extracted pieces, and stores the
+    result as a text file in Cognee data storage. Falls back to treating
+    the input as plain text when it contains no HTML tags (for example,
+    content that was already extracted upstream).
+
+    Notes:
+        extraction_rules is required; load() raises ValueError without it.
+        XPath rules require lxml to be installed at load time.
+        Each rule targets the first match or, with all=True, every match.
+        A rule may read an attribute value instead of element text.
+        Pieces from multi-match rules are joined with the rule's join_with.
+ """ + + @property + def supported_extensions(self) -> List[str]: + return ["html"] + + @property + def supported_mime_types(self) -> List[str]: + return ["text/html", "text/plain"] + + @property + def loader_name(self) -> str: + return "beautiful_soup_loader" + + def can_handle(self, extension: str, mime_type: str) -> bool: + can = extension in self.supported_extensions and mime_type in self.supported_mime_types + return can + + async def load( + self, + file_path: str, + extraction_rules: dict[str, Any] = None, + join_all_matches: bool = False, + **kwargs, + ): + """Load an HTML file, extract content, and save to storage. + + Args: + file_path: Path to the HTML file + extraction_rules: Dict of CSS selector rules for content extraction + join_all_matches: If True, extract all matching elements for each rule + **kwargs: Additional arguments + + Returns: + Path to the stored extracted text file + """ + if extraction_rules is None: + raise ValueError("extraction_rules required for BeautifulSoupLoader") + + logger.info(f"Processing HTML file: {file_path}") + + from cognee.infrastructure.files.utils.get_file_metadata import get_file_metadata + from cognee.infrastructure.files.storage import get_file_storage, get_storage_config + + with open(file_path, "rb") as f: + file_metadata = await get_file_metadata(f) + f.seek(0) + html = f.read() + + storage_file_name = "text_" + file_metadata["content_hash"] + ".txt" + + # Normalize extraction rules + normalized_rules: List[ExtractionRule] = [] + for _, rule in extraction_rules.items(): + r = self._normalize_rule(rule) + if join_all_matches: + r.all = True + normalized_rules.append(r) + + pieces = [] + for rule in normalized_rules: + text = self._extract_from_html(html, rule) + if text: + pieces.append(text) + + full_content = " ".join(pieces).strip() + + # Fallback: If no content extracted, check if the file is plain text (not HTML) + if not full_content: + from bs4 import BeautifulSoup + + soup = BeautifulSoup(html, 
"html.parser") + # If there are no HTML tags, treat as plain text + if not soup.find(): + logger.warning( + f"No HTML tags found in {file_path}. Treating as plain text. " + "This may happen when content is pre-extracted (e.g., via Tavily with text format)." + ) + full_content = html.decode("utf-8") if isinstance(html, bytes) else html + full_content = full_content.strip() + + if not full_content: + logger.warning(f"No content extracted from HTML file: {file_path}") + + # Store the extracted content + storage_config = get_storage_config() + data_root_directory = storage_config["data_root_directory"] + storage = get_file_storage(data_root_directory) + + full_file_path = await storage.store(storage_file_name, full_content) + + logger.info(f"Extracted {len(full_content)} characters from HTML") + return full_file_path + + def _normalize_rule(self, rule: Union[str, Dict[str, Any]]) -> ExtractionRule: + """Normalize an extraction rule to an ExtractionRule dataclass. + + Args: + rule: A string (CSS selector) or dict with extraction parameters. + + Returns: + ExtractionRule: Normalized extraction rule. + + Raises: + ValueError: If the rule is invalid. + """ + if isinstance(rule, str): + return ExtractionRule(selector=rule) + if isinstance(rule, dict): + return ExtractionRule( + selector=rule.get("selector"), + xpath=rule.get("xpath"), + attr=rule.get("attr"), + all=bool(rule.get("all", False)), + join_with=rule.get("join_with", " "), + ) + raise ValueError(f"Invalid extraction rule: {rule}") + + def _extract_from_html(self, html: str, rule: ExtractionRule) -> str: + """Extract content from HTML using BeautifulSoup or lxml XPath. + + Args: + html: The HTML content to extract from. + rule: The extraction rule to apply. + + Returns: + str: The extracted content. + + Raises: + RuntimeError: If XPath is used but lxml is not installed. 
+ """ + soup = BeautifulSoup(html, "html.parser") + + if rule.xpath: + try: + from lxml import html as lxml_html + except ImportError: + raise RuntimeError( + "XPath requested but lxml is not available. Install lxml or use CSS selectors." + ) + doc = lxml_html.fromstring(html) + nodes = doc.xpath(rule.xpath) + texts = [] + for n in nodes: + if hasattr(n, "text_content"): + texts.append(n.text_content().strip()) + else: + texts.append(str(n).strip()) + return rule.join_with.join(t for t in texts if t) + + if not rule.selector: + return "" + + if rule.all: + nodes = soup.select(rule.selector) + pieces = [] + for el in nodes: + if rule.attr: + val = el.get(rule.attr) + if val: + pieces.append(val.strip()) + else: + text = el.get_text(strip=True) + if text: + pieces.append(text) + return rule.join_with.join(pieces).strip() + else: + el = soup.select_one(rule.selector) + if el is None: + return "" + if rule.attr: + val = el.get(rule.attr) + return (val or "").strip() + return el.get_text(strip=True) diff --git a/cognee/infrastructure/loaders/supported_loaders.py b/cognee/infrastructure/loaders/supported_loaders.py index d103babe3..156253b53 100644 --- a/cognee/infrastructure/loaders/supported_loaders.py +++ b/cognee/infrastructure/loaders/supported_loaders.py @@ -23,3 +23,10 @@ try: supported_loaders[AdvancedPdfLoader.loader_name] = AdvancedPdfLoader except ImportError: pass + +try: + from cognee.infrastructure.loaders.external import BeautifulSoupLoader + + supported_loaders[BeautifulSoupLoader.loader_name] = BeautifulSoupLoader +except ImportError: + pass diff --git a/cognee/modules/ingestion/save_data_to_file.py b/cognee/modules/ingestion/save_data_to_file.py index 0ba0b2983..42e8d45ba 100644 --- a/cognee/modules/ingestion/save_data_to_file.py +++ b/cognee/modules/ingestion/save_data_to_file.py @@ -1,10 +1,12 @@ -from typing import BinaryIO, Union +from typing import BinaryIO, Union, Optional from cognee.infrastructure.files.storage import get_file_storage, 
get_storage_config from .classify import classify import hashlib -async def save_data_to_file(data: Union[str, BinaryIO], filename: str = None): +async def save_data_to_file( + data: Union[str, BinaryIO], filename: str = None, file_extension: Optional[str] = None +): storage_config = get_storage_config() data_root_directory = storage_config["data_root_directory"] @@ -21,6 +23,11 @@ async def save_data_to_file(data: Union[str, BinaryIO], filename: str = None): file_name = file_metadata["name"] + if file_extension is not None: + extension = file_extension.lstrip(".") + file_name_without_ext = file_name.rsplit(".", 1)[0] + file_name = f"{file_name_without_ext}.{extension}" + storage = get_file_storage(data_root_directory) full_file_path = await storage.store(file_name, data) diff --git a/cognee/modules/pipelines/operations/pipeline.py b/cognee/modules/pipelines/operations/pipeline.py index e15e9e505..eb0ebe8bd 100644 --- a/cognee/modules/pipelines/operations/pipeline.py +++ b/cognee/modules/pipelines/operations/pipeline.py @@ -20,6 +20,7 @@ from cognee.modules.pipelines.layers.resolve_authorized_user_datasets import ( from cognee.modules.pipelines.layers.check_pipeline_run_qualification import ( check_pipeline_run_qualification, ) +from typing import Any logger = get_logger("cognee.pipeline") @@ -80,7 +81,14 @@ async def run_pipeline_per_dataset( return pipeline_run = run_tasks( - tasks, dataset.id, data, user, pipeline_name, context, incremental_loading, data_per_batch + tasks, + dataset.id, + data, + user, + pipeline_name, + context, + incremental_loading, + data_per_batch, ) async for pipeline_run_info in pipeline_run: diff --git a/cognee/tasks/ingestion/data_item_to_text_file.py b/cognee/tasks/ingestion/data_item_to_text_file.py index 9fcafca57..0303f6c92 100644 --- a/cognee/tasks/ingestion/data_item_to_text_file.py +++ b/cognee/tasks/ingestion/data_item_to_text_file.py @@ -1,6 +1,6 @@ import os from urllib.parse import urlparse -from typing import List, Tuple 
+from typing import Any, List, Tuple from pathlib import Path import tempfile @@ -34,7 +34,8 @@ async def pull_from_s3(file_path, destination_file) -> None: async def data_item_to_text_file( - data_item_path: str, preferred_loaders: List[str] + data_item_path: str, + preferred_loaders: dict[str, dict[str, Any]] = None, ) -> Tuple[str, LoaderInterface]: if isinstance(data_item_path, str): parsed_url = urlparse(data_item_path) @@ -74,6 +75,5 @@ async def data_item_to_text_file( ) else: raise IngestionError(message="Local files are not accepted.") - # data is not a supported type raise IngestionError(message=f"Data type not supported: {type(data_item_path)}") diff --git a/cognee/tasks/ingestion/ingest_data.py b/cognee/tasks/ingestion/ingest_data.py index 3c20a2b13..0572d0f1e 100644 --- a/cognee/tasks/ingestion/ingest_data.py +++ b/cognee/tasks/ingestion/ingest_data.py @@ -6,6 +6,7 @@ from typing import Union, BinaryIO, Any, List, Optional import cognee.modules.ingestion as ingestion from cognee.infrastructure.databases.relational import get_relational_engine from cognee.modules.data.models import Data +from cognee.modules.ingestion.exceptions import IngestionError from cognee.modules.users.models import User from cognee.modules.users.methods import get_default_user from cognee.modules.users.permissions.methods import get_specific_user_permission_datasets @@ -27,7 +28,7 @@ async def ingest_data( user: User, node_set: Optional[List[str]] = None, dataset_id: UUID = None, - preferred_loaders: List[str] = None, + preferred_loaders: dict[str, dict[str, Any]] = None, ): if not user: user = await get_default_user() @@ -44,7 +45,7 @@ async def ingest_data( user: User, node_set: Optional[List[str]] = None, dataset_id: UUID = None, - preferred_loaders: List[str] = None, + preferred_loaders: dict[str, dict[str, Any]] = None, ): new_datapoints = [] existing_data_points = [] @@ -77,22 +78,27 @@ async def ingest_data( dataset_data_map = {str(data.id): True for data in dataset_data} 
for data_item in data: - # Get file path of data item or create a file it doesn't exist + # Get file path of data item or create a file if it doesn't exist original_file_path = await save_data_item_to_storage(data_item) - # Transform file path to be OS usable actual_file_path = get_data_file_path(original_file_path) # Store all input data as text files in Cognee data storage cognee_storage_file_path, loader_engine = await data_item_to_text_file( - actual_file_path, preferred_loaders + actual_file_path, + preferred_loaders, ) + if loader_engine is None: + raise IngestionError("Loader cannot be None") + # Find metadata from original file + # Standard flow: extract metadata from both original and stored files async with open_data_file(original_file_path) as file: classified_data = ingestion.classify(file) # data_id is the hash of original file contents + owner id to avoid duplicate data + data_id = ingestion.identify(classified_data, user) original_file_metadata = classified_data.get_metadata() diff --git a/cognee/tasks/ingestion/save_data_item_to_storage.py b/cognee/tasks/ingestion/save_data_item_to_storage.py index b6e1f7d00..05d21e617 100644 --- a/cognee/tasks/ingestion/save_data_item_to_storage.py +++ b/cognee/tasks/ingestion/save_data_item_to_storage.py @@ -8,6 +8,8 @@ from cognee.modules.ingestion import save_data_to_file from cognee.shared.logging_utils import get_logger from pydantic_settings import BaseSettings, SettingsConfigDict +from cognee.tasks.web_scraper.utils import fetch_page_content + logger = get_logger() @@ -18,13 +20,6 @@ class SaveDataSettings(BaseSettings): model_config = SettingsConfigDict(env_file=".env", extra="allow") -class HTMLContent(str): - def __new__(cls, value: str): - if not ("<" in value and ">" in value): - raise ValueError("Not valid HTML-like content") - return super().__new__(cls, value) - - settings = SaveDataSettings() @@ -63,40 +58,8 @@ async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any]) -> str if 
parsed_url.scheme == "s3": return data_item elif parsed_url.scheme == "http" or parsed_url.scheme == "https": - # Validate URL by sending a HEAD request - try: - from cognee.context_global_variables import tavily_config, soup_crawler_config - from cognee.tasks.web_scraper import fetch_page_content - - tavily = tavily_config.get() - soup_crawler = soup_crawler_config.get() - preferred_tool = "beautifulsoup" if soup_crawler else "tavily" - if preferred_tool == "tavily" and tavily is None: - raise IngestionError( - message="TavilyConfig must be set on the ingestion context when fetching HTTP URLs without a SoupCrawlerConfig." - ) - if preferred_tool == "beautifulsoup" and soup_crawler is None: - raise IngestionError( - message="SoupCrawlerConfig must be set on the ingestion context when using the BeautifulSoup scraper." - ) - - data = await fetch_page_content( - data_item, - preferred_tool=preferred_tool, - tavily_config=tavily, - soup_crawler_config=soup_crawler, - ) - content = "" - for key, value in data.items(): - content += f"{key}:\n{value}\n\n" - return await save_data_to_file(content) - except IngestionError: - raise - except Exception as e: - raise IngestionError( - message=f"Error ingesting webpage results of url {data_item}: {str(e)}" - ) - + urls_to_page_contents = await fetch_page_content(data_item) + return await save_data_to_file(urls_to_page_contents[data_item], file_extension="html") # data is local file path elif parsed_url.scheme == "file": if settings.accept_local_file_path: diff --git a/cognee/tasks/web_scraper/__init__.py b/cognee/tasks/web_scraper/__init__.py index d8e580fad..d52129c05 100644 --- a/cognee/tasks/web_scraper/__init__.py +++ b/cognee/tasks/web_scraper/__init__.py @@ -5,9 +5,24 @@ data in a graph database. It includes classes and functions for crawling web pag BeautifulSoup or Tavily, defining data models, and handling scraping configurations. 
""" -from .bs4_crawler import BeautifulSoupCrawler from .utils import fetch_page_content -from .web_scraper_task import cron_web_scraper_task, web_scraper_task +from .default_url_crawler import DefaultUrlCrawler + +# Lazy import for web_scraper_task to avoid requiring apscheduler +# Import these directly if needed: from cognee.tasks.web_scraper.web_scraper_task import ... + + +def __getattr__(name): + """Lazy load web scraper task functions that require apscheduler.""" + if name == "cron_web_scraper_task": + from .web_scraper_task import cron_web_scraper_task + + return cron_web_scraper_task + elif name == "web_scraper_task": + from .web_scraper_task import web_scraper_task + + return web_scraper_task + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") __all__ = [ @@ -15,4 +30,5 @@ __all__ = [ "fetch_page_content", "cron_web_scraper_task", "web_scraper_task", + "DefaultUrlCrawler", ] diff --git a/cognee/tasks/web_scraper/config.py b/cognee/tasks/web_scraper/config.py index 2ee43ed32..f23156f95 100644 --- a/cognee/tasks/web_scraper/config.py +++ b/cognee/tasks/web_scraper/config.py @@ -10,14 +10,16 @@ class TavilyConfig(BaseModel): timeout: Optional[int] = Field(default=10, ge=1, le=60) -class SoupCrawlerConfig(BaseModel): +class DefaultCrawlerConfig(BaseModel): concurrency: int = 5 crawl_delay: float = 0.5 + max_crawl_delay: Optional[float] = ( + 10.0 # Maximum crawl delay to respect from robots.txt (None = no limit) + ) timeout: float = 15.0 max_retries: int = 2 retry_delay_factor: float = 0.5 headers: Optional[Dict[str, str]] = None - extraction_rules: Dict[str, Any] use_playwright: bool = False playwright_js_wait: float = 0.8 robots_cache_ttl: float = 3600.0 diff --git a/cognee/tasks/web_scraper/bs4_crawler.py b/cognee/tasks/web_scraper/default_url_crawler.py similarity index 67% rename from cognee/tasks/web_scraper/bs4_crawler.py rename to cognee/tasks/web_scraper/default_url_crawler.py index 0fbff4808..d09bf3e80 100644 --- 
a/cognee/tasks/web_scraper/bs4_crawler.py +++ b/cognee/tasks/web_scraper/default_url_crawler.py @@ -1,21 +1,21 @@ -"""BeautifulSoup-based web crawler for extracting content from web pages. - -This module provides the BeautifulSoupCrawler class for fetching and extracting content -from web pages using BeautifulSoup or Playwright for JavaScript-rendered pages. It -supports robots.txt handling, rate limiting, and custom extraction rules. -""" - import asyncio -import time -from typing import Union, List, Dict, Any, Optional -from urllib.parse import urlparse from dataclasses import dataclass, field from functools import lru_cache +import time +from typing import Any, Union, List, Dict, Optional +from urllib.parse import urlparse import httpx -from bs4 import BeautifulSoup -from cognee.shared.logging_utils import get_logger -logger = get_logger(__name__) +from cognee.shared.logging_utils import get_logger +from cognee.tasks.web_scraper.types import UrlsToHtmls + +logger = get_logger() + +try: + from protego import Protego +except ImportError: + logger.warning("Failed to import protego, make sure to install using pip install protego>=0.1") + Protego = None try: from playwright.async_api import async_playwright @@ -25,31 +25,6 @@ except ImportError: ) async_playwright = None -try: - from protego import Protego -except ImportError: - logger.warning("Failed to import protego, make sure to install using pip install protego>=0.1") - Protego = None - - -@dataclass -class ExtractionRule: - """Normalized extraction rule for web content. - - Attributes: - selector: CSS selector for extraction (if any). - xpath: XPath expression for extraction (if any). - attr: HTML attribute to extract (if any). - all: If True, extract all matching elements; otherwise, extract first. - join_with: String to join multiple extracted elements. 
- """ - - selector: Optional[str] = None - xpath: Optional[str] = None - attr: Optional[str] = None - all: bool = False - join_with: str = " " - @dataclass class RobotsTxtCache: @@ -66,27 +41,13 @@ class RobotsTxtCache: timestamp: float = field(default_factory=time.time) -class BeautifulSoupCrawler: - """Crawler for fetching and extracting web content using BeautifulSoup. - - Supports asynchronous HTTP requests, Playwright for JavaScript rendering, robots.txt - compliance, and rate limiting. Extracts content using CSS selectors or XPath rules. - - Attributes: - concurrency: Number of concurrent requests allowed. - crawl_delay: Minimum seconds between requests to the same domain. - timeout: Per-request timeout in seconds. - max_retries: Number of retries for failed requests. - retry_delay_factor: Multiplier for exponential backoff on retries. - headers: HTTP headers for requests (e.g., User-Agent). - robots_cache_ttl: Time-to-live for robots.txt cache in seconds. - """ - +class DefaultUrlCrawler: def __init__( self, *, concurrency: int = 5, crawl_delay: float = 0.5, + max_crawl_delay: Optional[float] = 10.0, timeout: float = 15.0, max_retries: int = 2, retry_delay_factor: float = 0.5, @@ -98,6 +59,7 @@ class BeautifulSoupCrawler: Args: concurrency: Number of concurrent requests allowed. crawl_delay: Minimum seconds between requests to the same domain. + max_crawl_delay: Maximum crawl delay to respect from robots.txt (None = no limit). timeout: Per-request timeout in seconds. max_retries: Number of retries for failed requests. retry_delay_factor: Multiplier for exponential backoff on retries. 
@@ -107,6 +69,7 @@ class BeautifulSoupCrawler: self.concurrency = concurrency self._sem = asyncio.Semaphore(concurrency) self.crawl_delay = crawl_delay + self.max_crawl_delay = max_crawl_delay self.timeout = timeout self.max_retries = max_retries self.retry_delay_factor = retry_delay_factor @@ -183,7 +146,11 @@ class BeautifulSoupCrawler: elapsed = time.time() - last wait_for = delay - elapsed if wait_for > 0: + logger.info( + f"Rate limiting: waiting {wait_for:.2f}s before requesting {url} (crawl_delay={delay}s from robots.txt)" + ) await asyncio.sleep(wait_for) + logger.info(f"Rate limit wait completed for {url}") self._last_request_time_per_domain[domain] = time.time() async def _get_robots_cache(self, domain_root: str) -> Optional[RobotsTxtCache]: @@ -236,7 +203,16 @@ class BeautifulSoupCrawler: crawl_delay = self.crawl_delay if protego: delay = protego.crawl_delay(agent) or protego.crawl_delay("*") - crawl_delay = delay if delay else self.crawl_delay + if delay: + # Apply max_crawl_delay cap if configured + if self.max_crawl_delay is not None and delay > self.max_crawl_delay: + logger.warning( + f"robots.txt specifies crawl_delay={delay}s for {domain_root}, " + f"capping to max_crawl_delay={self.max_crawl_delay}s" + ) + crawl_delay = self.max_crawl_delay + else: + crawl_delay = delay cache_entry = RobotsTxtCache(protego=protego, crawl_delay=crawl_delay) self._robots_cache[domain_root] = cache_entry @@ -307,12 +283,16 @@ class BeautifulSoupCrawler: attempt = 0 crawl_delay = await self._get_crawl_delay(url) + logger.info(f"Fetching URL with httpx (crawl_delay={crawl_delay}s): {url}") while True: try: await self._respect_rate_limit(url, crawl_delay) resp = await self._client.get(url) resp.raise_for_status() + logger.info( + f"Successfully fetched {url} (status={resp.status_code}, size={len(resp.text)} bytes)" + ) return resp.text except Exception as exc: attempt += 1 @@ -347,22 +327,35 @@ class BeautifulSoupCrawler: raise RuntimeError( "Playwright is not 
installed. Install with `pip install playwright` and run `playwright install`." ) + + timeout_val = timeout or self.timeout + logger.info( + f"Rendering URL with Playwright (js_wait={js_wait}s, timeout={timeout_val}s): {url}" + ) + attempt = 0 while True: try: async with async_playwright() as p: + logger.info(f"Launching headless Chromium browser for {url}") browser = await p.chromium.launch(headless=True) try: context = await browser.new_context() page = await context.new_page() + logger.info(f"Navigating to {url} and waiting for network idle") await page.goto( url, wait_until="networkidle", - timeout=int((timeout or self.timeout) * 1000), + timeout=int(timeout_val * 1000), ) if js_wait: + logger.info(f"Waiting {js_wait}s for JavaScript to execute") await asyncio.sleep(js_wait) - return await page.content() + content = await page.content() + logger.info( + f"Successfully rendered {url} with Playwright (size={len(content)} bytes)" + ) + return content finally: await browser.close() except Exception as exc: @@ -376,96 +369,13 @@ class BeautifulSoupCrawler: ) await asyncio.sleep(backoff) - def _normalize_rule(self, rule: Union[str, Dict[str, Any]]) -> ExtractionRule: - """Normalize an extraction rule to an ExtractionRule dataclass. - - Args: - rule: A string (CSS selector) or dict with extraction parameters. - - Returns: - ExtractionRule: Normalized extraction rule. - - Raises: - ValueError: If the rule is invalid. - """ - if isinstance(rule, str): - return ExtractionRule(selector=rule) - if isinstance(rule, dict): - return ExtractionRule( - selector=rule.get("selector"), - xpath=rule.get("xpath"), - attr=rule.get("attr"), - all=bool(rule.get("all", False)), - join_with=rule.get("join_with", " "), - ) - raise ValueError(f"Invalid extraction rule: {rule}") - - def _extract_with_bs4(self, html: str, rule: ExtractionRule) -> str: - """Extract content from HTML using BeautifulSoup or lxml XPath. - - Args: - html: The HTML content to extract from. 
- rule: The extraction rule to apply. - - Returns: - str: The extracted content. - - Raises: - RuntimeError: If XPath is used but lxml is not installed. - """ - soup = BeautifulSoup(html, "html.parser") - - if rule.xpath: - try: - from lxml import html as lxml_html - except ImportError: - raise RuntimeError( - "XPath requested but lxml is not available. Install lxml or use CSS selectors." - ) - doc = lxml_html.fromstring(html) - nodes = doc.xpath(rule.xpath) - texts = [] - for n in nodes: - if hasattr(n, "text_content"): - texts.append(n.text_content().strip()) - else: - texts.append(str(n).strip()) - return rule.join_with.join(t for t in texts if t) - - if not rule.selector: - return "" - - if rule.all: - nodes = soup.select(rule.selector) - pieces = [] - for el in nodes: - if rule.attr: - val = el.get(rule.attr) - if val: - pieces.append(val.strip()) - else: - text = el.get_text(strip=True) - if text: - pieces.append(text) - return rule.join_with.join(pieces).strip() - else: - el = soup.select_one(rule.selector) - if el is None: - return "" - if rule.attr: - val = el.get(rule.attr) - return (val or "").strip() - return el.get_text(strip=True) - - async def fetch_with_bs4( + async def fetch_urls( self, - urls: Union[str, List[str], Dict[str, Dict[str, Any]]], - extraction_rules: Optional[Dict[str, Any]] = None, + urls: Union[str, List[str]], *, use_playwright: bool = False, playwright_js_wait: float = 0.8, - join_all_matches: bool = False, - ) -> Dict[str, str]: + ) -> UrlsToHtmls: """Fetch and extract content from URLs using BeautifulSoup or Playwright. Args: @@ -482,65 +392,55 @@ class BeautifulSoupCrawler: ValueError: If extraction_rules are missing when required or if urls is invalid. Exception: If fetching or extraction fails. 
""" - url_rules_map: Dict[str, Dict[str, Any]] = {} - if isinstance(urls, str): - if not extraction_rules: - raise ValueError("extraction_rules required when urls is a string") - url_rules_map[urls] = extraction_rules - elif isinstance(urls, list): - if not extraction_rules: - raise ValueError("extraction_rules required when urls is a list") - for url in urls: - url_rules_map[url] = extraction_rules - elif isinstance(urls, dict): - url_rules_map = urls + urls = [urls] else: raise ValueError(f"Invalid urls type: {type(urls)}") - normalized_url_rules: Dict[str, List[ExtractionRule]] = {} - for url, rules in url_rules_map.items(): - normalized_rules = [] - for _, rule in rules.items(): - r = self._normalize_rule(rule) - if join_all_matches: - r.all = True - normalized_rules.append(r) - normalized_url_rules[url] = normalized_rules - async def _task(url: str): async with self._sem: try: + logger.info(f"Processing URL: {url}") + + # Check robots.txt allowed = await self._is_url_allowed(url) if not allowed: logger.warning(f"URL disallowed by robots.txt: {url}") return url, "" + logger.info(f"Robots.txt check passed for {url}") + + # Fetch HTML if use_playwright: + logger.info( + f"Rendering {url} with Playwright (JS wait: {playwright_js_wait}s)" + ) html = await self._render_with_playwright( url, js_wait=playwright_js_wait, timeout=self.timeout ) else: + logger.info(f"Fetching {url} with httpx") html = await self._fetch_httpx(url) - pieces = [] - for rule in normalized_url_rules[url]: - text = self._extract_with_bs4(html, rule) - if text: - pieces.append(text) + logger.info(f"Successfully fetched HTML from {url} ({len(html)} bytes)") - concatenated = " ".join(pieces).strip() - return url, concatenated + return url, html except Exception as e: logger.error(f"Error processing {url}: {e}") return url, "" - tasks = [asyncio.create_task(_task(u)) for u in url_rules_map.keys()] + logger.info(f"Creating {len(urls)} async tasks for concurrent fetching") + tasks = 
[asyncio.create_task(_task(u)) for u in urls] results = {} + completed = 0 + total = len(tasks) for coro in asyncio.as_completed(tasks): - url, text = await coro - results[url] = text + url, html = await coro + results[url] = html + completed += 1 + logger.info(f"Progress: {completed}/{total} URLs processed") + logger.info(f"Completed fetching all {len(results)} URL(s)") return results diff --git a/cognee/tasks/web_scraper/types.py b/cognee/tasks/web_scraper/types.py new file mode 100644 index 000000000..54a3f5d42 --- /dev/null +++ b/cognee/tasks/web_scraper/types.py @@ -0,0 +1,4 @@ +from typing import TypeAlias + + +UrlsToHtmls: TypeAlias = dict[str, str] diff --git a/cognee/tasks/web_scraper/utils.py b/cognee/tasks/web_scraper/utils.py index 6d094f423..1f51bf98d 100644 --- a/cognee/tasks/web_scraper/utils.py +++ b/cognee/tasks/web_scraper/utils.py @@ -4,21 +4,17 @@ This module provides functions to fetch and extract content from web pages, supp both BeautifulSoup for custom extraction rules and Tavily for API-based scraping. """ -from typing import Dict, List, Union, Optional, Literal +import os +from typing import List, Union from cognee.shared.logging_utils import get_logger -from .bs4_crawler import BeautifulSoupCrawler -from .config import TavilyConfig, SoupCrawlerConfig +from cognee.tasks.web_scraper.types import UrlsToHtmls +from .default_url_crawler import DefaultUrlCrawler +from .config import DefaultCrawlerConfig, TavilyConfig logger = get_logger(__name__) -async def fetch_page_content( - urls: Union[str, List[str]], - *, - preferred_tool: Optional[Literal["tavily", "beautifulsoup"]] = "beautifulsoup", - tavily_config: Optional[TavilyConfig] = None, - soup_crawler_config: Optional[SoupCrawlerConfig] = None, -) -> Dict[str, str]: +async def fetch_page_content(urls: Union[str, List[str]]) -> UrlsToHtmls: """Fetch content from one or more URLs using the specified tool. 
This function retrieves web page content using either BeautifulSoup (with custom @@ -31,7 +27,7 @@ async def fetch_page_content( Defaults to "beautifulsoup". tavily_config: Configuration for Tavily API, including API key. Required if preferred_tool is "tavily". - soup_crawler_config: Configuration for BeautifulSoup crawler, including + default_crawler_config: Configuration for BeautifulSoup crawler, including extraction rules. Required if preferred_tool is "beautifulsoup" and extraction_rules are needed. @@ -45,50 +41,52 @@ async def fetch_page_content( ImportError: If required dependencies (beautifulsoup4 or tavily-python) are not installed. """ - if preferred_tool == "tavily": - if not tavily_config or tavily_config.api_key is None: - raise ValueError("TAVILY_API_KEY must be set in TavilyConfig to use Tavily") - return await fetch_with_tavily(urls, tavily_config) + url_list = [urls] if isinstance(urls, str) else urls - if preferred_tool == "beautifulsoup": - try: - from bs4 import BeautifulSoup as _ # noqa: F401 - except ImportError: - logger.error( - "Failed to import bs4, make sure to install using pip install beautifulsoup4>=4.13.1" - ) - raise ImportError - if not soup_crawler_config or soup_crawler_config.extraction_rules is None: - raise ValueError("extraction_rules must be provided when not using Tavily") - extraction_rules = soup_crawler_config.extraction_rules - crawler = BeautifulSoupCrawler( - concurrency=soup_crawler_config.concurrency, - crawl_delay=soup_crawler_config.crawl_delay, - timeout=soup_crawler_config.timeout, - max_retries=soup_crawler_config.max_retries, - retry_delay_factor=soup_crawler_config.retry_delay_factor, - headers=soup_crawler_config.headers, - robots_cache_ttl=soup_crawler_config.robots_cache_ttl, + if os.getenv("TAVILY_API_KEY"): + logger.info("Using Tavily API for url fetching") + return await fetch_with_tavily(urls) + else: + logger.info("Using default crawler for content extraction") + + default_crawler_config = ( + 
DefaultCrawlerConfig() + ) # We've decided to use defaults, and configure through env vars as needed + + logger.info( + f"Initializing BeautifulSoup crawler with concurrency={default_crawler_config.concurrency}, timeout={default_crawler_config.timeout}s, max_crawl_delay={default_crawler_config.max_crawl_delay}s" + ) + + crawler = DefaultUrlCrawler( + concurrency=default_crawler_config.concurrency, + crawl_delay=default_crawler_config.crawl_delay, + max_crawl_delay=default_crawler_config.max_crawl_delay, + timeout=default_crawler_config.timeout, + max_retries=default_crawler_config.max_retries, + retry_delay_factor=default_crawler_config.retry_delay_factor, + headers=default_crawler_config.headers, + robots_cache_ttl=default_crawler_config.robots_cache_ttl, ) try: - results = await crawler.fetch_with_bs4( - urls, - extraction_rules, - use_playwright=soup_crawler_config.use_playwright, - playwright_js_wait=soup_crawler_config.playwright_js_wait, - join_all_matches=soup_crawler_config.join_all_matches, + logger.info( + f"Starting to crawl {len(url_list)} URL(s) with BeautifulSoup (use_playwright={default_crawler_config.use_playwright})" ) + results = await crawler.fetch_urls( + urls, + use_playwright=default_crawler_config.use_playwright, + playwright_js_wait=default_crawler_config.playwright_js_wait, + ) + logger.info(f"Successfully fetched content from {len(results)} URL(s)") return results except Exception as e: logger.error(f"Error fetching page content: {str(e)}") raise finally: + logger.info("Closing BeautifulSoup crawler") await crawler.close() -async def fetch_with_tavily( - urls: Union[str, List[str]], tavily_config: Optional[TavilyConfig] = None -) -> Dict[str, str]: +async def fetch_with_tavily(urls: Union[str, List[str]]) -> UrlsToHtmls: """Fetch content from URLs using the Tavily API. 
Args: @@ -108,19 +106,37 @@ async def fetch_with_tavily( "Failed to import tavily, make sure to install using pip install tavily-python>=0.7.0" ) raise - client = AsyncTavilyClient( - api_key=tavily_config.api_key if tavily_config else None, - proxies=tavily_config.proxies if tavily_config else None, + + tavily_config = TavilyConfig() + url_list = [urls] if isinstance(urls, str) else urls + extract_depth = tavily_config.extract_depth if tavily_config else "basic" + timeout = tavily_config.timeout if tavily_config else 10 + + logger.info( + f"Initializing Tavily client with extract_depth={extract_depth}, timeout={timeout}s" ) + client = AsyncTavilyClient( + api_key=tavily_config.api_key, + proxies=tavily_config.proxies, + ) + + logger.info(f"Sending extract request to Tavily API for {len(url_list)} URL(s)") results = await client.extract( urls, format="text", - extract_depth=tavily_config.extract_depth if tavily_config else "basic", - timeout=tavily_config.timeout if tavily_config else 10, + extract_depth=extract_depth, + timeout=timeout, ) - for failed_result in results.get("failed_results", []): - logger.warning(f"Failed to fetch {failed_result}") + + failed_count = len(results.get("failed_results", [])) + if failed_count > 0: + logger.warning(f"Tavily API failed to fetch {failed_count} URL(s)") + for failed_result in results.get("failed_results", []): + logger.warning(f"Failed to fetch {failed_result}") + return_results = {} for result in results.get("results", []): return_results[result["url"]] = result["raw_content"] + + logger.info(f"Successfully fetched content from {len(return_results)} URL(s) via Tavily") return return_results diff --git a/cognee/tasks/web_scraper/web_scraper_task.py b/cognee/tasks/web_scraper/web_scraper_task.py index 52154c6ef..2bade3719 100644 --- a/cognee/tasks/web_scraper/web_scraper_task.py +++ b/cognee/tasks/web_scraper/web_scraper_task.py @@ -19,7 +19,7 @@ from cognee.tasks.storage.index_graph_edges import index_graph_edges from 
cognee.modules.engine.operations.setup import setup from .models import WebPage, WebSite, ScrapingJob -from .config import SoupCrawlerConfig, TavilyConfig +from .config import DefaultCrawlerConfig, TavilyConfig from .utils import fetch_page_content try: @@ -47,7 +47,7 @@ async def cron_web_scraper_task( schedule: str = None, extraction_rules: dict = None, tavily_api_key: str = os.getenv("TAVILY_API_KEY"), - soup_crawler_config: SoupCrawlerConfig = None, + soup_crawler_config: DefaultCrawlerConfig = None, tavily_config: TavilyConfig = None, job_name: str = "scraping", ): @@ -121,7 +121,7 @@ async def web_scraper_task( schedule: str = None, extraction_rules: dict = None, tavily_api_key: str = os.getenv("TAVILY_API_KEY"), - soup_crawler_config: SoupCrawlerConfig = None, + soup_crawler_config: DefaultCrawlerConfig = None, tavily_config: TavilyConfig = None, job_name: str = None, ): @@ -341,7 +341,7 @@ def check_arguments(tavily_api_key, extraction_rules, tavily_config, soup_crawle soup_crawler_config: Configuration for BeautifulSoup crawler. Returns: - Tuple[SoupCrawlerConfig, TavilyConfig, str]: Configured soup_crawler_config, + Tuple[DefaultCrawlerConfig, TavilyConfig, str]: Configured soup_crawler_config, tavily_config, and preferred_tool ("tavily" or "beautifulsoup"). 
Raises: @@ -350,7 +350,7 @@ def check_arguments(tavily_api_key, extraction_rules, tavily_config, soup_crawle preferred_tool = "beautifulsoup" if extraction_rules and not soup_crawler_config: - soup_crawler_config = SoupCrawlerConfig(extraction_rules=extraction_rules) + soup_crawler_config = DefaultCrawlerConfig(extraction_rules=extraction_rules) if tavily_api_key: if not tavily_config: diff --git a/cognee/tests/integration/web_url_crawler/test_default_url_crawler.py b/cognee/tests/integration/web_url_crawler/test_default_url_crawler.py new file mode 100644 index 000000000..156cc87a4 --- /dev/null +++ b/cognee/tests/integration/web_url_crawler/test_default_url_crawler.py @@ -0,0 +1,13 @@ +import pytest +from cognee.tasks.web_scraper import DefaultUrlCrawler + + +@pytest.mark.asyncio +async def test_fetch(): + crawler = DefaultUrlCrawler() + url = "https://en.wikipedia.org/wiki/Large_language_model" + results = await crawler.fetch_urls(url) + assert len(results) == 1 + assert isinstance(results, dict) + html = results[url] + assert isinstance(html, str) diff --git a/cognee/tests/integration/web_url_crawler/test_tavily_crawler.py b/cognee/tests/integration/web_url_crawler/test_tavily_crawler.py new file mode 100644 index 000000000..946ce8378 --- /dev/null +++ b/cognee/tests/integration/web_url_crawler/test_tavily_crawler.py @@ -0,0 +1,19 @@ +import os +import pytest +from cognee.tasks.web_scraper.utils import fetch_with_tavily + +skip_in_ci = pytest.mark.skipif( + os.getenv("GITHUB_ACTIONS") == "true", + reason="Skipping in Github for now - before we get TAVILY_API_KEY", +) + + +@skip_in_ci +@pytest.mark.asyncio +async def test_fetch(): + url = "https://en.wikipedia.org/wiki/Large_language_model" + results = await fetch_with_tavily(url) + assert isinstance(results, dict) + assert len(results) == 1 + html = results[url] + assert isinstance(html, str) diff --git a/cognee/tests/integration/web_url_crawler/test_url_adding_e2e.py 
b/cognee/tests/integration/web_url_crawler/test_url_adding_e2e.py new file mode 100644 index 000000000..afe2dce7f --- /dev/null +++ b/cognee/tests/integration/web_url_crawler/test_url_adding_e2e.py @@ -0,0 +1,310 @@ +import pytest +import cognee +from cognee.infrastructure.files.utils.get_data_file_path import get_data_file_path +from cognee.infrastructure.loaders.LoaderEngine import LoaderEngine +from cognee.infrastructure.loaders.external.beautiful_soup_loader import BeautifulSoupLoader +from cognee.tasks.ingestion import save_data_item_to_storage +from pathlib import Path + + +@pytest.mark.asyncio +async def test_url_saves_as_html_file(): + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + + try: + original_file_path = await save_data_item_to_storage( + "https://en.wikipedia.org/wiki/Large_language_model" + ) + file_path = get_data_file_path(original_file_path) + assert file_path.endswith(".html") + file = Path(file_path) + assert file.exists() + assert file.stat().st_size > 0 + except Exception as e: + pytest.fail(f"Failed to save data item to storage: {e}") + + +@pytest.mark.asyncio +async def test_saved_html_is_valid(): + try: + from bs4 import BeautifulSoup + except ImportError: + pytest.fail("Test case requires bs4 installed") + + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + + try: + original_file_path = await save_data_item_to_storage( + "https://en.wikipedia.org/wiki/Large_language_model" + ) + file_path = get_data_file_path(original_file_path) + content = Path(file_path).read_text() + + soup = BeautifulSoup(content, "html.parser") + assert soup.find() is not None, "File should contain parseable HTML" + + has_html_elements = any( + [ + soup.find("html"), + soup.find("head"), + soup.find("body"), + soup.find("div"), + soup.find("p"), + ] + ) + assert has_html_elements, "File should contain common HTML elements" + except Exception as e: + pytest.fail(f"Failed to save data item to 
storage: {e}") + + +@pytest.mark.asyncio +async def test_add_url(): + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + + await cognee.add("https://en.wikipedia.org/wiki/Large_language_model") + + +@pytest.mark.asyncio +async def test_add_url_without_incremental_loading(): + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + + try: + await cognee.add( + "https://en.wikipedia.org/wiki/Large_language_model", + incremental_loading=False, + ) + except Exception as e: + pytest.fail(f"Failed to add url: {e}") + + +@pytest.mark.asyncio +async def test_add_url_with_incremental_loading(): + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + + try: + await cognee.add( + "https://en.wikipedia.org/wiki/Large_language_model", + incremental_loading=True, + ) + except Exception as e: + pytest.fail(f"Failed to add url: {e}") + + +@pytest.mark.asyncio +async def test_add_url_with_extraction_rules(): # TODO: this'll fail due to not implemented `load()` yet + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + + extraction_rules = { + "title": {"selector": "title"}, + "headings": {"selector": "h1, h2, h3", "all": True}, + "links": {"selector": "a", "attr": "href", "all": True}, + "paragraphs": {"selector": "p", "all": True}, + } + + try: + await cognee.add( + "https://en.wikipedia.org/wiki/Large_language_model", + preferred_loaders={"beautiful_soup_loader": {"extraction_rules": extraction_rules}}, + ) + except Exception as e: + pytest.fail(f"Failed to add url: {e}") + + +@pytest.mark.asyncio +async def test_loader_is_none_by_default(): + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + extraction_rules = { + "title": {"selector": "title"}, + "headings": {"selector": "h1, h2, h3", "all": True}, + "links": {"selector": "a", "attr": "href", "all": True}, + "paragraphs": {"selector": "p", "all": True}, + } + + try: + 
original_file_path = await save_data_item_to_storage( + "https://en.wikipedia.org/wiki/Large_language_model" + ) + file_path = get_data_file_path(original_file_path) + assert file_path.endswith(".html") + file = Path(file_path) + assert file.exists() + assert file.stat().st_size > 0 + + loader_engine = LoaderEngine() + preferred_loaders = {"beautiful_soup_loader": {"extraction_rules": extraction_rules}} + loader = loader_engine.get_loader( + file_path, + preferred_loaders=preferred_loaders, + ) + + assert loader is None + except Exception as e: + pytest.fail(f"Failed to save data item to storage: {e}") + + +@pytest.mark.asyncio +async def test_beautiful_soup_loader_is_selected_loader_if_preferred_loader_provided(): + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + extraction_rules = { + "title": {"selector": "title"}, + "headings": {"selector": "h1, h2, h3", "all": True}, + "links": {"selector": "a", "attr": "href", "all": True}, + "paragraphs": {"selector": "p", "all": True}, + } + + try: + original_file_path = await save_data_item_to_storage( + "https://en.wikipedia.org/wiki/Large_language_model" + ) + file_path = get_data_file_path(original_file_path) + assert file_path.endswith(".html") + file = Path(file_path) + assert file.exists() + assert file.stat().st_size > 0 + + loader_engine = LoaderEngine() + bs_loader = BeautifulSoupLoader() + loader_engine.register_loader(bs_loader) + preferred_loaders = {"beautiful_soup_loader": {"extraction_rules": extraction_rules}} + loader = loader_engine.get_loader( + file_path, + preferred_loaders=preferred_loaders, + ) + + assert loader == bs_loader + except Exception as e: + pytest.fail(f"Failed to save data item to storage: {e}") + + +@pytest.mark.asyncio +async def test_beautiful_soup_loader_raises_if_required_args_are_missing(): + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + + try: + original_file_path = await save_data_item_to_storage( + 
"https://en.wikipedia.org/wiki/Large_language_model" + ) + file_path = get_data_file_path(original_file_path) + assert file_path.endswith(".html") + file = Path(file_path) + assert file.exists() + assert file.stat().st_size > 0 + + loader_engine = LoaderEngine() + bs_loader = BeautifulSoupLoader() + loader_engine.register_loader(bs_loader) + preferred_loaders = {"beautiful_soup_loader": {}} + with pytest.raises(ValueError): + await loader_engine.load_file( + file_path, + preferred_loaders=preferred_loaders, + ) + extraction_rules = { + "title": {"selector": "title"}, + "headings": {"selector": "h1, h2, h3", "all": True}, + "links": {"selector": "a", "attr": "href", "all": True}, + "paragraphs": {"selector": "p", "all": True}, + } + preferred_loaders = {"beautiful_soup_loader": {"extraction_rules": extraction_rules}} + await loader_engine.load_file( + file_path, + preferred_loaders=preferred_loaders, + ) + except Exception as e: + pytest.fail(f"Failed to save data item to storage: {e}") + + +@pytest.mark.asyncio +async def test_beautiful_soup_loader_successfully_loads_file_if_required_args_present(): + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + + try: + original_file_path = await save_data_item_to_storage( + "https://en.wikipedia.org/wiki/Large_language_model" + ) + file_path = get_data_file_path(original_file_path) + assert file_path.endswith(".html") + file = Path(file_path) + assert file.exists() + assert file.stat().st_size > 0 + + loader_engine = LoaderEngine() + bs_loader = BeautifulSoupLoader() + loader_engine.register_loader(bs_loader) + extraction_rules = { + "title": {"selector": "title"}, + "headings": {"selector": "h1, h2, h3", "all": True}, + "links": {"selector": "a", "attr": "href", "all": True}, + "paragraphs": {"selector": "p", "all": True}, + } + preferred_loaders = {"beautiful_soup_loader": {"extraction_rules": extraction_rules}} + await loader_engine.load_file( + file_path, + 
preferred_loaders=preferred_loaders, + ) + except Exception as e: + pytest.fail(f"Failed to save data item to storage: {e}") + + +@pytest.mark.asyncio +async def test_beautiful_soup_loads_file_successfully(): + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + extraction_rules = { + "title": {"selector": "title"}, + "headings": {"selector": "h1, h2, h3", "all": True}, + "links": {"selector": "a", "attr": "href", "all": True}, + "paragraphs": {"selector": "p", "all": True}, + } + + try: + original_file_path = await save_data_item_to_storage( + "https://en.wikipedia.org/wiki/Large_language_model" + ) + file_path = get_data_file_path(original_file_path) + assert file_path.endswith(".html") + original_file = Path(file_path) + assert original_file.exists() + assert original_file.stat().st_size > 0 + + loader_engine = LoaderEngine() + bs_loader = BeautifulSoupLoader() + loader_engine.register_loader(bs_loader) + preferred_loaders = {"beautiful_soup_loader": {"extraction_rules": extraction_rules}} + loader = loader_engine.get_loader( + file_path, + preferred_loaders=preferred_loaders, + ) + + assert loader == bs_loader + + cognee_loaded_txt_path = await loader_engine.load_file( + file_path=file_path, preferred_loaders=preferred_loaders + ) + + cognee_loaded_txt_path = get_data_file_path(cognee_loaded_txt_path) + + assert cognee_loaded_txt_path.endswith(".txt") + + extracted_file = Path(cognee_loaded_txt_path) + + assert extracted_file.exists() + assert extracted_file.stat().st_size > 0 + + original_basename = original_file.stem + extracted_basename = extracted_file.stem + assert original_basename == extracted_basename, ( + f"Expected same base name: {original_basename} vs {extracted_basename}" + ) + except Exception as e: + pytest.fail(f"Failed to save data item to storage: {e}") diff --git a/cognee/tests/tasks/web_scraping/web_scraping_test.py b/cognee/tests/tasks/web_scraping/web_scraping_test.py index bf66b5155..81c58ac8d 100644 --- 
a/cognee/tests/tasks/web_scraping/web_scraping_test.py +++ b/cognee/tests/tasks/web_scraping/web_scraping_test.py @@ -1,6 +1,6 @@ import asyncio import cognee -from cognee.tasks.web_scraper.config import SoupCrawlerConfig +from cognee.tasks.web_scraper.config import DefaultCrawlerConfig from cognee.tasks.web_scraper import cron_web_scraper_task @@ -14,7 +14,7 @@ async def test_web_scraping_using_bs4(): "authors": {"selector": ".quote small", "all": True}, } - soup_config = SoupCrawlerConfig( + soup_config = DefaultCrawlerConfig( concurrency=5, crawl_delay=0.5, timeout=15.0, @@ -47,7 +47,7 @@ async def test_web_scraping_using_bs4_and_incremental_loading(): url = "https://books.toscrape.com/" rules = {"titles": "article.product_pod h3 a", "prices": "article.product_pod p.price_color"} - soup_config = SoupCrawlerConfig( + soup_config = DefaultCrawlerConfig( concurrency=1, crawl_delay=0.1, timeout=10.0, diff --git a/examples/python/web_url_fetcher_example.py b/examples/python/web_url_fetcher_example.py new file mode 100644 index 000000000..aff8094bf --- /dev/null +++ b/examples/python/web_url_fetcher_example.py @@ -0,0 +1,37 @@ +import asyncio + +import cognee + + +async def main(): + await cognee.prune.prune_data() + print("Data pruned.") + + await cognee.prune.prune_system(metadata=True) + + extraction_rules = { + "title": {"selector": "title"}, + "headings": {"selector": "h1, h2, h3", "all": True}, + "links": { + "selector": "a", + "attr": "href", + "all": True, + }, + "paragraphs": {"selector": "p", "all": True}, + } + + await cognee.add( + "https://en.wikipedia.org/wiki/Large_language_model", + incremental_loading=False, + preferred_loaders={"beautiful_soup_loader": {"extraction_rules": extraction_rules}}, + ) + + await cognee.cognify() + print("Knowledge graph created.") + + await cognee.visualize_graph() + print("Data visualized") + + +if __name__ == "__main__": + asyncio.run(main())