Revert url_crawler changes to cognee.add() and update web_url_loader.load()
This commit is contained in:
parent
d884867d2c
commit
185600fe17
4 changed files with 42 additions and 52 deletions
|
|
@@ -3,6 +3,7 @@ import os
|
|||
from typing import Union, BinaryIO, List, Optional, Dict, Any
|
||||
from pydantic import BaseModel
|
||||
from urllib.parse import urlparse
|
||||
from cognee.infrastructure.loaders import LoaderInterface
|
||||
from cognee.modules.users.models import User
|
||||
from cognee.modules.pipelines import Task, run_pipeline
|
||||
from cognee.modules.pipelines.layers.resolve_authorized_user_dataset import (
|
||||
|
|
@@ -17,16 +18,6 @@ from cognee.shared.logging_utils import get_logger
|
|||
|
||||
logger = get_logger()
|
||||
|
||||
try:
|
||||
from cognee.tasks.web_scraper.config import TavilyConfig, SoupCrawlerConfig
|
||||
from cognee.context_global_variables import (
|
||||
tavily_config as tavily,
|
||||
soup_crawler_config as soup_crawler,
|
||||
)
|
||||
except ImportError:
|
||||
logger.debug(f"Unable to import {str(ImportError)}")
|
||||
pass
|
||||
|
||||
|
||||
async def add(
|
||||
data: Union[BinaryIO, list[BinaryIO], str, list[str]],
|
||||
|
|
@@ -38,10 +29,8 @@ async def add(
|
|||
dataset_id: Optional[UUID] = None,
|
||||
preferred_loaders: List[str] = None,
|
||||
incremental_loading: bool = True,
|
||||
extraction_rules: Optional[Dict[str, Any]] = None,
|
||||
tavily_config: Optional[BaseModel] = None,
|
||||
soup_crawler_config: Optional[BaseModel] = None,
|
||||
data_per_batch: Optional[int] = 20,
|
||||
loaders_config: dict[LoaderInterface, dict] = {},
|
||||
):
|
||||
"""
|
||||
Add data to Cognee for knowledge graph processing.
|
||||
|
|
@@ -180,29 +169,6 @@ async def add(
|
|||
- TAVILY_API_KEY: YOUR_TAVILY_API_KEY
|
||||
|
||||
"""
|
||||
|
||||
try:
|
||||
if not soup_crawler_config and extraction_rules:
|
||||
soup_crawler_config = SoupCrawlerConfig(extraction_rules=extraction_rules)
|
||||
if not tavily_config and os.getenv("TAVILY_API_KEY"):
|
||||
tavily_config = TavilyConfig(api_key=os.getenv("TAVILY_API_KEY"))
|
||||
|
||||
soup_crawler.set(soup_crawler_config)
|
||||
tavily.set(tavily_config)
|
||||
|
||||
http_schemes = {"http", "https"}
|
||||
|
||||
def _is_http_url(item: Union[str, BinaryIO]) -> bool:
|
||||
return isinstance(item, str) and urlparse(item).scheme in http_schemes
|
||||
|
||||
if _is_http_url(data):
|
||||
node_set = ["web_content"] if not node_set else node_set + ["web_content"]
|
||||
elif isinstance(data, list) and any(_is_http_url(item) for item in data):
|
||||
node_set = ["web_content"] if not node_set else node_set + ["web_content"]
|
||||
except NameError:
|
||||
logger.debug(f"Unable to import {str(ImportError)}")
|
||||
pass
|
||||
|
||||
tasks = [
|
||||
Task(resolve_data_directories, include_subdirectories=True),
|
||||
Task(
|
||||
|
|
@@ -212,6 +178,7 @@ async def add(
|
|||
node_set,
|
||||
dataset_id,
|
||||
preferred_loaders,
|
||||
loaders_config,
|
||||
),
|
||||
]
|
||||
|
||||
|
|
|
|||
|
|
@@ -48,7 +48,9 @@ class WebUrlLoader(LoaderInterface):
|
|||
True if this loader can process the file, False otherwise
|
||||
"""
|
||||
if data_item_path is None:
|
||||
raise # TODO: Temporarily set this to default to None so that I don't update other loaders unnecessarily yet, see TODO in LoaderEngine.py
|
||||
raise IngestionError(
|
||||
"data_item_path should not be None"
|
||||
) # TODO: Temporarily set this to default to None so that I don't update other loaders unnecessarily yet, see TODO in LoaderEngine.py
|
||||
return data_item_path.startswith(("http://", "https://"))
|
||||
|
||||
async def load(self, file_path: str, **kwargs):
|
||||
|
|
@@ -63,18 +65,31 @@ class WebUrlLoader(LoaderInterface):
|
|||
Raises:
|
||||
Exception: If file cannot be processed
|
||||
"""
|
||||
loaders_config = kwargs.get("loaders_config")
|
||||
if not isinstance(loaders_config, dict):
|
||||
raise IngestionError("loaders_config must be a valid dictionary")
|
||||
|
||||
web_url_loader_config = loaders_config.get(self.loader_name)
|
||||
if not isinstance(web_url_loader_config, dict):
|
||||
raise IngestionError(f"{self.loader_name} configuration must be a valid dictionary")
|
||||
|
||||
try:
|
||||
from cognee.context_global_variables import tavily_config, soup_crawler_config
|
||||
from cognee.tasks.web_scraper import fetch_page_content
|
||||
|
||||
tavily = tavily_config.get()
|
||||
soup_crawler = soup_crawler_config.get()
|
||||
preferred_tool = "beautifulsoup" if soup_crawler else "tavily"
|
||||
if preferred_tool == "tavily" and tavily is None:
|
||||
_tavily_config = web_url_loader_config.get("tavily_config")
|
||||
_soup_config = web_url_loader_config.get("soup_config")
|
||||
|
||||
# Set global configs for downstream access
|
||||
tavily_config.set(_tavily_config)
|
||||
soup_crawler_config.set(_soup_config)
|
||||
|
||||
preferred_tool = "beautifulsoup" if _soup_config else "tavily"
|
||||
if preferred_tool == "tavily" and _tavily_config is None:
|
||||
raise IngestionError(
|
||||
message="TavilyConfig must be set on the ingestion context when fetching HTTP URLs without a SoupCrawlerConfig."
|
||||
)
|
||||
if preferred_tool == "beautifulsoup" and soup_crawler is None:
|
||||
if preferred_tool == "beautifulsoup" and _soup_config is None:
|
||||
raise IngestionError(
|
||||
message="SoupCrawlerConfig must be set on the ingestion context when using the BeautifulSoup scraper."
|
||||
)
|
||||
|
|
@@ -82,8 +97,8 @@ class WebUrlLoader(LoaderInterface):
|
|||
data = await fetch_page_content(
|
||||
file_path,
|
||||
preferred_tool=preferred_tool,
|
||||
tavily_config=tavily,
|
||||
soup_crawler_config=soup_crawler,
|
||||
tavily_config=_tavily_config,
|
||||
soup_crawler_config=_soup_config,
|
||||
)
|
||||
content = ""
|
||||
for key, value in data.items():
|
||||
|
|
@@ -94,7 +109,4 @@ class WebUrlLoader(LoaderInterface):
|
|||
except IngestionError:
|
||||
raise
|
||||
except Exception as e:
|
||||
raise IngestionError(
|
||||
message=f"Error ingesting webpage results of url {file_path}: {str(e)}"
|
||||
)
|
||||
raise NotImplementedError
|
||||
raise IngestionError(message=f"Error ingesting webpage from URL {file_path}: {str(e)}")
|
||||
|
|
|
|||
|
|
@@ -34,7 +34,9 @@ async def pull_from_s3(file_path, destination_file) -> None:
|
|||
|
||||
|
||||
async def data_item_to_text_file(
|
||||
data_item_path: str, preferred_loaders: List[str]
|
||||
data_item_path: str,
|
||||
preferred_loaders: List[str],
|
||||
loaders_config: dict[LoaderInterface, dict],
|
||||
) -> Tuple[str, LoaderInterface]:
|
||||
if isinstance(data_item_path, str):
|
||||
parsed_url = urlparse(data_item_path)
|
||||
|
|
@@ -77,8 +79,13 @@ async def data_item_to_text_file(
|
|||
|
||||
elif data_item_path.startswith(("http://", "https://")):
|
||||
loader = get_loader_engine()
|
||||
return await loader.load_file(data_item_path, preferred_loaders), loader.get_loader(
|
||||
data_item_path, preferred_loaders
|
||||
return (
|
||||
await loader.load_file(
|
||||
data_item_path,
|
||||
preferred_loaders,
|
||||
loaders_config, # TODO: right now loaders_config is only needed for web_url_loader, so keeping changes minimal
|
||||
),
|
||||
loader.get_loader(data_item_path, preferred_loaders),
|
||||
)
|
||||
# data is not a supported type
|
||||
raise IngestionError(message=f"Data type not supported: {type(data_item_path)}")
|
||||
|
|
|
|||
|
|
@@ -3,6 +3,7 @@ import inspect
|
|||
from uuid import UUID
|
||||
from typing import Union, BinaryIO, Any, List, Optional
|
||||
|
||||
from cognee.infrastructure.loaders import LoaderInterface
|
||||
import cognee.modules.ingestion as ingestion
|
||||
from cognee.infrastructure.databases.relational import get_relational_engine
|
||||
from cognee.modules.data.models import Data
|
||||
|
|
@@ -28,6 +29,7 @@ async def ingest_data(
|
|||
node_set: Optional[List[str]] = None,
|
||||
dataset_id: UUID = None,
|
||||
preferred_loaders: List[str] = None,
|
||||
loaders_config: dict[LoaderInterface, dict] = {},
|
||||
):
|
||||
if not user:
|
||||
user = await get_default_user()
|
||||
|
|
@@ -85,7 +87,9 @@ async def ingest_data(
|
|||
|
||||
# Store all input data as text files in Cognee data storage
|
||||
cognee_storage_file_path, loader_engine = await data_item_to_text_file(
|
||||
actual_file_path, preferred_loaders
|
||||
actual_file_path,
|
||||
preferred_loaders,
|
||||
loaders_config,
|
||||
)
|
||||
|
||||
# Find metadata from original file
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue