Done with scraping_task successfully

This commit is contained in:
Geoff-Robin 2025-10-06 02:27:20 +05:30
parent f148b1df89
commit f449fce0f1
3 changed files with 264 additions and 63 deletions

View file

@ -1,4 +1,4 @@
from cognee.infrastructure.engine.models import DataPoint from cognee.infrastructure.engine import DataPoint
from typing import Optional, Dict, Any, List from typing import Optional, Dict, Any, List
from datetime import datetime from datetime import datetime
@ -7,7 +7,7 @@ class WebPage(DataPoint):
"""Represents a scraped web page with metadata""" """Represents a scraped web page with metadata"""
url: str url: str
title: Optional[str] name: Optional[str]
content: str content: str
content_hash: str content_hash: str
scraped_at: datetime scraped_at: datetime
@ -16,29 +16,32 @@ class WebPage(DataPoint):
content_type: str content_type: str
page_size: int page_size: int
extraction_rules: Dict[str, Any] # CSS selectors, XPath rules used extraction_rules: Dict[str, Any] # CSS selectors, XPath rules used
metadata: dict = {"index_fields": ["url", "title", "scraped_at"]} description: str
metadata: dict = {"index_fields": ["url", "title", "scraped_at", "description"]}
class WebSite(DataPoint): class WebSite(DataPoint):
"""Represents a website or domain being scraped""" """Represents a website or domain being scraped"""
domain: str name: str
base_url: str base_url: str
robots_txt: Optional[str] robots_txt: Optional[str]
crawl_delay: float crawl_delay: float
last_crawled: datetime last_crawled: datetime
page_count: int page_count: int
scraping_config: Dict[str, Any] scraping_config: Dict[str, Any]
metadata: dict = {"index_fields": ["domain", "base_url"]} description: str
metadata: dict = {"index_fields": ["domain", "base_url","description"]}
class ScrapingJob(DataPoint): class ScrapingJob(DataPoint):
"""Represents a scraping job configuration""" """Represents a scraping job configuration"""
job_name: str name: str
urls: List[str] urls: List[str]
schedule: Optional[str] # Cron-like schedule for recurring scrapes schedule: Optional[str] # Cron-like schedule for recurring scrapes
status: str # "active", "paused", "completed", "failed" status: str # "active", "paused", "completed", "failed"
last_run: Optional[datetime] last_run: Optional[datetime]
next_run: Optional[datetime] next_run: Optional[datetime]
metadata: dict = {"index_fields": ["job_name", "status"]} description: str
metadata: dict = {"index_fields": ["job_name", "status","description"]}

View file

@ -1,63 +1,262 @@
from cognee.tasks.storage.add_data_points import add_data_points import os
import hashlib
from datetime import datetime
from typing import Union, List
from urllib.parse import urlparse
from uuid import uuid5, NAMESPACE_OID
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.shared.logging_utils import get_logger
from cognee.tasks.storage.index_data_points import index_data_points from cognee.tasks.storage.index_data_points import index_data_points
from cognee.tasks.storage.index_graph_edges import index_graph_edges from cognee.tasks.storage.index_graph_edges import index_graph_edges
from cognee.infrastructure.databases.graph import get_graph_engine from cognee.modules.engine.operations.setup import setup
from .models import WebPage, WebSite, ScrapingJob from .models import WebPage, WebSite, ScrapingJob
from typing import Union, List, Dict from .config import SoupCrawlerConfig, TavilyConfig
from urllib.parse import urlparse from .utils import fetch_page_content
logger = get_logger(__name__)
async def web_scraper_task(url: Union[str, List[str]], **kwargs): async def web_scraper_task(
graph_engine = await get_graph_engine() url: Union[str, List[str]],
# Mapping between parsed_url object and urls *,
mappings = {} schedule: str = None,
web_scraping_job = ScrapingJob( extraction_rules: dict = None,
job_name="default_job", tavily_api_key: str = os.getenv("TAVILY_API_KEY"),
urls=[url] if isinstance(url, str) else url, soup_crawler_config: SoupCrawlerConfig = None,
scraping_rules={}, tavily_config: TavilyConfig = None,
schedule=None, job_name: str = None,
status="active", ):
last_run=None, """
next_run=None, Scrapes one or more URLs and returns WebPage, WebSite, and ScrapingJob data points.
Unique IDs are assigned to each WebPage, WebSite, and ScrapingJob.
Includes a description field summarizing other fields for each data point.
"""
await setup()
graph_db = await get_graph_engine()
if isinstance(url, str):
url = [url]
soup_crawler_config, tavily_config, preferred_tool = check_arguments(
tavily_api_key, extraction_rules, tavily_config, soup_crawler_config
) )
data_point_mappings: Dict[WebSite, List[WebPage]] = {} now = datetime.now()
if isinstance(url, List): job_name = job_name or f"scrape_{now.strftime('%Y%m%d_%H%M%S')}"
for single_url in url: status = "active"
parsed_url = urlparse(single_url) trigger = CronTrigger.from_crontab(schedule) if schedule else None
domain = parsed_url.netloc next_run = trigger.get_next_fire_time(None, now) if trigger else None
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}" scraping_job_created = await graph_db.get_node(uuid5(NAMESPACE_OID, name=job_name))
if mappings.get(parsed_url):
mappings[parsed_url] = [single_url] # Create description for ScrapingJob
else: scraping_job_description = (
mappings[parsed_url].append(single_url) f"Scraping job: {job_name}\n"
else: f"URLs: {', '.join(url)}\n"
if mappings.get(parsed_url): f"Status: {status}\n"
mappings[parsed_url] = [single_url] f"Schedule: {schedule}\n"
else: f"Last run: {now.strftime('%Y-%m-%d %H:%M:%S')}\n"
mappings[parsed_url].append(single_url) f"Next run: {next_run.strftime('%Y-%m-%d %H:%M:%S') if next_run else 'Not scheduled'}"
for parsed_url in mappings.keys(): )
scraping_job = ScrapingJob(
id=uuid5(NAMESPACE_OID, name=job_name),
name=job_name,
urls=url,
status=status,
schedule=schedule,
last_run=now,
next_run=next_run,
description=scraping_job_description,
)
if scraping_job_created:
await graph_db.add_node(scraping_job) # Update existing scraping job
websites_dict = {}
webpages = []
# Fetch content
results = await fetch_page_content(
urls=url,
preferred_tool=preferred_tool,
tavily_config=tavily_config,
soup_crawler_config=soup_crawler_config,
)
for page_url, content in results.items():
parsed_url = urlparse(page_url)
domain = parsed_url.netloc domain = parsed_url.netloc
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}" base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
web_site = WebSite(
domain=domain, # Create or update WebSite
base_url=base_url, if base_url not in websites_dict:
robots_txt="", # Create description for WebSite
crawl_delay=0, website_description = (
last_crawled=None, f"Website: {domain}\n"
page_count=0, f"Base URL: {base_url}\n"
scraping_config={}, f"Last crawled: {now.strftime('%Y-%m-%d %H:%M:%S')}\n"
) f"Page count: 1\n"
for url in mappings[parsed_url]: f"Scraping tool: {preferred_tool}\n"
# Process each URL with the web scraping logic f"Robots.txt: {'Available' if websites_dict.get(base_url, {}).get('robots_txt') else 'Not set'}\n"
web_page = WebPage( f"Crawl delay: 0.5 seconds"
url=url,
title="",
content="",
content_hash="",
scraped_at=None,
last_modified=None,
status_code=0,
content_type="",
page_size=0,
extraction_rules={},
) )
websites_dict[base_url] = WebSite(
id=uuid5(NAMESPACE_OID, name=domain),
name=domain,
base_url=base_url,
robots_txt=None,
crawl_delay=0.5,
last_crawled=now,
page_count=1,
scraping_config={
"extraction_rules": extraction_rules or {},
"tool": preferred_tool,
},
description=website_description,
)
if scraping_job_created:
await graph_db.add_node(websites_dict[base_url])
else:
websites_dict[base_url].page_count += 1
# Update description for existing WebSite
websites_dict[base_url].description = (
f"Website: {domain}\n"
f"Base URL: {base_url}\n"
f"Last crawled: {now.strftime('%Y-%m-%d %H:%M:%S')}\n"
f"Page count: {websites_dict[base_url].page_count}\n"
f"Scraping tool: {preferred_tool}\n"
f"Robots.txt: {'Available' if websites_dict[base_url].robots_txt else 'Not set'}\n"
f"Crawl delay: {websites_dict[base_url].crawl_delay} seconds"
)
if scraping_job_created:
await graph_db.add_node(websites_dict[base_url])
# Create WebPage
content_str = content if isinstance(content, str) else str(content)
content_hash = hashlib.sha256(content_str.encode("utf-8")).hexdigest()
# Create description for WebPage
webpage_description = (
f"Webpage: {parsed_url.path.lstrip('/') or 'Home'}\n"
f"URL: {page_url}\n"
f"Scraped at: {now.strftime('%Y-%m-%d %H:%M:%S')}\n"
f"Content hash: {content_hash[:16]}...\n"
f"Content type: text/html\n"
f"Page size: {len(content_str)} bytes\n"
f"Status code: 200"
)
webpage = WebPage(
id=uuid5(NAMESPACE_OID, name=parsed_url.path.lstrip("/")),
url=page_url,
name=get_path_after_base(base_url, page_url),
content=content_str,
content_hash=content_hash,
scraped_at=now,
last_modified=None,
status_code=200,
content_type="text/html",
page_size=len(content_str),
extraction_rules=extraction_rules or {},
description=webpage_description,
)
webpages.append(webpage)
scraping_job.status = "completed" if webpages else "failed"
# Update ScrapingJob description with final status
scraping_job.description = (
f"Scraping job: {job_name}\n"
f"URLs: {', '.join(url)}\n"
f"Status: {scraping_job.status}\n"
f"Last run: {now.strftime('%Y-%m-%d %H:%M:%S')}\n"
f"Next run: {next_run.strftime('%Y-%m-%d %H:%M:%S') if next_run else 'Not scheduled'}"
)
websites = list(websites_dict.values())
# Adding Nodes and Edges
node_mapping = {scraping_job.id: scraping_job}
edge_mapping = []
for website in websites:
node_mapping[website.id] = website
edge_mapping.append(
(
scraping_job.id,
website.id,
"is_scraping",
{
"source_node_id": scraping_job.id,
"target_node_id": website.id,
"relationship_name": "is_scraping",
},
)
)
for webpage in webpages:
node_mapping[webpage.id] = webpage
parsed_url = urlparse(webpage.url)
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
edge_mapping.append(
(
webpage.id, # Corrected: WebPage is the source, WebSite is the target
websites_dict[base_url].id,
"is_part_of",
{
"source_node_id": webpage.id,
"target_node_id": websites_dict[base_url].id,
"relationship_name": "is_part_of",
},
)
)
await graph_db.add_nodes(list(node_mapping.values()))
await graph_db.add_edges(edge_mapping)
await index_data_points(list(node_mapping.values()))
await index_graph_edges()
return await graph_db.get_graph_data()
def check_arguments(tavily_api_key, extraction_rules, tavily_config, soup_crawler_config):
    """
    Resolve the crawler configuration objects and choose the scraping tool.

    A ``SoupCrawlerConfig`` is built from ``extraction_rules`` when rules are given
    without an explicit crawler config. When ``tavily_api_key`` is truthy, it is
    stored on the supplied ``tavily_config`` (mutating it in place) or used to
    build a new ``TavilyConfig``.

    Returns:
        A ``(soup_crawler_config, tavily_config, preferred_tool)`` tuple, where
        ``preferred_tool`` is ``"tavily"`` or ``"beautifulsoup"``.

    Raises:
        TypeError: If neither a Tavily config nor a crawler config is available
            after resolution.
    """
    if extraction_rules and not soup_crawler_config:
        soup_crawler_config = SoupCrawlerConfig(extraction_rules=extraction_rules)

    use_tavily = False
    if tavily_api_key:
        if tavily_config:
            tavily_config.api_key = tavily_api_key
        else:
            tavily_config = TavilyConfig(api_key=tavily_api_key)
        # NOTE(review): the tool choice is assumed to be nested under the API-key
        # check; the flattened source does not show the indentation - confirm.
        use_tavily = not (extraction_rules or soup_crawler_config)

    if not (tavily_config or soup_crawler_config):
        raise TypeError("Make sure you pass arguments for web_scraper_task")

    return soup_crawler_config, tavily_config, "tavily" if use_tavily else "beautifulsoup"
def get_path_after_base(base_url, url):
    """
    Return the part of ``url``'s path that follows ``base_url``'s path.

    Only the path components are considered; query strings and fragments are
    ignored.

    Args:
        base_url: URL whose path is treated as the prefix to strip.
        url: URL on the same netloc as ``base_url``.

    Returns:
        The remaining path without a leading slash. If the path of ``url`` is
        not located underneath the base path, the full path of ``url`` is
        returned instead (also without a leading slash).

    Raises:
        ValueError: If the two URLs have different netlocs.
    """
    parsed_base = urlparse(base_url)
    parsed_url = urlparse(url)

    # Ensure they have the same netloc (domain)
    if parsed_base.netloc != parsed_url.netloc:
        raise ValueError("Base URL and target URL are from different domains")

    base_path = parsed_base.path.rstrip("/")
    full_path = parsed_url.path

    # Strip the base path only on a segment boundary: a plain startswith() would
    # treat "/foobar/baz" as living under "/foo" and return "bar/baz".
    if full_path == base_path or full_path.startswith(base_path + "/"):
        return full_path[len(base_path):].lstrip("/")
    return full_path.lstrip("/")

View file

@ -4,9 +4,8 @@ from cognee.tasks.web_scraper.config import SoupCrawlerConfig
async def test_web_scraping_using_bs4(): async def test_web_scraping_using_bs4():
# 0. Prune only data (not full system prune)
await cognee.prune.prune_data() await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
# 1. Setup test URL and extraction rules # 1. Setup test URL and extraction rules
url = "https://quotes.toscrape.com/" url = "https://quotes.toscrape.com/"
rules = { rules = {