Batch版flentd、embulkでリベンジ!

  • #技術ブログ 

たまには、間髪いれずに更新もするんですよ!
とたまたま調子よく進むとすぐ調子に乗る、相変わらずの半人前うえピーです。

今回は、embulkの導入手順、Salesforceからデータを取得するためのプラグイン設定及びテスト実行までの軌跡です。

では、早速。
とりあえず、embulkは公式手順(こちら)通りにインストールすればOKです。
※Javaはインストール済み前提。私は、2017/09/05時点最新をインストールしました。
ホームディレクトリに環境作るのには、賛否あるみたいですが、今回は手順通り進めました。

$ cd ~
$ curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar"
$ chmod +x ~/.embulk/bin/embulk
$ echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
$ source ~/.bashrc

よし、問題なしと。

このまま、進めても芸がないので、Gemfileでプラグイン管理できるようにしてみたいと思います。
※余裕があるのでちょっと頑張る、うえピーです。

$ cd ~
$ embulk mkbundle embulk_bundle
$ vim embulk_bundle/Gemfile
gem 'プラグイン' で導入したいプラグインを追記します。
今回は、すぐに使いそうな、以下を追記しました。
gem 'embulk-output-command'
gem 'embulk-input-mysql'
gem 'embulk-output-mysql'

Gemfile作ったら、以下のコマンド実行。上手くプラグイン入るかな?

$ cd embulk_bundle
$ embulk bundle

OK、エラーなし。いやー、今回順調。

gemで公開されているプラグインにSalesforceのインプット用がなかったため、今回は、「mikoto2000」さんにGitで公開頂いているプラグイン(こちら)を使用します。
mikoto2000さん有難うございます、便利に使わせて頂いています!

まずは、Gitからプラグインをzipでダウンロード。それを~/embulk_bundleにアップロードします。
後は、コマンドを叩くだけ。

$ cd ~/embulk_bundle
$ unzip embulk-input-salesforce_bulk-master.zip
$ cd embulk-input-salesforce_bulk-master
$ ./gradlew gem

よしっ、インストール成功したようです。

早速、テストyml書いてみます。こんな感じで書いてみました。

$ vi ~/uehara_work/test_salesforce.yml
こっから、ymlの内容です。とりあえず、テストなので標準出力に出してます。
in:
  type: salesforce_bulk
  userName: 秘密
  password: 秘密
  authEndpointUrl: https://login.salesforce.com/services/Soap/u/34.0
  objectType: Account
  pollingIntervalMillisecond: 5000
  querySelectFrom: SELECT Id,Name,LastModifiedDate FROM Account
  queryOrder: Name desc
  columns:
  - {type: string, name: Id}
  - {type: string, name: Name}
  - {type: timestamp, name: LastModifiedDate, format: '%FT%T.%L%Z'}
  startRowMarkerName: LastModifiedDate

out:
  type: stdout

これで大丈夫かな。じゃあ実行!!

$ cd ~/uehara_work
$ embulk run -L ../embulk_bundle/embulk-input-salesforce_bulk-master/ ./test_salesforce.yml

どうだ・・・・

2017-09-06 18:23:14.916 +0900: Embulk v0.8.31
2017-09-06 18:23:22.791 +0900 [INFO] (0001:transaction): Loaded plugin embulk/input/salesforce_bulk from a load path
2017-09-06 18:23:22.848 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=6 / output tasks 3 = input tasks 1 * 3
2017-09-06 18:23:22.858 +0900 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2017-09-06 18:23:23.139 +0900 [INFO] (0013:task-0000): Try login to 'https://login.salesforce.com/services/Soap/u/34.0'.
2017-09-06 18:23:23.883 +0900 [INFO] (0013:task-0000): Login success.
2017-09-06 18:23:23.883 +0900 [INFO] (0013:task-0000): Send request : 'SELECT Id,Name,LastModifiedDate FROM Account ORDER BY Name desc'

おっログインサクセス、頑張れー。

~省略~
IDは秘密,20170523060809 ほにゃほにゃ株式会社,2017-05-23T09:08:10.000UTC
IDは秘密,20170516101618 もじもじ株式会社,2017-05-16T01:16:18.000UTC
IDは秘密,20170516101314 がみがみ株式会社,2017-05-16T01:13:15.000UTC
2017-09-06 18:23:30.422 +0900 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2017-09-06 18:23:30.433 +0900 [INFO] (main): Committed.
2017-09-06 18:23:30.433 +0900 [INFO] (main): Next config diff: {"in":{"start_row_marker":"2017-09-05T08:31:23.000Z"},"out":{}}

やりぃ、バッチぐー!!!

次回は、MySQLにぶち込んで、何かと結合したいと思います。
アディオス!!