盒子
盒子
文章目录
  1. 1.消息中间件
  2. 1.数据可靠性
  3. 2.Kafka
    1. 2.1.什么是Zookeeper
      1. 2.1.1.Zookeeper的重要概念
      2. 2.1.2.Zookeeper的特点
      3. 2.1.3.ZooKeeper集群角色
      4. 2.1.4.ZooKeeper应用举例参考文章
    2. 2.2.Kafka介绍
      1. 2.2.1.Kafka 的基本术语
      2. 2.2.2.Kafka的特性
      3. 2.2.3.Kafka的使用场景
      4. 2.2.4.Kafka为何如此之快
      5. 2.2.5.核心API
    3. 2.3.Kafka与ZooKeeper的关系
    4. 2.4.Docker搭建Kafka集群
      1. 2.4.1.安装Docker Compose
      2. 2.4.2.创建Docker Network
      3. 2.4.3.编写Docker-Compose文件
      4. 2.4.4.启动Kafka集群
      5. 2.4.5.验证
    5. 2.5.Kafka与其他MQ的区别
    6. 2.6.参考
  4. 3.NSQ
    1. 3.1.NSQ组件
    2. 3.2.NSQ的特点
    3. 3.3.NSQ架构
    4. 3.1.Channel消息传递的通道
    5. 3.2.Consumer消息的消费者
    6. 3.4.NSQ数据流模型结构
    7. 3.5.本地搭建NSQ服务
    8. 3.6.参考
  5. 4.Hippo
    1. 4.1.Hippo系统架构
  6. 5.结语

Kafka、Nsq、Hippo等消息中间件的梳理总结

1.消息中间件

  这周把 KafkaNsqHippo简单地学习了一遍,这三者都是常用的消息中间件。并且通过学习,发现消息中间件都有共通之处,所以非常值得大家多看多比较。这里先介绍一下消息中间件的优势:

  • 屏蔽异构平台的细节:发送方、接收方系统之间不需要了解双方,只需认识消息。
  • 异步:消息堆积能力;发送方接收方不需同时在线,发送方接收方不需同时扩容(削峰)。
  • 解耦:防止引入过多的API给系统的稳定性带来风险;调用方使用不当会给被调用方系统造成压力,被调用方处理不当会降低调用方系统的响应能力。
  • 复用:一次发送多次消费。

  另外,推荐阅读一下分布式高可靠消息中间件Hippo文章中,提到的“传统消息系统数据丢失风险点”,学习一下要实现高可靠性的消息中间件需要注意的几个关键问题。

1.数据可靠性

  • 消息存储可靠性:WAL(预写式日志)+持久化;数据存储多副本;存储节点自动failover;

  • 消息传输可靠性:ACK机制;数据CRC(循环冗余校验)校验;

  • 消息投递可靠性:producer->broker 数据存储后才返回成功确认;broker->consumer 数据处理完成后需进行确认

  • 服务(Qos)级别:不能丢消息;At-least-once 可能会有重复;极端情况下通过客户端进行数据去重;

2.Kafka

  Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统。

2.1.什么是Zookeeper

  ZooKeeper 是一个典型的分布式数据一致性解决方案,分布式应用程序可以基于 ZooKeeper 实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master 选举、分布式锁和分布式队列等功能。

  Zookeeper 一个最常用的使用场景就是用于担任服务生产者和服务消费者的注册中心。 ZooKeeper 主要提供下面几个功能:1、集群管理:容错、负载均衡。2、配置文件的集中管理。3、集群的入口。

2.1.1.Zookeeper的重要概念

  • ZooKeeper 本身就是一个分布式程序(只要半数以上节点存活,ZooKeeper 就能正常服务)。
  • 为了保证高可用,最好是以集群形态来部署 ZooKeeper。
  • ZooKeeper 将数据保存在内存中,这也就保证了高吞吐量和低延迟
  • ZooKeeper 是高性能的。 在“读”多于“写”的应用程序中尤其地高性能,因为“写”会导致所有的服务器间同步状态。(“读”多于“写”是协调服务的典型场景。)
  • ZooKeeper有临时节点的概念。 当创建临时节点的客户端会话一直保持活动,临时节点就一直存在。而当会话终结时,临时节点被删除。持久节点是指一旦这个ZNode被创建了,除非主动进行ZNode的移除操作,否则这个ZNode将一直保存在Zookeeper上。
  • ZooKeeper 底层其实只提供了两个功能:1、管理(存储、读取)用户程序提交的数据;2、为用户程序提交数据节点监听服务

2.1.2.Zookeeper的特点

  • 顺序一致性: 从同一客户端发起的事务请求,最终将会严格地按照顺序被应用到 ZooKeeper 中去。
  • 原子性: 所有事务请求的处理结果在整个集群中所有机器上的应用情况是一致的,也就是说,要么整个集群中所有的机器都成功应用了某一个事务,要么都没有应用。
  • 单一系统映像: 无论客户端连到哪一个 ZooKeeper 服务器上,其看到的服务端数据模型都是一致的。
  • 可靠性: 一旦一次更改请求被应用,更改的结果就会被持久化,直到被下一次更改覆盖。

2.1.3.ZooKeeper集群角色

  ZooKeeper 集群中的所有机器通过一个 Leader 选举过程来选定一台称为 Leader 的机器,Leader 既可以为客户端提供写服务又能提供读服务。除了 Leader 外,Follower 和 Observer 都只能提供读服务。Follower 和 Observer 唯一的区别在于 Observer 机器不参与 Leader 的选举过程,也不参与写操作的“过半写成功”策略,因此 Observer 机器可以在不影响写性能的情况下提升集群的读性能。

2.1.4.ZooKeeper应用举例参考文章

  这部分篇幅较长,暂不在此举例。推荐阅读ZooKeeper学习第一期—Zookeeper简单介绍的“七、ZooKeeper应用举例”部分,能较好地描述Zoopker在Kafka中扮演的部分角色。

2.2.Kafka介绍

  Kafka 是一个分布式流式平台,它有三个关键能力:

  1. 订阅发布记录流,它类似于企业中的消息队列 或 企业消息传递系统
  2. 以容错的方式存储记录流
  3. 实时记录流

  Kafka的应用场景:

  1. 作为消息系统
  2. 作为存储系统
  3. 作为流处理器

  Kafka 可以建立流数据管道,可靠性的在系统或应用之间获取数据。建立流式应用传输和响应数据。

  下图是一个典型的 Kafka 集群:

2.2.1.Kafka 的基本术语

  • 消息:Kafka 中的数据单元被称为消息,也被称为记录,可以把它看作数据库表中某一行的记录。

  • 批次:为了提高效率, 消息会分批次写入 Kafka,批次就代指的是一组消息。

  • 主题:消息的种类称为 主题(Topic),可以说一个主题代表了一类消息。相当于是对消息进行分类。主题就像是数据库中的表。

  • 分区:主题可以被分为若干个 分区(Partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,由此来实现 kafka 的伸缩性,单一主题中的分区有序,但是无法保证主题中所有的分区有序。

  • 生产者: 向主题发布消息的客户端应用程序称为生产者(Producer),生产者用于持续不断的向某个主题发送消息。

  • 消费者:订阅主题消息的客户端程序称为消费者(Consumer),消费者用于处理生产者产生的消息。一个消费者可以消费多个 Topic 的消息,对于某一个 Topic 的消息,其只会消费同一个 Partition 中的消息。

  • 消费者群组:一个生产者对应多个消费者,消费者群组(Consumer Group)指的就是由一个或多个消费者组成的群体。

  • 偏移量偏移量(Consumer Offset)是一种元数据,它是一个不断递增的整数值,用来记录消费者发生重平衡时的位置,以便用来恢复数据。

  • broker: 一个独立的 Kafka 服务器就被称为 broker,broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。

  • broker 集群:broker 是 集群 的组成部分,broker 集群由一个或多个 broker 组成,每个集群都有一个 broker 同时充当了集群控制器的角色(自动从集群的活跃成员中选举出来)。

  • 副本:Kafka 中消息的备份又叫做 副本(Replica),副本的数量是可以配置的,Kafka 定义了两类副本:领导者副本(Leader Replica) 和 追随者副本(Follower Replica),前者对外提供服务,后者只是被动跟随。

  • 重平衡:Rebalance,消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。

2.2.2.Kafka的特性

  • 高吞吐、低延迟:kakfa 最大的特点就是收发消息非常快,kafka 每秒可以处理几十万条消息,它的最低延迟只有几毫秒。

  • 高伸缩性:每个主题(topic) 包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中。

  • 持久性、可靠性:Kafka 能够允许数据的持久化存储,消息被持久化到磁盘,并支持数据备份防止数据丢失,Kafka 底层的数据存储是基于 Zookeeper 存储的,Zookeeper 我们知道它的数据能够持久存储。

  • 容错性:允许集群中的节点失败,某个节点宕机,Kafka 集群能够正常工作。

  • 高并发:支持数千个客户端同时读写。

2.2.3.Kafka的使用场景

  • 活动跟踪
  • 传递消息
  • 度量指标
  • 日志记录
  • 流式处理
  • 限流削峰

2.2.4.Kafka为何如此之快

  • 顺序读写
  • 零拷贝
  • 消息压缩
  • 分批发送

  Kafka 实现了零拷贝原理来快速移动数据,避免了内核之间的切换。Kafka 可以将数据记录分批发送,从生产者到文件系统(Kafka 主题日志)到消费者,可以端到端的查看这些批次的数据。

  批处理能够进行更有效的数据压缩并减少 I/O 延迟,Kafka 采取顺序写入磁盘的方式,避免了随机磁盘寻址的浪费。

2.2.5.核心API

  Kafka 有四个核心API,它们分别是

  • Producer API,它允许应用程序向一个或多个 Topics 上发送消息记录
  • Consumer API,允许应用程序订阅一个或多个 Topics 并处理为其生成的记录流
  • Streams API,它允许应用程序作为流处理器,从一个或多个主题中消费输入流并为其生成输出流,有效的将输入流转换为输出流。
  • Connector API,它允许构建和运行将 Kafka 主题连接到现有应用程序或数据系统的可用生产者和消费者。例如,关系数据库的连接器可能会捕获对表的所有更改

2.3.Kafka与ZooKeeper的关系

  在 kafka 集群中,ZooKeeper 集群用于存放集群元数据成员管理Controller 选举,以及其他一些管理类任务

  • 存放元数据: 是指主题分区的所有数据都保存在 ZooKeeper 中,且以它保存的数据为权威,其他 “人” 都要与它保持对齐。
  • 成员管理: 是指 Broker 节点的注册、注销以及属性变更。
  • Controller 选举: 是指选举集群 Controller,而其他管理类任务包括但不限于主题删除、参数配置。

  这部分强烈推荐阅读Kafka元数据在Zookeeper中的存储分布,它能让你对Zookeeper的作用和两者之间的关系有一个更清晰的了解。另外,可以参考Kafka(四)Kafka在ZooKeeper中的存储

2.4.Docker搭建Kafka集群

  这里主要参考了【Kafka精进系列003】Docker环境下搭建Kafka集群,利用 docker-compose 可以更轻松的搭建出一个Kafka集群。

2.4.1.安装Docker Compose

  • 从Github上下载最新版的docker-compose文件:
1
sudo curl -L https://github.com/docker/compose/releases/download/1.26.2/docker-compose-`uname -s`-`uname -m` -o /usr/local/bin/docker-compose
  • 添加可执行权限:
1
sudo chmod +x /usr/local/bin/docker-compose
  • 测试安装结果:
1
2
$ docker-compose --version
docker-compose version 1.26.2, build eefe0d31

2.4.2.创建Docker Network

  • 创建一个名为 docker_net 的桥接网络,子网为192.168.100.1/24
1
docker network create docker_net --subnet="192.168.100.1/24"

2.4.3.编写Docker-Compose文件

  • 创建并编写 docker-compose.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
networks:
- docker_net
kafka1:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 192.168.100.1
KAFKA_CREATE_TOPICS: TestComposeTopic:4:3
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_BROKER_ID: 1
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.100.1:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
container_name: kafka01
volumes:
- "./kafka1/docker.sock:/var/run/docker.sock"
- "./kafka1/data/:/kafka"
networks:
- docker_net
kafka2:
image: wurstmeister/kafka
ports:
- "9093:9093"
environment:
KAFKA_ADVERTISED_HOST_NAME: 192.168.100.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_BROKER_ID: 2
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.100.1:9093
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093
container_name: kafka02
volumes:
- "./kafka2/docker.sock:/var/run/docker.sock"
- "./kafka2/data/:/kafka"
networks:
- docker_net
kafka3:
image: wurstmeister/kafka
ports:
- "9094:9094"
environment:
KAFKA_ADVERTISED_HOST_NAME: 192.168.100.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_BROKER_ID: 3
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.100.1:9094
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9094
container_name: kafka03
volumes:
- "./kafka3/docker.sock:/var/run/docker.sock"
- "./kafka3/data/:/kafka"
networks:
- docker_net
networks:
docker_net:
external: true

2.4.4.启动Kafka集群

  • 使用Docker Compose启动Kafka集群:
1
2
3
4
# 前台启动
docker-compose up
# 后台启动(推荐)
docker-compose up -d
  • 启动后,可以通过以下命令查看容器的IP:
1
2
3
4
5
docker inspect --format '{{ .NetworkSettings.Networks.your_docker_net_name.IPAddress }}' <container id>
# 或
docker inspect <container id>
# 或
docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' <container id | container name>

2.4.5.验证

  • 进入其中一个 Kafka Broker 容器,执行以下命令,查看Topic信息:
1
kafka-topics.sh --describe --zookeeper 192.168.100.5:2181 --topic TestComposeTopic

  • 进入Broker1容器,执行:
1
kafka-console-producer.sh --broker-list 192.168.100.2:9092 --topic TestComposeTopic
  • 进入Broker2容器,执行:
1
kafka-console-consumer.sh --bootstrap-server 192.168.100.4:9093 --topic TestComposeTopic --from-beginning
  • 进入Broker3容器,执行:
1
kafka-console-consumer.sh --bootstrap-server 192.168.100.3:9094 --topic TestComposeTopic --from-beginning



2.5.Kafka与其他MQ的区别

  首先推荐阅读Kafka、RabbitMQ、RocketMQ等消息中间件的介绍和对比,里面比较详细的介绍并比较了多种消息中间件。下面以RabbitMQ与Kafka进行对比:

  • 在应用场景方面
      RabbitMQ,遵循AMQP协议,由内在高并发的erlanng语言开发,用在实时的对可靠性要求比较高的消息传递上。
      Kafka是Linkedin于2010年12月份开源的消息发布订阅系统,它主要用于处理活跃的流式数据,大数据量的数据处理上。

  • 在架构模型方面
      RabbitMQ的broker由Exchange、Binding、queue组成,其中exchange和binding组成了消息的路由键;客户端Producer通过连接channel和server进行通信,Consumer从queue获取消息进行消费。rabbitMQ以broker为中心;有消息的确认机制。
      Kafka遵从一般的MQ结构:producer、broker、consumer。以consumer为中心,消息的消费信息保存的客户端consumer上,consumer根据消费的点,从broker上批量pull数据;无消息确认机制。

  • 在吞吐量
      RabbitMQ在吞吐量方面稍逊于kafka,rabbitMQ支持对消息的可靠的传递,支持事务,不支持批量的操作;基于存储的可靠性的要求存储可以采用内存或者硬盘。
      Kafka具有高的吞吐量,内部采用消息的批量处理,zero-copy机制,数据的存储和获取是本地磁盘顺序批量操作,具有O(1)的复杂度,消息处理的效率很高。

  • 在可用性方面
      rabbitMQ支持miror的queue,主queue失效,miror queue接管。
      Kafka的broker支持主备模式。

  • 在集群负载均衡方面
      RabbitMQ的负载均衡需要单独的loadbalancer进行支持。
      Kafka采用zookeeper对集群中的broker、consumer进行管理,可以注册topic到zookeeper上;通过zookeeper的协调机制,producer保存对应topic的broker信息,可以随机或者轮询发送到broker上;并且producer可以基于语义指定分片,消息发送到broker的某分片上。

2.6.参考

可能是全网把 ZooKeeper 概念讲的最清楚的一篇文章
带你涨姿势的认识一下kafka
真的,Kafka 入门一篇文章就够了
kafka 中 zookeeper 具体是做什么的?
zookeeper在kafka中的作用
RabbitMQ和kafka从几个角度简单的对比

3.NSQ

  NSQ 是一个基于Go语言的分布式实时消息平台,它基于MIT开源协议发布,由bitly公司开源出来的一款简单易用的消息中间件。 NSQ 可用于大规模系统中的实时消息服务,并且每天能够处理数亿级别的消息,其设计目标是为在分布式环境下运行的去中心化服务提供一个强大的基础架构。 NSQ 具有分布式、去中心化的拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。NSQ 非常容易配置和部署,且具有最大的灵活性,支持众多消息协议。

3.1.NSQ组件

  NSQ 由 3 个守护进程组成:

  • nsqd 一个负责接收、排队、转发消息到客户端的守护进程。
  • nsqlookupd 是管理的拓扑信息,并提供了最终一致发现服务的守护进程。
  • nsqadmin 是一个 Web UI 来实时监控集群和执行各种管理任务。

3.2.NSQ的特点

  • 具有分布式且无单点故障的拓扑结构,支持水平扩展,在无中断情况下能够无缝地添加集群节点;
  • 低延迟的消息推送;
  • 具有组合式的负载均衡和多播形式的消息路由;
  • 既擅长处理面向流(高吞吐量)的工作负载,也擅长处理面向 Job 的(低吞吐量)工作负载;
  • 消息数据既可以存储于内存中,也可以存储在磁盘中;
  • 实现了生产者、消费者自动发现和消费者自动连接生产者,参见 nsqlookupd;
  • 支持安全传输层协议(TLS),从而确保了消息传递的安全性;
  • 具有与数据格式无关的消息结构,支持 JSON、Protocol Buffers、MsgPacek 等消息格式;
  • 非常易于部署(几乎没有依赖)和配置(所有参数都可以通过命令行进行配置);
  • 使用了简单的 TCP 协议且具有多种语言的客户端功能库;
  • 具有用于信息统计、管理员操作和实现生产者等的 HTTP 接口;
  • 为实时检测集成了统计数据收集器 StatsD;
  • 具有强大的集群管理界面,参见 nsqadmin。

3.3.NSQ架构

  既然 NSQ 也是消息中间件,也就有 Topic、Producer、Consumer这三个主要概念。其中 Topic、Producer 不再详细描述,主要讲讲不同之处。

3.1.Channel消息传递的通道

  • 当生产者每次发布消息的时候,消息会采用多播的方式被拷贝到各个 Channel ,Channel 起到队列的作用
  • Channel 与 Consumer 相关,是消费者之间的负载均衡,消费者通过这个特殊的 Channel 读取消息。
  • 在 Consumer 想单独获取某个 Topic 的消息时,可以 Subscribe(订阅) 一个自己单独命名的 nsqd 中还不存在的 Channel , nsqd 会为这个 Consumer 创建其命名的 Channel 。
  • Channel 会将消息进行排列,如果没有 Consumer 读取消息,消息首先会在内存中排队,当量太大时就会被保存到磁盘中。
  • 一个 channel 一般会有多个 Consumer 连接。假设所有已连接的 Consumer 处于准备接收消息的状态,每个消息将被传递到一个随机的 Consumer
  • Go语言中的 Channel 是表达队列的一种自然方式,因此一个NSQ的 Topic/Channel,其核心就是一个存放消息指针的Go-channel缓冲区

3.2.Consumer消息的消费者

  • Consumer 通过 TCP Subscribe 自己需要的 Channel。
  • Topic 和 Channel 都没有预先配置。 Topic 由第一次发布消息到命名 Topic 的 Producer 创建 或 第一次通过 Subscribe 订阅一个命名 Topic 的 Consumer 来创建。 Channel 被 Consumer 第一次 Subscribe 订阅到指定的 Channel 创建。
  • 多个 Consumer Subscribe 一个 Channel ,假设所有已连接的客户端处于准备接收消息的状态,每个消息将被传递到一个 随机 的 Consumer 。
  • NSQ 支持延时消息, Consumer 在配置的延时时间后才能接受相关消息。
  • Channel 在 Consumer 退出后并不会删除

3.4.NSQ数据流模型结构

  单个 nsqd 被设计为一次能够处理多个流数据,NSQ 中的数据流模型是由 stream 和 consumer 组成。Topic 是一种独特的 stream,Channel 是一个订阅了给定 Topic 的 consumer 逻辑分组。NSQ 的数据流模型结构如下图所示:

  从上图可以看出,单个 nsqd 可以有多个 Topic,每个 Topic 又可以有多个 Channel。Channel 能够接收 Topic 所有消息的副本,从而实现了消息多播分发;而 Channel 上的每个消息被分发给它的订阅者,从而实现负载均衡,所有这些就组成了一个可以表示各种简单和复杂拓扑结构的强大框架。

3.5.本地搭建NSQ服务

  这部分很简单,直接从Github Release上,下载对应版本的压缩包,例如我这里下载的是 nsq-1.2.0.windows-amd64.go1.12.9.tar.gz。解压压缩包,并开启多个终端,进入目标目录。

  • 启动nsqlookud:
1
.\nsqlookud.exe
  • 启动nsqd,并接入刚刚启动的nsqlookud:
1
2
3
.\nsqd.exe --lookupd-tcp-address=127.0.0.1:4160
.\nsqd.exe --lookupd-tcp-address=127.0.0.1:4160 --tcp-address=0.0.0.0:4152 --http-address=0.0.0.0:4153
  • 启动nqsadmin,可以在localhost:4171访问Web页面:
1
.\nsqadmin.exe --lookupd-http-address=127.0.0.1:4161
  • Nsq发送端示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package main
import (
"bufio"
"fmt"
"os"
"github.com/nsqio/go-nsq"
)
var producer *nsq.Producer
// 主函数
func main() {
strIP1 := "127.0.0.1:4150"
strIP2 := "127.0.0.1:4152"
InitProducer(strIP1)
running := true
//读取控制台输入
reader := bufio.NewReader(os.Stdin)
for running {
data, _, _ := reader.ReadLine()
command := string(data)
if command == "stop" {
running = false
}
for err := Publish("test", command); err != nil; err = Publish("test", command) {
//切换IP重连
strIP1, strIP2 = strIP2, strIP1
InitProducer(strIP1)
}
}
//关闭
producer.Stop()
}
// 初始化生产者
func InitProducer(str string) {
var err error
fmt.Println("address: ", str)
producer, err = nsq.NewProducer(str, nsq.NewConfig())
if err != nil {
panic(err)
}
}
//发布消息
func Publish(topic string, message string) error {
var err error
if producer != nil {
if message == "" { //不能发布空串,否则会导致error
return nil
}
err = producer.Publish(topic, []byte(message)) // 发布消息
return err
}
return fmt.Errorf("producer is nil", err)
}
  • Nsq接收端示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main
import (
"fmt"
"time"
"github.com/nsqio/go-nsq"
)
// 消费者
type ConsumerT struct{}
// 主函数
func main() {
InitConsumer("test", "test-channel", "127.0.0.1:4161")
for {
time.Sleep(time.Second * 10)
}
}
//处理消息
func (*ConsumerT) HandleMessage(msg *nsq.Message) error {
fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
return nil
}
//初始化消费者
func InitConsumer(topic string, channel string, address string) {
cfg := nsq.NewConfig()
cfg.LookupdPollInterval = time.Second //设置重连时间
c, err := nsq.NewConsumer(topic, channel, cfg) // 新建一个消费者
if err != nil {
panic(err)
}
c.SetLogger(nil, 0) //屏蔽系统日志
c.AddHandler(&ConsumerT{}) // 添加消费者接口
//建立NSQLookupd连接
//if err := c.ConnectToNSQLookupd(address); err != nil {
// panic(err)
//}
//建立多个nsqd连接
if err := c.ConnectToNSQDs([]string{"127.0.0.1:4150", "127.0.0.1:4152"}); err != nil {
panic(err)
}
}
  • 分别运行发送端和接收端,然后在发送端发送数据,得到如下结果:

3.6.参考

NSQ:分布式的实时消息平台
golang使用Nsq

4.Hippo

  Hippo 是腾讯开发的分布式消息系统,推荐阅读一下分布式高可靠消息中间件Hippo。官方介绍说可以满足具有高可靠高可用应用场景的业务需求,用以支撑广告计费,交易流水等高价值数据的业务。

  Hippo 由于是内部闭源组件,可以参考的资料十分有限,但通过上述的学习,其内部设计思路有很多相似之处,可以相互借鉴参考。例如,Kafka中的Zookeeper、Nsq中的nsqlookupd、Hippo中的Controller三者扮演的角色十分相近,主要是提供服务发现、故障转移等管理类服务。

4.1.Hippo系统架构

  系统逻辑结构图如下:

  系统交互图如下:

  Hippo系统存在四种角色,分别为生产者(producer)、 消费者(consumer)、存储层(broker)、中心控制节点(controller)。

  • Controller
      以组的形势存在,三台controller一主两备组成一个组承担着整个系统节点数据的收集、状态的共享及事件的分发角色。

  • Broker
      以组的形势存在,三台broker一主两备组成一个组由主broker向controller定期汇报心跳以告知controller当前组的存活状态,心跳携带当前组所管理的topic及queue信息。
      数据在broker以多副本的方式存储,Master broker为数据写入入口,并把数据实时同步给同组的两台Slave broker,主备broker之间存在心跳检测功能。数据冗余存储在不同的物理机器中,即使存在机器宕机或磁盘损坏的情况也不影响系统可靠对外提供服务。

  • Producer
      轮询发送:向controller发布某个topic的信息,controller返回相应topic所在的所有broker组对应的IP端口及queue信息。producer轮询所获取的broker组信息列表发送消息并保持与controller的心跳,以便在broker组存在变更时,能够通过controller及时获取到最新的broker组信息。

  • Consumer
      负载均衡:每个consumer都隶属于一个消费组,向controller订阅某个topic的消息,controller返回topic对应的所有broker组信息列表、同处一个消费组的其它消费者信息列表,当前消费者获取到这两部分信息之后会进行排序然后按照固定的算法进行负载均衡以确定每个消费者具体消费哪个队列分区。同时每个consumer都会定期的向controller上报心跳,一旦消费组有节点数量或broker组存在变更,controller都会及时的通过心跳响应返回给当前组所有存活的consumer节点,以进行新一轮的负载均衡。
      消费确认:consumer进行消费的过程中对队列分区是以独占的形式存在的,即一个队列在一个消费组中只能被一个消费者占有并消费。为了保证消费的可靠对于每次拉取的数据,都需要consumer端在消费完成之后进行一次确认,否则下次拉取还是从原来的偏移量开始。
      限时锁定:为了使某个consumer宕机其占有的队列分区能够顺利的释放并被其他consumer获取到,需要在每个消费者拉取数据与确认回调之间设置一个超时时间,一旦超过这个时间还没确认,那么队列自动解锁,解锁之后的队列最终能够被别的存活消费者占有并消费。

5.结语

  这是在实习过程中学习到的新知识,且全部是开源数据或者资料,因此分享出来,希望我和大家都有所收获。

参考资料已经在文中列出,这里不再一一列举。

转载说明

转载请注明出处,无偿提供。

支持一下
感谢大佬们的支持