例題

1時間ごとのメジャーメント平均値を計算

入力データは次のようなものと想定します:

{
  "c8y_TemperatureMeasurement": {"T": {"value": ..., "unit": "C"}},
  "time": "...",
  "source": {"id":"..."},
  "type": "c8y_TemperatureMeasurement"
}

平均値を作成するには、モジュールに次のパーツが必要です:

例:

using com.apama.aggregates.avg;
using com.apama.aggregates.last;
using com.apama.cumulocity.Measurement;

monitor HourlyAvgMeasurementDeviceContext {

  event AverageByDevice {
    string source;
    float avgValue;
    string unit;
  }

  action onload() {
    // Subscribe to Measurement.SUBSCRIBE_CHANNEL to receive all measurements
    monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);

    from m in all Measurement(type="c8y_TemperatureMeasurement") within (3600.0)
      group by m.source select
        AverageByDevice(m.source,
          avg(m.measurements["c8y_TemperatureMeasurement"]["T"].value),
          last(m.measurements["c8y_TemperatureMeasurement"]["T"].unit)) as avgdata {
            send Measurement("", "c8y_AverageTemperatureMeasurement", avgdata.source, currentTime,
              {"c8y_AverageTemperatureMeasurement":
                {
                  "T": MeasurementValue(avgdata.avgValue, avgdata.unit, new dictionary<string,any>)
                }
              }, new dictionary<string,any>) to Measurement.SEND_CHANNEL;
          }
  }
}

ビットメジャーメントからアラームを作成

デバイスは多くの場合、アラームステータスをレジスタに保持し、アラームの意味を解釈できません。 次の例では、デバイスがレジスタ全体をメジャーメントのバイナリ値として送信することを想定しています。ルールがビットを識別し、それぞれのアラームを生成します。

各ビットのアラームテキスト、タイプ、重大度をマッピングするために3つのディクショナリーを作成し、値を検索するアクションを作成します。-1を使用してデフォルト値を示し、<position>を位置の文字列形式に置き換えます。

dictionary<integer, string> positionToAlarmType := {
	0  : "c8y_HighTemperatureAlarm",
	1  : "c8y_ProcessingAlarm",
	2  : "c8y_DoorOpenAlarm",
	3  : "c8y_SystemFailureAlarm",
	-1 : "c8y_FaultRegister<position>Alaram"
};

dictionary<integer, string> positionToAlarmSeverity := {
	0  : "MAJOR",
	1  : "WARNING",
	2  : "MINOR",
	3  : "CRITICAL",
	-1 : "MAJOR"
};

dictionary<integer, string> positionToAlarmText := {
	0  : "The machine temperature reached a critical status",
	1  : "There was an error trying to process data",
	2  : "Door was opened",
	3  : "There was a critical system failure",
	-1 : "An undefined alarm was reported on position <position> in the binary fault register"
};

action getText(integer bitPosition, dictionary<integer, string> lookup) returns string {
	string template := lookup.getOr(bitPosition, lookup[-1]);
	return template.replaceAll("<position>", bitPosition.toString());
}

バイナリメジャーメントの値を分析するには、文字列値として解釈し、各文字をループします。 getActiveBits()関数がそれを行い、メジャーメントが「1」だったビット位置のリストを返します。 その後、forループを使用してそれを反復処理します:

action getBitPositions(string binaryAsText) returns sequence<integer> {
	sequence<integer> bitsSet := new sequence<integer>;
	integer i := 0;
	while i < binaryAsText.length() {
		string character := binaryAsText.substring(i, i+1);
		if character = "1" {
			bitsSet.append(binaryAsText.length() - i - 1);
		}
		i:=i+1;
	}
	return bitsSet;
}

action onload() {
	// Subscribe to Measurement.SUBSCRIBE_CHANNEL to receive all measurements
	monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
	on all Measurement(type = "c8y_BinaryFaultRegister") as m {
		string faultRegister := m.measurements.getOrDefault("c8y_BinaryFaultRegister").getOrDefault("errors").value.toString();
		integer bitPosition;
		for bitPosition in getBitPositions(faultRegister) {
			Alarm alarm    := new Alarm;
			alarm.type     := getText(bitPosition, positionToAlarmType);
			alarm.severity := getText(bitPosition, positionToAlarmSeverity);
			alarm.text     := getText(bitPosition, positionToAlarmText);
			alarm.source   := m.source;
			alarm.time     := m.time;
			alarm.status   := "ACTIVE";
			send alarm to Alarm.SEND_CHANNEL;
		}
	}
}

以下のようなメジャーメントを作成します。

{
    "c8y_BinaryFaultRegister": {"errors": {"value": 10110}},
    "time": "...",
    "source": {"id": "..."},
    "type": "c8y_BinaryFaultRegister"
}

これは最後のステートメントを3回トリガーします。

したがって、3つのアラームが生成されます。

消費量メジャーメント

何かの現在の充填レベルを測定し、その値を定期的にThings Cloudに送信するセンサーがあると仮定すると、追加の消費値を簡単に生成することができます。2つのメジャーメントの絶対値の差を計算することは有用ですが、メジャーメントが常に同じ間隔で送信されている場合にのみ、明確な見解を得ることができます。そこで、絶対値の差を時間差と関連づけて、1時間あたりの消費量として計算してみます。

ここでは、2つのエントリを保持するストリームを使用し、最初と最後のタイムスタンプと値を選択して、1つのデバイスに対する2つの隣接するメジャーメントの値と時間差を比較します。

using com.apama.aggregates.last;
using com.apama.aggregates.first;
using com.apama.aggregates.count;

monitor FillLevelMeasurements {

	event FillLevel {
		float firstValue;
		float firstTime;
		float lastValue;
		float lastTime;
		string source;
	}

	action calculateConsumption(FillLevel l) returns float {
		if(l.firstTime = l.lastTime) {
			return 0.0;
		} else {
			return ((l.lastValue - l.firstValue) * 3600.0) / (l.lastTime - l.firstTime);
		}
	}

	action onload() {
		// Subscribe to Measurement.SUBSCRIBE_CHANNEL to receive all measurements
		monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
		from m in all Measurement(type = "c8y_WaterTankFillLevel") partition by m.source retain 2 group by m.source having count() = 2
			select FillLevel(first(m.measurements["c8y_WaterTankFillLevel"]["level"].value), first(m.time),
							last(m.measurements["c8y_WaterTankFillLevel"]["level"].value), last(m.time), m.source) as fill {

			Measurement m := new Measurement;
			m.type := "c8y_HourlyWaterConsumption";
			m.time := currentTime;
			m.source := fill.source;
			MeasurementValue mv := new MeasurementValue;
			mv.value := calculateConsumption(fill);
			mv.unit := "l/h";
			m.measurements[m.type] := {"consumption":mv};
			send m to Measurement.SEND_CHANNEL;
		}
	}
}

その他のサンプルアプリケーション

ストリーミング分析アプリケーションのEPLエディターには、Things Cloudオブジェクトのデータを取得したり、アラームを作成するなどの、Apama EPLの使用方法を示すサンプルアプリケーションがいくつか用意されています。これらのサンプルを使用して、独自アプリケーションを構築できます。