Apamaイベント処理言語(EPL)の使用
Apama イベント処理言語は Java 言語と構文的に類似しています。if
、while
、for
のようなシンプルなフロー制御ステートメントに加えて、Things Cloud では流れてくるデータ(イベント)に反応するために on
キーワードを使用してリスナーを作成できます。
Apama EPLについては、Apama ドキュメント を参照してください。
下記のステートメントでは、特定の温度より高い温度データを連続的にセンサーから抽出しています。
on all Measurement(type="c8y_TemperatureMeasurement") as m {
if m.measurements.getOrDefault("c8y_TemperatureMeasurement").getOrDefault("T").value > 100.0 {
Alarm alarm := new Alarm;
alarm.type := "c8y_TemperatureAlert";
alarm.source := m.source;
alarm.time := currentTime;
alarm.text := "Temperature too high";
alarm.status := "ACTIVE";
alarm.severity := "CRITICAL";
send alarm to Alarm.SEND_CHANNEL;
}
}
ここの Measurement
とは、メジャーメントデータを含む定義済みのイベントです。こちらの例では、m
は Measurement
のイベントであり、c8y_TemperatureMeasurement
のメジャーメントをフィルタリングし、そのプロパティとなる c8y_TemperatureMeasurement.T.value
の温度センサーの摂氏でのリスナーとなります(フラグメントライブラリ をご参照ください)。
上記のようなリスナーは、モニターの onload
ステートメントに配置する必要があります。ファイルには、リスナーで使用する型の using
ステートメントが含まれている必要があります。Things Cloud で利用するイベントのほとんどは、com.apama.cumulocity パッケージに含まれています。すべてのリストを以下に示しますが、以降の例では簡潔にするためにこれらを省略します。
using com.apama.cumulocity.ManagedObject;
using com.apama.cumulocity.Operation;
using com.apama.cumulocity.Event;
using com.apama.cumulocity.Alarm;
using com.apama.cumulocity.Error;
using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.MeasurementValue;
using com.apama.cumulocity.FindAlarm;
using com.apama.cumulocity.FindAlarmResponse;
using com.apama.cumulocity.FindAlarmResponseAck;
using com.apama.cumulocity.FindManagedObject;
using com.apama.cumulocity.FindManagedObjectResponse;
using com.apama.cumulocity.FindManagedObjectResponseAck;
using com.apama.cumulocity.FindMeasurement;
using com.apama.cumulocity.FindMeasurementResponse;
using com.apama.cumulocity.FindMeasurementResponseAck;
using com.apama.cumulocity.FindOperation;
using com.apama.cumulocity.FindOperationResponse;
using com.apama.cumulocity.FindOperationResponseAck;
using com.apama.cumulocity.FindEvent;
using com.apama.cumulocity.FindEventResponse;
using com.apama.cumulocity.FindEventResponseAck;
using com.apama.cumulocity.SendEmail;
using com.apama.cumulocity.SendSMS;
using com.apama.cumulocity.Util;
using com.apama.util.AnyExtractor;
using com.apama.correlator.timeformat.TimeFormat;
using com.softwareag.connectivity.httpclient.HttpOptions;
using com.softwareag.connectivity.httpclient.Request;
using com.softwareag.connectivity.httpclient.RequestType;
using com.softwareag.connectivity.httpclient.Response;
monitor ListenForHighTemperatures {
action onload() {
on all Measurement(type="c8y_TemperatureMeasurement") as e {
if e.measurements.getOrDefault("c8y_TemperatureMeasurement").getOrDefault("T").value > 100.0 {
// メジャーメントを処理する
}
}
}
}
派生データはどのように作成したらよいですか?
新しいアラームまたはオペレーションを作成するには、関連するイベント型のインスタンスを作成し、send
ステートメントで関連するチャネル(イベント型の定数で定義)に送信します。例えば、温度が定義された値を超えるとただちに発生するアラームが必要な場合、下記のようなステートメントで実行可能になります。
on all Measurement(type="c8y_TemperatureMeasurement") as m {
if m.measurements.getOrDefault("c8y_TemperatureMeasurement").getOrDefault("T").value > 100.0 {
send Alarm("","c8y_TemperatureAlert",m.source,currentTime,"温度が高すぎます","ACTIVE","CRITICAL",1,new dictionary<string,any>) to Alarm.SEND_CHANNEL;
}
}
技術的に説明すると、このステートメントは温度センサーが摂氏 100℃ を超えるたびに新しい Alarm イベントを作成し、それを Things Cloud に送信しています。
どのようにデバイスを制御すればよいですか?
Things Cloud のEPLによるリモート制御は、オペレーションイベントを送信することで行われ、派生データの 1 つとなります。例えば、デバイス上でオペレーションを走らせたい場合、デバイスをターゲットにした新しいオペレーションを作成します。下記では、温度計の温度によるスイッチの動きになります。
on all Measurement(type="c8y_TemperatureMeasurement") as m {
if m.measurements.getOrDefault("c8y_TemperatureMeasurement").getOrDefault("T").value > 100.0 {
send Operation("",m.source,"PENDING",{"c8y_Relay":<any>{"relayState":"CLOSED"}}) to Operation.SEND_CHANNEL;
}
}
-
m.source
は、トリガーとなる温度 ID のプレースホルダーとなります。 -
params フィールド(最後のフィールド)は、オペレーションのネストされた内容を定義します。この例では
c8y_Relay
オペレーションを作成し、relayState
を CLOSED に設定します。最上位フィールドはdictionary<string, any>
である必要があるため、<any>
にキャストすることに注意してください。
どのようにデータを問い合わせればよいですか?
進行中のイベントプロセスの一部として時々関連情報を Things Cloud データベースに問い合わせることがあります。これはイベントを送信し、リスナーを使用してレスポンスを待機することでサポートされます。下記は、特定の顧客に 1時間ごとに自動販売機の総売上高を集計する場合の例になります。顧客データが購入後にトリガーされる売上レポートは、Things Cloud のデータベースから取得する必要があります。
using com.apama.aggregates.count;
monitor SalesReport {
event SalesReport {
Event e;
ManagedObject customer;
}
event SalesOutput {
integer count;
string customerId;
}
action onload() {
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
on all Event() as e {
monitor.subscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
integer reqId := integer.getUnique();
on all FindManagedObjectResponse(reqId=reqId) as mor and not FindManagedObjectResponseAck(reqId=reqId) {
route SalesReport(e, mor.managedObject);
}
on FindManagedObjectResponseAck(reqId=reqId) {
monitor.unsubscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
}
send FindManagedObject(reqId,"",{"childAssetId":e.source}) to FindManagedObject.SEND_CHANNEL;
}
from sr in all SalesReport() within 3600.0 every 3600.0
group by sr.customer.id
select SalesOutput(count(), sr.customer.id) as sales {
send Measurement("", "total_cust_trx", "customer_trx_counterId", currentTime,
{
"total_cust_trx":{
"total":MeasurementValue(sales.count.toFloat(), "COUNT", new dictionary<string,any>)
}
}, {"customer_id":<any> sales.customerId}) to Measurement.SEND_CHANNEL;
}
}
}
上記の例では、最初に SalesReport
および SaleOutput
イベントの定義を作成しています。これらは売上を特定する Event
および ManagedObject
を保持し、一連の売上から導出したい情報である count
と customerId
を保持します。Event
オブジェクトをリッスンし、イベントがどの ManagedObject
から来たのかを調べるために FindManagedObject
リクエストを送信します。これらの SalesReport
オブジェクトは、ルート文を介してストリームクエリに送信されます。ストリームクエリは 1時間ごと(3,600秒)に発火し、顧客ごとの売上データの集計を選択し、その自動販売機の売上を表す新しいメジャーメントを送信します。
基本機能
アプリケーションの開発
EPL アプリはモニタ(*.mon)ファイルです。EPL アプリは 2 つの異なる方法で開発できます。
- ストリーミング分析アプリケーション を使用して、Things Cloud のアプリケーションスイッチャーから利用可能な EPL アプリを Things Cloud 内で開発します。
- または、ローカルマシンに Apama をインストールし、Apama Plugin for Eclipse、つまり別の環境で EPL アプリ(モニターファイルとして)を開発することができます。
ストリーミング分析アプリケーションを使用したアプリの開発
ストリーミング分析アプリケーションの EPL アプリ ページには、新規または既存の EPL アプリ(*.mon ファイル)を対話的に編集したり、EPL アプリをインポートしてアクティブ化(デプロイ)したりするためのインターフェースが提供されます。
EPL アプリ ページの使用を希望するテナント上のユーザーは、CEP マネージャー である必要があります。権限の管理 を参照してください。
ステップ 1: ストリーミング分析アプリケーションを呼び出す
アプリケーション スイッチャーを開き、ストリーミング分析アプリケーションの アイコンをクリックします。次に、EPL アプリ ページに移動します。
EPL アプリ ページに移動すると、最初に EPL アプリ マネージャーが表示され、既存の EPL アプリがリストされます。各アプリケーションはカードとして表示されます。ここから新しい EPL アプリを追加したり、既存の EPL アプリを管理したりできます。
アプリケーションに対して表示される各カードの上部には、アプリケーションを編集、ダウンロード、削除できるアクション メニューがあります。
このページからは、次のことができます。
-
既存の EPL アプリを編集します。アクションメニューから 編集 コマンドを使用するか、アプリケーションに表示されているカードをクリックします。
-
新しい EPL アプリを作成します。下記参照。
-
EPL アプリをインポートします。Things Cloudの外部でアプリを開発する場合(例えば、Apama Plugin for Eclipse を使用する)、トップメニューバーで EPL のインポート をクリックしてアップロードします。Apama モニター(*.mon)ファイルをアプリとして、ストリーミング分析アプリケーションに追加します。
-
EPLアプリをダウンロードします。アクションメニューから ダウンロード コマンドを使用して、アプリを *.mon ファイルとしてダウンロードします。
-
既存の EPL アプリをデプロイします。アプリに対して表示されているカードで、モードを 非アクティブ から アクティブ に変更します。アプリケーションのデプロイ を参照してください。
アプリをアクティブ化すると、構文エラーがあればすぐに報告されます。エラー状態がカードに表示されるので、アプリが良好な状態にあることを確認できます。エラーをクリックすると、問題の内容に関する情報が表示されます。構文エラーがある場合、アプリをアクティブ化することはできません。エラーは、修正されてアプリが再度アクティブ化されるまでカードに表示されます。 -
すべての EPL アプリをリロードします。トップメニューバーで 再読み込み をクリックして表示を更新すると、ページが読み込まれてから他のユーザーが行った変更(その間に発生したエラーも含む)が表示されます。
ステップ 2: EPLアプリを作成する
トップメニューバーで 新しい EPL アプリ をクリックします。表示される アプリの作成 ダイアログボックスが表示され、アプリに一意の名前を付けます。新しいアプリ用に作成されるカードに表示される説明を入力することもできます。OKをクリックします。
EPLエディタが表示されます。新しいアプリのEPLコードには、Things Cloudでの作業に必要な典型的な基本イベント定義およびユーティリティがすでに含まれています。アプリの必要に応じてそれらを調整できます。詳細については、ドキュメントとサンプルを参照してください。
開始しやすいように、いくつかのサンプルが用意されています。これらを表示するには、エディタの右側に表示される サンプル をクリックします。サンプルをクリックすると、その内容のプレビューが表示されます。標準のキーの組み合わせ Ctrl+C および Ctrl+V キーを使用して、サンプル コードの一部を選択し、それを独自のコードにコピーできます。コマンドボタンを使用して、コード全体をクリップボードにコピーして独自のコードの適切な位置に挿入したり、既存のコードをすべてサンプル コードに置き換えたりすることもできます。
トップメニューバーのボタンを使用すると、現在のセッションでの最後の変更を元に戻したりやり直したり、変更を保存したりできます。
EPL エディタでモードを 非アクティブ から アクティブ(またはその逆)に変更することもできます。繰り返しますが、EPL コードにエラーがある場合、アプリをアクティブ化することはできません。コード内でエラーが強調表示されます。
トップメニューバーの閉じるアイコン をクリックして、EPL エディタを終了し、EPL アプリのリストに戻ります。
ステップ 3: EPL アプリをテストする
アプリがアクティブになると、実行結果を確認できるようになります。これには、測定値の送信、データの受信、アラームの生成、およびApama-ctrlマイクロサービスへのログ記録が含まれる場合があります。Apama-ctrlマイクロサービスのログファイルを確認する方法については、マイクロサービスの監視をご覧ください。
アプリがアクティブになると、実行結果が表示されます。これには、メジャーメントの送信、データの受信、アラームの作成、Apama-ctrl マイクロサービスへのログインが含まれる場合があります。Apama-ctrl マイクロサービスのログ ファイルを確認する方法については、マイクロサービスの監視 を参照してください。 ユーザー ガイド の 管理 > マイクロサービスの管理と監視 をご覧ください。
アプリのデプロイも参照してください。
Apama Plugin for Eclipseを使用したアプリケーションの開発
Apama Plugin for Eclipse は完全な開発環境を提供し、複雑な EPL アプリケーションがある場合に最適なツールです。EPL アプリ(モニターファイル)の準備ができたら、それを Things Cloud にインポートする必要があります。
ステップ 1: Apama をインストールする
https://download.cumulocity.com/Apama/ から Apama の apama-c8y-dev
パッケージをダウンロードし、解凍してインストールします。
これにより機能が制限され、いくつかの制限事項が課されたフリーミアム版 Apama Community Edition がインストールされます。すべての機能を使用するには、ライセンスが必要です。
ライセンスをお持ちの場合、ライセンスファイルを Apama の作業ディレクトリ(APAMA_WORK/license)にコピーしてください。
apama-c8y-dev
パッケージには、Apama コミュニティエディションおよびライセンス版の両方で使用可能な Apama Plugin for Eclipse が含まれています。
ステップ 2: プロジェクトを作成する
インストールしたら、Apama Plugin for Eclipseで Apamaプロジェクトを作成し、Things Cloud 接続を有効にします。Apama プロジェクトの作成方法については、Apama ドキュメントの Apama プロジェクトの作成 を参照してください。
ステップ 3: Apama バンドルをプロジェクトに追加する
新しく作成した Apama プロジェクトに次の Apama バンドルを追加します。これらは、アプリをアクティブ化するために Things Cloud で必要となります。プロジェクトにバンドルを追加する方法については、Apamaドキュメントのプロジェクトへのバンドルの追加 を参照してください。
- Things Cloud > Things Cloud のイベント定義
Things Cloud とのデータの送受信に必要なイベント API を提供します。 - Things Cloud > Things Cloud 用ユーティリティ
Things Cloud から受信したデータを操作するためのヘルパー ユーティリティ関数を提供します。 - 任意のエクストラクター
any
型から値を抽出するためのサポートを提供します。 - 時刻の形式
Time Format プラグインのすべてのメソッドにアクセスするために必要です。時刻のフォーマットと解析時間に役立ちます。 - HTTP クライアントの汎用イベント
HTTP クライアント接続プラグインによって使用される、事前定義された汎用イベントを公開します。 - ApplicationInitialized 時に自動
これにより、起動時にすべての接続プラグインがすぐに開始されます。 - HTTP クライアント > 一般的なリクエスト/レスポンス イベント定義を含む JSON
EPL アプリが HTTP コールを行うことを許可します。 - Things Cloud > Things Cloud クライアント
Things Cloud クライアントを EPL アプリに公開します。
上記のバンドルは EPL アプリで許可される唯一のバンドルであるため、他のバンドルを追加しないように注意してください。追加しないと、Things Cloud でアクティブ化されたときにアプリが機能しなくなる可能性があります。
ステップ 4: モニターファイルを作成する
新しい Apama モニターファイルを作成するには、新しいモニターファイルの作成 を参照してください。
新しく作成したモニターファイルを EPL アプリとして Things Cloud にインポートし、そこでアクティブ化する前に、モニターファイルが Apama Plugin for Eclipse 内から期待通りに動作するかどうかをテストすることをお勧めします。
詳細については、Apama ドキュメントの The Cumulocity Transport Connectivity Plug-in を参照してください。
ステップ 5: モニターファイルを実行してテストする
プロジェクトをローカルで実行する場合、プロジェクト構成で Things Cloud の資格情報を指定する必要があります。Things Cloud クライアントの CumulocityIoT.properties ファイルに資格情報を構成します。
例:
CUMULOCITY_USERNAME=user@example.com
CUMULOCITY_SERVER_URL=http://exampleTenant.je1.thingscloud.ntt.com
CUMULOCITY_PASSWORD=examplePassword
CUMULOCITY_APPKEY=apamaAppKey
CUMULOCITY_APPKEY
の値を取得するには、Things Cloud でアプリケーションを作成 する必要があります。上記の説明は、URL がテナントを識別するテナントに接続していることを前提としていることに注意してください。これが当てはまらない場合(例えば、IP アドレスで接続している場合)、CumulocityIoT.properties ファイルでこれを設定する必要がある場合があります。
CUMULOCITY_TENANT=my_custom_tenant
プロジェクトをマルチテナント環境でローカルに実行する必要がある場合、マルチテナントのサポートを有効にし、使用するマルチテナント マイクロサービスの名前を指定します。Things Cloud クライアントの CumulocityIoT.properties ファイルで、次のプロパティを構成します。
# Enable multi-tenant support
CUMULOCITY_MULTI_TENANT_APPLICATION=true
# The name of the multi-tenant microservice to use.
# If a multi-tenant microservice does not already exist, either upload a multi-tenant microservice or
# create a microservice with a valid manifest. Subscribe the microservice to tenants for which you want
# to run the project.
CUMULOCITY_MULTI_TENANT_MICROSERVICE_NAME=example-multi-tenant-ms
さらに、モニターファイルがマルチテナント マイクロサービスで動作できることを確認してください。 詳細については、Apama ドキュメントの マルチテナント デプロイの操作 を参照してください。
これで、Apama Plugin for Eclipse で EPL のテストに進むことができます。
EPL アプリが準備できたら、アプリのデプロイ を参照して、Things Cloud にデプロイする方法を確認してください。
アプリケーションのデプロイ
以下を、Things Cloud にデプロイすることができます。
- EPL アプリ。ストリーミング分析アプリケーションを使用して、単一の *.mon ファイルを開発またはインポート できます。これは、EPL アプリをデプロイするための最も単純なメカニズムです。
- Apama アプリケーション。複雑な Apama アプリケーション(つまり、Apama Plugin for Eclipse で開発された Apama プロジェクト)を Things Cloud にアップロードし、Things Cloud マイクロサービス SDK を使用して カスタムマイクロサービスとしてデプロイ できます。
EPL アプリを、ストリーミング分析アプリケーションを使用して単一の *.mon ファイルとしてデプロイ
EPL アプリ(*.mon ファイル)が Things Cloud でアクティブ化されると、*.mon ファイルには一意のパッケージ名が割り当てられます。これにより、複数のモジュールがアクティブ化された場合の競合が防止されます。このため、*.mon ファイルに package
ステートメントを指定しないでください。アプリケーションの異なる部分間でイベントを共有する必要がある場合、イベント定義とそれを使用するモニターを単一の *.mon ファイルに記述します。
EPL アプリで使用できるユーティリティと基本イベントのセットは制限されています。執筆時点では、これらには Time Format バンドルと HTTP Client > JSON with generic request/response event definitions バンドルが含まれています。
EPL アプリがランタイム エラーを通知すると、アラームとして発生します。ランタイム エラーには、キャッチされなかった例外のほか、EPL アプリが実行する必要がある警告やエラーの明示的なログが含まれます。一般的に、Apama ランタイムに関連する動作や状態の問題もアラームとして発生します。
Apama ランタイムやアクティブな EPL アプリの詳細な診断については、Apama-ctrl マイクロサービスのログを確認できます。ログファイルの詳細については、マイクロサービスの監視 を参照してください。ただし、Apama のログファイルを最大限に活用するには、Apama についてある程度の知識が必要です。
Apama アプリケーションをマイクロサービスとしてデプロイ
Apama Plugin for Eclipseを使用すると、次のようなより複雑なプロジェクトを開発することもできます。
- 複数の *.mon ファイルに分散している
- 他の Apama アプリケーションから分離する必要がある
- デフォルトでは有効化されていない接続プラグインまたは EPL プラグインを使用している
このような種類のアプリケーションは、Things Cloud にマイクロサービスとしてデプロイする必要があります。
マイクロサービス マニフェストの必須設定
マイクロサービス マニフェストは、Things Cloud におけるマイクロサービス インスタンスとアプリケーションのデプロイを管理するために必要な設定を提供します。詳細については、マイクロサービスマニフェスト を参照してください。
Apama は、単一テナント マイクロサービスでも、マルチテナント マイクロサービスでも使用できます。 したがって、マイクロサービス マニフェストでは分離レベルを PER_TENANT または MULTI_TENANT に設定する必要があります。 Apama をマルチテナント マイクロサービスで使用する場合、Apama アプリケーションはマルチテナント対応として記述する必要があります。 詳細については、Apama ドキュメントの マルチテナント デプロイの操作 を参照してください。
EPL から Things Cloud トランスポートのすべての機能を起動して使用するには、マイクロサービスに次の権限が必要です。これらは、マイクロサービス マニフェストの requiredRoles で設定されています。
- ROLE_APPLICATION_MANAGEMENT_READ
- ROLE_INVENTORY_READ
- ROLE_INVENTORY_ADMIN
- ROLE_INVENTORY_CREATE
- ROLE_MEASUREMENT_READ
- ROLE_MEASUREMENT_ADMIN
- ROLE_EVENT_READ
- ROLE_EVENT_ADMIN
- ROLE_ALARM_READ
- ROLE_ALARM_ADMIN
- ROLE_DEVICE_CONTROL_READ
- ROLE_DEVICE_CONTROL_ADMIN
- ROLE_IDENTITY_READ
- ROLE_OPTION_MANAGEMENT_READ
- ROLE_BULK_OPERATION_READ
- ROLE_SMS_ADMIN
Apama アプリケーションをマイクロサービスとしてデプロイする
-
Apama Plugin for Eclipse で、通常通りアプリケーションを開発します。
-
Apama の Docker サポートを使用して、プロジェクト全体をマイクロサービスに変換できます。Project Explorer ビューでプロジェクトを右クリックし、Apama > Add Docker Support を選択すると、プロジェクトディレクトリのルートに Dockerfile が追加されます。 ビルドに使用する場合、Docker Hub で利用可能な Apama イメージが使用されます。Apama イメージにアクセスするための Docker Hub 認証情報が必要です。Apama Docker イメージは、 Linux ベースのみです。
-
カスタムプラグインのビルドやライセンスファイルのイメージへのコピーなど、必要なカスタム手順を Dockerfile に追加します。
-
プロジェクトのパッケージ化とデプロイには、Things Cloud マイクロサービス ユーティリティツールを使用します。詳細については、マイクロサービス ユーティリティツール を参照してください。マイクロサービス ユーティリティツールが構築するディレクトリ構造を作成する際は、プロジェクト ディレクトリ全体を「docker/」という名前でそのディレクトリにコピーします。
例:
docker/monitors/
docker/eventdefinitions/
docker/Dockerfile
docker/…
cumulocity.jsonマイクロサービス マニフェスト は手動で作成する必要がありますが、マイクロサービス マニフェストに特別な設定やプローブは必要ありません。ただし、liveness または readiness プローブを構成する場合、ポート 15903(Apama のデフォルトポート)のパス /ping に
httpGet
プローブを構成できます。Apama アプリケーションは通常ステートフルであり、入力データを自動的に分割しないため、自動スケーリングを有効にすることは推奨されません。このディレクトリからパッケージ、デプロイ、サブスクライブを行うと、Apama アプリケーションが実行中のマイクロサービスになります。Things Cloud の外部 (Apama Plugin for Eclipse またはテスト環境)でアプリケーションを実行した場合の動作は、Things Cloud 内での動作とほぼ同じになります。Things Cloud API へのリクエストを実行するマイクロサービスとしてデプロイされた場合、Apama はデプロイ先のテナントに接続するための資格情報を自動的に取得し、Apama に提供された他の資格情報を上書きします。ただし、リアルタイム イベントを受信する場合、外部の Apama 環境から Things Cloud に接続する場合と同様に、プロジェクト設定で有効な資格情報を指定する必要があります。
-
Things Cloud にデプロイする準備ができたら、アプリケーションをマイクロサービスとしてアップロードします。詳細については、マイクロサービスの管理 を参照してください。
Apama 10.15.0 では、いくつかの新しいコンテナ イメージが導入され、既存のコンテナ イメージの一部はコンテンツが変更されています。2024 年 12 月現在、これらのイメージは Amazon ECR Public Gallery から提供されます。
Things Cloud マイクロサービスとして使用するためのイメージをビルドする場合、以前のリリースとは異なります。
public.ecr.aws/apama/apama-cumulocity-jre イメージを、ビルダーイメージとして public.ecr.aws/apama/apama-cumulocity-builder イメージと組み合わせて使用する必要があります。
10.15.0 以前のバージョンで、Apama Plugin for Eclipse によって作成されたデフォルトのプロジェクト Dockerfile でこれを行うには、Dockerfile 内の FROM
行を適切に変更するか(これは一度だけ実行する必要がある)、次のフラグを使用してビルドする必要があります(これは毎回実行する)。
--build-arg APAMA_BUILDER=public.ecr.aws/apama/apama-cumulocity-builder:10.15 --build-arg APAMA_IMAGE=public.ecr.aws/apama/apama-cumulocity-jre:10.15
アプリのテスト
GitHub の Apama EPL Apps Tools を使用して、EPL アプリのアップロードをスクリプト化し、CI/CD(継続的インテグレーションおよび継続的デリバリー)ユースケースに合わせて管理できます。このツールは、PySys テスト フレームワークへの拡張機能も提供し、EPL アプリのテストを簡単に作成して自動的に実行できるようにします。
Apama EPL Apps Tools は https://github.com/Cumulocity-IoT/apama-eplapps-tools から入手できます。EPL アプリ ツールのドキュメント で詳細情報をご確認ください。
PySys の詳細については、Apama ドキュメントからアクセスできる Python の API リファレンス を参照してください。
サポートされている REST サービス
EPL アプリは、REST(Representational State Transfer)サービスを監視するように設計されており、すべての GET、POST、PUT、DELETE 操作をサポートします。さまざまな操作のリクエストの例を、次に示します。
これらの操作を実行するには、「CEP 管理」に対する読み取りおよび管理者権限が必要です(詳細は、権限の管理 参照)。
すべての操作のリクエスト ヘッダー
各リクエストは、Things Cloud に対して認証される必要があります。
名前 | 説明 |
---|---|
Accept | 「application/json」これは必須のパラメーターです |
一般的なレスポンス コード
すべてのリクエストに対して、次の一般的なエラー レスポンスコードが予想されます。
コード | 説明 |
---|---|
401 | リクエストが認可されていません |
403 | 禁止されています。EPLアプリはApama-ctrl-starterマイクロサービスでは利用できません |
特定のリクエストから予想されるその他のレスポンス コードを、次に示します。
一般的なフィールドの説明
操作に応じて、レスポンスでは次の一般的なフィールドを使用できます。
フィールド | 説明 |
---|---|
contents | EPL ファイルの完全な内容 |
description | ファイルの説明 |
eplPackageName | EPL ファイルのパッケージ名。名前に特殊文字(スペースを含む)が含まれている場合、 これらの文字は有効な EPL 識別子にするためにエスケープされ、注入エラーが回避されます |
errors | ファイル内のすべてのコンパイル エラー(存在する場合)と行番号およびテキストのリスト |
id | ファイルの一意の識別子 |
name | EPL に付けられた名前 |
state | EPL がコレレータに注入され、実行されているかどうか。 これには active または inactive のいずれかになります |
warnings | ファイル内のすべてのコンパイル警告のリスト(存在する場合)と行番号およびテキスト |
GET - 利用可能なすべての EPL ファイルを取得する
エンドポイント: /service/cep/eplfiles
リクエスト例
GET /service/cep/eplfiles
レスポンス
コード | 説明 |
---|---|
200 | 操作は成功しました。以下の値の例もご覧ください |
400 | 不正なリクエストです。ヘッダーの内容に予期しない値が含まれています |
レスポンスコード 200 の値の例:
{
"eplfiles":[
{
"description":"",
"eplPackageName": "eplfiles.Ordinal1",
"errors":[
],
"id":"39615",
"name":"Ordinal1",
"state":"active",
"warnings":[
]
}
]
}
GET - 利用可能なすべての EPL ファイルをその内容とともに取得する
エンドポイント: /service/cep/eplfiles
リクエスト パラメーター
名前 | 説明 |
---|---|
contents | ブール型。EPL ファイルをその内容とともに取得します。これは、任意のクエリ パラメーターです |
リクエスト例
GET /service/cep/eplfiles?contents=true
レスポンス
コード | 説明 |
---|---|
200 | 正常に動作しました。以下の値の例もご覧ください |
レスポンスコード 200 の値の例:
{
"eplfiles":[
{
"contents":"monitor M0 { action onload() { on wait(1.0) { log \"Hello\" at INFO; }}}",
"description":"",
"eplPackageName": "eplfiles.Ordinal1",
"errors":[
],
"id":"39615",
"name":"Ordinal1",
"state":"active",
"warnings":[
]
}
]
}
GET - 識別子によって EPL ファイルを取得する
エンドポイント: /service/cep/eplfiles/{id}
リクエスト パラメーター
名前 | 説明 |
---|---|
id | 取得する EPL ファイルの識別子。これは、必須のパラメーターです |
リクエスト例
GET /service/cep/eplfiles/{{id}}
レスポンス
コード | 説明 |
---|---|
200 | 操作は成功しました。以下の値の例もご覧ください |
404 | 識別子を持つファイルが見つかりません。 このセクションの最後にある このレスポンスコードの値の例 もご覧ください |
レスポンスコード 200 の値の例:
{
"contents":"monitor M0 { action onload() { on wait(1.0) { log \"Hello\" at INFO; }}}",
"description":"",
"eplPackageName": "eplfiles.Ordinal1",
"errors":[
],
"id":"39615",
"name":"Ordinal1",
"state":"active",
"warnings":[
]
}
POST - 新しいEPLアプリケーションを作成する
エンドポイント: /service/cep/eplfiles
リクエスト例
POST /service/cep/eplfiles
リクエスト本文の例:
{
"name": "Ordinal1",
"contents": "monitor M1 { action onload() { on wait(1.0) { log \"Hello\" at INFO; }}}",
"state": "active",
"description": ""
}
次の点に注意してください。
name
はファイルのパッケージに使用され(したがって、EPL ファイルにはpackage
ステートメントを含めることはできません)、すべての EPL ファイル間で一意である必要があります。名前にはプレフィックスが付けられ、特定の文字はエスケープされます。使用される実際のパッケージ名は、便宜上eplPackageName
フィールドに返されます(これをマイクロサービス ログファイルで検索して、ログ ステートメントを見つけることができます)。- 安全にエスケープされた
contents
を必ず提供してください。 description
は任意であり、空にすることもできます。
レスポンス
コード | 説明 |
---|---|
201 | 正常に作成されました / ファイルにエラーが発生して作成されました / ファイルに警告が発生して作成されました。以下の例もご覧ください |
405 | 入力に誤りがあります |
正常に作成された場合のレスポンスコード 201 の例:
{
"description":"",
"eplPackageName": "eplfiles.Ordinal1",
"errors":[
],
"id":"39615",
"name":"Ordinal1",
"state":"active",
"warnings":[
]
}
警告またはエラーが発生した場合のレスポンスコード 201 の例:
{
"description":"",
"eplPackageName": "eplfiles.Ordinal1",
"errors":[
{
"line":5,
"text":"assigning a float to an integer variable"
}
],
"id":"39651",
"name":"Ordinal1",
"state":"inactive",
"warnings":[
{
"line":10,
"text":"\"assert\" may become a reserved word in future versions of EPL"
}
]
}
PUT - 識別子によって EPL ファイルを更新する
エンドポイント: /service/cep/eplfiles/{id}
リクエスト パラメーター
名前 | 説明 |
---|---|
id | 更新するEPLファイルの識別子。識別子はパスに含める必要があります。これは必須のパラメータです |
リクエスト例
PUT /service/cep/eplfiles/{id}
リクエスト本文の例:
{
"name": "Ordinal1",
"contents": "monitor M1 { action onload() { on wait(1.0) { log \"Hello\" at INFO; }}}",
"state": "active",
"description": ""
}
POST リクエストについての情報も参照してください。
レスポンス
コード | 説明 |
---|---|
200 | 正常に更新されました。以下の値の例もご覧ください |
404 | 識別子を持つファイルが見つかりません。 このセクションの最後にある このレスポンスコードの値の例 も参照してください |
エラーなしで正常に更新された場合のレスポンスコード 200 の値の例:
{
"description":"",
"eplPackageName": "eplfiles.Ordinal1",
"errors":[
],
"id":"39615",
"name":"Ordinal1",
"state":"active",
"warnings":[
]
}
エラーまたは警告で更新された場合のレスポンスコード 200 の値の例:
{
"description":"",
"eplPackageName": "eplfiles.Ordinal1",
"errors":[
{
"line":5,
"text":"assigning a float to an integer variable"
}
],
"id":"39651",
"name":"Ordinal1",
"state":"inactive",
"warnings":[
{
"line":10,
"text":"\"assert\" may become a reserved word in future versions of EPL"
}
]
}
DELETE - 識別子によって EPL ファイルを削除する
エンドポイント: /service/cep/eplfiles/{id}
リクエスト パラメーター
名前 | 説明 |
---|---|
id | 削除する EPL ファイルの識別子。識別子はパスに含める必要があります。これは必須のパラメータです |
リクエスト例
DELETE /service/cep/eplfiles/{{id}}
レスポンス
コード | 説明 |
---|---|
200 | 正常に削除されました |
404 | 識別子を持つファイルが見つかりません。 このセクションの最後にある このレスポンスコードの値の例 も参照してください |
レスポンスコード 404の値の例
レスポンスコード 404 は、特定の識別子を持つファイルが見つからなかったことを示します。
{
"error":"Not Found",
"exception":"com.apama.in_c8y.FileNotFoundException",
"message":"File with id 39613 not found",
"path":"/eplfiles/39613",
"status":404,
"timestamp":"2020-01-17T12:21:42.457+0000"
}
error
: エラーメッセージexception
: 発生した例外を指定しますmessage
: 例外メッセージの説明path
: リクエストされたパスstatus
: アプリケーションのステータスtimestamp
: ISO形式のタイムスタンプ
イベントとチャネル
Apama EPL では、残りの Things Cloud エコシステムとのやり取りはイベントを通じて行われます。Things Cloud データにアクセスするために、多数のイベント定義が提供されています。
定義済みのイベント タイプ
いくつかの Things Cloud API と対話するための、事前定義されたイベント タイプがいくつかあります。新しいメジャーメント、アラーム、イベントが作成されると、イベントは自動的に Apama アプリケーションに送信されます。Things Cloud バックエンドと対話するために、イベントを作成して関連するチャネルに送信できます。Things Cloud は、データベース クエリを自動的に実行するか、メールや SMS などの送信に必要な API コールを作成します。
EPL の API リファレンス(ApamaDoc)の データ モデル を参照して、各ストリームのイベントがどのように構造化されているかを確認してください。
イベントをチャネルに送信する
イベントの送信は、new <type>
に続いてフィールドへの代入を使用するか、すべてのフィールドを指定するコンストラクターを使用してイベントを構築することによって行われます。次に、send
ステートメントを使用してイベントを Things Cloud に送信します。send
ステートメントにはチャネルが必要です。これはイベントタイプの SEND_CHANNEL
定数です。
イベントをリッスンする
チャネル上のイベントを監視することで EPL をトリガーできます。monitor.subscribe("string name")
メソッドを使用してチャンネルに登録できます。これは、モニターの起動時に実行することも、時々イベントを受信する必要がある場合のみ、必要に応じて呼び出され、続いて monitor.unsubscribe("string name")
を実行することもできます。
on
ステートメントを使用してイベントを監視し、その後に監視しているイベントタイプ、開閉括弧、および as <identifier>
を使用してイベントを保持する変数に名前を付けます。
デフォルトでは、リスナーは 1 回起動します。すべてのイベントに対して繰り返すには、イベントタイプの前に all
キーワードを使用します。
フィルター
フィルターを追加するには、リスナーの括弧の間に 1 つ以上のフィールドを指定します。最上位フィールドのみをフィルタリングできます。より複雑なフィルタリング、またはイベントのサブプロパティ (辞書など)でのフィルタリングには、if
ステートメントを使用します。
標準イベントタイプとチャネル
標準の Things Cloud イベントの場合、イベントを送受信するためのチャネルを含む定数があります。次に例を示します。
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
send msmnt to Measurement.SEND_CHANNEL;
次の表にリストされているイベントは、com.apama.cumulocity
パッケージの一部です。
イベント | 送信用チャネル | 受信用チャネル |
---|---|---|
Operation | Operation.SEND_CHANNEL | Operation.SUBSCRIBE_CHANNEL |
Measurement | Measurement.SEND_CHANNEL | Measurement.SUBSCRIBE_CHANNEL |
Event | Event.SEND_CHANNEL | Event.SUBSCRIBE_CHANNEL |
Alarm | Alarm.SEND_CHANNEL | Alarm.SUBSCRIBE_CHANNEL |
ManagedObject | ManagedObject.SEND_CHANNEL | ManagedObject.SUBSCRIBE_CHANNEL |
MeasurementFragment | MeasurementFragment.SEND_CHANNEL | MeasurementFragment.SUBSCRIBE_CHANNEL |
メジャーメント フラグメント
Measurement
および MeasurementFragment
イベントは常に公開されます。
EPL では、Measurement
イベントではなく MeasurementFragment
イベントの内容に一致するリスナーを生成できます。
例:
on all MeasurementFragment(type="c8y_SpeedMeasurement", valueFragment = "c8y_speed", valueSeries = "speedX", value > SPEED_LIMIT) as mf {
}
メジャーメント フラグメント を参照してください。
作成通知と更新通知の区別
{< product-c8y-iot >}} からの Alarm
、Event
、ManagedObject
、Operation
イベントを監視する場合、作成操作と更新操作を区別することが必要な場合があります。これらのイベントタイプにはそれぞれ、この目的のために isCreate()
および isUpdate()
という名前のアクションがあります。
新しいアラームを監視する例:
on all Alarm() as alarm {
if alarm.isCreate() {
log "Alarm created: " + alarm.toString() at INFO;
}
// else it's an update
}
同様に、更新されたアラームの場合のみ:
on all Alarm() as alarm {
if alarm.isUpdate() {
log "Alarm updated: " + alarm.toString() at INFO;
}
// else it's a create
}
Things Cloud からのイベントの場合、isUpdate()
またはisCreate()
のいずれかが常に true を返します。どちらのアクションも、選択肢と読みやすさを考慮して提供されています。
さまざまなタイプのオブジェクトの例などの詳細については、Apama ドキュメントの 更新通知の受信 を参照してください。
isCreate()
および isUpdate()
アクションの詳細については、API Reference for EPL (ApamaDoc) も参照してください。
例
この例では、com.apama.cumulocity.MeasurementFragment
API を使用して新しいメジャーメントを監視します。受信したメジャーメントをフィルタリングして、指定された最大速度を超える速度値を見つけ、制限を超えた場合にアラームを発します。
MeasurementFragment.SUBSCRIBE_CHANNEL
チャネルに登録します。- メジャーメント フラグメントを監視し、
type
、つまりc8y_SpeedMeasurement
でフィルターします。valueFragment
の値がc8y_speed
であること、およびvalueSeries
がspeedX
のみでフィルターされていることを確認してください。また、value
がSPEED_LIMIT
より大きい場合はvalue
でフィルターします。 - すべてのフィールドを指定するコンストラクターを使用して、イベントを作成します。
- イベントを正しいチャネル(
Alarm.SEND_CHANNEL
)に送信します。
結果の *.mon ファイルは、次のようになります。
using com.apama.cumulocity.Alarm;
using com.apama.cumulocity.MeasurementFragment;
monitor TriggerAlarmForSpeedBreach {
constant float SPEED_LIMIT := 30.0;
action onload() {
monitor.subscribe(MeasurementFragment.SUBSCRIBE_CHANNEL);
// Everytime a measurement fragment with the specific details of the match criteria is triggered then we should raise an alarm
on all MeasurementFragment(type="c8y_SpeedMeasurement", valueFragment = "c8y_speed", valueSeries = "speedX", value > SPEED_LIMIT) as mf {
send Alarm("", "c8y_SpeedAlarm", mf.source, currentTime,
"Speed limit breached", "ACTIVE", "CRITICAL", 1,
new dictionary<string,any>) to Alarm.SEND_CHANNEL;
}
}
}
組み込みアクション
概要
Apama EPLでは「アクション」と呼ばれる機能を利用することができます。すべてのモニターには少なくとも 1 つのアクション、つまり onload
アクションがあります。このセクションでは、すぐに使用できる組み込みのアクションについて説明します。
組み込みタイプのアクションについては、EPLのAPIリファレンス (ApamaDoc) を参照してください。
Things Cloud データのクエリ
履歴データを操作するには、次のリクエストとレスポンスのイベント ペアのいずれかを使用してリソースを検索します。
例:アラームを検索するには、適切なクエリ パラメーターを指定して com.apama.cumulocity.FindAlarm
リクエスト イベントを FindAlarm.SEND_CHANNEL
チャネルに送信します。レスポンスとして、0 個以上の com.apama.cumulocity.FindAlarmResponse
イベント(検索リクエストに一致するリソースの数によって異なります)と、com.apama.cumulocity.FindAlarmResponseAck
イベントが FindAlarmResponse.SUBSCRIBE_CHANNEL
チャネルに期待されます。マネージドオブジェクト、イベント、メジャーメント、操作を検索するための同様の機能も提供されます。
次の表にリストされているイベントは、com.apama.cumulocity
パッケージの一部です。
検索対象 | リクエスト / レスポンス イベント | 例 |
---|---|---|
ManagedObject | FindManagedObject FindManagedObjectResponse FindManagedObjectResponseAck |
例 |
Alarm | FindAlarm FindAlarmResponse FindAlarmResponseAck |
例 |
Event | FindEvent FindEventResponse FindEventResponseAck |
例 |
Measurement | FindMeasurement FindMeasurementResponse FindMeasurementResponseAck |
例 |
Operation | FindOperation FindOperationResponse FindOperationResponseAck |
例 |
CurrentUser | CurrentUser GetCurrentUser GetCurrentUserResponse |
例 |
TenantOption | TenantOption FindTenantOptions FindTenantOptionsResponse |
ドキュメント |
Things Cloud REST API の他の部分の呼び出し
Things Cloud REST API は、個々のイベントタイプではカバーされていない、いくつかの追加機能をカバーします。REST API の他の部分を呼び出すために、Things Cloud API の任意の部分を呼び出すために使用できる汎用のリクエスト / レスポンス API が提供されています。
次のリクエスト / レスポンス イベントを使用できます。
- com.apama.cumulocity.GenericRequest
- com.apama.cumulocity.GenericResponse
- com.apama.cumulocity.GenericResponseComplete
詳細については、Things Cloud OpenAPI仕様にあるREST実装 および Apama ドキュメントの「Invoking other parts of the Cumulocity REST API」を参照してください。
HTTP サービスの呼び出し
REST および JSON を使用して HTTP サービスと対話するには、次のいずれかのファクトリ メソッドを使用して com.softwareag.connectivity.httpclient.HttpTransport
インスタンスを作成します。
- HttpTransport.getOrCreate(string host, integer port) は HttpTransport を返します
- HttpTransport.getOrCreateWithConfiguration(string host, integer port, dictionary <string, string> configurations) は HttpTransport を返します(構成ディクショナリ内のキーは
CONFIG_
接頭辞が付いた HttpTransport の定数です)
HttpTransport
オブジェクトで、create メソッドの 1 つを呼び出し、必要に応じてパスとペイロードを渡し、Request
オブジェクトを生成します。
Request
オブジェクトでは、必要に応じて cookie、ヘッダー、クエリ パラメーターを設定し、execute(action<Response> callback)
でリクエストを呼び出すことができます。コールバックのモニターにアクションの名前を指定すると、リクエストが完了した(またはタイムアウトした)ときに Response
と共にそのアクションが呼び出されます。
コールバックでは、Response
オブジェクトに statusCode
と payload
が指定されます。ペイロードのフィールドには、それが提供される com.apama.util.AnyExtractor
オブジェクトを介してアクセスできます。次の「フラグメントにアクセスする」に関する情報を参照してください。
さらなる詳細は、EPLのAPIリファレンス (ApamaDoc)を参照してください。
ユーティリティ関数
フラグメントにアクセスする
params
ディクショナリを介して、ほとんどのイベントのフラグメントにアクセスできます。AnyExtractor
オブジェクトは、複数のサブフラグメントを含む任意のオブジェクトからデータを抽出して以下にアクセスできるように構築できます。
-
アクション getInteger(string path) :整数を返します
-
アクション getFloat(string path) :浮動小数点数を返します
-
アクション getString(string path) :文字列を返します
-
アクション getBoolean(string path) :ブール値を返します
-
アクション getSequence(string path) :シーケンス <any> を返します
-
アクション getDictionary(string path) :ディクショナリ <any, any> を返します
JSON パスを使用してオブジェクト構造内を移動できます。
例:
string s := AnyExtractor(measurement.params["fragment"]).getString("sub.fragment.object");
“fragment” の例: “c8y_TemperatureMeasurement”.
“sub.fragment.object” の例: “c8y_TemperatureMeasurement.T.Unit”.
「any」値をキャストする
あるいは、キャストを使用してany
を特定の型に変換します。
string s := <string> measurement.params["strfragment"];
オブジェクトの型が異なる場合、キャスト操作がスローされることに注意してください。
currentTime と TimeFormatter
読み取り専用変数 currentTime
を使用して、現在のサーバー時刻を取得できます。Apama は、Unix エポック(UTC 1970 年 1 月 1 日)からの秒数を使用して時間を処理します。TimeFormat
オブジェクトを使用すると、人間が判読できる形式に簡単に変換できます。TimeFormat
オブジェクトは、日付と時刻の書式設定や解析に使用できます。
例:
using com.apama.correlator.timeformat.TimeFormat;
monitor Example {
action onload {
log TimeFormat.format(currentTime, "yyyy.MM.dd 'at' HH:mm:ss") at INFO;
}
}
TimeFormat
とその関数に関する詳細については、Apama ドキュメントの TimeFormat イベントライブラリの使用 および「EPLのAPIリファレンス(ApamaDoc)」を参照してください。
inMaintenanceMode
Util.inMaintenanceMode()
関数は、デバイスが現在メンテナンスモードであるかどうかを簡単に確認する方法です。マネージドオブジェクトをパラメーターとして受け取り、デバイスがメンテナンスモードの場合は true となるブール値を返します。
例:
using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.Event;
using com.apama.cumulocity.FindManagedObject;
using com.apama.cumulocity.FindManagedObjectResponse;
using com.apama.cumulocity.FindManagedObjectResponseAck;
using com.apama.cumulocity.Util;
monitor ExampleMonitor {
action onload() {
// Subscribe to Measurement.SUBSCRIBE_CHANNEL to receive all measurements
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
monitor.subscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
on all Measurement() as m {
integer reqId := integer.getUnique();
send FindManagedObject(reqId, m.source, new dictionary<string,string>) to FindManagedObject.SEND_CHANNEL;
on FindManagedObjectResponse(reqId = reqId, id = m.source) as d and not FindManagedObjectResponseAck(reqId = reqId) {
if not Util.inMaintenanceMode(d.managedObject) {
send Event("", "c8y_Event", m.source, currentTime, "Received measurement from active device", new dictionary<string,any>) to Event.SEND_CHANNEL;
}
}
}
}
}
プレースホルダーを置き換える
文字列を構築するには、次のように連結を使用できます。
string s:= "An event with the text " + evt.text + " has been created.";
テキストが長くなり、データから動的に設定される値が増える場合、Util.replacePlaceholders()
関数を使用できます。テキスト文字列では、イベントのフィールド名でプレースホルダーをマークし、それを #{}
で囲みます。replacePlaceholders
の 2 番目のパラメーターには、任意のイベントタイプを指定できます。
Utils::replacePlaceholders
は、イベントまたはイベントのパラメーターで指定されたフィールド名を検索して、テキストの代替を生成します。タイプ #{X.Y}
のフィールド名を使用して、イベント内のネストされた構造にアクセスできます。
myMailText := Util.replacePlaceholders("The device #{source} created an event with the text #{text} at #{time}", alarm);
置換文字列が #{source.name}
のような形式である場合、source.name
は基礎となるマネージドオブジェクト / デバイスの名前です。または、#{source.c8y_Hardware.notes}
のような形式(c8y_Hardware
はマネージドオブジェクトのフラグメント)である場合、交換するには特別な処理が必要になります。最初の置換後、プレースホルダー フィールド名を更新し、ソースの managedObject
を使用して Util::replacePlaceholders
を再度実行する必要があります。
myMailText := Util.replacePlaceholders("The device #{source} with the serial number #{source.c8y_Hardware.serialNumber} created an event with the text #{text} at #{time}. The device is located at #{source.c8y_Address.street} in #{source.c8y_Address.city}.", alarm);
myMailText := myMailText.replaceAll("#{source.", "#{");
myMailText := Util.replacePlaceholders(myMailText, managedObject);
高度な機能
カスタムフラグメント
Things Cloud API を使用すると、データを自由に構造化できます。Apama EPL では、これは、タイプ dictionary<string, any>
型の params
に、エントリを追加することによって行われます。com.apama.cumulocity
パッケージ内の各 Things Cloud イベント(Alarm
、Event
、Measurement
、Operation
など)には、params
フィールドがあります。これは、フラグメントまたはオプションのフィールドに変換されます。したがって、イベントを受信するとき、コードは params
フィールド内のエントリを検索する必要があります。イベントを送信するときは、イベントタイプを定義することによって行うことも、dictionary<string, any>
タイプを使用することもできます。イベントを受信するときの EPL タイプは dictionary<any, any>
です。EPL は厳密に型指定されているため、フラグメントのないイベントを作成する場合、new dictionary<string, any>
式が必要になることに注意してください。辞書リテラルを使用してインラインでエントリを提供する場合、EPL は最初のキーと値のペアの型に基づいて型を決定します。つまり、dictionary<string, any>
の場合、<any>
キャスト演算子を使用して最初の値を any
型にキャストします。
send Event(..., new dictionary<string,any>) to Event.SEND_CHANNEL;
send Event(..., {"fragment":<any>"value"}) to Event.SEND_CHANNEL;
MeasurementValue
タイプは、Measurement
タイプのメジャーメントに対して提供されます。MeasurementValue
には、value
フィールドと unit
フィールド、および他のフラグメントの params
があります。
例 1:
send Measurement("", "c8y_TemperatureMeasurement", "12345", currentTime, {
"c8y_TemperatureMeasurement":{
"T1":MeasurementValue(1.0, "C", new dictionary<string,any>),
"T2":MeasurementValue(2.0, "C", new dictionary<string,any>),
"T3":MeasurementValue(3.0, "C", new dictionary<string,any>),
"T4":MeasurementValue(4.0, "C", new dictionary<string,any>),
"T5":MeasurementValue(5.0, "C", new dictionary<string,any>)
}},
new dictionary<string,any>) to Measurement.SEND_CHANNEL;
これにより、次の JSON 構造が生成されます。
{
"type": "c8y_TemperatureMeasurement",
"time": "...",
"source": {
"id": "12345"
},
"c8y_TemperatureMeasurement": {
"T1": {
"value": 1,
"unit": "C"
},
"T2": {
"value": 1,
"unit": "C"
},
"T3": {
"value": 1,
"unit": "C"
},
"T4": {
"value": 1,
"unit": "C"
},
"T5": {
"value": 1,
"unit": "C"
},
}
}
メジャーメント フラグメント
メジャーメントは、個々のメジャーメント フラグメントに分割できます。これは、メジャーメント内に存在する各フラグメントおよびシリーズに対して実行できます。メジャーメント フラグメントの詳細については、Things Cloud のドメインモデル を参照してください。
メジャーメント フラグメントまたはシリーズに基づいたフィルタリングが必要な場合は、com.apama.cumulocity.MeasurementFragment
タイプのイベントを監視します。com.apama.cumulocity.Measurement
イベントを監視して、measurements
ディクショナリ内を調べる代わりになります。詳細については、Apamaドキュメントのメジャーメント フラグメントの使用 を参照してください。
リスナー
受信したイベントによってステートメントをトリガーすることが唯一の可能性ではありません。次のセクションでは、リスナーを組み合わせる他の方法について説明します。詳細については、Apamaドキュメントの Defining Event Listeners を参照してください。
フィルター
フィルターを使用すると、他のトリガーの組み合わせまたはシーケンスによってトリガーできます。例えば、次のようなトリガーがあるとします。
on all Event() as e { ... }
パターンにフィルターを追加することもできます。
on all Event(type = "c8y_EntranceEvent") as e { ... }
複数のイベントを監視できます。
on Event() as e and Alarm() as a { ... }
これは、イベントとアラーム イベントを受信するとトリガーされ、それぞれの最初のイベントがキャプチャされます。
シーケンスによってトリガーすることもできます。
on all (Event() as e -> Alarm() as a) { ... }
これは、「アラームに続くイベント」のペアごとにトリガーされます。イベントを受信すると、それ以降のイベントの待機を停止し、代わりにアラームの待機を開始します。アラームを受信すると、再びイベントの監視が開始されます。
タイマー
時間に基づいてリスナーをトリガーすることもできます。特定の間隔でトリガーすることもできます。例えば、5 分(300 秒)ごとにトリガーすることもできます。
on all wait(300.0) { ... }
または、Unix の cron スケジューラと同様の機能を使用して、1 日の特定の時間にリスナーを起動させることもできます。
// timer:at(minutes, hours, daysOfMonth, month, daysOfWeek, (optional) seconds)
// minutes: 0-59
// hours: 0-23
// daysOfMonth: 1-31
// month: 1-12
// daysOfWeek: 0 (Sunday) - 6 (Saturday)
// seconds: 0-59
on all at(*, *, *, *, *) {} // trigger every minute
on all at(*/10, *, *, *, *) {} // trigger every 10 minutes
on all at(0, 1, *, *, [1,3,5]) {} // trigger at 1am every monday, wednesday and friday
on all at(0, */2, 1:7, *, *) {} // trigger every 2 hours on every day in the first week of every month
タイマーパターンを、他のパターンと組み合わせることもできます。例えば、別のイベント後の一定時間内にイベントがあったかどうかを確認できます。
on Event() -> wait(600.0) and not Alarm() { ... }
これは、イベントが発生し、10 分(600 秒)以内にアラームが発生しない場合にトリガーされます。イベントが発生した場合にリスナーを終了する not
の使用に注意してください。
テナント オプションを使用して、on all at
タイマーに使用されるタイムゾーンを設定できます。テナント オプションを設定するには、microservice.runtime
カテゴリと timezone
キーを指定します。
例:
{
"category" : "microservice.runtime",
"key" : "timezone",
"value" : "Europe/Warsaw"
}
このドキュメントの タイムゾーン変数 や、Apama ドキュメントのサポートされているタイムゾーン も参照してください。
ストリーム - ウィンドウ
ストリームを使用すると、イベントのウィンドウを操作できるようになります。ストリームは、on
の代わりに from
キーワードを使用し、操作対象のウィンドウを定義し、集計を使用してそのウィンドウから必要な出力を選択します。ウィンドウ は次の 2 つの方法で制限できます。
-
一定期間の ウィンドウ -
within
キーワードを使用します。from m in all Measurement(type="c8y_TemperatureMeasurement") within 3600.0 select avg(m.measurements ["c8y_TemperatureMeasurement"]["T"].value) as avgValue { }
-
一定量のイベントがあるウィンドウ -
retain
キーワードを使用します。from m in all Measurement(type="c8y_TemperatureMeasurement") retain 100 select avg(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as avgValue { }
ストリーム - 定期的に出力する
ストリームは、every
指定子を使用して評価の頻度を制御することもできます。
// will output the last measurement arrived every 1 minute
from m in all Measurement(type="c8y_TemperatureMeasurement") within 60.0 every 60.0 select last(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as lastValue { }
// will output the first of every 20 measurements arriving
from m in all Measurement(type="c8y_TemperatureMeasurement") retain 20 every 20 select first(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as firstValue { }
// will output the average of all 20 measurements after the 20th arrived
from m in all Measurement(type="c8y_TemperatureMeasurement") retain 20 every 20 select avg(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as avgValue { }
組み込み集計関数 については、Apamaドキュメントを参照してください。
独自のイベントタイプの作成
事前定義されたイベントタイプだけでなく、独自のイベントタイプを定義することもできます。これらは、同じモジュールの他の部分をトリガーする発生するイベントのパターンを検出するのに役立ちます。
event MyEvent {
Measurement m1;
Measurement m2;
}
...
on Measurement() as m1 -> Measurement() as m2 {
route MyEvent(m1, m2);
}
独自のアクションの作成
通常、次の例に示すように、アクション(Java の関数とよく似ています)を使用してモニターを構築します。
指定された重大度を上げる:
action upgradeSeverity(string old) returns string {
if old = "WARNING" { return "MINOR"; }
if old = "MINOR" { return "MAJOR"; }
if old = "MAJOR" { return "CRITICAL"; }
return old;
}
2 つの地理座標間の距離を計算します。
action distance(float lat1, float lon1, float lat2, float lon2) returns float {
float R := 6371000.0;
float toRad := float.PI / 180.0;
float lat1Rad := lat1 * toRad;
float lat2Rad := lat2 * toRad;
float deltaLatRad := (lat2-lat1) * toRad;
float deltaLonRad := (lat2-lat1) * toRad;
float a := (deltaLatRad/2.0).sin().pow(2.0) * lat1Rad.cos() * lat2Rad.cos() * (deltaLonRad/2.0).sin().pow(2.0);
float c := 2.0 * a.sqrt().atan2((1.0-a).sqrt());
return R * c;
}
変数
モジュール内で変数を定義できます。
string myEmailText := "Hello World";
sequence<string> supportedOperationsList := ["c8y_Restart", "c8y_Relay"];
モニタースコープ変数を定義する場合(モニター内であるが、モニターのどのアクション内にも存在しない場合)、リスナーでのイベント共同割り当ての際、as
の代わりにコロン(:)を使用すると、リスナーで使用できます。次の例では、10 秒ごとに最新のイベントを記録します。
monitor MyMonitor {
// monitor scope:
Event e;
action onload() {
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
on all Event():e {}
on all wait(10.0) {
log e.toString();
}
}
}
リスナーは開始時に、すべてのローカル変数のコピーを取得します。したがって、次の例では、間に他のイベントがあった場合でも、10 秒の遅延後に各イベントをログに記録します。
monitor MyMonitor {
// monitor scope:
Event e;
action onload() {
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
on all Event():e {
on all wait(10.0) {
log e.toString();
}
}
}
}
モニター インスタンスとコンテキストの生成
単一のモニターで複数のデバイスを処理することは可能ですが(例えば、ストリームで group by
と partition by
を使用したり、他の状態のデバイス ID をキーとしたディクショナリを維持したりするなど)、処理を分離すると便利なことがよくあります。異なるデバイスを、個別のモニター インスタンスに分割します。
新しいモニター インスタンスは、spawn
ステートメントを使用して作成できます。これにより、モニターのモニター スコープ変数のコピーが取得され、新しいモニター インスタンスで指定されたアクションが実行されます。新しいモニターにはリスナーはコピーされません。新しいモニター インスタンスを生成するコンテキストを指定することもできます。異なるコンテキストは相互に同時に実行でき、異なるモニターを相互に分離するのにも役立ちます。コンテキストを構築するときは、コンテキストを識別するための名前と、コンテキストがパブリックかどうかを制御するためのブール値を指定します。つまり、デフォルトで Things Cloud イベントを受け取ります(デフォルト チャネルに送信されます)。
このパターンは、そのコンテキスト内の他のリスナーによって一致しないイベントを識別するために、unmatched キーワードとともによく使用されます。各モニターに個別のコンテキストを使用することにより、一致しない動作の範囲がそのモニターに限定されます。
例:
monitor PerDeviceMeasurementTracker {
action onload() {
spawn factory to context("PerDeviceMeasurementTracker", true);
}
action factory() {
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
on all unmatched Measurement() as m {
spawn perDevice(m);
}
}
dictionary<string, Measurement> latestMeasurementByType; // measurements for this device
action perDevice(Measurement m) {
processMeasurement(m);
on all Measurement(source = m.source) as m {
processMeasurement(m);
}
}
action processMeasurement(Measurement m) {
latestMeasurementByType[m.type] := m;
}
}
ベストプラクティスとガイドライン
EPL モニター
症状: イベント処理ルールが自動的に無効になります
モニターが onload
またはリスナーから例外をスローし、その例外がキャッチされなかった場合、モニターは終了します。例外をキャッチするか、発生する理由を回避します。
同様に、モニターがイベントの処理を完了し、アクティブなリスナーが残っていない場合、再度トリガーすることはできず、モニター自体が自動的に削除されます。
モニターごとの過剰なメモリ使用を回避します
イベント処理ルールがリスナーをリークしないようにしてください。例えば、リクエストとレスポンスの操作を行う場合、レスポンスが処理された後、またはタイムアウトが発生してレスポンスがない場合に、リスナーがアクティブのままになっていないことを確認してください。
数値形式
Things Cloud のメジャーメントには float 型が使用されます。タイムスタンプは float(1970 年 1 月 1 日 00:00 UTC からの秒数)として保存されることに注意してください。
チャネルとコンテキストのサブスクライブ
コンテキストは、Apama 内の並列処理ユニットです。モニター インスタンスは、spawn...to
構文を使用して複数のコンテキストにデプロイできます。チャネルをサブスクライブすると、コンテキスト内のすべてのモニター インスタンスがそのサブスクリプションのイベントを受信します。したがって、異なるサブスクリプションを異なるコンテキストに配置することをお勧めします。コンテキストを使用すると、過負荷になっているアプリケーションの一部がアプリケーションの他の部分に影響を与えるのを防ぐことができます。
コンテキストはわかりやすい名前で作成され、コンテキスト オブジェクトの個々のインスタンスは、たとえ同じ名前であっても、異なるコンテキストに対応します。
例:
action onload() {
context subContext := context("Worker");
spawn worker() to subContext;
}
action worker() {
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
on all Measurement() as m {
...
}
}
Things Cloud におけるApamaの制限事項
Things Cloud 環境内で Apama を使用する場合、Apama をスタンドアロンで使用する場合に利用できる機能に必然的にいくつかの制限がかかります。
Things Cloud 内の Apama にアセットをデプロイするにはさまざまな方法があり、制限はそれらのメカニズムによって異なります。
- EPL アプリ: Apama アセットを完全に管理された Apama コリレーターにデプロイするための最も簡単なメカニズム。アプリケーションのデプロイ を参照してください。
- カスタムマイクロサービス: Things Cloud の マイクロサービス SDK を使用して、より複雑な Apama プロジェクトを構築できます。
あらゆる Things Cloud 環境内にデプロイされる Apama ソリューションを設計する場合は、次の点を考慮してください。
EPL アプリまたはカスタムマイクロサービスを使用する場合の Apama の一般的な制限事項
-
スケーラビリティのために、コリレーターはホスト間を移動する可能性があるため、永続的なファイルシステムにはアクセスできません。すべてのマイクロサービス(プラットフォームによって提供またはカスタム)は、ステートレスでなければならないという標準の Things Cloud 制約があります。マイクロサービス SDK を参照してください。
この影響を受ける Apama 機能は、次の通りです。- コリレーターの持続性
- MemoryStore の永続性
-
外部システムまたはプロセスへの非 HTTP/REST 接続は、ほとんど実用的ではありません。ただし、サービスがインターネット経由で利用可能な場合、それを使用できます(例えば、Apama 内の HTTP クライアントは、公的にアクセス可能な HTTP サーバーに接続できます)。
この影響を受ける Apama 機能は、次の通りです。- Apama データベース コネクタ (ADBC)
- コリレーターに統合された Java Message Service(JMS)のサポート
- 分散メモリストア
- コリレーター間の接続
-
セキュリティとユーザーアクセス制御の実装のため、Things Cloud はコリレーターポートを外部プロセスで使用できるようにしません。マイクロサービス SDK を参照してください。
次の機能はコリレーターポートへのアクセスを必要とするため、このアクセス制御と互換性がありません。- engine_connect、engine_management、engine_send、engine_receive などのコマンドラインツール
- エンジン管理 API、イベントサービス API、シナリオサービス API
- IAF アウトプロセスで実行されているアダプターへの接続
- ダッシュボード(Apama に同梱)
- Apama Plugin for Eclipse からのデバッグ。代わりに、ローカルコリレーターで実行されているアプリをデバッグする
- コリレーター REST インターフェース
-
起動時のアプリケーションのメモリ使用量とアプリケーションの起動時間の両方を削減するには、自動的にアンロードするモニターを挿入する前に、および時間のかかるクエリを実行する前に、アプリケーションが完全に初期化されていることを確認します。
EPL アプリを使用する場合の Apama 固有の制限事項
-
使いやすさを考慮して、コリレーターの起動は Things Cloud によって制御されます。したがって、構成ファイルまたはコマンドライン オプションの変更が必要な機能にはアクセスできません。
この影響を受ける Apama 機能は、次の通りです。- 持続性
- 接続プラグイン
-
セキュリティのため、コリレーターが使用するファイルシステムにはアクセスできません。
この影響を受ける Apama 機能は、次の通りです。- 入力ログへのアクセス
- カスタム プラグインの使用
- 強化のためのファイルシステム アセットの使用
-
簡単にするために、独立した EPL インジェクションを行うことのみが可能です。各モニターは独立して管理されるため、異なるモニター間に依存関係を作成することはできません。
この影響を受ける Apama 機能は、次の通りです。- *.monファイルは package ステートメントを含んではいけません(含むことはエラーです)
- 別々の *.mon ファイル間でイベント定義を共有することはできません
- Apama クエリは使用できません
- Apama Plugin for Eclipseを使用したアプリケーションの開発 にリストされているバンドルのみを使用できます
これらの制限はすべて Things Cloud 内で、EPL アプリがスムーズかつ安全に動作することを保証するために実装されています。
例題
メジャーメントの 1 時間ごとの平均を計算する
入力データが、次のようになっていると仮定します。
{
"c8y_TemperatureMeasurement": {"T": {"value": ..., "unit": "C"}},
"time": "...",
"source": {"id":"..."},
"type": "c8y_TemperatureMeasurement"
}
平均値 (平均) を作成するには、モジュール内に次の部分が必要です。
-
デバイス(ソース)ごとにグループ化された 1 時間以上の時間枠
-
1 時間ごとの平均計算、ソースおよび単位を返す
select
(ウィンドウの内容に対して集計を使用する必要があるため、最後の単位を選択します。すべてのメジャーメントが同じ単位であると仮定します)。これらを保持するAverageByDevice
イベント定義に注目してください。 -
すべて新しいメジャーメントとして作成します
例:
using com.apama.aggregates.avg;
using com.apama.aggregates.last;
using com.apama.cumulocity.Measurement;
monitor HourlyAvgMeasurementDeviceContext {
event AverageByDevice {
string source;
float avgValue;
string unit;
}
action onload() {
// Measurement.SUBSCRIBE_CHANNEL にサブスクライブしてすべての測定値を受信
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
from m in all Measurement(type="c8y_TemperatureMeasurement") within (3600.0)
group by m.source select
AverageByDevice(m.source,
avg(m.measurements["c8y_TemperatureMeasurement"]["T"].value),
last(m.measurements["c8y_TemperatureMeasurement"]["T"].unit)) as avgdata {
send Measurement("", "c8y_AverageTemperatureMeasurement", avgdata.source, currentTime,
{"c8y_AverageTemperatureMeasurement":
{
"T": MeasurementValue(avgdata.avgValue, avgdata.unit, new dictionary<string,any>)
}
}, new dictionary<string,any>) to Measurement.SEND_CHANNEL;
}
}
}
ビット メジャーメントからアラームを作成する
多くの場合、デバイスはアラーム ステータスをレジスタに保持しており、アラームの意味を解釈できません。この例では、デバイスがメジャーメントにおいてレジスタ全体をバイナリ値として送信するだけであると仮定します。ルールはビットを識別し、それぞれのアラームを作成する必要があります。
アラーム テキスト、各ビットのタイプと重大度、値を検索するアクションをマッピングする 3 つのディクショナリを作成します。-1 を使用してデフォルト値を示し、<position> を位置の文字列形式に置き換えます。
dictionary<integer, string> positionToAlarmType := {
0 : "c8y_HighTemperatureAlarm",
1 : "c8y_ProcessingAlarm",
2 : "c8y_DoorOpenAlarm",
3 : "c8y_SystemFailureAlarm",
-1 : "c8y_FaultRegister<position>Alaram"
};
dictionary<integer, string> positionToAlarmSeverity := {
0 : "MAJOR",
1 : "WARNING",
2 : "MINOR",
3 : "CRITICAL",
-1 : "MAJOR"
};
dictionary<integer, string> positionToAlarmText := {
0 : "The machine temperature reached a critical status",
1 : "There was an error trying to process data",
2 : "Door was opened",
3 : "There was a critical system failure",
-1 : "An undefined alarm was reported on position <position> in the binary fault register"
};
action getText(integer bitPosition, dictionary<integer, string> lookup) returns string {
string template := lookup.getOr(bitPosition, lookup[-1]);
return template.replaceAll("<position>", bitPosition.toString());
}
バイナリメジャーメントを分析するには、それを文字列値として解釈し、各文字をループします。getActiveBits()
関数はこれを実行し、メジャーメントが「1」であったビット位置のリストを返します。次に for
ループを使用して、それを反復処理します。
action getBitPositions(string binaryAsText) returns sequence<integer> {
sequence<integer> bitsSet := new sequence<integer>;
integer i := 0;
while i < binaryAsText.length() {
string character := binaryAsText.substring(i, i+1);
if character = "1" {
bitsSet.append(binaryAsText.length() - i - 1);
}
i:=i+1;
}
return bitsSet;
}
action onload() {
// Subscribe to Measurement.SUBSCRIBE_CHANNEL to receive all measurements
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
on all Measurement(type = "c8y_BinaryFaultRegister") as m {
string faultRegister := m.measurements.getOrDefault("c8y_BinaryFaultRegister").getOrDefault("errors").value.toString();
integer bitPosition;
for bitPosition in getBitPositions(faultRegister) {
Alarm alarm := new Alarm;
alarm.type := getText(bitPosition, positionToAlarmType);
alarm.severity := getText(bitPosition, positionToAlarmSeverity);
alarm.text := getText(bitPosition, positionToAlarmText);
alarm.source := m.source;
alarm.time := m.time;
alarm.status := "ACTIVE";
send alarm to Alarm.SEND_CHANNEL;
}
}
}
このようなメジャーメントを作成します。
{
"c8y_BinaryFaultRegister": {"errors": {"value": 10110}},
"time": "...",
"source": {"id": "..."},
"type": "c8y_BinaryFaultRegister"
}
最後のステートメントを 3 回トリガーします。
- ビット位置 1 でのメジャーメント: c8y_ProcessingAlarm, WARNING, “There was an error trying to process data”(データの処理中にエラーが発生しました)
- ビット位置 2 でのメジャーメント: c8y_DoorOpenAlarm, MINOR, “Door was opened”(ドアが開きました)
- ビット位置 4 でのメジャーメント: c8y_FaultRegister4Alarm, MAJOR, “An undefined alarm was reported on position 4 in the binary fault register”(バイナリ障害レジスタの位置 4 で未定義のアラームが報告されました)
したがって、3 つのアラームが作成されます。
消費量のメジャーメント
何かの現在の充填レベルを測定し、その値を定期的に Things Cloud に送信するセンサーがあると仮定すると、追加の消費値を簡単に作成できます。2 つのメジャーメント間の絶対差を計算すると便利ですが、明確な結果が得られるのは、メジャーメントが常に同じ間隔で送信された場合のみです。そこで、絶対差を時差に換算し、1 時間当たりの消費量として計算します。
2 つのエントリを保持するストリームを使用し、最初と最後のタイムスタンプと値を選択して、デバイスの 2 つの隣接するメジャーメントと時間差を比較します。
using com.apama.aggregates.last;
using com.apama.aggregates.first;
using com.apama.aggregates.count;
monitor FillLevelMeasurements {
event FillLevel {
float firstValue;
float firstTime;
float lastValue;
float lastTime;
string source;
}
action calculateConsumption(FillLevel l) returns float {
if(l.firstTime = l.lastTime) {
return 0.0;
} else {
return ((l.lastValue - l.firstValue) * 3600.0) / (l.lastTime - l.firstTime);
}
}
action onload() {
// Subscribe to Measurement.SUBSCRIBE_CHANNEL to receive all measurements
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
from m in all Measurement(type = "c8y_WaterTankFillLevel") partition by m.source retain 2 group by m.source having count() = 2
select FillLevel(first(m.measurements["c8y_WaterTankFillLevel"]["level"].value), first(m.time),
last(m.measurements["c8y_WaterTankFillLevel"]["level"].value), last(m.time), m.source) as fill {
Measurement m := new Measurement;
m.type := "c8y_HourlyWaterConsumption";
m.time := currentTime;
m.source := fill.source;
MeasurementValue mv := new MeasurementValue;
mv.value := calculateConsumption(fill);
mv.unit := "l/h";
m.measurements[m.type] := {"consumption":mv};
send m to Measurement.SEND_CHANNEL;
}
}
}
その他のサンプル アプリケーション
ストリーミング分析アプリケーションの EPL エディタには、Apama EPL の使用方法(Things Cloud オブジェクトのクエリやアラームの作成など)を示すいくつかのサンプル アプリケーションが用意されています。これらのサンプルを使用して独自のアプリケーションを構築できます。
ケーススタディ: 円形ジオフェンスアラーム
概要
このセクションでは、より複雑なルールを作成する方法の詳細な例を示します。このガイドの他のセクションで、前に説明した機能の複数を使用します。
Apama EPLを使い始めたばかりの場合は、例題 を参照してください。
前提条件
目標
位置イベントを継続的に送信している追跡デバイスがジオフェンスの外に移動した場合に、自動的にアラームを生成するようにしたいと考えています。このジオフェンスは円形になり、デバイスごとに個別に構成できる必要があります。アラームは、デバイスがジオフェンスの外に移動した瞬間に作成されます。屋外に移動している間は、最初のアラームがアクティブのままであるため、新しいアラームは作成されません。デバイスがジオフェンス内に戻るとすぐに、アラームはクリアされます。
Things Cloud データモデル
位置イベントの構造(必要な部分):
{
"id": "...",
"source": {"id": "..."},
"text": "...",
"time": "...",
"type": "...",
"c8y_Position": {"alt": ..., "lng": ..., "lat": ...}
}
ジオフェンス構成をデバイスに保存します(半径はメートル単位で構成されます)。
{
"c8y_Geofence": {"lat": ..., "lng": ..., "radius": ...}
}
さらに、構成を完全に削除せずに、各デバイスのジオフェンス アラームを有効/無効にしたいと考えています。これを行うには、デバイスの c8y_SupportedOperations
に「c8y_Geofence」を追加または削除します。
{
"c8y_SupportedOperations": [..., "c8y_Geofence", ...]
}
計算
現在の位置と中心の間の距離がジオフェンスの構成された半径よりも大きい場合、デバイスはジオフェンスの外側になります。必要なのは、2 つの 地理座標間の差を計算できる関数です。
action distance(float lat1, float lon1, float lat2, float lon2) returns float {
float R := 6371000.0;
float toRad := float.PI / 180.0;
float lat1Rad := lat1 * toRad;
float lat2Rad := lat2 * toRad;
float deltaLatRad := (lat2-lat1) * toRad;
float deltaLonRad := (lat2-lat1) * toRad;
float a := (deltaLatRad/2.0).sin().pow(2.0) * lat1Rad.cos() * lat2Rad.cos() * (deltaLonRad/2.0).sin().pow(2.0);
float c := 2.0 * a.sqrt().atan2((1.0-a).sqrt());
return R * c;
}
上記のアクションは、距離をメートル単位で返します。
ステップ 1: 入力のフィルタリング
このモジュールの主な入力はイベントです。一致しないイベントをできるだけ早く破棄するために、リスナーの最初のチェックとしてこれを実行します。
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
on all Event() as e {
if e.params.hasKey("c8y_Position") {
// we have an event
}
}
ステップ 2: 必要なデータの収集
次のステップでは、計算のためにジオフェンスの構成が必要になり、それを取得します。
monitor.subscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
...
integer reqId := integer.getUnique();
send FindManagedObject(reqId, e.source, new dictionary<string,string>) to FindManagedObject.SEND_CHANNEL;
on FindManagedObjectResponse(reqId = reqId) as resp
and not FindManagedObjectResponseAck(reqId) {
ManagedObject dev := resp.managedObject;
}
ステップ 3: デバイスが c8y_Geofence をサポートしているか確認
デバイスが利用可能になったので、デバイスにジオフェンスが構成されているかどうか、およびアクティブ化されているかどうか(supportedOperations
に「c8y_Geofence」が含まれている)を確認します。c8y_SupportedOperations
配列をチェックするには、indexOf()
関数を使用できます。この関数はすべての要素をループし、そのエントリのインデックスを返します。値が存在しない場合、負の数を返します。設定では、デバイスにフラグメント「c8y_Geofence」が含まれているかどうかを確認するだけです。
イベントとデバイスを取得したら、イベントの c8y_Position
とデバイスの c8y_Geofence
からデータを抽出します。これらのオブジェクトは、params
内の dictionary<any, any>
エントリにマッピングされます。params
はタイプ any
の値を保持するため、dictionary<any, any>
にキャストする必要があります。
if(dev.params.hasKey("c8y_Geofence") and dev.supportedOperations.indexOf("c8y_Geofence") >= 0) {
dictionary<any, any> evtPos := <dictionary<any, any> > e.params["c8y_Position"];
float eventLat := <float> evtPos["lat"];
float eventLng := <float> evtPos["lng"];
dictionary<any,any> devGeofence := <dictionary<any,any> > dev.params["c8y_Geofence"];
float centerLat := <float> devGeofence["lat"];
float centerLng := <float> devGeofence["lng"];
float maxDistance := <float> devGeofence["radius"];
}
ステップ 4: トリガーの作成
前述したように、現在のデバイスの位置とジオフェンスの中心の間の距離が構成されたジオフェンスの半径よりも大きい場合、デバイスはフェンスの外側にあります。アラームをトリガーするには、2 つのイベントが必要です。これにより、これら 2 つのイベント内でデバイスがジオフェンスに入ったか、ジオフェンスから出たかを確認できるようになります。
最初のステップでは、前述の関数を使用して距離を計算します。
float d := distance(centerLat, centerLng, eventLat, eventLng);
次に、これをイベントとして再ルーティングします。
event LocationEventWithDistance {
string source;
float distance;
Event e;
float maxDistance;
}
...
route LocationEventWithDistance(e.source, d, e, maxDistance);
ソースをイベント内に配置して、リスナー内で簡単に照合できるようにします。
次に、イベント LocationEventWithDistance
によってトリガーされるリスナーを設定し、同じソースの次の LocationEventWithDistance
を監視します。
on all LocationEventWithDistance() as firstPos {
on LocationEventWithDistance(source = firstPos.source) as secondPos {
// now have two events with distances
}
}
この LocationEventWithDistance
イベントのペアには、アラームを作成する必要があるかどうかをチェックするためのすべてのデータが保持されます。最初のイベントと同じソースのものになるように、secondPos
イベントをフィルタリングしていることに注意してください。イベントを受信したすべてのデバイスに、アクティブなリスナーが存在します。
ステップ 5: アラームの作成
アラームを作成するには、最初のイベントの距離が半径より小さく、2 番目のイベントの距離が半径より大きい 2 つのイベントが必要です。これは、デバイスがジオフェンスから出たばかりであることを意味します。
if firstPos.distance <= firstPos.maxDistance and
secondPos.distance > secondPos.maxDistance {
send Alarm("", "c8y_GeofenceAlarm", firstPos.source, currentTime,
"Device moved out of circular geofence", "ACTIVE",
"MAJOR", 1, new dictionary<string,any>) to Alarm.SEND_CHANNEL;
}
ステップ 6: アラームのクリア
アラームをクリアするには、下部で条件を切り替え、さらに現在アクティブなアラームを取得してその ID を取得する必要があります。この時点では、既存のアラームがあるかどうかを気にする必要はありません。何もない場合、リスナーは and not FindAlarmResponseAck
をトリガーし、リスナーを終了します。
monitor.subscribe(FindAlarmResponse.SUBSCRIBE_CHANNEL);
...
if firstPos.distance > firstPos.maxDistance and
secondPos.distance <= secondPos.maxDistance {
integer reqId:= integer.getUnique();
send FindAlarm(reqId, {"source": firstPos.source,
"status": "ACTIVE", "type": "c8y_GeofenceAlarm"}) to FindAlarm.SEND_CHANNEL;
on FindAlarmResponse(reqId=reqId) as alarmResponse
and not FindAlarmResponseAck(reqId=reqId) {
send Alarm(alarmResponse.id, "c8y_GeofenceAlarm",
firstPos.source, currentTime, "Device moved back into circular geofence",
"CLEARED", alarmResponse.alarm.severity, 1, new dictionary<string, any>) to Alarm.SEND_CHANNEL;
}
}
すべてを統合する
すべてのパーツを 1 つのモジュールに結合できるようになりました。リスナーの順序は関係ありません。
using com.apama.cumulocity.ManagedObject;
using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.Event;
using com.apama.cumulocity.Alarm;
using com.apama.cumulocity.FindManagedObject;
using com.apama.cumulocity.FindManagedObjectResponse;
using com.apama.cumulocity.FindManagedObjectResponseAck;
using com.apama.cumulocity.FindAlarm;
using com.apama.cumulocity.FindAlarmResponse;
using com.apama.cumulocity.FindAlarmResponseAck;
monitor MonitorDevicesForCircularGeofence {
event LocationEventWithDistance {
string source;
float distance;
Event e;
float maxDistance;
}
action onload {
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
monitor.subscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
monitor.subscribe(FindAlarmResponse.SUBSCRIBE_CHANNEL);
on all Event() as e {
if e.params.hasKey("c8y_Position") {
// イベントがあります
integer reqId := integer.getUnique();
send FindManagedObject(reqId, e.source, new dictionary<string,string>) to FindManagedObject.SEND_CHANNEL;
on FindManagedObjectResponse(reqId = reqId) as resp
and not FindManagedObjectResponseAck(reqId) {
ManagedObject dev := resp.managedObject;
if(dev.params.hasKey("c8y_Geofence") and dev.supportedOperations.indexOf("c8y_Geofence") >= 0) {
dictionary<any, any> evtPos := <dictionary<any, any> > e.params["c8y_Position"];
float eventLat := <float> evtPos["lat"];
float eventLng := <float> evtPos["lng"];
dictionary<any,any> devGeofence := <dictionary<any,any> > dev.params["c8y_Geofence"];
float centerLat := <float> devGeofence["lat"];
float centerLng := <float> devGeofence["lng"];
float maxDistance := <float> devGeofence["radius"];
float d := distance(centerLat, centerLng, eventLat, eventLng);
route LocationEventWithDistance(e.source, d, e, maxDistance);
}
}
}
}
on all LocationEventWithDistance() as firstPos {
on LocationEventWithDistance(source = firstPos.source) as secondPos {
// now have two events with distances
if firstPos.distance <= firstPos.maxDistance and
secondPos.distance > secondPos.maxDistance {
send Alarm("", "c8y_GeofenceAlarm", firstPos.source, currentTime,
"Device moved out of circular geofence", "ACTIVE",
"MAJOR", 1, new dictionary<string,any>) to Alarm.SEND_CHANNEL;
}
if firstPos.distance > firstPos.maxDistance and
secondPos.distance <= secondPos.maxDistance {
integer reqId:= integer.getUnique();
send FindAlarm(reqId, {"source": firstPos.source,
"status": "ACTIVE", "type": "c8y_GeofenceAlarm"}) to FindAlarm.SEND_CHANNEL;
on FindAlarmResponse(reqId=reqId) as alarmResponse
and not FindAlarmResponseAck(reqId=reqId) {
send Alarm(alarmResponse.id, "c8y_GeofenceAlarm",
firstPos.source, currentTime, "Device moved back into circular geofence",
"CLEARED", alarmResponse.alarm.severity, 1, new dictionary<string, any>) to Alarm.SEND_CHANNEL;
}
}
}
}
}
action distance(float lat1, float lon1, float lat2, float lon2) returns float {
float R := 6371000.0;
float toRad := float.PI / 180.0;
float lat1Rad := lat1 * toRad;
float lat2Rad := lat2 * toRad;
float deltaLatRad := (lat2-lat1) * toRad;
float deltaLonRad := (lat2-lat1) * toRad;
float a := (deltaLatRad/2.0).sin().pow(2.0) * lat1Rad.cos() * lat2Rad.cos() * (deltaLonRad/2.0).sin().pow(2.0);
float c := 2.0 * a.sqrt().atan2((1.0-a).sqrt());
return R * c;
}
}
Apama を他のマイクロサービスに接続
概要
Apama を使用したストリーミング分析アプリケーションは、他のマイクロサービスで実行されているアプリケーションを利用できます。このセクションでは、Apama-ctrl マイクロサービスの /health
エンドポイントを使用しますが、手順は Things Cloud 内で実行されている他のマイクロサービスへの接続にも適用されます。このセクションでは、Apama EPL 内から Things Cloud プラットフォームへの接続を作成し、他のマイクロサービスを直接呼び出す方法を説明します。次に、リクエストを作成し、結果をデコードする方法を示します。
ストリーミング分析アプリケーションの一部である EPL エディタを使用して EPL アプリを開発していることを前提とし、マイクロサービスへのリクエストの例を示します。このガイドの手順は、Apama アプリケーションを作成する他の方法にも適用でき、あらゆるマイクロサービスとのやり取りに使用できます。
CumulocityRequestInterface
APIを利用します。このAPIに関する技術情報については、Apamaドキュメントのマイクロサービスの呼び出し を参照してください。
EPL アプリの作成
アプリケーション スイッチャーでストリーミング分析アイコン をクリックします。表示されるホーム画面で、EPL アプリ ページに移動し、新しい EPL アプリ をクリックします。別のマイクロサービスと連携するアプリを作成するための EPL エディタ ウィンドウが表示されます。
Things Cloud プラットフォームへの接続
これらのリクエストをサポートするために、Things Cloud プラットフォームに自動的に接続し、他のマイクロサービスを呼び出すために使用できるリクエストを作成するアクションを含むヘルパーイベントを提供しています。このヘルパーイベントは CumulocityRequestInterface
と呼ばれ、com.apama.cumulocity
パッケージ内にあります。このヘルパーイベントは、Things Cloud に接続してイベントのインスタンスを返す静的アクションを提供します。マイクロサービス内、Things Cloud プラットフォーム自体、またはリモート コリレータから自動的に接続できます。そのインスタンスには、特定のマイクロサービスを呼び出すリクエストを作成するアクションが含まれています。
独自のコードから接続を作成するには、connectToCumulocity
メソッドを呼び出して結果を保存するだけです。
CumulocityRequestInterface cumulocity := CumulocityRequestInterface.connectToCumulocity();
これにより、マイクロサービスに提供された資格情報と接続詳細、または外部 Apama インスタンスから接続する場合は Things Cloud トランスポートの設定を使用して、自動的に接続が作成されます。
マイクロサービス リクエストの作成
CumulocityRequestInterface
インスタンスには、リクエストを作成するためのアクションがあります。
/**
* Allows creation of a request on a transport that
* has been configured for a Cumulocity connection.
*
* @param method The type of HTTP request, for example "GET".
* @param path A specific path to be appended to the request.
* @param payload A dictionary of elements to be included in the request.
*/
action createRequest(string method, string path, any payload) returns Request
このアクションは、使用する HTTP メソッド(通常は GET、PUT、POST)、Things Cloud サービス プレフィックスを含むパス(通常は /service/serviceName/path/on/service のような形式)、およびペイロードを受け取ります。ペイロードは、マイクロサービスに送信する前に JSON ドキュメントに変換されます。このアクションは HTTP クライアントインターフェースの一部である Request
オブジェクトを返します。HTTP クライアントインターフェースのドキュメントは、EPL の API リファレンス(ApamaDoc) に記載されています。
リクエストはコールバック アクションを引数として実行され、リクエストが完了するとレスポンスが引数として渡されます。リクエストにオプション、クエリパラメータ、またはヘッダーを設定する必要がある場合、呼び出し前に Request
オブジェクトに設定できます。
例:
action responseCallback(Response resp) {
string objectId := resp.payload.getString("id");
...
}
...
Request req := cumulocity.createRequest("GET", "/service/otherService/data", any());
req.setQueryParameter("type", "object");
req.execute(responseCallback);
レスポンスも JSON からデコードされ、レスポンスペイロードは HTTP クライアント トランスポートのドキュメントにある Response
イベントからリンクされている AnyExtractor
パターンを使用します。上記の例は、REST リクエスト GET http://cumulocity/service/otherService/data?type=object
と同等になります。
マイクロサービス エンドポイントへのリクエスト例
以下は、別のマイクロサービスにクエリを実行する方法を示す非常にシンプルなアプリケーションです。例として、Apama-ctrl マイクロサービスの /health
エンドポイントを使用しています。
まず、EPL から始めます。これは、Things Cloud に接続し、リクエストを送信するアクションを呼び出します。
using com.apama.cumulocity.CumulocityRequestInterface;
using com.softwareag.connectivity.httpclient.Request;
using com.softwareag.connectivity.httpclient.Response;
monitor CallAnotherMicroservice {
CumulocityRequestInterface requestIface;
action onload() {
requestIface := CumulocityRequestInterface.connectToCumulocity();
sendHealthRequest();
}
リクエストを送信する
最初に、リクエスト タイプ、リクエストパス、ペイロード any()
を使用して Request
を作成します。この例では、ペイロードに何も入れる必要がないためです。
次に、execute
を使用してリクエストを送信し、レスポンスで呼び出されるアクションを指定します。
action sendHealthRequest()
{
Request healthRequest:=
requestIface.createRequest("GET", "/service/cep/health", any());
healthRequest.execute(responseHandler);
}
この例では、コンテキストパスが /cep
である Apama-ctrl マイクロサービスを使用します。これを別のマイクロサービス用に修正するには、/cep
を、マイクロサービスのマニフェストで定義されているコンテキストパスに置き換えてください。
この例では、/health
エンドポイントがリクエストパスを完成させますが、マイクロサービスの有効なエンドポイントであればどれでも置き換えることができます。
レスポンスを受信する
リクエスト送信時に使用した定義済みのアクションを次に示します。このアクションは、送信されたリクエストへのレスポンスとして呼び出され、Response
オブジェクトで提供されます。
この例では、単にステータスコードとレスポンス本文のみをログに記録します。
action responseHandler(Response healthResponse)
{
integer statusCode := healthResponse.statusCode;
string payload := healthResponse.payload.data.toString();
log "Health response status code = " + statusCode.toString() +
", response body = " + payload at INFO;
}
他のマイクロサービス
このセクションでは、Apama-ctrl マイクロサービスとの通信について示しました。ただし、JSON ペイロードを含む標準 REST リクエストを使用している限り、Things Cloud を介して他のマイクロサービスにアクセスすることもできます。マイクロサービス名の後に、マイクロサービス内のリクエストのパスを続けて、適切な /service
URL を構築するだけです。