#!/usr/bin/env python3
"""Fetch YouTube comment metadata for one or more videoIds via Data API v3.

Usage:
    chmod a+x this_file.py
    ./this_file.py -i videoids_one_per_line.txt -d output_directory
or:
    python3 this_file.py -i videoids_one_per_line.txt -d output_dir
Help:
    ./this_file.py -h

For each videoId, all comment threads are paged from the ``commentThreads``
endpoint; threads whose inline replies are incomplete are completed via the
``comments`` endpoint. The result is written to ``{videoId}_comments.json``
in the output directory.
"""
import sys
import os
from pathlib import Path
from copy import deepcopy
import time
import logging
import json
import random
import argparse
import requests
import hashlib

# -- SECURITY: hard-coded credential checked into source. Rotate/revoke this
# -- key and load it from an environment variable or config file instead.
API_KEY = 'AIzaSyA-dlBUjVQeuc4a6ZN4RkNUYDFddrVLxrA'  # -- API KEY HERE!

BASE_URL = 'https://www.googleapis.com/youtube/v3/'

# -- Read-only query templates for the two endpoints used below. main()
# -- copies them (dict(...)) before adding per-request keys such as
# -- videoId / parentId / pageToken / key, so the templates are never mutated.
API_PART_COMMENT_THREADS = ['snippet', 'replies']
API_PART_COMMENTS = ['snippet', 'id']
API_PARAMS_COMMENT_THREADS = {
    'part': ','.join(API_PART_COMMENT_THREADS),
    'maxResults': 100,
    'order': 'time'}
API_PARAMS_COMMENTS = {
    'part': ','.join(API_PART_COMMENTS),
    'maxResults': 100}

# -- Overrides applied to query params only when echoing the request URL to
# -- stdout (used to hide/obfuscate the API key; see --stdout_key option).
STDOUT_PARAMS_OPTIONS = {}


# -- Creates error logger with handler
def make_error_logger(filename):
    """Return a logger that appends ERROR-level records to *filename*.

    Records are formatted ``YYYY-mm-dd HH:MM:SS|LEVEL|message``. The handler
    is attached to the module-level logger, so calling this twice would attach
    two handlers — main() calls it exactly once.
    """
    handler = logging.FileHandler(filename)
    handler.setLevel(logging.ERROR)
    formatter = logging.Formatter(
        "%(asctime)s|%(levelname)s|%(message)s",
        "%Y-%m-%d %H:%M:%S")
    handler.setFormatter(formatter)
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.ERROR)  # explicit: this logger only carries errors
    logger.addHandler(handler)
    return logger


# -- Pages through API request and returns list of all items
# -- Params must be in the Session object
def get_api_items(endpoint, session):
    """Page through ``BASE_URL + endpoint`` and collect every ``items`` entry.

    The query parameters (including videoId/parentId) must already be set on
    ``session.params``; the API key is added here. Pagination follows
    ``nextPageToken`` until exhausted.

    Returns a dict with:
        ``json``    - list of all API items retrieved (possibly empty),
        ``session`` - the (possibly rebuilt) requests.Session,
        ``error``   - ``None`` on success, else ``"<code> <reason,...>"``
                      built from the API error payload of a 4xx response.
    """
    js_api = []
    session.params['key'] = API_KEY
    err_msg = None
    while True:
        # -- 4xx codes are "final" answers (bad request, quota, disabled
        # -- comments, missing video...) — accept them instead of retrying.
        exit_codes = [200, 400, 401, 403, 404, 409]
        ret = __get_response(
            BASE_URL + endpoint,
            session=session,
            exit_codes=exit_codes,
            sleep=[1, 3])
        (response, session) = (ret['response'], ret['session'])
        js = response.json()

        # -- Skip if 4** status code: report the API's reason(s) and bail out
        if response.status_code // 100 == 4:
            errs = ','.join([x['reason'] for x in js['error']['errors']])
            err_msg = '{} {}'.format(js['error']['code'], errs)
            print(f'Error: {err_msg}')
            print('Skipping...')
            break

        js_api.extend(js['items'])

        # -- Continue while the API hands back a nextPageToken
        next_page_token = js.get('nextPageToken')
        if next_page_token is None:
            break
        session.params['pageToken'] = next_page_token
    return {'json': js_api, 'session': session, 'error': err_msg}


def __get_response(url, session=None, exit_codes=None, sleep=None):
    """GET *url*, retrying until the status code is in *exit_codes*.

    *sleep* is a [min, max] range (seconds) for the randomized back-off
    between retries; a scalar is accepted and treated as a fixed delay.
    On a non-accepted status the session is rebuilt (fresh connection pool)
    but its params are preserved. Returns ``{'response': ..., 'session': ...}``.
    """
    if session is None:
        session = requests.Session()
    if exit_codes is None:
        exit_codes = [200]
    if sleep is None:
        sleep = [1, 3]
    elif not isinstance(sleep, list):
        sleep = [sleep]

    response = None
    while True:
        # -- Echo the full URL, with sensitive params (API key) overridden
        # -- per STDOUT_PARAMS_OPTIONS so the key is not leaked to stdout.
        full_url = url
        if session.params:
            stdout_params = deepcopy(session.params)
            if 'STDOUT_PARAMS_OPTIONS' in globals():
                for k, v in STDOUT_PARAMS_OPTIONS.items():
                    stdout_params[k] = v
            full_url += '?' + '&'.join(
                '{}={}'.format(k, v) for k, v in stdout_params.items())
        print(f'Retrieving {full_url}')

        try:
            response = session.get(url, timeout=10)
        except Exception as e:
            # -- Network hiccup: back off and retry with the same session
            print(f'Exception: {e}')
            __sleep(*sleep)
            continue

        status_code = response.status_code
        print(f'Status: {status_code}')
        if status_code in exit_codes:
            break
        print('Retrying...')
        # -- Rebuild the session but keep the query params
        api_params = session.params
        session = requests.Session()
        session.params = api_params
        __sleep(*sleep)
    return {'response': response, 'session': session}


def __sleep(x, y=None):
    """Sleep a random duration drawn uniformly from [x, y] seconds."""
    if y is None:
        y = x
    sleep_time = round(random.uniform(x, y), 3)
    print(f'Sleeping for {sleep_time} seconds...')
    time.sleep(sleep_time)
    return


# -- Parse arguments
def __parse_args(argv):
    """Parse CLI arguments; returns {'input', 'api_dir', 'stdout_key'}."""
    parser = argparse.ArgumentParser(
        description='Write comments metadata for videos.',
        formatter_class=argparse.RawTextHelpFormatter)
    # -- Reorder help into explicit "required"/"optional" groups
    parser._action_groups.pop()
    required = parser.add_argument_group('required arguments')
    optional = parser.add_argument_group('optional arguments')
    required.add_argument(
        '-i', '--input',
        required=True,
        help='Single videoId (https://www.youtube.com/watch?v={videoId}) '
             'or text file containing videoIds, one per line.',
        metavar='videos.txt')
    optional.add_argument(
        '-d', '--api_dir',
        type=Path,
        default=Path(__file__).absolute().parent / 'videos',
        help='Output directory for API files (default: ./videos)',
        metavar='./videos')
    optional.add_argument(
        '--stdout_key',
        default=['hide'],
        nargs=1,
        choices=['hide', 'show', 'sha1'],
        help='''Option for displaying API key in stdout:
    hide    Hide API key in stdout (default)
    show    Warning: your API key will be shown in stdout
    sha1    The SHA1 hash of your API key will be shown in stdout
''',
        metavar='hide')
    args = parser.parse_args(argv)
    # -- Input may be a .txt file of ids OR a single bare videoId
    return {
        'input': args.input,
        'api_dir': args.api_dir,
        'stdout_key': args.stdout_key[0]}


def main(args):
    """Entry point: fetch and write comment metadata for each videoId."""
    # -- Check for API key
    if not API_KEY:
        print('Missing API key. Open file and add value to API_KEY.')
        return

    # -- Parse arguments
    args = __parse_args(args)
    f_in = args['input']
    api_dir = args['api_dir']
    stdout_key_option = args['stdout_key']

    # -- Obfuscate API key in stdout if necessary
    if stdout_key_option == 'hide':
        STDOUT_PARAMS_OPTIONS['key'] = ' {API key} '
    elif stdout_key_option == 'show':
        STDOUT_PARAMS_OPTIONS['key'] = API_KEY
    elif stdout_key_option == 'sha1':
        STDOUT_PARAMS_OPTIONS['key'] = (
            f' {{{hashlib.sha1(API_KEY.encode()).hexdigest()}}} ')

    # -- Get Ids if input file (ends with .txt); skip blank lines
    if f_in.endswith('.txt'):
        print(f'Reading {f_in}...')
        with open(f_in, 'r', encoding='utf-8') as f:
            ids_in = [x.rstrip() for x in f if x.rstrip()]
    else:
        ids_in = [f_in]
    total_ids = len(ids_in)
    print(f'Found {total_ids} videoId(s).')

    # -- Create output directory
    os.makedirs(api_dir, exist_ok=True)

    # -- Create error logger
    err_log = make_error_logger('error.log')

    # -- Iterate over Ids
    session = requests.Session()
    for i, _id in enumerate(ids_in):
        print('\n########################\n')
        print(f'Processing Id {_id} ({i + 1}/{total_ids})...')

        # -- Get API info. Copy the template so the module constant is not
        # -- mutated and no pageToken lingers from a previous video.
        session.params = dict(API_PARAMS_COMMENT_THREADS)
        session.params['videoId'] = _id
        ret = get_api_items('commentThreads', session)
        (js, session, err_msg) = (ret['json'], ret['session'], ret['error'])
        if err_msg:
            err_log.error(f'{_id}: {err_msg}')
            # -- long sleep if quotaExceeded
            # -- no sleep if 404 or commentsDisabled
            (err_status, err_reasons) = err_msg.split(' ', 1)
            if 'quotaExceeded' in err_reasons:
                __sleep(300, 600)
            if (err_status != '404') and ('commentsDisabled' not in err_reasons):
                time.sleep(1)
            continue
        print('Found {} comment thread(s).'.format(len(js)))

        if len(js) > 0:
            # -- Get API info (replies) for threads with truncated replies
            total_comments = len(js)  # -- counter for print statement
            for thread in js:
                # -- Check if any replies
                replies = thread.get('replies')
                if replies is None:
                    continue

                # -- Check if need to get more replies
                replies = replies['comments']
                total_replies = thread['snippet']['totalReplyCount']
                if len(replies) == total_replies:
                    total_comments += len(replies)
                    continue

                # -- Get more replies. Fresh params per thread: no stale
                # -- pageToken, and the module constant stays pristine.
                session.params = dict(API_PARAMS_COMMENTS,
                                      parentId=thread['id'])
                ret = get_api_items('comments', session)
                (js_reply, session, err_msg) = (
                    ret['json'], ret['session'], ret['error'])
                if err_msg:
                    err_log.error(f"{thread['id']}: {err_msg}")
                    if err_msg.split(' ', 1)[0] != '404':
                        # -- no sleep if 404
                        time.sleep(1)
                    continue
                thread['replies']['comments'] = js_reply
                total_comments += len(js_reply)

                # -- Fix reply count
                # -- (for new replies made between commentThreads and comments
                # -- endpoint requests)
                thread['snippet']['totalReplyCount'] = len(js_reply)
            print(f'Found {total_comments} comment(s).')

        # -- Write API info to file
        f_api = os.path.join(api_dir, f'{_id}_comments.json')
        print(f'Writing API info to {f_api}...')
        with open(f_api, 'w', encoding='utf-8') as f:
            json.dump(js, f, indent=4)
        print('Write successful.')
    print('\nFinished!')
    return


if __name__ == '__main__':
    main(sys.argv[1:])