MQTTサービス

必要条件
MQTT Serviceを使用するには、テナントがMqtt-serviceマイクロサービスをサブスクライブしている必要があります。 これは、Things Cloud環境の構成方法によっては自動的に行われている場合があります。 サブスクリプションを確認するには、管理アプリケーションを開き、エコシステム > マイクロサービスに移動します。 Mqtt-serviceマイクロサービスが一覧に表示されない場合は、製品サポート(パブリック環境の場合)またはThings Cloud管理者(専用環境の場合)に連絡して、テナント向けのサブスクリプションを依頼してください。

MQTT Serviceは、MQTTデバイスをThings Cloudプラットフォームと統合するための単一の統合エンドポイントを提供します。 これは汎用のMQTTブローカーではありません。 多数の接続デバイスがあり、デバイスからプラットフォームへのメッセージの総スループットが高いシナリオ向けに最適化されています。

このドキュメントは、MQTTデバイスをThings Cloudと統合したい開発者、またはデバイスと通信するクライアントを構築したい開発者を対象としています。 MQTT通信の基本については説明しません。 MQTTに不慣れな場合は、MQTT Webサイトをはじめとして、多数の入門資料を利用できます。

概要とアーキテクチャ

MQTT Service は Messaging Service と連携して、MQTT デバイスを Things Cloud プラットフォームに迅速かつ安全に統合します。 すでに Things Cloud ドメインモデルを理解しているデバイスは、Core MQTT プロトコル(SmartREST および JSON-over-MQTT)を使用して Things Cloud と直接通信できます。 または、デバイスは任意の MQTT トピックで任意のペイロードのメッセージを送受信できます。 このような generic デバイスでは、デバイスのプロトコルと Things Cloud ドメインモデルの間の変換はテナントの責任となります。 この変換は、プラットフォーム内で実行されるマイクロサービス、または外部クライアントアプリケーションで実装できます。

MQTT Service は、完全な MQTT broker ではなく MQTT endpoint と見なす必要があります。 これは、いくつかの非常に非対称な特性を持つ IoT device integration のユースケースに最適化されています。

  • Things Cloud プラットフォームにメッセージを公開する、同時接続された多数のデバイス(最大数千万台)
  • 一意の MQTT トピックが多数存在すること(最大数千万件)
  • Things Cloud プラットフォームに公開されるメッセージの高い総スループット(最大毎秒数百万件)
  • Things Cloud プラットフォーム内に存在する、少数の高スループットなメッセージコンシューマー
  • 個々のデバイスのメッセージスループットはより小さいこと(最大毎秒数百件)
  • Things Cloud プラットフォームからデバイスへの総メッセージスループットはより小さいこと(最大毎秒数千件)
  • デバイス間の直接的なピアツーピア通信がないこと
  • デバイスは地理的に分散しており、物理的なセキュリティが限定されていることが多いこと
  • Things Cloud プラットフォームが、安全で管理された高可用性インフラストラクチャ上にデプロイされていること

最適なパフォーマンスを得るために、これらのユースケースは Messaging Service への直接接続を持つ、より従来型のパブリッシュ/サブスクライブアーキテクチャを使用して実装する必要があります。

主な機能

プロトコルとクライアント

接続プロトコル TCP のみ。
MQTT プロトコルバージョン 3.1.1 および 5.0。詳細については MQTT プロトコルの実装 を参照してください。
Generic MQTT デバイスプロトコル MQTT デバイスは、任意の MQTT トピックで任意のペイロードを公開およびサブスクライブできます。
Things Cloud Core MQTT プロトコル SmartREST 1.0、SmartREST 2.0、および JSON-over-MQTT プロトコルの プレビュー サポート。詳細については Core MQTT デバイスサポート を参照してください。
Apache Pulsar マイクロサービスおよび外部クライアントは、デバイスプロトコルと Things Cloud ドメインモデルの間を変換するために Messaging Service に直接接続 します。
Messaging Service クライアントは、必要に応じてデバイスを Things Cloud マネージドオブジェクトとして登録する責任もあります。
Thin Edge Thin Edge では、同じデバイス上の generic および Core MQTT プロトコルをそのままサポートしています。

セキュリティと分離

マルチテナンシー 複数のテナントの MQTT デバイスが同じエンドポイントに接続します。各テナントの接続、トピック、メッセージは完全に分離されます。
デバイス分離 テナント内の MQTT デバイス同士は直接通信できません。各デバイスは実質的に独自のプライベートなトピック空間を持ちます。
クライアントアクセス Messaging Service に接続するマイクロサービスおよび外部アプリケーションは、テナント内のすべてのトピックとメッセージにアクセスできます。
双方向 TLS 証明書のトラストアンカーは Things Cloud 内で管理されます。証明書は個々の MQTT デバイスに厳密にひも付けられます。

パフォーマンスとスケーリング

水平スケーリング MQTT Service は Things Cloud コアとは独立してスケーリングできます。
ベンチマーク結果 1 億件の同時デバイス接続と、毎秒 100 万件の一意のメッセージスループットまでスケールすることが検証されています。
制限とクォータ テナントごとおよびクライアントごとの制限とクォータにより、サービスの安定性を確保し、「noisy neighbour」の問題を防止します。

アーキテクチャ

以下の図は、テナント内での MQTT Service のデータフローを示しています。

MQTT デバイスによって公開されたすべてのメッセージは Messaging Service に転送され、そこでコンシュームされるまで永続化されます。 Core MQTT プロトコルで使用される MQTT トピックに公開された Things Cloud ドメインモデルメッセージは、Things Cloud コアによって直接コンシュームされます。 他の MQTT トピックに公開されたメッセージは、メッセージを Things Cloud ドメインモデルにマッピングする責任を持つマイクロサービスや外部クライアントによってコンシュームされます。 同様に、Things Cloud コアおよびクライアントは Messaging Service にメッセージを公開でき、それらは MQTT Service によってコンシュームされてデバイスに転送されます。

MQTT Service architecture

デバイス分離

device isolation 機能により、異なるクライアントが使用する同じ名前のトピック間に相互作用はありません。 実質的に、すべてのデバイスはそのデバイスだけがアクセスできる独自のプライベートなトピック空間を持ちます。 これは、図の中で device 1device N がどちらも topic A で公開とサブスクライブを行っていることから確認できます。 デバイスは分離されているため、device 1device N によって公開されたメッセージを一切見ることができず、その逆も同様です。 ただし、Messaging Service に直接接続するマイクロサービスまたは外部アプリケーションクライアントは、すべてのデバイスで使用されるトピックに完全にアクセスでき、必要に応じてクライアント間でメッセージを転送できます。

MQTTデバイスの接続

このセクションでは、MQTT Service に MQTT デバイスを接続して認証する際の詳細について説明します。 Things Cloud と MQTT デバイスを統合するすべての人にとって関心のある内容です。 一般に、MQTT Service は、デバイスが Things Cloud の Core MQTT プロトコルを使用するか、非 Things Cloud プロトコルを使用するかにかかわらず、すべてのデバイスに対して同じように動作します。 デバイスで使用されるアプリケーションプロトコルに関連する違いがある場合は、該当箇所で文書化されます。

ポート

MQTT Service への MQTT 接続では TCP を使用する必要があります。 WebSocket 接続はサポートされていません。 接続先ホストには、たとえば <MY-TENANT>.cumulocity.com のように、テナントドメインを使用してください。

MQTT Service は既存の Core MQTT と並行して動作するため、MQTT Service を使用するデバイスは別のポートに接続する必要があります。

  • ポート 9883(TLS)は、MQTT Service と安全に暗号化通信を行うためのデフォルトポートです。 一方向(サーバー証明書のみ)TLS と双方向(クライアント証明書とサーバー証明書の両方)TLS の両方がサポートされています。 クライアント証明書を使用しない場合、サーバーは基本認証を使用してクライアントを認証します。

  • ポート 2883(非 TLS)は、暗号化されていないトラフィックを許可することによるセキュリティリスクのため、Things Cloud の共有パブリック環境では有効化されていません。 専用環境でポート 2883 を有効にするには、製品サポート にお問い合わせください。

クライアント識別子

特定のテナント内で MQTT Service に接続するすべてのデバイスは、一意の クライアント識別子(client ID)を使用する必要があります。 すでに接続されている client ID を使用してデバイスが接続した場合、MQTT 仕様に従って、既存の 接続は終了されます。 異なるテナントに属するデバイスは、同じ client ID を使用して同時に接続できます。 空の client ID は許可されていません。

一般に、client ID は構造を持たない識別子として扱われ、MQTT Service がそれを何らかの方法で解釈することはありません。 ただし、MQTT Service に接続する事前登録済みの Core MQTT デバイスに対しては、いくつかの特別な処理が行われます。 詳細については、Core MQTT デバイスサポート セクションを参照してください。

認証

MQTT Service は次の認証方式をサポートしています。 いずれの場合も、MQTT Service がデバイスに関連付けられた Things Cloud テナントを識別できるように、MQTT ユーザー名が正しく設定されていることを確認することが重要です。

  • ユーザー名とパスワード(基本認証)
    テナント上の任意のユーザーの認証情報を使用して、MQTT Service に対してデバイスを認証できます。 MQTT CONNECT パケット内のユーザー名には、<tenantID>/<username> の形式でテナント ID とユーザー名を必ず含める必要があります。 CONNECT パケット内のパスワードは、ユーザーの暗号化されていないパスワードでなければなりません。 この認証方式に関連付けられたユーザーには、Mqtt service 権限タイプに対する ADMIN 権限を割り当てる必要があります。

  • X.509 デバイス証明書(証明書認証)
    証明書を使用して認証するには、デバイスは Things Cloud テナント用に設定された トラストアンカー によって信頼される 証明書チェーン を提示する必要があります。 証明書の Common Name(CN)フィールドは、MQTT CONNECT パケット内の client ID フィールドと必ず一致している必要があります。 デバイスは MQTT CONNECT パケットのユーザー名フィールドにテナント ID を指定することが推奨されます。 トラストアンカーとデバイス証明書の作成および管理の詳細については、以下の TLS 証明書の使用 セクションを参照してください。

TLS 証明書の使用

このセクションには、MQTT Service における TLS 証明書サポートの簡略化された概要が記載されています。 詳細については、Things Cloud の一般的な デバイス証明書 のドキュメントを参照してください。

サーバー証明書

MQTT Service は、メインの Things Cloud 環境ドメインに割り当てられているものと同じサーバー証明書を使用します。 エンタープライズテナント(親テナント) は、SSL 管理機能を使用してこれらの証明書をカスタマイズすることはできません。

デバイス(クライアント)証明書

MQTT Service で使用されるデバイス証明書は、一般ドキュメントに記載されている同じ前提条件を共有します。 さらに、MQTT Service に接続するデバイスについては、以下が適用されます。

  1. 証明書の Common Name(CN)フィールドは、MQTT CONNECT パケット内の client ID フィールドと必ず一致している必要があります。 client ID と証明書の CN が一致しない場合、接続は拒否されます。
  2. デバイスは MQTT CONNECT パケットのユーザー名フィールドにテナント ID を指定することが推奨されます。 テナント ID が指定されている場合、それは指定された証明書を信頼するテナントに対応していなければなりません。そうでない場合、接続は拒否されます。 同様に、トラストアンカーが複数のテナントによって信頼されていて、テナント ID が指定されて_いない_場合、接続は拒否されます。 現時点では、マルチテナントのトラストアンカーは Things Cloud でサポートされていませんが、この機能は将来的に導入される可能性があります。 トラストアンカーの設定が変更された場合でもデバイスが接続を継続できるよう、ユーザー名フィールドには常にテナント ID を指定することを推奨します。

証明書トラストアンカーの設定

Things Cloud プラットフォームの TLS トラストアンカーは、テナントごとに定義されます。 認証にデバイス証明書を使用するには、デバイス証明書に署名するルート証明書または中間証明書をプラットフォームにアップロードし、テナントの信頼済み証明書リストに追加する必要があります。 たとえば、トラストアンカーとしてルート証明書のみが設定されている場合、デバイスは(少なくとも)そのデバイス固有の証明書と、ルートによって信頼される中間証明書を含む証明書チェーンを送信する必要があります。 逆に、中間証明書がトラストアンカーとして設定されている場合、デバイスはその中間証明書によって信頼されるデバイスごとの固有証明書のみを送信できます。

トラストアンカーは、UI の 信頼済み証明書 ページ、または REST API を通じて設定できます。

さらに、証明書を追加する際には Auto registration オプションが有効になっていることを確認してください。これにより、有効な証明書を提示する任意のデバイスが初回接続時にプラットフォームへ自動登録されます。

自己署名証明書の作成

デバイス証明書を自己署名するには、ルート Certificate Authority(CA)証明書を作成する必要があります。 OpenSSL CLI ツールを使用して秘密鍵を作成し、次にそれから自己署名ルート証明書を生成します。

openssl genpkey -algorithm RSA -out ca.key
openssl req -x509 -new -nodes -key ca.key -sha256 -days 3650 -out ca.crt -subj "/C=UK/O=YourCompany/OU=YourOrg/CN=MQTTServiceCA"

次に、デバイス用の秘密鍵を作成し、この秘密鍵から Certificate Signing Request(CSR)を生成して、その CSR に署名します。

openssl genpkey -algorithm RSA -out client.key
openssl rsa -in client.key -out client-key.pem -outform PEM
openssl req -new -key client.key -out client.csr -subj "/C=UK/O=YourCompany/OU=YourOrg/CN=mqtt-client"
openssl x509 -req -in client.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out client.crt -days 3650 -sha256
cat client.crt ca.crt > client-chain.pem

証明書作成に関してさらに高度な要件がある場合は、証明書の生成と署名 を参照してください。

また、デバイスが Enrollment over Secure Transport(EST)プロトコルをサポートできる場合は、Things Cloud の組み込み Certificate Authority も使用できる可能性があります。

証明書の使用

CA 証明書が Things Cloud にアップロードされ、信頼されるようになると、デバイスは信頼された CA によって署名されたクライアント証明書を使用して認証できます。 任意の MQTT クライアントを使用して接続するには、前に生成したクライアント証明書と鍵を使用します。 この例では、Mosquitto MQTT クライアントを使用しています。

mosquitto_pub --cafile cumulocity.com.pem -d -q 1 \
  -h "cumulocity.com" -p "9883" -i myclient \
  -u t11101 \
  -t "v1/devices/me/telemetry" \
  --key client-key.pem \
  --cert client-chain.pem \
  -m '{"temperature":25}'

説明:

  • --cafile cumulocity.com.pem: このファイルには Things Cloud の MQTT Service ブローカーの CA 証明書が含まれており、サーバーの識別情報を検証するために使用されます。
  • --key client-key.pem--cert client-chain.pem: これらは、信頼された CA によって署名されたクライアント証明書と秘密鍵です。
  • -u t11101: (任意)MQTT ユーザー名を指定します。これはテナント ID である必要があります。
CA 証明書のダウンロード

Things Cloud は、よく知られた公開 CA によって署名された証明書を使用しています。 一部のクライアント(Mosquitto など)では CA ファイルを明示的に指定する必要がありますが、他のクライアント(MQTTX など)ではこれらの証明書を自動的に信頼します。 Things Cloud MQTT Service ブローカーの CA 証明書をダウンロードするには:

  1. ブラウザで cumulocity.com を開きます。
  2. アドレスバーの南京錠アイコンをクリックし、証明書の詳細を表示します。
  3. ルート証明書をダウンロードまたはエクスポートし、cumulocity.com.pem として保存します。

または、openssl を使用して証明書を取得および抽出することもできます。

echo | openssl s_client -connect cumulocity.com:9883 -showcerts 2>/dev/null | \
    sed -ne '/-BEGIN CERTIFICATE-/,/-END CERTIFICATE-/p' > cumulocity.com.pem

MQTTプロトコルの機能

このセクションでは、MQTT Service でサポートされている MQTT プロトコルのバージョンと機能の詳細を説明します。 MQTT デバイスの接続 と同様に、Things Cloud と MQTT デバイスを統合するすべてのユーザーにとって参考になります。 Things Cloud MQTT プロトコル(SmartREST および JSON-over-MQTT)を MQTT Service で使用する際の具体的な情報については、Core MQTT デバイスサポート セクションを参照してください。ただし、Core MQTT サポートには、このセクションで記載されているものと同じ一般的な機能および制限が適用される点に注意してください。

サービスへの接続

MQTT Service は、MQTT プロトコルのバージョン 3.1、3.1.1、5.0 を使用するクライアントからの接続をサポートしています。 これらのバージョンの違いの詳細については、MQTT specifications を参照してください。

MQTT バージョン 3.1 は廃止されており、推奨されません。 以下のバージョン 3.1.1 に関する詳細のほとんどはバージョン 3.1 にも当てはまりますが、プロトコルバージョン 3.1 における具体的な違いは明示的には記載されていません。

MQTT バージョン 3.1.1 の機能

これらの機能は、MQTT プロトコルのバージョン 5.0 を使用して接続するデバイスにも適用されます。

Clean Session

MQTT Service では、デバイスは MQTT CONNECT パケット内の Clean Session フラグを “1”(true)に設定して接続することが必須です。 このフラグは MQTT バージョン 5.0 では Clean Start と呼ばれます。 このフラグが設定されていない場合、クライアント接続は MQTT Service によって拒否されます。

注意
これは、デバイスが切断されている間にデバイス 宛てに 送信されたメッセージは、再接続時に自動的には配信されないことを意味します。 この動作がユースケース上重要である場合、デバイスおよびクライアントは、未受信メッセージを送信するためのアプリケーションレベルのプロトコルを実装する必要があります。 この制限は、Core MQTT デバイスの実験的サポートにも適用されます。 保留中の Things Cloud デバイスオペレーションは、接続時に Core MQTT デバイスには送信されません。

Quality of Service

MQTT Service は、2 つのレベルの MQTT Quality of Service(QoS)をサポートしています。 必要な QoS レベルは、デバイスが MQTT Service にメッセージを送信する際は MQTT PUBLISH パケット内で、デバイスが MQTT トピックをサブスクライブする際は MQTT SUBSCRIBE パケット内で指定します。

Level Supported Description
QoS 0 (at most once) Yes サービスはデバイスから送信されたメッセージを確認応答せず、メッセージが配信される保証もありません。
サブスクリプションについては、サービスはデバイスからの確認応答を期待せず、同じメッセージを複数回送信することもありません。
QoS 1 (at least once) Yes サービスはデバイスから送信されたメッセージを確認応答し、確認応答を受信しなかった場合、デバイスはメッセージを再送することがあります。
確認応答されたメッセージは、Messaging Service クライアントに少なくとも 1 回配信されることが保証されます。
サブスクリプションについては、デバイスはサービスから送信されたメッセージを確認応答する必要があり、サービスは同じメッセージを複数回送信することがあります。(1)
QoS 2 (exactly once) No サポートされていません

注記:

  1. MQTT Service ではデバイスが clean session で接続する必要があるため、確認応答されていないメッセージは、デバイスが切断して再接続した後に MQTT Service によって再送されません。

Duplicate Message Indicator

MQTT PUBLISH パケット内の Duplicate Message Indicator(DUP フラグ)は、これが同じパケットを以前に送信しようとした試行の再配信である 可能性がある ことを示します。 これは、QoS レベル 1 を使用して送信されたメッセージにのみ設定されます。 DUP フラグは、MQTT 仕様に従って MQTT Service でサポートされています。

Will Message

MQTT の Will Message 機能により、デバイスは CONNECT パケット内でメッセージを指定でき、予期せず切断された場合にそのメッセージがデバイスに代わってパブリッシュされます。 Will Message は MQTT Service でサポートされていますが、以下の制限があります。

  • デバイス分離 により、Will Message は他の接続済み MQTT デバイスには配信されません。 Will Message は Messaging Service にパブリッシュされ、そこでマイクロサービスまたは外部アプリケーションクライアントがコンシュームできます。
  • Will Message の QoS レベルは QoS 0(最大 1 回)または QoS 1(少なくとも 1 回)にできます。 QoS レベル 2(1 回だけ)はサポートされていません。
  • Retained Will Message はサポートされていません。 Will Message に retain フラグが設定されている場合、そのメッセージは受け付けられません。

Retained Message

MQTT の Retained Message 機能は MQTT Service ではサポートされていません。 デバイスからの PUBLISH メッセージに RETAIN フラグが設定されている場合、そのメッセージは受け付けられず、接続はクローズされます。

MQTT Service からデバイスにパブリッシュされるメッセージには、retain フラグは設定されません。

ワイルドカードサブスクリプション

ワイルドカードサブスクリプション により、デバイスは固定のトピック名の代わりに パターン を使用して複数の MQTT トピックをサブスクライブできます。 MQTT Service は、MQTT バージョン 3.1.1 および 5.0 の標準に準拠したワイルドカードサブスクリプションをサポートしています。

  • 単一レベル (+): ちょうど 1 つのトピックレベルに一致します。 たとえば、sensors/+/temperaturesensors/room1/temperature には一致しますが、sensors/room1/basement/temperature には一致しません。
  • 複数レベル (#): 任意の数のネストされたレベルに一致します。 これはトピック文字列の最後の文字でなければなりません。 たとえば、sensors/#sensors/room1/temperature および sensors/room2/basement/humidity に一致します。
デバイス分離

ワイルドカードを使用する場合でもデバイス分離は有効です。 ワイルドカードサブスクリプションによって、クライアントがすでに参照を許可されていないトピックへのアクセスが許可されることはありません。

重複するサブスクリプション

クライアントが同じトピックに一致する複数の重複するサブスクリプションを持っている場合(たとえば、sensors/+/status へのサブスクリプションと sensors/thermostat/status への別のサブスクリプション)、MQTT Service はメッセージを 1 回だけ 配信します。 重複するサブスクリプションで異なる QoS 設定がある場合、サービスは一致したサブスクリプションの中で最も高い QoS レベルを使用してメッセージを配信します。

注意
MQTT Service は Core MQTT トピック へのワイルドカードサブスクリプションを受け付けますが、これはサポートされていません。 Core MQTT トピックでワイルドカードを使用すると、予測不能な動作につながる可能性があります。 Core MQTT トピックへのすべてのサブスクリプションでは、固定トピック文字列を使用することを強く推奨します。

MQTT バージョン 5.0 の機能

これらの機能は、上記で説明した MQTT バージョン 3.1.1 の機能に加えて、MQTT プロトコルのバージョン 5.0 を使用するデバイスに適用されます。

デバイス接続プロパティ

MQTT バージョン 5.0 の CONNECT パケットでは、デバイス接続時に MQTT セッションの多くのオプションプロパティを構成できます。 MQTT Service におけるこれらの機能のサポートレベルはさまざまであり、以下の表に示しています。 ある機能が「ignored」と記述されている場合、接続時に要求することはできますが、MQTT Service の動作には影響しないことを意味します。 「not supported」と記述されている機能を使用すると、メッセージが拒否されたり、デバイスが切断されたりする場合があります。

Feature Support level Notes
Client Identifier Mandatory バージョン 3.1.1 と同様です。
Clean Start Mandatory バージョン 3.1.1 と同様です。 Clean Start はすべてのデバイス接続で必須です。
Will Message Supported QoS レベル、retained message、デバイス分離に関しては バージョン 3.1.1 と同じ制限があります。
Will Message 上の以下の追加のバージョン 5.0 プロパティはサポートされています。
Delay IntervalPayload Format IndicatorContent TypeResponse TopicCorrelation DataUser Property
Will Message 上の Message Expiry Interval プロパティは無視されます。
Receive Maximum Supported MQTT Service は、デバイスに対する未確認の QoS 1 メッセージ数を、要求された最大値に制限します。
Maximum Packet Size Supported MQTT Service は、このデバイスに対して要求サイズを超えるメッセージを送信しません。
要求サイズを超えるメッセージは通知なしに破棄される点に注意してください。
Request Problem Information Supported これが要求されている場合でも、デバイスは MQTT Service が reason string を送信すると想定すべきではありません。
Session Expiry Interval Ignored すべての接続で Clean Start を設定する必要があるため、セッションデータは保持されません。
Topic Alias Maximum Ignored MQTT Service は、デバイスに送信するメッセージで topic alias を使用しません。
Request Response Information Ignored MQTT Service は、CONNACK パケットに request/response のヒントを送信しません。
User Property Ignored CONNECT パケット上の user property は MQTT Service によって無視されます。
Authentication Method Ignored 拡張認証方式はサポートされていません。
Authentication Data Ignored 拡張認証方式はサポートされていません。

メッセージパブリッシュ機能

これらの機能は、デバイスから MQTT Service に送信される PUBLISH パケット、または MQTT Service からデバイスに送信される PUBLISH パケットに関連します。 多くの場合、メッセージ上の追加の MQTT バージョン 5.0 プロパティは、デバイスから Messaging Service クライアントへ、またはその逆へ「そのまま」渡されます。 これらのプロパティを適切に処理する責任は、メッセージを受信するデバイスまたはクライアントにあります。

Feature Support level Notes
Quality of Service level QoS 0 and 1 バージョン 3.1.1 と同様です。QoS レベル 2 はサポートされていません。
Duplicate Message Indicator Supported バージョン 3.1.1 と同様です。MQTT 仕様に従ってサポートされています。
Payload Format Indicator Supported MQTT デバイスと Messaging Service クライアントの間でそのまま渡されます。
Response Topic Supported MQTT デバイスと Messaging Service クライアントの間でそのまま渡されます。
クライアントは、指定されたトピックにレスポンスメッセージを送信する責任があります。
Correlation Data Supported MQTT デバイスと Messaging Service クライアントの間でそのまま渡されます。
クライアントは、任意のレスポンスメッセージに correlation data を含める責任があります。
User Property Supported MQTT デバイスと Messaging Service クライアントの間でそのまま渡されます。
Content Type Supported MQTT デバイスと Messaging Service クライアントの間でそのまま渡されます。
Message Expiry Interval Ignored デバイスからパブリッシュされるメッセージ上の message expiry interval は効果を持ちません。
デバイスにパブリッシュされるすべてのメッセージは同じ message expiry interval を使用します。
Retained Message Not supported バージョン 3.1.1 と同様です。retained message はサポートされていません。
Topic Alias Not supported topic alias を使用してパブリッシュされたメッセージは MQTT Service によって拒否されます。
Subscription Identifier Not supported subscription identifier は MQTT Service によってパブリッシュされるメッセージには設定されません。

トピックサブスクリプション機能

これらの機能は、MQTT Service に送信される SUBSCRIBE パケットに含めることができる MQTT バージョン 5.0 のフラグおよびプロパティに関係します。

Feature Support level Notes
Maximum QoS level QoS 0 and 1 バージョン 3.1.1 と同様です。QoS レベル 2 はサポートされていません。
User Property Ignored SUBSCRIBE パケット上の user property は MQTT Service によって無視されます。
No Local Ignored このオプションの設定に関係なく、ローカル転送はサポートされていません。
Retained As Published Ignored retained message はサポートされていないため、このオプションは効果を持ちません。
Retain Handling Ignored retained message はサポートされていないため、このオプションは効果を持ちません。
Subscription Identifier Not supported subscription identifier を使用するサブスクリプションは MQTT Service によって拒否されます。
Shared Subscription Not supported $share で始まるトピック名へのサブスクリプションはサポートされていません。

トピック

一般に、MQTT Service はトピック構造に制限を課さず、デバイスは MQTT 仕様で許可されている任意のトピック名を使用できます。 ただし、過去との互換性または将来の使用の可能性のために予約されているトピック名が少数存在します。 これらのトピック名はデバイスでは使用できません。

  • 特に文書化されている場合を除く、すべての システムトピック(トピック名が $ で始まるもの)

トピック名の最大長にはハード上限があります。

特定のトピックは、Core MQTT プロトコルを使用するデバイス用に予約されています。 完全なリストについては Core MQTT topics を参照してください。 Core MQTT と汎用デバイスのトピック空間は重複しておらず、それ以外のすべてのトピックは「汎用」MQTT デバイスで使用できます。 汎用デバイスは、Core MQTT 配下で使用されていないトピックが一部に含まれている場合でも、以下に示す Core MQTT のプレフィックスで始まるトピック名の使用を避けるべきです。 これにより、特定のトピックをどのように処理すべきかが明確でなくなり、デバッグが困難になる状況を避けることができます。

ペイロード

MQTT Service は、特定のメッセージペイロード形式を課しません。 メッセージペイロードは、受信したとおりに正確に配信される不透明なバイト列として扱われます。 メッセージペイロードの内容は、MQTT Service の動作に影響しません。

テナントごとに MQTT メッセージの最大サイズに対するハードクォータがあります。 メッセージサイズの計算には、メッセージヘッダーとペイロードの両方が含まれます。 メッセージヘッダーサイズは大きく変動する可能性があり、特に MQTT バージョン 5.0 デバイスではその傾向がありますが、常に少なくとも 2 バイトであり、通常はそれより大きくなります。

エラーレポート

MQTT Service は、サーバーからデバイスへのレスポンスについて MQTT 仕様に従います。

仕様によると、サーバーが不正な形式のパケットまたはプロトコルエラーを受信した場合、デバイスを切断する必要があります。 MQTT バージョン 3.1.1 デバイスでは、警告なしで単純に切断されます。 MQTT バージョン 5.0 デバイスでは、MQTT Service は接続を閉じる前に、切断理由を示す reason code を含むパケットをデバイスに送信する場合があります。 これは、CONNECT パケットのエラーに対しては CONNACK パケット、その他の不正なパケットに対しては DISCONNECT パケットになります。

サーバーは、プロトコル上は正しいが、上限超過など他の理由により拒否するパケットを受信する場合があります。 MQTT バージョン 3.1.1 を使用するデバイスでは、プロトコル上、パケットが拒否された理由を示す手段がないため、接続は単純に切断されます。 唯一の例外は SUBACK パケットで、より詳細な理由は示されないものの、サブスクリプションが失敗したことを示すことができます。 MQTT バージョン 5.0 を使用するデバイスでは、プロトコルにより SUBACKPUBACK を含むレスポンスパケットで reason code を送信できます。 reason code により、特定のリクエストが拒否された理由について、デバイスはより多くの情報を得られます。 レスポンスパケット送信後に接続が切断される場合もあります。

利用可能な reason code は、MQTT version 5.0 specification の section 2.4 に記載されています。

MQTT デバイスのクォータと上限

MQTT Service は、MQTT デバイスに対して複数の異なるクォータおよび上限を適用します。

他のエラー条件と同様に、クォータまたは上限を超えたデバイスは MQTT 仕様に従って処理されます。

特に指定がない限り、上限はテナントレベルで適用されます。 たとえば、これは受信メッセージレート上限が、そのテナントに接続されているすべてのデバイス全体の合計レートに適用されることを意味します。

MQTT バージョン 3.1.1 を使用するデバイスでは、プロトコル上、上限到達を示す手段がないため、接続は単純に切断されます。 唯一の例外は SUBACK パケットで、より詳細な理由は示されないものの、サブスクリプションが失敗したことを示すことができます。

MQTT バージョン 5.0 を使用するデバイスでは、プロトコル上 reason code を送信できる場合、コード 0x97(Quota exceeded)が使用されます。 この reason code を送信した後でも接続が切断される場合があります。

すべてのプロトコルバージョンで、Alarms に記載されたレート制限の対象として、アラームも生成されます。

MQTT Service によって適用される Messaging Service のクォータと上限 に関する説明も参照してください。 これらはデバイス接続に直接影響しませんが、Messaging Service からの「バックプレッシャー」により、たとえば Messaging Service がデバイスからの追加メッセージを受け付けられない場合などに、デバイスエラーが発生する可能性があります。

アラーム

MQTT Service は、デバイス接続上の一部のエラー条件に応じて Things Cloud アラームを生成します。 これにより、テナントユーザーやアプリケーションは問題をより可視化でき、特にデバイスから十分な診断データを取得することが難しい場合に有用です。 アラームには レート制限 が適用され、Things Cloud プラットフォームに過剰な数のアラームが送られて負荷がかかることを防ぎます。 これは、たとえば多くのデバイスが短時間に許可最大サイズを超えるメッセージをパブリッシュした場合でも、問題のすべての発生に対してアラームが生成されるわけではないことを意味します。 ただし、テナントユーザーはデバイスが大きすぎるメッセージをパブリッシュしていることを把握でき、是正措置を講じることができます。

以下の表は、デバイス接続に関連する問題に対して生成されるアラームを示します。

Alarm type Description
c8y_MqttService_MaximumPacketSize_Connect デバイスが許可された最大サイズを超える CONNECT パケットを送信しました。
c8y_MqttService_MaximumPacketSize_Publish デバイスが許可された最大サイズを超える PUBLISH パケットを送信しました。
c8y_MqttService_MaximumPacketSize_Subscribe デバイスが許可された最大サイズを超える SUBSCRIBE パケットを送信しました。
c8y_MqttService_MaximumPacketSize_Unsubscribe デバイスが許可された最大サイズを超える UNSUBSCRIBE パケットを送信しました。
c8y_MqttService_TenantConnectionsLimitExceeded 接続されているデバイス数が許可された最大値を超えました。
c8y_MqttService_TenantConnectionRateLimitExceeded 1 秒あたりのデバイス接続試行数が許可された最大値を超えました。
c8y_MqttService_IncomingPublishRateLimitExceeded 1 秒あたりの受信(デバイスからの)メッセージ数が許可された最大値を超えました。
c8y_MqttService_OutgoingPublishRateLimitExceeded 1 秒あたりの送信(デバイスへの)メッセージ数が許可された最大値を超えました。

Core MQTT デバイスサポート

機能プレビュー

この機能はパブリックプレビューです。 つまり、まだ一般提供されておらず、今後変更される可能性があります。

Core MQTT サポートはデフォルトで無効になっており、テナントごとに明示的に有効化する必要があります。 Core MQTT サポートを有効にするには、管理アプリケーションで 設定 > 機能切り替え に移動し、mqtt-service.smartrest トグルキーのステータスを 有効 に設定します。 Core MQTT サポートが無効な間は、Core MQTT topics にパブリッシュされたすべてのメッセージは無効として扱われ、MQTT クライアントが切断される可能性があります。

MQTT Service におけるプレビュー版の Core MQTT サポートは、デバイスを Things Cloud コアに直接接続する場合と比較して、いくつかの動作の違いがあります。 これらの違いの一部は、アーキテクチャ図 に示すとおり、MQTT Service が Things Cloud コアから 分離されており、両者の間でメッセージが 非同期に 転送されるために発生します。 これは第一に、MQTT Service が受信し、場合によっては確認応答したメッセージが、Core MQTT 実装でまだ処理されていない可能性があることを意味します。 第二に、Core MQTT 実装は、MQTT Service に接続されたデバイスの接続ライフサイクルおよびトピックサブスクリプションを完全には把握できません。 MQTT Service における Core MQTT サポートが一般提供に達する前に、これらの違いの多くを解消する予定です。

MQTT Service を介して Core MQTT プロトコルを使用する場合にも、このセクションの他の場所に記載されている同じ機能および制限が適用されます。これらは、Things Cloud コアを介して直接 Core MQTT にアクセスする場合とは異なることがあります。 特に以下の点に注意してください。

  • WebSocket 接続はサポートされていません
  • QoS レベル 2 はサポートされていません
  • MQTT バージョン 5.0 クライアントはサポートされています
  • RETAIN フラグが設定されたメッセージは、フラグが無視されるのではなく拒否されます

Core MQTT トピック

Core MQTT プロトコルは、MQTT quick reference で定義された特定のトピックセットを使用します。 これらのトピック上でのすべてのメッセージのパブリッシュおよびサブスクリプションは Core MQTT デバイス向けであると見なされ、Things Cloud コアとの間でルーティングされます。

  • s/
  • t/
  • q/
  • c/
  • alarm/alarms/
  • event/events/
  • measurement/measurements/
  • inventory/managedObjects/
  • error
  • devicecontrol/notifications

接続時の動作

クライアント識別子

Core MQTT で許可されている構造化 MQTT client identifiers は、MQTT Service ではサポートされていません。

  • client ID に :defaultTemplateIdentifier サフィックスが含まれている場合、特別な処理は行われず、単に client ID の一部として扱われます。
  • client ID に d:(接続タイプ)プレフィックスが含まれている場合、すでに登録済みの Core MQTT デバイスが MQTT Service に接続し、Core MQTT で同じデバイスとして扱われるよう認識されます。 ただし、Things Cloud プラットフォームに初めて接続する新規デバイスについては、d: プレフィックスは特別には扱われません。 新規デバイスが初回接続する場合、d: プレフィックスは特別な処理なしで単に client ID の一部として扱われます。
保留中のオペレーション

保留中のオペレーションは、デバイスが MQTT Service に接続した際に自動的には送信されません。 送信待ちの保留中オペレーションの更新を要求するには、デバイスは Get PENDING operations SmartREST メッセージ(テンプレート 500)を送信する必要があります。

デバイス登録

デバイスが証明書を使用して MQTT Service に認証すると、Core MQTT トピックと初めてやり取りした時点で、Things Cloud コアによって新しいデバイスとして自動登録されます。 この自動動作を無効にすることはできません。

デバイスエラー処理

前述のとおり、MQTT Service の分離された非同期アーキテクチャは、Things Cloud コアが接続済み MQTT デバイスの可視性を十分に持てないことを意味します。 これは、デバイスが次のいずれかを行っても自動的には切断されないことを意味します。

  1. 無効または利用不可の Core MQTT トピックをサブスクライブする。
  2. 無効な Core MQTT メッセージを送信する。

この場合でもデバイスは接続されたままですが、無効なメッセージは処理されず、無効なトピックからのメッセージも受信しません。 このような場合に Core MQTT 実装から送信されるエラーメッセージを監視するために、デバイスは s/e トピックをサブスクライブできます。

接続監視

デバイス からの「send connection」トラフィックに対する 接続監視 は、想定どおりに機能します。

ただし、デバイス への「push connection」トラフィックの監視は MQTT Service ではサポートされていません。

レート制限

Things Cloud の レート制限 メカニズムは、MQTT Service 経由で接続された MQTT デバイスには使用されません。

マイクロサービスおよび外部アプリケーションとの統合

Things Cloud のマイクロサービスおよび外部アプリケーションは、MQTT Service に接続されたデバイスによって公開されたメッセージをコンシュームし、それらのデバイスにメッセージを公開できます。 これを行うには、マイクロサービスまたは外部アプリケーションを Things Cloud Messaging Service(Apache Pulsar のデプロイメント)に接続し、Pulsar プロトコルを使用して MQTT メッセージの公開とコンシュームを行います。 以下の図は、Pulsar を介して MQTT Service とやり取りする際に使用される重要なインターフェイスとデータフローを示しています。

MQTT Service Pulsar connections

備考
MQTT Service messaging client とは、Pulsar を介して MQTT Service とやり取りするソフトウェアコンポーネントです。 これは、Things Cloud プラットフォームでホストされるマイクロサービスとしてデプロイすることも、プラットフォーム外でホストされる外部アプリケーションの一部としてデプロイすることもできます。 このドキュメントでは、このようなコンポーネントを単に client と呼びます。 実装または動作がクライアントのホスト場所によって異なる場合は、該当箇所でその違いを記載します。

MQTT Service は device isolation を実装しています。つまり、MQTT Service に接続された MQTT デバイス同士は、MQTT プロトコルを使用して直接通信することはできません。 すべてのデバイス間通信は、図に示すように、クライアントによって明示的に管理する必要があります。

このドキュメントでは、Pulsar によって実装される publish-subscribe メッセージングの概念やアーキテクチャ、また単純な MQTT Service クライアントの実装に必要な範囲を超える Pulsar クライアントライブラリの機能については扱いません。 これらの詳細については、Pulsar 製品ドキュメントを参照してください。

Messaging Service への接続

クライアントを Messaging Service に接続するには、以下が必要です。

  1. Pulsar クライアントライブラリ
  2. Things Cloud 環境内の Messaging Service(Pulsar broker)の URL。
  3. Messaging Service 上の MQTT Service データにアクセスする権限を持つ、テナント内ユーザーの認証情報。

これらの前提条件は、それぞれ以下で詳しく説明します。

Pulsar クライアントライブラリ

オープンソースの Pulsar クライアントライブラリは、さまざまな言語とプロトコル向けに提供されています。 このドキュメントのサンプルコードでは、Java クライアントライブラリを使用します。 Pulsar には強力なクロスバージョン互換性があります。 Messaging Service で使用されているサーバーバージョンに関係なく、選択したクライアントライブラリの最新版を使用してください。 MQTT Service との統合では、最新のサーバーバージョンでのみ利用可能な高度な Pulsar 機能は必要ありません。

注意
現在、Pulsar を介して Messaging Service に接続するクライアントでは、“basic”(ユーザー名/パスワード)認証のみがサポートされています。 したがって、選択した Pulsar クライアントライブラリがこの認証方式をサポートしていることを確認する必要があります。

Pulsar URL

マイクロサービスクライアントの場合、URL はマイクロサービス起動時に渡される C8Y_BASEURL_PULSAR 環境変数 から取得する必要があります。 外部アプリケーションクライアントの場合、URL の一般的な形式は pulsar+ssl://<tenant_domain>:6651/ です。ここで <tenant_domain> は Things Cloud テナントのドメインで、たとえば my-tenant.cumulocity.com です。 pulsar+ssl というプロトコル名が示すとおり、すべての外部アプリケーションクライアント接続は SSL/TLS セキュリティを使用します。 現在は一方向 TLS のみがサポートされています。サーバーはクライアントが検証できる証明書を提供します。クライアント証明書は使用できません。 外部アプリケーションクライアントが C8Y_BASEURL_PULSAR 環境変数から Pulsar URL を読み取るように実装すると、マイクロサービスとしても外部アプリケーションとしてもデプロイできるクライアントをより簡単に開発できます。

Pulsar 認証

認証情報は、Things Cloud テナントとそのテナント内のユーザーの両方を識別します。 現在、Pulsar を介して Messaging Service に接続するクライアントでは、“basic”(ユーザー名とパスワード)認証のみがサポートされています。 マイクロサービスクライアントの場合、そのテナントがサブスクライブされたときにマイクロサービスへ渡されるテナントごとの サービスユーザー の認証情報を使用してください。 外部アプリケーションクライアントの場合は、以下で説明する適切な認可ロールが割り当てられた任意のテナントユーザーの認証情報を使用できます。 ユーザー名は <tenantID>/<user> の形式でなければなりません。ここで <tenantID> はテナント ID(テナント名ではありません)、<user> はそのテナント内のユーザーです。 テナントで二要素認証(TFA)が有効になっている場合、そのユーザーに対する TFA チェックを無効化するには、ユーザーに devices ロールが割り当てられている必要があります。 詳細は TFA Settings を参照してください。 devices ロールは、Things Cloud ユーザーインターフェイスでは “Device User” と表示される場合があることに注意してください。

ロールベースのアクセス制御

Pulsar クライアント接続には、認証されたユーザーに割り当てられたロールと権限に基づいて、Messaging Service リソースへのアクセス権が付与されます。 MQTT Service messaging client には、以下のロールと権限を使用してください。

ロールと権限 付与されるアクセス
Mqtt service messaging topics, Read MQTT Service に接続された MQTT デバイスからのメッセージをコンシューム
Mqtt service messaging topics, Update MQTT Service に接続された MQTT デバイスへメッセージを公開

マイクロサービスクライアントの場合、必要な権限を microservice manifestrequiredRoles セクションに追加する必要があります。これにより、テナントごとのサービスユーザーに要求された権限が付与されます。 例:

{
    "apiVersion": "v2",
    "name": "my-mqtt-service-client",
    "version": "1.0.0",
    ...
    "requiredRoles": [
        "ROLE_MQTT_SERVICE_MESSAGING_TOPICS_READ",
        "ROLE_MQTT_SERVICE_MESSAGING_TOPICS_UPDATE"
    ],
    ...
}

外部アプリケーションクライアントの場合、必要な権限は 管理 アプリケーション を通じて認証ユーザーに設定する必要があります。

クライアントの動作に必要な最小限の権限のみを割り当ててください。 たとえば、マイクロサービスがメッセージをコンシュームするだけであれば、manifest に ROLE_MQTT_SERVICE_MESSAGING_TOPICS_UPDATE 権限を含めないでください。

サンプルコード – Messaging Service への接続

以下のコードスニペットは、Pulsar Java クライアントライブラリを使用して basic 認証で Messaging Service に接続する方法を示しています。 Pulsar URL が C8Y_BASEURL_PULSAR 環境変数にあり、テナント識別子、ユーザー名、パスワードがコマンドラインで渡されることを前提としています。 PulsarClient オブジェクトが作成された時点では、クライアントライブラリは実際にはすぐに Pulsar サーバーへの接続を試みないことに注意してください。 簡潔さと明確さを優先するため、この例ではエラーハンドリングを行っていません。 実際の実装では、Pulsar クライアントライブラリのメソッドによってスローされる例外を処理する必要があります。

package c8y.example.mqttservice;

import java.text.MessageFormat;
import java.nio.charset.StandardCharsets;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.auth.AuthenticationBasic;

public class SimplePulsarClient {
    public static void main(String[] args) throws Exception {
        // Validate command line.
        if (args.length != 2) {
            System.err.println("Usage: SimplePulsarClient <tenantID> <username>");
            System.err.println("The Pulsar URL will be read from the C8Y_BASEURL_PULSAR environment variable");
            System.err.println("The password will be read from the console");
            System.exit(-1);
        }

        // Collect all the configuration properties.
        final String pulsarUrl = System.getenv("C8Y_BASEURL_PULSAR");
        final String tenantID = args[0];
        final String username = args[1];
        final String password = new String(System.console().readPassword("Password for user %s/%s: ", tenantID, username));

        // Create the basic authentication credentials object.
        final AuthenticationBasic basicAuth = new AuthenticationBasic();
        basicAuth.configure(MessageFormat.format("'{'\"userId\":\"{0}/{1}\",\"password\":\"{2}\"'}'", tenantID, username, password));

        // Create a Pulsar client using the basic authentication credentials.
        // The client will *not* try to connect and authenticate immediately.
        final PulsarClient client = PulsarClient.builder()
            .serviceUrl(pulsarUrl)
            .authentication(basicAuth)
            .build();
        System.out.println("Created Pulsar client");

        // The rest of the example will go here...
    }
}

メッセージペイロードとプロパティ

Pulsar メッセージは、payload と一連の properties で構成されます。

ペイロードは 0 個以上のバイト列であり、対応する Pulsar メッセージに対応する MQTT PUBLISH メッセージのペイロードと同一です。 クライアントには、通信対象の MQTT デバイスが生成し受け入れるペイロードの形式を理解する責任があります。

デバイスから受信したメッセージでは、Pulsar の eventTime フィールドに MQTT PUBLISH メッセージが MQTT Service に到着した時刻が保持されます。 時刻は Unix タイムスタンプ(1970 年 1 月 1 日 00:00:00 UTC からのミリ秒数)で表されます。 これにより、下流のコンシューマーは処理のための一貫した時刻ソースを利用できます。

Pulsar メッセージプロパティは名前と値のペアであり、名前と値の両方がテキスト文字列です。 MQTT Service が認識するプロパティは、以下の表に示します。 MQTT デバイスから受信したメッセージには、必須とマークされたプロパティが常に含まれ、任意のプロパティが含まれる場合もあります。 受信メッセージには、ここに記載されているもの以外のプロパティは含まれません。 MQTT デバイスへ公開されるメッセージには、必須プロパティをすべて含める必要があり、任意のプロパティを含めることもできます。 公開メッセージにここに記載されているもの以外のプロパティが含まれている場合、それらのプロパティは MQTT Service によって無視されます。

プロパティ名 必須 値の型とエンコーディング 目的
topic YES String MQTT トピック名
clientID YES(1) String MQTT クライアント識別子
tx.clientUsername(2) NO(3) String MQTT クライアントのユーザー名、または証明書認証の場合は Common Name
tx.clientAuthType NO(3) String MQTT クライアント認証方式(BASIC または X509)
tx.payloadFormatIndicator NO 許可される 2 つの値を持つ 1 バイト。文字列 "0" および "1" としてエンコード MQTT v5 Payload Format Indicator
tx.contentType NO String MQTT v5 Content Type
tx.responseTopic NO String MQTT v5 Response Topic
tx.correlationData NO Base64 文字列としてエンコードされたバイト列 MQTT v5 Correlation Data
tx.userProperties.<name> NO String 名前 name の MQTT v5 User Property(4)

注記:

  1. clientID プロパティは、以下の broadcast messages で説明する broadcast メッセージという特別な場合にのみ、省略できます。
  2. tx. プレフィックスは、そのプロパティが transport、この場合は MQTT Service に固有であることを示します。 他の transport はそれぞれ独自の transport 固有プロパティを定義しますが、すべての transport で topicclientID が使用されます。
  3. デバイスが証明書認証を使用して MQTT Service に接続する場合、サービスは厳密なバインディングを適用し、 証明書の Common Name が clientID と一致することを保証します。 ただし、デバイスが basic 認証を使用して接続する場合、認証されたユーザーと clientID の間に自動的なバインディングはありません。 クライアントのなりすましを防ぐため、認可の検証を実装するのはコンシューマーの責任です。tx.clientAuthType および tx.clientUsername プロパティを確認することで、下流のコンシューマー(マイクロサービスなど)は、認証されたユーザーが、指定された clientID を名乗ってメッセージを公開する権限を実際に持っているかどうかを検証できます。
  4. MQTT バージョン 5 の仕様では、同じ名前の user property を複数含むメッセージを許可しています。 この機能は MQTT Service ではサポートされていません。 デバイスが同じ名前の複数の user property を含むメッセージを公開した場合、そのうち 1 つだけが Pulsar メッセージにコピーされます。 どのプロパティがコピーされるかは未定義です。

MQTT デバイスからのメッセージのコンシューム

特定のテナントについて MQTT Service に接続されたデバイスによって公開されたすべてのメッセージは、URL persistent://<tenantID>/mqtt/from-device で識別される single Pulsar topic に公開されます。 このトピック URL は 4 つのコンポーネントに分解できます。

コンポーネント 説明
persistent これは永続トピックであり、コンポーネント障害や再起動をまたいで Messaging Service に保持されることを示します。これにより “at least once” 配信保証が提供されます
<tenantID> Pulsar テナント ID。Things Cloud テナント ID と一致します
mqtt テナント内の Pulsar namespace。MQTT Service では常に mqtt です
from-device namespace 内の Pulsar topic。MQTT Service に接続されたデバイスからのメッセージでは常に from-device です

認証されたユーザーが “Mqtt service messaging topics” ロールに対して “read” 権限を持っている場合にのみ、クライアントはこのトピックをコンシュームできます。 クライアントは他のどのトピックからもコンシュームできません。

メッセージを公開したデバイスのクライアント識別子と、そのメッセージが公開された MQTT トピックは、前述のとおりメッセージプロパティ clientID および topic から取得できます。 Pulsar の eventTime フィールドは、メッセージが MQTT Service によって受信された正確な時刻を提供します。 これは、クライアントが、関心のないものも含めて、そのテナントの MQTT Service に接続されたすべてのデバイスによって公開されたすべてのメッセージを必ずコンシュームしなければならないことを意味します。 クライアントにとって不要なメッセージは、追加処理せずに単に確認応答できます。

注意
クライアントは、テナント内の MQTT Service に接続されたすべてのデバイスによって公開されたすべてのメッセージを安全に処理できるものとして信頼されている必要があります。 信頼できないユーザーがテナントにアクセスできる場合、それらのユーザーにはマイクロサービスのアップロードも、Messaging Service への外部アプリケーションクライアント接続も許可しないでください。 この推奨事項は、互いに信頼していない複数の顧客が 1 つのテナントを共有している場合にも当てはまります。

永続サブスクリプションとメッセージ確認応答

コンシューマーをトピックにサブスクライブすると、そのトピックに対する durable subscription が確立されます。 これは、Messaging Service が、そのトピックに公開されたメッセージを、クライアントに配信され、確認応答されるまで保持することを意味します。 サブスクリプションは、明示的に削除されるまで残ります。 クライアントが現在実行されていないという理由だけでは削除されません。 クライアントの切断中に公開されたメッセージは、再接続時にコンシュームできます。 各メッセージをコンシュームした後、クライアントはそれを明示的に確認応答する必要があります。 メッセージに確認応答すると、クライアントがそのメッセージにこれ以上関心がないことを Messaging Service に伝え、メッセージを破棄できるようになります。 永続サブスクリプションを正しく管理するための詳細は、以下の best practices セクションを参照してください。

サンプルコード – メッセージのコンシューム

以下のコードスニペットは、Pulsar Java クライアントライブラリを使用して MQTT Service の from-device トピックからメッセージをコンシュームする方法を示しています。 これは、Pulsar サーバーへの接続を設定する前の例を拡張したものです。

トピックからメッセージをコンシュームするには、クライアントは Pulsar Consumer を作成してそのトピックにサブスクライブする必要があります。 コンシューマーは、新しいメッセージがトピックに到着するたびに呼び出される MessageListener コールバックを登録する必要があります。 MessageListener 実装は、受信メッセージのペイロード、プロパティ、および到着時刻にアクセスする方法を示しています。 簡単にするため、例中のアプリケーションメッセージは単純なテキスト文字列です。 ただし、Pulsar メッセージのペイロードは常にバイト配列であり、アプリケーションで使用される形式に変換する必要があります。

        // Create a simple message listener that will log some details of
        // each message received, when registered with a consumer.
        final MessageListener<byte[]> listener = new MessageListener<byte[]>() {
            @Override
            public void received(Consumer<byte[]> consumer, Message<byte[]> message) {
                final String clientId = message.getProperty("clientID");
                final String topic = message.getProperty("topic");
                final long eventTime = message.getEventTime();
                System.out.println(MessageFormat.format("Received message from MQTT device {0} on MQTT topic {1}", clientId, topic));
                System.out.println(MessageFormat.format("MQTT PUBLISH arrival timestamp: {0}", eventTime));
                System.out.println(MessageFormat.format("Message payload: {0}", new String(message.getValue(), StandardCharsets.UTF_8)));
                System.out.println(MessageFormat.format("Message properties: {0}", message.getProperties()));
                try {
                    // Acknowledge the message.
                    consumer.acknowledge(message);
                } catch (PulsarClientException e) {
                    e.printStackTrace();
                }
            }
        };

        // Create a Pulsar consumer on the from-device topic for the tenant,
        // using the listener defined above to process each message.
        // This will trigger connection and authentication by the client.
        final Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
            .topic(MessageFormat.format("persistent://{0}/mqtt/from-device", tenantID))
            .subscriptionName("demoSubscription")
            .messageListener(listener)
            .subscribe();
        System.out.println("Created Pulsar consumer");

MQTT デバイスへのメッセージの公開

特定のテナントについてクライアントが MQTT Service に接続されたデバイスへ送信したいすべてのメッセージは、URL persistent://<tenantID>/mqtt/to-device で識別される single Pulsar topic に公開する必要があります。 URL のコンポーネントは、上記の MQTT デバイスからのメッセージのコンシューム で説明したとおりに解釈してください。

認証されたユーザーが “Mqtt service messaging topics” ロールに対して “update” 権限を持っている場合にのみ、クライアントはこのトピックへ公開できます。 クライアントは他のどのトピックにも公開できません。

to-device トピックに公開されたメッセージは、2 つの必須メッセージプロパティを使用して接続済み MQTT デバイスへルーティングされます。

プロパティ名 目的
clientID メッセージを受信すべき MQTT デバイスのクライアント識別子
topic メッセージが公開されるべき MQTT トピックの名前

topic プロパティが空または存在しない場合、そのメッセージはどの MQTT クライアントにも公開されません。 メッセージは、指定された MQTT トピックへのアクティブなサブスクリプションを持つデバイスにのみ公開されます。 メッセージは、MQTT Service が公開メッセージを処理した時点で接続されているクライアントにのみ公開されます。

Messaging Service へのメッセージ公開が成功したからといって、そのメッセージがいずれかの MQTT デバイスに正常に配信されたことを意味するわけではありません。 MQTT デバイスへの後続の公開は 非同期に 行われ、Pulsar クライアントへのフィードバックはありません。 メッセージは MQTT プロトコル仕様に従って、デバイスが行った MQTT サブスクリプションの QoS レベルを使用してデバイスに配信されます。 ただし、MQTT Service に接続する際には MQTT デバイスが clean session を使用する必要があるため、デバイス切断中にそのデバイスへ公開されたメッセージは配信されません。

ブロードキャストメッセージ

デバイスレベルの分離を強制するため、メッセージは、clientID プロパティで識別される特定の MQTT クライアントにのみ公開されます。ただし、そのクライアントが関連する MQTT トピックに対してアクティブなサブスクリプションを持っている必要があります。 clientID プロパティが存在しない場合、メッセージはそのトピックに対してアクティブなサブスクリプションを持つすべての接続済み MQTT クライアントにブロードキャストされます。

ブロードキャスト公開は、多数のクライアントが接続されている場合に高コストになる可能性があり、想定外のデバイスにメッセージが配信される場合があります。 アプリケーションがトピックをサブスクライブしているすべてのデバイスに同じメッセージを公開する必要がある場合にのみ、ブロードキャストを使用してください。

メッセージキー

MQTT デバイスへ送信されるメッセージの効率的な配信と正しい順序を実現するために、クライアントは to-device トピックに公開される Pulsar メッセージの key必ず設定する必要があります。 キーは次のように設定してください。

  • clientID メッセージプロパティが設定されている場合、キーはこのプロパティと同じ値にする必要があります。
  • clientID メッセージプロパティが設定されていない場合、キーは topic メッセージプロパティと同じ値にする必要があります。

無効なメッセージの扱い

上記で説明したメッセージプロパティおよびキーのルールに従っていない公開メッセージは、どの MQTT デバイスにも配信されません。 特に、以下の無効な構成を持つメッセージがこれに該当します。

  • メッセージの key が設定されていない。
  • メッセージの key は設定されているが、message keys で説明した clientID または topic プロパティと一致しない。
  • clientID プロパティが設定されているが、値が空である。
  • topic プロパティが設定されていない、または設定されているが値が空である。

このような無効なメッセージが検出され破棄されると、Things Cloud テナントでアラームが発生します。 異なるメッセージに対する同一エラーについて冗長なアラームでテナントが過負荷にならないよう、アラーム送信レートは制限されています。 Pulsar の to-device トピック上の無効なメッセージに対しては、以下のアラームが発生します。

アラームタイプ 説明
c8y_MqttService_ToDevice_NoKey メッセージキーが設定されていません。
c8y_MqttService_ToDevice_InvalidKey メッセージキーは設定されていますが、clientID または topic と一致しません。
c8y_MqttService_ToDevice_EmptyClientId clientID プロパティは設定されていますが、値が空です。
c8y_MqttService_ToDevice_MissingTopic topic プロパティが設定されていません。
c8y_MqttService_ToDevice_EmptyTopic topic プロパティは設定されていますが、値が空です。
c8y_MqttService_ToDevice_InvalidTopic topic プロパティが無効です。たとえば SmartREST トピックが使用されています。

空でない clientID プロパティを持ち、現在接続されていない MQTT デバイスを参照するメッセージは、無効とは見なされません。 ただし、接続時にデバイスが clean session を使用する必要があるため、このメッセージは、その後デバイスが接続したとしても配信されません。 同様に、現在接続されている MQTT デバイスに対して公開されたが、topic プロパティで指定された MQTT トピックを現在サブスクライブしていないメッセージも、無効とは見なされません。 これらの状況では、メッセージは配信されませんがアラームも発生しません。

サンプルコード – メッセージの公開

以下のコードスニペットは、Pulsar Java クライアントライブラリを使用して MQTT Service の to-device トピックにメッセージを公開する方法を示しています。 これは、Pulsar サーバーへの接続を設定しメッセージコンシューマーを作成した前の例を拡張したものです。

トピックにメッセージを公開するには、クライアントはまずそのトピックに関連付けられた Pulsar Producer を作成する必要があります。 次に、その Producer を使用して、トピックへ公開される新しい Message オブジェクトを作成できます。 サンプルコードでは、単一デバイス宛てのメッセージと “broadcast” メッセージについて、メッセージプロパティとメッセージキーを正しく設定する方法を示しています。 ここでも、アプリケーションメッセージは単純なテキスト文字列であり、MQTT Service が期待するバイト配列に変換する必要があることを前提としています。 明確さのため、ほとんどのエラーハンドリングコードは省略しています。 本番クライアントでのエラー処理については、Handling Messaging Service errors を参照してください。

        // Wrap all the operations that might fail after we create the
        // durable subscription in a try-catch, so that we can delete the
        // subscription if something goes wrong.
        try {
            // Create a Pulsar producer on the to-device topic for the tenant.
            final Producer<byte[]> producer = client.newProducer(Schema.BYTES)
                .topic(MessageFormat.format("persistent://{0}/mqtt/to-device", tenantID))
                .create();
            System.out.println("Created Pulsar producer");

            // Publish a message to a single MQTT device.
            producer.newMessage()
                .property("clientID", "demoClient")
                .property("topic", "demoTopicB")
                .key("demoClient")
                .value("Message sent to a single device".getBytes(StandardCharsets.UTF_8))
                .send();
            System.out.println("Sent message to single device");

            // Publish a message to all MQTT devices subscribed to a topic.
            // Note that the "clientID" property is omitted here.
            producer.newMessage()
                .property("topic", "demoTopicB")
                .key("demoTopicB")
                .value("Message sent to all subscribed devices".getBytes(StandardCharsets.UTF_8))
                .send();
            System.out.println("Sent message to all subscribed devices");

            // Close the producer.
            producer.close();
        }

Messaging Service のクォータと制限

Pulsar topic に公開されたメッセージは、関心を持つすべてのコンシューマーに配信され、確認応答されるまで、Messaging Service によって永続的に保存されます。 MQTT Service によって from-device トピックへ公開されたメッセージの場合、コンシューマーはそのトピックに永続サブスクリプションを作成したすべてのクライアントです。 クライアントによって to-device トピックへ公開されたメッセージの場合、コンシューマーはそのメッセージをデバイスへ配信する MQTT Service のインスタンスです。

リソース使用量を最適化するため、Messaging Service は永続保存されたメッセージに対してストレージ制限とメッセージ time-to-live(TTL)を課しています。

これらの制限はテナント単位で設定可能です。 ユースケースで異なる設定が必要な場合、または質問や懸念がある場合は、product support に連絡してください。

メッセージバックログクォータ

永続メッセージは、関心を持つコンシューマーに配信されるまで backlog に保存されます。 バックログの最大サイズは backlog quota limit によって設定され、保存可能なメッセージ数、ひいてはプラットフォームのリソース消費量に直接影響します。

各 Pulsar topic には個別のバックログが存在するため、MQTT Service では、テナントの from-device および to-device トピックはそれぞれ独立したバックログを持ちます。 バックログはトピック上のすべてのサブスクリプションで共有されます。 バックログクォータ制限に達すると、一部の古いメッセージが配信されるか TTL 期限切れで削除されるまで、新しいメッセージをバックログに追加できません。

Pulsar from-device トピックのバックログクォータ制限に達すると、接続されたデバイスからの新しい MQTT PUBLISH パケットは拒否されます。 PUBLISH パケットが QoS レベル 0 で送信された場合、メッセージは失われます。 PUBLISH パケットが QoS レベル 1 で送信された場合、動作はデバイスが使用している MQTT プロトコルバージョンによって異なります。

  • MQTT バージョン 3 を使用するデバイスでは、デバイスは切断されます。
  • MQTT バージョン 5 を使用するデバイスでは、reason code 0x97Quota exceeded を持つ PUBACK パケットを受信します。

Pulsar to-device トピックのバックログクォータ制限に達すると、Producer.send() メソッド、またはクライアントが使用する Pulsar ライブラリ内の同等メソッドを呼び出すクライアントは、クライアントライブラリから適切な例外またはエラーレスポンスを受け取ります。

メッセージ time-to-live

未配信メッセージは、バックログ上に time-to-live (TTL) limit より長く存在すると自動的に削除されます。 このポリシーは、全体的なリソース使用量を制限し、コンシューマーの長時間切断後に古くなったデータを処理する必要性を減らすのに役立ちます。

未配信メッセージが TTL 制限に達しない限り、バックログから削除されることはありません。 メッセージは常に、そのトピックに公開された順序でコンシューマーへ配信されます。

デバイスからの信頼性の高いメッセージ配信のためのベストプラクティス

トピックがバックログクォータ制限に達すると、新しいメッセージを受け付けなくなり、メッセージが失われる場合があります。これを避けるには、以下を実施してください。

  • from-device トピックからのメッセージをできるだけ速やかに処理し、確認応答してください。 クライアントがそのメッセージに関心がない場合でも、すべてのメッセージに必ず明示的に確認応答する必要があります。 処理が完了するか、後で処理するためにメッセージが安全に保存されるまでは、確認応答しないでください。 確認応答済みメッセージは、クライアント障害または再起動後に再配信されません。
  • サブスクリプションのライフサイクルを管理してください。コンシューマーのサブスクライブは、明示的に削除されるまで残る durable subscription を作成します。 クライアント切断中に公開されたメッセージは、そのサブスクリプションのために保持され、クライアント再接続時に配信されます。 サブスクリプションは永続するため、クライアントが 1 つも動作していない場合でも、トピックがバックログクォータに達する可能性があります。
    1. クライアント接続時には毎回同じサブスクリプション名を使用してください。実行ごとにランダムなサブスクリプション名を作成しないでください。そうすると非アクティブなサブスクリプションが蓄積し、バックログを使い果たす可能性があります。
    2. 不要になったサブスクリプションは明示的に削除してください。たとえば、クライアントを長期間停止する場合は、consumer の unsubscribe() メソッドを呼び出すか、Messaging Service の monitoring and management インターフェイスを使用してサブスクリプションを削除してください。

サンプルコード – サブスクリプションの削除

以下のコードスニペットは、サブスクリプションを削除し、前のコード例で作成した他の Pulsar クライアントオブジェクトを閉じる方法を示しています。

        finally {
            // Delete the durable subscription.
            // This is only necessary if messages should *not* be retained
            // on the topic while the client is disconnected.
            consumer.unsubscribe();
        }

        // Close the other Pulsar objects that we created.
        consumer.close();
        client.close();

Messaging Service エラーの処理

Things Cloud Messaging Service は、クライアントから離れた場所で動作する複雑な分散サービスです。 すべての分散システムと同様に、完全な信頼性は保証できないため、クライアントは Pulsar クライアントライブラリによって報告されるエラーを処理できるようにしておく必要があります。 これらのエラーは、大きく 2 つのカテゴリに分けられます。

  1. クライアント実装における設定またはロジックエラー。 このカテゴリのエラーは通常 “fatal” であり、クライアントが Messaging Service に接続したり、メッセージを公開またはコンシュームしたりすることを妨げます。 この種のエラーの典型例には、以下があります。
    • 誤った Pulsar URL で接続しようとする。
    • 無効な認証情報を使用する。
    • Messaging Service へのアクセス権限を持たないユーザーの認証情報を使用する。
    • to-device トピックからコンシュームしようとしたり、from-device トピックへ公開しようとしたりする。
    • その他のトピックへの公開またはコンシュームを試みる。
    • 不正に構築されたメッセージを公開しようとする。 最も可能性の高い原因は、明示的にバイト配列として作成されていないペイロードを持つメッセージを公開しようとすることです。
  2. Messaging Service における一時的なエラー。 このカテゴリのエラーは通常、Messaging Service サーバーの一時的な問題を反映しており、自動的に、または管理者の対応によって解消されます。 クライアントが経験し得る一時的なエラーの例には、以下があります。
    • アップグレード中や、Messaging Service の計画外停止中に Messaging Service コンポーネントが再起動されると、接続が切断されることがあります。 これにより公開またはコンシューム操作が失敗し、操作を再試行する前に再接続、または producer もしくは consumer の再確立が必要になる場合があります。
    • to-device トピックのバックログクォータ制限に達すると、公開メッセージは拒否されます。 この状況を避けるための助言については、reliable delivery best practices を参照してください。
    • Messaging Service 上の他の制限またはクォータに達した場合も、公開メッセージが拒否されることがあります。

クライアントが Java クライアントライブラリを使用している場合、ほとんどすべてのエラーは、クライアントライブラリメソッドによってスローされる PulsarClientException として報告されます。 ごくまれに、クライアントが Schema.BYTES スキーマとバイト配列ペイロードだけを使用していない場合、SchemaSerializationException ランタイムエラーもスローされることがあります。 PulsarClientException クラスには多数のサブクラスがあり、クライアントはエラー原因をより正確に判定できます。 他のクライアントライブラリにも、同様の言語固有エラー報告メカニズムがあります。

一般に、クライアント実装における致命的な設定エラーまたはロジックエラーから回復することはできません。 エラー修正後にクライアントを再起動する必要があります。 一時的なエラーについては、通常、一定時間待ってから再試行する戦略が適切です。 producer または consumer に対する操作が失敗した場合、正確な根本原因と最適な対応を特定するのは難しいことがあります。 ほとんどのシナリオをカバーする単純な回復方法としては、失敗した producer または consumer を削除し、新しいものを作成してから操作を再試行する方法があります。 これにより、エラー後に producer または consumer が再接続できないケースを回避できます。 より高度な戦略では、スローされた PulsarClientException の特定のサブクラスに応じて対応を調整できます。 サービスが回復するまで、再試行間隔を増やすために exponential backoff 戦略を使用してください。

MQTTサービスクライアントの例

MQTTサービス用のいくつかのサンプルクライアントのソースコードは、cumulocity-examples GitHubリポジトリで確認できます。
これらのサンプルは、独自のクライアントを開発する際のよい出発点となる場合があります。
さらに、Things Cloud Tech Community は、Things Cloud 開発のあらゆる側面に関するアドバイスやサンプルの優れた情報源です。

ビルド方法と実行方法の詳細については、各サンプルに含まれる README.md ファイルを参照してください。

マイクロサービスおよび外部アプリケーションとの統合 セクションのコードスニペットに基づく完全な Javaクライアントのサンプル が利用可能です。
これに加えて、MQTTデバイスをシミュレートし、Javaクライアントの動作をテストするために使用できる、シンプルな Python MQTTクライアント があります。
デバイスに送信されたメッセージが確実に受信されるように、まずPythonクライアントを起動し、その後でJavaクライアントを起動してください。

マイクロサービス開発者向けに、2つのサンプルが利用可能です。
Javaマイクロサービスのサンプル では、Things Cloud マイクロサービスSDKを使用して、JavaベースのマイクロサービスからPulsarに接続し、MQTTサービスからメッセージをコンシュームし、メッセージを Things Cloud メジャーメントに変換します。
同様に、Python言語を使用して同様の機能を実装する Pythonマイクロサービスのサンプル があります。
どちらのマイクロサービスサンプルでも、MQTTメッセージからデバイスの外部IDを特定し、これを Things Cloud マネージドオブジェクトにマッピングし、まだ存在しない場合はマネージドオブジェクトを作成する方法も示しています。

サンプルリポジトリは継続的に保守されており、このドキュメントの作成後に追加のサンプルが追加されている場合があります。