Madogiwa Blog

主に技術系の学習メモに使っていきます。

並列データ転送ツールembulkの実行環境(Docker)とサンプルを作ったので使い方とかをメモ📝

業務でちょっとembulkを触る機会があって、いろいろ触ってみたかったのでDockerを使った実行環境とサンプルを使って少しわかってきたので使い方とか概要をメモしておきます📝

embulkとは?

embulkとは下記で記載しているように、色々なファイルとかデータベースからデータを転送する機能を提供するサービスです🐬

Embulk is a parallel bulk data loader that helps data transfer between various storages, databases, NoSQL and cloud services. https://github.com/embulk/embulk

実際の業務では日々大量に発生するアプリケーションのログデータをredshiftにembulkを使って連携してredashで分析するといったこと使われているのかなと思います。

ざっくりとした概要は下記のブログに記載されているのが分かりやすかったです 🙇‍♂️

frsyuki.hatenablog.com

また基本的なコードはjRubyで書かれているようですので、頑張ればコードも読めそうですね💎

github.com

embulkの実行環境を作る

今回はとりあえずembulkを動かせるようにDockerを使ってembulkの実行環境とmysqlpostgresqlの環境が立ち上げられるようにしてみました。

github.com

下記コマンドを実行していただければ、とりあえずembulkの実行環境が立ち上がるはずです🙌

$ git clone https://github.com/Madogiwa0124/embulk_samples.git
$ cd embulk_samples
$ docker-compose up -d
$ docker-compose run embulk --version
  embulk 0.9.19

実行環境の構成

今回作成した実行環境の概要は下記の通りです 🐳

  • embulk
    • openjdkのJava8のDockerをもとにして作成
    • embulk本体のinstallとembulkプラグインのinstall
    • entry pointの指定(docker-compose コンテナ名でembulkのプロセスが起動されるように)
  • mysql
    • mysqlの最新版(連携テスト用)
  • postgresql

mysqlに関してはembulkでの実行時に認証周りでエラーが発生したので、confdをマウントしてデフォルトの認証設定をmysql_native_passwordに変更しています。(mysql 8.0.4から変わったようです。)

[ERROR] (0001:transaction): Operation failed (0:null)
org.embulk.exec.PartialExecutionException: java.lang.RuntimeException: java.sql.SQLException: Unable to load authentication plugin ‘caching_sha2_password’

参考: Docker で MySQL 8.0.4 を使う - Qiita

とりあえず動かしてみる

embulkを動かすためには、入力(in)と出力(out)が定義された設定ファイルを用意する必要があります。

下記はcsvファイルからcsvファイルに出力する例です。inに入力となる対象の設定を、outに出力となる対象の設定を記載します。(execには実行時の設定を記載します。)

exec:
  min_output_tasks: 1
in:
  type: file
  path_prefix: '/work/embulk/inputs/files/sample.csv'
  parser:
    charset: UTF-8
    newline: LF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    null_string: "NULL"
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    default_timezone: 'Asia/Tokyo'
    columns:
    - {name: id, type: long}
    - {name: account, type: string}
    - {name: time, type: string}
    - {name: purchase, type: string}
    - {name: comment, type: string}
out:
  type: file
  path_prefix: '/work/embulk/outputs/samples.csv'
  file_ext: csv
  formatter:
    type: csv
    charset: UTF-8
    delimiter: ','
    newline: CRLF
    quote: '"'
    escape: '\'

※今回はデータ量が少なく、空ファイルが出来てしまうのでexecの中でmin_output_tasksを1にしてページ分散をさせないようにしています。

詳しい設定内容は、embulk公式のドキュメントで設定項目等の詳しい記載があります。

Configuration — Embulk 0.8 documentation

またmysqlpostgresql等への連携はプラグインを使って行います、他にも様々なプラグインがembulkのorganization配下にはたくさんあるので、自分の扱いたいサービス等があるか見てみると良さそうです🙋

github.com

プレビューで試す

それでは、実際に試してみましょう。今回はシンプルですが先程の設定ファイルを使ってcsvからcsvに連携してみます。連携するファイルは下記です。

id,account,time,purchase,comment
1,32864,2015-01-27 19:23:49,20150127,embulk
2,14824,2015-01-27 19:01:23,20150127,embulk jruby
3,27559,2015-01-28 02:20:02,20150128,embulk filter
4,11270,2015-01-29 11:54:36,20150129,embulk plugin

https://github.com/Madogiwa0124/embulk_samples/blob/master/embulk/inputs/files/sample.csv

まずは実際に連携処理を実行する前にembulkにはpreviewという設定ファイルの内容を確認する機能があるので、それを試してみます👀

$ docker-compose run embulk preview ./embulk/configuers/csv_to_csv_config.yml

+---------+----------------+---------------------+-----------------+----------------+
| id:long | account:string |         time:string | purchase:string | comment:string |
+---------+----------------+---------------------+-----------------+----------------+
|       1 |          32864 | 2015-01-27 19:23:49 |        20150127 |         embulk |
|       2 |          14824 | 2015-01-27 19:01:23 |        20150127 |   embulk jruby |
|       3 |          27559 | 2015-01-28 02:20:02 |        20150128 |  embulk filter |
|       4 |          11270 | 2015-01-29 11:54:36 |        20150129 |  embulk plugin |
+---------+----------------+---------------------+-----------------+----------------+

このように実際に出力される内容が確認出来ます、問題なさそうだったら実際に実行してみましょう!

実行する

実際に実行する場合はrunを実行します🚀

$ dc run embulk run ./embulk/configuers/csv_to_csv_config.yml

2019-11-17 03:24:17.983 +0000 [INFO] (0012:task-0000): Writing local file '/work/embulk/outputs/samples.csv000.00.csv'
2019-11-17 03:24:18.053 +0000 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2019-11-17 03:24:18.067 +0000 [INFO] (main): Committed.
2019-11-17 03:24:18.068 +0000 [INFO] (main): Next config diff: {"in":{"last_path":"/work/embulk/inputs/files/sample.csv"},"out":{}}

embulk/outputs配下にsamples.csv000.00.csvというファイルが作成され以下のように入力のファイルと同様の内容が記載されているはずです🙌

id,account,time,purchase,comment
1,32864,2015-01-27 19:23:49,20150127,embulk
2,14824,2015-01-27 19:01:23,20150127,embulk jruby
3,27559,2015-01-28 02:20:02,20150128,embulk filter
4,11270,2015-01-29 11:54:36,20150129,embulk plugin

リポジトリに入っているその他のサンプル

今回作成したリポジトリには他にも下記のようなサンプルが入っています。

気になるものがあれば実行コマンド等をReadmeに記載しているので、試してみてください🙋

github.com

おわりに

今回はデータ連携まわりの機能を持つembulkの実行環境と簡単な使い方を記載していました。

実際のサービス運用では開発だけじゃなくて、運用改善のためのデータ分析基盤の理解等も必要になってくると思うので今回embulkを触ってよかったかなと思います🐬

参考

qiita.com

qiita.com

teratail.com

qiita.com