cognee/tools/daily_twitter_stats.py
2024-09-29 21:22:55 +02:00

66 lines
2 KiB
Python

import json
import os
from datetime import datetime

import requests
import tweepy
# Credentials are read from environment variables. GitHub Actions expands
# `${{ secrets.* }}` expressions only inside workflow YAML, never inside a
# Python source file, so the workflow step that runs this script must export
# each secret under the variable name used below (via its `env:` mapping).
# A missing variable falls back to '' and surfaces as an API error at call time.
API_KEY = os.environ.get('TWITTER_API_KEY', '')
API_SECRET = os.environ.get('TWITTER_API_SECRET', '')
ACCESS_TOKEN = os.environ.get('TWITTER_ACCESS_TOKEN', '')
ACCESS_SECRET = os.environ.get('TWITTER_ACCESS_SECRET', '')
USERNAME = os.environ.get('TWITTER_USERNAME', '')
SEGMENT_WRITE_KEY = os.environ.get('SEGMENT_WRITE_KEY', '')
# Segment HTTP endpoint that the tracking event is posted to
SEGMENT_ENDPOINT = 'https://api.segment.io/v1/track'
# Build the Tweepy client: an OAuth handler created from the consumer
# key/secret, with the account's access token/secret attached to it.
auth = tweepy.OAuthHandler(API_KEY, API_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)
twitter_api = tweepy.API(auth)
def get_follower_count(username):
    """Return the follower count of a Twitter account, or None on failure.

    Args:
        username: Screen name passed to ``twitter_api.get_user``.

    Returns:
        The ``followers_count`` attribute of the fetched user, or ``None``
        when Tweepy reports an error (the error is printed, not raised).
    """
    # Tweepy 4.0 renamed TweepError to TweepyException. Resolve whichever
    # name the installed version provides; referencing a missing attribute in
    # the except clause would itself raise AttributeError during handling.
    tweepy_error = getattr(tweepy, 'TweepyException', None) or tweepy.TweepError
    try:
        user = twitter_api.get_user(screen_name=username)
    except tweepy_error as e:
        print(f'Error fetching follower count: {e}')
        return None
    return user.followers_count
def send_data_to_segment(username, follower_count):
    """Report a follower count to Segment as a track event.

    Posts a 'Follower Count Update' event to ``SEGMENT_ENDPOINT`` and prints
    the outcome. Request failures (including timeouts) are printed rather
    than raised, so the function always returns ``None``.

    Args:
        username: Used as the event's ``userId`` and ``username`` property.
        follower_count: Value stored in the ``follower_count`` property.
    """
    # Timezone-aware ISO 8601 string: a naive local time carries no offset
    # and is ambiguous once it leaves this machine.
    current_time = datetime.now().astimezone().isoformat()
    data = {
        'userId': username,
        'event': 'Follower Count Update',
        'properties': {
            'username': username,
            'follower_count': follower_count,
            'timestamp': current_time,
        },
        'timestamp': current_time,
    }
    headers = {'Content-Type': 'application/json'}
    try:
        response = requests.post(
            SEGMENT_ENDPOINT,
            headers=headers,
            data=json.dumps(data),
            # Segment uses HTTP Basic auth with the write key as the username
            # and an empty password; requests base64-encodes the pair. The
            # previous hand-built header sent the raw, unencoded key.
            auth=(SEGMENT_WRITE_KEY, ''),
            # Without a timeout a stalled connection would hang indefinitely.
            timeout=10,
        )
    except requests.exceptions.RequestException as e:
        print(f'Error sending data to Segment: {e}')
        return
    if response.status_code == 200:
        print(f'Successfully sent data to Segment for {username}')
    else:
        print(f'Failed to send data to Segment. Status code: {response.status_code}, Response: {response.text}')
# Script driver: fetch the configured account's follower count and forward it
# to Segment; if the fetch failed, report that instead of sending anything.
follower_count = get_follower_count(USERNAME)
if follower_count is None:
    print('Failed to retrieve follower count.')
else:
    send_data_to_segment(USERNAME, follower_count)