Apama EPL をはじめる
投稿日 / 投稿者名
2021.10.20 / アプリT
2021.10.20 / アプリT
本レポートは、Things Cloud の利用例をより知っていただくための実利用レポートとして作成したものです。 Things Cloud は極力後方互換性を持つよう開発されていますが、今後のリリースにより、一部画面やコマンド、手順などが変更となったり、 利用できない可能性があることをあらかじめご了承ください。 なお、作成にあたり、以下バージョンを用いています。
Things Cloud のカスタムストリーミング処理機能(CEP/EPL)について記載しているカスタムストリーミング処理機能ガイドもあわせてご覧ください。
また、本レポートでは、Software AG社 による Apamaの公式ドキュメントに多数リンクしていますので、ご活用ください。他のリンクと区別するため、Apamaドキュメントへのリンクは英文で斜体文字としています。
Apamaは、複合イベント処理(CEP: complex event processing) と呼ばれる技術を使って、イベントを処理するプラットフォームです。
イベントストリーム内をIoTデバイスから報告される計測値や状態など時々刻々と流れている「イベント」と呼ばれるデータ要素を監視し、記述されるビジネスロジックに従い、イベントドリブンでイベントのパターンを検出しリアルタイムに処理を行います。
上の図のようにストリーム内を時間軸でイベント(データ)が流れているイメージで、しきい値を超える温度イベントを3回以上受信するという条件に基づいてイベントを監視し、条件を満たすイベントを検出したらアラームを生成させるというようなイベント処理をリアルタイムに行います。
このように一致するイベントを監視し、検出したことを契機にどのような処理を実行するかをApamaのコリレーターに指示します。
Apamaは、複合イベント処理(CEP)、カスタムストリーミング処理機能を行うインメモリのシステムで、Apamaアプリケーションで定義されたイベントパターンを検出し処理を実行するのが、Apamaのコリレーターというランタイムのサーバープロセスになります。
Things Cloudのスマートルールや、カスタムストリーミング処理機能アプリケーションで作成したAnalytics Builderでのイベント処理モデル、EPL AppsでのEPLアプリは、 このコリレーターにデプロイされ実行されます。Things Cloudでは、これらはApama-ctrlマイクロサービスによって実行されます。Apama-ctrlマイクロサービスはテナントごとに登録され、独立した領域を持つインスタンス内で実行されます。マイクロサービスのランタイムとアプリケーションをあわせてご覧ください。
2021.10.20 / 三橋
1時間
Apama EPL Appsは、Apama のコリレーターで動作するEPLコードを GUI のエディター上で開発できるアプリケーションです。
Apamaのサーバープロセスであるコリレーターがストリーム内を時々刻々と流れているイベントをリアルタイムに受け入れて解釈するには、Apamaのイベント処理言語であるEPL(Event Processing Language)でプログラミングする必要があります。EPLはイベントを処理するプログラムを作成するために設計されたドメイン固有の言語で、波括弧{ }
によるブロック構造で強い型付けがあります。EPLコードは、イベントの到着に応じイベントドリブンで記述します。
Apama EPL Appsの中でサンプルコードを表示、活用できるようになっていたり、エディターにはコード補完やシンタックスハイライトが搭載されている他、保存(ctrl + s)などのショートカットキーも利用できます。
Apama EPL Appsアプリケーションから作成するEPLアプリがモニターファイル(*.mon ファイル)であり、アプリケーションの実行単位です。
EPLアプリの作成については、Apama EPL Appsを使用したアプリケーションの開発 の ステップ 2 - EPLアプリの作成をご覧ください。
コリレーターにEPLアプリがデプロイされると、コリレーターはモニターのインスタンスを生成し、モニターがロードされた直後にonload()
アクションを実行します。そのため、onloadアクションは必ず定義する必要があります。
モニターでは、どのようなイベントを監視し、指定したイベントが見つかった場合にどのような処理を実施するかをonloadアクションに定義します。
monitor MonitorName {
action onload() { // 必須
// do something
}
}
モニターアクションは、モニターのライフサイクルで自動的に呼び出されます。 - Monitor lifecycle
モニターの終了時は、コリレーターはモニターのインスタンスごとにondie()
アクションを呼び出し、最後のモニターインスタンスが終了したときonunload()
アクションを呼び出します。
Monitor actions > Simple actions
onload()
アクションは必ず指定し、コリレーターはモニターのインスタンスを生成し、モニターがロードされた直後に実行します。ondie()
アクションを定義している場合、コリレーターはモニターインスタンスが終了すると実行します。onunload()
アクションを定義している場合、コリレーターはモニターをアンロードする直前に実行します。モニターインスタンスはすべてのコードを実行しアクティブなリスナー(イベント監視)がなくなったとき速やかにモニターを終了し、メモリから削除されます。- Terminating monitors
複製したモニターインスタンスはdie
ステートメントで終了できます。
イベントは、センサーの状態が変化したなど、なんらかのイベントが発生したことの通知を含むデータのオブジェクトです。コリレーターがイベントを理解するために基本構造に従う必要があります。各種イベントには、イベント型名と、それに関連付けられたイベントフィールドと呼ばれるデータ要素があります。
event EventTypeName {
[ field_type fieldName; ] // フィールド
[ constant field_type fieldName := literal_value; ] // 定数フィールド(定数値を設定し変更できない)
[ wildcard field_type fieldName; ] // ワイルドカードフィールド(イベントフィルターに指定できない)
:
[ eventActionDefinition ] // イベントアクション
}
イベントフィールドのデータ型(Types)は、強い型付けがあり、プリミティブデータ型 string
、 integer
、 float
、 decimal
、 boolean
をサポートしています。このほか、イベントアクションや独自定義したイベント型、参照型(sequence
、dictionary
など)も指定できます。(chunk、 stream、 および listener のデータ型を使用することはできません。)
インスタンス生成時などにフィールドの順序が重要となります。
イベント型を定義する上でのガイドライン
// 例)
event WindMeasurement { // イベント名: UpperCamelCase - 単語の最初の文字は大文字
string location; // イベントフィールド名: lowerCamelCase - 最初の単語の文字は小文字、続く単語の最初の文字は大文字
float windSpeed;
integer windHeading;
}
インスタンスの生成
以下は、上記のイベント型(例)のインスタンスを生成する例で両者とも同じ値が設定されます。
// 例)
// インスタンスを生成し、別途値を設定
WindMeasurement windMeasurement := new WindMeasurement;
windMeasurement.location := "Site 1";
windMeasurement.windSpeed := 11.5;
windMeasurement.windHeading := 3;
// 例)
// インスタンス生成時に値を設定
WindMeasurement windMeasurement := WindMeasurement("Site 1", 11.5, 3);
com.apama.cumulocityパッケージから利用することができます。イベントとチャネル > 定義済みのイベント型 をご覧ください。
フィールドデータに関連するイベントアクションをイベント型に定義できます。- Specifying actions in event definitions
// 例)
event Temperature {
float celsius;
// イベントフィールドのcelsius(摂氏)からfahrenheit(華氏)を算出
action calcFahrenheit() returns float {
return celsius * 9.0/5.0 + 32.0;
}
}
:
Temperature temp := Temperature(100.0);
// 温度イベントの一部としてfahrenheit(華氏)を算出するアクションを呼び出し
log temp.calcFahrenheit().toString(); // 212
:
self
パラメーターがあります。
action<float> returns float
ではなく、 action<Temperature, float> returns float
になります。datatype variableName [:= initial_value]:
// 例)
anInteger.toString()
anInteger.toFloat()
aBoolean.toString() // "true" または "false" を返却
aFloat.toString()
aFloat.floor() // aFloatの数以下の最大の整数を返却 (< aFloat)
aFloat.ceil() // aFloatの数以上の最小の整数を返却 (> aFloat)
aString.toInteger() // 変換できない場合は 0 を返却
aString.toFloat() // 変換できない場合は 0.0 を返却
coassignment
を使って、一致するイベントを変数に代入します。
以下は、変数 m に一致するイベント(Measurement)を代入する例です。
// 例)
// コードブロック内で利用する変数
on all Measurement(type="c8y_Temperature") as m { code_block }
// 例)
// 変数宣言した後に代入するため、コードブロック外でも利用可能な変数
Measurement m;
on all Measurement(type="c8y_Temperature") : m { code_block }
いくつかの参照型(Reference types)より下記のオブジェクト(型)をご紹介します。
sequence<item_type> variableName
// 例)
sequence<float> speeds := [12.30, 11.95, 12.50, 11.75, 12.10];
float speed := speeds[2]; // 配列要素参照(代入) Output speed: 12.50
speeds[0] := 11.11; // 配列要素更新 Output speeds: [11.11, 11.95, 12.50, 11.75, 12.10]
speeds.append(99.99); // 配列要素追加 Output speeds: [11.11, 11.95, 12.50, 11.75, 12.10, 99.99]
speeds.remove(2); // 配列要素削除 Output speeds: [11.11, 11.95, 11.75, 12.10, 99.99]
speeds.size(); // 要素数取得 Output speeds length: 5
speeds.remove(); // すべての配列要素削除 Output speeds: []
dictionary<key_type, item_type> variableName
// 例)
dictionary<string,float> windLimitDict := {"Site 1":12.0, "Site 2":13.5};
float wl := windLimitDict["Site 1"]; // 指定したキーの参照(代入) Output wl: 12.0
windLimitDict["Site 1"] := 11.11; // 指定したキーのvalue更新 Output windLimitDict: {"Site 1":11.11, "Site 2":13.5}
windLimitDict.add("Site 9", 99.99); // key/valueペアの追加 Output windLimitDict: {"Site 1":12.0, "Site 2":13.5, "Site 9", 99.99}
windLimitDict.remove("Site 2"); // 指定したキーのkey/valueペアの削除 Output windLimitDict: {"Site 1":12.0, "Site 9", 99.99}
windLimitDict.hasKey("Site 9"); // 指定したキーが存在するか Output windLimitDict exists: true
stream<item_type> variableName
// 例)
stream<Measurement> measurements := all Measurement(type="c8y_Temperature");
// do something
measurements.quit(); // quitメソッドでストリームを適切に終了する必要があります。
// 引数、返値ともに指定しない場合
action actionName() {
// do something
}
// 引数、返値をともに指定する場合
action actionName(type1 param1, type2 param2, ...) returns type3 {
// do something
return type3_instance;
}
EPL用のAPIリファレンスのパッケージがあり、using ディレクティブで使いたいパッケージの名前空間からインポートして定義された型を利用します。EPLアプリを新規作成時、デフォルトでCumulocityのものが多数インポートされます。
using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.MeasurementValue;
:
using com.apama.util.AnyExtractor;
if condition {
code_block
} [ else { code_block } ]
// 例)
float temperature := 32.0;
if temperature > 25.0 {
log "It is warm"; // Output log
} else if temperature < 0.0 {
log "It is cold";
} else {
log "It is cool";
}
for
ステートメントでシーケンス内の要素を反復処理します。反復変数の型は、シーケンス要素のデータ型と一致する必要があります。for iterationIdentifier in sequence_instance { code_block }
// 例)
sequence<string> sensorIds := ["S005","S010","S015","S020"];
string s;
for s in sensorIds {
Temperature t;
on all Temperature(id=s) : t {
// do something
}
}
while
ステートメントは、指定した条件の結果がtrueであるときにブロックを実行した後ループの先頭に戻り、falseになるとループは終了します。while condition { code_block }
// 例)
IncomingOrder i;
on all IncomingOrder() : i {
integer qty := i.quantity;
while(qty > 100) {
route OutgoingOrder(i.symbol, 100, i.price);
qty := qty – 100;
}
if (qty > 0) then {
route OutgoingOrder(i.symbol, qty, i.price);
}
}
ループの終了には
break
、ループの先頭へのスキップにはcontinue
、ループとループを含むアクションを終了するにはreturn
があります。
イベントリスナーは、イベントパターンを定義し、一致したイベントが発生したときに実行するアクションをon
ステートメントで定義します。
イベントを監視するには、モニターでイベントリスナーを定義します。コリレーターがonステートメントを実行すると、イベントリスナーが作成されます。コリレーターはイベントストリーム内の各イベントを監視し、onステートメントに指定されたイベントが発生すると、イベントリスナーがトリガーされ、リスナーアクションを実行します。
on event_expression [ coassignment ] listener_action
イベントテンプレートは、条件に一致する入力パターンを検出するために、on
ステートメントで使用し、イベントのフィールドの値に基づいてフィルタリングを指定します。
イベントフィールドのフィルターを定義するには、各フィールドの名前による指定(Named
)と、各フィールドの位置による指定(Positional
)があります。ここでは、Named
について説明します。
// 例)
WindMeasurement(location = "Site 1") // 指定した値に等しい
WindMeasurement(windSpeed > 12.4) // 指定した値より大きい
WindMeasurement(windSpeed >= 12.4) // 指定した値以上
WindMeasurement(windSpeed < 12.4) // 指定した値より小さい
WindMeasurement(windSpeed <= 12.4) // 指定した値以下
WindMeasurement(windSpeed in [12.4:12.9]) // 指定範囲 しきい値も含まれる ([]含む)
WindMeasurement(windSpeed in (12.4:12.9)) // 指定範囲 しきい値は含まれない (()含まない)
WindMeasurement(windHeading in [0:10)) // 指定範囲 最初の値のしきい値は含まれ、最後の値のしきい値は含まれない(0〜9)
WindMeasurement(location = "Site 1", windSpeed > 12.4) // 複数指定
- フィルタリングを指定しなくても構いません。e.g. WindMeasurement()
- フィルターの値(右辺)にイベントフィールドを指定できません。e.g.
WindMeasurement(windSpeed > windHeading)- 同じイベントフィールドは1回のみで重複して指定できません。e.g.
WindMeasurement(location = “Site 1”, location = “Site 9”)
イベント式は、1つのイベントテンプレートや、イベント演算子を追加し複数のイベントテンプレートとの組み合わせや、時間演算子のみで定義します。- Event expressions
not X
: Xではない場合、つまり、Xがトリガーされない場合は真となります。- Specifying the ‘not’ operator in event expressions// 例)
// 一致するWindMeasurementイベントを検知して、(次のWindMeasurementイベントを検知するまでに)AlertSubmittedイベントを検知していないとき
on all WindMeasurement(location = "Site 1", windSpeed > 12.5) as wind and not AlertSubmitted() {
// do something e.g. raise a new alert for wind
}
X -> Y
: Xがトリガーがされた後に、Yが評価されます。Yがトリガーされた場合、式は真となります。- Evaluating event listeners for all A-events followed by B-events// 例)
// 一致するNewsTickイベントを検知した後に、一致するTurbineイベントを検知したとき
on NewsTick(location="Site 1") -> Turbine(location="Site 1") {
// do something e.g. notify the user
}
X or Y
: X または Yのどちらがトリガーされた場合、式は真となります。- Specifying the ‘or’ operator in event expressions// 例)
// 一致するNewsTickイベントか一致するTurbineイベントのどちらかを検知したとき
on NewsTick(location="Site 1") or Turbine(location="Site 1") {
// do something e.g. notify the user
}
X and Y
: X かつ Yがトリガーされた場合は、式は真となります。- Specifying the ‘and’ operator in event expressions// 例)
// 一致するNewsTickイベントと一致するTurbineイベントを検知したとき
on NewsTick(location="Site 1") or Turbine(location="Site 1") {
// do something e.g. notify the user
}
all X
: 繰り返しXがトリガーされます。
wait(seconds)
: イベントリスナー内で指定した時間(秒)待機します。- Waiting within an event listener
// 例)
// 毎秒10秒たったとき
on all wait(10.0) {
// do something
}
within(seconds)
: 設定した時間内のイベントパターンを監視します。- Listening for event patterns within a set time// 例)
// 一致するNewsTickイベントを検知してから、120秒の間に一致するTurbineイベントを検知したとき
on NewsTick(location="Site 1") -> Turbine(location="Site 1") within(120.0) {
// do something e.g. notify the user
}
at(minutes, hours, days_of_month, months, days_of_week [ ,seconds])
: 特定の時間にイベントリスナーをトリガーします(*
はワイルドカード)。- Triggering event listeners at specific times// 例)
// 毎時5分になったら
on all at(5,*,*,*,*) {
// do something
}
quit
メソッドでリスナーを終了します。// 例)
listener l := on all WindMeasurement(location="Site 1", windSpeed < 12.5) as wind;
// do something
l.quit();
not
を使用してリスナーを終了します(and not 演算子)。 コリレーターは、トリガーできないイベントリスナーを認識するとコードブロックの処理を終了した後、リスナーを終了します。- Specifying ‘and not’ logic to terminate event listeners// 例)
// 一致するWindMeasurementイベントを検知して、(次のWindMeasurementイベントを検知するまでに)AlertSubmittedイベントを検知していないとき
on all WindMeasurement(location="Site 1", windSpeed > 12.5) as wind and not AlertSubmitted() {
// do something e.g. raise a new alert for wind
}
ストリームリスナーは、ストリームから意図したイベント(アイテム)を収集し、収集したストリームに対して、フィルターや、結合、集計などの標準的なリレーショナル操作で、派生アイテム(たとえば温度の平均値など)を生成するストリームクエリをfrom
ステートメントで定義します。ストリームクエリ定義は式であり、その結果はストリームです。このクエリの結果となるストリームを使ってストリームリスナーアクションを定義します。アクションは、クエリの結果の出力アイテムごとに一度だけ実行されます。
イベントを監視するには、モニターでストリームクエリ定義を含むステートメントを定義します。
ストリームクエリは、モニターインスタンスによって受信されたイベントから収集されたストリーム、またはストリームクエリによって作成されたストリームを取得し、付加価値のある派生アイテムを含むストリームを生成します。下の図のように、特定のストリームネットワークでは、上流のストリーム要素が下流のストリーム要素にフィードして派生アイテムを生成することができます。最終的に、ストリームネットワークからストリームリスナーで処理するアイテムを取り込みます。
from itemIdentifier in stream_expression window_definition [ coassignment ] listener_action
// 例)
// 30秒間のTemperatureイベントのidごとのcelsiusの平均値をルーティングする
MeanTemp meanTemp;
from t in all Temperature() within 30.0
group by t.id
select MeanTemp(mean(t.celsius)) : meanTemp {
route meanTemp;
}
ストリームクエリは、デフォルトでは到着した単一のイベントに対して機能しますが、ウィンドウを定義することにより、ストリームクエリが対象とするアイテムの範囲を指定することができます。ウィンドウの内容は各アイテムの到着時に変更されます。
within
では、アイテムは特定の期間保持されます。- Defining time-based windowsretain
では、特定の数のアイテムが保持されます。- Defining size-based windowsevery
では、ウィンドウの内容を制御します。時間ベースのウィンドウでは p秒ごとにウィンドウの内容を更新し、サイズベースのウィンドウでは mイベントが到着した後にウィンドウの内容を更新するように制御します。- Defining batched windows// 例)
// 5個のイベントが到着した後にウィンドウの内容を更新
from t in all Temperature(id="S001") retain 5 every 5
select mean(t.celsius);
// 例)
// 1秒ごとにウィンドウの内容を更新
from t in all Temperature(id="S001") within 1.5 every 1.0
select mean(t.celsius);
where
: 取得するイベントを指定した条件でフィルタリングします。- Filtering items before projectionwhere boolean_expression
// 例)
// price*volume の値がしきい値を超えたTickイベント100個について、priceの平均値を取得
from t in all Tick() retain 100
where t.price * t.volume > threshold
select mean(t.price)
クエリ結果の射影 - Generating query results
[group by groupBy_expression[, groupBy_expression]...] [having boolean_expression]
select [rstream] select_expression
group by
: 集約したアイテムを指定した値でグループ化します。- Aggregating items in projectionsgroup by groupBy_expression[, groupBy_expression]...
// 例)
// 30秒の間のTickイベントでsymbolごとにpriceの平均を取得
from t in all Tick() within 30.0
group by t.symbol
select TickAverage(t.symbol, mean(t.price));
having
: 集約したアイテムを指定した条件でフィルタリングします。- Filtering items in projectionshaving boolean_expression
// 例)
// 60秒の間のTemperatureイベントが10個より多いときvalueの平均を取得
from t in all Temperature() within 60.0
having count() > 10
select mean(t.value)
select
: 集約したアイテムから取得するアイテムを定義します。select_expression
// 例)
// 60秒ごとの60秒の間のTemperatureイベントを取得
from t in all Temperature() within 60.0 every 60.0
select t
ストリームまたはストリームリスナーを作成した後、次のいずれかが発生するまで存在し続けます。- Stream network lifetime
quit
メソッドでリスナーを終了します。// 例)
listener l := from t in all Temperature(id="S001") retain 5
select MeanTemp(mean(t.celsius)) : meanTemp {
// do something
}
l.quit();
// 例)
// 発生したTurbineイベントのロケーションが一致するWindMeasurementイベントの中で、10秒の間に風速が上昇したとき
on all Turbine() as turbine {
from wind in all WindMeasurement(location=turbine.location) within 10.0
having (last(wind.windSpeed) - first(wind.windSpeed) > 1.0)
select SubmitAlarm(last(wind.location), last(wind.windSpeed), “ACTIVE”) as submitAlarm {
// do something e.g. send submitAlarm
}
}
// 例)
// 10秒間の温度の平均値がしきい値の100を超えた場合、センサーごとに10秒待って、しきい値を超えない温度に戻らなかったとき
float THRESHOLD := 100.0;
from sens in all SensorData within 10.0 group by sens.id
having mean(sens.temperature) > THRESHOLD
select OverTempAlert(last(sens.id), mean(sens.temperature)) as overTempAlert {
on all wait(10.0) and not SensorData(id=overTempAlert.id, temperature < THRESHOLD) {
// do something e.g. send overTempAlert
}
}
// 例)
// ファイルを受け取ってから、30秒以内に処理が終了するか
action onload() {
on all FileReceived() as f {
// 30秒待った後にファイルが処理されなかった(受け取ったファイルの処理済イベントを検知しなかった)とき
on wait(30.0) and not FileProcessed(id=f.id) {
// ファイルが処理されなかった旨のアラームを送信など
// do something e.g. send alert that file was not processed
}
// 30秒以内にファイルが処理された(受け取ったファイルの処理済イベントを検知した)とき
on FileProcessed(id=f.id) within(30.0) {
// ファイルが処理された旨の情報を送信など
// do something e.g. send confirmation that the file was processed ()
}
}
}
Things Cloud のイベントを監視するには、Things Cloud で定義済みのイベント型を使用します。イベントとチャネル をご覧ください。
イベントを送信するには、チャネルを指定して対象となるイベントを送信します。チャネルにサブスクライブされているすべてのコンテキストがイベントを受信します。
send
ステートメントは、イベントをコリレーターの出力キューのチャネルにパブリッシュするよう、コリレーターに指示します。
send event_instance to "channel-name"
Things Cloud の Measurement/Event/Alarm/.. といったデータを生成する場合は、イベントとチャネル > 標準のイベント型とチャネルの送信用チャネルにイベントを送信してください。
// 例)
// Things Cloud にアラームデータを生成する
Alarm alarm := Alarm("", "c8y_SpeedAlarm", m.source, currentTime, "Speed limit breached", "ACTIVE", "CRITICAL", 1, new dictionary<string,any>);
send alarm to Alarm.SEND_CHANNEL;
route
ステートメントは、イベントを現在のコンテキストの入力キューの先頭に送信します。
イベントは、routeステートメントの実行と同じコンテキスト内でのみ処理されます。ルーティングされたイベントは、入力キューの外部から送信されたイベントの前、およびまだ処理されていないルーティングされたイベントの前に配置されます。- Event processing order for monitors
route event_instance
なんらかのイベントを検知したときに独自定義したイベント型を使ってルーティングし、ルーティングしたイベントを監視するイベントリスナーを定義することができます。独自のイベント型の作成もあわせてご覧ください。
// 例)
monitor TurbineWatch {
action onload() {
on all Turbine(location="Site 1") as t {
// Turbineイベントの一致するロケーションで、最大風速を超えたWindMeasurementイベントがあったとき
on all WindMeasurement(location = t.location, windSpeed > t.max) as w {
// SubmitAlarmイベントをルーテイング
route SubmitAlarm(id, t.location, w.windSpeed, “ACTIVE”);
}
}
// SubmitAlarmイベントを検知したとき
on all SubmitAlarm() as a {
// do something e.g. shutdown Turbine at a.location
}
}
}
コリレーターはデフォルトではイベントが到着した順序でシリアルに動作しますが、コンテキストと呼ばれるコンテナ構造を使用して複数スレッドを同時に並列で処理するオプションがあります。
複数のモニターインスタンスを同時に処理する場合に、同時に処理するタスクの依存関係や処理順序を考慮する必要がないか、あるいは最小限であるときに並列処理を検討してください。
たとえば、異なる識別子を持つ同じ種類の複数のイベントに対してタスクを実行するとき(デバイスごとのメジャーメントに対する処理など)が該当します。
デバイスごとに並列でメジャーメントをトラッキングする例がありますので、モニターインスタンスとコンテキストの生成もご覧ください。
レポート中の構文における記号や要素などは、以下をご参考ください。
表記 | 構文要素 | 備考 |
---|---|---|
波括弧 { } |
ブロック | モニターやアクションなど開始と終了 |
山括弧 < > |
型引数 | 括弧内にオブジェクトのデータ型を指定 |
角括弧 [ ] |
オプション | 括弧内は任意指定 |
三点リーダー ... |
反復可 | 表記前オブジェクトの繰り返し |
code_block | コードブロック | 処理を記述 |
type | データ型 | string:文字列などのデータ型 filed_type,item_type,key_typeなども同義 |
value | 値 | initial_value:初期値などの値 |
instance | インスタンス | sequence_instance:配列インスタンスなどのオブジェクトインスタンス |
variableName | 変数名 | 変数名をlowerCamelCaseで記載 fieldNameなども同義 |
identifier | 識別子 | itemIdentifier:アイテム識別子など識別オブジェクト たとえば、ストリームクエリでのストリーム内のカレントアイテムを表すために使用 |
condition | 条件 | true または false で評価される条件文 |
expression | 式 | event_expression:イベント式などのオブジェクト式 たとえば、イベント式ではイベントテンプレートやイベント演算子などの組み合わせ |
coassignment | 代入演算子 | : または as で一致するイベントを変数に代入 (イベントテンプレートの一部) |
window_definition | ウィンドウ定義 | オプションでretain 、within 、every を使いウィンドウの範囲を任意指定(指定しない場合は単一イベント) |
listener_action | リスナーアクション | イベントリスナーがトリガーされたときに実行するステートメントまたはブロック |
2021.10.20 / 伊藤・高橋
1.5時間
Things Cloudで提供しているスマートルールは、実際はCEPを用いて作成されています。
このスマートルールを実現するコードをApamaで書いてみましょう。
ここでは、計測値が明示的しきい値に達した際にアラームを作成 のスマートルールに関して、スクリプトを作成します。
ただし、今回は指定のしきい値超過時に新規アラームを生成するのみの機能を持ったスクリプトを作成し、アラームクリアや障害値の範囲指定機能は省略します。
スクリプトを作成していく一連の流れを考えてみましょう。例えば、以下のような流れになります。
ここからは、上記の流れで解説していきます。
as m
で、検知したMeasurementをm
へ格納します。
詳細は基本的な文法をご覧ください。on all Measurement() as m {}
{
"type": "c8y_TemperatureMeasurement",
"source": 1304435,
"com_TestMeasurement": {
"T": {
"unit": "C",
"value": 55
}
}
}
今回取得したい値は、カスタムフラグメント内のvalue
部分です。2つの取得方法が存在します。取得後、取得した値が規定値以上であるかのif文を作成すれば、判断できます。
Measurementのカスタムフラグメントはdictionary(辞書)型となっています。詳しくはリファレンス(Measurement)をご覧ください。
Apamaでdictionary内の値を取得する方法は2つ存在します。
getOrDefault
での取得方法m.measurements.getOrDefault("com_testMeasurement").getOrDefault("T").value
m.measurements["com_testMeasurement"]["T"].value
getOrDefaultに関して、詳しくはgetOrDefault をご覧ください。
action alarmCreate(string deviceId){
Alarm alarms := new Alarm;
alarms.source := deviceId;
alarms.time := currentTime;
alarms.text := "testAlarm";
alarms.type := "com_testAlarm";
alarms.severity := "MAJOR";
alarms.status := "ACTIVE";
send alarms to Alarm.CHANNEL;
}
Apamaでのアラーム作成時におけるフィールドの構成要素に関しては、以下をご参照ください。
new
でインスタンス化、Alarm作成に必要な情報をインスタンス化した変数へ格納し、Alarmチャネルへsendする方法で実装しています。using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.Alarm;
monitor MySampleMonitor {
// 規定温度のイベント
event Constants {
// 規定温度は50度
constant float BORDER_MEASUREMENT_VALUE := 50.0;
}
action onload() {
monitor.subscribe(Measurement.CHANNEL);
// 1. 対象のMeasurementを検知(タイプがc8y_TemperatureMeasurement, デバイスIDが1304435)
on all Measurement(type="c8y_TemperatureMeasurement", source="1304435") as m {
// 2. Measurement内のデータが規定値以上かどうか判断
if (m.measurements.getOrDefault("com_testMeasurement").getOrDefault("T").value > Constants.BORDER_MEASUREMENT_VALUE) {
// 3. 規定値以上であればアラームを生成
alarmCreate(m.source);
}
}
}
// アラーム作成アクション(デバイスIDのみ受け取り、他は明示的に指定)
action alarmCreate(string deviceId){
Alarm alarms := new Alarm;
alarms.source := deviceId;
alarms.time := currentTime;
alarms.text := "testAlarm";
alarms.type := "com_testAlarm";
alarms.severity := "MAJOR";
alarms.status := "ACTIVE";
send alarms to Alarm.CHANNEL;
}
}
ここでは、デバイスからのMeasurement送信が60秒なかった場合、Alarmを生成するスクリプトを作成します。指定の条件は以下となります。
CLEARED
とするスクリプトを作成していく一連の流れを考えてみましょう。例えば、以下のような流れになります。
CLEARED
へ変更し、生成されなければ何もしないここからは、上記の流れで解説していきます。
on all Measurement(type={対象Measurementのタイプ名})
60秒待機して次のMeasurementが生成されない場合
は以下で実現できます。on wait(60.0) and not Measurement(type={対象Measurementのタイプ名}, source={対象MeasurementのデバイスID})
dictionary<string, string> activeAlarms := {};
/* example
{"92063": alarmId, "984844": alarmId}
*/
on all Alarm(type={対象Alarmのタイプ名}, status={対象Alarmのステータス}) as al{
activeAlarms[al.source] := al.id;
}
on all Measurement
がトリガーとなります。
もしそのMeasurementのデバイスIDが、保存しているAlarm情報の中のデバイスIDと一致していれば、AlarmのステータスをCLEARED
とします。if activeAlarms.hasKey(m.source) then {
alarmDelete(activeAlarms[m.source]);
activeAlarms.remove(m.source);
}
using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.Event;
using com.apama.cumulocity.Alarm;
using com.apama.aggregates.sum;
using com.apama.aggregates.mean;
using com.apama.aggregates.last;
monitor MySampleMonitor {
sequence<string> cacheAlarmList := ["", ""];
dictionary<string, string> activeAlarms := {}; // {"92063": alarmId, "984844": alarmId} を作っていく
sequence<boolean> flagList := [false, false];
sequence<string> sourceIds := ["92063", "984844"];
string s;
/** Initialize the application */
action onload() {
integer count := 0;
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
monitor.subscribe(Alarm.SUBSCRIBE_CHANNEL);
on all Measurement(type="com_TestMeasurement") as m{
if activeAlarms.hasKey(m.source) then {
logOut("Alarm Delete", activeAlarms[m.source]);
alarmDelete(activeAlarms[m.source]);
activeAlarms.remove(m.source);
}
on wait(60.0) and not Measurement(type="com_TestMeasurement", source=m.source){
alarmCreate(m.source);
}
}
on all Alarm(type="com_TestAlarm", status="ACTIVE") as al{
activeAlarms[al.source] := al.id;
}
}
// アラーム削除
action alarmDelete(string alarmId) {
// アラームステータスをCLEAREDヘ
Alarm alarms := new Alarm;
alarms.id := alarmId;
alarms.status := "CLEARED";
send alarms to Alarm.CHANNEL;
}
// アラーム生成
action alarmCreate(string deviceId){
Alarm alarms := new Alarm;
alarms.source := deviceId;
alarms.time := currentTime;
alarms.text := "testAlarm";
alarms.type := "com_TestAlarm";
alarms.severity := "MAJOR";
alarms.status := "ACTIVE";
send alarms to Alarm.CHANNEL;
}
}
特定の Event 発生をトリガーに、https://httpbin.org/get に HTTP GET リクエストを送信します。レスポンスに応じて、以下を実施します。
com_ResponseOK
の Event を作成します。com_ResponseNG
の Event を作成します。補足: Apama での HTTP リクエストを扱った例題として上記 URL をリクエスト先に設定しており、実用的な意味はありません。
コードの解説に入る前に、Apama でのデータ連携における注意点について説明します。ここでは、システム間連携を想定し、マスターデータを持つ側(一般的なクライアントサーバモデルにおけるサーバ)を Things Cloud、データを取得し利用する側(クライアント)を連携先システムとします。システム間でデータを連携する場合、以下の2つの方式とそれぞれの利点・欠点が考えられます。
方式 | 説明 |
---|---|
pull型 | 連携先システムから Things Cloud のデータを取得する方式 利点:必要に応じて情報を取得できる欠点:連携先システムから問い合わせない限り、Things Cloud 側でデータが更新されたかどうかわからない |
push型 | Things Cloudから連携先システムへデータを送信する方式 利点:データ更新があった場合即座に連携先システムに通知できる欠点:送達保証が難しい、再送処理が複雑になる |
基本的に、Things Cloud で Apama を用いて外部サービスへリクエストするような通信は push 型になります。このため、 Apama でのデータ連携は データの連携速度を重視し、情報の欠損を許容できる場合には適していますが、欠損が一つでも発生すると業務に影響を及ぼしてしまう場合の利用には適していません。 どのようなユースケースを実現したいかにもよりますが、使い所は注意深く検討する必要があります。
ではコードの解説に入ります。以下の流れで実装していきます。
ステップ1:特定 Event 検知
ステップ2:HTTP リクエスト発行
ステップ3:ステータスに応じて Event 作成
各フェーズでのイベントの流れと処理は以下のようなイメージです。
まず、Apama EPL 起動時に実行される onload()
内で Event チャネルのサブスクライブと、特定のイベント(今回は com_SampleEvent
とします)を監視するリスナーを設定します。
action onload() {
// Application starts here
monitor.subscribe(Event.SUBSCRIBE_CHANNEL);
on all Event(type="com_SampleEvent") {
// com_SampleEvent を観測した場合の処理
}
}
まず、HTTP リクエストを送信するアクションをリスナー内に記述しましょう。アクション名は invokeHTTPAPI()
とします。
action onload() {
// Application starts here
monitor.subscribe(Event.SUBSCRIBE_CHANNEL);
on all Event(type="com_SampleEvent") {
invokeHTTPAPI();
}
}
次に、invokeHTTPAPI()
を実装します。Apama で用意されている HTTP クライアントを用い、以下のように実装できます。HTTP クライアントは com.softwareag.connectivity.httpclient
パッケージから利用できます。以下のように using
ステートメントを用いてインポートしておきましょう。Things Cloud API用の定義済みのイベント型 もご覧ください。
/** Http client */
using com.softwareag.connectivity.httpclient.HttpOptions;
using com.softwareag.connectivity.httpclient.HttpTransport;
using com.softwareag.connectivity.httpclient.Response;
using com.softwareag.connectivity.httpclient.Request;
monitor CepOutboundSample {
// 先頭に定数として宣言
constant string HTTP_HOST:="httpbin.org";
constant integer HTTP_PORT:= 443; // https の場合は 443 を指定
...
action invokeHTTPAPI(string method, string path, dictionary<string, any> json, string source) {
// HTTPTransport のインスタンスを生成
HttpTransport transport := HttpTransport.getOrCreate(HTTP_HOST, HTTP_PORT);
// ヘッダなどの情報を設定
HttpOptions options := new HttpOptions;
// 今回はヘッダは設定しませんが、以下のように記述できます
// options.headers.add("Content-Type", "application/json" );
// options.headers.add("Accept", "application/json");
// options.headers.add("Authorization", "Basic xxxx");
// HTTPTransport のインスタンスを介して HTTP リクエストを作成・実行
transport.createAndExecuteRequest(method, path, json, options, RequestCallback(source).handleResponse);
}
}
HTTPTransport を介して新規の HTTP リクエストを作成、実行します。HTTPOptions では、ヘッダ以外にも、cookie やクエリパラメータを設定できます。
transport.createAndExecuteRequest()
の最後の引数では、レスポンス時に呼び出されるコールバックアクションを設定できます。今回の例題ではレスポンスステータスを後続処理に使用したいので、レスポンスの処理を行うコールバックアクションを RequestCallback(source).handleResponse
という名前で登録しておきます。
HTTP クライアントの詳細は以下にも説明があるのでご覧ください。
onload()
アクション内のリスナーに記述した invokeHTTPAPI()
の引数を以下のように追加します。
今回は GET リクエストを送信するので、HTTP メソッドは GET
、リクエストボディは指定しないので、空の dictionary を指定します。
最後の引数 checkStatus
は、コールバック用イベントである RequestCallback
の引数として渡されます。次で詳細を説明します。
on all Event(type="com_SampleEvent") {
invokeHTTPAPI("/get", "GET", new dictionary<string, any>, "checkStatus");
}
まず、レスポンスを格納するイベント型を APIResponse
として定義します。レスポンス( Response
型のイベント)の payload は AnyExtractor 型で返却されるので、以下のように定義します。
// レスポンス用のイベント定義
event APIResponse {
string source; // リクエスト元判別のための変数
integer status; // ステータスコード
AnyExtractor body;
}
次に、リクエストコールバック用のイベント型を RequestCallback
として定義します。
source
に加え、レスポンスを加工して、先ほど定義した APIResponse
型としてイベントを生成するために、以下のようにレスポンスをルーティングするイベントアクションを追加します。
// コールバック用のイベント定義
event RequestCallback {
string source;
// レスポンスを処理するためのイベントアクション
action handleResponse(Response res){
// レスポンスを APIResponse イベントへルーティング
route APIResponse(source, res.statusCode, res.payload);
}
}
source
には任意の文字列を設定でき、レスポンス受け取り時に、想定するリクエストからのレスポンスかどうか、source
に設定された文字列から判別できます。
今回は1つの HTTP リクエストのみを呼び出すのでレスポンスが混在することはありませんが、複数のリクエストが存在し、かつレスポンスの処理がそれぞれで異なる場合に活用できます。
次に、ステータスに応じて Event を作成する処理を記述します。
まず、 onload()
内にレスポンスを監視するためのリスナーを設定しましょう。source
を以下のように活用することで必要なレスポンスのみを処理できます。(今回は checkStatus
source 以外を持つリクエストは存在しません)
action onload() {
...
on all Event(type="com_SampleEvent") {
invokeHTTPAPI("GET", "/get", new dictionary<string, any>, "checkStatus");
}
on all APIResponse(source = "checkStatus") as res {
}
}
APIResponse
イベントで定義した status
を使ってステータスコードを取り出し、以下のようにステータスに応じて Event を生成する処理を記述します。モニターファイルの先頭に MOID
として定数を宣言し、Event を生成先デバイスの ID を指定します。
// 先頭に定数として宣言
constant string MOID:= "2505";
...
action onload() {
// Application starts here
monitor.subscribe(Event.SUBSCRIBE_CHANNEL);
on all Event(type="com_SampleEvent") {
invokeHTTPAPI("GET", "/get", new dictionary<string, any>, "checkStatus");
}
on all APIResponse(source = "checkStatus") as res {
if (res.status = 200) {
send Event("", "com_ResponseOK", MOID, currentTime, "Repsonse 200 OK: "+res.body.getString("url"), new dictionary<string,any>) to Event.SEND_CHANNEL;
} else { // status が 200 以外の時
send Event("", "com_ResponseNG", MOID, currentTime, "Repsonse NG: " + res.status.toString(), new dictionary<string,any>) to Event.SEND_CHANNEL;
}
}
}
ここで、AnyExtractor オブジェクトのデータ取得方法について説明します。 今回例として挙げている https://httpbin.orgs/get へ GET リクエストを送ると以下のレスポンスが返却されます。
{
"args": {},
"headers": {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "ja,en-US;q=0.9,en;q=0.8",
"Host": "httpbin.org",
...
},
...
"url": "https://httpbin.org/get"
}
APIResponse
イベント型で定義したように、 上記のレスポンスボディは AnyExtractor オブジェクトとして格納されます。
例えば上記レスポンスのうち、url
、headers.Host
の値を取り出したい場合、以下のように記述できます。
res.body.getString("url")
res.body.getString("headers.Host") // JSON の階層はこのように指定できる
Things Cloud の Measurement や Alarm も上記のような JSON データなので、同じように AnyExtractor を使ってデータを取り出せます。
// Measurement の例
{
"c8y_Temperature": {
"T": {
"value": 30,
"unit": "C"
}
}
}
// AnyExtractor での取り出し方
action onload() {
// Application starts here
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
on all Measurement(type="c8y_Temperature") as m {
// unit を取り出してログに出力(メジャーアラームも生成されます)
log AnyExtractor(m.measurements.getOrDefault("c8y_Temperature").getOrDefault("T").unit) at ERROR;
// value を取り出してログに出力(メジャーアラームも生成されます)
log AnyExtractor(m.measurements.getOrDefault("c8y_Temperature").getOrDefault("T").value).getFloat("").toString() at ERROR;
}
}
AnyExtractor の詳細・使い方については以下もご覧ください。
実装は以上で完了です。動作確認をしてみましょう。
/** Basic event definitions for working with Cumulocity IoT */
using com.apama.cumulocity.Event;
/** Miscellaneous utilities */
using com.apama.cumulocity.Util;
using com.apama.util.AnyExtractor;
/** Http client */
using com.softwareag.connectivity.httpclient.HttpOptions;
using com.softwareag.connectivity.httpclient.HttpTransport;
using com.softwareag.connectivity.httpclient.Response;
using com.softwareag.connectivity.httpclient.Request;
monitor CepOutboundSample {
constant string HTTP_HOST:="httpbin.org";
constant integer HTTP_PORT:= 443;
constant string MOID:= "2505";
event APIResponse {
string source;
integer status;
AnyExtractor body;
}
event RequestCallback {
string source;
action handleResponse(Response res){
route APIResponse(source, res.statusCode, res.payload);
}
}
action onload() {
// Application starts here
monitor.subscribe(Event.SUBSCRIBE_CHANNEL);
on all Event(type="com_SampleEvent") {
invokeHTTPAPI("GET", "/get", new dictionary<string, any>, "checkStatus");
}
on all APIResponse(source = "checkStatus") as res {
if (res.status = 200) {
send Event("", "com_ResponseOK", MOID, currentTime, "Repsonse 200 OK: "+res.body.getString("url"), new dictionary<string,any>) to Event.SEND_CHANNEL;
} else { // status が 200 以外の時
send Event("", "com_ResponseNG", MOID, currentTime, "Repsonse NG: " + res.status.toString(), new dictionary<string,any>) to Event.SEND_CHANNEL;
}
}
}
/*
* 引数で指定した先へ HTTP リクエスト送信
*/
action invokeHTTPAPI(string method, string path, dictionary<string, any> json, string source) {
HttpTransport transport := HttpTransport.getOrCreate(HTTP_HOST, HTTP_PORT);
HttpOptions options := new HttpOptions;
transport.createAndExecuteRequest(method, path, json, options, RequestCallback(source).handleResponse);
}
}
ここでは、1分毎にその1分前からのMeasurementの集約値(max, average, min)を計算し、新たな1つのMeasurementとして登録するスクリプトを作成します。
スクリプトを作成していく一連の流れを考えてみましょう。例えば、以下のような流れになります。
ここからは、上記の流れで解説していきます。
on all at(*/1, *, *, *, *){}
1分以内のMeasurementを取得するため、stream機能を用いてMeasurementを取得します。今回はlistener型変数へ、from文で取得したMeasurementを代入します。
listener型の詳細は以下をご覧ください。
また、各入力条件は以下となります。
within P
: P秒以内のMeasurementをすべて取得しますevery P
: P秒毎にMeasurementの一覧を取得したものに更新しますgroup by m.source
: デバイス毎にMeasurementをグループ化します。listener型変数の中へ、複数のMeasurementグループが作成されます。
group by
について、詳しくはPartitions and aggregate functionsをご覧ください。listener avgTempListener := on all at(*/1, *, *, *, *){
from m in all Measurement(type="com_SampleMeasurement")
within 60.0 every 60.0
group by m.source
}
また、作成したstream(listener型変数)は都度終了する処理が必要です。listener型変数はquit()
メソッドを所持しており、対象のリスナー(インスタンス)を終了させます。
avgTempListener.quit();
min
: 指定した複数の値から最小値を抽出しますmax
: 指定した複数の値から最大値を抽出しますavg
: 指定した複数の値の平均値を抽出しますまた、これらを使用するために、以下を宣言します。
using com.apama.aggregates.avg;
using com.apama.aggregates.min;
using com.apama.aggregates.max;
from m in all Measurement(type="com_SampleMeasurement")
のm
へ、group by
でデバイス毎に分けられたMeasurementが格納されています。
Measurementのカスタムフラグメントは以下のようになります。
"com_SampleMeasurement": {
"T": {
"value": 10,
"unit": "C"
}
}
}
対象のMeasurementに対して、カスタムフラグメント内の値は例題1 ステップ2でご紹介した方法で取得することができます。
結果、以下のようなselect文となります。
select MinMaxAvg(min(m.measurements["com_SampleMeasurement"]["T"].value),
max(m.measurements["com_SampleMeasurement"]["T"].value),
avg(m.measurements["com_SampleMeasurement"]["T"].value),
m.source) as minMaxAvg
"com_AggregatedMeasurement": {
"min": <最小値>,
"max": <最大値>,
"avg": <平均値>
}
using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.MeasurementValue;
using com.apama.cumulocity.Event;
using com.apama.correlator.timeformat.TimeFormat;
using com.apama.aggregates.avg;
using com.apama.aggregates.min;
using com.apama.aggregates.max;
monitor PerDeviceMeasurementTracker {
event MinMaxAvg {
float min;
float max;
float avg;
string source;
}
action onload() {
monitor.subscribe(Measurement.CHANNEL);
// 1時間毎に一日分のMeasurementを格納、avg計算
// 毎回streamが増えていくので、終了する処理が必要
listener avgTempListener :=
on all at(*/1, *, *, *, *) {
from m in all Measurement(type="com_SampleMeasurement")
within 60.0 every 60.0
group by m.source
select MinMaxAvg(min(m.measurements["com_SampleMeasurement"]["T"].value),
max(m.measurements["com_SampleMeasurement"]["T"].value),
avg(m.measurements["com_SampleMeasurement"]["T"].value),
m.source) as minMaxAvg {
createMeasurements(minMaxAvg);
}
avgTempListener.quit();
}
}
// 集約 Measurement として登録
action createMeasurements(MinMaxAvg mval) {
Measurement m := new Measurement;
m.type := "com_AggregatedMeasurement";
m.source := mval.source;
m.time := currentTime;
m.measurements := {
"com_AggregatedMeasurement": {
"min": MeasurementValue(mval.min, "T", new dictionary<string, any>),
"max": MeasurementValue(mval.max, "T", new dictionary<string, any>),
"avg": MeasurementValue(mval.avg, "T", new dictionary<string, any>)
}
};
send m to Measurement.CREATE_CHANNEL;
}
}