tokuhirom's blog

MySQL の X Protocol/X DevAPI 周りについて調査したのをまとめたののメモ

MySQL 8 以後では X Protocol がサポートされている(5.7 系では部分的なサポートであり、X Protocol を本格的に利用する場合には 8 を利用することが推奨されているようだ) 通信は従来の MySQL Protocol と異なり、Protocol Buffers Based となっていて、各言語のドライバの実装が簡単になっている(protocol buffers がその言語でサポートされていれば、だが) これにより今後 libmysqlclient に依存せずに各言語のドライバが実装されるようになって運用管理が簡単になるんじゃないかと私は考えています。 実際に、mysql-connector-nodejsは X Protocol のみをサポートしていて、libmysqlclient への依存がありません。

また、X Protocol/X DevAPI は async を前提に設計されているため、各言語の Connector ではその言語の特性を生かして CompletableFuture/Promise などを利用して実装されています。

X DevAPI というのものがあって、これは MySQL Shell と MySQL Connectors で実装されている API。どの言語を利用していても統一的に MySQL を扱えるプログラミング言語レベルの API になっている。

X Protocol への接続について

X Protocol は mysql 8 ではデフォルトで有効になっている模様。有効かどうかは show plugins などして mysqlx plugin が有効かどうかを確認すれば良い。 port も 3306 ではなく 33060 なので注意。

Node 実装について

Node の MySQL connector は promise based になっており使いやすい * https://dev.mysql.com/doc/dev/connector-nodejs/8.0/ * https://dev.mysql.com/doc/dev/connector-nodejs/8.0/

NODE_DEBUG=protobuf という環境変数を設定すれば、protobuf の serialization のログが見れて便利。

Java 実装について

Java 実装も割と普通に実行できる。今まで通りの MySQL Connector/J の実装で実行可能。 生で使うなら X DevAPI は JDBC API の 2億倍使いやすい。

package com.example;

import com.mysql.cj.xdevapi.Session;
import com.mysql.cj.xdevapi.SessionFactory;
import com.mysql.cj.xdevapi.SqlResult;

import java.util.List;
import java.util.stream.Collectors;

public class App {
    public static void main(String[] args) {
        SessionFactory sessionFactory = new SessionFactory();
        Session session = sessionFactory.getSession("mysqlx://root@127.0.0.1:33060/test");
        runQuery(session, "SHOW PROCESSLIST");
        runQuery(session, "SELECT SLEEP(15)");
        runQuery(session, "SHOW PROCESSLIST");
        session.close();
    }

    private static void runQuery(Session session, String query) {
        System.out.println("😁 クエリ開始" + query);
        session.sql(query).executeAsync()
                .thenAccept(rows -> {
                    System.out.println("😁 クエリ完了 " + query);
                    dumpRows(rows);
                });
    }

    private static void dumpRows(SqlResult rows) {
        List<String> columnNames = rows.getColumnNames();
        System.out.println("\n\n結果結果結果結果結果結果結果結果結果結果結果");
        System.out.println("    " + columnNames.stream().collect(Collectors.joining("\t")));
        System.out.println(rows.fetchAll()
                .stream()
                .map(row -> columnNames.stream()
                        .map(row::getString)
                        .collect(Collectors.joining("\t")))
                .map(line -> "    " + line)
                .collect(Collectors.joining("\n")));
        System.out.println("\n\n終了終了終了終了終了終了終了終了終了終了終了");
    }
}

のような実装では、以下のような結果を得るだろう。

😁 クエリ開始SHOW PROCESSLIST
😁 クエリ完了 SHOW PROCESSLIST


結果結果結果結果結果結果結果結果結果結果結果
    Id    User    Host    db    Command    Time    State    Info
    4    event_scheduler    localhost    null    Daemon    342510    Waiting on empty queue    null
    25    root    172.17.0.1:36154    null    Sleep    26244        null
    102    root    172.17.0.1:39884    test    Sleep    1599    null    PLUGIN
    204    root    172.17.0.1:40090    test    Query    0    null    PLUGIN: SHOW PROCESSLIST


終了終了終了終了終了終了終了終了終了終了終了
😁 クエリ開始SELECT SLEEP(15)
😁 クエリ開始SHOW PROCESSLIST
😁 クエリ完了 SELECT SLEEP(15)


結果結果結果結果結果結果結果結果結果結果結果
    SLEEP(15)
    0


終了終了終了終了終了終了終了終了終了終了終了
😁 クエリ完了 SHOW PROCESSLIST


結果結果結果結果結果結果結果結果結果結果結果
    Id    User    Host    db    Command    Time    State    Info
    4    event_scheduler    localhost    null    Daemon    342525    Waiting on empty queue    null
    25    root    172.17.0.1:36154    null    Sleep    26259        null
    102    root    172.17.0.1:39884    test    Sleep    1614    null    PLUGIN
    204    root    172.17.0.1:40090    test    Query    0    null    PLUGIN: SHOW PROCESSLIST


終了終了終了終了終了終了終了終了終了終了終了

com.mysql.cj.protocol.x.AsyncMessageSender#writeAsync"[SEND] ===> " + message.getMessage().getClass().getSimpleName() + "\n" + message.getMessage().toString()というデバッグログを設置して、com.mysql.cj.protocol.x.ResultMessageListener#createFromMessage"[RECEIVE] <== " + message.getMessage().getClass().getName() + "\n" + message.getMessage().toString() というデバッグログを設置すると、通信の様子を垣間見ることができる。

上記のコードの場合の出力は以下のようになる。

[SEND] ===> CapabilitiesGet

[SEND] ===> CapabilitiesSet
capabilities {
  capabilities {
    name: "tls"
    value {
      type: SCALAR
      scalar {
        type: V_BOOL
        v_bool: true
      }
    }
  }
}

[SEND] ===> AuthenticateStart
mech_name: "PLAIN"
auth_data: "test\000root\000"

[SEND] ===> StmtExecute
stmt: "select @@mysqlx_max_allowed_packet"

😁 クエリ開始SHOW PROCESSLIST
[SEND] ===> StmtExecute
stmt: "SHOW PROCESSLIST"

😁 クエリ開始SELECT SLEEP(15)
[SEND] ===> StmtExecute
stmt: "SELECT SLEEP(15)"

😁 クエリ開始SHOW PROCESSLIST
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: SINT
name: "Id"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
length: 21
flags: 16

[SEND] ===> StmtExecute
stmt: "SHOW PROCESSLIST"

[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: BYTES
name: "User"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
collation: 33
length: 96
flags: 16

[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: BYTES
name: "Host"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
collation: 33
length: 192
flags: 16

[SEND] ===> Close

[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: BYTES
name: "db"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
collation: 33
length: 192

[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: BYTES
name: "Command"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
collation: 33
length: 48
flags: 16

[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: SINT
name: "Time"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
length: 7
flags: 16

[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: BYTES
name: "State"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
collation: 33
length: 90

[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: BYTES
name: "Info"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
collation: 33
length: 300

[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$Row
field: "\b"
field: "event_scheduler\000"
field: "localhost\000"
field: ""
field: "Daemon\000"
field: "\216\353)"
field: "Waiting on empty queue\000"
field: ""

[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$Row
field: "2"
field: "root\000"
field: "172.17.0.1:36154\000"
field: ""
field: "Sleep\000"
field: "\272\235\003"
field: "\000"
field: ""

[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$Row
field: "\314\001"
field: "root\000"
field: "172.17.0.1:39884\000"
field: "test\000"
field: "Sleep\000"
field: "\260\034"
field: ""
field: "PLUGIN\000"

[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$Row
field: "\234\003"
field: "root\000"
field: "172.17.0.1:40094\000"
field: "test\000"
field: "Query\000"
field: "\000"
field: ""
field: "PLUGIN: SHOW PROCESSLIST\000"

[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$FetchDone

[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxNotice$Frame
type: 3
scope: LOCAL
payload: "\b\004\022\004\b\002\030\000"

[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxSql$StmtExecuteOk

😁 クエリ完了 SHOW PROCESSLIST


結果結果結果結果結果結果結果結果結果結果結果
    Id    User    Host    db    Command    Time    State    Info
    4    event_scheduler    localhost    null    Daemon    342727    Waiting on empty queue    null
    25    root    172.17.0.1:36154    null    Sleep    26461        null
    102    root    172.17.0.1:39884    test    Sleep    1816    null    PLUGIN
    206    root    172.17.0.1:40094    test    Query    0    null    PLUGIN: SHOW PROCESSLIST


終了終了終了終了終了終了終了終了終了終了終了
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: SINT
name: "SLEEP(15)"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
length: 21
flags: 16

[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$Row
field: "\000"

[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$FetchDone

[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxNotice$Frame
type: 3
scope: LOCAL
payload: "\b\004\022\004\b\002\030\000"

[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxSql$StmtExecuteOk

😁 クエリ完了 SELECT SLEEP(15)


結果結果結果結果結果結果結果結果結果結果結果
    SLEEP(15)
    0


終了終了終了終了終了終了終了終了終了終了終了
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: SINT
name: "Id"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
length: 21
flags: 16

[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: BYTES
name: "User"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
collation: 33
length: 96
flags: 16

[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: BYTES
name: "Host"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
collation: 33
length: 192
flags: 16

[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: BYTES
name: "db"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
collation: 33
length: 192

[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: BYTES
name: "Command"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
collation: 33
length: 48
flags: 16

[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: SINT
name: "Time"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
length: 7
flags: 16

[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: BYTES
name: "State"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
collation: 33
length: 90

[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: BYTES
name: "Info"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
collation: 33
length: 300

[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$Row
field: "\b"
field: "event_scheduler\000"
field: "localhost\000"
field: ""
field: "Daemon\000"
field: "\254\353)"
field: "Waiting on empty queue\000"
field: ""

[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$Row
field: "2"
field: "root\000"
field: "172.17.0.1:36154\000"
field: ""
field: "Sleep\000"
field: "\330\235\003"
field: "\000"
field: ""

[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$Row
field: "\314\001"
field: "root\000"
field: "172.17.0.1:39884\000"
field: "test\000"
field: "Sleep\000"
field: "\316\034"
field: ""
field: "PLUGIN\000"

[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$Row
field: "\234\003"
field: "root\000"
field: "172.17.0.1:40094\000"
field: "test\000"
field: "Query\000"
field: "\000"
field: ""
field: "PLUGIN: SHOW PROCESSLIST\000"

[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$FetchDone

[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxNotice$Frame
type: 3
scope: LOCAL
payload: "\b\004\022\004\b\002\030\000"

[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxSql$StmtExecuteOk

😁 クエリ完了 SHOW PROCESSLIST


結果結果結果結果結果結果結果結果結果結果結果
    Id    User    Host    db    Command    Time    State    Info
    4    event_scheduler    localhost    null    Daemon    342742    Waiting on empty queue    null
    25    root    172.17.0.1:36154    null    Sleep    26476        null
    102    root    172.17.0.1:39884    test    Sleep    1831    null    PLUGIN
    206    root    172.17.0.1:40094    test    Query    0    null    PLUGIN: SHOW PROCESSLIST


終了終了終了終了終了終了終了終了終了終了終了

現在の Java connector/mysqld の実装では、select sleep(15) などのクエリが発行された場合、その後のクエリの結果が先に帰ってくることはない。これは実際問題、session が状態を持つ以上、そうならざるを得ない。このため、他の状態を持たないプロトコルのクライアントと同じ気分で使っているとハマるかも。 そして、transaction は session に紐づく が、 1session あたり 1 TCP connection 以上 という実装に現時点ではなっている(以上、というのは slave への自動送信などを x devapi 上で 1 セッションとして扱う可能性があるため)。 (通信を多重化することも可能だったと思うが、現在の実装はそうなっていない。なんでだろうか。MySQL Server の実装上の制約?)

Created: 2018-07-08 23:39:44 +0900
Updated: 2018-07-08 23:39:44 +0900