はじめに
重要事項: イベント処理機能(Esper)の新規利用は終了し、ご利用中のお客様におかれましてはApama CEPエンジンへの切替完了後、順次サポートを終了いたします。Apama によるカスタムストリーミング処理機能の詳細は カスタムストリーミング処理ガイド をご覧ください。
本章の記載内容は新規利用終了済みのイベント処理機能(Esper)に関する記述になりますのでご注意ください。
重要事項: イベント処理機能(Esper)の新規利用は終了し、ご利用中のお客様におかれましてはApama CEPエンジンへの切替完了後、順次サポートを終了いたします。Apama によるカスタムストリーミング処理機能の詳細は カスタムストリーミング処理ガイド をご覧ください。
本章の記載内容は新規利用終了済みのイベント処理機能(Esper)に関する記述になりますのでご注意ください。
イベント言語ガイドは次のセクションで構成されています:
イベント言語の文法は SQL 言語に似ています。SQLでは、ステートメントは論理的に固定したデータベースに対し実行され、一つの決まった結果を返し、タスクを終了します。Things Cloud では、ステートメントは入力データ(入力イベント)のストリームに対して継続的に実行されるもので、随時出力(出力イベント)を算出します。
例として、次のステートメントは温度センサーが特定の温度を超えた新しい温度情報を継続的に取得します:
select *
from MeasurementCreated e
where getNumber(e, "c8y_TemperatureMeasurement.T.value") > 100
ここの、 MeasurementCreated はシステムで生成されたそれぞれのメジャーメントのイベントを含むストリームになります。SQL 同様、 where を使うことで、これらイベントのサブセットが選択されます。 getNumber() はイベントから数値データを抽出する関数です。この例では、“e” は “MeasurementCreated” イベントで、プロパティは “c8y_TemperatureMeasurement” です。“T.value” は温度センサーの摂氏度数を表すあたいです。( センサーライブラリ を参照下さい)
事前に定義されている操作(データ格納や、メールでのデータ送信等)を行うため、システムで準備された特別なストリームがいくつかあります。そのうちのひとつは CreateAlarm で、Things Cloud 内にアラームを格納するのに利用されます。センサーの温度が定義された値を超えた場合、アラームが即座に生成される必要があると想定します。これは、次のステートメントで実現できます:
insert into CreateAlarm
select
e.measurement.time as time,
e.measurement.source.value as source,
"c8y_TemperatureAlert" as type,
"Temperature too high" as text,
"ACTIVE" as status,
"CRITICAL" as severity
from MeasurementCreated e
where getNumber(e, "c8y_TemperatureMeasurement.T.value") > 100
技術的に、このステートメントは、センサーが100℃を超えるごとに新規の “AlarmCreated” イベントを生成し、“CreateAlarm” 出力ストリームにそのイベントを乗せます。select 節のプロパティ名は “AlarmCreated” のプロパティに合致する必要があります。(リアルタイムステートメント を参照下さい)
イベント言語による遠隔操作は、派生データの一種です。遠隔操作は特定のデバイスを対象とします。次の例は温度読取結果に基づくリレースイッチ操作を示します:
insert into CreateOperation
select
"PENDING" as status,
<<heating ID>> as deviceId,
{
"c8y_Relay.relayState", "CLOSED"
} as fragments
from MeasurementCreated e
where getNumber(e, "c8y_TemperatureMeasurement.T.value") > 100
fragments 部分の文法は、プロパティ名と値のペアのリストで、中カッコでくくられています: {“key1”, “value1”, “key2”, “value2”, …}.
イベント処理の一環として、 Things Cloud データベースから情報を照会する必要となる場合があるでしょう。これは、一連の問い合わせメソッドでサポートされています。この例は、自動販売機が毎時の総売り上げを集計する方法を示したものです。購入後に生成された売上レポートデータは Things Cloud データベースから取得されます。
create window SalesReport.win:time_batch(1 hour)
(
event com.cumulocity.model.event.Event,
customer com.cumulocity.model.ManagedObject
)
insert into SalesReport
select
e.event as event,
findOneManagedObjectParent(e.event.source.value) as customer
from EventCreated as e
insert into CreateMeasurement
select
"total_cust_trx",
"customer_trx_counter",
{
"total", count(*),
"customer_id", sales_report.customer.id.value
}
from SalesReport as sales_report
group by sales_report.customer.id.value
上記では、まずバッチウィンドウを作成します。このウィンドウは、この時間枠の合計を計算するために1時間データを保持します。できたデータ(受信イベントとともにイベント元となる親マネージドオブジェクト)をこのウィンドウに格納します。 このステートメントは自動販売機アプリケーションのデータモデルに依存します:売上レポートは source となる自動販売機のイベントとして表現されています。顧客は、自動販売機の親マネージドオブジェクトとして表現されています。
売上データの集まりは、SQLのような文法を用いた “insert into CreateMeasurement…” を通して計算され、メジャーメントとして格納されます。SQLとの違いは:SQLでは、固定した現在のコンテンツのデータベースに対して結果を計算しますが、イベント言語ではステートメントが絶え間なく動くので、処理対象期間をタイムウィンドウで限定する必要があります。
Things Cloud のリアルタイムイベント処理を使って、あなたのIoTソリューションに独自のロジックを追加できます。ロジックはデータ解析ロジック以外でも実行することができます。新しいロジックを定義するには、イベント言語を使います。 この言語によって、入力データを分析できます。強力なパターン、ウィンドウベースのクエリ言語を使うことができます。 この言語により、データの生成、更新、削除をリアルタイムに行うことができます。
リアルタイム解析の典型的なユースケースには以下のようなものがあります:
続く章ではイベント言語がどのように利用できるか、どうすれば独自の解析やサーバーサイド業務ロジック、自動化を記述できるのか基本的な内容を紹介しています。
イベント言語では、データはストリームを流れます。ストリームにイベントを生成したり、ストリームに生成されたイベントを監視することができます。
Things Cloud APIで使用される定義済みストリームがいくつかあります。各APIコールが発生した際、Things Cloud はそれぞれの入力ストリームに対して新しいイベントを自動生成します。例えば、REST APIでメジャーメントが生成された場合、MeasurementCreatedストリームに新しいイベントが生成されます。 Things Cloud のバックエンドにアクセスするため、各出力ストリームにイベントを設定することができ、それにより Things Cloud は自動的にDBクエリやメール送信などに必要なAPIコールを実行します。例えば、データベースに新規アラームを生成するためには、CreateAlarmストリームに新規イベントを作成します。
API | 入力ストリーム | 出力ストリーム | 説明 |
---|---|---|---|
Inventory | ManagedObjectCreated ManagedObjectUpdated ManagedObjectDeleted |
CreateManagedObject UpdateManagedObject DeleteManagedObject |
このイベントグループは1つのマネージドオブジェクトの生成、変更、削除を表します。 |
Events | EventCreated EventDeleted |
CreateEvent DeleteEvent |
このイベントグループは1つのイベントの生成、削除を表します。 |
Measurements | MeasurementCreated MeasurementDeleted |
CreateMeasurement DeleteMeasurement |
このイベントグループは1つのメジャーメントの生成、削除を表します。 |
Device control | OperationCreated OperationUpdated |
CreateOperation UpdateOperation |
このイベントグループは1つのオペレーションの生成、変更を表します。 |
Alarms | AlarmCreated AlarmUpdated |
CreateAlarm UpdateAlarm |
このイベントグループは一つのアラームの生成、変更を表します。 |
Emails | (適用なし) | SendEmail SendDashboard |
このイベントグループは、メール送信を表します。 |
SMS | (適用なし) | SendSms | このイベントグループは、SMS送信を表します(SMS送信は Things Cloud では未サポートです)。 |
HTTP | ResponseReceived | SendReqeust | このイベントグループは、外部サービスへのHTTPリクエストの送信を表します。 |
Export | (適用なし) | SendExport | このイベントグループは、エクスポートされたデータを含む電子メールの生成を表します。 |
それぞれのストリームでのイベントのデータ構成は、データモデルを確認してください。
キーワードinsert into
、select
によってイベント生成ができます。まず"insert into"に続けてイベントを生成するストリーム名を指定する必要があります。そして"select"節をイベントのパラメータ指定に使用します。パラメータは次の構文で指定します:<値> as <パラメータ>
コンマで区切ることで複数のパラメータを指定できます。パラメータ順序は関係ありません。ストリームによっては"select"節に必須指定パラメータがあることに注意してください。
ストリーム内の共通的なイベント生成トリガーは、他のストリーム内のイベント発生です。したがって、他のストリームを監視することでイベント待ち受けができます。キーワード"from"に続き、ストリーム名を指定し、(オプションで)イベントを参照するための変数名を文の後ろに続けます。
キーワードwhere
で条件を追加すれば、入ってくるすべてのイベントに対してイベント生成をせず、その指定条件のみに絞ることができます。 where
キーワードに続けてtrueかfalseの結果を持つ表現を記述します。 and
やor
で複数の表現を記述することもできます。
一例として文を作ってみましょう。ストリームを監視し、所定の条件が適用される場合、別のストリームに新しいイベントを生成します。 例として、作成された温度メジャーメントに対し、アラームを生成したいと思います。
CreateAlarm
に対し insert into
します。select
節にイベントに必要なすべてのパラメータを指定します。MeasurementCreated
ストリームにイベントが発生した場合にアラームを生成するため、 from
に MeasurementCreated
ストリームを指定します。MeasurementCreated
に対する特定の条件に限定するため、 where
節で指定します。でき上がった文はこのようになるでしょう。
insert into CreateAlarm
select
measurementEvent.measurement.time as time,
measurementEvent.measurement.source.value as source,
"c8y_TemperatureAlarm" as type,
"Temperature measurement was created" as text,
"ACTIVE" as status,
"CRITICAL" as severity
from MeasurementCreated measurementEvent
where measurementEvent.measurement.type = "c8y_TemperatureMeasurement";
エラーメッセージ
Real-time event processing is currently overloaded and may stop processing your events. Please contact support.
説明
それぞれのテナント向けの CEP キューがいっぱいになっています。たとえば、現在処理可能な数以上のイベントが作成されたときなどに発生します。
このケースでは、アラームが発生します。新しく入ってくるイベントを失わないように、一番古いイベントが削除されます。つまり、入ってくる新しいイベントがトリガーとなり、キューの先頭のイベントが削除されます。