はじめに

重要事項: イベント処理機能(Esper)の新規利用は終了し、ご利用中のお客様におかれましてはApama CEPエンジンへの切替完了後、順次サポートを終了いたします。Apama によるカスタムストリーミング処理機能の詳細は カスタムストリーミング処理ガイド をご覧ください。
本章の記載内容は新規利用終了済みのイベント処理機能(Esper)に関する記述になりますのでご注意ください。

このガイドについて

イベント言語ガイドは次のセクションで構成されています:

イベント言語の利用

イベント言語の文法は SQL 言語に似ています。SQLでは、ステートメントは論理的に固定したデータベースに対し実行され、一つの決まった結果を返し、タスクを終了します。Things Cloud では、ステートメントは入力データ(入力イベント)のストリームに対して継続的に実行されるもので、随時出力(出力イベント)を算出します。

例として、次のステートメントは温度センサーが特定の温度を超えた新しい温度情報を継続的に取得します:

select *
from MeasurementCreated e
where getNumber(e, "c8y_TemperatureMeasurement.T.value") > 100

ここの、 MeasurementCreated はシステムで生成されたそれぞれのメジャーメントのイベントを含むストリームになります。SQL 同様、 where を使うことで、これらイベントのサブセットが選択されます。 getNumber() はイベントから数値データを抽出する関数です。この例では、“e” は “MeasurementCreated” イベントで、プロパティは “c8y_TemperatureMeasurement” です。“T.value” は温度センサーの摂氏度数を表すあたいです。( センサーライブラリ を参照下さい)

イベントデータを元にどのようにデータを生成する?

事前に定義されている操作(データ格納や、メールでのデータ送信等)を行うため、システムで準備された特別なストリームがいくつかあります。そのうちのひとつは CreateAlarm で、Things Cloud 内にアラームを格納するのに利用されます。センサーの温度が定義された値を超えた場合、アラームが即座に生成される必要があると想定します。これは、次のステートメントで実現できます:

insert into CreateAlarm
select
 e.measurement.time as time,
 e.measurement.source.value as source,
 "c8y_TemperatureAlert" as type,
 "Temperature too high" as text,
 "ACTIVE" as status,
 "CRITICAL" as severity
from MeasurementCreated e
where getNumber(e, "c8y_TemperatureMeasurement.T.value") > 100

技術的に、このステートメントは、センサーが100℃を超えるごとに新規の “AlarmCreated” イベントを生成し、“CreateAlarm” 出力ストリームにそのイベントを乗せます。select 節のプロパティ名は “AlarmCreated” のプロパティに合致する必要があります。(リアルタイムステートメント を参照下さい)

イベント言語でデバイスをコントロールするには?

イベント言語による遠隔操作は、派生データの一種です。遠隔操作は特定のデバイスを対象とします。次の例は温度読取結果に基づくリレースイッチ操作を示します:

insert into CreateOperation
select
"PENDING" as status,
<<heating ID>> as deviceId,
{
"c8y_Relay.relayState", "CLOSED"
} as fragments
from MeasurementCreated e
where getNumber(e, "c8y_TemperatureMeasurement.T.value") > 100

fragments 部分の文法は、プロパティ名と値のペアのリストで、中カッコでくくられています: {“key1”, “value1”, “key2”, “value2”, …}.

イベント言語にデータを問い合わせるには?

イベント処理の一環として、 Things Cloud データベースから情報を照会する必要となる場合があるでしょう。これは、一連の問い合わせメソッドでサポートされています。この例は、自動販売機が毎時の総売り上げを集計する方法を示したものです。購入後に生成された売上レポートデータは Things Cloud データベースから取得されます。

create window SalesReport.win:time_batch(1 hour)  
(
    event com.cumulocity.model.event.Event,
    customer com.cumulocity.model.ManagedObject
)

insert into SalesReport
select
    e.event as event,
    findOneManagedObjectParent(e.event.source.value) as customer
from EventCreated as e

insert into CreateMeasurement
select
    "total_cust_trx",
    "customer_trx_counter",
    {
        "total", count(*),
        "customer_id", sales_report.customer.id.value
    }
from SalesReport as sales_report
group by sales_report.customer.id.value

上記では、まずバッチウィンドウを作成します。このウィンドウは、この時間枠の合計を計算するために1時間データを保持します。できたデータ(受信イベントとともにイベント元となる親マネージドオブジェクト)をこのウィンドウに格納します。 このステートメントは自動販売機アプリケーションのデータモデルに依存します:売上レポートは source となる自動販売機のイベントとして表現されています。顧客は、自動販売機の親マネージドオブジェクトとして表現されています。

売上データの集まりは、SQLのような文法を用いた “insert into CreateMeasurement…” を通して計算され、メジャーメントとして格納されます。SQLとの違いは:SQLでは、固定した現在のコンテンツのデータベースに対して結果を計算しますが、イベント言語ではステートメントが絶え間なく動くので、処理対象期間をタイムウィンドウで限定する必要があります。

概要

Things Cloud のリアルタイムイベント処理を使って、あなたのIoTソリューションに独自のロジックを追加できます。ロジックはデータ解析ロジック以外でも実行することができます。新しいロジックを定義するには、イベント言語を使います。 この言語によって、入力データを分析できます。強力なパターン、ウィンドウベースのクエリ言語を使うことができます。 この言語により、データの生成、更新、削除をリアルタイムに行うことができます。

リアルタイム解析の典型的なユースケースには以下のようなものがあります:

続く章ではイベント言語がどのように利用できるか、どうすれば独自の解析やサーバーサイド業務ロジック、自動化を記述できるのか基本的な内容を紹介しています。

イベントストリーム

イベント言語では、データはストリームを流れます。ストリームにイベントを生成したり、ストリームに生成されたイベントを監視することができます。

定義済みのストリーム

Things Cloud APIで使用される定義済みストリームがいくつかあります。各APIコールが発生した際、Things Cloud はそれぞれの入力ストリームに対して新しいイベントを自動生成します。例えば、REST APIでメジャーメントが生成された場合、MeasurementCreatedストリームに新しいイベントが生成されます。 Things Cloud のバックエンドにアクセスするため、各出力ストリームにイベントを設定することができ、それにより Things Cloud は自動的にDBクエリやメール送信などに必要なAPIコールを実行します。例えば、データベースに新規アラームを生成するためには、CreateAlarmストリームに新規イベントを作成します。

API 入力ストリーム 出力ストリーム 説明
Inventory ManagedObjectCreated
ManagedObjectUpdated
ManagedObjectDeleted
CreateManagedObject
UpdateManagedObject
DeleteManagedObject
このイベントグループは1つのマネージドオブジェクトの生成、変更、削除を表します。
Events EventCreated
EventDeleted
CreateEvent
DeleteEvent
このイベントグループは1つのイベントの生成、削除を表します。
Measurements MeasurementCreated
MeasurementDeleted
CreateMeasurement
DeleteMeasurement
このイベントグループは1つのメジャーメントの生成、削除を表します。
Device control OperationCreated
OperationUpdated
CreateOperation
UpdateOperation
このイベントグループは1つのオペレーションの生成、変更を表します。
Alarms AlarmCreated
AlarmUpdated
CreateAlarm
UpdateAlarm
このイベントグループは一つのアラームの生成、変更を表します。
Emails (適用なし) SendEmail
SendDashboard
このイベントグループは、メール送信を表します。
SMS (適用なし) SendSms このイベントグループは、SMS送信を表します(SMS送信は Things Cloud では未サポートです)。
HTTP ResponseReceived SendReqeust このイベントグループは、外部サービスへのHTTPリクエストの送信を表します。
Export (適用なし) SendExport このイベントグループは、エクスポートされたデータを含む電子メールの生成を表します。

それぞれのストリームでのイベントのデータ構成は、データモデルを確認してください。

ストリームにイベントを生成する

キーワードinsert intoselect によってイベント生成ができます。まず"insert into"に続けてイベントを生成するストリーム名を指定する必要があります。そして"select"節をイベントのパラメータ指定に使用します。パラメータは次の構文で指定します:<値> as <パラメータ> コンマで区切ることで複数のパラメータを指定できます。パラメータ順序は関係ありません。ストリームによっては"select"節に必須指定パラメータがあることに注意してください。

ストリームのイベントを待ち受ける

ストリーム内の共通的なイベント生成トリガーは、他のストリーム内のイベント発生です。したがって、他のストリームを監視することでイベント待ち受けができます。キーワード"from"に続き、ストリーム名を指定し、(オプションで)イベントを参照するための変数名を文の後ろに続けます。

条件

キーワードwhereで条件を追加すれば、入ってくるすべてのイベントに対してイベント生成をせず、その指定条件のみに絞ることができます。 where キーワードに続けてtrueかfalseの結果を持つ表現を記述します。 andorで複数の表現を記述することもできます。

一例として文を作ってみましょう。ストリームを監視し、所定の条件が適用される場合、別のストリームに新しいイベントを生成します。 例として、作成された温度メジャーメントに対し、アラームを生成したいと思います。

  1. アラームを生成するため、 CreateAlarm に対し insert into します。
  2. select 節にイベントに必要なすべてのパラメータを指定します。
  3. MeasurementCreated ストリームにイベントが発生した場合にアラームを生成するため、 fromMeasurementCreated ストリームを指定します。
  4. アラームを生成するのを、 MeasurementCreated に対する特定の条件に限定するため、 where 節で指定します。

でき上がった文はこのようになるでしょう。

insert into CreateAlarm
select
  measurementEvent.measurement.time as time,
  measurementEvent.measurement.source.value as source,
  "c8y_TemperatureAlarm" as type,
  "Temperature measurement was created" as text,
  "ACTIVE" as status,
  "CRITICAL" as severity
from MeasurementCreated measurementEvent
where measurementEvent.measurement.type = "c8y_TemperatureMeasurement";

トラブルシューティング

エラーメッセージ

Real-time event processing is currently overloaded and may stop processing your events. Please contact support.

説明

それぞれのテナント向けの CEP キューがいっぱいになっています。たとえば、現在処理可能な数以上のイベントが作成されたときなどに発生します。

このケースでは、アラームが発生します。新しく入ってくるイベントを失わないように、一番古いイベントが削除されます。つまり、入ってくる新しいイベントがトリガーとなり、キューの先頭のイベントが削除されます。