リアルタイム処理
Things Cloud では高レベルなリアルタイムプロセス言語を基盤にしたリアルタイム IoT ビジネスロジックを走らせることが可能です。
このセクションでは、リアルタイムプロセスの基本コンセプトを紹介し、Things Cloud 上でどのように独自のビジネスロジックを走らせれば良いのか説明していきます。
Things Cloud では高レベルなリアルタイムプロセス言語を基盤にしたリアルタイム IoT ビジネスロジックを走らせることが可能です。
このセクションでは、リアルタイムプロセスの基本コンセプトを紹介し、Things Cloud 上でどのように独自のビジネスロジックを走らせれば良いのか説明していきます。
「Things Cloud」に加えて、カスタムストリーミング処理機能エンジン「Apama」を使用して、デバイスやその他のデータソースから受信したデータを即座に処理するためのビジネスオペレーションを定義することもできます。これらのユーザー定義の操作には、例えば、新しい受信データのアプリケーションへのアラート、受信したデータに基づく新しい操作の作成(センサーのしきい値を超えたときのアラームの送信など)、またはデバイスに対する操作のトリガーなどがあります。オペレーションロジックは、Apamaのイベント処理言語(EPL)で実装されています。
ApamaのEvent Processing Language(イベント処理言語)は、アクションとモニターに分類されるステートメントをカバーします。モニターファイルは、Things Cloud内からストリーミング分析アプリケーションを使って直接編集できます。あるいは、ローカルマシンにApamaをインストールし、Eclipseベースの開発環境であるSoftware AG Designerを使用してアプリケーションを開発することもできます。モニターファイルをApamaアプリケーションとしてThings Cloudにデプロイできます。詳細については、「カスタムストリーミング処理ガイド」の EPLアプリケーション>基本機能 をご覧ください。
Apamaのイベント処理言語をThings Cloudで使用する方法の詳細については、下記の Apamaイベント処理言語(EPL)の使用 および カスタムストリーミング処理ガイド をご覧ください。
Things Cloudのリアルタイム処理機能には次のようなメリットがあります。
Apama イベント処理言語 はJava言語と構文的に類似しています。if
、while
、for
のようなシンプルなフロー制御ステートメントに加えて、Things Cloud では流れてくるデータ(イベント)に反応するために on
キーワードを使用してリスナーを作成できます。
Apama EPLについては、Apama documentation をご覧ください。
下記のステートメントでは、特定の温度より高い温度データを連続的にセンサーから抽出しています:
on all Measurement(type="c8y_TemperatureMeasurement") as m {
if m.measurements.getOrDefault("c8y_TemperatureMeasurement").getOrDefault("T").value > 100.0 {
Alarm alarm := new Alarm;
alarm.type := "c8y_TemperatureAlert";
alarm.source := m.source;
alarm.time := currentTime;
alarm.text := "Temperature too high";
alarm.status := "ACTIVE";
alarm.severity := "CRITICAL";
send alarm to Alarm.SEND_CHANNEL;
}
}
ここの、Measurement
とはメジャーメントデータを含む事前定義されたイベントです。こちらの例では、m
は Measurement
のイベントであり、c8y_TemperatureMeasurement
のメジャーメントをフィルタリングし、そのプロパティとなるc8y_TemperatureMeasurement.T.value
の温度センサーの摂氏でのリスナーとなります(センサー・ライブラリをご参照ください)。
上記のようなリスナーは、モニターのonload
ステートメントに配置する必要があります。ファイルには、リスナーで使用する型のusing
ステートメントが含まれている必要があります。Things Cloudで利用するイベントのほとんどは、com.apama.cumulocityパッケージに含まれています。すべてのリストを以下に示しますが、以降の例では簡潔にするためにこれらを省略します。
using com.apama.cumulocity.ManagedObject;
using com.apama.cumulocity.Operation;
using com.apama.cumulocity.Event;
using com.apama.cumulocity.Alarm;
using com.apama.cumulocity.Error;
using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.MeasurementValue;
using com.apama.cumulocity.FindAlarm;
using com.apama.cumulocity.FindAlarmResponse;
using com.apama.cumulocity.FindAlarmResponseAck;
using com.apama.cumulocity.FindManagedObject;
using com.apama.cumulocity.FindManagedObjectResponse;
using com.apama.cumulocity.FindManagedObjectResponseAck;
using com.apama.cumulocity.FindMeasurement;
using com.apama.cumulocity.FindMeasurementResponse;
using com.apama.cumulocity.FindMeasurementResponseAck;
using com.apama.cumulocity.FindOperation;
using com.apama.cumulocity.FindOperationResponse;
using com.apama.cumulocity.FindOperationResponseAck;
using com.apama.cumulocity.FindEvent;
using com.apama.cumulocity.FindEventResponse;
using com.apama.cumulocity.FindEventResponseAck;
using com.apama.cumulocity.SendEmail;
using com.apama.cumulocity.SendSMS;
using com.apama.cumulocity.Util;
using com.apama.util.AnyExtractor;
using com.apama.correlator.timeformat.TimeFormat;
using com.softwareag.connectivity.httpclient.HttpOptions;
using com.softwareag.connectivity.httpclient.Request;
using com.softwareag.connectivity.httpclient.RequestType;
using com.softwareag.connectivity.httpclient.Response;
monitor ListenForHighTemperatures {
action onload() {
on all Measurement(type="c8y_TemperatureMeasurement") as e {
if e.measurements.getOrDefault("c8y_TemperatureMeasurement").getOrDefault("T").value > 100.0 {
// handle the measurement
}
}
}
}
新しいアラームまたはオペレーションを作成するには、関連するイベント型のインスタンスを作成し、send
ステートメントで関連するチャネル(イベント型の定数で定義)に送信します。例えば、温度が定義された値を超えるとただちに発生するアラームが必要な場合、下記のようなステートメントで実行可能になります。
on all Measurement(type="c8y_TemperatureMeasurement") as m {
if m.measurements.getOrDefault("c8y_TemperatureMeasurement").getOrDefault("T").value > 100.0 {
send Alarm("","c8y_TemperatureAlert",m.source,currentTime,"Temperature too high","ACTIVE","CRITICAL",1,new dictionary<string,any>) to Alarm.SEND_CHANNEL;
}
}
技術的に説明すると、このステートメントは温度センサーが摂氏100℃を超えるたびに新しい Alarmイベントを作成し、それをThings Cloudに送信しています。
Things Cloud の遠隔制御は派生データの1つとなります。例えば、デバイス上でオペレーションを走らせたい場合、デバイスをターゲットにした新しいオペレーションを作成します。下記では温度計の温度によるスイッチの動きになります。
on all Measurement(type="c8y_TemperatureMeasurement") as m {
if m.measurements.getOrDefault("c8y_TemperatureMeasurement").getOrDefault("T").value > 100.0 {
send Operation("",m.source,"PENDING",{"c8y_Relay":<any>{"relayState":"CLOSED"}}) to Operation.SEND_CHANNEL;
}
}
m.source
はトリガーとなる温度IDのプレースホルダーとなります。c8y_Relay
のオペレーションを作成し、relayState
をCLOSEDに設定します。最上位フィールドは dictionary<string, any>
である必要があるため、<any>
にキャストすることに注意してください。進行中のイベントのプロセスの一部として時々関連情報を Things Cloud データベースに問い合わせすることがあります。これは、イベントを送信し、リスナーを使用してレスポンスを待機することでサポートされます。下記は特定の顧客に一時間ごとに自動販売機の総売上高を集計する場合の例になります。顧客データが購入後にトリガーされる売上レポートは、Things Cloud のデータベースから取得する必要があります。
using com.apama.aggregates.count;
monitor SalesReport {
event SalesReport {
Event e;
ManagedObject customer;
}
event SalesOutput {
integer count;
string customerId;
}
action onload() {
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
on all Event() as e {
monitor.subscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
integer reqId := integer.getUnique();
on all FindManagedObjectResponse(reqId=reqId) as mor and not FindManagedObjectResponseAck(reqId=reqId) {
route SalesReport(e, mor.managedObject);
}
on FindManagedObjectResponseAck(reqId=reqId) {
monitor.unsubscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
}
send FindManagedObject(reqId,"",{"childAssetId":e.source}) to FindManagedObject.SEND_CHANNEL;
}
from sr in all SalesReport() within 3600.0 every 3600.0
group by sr.customer.id
select SalesOutput(count(), sr.customer.id) as sales {
send Measurement("", "total_cust_trx", "customer_trx_counterId", currentTime,
{
"total_cust_trx":{
"total":MeasurementValue(sales.count.toFloat(), "COUNT", new dictionary<string,any>)
}
}, {"customer_id":<any> sales.customerId}) to Measurement.SEND_CHANNEL;
}
}
}
上記の例では、まずSalesReport
イベントと SaleOutput
イベントの定義を作成します。これらは、SalesReport
(個々の売上を識別するEvent
と ManagedObject
)と、一連の売上情報から取得したい派生情報のcount
と customerId
を保持します。Event
オブジェクトを監視し、FindManagedObject
リクエストを送信して、イベントの発生元のManagedObject
を検索します。これらの SalesReport
オブジェクトは、routeステートメントを介してストリームクエリに送信されます。ストリームクエリは1時間(3,600秒)ごとに実行され、顧客ごとの売上データを集計し、その自動販売機の売上を表す新しいメジャーメントを送信します。
Things Cloud は、API リクエストに対していくつかの処理モードを提供します。
persistent - データをThings Cloudデータベースに格納し、リアルタイムエンジンにデータを送信します。その後、Things Cloudはリクエストの結果を返します。これがデフォルトのモードです。
transient - リアルタイムエンジンにデータを送信し、すぐに非同期で返し、Things Cloudのデータベースには保存しません。このモードは、ストレージと処理のコストを節約し、例えばデータを保存せずにデバイスをリアルタイムで追跡する場合に便利です。
quiescent - リアルタイム通知が送信されない点を除き、persistentモードと同様に動作します。quiescentモードは、メジャーメントおよびイベントにのみ適用できます。
cep - リアルタイム通知が送信されないことを除き、transientモードと同様に動作します。現在は、メジャーメントとイベントにのみ適用されます。
車からの位置情報の更新は、車が運転している間は1秒ごとに監視されますが、レポートの目的でデータベースに保存されるのは1分に1回のみであるとします。これには、次のステートメントを使用します。
using com.apama.cumulocity.Event;
using com.apama.cumulocity.Measurement;
monitor SendEveryMinute {
dictionary<string, Event> latestUpdates;
action onload() {
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
on all Event() as e {
if e.params.hasKey("c8y_LocationUpdate") {
latestUpdates[e.source] := e;
}
}
on all wait(60.0) {
Event e;
for e in latestUpdates.values() {
send e to Event.SEND_CHANNEL;
}
latestUpdates.clear();
}
}
}