diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..0d20b64
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+*.pyc
diff --git a/DB/__init__.py b/DB/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/DB/generic.py b/DB/generic.py
new file mode 100644
index 0000000..5074451
--- /dev/null
+++ b/DB/generic.py
@@ -0,0 +1,11 @@
+import os
+
+class DB:
+    def __init__(self):
+        self.name = "Generic DB Driver"
+
+    def _WIPE(self):
+        # drop everything: close, delete the backing file, reopen fresh
+        self.close()
+        os.remove(self.filename)
+        self.open()
diff --git a/DB/multi.py b/DB/multi.py
new file mode 100644
index 0000000..c6b6afd
--- /dev/null
+++ b/DB/multi.py
@@ -0,0 +1,38 @@
+from . import generic
+import logging
+
+class Driver(generic.DB):
+    def __init__(self, databases=None):
+        self.name = "Multiple Dispatch DB Driver"
+
+        # avoid a mutable default argument
+        if databases is None:
+            databases = []
+        if isinstance(databases, list):
+            self.dbs = databases
+        else:
+            self.dbs = [databases]
+
+        logging.debug(self.dbs)
+
+    def __getattribute__(self, name):
+        # resolve normally first; fall back to a dispatching wrapper
+        try:
+            return object.__getattribute__(self, name)
+        except AttributeError:
+            pass
+
+        def wrapper(*args, **kwargs):
+            # fan the call out to every underlying driver
+            for d in self.dbs:
+                logging.debug(f"{d} -> {name}({args})")
+                fn = None
+                try:
+                    fn = getattr(d, name)
+                except AttributeError:
+                    logging.warning(f"{d} has no attribute {name}")
+
+                if fn: fn(*args, **kwargs)
+
+        return wrapper
+
+    def getTweets(self):
+        return self.dbs[0].getTweets()
+
+    def _WIPE(self):
+        return [d._WIPE() for d in self.dbs]
diff --git a/DB/mysql.py b/DB/mysql.py
new file mode 100644
index 0000000..08e5488
--- /dev/null
+++ b/DB/mysql.py
@@ -0,0 +1,94 @@
+import MySQLdb
+import logging
+
+from . import generic
+
+
+class Driver(generic.DB):
+    def __init__(self, filename=None):
+        # the common driver signature takes a filename; MySQL ignores it
+        super().__init__()
+        self.name = "MySQL DB Driver"
+        # fill in your own connection credentials here
+        self.db = MySQLdb.connect(host="", user="", passwd="", db="", charset="utf8")
+
+    def getTweets(self):
+        cur = self.db.cursor()
+        cur.execute(
+            """SELECT * \
+            FROM Tweets \
+            WHERE Deleted=0"""
+        )
+        return cur
+
+    def writeSuccess(self, path):
+        cur = self.db.cursor()
+        try:
+            cur.execute(
+                """UPDATE Tweets \
+                SET Screenshot=1 \
+                WHERE Tweet_Id=%s""",
+                [path],
+            )
+            self.db.commit()
+            logging.info(f"Screenshot OK. Tweet id {path}")
+        except MySQLdb.Error as e:
+            try:
+                logging.error("MySQL Error [%d]: %s" % (e.args[0], e.args[1]))
+            except IndexError:
+                logging.error("MySQL Error: %s" % str(e))
+
+            logging.warning(f"Warning: {path} not saved to database")
+        return True
+
+    def markDeleted(self, path):
+        cur = self.db.cursor()
+        try:
+            cur.execute(
+                """UPDATE Tweets \
+                SET Deleted=1 \
+                WHERE Tweet_Id=%s""",
+                [path],
+            )
+            self.db.commit()
+            logging.info(f"Tweet marked as deleted {path}")
+        except MySQLdb.Error as e:
+            try:
+                logging.error("MySQL Error [%d]: %s" % (e.args[0], e.args[1]))
+            except IndexError:
+                logging.error("MySQL Error: %s" % str(e))
+
+            logging.warning(f"Warning: {path} not saved to database")
+        return True
+
+    def getLogs(self):
+        cur = self.db.cursor()
+        cur.execute(
+            "SELECT Url, Tweet_Id FROM Tweets WHERE Screenshot=0 AND Deleted=0 "
+        )
+        return cur
+
+    def saveTweet(self, status):
+        # like the other drivers, take the bare status; the url lives on status.link
+        (author, text, id_str) = (status.user.screen_name, status.text, status.id_str)
+        url = status.link
+        cur = self.db.cursor()
+
+        cur.execute(
+            "CREATE TABLE IF NOT EXISTS Tweets(Id INT PRIMARY KEY AUTO_INCREMENT, \
+            Author VARCHAR(255), \
+            Text VARCHAR(255), \
+            Url VARCHAR(255), \
+            Tweet_Id VARCHAR(255), \
+            Screenshot INT, \
+            Deleted INT)"
+        )
+
+        try:
+            cur.execute(
+                """INSERT INTO Tweets(Author, Text, Url, Tweet_Id, Screenshot, Deleted)
+                VALUES (%s, %s, %s, %s, %s, %s)""",
+                (author, text, url, id_str, 0, 0),
+            )
+            self.db.commit()
+            logging.info(f"Wrote to database: {author} {id_str}")
+        except MySQLdb.Error as e:
+            logging.error("Error %s %s" % (e.args[0], e.args[1]))
+            self.db.rollback()
+            logging.error("ERROR writing database")
diff --git a/DB/pynx.py b/DB/pynx.py
new file mode 100644
index 0000000..83853bb
--- /dev/null
+++ b/DB/pynx.py
@@ -0,0 +1,138 @@
+import networkx as nx
+import unicodedata
+import logging
+import re
+import os
+
+from . import generic
+from . import utils
+
+hashre = re.compile(r"(#\w+)")
+userre = re.compile(r"(@\w+)")
+
+
+def normalize(input_str):
+    # strip accents and case; returns bytes, which networkx accepts as node keys
+    return unicodedata.normalize("NFKD", input_str).encode("ASCII", "ignore").lower()
+
+
+def add_node(G, node):
+    try:
+        G.nodes[node]["weight"] += 1
+    except KeyError:
+        G.add_node(node, weight=1)
+
+
+def add_edge(G, n, p):
+    try:
+        G.edges[n, p]["weight"] += 1
+    except KeyError:
+        G.add_edge(n, p, weight=1)
+
+
+def add_tags(G, text):
+    tags = hashre.findall(text)
+    for i, t in enumerate(tags):
+        n = normalize(t)
+        add_node(G, n)
+        # link this tag to every tag that follows it (no self-loops)
+        for u in tags[i + 1:]:
+            u = normalize(u)
+            add_node(G, u)
+            add_edge(G, n, u)
+    return tags
+
+def add_users(G, text, status):
+    users = set(userre.findall(text))
+    if status.in_reply_to_screen_name:
+        users.add("@%s" % status.in_reply_to_screen_name)
+    try:
+        users.add("@%s" % status.retweeted_status.user.screen_name)
+    except AttributeError:
+        pass
+    u = normalize("@%s" % status.user.screen_name)
+    add_node(G, u)
+    for v in users:
+        add_edge(G, u, normalize(v))
+
+
+class Driver(generic.DB):
+    def __init__(self, filename="graph.gexf"):
+        generic.DB.__init__(self)
+
+        self.name = "NetworkX DB Driver"
+
+        self.type = filename.split(".")[-1] or "gexf"
+        if self.type == 'pynx':  # this is for test handling
+            self.type = "gexf"
+            filename = filename.replace('pynx', 'gexf')
+
+        self.filename = filename
+
+        self.open()
+
+    def open(self):
+        self._user_graph = "user-%s" % self.filename
+        self._hash_graph = "hash-%s" % self.filename
+        self._twit_graph = "twit-%s" % self.filename
+
+        self._write = getattr(nx, "write_%s" % self.type)
+        self._read = getattr(nx, "read_%s" % self.type)
+
+        self.U = self._open_graph(self._user_graph)
+        self.H = self._open_graph(self._hash_graph)
+        self.T = self._open_graph(self._twit_graph)
+
+        logging.info(f"graphs opened {self.U.nodes()} {self.H.nodes()} {self.T.nodes()}")
+
+    def _WIPE(self):
+        self.close()
+
+        os.remove(self._user_graph)
+        os.remove(self._hash_graph)
+        os.remove(self._twit_graph)
+
+        self.open()
+
+    def _open_graph(self, filename):
+        try:
+            return self._read(filename)
+        except IOError:
+            return nx.Graph()
+
+    def getTweets(self):
+        return [n for n in self.U.nodes()]
+
+#    def getAuthor(self, screen_name):
+#        u = normalize("@%s" % screen_name)
+#        return self.U.neighbors(u)
+
+    def markDeleted(self, id):
+        nx.set_node_attributes(self.U, {id: {"deleted": True}})
+
+    def _write_all(self):
+        self._write(self.H, self._hash_graph)
+        self._write(self.U, self._user_graph)
+        self._write(self.T, self._twit_graph)
+
+    def close(self):
+        self._write_all()
+
+    def saveTweet(self, status):
+        text = utils.extract_text(status)
+
+        add_tags(self.H, text)
+        add_users(self.U, text, status)
+
+        logging.info(f"H, {self.H.nodes()}")
+        self._write_all()
+
+    def saveAuthor(self, user):
+        u = normalize("@%s" % user.screen_name)
+        add_node(self.U, u)
+        nx.set_node_attributes(self.U, {u: {
+            'id': user.id,
+            'created_at': user.created_at.isoformat() if user.created_at else None
+        }})
+
+        self._write_all()
+
+if __name__ == "__main__":
+    G = nx.Graph()
+    add_users(G, "RT @test blah blah #gnu @other", utils.make_status("test", 0, 42))
diff --git a/DB/sqlite.py b/DB/sqlite.py
new file mode 100644
index 0000000..cda94be
--- /dev/null
+++ b/DB/sqlite.py
@@ -0,0 +1,146 @@
+import sqlite3
+import json
+import logging
+import sys
+
+from . import utils
+from . import generic
+
+VERSION = 1
+
+def migrate_0_1(db):
+    db.execute(
+        """CREATE TABLE IF NOT EXISTS tweets (id INTEGER PRIMARY KEY, \
+        screen_name VARCHAR(255), \
+        text VARCHAR(1024), \
+        date DATE, \
+        link VARCHAR(255), \
+        directed_to VARCHAR(255), \
+        replies INTEGER, \
+        retweets INTEGER, \
+        favorites INTEGER, \
+        geo VARCHAR(255), \
+        mentions VARCHAR(1024), \
+        hashtags VARCHAR(1024), \
+        Screenshot BOOLEAN, \
+        Deleted BOOLEAN)""")
+
+    db.execute(
+        """CREATE TABLE IF NOT EXISTS authors (screen_name VARCHAR(255) PRIMARY KEY, \
+        id INTEGER UNIQUE, \
+        date DATE)
+        """)
+
+    db.execute("PRAGMA user_version = 1")
+
+MIGRATIONS = [
+    migrate_0_1
+]
+
+class Driver(generic.DB):
+    def __init__(self, filename="twitter.db"):
+        generic.DB.__init__(self)
+        self.filename = filename
+        self.open()
+
+    def open(self):
+        self.db = sqlite3.connect(self.filename)
+
+        self._migrate()
+
+    def close(self):
+        self.db.close()
+
+    def _migrate(self):
+        cur = self.db.cursor()
+        user_version = cur.execute(
+            """
+            PRAGMA user_version
+            """
+        ).fetchone()
+        version = user_version[0] if user_version else 0
+
+        if version != VERSION:
+            for i, m in enumerate(MIGRATIONS[version:VERSION]):
+                logging.info(f"running migration {version + i} -> {version + i + 1} ({m})")
+                try:
+                    m(self)
+                except Exception as e:
+                    logging.critical(f"error {e} in migration {m}")
+                    sys.exit(-2)
+
+    def getTweets(self):
+        cur = self.db.cursor()
+        return cur.execute(
+            """SELECT * \
+            FROM tweets \
+            WHERE Deleted=0"""
+        )
+
+    def getAuthor(self, screen_name):
+        cur = self.db.cursor()
+        r = cur.execute(
+            """SELECT * FROM authors WHERE screen_name=?""", (screen_name,)
+        ).fetchone()
+
+        if not r: raise KeyError(f"{screen_name} not found")
+        return r[1]  # the cached numeric user id (None for cached misses)
+
+    def writeSuccess(self, id):
+        q = """UPDATE tweets \
+            SET Screenshot=1 \
+            WHERE id=?"""
+        if self.execute(q, (id,)):
+            logging.info(f"Screenshot OK. Tweet id {id}")
+            return True
+        logging.warning(f"{id} not marked as success")
+        return False
+
+    def markDeleted(self, id):
+        q = """UPDATE tweets \
+            SET Deleted=1 \
+            WHERE id=?"""
+        if self.execute(q, (id,)):
+            logging.info(f"Tweet marked as deleted {id}")
+            return True
+        logging.warning(f"{id} not marked as deleted")
+        return False
+
+    def getLogs(self):
+        cur = self.db.cursor()
+        return cur.execute(
+            "SELECT link, id FROM tweets WHERE Screenshot=0 AND Deleted=0 "
+        )
+
+    def execute(self, q, args=()):
+        cur = self.db.cursor()
+        try:
+            cur.execute(q, args)
+            self.db.commit()
+        except sqlite3.Error as e:
+            logging.error(f"{e} while executing {q} with {args}")
+            self.db.rollback()
+            logging.error("ERROR writing database")
+            return False
+        return True
+
+
+    def saveTweet(self, status):
+        text = utils.extract_text(status)
+        date = status.created_at
+        if isinstance(date, str):
+            date = utils.make_date(status.created_at)
+
+        self.execute("""
+        INSERT INTO tweets(id, screen_name, text, date, link, directed_to, replies, geo, mentions, hashtags, Screenshot, Deleted) \
+        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 0, 0)
+        """, (status.id, status.user.screen_name, text, date, status.link,
+              status.in_reply_to_screen_name, status.replies_count, status.geo,
+              json.dumps(status.entities.user_mentions), json.dumps(status.entities.hashtags)))

+    def saveAuthor(self, user):
+        args = (user.screen_name, user.id, user.created_at)
+        self.execute("""
+        INSERT INTO authors(screen_name, id, date)
+        VALUES(?, ?, ?)
+        ON CONFLICT(screen_name) DO NOTHING
+        """, args)
diff --git a/DB/tsv.py b/DB/tsv.py
new file mode 100644
index 0000000..5c5c04f
--- /dev/null
+++ b/DB/tsv.py
@@ -0,0 +1,39 @@
+import os
+import sys
+import logging
+
+from . import generic
+from . import utils
+
+class Driver(generic.DB):
+    def __init__(self, filename=sys.stdout):
+        generic.DB.__init__(self)
+
+        self.name = "Simplest TSV driver"
+
+        self.filename = filename
+        self.open()
+
+    def close(self):
+        self.file.close()
+
+    def open(self):
+        if isinstance(self.filename, str):
+            exists = os.path.exists(self.filename)
+            self.file = open(self.filename, 'a')
+            # only write the header once, when the file is created
+            if not exists:
+                self.file.write("id\tauthor\ttext\turl\n")
+        else:
+            self.file = self.filename
+            self.file.write("id\tauthor\ttext\turl\n")
+
+
+    def saveTweet(self, status):
+        text = utils.extract_text(status)
+
+        self.file.write("\t".join((
+            str(status.id),
+            status.user.screen_name,
+            text.replace("\n", "\\n").replace("\t", "\\t"),
+            status.link
+        )) + "\n")
diff --git a/DB/utils.py b/DB/utils.py
new file mode 100644
index 0000000..d16525f
--- /dev/null
+++ b/DB/utils.py
@@ -0,0 +1,28 @@
+#!/usr/bin/env python3
+from types import SimpleNamespace
+from datetime import datetime
+
+def make_date(date):
+    # e.g. "Sun Sep 13 14:21:23 +0000 2020"
+    if date: return datetime.strptime(date, '%a %b %d %H:%M:%S +0000 %Y')
+    return date
+
+def make_user(username, id, date):
+    return SimpleNamespace(screen_name = username, id = id, created_at = make_date(date))
+
+def make_status(username, id, user_id, user_date=None,
+                text=None, date=None, link=None, to=None,
+                replies=0, retweets=0, favorites=0,
+                geo=None, mentions=(), hashtags=()):
+    # build a tweepy-like status object for drivers and tests
+    user = make_user(username, user_id, user_date)
+    entities = SimpleNamespace(hashtags=hashtags, user_mentions=mentions)
+    return SimpleNamespace(id=id, user=user, entities=entities,
+                           geo=geo, text=text, created_at=make_date(date),
+                           in_reply_to_screen_name=to, link=link,
+                           replies_count=replies, favorite_count=favorites, retweet_count=retweets)
+
+def extract_text(status):
+    # prefer the full text of extended (>140 char) tweets when present
+    try:
+        return status.extended_tweet.text
+    except AttributeError:
+        return status.text
diff --git a/README.md b/README.md
index d70e4ea..dba567a 100644
--- a/README.md
+++ b/README.md
@@ -2,22 +2,60 @@
 
 This is a collection of tools to monitor deleted tweets, automate screenshoting, and archiving.
 
-* `streaming.py` and `save_to_db.py` work together to grab a real-time streamed timeline from Twitter and save all the results in a database.
+* `streaming.py` and the `DB` modules work together to grab a real-time streamed timeline from Twitter and save all the results in a database. We currently support SQLite, MySQL, and Python networkx, and you can easily implement your own driver (see the sketch after this list).
 * All the tweets in the database are then screenshot by `screenshot.py`
 * Finally, the `monitoring.py` worker crawls through the database and checks if the tweets have been deleted.
 * I included `get_user_ids.py`, as the Twitter API often requires the ID, and not the screen name (eg not "@basilesimon").
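+
+A driver is just a module in `DB/` exposing a `Driver` class; `DB/generic.py` provides the base. A minimal sketch (hypothetical, in-memory, implementing only the calls the tools above need):
+
+```python
+from DB import generic
+
+class Driver(generic.DB):
+    def __init__(self, filename=None):
+        self.name = "In-memory demo driver"
+        self.tweets = []          # demo only: nothing is persisted
+
+    def open(self): pass
+    def close(self): pass
+
+    def saveTweet(self, status):
+        self.tweets.append(status)
+
+    def getTweets(self):
+        return self.tweets
+```
+
+Drop it in `DB/mydriver.py` and load it with `-D mydriver`.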
 
 ## Dependencies and install
 * `git clone` this repo
-* `wget https://raw.githubusercontent.com/pypa/pip/master/contrib/get-pip.py` then `sudo python get-pip.py`
-* `pip install tweepy`
-* `pip install MySQL-python` (but you might need to `apt-get install build-essential python-dev libmysqlclient-dev`. I read it's easy to install on Max OS, with Homebrew)
-* `pip install needle`
-* `apt-get install mysql-server nodejs-legacy nodejs npm`
+* `wget https://raw.githubusercontent.com/pypa/pip/master/contrib/get-pip.py`
+then `sudo python get-pip.py`
+* `pip install -r requirements.txt`
+
+# Configuration
+There is a nifty tool that will generate a config file in the default location (`~/.config/twitter-tools/config.json`); just run `python3 ./setup.py` and you'll be prompted.
+![screenshot](./config-screenshot.png)
+
+We'll pick up the first entry. Note that we'll also look for `./config.json` and
+`../config.json`; of course, you can specify any file on the command
+line.
+
+It should look like this:
+```json
+[
+    {
+        "consumer_key" : "XXXXXXXXXXXXXXXXXXXXXXXXX",
+        "consumer_secret" : "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
+        "access_token": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
+        "access_token_secret" : "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
+    },
+    {
+        "consumer_key" : "YYYYYYYYYYYYYYYYYYYYYYYYY",
+        "consumer_secret" : "YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY",
+        "access_token": "YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY",
+        "access_token_secret" : "YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY"
+    },
+    {
+        "consumer_key" : "ZZZZZZZZZZZZZZZZZZZZZZZZZ",
+        "consumer_secret" : "ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ",
+        "access_token": "ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ",
+        "access_token_secret" : "ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ"
+    }
+]
+```
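+
+For example, to point any of the tools at a specific credentials file (every script built on `config.py` accepts `-c`):
+
+```shell
+$ python3 streaming.py -c ./myconfig.json -t somekeyword
+```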
+
+# For MySQL
+* `pip install MySQL-python` (but you might need to `apt-get install
+  build-essential python-dev libmysqlclient-dev`. I read it's easy to install
+  on macOS with Homebrew)
+* `apt-get install mysql-server`
+
+* `apt-get install nodejs-legacy nodejs npm`
 * `sudo apt-get install build-essential chrpath git-core libssl-dev libfontconfig1-dev libxft-dev`
 * `sudo npm -g install phantomjs`
-* You will need a comma-separated list of user IDs, or a list of keywords you want to track. See all the other options in [the Docs](https://dev.twitter.com/streaming/reference/post/statuses/filter).
-* Obviously, you will also need your developer access keys and things. Pop them in the placeholders accordingly in each file.
 
 ### Comma-separated list of user IDs
 I use the wonderful [t from sferik](https://github.com/sferik/t), a command line tool for twitter shenanigans.
 
@@ -42,6 +80,35 @@ You might want to consider running all these with `cron` on a server. Just sayin
 
-Then uncomment line 2 and 34-40 in `save_to_db.py`
+
+## Blocking a massive amount of users
+You can use the `block.py` tool to block users massively;
+the `-f` flag allows you to pass a CSV file.
+
+## Finding similar users
+When looking for bots you may want to look for a LOT of similar usernames; we've got you covered!
+
+First you'll need to generate a list of usernames. You can do so with any password-dictionary tool;
+we recommend you use https://github.com/LandGrey/pydictor
+
+Then pass it to `./get_user_ids.py -f`, which will spit out a TSV of valid username and id
+pairs. It's all cached, so you can run it multiple times.
+
+example:
+
+``` shell
+$ python3 pydictor.py --head alejandro --len 4 4 -base d -o usernames.csv
+                     _ _      _
+     _ __  _   _  __| (_) ___| |_ ___  _ __
+    | '_ \| | | |/ _` | |/ __| __/ _ \| '__|
+    | |_) | |_| | (_| | | (__| || (_) | |
+    | .__/ \__, |\__,_|_|\___|\__\___/|_|
+    |_|    |___/           2.1.4.1#dev
+
+[+] A total of :10000 lines
+[+] Store in   :./results/blah.txt
+[+] Cost       :0.0529 seconds
+$ python3 get_user_ids.py -f results/usernames.csv > valid_usernames.tsv
+```
 
 ## License
 [PDD/ Unlicense](http://choosealicense.com/licenses/unlicense/)
diff --git a/block.py b/block.py
new file mode 100644
index 0000000..25db4a2
--- /dev/null
+++ b/block.py
@@ -0,0 +1,45 @@
+#!/usr/bin/env python3
+
+import logging
+import config as c
+import utils
+
+def run():
+    """
+    main entry point
+    """
+
+    UNBLOCK = {
+        "flags": "-U, --unblock",
+        "dest": "unblock",
+        "help": "unblock operation",
+        "action": "count",
+        "default": 0
+    }
+
+    opts = c.parse_args([c.CONFIG_FILE, c.DEBUG, UNBLOCK, c.CSV_FILE, c.IDS, c.USERS])
+    config = opts.config[0]
+
+    # ids loaded from a CSV file (-f) count too
+    if opts.csv:
+        opts.ids = (opts.ids or []) + opts.csv
+
+    if not opts.ids:
+        return logging.error("need to provide at least one id")
+
+    api = utils.twitter_login(config)
+
+    logging.debug(opts.unblock)
+    if opts.unblock:
+        op = api.destroy_block
+        action = "unblocked"
+    else:
+        op = api.create_block
+        action = "blocked"
+
+    for entry in opts.ids:
+        # -u yields (screen_name, id) pairs, -i and -f yield bare ids
+        u, i = entry if isinstance(entry, tuple) else (entry, entry)
+        try:
+            op(user_id=i)
+        except Exception as e:
+            logging.error(f"{op}({i}) -> [{u}] failed with error {e}")
+
+    logging.info(f"all done, {action} {len(opts.ids)}")
+
+if __name__ == "__main__":
+    run()
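A couple of hypothetical `block.py` runs, assuming `ids.csv` holds one user id per line:

``` shell
$ python3 block.py -f ids.csv            # block every id listed in the file
$ python3 block.py -U -i 12345,67890     # -U reverses the operation: unblock
```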
for {db_driver}") + sys.exit(-2) + + if filename: + return M.Driver(filename) + + return M.Driver() + +class LoadDBDriverAction(argparse.Action): + """ + load a db driver by name + """ + + def __call__(self, parser, namespace, values, option_string=None): + old_dbs = getattr(namespace, self.dest) + if not isinstance(old_dbs, list): + old_dbs = () + + dbs = [load_db_driver(v) for v in values] + + dbs.extend(old_dbs) + setattr(namespace, self.dest, dbs) + +class ParseComasAction(argparse.Action): + """ + Parse a coma separated arg into an array + """ + + def __call__(self, parser, namespace, values, option_string=None): + setattr(namespace, self.dest, flatten([v.split(",") for v in values])) + +class FetchUsersAction(argparse.Action): + """ + Parse a coma separated usernames into an array of user ids + """ + + def __call__(self, parser, namespace, values, option_string=None): + try: + db = namespace.db + except AttributeError: + db = None + + old_ids = getattr(namespace, self.dest) or () + ids = fetch(namespace.config[0], flatten([v.split(',') for v in values]), db) + ids.extend(old_ids) + setattr(namespace, 'ids', ids) + +class IncreaseVerbosityAction(argparse.Action): + """ + up debug level + """ + + def __call__(self, parser, namespace, values, option_string=None): + coloredlogs.increase_verbosity() + +def load_config(paths): + def try_load_json(j): + try: + with open(j) as data: + return json.load(data) + except FileNotFoundError: + return None + except Exception as e: + logging.error(f"{e} is your config file well formated ?") + raise e + + for p in paths: + c = try_load_json(os.path.expanduser(p)) + if c: return c + + return [] + +CONFIG_FILE = { + "flags": "-c, --config", + "dest": "config", + "help": "config file", + "action": LoadJSONAction, + "default": load_config(["./config.json", "../config.json", "~/.config/twitter-tools/config.json"]) +} + +IDS = { + "flags": "-i, --ids", + "dest": "ids", + "nargs": "*", + "help": "twitter user ids, as a comma-separated list", + "action": ParseComasAction, +} +USERS = { + "flags": "-u, --users", + "dest": "users", + "nargs": "*", + "help": "twitter usernames, as a comma-separated list", + "action": FetchUsersAction, +} +USERS_NOFETCH= { + "flags": "-u, --users", + "dest": "ids", + "nargs": "*", + "help": "twitter usernames, as a comma-separated list", + "action": ParseComasAction, +} +TERMS = { + "flags": "-t, --track", + "dest": "track", + "nargs": "*", + "default": [], + "help": "terms to track, as a comma-separated list", + "action": ParseComasAction, +} +DBS = { + "flags": "-D, --database", + "dest": "db", + "help": "database system to use (mysql, sqlite, elasticsearch)", + "nargs": "*", + "default": "tsv", + "action": LoadDBDriverAction, +} +CSV_FILE = { + "flags": "-f, --csv", + "dest": "csv", + "help": "load data from a csv file", + "action": LoadRowFileAction, +} +DEBUG = { + "flags": "-v", + "help": "increase verbosity", + "action": IncreaseVerbosityAction, + "default": 0 +} + +options = [CONFIG_FILE, IDS, USERS, TERMS, DBS] + +r = re.compile(r"\s+") + + +def parse_args(options): + parser = argparse.ArgumentParser( + description="Twitter Tools: query twitter from the commandline", + allow_abbrev=False + ) + + if DBS in options: + DBS["default"] = load_db_driver(DBS["default"]) + + def add_argument(o): + flags = o.pop("flags") + parser.add_argument(flags, **o) + + last = options.pop() + [add_argument(o) for o in options] + + last["flags"] = last["dest"] + del last["dest"] + + add_argument(last) + + opts = parser.parse_args() + if DBS 
diff --git a/get_user_ids.py b/get_user_ids.py
index 0845fd7..ba1cc37 100644
--- a/get_user_ids.py
+++ b/get_user_ids.py
@@ -1,36 +1,76 @@
-import tweepy
 import time
 import csv
 import sys
+import logging
+import config as c
+import utils
+from DB import utils as db_utils
 
-consumer_key = ""
-consumer_secret = ""
-access_token = ""
-access_token_secret = ""
+TWITTER_BATCH_LIMIT = 100
+
+def fetch(config, users, db):
+    # fall back to the sqlite driver for caching when the given db can't store authors
+    if not (hasattr(db, 'getAuthor') and hasattr(db, 'saveAuthor')):
+        db = c.load_db_driver('sqlite')
 
-auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
-auth.set_access_token(access_token, access_token_secret)
+    logging.info(f"looking for: {users} in {db}")
 
-api = tweepy.API(auth)
+    api = None
+    handles = []
+    need_fetch = []
+    def add_sn(screen_name, i, date):
+        if i: handles.append((screen_name, i))
+        # cache misses too (id None), so we don't look them up again
+        db.saveAuthor(db_utils.make_user(screen_name, i, date))
+
+    for screen_name in users:
+        sn = screen_name.lower()
+        try:
+            i = db.getAuthor(sn)
+            if i: handles.append((sn, i))
+        except (KeyError, AttributeError) as e:
+            logging.warning(f"{sn} not found in DB {db} ({e})")
+            need_fetch.append(sn)
 
-def get_user_ids():
-    handles = []
-    with open("list.csv", "rb") as csvfile:
-        reader = csv.reader(csvfile, delimiter=',', quotechar='|')
-        for row in reader:
-            for elem in row:
-                handles.extend(elem.strip().split(','))
-
-    for handle in handles:
-        try:
-            u = api.get_user(handle[1:-1])
-            time.sleep(6)
-            print u._json['id']
-            sys.stderr.write(str(u._json['id']) + "\n")
-        except Exception, e:
-            print e
-
-if __name__ == '__main__':
-    get_user_ids()
+    while len(need_fetch):
+        if not api: api = utils.twitter_login(config)
+
+        batch = need_fetch[:TWITTER_BATCH_LIMIT]
+        need_fetch = need_fetch[TWITTER_BATCH_LIMIT:]
+
+        logging.debug(f"this batch is {len(batch)}, still need to fetch {len(need_fetch)}")
+
+        try:
+            lu = api.lookup_users(user_ids = None, screen_names = batch, include_entities = False)
+        except Exception as e:
+            logging.error(f"lookup_users failed: {e}")
+            lu = []
+
+        for u in lu:
+            sn = u._json['screen_name'].lower()
+            add_sn(sn, u._json['id'], u._json['created_at'])
+            batch.remove(sn)
+
+        for sn in batch:
+            # everything left in the batch doesn't exist (anymore)
+            add_sn(sn, None, None)
+
+    logging.info(handles)
+    return handles
+
+if __name__ == "__main__":
+    DB_CONFIG = c.DBS
+    DB_CONFIG["default"] = "sqlite"
+
+    opts = c.parse_args([DB_CONFIG, c.DEBUG, c.CONFIG_FILE, c.CSV_FILE, c.USERS, ])
+    config = opts.config[0]
+    ids = None
+    try:
+        ids = opts.ids
+    except AttributeError:
+        ids = []
+
+    if opts.csv:
+        ids.extend(fetch(config, opts.csv, opts.db))
+
+    print("screen_name\tid")
+    for u, i in ids:
+        print(f"{u}\t{i}")
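`fetch()` can also be used programmatically. A hedged sketch ("jack" and "biz" are placeholder names, and `./config.json` is assumed to exist):

```python
# resolve screen names to ids, cached through the sqlite driver
import config as c
from get_user_ids import fetch

config = c.load_config(["./config.json"])[0]   # first credentials entry
db = c.load_db_driver("sqlite:twitter.db")
pairs = fetch(config, ["jack", "biz"], db)     # -> [(screen_name, id), ...]
```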
diff --git a/got.py b/got.py
new file mode 100644
index 0000000..6700ea3
--- /dev/null
+++ b/got.py
@@ -0,0 +1,47 @@
+#!/usr/bin/env python3
+
+import logging
+import GetOldTweets3 as got
+import config as c
+
+def run():
+    """
+    main entry point
+    """
+
+    GEO = {
+        "flags": "-g, --geo",
+        "dest": "geo",
+        "nargs": "*",
+        "help": "lookup for tweets near term",
+    }
+
+    WITHIN = {
+        "flags": "-w, --within",
+        "dest": "within",
+        "nargs": "*",
+        "help": "radius of the geo query",
+    }
+
+    opts = c.parse_args([c.CONFIG_FILE, c.DEBUG, GEO, WITHIN, c.USERS_NOFETCH, c.DBS, c.TERMS])
+
+    database = opts.db
+    config = opts.config[0]
+
+    criteria = got.manager.TweetCriteria()
+
+    if opts.ids and len(opts.ids): criteria.setUsername(opts.ids)
+    if opts.track and len(opts.track): criteria.setQuerySearch(" ".join(opts.track))
+    if opts.geo: criteria.setNear(" ".join(opts.geo))
+    if opts.within: criteria.setWithin(" ".join(opts.within))
+
+    logging.info(criteria)
+    def handler(tweets):
+        for t in tweets:
+            print(t)
+
+    got.manager.TweetManager.getTweets(criteria, handler)
+
+
+if __name__ == "__main__":
+    run()
diff --git a/monitoring.py b/monitoring.py
index 0afeaee..077cc5a 100644
--- a/monitoring.py
+++ b/monitoring.py
@@ -1,12 +1,10 @@
-import MySQLdb
 import requests
+import logging
+import config as c
 
+opts = c.parse_args([c.DBS])
+db = opts.db
 
-db = MySQLdb.connect(host="",
-                     user="",
-                     passwd="",
-                     db="",
-                     charset="utf8")
 
 list_of_tweets = []
 
@@ -15,31 +13,24 @@ def query(url):
     if r.status_code != 200:
         return True
     else:
-        print "Tweet still exists"
+        logging.info("Tweet still exists")
 
 
 def read_database(db):
-    cur = db.cursor()
-    cur.execute("""SELECT * \
-                FROM Tweets \
-                WHERE Deleted=0""")
-
+    cur = db.getTweets()
     for tweet in cur:
         list_of_tweets.append(tweet)
-        print tweet
+        logging.info(tweet)
 
     return list_of_tweets
 
 
 def check_tweet():
     for tweet in read_database(db):
         if query(tweet[3]) is True:
-            cur = db.cursor()
-            cur.execute("""UPDATE Tweets \
-                        SET Deleted=1 \
-                        WHERE Tweet_Id=%s""", [tweet[4]])
-            db.commit()
-            print "tweet deleted, id is", tweet[4]
-            print "url is", tweet[3]
+            db.markDeleted(tweet[4])
+
+            logging.info(f"tweet deleted, id is {tweet[4]}")
+            logging.info(f"url is {tweet[3]}")
 
 
 if __name__ == "__main__":
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..fc70930
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,8 @@
+tweepy
+needle
+networkx
+pyinquirer
+clint
+coloredlogs
+GetOldTweets3
+pytest
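Database arguments accept the `DRIVER` or `DRIVER:FILENAME` specs handled by `load_db_driver` above, and `monitoring.py` takes the spec positionally. Two hypothetical invocations (the filenames are placeholders):

``` shell
$ python3 got.py "#somehashtag" -D sqlite:archive.db
$ python3 monitoring.py sqlite:archive.db
```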
Tweet id ", path - except MySQLdb.Error, e: - try: - print "MySQL Error [%d]: %s" % (e.args[0], e.args[1]) - except IndexError: - print "MySQL Error: %s" % str(e) - - print "Error", e.args[0], e.args[1] - print "Warning:", path, "not saved to database" - return True + return db.writeSuccess(path) def markDeleted(self, path): - - cur = db.cursor() - try: - cur.execute("""UPDATE Tweets \ - SET Deleted=1 \ - WHERE Tweet_Id=%s""", [path]) - db.commit() - print "Tweet marked as deleted ", path - except MySQLdb.Error, e: - try: - print "MySQL Error [%d]: %s" % (e.args[0], e.args[1]) - except IndexError: - print "MySQL Error: %s" % str(e) - - print "Error", e.args[0], e.args[1] - print "Warning:", path, "not saved to database" - return True + return db.markDeleted(path) def list_to_screenshot(self): - logFile = open('logfile.txt', 'w') - cur = db.cursor() - cur.execute("SELECT Url, Tweet_Id FROM Tweets WHERE Screenshot=0 AND Deleted=0 ") + logFile = open("logfile.txt", "w") + cur = db.getLogs() for (Url, Tweet_Id) in cur: try: self.driver.get(Url) except: - print "Url doesnt exist ", Url + logging(f"url does not exist: {Url}") logFile.write("Url doesnt exist \n") continue try: - self.assertScreenshot('.tweet', Tweet_Id) + self.assertScreenshot(".tweet", Tweet_Id) except: - print "Tweet deleted ", Url + logging(f"tweet deleted: {Url}") self.markDeleted(Tweet_Id) message = "Tweet deleted %s \n" % Url logFile.write(message) @@ -81,5 +48,7 @@ def list_to_screenshot(self): message = "Tweet screenshotted %s \n" % Url logFile.write(message) logFile.close() + + # if __name__ == '__main__': # list_to_screenshot(db) diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..4c7e29c --- /dev/null +++ b/setup.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python3 + +# -*- coding: utf-8 -*- +""" +* ask the user for it's twitter credentials +""" +from __future__ import print_function, unicode_literals + +import os +import sys +import json +import pathlib + +from PyInquirer import prompt, print_json +from config import load_config + +CONFIG_PATH = "~/.config/twitter-tools/config.json" +config = load_config([CONFIG_PATH]) + +if config: + answers = prompt([ + { + 'type': 'confirm', + 'name': 'confirm', + 'message': f"""a config file has already been found at {CONFIG_PATH}, +with the following configs: {[c["name"] for c in config]}, +continue ?""" + } + ]) + + if not answers["confirm"]: + sys.exit(-2) +else: + config = [] + +def validate_length(n): + def validate(l): + if len(l) == n: return True + if len(l) < n: return f"your input too short, (it's {len(l)}, i expect {n})" + if len(l) > n: return f"your input too long, (it's {len(l)}, i expect {n})" + + return validate + +questions = [ + { + 'type': 'input', + 'message': 'creds name', + 'name': 'name', + 'default': 'twitter-tools' + }, + { + 'type': 'password', + 'message': 'Enter your consumer key', + 'name': 'consumer_key', + 'validate': validate_length(25) + }, + { + 'type': 'password', + 'message': 'Enter your consumer key secret', + 'name': 'consumer_secret', + 'validate': validate_length(45) + }, + { + 'type': 'password', + 'message': 'Enter your access token', + 'name': 'access_token_key', + 'validate': validate_length(50) + }, + { + 'type': 'password', + 'message': 'Enter your access token secret', + 'name': 'access_token_secret', + 'validate': validate_length(45) + } +] + +answers = prompt(questions) +config.append(answers) + +dirname = os.path.dirname(os.path.expanduser(CONFIG_PATH)) +if not os.path.exists(dirname): + os.mkdirs(dirname, parents = True, 
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..4c7e29c
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,85 @@
+#!/usr/bin/env python3
+
+# -*- coding: utf-8 -*-
+"""
+* ask the user for their Twitter credentials
+"""
+from __future__ import print_function, unicode_literals
+
+import os
+import sys
+import json
+
+from PyInquirer import prompt
+from config import load_config
+
+CONFIG_PATH = "~/.config/twitter-tools/config.json"
+config = load_config([CONFIG_PATH])
+
+if config:
+    answers = prompt([
+        {
+            'type': 'confirm',
+            'name': 'confirm',
+            'message': f"""a config file has already been found at {CONFIG_PATH},
+with the following configs: {[c.get("name", "unnamed") for c in config]},
+continue ?"""
+        }
+    ])
+
+    if not answers["confirm"]:
+        sys.exit(-2)
+else:
+    config = []
+
+def validate_length(n):
+    def validate(l):
+        if len(l) == n: return True
+        if len(l) < n: return f"your input is too short (it's {len(l)}, I expect {n})"
+        if len(l) > n: return f"your input is too long (it's {len(l)}, I expect {n})"
+
+    return validate
+
+questions = [
+    {
+        'type': 'input',
+        'message': 'creds name',
+        'name': 'name',
+        'default': 'twitter-tools'
+    },
+    {
+        'type': 'password',
+        'message': 'Enter your consumer key',
+        'name': 'consumer_key',
+        'validate': validate_length(25)
+    },
+    {
+        'type': 'password',
+        'message': 'Enter your consumer key secret',
+        'name': 'consumer_secret',
+        'validate': validate_length(45)
+    },
+    {
+        'type': 'password',
+        'message': 'Enter your access token',
+        'name': 'access_token',
+        'validate': validate_length(50)
+    },
+    {
+        'type': 'password',
+        'message': 'Enter your access token secret',
+        'name': 'access_token_secret',
+        'validate': validate_length(45)
+    }
+]
+
+answers = prompt(questions)
+config.append(answers)
+
+dirname = os.path.dirname(os.path.expanduser(CONFIG_PATH))
+if not os.path.exists(dirname):
+    # os.makedirs is recursive; there is no os.mkdirs
+    os.makedirs(dirname, exist_ok=True)
+
+with open(os.path.expanduser(CONFIG_PATH), "w") as f:
+    f.write(json.dumps(config, indent=4))
diff --git a/streaming.py b/streaming.py
index 2b5477c..54d0b53 100644
--- a/streaming.py
+++ b/streaming.py
@@ -1,36 +1,81 @@
-from tweepy.streaming import StreamListener
-from tweepy import OAuthHandler
-from tweepy import Stream
-from save_to_db import save_to_db
+"""
+stream tweets to database driver and stdout
+"""
+
+import logging
+import signal
+import sys
 
-# Authentication details.
-consumer_key = ""
-consumer_secret = ""
-access_token = ""
-access_token_secret = ""
+import tweepy
+from urllib3.exceptions import ProtocolError
+from tweepy.streaming import StreamListener
 
+import config as c
+import utils
 
-# This is the listener, resposible for receiving data
 class StdOutListener(StreamListener):
+    """
+    listener, responsible for receiving data
+    """
+    def __init__(self, database):
+        super(StdOutListener, self).__init__()
+        self.database = database
 
     def on_status(self, status):
-        tweet_url = "http://twitter.com/" + status.user.screen_name + "/status/" + status.id_str
-        print "TWEET", status.text
-        print "URL", tweet_url
-        save_to_db(status.user.screen_name, status.text, tweet_url, status.id_str)
+        """
+        a twitter status has been received
+        """
+
+        tweet_url = (
+            "http://twitter.com/" + status.user.screen_name + "/status/" + status.id_str
+        )
+        logging.info(f"TWEET: {tweet_url}\n{status.text}")
+
+        # the DB drivers expect the permalink on the status itself
+        status.link = tweet_url
+        self.database.saveTweet(status)
+        self.database.saveAuthor(status.user)
 
     def on_error(self, status):
+        """
+        error handler
+        """
+        logging.error(status)
 
-        print status
 
+def run():
+    """
+    main entry point
+    """
+    opts = c.parse_args([c.CONFIG_FILE, c.DEBUG, c.IDS, c.USERS, c.DBS, c.TERMS])
 
-if __name__ == "__main__":
-    l = StdOutListener()
-    auth = OAuthHandler(consumer_key, consumer_secret)
-    auth.set_access_token(access_token, access_token_secret)
+    database = opts.db
+    config = opts.config[0]
 
-    ids = []
-    with open("ids.csv") as f:
-        for row in f:
-            ids.append(row)
+    logging.debug(opts.ids)
+    if opts.ids:
+        # -u yields (screen_name, id) pairs, -i yields bare ids
+        ids = [str(i[1] if isinstance(i, tuple) else i) for i in opts.ids]
+    else:
+        ids = None
 
-    stream = Stream(auth, l)
-    stream.filter(ids)
+    stream_config = {
+        "follow": ids,
+        "track": opts.track or None
+    }
+
+    listener = StdOutListener(database)
+    api = utils.twitter_login(config)
+
+    def signal_handler(*argv, **argh):
+        # flush and close the database cleanly on Ctrl-C
+        database.close()
+        sys.exit(0)
+
+    signal.signal(signal.SIGINT, signal_handler)
+
+    stream = tweepy.Stream(auth = api.auth, listener = listener)
+    logging.info(f"STREAM: {stream_config}")
+    while True:
+        try:
+            stream.filter(**stream_config)
+        except ProtocolError:
+            # the connection dropped; reconnect and keep streaming
+            pass
+
+if __name__ == "__main__":
+    run()
diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644
index 0000000..5aceb3b
--- /dev/null
+++ b/tests/conftest.py
@@ -0,0 +1,12 @@
+#!/usr/bin/env python3
+
+import pytest
+
+def pytest_addoption(parser):
+    parser.addoption(
+        "--dbblacklist", nargs="+", default=[], help="modules to ignore"
+    )
+
+@pytest.fixture
+def db_blacklist(request):
+    return request.config.getoption("--dbblacklist")
diff --git a/tests/db.py b/tests/db.py
new file mode 100644
index 0000000..98ad929
--- /dev/null
+++ b/tests/db.py
@@ -0,0 +1,15 @@
+#!/usr/bin/env python3
+
+import pytest
+import os
+import re
+
+# look for drivers next to this file, not in the current working directory
+DB_DIR = os.path.join(os.path.dirname(__file__), "..", "DB")
+drivers = [d for d in os.listdir(DB_DIR) if not re.match(r"(__init__|__pycache__|utils)", d)]
+
+
+class TestDrivers():
+    @pytest.mark.parametrize(
+        'driver', drivers
+    )
+    def print_driver(self, driver):
+        print(driver)
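The test suite below is parametrized over every driver found in `DB/`; the `--dbblacklist` option defined in `tests/conftest.py` above skips drivers you can't run locally, e.g. mysql without a server:

``` shell
$ pytest tests/test_db.py --dbblacklist mysql
```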
diff --git a/tests/test_db.py b/tests/test_db.py
new file mode 100644
index 0000000..94e7cc3
--- /dev/null
+++ b/tests/test_db.py
@@ -0,0 +1,158 @@
+#!/usr/bin/env python3
+
+import pytest
+import importlib
+import sys
+import os
+
+if sys.version_info[0] < 3:
+    raise Exception("Python 2.x is not supported. Please upgrade to 3.x")
+
+sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
+
+BLACKLIST = ['__init__.py', '__pycache__', 'utils.py', 'generic.py', 'multi.py']
+drivers = [d.replace('.py', '') for d in os.listdir(os.path.join(os.path.dirname(__file__), '..', 'DB')) if d not in BLACKLIST]
+
+from DB import utils
+
+TEST_DATE = "Sun Sep 13 14:21:23 +0000 2020"
+TEST_USER_DESCRIPTOR = {
+    'username': 'test',
+    'id': 1,
+    'date': TEST_DATE,
+}
+TEST_STATUS_DESCRIPTOR = {
+    'id': 0,
+    'username': 'test',
+    'user_id': 42,
+    'user_date': TEST_DATE,
+    'text': 'test tweet',
+    'date': TEST_DATE,
+    'link': 'http://twitter.com/test/id',
+    'to': 'me',
+    'replies': 42,
+    'retweets': 0,
+    'favorites': 23,
+    'geo': 'Buenos Aires, Argentina',
+    'mentions': ['me', 'anarchy', 'freedom'],
+    'hashtags': ['#fun', '#test', '#twitter-tools']
+}
+TEST_USER = utils.make_user(**TEST_USER_DESCRIPTOR)
+TEST_STATUS = utils.make_status(**TEST_STATUS_DESCRIPTOR)
+
+class TestUtils():
+    def test_make_date(self):
+        d = utils.make_date(TEST_DATE)
+
+        print(d, dir(d))
+        assert(d.year == 2020)
+
+    def test_make_user(self):
+        u = utils.make_user('test', 0, TEST_DATE)
+
+        print(u, dir(u))
+        assert(hasattr(u, 'screen_name'))
+        assert(hasattr(u, 'id'))
+        assert(hasattr(u, 'created_at'))
+
+        assert(u.created_at.year == 2020)
+
+    def test_make_status(self):
+        s = utils.make_status(**TEST_STATUS_DESCRIPTOR)
+
+        print(s, dir(s))
+        assert(hasattr(s, 'user'))
+        assert(hasattr(s, 'entities'))
+        assert(hasattr(s, 'geo'))
+        assert(hasattr(s, 'text'))
+        assert(hasattr(s, 'created_at'))
+        assert(hasattr(s, 'in_reply_to_screen_name'))
+        assert(hasattr(s, 'link'))
+        assert(hasattr(s, 'replies_count'))
+        assert(hasattr(s, 'favorite_count'))
+        assert(hasattr(s, 'retweet_count'))
+
+        assert(s.created_at.year == 2020)
+
+        assert(hasattr(s.user, 'screen_name'))
+        assert(hasattr(s.user, 'id'))
+        assert(hasattr(s.user, 'created_at'))
+
+        assert(s.user.created_at.year == 2020)
+
+        assert(hasattr(s.entities, 'hashtags'))
+        assert('#fun' in s.entities.hashtags)
+
+        assert(hasattr(s.entities, 'user_mentions'))
+        assert('anarchy' in s.entities.user_mentions)
+
+@pytest.mark.parametrize(
+    'driver', drivers
+)
+class TestDrivers():
+    def test_import_driver(self, driver, db_blacklist):
+        if driver in db_blacklist: pytest.skip("blacklisted")
+        return importlib.import_module(f"DB.{driver}")
+
+    def test_load_driver(self, driver, db_blacklist):
+        M = self.test_import_driver(driver, db_blacklist)
+        return M.Driver(f"test_run.{driver}")
+
+    def test__WIPE(self, driver, db_blacklist):
+        D = self.test_load_driver(driver, db_blacklist)
+
+        D._WIPE()
+
+    def test_saveTweet(self, driver, db_blacklist):
+        D = self.test_load_driver(driver, db_blacklist)
+
+        D.saveTweet(TEST_STATUS)
+
+    def test_saveAuthor(self, driver, db_blacklist):
+        D = self.test_load_driver(driver, db_blacklist)
+        if not hasattr(D, 'saveAuthor'):
+            pytest.skip("Driver does not implement optional feature: saveAuthor")
+
+        D.saveAuthor(TEST_USER)
+
+    def test_getAuthor(self, driver, db_blacklist):
+        D = self.test_load_driver(driver, db_blacklist)
+        if not hasattr(D, 'getAuthor'):
+            pytest.skip("Driver does not implement optional feature: getAuthor")
+
+        D.getAuthor(TEST_USER.screen_name)
+
+    def test_getTweets(self, driver, db_blacklist):
+        D = self.test_load_driver(driver, db_blacklist)
+        if not hasattr(D, 'getTweets'):
+            pytest.skip("Driver does not implement optional feature: getTweets")
+
+        D.getTweets()
+
+    def test_writeSuccess(self, driver, db_blacklist):
+        D = self.test_load_driver(driver, db_blacklist)
+        if not hasattr(D, 'writeSuccess'):
+            pytest.skip("Driver does not implement optional feature: writeSuccess")
+
+        D.writeSuccess(0)
+
+    def test_markDeleted(self, driver, db_blacklist):
+        D = self.test_load_driver(driver, db_blacklist)
+        if not hasattr(D, 'markDeleted'):
+            pytest.skip("Driver does not implement optional feature: markDeleted")
+
+        D.markDeleted(0)
+
+    def test_getLogs(self, driver, db_blacklist):
+        D = self.test_load_driver(driver, db_blacklist)
+        if not hasattr(D, 'getLogs'):
+            pytest.skip("Driver does not implement optional feature: getLogs")
+
+        D.getLogs()
+
+    def test__WIPE_again(self, driver, db_blacklist):
+        D = self.test_load_driver(driver, db_blacklist)
+
+        D._WIPE()
diff --git a/utils.py b/utils.py
new file mode 100644
index 0000000..a4a98c4
--- /dev/null
+++ b/utils.py
@@ -0,0 +1,20 @@
+#!/usr/bin/env python3
+import tweepy
+import logging
+
+def twitter_login(config):
+    auth = tweepy.OAuthHandler(config["consumer_key"], config["consumer_secret"])
+    auth.set_access_token(config["access_token"], config["access_token_secret"])
+
+    api = tweepy.API(auth)
+
+    # test authentication before handing the API object back
+    try:
+        api.verify_credentials()
+        logging.info("authentication OK")
+        return api
+    except Exception as e:
+        logging.error(f"Error during authentication: {e}")
+        return None
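A minimal sketch of using this helper by hand (assuming `./config.json` exists; `twitter_login` returns None on failure, so check it):

```python
import config as c
import utils

config = c.load_config(["./config.json"])[0]   # first credentials entry
api = utils.twitter_login(config)
if api:
    print(api.verify_credentials().screen_name)
```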