Apama EPL をはじめる

投稿日 / 投稿者名

2021.10.20 / アプリT

はじめに

本レポートは、Things Cloud の利用例をより知っていただくための実利用レポートとして作成したものです。 Things Cloud は極力後方互換性を持つよう開発されていますが、今後のリリースにより、一部画面やコマンド、手順などが変更となったり、 利用できない可能性があることをあらかじめご了承ください。 なお、作成にあたり、以下バージョンを用いています。

Things Cloud のカスタムストリーミング処理機能(CEP/EPL)について記載しているカスタムストリーミング処理機能ガイドもあわせてご覧ください。

また、本レポートでは、Software AG社 による Apamaの公式ドキュメントに多数リンクしていますので、ご活用ください。他のリンクと区別するため、Apamaドキュメントへのリンクは英文で斜体文字としています。



Apamaとは

概要

Apamaは、複合イベント処理(CEP: complex event processing) と呼ばれる技術を使って、イベントを処理するプラットフォームです。
イベントストリーム内をIoTデバイスから報告される計測値や状態など時々刻々と流れている「イベント」と呼ばれるデータ要素を監視し、記述されるビジネスロジックに従い、イベントドリブンでイベントのパターンを検出しリアルタイムに処理を行います。

apama stream event

上の図のようにストリーム内を時間軸でイベント(データ)が流れているイメージで、しきい値を超える温度イベントを3回以上受信するという条件に基づいてイベントを監視し、条件を満たすイベントを検出したらアラームを生成させるというようなイベント処理をリアルタイムに行います。

このように一致するイベントを監視し、検出したことを契機にどのような処理を実行するかをApamaのコリレーターに指示します。

コリレーター

Apamaは、複合イベント処理(CEP)、カスタムストリーミング処理機能を行うインメモリのシステムで、Apamaアプリケーションで定義されたイベントパターンを検出し処理を実行するのが、Apamaのコリレーターというランタイムのサーバープロセスになります。

Things Cloudのスマートルールや、カスタムストリーミング処理機能アプリケーションで作成したAnalytics Builderでのイベント処理モデル、EPL AppsでのEPLアプリは、 このコリレーターにデプロイされ実行されます。Things Cloudでは、これらはApama-ctrlマイクロサービスによって実行されます。Apama-ctrlマイクロサービスはテナントごとに登録され、独立した領域を持つインスタンス内で実行されます。マイクロサービスのランタイムとアプリケーションをあわせてご覧ください。

apama correlator

Apama EPLコードを書く

投稿日 / 投稿者名

2021.10.20 / 三橋

難易度 ★ ~ ★★★


所要時間

1時間

前提条件

目次

Apama EPL Apps とは

概要

Apama EPL Appsは、Apama のコリレーターで動作するEPLコードを GUI のエディター上で開発できるアプリケーションです。

Apamaのサーバープロセスであるコリレーターがストリーム内を時々刻々と流れているイベントをリアルタイムに受け入れて解釈するには、Apamaのイベント処理言語であるEPL(Event Processing Language)でプログラミングする必要があります。EPLはイベントを処理するプログラムを作成するために設計されたドメイン固有の言語で、波括弧{ }によるブロック構造で強い型付けがあります。EPLコードは、イベントの到着に応じイベントドリブンで記述します。

便利な機能

Apama EPL Appsの中でサンプルコードを表示、活用できるようになっていたり、エディターにはコード補完やシンタックスハイライトが搭載されている他、保存(ctrl + s)などのショートカットキーも利用できます。

epl samples

Apama EPL 入門

モニター とは

Apama EPL Appsアプリケーションから作成するEPLアプリがモニターファイル(*.mon ファイル)であり、アプリケーションの実行単位です。
EPLアプリの作成については、Apama EPL Appsを使用したアプリケーションの開発ステップ 2 - EPLアプリの作成をご覧ください。

コリレーターにEPLアプリがデプロイされると、コリレーターはモニターのインスタンスを生成し、モニターがロードされた直後にonload()アクションを実行します。そのため、onloadアクションは必ず定義する必要があります。

モニターでは、どのようなイベントを監視し、指定したイベントが見つかった場合にどのような処理を実施するかをonloadアクションに定義します。

基本構造
monitor MonitorName { 
    action onload() {       // 必須
        // do something
    }
}
ライフサイクル

モニターアクションは、モニターのライフサイクルで自動的に呼び出されます。 - Monitor lifecycle

モニターの終了時は、コリレーターはモニターのインスタンスごとにondie()アクションを呼び出し、最後のモニターインスタンスが終了したときonunload()アクションを呼び出します。

Monitor actions > Simple actions

モニターの終了

モニターインスタンスはすべてのコードを実行しアクティブなリスナー(イベント監視)がなくなったとき速やかにモニターを終了し、メモリから削除されます。- Terminating monitors

複製したモニターインスタンスはdieステートメントで終了できます。

イベント とは

イベントは、センサーの状態が変化したなど、なんらかのイベントが発生したことの通知を含むデータのオブジェクトです。コリレーターがイベントを理解するために基本構造に従う必要があります。各種イベントには、イベント型名と、それに関連付けられたイベントフィールドと呼ばれるデータ要素があります。

基本構造
event EventTypeName {
    [ field_type fieldName; ]                              // フィールド
    [ constant field_type fieldName := literal_value; ]    // 定数フィールド(定数値を設定し変更できない)
    [ wildcard field_type fieldName; ]                     // ワイルドカードフィールド(イベントフィルターに指定できない)
        :
    [ eventActionDefinition ]                              // イベントアクション
}

イベントフィールドのデータ型(Types)は、強い型付けがあり、プリミティブデータ型 stringintegerfloatdecimalboolean をサポートしています。このほか、イベントアクションや独自定義したイベント型、参照型(sequencedictionaryなど)も指定できます。(chunk、 stream、 および listener のデータ型を使用することはできません。)
インスタンス生成時などにフィールドの順序が重要となります。

イベント型を定義する上でのガイドライン

// 例)
event WindMeasurement {   // イベント名: UpperCamelCase - 単語の最初の文字は大文字
    string location;      // イベントフィールド名: lowerCamelCase - 最初の単語の文字は小文字、続く単語の最初の文字は大文字
    float windSpeed;
    integer windHeading;
}

インスタンスの生成

以下は、上記のイベント型(例)のインスタンスを生成する例で両者とも同じ値が設定されます。

// 例)
// インスタンスを生成し、別途値を設定
WindMeasurement windMeasurement := new WindMeasurement;
windMeasurement.location := "Site 1";
windMeasurement.windSpeed := 11.5;
windMeasurement.windHeading := 3;
// 例)
// インスタンス生成時に値を設定
WindMeasurement windMeasurement := WindMeasurement("Site 1", 11.5, 3);
Things Cloud API用の定義済みのイベント型

com.apama.cumulocityパッケージから利用することができます。イベントとチャネル > 定義済みのイベント型 をご覧ください。

イベントアクション

フィールドデータに関連するイベントアクションをイベント型に定義できます。- Specifying actions in event definitions

// 例)
event Temperature {
    float celsius;
    // イベントフィールドのcelsius(摂氏)からfahrenheit(華氏)を算出
    action calcFahrenheit() returns float {
        return celsius * 9.0/5.0 + 32.0; 
    }
}
:
Temperature temp := Temperature(100.0);
// 温度イベントの一部としてfahrenheit(華氏)を算出するアクションを呼び出し
log temp.calcFahrenheit().toString();    // 212
:

基本的な文法

変数
datatype variableName [:= initial_value]:
// 例)
anInteger.toString()
anInteger.toFloat()
aBoolean.toString()     // "true" または "false" を返却
aFloat.toString()
aFloat.floor()          // aFloatの数以下の最大の整数を返却 (< aFloat)
aFloat.ceil()           // aFloatの数以上の最小の整数を返却 (> aFloat)
aString.toInteger()     // 変換できない場合は 0 を返却
aString.toFloat()       // 変換できない場合は 0.0 を返却
// 例)
// コードブロック内で利用する変数
on all Measurement(type="c8y_Temperature") as m { code_block }
// 例)
// 変数宣言した後に代入するため、コードブロック外でも利用可能な変数
Measurement m;
on all Measurement(type="c8y_Temperature") : m { code_block }
参照データ型変数

いくつかの参照型(Reference types)より下記のオブジェクト(型)をご紹介します。

sequence<item_type> variableName
// 例)
sequence<float> speeds := [12.30, 11.95, 12.50, 11.75, 12.10];

float speed := speeds[2];       // 配列要素参照(代入)    Output speed:  12.50
speeds[0] := 11.11;             // 配列要素更新         Output speeds: [11.11, 11.95, 12.50, 11.75, 12.10]
speeds.append(99.99);           // 配列要素追加         Output speeds: [11.11, 11.95, 12.50, 11.75, 12.10, 99.99]
speeds.remove(2);               // 配列要素削除         Output speeds: [11.11, 11.95, 11.75, 12.10, 99.99]
speeds.size();                  // 要素数取得           Output speeds length: 5
speeds.remove();                // すべての配列要素削除   Output speeds: []
dictionary<key_type, item_type> variableName
// 例)
dictionary<string,float> windLimitDict := {"Site 1":12.0, "Site 2":13.5};

float wl := windLimitDict["Site 1"];  // 指定したキーの参照(代入)           Output wl: 12.0
windLimitDict["Site 1"] := 11.11;     // 指定したキーのvalue更新            Output windLimitDict: {"Site 1":11.11, "Site 2":13.5}
windLimitDict.add("Site 9", 99.99);   // key/valueペアの追加              Output windLimitDict: {"Site 1":12.0, "Site 2":13.5, "Site 9", 99.99}
windLimitDict.remove("Site 2");       // 指定したキーのkey/valueペアの削除   Output windLimitDict: {"Site 1":12.0, "Site 9", 99.99}
windLimitDict.hasKey("Site 9");       // 指定したキーが存在するか            Output windLimitDict exists: true
stream<item_type> variableName
// 例)
stream<Measurement> measurements := all Measurement(type="c8y_Temperature");
// do something
measurements.quit();    // quitメソッドでストリームを適切に終了する必要があります。
アクション定義
// 引数、返値ともに指定しない場合
action actionName() { 
    // do something
}
// 引数、返値をともに指定する場合
action actionName(type1 param1, type2 param2, ...) returns type3 { 
    // do something
    return type3_instance; 
}
EPLのAPIリファレンス

EPL用のAPIリファレンスのパッケージがあり、using ディレクティブで使いたいパッケージの名前空間からインポートして定義された型を利用します。EPLアプリを新規作成時、デフォルトでCumulocityのものが多数インポートされます。

using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.MeasurementValue;
:
using com.apama.util.AnyExtractor;
条件ステートメント
if condition {
    code_block
} [ else { code_block } ]
// 例)
float temperature := 32.0;

if temperature > 25.0 {
    log "It is warm";       // Output log
} else if temperature < 0.0 { 
    log "It is cold";
} else {
    log "It is cool";
}
ループの定義
for iterationIdentifier in sequence_instance { code_block }
// 例)
sequence<string> sensorIds := ["S005","S010","S015","S020"]; 
string s;
for s in sensorIds {
    Temperature t;
    on all Temperature(id=s) : t {
        // do something
    } 
}
while condition { code_block }
// 例)
IncomingOrder i;
on all IncomingOrder() : i {
    integer qty := i.quantity; 
    while(qty > 100) {
        route OutgoingOrder(i.symbol, 100, i.price);
        qty := qty  100;
    }
    if (qty > 0) then {
        route OutgoingOrder(i.symbol, qty, i.price);
    } 
}

ループの終了にはbreak、ループの先頭へのスキップにはcontinue、ループとループを含むアクションを終了するにはreturnがあります。

イベントの監視

イベントリスナー

イベントリスナーは、イベントパターンを定義し、一致したイベントが発生したときに実行するアクションをonステートメントで定義します。

イベントを監視するには、モニターでイベントリスナーを定義します。コリレーターがonステートメントを実行すると、イベントリスナーが作成されます。コリレーターはイベントストリーム内の各イベントを監視し、onステートメントに指定されたイベントが発生すると、イベントリスナーがトリガーされ、リスナーアクションを実行します。

基本構文
on event_expression [ coassignment ] listener_action
イベントテンプレート

イベントテンプレートは、条件に一致する入力パターンを検出するために、onステートメントで使用し、イベントのフィールドの値に基づいてフィルタリングを指定します。

イベントフィールドのフィルターを定義するには、各フィールドの名前による指定(Named)と、各フィールドの位置による指定(Positional)があります。ここでは、Namedについて説明します。

// 例)
WindMeasurement(location = "Site 1")                    // 指定した値に等しい
WindMeasurement(windSpeed > 12.4)                       // 指定した値より大きい
WindMeasurement(windSpeed >= 12.4)                      // 指定した値以上
WindMeasurement(windSpeed < 12.4)                       // 指定した値より小さい
WindMeasurement(windSpeed <= 12.4)                      // 指定した値以下
WindMeasurement(windSpeed in [12.4:12.9])               // 指定範囲 しきい値も含まれる ([]含む)
WindMeasurement(windSpeed in (12.4:12.9))               // 指定範囲 しきい値は含まれない  (()含まない)
WindMeasurement(windHeading in [0:10))                  // 指定範囲 最初の値のしきい値は含まれ、最後の値のしきい値は含まれない(0〜9)
WindMeasurement(location = "Site 1", windSpeed > 12.4)  // 複数指定
  • フィルタリングを指定しなくても構いません。e.g. WindMeasurement()
  • フィルターの値(右辺)にイベントフィールドを指定できません。e.g. WindMeasurement(windSpeed > windHeading)
  • 同じイベントフィールドは1回のみで重複して指定できません。e.g. WindMeasurement(location = “Site 1”, location = “Site 9”)
イベント式

イベント式は、1つのイベントテンプレートや、イベント演算子を追加し複数のイベントテンプレートとの組み合わせや、時間演算子のみで定義します。- Event expressions

// 例)
// 一致するWindMeasurementイベントを検知して、(次のWindMeasurementイベントを検知するまでに)AlertSubmittedイベントを検知していないとき
on all WindMeasurement(location = "Site 1", windSpeed > 12.5) as wind and not AlertSubmitted() { 
    // do something e.g. raise a new alert for wind
}
// 例)
// 一致するNewsTickイベントを検知した後に、一致するTurbineイベントを検知したとき
on NewsTick(location="Site 1") -> Turbine(location="Site 1") {
    // do something e.g. notify the user
}
// 例)
// 一致するNewsTickイベントか一致するTurbineイベントのどちらかを検知したとき
on NewsTick(location="Site 1") or Turbine(location="Site 1") {
    // do something e.g. notify the user 
}
// 例)
// 一致するNewsTickイベントと一致するTurbineイベントを検知したとき
on NewsTick(location="Site 1") or Turbine(location="Site 1") {
    // do something e.g. notify the user 
}
// 例)
// 毎秒10秒たったとき
on all wait(10.0) {
    // do something
} 
// 例)
// 一致するNewsTickイベントを検知してから、120秒の間に一致するTurbineイベントを検知したとき
on NewsTick(location="Site 1") -> Turbine(location="Site 1") within(120.0) {
    // do something e.g. notify the user
}
// 例)
// 毎時5分になったら
on all at(5,*,*,*,*) {
    // do something
}
イベントリスナーの終了
// 例)
listener l := on all WindMeasurement(location="Site 1", windSpeed < 12.5) as wind;
// do something
l.quit();
// 例)
// 一致するWindMeasurementイベントを検知して、(次のWindMeasurementイベントを検知するまでに)AlertSubmittedイベントを検知していないとき
on all WindMeasurement(location="Site 1", windSpeed > 12.5) as wind and not AlertSubmitted() { 
    // do something e.g. raise a new alert for wind
}

ストリームリスナー

ストリームリスナーは、ストリームから意図したイベント(アイテム)を収集し、収集したストリームに対して、フィルターや、結合、集計などの標準的なリレーショナル操作で、派生アイテム(たとえば温度の平均値など)を生成するストリームクエリをfromステートメントで定義します。ストリームクエリ定義は式であり、その結果はストリームです。このクエリの結果となるストリームを使ってストリームリスナーアクションを定義します。アクションは、クエリの結果の出力アイテムごとに一度だけ実行されます。

イベントを監視するには、モニターでストリームクエリ定義を含むステートメントを定義します。

ストリームクエリは、モニターインスタンスによって受信されたイベントから収集されたストリーム、またはストリームクエリによって作成されたストリームを取得し、付加価値のある派生アイテムを含むストリームを生成します。下の図のように、特定のストリームネットワークでは、上流のストリーム要素が下流のストリーム要素にフィードして派生アイテムを生成することができます。最終的に、ストリームネットワークからストリームリスナーで処理するアイテムを取り込みます。

基本構文
from itemIdentifier in stream_expression window_definition [ coassignment ] listener_action
// 例)
// 30秒間のTemperatureイベントのidごとのcelsiusの平均値をルーティングする
MeanTemp meanTemp;

from t in all Temperature() within 30.0
    group by t.id
    select MeanTemp(mean(t.celsius)) : meanTemp {
        route meanTemp;
    }
ウィンドウ

ストリームクエリは、デフォルトでは到着した単一のイベントに対して機能しますが、ウィンドウを定義することにより、ストリームクエリが対象とするアイテムの範囲を指定することができます。ウィンドウの内容は各アイテムの到着時に変更されます。

// 例)
// 5個のイベントが到着した後にウィンドウの内容を更新
from t in all Temperature(id="S001") retain 5 every 5
select mean(t.celsius);
// 例)
// 1秒ごとにウィンドウの内容を更新
from t in all Temperature(id="S001") within 1.5 every 1.0
select mean(t.celsius);
ストリームクエリ
where boolean_expression
// 例)
// price*volume の値がしきい値を超えたTickイベント100個について、priceの平均値を取得
from t in all Tick() retain 100
    where t.price * t.volume > threshold 
    select mean(t.price)

クエリ結果の射影 - Generating query results

[group by groupBy_expression[, groupBy_expression]...] [having boolean_expression] 
select [rstream] select_expression
group by groupBy_expression[, groupBy_expression]...
// 例)
// 30秒の間のTickイベントでsymbolごとにpriceの平均を取得
from t in all Tick() within 30.0 
    group by t.symbol
    select TickAverage(t.symbol, mean(t.price));
having boolean_expression
// 例)
// 60秒の間のTemperatureイベントが10個より多いときvalueの平均を取得
from t in all Temperature() within 60.0 
    having count() > 10 
    select mean(t.value)
select_expression
// 例)
// 60秒ごとの60秒の間のTemperatureイベントを取得
from t in all Temperature() within 60.0 every 60.0
    select t
ストリームリスナーの終了

ストリームまたはストリームリスナーを作成した後、次のいずれかが発生するまで存在し続けます。- Stream network lifetime

// 例)
listener l := from t in all Temperature(id="S001") retain 5 
    select MeanTemp(mean(t.celsius)) : meanTemp {
        // do something
    }
l.quit();

イベントリスナーとストリームリスナー

使いどころ
組み合わせの例
// 例)
// 発生したTurbineイベントのロケーションが一致するWindMeasurementイベントの中で、10秒の間に風速が上昇したとき
on all Turbine() as turbine {
    from wind in all WindMeasurement(location=turbine.location) within 10.0
        having (last(wind.windSpeed) - first(wind.windSpeed) > 1.0)
        select SubmitAlarm(last(wind.location), last(wind.windSpeed), ACTIVE) as submitAlarm {
            // do something e.g. send submitAlarm
        }
}
// 例)
// 10秒間の温度の平均値がしきい値の100を超えた場合、センサーごとに10秒待って、しきい値を超えない温度に戻らなかったとき
float THRESHOLD := 100.0;
from sens in all SensorData within 10.0 group by sens.id
    having mean(sens.temperature) > THRESHOLD 
    select OverTempAlert(last(sens.id), mean(sens.temperature)) as overTempAlert {
        on all wait(10.0) and not SensorData(id=overTempAlert.id, temperature < THRESHOLD) {
            // do something e.g. send overTempAlert
        } 
    }

イベント監視の例

発生しないイベントを検索するには?
// 例)
// ファイルを受け取ってから、30秒以内に処理が終了するか
action onload() {
    on all FileReceived() as f {
        // 30秒待った後にファイルが処理されなかった(受け取ったファイルの処理済イベントを検知しなかった)とき
        on wait(30.0) and not FileProcessed(id=f.id) {
            // ファイルが処理されなかった旨のアラームを送信など
            // do something e.g. send alert that file was not processed
        }
        // 30秒以内にファイルが処理された(受け取ったファイルの処理済イベントを検知した)とき
        on FileProcessed(id=f.id) within(30.0) {
            // ファイルが処理された旨の情報を送信など
            // do something e.g. send confirmation that the file was processed ()
        }
    }
}

Things Cloud のイベントを監視するには、Things Cloud で定義済みのイベント型を使用します。イベントとチャネル をご覧ください。

イベントの生成

イベント送信

イベントを送信するには、チャネルを指定して対象となるイベントを送信します。チャネルにサブスクライブされているすべてのコンテキストがイベントを受信します。
sendステートメントは、イベントをコリレーターの出力キューのチャネルにパブリッシュするよう、コリレーターに指示します。

基本構文
send event_instance to "channel-name"
Things Cloud にデータを生成

Things Cloud の Measurement/Event/Alarm/.. といったデータを生成する場合は、イベントとチャネル > 標準のイベント型とチャネルの送信用チャネルにイベントを送信してください。

// 例)
// Things Cloud にアラームデータを生成する
Alarm alarm := Alarm("", "c8y_SpeedAlarm", m.source, currentTime, "Speed limit breached", "ACTIVE", "CRITICAL", 1, new dictionary<string,any>);
send alarm to Alarm.SEND_CHANNEL;

ルーティング

routeステートメントは、イベントを現在のコンテキストの入力キューの先頭に送信します。

イベントの処理順序

イベントは、routeステートメントの実行と同じコンテキスト内でのみ処理されます。ルーティングされたイベントは、入力キューの外部から送信されたイベントの前、およびまだ処理されていないルーティングされたイベントの前に配置されます。- Event processing order for monitors

基本構文
route event_instance
ルーティングしたイベントを検出

なんらかのイベントを検知したときに独自定義したイベント型を使ってルーティングし、ルーティングしたイベントを監視するイベントリスナーを定義することができます。独自のイベント型の作成もあわせてご覧ください。

// 例)
monitor TurbineWatch { 
    action onload() {
        on all Turbine(location="Site 1") as t { 
            // Turbineイベントの一致するロケーションで、最大風速を超えたWindMeasurementイベントがあったとき
            on all WindMeasurement(location = t.location, windSpeed > t.max) as w {
                // SubmitAlarmイベントをルーテイング
                route SubmitAlarm(id, t.location, w.windSpeed, ACTIVE);
            }
        }
        
        // SubmitAlarmイベントを検知したとき
        on all SubmitAlarm() as a { 
            // do something e.g. shutdown Turbine at a.location
        }
    } 
} 

並列処理

コリレーターはデフォルトではイベントが到着した順序でシリアルに動作しますが、コンテキストと呼ばれるコンテナ構造を使用して複数スレッドを同時に並列で処理するオプションがあります。

複数のモニターインスタンスを同時に処理する場合に、同時に処理するタスクの依存関係や処理順序を考慮する必要がないか、あるいは最小限であるときに並列処理を検討してください。
たとえば、異なる識別子を持つ同じ種類の複数のイベントに対してタスクを実行するとき(デバイスごとのメジャーメントに対する処理など)が該当します。

デバイスごとに並列でメジャーメントをトラッキングする例がありますので、モニターインスタンスとコンテキストの生成もご覧ください。

補足

構文表記について

レポート中の構文における記号や要素などは、以下をご参考ください。

表記
構文要素
備考
波括弧 { } ブロック モニターやアクションなど開始と終了
山括弧 < > 型引数 括弧内にオブジェクトのデータ型を指定
角括弧 [ ] オプション 括弧内は任意指定
三点リーダー ... 反復可 表記前オブジェクトの繰り返し
code_block コードブロック 処理を記述
type データ型 string:文字列などのデータ型  filed_type,item_type,key_typeなども同義
value initial_value:初期値などの値
instance インスタンス sequence_instance:配列インスタンスなどのオブジェクトインスタンス
variableName 変数名 変数名をlowerCamelCaseで記載  fieldNameなども同義
identifier 識別子 itemIdentifier:アイテム識別子など識別オブジェクト  たとえば、ストリームクエリでのストリーム内のカレントアイテムを表すために使用
condition 条件 true または false で評価される条件文
expression event_expression:イベント式などのオブジェクト式  たとえば、イベント式ではイベントテンプレートやイベント演算子などの組み合わせ
coassignment 代入演算子 : または as で一致するイベントを変数に代入 (イベントテンプレートの一部)
window_definition ウィンドウ定義 オプションでretainwithineveryを使いウィンドウの範囲を任意指定(指定しない場合は単一イベント)
listener_action リスナーアクション イベントリスナーがトリガーされたときに実行するステートメントまたはブロック

Apama 例題

投稿日 / 投稿者名

2021.10.20 / 伊藤・高橋

難易度 ★★ ~ ★★★


所要時間

1.5時間

前提条件

目次

例題1(初級) 計測値がしきい値へ達した際にAlarmを生成する

Things Cloudで提供しているスマートルールは、実際はCEPを用いて作成されています。
このスマートルールを実現するコードをApamaで書いてみましょう。
ここでは、計測値が明示的しきい値に達した際にアラームを作成 のスマートルールに関して、スクリプトを作成します。
ただし、今回は指定のしきい値超過時に新規アラームを生成するのみの機能を持ったスクリプトを作成し、アラームクリアや障害値の範囲指定機能は省略します。

解説

スクリプトを作成していく一連の流れを考えてみましょう。例えば、以下のような流れになります。

ステップ図

ここからは、上記の流れで解説していきます。

ステップ1

on all Measurement() as m {}

ステップ2

{
	"type": "c8y_TemperatureMeasurement",
	"source": 1304435,
    "com_TestMeasurement": {
        "T": {
            "unit": "C",
            "value": 55
        }
    }
}

今回取得したい値は、カスタムフラグメント内のvalue部分です。2つの取得方法が存在します。取得後、取得した値が規定値以上であるかのif文を作成すれば、判断できます。
Measurementのカスタムフラグメントはdictionary(辞書)型となっています。詳しくはリファレンス(Measurement)をご覧ください。
Apamaでdictionary内の値を取得する方法は2つ存在します。

m.measurements.getOrDefault("com_testMeasurement").getOrDefault("T").value
m.measurements["com_testMeasurement"]["T"].value

getOrDefaultに関して、詳しくはgetOrDefault をご覧ください。

ステップ3

action alarmCreate(string deviceId){
    Alarm alarms := new Alarm;
    alarms.source := deviceId;
    alarms.time := currentTime;
    alarms.text := "testAlarm";
    alarms.type := "com_testAlarm";
    alarms.severity := "MAJOR";
    alarms.status := "ACTIVE";
    send alarms to Alarm.CHANNEL;
}

Apamaでのアラーム作成時におけるフィールドの構成要素に関しては、以下をご参照ください。

実装例

using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.Alarm;

monitor MySampleMonitor {
	// 規定温度のイベント
	event Constants {
		// 規定温度は50度
		constant float BORDER_MEASUREMENT_VALUE := 50.0;
	}

	action onload() {
		monitor.subscribe(Measurement.CHANNEL);
		// 1. 対象のMeasurementを検知(タイプがc8y_TemperatureMeasurement, デバイスIDが1304435)
		on all Measurement(type="c8y_TemperatureMeasurement", source="1304435") as m {
			// 2. Measurement内のデータが規定値以上かどうか判断
			if (m.measurements.getOrDefault("com_testMeasurement").getOrDefault("T").value > Constants.BORDER_MEASUREMENT_VALUE) {
				// 3. 規定値以上であればアラームを生成
				alarmCreate(m.source);
			}
		}
	}

	// アラーム作成アクション(デバイスIDのみ受け取り、他は明示的に指定)
	action alarmCreate(string deviceId){
		Alarm alarms := new Alarm;
		alarms.source := deviceId;
		alarms.time := currentTime;
		alarms.text := "testAlarm";
		alarms.type := "com_testAlarm";
		alarms.severity := "MAJOR";
		alarms.status := "ACTIVE";
		send alarms to Alarm.CHANNEL;
	}
}

例題2(中級) デバイスからのMeasurement送信が60秒なかった場合、Alarmを生成する

ここでは、デバイスからのMeasurement送信が60秒なかった場合、Alarmを生成するスクリプトを作成します。指定の条件は以下となります。

解説

スクリプトを作成していく一連の流れを考えてみましょう。例えば、以下のような流れになります。

ステップ図

ここからは、上記の流れで解説していきます。

ステップ1

on all Measurement(type={対象Measurementのタイプ名})

ステップ2

on wait(60.0) and not Measurement(type={対象Measurementのタイプ名}, source={対象MeasurementのデバイスID})
dictionary<string, string> activeAlarms := {};
/* example
{"92063": alarmId, "984844": alarmId}
*/
on all Alarm(type={対象Alarmのタイプ名}, status={対象Alarmのステータス}) as al{
	activeAlarms[al.source] := al.id;
}

ステップ3

if activeAlarms.hasKey(m.source) then {
	alarmDelete(activeAlarms[m.source]);
	activeAlarms.remove(m.source);
}

実装例

using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.Event;
using com.apama.cumulocity.Alarm;
using com.apama.aggregates.sum;
using com.apama.aggregates.mean;
using com.apama.aggregates.last;

monitor MySampleMonitor {
	sequence<string> cacheAlarmList := ["", ""];
	dictionary<string, string> activeAlarms := {}; // {"92063": alarmId, "984844": alarmId} を作っていく	
	sequence<boolean> flagList := [false, false];
	sequence<string> sourceIds := ["92063", "984844"];
	string s;

	/** Initialize the application */
	action onload() {
		integer count := 0;
		monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
		monitor.subscribe(Alarm.SUBSCRIBE_CHANNEL);

		on all Measurement(type="com_TestMeasurement") as m{
			if activeAlarms.hasKey(m.source) then {
				logOut("Alarm Delete", activeAlarms[m.source]);
				alarmDelete(activeAlarms[m.source]);
				activeAlarms.remove(m.source);
			}

			on wait(60.0) and not Measurement(type="com_TestMeasurement", source=m.source){
				alarmCreate(m.source);
			}
		}

		on all Alarm(type="com_TestAlarm", status="ACTIVE") as al{
			activeAlarms[al.source] := al.id;
		}
	}

	// アラーム削除
	action alarmDelete(string alarmId) {
		// アラームステータスをCLEAREDヘ
		Alarm alarms := new Alarm;
		alarms.id := alarmId;
		alarms.status := "CLEARED";
		send alarms to Alarm.CHANNEL;
	}

	// アラーム生成
	action alarmCreate(string deviceId){
		Alarm alarms := new Alarm;
		alarms.source := deviceId;
		alarms.time := currentTime;
		alarms.text := "testAlarm";
		alarms.type := "com_TestAlarm";
		alarms.severity := "MAJOR";
		alarms.status := "ACTIVE";
		send alarms to Alarm.CHANNEL;
	}


}

例題3(中〜上級)

特定の Event 発生をトリガーに、https://httpbin.org/get に HTTP GET リクエストを送信します。レスポンスに応じて、以下を実施します。

補足: Apama での HTTP リクエストを扱った例題として上記 URL をリクエスト先に設定しており、実用的な意味はありません。

Apama でのデータ連携における注意点

コードの解説に入る前に、Apama でのデータ連携における注意点について説明します。ここでは、システム間連携を想定し、マスターデータを持つ側(一般的なクライアントサーバモデルにおけるサーバ)を Things Cloud、データを取得し利用する側(クライアント)を連携先システムとします。システム間でデータを連携する場合、以下の2つの方式とそれぞれの利点・欠点が考えられます。

方式 説明
pull型 連携先システムから Things Cloud のデータを取得する方式
利点:必要に応じて情報を取得できる
欠点:連携先システムから問い合わせない限り、Things Cloud 側でデータが更新されたかどうかわからない
push型 Things Cloudから連携先システムへデータを送信する方式
利点:データ更新があった場合即座に連携先システムに通知できる
欠点:送達保証が難しい、再送処理が複雑になる

基本的に、Things Cloud で Apama を用いて外部サービスへリクエストするような通信は push 型になります。このため、 Apama でのデータ連携は データの連携速度を重視し、情報の欠損を許容できる場合には適していますが、欠損が一つでも発生すると業務に影響を及ぼしてしまう場合の利用には適していません。 どのようなユースケースを実現したいかにもよりますが、使い所は注意深く検討する必要があります。

解説

ではコードの解説に入ります。以下の流れで実装していきます。

ステップ1:特定 Event 検知
ステップ2:HTTP リクエスト発行
ステップ3:ステータスに応じて Event 作成

各フェーズでのイベントの流れと処理は以下のようなイメージです。 overview

ステップ1:特定 Event 検知

まず、Apama EPL 起動時に実行される onload() 内で Event チャネルのサブスクライブと、特定のイベント(今回は com_SampleEvent とします)を監視するリスナーを設定します。

action onload() {
	// Application starts here
	monitor.subscribe(Event.SUBSCRIBE_CHANNEL);

	on all Event(type="com_SampleEvent") { 
		// com_SampleEvent を観測した場合の処理
	}
}

ステップ2:HTTP リクエスト発行

HTTP リクエスト呼び出しアクションの登録

まず、HTTP リクエストを送信するアクションをリスナー内に記述しましょう。アクション名は invokeHTTPAPI() とします。

action onload() {
	// Application starts here
	monitor.subscribe(Event.SUBSCRIBE_CHANNEL);

	on all Event(type="com_SampleEvent") { 
		invokeHTTPAPI();
	}
}

次に、invokeHTTPAPI() を実装します。Apama で用意されている HTTP クライアントを用い、以下のように実装できます。HTTP クライアントは com.softwareag.connectivity.httpclient パッケージから利用できます。以下のように using ステートメントを用いてインポートしておきましょう。Things Cloud API用の定義済みのイベント型 もご覧ください。

/** Http client */
using com.softwareag.connectivity.httpclient.HttpOptions;
using com.softwareag.connectivity.httpclient.HttpTransport;
using com.softwareag.connectivity.httpclient.Response;
using com.softwareag.connectivity.httpclient.Request;

monitor CepOutboundSample {
	// 先頭に定数として宣言
	constant string HTTP_HOST:="httpbin.org";
	constant integer HTTP_PORT:= 443; // https の場合は 443 を指定

	...
	action invokeHTTPAPI(string method, string path, dictionary<string, any> json, string source) {
		// HTTPTransport のインスタンスを生成
		HttpTransport transport := HttpTransport.getOrCreate(HTTP_HOST, HTTP_PORT);

		// ヘッダなどの情報を設定
		HttpOptions options := new HttpOptions;
		// 今回はヘッダは設定しませんが、以下のように記述できます
		// options.headers.add("Content-Type", "application/json" ); 
		// options.headers.add("Accept", "application/json"); 
		// options.headers.add("Authorization", "Basic xxxx");

		// HTTPTransport のインスタンスを介して HTTP リクエストを作成・実行
		transport.createAndExecuteRequest(method, path, json, options, RequestCallback(source).handleResponse);
	}
}

HTTPTransport を介して新規の HTTP リクエストを作成、実行します。HTTPOptions では、ヘッダ以外にも、cookie やクエリパラメータを設定できます。 transport.createAndExecuteRequest() の最後の引数では、レスポンス時に呼び出されるコールバックアクションを設定できます。今回の例題ではレスポンスステータスを後続処理に使用したいので、レスポンスの処理を行うコールバックアクションを RequestCallback(source).handleResponse という名前で登録しておきます。

HTTP クライアントの詳細は以下にも説明があるのでご覧ください。

onload() アクション内のリスナーに記述した invokeHTTPAPI() の引数を以下のように追加します。 今回は GET リクエストを送信するので、HTTP メソッドは GET、リクエストボディは指定しないので、空の dictionary を指定します。 最後の引数 checkStatus は、コールバック用イベントである RequestCallback の引数として渡されます。次で詳細を説明します。

on all Event(type="com_SampleEvent") { 
	invokeHTTPAPI("/get", "GET", new dictionary<string, any>, "checkStatus");
}
レスポンスの処理用のイベント型定義

まず、レスポンスを格納するイベント型を APIResponse として定義します。レスポンス( Response 型のイベント)の payload は AnyExtractor 型で返却されるので、以下のように定義します。

// レスポンス用のイベント定義
event APIResponse {
	string source; // リクエスト元判別のための変数
	integer status; // ステータスコード
	AnyExtractor body; 
}

次に、リクエストコールバック用のイベント型を RequestCallback として定義します。 source に加え、レスポンスを加工して、先ほど定義した APIResponse 型としてイベントを生成するために、以下のようにレスポンスをルーティングするイベントアクションを追加します。

// コールバック用のイベント定義
event RequestCallback {
	string source; 

	// レスポンスを処理するためのイベントアクション 
	action handleResponse(Response res){
		// レスポンスを APIResponse イベントへルーティング
		route APIResponse(source, res.statusCode, res.payload);
	}
}

source には任意の文字列を設定でき、レスポンス受け取り時に、想定するリクエストからのレスポンスかどうか、source に設定された文字列から判別できます。 今回は1つの HTTP リクエストのみを呼び出すのでレスポンスが混在することはありませんが、複数のリクエストが存在し、かつレスポンスの処理がそれぞれで異なる場合に活用できます。

ステップ3:ステータスに応じて Event 作成

次に、ステータスに応じて Event を作成する処理を記述します。 まず、 onload() 内にレスポンスを監視するためのリスナーを設定しましょう。source を以下のように活用することで必要なレスポンスのみを処理できます。(今回は checkStatus source 以外を持つリクエストは存在しません)

action onload() {
	...
	on all Event(type="com_SampleEvent") { 
		invokeHTTPAPI("GET", "/get", new dictionary<string, any>, "checkStatus");
	}
	
	on all APIResponse(source = "checkStatus") as res {

	}
}

APIResponse イベントで定義した status を使ってステータスコードを取り出し、以下のようにステータスに応じて Event を生成する処理を記述します。モニターファイルの先頭に MOID として定数を宣言し、Event を生成先デバイスの ID を指定します。

// 先頭に定数として宣言
constant string MOID:= "2505";

...
action onload() {
	// Application starts here
	monitor.subscribe(Event.SUBSCRIBE_CHANNEL);

	on all Event(type="com_SampleEvent") { 
		invokeHTTPAPI("GET", "/get", new dictionary<string, any>, "checkStatus");
	}
	
	on all APIResponse(source = "checkStatus") as res {
		if (res.status = 200) { 
			send Event("", "com_ResponseOK", MOID, currentTime, "Repsonse 200 OK: "+res.body.getString("url"), new dictionary<string,any>) to Event.SEND_CHANNEL;
		} else { // status が 200 以外の時
			send Event("", "com_ResponseNG", MOID, currentTime, "Repsonse NG: " + res.status.toString(), new dictionary<string,any>) to Event.SEND_CHANNEL;
		}
	}
}

ここで、AnyExtractor オブジェクトのデータ取得方法について説明します。 今回例として挙げている https://httpbin.orgs/get へ GET リクエストを送ると以下のレスポンスが返却されます。

{
  "args": {}, 
  "headers": {
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9", 
    "Accept-Encoding": "gzip, deflate, br", 
    "Accept-Language": "ja,en-US;q=0.9,en;q=0.8", 
    "Host": "httpbin.org", 
    ...
  }, 
  ...
  "url": "https://httpbin.org/get"
}

APIResponse イベント型で定義したように、 上記のレスポンスボディは AnyExtractor オブジェクトとして格納されます。 例えば上記レスポンスのうち、urlheaders.Host の値を取り出したい場合、以下のように記述できます。

res.body.getString("url")
res.body.getString("headers.Host") // JSON の階層はこのように指定できる

Things Cloud の Measurement や Alarm も上記のような JSON データなので、同じように AnyExtractor を使ってデータを取り出せます。

// Measurement の例
{
  "c8y_Temperature": {
    "T": {
          "value": 30,
          "unit": "C"
    }
  }
}
// AnyExtractor での取り出し方
action onload() {
	// Application starts here
	monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);

	on all Measurement(type="c8y_Temperature") as m {
		// unit を取り出してログに出力(メジャーアラームも生成されます)
		log AnyExtractor(m.measurements.getOrDefault("c8y_Temperature").getOrDefault("T").unit) at ERROR;
		// value を取り出してログに出力(メジャーアラームも生成されます)
		log AnyExtractor(m.measurements.getOrDefault("c8y_Temperature").getOrDefault("T").value).getFloat("").toString() at ERROR;
	}
}

AnyExtractor の詳細・使い方については以下もご覧ください。

実装は以上で完了です。動作確認をしてみましょう。

実装例

/** Basic event definitions for working with Cumulocity IoT */
using com.apama.cumulocity.Event;

/** Miscellaneous utilities */
using com.apama.cumulocity.Util;
using com.apama.util.AnyExtractor;

/** Http client */
using com.softwareag.connectivity.httpclient.HttpOptions;
using com.softwareag.connectivity.httpclient.HttpTransport;
using com.softwareag.connectivity.httpclient.Response;
using com.softwareag.connectivity.httpclient.Request;

monitor CepOutboundSample {
	constant string HTTP_HOST:="httpbin.org"; 
	constant integer HTTP_PORT:= 443; 
	constant string MOID:= "2505";
	
	event APIResponse {
		string source;
		integer status;
		AnyExtractor body;
	}

	event RequestCallback {
		string source;
		action handleResponse(Response res){
			route APIResponse(source, res.statusCode, res.payload);
		}
	}
	
	action onload() {
		// Application starts here
		monitor.subscribe(Event.SUBSCRIBE_CHANNEL);

		on all Event(type="com_SampleEvent") { 
			invokeHTTPAPI("GET", "/get", new dictionary<string, any>, "checkStatus");
		}
		
		on all APIResponse(source = "checkStatus") as res {
			if (res.status = 200) { 
				send Event("", "com_ResponseOK", MOID, currentTime, "Repsonse 200 OK: "+res.body.getString("url"), new dictionary<string,any>) to Event.SEND_CHANNEL;
			} else { // status が 200 以外の時
				send Event("", "com_ResponseNG", MOID, currentTime, "Repsonse NG: " + res.status.toString(), new dictionary<string,any>) to Event.SEND_CHANNEL;
			}
		}
	}

	/*
	 * 引数で指定した先へ HTTP リクエスト送信
	 */ 
	action invokeHTTPAPI(string method, string path, dictionary<string, any> json, string source) {
		HttpTransport transport := HttpTransport.getOrCreate(HTTP_HOST, HTTP_PORT);
		HttpOptions options := new HttpOptions;
		transport.createAndExecuteRequest(method, path, json, options, RequestCallback(source).handleResponse);
	}
}

例題4(上級) 1分毎にMeasurementの集約値を計算し、新たな1つのMeasurementとして生成する

ここでは、1分毎にその1分前からのMeasurementの集約値(max, average, min)を計算し、新たな1つのMeasurementとして登録するスクリプトを作成します。

解説

スクリプトを作成していく一連の流れを考えてみましょう。例えば、以下のような流れになります。

ステップ図

ここからは、上記の流れで解説していきます。

ステップ1

on all at(*/1, *, *, *, *){}

1分以内のMeasurementを取得するため、stream機能を用いてMeasurementを取得します。今回はlistener型変数へ、from文で取得したMeasurementを代入します。
listener型の詳細は以下をご覧ください。

また、各入力条件は以下となります。

listener avgTempListener := on all at(*/1, *, *, *, *){
	from m in all Measurement(type="com_SampleMeasurement")
	within 60.0 every 60.0
	group by m.source
}

また、作成したstream(listener型変数)は都度終了する処理が必要です。listener型変数はquit()メソッドを所持しており、対象のリスナー(インスタンス)を終了させます。

avgTempListener.quit();

ステップ2

また、これらを使用するために、以下を宣言します。

using com.apama.aggregates.avg;
using com.apama.aggregates.min;
using com.apama.aggregates.max;

from m in all Measurement(type="com_SampleMeasurement")mへ、group byでデバイス毎に分けられたMeasurementが格納されています。 Measurementのカスタムフラグメントは以下のようになります。

"com_SampleMeasurement": {
		"T": {
			"value": 10,
			"unit": "C"
		}
	}
}

対象のMeasurementに対して、カスタムフラグメント内の値は例題1 ステップ2でご紹介した方法で取得することができます。
結果、以下のようなselect文となります。

select MinMaxAvg(min(m.measurements["com_SampleMeasurement"]["T"].value),
 							 max(m.measurements["com_SampleMeasurement"]["T"].value),
 							 avg(m.measurements["com_SampleMeasurement"]["T"].value),
 							 m.source) as minMaxAvg

ステップ3

"com_AggregatedMeasurement": {
	"min": <最小値>,
	"max": <最大値>,
	"avg": <平均値>
}

実装例

using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.MeasurementValue;
using com.apama.cumulocity.Event;
using com.apama.correlator.timeformat.TimeFormat;
using com.apama.aggregates.avg;
using com.apama.aggregates.min;
using com.apama.aggregates.max;

monitor PerDeviceMeasurementTracker {
	event MinMaxAvg {
		float min;
		float max;
		float avg;
		string source;
	}

 	action onload() {
 		monitor.subscribe(Measurement.CHANNEL);

 		// 1時間毎に一日分のMeasurementを格納、avg計算
 		// 毎回streamが増えていくので、終了する処理が必要
 		listener avgTempListener :=
 		on all at(*/1, *, *, *, *) {
			from m in all Measurement(type="com_SampleMeasurement") 
 			within 60.0 every 60.0 
 			group by m.source
 			select MinMaxAvg(min(m.measurements["com_SampleMeasurement"]["T"].value),
 							 max(m.measurements["com_SampleMeasurement"]["T"].value),
 							 avg(m.measurements["com_SampleMeasurement"]["T"].value),
 							 m.source) as minMaxAvg {
 				createMeasurements(minMaxAvg);
				
 			}
 			avgTempListener.quit();
 		}
 	}
	
	// 集約 Measurement として登録
	action createMeasurements(MinMaxAvg mval) {
		Measurement m := new Measurement;
		m.type := "com_AggregatedMeasurement";
		m.source := mval.source;
		m.time := currentTime;
		m.measurements := {
			"com_AggregatedMeasurement": {
				"min": MeasurementValue(mval.min, "T", new dictionary<string, any>),
				"max": MeasurementValue(mval.max, "T", new dictionary<string, any>),
				"avg": MeasurementValue(mval.avg, "T", new dictionary<string, any>)
			}
		};
		send m to Measurement.CREATE_CHANNEL;
	}	

}