リアルタイム処理

概要

Things Cloud では高レベルなリアルタイムプロセス言語を基盤にしたリアルタイム IoT ビジネスロジックを走らせることが可能です。このページでは、リアルタイムプロセスの基本コンセプトを紹介し、 Things Cloud 上でどのように独自のビジネスロジックを走らせれば良いのか説明していきます。

リアルタイムプロセスのインターフェースについては参照ガイド、 "イベント言語""イベントスクリプト"をご覧ください.

Things Cloud 上でのリアルタイムプロセスとは?

Things Cloud にはデバイス、または他ソースからのデータを受け取りユーザー定義のビジネスロジックを処理するリアルタイム・エンジンが含まれています。このユーザー定義ビジネス・ロジックは新データの通知、受信データを使用したデータベースの作成(例:センサーの閾値を越えるとアラーム送信)、デバイスのトリガーやメールの送信などを含みます。このロジックは IoT リアルタイムデータ用の高レベルイベント言語で作成します。

イベント言語構造

イベント言語 は下記の例のようなステートメントで作成されます。そして、モジュールとしてグループ化され展開されます。モジュールは Things Cloud アプリケーションの一部として展開することができます。 Things Cloud 管理アプリケーションとしても編集可能です。REST APIを通じて開発者はユーザー・フレンドリーなドメイン特定ビジネスロジックウィザードを開発することができます。例えば、自宅自動化アプリの開発者はすぐに使用可能な温度計センサーの閾値をトリガーに室内温度制御をするウィザードを作成することができます。

上記の図は Things Cloud のもう一つの特徴:リアルタイムプロセス専用データの送信の一例です。"transient"というデータは Things Cloud のデータベースには保存されず、リアルタイム・エンジンでのみ使用されます。これは保存容量を低減しプロセスコストの削減にもつながります。例えば、高レゾリューションを持続せずに複数のデータをリアルタイムで追跡をすることができるのです。

リアルタイムプロセスのメリットとは?

Things Cloud のリアルタイムプロセスのメリットは下記になります:

  • 遠隔センサーでどんなイベントにもすぐに反応可能。
  • 高度な対話型IoTアプリ開発が可能。
  • ソフトウェア開発の必要がなくIoTのユースケースを直接 Things Cloud 内で走らせることができ、さらに Things Cloud にホスティングも管理も任せることが可能。
  • 多種デバイスに渡り独自のビジネスルールに従ったデータの確認、正常化、引き出しが可能。
  • イベントを基にしたトリガーの自動遠隔制御が可能。
  • time windows や joins のようなストリーム指向のビジネス・ロジックの使用が可能。
  • 長期記憶用データのみ持続保存しデバイスのオンライン追跡コストの削減が可能。

イベント言語とは?

イベント言語 はSQL言語と構文的に類似しています。しかし、SQLでは論理的に固定されたデータベース上でステートメントを走らせ結果を出し削除しますが、 Things Cloud ではステートメントを流れてくる入力データ(入力イベント)上に連続的に走らせ、出力結果(出力イベント)を連続的に計算しています。

下記では、特定の温度より高い温度データを連続的にセンサーから抽出しています:

select *
from MeasurementCreated e
where getNumber(e, "c8y_TemperatureMeasurement.T.value") > 100

ここの、 MeasurementCreated とはシステム内に作成されたイベントを含む計測データの流れを指します。SQLと同じようにイベント内のsubsetを選択するのにwhereが使用されています。getNumber()はイベント内の数値を読むための機能となります。  こちらの例では、イベントはe、MeasuremenCreatedイベントとそのプロパティは"c8y_TemperatureMeasurement.T.value"、単位は温度センサーの摂氏となります。(センサー・ライブラリをご参照ください)。

イベント言語からの派生データはどのように作成したらいいのか?

あらかじめ定義されたオペレーションを実施できる特別なストリームをシステムは提供しています。(例:データベースへの保存・メールを通じてデータの送信など)CreateAlarmはこの特別なストリームの一つで、 Things Cloud 内にアラームを保存することができます。例えば、温度が高すぎるとすぐに発生するアラームが必要な場合、下記のようなstatementで実行可能になります:

insert into CreateAlarm
select
 e.measurement.time as time,
 e.measurement.source.value as source,
 "c8y_TemperatureAlert" as type,
 "Temperature too high" as text,
 "ACTIVE" as status,
 "CRITICAL" as severity
from MeasurementCreated e
where getNumber(e, "c8y_TemperatureMeasurement.T.value") > 100

技術的に説明すると、このステートメントは温度センサーが100℃を超える度に新しい"AlarmCreated" イベントを作成し"CreateAlarm"出力ストリームへ投入しています。()内のプロパティ名と"AlarmCreated"のプロパティ名は同一でなければなりません。(イベント言語リファレンスをご覧ください。).

イベント言語からどのようにデバイスを制御すればよいですか?

Things Cloud の遠隔制御は派生データの一つとなります。例えば、デバイス上でオペレーションを走らせたい場合、デバイスをターゲットにした新しいオペレーションを作成します。下記では温度計の温度によるスイッチの動きになります:

insert into CreateOperation
select
"PENDING" as status,
<<heating ID>> as deviceId,
{
"c8y_Relay.relayState", "CLOSED"
} as fragments
from MeasurementCreated e
where getNumber(e, "c8y_TemperatuのreMeasurement.T.value") > 100
  • heating ID はトリガーとなる温度IDのプレースホルダーとなります。
  • fragments はオペレーションのコンテンツを定義します。ここでは"c8y_Relay" を "CLOSED"とします。

fragments の構文は{}に囲われたプロパティ名と値の羅列となります:{"key1", "value1", "key2", "value2", ...}。

イベント言語からどのようにデータを問い合わせればよいですか?

進行中のイベントのプロセスの一部として時々関連情報を Things Cloud データベースに問い合わせすることがあります。これは Things Cloud 内の問い合わせ方法で対応しています。下記は特定の顧客に一時間ごとに自販機の合計売上を測定する場合の例になります。顧客データが購買ごでトリガーされる売上レポートに直接含まれていない為、 Things Cloud のデータベースから抽出する必要があります:

create window SalesReport.win:time_batch(1 hour)  
(
    event com.cumulocity.model.event.Event,
    customer com.cumulocity.model.ManagedObject
)

insert into SalesReport
select
    e.event as event,
    findOneManagedObjectParent(e.event.source.value) as customer
from EventCreated as e

insert into CreateMeasurement
select
    "total_cust_trx",
    "customer_trx_counter",
    {
        "total", count(*),
        "customer_id", sales_report.customer.id.value
    }
from SalesReport as sales_report
group by sales_report.customer.id.value

上記ではまず集計のためにデータを一時間保存するバッチ・ウィンドウを作成しています:入力イベントとイベントソースである親マネージドオブジェクトをウィンドウ内に保存します。これは自販機アプリケーションのデータモデルと通信し、 Things Cloud 内では、売上レポート=イベント、自販機=ソース、そして顧客=自販機の親マネージドオブジェクト、として表されます。

最後に売上レポートの集計はSQLのようなシンタックスを使用して"insert into CreateMeasurement..."を通して計算され、メジャーメントとして保存されます。ただしSQLとの違いにご注意ください:SQLでは固定された最新データベースのコンテンツを使用して計算されますが、イベント言語ではステートメントは無限に亘るので時間ウィンドウで集計を限定する必要があります。

Things Cloud ではどのようにリアルタイムプロセスは実施されていますか?

最初に記したように、 Things Cloud にはAPIリクエストをするのに2つのプロセス・モードが存在します:persistenttransientです。 "persistent"モードはデフォルトのモードになります: Things Cloud データベースにデータを保存し、リアルタイム・エンジンへデータを送信します。保存と送信を終えてから Things Cloud はリクエストの結果を出します。

"transient" モードではリアルタイム・エンジンにデータを送信するだけで、非同期的にすぐに結果を出します。このモードはリアルタイムで特定のデータを監視するのに効果的です。

例えば、運転中の車両の位置情報は常時監視する必要がありますが、レポートは毎分保存する程度にしたいとします。下記でそれが可能となります:

insert into CreatedEvent
select * from EventCreated e
where getObject(e, "c8y_LocationUpdate") is not null
output first every 60 seconds

また、別のオプションとして60更新毎に出力をしたい場合は以下のようになります:

insert into CreatedEvent
select * from EventCreated e
where getObject(e, "c8y_LocationUpdate") is not null
output first every 60 events