diff --git a/cognee/modules/visualization/cognee_network_visualization.py b/cognee/modules/visualization/cognee_network_visualization.py index bbdbc0019..c735e70f1 100644 --- a/cognee/modules/visualization/cognee_network_visualization.py +++ b/cognee/modules/visualization/cognee_network_visualization.py @@ -23,6 +23,9 @@ async def cognee_network_visualization(graph_data, destination_file_path: str = "TableRow": "#f47710", "TableType": "#6510f4", "ColumnValue": "#13613a", + "SchemaTable": "#f47710", + "DatabaseSchema": "#6510f4", + "SchemaRelationship": "#13613a", "default": "#D3D3D3", } diff --git a/cognee/tasks/ingestion/migrate_relational_database.py b/cognee/tasks/ingestion/migrate_relational_database.py index 83ad452c3..53ce176e8 100644 --- a/cognee/tasks/ingestion/migrate_relational_database.py +++ b/cognee/tasks/ingestion/migrate_relational_database.py @@ -81,7 +81,6 @@ async def schema_only_ingestion(schema): # Calling the ingest_database_schema function to return DataPoint subclasses result = await ingest_database_schema( schema=schema, - schema_name="migrated_schema", max_sample_rows=5, ) database_schema = result["database_schema"] @@ -105,7 +104,7 @@ async def schema_only_ingestion(schema): ), ) ) - table_name_to_id = {t.table_name: t.id for t in schema_tables} + table_name_to_id = {t.name: t.id for t in schema_tables} for rel in schema_relationships: source_table_id = table_name_to_id.get(rel.source_table) target_table_id = table_name_to_id.get(rel.target_table) diff --git a/cognee/tasks/schema/ingest_database_schema.py b/cognee/tasks/schema/ingest_database_schema.py index e89b679d2..e3823701c 100644 --- a/cognee/tasks/schema/ingest_database_schema.py +++ b/cognee/tasks/schema/ingest_database_schema.py @@ -12,15 +12,13 @@ from datetime import datetime, timezone async def ingest_database_schema( schema, - schema_name: str = "default", max_sample_rows: int = 0, ) -> Dict[str, List[DataPoint] | DataPoint]: """ Extract database schema metadata (optionally 
with sample data) and return DataPoint models for graph construction. Args: - database_config: Database connection configuration - schema_name: Name identifier for this schema + schema: Database schema max_sample_rows: Maximum sample rows per table (0 means no sampling) Returns: @@ -35,6 +33,7 @@ async def ingest_database_schema( schema_tables = [] schema_relationships = [] + migration_config = get_migration_config() engine = get_migration_relational_engine() qi = engine.engine.dialect.identifier_preparer.quote try: @@ -78,15 +77,17 @@ async def ingest_database_schema( row_count_estimate = count_result.scalar() schema_table = SchemaTable( - id=uuid5(NAMESPACE_OID, name=f"{schema_name}:{table_name}"), - table_name=table_name, - schema_name=schema_name, + id=uuid5(NAMESPACE_OID, name=f"{table_name}"), + name=table_name, columns=details["columns"], primary_key=details.get("primary_key"), foreign_keys=details.get("foreign_keys", []), sample_rows=rows, row_count_estimate=row_count_estimate, - description=f"Schema table for '{table_name}' with {len(details['columns'])} columns and approx. {row_count_estimate} rows.", + description=f"Relational database table '{table_name}' with {len(details['columns'])} columns and approx. {row_count_estimate} rows. " + f"Here are the columns this table contains: {details['columns']}. " + f"Here are a few sample_rows to show the contents of the table: {rows}. " + f"Table is part of the database: {migration_config.migration_db_name}", ) schema_tables.append(schema_table) tables[table_name] = details @@ -97,30 +98,33 @@ async def ingest_database_schema( if "." not in ref_table_fq and "." 
in table_name: ref_table_fq = f"{table_name.split('.', 1)[0]}.{ref_table_fq}" + relationship_name = ( + f"{table_name}:{fk['column']}->{ref_table_fq}:{fk['ref_column']}" + ) relationship = SchemaRelationship( - id=uuid5( - NAMESPACE_OID, - name=f"{schema_name}:{table_name}:{fk['column']}->{ref_table_fq}:{fk['ref_column']}", - ), + id=uuid5(NAMESPACE_OID, name=relationship_name), + name=relationship_name, source_table=table_name, target_table=ref_table_fq, relationship_type="foreign_key", source_column=fk["column"], target_column=fk["ref_column"], - description=f"Foreign key relationship: {table_name}.{fk['column']} → {ref_table_fq}.{fk['ref_column']}", + description=f"Relational database table foreign key relationship between: {table_name}.{fk['column']} → {ref_table_fq}.{fk['ref_column']}. " + f"This foreign key relationship between table columns is a part of the following database: {migration_config.migration_db_name}", ) schema_relationships.append(relationship) - migration_config = get_migration_config() - id_str = f"{migration_config.migration_db_provider}:{migration_config.migration_db_name}:{schema_name}" + id_str = f"{migration_config.migration_db_provider}:{migration_config.migration_db_name}" database_schema = DatabaseSchema( id=uuid5(NAMESPACE_OID, name=id_str), - schema_name=schema_name, + name=migration_config.migration_db_name, database_type=migration_config.migration_db_provider, tables=tables, sample_data=sample_data, extraction_timestamp=datetime.now(timezone.utc), - description=f"Database schema '{schema_name}' containing {len(schema_tables)} tables and {len(schema_relationships)} relationships.", + description=f"Database schema containing {len(schema_tables)} tables and {len(schema_relationships)} relationships. " + f"The database type is {migration_config.migration_db_provider}. " 
+ f"The database contains the following tables: {tables}", ) return { diff --git a/cognee/tasks/schema/models.py b/cognee/tasks/schema/models.py index 423c92050..4b13f420b 100644 --- a/cognee/tasks/schema/models.py +++ b/cognee/tasks/schema/models.py @@ -6,36 +6,36 @@ from datetime import datetime class DatabaseSchema(DataPoint): """Represents a complete database schema with sample data""" - schema_name: str + name: str database_type: str # sqlite, postgres, etc. tables: Dict[str, Dict] # Reuse existing schema format from SqlAlchemyAdapter sample_data: Dict[str, List[Dict]] # Limited examples per table extraction_timestamp: datetime description: str - metadata: dict = {"index_fields": ["schema_name", "database_type"]} + metadata: dict = {"index_fields": ["description", "name"]} class SchemaTable(DataPoint): """Represents an individual table schema with relationships""" - table_name: str - schema_name: str + name: str columns: List[Dict] # Column definitions with types primary_key: Optional[str] foreign_keys: List[Dict] # Foreign key relationships sample_rows: List[Dict] # Max 3-5 example rows row_count_estimate: Optional[int] # Actual table size description: str - metadata: dict = {"index_fields": ["table_name", "schema_name"]} + metadata: dict = {"index_fields": ["description", "name"]} class SchemaRelationship(DataPoint): """Represents relationships between tables""" + name: str source_table: str target_table: str relationship_type: str # "foreign_key", "one_to_many", etc. source_column: str target_column: str description: str - metadata: dict = {"index_fields": ["source_table", "target_table"]} + metadata: dict = {"index_fields": ["description", "name"]}