Rubyによるリアルタイム通知

投稿日 / 投稿者名

2018.8.1 / 堀江 亮佑

はじめに

本レポートは、Things Cloud の利用例をより知っていただくための実利用レポートとして作成したものです。
Things Cloud は極力後方互換性を持つよう開発されていますが、今後のリリースにより、一部画面やコマンド、手順などが変更となったり、利用できない可能性があることをあらかじめご了承ください。
なお、作成にあたり、以下バージョンを用いています。

難易度 ★★


## 概要

本セクションでは、リアルタイム通知APIを用いた実装についてご紹介します。
longpollingの実装によって、コマンドライン上よりメジャーメントの通知をリアルタイムに受け取ります。
longpollingを用いた実装は扉の開閉のリアルタイム検知などセンサからの通知をリアルタイムで受け取りたい場合に有効です。


前提条件



所要時間(目安)

この手順に沿って、longpollingを実装する場合の所要時間。

つくるもの

今回はリアルタイム通知APIを用いてデバイスのメジャーメント情報をリアルタイムに通知するコードを実装します。 まずシミュレーターを作成し、このシミュレーターのセンサのメジャーメントをリアルタイムに通知するコードの実装を行います。 

1. シミュレーター作成

シミュレーターを作成するに当たって、シミュレーターの作成をする権限を自身のユーザーに付与する必要があります。 そこでまずは権限を作成し、その権限を自身のユーザーに付与しましょう。

管理->ロール->ロールを追加->タイプ->simulatorという項目を選択しシミュレーターという名前で保存してください。



ユーザーから自身のユーザーを選択しグローバルロールに先ほど作成したシミュレーターを追加してください。
これでシミュレーターの作成を行えるようになります。続いてデバイス管理に切り替え、 シミュレーター->新しいシミュレーター->温度計測で新しく温度計測シミュレーターを作成し実行中にしてください。




2. ハンドシェイク

ここからlongpollingの実装に入ります。ThingsCloudにおけるハンドシェイクでは、ハンドシェイク、チャネル接続、接続の3つの手順を踏みます。まずはハンドシェイクをrubyを用いて実装していきます。

require 'net/https'
require 'uri'
require 'json'
require 'Base64'

#client_idを格納する変数
@client_id
#httpヘッダー
header = {
  'Authorization' => 'Basic ' + Base64.urlsafe_encode64("[yourtenantname]/[yourusername]:[yourpasswd]"),
     'Content-Type' => 'application/json'

}

####ハンドシェイク
#http周り
uri = URI('https://[yourtenantpath]/cep/realtime')
http = Net::HTTP.new(uri.host, uri.port)
http.use_ssl = 1
http.verify_mode = OpenSSL::SSL::VERIFY_NONE

req = Net::HTTP::Post.new(uri.request_uri, header)
#bodyのパラメータ

params = [{"version"=>"1.0", "minimumVersion"=>"0.9", "channel"=>"/meta/handshake", "supportedConnectionTypes"=>["long-polling"], "advice"=>{"timeout"=>60000, "interval"=>0}}]
req.body = JSON.pretty_generate(params)
#リクエスト送信
res = http.request(req)
if res.is_a?(Net::HTTPSuccess)
 responses = JSON.parse(res.body)
 @client_id = responses[0]["clientId"]
else
  puts res.value
end                                                                                                                                                                                                                                                                            


以下のrubyの標準ライブラリを用いて実装を行っています。 ``` require 'net/https' require 'uri' require 'json' require 'Base64' ```

ここでpostする際のヘッダー情報を設定しています。basic認証で用いる認証情報の設定などを行っていて、ここの"[yourtenantname]/[yourusername]:[yourpasswd]"にはテナントのログインに用いるユーザー名、パスワードを入れてください。

header = {
  'Authorization' => 'Basic ' + Base64.urlsafe_encode64("[yourtenantname]/[yourusername]:[yourpasswd]"),
     'Content-Type' => 'application/json'

}

次にhttpリクエストに関する設定をしています。`uri`には自身のテナント名を含むuriを設定してください。
#http周り
uri = URI('https://[yourtenantpath]/cep/realtime')
http = Net::HTTP.new(uri.host, uri.port)
http.use_ssl = 1
http.verify_mode = OpenSSL::SSL::VERIFY_NONE

req = Net::HTTP::Post.new(uri.request_uri, header)

そしてparamsにhttpリクエストのbodyに格納する内容を代入します。

params = [{"version"=>"1.0", "minimumVersion"=>"0.9", "channel"=>"/meta/handshake", "supportedConnectionTypes"=>["long-polling"], "advice"=>{"timeout"=>60000, "interval"=>0}}]
req.body = JSON.pretty_generate(params)

最後にpostリクエストを送ります。リクエストに成功すると@client_idにこのリアルタイム通信におけるclientidが格納されます。 clientidとはlongpollingを用いた会話を識別するためのidです。チャネル接続や、そのあとの通知はこのclientidを用いて通信を行います。

res = http.request(req)
if res.is_a?(Net::HTTPSuccess)
 responses = JSON.parse(res.body)
 @client_id = responses[0]["clientId"]
else
  puts res.value
end            

3. チャネル

require 'net/https'
require 'uri'
require 'json'
require 'Base64'

#中略

params = [{"channel" => "/meta/subscribe",
           #温度シミュレーター
           "subscription" => "/measurements/[systemid]",
           "clientId"=>@client_id,
}]

req.body = JSON.pretty_generate(params)
#リクエスト送信
res = http.request(req)

if res.is_a?(Net::HTTPSuccess)
 p JSON.parse(res.body)
else
  puts res.value

end


チャネル接続はparamsの中身のみ変更してpostリクエストを再送することでできます。

params = [{"channel" => "/meta/subscribe",
           #温度シミュレーター
           "subscription" => "/measurements/[systemid]",
           "clientId"=>@client_id,
}]

システムIDは`デバイス管理->デバイス一覧->システムID`で確認できます。



4. 接続

最後に接続し通知の受け取りを行います。

require 'net/https'
require 'uri'
require 'json'
require 'Base64'

#中略

params = [{"channel" => "/meta/connect",
           "connectionType" => "long-polling",
           "clientId"=>@client_id,
}]

req.body = JSON.pretty_generate(params)
#リクエスト送信
while 1
  res = http.request(req)
  if res.is_a?(Net::HTTPSuccess)
    p JSON.parse(res.body)
  else
    puts res.value

  end
end                   

接続では以下のように`params`を設定します。
params = [{"channel" => "/meta/connect",
           "connectionType" => "long-polling",
           "clientId"=>@client_id,
}]

そしてループ処理の中で、httpリクエストを送り、通知を受け取り再度送り直すという手順を繰り返し行っています。

while 1
  res = http.request(req)
  if res.is_a?(Net::HTTPSuccess)
    p JSON.parse(res.body)
  else
    puts res.value

  end
end      

実行結果

これらの処理をターミナル上で実行すると以下のように出力されます。

[{"ext"=>{"ack"=>true}, "minimumVersion"=>"1.0", "clientId"=>"[yourClientId]", "supportedConnectionTypes"=>["websocket", "smartrest-long-polling", "long-polling"], "channel"=>"/meta/handshake", "version"=>"1.0", "successful"=>true}]
[{"channel"=>"/meta/subscribe", "subscription"=>"/measurements/[systemid]", "successful"=>true}]
[{"data"=>{"realtimeAction"=>"CREATE", "data"=>{"time"=>"2018-07-11T09:07:06.121+09:00", "id"=>"1790811", "self"=>"http://[yourTenant]/measurement/measurements/[systemid]", "source"=>{"id"=>"[systemid]", "self"=>"http://[yourTenant]/inventory/managedObjects/[systemid]"}, "type"=>"c8y_Temperature", "c8y_Temperature"=>{"T"=>{"unit"=>"℃", "value"=>6.6913060636}}}}, "channel"=>"/measurements/[systemid]", "id"=>"50677779"}, {"advice"=>{"interval"=>0, "timeout"=>5400000, "reconnect"=>"retry"}, "channel"=>"/meta/connect", "successful"=>true}]
[{"data"=>{"realtimeAction"=>"CREATE", "data"=>{"time"=>"2018-07-11T09:07:11.126+09:00", "id"=>"1790817", "self"=>"http://[yourTenant]/measurement/measurements/1790817", "source"=>{"id"=>"[systemid]", "self"=>"http://[yourTenant]/inventory/managedObjects/[systemid]"}, "type"=>"c8y_Temperature", "c8y_Temperature"=>{"T"=>{"unit"=>"℃", "value"=>8.0901699437}}}}, "channel"=>"/measurements/[systemid]", "id"=>"50677834"}, {"channel"=>"/meta/connect", "successful"=>true}]
[{"data"=>{"realtimeAction"=>"CREATE", "data"=>{"time"=>"2018-07-11T09:07:16.133+09:00", "id"=>"1790822", "self"=>"http://[yourTenant]/measurement/measurements/1790822", "source"=>{"id"=>"[systemid]", "self"=>"http://[yourTenant]/inventory/managedObjects/[systemid]"}, "type"=>"c8y_Temperature", "c8y_Temperature"=>{"T"=>{"unit"=>"℃", "value"=>9.1354545764}}}}, "channel"=>"/measurements/[systemid]", "id"=>"50677904"}, {"channel"=>"/meta/connect", "successful"=>true}]

ハンドシェイクの応答

最初にハンドシェイクを行うためその応答が返ってきます。

[
  {
    "ext"=>{
      "ack"=>true
      },
    "minimumVersion"=>"1.0", "clientId"=>"[
      yourClientId
    ]",
    "supportedConnectionTypes"=>[
      "websocket",
      "smartrest-long-polling",
      "long-polling"
    ],
    "channel"=>"/meta/handshake", "version"=>"1.0", "successful"=>true
  }
]

チャネル接続の応答

その次にチャネル接続に対する応答が返ってきます。

[
  {
    "channel"=>"/meta/subscribe",
    "subscription"=>"/measurements/[systemid]", "successful"=>true
  }
]

接続の応答

これ以降は接続の応答が続きます。以下のような構造になっており、今回の温度シミュレーターの場合はc8y_Temperatureというオブジェクト中のTという項目に温度の単位であるunitとその値であるvalueが返ってきます。

[
  {
    "data"=>{
      "realtimeAction"=>"CREATE",
       "data"=>{
         "time"=>"2018-07-11T09:07:16.133+09:00", "id"=>"1790822",
         "self"=>"http://[yourTenant]/measurement/measurements/1790822",
           "source"=>{
             "id"=>"[systemid]",
             "self"=>"http://[yourTenant]/inventory/managedObjects/[systemid]"
           },
         "type"=>"c8y_Temperature",
         "c8y_Temperature"=>{
           "T"=>{
             "unit"=>"℃", "value"=>9.1354545764
           }
         }
       }
    },
    "channel"=>"/measurements/[systemid]", "id"=>"50677904"
  },
  {
    "channel"=>"/meta/connect", "successful"=>true
  }
]