CEPスクリプトを書く(基本編)

投稿日 / 投稿者名

2018.8.1 / sasa

はじめに

本レポートは、Things Cloud の利用例をより知っていただくための実利用レポートとして作成したものです。
Things Cloud は極力後方互換性を持つよう開発されていますが、今後のリリースにより、一部画面やコマンド、手順などが変更となったり、利用できない可能性があることをあらかじめご了承ください。
なお、作成にあたり、以下バージョンを用いています。

難易度 ★★


所要時間

1時間

概要

本レポートでは、リアルタイムイベント処理を行う Things Cloud のイベント言語(CEPスクリプト)についての解説を行います。
イベント言語とは何かの概念、および簡単な例を記述できるようになることを目標にします。
イベント言語を利用することで、複雑な条件で Things Cloud のデータを加工、判定、変更し、メールやデバイスなどに通知することができます。

前提条件

Things Cloud における CEP

CEP

CEP(Complex Event Processing)は、複合イベント処理と言われる技術です。 Things Cloud でも次のような形で CEP が使われます。

どんなもの?

まずは実際のCEPがどんなものか見てみましょう。 以下は、速度が120km/hを超えたら、アラームを発生させるものです。

insert into CreateAlarm
select
	"highSpeedAlarm" as type,
	CumulocitySeverities.WARNING as severity,
	CumulocityAlarmStatuses.ACTIVE as status,
	"high speed" as text,
	m.measurement.time as time,
	m.measurement.source as source
from MeasurementCreated m
where
	getObject(m.measurement, "c8y_MotionMeasurement.speed.value") is not null
	and getNumber(m.measurement, "c8y_MotionMeasurement.speed.value") >= 120;

何となく、SQLっぽい形をしています。ステートメント内のそれぞれの句の意味や書式は後で説明します。

RDB との違い

処理するデータの対象が違う

CEPでは入力対象のデータ、および結果の出力先は「ストリーム」です。 ストリームは、現在時刻を基準としており、リアルタイムにデータが流れていくイメージです。Things Cloud では CEP はメモリー上で実行されます。

CEP処理の基本形

CEP では、

を行います。

トリガーとなるイベント(入力)元 として、

があります。

書き込み(出力)先 は、

があります。

ちなみに上の例では、

で、「センサーデータが発生したら、アラームを書き込む」処理を表しています。

CEPはトリガー(契機)発生時に処理を行い、結果をストリームに書き込みます

基本的な文法

では、さっそく書いていきましょう。

例題
Measurement の速度が120km/h を超えたら、highSpeed Eventを発生させる

Event は Things Cloud 内で管理されているイベントのこととします。CEPのイベントは単独のカタカナ表記とします。

ステートメントの基本形

1つの処理を表すステートメント基本形は以下です。

書式:

insert into <<出力ストリーム>>
select
  <<出力ストリームに入れるデータを入力ストリームなどから指定>>
from
  <<入力ストリーム>>
where
  <<入力ストリームの取得条件>>;

入力ストリーム

Things Cloud データと連携するために、あらかじめ準備されている入力ストリームを紹介します。 入力ストリームを用いることで、Things Cloud 内にセンサーデータが記録されたり、デバイスの状態が変化したといったイベントと記録値を得ることができます。 詳しくは、説明/設定ガイド を参照してください。

対象 生成イベント
(Created)
更新イベント
(Updated)
削除イベント
(Deleted)
データ名
ManagedObject- managedObject
Event- - event
Measurement- - measurement
Operation- - operation
Alarm- - alarm

たとえば、ManagedObject の更新イベントを表すストリーム名は、ManagedObjectUpdated で、Operation の生成イベントを表すストリーム名は OperationCreated となります。

ManagedObjectUpdated は、ManagedObject が変更されたことをイベントとし、変更内容を得ることができます。また、OperationCreated はデバイスや Things Cloud 画面からオペレーションが作成されたことをイベントとし、どのようなオペレーションなのかを得ることができます。

出力ストリーム

Things Cloud データと連携するために、あらかじめ準備されている出力ストリームを紹介します。 出力ストリームを利用することで、Things Cloud に Measurement データを書き込んだり、デバイスの情報を書き換えたり、オペレーションを発行したりすることができます。

詳しくは、説明/設定ガイド を参照してください。

対象 生成処理
(Create)
更新処理
(Update)
削除処理
(Delete)
データ名
-ManagedObject managedObject
-Event - event
-Measurement - measurement
-Operation - operation
-Alarm - alarm

たとえば、ManagedObject の生成処理を行いたい場合、CreateManagedObject ストリームに書き込み、Event の削除処理を行いたい場合、DeleteEvent ストリームに書き込みます。

その他のストリーム

データ形式

CEP で使われるステートメントで、イベントのようなデータは Java オブジェクトで表されます。オブジェクトには型があります。 入力ストリームで使うデータ型はここ 、出力ストリームで使うデータ型はここ を参照してください。

実践

例題
Measurement の温度が9℃を超えたら、type=nineEvent のEventを発生させる
insert into <<出力ストリーム>>
select
  <<出力ストリームに入れるデータを入力ストリームなどから指定>>
from
  <<入力ストリーム>>
where
  <<入力ストリームの取得条件>>;

以下のようになったと思います。

insert into CreateEvent
select
  <<出力ストリームに入れるデータを入力ストリームなどから指定>>
from
  MeasurementCreated
where
  <<入力ストリームの取得条件>>;

select は、以下のような形式で記述します。

(項目1に設定したい値) as 出力ストリームのデータ項目1,
(項目2に設定したい値) as 出力ストリームのデータ項目2,
 :
 :

設定したい値は通常定数か入力ストリームから取得しますが、入力ストリームのイベントを格納する変数は from 句で MeasurementCreated m のように指定します。(変数 m に入力イベントのJavaオブジェクトが入ります) 出力ストリームのデータ型を調べて、データ項目を事前に把握してください。出力ストリームの各データ項目に何を設定するか自由なところもありますが、一例を以下に示します。

select
	"nineEvent" as type,
	"温度が9℃を超えました" as text,
	m.measurement.time as time,
	m.measurement.source as source

最後は where 句です。 温度が9℃を超えたら は以下のように記述します。

where
	getNumber(m.measurement, "c8y_Temperature.T.value") > 9;

getNumber(a, b) は fragment の値を取り出す関数です。a に Java オブジェクト、b にデータのパスを指定します。この例では、measurement オブジェクトの “c8y_Temperature” の “T” の “value” の値を取得しています。


できあがった CEP スクリプト

insert into CreateEvent
select
	"nineEvent" as type,
	"温度が9℃を超えました" as text,
	m.measurement.time as time,
	m.measurement.source as source
from
  MeasurementCreated m
where
	getNumber(m.measurement, "c8y_Temperature.T.value") > 9;

getNumber の返却型は Java 言語での java.lang.Number(のサブクラスの java.math.BigDecimal) です。getNumber(m.measurement, “c8y_Temperature.T.value”).intValue() のような Java のメソッドを利用できます。

Things Cloud に登録してみましょう。

シミュレーターで試す

さっそく作ったCEPスクリプトを動かしてみましょう。動かすには、温度データを出力するデバイスが必要ですが、ここでは Things Cloud の機能として準備されているシミュレーターを利用します。

シミュレーターを利用すると、Things Cloud での 1 デバイスとしてカウントされ、課金が発生しますので、ご注意ください。

アカウントの権限を追加する

権限追加操作には admin 権限が必要なため、ない場合は Things Cloud のシステム管理者に依頼してください。

まず、シミュレーターを利用するロールを作成します。

次に、利用ユーザーに作成したロールを追加します。

シミュレーターを設定します。

CEPが動作しているか確認する

別のウィンドウで、Things Cloud にログインします。

うまく行かない場合?

  1. デバイスシミュレーターは動作していますか?
  2. イベント処理のスクリプトは「デプロイ済み」になっていますか?
  3. 通知タブを含むウィンドウをリロードしてみてください
  4. 9℃を超えるのに数分かかる場合があります。もう少し待ってみてください

CEPが作成した Event は、「デバイス管理」の「イベント」からも確認できます。

終わったら、シミュレーター、デバイスを削除しておきましょう。

これで基本はマスターです!