CyberAgent, Inc. AdTech Studio / データエンジニア
データ処理基盤
# データ処理基盤の開発 Spark Streamingを用いてKafkaからデータを読み出し、 1) スキーマによるデータのバリデーション 2)重複レコードの排除 3) event timeによるパーティショニング 4) Parquet形式への変換 を行い、HDFSに保存するデータパイプラインの開発を行った.. 自身が担当したパートは、次の通りです. ## スキーマの互換性に基づくデータ管理 データのスキーマにはApache Avroのものを採用していました. Avroでは各フィールドにnullabilityやデフォルト値を設定できるため、有効なレコードを柔軟に規定することができます.この情報に基づきデータをバリデーションすることで、無効なレコードをフィルタリングし、有効なデータのみ正規の場所に保存されるようにしました. またAvroはスキーマとデータが完全に別になっているという点で、大量にデータを収集するというコンテキストではJSONやProtocol Buffersと比べても効率の良いフォーマットです.ただし、その恩恵を十分受けるには、スキーマ自体を転送する回数を減らす必要があります. (JSONのように全てのレコードにスキーマを載せると、サイズが無駄に大きくなるため) そこで、IDベースでスキーマを一限管理するためにスキーマレジストリを開発しました. これにより、各レコードにはスキーマのIDを含めるだけでよくなるので、全てのレコードにスキーマを載せる場合と比べてかなり効率が改善されました. さらにスキーマレジストリでは、スキーマの互換性を管理しており、後方互換性が保たれている範囲ごとにデータを固めて保存する仕組みをつくりました.これにより、Data Where Houseへ利用時に互換性のないデータ同士を読み出そうとしてロードエラーになるということを防止することができます. 詳細は下記のSlideの通りです. https://www.slideshare.net/seiyamizuno35/serialization-systems 上記のスキーマレジストリはOSSとしても公開しており、Docker imageとしてDockerHub上でも公開しております.また、Helmのchartを用意してあるので、Kubernetes上で簡単に動作させることができるようにしてあります. https://github.com/cyberagent/typebook ## S3 => Kafkaへのデータ転送 多様なデータソースからの取り込みを可能にするため、S3からKafkaにデータを転送するアプリケーションの開発をしていました.こちらは1日に約2TB程のGZIPデータを取り込まなければならず、処理性能が求められたため、Actorモデルによる並列実装がしやすいAkka Streamsを採用していました. アプリケーションの実行はDockerコンテナで行っていたのですが、単コンテナで上記のトラフィックを捌ききるのは厳しかったため、S3 Event Notification => SQS => Akka Streams => KafkaというようにSQSをロードバランサとして使うことにより高負荷に対してもスケール可能な状態にしました. 詳細は下記のSlideの通りです. https://www.slideshare.net/seiyamizuno35/connect-s3-with-kafka-using-akka-streams