Merge branch 'dev' into chore/pull-main-mcp-docker-image-before-running

This commit is contained in:
Daulet Amirkhanov 2025-10-11 20:20:56 +01:00 committed by GitHub
commit e7e18302f1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
57 changed files with 1577 additions and 582 deletions

View file

@ -89,15 +89,6 @@ export default function useChat(dataset: Dataset) {
}
interface Node {
name: string;
}
interface Relationship {
relationship_name: string;
}
type InsightMessage = [Node, Relationship, Node];
// eslint-disable-next-line @typescript-eslint/no-explicit-any
function convertToSearchTypeOutput(systemMessage: any[] | any, searchType: string): string {
@ -106,14 +97,6 @@ function convertToSearchTypeOutput(systemMessage: any[] | any, searchType: strin
}
switch (searchType) {
case "INSIGHTS":
return systemMessage.map((message: InsightMessage) => {
const [node1, relationship, node2] = message;
if (node1.name && node2.name) {
return `${node1.name} ${relationship.relationship_name} ${node2.name}.`;
}
return "";
}).join("\n");
case "SUMMARIES":
return systemMessage.map((message: { text: string }) => message.text).join("\n");
case "CHUNKS":

View file

@ -266,7 +266,7 @@ The MCP server exposes its functionality through tools. Call them from any MCP c
- **codify**: Analyse a code repository, build a code graph, stores it in memory
- **search**: Query memory supports GRAPH_COMPLETION, RAG_COMPLETION, CODE, CHUNKS, INSIGHTS
- **search**: Query memory supports GRAPH_COMPLETION, RAG_COMPLETION, CODE, CHUNKS
- **list_data**: List all datasets and their data items with IDs for deletion operations

View file

@ -37,5 +37,4 @@ dev = [
allow-direct-references = true
[project.scripts]
cognee = "src:main"
cognee-mcp = "src:main_mcp"
cognee-mcp = "src:main"

View file

@ -255,7 +255,7 @@ async def cognify(
# 2. Get entity relationships and connections
relationships = await cognee.search(
"connections between concepts",
query_type=SearchType.INSIGHTS
query_type=SearchType.GRAPH_COMPLETION
)
# 3. Find relevant document chunks
@ -478,11 +478,6 @@ async def search(search_query: str, search_type: str) -> list:
Best for: Direct document retrieval, specific fact-finding.
Returns: LLM responses based on relevant text chunks.
**INSIGHTS**:
Structured entity relationships and semantic connections.
Best for: Understanding concept relationships, knowledge mapping.
Returns: Formatted relationship data and entity connections.
**CHUNKS**:
Raw text segments that match the query semantically.
Best for: Finding specific passages, citations, exact content.
@ -524,7 +519,6 @@ async def search(search_query: str, search_type: str) -> list:
- "RAG_COMPLETION": Returns an LLM response based on the search query and standard RAG data
- "CODE": Returns code-related knowledge in JSON format
- "CHUNKS": Returns raw text chunks from the knowledge graph
- "INSIGHTS": Returns relationships between nodes in readable format
- "SUMMARIES": Returns pre-generated hierarchical summaries
- "CYPHER": Direct graph database queries
- "FEELING_LUCKY": Automatically selects best search type
@ -537,7 +531,6 @@ async def search(search_query: str, search_type: str) -> list:
A list containing a single TextContent object with the search results.
The format of the result depends on the search_type:
- **GRAPH_COMPLETION/RAG_COMPLETION**: Conversational AI response strings
- **INSIGHTS**: Formatted relationship descriptions and entity connections
- **CHUNKS**: Relevant text passages with source metadata
- **SUMMARIES**: Hierarchical summaries from general to specific
- **CODE**: Structured code information with context
@ -547,7 +540,6 @@ async def search(search_query: str, search_type: str) -> list:
Performance & Optimization:
- **GRAPH_COMPLETION**: Slower but most intelligent, uses LLM + graph context
- **RAG_COMPLETION**: Medium speed, uses LLM + document chunks (no graph traversal)
- **INSIGHTS**: Fast, returns structured relationships without LLM processing
- **CHUNKS**: Fastest, pure vector similarity search without LLM
- **SUMMARIES**: Fast, returns pre-computed summaries
- **CODE**: Medium speed, specialized for code understanding
@ -586,9 +578,6 @@ async def search(search_query: str, search_type: str) -> list:
return str(search_results[0])
elif search_type.upper() == "CHUNKS":
return str(search_results)
elif search_type.upper() == "INSIGHTS":
results = retrieved_edges_to_string(search_results)
return results
else:
return str(search_results)

View file

@ -19,6 +19,7 @@ from .api.v1.add import add
from .api.v1.delete import delete
from .api.v1.cognify import cognify
from .modules.memify import memify
from .api.v1.update import update
from .api.v1.config.config import config
from .api.v1.datasets.datasets import datasets
from .api.v1.prune import prune

View file

@ -241,16 +241,6 @@ class HealthChecker:
"""Get comprehensive health status."""
components = {}
# Critical services
critical_components = [
"relational_db",
"vector_db",
"graph_db",
"file_storage",
"llm_provider",
"embedding_service",
]
critical_checks = [
("relational_db", self.check_relational_db()),
("vector_db", self.check_vector_db()),
@ -296,11 +286,11 @@ class HealthChecker:
else:
components[name] = result
critical_comps = [check[0] for check in critical_checks]
# Determine overall status
critical_unhealthy = any(
comp.status == HealthStatus.UNHEALTHY
comp.status == HealthStatus.UNHEALTHY and name in critical_comps
for name, comp in components.items()
if name in critical_components
)
has_degraded = any(comp.status == HealthStatus.DEGRADED for comp in components.values())

View file

@ -1,5 +1,6 @@
from uuid import UUID
from typing import Union, BinaryIO, List, Optional
import os
from typing import Union, BinaryIO, List, Optional, Dict, Any
from cognee.modules.users.models import User
from cognee.modules.pipelines import Task, run_pipeline
@ -11,6 +12,12 @@ from cognee.modules.pipelines.layers.reset_dataset_pipeline_run_status import (
)
from cognee.modules.engine.operations.setup import setup
from cognee.tasks.ingestion import ingest_data, resolve_data_directories
from cognee.tasks.web_scraper.config import TavilyConfig, SoupCrawlerConfig
from cognee.context_global_variables import (
tavily_config as tavily,
soup_crawler_config as soup_crawler,
)
from urllib.parse import urlparse
async def add(
@ -23,12 +30,15 @@ async def add(
dataset_id: Optional[UUID] = None,
preferred_loaders: List[str] = None,
incremental_loading: bool = True,
extraction_rules: Optional[Dict[str, Any]] = None,
tavily_config: Optional[TavilyConfig] = None,
soup_crawler_config: Optional[SoupCrawlerConfig] = None,
):
"""
Add data to Cognee for knowledge graph processing.
This is the first step in the Cognee workflow - it ingests raw data and prepares it
for processing. The function accepts various data formats including text, files, and
for processing. The function accepts various data formats including text, files, URLs, and
binary streams, then stores them in a specified dataset for further processing.
Prerequisites:
@ -68,6 +78,7 @@ async def add(
- S3 path: "s3://my-bucket/documents/file.pdf"
- List of mixed types: ["text content", "/path/file.pdf", "file://doc.txt", file_handle]
- Binary file object: open("file.txt", "rb")
- URL: A web link (http or https)
dataset_name: Name of the dataset to store data in. Defaults to "main_dataset".
Create separate datasets to organize different knowledge domains.
user: User object for authentication and permissions. Uses default user if None.
@ -78,6 +89,9 @@ async def add(
vector_db_config: Optional configuration for vector database (for custom setups).
graph_db_config: Optional configuration for graph database (for custom setups).
dataset_id: Optional specific dataset UUID to use instead of dataset_name.
extraction_rules: Optional dictionary of rules (e.g., CSS selectors, XPath) for extracting specific content from web pages using BeautifulSoup.
tavily_config: Optional configuration for Tavily API, including API key and extraction settings
soup_crawler_config: Optional configuration for BeautifulSoup crawler, specifying concurrency, crawl delay, and extraction rules.
Returns:
PipelineRunInfo: Information about the ingestion pipeline execution including:
@ -126,6 +140,21 @@ async def add(
# Add a single file
await cognee.add("/home/user/documents/analysis.pdf")
# Add a single url and bs4 extract ingestion method
extraction_rules = {
"title": "h1",
"description": "p",
"more_info": "a[href*='more-info']"
}
await cognee.add("https://example.com",extraction_rules=extraction_rules)
# Add a single url and tavily extract ingestion method
Make sure to set TAVILY_API_KEY=YOUR_TAVILY_API_KEY as an environment variable
await cognee.add("https://example.com")
# Add multiple urls
await cognee.add(["https://example.com","https://books.toscrape.com"])
```
Environment Variables:
@ -139,11 +168,38 @@ async def add(
- DEFAULT_USER_PASSWORD: Custom default user password
- VECTOR_DB_PROVIDER: "lancedb" (default), "chromadb", "pgvector"
- GRAPH_DATABASE_PROVIDER: "kuzu" (default), "neo4j"
- TAVILY_API_KEY: YOUR_TAVILY_API_KEY
"""
if not soup_crawler_config and extraction_rules:
soup_crawler_config = SoupCrawlerConfig(extraction_rules=extraction_rules)
if not tavily_config and os.getenv("TAVILY_API_KEY"):
tavily_config = TavilyConfig(api_key=os.getenv("TAVILY_API_KEY"))
soup_crawler.set(soup_crawler_config)
tavily.set(tavily_config)
http_schemes = {"http", "https"}
def _is_http_url(item: Union[str, BinaryIO]) -> bool:
return isinstance(item, str) and urlparse(item).scheme in http_schemes
if _is_http_url(data):
node_set = ["web_content"] if not node_set else node_set + ["web_content"]
elif isinstance(data, list) and any(_is_http_url(item) for item in data):
node_set = ["web_content"] if not node_set else node_set + ["web_content"]
tasks = [
Task(resolve_data_directories, include_subdirectories=True),
Task(ingest_data, dataset_name, user, node_set, dataset_id, preferred_loaders),
Task(
ingest_data,
dataset_name,
user,
node_set,
dataset_id,
preferred_loaders,
),
]
await setup()

View file

@ -73,7 +73,11 @@ def get_add_router() -> APIRouter:
try:
add_run = await cognee_add(
data, datasetName, user=user, dataset_id=datasetId, node_set=node_set
data,
datasetName,
user=user,
dataset_id=datasetId,
node_set=node_set if node_set else None,
)
if isinstance(add_run, PipelineRunErrored):

View file

@ -148,7 +148,7 @@ async def cognify(
# 2. Get entity relationships and connections
relationships = await cognee.search(
"connections between concepts",
query_type=SearchType.INSIGHTS
query_type=SearchType.GRAPH_COMPLETION
)
# 3. Find relevant document chunks

View file

@ -14,7 +14,6 @@ DEFAULT_TOOLS = [
"type": "string",
"description": "Type of search to perform",
"enum": [
"INSIGHTS",
"CODE",
"GRAPH_COMPLETION",
"NATURAL_LANGUAGE",

View file

@ -59,7 +59,7 @@ async def handle_search(arguments: Dict[str, Any], user) -> list:
valid_search_types = (
search_tool["parameters"]["properties"]["search_type"]["enum"]
if search_tool
else ["INSIGHTS", "CODE", "GRAPH_COMPLETION", "NATURAL_LANGUAGE"]
else ["CODE", "GRAPH_COMPLETION", "NATURAL_LANGUAGE"]
)
if search_type_str not in valid_search_types:

View file

@ -14,7 +14,6 @@ DEFAULT_TOOLS = [
"type": "string",
"description": "Type of search to perform",
"enum": [
"INSIGHTS",
"CODE",
"GRAPH_COMPLETION",
"NATURAL_LANGUAGE",

View file

@ -52,11 +52,6 @@ async def search(
Best for: Direct document retrieval, specific fact-finding.
Returns: LLM responses based on relevant text chunks.
**INSIGHTS**:
Structured entity relationships and semantic connections.
Best for: Understanding concept relationships, knowledge mapping.
Returns: Formatted relationship data and entity connections.
**CHUNKS**:
Raw text segments that match the query semantically.
Best for: Finding specific passages, citations, exact content.
@ -124,9 +119,6 @@ async def search(
**GRAPH_COMPLETION/RAG_COMPLETION**:
[List of conversational AI response strings]
**INSIGHTS**:
[List of formatted relationship descriptions and entity connections]
**CHUNKS**:
[List of relevant text passages with source metadata]
@ -146,7 +138,6 @@ async def search(
Performance & Optimization:
- **GRAPH_COMPLETION**: Slower but most intelligent, uses LLM + graph context
- **RAG_COMPLETION**: Medium speed, uses LLM + document chunks (no graph traversal)
- **INSIGHTS**: Fast, returns structured relationships without LLM processing
- **CHUNKS**: Fastest, pure vector similarity search without LLM
- **SUMMARIES**: Fast, returns pre-computed summaries
- **CODE**: Medium speed, specialized for code understanding

View file

@ -75,7 +75,7 @@ def get_update_router() -> APIRouter:
data=data,
dataset_id=dataset_id,
user=user,
node_set=node_set,
node_set=node_set if node_set else None,
)
# If any cognify run errored return JSONResponse with proper error status code

View file

@ -10,9 +10,9 @@ from cognee.api.v1.cognify import cognify
async def update(
data_id: UUID,
data: Union[BinaryIO, list[BinaryIO], str, list[str]],
dataset_id: UUID,
user: User = None,
node_set: Optional[List[str]] = None,
dataset_id: Optional[UUID] = None,
vector_db_config: dict = None,
graph_db_config: dict = None,
preferred_loaders: List[str] = None,

View file

@ -31,10 +31,6 @@ Search Types & Use Cases:
Traditional RAG using document chunks without graph structure.
Best for: Direct document retrieval, specific fact-finding.
**INSIGHTS**:
Structured entity relationships and semantic connections.
Best for: Understanding concept relationships, knowledge mapping.
**CHUNKS**:
Raw text segments that match the query semantically.
Best for: Finding specific passages, citations, exact content.

View file

@ -19,7 +19,6 @@ COMMAND_DESCRIPTIONS = {
SEARCH_TYPE_CHOICES = [
"GRAPH_COMPLETION",
"RAG_COMPLETION",
"INSIGHTS",
"CHUNKS",
"SUMMARIES",
"CODE",

View file

@ -12,6 +12,8 @@ from cognee.modules.users.methods import get_user
# for different async tasks, threads and processes
vector_db_config = ContextVar("vector_db_config", default=None)
graph_db_config = ContextVar("graph_db_config", default=None)
soup_crawler_config = ContextVar("soup_crawler_config", default=None)
tavily_config = ContextVar("tavily_config", default=None)
async def set_database_global_context_variables(dataset: Union[str, UUID], user_id: UUID):

View file

@ -10,8 +10,6 @@ Here are the available `SearchType` tools and their specific functions:
- Summarizing large amounts of information
- Quick understanding of complex subjects
* **`INSIGHTS`**: The `INSIGHTS` search type discovers connections and relationships between entities in the knowledge graph.
**Best for:**
- Discovering how entities are connected
@ -95,9 +93,6 @@ Here are the available `SearchType` tools and their specific functions:
Query: "Summarize the key findings from these research papers"
Response: `SUMMARIES`
Query: "What is the relationship between the methodologies used in these papers?"
Response: `INSIGHTS`
Query: "When was Einstein born?"
Response: `CHUNKS`

View file

@ -1,133 +0,0 @@
import asyncio
from typing import Any, Optional
from cognee.modules.graph.cognee_graph.CogneeGraphElements import Edge, Node
from cognee.modules.retrieval.base_graph_retriever import BaseGraphRetriever
from cognee.shared.logging_utils import get_logger
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.infrastructure.databases.vector import get_vector_engine
from cognee.modules.retrieval.exceptions.exceptions import NoDataError
from cognee.infrastructure.databases.vector.exceptions.exceptions import CollectionNotFoundError
logger = get_logger("InsightsRetriever")
class InsightsRetriever(BaseGraphRetriever):
    """Retriever that surfaces graph-connection insights for a query.

    Public methods:
        - get_context
        - get_completion

    Instance variables:
        - exploration_levels
        - top_k
    """

    def __init__(self, exploration_levels: int = 1, top_k: Optional[int] = 5):
        """Store the exploration depth and the number of candidates to keep."""
        self.exploration_levels = exploration_levels
        self.top_k = top_k

    async def get_context(self, query: str) -> list:
        """Return the unique connections of the node identified by *query*.

        When no node matches the query exactly, similar entities are looked up
        through the vector engine and their connections are collected instead.

        Parameters:
        -----------
            - query (str): A string identifier for the node whose neighbours are
              to be retrieved.

        Returns:
        --------
            - list: A list of unique connections found for the queried node.

        Raises:
            NoDataError: when the entity collections do not exist yet.
        """
        if query is None:
            return []

        graph_engine = await get_graph_engine()
        exact_node = await graph_engine.extract_node(query)

        if exact_node is not None and "id" in exact_node:
            node_connections = await graph_engine.get_connections(str(exact_node["id"]))
        else:
            # No exact node: fall back to vector similarity over entity names.
            vector_engine = get_vector_engine()
            try:
                entity_hits, type_hits = await asyncio.gather(
                    vector_engine.search("Entity_name", query_text=query, limit=self.top_k),
                    vector_engine.search("EntityType_name", query_text=query, limit=self.top_k),
                )
            except CollectionNotFoundError as error:
                logger.error("Entity collections not found")
                raise NoDataError("No data found in the system, please add data first.") from error

            # Keep only sufficiently close matches (lower score = closer).
            candidates = [hit for hit in [*entity_hits, *type_hits] if hit.score < 0.5]
            relevant_results = candidates[: self.top_k]
            if not relevant_results:
                return []

            connection_batches = await asyncio.gather(
                *[graph_engine.get_connections(hit.id) for hit in relevant_results]
            )
            node_connections = [
                connection for batch in connection_batches for connection in batch
            ]

        # De-duplicate by (source id, relationship, target id) triple.
        seen_keys = set()
        unique_node_connections = []
        for connection in node_connections:
            if "id" not in connection[0] or "id" not in connection[2]:
                continue
            key = f"{connection[0]['id']} {connection[1]['relationship_name']} {connection[2]['id']}"
            if key not in seen_keys:
                seen_keys.add(key)
                unique_node_connections.append(connection)

        return unique_node_connections

    async def get_completion(self, query: str, context: Optional[Any] = None) -> Any:
        """Return the graph-connections context, fetching it when not provided.

        Parameters:
        -----------
            - query (str): A string identifier used to fetch the context.
            - context (Optional[Any]): An optional pre-computed context; when
              None, it is fetched based on the query. (default None)

        Returns:
        --------
            - Any: The context, either the one provided or the one fetched.
        """
        if context is None:
            context = await self.get_context(query)
        return context

View file

@ -62,7 +62,7 @@ async def code_description_to_code_part(
try:
if include_docs:
search_results = await search(query_text=query, query_type="INSIGHTS")
search_results = await search(query_text=query, query_type="GRAPH_COMPLETION")
concatenated_descriptions = " ".join(
obj["description"]

View file

@ -9,7 +9,6 @@ from cognee.modules.search.exceptions import UnsupportedSearchTypeError
# Retrievers
from cognee.modules.retrieval.user_qa_feedback import UserQAFeedback
from cognee.modules.retrieval.chunks_retriever import ChunksRetriever
from cognee.modules.retrieval.insights_retriever import InsightsRetriever
from cognee.modules.retrieval.summaries_retriever import SummariesRetriever
from cognee.modules.retrieval.completion_retriever import CompletionRetriever
from cognee.modules.retrieval.graph_completion_retriever import GraphCompletionRetriever
@ -44,10 +43,6 @@ async def get_search_type_tools(
SummariesRetriever(top_k=top_k).get_completion,
SummariesRetriever(top_k=top_k).get_context,
],
SearchType.INSIGHTS: [
InsightsRetriever(top_k=top_k).get_completion,
InsightsRetriever(top_k=top_k).get_context,
],
SearchType.CHUNKS: [
ChunksRetriever(top_k=top_k).get_completion,
ChunksRetriever(top_k=top_k).get_context,

View file

@ -3,7 +3,6 @@ from enum import Enum
class SearchType(Enum):
SUMMARIES = "SUMMARIES"
INSIGHTS = "INSIGHTS"
CHUNKS = "CHUNKS"
RAG_COMPLETION = "RAG_COMPLETION"
GRAPH_COMPLETION = "GRAPH_COMPLETION"

View file

@ -7,6 +7,7 @@ from cognee.modules.ingestion.exceptions import IngestionError
from cognee.modules.ingestion import save_data_to_file
from cognee.shared.logging_utils import get_logger
from pydantic_settings import BaseSettings, SettingsConfigDict
from cognee.context_global_variables import tavily_config, soup_crawler_config
logger = get_logger()
@ -17,6 +18,13 @@ class SaveDataSettings(BaseSettings):
model_config = SettingsConfigDict(env_file=".env", extra="allow")
class HTMLContent(str):
    """A ``str`` subclass that only accepts HTML-like markup.

    Construction fails unless the value contains both ``<`` and ``>``,
    a cheap heuristic for "looks like HTML".
    """

    def __new__(cls, value: str):
        # Reject values missing either angle bracket — clearly not markup.
        if "<" not in value or ">" not in value:
            raise ValueError("Not valid HTML-like content")
        return super().__new__(cls, value)
settings = SaveDataSettings()
@ -48,6 +56,39 @@ async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any]) -> str
# data is s3 file path
if parsed_url.scheme == "s3":
return data_item
elif parsed_url.scheme == "http" or parsed_url.scheme == "https":
# Validate URL by sending a HEAD request
try:
from cognee.tasks.web_scraper import fetch_page_content
tavily = tavily_config.get()
soup_crawler = soup_crawler_config.get()
preferred_tool = "beautifulsoup" if soup_crawler else "tavily"
if preferred_tool == "tavily" and tavily is None:
raise IngestionError(
message="TavilyConfig must be set on the ingestion context when fetching HTTP URLs without a SoupCrawlerConfig."
)
if preferred_tool == "beautifulsoup" and soup_crawler is None:
raise IngestionError(
message="SoupCrawlerConfig must be set on the ingestion context when using the BeautifulSoup scraper."
)
data = await fetch_page_content(
data_item,
preferred_tool=preferred_tool,
tavily_config=tavily,
soup_crawler_config=soup_crawler,
)
content = ""
for key, value in data.items():
content += f"{key}:\n{value}\n\n"
return await save_data_to_file(content)
except IngestionError:
raise
except Exception as e:
raise IngestionError(
message=f"Error ingesting webpage results of url {data_item}: {str(e)}"
)
# data is local file path
elif parsed_url.scheme == "file":

View file

@ -0,0 +1,18 @@
"""Web scraping module for cognee.
This module provides tools for scraping web content, managing scraping jobs, and storing
data in a graph database. It includes classes and functions for crawling web pages using
BeautifulSoup or Tavily, defining data models, and handling scraping configurations.
"""
from .bs4_crawler import BeautifulSoupCrawler
from .utils import fetch_page_content
from .web_scraper_task import cron_web_scraper_task, web_scraper_task
__all__ = [
"BeautifulSoupCrawler",
"fetch_page_content",
"cron_web_scraper_task",
"web_scraper_task",
]

View file

@ -0,0 +1,546 @@
"""BeautifulSoup-based web crawler for extracting content from web pages.
This module provides the BeautifulSoupCrawler class for fetching and extracting content
from web pages using BeautifulSoup or Playwright for JavaScript-rendered pages. It
supports robots.txt handling, rate limiting, and custom extraction rules.
"""
import asyncio
import time
from typing import Union, List, Dict, Any, Optional
from urllib.parse import urlparse
from dataclasses import dataclass, field
from functools import lru_cache
import httpx
from bs4 import BeautifulSoup
from cognee.shared.logging_utils import get_logger
logger = get_logger(__name__)
try:
from playwright.async_api import async_playwright
except ImportError:
logger.error(
"Failed to import playwright, make sure to install using pip install playwright>=1.9.0"
)
async_playwright = None
try:
from protego import Protego
except ImportError:
logger.error("Failed to import protego, make sure to install using pip install protego>=0.1")
Protego = None
@dataclass
class ExtractionRule:
    """Normalized extraction rule for web content.

    Attributes:
        selector: CSS selector to match (optional).
        xpath: XPath expression to match (optional).
        attr: Name of the HTML attribute to read instead of text (optional).
        all: When True, collect every match instead of only the first.
        join_with: Separator used when multiple matches are joined.
    """

    selector: Optional[str] = None
    xpath: Optional[str] = None
    attr: Optional[str] = None
    all: bool = False
    join_with: str = " "
@dataclass
class RobotsTxtCache:
    """Cached robots.txt data for a single domain.

    Attributes:
        protego: Parsed robots.txt (a Protego instance, or None when absent).
        crawl_delay: Seconds to wait between requests to the domain.
        timestamp: Creation time of this cache entry (epoch seconds).
    """

    protego: Any
    crawl_delay: float
    timestamp: float = field(default_factory=time.time)
class BeautifulSoupCrawler:
"""Crawler for fetching and extracting web content using BeautifulSoup.
Supports asynchronous HTTP requests, Playwright for JavaScript rendering, robots.txt
compliance, and rate limiting. Extracts content using CSS selectors or XPath rules.
Attributes:
concurrency: Number of concurrent requests allowed.
crawl_delay: Minimum seconds between requests to the same domain.
timeout: Per-request timeout in seconds.
max_retries: Number of retries for failed requests.
retry_delay_factor: Multiplier for exponential backoff on retries.
headers: HTTP headers for requests (e.g., User-Agent).
robots_cache_ttl: Time-to-live for robots.txt cache in seconds.
"""
def __init__(
    self,
    *,
    concurrency: int = 5,
    crawl_delay: float = 0.5,
    timeout: float = 15.0,
    max_retries: int = 2,
    retry_delay_factor: float = 0.5,
    headers: Optional[Dict[str, str]] = None,
    robots_cache_ttl: float = 3600.0,
):
    """Configure the crawler.

    Args:
        concurrency: Number of concurrent requests allowed.
        crawl_delay: Minimum seconds between requests to the same domain.
        timeout: Per-request timeout in seconds.
        max_retries: Number of retries for failed requests.
        retry_delay_factor: Multiplier for exponential backoff on retries.
        headers: HTTP headers for requests (defaults to User-Agent: Cognee-Scraper/1.0).
        robots_cache_ttl: Time-to-live for robots.txt cache in seconds.
    """
    # Request shaping and retry policy.
    self.concurrency = concurrency
    self._sem = asyncio.Semaphore(concurrency)
    self.crawl_delay = crawl_delay
    self.timeout = timeout
    self.max_retries = max_retries
    self.retry_delay_factor = retry_delay_factor
    self.headers = headers or {"User-Agent": "Cognee-Scraper/1.0"}
    self.robots_cache_ttl = robots_cache_ttl

    # Per-domain rate limiting state and robots.txt cache.
    self._last_request_time_per_domain: Dict[str, float] = {}
    self._robots_cache: Dict[str, RobotsTxtCache] = {}
    self._robots_lock = asyncio.Lock()

    # Lazily created shared httpx client (see _ensure_client).
    self._client: Optional[httpx.AsyncClient] = None
async def _ensure_client(self):
    """Lazily create the shared httpx client on first use."""
    if self._client is not None:
        return
    self._client = httpx.AsyncClient(timeout=self.timeout, headers=self.headers)
async def close(self):
    """Dispose of the HTTP client, if one was created."""
    if self._client is None:
        return
    await self._client.aclose()
    self._client = None
async def __aenter__(self):
    """Async context-manager entry: make sure the HTTP client exists."""
    await self._ensure_client()
    return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Async context-manager exit: release the HTTP client."""
    await self.close()
@staticmethod
@lru_cache(maxsize=1024)
def _domain_from_url(url: str) -> str:
    """Extract the domain (netloc) from a URL.

    Cached as a staticmethod: decorating a bound method with ``lru_cache``
    keys the cache on ``self`` and keeps every crawler instance alive for
    the cache's lifetime (flake8-bugbear B019). The method never uses
    ``self``, so caching on the URL alone is both correct and leak-free;
    ``self._domain_from_url(url)`` call sites are unaffected.

    Args:
        url: The URL to parse.

    Returns:
        str: The domain (netloc), or the raw input if parsing fails.
    """
    try:
        return urlparse(url).netloc
    except Exception:
        # urlparse rarely raises (e.g. malformed IPv6 netloc); fall back
        # to the raw string so rate limiting still gets a usable key.
        return url
@staticmethod
@lru_cache(maxsize=1024)
def _get_domain_root(url: str) -> str:
    """Get the root URL (scheme and netloc) from a URL.

    Cached as a staticmethod so ``lru_cache`` keys on the URL alone
    instead of retaining ``self`` (flake8-bugbear B019); instance call
    sites ``self._get_domain_root(url)`` keep working unchanged.

    Args:
        url: The URL to parse.

    Returns:
        str: The root URL (e.g., "https://example.com").
    """
    parsed = urlparse(url)
    return f"{parsed.scheme}://{parsed.netloc}"
async def _respect_rate_limit(self, url: str, crawl_delay: Optional[float] = None):
    """Sleep if needed so requests to one domain stay *crawl_delay* apart.

    Args:
        url: The URL about to be requested.
        crawl_delay: Per-call override for the default delay, in seconds.
    """
    domain = self._domain_from_url(url)
    delay = self.crawl_delay if crawl_delay is None else crawl_delay
    last_hit = self._last_request_time_per_domain.get(domain)

    if last_hit is not None:
        remaining = delay - (time.time() - last_hit)
        if remaining > 0:
            await asyncio.sleep(remaining)

    # Record the request time after any wait, for the next caller.
    self._last_request_time_per_domain[domain] = time.time()
async def _get_robots_cache(self, domain_root: str) -> Optional[RobotsTxtCache]:
    """Return the cached robots.txt entry for *domain_root* if still fresh.

    Args:
        domain_root: The root URL (e.g., "https://example.com").

    Returns:
        Optional[RobotsTxtCache]: The cached entry, or None when Protego is
        unavailable, the entry is missing, or the TTL has expired.
    """
    if Protego is None:
        return None
    entry = self._robots_cache.get(domain_root)
    if entry is None:
        return None
    if (time.time() - entry.timestamp) >= self.robots_cache_ttl:
        # Stale entry: caller is expected to re-fetch robots.txt.
        return None
    return entry
async def _fetch_and_cache_robots(self, domain_root: str) -> RobotsTxtCache:
    """Fetch robots.txt for *domain_root* and cache the parsed result.

    Args:
        domain_root: The root URL (e.g., "https://example.com").

    Returns:
        RobotsTxtCache: Cached robots.txt data with the effective crawl delay.
    """
    async with self._robots_lock:
        # Another task may have refreshed the cache while we waited for the lock.
        cached = await self._get_robots_cache(domain_root)
        if cached:
            return cached

        robots_url = f"{domain_root}/robots.txt"
        try:
            await self._ensure_client()
            await self._respect_rate_limit(robots_url, self.crawl_delay)
            resp = await self._client.get(robots_url, timeout=5.0)
            content = resp.text if resp.status_code == 200 else ""
        except Exception as e:
            # Treat an unreachable robots.txt as "no restrictions".
            logger.debug(f"Failed to fetch robots.txt from {domain_root}: {e}")
            content = ""

        protego = Protego.parse(content) if content.strip() else None
        agent = next((v for k, v in self.headers.items() if k.lower() == "user-agent"), "*")

        crawl_delay = self.crawl_delay
        if protego:
            delay = protego.crawl_delay(agent)
            if delay is None:
                delay = protego.crawl_delay("*")
            # Bug fix: the previous truthiness check (`delay if delay else ...`)
            # discarded an explicit "Crawl-delay: 0"; only fall back to the
            # default when robots.txt specifies no delay at all.
            crawl_delay = delay if delay is not None else self.crawl_delay

        cache_entry = RobotsTxtCache(protego=protego, crawl_delay=crawl_delay)
        self._robots_cache[domain_root] = cache_entry
        return cache_entry
async def _is_url_allowed(self, url: str) -> bool:
    """Check whether robots.txt permits fetching *url*.

    Fails open: when Protego is missing or any lookup error occurs,
    the URL is treated as allowed.

    Args:
        url: The URL to check.

    Returns:
        bool: True if the URL may be fetched.
    """
    if Protego is None:
        return True
    try:
        domain_root = self._get_domain_root(url)
        cache = await self._get_robots_cache(domain_root)
        if cache is None:
            cache = await self._fetch_and_cache_robots(domain_root)
        if cache.protego is None:
            # No (or empty) robots.txt: everything is allowed.
            return True
        agent = next((v for k, v in self.headers.items() if k.lower() == "user-agent"), "*")
        return cache.protego.can_fetch(agent, url) or cache.protego.can_fetch("*", url)
    except Exception as e:
        logger.debug(f"Error checking robots.txt for {url}: {e}")
        return True
async def _get_crawl_delay(self, url: str) -> float:
    """Return the effective crawl delay for *url*.

    Prefers the robots.txt value; falls back to the instance default when
    Protego is unavailable or the lookup fails.

    Args:
        url: The URL to check.

    Returns:
        float: Crawl delay in seconds.
    """
    if Protego is None:
        return self.crawl_delay
    try:
        domain_root = self._get_domain_root(url)
        cache = await self._get_robots_cache(domain_root)
        if cache is None:
            cache = await self._fetch_and_cache_robots(domain_root)
        return cache.crawl_delay
    except Exception:
        return self.crawl_delay
async def _fetch_httpx(self, url: str) -> str:
    """Fetch *url* with httpx, retrying with exponential backoff.

    Args:
        url: The URL to fetch.

    Returns:
        str: The HTML body of the response.

    Raises:
        Exception: Re-raises the last error once retries are exhausted.
    """
    await self._ensure_client()
    assert self._client is not None, "HTTP client not initialized"

    crawl_delay = await self._get_crawl_delay(url)
    attempt = 0
    while True:
        try:
            await self._respect_rate_limit(url, crawl_delay)
            response = await self._client.get(url)
            response.raise_for_status()
            return response.text
        except Exception as exc:
            attempt += 1
            if attempt > self.max_retries:
                logger.error(f"Fetch failed for {url} after {attempt} attempts: {exc}")
                raise
            # Exponential backoff: factor * 2^(attempt - 1).
            delay = self.retry_delay_factor * (2 ** (attempt - 1))
            logger.warning(
                f"Retrying {url} after {delay:.2f}s (attempt {attempt}) due to {exc}"
            )
            await asyncio.sleep(delay)
async def _render_with_playwright(
    self, url: str, js_wait: float = 1.0, timeout: Optional[float] = None
) -> str:
    """Fetch and render a URL using Playwright for JavaScript content.

    A fresh browser is launched for every attempt; failed attempts retry
    with exponential backoff up to self.max_retries.

    Args:
        url: The URL to fetch.
        js_wait: Seconds to wait for JavaScript to load.
        timeout: Timeout for the request (in seconds, defaults to instance timeout).

    Returns:
        str: The rendered HTML content.

    Raises:
        RuntimeError: If Playwright is not installed.
        Exception: If all retry attempts fail.
    """
    if async_playwright is None:
        raise RuntimeError(
            "Playwright is not installed. Install with `pip install playwright` and run `playwright install`."
        )
    attempt = 0
    while True:
        try:
            async with async_playwright() as p:
                browser = await p.chromium.launch(headless=True)
                try:
                    context = await browser.new_context()
                    page = await context.new_page()
                    # page.goto expects milliseconds; instance timeout is seconds.
                    await page.goto(
                        url,
                        wait_until="networkidle",
                        timeout=int((timeout or self.timeout) * 1000),
                    )
                    if js_wait:
                        await asyncio.sleep(js_wait)
                    return await page.content()
                finally:
                    # Always release the browser, even if goto/content fails.
                    await browser.close()
        except Exception as exc:
            attempt += 1
            if attempt > self.max_retries:
                logger.error(f"Playwright fetch failed for {url}: {exc}")
                raise
            backoff = self.retry_delay_factor * (2 ** (attempt - 1))
            logger.warning(
                f"Retrying playwright fetch {url} after {backoff:.2f}s (attempt {attempt})"
            )
            await asyncio.sleep(backoff)
def _normalize_rule(self, rule: Union[str, Dict[str, Any]]) -> ExtractionRule:
    """Coerce a raw rule (CSS-selector string or option dict) into an ExtractionRule.

    Args:
        rule: A string (CSS selector) or dict with extraction parameters.

    Returns:
        ExtractionRule: Normalized extraction rule.

    Raises:
        ValueError: If the rule is invalid.
    """
    # A bare string is shorthand for a CSS selector with default options.
    if isinstance(rule, str):
        return ExtractionRule(selector=rule)
    if not isinstance(rule, dict):
        raise ValueError(f"Invalid extraction rule: {rule}")
    return ExtractionRule(
        selector=rule.get("selector"),
        xpath=rule.get("xpath"),
        attr=rule.get("attr"),
        all=bool(rule.get("all", False)),
        join_with=rule.get("join_with", " "),
    )
def _extract_with_bs4(self, html: str, rule: ExtractionRule) -> str:
    """Extract content from HTML using BeautifulSoup or lxml XPath.

    Args:
        html: The HTML content to extract from.
        rule: The extraction rule to apply.

    Returns:
        str: The extracted content ("" when nothing matches or no selector is set).

    Raises:
        RuntimeError: If XPath is used but lxml is not installed.
    """
    # NOTE(review): the soup is built even for XPath rules, where it goes
    # unused — lxml re-parses the HTML itself below.
    soup = BeautifulSoup(html, "html.parser")
    if rule.xpath:
        try:
            from lxml import html as lxml_html
        except ImportError:
            raise RuntimeError(
                "XPath requested but lxml is not available. Install lxml or use CSS selectors."
            )
        doc = lxml_html.fromstring(html)
        nodes = doc.xpath(rule.xpath)
        texts = []
        for n in nodes:
            # Element nodes expose text_content(); attribute/string results do not.
            if hasattr(n, "text_content"):
                texts.append(n.text_content().strip())
            else:
                texts.append(str(n).strip())
        return rule.join_with.join(t for t in texts if t)
    if not rule.selector:
        return ""
    if rule.all:
        # Multi-match mode: collect every element, reading either the
        # requested attribute or the element's text.
        nodes = soup.select(rule.selector)
        pieces = []
        for el in nodes:
            if rule.attr:
                val = el.get(rule.attr)
                if val:
                    pieces.append(val.strip())
            else:
                text = el.get_text(strip=True)
                if text:
                    pieces.append(text)
        return rule.join_with.join(pieces).strip()
    else:
        # Single-match mode: first matching element only.
        el = soup.select_one(rule.selector)
        if el is None:
            return ""
        if rule.attr:
            val = el.get(rule.attr)
            return (val or "").strip()
        return el.get_text(strip=True)
async def fetch_with_bs4(
    self,
    urls: Union[str, List[str], Dict[str, Dict[str, Any]]],
    extraction_rules: Optional[Dict[str, Any]] = None,
    *,
    use_playwright: bool = False,
    playwright_js_wait: float = 0.8,
    join_all_matches: bool = False,
) -> Dict[str, str]:
    """Fetch and extract content from URLs using BeautifulSoup or Playwright.

    Args:
        urls: A single URL, list of URLs, or dict mapping URLs to extraction rules.
        extraction_rules: Default extraction rules for string or list URLs.
        use_playwright: If True, use Playwright for JavaScript rendering.
        playwright_js_wait: Seconds to wait for JavaScript to load.
        join_all_matches: If True, extract all matching elements for each rule.

    Returns:
        Dict[str, str]: A dictionary mapping URLs to their extracted content.
        URLs disallowed by robots.txt or whose processing fails map to "".

    Raises:
        ValueError: If extraction_rules are missing when required or if urls is invalid.
        Exception: If fetching or extraction fails.
    """
    # Normalize every accepted `urls` shape into {url: rules_dict}.
    url_rules_map: Dict[str, Dict[str, Any]] = {}
    if isinstance(urls, str):
        if not extraction_rules:
            raise ValueError("extraction_rules required when urls is a string")
        url_rules_map[urls] = extraction_rules
    elif isinstance(urls, list):
        if not extraction_rules:
            raise ValueError("extraction_rules required when urls is a list")
        for url in urls:
            url_rules_map[url] = extraction_rules
    elif isinstance(urls, dict):
        url_rules_map = urls
    else:
        raise ValueError(f"Invalid urls type: {type(urls)}")
    # Pre-normalize rules once per URL. Rule-dict keys are labels only; the
    # values are what gets normalized and applied.
    normalized_url_rules: Dict[str, List[ExtractionRule]] = {}
    for url, rules in url_rules_map.items():
        normalized_rules = []
        for _, rule in rules.items():
            r = self._normalize_rule(rule)
            if join_all_matches:
                r.all = True
            normalized_rules.append(r)
        normalized_url_rules[url] = normalized_rules

    async def _task(url: str):
        # Bounded by the crawler-wide semaphore to cap concurrent fetches.
        async with self._sem:
            try:
                allowed = await self._is_url_allowed(url)
                if not allowed:
                    logger.warning(f"URL disallowed by robots.txt: {url}")
                    return url, ""
                if use_playwright:
                    html = await self._render_with_playwright(
                        url, js_wait=playwright_js_wait, timeout=self.timeout
                    )
                else:
                    html = await self._fetch_httpx(url)
                # Apply each rule; non-empty results are space-joined.
                pieces = []
                for rule in normalized_url_rules[url]:
                    text = self._extract_with_bs4(html, rule)
                    if text:
                        pieces.append(text)
                concatenated = " ".join(pieces).strip()
                return url, concatenated
            except Exception as e:
                # Per-URL failures are logged and surfaced as empty content
                # rather than failing the whole batch.
                logger.error(f"Error processing {url}: {e}")
                return url, ""

    tasks = [asyncio.create_task(_task(u)) for u in url_rules_map.keys()]
    results = {}
    for coro in asyncio.as_completed(tasks):
        url, text = await coro
        results[url] = text
    return results

View file

@ -0,0 +1,24 @@
from pydantic import BaseModel, Field
from typing import Any, Dict, Optional, Literal
import os
class TavilyConfig(BaseModel):
    """Settings for the Tavily extraction API client.

    Attributes:
        api_key: Tavily API key; defaults to the TAVILY_API_KEY env variable.
        extract_depth: Extraction depth passed to the Tavily API.
        proxies: Optional proxy mapping for the client.
        timeout: Request timeout in seconds (1-60).
    """

    # default_factory defers the environment lookup to instantiation time:
    # a plain `os.getenv(...)` default would be frozen at import time, so a
    # TAVILY_API_KEY exported after import would never be seen.
    api_key: Optional[str] = Field(default_factory=lambda: os.getenv("TAVILY_API_KEY"))
    extract_depth: Literal["basic", "advanced"] = "basic"
    proxies: Optional[Dict[str, str]] = None
    timeout: Optional[int] = Field(default=10, ge=1, le=60)
class SoupCrawlerConfig(BaseModel):
    """Configuration for the BeautifulSoup-based crawler."""

    concurrency: int = 5  # max URLs fetched concurrently
    crawl_delay: float = 0.5  # default per-domain delay (seconds)
    timeout: float = 15.0  # per-request timeout in seconds
    max_retries: int = 2  # retry attempts before a fetch fails
    retry_delay_factor: float = 0.5  # base factor for exponential backoff
    headers: Optional[Dict[str, str]] = None  # extra HTTP headers (e.g. User-Agent)
    extraction_rules: Dict[str, Any]  # required: named CSS/XPath extraction rules
    use_playwright: bool = False  # render pages with Playwright (JS-heavy sites)
    playwright_js_wait: float = 0.8  # seconds to wait for JS after page load
    robots_cache_ttl: float = 3600.0  # robots.txt cache lifetime in seconds
    join_all_matches: bool = False  # force every rule to collect all matches

View file

@ -0,0 +1,46 @@
from cognee.infrastructure.engine import DataPoint
from typing import Optional, Dict, Any, List
from datetime import datetime
class WebPage(DataPoint):
    """Represents a scraped web page with metadata"""

    name: Optional[str]  # page URL used as the node name
    content: str  # extracted page text
    content_hash: str  # hash of content, for change detection
    scraped_at: datetime  # when this page was scraped
    last_modified: Optional[datetime]  # server-reported last modification, if known
    status_code: int  # HTTP status recorded for the fetch
    content_type: str  # MIME type of the fetched document
    page_size: int  # length of the stored content string
    extraction_rules: Dict[str, Any]  # CSS selectors, XPath rules used
    description: str  # human-readable summary used for indexing/search
    metadata: dict = {"index_fields": ["name", "description", "content"]}
class WebSite(DataPoint):
    """Represents a website or domain being scraped"""

    name: str  # site domain (netloc)
    base_url: str  # scheme + netloc, e.g. "https://example.com"
    robots_txt: Optional[str]  # raw robots.txt body, if captured
    crawl_delay: float  # delay between requests in seconds
    last_crawled: datetime  # timestamp of the most recent crawl
    page_count: int  # number of pages scraped from this site
    scraping_config: Dict[str, Any]  # tool name and extraction rules used
    description: str  # human-readable summary used for indexing/search
    metadata: dict = {"index_fields": ["name", "description"]}
class ScrapingJob(DataPoint):
    """Represents a scraping job configuration"""

    name: str  # unique job name (also keys the scheduler job id)
    urls: List[str]  # URLs this job scrapes
    schedule: Optional[str]  # Cron-like schedule for recurring scrapes
    status: str  # "active", "paused", "completed", "failed"
    last_run: Optional[datetime]  # when the job last executed
    next_run: Optional[datetime]  # next scheduled execution, if any
    description: str  # human-readable summary used for indexing/search
    metadata: dict = {"index_fields": ["name", "description"]}

View file

@ -0,0 +1,126 @@
"""Utilities for fetching web content using BeautifulSoup or Tavily.
This module provides functions to fetch and extract content from web pages, supporting
both BeautifulSoup for custom extraction rules and Tavily for API-based scraping.
"""
from typing import Dict, List, Union, Optional, Literal
from cognee.shared.logging_utils import get_logger
from .bs4_crawler import BeautifulSoupCrawler
from .config import TavilyConfig, SoupCrawlerConfig
logger = get_logger(__name__)
async def fetch_page_content(
    urls: Union[str, List[str]],
    *,
    preferred_tool: Optional[Literal["tavily", "beautifulsoup"]] = "beautifulsoup",
    tavily_config: Optional[TavilyConfig] = None,
    soup_crawler_config: Optional[SoupCrawlerConfig] = None,
) -> Dict[str, str]:
    """Fetch content from one or more URLs using the specified tool.

    This function retrieves web page content using either BeautifulSoup (with custom
    extraction rules) or Tavily (API-based scraping). It handles single URLs or lists of
    URLs and returns a dictionary mapping URLs to their extracted content.

    Args:
        urls: A single URL (str) or a list of URLs (List[str]) to scrape.
        preferred_tool: The scraping tool to use ("tavily" or "beautifulsoup").
            Defaults to "beautifulsoup".
        tavily_config: Configuration for Tavily API, including API key.
            Required if preferred_tool is "tavily".
        soup_crawler_config: Configuration for BeautifulSoup crawler, including
            extraction rules. Required if preferred_tool is "beautifulsoup".

    Returns:
        Dict[str, str]: A dictionary mapping each URL to its extracted content.

    Raises:
        ValueError: If Tavily API key is missing when using Tavily, or if
            extraction_rules are not provided when using BeautifulSoup.
        ImportError: If required dependencies (beautifulsoup4 or tavily-python) are not
            installed.
    """
    if preferred_tool == "tavily":
        if not tavily_config or tavily_config.api_key is None:
            raise ValueError("TAVILY_API_KEY must be set in TavilyConfig to use Tavily")
        return await fetch_with_tavily(urls, tavily_config)
    if preferred_tool == "beautifulsoup":
        try:
            from bs4 import BeautifulSoup as _  # noqa: F401
        except ImportError:
            logger.error(
                "Failed to import bs4, make sure to install using pip install beautifulsoup4>=4.13.1"
            )
            # Re-raise the original ImportError (keeping its message and
            # traceback) instead of raising a new, empty ImportError.
            raise
        if not soup_crawler_config or soup_crawler_config.extraction_rules is None:
            raise ValueError("extraction_rules must be provided when not using Tavily")
        extraction_rules = soup_crawler_config.extraction_rules
        crawler = BeautifulSoupCrawler(
            concurrency=soup_crawler_config.concurrency,
            crawl_delay=soup_crawler_config.crawl_delay,
            timeout=soup_crawler_config.timeout,
            max_retries=soup_crawler_config.max_retries,
            retry_delay_factor=soup_crawler_config.retry_delay_factor,
            headers=soup_crawler_config.headers,
            robots_cache_ttl=soup_crawler_config.robots_cache_ttl,
        )
        try:
            results = await crawler.fetch_with_bs4(
                urls,
                extraction_rules,
                use_playwright=soup_crawler_config.use_playwright,
                playwright_js_wait=soup_crawler_config.playwright_js_wait,
                join_all_matches=soup_crawler_config.join_all_matches,
            )
            return results
        except Exception as e:
            logger.error(f"Error fetching page content: {str(e)}")
            raise
        finally:
            # The crawler owns an HTTP client; always release it.
            await crawler.close()
async def fetch_with_tavily(
    urls: Union[str, List[str]], tavily_config: Optional[TavilyConfig] = None
) -> Dict[str, str]:
    """Fetch content from URLs using the Tavily API.

    Args:
        urls: A single URL (str) or a list of URLs (List[str]) to scrape.
        tavily_config: Optional Tavily settings (API key, proxies, extract depth,
            timeout). When omitted, the client is built with api_key=None and the
            defaults "basic" depth and a 10 second timeout are used.

    Returns:
        Dict[str, str]: A dictionary mapping each successfully fetched URL to its
        raw content as a string; failed URLs are logged and omitted.

    Raises:
        ImportError: If tavily-python is not installed.
        Exception: If the Tavily API request fails.
    """
    try:
        from tavily import AsyncTavilyClient
    except ImportError:
        logger.error(
            "Failed to import tavily, make sure to install using pip install tavily-python>=0.7.0"
        )
        raise
    client = AsyncTavilyClient(
        api_key=tavily_config.api_key if tavily_config else None,
        proxies=tavily_config.proxies if tavily_config else None,
    )
    results = await client.extract(
        urls,
        format="text",
        extract_depth=tavily_config.extract_depth if tavily_config else "basic",
        timeout=tavily_config.timeout if tavily_config else 10,
    )
    # Best-effort: failures are logged, and whatever succeeded is returned.
    for failed_result in results.get("failed_results", []):
        logger.warning(f"Failed to fetch {failed_result}")
    return_results = {}
    for result in results.get("results", []):
        return_results[result["url"]] = result["raw_content"]
    return return_results

View file

@ -0,0 +1,396 @@
"""Web scraping tasks for storing scraped data in a graph database.
This module provides functions to scrape web content, create or update WebPage, WebSite,
and ScrapingJob data points, and store them in a Kuzu graph database. It supports
scheduled scraping tasks and ensures that node updates preserve existing graph edges.
"""
import os
import hashlib
from datetime import datetime
from typing import Union, List
from urllib.parse import urlparse
from uuid import uuid5, NAMESPACE_OID
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.shared.logging_utils import get_logger
from cognee.tasks.storage.index_data_points import index_data_points
from cognee.tasks.storage.index_graph_edges import index_graph_edges
from cognee.modules.engine.operations.setup import setup
from .models import WebPage, WebSite, ScrapingJob
from .config import SoupCrawlerConfig, TavilyConfig
from .utils import fetch_page_content
try:
from apscheduler.triggers.cron import CronTrigger
from apscheduler.schedulers.asyncio import AsyncIOScheduler
except ImportError:
raise ImportError("Please install apscheduler by pip install APScheduler>=3.10")
logger = get_logger(__name__)
_scheduler = None
def get_scheduler():
    """Return the process-wide AsyncIOScheduler, creating it lazily on first use."""
    global _scheduler
    if _scheduler is None:
        _scheduler = AsyncIOScheduler()
    return _scheduler
async def cron_web_scraper_task(
    url: Union[str, List[str]],
    *,
    schedule: str = None,
    extraction_rules: dict = None,
    tavily_api_key: str = None,
    soup_crawler_config: SoupCrawlerConfig = None,
    tavily_config: TavilyConfig = None,
    job_name: str = "scraping",
):
    """Schedule or run a web scraping task.

    This function schedules a recurring web scraping task using APScheduler or runs it
    immediately if no schedule is provided. It delegates to web_scraper_task for actual
    scraping and graph storage.

    Args:
        url: A single URL or list of URLs to scrape.
        schedule: A cron expression for scheduling (e.g., "0 0 * * *"). If None, runs immediately.
        extraction_rules: Dictionary of extraction rules for BeautifulSoup (e.g., CSS selectors).
        tavily_api_key: API key for Tavily. Defaults to the TAVILY_API_KEY environment
            variable, read at call time.
        soup_crawler_config: Configuration for BeautifulSoup crawler.
        tavily_config: Configuration for Tavily API.
        job_name: Name of the scraping job. Defaults to "scraping".

    Returns:
        Any: The result of web_scraper_task if run immediately, or None if scheduled.

    Raises:
        ValueError: If the schedule is an invalid cron expression.
        ImportError: If APScheduler is not installed.
    """
    # Resolve the API key at call time rather than via a def-time default:
    # `tavily_api_key=os.getenv(...)` in the signature would be frozen at
    # import, ignoring any TAVILY_API_KEY exported afterwards.
    if tavily_api_key is None:
        tavily_api_key = os.getenv("TAVILY_API_KEY")
    now = datetime.now()
    job_name = job_name or f"scrape_{now.strftime('%Y%m%d_%H%M%S')}"
    if schedule:
        try:
            trigger = CronTrigger.from_crontab(schedule)
        except ValueError as e:
            raise ValueError(f"Invalid cron string '{schedule}': {e}")
        scheduler = get_scheduler()
        # replace_existing lets a re-registered job_name overwrite its old schedule.
        scheduler.add_job(
            web_scraper_task,
            kwargs={
                "url": url,
                "schedule": schedule,
                "extraction_rules": extraction_rules,
                "tavily_api_key": tavily_api_key,
                "soup_crawler_config": soup_crawler_config,
                "tavily_config": tavily_config,
                "job_name": job_name,
            },
            trigger=trigger,
            id=job_name,
            name=job_name or f"WebScraper_{uuid5(NAMESPACE_OID, name=job_name)}",
            replace_existing=True,
        )
        if not scheduler.running:
            scheduler.start()
        return
    # If no schedule, run immediately
    logger.info(f"[{datetime.now()}] Running web scraper task immediately...")
    return await web_scraper_task(
        url=url,
        schedule=schedule,
        extraction_rules=extraction_rules,
        tavily_api_key=tavily_api_key,
        soup_crawler_config=soup_crawler_config,
        tavily_config=tavily_config,
        job_name=job_name,
    )
async def web_scraper_task(
    url: Union[str, List[str]],
    *,
    schedule: str = None,
    extraction_rules: dict = None,
    tavily_api_key: str = os.getenv("TAVILY_API_KEY"),
    soup_crawler_config: SoupCrawlerConfig = None,
    tavily_config: TavilyConfig = None,
    job_name: str = None,
):
    """Scrape URLs and store data points in a Graph database.

    This function scrapes content from the provided URLs, creates or updates WebPage,
    WebSite, and ScrapingJob data points, and stores them in a Graph database.
    Each data point includes a description field summarizing its attributes. It creates
    'is_scraping' (ScrapingJob to WebSite) and 'is_part_of' (WebPage to WebSite)
    relationships, preserving existing edges during node updates.

    Args:
        url: A single URL or list of URLs to scrape.
        schedule: A cron expression for scheduling (e.g., "0 0 * * *"). If None, runs once.
        extraction_rules: Dictionary of extraction rules for BeautifulSoup (e.g., CSS selectors).
        tavily_api_key: API key for Tavily. Defaults to TAVILY_API_KEY environment variable.
        soup_crawler_config: Configuration for BeautifulSoup crawler.
        tavily_config: Configuration for Tavily API.
        job_name: Name of the scraping job. Defaults to a timestamp-based name.

    Returns:
        Any: The graph data returned by the graph database.

    Raises:
        TypeError: If neither tavily_config nor soup_crawler_config is provided.
        Exception: If fetching content or database operations fail.
    """
    await setup()
    graph_db = await get_graph_engine()
    if isinstance(url, str):
        url = [url]
    # Validate/derive configs and decide which scraping tool to use.
    soup_crawler_config, tavily_config, preferred_tool = check_arguments(
        tavily_api_key, extraction_rules, tavily_config, soup_crawler_config
    )
    now = datetime.now()
    job_name = job_name or f"scrape_{now.strftime('%Y%m%d_%H%M%S')}"
    status = "active"
    trigger = CronTrigger.from_crontab(schedule) if schedule else None
    next_run = trigger.get_next_fire_time(None, now) if trigger else None
    # Truthy when a node with this job's deterministic id already exists.
    scraping_job_created = await graph_db.get_node(uuid5(NAMESPACE_OID, name=job_name))
    # Create description for ScrapingJob
    scraping_job_description = (
        f"Scraping job: {job_name}\n"
        f"URLs: {', '.join(url)}\n"
        f"Status: {status}\n"
        f"Schedule: {schedule}\n"
        f"Last run: {now.strftime('%Y-%m-%d %H:%M:%S')}\n"
        f"Next run: {next_run.strftime('%Y-%m-%d %H:%M:%S') if next_run else 'Not scheduled'}"
    )
    scraping_job = ScrapingJob(
        id=uuid5(NAMESPACE_OID, name=job_name),
        name=job_name,
        urls=url,
        status=status,
        schedule=schedule,
        last_run=now,
        next_run=next_run,
        description=scraping_job_description,
    )
    if scraping_job_created:
        await graph_db.add_node(scraping_job)  # Update existing scraping job
    websites_dict = {}
    webpages = []
    # Fetch content
    results = await fetch_page_content(
        urls=url,
        preferred_tool=preferred_tool,
        tavily_config=tavily_config,
        soup_crawler_config=soup_crawler_config,
    )
    for page_url, content in results.items():
        parsed_url = urlparse(page_url)
        domain = parsed_url.netloc
        base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
        # Create or update WebSite
        if base_url not in websites_dict:
            # Create description for WebSite
            # NOTE(review): base_url is absent from websites_dict in this branch,
            # so the robots_txt lookup below is always falsy and renders 'Not set'.
            website_description = (
                f"Website: {domain}\n"
                f"Base URL: {base_url}\n"
                f"Last crawled: {now.strftime('%Y-%m-%d %H:%M:%S')}\n"
                f"Page count: 1\n"
                f"Scraping tool: {preferred_tool}\n"
                f"Robots.txt: {'Available' if websites_dict.get(base_url, {}).get('robots_txt') else 'Not set'}\n"
                f"Crawl delay: 0.5 seconds"
            )
            websites_dict[base_url] = WebSite(
                id=uuid5(NAMESPACE_OID, name=domain),
                name=domain,
                base_url=base_url,
                robots_txt=None,
                crawl_delay=0.5,
                last_crawled=now,
                page_count=1,
                scraping_config={
                    "extraction_rules": extraction_rules or {},
                    "tool": preferred_tool,
                },
                description=website_description,
            )
            # NOTE(review): nodes are also persisted in bulk via add_nodes below;
            # this early add only happens when the job node already existed.
            if scraping_job_created:
                await graph_db.add_node(websites_dict[base_url])
        else:
            websites_dict[base_url].page_count += 1
            # Update description for existing WebSite
            websites_dict[base_url].description = (
                f"Website: {domain}\n"
                f"Base URL: {base_url}\n"
                f"Last crawled: {now.strftime('%Y-%m-%d %H:%M:%S')}\n"
                f"Page count: {websites_dict[base_url].page_count}\n"
                f"Scraping tool: {preferred_tool}\n"
                f"Robots.txt: {'Available' if websites_dict[base_url].robots_txt else 'Not set'}\n"
                f"Crawl delay: {websites_dict[base_url].crawl_delay} seconds"
            )
            if scraping_job_created:
                await graph_db.add_node(websites_dict[base_url])
        # Create WebPage
        content_str = content if isinstance(content, str) else str(content)
        content_hash = hashlib.sha256(content_str.encode("utf-8")).hexdigest()
        # Truncate long content for the human-readable description only.
        content_preview = content_str[:500] + ("..." if len(content_str) > 500 else "")
        # Create description for WebPage
        webpage_description = (
            f"Webpage: {parsed_url.path.lstrip('/') or 'Home'}\n"
            f"URL: {page_url}\n"
            f"Scraped at: {now.strftime('%Y-%m-%d %H:%M:%S')}\n"
            f"Content: {content_preview}\n"
            f"Content type: text\n"
            f"Page size: {len(content_str)} bytes\n"
            f"Status code: 200"
        )
        page_extraction_rules = extraction_rules
        webpage = WebPage(
            id=uuid5(NAMESPACE_OID, name=page_url),
            name=page_url,
            content=content_str,
            content_hash=content_hash,
            scraped_at=now,
            last_modified=None,
            status_code=200,
            content_type="text/html",
            page_size=len(content_str),
            extraction_rules=page_extraction_rules or {},
            description=webpage_description,
        )
        webpages.append(webpage)
    scraping_job.status = "completed" if webpages else "failed"
    # Update ScrapingJob description with final status
    scraping_job.description = (
        f"Scraping job: {job_name}\n"
        f"URLs: {', '.join(url)}\n"
        f"Status: {scraping_job.status}\n"
        f"Last run: {now.strftime('%Y-%m-%d %H:%M:%S')}\n"
        f"Next run: {next_run.strftime('%Y-%m-%d %H:%M:%S') if next_run else 'Not scheduled'}"
    )
    websites = list(websites_dict.values())
    # Adding Nodes and Edges
    node_mapping = {scraping_job.id: scraping_job}
    edge_mapping = []
    # ScrapingJob -[is_scraping]-> WebSite
    for website in websites:
        node_mapping[website.id] = website
        edge_mapping.append(
            (
                scraping_job.id,
                website.id,
                "is_scraping",
                {
                    "source_node_id": scraping_job.id,
                    "target_node_id": website.id,
                    "relationship_name": "is_scraping",
                },
            )
        )
    # WebPage -[is_part_of]-> WebSite (matched by the page's base URL)
    for webpage in webpages:
        node_mapping[webpage.id] = webpage
        parsed_url = urlparse(webpage.name)
        base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
        edge_mapping.append(
            (
                webpage.id,
                websites_dict[base_url].id,
                "is_part_of",
                {
                    "source_node_id": webpage.id,
                    "target_node_id": websites_dict[base_url].id,
                    "relationship_name": "is_part_of",
                },
            )
        )
    await graph_db.add_nodes(list(node_mapping.values()))
    await graph_db.add_edges(edge_mapping)
    # Index the new nodes and edges for search.
    await index_data_points(list(node_mapping.values()))
    await index_graph_edges()
    return await graph_db.get_graph_data()
def check_arguments(tavily_api_key, extraction_rules, tavily_config, soup_crawler_config):
    """Validate and configure arguments for web_scraper_task.

    Args:
        tavily_api_key: API key for Tavily.
        extraction_rules: Extraction rules for BeautifulSoup.
        tavily_config: Configuration for Tavily API.
        soup_crawler_config: Configuration for BeautifulSoup crawler.

    Returns:
        Tuple[SoupCrawlerConfig, TavilyConfig, str]: Configured soup_crawler_config,
        tavily_config, and preferred_tool ("tavily" or "beautifulsoup").

    Raises:
        TypeError: If neither tavily_config nor soup_crawler_config is provided.
    """
    # Bare extraction rules are promoted into a full soup config.
    if extraction_rules and not soup_crawler_config:
        soup_crawler_config = SoupCrawlerConfig(extraction_rules=extraction_rules)
    preferred_tool = "beautifulsoup"
    if tavily_api_key:
        # An explicit key always wins over whatever the config carries.
        if tavily_config:
            tavily_config.api_key = tavily_api_key
        else:
            tavily_config = TavilyConfig(api_key=tavily_api_key)
        # Only fall back to Tavily when no BeautifulSoup setup exists at all.
        if not (extraction_rules or soup_crawler_config):
            preferred_tool = "tavily"
    if not (tavily_config or soup_crawler_config):
        raise TypeError("Make sure you pass arguments for web_scraper_task")
    return soup_crawler_config, tavily_config, preferred_tool
def get_path_after_base(base_url: str, url: str) -> str:
    """Return *url*'s path relative to *base_url*, without leading slashes.

    Args:
        base_url: The base URL (e.g., "https://example.com").
        url: The full URL to extract the path from.

    Returns:
        str: The path after the base URL, with leading slashes removed.

    Raises:
        ValueError: If the base URL and target URL are from different domains.
    """
    base_parts = urlparse(base_url)
    url_parts = urlparse(url)
    # Both URLs must live on the same host.
    if base_parts.netloc != url_parts.netloc:
        raise ValueError("Base URL and target URL are from different domains")
    prefix = base_parts.path.rstrip("/")
    path = url_parts.path
    # Drop the base path prefix when present; otherwise keep the whole path.
    remainder = path[len(prefix):] if path.startswith(prefix) else path
    return remainder.lstrip("/")

View file

@ -53,7 +53,6 @@ class TestCliConfig:
expected_types = [
"GRAPH_COMPLETION",
"RAG_COMPLETION",
"INSIGHTS",
"CHUNKS",
"SUMMARIES",
"CODE",

View file

@ -0,0 +1,172 @@
import asyncio
import cognee
from cognee.tasks.web_scraper.config import SoupCrawlerConfig
from cognee.tasks.web_scraper import cron_web_scraper_task
async def test_web_scraping_using_bs4():
    """End-to-end: scrape quotes.toscrape.com with BeautifulSoup rules,
    cognify, and answer a quote-attribution question from the graph."""
    await cognee.prune.prune_data()
    # NOTE(review): unlike the other tests, metadata=True is not passed here —
    # confirm this is intentional.
    await cognee.prune.prune_system()
    url = "https://quotes.toscrape.com/"
    # Collect every quote text and author on the listing page.
    rules = {
        "quotes": {"selector": ".quote span.text", "all": True},
        "authors": {"selector": ".quote small", "all": True},
    }
    soup_config = SoupCrawlerConfig(
        concurrency=5,
        crawl_delay=0.5,
        timeout=15.0,
        max_retries=2,
        retry_delay_factor=0.5,
        extraction_rules=rules,
        use_playwright=False,
    )
    await cognee.add(
        data=url,
        soup_crawler_config=soup_config,
        incremental_loading=False,
    )
    await cognee.cognify()
    results = await cognee.search(
        "Who said 'The world as we have created it is a process of our thinking. It cannot be changed without changing our thinking'?",
        query_type=cognee.SearchType.GRAPH_COMPLETION,
    )
    assert "Albert Einstein" in results[0]
    print("Test passed! Found Albert Einstein in scraped data.")
async def test_web_scraping_using_bs4_and_incremental_loading():
    """End-to-end: scrape books.toscrape.com incrementally with BeautifulSoup
    and verify a book price can be answered from the resulting graph."""
    await cognee.prune.prune_data()
    await cognee.prune.prune_system(metadata=True)
    url = "https://books.toscrape.com/"
    rules = {"titles": "article.product_pod h3 a", "prices": "article.product_pod p.price_color"}
    # NOTE: SoupCrawlerConfig declares no `structured` field, so the previous
    # `structured=True` argument was silently ignored; it has been removed.
    soup_config = SoupCrawlerConfig(
        concurrency=1,
        crawl_delay=0.1,
        timeout=10.0,
        max_retries=1,
        retry_delay_factor=0.5,
        extraction_rules=rules,
        use_playwright=False,
    )
    await cognee.add(
        data=url,
        soup_crawler_config=soup_config,
        incremental_loading=True,
    )
    await cognee.cognify()
    results = await cognee.search(
        "What is the price of 'A Light in the Attic' book?",
        query_type=cognee.SearchType.GRAPH_COMPLETION,
    )
    assert "51.77" in results[0]
    print("Test passed! Found 'A Light in the Attic' in scraped data.")
async def test_web_scraping_using_tavily():
    """End-to-end: add a URL without any crawler config (no soup config means
    the Tavily path is exercised — requires TAVILY_API_KEY), then query it."""
    await cognee.prune.prune_data()
    await cognee.prune.prune_system(metadata=True)
    url = "https://quotes.toscrape.com/"
    await cognee.add(
        data=url,
        incremental_loading=False,
    )
    await cognee.cognify()
    results = await cognee.search(
        "Who said 'The world as we have created it is a process of our thinking. It cannot be changed without changing our thinking'?",
        query_type=cognee.SearchType.GRAPH_COMPLETION,
    )
    assert "Albert Einstein" in results[0]
    print("Test passed! Found Albert Einstein in scraped data.")
async def test_web_scraping_using_tavily_and_incremental_loading():
    """Same as test_web_scraping_using_tavily, but with incremental loading
    enabled when adding the URL."""
    await cognee.prune.prune_data()
    await cognee.prune.prune_system(metadata=True)
    url = "https://quotes.toscrape.com/"
    await cognee.add(
        data=url,
        incremental_loading=True,
    )
    await cognee.cognify()
    results = await cognee.search(
        "Who said 'The world as we have created it is a process of our thinking. It cannot be changed without changing our thinking'?",
        query_type=cognee.SearchType.GRAPH_COMPLETION,
    )
    assert "Albert Einstein" in results[0]
    print("Test passed! Found Albert Einstein in scraped data.")
# ---------- cron job tests ----------
async def test_cron_web_scraper():
    """Run cron_web_scraper_task without a schedule (executes immediately),
    scraping two sites at once, then query both from the graph."""
    await cognee.prune.prune_data()
    await cognee.prune.prune_system(metadata=True)
    urls = ["https://quotes.toscrape.com/", "https://books.toscrape.com/"]
    # One rule set covering both sites; rules that don't match a page extract nothing.
    extraction_rules = {
        "quotes": ".quote .text",
        "authors": ".quote .author",
        "titles": "article.product_pod h3 a",
        "prices": "article.product_pod p.price_color",
    }
    # Run cron_web_scraper_task
    await cron_web_scraper_task(
        url=urls,
        job_name="cron_scraping_job",
        extraction_rules=extraction_rules,
    )
    results = await cognee.search(
        "Who said 'The world as we have created it is a process of our thinking. It cannot be changed without changing our thinking'?",
        query_type=cognee.SearchType.GRAPH_COMPLETION,
    )
    assert "Albert Einstein" in results[0]
    results_books = await cognee.search(
        "What is the price of 'A Light in the Attic' book?",
        query_type=cognee.SearchType.GRAPH_COMPLETION,
    )
    assert "51.77" in results_books[0]
    print("Cron job web_scraping test passed!")
async def main():
    """Run every web-scraper integration scenario sequentially."""
    scenarios = [
        ("Starting BS4 incremental loading test...", test_web_scraping_using_bs4_and_incremental_loading),
        ("Starting BS4 normal test...", test_web_scraping_using_bs4),
        ("Starting Tavily incremental loading test...", test_web_scraping_using_tavily_and_incremental_loading),
        ("Starting Tavily normal test...", test_web_scraping_using_tavily),
        ("Starting cron job test...", test_cron_web_scraper),
    ]
    for banner, scenario in scenarios:
        print(banner)
        await scenario()


if __name__ == "__main__":
    asyncio.run(main())

View file

@ -133,20 +133,16 @@ async def main():
dataset_name_1 = "natural_language"
dataset_name_2 = "quantum"
explanation_file_path = os.path.join(
explanation_file_path_nlp = os.path.join(
pathlib.Path(__file__).parent, "test_data/Natural_language_processing.txt"
)
await cognee.add([explanation_file_path], dataset_name_1)
await cognee.add([explanation_file_path_nlp], dataset_name_1)
text = """A quantum computer is a computer that takes advantage of quantum mechanical phenomena.
At small scales, physical matter exhibits properties of both particles and waves, and quantum computing leverages this behavior, specifically quantum superposition and entanglement, using specialized hardware that supports the preparation and manipulation of quantum states.
Classical physics cannot explain the operation of these quantum devices, and a scalable quantum computer could perform some calculations exponentially faster (with respect to input size scaling) than any modern "classical" computer. In particular, a large-scale quantum computer could break widely used encryption schemes and aid physicists in performing physical simulations; however, the current state of the technology is largely experimental and impractical, with several obstacles to useful applications. Moreover, scalable quantum computers do not hold promise for many practical tasks, and for many important tasks quantum speedups are proven impossible.
The basic unit of information in quantum computing is the qubit, similar to the bit in traditional digital electronics. Unlike a classical bit, a qubit can exist in a superposition of its two "basis" states. When measuring a qubit, the result is a probabilistic output of a classical bit, therefore making quantum computers nondeterministic in general. If a quantum computer manipulates the qubit in a particular way, wave interference effects can amplify the desired measurement results. The design of quantum algorithms involves creating procedures that allow a quantum computer to perform calculations efficiently and quickly.
Physically engineering high-quality qubits has proven challenging. If a physical qubit is not sufficiently isolated from its environment, it suffers from quantum decoherence, introducing noise into calculations. Paradoxically, perfectly isolating qubits is also undesirable because quantum computations typically need to initialize qubits, perform controlled qubit interactions, and measure the resulting quantum states. Each of those operations introduces errors and suffers from noise, and such inaccuracies accumulate.
In principle, a non-quantum (classical) computer can solve the same computational problems as a quantum computer, given enough time. Quantum advantage comes in the form of time complexity rather than computability, and quantum complexity theory shows that some quantum algorithms for carefully selected tasks require exponentially fewer computational steps than the best known non-quantum algorithms. Such tasks can in theory be solved on a large-scale quantum computer whereas classical computers would not finish computations in any reasonable amount of time. However, quantum speedup is not universal or even typical across computational tasks, since basic tasks such as sorting are proven to not allow any asymptotic quantum speedup. Claims of quantum supremacy have drawn significant attention to the discipline, but are demonstrated on contrived tasks, while near-term practical use cases remain limited.
"""
explanation_file_path_quantum = os.path.join(
pathlib.Path(__file__).parent, "test_data/Quantum_computers.txt"
)
await cognee.add([text], dataset_name_2)
await cognee.add([explanation_file_path_quantum], dataset_name_2)
await cognee.cognify([dataset_name_2, dataset_name_1])
@ -159,7 +155,7 @@ async def main():
random_node_name = random_node.payload["text"]
search_results = await cognee.search(
query_type=SearchType.INSIGHTS, query_text=random_node_name
query_type=SearchType.GRAPH_COMPLETION, query_text=random_node_name
)
assert len(search_results) != 0, "The search results list is empty."
print("\n\nExtracted sentences are:\n")

View file

@ -38,19 +38,16 @@ async def main():
dataset_name = "cs_explanations"
explanation_file_path = os.path.join(
explanation_file_path_nlp = os.path.join(
pathlib.Path(__file__).parent, "test_data/Natural_language_processing.txt"
)
await cognee.add([explanation_file_path], dataset_name)
await cognee.add([explanation_file_path_nlp], dataset_name)
text = """A quantum computer is a computer that takes advantage of quantum mechanical phenomena.
At small scales, physical matter exhibits properties of both particles and waves, and quantum computing leverages this behavior, specifically quantum superposition and entanglement, using specialized hardware that supports the preparation and manipulation of quantum states.
Classical physics cannot explain the operation of these quantum devices, and a scalable quantum computer could perform some calculations exponentially faster (with respect to input size scaling) than any modern "classical" computer. In particular, a large-scale quantum computer could break widely used encryption schemes and aid physicists in performing physical simulations; however, the current state of the technology is largely experimental and impractical, with several obstacles to useful applications. Moreover, scalable quantum computers do not hold promise for many practical tasks, and for many important tasks quantum speedups are proven impossible.
The basic unit of information in quantum computing is the qubit, similar to the bit in traditional digital electronics. Unlike a classical bit, a qubit can exist in a superposition of its two "basis" states. When measuring a qubit, the result is a probabilistic output of a classical bit, therefore making quantum computers nondeterministic in general. If a quantum computer manipulates the qubit in a particular way, wave interference effects can amplify the desired measurement results. The design of quantum algorithms involves creating procedures that allow a quantum computer to perform calculations efficiently and quickly.
Physically engineering high-quality qubits has proven challenging. If a physical qubit is not sufficiently isolated from its environment, it suffers from quantum decoherence, introducing noise into calculations. Paradoxically, perfectly isolating qubits is also undesirable because quantum computations typically need to initialize qubits, perform controlled qubit interactions, and measure the resulting quantum states. Each of those operations introduces errors and suffers from noise, and such inaccuracies accumulate.
In principle, a non-quantum (classical) computer can solve the same computational problems as a quantum computer, given enough time. Quantum advantage comes in the form of time complexity rather than computability, and quantum complexity theory shows that some quantum algorithms for carefully selected tasks require exponentially fewer computational steps than the best known non-quantum algorithms. Such tasks can in theory be solved on a large-scale quantum computer whereas classical computers would not finish computations in any reasonable amount of time. However, quantum speedup is not universal or even typical across computational tasks, since basic tasks such as sorting are proven to not allow any asymptotic quantum speedup. Claims of quantum supremacy have drawn significant attention to the discipline, but are demonstrated on contrived tasks, while near-term practical use cases remain limited.
"""
await cognee.add([text], dataset_name)
explanation_file_path_quantum = os.path.join(
pathlib.Path(__file__).parent, "test_data/Quantum_computers.txt"
)
await cognee.add([explanation_file_path_quantum], dataset_name)
await cognee.cognify([dataset_name])
@ -61,7 +58,7 @@ async def main():
random_node_name = random_node.payload["text"]
search_results = await cognee.search(
query_type=SearchType.INSIGHTS, query_text=random_node_name
query_type=SearchType.GRAPH_COMPLETION, query_text=random_node_name
)
assert len(search_results) != 0, "The search results list is empty."
print("\n\nExtracted sentences are:\n")

View file

@ -131,20 +131,16 @@ async def main():
dataset_name_1 = "natural_language"
dataset_name_2 = "quantum"
explanation_file_path = os.path.join(
explanation_file_path_nlp = os.path.join(
pathlib.Path(__file__).parent, "test_data/Natural_language_processing.txt"
)
await cognee.add([explanation_file_path], dataset_name_1)
await cognee.add([explanation_file_path_nlp], dataset_name_1)
text = """A quantum computer is a computer that takes advantage of quantum mechanical phenomena.
At small scales, physical matter exhibits properties of both particles and waves, and quantum computing leverages this behavior, specifically quantum superposition and entanglement, using specialized hardware that supports the preparation and manipulation of quantum states.
Classical physics cannot explain the operation of these quantum devices, and a scalable quantum computer could perform some calculations exponentially faster (with respect to input size scaling) than any modern "classical" computer. In particular, a large-scale quantum computer could break widely used encryption schemes and aid physicists in performing physical simulations; however, the current state of the technology is largely experimental and impractical, with several obstacles to useful applications. Moreover, scalable quantum computers do not hold promise for many practical tasks, and for many important tasks quantum speedups are proven impossible.
The basic unit of information in quantum computing is the qubit, similar to the bit in traditional digital electronics. Unlike a classical bit, a qubit can exist in a superposition of its two "basis" states. When measuring a qubit, the result is a probabilistic output of a classical bit, therefore making quantum computers nondeterministic in general. If a quantum computer manipulates the qubit in a particular way, wave interference effects can amplify the desired measurement results. The design of quantum algorithms involves creating procedures that allow a quantum computer to perform calculations efficiently and quickly.
Physically engineering high-quality qubits has proven challenging. If a physical qubit is not sufficiently isolated from its environment, it suffers from quantum decoherence, introducing noise into calculations. Paradoxically, perfectly isolating qubits is also undesirable because quantum computations typically need to initialize qubits, perform controlled qubit interactions, and measure the resulting quantum states. Each of those operations introduces errors and suffers from noise, and such inaccuracies accumulate.
In principle, a non-quantum (classical) computer can solve the same computational problems as a quantum computer, given enough time. Quantum advantage comes in the form of time complexity rather than computability, and quantum complexity theory shows that some quantum algorithms for carefully selected tasks require exponentially fewer computational steps than the best known non-quantum algorithms. Such tasks can in theory be solved on a large-scale quantum computer whereas classical computers would not finish computations in any reasonable amount of time. However, quantum speedup is not universal or even typical across computational tasks, since basic tasks such as sorting are proven to not allow any asymptotic quantum speedup. Claims of quantum supremacy have drawn significant attention to the discipline, but are demonstrated on contrived tasks, while near-term practical use cases remain limited.
"""
explanation_file_path_quantum = os.path.join(
pathlib.Path(__file__).parent, "test_data/Quantum_computers.txt"
)
await cognee.add([text], dataset_name_2)
await cognee.add([explanation_file_path_quantum], dataset_name_2)
await cognee.cognify([dataset_name_2, dataset_name_1])
@ -157,7 +153,7 @@ async def main():
random_node_name = random_node.payload["text"]
search_results = await cognee.search(
query_type=SearchType.INSIGHTS, query_text=random_node_name
query_type=SearchType.GRAPH_COMPLETION, query_text=random_node_name
)
assert len(search_results) != 0, "The search results list is empty."
print("\n\nExtracted sentences are:\n")

View file

@ -6,6 +6,7 @@ from cognee.modules.search.operations import get_history
from cognee.modules.users.methods import get_default_user
from cognee.shared.logging_utils import get_logger
from cognee.modules.search.types import SearchType
from cognee import update
logger = get_logger()
@ -42,7 +43,7 @@ async def main():
await cognee.add([text], dataset_name)
await cognee.cognify([dataset_name])
cognify_run_info = await cognee.cognify([dataset_name])
from cognee.infrastructure.databases.vector import get_vector_engine
@ -51,7 +52,7 @@ async def main():
random_node_name = random_node.payload["text"]
search_results = await cognee.search(
query_type=SearchType.INSIGHTS, query_text=random_node_name
query_type=SearchType.GRAPH_COMPLETION, query_text=random_node_name
)
assert len(search_results) != 0, "The search results list is empty."
print("\n\nExtracted sentences are:\n")
@ -77,6 +78,35 @@ async def main():
assert len(history) == 6, "Search history is not correct."
# Test updating of documents
# Get Pipeline Run object
pipeline_run_obj = list(cognify_run_info.values())[0]
for data_item in pipeline_run_obj.data_ingestion_info:
# Update all documents in dataset to only contain Mark and Cindy information
await update(
dataset_id=pipeline_run_obj.dataset_id,
data_id=data_item["data_id"],
data="Mark met with Cindy at a cafe.",
)
search_results = await cognee.search(
query_type=SearchType.GRAPH_COMPLETION, query_text="What information do you contain?"
)
assert "Mark" in search_results[0], (
"Failed to update document, no mention of Mark in search results"
)
assert "Cindy" in search_results[0], (
"Failed to update document, no mention of Cindy in search results"
)
assert "Artificial intelligence" not in search_results[0], (
"Failed to update document, Artificial intelligence still mentioned in search results"
)
# Test visualization
from cognee import visualize_graph
await visualize_graph()
# Assert local data files are cleaned properly
await cognee.prune.prune_data()
data_root_directory = get_storage_config()["data_root_directory"]

View file

@ -32,20 +32,16 @@ async def main():
dataset_name = "cs_explanations"
explanation_file_path = os.path.join(
explanation_file_path_nlp = os.path.join(
pathlib.Path(__file__).parent, "test_data/Natural_language_processing.txt"
)
await cognee.add([explanation_file_path], dataset_name)
await cognee.add([explanation_file_path_nlp], dataset_name)
text = """A quantum computer is a computer that takes advantage of quantum mechanical phenomena.
At small scales, physical matter exhibits properties of both particles and waves, and quantum computing leverages this behavior, specifically quantum superposition and entanglement, using specialized hardware that supports the preparation and manipulation of quantum states.
Classical physics cannot explain the operation of these quantum devices, and a scalable quantum computer could perform some calculations exponentially faster (with respect to input size scaling) than any modern "classical" computer. In particular, a large-scale quantum computer could break widely used encryption schemes and aid physicists in performing physical simulations; however, the current state of the technology is largely experimental and impractical, with several obstacles to useful applications. Moreover, scalable quantum computers do not hold promise for many practical tasks, and for many important tasks quantum speedups are proven impossible.
The basic unit of information in quantum computing is the qubit, similar to the bit in traditional digital electronics. Unlike a classical bit, a qubit can exist in a superposition of its two "basis" states. When measuring a qubit, the result is a probabilistic output of a classical bit, therefore making quantum computers nondeterministic in general. If a quantum computer manipulates the qubit in a particular way, wave interference effects can amplify the desired measurement results. The design of quantum algorithms involves creating procedures that allow a quantum computer to perform calculations efficiently and quickly.
Physically engineering high-quality qubits has proven challenging. If a physical qubit is not sufficiently isolated from its environment, it suffers from quantum decoherence, introducing noise into calculations. Paradoxically, perfectly isolating qubits is also undesirable because quantum computations typically need to initialize qubits, perform controlled qubit interactions, and measure the resulting quantum states. Each of those operations introduces errors and suffers from noise, and such inaccuracies accumulate.
In principle, a non-quantum (classical) computer can solve the same computational problems as a quantum computer, given enough time. Quantum advantage comes in the form of time complexity rather than computability, and quantum complexity theory shows that some quantum algorithms for carefully selected tasks require exponentially fewer computational steps than the best known non-quantum algorithms. Such tasks can in theory be solved on a large-scale quantum computer whereas classical computers would not finish computations in any reasonable amount of time. However, quantum speedup is not universal or even typical across computational tasks, since basic tasks such as sorting are proven to not allow any asymptotic quantum speedup. Claims of quantum supremacy have drawn significant attention to the discipline, but are demonstrated on contrived tasks, while near-term practical use cases remain limited.
"""
explanation_file_path_quantum = os.path.join(
pathlib.Path(__file__).parent, "test_data/Quantum_computers.txt"
)
await cognee.add([text], dataset_name)
await cognee.add([explanation_file_path_quantum], dataset_name)
await cognee.cognify([dataset_name])
@ -56,7 +52,7 @@ async def main():
random_node_name = random_node.payload["text"]
search_results = await cognee.search(
query_type=SearchType.INSIGHTS, query_text=random_node_name
query_type=SearchType.GRAPH_COMPLETION, query_text=random_node_name
)
assert len(search_results) != 0, "The search results list is empty."
print("\n\nExtracted sentences are:\n")

View file

@ -32,20 +32,16 @@ async def main():
dataset_name = "cs_explanations"
explanation_file_path = os.path.join(
explanation_file_path_nlp = os.path.join(
pathlib.Path(__file__).parent, "test_data/Natural_language_processing.txt"
)
await cognee.add([explanation_file_path], dataset_name)
await cognee.add([explanation_file_path_nlp], dataset_name)
text = """A quantum computer is a computer that takes advantage of quantum mechanical phenomena.
At small scales, physical matter exhibits properties of both particles and waves, and quantum computing leverages this behavior, specifically quantum superposition and entanglement, using specialized hardware that supports the preparation and manipulation of quantum states.
Classical physics cannot explain the operation of these quantum devices, and a scalable quantum computer could perform some calculations exponentially faster (with respect to input size scaling) than any modern "classical" computer. In particular, a large-scale quantum computer could break widely used encryption schemes and aid physicists in performing physical simulations; however, the current state of the technology is largely experimental and impractical, with several obstacles to useful applications. Moreover, scalable quantum computers do not hold promise for many practical tasks, and for many important tasks quantum speedups are proven impossible.
The basic unit of information in quantum computing is the qubit, similar to the bit in traditional digital electronics. Unlike a classical bit, a qubit can exist in a superposition of its two "basis" states. When measuring a qubit, the result is a probabilistic output of a classical bit, therefore making quantum computers nondeterministic in general. If a quantum computer manipulates the qubit in a particular way, wave interference effects can amplify the desired measurement results. The design of quantum algorithms involves creating procedures that allow a quantum computer to perform calculations efficiently and quickly.
Physically engineering high-quality qubits has proven challenging. If a physical qubit is not sufficiently isolated from its environment, it suffers from quantum decoherence, introducing noise into calculations. Paradoxically, perfectly isolating qubits is also undesirable because quantum computations typically need to initialize qubits, perform controlled qubit interactions, and measure the resulting quantum states. Each of those operations introduces errors and suffers from noise, and such inaccuracies accumulate.
In principle, a non-quantum (classical) computer can solve the same computational problems as a quantum computer, given enough time. Quantum advantage comes in the form of time complexity rather than computability, and quantum complexity theory shows that some quantum algorithms for carefully selected tasks require exponentially fewer computational steps than the best known non-quantum algorithms. Such tasks can in theory be solved on a large-scale quantum computer whereas classical computers would not finish computations in any reasonable amount of time. However, quantum speedup is not universal or even typical across computational tasks, since basic tasks such as sorting are proven to not allow any asymptotic quantum speedup. Claims of quantum supremacy have drawn significant attention to the discipline, but are demonstrated on contrived tasks, while near-term practical use cases remain limited.
"""
explanation_file_path_quantum = os.path.join(
pathlib.Path(__file__).parent, "test_data/Quantum_computers.txt"
)
await cognee.add([text], dataset_name)
await cognee.add([explanation_file_path_quantum], dataset_name)
await cognee.cognify([dataset_name])
@ -56,7 +52,7 @@ async def main():
random_node_name = random_node.payload["text"]
search_results = await cognee.search(
query_type=SearchType.INSIGHTS, query_text=random_node_name
query_type=SearchType.GRAPH_COMPLETION, query_text=random_node_name
)
assert len(search_results) != 0, "The search results list is empty."
print("\n\nExtracted sentences are:\n")

View file

@ -38,20 +38,16 @@ async def main():
dataset_name = "cs_explanations"
explanation_file_path = os.path.join(
explanation_file_path_nlp = os.path.join(
pathlib.Path(__file__).parent, "test_data/Natural_language_processing.txt"
)
await cognee.add([explanation_file_path], dataset_name)
await cognee.add([explanation_file_path_nlp], dataset_name)
text = """A quantum computer is a computer that takes advantage of quantum mechanical phenomena.
At small scales, physical matter exhibits properties of both particles and waves, and quantum computing leverages this behavior, specifically quantum superposition and entanglement, using specialized hardware that supports the preparation and manipulation of quantum states.
Classical physics cannot explain the operation of these quantum devices, and a scalable quantum computer could perform some calculations exponentially faster (with respect to input size scaling) than any modern "classical" computer. In particular, a large-scale quantum computer could break widely used encryption schemes and aid physicists in performing physical simulations; however, the current state of the technology is largely experimental and impractical, with several obstacles to useful applications. Moreover, scalable quantum computers do not hold promise for many practical tasks, and for many important tasks quantum speedups are proven impossible.
The basic unit of information in quantum computing is the qubit, similar to the bit in traditional digital electronics. Unlike a classical bit, a qubit can exist in a superposition of its two "basis" states. When measuring a qubit, the result is a probabilistic output of a classical bit, therefore making quantum computers nondeterministic in general. If a quantum computer manipulates the qubit in a particular way, wave interference effects can amplify the desired measurement results. The design of quantum algorithms involves creating procedures that allow a quantum computer to perform calculations efficiently and quickly.
Physically engineering high-quality qubits has proven challenging. If a physical qubit is not sufficiently isolated from its environment, it suffers from quantum decoherence, introducing noise into calculations. Paradoxically, perfectly isolating qubits is also undesirable because quantum computations typically need to initialize qubits, perform controlled qubit interactions, and measure the resulting quantum states. Each of those operations introduces errors and suffers from noise, and such inaccuracies accumulate.
In principle, a non-quantum (classical) computer can solve the same computational problems as a quantum computer, given enough time. Quantum advantage comes in the form of time complexity rather than computability, and quantum complexity theory shows that some quantum algorithms for carefully selected tasks require exponentially fewer computational steps than the best known non-quantum algorithms. Such tasks can in theory be solved on a large-scale quantum computer whereas classical computers would not finish computations in any reasonable amount of time. However, quantum speedup is not universal or even typical across computational tasks, since basic tasks such as sorting are proven to not allow any asymptotic quantum speedup. Claims of quantum supremacy have drawn significant attention to the discipline, but are demonstrated on contrived tasks, while near-term practical use cases remain limited.
"""
explanation_file_path_quantum = os.path.join(
pathlib.Path(__file__).parent, "test_data/Quantum_computers.txt"
)
await cognee.add([text], dataset_name)
await cognee.add([explanation_file_path_quantum], dataset_name)
await cognee.cognify([dataset_name])
@ -60,7 +56,7 @@ async def main():
random_node_name = random_node.payload["text"]
search_results = await cognee.search(
query_type=SearchType.INSIGHTS, query_text=random_node_name
query_type=SearchType.GRAPH_COMPLETION, query_text=random_node_name
)
assert len(search_results) != 0, "The search results list is empty."
print("\n\nExtracted sentences are:\n")

View file

@ -34,25 +34,21 @@ async def main():
await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
explanation_file_path = os.path.join(
explanation_file_path_nlp = os.path.join(
pathlib.Path(__file__).parent, "test_data/Natural_language_processing.txt"
)
# Add document for default user
await cognee.add([explanation_file_path], dataset_name="NLP")
await cognee.add([explanation_file_path_nlp], dataset_name="NLP")
default_user = await get_default_user()
text = """A quantum computer is a computer that takes advantage of quantum mechanical phenomena.
At small scales, physical matter exhibits properties of both particles and waves, and quantum computing leverages this behavior, specifically quantum superposition and entanglement, using specialized hardware that supports the preparation and manipulation of quantum states.
Classical physics cannot explain the operation of these quantum devices, and a scalable quantum computer could perform some calculations exponentially faster (with respect to input size scaling) than any modern "classical" computer. In particular, a large-scale quantum computer could break widely used encryption schemes and aid physicists in performing physical simulations; however, the current state of the technology is largely experimental and impractical, with several obstacles to useful applications. Moreover, scalable quantum computers do not hold promise for many practical tasks, and for many important tasks quantum speedups are proven impossible.
The basic unit of information in quantum computing is the qubit, similar to the bit in traditional digital electronics. Unlike a classical bit, a qubit can exist in a superposition of its two "basis" states. When measuring a qubit, the result is a probabilistic output of a classical bit, therefore making quantum computers nondeterministic in general. If a quantum computer manipulates the qubit in a particular way, wave interference effects can amplify the desired measurement results. The design of quantum algorithms involves creating procedures that allow a quantum computer to perform calculations efficiently and quickly.
Physically engineering high-quality qubits has proven challenging. If a physical qubit is not sufficiently isolated from its environment, it suffers from quantum decoherence, introducing noise into calculations. Paradoxically, perfectly isolating qubits is also undesirable because quantum computations typically need to initialize qubits, perform controlled qubit interactions, and measure the resulting quantum states. Each of those operations introduces errors and suffers from noise, and such inaccuracies accumulate.
In principle, a non-quantum (classical) computer can solve the same computational problems as a quantum computer, given enough time. Quantum advantage comes in the form of time complexity rather than computability, and quantum complexity theory shows that some quantum algorithms for carefully selected tasks require exponentially fewer computational steps than the best known non-quantum algorithms. Such tasks can in theory be solved on a large-scale quantum computer whereas classical computers would not finish computations in any reasonable amount of time. However, quantum speedup is not universal or even typical across computational tasks, since basic tasks such as sorting are proven to not allow any asymptotic quantum speedup. Claims of quantum supremacy have drawn significant attention to the discipline, but are demonstrated on contrived tasks, while near-term practical use cases remain limited.
"""
explanation_file_path_quantum = os.path.join(
pathlib.Path(__file__).parent, "test_data/Quantum_computers.txt"
)
# Add document for test user
test_user = await create_user("user@example.com", "example")
await cognee.add([text], dataset_name="QUANTUM", user=test_user)
await cognee.add([explanation_file_path_quantum], dataset_name="QUANTUM", user=test_user)
nlp_cognify_result = await cognee.cognify(["NLP"], user=default_user)
quantum_cognify_result = await cognee.cognify(["QUANTUM"], user=test_user)
@ -101,7 +97,7 @@ async def main():
add_error = False
try:
await cognee.add(
[explanation_file_path],
[explanation_file_path_nlp],
dataset_name="QUANTUM",
dataset_id=test_user_dataset_id,
user=default_user,
@ -143,7 +139,7 @@ async def main():
# Add new data to test_users dataset from default_user
await cognee.add(
[explanation_file_path],
[explanation_file_path_nlp],
dataset_name="QUANTUM",
dataset_id=test_user_dataset_id,
user=default_user,
@ -216,7 +212,7 @@ async def main():
)
# Try deleting data from test_user dataset with default_user after getting delete permission
# Get the dataset data to find the ID of the remaining data item (explanation_file_path)
# Get the dataset data to find the ID of the remaining data item (explanation_file_path_nlp)
test_user_dataset_data = await get_dataset_data(test_user_dataset_id)
explanation_file_data_id = test_user_dataset_data[0].id

View file

@ -141,10 +141,10 @@ async def main():
dataset_name_1 = "natural_language"
dataset_name_2 = "quantum"
explanation_file_path = os.path.join(
explanation_file_path_nlp = os.path.join(
pathlib.Path(__file__).parent, "test_data/Natural_language_processing.txt"
)
await cognee.add([explanation_file_path], dataset_name_1)
await cognee.add([explanation_file_path_nlp], dataset_name_1)
text = """A quantum computer is a computer that takes advantage of quantum mechanical phenomena.
At small scales, physical matter exhibits properties of both particles and waves, and quantum computing leverages this behavior, specifically quantum superposition and entanglement, using specialized hardware that supports the preparation and manipulation of quantum states.
@ -167,7 +167,7 @@ async def main():
random_node_name = random_node.payload["text"]
search_results = await cognee.search(
query_type=SearchType.INSIGHTS, query_text=random_node_name
query_type=SearchType.GRAPH_COMPLETION, query_text=random_node_name
)
assert len(search_results) != 0, "The search results list is empty."
print("\n\nExtracted sentences are:\n")
@ -202,7 +202,7 @@ async def main():
history = await get_history(user.id)
assert len(history) == 8, "Search history is not correct."
await test_local_file_deletion(text, explanation_file_path)
await test_local_file_deletion(text, explanation_file_path_nlp)
await cognee.prune.prune_data()
data_root_directory = get_storage_config()["data_root_directory"]

View file

@ -42,19 +42,16 @@ async def main():
dataset_name = "cs_explanations"
explanation_file_path = os.path.join(
explanation_file_path_nlp = os.path.join(
pathlib.Path(__file__).parent, "test_data/Natural_language_processing.txt"
)
await cognee.add([explanation_file_path], dataset_name)
await cognee.add([explanation_file_path_nlp], dataset_name)
text = """A quantum computer is a computer that takes advantage of quantum mechanical phenomena.
At small scales, physical matter exhibits properties of both particles and waves, and quantum computing leverages this behavior, specifically quantum superposition and entanglement, using specialized hardware that supports the preparation and manipulation of quantum states.
Classical physics cannot explain the operation of these quantum devices, and a scalable quantum computer could perform some calculations exponentially faster (with respect to input size scaling) than any modern "classical" computer. In particular, a large-scale quantum computer could break widely used encryption schemes and aid physicists in performing physical simulations; however, the current state of the technology is largely experimental and impractical, with several obstacles to useful applications. Moreover, scalable quantum computers do not hold promise for many practical tasks, and for many important tasks quantum speedups are proven impossible.
The basic unit of information in quantum computing is the qubit, similar to the bit in traditional digital electronics. Unlike a classical bit, a qubit can exist in a superposition of its two "basis" states. When measuring a qubit, the result is a probabilistic output of a classical bit, therefore making quantum computers nondeterministic in general. If a quantum computer manipulates the qubit in a particular way, wave interference effects can amplify the desired measurement results. The design of quantum algorithms involves creating procedures that allow a quantum computer to perform calculations efficiently and quickly.
Physically engineering high-quality qubits has proven challenging. If a physical qubit is not sufficiently isolated from its environment, it suffers from quantum decoherence, introducing noise into calculations. Paradoxically, perfectly isolating qubits is also undesirable because quantum computations typically need to initialize qubits, perform controlled qubit interactions, and measure the resulting quantum states. Each of those operations introduces errors and suffers from noise, and such inaccuracies accumulate.
In principle, a non-quantum (classical) computer can solve the same computational problems as a quantum computer, given enough time. Quantum advantage comes in the form of time complexity rather than computability, and quantum complexity theory shows that some quantum algorithms for carefully selected tasks require exponentially fewer computational steps than the best known non-quantum algorithms. Such tasks can in theory be solved on a large-scale quantum computer whereas classical computers would not finish computations in any reasonable amount of time. However, quantum speedup is not universal or even typical across computational tasks, since basic tasks such as sorting are proven to not allow any asymptotic quantum speedup. Claims of quantum supremacy have drawn significant attention to the discipline, but are demonstrated on contrived tasks, while near-term practical use cases remain limited.
"""
await cognee.add([text], dataset_name)
explanation_file_path_quantum = os.path.join(
pathlib.Path(__file__).parent, "test_data/Quantum_computers.txt"
)
await cognee.add([explanation_file_path_quantum], dataset_name)
await cognee.cognify([dataset_name])
@ -65,7 +62,7 @@ async def main():
random_node_name = random_node.payload["text"]
search_results = await cognee.search(
query_type=SearchType.INSIGHTS, query_text=random_node_name
query_type=SearchType.GRAPH_COMPLETION, query_text=random_node_name
)
assert len(search_results) != 0, "The search results list is empty."
print("\n\nExtracted sentences are:\n")

View file

@ -47,7 +47,7 @@ async def main():
random_node_name = random_node.payload["text"]
search_results = await cognee.search(
query_type=SearchType.INSIGHTS, query_text=random_node_name
query_type=SearchType.GRAPH_COMPLETION, query_text=random_node_name
)
assert len(search_results) != 0, "The search results list is empty."
print("\n\nExtracted sentences are:\n")

View file

@ -1,3 +1,5 @@
import pathlib
import os
import cognee
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.modules.graph.cognee_graph.CogneeGraphElements import Edge
@ -27,15 +29,11 @@ async def main():
text_1 = """Germany is located in europe right next to the Netherlands"""
await cognee.add(text_1, dataset_name)
text = """A quantum computer is a computer that takes advantage of quantum mechanical phenomena.
At small scales, physical matter exhibits properties of both particles and waves, and quantum computing leverages this behavior, specifically quantum superposition and entanglement, using specialized hardware that supports the preparation and manipulation of quantum states.
Classical physics cannot explain the operation of these quantum devices, and a scalable quantum computer could perform some calculations exponentially faster (with respect to input size scaling) than any modern "classical" computer. In particular, a large-scale quantum computer could break widely used encryption schemes and aid physicists in performing physical simulations; however, the current state of the technology is largely experimental and impractical, with several obstacles to useful applications. Moreover, scalable quantum computers do not hold promise for many practical tasks, and for many important tasks quantum speedups are proven impossible.
The basic unit of information in quantum computing is the qubit, similar to the bit in traditional digital electronics. Unlike a classical bit, a qubit can exist in a superposition of its two "basis" states. When measuring a qubit, the result is a probabilistic output of a classical bit, therefore making quantum computers nondeterministic in general. If a quantum computer manipulates the qubit in a particular way, wave interference effects can amplify the desired measurement results. The design of quantum algorithms involves creating procedures that allow a quantum computer to perform calculations efficiently and quickly.
Physically engineering high-quality qubits has proven challenging. If a physical qubit is not sufficiently isolated from its environment, it suffers from quantum decoherence, introducing noise into calculations. Paradoxically, perfectly isolating qubits is also undesirable because quantum computations typically need to initialize qubits, perform controlled qubit interactions, and measure the resulting quantum states. Each of those operations introduces errors and suffers from noise, and such inaccuracies accumulate.
In principle, a non-quantum (classical) computer can solve the same computational problems as a quantum computer, given enough time. Quantum advantage comes in the form of time complexity rather than computability, and quantum complexity theory shows that some quantum algorithms for carefully selected tasks require exponentially fewer computational steps than the best known non-quantum algorithms. Such tasks can in theory be solved on a large-scale quantum computer whereas classical computers would not finish computations in any reasonable amount of time. However, quantum speedup is not universal or even typical across computational tasks, since basic tasks such as sorting are proven to not allow any asymptotic quantum speedup. Claims of quantum supremacy have drawn significant attention to the discipline, but are demonstrated on contrived tasks, while near-term practical use cases remain limited.
"""
explanation_file_path_quantum = os.path.join(
pathlib.Path(__file__).parent, "test_data/Quantum_computers.txt"
)
await cognee.add([text], dataset_name)
await cognee.add([explanation_file_path_quantum], dataset_name)
await cognee.cognify([dataset_name])

View file

@ -1,251 +0,0 @@
import os
import pytest
import pathlib
import cognee
from cognee.low_level import setup
from cognee.tasks.storage import add_data_points
from cognee.modules.engine.models import Entity, EntityType
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.infrastructure.databases.vector import get_vector_engine
from cognee.modules.retrieval.exceptions.exceptions import NoDataError
from cognee.modules.retrieval.insights_retriever import InsightsRetriever
class TestInsightsRetriever:
    """Exercises InsightsRetriever.get_context against small in-memory graphs.

    Each test isolates its own system/data root directories and wipes all
    prior state (data + metadata) before running.
    """

    @pytest.mark.asyncio
    async def test_insights_context_simple(self):
        """A plain entity lookup should surface the matching person node."""
        # Point cognee at test-local storage and start from a clean slate.
        system_directory_path = os.path.join(
            pathlib.Path(__file__).parent, ".cognee_system/test_insights_context_simple"
        )
        cognee.config.system_root_directory(system_directory_path)
        data_directory_path = os.path.join(
            pathlib.Path(__file__).parent, ".data_storage/test_insights_context_simple"
        )
        cognee.config.data_root_directory(data_directory_path)

        await cognee.prune.prune_data()
        await cognee.prune.prune_system(metadata=True)
        await setup()

        person_type = EntityType(
            name="Person",
            description="An individual",
        )
        steve = Entity(
            name="Steve Rodger",
            is_a=person_type,
            description="An American actor, comedian, and filmmaker",
        )
        mike = Entity(
            name="Mike Broski",
            is_a=person_type,
            description="Financial advisor and philanthropist",
        )
        christina = Entity(
            name="Christina Mayer",
            is_a=person_type,
            description="Maker of next generation of iconic American music videos",
        )

        company_type = EntityType(
            name="Company",
            description="An organization that operates on an annual basis",
        )
        apple = Entity(
            name="Apple",
            is_a=company_type,
            description="An American multinational technology company headquartered in Cupertino, California",
        )
        google = Entity(
            name="Google",
            is_a=company_type,
            description="An American multinational technology company that specializes in Internet-related services and products",
        )
        facebook = Entity(
            name="Facebook",
            is_a=company_type,
            description="An American social media, messaging, and online platform",
        )

        await add_data_points([steve, mike, christina, apple, google, facebook])

        retriever = InsightsRetriever()
        context = await retriever.get_context("Mike")

        assert context[0][0]["name"] == "Mike Broski", "Failed to get Mike Broski"

    @pytest.mark.asyncio
    async def test_insights_context_complex(self):
        """Lookup should still find the right entity when the graph has edges."""
        # Point cognee at test-local storage and start from a clean slate.
        system_directory_path = os.path.join(
            pathlib.Path(__file__).parent, ".cognee_system/test_insights_context_complex"
        )
        cognee.config.system_root_directory(system_directory_path)
        data_directory_path = os.path.join(
            pathlib.Path(__file__).parent, ".data_storage/test_insights_context_complex"
        )
        cognee.config.data_root_directory(data_directory_path)

        await cognee.prune.prune_data()
        await cognee.prune.prune_system(metadata=True)
        await setup()

        person_type = EntityType(
            name="Person",
            description="An individual",
        )
        steve = Entity(
            name="Steve Rodger",
            is_a=person_type,
            description="An American actor, comedian, and filmmaker",
        )
        mike = Entity(
            name="Mike Broski",
            is_a=person_type,
            description="Financial advisor and philanthropist",
        )
        christina = Entity(
            name="Christina Mayer",
            is_a=person_type,
            description="Maker of next generation of iconic American music videos",
        )
        jason = Entity(
            name="Jason Statham",
            is_a=person_type,
            description="An American actor",
        )
        tyson = Entity(
            name="Mike Tyson",
            is_a=person_type,
            description="A former professional boxer from the United States",
        )

        company_type = EntityType(
            name="Company",
            description="An organization that operates on an annual basis",
        )
        apple = Entity(
            name="Apple",
            is_a=company_type,
            description="An American multinational technology company headquartered in Cupertino, California",
        )
        google = Entity(
            name="Google",
            is_a=company_type,
            description="An American multinational technology company that specializes in Internet-related services and products",
        )
        facebook = Entity(
            name="Facebook",
            is_a=company_type,
            description="An American social media, messaging, and online platform",
        )

        # NOTE(review): jason and tyson are referenced by edges below but are
        # not added as data points here — mirrors the original test as-is.
        await add_data_points([steve, mike, christina, apple, google, facebook])

        graph_engine = await get_graph_engine()

        # Every (employee, employer) pair becomes one "works_for" edge.
        employment = [
            (steve, apple),
            (mike, google),
            (christina, facebook),
            (jason, apple),
            (tyson, apple),
        ]
        await graph_engine.add_edges(
            [
                (
                    str(src.id),
                    str(dst.id),
                    "works_for",
                    {
                        "relationship_name": "works_for",
                        "source_node_id": src.id,
                        "target_node_id": dst.id,
                    },
                )
                for src, dst in employment
            ]
        )

        retriever = InsightsRetriever(top_k=20)
        context = await retriever.get_context("Christina")

        assert context[0][0]["name"] == "Christina Mayer", "Failed to get Christina Mayer"

    @pytest.mark.asyncio
    async def test_insights_context_on_empty_graph(self):
        """Without collections the retriever raises; with empty ones it returns []."""
        # Point cognee at test-local storage and start from a clean slate.
        system_directory_path = os.path.join(
            pathlib.Path(__file__).parent, ".cognee_system/test_insights_context_on_empty_graph"
        )
        cognee.config.system_root_directory(system_directory_path)
        data_directory_path = os.path.join(
            pathlib.Path(__file__).parent, ".data_storage/test_insights_context_on_empty_graph"
        )
        cognee.config.data_root_directory(data_directory_path)

        await cognee.prune.prune_data()
        await cognee.prune.prune_system(metadata=True)

        retriever = InsightsRetriever()

        # No collections exist yet, so querying must raise NoDataError.
        with pytest.raises(NoDataError):
            await retriever.get_context("Christina Mayer")

        vector_engine = get_vector_engine()
        await vector_engine.create_collection("Entity_name", payload_schema=Entity)
        await vector_engine.create_collection("EntityType_name", payload_schema=EntityType)

        context = await retriever.get_context("Christina Mayer")
        assert context == [], "Returned context should be empty on an empty graph"

View file

@ -34,7 +34,7 @@ class CogneeConfig(QABenchmarkConfig):
system_prompt_path: str = "answer_simple_question_benchmark2.txt"
# Search parameters (fallback if not using eval framework)
search_type: SearchType = SearchType.INSIGHTS
search_type: SearchType = SearchType.GRAPH_COMPLETION
# Clean slate on initialization
clean_start: bool = True

View file

@ -57,7 +57,9 @@ async def main():
# Now let's perform some searches
# 1. Search for insights related to "ChromaDB"
insights_results = await cognee.search(query_type=SearchType.INSIGHTS, query_text="ChromaDB")
insights_results = await cognee.search(
query_type=SearchType.GRAPH_COMPLETION, query_text="ChromaDB"
)
print("\nInsights about ChromaDB:")
for result in insights_results:
print(f"- {result}")

View file

@ -55,7 +55,9 @@ async def main():
# Now let's perform some searches
# 1. Search for insights related to "KuzuDB"
insights_results = await cognee.search(query_type=SearchType.INSIGHTS, query_text="KuzuDB")
insights_results = await cognee.search(
query_type=SearchType.GRAPH_COMPLETION, query_text="KuzuDB"
)
print("\nInsights about KuzuDB:")
for result in insights_results:
print(f"- {result}")

View file

@ -64,7 +64,9 @@ async def main():
# Now let's perform some searches
# 1. Search for insights related to "Neo4j"
insights_results = await cognee.search(query_type=SearchType.INSIGHTS, query_text="Neo4j")
insights_results = await cognee.search(
query_type=SearchType.GRAPH_COMPLETION, query_text="Neo4j"
)
print("\nInsights about Neo4j:")
for result in insights_results:
print(f"- {result}")

View file

@ -79,7 +79,7 @@ async def main():
# Now let's perform some searches
# 1. Search for insights related to "Neptune Analytics"
insights_results = await cognee.search(
query_type=SearchType.INSIGHTS, query_text="Neptune Analytics"
query_type=SearchType.GRAPH_COMPLETION, query_text="Neptune Analytics"
)
print("\n========Insights about Neptune Analytics========:")
for result in insights_results:

View file

@ -69,7 +69,9 @@ async def main():
# Now let's perform some searches
# 1. Search for insights related to "PGVector"
insights_results = await cognee.search(query_type=SearchType.INSIGHTS, query_text="PGVector")
insights_results = await cognee.search(
query_type=SearchType.GRAPH_COMPLETION, query_text="PGVector"
)
print("\nInsights about PGVector:")
for result in insights_results:
print(f"- {result}")

View file

@ -50,7 +50,9 @@ async def main():
query_text = "Tell me about NLP"
print(f"Searching cognee for insights with query: '{query_text}'")
# Query cognee for insights on the added text
search_results = await cognee.search(query_type=SearchType.INSIGHTS, query_text=query_text)
search_results = await cognee.search(
query_type=SearchType.GRAPH_COMPLETION, query_text=query_text
)
print("Search results:")
# Display results

View file

@ -1795,7 +1795,7 @@
}
],
"source": [
"search_results = await cognee.search(query_type=SearchType.INSIGHTS, query_text=node_name)\n",
"search_results = await cognee.search(query_type=SearchType.GRAPH_COMPLETION, query_text=node_name)\n",
"print(\"\\n\\nExtracted sentences are:\\n\")\n",
"for result in search_results:\n",
" print(f\"{result}\\n\")"

@ -1 +0,0 @@
Subproject commit 130b84db9270734756d16918e5c86034777140fc

View file

@ -295,7 +295,7 @@
"cell_type": "code",
"source": [
"# Search graph insights\n",
"insights_results = await search(query_text=\"Neptune Analytics\", query_type=SearchType.INSIGHTS)\n",
"insights_results = await search(query_text=\"Neptune Analytics\", query_type=SearchType.GRAPH_COMPLETION)\n",
"print(\"\\nInsights about Neptune Analytics:\")\n",
"for result in insights_results:\n",
" src_node = result[0].get(\"name\", result[0][\"type\"])\n",

View file

@ -63,7 +63,14 @@ api=[]
distributed = [
"modal>=1.0.5,<2.0.0",
]
scraping = [
"tavily-python>=0.7.0",
"beautifulsoup4>=4.13.1",
"playwright>=1.9.0",
"lxml>=4.9.3,<5.0.0",
"protego>=0.1",
"APScheduler>=3.10.0,<=3.11.0"
]
neo4j = ["neo4j>=5.28.0,<6"]
neptune = ["langchain_aws>=0.2.22"]
postgres = [

2
uv.lock generated
View file

@ -8966,4 +8966,4 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/8e/e0/69a553d2047f9a2c7347caa225bb3a63b6d7704ad74610cb7823baa08ed7/zstandard-0.25.0-cp313-cp313-win32.whl", hash = "sha256:7030defa83eef3e51ff26f0b7bfb229f0204b66fe18e04359ce3474ac33cbc09", size = 436936, upload-time = "2025-09-14T22:17:52.658Z" },
{ url = "https://files.pythonhosted.org/packages/d9/82/b9c06c870f3bd8767c201f1edbdf9e8dc34be5b0fbc5682c4f80fe948475/zstandard-0.25.0-cp313-cp313-win_amd64.whl", hash = "sha256:1f830a0dac88719af0ae43b8b2d6aef487d437036468ef3c2ea59c51f9d55fd5", size = 506232, upload-time = "2025-09-14T22:17:50.402Z" },
{ url = "https://files.pythonhosted.org/packages/d4/57/60c3c01243bb81d381c9916e2a6d9e149ab8627c0c7d7abb2d73384b3c0c/zstandard-0.25.0-cp313-cp313-win_arm64.whl", hash = "sha256:85304a43f4d513f5464ceb938aa02c1e78c2943b29f44a750b48b25ac999a049", size = 462671, upload-time = "2025-09-14T22:17:51.533Z" },
]
]