LocalStackを使ってCIでKinesis Client Libraryを使ったコードのテストを書いてみる
こんにちは。最近体の衰えをひどく感じるようになってきました。齋藤です。 最近、温度変化が激しくて体調が悪くなる人が多いですが お元気でしょうか。体調管理に気をつけたいところです。
初めに
今回、業務の中でKinesisを触る機会があるので 少し調査をしようと思い、CIでテストできる環境を作りました。
構成
- Circle CI 2.0
- LocalStack
- AWS SDK for Java
まずはざっとCicleCIの設定を
Circle CIの設定はこちらです。 設定を間違えると動かない部分があるのでご注意ください。 LocalStackのimageはatlassianlabsからlocalstackに移っております。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | version: 2 jobs: build: docker: - image : circleci/openjdk : 8-jdk - image : localstack/localstack # atlassianlabs/localstackと間違えると動きません。 working_directory: ~ /repo environment: JVM_OPTS: -Xmx3200m TERM: dumb steps: - checkout - restore_cache : keys: - v1-dependencies- { { checksum "build.gradle" } } - { { checksum "gradle.properties" } } - v1-dependencies- - run : ./gradlew dependencies - save_cache : paths: - ~ /.m2 key: v1-dependencies- { { checksum "build.gradle" } } - { { checksum "gradle.properties" } } - run : ./gradlew test - store_test_results : path: build/test-results/ |
まずはKinesisで使う基本的なメソッドから
AmazonKinesisクラスのセットアップについては後ほど説明します。 この辺は普通にKinesisのAPIです
Streamの作成
createStreamメソッドでStreamの作成をします。
1 2 | AmazonKinesis kinesis = ...; kinesis.createStream( "testStream" , /* シャード数 */ 5 ); |
Streamの情報取得
Streamの情報を取得します。 今回このメソッドはStreamが出来たかどうかの確認で使っています。
1 2 | AmazonKinesis kinesis = ...; kinesis.describeStream( "testStream" ); |
Streamの削除
Streamの削除をします。
1 2 | AmazonKinesis kinesis = ...; kinesis.deleteStream( "testStream" ); |
Streamにデータを登録
1 2 3 4 5 6 7 8 | AmazonKinesis kinesis = ...; PutRecordRequest recordRequest = new PutRecordRequest(); recordRequest.setStreamName( "testStream" ); recordRequest.setData(ByteBuffer.wrap( "some_data" .getBytes(StandardCharsets.UTF_8))); recordRequest.setPartitionKey( "some_partition_key" ); kinesis.putRecord(recordRequest); |
基本的なテストを書いてみる
一連の流れをテストコードにして見ます。 LocalStackではあらかじめ決められたポートでそれぞれのサービスが立ち上がります。 本ブログの以下の記事やgithubのページなどを見てご確認ください。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 | // guavaのThreadFactoryBuilderを使っています ThreadFactory factory = new ThreadFactoryBuilder().setDaemon( true ) .build(); ExecutorService executorService = Executors.newFixedThreadPool( 10 , factory); @Test public void test() throws Exception { // LocalStackはバイナリのフォーマットをサポートしていないのでdisableしておきます。 System.setProperty( "com.amazonaws.sdk.disableCbor" , "1" ); // LocalStack向けのSetupです AWSStaticCredentialsProvider dummyProvider = new AWSStaticCredentialsProvider( new BasicAWSCredentials( "dummy" , "dummy" )); // LocalStack向けのSetupです KinesisClientLibConfiguration config = new KinesisClientLibConfiguration( "testApp" , "testStream" , dummyProvider, "testWorker" ) .withInitialPositionInStream(InitialPositionInStream.LATEST) // LocalStack向けのSetupです AmazonKinesis kinesis = AmazonKinesisClientBuilder.standard() .withCredentials(dummyProvider) .withEndpointConfiguration( new AwsClientBuilder.EndpointConfiguration( "http://localhost:4568" , null )) .build(); kinesis.createStream( "testStream" , 5 ); // streamが作成されるまで待つ while ( true ) { try { kinesis.describeStream( "testStream" ); break ; } catch (ResourceNotFoundException e) { } } String key = RandomStringUtils.randomAlphanumeric( 10 ); String data = "test-data-" + key; PutRecordRequest recordRequest = new PutRecordRequest(); recordRequest.setStreamName( "testStream" ); recordRequest.setData(ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8))); recordRequest.setPartitionKey(key); kinesis.putRecord(recordRequest); kinesis.deleteStream( "testStream" ); } |
ちなみにこのテスト、たまに落ちるはずです。 これについては後ほど直していきます。
KCLでStreamからレコードを取り出して処理する
KCL(Kinesis Client Library)でStreamからレコードを順次取り出して処理していくコード書いて見ます
IRecordProcessorFactoryというインターフェースの実装とFactoryが返却するIRecordProcessorの実装を行います。
とりあえずざっとラムダ式と匿名クラスで書きました。
ログに出力するだけのProcessorです。ProcessorがレコードをConsumeします。また、ProcessorはStreamの
processBarrier
とbarrier
については後ほど説明します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | IRecordProcessorFactory factory = () -> { return new IRecordProcessor() { @Override public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { log.info( "test: shutdown" ); } @Override public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) { log.info( "test: processRecords" ); records.stream() .forEach(r -> log.info(r.toString())); try { processBarrier.await(); } catch (Exception e) { throw new AssertionError(e); } } @Override public void initialize(String shardId) { log.info( "test: initialize {}" , shardId); try { barrier.await(); } catch (Exception e) { throw new AssertionError(e); } } }; }; |
シナリオ的なテストにしてみる
先ほどのテストにProcessorのコードを追加して動作を確認して見ます。 また、CyclicBarrierを使ってタイミングによってテストが失敗しないようにして見ます。 CyclicBarrierのコードの参考は以下の記事です。 Spring AOP+CyclicBarrierを活用してSpring Bootアプリ上での楽観ロックのテスト条件を確実に整える
以下のコードが全ての処理をざっとまとめたコードです。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 | ThreadFactory factory = new ThreadFactoryBuilder().setDaemon( true ) .build(); ExecutorService executorService = Executors.newFixedThreadPool( 10 , factory); @Test public void test() throws Exception { // LocalStackはバイナリのフォーマットをサポートしていないのでdisableしておきます。 System.setProperty( "com.amazonaws.sdk.disableCbor" , "1" ); CyclicBarrier barrier = new CyclicBarrier( 6 ); CyclicBarrier processBarrier = new CyclicBarrier( 2 ); IRecordProcessorFactory factory = () -> { return new IRecordProcessor() { @Override public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { log.info( "test: shutdown" ); } @Override public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) { log.info( "test: processRecords" ); records.stream() .forEach(r -> log.info(r.toString())); try { processBarrier.await(); } catch (Exception e) { throw new AssertionError(e); } } @Override public void initialize(String shardId) { log.info( "test: initialize {}" , shardId); try { barrier.await(); } catch (Exception e) { throw new AssertionError(e); } } }; }; // LocalStack向けのSetupです AWSStaticCredentialsProvider dummyProvider = new AWSStaticCredentialsProvider( new BasicAWSCredentials( "dummy" , "dummy" )); // LocalStack向けのSetupです KinesisClientLibConfiguration config = new KinesisClientLibConfiguration( "testApp" , "testStream" , dummyProvider, "testWorker" ) .withInitialPositionInStream(InitialPositionInStream.LATEST) // LocalStack向けのSetupです AmazonKinesis kinesis = AmazonKinesisClientBuilder.standard() .withCredentials(dummyProvider) .withEndpointConfiguration( new AwsClientBuilder.EndpointConfiguration( "http://localhost:4568" , null )) .build(); kinesis.createStream( "testStream" , 5 ); // streamが作成されるまで待つ while ( true ) { try { kinesis.describeStream( "testStream" ); break ; } catch (ResourceNotFoundException e) { } } String key = RandomStringUtils.randomAlphanumeric( 10 ); String data = "test-data-" + key; // Processorの準備 Worker worker = new Worker.Builder() .config(config) .recordProcessorFactory(factory) .metricsFactory( new NullMetricsFactory()) .execService(executorService) .build(); executorService.submit(worker); // Processor全てが初期化されるのを待ちます。Processorはシャードの数だけ作成されます。 barrier.await(); PutRecordRequest recordRequest = new PutRecordRequest(); recordRequest.setStreamName( "testStream" ); recordRequest.setData(ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8))); recordRequest.setPartitionKey(key); kinesis.putRecord(recordRequest); // ProcessorがRecordを処理するのを待ちます。 processBarrier.await(); kinesis.deleteStream( "testStream" ); // Processorが処理をしているスレッド群を終了 executorService.awaitTermination( 10 , TimeUnit.SECONDS); } |
Streamの作成や削除などのsetup, teardownで本来書くような内容も同じ場所に書きましたが
この辺は@Rule
や@Before
などのjunitで拡張したりすれば
もう少し綺麗になるかと思われます。
まとめ
いかがだったでしょうか? 今回初めてKCLを使ってKinesisからデータを取得するような処理を書いて見ましたが LocalStackを使うことで非常に簡単にテストをすることができました。 テストコードは少し汚いですが。
このコードは業務で使われないかもしれない。
この記事が日の目を見ることがあると良いですね。
今回このテストを書いたリポジトリは以下のリポジトリです。 kinesis-sandbox
今回はLocalStackを使ったのですが 元々はkinesaliteを使おうかなと思っていました。 今度はkinesaliteを使ってみようかなと思います。