基本機能
アプリケーションの開発
EPL アプリはモニター (*.mon) ファイルです。EPL アプリは 2 つの異なる方法で開発できます。
- Things Cloud のアプリケーション スイッチャーから入手できる ストリーミング分析アプリケーション を使用し、Things Cloud 内で EPL アプリを開発できます
- Apama をローカル マシンにインストールし、Software AG Designer、つまり別の環境で EPL アプリを (モニターファイルとして) 開発することができます
コンセプトガイドの Apama イベント処理言語 (EPL) の使用 もご覧ください。
ストリーミング分析アプリケーションを使用したアプリケーションの開発
ストリーミング分析アプリケーションの EPL アプリ ページには、新規または既存の EPL アプリ (*.mon ファイル) を対話的に編集したり、EPL アプリをインポートしてアクティブ化 (デプロイ) したりするためのインターフェースが提供されます。
EPL アプリ ページの使用を希望するテナント上のユーザーは、CEP 管理者である必要があります。ユーザーガイドの管理 > 権限の管理をご覧ください。
ステップ 1 - ストリーミング分析アプリケーションを呼び出す
アプリケーション スイッチャーを開き、ストリーミング分析 アプリケーションのアイコンをクリックします。次に、EPL アプリ ページに移動します。
EPL アプリ ページに移動すると、最初に EPL アプリ マネージャーが表示され、既存の EPL アプリがリストされます。各アプリケーションはカードとして表示されます。ここから新しい EPL アプリを追加したり、既存の EPL アプリを管理したりできます。
アプリケーションに対して表示される各カードの上部には、アプリケーションを編集、エクスポート、または削除できるアクション メニューがあります。
このページからは、次のことができます。
-
既存の EPL アプリを編集します。アクションメニューから 編集 コマンドを使用するか、アプリケーションに表示されているカードをクリックします。
-
新しい EPL アプリを作成します。以降をご覧ください。
-
EPL アプリをインポートします。Things Cloud の外部でアプリケーションを開発する場合 (たとえば、Software AG Designer を使用する) は、上部のメニュー バーで EPLのインポート をクリックしてアップロードします。 Apama モニター (*.mon) ファイルをアプリケーションとして ストリーミング分析アプリケーションに追加します。
-
EPL アプリをエクスポートします。アクション メニューから エクスポート コマンドを使用して、アプリケーションを *.mon ファイルとしてダウンロードします。
-
既存の EPL アプリを展開します。アプリケーションに対して表示されているカードで、モードを 非アクティブ から アクティブ に変更します。詳細については、「アプリケーションのデプロイ」をご覧ください。
アプリケーションをアクティブ化すると、構文エラーがあればすぐに報告されます。エラー状態がカードに表示されるので、アプリケーションが良好な状態にあることを確認できます。エラーをクリックすると、問題の内容に関する情報が表示されます。構文エラーがある場合、アプリケーションをアクティブ化することはできません。エラーは、修正されてアプリケーションが再度アクティブ化されるまでカードに表示されます。
-
すべての EPL アプリをリロードします。上部のメニュー バーで 再読み込み をクリックして表示を更新すると、ページが読み込まれてから他のユーザーが行った変更 (その間に発生したエラーも含む) が表示されます。
ステップ 2 - EPL アプリを作成する
上部のメニュー バーで 新しい EPL アプリ をクリックします。アプリケーションに一意の名前を付けます。新しいアプリケーション用に作成されるカードに表示される説明を入力することもできます。
次に EPL エディタを示します。新しいアプリケーションの EPL コードには、Things Cloud を操作するために必要な典型的な基本的なイベント定義とユーティリティがすでに含まれています。アプリケーションの必要に応じてそれらを調整できます。詳細については、ドキュメントとサンプルをご覧ください。
開始しやすいように、いくつかのサンプルが用意されています。これらを表示するには、エディターの右側に表示される サンプル をクリックします。サンプルをクリックすると、その内容のプレビューが表示されます。標準のキーの組み合わせ Ctrl+C および Ctrl+V を使用して、サンプル コードの一部を選択し、それを独自のコードにコピーできます。コマンド ボタンを使用して、コード全体をクリップボードにコピーして独自のコードの適切な位置に挿入したり、既存のコードをすべてサンプル コードに置き換えたりすることもできます。
上部のメニュー バーのボタンを使用すると、現在のセッションでの最後の変更を元に戻したりやり直したり、変更を保存したりできます。
EPL エディターでモードを 非アクティブ から アクティブ (またはその逆) に変更することもできます。繰り返しますが、EPL コードにエラーがある場合、アプリケーションをアクティブ化することはできません。コード内でエラーが強調表示されます。
上部のメニュー バーの X をクリックして EPL エディターを終了し、EPL アプリのリストに戻ります。
ステップ 3 - EPL アプリをテストする
アプリケーションがアクティブ化されると、実行結果が表示されます。これには、メジャーメントの送信、データの受信、アラームの作成、Apama-ctrl マイクロサービスへのログインが含まれる場合があります。Apama-ctrl マイクロサービスのログ ファイルを確認する方法については、ユーザー ガイド の 管理 > マイクロサービスの管理と監視 をご覧ください。
アプリケーションのデプロイ もご覧ください。
Software AG Designer を使用したアプリケーションの開発
Software AG Designer は完全な開発環境を提供し、複雑な EPL アプリケーションがある場合に最適なツールです。EPL アプリ (モニターファイル) の準備ができたら、それを Things Cloud にインポートする必要があります。
ステップ 1 - Apama をインストールする
Apama のライセンスを取得している場合は、Software AG Installer を使用してインストールします。
機能が制限され、いくつかの制限が設けられたフリーミアム バージョンの Apama を使用することもできます。これを使用する場合は、https://www.apamacommunity.com/ から Apama Community Edition をダウンロードしてインストールします。
ライセンス版とフリーミアム版の両方のバージョンに Software AG Designer が含まれています。
ステップ 2 - プロジェクトの作成
インストールしたら、Software AG Designer で Apama プロジェクトを作成し、Things Cloud 接続を有効にします。Apama プロジェクトの作成方法については、Apama ドキュメントの Apama プロジェクトの作成 をご覧ください。
ステップ 3 - Apama バンドルをプロジェクトに追加する
新しく作成した Apama プロジェクトに次の Apama バンドルを追加します。これらは、アプリケーションをアクティブ化するために Things Cloud で必要となります。プロジェクトにバンドルを追加する方法については、Apama ドキュメンテーションの プロジェクトへのバンドルの追加 をご覧ください。
- Things IoT > Things のイベント定義
Things Cloud とのデータの送受信に必要なイベント API を提供します。 - Things IoT > Things for Things
Things Cloud から受信したデータを操作するためのヘルパー ユーティリティ関数を提供します。 - 任意のエクストラクター
「any」型から値を抽出するためのサポートを提供します。 - 時刻の形式
Time Format プラグインのすべてのメソッドにアクセスするために必要です。書式設定と解析時間に役立ちます。 - HTTP クライアントの汎用イベント
HTTP クライアント接続プラグインによって使用される事前定義された汎用イベントを公開します。 - ApplicationInitialized 時に自動
これにより、起動時にすべての接続プラグインがすぐに開始されます。 - HTTP クライアント > 一般的なリクエスト/レスポンス イベント定義を含む JSON
EPL アプリが HTTP 呼び出しを行うことを許可します。 - Things IoT > Things Cloud
Things Cloud クライアントを EPL アプリに公開します。
上記のバンドルは EPL アプリで許可される唯一のバンドルであるため、他のバンドルを追加しないように注意してください。追加しないと、Things Cloud でアクティブ化されたときにアプリケーションが機能しなくなる可能性があります。
ステップ 4 - モニターファイルの作成
新しい Apama モニターファイルを作成するには、Apama ドキュメントの EPL アプリケーション用の新しいモニターファイルの作成 をご覧ください。
新しく作成したモニターファイルを EPL アプリとして Things Cloud にインポートし、そこでアクティブ化する前に、モニターファイルが Software AG Designer 内から期待どおりに動作するかどうかをテストすることをお勧めします。
詳細については、ApamaドキュメントのThings IoT Transport Connectivity Plug-inをご覧ください。
ステップ 5 - モニターファイルを実行してテストする
プロジェクトをローカルで実行する場合は、プロジェクト構成で Things Cloud の認証情報を指定する必要があります。Things Cloud クライアントの CumulocityIoT.properties ファイルで資格情報を構成します。
例:
CUMULOCITY_USERNAME=user@example.com
CUMULOCITY_SERVER_URL=http://exampleTenant.je1.thingscloud.ntt.com
CUMULOCITY_PASSWORD=examplePassword
CUMULOCITY_APPKEY=apamaAppKey
CUMULOCITY_APPKEY
の値を取得するには、Things Cloud で アプリケーションを作成 する必要があります。上記の説明は、URL がテナントを識別するテナントに接続していることを前提としていることに注意してください。これが当てはまらない場合 (たとえば、IP アドレスで接続している場合)、CumulocityIoT.properties ファイルでこれを設定する必要がある場合があります。
CUMULOCITY_TENANT=my_custom_tenant
プロジェクトをマルチテナント環境でローカルに実行する必要がある場合は、マルチテナントのサポートを有効にし、使用するマルチテナント マイクロサービスの名前を指定します。 Things Cloud クライアントの CumulocityIoT.properties ファイルで次のプロパティを構成します。
# Enable multi-tenant support
CUMULOCITY_MULTI_TENANT_APPLICATION=true
# The name of the multi-tenant microservice to use.
# If a multi-tenant microservice does not already exist, either upload a multi-tenant microservice or
# create a microservice with a valid manifest. Subscribe the microservice to tenants for which you want
# to run the project.
CUMULOCITY_MULTI_TENANT_MICROSERVICE_NAME=example-multi-tenant-ms
さらに、モニターファイルがマルチテナント マイクロサービスで動作できることを確認してください。 詳細については、Apama ドキュメントの マルチテナントデプロイの操作 をご覧ください。
これで、Software AG Designer で EPL のテストに進むことができます。
EPL アプリの準備ができたら、アプリケーションのデプロイ をご覧、それを Things Cloud にデプロイする方法を確認してください。
アプリケーションのデプロイ
以下を Things Cloud にデプロイできます。
- EPL アプリ。ストリーミング分析アプリケーションを使用して、単一の *.mon ファイルを開発またはインポート できます。これは、EPL アプリをデプロイするための最も単純なメカニズムです。
- Apama アプリケーション。複雑な Apama アプリケーション (つまり、Software AG Designer で開発された Apama プロジェクト) を Things Cloud にアップロードし、カスタム マイクロサービスとしてデプロイ Things Cloud マイクロサービス SDK を使用します。
EPL アプリを ストリーミング分析アプリケーションを使用して単一の *.mon ファイルとしてデプロイする
EPL アプリ (つまり、*.mon ファイル) が Things Cloud でアクティブ化されると、*.mon ファイルには一意のパッケージ名が割り当てられます。これにより、複数のモジュールがアクティブ化された場合の競合が防止されます。このため、*.mon ファイルに package
ステートメントを指定しないでください。アプリケーションの異なる部分間でイベントを共有する必要がある場合は、イベント定義とそれを使用するモニターを単一の *.mon ファイルに記述します。
EPL アプリで使用できるユーティリティと基本イベントのセットは制限されています。執筆時点では、これらには Time Format バンドルと HTTP Client > JSON with generic request/response event definitions バンドルが含まれています。
EPL アプリがランタイム エラーを通知すると、アラームとして発生します。ランタイム エラーには、キャッチされなかった例外のほか、EPL アプリが実行する必要がある警告やエラーの明示的なログが含まれます。一般的にApamaランタイムに関連する動作や状態の問題もアラームとして発生します。
Apama ランタイムとアクティブな EPL アプリの詳細な診断については、Apama-ctrl マイクロサービスのログを確認できます。ログ ファイルの詳細については、ユーザー ガイド の 管理 > マイクロサービスの管理と監視 をご覧ください。ただし、Apama ログ ファイルを最大限に活用するには、Apama についてある程度の知識が必要です。
アプリケーションのテスト
GitHub の Apama EPL アプリ ツールを使用して、EPL アプリのアップロードをスクリプト化し、CI/CD (継続的インテグレーションおよび継続的デリバリー) ユースケースに合わせて管理できます。このツールは、PySys テスト フレームワークへの拡張機能も提供し、EPL アプリのテストを簡単に作成して自動的に実行できるようにします。
Apama EPL Apps Tools は、https://github.com/SoftwareAG/apama-eplapps-tools から入手できます
PySys の詳細については、Apama ドキュメントからアクセスできる Python の API リファレンス をご覧ください。
サポートされている REST サービス
EPL アプリは、REST (Representational State Transfer) サービスを監視するように設計されており、すべての GET、POST、PUT、および DELETE 操作をサポートします。さまざまな操作のリクエストの例を以下に示します。
これらの操作を実行するには、「CEP 管理」に対する読み取りおよび管理者権限が必要です (ユーザー ガイド の 管理 > 権限の管理 もご覧ください)。
すべての操作のリクエスト ヘッダー
各リクエストは Things Cloud に対して認証される必要があります。
名前 | 説明 |
---|---|
Accept | “application/json” (これは必須パラメーターです) |
一般的なレスポンスコード
すべてのリクエストに対して、次の一般的なエラーレスポンスコードが予想されます。
コード | 説明 |
---|---|
401 | リクエストが認可されていません。 |
403 | リクエストが禁止されています。 |
特定のリクエストから予想されるその他のレスポンスコードを以下に示します。
一般的なフィールドの説明
操作に応じて、応答では次の共通フィールドを使用できます。
フィールド | 説明 |
---|---|
contents | EPL ファイルの完全な内容 |
description | ファイルの説明 |
eplPackageName | EPL ファイルのパッケージ名。名前に特殊文字 (スペースを含む) が含まれている場合、これらの文字は有効な EPL 識別子にするためにエスケープされ、注入エラーが回避されます |
errors | ファイル内のすべてのコンパイル エラー (存在する場合) と行番号およびテキストのリスト |
ID | ファイルの一意の識別子 |
name | EPLに付けられた名前 |
state | EPL がコリレーターに挿入され、実行されているかどうか。これはactive またはinactive のいずれかになります |
warnings | ファイル内のすべてのコンパイル警告のリスト (存在する場合) と行番号およびテキスト |
GET - 利用可能なすべての EPL ファイルを取得します
エンドポイント: /service/cep/eplfiles
リクエストの例
GET /service/cep/eplfiles
レスポンス
コード | 説明 |
---|---|
200 | 操作は成功しました。以下の値の例もご覧ください。 |
400 | 不正なリクエストです。ヘッダーの内容に予期しない値が含まれています。 |
レスポンスコード 200 の値の例:
{
"eplfiles":[
{
"description":"",
"eplPackageName": "eplfiles.Ordinal1",
"errors":[
],
"id":"39615",
"name":"Ordinal1",
"state":"active",
"warnings":[
]
}
]
}
GET - 利用可能なすべての EPL ファイルをその内容とともに取得します
エンドポイント: /service/cep/eplfiles
リクエストパラメーター
名前 | 説明 |
---|---|
contents | ブール型。EPL ファイルをその内容とともにフェッチします。これはオプションのクエリパラメーターです。 |
リクエストの例
GET /service/cep/eplfiles?contents=true
レスポンス
コード | 説明 |
---|---|
200 | 正常に動作しました。以下の値の例もご覧ください。 |
レスポンスコード 200 の値の例:
{
"eplfiles":[
{
"contents":"monitor M0 { action onload() { on wait(1.0) { log \"Hello\" at INFO; }}}",
"description":"",
"eplPackageName": "eplfiles.Ordinal1",
"errors":[
],
"id":"39615",
"name":"Ordinal1",
"state":"active",
"warnings":[
]
}
]
}
GET - 識別子によって EPL ファイルを取得します
エンドポイント: /service/cep/eplfiles/{id}
リクエストパラメーター
名前 | 説明 |
---|---|
ID | フェッチする EPL ファイルの識別子。これは必須パラメーターです。 |
リクエストの例
GET /service/cep/eplfiles/{{id}}
レスポンス
コード | 説明 |
---|---|
200 | 操作は成功しました。以下の値の例もご覧ください。 |
404 | 識別子のファイルが見つかりません。このセクションの最後にある このレスポンスコードの値の例 もご覧ください。 |
レスポンスコード 200 の値の例:
{
"contents":"monitor M0 { action onload() { on wait(1.0) { log \"Hello\" at INFO; }}}",
"description":"",
"eplPackageName": "eplfiles.Ordinal1",
"errors":[
],
"id":"39615",
"name":"Ordinal1",
"state":"active",
"warnings":[
]
}
POST - 新しい EPL アプリケーションを作成する
エンドポイント: /service/cep/eplfiles
リクエストの例
POST /service/cep/eplfiles
以下はリクエスト本文の例です。
{
"name": "Ordinal1",
"contents": "monitor M1 { action onload() { on wait(1.0) { log \"Hello\" at INFO; }}}",
"state": "active",
"description": ""
}
次の点に注意してください。
name
はファイルのパッケージに使用され (したがって、EPL ファイルにはpackage
ステートメントを含めることはできません)、すべての EPL ファイル間で一意である必要があります。名前には接頭辞が付けられ、特定の文字はエスケープされます。使用される実際のパッケージ名は、便宜上eplPackageName
フィールドに返されます (これをマイクロサービス ログ ファイルで検索して、ログ ステートメントを見つけることができます)。- 安全にエスケープされた
contents
を必ず提供してください。 description
はオプションであり、空にすることもできます。
レスポンス
コード | 説明 |
---|---|
201 | 正常に作成されました / ファイルにエラーが発生して作成されました / ファイルに警告が発生して作成されました。以下の例もご覧ください。 |
405 | 入力に誤りがあります。 |
正常に作成された場合のレスポンスコード 201 の例:
{
"description":"",
"eplPackageName": "eplfiles.Ordinal1",
"errors":[
],
"id":"39615",
"name":"Ordinal1",
"state":"active",
"warnings":[
]
}
警告またはエラーが発生した場合のレスポンスコード 201 の例:
{
"description":"",
"eplPackageName": "eplfiles.Ordinal1",
"errors":[
{
"line":5,
"text":"assigning a float to an integer variable"
}
],
"id":"39651",
"name":"Ordinal1",
"state":"inactive",
"warnings":[
{
"line":10,
"text":"\"assert\" may become a reserved word in future versions of EPL"
}
]
}
PUT - 識別子によって EPL ファイルを更新します
エンドポイント: /service/cep/eplfiles/{id}
リクエストパラメーター
名前 | 説明 |
---|---|
ID | 更新する EPL ファイルの識別子。識別子はパスに含める必要があります。これは必須パラメーターです。 |
リクエストの例
PUT /service/cep/eplfiles/{id}
以下はリクエスト本文の例です。
{
"name": "Ordinal1",
"contents": "monitor M1 { action onload() { on wait(1.0) { log \"Hello\" at INFO; }}}",
"state": "active",
"description": ""
}
POST リクエストについての情報もご覧ください。
レスポンス
コード | 説明 |
---|---|
200 | 正常に更新されました。以下の値の例もご覧ください。 |
404 | 識別子のファイルが見つかりません。このセクションの最後にある このレスポンスコードの値の例 もご覧ください。 |
エラーなしで正常に更新された場合のレスポンスコード 200 の値の例:
{
"description":"",
"eplPackageName": "eplfiles.Ordinal1",
"errors":[
],
"id":"39615",
"name":"Ordinal1",
"state":"active",
"warnings":[
]
}
エラーまたは警告で更新された場合のレスポンスコード 200 の値の例:
{
"description":"",
"eplPackageName": "eplfiles.Ordinal1",
"errors":[
{
"line":5,
"text":"assigning a float to an integer variable"
}
],
"id":"39651",
"name":"Ordinal1",
"state":"inactive",
"warnings":[
{
"line":10,
"text":"\"assert\" may become a reserved word in future versions of EPL"
}
]
}
DELETE - 識別子によって EPL ファイルを削除します
エンドポイント: /service/cep/eplfiles/{id}
リクエストパラメーター
名前 | 説明 |
---|---|
ID | 削除する EPL ファイルの識別子。識別子はパスに含める必要があります。これは必須パラメーターです。 |
リクエストの例
DELETE /service/cep/eplfiles/{{id}}
レスポンス
コード | 説明 |
---|---|
200 | 正常に削除されました。 |
404 | 識別子のファイルが見つかりません。このセクションの最後にある このレスポンスコードの値の例 もご覧ください。 |
レスポンスコード 404 の値の例
レスポンスコード 404 は、特定の識別子を持つファイルが見つからなかったことを示します。
{
"error":"Not Found",
"exception":"com.apama.in_c8y.FileNotFoundException",
"message":"File with id 39613 not found",
"path":"/eplfiles/39613",
"status":404,
"timestamp":"2020-01-17T12:21:42.457+0000"
}
error
はエラーメッセージです。Exception
は、発生した例外を指定します。message
は例外メッセージの説明です。path
はリクエストされたパスです。status
はアプリケーションのステータスです。timestamp
は ISO 形式のタイムスタンプです。
イベントとチャンネル
Apama EPL では、残りの Things Cloud エコシステムとのやり取りはイベントを通じて行われます。Things Cloud データにアクセスするために、多数のイベント定義が提供されています。
定義済みのイベント型
いくつかの Things Cloud API と対話するための事前定義されたイベント型がいくつかあります。新しい測定、アラーム、またはイベントが作成されると、イベントは自動的に Apama アプリケーションに送信されます。Things Cloud バックエンドと対話するために、イベントを作成して関連するチャネルに送信できます。Things Cloud は、データベース クエリを自動的に実行するか、メールや SMS などの送信に必要な API 呼び出しを作成します。
EPL の API リファレンス (ApamaDoc) の データ モデル にて、各ストリームのイベントがどのように構造化されているかを確認してください。
イベントをチャネルに送信する
イベントの送信は、new <type>
に続いてフィールドへの代入を使用するか、すべてのフィールドを指定するコンストラクターを使用してイベントを構築することによって行われます。次に、send
ステートメントを使用してイベントを Things Cloud に送信します。send
ステートメントにはチャネルが必要です。これはイベント型の SEND_CHANNEL
定数です。
イベントをリッスンする
チャネル上のイベントを監視することで EPL をトリガーできます。monitor.subscribe("string name")
メソッドを使用してチャンネルに登録できます。これは、モニターの起動時に実行することも、時々イベントを受信する必要がある場合のみ、必要に応じて呼び出され、続いて monitor.unsubscribe("string name")
を実行することもできます。
on
ステートメントを使用してイベントを監視し、その後に監視しているイベント型、開閉括弧、および as <identifier>
を使用してイベントを保持する変数に名前を付けます。
デフォルトでは、リスナーは 1 回起動します。すべてのイベントに対して繰り返すには、イベント型の前に all
キーワードを使用します。
フィルター
フィルターを追加するには、リスナーの括弧の間に 1 つ以上のフィールドを指定します。最上位フィールドのみをフィルタリングできます。より複雑なフィルタリング、またはイベントのサブプロパティ (辞書など) でのフィルタリングには、「if」ステートメントを使用します。
標準のイベント型とチャネル
標準の Things Cloud イベントの場合、イベントを送受信するためのチャネルを含む定数があります。次に例を示します。
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
send msmnt to Measurement.SEND_CHANNEL;
次の表にリストされているイベントは、「com.apama.cumulocity」パッケージの一部です。
イベント | 送信チャンネル | 受信用チャンネル |
---|---|---|
Operation | Operation.SEND_CHANNEL | Operation.SUBSCRIBE_CHANNEL |
Measurement | Measurement.SEND_CHANNEL | Measurement.SUBSCRIBE_CHANNEL |
Event | Event.SEND_CHANNEL | Event.SUBSCRIBE_CHANNEL |
Alarm | Alarm.SEND_CHANNEL | Alarm.SUBSCRIBE_CHANNEL |
ManagedObject | ManagedObject.SEND_CHANNEL | ManagedObject.SUBSCRIBE_CHANNEL |
MeasurementFragment | MeasurementFragment.SEND_CHANNEL | MeasurementFragment.SUBSCRIBE_CHANNEL |
メジャーメントフラグメント
Measurement
および MeasurementFragment
イベントは常に公開されます。
EPL では、Measurement
イベントではなく MeasurementFragment
イベントの内容に一致するリスナーを生成できます。例えば:
on all MeasurementFragment(type="c8y_SpeedMeasurement", valueFragment = "c8y_speed", valueSeries = "speedX", value > SPEED_LIMIT) as mf {
}
メジャーメントフラグメント もご覧ください。
作成通知と更新通知の区別
Things Cloud からの Alarm
、Event
、ManagedObject
、または Operation
イベントを監視する場合、作成操作と更新操作を区別することが必要な場合があります。これらのイベント型にはそれぞれ、この目的のために isCreate()
および isUpdate()
という名前のアクションがあります。
新しいアラームを監視する例:
on all Alarm() as alarm {
if alarm.isCreate() {
log "Alarm created: " + alarm.toString() at INFO;
}
// else it's an update
}
同様に、更新されたアラームの場合のみ:
on all Alarm() as alarm {
if alarm.isUpdate() {
log "Alarm updated: " + alarm.toString() at INFO;
}
// else it's a create
}
Things Cloud からのイベントの場合、isUpdate()
または isCreate()
のいずれかが常に true を返します。どちらのアクションも、選択肢と読みやすさを考慮して提供されています。
さまざまなタイプのオブジェクトの例などの詳細については、Apama ドキュメントの Receiving update notifications をご覧ください。
isCreate()
および isUpdate()
の詳細については、API Reference for EPL (ApamaDoc) もご覧ください。
例
この例では、「com.apama.cumulocity.MeasurementFragment」 API を使用して新しいメジャーメントを監視します。受信したメジャーメントをフィルタリングして、指定された最大速度を超える速度値を見つけ、制限を超えた場合にアラームを発します。
MeasurementFragment.SUBSCRIBE_CHANNEL
チャンネルに登録します。- 測定フラグメントを監視し、
type
(c8y_SpeedMeasurement
) でフィルターします。valueFragment
の値がc8y_speed
であること、およびvaluesSeries
がspeedX
のみでフィルターされていることを確認してください。また、SPEED_LIMIT
より大きい場合はvalue
でフィルタリングします。 - すべてのフィールドを指定するコンストラクターを使用してイベントを作成します。
- イベントを正しいチャネル
Alarm.SEND_CHANNEL
に送信します。
結果の *.mon ファイルは次のようになります。
using com.apama.cumulocity.Alarm;
using com.apama.cumulocity.MeasurementFragment;
monitor TriggerAlarmForSpeedBreach {
constant float SPEED_LIMIT := 30.0;
action onload() {
monitor.subscribe(MeasurementFragment.SUBSCRIBE_CHANNEL);
// Everytime a measurement fragment with the specific details of the match criteria is triggered then we should raise an alarm
on all MeasurementFragment(type="c8y_SpeedMeasurement", valueFragment = "c8y_speed", valueSeries = "speedX", value > SPEED_LIMIT) as mf {
send Alarm("", "c8y_SpeedAlarm", mf.source, currentTime,
"Speed limit breached", "ACTIVE", "CRITICAL", 1,
new dictionary<string,any>) to Alarm.SEND_CHANNEL;
}
}
}
組み込みアクション
概要
Apama EPLでは「アクション」と呼ばれる機能を利用することができます。 すべてのモニターには少なくとも 1 つのアクション、つまり「onload」アクションがあります。 このセクションでは、すぐに使用できる組み込みのアクションについて説明します。
組み込みタイプのアクションについては、API Reference for EPL (ApamaDoc) もご覧ください。
Things Cloud データのクエリ
履歴データを操作するには、次の要求と応答のイベント ペアのいずれかを使用してリソースを検索します。
例: アラームを検索するには、適切なクエリ パラメーターを指定して com.apama.cumulocity.FindAlarm
リクエスト イベントを FindAlarm.SEND_CHANNEL
チャネルに送信します。
応答として、0 個以上の com.apama.cumulocity.FindAlarmResponse
イベントが期待できます (検索リクエストに一致するリソースの数に応じて)
FindAlarmResponse.SUBSCRIBE_CHANNEL
チャネルの com.apama.cumulocity.FindAlarmResponseAck
イベント。
管理対象オブジェクト、イベント、メジャーメント、および操作を検索するための同様の機能も提供されます。
次の表にリストされているイベントは、「com.apama.cumulocity」パッケージの一部です。
検索対象 | リクエスト/レスポンス イベント | 例 |
---|---|---|
ManagedObject | FindManagedObject FindManagedObjectResponse FindManagedObjectResponseAck |
例 |
Alarm | FindAlarm FindAlarmResponse FindAlarmResponseAck |
例 |
Event | FindEvent FindEventResponse FindEventResponseAck |
例 |
Measurement | FindMeasurement FindMeasurementResponse FindMeasurementResponseAck |
例 |
Operation | FindOperation FindOperationResponse FindOperationResponseAck |
例 |
CurrentUser | CurrentUser GetCurrentUser GetCurrentUserResponse |
例 |
TenantOption | TenantOption FindTenantOptions FindTenantOptionsResponse |
ドキュメント |
Things Cloud REST API の他の部分の呼び出し
Things Cloud REST API は、個々のイベント型ではカバーされていないいくつかの追加機能をカバーします。 REST API の他の部分を呼び出すために、Things Cloud API の任意の部分を呼び出すために使用できる汎用のリクエスト/レスポンス API が提供されています。
次の要求/応答イベントを使用できます。
- com.apama.cumulocity.GenericRequest
- com.apama.cumulocity.GenericResponse
- com.apama.cumulocity.GenericResponseComplete
詳細については Things Cloud OpenAPI仕様の RESTの実装 と Apamaドキュメントの Invoking other parts of the Cumulocity IoT REST API をご覧ください。
HTTP サービスの呼び出し
REST および JSON を使用して HTTP サービスと対話するには、次のいずれかのファクトリ メソッドを使用して com.softwareag.connectivity.httpclient.HttpTransport
インスタンスを作成します。
- HttpTransport.getOrCreate(string host, integer port) は HttpTransport を返します
- HttpTransport.getOrCreateWithConfiguration(string host, integer port,dictionary <string, string>configuration) は HttpTransport を返します (構成ディクショナリ内のキーは、接頭辞
CONFIG_
が付いた HttpTransport の定数です)
HttpTransport
オブジェクトで、create メソッドの 1 つを呼び出し、必要に応じてパスとペイロードを渡し、Request
オブジェクトを生成します。
Request
オブジェクトでは、必要に応じて Cookie、ヘッダー、またはクエリ パラメーターを設定し、execute(action<Response> callback)
でリクエストを呼び出すことができます。 コールバックのモニターにアクションの名前を指定すると、リクエストが完了した (またはタイムアウトした) ときにResponse
呼び出されます。
コールバックでは、Response
オブジェクトにstatusCode
とpayload
が指定されます。 ペイロードのフィールドには、それが提供される com.apama.util.AnyExtractor
オブジェクトを介してアクセスできます。以下の アクセス フラグメント に関する情報をご覧ください。
詳細については、EPL の API リファレンス (ApamaDoc) をご覧ください。
ユーティリティ関数
フラグメントにアクセスする
params
ディクショナリを介してほとんどのイベントのフラグメントにアクセスできます。 AnyExtractor
オブジェクトは、複数のサブフラグメントを含む任意のオブジェクトからデータを抽出して以下にアクセスできるように構築できます。
-
アクション getInteger(string path) は整数を返します
-
アクション getFloat(string path) は float を返します
-
アクション getString(string path) は文字列を返します
-
アクション getBoolean(string path) はブール値を返します
-
アクション getSequence(string path) はシーケンス<any>を返します。
-
アクション getDictionary(string path) は辞書<any, any>を返します。
JSON パスを使用してオブジェクト構造内を移動できます。 例えば:
string s := AnyExtractor(measurement.params["fragment"]).getString("sub.fragment.object");
"fragment"の例: “c8y_TemperatureMeasurement”.
"sub.fragment.object"の例: “c8y_TemperatureMeasurement.T.Unit”.
「any」値をキャストする
あるいは、キャストを使用して「any」を特定の型に変換します。
string s := <string> measurement.params["strfragment"];
オブジェクトの型が異なる場合、キャスト操作がスローされることに注意してください。
currentTime と TimeFormatter
読み取り専用変数 currentTime
を使用して、現在のサーバー時刻を取得できます。 Apama は、Unix エポック (UTC 1970 年 1 月 1 日) からの秒数を使用して時間を処理します。 TimeFormat
オブジェクトを使用すると、人間が判読できる形式に簡単に変換できます。
TimeFormat
オブジェクトは、日付と時刻の書式設定や解析に使用できます。
using com.apama.correlator.timeformat.TimeFormat;
monitor Example {
action onload {
log TimeFormat.format(currentTime, "yyyy.MM.dd 'at' HH:mm:ss") at INFO;
}
}
TimeFormat
とその関数の詳細については、Apama の TimeFormat イベント ライブラリの使用 やドキュメントと EPL の API リファレンス (ApamaDoc)をご覧ください。
inMaintenanceMode
Util.inMaintenanceMode()
関数は、デバイスが現在メンテナンスモードであるかどうかを簡単に確認する方法です。 管理対象オブジェクトをパラメーターとして受け取り、デバイスがメンテナンスモードの場合は true となるブール値を返します。
例:
using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.Event;
using com.apama.cumulocity.FindManagedObject;
using com.apama.cumulocity.FindManagedObjectResponse;
using com.apama.cumulocity.FindManagedObjectResponseAck;
using com.apama.cumulocity.Util;
monitor ExampleMonitor {
action onload() {
// Subscribe to Measurement.SUBSCRIBE_CHANNEL to receive all measurements
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
monitor.subscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
on all Measurement() as m {
integer reqId := integer.getUnique();
send FindManagedObject(reqId, m.source, new dictionary<string,string>) to FindManagedObject.SEND_CHANNEL;
on FindManagedObjectResponse(reqId = reqId, id = m.source) as d and not FindManagedObjectResponseAck(reqId = reqId) {
if not Util.inMaintenanceMode(d.managedObject) {
send Event("", "c8y_Event", m.source, currentTime, "Received measurement from active device", new dictionary<string,any>) to Event.SEND_CHANNEL;
}
}
}
}
}
プレースホルダーを置き換える
文字列を構築するには、次のように連結を使用できます。
string s:= "An event with the text " + evt.text + " has been created.";
テキストが長くなり、データから動的に設定される値が増える場合は、Util.replacePlaceholders()
関数を使用できます。
テキスト文字列では、イベントのフィールド名でプレースホルダーをマークし、それを「#{}」で囲みます。
replacePlaceholders
の 2 番目のパラメーターには、任意のイベント型を指定できます。
Utils::replacePlaceholders
は、イベントまたはイベントのパラメーターで指定されたフィールド名を検索して、テキストの代替を生成します。
タイプ #{X.Y}
のフィールド名を使用して、イベント内のネストされた構造にアクセスできます。
myMailText := Util.replacePlaceholders("The device #{source} created an event with the text #{text} at #{time}", alarm);
置換文字列が#{source.name}
のような形式である場合、source.name
は基礎となる管理対象オブジェクト/デバイスの名前です。
または #{source.c8y_hardware.notes}
ここで、 c8y_hardware
は管理対象オブジェクトのフラグメントです。
その場合、交換するには特別な処理が必要になります。
最初の置換後、プレースホルダー フィールド名を更新し、ソースの managedObject
を使用して Util::replacePlaceholders
を再度実行する必要があります。
myMailText := Util.replacePlaceholders("The device #{source} with the serial number #{source.c8y_Hardware.serialNumber} created an event with the text #{text} at #{time}. The device is located at #{source.c8y_Address.street} in #{source.c8y_Address.city}.", alarm);
myMailText := myMailText.replaceAll("#{source.", "#{");
myMailText := Util.replacePlaceholders(myMailText, managedObject);
高度な機能
カスタムフラグメント
Things Cloud API を使用すると、データを自由に構造化できます。 Apama EPL では、これは、タイプ dictionary<string, any>
のエントリを params
に追加することによって行われます。 com.apama.cumulocity
パッケージ内の各 Things Cloud イベント (Alarm
、Event
、Measurement
、または Operation
など) には params
フィールドがあります。 フラグメントまたはオプションのフィールドに変換されます。 したがって、イベントを受信するとき、コードは「params」フィールド内のエントリを検索する必要があります。 イベントを送信するときは、イベント型を定義することによって行うことも、dictionary<string, any>
タイプを使用することもできます。 イベントを受信するときの EPL タイプは dictionary<any, any>
です。 EPL は厳密に型指定されているため、フラグメントのないイベントを作成する場合は、new dictionary<string, any>
式が必要になることに注意してください。 辞書リテラルを使用してインラインでエントリを提供する場合、EPL は最初のキーと値のペアの型に基づいて型を決定します。つまり、dictionary<string, any>
の場合、<any>
キャスト演算子を使用して最初の値を any
型にキャストします。:
send Event(..., new dictionary<string,any>) to Event.SEND_CHANNEL;
send Event(..., {"fragment":<any>"value"}) to Event.SEND_CHANNEL;
MeasurementValue
タイプは、Measurement
タイプのメジャーメントに対して提供されます。 MeasurementValue
には、value
フィールドと unit
フィールド、および他のフラグメントの params
があります。
例 1:
send Measurement("", "c8y_TemperatureMeasurement", "12345", currentTime, {
"c8y_TemperatureMeasurement":{
"T1":MeasurementValue(1.0, "C", new dictionary<string,any>),
"T2":MeasurementValue(2.0, "C", new dictionary<string,any>),
"T3":MeasurementValue(3.0, "C", new dictionary<string,any>),
"T4":MeasurementValue(4.0, "C", new dictionary<string,any>),
"T5":MeasurementValue(5.0, "C", new dictionary<string,any>)
}},
new dictionary<string,any>) to Measurement.SEND_CHANNEL;
これにより、次の JSON 構造が生成されます。
{
"type": "c8y_TemperatureMeasurement",
"time": "...",
"source": {
"id": "12345"
},
"c8y_TemperatureMeasurement": {
"T1": {
"value": 1,
"unit": "C"
},
"T2": {
"value": 1,
"unit": "C"
},
"T3": {
"value": 1,
"unit": "C"
},
"T4": {
"value": 1,
"unit": "C"
},
"T5": {
"value": 1,
"unit": "C"
},
}
}
メジャーメントフラグメント
メジャーメントは、個々のメジャーメントフラグメントに分割できます。 これは、メジャーメント内に存在する各フラグメントおよび系列に対して実行できます。 メジャーメントフラグメントの詳細については、コンセプトガイドの Things Cloud のドメイン モデル をご覧ください。
メジャーメントフラグメントまたはシリーズに基づいたフィルタリングが必要な場合は、com.apama.cumulocity.MeasurementFragment
タイプのイベントを監視します。
com.apama.cumulocity.Measurement
イベントを監視してmeasurements
ディクショナリ内を調べる代わりに。
詳細については、Apama ドキュメントの メジャーメントフラグメントの使用 をご覧ください。
リスナー
受信したイベントによってステートメントをトリガーすることが唯一の可能性ではありません。 次のセクションでは、リスナーを組み合わせる他の方法について説明します。 詳細については、Apama ドキュメントの Defining Event Listeners をご覧ください。
フィルター
フィルターを使用すると、他のトリガーの組み合わせまたはシーケンスによってトリガーできます。例えば、次のようなトリガーがあるとします:
on all Event() as e { ... }
パターンにフィルターを追加することもできます。
on all Event(type = "c8y_EntranceEvent") as e { ... }
複数のイベントを監視できます。
on Event() as e and Alarm() as a { ... }
これは、イベントとアラーム イベントを受信するとトリガーされ、それぞれの最初のイベントがキャプチャされます。
シーケンスによってトリガーすることもできます。
on all (Event() as e -> Alarm() as a) { ... }
これは、「アラームに続くイベント」のペアごとにトリガーされます。 イベントを受信すると、それ以降のイベントの待機を停止し、代わりにアラームの待機を開始します。 アラームを受信すると、再びイベントの監視が開始されます。
タイマー
時間に基づいてリスナーをトリガーすることもできます。 特定の間隔でトリガーすることもできます。たとえば、5 分 (300 秒) ごとにトリガーすることもできます。
on all wait(300.0) { ... }
または、Unix の cron スケジューラと同様の機能を使用して、1 日の特定の時間にリスナーを起動させることもできます。
// timer:at(minutes, hours, daysOfMonth, month, daysOfWeek, (optional) seconds)
// minutes: 0-59
// hours: 0-23
// daysOfMonth: 1-31
// month: 1-12
// daysOfWeek: 0 (Sunday) - 6 (Saturday)
// seconds: 0-59
on all at(*, *, *, *, *) {} // trigger every minute
on all at(*/10, *, *, *, *) {} // trigger every 10 minutes
on all at(0, 1, *, *, [1,3,5]) {} // trigger at 1am every monday, wednesday and friday
on all at(0, */2, 1:7, *, *) {} // trigger every 2 hours on every day in the first week of every month
タイマーパターンを他のパターンと組み合わせることもできます。 たとえば、別のイベント後の一定時間内にイベントがあったかどうかを確認できます。
on Event() -> wait(600.0) and not Alarm() { ... }
これは、イベントが発生し、10 分 (600 秒) 以内にアラームが発生しない場合にトリガーされます。 イベントが発生した場合にリスナーを終了する「not」の使用に注意してください。
テナント オプションを使用して、on all at
タイマーに使用されるタイム ゾーンを設定できます。 テナント オプションを設定するには、microservice.runtime
カテゴリと timezone
キーを指定します。
例えば:
{
"category" : "microservice.runtime",
"key" : "timezone",
"value" : "Europe/Warsaw"
}
Apama ドキュメントの サポートされているタイム ゾーン。
ストリーム - ウィンドウ
ストリームを使用すると、イベントのウィンドウを操作できるようになります。 ストリームは、on
の代わりにfrom
キーワードを使用し、操作対象のウィンドウを定義し、集計を使用してそのウィンドウから必要な出力を選択します。 Windows は次の 2 つの方法で制限できます。
-
一定期間の Windows -
within
キーワードを使用します。from m in all Measurement(type="c8y_TemperatureMeasurement") within 3600.0 select avg(m.measurements ["c8y_TemperatureMeasurement"]["T"].value) as avgValue { }
-
一定量のイベントがあるウィンドウ -
retain
キーワードを使用します。from m in all Measurement(type="c8y_TemperatureMeasurement") retain 100 select avg(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as avgValue { }
ストリーム - 定期的に出力する
ストリームは、every
指定子を使用して評価の頻度を制御することもできます。
// will output the last measurement arrived every 1 minute
from m in all Measurement(type="c8y_TemperatureMeasurement") within 60.0 every 60.0 select last(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as lastValue { }
// will output the first of every 20 measurements arriving
from m in all Measurement(type="c8y_TemperatureMeasurement") retain 20 every 20 select first(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as firstValue { }
// will output the average of all 20 measurements after the 20th arrived
from m in all Measurement(type="c8y_TemperatureMeasurement") retain 20 every 20 select avg(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as avgValue { }
組み込み集計関数 については、Apama ドキュメントをご覧ください 。
独自のイベント型の作成
事前定義されたイベント型だけでなく、独自のイベント型を定義することもできます。 これらは、同じモジュールの他の部分をトリガーする発生するイベントのパターンを検出するのに役立ちます。
event MyEvent {
Measurement m1;
Measurement m2;
}
...
on Measurement() as m1 -> Measurement() as m2 {
route MyEvent(m1, m2);
}
独自のアクションの作成
通常、次の例に示すように、アクション (Java の関数とよく似ています) を使用してモニターを構築します。
指定された重大度を上げる:
action upgradeSeverity(string old) returns string {
if old = "WARNING" { return "MINOR"; }
if old = "MINOR" { return "MAJOR"; }
if old = "MAJOR" { return "CRITICAL"; }
return old;
}
2 つの地理座標間の距離を計算します。
action distance(float lat1, float lon1, float lat2, float lon2) returns float {
float R := 6371000.0;
float toRad := float.PI / 180.0;
float lat1Rad := lat1 * toRad;
float lat2Rad := lat2 * toRad;
float deltaLatRad := (lat2-lat1) * toRad;
float deltaLonRad := (lat2-lat1) * toRad;
float a := (deltaLatRad/2.0).sin().pow(2.0) * lat1Rad.cos() * lat2Rad.cos() * (deltaLonRad/2.0).sin().pow(2.0);
float c := 2.0 * a.sqrt().atan2((1.0-a).sqrt());
return R * c;
}
変数
モジュール内で変数を定義できます。
string myEmailText := "Hello World";
sequence<string> supportedOperationsList := ["c8y_Restart", "c8y_Relay"];
モニタースコープ変数を定義する場合(つまり、モニター内であるが、モニターのどのアクション内にも存在しない場合)、リスナーでのイベント共同割り当ての際、as
の代わりにコロン(:)を使用すると、リスナーで使用できます。 次の例では、10秒ごとに最新のイベントを記録します:
monitor MyMonitor {
// monitor scope:
Event e;
action onload() {
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
on all Event():e {}
on all wait(10.0) {
log e.toString();
}
}
}
リスナーは開始時に、すべてのローカル変数のコピーを取得します。 したがって、以下の例では、間に他のイベントがあった場合でも、10 秒の遅延後に各イベントをログに記録します。
monitor MyMonitor {
// monitor scope:
Event e;
action onload() {
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
on all Event():e {
on all wait(10.0) {
log e.toString();
}
}
}
}
モニター インスタンスとコンテキストの生成
単一のモニターで複数のデバイスを処理することは可能ですが (たとえば、ストリームでgroup by
とpartition by
を使用したり、他の状態のデバイス ID をキーとした辞書を維持したりするなど)、処理を分離すると便利なことがよくあります。 異なるデバイスを個別のモニター インスタンスに分割します。
新しいモニター インスタンスは、spawn
ステートメントを使用して作成できます。 これにより、モニターのモニター スコープ変数のコピーが取得され、新しいモニター インスタンスで指定されたアクションが実行されます。 新しいモニターにはリスナーはコピーされません。 新しいモニター インスタンスを生成するコンテキストを指定することもできます。異なるコンテキストは相互に同時に実行でき、異なるモニターを相互に分離するのにも役立ちます。 コンテキストを構築するときは、コンテキストを識別するための名前と、コンテキストがパブリックかどうかを制御するためのブール値を指定します。つまり、デフォルトで Things Cloud イベントを受け取ります (デフォルト チャネルに送信されます)。 )。
このパターンは、そのコンテキスト内の他のリスナーによって一致しないイベントを識別するために、unmatched キーワードとともによく使用されます。 各モニターに個別のコンテキストを使用することにより、一致しない動作の範囲がそのモニターに限定されます。 例えば:
monitor PerDeviceMeasurementTracker {
action onload() {
spawn factory to context("PerDeviceMeasurementTracker", true);
}
action factory() {
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
on all unmatched Measurement() as m {
spawn perDevice(m);
}
}
dictionary<string, Measurement> latestMeasurementByType; // measurements for this device
action perDevice(Measurement m) {
processMeasurement(m);
on all Measurement(source = m.source) as m {
processMeasurement(m);
}
}
action processMeasurement(Measurement m) {
latestMeasurementByType[m.type] := m;
}
}
ベストプラクティスとガイドライン
EPL モニター
症状: イベント処理ルールが自動的に無効になります
モニターがonload
またはリスナーから例外をスローし、その例外がキャッチされなかった場合、モニターは終了します。 例外をキャッチするか、例外が発生する理由を回避します。
同様に、モニターがイベントの処理を完了し、アクティブなリスナーが残っていない場合は、再度トリガーすることはできず、モニター自体が自動的に削除されます。
モニターごとの過剰なメモリー使用を回避します
イベント処理ルールがリスナーをリークしないようにしてください。 たとえば、リクエストとレスポンスの操作を行う場合、レスポンスが処理された後、またはタイムアウトが発生して応答がない場合に、リスナーがアクティブのままになっていないことを確認してください。
数値形式
Things Cloud のメジャーメントには float 型が使用されます。 タイムスタンプは float (1970 年 1 月 1 日 00:00 UTC からの秒数) として保存されることに注意してください。
チャンネルとコンテキストの購読
コンテキストは、Apama 内の並列処理ユニットです。 モニター インスタンスは、spawn...to
構文を使用して複数のコンテキストにデプロイできます。 チャネルをサブスクライブすると、コンテキスト内のすべてのモニター・インスタンスがそのサブスクリプションのイベントを受信します。 したがって、異なるサブスクリプションを異なるコンテキストに配置することをお勧めします。 コンテキストを使用すると、過負荷になっているアプリケーションの一部がアプリケーションの他の部分に影響を与えるのを防ぐことができます。
コンテキストはわかりやすい名前で作成され、コンテキスト オブジェクトの個々のインスタンスは、たとえ同じ名前であっても、異なるコンテキストに対応します。
例えば:
action onload() {
context subContext := context("Worker");
spawn worker() to subContext;
}
action worker() {
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
on all Measurement() as m {
...
}
}
Things Cloud における Apama の制限事項
Things Cloud 環境内で Apama を使用する場合、Apama をスタンドアロンで使用する場合に利用できる機能に必然的にいくつかの制限がかかります。
Things Cloud 内の Apama にアセットをデプロイするにはさまざまな方法があり、制限はそれらのメカニズムによって異なります。
- EPL アプリ - Apama アセットをフルマネージド Apama コリレーターツールにデプロイするための最も簡単なメカニズム。アプリケーションのデプロイ をご覧ください。
あらゆる形式の Things Cloud 環境内にデプロイされる Apama ソリューションを設計する場合は、次の点を考慮してください。
EPL アプリを使用する場合の Apama の一般的な制限事項
-
スケーラビリティのために、コリレーターはホスト間を移動する可能性があるため、永続的なファイル システムにはアクセスできません。 すべてのマイクロサービス (プラットフォームによって提供されるかカスタム) はステートレスでなければならないという標準の Things Cloud 制約があります。
この影響を受ける Apama 機能は次のとおりです。- コリレーターの持続性
- MemoryStore の永続性
-
外部システムまたはプロセスへの非 HTTP/REST 接続は、ほとんどの場合非現実的です。 ただし、サービスがインターネット経由で利用可能な場合は、それを使用できます(たとえば、Apama 内の HTTP クライアントは、公的にアクセス可能な HTTP サーバーに接続できます)。
この影響を受ける Apama 機能は次のとおりです。- Apama データベース コネクタ (ADBC)
- Correlator に統合された Java Message Service (JMS) のサポート
- デジタル イベント サービス
- 分散メモリー ストア
- コリレーター間の接続
-
セキュリティーとユーザー アクセス制御の実装のため、Things Cloud はコリレーターポートを外部プロセスで使用できるようにしません。コンセプトガイドの アプリケーションの開発 > マイクロサービス をご覧ください。
次の機能はコリレーターポートへのアクセスを必要とするため、このアクセス制御と互換性がありません。- Engine_connect、engine_management、engine_send、engine_receive などのコマンド ライン ツール
- エンジン管理 API、イベント サービス API、シナリオ サービス API
- IAF アウトプロセスで実行されているアダプターに接続します
- ダッシュボード (Apama に同梱されています)
- Software AG Designer からのデバッグ。 代わりに、ローカルコリレーターで実行されているアプリケーションをデバッグ
- コリレーター REST インターフェース
-
起動時のアプリケーションのメモリー使用量とアプリケーションの起動時間の両方を削減するには、 自動的にアンロードするモニターを挿入する前に、および時間のかかるクエリを実行する前に、アプリケーションが完全に初期化されていることを確認します。
EPL アプリを使用する場合の Apama 固有の制限事項
-
使いやすさを考慮して、コリレーターの起動は Things Cloud によって制御されます。 したがって、構成ファイルまたはコマンド ライン オプションの変更が必要な機能にはアクセスできません。 この影響を受ける Apama 機能は次のとおりです。
- 持続性
- 接続プラグイン
- コマンドセントラルによる管理
-
セキュリティーのため、コリレーターが使用するファイル システムにはアクセスできません
この影響を受ける Apama 機能は次のとおりです- 入力ログにアクセスします
- カスタムプラグインの使用
- エンリッチメントのためのファイル システム アセットの使用
-
簡単にするために、独立した EPL インジェクションを行うことのみが可能です。 各モニターは独立して管理されるため、異なるモニター間に依存関係を作成することはできません
この影響を受ける Apama 機能は次のとおりです- *.mon ファイルには package ステートメントを含めることはできません (これを含めるとエラーになります)。
- 別々の *.mon ファイル間でイベント定義を共有することはできません。 ※Apamaクエリは使用できません。
-
Software AG Designer を使用したアプリケーションの開発 にリストされているバンドルのみを使用できます。
これらの制限はすべて、Things Cloud 内で EPL アプリがスムーズかつ安全に動作することを保証するために実装されています。
例題
メジャーメントの 1 時間ごとの平均を計算する
入力データが次のようになっていると仮定します。
{
"c8y_TemperatureMeasurement": {"T": {"value": ..., "unit": "C"}},
"time": "...",
"source": {"id":"..."},
"type": "c8y_TemperatureMeasurement"
}
平均値を作成するには、モジュール内に次の部分が必要です。
-
デバイス (ソース) ごとにグループ化された 1 時間以上の時間枠
-
時間ごとの平均計算、ソースおよび単位を返す
select
(ウィンドウの内容に対して集計を使用する必要があるため、最後の単位を選択します。すべてのメジャーメントが同じ単位であると仮定します)。 これらを保持する「AverageByDevice」イベント定義に注目してください -
新しいメジャーメントとして作成されたすべての情報
例えば:
using com.apama.aggregates.avg;
using com.apama.aggregates.last;
using com.apama.cumulocity.Measurement;
monitor HourlyAvgMeasurementDeviceContext {
event AverageByDevice {
string source;
float avgValue;
string unit;
}
action onload() {
// Subscribe to Measurement.SUBSCRIBE_CHANNEL to receive all measurements
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
from m in all Measurement(type="c8y_TemperatureMeasurement") within (3600.0)
group by m.source select
AverageByDevice(m.source,
avg(m.measurements["c8y_TemperatureMeasurement"]["T"].value),
last(m.measurements["c8y_TemperatureMeasurement"]["T"].unit)) as avgdata {
send Measurement("", "c8y_AverageTemperatureMeasurement", avgdata.source, currentTime,
{"c8y_AverageTemperatureMeasurement":
{
"T": MeasurementValue(avgdata.avgValue, avgdata.unit, new dictionary<string,any>)
}
}, new dictionary<string,any>) to Measurement.SEND_CHANNEL;
}
}
}
ビットメジャーメントからアラームを作成する
多くの場合、デバイスはアラーム ステータスをレジスタに保持しており、アラームの意味を解釈できません。 この例では、デバイスが測定においてレジスタ全体をバイナリ値として送信するだけであると仮定します。 ルールはビットを識別し、それぞれのアラームを作成する必要があります。
アラーム テキスト、各ビットのタイプと重大度、値を検索するアクションをマッピングする 3 つのディクショナリを作成します。 -1 を使用してデフォルト値を示し、<position> を置き換えます。 位置の文字列形式を使用します。
dictionary<integer, string> positionToAlarmType := {
0 : "c8y_HighTemperatureAlarm",
1 : "c8y_ProcessingAlarm",
2 : "c8y_DoorOpenAlarm",
3 : "c8y_SystemFailureAlarm",
-1 : "c8y_FaultRegister<position>Alaram"
};
dictionary<integer, string> positionToAlarmSeverity := {
0 : "MAJOR",
1 : "WARNING",
2 : "MINOR",
3 : "CRITICAL",
-1 : "MAJOR"
};
dictionary<integer, string> positionToAlarmText := {
0 : "The machine temperature reached a critical status",
1 : "There was an error trying to process data",
2 : "Door was opened",
3 : "There was a critical system failure",
-1 : "An undefined alarm was reported on position <position> in the binary fault register"
};
action getText(integer bitPosition, dictionary<integer, string> lookup) returns string {
string template := lookup.getOr(bitPosition, lookup[-1]);
return template.replaceAll("<position>", bitPosition.toString());
}
バイナリメジャーメントを分析するには、それを文字列値として解釈し、各文字をループします。 getActiveBits()
関数はこれを実行し、メジャーメントが「1」であったビット位置のリストを返します。 次に、「for」ループを使用してそれを反復処理します。
action getBitPositions(string binaryAsText) returns sequence<integer> {
sequence<integer> bitsSet := new sequence<integer>;
integer i := 0;
while i < binaryAsText.length() {
string character := binaryAsText.substring(i, i+1);
if character = "1" {
bitsSet.append(binaryAsText.length() - i - 1);
}
i:=i+1;
}
return bitsSet;
}
action onload() {
// Subscribe to Measurement.SUBSCRIBE_CHANNEL to receive all measurements
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
on all Measurement(type = "c8y_BinaryFaultRegister") as m {
string faultRegister := m.measurements.getOrDefault("c8y_BinaryFaultRegister").getOrDefault("errors").value.toString();
integer bitPosition;
for bitPosition in getBitPositions(faultRegister) {
Alarm alarm := new Alarm;
alarm.type := getText(bitPosition, positionToAlarmType);
alarm.severity := getText(bitPosition, positionToAlarmSeverity);
alarm.text := getText(bitPosition, positionToAlarmText);
alarm.source := m.source;
alarm.time := m.time;
alarm.status := "ACTIVE";
send alarm to Alarm.SEND_CHANNEL;
}
}
}
このようなメジャーメントを作成する
{
"c8y_BinaryFaultRegister": {"errors": {"value": 10110}},
"time": "...",
"source": {"id": "..."},
"type": "c8y_BinaryFaultRegister"
}
最後のステートメントを 3 回トリガーします。
- ビット位置 1 でのメジャーメント - c8y_ProcessingAlarm, WARNING, “There was an error trying to process data”
- ビット位置 2 でのメジャーメント - c8y_DoorOpenAlarm, MINOR, “Door was opened”
- ビット位置 4 でのメジャーメント - c8y_FaultRegister4Alarm, MAJOR, “An undefined alarm was reported on position 4 in the binary fault register”
したがって、3 つのアラームが作成されます。
消費量のメジャーメント
何かの現在の充填レベルを測定し、その値を定期的に Things Cloud に送信するセンサーがあると仮定すると、追加の消費値を簡単に作成できます。 2 つのメジャーメント間の絶対差を計算すると便利ですが、明確な結果が得られるのは、メジャーメントが常に同じ間隔で送信された場合のみです。 そこで、絶対差を時差に換算し、1時間当たりの消費量として計算します。
2 つのエントリを保持するストリームを使用し、最初と最後のタイムスタンプと値を選択して、デバイスの 2 つの隣接するメジャーメントと時間差を比較します。
using com.apama.aggregates.last;
using com.apama.aggregates.first;
using com.apama.aggregates.count;
monitor FillLevelMeasurements {
event FillLevel {
float firstValue;
float firstTime;
float lastValue;
float lastTime;
string source;
}
action calculateConsumption(FillLevel l) returns float {
if(l.firstTime = l.lastTime) {
return 0.0;
} else {
return ((l.lastValue - l.firstValue) * 3600.0) / (l.lastTime - l.firstTime);
}
}
action onload() {
// Subscribe to Measurement.SUBSCRIBE_CHANNEL to receive all measurements
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
from m in all Measurement(type = "c8y_WaterTankFillLevel") partition by m.source retain 2 group by m.source having count() = 2
select FillLevel(first(m.measurements["c8y_WaterTankFillLevel"]["level"].value), first(m.time),
last(m.measurements["c8y_WaterTankFillLevel"]["level"].value), last(m.time), m.source) as fill {
Measurement m := new Measurement;
m.type := "c8y_HourlyWaterConsumption";
m.time := currentTime;
m.source := fill.source;
MeasurementValue mv := new MeasurementValue;
mv.value := calculateConsumption(fill);
mv.unit := "l/h";
m.measurements[m.type] := {"consumption":mv};
send m to Measurement.SEND_CHANNEL;
}
}
}
その他のサンプルアプリケーション
ストリーミング分析アプリケーションの EPL エディターには、Apama EPL の使用方法 (たとえば、Things Cloud オブジェクトのクエリやアラームの作成など) を示すいくつかのサンプルアプリケーションが用意されています。 これらのサンプルを使用して独自のアプリケーションを構築できます。
ケーススタディ:円形ジオフェンスアラーム
概要
このセクションでは、より複雑なルールを作成する方法の詳細な例を示します。 このガイドの他のセクションで前に説明した機能の複数を使用します。
Apama EPL を使い始めたばかりの場合は、サンプル をご覧ください。
前提条件
目標
位置イベントを継続的に送信している追跡デバイスがジオフェンスの外に移動した場合に自動的にアラームを生成するようにしたいと考えています。 このジオフェンスは円形になり、デバイスごとに個別に構成できる必要があります。 アラームは、デバイスがジオフェンスの外に移動した瞬間に作成されます。 屋外に移動している間は、最初のアラームがアクティブのままであるため、新しいアラームは作成されません。 デバイスがジオフェンス内に戻るとすぐに、アラームは解除されます。
Things Cloud データ モデル
ロケーションイベントの構造 (必要な部分):
{
"id": "...",
"source": {"id": "..."},
"text": "...",
"time": "...",
"type": "...",
"c8y_Position": {"alt": ..., "lng": ..., "lat": ...}
}
ジオフェンス構成をデバイスに保存します (半径はメートル単位で構成されます)。
{
"c8y_Geofence": {"lat": ..., "lng": ..., "radius": ...}
}
さらに、構成を完全に削除せずに、各デバイスのジオフェンス アラームを有効/無効にしたいと考えています。 これを行うには、デバイスの c8y_SupportedOperations
に「c8y_Geofence」を追加または削除します。
{
"c8y_SupportedOperations": [..., "c8y_Geofence", ...]
}
計算
現在の位置と中心の間の距離がジオフェンスの構成された半径よりも大きい場合、デバイスはジオフェンスの外側になります。 必要なのは、2 地理座標間の差を計算できる関数です。
action distance(float lat1, float lon1, float lat2, float lon2) returns float {
float R := 6371000.0;
float toRad := float.PI / 180.0;
float lat1Rad := lat1 * toRad;
float lat2Rad := lat2 * toRad;
float deltaLatRad := (lat2-lat1) * toRad;
float deltaLonRad := (lat2-lat1) * toRad;
float a := (deltaLatRad/2.0).sin().pow(2.0) * lat1Rad.cos() * lat2Rad.cos() * (deltaLonRad/2.0).sin().pow(2.0);
float c := 2.0 * a.sqrt().atan2((1.0-a).sqrt());
return R * c;
}
上記のアクションは距離をメートル単位で返します。
ステップ 1: 入力のフィルタリング
このモジュールの主な入力はイベントです。 一致しないイベントをできるだけ早く破棄するために、リスナーの最初のチェックとしてこれを実行します。
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
on all Event() as e {
if e.params.hasKey("c8y_Position") {
// we have an event
}
}
ステップ 2: 必要なデータの収集
次のステップでは、計算のためにジオフェンスの構成が必要になり、それを取得します。
monitor.subscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
...
integer reqId := integer.getUnique();
send FindManagedObject(reqId, e.source, new dictionary<string,string>) to FindManagedObject.SEND_CHANNEL;
on FindManagedObjectResponse(reqId = reqId) as resp
and not FindManagedObjectResponseAck(reqId) {
ManagedObject dev := resp.managedObject;
}
ステップ 3: デバイスが c8y_Geofence をサポートしているかどうかを確認する
デバイスが利用可能になったので、デバイスにジオフェンスが構成されているかどうか、およびアクティブ化されているかどうか (「supportedOperations」に「c8y_Geofence」が含まれている) を確認します。 c8y_SupportedOperations
配列をチェックするには、indexOf()
関数を使用できます。 この関数はすべての要素をループし、そのエントリのインデックスを返します。値が存在しない場合は負の数を返します。 設定では、デバイスにフラグメント「c8y_Geofence」が含まれているかどうかを確認するだけです。
イベントとデバイスを取得したら、イベントの c8y_Position
とデバイスの c8y_Geofence
からデータを抽出します。 これらのオブジェクトは、params
内の dictionary<any, any>
エントリにマッピングされます。 params
はタイプ any
の値を保持するため、dictionary<any, any>
にキャストする必要があります。
if(dev.params.hasKey("c8y_Geofence") and dev.supportedOperations.indexOf("c8y_Geofence") >= 0) {
dictionary<any, any> evtPos := <dictionary<any, any> > e.params["c8y_Position"];
float eventLat := <float> evtPos["lat"];
float eventLng := <float> evtPos["lng"];
dictionary<any,any> devGeofence := <dictionary<any,any> > dev.params["c8y_Geofence"];
float centerLat := <float> devGeofence["lat"];
float centerLng := <float> devGeofence["lng"];
float maxDistance := <float> devGeofence["radius"];
}
ステップ 4: トリガーの作成
前述したように、現在のデバイスの位置とジオフェンスの中心の間の距離が構成されたジオフェンスの半径よりも大きい場合、デバイスはフェンスの外側にあります。 アラームをトリガーするには 2 つのイベントが必要です。これにより、これら 2 つのイベント内でデバイスがジオフェンスに入ったか、ジオフェンスから出たかを確認できるようになります。
最初のステップでは、前述の関数を使用して距離を計算します。
float d := distance(centerLat, centerLng, eventLat, eventLng);
次に、これをイベントとして再ルーティングします。
event LocationEventWithDistance {
string source;
float distance;
Event e;
float maxDistance;
}
...
route LocationEventWithDistance(e.source, d, e, maxDistance);
ソースをイベント内に配置して、リスナー内で簡単に照合できるようにします。
次に、イベント LocationEventWithDistance
によってトリガーされるリスナーを設定し、同じソースの次の LocationEventWithDistance
を監視します。
on all LocationEventWithDistance() as firstPos {
on LocationEventWithDistance(source = firstPos.source) as secondPos {
// now have two events with distances
}
}
この LocationEventWithDistance
イベントのペアには、アラームを作成する必要があるかどうかをチェックするためのすべてのデータが保持されます。 最初のイベントと同じソースのものになるように secondPos
イベントをフィルタリングしていることに注意してください。イベントを受信したすべてのデバイスにアクティブなリスナーが存在します。
ステップ 5: アラームの作成
アラームを作成するには、最初のイベントの距離が半径より小さく、2 番目のイベントの距離が半径より大きい 2 つのイベントが必要です。 これは、デバイスがジオフェンスから出たばかりであることを意味します。
if firstPos.distance <= firstPos.maxDistance and
secondPos.distance > secondPos.maxDistance {
send Alarm("", "c8y_GeofenceAlarm", firstPos.source, currentTime,
"Device moved out of circular geofence", "ACTIVE",
"MAJOR", 1, new dictionary<string,any>) to Alarm.SEND_CHANNEL;
}
ステップ 6: アラームのクリア
アラームをクリアするには、下部で条件を切り替え、さらに現在アクティブなアラームを取得してその ID を取得する必要があります。 この時点では、既存のアラームがあるかどうかを気にする必要はありません。 何もない場合、リスナーはand not FindAlarmResponseAck
をトリガーし、リスナーを終了します。
monitor.subscribe(FindAlarmResponse.SUBSCRIBE_CHANNEL);
...
if firstPos.distance > firstPos.maxDistance and
secondPos.distance <= secondPos.maxDistance {
integer reqId:= integer.getUnique();
send FindAlarm(reqId, {"source": firstPos.source,
"status": "ACTIVE", "type": "c8y_GeofenceAlarm"}) to FindAlarm.SEND_CHANNEL;
on FindAlarmResponse(reqId=reqId) as alarmResponse
and not FindAlarmResponseAck(reqId=reqId) {
send Alarm(alarmResponse.id, "c8y_GeofenceAlarm",
firstPos.source, currentTime, "Device moved back into circular geofence",
"CLEARED", alarmResponse.alarm.severity, 1, new dictionary<string, any>) to Alarm.SEND_CHANNEL;
}
}
すべてを統合する
すべてのパーツを 1 つのモジュールに結合できるようになりました。 リスナーの順序は関係ありません。
using com.apama.cumulocity.ManagedObject;
using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.Event;
using com.apama.cumulocity.Alarm;
using com.apama.cumulocity.FindManagedObject;
using com.apama.cumulocity.FindManagedObjectResponse;
using com.apama.cumulocity.FindManagedObjectResponseAck;
using com.apama.cumulocity.FindAlarm;
using com.apama.cumulocity.FindAlarmResponse;
using com.apama.cumulocity.FindAlarmResponseAck;
monitor MonitorDevicesForCircularGeofence {
event LocationEventWithDistance {
string source;
float distance;
Event e;
float maxDistance;
}
action onload {
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
monitor.subscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
monitor.subscribe(FindAlarmResponse.SUBSCRIBE_CHANNEL);
on all Event() as e {
if e.params.hasKey("c8y_Position") {
// we have an event
integer reqId := integer.getUnique();
send FindManagedObject(reqId, e.source, new dictionary<string,string>) to FindManagedObject.SEND_CHANNEL;
on FindManagedObjectResponse(reqId = reqId) as resp
and not FindManagedObjectResponseAck(reqId) {
ManagedObject dev := resp.managedObject;
if(dev.params.hasKey("c8y_Geofence") and dev.supportedOperations.indexOf("c8y_Geofence") >= 0) {
dictionary<any, any> evtPos := <dictionary<any, any> > e.params["c8y_Position"];
float eventLat := <float> evtPos["lat"];
float eventLng := <float> evtPos["lng"];
dictionary<any,any> devGeofence := <dictionary<any,any> > dev.params["c8y_Geofence"];
float centerLat := <float> devGeofence["lat"];
float centerLng := <float> devGeofence["lng"];
float maxDistance := <float> devGeofence["radius"];
float d := distance(centerLat, centerLng, eventLat, eventLng);
route LocationEventWithDistance(e.source, d, e, maxDistance);
}
}
}
}
on all LocationEventWithDistance() as firstPos {
on LocationEventWithDistance(source = firstPos.source) as secondPos {
// now have two events with distances
if firstPos.distance <= firstPos.maxDistance and
secondPos.distance > secondPos.maxDistance {
send Alarm("", "c8y_GeofenceAlarm", firstPos.source, currentTime,
"Device moved out of circular geofence", "ACTIVE",
"MAJOR", 1, new dictionary<string,any>) to Alarm.SEND_CHANNEL;
}
if firstPos.distance > firstPos.maxDistance and
secondPos.distance <= secondPos.maxDistance {
integer reqId:= integer.getUnique();
send FindAlarm(reqId, {"source": firstPos.source,
"status": "ACTIVE", "type": "c8y_GeofenceAlarm"}) to FindAlarm.SEND_CHANNEL;
on FindAlarmResponse(reqId=reqId) as alarmResponse
and not FindAlarmResponseAck(reqId=reqId) {
send Alarm(alarmResponse.id, "c8y_GeofenceAlarm",
firstPos.source, currentTime, "Device moved back into circular geofence",
"CLEARED", alarmResponse.alarm.severity, 1, new dictionary<string, any>) to Alarm.SEND_CHANNEL;
}
}
}
}
}
action distance(float lat1, float lon1, float lat2, float lon2) returns float {
float R := 6371000.0;
float toRad := float.PI / 180.0;
float lat1Rad := lat1 * toRad;
float lat2Rad := lat2 * toRad;
float deltaLatRad := (lat2-lat1) * toRad;
float deltaLonRad := (lat2-lat1) * toRad;
float a := (deltaLatRad/2.0).sin().pow(2.0) * lat1Rad.cos() * lat2Rad.cos() * (deltaLonRad/2.0).sin().pow(2.0);
float c := 2.0 * a.sqrt().atan2((1.0-a).sqrt());
return R * c;
}
}