refactor: move url data fetching logic into save_data_item_to_storage
This commit is contained in:
parent
17b33ab443
commit
d7417d9b06
8 changed files with 91 additions and 155 deletions
|
|
@ -31,6 +31,7 @@ async def add(
|
||||||
incremental_loading: bool = True,
|
incremental_loading: bool = True,
|
||||||
data_per_batch: Optional[int] = 20,
|
data_per_batch: Optional[int] = 20,
|
||||||
loaders_config: dict[LoaderInterface, dict] = {},
|
loaders_config: dict[LoaderInterface, dict] = {},
|
||||||
|
fetchers_config: dict[str, Any] = {},
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
Add data to Cognee for knowledge graph processing.
|
Add data to Cognee for knowledge graph processing.
|
||||||
|
|
@ -179,6 +180,7 @@ async def add(
|
||||||
dataset_id,
|
dataset_id,
|
||||||
preferred_loaders,
|
preferred_loaders,
|
||||||
loaders_config,
|
loaders_config,
|
||||||
|
fetchers_config,
|
||||||
),
|
),
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
@ -204,6 +206,7 @@ async def add(
|
||||||
graph_db_config=graph_db_config,
|
graph_db_config=graph_db_config,
|
||||||
incremental_loading=incremental_loading,
|
incremental_loading=incremental_loading,
|
||||||
data_per_batch=data_per_batch,
|
data_per_batch=data_per_batch,
|
||||||
|
fetchers_config=fetchers_config,
|
||||||
):
|
):
|
||||||
pipeline_run_info = run_info
|
pipeline_run_info = run_info
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -2,8 +2,6 @@ from cognee.infrastructure.loaders.LoaderInterface import LoaderInterface
|
||||||
from typing import List
|
from typing import List
|
||||||
|
|
||||||
from cognee.modules.ingestion.exceptions.exceptions import IngestionError
|
from cognee.modules.ingestion.exceptions.exceptions import IngestionError
|
||||||
from cognee.modules.ingestion import save_data_to_file
|
|
||||||
from cognee.tasks.web_scraper.config import TavilyConfig, SoupCrawlerConfig
|
|
||||||
from cognee.shared.logging_utils import get_logger
|
from cognee.shared.logging_utils import get_logger
|
||||||
|
|
||||||
logger = get_logger()
|
logger = get_logger()
|
||||||
|
|
@ -62,7 +60,7 @@ class WebUrlLoader(LoaderInterface):
|
||||||
Load and process the file, returning standardized result.
|
Load and process the file, returning standardized result.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
file_path: Path to the file to be processed
|
file_path: Path to the file to be processed (already saved by fetcher)
|
||||||
file_stream: If file stream is provided it will be used to process file instead
|
file_stream: If file stream is provided it will be used to process file instead
|
||||||
**kwargs: Additional loader-specific configuration
|
**kwargs: Additional loader-specific configuration
|
||||||
|
|
||||||
|
|
@ -71,63 +69,5 @@ class WebUrlLoader(LoaderInterface):
|
||||||
Raises:
|
Raises:
|
||||||
Exception: If file cannot be processed
|
Exception: If file cannot be processed
|
||||||
"""
|
"""
|
||||||
loaders_config = kwargs.get("loaders_config")
|
|
||||||
if not isinstance(loaders_config, dict):
|
|
||||||
raise IngestionError("loaders_config must be a valid dictionary")
|
|
||||||
|
|
||||||
web_url_loader_config = loaders_config.get(self.loader_name)
|
return file_path
|
||||||
if not isinstance(web_url_loader_config, dict):
|
|
||||||
raise IngestionError(f"{self.loader_name} configuration must be a valid dictionary")
|
|
||||||
|
|
||||||
try:
|
|
||||||
from cognee.context_global_variables import tavily_config, soup_crawler_config
|
|
||||||
from cognee.tasks.web_scraper import fetch_page_content
|
|
||||||
|
|
||||||
tavily_dict = web_url_loader_config.get("tavily_config")
|
|
||||||
_tavily_config = TavilyConfig(**tavily_dict) if tavily_dict else None
|
|
||||||
|
|
||||||
soup_dict = web_url_loader_config.get("soup_config")
|
|
||||||
_soup_config = SoupCrawlerConfig(**soup_dict) if soup_dict else None
|
|
||||||
|
|
||||||
# Set global configs for downstream access
|
|
||||||
tavily_config.set(_tavily_config)
|
|
||||||
soup_crawler_config.set(_soup_config)
|
|
||||||
|
|
||||||
preferred_tool = "beautifulsoup" if _soup_config else "tavily"
|
|
||||||
if preferred_tool == "tavily" and _tavily_config is None:
|
|
||||||
raise IngestionError(
|
|
||||||
message="TavilyConfig must be set on the ingestion context when fetching HTTP URLs without a SoupCrawlerConfig."
|
|
||||||
)
|
|
||||||
if preferred_tool == "beautifulsoup" and _soup_config is None:
|
|
||||||
raise IngestionError(
|
|
||||||
message="SoupCrawlerConfig must be set on the ingestion context when using the BeautifulSoup scraper."
|
|
||||||
)
|
|
||||||
|
|
||||||
logger.info(f"Starting web URL crawling for: {file_path}")
|
|
||||||
logger.info(f"Using scraping tool: {preferred_tool}")
|
|
||||||
|
|
||||||
data = await fetch_page_content(
|
|
||||||
file_path,
|
|
||||||
preferred_tool=preferred_tool,
|
|
||||||
tavily_config=_tavily_config,
|
|
||||||
soup_crawler_config=_soup_config,
|
|
||||||
)
|
|
||||||
|
|
||||||
logger.info(f"Successfully fetched content from {len(data)} URL(s)")
|
|
||||||
logger.info("Processing and concatenating fetched content")
|
|
||||||
|
|
||||||
content = ""
|
|
||||||
for key, value in data.items():
|
|
||||||
content += f"{key}:\n{value}\n\n"
|
|
||||||
|
|
||||||
logger.info(f"Saving content to file (total size: {len(content)} characters)")
|
|
||||||
stored_path = await save_data_to_file(content)
|
|
||||||
logger.info(f"Successfully saved content to: {stored_path}")
|
|
||||||
|
|
||||||
return stored_path
|
|
||||||
except IngestionError:
|
|
||||||
raise
|
|
||||||
except Exception as e:
|
|
||||||
raise IngestionError(
|
|
||||||
message=f"Error ingesting webpage from URL {file_path}: {str(e)}"
|
|
||||||
) from e
|
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,7 @@ from cognee.modules.pipelines.layers.resolve_authorized_user_datasets import (
|
||||||
from cognee.modules.pipelines.layers.check_pipeline_run_qualification import (
|
from cognee.modules.pipelines.layers.check_pipeline_run_qualification import (
|
||||||
check_pipeline_run_qualification,
|
check_pipeline_run_qualification,
|
||||||
)
|
)
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
logger = get_logger("cognee.pipeline")
|
logger = get_logger("cognee.pipeline")
|
||||||
|
|
||||||
|
|
@ -36,6 +37,7 @@ async def run_pipeline(
|
||||||
graph_db_config: dict = None,
|
graph_db_config: dict = None,
|
||||||
incremental_loading: bool = False,
|
incremental_loading: bool = False,
|
||||||
data_per_batch: int = 20,
|
data_per_batch: int = 20,
|
||||||
|
fetchers_config: dict[str, Any] = {},
|
||||||
):
|
):
|
||||||
validate_pipeline_tasks(tasks)
|
validate_pipeline_tasks(tasks)
|
||||||
await setup_and_check_environment(vector_db_config, graph_db_config)
|
await setup_and_check_environment(vector_db_config, graph_db_config)
|
||||||
|
|
@ -52,6 +54,7 @@ async def run_pipeline(
|
||||||
context={"dataset": dataset},
|
context={"dataset": dataset},
|
||||||
incremental_loading=incremental_loading,
|
incremental_loading=incremental_loading,
|
||||||
data_per_batch=data_per_batch,
|
data_per_batch=data_per_batch,
|
||||||
|
fetchers_config=fetchers_config,
|
||||||
):
|
):
|
||||||
yield run_info
|
yield run_info
|
||||||
|
|
||||||
|
|
@ -65,6 +68,7 @@ async def run_pipeline_per_dataset(
|
||||||
context: dict = None,
|
context: dict = None,
|
||||||
incremental_loading=False,
|
incremental_loading=False,
|
||||||
data_per_batch: int = 20,
|
data_per_batch: int = 20,
|
||||||
|
fetchers_config: dict[str, Any] = {},
|
||||||
):
|
):
|
||||||
# Will only be used if ENABLE_BACKEND_ACCESS_CONTROL is set to True
|
# Will only be used if ENABLE_BACKEND_ACCESS_CONTROL is set to True
|
||||||
await set_database_global_context_variables(dataset.id, dataset.owner_id)
|
await set_database_global_context_variables(dataset.id, dataset.owner_id)
|
||||||
|
|
@ -80,7 +84,15 @@ async def run_pipeline_per_dataset(
|
||||||
return
|
return
|
||||||
|
|
||||||
pipeline_run = run_tasks(
|
pipeline_run = run_tasks(
|
||||||
tasks, dataset.id, data, user, pipeline_name, context, incremental_loading, data_per_batch
|
tasks,
|
||||||
|
dataset.id,
|
||||||
|
data,
|
||||||
|
user,
|
||||||
|
pipeline_name,
|
||||||
|
context,
|
||||||
|
incremental_loading,
|
||||||
|
data_per_batch,
|
||||||
|
fetchers_config,
|
||||||
)
|
)
|
||||||
|
|
||||||
async for pipeline_run_info in pipeline_run:
|
async for pipeline_run_info in pipeline_run:
|
||||||
|
|
|
||||||
|
|
@ -60,6 +60,7 @@ async def run_tasks(
|
||||||
context: dict = None,
|
context: dict = None,
|
||||||
incremental_loading: bool = False,
|
incremental_loading: bool = False,
|
||||||
data_per_batch: int = 20,
|
data_per_batch: int = 20,
|
||||||
|
fetchers_config: dict[str, Any] = {},
|
||||||
):
|
):
|
||||||
if not user:
|
if not user:
|
||||||
user = await get_default_user()
|
user = await get_default_user()
|
||||||
|
|
@ -106,6 +107,7 @@ async def run_tasks(
|
||||||
context,
|
context,
|
||||||
user,
|
user,
|
||||||
incremental_loading,
|
incremental_loading,
|
||||||
|
fetchers_config,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
for data_item in data_batch
|
for data_item in data_batch
|
||||||
|
|
|
||||||
|
|
@ -39,6 +39,7 @@ async def run_tasks_data_item_incremental(
|
||||||
pipeline_run_id: str,
|
pipeline_run_id: str,
|
||||||
context: Optional[Dict[str, Any]],
|
context: Optional[Dict[str, Any]],
|
||||||
user: User,
|
user: User,
|
||||||
|
fetchers_config: dict[str, Any],
|
||||||
) -> AsyncGenerator[Dict[str, Any], None]:
|
) -> AsyncGenerator[Dict[str, Any], None]:
|
||||||
"""
|
"""
|
||||||
Process a single data item with incremental loading support.
|
Process a single data item with incremental loading support.
|
||||||
|
|
@ -64,51 +65,36 @@ async def run_tasks_data_item_incremental(
|
||||||
|
|
||||||
# If incremental_loading of data is set to True don't process documents already processed by pipeline
|
# If incremental_loading of data is set to True don't process documents already processed by pipeline
|
||||||
# If data is being added to Cognee for the first time calculate the id of the data
|
# If data is being added to Cognee for the first time calculate the id of the data
|
||||||
try:
|
if not isinstance(data_item, Data):
|
||||||
if not isinstance(data_item, Data):
|
file_path = await save_data_item_to_storage(data_item, fetchers_config)
|
||||||
file_path = await save_data_item_to_storage(data_item)
|
# Ingest data and add metadata
|
||||||
# Ingest data and add metadata
|
async with open_data_file(file_path) as file:
|
||||||
async with open_data_file(file_path) as file:
|
classified_data = ingestion.classify(file)
|
||||||
classified_data = ingestion.classify(file)
|
# data_id is the hash of file contents + owner id to avoid duplicate data
|
||||||
# data_id is the hash of file contents + owner id to avoid duplicate data
|
data_id = ingestion.identify(classified_data, user)
|
||||||
data_id = ingestion.identify(classified_data, user)
|
else:
|
||||||
else:
|
# If data was already processed by Cognee get data id
|
||||||
# If data was already processed by Cognee get data id
|
data_id = data_item.id
|
||||||
data_id = data_item.id
|
|
||||||
|
|
||||||
# Check pipeline status, if Data already processed for pipeline before skip current processing
|
# Check pipeline status, if Data already processed for pipeline before skip current processing
|
||||||
async with db_engine.get_async_session() as session:
|
async with db_engine.get_async_session() as session:
|
||||||
data_point = (
|
data_point = (
|
||||||
await session.execute(select(Data).filter(Data.id == data_id))
|
await session.execute(select(Data).filter(Data.id == data_id))
|
||||||
).scalar_one_or_none()
|
).scalar_one_or_none()
|
||||||
if data_point:
|
if data_point:
|
||||||
if (
|
if (
|
||||||
data_point.pipeline_status.get(pipeline_name, {}).get(str(dataset.id))
|
data_point.pipeline_status.get(pipeline_name, {}).get(str(dataset.id))
|
||||||
== DataItemStatus.DATA_ITEM_PROCESSING_COMPLETED
|
== DataItemStatus.DATA_ITEM_PROCESSING_COMPLETED
|
||||||
):
|
):
|
||||||
yield {
|
yield {
|
||||||
"run_info": PipelineRunAlreadyCompleted(
|
"run_info": PipelineRunAlreadyCompleted(
|
||||||
pipeline_run_id=pipeline_run_id,
|
pipeline_run_id=pipeline_run_id,
|
||||||
dataset_id=dataset.id,
|
dataset_id=dataset.id,
|
||||||
dataset_name=dataset.name,
|
dataset_name=dataset.name,
|
||||||
),
|
),
|
||||||
"data_id": data_id,
|
"data_id": data_id,
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
except UnsupportedPathSchemeError as e:
|
|
||||||
logger.warning(f"data_item does not support incremental loading: {str(e)}")
|
|
||||||
# Fall back to regular processing since incremental loading is not supported
|
|
||||||
async for result in run_tasks_data_item_regular(
|
|
||||||
data_item=data_item,
|
|
||||||
dataset=dataset,
|
|
||||||
tasks=tasks,
|
|
||||||
pipeline_id=pipeline_id,
|
|
||||||
pipeline_run_id=pipeline_run_id,
|
|
||||||
context=context,
|
|
||||||
user=user,
|
|
||||||
):
|
|
||||||
yield result
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Process data based on data_item and list of tasks
|
# Process data based on data_item and list of tasks
|
||||||
|
|
@ -225,6 +211,7 @@ async def run_tasks_data_item(
|
||||||
context: Optional[Dict[str, Any]],
|
context: Optional[Dict[str, Any]],
|
||||||
user: User,
|
user: User,
|
||||||
incremental_loading: bool,
|
incremental_loading: bool,
|
||||||
|
fetchers_config: dict[str, Any] = {},
|
||||||
) -> Optional[Dict[str, Any]]:
|
) -> Optional[Dict[str, Any]]:
|
||||||
"""
|
"""
|
||||||
Process a single data item, choosing between incremental and regular processing.
|
Process a single data item, choosing between incremental and regular processing.
|
||||||
|
|
@ -259,6 +246,7 @@ async def run_tasks_data_item(
|
||||||
pipeline_run_id=pipeline_run_id,
|
pipeline_run_id=pipeline_run_id,
|
||||||
context=context,
|
context=context,
|
||||||
user=user,
|
user=user,
|
||||||
|
fetchers_config=fetchers_config,
|
||||||
):
|
):
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
|
|
|
||||||
|
|
@ -31,6 +31,7 @@ async def ingest_data(
|
||||||
dataset_id: UUID = None,
|
dataset_id: UUID = None,
|
||||||
preferred_loaders: List[str] = None,
|
preferred_loaders: List[str] = None,
|
||||||
loaders_config: dict[LoaderInterface, dict] = {},
|
loaders_config: dict[LoaderInterface, dict] = {},
|
||||||
|
fetchers_config: dict[str, Any] = {},
|
||||||
):
|
):
|
||||||
if not user:
|
if not user:
|
||||||
user = await get_default_user()
|
user = await get_default_user()
|
||||||
|
|
@ -80,16 +81,10 @@ async def ingest_data(
|
||||||
dataset_data_map = {str(data.id): True for data in dataset_data}
|
dataset_data_map = {str(data.id): True for data in dataset_data}
|
||||||
|
|
||||||
for data_item in data:
|
for data_item in data:
|
||||||
try:
|
# Get file path of data item or create a file if it doesn't exist
|
||||||
# Get file path of data item or create a file if it doesn't exist
|
original_file_path = await save_data_item_to_storage(data_item, fetchers_config)
|
||||||
original_file_path = await save_data_item_to_storage(data_item)
|
# Transform file path to be OS usable
|
||||||
# Transform file path to be OS usable
|
actual_file_path = get_data_file_path(original_file_path)
|
||||||
actual_file_path = get_data_file_path(original_file_path)
|
|
||||||
except UnsupportedPathSchemeError:
|
|
||||||
# This data_item (e.g., HTTP/HTTPS URL) should be passed directly to the loader
|
|
||||||
# skip save_data_item_to_storage and get_data_file_path
|
|
||||||
actual_file_path = data_item
|
|
||||||
original_file_path = None # we don't have an original file path
|
|
||||||
|
|
||||||
# Store all input data as text files in Cognee data storage
|
# Store all input data as text files in Cognee data storage
|
||||||
cognee_storage_file_path, loader_engine = await data_item_to_text_file(
|
cognee_storage_file_path, loader_engine = await data_item_to_text_file(
|
||||||
|
|
@ -99,26 +94,15 @@ async def ingest_data(
|
||||||
)
|
)
|
||||||
|
|
||||||
# Find metadata from original file
|
# Find metadata from original file
|
||||||
if original_file_path is not None:
|
# Standard flow: extract metadata from both original and stored files
|
||||||
# Standard flow: extract metadata from both original and stored files
|
async with open_data_file(original_file_path) as file:
|
||||||
async with open_data_file(original_file_path) as file:
|
classified_data = ingestion.classify(file)
|
||||||
classified_data = ingestion.classify(file)
|
data_id = ingestion.identify(classified_data, user)
|
||||||
data_id = ingestion.identify(classified_data, user)
|
original_file_metadata = classified_data.get_metadata()
|
||||||
original_file_metadata = classified_data.get_metadata()
|
|
||||||
|
|
||||||
async with open_data_file(cognee_storage_file_path) as file:
|
async with open_data_file(cognee_storage_file_path) as file:
|
||||||
classified_data = ingestion.classify(file)
|
classified_data = ingestion.classify(file)
|
||||||
storage_file_metadata = classified_data.get_metadata()
|
storage_file_metadata = classified_data.get_metadata()
|
||||||
else:
|
|
||||||
# Alternative flow (e.g., URLs): extract metadata once from stored file
|
|
||||||
async with open_data_file(cognee_storage_file_path) as file:
|
|
||||||
classified_data = ingestion.classify(file)
|
|
||||||
data_id = ingestion.identify(classified_data, user)
|
|
||||||
original_file_metadata = classified_data.get_metadata()
|
|
||||||
# Override file_path to be the actual data_item (e.g., URL) ?
|
|
||||||
original_file_metadata["file_path"] = actual_file_path
|
|
||||||
# Storage metadata is the same as original
|
|
||||||
storage_file_metadata = original_file_metadata.copy()
|
|
||||||
|
|
||||||
from sqlalchemy import select
|
from sqlalchemy import select
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,8 @@ from cognee.modules.ingestion import save_data_to_file
|
||||||
from cognee.shared.logging_utils import get_logger
|
from cognee.shared.logging_utils import get_logger
|
||||||
from pydantic_settings import BaseSettings, SettingsConfigDict
|
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||||
|
|
||||||
|
from cognee.tasks.ingestion.data_fetchers.web_url_fetcher import WebUrlFetcher
|
||||||
|
|
||||||
|
|
||||||
logger = get_logger()
|
logger = get_logger()
|
||||||
|
|
||||||
|
|
@ -22,7 +24,9 @@ class SaveDataSettings(BaseSettings):
|
||||||
settings = SaveDataSettings()
|
settings = SaveDataSettings()
|
||||||
|
|
||||||
|
|
||||||
async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any]) -> str:
|
async def save_data_item_to_storage(
|
||||||
|
data_item: Union[BinaryIO, str, Any], fetchers_config: dict[str, Any] = {}
|
||||||
|
) -> str:
|
||||||
if "llama_index" in str(type(data_item)):
|
if "llama_index" in str(type(data_item)):
|
||||||
# Dynamic import is used because the llama_index module is optional.
|
# Dynamic import is used because the llama_index module is optional.
|
||||||
from .transform_data import get_data_from_llama_index
|
from .transform_data import get_data_from_llama_index
|
||||||
|
|
@ -57,9 +61,8 @@ async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any]) -> str
|
||||||
if parsed_url.scheme == "s3":
|
if parsed_url.scheme == "s3":
|
||||||
return data_item
|
return data_item
|
||||||
elif parsed_url.scheme == "http" or parsed_url.scheme == "https":
|
elif parsed_url.scheme == "http" or parsed_url.scheme == "https":
|
||||||
raise UnsupportedPathSchemeError(
|
fetcher = WebUrlFetcher()
|
||||||
message=f"HTTP/HTTPS URLs should be handled by loader, not by save_data_item_to_storage. Received: {data_item}"
|
return await fetcher.fetch(data_item, fetchers_config)
|
||||||
)
|
|
||||||
# data is local file path
|
# data is local file path
|
||||||
elif parsed_url.scheme == "file":
|
elif parsed_url.scheme == "file":
|
||||||
if settings.accept_local_file_path:
|
if settings.accept_local_file_path:
|
||||||
|
|
|
||||||
|
|
@ -1,22 +1,28 @@
|
||||||
|
from sys import exc_info
|
||||||
import pytest
|
import pytest
|
||||||
import cognee
|
import cognee
|
||||||
|
from cognee.modules.ingestion.exceptions.exceptions import IngestionError
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_add_fails_when_preferred_loader_not_specified():
|
async def test_add_fails_when_web_url_fetcher_config_not_specified():
|
||||||
from cognee.shared.logging_utils import setup_logging, ERROR
|
from cognee.shared.logging_utils import setup_logging, ERROR
|
||||||
|
|
||||||
setup_logging(log_level=ERROR)
|
setup_logging(log_level=ERROR)
|
||||||
await cognee.prune.prune_data()
|
await cognee.prune.prune_data()
|
||||||
await cognee.prune.prune_system(metadata=True)
|
await cognee.prune.prune_system(metadata=True)
|
||||||
with pytest.raises(ValueError):
|
with pytest.raises(IngestionError) as excinfo:
|
||||||
await cognee.add(
|
await cognee.add(
|
||||||
"https://en.wikipedia.org/wiki/Large_language_model",
|
"https://en.wikipedia.org/wiki/Large_language_model",
|
||||||
|
incremental_loading=False,
|
||||||
)
|
)
|
||||||
|
assert excinfo.value.message.startswith(
|
||||||
|
"web_url_fetcher configuration must be a valid dictionary"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_add_succesfully_adds_url_when_preferred_loader_specified():
|
async def test_add_succesfully_adds_url_when_fetcher_config_specified():
|
||||||
await cognee.prune.prune_data()
|
await cognee.prune.prune_data()
|
||||||
await cognee.prune.prune_system(metadata=True)
|
await cognee.prune.prune_system(metadata=True)
|
||||||
|
|
||||||
|
|
@ -27,8 +33,8 @@ async def test_add_succesfully_adds_url_when_preferred_loader_specified():
|
||||||
"paragraphs": {"selector": "p", "all": True},
|
"paragraphs": {"selector": "p", "all": True},
|
||||||
}
|
}
|
||||||
|
|
||||||
loaders_config = {
|
fetchers_config = {
|
||||||
"web_url_loader": {
|
"web_url_fetcher": {
|
||||||
"soup_config": {
|
"soup_config": {
|
||||||
"max_depth": 1,
|
"max_depth": 1,
|
||||||
"follow_links": False,
|
"follow_links": False,
|
||||||
|
|
@ -40,8 +46,8 @@ async def test_add_succesfully_adds_url_when_preferred_loader_specified():
|
||||||
try:
|
try:
|
||||||
await cognee.add(
|
await cognee.add(
|
||||||
"https://en.wikipedia.org/wiki/Large_language_model",
|
"https://en.wikipedia.org/wiki/Large_language_model",
|
||||||
preferred_loaders=["web_url_loader"],
|
incremental_loading=False,
|
||||||
loaders_config=loaders_config,
|
fetchers_config=fetchers_config,
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
pytest.fail(f"Failed to add url: {e}")
|
pytest.fail(f"Failed to add url: {e}")
|
||||||
|
|
@ -59,8 +65,8 @@ async def test_add_with_incremental_loading_works():
|
||||||
"paragraphs": {"selector": "p", "all": True},
|
"paragraphs": {"selector": "p", "all": True},
|
||||||
}
|
}
|
||||||
|
|
||||||
loaders_config = {
|
fetchers_config = {
|
||||||
"web_url_loader": {
|
"web_url_fetcher": {
|
||||||
"soup_config": {
|
"soup_config": {
|
||||||
"max_depth": 1,
|
"max_depth": 1,
|
||||||
"follow_links": False,
|
"follow_links": False,
|
||||||
|
|
@ -71,9 +77,8 @@ async def test_add_with_incremental_loading_works():
|
||||||
try:
|
try:
|
||||||
await cognee.add(
|
await cognee.add(
|
||||||
"https://en.wikipedia.org/wiki/Large_language_model",
|
"https://en.wikipedia.org/wiki/Large_language_model",
|
||||||
preferred_loaders=["web_url_loader"],
|
|
||||||
incremental_loading=True,
|
incremental_loading=True,
|
||||||
loaders_config=loaders_config,
|
fetchers_config=fetchers_config,
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
pytest.fail(f"Failed to add url: {e}")
|
pytest.fail(f"Failed to add url: {e}")
|
||||||
|
|
@ -91,8 +96,8 @@ async def test_add_without_incremental_loading_works():
|
||||||
"paragraphs": {"selector": "p", "all": True},
|
"paragraphs": {"selector": "p", "all": True},
|
||||||
}
|
}
|
||||||
|
|
||||||
loaders_config = {
|
fetchers_config = {
|
||||||
"web_url_loader": {
|
"web_url_fetcher": {
|
||||||
"soup_config": {
|
"soup_config": {
|
||||||
"max_depth": 1,
|
"max_depth": 1,
|
||||||
"follow_links": False,
|
"follow_links": False,
|
||||||
|
|
@ -103,9 +108,8 @@ async def test_add_without_incremental_loading_works():
|
||||||
try:
|
try:
|
||||||
await cognee.add(
|
await cognee.add(
|
||||||
"https://en.wikipedia.org/wiki/Large_language_model",
|
"https://en.wikipedia.org/wiki/Large_language_model",
|
||||||
preferred_loaders=["web_url_loader"],
|
|
||||||
incremental_loading=False,
|
incremental_loading=False,
|
||||||
loaders_config=loaders_config,
|
fetchers_config=fetchers_config,
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
pytest.fail(f"Failed to add url: {e}")
|
pytest.fail(f"Failed to add url: {e}")
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue