From 5440fa766c890559f4f8c9575ee6f7c0f3044dff Mon Sep 17 00:00:00 2001 From: vasilije Date: Wed, 16 Jul 2025 16:34:34 +0200 Subject: [PATCH] added automation and test fixes --- .github/workflows/e2e_tests.yml | 2 +- .github/workflows/publish_pypi.yml | 174 ++++++++++ .github/workflows/security_verification.yml | 290 ++++++++++++++++ cognee/tests/test_knowledge_graph_quality.py | 39 +-- scripts/verify_package.py | 337 +++++++++++++++++++ 5 files changed, 812 insertions(+), 30 deletions(-) create mode 100644 .github/workflows/publish_pypi.yml create mode 100644 .github/workflows/security_verification.yml create mode 100644 scripts/verify_package.py diff --git a/.github/workflows/e2e_tests.yml b/.github/workflows/e2e_tests.yml index a495b0f58..92c0f226c 100644 --- a/.github/workflows/e2e_tests.yml +++ b/.github/workflows/e2e_tests.yml @@ -297,7 +297,7 @@ jobs: - name: Run Knowledge Graph Quality Test env: ENV: 'dev' - # Model selection is handled by the test with fallback priority + LLM_MODEL: ${{ secrets.LLM_MODEL }} LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }} LLM_API_KEY: ${{ secrets.LLM_API_KEY }} LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }} diff --git a/.github/workflows/publish_pypi.yml b/.github/workflows/publish_pypi.yml new file mode 100644 index 000000000..3b0d5336d --- /dev/null +++ b/.github/workflows/publish_pypi.yml @@ -0,0 +1,174 @@ +name: Publish to PyPI + +on: + release: + types: [published] + workflow_dispatch: + inputs: + test_pypi: + description: 'Publish to Test PyPI instead of PyPI' + required: false + type: boolean + default: false + +permissions: + contents: read + id-token: write # Required for trusted publishing and attestations + attestations: write # Required for package attestations + +jobs: + security-scan: + name: Security Scan + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.11' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install uv + uv sync --dev + + - name: Run safety check for known vulnerabilities + run: | + pip install safety + safety check --json > safety-report.json || true + + - name: Run bandit security linter + run: | + pip install bandit + bandit -r cognee/ -f json -o bandit-report.json || true + + - name: Upload security reports as artifacts + uses: actions/upload-artifact@v4 + with: + name: security-reports + path: | + safety-report.json + bandit-report.json + + - name: Check for high-severity vulnerabilities + run: | + # Fail if high-severity vulnerabilities are found + if [ -f safety-report.json ]; then + python -c " + import json + import sys + try: + with open('safety-report.json', 'r') as f: + data = json.load(f) + if isinstance(data, list) and len(data) > 0: + high_severity = [v for v in data if v.get('severity', '').lower() in ['high', 'critical']] + if high_severity: + print('HIGH SEVERITY VULNERABILITIES FOUND:') + for vuln in high_severity: + print(f' - {vuln.get(\"vulnerability\", \"Unknown\")} in {vuln.get(\"package\", \"Unknown\")}') + sys.exit(1) + except Exception as e: + print(f'Error parsing safety report: {e}') + pass + " + fi + + build-and-publish: + name: Build and publish to PyPI + needs: security-scan + runs-on: ubuntu-latest + environment: + name: ${{ github.event.inputs.test_pypi == 'true' && 'testpypi' || 'pypi' }} + steps: + - name: Checkout code + uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.11' + + - name: Install build dependencies + run: | + python -m pip install --upgrade pip + pip install build twine hatchling + + - name: Build package + run: | + python -m build + + - name: Generate package hashes + run: | + cd dist + sha256sum * > SHA256SUMS + sha512sum * > SHA512SUMS + echo "Generated checksums:" + cat SHA256SUMS + cat SHA512SUMS + + - name: Verify package integrity + run: | + cd dist + sha256sum -c SHA256SUMS + sha512sum -c SHA512SUMS + echo "Package integrity verified" + + - name: Check package with twine + run: | + twine check dist/* + + - name: Generate SBOM (Software Bill of Materials) + run: | + pip install cyclonedx-bom + cyclonedx-py requirements -o cognee-sbom.json + + - name: Upload build artifacts + uses: actions/upload-artifact@v4 + with: + name: dist-files + path: | + dist/ + cognee-sbom.json + + - name: Generate attestations for built packages + uses: actions/attest-build-provenance@v1 + with: + subject-path: 'dist/*' + + - name: Publish to Test PyPI + if: github.event.inputs.test_pypi == 'true' + uses: pypa/gh-action-pypi-publish@release/v1 + with: + repository-url: https://test.pypi.org/legacy/ + attestations: true + + - name: Publish to PyPI + if: github.event.inputs.test_pypi != 'true' + uses: pypa/gh-action-pypi-publish@release/v1 + with: + attestations: true + + - name: Create release with hashes + if: github.event_name == 'release' + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + run: | + # Upload hash files to the release + gh release upload ${{ github.event.release.tag_name }} \ + dist/SHA256SUMS \ + dist/SHA512SUMS \ + cognee-sbom.json \ + --clobber + + - name: Security notice + run: | + echo "::notice::Package published successfully with security attestations" + echo "::notice::Checksums and SBOM uploaded to release assets" + echo "::notice::Users can verify package integrity using the provided checksums" \ No newline at end of file diff --git a/.github/workflows/security_verification.yml b/.github/workflows/security_verification.yml new file mode 100644 index 000000000..bb85daa98 --- /dev/null +++ b/.github/workflows/security_verification.yml @@ -0,0 +1,290 @@ +name: Security Verification + +on: + push: + branches: [ main, dev ] + pull_request: + branches: [ main, dev ] + schedule: + - cron: '0 2 * * 0' # Weekly security scan on Sundays at 2 AM UTC + workflow_dispatch: + +permissions: + contents: read + security-events: write + actions: read + +jobs: + dependency-scan: + name: Dependency Vulnerability Scan + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.11' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install uv + uv sync --dev + + - name: Run Trivy vulnerability scanner + uses: aquasecurity/trivy-action@master + with: + scan-type: 'fs' + scan-ref: '.' + format: 'sarif' + output: 'trivy-results.sarif' + + - name: Upload Trivy scan results to GitHub Security tab + uses: github/codeql-action/upload-sarif@v3 + if: always() + with: + sarif_file: 'trivy-results.sarif' + + - name: Run pip-audit for Python vulnerabilities + run: | + pip install pip-audit + pip-audit --format=json --output=pip-audit-results.json || true + + - name: Check for critical vulnerabilities + run: | + python -c " + import json + import sys + try: + with open('pip-audit-results.json', 'r') as f: + data = json.load(f) + vulns = data.get('vulnerabilities', []) + critical_vulns = [v for v in vulns if v.get('aliases', []) and any('CVE' in alias for alias in v['aliases'])] + if critical_vulns: + print('CRITICAL VULNERABILITIES FOUND:') + for vuln in critical_vulns: + print(f' - {vuln.get(\"id\", \"Unknown\")} in {vuln.get(\"package\", \"Unknown\")}') + sys.exit(1) + except (FileNotFoundError, json.JSONDecodeError): + print('No vulnerabilities file found or invalid format') + pass + " + + - name: Upload vulnerability reports + uses: actions/upload-artifact@v4 + with: + name: vulnerability-reports + path: | + trivy-results.sarif + pip-audit-results.json + + code-quality-scan: + name: Code Quality & Security Scan + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.11' + + - name: Install analysis tools + run: | + python -m pip install --upgrade pip + pip install bandit[toml] semgrep safety + + - name: Run Bandit security linter + run: | + bandit -r cognee/ -f json -o bandit-report.json || true + bandit -r cognee/ -f txt || true + + - name: Run Semgrep security analysis + run: | + semgrep --config=auto --json --output=semgrep-results.json cognee/ || true + + - name: Run Safety check + run: | + safety check --json --output safety-results.json || true + + - name: Upload security scan results + uses: actions/upload-artifact@v4 + with: + name: security-analysis + path: | + bandit-report.json + semgrep-results.json + safety-results.json + + package-integrity: + name: Package Integrity & Signing + runs-on: ubuntu-latest + if: github.event_name == 'push' && github.ref == 'refs/heads/main' + steps: + - name: Checkout code + uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.11' + + - name: Install build dependencies + run: | + python -m pip install --upgrade pip + pip install build twine hatchling + + - name: Build package + run: | + python -m build + + - name: Generate package hashes + run: | + cd dist + sha256sum * > SHA256SUMS + sha512sum * > SHA512SUMS + md5sum * > MD5SUMS + echo "Generated checksums:" + cat SHA256SUMS + + - name: Import GPG key + if: github.event_name == 'push' && github.ref == 'refs/heads/main' + env: + GPG_PRIVATE_KEY: ${{ secrets.GPG_PRIVATE_KEY }} + GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }} + run: | + if [ -n "$GPG_PRIVATE_KEY" ]; then + echo "$GPG_PRIVATE_KEY" | gpg --batch --import + echo "GPG key imported successfully" + # List imported keys for verification + gpg --list-secret-keys --keyid-format LONG + else + echo "GPG_PRIVATE_KEY not set, skipping GPG signing" + fi + + - name: Sign packages with GPG + if: github.event_name == 'push' && github.ref == 'refs/heads/main' + env: + GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }} + run: | + if [ -n "$GPG_PASSPHRASE" ]; then + cd dist + for file in *; do + if [ -f "$file" ]; then + echo "Signing $file..." + gpg --batch --yes --passphrase "$GPG_PASSPHRASE" --detach-sign --armor "$file" + echo "Created signature: $file.asc" + fi + done + # Sign the checksum files + gpg --batch --yes --passphrase "$GPG_PASSPHRASE" --detach-sign --armor SHA256SUMS + gpg --batch --yes --passphrase "$GPG_PASSPHRASE" --detach-sign --armor SHA512SUMS + echo "All files signed successfully" + else + echo "GPG_PASSPHRASE not set, skipping signing" + fi + + - name: Verify signatures + if: github.event_name == 'push' && github.ref == 'refs/heads/main' + run: | + cd dist + for sig_file in *.asc; do + if [ -f "$sig_file" ]; then + echo "Verifying signature: $sig_file" + gpg --verify "$sig_file" + fi + done + + - name: Upload signed packages + uses: actions/upload-artifact@v4 + with: + name: signed-packages + path: | + dist/ + retention-days: 30 + + security-policy-check: + name: Security Policy Compliance + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Check for security policy files + run: | + echo "Checking for security policy files..." + + # Check for SECURITY.md + if [ -f "SECURITY.md" ]; then + echo "✓ SECURITY.md found" + else + echo "✗ SECURITY.md not found" + exit 1 + fi + + # Check for CODE_OF_CONDUCT.md + if [ -f "CODE_OF_CONDUCT.md" ]; then + echo "✓ CODE_OF_CONDUCT.md found" + else + echo "✗ CODE_OF_CONDUCT.md not found" + exit 1 + fi + + # Check for LICENSE file + if [ -f "LICENSE" ] || [ -f "LICENSE.md" ] || [ -f "LICENSE.txt" ]; then + echo "✓ LICENSE file found" + else + echo "✗ LICENSE file not found" + exit 1 + fi + + - name: Validate Python dependencies + run: | + python -m pip install --upgrade pip + pip install uv + uv sync --dev + + # Check for pinned dependencies in production + echo "Checking for properly pinned dependencies..." + python -c " + import tomllib + with open('pyproject.toml', 'rb') as f: + data = tomllib.load(f) + + deps = data.get('project', {}).get('dependencies', []) + unpinned = [] + for dep in deps: + if '>=' in dep and '<' not in dep: + unpinned.append(dep) + + if unpinned: + print('WARNING: Unpinned dependencies found:') + for dep in unpinned: + print(f' - {dep}') + else: + print('✓ All dependencies properly version-constrained') + " + + - name: Check for secrets in code + run: | + pip install detect-secrets + detect-secrets scan --all-files --baseline .secrets.baseline || true + + # Basic regex checks for common secrets + echo "Checking for potential secrets..." + if grep -r "password\s*=" . --include="*.py" --include="*.yml" --include="*.yaml" | grep -v ".git" | grep -v "example" | grep -v "test"; then + echo "WARNING: Potential hardcoded passwords found" + fi + + if grep -r "api_key\s*=" . --include="*.py" --include="*.yml" --include="*.yaml" | grep -v ".git" | grep -v "example" | grep -v "test"; then + echo "WARNING: Potential hardcoded API keys found" + fi \ No newline at end of file diff --git a/cognee/tests/test_knowledge_graph_quality.py b/cognee/tests/test_knowledge_graph_quality.py index b9b530969..adb41cdd7 100644 --- a/cognee/tests/test_knowledge_graph_quality.py +++ b/cognee/tests/test_knowledge_graph_quality.py @@ -12,40 +12,21 @@ logger = get_logger() async def test_knowledge_graph_quality_with_gpt4o(): """ Test that verifies all main concepts and entities from a specific document are found - in the knowledge graph using GPT-4o model for high-quality entity extraction. + in the knowledge graph using the configured LLM model for entity extraction. This test addresses the issue where HotPotQA questions may not reflect diminishing quality of knowledge graph creation after data model changes. - """ - # Configure model with fallback for better availability - preferred_models = ["gpt-4o", "gpt-4o-mini", "gpt-4-turbo", "gpt-3.5-turbo"] - selected_model = None + The model is configured via the LLM_MODEL environment variable. + """ # Ensure we have API key if not os.environ.get("LLM_API_KEY"): raise ValueError("LLM_API_KEY must be set for this test") - # Try to find an available model by testing actual availability - from cognee.infrastructure.llm.utils import test_llm_connection - - for model in preferred_models: - try: - os.environ["LLM_MODEL"] = model - cognee.config.set_llm_model(model) - - # Test the model availability - await test_llm_connection() - - selected_model = model - print(f"Successfully using model: {model}") - break - except Exception as e: - print(f"Model {model} not available: {e}") - continue - - if not selected_model: - raise ValueError("No suitable model available from: " + ", ".join(preferred_models)) + # Get model from environment variable + current_model = os.environ.get("LLM_MODEL", "gpt-4o") + print(f"Using model from environment: {current_model}") # Set up test directories data_directory_path = str( @@ -253,17 +234,17 @@ async def test_knowledge_graph_quality_with_gpt4o(): print("QUALITY ASSESSMENT:") print("-" * 40) - print(f"Model used: {selected_model}") + print(f"Model used: {current_model}") print() # Adjust quality thresholds based on model capability - if selected_model == "gpt-4o": + if current_model == "gpt-4o": min_entity_coverage = 0.70 # At least 70% of entities should be found min_concept_coverage = 0.60 # At least 60% of concepts should be found - elif selected_model == "gpt-4o-mini": + elif current_model == "gpt-4o-mini": min_entity_coverage = 0.65 # Slightly lower for mini model min_concept_coverage = 0.55 # Slightly lower for mini model - elif selected_model == "gpt-4-turbo": + elif current_model == "gpt-4-turbo": min_entity_coverage = 0.68 # Good performance expected min_concept_coverage = 0.58 # Good performance expected else: # gpt-3.5-turbo or other models diff --git a/scripts/verify_package.py b/scripts/verify_package.py new file mode 100644 index 000000000..dacbc41c1 --- /dev/null +++ b/scripts/verify_package.py @@ -0,0 +1,337 @@ +#!/usr/bin/env python3 +""" +Cognee Package Verification Script + +This script helps users verify the integrity and authenticity of Cognee packages +by checking hashes, GPG signatures, and package metadata. + +Usage: + python verify_package.py [package_file] [--check-all] [--verbose] + +Examples: + python verify_package.py cognee-0.2.1.tar.gz + python verify_package.py --check-all --verbose + python verify_package.py cognee-0.2.1-py3-none-any.whl --verify-signature +""" + +import os +import sys +import hashlib +import json +import argparse +import subprocess +import tempfile +import urllib.request +from pathlib import Path +from typing import Dict, List, Optional, Tuple +import zipfile +import tarfile + + +class PackageVerifier: + """Handles package verification operations.""" + + def __init__(self, verbose: bool = False): + self.verbose = verbose + self.github_api_base = "https://api.github.com/repos/topoteretes/cognee" + self.github_releases_base = "https://github.com/topoteretes/cognee/releases" + + def log(self, message: str, level: str = "INFO"): + """Log messages with different levels.""" + if self.verbose or level in ["ERROR", "WARNING"]: + print(f"[{level}] {message}") + + def calculate_hash(self, file_path: str, algorithm: str = "sha256") -> str: + """Calculate hash of a file.""" + hash_obj = hashlib.new(algorithm) + + try: + with open(file_path, "rb") as f: + for chunk in iter(lambda: f.read(4096), b""): + hash_obj.update(chunk) + return hash_obj.hexdigest() + except Exception as e: + self.log(f"Error calculating {algorithm} hash: {e}", "ERROR") + return "" + + def verify_hash(self, file_path: str, expected_hash: str, algorithm: str = "sha256") -> bool: + """Verify file hash against expected value.""" + calculated_hash = self.calculate_hash(file_path, algorithm) + + if not calculated_hash: + return False + + match = calculated_hash.lower() == expected_hash.lower() + + if match: + self.log(f"✓ {algorithm.upper()} hash verification PASSED", "INFO") + else: + self.log(f"✗ {algorithm.upper()} hash verification FAILED", "ERROR") + self.log(f" Expected: {expected_hash}", "ERROR") + self.log(f" Calculated: {calculated_hash}", "ERROR") + + return match + + def verify_gpg_signature(self, file_path: str, signature_path: str) -> bool: + """Verify GPG signature of a file.""" + try: + # Check if gpg is available + subprocess.run( + ["gpg", "--version"], + check=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + except (subprocess.CalledProcessError, FileNotFoundError): + self.log("GPG not found. Please install GPG to verify signatures.", "WARNING") + return False + + if not os.path.exists(signature_path): + self.log(f"Signature file not found: {signature_path}", "WARNING") + return False + + try: + result = subprocess.run( + ["gpg", "--verify", signature_path, file_path], capture_output=True, text=True + ) + + if result.returncode == 0: + self.log("✓ GPG signature verification PASSED", "INFO") + return True + else: + self.log("✗ GPG signature verification FAILED", "ERROR") + self.log(f"GPG error: {result.stderr}", "ERROR") + return False + except Exception as e: + self.log(f"Error verifying GPG signature: {e}", "ERROR") + return False + + def get_latest_release_info(self) -> Optional[Dict]: + """Get latest release information from GitHub API.""" + try: + url = f"{self.github_api_base}/releases/latest" + with urllib.request.urlopen(url) as response: + return json.loads(response.read()) + except Exception as e: + self.log(f"Error fetching release info: {e}", "ERROR") + return None + + def download_checksum_file( + self, release_info: Dict, checksum_type: str = "SHA256SUMS" + ) -> Optional[str]: + """Download checksum file from GitHub release.""" + for asset in release_info.get("assets", []): + if asset["name"] == checksum_type: + try: + with tempfile.NamedTemporaryFile( + mode="w+", delete=False, suffix=f".{checksum_type}" + ) as tmp: + with urllib.request.urlopen(asset["browser_download_url"]) as response: + tmp.write(response.read().decode("utf-8")) + return tmp.name + except Exception as e: + self.log(f"Error downloading {checksum_type}: {e}", "ERROR") + return None + return None + + def parse_checksum_file(self, checksum_file: str) -> Dict[str, str]: + """Parse checksum file and return filename -> hash mapping.""" + checksums = {} + try: + with open(checksum_file, "r") as f: + for line in f: + line = line.strip() + if line and not line.startswith("#"): + parts = line.split() + if len(parts) >= 2: + hash_value = parts[0] + filename = parts[1].lstrip("*") # Remove binary mode indicator + checksums[filename] = hash_value + except Exception as e: + self.log(f"Error parsing checksum file: {e}", "ERROR") + return checksums + + def verify_package_metadata(self, package_path: str) -> bool: + """Verify package metadata and structure.""" + self.log(f"Verifying package metadata for: {package_path}") + + if package_path.endswith(".whl"): + return self._verify_wheel_metadata(package_path) + elif package_path.endswith(".tar.gz"): + return self._verify_tarball_metadata(package_path) + else: + self.log(f"Unsupported package format: {package_path}", "WARNING") + return False + + def _verify_wheel_metadata(self, wheel_path: str) -> bool: + """Verify wheel package metadata.""" + try: + with zipfile.ZipFile(wheel_path, "r") as wheel: + # Check for required metadata files + required_files = ["METADATA", "WHEEL"] + metadata_files = [ + f for f in wheel.namelist() if any(req in f for req in required_files) + ] + + if not metadata_files: + self.log("✗ Required metadata files not found in wheel", "ERROR") + return False + + # Read and validate METADATA + metadata_content = None + for file in wheel.namelist(): + if file.endswith("METADATA"): + metadata_content = wheel.read(file).decode("utf-8") + break + + if metadata_content: + if "Name: cognee" in metadata_content: + self.log("✓ Package metadata verification PASSED", "INFO") + return True + else: + self.log("✗ Package name verification FAILED", "ERROR") + return False + + except Exception as e: + self.log(f"Error verifying wheel metadata: {e}", "ERROR") + return False + + return False + + def _verify_tarball_metadata(self, tarball_path: str) -> bool: + """Verify tarball package metadata.""" + try: + with tarfile.open(tarball_path, "r:gz") as tar: + # Look for PKG-INFO or pyproject.toml + metadata_files = [ + f for f in tar.getnames() if "PKG-INFO" in f or "pyproject.toml" in f + ] + + if not metadata_files: + self.log("✗ No metadata files found in tarball", "ERROR") + return False + + # Check PKG-INFO if available + for file in metadata_files: + if "PKG-INFO" in file: + member = tar.getmember(file) + content = tar.extractfile(member).read().decode("utf-8") + if "Name: cognee" in content: + self.log("✓ Package metadata verification PASSED", "INFO") + return True + + except Exception as e: + self.log(f"Error verifying tarball metadata: {e}", "ERROR") + return False + + return False + + def verify_package(self, package_path: str, verify_signature: bool = False) -> bool: + """Comprehensive package verification.""" + if not os.path.exists(package_path): + self.log(f"Package file not found: {package_path}", "ERROR") + return False + + self.log(f"Starting verification of: {package_path}") + verification_results = [] + + # 1. Verify package metadata + metadata_ok = self.verify_package_metadata(package_path) + verification_results.append(metadata_ok) + + # 2. Get release info and checksums + release_info = self.get_latest_release_info() + if not release_info: + self.log("Could not fetch release information", "WARNING") + return all(verification_results) + + # 3. Download and verify checksums + checksum_file = self.download_checksum_file(release_info, "SHA256SUMS") + if checksum_file: + checksums = self.parse_checksum_file(checksum_file) + filename = os.path.basename(package_path) + + if filename in checksums: + hash_ok = self.verify_hash(package_path, checksums[filename], "sha256") + verification_results.append(hash_ok) + else: + self.log(f"No checksum found for {filename}", "WARNING") + + os.unlink(checksum_file) # Clean up temp file + + # 4. Verify GPG signature if requested + if verify_signature: + signature_path = f"{package_path}.asc" + if os.path.exists(signature_path): + sig_ok = self.verify_gpg_signature(package_path, signature_path) + verification_results.append(sig_ok) + else: + self.log(f"Signature file not found: {signature_path}", "WARNING") + + # Overall result + all_passed = all(verification_results) + if all_passed: + self.log("🎉 Package verification PASSED", "INFO") + else: + self.log("❌ Package verification FAILED", "ERROR") + + return all_passed + + def verify_all_packages(self, directory: str = ".", verify_signature: bool = False) -> bool: + """Verify all Cognee packages in a directory.""" + package_files = [] + + for file in os.listdir(directory): + if file.startswith("cognee") and (file.endswith(".whl") or file.endswith(".tar.gz")): + package_files.append(os.path.join(directory, file)) + + if not package_files: + self.log("No Cognee packages found in directory", "WARNING") + return False + + all_results = [] + for package_file in package_files: + self.log(f"\n{'=' * 60}") + result = self.verify_package(package_file, verify_signature) + all_results.append(result) + + return all(all_results) + + +def main(): + parser = argparse.ArgumentParser(description="Verify Cognee package integrity and authenticity") + parser.add_argument("package", nargs="?", help="Path to package file to verify") + parser.add_argument( + "--check-all", action="store_true", help="Verify all packages in current directory" + ) + parser.add_argument( + "--verify-signature", action="store_true", help="Also verify GPG signatures" + ) + parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose output") + + args = parser.parse_args() + + if not args.package and not args.check_all: + parser.print_help() + sys.exit(1) + + verifier = PackageVerifier(verbose=args.verbose) + + try: + if args.check_all: + success = verifier.verify_all_packages(".", args.verify_signature) + else: + success = verifier.verify_package(args.package, args.verify_signature) + + sys.exit(0 if success else 1) + + except KeyboardInterrupt: + print("\nVerification interrupted by user") + sys.exit(1) + except Exception as e: + print(f"Error: {e}") + sys.exit(1) + + +if __name__ == "__main__": + main()