#!/usr/bin/env python3
"""Write YouTube comments metadata (threads + replies) for a list of videos.

Usage:
    chmod a+x this_file.py
    ./this_file.py -i videoids_one_per_line.txt -d output_directory
  or:
    python3 this_file.py -i videoids_one_per_line.txt -d output_dir
Help:
    ./this_file.py -h
"""

import argparse
import hashlib
import json
import logging
import random
import sys
import time
from copy import deepcopy
from pathlib import Path

import requests

API_KEY = ""  # -- API KEY HERE!
BASE_URL = "https://www.googleapis.com/youtube/v3/"

API_PART_COMMENT_THREADS = ["snippet", "replies"]
API_PART_COMMENTS = ["snippet", "id"]
# Parameter templates for the two endpoints. These are treated as immutable
# templates: callers must copy them before adding per-request parameters
# (videoId, parentId, key, pageToken).
API_PARAMS_COMMENT_THREADS = {
    "part": ",".join(API_PART_COMMENT_THREADS),
    "maxResults": 100,
    "order": "time",
}
API_PARAMS_COMMENTS = {"part": ",".join(API_PART_COMMENTS), "maxResults": 100}

# Overrides applied to query parameters only when echoing the request URL to
# stdout (used to hide/obfuscate the API key); never sent to the API.
STDOUT_PARAMS_OPTIONS = {}

# TODO: Restore single-character options for --stdout_key
# Create logger for stdout to apply RStripFilter on
# Log levels option (ie. --verbose)


# -- Filter for removing trailing whitespaces from logger
class RStripFilter(logging.Filter):
    """Logging filter that strips trailing whitespace from record messages."""

    def filter(self, record):
        record.msg = record.msg.rstrip()
        return True


# -- Creates error logger with handler
def make_error_logger(filename):
    """Return a logger that appends ERROR records to *filename*.

    The handler uses ``delay=True`` so the log file is only created when the
    first record is actually emitted. Timestamps are formatted in UTC.
    """
    handler = logging.FileHandler(
        filename, delay=True
    )  # -- delay=True to delay creation of log file until needed
    handler.setLevel(logging.ERROR)
    formatter = logging.Formatter(
        fmt="%(asctime)s.%(msecs)03dZ|%(levelname)s|%(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S",
    )
    formatter.converter = time.gmtime  # log timestamps in UTC ("Z" suffix)
    handler.setFormatter(formatter)
    handler.addFilter(RStripFilter())  # -- Remove trailing whitespaces
    logger = logging.getLogger(__name__)
    logger.addHandler(handler)
    return logger


# -- Pages through API request and returns list of all items
# -- Params must be in the Session object
def get_api_items(endpoint, session):
    """Page through *endpoint* and return all items concatenated.

    Request parameters must already be set on ``session.params``; this
    function adds ``key`` and ``pageToken`` to that dict while paging.

    Returns a dict: ``{"json": items, "session": session, "error": msg}``
    where ``error`` is ``None`` on success or ``"<code> <reasons>"`` when the
    API answered with a 4xx status (in which case paging stops early).
    """
    js_api = []
    session.params["key"] = API_KEY
    err_msg = None
    while True:
        # 4xx codes are accepted as terminal responses (handled below)
        # rather than retried by __get_response.
        exit_codes = [200, 400, 401, 403, 404, 409]
        ret = __get_response(
            BASE_URL + endpoint, session=session, exit_codes=exit_codes, sleep=[1, 3]
        )
        (response, session) = (ret["response"], ret["session"])
        js = json.loads(response.text)
        # -- Skip if 4** status code
        if response.status_code // 100 == 4:
            errs = ",".join([x["reason"] for x in js["error"]["errors"]])
            err_msg = "{} {}".format(js["error"]["code"], errs)
            print(f"Error: {err_msg}")
            print("Skipping...")
            break
        # -- Concatenate API
        js_api.extend(js["items"])
        # -- Continue if nextPageToken
        next_page_token = js.get("nextPageToken")
        if next_page_token is None:
            break
        session.params["pageToken"] = next_page_token
    return {"json": js_api, "session": session, "error": err_msg}


def __get_response(url, session=None, exit_codes=None, sleep=None):
    """GET *url*, retrying until the status code is listed in *exit_codes*.

    *sleep* is a ``[min, max]`` range in seconds for the random back-off
    between retries (a scalar is treated as a fixed delay). On each retry the
    session is replaced by a fresh one (keeping its params) to drop any stale
    connections. Returns ``{"response": response, "session": session}``.
    """
    if session is None:
        session = requests.Session()
    if exit_codes is None:
        exit_codes = [200]
    if sleep is None:
        sleep = [1, 3]
    elif not isinstance(sleep, list):
        sleep = [sleep]
    response = None
    while True:
        full_url = url
        if session.params:
            # -- Check if need to alter URL params in stdout
            # (display only: the real request uses session.params untouched)
            stdout_params = deepcopy(session.params)
            if "STDOUT_PARAMS_OPTIONS" in globals():
                for k, v in STDOUT_PARAMS_OPTIONS.items():
                    stdout_params[k] = v
            full_url += "?" + "&".join(
                "{}={}".format(k, v) for k, v in stdout_params.items()
            )
        print(f"Retrieving {full_url}")
        try:
            response = session.get(url, timeout=10)
        except Exception as e:
            # Network-level failure (timeout, DNS, connection reset):
            # back off and retry with the same session.
            print(f"Exception: {e}")
            __sleep(*sleep)
            continue
        # -- Check status code
        status_code = response.status_code
        print(f"Status: {status_code}")
        if status_code in exit_codes:
            break
        print("Retrying...")
        # Replace the session but keep its params; close the old one first
        # so its pooled sockets are released (was leaked previously).
        api_params = session.params
        session.close()
        session = requests.Session()
        session.params = api_params
        __sleep(*sleep)
    return {"response": response, "session": session}


def __sleep(x, y=None):
    """Sleep a random number of seconds drawn uniformly from [x, y].

    With a single argument the delay is exactly *x* seconds.
    """
    if y is None:
        y = x
    sleep_time = round(random.uniform(x, y), 3)
    print(f"Sleeping for {sleep_time} seconds...")
    time.sleep(sleep_time)
    return


# -- Parse arguments
def __parse_args(argv):
    """Parse command-line *argv* and return a dict of resolved options.

    Keys: ``input``, ``api_dir``, ``log_dir`` (defaults to ``api_dir``),
    ``stdout_key`` (one of "hide"/"show"/"sha1").
    """
    parser = argparse.ArgumentParser(
        description="Write comments metadata for videos.",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    # Drop the default groups so "required arguments" is listed first in -h.
    parser._action_groups.pop()
    required = parser.add_argument_group("required arguments")
    optional = parser.add_argument_group("optional arguments")
    required.add_argument(
        "-i",
        "--input",
        required=True,
        help="Single videoId (https://www.youtube.com/watch?v={videoId}) or text file containing videoIds, one per line.",
        metavar="videos.txt",
    )
    optional.add_argument(
        "-d",
        "--api_dir",
        type=Path,
        default=Path("./videos/"),
        help="Output directory for API files (default: ./videos/)",
        metavar="./videos/",
    )
    optional.add_argument(
        "-l",
        "--log_dir",
        type=Path,
        default=None,
        help="Output directory for log files (default: same as --api_dir)",
        metavar="./videos/",
    )
    optional.add_argument(
        "--stdout_key",
        default=["hide"],
        nargs=1,
        choices=["hide", "show", "sha1"],
        help="""Option for displaying API key in stdout:
    hide    Hide API key in stdout (default)
    show    Warning: your API key will be shown in stdout
    sha1    The SHA1 hash of your API key will be shown in stdout
""",
        metavar="hide",
    )
    args = parser.parse_args(argv)
    log_dir = args.log_dir if args.log_dir is not None else args.api_dir
    return {
        "input": args.input,
        "api_dir": args.api_dir,
        "log_dir": log_dir,
        # nargs=1 always yields a one-element list; unwrap it.
        "stdout_key": args.stdout_key[0],
    }


def main(args):
    """Entry point: fetch comment threads (and missing replies) per video id.

    Writes one ``<videoId>_comments.json`` file per id into ``api_dir`` and
    logs API errors to ``<log_dir>/error.log``.
    """
    # -- Check for API key
    if not API_KEY:
        print("Missing API key. Open file and add value to API_KEY.")
        return
    # -- Parse arguments
    args = __parse_args(args)
    f_in = args["input"]
    api_dir = args["api_dir"]
    log_dir = args["log_dir"]
    stdout_key_option = args["stdout_key"]
    # -- Obfuscate API key if necessary (affects stdout display only)
    if stdout_key_option == "hide":
        STDOUT_PARAMS_OPTIONS["key"] = " {API key} "
    elif stdout_key_option == "show":
        STDOUT_PARAMS_OPTIONS["key"] = API_KEY
    elif stdout_key_option == "sha1":
        STDOUT_PARAMS_OPTIONS[
            "key"
        ] = f" {{{hashlib.sha1(API_KEY.encode()).hexdigest()}}} "
    # -- Get Ids if input file (ends with .txt)
    if f_in.endswith(".txt"):
        print(f"Reading {f_in}...")
        with open(f_in, "r", encoding="utf-8") as f:
            ids_in = [x.rstrip() for x in f]
    else:
        ids_in = [f_in]
    total_ids = len(ids_in)
    print(f"Found {total_ids} videoId(s).")
    # -- Create directories if they do not exist
    for out_dir in (api_dir, log_dir):
        out_dir.mkdir(parents=True, exist_ok=True)
    # -- Create error logger
    err_path = log_dir / "error.log"
    err_log = make_error_logger(err_path)
    # Explicit error counter: do not rely on logger internals (the previous
    # `err_log._cache` check read a private CPython level cache).
    n_errors = 0
    # -- Iterate over Ids
    session = requests.Session()
    for i, _id in enumerate(ids_in):
        print("\n########################\n")
        print(f"Processing Id {_id} ({i + 1}/{total_ids})...")
        # -- Get API info. Copy the parameter template so the module-level
        # -- constant is never mutated (get_api_items adds key/pageToken);
        # -- a fresh copy also guarantees no stale pageToken carries over.
        session.params = dict(API_PARAMS_COMMENT_THREADS)
        session.params["videoId"] = _id
        ret = get_api_items("commentThreads", session)
        (js, session, err_msg) = (ret["json"], ret["session"], ret["error"])
        if err_msg:
            err_log.error(f"{_id}: {err_msg}")
            n_errors += 1
            # -- long sleep if quotaExceeded
            # -- no sleep if 404 or commentsDisabled
            (err_status, err_reasons) = err_msg.split(" ", 1)
            if "quotaExceeded" in err_reasons:
                __sleep(300, 600)
            if (err_status != "404") and ("commentsDisabled" not in err_reasons):
                time.sleep(1)
            continue
        print("Found {} comment thread(s).".format(len(js)))
        if len(js) > 0:
            # -- Get API info (replies)
            total_comments = len(js)  # -- counter for print statement
            for thread in js:
                # -- Check if any replies
                replies = thread.get("replies")
                if replies is None:
                    continue
                # -- Check if need to get more replies: the threads endpoint
                # -- embeds at most a partial reply list.
                replies = replies["comments"]
                total_replies = thread["snippet"]["totalReplyCount"]
                if len(replies) == total_replies:
                    total_comments += len(replies)
                    continue
                # -- Get more replies. Fresh template copy per thread so no
                # -- stale parentId/pageToken leaks between threads.
                session.params = dict(API_PARAMS_COMMENTS)
                session.params["parentId"] = thread["id"]
                ret = get_api_items("comments", session)
                (js_reply, session, err_msg) = (
                    ret["json"],
                    ret["session"],
                    ret["error"],
                )
                if err_msg:
                    err_log.error(f"{thread['id']}: {err_msg}")
                    n_errors += 1
                    if err_msg.split(" ", 1)[0] != "404":  # -- no sleep if 404
                        time.sleep(1)
                    continue
                thread["replies"]["comments"] = js_reply
                total_comments += len(js_reply)
                # -- Fix reply count
                # -- (for new replies made between commentThreads and comments
                # -- endpoint requests)
                thread["snippet"]["totalReplyCount"] = len(js_reply)
            print(f"Found {total_comments} comment(s).")
        # -- Write API info to file (an empty list is still written)
        f_api = api_dir / f"{_id}_comments.json"
        print(f"Writing API info to {f_api}...")
        with open(f_api, "w", encoding="utf-8") as f:
            json.dump(js, f, indent=4)
        print("Write successful.")
    # -- Check if any errors logged
    if n_errors:
        print(f"\nFinished with errors (check {err_path}).")
    else:
        print("\nFinished with no errors.")
    return


if __name__ == "__main__":
    main(sys.argv[1:])