Add versioning to the data point model

This commit is contained in:
vasilije 2024-12-17 19:57:47 +01:00
parent 45cb2c3289
commit 6fb3b4abec

View file

@ -1,45 +1,64 @@
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Optional from typing import Optional, Any, Dict
from uuid import UUID, uuid4 from uuid import UUID, uuid4
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from typing_extensions import TypedDict from typing_extensions import TypedDict
# Define metadata type
class MetaData(TypedDict):
    """Typed shape of the metadata dict attached to a data point."""

    # Attribute names that the DataPoint embeddable-* helpers look up on the
    # data point; the first entry is the one used by get_embeddable_data.
    index_fields: list[str]
# Updated DataPoint model with versioning and new fields
class DataPoint(BaseModel):
    """Versioned base model for a single data point.

    Besides its identity and timestamps, a data point carries a ``version``
    string, an optional ``source`` and ``type``, free-form ``extra``
    properties, and private ``_metadata`` whose ``index_fields`` list names
    the attributes exposed by the ``get_embeddable_*`` helpers.
    """

    __tablename__ = "data_point"

    id: UUID = Field(default_factory=uuid4)
    # Timestamps are integer milliseconds since the Unix epoch, taken in UTC.
    created_at: int = Field(
        default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)
    )
    updated_at: int = Field(
        default_factory=lambda: int(datetime.now(timezone.utc).timestamp() * 1000)
    )
    version: str = "0.1"  # Default version
    source: Optional[str] = None  # Path to file, URL, etc.
    type: Optional[str] = "text"  # "text", "file", "image", "video"
    topological_rank: Optional[int] = 0
    extra: Optional[Dict[str, Any]] = None  # For additional properties
    # Private attribute, so it takes a plain default: Field() only configures
    # public fields and would not supply this dict as the attribute's value.
    # NOTE(review): pydantic is expected to copy private defaults per
    # instance -- confirm for the pinned pydantic version.
    _metadata: Optional[MetaData] = {"index_fields": [], "type": "DataPoint"}

    # Override the Pydantic configuration
    # NOTE(review): this key is a pydantic v1 setting; confirm whether it is
    # still needed with the pydantic version the project pins.
    class Config:
        underscore_attrs_are_private = True

    @classmethod
    def get_embeddable_data(cls, data_point):
        """Return the value of the first index field of ``data_point``.

        String values are returned stripped of surrounding whitespace; any
        other value is returned unchanged.  Returns ``None`` when the data
        point has no metadata, no index fields, or lacks the named attribute.
        """
        metadata = data_point._metadata
        if not metadata or not metadata["index_fields"]:
            return None

        first_field = metadata["index_fields"][0]
        if not hasattr(data_point, first_field):
            return None

        attribute = getattr(data_point, first_field)
        return attribute.strip() if isinstance(attribute, str) else attribute

    @classmethod
    def get_embeddable_properties(cls, data_point):
        """Return the values of all index fields of ``data_point``.

        Attributes missing from the data point yield ``None`` in the result.
        Returns an empty list when there is no metadata or no index fields.
        """
        metadata = data_point._metadata
        if not metadata or not metadata["index_fields"]:
            return []
        return [getattr(data_point, name, None) for name in metadata["index_fields"]]

    @classmethod
    def get_embeddable_property_names(cls, data_point):
        """Return the index field names of ``data_point``.

        Returns an empty list when the metadata is unset or empty, matching
        the guards used by the other ``get_embeddable_*`` helpers.
        """
        metadata = data_point._metadata
        if not metadata:
            return []
        return metadata["index_fields"] or []

    def update_version(self, new_version: str) -> None:
        """Set ``version`` to ``new_version`` and refresh ``updated_at``.

        ``updated_at`` is set to the current UTC time in integer milliseconds
        since the Unix epoch.
        """
        self.version = new_version
        self.updated_at = int(datetime.now(timezone.utc).timestamp() * 1000)