Kakfa utils源代码分析

Kafka.utils,顾名思义,就是一个工具套件包,里面的类封装了很多常见的功能实现——说到这里,笔者有一个感触:当初为了阅读Kafka源代码而学习了Scala语言,本以为Kafka的实现会用到很多函数编程(Functional Programming, FP),结果目前来看,大部分还是很朴素地以面向对象的方式来实现的,只有很少一部分集合的处理使用诸如map,reduce这样的FP方式。不能不说有点小小的遗憾。——当然也许后面Kafka的核心代码中会看到更多FP的身影。

下图就是kafka.utils包的所有代码:

因为很难像其他包代码之间有逻辑关系,我们就一个一个说吧:
一、Annotations.scala
这个源代码文件中定义了3个注释类:threadsafe、nonthreadsafe和immutable。它们都继承了StaticAnnotation——Scala提供的StaticAnnotation类似于Java中的@Target(ElementType.TYPE),因此主要的作用域是类和接口。具体到这三个元注解(meta-annotation),很容易知道它们的含义:分别标记线程安全、非线程安全和不可变性。Kafka开发中常用到的SimpleConsumer类就是被标记为@threadsafe的。
二. CommandLineUtils.scala
这个文件使用JOpt Simple库负责解析命令行参数,具体使用用法参见官网:http://pholser.github.io/jopt-simple/
Kafka在这个文件中提供了一个object:CommandLineUtils。具体包含的方法有:
1. printUsageAndDie: 打印命令使用方法并终止程序
2. checkRequiredArgs:使用Jopts Simple的API(以下皆同)检查是否缺少必要参数
3. checkInvalidArgs:检查指定的参数是否存在不兼容情况,即哪些参数不能同时使用
4. parseKeyValueArgs:解析key=value格式的参数对,并返回一个Properties对象
三、Crc32.scala
这个类就是CRC32校验码的实现类,来自于Hadoop提供的PureJavaCrc32类——CRC32校验码的纯Java实现版本。这个类很长,里面有很多位操作,由于CRC32计算不在本次研究范围,所以就了解到这吧。
四、DelayedItem.scala
这个类是个泛型类,实现了java.util.Delayed接口。用于标记那些在给定延迟时间之后执行的对象。该类接收一个泛型T,一个延迟时间以及延迟时间的单位。另外,实现这个接口的话必须要实现一个compareTo和getDelay方法。
1. getDelay: 计算距离触发时间还剩下多长时间
2. compareTo: 比较2个Delayed对象的延迟触发时间
五、FileLock.scala
顾名思义,FileLock就是一个文件锁,它的构造函数接收一个文件对象,并总是先尝试创建这个文件(如果不存在的话),然后创建一个FileChannel对象对该文件进行随机读写操作。同时创建一个java.nio.channel.FlieLock文件锁对象用于实现下面的方法:
1. lock: 对文件加锁,如果该文件上已有锁抛出异常
2. tryLock: 尝试对文件加锁,如果成功返回true,否则返回false
3. unlock: 如果持有锁使用FileLock.release方法释放锁
4. destroy: 先释放锁然后调用FileChannel的close方法销毁该channel
六、IteratorTemplate.scala
这个文件视图定义一个迭代器模板,主要为遍历消息集合使用。迭代器模板有一个状态字段,因此在定义迭代器模板抽象类之前首先定义了一个State状态object,以及一组具体的状态object:完成(DONE),READY(准备就绪),NOT_READY(未准备)和FAILED(失败)。
之后就是定义IteratorTemplate抽象类了,它同时实现了trait Iterator和java Iterator接口——可谓迭代器领域的集大成者:)

如前所述,该类有个字段表明了迭代器的状态:state,还有一个nextItem字段执行遍历中的下一个对象,当然初始化为null——说起null,想到一个题外话。我很怀疑Kafka的开发人员是深度的Java编程人员亦或是强面向对象开发人员,Scala推荐使用Option来代替null的,可Kafka的代码中null还是随处可见,当然可能也是为了更好更自然地与Java集成。

这个抽象类提供很多方法,但似乎只有一个抽象方法:makeNext,其他全是具体方法:
1. next:如果迭代器已遍历完并无法找到下一项或下一项为空,直接抛出异常;否则将状态置为NOT_READY并返回下一项
2. peek:只是探查一下迭代器是否遍历完,如果是抛出异常,否则直接返回下一项,并不做非空判断,也不做状态设置
3. hasNext: 如果状态为FAILED直接抛出异常,如果是DONE返回false,如果是READY返回true,否则调用maybeComputeNext方法
4. makeNext: 返回下一项,这是你需要唯一需要实现的抽象方法。同时你还需要在该方法中对状态字段进行更新
5. maybeComputeNext:调用makeNext获取到下一项,如果状态是DONE返回false,否则返回true并将状态置为READY
6. allDone: 将状态置为DONE并返回null
7. resetStatus:顾名思义,就是重置状态字段为NOT_READY
七、JSON.scala
JSON的一个封装类,用于JSON到String的相互转换,该类不是线程安全的。Scala提供的JSON是将数字型的字符串转化为Double,不过该类创建一个简单函数用于将数字型字符串转为换Integer,并指定其为JSON.globalNumberParser。该类只有2个方法:
1. parseFull: 调用scala JSON的parseFull方法将一个json字符串转化为一个对象,如果出错则抛出异常
2. encode: 讲一个对象编码成json字符串。这个对象只能是null,Boolean,String,Number,Map[String, T],Array[T]或Iterable[T]中的一种,否则会报错

 

我们继续研究kafka.utils包
八、KafkaScheduler.scala
首先该文件定义了一个trait:Scheduler——它就是运行任务的一个调度器。任务调度的方式支持重复执行的后台任务或是一次性的延时任务。这个trait定义了三个抽象方法:
1. startup: 启动调度器,用于接收调度任务
2. shutdown: 关闭调度器。一旦关闭就不再执行调度任务了,即使是那些晚于关闭时刻的任务。
3. schedule: 调度一个任务的执行。方法接收4个参数
3.1 任务名称
3.2 完全是副作用(side effect,返回Unit)的函数,用于任务调度时执行
3.3 延时时间
3.4 执行间隔,如果小于0,说明是一次性任务调度
3.5 延时时间单位,默认是毫秒
其次,该文件还定义了一个线程安全(使用@threadsafe标记)的KafkaScheduer实现了前面定义的Scheduler接口——该调度器主要是基于java concurrent包中的SchedulerThreadPoolExecutor类来实现线程池方式的任务调度。既然是线程池的方式,你在构造该类时需要提供线程数(threads),线程池中的线程名字前缀(threadNamePrefix,默认是kakfa-scheduler-)以及指定是否是后台守护进程(daemon),即这些线程不会阻塞JVM关闭。

该类定义了还定义了2个字段:一个是ScheduledThreadPoolExecutor对象持有该线程池,并标记为@volatile,保证对该对象的读取不走寄存器,直接内存读取,保证内存可见性。;另一个是AtomicInteger的schedulerThreadId,与线程池线程前缀一起组成和线程名称。AtomicInteger类型保证了对该变量的访问是线程安全的。KafkaScheduler实现了Scheduler trait,所以必须要实现startup、shutdown和schedule方法:

1. startup: 如果调度器正常关闭类字段executor应该总是null,所以在startup方法开始需要先判断executor是否为空,如果不为空抛出异常说明调度器可能已经运行。否则创建具有threads个线程的线程池,并设置线程池关闭后不再执行任何类型的调度任务(包括重复调度执行的后台任务和一次性的延迟调度任务)。之后创建一个线程工厂来初始化那些线程。这里用到了包中Utils.scala中的newThread方法来创建线程。后面谈到Utils.scala时我们再说。
2. ensureStarted: 一个纯副作用的函数,只会被用在shutdown方法中。主要目的就是确保调度器已经启动。就是单纯地判断executor是否为空,如果为空抛出异常。
3. shutdown: 在确保调度器是启动状态的前提下,调用ScheduledThreadPoolExecutor.shutdown方法并设置了1天的超时时间(注意,这里的1天是硬编码方式,不支持配置的方式),以阻塞的方式来等待shutdown请求被完整地执行。按照《Java concurrency in practice》的理论,这其实是一个阻塞方法(blocking method),严格来说应该允许用户发起中断机制,可能是开发人员觉得shutdown不会运行很长时间。当然也许是我说错了:)
4. schedule:调度器最重要的逻辑代码。在确保调度器已启动(调用ensureStarted)的前提下,调用Utils.scala的runnable方法(同样,我们后面再说)将指定的函数封装到一个Runnable对象中。然后判断调度任务的类型(如果period参数大于0,说明是需要重复调度执行的任务;反之是一次性的延时任务)调用ScheduledThreadPoolExecutor的不同方法(scheduleAtFixedRate或schedule)来执行这个runnable

九、Log4jController.scala
看名字就知道和Log4j管理相关。代码结构也很清晰:一对伴生对象(companion object)和一个私有trait。先说那个private trait:Log4jControllerMBean。既然是trait,通常都是类似于Java的接口,定义一些抽象方法:
1. getLoggers: 返回一个日志器名称的列表List[String]
2. getLogLevel: 获取日志级别
3. setLogLevel: 设置日志级别。不过与普通的setter方法不同的是,该方法返回一个boolean,原因后面在其实现类里面说。

既然定义了trait,自然也要有实现它的具体类:Log4jController——允许在运行时动态地修改log4j的日志级别。该类还提供了2个辅助私有方法:newLogger和existingLogger,加上实现Log4jControllerMBean声明的3个抽象方法,一共是5个方法:
1. newLogger: 创建一个日志器logger,可能是root logger,更可能是普通的logger。
2. existingLogger: 根据loggerName返回对应的logger
3. getLoggers: 返回当前的一组logger(也包括root logger),每个元素都是logger名称=日志级别这样的格式
4. getLogLevel: 根据给定的logger name获取对应的日志级别
5. setLogLevel: 设定日志级别。值得注意的是,如果loggerName为空或日志级别为空,返回false表明设置不成功
Log4jController object则很简单,只是初始化了一个Log4jController实例,并使用Utils.scala中的registerMBean方法将其注册到平台MBean服务器,注册名为kafka:type=kafka.Log4jController

十、Logger.scala
这个trait前面虽然没有怎么提及,但其实很多类都实现了这个trait。其名字含义就极具自描述性——就是操作日志的方法类。该trait还创建了一个logger对象——以lazy val的形式。Scala中的lazy表示延迟加载,只有第一次用到该logger时才初始化该值。因为很多类都实现了Logging trait,因此将logger作为一个lazy val是很有必要的,否则每次构造一个新的实现类实例时都要构建一个logger对象。这完全没必要,我们只有在用的时候在初始化岂不是很好吗?

另外,该trait还有一个logIdent字段,初始化为null,但因为是protected var,所以很明显是让实现该trait的子类来指定。从变量命名来看,似乎是表示日志标识符的格式。后面的代码中有大量的类都指定了不同的logIdent。
这个trait定义了大量的写日志方法,当然都是针对不同的日志级别,比如TRACE、DEBUG、INFO、WARN、ERROR和FATAL。有意思的是,每一个级别上都有一个swallow***方法——该方法会接收一个无返回值的函数(严格来说,返回值是Unit)然后运行该函数。如果碰到异常只是将异常记录下来,直接吞掉,而不是再次抛出。Utils.scala中的swallow方法帮忙实现了这个功能。鉴于Logging trait很多方法都是重复且很简单的,就不一一赘述了。

十一、Mx4jLoader.scala
从名字来看,它应该是用到了mx4j-tools开源库(官网:http://mx4j.sourceforge.net/),但Kafka源代码中并不包含对应的jar包。如果要使用需要自己下载,然后放到CLASSPATH下面。最新的版本是3.0.2,下载地址:http://sourceforge.net/projects/mx4j/files/MX4J%20Binary/3.0.2/

该文件提供了一个object,主要启用JMX——使用-Dmx4jenable=true启用该特性。默认的ip地址和端口分别为0.0.0.0和8082。使用-Dmx4jport=8083和-Dmx4jaddress=127.0.0.1的方式来覆盖默认设置。在后面的KafkaServer中调用了Mx4jLoader.maybeLoad来加载JMX设置:
maybeLoad: 从名字来看——maybe load——也有可能不加载,要么是因为mx4j-tools jar包不在classpath下,要么是没有在配置文件中进行设置(默认也不是不开启的)。具体流程为:首先加载系统设置(Kafka实现了一个VerifiableProperties封装了java的Properties对象),然后查看是否存在kafka_mx4jenable属性。如果不存在直接返回false——表示不需要加载jmx。如果存在的话获取mx4jaddress和mx4jport属性。通过反射机制实例化HttpAdaptor对象实例以及XSLTProcessor对象实例(这两个类都是mx4j-tools提供的),然后对它们进行注册。如果中间过程捕获了ClassNotFoundException异常,直接返回false表明mx4j-tools jar包不在classpath;如果是MBean注册相关的异常,也返回false并抛出该异常。

十二、Os.scala
很短小精悍的一个object,只提供了name字符串和isWindows两个变量分别获取操作系统名称以及判断是否为Windows平台。

十三、Pool.scala
名字虽然是Pool(池),但字段pool的数据结构其实就是一个ConcurrentHashMap,更像是对ConcurrentHashMap数据结构做了一层封装,所以其提供的很多方法实现起来也都是直接调用ConcurrentHashMap的同名方法。而且也是泛型的——[K, V]。
值得注意的是, 这个类的构造函数接收一个Option[(K) => V]的参数类型,实际上就是Option[function],这个函数接收一个K类型的参数返回V类型的值,默认的类构造函数参数是None。它同时还提供了一个辅助构造函数,将一个Map中的[K,V]对赋值到这个类底层的HashMap上。
由于大多提供的方法都是调用标准的ConcurrentHashMap方法,我就不一一赘述了,但要特别地说一下getAndMaybePut方法:

getAndMaybePut: 名字就很有自描述性——根据给定key获取value,如果不存在就增加这个key的记录——即从valueFactory中生成一个值增加到pool中,并返回该值。但是如果是增加的情况,value怎么取值呢?我们来看看代码。

从图中可以看到,代码先判断了valueFactory是否空,如果为空直接抛出异常。但其实我们可以先判断是否存在值,如果已经存在直接返回,即使valueFactory为空也没关系,因为我们此时不需要从valueFactory中生成一个值。因此我觉得可以讲代码改写为:
总之就是将valueFactory的非空判断推迟到需要使用它的时刻。还有一个需要注意的是,虽然这个方法使用了同步机制,但因为该类中还提供了其他的方法(比如put)可以对ConcurrentHashMap增加记录,因此getAndMaybePut返回的时候你可能会发现返回值与valueFactory计算的值不一样——这是因为另一个线程成功地插入了[key,value]对,当然这一切都是拜ConcurrentHashMap是基于CAS所赐。

十四、ReplicationUtils.scala
Kafka的消息要在集群间做持久化必须提供某种程度的冗余机制——即副本机制。类似于Hadoop,Kafka也有对应的副本因子(replication factor)。具体实现我们在谈及replication时候再说。这个文件提供的object只是副本机制使用的一个常用套件类。我们一个一个方法说:

1. parseLeaderAndIsr: ISR表示in-sync replicas,表示当前依然活跃(alive)且持有的状态与leader副本相差无多的一组副本。很自然地,我们需要定义与leader相差多少是我们能够承受的,可以通过两个参数配置:replica.lag.time.max.ms和replica.lag.max.messages。这个方法接收一个json格式的字符串,包含了leader、leader_epoch、一组isr列表和controller_epoch信息,解析之后返回一个LeaderIsrAndControllerEpoch对象。后者位于kafka.collections包中,就是一个简单的case类——主要目的是打印Leader和Isr的一些基本信息:包括id,时间epoch等——这些信息都要保存在ZooKeeper中。
2. checkLeaderAndIsrZkData: 顾名思义,检查给定zookeeper path上的leader和isr列表数据。使用ZkUtils.readDataMaybeNull读取对应路径上的数据(当然有可能是null),如果调用第一个方法parseLeaderAndIsr尝试做解析,如果成功元组(true, zookeeper版本),有任何异常出现则返回(false, -1)表明检查失败
3. updateLeaderAndIsr: 使用Zookeeper client对象更新保存于zk上的leader和isr信息。因为Kafka提供的副本机制是针对topic的分区而言的,所以该方法还接收一个partitionId。最后返回一个boolean值表明更新结果是否成功。代码逻辑也很清晰:先获取要更新的zookeeper路径,然后调用ZkUtils上的leaderAndIsrZkData方法组装新的json串,最后使用conditionUpdatePersistentPath方法执行更新操作。从名字来看这个更新是有条件的,也就是说有可能更新失败(比如path不存在,或当前版本不匹配等)。这两个方法等我们研究ZkUtils.scala时候再说。总之最后返回一个boolean表明更新是否成功。
总的来说,前两个方法主要服务于updateLeaderAndIsr方法,在kafka.cluster.Partition中也调用了updateLeaderAndIsr方法。

此部分 来源:https://www.cnblogs.com/huxi2b/p/4380155.html

Kakfa utils源代码分析(三)

Kafka utils包最后一篇~~~
十五、ShutdownableThread.scala
可关闭的线程抽象类! 继承自Thread同时还接收一个boolean变量isInterruptible表明是否允许中断。既然是可关闭的,因此一定不是守护线程,而是一个用户线程(不会阻塞JVM关闭)。提供的方法有:

1. doWork: 抽象方法。子类必须实现这个方法,从名字来说应该是指定线程要完成的操作。
2. initiateShutdown: 发起关闭请求。首先通过CAS的方式判断是否线程在运行中;如果是的话将表明运行状态的isRunning变量设置为false,同时根据该线程的可中断性(通过isInterruptible)调用Thread.interupt方法中断该线程并返回true。
3. awaitShutdown: 用于发起关闭请求(initiateShutdown)之后调用该方法等待关闭操作完全结束。实现方式是调用CountDownLatch的await方法等待shutdownLacth对象变为0——shutdownLatch是该类维护的一个关闭阀门。在运行线程结束之后程序会将该对象减为0。
4. run方法: 复写了Thread的run方法。该类维护的isRunning类似于一个状态标志,控制着线程何时退出执行操作的循环。每次循环前检测isRunning值,如果一旦发现是false,则退出执行并将shutdownLatch阀门至于关闭状态;如果是true,则调用doWork来执行真正的操作。

十六、Throttle.scala
主要目的是限制某些操作的执行速度,其实主要用于清理日志时限制IO速度。这个类会接收一个给定的期望速率(单位是**/每秒,这里的**其实不重要,可以是字节或个数,主要是限制速率)。构造函数参数如下:

1. desiredRatePerSec: 期望速率
2. checkIntervalMs: 检查间隔,单位ms
3. throttleDown: 是否需要往下调节速度,即降低速率
4. metricName: 待调节项名称
5. units: 待调节项单位,默认是字节
6. time: 时间字段
该类还实现了KafkaMetricsGroup trait,你可以认为后者就是构造度量元对象用的(例如通过newMeter)。Throttle类只有一个方法: maybeThrottle。该方法代码写了一大堆,一句一句分析太枯燥,我直接举个例子说吧: 假设我们要限制IO速率,单位是字节/秒,每100毫秒查一次。我们想要限制速率为10字节/毫秒。现在我们在500ms内检测到一共发送了6000字节,那么实际速率是6000/500 = 12字节/毫秒,比期望速率要高,因此我们要限制IO速率,此时怎么办呢?很简单,如果是按照期望速率,应该花费6000/10 = 600ms,比实际多花了100ms,因此程序sleep 100ms把那段多花的时间浪费掉就起到了限制速率的效果。简单来说程序就是这么实现的: )

十七、Time.scala
封装了一些与时间相关的常量,另外提供了很多抽象方法供子类实现,比如milliseconds、nanoseconds和sleep。最后真的提供了一个子类实现了Time trait并实现了前面的三个方法。

十八、ToolsUtils.scala
使用jopt-simple工具库验证命令行参数提供的host/port格式是否正确,validatePortOrDie方法对接收到的host: port字符串进行split,过滤掉那些不符合host: port格式的项得到一个Array[String],如果这个数组的个数与split出来的Array长度不匹配,说明有不合格项,直接打印调用方法并结束程序。
okay! 我们快接近胜利了!不过一想到kafka.uitls包中还有3个超长的源代码文件就脑袋疼。我们还是一个一个啃吧。

十九、Utils.scala
按照这个文件中注释提示的那样,这个文件里面的帮助函数都是最最常用的函数,并不限定为Kafka的逻辑服务。因此如果要在这个文件中添加一个函数,一定要确保这个函数有很完善的文档另外还要保证一定不能只在某个特定的领域使用,应该适用于最常见的场景中。
okay,反正是一个个独立的function,还是老规矩,一个一个说:

1. runnable: 接收一个无返回值的函数封装到一个新建的Java Runnable对象中返回。
2. daemonThread: 创建一个守护线程用于后台执行。注意,只是创建,不开启该线程。一共提供了3个方法,目的都是一样
3. newThread: 创建新的线程,参数决定线程名称、执行操作以及是否为守护线程
4. readBytes: 读取一段给定的ByteBuffer以及给定位移到一个字节数组
5. loadProps: 加载一个给定路径下的.properties文件到一个Properties对象中
6. openChannel: 根据可变性打开一个文件通道(file channel),如果可以修改,以RandomAccessFile方法打开,如果不可修改,以FileInputStream方法打开
7. swallow: 执行给定的操作(以函数参数方式传递), 记录下任何异常但绝不抛出异常,而是吞掉异常
8. equal: 比较两段ByteBuffer——先比较position,然后是remaining,最后是内部的每个字节
9. readString: 将ByteBuffer中的内容读出来生成一个字符串
10. croak: 打印错误消息并关闭JVM
11. rm: 递归地删除文件或子目录
12. registerMBean: 将给定的mbean注册到平台mbean服务器。如果已经注册过会先卸载该mbean再重新注册。该方法不会抛出任何异常,只是简单滴返回false表示注册失败
13. unregisterMBean: 取消mbean的注册。如果没有注册过,就什么都不做
14. readUnsignedInt: 一共有2个方法,一个是从ByteBuffer当前位置读取4个字节的integer,并更新位移;另一个是从给定位置读取4个字节的integer,但并不修改position信息
15. writeUnsignedInt: 也是提供了2个方法: 一个是将一个long型作为一个4个字节的integer写入ByteBuffer,不考虑溢出的问题;第二个方法就是在给定的位置做第一个方法的事情
16. crc32: 计算字节数组的CRC32校验码
17. hashcode: 计算给定参数项的hashCode
18. groupby: 按照给定的函数计算出来的key进行分组,将相同key的值都放入一个List[V]中,整体作为一个HashMap返回。不过貌似这个函数没有被用到: (
19. read: 读取给定ReadableByteChannel的buffer到一个ByteBuffer,如果读取失败抛出EOFException异常,否则返回读取的字节数
20. notNull: 带泛型类型的方法,如果给定值是null抛出异常,否则直接返回该值
21. stackTrace: 获取给定异常类的栈追踪信息(stack trace),作为字符串返回
22. parseCsvMap: 接收一个逗号分隔的key/value对,类似于key1: value1,key2: value2,... 使用正则表达式解析成一个HashMap并返回。该方法还有个形式,是返回成一个字符串序列,即字符串数组
23. createObject: 通过反射机制为给定类名表示的类创建一个实例
24. nullOrEmpty: 判断给定字符串是否为空。但实现有个问题,如果是多个空格,该方法就会返回false
25. loopIterator: 其实就是个无限遍历的迭代器,为传入的集合生成一个无限流Stream,并返回它的迭代器
26. readFileAsString: 读取文件内容并作为一个字符串返回
27. abs: 计算某个整数的绝对值。如果是最小整数返回0——与Java的实现不同
28. replaceSuffix: 替换字符串的后缀,如果无法找到要替换的后缀直接抛出异常
29. createFile: 根据给定路径创建文件
30. asString: 将一个Properties对象转换成字符串返回
31. readProps: 从给定的字符串中读取属性并返回一个Properties
32. readInt: 从字节数组中的指定位移处读取一个integer
33. inLock: 在加锁的情况下执行一段函数
34. inReadLock/inWriterLock: 使用给定的锁分别获取一个读锁和写锁,然后执行函数体
35. JSONEscapeString: 将字符串中的某些字符进行转义,比如\b转成\\b
36. duplicates: 返回列表中重复项

二十、VerifiableProperties.scala
这个类就是封装了Properties对象,同时维护了一个HashSet表示属性名称集合。具体的方法有:
1. containsKey: 判断是否包含某个key
2. getProperty: 将属性名加入到属性名称集合,然后从props中获取某个属性值之后返回
3. getString: 先检测是否包含这个属性,如果包含则调用getProperty返回属性值
4. getInt: 获取一个integer类型的属性值
5. getIntInRange: 返回一个integer类型的属性值,但该值必须在给定的范围内
6. getShort: 返回一个short类型的属性值
7. getShortInRange: 返回一个short类型的属性值,但该值必须在 给定的范围内
8. getLong: 返回一个long类型的属性值
9. getLongInRange: 返回一个long类型的属性值,但该值必须在给定的范围内
10. getDouble: 返回一个double类型的属性值
11. getBoolean: 返回一个boolean类型的属性值
12. getMap: 从一个属性列表中解析出一个Map[String, String]并返回
13. getCompressionCodec: 从属性列表中读取处codec信息。该方法同时支持解析codec的序号和名称,并返回对应的codec
14. verify: 主要就是验证Properties对象中每个属性是否都在属性名称集合中,即使不在也只是打印一个log而已

二十一、ZkUtils.scala
终于到包里面的最后一个文件了。这个文件代码很长,定义了三个class和两个object。先从简单的开始说吧:
1. ZKConfig class: 这个类定义了zookeeper配置信息,其构造函数接收一个VerifiableProperties对象。Kafka维护的zookeeper配置信息包括:一个CSV格式的zookeeper连接串、最大会话超时时间、连接最大超时时间以及一个zookeeper follower被允许落后于leader的最大时间间隔,默认是2秒
2. ZKGroupDirs class: 名字中的group是指kafka的消费组的,因为Kafka中的consumer都属于一个consumer group。在这个类中定义了3个方法分别返回消费者的根路径,消费者组的路径以及消费者组的id的路径
3. ZKGroupTopicDirs class: 该类继承了ZKGroupDirs类,并提供了2个新的方法用于计算消费者组的位移和拥有者信息。如下图所示:
ZKUtils.scala中还定义了一个ZKStringSerializer object用于序列化/反序列化zookeeper中保存的字节数组。提供的两个方法也很直观:serialize方法将字符串转化成字节数组;deserialize方法将字节数组转换成字符串。
好了, 就差最后一个object没说了。ZkUtils object首先定义了很多常量,都是Kafka不同组件使用的Zookeeper上路径,比如Consumer的根路径是/consumers等。值得注意的是,目前kafka还不支持我们定制这些路径的值。这个object定义了很多方法,虽然很直观,但还是值得我们简单地浏览一遍:
1. getTopicPath: 获取topic的zk路径
2. getTopicPartitionsPath: 获取topic分区的zk路径
3. getTopicConfigPath: 获取topic config的zk路径。这个路径是在0.8.1引入的,主要是在创建或修改topic分区赋值时会更新该路径下的zk信息
4. getDeleteTopicPath: 获取已删除topic的zk路径
5. readDataMaybeNull: 返回给定/controller路径下的(controller名,状态)元组,如果无法找到那个zk节点直接返回(None,空Stat)
6. getController: 调用readDataMaybeNull方法获取controller名称,如果为空直接抛出异常,否则返回一个integer类型的controller id(通过调用KafkaController.parseControllerId方法)
7. getTopicPartitionPath: 返回/brokers/topics/<my-topic>/partitions/<my-partition-id>这样形式的zk路径
8. getTopicPartitionLeaderAndIsrPath: 返回/brokers/topics/<my-topic>/partitions/<my-partition-id>/state这样形式的zk节点
9. getChildren: 返回给定path路径下的所有子路径名
10. getSortedBrokerList: 为/borkers/ids下所有broker按照broker id排序之后做成一个列表返回
11. getChildrenParentMayNotExist: 获取给定path的所有子目录名,如果path代表的节点不存在返回Nil
12. getBrokerInfo: 读取/brokers/ids/<给定broker id>节点的数据,如果存在则调用Broker.createBroker创建一个broker实例返回,否则返回None
13. getAllBrokerInCluster: 获取/brokers/ids下的broker列表并排序,遍历该列表顺序地为每个非None的broker创建一个Broker对象,最后封装到一个Seq中返回
14. getLeaderAndIsrForPartition: 为指定的topic及其partition查询出leader node和所有isr以及epoch信息,然后提取出leader和isr信息封装为一个LeaderAndIsr对象返回——LeaderAndIsr是个case class封装了leader信息及其isr列表信息
15. makeSurePersistentPathExists: 确保给定的路径在zookeeper中存在,如果不存在创建该path
16. setupCommandPaths: 调用makeSurePersistentPathExists确保类开头定义的所有路径都已经存在
17. getLeaderForPartition: 获取某个topic的某个partition的leader id。例如在下图中,该方法应该返回0
18. getEpochForPartition: 读取某个topic某个分区下的epoch信息。如果还是上图的话应该返回leader_epoch的值,即20
19. getInSyncReplicasForPartition: 读取某个topic某个分区下的所有isr列表,如果还是上图的话,应该返回isr的值,也是0——笔者的Kafka是个单节点的集群,因此并不能看出太大的区别
20. getReplicasForPartition: 读取某个topic某个分区下的一组副本,如果是下图的话topic名称是log-topic,分区号是0的话,应该返回[0]——如果是生产环境应该是一组已分配的副本
21. registerBrokerInZk: 为给定的broker id在zookeeper上创建一个临时节点
22. getConsumerPartitionOwnerPath: 返回诸如/consumers/console-consumer-33497/owners/log-topic/partitionId这样形式的zk路径
23. leaderAndIsrZkData: 将给定的LeaderAndIsr对象和epoch编码为json格式的字符串
24. replicaAssignmentZkData: 将一组replica编码成json格式的字符串: "version" : 1, "partitions":{...}
25. createParentPath: 创建父路径代表的持久化节点
26. createEphemeralPath: 根据给定数据和路径创建临时节点
27. createEphemeralPathExpectConflict: 顾名思义,创建一个临时节点,如果已经存在抛出异常
28. createEphemeralPathExpectConflictHandleZKBug: 更加严谨的一个方法用于创建zk临时节点——主要是处理了Zookeeper会话超时的bug
29. createPersistentPath: 级联创建一个持久化节点
30. createSequentialPersistentPath: 创建一个顺序持久节点(persistent sequential)
31. updatePersistentPath: 更新一个持久节点的值。必要时候创建其父路径,但不会抛出节点已存在的异常
32. conditionalUpdatePersistentPath: 如果给定的path和version都没有问题的话更新节点数据,返回(true, 新版本),否则返回(false, -1)。另外如果捕获了失去连接的异常(比如:ConnectionLossException),zkClient会自动地重试,但有可能上一次更新操作已经成功,因此更新过的版本号与期望版本号不再匹配从而导致该方法失败,因此该方法需要处理这种情况的发生,具体做法就是再提供一个optional函数做进一步的检查
33. conditionalUpdatePersistentPathIfExists: 也是先判断条件然后再决定是否进行更新操作。成功的话返回(true,新版本),否则返回(false, -1)。另外如果path不存在直接抛出异常结束
34. updateEphemeralPath: 更新一个持久化节点的值,必要的时候创建其父路径,但不会抛出NodeExistsException
35. deletePath: 删除给定的path节点,返回true表示成功;如果节点不存在,直接返回false
36. deletePathRecursive: 已递归方式删除给定的path
37. maybeDeletePath: 根据给定的zookeeper客户端连接串创建一个zk client连接,设置会话超时和连接超时都是30秒,然后递归地删除给定的path,之后关闭zkClient对象。如果出现任何异常都不做任何处理,直接吞掉
38. readData: 读取path节点的数据,返回(数据,状态信息)元组
39. pathExists: 检查给定的path是否存在
40. getCluster: 遍历/brokers/ids下面的所有broker,创建Broker对象之后加入到一个Cluster对象实例(Cluster保存的是当前所有可用的broker),然后返回cluster
41. getPartitionLeaderAndIsrForTopics: 给定一组TopicAndPartition对象(TopicAndPartition就是一个topic加上一个分区信息),为每一个查询出对应与这个topic这个分区的LeaderIsrAndControllerEpoch并封装到一个HashMap中返回——简单来说就是就是获取某个topic某个分区的leader信息、isr信息、leaderEpoch信息以及controllerEpoch信息
42. getReplicaAssignmentForTopics: 获取一组topic的所有分区对应的副本broker号
43. getPartitionAssignmentForTopics: 返回HashMap[topic名称, Map[该topic下每一个分区号,一组副本]]这样形式的map
44. getPartitionsForTopics: 返回Map[topic名称,该topic下一组partition]这样形式的map
45. getPartitionsBeingReassigned: 读取所有topic及其分区对应的所有新的副本列表
46. parsePartitionReassignmentDataWithoutDedup: 根据给定的json字符串创建一个元组序列,每个元组都是(TopicAndPartition, Seq[Int])格式的,指定了某个topic的某个分区对应的replica号。其实就是给定一个json字符串,里面指定了哪些replica号是属于哪个topic的哪个partition的,然后提取出来做了一个Seq保存。该方法还有一个变体就是返回一个Map,总之就是保存的数据结构不同罢了
47. parseTopicsData: 从给定的json字符串中提取出topic信息封装到一个List中返回
48. getPartitionReassignmentZkData: 生成{"version" : 1, "partitions" : {"topic" -> ***, "partition" -> ***, "replicas" : {**, **}}}这样形式的json字符串
49. updatePartitionReassignmentData: 调用getPartitionReassignmentZkData方法生成json字符串,然后调用updatePersistentPath方法更新zookeeper
50. getPartitionsUndergoingPreferredReplicaElection: 读取/admin/preferred_replica_election下的所有信息,返回一组TopicAndPartition
51. deletePartition: 删除/brokers/ids/brokerId的节点,并删除/borkers/topics/brokerId的节点
52. getConsumersInGroup: 获取/consumers/group/ids下所有节点名称
53. getConsumersPerTopic: 获取每个topic的所有consumser
54. getBrokerInfo: 根据broker id获取Broker信息
55. getAllTopics: 获取所有的topic封装到一个Seq中
56. getAllPartitions: 获取所有topic的所有partition,扁平化到一个Set中,每个都是TopicAndPartition对象
标签: Kafka

来源: https://www.cnblogs.com/huxi2b/p/4381640.html