feat: ingest PR comments with GraphQL (#835)

<!-- .github/pull_request_template.md -->

## Description
<!-- Provide a clear description of the changes in this PR -->

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.

---------

Co-authored-by: hajdul88 <52442977+hajdul88@users.noreply.github.com>
This commit is contained in:
lxobr 2025-05-19 15:13:11 +02:00 committed by GitHub
parent d7d626698d
commit b4b55b820d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 412 additions and 127 deletions

View file

@ -3,7 +3,6 @@ from typing import Type
from pydantic import BaseModel, Field, PrivateAttr
from cognee.modules.engine.models import NodeSet
from cognee.modules.retrieval.graph_completion_retriever import GraphCompletionRetriever
class CogneeSearchInput(BaseModel):
@ -29,8 +28,8 @@ class CogneeSearch(BaseTool):
self._nodeset_name = nodeset_name
def _run(self, query: str) -> str:
import cognee
import asyncio
from cognee.modules.retrieval.graph_completion_retriever import GraphCompletionRetriever
async def main():
try:

View file

@ -0,0 +1,55 @@
from abc import ABC, abstractmethod
import requests
from cognee.shared.logging_utils import get_logger
# GitHub GraphQL endpoint; every query built by the comment providers is POSTed here.
GITHUB_API_URL = "https://api.github.com/graphql"
# Logger for the GitHub comment provider classes.
logger = get_logger("github_comments")
class GitHubCommentBase(ABC):
    """Base class for GitHub comment providers.

    Implements the fetch pipeline as a template method (``get_comments``):
    build a GraphQL query, run it, extract the raw comment nodes and format
    each of them. Subclasses provide the four abstract hooks.
    """

    # Seconds to wait for GitHub before giving up. Without an explicit
    # timeout ``requests`` waits indefinitely on a stalled connection.
    REQUEST_TIMEOUT = 30

    def __init__(self, token, username, limit=10):
        """Store the credentials and limits used by every query.

        Args:
            token: GitHub token sent as a ``Bearer`` authorization header.
            username: GitHub login whose comments are fetched.
            limit: Maximum number of formatted comments ``get_comments`` returns.
        """
        self.token = token
        self.username = username
        self.limit = limit

    @staticmethod
    def _unwrap_payload(payload) -> dict:
        """Return the ``data`` member of a decoded GraphQL response.

        GraphQL reports query-level failures with HTTP 200 and an ``errors``
        list, so a successful status code alone does not mean data is present.

        Raises:
            RuntimeError: If the payload carries no usable ``data``; the
                message includes any GraphQL error messages that were returned.
        """
        is_mapping = isinstance(payload, dict)
        data = payload.get("data") if is_mapping else None
        if data is not None:
            return data

        errors = (payload.get("errors") if is_mapping else None) or []
        messages = "; ".join(
            str(err.get("message", err)) if isinstance(err, dict) else str(err) for err in errors
        )
        raise RuntimeError(f"GraphQL query returned no data: {messages or 'unknown error'}")

    def _run_query(self, query: str) -> dict:
        """Executes a GraphQL query against GitHub's API.

        Args:
            query: Complete GraphQL query document.

        Returns:
            The ``data`` object of the GraphQL response.

        Raises:
            Exception: If the HTTP status is not 200.
            RuntimeError: If the response contains no ``data``.
        """
        headers = {"Authorization": f"Bearer {self.token}"}
        response = requests.post(
            GITHUB_API_URL,
            json={"query": query},
            headers=headers,
            timeout=self.REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            raise Exception(f"Query failed: {response.status_code} - {response.text}")

        payload = response.json()
        # Partial results (data plus errors) are still returned, but the
        # errors are surfaced instead of being silently dropped.
        if isinstance(payload, dict) and payload.get("errors") and payload.get("data") is not None:
            logger.warning(f"GraphQL query returned partial data with errors: {payload['errors']}")
        return self._unwrap_payload(payload)

    def get_comments(self):
        """Template method that orchestrates the comment retrieval process.

        Returns:
            A list of at most ``self.limit`` formatted comments. Any exception
            raised along the way is logged and results in an empty list.
        """
        try:
            query = self._build_query()
            data = self._run_query(query)
            raw_comments = self._extract_comments(data)
            return [self._format_comment(item) for item in raw_comments[: self.limit]]
        except Exception as e:
            # Deliberate best-effort: one failing provider must not abort the caller.
            logger.error(f"Error fetching {self._get_comment_type()} comments: {e}")
            return []

    @abstractmethod
    def _build_query(self) -> str:
        """Builds the GraphQL query string."""
        pass

    @abstractmethod
    def _extract_comments(self, data) -> list:
        """Extracts the comment data from the GraphQL response."""
        pass

    @abstractmethod
    def _format_comment(self, item) -> dict:
        """Formats a single comment."""
        pass

    @abstractmethod
    def _get_comment_type(self) -> str:
        """Returns the type of comment this provider handles."""
        pass

View file

@ -0,0 +1,298 @@
from datetime import datetime, timedelta
from cognee.complex_demos.crewai_demo.src.crewai_demo.github_comment_base import (
GitHubCommentBase,
logger,
)
class IssueCommentsProvider(GitHubCommentBase):
    """Provider for GitHub issue comments."""

    # GitHub's GraphQL API only accepts connection sizes (`first`) from 1 to 100;
    # anything outside that range fails the whole query.
    MAX_PAGE_SIZE = 100

    # Doubled braces are literal braces for str.format; {username} and {limit}
    # are the only substituted fields.
    QUERY_TEMPLATE = """
{{
user(login: "{username}") {{
issueComments(first: {limit}, orderBy: {{field: UPDATED_AT, direction: DESC}}) {{
nodes {{
body
createdAt
updatedAt
url
issue {{
number
title
url
repository {{
nameWithOwner
}}
state
}}
}}
}}
}}
}}
"""

    def _build_query(self) -> str:
        """Builds the GraphQL query for issue comments.

        The requested page size is ``self.limit`` clamped to the 1..100 range
        GitHub accepts, so a large limit yields up to 100 comments instead of
        a rejected query.
        """
        page_size = max(1, min(self.limit, self.MAX_PAGE_SIZE))
        return self.QUERY_TEMPLATE.format(username=self.username, limit=page_size)

    def _extract_comments(self, data) -> list:
        """Extracts issue comments from the GraphQL response."""
        return data["user"]["issueComments"]["nodes"]

    def _format_comment(self, comment) -> dict:
        """Formats an issue comment from GraphQL.

        Args:
            comment: One node of the ``issueComments`` connection.

        Returns:
            A flat dict describing the comment and the issue it belongs to.
        """
        issue = comment["issue"]
        # The identifier is the last "/"-separated segment of the comment URL.
        comment_id = comment["url"].split("/")[-1] if comment["url"] else None
        return {
            "repo": issue["repository"]["nameWithOwner"],
            "issue_number": issue["number"],
            "comment_id": comment_id,
            "body": comment["body"],
            "text": comment["body"],
            "created_at": comment["createdAt"],
            "updated_at": comment["updatedAt"],
            "html_url": comment["url"],
            "issue_url": issue["url"],
            # The query does not request the association, so a fixed value is used.
            "author_association": "COMMENTER",
            "issue_title": issue["title"],
            "issue_state": issue["state"],
            "login": self.username,
            "type": "issue_comment",
        }

    def _get_comment_type(self) -> str:
        """Returns the comment type for error messages."""
        return "issue"
class PrReviewsProvider(GitHubCommentBase):
    """Provider for GitHub PR reviews."""

    # GitHub's GraphQL API only accepts connection sizes (`first`) from 1 to 100;
    # anything outside that range fails the whole query.
    MAX_PAGE_SIZE = 100

    # Doubled braces are literal braces for str.format; {username} and
    # {fetch_limit} are the only substituted fields.
    QUERY_TEMPLATE = """
{{
user(login: "{username}") {{
contributionsCollection {{
pullRequestReviewContributions(first: {fetch_limit}) {{
nodes {{
pullRequestReview {{
body
createdAt
updatedAt
url
state
pullRequest {{
number
title
url
repository {{
nameWithOwner
}}
state
}}
}}
}}
}}
}}
}}
}}
"""

    def __init__(self, token, username, limit=10, fetch_limit=None):
        """Initialize with token, username, and optional limits.

        Args:
            token: GitHub token used for authentication.
            username: GitHub login whose reviews are fetched.
            limit: Maximum number of reviews returned by ``get_comments``.
            fetch_limit: Number of review contributions to request; defaults to
                ``10 * limit`` because reviews with an empty body are dropped
                after fetching.
        """
        super().__init__(token, username, limit)
        self.fetch_limit = fetch_limit if fetch_limit is not None else 10 * limit

    def _build_query(self) -> str:
        """Builds the GraphQL query for PR reviews.

        ``self.fetch_limit`` is clamped to the 1..100 range GitHub accepts; the
        default of ``10 * limit`` would otherwise be rejected for any limit
        above 10.
        """
        page_size = max(1, min(self.fetch_limit, self.MAX_PAGE_SIZE))
        return self.QUERY_TEMPLATE.format(username=self.username, fetch_limit=page_size)

    def _extract_comments(self, data) -> list:
        """Extracts PR reviews from the GraphQL response.

        Only reviews with a non-empty body are returned.
        """
        contributions = data["user"]["contributionsCollection"]["pullRequestReviewContributions"][
            "nodes"
        ]
        reviews = (node["pullRequestReview"] for node in contributions)
        return [review for review in reviews if review["body"]]

    def _format_comment(self, review) -> dict:
        """Formats a PR review from GraphQL.

        Args:
            review: One ``pullRequestReview`` object from the response.

        Returns:
            A flat dict describing the review and its pull request.
        """
        pull_request = review["pullRequest"]
        # The identifier is the last "/"-separated segment of the review URL.
        review_id = review["url"].split("/")[-1] if review["url"] else None
        return {
            "repo": pull_request["repository"]["nameWithOwner"],
            "issue_number": pull_request["number"],
            "comment_id": review_id,
            "body": review["body"],
            "text": review["body"],
            "created_at": review["createdAt"],
            "updated_at": review["updatedAt"],
            "html_url": review["url"],
            "issue_url": pull_request["url"],
            # The query does not request the association, so a fixed value is used.
            "author_association": "COMMENTER",
            "issue_title": pull_request["title"],
            "issue_state": pull_request["state"],
            "login": self.username,
            "review_state": review["state"],
            "type": "pr_review",
        }

    def _get_comment_type(self) -> str:
        """Returns the comment type for error messages."""
        return "PR review"
class PrReviewCommentsProvider(GitHubCommentBase):
    """Provider for GitHub PR review comments (inline code comments).

    Works in two steps: first list the pull requests the user reviewed, then
    query each of those pull requests for the user's inline review comments.
    """

    # GitHub's GraphQL API only accepts connection sizes (`first`) from 1 to 100;
    # anything outside that range fails the whole query.
    MAX_PAGE_SIZE = 100

    # Step 1: pull requests the user has contributed reviews to.
    PR_CONTRIBUTIONS_TEMPLATE = """
{{
user(login: "{username}") {{
contributionsCollection {{
pullRequestReviewContributions(first: {fetch_limit}) {{
nodes {{
pullRequestReview {{
pullRequest {{
number
title
url
repository {{
nameWithOwner
}}
state
}}
}}
}}
}}
}}
}}
}}
"""

    # Step 2: the user's inline comments on one specific pull request.
    PR_COMMENTS_TEMPLATE = """
{{
repository(owner: "{owner}", name: "{repo}") {{
pullRequest(number: {pr_number}) {{
reviews(first: {reviews_limit}, author: "{username}") {{
nodes {{
comments(first: {comments_limit}) {{
nodes {{
body
createdAt
updatedAt
url
}}
}}
}}
}}
}}
}}
}}
"""

    def __init__(
        self,
        token,
        username,
        limit=10,
        fetch_limit=None,
        reviews_limit=None,
        comments_limit=None,
        pr_limit=None,
    ):
        """Initialize with token, username, and optional limits.

        Args:
            token: GitHub token used for authentication.
            username: GitHub login whose inline comments are fetched.
            limit: Maximum number of comments returned by ``get_comments``.
            fetch_limit: Review contributions requested in step 1
                (default ``4 * limit``).
            reviews_limit: Reviews requested per pull request
                (default ``2 * limit``).
            comments_limit: Comments requested per review
                (default ``3 * limit``).
            pr_limit: Maximum number of distinct pull requests queried in
                step 2 (default ``2 * limit``).
        """
        super().__init__(token, username, limit)
        self.fetch_limit = fetch_limit if fetch_limit is not None else 4 * limit
        self.reviews_limit = reviews_limit if reviews_limit is not None else 2 * limit
        self.comments_limit = comments_limit if comments_limit is not None else 3 * limit
        self.pr_limit = pr_limit if pr_limit is not None else 2 * limit

    def _page_size(self, requested) -> int:
        """Clamps a requested connection size to the 1..100 range GitHub accepts."""
        return max(1, min(requested, self.MAX_PAGE_SIZE))

    def _build_query(self) -> str:
        """Builds the GraphQL query for PR contributions."""
        return self.PR_CONTRIBUTIONS_TEMPLATE.format(
            username=self.username, fetch_limit=self._page_size(self.fetch_limit)
        )

    def _extract_comments(self, data) -> list:
        """Extracts PR review comments using a two-step approach."""
        prs = self._get_reviewed_prs(data)
        return self._fetch_comments_for_prs(prs)

    def _get_reviewed_prs(self, data) -> list:
        """Gets a deduplicated list of PRs the user has reviewed.

        Pull requests are identified by URL; the first occurrence wins and the
        response order is preserved. At most ``self.pr_limit`` are returned.
        """
        contributions = data["user"]["contributionsCollection"]["pullRequestReviewContributions"][
            "nodes"
        ]
        seen_urls = set()
        unique_prs = []
        for node in contributions:
            pr = node["pullRequestReview"]["pullRequest"]
            if pr["url"] not in seen_urls:
                seen_urls.add(pr["url"])
                unique_prs.append(pr)
        return unique_prs[: self.pr_limit]

    def _fetch_comments_for_prs(self, prs) -> list:
        """Fetches inline comments for each PR in the list."""
        all_comments = []
        for pr in prs:
            all_comments.extend(self._get_comments_for_pr(pr))
        return all_comments

    def _get_comments_for_pr(self, pr) -> list:
        """Fetches the inline comments for a specific PR.

        Each returned comment is tagged with its pull request under the
        ``_pr_data`` key so ``_format_comment`` can read the PR fields. A
        failing query is logged and yields an empty list, so one bad pull
        request does not discard the comments of the others.
        """
        owner, repo = pr["repository"]["nameWithOwner"].split("/")
        pr_query = self.PR_COMMENTS_TEMPLATE.format(
            owner=owner,
            repo=repo,
            pr_number=pr["number"],
            username=self.username,
            reviews_limit=self._page_size(self.reviews_limit),
            comments_limit=self._page_size(self.comments_limit),
        )
        try:
            pr_comments = []
            pr_data = self._run_query(pr_query)
            reviews = pr_data["repository"]["pullRequest"]["reviews"]["nodes"]
            for review in reviews:
                for comment in review["comments"]["nodes"]:
                    comment["_pr_data"] = pr
                    pr_comments.append(comment)
            return pr_comments
        except Exception as e:
            logger.error(f"Error fetching comments for PR #{pr['number']}: {e}")
            return []

    def _format_comment(self, comment) -> dict:
        """Formats a PR review comment from GraphQL.

        Args:
            comment: A comment node carrying its pull request under ``_pr_data``.

        Returns:
            A flat dict describing the comment and its pull request.
        """
        pr = comment["_pr_data"]
        # The identifier is the last "/"-separated segment of the comment URL.
        comment_id = comment["url"].split("/")[-1] if comment["url"] else None
        return {
            "repo": pr["repository"]["nameWithOwner"],
            "issue_number": pr["number"],
            "comment_id": comment_id,
            "body": comment["body"],
            "text": comment["body"],
            "created_at": comment["createdAt"],
            "updated_at": comment["updatedAt"],
            "html_url": comment["url"],
            "issue_url": pr["url"],
            # The query does not request the association, so a fixed value is used.
            "author_association": "COMMENTER",
            "issue_title": pr["title"],
            "issue_state": pr["state"],
            "login": self.username,
            "type": "pr_review_comment",
        }

    def _get_comment_type(self) -> str:
        """Returns the comment type for error messages."""
        return "PR review comment"

View file

@ -1,105 +1,57 @@
from github import Github
from datetime import datetime, timedelta
from datetime import datetime
from cognee.complex_demos.crewai_demo.src.crewai_demo.github_comment_providers import (
IssueCommentsProvider,
PrReviewsProvider,
PrReviewCommentsProvider,
)
from cognee.complex_demos.crewai_demo.src.crewai_demo.github_comment_base import logger
class GitHubDevComments:
"""Class for working with a GitHub developer's comments."""
"""Facade class for working with a GitHub developer's comments."""
def __init__(
self, profile, days=30, issues_limit=10, max_comments=5, include_issue_details=True
):
def __init__(self, profile, limit=10, include_issue_details=True):
"""Initialize with a GitHubDevProfile instance and default parameters."""
self.profile = profile
self.days = days
self.issues_limit = issues_limit
self.max_comments = max_comments
self.limit = limit
self.include_issue_details = include_issue_details
def get_issue_comments(self):
"""Fetches comments made by the user on issues and PRs across repositories within timeframe."""
"""Fetches the most recent comments made by the user on issues and PRs across repositories."""
if not self.profile.user:
logger.warning(f"No user found for profile {self.profile.username}")
return None
date_filter = self._get_date_filter(self.days)
query = f"commenter:{self.profile.username}{date_filter}"
logger.debug(f"Fetching comments for {self.profile.username} with limit={self.limit}")
return self._get_comments_from_search(query)
# Create providers with just the basic limit - they will handle their own multipliers
issue_provider = IssueCommentsProvider(
self.profile.token, self.profile.username, self.limit
)
pr_review_provider = PrReviewsProvider(
self.profile.token, self.profile.username, self.limit
)
pr_comment_provider = PrReviewCommentsProvider(
self.profile.token, self.profile.username, self.limit
)
def get_repo_issue_comments(self, repo_name):
"""Fetches comments made by the user on issues and PRs in a specific repository within timeframe."""
if not self.profile.user:
return None
issue_comments = issue_provider.get_comments()
pr_reviews = pr_review_provider.get_comments()
pr_review_comments = pr_comment_provider.get_comments()
date_filter = self._get_date_filter(self.days)
query = f"repo:{repo_name} commenter:{self.profile.username}{date_filter}"
self.profile.github.get_repo(repo_name)
total_comments = issue_comments + pr_reviews + pr_review_comments
logger.info(
f"Retrieved {len(total_comments)} comments for {self.profile.username} "
f"({len(issue_comments)} issue, {len(pr_reviews)} PR reviews, "
f"{len(pr_review_comments)} PR review comments)"
)
return self._get_comments_from_search(query)
return total_comments
def set_limits(
self, days=None, issues_limit=None, max_comments=None, include_issue_details=None
):
"""Sets all search parameters for comment searches."""
if days is not None:
self.days = days
if issues_limit is not None:
self.issues_limit = issues_limit
if max_comments is not None:
self.max_comments = max_comments
def set_limit(self, limit=None, include_issue_details=None):
"""Sets the limit for comments to retrieve."""
if limit is not None:
self.limit = limit
if include_issue_details is not None:
self.include_issue_details = include_issue_details
def _get_date_filter(self, days):
"""Creates a date filter string for GitHub search queries."""
if not days:
return ""
date_limit = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
return f" created:>={date_limit}"
def _get_comments_from_search(self, query):
"""Retrieves comments based on a search query for issues."""
try:
issues = list(self.profile.github.search_issues(query))
except Exception as e:
print(f"Error executing search query: {e}")
return []
if not issues:
return []
all_comments = [
self._extract_comment_data(issue, comment)
for issue in issues[: self.issues_limit]
for comment in self._get_user_comments_from_issue(issue)
]
return all_comments
def _get_user_comments_from_issue(self, issue):
"""Gets comments made by the user on a specific issue."""
try:
all_comments = list(issue.get_comments())
user_comments = [c for c in all_comments if c.user.login == self.profile.username]
return user_comments[: self.max_comments]
except Exception as e:
print(f"Error getting comments from issue #{issue.number}: {e}")
return []
def _extract_comment_data(self, issue, comment):
"""Creates a structured data object from a comment."""
comment_data = {
"repo": issue.repository.full_name,
"issue_number": issue.number,
"comment_id": comment.id,
"body": comment.body,
"created_at": comment.created_at,
"updated_at": comment.updated_at,
"html_url": comment.html_url,
"issue_url": issue.html_url,
"author_association": getattr(comment, "author_association", "UNKNOWN"),
"issue_title": issue.title,
"issue_state": issue.state,
}
return comment_data

View file

@ -62,38 +62,18 @@ class GitHubDevProfile:
return self.commits.get_user_file_changes()
def get_issue_comments(
self, days=30, issues_limit=10, max_comments=5, include_issue_details=True
):
"""Fetches comments made by the user on issues across repositories within specified timeframe."""
def get_issue_comments(self, limit=10, include_issue_details=True):
"""Fetches the most recent comments made by the user on issues and PRs across repositories."""
if not self.comments:
return None
self.comments.set_limits(
days=days,
issues_limit=issues_limit,
max_comments=max_comments,
self.comments.set_limit(
limit=limit,
include_issue_details=include_issue_details,
)
return self.comments.get_issue_comments()
def get_repo_issue_comments(
self, repo_name, days=30, issues_limit=10, max_comments=5, include_issue_details=True
):
"""Fetches comments made by the user on issues in a specific repository within timeframe."""
if not self.user or not self.comments:
return None
self.comments.set_limits(
days=days,
issues_limit=issues_limit,
max_comments=max_comments,
include_issue_details=include_issue_details,
)
return self.comments.get_repo_issue_comments(repo_name)
def _get_user(self, username):
"""Fetches a GitHub user object."""
try:

View file

@ -17,9 +17,7 @@ def get_github_profile_data(
commits_result = profile.get_user_commits(
days=days, prs_limit=prs_limit, commits_per_pr=commits_per_pr, include_files=True
)
comments = profile.get_issue_comments(
days=days, issues_limit=issues_limit, max_comments=max_comments, include_issue_details=True
)
comments = profile.get_issue_comments(limit=max_comments, include_issue_details=True)
return {
"user": profile.get_user_info(),
@ -72,13 +70,14 @@ def get_github_data_for_cognee(
if file_changes:
enriched_file_changes = [item | user_info for item in file_changes]
comments = profile.get_issue_comments(
days=days, issues_limit=issues_limit, max_comments=max_comments, include_issue_details=True
)
comments = profile.get_issue_comments(limit=max_comments, include_issue_details=True)
enriched_comments = []
if comments:
enriched_comments = [comment | user_info for comment in comments]
enriched_comments = []
for comment in comments:
safe_user_info = {k: v for k, v in user_info.items() if k not in comment}
enriched_comments.append(comment | safe_user_info)
return {"user": user_info, "file_changes": enriched_file_changes, "comments": enriched_comments}

View file

@ -286,13 +286,15 @@ if __name__ == "__main__":
dotenv.load_dotenv()
token = os.getenv("GITHUB_TOKEN")
# Choose one of these options:
# Option 1: Process from JSON file, mostly for testing
# json_file_path = ""
# asyncio.run(process_github_from_file(json_file_path))
#
# Option 2: Process directly from GitHub
username = ""
asyncio.run(cognify_github_data_from_username(username, token))
async def cognify_from_username(username, token):
from cognee.infrastructure.databases.relational import create_db_and_tables
await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
await create_db_and_tables()
await cognify_github_data_from_username(username, token)
# Run it
asyncio.run(cognify_from_username(username, token))