diff --git a/cognee/tasks/web_scraper/models.py b/cognee/tasks/web_scraper/models.py
index 8d5f32f43..12ac91166 100644
--- a/cognee/tasks/web_scraper/models.py
+++ b/cognee/tasks/web_scraper/models.py
@@ -2,8 +2,10 @@ from cognee.infrastructure.engine.models import DataPoint
 from typing import Optional, Dict, Any, List
 from datetime import datetime
 
+
 class WebPage(DataPoint):
     """Represents a scraped web page with metadata"""
+
     url: str
     title: Optional[str]
     content: str
@@ -16,8 +18,10 @@ class WebPage(DataPoint):
     extraction_rules: Dict[str, Any]  # CSS selectors, XPath rules used
     metadata: dict = {"index_fields": ["url", "title", "scraped_at"]}
 
+
 class WebSite(DataPoint):
     """Represents a website or domain being scraped"""
+
     domain: str
     base_url: str
     robots_txt: Optional[str]
@@ -27,13 +31,14 @@ class WebSite(DataPoint):
     scraping_config: Dict[str, Any]
     metadata: dict = {"index_fields": ["domain", "base_url"]}
 
+
 class ScrapingJob(DataPoint):
     """Represents a scraping job configuration"""
+
     job_name: str
     urls: List[str]
-    scraping_rules: Dict[str, Any]
     schedule: Optional[str]  # Cron-like schedule for recurring scrapes
     status: str  # "active", "paused", "completed", "failed"
     last_run: Optional[datetime]
     next_run: Optional[datetime]
-    metadata: dict = {"index_fields": ["job_name", "status"]}
\ No newline at end of file
+    metadata: dict = {"index_fields": ["job_name", "status"]}
diff --git a/cognee/tasks/web_scraper/utils.py b/cognee/tasks/web_scraper/utils.py
new file mode 100644
index 000000000..bcf12ac32
--- /dev/null
+++ b/cognee/tasks/web_scraper/utils.py
@@ -0,0 +1,45 @@
+from tavily import AsyncTavilyClient
+from bs4 import BeautifulSoup
+import os
+import requests
+from typing import Dict, Any, List, Union
+
+
+async def fetch_page_content(urls: Union[str, List[str]], extraction_rules: Dict[str, Any]) -> Dict[str, str]:
+    # Prefer Tavily when an API key is configured; fall back to BeautifulSoup otherwise.
+    if os.getenv("TAVILY_API_KEY") is not None:
+        return await fetch_with_tavily(urls)
+    else:
+        return await fetch_with_bs4(urls, extraction_rules)
+
+
+async def fetch_with_tavily(urls: Union[str, List[str]]) -> Dict[str, str]:
+    # AsyncTavilyClient picks up TAVILY_API_KEY from the environment.
+    client = AsyncTavilyClient()
+    results = await client.extract(urls, include_images=False)
+    result_dict = {}
+    for result in results["results"]:
+        result_dict[result["url"]] = result["raw_content"]
+    return result_dict
+
+
+async def fetch_with_bs4(urls: Union[str, List[str]], extraction_rules: Dict[str, Any]) -> Dict[str, str]:
+    result_dict = {}
+    if isinstance(urls, str):
+        urls = [urls]
+    for url in urls:
+        # requests is synchronous, so this blocks the event loop while fetching.
+        response = requests.get(url, headers={"User-Agent": "Cognee-Scraper"})
+        response.raise_for_status()
+
+        soup = BeautifulSoup(response.text, "html.parser")
+        extracted_parts = []
+
+        for field, selector in extraction_rules.items():
+            element = soup.select_one(selector)
+            if element:
+                extracted_parts.append(element.get_text(strip=True))
+
+        result_dict[url] = "\n".join(extracted_parts)
+
+    return result_dict
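A minimal usage sketch for review purposes, not part of the diff: it exercises the BeautifulSoup path (no `TAVILY_API_KEY` in the environment), and the URL and selector rules below are hypothetical.

```python
import asyncio

from cognee.tasks.web_scraper.utils import fetch_page_content

# Hypothetical rules mapping field names to CSS selectors.
extraction_rules = {
    "title": "h1",
    "body": "article p",
}


async def main():
    # With no TAVILY_API_KEY set, this routes through fetch_with_bs4
    # and returns a dict keyed by URL.
    pages = await fetch_page_content("https://example.com", extraction_rules)
    for url, text in pages.items():
        print(url, text[:200])


asyncio.run(main())
```

Note that the fallback path fetches pages synchronously via `requests`; an async HTTP client such as `aiohttp` would keep the event loop free when scraping many URLs, but the sketch above uses the code exactly as added in this diff.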