1時間ごとのメジャーメント平均値を計算
入力データは次のようなものと想定します:
{
"c8y_TemperatureMeasurement": {"T": {"value": ..., "unit": "C"}},
"time": "...",
"source": {"id":"..."},
"type": "c8y_TemperatureMeasurement"
}
平均値を作成するには、モジュールに次のパーツが必要です:
-
1時間以上の時間ウィンドウをデバイス(source)ごとにグループ化したもの
-
1時間ごとの平均値、source、単位を返す
select
(時間ウィンドウの内容に対して集計を使用する必要があるため、最後の単位を選択します。すべてのメジャーメントが同じ単位であると仮定します)。AverageByDevice
イベント定義はこれらを保持するためのものです。 -
新しいメジャーメントとして作成されたすべての情報
例:
using com.apama.aggregates.avg;
using com.apama.aggregates.last;
using com.apama.cumulocity.Measurement;
monitor HourlyAvgMeasurementDeviceContext {
event AverageByDevice {
string source;
float avgValue;
string unit;
}
action onload() {
// Subscribe to Measurement.SUBSCRIBE_CHANNEL to receive all measurements
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
from m in all Measurement(type="c8y_TemperatureMeasurement") within (3600.0)
group by m.source select
AverageByDevice(m.source,
avg(m.measurements["c8y_TemperatureMeasurement"]["T"].value),
last(m.measurements["c8y_TemperatureMeasurement"]["T"].unit)) as avgdata {
send Measurement("", "c8y_AverageTemperatureMeasurement", avgdata.source, currentTime,
{"c8y_AverageTemperatureMeasurement":
{
"T": MeasurementValue(avgdata.avgValue, avgdata.unit, new dictionary<string,any>)
}
}, new dictionary<string,any>) to Measurement.SEND_CHANNEL;
}
}
}
ビットメジャーメントからアラームを作成
デバイスは多くの場合、アラームステータスをレジスタに保持し、アラームの意味を解釈できません。 次の例では、デバイスがレジスタ全体をメジャーメントのバイナリ値として送信することを想定しています。ルールがビットを識別し、それぞれのアラームを生成します。
各ビットのアラームテキスト、タイプ、重大度をマッピングするために3つのディクショナリーを作成し、値を検索するアクションを作成します。-1を使用してデフォルト値を示し、<position>を位置の文字列形式に置き換えます。
dictionary<integer, string> positionToAlarmType := {
0 : "c8y_HighTemperatureAlarm",
1 : "c8y_ProcessingAlarm",
2 : "c8y_DoorOpenAlarm",
3 : "c8y_SystemFailureAlarm",
-1 : "c8y_FaultRegister<position>Alaram"
};
dictionary<integer, string> positionToAlarmSeverity := {
0 : "MAJOR",
1 : "WARNING",
2 : "MINOR",
3 : "CRITICAL",
-1 : "MAJOR"
};
dictionary<integer, string> positionToAlarmText := {
0 : "The machine temperature reached a critical status",
1 : "There was an error trying to process data",
2 : "Door was opened",
3 : "There was a critical system failure",
-1 : "An undefined alarm was reported on position <position> in the binary fault register"
};
action getText(integer bitPosition, dictionary<integer, string> lookup) returns string {
string template := lookup.getOr(bitPosition, lookup[-1]);
return template.replaceAll("<position>", bitPosition.toString());
}
バイナリメジャーメントの値を分析するには、文字列値として解釈し、各文字をループします。 getActiveBits()
関数がそれを行い、メジャーメントが「1」だったビット位置のリストを返します。 その後、for
ループを使用してそれを反復処理します:
action getBitPositions(string binaryAsText) returns sequence<integer> {
sequence<integer> bitsSet := new sequence<integer>;
integer i := 0;
while i < binaryAsText.length() {
string character := binaryAsText.substring(i, i+1);
if character = "1" {
bitsSet.append(binaryAsText.length() - i - 1);
}
i:=i+1;
}
return bitsSet;
}
action onload() {
// Subscribe to Measurement.SUBSCRIBE_CHANNEL to receive all measurements
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
on all Measurement(type = "c8y_BinaryFaultRegister") as m {
string faultRegister := m.measurements.getOrDefault("c8y_BinaryFaultRegister").getOrDefault("errors").value.toString();
integer bitPosition;
for bitPosition in getBitPositions(faultRegister) {
Alarm alarm := new Alarm;
alarm.type := getText(bitPosition, positionToAlarmType);
alarm.severity := getText(bitPosition, positionToAlarmSeverity);
alarm.text := getText(bitPosition, positionToAlarmText);
alarm.source := m.source;
alarm.time := m.time;
alarm.status := "ACTIVE";
send alarm to Alarm.SEND_CHANNEL;
}
}
}
以下のようなメジャーメントを作成します。
{
"c8y_BinaryFaultRegister": {"errors": {"value": 10110}},
"time": "...",
"source": {"id": "..."},
"type": "c8y_BinaryFaultRegister"
}
これは最後のステートメントを3回トリガーします。
- ビット位置1のメジャーメント - c8y_ProcessingAlarm, WARNING, “There was an error trying to process data”
- ビット位置2のメジャーメント - c8y_DoorOpenAlarm, MINOR, “Door was opened”
- ビット位置4のメジャーメント - c8y_FaultRegister4Alarm, MAJOR, “An undefined alarm was reported on position 4 in the binary fault register”
したがって、3つのアラームが生成されます。
消費量メジャーメント
何かの現在の充填レベルを測定し、その値を定期的にThings Cloudに送信するセンサーがあると仮定すると、追加の消費値を簡単に生成することができます。2つのメジャーメントの絶対値の差を計算することは有用ですが、メジャーメントが常に同じ間隔で送信されている場合にのみ、明確な見解を得ることができます。そこで、絶対値の差を時間差と関連づけて、1時間あたりの消費量として計算してみます。
ここでは、2つのエントリを保持するストリームを使用し、最初と最後のタイムスタンプと値を選択して、1つのデバイスに対する2つの隣接するメジャーメントの値と時間差を比較します。
using com.apama.aggregates.last;
using com.apama.aggregates.first;
using com.apama.aggregates.count;
monitor FillLevelMeasurements {
event FillLevel {
float firstValue;
float firstTime;
float lastValue;
float lastTime;
string source;
}
action calculateConsumption(FillLevel l) returns float {
if(l.firstTime = l.lastTime) {
return 0.0;
} else {
return ((l.lastValue - l.firstValue) * 3600.0) / (l.lastTime - l.firstTime);
}
}
action onload() {
// Subscribe to Measurement.SUBSCRIBE_CHANNEL to receive all measurements
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
from m in all Measurement(type = "c8y_WaterTankFillLevel") partition by m.source retain 2 group by m.source having count() = 2
select FillLevel(first(m.measurements["c8y_WaterTankFillLevel"]["level"].value), first(m.time),
last(m.measurements["c8y_WaterTankFillLevel"]["level"].value), last(m.time), m.source) as fill {
Measurement m := new Measurement;
m.type := "c8y_HourlyWaterConsumption";
m.time := currentTime;
m.source := fill.source;
MeasurementValue mv := new MeasurementValue;
mv.value := calculateConsumption(fill);
mv.unit := "l/h";
m.measurements[m.type] := {"consumption":mv};
send m to Measurement.SEND_CHANNEL;
}
}
}
その他のサンプルアプリケーション
ストリーミング分析アプリケーションのEPLエディターには、Things Cloudオブジェクトのデータを取得したり、アラームを作成するなどの、Apama EPLの使用方法を示すサンプルアプリケーションがいくつか用意されています。これらのサンプルを使用して、独自アプリケーションを構築できます。