feat: Add proxy for analytics

Added proxy usage with vercel hosting for telemetry and analytics

Feature COG-597
This commit is contained in:
Igor Ilic 2024-11-15 15:05:46 +01:00
parent b858f0b06b
commit d90f5fe7c1

View file

@ -1,5 +1,6 @@
""" This module contains utility functions for the cognee. """
import os
import requests
from datetime import datetime, timezone
import graphistry
import networkx as nx
@ -8,7 +9,6 @@ import pandas as pd
import matplotlib.pyplot as plt
import tiktoken
import nltk
from posthog import Posthog
from cognee.base_config import get_base_config
from cognee.infrastructure.databases.graph import get_graph_engine
@ -16,6 +16,9 @@ from cognee.infrastructure.databases.graph import get_graph_engine
from uuid import uuid4
import pathlib
# Analytics Proxy Url, currently hosted by Vercel
vercel_url = "https://proxyanalytics.vercel.app"
def get_anonymous_id():
"""Creates or reads a anonymous user id"""
home_dir = str(pathlib.Path(pathlib.Path(__file__).parent.parent.parent.resolve()))
@ -40,25 +43,23 @@ def send_telemetry(event_name: str, user_id, additional_properties: dict = {}):
if env in ["test", "dev"]:
return
posthog = Posthog(
project_api_key = "phc_UB1YVere1KtJg1MFxAo6ABfpkwN3OxCvGNDkMTjvH0",
host = "https://eu.i.posthog.com"
)
current_time = datetime.now(timezone.utc)
properties = {
"time": current_time.strftime("%m/%d/%Y"),
"user_id": user_id,
**additional_properties,
payload = {
"anonymous_id": str(get_anonymous_id()),
"event_name": event_name,
"user_properties": {
"user_id": str(user_id),
},
"properties": {
"time": current_time.strftime("%m/%d/%Y"),
**additional_properties
},
}
# Needed to forward properties to PostHog along with id
posthog.identify(get_anonymous_id(), properties)
response = requests.post(vercel_url, json=payload)
try:
posthog.capture(get_anonymous_id(), event_name, properties)
except Exception as e:
print("ERROR sending telemetric data to Posthog. See exception: %s", e)
if response.status_code != 200:
print(f"Error sending telemetry through proxy: {response.status_code}")
def num_tokens_from_string(string: str, encoding_name: str) -> int:
"""Returns the number of tokens in a text string."""