rocketmq的NameServer模块

  categories:资料  author:

[mq@dev1 bin]$ sh mqnamesrv -help
usage: mqnamesrv [-c <arg>] [-h] [-n <arg>] [-p]
-c,–configFile <arg> Name server config properties file
-h,–help Print help
-n,–namesrvAddr <arg> Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
-p,–printConfigItem Print all config item

 

rmq的 namesrv服务的详细配置信息如下(采用下面命令, 可以打印全部配置信息)

sh mqnamesrv -p

rocketmqHome=/home/mq/tmp5/xb-rmq-server-4.5.1
kvConfigPath=/home/mq/namesrv/kvConfig.json
configStorePath=/home/mq/namesrv/namesrv.properties
productEnvName=center
clusterTest=false
orderMessageEnable=false
listenPort=9876
serverWorkerThreads=8
serverCallbackExecutorThreads=0
serverSelectorThreads=3
serverOnewaySemaphoreValue=256
serverAsyncSemaphoreValue=64
serverChannelMaxIdleTimeSeconds=120
serverSocketSndBufSize=65535
serverSocketRcvBufSize=65535
serverPooledByteBufAllocatorEnable=true
useEpollNativeSelector=false

部分信息的介绍如下(信息搜集于网络)

NamesrvConfig是NameServer的配置类

这个类放在了common模块中,但是几乎都是被NameServer模块引用,这里就当在NameServer模块中讲

主要讲解字段,方法都是get和set
字段
rocketmqHome

private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));

RocketMQ home 目录
如果没有指定的话,默认值为系统环境变量ROCKETMQ_HOME
通过System.getenv获取,可以在~/.profile中export
或者可以在配置文件中指定rocketmqHome=***

kvConfigPath

private String kvConfigPath = System.getProperty(“user.home”) + File.separator + “namesrv” + File.separator + “kvConfig.json”;

KvConfig的配置文件路径
默认为System.getProperty(“user.home”)/namesrv/kvConfig.json

configStorePath

private String configStorePath = System.getProperty(“user.home”) + File.separator + “namesrv” + File.separator + “namesrv.properties”;

NamesrvConfig(以及NettyServerConfig)的配置文件路径
默认为System.getProperty(“user.home”)/namesrv/namesrv.properties

productEnvName

private String productEnvName = “center”;

clusterTest为true时work
用于构造ClusterTestRequestProcessor

clusterTest

private boolean clusterTest = false;

代表不同的模式
决定NamesrvController#registerProcessor()注册处理器时
用的是ClusterTestRequestProcessor 还是 DefaultRequestProcessor

orderMessageEnable

是否开启顺序消息功能

private boolean orderMessageEnable = false;

问题
productEnvName有什么用
clusterTest,orderMessageEnable是如何实现对应功能
refer

http://bboyjing.github.io/2017/04/21/RocketMQ%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90%E4%B9%8B%E3%80%90rocketmq-namesrv%E3%80%91/

http://hiant.github.io/2016/08/26/rocketmq-0x5/ clusterTest作用
1.1. NamesrvStartup 处理流程

设置版本号
com.alibaba.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest
com.alibaba.rocketmq.broker.processor.AdminBrokerProcessor#callConsumer

设置Socket缓冲区
如果没有设置 SEND 缓冲区:NettySystemConfig.SocketSndbufSize = 2048
如果没有设置 RECV 缓冲区:NettySystemConfig.SocketRcvbufSize = 2048

FastJson版本冲突检测
com.alibaba.rocketmq.common.conflict.PackageConflictDetect#detectFastjson

命令行参数解析
使用 com.alibaba.rocketmq.srvutil.ServerUtil#parseCmdLine 解析命令行,如果解析失败,则程序退出

配置文件加载
命令行中包含 -c 选项,则加载指定的 properties 配置文件,并通过 com.alibaba.rocketmq.common.MixAll#properties2Object 设置 NamesrvConfig、NettyServerConfig。其中,如果配置中没有设置 ListenPort,默认为 9876

输出配置参数列表
命令行中包含 -p 选项,则通过 com.alibaba.rocketmq.common.MixAll#printObjectProperties(org.slf4j.Logger, java.lang.Object) 输出 NamesrvConfig、NettyServerConfig 的声明字段,输出完成后程序退出

配置 NamesrvConfig
通过 com.alibaba.rocketmq.common.MixAll#properties2Object 将命令行参数加到 NamesrvConfig

配置 logback
logback 配置文件固定:namesrvConfig.getRocketmqHome() + “/conf/logback_namesrv.xml”。
注意:此前会先判断 namesrvConfig.getRocketmqHome() 未设置时,会导致程序退出。不过这种情况仅发生在直接运行
com.alibaba.rocketmq.namesrv.NamesrvStartup#main,如果是脚本启动,会自动将该值设置为 mqnamesrv 脚本的上上层目录

实例化并初始化 NamesrvController
使用 NamesrvConfig、NettyServerConfig 实例化 NamesrvController 然后调用 com.alibaba.rocketmq.namesrv.NamesrvController#initialize 初始化,初始化失败时程序退出
否则增加 ShutdownHook ,在程序退出时记录日志,并调用 com.alibaba.rocketmq.namesrv.NamesrvController#shutdown 释放资源

启动 NamesrvController
调用 com.alibaba.rocketmq.namesrv.NamesrvController#start 启动线程工作

1.2. NamesrvController 初始化流程

加载 KvConfig
如果 NamesrvConfig 中 kvConfigPath 路径指定的文件内容不为空,则将文件存储的 json 内容加载进Map:HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>>

实例化 NettyRemotingServer
rocketmq-remoting 模块中的 com.alibaba.rocketmq.remoting.netty.NettyRemotingServer#NettyRemotingServer(com.alibaba.rocketmq.remoting.netty.NettyServerConfig, com.alibaba.rocketmq.remoting.ChannelEventListener)

构造 RemotingExecutorThread 线程池
使用 java.util.concurrent.Executors#newFixedThreadPool(int, java.util.concurrent.ThreadFactory) 构造固定数目线程池,数目由 NettyServerConfig.getServerWorkerThreads 决定

注册
NamesrvConfig 中 clusterTest 为 true 时,调用 com.alibaba.rocketmq.remoting.RemotingServer#registerDefaultProcessor 将一个 com.alibaba.rocketmq.namesrv.processor.ClusterTestRequestProcessor 和 RemotingExecutorThread 线程池绑定起来;否则将一个 com.alibaba.rocketmq.namesrv.processor.DefaultRequestProcessor 和 RemotingExecutorThread 线程池绑定起来。绑定到 NettyRemotingServer 的 defaultRequestProcessor 上

两者区别: ClusterTestRequestProcessor 继承自 DefaultRequestProcessor 并重写了 getRouteInfoByTopic 方法,RouteInfoManager 中未能获取到 TopicRouteData 时,去集群上查询一次

启动定时任务
5s 后启动,每隔 10s 执行一次:清理失效的 Broker 信息
1m 后启动,每隔 10m 执行一次:在日志中 INFO 级别输出 KvConfig 的内容

1.3. NamesrvController 启动流程

实际上是启动 NamesrvController 的 NettyRemotingServer

构造 DefaultEventExecutorGroup
线程数目由 NettyServerConfig.getServerWorkerThreads 决定,线程名:”NettyServerCodecThread_” + this.threadIndex.incrementAndGet()

构造 BOSS 线程
线程数固定为1,线程名:NettyBoss_1

构造 WORKER 线程
如果是 Linux,且 NettyServerConfig 中 useEpollNativeSelector 为 true,返回一个 EpollEventLoopGroup,线程数目由 NettyServerConfig.getServerSelectorThreads 决定,线程名:String.format(“NettyServerEPOLLSelector_%d_%d”, threadTotal, this.threadIndex.incrementAndGet());否则返回一个 NioEventLoopGroup,线程数目由 NettyServerConfig.getServerSelectorThreads 决定,线程名:String.format(“NettyServerNIOSelector_%d_%d”, threadTotal, this.threadIndex.incrementAndGet())

配置 ServerBootstrap
ChannelOption.SO_BACKLOG:1024
ChannelOption.SO_REUSEADDR:true
ChannelOption.SO_KEEPALIVE:false
ChannelOption.SO_SNDBUF:nettyServerConfig.getServerSocketSndBufSize()
ChannelOption.SO_RCVBUF:nettyServerConfig.getServerSocketRcvBufSize()
ChannelOption.TCP_NODELAY:true 设置的是childOption
localAddress:new InetSocketAddress(this.nettyServerConfig.getListenPort())
childHandler:
NettyEncoder:编码
NettyDecoder:解码
IdleStateHandler:空闲处理,空闲时间由 nettyServerConfig.getServerChannelMaxIdleTimeSeconds() 决定
NettyConnetManageHandler:连接管理,记录连接日志,并处理连接事件(空闲事件)
NettyServerHandler:业务逻辑处理。根据 RemotingCommand 的 type 调用 com.alibaba.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand 或 com.alibaba.rocketmq.remoting.netty.NettyRemotingAbstract#processResponseCommand
NettyServerConfig 未将 serverPooledByteBufAllocatorEnable 设置为 false,则 ChannelOption.ALLOCATOR 设置为 PooledByteBufAllocator.DEFAULT,即默认开启 Netty 的池化 ByteBuf

启动 ServerBootstrap
调用 io.netty.bootstrap.AbstractBootstrap#bind()

启动定时任务
3s 后启动,每隔 1s 执行一次:处理超时请求

 

 



快乐成长 每天进步一点点      京ICP备18032580号-1