追記型ストレージFasterLogについて

始めに

Microsoftは、ローカルで使えるストレージライブラリのFASTERというものを開発している。
このFASTER、最初はキーバリューストアであるFasterKVのみ提供していた。しかし、バージョン2019.10.31.1より、FasterLogという、データの追記に特化した機能が追加された。
少し触ってみると、今自分が実現したい機能を丁度良くカバーしてそうなので、使い方や注意点を書いておく。

FasterKVは論文も発表されているので、特徴や理論的な裏付け等が知りたければそちらも参照のこと。

今回紹介するFasterLogは、FasterKVで使用しているストレージ機能のベース部分を利用した、追記と範囲検索に特化したものとなる。

なお、FasterLogは今もなおインターフェイスの更新が行われており、この記事で書かれていることと微妙に差異があるかもしれないので注意が必要。
この記事では2019.11.18.1をベースに解説を行う。

特徴

  • データはディスクに保存される
  • 扱えるデータ型はバイト配列のみ
  • 以下の操作が可能
    • 追記
    • データの範囲検索
    • 過去のある時点までのレコード削除
  • 特定レコードの削除は不可
  • 途中にデータを挿入することは不可

使い方

プロジェクトへの導入

普通にnugetパッケージとして公開されているので、PackageReference等で追加すればOK。

ストレージデバイスインスタンスを作成する

まず、以下のコードでログを格納するディスク領域を作成する。

// using FASTER.core;
// 独自ストレージを使用する場合、FASTER.core.IDeviceを実装したインスタンスを代わりに生成する
// ファイルパスのみ必須。
IDevice logDevice = Devices.CreateLogDevice("[格納するファイルのパス]");

なお、deleteOnCloseというオプション引数が使用可能だが、これを使用すると、後でデータを開こうとした時に挙動がおかしくなるので設定しないこと。
ここで生成したIDeviceは、必ずプログラム終了時にlogDevice.Close()すること。
IDisposeは実装してないので、try-finallyで囲むと良い。

FasterLogインスタンスを生成する

ストレージデバイスインスタンスを生成したら、以下のようにして、FasterLogインスタンスを生成する。

// using FASTER.core;
// LogDeviceのみ必須
var fls = new FasterLogSettings()
{
    LogDevice = logDevice
};
using(var fl = new FasterLog(fls))
{
    // 処理
    // flインスタンスはプロセス内で使い回すこと
}

FasterLogSettingsのその他項目について

FasterLogSettingsのその他項目は以下のようになる。

  • PageSizeBits
    • 格納データページの基本単位
    • 各レコードは、メタデータと実データが必ず同じページに収まるように格納される
      • つまり、一レコード当たりの最大サイズがこの値に依存して決定される
      • ファイルの空間効率にも影響するので、サイズ設定は慎重に
    • 単位は何ビット使用するかなので、例えば8を指定したら2^8 = 256(bytes)となる
    • 初期値は22(2^22 ≒ 4MB)
  • MemorySizeBits
    • オンメモリに乗せる最大データ容量
    • 単位はPageSizeBitsと同じ
    • 初期値は23(2^23 ≒ 8MB)
    • 最低でもPageSizeBits+1必要で、下回るとFasterLogインスタンス生成時にエラーが出る
  • SegmentSizeBits
    • オンメモリに乗らないデータを格納するファイルのサイズ
    • 初期値は30(=1GB)
  • LogCommitManager
    • トランザクションを管理する
    • リカバリ等も担当
  • LogCommitFile
    • デフォルトのLogCommitManagerを使う場合に使用するトランザクションファイルのベースパス
  • GetMemory
    • データ読出しの時に呼ばれるコールバック関数
  • LogChecksum

生成されるファイル

FasterLogをnewした時点で、以下のファイルが生成される

  • ログセグメントデータ: [CreateLogDeviceで指定したファイル].[0始まり数字]
    • データ本体が格納される
    • 初期状態の場合、2^[SegmentSizeBits]bytesのサイズをアロケートしようとするので注意(デフォルト1GB)
    • ログデータが2^[SegmentSizeBits]を超えるたびに、新しくセグメントファイルを作成する
  • トランザクションファイル: [CreateLogDeviceで指定したファイル].commit
    • 論理的なアドレスの開始点、終点が書き込まれる
    • サイズは常に36bytes
    • FasterLogSettings.LogCommitFileで生成パスを変更可能

データを追加する

データの追加は以下のように行う

// FasterLog fl;
// byte[] data;
// ReadOnlySpan<byte>でも可
// Enqueueの時点ではまだ永続化はされない
long recordAddress = fl.Enqueue(data);
// コミットデータの永続化
fl.Commit(true);

Commitした時点でディスクに書き出される。
戻り値として、追加したデータの論理アドレスが取得できる。

データを読み出す

データの読み出しは、C#8.0とそれより前でやり方が異なる。

共通事項

下記のように、FasterLog.Scan([開始アドレス], [終端アドレス])を使用する。

// FasterLog fl;
// fl.Scan([論理開始アドレス], [論理終端アドレス])という風にして指定する
using(FastLogScanIterator iter = fl.Scan(fl.CommittedBeginAddress, fl.CommittedUntilAddress))
{
    // enumeratorで走査
}

ここでいう開始アドレスと終端は、全てのログを見るならFasterLog.CommittedBeginAddressFasterLog.CommittedUntilAddressをそれぞれ指定すると良い。
全てのログを見たくない場合は、終端アドレスに、[開始アドレス] + [適当なバイト数]を指定する。

走査中に、現在見ているログの論理アドレスを知りたい場合は、FastLogScanIterator.CurrentAddress、次のエントリのアドレスを知る場合は、FastLogScanIterator.NextAddressを使用する。

非同期(C#8.0)

IAsyncEnumerableを使用する。

// FasterLog fl;
// fl.Scan([論理開始アドレス], [論理終端アドレス])という風にして指定する
using(FastLogScanIterator iter = fl.Scan(fl.CommittedBeginAddress, fl.CommittedUntilAddress))
{
    // dataの型はbyte[]
    // lenはデータ長(bytes)
    // 2019.11.18では二つだけだが、最新ソースでは更に long currentAddress も追加になる模様
    // https://github.com/microsoft/FASTER/commit/bf657635374873958d96b31db1299b58ef9a17b1
    await foreach(var (data, len) in iter.GetAsyncEnumerable())
    {
        // データの参照
    }
}

FastLogScanIterator.GetAsyncEnumerable(CancellationToken ct = default)では、foreachの要素に単純にnewされたbyte[]を受け取るが、代わりにSystem.Buffers.MemoryPool<byte>のインスタンスを渡すと、メモリプール経由でバッファの確保を行うため、アロケーションが減らせる。
ただし、IMemoryOwner<byte>自体のアロケーションは避けられないため、ゼロではない。
また、使い終わったIMemoryOwner<byte>はDisposeを行わないと、メモリリークの原因になる。

非同期(C#7.x以前)

FastLogScanIterator.[GetNext,NextAddress,WaitAsync]等を駆使する

// FasterLog fl;
using(FastLogScanIterator iter = fl.Scan(fl.CommittedBeginAddress, fl.CommittedUntilAddress))
{
    while(iter.NextAddress >= [終端アドレス])
    {
        // データを取り出すまでループと待機を行う
        // 引数は、データ本体、データ長、現在のアドレスの三つ
        while(!iter.GetNext(out var entry, out var length, out var currentAddress))
        {
            if(iter.NextAddress >= [終端アドレス])
            {
                break;
            }
            await iter.WaitAsync();
        }
        // データの参照
    }
}

データの削除

データの削除には、FasterLog.TruncateUntil(long untilAddress)またはFasterLog.TruncateUntilPageStart(long untilAddress)を使用する。

TruncateUntil

TruncateUntilデータの開始アドレス(BeginAddress)から、指定されたアドレス直前までのデータを削除するという挙動である。
注意点として、レコード境界ではない中途半端なアドレスを指定すると、次回のScan時にエラーが出るという仕様がある。

回避するには、末尾アドレス(FasterLog.TailAddress)を指定するか、下記のようにScanの途中で得たNextAddressで、正確なレコード境界を取得して指定するというやり方がある

// FasterLog fl;
long untilAddress = 0;
using(var iter = fl.Scan(fl.BeginAddress, [終端]))
{
    await foreach(var x in iter.GetAsyncEnumerable())
    {
        // 処理
        untilAddress = iter.NextAddress;
    }
}
fl.TruncateUntil(untilAddress);
// 最後にCommitすると変更が反映される
fl.Commit(true);

より安全に、かつ大雑把に消したい場合は、後述のTruncateUntilPageStartを使用する

TruncateUntilPageStart

TruncateUntilPageStartデータの開始アドレスから、指定されたアドレスに紐づくページの直前までを消去するという挙動である。
レコードはページをまたぐことは仕様上ないため、TruncateUntilで起こったような問題は起きない。
ただし、正確な消去はできないため、大雑把にログローテーション等をしたい場合に使うと良いだろう

パフォーマンス上の注意点

追記、削除する時は、操作後にコミットを行い永続化する必要があるが、コミットはFasterLogの中で最もコストのかかる処理だという事を念頭に置いた方が良い。しかし、コミットしないとデータが永続化されないので、信頼性が下がる。悩ましいところである。
つまり、より高速にデータを処理したい場合は、信頼性を下げずにいかにコミット回数を減らすかということを考える。

ではどうすればいいか。以下のようなやり方を一例として示そうと思う。

Commitタスクを独立させる

箇条書きにすると以下のような動作になる

  • 追記タスクを並列化する
  • 追記タスクは、自分の書き込みが完了するまでWaitForCommitAsyncで待機する
  • 追加で、一つだけひたすらコミットだけするタスクを作成する
  • 追記タスクは、コミットタスクに自分が追記したレコードのアドレスを渡す
  • コミットタスクは、現在のコミット済み終端アドレスと受け取ったレコードアドレスを比較して、未コミットと判断したら、コミットを行う

タスク間のデータ受け渡しは、System.Threading.Channelsが使えると思う。

追加されたデータが永続化されるまで待機する

さて、このやり方では、追記とコミットが別タスクで行われる状態になる。しかし、要件によっては、データ消失を可能な限り避けるため、自分で追加したデータが、確実に永続化されたかどうか確認する必要が出てくる。
そこで、FasterLog.WaitForCommitAsync(long address, CancellationToken ct)を使用する。
引数にはFasterLog.Enqueueで得たアドレスを指定する。
これを使うと、指定されたアドレスがコミットされたと判断されるまで(address <= CommittedUntilAddressになるまで)待機が発生する。

コード例

具体的には、少々長くなるが以下のようなコードになる。

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Channels;
using FASTER.core;
using System.IO;
using System.Linq;

namespace fasterlabs
{
    static class FasterLogCommitTest
    {
        static long EnqueueData(long value, FasterLog fl)
        {
            Span<long> data = stackalloc long[1];

            if (fl.TryEnqueue(System.Runtime.InteropServices.MemoryMarshal.AsBytes(data), out var logicalAddress))
            {
                return logicalAddress;
            }
            else
            {
                return -1;
            }
        }
        public static async ValueTask DoTest(int TaskNum)
        {
            const long TotalCount = 100000;
            // ensure using cleared data 
            if (File.Exists("logcommittest.log.0"))
            {
                File.Delete("logcommittest.log.0");
            }
            if (File.Exists("logcommittest.log.commit"))
            {
                File.Delete("logcommittest.log.commit");
            }
            var log = Devices.CreateLogDevice("logcommittest.log");
            var channel = Channel.CreateUnbounded<long>();
            using (var fl = new FasterLog(new FasterLogSettings() { LogDevice = log }))
            {
                var sw = new System.Diagnostics.Stopwatch();
                sw.Start();
                using (var csrc = new CancellationTokenSource(1000 * 240))
                {
                    await Task.WhenAll(
                        Task.WhenAll(Enumerable.Range(0, TaskNum).Select(async idx =>
                        {
                            long logicalAddress = 0;
                            try
                            {
                                for (int i = 0; i < TotalCount / TaskNum; i++)
                                {
                                    logicalAddress = EnqueueData(i + idx * TotalCount, fl);
                                    await channel.Writer.WriteAsync(logicalAddress, csrc.Token).ConfigureAwait(false);
                                    await fl.WaitForCommitAsync(logicalAddress, csrc.Token).ConfigureAwait(false);
                                    // Console.WriteLine($"{idx}, {i}, {logicalAddress}");
                                }
                            }
                            catch (Exception e)
                            {
                                Console.WriteLine($"producer error({idx}, {logicalAddress}, {fl.CommittedUntilAddress}, {fl.TailAddress}): {e}");
                            }
                            // Console.WriteLine($"exit producer({idx}, {sw.Elapsed})");
                        })).ContinueWith(t => channel.Writer.Complete()),
                        Task.Run(async () =>
                        {
                            int commitCount = 0;
                            try
                            {
                                while (true)
                                {
                                    if (!await channel.Reader.WaitToReadAsync(csrc.Token).ConfigureAwait(false))
                                    {
                                        break;
                                    }
                                    while (channel.Reader.TryRead(out var untiladdr))
                                    {
                                        if (fl.CommittedUntilAddress <= untiladdr && fl.CommittedUntilAddress != fl.TailAddress)
                                        {
                                            fl.Commit(true);
                                            // await fl.CommitAsync(csrc.Token).ConfigureAwait(false);
                                            commitCount++;
                                        }
                                    }
                                }
                            }
                            catch (Exception e)
                            {
                                Console.WriteLine($"consumer error:{e}");
                            }
                            Console.WriteLine($"exit consumer({commitCount})");
                        }).ContinueWith(t =>
                        {
                            if(fl.CommittedUntilAddress != fl.TailAddress)
                            {
                                Console.WriteLine($"last commit");
                                fl.Commit(true);
                            }
                        })
                    ).ConfigureAwait(false);
                    sw.Stop();
                    Console.WriteLine($"Multi({TotalCount}, {TaskNum}): {sw.Elapsed}, iops = {(TotalCount * 1000) / sw.Elapsed.TotalMilliseconds}");
                }
            }
            log.Close();
        }
    }
}

バージョン2019.11.18.1より前のバージョンでは、CommitAsyncするとWaitForCommitAsyncとスレッドプールの消費が競合して、デッドロック状態になる場合があるので注意すること。(該当github issue)

終りに

今回はログ向けストレージ機能を持つFasterLogを紹介した。実際制約もあるので、あらゆる場面で使用できるわけではないが、それでも他には無い特徴を持っているため、役に立つ場面では役に立つと思われる。
機会があれば、コミット回数や並列数を変えて性能テスト等を行ってみたい。

また、近く大幅なPRが来る予定なので、その辺りが来たらまたこの記事を更新したい。

参考リンク

Why do not you register as a user and use Qiita more conveniently?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
Sign up for free and join this conversation.
If you already have a Qiita account