refactor: remove code- and repository-related tasks

This commit is contained in:
Andrej Milicevic 2025-11-05 13:02:56 +01:00
parent c481b87d58
commit 18e4bb48fd
7 changed files with 0 additions and 828 deletions

View file

@ -1,35 +0,0 @@
import os
import asyncio
import argparse
from cognee.tasks.repo_processor.get_repo_file_dependencies import get_repo_file_dependencies
from cognee.tasks.repo_processor.enrich_dependency_graph import enrich_dependency_graph
def main():
    """Build and print the enriched dependency graph for a repository.

    Parses one positional command-line argument (the repository path),
    validates that the path exists, builds the file-dependency graph,
    enriches it, and prints every node with its outgoing edges. If the
    path does not exist, an error message is printed and nothing else runs.
    """
    argument_parser = argparse.ArgumentParser()
    argument_parser.add_argument("repo_path", help="Path to the repository")
    repo_path = argument_parser.parse_args().repo_path

    if not os.path.exists(repo_path):
        print(f"Error: The provided repository path does not exist: {repo_path}")
        return

    dependency_graph = asyncio.run(get_repo_file_dependencies(repo_path))
    dependency_graph = asyncio.run(enrich_dependency_graph(dependency_graph))

    for node in dependency_graph.nodes:
        print(f"Node: {node}")
        for _, target, data in dependency_graph.out_edges(node, data=True):
            print(f" Edge to {target}, data: {data}")


if __name__ == "__main__":
    main()

View file

@ -1,20 +0,0 @@
import argparse
import asyncio
from cognee.tasks.repo_processor.get_local_dependencies import get_local_script_dependencies
if __name__ == "__main__":
    # CLI entry point: resolve a single script's local dependencies and
    # print them one per line.
    parser = argparse.ArgumentParser(description="Get local script dependencies.")
    # Suggested path: .../cognee/examples/python/simple_example.py
    parser.add_argument("script_path", type=str, help="Absolute path to the Python script file")
    # Suggested path: .../cognee
    parser.add_argument("repo_path", type=str, help="Absolute path to the repository root")
    cli_args = parser.parse_args()

    script_dependencies = asyncio.run(
        get_local_script_dependencies(cli_args.script_path, cli_args.repo_path)
    )

    print("Dependencies:")
    for script_dependency in script_dependencies:
        print(script_dependency)

View file

@ -1,35 +0,0 @@
import os
import asyncio
import argparse
from cognee.tasks.repo_processor.get_repo_file_dependencies import get_repo_file_dependencies
def main():
    """Print the file-dependency graph for a repository given on the CLI.

    Reads one positional argument (the repository path). If the path doesn't
    exist, an error message is printed and the function returns; otherwise
    the dependency graph is computed and each node is printed together with
    its edges and their 'relation' data.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("repo_path", help="Path to the repository")
    repo_path = cli.parse_args().repo_path

    if not os.path.exists(repo_path):
        print(f"Error: The provided repository path does not exist: {repo_path}")
        return

    dependency_graph = asyncio.run(get_repo_file_dependencies(repo_path))
    for node in dependency_graph.nodes:
        print(f"Node: {node}")
        for _, target, data in dependency_graph.edges(node, data=True):
            print(f" Edge to {target}, Relation: {data.get('relation')}")


if __name__ == "__main__":
    main()

View file

@ -1,2 +0,0 @@
from .get_non_code_files import get_non_py_files
from .get_repo_file_dependencies import get_repo_file_dependencies

View file

@ -1,335 +0,0 @@
import os
import aiofiles
import importlib
from typing import AsyncGenerator, Optional
from uuid import NAMESPACE_OID, uuid5
import tree_sitter_python as tspython
from tree_sitter import Language, Node, Parser, Tree
from cognee.shared.logging_utils import get_logger
from cognee.low_level import DataPoint
from cognee.shared.CodeGraphEntities import (
CodeFile,
ImportStatement,
FunctionDefinition,
ClassDefinition,
)
logger = get_logger()
class FileParser:
    """Parses Python files into (source code, syntax tree) pairs.

    Results are cached per file path, so each file is read and parsed at
    most once per parser instance.

    Public methods:
    - parse_file: Parses a file and returns its source code and syntax tree
      representation.
    """

    def __init__(self):
        # Cache: file_path -> (source_code, Tree)
        self.parsed_files = {}

    async def parse_file(self, file_path: str) -> tuple[str, Tree]:
        """
        Parse a file and return its source code along with its syntax tree representation.

        If the file has already been parsed, the cached result is returned
        without reading or parsing the file again.

        Parameters:
        -----------
        - file_path (str): The path of the file to parse.

        Returns:
        --------
        - tuple[str, Tree]: A tuple containing the source code of the file and its
          corresponding syntax tree representation.
        """
        if file_path not in self.parsed_files:
            # Build the grammar and parser only on a cache miss — the
            # original constructed them on every call, including cache hits.
            python_language = Language(tspython.language())
            source_code_parser = Parser(python_language)
            source_code = await get_source_code(file_path)
            # NOTE(review): get_source_code returns None on read errors,
            # which makes bytes() raise TypeError here — confirm callers
            # expect that failure mode.
            source_code_tree = source_code_parser.parse(bytes(source_code, "utf-8"))
            self.parsed_files[file_path] = (source_code, source_code_tree)
        return self.parsed_files[file_path]
async def get_source_code(file_path: str):
    """Asynchronously read and return the text contents of a file.

    Opens the file at *file_path* as UTF-8 text and returns its contents.
    Any failure (missing file, permission error, decode error, ...) is
    logged and reported as None rather than raised.

    Parameters:
    -----------
    - file_path (str): The path to the file from which to read the source code.

    Returns:
    --------
    The file contents as a string on success, otherwise None.
    """
    try:
        async with aiofiles.open(file_path, "r", encoding="utf-8") as file_handle:
            return await file_handle.read()
    except Exception as error:
        # Best-effort: swallow the failure but leave a trace in the log.
        logger.error(f"Error reading file {file_path}: {str(error)}")
        return None
def resolve_module_path(module_name):
    """Find the file path of a module.

    Return the file path of the specified module if found, or None if the
    module does not exist or cannot be located.

    Parameters:
    -----------
    - module_name: The name of the module whose file path is to be resolved.

    Returns:
    --------
    The file path of the module as a string, or None if the module is not found.
    """
    # importlib.util is a submodule: it must be imported explicitly —
    # `import importlib` alone does not guarantee the attribute exists.
    import importlib.util

    try:
        spec = importlib.util.find_spec(module_name)
    except (ModuleNotFoundError, ValueError):
        # find_spec raises ModuleNotFoundError when a parent package is
        # missing, and ValueError for modules whose __spec__ is None
        # (e.g. __main__); treat both as "not found".
        return None
    if spec and spec.origin:
        return spec.origin
    return None
def find_function_location(
    module_path: str, function_name: str, parser: FileParser
) -> Optional[tuple[str, str]]:
    """Find the location of a function definition in a specified module.

    Parameters:
    -----------
    - module_path (str): The path to the module where the function is defined.
    - function_name (str): The name of the function whose location is to be found.
    - parser (FileParser): An instance of FileParser used to parse the module's
      source code.

    Returns:
    --------
    - Optional[tuple[str, str]]: A tuple of (module_path, start_point) for the
      function if found; otherwise None.
    """
    import asyncio

    if not module_path or not os.path.exists(module_path):
        return None
    # BUG FIX: parse_file is a coroutine; the original called it without
    # awaiting it, so unpacking the result always failed. Drive it to
    # completion here. NOTE(review): this helper is synchronous and must not
    # be invoked from inside a running event loop.
    source_code, tree = asyncio.run(parser.parse_file(module_path))
    root_node: Node = tree.root_node
    for node in root_node.children:
        if node.type == "function_definition":
            func_name_node = node.child_by_field_name("name")
            if func_name_node and func_name_node.text.decode() == function_name:
                return (module_path, node.start_point)  # (line, column)
    return None
async def get_local_script_dependencies(
    repo_path: str, script_path: str, detailed_extraction: bool = False
) -> CodeFile:
    """Build a CodeFile node for a script, optionally with detailed parts.

    Parameters:
    -----------
    - repo_path (str): The path to the repository that contains the script.
    - script_path (str): The path of the script for which dependencies are
      being extracted.
    - detailed_extraction (bool): When True, the source is decomposed into
      import/function/class nodes attached to the CodeFile; when False, the
      raw source code is stored on the CodeFile instead.

    Returns:
    --------
    - CodeFile: The node describing the script, its dependencies and
      definitions.
    """
    file_parser = FileParser()
    source_code, syntax_tree = await file_parser.parse_file(script_path)
    relative_name = script_path[len(repo_path) + 1 :]

    if not detailed_extraction:
        # Shallow mode: a single node carrying the whole source text.
        return CodeFile(
            id=uuid5(NAMESPACE_OID, script_path),
            name=relative_name,
            source_code=source_code,
            file_path=script_path,
            language="python",
        )

    code_file = CodeFile(
        id=uuid5(NAMESPACE_OID, script_path),
        name=relative_name,
        source_code=None,
        file_path=script_path,
        language="python",
    )
    # Detailed mode: attach each extracted part to the matching collection.
    async for code_part in extract_code_parts(syntax_tree.root_node, script_path=script_path):
        code_part.file_path = script_path
        if isinstance(code_part, FunctionDefinition):
            code_file.provides_function_definition.append(code_part)
        if isinstance(code_part, ClassDefinition):
            code_file.provides_class_definition.append(code_part)
        if isinstance(code_part, ImportStatement):
            code_file.depends_on.append(code_part)
    return code_file
def find_node(nodes: "list[Node]", condition: "callable") -> "Optional[Node]":
    """Return the first node satisfying *condition*, or None.

    Parameters:
    -----------
    - nodes (list[Node]): A list of Node objects to search through.
    - condition (callable): A predicate taking a Node and returning a boolean.

    Returns:
    --------
    - Optional[Node]: The first matching Node, or None if no node matches.
      (The original annotated the return as a bare Node although None is a
      possible result; the annotation is corrected here. Annotations are
      quoted so they are not evaluated at definition time.)
    """
    # next() with a default expresses "first match or None" directly.
    return next((node for node in nodes if condition(node)), None)
async def extract_code_parts(
    tree_root: Node, script_path: str, existing_nodes: dict[str, DataPoint] = {}
) -> AsyncGenerator[DataPoint, None]:
    """Yield DataPoint nodes (imports, functions, classes) from an AST.

    Walks the direct children of *tree_root* and yields an ImportStatement
    for each import statement, a FunctionDefinition for each top-level
    function, and a ClassDefinition for each top-level class. Nodes are
    cached in *existing_nodes* so repeated names yield the same object.

    NOTE(review): the mutable default {} is shared across every call that
    does not pass existing_nodes, so nodes are deduplicated (and shared)
    process-wide across files — confirm this cross-call sharing is intended
    before "fixing" it with a None sentinel.

    Parameters:
    -----------
    - tree_root (Node): The root AST node whose children are scanned.
    - script_path (str): The file path of the script from which the AST was
      generated.
    - existing_nodes (dict[str, DataPoint]): Cache of already-yielded nodes,
      keyed by "import <name>", module name, or identifier. (default {})

    Returns:
    --------
    Yields DataPoint nodes representing imported modules, functions, and classes.
    """
    for child_node in tree_root.children:
        if child_node.type == "import_statement" or child_node.type == "import_from_statement":
            # Tokenize the raw import text,
            # e.g. "from m import f" -> ["from", "m", "import", "f"].
            parts = child_node.text.decode("utf-8").split()
            if parts[0] == "import":
                module_name = parts[1]
                function_name = None
            elif parts[0] == "from":
                module_name = parts[1]
                function_name = parts[3]
                # NOTE(review): split() tokens contain no spaces, so these
                # " as " checks can never match; alias stripping is
                # effectively a no-op as written.
                if " as " in function_name:
                    function_name = function_name.split(" as ")[0]
            if " as " in module_name:
                module_name = module_name.split(" as ")[0]
            # Imported names are keyed as "import <name>" to avoid clashing
            # with module keys.
            if function_name and "import " + function_name not in existing_nodes:
                import_statement_node = ImportStatement(
                    name=function_name,
                    module=module_name,
                    start_point=child_node.start_point,
                    end_point=child_node.end_point,
                    file_path=script_path,
                    source_code=child_node.text,
                )
                existing_nodes["import " + function_name] = import_statement_node
            if function_name:
                yield existing_nodes["import " + function_name]
            # The module itself is always yielded (cached by its own name).
            if module_name not in existing_nodes:
                import_statement_node = ImportStatement(
                    name=module_name,
                    module=module_name,
                    start_point=child_node.start_point,
                    end_point=child_node.end_point,
                    file_path=script_path,
                    source_code=child_node.text,
                )
                existing_nodes[module_name] = import_statement_node
            yield existing_nodes[module_name]
        if child_node.type == "function_definition":
            # NOTE(review): Node.text is bytes, so function/class cache keys
            # are bytes while import keys are str — confirm intended.
            function_node = find_node(child_node.children, lambda node: node.type == "identifier")
            function_node_name = function_node.text
            if function_node_name not in existing_nodes:
                function_definition_node = FunctionDefinition(
                    name=function_node_name,
                    start_point=child_node.start_point,
                    end_point=child_node.end_point,
                    file_path=script_path,
                    source_code=child_node.text,
                )
                existing_nodes[function_node_name] = function_definition_node
            yield existing_nodes[function_node_name]
        if child_node.type == "class_definition":
            class_name_node = find_node(child_node.children, lambda node: node.type == "identifier")
            class_name_node_name = class_name_node.text
            if class_name_node_name not in existing_nodes:
                class_definition_node = ClassDefinition(
                    name=class_name_node_name,
                    start_point=child_node.start_point,
                    end_point=child_node.end_point,
                    file_path=script_path,
                    source_code=child_node.text,
                )
                existing_nodes[class_name_node_name] = class_definition_node
            yield existing_nodes[class_name_node_name]

View file

@ -1,158 +0,0 @@
import os


async def get_non_py_files(repo_path):
    """Get files that are not .py files.

    Check if the specified repository path exists and, if so, traverse the
    directory, collecting the paths of files that do not have a .py extension
    and meet the criteria set in the allowed and ignored patterns.

    Parameters:
    -----------
    - repo_path: The file system path to the repository to scan for non-Python files.

    Returns:
    --------
    A list of file paths that are not Python files and meet the specified
    criteria; an empty list if repo_path does not exist.
    """
    if not os.path.exists(repo_path):
        # BUG FIX: the original returned {} (a dict) here while every other
        # path returns a list; return an empty list for a consistent type.
        return []

    # NOTE(review): patterns are matched by plain substring containment in
    # should_process, so glob-style entries like "*.pyc" can never match.
    # Harmless in practice because compiled-python extensions are not in
    # ALLOWED_EXTENSIONS anyway.
    IGNORED_PATTERNS = {
        ".git",
        "__pycache__",
        "*.pyc",
        "*.pyo",
        "*.pyd",
        "node_modules",
        "*.egg-info",
    }

    # Extensions worth ingesting (duplicates from the original literal
    # removed — membership in a set is unaffected).
    ALLOWED_EXTENSIONS = {
        # docs / data / config
        ".txt",
        ".md",
        ".csv",
        ".json",
        ".xml",
        ".yaml",
        ".yml",
        ".html",
        ".css",
        ".js",
        ".ts",
        ".jsx",
        ".tsx",
        ".sql",
        ".log",
        ".ini",
        ".toml",
        ".properties",
        ".sh",
        ".bash",
        ".dockerfile",
        ".gitignore",
        ".gitattributes",
        ".makefile",
        ".pyproject",
        ".requirements",
        ".env",
        # office / rich-text documents
        ".pdf",
        ".doc",
        ".docx",
        ".dot",
        ".dotx",
        ".rtf",
        ".wps",
        ".wpd",
        ".odt",
        ".ott",
        ".ottx",
        ".wp",
        ".sdw",
        ".sdx",
        ".docm",
        ".dotm",
        # Additional extensions for other programming languages
        ".java",
        ".c",
        ".cpp",
        ".h",
        ".cs",
        ".go",
        ".php",
        ".rb",
        ".swift",
        ".pl",
        ".lua",
        ".rs",
        ".scala",
        ".kt",
        ".v",
        ".asm",
        ".pas",
        ".d",
        ".ml",
        ".clj",
        ".cljs",
        ".erl",
        ".ex",
        ".exs",
        ".f",
        ".fs",
        ".r",
        ".pyi",
        ".pdb",
        ".ipynb",
        ".rmd",
        ".cabal",
        ".hs",
        ".nim",
        ".vhdl",
        ".verilog",
        ".svelte",
        ".scss",
        ".less",
        ".json5",
    }

    def should_process(path):
        """Return True when *path* has an allowed extension and matches no
        ignored pattern (substring test)."""
        _, ext = os.path.splitext(path)
        return ext in ALLOWED_EXTENSIONS and not any(
            pattern in path for pattern in IGNORED_PATTERNS
        )

    non_py_files_paths = [
        os.path.join(root, file)
        for root, _, files in os.walk(repo_path)
        for file in files
        if not file.endswith(".py") and should_process(os.path.join(root, file))
    ]
    return non_py_files_paths

View file

@ -1,243 +0,0 @@
import asyncio
import math
import os
from pathlib import Path
from typing import Set
from typing import AsyncGenerator, Optional, List
from uuid import NAMESPACE_OID, uuid5
from cognee.infrastructure.engine import DataPoint
from cognee.shared.CodeGraphEntities import CodeFile, Repository
# Directory names skipped during repository walks: virtualenvs, package
# caches, build output, VCS metadata, and test trees. Declared once at
# module level so every walk shares the same set object.
EXCLUDED_DIRS: Set[str] = {
    ".venv",
    "venv",
    "env",
    ".env",
    "site-packages",
    "node_modules",
    "dist",
    "build",
    ".git",
    "tests",
    "test",
}
async def get_source_code_files(
    repo_path,
    language_config: dict[str, list[str]] | None = None,
    excluded_paths: Optional[List[str]] = None,
):
    """Collect source files of the configured languages under repo_path.

    Walks the repository tree and gathers non-empty, non-test source files,
    skipping common build/virtualenv directories (EXCLUDED_DIRS) and any
    paths listed in *excluded_paths*.

    Parameters:
    -----------
    - repo_path: Root path of the repository to search.
    - language_config: dict mapping language names to file extensions, e.g.
      {'python': ['.py'], 'javascript': ['.js', '.jsx'], ...}; a default
      multi-language config is used when None.
    - excluded_paths: Optional list of paths to exclude (matched against the
      resolved directory path).

    Returns:
    --------
    A sorted list of (absolute_path, language) tuples for source code files.
    """

    def _get_language_from_extension(file, language_config):
        # First language whose extension list matches wins.
        for lang, exts in language_config.items():
            for ext in exts:
                if file.endswith(ext):
                    return lang
        return None

    # Default config if not provided
    if language_config is None:
        language_config = {
            "python": [".py"],
            "javascript": [".js", ".jsx"],
            "typescript": [".ts", ".tsx"],
            "java": [".java"],
            "csharp": [".cs"],
            "go": [".go"],
            "rust": [".rs"],
            "cpp": [".cpp", ".c", ".h", ".hpp"],
        }

    if not os.path.exists(repo_path):
        return []

    # PERF FIX: the original resolved excluded_paths (rebinding the
    # parameter!) and recomputed root_path/root_parts for every file; these
    # are loop invariants per walk / per directory, so hoist them.
    resolved_excluded = {Path(p).resolve() for p in (excluded_paths or [])}

    source_code_files = set()
    for root, _, files in os.walk(repo_path):
        root_path = Path(root).resolve()
        root_parts = set(root_path.parts)
        # Skip the whole directory when excluded by name or by full path —
        # these conditions depend only on the directory, not the file.
        if (EXCLUDED_DIRS & root_parts) or any(
            root_path.is_relative_to(p) for p in resolved_excluded
        ):
            continue
        for file in files:
            lang = _get_language_from_extension(file, language_config)
            if lang is None:
                continue
            # Exclude test files by common naming conventions.
            base_name, _ext = os.path.splitext(file)
            if (
                base_name.startswith("test_")
                or base_name.endswith("_test")
                or ".test." in file
                or ".spec." in file
            ):
                continue
            file_path = os.path.abspath(os.path.join(root, file))
            # Empty files carry no dependency information.
            if os.path.getsize(file_path) == 0:
                continue
            source_code_files.add((file_path, lang))
    return sorted(source_code_files)
def run_coroutine(coroutine_func, *args, **kwargs):
    """Run a coroutine function to completion on a fresh event loop.

    Intended for synchronous contexts where no event loop is already
    running.

    Parameters:
    -----------
    - coroutine_func: The coroutine function to be run.
    - *args: Positional arguments to pass to the coroutine function.
    - **kwargs: Keyword arguments to pass to the coroutine function.

    Returns:
    --------
    The result returned by the coroutine after completion.
    """
    # BUG FIX: the original built a loop by hand and only closed it on
    # success, leaking the loop if the coroutine raised. asyncio.run creates
    # a fresh loop, runs the coroutine, then cancels leftover tasks and
    # closes the loop even on error.
    return asyncio.run(coroutine_func(*args, **kwargs))
async def get_repo_file_dependencies(
    repo_path: str,
    detailed_extraction: bool = False,
    supported_languages: list = None,
    excluded_paths: Optional[List[str]] = None,
) -> AsyncGenerator[DataPoint, None]:
    """Generate a dependency graph for source files (multi-language) in the given repository path.

    Yields a Repository node first, then one CodeFile per discovered source
    file. Raises FileNotFoundError if the provided path does not exist.
    Detailed extraction (decomposing Python files into parts) is controlled
    via `detailed_extraction`; the languages considered can be restricted via
    `supported_languages`.

    Parameters:
    -----------
    - repo_path (str): The file path to the repository to process (a
      single-element list is also accepted and unwrapped).
    - detailed_extraction (bool): Whether to perform a detailed extraction of
      code parts (Python only).
    - supported_languages (list | None): Subset of languages to include; if
      None, use defaults.
    - excluded_paths (Optional[List[str]]): Paths to exclude from the walk.
    """
    # Tolerate a single-element list as the repo path (caller convenience).
    if isinstance(repo_path, list) and len(repo_path) == 1:
        repo_path = repo_path[0]
    if not os.path.exists(repo_path):
        raise FileNotFoundError(f"Repository path {repo_path} does not exist.")

    # Build language config from supported_languages
    default_language_config = {
        "python": [".py"],
        "javascript": [".js", ".jsx"],
        "typescript": [".ts", ".tsx"],
        "java": [".java"],
        "csharp": [".cs"],
        "go": [".go"],
        "rust": [".rs"],
        "cpp": [".cpp", ".c", ".h", ".hpp"],
        "c": [".c", ".h"],
    }
    if supported_languages is not None:
        language_config = {
            k: v for k, v in default_language_config.items() if k in supported_languages
        }
    else:
        language_config = default_language_config

    source_code_files = await get_source_code_files(
        repo_path, language_config=language_config, excluded_paths=excluded_paths
    )

    # The repository node is yielded before any of its files.
    repo = Repository(
        id=uuid5(NAMESPACE_OID, repo_path),
        path=repo_path,
    )
    yield repo

    # Files are processed in chunks of 100; each chunk's extraction tasks
    # run concurrently via asyncio.gather. Ranges are inclusive on both
    # ends, hence the "- 1" / "+ 1" pairing with the slice below.
    chunk_size = 100
    number_of_chunks = math.ceil(len(source_code_files) / chunk_size)
    chunk_ranges = [
        (
            chunk_number * chunk_size,
            min((chunk_number + 1) * chunk_size, len(source_code_files)) - 1,
        )
        for chunk_number in range(number_of_chunks)
    ]

    # Import dependency extractors for each language (Python for now, extend later)
    from cognee.tasks.repo_processor.get_local_dependencies import get_local_script_dependencies
    import aiofiles

    # TODO: Add other language extractors here
    for start_range, end_range in chunk_ranges:
        tasks = []
        for file_path, lang in source_code_files[start_range : end_range + 1]:
            # For now, only Python is supported; extend with other languages
            if lang == "python":
                tasks.append(
                    get_local_script_dependencies(repo_path, file_path, detailed_extraction)
                )
            else:
                # Placeholder: create a minimal CodeFile for other languages.
                # Default arguments bind the current loop values so each stub
                # captures its own file_path/lang (avoids the late-binding
                # closure pitfall).
                async def make_codefile_stub(file_path=file_path, lang=lang):
                    async with aiofiles.open(
                        file_path, "r", encoding="utf-8", errors="replace"
                    ) as f:
                        source = await f.read()
                    return CodeFile(
                        id=uuid5(NAMESPACE_OID, file_path),
                        name=os.path.relpath(file_path, repo_path),
                        file_path=file_path,
                        language=lang,
                        source_code=source,
                    )

                tasks.append(make_codefile_stub())
        results: list[CodeFile] = await asyncio.gather(*tasks)
        for source_code_file in results:
            source_code_file.part_of = repo
            # Defensive default: any .py file without a language gets "python".
            if getattr(
                source_code_file, "language", None
            ) is None and source_code_file.file_path.endswith(".py"):
                source_code_file.language = "python"
            yield source_code_file