Integrate web scraping into the add workflow when incremental_loading is set to False

This commit is contained in:
Geoff-Robin 2025-10-04 15:01:13 +05:30
parent 1ab9d24cf0
commit 20fb77316c
9 changed files with 269 additions and 312 deletions

View file

@ -1,5 +1,5 @@
from uuid import UUID
from typing import Union, BinaryIO, List, Optional
from typing import Union, BinaryIO, List, Optional, Dict, Literal
from cognee.modules.users.models import User
from cognee.modules.pipelines import Task, run_pipeline
@ -11,6 +11,7 @@ from cognee.modules.pipelines.layers.reset_dataset_pipeline_run_status import (
)
from cognee.modules.engine.operations.setup import setup
from cognee.tasks.ingestion import ingest_data, resolve_data_directories
from cognee.tasks.web_scraper.config import TavilyConfig, SoupCrawlerConfig
async def add(
@ -23,12 +24,16 @@ async def add(
dataset_id: Optional[UUID] = None,
preferred_loaders: List[str] = None,
incremental_loading: bool = True,
extraction_rules: Optional[Dict[str, str]] = None,
preferred_tool: Optional[Literal["tavily", "beautifulsoup"]] = "beautifulsoup",
tavily_config: Optional[TavilyConfig] = None,
soup_crawler_config: Optional[SoupCrawlerConfig] = None,
):
"""
Add data to Cognee for knowledge graph processing.
This is the first step in the Cognee workflow - it ingests raw data and prepares it
for processing. The function accepts various data formats including text, files, and
for processing. The function accepts various data formats including text, files, urls and
binary streams, then stores them in a specified dataset for further processing.
Prerequisites:
@ -143,7 +148,18 @@ async def add(
"""
tasks = [
Task(resolve_data_directories, include_subdirectories=True),
Task(ingest_data, dataset_name, user, node_set, dataset_id, preferred_loaders),
Task(
ingest_data,
dataset_name,
user,
node_set,
dataset_id,
preferred_loaders,
extraction_rules,
preferred_tool,
tavily_config,
soup_crawler_config,
),
]
await setup()

View file

@ -1,7 +1,7 @@
import json
import inspect
from uuid import UUID
from typing import Union, BinaryIO, Any, List, Optional
from typing import Union, BinaryIO, Any, List, Optional, Dict, Literal
import cognee.modules.ingestion as ingestion
from cognee.infrastructure.databases.relational import get_relational_engine
@ -16,6 +16,7 @@ from cognee.modules.data.methods import (
get_dataset_data,
load_or_create_datasets,
)
from cognee.tasks.web_scraper.config import SoupCrawlerConfig, TavilyConfig
from .save_data_item_to_storage import save_data_item_to_storage
from .data_item_to_text_file import data_item_to_text_file
@ -28,6 +29,10 @@ async def ingest_data(
node_set: Optional[List[str]] = None,
dataset_id: UUID = None,
preferred_loaders: List[str] = None,
extraction_rules: Optional[Dict[str, str]] = None,
preferred_tool: Optional[Literal["tavily", "beautifulsoup"]] = "beautifulsoup",
tavily_config: Optional[TavilyConfig] = None,
soup_crawler_config: Optional[SoupCrawlerConfig] = None,
):
if not user:
user = await get_default_user()
@ -78,7 +83,13 @@ async def ingest_data(
for data_item in data:
# Get file path of data item or create a file if it doesn't exist
original_file_path = await save_data_item_to_storage(data_item)
original_file_path = await save_data_item_to_storage(
data_item,
extraction_rules=extraction_rules,
preferred_tool=preferred_tool,
tavily_config=tavily_config,
soup_crawler_config=soup_crawler_config,
)
# Transform file path to be OS usable
actual_file_path = get_data_file_path(original_file_path)

View file

@ -1,12 +1,14 @@
import os
from pathlib import Path
from urllib.parse import urlparse
from typing import Union, BinaryIO, Any
from typing import Union, BinaryIO, Any, Dict
from cognee.modules.ingestion.exceptions import IngestionError
from cognee.modules.ingestion import save_data_to_file
from cognee.shared.logging_utils import get_logger
from pydantic_settings import BaseSettings, SettingsConfigDict
from cognee.tasks.web_scraper import check_valid_arguments_for_web_scraper
import asyncio
logger = get_logger()
@ -17,10 +19,17 @@ class SaveDataSettings(BaseSettings):
model_config = SettingsConfigDict(env_file=".env", extra="allow")
class HTMLContent(str):
    """A ``str`` subclass accepting only HTML-like text.

    Construction fails unless the value contains both an opening ``<`` and a
    closing ``>`` character somewhere in the string.
    """

    def __new__(cls, value: str):
        # Reject values missing either angle bracket — a cheap heuristic for
        # "looks like markup", not a full HTML validation.
        if "<" not in value or ">" not in value:
            raise ValueError("Not valid HTML-like content")
        return super().__new__(cls, value)
settings = SaveDataSettings()
async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any]) -> str:
async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any], **kwargs) -> str:
if "llama_index" in str(type(data_item)):
# Dynamic import is used because the llama_index module is optional.
from .transform_data import get_data_from_llama_index
@ -48,6 +57,38 @@ async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any]) -> str
# data is s3 file path
if parsed_url.scheme == "s3":
return data_item
elif parsed_url.scheme == "http" or parsed_url.scheme == "https":
# Fetch and ingest the webpage content behind the URL
try:
from cognee.tasks.web_scraper import fetch_page_content
extraction_rules = kwargs.get("extraction_rules", None)
preferred_tool = kwargs.get("preferred_tool", "beautifulsoup")
tavily_config = kwargs.get("tavily_config", None)
soup_crawler_config = kwargs.get("soup_crawler_config", None)
check_valid_arguments_for_web_scraper(
extraction_rules=extraction_rules,
preferred_tool=preferred_tool,
tavily_config=tavily_config,
soup_crawler_config=soup_crawler_config,
)
data = await fetch_page_content(
data_item,
extraction_rules=extraction_rules,
preferred_tool=preferred_tool,
tavily_config=tavily_config,
soup_crawler_config=soup_crawler_config,
)
content = ""
for key, value in data.items():
content += f"{key}:\n{value}\n\n"
else:
content = data[data_item]
return await save_data_to_file(content)
except Exception as e:
raise IngestionError(
message=f"Error ingesting webpage results of url {data_item}: {str(e)}"
)
# data is local file path
elif parsed_url.scheme == "file":

View file

@ -0,0 +1,8 @@
"""Public interface of the web_scraper task package.

Re-exports the BeautifulSoup-based crawler plus the helper functions used by
the ingestion pipeline to fetch and validate web-scraping arguments.
"""

from .bs4_crawler import BeautifulSoupCrawler
from .utils import fetch_page_content, check_valid_arguments_for_web_scraper
__all__ = [
    "BeautifulSoupCrawler",
    "fetch_page_content",
    "check_valid_arguments_for_web_scraper",
]

View file

@ -1,270 +0,0 @@
import asyncio
import time
from typing import Union, List, Dict, Any, Optional
from bs4 import BeautifulSoup
import httpx
from cognee.shared.logging_utils import get_logger
logger = get_logger(__name__)
try:
from playwright.async_api import async_playwright
except ImportError:
logger.error("Failed to import playwright, make sure to install using pip install playwright>=1.9.0")
try:
from bs4 import BeautifulSoup
except ImportError:
logger.error("Failed to import BeautifulSoup, make sure to install using pip install beautifulsoup4")
class BeautifulSoupCrawler:
    """Async web crawler: fetches pages via httpx (or Playwright for JS-heavy
    sites) and extracts text with BeautifulSoup CSS selectors or lxml XPath.

    Enforces a per-domain minimum delay between requests and retries failed
    fetches with exponential backoff.
    """

    def __init__(
        self,
        *,
        concurrency: int = 5,
        delay_between_requests: float = 0.5,
        timeout: float = 15.0,
        max_retries: int = 2,
        retry_delay_factor: float = 0.5,
        headers: Optional[Dict[str, str]] = None,
    ):
        """
        concurrency: number of concurrent requests allowed
        delay_between_requests: minimum seconds to wait between requests to the SAME domain
        timeout: per-request timeout in seconds
        max_retries: number of retries on network errors
        retry_delay_factor: multiplier for the exponential retry backoff delay
        headers: default headers for requests
        """
        self.concurrency = concurrency
        self._sem = asyncio.Semaphore(concurrency)
        self.delay_between_requests = delay_between_requests
        self.timeout = timeout
        self.max_retries = max_retries
        self.retry_delay_factor = retry_delay_factor
        self.headers = headers or {"User-agent": "Cognee-Scraper/1.0"}
        self._last_request_time_per_domain: Dict[str, float] = {}
        self._client = None

    # ---------- lifecycle helpers ----------
    async def _ensure_client(self):
        # Lazily create the shared httpx client; remains None when httpx could
        # not be imported (the module-level import guard logs that case).
        if self._client is None:
            self._client = httpx.AsyncClient(timeout=self.timeout, headers=self.headers) if httpx else None

    async def close(self):
        """Close the underlying HTTP client, if one was created."""
        if self._client:
            await self._client.aclose()
            self._client = None

    # ---------- rate limiting ----------
    def _domain_from_url(self, url: str) -> str:
        """Return the network location (domain) of *url*, or the raw string on parse failure."""
        try:
            from urllib.parse import urlparse

            return urlparse(url).netloc
        except Exception:
            return url

    async def _respect_rate_limit(self, url: str):
        """Sleep so that successive requests to the same domain are spaced by
        at least ``delay_between_requests`` seconds."""
        domain = self._domain_from_url(url)
        last = self._last_request_time_per_domain.get(domain)
        if last is None:
            self._last_request_time_per_domain[domain] = time.time()
            return
        wait_for = self.delay_between_requests - (time.time() - last)
        if wait_for > 0:
            await asyncio.sleep(wait_for)
        self._last_request_time_per_domain[domain] = time.time()

    # ---------- robots.txt handling ----------
    async def _is_url_allowed(self, url: str) -> bool:
        """Return True when robots.txt for *url*'s domain permits fetching it.

        Permissive on any failure (missing/unreachable robots.txt).
        Fix: the original called a nonexistent ``self._get_base_url`` and fell
        off the end of the function (implicitly returning None) after locating
        a "disallow" entry; parsing is now delegated to urllib.robotparser.
        """
        from urllib import robotparser
        from urllib.parse import urlparse

        parsed = urlparse(url)
        robots_txt_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"
        try:
            robots_txt_content = await self._fetch_httpx(robots_txt_url)
        except Exception:
            return True  # no reachable robots.txt -> assume allowed
        parser = robotparser.RobotFileParser()
        parser.parse(robots_txt_content.splitlines())
        agent = self.headers.get("User-agent", "*")
        return parser.can_fetch(agent, url) or parser.can_fetch("*", url)

    # ---------- low-level fetchers ----------
    async def _fetch_httpx(self, url: str) -> str:
        """GET *url* with the shared client, honoring the per-domain rate limit
        and retrying with exponential backoff on any error."""
        await self._ensure_client()
        assert self._client is not None, "HTTP client not initialized"
        attempt = 0
        while True:
            try:
                await self._respect_rate_limit(url)
                resp = await self._client.get(url)
                resp.raise_for_status()
                return resp.text
            except Exception:
                attempt += 1
                if attempt > self.max_retries:
                    raise
                delay = self.retry_delay_factor * (2 ** (attempt - 1))
                await asyncio.sleep(delay)

    async def _render_with_playwright(self, url: str, js_wait: float = 1.0, timeout: Optional[float] = None) -> str:
        """Render *url* in headless Chromium and return the post-JS page HTML.

        Slower than a plain fetch but executes in-page JavaScript.
        """
        if async_playwright is None:
            raise RuntimeError(
                "Playwright is not installed. Install with `pip install playwright` and run `playwright install`."
            )
        attempt = 0
        while True:
            try:
                async with async_playwright() as p:
                    browser = await p.chromium.launch(headless=True)
                    context = await browser.new_context()
                    page = await context.new_page()
                    await page.goto(url, wait_until="networkidle", timeout=int((timeout or self.timeout) * 1000))
                    # optional short wait to let in-page JS mutate the DOM
                    if js_wait:
                        await asyncio.sleep(js_wait)
                    content = await page.content()
                    await browser.close()
                    return content
            except Exception:
                attempt += 1
                if attempt > self.max_retries:
                    raise
                # Fix: the original read nonexistent ``self.backoff_factor``
                # here, raising AttributeError on every retry attempt.
                backoff = self.retry_delay_factor * (2 ** (attempt - 1))
                await asyncio.sleep(backoff)

    # ---------- extraction helpers ----------
    def _normalize_rule(self, rule) -> Dict[str, Any]:
        """Normalize a rule (bare CSS selector string, or a dict with
        selector/xpath/attr/all/join_with keys) into a canonical dict."""
        if isinstance(rule, str):
            return {"selector": rule, "attr": None, "all": False, "join_with": " "}
        if isinstance(rule, dict):
            return {
                "selector": rule.get("selector"),
                "attr": rule.get("attr"),
                "all": bool(rule.get("all")),
                "join_with": rule.get("join_with", " "),
                "xpath": rule.get("xpath"),
            }
        raise ValueError("Invalid extraction rule")

    def _extract_with_bs4(self, html: str, rule: Dict[str, Any]) -> str:
        """Extract text from *html* per a normalized *rule*.

        XPath rules require lxml; CSS rules use BeautifulSoup. Returns the
        joined text of matched nodes (or a single node's text/attribute).
        """
        soup = BeautifulSoup(html, "html.parser")
        sel = rule.get("selector")
        xpath = rule.get("xpath")
        attr = rule.get("attr")
        all_flag = rule.get("all", False)
        join_with = rule.get("join_with", " ")
        if xpath:
            # XPath path requires lxml, which is optional.
            try:
                from lxml import html as lxml_html
            except Exception:
                raise RuntimeError("XPath requested but lxml is not available. Install lxml or use CSS selectors.")
            doc = lxml_html.fromstring(html)
            nodes = doc.xpath(xpath)
            texts = []
            for n in nodes:
                if hasattr(n, "text_content"):
                    texts.append(n.text_content().strip())
                else:
                    texts.append(str(n).strip())
            return join_with.join(t for t in texts if t)
        if not sel:
            return ""
        if all_flag:
            pieces = []
            for el in soup.select(sel):
                if attr:
                    val = el.get(attr)
                    if val:
                        pieces.append(val.strip())
                else:
                    text = el.get_text(strip=True)
                    if text:
                        pieces.append(text)
            return join_with.join(pieces).strip()
        el = soup.select_one(sel)
        if el is None:
            return ""
        if attr:
            return (el.get(attr) or "").strip()
        return el.get_text(strip=True)

    # ---------- public API ----------
    async def fetch_with_bs4(
        self,
        urls: Union[str, List[str]],
        extraction_rules: Dict[str, Any],
        *,
        use_playwright: bool = False,
        playwright_js_wait: float = 0.8,
        join_all_matches: bool = False,  # if True, for each rule use all matches (join them)
    ) -> Dict[str, str]:
        """
        Fetch one or more URLs and extract text using BeautifulSoup (or lxml xpath).

        Returns: dict[url] -> single concatenated string (trimmed); failed URLs
        map to an empty string.

        Parameters:
          - urls: str or list[str]
          - extraction_rules: dict[field_name -> selector or rule-dict]
              rule-dict keys: selector (CSS), xpath (optional), attr (optional), all(bool), join_with(str)
          - use_playwright: if True, use Playwright to render JS (must be installed), otherwise normal fetch
          - playwright_js_wait: seconds to wait after load for JS to mutate DOM
          - join_all_matches: convenience: if True, treat each rule as all=True
        """
        if isinstance(urls, str):
            urls = [urls]
        normalized_rules = {}
        for field, rule in extraction_rules.items():
            r = self._normalize_rule(rule)
            if join_all_matches:
                r["all"] = True
            normalized_rules[field] = r

        async def _task(url: str) -> str:
            async with self._sem:
                if use_playwright:
                    html = await self._render_with_playwright(url, js_wait=playwright_js_wait, timeout=self.timeout)
                else:
                    html = await self._fetch_httpx(url)
                pieces = []
                for rule in normalized_rules.values():
                    text = self._extract_with_bs4(html, rule)
                    if text:
                        pieces.append(text)
                return " ".join(pieces).strip()

        # Fix: the original read ``url`` in the except branch of an
        # as_completed loop, where it is unbound (first failure) or stale from
        # a previous iteration — failures were recorded under the wrong key.
        outcomes = await asyncio.gather(*(_task(u) for u in urls), return_exceptions=True)
        results: Dict[str, str] = {}
        for u, outcome in zip(urls, outcomes):
            results[u] = "" if isinstance(outcome, BaseException) else outcome
        return results

View file

@ -91,6 +91,8 @@ class BeautifulSoupCrawler:
parsed_url = urlparse(url)
robots_url = f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt"
content = await self._fetch_httpx(robots_url)
if content.strip() == "":
return True # no robots.txt means allowed
rp = Protego.parse(content)
agent = next((v for k, v in self.headers.items() if k.lower() == "user-agent"), "*")
return rp.can_fetch(agent, url) or rp.can_fetch("*", url)
@ -114,23 +116,42 @@ class BeautifulSoupCrawler:
await self._ensure_client()
assert self._client is not None, "HTTP client not initialized"
attempt = 0
parsed = urlparse(url)
domain_root = f"{parsed.scheme}://{parsed.netloc}"
# Handle robots.txt separately (no recursive crawl delay call)
is_robot = url.lower().endswith("/robots.txt")
while True:
try:
# get crawl delay from robots.txt if available
crawl_delay = await self._get_crawl_delay(
f"{urlparse(url).scheme}://{urlparse(url).netloc}"
)
# Only get crawl delay for non-robots.txt pages
crawl_delay = self.crawl_delay
if not is_robot:
try:
crawl_delay = await self._get_crawl_delay(domain_root)
except Exception as e:
logger.debug(f"Failed to fetch crawl delay for {domain_root}: {e}")
await self._respect_rate_limit(url, crawl_delay)
resp = await self._client.get(url)
resp.raise_for_status()
return resp.text
except Exception as exc:
# Special case: if robots.txt failed, just return empty string
if is_robot:
logger.warning(f"Robots.txt not found or inaccessible at {url}: {exc}")
return ""
attempt += 1
if attempt > self.max_retries:
logger.error(f"Fetch failed for {url}: {exc}")
logger.error(f"Fetch failed for {url} after {attempt} attempts: {exc}")
raise
delay = self.retry_delay_factor * (2 ** (attempt - 1))
logger.warning(f"Retrying {url} after {delay:.2f}s (attempt {attempt})")
logger.warning(
f"Retrying {url} after {delay:.2f}s (attempt {attempt}) due to {exc}"
)
await asyncio.sleep(delay)
async def _render_with_playwright(

View file

@ -0,0 +1,24 @@
from pydantic import BaseModel, Field
from typing import Any, Dict, Optional
import os
class TavilyConfig(BaseModel):
    """Settings for the Tavily content-extraction API."""

    # Fix: the original declared `api_key: str = os.getenv("TAVILY_API_KEY")`,
    # reading the environment once at class-definition (import) time and
    # assigning None to a non-optional str field when the variable is unset.
    # A default_factory defers the lookup to instantiation time.
    api_key: Optional[str] = Field(default_factory=lambda: os.getenv("TAVILY_API_KEY"))
    extract_depth: str = "basic"
    format: str = "markdown"
    # Fix: `timeout: int = Field(None, ...)` gave a None default to a plain
    # int field; the annotation must be Optional for a None default.
    timeout: Optional[int] = Field(default=None, ge=1, le=60)
class SoupCrawlerConfig(BaseModel):
    """Settings for BeautifulSoupCrawler-based scraping."""

    # Fix: the defaults were accidentally written as one-element tuples
    # (e.g. `concurrency: int = (5,)`), which do not match the declared
    # scalar types and fail pydantic validation.
    concurrency: int = 5
    crawl_delay: float = 0.5
    timeout: float = 15.0
    max_retries: int = 2
    retry_delay_factor: float = 0.5
    headers: Optional[Dict[str, str]] = None
    # Required: field names mapped to CSS selectors or rule dicts.
    extraction_rules: Dict[str, Any]
    use_playwright: bool = False
    playwright_js_wait: float = 0.8
    join_all_matches: bool = False
    structured: bool = False

View file

@ -1,9 +1,10 @@
from tavily import AsyncTavilyClient
from bs4 import BeautifulSoup
from .bs4_crawler import BeautifulSoupCrawler
import os
import httpx
from typing import Dict, Any, List, Union
from .config import TavilyConfig, SoupCrawlerConfig
from typing import Dict, Any, List, Union, Optional, Literal
from cognee.shared.logging_utils import get_logger
import asyncio
logger = get_logger(__name__)
@ -22,37 +23,79 @@ except ImportError:
)
async def fetch_page_content(urls: Union[str, List[str]], extraction_rules: Dict[str, Any]) -> str:
if os.getenv("TAVILY_API_KEY") is not None:
async def fetch_page_content(
    urls: Union[str, List[str]],
    *,
    preferred_tool: Optional[Literal["tavily", "beautifulsoup"]] = "beautifulsoup",
    extraction_rules: Optional[Dict[str, Any]] = None,
    tavily_config: Optional[TavilyConfig] = None,
    soup_crawler_config: Optional[SoupCrawlerConfig] = None,
    use_playwright: Optional[bool] = False,
    playwright_js_wait: Optional[float] = 0.8,
    join_all_matches: Optional[bool] = False,
    structured: Optional[bool] = False,
) -> Dict[str, Union[str, Dict[str, str]]]:
    """
    Fetch page content using the Tavily API when preferred and configured,
    otherwise fetch using BeautifulSoupCrawler directly.

    Parameters:
        urls: single URL or list of URLs
        preferred_tool: "tavily" or "beautifulsoup"
        extraction_rules: dict mapping field names -> CSS selector or rule
        tavily_config: optional Tavily settings (api_key, ...)
        soup_crawler_config: optional crawler settings; its extraction_rules
            are used when the `extraction_rules` argument is not given
        use_playwright: whether to render JS (BeautifulSoupCrawler)
        playwright_js_wait: seconds to wait for JS to load
        join_all_matches: join all matching elements per rule
        structured: if True, returns structured dict instead of concatenated string

    Returns:
        Dict mapping URL -> extracted string or structured dict

    Raises:
        ValueError: if no extraction rules are available for the crawler path.
    """
    # Fix: the original dereferenced tavily_config.api_key even when
    # tavily_config was None, raising AttributeError.
    has_tavily_key = bool(os.getenv("TAVILY_API_KEY") or (tavily_config and tavily_config.api_key))
    if has_tavily_key and preferred_tool == "tavily":
        return await fetch_with_tavily(urls)

    # Fix: soup_crawler_config is optional; guard before reading its attribute,
    # and validate before constructing the crawler.
    if extraction_rules is None and soup_crawler_config is not None:
        extraction_rules = soup_crawler_config.extraction_rules
    if extraction_rules is None:
        raise ValueError("extraction_rules must be provided when not using Tavily")

    crawler = BeautifulSoupCrawler()
    try:
        return await crawler.fetch_with_bs4(
            urls,
            extraction_rules,
            use_playwright=use_playwright,
            playwright_js_wait=playwright_js_wait,
            join_all_matches=join_all_matches,
            structured=structured,
        )
    except Exception as e:
        # Fix: the original logged and implicitly returned None; re-raise so
        # callers can distinguish failure from an empty result.
        logger.error(f"Error fetching page content: {str(e)}")
        raise
async def fetch_with_tavily(urls: Union[str, List[str]]) -> Dict[str, str]:
    """Extract raw page content for *urls* via the Tavily API.

    Returns a dict mapping each successfully fetched URL to its raw content.
    URLs Tavily could not fetch are logged as warnings and omitted.
    """
    client = AsyncTavilyClient()
    response = await client.extract(urls)
    for failed_result in response.get("failed_results", []):
        logger.warning(f"Failed to fetch {failed_result}")
    # Fix: the original wrote `for results in results.get("results", [])`,
    # reusing one name for both the response dict and the loop variable.
    return_results = {}
    for result in response.get("results", []):
        return_results[result["url"]] = result["raw_content"]
    return return_results
async def fetch_with_bs4(urls: Union[str, List[str]], extraction_rules: Dict[str, str]) -> Dict[str, str]:
    """Fetch *urls* sequentially with httpx and extract one newline-joined text
    blob per URL using the CSS selectors in *extraction_rules*.

    Fix: the return annotation was ``Dict[str]``, which raises a TypeError at
    import time — typing.Dict requires both key and value types.

    Parameters:
        urls: single URL or list of URLs
        extraction_rules: field name -> CSS selector (only the first match of
            each selector contributes text)

    Returns:
        Dict mapping URL -> extracted text (stripped; empty if nothing matched).
    """
    result_dict: Dict[str, str] = {}
    if isinstance(urls, str):
        urls = [urls]
    async with httpx.AsyncClient(headers={"User-Agent": "Cognee-Scraper"}) as client:
        for url in urls:
            response = await client.get(url)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "html.parser")
            pieces = []
            for field, selector in extraction_rules.items():
                element = soup.select_one(selector)
                if element:
                    pieces.append(element.get_text(strip=True))
            result_dict[url] = "\n".join(pieces).strip()
    return result_dict
def check_valid_arguments_for_web_scraper(
    extraction_rules, preferred_tool, tavily_config, soup_crawler_config
):
    """Validate scraper arguments before any network work is attempted.

    Parameters:
        extraction_rules: field -> selector mapping, or None
        preferred_tool: "tavily" or "beautifulsoup"
        tavily_config: optional TavilyConfig (checked for api_key)
        soup_crawler_config: optional SoupCrawlerConfig (checked for extraction_rules)

    Raises:
        ValueError: when the tavily tool is selected without an API key, or the
            beautifulsoup tool is selected without extraction rules from either
            source.
    """
    if preferred_tool == "tavily":
        # The API key may come from the environment or the explicit config.
        if not (os.getenv("TAVILY_API_KEY") or (tavily_config and tavily_config.api_key)):
            raise ValueError(
                "TAVILY_API_KEY must be set in environment variables or tavily_config.api_key must be provided when preferred_tool is 'tavily'"
            )
    else:
        # Fix: removed leftover debug print() calls that leaked config values
        # to stdout on every validation.
        if not (extraction_rules or (soup_crawler_config and soup_crawler_config.extraction_rules)):
            raise ValueError(
                "extraction_rules must be provided when preferred_tool is 'beautifulsoup'"
            )

View file

@ -0,0 +1,63 @@
from cognee.tasks.storage.add_data_points import add_data_points
from cognee.tasks.storage.index_data_points import index_data_points
from cognee.tasks.storage.index_graph_edges import index_graph_edges
from cognee.infrastructure.databases.graph import get_graph_engine
from .models import WebPage, WebSite, ScrapingJob
from typing import Union, List, Dict
from urllib.parse import urlparse
async def web_scraper_task(url: Union[str, List[str]], **kwargs):
graph_engine = await get_graph_engine()
# Mapping between parsed_url object and urls
mappings = {}
web_scraping_job = ScrapingJob(
job_name="default_job",
urls=[url] if isinstance(url, str) else url,
scraping_rules={},
schedule=None,
status="active",
last_run=None,
next_run=None,
)
data_point_mappings: Dict[WebSite, List[WebPage]] = {}
if isinstance(url, List):
for single_url in url:
parsed_url = urlparse(single_url)
domain = parsed_url.netloc
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
if mappings.get(parsed_url):
mappings[parsed_url] = [single_url]
else:
mappings[parsed_url].append(single_url)
else:
if mappings.get(parsed_url):
mappings[parsed_url] = [single_url]
else:
mappings[parsed_url].append(single_url)
for parsed_url in mappings.keys():
domain = parsed_url.netloc
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
web_site = WebSite(
domain=domain,
base_url=base_url,
robots_txt="",
crawl_delay=0,
last_crawled=None,
page_count=0,
scraping_config={},
)
for url in mappings[parsed_url]:
# Process each URL with the web scraping logic
web_page = WebPage(
url=url,
title="",
content="",
content_hash="",
scraped_at=None,
last_modified=None,
status_code=0,
content_type="",
page_size=0,
extraction_rules={},
)