CodeRabbit reviews solved

Geoff-Robin 2025-10-06 17:15:25 +05:30
parent ae740eda96
commit 1b5c099f8b
8 changed files with 125 additions and 68 deletions

View file

@@ -140,7 +140,7 @@ async def add(
# Add a single file
await cognee.add("/home/user/documents/analysis.pdf")
# Add a single URL using the bs4 extraction ingestion method
extraction_rules = {
"title": "h1",
@@ -148,11 +148,11 @@ async def add(
"more_info": "a[href*='more-info']"
}
await cognee.add("https://example.com",extraction_rules=extraction_rules)
# Add a single url and tavily extract ingestion method
Make sure to TAVILY_API_KEY = YOUR_TAVILY_API_KEY as a environment variable
await cognee.add("https://example.com")
# Add multiple urls
await cognee.add(["https://example.com","https://books.toscrape.com"])
```
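A minimal standalone sketch of the three ingestion paths from the docstring example above (assumes cognee is installed and, for the Tavily path, that TAVILY_API_KEY is exported; the URLs are the illustrative ones already used in the example):

```python
import asyncio
import cognee

async def demo_add():
    # bs4 path: CSS-selector extraction rules are passed explicitly
    extraction_rules = {
        "title": "h1",
        "more_info": "a[href*='more-info']",
    }
    await cognee.add("https://example.com", extraction_rules=extraction_rules)

    # Tavily path: no rules given, relies on TAVILY_API_KEY in the environment
    await cognee.add("https://example.com")

    # Multiple URLs in one call
    await cognee.add(["https://example.com", "https://books.toscrape.com"])

asyncio.run(demo_add())
```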
@@ -186,13 +186,20 @@ async def add(
await setup()
if not soup_crawler_config and extraction_rules:
soup_crawler_config = SoupCrawlerConfig(extraction_rules=extraction_rules)
if not tavily_config and os.getenv("TAVILY_API_KEY"):
tavily_config = TavilyConfig()
tavily_config = TavilyConfig(api_key=os.getenv("TAVILY_API_KEY"))
soup_crawler.set(soup_crawler_config)
tavily.set(tavily_config)
if urlparse(data).scheme in ["http", "https"]:
http_schemes = {"http", "https"}
def _is_http_url(item: Union[str, BinaryIO]) -> bool:
return isinstance(item, str) and urlparse(item).scheme in http_schemes
if _is_http_url(data):
node_set = ["web_content"] if not node_set else node_set + ["web_content"]
elif isinstance(data, list) and any(_is_http_url(item) for item in data):
node_set = ["web_content"] if not node_set else node_set + ["web_content"]
user, authorized_dataset = await resolve_authorized_user_dataset(dataset_id, dataset_name, user)
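A self-contained sketch of the `_is_http_url` helper introduced above, showing why only string items with an http/https scheme get tagged as `web_content` (the assertions are illustrative, not part of the diff):

```python
from typing import BinaryIO, Union
from urllib.parse import urlparse

http_schemes = {"http", "https"}

def _is_http_url(item: Union[str, BinaryIO]) -> bool:
    # Only plain strings that parse to an http/https URL count as web content;
    # local paths and binary streams keep the regular ingestion path.
    return isinstance(item, str) and urlparse(item).scheme in http_schemes

assert _is_http_url("https://example.com")
assert not _is_http_url("/home/user/documents/analysis.pdf")
assert not _is_http_url("ftp://example.com/file.txt")
```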

View file

@@ -63,10 +63,19 @@ async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any], **kwar
tavily = tavily_config.get()
soup_crawler = soup_crawler_config.get()
preferred_tool = "beautifulsoup" if soup_crawler else "tavily"
if preferred_tool == "tavily" and tavily is None:
raise IngestionError(
message="TavilyConfig must be set on the ingestion context when fetching HTTP URLs without a SoupCrawlerConfig."
)
if preferred_tool == "beautifulsoup" and soup_crawler is None:
raise IngestionError(
message="SoupCrawlerConfig must be set on the ingestion context when using the BeautifulSoup scraper."
)
data = await fetch_page_content(
data_item,
preferred_tool="beautifulsoup" if soup_crawler else "tavily",
preferred_tool=preferred_tool,
tavily_config=tavily,
soup_crawler_config=soup_crawler,
)
@@ -74,6 +83,8 @@ async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any], **kwar
for key, value in data.items():
content += f"{key}:\n{value}\n\n"
return await save_data_to_file(content)
except IngestionError:
raise
except Exception as e:
raise IngestionError(
message=f"Error ingesting webpage results of url {data_item}: {str(e)}"

View file

@@ -1,22 +1,21 @@
from pydantic import BaseModel, Field
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, Literal
import os
class TavilyConfig(BaseModel):
api_key: str = os.getenv("TAVILY_API_KEY")
extract_depth: str = "basic"
format: str = "markdown"
timeout: int = Field(None, ge=1, le=60)
class SoupCrawlerConfig(BaseModel):
concurrency: int = (5,)
crawl_delay: float = (0.5,)
timeout: float = (15.0,)
max_retries: int = (2,)
retry_delay_factor: float = (0.5,)
headers: Optional[Dict[str, str]] = (None,)
concurrency: int = 5
crawl_delay: float = 0.5
timeout: float = 15.0
max_retries: int = 2
retry_delay_factor: float = 0.5
headers: Optional[Dict[str, str]] = None
extraction_rules: Dict[str, Any]
use_playwright: bool = False
playwright_js_wait: float = 0.8
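With the stray tuple defaults removed, the configs construct as expected. A small sketch (the `cognee.tasks.web_scraper.config` import path is taken from the test file in this commit; `TavilyConfig` living in the same module is an assumption based on this diff, and `extraction_rules` is the only field without a default):

```python
import os
from cognee.tasks.web_scraper.config import SoupCrawlerConfig, TavilyConfig

soup_cfg = SoupCrawlerConfig(
    extraction_rules={"title": "h1"},  # required field
)
assert soup_cfg.concurrency == 5 and soup_cfg.timeout == 15.0  # scalar defaults, no longer tuples

tavily_cfg = TavilyConfig(api_key=os.getenv("TAVILY_API_KEY", ""))
```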

View file

@@ -6,7 +6,6 @@ from datetime import datetime
class WebPage(DataPoint):
"""Represents a scraped web page with metadata"""
url: str
name: Optional[str]
content: str
content_hash: str
@@ -17,7 +16,7 @@ class WebPage(DataPoint):
page_size: int
extraction_rules: Dict[str, Any] # CSS selectors, XPath rules used
description: str
metadata: dict = {"index_fields": ["url", "title", "scraped_at", "description"]}
metadata: dict = {"index_fields": ["name", "description"]}
class WebSite(DataPoint):
@@ -31,7 +30,7 @@ class WebSite(DataPoint):
page_count: int
scraping_config: Dict[str, Any]
description: str
metadata: dict = {"index_fields": ["domain", "base_url", "description"]}
metadata: dict = {"index_fields": ["name", "description"]}
class ScrapingJob(DataPoint):
@@ -44,4 +43,4 @@ class ScrapingJob(DataPoint):
last_run: Optional[datetime]
next_run: Optional[datetime]
description: str
metadata: dict = {"index_fields": ["job_name", "status", "description"]}
metadata: dict = {"index_fields": ["name", "description"]}

View file

@@ -46,9 +46,9 @@ async def fetch_page_content(
installed.
"""
if preferred_tool == "tavily":
if tavily_config.api_key is None:
if not tavily_config or tavily_config.api_key is None:
raise ValueError("TAVILY_API_KEY must be set in TavilyConfig to use Tavily")
return await fetch_with_tavily(urls)
return await fetch_with_tavily(urls, tavily_config)
elif preferred_tool == "beautifulsoup":
try:
@@ -59,9 +59,9 @@ async def fetch_page_content(
)
raise
crawler = BeautifulSoupCrawler()
extraction_rules = soup_crawler_config.extraction_rules
if extraction_rules is None:
if not soup_crawler_config or soup_crawler_config.extraction_rules is None:
raise ValueError("extraction_rules must be provided when not using Tavily")
extraction_rules = soup_crawler_config.extraction_rules
try:
results = await crawler.fetch_with_bs4(
urls,
@@ -76,7 +76,9 @@ async def fetch_page_content(
raise
async def fetch_with_tavily(urls: Union[str, List[str]]) -> Dict[str, str]:
async def fetch_with_tavily(
urls: Union[str, List[str]], tavily_config: Optional[TavilyConfig] = None
) -> Dict[str, str]:
"""Fetch content from URLs using the Tavily API.
Args:
@@ -96,8 +98,8 @@ async def fetch_with_tavily(urls: Union[str, List[str]]) -> Dict[str, str]:
"Failed to import tavily, make sure to install using pip install tavily-python>=0.7.0"
)
raise
client = AsyncTavilyClient()
results = await client.extract(urls)
client = AsyncTavilyClient(api_key=tavily_config.api_key if tavily_config else None)
results = await client.extract(urls, format="text")
for failed_result in results.get("failed_results", []):
logger.warning(f"Failed to fetch {failed_result}")
return_results = {}
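A usage sketch of the updated call path: the caller now passes the `TavilyConfig` through explicitly instead of relying on the client picking the key up from the environment. The import location of `fetch_page_content` is an assumption; only the config module path is confirmed by the test imports in this commit:

```python
import asyncio
import os

from cognee.tasks.web_scraper.config import TavilyConfig
from cognee.tasks.web_scraper import fetch_page_content  # exact module path not shown in this diff

async def demo_fetch():
    cfg = TavilyConfig(api_key=os.getenv("TAVILY_API_KEY", ""))
    pages = await fetch_page_content(
        "https://example.com",
        preferred_tool="tavily",
        tavily_config=cfg,
        soup_crawler_config=None,
    )
    for url, content in pages.items():  # results are a mapping of URL -> extracted text
        print(url, len(content))

asyncio.run(demo_fetch())
```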

View file

@@ -87,12 +87,12 @@ async def cron_web_scraper_task(
)
),
trigger=trigger,
id=uuid5(NAMESPACE_OID, name=job_name),
name=f"WebScraper_{uuid5(NAMESPACE_OID, name=job_name)}",
id=job_name,
name=job_name or f"WebScraper_{uuid5(NAMESPACE_OID, name=job_name)}",
replace_existing=True,
)
scheduler.start()
if not scheduler.running:
scheduler.start()
return
# If no schedule, run immediately
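The scheduler is now started only when it is not already running, and the job id is the plain `job_name` string (APScheduler job ids are strings, and a stable id is what lets `replace_existing=True` match an existing job). A sketch of the same pattern in isolation; the scheduler class used by the task is not shown in this hunk, so `AsyncIOScheduler` here is an assumption:

```python
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

scheduler = AsyncIOScheduler()

def schedule_scrape(job_name: str, crontab: str = "*/3 * * * *") -> None:
    scheduler.add_job(
        lambda: print(f"running {job_name}"),    # placeholder for the scraping coroutine
        trigger=CronTrigger.from_crontab(crontab),
        id=job_name,                             # stable string id, so replace_existing can match
        name=job_name,
        replace_existing=True,
    )
    if not scheduler.running:                    # avoid SchedulerAlreadyRunningError on repeat calls
        scheduler.start()
```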
@@ -191,7 +191,6 @@ async def web_scraper_task(
tavily_config=tavily_config,
soup_crawler_config=soup_crawler_config,
)
for page_url, content in results.items():
parsed_url = urlparse(page_url)
domain = parsed_url.netloc
@@ -250,16 +249,15 @@ async def web_scraper_task(
f"Webpage: {parsed_url.path.lstrip('/') or 'Home'}\n"
f"URL: {page_url}\n"
f"Scraped at: {now.strftime('%Y-%m-%d %H:%M:%S')}\n"
f"Content hash: {content_hash[:16]}...\n"
f"Content type: text/html\n"
f"Content: {content_str}"
f"Content type: text\n"
f"Page size: {len(content_str)} bytes\n"
f"Status code: 200"
)
page_extraction_rules = extraction_rules
webpage = WebPage(
id=uuid5(NAMESPACE_OID, name=parsed_url.path.lstrip("/")),
url=page_url,
name=get_path_after_base(base_url, page_url),
id=uuid5(NAMESPACE_OID, name=page_url),
name=page_url,
content=content_str,
content_hash=content_hash,
scraped_at=now,
@@ -267,7 +265,7 @@ async def web_scraper_task(
status_code=200,
content_type="text/html",
page_size=len(content_str),
extraction_rules=extraction_rules or {},
extraction_rules=page_extraction_rules or {},
description=webpage_description,
)
webpages.append(webpage)
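Keying the `WebPage` id on the full `page_url` instead of the URL path keeps the id deterministic across runs while avoiding collisions between different sites that share a path (for example, both homepages having path `/`). A quick illustration using the two URLs from the test file in this commit:

```python
from uuid import NAMESPACE_OID, uuid5

quotes_home = uuid5(NAMESPACE_OID, name="https://quotes.toscrape.com/")
books_home = uuid5(NAMESPACE_OID, name="https://books.toscrape.com/")

assert quotes_home != books_home  # distinct sites now get distinct page ids
assert quotes_home == uuid5(NAMESPACE_OID, name="https://quotes.toscrape.com/")  # stable across re-scrapes
```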
@@ -301,10 +299,10 @@ async def web_scraper_task(
},
)
)
print(len(webpages))
for webpage in webpages:
node_mapping[webpage.id] = webpage
parsed_url = urlparse(webpage.url)
parsed_url = urlparse(webpage.name)
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
edge_mapping.append(
(

View file

@@ -2,11 +2,17 @@ import asyncio
import cognee
from cognee.tasks.web_scraper.config import SoupCrawlerConfig
from cognee.tasks.web_scraper import cron_web_scraper_task
from uuid import uuid5, NAMESPACE_OID
from cognee.infrastructure.databases.graph.get_graph_engine import get_graph_engine
from dotenv import load_dotenv
from cognee.api.v1.visualize import visualize_graph
load_dotenv()
async def test_web_scraping_using_bs4():
await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
await cognee.prune.prune_system()
url = "https://quotes.toscrape.com/"
rules = {
@@ -119,6 +125,9 @@ async def test_web_scraping_using_tavily_and_incremental_loading():
# ---------- cron job tests ----------
async def test_cron_web_scraper():
await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
graph_db = await get_graph_engine()
urls = ["https://quotes.toscrape.com/", "https://books.toscrape.com/"]
extraction_rules = {
"quotes": {"selector": ".quote span.text", "all": True},
@@ -127,45 +136,51 @@ async def test_cron_web_scraper():
"prices": "article.product_pod p.price_color",
}
# Run cron_web_scraper_task (schedule string is required)
# Run cron_web_scraper_task
await cron_web_scraper_task(
urls=urls,
schedule="*/5 * * * *", # every 5 minutes
url=urls,
schedule="*/3 * * * *", # every 3 minutes
job_name="cron_scraping_job",
extraction_rules=extraction_rules,
use_playwright=False,
)
# Wait until first run of cron job is done
await asyncio.sleep(120)
# Validate that the scraped data is searchable
results = await cognee.search(
"Who said 'The world as we have created it is a process of our thinking. It cannot be changed without changing our thinking'?",
query_type=cognee.SearchType.GRAPH_COMPLETION,
)
assert "Albert Einstein" in results[0]
scraping_job_done = await graph_db.get_node(uuid5(NAMESPACE_OID, name="cron_scraping_job"))
while True:
if scraping_job_done:
results = await cognee.search(
"Who said 'The world as we have created it is a process of our thinking. It cannot be changed without changing our thinking'?",
query_type=cognee.SearchType.GRAPH_COMPLETION,
)
results_books = await cognee.search(
"What is the price of 'A Light in the Attic' book?",
query_type=cognee.SearchType.GRAPH_COMPLETION,
)
assert "51.77" in results_books[0]
assert "Albert Einstein" in results[0]
print("Cron job web_scraping test passed!")
results_books = await cognee.search(
"What is the price of 'A Light in the Attic' book?",
query_type=cognee.SearchType.GRAPH_COMPLETION,
)
assert "51.77" in results_books[0]
print("Cron job web_scraping test passed!")
break
else:
scraping_job_done = await graph_db.get_node(
uuid5(NAMESPACE_OID, name="cron_scraping_job")
)
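The test now polls the graph for the `cron_scraping_job` node before asserting on search results, but the loop above re-checks without sleeping. An editor's sketch of the same wait written as a bounded poll (not part of the commit; names other than `graph_db.get_node` are illustrative):

```python
import asyncio
from uuid import NAMESPACE_OID, uuid5

async def wait_for_job_node(graph_db, job_name: str, timeout_s: float = 300.0, interval_s: float = 5.0):
    """Poll the graph until the scraping-job node appears, or fail after timeout_s."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout_s
    while loop.time() < deadline:
        node = await graph_db.get_node(uuid5(NAMESPACE_OID, name=job_name))
        if node:
            return node
        await asyncio.sleep(interval_s)  # back off between checks instead of spinning
    raise TimeoutError(f"scraping job {job_name!r} did not finish within {timeout_s}s")
```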
async def main():
print("Starting BS4 incremental loading test...")
await test_web_scraping_using_bs4_and_incremental_loading()
# print("Starting BS4 incremental loading test...")
# await test_web_scraping_using_bs4_and_incremental_loading()
print("Starting BS4 normal test...")
await test_web_scraping_using_bs4()
# print("Starting BS4 normal test...")
# await test_web_scraping_using_bs4()
print("Starting Tavily incremental loading test...")
await test_web_scraping_using_tavily_and_incremental_loading()
# print("Starting Tavily incremental loading test...")
# await test_web_scraping_using_tavily_and_incremental_loading()
print("Starting Tavily normal test...")
await test_web_scraping_using_tavily()
# print("Starting Tavily normal test...")
# await test_web_scraping_using_tavily()
print("Starting cron job test...")
await test_cron_web_scraper()

uv.lock (generated, 26 changed lines)
View file

@@ -244,6 +244,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/81/29/5ecc3a15d5a33e31b26c11426c45c501e439cb865d0bff96315d86443b78/appnope-0.1.4-py2.py3-none-any.whl", hash = "sha256:502575ee11cd7a28c0205f379b525beefebab9d161b7c964670864014ed7213c", size = 4321, upload-time = "2024-02-06T09:43:09.663Z" },
]
[[package]]
name = "apscheduler"
version = "3.11.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "tzlocal" },
]
sdist = { url = "https://files.pythonhosted.org/packages/4e/00/6d6814ddc19be2df62c8c898c4df6b5b1914f3bd024b780028caa392d186/apscheduler-3.11.0.tar.gz", hash = "sha256:4c622d250b0955a65d5d0eb91c33e6d43fd879834bf541e0a18661ae60460133", size = 107347, upload-time = "2024-11-24T19:39:26.463Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/d0/ae/9a053dd9229c0fde6b1f1f33f609ccff1ee79ddda364c756a924c6d8563b/APScheduler-3.11.0-py3-none-any.whl", hash = "sha256:fc134ca32e50f5eadcc4938e3a4545ab19131435e851abb40b34d63d5141c6da", size = 64004, upload-time = "2024-11-24T19:39:24.442Z" },
]
[[package]]
name = "argon2-cffi"
version = "23.1.0"
@@ -967,6 +979,7 @@ posthog = [
{ name = "posthog" },
]
scraping = [
{ name = "apscheduler" },
{ name = "beautifulsoup4" },
{ name = "lxml" },
{ name = "playwright" },
@@ -981,6 +994,7 @@ requires-dist = [
{ name = "aiosqlite", specifier = ">=0.20.0,<1.0.0" },
{ name = "alembic", specifier = ">=1.13.3,<2" },
{ name = "anthropic", marker = "extra == 'anthropic'", specifier = ">=0.27" },
{ name = "apscheduler", marker = "extra == 'scraping'", specifier = ">=3.10.0,<=3.11.0" },
{ name = "asyncpg", marker = "extra == 'postgres'", specifier = ">=0.30.0,<1.0.0" },
{ name = "asyncpg", marker = "extra == 'postgres-binary'", specifier = ">=0.30.0,<1.0.0" },
{ name = "baml-py", specifier = ">=0.201.0,<0.202.0" },
@@ -7693,6 +7707,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/5c/23/c7abc0ca0a1526a0774eca151daeb8de62ec457e77262b66b359c3c7679e/tzdata-2025.2-py2.py3-none-any.whl", hash = "sha256:1a403fada01ff9221ca8044d701868fa132215d84beb92242d9acd2147f667a8", size = 347839, upload-time = "2025-03-23T13:54:41.845Z" },
]
[[package]]
name = "tzlocal"
version = "5.3.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "tzdata", marker = "sys_platform == 'win32'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/8b/2e/c14812d3d4d9cd1773c6be938f89e5735a1f11a9f184ac3639b93cef35d5/tzlocal-5.3.1.tar.gz", hash = "sha256:cceffc7edecefea1f595541dbd6e990cb1ea3d19bf01b2809f362a03dd7921fd", size = 30761, upload-time = "2025-03-05T21:17:41.549Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c2/14/e2a54fabd4f08cd7af1c07030603c3356b74da07f7cc056e600436edfa17/tzlocal-5.3.1-py3-none-any.whl", hash = "sha256:eb1a66c3ef5847adf7a834f1be0800581b683b5608e74f86ecbcef8ab91bb85d", size = 18026, upload-time = "2025-03-05T21:17:39.857Z" },
]
[[package]]
name = "unstructured"
version = "0.18.15"