diff --git a/cognee/api/v1/add/add.py b/cognee/api/v1/add/add.py index c74ee69d8..65ebb8748 100644 --- a/cognee/api/v1/add/add.py +++ b/cognee/api/v1/add/add.py @@ -140,7 +140,7 @@ async def add( # Add a single file await cognee.add("/home/user/documents/analysis.pdf") - + # Add a single url and bs4 extract ingestion method extraction_rules = { "title": "h1", @@ -148,11 +148,11 @@ async def add( "more_info": "a[href*='more-info']" } await cognee.add("https://example.com",extraction_rules=extraction_rules) - + # Add a single url and tavily extract ingestion method Make sure to TAVILY_API_KEY = YOUR_TAVILY_API_KEY as a environment variable await cognee.add("https://example.com") - + # Add multiple urls await cognee.add(["https://example.com","https://books.toscrape.com"]) ``` @@ -186,13 +186,21 @@ async def add( await setup() if not soup_crawler_config and extraction_rules: soup_crawler_config = SoupCrawlerConfig(extraction_rules=extraction_rules) - if not tavily_config and os.getenv("TAVILY_API_KEY"): - tavily_config = TavilyConfig() + if not tavily_config and os.getenv("TAVILY_API_KEY"): + tavily_config = TavilyConfig(api_key=os.getenv("TAVILY_API_KEY")) soup_crawler.set(soup_crawler_config) tavily.set(tavily_config) - if urlparse(data).scheme in ["http", "https"]: + + http_schemes = {"http", "https"} + + def _is_http_url(item: Union[str, BinaryIO]) -> bool: + return isinstance(item, str) and urlparse(item).scheme in http_schemes + + if _is_http_url(data): + node_set = ["web_content"] if not node_set else node_set + ["web_content"] + elif isinstance(data, list) and any(_is_http_url(item) for item in data): node_set = ["web_content"] if not node_set else node_set + ["web_content"] user, authorized_dataset = await resolve_authorized_user_dataset(dataset_id, dataset_name, user) diff --git a/cognee/tasks/ingestion/save_data_item_to_storage.py b/cognee/tasks/ingestion/save_data_item_to_storage.py index d55b697c6..c947d35c5 100644 --- a/cognee/tasks/ingestion/save_data_item_to_storage.py +++ 
b/cognee/tasks/ingestion/save_data_item_to_storage.py @@ -63,10 +63,19 @@ async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any], **kwar tavily = tavily_config.get() soup_crawler = soup_crawler_config.get() + preferred_tool = "beautifulsoup" if soup_crawler else "tavily" + if preferred_tool == "tavily" and tavily is None: + raise IngestionError( + message="TavilyConfig must be set on the ingestion context when fetching HTTP URLs without a SoupCrawlerConfig." + ) + if preferred_tool == "beautifulsoup" and soup_crawler is None: + raise IngestionError( + message="SoupCrawlerConfig must be set on the ingestion context when using the BeautifulSoup scraper." + ) data = await fetch_page_content( data_item, - preferred_tool="beautifulsoup" if soup_crawler else "tavily", + preferred_tool=preferred_tool, tavily_config=tavily, soup_crawler_config=soup_crawler, ) @@ -74,6 +83,8 @@ async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any], **kwar for key, value in data.items(): content += f"{key}:\n{value}\n\n" return await save_data_to_file(content) + except IngestionError: + raise except Exception as e: raise IngestionError( message=f"Error ingesting webpage results of url {data_item}: {str(e)}" diff --git a/cognee/tasks/web_scraper/config.py b/cognee/tasks/web_scraper/config.py index 4819800ab..e81db9b05 100644 --- a/cognee/tasks/web_scraper/config.py +++ b/cognee/tasks/web_scraper/config.py @@ -1,22 +1,21 @@ from pydantic import BaseModel, Field -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Literal import os class TavilyConfig(BaseModel): api_key: str = os.getenv("TAVILY_API_KEY") extract_depth: str = "basic" - format: str = "markdown" timeout: int = Field(None, ge=1, le=60) class SoupCrawlerConfig(BaseModel): - concurrency: int = (5,) - crawl_delay: float = (0.5,) - timeout: float = (15.0,) - max_retries: int = (2,) - retry_delay_factor: float = (0.5,) - headers: Optional[Dict[str, str]] = (None,) + 
concurrency: int = 5 + crawl_delay: float = 0.5 + timeout: float = 15.0 + max_retries: int = 2 + retry_delay_factor: float = 0.5 + headers: Optional[Dict[str, str]] = None extraction_rules: Dict[str, Any] use_playwright: bool = False playwright_js_wait: float = 0.8 diff --git a/cognee/tasks/web_scraper/models.py b/cognee/tasks/web_scraper/models.py index 18bb2e3ef..297aebb4f 100644 --- a/cognee/tasks/web_scraper/models.py +++ b/cognee/tasks/web_scraper/models.py @@ -6,7 +6,6 @@ from datetime import datetime class WebPage(DataPoint): """Represents a scraped web page with metadata""" - url: str name: Optional[str] content: str content_hash: str @@ -17,7 +16,7 @@ class WebPage(DataPoint): page_size: int extraction_rules: Dict[str, Any] # CSS selectors, XPath rules used description: str - metadata: dict = {"index_fields": ["url", "title", "scraped_at", "description"]} + metadata: dict = {"index_fields": ["name", "description"]} class WebSite(DataPoint): @@ -31,7 +30,7 @@ class WebSite(DataPoint): page_count: int scraping_config: Dict[str, Any] description: str - metadata: dict = {"index_fields": ["domain", "base_url", "description"]} + metadata: dict = {"index_fields": ["name", "description"]} class ScrapingJob(DataPoint): @@ -44,4 +43,4 @@ class ScrapingJob(DataPoint): last_run: Optional[datetime] next_run: Optional[datetime] description: str - metadata: dict = {"index_fields": ["job_name", "status", "description"]} + metadata: dict = {"index_fields": ["name", "description"]} diff --git a/cognee/tasks/web_scraper/utils.py b/cognee/tasks/web_scraper/utils.py index a14dbbd20..53346fb1c 100644 --- a/cognee/tasks/web_scraper/utils.py +++ b/cognee/tasks/web_scraper/utils.py @@ -46,9 +46,9 @@ async def fetch_page_content( installed. 
""" if preferred_tool == "tavily": - if tavily_config.api_key is None: + if not tavily_config or tavily_config.api_key is None: raise ValueError("TAVILY_API_KEY must be set in TavilyConfig to use Tavily") - return await fetch_with_tavily(urls) + return await fetch_with_tavily(urls, tavily_config) elif preferred_tool == "beautifulsoup": try: @@ -59,9 +59,9 @@ async def fetch_page_content( ) raise crawler = BeautifulSoupCrawler() - extraction_rules = soup_crawler_config.extraction_rules - if extraction_rules is None: + if not soup_crawler_config and soup_crawler_config.extraction_rules is None: raise ValueError("extraction_rules must be provided when not using Tavily") + extraction_rules = soup_crawler_config.extraction_rules try: results = await crawler.fetch_with_bs4( urls, @@ -76,7 +76,9 @@ async def fetch_page_content( raise -async def fetch_with_tavily(urls: Union[str, List[str]]) -> Dict[str, str]: +async def fetch_with_tavily( + urls: Union[str, List[str]], tavily_config: Optional[TavilyConfig] = None +) -> Dict[str, str]: """Fetch content from URLs using the Tavily API. 
 Args: @@ -96,8 +98,8 @@ async def fetch_with_tavily(urls: Union[str, List[str]]) -> Dict[str, str]: "Failed to import tavily, make sure to install using pip install tavily-python>=0.7.0" ) raise - client = AsyncTavilyClient() - results = await client.extract(urls) + client = AsyncTavilyClient(api_key=tavily_config.api_key if tavily_config else None) + results = await client.extract(urls, format="text") for failed_result in results.get("failed_results", []): logger.warning(f"Failed to fetch {failed_result}") return_results = {} diff --git a/cognee/tasks/web_scraper/web_scraper_task.py b/cognee/tasks/web_scraper/web_scraper_task.py index dc286500a..383580779 100644 --- a/cognee/tasks/web_scraper/web_scraper_task.py +++ b/cognee/tasks/web_scraper/web_scraper_task.py @@ -87,12 +87,12 @@ async def cron_web_scraper_task( ) ), trigger=trigger, - id=uuid5(NAMESPACE_OID, name=job_name), - name=f"WebScraper_{uuid5(NAMESPACE_OID, name=job_name)}", + id=job_name, + name=job_name or f"WebScraper_{uuid5(NAMESPACE_OID, name=job_name)}", replace_existing=True, ) - - scheduler.start() + if not scheduler.running: + scheduler.start() return # If no schedule, run immediately @@ -191,7 +191,6 @@ async def web_scraper_task( tavily_config=tavily_config, soup_crawler_config=soup_crawler_config, ) - for page_url, content in results.items(): parsed_url = urlparse(page_url) domain = parsed_url.netloc @@ -250,16 +249,15 @@ async def web_scraper_task( f"Webpage: {parsed_url.path.lstrip('/') or 'Home'}\n" f"URL: {page_url}\n" f"Scraped at: {now.strftime('%Y-%m-%d %H:%M:%S')}\n" - f"Content hash: {content_hash[:16]}...\n" - f"Content type: text/html\n" + f"Content: {content_str}\n" + f"Content type: text\n" f"Page size: {len(content_str)} bytes\n" f"Status code: 200" ) - + page_extraction_rules = extraction_rules webpage = WebPage( - id=uuid5(NAMESPACE_OID, name=parsed_url.path.lstrip("/")), - url=page_url, - name=get_path_after_base(base_url, page_url), + id=uuid5(NAMESPACE_OID, name=page_url), + 
name=page_url, content=content_str, content_hash=content_hash, scraped_at=now, @@ -267,7 +265,7 @@ async def web_scraper_task( status_code=200, content_type="text/html", page_size=len(content_str), - extraction_rules=extraction_rules or {}, + extraction_rules=page_extraction_rules or {}, description=webpage_description, ) webpages.append(webpage) @@ -301,10 +299,9 @@ async def web_scraper_task( }, ) ) - for webpage in webpages: node_mapping[webpage.id] = webpage - parsed_url = urlparse(webpage.url) + parsed_url = urlparse(webpage.name) base_url = f"{parsed_url.scheme}://{parsed_url.netloc}" edge_mapping.append( ( diff --git a/cognee/tests/tasks/web_scraping/web_scraping_test.py b/cognee/tests/tasks/web_scraping/web_scraping_test.py index b817aea34..3e1fa2f78 100644 --- a/cognee/tests/tasks/web_scraping/web_scraping_test.py +++ b/cognee/tests/tasks/web_scraping/web_scraping_test.py @@ -2,11 +2,17 @@ import asyncio import cognee from cognee.tasks.web_scraper.config import SoupCrawlerConfig from cognee.tasks.web_scraper import cron_web_scraper_task +from uuid import uuid5, NAMESPACE_OID +from cognee.infrastructure.databases.graph.get_graph_engine import get_graph_engine +from dotenv import load_dotenv +from cognee.api.v1.visualize import visualize_graph + +load_dotenv() async def test_web_scraping_using_bs4(): await cognee.prune.prune_data() - await cognee.prune.prune_system(metadata=True) + await cognee.prune.prune_system() url = "https://quotes.toscrape.com/" rules = { @@ -119,6 +125,9 @@ async def test_web_scraping_using_tavily_and_incremental_loading(): # ---------- cron job tests ---------- async def test_cron_web_scraper(): + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + graph_db = await get_graph_engine() urls = ["https://quotes.toscrape.com/", "https://books.toscrape.com/"] extraction_rules = { "quotes": {"selector": ".quote span.text", "all": True}, @@ -127,45 +136,51 @@ async def 
test_cron_web_scraper(): "prices": "article.product_pod p.price_color", } - # Run cron_web_scraper_task (schedule string is required) + # Run cron_web_scraper_task await cron_web_scraper_task( - urls=urls, - schedule="*/5 * * * *", # every 5 minutes + url=urls, + schedule="*/3 * * * *", # every 3 minutes + job_name="cron_scraping_job", extraction_rules=extraction_rules, - use_playwright=False, ) - - # Wait until first run of cron job is done - await asyncio.sleep(120) - # Validate that the scraped data is searchable - results = await cognee.search( - "Who said 'The world as we have created it is a process of our thinking. It cannot be changed without changing our thinking'?", - query_type=cognee.SearchType.GRAPH_COMPLETION, - ) - assert "Albert Einstein" in results[0] + scraping_job_done = await graph_db.get_node(uuid5(NAMESPACE_OID, name="cron_scraping_job")) + while True: + if scraping_job_done: + results = await cognee.search( + "Who said 'The world as we have created it is a process of our thinking. 
It cannot be changed without changing our thinking'?", + query_type=cognee.SearchType.GRAPH_COMPLETION, + ) - results_books = await cognee.search( - "What is the price of 'A Light in the Attic' book?", - query_type=cognee.SearchType.GRAPH_COMPLETION, - ) - assert "51.77" in results_books[0] + assert "Albert Einstein" in results[0] - print("Cron job web_scraping test passed!") + results_books = await cognee.search( + "What is the price of 'A Light in the Attic' book?", + query_type=cognee.SearchType.GRAPH_COMPLETION, + ) + + assert "51.77" in results_books[0] + + print("Cron job web_scraping test passed!") + break + else: + scraping_job_done = await graph_db.get_node( + uuid5(NAMESPACE_OID, name="cron_scraping_job") + ) async def main(): - print("Starting BS4 incremental loading test...") - await test_web_scraping_using_bs4_and_incremental_loading() + # print("Starting BS4 incremental loading test...") + # await test_web_scraping_using_bs4_and_incremental_loading() - print("Starting BS4 normal test...") - await test_web_scraping_using_bs4() + # print("Starting BS4 normal test...") + # await test_web_scraping_using_bs4() - print("Starting Tavily incremental loading test...") - await test_web_scraping_using_tavily_and_incremental_loading() + # print("Starting Tavily incremental loading test...") + # await test_web_scraping_using_tavily_and_incremental_loading() - print("Starting Tavily normal test...") - await test_web_scraping_using_tavily() + # print("Starting Tavily normal test...") + # await test_web_scraping_using_tavily() print("Starting cron job test...") await test_cron_web_scraper() diff --git a/uv.lock b/uv.lock index a06bebdd9..dc005da37 100644 --- a/uv.lock +++ b/uv.lock @@ -244,6 +244,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/81/29/5ecc3a15d5a33e31b26c11426c45c501e439cb865d0bff96315d86443b78/appnope-0.1.4-py2.py3-none-any.whl", hash = "sha256:502575ee11cd7a28c0205f379b525beefebab9d161b7c964670864014ed7213c", size = 4321, 
upload-time = "2024-02-06T09:43:09.663Z" }, ] +[[package]] +name = "apscheduler" +version = "3.11.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "tzlocal" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/4e/00/6d6814ddc19be2df62c8c898c4df6b5b1914f3bd024b780028caa392d186/apscheduler-3.11.0.tar.gz", hash = "sha256:4c622d250b0955a65d5d0eb91c33e6d43fd879834bf541e0a18661ae60460133", size = 107347, upload-time = "2024-11-24T19:39:26.463Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d0/ae/9a053dd9229c0fde6b1f1f33f609ccff1ee79ddda364c756a924c6d8563b/APScheduler-3.11.0-py3-none-any.whl", hash = "sha256:fc134ca32e50f5eadcc4938e3a4545ab19131435e851abb40b34d63d5141c6da", size = 64004, upload-time = "2024-11-24T19:39:24.442Z" }, +] + [[package]] name = "argon2-cffi" version = "23.1.0" @@ -967,6 +979,7 @@ posthog = [ { name = "posthog" }, ] scraping = [ + { name = "apscheduler" }, { name = "beautifulsoup4" }, { name = "lxml" }, { name = "playwright" }, @@ -981,6 +994,7 @@ requires-dist = [ { name = "aiosqlite", specifier = ">=0.20.0,<1.0.0" }, { name = "alembic", specifier = ">=1.13.3,<2" }, { name = "anthropic", marker = "extra == 'anthropic'", specifier = ">=0.27" }, + { name = "apscheduler", marker = "extra == 'scraping'", specifier = ">=3.10.0,<=3.11.0" }, { name = "asyncpg", marker = "extra == 'postgres'", specifier = ">=0.30.0,<1.0.0" }, { name = "asyncpg", marker = "extra == 'postgres-binary'", specifier = ">=0.30.0,<1.0.0" }, { name = "baml-py", specifier = ">=0.201.0,<0.202.0" }, @@ -7693,6 +7707,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/5c/23/c7abc0ca0a1526a0774eca151daeb8de62ec457e77262b66b359c3c7679e/tzdata-2025.2-py2.py3-none-any.whl", hash = "sha256:1a403fada01ff9221ca8044d701868fa132215d84beb92242d9acd2147f667a8", size = 347839, upload-time = "2025-03-23T13:54:41.845Z" }, ] +[[package]] +name = "tzlocal" +version = "5.3.1" +source = { registry = 
"https://pypi.org/simple" } +dependencies = [ + { name = "tzdata", marker = "sys_platform == 'win32'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/8b/2e/c14812d3d4d9cd1773c6be938f89e5735a1f11a9f184ac3639b93cef35d5/tzlocal-5.3.1.tar.gz", hash = "sha256:cceffc7edecefea1f595541dbd6e990cb1ea3d19bf01b2809f362a03dd7921fd", size = 30761, upload-time = "2025-03-05T21:17:41.549Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c2/14/e2a54fabd4f08cd7af1c07030603c3356b74da07f7cc056e600436edfa17/tzlocal-5.3.1-py3-none-any.whl", hash = "sha256:eb1a66c3ef5847adf7a834f1be0800581b683b5608e74f86ecbcef8ab91bb85d", size = 18026, upload-time = "2025-03-05T21:17:39.857Z" }, +] + [[package]] name = "unstructured" version = "0.18.15"