added fixes
parent 1cd0fe0dcf
commit 9110a2b59b
2 changed files with 98 additions and 19 deletions
@@ -30,9 +30,38 @@ class BinaryData(IngestionData):
     async def ensure_metadata(self):
         if self.metadata is None:
-            self.metadata = await get_file_metadata(self.data)
+            # Handle case where file might be closed
+            if hasattr(self.data, "closed") and self.data.closed:
+                # Try to reopen the file if we have a file path
+                if hasattr(self.data, "name") and self.data.name:
+                    try:
+                        with open(self.data.name, "rb") as reopened_file:
+                            self.metadata = await get_file_metadata(reopened_file)
+                    except (OSError, FileNotFoundError):
+                        # If we can't reopen, create minimal metadata
+                        self.metadata = {
+                            "name": self.name or "unknown",
+                            "file_path": getattr(self.data, "name", "unknown"),
+                            "extension": "txt",
+                            "mime_type": "text/plain",
+                            "content_hash": f"closed_file_{id(self.data)}",
+                            "file_size": 0,
+                        }
+                else:
+                    # Create minimal metadata when file is closed and no path available
+                    self.metadata = {
+                        "name": self.name or "unknown",
+                        "file_path": "unknown",
+                        "extension": "txt",
+                        "mime_type": "text/plain",
+                        "content_hash": f"closed_file_{id(self.data)}",
+                        "file_size": 0,
+                    }
+            else:
+                # File is still open, proceed normally
+                self.metadata = await get_file_metadata(self.data)

-        if self.metadata["name"] is None:
+        if self.metadata.get("name") is None:
             self.metadata["name"] = self.name

     @asynccontextmanager
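The reopen path above only fires when the closed handle still exposes a usable string `name`; in-memory streams such as `io.BytesIO` fall through to the placeholder dict. A minimal, self-contained sketch of the same reopen-or-fallback pattern — `reopen_or_fallback` is a hypothetical stand-in for the `ensure_metadata`/`get_file_metadata` pair, which live in cognee and are not reproduced here:

import os

def reopen_or_fallback(handle):
    # Mirrors the ensure_metadata change: reopen by path if the handle
    # is closed, otherwise degrade to minimal placeholder metadata.
    if getattr(handle, "closed", False):
        path = getattr(handle, "name", None)
        if isinstance(path, str) and path:
            try:
                with open(path, "rb") as reopened:
                    return {
                        "name": os.path.basename(path),
                        "file_size": os.fstat(reopened.fileno()).st_size,
                    }
            except OSError:  # FileNotFoundError is a subclass of OSError
                pass
        return {"name": "unknown", "file_size": 0, "content_hash": f"closed_file_{id(handle)}"}
    return {"name": getattr(handle, "name", "unknown"), "file_size": 0}

f = open("example.bin", "wb"); f.write(b"data"); f.close()
print(reopen_or_fallback(f))  # reopened by path: {'name': 'example.bin', 'file_size': 4}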
@@ -18,7 +18,7 @@ from cognee.shared.logging_utils import get_logger
 from .save_data_item_to_storage import save_data_item_to_storage
 from .adapters import LoaderToIngestionAdapter
-from cognee.api.v1.add.config import get_s3_config
+from cognee.infrastructure.files.storage.s3_config import get_s3_config


 logger = get_logger(__name__)
@@ -98,12 +98,11 @@ async def plugin_ingest_data(
     logger.info(f"Plugin-based ingestion starting for dataset: {dataset_name}")

     # Preserve existing dataset creation and permission logic
-    user_datasets = await get_specific_user_permission_datasets(user.id, ["write"])
-    existing_datasets = await get_authorized_existing_datasets(user.id, dataset_name, ["write"])
+    existing_datasets = await get_authorized_existing_datasets([dataset_name], "write", user)

-    datasets = await load_or_create_datasets(
-        user_datasets, existing_datasets, dataset_name, user, dataset_id
-    )
+    # Use dataset_id if provided, otherwise use dataset_name
+    dataset_names = [dataset_id] if dataset_id else [dataset_name]
+    datasets = await load_or_create_datasets(dataset_names, existing_datasets, user)

     dataset = datasets[0]
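The new resolution step is easy to state in isolation: an explicit dataset_id takes precedence over the name. A tiny sketch of just that precedence (the cognee permission and load calls are omitted):

def resolve_dataset_names(dataset_name, dataset_id=None):
    # dataset_id wins when provided, matching the hunk above.
    return [dataset_id] if dataset_id else [dataset_name]

assert resolve_dataset_names("docs") == ["docs"]
assert resolve_dataset_names("docs", dataset_id="9f1c2d") == ["9f1c2d"]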
@@ -116,7 +115,7 @@ async def plugin_ingest_data(
     dataset_data_map = {str(data.id): True for data in dataset_data}

     for data_item in data:
-        file_path = await save_data_item_to_storage(data_item, dataset_name)
+        file_path = await save_data_item_to_storage(data_item)

         # NEW: Use loader system or existing classification based on data type
         try:
@@ -144,6 +143,57 @@ async def plugin_ingest_data(
             data_id = ingestion.identify(classified_data, user)
             file_metadata = classified_data.get_metadata()

+            # Ensure metadata has all required fields with fallbacks
+            def get_metadata_field(metadata, field_name, default_value=""):
+                """Get metadata field with fallback handling."""
+                if field_name in metadata and metadata[field_name] is not None:
+                    return metadata[field_name]
+
+                logger.warning(f"Missing metadata field '{field_name}', using fallback")
+
+                # Provide fallbacks based on available information
+                if field_name == "name":
+                    if "file_path" in metadata and metadata["file_path"]:
+                        import os
+
+                        return os.path.basename(str(metadata["file_path"])).split(".")[0]
+                    elif file_path:
+                        import os
+
+                        return os.path.basename(str(file_path)).split(".")[0]
+                    else:
+                        content_hash = metadata.get("content_hash", str(data_id))[:8]
+                        return f"content_{content_hash}"
+                elif field_name == "file_path":
+                    # Use the actual file path returned by save_data_item_to_storage
+                    return file_path
+                elif field_name == "extension":
+                    if "file_path" in metadata and metadata["file_path"]:
+                        import os
+
+                        _, ext = os.path.splitext(str(metadata["file_path"]))
+                        return ext.lstrip(".") if ext else "txt"
+                    elif file_path:
+                        import os
+
+                        _, ext = os.path.splitext(str(file_path))
+                        return ext.lstrip(".") if ext else "txt"
+                    return "txt"
+                elif field_name == "mime_type":
+                    ext = get_metadata_field(metadata, "extension", "txt")
+                    mime_map = {
+                        "txt": "text/plain",
+                        "md": "text/markdown",
+                        "pdf": "application/pdf",
+                        "json": "application/json",
+                        "csv": "text/csv",
+                    }
+                    return mime_map.get(ext.lower(), "text/plain")
+                elif field_name == "content_hash":
+                    return str(data_id)
+
+                return default_value
+
             from sqlalchemy import select

             db_engine = get_relational_engine()
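get_metadata_field closes over file_path and data_id from the enclosing loop; turning those closure variables into parameters makes the fallback chain easy to exercise on its own. A condensed sketch (the sample metadata dict and ids are invented for illustration):

import os

def field_with_fallback(metadata, field_name, file_path, data_id, default=""):
    # Same precedence as the closure above: a present, non-None value wins.
    if metadata.get(field_name) is not None:
        return metadata[field_name]
    if field_name == "name" and file_path:
        return os.path.basename(str(file_path)).split(".")[0]
    if field_name == "extension" and file_path:
        ext = os.path.splitext(str(file_path))[1]
        return ext.lstrip(".") if ext else "txt"
    if field_name == "content_hash":
        return str(data_id)
    return default

meta = {"name": None, "content_hash": "abc123"}
print(field_with_fallback(meta, "name", "/tmp/report.pdf", "id-1"))          # report
print(field_with_fallback(meta, "extension", "/tmp/report.pdf", "id-1"))     # pdf
print(field_with_fallback(meta, "content_hash", "/tmp/report.pdf", "id-1"))  # abc123

Note that the name fallback's split(".")[0] keeps only the text before the first dot, so a file named archive.tar.gz yields the name "archive".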
@@ -161,12 +211,12 @@ async def plugin_ingest_data(

             # Preserve existing data point creation/update logic
             if data_point is not None:
-                data_point.name = file_metadata["name"]
-                data_point.raw_data_location = file_metadata["file_path"]
-                data_point.extension = file_metadata["extension"]
-                data_point.mime_type = file_metadata["mime_type"]
+                data_point.name = get_metadata_field(file_metadata, "name")
+                data_point.raw_data_location = get_metadata_field(file_metadata, "file_path")
+                data_point.extension = get_metadata_field(file_metadata, "extension")
+                data_point.mime_type = get_metadata_field(file_metadata, "mime_type")
                 data_point.owner_id = user.id
-                data_point.content_hash = file_metadata["content_hash"]
+                data_point.content_hash = get_metadata_field(file_metadata, "content_hash")
                 data_point.external_metadata = ext_metadata
                 data_point.node_set = json.dumps(node_set) if node_set else None

@@ -181,12 +231,12 @@ async def plugin_ingest_data(

             data_point = Data(
                 id=data_id,
-                name=file_metadata["name"],
-                raw_data_location=file_metadata["file_path"],
-                extension=file_metadata["extension"],
-                mime_type=file_metadata["mime_type"],
+                name=get_metadata_field(file_metadata, "name"),
+                raw_data_location=get_metadata_field(file_metadata, "file_path"),
+                extension=get_metadata_field(file_metadata, "extension"),
+                mime_type=get_metadata_field(file_metadata, "mime_type"),
                 owner_id=user.id,
-                content_hash=file_metadata["content_hash"],
+                content_hash=get_metadata_field(file_metadata, "content_hash"),
                 external_metadata=ext_metadata,
                 node_set=json.dumps(node_set) if node_set else None,
                 token_count=-1,
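The update branch and the Data(...) constructor repeat the same five get_metadata_field calls; one way to keep the two sites in lockstep (a possible follow-up, not part of this commit) is to build the shared kwargs once:

def metadata_kwargs(file_metadata, get_metadata_field):
    # One source of truth for the five fields set in both hunks above.
    return {
        "name": get_metadata_field(file_metadata, "name"),
        "raw_data_location": get_metadata_field(file_metadata, "file_path"),
        "extension": get_metadata_field(file_metadata, "extension"),
        "mime_type": get_metadata_field(file_metadata, "mime_type"),
        "content_hash": get_metadata_field(file_metadata, "content_hash"),
    }

# Update branch: for key, value in metadata_kwargs(...).items(): setattr(data_point, key, value)
# Create branch: Data(id=data_id, owner_id=user.id, token_count=-1, **metadata_kwargs(...))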