はじめに

イベント言語の利用

イベント言語の文法は SQL 言語に似ています。SQLでは、ステートメントは論理的に固定したデータベースに対するもので、一つの決まった結果を返し、タスクを終了します。Things Cloud では、ステートメントは入力データ(入力イベント)のストリームに対して継続的に実行されるもので、随時出力(出力イベント)を算出します。

例として、次のステートメントは温度センサーが新規に特定の温度を超えたかを継続的に取得します:

select *
from MeasurementCreated e
where getNumber(e, "c8y_TemperatureMeasurement.T.value") > 100

ここで、 MeasurementCreated はシステムで生成されたメジャーメントそれぞれのイベントを含むストリームです。SQL 同様、 where を使うことで、これらイベントのサブセットが選択されます。 getNumber() はイベントから数値データを抽出する関数です。この例では、“e” は “MeasurementCreated” イベントで、プロパティは “c8y_TemperatureMeasurement” です。“T.value” は温度センサーの摂氏度数を表すあたいです。( センサーライブラリ を参照下さい)

イベントデータを元にどのようにデータを生成する?

特定の操作(データ格納や、メールでのデータ送信等)を行うため、システムで準備された特別なストリームがあります。ストリームのひとつは CreateAlarm で、Things Cloud 内にアラームを格納するのに利用できます。決められた値を温度センサーが超えた場合、即座にアラームの生成が必要だとしましょう。これは、次のステートメントで実現できます:

insert into CreateAlarm
select
 e.measurement.time as time,
 e.measurement.source.value as source,
 "c8y_TemperatureAlert" as type,
 "Temperature too high" as text,
 "ACTIVE" as status,
 "CRITICAL" as severity
from MeasurementCreated e
where getNumber(e, "c8y_TemperatureMeasurement.T.value") > 100

技術的に、このステートメントは、センサーが100℃を超えるごとに新規の “AlarmCreated” イベントを生成し、“CreateAlarm” 出力ストリームにそのイベントを乗せます。select 節のプロパティ名は “AlarmCreated” のプロパティに合致する必要があります。(リアルタイムステートメント を参照下さい)

イベント言語でデバイスをコントロールするには?

イベントげんごによる遠隔操作は、まさに生成データの利用方法です。遠隔操作は特定のデバイスを対象とします。次の例は温度読取結果に基づくリレースイッチ操作を示します:

insert into CreateOperation
select
"PENDING" as status,
<<heating ID>> as deviceId,
{
"c8y_Relay.relayState", "CLOSED"
} as fragments
from MeasurementCreated e
where getNumber(e, "c8y_TemperatureMeasurement.T.value") > 100

fragments 部分の文法は、プロパティ名と値のペアのリストで、中カッコでくくられています: {“key1”, “value1”, “key2”, “value2”, …}.

イベント言語にデータを問い合わせるには?

イベント処理の途中には、 Things Cloud データベースからの情報が必要となる場合があるでしょう。これは、一連の問い合わせメソッドでサポートされています。この例は、自動販売機が毎時の総売り上げを集計する方法を示したものです。売上レポートデータは Things Cloud データベースから販売後に生成されます。

create window SalesReport.win:time_batch(1 hour)  
(
    event com.cumulocity.model.event.Event,
    customer com.cumulocity.model.ManagedObject
)

insert into SalesReport
select
    e.event as event,
    findOneManagedObjectParent(e.event.source.value) as customer
from EventCreated as e

insert into CreateMeasurement
select
    "total_cust_trx",
    "customer_trx_counter",
    {
        "total", count(*),
        "customer_id", sales_report.customer.id.value
    }
from SalesReport as sales_report
group by sales_report.customer.id.value

上でまず、バッチウィンドウを作成します。これは、この時間枠で合計を計算するために1時間データを保持します。できたデータ(イベント元となる親マネージドオブジェクトとともに入ってくるイベント)をこのウィンドウに格納します このステートメントは自動販売機アプリケーションのデータモデルに依存します:売上レポートは source となる自動販売機のイベントとして表現されています。顧客は、自動販売機の親マネージドオブジェクトとして表現されています。

売上データの集まりは、SQLのような文法を用いた “insert into CreateMeasurement…” を通して計算され、メジャーメントとして格納されます。SQLとの違いは:SQLでは、データベースの現在の内容に対する固定的な結果が計算されますが、イベント言語ではステートメントは絶え間なく動き、処理対象期間はタイムウィンドウに限定されます。

概要

Things Cloud のリアルタイムイベント処理を使って、あなたのIoTソリューションに独自のロジックを追加できます。ロジックはデータ解析ロジック以外でも実行することができます。新しいロジックを定義するには、イベント言語を使います。 この言語によって、入力データを分析できます。強力なパターン、ウィンドウベースのクエリ言語を使うことができます。 この言語により、データの生成、更新、削除をリアルタイムに行うことができます。

リアルタイム解析の典型的なユースケースには以下のようなものがあります:

続く章ではイベント言語がどのように利用できるか、どうすれば独自の解析やサーバーサイド業務ロジック、自動化を記述できるのか基本的な内容を紹介しています。

イベントストリーム

イベント言語では、データはストリームを流れます。ストリームにイベントを生成したり、ストリームに生成されたイベントを監視することができます。

定義済みのストリーム

Things Cloud APIで使用される定義済みストリームがいくつかあります。各APIコールが発生した際、Things Cloud はそれぞれの入力ストリームに対して新しいイベントを自動生成します。例えば、REST APIでメジャーメントが生成された場合、MeasurementCreatedストリームに新しいイベントが生成されます。 Things Cloud のバックエンドにアクセスするため、各出力ストリームにイベントを設定することができ、それにより Things Cloud は自動的にDBクエリやメール送信などに必要なAPIコールを実行します。例えば、データベースに新規アラームを生成するためには、CreateAlarmストリームに新規イベントを作成します。

API 入力ストリーム 出力ストリーム 説明
Inventory ManagedObjectCreated
ManagedObjectUpdated
ManagedObjectDeleted
CreateManagedObject
UpdateManagedObject
DeleteManagedObject
このイベントグループは1つのマネージドオブジェクトの生成、変更、削除を表します。
Events EventCreated
EventDeleted
CreateEvent
DeleteEvent
このイベントグループは1つのイベントの生成、削除を表します。
Measurements MeasurementCreated
MeasurementDeleted
CreateMeasurement
DeleteMeasurement
このイベントグループは1つのメジャーメントの生成、削除を表します。
Device control OperationCreated
OperationUpdated
CreateOperation
UpdateOperation
このイベントグループは1つのオペレーションの生成、変更を表します。
Alarms AlarmCreated
AlarmUpdated
CreateAlarm
UpdateAlarm
このイベントグループは一つのアラームの生成、変更を表します。
Emails (適用なし) SendEmail
SendDashboard
このイベントグループは、メール送信を表します。
SMS (適用なし) SendSms このイベントグループは、SMS送信を表します(SMS送信は Things Cloud では未サポートです)。
HTTP ResponseReceived SendReqeust このイベントグループは、外部サービスへのHTTPリクエストの送信を表します。
Export (適用なし) SendExport このイベントグループは、エクスポートされたデータを含む電子メールの生成を表します。

それぞれのストリームでのイベントのデータ構成は、データモデルを確認してください。

ストリームにイベントを生成する

キーワード insert into 、select によってイベント生成ができます。まず"insert into"に続けてイベントを生成するストリーム名を指定する必要があります。そして"select"節をイベントのパラメータ指定に使用します。パラメータは次の構文で指定します:<値> as <パラメータ> コンマで区切ることで複数のパラメータを指定できます。パラメータ順序は関係ありません。ストリームによっては"select"節に必須指定パラメータがあることに注意してください。

ストリームのイベントを待ち受ける

ストリーム内の共通的なイベント生成トリガーは、他のストリーム内のイベント発生です。したがって、他のストリームを監視することでイベント待ち受けができます。キーワード"from"に続け、ストリーム名を指定し、(オプション)イベントを参照するための変数名を文の後ろに続けます。

条件

キーワード"where"で条件を追加すれば、入ってくるすべてのイベントに対してイベント生成をせず、指定条件のみに絞ることができます。whereキーワードに続けて真か偽の結果を持つ表現を記述します。and や or で複数の表現を記述することもできます。

A例として文を作ってみましょう。ストリームを監視し、所定の条件が適用される場合、別のストリームに新しいイベントを生成します。 例として、温度メジャーメントに対し、アラームを生成したいと思います。

  1. アラームを生成するため、CreateAlarm に対し insert into します。
  2. select 節にイベントに必要なすべてのパラメータを指定します。
  3. MeasurementCreated ストリームにイベントが発生した場合にアラームを生成するため、from に MeasurementCreated ストリームを指定します。
  4. アラームを生成するのを、MeasurementCreated に対する特定の条件に限定するため、where節で指定します。

でき上がった文はこのようになるでしょう。

insert into CreateAlarm
select
  measurementEvent.measurement.time as time,
  measurementEvent.measurement.source.value as source,
  "c8y_TemperatureAlarm" as type,
  "Temperature measurement was created" as text,
  "ACTIVE" as status,
  "CRITICAL" as severity
from MeasurementCreated measurementEvent
where measurementEvent.measurement.type = "c8y_TemperatureMeasurement";

トラブルシューティング

エラーメッセージ

Real-time event processing is currently overloaded and may stop processing your events. Please contact support.

説明

それぞれのテナント向けの CEP キューがいっぱいになっています。これは、たとえば、現在処理可能なイベントがさらに作成されたときに発生する可能性があります。

このケースでは、アラームが発生します。新しく入ってくるイベントを失わないように、一番古いイベントが削除されます。つまり、入ってくる新しいイベントにより、キューの先頭のイベントが削除されます。