nsq介绍

NSQ 是一个消息队列中间件,用 go 实现。特点:

  • 分布式: 它提供了分布式的、去中心化且没有单点故障的拓扑结构,稳定的消息传输发布保障,能够具有高容错和高可用特性。

  • 易于扩展: 它支持水平扩展,没有中心化的消息代理( Broker ),内置的发现服务让集群中增加节点非常容易。

  • 运维方便: 它非常容易配置和部署,灵活性高。

  • 高度集成: 现在已经有官方的 Golang、Python 和 JavaScript 客户端,社区也有了其他各个语言的客户端库方便接入,自定义客户端也非常容易。

集群及组件

nsqd

nsq 核心逻辑所在,负责接收消息、排队消息、并投递消息给消费者。

nsqlookup

负责管理拓扑信息,类似于kafka的 zookeeper。用于服务注册、发现。

Topic

一个 topic 就是程序发布消息的一个逻辑键,当程序第一次发布消息时就会创建 topic

Channel

类似 kafka 中的消费组,是消费者之间的负载均衡。每当一个发布者发送一条消息到一个 topic,消息会被复制到所有消费者连接的 channel 上,然后将消息随机推送到其中一个消费者

数据流动

img

topic 中的消息会被复制到多个 channel 中,并将消息推送到其中一个消费这个 channel 的消费者手中。

示例1. 消息A,发送给3个Channels,其中metrics这个channel,推送到第二个消费者

消息B,发送给3个Chnnels,其中metrics这个channel,推送给第一个消费者

集群架构

img

图中表明了 3 中类型的连接:

  • 黑实线带箭头。consumer 会直连 nsqd,并从 nsqd 获取消息

  • 蓝虚线。consumer 会询问 nsqlookup 某个 topic 在哪些 nsqd 上存在,nsqlookup 会返回 nsqd 的信息

  • 灰虚线。nsqd 和 nsqlookup 会建立一个长连接,并在 topic 或者 channel 发生变更时,将 topic 和 channel 信息同步到 nsqlookup

Docker部署

nsqd 支持tcp和http链接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
version: '3'
services:
nsqlookupd:
image: nsqio/nsq
command: /nsqlookupd
ports:
- "4160:4160"
- "4161:4161"
nsqd:
image: nsqio/nsq
command: /nsqd --lookupd-tcp-address=nsqlookupd:4160
depends_on:
- nsqlookupd
ports:
- "4150:4150"
- "4151:4151"
nsqadmin:
image: nsqio/nsq
command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
depends_on:
- nsqlookupd
ports:
- "4171:4171"
  • nsqlookupd:为集群服务控制,注册与发现, **tcp 端口:4160,用于 nsqd 集群广播;http 端口:4161,用于客户端访问
  • nsqd:实际工作节点, tcp 端口: 4150,用于生产者、消费者访问;http 端口: 4151,用于生产者访问
  • nsqadmin:web管理工具, 端口:4171, 可以通过浏览器访问http://localhost:4171/

命令工具

nsq_stat, nsq_tail, nsq_to_file, nsq_to_http, nsq_to_nsq, nsqadmin, nsqd, nsqlookupd, to_nsq

可以进入容器,查看

1
2
3
4
5
docker exec -it nsq-nsqd-1 sh
cd /
# 查看
ps aux
ls -al

发送消息

topic可以由生产者,或者消费者首次使用时,自动创建

1
2
3
4
# 创建一个topic,发送消息
to_nsq --nsqd-tcp-address=nsqd:4150 --topic=test
# 或者使用http发送消息
curl -d 'hello world ' 'http://127.0.0.1:4151/pub?topic=test'

消费

1
2
3
4
5
6
7
# 消费
# 查看 此时会创建一个channel,类似tail686673#ephemeral
nsq_tail --nsqd-tcp-address=nsqd:4150 --topic=test -print-topic
# 查看 指定channel
nsq_tail --nsqd-tcp-address=nsqd:4150 --topic=test -print-topic --channel=channel1
# 也可以这样
nsq_tail --lookupd-http-address=nsqlookupd:4161 --topic=test --channel=channel1

其他

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 创建一个nsq node
/nsqd --lookupd-tcp-address=nsqlookupd:4160 --tcp-address="0.0.0.0:5150" --http-address="0.0.0.0:5151"

# 转发到其他node
nsq_to_nsq --destination-nsqd-tcp-address=nsqd:5150 --nsqd-tcp-address=nsqd:4150 --topic=test

# 统计
nsq_stat --lookupd-http-address=nsqlookupd:4161 --topic=test --channel=channel1
# 注意连接的是nsqd
nsq_to_nsq --destination-nsqd-tcp-address=nsqd:4150 --nsqd-tcp-address=nsqd:4150 --topic=test

# 查看node
curl localhost:4161/nodes
curl "http://127.0.0.1:4161/lookup?topic=test_topic"

WebUI

如果启动了nsqadmin, 可以访问http://localhost:4171/, 提供了丰富的功能。

  1. 查看node节点
  2. 查看topic
  3. 配置Lookup

image-20240530224508924

Golang demo

生产者代码(producer.go)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
"fmt"
"log"
"github.com/nsqio/go-nsq"
)

func main() {
config := nsq.NewConfig()
producer, err := nsq.NewProducer("127.0.0.1:4150", config)
if err != nil {
log.Fatal(err)
}

message := "Hello, NSQ!"
topic := "test_topic"

err = producer.Publish(topic, []byte(message))
if err != nil {
log.Fatal(err)
}

fmt.Println("Message sent:", message)
producer.Stop()
}

消费者代码(consumer.go)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
"fmt"
"log"
"github.com/nsqio/go-nsq"
)

type MessageHandler struct{}

func (h *MessageHandler) HandleMessage(m *nsq.Message) error {
fmt.Println("Received message:", string(m.Body))
return nil
}

func main() {
config := nsq.NewConfig()
consumer, err := nsq.NewConsumer("test_topic", "channel_1", config)
if err != nil {
log.Fatal(err)
}

consumer.AddHandler(&MessageHandler{})

err = consumer.ConnectToNSQLookupd("127.0.0.1:4161")
if err != nil {
log.Fatal(err)
}

// Keep the consumer running
select {}
}

4. 运行示例

确保 Docker Compose 中的 NSQ 服务正在运行,然后分别在两个终端中运行生产者和消费者。

1
2
3
4
5
# 运行生产者
go run producer.go

# 运行消费者
go run consumer.go

验证输出

在消费者的终端中,你应该会看到输出 Received message: Hello, NSQ!,这表示消费者成功接收到了生产者发送的消息。

可能错误信息

no such host

error connecting to nsqd - dial tcp: lookup d8971f05caa5: no such host

1
curl http://127.0.0.1:4161/lookup?topic=test_topic

可以查看到返回的host

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
{
"channels": [
"channel_1"
],
"producers": [
{
"remote_address": "172.22.0.5:54888",
"hostname": "d8971f05caa5",
"broadcast_address": "nsqd",
"tcp_port": 4150,
"http_port": 4151,
"version": "1.3.0"
}
]
}

注意docker中网络和本地网络区别

TOPIC_NOT_FOUND

error querying nsqlookupd (http://nsqlookupd:4161/lookup?topic=test_topic) - got response 404 Not Found "{\"message\":\"TOPIC_NOT_FOUND\"}"

topic未创建

参考文章

NSQ 实现逻辑探秘

docker 部署 nsq

https://nsq.io/

NSQ 介绍

https://www.bootwiki.com/nsq/nsq-guide-nsqd.html

nsq 客户端 lib 库开源地址为:https://github.com/nsqio/go-nsq

万字解析 go 语言分布式消息队列 nsq