CEPスクリプトを書く(応用編)

重要事項: イベント処理機能(Esper)の新規利用は終了し、ご利用中のお客様におかれましてはApama CEPエンジンへの切替完了後、順次サポートを終了いたします。Apama によるカスタムストリーミング処理機能の詳細は カスタムストリーミング処理ガイド をご覧ください。
本章の記載内容は新規利用終了済みのイベント処理機能(Esper)に関する記述になりますのでご注意ください。

投稿日 / 更新日 / 投稿者名

2018.8.1 / 2018.8.28 / sasa

はじめに

本レポートは、Things Cloud の利用例をより知っていただくための実利用レポートとして作成したものです。
Things Cloud は極力後方互換性を持つよう開発されていますが、今後のリリースにより、一部画面やコマンド、手順などが変更となったり、利用できない可能性があることをあらかじめご了承ください。
なお、作成にあたり、以下バージョンを用いています。

  • ver.8.15.8(backend/UI)

難易度 ★★★


所要時間

2時間

前提条件

概要

本レポートでは、リアルタイムイベント処理を行う Things Cloud のイベント言語(CEPスクリプト)の応用について解説します。基本編をマスターしている人が対象です。
イベント言語を利用することで、複雑な条件で Things Cloud のデータを加工、変更することができるようになります。

このレポートでは、以下について説明しています。

CEP応用

基本形をマスターしましたが、普通はもっと複雑な処理や条件がつくことが多いでしょう。 ここでは基本形をベースに他の記載について使える技を紹介します。

記法と関数

コメント

C++/C タイプのコメントを入れることができます。

書式 説明
// 同じ行の // 以降がコメントとなります insert into CreateEvent // イベントを生成
/* ~ */ /* から */ までがコメントとなります。複数行にわたってコメントすることができます /*
* これはコメント
*/

@Name

ステートメントの最初に @Name("文字列") をつけることで、Things Cloud のイベント処理の「デバッグとモニタリング」の「通知タブ」で実行されたステートメントが指定した文字列で表示されます。ステートメントの処理内容を簡潔に示す名前をつけましょう。

例:

@Name("温度が40℃を超えたらAlarm生成")
insert into CreateAlarm
select
	"hotEvent" as type,
	:
	:

通知タブでスクリプト実行状態を確認

Things Cloud のイベント処理の「デバッグとモニタリング」の「通知タブ」では、随時処理されたステートメントが表示されます。ステートメントをクリックすると、処理時 select 句で取得された値が表示されます。デバッグで便利なので、積極的に活用してください。

また、単に select 句を書くと、処理時のイベントの値を参照することができます。ステートメント実行時の値を取得するのみで、処理内容に影響を与えないため、やはりデバッグに便利です。

例:EventCreated のイベント発生時、EventCreated のオブジェクト内容を「通知タブ」に表示させます。

@Name("debug:イベントの内容を表示")
select * from EventCreated;

Nameタグと通知タブの利用例です。

フラグメントの指定

Measurement, ManagedObject などを生成する場合、フラグメント(自由定義できる要素)への値の指定は、以下の書式で行います。

書式:

{
	key1, value1,
	key2, value2,
	key3, value3
} as fragments

例:Event が発生したら、Measurement を生成

insert into CreateMeasurement
select
  "12345" as source,
  "c8y_TemperatureMeasurement" as type,
  current_timestamp().toDate() as time,
  {
    "c8y_TemperatureMeasurement.T1.value", 1,
    "c8y_TemperatureMeasurement.T1.unit", "C"
  } as fragments
from EventCreated;

上の例では、何らかの Event が発生したら、id “12345” のデバイスに紐づく Measurement を生成します。Measurement は、以下の JSON 形式で設定します。(これは REST-API で post する際 の body の記法です)

{
	"source": {
		"id": "12345"
	},
	"type": "c8y_TemperatureMeasurement",
	"time": <<現在時刻>>,
	"c8y_TemperatureMeasurement":{
		"T1":{
			"value":1,
			"unit":"C"
		}
	}
}

Things Cloud におけるフラグメントは JavaScript の要素のように後から追加で指定できる(事前に型宣言が不要な)要素です。CEPでは、Java の型アーキテクチャがベースになっているおり、このような要素は型の要素として明示的には定義されていない場合があります。 例えば、上記例では “c8y_TemperatureMeasurement” という要素は CreateMeasurement の中で宣言されていないため、type や time などのように指定しようとするとエラーとなります。

逆に取得時も同様で、MeasurementCreated の measurement 要素に含まれていないため、measurement.c8y_TemperatureMeasurement のように参照しようとするとやはりエラーとなります。 このため CEP では fragments という名の要素にリスト形式で与えることで指定し、値を取得する場合は getObject(), getNumber(), getString() といった関数を利用して取得します。

配列、Map の表し方

配列の例:配列は中括弧{}の中に要素をカンマで区切って記述します。この値は java.lang.Object[] (のサブクラス)になります。上述のfragments の指定で利用した記法です。

{ 1, 2, 3, 4, 5, 6 } // 1~6の値を含む、要素数6の配列を表します

マップの例:マップは、中括弧{}の中に key:value をカンマで区切って記述します。key は文字列です。この値は java.util.Map (LinkedHashMap) のインスタンスになります。

{
  "key1":value1,
  "key2":value2
}

Java関数

各オブジェクトでJava言語でのメソッドをコールできます。パッケージ java.lang java.math java.text java.util は import 済みのため、javaパッケージ名を省略可能です。

例:加速度の絶対値を java.lang.Math.abs() 関数で取得する。

select
  Math.abs(getNumber(m.measurement,"c8y_AccelerationMeasurement.acceleration.value").doubleValue())
from
  MeasurementCreated m;

組み込み関数(単独データ用)

組み込み関数は、select句、where句、from句で利用します。

関数 使い方
Long current_timestamp() 現在時刻を得ます。current_timestamp().toDate()で Things Cloud の time フィールドに設定できる型(java.util.Date)になります(CEP独自の記法)。
ManagedObject findManagedObjectById(String) 指定された id のマネージドオブジェクトを Things Cloud データベースから取得します
Object getObject(Object, String) 指定されたオブジェクトの指定されたパスの値を取得します。Things Cloud のフラグメントは Java のメンバ変数でなく、直接 object.member のような参照ができないことがあるため、関数を使用します。
<classの型> cast(Object, class) 指定されたオブジェクトを第2引数の型に変換を試みます。classはint, long, String のようなJavaの型名で指定します。
boolean inMaintenanceMode(com.cumulocity.model.ID) 指定されたIDのデバイスがメンテナンス中かを Things Cloud データベースから取得します
Number getNumber(Object, String[, Number defaultValue]) 指定した Object の String で示される path にある fragment 値を数値として取得します。定義済み要素の値も取得できます。
String getString(Object, String[, String defaultValue]) 指定した Object の String で示される path にある fragment 値を文字列として取得します。定義済み要素の値も取得できます。
Object getObject(Object, String[, Object defaultValue]) 指定した Object の String で示される path にある fragment 値をオブジェクトとして取得します。通常 cast 関数を併用します。定義済み要素の値も取得できます。
Date getDate(Object, String[, Date defaultValue]) 指定した Object の String で示される path にある fragment 値を Date として取得します。定義済み要素も取得できます。
List getList(Object, String[, List defaultValue]) 指定した Object の String で示される path にある fragment 値をListとして取得します。定義済み要素の値も取得できます。
String toJSON(Object) 指定した Object を JSON 文字列に変換します。

例:type=“nttcom_showText” の Event が発生したら、id “46617” の ManagedObject を Things Cloud から取得し、フラグメント “eventText” の値を text 値として持つ Event を生成。

@Name("show text")
insert into CreateEvent
select
	"nttcom_SampleEvent" as type,
	e.event.source as source,
	current_timestamp().toDate() as time,
	getString(findManagedObjectById("46617"), "eventText") as text
from
	EventCreated e
where
	e.type = "nttcom_ShowText";

時間に関する便利関数

Event や Measurement にある time は getTime() メソッドで long 値のミリ秒に変換することができます。この long 値に対し、以下のメソッドが提供されています。

関数 使い方
getMillisOfSecond() ミリ秒部分を取得します。0~999 の数値になります。
getSecondOfMinute() 秒の部分を取得します。0~59 の数値になります。
getMinuteOfHour() 分の部分を取得します。0~59 の数値になります。
getHourOfDay() 時の部分を取得します。0~23 の数値になります。
getDayOfWeek() 曜日を取得します。日(1)、月(2)、火(3)、水(4)、木(5)、金(6)、土(7)の数値になります。
getDayOfMonth() 日を返します。1~31の数値になります。
getDayOfYear() 年の何日目かを示します。年の最初の日は値1です。
getWeekYear() 現在の年の週番号(1~)を返します。
getMonthOfYear() 月を示す数値を取得します。1月(0)~12月(11)となります。
getYear() 年を示す数値を取得します。
getEra() ユリウス暦のADまたはBCなどの年代を取得します。

例:Event の time が16時となるもののみ抽出する

select
	e.event
from
	EventCreated e
where
	e.event.time.getTime().getHourOfDay() = 16;

ストリーム/関数/変数定義とfilter

create schema

create schema は、ユーザー定義のストリームを作成します。ユーザー定義のストリームは、入力ストリーム/出力ストリームとして利用できます。複雑な処理を行う場合や、さまざまなデータを扱う場合に、いったんユーザー定義のストリームに中間情報を保存することで、見通しのよいスクリプトを作成することができます。

書式:

create schema <<ストリーム名>>(
	<<変数名1>> <<型1>>,
	<<変数名2>> <<型2>>,
	<<変数名3>> <<型3>>,
	:
);

例:String型のtype, source, double型の value を要素として持つストリーム MyStream を定義

create schema MyStream (
	type String,
	source String,
	value double
);

とくに決まりではありませんが、ストリーム名は UpperCamelCase、変数名は lowerCamelCase にすると見やすいコードになります。

名前のスコープについて。ストリーム名や今後出てくる変数名、関数名、コンテキスト名などは、同じテナント全体で利用されます。別のスクリプトから呼び出すこともできますが、テナント内で名前が重複しないよう注意してください。

create expression

複雑な処理を行うため、ユーザー定義の関数を定義することができます。この関数は JavaScript で記述します。通常のステートメントでは Java の型を使って記述しますが、関数内の言語は JavaScript なので、注意してください。引数や返り値では、Java - JavaScript 間で暗黙の型変換が行われます。

書式:

create expression <<返却型>> <<関数名>>(<<引数の変数名>>) [
	<<JavaScriptコード>>
];

返却型は省略可能です。省略した場合、利用されているステートメントから型が推定されます。

返り値は return 文で指定するのではなく、コードの最後の式の評価値が返り値として扱われます。次の例を見てください。

例:ヘキサ文字列を入力し、2バイト目の値を返す関数

create expression pickSecondByte(a) [
	var r;
	if (a == null || a.length() < 4) {
		r = -1;
	} else {
		r = parseInt(a.substring(2,4), 16);
	}
	output = r;
];

この例では最後の式を、output = r; としていますが、この関数の結果は最後の式の評価値 r になります。最後の文を、単に r; と書くこともできます。

この JavaScript は Java で記述された JavaScript エンジン Rhino で実行されています。したがって、JavaScript コード中で Java のクラスを利用することもできます。 JavaScript コード内で Java クラスを利用する場合は、パッケージ名を含む完全修飾 クラス名を利用します。Rhino で準備されている importPackage() や importClass() を利用するとパッケージ名を省略することができます。 ステートメントと異なり、import 済みの java パッケージはありません。

expression の返り値(JavaScriptの型)で指定できる Java クラス型の例を以下に記載します。以下は有用と思われる例ですので、ご注意ください。

JavaScript型(返り値) 返却型の指定例(Java型)
number double, java.lang.Double
int, java.lang.Integer
string java.lang.String
Array java.util.List
Object java.util.Map

create variable

create variable は変数を設定できます。定数を事前定義すれば、わかりやすくメンテナンスしやすいコードになります。

書式:

create variable <<Javaの型名>> <<変数名>> = <<設定する値>>;

例:

create variable String EVENT_TYPE_BASE = "nttcom_UplinkRequest";

これで、以降のステートメント中で String 型の変数 EVENT_TYPE_BASE が利用できるようになります。決まりではありませんが、定数は UPPER_SNAKE_CASE、変数は lowerCamelCase にすると見やすいコードになります。

変数を格納することもできます。

例:Javaオブジェクトを格納して乱数を得る例(5秒間隔で乱数を発生させる)

create variable Random random = new Random(5);
select
	random.nextInt()
from
  pattern [every timer:interval(5 seconds)];

from句に記述されているpattern句は後述しますが、この例では 5 秒ごとに空のイベントを発生させています。

filter

フィルターは、ストリームに対し条件を指定してイベントを取捨選択します。where 節と似ていますが、違いがあります(後述)。

書式:

<<ストリーム名>>(フィルターの論理式)

または

<<変数名>>=<<ストリーム名>>(フィルターの論理式)

論理式の中では、ストリームに含まれる要素や、変数を記述できます。

例:type=“c8y_TemperatureMeasurement” のイベントのみにフィルタリングします。

m=MeasurementCreated(measurement.type="c8y_TemperatureMeasurement")

上述はイベントを変数に入れているため、m=MeasurementCreated(m.measurement.type="c8y_TemperatureMeasurement") と書くこともできます。

フィルターの論理式で利用できる演算子、記法

演算子 意味
= type=“c8y_Temperature” などしい場合に真(抽出されます)
!= type!=“nttcom_Acceleration” などしくない場合に真
<, >, <=, >= getNumber(measurement, “nttcom_Speed.v.value”) < 40 measurementのfragment “nttcom_Speed.v.value” の値が 40 未満であれば真
in x in [0:10] xの値が0以上10以下の場合に真となります
in x in [0:10) xの値が0以上10未満の場合に真となります。カギカッコは含む、丸カッコは含まない
in type in (’nttcom_Speed_x’, ’nttcom_Speed_y') typeの値が’nttcom_Speed_x’, ’nttcom_Speed_y’のいずれかの場合に真となります
between value between 0 and 10 valueの値が0以上10以下の場合に真となります
and (条件式1) and (条件式2) 左記では、条件式1, 条件式2 がともに真の場合に真となります。単に (条件式1), (条件式2) とカンマで区切った場合も同じ意味になります。
or (条件式1) or (条件式2) 左記では、条件式1, 条件式2 のいずれかが真の場合に真となります。

Window

window

ウィンドウ、は現在を基準として、特定の範囲のイベントを表します。処理対象として from 句で利用します。いままで利用していた ストリーム は時間制約のないウィンドウと見なされます。

  • MeasurementCreated … ステートメントが処理開始してから現在までのすべてのMeasurement生成イベント(のウィンドウ)

window

書式: window は from 句で指定します

:
from <<ストリーム名>>.win:<<windowビューの名前>>(<<引数>>)
:

例:直近10秒間の MeasurementCreated のウィンドウ

:
from MeasurementCreated.win:time(10 seconds)
:
windowビューの書式 説明 トリガータイミング
.time(«期間表現») 直近の期間表現に含まれるイベントを出力します。1つ1つのイベント到着ごとに指定期間以前までのイベントがまとめて出力されます .win:time(1 day)
.win:time(2 months)
.win:time(1 second 500 msec)
.win:time(1 hour 20 minutes 5 seconds)
イベント到着ごと
.length(«整数表現») 整数で示される数分のwindowを示します。1つ1つのイベント到着ごとに過去の指定個数分のイベントがまとめて出力されます .win:length(5) イベント到着ごと
.time_batch(«期間表現») 期間表現の間イベントをバッファリングし、期間表現のタイミングごとにまとめてイベントが発生します。 .win:time_batch(10 seconds) 期間表現(左の例では10秒)ごと
.length_batch(«整数表現») 指定された数のイベントが来るまでバッファリングし、集まったらイベントが発生します。 .win:length_batch(100) 整数表現回数イベント(左の例では100イベント)発生ごと
.time_length_batch(«期間表現»,«整数表現») 指定された期間表現、または到着イベント数が整数表現に達するまでバッファリングし、まとめてイベントとして出力します。 .win:time_length_batch(1 hour, 500) 期間表現、または整数表現回数イベントごと

標準window関数

ここでは、有用なwindowビューを取得する関数を紹介します。

windowビューの書式 説明
std:unique(«プロパティ») プロパティで示される値が異なるもの毎に、最後のイベントを保持するwindowを返します。プロパティはカンマで区切って複数指定することもできます。 EventCreated.std:unique(event.source.value)
std:uniqueに詳しく解説しています
std:firstunique(«プロパティ») プロパティで示される値が異なるもの毎に、最初のイベントを保持するwindowを返します。プロパティはカンマで区切って複数していすることもできます。 EventCreated.std:firstunique(event.source.value)
std:size(«プロパティ») window に含まれる要素の個数を size プロパティとして返します。実際には size プロパティを持つイベント1つからなる window として返します。プロパティを指定すると、個数とともにプロパティの値も取得することができます。 select size from MeasurementCreated.win:time(1 min).std:size();
select size,type from MeasurementCreated.win:time(1 min).std:size(type);
std:lastevent() 最後のイベント1つからなるwindowを返します。 AlarmCreated.std:lastevent()
std:firstevent() 最初のイベント1つからなるwindowを返します。 EventCreated.std:firstevent()

filter と where

filter は、where と同じように処理対象のイベントを取捨選択しますが、window を指定する場合に挙動が異なります。

例1:window 指定と filter を併用

select window(*)
from EventCreated(event.type="nttcom_SomeEvent").win:length(5)

例2:window 指定と where を併用

select window(*)
from EventCreated.win:length(5) e
where e.event.type="nttcom_CertainEvent"

上記の2つの例は挙動が異なります。filter は window 生成前に作用し、where は window 生成後に作用します。window(*) は window の要素すべてを取得する集約関数です。

type=“nttcom_CertainEvent” となるイベントを★、他のイベントを☆と表記します。

例1:window に入れる対象が filter されます。

時間 発生イベント filter window トリガー
1 ★1 →(通過) ★1 トリガー
2 ★2 →(通過) ★1★2 トリガー
3 ☆3 ×(破棄) ★1★2
4 ★4 →(通過) ★1★2★4 トリガー
5 ★5 →(通過) ★1★2★4★5 トリガー
6 ☆6 ×(破棄) ★1★2★4★5
7 ★7 →(通過) ★1★2★4★5★7 トリガー

例2 :window に入る場合、出る場合それぞれトリガーします

時間 発生イベント window where トリガー
1 ★1 ★1 →(通過) トリガー
2 ★2 ★1★2 →(通過) トリガー
3 ☆3 ★1★2☆3 ×(破棄)
4 ★4 ★1★2☆3★4 →(通過) トリガー
5 ★5 ★1★2☆3★4★5 →(通過) トリガー
6 ☆6 ★2☆3★4★5☆6 ×(破棄) トリガー(*)
7 ★7 ☆3★4★5☆6★7 →(通過) トリガー

例1では、window の要素はすべて条件を満たしたものが格納されますが、例2ではwindowにすべての要素が格納されます。

(*) 以下のように、select句で単一イベントを参照する場合、トリガーしません。

select e from EventCreated.win:length(5) e where e.event.type="nttcom_CertainEvent";

pattern

pattern は from 句で利用することができ、特定の順序/条件でのイベント、定刻タイミングでの起動、など複雑なトリガー条件を指定することができます。

例:Event が生成されてから、同じsource値をもつ Alarm が生成された場合にトリガー

from pattern[every (e=EventCreated -> a=AlarmCreated(alarm.source = e.event.source))];

every は続く条件が発生したら、また条件検出をやり直すことを示します。 every をつけないと、デプロイ後最初の条件の発生1回のみが検知されます。

例:type=“openEvent” の後に、同一 source の “closeEvent” が来た場合にトリガー

from pattern[every e=EventCreated(event.type="openEvent") -> f=EventCreated(event.source = e.event.source and event.type="closeEvent")]

上記の例では、every の後が ( ) でくくられていないことに注意してください。 この場合、pattern 句は openEvent を見つけるたびに、それぞれの openEvent に対応する同一 source の closeEvent を待ちます。 every の後を ( ) でくくると、pattern 句は openEvent を検知し,次に source が同一の closeEvent が来るまで待ちます。そしてそれが来たらまた openEvent の検知に戻る動きになります。closeEvent が来るのを待っている間、別の openEvent が来ても無視されます。

次のようにイベントが到着する場合を考えます。o は openEvent, x は closeEvent を示し、カッコ内の数値は source 値です。

時間 0 1 2 3 4 5
イベント o(1) o(2) o(3) x(1) x(3) x(2)

カッコがない場合

every e=EventCreated(event.type="openEvent") -> f=EventCreated(event.source = e.event.source and event.type="closeEvent")
時間 0 1 2 3 4 5
イベント o(1) o(2) o(3) x(1) x(3) x(2)
CEPエンジンの動き o(1)に対し条件検知開始,即座に次のoを待つ o(2)に対し条件検知開始,即座に次のoを待つ o(3)に対し条件検知開始,即座に次のoを待つ o(1)→x(1)でトリガー o(3)→x(3)でトリガー o(2)→x(2)でトリガー

カッコがある場合

> every (e=EventCreated(event.type="openEvent") -> f=EventCreated(event.source = e.event.source and event.type="closeEvent") )
時間 0 1 2 3 4 5
イベント o(1) o(2) o(3) x(1) x(3) x(2)
CEPエンジンの動き o(1)に対し条件検知開始, x(1)を待つ xでないので無視 xでないので無視 o(1)→x(1)でトリガー, 次のoを待つ oでないので無視 oでないので無視

例:5分ごとにトリガーする(5分ごとに空のイベントを自動トリガー)

from pattern [every timer:interval(5 minutes)];

例:cron のように定刻起動できます

// timer:at(minutes, hours, daysOfMonth, month, daysOfWeek, (オプション) seconds)
// minutes: 0-59
// hours: 0-23
// daysOfMonth: 1-31
// month: 1-12
// daysOfWeek:     0 (日曜) - 6 (土曜)
// seconds: 0-59

from pattern [every timer:at(*, *, *, *, *)]; // 毎分トリガー
from pattern [every timer:at(*, *, *, *, *, *)]; // 毎秒トリガー
from pattern [every timer:at(*/10, *, *, *)]; // 10分ごとにトリガー
from pattern [every timer:at(0, 1, *, *, [1,3,5])]; // 毎週月水金の夜中1時にトリガー
from pattern [every timer:at(0, */2, 1:7, *, *)]; // 毎月1日から7日まで、2時間ごとにトリガー

join

ウィンドウには決まった要素が含まれています。複数のウィンドウを指定すると、これらの要素に対して RDBにおける join の操作を行うこともできます。RDB と同様に、inner/outer/left outer/right outer の join が定義されます。デフォルト(無指定)では inner join となります。

例:3秒以内に、同一のデバイスから c8y_LocationUpdate の Event と Measurement が発生した場合、Measurementの温度、Eventのlat/lng情報を取得します。もちろん insert into でこれらの値をストリームに出力することもできます。

select
	getNumber(m.measurement, "c8y_Temperature.T.value"),
	getNumber(e.event, "c8y_Position.lat"),
	getNumber(e.event, "c8y_Position.lng")
from
	MeasurementCreated.win:length(3) as m,
	EventCreated.win:length(3) as e
where
	m.measurement.source.value = e.event.source.value and e.type="c8y_LocationUpdate";

この例は from 句でとくに join の仕方を指定していないので、デフォルトの inner join が適用されます。 過去3つの Measurement の window と、過去3つの Event の window の要素の組で、where で指定された すべての組み合わせの 要素が抽出されます。

例:Event の type はすべて “c8y_LocationUpdate” とします。また、source は大文字のMと大文字のEが一致しているとします。

時間(秒) → 0 1 2 3 4
MeasurementCreated - M1 m2 - M4
EventCreated e0 E1 - E3 e4

このとき、以下のようにイベントが発生します。時間(秒)が4のとき、2回イベントが発生していることに注意してください。

時間(秒) → 0 1 2 3 4
イベント - M1,E1 - M1,E3 M4,E1
M4,E3

std:unique

ここでは、標準windowの中でもとくに有用性の高い std:unique について解説します。 std:unique(«プロパティ名») はウィンドウに対する関数で、指定されたプロパティ名の値ごとにイベントをグルーピングし、各グループでの最新の(最後の)イベントを1つずつとり、それらを値として持つ window を生成します。

type の異なるイベントが次々に発生する場合の std:unique の動きを例で示します。イベントを En(type) のように表します。n は通し番号、type は type 値です。

時間 1 2 3 4 5 6 7
イベント E1(a) E2(b) E3(b) E4(c) E5(b) E6(a) E7(d)
std.unique(type) E1(a)
-
-
-
E1(a)
E2(b)
-
-
E1(a)
E3(b)
-
-
E1(a)
E3(b)
E4(c)
-
E1(a)
E5(b)
E4(c)
-
E6(a)
E5(b)
E4(c)
-
E6(a)
E5(b)
E4(c)
E7(d)

これにより、デバイスごとに条件を指定してデータを取得することができます。例として、複数のデバイスから位置情報が Event として通知され、気圧が Measurement で通知される場合、デバイスごとの現時点の最新の 位置情報、気圧 の値を次のように取得することができます。

select
	getNumber(e.event, "c8y_Position.lat").doubleValue() as lat,
	getNumber(e.event, "c8y_Position.lng").doubleValue() as lng,
	getNumber(m.measurement, "nttcom_AtomosphericPressure.P.value").doubleValue() as pressure
from
	EventCreated.std:unique(event.source.value) e,
	MeasurementCreated.std:unique(measurement.source.value) m
where
	e.event.source.value = m.measurement.source.value;

上記のステートメントで、

EventCreated.std:unique(event.source.value)

は、event.source.value の値ごと(デバイスごと)の最新のイベントからなる window (要素数はEvent通知したデバイス数分あります)です。また、

MeasurementCreated.std:unique(measurement.source.value) m

は、measurement.source.value の値ごと(デバイスごと)の最新のイベントからなる window(要素数はMeasurement通知したデバイス数分あります)です。

これらの inner join は、2つの window の要素の組で、すべての組み合わせからなる表(要素数は(Event通知デバイス数)×(Measurement通知デバイス数))となります。where により、表の中で e.event.source.value = m.measurement.source.value を満たす組み合わせ(同じデバイスから通知された Event と Measurement の組)が抽出されます。

unidirectional

ストリームに unidirectional キーワードを使うと、トリガータイミングを決めるストリームを指定することができます。unidirectional が指定されたウィンドウにイベントが到着するたびに、そのイベントとそのほかのウィンドウに対してjoinが行われます。

例:5 秒ごとに、最新の MeasurementCreated を抽出します。MeasurementCreated が 5 秒以上発生しない場合は、5 秒ごとに同じ内容が出力されます。

select m
from
	pattern [every timer:interval(5 sec)] unidirectional,
	MeasurementCreated.win:length(1) as m;

例:Event が発生するごとに、最新の MeasurementCreated を抽出します。

select m
from
	EventCreated unidirectional,
	MeasurementCreated.win:length(1) as m;

unidirectional キーワードは、ストリーム全体に対して適用でき、ウィンドウには適用できません。また、到着したイベントひとつひとつに対して、他のウィンドウとの join を行います。

処理単位の制御

create context

コンテキストは、ストリーム内の特定のイベント要素を指定し、その値ごとに独立してステートメント処理を行わせたい場合に利用します。 例えば、温度センサーが山の上や、海岸、部屋の中などに複数ある場合を考えます。それぞれの測定地での温度センサーの1時間の平均温度を取得したいとしましょう。 これを、

select
	avg(getNumber(m.measurement, "c8y_TemperatureMeasurement"))
from
	MeasurementCreated.win:time(1 hours) m
where
	getObject(m.measurement, "c8y_TemperatureMeasurement") is not null;

のように書くと、全てのセンサーの1時間平均が取得されてしまいます。 Context は、ステートメントをストリームの特定の値(デバイスIDなど)で別々に実行することができます。

書式:

create context <<コンテキスト名>>
partition by <<区別に利用するイベント要素>>
from <<ストリーム名>>;

例:Measurement のデバイスid ごとに直近1時間の平均気温の処理を行う。

// コンテキスト宣言
create context DeviceAwareContext
	partition by
		measurement.source.value from MeasurementCreated;

// insert 処理
@Name("calculate average")
context DeviceAwareContext // コンテキストを使用
insert into CreateMeasurement
select
	m.measurement.source as source,
	"nttcom_HourAverage" as type,
	m.measurement.time as time,
	{
		"nttcom_HourAverage.T.value", avg(getNumber(m.measurement, "c8y_TemperatureMeasurement.T.value")),
		"nttcom_HourAverage.T.unit", "C"
	} as fragments
from
	MeasurementCreated.win:time(1 hour) m
where
	m.measurement.type = "c8y_TemperatureMeasurement";

上記の例では、MeasurementCreated ストリームを入力とし、CreateMeasurement ストリームに出力しています。 CreateMeasurement への出力は MeasurementCreated イベントを発生させます。 このような場合や、入力、出力ストリームが同一の場合、無限ループに陥ることがないように注意してください。 上記例では、入力/出力で type 値を分けることで無限ループを回避しています。 無限ループが発生した場合、リソース使用量が多くなるため、スクリプトは Things Cloud 内で自動的にアンデプロイされます。

output

通常のステートメントでは、イベント発生時に処理が行われますが、一定のイベント数や時間間隔で処理させるために、output 句を利用することができます。

集約関数

組み込み関数(ウィンドウ用集約関数)

ウィンドウ用集約関数は、select句で利用します。

SQL関数

数値を返却する関数です。

関数 機能
avg([all/distinct]expr.[,filter]) expr.で示される項目の平均を double 型で返却します avg(getNumber(m.measurement, “c8y_Temperature.T.value”))
count([all/distinct]expr.[,filter]) expr.で示される項目で null でないものの数を long 型で返却します count(getObject(m.measurement, “someFragment”))
count(*[,filter]) イベント数を long 型で返却します count(*)
max([all/distinct]expr.) expr.で指定された項目の最大値を取得します max(getNumber(m.measurement, “nttcom_Acceleration.x.value”))
fmax([all/distinct]expr.,filter) expr.で指定された項目でフィルターを満たすものの最大値を取得します fmax(value, type=‘someType’)
min([all/distinct]expr.) expr.で指定された項目の最小値を取得します min(soldCount)
fmin([all/distinct]expr.,filter) expr.で指定された項目でフィルターを満たすものの最小値を取得します fmin(pressure, weather=‘cloud’ or weather=‘rain’)
sum([all/distinct]expr.[,filter]) expr.で指定された項目のウィンドウでの総和を算出します。フィルターを指定することもできます sum(toll, carType=‘truck’)

イベント集約関数

イベント集約関数は、単一または複数のイベントを返却します。

関数 機能
first(*/stream.*/expr.[,index]) ウィンドウの最初の要素を取得します。 first(*)
last(*/stream.*/expr.[,index]) ウィンドウの最後の要素を取得します。 last(getNumber(measurement, “c8y_Temperature.T.value”), 1) 最後から2番目を取得
maxBy() 指定した項目を最大にするウィンドウ内のイベントを取得します maxBy(getNumber(measurement, “c8y_Temperature.T.value”)).source.value
minBy() 指定した項目を最小にするウィンドウ内のイベントを取得します minBy(speed).carType
window() ウィンドウの要素全部を返却します window(*)

汎用集約関数

関数 機能
aggregate() 指定された繰り返し処理によって要素を集約します window(*).aggregate(0, (a,b)=>a+b)

汎用変換関数

関数 機能
selectFrom() リストの各要素に対し、指定された処理を施して新しい同じサイズのリスト(java.util.Collection)を返します {1, 2, 3}.selectFrom(i => i+2)

aggregate 関数

書式:

<<list>>.aggregate(Object initialValue, lambda expr.)
  • initialValue : 初期値
  • lambda expr. : ラムダ式 ( (a,b) => (aとbの式) )

例:

{1,3,10}.aggregate(0, (a,b)=>a+b)

上記の結果は 14 になります。 上記の処理は以下のように行われます。

  • 初期値0
  • リスト{1,3,10}の各要素に対して、(現在の値, 要素の値) に対して (現在の値)+(要素の値) を計算し、その計算結果で(現在の値)を更新していきます。
    • (現在の値) = 0 (初期値)
    • (0,1) => 0+1 = 1 (現在の値を更新)
    • (1,3) => 1+3 = 4 (現在の値を更新)
    • (4,10)=> 4+10 = 14 (現在の値を更新)

他の例1:文字列への集約

この例では、直近3つの Event の text を連結した csv 形式の文字列を作成します。

select window(*).aggregate('',
  (result, item) => result || (case when result='' then '' else ',' end) || item.event.text)
from EventCreated.win:length(3);

他の例2:リストへの集約。この例は aggregate 関数の利用法の説明であり、実用的な意味はありません。

create expression List addList(list, item) [
    list.add(item);
    list;
];

select
    {1,2,3}.aggregate(new ArrayList(), (list,item) => addList(list,item))
from pattern[timer:interval(3 sec)];

初期値、ラムダ式の第一引数、ラムダ式の計算結果 は同一のJava型となるようにします。

selectFrom 関数

selectFrom は、対象のリストやウィンドウの各要素それぞれに対して、指定された処理を施します。 返却値は java.util.Collection 型です。

書式:

<<list>>.selectFrom(lambda expr.)
  • lambda expr. : ラムダ式 ( a => aの式)

例1:シンプルな例

{1,3,10}.selectFrom( i => i+1 )

上記の結果は {2,4,11} になります。 ラムダ式は、i に対して i+1 をマッピングさせており、各要素に対して 1 を加える操作を表現しています。

例2:ある ManagedObject の子アセット(ManagedObject の配列)の実体を取得し、ストリームに流す

/**
 * 対象の ManagedObject からなるストリーム
 */
create schema MyManagedObject(managedObject ManagedObject);

/**
 * ManagedObject を取得(Event トリガー)
 */
@Name("handleMo")
insert into
    MyManagedObject
select
    findManagedObjectById("59859") as managedObject
from EventCreated;    

/**
 * 子アセットからなるストリーム
 */
create schema MyChildAssets(childAssets Collection);

/**
 * 対象の ManagedObject の子アセットの実体を selectFrom で取得
 */
@Name("handleMoAssets")
insert into MyChildAssets
select
    m.managedObject.childAssets.selectFrom(i => findManagedObjectById(GId.asGId(i))) as childAssets
from
    MyManagedObject m;