Added a description attribute to every schema model

This commit is contained in:
Geoff-Robin 2025-09-14 03:29:38 +05:30 committed by Igor Ilic
parent 17df14363c
commit f5bb91e49d
2 changed files with 37 additions and 21 deletions

View file

@ -1,4 +1,5 @@
from typing import List, Dict
from uuid import uuid5, NAMESPACE_OID
from cognee.infrastructure.engine.models.DataPoint import DataPoint
from cognee.infrastructure.databases.relational.get_migration_relational_engine import get_migration_relational_engine
from sqlalchemy import text
@ -11,7 +12,7 @@ async def ingest_database_schema(
schema_name: str = "default",
max_sample_rows: int = 5,
node_set: List[str] = ["database_schema"]
) -> Dict[str, List[DataPoint]|DataPoint]:
) -> Dict[str, List[DataPoint] | DataPoint]:
"""
Ingest database schema with sample data into dedicated nodeset
@ -25,57 +26,69 @@ async def ingest_database_schema(
List of created DataPoint objects
"""
engine = create_relational_engine(
db_path=database_config.get("db_path", ""),
db_name=database_config.get("db_name", "cognee_db"),
db_host=database_config.get("db_host"),
db_port=database_config.get("db_port"),
db_username=database_config.get("db_username"),
db_password=database_config.get("db_password"),
db_provider=database_config.get("db_provider", "sqlite"),
db_path=database_config.get("migration_db_path", ""),
db_name=database_config.get("migration_db_name", "cognee_db"),
db_host=database_config.get("migration_db_host"),
db_port=database_config.get("migration_db_port"),
db_username=database_config.get("migration_db_username"),
db_password=database_config.get("migration_db_password"),
db_provider=database_config.get("migration_db_provider", "sqlite"),
)
schema = await engine.extract_schema()
tables={}
sample_data={}
tables = {}
sample_data = {}
schema_tables = []
schema_relationships = []
async with engine.engine.begin() as cursor:
for table_name, details in schema.items():
print(table_name)
rows_result = await cursor.execute(text(f"SELECT * FROM {table_name} LIMIT {max_sample_rows}"))
rows = [dict(zip([col["name"] for col in details["columns"]], row)) for row in rows_result.fetchall()]
count_result = await cursor.execute(text(f"SELECT COUNT(*) FROM {table_name};"))
row_count_estimate = count_result.scalar()
schema_table = SchemaTable(
id=uuid5(NAMESPACE_OID, name=table_name),
table_name=table_name,
schema_name=schema_name,
columns=details["columns"],
primary_key=details.get("primary_key"),
foreign_keys=details.get("foreign_keys", []),
sample_rows=rows,
row_count_estimate=row_count_estimate
row_count_estimate=row_count_estimate,
description=f"Schema table for '{table_name}' with {len(details['columns'])} columns and approx. {row_count_estimate} rows."
)
schema_tables.append(schema_table)
tables[table_name] = details
sample_data[table_name] = rows
for fk in details.get("foreign_keys",[]):
for fk in details.get("foreign_keys", []):
print(f"ref_table:{fk['ref_table']}")
print(f"table_name:{table_name}")
relationship = SchemaRelationship(
id=uuid5(NAMESPACE_OID, name=f"{fk['column']}:{table_name}:{fk['ref_column']}:{fk['ref_table']}"),
source_table=table_name,
target_table=fk["ref_table"],
relationship_type=fk["type"],
source_column=fk["source_column"],
target_column=fk["target_column"]
relationship_type="foreign_key",
source_column=fk["column"],
target_column=fk["ref_column"],
description=f"Foreign key relationship: {table_name}.{fk['column']} → {fk['ref_table']}.{fk['ref_column']}"
)
schema_relationships.append(relationship)
database_schema = DatabaseSchema(
id=uuid5(NAMESPACE_OID, name=schema_name),
schema_name=schema_name,
database_type=database_config.get("db_provider", "sqlite"),
tables=tables,
sample_data=sample_data,
extraction_timestamp=datetime.utcnow()
extraction_timestamp=datetime.utcnow(),
description=f"Database schema '{schema_name}' containing {len(schema_tables)} tables and {len(schema_relationships)} relationships."
)
return{
return {
"database_schema": database_schema,
"schema_tables": schema_tables,
"relationships": schema_relationships
}
}

View file

@ -9,6 +9,7 @@ class DatabaseSchema(DataPoint):
tables: Dict[str, Dict] # Reuse existing schema format from SqlAlchemyAdapter
sample_data: Dict[str, List[Dict]] # Limited examples per table
extraction_timestamp: datetime
description: str
metadata: dict = {"index_fields": ["schema_name", "database_type"]}
class SchemaTable(DataPoint):
@ -20,13 +21,15 @@ class SchemaTable(DataPoint):
foreign_keys: List[Dict] # Foreign key relationships
sample_rows: List[Dict] # Max 3-5 example rows
row_count_estimate: Optional[int] # Actual table size
description: str
metadata: dict = {"index_fields": ["table_name", "schema_name"]}
class SchemaRelationship(DataPoint):
"""Represents relationships between tables"""
source_table: str
target_table: str
relationship_type: str # "foreign_key", "one_to_many", etc.
relationship_type: str
source_column: str
target_column: str
description: str
metadata: dict = {"index_fields": ["source_table", "target_table"]}