diff --git a/cognee/tasks/web_scraper/__init__.py b/cognee/tasks/web_scraper/__init__.py index a37e17871..8a142b515 100644 --- a/cognee/tasks/web_scraper/__init__.py +++ b/cognee/tasks/web_scraper/__init__.py @@ -1,7 +1,11 @@ from .bs4_crawler import BeautifulSoupCrawler from .utils import fetch_page_content +from .web_scraper_task import cron_web_scraper_task, web_scraper_task + __all__ = [ "BeautifulSoupCrawler", "fetch_page_content", + "cron_web_scraper_task", + "web_scraper_task", ] diff --git a/cognee/tasks/web_scraper/models.py b/cognee/tasks/web_scraper/models.py index 538e11be8..18bb2e3ef 100644 --- a/cognee/tasks/web_scraper/models.py +++ b/cognee/tasks/web_scraper/models.py @@ -31,7 +31,7 @@ class WebSite(DataPoint): page_count: int scraping_config: Dict[str, Any] description: str - metadata: dict = {"index_fields": ["domain", "base_url","description"]} + metadata: dict = {"index_fields": ["domain", "base_url", "description"]} class ScrapingJob(DataPoint): @@ -44,4 +44,4 @@ class ScrapingJob(DataPoint): last_run: Optional[datetime] next_run: Optional[datetime] description: str - metadata: dict = {"index_fields": ["job_name", "status","description"]} + metadata: dict = {"index_fields": ["job_name", "status", "description"]} diff --git a/cognee/tasks/web_scraper/web_scraper_task.py b/cognee/tasks/web_scraper/web_scraper_task.py index e9f197045..9fb43c2cb 100644 --- a/cognee/tasks/web_scraper/web_scraper_task.py +++ b/cognee/tasks/web_scraper/web_scraper_task.py @@ -4,8 +4,7 @@ from datetime import datetime from typing import Union, List from urllib.parse import urlparse from uuid import uuid5, NAMESPACE_OID - -from apscheduler.schedulers.asyncio import AsyncIOScheduler +import asyncio from apscheduler.triggers.cron import CronTrigger from cognee.infrastructure.databases.graph import get_graph_engine @@ -18,9 +17,68 @@ from .models import WebPage, WebSite, ScrapingJob from .config import SoupCrawlerConfig, TavilyConfig from .utils import fetch_page_content 
try:
    from apscheduler.schedulers.background import BackgroundScheduler

    # Module-level scheduler shared by every call to cron_web_scraper_task.
    scheduler = BackgroundScheduler()
except ImportError as import_error:
    raise ImportError(
        "Please install apscheduler by pip install APScheduler >=3.10"
    ) from import_error

logger = get_logger(__name__)


async def cron_web_scraper_task(
    url: Union[str, List[str]],
    *,
    schedule: str = None,
    extraction_rules: dict = None,
    # NOTE: this default is read from the environment once, when the module
    # is imported, not on every call.
    tavily_api_key: str = os.getenv("TAVILY_API_KEY"),
    soup_crawler_config: SoupCrawlerConfig = None,
    tavily_config: TavilyConfig = None,
    job_name: str = "scraping",
):
    """Run ``web_scraper_task`` once, or register it as a recurring cron job.

    When ``schedule`` is falsy the scraper is awaited immediately and its
    result is returned. Otherwise ``schedule`` is parsed as a crontab
    expression and a job is added to the module-level ``scheduler``; each
    firing runs ``web_scraper_task`` to completion via ``asyncio.run``.

    Args:
        url: A single URL or a list of URLs, forwarded to ``web_scraper_task``.
        schedule: Crontab expression. Falsy means "run now, do not schedule".
        extraction_rules: Forwarded unchanged to ``web_scraper_task``.
        tavily_api_key: Forwarded unchanged to ``web_scraper_task``.
        soup_crawler_config: Forwarded unchanged to ``web_scraper_task``.
        tavily_config: Forwarded unchanged to ``web_scraper_task``.
        job_name: Name of the job. A falsy value is replaced by a
            timestamp-based ``scrape_YYYYMMDD_HHMMSS`` name.

    Returns:
        The result of ``web_scraper_task`` when run immediately, or ``None``
        when the job was only scheduled.

    Raises:
        ValueError: If ``schedule`` is not a valid crontab expression.
    """
    job_name = job_name or f"scrape_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    # Single source of truth for the arguments forwarded to the scraper, used
    # by both the immediate and the scheduled path.
    task_kwargs = {
        "url": url,
        "schedule": schedule,
        "extraction_rules": extraction_rules,
        "tavily_api_key": tavily_api_key,
        "soup_crawler_config": soup_crawler_config,
        "tavily_config": tavily_config,
        "job_name": job_name,
    }

    if not schedule:
        # If no schedule, run immediately
        logger.info("Running web scraper task immediately...")
        return await web_scraper_task(**task_kwargs)

    try:
        trigger = CronTrigger.from_crontab(schedule)
    except ValueError as e:
        raise ValueError(f"Invalid cron string '{schedule}': {e}") from e

    # APScheduler only accepts string job ids; a raw UUID object is rejected
    # with a TypeError. The id is derived from job_name, so scheduling the
    # same job_name again replaces the existing job (replace_existing=True).
    job_id = str(uuid5(NAMESPACE_OID, name=job_name))

    scheduler.add_job(
        # The job is invoked as a plain callable, so each run drives the
        # coroutine with its own asyncio.run call.
        lambda: asyncio.run(web_scraper_task(**task_kwargs)),
        trigger=trigger,
        id=job_id,
        name=f"WebScraper_{job_id}",
        replace_existing=True,
    )

    # start() raises SchedulerAlreadyRunningError on a running scheduler, so
    # only start it the first time a job is registered.
    if not scheduler.running:
        scheduler.start()
    return None