Setup models.py and utils.py
This commit is contained in:
parent
70a2cc9d65
commit
925bd38195
2 changed files with 46 additions and 2 deletions
|
|
@ -2,8 +2,10 @@ from cognee.infrastructure.engine.models import DataPoint
|
|||
from typing import Optional, Dict, Any, List
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
class WebPage(DataPoint):
|
||||
"""Represents a scraped web page with metadata"""
|
||||
|
||||
url: str
|
||||
title: Optional[str]
|
||||
content: str
|
||||
|
|
@ -16,8 +18,10 @@ class WebPage(DataPoint):
|
|||
extraction_rules: Dict[str, Any] # CSS selectors, XPath rules used
|
||||
metadata: dict = {"index_fields": ["url", "title", "scraped_at"]}
|
||||
|
||||
|
||||
class WebSite(DataPoint):
|
||||
"""Represents a website or domain being scraped"""
|
||||
|
||||
domain: str
|
||||
base_url: str
|
||||
robots_txt: Optional[str]
|
||||
|
|
@ -27,13 +31,14 @@ class WebSite(DataPoint):
|
|||
scraping_config: Dict[str, Any]
|
||||
metadata: dict = {"index_fields": ["domain", "base_url"]}
|
||||
|
||||
|
||||
class ScrapingJob(DataPoint):
|
||||
"""Represents a scraping job configuration"""
|
||||
|
||||
job_name: str
|
||||
urls: List[str]
|
||||
scraping_rules: Dict[str, Any]
|
||||
schedule: Optional[str] # Cron-like schedule for recurring scrapes
|
||||
status: str # "active", "paused", "completed", "failed"
|
||||
last_run: Optional[datetime]
|
||||
next_run: Optional[datetime]
|
||||
metadata: dict = {"index_fields": ["job_name", "status"]}
|
||||
metadata: dict = {"index_fields": ["job_name", "status"]}
|
||||
|
|
|
|||
39
cognee/tasks/web_scraper/utils.py
Normal file
39
cognee/tasks/web_scraper/utils.py
Normal file
|
|
@ -0,0 +1,39 @@
|
|||
from tavily import AsyncTavilyClient
|
||||
from bs4 import BeautifulSoup
|
||||
import os
|
||||
import requests
|
||||
from typing import Dict, Any, List, Union
|
||||
|
||||
|
||||
async def fetch_page_content(urls: Union[str, List[str]], extraction_rules: Dict[str, Any]) -> str:
|
||||
if os.getenv("TAVILY_API_KEY") is not None:
|
||||
return await fetch_with_tavily(urls)
|
||||
else:
|
||||
return await fetch_with_bs4(urls, extraction_rules)
|
||||
|
||||
|
||||
async def fetch_with_tavily(urls: Union[str, List[str]]) -> Dict[str, str]:
|
||||
client = AsyncTavilyClient()
|
||||
results = await client.extract(urls, include_images=False)
|
||||
result_dict = {}
|
||||
for result in results["results"]:
|
||||
result_dict[result["url"]] = result["raw_content"]
|
||||
return result_dict
|
||||
|
||||
|
||||
async def fetch_with_bs4(urls: Union[str,List[str]], extraction_rules: Dict) -> Dict[str]:
|
||||
result_dict = {}
|
||||
if isinstance(urls,str):
|
||||
urls = [urls]
|
||||
for url in urls:
|
||||
response = requests.get(url, headers={"User-Agent": "Cognee-Scraper"})
|
||||
response.raise_for_status()
|
||||
|
||||
soup = BeautifulSoup(response.text, "html.parser")
|
||||
extracted_data = ""
|
||||
|
||||
for field, selector in extraction_rules.items():
|
||||
element = soup.select_one(selector)
|
||||
extracted_data += element.get_text(strip=True) if element else ""
|
||||
|
||||
return result_dict
|
||||
Loading…
Add table
Reference in a new issue