基本機能

アプリケーションの開発

EPL アプリはモニター (*.mon) ファイルです。EPL アプリは 2 つの異なる方法で開発できます。

コンセプトガイドApama イベント処理言語 (EPL) の使用 もご覧ください。

備考
ストリーミング分析アプリケーションを使用して EPL アプリを開発およびデプロイしたり、モニターファイルを Software AG Designer から Things Cloud にインポートできるようにするには、 テナントは、EPL アプリをサポートする Apama-ctrl マイクロサービスをサブスクライブする必要があります。 ストリーミング分析アプリケーションに EPL アプリ ページが表示されず、EPL アプリを使用したい場合は、製品サポート にお問い合わせください。
注意
EPL アプリには、インベントリ、アラーム、その他多くの種類のオブジェクトであっても、テナント内のオブジェクトにほぼ任意の変更を加える機能があります。「CEP 管理」の 管理者権限を持つユーザーは、EPL アプリを作成およびアクティブ化できるため、現在のテナントをほぼ完全に制御することもできます。したがって、テナント上のどのユーザーがこの権限を持っているかに注意する必要があります。

ストリーミング分析アプリケーションを使用したアプリケーションの開発

ストリーミング分析アプリケーションの EPL アプリ ページには、新規または既存の EPL アプリ (*.mon ファイル) を対話的に編集したり、EPL アプリをインポートしてアクティブ化 (デプロイ) したりするためのインターフェースが提供されます。

EPL アプリ ページの使用を希望するテナント上のユーザーは、CEP 管理者である必要があります。ユーザーガイド管理 > 権限の管理をご覧ください。

ステップ 1 - ストリーミング分析アプリケーションを呼び出す

アプリケーション スイッチャーを開き、ストリーミング分析 アプリケーションのアイコンをクリックします。次に、EPL アプリ ページに移動します。

EPL アプリ ページに移動すると、最初に EPL アプリ マネージャーが表示され、既存の EPL アプリがリストされます。各アプリケーションはカードとして表示されます。ここから新しい EPL アプリを追加したり、既存の EPL アプリを管理したりできます。

EPL アプリ

アプリケーションに対して表示される各カードの上部には、アプリケーションを編集、エクスポート、または削除できるアクション メニューがあります。

このページからは、次のことができます。

ステップ 2 - EPL アプリを作成する

上部のメニュー バーで 新しい EPL アプリ をクリックします。アプリケーションに一意の名前を付けます。新しいアプリケーション用に作成されるカードに表示される説明を入力することもできます。

次に EPL エディタを示します。新しいアプリケーションの EPL コードには、Things Cloud を操作するために必要な典型的な基本的なイベント定義とユーティリティがすでに含まれています。アプリケーションの必要に応じてそれらを調整できます。詳細については、ドキュメントとサンプルをご覧ください。

EPL エディター

開始しやすいように、いくつかのサンプルが用意されています。これらを表示するには、エディターの右側に表示される サンプル をクリックします。サンプルをクリックすると、その内容のプレビューが表示されます。標準のキーの組み合わせ Ctrl+C および Ctrl+V を使用して、サンプル コードの一部を選択し、それを独自のコードにコピーできます。コマンド ボタンを使用して、コード全体をクリップボードにコピーして独自のコードの適切な位置に挿入したり、既存のコードをすべてサンプル コードに置き換えたりすることもできます。

上部のメニュー バーのボタンを使用すると、現在のセッションでの最後の変更を元に戻したりやり直したり、変更を保存したりできます。

EPL エディターでモードを 非アクティブ から アクティブ (またはその逆) に変更することもできます。繰り返しますが、EPL コードにエラーがある場合、アプリケーションをアクティブ化することはできません。コード内でエラーが強調表示されます。

備考
EPL エディターは標準の Web コンポーネントを使用することに注意してください。これは、多くの一般的な開発者機能を提供しますが、その中には、Quick Fix や Show Hover など、EPL に関連しないものも含まれますが、これらに限定されません。

上部のメニュー バーの X をクリックして EPL エディターを終了し、EPL アプリのリストに戻ります。

注意
別の URL に移動するか、ブラウザ ウィンドウを閉じると、保存されていない変更はすべて失われます。
ステップ 3 - EPL アプリをテストする

アプリケーションがアクティブ化されると、実行結果が表示されます。これには、メジャーメントの送信、データの受信、アラームの作成、Apama-ctrl マイクロサービスへのログインが含まれる場合があります。Apama-ctrl マイクロサービスのログ ファイルを確認する方法については、ユーザー ガイド管理 > マイクロサービスの管理と監視 をご覧ください。

アプリケーションのデプロイ もご覧ください。

Software AG Designer を使用したアプリケーションの開発

Software AG Designer は完全な開発環境を提供し、複雑な EPL アプリケーションがある場合に最適なツールです。EPL アプリ (モニターファイル) の準備ができたら、それを Things Cloud にインポートする必要があります。

ステップ 1 - Apama をインストールする

Apama のライセンスを取得している場合は、Software AG Installer を使用してインストールします。

機能が制限され、いくつかの制限が設けられたフリーミアム バージョンの Apama を使用することもできます。これを使用する場合は、https://www.apamacommunity.com/ から Apama Community Edition をダウンロードしてインストールします。

ライセンス版とフリーミアム版の両方のバージョンに Software AG Designer が含まれています。

ステップ 2 - プロジェクトの作成

インストールしたら、Software AG Designer で Apama プロジェクトを作成し、Things Cloud 接続を有効にします。Apama プロジェクトの作成方法については、Apama ドキュメントの Apama プロジェクトの作成 をご覧ください。

ステップ 3 - Apama バンドルをプロジェクトに追加する

新しく作成した Apama プロジェクトに次の Apama バンドルを追加します。これらは、アプリケーションをアクティブ化するために Things Cloud で必要となります。プロジェクトにバンドルを追加する方法については、Apama ドキュメンテーションの プロジェクトへのバンドルの追加 をご覧ください。

上記のバンドルは EPL アプリで許可される唯一のバンドルであるため、他のバンドルを追加しないように注意してください。追加しないと、Things Cloud でアクティブ化されたときにアプリケーションが機能しなくなる可能性があります。

ステップ 4 - モニターファイルの作成

新しい Apama モニターファイルを作成するには、Apama ドキュメントの EPL アプリケーション用の新しいモニターファイルの作成 をご覧ください。

新しく作成したモニターファイルを EPL アプリとして Things Cloud にインポートし、そこでアクティブ化する前に、モニターファイルが Software AG Designer 内から期待どおりに動作するかどうかをテストすることをお勧めします。

詳細については、ApamaドキュメントのThings IoT Transport Connectivity Plug-inをご覧ください。

ステップ 5 - モニターファイルを実行してテストする

プロジェクトをローカルで実行する場合は、プロジェクト構成で Things Cloud の認証情報を指定する必要があります。Things Cloud クライアントの CumulocityIoT.properties ファイルで資格情報を構成します。

例:

CUMULOCITY_USERNAME=user@example.com
CUMULOCITY_SERVER_URL=http://exampleTenant.je1.thingscloud.ntt.com
CUMULOCITY_PASSWORD=examplePassword
CUMULOCITY_APPKEY=apamaAppKey
備考
CUMULOCITY_APPKEY の値を取得するには、Things Cloud で アプリケーションを作成 する必要があります。

上記の説明は、URL がテナントを識別するテナントに接続していることを前提としていることに注意してください。これが当てはまらない場合 (たとえば、IP アドレスで接続している場合)、CumulocityIoT.properties ファイルでこれを設定する必要がある場合があります。

CUMULOCITY_TENANT=my_custom_tenant

プロジェクトをマルチテナント環境でローカルに実行する必要がある場合は、マルチテナントのサポートを有効にし、使用するマルチテナント マイクロサービスの名前を指定します。 Things Cloud クライアントの CumulocityIoT.properties ファイルで次のプロパティを構成します。

# Enable multi-tenant support
CUMULOCITY_MULTI_TENANT_APPLICATION=true
 
# The name of the multi-tenant microservice to use.
# If a multi-tenant microservice does not already exist, either upload a multi-tenant microservice or
# create a microservice with a valid manifest. Subscribe the microservice to tenants for which you want
# to run the project.
CUMULOCITY_MULTI_TENANT_MICROSERVICE_NAME=example-multi-tenant-ms

さらに、モニターファイルがマルチテナント マイクロサービスで動作できることを確認してください。 詳細については、Apama ドキュメントの マルチテナントデプロイの操作 をご覧ください。

これで、Software AG Designer で EPL のテストに進むことができます。

EPL アプリの準備ができたら、アプリケーションのデプロイ をご覧、それを Things Cloud にデプロイする方法を確認してください。

アプリケーションのデプロイ

以下を Things Cloud にデプロイできます。

備考
ストリーミング分析アプリケーションでは、アプリケーションのデプロイに「アクティブ化」という用語が使用されます。

EPL アプリを ストリーミング分析アプリケーションを使用して単一の *.mon ファイルとしてデプロイする

EPL アプリ (つまり、*.mon ファイル) が Things Cloud でアクティブ化されると、*.mon ファイルには一意のパッケージ名が割り当てられます。これにより、複数のモジュールがアクティブ化された場合の競合が防止されます。このため、*.mon ファイルに package ステートメントを指定しないでください。アプリケーションの異なる部分間でイベントを共有する必要がある場合は、イベント定義とそれを使用するモニターを単一の *.mon ファイルに記述します。

EPL アプリで使用できるユーティリティと基本イベントのセットは制限されています。執筆時点では、これらには Time Format バンドルと HTTP Client > JSON with generic request/response event definitions バンドルが含まれています。

EPL アプリがランタイム エラーを通知すると、アラームとして発生します。ランタイム エラーには、キャッチされなかった例外のほか、EPL アプリが実行する必要がある警告やエラーの明示的なログが含まれます。一般的にApamaランタイムに関連する動作や状態の問題もアラームとして発生します。

Apama ランタイムとアクティブな EPL アプリの詳細な診断については、Apama-ctrl マイクロサービスのログを確認できます。ログ ファイルの詳細については、ユーザー ガイド管理 > マイクロサービスの管理と監視 をご覧ください。ただし、Apama ログ ファイルを最大限に活用するには、Apama についてある程度の知識が必要です。

アプリケーションのテスト

GitHub の Apama EPL アプリ ツールを使用して、EPL アプリのアップロードをスクリプト化し、CI/CD (継続的インテグレーションおよび継続的デリバリー) ユースケースに合わせて管理できます。このツールは、PySys テスト フレームワークへの拡張機能も提供し、EPL アプリのテストを簡単に作成して自動的に実行できるようにします。

Apama EPL Apps Tools は、https://github.com/SoftwareAG/apama-eplapps-tools から入手できます

PySys の詳細については、Apama ドキュメントからアクセスできる Python の API リファレンス をご覧ください。

サポートされている REST サービス

EPL アプリは、REST (Representational State Transfer) サービスを監視するように設計されており、すべての GET、POST、PUT、および DELETE 操作をサポートします。さまざまな操作のリクエストの例を以下に示します。

これらの操作を実行するには、「CEP 管理」に対する読み取りおよび管理者権限が必要です (ユーザー ガイド管理 > 権限の管理 もご覧ください)。

備考
この API には、Apama-ctrl マイクロサービスのバージョン 10.6.0 以降が必要です。

すべての操作のリクエスト ヘッダー

各リクエストは Things Cloud に対して認証される必要があります。

名前 説明
Accept “application/json” (これは必須パラメーターです)

一般的なレスポンスコード

すべてのリクエストに対して、次の一般的なエラーレスポンスコードが予想されます。

コード 説明
401 リクエストが認可されていません。
403 リクエストが禁止されています。

特定のリクエストから予想されるその他のレスポンスコードを以下に示します。

一般的なフィールドの説明

操作に応じて、応答では次の共通フィールドを使用できます。

フィールド 説明
contents EPL ファイルの完全な内容
description ファイルの説明
eplPackageName EPL ファイルのパッケージ名。名前に特殊文字 (スペースを含む) が含まれている場合、これらの文字は有効な EPL 識別子にするためにエスケープされ、注入エラーが回避されます
errors ファイル内のすべてのコンパイル エラー (存在する場合) と行番号およびテキストのリスト
ID ファイルの一意の識別子
name EPLに付けられた名前
state EPL がコリレーターに挿入され、実行されているかどうか。これはactiveまたはinactiveのいずれかになります
warnings ファイル内のすべてのコンパイル警告のリスト (存在する場合) と行番号およびテキスト

GET - 利用可能なすべての EPL ファイルを取得します

エンドポイント: /service/cep/eplfiles

リクエストの例

GET /service/cep/eplfiles

レスポンス
コード 説明
200 操作は成功しました。以下の値の例もご覧ください。
400 不正なリクエストです。ヘッダーの内容に予期しない値が含まれています。

レスポンスコード 200 の値の例:

{
  "eplfiles":[
    {
      "description":"",
      "eplPackageName": "eplfiles.Ordinal1",
      "errors":[

      ],
      "id":"39615",
      "name":"Ordinal1",
      "state":"active",
      "warnings":[

      ]
    }
  ]
}

GET - 利用可能なすべての EPL ファイルをその内容とともに取得します

エンドポイント: /service/cep/eplfiles

リクエストパラメーター
名前 説明
contents ブール型。EPL ファイルをその内容とともにフェッチします。これはオプションのクエリパラメーターです。
リクエストの例

GET /service/cep/eplfiles?contents=true

レスポンス
コード 説明
200 正常に動作しました。以下の値の例もご覧ください。

レスポンスコード 200 の値の例:

{
  "eplfiles":[
    {
      "contents":"monitor M0 { action onload() { on wait(1.0) { log \"Hello\" at INFO; }}}",
      "description":"",
      "eplPackageName": "eplfiles.Ordinal1",
      "errors":[
      ],
      "id":"39615",
      "name":"Ordinal1",
      "state":"active",
      "warnings":[
      ]
    }
  ]
}

GET - 識別子によって EPL ファイルを取得します

エンドポイント: /service/cep/eplfiles/{id}

リクエストパラメーター
名前 説明
ID フェッチする EPL ファイルの識別子。これは必須パラメーターです。
リクエストの例

GET /service/cep/eplfiles/{{id}}

レスポンス
コード 説明
200 操作は成功しました。以下の値の例もご覧ください。
404 識別子のファイルが見つかりません。このセクションの最後にある このレスポンスコードの値の例 もご覧ください。

レスポンスコード 200 の値の例:

{
      "contents":"monitor M0 { action onload() { on wait(1.0) { log \"Hello\" at INFO; }}}",
      "description":"",
      "eplPackageName": "eplfiles.Ordinal1",
      "errors":[
      ],
      "id":"39615",
      "name":"Ordinal1",
      "state":"active",
      "warnings":[
      ]
}

POST - 新しい EPL アプリケーションを作成する

エンドポイント: /service/cep/eplfiles

リクエストの例

POST /service/cep/eplfiles

以下はリクエスト本文の例です。

{
  "name": "Ordinal1",
  "contents": "monitor M1 { action onload() { on wait(1.0) { log \"Hello\" at INFO; }}}",
  "state": "active",
  "description": ""
}

次の点に注意してください。

レスポンス
コード 説明
201 正常に作成されました / ファイルにエラーが発生して作成されました / ファイルに警告が発生して作成されました。以下の例もご覧ください。
405 入力に誤りがあります。

正常に作成された場合のレスポンスコード 201 の例:

{
  "description":"",
  "eplPackageName": "eplfiles.Ordinal1",
  "errors":[

  ],
  "id":"39615",
  "name":"Ordinal1",
  "state":"active",
  "warnings":[

  ]
}

警告またはエラーが発生した場合のレスポンスコード 201 の例:

{
  "description":"",
  "eplPackageName": "eplfiles.Ordinal1",
  "errors":[
    {
      "line":5,
      "text":"assigning a float to an integer variable"
    }
  ],
  "id":"39651",
  "name":"Ordinal1",
  "state":"inactive",
  "warnings":[
    {
      "line":10,
      "text":"\"assert\" may become a reserved word in future versions of EPL"
    }
  ]
}

PUT - 識別子によって EPL ファイルを更新します

エンドポイント: /service/cep/eplfiles/{id}

リクエストパラメーター
名前 説明
ID 更新する EPL ファイルの識別子。識別子はパスに含める必要があります。これは必須パラメーターです。
リクエストの例

PUT /service/cep/eplfiles/{id}

以下はリクエスト本文の例です。

{
  "name": "Ordinal1",
  "contents": "monitor M1 { action onload() { on wait(1.0) { log \"Hello\" at INFO; }}}",
  "state": "active",
  "description": ""
}

POST リクエストについての情報もご覧ください。

レスポンス
コード 説明
200 正常に更新されました。以下の値の例もご覧ください。
404 識別子のファイルが見つかりません。このセクションの最後にある このレスポンスコードの値の例 もご覧ください。

エラーなしで正常に更新された場合のレスポンスコード 200 の値の例:

{
  "description":"",
  "eplPackageName": "eplfiles.Ordinal1",
  "errors":[

  ],
  "id":"39615",
  "name":"Ordinal1",
  "state":"active",
  "warnings":[

  ]
}

エラーまたは警告で更新された場合のレスポンスコード 200 の値の例:

{
  "description":"",
  "eplPackageName": "eplfiles.Ordinal1",
  "errors":[
    {
      "line":5,
      "text":"assigning a float to an integer variable"
    }
  ],
  "id":"39651",
  "name":"Ordinal1",
  "state":"inactive",
  "warnings":[
    {
      "line":10,
      "text":"\"assert\" may become a reserved word in future versions of EPL"
    }
  ]
}

DELETE - 識別子によって EPL ファイルを削除します

エンドポイント: /service/cep/eplfiles/{id}

リクエストパラメーター
名前 説明
ID 削除する EPL ファイルの識別子。識別子はパスに含める必要があります。これは必須パラメーターです。
リクエストの例

DELETE /service/cep/eplfiles/{{id}}

レスポンス
コード 説明
200 正常に削除されました。
404 識別子のファイルが見つかりません。このセクションの最後にある このレスポンスコードの値の例 もご覧ください。

レスポンスコード 404 の値の例

レスポンスコード 404 は、特定の識別子を持つファイルが見つからなかったことを示します。

{
  "error":"Not Found",
  "exception":"com.apama.in_c8y.FileNotFoundException",
  "message":"File with id 39613 not found",
  "path":"/eplfiles/39613",
  "status":404,
  "timestamp":"2020-01-17T12:21:42.457+0000"
}

イベントとチャンネル

Apama EPL では、残りの Things Cloud エコシステムとのやり取りはイベントを通じて行われます。Things Cloud データにアクセスするために、多数のイベント定義が提供されています。

備考
Apama と Things Cloud は異なる「イベント」概念を使用します。Apama イベントは、デバイスのメジャーメント、アラーム、(Things Cloud) イベントの監視と作成など、Things Cloud とのすべてのやり取りに使用されます。Apama イベントの詳細については、Apama ドキュメントの イベント型の定義 をご覧ください。Things Cloud イベントの詳細については、Things Cloud OpenAPI仕様のEvents をご覧ください。

定義済みのイベント型

いくつかの Things Cloud API と対話するための事前定義されたイベント型がいくつかあります。新しい測定、アラーム、またはイベントが作成されると、イベントは自動的に Apama アプリケーションに送信されます。Things Cloud バックエンドと対話するために、イベントを作成して関連するチャネルに送信できます。Things Cloud は、データベース クエリを自動的に実行するか、メールや SMS などの送信に必要な API 呼び出しを作成します。

EPL の API リファレンス (ApamaDoc) の データ モデル にて、各ストリームのイベントがどのように構造化されているかを確認してください。

イベントをチャネルに送信する

イベントの送信は、new <type>に続いてフィールドへの代入を使用するか、すべてのフィールドを指定するコンストラクターを使用してイベントを構築することによって行われます。次に、send ステートメントを使用してイベントを Things Cloud に送信します。send ステートメントにはチャネルが必要です。これはイベント型の SEND_CHANNEL 定数です。

イベントをリッスンする

チャネル上のイベントを監視することで EPL をトリガーできます。monitor.subscribe("string name") メソッドを使用してチャンネルに登録できます。これは、モニターの起動時に実行することも、時々イベントを受信する必要がある場合のみ、必要に応じて呼び出され、続いて monitor.unsubscribe("string name") を実行することもできます。

on ステートメントを使用してイベントを監視し、その後に監視しているイベント型、開閉括弧、および as <identifier> を使用してイベントを保持する変数に名前を付けます。

デフォルトでは、リスナーは 1 回起動します。すべてのイベントに対して繰り返すには、イベント型の前に all キーワードを使用します。

フィルター

フィルターを追加するには、リスナーの括弧の間に 1 つ以上のフィールドを指定します。最上位フィールドのみをフィルタリングできます。より複雑なフィルタリング、またはイベントのサブプロパティ (辞書など) でのフィルタリングには、「if」ステートメントを使用します。

標準のイベント型とチャネル

標準の Things Cloud イベントの場合、イベントを送受信するためのチャネルを含む定数があります。次に例を示します。

monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
send msmnt to Measurement.SEND_CHANNEL;

次の表にリストされているイベントは、「com.apama.cumulocity」パッケージの一部です。

イベント 送信チャンネル 受信用チャンネル
Operation Operation.SEND_CHANNEL Operation.SUBSCRIBE_CHANNEL
Measurement Measurement.SEND_CHANNEL Measurement.SUBSCRIBE_CHANNEL
Event Event.SEND_CHANNEL Event.SUBSCRIBE_CHANNEL
Alarm Alarm.SEND_CHANNEL Alarm.SUBSCRIBE_CHANNEL
ManagedObject ManagedObject.SEND_CHANNEL ManagedObject.SUBSCRIBE_CHANNEL
MeasurementFragment MeasurementFragment.SEND_CHANNEL MeasurementFragment.SUBSCRIBE_CHANNEL

メジャーメントフラグメント

Measurement および MeasurementFragment イベントは常に公開されます。

EPL では、Measurementイベントではなく MeasurementFragment イベントの内容に一致するリスナーを生成できます。例えば:

on all MeasurementFragment(type="c8y_SpeedMeasurement", valueFragment = "c8y_speed", valueSeries = "speedX", value > SPEED_LIMIT) as mf {
}

メジャーメントフラグメント もご覧ください。

作成通知と更新通知の区別

Things Cloud からの AlarmEventManagedObject、または Operation イベントを監視する場合、作成操作と更新操作を区別することが必要な場合があります。これらのイベント型にはそれぞれ、この目的のために isCreate() および isUpdate() という名前のアクションがあります。

新しいアラームを監視する例:

on all Alarm() as alarm {
    if alarm.isCreate() {
        log "Alarm created: " + alarm.toString() at INFO;
    }
    // else it's an update
}

同様に、更新されたアラームの場合のみ:

on all Alarm() as alarm {
    if alarm.isUpdate() {
        log "Alarm updated: " + alarm.toString() at INFO;
    }
    // else it's a create
}

Things Cloud からのイベントの場合、isUpdate() または isCreate() のいずれかが常に true を返します。どちらのアクションも、選択肢と読みやすさを考慮して提供されています。

さまざまなタイプのオブジェクトの例などの詳細については、Apama ドキュメントの Receiving update notifications をご覧ください。

isCreate() および isUpdate() の詳細については、API Reference for EPL (ApamaDoc) もご覧ください。

この例では、「com.apama.cumulocity.MeasurementFragment」 API を使用して新しいメジャーメントを監視します。受信したメジャーメントをフィルタリングして、指定された最大速度を超える速度値を見つけ、制限を超えた場合にアラームを発します。

  1. MeasurementFragment.SUBSCRIBE_CHANNELチャンネルに登録します。
  2. 測定フラグメントを監視し、type (c8y_SpeedMeasurement) でフィルターします。valueFragment の値が c8y_speed であること、および valuesSeriesspeedX のみでフィルターされていることを確認してください。また、SPEED_LIMITより大きい場合はvalueでフィルタリングします。
  3. すべてのフィールドを指定するコンストラクターを使用してイベントを作成します。
  4. イベントを正しいチャネルAlarm.SEND_CHANNELに送信します。

結果の *.mon ファイルは次のようになります。

using com.apama.cumulocity.Alarm;
using com.apama.cumulocity.MeasurementFragment;

monitor TriggerAlarmForSpeedBreach {
    constant float SPEED_LIMIT := 30.0;
    action onload() {
        monitor.subscribe(MeasurementFragment.SUBSCRIBE_CHANNEL);
        // Everytime a measurement fragment with the specific details of the match criteria is triggered then we should raise an alarm
        on all MeasurementFragment(type="c8y_SpeedMeasurement", valueFragment = "c8y_speed", valueSeries = "speedX", value > SPEED_LIMIT) as mf {
            send Alarm("", "c8y_SpeedAlarm", mf.source, currentTime,
                        "Speed limit breached", "ACTIVE", "CRITICAL", 1,
                        new dictionary<string,any>) to Alarm.SEND_CHANNEL;
        }
    }
}

組み込みアクション

概要

Apama EPLでは「アクション」と呼ばれる機能を利用することができます。 すべてのモニターには少なくとも 1 つのアクション、つまり「onload」アクションがあります。 このセクションでは、すぐに使用できる組み込みのアクションについて説明します。

組み込みタイプのアクションについては、API Reference for EPL (ApamaDoc) もご覧ください。

Things Cloud データのクエリ

履歴データを操作するには、次の要求と応答のイベント ペアのいずれかを使用してリソースを検索します。

例: アラームを検索するには、適切なクエリ パラメーターを指定して com.apama.cumulocity.FindAlarm リクエスト イベントを FindAlarm.SEND_CHANNEL チャネルに送信します。 応答として、0 個以上の com.apama.cumulocity.FindAlarmResponse イベントが期待できます (検索リクエストに一致するリソースの数に応じて) FindAlarmResponse.SUBSCRIBE_CHANNEL チャネルの com.apama.cumulocity.FindAlarmResponseAck イベント。 管理対象オブジェクト、イベント、メジャーメント、および操作を検索するための同様の機能も提供されます。

次の表にリストされているイベントは、「com.apama.cumulocity」パッケージの一部です。

検索対象 リクエスト/レスポンス イベント
ManagedObject FindManagedObject
FindManagedObjectResponse
FindManagedObjectResponseAck
Alarm FindAlarm
FindAlarmResponse
FindAlarmResponseAck
Event FindEvent
FindEventResponse
FindEventResponseAck
Measurement FindMeasurement
FindMeasurementResponse
FindMeasurementResponseAck
Operation FindOperation
FindOperationResponse
FindOperationResponseAck
CurrentUser CurrentUser
GetCurrentUser
GetCurrentUserResponse
TenantOption TenantOption
FindTenantOptions
FindTenantOptionsResponse
ドキュメント

Things Cloud REST API の他の部分の呼び出し

Things Cloud REST API は、個々のイベント型ではカバーされていないいくつかの追加機能をカバーします。 REST API の他の部分を呼び出すために、Things Cloud API の任意の部分を呼び出すために使用できる汎用のリクエスト/レスポンス API が提供されています。

次の要求/応答イベントを使用できます。

備考
Apama-ctrl マイクロサービス、つまりその中のすべての EPL アプリ コードは、EPL がインベントリ内のすべてのオブジェクトにアクセスし、ユーザーの詳細を読み取ることを許可する多数の権限で実行されます。 これには、ユーザー名、電子メール アドレスなどの個人を特定できる情報が含まれます。

詳細については Things Cloud OpenAPI仕様の RESTの実装 と Apamaドキュメントの Invoking other parts of the Cumulocity IoT REST API をご覧ください。

HTTP サービスの呼び出し

備考
以下の情報は、外部 HTTP サービスとの対話に関するものです。 Things Cloud REST API の他の部分にリクエストを行う場合、 Things Cloud REST API の他の部分の呼び出し をご覧ください。

REST および JSON を使用して HTTP サービスと対話するには、次のいずれかのファクトリ メソッドを使用して com.softwareag.connectivity.httpclient.HttpTransport インスタンスを作成します。

HttpTransport オブジェクトで、create メソッドの 1 つを呼び出し、必要に応じてパスとペイロードを渡し、Request オブジェクトを生成します。

Requestオブジェクトでは、必要に応じて Cookie、ヘッダー、またはクエリ パラメーターを設定し、execute(action<Response> callback)でリクエストを呼び出すことができます。 コールバックのモニターにアクションの名前を指定すると、リクエストが完了した (またはタイムアウトした) ときにResponse呼び出されます。

コールバックでは、ResponseオブジェクトにstatusCodepayloadが指定されます。 ペイロードのフィールドには、それが提供される com.apama.util.AnyExtractor オブジェクトを介してアクセスできます。以下の アクセス フラグメント に関する情報をご覧ください。

詳細については、EPL の API リファレンス (ApamaDoc) をご覧ください。

ユーティリティ関数

フラグメントにアクセスする

params ディクショナリを介してほとんどのイベントのフラグメントにアクセスできます。 AnyExtractor オブジェクトは、複数のサブフラグメントを含む任意のオブジェクトからデータを抽出して以下にアクセスできるように構築できます。

JSON パスを使用してオブジェクト構造内を移動できます。 例えば:

string s := AnyExtractor(measurement.params["fragment"]).getString("sub.fragment.object");

"fragment"の例: “c8y_TemperatureMeasurement”.
"sub.fragment.object"の例: “c8y_TemperatureMeasurement.T.Unit”.

「any」値をキャストする

あるいは、キャストを使用して「any」を特定の型に変換します。

string s := <string> measurement.params["strfragment"];

オブジェクトの型が異なる場合、キャスト操作がスローされることに注意してください。

currentTime と TimeFormatter

読み取り専用変数 currentTime を使用して、現在のサーバー時刻を取得できます。 Apama は、Unix エポック (UTC 1970 年 1 月 1 日) からの秒数を使用して時間を処理します。 TimeFormat オブジェクトを使用すると、人間が判読できる形式に簡単に変換できます。 TimeFormat オブジェクトは、日付と時刻の書式設定や解析に使用できます。

using com.apama.correlator.timeformat.TimeFormat;

monitor Example {
    action onload {
        log TimeFormat.format(currentTime, "yyyy.MM.dd 'at' HH:mm:ss") at INFO;
    }
}

TimeFormatとその関数の詳細については、Apama の TimeFormat イベント ライブラリの使用 やドキュメントと EPL の API リファレンス (ApamaDoc)をご覧ください。

inMaintenanceMode

Util.inMaintenanceMode()関数は、デバイスが現在メンテナンスモードであるかどうかを簡単に確認する方法です。 管理対象オブジェクトをパラメーターとして受け取り、デバイスがメンテナンスモードの場合は true となるブール値を返します。

例:

using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.Event;
using com.apama.cumulocity.FindManagedObject;
using com.apama.cumulocity.FindManagedObjectResponse;
using com.apama.cumulocity.FindManagedObjectResponseAck;

using com.apama.cumulocity.Util;

monitor ExampleMonitor {
  action onload() {
    // Subscribe to Measurement.SUBSCRIBE_CHANNEL to receive all measurements
    monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
    monitor.subscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
    on all Measurement() as m {
      integer reqId := integer.getUnique();
      send FindManagedObject(reqId, m.source, new dictionary<string,string>) to FindManagedObject.SEND_CHANNEL;
      on FindManagedObjectResponse(reqId = reqId, id = m.source) as d and not FindManagedObjectResponseAck(reqId = reqId) {
        if not Util.inMaintenanceMode(d.managedObject) {
          send Event("", "c8y_Event", m.source, currentTime, "Received measurement from active device", new dictionary<string,any>) to Event.SEND_CHANNEL;
        }
      }
    }
  }
}

プレースホルダーを置き換える

文字列を構築するには、次のように連結を使用できます。

string s:= "An event with the text " + evt.text + " has been created.";

テキストが長くなり、データから動的に設定される値が増える場合は、Util.replacePlaceholders() 関数を使用できます。 テキスト文字列では、イベントのフィールド名でプレースホルダーをマークし、それを「#{}」で囲みます。 replacePlaceholdersの 2 番目のパラメーターには、任意のイベント型を指定できます。

Utils::replacePlaceholders は、イベントまたはイベントのパラメーターで指定されたフィールド名を検索して、テキストの代替を生成します。 タイプ #{X.Y} のフィールド名を使用して、イベント内のネストされた構造にアクセスできます。

myMailText := Util.replacePlaceholders("The device #{source} created an event with the text #{text} at #{time}", alarm);

置換文字列が#{source.name}のような形式である場合、source.nameは基礎となる管理対象オブジェクト/デバイスの名前です。 または #{source.c8y_hardware.notes} ここで、 c8y_hardware は管理対象オブジェクトのフラグメントです。 その場合、交換するには特別な処理が必要になります。 最初の置換後、プレースホルダー フィールド名を更新し、ソースの managedObject を使用して Util::replacePlaceholders を再度実行する必要があります。

myMailText := Util.replacePlaceholders("The device #{source} with the serial number #{source.c8y_Hardware.serialNumber} created an event with the text #{text} at #{time}. The device is located at #{source.c8y_Address.street} in #{source.c8y_Address.city}.", alarm);
myMailText := myMailText.replaceAll("#{source.", "#{");
myMailText := Util.replacePlaceholders(myMailText, managedObject);

高度な機能

カスタムフラグメント

Things Cloud API を使用すると、データを自由に構造化できます。 Apama EPL では、これは、タイプ dictionary<string, any> のエントリを params に追加することによって行われます。 com.apama.cumulocity パッケージ内の各 Things Cloud イベント (AlarmEventMeasurement、または Operation など) には params フィールドがあります。 フラグメントまたはオプションのフィールドに変換されます。 したがって、イベントを受信するとき、コードは「params」フィールド内のエントリを検索する必要があります。 イベントを送信するときは、イベント型を定義することによって行うことも、dictionary<string, any> タイプを使用することもできます。 イベントを受信するときの EPL タイプは dictionary<any, any> です。 EPL は厳密に型指定されているため、フラグメントのないイベントを作成する場合は、new dictionary<string, any> 式が必要になることに注意してください。 辞書リテラルを使用してインラインでエントリを提供する場合、EPL は最初のキーと値のペアの型に基づいて型を決定します。つまり、dictionary<string, any> の場合、<any> キャスト演算子を使用して最初の値を any 型にキャストします。:

send Event(..., new dictionary<string,any>) to Event.SEND_CHANNEL;
send Event(..., {"fragment":<any>"value"}) to Event.SEND_CHANNEL;

MeasurementValue タイプは、Measurement タイプのメジャーメントに対して提供されます。 MeasurementValue には、value フィールドと unit フィールド、および他のフラグメントの params があります。

例 1:

send Measurement("", "c8y_TemperatureMeasurement", "12345", currentTime, {
	"c8y_TemperatureMeasurement":{
		"T1":MeasurementValue(1.0, "C", new dictionary<string,any>),
		"T2":MeasurementValue(2.0, "C", new dictionary<string,any>),
		"T3":MeasurementValue(3.0, "C", new dictionary<string,any>),
		"T4":MeasurementValue(4.0, "C", new dictionary<string,any>),
		"T5":MeasurementValue(5.0, "C", new dictionary<string,any>)
	}},
	new dictionary<string,any>) to Measurement.SEND_CHANNEL;

これにより、次の JSON 構造が生成されます。

{
	"type": "c8y_TemperatureMeasurement",
	"time": "...",
	"source": {
		"id": "12345"
	},
	"c8y_TemperatureMeasurement": {
		"T1": {
			"value": 1,
			"unit": "C"
		},
		"T2": {
			"value": 1,
			"unit": "C"
		},
		"T3": {
			"value": 1,
			"unit": "C"
		},
		"T4": {
			"value": 1,
			"unit": "C"
		},
		"T5": {
			"value": 1,
			"unit": "C"
		},
	}
}

メジャーメントフラグメント

メジャーメントは、個々のメジャーメントフラグメントに分割できます。 これは、メジャーメント内に存在する各フラグメントおよび系列に対して実行できます。 メジャーメントフラグメントの詳細については、コンセプトガイドThings Cloud のドメイン モデル をご覧ください。

メジャーメントフラグメントまたはシリーズに基づいたフィルタリングが必要な場合は、com.apama.cumulocity.MeasurementFragmentタイプのイベントを監視します。 com.apama.cumulocity.Measurementイベントを監視してmeasurementsディクショナリ内を調べる代わりに。 詳細については、Apama ドキュメントの メジャーメントフラグメントの使用 をご覧ください。

リスナー

受信したイベントによってステートメントをトリガーすることが唯一の可能性ではありません。 次のセクションでは、リスナーを組み合わせる他の方法について説明します。 詳細については、Apama ドキュメントの Defining Event Listeners をご覧ください。

フィルター

フィルターを使用すると、他のトリガーの組み合わせまたはシーケンスによってトリガーできます。例えば、次のようなトリガーがあるとします:

on all Event() as e { ... }

パターンにフィルターを追加することもできます。

on all Event(type = "c8y_EntranceEvent") as e { ... }

複数のイベントを監視できます。

on Event() as e and Alarm() as a { ... }

これは、イベントとアラーム イベントを受信するとトリガーされ、それぞれの最初のイベントがキャプチャされます。

シーケンスによってトリガーすることもできます。

on all (Event() as e -> Alarm() as a) { ... }

これは、「アラームに続くイベント」のペアごとにトリガーされます。 イベントを受信すると、それ以降のイベントの待機を停止し、代わりにアラームの待機を開始します。 アラームを受信すると、再びイベントの監視が開始されます。

タイマー

時間に基づいてリスナーをトリガーすることもできます。 特定の間隔でトリガーすることもできます。たとえば、5 分 (300 秒) ごとにトリガーすることもできます。

on all wait(300.0) { ... }

または、Unix の cron スケジューラと同様の機能を使用して、1 日の特定の時間にリスナーを起動させることもできます。

// timer:at(minutes, hours, daysOfMonth, month, daysOfWeek, (optional) seconds)
// minutes: 0-59
// hours: 0-23
// daysOfMonth: 1-31
// month: 1-12
// daysOfWeek: 0 (Sunday) - 6 (Saturday)
// seconds: 0-59

on all at(*, *, *, *, *) {} // trigger every minute

on all at(*/10, *, *, *, *) {} // trigger every 10 minutes
on all at(0, 1, *, *, [1,3,5]) {} // trigger at 1am every monday, wednesday and friday
on all at(0, */2, 1:7, *, *) {} // trigger every 2 hours on every day in the first week of every month

タイマーパターンを他のパターンと組み合わせることもできます。 たとえば、別のイベント後の一定時間内にイベントがあったかどうかを確認できます。

on Event() -> wait(600.0) and not Alarm() { ... }

これは、イベントが発生し、10 分 (600 秒) 以内にアラームが発生しない場合にトリガーされます。 イベントが発生した場合にリスナーを終了する「not」の使用に注意してください。

テナント オプションを使用して、on all atタイマーに使用されるタイム ゾーンを設定できます。 テナント オプションを設定するには、microservice.runtime カテゴリと timezone キーを指定します。 例えば:

{
    "category" : "microservice.runtime",
    "key" : "timezone",
    "value" : "Europe/Warsaw"
}

Apama ドキュメントの サポートされているタイム ゾーン

備考
このテナント オプションは、マイクロサービスの開始時にのみ読み取られます。 テナント オプションが変更された場合、マイクロサービスは次のマイクロサービス サブスクリプションでのみこれを選択します。

ストリーム - ウィンドウ

ストリームを使用すると、イベントのウィンドウを操作できるようになります。 ストリームは、onの代わりにfromキーワードを使用し、操作対象のウィンドウを定義し、集計を使用してそのウィンドウから必要な出力を選択します。 Windows は次の 2 つの方法で制限できます。

  1. 一定期間の Windows - within キーワードを使用します。

    from m in all Measurement(type="c8y_TemperatureMeasurement") within 3600.0 select avg(m.measurements	["c8y_TemperatureMeasurement"]["T"].value) as avgValue { }
    
  2. 一定量のイベントがあるウィンドウ - retain キーワードを使用します。

    from m in all Measurement(type="c8y_TemperatureMeasurement") retain 100 select avg(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as avgValue { }
    

ストリーム - 定期的に出力する

ストリームは、every指定子を使用して評価の頻度を制御することもできます。

// will output the last measurement arrived every 1 minute
from m in all Measurement(type="c8y_TemperatureMeasurement") within 60.0 every 60.0 select last(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as lastValue { }

// will output the first of every 20 measurements arriving
from m in all Measurement(type="c8y_TemperatureMeasurement") retain 20 every 20 select first(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as firstValue { }

// will output the average of all 20 measurements after the 20th arrived
from m in all Measurement(type="c8y_TemperatureMeasurement") retain 20 every 20 select avg(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as avgValue { }

組み込み集計関数 については、Apama ドキュメントをご覧ください 。

独自のイベント型の作成

事前定義されたイベント型だけでなく、独自のイベント型を定義することもできます。 これらは、同じモジュールの他の部分をトリガーする発生するイベントのパターンを検出するのに役立ちます。

event MyEvent {
	Measurement m1;
	Measurement m2;
}

...

on Measurement() as m1 -> Measurement() as m2 {
	route MyEvent(m1, m2);
}
備考
Things Cloud は各モジュールを独自の名前空間にデプロイするため、あるモジュールのイベント定義を他のモジュールで使用することはできません。 これにより、モジュール間の依存関係が防止されます。

独自のアクションの作成

通常、次の例に示すように、アクション (Java の関数とよく似ています) を使用してモニターを構築します。

指定された重大度を上げる:

action upgradeSeverity(string old) returns string {
	if old = "WARNING" { return "MINOR"; }
	if old = "MINOR"   { return "MAJOR"; }
	if old = "MAJOR"   { return "CRITICAL"; }
	return old;
}

2 つの地理座標間の距離を計算します。

action distance(float lat1, float lon1, float lat2, float lon2) returns float {
	float R := 6371000.0;
	float toRad := float.PI / 180.0;
	float lat1Rad := lat1 * toRad;
	float lat2Rad := lat2 * toRad;
	float deltaLatRad := (lat2-lat1) * toRad;
	float deltaLonRad := (lat2-lat1) * toRad;
	float a := (deltaLatRad/2.0).sin().pow(2.0) * lat1Rad.cos() * lat2Rad.cos() * (deltaLonRad/2.0).sin().pow(2.0);
	float c := 2.0 * a.sqrt().atan2((1.0-a).sqrt());
	return R * c;
}

変数

モジュール内で変数を定義できます。

string myEmailText := "Hello World";
sequence<string> supportedOperationsList := ["c8y_Restart", "c8y_Relay"];

モニタースコープ変数を定義する場合(つまり、モニター内であるが、モニターのどのアクション内にも存在しない場合)、リスナーでのイベント共同割り当ての際、asの代わりにコロン(:)を使用すると、リスナーで使用できます。 次の例では、10秒ごとに最新のイベントを記録します:

monitor MyMonitor {
    // monitor scope:
    Event e;
    action onload() {
        monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
        on all Event():e {}
        on all wait(10.0) {
            log e.toString();
        }
    }
}

リスナーは開始時に、すべてのローカル変数のコピーを取得します。 したがって、以下の例では、間に他のイベントがあった場合でも、10 秒の遅延後に各イベントをログに記録します。

monitor MyMonitor {
    // monitor scope:
    Event e;
    action onload() {
      monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
      on all Event():e {
            on all wait(10.0) {
                log e.toString();
            }
        }
    }
}

モニター インスタンスとコンテキストの生成

単一のモニターで複数のデバイスを処理することは可能ですが (たとえば、ストリームでgroup bypartition byを使用したり、他の状態のデバイス ID をキーとした辞書を維持したりするなど)、処理を分離すると便利なことがよくあります。 異なるデバイスを個別のモニター インスタンスに分割します。

新しいモニター インスタンスは、spawn ステートメントを使用して作成できます。 これにより、モニターのモニター スコープ変数のコピーが取得され、新しいモニター インスタンスで指定されたアクションが実行されます。 新しいモニターにはリスナーはコピーされません。 新しいモニター インスタンスを生成するコンテキストを指定することもできます。異なるコンテキストは相互に同時に実行でき、異なるモニターを相互に分離するのにも役立ちます。 コンテキストを構築するときは、コンテキストを識別するための名前と、コンテキストがパブリックかどうかを制御するためのブール値を指定します。つまり、デフォルトで Things Cloud イベントを受け取ります (デフォルト チャネルに送信されます)。 )。

このパターンは、そのコンテキスト内の他のリスナーによって一致しないイベントを識別するために、unmatched キーワードとともによく使用されます。 各モニターに個別のコンテキストを使用することにより、一致しない動作の範囲がそのモニターに限定されます。 例えば:

monitor PerDeviceMeasurementTracker {
	action onload() {
		spawn factory to context("PerDeviceMeasurementTracker", true);
	}
	action factory() {
		monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
		on all unmatched Measurement() as m {
			spawn perDevice(m);
		}
	}

	dictionary<string, Measurement> latestMeasurementByType; // measurements for this device

	action perDevice(Measurement m) {
		processMeasurement(m);
		on all Measurement(source = m.source) as m {
			processMeasurement(m);
		}
	}
	action processMeasurement(Measurement m) {
		latestMeasurementByType[m.type] := m;
	}
}

ベストプラクティスとガイドライン

EPL モニター

症状: イベント処理ルールが自動的に無効になります

モニターがonloadまたはリスナーから例外をスローし、その例外がキャッチされなかった場合、モニターは終了します。 例外をキャッチするか、例外が発生する理由を回避します。

同様に、モニターがイベントの処理を完了し、アクティブなリスナーが残っていない場合は、再度トリガーすることはできず、モニター自体が自動的に削除されます。

モニターごとの過剰なメモリー使用を回避します

イベント処理ルールがリスナーをリークしないようにしてください。 たとえば、リクエストとレスポンスの操作を行う場合、レスポンスが処理された後、またはタイムアウトが発生して応答がない場合に、リスナーがアクティブのままになっていないことを確認してください。

数値形式

Things Cloud のメジャーメントには float 型が使用されます。 タイムスタンプは float (1970 年 1 月 1 日 00:00 UTC からの秒数) として保存されることに注意してください。

チャンネルとコンテキストの購読

コンテキストは、Apama 内の並列処理ユニットです。 モニター インスタンスは、spawn...to構文を使用して複数のコンテキストにデプロイできます。 チャネルをサブスクライブすると、コンテキスト内のすべてのモニター・インスタンスがそのサブスクリプションのイベントを受信します。 したがって、異なるサブスクリプションを異なるコンテキストに配置することをお勧めします。 コンテキストを使用すると、過負荷になっているアプリケーションの一部がアプリケーションの他の部分に影響を与えるのを防ぐことができます。

コンテキストはわかりやすい名前で作成され、コンテキスト オブジェクトの個々のインスタンスは、たとえ同じ名前であっても、異なるコンテキストに対応します。

例えば:

action onload() {
   context subContext := context("Worker");
   spawn worker() to subContext;
}
action worker() {
   monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
   on all Measurement() as m {
      ...
   }
}  

Things Cloud における Apama の制限事項

Things Cloud 環境内で Apama を使用する場合、Apama をスタンドアロンで使用する場合に利用できる機能に必然的にいくつかの制限がかかります。

Things Cloud 内の Apama にアセットをデプロイするにはさまざまな方法があり、制限はそれらのメカニズムによって異なります。

あらゆる形式の Things Cloud 環境内にデプロイされる Apama ソリューションを設計する場合は、次の点を考慮してください。

EPL アプリを使用する場合の Apama の一般的な制限事項

EPL アプリを使用する場合の Apama 固有の制限事項

これらの制限はすべて、Things Cloud 内で EPL アプリがスムーズかつ安全に動作することを保証するために実装されています。

例題

メジャーメントの 1 時間ごとの平均を計算する

入力データが次のようになっていると仮定します。

{
  "c8y_TemperatureMeasurement": {"T": {"value": ..., "unit": "C"}},
  "time": "...",
  "source": {"id":"..."},
  "type": "c8y_TemperatureMeasurement"
}

平均値を作成するには、モジュール内に次の部分が必要です。

例えば:

using com.apama.aggregates.avg;
using com.apama.aggregates.last;
using com.apama.cumulocity.Measurement;

monitor HourlyAvgMeasurementDeviceContext {

  event AverageByDevice {
    string source;
    float avgValue;
    string unit;
  }

  action onload() {
    // Subscribe to Measurement.SUBSCRIBE_CHANNEL to receive all measurements
    monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);

    from m in all Measurement(type="c8y_TemperatureMeasurement") within (3600.0)
      group by m.source select
        AverageByDevice(m.source,
          avg(m.measurements["c8y_TemperatureMeasurement"]["T"].value),
          last(m.measurements["c8y_TemperatureMeasurement"]["T"].unit)) as avgdata {
            send Measurement("", "c8y_AverageTemperatureMeasurement", avgdata.source, currentTime,
              {"c8y_AverageTemperatureMeasurement":
                {
                  "T": MeasurementValue(avgdata.avgValue, avgdata.unit, new dictionary<string,any>)
                }
              }, new dictionary<string,any>) to Measurement.SEND_CHANNEL;
          }
  }
}

ビットメジャーメントからアラームを作成する

多くの場合、デバイスはアラーム ステータスをレジスタに保持しており、アラームの意味を解釈できません。 この例では、デバイスが測定においてレジスタ全体をバイナリ値として送信するだけであると仮定します。 ルールはビットを識別し、それぞれのアラームを作成する必要があります。

アラーム テキスト、各ビットのタイプと重大度、値を検索するアクションをマッピングする 3 つのディクショナリを作成します。 -1 を使用してデフォルト値を示し、<position> を置き換えます。 位置の文字列形式を使用します。

dictionary<integer, string> positionToAlarmType := {
	0  : "c8y_HighTemperatureAlarm",
	1  : "c8y_ProcessingAlarm",
	2  : "c8y_DoorOpenAlarm",
	3  : "c8y_SystemFailureAlarm",
	-1 : "c8y_FaultRegister<position>Alaram"
};

dictionary<integer, string> positionToAlarmSeverity := {
	0  : "MAJOR",
	1  : "WARNING",
	2  : "MINOR",
	3  : "CRITICAL",
	-1 : "MAJOR"
};

dictionary<integer, string> positionToAlarmText := {
	0  : "The machine temperature reached a critical status",
	1  : "There was an error trying to process data",
	2  : "Door was opened",
	3  : "There was a critical system failure",
	-1 : "An undefined alarm was reported on position <position> in the binary fault register"
};

action getText(integer bitPosition, dictionary<integer, string> lookup) returns string {
	string template := lookup.getOr(bitPosition, lookup[-1]);
	return template.replaceAll("<position>", bitPosition.toString());
}

バイナリメジャーメントを分析するには、それを文字列値として解釈し、各文字をループします。 getActiveBits() 関数はこれを実行し、メジャーメントが「1」であったビット位置のリストを返します。 次に、「for」ループを使用してそれを反復処理します。

action getBitPositions(string binaryAsText) returns sequence<integer> {
	sequence<integer> bitsSet := new sequence<integer>;
	integer i := 0;
	while i < binaryAsText.length() {
		string character := binaryAsText.substring(i, i+1);
		if character = "1" {
			bitsSet.append(binaryAsText.length() - i - 1);
		}
		i:=i+1;
	}
	return bitsSet;
}

action onload() {
	// Subscribe to Measurement.SUBSCRIBE_CHANNEL to receive all measurements
	monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
	on all Measurement(type = "c8y_BinaryFaultRegister") as m {
		string faultRegister := m.measurements.getOrDefault("c8y_BinaryFaultRegister").getOrDefault("errors").value.toString();
		integer bitPosition;
		for bitPosition in getBitPositions(faultRegister) {
			Alarm alarm    := new Alarm;
			alarm.type     := getText(bitPosition, positionToAlarmType);
			alarm.severity := getText(bitPosition, positionToAlarmSeverity);
			alarm.text     := getText(bitPosition, positionToAlarmText);
			alarm.source   := m.source;
			alarm.time     := m.time;
			alarm.status   := "ACTIVE";
			send alarm to Alarm.SEND_CHANNEL;
		}
	}
}

このようなメジャーメントを作成する

{
    "c8y_BinaryFaultRegister": {"errors": {"value": 10110}},
    "time": "...",
    "source": {"id": "..."},
    "type": "c8y_BinaryFaultRegister"
}

最後のステートメントを 3 回トリガーします。

したがって、3 つのアラームが作成されます。

消費量のメジャーメント

何かの現在の充填レベルを測定し、その値を定期的に Things Cloud に送信するセンサーがあると仮定すると、追加の消費値を簡単に作成できます。 2 つのメジャーメント間の絶対差を計算すると便利ですが、明確な結果が得られるのは、メジャーメントが常に同じ間隔で送信された場合のみです。 そこで、絶対差を時差に換算し、1時間当たりの消費量として計算します。

2 つのエントリを保持するストリームを使用し、最初と最後のタイムスタンプと値を選択して、デバイスの 2 つの隣接するメジャーメントと時間差を比較します。

using com.apama.aggregates.last;
using com.apama.aggregates.first;
using com.apama.aggregates.count;

monitor FillLevelMeasurements {

	event FillLevel {
		float firstValue;
		float firstTime;
		float lastValue;
		float lastTime;
		string source;
	}

	action calculateConsumption(FillLevel l) returns float {
		if(l.firstTime = l.lastTime) {
			return 0.0;
		} else {
			return ((l.lastValue - l.firstValue) * 3600.0) / (l.lastTime - l.firstTime);
		}
	}

	action onload() {
		// Subscribe to Measurement.SUBSCRIBE_CHANNEL to receive all measurements
		monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
		from m in all Measurement(type = "c8y_WaterTankFillLevel") partition by m.source retain 2 group by m.source having count() = 2
			select FillLevel(first(m.measurements["c8y_WaterTankFillLevel"]["level"].value), first(m.time),
							last(m.measurements["c8y_WaterTankFillLevel"]["level"].value), last(m.time), m.source) as fill {

			Measurement m := new Measurement;
			m.type := "c8y_HourlyWaterConsumption";
			m.time := currentTime;
			m.source := fill.source;
			MeasurementValue mv := new MeasurementValue;
			mv.value := calculateConsumption(fill);
			mv.unit := "l/h";
			m.measurements[m.type] := {"consumption":mv};
			send m to Measurement.SEND_CHANNEL;
		}
	}
}

その他のサンプルアプリケーション

ストリーミング分析アプリケーションの EPL エディターには、Apama EPL の使用方法 (たとえば、Things Cloud オブジェクトのクエリやアラームの作成など) を示すいくつかのサンプルアプリケーションが用意されています。 これらのサンプルを使用して独自のアプリケーションを構築できます。

ケーススタディ:円形ジオフェンスアラーム

概要

このセクションでは、より複雑なルールを作成する方法の詳細な例を示します。 このガイドの他のセクションで前に説明した機能の複数を使用します。

Apama EPL を使い始めたばかりの場合は、サンプル をご覧ください。

前提条件

目標

位置イベントを継続的に送信している追跡デバイスがジオフェンスの外に移動した場合に自動的にアラームを生成するようにしたいと考えています。 このジオフェンスは円形になり、デバイスごとに個別に構成できる必要があります。 アラームは、デバイスがジオフェンスの外に移動した瞬間に作成されます。 屋外に移動している間は、最初のアラームがアクティブのままであるため、新しいアラームは作成されません。 デバイスがジオフェンス内に戻るとすぐに、アラームは解除されます。

Things Cloud データ モデル

ロケーションイベントの構造 (必要な部分):

{
  "id": "...",
  "source": {"id": "..."},
  "text": "...",
  "time": "...",
  "type": "...",
  "c8y_Position": {"alt": ..., "lng": ..., "lat": ...}
}

ジオフェンス構成をデバイスに保存します (半径はメートル単位で構成されます)。

{
  "c8y_Geofence": {"lat": ..., "lng": ..., "radius": ...}
}

さらに、構成を完全に削除せずに、各デバイスのジオフェンス アラームを有効/無効にしたいと考えています。 これを行うには、デバイスの c8y_SupportedOperations に「c8y_Geofence」を追加または削除します。

{
  "c8y_SupportedOperations": [..., "c8y_Geofence", ...]
}

計算

現在の位置と中心の間の距離がジオフェンスの構成された半径よりも大きい場合、デバイスはジオフェンスの外側になります。 必要なのは、2 地理座標間の差を計算できる関数です。

action distance(float lat1, float lon1, float lat2, float lon2) returns float {
	float R := 6371000.0;
	float toRad := float.PI / 180.0;
	float lat1Rad := lat1 * toRad;
	float lat2Rad := lat2 * toRad;
	float deltaLatRad := (lat2-lat1) * toRad;
	float deltaLonRad := (lat2-lat1) * toRad;
	float a := (deltaLatRad/2.0).sin().pow(2.0) * lat1Rad.cos() * lat2Rad.cos() * (deltaLonRad/2.0).sin().pow(2.0);
	float c := 2.0 * a.sqrt().atan2((1.0-a).sqrt());
	return R * c;
}

上記のアクションは距離をメートル単位で返します。

ステップ 1: 入力のフィルタリング

このモジュールの主な入力はイベントです。 一致しないイベントをできるだけ早く破棄するために、リスナーの最初のチェックとしてこれを実行します。

monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
on all Event() as e {
	if e.params.hasKey("c8y_Position") {
		// we have an event
	}
}

ステップ 2: 必要なデータの収集

次のステップでは、計算のためにジオフェンスの構成が必要になり、それを取得します。

monitor.subscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
...
integer reqId := integer.getUnique();
send FindManagedObject(reqId, e.source, new dictionary<string,string>) to FindManagedObject.SEND_CHANNEL;
on FindManagedObjectResponse(reqId = reqId) as resp
   and not FindManagedObjectResponseAck(reqId) {
	  ManagedObject dev := resp.managedObject;
   }

ステップ 3: デバイスが c8y_Geofence をサポートしているかどうかを確認する

デバイスが利用可能になったので、デバイスにジオフェンスが構成されているかどうか、およびアクティブ化されているかどうか (「supportedOperations」に「c8y_Geofence」が含まれている) を確認します。 c8y_SupportedOperations 配列をチェックするには、indexOf() 関数を使用できます。 この関数はすべての要素をループし、そのエントリのインデックスを返します。値が存在しない場合は負の数を返します。 設定では、デバイスにフラグメント「c8y_Geofence」が含まれているかどうかを確認するだけです。

イベントとデバイスを取得したら、イベントの c8y_Position とデバイスの c8y_Geofence からデータを抽出します。 これらのオブジェクトは、params 内の dictionary<any, any> エントリにマッピングされます。 params はタイプ any の値を保持するため、dictionary<any, any> にキャストする必要があります。

if(dev.params.hasKey("c8y_Geofence") and dev.supportedOperations.indexOf("c8y_Geofence") >= 0) {
	dictionary<any, any> evtPos := <dictionary<any, any> > e.params["c8y_Position"];
	float eventLat := <float> evtPos["lat"];
	float eventLng := <float> evtPos["lng"];

	dictionary<any,any> devGeofence := <dictionary<any,any> > dev.params["c8y_Geofence"];
	float centerLat := <float> devGeofence["lat"];
	float centerLng := <float> devGeofence["lng"];
	float maxDistance := <float> devGeofence["radius"];
}

ステップ 4: トリガーの作成

前述したように、現在のデバイスの位置とジオフェンスの中心の間の距離が構成されたジオフェンスの半径よりも大きい場合、デバイスはフェンスの外側にあります。 アラームをトリガーするには 2 つのイベントが必要です。これにより、これら 2 つのイベント内でデバイスがジオフェンスに入ったか、ジオフェンスから出たかを確認できるようになります。

最初のステップでは、前述の関数を使用して距離を計算します。

float d := distance(centerLat, centerLng, eventLat, eventLng);

次に、これをイベントとして再ルーティングします。

event LocationEventWithDistance {
	string source;
	float distance;
	Event e;
	float maxDistance;
}

...

route LocationEventWithDistance(e.source, d, e, maxDistance);

ソースをイベント内に配置して、リスナー内で簡単に照合できるようにします。

次に、イベント LocationEventWithDistance によってトリガーされるリスナーを設定し、同じソースの次の LocationEventWithDistance を監視します。

on all LocationEventWithDistance() as firstPos {
	on LocationEventWithDistance(source = firstPos.source) as secondPos {
		// now have two events with distances
	}
}

この LocationEventWithDistance イベントのペアには、アラームを作成する必要があるかどうかをチェックするためのすべてのデータが保持されます。 最初のイベントと同じソースのものになるように secondPos イベントをフィルタリングしていることに注意してください。イベントを受信したすべてのデバイスにアクティブなリスナーが存在します。

ステップ 5: アラームの作成

アラームを作成するには、最初のイベントの距離が半径より小さく、2 番目のイベントの距離が半径より大きい 2 つのイベントが必要です。 これは、デバイスがジオフェンスから出たばかりであることを意味します。

if firstPos.distance <= firstPos.maxDistance and
	secondPos.distance > secondPos.maxDistance {
	send Alarm("", "c8y_GeofenceAlarm", firstPos.source, currentTime,
					"Device moved out of circular geofence", "ACTIVE",
					"MAJOR", 1, new dictionary<string,any>) to Alarm.SEND_CHANNEL;
}

ステップ 6: アラームのクリア

アラームをクリアするには、下部で条件を切り替え、さらに現在アクティブなアラームを取得してその ID を取得する必要があります。 この時点では、既存のアラームがあるかどうかを気にする必要はありません。 何もない場合、リスナーはand not FindAlarmResponseAckをトリガーし、リスナーを終了します。

monitor.subscribe(FindAlarmResponse.SUBSCRIBE_CHANNEL);
...
if firstPos.distance > firstPos.maxDistance and
    secondPos.distance <= secondPos.maxDistance {
    integer reqId:= integer.getUnique();
    send FindAlarm(reqId, {"source": firstPos.source,
        "status": "ACTIVE", "type": "c8y_GeofenceAlarm"}) to FindAlarm.SEND_CHANNEL;
    on FindAlarmResponse(reqId=reqId) as alarmResponse
       and not FindAlarmResponseAck(reqId=reqId) {
        send Alarm(alarmResponse.id, "c8y_GeofenceAlarm",
                    firstPos.source, currentTime, "Device moved back into circular geofence",
                    "CLEARED", alarmResponse.alarm.severity, 1, new dictionary<string, any>) to Alarm.SEND_CHANNEL;
    }
}

すべてを統合する

すべてのパーツを 1 つのモジュールに結合できるようになりました。 リスナーの順序は関係ありません。

using com.apama.cumulocity.ManagedObject;
using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.Event;
using com.apama.cumulocity.Alarm;
using com.apama.cumulocity.FindManagedObject;
using com.apama.cumulocity.FindManagedObjectResponse;
using com.apama.cumulocity.FindManagedObjectResponseAck;
using com.apama.cumulocity.FindAlarm;
using com.apama.cumulocity.FindAlarmResponse;
using com.apama.cumulocity.FindAlarmResponseAck;

monitor MonitorDevicesForCircularGeofence {

	event LocationEventWithDistance {
		string source;
		float distance;
		Event e;
		float maxDistance;
	}

	action onload {
		monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
		monitor.subscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
		monitor.subscribe(FindAlarmResponse.SUBSCRIBE_CHANNEL);
		on all Event() as e {
			if e.params.hasKey("c8y_Position") {
				// we have an event
				integer reqId := integer.getUnique();
				send FindManagedObject(reqId, e.source, new dictionary<string,string>) to FindManagedObject.SEND_CHANNEL;
				on FindManagedObjectResponse(reqId = reqId) as resp
				and not FindManagedObjectResponseAck(reqId) {
				ManagedObject dev := resp.managedObject;

				if(dev.params.hasKey("c8y_Geofence") and dev.supportedOperations.indexOf("c8y_Geofence") >= 0) {

						dictionary<any, any> evtPos := <dictionary<any, any> > e.params["c8y_Position"];
						float eventLat := <float> evtPos["lat"];
						float eventLng := <float> evtPos["lng"];

						dictionary<any,any> devGeofence := <dictionary<any,any> > dev.params["c8y_Geofence"];
						float centerLat := <float> devGeofence["lat"];
						float centerLng := <float> devGeofence["lng"];
						float maxDistance := <float> devGeofence["radius"];

						float d := distance(centerLat, centerLng, eventLat, eventLng);

						route LocationEventWithDistance(e.source, d, e, maxDistance);
					}
				}
			}
		}

		on all LocationEventWithDistance() as firstPos {
			on LocationEventWithDistance(source = firstPos.source) as secondPos {
				// now have two events with distances
				if firstPos.distance <= firstPos.maxDistance and
					secondPos.distance > secondPos.maxDistance {
					send Alarm("", "c8y_GeofenceAlarm", firstPos.source, currentTime,
							"Device moved out of circular geofence", "ACTIVE",
							"MAJOR", 1, new dictionary<string,any>) to Alarm.SEND_CHANNEL;
				}

				if firstPos.distance > firstPos.maxDistance and
					secondPos.distance <= secondPos.maxDistance {
					integer reqId:= integer.getUnique();
					send FindAlarm(reqId, {"source": firstPos.source,
						"status": "ACTIVE", "type": "c8y_GeofenceAlarm"}) to FindAlarm.SEND_CHANNEL;
					on FindAlarmResponse(reqId=reqId) as alarmResponse
					and not FindAlarmResponseAck(reqId=reqId) {
						send Alarm(alarmResponse.id, "c8y_GeofenceAlarm",
								firstPos.source, currentTime, "Device moved back into circular geofence",
								"CLEARED", alarmResponse.alarm.severity, 1, new dictionary<string, any>) to Alarm.SEND_CHANNEL;
					}
				}
			}
		}
	}

	action distance(float lat1, float lon1, float lat2, float lon2) returns float {
		float R := 6371000.0;
		float toRad := float.PI / 180.0;
		float lat1Rad := lat1 * toRad;
		float lat2Rad := lat2 * toRad;
		float deltaLatRad := (lat2-lat1) * toRad;
		float deltaLonRad := (lat2-lat1) * toRad;
		float a := (deltaLatRad/2.0).sin().pow(2.0) * lat1Rad.cos() * lat2Rad.cos() * (deltaLonRad/2.0).sin().pow(2.0);
		float c := 2.0 * a.sqrt().atan2((1.0-a).sqrt());
		return R * c;
	}
}