例
重要事項: イベント処理機能(Esper)の新規利用は終了し、ご利用中のお客様におかれましてはApama CEPエンジンへの切替完了後、順次サポートを終了いたします。Apama によるカスタムストリーミング処理機能の詳細は カスタムストリーミング処理ガイド をご覧ください。
本章の記載内容は新規利用終了済みのイベント処理機能(Esper)に関する記述になりますのでご注意ください。
重要事項: イベント処理機能(Esper)の新規利用は終了し、ご利用中のお客様におかれましてはApama CEPエンジンへの切替完了後、順次サポートを終了いたします。Apama によるカスタムストリーミング処理機能の詳細は カスタムストリーミング処理ガイド をご覧ください。
本章の記載内容は新規利用終了済みのイベント処理機能(Esper)に関する記述になりますのでご注意ください。
入力データが以下のようであったとします。
{
"c8y_TemperatureMeasurement": {
"T": {
"value": ...,
"unit": "C"
}
},
"time":"...",
"source": {
"id":"..."
},
"type": "c8y_TemperatureMeasurement"
}
平均を生成するには、モジュールに次のステップが必要です。
モジュール:
create context HourlyAvgMeasurementDeviceContext
partition measurement.source.value from MeasurementCreated;
@Name("Creating_hourly_measurement")
context HourlyAvgMeasurementDeviceContext
insert into CreateMeasurement
select
m.measurement.source as source,
current_timestamp().toDate() as time,
"c8y_AverageTemperatureMeasurement" as type,
{
"c8y_AverageTemperatureMeasurement.T.value", avg(cast(getNumber(m, "c8y_TemperatureMeasurement.T.value"), double)),
"c8y_AverageTemperatureMeasurement.T.unit", getString(m, "c8y_TemperatureMeasurement.T.unit")
} as fragments
from MeasurementCreated.win:time(1 hours) m
where getObject(m, "c8y_TemperatureMeasurement") is not null
output last every 1 hours;
デバイスでオペレーションが処理される場合、通常決まったシーケンスに従います。
所定時間内にオペレーションが SUCCESSFUL や FAILED にならない場合、通常問題があることを示します(例えばデバイスとの接続が切れていたり、実行中に固まったり)。オペレーションが正常に処理されなかった場合でも、デバイスはオペレーションを FAILED に更新すべきです。
この例では10分を処理の許容実行時間としています。 次のシークエンスを確認します:
2つ目が起きない場合、新しいアラーム生成を行います。
@Name("handle_not_finished_operation")
insert into CreateAlarm
select
o.operation.deviceId as source,
CumulocitySeverities.MAJOR as severity,
CumulocityAlarmStatuses.ACTIVE as status,
"c8y_OperationNotFinishedAlarm" as type,
current_timestamp().toDate() as time,
replaceAllPlaceholders("The device has not finished the operation #{id} within 10 minutes", o.operation) as text
from pattern [
every o = OperationCreated
-> (timer:interval(10 minutes)
and not OperationUpdated(
operation.id.value = o.operation.id.value
and (operation.status in (OperationStatus.SUCCESSFUL, OperationStatus.FAILED))
))
];
デバイスはしばしばレジスタでアラームのステータスを保持しており、アラーム自体の意味を解析できません。この例では、デバイスがメジャーメントとして全レジスタをバイナリ値で送信することを想定しています。ルールがビット値を識別し、それぞれに対応したアラームを生成しなければなりません。
バイナリ値によってアラームテキスト、型、重大度の3つの表現を生成します。
create expression String getFaultRegisterAlarmType(position) [
switch (position) {
case 0:
"c8y_HighTemperatureAlarm";
break;
case 1:
"c8y_ProcessingAlarm";
break;
case 2:
"c8y_DoorOpenAlarm";
break;
case 3:
"c8y_SystemFailureAlarm";
break;
default:
"c8y_FaultRegister" + position + "Alarm";
break;
};
];
create expression CumulocitySeverities getFaultRegisterAlarmSeverity(position) [
importClass(com.cumulocity.model.event.CumulocitySeverities);
switch (position) {
case 0:
CumulocitySeverities.MAJOR;
break;
case 1:
CumulocitySeverities.WARNING;
break;
case 2:
CumulocitySeverities.MINOR;
break;
case 3:
CumulocitySeverities.CRITICAL;
break;
default:
CumulocitySeverities.MAJOR;
break;
};
];
create expression String getFaultRegisterAlarmText(position)[
switch(position) {
case 0:
"The machine temperature reached a critical status";
break;
case 1:
"There was an error trying to process data";
break;
case 2:
"Door was opened";
break;
case 3:
"There was a critical system failure";
break;
default:
"An undefined alarm was reported on position " || position || " in the binary fault register";
break;
};
];
ビット値を持つメジャーメントを解析するには、文字列化して文字ごとに操作します。
getActiveBits() 関数はその処理を行い、メジャーメントのビットが “1” であるビット位置のリストを返却します。
処理結果は List
create schema BitPosition(
position int
);
create schema MeasurementWithBinaryFaultRegister(
measurement Measurement,
faultRegister String
);
create expression Collection getActiveBits(value) [
importPackage(java.util);
var bitOnNumbers = new ArrayList();
var size = value.length;
for(var no = 0; no < size; no++) {
if(value.charAt(no) == "1") {
bitOnNumbers.add(Collections.singletonMap('position', size - no - 1));
}
}
bitOnNumbers;
];
@Name("extract_fault_register")
insert into MeasurementWithBinaryFaultRegister
select
m.measurement as measurement,
getString(m, "c8y_BinaryFaultRegister.errors.value") as faultRegister
from MeasurementCreated m
where getObject(m, "c8y_BinaryFaultRegister") is not null;
@Name("creating_alarm")
insert into CreateAlarm
select
m.measurement.source as source,
getFaultRegisterAlarmSeverity(bit.position) as severity,
CumulocityAlarmStatuses.ACTIVE as status,
m.measurement.time as time,
getFaultRegisterAlarmType(bit.position) as type,
getFaultRegisterAlarmText(bit.position) as text
from
MeasurementWithBinaryFaultRegister m unidirectional,
MeasurementWithBinaryFaultRegister[getActiveBits(faultRegister)@type(BitPosition)] as bit;
メジャーメント は以下のように生成します。
{
"c8y_BinaryFaultRegister": {
"errors": {
"value": 10110
}
},
"time":"...",
"source": {
"id":"..."
},
"type": "c8y_BinaryFaultRegister"
}
最後の文は3回トリガーします。
したがって、3回アラームを生成します。
何かしらの現在の水位を測るセンサーがあるとしましょう。このセンサーが Things Cloud へ定期的に値を送信している場合、消費量を簡単に計算することができます。 2つのメジャーメントの差の計算は有用ですが、それはメジャーメントが常に一定の間隔で送られる前提での話です。 時間間隔の変動によらない絶対量として1時間と決め、1時間あたりの消費量を計算することにしましょう。
あるデバイスに対する隣り合った2つのメジャーメントの値と時間の差を比較します(コンテキストの使用が必要となります)。
create schema FillLevelMeasurement(
measurement Measurement,
value double
);
create schema AdjacentFillLevelMeasurements(
firstValue double,
lastValue double,
firstTime Date,
lastTime Date,
source String
);
create context ConsumptionMeasurementDeviceContext
partition measurement.source.value from FillLevelMeasurement;
create expression double calculateConsumption(firstValue, lastValue, firstTime, lastTime) [
if (lastTime == firstTime) {
0;
} else {
((firstValue - lastValue) * 3600000) / (lastTime - firstTime);
}
];
@Name("filter_fill_level_measurements")
insert into FillLevelMeasurement
select
m.measurement as measurement,
cast(getNumber(m, "c8y_WaterTankFillLevel.level.value"), double) as value
from MeasurementCreated m
where getObject(m, "c8y_WaterTankFillLevel") is not null;
@Name("combine_two_latest_measurements")
context ConsumptionMeasurementDeviceContext
insert into AdjacentFillLevelMeasurements
select
first(m.value) as firstValue,
first(m.measurement.time) as firstTime,
last(m.value) as lastValue,
last(m.measurement.time) as lastTime,
context.key1 as source
from FillLevelMeasurement.win:length(2) m;
@Name("create_consumption_measurement")
insert into CreateMeasurement
select
m.lastTime as time,
m.source as source,
"c8y_HourlyWaterConsumption" as type,
{
"c8y_HourlyWaterConsumption.consumption.value", calculateConsumption(m.firstValue, m.lastValue, m.firstTime.toMillisec(), m.lastTime.toMillisec()),
"c8y_HourlyWaterConsumption.consumption.unit", "l/h"
} as fragments
from AdjacentFillLevelMeasurements m;
以下のCEPルール/モジュールは、Things Cloud 内で Zementis 分析モデルを使用する方法を示しています。
入力データが次のようになると想定します:
{
"c8y_SteamMeasurement": {
"Temperature": {
"value": ...,
"unit": "C"
}
},
{
"c8y_TemperatureMeasurement": {
"Pressure": {
"value": ...,
"unit": "bar"
}
},
{
"c8y_TemperatureMeasurement": {
"Steamoutput": {
"value": ...,
"unit": "%"
}
},
"time":"...",
"source": {
"id":"..."
},
"type": "c8y_TemperatureMeasurement"
}
まず、予測モデルが作成され、Zementisコンソールを介してアップロードされます。 モデルが https://myadapa.zementis.com:443/adapars/apply/model_name エンドポイントでデータスコアリングに使用できるようになったと仮定します。
CEP モジュール:
create constant variable string model_name = "model_name";
create constant variable string model_url = "https://myadapa.zementis.com:443/adapars/apply/";
create constant variable string auth = "Basic ...";
create constant variable string source_device = "12345";
create expression string js:getLabel(stringObj)[
var zemOutputs = JSON.parse(stringObj).outputs;
output = zemOutputs.pop().Predicted_label;
];
@Name("inputData")
insert into inputDataAll
select
m.source as source,
getNumber(m, "c8y_SteamMeasurement.Temperature.value") as `steam.temperature`,
getNumber(m, "c8y_SteamMeasurement.Pressure.value") as `steam.pressure`,
getNumber(m, "c8y_SteamMeasurement.Steamoutput.value") as `steam.steamoutput`
from MeasurementCreated m
where
measurement.source.value = source_device;
@Name("requestZementis")
insert into SendRequest
select
"GET" as method,
model_url || model_name || "?record=" || toJSON(m.*) as url,
auth as authorization,
"application/json" as contentType,
m.source as source
from inputDataAll m;
@Name("responseZementis")
insert into CreateEvent
select
"response_received_" || getString(response, "status") as type,
getLabel(response.body) as text,
response.creationTime as dateTime,
getString(response, "source.value") as source
from ResponseReceived response
where
getString(response, "source.value") = source_device;
@Name("generateAlarm")
insert into CreateAlarm
select
response.creationTime as dateTime,
getString(response, "source.value") as source,
"cepFailureAlarm" as type,
"Zementis Test Failure" as text,
"ACTIVE" as status,
"MAJOR" as severity
from ResponseReceived response
where
getString(response, "source.value") = source_device
and getLabel(response.body) = "0";
Things Cloud CEPモジュールは次のように機能します: