cognee/scripts/verify_package.py
2025-07-16 16:34:34 +02:00

337 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Cognee Package Verification Script
This script helps users verify the integrity and authenticity of Cognee packages
by checking hashes, GPG signatures, and package metadata.
Usage:
python verify_package.py [package_file] [--check-all] [--verbose]
Examples:
python verify_package.py cognee-0.2.1.tar.gz
python verify_package.py --check-all --verbose
python verify_package.py cognee-0.2.1-py3-none-any.whl --verify-signature
"""
import os
import sys
import hashlib
import json
import argparse
import subprocess
import tempfile
import urllib.request
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import zipfile
import tarfile
class PackageVerifier:
"""Handles package verification operations."""
def __init__(self, verbose: bool = False):
self.verbose = verbose
self.github_api_base = "https://api.github.com/repos/topoteretes/cognee"
self.github_releases_base = "https://github.com/topoteretes/cognee/releases"
def log(self, message: str, level: str = "INFO"):
"""Log messages with different levels."""
if self.verbose or level in ["ERROR", "WARNING"]:
print(f"[{level}] {message}")
def calculate_hash(self, file_path: str, algorithm: str = "sha256") -> str:
"""Calculate hash of a file."""
hash_obj = hashlib.new(algorithm)
try:
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_obj.update(chunk)
return hash_obj.hexdigest()
except Exception as e:
self.log(f"Error calculating {algorithm} hash: {e}", "ERROR")
return ""
def verify_hash(self, file_path: str, expected_hash: str, algorithm: str = "sha256") -> bool:
"""Verify file hash against expected value."""
calculated_hash = self.calculate_hash(file_path, algorithm)
if not calculated_hash:
return False
match = calculated_hash.lower() == expected_hash.lower()
if match:
self.log(f"{algorithm.upper()} hash verification PASSED", "INFO")
else:
self.log(f"{algorithm.upper()} hash verification FAILED", "ERROR")
self.log(f" Expected: {expected_hash}", "ERROR")
self.log(f" Calculated: {calculated_hash}", "ERROR")
return match
def verify_gpg_signature(self, file_path: str, signature_path: str) -> bool:
"""Verify GPG signature of a file."""
try:
# Check if gpg is available
subprocess.run(
["gpg", "--version"],
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
except (subprocess.CalledProcessError, FileNotFoundError):
self.log("GPG not found. Please install GPG to verify signatures.", "WARNING")
return False
if not os.path.exists(signature_path):
self.log(f"Signature file not found: {signature_path}", "WARNING")
return False
try:
result = subprocess.run(
["gpg", "--verify", signature_path, file_path], capture_output=True, text=True
)
if result.returncode == 0:
self.log("✓ GPG signature verification PASSED", "INFO")
return True
else:
self.log("✗ GPG signature verification FAILED", "ERROR")
self.log(f"GPG error: {result.stderr}", "ERROR")
return False
except Exception as e:
self.log(f"Error verifying GPG signature: {e}", "ERROR")
return False
def get_latest_release_info(self) -> Optional[Dict]:
"""Get latest release information from GitHub API."""
try:
url = f"{self.github_api_base}/releases/latest"
with urllib.request.urlopen(url) as response:
return json.loads(response.read())
except Exception as e:
self.log(f"Error fetching release info: {e}", "ERROR")
return None
def download_checksum_file(
self, release_info: Dict, checksum_type: str = "SHA256SUMS"
) -> Optional[str]:
"""Download checksum file from GitHub release."""
for asset in release_info.get("assets", []):
if asset["name"] == checksum_type:
try:
with tempfile.NamedTemporaryFile(
mode="w+", delete=False, suffix=f".{checksum_type}"
) as tmp:
with urllib.request.urlopen(asset["browser_download_url"]) as response:
tmp.write(response.read().decode("utf-8"))
return tmp.name
except Exception as e:
self.log(f"Error downloading {checksum_type}: {e}", "ERROR")
return None
return None
def parse_checksum_file(self, checksum_file: str) -> Dict[str, str]:
"""Parse checksum file and return filename -> hash mapping."""
checksums = {}
try:
with open(checksum_file, "r") as f:
for line in f:
line = line.strip()
if line and not line.startswith("#"):
parts = line.split()
if len(parts) >= 2:
hash_value = parts[0]
filename = parts[1].lstrip("*") # Remove binary mode indicator
checksums[filename] = hash_value
except Exception as e:
self.log(f"Error parsing checksum file: {e}", "ERROR")
return checksums
def verify_package_metadata(self, package_path: str) -> bool:
"""Verify package metadata and structure."""
self.log(f"Verifying package metadata for: {package_path}")
if package_path.endswith(".whl"):
return self._verify_wheel_metadata(package_path)
elif package_path.endswith(".tar.gz"):
return self._verify_tarball_metadata(package_path)
else:
self.log(f"Unsupported package format: {package_path}", "WARNING")
return False
def _verify_wheel_metadata(self, wheel_path: str) -> bool:
"""Verify wheel package metadata."""
try:
with zipfile.ZipFile(wheel_path, "r") as wheel:
# Check for required metadata files
required_files = ["METADATA", "WHEEL"]
metadata_files = [
f for f in wheel.namelist() if any(req in f for req in required_files)
]
if not metadata_files:
self.log("✗ Required metadata files not found in wheel", "ERROR")
return False
# Read and validate METADATA
metadata_content = None
for file in wheel.namelist():
if file.endswith("METADATA"):
metadata_content = wheel.read(file).decode("utf-8")
break
if metadata_content:
if "Name: cognee" in metadata_content:
self.log("✓ Package metadata verification PASSED", "INFO")
return True
else:
self.log("✗ Package name verification FAILED", "ERROR")
return False
except Exception as e:
self.log(f"Error verifying wheel metadata: {e}", "ERROR")
return False
return False
def _verify_tarball_metadata(self, tarball_path: str) -> bool:
"""Verify tarball package metadata."""
try:
with tarfile.open(tarball_path, "r:gz") as tar:
# Look for PKG-INFO or pyproject.toml
metadata_files = [
f for f in tar.getnames() if "PKG-INFO" in f or "pyproject.toml" in f
]
if not metadata_files:
self.log("✗ No metadata files found in tarball", "ERROR")
return False
# Check PKG-INFO if available
for file in metadata_files:
if "PKG-INFO" in file:
member = tar.getmember(file)
content = tar.extractfile(member).read().decode("utf-8")
if "Name: cognee" in content:
self.log("✓ Package metadata verification PASSED", "INFO")
return True
except Exception as e:
self.log(f"Error verifying tarball metadata: {e}", "ERROR")
return False
return False
def verify_package(self, package_path: str, verify_signature: bool = False) -> bool:
"""Comprehensive package verification."""
if not os.path.exists(package_path):
self.log(f"Package file not found: {package_path}", "ERROR")
return False
self.log(f"Starting verification of: {package_path}")
verification_results = []
# 1. Verify package metadata
metadata_ok = self.verify_package_metadata(package_path)
verification_results.append(metadata_ok)
# 2. Get release info and checksums
release_info = self.get_latest_release_info()
if not release_info:
self.log("Could not fetch release information", "WARNING")
return all(verification_results)
# 3. Download and verify checksums
checksum_file = self.download_checksum_file(release_info, "SHA256SUMS")
if checksum_file:
checksums = self.parse_checksum_file(checksum_file)
filename = os.path.basename(package_path)
if filename in checksums:
hash_ok = self.verify_hash(package_path, checksums[filename], "sha256")
verification_results.append(hash_ok)
else:
self.log(f"No checksum found for {filename}", "WARNING")
os.unlink(checksum_file) # Clean up temp file
# 4. Verify GPG signature if requested
if verify_signature:
signature_path = f"{package_path}.asc"
if os.path.exists(signature_path):
sig_ok = self.verify_gpg_signature(package_path, signature_path)
verification_results.append(sig_ok)
else:
self.log(f"Signature file not found: {signature_path}", "WARNING")
# Overall result
all_passed = all(verification_results)
if all_passed:
self.log("🎉 Package verification PASSED", "INFO")
else:
self.log("❌ Package verification FAILED", "ERROR")
return all_passed
def verify_all_packages(self, directory: str = ".", verify_signature: bool = False) -> bool:
"""Verify all Cognee packages in a directory."""
package_files = []
for file in os.listdir(directory):
if file.startswith("cognee") and (file.endswith(".whl") or file.endswith(".tar.gz")):
package_files.append(os.path.join(directory, file))
if not package_files:
self.log("No Cognee packages found in directory", "WARNING")
return False
all_results = []
for package_file in package_files:
self.log(f"\n{'=' * 60}")
result = self.verify_package(package_file, verify_signature)
all_results.append(result)
return all(all_results)
def main():
parser = argparse.ArgumentParser(description="Verify Cognee package integrity and authenticity")
parser.add_argument("package", nargs="?", help="Path to package file to verify")
parser.add_argument(
"--check-all", action="store_true", help="Verify all packages in current directory"
)
parser.add_argument(
"--verify-signature", action="store_true", help="Also verify GPG signatures"
)
parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose output")
args = parser.parse_args()
if not args.package and not args.check_all:
parser.print_help()
sys.exit(1)
verifier = PackageVerifier(verbose=args.verbose)
try:
if args.check_all:
success = verifier.verify_all_packages(".", args.verify_signature)
else:
success = verifier.verify_package(args.package, args.verify_signature)
sys.exit(0 if success else 1)
except KeyboardInterrupt:
print("\nVerification interrupted by user")
sys.exit(1)
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()