From f5bb91e49df908c2eed636c539196aa2c4cba4ca Mon Sep 17 00:00:00 2001 From: Geoff-Robin Date: Sun, 14 Sep 2025 03:29:38 +0530 Subject: [PATCH] added description attribute to every schema model --- cognee/tasks/schema/ingest_database_schema.py | 53 ++++++++++++------- cognee/tasks/schema/models.py | 5 +- 2 files changed, 37 insertions(+), 21 deletions(-) diff --git a/cognee/tasks/schema/ingest_database_schema.py b/cognee/tasks/schema/ingest_database_schema.py index 2b9cd38c5..6d6f0b5f3 100644 --- a/cognee/tasks/schema/ingest_database_schema.py +++ b/cognee/tasks/schema/ingest_database_schema.py @@ -1,4 +1,5 @@ from typing import List, Dict +from uuid import uuid5, NAMESPACE_OID from cognee.infrastructure.engine.models.DataPoint import DataPoint from cognee.infrastructure.databases.relational.get_migration_relational_engine import get_migration_relational_engine from sqlalchemy import text @@ -11,7 +12,7 @@ async def ingest_database_schema( schema_name: str = "default", max_sample_rows: int = 5, node_set: List[str] = ["database_schema"] -) -> Dict[str, List[DataPoint]|DataPoint]: +) -> Dict[str, List[DataPoint] | DataPoint]: """ Ingest database schema with sample data into dedicated nodeset @@ -25,57 +26,69 @@ async def ingest_database_schema( List of created DataPoint objects """ engine = create_relational_engine( - db_path=database_config.get("db_path", ""), - db_name=database_config.get("db_name", "cognee_db"), - db_host=database_config.get("db_host"), - db_port=database_config.get("db_port"), - db_username=database_config.get("db_username"), - db_password=database_config.get("db_password"), - db_provider=database_config.get("db_provider", "sqlite"), + db_path=database_config.get("migration_db_path", ""), + db_name=database_config.get("migration_db_name", "cognee_db"), + db_host=database_config.get("migration_db_host"), + db_port=database_config.get("migration_db_port"), + db_username=database_config.get("migration_db_username"), + db_password=database_config.get("migration_db_password"), + db_provider=database_config.get("migration_db_provider", "sqlite"), ) schema = await engine.extract_schema() - tables={} - sample_data={} + tables = {} + sample_data = {} schema_tables = [] schema_relationships = [] + async with engine.engine.begin() as cursor: for table_name, details in schema.items(): + print(table_name) rows_result = await cursor.execute(text(f"SELECT * FROM {table_name} LIMIT {max_sample_rows}")) rows = [dict(zip([col["name"] for col in details["columns"]], row)) for row in rows_result.fetchall()] count_result = await cursor.execute(text(f"SELECT COUNT(*) FROM {table_name};")) row_count_estimate = count_result.scalar() + schema_table = SchemaTable( + id=uuid5(NAMESPACE_OID, name=table_name), table_name=table_name, schema_name=schema_name, columns=details["columns"], primary_key=details.get("primary_key"), foreign_keys=details.get("foreign_keys", []), sample_rows=rows, - row_count_estimate=row_count_estimate + row_count_estimate=row_count_estimate, + description=f"Schema table for '{table_name}' with {len(details['columns'])} columns and approx. {row_count_estimate} rows." ) schema_tables.append(schema_table) tables[table_name] = details sample_data[table_name] = rows - - for fk in details.get("foreign_keys",[]): + + for fk in details.get("foreign_keys", []): + print(f"ref_table:{fk['ref_table']}") + print(f"table_name:{table_name}") relationship = SchemaRelationship( + id=uuid5(NAMESPACE_OID, name=f"{fk['column']}:{table_name}:{fk['ref_column']}:{fk['ref_table']}"), source_table=table_name, target_table=fk["ref_table"], - relationship_type=fk["type"], - source_column=fk["source_column"], - target_column=fk["target_column"] + relationship_type="foreign_key", + source_column=fk["column"], + target_column=fk["ref_column"], + description=f"Foreign key relationship: {table_name}.{fk['column']} → {fk['ref_table']}.{fk['ref_column']}" ) schema_relationships.append(relationship) + database_schema = DatabaseSchema( + id=uuid5(NAMESPACE_OID, name=schema_name), schema_name=schema_name, database_type=database_config.get("db_provider", "sqlite"), tables=tables, sample_data=sample_data, - extraction_timestamp=datetime.utcnow() + extraction_timestamp=datetime.utcnow(), + description=f"Database schema '{schema_name}' containing {len(schema_tables)} tables and {len(schema_relationships)} relationships." ) - - return{ + + return { "database_schema": database_schema, "schema_tables": schema_tables, "relationships": schema_relationships - } \ No newline at end of file + } diff --git a/cognee/tasks/schema/models.py b/cognee/tasks/schema/models.py index b38ec5ff5..ef9374163 100644 --- a/cognee/tasks/schema/models.py +++ b/cognee/tasks/schema/models.py @@ -9,6 +9,7 @@ class DatabaseSchema(DataPoint): tables: Dict[str, Dict] # Reuse existing schema format from SqlAlchemyAdapter sample_data: Dict[str, List[Dict]] # Limited examples per table extraction_timestamp: datetime + description: str metadata: dict = {"index_fields": ["schema_name", "database_type"]} class SchemaTable(DataPoint): @@ -20,13 +21,15 @@ class SchemaTable(DataPoint): foreign_keys: List[Dict] # Foreign key relationships sample_rows: List[Dict] # Max 3-5 example rows row_count_estimate: Optional[int] # Actual table size + description: str metadata: dict = {"index_fields": ["table_name", "schema_name"]} class SchemaRelationship(DataPoint): """Represents relationships between tables""" source_table: str target_table: str - relationship_type: str # "foreign_key", "one_to_many", etc. + relationship_type: str source_column: str target_column: str + description: str metadata: dict = {"index_fields": ["source_table", "target_table"]} \ No newline at end of file