Firehoseを使ったコードのテストをLocalStackを使って動かしてみる!
LocalStackでKinesis Firehose検証用のコード書いています。齋藤です。
それでは今回はLocalStackでKinesis Firehoseを動かしてみたいと思います。 Firehoseの連携先はS3です。 ちなみに最後の方にも書いたのですが、LocalStackではFirehoseからRedshiftとElasticsearchへの連携は実装されていないようです。
今回書いたコードはgithub上に置いています。
LocalStackでKinesis Firehoseの連携先S3バケットを作成する
S3のバケットを作成するためにはAmazonS3オブジェクトが必要になってきます。 dd以下のコードでLocalStackに向けたS3クライアントをセットアップします。
1 2 3 4 5 6 | AWSCredentialsProvider dummyProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials("dummy", "dummy"));AmazonS3 s3 = AmazonS3ClientBuilder.standard() .withCredentials(dummyProvider) .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4572", null)) .withPathStyleAccessEnabled(true) // これがないとエラーになります。 .build(); |
バケットの作成を以下のコードで行います。
1 | s3.createBucket("test"); |
これでひとまず、LocalStackで使うS3のバケットのセットアップは終了です。
LocalStackでKinesis FirehoseのDelivery Streamを作成します。
LocalStackに向けたKinesis Firehoseのクライアントの作成は以下のコードで行います。
1 2 3 4 5 | AWSCredentialsProvider dummyProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials("dummy", "dummy"));AmazonKinesisFirehose firehose = AmazonKinesisFirehoseClientBuilder.standard() .withCredentials(dummyProvider) .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4573", null)) .build();; |
先ほど作成したFirehoseのクライアントを使って 以下のコードでDelivery Streamを作成します。
1 2 3 4 5 6 7 8 | firehose.createDeliveryStream( new CreateDeliveryStreamRequest() .withDeliveryStreamName("testStream") .withS3DestinationConfiguration( new S3DestinationConfiguration() .withBucketARN("arn:aws:s3:::test") // 先ほど作ったS3のバケット名でS3のARNを設定します。 .withPrefix("firehose/") .withRoleARN("arn:aws:iam::dummy:role/dummy"))) // LocalStackにはIAM関連のAPIはないみたいなのでdummy roleです。 |
RecordをPutしてS3に連携されていることをザッと確認してみます。
RecordをPutします。 このコードは都元ダイスケ氏の記事から持ってきました。
1 2 3 4 5 6 | firehose.putRecord( new PutRecordRequest() .withDeliveryStreamName("testStream") .withRecord( new Record() .withData(ByteBuffer.wrap("test".getBytes(StandardCharsets.UTF_8))))); |
S3からlistObjectを使ってKinesisによって連係されていることを確認します。 アサーションはassertjを使っています。
1 2 3 4 | assertThat(s3.listObjects("test") .getObjectSummaries()).anySatisfy(summary -> { assertThat(summary.getKey()).startsWith("firehose/"); }); |
S3のお片付け
S3ってobjectがあると削除できないんですね・・・・。初めて知りました。 先にObjectを削除してからBucketを削除します。
1 2 3 4 5 6 7 8 9 10 11 12 | @Afterpublic void tearDown() { firehose.deleteDeliveryStream(new DeleteDeliveryStreamRequest().withDeliveryStreamName("testStream")); s3.listObjects("test") .getObjectSummaries() .forEach(s -> { s3.deleteObject(s.getBucketName(), s.getKey()); }); s3.deleteBucket("test");} |
なお、CLIだとforce deleteできるようです。 ログを見た感じだと中で似たようなことをやっていそうですね。
aws --endpoint http://localhost:4572 s3 rb s3://test --force
delete: s3://test/firehose/28fe77a2-d0ef-4d39-9d6a-1a576709b922
delete: s3://test/firehose/2e4038af-7997-4735-8e2f-2e5f69b7fb88
delete: s3://test/firehose/3038ba8b-df66-46f9-9398-4d7e3ac7182f
delete: s3://test/firehose/8c1720be-e3dd-4faa-b068-d254434c00fe
delete: s3://test/xxxxx
remove_bucket: test
今回書いたコード
今回書いたコードはgithub上に置いています。 こちらにも貼り付けておきます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 | public class FirehoseTest { private AmazonKinesisFirehose firehose; private AWSCredentialsProvider dummyProvider; private AmazonS3 s3; @Before public void setup() { System.setProperty("com.amazonaws.sdk.disableCbor", "1"); // LocalStackが対応していないプロトコルを使わないようにする dummyProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials("dummy", "dummy"));; firehose = AmazonKinesisFirehoseClientBuilder.standard() .withCredentials(dummyProvider) .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4573", null)) .build();; s3 = AmazonS3ClientBuilder.standard() .withCredentials(dummyProvider) .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4572", null)) .withPathStyleAccessEnabled(true) .build();; } @Test public void test() { s3.createBucket("test"); firehose.createDeliveryStream( new CreateDeliveryStreamRequest() .withDeliveryStreamName("testStream") .withS3DestinationConfiguration( new S3DestinationConfiguration() .withBucketARN("arn:aws:s3:::test") .withPrefix("firehose/") .withRoleARN("arn:aws:iam::dummy:role/dummy"))); while (true) { try { firehose.describeDeliveryStream(new DescribeDeliveryStreamRequest().withDeliveryStreamName("testStream")); break; } catch (ResourceNotFoundException e) { } } firehose.putRecord( new PutRecordRequest() .withDeliveryStreamName("testStream") .withRecord( new Record() .withData(ByteBuffer.wrap("test".getBytes(StandardCharsets.UTF_8))))); assertThat(s3.listObjects("test") .getObjectSummaries()).anySatisfy(summary -> { assertThat(summary.getKey()).startsWith("firehose/"); }); } @After public void tearDown() { firehose.deleteDeliveryStream(new DeleteDeliveryStreamRequest().withDeliveryStreamName("testStream")); s3.listObjects("test") .getObjectSummaries() .forEach(s -> { s3.deleteObject(s.getBucketName(), s.getKey()); }); s3.deleteBucket("test"); }} |
まとめ
いかがだったでしょうか。
今回はKinesis FirehoseをLocalStackを使って動かしてみました。 最近LocalStackを使って検証しているのですが、LocalStack非常に便利ですね!
~今度はElasticsearchをDestinationにした記事を書くつもりです。~
と思ったらLocalStackではFirehoseからRedshiftとElasticsearchへの連携は実装されていないようです。 どうしようテスト。 Firehoseに投げるテストをするだけならS3に連携しておけばいいかなって気もしますね。
辛い 辛くない
Issue報告したら直りました。(というかAPIがなかったみたい)
AWS CLIでLocalStack上のdelivery-streamを削除しようとしたらエラーが返ってきます。
今回自分はDockerでLocalStackを使っているので
--rmオプションを使ってdocker runしたりやdocker rmしたりして環境を真っさらにしています。。。