EPLアプリ

備考
このドキュメントは、Apama アプリケーション開発に関する基本的な知識があることを前提としています。詳細については、Apamaドキュメント を参照してください。

Apamaイベント処理言語(EPL)の使用

Apama イベント処理言語は Java 言語と構文的に類似しています。ifwhilefor のようなシンプルなフロー制御ステートメントに加えて、Things Cloud では流れてくるデータ(イベント)に反応するために on キーワードを使用してリスナーを作成できます。

Apama EPLについては、Apama ドキュメント を参照してください。

下記のステートメントでは、特定の温度より高い温度データを連続的にセンサーから抽出しています。

on all Measurement(type="c8y_TemperatureMeasurement") as m {
	if m.measurements.getOrDefault("c8y_TemperatureMeasurement").getOrDefault("T").value > 100.0 {
		Alarm alarm    := new Alarm;
		alarm.type     := "c8y_TemperatureAlert";
		alarm.source   := m.source;
		alarm.time     := currentTime;
		alarm.text     := "Temperature too high";
		alarm.status   := "ACTIVE";
		alarm.severity := "CRITICAL";
		send alarm to Alarm.SEND_CHANNEL;
	}
}

ここの Measurement とは、メジャーメントデータを含む定義済みのイベントです。こちらの例では、mMeasurement のイベントであり、c8y_TemperatureMeasurement のメジャーメントをフィルタリングし、そのプロパティとなる c8y_TemperatureMeasurement.T.value の温度センサーの摂氏でのリスナーとなります(フラグメントライブラリ をご参照ください)。

上記のようなリスナーは、モニターの onload ステートメントに配置する必要があります。ファイルには、リスナーで使用する型の using ステートメントが含まれている必要があります。Things Cloud で利用するイベントのほとんどは、com.apama.cumulocity パッケージに含まれています。すべてのリストを以下に示しますが、以降の例では簡潔にするためにこれらを省略します。

using com.apama.cumulocity.ManagedObject;
using com.apama.cumulocity.Operation;
using com.apama.cumulocity.Event;
using com.apama.cumulocity.Alarm;
using com.apama.cumulocity.Error;
using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.MeasurementValue;
using com.apama.cumulocity.FindAlarm;
using com.apama.cumulocity.FindAlarmResponse;
using com.apama.cumulocity.FindAlarmResponseAck;
using com.apama.cumulocity.FindManagedObject;
using com.apama.cumulocity.FindManagedObjectResponse;
using com.apama.cumulocity.FindManagedObjectResponseAck;
using com.apama.cumulocity.FindMeasurement;
using com.apama.cumulocity.FindMeasurementResponse;
using com.apama.cumulocity.FindMeasurementResponseAck;
using com.apama.cumulocity.FindOperation;
using com.apama.cumulocity.FindOperationResponse;
using com.apama.cumulocity.FindOperationResponseAck;
using com.apama.cumulocity.FindEvent;
using com.apama.cumulocity.FindEventResponse;
using com.apama.cumulocity.FindEventResponseAck;
using com.apama.cumulocity.SendEmail;
using com.apama.cumulocity.SendSMS;
using com.apama.cumulocity.Util;
using com.apama.util.AnyExtractor;
using com.apama.correlator.timeformat.TimeFormat;
using com.softwareag.connectivity.httpclient.HttpOptions;
using com.softwareag.connectivity.httpclient.Request;
using com.softwareag.connectivity.httpclient.RequestType;
using com.softwareag.connectivity.httpclient.Response;

monitor ListenForHighTemperatures {
	action onload() {
		on all Measurement(type="c8y_TemperatureMeasurement") as e {
			if e.measurements.getOrDefault("c8y_TemperatureMeasurement").getOrDefault("T").value > 100.0 {
				// メジャーメントを処理する
			}
		}
	}
}

派生データはどのように作成したらよいですか?

新しいアラームまたはオペレーションを作成するには、関連するイベント型のインスタンスを作成し、send ステートメントで関連するチャネル(イベント型の定数で定義)に送信します。例えば、温度が定義された値を超えるとただちに発生するアラームが必要な場合、下記のようなステートメントで実行可能になります。

on all Measurement(type="c8y_TemperatureMeasurement") as m {
	if m.measurements.getOrDefault("c8y_TemperatureMeasurement").getOrDefault("T").value > 100.0 {
		send Alarm("","c8y_TemperatureAlert",m.source,currentTime,"温度が高すぎます","ACTIVE","CRITICAL",1,new dictionary<string,any>) to Alarm.SEND_CHANNEL;
	}
}

技術的に説明すると、このステートメントは温度センサーが摂氏 100℃ を超えるたびに新しい Alarm イベントを作成し、それを Things Cloud に送信しています。

どのようにデバイスを制御すればよいですか?

Things Cloud のEPLによるリモート制御は、オペレーションイベントを送信することで行われ、派生データの 1 つとなります。例えば、デバイス上でオペレーションを走らせたい場合、デバイスをターゲットにした新しいオペレーションを作成します。下記では、温度計の温度によるスイッチの動きになります。

on all Measurement(type="c8y_TemperatureMeasurement") as m {
	if m.measurements.getOrDefault("c8y_TemperatureMeasurement").getOrDefault("T").value > 100.0 {
		send Operation("",m.source,"PENDING",{"c8y_Relay":<any>{"relayState":"CLOSED"}}) to Operation.SEND_CHANNEL;
	}
}
  • m.sourceは、トリガーとなる温度 ID のプレースホルダーとなります。

  • params フィールド(最後のフィールド)は、オペレーションのネストされた内容を定義します。この例では c8y_Relay オペレーションを作成し、relayState を CLOSED に設定します。最上位フィールドは dictionary<string, any> である必要があるため、<any> にキャストすることに注意してください。

どのようにデータを問い合わせればよいですか?

進行中のイベントプロセスの一部として時々関連情報を Things Cloud データベースに問い合わせることがあります。これはイベントを送信し、リスナーを使用してレスポンスを待機することでサポートされます。下記は、特定の顧客に 1時間ごとに自動販売機の総売上高を集計する場合の例になります。顧客データが購入後にトリガーされる売上レポートは、Things Cloud のデータベースから取得する必要があります。

using com.apama.aggregates.count;

monitor SalesReport {
	event SalesReport {
		Event e;
		ManagedObject customer;
	}
	event SalesOutput {
		integer count;
		string customerId;
	}

	action onload() {

		monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);

		on all Event() as e {
			monitor.subscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
			integer reqId := integer.getUnique();
			on all FindManagedObjectResponse(reqId=reqId) as mor and not FindManagedObjectResponseAck(reqId=reqId) {
				route SalesReport(e, mor.managedObject);
			}
			on FindManagedObjectResponseAck(reqId=reqId) {
				monitor.unsubscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
			}
			send FindManagedObject(reqId,"",{"childAssetId":e.source}) to FindManagedObject.SEND_CHANNEL;
		}

		from sr in all SalesReport() within 3600.0 every 3600.0
			group by sr.customer.id
			select SalesOutput(count(), sr.customer.id) as sales {
			send Measurement("", "total_cust_trx", "customer_trx_counterId", currentTime,
				{
					"total_cust_trx":{
						"total":MeasurementValue(sales.count.toFloat(), "COUNT", new dictionary<string,any>)
					}
				}, {"customer_id":<any> sales.customerId}) to Measurement.SEND_CHANNEL;
		}
	}
}

上記の例では、最初に SalesReport および SaleOutput イベントの定義を作成しています。これらは売上を特定する Event および ManagedObject を保持し、一連の売上から導出したい情報である countcustomerId を保持します。Event オブジェクトをリッスンし、イベントがどの ManagedObject から来たのかを調べるために FindManagedObject リクエストを送信します。これらの SalesReport オブジェクトは、ルート文を介してストリームクエリに送信されます。ストリームクエリは 1時間ごと(3,600秒)に発火し、顧客ごとの売上データの集計を選択し、その自動販売機の売上を表す新しいメジャーメントを送信します。

基本機能

アプリケーションの開発

EPL アプリはモニタ(*.mon)ファイルです。EPL アプリは 2 つの異なる方法で開発できます。

  • ストリーミング分析アプリケーション を使用して、Things Cloud のアプリケーションスイッチャーから利用可能な EPL アプリを Things Cloud 内で開発します。
  • または、ローカルマシンに Apama をインストールし、Apama Plugin for Eclipse、つまり別の環境で EPL アプリ(モニターファイルとして)を開発することができます。
備考
ストリーミング分析アプリケーションを使用して EPL アプリを開発およびデプロイしたり、モニターファイルを Apama Plugin for Eclipse から Things Cloud にインポートできるようにするには、テナントは、EPL アプリをサポートする Apama-ctrl マイクロサービスを登録する必要があります。ストリーミング分析アプリケーションに EPL アプリ ページが表示されず、EPL アプリを使用したい場合は、製品サポート にお問い合わせください。
注意
EPL アプリには、インベントリ、アラーム、その他多くの種類のオブジェクトであっても、テナント内のオブジェクトにほぼ任意の変更を加える機能があります。「CEP 管理」の管理者権限を持つユーザーは、EPL アプリを作成およびアクティブ化できるため、現在のテナントをほぼ完全に制御することもできます。したがって、テナント上のどのユーザーがこの権限を持っているかに注意する必要があります。

ストリーミング分析アプリケーションを使用したアプリの開発

ストリーミング分析アプリケーションの EPL アプリ ページには、新規または既存の EPL アプリ(*.mon ファイル)を対話的に編集したり、EPL アプリをインポートしてアクティブ化(デプロイ)したりするためのインターフェースが提供されます。

EPL アプリ ページの使用を希望するテナント上のユーザーは、CEP マネージャー である必要があります。権限の管理 を参照してください。

ステップ 1: ストリーミング分析アプリケーションを呼び出す

アプリケーション スイッチャーを開き、ストリーミング分析アプリケーションの アイコンをクリックします。次に、EPL アプリ ページに移動します。

EPL アプリ ページに移動すると、最初に EPL アプリ マネージャーが表示され、既存の EPL アプリがリストされます。各アプリケーションはカードとして表示されます。ここから新しい EPL アプリを追加したり、既存の EPL アプリを管理したりできます。

EPL Apps

アプリケーションに対して表示される各カードの上部には、アプリケーションを編集、ダウンロード、削除できるアクション メニューがあります。

このページからは、次のことができます。

  • 既存の EPL アプリを編集します。アクションメニューから 編集 コマンドを使用するか、アプリケーションに表示されているカードをクリックします。

  • 新しい EPL アプリを作成します。下記参照。

  • EPL アプリをインポートします。Things Cloudの外部でアプリを開発する場合(例えば、Apama Plugin for Eclipse を使用する)、トップメニューバーで EPL のインポート をクリックしてアップロードします。Apama モニター(*.mon)ファイルをアプリとして、ストリーミング分析アプリケーションに追加します。

  • EPLアプリをダウンロードします。アクションメニューから ダウンロード コマンドを使用して、アプリを *.mon ファイルとしてダウンロードします。

  • 既存の EPL アプリをデプロイします。アプリに対して表示されているカードで、モードを 非アクティブ から アクティブ に変更します。アプリケーションのデプロイ を参照してください。
    アプリをアクティブ化すると、構文エラーがあればすぐに報告されます。エラー状態がカードに表示されるので、アプリが良好な状態にあることを確認できます。エラーをクリックすると、問題の内容に関する情報が表示されます。構文エラーがある場合、アプリをアクティブ化することはできません。エラーは、修正されてアプリが再度アクティブ化されるまでカードに表示されます。

  • すべての EPL アプリをリロードします。トップメニューバーで 再読み込み をクリックして表示を更新すると、ページが読み込まれてから他のユーザーが行った変更(その間に発生したエラーも含む)が表示されます。

ステップ 2: EPLアプリを作成する

トップメニューバーで 新しい EPL アプリ をクリックします。表示される アプリの作成 ダイアログボックスが表示され、アプリに一意の名前を付けます。新しいアプリ用に作成されるカードに表示される説明を入力することもできます。OKをクリックします。

EPLエディタが表示されます。新しいアプリのEPLコードには、Things Cloudでの作業に必要な典型的な基本イベント定義およびユーティリティがすでに含まれています。アプリの必要に応じてそれらを調整できます。詳細については、ドキュメントとサンプルを参照してください。

備考
アプリ名を指定せずに キャンセル をクリックすると、EPL エディタも表示され、パンくずリストにデフォルト名の「新規」が表示されます。EPL コードを編集することはできますが、アプリ名を指定しないとアプリを保存できません。アプリ設定 をクリックし、表示されるダイアログボックスでアプリ名を指定します。

EPL editor

開始しやすいように、いくつかのサンプルが用意されています。これらを表示するには、エディタの右側に表示される サンプル をクリックします。サンプルをクリックすると、その内容のプレビューが表示されます。標準のキーの組み合わせ Ctrl+C および Ctrl+V キーを使用して、サンプル コードの一部を選択し、それを独自のコードにコピーできます。コマンドボタンを使用して、コード全体をクリップボードにコピーして独自のコードの適切な位置に挿入したり、既存のコードをすべてサンプル コードに置き換えたりすることもできます。

トップメニューバーのボタンを使用すると、現在のセッションでの最後の変更を元に戻したりやり直したり、変更を保存したりできます。

EPL エディタでモードを 非アクティブ から アクティブ(またはその逆)に変更することもできます。繰り返しますが、EPL コードにエラーがある場合、アプリをアクティブ化することはできません。コード内でエラーが強調表示されます。

備考
EPL エディタは、標準の Web コンポーネントを使用することに注意してください。これは、多くの一般的な開発者機能を提供しますが、その中には Quick Fix や Show Hover など、EPL に関連しないものも含まれますが、これらに限定されません。

トップメニューバーの閉じるアイコン をクリックして、EPL エディタを終了し、EPL アプリのリストに戻ります。

注意
別の URL に移動するか、ブラウザ ウィンドウを閉じると、保存されていない変更はすべて失われます。
ステップ 3: EPL アプリをテストする

アプリがアクティブになると、実行結果を確認できるようになります。これには、測定値の送信、データの受信、アラームの生成、およびApama-ctrlマイクロサービスへのログ記録が含まれる場合があります。Apama-ctrlマイクロサービスのログファイルを確認する方法については、マイクロサービスの監視をご覧ください。

アプリがアクティブになると、実行結果が表示されます。これには、メジャーメントの送信、データの受信、アラームの作成、Apama-ctrl マイクロサービスへのログインが含まれる場合があります。Apama-ctrl マイクロサービスのログ ファイルを確認する方法については、マイクロサービスの監視 を参照してください。 ユーザー ガイド の 管理 > マイクロサービスの管理と監視 をご覧ください。

アプリのデプロイも参照してください。

Apama Plugin for Eclipseを使用したアプリケーションの開発

Apama Plugin for Eclipse は完全な開発環境を提供し、複雑な EPL アプリケーションがある場合に最適なツールです。EPL アプリ(モニターファイル)の準備ができたら、それを Things Cloud にインポートする必要があります。

ステップ 1: Apama をインストールする

https://download.cumulocity.com/Apama/ から Apama の apama-c8y-dev パッケージをダウンロードし、解凍してインストールします。 これにより機能が制限され、いくつかの制限事項が課されたフリーミアム版 Apama Community Edition がインストールされます。すべての機能を使用するには、ライセンスが必要です。

ライセンスをお持ちの場合、ライセンスファイルを Apama の作業ディレクトリ(APAMA_WORK/license)にコピーしてください。

apama-c8y-dev パッケージには、Apama コミュニティエディションおよびライセンス版の両方で使用可能な Apama Plugin for Eclipse が含まれています。

ステップ 2: プロジェクトを作成する

インストールしたら、Apama Plugin for Eclipseで Apamaプロジェクトを作成し、Things Cloud 接続を有効にします。Apama プロジェクトの作成方法については、Apama ドキュメントの Apama プロジェクトの作成 を参照してください。

ステップ 3: Apama バンドルをプロジェクトに追加する

新しく作成した Apama プロジェクトに次の Apama バンドルを追加します。これらは、アプリをアクティブ化するために Things Cloud で必要となります。プロジェクトにバンドルを追加する方法については、Apamaドキュメントのプロジェクトへのバンドルの追加 を参照してください。

  • Things Cloud > Things Cloud のイベント定義
    Things Cloud とのデータの送受信に必要なイベント API を提供します。
  • Things Cloud > Things Cloud 用ユーティリティ
    Things Cloud から受信したデータを操作するためのヘルパー ユーティリティ関数を提供します。
  • 任意のエクストラクター
    any 型から値を抽出するためのサポートを提供します。
  • 時刻の形式
    Time Format プラグインのすべてのメソッドにアクセスするために必要です。時刻のフォーマットと解析時間に役立ちます。
  • HTTP クライアントの汎用イベント
    HTTP クライアント接続プラグインによって使用される、事前定義された汎用イベントを公開します。
  • ApplicationInitialized 時に自動
    これにより、起動時にすべての接続プラグインがすぐに開始されます。
  • HTTP クライアント > 一般的なリクエスト/レスポンス イベント定義を含む JSON
    EPL アプリが HTTP コールを行うことを許可します。
  • Things Cloud > Things Cloud クライアント
    Things Cloud クライアントを EPL アプリに公開します。

上記のバンドルは EPL アプリで許可される唯一のバンドルであるため、他のバンドルを追加しないように注意してください。追加しないと、Things Cloud でアクティブ化されたときにアプリが機能しなくなる可能性があります。

ステップ 4: モニターファイルを作成する

新しい Apama モニターファイルを作成するには、新しいモニターファイルの作成 を参照してください。

新しく作成したモニターファイルを EPL アプリとして Things Cloud にインポートし、そこでアクティブ化する前に、モニターファイルが Apama Plugin for Eclipse 内から期待通りに動作するかどうかをテストすることをお勧めします。

詳細については、Apama ドキュメントの The Cumulocity Transport Connectivity Plug-in を参照してください。

ステップ 5: モニターファイルを実行してテストする

プロジェクトをローカルで実行する場合、プロジェクト構成で Things Cloud の資格情報を指定する必要があります。Things Cloud クライアントの CumulocityIoT.properties ファイルに資格情報を構成します。

例:

CUMULOCITY_USERNAME=user@example.com
CUMULOCITY_SERVER_URL=http://exampleTenant.je1.thingscloud.ntt.com
CUMULOCITY_PASSWORD=examplePassword
CUMULOCITY_APPKEY=apamaAppKey
備考
CUMULOCITY_APPKEY の値を取得するには、Things Cloud でアプリケーションを作成 する必要があります。

上記の説明は、URL がテナントを識別するテナントに接続していることを前提としていることに注意してください。これが当てはまらない場合(例えば、IP アドレスで接続している場合)、CumulocityIoT.properties ファイルでこれを設定する必要がある場合があります。

CUMULOCITY_TENANT=my_custom_tenant

プロジェクトをマルチテナント環境でローカルに実行する必要がある場合、マルチテナントのサポートを有効にし、使用するマルチテナント マイクロサービスの名前を指定します。Things Cloud クライアントの CumulocityIoT.properties ファイルで、次のプロパティを構成します。

# Enable multi-tenant support
CUMULOCITY_MULTI_TENANT_APPLICATION=true

# The name of the multi-tenant microservice to use.
# If a multi-tenant microservice does not already exist, either upload a multi-tenant microservice or
# create a microservice with a valid manifest. Subscribe the microservice to tenants for which you want
# to run the project.
CUMULOCITY_MULTI_TENANT_MICROSERVICE_NAME=example-multi-tenant-ms

さらに、モニターファイルがマルチテナント マイクロサービスで動作できることを確認してください。 詳細については、Apama ドキュメントの マルチテナント デプロイの操作 を参照してください。

これで、Apama Plugin for Eclipse で EPL のテストに進むことができます。

EPL アプリが準備できたら、アプリのデプロイ を参照して、Things Cloud にデプロイする方法を確認してください。

アプリケーションのデプロイ

以下を、Things Cloud にデプロイすることができます。

備考
ストリーミング分析アプリケーションでは、アプリケーションのデプロイに「アクティブ化」という用語が使用されます。

EPL アプリを、ストリーミング分析アプリケーションを使用して単一の *.mon ファイルとしてデプロイ

EPL アプリ(*.mon ファイル)が Things Cloud でアクティブ化されると、*.mon ファイルには一意のパッケージ名が割り当てられます。これにより、複数のモジュールがアクティブ化された場合の競合が防止されます。このため、*.mon ファイルに package ステートメントを指定しないでください。アプリケーションの異なる部分間でイベントを共有する必要がある場合、イベント定義とそれを使用するモニターを単一の *.mon ファイルに記述します。

EPL アプリで使用できるユーティリティと基本イベントのセットは制限されています。執筆時点では、これらには Time Format バンドルと HTTP Client > JSON with generic request/response event definitions バンドルが含まれています。

EPL アプリがランタイム エラーを通知すると、アラームとして発生します。ランタイム エラーには、キャッチされなかった例外のほか、EPL アプリが実行する必要がある警告やエラーの明示的なログが含まれます。一般的に、Apama ランタイムに関連する動作や状態の問題もアラームとして発生します。

Apama ランタイムやアクティブな EPL アプリの詳細な診断については、Apama-ctrl マイクロサービスのログを確認できます。ログファイルの詳細については、マイクロサービスの監視 を参照してください。ただし、Apama のログファイルを最大限に活用するには、Apama についてある程度の知識が必要です。

Apama アプリケーションをマイクロサービスとしてデプロイ

Apama Plugin for Eclipseを使用すると、次のようなより複雑なプロジェクトを開発することもできます。

  • 複数の *.mon ファイルに分散している
  • 他の Apama アプリケーションから分離する必要がある
  • デフォルトでは有効化されていない接続プラグインまたは EPL プラグインを使用している

このような種類のアプリケーションは、Things Cloud にマイクロサービスとしてデプロイする必要があります。

マイクロサービス マニフェストの必須設定

マイクロサービス マニフェストは、Things Cloud におけるマイクロサービス インスタンスとアプリケーションのデプロイを管理するために必要な設定を提供します。詳細については、マイクロサービスマニフェスト を参照してください。

Apama は、単一テナント マイクロサービスでも、マルチテナント マイクロサービスでも使用できます。 したがって、マイクロサービス マニフェストでは分離レベルを PER_TENANT または MULTI_TENANT に設定する必要があります。 Apama をマルチテナント マイクロサービスで使用する場合、Apama アプリケーションはマルチテナント対応として記述する必要があります。 詳細については、Apama ドキュメントの マルチテナント デプロイの操作 を参照してください。

EPL から Things Cloud トランスポートのすべての機能を起動して使用するには、マイクロサービスに次の権限が必要です。これらは、マイクロサービス マニフェストの requiredRoles で設定されています。

  • ROLE_APPLICATION_MANAGEMENT_READ
  • ROLE_INVENTORY_READ
  • ROLE_INVENTORY_ADMIN
  • ROLE_INVENTORY_CREATE
  • ROLE_MEASUREMENT_READ
  • ROLE_MEASUREMENT_ADMIN
  • ROLE_EVENT_READ
  • ROLE_EVENT_ADMIN
  • ROLE_ALARM_READ
  • ROLE_ALARM_ADMIN
  • ROLE_DEVICE_CONTROL_READ
  • ROLE_DEVICE_CONTROL_ADMIN
  • ROLE_IDENTITY_READ
  • ROLE_OPTION_MANAGEMENT_READ
  • ROLE_BULK_OPERATION_READ
  • ROLE_SMS_ADMIN
備考
上記は、カスタム Apama マイクロサービスに必要な最小限の権限リストです。カスタム マイクロサービスを開発している場合、マイクロサービス マニフェストにさらに権限を追加できます。
Apama アプリケーションをマイクロサービスとしてデプロイする
  1. Apama Plugin for Eclipse で、通常通りアプリケーションを開発します。

  2. Apama の Docker サポートを使用して、プロジェクト全体をマイクロサービスに変換できます。Project Explorer ビューでプロジェクトを右クリックし、Apama > Add Docker Support を選択すると、プロジェクトディレクトリのルートに Dockerfile が追加されます。 ビルドに使用する場合、Docker Hub で利用可能な Apama イメージが使用されます。Apama イメージにアクセスするための Docker Hub 認証情報が必要です。Apama Docker イメージは、 Linux ベースのみです。

  3. カスタムプラグインのビルドやライセンスファイルのイメージへのコピーなど、必要なカスタム手順を Dockerfile に追加します。

  4. プロジェクトのパッケージ化とデプロイには、Things Cloud マイクロサービス ユーティリティツールを使用します。詳細については、マイクロサービス ユーティリティツール を参照してください。マイクロサービス ユーティリティツールが構築するディレクトリ構造を作成する際は、プロジェクト ディレクトリ全体を「docker/」という名前でそのディレクトリにコピーします。

    例:
    docker/monitors/
    docker/eventdefinitions/
    docker/Dockerfile
    docker/…
    cumulocity.json

    マイクロサービス マニフェスト は手動で作成する必要がありますが、マイクロサービス マニフェストに特別な設定やプローブは必要ありません。ただし、liveness または readiness プローブを構成する場合、ポート 15903(Apama のデフォルトポート)のパス /pinghttpGet プローブを構成できます。Apama アプリケーションは通常ステートフルであり、入力データを自動的に分割しないため、自動スケーリングを有効にすることは推奨されません。

    このディレクトリからパッケージ、デプロイ、サブスクライブを行うと、Apama アプリケーションが実行中のマイクロサービスになります。Things Cloud の外部 (Apama Plugin for Eclipse またはテスト環境)でアプリケーションを実行した場合の動作は、Things Cloud 内での動作とほぼ同じになります。Things Cloud API へのリクエストを実行するマイクロサービスとしてデプロイされた場合、Apama はデプロイ先のテナントに接続するための資格情報を自動的に取得し、Apama に提供された他の資格情報を上書きします。ただし、リアルタイム イベントを受信する場合、外部の Apama 環境から Things Cloud に接続する場合と同様に、プロジェクト設定で有効な資格情報を指定する必要があります。

  5. Things Cloud にデプロイする準備ができたら、アプリケーションをマイクロサービスとしてアップロードします。詳細については、マイクロサービスの管理 を参照してください。

備考
2024 年 12 月以降、サポートされているすべてのリリース トレインの Docker イメージの場所が変更されました。Docker Hub ではなく、Amazon ECR Public Gallery で利用できるようになります。以前の場所のイメージを引き続き使用している場合は、移行する必要があります。
重要

Apama 10.15.0 では、いくつかの新しいコンテナ イメージが導入され、既存のコンテナ イメージの一部はコンテンツが変更されています。2024 年 12 月現在、これらのイメージは Amazon ECR Public Gallery から提供されます。 Things Cloud マイクロサービスとして使用するためのイメージをビルドする場合、以前のリリースとは異なります。 public.ecr.aws/apama/apama-cumulocity-jre イメージを、ビルダーイメージとして public.ecr.aws/apama/apama-cumulocity-builder イメージと組み合わせて使用​​する必要があります。 10.15.0 以前のバージョンで、Apama Plugin for Eclipse によって作成されたデフォルトのプロジェクト Dockerfile でこれを行うには、Dockerfile 内の FROM 行を適切に変更するか(これは一度だけ実行する必要がある)、次のフラグを使用してビルドする必要があります(これは毎回実行する)。

--build-arg APAMA_BUILDER=public.ecr.aws/apama/apama-cumulocity-builder:10.15 --build-arg APAMA_IMAGE=public.ecr.aws/apama/apama-cumulocity-jre:10.15

アプリのテスト

GitHub の Apama EPL Apps Tools を使用して、EPL アプリのアップロードをスクリプト化し、CI/CD(継続的インテグレーションおよび継続的デリバリー)ユースケースに合わせて管理できます。このツールは、PySys テスト フレームワークへの拡張機能も提供し、EPL アプリのテストを簡単に作成して自動的に実行できるようにします。

Apama EPL Apps Tools は https://github.com/Cumulocity-IoT/apama-eplapps-tools から入手できます。EPL アプリ ツールのドキュメント で詳細情報をご確認ください。

PySys の詳細については、Apama ドキュメントからアクセスできる Python の API リファレンス を参照してください。

サポートされている REST サービス

EPL アプリは、REST(Representational State Transfer)サービスを監視するように設計されており、すべての GET、POST、PUT、DELETE 操作をサポートします。さまざまな操作のリクエストの例を、次に示します。

これらの操作を実行するには、「CEP 管理」に対する読み取りおよび管理者権限が必要です(詳細は、権限の管理 参照)。

すべての操作のリクエスト ヘッダー

各リクエストは、Things Cloud に対して認証される必要があります。

名前 説明
Accept 「application/json」これは必須のパラメーターです

一般的なレスポンス コード

すべてのリクエストに対して、次の一般的なエラー レスポンスコードが予想されます。

コード 説明
401 リクエストが認可されていません
403 禁止されています。EPLアプリはApama-ctrl-starterマイクロサービスでは利用できません

特定のリクエストから予想されるその他のレスポンス コードを、次に示します。

一般的なフィールドの説明

操作に応じて、レスポンスでは次の一般的なフィールドを使用できます。

フィールド 説明
contents EPL ファイルの完全な内容
description ファイルの説明
eplPackageName EPL ファイルのパッケージ名。名前に特殊文字(スペースを含む)が含まれている場合、
これらの文字は有効な EPL 識別子にするためにエスケープされ、注入エラーが回避されます
errors ファイル内のすべてのコンパイル エラー(存在する場合)と行番号およびテキストのリスト
id ファイルの一意の識別子
name EPL に付けられた名前
state EPL がコレレータに注入され、実行されているかどうか。
これには active または inactive のいずれかになります
warnings ファイル内のすべてのコンパイル警告のリスト(存在する場合)と行番号およびテキスト

GET - 利用可能なすべての EPL ファイルを取得する

エンドポイント: /service/cep/eplfiles

リクエスト例

GET /service/cep/eplfiles

レスポンス
コード 説明
200 操作は成功しました。以下の値の例もご覧ください
400 不正なリクエストです。ヘッダーの内容に予期しない値が含まれています

レスポンスコード 200 の値の例:

{
  "eplfiles":[
    {
      "description":"",
      "eplPackageName": "eplfiles.Ordinal1",
      "errors":[

      ],
      "id":"39615",
      "name":"Ordinal1",
      "state":"active",
      "warnings":[

      ]
    }
  ]
}

GET - 利用可能なすべての EPL ファイルをその内容とともに取得する

エンドポイント: /service/cep/eplfiles

リクエスト パラメーター
名前 説明
contents ブール型。EPL ファイルをその内容とともに取得します。これは、任意のクエリ パラメーターです
リクエスト例

GET /service/cep/eplfiles?contents=true

レスポンス
コード 説明
200 正常に動作しました。以下の値の例もご覧ください

レスポンスコード 200 の値の例:

{
  "eplfiles":[
    {
      "contents":"monitor M0 { action onload() { on wait(1.0) { log \"Hello\" at INFO; }}}",
      "description":"",
      "eplPackageName": "eplfiles.Ordinal1",
      "errors":[
      ],
      "id":"39615",
      "name":"Ordinal1",
      "state":"active",
      "warnings":[
      ]
    }
  ]
}

GET - 識別子によって EPL ファイルを取得する

エンドポイント: /service/cep/eplfiles/{id}

リクエスト パラメーター
名前 説明
id 取得する EPL ファイルの識別子。これは、必須のパラメーターです
リクエスト例

GET /service/cep/eplfiles/{{id}}

レスポンス
コード 説明
200 操作は成功しました。以下の値の例もご覧ください
404 識別子を持つファイルが見つかりません。
このセクションの最後にある このレスポンスコードの値の例 もご覧ください

レスポンスコード 200 の値の例:

{
      "contents":"monitor M0 { action onload() { on wait(1.0) { log \"Hello\" at INFO; }}}",
      "description":"",
      "eplPackageName": "eplfiles.Ordinal1",
      "errors":[
      ],
      "id":"39615",
      "name":"Ordinal1",
      "state":"active",
      "warnings":[
      ]
}

POST - 新しいEPLアプリケーションを作成する

エンドポイント: /service/cep/eplfiles

リクエスト例

POST /service/cep/eplfiles

リクエスト本文の例:

{
  "name": "Ordinal1",
  "contents": "monitor M1 { action onload() { on wait(1.0) { log \"Hello\" at INFO; }}}",
  "state": "active",
  "description": ""
}

次の点に注意してください。

  • name はファイルのパッケージに使用され(したがって、EPL ファイルには package ステートメントを含めることはできません)、すべての EPL ファイル間で一意である必要があります。名前にはプレフィックスが付けられ、特定の文字はエスケープされます。使用される実際のパッケージ名は、便宜上 eplPackageName フィールドに返されます(これをマイクロサービス ログファイルで検索して、ログ ステートメントを見つけることができます)。
  • 安全にエスケープされた contents を必ず提供してください。
  • description は任意であり、空にすることもできます。

レスポンス
コード 説明
201 正常に作成されました / ファイルにエラーが発生して作成されました /
ファイルに警告が発生して作成されました。以下の例もご覧ください
405 入力に誤りがあります

正常に作成された場合のレスポンスコード 201 の例:

{
  "description":"",
  "eplPackageName": "eplfiles.Ordinal1",
  "errors":[
  ],
  "id":"39615",
  "name":"Ordinal1",
  "state":"active",
  "warnings":[
  ]
}

警告またはエラーが発生した場合のレスポンスコード 201 の例:

{
  "description":"",
  "eplPackageName": "eplfiles.Ordinal1",
  "errors":[
    {
      "line":5,
      "text":"assigning a float to an integer variable"
    }
  ],
  "id":"39651",
  "name":"Ordinal1",
  "state":"inactive",
  "warnings":[
    {
      "line":10,
      "text":"\"assert\" may become a reserved word in future versions of EPL"
    }
  ]
}

PUT - 識別子によって EPL ファイルを更新する

エンドポイント: /service/cep/eplfiles/{id}

リクエスト パラメーター
名前 説明
id 更新するEPLファイルの識別子。識別子はパスに含める必要があります。これは必須のパラメータです
リクエスト例

PUT /service/cep/eplfiles/{id}

リクエスト本文の例:

{
  "name": "Ordinal1",
  "contents": "monitor M1 { action onload() { on wait(1.0) { log \"Hello\" at INFO; }}}",
  "state": "active",
  "description": ""
}

POST リクエストについての情報も参照してください。

レスポンス
コード 説明
200 正常に更新されました。以下の値の例もご覧ください
404 識別子を持つファイルが見つかりません。
このセクションの最後にある このレスポンスコードの値の例 も参照してください

エラーなしで正常に更新された場合のレスポンスコード 200 の値の例:

{
  "description":"",
  "eplPackageName": "eplfiles.Ordinal1",
  "errors":[
  ],
  "id":"39615",
  "name":"Ordinal1",
  "state":"active",
  "warnings":[
  ]
}

エラーまたは警告で更新された場合のレスポンスコード 200 の値の例:

{
  "description":"",
  "eplPackageName": "eplfiles.Ordinal1",
  "errors":[
    {
      "line":5,
      "text":"assigning a float to an integer variable"
    }
  ],
  "id":"39651",
  "name":"Ordinal1",
  "state":"inactive",
  "warnings":[
    {
      "line":10,
      "text":"\"assert\" may become a reserved word in future versions of EPL"
    }
  ]
}

DELETE - 識別子によって EPL ファイルを削除する

エンドポイント: /service/cep/eplfiles/{id}

リクエスト パラメーター
名前 説明
id 削除する EPL ファイルの識別子。識別子はパスに含める必要があります。これは必須のパラメータです
リクエスト例

DELETE /service/cep/eplfiles/{{id}}

レスポンス
コード 説明
200 正常に削除されました
404 識別子を持つファイルが見つかりません。
このセクションの最後にある このレスポンスコードの値の例 も参照してください

レスポンスコード 404の値の例

レスポンスコード 404 は、特定の識別子を持つファイルが見つからなかったことを示します。

{
  "error":"Not Found",
  "exception":"com.apama.in_c8y.FileNotFoundException",
  "message":"File with id 39613 not found",
  "path":"/eplfiles/39613",
  "status":404,
  "timestamp":"2020-01-17T12:21:42.457+0000"
}
  • error: エラーメッセージ
  • exception: 発生した例外を指定します
  • message: 例外メッセージの説明
  • path: リクエストされたパス
  • status: アプリケーションのステータス
  • timestamp: ISO形式のタイムスタンプ

イベントとチャネル

Apama EPL では、残りの Things Cloud エコシステムとのやり取りはイベントを通じて行われます。Things Cloud データにアクセスするために、多数のイベント定義が提供されています。

備考
Apama と Things Cloud は異なる「イベント」概念を使用します。Apama イベントは、デバイスのメジャーメント、アラーム、(Things Cloud)イベントの監視と作成など、Things Cloud とのすべてのやり取りに使用されます。Apama イベントの詳細については、Apama ドキュメントの イベントタイプの定義 を参照してください。Things Cloud イベントの詳細については、Things Cloud OpenAPI仕様内の イベント を参照してください。

定義済みのイベント タイプ

いくつかの Things Cloud API と対話するための、事前定義されたイベント タイプがいくつかあります。新しいメジャーメント、アラーム、イベントが作成されると、イベントは自動的に Apama アプリケーションに送信されます。Things Cloud バックエンドと対話するために、イベントを作成して関連するチャネルに送信できます。Things Cloud は、データベース クエリを自動的に実行するか、メールや SMS などの送信に必要な API コールを作成します。

EPL の API リファレンス(ApamaDoc)の データ モデル を参照して、各ストリームのイベントがどのように構造化されているかを確認してください。

イベントをチャネルに送信する

イベントの送信は、new <type> に続いてフィールドへの代入を使用するか、すべてのフィールドを指定するコンストラクターを使用してイベントを構築することによって行われます。次に、send ステートメントを使用してイベントを Things Cloud に送信します。send ステートメントにはチャネルが必要です。これはイベントタイプの SEND_CHANNEL 定数です。

イベントをリッスンする

チャネル上のイベントを監視することで EPL をトリガーできます。monitor.subscribe("string name") メソッドを使用してチャンネルに登録できます。これは、モニターの起動時に実行することも、時々イベントを受信する必要がある場合のみ、必要に応じて呼び出され、続いて monitor.unsubscribe("string name") を実行することもできます。

on ステートメントを使用してイベントを監視し、その後に監視しているイベントタイプ、開閉括弧、および as <identifier> を使用してイベントを保持する変数に名前を付けます。

デフォルトでは、リスナーは 1 回起動します。すべてのイベントに対して繰り返すには、イベントタイプの前に all キーワードを使用します。

フィルター

フィルターを追加するには、リスナーの括弧の間に 1 つ以上のフィールドを指定します。最上位フィールドのみをフィルタリングできます。より複雑なフィルタリング、またはイベントのサブプロパティ (辞書など)でのフィルタリングには、if ステートメントを使用します。

標準イベントタイプとチャネル

標準の Things Cloud イベントの場合、イベントを送受信するためのチャネルを含む定数があります。次に例を示します。

monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
send msmnt to Measurement.SEND_CHANNEL;

次の表にリストされているイベントは、com.apama.cumulocity パッケージの一部です。

イベント 送信用チャネル 受信用チャネル
Operation Operation.SEND_CHANNEL Operation.SUBSCRIBE_CHANNEL
Measurement Measurement.SEND_CHANNEL Measurement.SUBSCRIBE_CHANNEL
Event Event.SEND_CHANNEL Event.SUBSCRIBE_CHANNEL
Alarm Alarm.SEND_CHANNEL Alarm.SUBSCRIBE_CHANNEL
ManagedObject ManagedObject.SEND_CHANNEL ManagedObject.SUBSCRIBE_CHANNEL
MeasurementFragment MeasurementFragment.SEND_CHANNEL MeasurementFragment.SUBSCRIBE_CHANNEL

メジャーメント フラグメント

Measurement および MeasurementFragment イベントは常に公開されます。

EPL では、Measurement イベントではなく MeasurementFragment イベントの内容に一致するリスナーを生成できます。

例:

on all MeasurementFragment(type="c8y_SpeedMeasurement", valueFragment = "c8y_speed", valueSeries = "speedX", value > SPEED_LIMIT) as mf {
}

メジャーメント フラグメント を参照してください。

作成通知と更新通知の区別

{< product-c8y-iot >}} からの AlarmEventManagedObjectOperation イベントを監視する場合、作成操作と更新操作を区別することが必要な場合があります。これらのイベントタイプにはそれぞれ、この目的のために isCreate() および isUpdate() という名前のアクションがあります。

新しいアラームを監視する例:

on all Alarm() as alarm {
    if alarm.isCreate() {
        log "Alarm created: " + alarm.toString() at INFO;
    }
    // else it's an update
}

同様に、更新されたアラームの場合のみ:

on all Alarm() as alarm {
    if alarm.isUpdate() {
        log "Alarm updated: " + alarm.toString() at INFO;
    }
    // else it's a create
}

Things Cloud からのイベントの場合、isUpdate()またはisCreate() のいずれかが常に true を返します。どちらのアクションも、選択肢と読みやすさを考慮して提供されています。

さまざまなタイプのオブジェクトの例などの詳細については、Apama ドキュメントの 更新通知の受信 を参照してください。

isCreate() および isUpdate() アクションの詳細については、API Reference for EPL (ApamaDoc) も参照してください。

この例では、com.apama.cumulocity.MeasurementFragment API を使用して新しいメジャーメントを監視します。受信したメジャーメントをフィルタリングして、指定された最大速度を超える速度値を見つけ、制限を超えた場合にアラームを発します。

  1. MeasurementFragment.SUBSCRIBE_CHANNEL チャネルに登録します。
  2. メジャーメント フラグメントを監視し、type、つまり c8y_SpeedMeasurement でフィルターします。valueFragment の値が c8y_speed であること、および valueSeriesspeedX のみでフィルターされていることを確認してください。また、valueSPEED_LIMIT より大きい場合は value でフィルターします。
  3. すべてのフィールドを指定するコンストラクターを使用して、イベントを作成します。
  4. イベントを正しいチャネル(Alarm.SEND_CHANNEL)に送信します。

結果の *.mon ファイルは、次のようになります。

using com.apama.cumulocity.Alarm;
using com.apama.cumulocity.MeasurementFragment;

monitor TriggerAlarmForSpeedBreach {
    constant float SPEED_LIMIT := 30.0;
    action onload() {
        monitor.subscribe(MeasurementFragment.SUBSCRIBE_CHANNEL);
        // Everytime a measurement fragment with the specific details of the match criteria is triggered then we should raise an alarm
        on all MeasurementFragment(type="c8y_SpeedMeasurement", valueFragment = "c8y_speed", valueSeries = "speedX", value > SPEED_LIMIT) as mf {
            send Alarm("", "c8y_SpeedAlarm", mf.source, currentTime,
                        "Speed limit breached", "ACTIVE", "CRITICAL", 1,
                        new dictionary<string,any>) to Alarm.SEND_CHANNEL;
        }
    }
}

組み込みアクション

概要

Apama EPLでは「アクション」と呼ばれる機能を利用することができます。すべてのモニターには少なくとも 1 つのアクション、つまり onload アクションがあります。このセクションでは、すぐに使用できる組み込みのアクションについて説明します。

組み込みタイプのアクションについては、EPLのAPIリファレンス (ApamaDoc) を参照してください。

Things Cloud データのクエリ

履歴データを操作するには、次のリクエストとレスポンスのイベント ペアのいずれかを使用してリソースを検索します。

例:アラームを検索するには、適切なクエリ パラメーターを指定して com.apama.cumulocity.FindAlarm リクエスト イベントを FindAlarm.SEND_CHANNEL チャネルに送信します。レスポンスとして、0 個以上の com.apama.cumulocity.FindAlarmResponse イベント(検索リクエストに一致するリソースの数によって異なります)と、com.apama.cumulocity.FindAlarmResponseAck イベントが FindAlarmResponse.SUBSCRIBE_CHANNEL チャネルに期待されます。マネージドオブジェクト、イベント、メジャーメント、操作を検索するための同様の機能も提供されます。

次の表にリストされているイベントは、com.apama.cumulocity パッケージの一部です。

検索対象 リクエスト / レスポンス イベント
ManagedObject FindManagedObject
FindManagedObjectResponse
FindManagedObjectResponseAck
Alarm FindAlarm
FindAlarmResponse
FindAlarmResponseAck
Event FindEvent
FindEventResponse
FindEventResponseAck
Measurement FindMeasurement
FindMeasurementResponse
FindMeasurementResponseAck
Operation FindOperation
FindOperationResponse
FindOperationResponseAck
CurrentUser CurrentUser
GetCurrentUser
GetCurrentUserResponse
TenantOption TenantOption
FindTenantOptions
FindTenantOptionsResponse
ドキュメント

Things Cloud REST API の他の部分の呼び出し

Things Cloud REST API は、個々のイベントタイプではカバーされていない、いくつかの追加機能をカバーします。REST API の他の部分を呼び出すために、Things Cloud API の任意の部分を呼び出すために使用できる汎用のリクエスト / レスポンス API が提供されています。

次のリクエスト / レスポンス イベントを使用できます。

  • com.apama.cumulocity.GenericRequest
  • com.apama.cumulocity.GenericResponse
  • com.apama.cumulocity.GenericResponseComplete
備考
Apama-ctrl マイクロサービス、その中のすべての EPL アプリコードは、EPL がインベントリ内のすべてのオブジェクトにアクセスし、ユーザーの詳細を読み取ることを許可する多数の権限で実行されます。これには、ユーザー名、メールアドレスなどの個人を特定できる情報が含まれます。

詳細については、Things Cloud OpenAPI仕様にあるREST実装 および Apama ドキュメントの「Invoking other parts of the Cumulocity REST API」を参照してください。

HTTP サービスの呼び出し

備考
次の情報は、外部 HTTP サービスとの対話に関するものです。Things Cloud REST API の他の部分にリクエストを行う場合、Things Cloud REST API の他の部分の呼び出し を参照してください。 他のマイクロサービスを含む、プラットフォーム上のその他の部分へのリクエストについては、Apama を他のマイクロサービスに接続 を参照してください。

REST および JSON を使用して HTTP サービスと対話するには、次のいずれかのファクトリ メソッドを使用して com.softwareag.connectivity.httpclient.HttpTransport インスタンスを作成します。

  • HttpTransport.getOrCreate(string host, integer port) は HttpTransport を返します
  • HttpTransport.getOrCreateWithConfiguration(string host, integer port, dictionary <string, string> configurations) は HttpTransport を返します(構成ディクショナリ内のキーは CONFIG_ 接頭辞が付いた HttpTransport の定数です)

HttpTransport オブジェクトで、create メソッドの 1 つを呼び出し、必要に応じてパスとペイロードを渡し、Request オブジェクトを生成します。

Request オブジェクトでは、必要に応じて cookie、ヘッダー、クエリ パラメーターを設定し、execute(action<Response> callback) でリクエストを呼び出すことができます。コールバックのモニターにアクションの名前を指定すると、リクエストが完了した(またはタイムアウトした)ときに Responseと共にそのアクションが呼び出されます。

コールバックでは、Response オブジェクトに statusCodepayload が指定されます。ペイロードのフィールドには、それが提供される com.apama.util.AnyExtractor オブジェクトを介してアクセスできます。次の「フラグメントにアクセスする」に関する情報を参照してください。

さらなる詳細は、EPLのAPIリファレンス (ApamaDoc)を参照してください。

ユーティリティ関数

フラグメントにアクセスする

params ディクショナリを介して、ほとんどのイベントのフラグメントにアクセスできます。AnyExtractor オブジェクトは、複数のサブフラグメントを含む任意のオブジェクトからデータを抽出して以下にアクセスできるように構築できます。

  • アクション getInteger(string path) :整数を返します

  • アクション getFloat(string path) :浮動小数点数を返します

  • アクション getString(string path) :文字列を返します

  • アクション getBoolean(string path) :ブール値を返します

  • アクション getSequence(string path) :シーケンス <any> を返します

  • アクション getDictionary(string path) :ディクショナリ <any, any> を返します

JSON パスを使用してオブジェクト構造内を移動できます。

例:

string s := AnyExtractor(measurement.params["fragment"]).getString("sub.fragment.object");

“fragment” の例: “c8y_TemperatureMeasurement”.
“sub.fragment.object” の例: “c8y_TemperatureMeasurement.T.Unit”.

「any」値をキャストする

あるいは、キャストを使用してanyを特定の型に変換します。

string s := <string> measurement.params["strfragment"];

オブジェクトの型が異なる場合、キャスト操作がスローされることに注意してください。

currentTime と TimeFormatter

読み取り専用変数 currentTime を使用して、現在のサーバー時刻を取得できます。Apama は、Unix エポック(UTC 1970 年 1 月 1 日)からの秒数を使用して時間を処理します。TimeFormat オブジェクトを使用すると、人間が判読できる形式に簡単に変換できます。TimeFormat オブジェクトは、日付と時刻の書式設定や解析に使用できます。

例:

using com.apama.correlator.timeformat.TimeFormat;

monitor Example {
    action onload {
        log TimeFormat.format(currentTime, "yyyy.MM.dd 'at' HH:mm:ss") at INFO;
    }
}

TimeFormatとその関数に関する詳細については、Apama ドキュメントの TimeFormat イベントライブラリの使用 および「EPLのAPIリファレンス(ApamaDoc)」を参照してください。

inMaintenanceMode

Util.inMaintenanceMode() 関数は、デバイスが現在メンテナンスモードであるかどうかを簡単に確認する方法です。マネージドオブジェクトをパラメーターとして受け取り、デバイスがメンテナンスモードの場合は true となるブール値を返します。

例:

using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.Event;
using com.apama.cumulocity.FindManagedObject;
using com.apama.cumulocity.FindManagedObjectResponse;
using com.apama.cumulocity.FindManagedObjectResponseAck;

using com.apama.cumulocity.Util;

monitor ExampleMonitor {
  action onload() {
    // Subscribe to Measurement.SUBSCRIBE_CHANNEL to receive all measurements
    monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
    monitor.subscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
    on all Measurement() as m {
      integer reqId := integer.getUnique();
      send FindManagedObject(reqId, m.source, new dictionary<string,string>) to FindManagedObject.SEND_CHANNEL;
      on FindManagedObjectResponse(reqId = reqId, id = m.source) as d and not FindManagedObjectResponseAck(reqId = reqId) {
        if not Util.inMaintenanceMode(d.managedObject) {
          send Event("", "c8y_Event", m.source, currentTime, "Received measurement from active device", new dictionary<string,any>) to Event.SEND_CHANNEL;
        }
      }
    }
  }
}

プレースホルダーを置き換える

文字列を構築するには、次のように連結を使用できます。

string s:= "An event with the text " + evt.text + " has been created.";

テキストが長くなり、データから動的に設定される値が増える場合、Util.replacePlaceholders() 関数を使用できます。テキスト文字列では、イベントのフィールド名でプレースホルダーをマークし、それを #{} で囲みます。replacePlaceholders の 2 番目のパラメーターには、任意のイベントタイプを指定できます。

Utils::replacePlaceholders は、イベントまたはイベントのパラメーターで指定されたフィールド名を検索して、テキストの代替を生成します。タイプ #{X.Y} のフィールド名を使用して、イベント内のネストされた構造にアクセスできます。

myMailText := Util.replacePlaceholders("The device #{source} created an event with the text #{text} at #{time}", alarm);

置換文字列が #{source.name} のような形式である場合、source.name は基礎となるマネージドオブジェクト / デバイスの名前です。または、#{source.c8y_Hardware.notes} のような形式(c8y_Hardware はマネージドオブジェクトのフラグメント)である場合、交換するには特別な処理が必要になります。最初の置換後、プレースホルダー フィールド名を更新し、ソースの managedObject を使用して Util::replacePlaceholders を再度実行する必要があります。

myMailText := Util.replacePlaceholders("The device #{source} with the serial number #{source.c8y_Hardware.serialNumber} created an event with the text #{text} at #{time}. The device is located at #{source.c8y_Address.street} in #{source.c8y_Address.city}.", alarm);
myMailText := myMailText.replaceAll("#{source.", "#{");
myMailText := Util.replacePlaceholders(myMailText, managedObject);

高度な機能

カスタムフラグメント

Things Cloud API を使用すると、データを自由に構造化できます。Apama EPL では、これは、タイプ dictionary<string, any> 型の params に、エントリを追加することによって行われます。com.apama.cumulocity パッケージ内の各 Things Cloud イベント(AlarmEventMeasurementOperationなど)には、params フィールドがあります。これは、フラグメントまたはオプションのフィールドに変換されます。したがって、イベントを受信するとき、コードは params フィールド内のエントリを検索する必要があります。イベントを送信するときは、イベントタイプを定義することによって行うことも、dictionary<string, any> タイプを使用することもできます。イベントを受信するときの EPL タイプは dictionary<any, any> です。EPL は厳密に型指定されているため、フラグメントのないイベントを作成する場合、new dictionary<string, any> 式が必要になることに注意してください。辞書リテラルを使用してインラインでエントリを提供する場合、EPL は最初のキーと値のペアの型に基づいて型を決定します。つまり、dictionary<string, any> の場合、<any> キャスト演算子を使用して最初の値を any 型にキャストします。

send Event(..., new dictionary<string,any>) to Event.SEND_CHANNEL;
send Event(..., {"fragment":<any>"value"}) to Event.SEND_CHANNEL;

MeasurementValue タイプは、Measurement タイプのメジャーメントに対して提供されます。MeasurementValue には、value フィールドと unit フィールド、および他のフラグメントの params があります。

例 1

send Measurement("", "c8y_TemperatureMeasurement", "12345", currentTime, {
	"c8y_TemperatureMeasurement":{
		"T1":MeasurementValue(1.0, "C", new dictionary<string,any>),
		"T2":MeasurementValue(2.0, "C", new dictionary<string,any>),
		"T3":MeasurementValue(3.0, "C", new dictionary<string,any>),
		"T4":MeasurementValue(4.0, "C", new dictionary<string,any>),
		"T5":MeasurementValue(5.0, "C", new dictionary<string,any>)
	}},
	new dictionary<string,any>) to Measurement.SEND_CHANNEL;

これにより、次の JSON 構造が生成されます。

{
	"type": "c8y_TemperatureMeasurement",
	"time": "...",
	"source": {
		"id": "12345"
	},
	"c8y_TemperatureMeasurement": {
		"T1": {
			"value": 1,
			"unit": "C"
		},
		"T2": {
			"value": 1,
			"unit": "C"
		},
		"T3": {
			"value": 1,
			"unit": "C"
		},
		"T4": {
			"value": 1,
			"unit": "C"
		},
		"T5": {
			"value": 1,
			"unit": "C"
		},
	}
}

メジャーメント フラグメント

メジャーメントは、個々のメジャーメント フラグメントに分割できます。これは、メジャーメント内に存在する各フラグメントおよびシリーズに対して実行できます。メジャーメント フラグメントの詳細については、Things Cloud のドメインモデル を参照してください。

メジャーメント フラグメントまたはシリーズに基づいたフィルタリングが必要な場合は、com.apama.cumulocity.MeasurementFragment タイプのイベントを監視します。com.apama.cumulocity.Measurement イベントを監視して、measurements ディクショナリ内を調べる代わりになります。詳細については、Apamaドキュメントのメジャーメント フラグメントの使用 を参照してください。

リスナー

受信したイベントによってステートメントをトリガーすることが唯一の可能性ではありません。次のセクションでは、リスナーを組み合わせる他の方法について説明します。詳細については、Apamaドキュメントの Defining Event Listeners を参照してください。

フィルター

フィルターを使用すると、他のトリガーの組み合わせまたはシーケンスによってトリガーできます。例えば、次のようなトリガーがあるとします。

on all Event() as e { ... }

パターンにフィルターを追加することもできます。

on all Event(type = "c8y_EntranceEvent") as e { ... }

複数のイベントを監視できます。

on Event() as e and Alarm() as a { ... }

これは、イベントとアラーム イベントを受信するとトリガーされ、それぞれの最初のイベントがキャプチャされます。

シーケンスによってトリガーすることもできます。

on all (Event() as e -> Alarm() as a) { ... }

これは、「アラームに続くイベント」のペアごとにトリガーされます。イベントを受信すると、それ以降のイベントの待機を停止し、代わりにアラームの待機を開始します。アラームを受信すると、再びイベントの監視が開始されます。

タイマー

時間に基づいてリスナーをトリガーすることもできます。特定の間隔でトリガーすることもできます。例えば、5 分(300 秒)ごとにトリガーすることもできます。

on all wait(300.0) { ... }

または、Unix の cron スケジューラと同様の機能を使用して、1 日の特定の時間にリスナーを起動させることもできます。

// timer:at(minutes, hours, daysOfMonth, month, daysOfWeek, (optional) seconds)
// minutes: 0-59
// hours: 0-23
// daysOfMonth: 1-31
// month: 1-12
// daysOfWeek: 0 (Sunday) - 6 (Saturday)
// seconds: 0-59

on all at(*, *, *, *, *) {} // trigger every minute

on all at(*/10, *, *, *, *) {} // trigger every 10 minutes
on all at(0, 1, *, *, [1,3,5]) {} // trigger at 1am every monday, wednesday and friday
on all at(0, */2, 1:7, *, *) {} // trigger every 2 hours on every day in the first week of every month

タイマーパターンを、他のパターンと組み合わせることもできます。例えば、別のイベント後の一定時間内にイベントがあったかどうかを確認できます。

on Event() -> wait(600.0) and not Alarm() { ... }

これは、イベントが発生し、10 分(600 秒)以内にアラームが発生しない場合にトリガーされます。イベントが発生した場合にリスナーを終了する not の使用に注意してください。

テナント オプションを使用して、on all at タイマーに使用されるタイムゾーンを設定できます。テナント オプションを設定するには、microservice.runtime カテゴリと timezone キーを指定します。

例:

{
    "category" : "microservice.runtime",
    "key" : "timezone",
    "value" : "Europe/Warsaw"
}

このドキュメントの タイムゾーン変数 や、Apama ドキュメントのサポートされているタイムゾーン も参照してください。

備考
このテナント オプションは、マイクロサービスの開始時にのみ読み取られます。テナント オプションが変更された場合、マイクロサービスは次のマイクロサービス サブスクリプションでのみ、これを選択します。

ストリーム - ウィンドウ

ストリームを使用すると、イベントのウィンドウを操作できるようになります。ストリームは、on の代わりに from キーワードを使用し、操作対象のウィンドウを定義し、集計を使用してそのウィンドウから必要な出力を選択します。ウィンドウ は次の 2 つの方法で制限できます。

  1. 一定期間の ウィンドウ - within キーワードを使用します。

    from m in all Measurement(type="c8y_TemperatureMeasurement") within 3600.0 select avg(m.measurements	["c8y_TemperatureMeasurement"]["T"].value) as avgValue { }
    
  2. 一定量のイベントがあるウィンドウ - retain キーワードを使用します。

    from m in all Measurement(type="c8y_TemperatureMeasurement") retain 100 select avg(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as avgValue { }
    

ストリーム - 定期的に出力する

ストリームは、every 指定子を使用して評価の頻度を制御することもできます。

// will output the last measurement arrived every 1 minute
from m in all Measurement(type="c8y_TemperatureMeasurement") within 60.0 every 60.0 select last(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as lastValue { }

// will output the first of every 20 measurements arriving
from m in all Measurement(type="c8y_TemperatureMeasurement") retain 20 every 20 select first(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as firstValue { }

// will output the average of all 20 measurements after the 20th arrived
from m in all Measurement(type="c8y_TemperatureMeasurement") retain 20 every 20 select avg(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as avgValue { }

組み込み集計関数 については、Apamaドキュメントを参照してください。

独自のイベントタイプの作成

事前定義されたイベントタイプだけでなく、独自のイベントタイプを定義することもできます。これらは、同じモジュールの他の部分をトリガーする発生するイベントのパターンを検出するのに役立ちます。

event MyEvent {
	Measurement m1;
	Measurement m2;
}

...

on Measurement() as m1 -> Measurement() as m2 {
	route MyEvent(m1, m2);
}
備考
Things Cloud は各モジュールを独自の名前空間にデプロイするため、あるモジュールのイベント定義を他のモジュールで使用することはできません。これにより、モジュール間の依存関係が防止されます。

独自のアクションの作成

通常、次の例に示すように、アクション(Java の関数とよく似ています)を使用してモニターを構築します。

指定された重大度を上げる:

action upgradeSeverity(string old) returns string {
	if old = "WARNING" { return "MINOR"; }
	if old = "MINOR"   { return "MAJOR"; }
	if old = "MAJOR"   { return "CRITICAL"; }
	return old;
}

2 つの地理座標間の距離を計算します。

action distance(float lat1, float lon1, float lat2, float lon2) returns float {
	float R := 6371000.0;
	float toRad := float.PI / 180.0;
	float lat1Rad := lat1 * toRad;
	float lat2Rad := lat2 * toRad;
	float deltaLatRad := (lat2-lat1) * toRad;
	float deltaLonRad := (lat2-lat1) * toRad;
	float a := (deltaLatRad/2.0).sin().pow(2.0) * lat1Rad.cos() * lat2Rad.cos() * (deltaLonRad/2.0).sin().pow(2.0);
	float c := 2.0 * a.sqrt().atan2((1.0-a).sqrt());
	return R * c;
}

変数

モジュール内で変数を定義できます。

string myEmailText := "Hello World";
sequence<string> supportedOperationsList := ["c8y_Restart", "c8y_Relay"];

モニタースコープ変数を定義する場合(モニター内であるが、モニターのどのアクション内にも存在しない場合)、リスナーでのイベント共同割り当ての際、as の代わりにコロン(:)を使用すると、リスナーで使用できます。次の例では、10 秒ごとに最新のイベントを記録します。

monitor MyMonitor {
	// monitor scope:
    Event e;
    action onload() {
        monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
        on all Event():e {}
        on all wait(10.0) {
            log e.toString();
        }
    }
}

リスナーは開始時に、すべてのローカル変数のコピーを取得します。したがって、次の例では、間に他のイベントがあった場合でも、10 秒の遅延後に各イベントをログに記録します。

monitor MyMonitor {
	// monitor scope:
    Event e;
    action onload() {
      monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
      on all Event():e {
            on all wait(10.0) {
                log e.toString();
            }
        }
    }
}

モニター インスタンスとコンテキストの生成

単一のモニターで複数のデバイスを処理することは可能ですが(例えば、ストリームで group bypartition by を使用したり、他の状態のデバイス ID をキーとしたディクショナリを維持したりするなど)、処理を分離すると便利なことがよくあります。異なるデバイスを、個別のモニター インスタンスに分割します。

新しいモニター インスタンスは、spawn ステートメントを使用して作成できます。これにより、モニターのモニター スコープ変数のコピーが取得され、新しいモニター インスタンスで指定されたアクションが実行されます。新しいモニターにはリスナーはコピーされません。新しいモニター インスタンスを生成するコンテキストを指定することもできます。異なるコンテキストは相互に同時に実行でき、異なるモニターを相互に分離するのにも役立ちます。コンテキストを構築するときは、コンテキストを識別するための名前と、コンテキストがパブリックかどうかを制御するためのブール値を指定します。つまり、デフォルトで Things Cloud イベントを受け取ります(デフォルト チャネルに送信されます)。

このパターンは、そのコンテキスト内の他のリスナーによって一致しないイベントを識別するために、unmatched キーワードとともによく使用されます。各モニターに個別のコンテキストを使用することにより、一致しない動作の範囲がそのモニターに限定されます。

例:

monitor PerDeviceMeasurementTracker {
	action onload() {
		spawn factory to context("PerDeviceMeasurementTracker", true);
	}
	action factory() {
		monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
		on all unmatched Measurement() as m {
			spawn perDevice(m);
		}
	}

	dictionary<string, Measurement> latestMeasurementByType; // measurements for this device

	action perDevice(Measurement m) {
		processMeasurement(m);
		on all Measurement(source = m.source) as m {
			processMeasurement(m);
		}
	}
	action processMeasurement(Measurement m) {
		latestMeasurementByType[m.type] := m;
	}
}

ベストプラクティスとガイドライン

EPL モニター

症状: イベント処理ルールが自動的に無効になります

モニターが onload またはリスナーから例外をスローし、その例外がキャッチされなかった場合、モニターは終了します。例外をキャッチするか、発生する理由を回避します。

同様に、モニターがイベントの処理を完了し、アクティブなリスナーが残っていない場合、再度トリガーすることはできず、モニター自体が自動的に削除されます。

モニターごとの過剰なメモリ使用を回避します

イベント処理ルールがリスナーをリークしないようにしてください。例えば、リクエストとレスポンスの操作を行う場合、レスポンスが処理された後、またはタイムアウトが発生してレスポンスがない場合に、リスナーがアクティブのままになっていないことを確認してください。

数値形式

Things Cloud のメジャーメントには float 型が使用されます。タイムスタンプは float(1970 年 1 月 1 日 00:00 UTC からの秒数)として保存されることに注意してください。

チャネルとコンテキストのサブスクライブ

コンテキストは、Apama 内の並列処理ユニットです。モニター インスタンスは、spawn...to 構文を使用して複数のコンテキストにデプロイできます。チャネルをサブスクライブすると、コンテキスト内のすべてのモニター インスタンスがそのサブスクリプションのイベントを受信します。したがって、異なるサブスクリプションを異なるコンテキストに配置することをお勧めします。コンテキストを使用すると、過負荷になっているアプリケーションの一部がアプリケーションの他の部分に影響を与えるのを防ぐことができます。

コンテキストはわかりやすい名前で作成され、コンテキスト オブジェクトの個々のインスタンスは、たとえ同じ名前であっても、異なるコンテキストに対応します。

例:

action onload() {
   context subContext := context("Worker");
   spawn worker() to subContext;
}
action worker() {
   monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
   on all Measurement() as m {
      ...
   }
}  

Things Cloud におけるApamaの制限事項

Things Cloud 環境内で Apama を使用する場合、Apama をスタンドアロンで使用する場合に利用できる機能に必然的にいくつかの制限がかかります。

Things Cloud 内の Apama にアセットをデプロイするにはさまざまな方法があり、制限はそれらのメカニズムによって異なります。

  • EPL アプリ: Apama アセットを完全に管理された Apama コリレーターにデプロイするための最も簡単なメカニズム。アプリケーションのデプロイ を参照してください。
  • カスタムマイクロサービス: Things Cloud の マイクロサービス SDK を使用して、より複雑な Apama プロジェクトを構築できます。

あらゆる Things Cloud 環境内にデプロイされる Apama ソリューションを設計する場合は、次の点を考慮してください。

EPL アプリまたはカスタムマイクロサービスを使用する場合の Apama の一般的な制限事項

  • スケーラビリティのために、コリレーターはホスト間を移動する可能性があるため、永続的なファイルシステムにはアクセスできません。すべてのマイクロサービス(プラットフォームによって提供またはカスタム)は、ステートレスでなければならないという標準の Things Cloud 制約があります。マイクロサービス SDK を参照してください。
    この影響を受ける Apama 機能は、次の通りです。

    • コリレーターの持続性
    • MemoryStore の永続性

  • 外部システムまたはプロセスへの非 HTTP/REST 接続は、ほとんど実用的ではありません。ただし、サービスがインターネット経由で利用可能な場合、それを使用できます(例えば、Apama 内の HTTP クライアントは、公的にアクセス可能な HTTP サーバーに接続できます)。
    この影響を受ける Apama 機能は、次の通りです。

    • Apama データベース コネクタ (ADBC)
    • コリレーターに統合された Java Message Service(JMS)のサポート
    • 分散メモリストア
    • コリレーター間の接続

  • セキュリティとユーザーアクセス制御の実装のため、Things Cloud はコリレーターポートを外部プロセスで使用できるようにしません。マイクロサービス SDK を参照してください。
    次の機能はコリレーターポートへのアクセスを必要とするため、このアクセス制御と互換性がありません。

    • engine_connect、engine_management、engine_send、engine_receive などのコマンドラインツール
    • エンジン管理 API、イベントサービス API、シナリオサービス API
    • IAF アウトプロセスで実行されているアダプターへの接続
    • ダッシュボード(Apama に同梱)
    • Apama Plugin for Eclipse からのデバッグ。代わりに、ローカルコリレーターで実行されているアプリをデバッグする
    • コリレーター REST インターフェース

  • 起動時のアプリケーションのメモリ使用量とアプリケーションの起動時間の両方を削減するには、自動的にアンロードするモニターを挿入する前に、および時間のかかるクエリを実行する前に、アプリケーションが完全に初期化されていることを確認します。

EPL アプリを使用する場合の Apama 固有の制限事項

  • 使いやすさを考慮して、コリレーターの起動は Things Cloud によって制御されます。したがって、構成ファイルまたはコマンドライン オプションの変更が必要な機能にはアクセスできません。
    この影響を受ける Apama 機能は、次の通りです。

    • 持続性
    • 接続プラグイン

  • セキュリティのため、コリレーターが使用するファイルシステムにはアクセスできません。
    この影響を受ける Apama 機能は、次の通りです。

    • 入力ログへのアクセス
    • カスタム プラグインの使用
    • 強化のためのファイルシステム アセットの使用

  • 簡単にするために、独立した EPL インジェクションを行うことのみが可能です。各モニターは独立して管理されるため、異なるモニター間に依存関係を作成することはできません。
    この影響を受ける Apama 機能は、次の通りです。

    • *.monファイルは package ステートメントを含んではいけません(含むことはエラーです)
    • 別々の *.mon ファイル間でイベント定義を共有することはできません
    • Apama クエリは使用できません
    • Apama Plugin for Eclipseを使用したアプリケーションの開発 にリストされているバンドルのみを使用できます

これらの制限はすべて Things Cloud 内で、EPL アプリがスムーズかつ安全に動作することを保証するために実装されています。

例題

メジャーメントの 1 時間ごとの平均を計算する

入力データが、次のようになっていると仮定します。

{
  "c8y_TemperatureMeasurement": {"T": {"value": ..., "unit": "C"}},
  "time": "...",
  "source": {"id":"..."},
  "type": "c8y_TemperatureMeasurement"
}

平均値 (平均) を作成するには、モジュール内に次の部分が必要です。

  • デバイス(ソース)ごとにグループ化された 1 時間以上の時間枠

  • 1 時間ごとの平均計算、ソースおよび単位を返す select(ウィンドウの内容に対して集計を使用する必要があるため、最後の単位を選択します。すべてのメジャーメントが同じ単位であると仮定します)。これらを保持する AverageByDevice イベント定義に注目してください。

  • すべて新しいメジャーメントとして作成します

例:

using com.apama.aggregates.avg;
using com.apama.aggregates.last;
using com.apama.cumulocity.Measurement;

monitor HourlyAvgMeasurementDeviceContext {

  event AverageByDevice {
    string source;
    float avgValue;
    string unit;
  }

  action onload() {
    // Measurement.SUBSCRIBE_CHANNEL にサブスクライブしてすべての測定値を受信
    monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);

    from m in all Measurement(type="c8y_TemperatureMeasurement") within (3600.0)
      group by m.source select
        AverageByDevice(m.source,
          avg(m.measurements["c8y_TemperatureMeasurement"]["T"].value),
          last(m.measurements["c8y_TemperatureMeasurement"]["T"].unit)) as avgdata {
            send Measurement("", "c8y_AverageTemperatureMeasurement", avgdata.source, currentTime,
              {"c8y_AverageTemperatureMeasurement":
                {
                  "T": MeasurementValue(avgdata.avgValue, avgdata.unit, new dictionary<string,any>)
                }
              }, new dictionary<string,any>) to Measurement.SEND_CHANNEL;
          }
  }
}

ビット メジャーメントからアラームを作成する

多くの場合、デバイスはアラーム ステータスをレジスタに保持しており、アラームの意味を解釈できません。この例では、デバイスがメジャーメントにおいてレジスタ全体をバイナリ値として送信するだけであると仮定します。ルールはビットを識別し、それぞれのアラームを作成する必要があります。

アラーム テキスト、各ビットのタイプと重大度、値を検索するアクションをマッピングする 3 つのディクショナリを作成します。-1 を使用してデフォルト値を示し、<position> を位置の文字列形式に置き換えます。

dictionary<integer, string> positionToAlarmType := {
	0  : "c8y_HighTemperatureAlarm",
	1  : "c8y_ProcessingAlarm",
	2  : "c8y_DoorOpenAlarm",
	3  : "c8y_SystemFailureAlarm",
	-1 : "c8y_FaultRegister<position>Alaram"
};

dictionary<integer, string> positionToAlarmSeverity := {
	0  : "MAJOR",
	1  : "WARNING",
	2  : "MINOR",
	3  : "CRITICAL",
	-1 : "MAJOR"
};

dictionary<integer, string> positionToAlarmText := {
	0  : "The machine temperature reached a critical status",
	1  : "There was an error trying to process data",
	2  : "Door was opened",
	3  : "There was a critical system failure",
	-1 : "An undefined alarm was reported on position <position> in the binary fault register"
};

action getText(integer bitPosition, dictionary<integer, string> lookup) returns string {
	string template := lookup.getOr(bitPosition, lookup[-1]);
	return template.replaceAll("<position>", bitPosition.toString());
}

バイナリメジャーメントを分析するには、それを文字列値として解釈し、各文字をループします。getActiveBits() 関数はこれを実行し、メジャーメントが「1」であったビット位置のリストを返します。次に for ループを使用して、それを反復処理します。

action getBitPositions(string binaryAsText) returns sequence<integer> {
	sequence<integer> bitsSet := new sequence<integer>;
	integer i := 0;
	while i < binaryAsText.length() {
		string character := binaryAsText.substring(i, i+1);
		if character = "1" {
			bitsSet.append(binaryAsText.length() - i - 1);
		}
		i:=i+1;
	}
	return bitsSet;
}

action onload() {
	// Subscribe to Measurement.SUBSCRIBE_CHANNEL to receive all measurements
	monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
	on all Measurement(type = "c8y_BinaryFaultRegister") as m {
		string faultRegister := m.measurements.getOrDefault("c8y_BinaryFaultRegister").getOrDefault("errors").value.toString();
		integer bitPosition;
		for bitPosition in getBitPositions(faultRegister) {
			Alarm alarm    := new Alarm;
			alarm.type     := getText(bitPosition, positionToAlarmType);
			alarm.severity := getText(bitPosition, positionToAlarmSeverity);
			alarm.text     := getText(bitPosition, positionToAlarmText);
			alarm.source   := m.source;
			alarm.time     := m.time;
			alarm.status   := "ACTIVE";
			send alarm to Alarm.SEND_CHANNEL;
		}
	}
}

このようなメジャーメントを作成します。

{
	"c8y_BinaryFaultRegister": {"errors": {"value": 10110}},
	"time": "...",
	"source": {"id": "..."},
	"type": "c8y_BinaryFaultRegister"
}

最後のステートメントを 3 回トリガーします。

  • ビット位置 1 でのメジャーメント: c8y_ProcessingAlarm, WARNING, “There was an error trying to process data”(データの処理中にエラーが発生しました)
  • ビット位置 2 でのメジャーメント: c8y_DoorOpenAlarm, MINOR, “Door was opened”(ドアが開きました)
  • ビット位置 4 でのメジャーメント: c8y_FaultRegister4Alarm, MAJOR, “An undefined alarm was reported on position 4 in the binary fault register”(バイナリ障害レジスタの位置 4 で未定義のアラームが報告されました)

したがって、3 つのアラームが作成されます。

消費量のメジャーメント

何かの現在の充填レベルを測定し、その値を定期的に Things Cloud に送信するセンサーがあると仮定すると、追加の消費値を簡単に作成できます。2 つのメジャーメント間の絶対差を計算すると便利ですが、明確な結果が得られるのは、メジャーメントが常に同じ間隔で送信された場合のみです。そこで、絶対差を時差に換算し、1 時間当たりの消費量として計算します。

2 つのエントリを保持するストリームを使用し、最初と最後のタイムスタンプと値を選択して、デバイスの 2 つの隣接するメジャーメントと時間差を比較します。

using com.apama.aggregates.last;
using com.apama.aggregates.first;
using com.apama.aggregates.count;

monitor FillLevelMeasurements {

	event FillLevel {
		float firstValue;
		float firstTime;
		float lastValue;
		float lastTime;
		string source;
	}

	action calculateConsumption(FillLevel l) returns float {
		if(l.firstTime = l.lastTime) {
			return 0.0;
		} else {
			return ((l.lastValue - l.firstValue) * 3600.0) / (l.lastTime - l.firstTime);
		}
	}

	action onload() {
		// Subscribe to Measurement.SUBSCRIBE_CHANNEL to receive all measurements
		monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
		from m in all Measurement(type = "c8y_WaterTankFillLevel") partition by m.source retain 2 group by m.source having count() = 2
			select FillLevel(first(m.measurements["c8y_WaterTankFillLevel"]["level"].value), first(m.time),
							last(m.measurements["c8y_WaterTankFillLevel"]["level"].value), last(m.time), m.source) as fill {

			Measurement m := new Measurement;
			m.type := "c8y_HourlyWaterConsumption";
			m.time := currentTime;
			m.source := fill.source;
			MeasurementValue mv := new MeasurementValue;
			mv.value := calculateConsumption(fill);
			mv.unit := "l/h";
			m.measurements[m.type] := {"consumption":mv};
			send m to Measurement.SEND_CHANNEL;
		}
	}
}

その他のサンプル アプリケーション

ストリーミング分析アプリケーションの EPL エディタには、Apama EPL の使用方法(Things Cloud オブジェクトのクエリやアラームの作成など)を示すいくつかのサンプル アプリケーションが用意されています。これらのサンプルを使用して独自のアプリケーションを構築できます。

ケーススタディ: 円形ジオフェンスアラーム

概要

このセクションでは、より複雑なルールを作成する方法の詳細な例を示します。このガイドの他のセクションで、前に説明した機能の複数を使用します。

Apama EPLを使い始めたばかりの場合は、例題 を参照してください。

前提条件

目標

位置イベントを継続的に送信している追跡デバイスがジオフェンスの外に移動した場合に、自動的にアラームを生成するようにしたいと考えています。このジオフェンスは円形になり、デバイスごとに個別に構成できる必要があります。アラームは、デバイスがジオフェンスの外に移動した瞬間に作成されます。屋外に移動している間は、最初のアラームがアクティブのままであるため、新しいアラームは作成されません。デバイスがジオフェンス内に戻るとすぐに、アラームはクリアされます。

Things Cloud データモデル

位置イベントの構造(必要な部分):

{
  "id": "...",
  "source": {"id": "..."},
  "text": "...",
  "time": "...",
  "type": "...",
  "c8y_Position": {"alt": ..., "lng": ..., "lat": ...}
}

ジオフェンス構成をデバイスに保存します(半径はメートル単位で構成されます)。

{
  "c8y_Geofence": {"lat": ..., "lng": ..., "radius": ...}
}

さらに、構成を完全に削除せずに、各デバイスのジオフェンス アラームを有効/無効にしたいと考えています。これを行うには、デバイスの c8y_SupportedOperations に「c8y_Geofence」を追加または削除します。

{
  "c8y_SupportedOperations": [..., "c8y_Geofence", ...]
}

計算

現在の位置と中心の間の距離がジオフェンスの構成された半径よりも大きい場合、デバイスはジオフェンスの外側になります。必要なのは、2 つの 地理座標間の差を計算できる関数です。

action distance(float lat1, float lon1, float lat2, float lon2) returns float {
	float R := 6371000.0;
	float toRad := float.PI / 180.0;
	float lat1Rad := lat1 * toRad;
	float lat2Rad := lat2 * toRad;
	float deltaLatRad := (lat2-lat1) * toRad;
	float deltaLonRad := (lat2-lat1) * toRad;
	float a := (deltaLatRad/2.0).sin().pow(2.0) * lat1Rad.cos() * lat2Rad.cos() * (deltaLonRad/2.0).sin().pow(2.0);
	float c := 2.0 * a.sqrt().atan2((1.0-a).sqrt());
	return R * c;
}

上記のアクションは、距離をメートル単位で返します。

ステップ 1: 入力のフィルタリング

このモジュールの主な入力はイベントです。一致しないイベントをできるだけ早く破棄するために、リスナーの最初のチェックとしてこれを実行します。

monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
on all Event() as e {
	if e.params.hasKey("c8y_Position") {
		// we have an event
	}
}

ステップ 2: 必要なデータの収集

次のステップでは、計算のためにジオフェンスの構成が必要になり、それを取得します。

monitor.subscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
...
integer reqId := integer.getUnique();
send FindManagedObject(reqId, e.source, new dictionary<string,string>) to FindManagedObject.SEND_CHANNEL;
on FindManagedObjectResponse(reqId = reqId) as resp
   and not FindManagedObjectResponseAck(reqId) {
	  ManagedObject dev := resp.managedObject;
   }

ステップ 3: デバイスが c8y_Geofence をサポートしているか確認

デバイスが利用可能になったので、デバイスにジオフェンスが構成されているかどうか、およびアクティブ化されているかどうか(supportedOperations に「c8y_Geofence」が含まれている)を確認します。c8y_SupportedOperations 配列をチェックするには、indexOf() 関数を使用できます。この関数はすべての要素をループし、そのエントリのインデックスを返します。値が存在しない場合、負の数を返します。設定では、デバイスにフラグメント「c8y_Geofence」が含まれているかどうかを確認するだけです。

イベントとデバイスを取得したら、イベントの c8y_Position とデバイスの c8y_Geofence からデータを抽出します。これらのオブジェクトは、params 内の dictionary<any, any> エントリにマッピングされます。params はタイプ any の値を保持するため、dictionary<any, any> にキャストする必要があります。

if(dev.params.hasKey("c8y_Geofence") and dev.supportedOperations.indexOf("c8y_Geofence") >= 0) {
	dictionary<any, any> evtPos := <dictionary<any, any> > e.params["c8y_Position"];
	float eventLat := <float> evtPos["lat"];
	float eventLng := <float> evtPos["lng"];

	dictionary<any,any> devGeofence := <dictionary<any,any> > dev.params["c8y_Geofence"];
	float centerLat := <float> devGeofence["lat"];
	float centerLng := <float> devGeofence["lng"];
	float maxDistance := <float> devGeofence["radius"];
}

ステップ 4: トリガーの作成

前述したように、現在のデバイスの位置とジオフェンスの中心の間の距離が構成されたジオフェンスの半径よりも大きい場合、デバイスはフェンスの外側にあります。アラームをトリガーするには、2 つのイベントが必要です。これにより、これら 2 つのイベント内でデバイスがジオフェンスに入ったか、ジオフェンスから出たかを確認できるようになります。

最初のステップでは、前述の関数を使用して距離を計算します。

float d := distance(centerLat, centerLng, eventLat, eventLng);

次に、これをイベントとして再ルーティングします。

event LocationEventWithDistance {
	string source;
	float distance;
	Event e;
	float maxDistance;
}

...

route LocationEventWithDistance(e.source, d, e, maxDistance);

ソースをイベント内に配置して、リスナー内で簡単に照合できるようにします。

次に、イベント LocationEventWithDistance によってトリガーされるリスナーを設定し、同じソースの次の LocationEventWithDistance を監視します。

on all LocationEventWithDistance() as firstPos {
	on LocationEventWithDistance(source = firstPos.source) as secondPos {
		// now have two events with distances
	}
}

この LocationEventWithDistance イベントのペアには、アラームを作成する必要があるかどうかをチェックするためのすべてのデータが保持されます。最初のイベントと同じソースのものになるように、secondPos イベントをフィルタリングしていることに注意してください。イベントを受信したすべてのデバイスに、アクティブなリスナーが存在します。

ステップ 5: アラームの作成

アラームを作成するには、最初のイベントの距離が半径より小さく、2 番目のイベントの距離が半径より大きい 2 つのイベントが必要です。これは、デバイスがジオフェンスから出たばかりであることを意味します。

if firstPos.distance <= firstPos.maxDistance and
	secondPos.distance > secondPos.maxDistance {
	send Alarm("", "c8y_GeofenceAlarm", firstPos.source, currentTime,
					"Device moved out of circular geofence", "ACTIVE",
					"MAJOR", 1, new dictionary<string,any>) to Alarm.SEND_CHANNEL;
}

ステップ 6: アラームのクリア

アラームをクリアするには、下部で条件を切り替え、さらに現在アクティブなアラームを取得してその ID を取得する必要があります。この時点では、既存のアラームがあるかどうかを気にする必要はありません。何もない場合、リスナーは and not FindAlarmResponseAck をトリガーし、リスナーを終了します。

monitor.subscribe(FindAlarmResponse.SUBSCRIBE_CHANNEL);
...
if firstPos.distance > firstPos.maxDistance and
	secondPos.distance <= secondPos.maxDistance {
	integer reqId:= integer.getUnique();
	send FindAlarm(reqId, {"source": firstPos.source,
		"status": "ACTIVE", "type": "c8y_GeofenceAlarm"}) to FindAlarm.SEND_CHANNEL;
	on FindAlarmResponse(reqId=reqId) as alarmResponse
	   and not FindAlarmResponseAck(reqId=reqId) {
		send Alarm(alarmResponse.id, "c8y_GeofenceAlarm",
					firstPos.source, currentTime,  "Device moved back into circular geofence",
					"CLEARED", alarmResponse.alarm.severity, 1, new dictionary<string, any>) to Alarm.SEND_CHANNEL;
	}
}

すべてを統合する

すべてのパーツを 1 つのモジュールに結合できるようになりました。リスナーの順序は関係ありません。

using com.apama.cumulocity.ManagedObject;
using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.Event;
using com.apama.cumulocity.Alarm;
using com.apama.cumulocity.FindManagedObject;
using com.apama.cumulocity.FindManagedObjectResponse;
using com.apama.cumulocity.FindManagedObjectResponseAck;
using com.apama.cumulocity.FindAlarm;
using com.apama.cumulocity.FindAlarmResponse;
using com.apama.cumulocity.FindAlarmResponseAck;

monitor MonitorDevicesForCircularGeofence {

	event LocationEventWithDistance {
		string source;
		float distance;
		Event e;
		float maxDistance;
	}

	action onload {
		monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
		monitor.subscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
		monitor.subscribe(FindAlarmResponse.SUBSCRIBE_CHANNEL);
		on all Event() as e {
			if e.params.hasKey("c8y_Position") {
				// イベントがあります
				integer reqId := integer.getUnique();
				send FindManagedObject(reqId, e.source, new dictionary<string,string>) to FindManagedObject.SEND_CHANNEL;
				on FindManagedObjectResponse(reqId = reqId) as resp
				and not FindManagedObjectResponseAck(reqId) {
				ManagedObject dev := resp.managedObject;

				if(dev.params.hasKey("c8y_Geofence") and dev.supportedOperations.indexOf("c8y_Geofence") >= 0) {

						dictionary<any, any> evtPos := <dictionary<any, any> > e.params["c8y_Position"];
						float eventLat := <float> evtPos["lat"];
						float eventLng := <float> evtPos["lng"];

						dictionary<any,any> devGeofence := <dictionary<any,any> > dev.params["c8y_Geofence"];
						float centerLat := <float> devGeofence["lat"];
						float centerLng := <float> devGeofence["lng"];
						float maxDistance := <float> devGeofence["radius"];

						float d := distance(centerLat, centerLng, eventLat, eventLng);

						route LocationEventWithDistance(e.source, d, e, maxDistance);
					}
				}
			}
		}

		on all LocationEventWithDistance() as firstPos {
			on LocationEventWithDistance(source = firstPos.source) as secondPos {
				// now have two events with distances
				if firstPos.distance <= firstPos.maxDistance and
					secondPos.distance > secondPos.maxDistance {
					send Alarm("", "c8y_GeofenceAlarm", firstPos.source, currentTime,
							"Device moved out of circular geofence", "ACTIVE",
							"MAJOR", 1, new dictionary<string,any>) to Alarm.SEND_CHANNEL;
				}

				if firstPos.distance > firstPos.maxDistance and
					secondPos.distance <= secondPos.maxDistance {
					integer reqId:= integer.getUnique();
					send FindAlarm(reqId, {"source": firstPos.source,
						"status": "ACTIVE", "type": "c8y_GeofenceAlarm"}) to FindAlarm.SEND_CHANNEL;
					on FindAlarmResponse(reqId=reqId) as alarmResponse
					and not FindAlarmResponseAck(reqId=reqId) {
						send Alarm(alarmResponse.id, "c8y_GeofenceAlarm",
								firstPos.source, currentTime, "Device moved back into circular geofence",
								"CLEARED", alarmResponse.alarm.severity, 1, new dictionary<string, any>) to Alarm.SEND_CHANNEL;
					}
				}
			}
		}
	}

	action distance(float lat1, float lon1, float lat2, float lon2) returns float {
		float R := 6371000.0;
		float toRad := float.PI / 180.0;
		float lat1Rad := lat1 * toRad;
		float lat2Rad := lat2 * toRad;
		float deltaLatRad := (lat2-lat1) * toRad;
		float deltaLonRad := (lat2-lat1) * toRad;
		float a := (deltaLatRad/2.0).sin().pow(2.0) * lat1Rad.cos() * lat2Rad.cos() * (deltaLonRad/2.0).sin().pow(2.0);
		float c := 2.0 * a.sqrt().atan2((1.0-a).sqrt());
		return R * c;
	}
}

Apama を他のマイクロサービスに接続

概要

Apama を使用したストリーミング分析アプリケーションは、他のマイクロサービスで実行されているアプリケーションを利用できます。このセクションでは、Apama-ctrl マイクロサービスの /health エンドポイントを使用しますが、手順は Things Cloud 内で実行されている他のマイクロサービスへの接続にも適用されます。このセクションでは、Apama EPL 内から Things Cloud プラットフォームへの接続を作成し、他のマイクロサービスを直接呼び出す方法を説明します。次に、リクエストを作成し、結果をデコードする方法を示します。

ストリーミング分析アプリケーションの一部である EPL エディタを使用して EPL アプリを開発していることを前提とし、マイクロサービスへのリクエストの例を示します。このガイドの手順は、Apama アプリケーションを作成する他の方法にも適用でき、あらゆるマイクロサービスとのやり取りに使用できます。

CumulocityRequestInterface APIを利用します。このAPIに関する技術情報については、Apamaドキュメントのマイクロサービスの呼び出し を参照してください。

EPL アプリの作成

アプリケーション スイッチャーでストリーミング分析アイコン をクリックします。表示されるホーム画面で、EPL アプリ ページに移動し、新しい EPL アプリ をクリックします。別のマイクロサービスと連携するアプリを作成するための EPL エディタ ウィンドウが表示されます。

Things Cloud プラットフォームへの接続

これらのリクエストをサポートするために、Things Cloud プラットフォームに自動的に接続し、他のマイクロサービスを呼び出すために使用できるリクエストを作成するアクションを含むヘルパーイベントを提供しています。このヘルパーイベントは CumulocityRequestInterface と呼ばれ、com.apama.cumulocity パッケージ内にあります。このヘルパーイベントは、Things Cloud に接続してイベントのインスタンスを返す静的アクションを提供します。マイクロサービス内、Things Cloud プラットフォーム自体、またはリモート コリレータから自動的に接続できます。そのインスタンスには、特定のマイクロサービスを呼び出すリクエストを作成するアクションが含まれています。

独自のコードから接続を作成するには、connectToCumulocity メソッドを呼び出して結果を保存するだけです。

CumulocityRequestInterface cumulocity := CumulocityRequestInterface.connectToCumulocity();

これにより、マイクロサービスに提供された資格情報と接続詳細、または外部 Apama インスタンスから接続する場合は Things Cloud トランスポートの設定を使用して、自動的に接続が作成されます。

マイクロサービス リクエストの作成

CumulocityRequestInterface インスタンスには、リクエストを作成するためのアクションがあります。

/**
* Allows creation of a request on a transport that
* has been configured for a Cumulocity connection.
*
* @param method The type of HTTP request, for example "GET".
* @param path A specific path to be appended to the request.
* @param payload A dictionary of elements to be included in the request.
*/
action createRequest(string method, string path, any payload) returns Request

このアクションは、使用する HTTP メソッド(通常は GET、PUT、POST)、Things Cloud サービス プレフィックスを含むパス(通常は /service/serviceName/path/on/service のような形式)、およびペイロードを受け取ります。ペイロードは、マイクロサービスに送信する前に JSON ドキュメントに変換されます。このアクションは HTTP クライアントインターフェースの一部である Request オブジェクトを返します。HTTP クライアントインターフェースのドキュメントは、EPL の API リファレンス(ApamaDoc) に記載されています。

リクエストはコールバック アクションを引数として実行され、リクエストが完了するとレスポンスが引数として渡されます。リクエストにオプション、クエリパラメータ、またはヘッダーを設定する必要がある場合、呼び出し前に Request オブジェクトに設定できます。

例:

action responseCallback(Response resp) {
    string objectId := resp.payload.getString("id");
    ...
}
...
Request req := cumulocity.createRequest("GET", "/service/otherService/data", any());
req.setQueryParameter("type", "object");
req.execute(responseCallback);

レスポンスも JSON からデコードされ、レスポンスペイロードは HTTP クライアント トランスポートのドキュメントにある Response イベントからリンクされている AnyExtractor パターンを使用します。上記の例は、REST リクエスト GET http://cumulocity/service/otherService/data?type=object と同等になります。

マイクロサービス エンドポイントへのリクエスト例

以下は、別のマイクロサービスにクエリを実行する方法を示す非常にシンプルなアプリケーションです。例として、Apama-ctrl マイクロサービスの /health エンドポイントを使用しています。

まず、EPL から始めます。これは、Things Cloud に接続し、リクエストを送信するアクションを呼び出します。

using com.apama.cumulocity.CumulocityRequestInterface;
using com.softwareag.connectivity.httpclient.Request;
using com.softwareag.connectivity.httpclient.Response;

monitor CallAnotherMicroservice {

	CumulocityRequestInterface requestIface;

	action onload() {
		requestIface := CumulocityRequestInterface.connectToCumulocity();
		sendHealthRequest();
	}

リクエストを送信する

最初に、リクエスト タイプ、リクエストパス、ペイロード any() を使用して Request を作成します。この例では、ペイロードに何も入れる必要がないためです。

次に、execute を使用してリクエストを送信し、レスポンスで呼び出されるアクションを指定します。

action sendHealthRequest()
{
	Request healthRequest:=
		requestIface.createRequest("GET", "/service/cep/health", any());
	healthRequest.execute(responseHandler);
}

この例では、コンテキストパスが /cep である Apama-ctrl マイクロサービスを使用します。これを別のマイクロサービス用に修正するには、/cep を、マイクロサービスのマニフェストで定義されているコンテキストパスに置き換えてください。 この例では、/health エンドポイントがリクエストパスを完成させますが、マイクロサービスの有効なエンドポイントであればどれでも置き換えることができます。

レスポンスを受信する

リクエスト送信時に使用した定義済みのアクションを次に示します。このアクションは、送信されたリクエストへのレスポンスとして呼び出され、Response オブジェクトで提供されます。

この例では、単にステータスコードとレスポンス本文のみをログに記録します。

action responseHandler(Response healthResponse)
{
	integer statusCode := healthResponse.statusCode;
	string payload := healthResponse.payload.data.toString();
	log "Health response status code = " + statusCode.toString() +
		", response body = " + payload at INFO;
}

他のマイクロサービス

このセクションでは、Apama-ctrl マイクロサービスとの通信について示しました。ただし、JSON ペイロードを含む標準 REST リクエストを使用している限り、Things Cloud を介して他のマイクロサービスにアクセスすることもできます。マイクロサービス名の後に、マイクロサービス内のリクエストのパスを続けて、適切な /service URL を構築するだけです。