Python
ffmpeg
websocket
ニコニコ生放送
Python3
20
どのような問題がありますか?

投稿日

更新日

ニコ生(タイムシフト)ダウンローダーを書く

はじめに

ニコ生のタイムシフトをダウンロードしたかったんです。高画質のために深夜に見るのも辛いし。でも探しても全然ダウンローダーが無い。
なのでGoogleChromeのDevToolとか見ながら夜業して作りました。websocketとかの勉強になったかな?
悪用厳禁。
早くコードを見せろという方用にGithubのリンクを貼っときます。
https://github.com/kairi003/nicolive-dl

参考文献

いきなり参考文献というのもどうかと思うのですが、@mueruさんの「ニコ生の配信を分析し、動画として保存するために頑張った話」を大変参考にさせていただきました。
今回私が書いたのはこれの発展ですね。GoogleChrome+Seleniumなしで動くように頑張りました。
あとは@tor4kichiさんの公開しているニコ生配信情報WebSocketのまとめも大変参考になりました。

ニコ生解析編

※注意: 2021年5月31日現在の情報です。今後仕様変更がされる可能性があります。

というわけでニコ生の解析からです。まずはテスト用に自分でフリーの動画を30秒くらい生配信してタイムシフトで見れるようにしました。これで私も生主です。(違う)
DevToolで通信を監視しているとxhrでts(動画データ)とm3u8(HSL)的なものをやり取りしていることがわかります。
特に暗号化されているわけでもないのであとはコレを連結できればいいかなーって感じですね。
image.png

m3u8の引数は開始時間とかっぽいしこいつを何とかできればいいのかなぁって思って色々やっているところで@mueruさんの記事を発見します。
なんとm3u8のstart=0をffmpegに突っ込むだけでダウンロードができてしまうのです。なんてかんたんなんだ。ffmpegは神か。ただの動画エンコーダーだと思ってたよ。

しかも@mueruさんの方式でもダウンロード自体は滞りなくできてしまう。しかしここまできたらもっとガッツリハックしたい。そう思いAtCoderでレートが下がってテンションが落ちてる上に次の日1限な長い夜の幕があがるのでした。

HLSのURLはどこから来るのか

HLSは動画ストリーミングの方式で、tsという形式で細かく分割した動画ファイルの順番を書いたm3u8というテキストファイルから動画を再生するやり方です。そして今回はm3u8のURLが特定できれば良さげで、さらにそれはWebSocketの通信で得ていることが(記事を読んで)わかりました。
確かにwebsocketの欄をみるとtimeshiftってコネクションがあって、{type: stream}メッセージでのm3u8のURLを受け取っています。

image.png

@mueruさんは解析はここまででやめていて、ChromeをSeleniumで操作してWebSocketを直接拾う方式をとっています。
しかし私は思いました。
「それ、PythonでWebSocketクライアント動かして直接通信したほうが良くない?」

WebSocketのURLはどこから来るのか

WebSocket通信をChromeを介さずに自前でできればダウンローダも安定しますし、最初にクライアント側から送っている画質情報を自分で決められるのでいつでも高画質でダウンロードできます。(多分)そしてなによりJavaScriptを動かすわけでもないのにSeleniumでChrome操作をしなくちゃいけないなんて気持ち悪いです。

というわけで、WebSocket通信をするためにWebSocketのURLをなんとかして知る必要があります。そしてコイツはaudience_tokenなる謎のパラメータを持っているためそいつがCookieかスクリプトかjsonかにあるはずだと目星をつけ、私はDevToolで検索をかけました。すると衝撃の結果が。

image.png

なんとindex.htmlがヒットしたのです。
しかも、その埋め込み方が、

image.png

「embedded-data(埋め込みデータ)」というIDを持ったscriptタグの独自属性data-propにHTMLエスケープされた2kBくらいのjsonが直書きしてある
その中にwebSocketUrlなるまさにそのものなURLが存在しました。

いやもう、何から突っ込めばいいのか。変態か?
と思いながらも、まあ簡単に取得できそうな部分にあったのは朗報です。

ともかくこれでWebSocket通信を自前でできそうになりました。

実装編

手法の目処が立ったので早速実装していきます。
今回は手順として、

  1. Sessionを用意してアカウントにログイン
  2. 目的のタイムシフトのHTMLを開き、webSocketUrlを取得
  3. WebSocket通信を別スレッドで開始
  4. ffmpegで動画をダウンロード

となります。

なんでWebSocketを別スレッドで実行するの

なんでかというと、ニコニコ動画もニコ生も、定期的にこちらがページを見ていることを伝えないと通信が切断されちゃうからなんですね。ハートビートって言うらしいです。
PyPIにあったニコニコ動画ダウンローダーでは普通にAPIを叩いてたんですが、ニコ生ではWebSocketのメッセージを定期的(30秒ごと)に送る必要があります。keepSheetというメッセージです。
どの動画でも30秒っぽいんですが最初にもらうメッセージで30秒って指定されるので一応それを読んでそこから値を設定するようにしました。
あとWebSocketにはping/pongというものがあって、こちらもコネクションの生存確認、pingと送られたらpongと返さなきゃいけないらしいです。これも実装する必要が多分あります。pingも30秒置きに送られてくるので同様にpongを30秒おきに返すことになります。
別スレッドで時間制御ループ回すの面倒だしkeepSheetいらなくね?

並行処理について

今回はasyncioの非同期処理で実現しています。なんでかというと採用したwebsocketsというライブラリがasyncioだったからです。
Pythonの非同期処理は普段あまり触らないので色々手さぐりで大変でした。

コード解説

唐突にコードの説明に入ります。

nicolive_dl.py

nicolive_dl.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import json
import asyncio
from pathlib import Path
from urllib.parse import unquote
from collections import namedtuple
from requests import Session
from bs4 import BeautifulSoup, Tag
from sanitize_filename import sanitize
from .nicolive_ws import NicoLiveWS
from .exceptions import *


NicoLiveInfo = namedtuple('NicoLiveInfo', 'lvid title web_socket_url')


class NicoLiveDL:
    def __init__(self):
        self.ses = Session()

    def login(self, username, password):
        payload = {'mail_tel': username, 'password': password}
        login_url = 'https://account.nicovideo.jp/login/redirector'
        res = self.ses.post(login_url, data=payload)
        if res.url != 'https://account.nicovideo.jp/my/account':
            raise LoginError('Failed to Login')

    async def download(self, lvid, output='{title}-{lvid}.mp4'):
        lvid, title, web_socket_url = await self.get_info(lvid)
        title = sanitize(title)
        output_path = Path(output.format(title=title, lvid=lvid))
        if output_path.exists():
            while True:
                ans = input(f'Can you overwrite {output_path}? [y/n]')
                if ans.lower() == 'y':
                    break
                elif ans.lower() == 'n':
                    return
        nlws = NicoLiveWS(web_socket_url)
        asyncio.create_task(nlws.connect())
        stream_uri = await nlws.wait_for_stream()
        output_path.parent.mkdir(parents=True, exist_ok=True)
        args = ['-y', '-i', stream_uri, '-c', 'copy', output_path]
        proc = await asyncio.subprocess.create_subprocess_exec('ffmpeg', *args)
        await proc.communicate()
        await nlws.close()

    async def get_info(self, lvid):
        res = self.ses.get(f'https://live.nicovideo.jp/watch/{lvid}')
        res.raise_for_status()
        soup = BeautifulSoup(res.content, 'html.parser')
        embedded_tag = soup.select_one('#embedded-data')
        if not isinstance(embedded_tag, Tag):
            raise SelectException('Not Found #embedded-data')
        embedded_data = embedded_tag.get_attribute_list('data-props')[0]
        decoded_data = json.loads(unquote(embedded_data))
        web_socket_url = decoded_data['site']['relive']['webSocketUrl']
        title = decoded_data['program']['title']
        return NicoLiveInfo(lvid, title, web_socket_url)

NicoLiveDLはダウンローダー本体のクラスです。

nldl = NicoLiveDL()
nldl.login(input('Account: '), getpass('Password: '))
lvid = input('Live Id: ')
print(await nldl.get_info(lvid))
await nldl.download(lvid)

みたいに使います。
loginでログインし、downloadを実行するとget_infoでindex.html上に埋め込まれたデータを解析してからwebsocketのオブジェクトを起動します。
asuncio.create_taskはasyncioにおけるFire and Forget、つまり処理の投げっぱなしってやつです。websocketはとりあえず投げっぱなしで起動して、streamメッセージからHLSのURLが手に入るまで待機し、そいつを手に入れたらffmpegでダウンロードして終了次第websocketをcloseします。
ちなみにasyncio.subprocessで起動するサブプロセスのコルーチンをawaitして手に入るのはtaskなので、コレをさらにawaitしないと処理が流れちゃって即終了します。

nicolive_ws.py

nicolive_ws.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
import json
import asyncio
import logging
import websockets

wslogger = logging.getLogger('websockets')
wslogger.setLevel(logging.INFO)
wslogger.addHandler(logging.StreamHandler())
# LogLevelをDEBUGにすると通信情報を表示

class WebSocketApp:
    def __init__(self, uri, **kwargs):
        self.ws = None
        self.uri = uri
        self.option = kwargs

    async def connect(self):
        async with websockets.connect(self.uri, **self.option) as self.ws:
            await self.on_open()
            async for msg in self.ws:
                await self.on_recv(msg)

    async def send(self, data):
        msg = json.dumps(data)
        await self.ws.send(msg)
        await self.on_send(data)

    async def close(self):
        await self.on_close()
        await self.ws.close()

    async def on_open(self):
        pass

    async def on_send(self, data):
        pass

    async def on_recv(self, msg):
        pass

    async def on_close(self):
        pass


class NicoLiveWS(WebSocketApp):
    def __init__(self, uri, **kwargs):
        super().__init__(uri, **kwargs)
        self.keep_interval_sec = None
        self.stream_uri = None
        self.recv_stream_event = asyncio.Event()

    async def on_open(self):
        start_watching = {
            "type": "startWatching",
            "data": {
                "stream": {
                    "quality": "super_high",
                    "protocol": "hls",
                    "latency": "low",
                    "chasePlay": False
                },
                "room": {
                    "protocol": "webSocket",
                    "commentable": True
                },
                "reconnect": True
            }
        }
        await self.send(start_watching)

    async def on_recv(self, msg):
        data = json.loads(msg)
        if not isinstance(data, dict):
            return
        t = data.get('type')
        if t == 'ping':
            asyncio.create_task(self.pong())
        elif t == 'seat':
            asyncio.create_task(self.heart_start(data))
        elif t == 'stream':
            asyncio.create_task(self.recv_stream(data))

    async def pong(self):
        await self.send({"type": "pong"})

    async def heart_start(self, data):
        if self.keep_interval_sec:
            return
        self.keep_interval_sec = data['data']['keepIntervalSec']
        next_time = time.time()
        while True:
            await self.send({"type": "keepSeat"})
            next_time += self.keep_interval_sec
            await asyncio.sleep(max(next_time - time.time(), 0))

    async def recv_stream(self, data):
        self.stream_uri = data['data']['uri']
        self.recv_stream_event.set()

    async def wait_for_stream(self):
        await self.recv_stream_event.wait()
        return self.stream_uri

こちらはWebSocketのラッパーというかオブジェクト化したものです。
そのままだとちょっとあれだったので。
WebSocketAppをベースとしてNicoLiveWSで継承してオーバーロードしてます。on_メソッドはインターフェース的なつもり。
streamを受け取るのを外部で待たせるためにEventを使ってみたのですが、これもっとガンガン使っていい感じにできそうな気がしないでもない。
個人的にwebsocektsのasync for .. inが書いてて気持ちいいですね。
heart_startみたいにwhile Trueするよりもきれいでいいと思います。

最後に

結局実装を終えるまでに一晩、ブラッシュアップして記事にするまでで1日弱かかってしまいました。まあ意味があるかは置いといて楽しかったからいいかな。
特にasyncioとDevToolの造詣が深くなった気がしますし。
何かご指摘などありましたらコメントによろしくおねがいします。

追記 (2021-06-01)

要求画質をsuper_highに変更しました。
何の略なのかがいまいち分からないんですが、多分abrって最高画質じゃなくて自動設定だと思うんですよね。
super_highはプレミアム会員じゃないと選択できませんが自動でnormalになると思われます。

ユーザー登録して、Qiitaをもっと便利に使ってみませんか。
  1. あなたにマッチした記事をお届けします
    ユーザーやタグをフォローすることで、あなたが興味を持つ技術分野の情報をまとめてキャッチアップできます
  2. 便利な情報をあとで効率的に読み返せます
    気に入った記事を「ストック」することで、あとからすぐに検索できます
kairi003
この記事は以下の記事からリンクされています

コメント

リンクをコピー
このコメントを報告

自分で試したわけじゃないですが streamlink がニコ生に対応しているようですよ。

0
どのような問題がありますか?
あなたもコメントしてみませんか :)
ユーザー登録
すでにアカウントを持っている方はログイン
記事投稿イベント開催中
データに関する記事を書こう!
~
20
どのような問題がありますか?
ユーザー登録して、Qiitaをもっと便利に使ってみませんか

この機能を利用するにはログインする必要があります。ログインするとさらに下記の機能が使えます。

  1. ユーザーやタグのフォロー機能であなたにマッチした記事をお届け
  2. ストック機能で便利な情報を後から効率的に読み返せる
ユーザー登録ログイン
ストックするカテゴリー