マイクロサービスおよび外部クライアントは、デバイスプロトコルと Things Cloud ドメインモデルの間を変換するために Messaging Service に直接接続 します。 Messaging Service クライアントは、必要に応じてデバイスを Things Cloud マネージドオブジェクトとして登録する責任もあります。
MQTT デバイスによって公開されたすべてのメッセージは Messaging Service に転送され、そこでコンシュームされるまで永続化されます。
Core MQTT プロトコルで使用される MQTT トピックに公開された Things Cloud ドメインモデルメッセージは、Things Cloud コアによって直接コンシュームされます。
他の MQTT トピックに公開されたメッセージは、メッセージを Things Cloud ドメインモデルにマッピングする責任を持つマイクロサービスや外部クライアントによってコンシュームされます。
同様に、Things Cloud コアおよびクライアントは Messaging Service にメッセージを公開でき、それらは MQTT Service によってコンシュームされてデバイスに転送されます。
デバイス分離
device isolation 機能により、異なるクライアントが使用する同じ名前のトピック間に相互作用はありません。
実質的に、すべてのデバイスはそのデバイスだけがアクセスできる独自のプライベートなトピック空間を持ちます。
これは、図の中で device 1 と device N がどちらも topic A で公開とサブスクライブを行っていることから確認できます。
デバイスは分離されているため、device 1 は device N によって公開されたメッセージを一切見ることができず、その逆も同様です。
ただし、Messaging Service に直接接続するマイクロサービスまたは外部アプリケーションクライアントは、すべてのデバイスで使用されるトピックに完全にアクセスでき、必要に応じてクライアント間でメッセージを転送できます。
MQTTデバイスの接続
このセクションでは、MQTT Service に MQTT デバイスを接続して認証する際の詳細について説明します。
Things Cloud と MQTT デバイスを統合するすべての人にとって関心のある内容です。
一般に、MQTT Service は、デバイスが Things Cloud の Core MQTT プロトコルを使用するか、非 Things Cloud プロトコルを使用するかにかかわらず、すべてのデバイスに対して同じように動作します。
デバイスで使用されるアプリケーションプロトコルに関連する違いがある場合は、該当箇所で文書化されます。
特定のテナント内で MQTT Service に接続するすべてのデバイスは、一意の クライアント識別子(client ID)を使用する必要があります。
すでに接続されている client ID を使用してデバイスが接続した場合、MQTT 仕様に従って、既存の 接続は終了されます。
異なるテナントに属するデバイスは、同じ client ID を使用して同時に接続できます。
空の client ID は許可されていません。
一般に、client ID は構造を持たない識別子として扱われ、MQTT Service がそれを何らかの方法で解釈することはありません。
ただし、MQTT Service に接続する事前登録済みの Core MQTT デバイスに対しては、いくつかの特別な処理が行われます。
詳細については、Core MQTT デバイスサポート セクションを参照してください。
認証
MQTT Service は次の認証方式をサポートしています。
いずれの場合も、MQTT Service がデバイスに関連付けられた Things Cloud テナントを識別できるように、MQTT ユーザー名が正しく設定されていることを確認することが重要です。
ユーザー名とパスワード(基本認証)
テナント上の任意のユーザーの認証情報を使用して、MQTT Service に対してデバイスを認証できます。
MQTT CONNECT パケット内のユーザー名には、<tenantID>/<username> の形式でテナント ID とユーザー名を必ず含める必要があります。
CONNECT パケット内のパスワードは、ユーザーの暗号化されていないパスワードでなければなりません。
この認証方式に関連付けられたユーザーには、Mqtt service 権限タイプに対する ADMIN 権限を割り当てる必要があります。
X.509 デバイス証明書(証明書認証)
証明書を使用して認証するには、デバイスは Things Cloud テナント用に設定された トラストアンカー によって信頼される 証明書チェーン を提示する必要があります。
証明書の Common Name(CN)フィールドは、MQTT CONNECT パケット内の client ID フィールドと必ず一致している必要があります。
デバイスは MQTT CONNECT パケットのユーザー名フィールドにテナント ID を指定することが推奨されます。
トラストアンカーとデバイス証明書の作成および管理の詳細については、以下の TLS 証明書の使用 セクションを参照してください。
TLS 証明書の使用
このセクションには、MQTT Service における TLS 証明書サポートの簡略化された概要が記載されています。
詳細については、Things Cloud の一般的な デバイス証明書 のドキュメントを参照してください。
サーバー証明書
MQTT Service は、メインの Things Cloud 環境ドメインに割り当てられているものと同じサーバー証明書を使用します。
エンタープライズテナント(親テナント) は、SSL 管理機能を使用してこれらの証明書をカスタマイズすることはできません。
デバイス(クライアント)証明書
MQTT Service で使用されるデバイス証明書は、一般ドキュメントに記載されている同じ前提条件を共有します。
さらに、MQTT Service に接続するデバイスについては、以下が適用されます。
証明書の Common Name(CN)フィールドは、MQTT CONNECT パケット内の client ID フィールドと必ず一致している必要があります。
client ID と証明書の CN が一致しない場合、接続は拒否されます。
デバイスは MQTT CONNECT パケットのユーザー名フィールドにテナント ID を指定することが推奨されます。
テナント ID が指定されている場合、それは指定された証明書を信頼するテナントに対応していなければなりません。そうでない場合、接続は拒否されます。
同様に、トラストアンカーが複数のテナントによって信頼されていて、テナント ID が指定されて_いない_場合、接続は拒否されます。
現時点では、マルチテナントのトラストアンカーは Things Cloud でサポートされていませんが、この機能は将来的に導入される可能性があります。
トラストアンカーの設定が変更された場合でもデバイスが接続を継続できるよう、ユーザー名フィールドには常にテナント ID を指定することを推奨します。
証明書トラストアンカーの設定
Things Cloud プラットフォームの TLS トラストアンカーは、テナントごとに定義されます。
認証にデバイス証明書を使用するには、デバイス証明書に署名するルート証明書または中間証明書をプラットフォームにアップロードし、テナントの信頼済み証明書リストに追加する必要があります。
たとえば、トラストアンカーとしてルート証明書のみが設定されている場合、デバイスは(少なくとも)そのデバイス固有の証明書と、ルートによって信頼される中間証明書を含む証明書チェーンを送信する必要があります。
逆に、中間証明書がトラストアンカーとして設定されている場合、デバイスはその中間証明書によって信頼されるデバイスごとの固有証明書のみを送信できます。
トラストアンカーは、UI の 信頼済み証明書 ページ、または REST API を通じて設定できます。
さらに、証明書を追加する際には Auto registration オプションが有効になっていることを確認してください。これにより、有効な証明書を提示する任意のデバイスが初回接続時にプラットフォームへ自動登録されます。
CA 証明書が Things Cloud にアップロードされ、信頼されるようになると、デバイスは信頼された CA によって署名されたクライアント証明書を使用して認証できます。
任意の MQTT クライアントを使用して接続するには、前に生成したクライアント証明書と鍵を使用します。
この例では、Mosquitto MQTT クライアントを使用しています。
--cafile cumulocity.com.pem: このファイルには Things Cloud の MQTT Service ブローカーの CA 証明書が含まれており、サーバーの識別情報を検証するために使用されます。
--key client-key.pem と --cert client-chain.pem: これらは、信頼された CA によって署名されたクライアント証明書と秘密鍵です。
-u t11101: (任意)MQTT ユーザー名を指定します。これはテナント ID である必要があります。
CA 証明書のダウンロード
Things Cloud は、よく知られた公開 CA によって署名された証明書を使用しています。
一部のクライアント(Mosquitto など)では CA ファイルを明示的に指定する必要がありますが、他のクライアント(MQTTX など)ではこれらの証明書を自動的に信頼します。
Things Cloud MQTT Service ブローカーの CA 証明書をダウンロードするには:
MQTT Service によって適用される Messaging Service のクォータと上限 に関する説明も参照してください。
これらはデバイス接続に直接影響しませんが、Messaging Service からの「バックプレッシャー」により、たとえば Messaging Service がデバイスからの追加メッセージを受け付けられない場合などに、デバイスエラーが発生する可能性があります。
アラーム
MQTT Service は、デバイス接続上の一部のエラー条件に応じて Things Cloud アラームを生成します。
これにより、テナントユーザーやアプリケーションは問題をより可視化でき、特にデバイスから十分な診断データを取得することが難しい場合に有用です。
アラームには レート制限 が適用され、Things Cloud プラットフォームに過剰な数のアラームが送られて負荷がかかることを防ぎます。
これは、たとえば多くのデバイスが短時間に許可最大サイズを超えるメッセージをパブリッシュした場合でも、問題のすべての発生に対してアラームが生成されるわけではないことを意味します。
ただし、テナントユーザーはデバイスが大きすぎるメッセージをパブリッシュしていることを把握でき、是正措置を講じることができます。
MQTT Service におけるプレビュー版の Core MQTT サポートは、デバイスを Things Cloud コアに直接接続する場合と比較して、いくつかの動作の違いがあります。
これらの違いの一部は、アーキテクチャ図 に示すとおり、MQTT Service が Things Cloud コアから 分離されており、両者の間でメッセージが 非同期に 転送されるために発生します。
これは第一に、MQTT Service が受信し、場合によっては確認応答したメッセージが、Core MQTT 実装でまだ処理されていない可能性があることを意味します。
第二に、Core MQTT 実装は、MQTT Service に接続されたデバイスの接続ライフサイクルおよびトピックサブスクリプションを完全には把握できません。
MQTT Service における Core MQTT サポートが一般提供に達する前に、これらの違いの多くを解消する予定です。
ただし、デバイス への「push connection」トラフィックの監視は MQTT Service ではサポートされていません。
レート制限
Things Cloud の レート制限 メカニズムは、MQTT Service 経由で接続された MQTT デバイスには使用されません。
マイクロサービスおよび外部アプリケーションとの統合
Things Cloud のマイクロサービスおよび外部アプリケーションは、MQTT Service に接続されたデバイスによって公開されたメッセージをコンシュームし、それらのデバイスにメッセージを公開できます。
これを行うには、マイクロサービスまたは外部アプリケーションを Things Cloud Messaging Service(Apache Pulsar のデプロイメント)に接続し、Pulsar プロトコルを使用して MQTT メッセージの公開とコンシュームを行います。
以下の図は、Pulsar を介して MQTT Service とやり取りする際に使用される重要なインターフェイスとデータフローを示しています。
備考
MQTT Service messaging client とは、Pulsar を介して MQTT Service とやり取りするソフトウェアコンポーネントです。
これは、Things Cloud プラットフォームでホストされるマイクロサービスとしてデプロイすることも、プラットフォーム外でホストされる外部アプリケーションの一部としてデプロイすることもできます。
このドキュメントでは、このようなコンポーネントを単に client と呼びます。
実装または動作がクライアントのホスト場所によって異なる場合は、該当箇所でその違いを記載します。
MQTT Service は device isolation を実装しています。つまり、MQTT Service に接続された MQTT デバイス同士は、MQTT プロトコルを使用して直接通信することはできません。
すべてのデバイス間通信は、図に示すように、クライアントによって明示的に管理する必要があります。
package c8y.example.mqttservice;
import java.text.MessageFormat;
import java.nio.charset.StandardCharsets;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.auth.AuthenticationBasic;
publicclassSimplePulsarClient {
publicstaticvoidmain(String[] args) throws Exception {
// Validate command line.if (args.length != 2) {
System.err.println("Usage: SimplePulsarClient <tenantID> <username>");
System.err.println("The Pulsar URL will be read from the C8Y_BASEURL_PULSAR environment variable");
System.err.println("The password will be read from the console");
System.exit(-1);
}
// Collect all the configuration properties.final String pulsarUrl = System.getenv("C8Y_BASEURL_PULSAR");
final String tenantID = args[0];
final String username = args[1];
final String password = new String(System.console().readPassword("Password for user %s/%s: ", tenantID, username));
// Create the basic authentication credentials object.final AuthenticationBasic basicAuth = new AuthenticationBasic();
basicAuth.configure(MessageFormat.format("'{'\"userId\":\"{0}/{1}\",\"password\":\"{2}\"'}'", tenantID, username, password));
// Create a Pulsar client using the basic authentication credentials.// The client will *not* try to connect and authenticate immediately.final PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarUrl)
.authentication(basicAuth)
.build();
System.out.println("Created Pulsar client");
// The rest of the example will go here... }
}
tx. プレフィックスは、そのプロパティが transport、この場合は MQTT Service に固有であることを示します。
他の transport はそれぞれ独自の transport 固有プロパティを定義しますが、すべての transport で topic と clientID が使用されます。
デバイスが証明書認証を使用して MQTT Service に接続する場合、サービスは厳密なバインディングを適用し、
証明書の Common Name が clientID と一致することを保証します。
ただし、デバイスが basic 認証を使用して接続する場合、認証されたユーザーと clientID の間に自動的なバインディングはありません。
クライアントのなりすましを防ぐため、認可の検証を実装するのはコンシューマーの責任です。tx.clientAuthType および tx.clientUsername プロパティを確認することで、下流のコンシューマー(マイクロサービスなど)は、認証されたユーザーが、指定された clientID を名乗ってメッセージを公開する権限を実際に持っているかどうかを検証できます。
MQTT バージョン 5 の仕様では、同じ名前の user property を複数含むメッセージを許可しています。
この機能は MQTT Service ではサポートされていません。
デバイスが同じ名前の複数の user property を含むメッセージを公開した場合、そのうち 1 つだけが Pulsar メッセージにコピーされます。
どのプロパティがコピーされるかは未定義です。
MQTT デバイスからのメッセージのコンシューム
特定のテナントについて MQTT Service に接続されたデバイスによって公開されたすべてのメッセージは、URL persistent://<tenantID>/mqtt/from-device で識別される single Pulsar topic に公開されます。
このトピック URL は 4 つのコンポーネントに分解できます。
コンポーネント
説明
persistent
これは永続トピックであり、コンポーネント障害や再起動をまたいで Messaging Service に保持されることを示します。これにより “at least once” 配信保証が提供されます
<tenantID>
Pulsar テナント ID。Things Cloud テナント ID と一致します
mqtt
テナント内の Pulsar namespace。MQTT Service では常に mqtt です
from-device
namespace 内の Pulsar topic。MQTT Service に接続されたデバイスからのメッセージでは常に from-device です
認証されたユーザーが “Mqtt service messaging topics” ロールに対して “read” 権限を持っている場合にのみ、クライアントはこのトピックをコンシュームできます。
クライアントは他のどのトピックからもコンシュームできません。
メッセージを公開したデバイスのクライアント識別子と、そのメッセージが公開された MQTT トピックは、前述のとおりメッセージプロパティ clientID および topic から取得できます。
Pulsar の eventTime フィールドは、メッセージが MQTT Service によって受信された正確な時刻を提供します。
これは、クライアントが、関心のないものも含めて、そのテナントの MQTT Service に接続されたすべてのデバイスによって公開されたすべてのメッセージを必ずコンシュームしなければならないことを意味します。
クライアントにとって不要なメッセージは、追加処理せずに単に確認応答できます。
注意
クライアントは、テナント内の MQTT Service に接続されたすべてのデバイスによって公開されたすべてのメッセージを安全に処理できるものとして信頼されている必要があります。
信頼できないユーザーがテナントにアクセスできる場合、それらのユーザーにはマイクロサービスのアップロードも、Messaging Service への外部アプリケーションクライアント接続も許可しないでください。
この推奨事項は、互いに信頼していない複数の顧客が 1 つのテナントを共有している場合にも当てはまります。
永続サブスクリプションとメッセージ確認応答
コンシューマーをトピックにサブスクライブすると、そのトピックに対する durable subscription が確立されます。
これは、Messaging Service が、そのトピックに公開されたメッセージを、クライアントに配信され、確認応答されるまで保持することを意味します。
サブスクリプションは、明示的に削除されるまで残ります。
クライアントが現在実行されていないという理由だけでは削除されません。
クライアントの切断中に公開されたメッセージは、再接続時にコンシュームできます。
各メッセージをコンシュームした後、クライアントはそれを明示的に確認応答する必要があります。
メッセージに確認応答すると、クライアントがそのメッセージにこれ以上関心がないことを Messaging Service に伝え、メッセージを破棄できるようになります。
永続サブスクリプションを正しく管理するための詳細は、以下の best practices セクションを参照してください。
サンプルコード – メッセージのコンシューム
以下のコードスニペットは、Pulsar Java クライアントライブラリを使用して MQTT Service の from-device トピックからメッセージをコンシュームする方法を示しています。
これは、Pulsar サーバーへの接続を設定する前の例を拡張したものです。
// Create a simple message listener that will log some details of// each message received, when registered with a consumer.final MessageListener<byte[]> listener = new MessageListener<byte[]>() {
@Overridepublicvoidreceived(Consumer<byte[]> consumer, Message<byte[]> message) {
final String clientId = message.getProperty("clientID");
final String topic = message.getProperty("topic");
finallong eventTime = message.getEventTime();
System.out.println(MessageFormat.format("Received message from MQTT device {0} on MQTT topic {1}", clientId, topic));
System.out.println(MessageFormat.format("MQTT PUBLISH arrival timestamp: {0}", eventTime));
System.out.println(MessageFormat.format("Message payload: {0}", new String(message.getValue(), StandardCharsets.UTF_8)));
System.out.println(MessageFormat.format("Message properties: {0}", message.getProperties()));
try {
// Acknowledge the message. consumer.acknowledge(message);
} catch (PulsarClientException e) {
e.printStackTrace();
}
}
};
// Create a Pulsar consumer on the from-device topic for the tenant,// using the listener defined above to process each message.// This will trigger connection and authentication by the client.final Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
.topic(MessageFormat.format("persistent://{0}/mqtt/from-device", tenantID))
.subscriptionName("demoSubscription")
.messageListener(listener)
.subscribe();
System.out.println("Created Pulsar consumer");
MQTT デバイスへのメッセージの公開
特定のテナントについてクライアントが MQTT Service に接続されたデバイスへ送信したいすべてのメッセージは、URL persistent://<tenantID>/mqtt/to-device で識別される single Pulsar topic に公開する必要があります。
URL のコンポーネントは、上記の MQTT デバイスからのメッセージのコンシューム で説明したとおりに解釈してください。
認証されたユーザーが “Mqtt service messaging topics” ロールに対して “update” 権限を持っている場合にのみ、クライアントはこのトピックへ公開できます。
クライアントは他のどのトピックにも公開できません。
// Wrap all the operations that might fail after we create the// durable subscription in a try-catch, so that we can delete the// subscription if something goes wrong.try {
// Create a Pulsar producer on the to-device topic for the tenant.final Producer<byte[]> producer = client.newProducer(Schema.BYTES)
.topic(MessageFormat.format("persistent://{0}/mqtt/to-device", tenantID))
.create();
System.out.println("Created Pulsar producer");
// Publish a message to a single MQTT device. producer.newMessage()
.property("clientID", "demoClient")
.property("topic", "demoTopicB")
.key("demoClient")
.value("Message sent to a single device".getBytes(StandardCharsets.UTF_8))
.send();
System.out.println("Sent message to single device");
// Publish a message to all MQTT devices subscribed to a topic.// Note that the "clientID" property is omitted here. producer.newMessage()
.property("topic", "demoTopicB")
.key("demoTopicB")
.value("Message sent to all subscribed devices".getBytes(StandardCharsets.UTF_8))
.send();
System.out.println("Sent message to all subscribed devices");
// Close the producer. producer.close();
}
Messaging Service のクォータと制限
Pulsar topic に公開されたメッセージは、関心を持つすべてのコンシューマーに配信され、確認応答されるまで、Messaging Service によって永続的に保存されます。
MQTT Service によって from-device トピックへ公開されたメッセージの場合、コンシューマーはそのトピックに永続サブスクリプションを作成したすべてのクライアントです。
クライアントによって to-device トピックへ公開されたメッセージの場合、コンシューマーはそのメッセージをデバイスへ配信する MQTT Service のインスタンスです。
リソース使用量を最適化するため、Messaging Service は永続保存されたメッセージに対してストレージ制限とメッセージ time-to-live(TTL)を課しています。
これらの制限はテナント単位で設定可能です。
ユースケースで異なる設定が必要な場合、または質問や懸念がある場合は、product support に連絡してください。
不要になったサブスクリプションは明示的に削除してください。たとえば、クライアントを長期間停止する場合は、consumer の unsubscribe() メソッドを呼び出すか、Messaging Service の monitoring and management インターフェイスを使用してサブスクリプションを削除してください。
finally {
// Delete the durable subscription.// This is only necessary if messages should *not* be retained// on the topic while the client is disconnected. consumer.unsubscribe();
}
// Close the other Pulsar objects that we created. consumer.close();
client.close();
Messaging Service エラーの処理
Things Cloud Messaging Service は、クライアントから離れた場所で動作する複雑な分散サービスです。
すべての分散システムと同様に、完全な信頼性は保証できないため、クライアントは Pulsar クライアントライブラリによって報告されるエラーを処理できるようにしておく必要があります。
これらのエラーは、大きく 2 つのカテゴリに分けられます。
クライアント実装における設定またはロジックエラー。
このカテゴリのエラーは通常 “fatal” であり、クライアントが Messaging Service に接続したり、メッセージを公開またはコンシュームしたりすることを妨げます。
この種のエラーの典型例には、以下があります。