データ指向アプリケーションデザイン | 第11章 ストリーム処理

発表者: キム・ギョンチョル、キム・ミンギュ

ストリームとは、時間の経過とともに少しずつ生成されるデータである。ストリーム処理は、データを集めて後で処理するバッチ処理とは異なり、イベントが発生するたびに処理する。イベントストリームは、増分処理できる時系列のビジネスイベントである。

イベントストリームの転送

イベントには、特定の時点で起きた事柄の詳細が含まれ、通常は時計に基づくイベント発生タイムスタンプを持つ。イベントの例には、ページ閲覧や商品購入などのユーザー行動、センサー値、CPUメトリクス、Webサーバーログの各行がある。

イベントはテキスト、JSON、バイナリデータとして符号化できる。ファイル、リレーショナルテーブル、ドキュメントデータベースへ書き込むことも、別ノードで処理するためネットワーク経由で送ることもできる。

プロデューサ、パブリッシャ、送信者はイベントを作る。コンシューマ、サブスクライバ、受信者はイベントを処理する。ストリームシステムでは、関連するイベントをトピックまたはストリームにまとめる。

バッチ処理では、1日分のデータを1日の終わりに処理することがある。ストリーム処理では、新しいイベントを継続的に確認して処理し、最後に処理した位置からの進捗も追跡する。

メッセージングシステム

メッセージングシステムは、新しいイベントをコンシューマへ通知する。プロデューサはイベントを含むメッセージを送り、コンシューマはそれを受け取って処理する。

最も単純な方法は、UnixパイプやTCP接続のような直接通信チャネルである。メッセージングシステムはこれを一般化し、多数のプロデューサが同じトピックへメッセージを送り、多数のコンシューマがそこからメッセージを受け取れるようにする。

publish/subscribeシステムは、プロデューサがコンシューマの処理速度より速く送る場合の扱いを決めなければならない。

  • メッセージを捨てる。
  • キューにバッファする。
  • バックプレッシャ、つまりフロー制御を適用してプロデューサを遅くする。

ノードが故障したり一時的にオフラインになったりした場合の扱いも決める必要がある。耐久性には、メッセージをディスクへ書く、複製する、またはその両方が必要である。耐久性を避ければスループットとレイテンシは改善できるが、アプリケーションによってはメッセージ損失が許容できない。

直接メッセージング

一部のシステムは、中間ブローカーなしでプロデューサからコンシューマへ直接メッセージを送る。UDPマルチキャストはレイテンシに敏感な金融システムで使われ、アプリケーションレベルのプロトコルで失われたパケットを回復できる。ZeroMQやnanomsgは、TCPまたはIPマルチキャスト上でpublish/subscribeメッセージングを提供する。StatsDのようなメトリクスシステムはUDPメッセージングを使う。プロデューサがコンシューマのHTTPまたはRPCエンドポイントを直接呼ぶこともできる。

直接メッセージングの制約は、メッセージ損失やオフラインのコンシューマをアプリケーション側で扱わなければならない点である。コンシューマが利用できない間に送られたメッセージは失われる可能性がある。

メッセージブローカー

メッセージブローカーは、プロデューサからメッセージを受け取り、コンシューマへ配送する代替手段である。メッセージストリームに最適化された一種のデータベースと見なせる。プロデューサとコンシューマは、ブローカーへクライアントとして接続する。

ブローカーはクライアント状態の変化を吸収する。メッセージをディスクへ書けば、ブローカー障害後も失われにくい。コンシューマが遅い場合、ブローカーはメッセージをバッファできるが、キューが増えるとメモリ、ディスク使用量、性能の問題が出る。

データベースと比べると、ブローカーは通常、配送または確認応答後にメッセージを削除する。保存データに対する任意のクエリではなく、逐次的なメッセージ配送に最適化されている。

メッセージブローカーにはRabbitMQ、ActiveMQ、Qpid、HornetQ、TIBCO Enterprise Message Service、IBM MQ、Azure Service Bus、Google Cloud Pub/Subなどがある。JMSはメッセージの送信、受信、読み取りのためのJava APIである。AMQPはメッセージ指向ミドルウェア向けのオープン標準アプリケーション層プロトコルである。

複数コンシューマ

一般的なパターンは2つある。

  • ロードバランシング: 各メッセージを1つのコンシューマへ配送し、作業を複数ノードに分散する。
  • ファンアウト: 各メッセージをすべてのコンシューマへ配送し、独立した処理パイプラインを可能にする。

コンシューマグループは両方の考え方を組み合わせる。各グループはすべてのメッセージを受け取るが、グループ内では各メッセージを1つのノードだけが処理する。

確認応答と再配送

ブローカーがメッセージをコンシューマへ配送した後、コンシューマが処理前に失敗することがある。損失を防ぐため、コンシューマは処理成功を明示的に確認応答する。タイムアウトや切断までに確認応答を受け取れなければ、ブローカーはメッセージを再配送する。

再配送は順序を変える可能性がある。あるコンシューマがm3の処理中に失敗し、別のコンシューマがすでにm4を処理している場合、m3は後で再配送され、m4の後に処理されることがある。

パーティション化されたログ

従来のメッセージブローカーは、メッセージを一時的なものとして扱う。配送と確認応答の後、メッセージは削除され復旧できない。これに対し、データベースやファイルは誰かが削除するまでデータを保持する。

ログベースのブローカーは、メッセージをディスク上の追記専用列として保存する。プロデューサはログ末尾へメッセージを追記する。コンシューマはログを順に読み、末尾に到達したら新しいメッセージの通知を待つ。

スループットを上げるため、ログはパーティション化できる。各パーティションは別のマシンで提供できる。パーティション内では、ブローカーが各メッセージへ増加するオフセットを割り当てる。順序はパーティション内で保証され、パーティション間では保証されない。

Apache Kafka、Amazon Kinesis、Twitterの分散ログはこの方式を使う。非常に多くのメッセージを処理でき、メッセージを複製することで障害に耐えられる。

ログ方式と従来型メッセージングの比較

ログベースメッセージングは、コンシューマが独立してログを読み、読んでもメッセージが削除されないため、ファンアウトを支援する。ロードバランシングは通常、グループ内のコンシューマへパーティションを割り当てることで行う。

利点は高スループットと再生可能性である。欠点は、1つのグループ内のコンシューマ数がパーティション数に制限されること、遅いコンシューマが遅れ続ける可能性があることである。

コンシューマオフセット

コンシューマが1つのパーティションを順に処理するなら、進捗はオフセットで表せる。各メッセージの確認応答を追跡する必要はない。これによりオーバーヘッドが減り、バッチ処理とパイプライン化が可能になる。

オフセットはデータベースレプリケーションのログシーケンス番号に似ている。コンシューマが失敗した場合、グループ内の別のコンシューマが最後にコミットされたオフセットから再開できる。失敗したコンシューマがメッセージを処理したがオフセットをコミットしていなければ、そのメッセージは再処理される可能性がある。

ディスク領域と遅いコンシューマ

ログベースのブローカーは通常、一定期間またはサイズ上限までメッセージを保持する。コンシューマが大きく遅れて必要なメッセージが削除されると、別のデータ源なしには追いつけない。そのため運用ではコンシューマラグの監視が必要である。

古いメッセージの再生

AMQPやJMS型のブローカーでは、メッセージを確認応答すると通常は削除される。ログベースのブローカーでは、メッセージを消費してもログは変わらない。コンシューマはオフセットを戻して古いメッセージを再生できる。これはデバッグ、派生データの再構築、新しいストリームプロセッサの導入に役立つ。

データベースとストリーム

データベースとストリームは、データベース変更ログがイベントのストリームであるという点でつながっている。レプリケーションログ、write-ahead log、change data captureはいずれも変更を順序付きレコードとして公開する。

システムを同期させる

多くのアプリケーションは、データベース、キャッシュ、検索インデックス、分析システム、データウェアハウスなど複数のシステムにデータを持つ。アプリケーションが2つのシステムへ別々に書き込むdual writeは、途中で失敗して不整合を残すことがある。

Change data capture(CDC)は、データベースから変更ストリームを派生させ、それを他システムへ適用することでこの問題を解決する。データベースが真実の源となり、コンシューマが変更ストリームから派生システムを更新する。

イベントソーシング

イベントソーシングは、すべての変更を不変イベントとして保存する。現在状態だけを保存するのではなく、その状態に至ったイベント列を記録する。現在状態はイベントを再生することで再構築できる。

イベントソーシングは監査可能性や派生ビューと相性がよいが、古いイベントを理解できる状態に保つ必要があるため、スキーマ進化とイベント設計には注意が必要である。

状態、ストリーム、不変性

可変状態は、不変イベントのストリームを適用した結果と見なせる。イベントログを保持すれば、状態の再構築、新しい派生ビューの作成、バグからの復旧が可能になる。完全な履歴が不要になった場合は、コンパクションで各キーの最新値だけを残せる。

ストリーム処理

ストリーム処理は、アラート送信、ダッシュボード更新、検索インデックス構築、キャッシュ更新、不正検知、イベント拡張、分析計算など多くの用途に使える。

ストリームプロセッサは入力ストリームを消費し、出力ストリームを生成するかデータベースを更新する。時間、結合、障害、exactly-onceな効果をどう扱うかを決めなければならない。

ストリーム処理の用途

複合イベント処理はイベントストリーム内のパターンを探す。ストリーム分析はレート、パーセンタイル、移動集計などのメトリクスを計算する。マテリアライズドビューの維持は、ソースデータの変化に合わせて派生データを最新に保つ。

時間の扱い

イベントは遅れて到着したり順不同で到着したりする。イベント時刻は実際にイベントが起きた時刻であり、処理時刻はプロセッサがそのイベントを見る時刻である。ウィンドウ処理はイベントを時間でまとめるが、遅延イベントによってウィンドウを閉じる判断が難しくなる。システムはウォーターマークや許容遅延を使い、正確性とレイテンシのバランスを取る。

Window

ストリーム結合

ストリーム処理では、ストリームとテーブル、2つのストリーム、またはストリームと変化するテーブルを結合することがある。

Stream-table join

ストリーム同士の結合では、後から到着する対応イベントを結合できるように、両側の最近のイベントを保持する必要がある。

Stream-stream join

耐障害性

バッチ処理は入力ファイルが不変なので、失敗したタスクを再試行しやすい。ストリーム処理は継続的に動作し、副作用を持つことがあるため難しい。

at-least-once処理ではイベントが複数回処理されることがある。at-most-once処理ではイベントが失われることがある。exactly-once処理は通常、再生可能なログ、チェックポイント、決定的処理、冪等な書き込み、入力オフセットと出力書き込みの間のトランザクションによって実装される。

まとめ

ストリーム処理は、継続的に発生するデータを第一級の入力として扱う。メッセージングシステムはイベントを配送し、ログベースのブローカーはそれを保持して再生可能にし、データベース変更ストリームは派生システムを同期させる。主な設計課題は、配送保証、順序、保持、時間の扱い、結合、状態管理、耐障害性である。