diff --git a/cognee/modules/ingestion/data_types/BinaryData.py b/cognee/modules/ingestion/data_types/BinaryData.py
index f96e0d65c..2e579932c 100644
--- a/cognee/modules/ingestion/data_types/BinaryData.py
+++ b/cognee/modules/ingestion/data_types/BinaryData.py
@@ -30,9 +30,38 @@ class BinaryData(IngestionData):
     async def ensure_metadata(self):
         if self.metadata is None:
-            self.metadata = await get_file_metadata(self.data)
+            # Handle case where file might be closed
+            if hasattr(self.data, "closed") and self.data.closed:
+                # Try to reopen the file if we have a file path
+                if hasattr(self.data, "name") and self.data.name:
+                    try:
+                        with open(self.data.name, "rb") as reopened_file:
+                            self.metadata = await get_file_metadata(reopened_file)
+                    except (OSError, FileNotFoundError):
+                        # If we can't reopen, create minimal metadata
+                        self.metadata = {
+                            "name": self.name or "unknown",
+                            "file_path": getattr(self.data, "name", "unknown"),
+                            "extension": "txt",
+                            "mime_type": "text/plain",
+                            "content_hash": f"closed_file_{id(self.data)}",
+                            "file_size": 0,
+                        }
+                else:
+                    # Create minimal metadata when file is closed and no path available
+                    self.metadata = {
+                        "name": self.name or "unknown",
+                        "file_path": "unknown",
+                        "extension": "txt",
+                        "mime_type": "text/plain",
+                        "content_hash": f"closed_file_{id(self.data)}",
+                        "file_size": 0,
+                    }
+            else:
+                # File is still open, proceed normally
+                self.metadata = await get_file_metadata(self.data)
 
-        if self.metadata["name"] is None:
+        if self.metadata.get("name") is None:
             self.metadata["name"] = self.name
 
     @asynccontextmanager
diff --git a/cognee/tasks/ingestion/plugin_ingest_data.py b/cognee/tasks/ingestion/plugin_ingest_data.py
index 1b3101a2d..c2a225c78 100644
--- a/cognee/tasks/ingestion/plugin_ingest_data.py
+++ b/cognee/tasks/ingestion/plugin_ingest_data.py
@@ -18,7 +18,7 @@ from cognee.shared.logging_utils import get_logger
 from .save_data_item_to_storage import save_data_item_to_storage
 from .adapters import LoaderToIngestionAdapter
-from cognee.api.v1.add.config import get_s3_config
+from cognee.infrastructure.files.storage.s3_config import get_s3_config
 
 logger = get_logger(__name__)
 
@@ -98,12 +98,11 @@ async def plugin_ingest_data(
     logger.info(f"Plugin-based ingestion starting for dataset: {dataset_name}")
 
     # Preserve existing dataset creation and permission logic
-    user_datasets = await get_specific_user_permission_datasets(user.id, ["write"])
-    existing_datasets = await get_authorized_existing_datasets(user.id, dataset_name, ["write"])
+    existing_datasets = await get_authorized_existing_datasets([dataset_name], "write", user)
 
-    datasets = await load_or_create_datasets(
-        user_datasets, existing_datasets, dataset_name, user, dataset_id
-    )
+    # Use dataset_id if provided, otherwise use dataset_name
+    dataset_names = [dataset_id] if dataset_id else [dataset_name]
+    datasets = await load_or_create_datasets(dataset_names, existing_datasets, user)
 
     dataset = datasets[0]
 
@@ -116,7 +115,7 @@ async def plugin_ingest_data(
     dataset_data_map = {str(data.id): True for data in dataset_data}
 
     for data_item in data:
-        file_path = await save_data_item_to_storage(data_item, dataset_name)
+        file_path = await save_data_item_to_storage(data_item)
 
         # NEW: Use loader system or existing classification based on data type
         try:
@@ -144,6 +143,57 @@ async def plugin_ingest_data(
             data_id = ingestion.identify(classified_data, user)
             file_metadata = classified_data.get_metadata()
 
+            # Ensure metadata has all required fields with fallbacks
+            def get_metadata_field(metadata, field_name, default_value=""):
+                """Get metadata field with fallback handling."""
+                if field_name in metadata and metadata[field_name] is not None:
+                    return metadata[field_name]
+
+                logger.warning(f"Missing metadata field '{field_name}', using fallback")
+
+                # Provide fallbacks based on available information
+                if field_name == "name":
+                    if "file_path" in metadata and metadata["file_path"]:
+                        import os
+
+                        return os.path.basename(str(metadata["file_path"])).split(".")[0]
+                    elif file_path:
+                        import os
+
+                        return os.path.basename(str(file_path)).split(".")[0]
+                    else:
+                        content_hash = metadata.get("content_hash", str(data_id))[:8]
+                        return f"content_{content_hash}"
+                elif field_name == "file_path":
+                    # Use the actual file path returned by save_data_item_to_storage
+                    return file_path
+                elif field_name == "extension":
+                    if "file_path" in metadata and metadata["file_path"]:
+                        import os
+
+                        _, ext = os.path.splitext(str(metadata["file_path"]))
+                        return ext.lstrip(".") if ext else "txt"
+                    elif file_path:
+                        import os
+
+                        _, ext = os.path.splitext(str(file_path))
+                        return ext.lstrip(".") if ext else "txt"
+                    return "txt"
+                elif field_name == "mime_type":
+                    ext = get_metadata_field(metadata, "extension", "txt")
+                    mime_map = {
+                        "txt": "text/plain",
+                        "md": "text/markdown",
+                        "pdf": "application/pdf",
+                        "json": "application/json",
+                        "csv": "text/csv",
+                    }
+                    return mime_map.get(ext.lower(), "text/plain")
+                elif field_name == "content_hash":
+                    return str(data_id)
+
+                return default_value
+
             from sqlalchemy import select
 
             db_engine = get_relational_engine()
@@ -161,12 +211,12 @@ async def plugin_ingest_data(
 
             # Preserve existing data point creation/update logic
             if data_point is not None:
-                data_point.name = file_metadata["name"]
-                data_point.raw_data_location = file_metadata["file_path"]
-                data_point.extension = file_metadata["extension"]
-                data_point.mime_type = file_metadata["mime_type"]
+                data_point.name = get_metadata_field(file_metadata, "name")
+                data_point.raw_data_location = get_metadata_field(file_metadata, "file_path")
+                data_point.extension = get_metadata_field(file_metadata, "extension")
+                data_point.mime_type = get_metadata_field(file_metadata, "mime_type")
                 data_point.owner_id = user.id
-                data_point.content_hash = file_metadata["content_hash"]
+                data_point.content_hash = get_metadata_field(file_metadata, "content_hash")
                 data_point.external_metadata = ext_metadata
                 data_point.node_set = json.dumps(node_set) if node_set else None
 
@@ -181,12 +231,12 @@ async def plugin_ingest_data(
 
                 data_point = Data(
                     id=data_id,
-                    name=file_metadata["name"],
-                    raw_data_location=file_metadata["file_path"],
-                    extension=file_metadata["extension"],
-                    mime_type=file_metadata["mime_type"],
+                    name=get_metadata_field(file_metadata, "name"),
+                    raw_data_location=get_metadata_field(file_metadata, "file_path"),
+                    extension=get_metadata_field(file_metadata, "extension"),
+                    mime_type=get_metadata_field(file_metadata, "mime_type"),
                     owner_id=user.id,
-                    content_hash=file_metadata["content_hash"],
+                    content_hash=get_metadata_field(file_metadata, "content_hash"),
                     external_metadata=ext_metadata,
                     node_set=json.dumps(node_set) if node_set else None,
                     token_count=-1,