Things Cloudでのリアルタイム処理とは?

「Things Cloud」に加えて、カスタムストリーミング処理機能エンジン「Apama」を使用して、デバイスやその他のデータソースから受信したデータを即座に処理するためのビジネスオペレーションを定義することもできます。これらのユーザー定義の操作には、例えば、新しい受信データのアプリケーションへのアラート、受信したデータに基づく新しい操作の作成(センサーのしきい値を超えたときのアラームの送信など)、またはデバイスに対する操作のトリガーなどがあります。オペレーションロジックは、Apamaのイベント処理言語(EPL)で実装されています。

ApamaのEvent Processing Language(イベント処理言語)は、アクションとモニターに分類されるステートメントをカバーします。モニターファイルは、Things Cloud内からストリーミング分析アプリケーションを使って直接編集できます。あるいは、ローカルマシンにApamaをインストールし、Eclipseベースの開発環境であるSoftware AG Designerを使用してアプリケーションを開発することもできます。モニターファイルをApamaアプリケーションとしてThings Cloudにデプロイできます。詳細については、「カスタムストリーミング処理ガイド」の EPLアプリケーション>基本機能 をご覧ください。

Apamaのイベント処理言語をThings Cloudで使用する方法の詳細については、下記の Apamaイベント処理言語(EPL)の使用 および カスタムストリーミング処理ガイド をご覧ください。

リアルタイム処理を使用するメリット

Things Cloudのリアルタイム処理機能には次のようなメリットがあります。

Apama イベント処理言語(EPL)の使用

概要

Apama イベント処理言語 はJava言語と構文的に類似しています。ifwhileforのようなシンプルなフロー制御ステートメントに加えて、Things Cloud では流れてくるデータ(イベント)に反応するために onキーワードを使用してリスナーを作成できます。

Apama EPLについては、Apama documentation をご覧ください。

下記のステートメントでは、特定の温度より高い温度データを連続的にセンサーから抽出しています:

on all Measurement(type="c8y_TemperatureMeasurement") as m {
    if m.measurements.getOrDefault("c8y_TemperatureMeasurement").getOrDefault("T").value > 100.0 {
        Alarm alarm    := new Alarm;
        alarm.type     := "c8y_TemperatureAlert";
        alarm.source   := m.source;
        alarm.time     := currentTime;
        alarm.text     := "Temperature too high";
        alarm.status   := "ACTIVE";
        alarm.severity := "CRITICAL";
        send alarm to Alarm.SEND_CHANNEL;
    }
}

ここの、Measurement とはメジャーメントデータを含む事前定義されたイベントです。こちらの例では、mMeasurementのイベントであり、c8y_TemperatureMeasurementのメジャーメントをフィルタリングし、そのプロパティとなるc8y_TemperatureMeasurement.T.valueの温度センサーの摂氏でのリスナーとなります(センサー・ライブラリをご参照ください)。

上記のようなリスナーは、モニターのonloadステートメントに配置する必要があります。ファイルには、リスナーで使用する型のusingステートメントが含まれている必要があります。Things Cloudで利用するイベントのほとんどは、com.apama.cumulocityパッケージに含まれています。すべてのリストを以下に示しますが、以降の例では簡潔にするためにこれらを省略します。

using com.apama.cumulocity.ManagedObject;
using com.apama.cumulocity.Operation;
using com.apama.cumulocity.Event;
using com.apama.cumulocity.Alarm;
using com.apama.cumulocity.Error;
using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.MeasurementValue;
using com.apama.cumulocity.FindAlarm;
using com.apama.cumulocity.FindAlarmResponse;
using com.apama.cumulocity.FindAlarmResponseAck;
using com.apama.cumulocity.FindManagedObject;
using com.apama.cumulocity.FindManagedObjectResponse;
using com.apama.cumulocity.FindManagedObjectResponseAck;
using com.apama.cumulocity.FindMeasurement;
using com.apama.cumulocity.FindMeasurementResponse;
using com.apama.cumulocity.FindMeasurementResponseAck;
using com.apama.cumulocity.FindOperation;
using com.apama.cumulocity.FindOperationResponse;
using com.apama.cumulocity.FindOperationResponseAck;
using com.apama.cumulocity.FindEvent;
using com.apama.cumulocity.FindEventResponse;
using com.apama.cumulocity.FindEventResponseAck;
using com.apama.cumulocity.SendEmail;
using com.apama.cumulocity.SendSMS;
using com.apama.cumulocity.Util;
using com.apama.util.AnyExtractor;
using com.apama.correlator.timeformat.TimeFormat;
using com.softwareag.connectivity.httpclient.HttpOptions;
using com.softwareag.connectivity.httpclient.Request;
using com.softwareag.connectivity.httpclient.RequestType;
using com.softwareag.connectivity.httpclient.Response;

monitor ListenForHighTemperatures {
    action onload() {
        on all Measurement(type="c8y_TemperatureMeasurement") as e {
            if e.measurements.getOrDefault("c8y_TemperatureMeasurement").getOrDefault("T").value > 100.0 {
                // handle the measurement
            }
        }
    }
}

派生データはどのように作成したらいいのか?

新しいアラームまたはオペレーションを作成するには、関連するイベント型のインスタンスを作成し、sendステートメントで関連するチャネル(イベント型の定数で定義)に送信します。例えば、温度が定義された値を超えるとただちに発生するアラームが必要な場合、下記のようなステートメントで実行可能になります。

on all Measurement(type="c8y_TemperatureMeasurement") as m {
    if m.measurements.getOrDefault("c8y_TemperatureMeasurement").getOrDefault("T").value > 100.0 {
        send Alarm("","c8y_TemperatureAlert",m.source,currentTime,"Temperature too high","ACTIVE","CRITICAL",1,new dictionary<string,any>) to Alarm.SEND_CHANNEL;
    }
}

技術的に説明すると、このステートメントは温度センサーが摂氏100℃を超えるたびに新しい Alarmイベントを作成し、それをThings Cloudに送信しています。

どのようにデバイスを制御すればよいですか?

Things Cloud の遠隔制御は派生データの1つとなります。例えば、デバイス上でオペレーションを走らせたい場合、デバイスをターゲットにした新しいオペレーションを作成します。下記では温度計の温度によるスイッチの動きになります。

on all Measurement(type="c8y_TemperatureMeasurement") as m {
    if m.measurements.getOrDefault("c8y_TemperatureMeasurement").getOrDefault("T").value > 100.0 {
        send Operation("",m.source,"PENDING",{"c8y_Relay":<any>{"relayState":"CLOSED"}}) to Operation.SEND_CHANNEL;
    }
}

どのようにデータを問い合わせればよいですか?

進行中のイベントのプロセスの一部として時々関連情報を Things Cloud データベースに問い合わせすることがあります。これは、イベントを送信し、リスナーを使用してレスポンスを待機することでサポートされます。下記は特定の顧客に一時間ごとに自動販売機の総売上高を集計する場合の例になります。顧客データが購入後にトリガーされる売上レポートは、Things Cloud のデータベースから取得する必要があります。

using com.apama.aggregates.count;

monitor SalesReport {
    event SalesReport {
        Event e;
        ManagedObject customer;
    }
    event SalesOutput {
        integer count;
        string customerId;
    }

    action onload() {

        monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);

        on all Event() as e {
            monitor.subscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
            integer reqId := integer.getUnique();
            on all FindManagedObjectResponse(reqId=reqId) as mor and not FindManagedObjectResponseAck(reqId=reqId) {
                route SalesReport(e, mor.managedObject);
            }
            on FindManagedObjectResponseAck(reqId=reqId) {
                monitor.unsubscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
            }
            send FindManagedObject(reqId,"",{"childAssetId":e.source}) to FindManagedObject.SEND_CHANNEL;
        }

        from sr in all SalesReport() within 3600.0 every 3600.0
            group by sr.customer.id
            select SalesOutput(count(), sr.customer.id) as sales {
            send Measurement("", "total_cust_trx", "customer_trx_counterId", currentTime,
                {
                    "total_cust_trx":{
                        "total":MeasurementValue(sales.count.toFloat(), "COUNT", new dictionary<string,any>)
                    }
                }, {"customer_id":<any> sales.customerId}) to Measurement.SEND_CHANNEL;
        }
    }
}

上記の例では、まずSalesReportイベントと SaleOutputイベントの定義を作成します。これらは、SalesReport(個々の売上を識別するEventManagedObject)と、一連の売上情報から取得したい派生情報のcountcustomerIdを保持します。Eventオブジェクトを監視し、FindManagedObjectリクエストを送信して、イベントの発生元のManagedObjectを検索します。これらの SalesReportオブジェクトは、routeステートメントを介してストリームクエリに送信されます。ストリームクエリは1時間(3,600秒)ごとに実行され、顧客ごとの売上データを集計し、その自動販売機の売上を表す新しいメジャーメントを送信します。

どのようにリアルタイム処理が実施されていますか?

Things Cloud は、API リクエストに対していくつかの処理モードを提供します。

CEPアーキテクチャ

車からの位置情報の更新は、車が運転している間は1秒ごとに監視されますが、レポートの目的でデータベースに保存されるのは1分に1回のみであるとします。これには、次のステートメントを使用します。

using com.apama.cumulocity.Event;
using com.apama.cumulocity.Measurement;

monitor SendEveryMinute {

    dictionary<string, Event> latestUpdates;

    action onload() {

        monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
        on all Event() as e {
            if e.params.hasKey("c8y_LocationUpdate") {
                latestUpdates[e.source] := e;
            }
        }

        on all wait(60.0) {
            Event e;
            for e in latestUpdates.values() {
                send e to Event.SEND_CHANNEL;
            }
            latestUpdates.clear();
        }
    }
}