CEPスクリプトを書く(基本編)
重要事項: イベント処理機能(Esper)の新規利用は終了し、ご利用中のお客様におかれましてはApama CEPエンジンへの切替完了後、順次サポートを終了いたします。Apama によるカスタムストリーミング処理機能の詳細は カスタムストリーミング処理ガイド をご覧ください。
本章の記載内容は新規利用終了済みのイベント処理機能(Esper)に関する記述になりますのでご注意ください。
投稿日 / 投稿者名
2018.8.1 / sasa
はじめに
本レポートは、Things Cloud の利用例をより知っていただくための実利用レポートとして作成したものです。
Things Cloud は極力後方互換性を持つよう開発されていますが、今後のリリースにより、一部画面やコマンド、手順などが変更となったり、利用できない可能性があることをあらかじめご了承ください。
なお、作成にあたり、以下バージョンを用いています。
- ver.8.15.8(backend/UI)
難易度 ★★
所要時間
1時間
概要
本レポートでは、リアルタイムイベント処理を行う Things Cloud のイベント言語(CEPスクリプト)についての解説を行います。
イベント言語とは何かの概念、および簡単な例を記述できるようになることを目標にします。
イベント言語を利用することで、複雑な条件で Things Cloud のデータを加工、判定、変更し、メールやデバイスなどに通知することができます。
前提条件
- Things Cloud に関する基本的なデータモデル(ManagedObject, Measurement, Event, Alarm など)について理解していること。
- Java言語およびその基本的なクラスについて知識があること。
Things Cloud における CEP
CEP
CEP(Complex Event Processing)は、複合イベント処理と言われる技術です。 Things Cloud でも次のような形で CEP が使われます。
- 車の速度が 100km/h を超えた状態を30秒間継続したらアラームを発生させる
- 直近2日間の平均気温が17℃を下回ったらデバイスのスイッチをつける
- 特定のエリアにトラッキングデバイスが100台以上入ったら、メールを送信する
どんなもの?
まずは実際のCEPがどんなものか見てみましょう。 以下は、速度が120km/hを超えたら、アラームを発生させるものです。
insert into CreateAlarm
select
"highSpeedAlarm" as type,
CumulocitySeverities.WARNING as severity,
CumulocityAlarmStatuses.ACTIVE as status,
"high speed" as text,
m.measurement.time as time,
m.measurement.source as source
from MeasurementCreated m
where
getObject(m.measurement, "c8y_MotionMeasurement.speed.value") is not null
and getNumber(m.measurement, "c8y_MotionMeasurement.speed.value") >= 120;
何となく、SQLっぽい形をしています。ステートメント内のそれぞれの句の意味や書式は後で説明します。
RDB との違い
処理するデータの対象が違う
- RDB … ストックされたデータ(静的)
- CEP … ストリームデータ(時系列)
CEPでは入力対象のデータ、および結果の出力先は「ストリーム」です。 ストリームは、現在時刻を基準としており、リアルタイムにデータが流れていくイメージです。Things Cloud では CEP はメモリー上で実行されます。
CEP処理の基本形
CEP では、
- ストリームのイベントをトリガー(契機)として、
- 別のストリームに書き込む処理
を行います。
トリガーとなるイベント(入力)元 として、
- ユーザー定義のストリーム
- Things Cloud のデバイス情報更新イベント、センサーデータ(Measurement)書き込みイベントなど、Things Cloud固有のイベントに結び付いた特殊なストリーム
があります。
書き込み(出力)先 は、
- 普通のストリームの場合
- センサーデータ(Measurement)の Things Cloud データベースへの書き込み、やメール送信、という特殊処理と結びついた特別なストリームの場合
があります。
ちなみに上の例では、
- 入力元:MeasurementCreated (Things Cloud のセンサーデータ発生イベント)
- 出力先:CreateAlarm (Things Cloud のアラーム生成(DB書き込み)を行うイベント
で、「センサーデータが発生したら、アラームを書き込む」処理を表しています。
CEPはトリガー(契機)発生時に処理を行い、結果をストリームに書き込みます
基本的な文法
では、さっそく書いていきましょう。
例題 |
---|
Measurement の速度が120km/h を超えたら、highSpeed Eventを発生させる |
Event は Things Cloud 内で管理されているイベントのこととします。CEPのイベントは単独のカタカナ表記とします。
ステートメントの基本形
1つの処理を表すステートメント基本形は以下です。
書式:
insert into <<出力ストリーム>>
select
<<出力ストリームに入れるデータを入力ストリームなどから指定>>
from
<<入力ストリーム>>
where
<<入力ストリームの取得条件>>;
入力ストリーム
Things Cloud データと連携するために、あらかじめ準備されている入力ストリームを紹介します。 入力ストリームを用いることで、Things Cloud 内にセンサーデータが記録されたり、デバイスの状態が変化したといったイベントと記録値を得ることができます。 詳しくは、説明/設定ガイド を参照してください。
対象 | 生成イベント (Created) |
更新イベント (Updated) |
削除イベント (Deleted) |
データ名 |
---|---|---|---|---|
ManagedObject- | ○ | ○ | ○ | managedObject |
Event- | ○ | - | ○ | event |
Measurement- | ○ | - | ○ | measurement |
Operation- | ○ | ○ | - | operation |
Alarm- | ○ | ○ | - | alarm |
たとえば、ManagedObject の更新イベントを表すストリーム名は、ManagedObjectUpdated で、Operation の生成イベントを表すストリーム名は OperationCreated となります。
ManagedObjectUpdated は、ManagedObject が変更されたことをイベントとし、変更内容を得ることができます。また、OperationCreated はデバイスや Things Cloud 画面からオペレーションが作成されたことをイベントとし、どのようなオペレーションなのかを得ることができます。
出力ストリーム
Things Cloud データと連携するために、あらかじめ準備されている出力ストリームを紹介します。 出力ストリームを利用することで、Things Cloud に Measurement データを書き込んだり、デバイスの情報を書き換えたり、オペレーションを発行したりすることができます。
詳しくは、説明/設定ガイド を参照してください。
対象 | 生成処理 (Create) |
更新処理 (Update) |
削除処理 (Delete) |
データ名 |
---|---|---|---|---|
-ManagedObject | ○ | ○ | ○ | managedObject |
-Event | ○ | - | ○ | event |
-Measurement | ○ | - | ○ | measurement |
-Operation | ○ | ○ | - | operation |
-Alarm | ○ | ○ | - | alarm |
たとえば、ManagedObject の生成処理を行いたい場合、CreateManagedObject ストリームに書き込み、Event の削除処理を行いたい場合、DeleteEvent ストリームに書き込みます。
その他のストリーム
- SendMail (出力ストリーム)
- 外部にメール送信するときに使います
データ形式
CEP で使われるステートメントで、イベントのようなデータは Java オブジェクトで表されます。オブジェクトには型があります。 入力ストリームで使うデータ型はここ 、出力ストリームで使うデータ型はここ を参照してください。
実践
例題 |
---|
Measurement の温度が9℃を超えたら、type=nineEvent のEventを発生させる |
insert into <<出力ストリーム>>
select
<<出力ストリームに入れるデータを入力ストリームなどから指定>>
from
<<入力ストリーム>>
where
<<入力ストリームの取得条件>>;
- 問1.出力ストリームは?
- 問2.入力ストリームは?
以下のようになったと思います。
insert into CreateEvent
select
<<出力ストリームに入れるデータを入力ストリームなどから指定>>
from
MeasurementCreated
where
<<入力ストリームの取得条件>>;
select は、以下のような形式で記述します。
(項目1に設定したい値) as 出力ストリームのデータ項目1,
(項目2に設定したい値) as 出力ストリームのデータ項目2,
:
:
設定したい値は通常定数か入力ストリームから取得しますが、入力ストリームのイベントを格納する変数は from 句で MeasurementCreated m
のように指定します。(変数 m に入力イベントのJavaオブジェクトが入ります)
出力ストリームのデータ型を調べて、データ項目を事前に把握してください。出力ストリームの各データ項目に何を設定するか自由なところもありますが、一例を以下に示します。
select
"nineEvent" as type,
"温度が9℃を超えました" as text,
m.measurement.time as time,
m.measurement.source as source
最後は where 句です。 温度が9℃を超えたら は以下のように記述します。
where
getNumber(m.measurement, "c8y_Temperature.T.value") > 9;
getNumber(a, b) は fragment の値を取り出す関数です。a に Java オブジェクト、b にデータのパスを指定します。この例では、measurement オブジェクトの “c8y_Temperature” の “T” の “value” の値を取得しています。
できあがった CEP スクリプト
insert into CreateEvent
select
"nineEvent" as type,
"温度が9℃を超えました" as text,
m.measurement.time as time,
m.measurement.source as source
from
MeasurementCreated m
where
getNumber(m.measurement, "c8y_Temperature.T.value") > 9;
getNumber の返却型は Java 言語での java.lang.Number(のサブクラスの java.math.BigDecimal) です。getNumber(m.measurement, “c8y_Temperature.T.value”).intValue() のような Java のメソッドを利用できます。
Things Cloud に登録してみましょう。
- Things Cloud にログイン
- 「管理アプリケーション」にスイッチ
- 左のメニューから「イベント処理」をクリック
- 「新しいモジュール」をクリック
- スクリプト名を適当につけます
- 「ソースコード」にスクリプトを貼り付け、デプロイ状態で保存
シミュレーターで試す
さっそく作ったCEPスクリプトを動かしてみましょう。動かすには、温度データを出力するデバイスが必要ですが、ここでは Things Cloud の機能として準備されているシミュレーターを利用します。
シミュレーターを利用すると、Things Cloud での 1 デバイスとしてカウントされ、課金が発生しますので、ご注意ください。
アカウントの権限を追加する
権限追加操作には admin 権限が必要なため、ない場合は Things Cloud のシステム管理者に依頼してください。
まず、シミュレーターを利用するロールを作成します。
- 「管理」アプリケーションを起動してください
- 左のメニューから「ロール」を選択
- 「ロールを追加」をクリック
- 上の赤枠の「新しいロール」を「Simulator」などに改名(改名後、右のチェックをクリックするのを忘れないでください)
- 「Simulator」の管理者にチェックして「保存」
次に、利用ユーザーに作成したロールを追加します。
- 「管理」アプリケーションから
- 左のメニューから「ユーザー」を選択
- 権限を付与したいユーザーを選択
- グローバルロールの中に先ほど作成した「Simulator」ロールがあるので、チェック
シミュレーターを設定します。
- 「デバイス管理」アプリケーションから
- 左のメニューから「シミュレーター」を選択
- プリセット「温度計測」
- シミュレーター名「温度」
- インスタンス数「1」
- 「次へ」
- スイッチを入れるとシミュレーターが動作開始します
- デバイス一覧から先ほど作成したシミュレーターを選択します
- 「計測値」を選択し、シミュレーターがデータを生成していることを確認します
CEPが動作しているか確認する
別のウィンドウで、Things Cloud にログインします。
- 「管理」アプリケーションから
- 左のメニューの「イベント処理」を選択
- 先ほど作ったイベント処理ルールを選択します
- 「通知」タブを開きます
- 「通知」タブに処理されたイベントが表示されます
うまく行かない場合?
- デバイスシミュレーターは動作していますか?
- イベント処理のスクリプトは「デプロイ済み」になっていますか?
- 通知タブを含むウィンドウをリロードしてみてください
- 9℃を超えるのに数分かかる場合があります。もう少し待ってみてください
CEPが作成した Event は、「デバイス管理」の「イベント」からも確認できます。
終わったら、シミュレーター、デバイスを削除しておきましょう。
これで基本はマスターです!