-
Notifications
You must be signed in to change notification settings - Fork 69
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
c948afc
commit d66ae3e
Showing
4 changed files
with
341 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,275 @@ | ||
## はじめに | ||
|
||
[MQTT](https://www.emqx.com/ja/blog/the-easiest-guide-to-getting-started-with-mqtt)は、パブリッシュ/サブスクライブモデルに基づく軽量な[IoTメッセージング](https://www.emqx.com/ja/solutions/reliable-mqtt-messaging)プロトコルです。非常に少ないコードと帯域幅で、ネットワーク接続されたデバイスにリアルタイムかつ信頼性の高いメッセージングサービスを提供できます。IoT、モバイルインターネット、スマートハードウェア、[自動車のインターネット](https://www.emqx.com/ja/use-cases/internet-of-vehicles)、電力エネルギーなどの産業で広く使用されています。 | ||
|
||
Goは、クロスプラットフォームのオープンソースプログラミング言語です。高性能のアプリケーションを作成するために使用できます。GolangとMQTTを組み合わせることで、開発者はスケーラブルで安全なIoTアプリケーションを構築し、リアルタイムでデバイスと通信し、情報を交換し、複雑なデータ分析を実行できます。 | ||
|
||
この記事では、GolangプロジェクトでMQTTを使用して、クライアントと[MQTTブローカー](https://www.emqx.com/ja/blog/the-ultimate-guide-to-mqtt-broker-comparison)間のシームレスな通信を実現する方法について包括的なガイドを提供します。接続の確立、トピックの購読と購読解除、メッセージの公開、リアルタイムでのメッセージの受信方法を学びます。このガイドは、MQTTを活用してスケーラブルで効率的なIoTアプリケーションを構築するためのスキルを身につけるのに役立ちます。 | ||
|
||
## Golang MQTTプロジェクトの準備 | ||
|
||
### Golangバージョンの確認 | ||
|
||
このプロジェクトでは、開発とテストにgo **v1.21.1**を使用します。正しいバージョンのGolangがインストールされていることを確認するには、次のコマンドを使用します。 | ||
|
||
```shell | ||
$ go version | ||
go version go1.21.1 darwin/amd64 | ||
``` | ||
|
||
### Golang MQTTクライアントのインストール | ||
|
||
このプロジェクトでは、[MQTT クライアントライブラリ](https://www.emqx.com/ja/mqtt-client-sdk)として [paho.mqtt.golang](https://github.com/eclipse/paho.mqtt.golang) を使用します。インストール方法は次のとおりです。 | ||
|
||
```shell | ||
go get github.com/eclipse/paho.mqtt.golang | ||
``` | ||
|
||
## MQTTブローカーの準備 | ||
|
||
続行する前に、通信およびテストに使用するMQTTブローカーがあることを確認してください。EMQX Cloudの使用をお勧めします。 | ||
|
||
[EMQX Cloud](https://www.emqx.com/ja/cloud)は、完全に管理されたクラウドネイティブのMQTTサービスであり、大量のIoTデバイスに接続し、さまざまなデータベースとビジネスシステムを統合できます。EMQX Cloudを使用すると、わずか数分で開始でき、AWS、Google Cloud、Microsoft Azureの20以上のリージョンでMQTTサービスを実行できるため、グローバルな可用性と高速な接続が保証されます。 | ||
|
||
- サーバー:`broker.emqx.io` | ||
- TCPポート:`1883` | ||
- WebSocketポート:`8083` | ||
- SSL/TLSポート:`8883` | ||
- セキュアWebSocketポート:`8084` | ||
|
||
## Golang MQTTの使用 | ||
|
||
### MQTT接続の作成 | ||
|
||
#### TCP接続 | ||
|
||
MQTT接続を確立するには、接続アドレス、ポート、クライアントIDを設定する必要があります。 | ||
|
||
```go | ||
package main | ||
|
||
import ( | ||
"fmt" | ||
mqtt "github.com/eclipse/paho.mqtt.golang" | ||
"time" | ||
) | ||
|
||
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { | ||
fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic()) | ||
} | ||
|
||
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) { | ||
fmt.Println("Connected") | ||
} | ||
|
||
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) { | ||
fmt.Printf("Connect lost: %v", err) | ||
} | ||
|
||
func main() { | ||
var broker = "broker.emqx.io" | ||
var port = 1883 | ||
opts := mqtt.NewClientOptions() | ||
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port)) | ||
opts.SetClientID("go_mqtt_client") | ||
opts.SetUsername("emqx") | ||
opts.SetPassword("public") | ||
opts.SetDefaultPublishHandler(messagePubHandler) | ||
opts.OnConnect = connectHandler | ||
opts.OnConnectionLost = connectLostHandler | ||
client := mqtt.NewClient(opts) | ||
if token := client.Connect(); token.Wait() && token.Error() != nil { | ||
panic(token.Error()) | ||
} | ||
} | ||
``` | ||
|
||
- ClientOptions:ブローカー、ポート、クライアントID、ユーザー名、パスワードなどのオプションを設定するために使用します。 | ||
- messagePubHandler:グローバルなMQTTパブリッシュメッセージ処理 | ||
- connectHandler:接続のコールバック | ||
- connectLostHandler:接続が失われた場合のコールバック | ||
|
||
#### TLS/SSL | ||
|
||
[MQTTでTLSを使用する](https://www.emqx.com/ja/blog/fortifying-mqtt-communication-security-with-ssl-tls)と、情報の機密性と完全性を確保し、情報の漏洩や改ざんを防ぐことができます。TLS認証は、一方向認証と双方向認証に分類できます。 | ||
|
||
TSL接続を使用する場合は、次の設定を使用できます。 | ||
|
||
```go | ||
func NewTlsConfig() *tls.Config { | ||
certpool := x509.NewCertPool() | ||
ca, err := ioutil.ReadFile("ca.pem") | ||
if err != nil { | ||
log.Fatalln(err.Error()) | ||
} | ||
certpool.AppendCertsFromPEM(ca) | ||
// Import client certificate/key pair | ||
clientKeyPair, err := tls.LoadX509KeyPair("client-crt.pem", "client-key.pem") | ||
if err != nil { | ||
panic(err) | ||
} | ||
return &tls.Config{ | ||
RootCAs: certpool, | ||
ClientAuth: tls.NoClientCert, | ||
ClientCAs: nil, | ||
InsecureSkipVerify: true, | ||
Certificates: []tls.Certificate{clientKeyPair}, | ||
} | ||
} | ||
``` | ||
|
||
クライアント証明書が設定されていない場合は、次のように設定できます。 | ||
|
||
```go | ||
func NewTlsConfig() *tls.Config { | ||
certpool := x509.NewCertPool() | ||
ca, err := ioutil.ReadFile("ca.pem") | ||
if err != nil { | ||
log.Fatalln(err.Error()) | ||
} | ||
certpool.AppendCertsFromPEM(ca) | ||
return &tls.Config{ | ||
RootCAs: certpool, | ||
} | ||
} | ||
``` | ||
|
||
次に、TLSを設定します。 | ||
|
||
```go | ||
var broker = "broker.emqx.io" | ||
var port = 8883 | ||
opts := mqtt.NewClientOptions() | ||
opts.AddBroker(fmt.Sprintf("ssl://%s:%d", broker, port)) | ||
tlsConfig := NewTlsConfig() | ||
opts.SetTLSConfig(tlsConfig) | ||
// other options | ||
``` | ||
|
||
### MQTTトピックのサブスクライブ | ||
|
||
MQTTブローカーからトピックをサブスクライブするには、次のコードを使用します。 | ||
|
||
```go | ||
func sub(client mqtt.Client) { | ||
topic := "topic/test" | ||
token := client.Subscribe(topic, 1, nil) | ||
token.Wait() | ||
fmt.Printf("Subscribed to topic %s", topic) | ||
} | ||
``` | ||
|
||
### MQTTメッセージのパブリッシュ | ||
|
||
上記のトピックのサブスクライブとメッセージの監視が完了したら、メッセージをパブリッシュするための関数を作成します。 | ||
|
||
```go | ||
func publish(client mqtt.Client) { | ||
num := 10 | ||
for i := 0; i < num; i++ { | ||
text := fmt.Sprintf("Message %d", i) | ||
token := client.Publish("topic/test", 0, false, text) | ||
token.Wait() | ||
time.Sleep(time.Second) | ||
} | ||
} | ||
``` | ||
|
||
### テスト | ||
|
||
テストには次のコードを使用します。 | ||
|
||
```go | ||
package main | ||
|
||
import ( | ||
"fmt" | ||
mqtt "github.com/eclipse/paho.mqtt.golang" | ||
"log" | ||
"time" | ||
) | ||
|
||
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { | ||
fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic()) | ||
} | ||
|
||
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) { | ||
fmt.Println("Connected") | ||
} | ||
|
||
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) { | ||
fmt.Printf("Connect lost: %v", err) | ||
} | ||
|
||
func main() { | ||
var broker = "broker.emqx.io" | ||
var port = 1883 | ||
opts := mqtt.NewClientOptions() | ||
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port)) | ||
opts.SetClientID("go_mqtt_client") | ||
opts.SetUsername("emqx") | ||
opts.SetPassword("public") | ||
opts.SetDefaultPublishHandler(messagePubHandler) | ||
opts.OnConnect = connectHandler | ||
opts.OnConnectionLost = connectLostHandler | ||
client := mqtt.NewClient(opts) | ||
if token := client.Connect(); token.Wait() && token.Error() != nil { | ||
panic(token.Error()) | ||
} | ||
|
||
sub(client) | ||
publish(client) | ||
|
||
client.Disconnect(250) | ||
} | ||
|
||
func publish(client mqtt.Client) { | ||
num := 10 | ||
for i := 0; i < num; i++ { | ||
text := fmt.Sprintf("Message %d", i) | ||
token := client.Publish("topic/test", 0, false, text) | ||
token.Wait() | ||
time.Sleep(time.Second) | ||
} | ||
} | ||
|
||
func sub(client mqtt.Client) { | ||
topic := "topic/test" | ||
token := client.Subscribe(topic, 1, nil) | ||
token.Wait() | ||
fmt.Printf("Subscribed to topic: %s", topic) | ||
} | ||
``` | ||
|
||
コードを実行すると、MQTT接続とサブスクリプションに成功し、サブスクライブしたトピックのメッセージを正常に受信できることがわかります。 | ||
|
||
![Run code](https://assets.emqx.com/images/8882115bb9e0154ccfab16c26dd47566.png) | ||
|
||
## Q&A | ||
|
||
### **送信されたMQTTメッセージがJSON形式ではない場合はどうなりますか?** | ||
|
||
MQTTメッセージがJSONではない場合でも、`toString()`メソッドを使用して文字列に変換できます。ただし、コンテンツが元々文字列ではない場合(たとえば、バイナリデータの場合)、データの性質に応じて異なる処理が必要になる場合があります。 | ||
|
||
### **メッセージのパブリッシュまたはサブスクライブ中に接続が切断された場合はどうなりますか?** | ||
|
||
**paho.mqtt.golang**クライアントは、メッセージのパブリッシュまたはサブスクライブ中にブローカーに自動接続するオプションを提供します。また、自動再接続機能はデフォルトで有効になっています。 | ||
|
||
### **複数の接続を確立すると、なぜMQTTクライアント接続が切断されるのですか?** | ||
|
||
複数のMQTT接続を確立するときは、異なるクライアントIDを使用してください。クライアントIDは、MQTT接続をMQTTブローカーに識別するために重要です。さらに、特定のデバイスまたはクライアントを識別します。トレーサビリティの観点から、デバイスのブローカーへの接続を明確に識別するクライアントIDを設定すると便利です。**MQTTブローカーは通常、既存の(古い)接続と同じクライアントIDを持つ新しい接続要求を受信したときに古い接続を閉じるメカニズムを実装し、その後、新しい接続を受け入れます。** | ||
|
||
## まとめ | ||
|
||
**paho.mqtt.golang**クライアントを使用して[パブリックMQTTブローカー](https://www.emqx.com/ja/mqtt/public-mqtt5-broker)に接続し、テストクライアントとMQTTブローカー間の接続、メッセージのパブリッシュとサブスクリプションを実装しました。 | ||
|
||
次に、EMQが提供する[わかりやすいMQTTプロトコルガイド](https://www.emqx.com/ja/mqtt-guide)シリーズの記事をチェックして、MQTTプロトコルの機能について学び、MQTTのより高度なアプリケーションを探求し、MQTTアプリケーションとサービス開発を開始できます。 | ||
|
||
|
||
|
||
<section class="promotion"> | ||
<div> | ||
専門家と話します | ||
</div> | ||
<a href="https://www.emqx.com/ja/contact?product=solutions" class="button is-gradient">お問い合わせ →</a> | ||
</section> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
[NeuronEX](https://www.emqx.com/zh/products/neuronex) 是一款工业边缘网关软件,提供工业多协议接入与边缘计算能力。它主要部署在工业现场,用于工业设备通信、工业总线协议采集、数据集成、边端数据过滤分析、AI 算法集成以及工业互联网平台对接,为工业场景提供低延迟的数据接入管理及智能分析服务。 | ||
|
||
本文将重点介绍 NeuronEX 的反向代理和进程管理功能。反向代理可简化访问,提升安全性保护服务端,以及分配请求防止过载。进程管理可自动化启停和监控子程序,确保系统稳定性和可靠性。 | ||
|
||
## **反向代理功能** | ||
|
||
NeuronEX 的两个核心模块是 Neuron 和 eKuiper。Neuron 负责支持 100 多种常见的工业协议数据接入能力,eKuiper 负责边缘端提供流式计算分析以及集成 AI 算法。 | ||
|
||
反向代理可对外提供一个统一接入地址,然后根据需要将请求发送到后端提供服务的端口,简化外部访问和管理的同时,也很好的保护内部服务器地址。NeuronEX 通过 Go 标准库的 `http` 工具包提供反向代理服务。 `http` 工具包提供的 `ReverseProxy `结构体可以创建到目标地址的 HTTP 连接,同时该结构体实现了 HTTP 的核心接口: | ||
|
||
``` | ||
type Handler interface { | ||
ServeHTTP(ResponseWriter, *Request) | ||
} | ||
``` | ||
|
||
只需将原始请求的 `ResponseWriter`, `*Request` 参数赋值到 `ReverseProxy` 提供的 `ServeHTTP` 方法中,即可实现将原请求转发到特定后端服务端口的功能。那么如何区分应该把请求转发到 NeuronEX 模块下的 Neuron 还是 eKuiper 呢?关键在于识别请求路径中的特定前缀。因此,在 NeuronEX 的配置项中提供了如下设置,以支持流量的定向转发。 | ||
|
||
```shell | ||
neuron: | ||
reverseProxies: | ||
- location: /api/neuron | ||
proxyPath: http://127.0.0.1:7000/api/v2 | ||
|
||
ekuiper: | ||
reverseProxies: | ||
- location: /api/ekuiper | ||
proxyPath: http://127.0.0.1:9081 | ||
- location: /ws/ekuiper | ||
proxyPath: ws://127.0.0.1:10081 | ||
``` | ||
|
||
举例来说,假设 NeuronEX 所在机器地址为 hostname,对外暴露的端口号为 8085。对于获取 ekuiper 版本信息的请求,其完整路径为 `http://hostname:8085/api/ekuiper/version` ,经过代理层处理后,代理将向真正的后端地址 `http://127.0.0.1:9081/version` 发起请求。同样的,对于获取 Neuron 版本信息的请求,其完整路径为 `http://hostname:8085/api/neuron/version` ,经过代理层处理后,代理将向真正的后端地址 `http://127.0.0.1:7000/api/v2/version` 发起请求。由此可见,代理层能够根据请求路径中的特定前缀,识别并匹配到特定反向代理地址,然后将请求路径中的剩余路径与反向代理地址相结合,形成最终的访问地址。 | ||
|
||
![image.png](https://assets.emqx.com/images/d3913f91ddb80ff14de0a836e5625a2e.png) | ||
|
||
## 进程管理功能 | ||
|
||
NeuronEX 负责启动并监控 Neuron 和 eKuiper 这两个核心模块。一旦发现任一软件异常退出,NeuronEX 将终止所有相关程序并退出。NeuronEX 通过 Go 标准库的 exec 工具包的 Command 方法启动子程序,以下示例以 Neuron 启动为例: | ||
|
||
``` | ||
NeuronCmd := exec.Command("/bin/bash", "cd /opt/neuronex/software/neuron && ./neuron --disable_auth") | ||
``` | ||
|
||
`exec.Command` 方法的第一个参数是 shell 程序名,第二个参数是在 shell 中启动 Neuron 的命令行语句。将这两者结合后,得到的命令行语句用于在 shell 中执行 Neuron 的启动命令。通过调用 `NeuronCmd` 的 `Start` 方法来启动程序,随后调用 `Wait` 方法等待 Neuron 进程的退出。一旦 Neuron 进程退出,NeuronEX 进程在接收到信号后会立即退出。 | ||
|
||
程序正常启动后,可以通过在 Linux 命令行中执行 ps 命令查看程序的运行状态。这时可以观察到,通过此方法启动的 Neuron 和 eKuiper 的父进程为 NeuronEX。 | ||
|
||
![image.png](https://assets.emqx.com/images/f1e2da361407d47fc92245b0ce0a0346.png) | ||
|
||
## 总结 | ||
|
||
通过 Neuron 和 eKuiper 两个功能模块,NeuronEX 在边端数据采集和数据处理功能之外,又增强了用户管理能力,方便运维人员操作。 | ||
|
||
凭借丰富的工业协议接入(80多种工业现场总线实时采集、100 多种驱动支持)、多源数据集成、流式计算分析、AI/ML 算法集成以及支持各类数据库与平台的数据转发存储,NeuronEX 可帮助工业企业打通数据孤岛、利用工业人工智能、提高设备健康管理、支持预测性维护以及增强数据安全性和实时监控能力。 | ||
|
||
|
||
|
||
<section class="promotion"> | ||
<div> | ||
咨询 EMQ 技术专家 | ||
</div> | ||
<a href="https://www.emqx.com/zh/contact?product=solutions" class="button is-gradient">联系我们 →</a> | ||
</section> |