Firehoseを使ったコードのテストをLocalStackを使って動かしてみる!

Firehoseを使ったコードのテストをLocalStackを使って動かしてみる!

LocalStackでKinesis Firehose検証用のコード書いています。齋藤です。

それでは今回はLocalStackでKinesis Firehoseを動かしてみたいと思います。 Firehoseの連携先はS3です。 ちなみに最後の方にも書いたのですが、LocalStackではFirehoseからRedshiftとElasticsearchへの連携は実装されていないようです。

今回書いたコードはgithub上に置いています

LocalStackでKinesis Firehoseの連携先S3バケットを作成する

S3のバケットを作成するためにはAmazonS3オブジェクトが必要になってきます。 dd以下のコードでLocalStackに向けたS3クライアントをセットアップします。

1
2
3
4
5
6
AWSCredentialsProvider dummyProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials("dummy", "dummy"));
AmazonS3 s3 = AmazonS3ClientBuilder.standard()
    .withCredentials(dummyProvider)
    .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4572", null))
    .withPathStyleAccessEnabled(true) // これがないとエラーになります。
    .build();

バケットの作成を以下のコードで行います。

1
s3.createBucket("test");

これでひとまず、LocalStackで使うS3のバケットのセットアップは終了です。

LocalStackでKinesis FirehoseのDelivery Streamを作成します。

LocalStackに向けたKinesis Firehoseのクライアントの作成は以下のコードで行います。

1
2
3
4
5
AWSCredentialsProvider dummyProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials("dummy", "dummy"));
AmazonKinesisFirehose firehose = AmazonKinesisFirehoseClientBuilder.standard()
  .withCredentials(dummyProvider)
  .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4573", null))
  .build();;

先ほど作成したFirehoseのクライアントを使って 以下のコードでDelivery Streamを作成します。

1
2
3
4
5
6
7
8
firehose.createDeliveryStream(
  new CreateDeliveryStreamRequest()
    .withDeliveryStreamName("testStream")
    .withS3DestinationConfiguration(
      new S3DestinationConfiguration()
        .withBucketARN("arn:aws:s3:::test") // 先ほど作ったS3のバケット名でS3のARNを設定します。
        .withPrefix("firehose/")
        .withRoleARN("arn:aws:iam::dummy:role/dummy"))) // LocalStackにはIAM関連のAPIはないみたいなのでdummy roleです。

RecordをPutしてS3に連携されていることをザッと確認してみます。

RecordをPutします。 このコードは都元ダイスケ氏の記事から持ってきました。

1
2
3
4
5
6
firehose.putRecord(
  new PutRecordRequest()
    .withDeliveryStreamName("testStream")
    .withRecord(
      new Record()
        .withData(ByteBuffer.wrap("test".getBytes(StandardCharsets.UTF_8)))));

S3からlistObjectを使ってKinesisによって連係されていることを確認します。 アサーションはassertjを使っています。

1
2
3
4
assertThat(s3.listObjects("test")
  .getObjectSummaries()).anySatisfy(summary -> {
    assertThat(summary.getKey()).startsWith("firehose/");
  });

S3のお片付け

S3ってobjectがあると削除できないんですね・・・・。初めて知りました。 先にObjectを削除してからBucketを削除します。

1
2
3
4
5
6
7
8
9
10
11
12
@After
public void tearDown() {
  firehose.deleteDeliveryStream(new DeleteDeliveryStreamRequest().withDeliveryStreamName("testStream"));
 
  s3.listObjects("test")
    .getObjectSummaries()
    .forEach(s -> {
      s3.deleteObject(s.getBucketName(), s.getKey());
    });
 
  s3.deleteBucket("test");
}

なお、CLIだとforce deleteできるようです。 ログを見た感じだと中で似たようなことをやっていそうですね。

aws --endpoint http://localhost:4572 s3 rb s3://test --force

delete: s3://test/firehose/28fe77a2-d0ef-4d39-9d6a-1a576709b922

delete: s3://test/firehose/2e4038af-7997-4735-8e2f-2e5f69b7fb88

delete: s3://test/firehose/3038ba8b-df66-46f9-9398-4d7e3ac7182f

delete: s3://test/firehose/8c1720be-e3dd-4faa-b068-d254434c00fe

delete: s3://test/xxxxx

remove_bucket: test

今回書いたコード

今回書いたコードはgithub上に置いています。 こちらにも貼り付けておきます。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class FirehoseTest {
  private AmazonKinesisFirehose firehose;
 
  private AWSCredentialsProvider dummyProvider;
 
  private AmazonS3 s3;
 
 
  @Before
  public void setup() {
    System.setProperty("com.amazonaws.sdk.disableCbor", "1"); // LocalStackが対応していないプロトコルを使わないようにする
    dummyProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials("dummy", "dummy"));;
    firehose = AmazonKinesisFirehoseClientBuilder.standard()
      .withCredentials(dummyProvider)
      .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4573", null))
      .build();;
 
    s3 = AmazonS3ClientBuilder.standard()
      .withCredentials(dummyProvider)
      .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4572", null))
      .withPathStyleAccessEnabled(true)
      .build();;
  }
 
  @Test
  public void test() {
    s3.createBucket("test");
 
    firehose.createDeliveryStream(
      new CreateDeliveryStreamRequest()
        .withDeliveryStreamName("testStream")
        .withS3DestinationConfiguration(
          new S3DestinationConfiguration()
            .withBucketARN("arn:aws:s3:::test")
            .withPrefix("firehose/")
            .withRoleARN("arn:aws:iam::dummy:role/dummy")));
 
    while (true) {
      try {
        firehose.describeDeliveryStream(new DescribeDeliveryStreamRequest().withDeliveryStreamName("testStream"));
        break;
      } catch (ResourceNotFoundException e) {
      }
    }
 
    firehose.putRecord(
      new PutRecordRequest()
        .withDeliveryStreamName("testStream")
        .withRecord(
          new Record()
            .withData(ByteBuffer.wrap("test".getBytes(StandardCharsets.UTF_8)))));
 
 
    assertThat(s3.listObjects("test")
      .getObjectSummaries()).anySatisfy(summary -> {
        assertThat(summary.getKey()).startsWith("firehose/");
      });
  }
 
  @After
  public void tearDown() {
    firehose.deleteDeliveryStream(new DeleteDeliveryStreamRequest().withDeliveryStreamName("testStream"));
 
    s3.listObjects("test")
      .getObjectSummaries()
      .forEach(s -> {
        s3.deleteObject(s.getBucketName(), s.getKey());
      });
 
    s3.deleteBucket("test");
  }
}

まとめ

いかがだったでしょうか。

今回はKinesis FirehoseをLocalStackを使って動かしてみました。 最近LocalStackを使って検証しているのですが、LocalStack非常に便利ですね!

~今度はElasticsearchをDestinationにした記事を書くつもりです。~

と思ったらLocalStackではFirehoseからRedshiftとElasticsearchへの連携は実装されていないようです。 どうしようテスト。 Firehoseに投げるテストをするだけならS3に連携しておけばいいかなって気もしますね。

辛い 辛くない

Issue報告したら直りました。(というかAPIがなかったみたい)

AWS CLIでLocalStack上のdelivery-streamを削除しようとしたらエラーが返ってきます。 今回自分はDockerでLocalStackを使っているので --rmオプションを使ってdocker runしたりやdocker rmしたりして 環境を真っさらにしています。。。