refactor: add visualization to schema migration

This commit is contained in:
Igor Ilic 2025-09-27 01:15:30 +02:00
parent f93d30ae77
commit 17fb3b49ef
4 changed files with 30 additions and 24 deletions

View file

@ -23,6 +23,9 @@ async def cognee_network_visualization(graph_data, destination_file_path: str =
"TableRow": "#f47710",
"TableType": "#6510f4",
"ColumnValue": "#13613a",
"SchemaTable": "#f47710",
"DatabaseSchema": "#6510f4",
"SchemaRelationship": "#13613a",
"default": "#D3D3D3",
}

View file

@ -81,7 +81,6 @@ async def schema_only_ingestion(schema):
# Calling the ingest_database_schema function to return DataPoint subclasses
result = await ingest_database_schema(
schema=schema,
schema_name="migrated_schema",
max_sample_rows=5,
)
database_schema = result["database_schema"]
@ -105,7 +104,7 @@ async def schema_only_ingestion(schema):
),
)
)
table_name_to_id = {t.table_name: t.id for t in schema_tables}
table_name_to_id = {t.name: t.id for t in schema_tables}
for rel in schema_relationships:
source_table_id = table_name_to_id.get(rel.source_table)
target_table_id = table_name_to_id.get(rel.target_table)

View file

@ -12,15 +12,13 @@ from datetime import datetime, timezone
async def ingest_database_schema(
schema,
schema_name: str = "default",
max_sample_rows: int = 0,
) -> Dict[str, List[DataPoint] | DataPoint]:
"""
Extract database schema metadata (optionally with sample data) and return DataPoint models for graph construction.
Args:
database_config: Database connection configuration
schema_name: Name identifier for this schema
schema: Database schema
max_sample_rows: Maximum sample rows per table (0 means no sampling)
Returns:
@ -35,6 +33,7 @@ async def ingest_database_schema(
schema_tables = []
schema_relationships = []
migration_config = get_migration_config()
engine = get_migration_relational_engine()
qi = engine.engine.dialect.identifier_preparer.quote
try:
@ -78,15 +77,17 @@ async def ingest_database_schema(
row_count_estimate = count_result.scalar()
schema_table = SchemaTable(
id=uuid5(NAMESPACE_OID, name=f"{schema_name}:{table_name}"),
table_name=table_name,
schema_name=schema_name,
id=uuid5(NAMESPACE_OID, name=f"{table_name}"),
name=table_name,
columns=details["columns"],
primary_key=details.get("primary_key"),
foreign_keys=details.get("foreign_keys", []),
sample_rows=rows,
row_count_estimate=row_count_estimate,
description=f"Schema table for '{table_name}' with {len(details['columns'])} columns and approx. {row_count_estimate} rows.",
description=f"Relational database table with '{table_name}' with {len(details['columns'])} columns and approx. {row_count_estimate} rows."
f"Here are the columns this table contains: {details['columns']}"
f"Here are a few sample_rows to show the contents of the table: {rows}"
f"Table is part of the database: {migration_config.migration_db_name}",
)
schema_tables.append(schema_table)
tables[table_name] = details
@ -97,30 +98,33 @@ async def ingest_database_schema(
if "." not in ref_table_fq and "." in table_name:
ref_table_fq = f"{table_name.split('.', 1)[0]}.{ref_table_fq}"
relationship_name = (
f"{table_name}:{fk['column']}->{ref_table_fq}:{fk['ref_column']}"
)
relationship = SchemaRelationship(
id=uuid5(
NAMESPACE_OID,
name=f"{schema_name}:{table_name}:{fk['column']}->{ref_table_fq}:{fk['ref_column']}",
),
id=uuid5(NAMESPACE_OID, name=relationship_name),
name=relationship_name,
source_table=table_name,
target_table=ref_table_fq,
relationship_type="foreign_key",
source_column=fk["column"],
target_column=fk["ref_column"],
description=f"Foreign key relationship: {table_name}.{fk['column']}{ref_table_fq}.{fk['ref_column']}",
description=f"Relational database table foreign key relationship between: {table_name}.{fk['column']}{ref_table_fq}.{fk['ref_column']}"
f"This foreing key relationship between table columns is a part of the following database: {migration_config.migration_db_name}",
)
schema_relationships.append(relationship)
migration_config = get_migration_config()
id_str = f"{migration_config.migration_db_provider}:{migration_config.migration_db_name}:{schema_name}"
id_str = f"{migration_config.migration_db_provider}:{migration_config.migration_db_name}"
database_schema = DatabaseSchema(
id=uuid5(NAMESPACE_OID, name=id_str),
schema_name=schema_name,
name=migration_config.migration_db_name,
database_type=migration_config.migration_db_provider,
tables=tables,
sample_data=sample_data,
extraction_timestamp=datetime.now(timezone.utc),
description=f"Database schema '{schema_name}' containing {len(schema_tables)} tables and {len(schema_relationships)} relationships.",
description=f"Database schema containing {len(schema_tables)} tables and {len(schema_relationships)} relationships. "
f"The database type is {migration_config.migration_db_provider}."
f"The database contains the following tables: {tables}",
)
return {

View file

@ -6,36 +6,36 @@ from datetime import datetime
class DatabaseSchema(DataPoint):
"""Represents a complete database schema with sample data"""
schema_name: str
name: str
database_type: str # sqlite, postgres, etc.
tables: Dict[str, Dict] # Reuse existing schema format from SqlAlchemyAdapter
sample_data: Dict[str, List[Dict]] # Limited examples per table
extraction_timestamp: datetime
description: str
metadata: dict = {"index_fields": ["schema_name", "database_type"]}
metadata: dict = {"index_fields": ["description", "name"]}
class SchemaTable(DataPoint):
"""Represents an individual table schema with relationships"""
table_name: str
schema_name: str
name: str
columns: List[Dict] # Column definitions with types
primary_key: Optional[str]
foreign_keys: List[Dict] # Foreign key relationships
sample_rows: List[Dict] # Max 3-5 example rows
row_count_estimate: Optional[int] # Actual table size
description: str
metadata: dict = {"index_fields": ["table_name", "schema_name"]}
metadata: dict = {"index_fields": ["description", "name"]}
class SchemaRelationship(DataPoint):
"""Represents relationships between tables"""
name: str
source_table: str
target_table: str
relationship_type: str # "foreign_key", "one_to_many", etc.
source_column: str
target_column: str
description: str
metadata: dict = {"index_fields": ["source_table", "target_table"]}
metadata: dict = {"index_fields": ["description", "name"]}