-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbatch_sync.py
More file actions
73 lines (60 loc) · 2.26 KB
/
batch_sync.py
File metadata and controls
73 lines (60 loc) · 2.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
from config import *
from db import *
from listener import *
from utils import *
import argparse
import datetime
import sys
import time
import tweepy
from loguru import logger
# Route all loguru output to stdout: drop the default sink, then register a
# single colorized DEBUG-level sink laid out as
# "timestamp |level| module:function:line - message".
logger.remove()
logger.add(
    sys.stdout,
    level="DEBUG",
    colorize=True,
    backtrace=True,
    format=(
        "<green>{time:YYYY-MM-DD at HH:mm:ss}</green> "
        "<blue>|{level: ^8}|</blue> "
        "<cyan>{module: ^10}:{function: ^15}:{line: >3}</cyan> "
        "- <level>{message}</level>"
    ),
)

# Module-level Twitter client built from the TWITTER_* credentials brought in
# by the star imports above (presumably from `config`).
# wait_on_rate_limit=True asks tweepy to sleep through rate-limit windows.
# NOTE(review): `wait_on_rate_limit_notify` only exists in tweepy < 4.0; this
# file appears to target tweepy 3.x (it also uses api.search and
# RateLimitError) -- confirm the pinned version.
auth = tweepy.OAuthHandler(TWITTER_CONSUMER_KEY, TWITTER_CONSUMER_SECRET)
auth.set_access_token(TWITTER_ACCESS_TOKEN, TWITTER_ACCESS_TOKEN_SECRET)
api = tweepy.API(
    auth,
    wait_on_rate_limit=True,
    wait_on_rate_limit_notify=True,
)
def parse_arguments(argv=None):
    """Parse the command-line options for the batch tweet sync.

    Args:
        argv: Argument strings to parse, without the program name. The
            default ``None`` makes argparse read ``sys.argv[1:]`` (the
            original behavior); passing an explicit list makes the parser
            usable from tests and other callers.

    Returns:
        An ``argparse.Namespace`` with two int attributes:
        ``days_to_sync`` (default 1) and ``number_of_tweets`` (default 200).
    """
    parser = argparse.ArgumentParser(
        description="Batch-sync recent tweets from @vaxhunterscan."
    )
    parser.add_argument(
        "-d",
        "--days-to-sync",
        type=int,
        required=False,
        default=1,
        help="Days to sync.",
    )
    parser.add_argument(
        "-n",
        "--number-of-tweets",
        type=int,
        required=False,
        default=200,
        help="Number of tweets to sync.",
    )
    return parser.parse_args(argv)
def limit_handled(cursor):
    """Yield items from *cursor*, sleeping and retrying on rate-limit errors.

    Args:
        cursor: An iterator or iterable of items, e.g. the result of
            ``tweepy.Cursor(...).items()``.

    Yields:
        Each item produced by *cursor*, in order.

    Ends cleanly once *cursor* is exhausted. When fetching the next item
    raises ``tweepy.RateLimitError``, an error is logged, the generator
    sleeps for 15 minutes, and then retries the same ``next()`` call.
    """
    iterator = iter(cursor)
    while True:
        try:
            item = next(iterator)
        except StopIteration:
            # PEP 479: a StopIteration escaping a generator body is turned
            # into RuntimeError, so exhaustion must end with a plain return.
            # (Listed first so exhaustion never needs the tweepy lookup.)
            return
        except tweepy.RateLimitError:
            logger.error("Rate Limit is reached. Sleeping for 15 minutes.")
            time.sleep(15 * 60)
        else:
            # Yield outside the try so exceptions thrown into this generator
            # by the consumer are not mistaken for fetch errors.
            yield item
def batch_sync(days_to_sync: int, number_of_tweets: int):
    """Fetch recent @vaxhunterscan tweets and run ``process_tweet`` on each.

    Args:
        days_to_sync: How many days back from today to search; the query's
            ``since:`` bound is ``today - days_to_sync``.
        number_of_tweets: Maximum number of tweets to pull from the cursor.

    Returns:
        The number of tweets that were processed. Previously ``None``;
        existing callers that ignore the result are unaffected.
    """
    today = datetime.date.today()
    start_time = today - datetime.timedelta(days=days_to_sync)
    logger.info(f"Starting batch syncing from {start_time} to {today}")
    # Search operators take no space after the colon ("since:YYYY-MM-DD").
    # Previously the query was overwritten down to a bare "from:" filter,
    # which silently ignored days_to_sync. No "until:" bound is set, so
    # results run from start_time up to the present.
    search_query = f"from:vaxhunterscan since:{start_time}"
    cursor = tweepy.Cursor(api.search, q=search_query, tweet_mode='extended', lang='en')
    # Rate limits are left to the client: the module-level `api` is created
    # with wait_on_rate_limit=True.
    synced = 0
    for tweet_obj in cursor.items(number_of_tweets):
        process_tweet(tweet_obj)
        # Only the count is needed, so tweets are not accumulated in memory.
        synced += 1
    logger.info(f"Finished syncing {synced} tweets")
    return synced
if __name__ == '__main__':
    # Script entry point: read the CLI options, then run a single sync pass.
    cli_args = parse_arguments()
    batch_sync(
        days_to_sync=cli_args.days_to_sync,
        number_of_tweets=cli_args.number_of_tweets,
    )