高度なユースケース
重要事項: イベント処理機能(Esper)の新規利用は終了し、ご利用中のお客様におかれましてはApama CEPエンジンへの切替完了後、順次サポートを終了いたします。Apama によるカスタムストリーミング処理機能の詳細は カスタムストリーミング処理ガイド をご覧ください。
本章の記載内容は新規利用終了済みのイベント処理機能(Esper)に関する記述になりますのでご注意ください。
重要事項: イベント処理機能(Esper)の新規利用は終了し、ご利用中のお客様におかれましてはApama CEPエンジンへの切替完了後、順次サポートを終了いたします。Apama によるカスタムストリーミング処理機能の詳細は カスタムストリーミング処理ガイド をご覧ください。
本章の記載内容は新規利用終了済みのイベント処理機能(Esper)に関する記述になりますのでご注意ください。
Things Cloud APIでは、自由にデータ構造を規定できます。イベント言語でも同様です。それぞれの出力ストリームには自由にフラグメントを追加することができます。キー、値ペアのリストで、fragmentsフィールドを設定することで、ストリームにフラグメントを追加することができます。キーは値に対するフルJSON Pathです。
{
key1, value1,
key2, value2,
key3, value3
} as fragments
例1:
insert into CreateMeasurement
select
"12345" as source,
"c8y_TemperatureMeasurement" as type,
current_timestamp().toDate() as time,
{
"c8y_TemperatureMeasurement.T1.value", 1,
"c8y_TemperatureMeasurement.T1.unit", "C",
"c8y_TemperatureMeasurement.T2.value", 2,
"c8y_TemperatureMeasurement.T2.unit", "C",
"c8y_TemperatureMeasurement.T3.value", 3,
"c8y_TemperatureMeasurement.T3.unit", "C",
"c8y_TemperatureMeasurement.T4.value", 4,
"c8y_TemperatureMeasurement.T4.unit", "C",
"c8y_TemperatureMeasurement.T5.value", 5,
"c8y_TemperatureMeasurement.T5.unit", "C"
} as fragments
from EventCreated;
この結果は、次のJSON形式になります:
{
"type": "c8y_TemperatureMeasurement",
"time": "...",
"source": {
"id": "12345"
},
"c8y_TemperatureMeasurement": {
"T1": {
"value": 1,
"unit": "C"
},
"T2": {
"value": 1,
"unit": "C"
},
"T3": {
"value": 1,
"unit": "C"
},
"T4": {
"value": 1,
"unit": "C"
},
"T5": {
"value": 1,
"unit": "C"
},
}
}
例2:
insert into CreateManagedObject
select
"MyCustomDevice" as name,
"customDevice" as type,
{
"c8y_IsDevice", {},
"c8y_SupportedOperations", {"c8y_Restart", "c8y_Command"},
"c8y_Hardware.serialNumber", "mySerialNumber",
"c8y_Hardware.model", "myDeviceModel",
"com_cumulocity_model_Agent", {},
"c8y_RequiredAvailability.responseInterval", 30
} as fragments
from EventCreated e;
この結果は、次のJSON形式になります:
{
"name": "MyCustomDevice",
"type": "customDevice",
"c8y_IsDevice": {},
"c8y_RequiredAvailability": {
"responseInterval": 30
},
"c8y_SupportedOperations": [
"c8y_Restart",
"c8y_Command"
],
"com_cumulocity_model_Agent": {},
"c8y_Hardware": {
"model": "myDeviceModel",
"serialNumber": "mySerialNumber"
}
}
いくつかのストリームにおいて、イベント到着によりトリガーさせる記述の仕方は一通りではありません。続く章ではトリガーの他の記述法、複合トリガーについて記載しています。
Pattern によって他のトリガーと連結、複合させることができるようになります。次のようなトリガーがあったとします。
from EventCreated e;
機能は、 pattern を使用した以下のトリガーと同等になります。
from pattern [every e=EventCreated];
pattern にフィルターを追加することもできます。
from pattern [every e=EventCreated(event.type = "myEventType")];
ストリームを連結してトリガーをつくることができます。
from EventCreated e unidirectional, AlarmCreated.std:lastevent() a
where e.event.source = a.alarm.source;
これはすべての EventCreated の時点で、同一デバイスから AlarmCreated が通知されていた場合、トリガーされます。
注記: そのデバイスの最後の AlarmCreated ではなく、デバイス全体の最後の AlarmCreated が同一デバイスからのものであった場合トリガーとなります。
シーケンスをトリガーとすることもできます。
from pattern[every (e=EventCreated -> a=AlarmCreated(alarm.source = e.event.source))];
これは EventCreated に続き、AlarmCreated が発生した場合にトリガーとなります。EventCreated が到着したときからスタートし、同一デバイスから AlarmCreated があった時点でトリガーとなります。その後、次の EventCreated を待つことになります。
ストリームを使ってトリガーする他に、タイマーでトリガーすることもできます。一定間隔でトリガーをかけられます。
from pattern [every timer:interval(5 minutes)];
または cron ジョブのように実行することもできます。
// timer:at(minutes, hours, daysOfMonth, month, daysOfWeek, (optional) seconds)
// minutes: 0-59
// hours: 0-23
// daysOfMonth: 1-31
// month: 1-12
// daysOfWeek: 0 (Sunday) - 6 (Saturday)
// seconds: 0-59
from pattern [every timer:at(*, *, *, *, *)]; // 毎分トリガー
from pattern [every timer:at(*, *, *, *, *, *)]; // 毎秒トリガー
from pattern [every timer:at(*/10, *, *, *)]; // 10分ごとにトリガー
from pattern [every timer:at(0, 1, *, *, [1,3,5])]; // 毎週月水金の夜中1時にトリガー
from pattern [every timer:at(0, */2, 1:7, *, *)]; // 毎月1日から7日まで、2時間ごとにトリガー
タイマーパターンを別のパターンと組み合わせることもできます。例えば、別のイベントから所定時間内にイベントが発生したか確かめられます。
from pattern [every e=EventCreated -> (timer:interval(10 minutes) and not a=AlarmCreated)];
これは、EventCreated が発生してから10分以内に AlarmCreated が発生しなかった場合にトリガーします。
output を利用すれば、ストリーム上のすべてのイベントを考慮せずに、ステートメントが結果を出力するタイミングを直接制御することができます。例えば10秒ごとにメジャーメントをとり、それに対して計算したいような場合などは、すべてのメジャーメントを計算する必要はなく、そのサブセットのみを計算すれば事足ります。
// 1分ごとに到着した最後のメジャーメントを出力します
from MeasurementCreated e
where e.measurement.type = "c8y_TemperatureMeasurement"
output last every 1 minutes;
// 到着する20のメジャーメントごとに最初のメジャーメントを出力します
from MeasurementCreated e
where e.measurement.type = "c8y_TemperatureMeasurement"
output first every 20 events;
// 20個めのメジャーメントが到着した後、20のメジャーメントすべてを出力します
from MeasurementCreated e
where e.measurement.type = "c8y_TemperatureMeasurement"
output every 20 events;
メジャーメントの総和を計算したいなど、すべてのメジャーメントを考慮する必要があるが、逐一メジャーメントが発生するごとに計算したくない場合。
select
sum(getNumber(e, "myCustomMeasurement.mySeries.value")),
last(*)
from MeasurementCreated e
where e.measurement.type = "myCustomMeasurement"
output last every 50 events;
この文は50のメジャーメントごとに、(50個分ではなく、デプロイ後すべてのメジャーメントの)総和と最後のメジャーメントを出力します。
さらに複雑な条件に対応するために、イベントウィンドウを使用してストリームにある複数のイベントを束にすることができます。ウィンドウを生成するには主に2つの方法があります。
1. 時間間隔に対するウィンドウ
select
avg(getNumber(e, "myCustomMeasurement.mySeries.value")),
last(*)
from MeasurementCreated.win:time(1 hours) e
where e.measurement.type = "myCustomMeasurement";
select
avg(getNumber(e, "myCustomMeasurement.mySeries.value")),
last(*)
from MeasurementCreated.win:time(1 hours) e
where e.measurement.type = "myCustomMeasurement"
output last every 1 hours;
2つの文の違いは、1つ目は MeasurementCreated のたびにトリガーし、1時間分の平均を出力します。2つ目の文は1時間ごとにしかトリガーせず、最後の平均(最後の MeasurementCreated を受け取ったときに算出)しか出力しません。
2. イベント数に対するウィンドウ
select
avg(getNumber(e, "myCustomMeasurement.mySeries.value")),
last(*)
from MeasurementCreated.win:length(100) e
where e.measurement.type = "myCustomMeasurement";
select
avg(getNumber(e, "myCustomMeasurement.mySeries.value")),
last(*)
from MeasurementCreated.win:length(100) e
where e.measurement.type = "myCustomMeasurement"
output last every 100 events;
ウィンドウは、グローバルに宣言することができます:
create window MeasurementCreated.win:length(20) as MyMeasurementWindow;
select
avg(getNumber(e, "myCustomMeasurement.mySeries.value")),
last(*)
from MyMeasurementWindow e
where e.measurement.type = "myCustomMeasurement";
ウィンドウ宣言によってウィンドウをクリアすることもできます。
on AlarmCreated delete from MyMeasurementWindow
複雑なモジュールを1文で作成するのは困難です。Things Cloud では定義されたストリームをいくつか提供しておりますが、その他にも、イベントフローを制御するために独自のストリームを生成することができます。ストリームの宣言は必須ではありません。新規のストリーム名を使えば、自動的に指定された入力値で生成、定義されます。
insert into MyEvent
select
e.event as e
from EventCreated e;
select e.type from MyEvent e;
ここで次の文を追加すると:
insert into MyEvent
select
e as e
from AlarmCreated e;
MyEvent がすでにEvent型の変数 e で宣言済みのため、この文はデプロイすことができません。この文は、 AlarmCreated 型の値を e に設定しようとします。
明示的に新しいストリームを宣言することもできます。
create schema MyEvent(
e Event
);
一般的な構文は:
create schema StreamName(
var1Name var1Type,
var2Name var2Type,
var3Name var3Type
);
Javaプリミティブ型とインポートされたJavaライブラリ 、Things Cloud のデータ型(Event, Measurement, ManagedObject のような)や他のストリームを利用することができます。
create schema TwoMyEvents(
firstEvent MyEvent,
secondEvent MyEvent
);
注記: ストリーム名はユニークであり、一度宣言したストリームは(明示的か暗黙的かによらず)すべてのモジュールで利用可能です
総和や平均のようなものよりも複雑な関数を作りたい場合、独自の便利関数や表現(expression)を作成することができます。関数を記述するにあたり、記述言語として JavaScript を利用できます。importClass を使えば、あなたの表現(expression)に Java クラスを追加することもできます。
例:
与えられた重大度を上げる。(JavaScript利用)
create expression CumulocitySeverities js:increaseSeverity(severity) [
importClass (com.cumulocity.model.event.CumulocitySeverities);
if(severity == CumulocitySeverities.WARNING) {
CumulocitySeverities.MINOR;
} else if(severity == CumulocitySeverities.MINOR) {
CumulocitySeverities.MAJOR;
} else if(severity == CumulocitySeverities.MAJOR) {
CumulocitySeverities.CRITICAL;
} else {
severity
}
];
2つの地理座標の間の距離を計算します。
create expression distance(lat1, lon1, lat2, lon2) [
var R = 6371000;
var toRad = function(arg) {
return arg * Math.PI / 180;
};
var lat1Rad = toRad(lat1);
var lat2Rad = toRad(lat2);
var deltaLatRad = toRad(lat2-lat1);
var deltaLonRad = toRad(lon2-lon1);
var a = Math.sin(deltaLatRad/2) * Math.sin(deltaLatRad/2) +
Math.cos(lat1Rad) * Math.cos(lat2Rad) * Math.sin(deltaLonRad/2) * Math.sin(deltaLonRad/2);
var c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1-a));
var d = R * c;
d;
];
モジュール内で変数を定義できます。
create variable String myEmailText = "Hello World";
create variable List supportedOperationsList = cast({"c8y_Restart", "c8y_Relay"}, java.util.List);
実行中に動的に値が変化する変数も定義できます。
create variable String latestEventType;
on EventCreated e set latestEventType = e.event.type;
コンテキストによって、定義された値ごとにイベント処理を分けることができます。
もし何らかのメジャーメントについての計算を定義したい場合、通常そのメジャーメントをもつすべてのデバイスに対して実行し、さらにデバイスごとに区別したいでしょう。
ここに例をあげます。
select
avg(getNumber(e, "myCustomMeasurement.mySeries.value")),
last(*)
from MeasurementCreated.win:length(100) e
where e.measurement.type = "myCustomMeasurement";
これは1つのデバイスに対しては完璧に機能します。しかしながら、2つのデバイスになった瞬間、平均計算は両方のデバイスにわたって計算されます。なぜなら、すべてのメジャーメントは MeasurementCreated に発生するからです。この文は、デバイスごとにメジャーメントを区別することを考慮していません。
コンテキストを作成すれば、入力イベントを分離するための情報をどこで見つけることができるか文に伝えることができます。
create context DeviceAwareContext
partition by measurement.source.value from MeasurementCreated;
このコンテキスト定義では、MeasurementCreated ストリーム内でコンテキストキー(イベント分離のキー)として measurement.source.value (デバイスのID)を指定するということを宣言しています。
作成したコンテキストを文に追加します:
context DeviceAwareContext
select
avg(getNumber(e, "myCustomMeasurement.mySeries.value")),
last(*)
from MeasurementCreated.win:length(100) e
where e.measurement.type = "myCustomMeasurement";
平均値は、各デバイスごとに計算されるようになりました。
コンテキストは、コンテキストで宣言された入力をもつ文に対してのみ適用されます。
複数の文でコンテキストを使用し、さらに入力が異なる場合は、コンテキスト内の各入力と、コンテキストキーの場所を設定する必要があります。
create context DeviceAwareContext
partition by
measurement.source.value from MeasurementCreated,
alarm.source.value from AlarmCreated,
event.source.value from EventCreated,
operation.deviceId.value from OperationCreated;
複数の値のコンテキストキーを作成することもできます:
create context DeviceAwareContext
partition by measurement.source.value and measurement.type from MeasurementCreated;
このコンテキストは各デバイスのIDのみでなく、メジャーメントのtypeも含めた値の組ごとに分離します。
デフォルトでは、イベント言語はオンメモリーで処理されます。多くの場合、長期間にわたって状態を保存すべき文は多くないため、これで十分です。もし、長期間保存すべき文を使う場合(例えば、長い期間のイベントウィンドウを使う場合)、次のようなアノテーションを入れる必要があります:
@Resilient
insert into ...
「@Resilient」を文の前につけることで、状態が定期的に保存されるようになります。サーバーメンテナンスや停電の際、状態を復旧でき、何も起こらなかったかのように文を継続することができます。