Done adding cron job for web scraping
This commit is contained in:
parent
e5633bc368
commit
0f64f6804d
3 changed files with 66 additions and 4 deletions
|
|
@@ -1,7 +1,11 @@
|
|||
from .bs4_crawler import BeautifulSoupCrawler
|
||||
from .utils import fetch_page_content
|
||||
from .web_scraper_task import cron_web_scraper_task, web_scraper_task
|
||||
|
||||
|
||||
__all__ = [
|
||||
"BeautifulSoupCrawler",
|
||||
"fetch_page_content",
|
||||
"cron_web_scraper_task",
|
||||
"web_scraper_task",
|
||||
]
|
||||
|
|
|
|||
|
|
@@ -31,7 +31,7 @@ class WebSite(DataPoint):
|
|||
page_count: int
|
||||
scraping_config: Dict[str, Any]
|
||||
description: str
|
||||
metadata: dict = {"index_fields": ["domain", "base_url","description"]}
|
||||
metadata: dict = {"index_fields": ["domain", "base_url", "description"]}
|
||||
|
||||
|
||||
class ScrapingJob(DataPoint):
|
||||
|
|
@@ -44,4 +44,4 @@ class ScrapingJob(DataPoint):
|
|||
last_run: Optional[datetime]
|
||||
next_run: Optional[datetime]
|
||||
description: str
|
||||
metadata: dict = {"index_fields": ["job_name", "status","description"]}
|
||||
metadata: dict = {"index_fields": ["job_name", "status", "description"]}
|
||||
|
|
|
|||
|
|
@@ -4,8 +4,7 @@ from datetime import datetime
|
|||
from typing import Union, List
|
||||
from urllib.parse import urlparse
|
||||
from uuid import uuid5, NAMESPACE_OID
|
||||
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
import asyncio
|
||||
from apscheduler.triggers.cron import CronTrigger
|
||||
|
||||
from cognee.infrastructure.databases.graph import get_graph_engine
|
||||
|
|
@@ -18,9 +17,68 @@ from .models import WebPage, WebSite, ScrapingJob
|
|||
from .config import SoupCrawlerConfig, TavilyConfig
|
||||
from .utils import fetch_page_content
|
||||
|
||||
try:
|
||||
from apscheduler.schedulers.background import BackgroundScheduler
|
||||
|
||||
scheduler = BackgroundScheduler()
|
||||
except ImportError:
|
||||
raise ImportError("Please install apscheduler by pip install APScheduler >=3.10")
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
async def cron_web_scraper_task(
|
||||
url: Union[str, List[str]],
|
||||
*,
|
||||
schedule: str = None,
|
||||
extraction_rules: dict = None,
|
||||
tavily_api_key: str = os.getenv("TAVILY_API_KEY"),
|
||||
soup_crawler_config: SoupCrawlerConfig = None,
|
||||
tavily_config: TavilyConfig = None,
|
||||
job_name: str = "scraping",
|
||||
):
|
||||
now = datetime.now()
|
||||
job_name = job_name or f"scrape_{now.strftime('%Y%m%d_%H%M%S')}"
|
||||
if schedule:
|
||||
try:
|
||||
trigger = CronTrigger.from_crontab(schedule)
|
||||
except ValueError as e:
|
||||
raise ValueError(f"Invalid cron string '{schedule}': {e}")
|
||||
|
||||
scheduler.add_job(
|
||||
lambda: asyncio.run(
|
||||
web_scraper_task(
|
||||
url=url,
|
||||
schedule=schedule,
|
||||
extraction_rules=extraction_rules,
|
||||
tavily_api_key=tavily_api_key,
|
||||
soup_crawler_config=soup_crawler_config,
|
||||
tavily_config=tavily_config,
|
||||
job_name=job_name,
|
||||
)
|
||||
),
|
||||
trigger=trigger,
|
||||
id=uuid5(NAMESPACE_OID, name=job_name),
|
||||
name=f"WebScraper_{uuid5(NAMESPACE_OID, name=job_name)}",
|
||||
replace_existing=True,
|
||||
)
|
||||
|
||||
scheduler.start()
|
||||
return
|
||||
|
||||
# If no schedule, run immediately
|
||||
print(f"[{datetime.now()}] Running web scraper task immediately...")
|
||||
return await web_scraper_task(
|
||||
url=url,
|
||||
schedule=schedule,
|
||||
extraction_rules=extraction_rules,
|
||||
tavily_api_key=tavily_api_key,
|
||||
soup_crawler_config=soup_crawler_config,
|
||||
tavily_config=tavily_config,
|
||||
job_name=job_name,
|
||||
)
|
||||
|
||||
|
||||
async def web_scraper_task(
|
||||
url: Union[str, List[str]],
|
||||
*,
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue