MySQL の X Protocol/X DevAPI 周りについて調査したのをまとめたののメモ
MySQL 8 以後では X Protocol がサポートされている(5.7 系では部分的なサポートであり、X Protocol を本格的に利用する場合には 8 を利用することが推奨されているようだ) 通信は従来の MySQL Protocol と異なり、Protocol Buffers Based となっていて、各言語のドライバの実装が簡単になっている(protocol buffers がその言語でサポートされていれば、だが) これにより今後 libmysqlclient に依存せずに各言語のドライバが実装されるようになって運用管理が簡単になるんじゃないかと私は考えています。 実際に、mysql-connector-nodejsは X Protocol のみをサポートしていて、libmysqlclient への依存がありません。
また、X Protocol/X DevAPI は async を前提に設計されているため、各言語の Connector ではその言語の特性を生かして CompletableFuture/Promise などを利用して実装されています。
X DevAPI というのものがあって、これは MySQL Shell と MySQL Connectors で実装されている API。どの言語を利用していても統一的に MySQL を扱えるプログラミング言語レベルの API になっている。
X Protocol への接続について
X Protocol は mysql 8 ではデフォルトで有効になっている模様。有効かどうかは show plugins
などして mysqlx plugin が有効かどうかを確認すれば良い。
port も 3306 ではなく 33060 なので注意。
Node 実装について
Node の MySQL connector は promise based になっており使いやすい * https://dev.mysql.com/doc/dev/connector-nodejs/8.0/ * https://dev.mysql.com/doc/dev/connector-nodejs/8.0/
NODE_DEBUG=protobuf
という環境変数を設定すれば、protobuf の serialization のログが見れて便利。
Java 実装について
Java 実装も割と普通に実行できる。今まで通りの MySQL Connector/J の実装で実行可能。 生で使うなら X DevAPI は JDBC API の 2億倍使いやすい。
package com.example;
import com.mysql.cj.xdevapi.Session;
import com.mysql.cj.xdevapi.SessionFactory;
import com.mysql.cj.xdevapi.SqlResult;
import java.util.List;
import java.util.stream.Collectors;
public class App {
public static void main(String[] args) {
SessionFactory sessionFactory = new SessionFactory();
Session session = sessionFactory.getSession("mysqlx://root@127.0.0.1:33060/test");
runQuery(session, "SHOW PROCESSLIST");
runQuery(session, "SELECT SLEEP(15)");
runQuery(session, "SHOW PROCESSLIST");
session.close();
}
private static void runQuery(Session session, String query) {
System.out.println("😁 クエリ開始" + query);
session.sql(query).executeAsync()
.thenAccept(rows -> {
System.out.println("😁 クエリ完了 " + query);
dumpRows(rows);
});
}
private static void dumpRows(SqlResult rows) {
List<String> columnNames = rows.getColumnNames();
System.out.println("\n\n結果結果結果結果結果結果結果結果結果結果結果");
System.out.println(" " + columnNames.stream().collect(Collectors.joining("\t")));
System.out.println(rows.fetchAll()
.stream()
.map(row -> columnNames.stream()
.map(row::getString)
.collect(Collectors.joining("\t")))
.map(line -> " " + line)
.collect(Collectors.joining("\n")));
System.out.println("\n\n終了終了終了終了終了終了終了終了終了終了終了");
}
}
のような実装では、以下のような結果を得るだろう。
😁 クエリ開始SHOW PROCESSLIST
😁 クエリ完了 SHOW PROCESSLIST
結果結果結果結果結果結果結果結果結果結果結果
Id User Host db Command Time State Info
4 event_scheduler localhost null Daemon 342510 Waiting on empty queue null
25 root 172.17.0.1:36154 null Sleep 26244 null
102 root 172.17.0.1:39884 test Sleep 1599 null PLUGIN
204 root 172.17.0.1:40090 test Query 0 null PLUGIN: SHOW PROCESSLIST
終了終了終了終了終了終了終了終了終了終了終了
😁 クエリ開始SELECT SLEEP(15)
😁 クエリ開始SHOW PROCESSLIST
😁 クエリ完了 SELECT SLEEP(15)
結果結果結果結果結果結果結果結果結果結果結果
SLEEP(15)
0
終了終了終了終了終了終了終了終了終了終了終了
😁 クエリ完了 SHOW PROCESSLIST
結果結果結果結果結果結果結果結果結果結果結果
Id User Host db Command Time State Info
4 event_scheduler localhost null Daemon 342525 Waiting on empty queue null
25 root 172.17.0.1:36154 null Sleep 26259 null
102 root 172.17.0.1:39884 test Sleep 1614 null PLUGIN
204 root 172.17.0.1:40090 test Query 0 null PLUGIN: SHOW PROCESSLIST
終了終了終了終了終了終了終了終了終了終了終了
com.mysql.cj.protocol.x.AsyncMessageSender#writeAsync
に "[SEND] ===> " + message.getMessage().getClass().getSimpleName() + "\n" + message.getMessage().toString()
というデバッグログを設置して、com.mysql.cj.protocol.x.ResultMessageListener#createFromMessage
に "[RECEIVE] <== " + message.getMessage().getClass().getName() + "\n" + message.getMessage().toString()
というデバッグログを設置すると、通信の様子を垣間見ることができる。
上記のコードの場合の出力は以下のようになる。
[SEND] ===> CapabilitiesGet
[SEND] ===> CapabilitiesSet
capabilities {
capabilities {
name: "tls"
value {
type: SCALAR
scalar {
type: V_BOOL
v_bool: true
}
}
}
}
[SEND] ===> AuthenticateStart
mech_name: "PLAIN"
auth_data: "test\000root\000"
[SEND] ===> StmtExecute
stmt: "select @@mysqlx_max_allowed_packet"
😁 クエリ開始SHOW PROCESSLIST
[SEND] ===> StmtExecute
stmt: "SHOW PROCESSLIST"
😁 クエリ開始SELECT SLEEP(15)
[SEND] ===> StmtExecute
stmt: "SELECT SLEEP(15)"
😁 クエリ開始SHOW PROCESSLIST
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: SINT
name: "Id"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
length: 21
flags: 16
[SEND] ===> StmtExecute
stmt: "SHOW PROCESSLIST"
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: BYTES
name: "User"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
collation: 33
length: 96
flags: 16
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: BYTES
name: "Host"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
collation: 33
length: 192
flags: 16
[SEND] ===> Close
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: BYTES
name: "db"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
collation: 33
length: 192
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: BYTES
name: "Command"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
collation: 33
length: 48
flags: 16
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: SINT
name: "Time"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
length: 7
flags: 16
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: BYTES
name: "State"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
collation: 33
length: 90
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: BYTES
name: "Info"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
collation: 33
length: 300
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$Row
field: "\b"
field: "event_scheduler\000"
field: "localhost\000"
field: ""
field: "Daemon\000"
field: "\216\353)"
field: "Waiting on empty queue\000"
field: ""
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$Row
field: "2"
field: "root\000"
field: "172.17.0.1:36154\000"
field: ""
field: "Sleep\000"
field: "\272\235\003"
field: "\000"
field: ""
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$Row
field: "\314\001"
field: "root\000"
field: "172.17.0.1:39884\000"
field: "test\000"
field: "Sleep\000"
field: "\260\034"
field: ""
field: "PLUGIN\000"
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$Row
field: "\234\003"
field: "root\000"
field: "172.17.0.1:40094\000"
field: "test\000"
field: "Query\000"
field: "\000"
field: ""
field: "PLUGIN: SHOW PROCESSLIST\000"
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$FetchDone
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxNotice$Frame
type: 3
scope: LOCAL
payload: "\b\004\022\004\b\002\030\000"
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxSql$StmtExecuteOk
😁 クエリ完了 SHOW PROCESSLIST
結果結果結果結果結果結果結果結果結果結果結果
Id User Host db Command Time State Info
4 event_scheduler localhost null Daemon 342727 Waiting on empty queue null
25 root 172.17.0.1:36154 null Sleep 26461 null
102 root 172.17.0.1:39884 test Sleep 1816 null PLUGIN
206 root 172.17.0.1:40094 test Query 0 null PLUGIN: SHOW PROCESSLIST
終了終了終了終了終了終了終了終了終了終了終了
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: SINT
name: "SLEEP(15)"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
length: 21
flags: 16
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$Row
field: "\000"
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$FetchDone
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxNotice$Frame
type: 3
scope: LOCAL
payload: "\b\004\022\004\b\002\030\000"
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxSql$StmtExecuteOk
😁 クエリ完了 SELECT SLEEP(15)
結果結果結果結果結果結果結果結果結果結果結果
SLEEP(15)
0
終了終了終了終了終了終了終了終了終了終了終了
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: SINT
name: "Id"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
length: 21
flags: 16
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: BYTES
name: "User"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
collation: 33
length: 96
flags: 16
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: BYTES
name: "Host"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
collation: 33
length: 192
flags: 16
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: BYTES
name: "db"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
collation: 33
length: 192
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: BYTES
name: "Command"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
collation: 33
length: 48
flags: 16
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: SINT
name: "Time"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
length: 7
flags: 16
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: BYTES
name: "State"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
collation: 33
length: 90
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$ColumnMetaData
type: BYTES
name: "Info"
original_name: ""
table: ""
original_table: ""
schema: ""
catalog: "def"
collation: 33
length: 300
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$Row
field: "\b"
field: "event_scheduler\000"
field: "localhost\000"
field: ""
field: "Daemon\000"
field: "\254\353)"
field: "Waiting on empty queue\000"
field: ""
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$Row
field: "2"
field: "root\000"
field: "172.17.0.1:36154\000"
field: ""
field: "Sleep\000"
field: "\330\235\003"
field: "\000"
field: ""
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$Row
field: "\314\001"
field: "root\000"
field: "172.17.0.1:39884\000"
field: "test\000"
field: "Sleep\000"
field: "\316\034"
field: ""
field: "PLUGIN\000"
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$Row
field: "\234\003"
field: "root\000"
field: "172.17.0.1:40094\000"
field: "test\000"
field: "Query\000"
field: "\000"
field: ""
field: "PLUGIN: SHOW PROCESSLIST\000"
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxResultset$FetchDone
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxNotice$Frame
type: 3
scope: LOCAL
payload: "\b\004\022\004\b\002\030\000"
[RECEIVE] <== com.mysql.cj.x.protobuf.MysqlxSql$StmtExecuteOk
😁 クエリ完了 SHOW PROCESSLIST
結果結果結果結果結果結果結果結果結果結果結果
Id User Host db Command Time State Info
4 event_scheduler localhost null Daemon 342742 Waiting on empty queue null
25 root 172.17.0.1:36154 null Sleep 26476 null
102 root 172.17.0.1:39884 test Sleep 1831 null PLUGIN
206 root 172.17.0.1:40094 test Query 0 null PLUGIN: SHOW PROCESSLIST
終了終了終了終了終了終了終了終了終了終了終了
現在の Java connector/mysqld の実装では、select sleep(15)
などのクエリが発行された場合、その後のクエリの結果が先に帰ってくることはない。これは実際問題、session が状態を持つ以上、そうならざるを得ない。このため、他の状態を持たないプロトコルのクライアントと同じ気分で使っているとハマるかも。
そして、transaction は session に紐づく が、 1session あたり 1 TCP connection 以上 という実装に現時点ではなっている(以上、というのは slave への自動送信などを x devapi 上で 1 セッションとして扱う可能性があるため)。
(通信を多重化することも可能だったと思うが、現在の実装はそうなっていない。なんでだろうか。MySQL Server の実装上の制約?)