import psycopg2
import time
import json
from getpass import getpass
import tweepy
# Module-wide PostgreSQL connection, opened as soon as this file is loaded.
# NOTE(review): the DSN, including the password, is hard-coded here; consider
# reading it from configuration or the environment instead.
connection = psycopg2.connect("dbname=kv user=root password=toor host=localhost")
# Shared cursor that save_db() uses for every INSERT.
cursor = connection.cursor()
def reset_cursor():
    """Replace the module-level ``cursor`` with a fresh one from ``connection``.

    Called after a failed statement so later inserts start from a clean
    cursor.
    """
    # Without this declaration the assignment below would only bind a
    # function-local name and the shared cursor would never be replaced.
    global cursor
    cursor = connection.cursor()
def savable_pairs(twt):
    """Flatten a decoded tweet into ``[key, value]`` string pairs.

    Args:
        twt: Tweet dict decoded from the stream JSON. It must contain
            ``text``, ``favorited``, ``retweeted``, ``created_at``, ``user``
            and ``entities`` (the latter with ``urls``, ``hashtags`` and
            ``user_mentions`` entries).

    Returns:
        A list of two-item lists, in this order:
        ``text``, ``favorited`` and ``retweeted`` converted to text;
        ``created_at`` as a zero-padded 14-digit Unix timestamp;
        ``has_urls`` / ``has_hashtags`` / ``has_user_mentions`` set to
        ``':t'`` only when the matching entity list is non-empty;
        ``entities`` and ``user`` serialized as JSON.

    Raises:
        KeyError: if one of the required fields is missing.
        ValueError: if ``created_at`` does not match the expected format.
    """
    import calendar  # local import: not part of the module's import header

    try:
        to_text = unicode  # Python 2
    except NameError:
        to_text = str  # Python 3

    # Each scalar field is emitted exactly once; a repeated key would only
    # add a redundant pair for the same hstore key.
    pairs = [
        [field, to_text(twt[field])]
        for field in ("text", "favorited", "retweeted")
    ]

    # created_at is expressed in UTC ("+0000"), so it has to be converted
    # with timegm(); mktime() would read the struct as local time and shift
    # the result by the host's UTC offset.
    created = time.strptime(twt["created_at"], "%a %b %d %H:%M:%S +0000 %Y")
    pairs.append(["created_at", "%014d" % calendar.timegm(created)])

    entities = twt["entities"]
    for entity_kind in ("urls", "hashtags", "user_mentions"):
        if entities[entity_kind]:
            pairs.append(["has_" + entity_kind, ":t"])

    pairs.append(["entities", json.dumps(entities)])
    pairs.append(["user", json.dumps(twt["user"])])
    return pairs
def save_db(twt):
    """Insert one tweet into ``my_store`` as an hstore document.

    The row id is ``"<tweet id>-<current unix time>"`` and the document is
    the pair list produced by ``savable_pairs(twt)``. If the insert or the
    commit raises, the transaction is rolled back, the shared cursor is
    reset, the error is printed and the function returns without raising.

    Args:
        twt: Decoded tweet dict; must contain ``id`` plus every field that
            ``savable_pairs`` reads.
    """
    savable_twt = savable_pairs(twt)
    try:
        cursor.execute(
            "INSERT INTO my_store(id, doc) VALUES(%s, hstore(%s))",
            ("%d-%d" % (twt["id"], time.time()), savable_twt),
        )
        connection.commit()
    except Exception as e:
        connection.rollback()
        reset_cursor()
        print("!!!!!!!Unable to save %s" % (e,))
        return
    # rowcount reports how many rows the INSERT affected. lastrowid is the
    # row's OID, which is None for tables created without OIDs, so it cannot
    # be used to tell whether the insert succeeded.
    if cursor.rowcount != 1:
        print("!!!!!!!Unable to save")
class StreamWatcherListener(tweepy.StreamListener):
    """Stream listener that echoes every raw message and stores tweets."""

    def on_data(self, data):
        """Print the raw message and persist it when it is a tweet.

        Args:
            data: One raw JSON message as delivered by the stream.

        Returns:
            Always ``True``.
        """
        print("========")
        print(data)
        twt = json.loads(data)
        # Only status payloads are stored. Deletion notices are skipped
        # explicitly; any other message without a "text" field is skipped
        # too, because savable_pairs() reads that field unconditionally and
        # would otherwise raise KeyError out of this callback.
        if isinstance(twt, dict) and "delete" not in twt and "text" in twt:
            save_db(twt)
        return True

    def on_error(self, status_code):
        """Report the error status code and return ``True``."""
        print('Error code = %s' % status_code)
        return True

    def on_timeout(self):
        """Report a timeout; returns ``None`` (no explicit return value)."""
        print('Snoozing Zzzzzz')
# Only prompt for credentials and start streaming when the file is executed
# as a script; importing it must not block on input or open the stream.
if __name__ == "__main__":
    username = raw_input('Twitter username: ')
    password = getpass('Twitter password: ')  # read without echoing
    auth = tweepy.auth.BasicAuthHandler(username, password)
    stream = tweepy.Stream(auth, StreamWatcherListener(), secure=True)
    # Consume the sample stream; messages are handled by the listener.
    stream.sample()