cognee/examples/python/code_graph_example.py
alekszievr 05ba29af01
Feat: log pipeline status and pass it through pipeline [COG-1214] (#501)
<!-- .github/pull_request_template.md -->

## Description
<!-- Provide a clear description of the changes in this PR -->

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Enhanced pipeline execution now provides consolidated status feedback
with improved telemetry for start, completion, and error events.
- Automatic generation of unique dataset identifiers offers clearer task
and pipeline run associations.

- **Refactor**
- Task execution has been streamlined with explicit parameter handling
for more structured pipeline processing.
- Interactive examples and demos now return results directly, making
integration and monitoring more accessible.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: Boris Arzentar <borisarzentar@gmail.com>
2025-02-11 16:41:40 +01:00

46 lines
1.3 KiB
Python

import argparse
import asyncio
import logging
from cognee.api.v1.cognify.code_graph_pipeline import run_code_graph_pipeline
from cognee.shared.utils import setup_logging
async def main(repo_path, include_docs):
return await run_code_graph_pipeline(repo_path, include_docs)
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--repo_path", type=str, required=True, help="Path to the repository")
parser.add_argument(
"--include_docs",
type=lambda x: x.lower() in ("true", "1"),
default=True,
help="Whether or not to process non-code files",
)
parser.add_argument(
"--time",
type=lambda x: x.lower() in ("true", "1"),
default=True,
help="Whether or not to time the pipeline run",
)
return parser.parse_args()
if __name__ == "__main__":
setup_logging(logging.ERROR)
args = parse_args()
if args.time:
import time
start_time = time.time()
asyncio.run(main(args.repo_path, args.include_docs))
end_time = time.time()
print("\n" + "=" * 50)
print(f"Pipeline Execution Time: {end_time - start_time:.2f} seconds")
print("=" * 50 + "\n")
else:
asyncio.run(main(args.repo_path, args.include_docs))