From 20fb77316c245117dd78e830adf269198184610e Mon Sep 17 00:00:00 2001 From: Geoff-Robin Date: Sat, 4 Oct 2025 15:01:13 +0530 Subject: [PATCH] Done with integration with add workflow when incremental_loading is set to False --- cognee/api/v1/add/add.py | 22 +- cognee/tasks/ingestion/ingest_data.py | 15 +- .../ingestion/save_data_item_to_storage.py | 45 ++- cognee/tasks/web_scraper/__init__.py | 8 + cognee/tasks/web_scraper/bs4_connector.py | 270 ------------------ cognee/tasks/web_scraper/bs4_crawler.py | 33 ++- cognee/tasks/web_scraper/config.py | 24 ++ cognee/tasks/web_scraper/utils.py | 101 +++++-- cognee/tasks/web_scraper/web_scraper_task.py | 63 ++++ 9 files changed, 269 insertions(+), 312 deletions(-) create mode 100644 cognee/tasks/web_scraper/__init__.py delete mode 100644 cognee/tasks/web_scraper/bs4_connector.py create mode 100644 cognee/tasks/web_scraper/config.py create mode 100644 cognee/tasks/web_scraper/web_scraper_task.py diff --git a/cognee/api/v1/add/add.py b/cognee/api/v1/add/add.py index eeb867984..5643ecc94 100644 --- a/cognee/api/v1/add/add.py +++ b/cognee/api/v1/add/add.py @@ -1,5 +1,5 @@ from uuid import UUID -from typing import Union, BinaryIO, List, Optional +from typing import Union, BinaryIO, List, Optional, Dict, Literal from cognee.modules.users.models import User from cognee.modules.pipelines import Task, run_pipeline @@ -11,6 +11,7 @@ from cognee.modules.pipelines.layers.reset_dataset_pipeline_run_status import ( ) from cognee.modules.engine.operations.setup import setup from cognee.tasks.ingestion import ingest_data, resolve_data_directories +from cognee.tasks.web_scraper.config import TavilyConfig, SoupCrawlerConfig async def add( @@ -23,12 +24,16 @@ async def add( dataset_id: Optional[UUID] = None, preferred_loaders: List[str] = None, incremental_loading: bool = True, + extraction_rules: Optional[Dict[str, str]] = None, + preferred_tool: Optional[Literal["tavily", "beautifulsoup"]] = "beautifulsoup", + tavily_config: 
Optional[TavilyConfig] = None, + soup_crawler_config: Optional[SoupCrawlerConfig] = None, ): """ Add data to Cognee for knowledge graph processing. This is the first step in the Cognee workflow - it ingests raw data and prepares it - for processing. The function accepts various data formats including text, files, and + for processing. The function accepts various data formats including text, files, urls and binary streams, then stores them in a specified dataset for further processing. Prerequisites: @@ -143,7 +148,18 @@ async def add( """ tasks = [ Task(resolve_data_directories, include_subdirectories=True), - Task(ingest_data, dataset_name, user, node_set, dataset_id, preferred_loaders), + Task( + ingest_data, + dataset_name, + user, + node_set, + dataset_id, + preferred_loaders, + extraction_rules, + preferred_tool, + tavily_config, + soup_crawler_config, + ), ] await setup() diff --git a/cognee/tasks/ingestion/ingest_data.py b/cognee/tasks/ingestion/ingest_data.py index 3c20a2b13..a61f5d97b 100644 --- a/cognee/tasks/ingestion/ingest_data.py +++ b/cognee/tasks/ingestion/ingest_data.py @@ -1,7 +1,7 @@ import json import inspect from uuid import UUID -from typing import Union, BinaryIO, Any, List, Optional +from typing import Union, BinaryIO, Any, List, Optional, Dict, Literal import cognee.modules.ingestion as ingestion from cognee.infrastructure.databases.relational import get_relational_engine @@ -16,6 +16,7 @@ from cognee.modules.data.methods import ( get_dataset_data, load_or_create_datasets, ) +from cognee.tasks.web_scraper.config import SoupCrawlerConfig, TavilyConfig from .save_data_item_to_storage import save_data_item_to_storage from .data_item_to_text_file import data_item_to_text_file @@ -28,6 +29,10 @@ async def ingest_data( node_set: Optional[List[str]] = None, dataset_id: UUID = None, preferred_loaders: List[str] = None, + extraction_rules: Optional[Dict[str, str]] = None, + preferred_tool: Optional[Literal["tavily", "beautifulsoup"]] = 
"beautifulsoup", + tavily_config: Optional[TavilyConfig] = None, + soup_crawler_config: Optional[SoupCrawlerConfig] = None, ): if not user: user = await get_default_user() @@ -78,7 +83,13 @@ async def ingest_data( for data_item in data: # Get file path of data item or create a file it doesn't exist - original_file_path = await save_data_item_to_storage(data_item) + original_file_path = await save_data_item_to_storage( + data_item, + extraction_rules=extraction_rules, + preferred_tool=preferred_tool, + tavily_config=tavily_config, + soup_crawler_config=soup_crawler_config, + ) # Transform file path to be OS usable actual_file_path = get_data_file_path(original_file_path) diff --git a/cognee/tasks/ingestion/save_data_item_to_storage.py b/cognee/tasks/ingestion/save_data_item_to_storage.py index 34f70e917..93df12732 100644 --- a/cognee/tasks/ingestion/save_data_item_to_storage.py +++ b/cognee/tasks/ingestion/save_data_item_to_storage.py @@ -1,12 +1,14 @@ import os from pathlib import Path from urllib.parse import urlparse -from typing import Union, BinaryIO, Any +from typing import Union, BinaryIO, Any, Dict from cognee.modules.ingestion.exceptions import IngestionError from cognee.modules.ingestion import save_data_to_file from cognee.shared.logging_utils import get_logger from pydantic_settings import BaseSettings, SettingsConfigDict +from cognee.tasks.web_scraper import check_valid_arguments_for_web_scraper +import asyncio logger = get_logger() @@ -17,10 +19,17 @@ class SaveDataSettings(BaseSettings): model_config = SettingsConfigDict(env_file=".env", extra="allow") +class HTMLContent(str): + def __new__(cls, value: str): + if not ("<" in value and ">" in value): + raise ValueError("Not valid HTML-like content") + return super().__new__(cls, value) + + settings = SaveDataSettings() -async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any]) -> str: +async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any], **kwargs) -> str: if 
"llama_index" in str(type(data_item)): # Dynamic import is used because the llama_index module is optional. from .transform_data import get_data_from_llama_index @@ -48,6 +57,38 @@ async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any]) -> str # data is s3 file path if parsed_url.scheme == "s3": return data_item + elif parsed_url.scheme == "http" or parsed_url.scheme == "https": + # Validate URL by sending a HEAD request + try: + from cognee.tasks.web_scraper import fetch_page_content + + extraction_rules = kwargs.get("extraction_rules", None) + preferred_tool = kwargs.get("preferred_tool", "beautifulsoup") + tavily_config = kwargs.get("tavily_config", None) + soup_crawler_config = kwargs.get("soup_crawler_config", None) + check_valid_arguments_for_web_scraper( + extraction_rules=extraction_rules, + preferred_tool=preferred_tool, + tavily_config=tavily_config, + soup_crawler_config=soup_crawler_config, + ) + data = await fetch_page_content( + data_item, + extraction_rules=extraction_rules, + preferred_tool=preferred_tool, + tavily_config=tavily_config, + soup_crawler_config=soup_crawler_config, + ) + content = "" + for key, value in data.items(): + content += f"{key}:\n{value}\n\n" + else: + content = data[data_item] + return await save_data_to_file(content) + except Exception as e: + raise IngestionError( + message=f"Error ingesting webpage results of url {data_item}: {str(e)}" + ) # data is local file path elif parsed_url.scheme == "file": diff --git a/cognee/tasks/web_scraper/__init__.py b/cognee/tasks/web_scraper/__init__.py new file mode 100644 index 000000000..7560dee56 --- /dev/null +++ b/cognee/tasks/web_scraper/__init__.py @@ -0,0 +1,8 @@ +from .bs4_crawler import BeautifulSoupCrawler +from .utils import fetch_page_content, check_valid_arguments_for_web_scraper + +__all__ = [ + "BeautifulSoupCrawler", + "fetch_page_content", + "check_valid_arguments_for_web_scraper", +] diff --git a/cognee/tasks/web_scraper/bs4_connector.py 
b/cognee/tasks/web_scraper/bs4_connector.py deleted file mode 100644 index 20a587216..000000000 --- a/cognee/tasks/web_scraper/bs4_connector.py +++ /dev/null @@ -1,270 +0,0 @@ -import asyncio -import time -from typing import Union, List, Dict, Any, Optional -from bs4 import BeautifulSoup -import httpx -from cognee.shared.logging_utils import get_logger - -logger = get_logger(__name__) - -try: - from playwright.async_api import async_playwright -except ImportError: - logger.error("Failed to import playwright, make sure to install using pip install playwright>=1.9.0") - -try: - from bs4 import BeautifulSoup -except ImportError: - logger.error("Failed to import BeautifulSoup, make sure to install using pip install beautifulsoup4") - - - - - -class BeautifulSoupCrawler: - def __init__( - self, - *, - concurrency: int = 5, - delay_between_requests: float = 0.5, - timeout: float = 15.0, - max_retries: int = 2, - retry_delay_factor: float = 0.5, - headers: Optional[Dict[str, str]] = None, - ): - """ - concurrency: number of concurrent requests allowed - delay_between_requests: minimum seconds to wait between requests to the SAME domain - timeout: per-request timeout - max_retries: number of retries on network errors - retry_delay_factor: multiplier for exponential retry failure delay - headers: default headers for requests - use_httpx: require httpx for async HTTP. If not available, an informative error will be raised. 
- """ - self.concurrency = concurrency - self._sem = asyncio.Semaphore(concurrency) - self.delay_between_requests = delay_between_requests - self.timeout = timeout - self.max_retries = max_retries - self.retry_delay_factor = retry_delay_factor - self.headers = headers or {"User-agent": "Cognee-Scraper/1.0"} - self._last_request_time_per_domain: Dict[str, float] = {} - self._client = None - - # ---------- lifecycle helpers ---------- - async def _ensure_client(self): - if self._client is None: - self._client = httpx.AsyncClient(timeout=self.timeout, headers=self.headers) if httpx else None - - async def close(self): - if self._client: - await self._client.aclose() - self._client = None - - # ---------- rate limiting ---------- - def _domain_from_url(self, url: str) -> str: - # quick parse to domain - try: - from urllib.parse import urlparse - p = urlparse(url) - return p.netloc - except Exception: - return url - - async def _respect_rate_limit(self, url: str): - domain = self._domain_from_url(url) - last = self._last_request_time_per_domain.get(domain) - if last is None: - self._last_request_time_per_domain[domain] = time.time() - return - elapsed = time.time() - last - wait_for = self.delay_between_requests - elapsed - if wait_for > 0: - await asyncio.sleep(wait_for) - self._last_request_time_per_domain[domain] = time.time() - - # ----------- robots.txt handling could be added here ----------- - async def _is_url_allowed(self, url: str) -> bool: - robots_txt_url = f"{self._get_base_url(url)}/robots.txt" - robots_txt_content = await self._fetch_httpx(robots_txt_url) - robots_txt_content = robots_txt_content.lower() - user_agent_name = self.headers.get("User-agent") - pos = robots_txt_content.find(f"user-agent: {user_agent_name}") - if pos == -1: - pos = robots_txt_content.find(f"user-agent:*") - if pos == -1: - return True - - pos = robots_txt_content.find("disallow", pos) - # TODO: Research more about robots.txt format - - - - - # ---------- low-level fetchers 
---------- - async def _fetch_httpx(self, url: str) -> str: - await self._ensure_client() - assert self._client is not None, "HTTP client not initialized" - attempt = 0 - while True: - try: - await self._respect_rate_limit(url) - resp = await self._client.get(url) - resp.raise_for_status() - return resp.text - except Exception as exc: - attempt += 1 - if attempt > self.max_retries: - raise - delay = self.retry_delay_factor * (2 ** (attempt - 1)) - await asyncio.sleep(delay) - - async def _render_with_playwright(self, url: str, js_wait: float = 1.0, timeout: Optional[float] = None) -> str: - if async_playwright is None: - raise RuntimeError("Playwright is not installed. Install with `pip install playwright` and run `playwright install`.") - # Basic Playwright rendering (Chromium). This is slower but renders JS. - attempt = 0 - while True: - try: - async with async_playwright() as p: - browser = await p.chromium.launch(headless=True) - context = await browser.new_context() - page = await context.new_page() - await page.goto(url, wait_until="networkidle", timeout=int((timeout or self.timeout) * 1000)) - # optional short wait to let in-page JS mutate DOM - if js_wait: - await asyncio.sleep(js_wait) - content = await page.content() - await browser.close() - return content - except Exception: - attempt += 1 - if attempt > self.max_retries: - raise - backoff = self.backoff_factor * (2 ** (attempt - 1)) - await asyncio.sleep(backoff) - - # ---------- extraction helpers ---------- - def _normalize_rule(self, rule) -> Dict[str, Any]: - if isinstance(rule, str): - return {"selector": rule, "attr": None, "all": False, "join_with": " "} - if isinstance(rule, dict): - return { - "selector": rule.get("selector"), - "attr": rule.get("attr"), - "all": bool(rule.get("all")), - "join_with": rule.get("join_with", " "), - "xpath": rule.get("xpath"), - } - raise ValueError("Invalid extraction rule") - - def _extract_with_bs4(self, html: str, rule: Dict[str, Any]) -> str: - soup = 
BeautifulSoup(html, "html.parser") - sel = rule.get("selector") - xpath = rule.get("xpath") - attr = rule.get("attr") - all_flag = rule.get("all", False) - join_with = rule.get("join_with", " ") - - if xpath: - # try lxml extraction via xpath if lxml is available - try: - from lxml import html as lxml_html - except Exception: - raise RuntimeError("XPath requested but lxml is not available. Install lxml or use CSS selectors.") - doc = lxml_html.fromstring(html) - nodes = doc.xpath(xpath) - texts = [] - for n in nodes: - if hasattr(n, "text_content"): - texts.append(n.text_content().strip()) - else: - texts.append(str(n).strip()) - return join_with.join(t for t in texts if t) - else: - if not sel: - return "" - if all_flag: - nodes = soup.select(sel) - pieces = [] - for el in nodes: - if attr: - val = el.get(attr) - if val: - pieces.append(val.strip()) - else: - text = el.get_text(strip=True) - if text: - pieces.append(text) - return join_with.join(pieces).strip() - else: - el = soup.select_one(sel) - if el is None: - return "" - if attr: - val = el.get(attr) - return (val or "").strip() - return el.get_text(strip=True) - - # ---------- public API (keeps the signature you asked for) ---------- - async def fetch_with_bs4( - self, - urls: Union[str, List[str]], - extraction_rules: Dict[str, Any], - *, - use_playwright: bool = False, - playwright_js_wait: float = 0.8, - join_all_matches: bool = False, # if True, for each rule use all matches (join them) - ) -> Dict[str, str]: - """ - Fetch one or more URLs and extract text using BeautifulSoup (or lxml xpath). 
- Returns: dict[url] -> single concatenated string (trimmed) - Parameters: - - urls: str or list[str] - - extraction_rules: dict[field_name -> selector or rule-dict] - rule-dict keys: selector (CSS), xpath (optional), attr (optional), all(bool), join_with(str) - - use_playwright: if True, use Playwright to render JS (must be installed), otherwise normal fetch - - playwright_js_wait: seconds to wait after load for JS to mutate DOM - - join_all_matches: convenience: if True, treat each rule as all=True - """ - if isinstance(urls, str): - urls = [urls] - - # normalize rules - normalized_rules = {} - for field, rule in extraction_rules.items(): - r = self._normalize_rule(rule) - if join_all_matches: - r["all"] = True - normalized_rules[field] = r - - # concurrency control + gather tasks - async def _task(url: str): - async with self._sem: - # fetch (rendered or not) - if use_playwright: - html = await self._render_with_playwright(url, js_wait=playwright_js_wait, timeout=self.timeout) - else: - html = await self._fetch_httpx(url) - - # Extract and concatenate results into a single string - pieces = [] - for field, rule in normalized_rules.items(): - text = self._extract_with_bs4(html, rule) - if text: - pieces.append(text) - concatenated = " ".join(pieces).strip() - return url, concatenated - - tasks = [asyncio.create_task(_task(u)) for u in urls] - results = {} - for coro in asyncio.as_completed(tasks): - try: - url, text = await coro - except Exception as e: - # store empty string on failure (or raise depending on your policy) - results[url] = "" - # Optionally you could log the error; for now we'll attach empty string - continue - results[url] = text - return results diff --git a/cognee/tasks/web_scraper/bs4_crawler.py b/cognee/tasks/web_scraper/bs4_crawler.py index df926bdf9..514567317 100644 --- a/cognee/tasks/web_scraper/bs4_crawler.py +++ b/cognee/tasks/web_scraper/bs4_crawler.py @@ -91,6 +91,8 @@ class BeautifulSoupCrawler: parsed_url = urlparse(url) robots_url 
= f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt" content = await self._fetch_httpx(robots_url) + if content.strip() == "": + return True # no robots.txt means allowed rp = Protego.parse(content) agent = next((v for k, v in self.headers.items() if k.lower() == "user-agent"), "*") return rp.can_fetch(agent, url) or rp.can_fetch("*", url) @@ -114,23 +116,42 @@ class BeautifulSoupCrawler: await self._ensure_client() assert self._client is not None, "HTTP client not initialized" attempt = 0 + parsed = urlparse(url) + domain_root = f"{parsed.scheme}://{parsed.netloc}" + + # Handle robots.txt separately (no recursive crawl delay call) + is_robot = url.lower().endswith("/robots.txt") + while True: try: - # get crawl delay from robots.txt if available - crawl_delay = await self._get_crawl_delay( - f"{urlparse(url).scheme}://{urlparse(url).netloc}" - ) + # Only get crawl delay for non-robots.txt pages + crawl_delay = self.crawl_delay + if not is_robot: + try: + crawl_delay = await self._get_crawl_delay(domain_root) + except Exception as e: + logger.debug(f"Failed to fetch crawl delay for {domain_root}: {e}") + await self._respect_rate_limit(url, crawl_delay) resp = await self._client.get(url) resp.raise_for_status() return resp.text + except Exception as exc: + # Special case: if robots.txt failed, just return empty string + if is_robot: + logger.warning(f"Robots.txt not found or inaccessible at {url}: {exc}") + return "" + attempt += 1 if attempt > self.max_retries: - logger.error(f"Fetch failed for {url}: {exc}") + logger.error(f"Fetch failed for {url} after {attempt} attempts: {exc}") raise + delay = self.retry_delay_factor * (2 ** (attempt - 1)) - logger.warning(f"Retrying {url} after {delay:.2f}s (attempt {attempt})") + logger.warning( + f"Retrying {url} after {delay:.2f}s (attempt {attempt}) due to {exc}" + ) await asyncio.sleep(delay) async def _render_with_playwright( diff --git a/cognee/tasks/web_scraper/config.py b/cognee/tasks/web_scraper/config.py new 
file mode 100644
index 000000000..505cef1df
--- /dev/null
+++ b/cognee/tasks/web_scraper/config.py
@@ -0,0 +1,24 @@
+from pydantic import BaseModel, Field
+from typing import Any, Dict, Optional
+import os
+
+
+class TavilyConfig(BaseModel):
+    api_key: Optional[str] = os.getenv("TAVILY_API_KEY")
+    extract_depth: str = "basic"
+    format: str = "markdown"
+    timeout: Optional[int] = Field(default=None, ge=1, le=60)
+
+
+class SoupCrawlerConfig(BaseModel):
+    concurrency: int = 5
+    crawl_delay: float = 0.5
+    timeout: float = 15.0
+    max_retries: int = 2
+    retry_delay_factor: float = 0.5
+    headers: Optional[Dict[str, str]] = None
+    extraction_rules: Dict[str, Any]
+    use_playwright: bool = False
+    playwright_js_wait: float = 0.8
+    join_all_matches: bool = False
+    structured: bool = False
diff --git a/cognee/tasks/web_scraper/utils.py b/cognee/tasks/web_scraper/utils.py
index 5ff105ec9..18d54d13a 100644
--- a/cognee/tasks/web_scraper/utils.py
+++ b/cognee/tasks/web_scraper/utils.py
@@ -1,9 +1,10 @@
 from tavily import AsyncTavilyClient
-from bs4 import BeautifulSoup
+from .bs4_crawler import BeautifulSoupCrawler
 import os
-import httpx
-from typing import Dict, Any, List, Union
+from .config import TavilyConfig, SoupCrawlerConfig
+from typing import Dict, Any, List, Union, Optional, Literal
 from cognee.shared.logging_utils import get_logger
+import asyncio
 
 logger = get_logger(__name__)
 
@@ -22,37 +23,79 @@ except ImportError:
     )
 
 
-async def fetch_page_content(urls: Union[str, List[str]], extraction_rules: Dict[str, Any]) -> str:
-    if os.getenv("TAVILY_API_KEY") is not None:
+async def fetch_page_content(
+    urls: Union[str, List[str]],
+    *,
+    preferred_tool: Optional[Literal["tavily", "beautifulsoup"]] = "beautifulsoup",
+    extraction_rules: Optional[Dict[str, Any]] = None,
+    tavily_config: Optional[TavilyConfig] = None,
+    soup_crawler_config: Optional[SoupCrawlerConfig] = None,
+    use_playwright: Optional[bool] = False,
+    playwright_js_wait: Optional[float] = 0.8,
+    join_all_matches:
Optional[bool] = False,
+    structured: Optional[bool] = False,
+) -> Dict[str, Union[str, Dict[str, str]]]:
+    """
+    Fetch page content using Tavily API if TAVILY_API_KEY is set,
+    otherwise fetch using BeautifulSoupCrawler directly.
+
+    Parameters:
+        urls: single URL or list of URLs
+        extraction_rules: dict mapping field names -> CSS selector or rule
+        use_playwright: whether to render JS (BeautifulSoupCrawler)
+        playwright_js_wait: seconds to wait for JS to load
+        join_all_matches: join all matching elements per rule
+        structured: if True, returns structured dict instead of concatenated string (based on extraction_rules field names)
+
+    Returns:
+        Dict mapping URL -> extracted string or structured dict
+    """
+    if (os.getenv("TAVILY_API_KEY") or (tavily_config and tavily_config.api_key)) and preferred_tool == "tavily":
         return await fetch_with_tavily(urls)
+
     else:
-        return await fetch_with_bs4(urls, extraction_rules)
+        crawler = BeautifulSoupCrawler()
+        extraction_rules = extraction_rules or (soup_crawler_config.extraction_rules if soup_crawler_config else None)
+        if extraction_rules is None:
+            raise ValueError("extraction_rules must be provided when not using Tavily")
+        try:
+            results = await crawler.fetch_with_bs4(
+                urls,
+                extraction_rules,
+                use_playwright=use_playwright,
+                playwright_js_wait=playwright_js_wait,
+                join_all_matches=join_all_matches,
+                structured=structured,
+            )
+            return results
+        except Exception as e:
+            raise RuntimeError(f"Error fetching page content: {str(e)}") from e
 
 
 async def fetch_with_tavily(urls: Union[str, List[str]]) -> Dict[str, str]:
     client = AsyncTavilyClient()
-    results = await client.extract(urls, include_images=False)
-    result_dict = {}
-    for result in results["results"]:
-        result_dict[result["url"]] = result["raw_content"]
-    return result_dict
+    results = await client.extract(urls)
+    for failed_result in results.get("failed_results", []):
+        logger.warning(f"Failed to fetch {failed_result}")
+    return_results = {}
+    for results in results.get("results", []):
+        return_results[results["url"]] =
results["raw_content"] + return return_results -async def fetch_with_bs4(urls: Union[str, List[str]], extraction_rules: Dict) -> Dict[str]: - result_dict = {} - if isinstance(urls, str): - urls = [urls] - async with httpx.AsyncClient(headers={"User-Agent": "Cognee-Scraper"}) as client: - for url in urls: - response = await client.get(url) - response.raise_for_status() - - soup = BeautifulSoup(response.text, "html.parser") - extracted_data = "" - for field, selector in extraction_rules.items(): - element = soup.select_one(selector) - extracted_data += (element.get_text(strip=True) + "\n") if element else "" - - result_dict[url] = extracted_data.strip() - - return result_dict +def check_valid_arguments_for_web_scraper( + extraction_rules, preferred_tool, tavily_config, soup_crawler_config +): + if preferred_tool == "tavily": + if not (os.getenv("TAVILY_API_KEY") or (tavily_config and tavily_config.api_key)): + raise ValueError( + "TAVILY_API_KEY must be set in environment variables or tavily_config.api_key must be provided when preferred_tool is 'tavily'" + ) + else: + print(preferred_tool) + print(soup_crawler_config) + print(soup_crawler_config and soup_crawler_config.extraction_rules) + if not (extraction_rules or (soup_crawler_config and soup_crawler_config.extraction_rules)): + raise ValueError( + "extraction_rules must be provided when preferred_tool is 'beautifulsoup'" + ) diff --git a/cognee/tasks/web_scraper/web_scraper_task.py b/cognee/tasks/web_scraper/web_scraper_task.py new file mode 100644 index 000000000..c06b54b8e --- /dev/null +++ b/cognee/tasks/web_scraper/web_scraper_task.py @@ -0,0 +1,63 @@ +from cognee.tasks.storage.add_data_points import add_data_points +from cognee.tasks.storage.index_data_points import index_data_points +from cognee.tasks.storage.index_graph_edges import index_graph_edges +from cognee.infrastructure.databases.graph import get_graph_engine +from .models import WebPage, WebSite, ScrapingJob +from typing import Union, List, Dict 
+from urllib.parse import urlparse
+
+
+async def web_scraper_task(url: Union[str, List[str]], **kwargs):
+    graph_engine = await get_graph_engine()
+    # Mapping between parsed_url object and urls
+    mappings = {}
+    web_scraping_job = ScrapingJob(
+        job_name="default_job",
+        urls=[url] if isinstance(url, str) else url,
+        scraping_rules={},
+        schedule=None,
+        status="active",
+        last_run=None,
+        next_run=None,
+    )
+    data_point_mappings: Dict[WebSite, List[WebPage]] = {}
+    if isinstance(url, List):
+        for single_url in url:
+            parsed_url = urlparse(single_url)
+            domain = parsed_url.netloc
+            base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
+            if mappings.get(parsed_url) is None:
+                mappings[parsed_url] = [single_url]
+            else:
+                mappings[parsed_url].append(single_url)
+    else:
+        # Single URL: mappings is empty at this point, so just create the entry.
+        parsed_url = urlparse(url)
+        mappings[parsed_url] = [url]
+
+    for parsed_url in mappings.keys():
+        domain = parsed_url.netloc
+        base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
+        web_site = WebSite(
+            domain=domain,
+            base_url=base_url,
+            robots_txt="",
+            crawl_delay=0,
+            last_crawled=None,
+            page_count=0,
+            scraping_config={},
+        )
+        for url in mappings[parsed_url]:
+            # Process each URL with the web scraping logic
+            web_page = WebPage(
+                url=url,
+                title="",
+                content="",
+                content_hash="",
+                scraped_at=None,
+                last_modified=None,
+                status_code=0,
+                content_type="",
+                page_size=0,
+                extraction_rules={},
+            )