月度归档:2014年12月

Play Framework Web开发简介

在研究kafka过程中需要用到kafka的管理平台, 但这个管理平台用sbt进行编译,并且使用了 Play Framework框架进行开发,因此查阅了资料, 发现了下面很好的相关教程

从文章中获取 部分信息如下, 全部信息请到 原文的论坛处获取:http://www.scala-china.net/discuz/forum.php?mod=viewthread&tid=2376&extra=page%3D2

在博客前面已经介绍过两个用于开发Web应用的框架,采用PHP开发的Yii 开发教程 和基于纯Java的Vaadin Web开发教程 ,此外还有基于Ruby的Ruby on Rail 和基于.Net 的 ASP.NET MVC4 ,其中Ruby on Rail 和 ASP.NET MVC 教程已经非常多,也不准备再重复这些教程。

本 篇开始介绍Play Framework ,它也是一个用来开发Web应用的开发框架,可以使用Java和Scala,但本教程将使用Scala作为开发语言,Scala语言比较新,但它已经成为 我最喜欢的语言之一,如果你喜欢使用Java,PHP,Perl,C#或者JavaScript那么你一定也会喜欢上Scala, 它集中了这些语言的优点,同时除了支持面对象的编程外,它还是一种函数式编程语言(Functional programming),支持扩展,你可以根据需要扩展Scala语言,这是一般的开发语言所不支持的。

Play Framework 基于Akka (并行应用开发包,这个开发包的一个显著特定是基于Actor模型,并行运算是这个某些的基本特点,可以避免由于一般多线程开发不正确使用同步或互锁造成 的程序死锁等问题)。采用MVC模型(非常类似于ASP.NET MVC4),缺省支持REST和JSON,支持编译CoffeeScript和LESS代码。 支持数据库编程,并且可以和Slick(类似于LINQ的数据库接口)。开发可以使用Eclipse ,Intellij IDEA或是Sbt(Typesafe 自带的编译工具)。

本教程采用Eclipse 开发环境配合sbt 工具开发。

下面简单介绍一下Play Framework开发环境的搭建,(Windows开发环境)。

安装好之后,注意把这些命令行所在的目录添加的系统环境变量PATH中,然后到到命令行中执行

play new myFirstPlayApp

在提示选择语言(Java或Scala)选择 Scala

然后进入 新创建myFirstPlayApp ,运行 play run ,如果编译成功,将显示下面类似的消息,表示新创建的Web应用运行在端口 9000

打开浏览器 输入地址http://localhost:9000,如果显示如下页面,将表示开发环境已经就绪,后面文章将介绍这个例子。

 

相信Scala 语言会逐渐普及开来,Play Framework 也会得到广泛的应用,目前使用Play Framework的一些著名网站有 Linked In

 

注:本博客的一个新的别名 http://www.scala-playframework.com ,之后可能会作为专门介绍Scala和Play Framework的中文技术论坛。

 

Play应用标准的目录结构

app                      → 应用的源码
└ assets                → 可以编译的Asset源码
└ stylesheets        → 通常为Less代码
└ javascripts        → 通常为CoffeeScipts代码
└ controllers           → 应用的controllers类定义
└ models                → 应用的Model类定义
└ views                 → UI模板
conf                     → 配置文件
└ application.conf      → 主应用配置文件
└ routes                → 路由定义
public                   → 公开的Asset定义
└ stylesheets           → CSS文件
└ javascripts           → Javascript文件
└ images                → Image文件
project                  → sbt配置文件(用来编译项目)
└ build.properties      → sbt 项目属性定义
└ Build.scala           → 应用编译脚本定义
└ plugins.sbt           → sbt插件定义
lib                      → 应用使用的库文件(不由sbt管理的其它库)
logs                     → 日志目录
└ application.log       → 缺省的应用日志文件
target                   → 编译生成的文件目录
└ scala-2.10.0
└ cache
└ classes            → 编译过的class文件
└ classes_managed    → Managed class 文件 (模板等, ...)
└ resource_managed   → Managed 资源(less, ...)
└ src_managed        → 生成的源码(模板, ...)
test                     → 单元测试或功能测试代码目录

app目录

app目录包含了所有可运行代码或资源:Scala代码,UI模板和Less代码,CoffeeScript代码, app通常包含下面三个子目录:对于与MVC的三个部分:

  • app/controllers
  • app/models
  • app/views

当然你可以添加你自己的目录,比如app/utils。此外还有一个可选的标准目录名assets
app/assets ,用来存放可供编译的LESS或CoffeeScript代码

public目录

public目录存放了可以供网页直接访问的资源,比如CSS,图像,JavaScript文件。

conf目录

该目录存放应用的配置文件,主要有下面两种配置:

  • application.conf 应用主配置文件,定义了应用的一些配置参数
  • routes ,路由配置文件

lib目录

主要用来添加一些第三方库文件,这些库主要指一些没有通过sbt或编译脚本定义的库文件。直接把jar文件添加到这个目录下,编译时会直接添加到应用的classpath.

project目录

定义sbt编译项目的定义,包含下面三个文件:

  • plugins.sbt 定义项目使用的插件定义
  • Build.scala 定义项目编译脚本.
  • build.properties 指定编译sbt的版本等.

target目录

包含了编译后生成的一下文件。

如果你使用GIT做为版本管理工具,下面为典型的 .gitingore 文件:
logs
project/project
project/target
target
tmp
dist
.cache
即使你使用IntelliJ IDEA或是Eclipse IDE来开发Play应用,了解Play Console的使用还是非常有用的,Play Console 为一基于sbt的命令行工具,可以用来管理Play应用的开发生命周期。

启动Play Console

使用Play Console前,注意把Play所在目录添加到系统目录中,这样,启动Play Console 非常简单,只需在Play应用所在目录在命令行输入 “play”

play

20130914001

获得Play 命令帮助

你可以输入help play 获得关于Play命令使用的基本方法和命令:
常见的命令如下:
1    play                         Enter the play console
2    help                         Displays this help message or prints detailed help on requested commands (run 'help <command').
3    about                        Displays basic information about sbt and the build.
4    tasks                        Lists the tasks defined for the current project.
5    settings                     Lists the settings defined for the current project.
6    reload                       (Re)loads the project in the current directory
7    projects                     Lists the names of available projects or temporarily adds/removes extra builds to the sessio.
8    project                      Displays the current project or changes to the provided `project`.
9    set                          Evaluates a Setting and applies it to the current project.
10    session                      Manipulates session settings.  For details, run 'help session'.
11    inspect                      Prints the value for 'key', the defining scope, delegates, related definitions, and dependenies.
12    ; <command> (; <command>)*   Runs the provided semicolon-separated commands.
13    ~ <command>                  Executes the specified command whenever source files change.
14    last                         Displays output from a previous command or the output from a specific task.
15    last-grep                    Shows lines from the last output for 'key' that match 'pattern'.
16    exit                         Terminates the build.
17    show <key>                   Displays the result of evaluating the setting or task associated with 'key'.

在开发模式下启动Play应用

使 用run 命令,可以开发模式下启动Play应用,此时一些调整信息可以在屏幕上显示出来,缺省情况下,Play Web应用使用端口9000,如果你修改代码,Play Console会检测到代码变化,在下个请求时会自动重新编译代码或资源,如要必要,还可以重新启动Play应用。如果出现编译错误,那么在浏览器中会显 示错误信息:
20130914002

如果要停止Web服务,可以按Ctrl+D ,此外还可以通过任务管理器,杀死对应的Java进程来停止Web服务。

编译

Compile 命令可以编译Play 源码或是资源(如CoffeeScript代码,LESS代码,Scala代码等),如果出现编译错误,屏幕上显示对应的错误提示:

20130914003

调试命令

如 果你使用Eclipse IDE,你可以使用Eclipse来调试代码,此时需要使用 play debug 启动一个JPDA调试端口,缺省调试端口为 9999,此后可以通过配置Eclipse远程调试指向这个端口,就可以使用Eclipse和其它Java 调试器来调试代码,如果使用IntelliJ IDEA IDE可以自动完成这些配置。

强制清理编译结果

如果你觉得编译的结果出现意想不到的机会,那么你的sbt的缓存可能出现问题,此时可以通过 clean-all 命令强制清除之前的编译结果,然后使用compile 重新编译生成新的编译内容。

此外play Console 基于 sbt 工具,因此你也可以使用sbt 的一些功能 ,比如 ~compile ,~run, ~test等

Cassandra 分布式数据库 数据结构与数据读写

Cassandra 的数据存储结构

Cassandra 中的数据主要分为三种:

  1. CommitLog:主要记录下客户端提交过来的数据以及操作。这个数据将被持久化到磁盘中,以便数据没有被持久化到磁盘时可以用来恢复。
  2. Memtable:用户写的数据在内存中的形式,它的对象结构在后面详细介绍。其实还有另外一种形式是 BinaryMemtable 这个格式目前 Cassandra 并没有使用,这里不再介绍了。
  3. SSTable:数据被持久化到磁盘,这又分为 Data、Index 和 Filter 三种数据格式。

CommitLog 数据格式

CommitLog 的数据只有一种,那就是按照一定格式组成 byte 组数,写到 IO 缓冲区中定时的被刷到磁盘中持久化,在上一篇的配置文件详解中已经有说到 CommitLog 的持久化方式有两种,一个是 Periodic 一个是 Batch,它们的数据格式都是一样的,只是前者是异步的,后者是同步的,数据被刷到磁盘的频繁度不一样。关于 CommitLog 的相关的类结构图如下:

图 1. CommitLog 的相关的类结构图

图 1. CommitLog 的相关的类结构图它 持久化的策略也很简单,就是首先将用户提交的数据所在的对象 RowMutation 序列化成 byte 数组,然后把这个对象和 byte 数组传给 LogRecordAdder 对象,由 LogRecordAdder 对象调用 CommitLogSegment 的 write 方法去完成写操作,这个 write 方法的代码如下:

清单 1. CommitLogSegment. write
public CommitLogSegment.CommitLogContext write(RowMutation rowMutation, 
 Object serializedRow){ 
        long currentPosition = -1L; 
 ... 
            Checksum checkum = new CRC32(); 
            if (serializedRow instanceof DataOutputBuffer){ 
                DataOutputBuffer buffer = (DataOutputBuffer) serializedRow; 
                logWriter.writeLong(buffer.getLength()); 
                logWriter.write(buffer.getData(), 0, buffer.getLength()); 
                checkum.update(buffer.getData(), 0, buffer.getLength()); 
            } 
            else{ 
                assert serializedRow instanceof byte[]; 
                byte[] bytes = (byte[]) serializedRow; 
                logWriter.writeLong(bytes.length); 
                logWriter.write(bytes); 
                checkum.update(bytes, 0, bytes.length); 
            } 
            logWriter.writeLong(checkum.getValue()); 
 ... 
 }

这个代码的主要作用就是如果当前这个根据 columnFamily 的 id 还没有被序列化过,将会根据这个 id 生成一个 CommitLogHeader 对象,记录下在当前的 CommitLog 文件中的位置,并将这个 header 序列化,覆盖以前的 header。这个 header 中可能包含多个没有被序列化到磁盘中的 RowMutation 对应的 columnFamily 的 id。如果已经存在,直接把 RowMutation 对象的序列化结果写到 CommitLog 的文件缓存区中后面再加一个 CRC32 校验码。Byte 数组的格式如下:

图 2. CommitLog 文件数组结构

图 2. CommitLog 文件数组结构上图中每个不同的 columnFamily 的 id 都包含在 header 中,这样做的目的是更容易的判断那些数据没有被序列化。

CommitLog 的作用是为恢复没有被写到磁盘中的数据,那如何根据 CommitLog 文件中存储的数据恢复呢?这段代码在 recover 方法中:

清单 2. CommitLog.recover
 public static void recover(File[] clogs) throws IOException{ 
 ... 
         final CommitLogHeader clHeader = CommitLogHeader.readCommitLogHeader(reader); 
         int lowPos = CommitLogHeader.getLowestPosition(clHeader); 
            if (lowPos == 0) break; 
            reader.seek(lowPos); 
            while (!reader.isEOF()){ 
                try{ 
                    bytes = new byte[(int) reader.readLong()]; 
                    reader.readFully(bytes); 
                    claimedCRC32 = reader.readLong(); 
                } 
 ... 
                ByteArrayInputStream bufIn = new ByteArrayInputStream(bytes); 
                Checksum checksum = new CRC32(); 
                checksum.update(bytes, 0, bytes.length); 
                if (claimedCRC32 != checksum.getValue()){continue;} 
            final RowMutation rm = 
              RowMutation.serializer().deserialize(new DataInputStream(bufIn));
            } 
 ... 
 }

这段代码的思路是:反序列化 CommitLog 文件的 header 为 CommitLogHeader 对象,寻找 header 对象中没有被回写的最小 RowMutation 位置,然后根据这个位置取出这个 RowMutation 对象的序列化数据,然后反序列化为 RowMutation 对象,然后取出 RowMutation 对象中的数据重新保存到 Memtable 中,而不是直接写到磁盘中。CommitLog 的操作过程可以用下图来清楚的表示:

图 3. CommitLog 数据格式的变化过程

图 3. CommitLog 数据格式的变化过程

Memtable 内存中数据结构

Memtable 内存中数据结构比较简单,一个 ColumnFamily 对应一个唯一的 Memtable 对象,所以 Memtable 主要就是维护一个 ConcurrentSkipListMap<DecoratedKey, ColumnFamily> 类型的数据结构,当一个新的 RowMutation 对象加进来时,Memtable 只要看看这个结构是否 <DecoratedKey, ColumnFamily> 集合已经存在,没有的话就加进来,有的话取出这个 Key 对应的 ColumnFamily,再把它们的 Column 合并。Memtable 相关的类结构图如下:

图 4. Memtable 相关的类结构图

图 4. Memtable 相关的类结构图Memtable 中的数据会根据配置文件中的相应配置参数刷到本地磁盘中。这些参数在上一篇中已经做了详细说明。

前 面已经多处提到了 Cassandra 的写的性能很好,好的原因就是因为 Cassandra 写到数据首先被写到 Memtable 中,而 Memtable 是内存中的数据结构,所以 Cassandra 的写是写内存的,下图基本上描述了一个 key/value 数据是怎么样写到 Cassandra 中的 Memtable 数据结构中的。

图 5. 数据被写到 Memtable

图 5. 数据被写到 Memtable

SSTable 数据格式

每添加一条数据到 Memtable 中,程序都会检查一下这个 Memtable 是否已经满足被写到磁盘的条件,如果条件满足这个 Memtable 就会写到磁盘中。先看一下这个过程涉及到的类。相关类图如图 6 所示:

图 6. SSTable 持久化类结构图

图 6. SSTable 持久化类结构图Memtable 的条件满足后,它会创建一个 SSTableWriter 对象,然后取出 Memtable 中所有的 <DecoratedKey, ColumnFamily> 集合,将 ColumnFamily 对象的序列化结构写到 DataOutputBuffer 中。接下去 SSTableWriter 根据 DecoratedKey 和 DataOutputBuffer 分别写到 Date、Index 和 Filter 三个文件中。

Data 文件格式如下:

图 7. SSTable 的 Data 文件结构

图 7. SSTable 的 Data 文件结构Data 文件就是按照上述 byte 数组来组织文件的,数据被写到 Data 文件中是接着就会往 Index 文件中写,Index 中到底写什么数据呢?

其实 Index 文件就是记录下所有 Key 和这个 Key 对应在 Data 文件中的启示地址,如图 8 所示:

图 8. Index 文件结构

图 8. Index 文件结构Index 文件实际上就是 Key 的一个索引文件,目前只对 Key 做索引,对 super column 和 column 都没有建索引,所以要匹配 column 相对来说要比 Key 更慢。

Index 文件写完后接着写 Filter 文件,Filter 文件存的内容就是 BloomFilter 对象的序列化结果。它的文件结构如图 9 所示:

图 9. Filter 文件结构

图 9. Filter 文件结构BloomFilter 对象实际上对应一个 Hash 算法,这个算法能够快速的判断给定的某个 Key 在不在当前这个 SSTable 中,而且每个 SSTable 对应的 BloomFilter 对象都在内存中,Filter 文件指示 BloomFilter 持久化的一个副本。三个文件对应的数据格式可以用下图来清楚的表示:

图 10. SSTable 数据格式转化

图 10. SSTable 数据格式转化

在 Memtable 往磁盘中写的过程中,这个 Memtable 被放到 memtablesPendingFlush 容器中,以保证在读时候它里面存的数据能被正确读到,这个在后面数据读取时还会介绍。

数据的写入

数据要写到 Cassandra 中有两个步骤:

  1. 找到应该保存这个数据的节点
  2. 往这个节点写数据。客户端写一条数据必须指定 Keyspace、ColumnFamily、Key、Column Name 和 Value,还可以指定 Timestamp,以及数据的安全等级。

数据写入涉及的主要相关类如下图所示:

图 11. Insert 相关类图

图 11. Insert 相关类图大慨的写入逻辑是这样的:

CassandraServer 接收到要写入的数据时,首先创建一个 RowMutation 对象,再创建一个 QueryPath 对象,这个对象中保存了 ColumnFamily、Column Name 或者 Super Column Name。接着把用户提交的所有数据保存在 RowMutation 对象的 Map<String, ColumnFamily> 结构中。接下去就是根据提交的 Key 计算集群中那个节点应该保存这条数据。这个计算的规则是:将 Key 转化成 Token,然后在整个集群的 Token 环中根据二分查找算法找到与给定的 Token 最接近的一个节点。如果用户指定了数据要保存多个备份,那么将会顺序在 Token 环中返回与备份数相等的节点。这是一个基本的节点列表,后面 Cassandra 会判断这些节点是否正常工作,如果不正常寻找替换节点。还有还要检查是否有节点正在启动,这种节点也是要在考虑的范围内,最终会形成一个目标节点列表。最 后把数据发送到这些节点。

接下去就是将数据保存到 Memtable 中和 CommitLog 中,关于结果的返回根据用户指定的安全等级不同,可以是异步的,也可以是同步的。如果某个节点返回失败,将会再次发送数据。下图是当 Cassandra 接收到一条数据时到将数据写到 Memtable 中的时序图。

图 12. Insert 操作的时序图

图 12. Insert 操作的时序图

数据的读取

Cassandra 的写的性能要好于读的性能,为何写的性能要比读好很多呢?原因是,Cassandra 的设计原则就是充分让写的速度更快、更方便而牺牲了读的性能。事实也的确如此,仅仅看 Cassandra 的数据的存储形式就能发现,首先是写到 Memtable 中,然后将 Memtable 中数据刷到磁盘中,而且都是顺序保存的不检查数据的唯一性,而且是只写不删(删除规则在后面介绍),最后才将顺序结构的多个 SSTable 文件合并。这每一步难道不是让 Cassandra 写的更快。这个设计想想对读会有什么影响。首先,数据结构的复杂性,Memtable 中和 SSTable 中数据结构肯定不同,但是返回给用户的肯定是一样的,这必然会要转化。其次,数据在多个文件中,要找的数据可能在 Memtable 中,也可能在某个 SSTable 中,如果有 10 个 SSTable,那么就要在到 10 个 SSTable 中每个找一遍,虽然使用了 BloomFilter 算法可以很快判断到底哪个 SSTable 中含有指定的 key。还有可能在 Memtable 到 SSTable 的转化过程中,这也是要检查一遍的,也就是数据有可能存在什么地方,就要到哪里去找一遍。还有找出来的数据可能是已经被删除的,但也没办法还是要取。

下面是读取数据的相关类图:

图 13. 读取相关类图

图 13. 读取相关类图根 据上面的类图读取的逻辑是,CassandraServer 创建 ReadCommand 对象,这个对象保存了用户要获取记录的所有必须指定的条件。然后交给 weakReadLocalCallable 这个线程去到 ColumnFamilyStore 对象中去搜索数据,包括 Memtable 和 SSTable。将找到的数据组装成 Row 返回,这样一个查询过程就结束了。这个查询逻辑可以用下面的时序图来表示:

图 14. 查询数据时序图

图 14. 查询数据时序图在 上图中还一个地方要说明的是,取得 key 对应的 ColumnFamily 要至少在三个地方查询,第一个就是 Memtable 中,第二个是 MemtablesPendingFlush,这个是将 Memtable 转化为 SSTable 之前的一个临时 Memtable。第三个是 SSTable。在 SSTable 中查询最为复杂,它首先将要查询的 key 与每个 SSTable 所对应的 Filter 做比较,这个 Filter 保存了所有这个 SSTable 文件中含有的所有 key 的 Hash 值,这个 Hsah 算法能快速判断指定的 key 在不在这个 SSTable 中,这个 Filter 的值在全部保存在内存中,这样能快速判断要查询的 key 在那个 SSTable 中。接下去就要在 SSTable 所对应的 Index 中查询 key 所对应的位置,从前面的 Index 文件的存储结构知道,Index 中保存了具体数据在 Data 文件中的 Offset。,拿到这个 Offset 后就可以直接到 Data 文件中取出相应的长度的字节数据,反序列化就可以达到目标的 ColumnFamily。由于 Cassandra 的存储方式,同一个 key 所对应的值可能存在于多个 SSTable 中,所以直到查找完所有的 SSTable 文件后再与前面的两个 Memtable 查找出来的结果合并,最终才是要查询的值。

另外,前面所描述的是最坏的情况,也就是查询在完全没有缓存的情况下,当然 Cassandra 在对查询操作也提供了多级缓存。第一级直接针对查询结果做缓存,这个缓存的设置的配置项是 Keyspace 下面的 RowsCached。查询的时候首先会在这个 Cache 中找。第二级 Cache 对应 SSTable 的 Index 文件,它可以直接缓存要查询 key 所对应的索引。这个配置项同样在 Keyspace 下面的 KeysCached 中,如果这个 Cache 能命中,将会省去 Index 文件的一次 IO 查询。最后一级 Cache 是做磁盘文件与内存文件的 mmap,这种方式可以提高磁盘 IO 的操作效率,鉴于索引大小的限制,如果 Data 文件太大只能在 64 位机器上使用这个技术。

数据的删除

从前面的数据写入规则可以想象,Cassandra 要想删除数据是一件麻烦的事,为何这样说?理由如下:

  1. 数据有多处 同时还可能在多个节点都有保存。
  2. 数据的结构有多种 数据会写在 CommitLog 中、Memtable 中、SSTable 中,它们的数据结构都不一样。
  3. 数据时效性不一致 由于是集群,所以数据在节点之间传输必然有延时。

除了这三点之外还有其它一些难点如 SSTable 持久化数据是顺序存储的,如果删除中间一段,那数据有如何移动,这些问题都非常棘手,如果设计不合理,性能将会非常之差。

本部分将讨论 Cassandra 是如何解决这些问题的。

CassandraServer 中删除数据的接口只有一个 remove,下面是 remove 方法的源码:

清单 3. CassandraServer.remove
public void remove(String table, String key, ColumnPath column_path, 
          long timestamp, ConsistencyLevel consistency_level){
        checkLoginDone();
        ThriftValidation.validateKey(key);
        ThriftValidation.validateColumnPathOrParent(table, column_path);
        RowMutation rm = new RowMutation(table, key);
        rm.delete(new QueryPath(column_path), timestamp);
        doInsert(consistency_level, rm);
    }

仔细和 insert 方法比较,发现只有一行不同:insert 方法调用的是 rm.add 而这里是 rm.delete。那么这个 rm.delete 又做了什么事情呢?下面是 delete 方法的源码:

清单 4. RowMutation. Delete
public void delete(QueryPath path, long timestamp){
...
        if (columnFamily == null)
            columnFamily = ColumnFamily.create(table_, cfName);
        if (path.superColumnName == null && path.columnName == null){
            columnFamily.delete(localDeleteTime, timestamp);
        }else if (path.columnName == null){
            SuperColumn sc = new SuperColumn(path.superColumnName, 
              DatabaseDescriptor.getSubComparator(table_, cfName));
            sc.markForDeleteAt(localDeleteTime, timestamp);
            columnFamily.addColumn(sc);
        }else{
            ByteBuffer bytes = ByteBuffer.allocate(4);
            bytes.putInt(localDeleteTime);
            columnFamily.addColumn(path, bytes.array(), timestamp, true);
        }
    }

这段代码的主要逻辑就是,如果是删除指定 Key 下的某个 Column,那么将这个 Key 所对应的 Column 的 vlaue 设置为当前系统时间,并将 Column 的 isMarkedForDelete 属性设置为 TRUE,如果是要删除这个 Key 下的所有 Column 则设置这个 ColumnFamily 的删除时间期限属性。然后将这个新增的一条数据按照 Insert 方法执行下去。

这个思路现在已经很明显了,它就是通过设置同一个 Key 下对应不同的数据来更新已经在 ConcurrentSkipListMap 集合中存在的数据。这种方法的确很好,它能够达到如下目的:

  1. 简化了数据的操作逻辑。将添加、修改和删除逻辑都统一起来。
  2. 解决了前面提到的三个难点。因为它就是按照数据产生的方式,来修改数据。有点以其人之道还治其人之身的意思。

但是这仍然有两个 问题:这个只是修改了指定的数据,它并没有删除这条数据;还有就是 SSTable 是根据 Memtable 中的数据保存的,很可能会出现不同的 SSTable 中保存相同的数据,这个又怎么解决?的确如此,Cassandra 并没有删除你要删除的数据,Cassandra 只是在你查询数据返回之前,过滤掉 isMarkedForDelete 为 TRUE 的记录。它能够保证你删除的数据你不能再查到,至于什么时候真正删除,你就不需要关心了。Cassandra 删除数据的过程很复杂,真正删除数据是在 SSTable 被压缩的过程中,SSTable 压缩的目的就是把同一个 Key 下对应的数据都统一到一个 SSTable 文件中,这样就解决了同一条数据在多处的问题。压缩的过程中 Cassandra 会根据判断规则判定哪些数据应该被删除。

SSTable 的压缩

数 据的压缩实际上是数据写入 Cassandra 的一个延伸,前面描述的数据写入和数据的读取都有一些限制,如:在写的过程中,数据会不停的将一定大小的 Memtable 刷到磁盘中,这样不停的刷,势必会产生很多的同样大小的 SSTable 文件,不可能这样无限下去。同样在读的过程中,如果太多的 SSTable 文件必然会影响读的效率,SSTable 越多就会越影响查询。还有一个 Key 对应的 Column 分散在多个 SSTable 同样也会是问题。还有我们知道 Cassandra 的删除同样也是一个写操作,同样要处理这些无效的数据。

鉴于以上问题,必然要对 SSTable 文件进行合并,合并的最终目的就是要将一个 Key 对应的所有 value 合并在一起。该组合的组合、该修改的修改,该删除的删除。然后将这个 Key 所对应的数据写在 SSTable 所对应的 Data 文件的一段连续的空间上。

何时压缩 SSTable 文件由 Cassandra 来控制,理想的 SSTable 文件个数在 4~32 个。当新增一个 SSTable 文件后 Cassandra 会计算当期的平均 SSTable 文件的大小当新增的 SSTable 大小在平均 SSTable 大小的 0.5~1.5 倍时 Cassandra 就会调用压缩程序压缩 SSTable 文件,导致的结果就是重新建立 Key 的索引。这个过程可以用下图描述:

图 15 数据压缩

图 15 数据压缩

总结

本文首先描述了 Cassandra 中数据的主要的存储格式,包括内存中和磁盘中数据的格式,接下去介绍了 Cassandra 处理这些数据的方式,包括数据的添加、删除和修改,本质上修改和删除是一个操作。最后介绍了数据的压缩。

接下去两篇将向软件开发人员介绍 Cassandra 中使用的设计模式、巧妙的设计方法和 Cassandra 的高级使用方法——利用 Cassandra 搭建存储与检索一体化的实时检索系统

 

同类文章:

cassandra入门

Cassandra 分布式数据库 数据结构与数据读写

Cassandra 分布式数据库 配置、启动与集群

Apache Cassandra 数据库

分布式Key-Value存储系统Cassandra入门

Storm分布式RPC

Distributed RPC(分布式RPC)

下面是 storm的原文(http://storm.apache.org/documentation/Distributed-RPC.html)

The idea behind distributed RPC (DRPC) is to parallelize the computation of really intense functions on the fly using Storm. The Storm topology takes in as input a stream of function arguments, and it emits an output stream of the results for each of those function calls.
DRPC是为了真正实现在storm上高速计算的。Storm topology获取输入方法参数流,发射每个方法调用的结果数据流。

DRPC is not so much a feature of Storm as it is a pattern expressed from Storm’s primitives of streams, spouts, bolts, and topologies. DRPC could have been packaged as a separate library from Storm, but it’s so useful that it’s bundled with Storm.
DRPC并不是Storm的一个特性,不像Storm的streams,spouts,bolts和topology。DRPC可以被打包到一个独立于Storm的包,但是它对Storm是很有用的。

分布式RPC(distributed RPC,DRPC)用于对Storm上大量的函数调用进行并行计算过程。对于每一次函数调用,Storm集群上运行的拓扑接收调用函数的参数信息作为输入流,并将计算结果作为输出流发射出去。

DRPC本身算不上Storm的特性,它是通过Storm的基本元素:streams,spouts,bolts,topologies而衍生的一个模式。DRPC可以单独作为一个独立于Storm的库发布,但由于其重要性还是和Storm捆绑在了一起。

总体概述

Distributed RPC is coordinated by a “DRPC server” (Storm comes packaged with an implementation of this). The DRPC server coordinates receiving an RPC request, sending the request to the Storm topology, receiving the results from the Storm topology, and sending the results back to the waiting client. From a client’s perspective, a distributed RPC call looks just like a regular RPC call. For example, here’s how a client would compute the results for the “reach” function with the argument “http://twitter.com”:
分布式RPC是需要”DRPC server”配合的(storm有一个实现)。DRPC server协调接收RPC请求,发送请求到Storm topology, 接收Storm topology的结果,发送结果给等待的客户端。从客户端来看,一个分布式RPC调用就像一个regular RPC调用。例如这有一个客户端使用参数”http://twitter.com“计算”reach”方法结果:

DRPCClient client = new DRPCClient("drpc-host", 3772);
String result = client.execute("reach", "http://twitter.com");

The distributed RPC workflow looks like this:
DRPC工作流程如下:

The distributed RPC workflow looks like this:

Tasks in a topology

A client sends the DRPC

A client sends the DRPC server the name of the function to execute and the arguments to that function. The topology implementing that function uses a DRPCSpout to receive a function invocation stream from the DRPC server. Each function invocation is tagged with a unique id by the DRPC server. The topology then computes the result and at the end of the topology a bolt called ReturnResults connects to the DRPC server and gives it the result for the function invocation id. The DRPC server then uses the id to match up that result with which client is waiting, unblocks the waiting client, and sends it the result.
一个客户端发送DRPC server方法名和方法执行参数。topology实现该方法,用DRPCSpout接收方法调用数据流从DRPC server。每个方法调用会被DRPC server被打上唯一id标识。topology计算结果,在topology的结尾bolt上调用ReturnResults链接DRPC server,给它该方法调用id的结果。DRPC server用这个id匹配等待的客户端,解锁该等待客户端,发送结果给它。

DRPC通过DRPC Server来实现,DRPC Server的整体工作过程如下:

  1. 接收到一个RPC调用请求;
  2. 发送请求到Storm上的拓扑;
  3. 从Storm上接收计算结果;
  4. 将计算结果返回给客户端。

以上过程,在client客户端看来,一个DRPC调用看起来和一般的RPC调用没什么区别。下面代码是client通过DRPC调用“reach”函数,参数为“http://twitter.com”:

DRPCClient client = new DRPCClient("drpc-host", 3772); String result = client.execute("reach", "http://twitter.com");

DRPC内部工作流程如下:

  1. Client向DRPC Server发送被调用执行的DRPC函数名称及参数。
  2. Storm上的topology通过DRPCSpout实现这一函数,从DPRC Server接收到函数调用流;
  3. DRPC Server会为每次函数调用生成唯一的id;
  4. Storm上运行的topology开始计算结果,最后通过一个ReturnResults的Bolt连接到DRPC Server,发送指定id的计算结果;
  5. DRPC Server通过使用之前为每个函数调用生成的id,将结果关联到对应的发起调用的client,将计算结果返回给client。

LinearDRPCTopologyBuilder

Storm提供了一个topology builder——LinearDRPCTopologyBuilder,它可以自动完成几乎所有的DRPC步骤。包括:

  1. 构建spout;
  2. 向DRPC Server返回结果;
  3. 为Bolt提供函数用于对tuples进行聚集。

下面是一个简单的例子,这个DRPC拓扑只是简单的在输入参数后追加“!”后返回:

public static class ExclaimBolt extends BaseBasicBolt { public void execute(Tuple tuple, BasicOutputCollector collector) { String input = tuple.getString(1); collector.emit(new Values(tuple.getValue(0), input + "!")); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "result")); } } public static void main(String[] args) throws Exception { LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation"); builder.addBolt(new ExclaimBolt(), 3); // ... }

由上述例子可见,我们只需很少的工作即可完成拓扑。当创建LinearDRPCTopologyBuilder的时候,需要指定拓扑中DRPC函数的名称“exclamation”。一个DRPC Server可以协调多个函数,每个函数有不同的函数名称。拓扑中的第一个bolt的输入是两个字段:第一个是请求的id号;第二个是请求的参数。

LinearDRPCTopologyBuilder同时需要最后一个bolt发射一个包含两个字段的输出流:第一个字段是请求id;第二个字段是计算结果。因此,所有的中间tuples必须包含请求id作为第一个字段。

例子中,ExclaimBolt在输入tuple的第二个字段后面追加“!”,LinearDRPCTopologyBuilder负责处理其余的协调工作:与DRPC Server建立连接,发送结果给DRPC Server。

本地模式DRPC

DRPC可以以本地模式运行,下面的代码是如何在本地模式运行上面的例子:

LocalDRPC drpc = new LocalDRPC(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc)); System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello")); cluster.shutdown(); drpc.shutdown();

首先创建一个LocalDRPC对象,该对象在本地模拟一个DRPC Server,正如LocalCluster在本地模拟一个Storm集群一样。然后创建一个LocalCluster对象在本地模式下运行拓扑。LinearDRPCTopologyBuilder含有单独的方法用于创建本地拓扑和远程拓扑。

本地模式下,LocalDRPC并不绑定任何端口,因此Storm的拓扑需要了解要通讯的对象——这就是为什么createLocalTopology方法需要以LocalDRPC对象作为输入。

加载完拓扑之后,通过对LocalDRPC调用execute方法,就可以执行DRPC函数调用了。

远程模式DRPC

在实际的Storm集群上运行DRPC也一样很简单。只需完成以下步骤:

  1. 启动DRPC Server(s);
  2. 配置DRPC Server(s)地址;
  3. 向Storm集群提交DRPC拓扑。

首先,通过storm脚本启动DRPC Server:

bin/storm drpc

然后,在Storm集群中配置DRPC Server地址,这就是DRPCSpout读取函数调用请求的地方。这一步的配置可以通过storm.yaml文件或者拓扑的配置来完成。通过storm.yaml文件的配置方式如下:

drpc.servers: - "drpc1.foo.com" - "drpc2.foo.com"

最后,通过StormSubmitter启动DRPC拓扑。为了以远程模式运行上面的例子,代码如下:

StormSubmitter.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology());

createRemoteTopology被用于为Storm集群创建合适的拓扑。

一个复杂的例子

上面的exclamation只是一个简单的DRPC例子。下面通过一个复杂的例子介绍如何在Storm集群内进行DRPC——计算Twitter上每个URL的到达度(reach),也就是每个URL暴露给的不同人的个数。

为了完成这一计算,需要完成以下步骤:

  1. 获取所有点选了(tweet)该URL的人;
  2. 获取步骤1中所有人的关注者(followers,粉丝);
  3. 对所有关注者followers进行去重;
  4. 对步骤3中的关注者人数进行求和。

一个简单的URL到达度计算可能涉及成千上万次数据库调用以及数以百万的followers记录,计算量非常大。有了Storm,将很容易实现这一计算过程。单机上可能需要运行几分钟才能完成,在Storm集群上,即使是最难计算的URL也只需要几秒钟。

这个例子的代码在storm-starter:点击这里。这里是如何创建拓扑的代码:

LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach"); builder.addBolt(new GetTweeters(), 3); builder.addBolt(new GetFollowers(), 12) .shuffleGrouping(); builder.addBolt(new PartialUniquer(), 6) .fieldsGrouping(new Fields("id", "follower")); builder.addBolt(new CountAggregator(), 2) .fieldsGrouping(new Fields("id"));

拓扑的执行分为以下四步:

  1. GetTweeters:获取所有tweet了指定URL的用户列表,这个Bolt将输入流[id, url]转换成输出流[id, tweeter],每个url元组被映射为多个tweeter元组。
  2. GetFollowers:获取步骤1中所有用户列表的followers,这个Bolt将输入流[id, twetter]转换成输出流[id, follower],当某个人同时是多个人的关注者follower,而且这些人都tweet了指定的URL,那么将产生重复的follower元组。
  3. PartialUniquer:将所有followers按照follower id分组,使得同一个follower在同一个task中被处理。这个Bolt接收follower并进行去重计数。
  4. CountAggregator:从各个PartialUniquer中接收各部分的计数结果,累加后完成到达度计算。

下面是PartialUniquer这个Bolt的代码实现:

public class PartialUniquer extends BaseBatchBolt { BatchOutputCollector _collector; Object _id; Set<String> _followers = new HashSet<String>(); @Override public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) { _collector = collector; _id = id; } @Override public void execute(Tuple tuple) { _followers.add(tuple.getString(1)); } @Override public void finishBatch() { _collector.emit(new Values(_id, _followers.size())); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "partial-count")); } }

PartialUniquer通过继承BaseBatchBolt实现了IBatchBolt接口,batch bolt提供了API用于将一批tuples作为整体来处理。每个请求id会创建一个新的batch bolt实例,同时Storm负责这些实例的清理工作。

当PartialUniquer接收到一个follower元组时执行execute方法,将follower添加到请求id对应的HashSet集合中。

Batch bolt同时提供了finishBatch方法用于当这个task已经处理完所有的元组时调用。PartialUniquer发射一个包含当前task所处理的follower ids子集去重后个数的元组。

在内部实现上,CoordinatedBolt用于检测指定的bolt是否已经收到指定请求id的所有tuples元组。CoordinatedBolt使用direct streams管理实现这一协作过程。

拓扑的其他部分易于理解。到达度的每一步的计算过程都是并行进行的,通过DRPC实现也是非常容易的。

Non-linear DRPC拓扑

LinearDRPCTopologyBuilder只能处理“线性的”DRPC拓扑——正如到达度这样可以通过一系列步骤序列来完成的计算。不难想象,DRPC调用中包含有更复杂的带有分支和合并Bolt的拓扑。目前,必须自己直接使用CoordinatedBolt来完成这种非线性拓扑的计算。

LinearDRPCTopologyBuilder工作过程

  • DRPCSpout发射[args, return-info],其中return-info包含DRPC Server的主机和端口号,以及DRPC Server为该次请求生成的唯一id号;
  • 构造一个Storm拓扑包含以下部分:
    • DRPCSpout
    • PrepareRequest(生成一个请求id,为return info创建一个流,为args创建一个流)
    • CoordinatedBolt wrappers以及direct groupings
    • JoinResult(将结果与return info拼接起来)
    • ReturnResult(连接到DRPC Server,返回结果)
  • LinearDRPCTopologyBuilder是建立在Storm基本元素之上的高层抽象。

高级进阶

KeyedFairBolt for weaving the processing of multiple requests at the same time

How to use CoordinatedBolt directly

  • KeyedFairBolt用于组织同一时刻多请求的处理过程;
  • 如何直接使用CoordinatedBolt。

Cassandra 分布式数据库 配置、启动与集群

来源:http://www.ibm.com/developerworks/cn/opensource/os-cn-cassandraxu1/

本文首先介绍了 Cassandra 服务器的配置文件各个配置项的意义,接着讨论了它的启动过程,包括 Cassandra 在启动过程中主要都完成了那些操作,为什么要执行这些操作,最终达到什么状态等。接着介绍如果在集群情况下,集群中节点如何自治理,节点间如何通信、如何 控制数据在集群中的分布等关键问题。

Cassandra 的配置详解

了解一个软件的配置项的意义是使用这个软件的前提,这里详细介绍 Cassandra 的配置文件(storage-config.xml)中各个配置项的意义,这其中包含有很多配置参数,我们可以对其进行调整以达到理想的性能。为了节省篇 幅这里没有列出 storage-config.xml 文件的内容,你可以对照着这个文件看下面的内容。

ClusterName

Cluster Name 代表一个族的标识,它通常代表一个集群。这个配置项在 Cassandra 没有存储数据时就必须指定,当 Cassandra 第一次启动后,它就会被写到 Cassandra 的系统表中,如果你要修改 Cluster Name 必须要删除 Cassandra 中数据。

AutoBootstrap

这个配置项看起来十分简单,但是如果你对 Cassandra 没有深入了解的话,恐怕不知道当你改变这个配置项时 Cassandra 可能会发生什么?

我们知道 Cassandra 集群是通过维护一个自适应的 Token 环来达到集群中的节点的自治理,它们不仅要保证每台机器的状态的同步和一致性还要保证它们之间 Token 分布的合理性,通过重新划分 Token 来达到每台机器的负载的均衡性。

那 这个配置项与 Token 和负载又有何关联性?其实表面上看起来这个配置项是当这个节点启动时是否自动加入集群。但是,当你设置成 False 时它是不是就不加入集群呢?显然不是,这还要看你有没有配置 seeds,如果你配置了其它 seed,那么它仍然会去加入集群。

那么到底有何区别,通过分析其启动代码发现,这个配置项不仅跟 seed 配置项有关而且和 Cassandra 是否是第一次启动也有关。Cassandra 的启动规则大慨如下:

  1. 当 AutoBootstrap 设为 FALSE,第一次启动时 Cassandra 会在系统表中记录 AutoBootstrap=TRUE,以表示这是由系统自动设置的,其实这个只是作为一个标志来判断你以后的启动情况。
  2. 当 AutoBootstrap 设为 TRUE,第一次启动,Cassandra 会判断当前节点有没有被配置成 seed 节点,也就是在本机 ip 有没有在 seeds 中。如果在 seeds 中,Cassandra 的启动情况和 1 是一样的。
  3. 当 AutoBootstrap 设为 TRUE,第一次启动,并且没有配置为 seed,Cassandra 将会有一个漫长的启动过程,当然这个时间的长短和你的当前的集群的数据量有很大的关系。这时 Cassandra 将会根据当前集群的负载,来动态调整它们的均衡。调整均衡的方式就是根据当前的 Token 环分配一个合适的 Token 给这个节点,并将这个符合这个 Token 的数据传给它。

从以上分析可以看出,AutoBootstrap 设置的主要目的是是否调整当前集群中的负载均衡。这其实还有一个很重要的问题就是,如果按照第一种情况启动,如果没有指定 Token,这个节点的 Token 将会是随机生成的,那么问题就来了,当这个随机生成是 Token 加入集群的 Token 环时,Cassandra 如何保证 Token 和 Token 所对应的数据的一致性,这个问题将在后面说明。

Keyspaces

Cassandra 中 Keyspace 相当于关系数据库中的表空间的概念,可以理解为操作表的一个容器,它下面可以定义多个 ColumnFamily,这个 ColumnFamily 就相当于表了,它是存储数据的实体。

ColumnFamily 中几个属性的意义如下:

  • ColumnType。列的类型,有两种:Standard 和 Super,分别是标准列和超列,超列的含义是列还有一个父列。
  • CompareWith。表示的是列的排序规则,可以根据不同的数据类型进行排序如 TimeUUIDType,可以根据插入的时间排序
  • CompareSubcolumnsWith。子列的排序规则与 CompareWith 类似
  • RowsCached。查询时缓存的数据量,可以是多少条,也可以是百分比,如 10% 就是缓存 10% 的数据量,这个对查询性能影响很大,如果命中率高的话,可以显著提高查询效率。
  • KeysCached。缓存 ColumnFamily 中的 key,这个 key 就是对应到 Index.db 中的数据,如果没有在 RowsCached 中命中,那么就要到每个 SSTable 中查询,这时必然要查询 key,如果在 KeysCached 能命中就不需要到 Index.db 中查询了,省去了 IO 操作。

Cassandra 是一个 Key/Value 系统,从它的存储的逻辑结构来看分为:Keyspace、Key、ColumnFamily、Super Column 以及 Column 几个部分。很明显我们能看出每一对 Key/Value 都有一个寄生的容器,所以它实际上是由一个个 Map 容器构成的。这个容器结构可以用图 1 和图 2 来表示:

图 1. 标准的 Column 结构图

图 1. 标准的 Column 结构图

图 2. 含有 Super Column 的结构图

图 2. 含有 Super Column 的结构图

ReplicaPlacementStrategy

定 义数据复制策略,默认是 org.apache.cassandra.locator.RackUnawareStrategy,数据复制到其它节点没有特别的规定。 org.apache.cassandra.locator.RackAwareStrategy 是将节点分为不同的 Rack,这种方式不管是存数据还是查数据,都从不同的 Rack 的节点取数据或写数据。org.apache.cassandra.locator.DatacenterShardStategy 又将节点划分为不同的 Data Center,让数据放在不同数据中心,从而保证数据的安全性,例如可以按机房划分 Data Center,从而避免一个机房出现故障,会影响整个集群。

ReplicationFactor

定义数据要保存几个备份,结合 ReplicaPlacementStrategy 可以把数据放在不同的地方。

EndPointSnitch

org.apache.cassandra.locator.EndPointSnitch 可以根据当前的网络情况选择更好的节点路由,一般默认即可。

Authenticator

这个配置项可以控制数据访问的安全性,可以在 access.properties 和 passwd.properties 设置用户和密码。

Partitioner

控 制数据的分布规则,org.apache.cassandra.dht.RandomPartitioner 是随机分布,Cassandra 控制数据在不同的节点是通过 key 的来划分的,这个方式是将 key 进行 MD5 Hash,从而形成随机分布的 Token,然后根据这个 Token 将数据分布到不同的节点上。

org.apache.cassandra.dht.OrderPreservingPartitioner 是取 key 的 Ascii 字符来划分的,因此我们可以根据 key 来主动控制数据的分布,例如我们可以给 key 加一个前缀,相同前缀的 key 分布在同一个节点中。

InitialToken

给节点分配一个初始 Token,当节点第一次启动后这个 Token 就被写在系统表中。结合 Partitioner 就可以控制数据的分布。这个配置项可以让我们能调整集群的负载均衡。

CommitLogDirectory、DataFileDirectories

这两个配置项是设置 CommitLog 和 SSTable 存储的目录。

Seeds

关于 Seeds 节点的配置有这样几个疑问:

  1. 是不是集群中的所有节点都要配置在 seed 中。
  2. 本机需不需要配置在 seed 中。

关于第二个问题在前面中已经说明了,是否配置就决定是否作为 seed 节点来启动。关于第一个问题,答案是否定的,因为即使你把集群中的所有节点都配置在 seed 中,当 Cassandra 在启动时它也不会往每个 seed 发送心跳信息,而是随机选择一个节点与其同步集群中的其他所有节点状态。几个回合后这个节点同样能够获取集群中所有的节点的列表。这就是集群自治理的优 点,只要能发现其中一个节点就能发现全部节点。

ListenAddress

ListenAddress 这个配置是用来监听集群中其它节点与本节点交换状态信息和数据的地址。需要注意的是当你配置为本机的 ip 地址没有问题,不配置通常也没问题,但是如果你没有配置或者配置成主机名,而你又把你的主机名绑定到 127.0.0.1 时,这时将会导致本节点不能加入到集群中,因为它接受不到其他节点过来的任何信息,防止出错直接绑定本机 ip 最好。

ThriftAddress

监听 Client 的连接请求,不设或者配置成 0.0.0.0,监听所有地址的请求。

RowWarningThresholdInMB

当 Cassandra 压缩时,如果一个 row 超出了配置的大小时打印 warn 日志,没有任何其它作用。

SlicedBufferSizeInKB 和 ColumnIndexSizeInKB

分别是用来配置,根据 Slice 和 Column Name 来查询时 Cassandra 缓存数据的大小,当查询范围较小时可以适当设置大一点以提高命中率。

FlushDataBufferSizeInMB 和 FlushIndexBufferSizeInMB

这两个配置项是设置 Cassandra 在将内存中的数据写到磁盘时一次写入的缓存量,适当提高这个两个值可以提高 Cassandra 的写性能。

MemtableThroughputInMB、MemtableOperationsInMillions 和 MemtableFlushAfterMinutes

MemtableOperationsInMillions 是定义当前 Keyspace 对应的数据在内存中的缓存大小,Cassandra 默认是 64M,也就是当写到 Cassandra 的数据达到 64M 时,Cassandra 会将内存的数据写到本地磁盘中。

MemtableOperationsInMillions 是定义当前这个 Memtable 中所持有数据对象的个数,真实的个数是 MemtableOperationsInMillions*1024*1024。当超出这个数值时 Memtable 同样会被写到磁盘中。

MemtableFlushAfterMinutes 的作用是,当前两个条件都长时间不满足时,Memtable 中数据会一直不会写到磁盘,这也不合适,所以设置了一个时间限制,当超过这个时间长度时 Memtable 中的数据也会被写到磁盘中。

所以 Memtable 中的数据何时被写到写到磁盘是由这三个值决定,任何一个条件满足都会写到磁盘。

ConcurrentReads 和 ConcurrentWrites

这 两个是定义 Cassandra 用来处理 read 和 write 的线程池中线程的个数,根据当前的测试结果,读写的性能大慨是 1:10,适当的设置这两个值不仅要根据读写的性能,还要参考当前机器的处理性能。当机器的 load 很高,但是 cpu 的利用率却很低时,很明显是由于连接数过多,Cassandra 的已经处理不过来都处于等待状态。这样就可以适当增加读写的线程数,同样如果当读的请求大于写的请求时,也应该适当增加读的线程数,反之亦然。

CommitLogSync、CommitLogSyncPeriodInMS 和 CommitLogSyncBatchWindowInMS

我们知道 Cassandra 是先写到 CommitLog 中再写到 Memtable 和磁盘中。如果每写一条数据都要写一次到磁盘那样性能将会大打折扣。Cassandra 为了提高写 CommitLog 的性能提供了两种写的方式。

  1. Periodic。周期性的把 CommitLog 数据写到磁盘中,这个时间周期由 CommitLogSyncPeriodInMS 指定,默认是 10000MS, 如果是这种方式,可想而知 Cassandra 并不能完全保证写到 Cassandra 的数据不会丢失,最坏的情况就是在这个时间段的数据会被丢失,但是 Cassandra 的解释是通过数据的多个备份,来能提高安全性。但是如果是单机存储数据,最坏的情况仍然会丢失 10000MS 时间段写入的数据。可以说这种方式写 CommitLog 是完全的异步的方式。
  2. Batch。这种方式是等待数据被写到磁盘中才会返回,与前面相比安全性会得到保证,它能保证 100% 数据的正确性。但也并不是每写一条数据都立即写到磁盘中,而是有一个延迟时间,这个延迟时间就是由 CommitLogSyncBatchWindowInMS 指定的,也就是写一条数据到 CommitLog 的最大时间是 CommitLogSyncBatchWindowInMS 指定的时间,理想的时间范围是 0.1~10MS 之间。这个时间既要平衡客户端的相应时间也要考虑服务器写数据到磁盘的性能。

这两种方式各有好处,如果数据是存储在有多个备份的集群中,第一种情况下,丢数据的情况几乎为零,但是性能肯定会比第二种要好很多。如果是单机情况下,要保证数据的安全性第二种较合适。

GCGraceSeconds

这 个配置项不是 Java 中的 gc 回收内存,但是其功能类似于 jvm 中 gc,它也是回收已经没有被关联的数据,例如已经被标识为删除的数据,Cassandra 处理数据有点奇怪,即使数据被标识为删除,但是只要是没有超过 GCGraceSeconds 的时间这个数据仍然是存在的,也就是可以定制数据的实效时间,超出这个时间数据将会被回收。

Cassandra 的启动过程

Cassandra 的功能模块

按照我的理解我将 Cassandra 的功能模块划分为三个部分:

  1. 客户端协议解析。目前这个版本 Cassandra 支持两个客户端 avro 和 thrift,使用的较多的是后者,它们都是通过 socket 协议作为网络层协议,然后再包装一层应用层协议,这个应用层协议的包装和解析都是由它们的客户端和相应的服务端模块来完成的。这样设计的目的是解决多种多 样的客户端的连接方式,既可以是短连接也可以是长连接。既可以是 Java 程序调用也可以是 PHP 调用或者多种其它编程语言都可以调用。
  2. 集群 Gossip 协议。集群中节点之间相互通信是通过 Gossip 协议来完成的,它的实现都在 org.apache.cassandra.gms.Gossiper 类中。它的主要作用就是每个节点向集群中的其它节点发送心跳,心跳携带的信息是本身这个节点持有的其它节点的状态信息包括本节点的状态,如果发现两边的状 态信息不是不一致,则会用最新的状态信息替换,同时通过心跳来判断某个节点是否还在线,把这种状态变化通知感兴趣的事件监听者,以做出相应的修改,包括新 增节点、节点死去、节点复活等。除了维护节点状态信息外,还需做另外一些事,如集群之间的数据的转移,这些数据包括:读取的数据、写入的数据、状态检查的 数据、修复的数据等等。
  3. 数据的存储。数据的存储包括,内存中数据的组织形式,它又包括 CommitLog 和 Memtable。磁盘的数据组织方式,它又包括 date、filter 和 index 的数据。

其它剩下的就是如何读取和操作这些数据了,可以用下图来描述 Cassandra 是如何工作的:

图 3. Cassandra 的工作模型

图 3. Cassandra 的工作模型

Cassandra 的启动过程

这里将详细介绍 Cassandra 的启动过程。Cassandra 的启动过程大慨分为下面几个阶段:

storage-config.xml 配置文件的解析

配 置文件的读取和解析都是在 org.apache.cassandra.config.DatabaseDescriptor 类中完成的,这个类的作用非常简单,就是读取配置文件中各个配置项所定义的值,经过简单的验证,符合条件就将其值赋给 DatabaseDescriptor 的私有静态常量。值得注意的是关于 Keyspace 的解析,按照 ColumnFamily 的配置信息构建成 org.apache.cassandra.config.CFMetaData 对象,最后把这些所有 ColumnFamily 放入 Keyspace 的 HashMap 对象 org.apache.cassandra.config.KSMetaData 中,每个 Keyspace 就是一个 Table。这些信息都是作为基本的元信息,可以通过 DatabaseDescriptor 类直接获取。DatabaseDescriptor 类相关的类结构如下图 4 所示:

图 4. DatabaseDescriptor 类相关的类结构

图 4. DatabaseDescriptor 类相关的类结构创建每个 Table 的实例

创 建 Table 的实例将完成:1)获取该 Table 的元信息 TableMatedate。2)创建改 Table 下每个 ColumnFamily 的存储操作对象 ColumnFamilyStore。3)启动定时程序,检查该 ColumnFamily 的 Memtable 设置的 MemtableFlushAfterMinutes 是否已经过期,过期立即写到磁盘。与 Table 相关的类如图 5 所示:

图 5. Table 相关的类图

图 5. Table 相关的类图一 个 Keyspace 对应一个 Table,一个 Table 持有多个 ColumnFamilyStore,而一个 ColumnFamily 对应一个 ColumnFamilyStore。Table 并没有直接持有 ColumnFamily 的引用而是持有 ColumnFamilyStore,这是因为 ColumnFamilyStore 类中不仅定义了对 ColumnFamily 的各种操作而且它还持有 ColumnFamily 在各种状态下数据对象的引用,所以持有了 ColumnFamilyStore 就可以操作任何与 ColumnFamily 相关的数据了。与 ColumnFamilyStore 相关的类如图 6 所示

图 6. ColumnFamilyStore 相关的类

图 6. ColumnFamilyStore 相关的类CommitLog 日志恢复

这里主要完成这几个操作,发现是否有没有被写到磁盘的数据,恢复这个数据,构建新的日志文件。CommitLog 日志文件的恢复策略是,在头文件中发现没有被序列化的最新的

ColumnFamily Id,然后取出这个这个被序列化 RowMutation 对象的起始地址,反序列化成为 RowMutation 对象,后面的操作和新添一条数据的流程是一样的,如果这个 RowMutation 对象中的数据被成功写到磁盘中,那么会在 CommitLog 去掉已经被持久化的 ColumnFamily Id。关于 CommitLog 日志文件的存储格式以及数据如何写到 CommitLog 文件中,将在后面第三部分详细介绍。

启动存储服务

这里是启动过程中最重要的一步。这里将会启动一系列服务,主要包括如下步骤。

  1. 创建 StorageMetadata。StorageMetadata 将包含三个关键信息:本节点的 Token、当前 generation 以及 ClusterName,Cassandra 判断如果是第一次启动,Cassandra 将会创建三列分别存储这些信息并将它们存在在系统表的 LocationInfo ColumnFamily 中,key 是“L”。如果不是第一次启动将会更新这三个值。这里的 Token 是判断用户是否指定,如果指定了使用用户指定的,否则随机生成一个 Token。但是这个 Token 有可能在后面被修改。这三个信息被存在 StorageService 类的 storageMetadata_ 属性中,以便后面随时调用。
  2. GCInspector.instance.start 服务。主要是统计统计当前系统中资源的使用情况,将这个信息记录到日志文件中,这个可以作为系统的监控日志使用。
  3. 启动消息监听服务。这个消息监听服务就是监听整个集群中其它节点发送到本节点的所有消息,Cassandra 会根据每个消息的类型,做出相应的反应。关于消息的处理将在后面详细介绍。
  4. StorageLoadBalancer.instance.startBroadcasting 服务。这个服务是每个一段时间会收集当前这个节点所存的数据总量,也就是节点的 load 数据。把这个数据更新到本节点的 ApplicationState 中,然后就可以通过这个 state 来和其它节点交换信息。这个 load 信息在数据的存储和新节点加入的时候,会有参考价值。
  5. 启动 Gossiper 服务。在启动 Gossiper 服务之前,将 StorageService 注册为观察者,一旦节点的某些状态发生变化,而这些状态是 StorageService 感兴趣的,StorageService 的 onChange 方法就会触发。Gossiper 服务就是一个定时程序,它会向本节点加入一个 HeartBeatState 对象,这个对象标识了当前节点是 Live 的,并且记录当前心跳的 generation 和 version。这个 StorageMetadata 和前面的 StorageMetadata 存储的 generation 是一致的,version 是从 0 开始的。这个定时程序每隔一秒钟随机向 seed 中定义的节点发送一个消息,而这个消息是保持集群中节点状态一致的唯一途径。这个消息如何同步,将在后面详细介绍。
  6. 判断启动模式。是否是 AutoBootstrap 模式启动,又是如何判断的,以及应作出那些相应的操作,在前面的第一部分中已有介绍,这里不再赘述。这里主要说一下,当是 Bootstrap 模式启动时,Cassandra 都做了那些事情。这一步很重要,因为它关系到后面的很多操作,对 Cassandra 的性能也会有影响。

这个过程如下:

  1. 通过之前的消息同步获取集群中所有节点的 load 信息
  2. 找出 load 最大的节点的 ip 地址
  3. 向这个节点发送消息,获取其一半 key 范围所对应的 Token,这个 Token 是前半部分值。
  4. 将这个 Token 写到本地节点
  5. 本地节点会根据这个 Token 计算以及集群中的 Token 环,计算这个 Token 应该分摊集群中数据的一个范围(range)这个环应该就是,最大 load 节点的一半 key 的所对应的 range。
  6. 向这个 range 所在的节点请求数据。发送 STREAM-STAGE 类型的消息,要经过 STREAM_REQUEST、STREAM_INITIATE、STREAM_INITIATE_DONE、STREAM_FINISHED 几次握手,最终才将正确的数据传输到本节点。
  7. 数据传输完成时设置 SystemTable.setBootstrapped(true) 标记 Bootstrap 已经启动,这个标记的目的是防止再次重启时,Cassandra 仍然会执行相同的操作。

这个过程可以用下面的时序图来描述:

图 7. StorageService 服务启动时序图

图 7. StorageService 服务启动时序图以上是 AutoBootstrap 模式启动,如果是以非 AutoBootstrap 模式启动,那么启动将会非常简单,这个过程如下:

  1. 检查配置项 InitialToken 有没有指定,如果指定了初始 Token,使用用户指定的 Token,否则将根据 Partitioner 配置项指定的数据分配策略生成一个默认的 Token,并把它写到系统表中。
  2. 更新 generation=generation+1 到系统表中
  3. 设置 SystemTable.setBootstrapped(true),标记启动方式,防止用户再修改 AutoBootstrap 的启动模式。

Cassandra 集群中的节点状态的同步策略

我 们知道 Cassandra 集群中节点是通过自治理来对外提供服务的,它不像 Hadoop 这种 Master/Slave 形式的集群结构,会有一个主服务节点来管理所有节点中的原信息和对外提供服务的负载均衡。这种方式管理集群中的节点逻辑上比较简单也很方便,但是也有其弱 点,那就是这个 Master 容易形成瓶颈,其稳定性也是一种挑战。而 Cassandra 的集群管理方式就是一种自适应的管理方式,集群中的节点没有 Master、Slave 之分,它们都是平等的,每个节点都可以单独对外提供服务,某个节点 Crash 也不会影响到其它节点。但是一旦某个节点的状态发生变化,整个集群中的所有节点都要知道,并且都会执行预先设定好的应对方案,这会造成节点间要发送大量的 消息交换各自状态,这样也增加了集群中状态和数据一致性的复杂度,但是优点是它是一个高度自治的组织,健壮性比较好。

消息交换

那么 Cassandra 是如何做到这么高度自治的呢?这个问题的关键就是它们如何同步各自的状态信息,同步消息的前提是它们有一种约定的消息交换机制。这个机制就是 Gossip 协议,Cassandra 就是通过 Gossip 协议相互交换消息。

前 面在 Cassandra 服务启动时提到了 Gossiper 服务的启动,一旦 Cassandra 启动成功,Gossiper 服务就是一直执行下去,它是一个定时程序。这个服务的代码在 org.apache.cassandra.gms.Gossiper 类中,下面是定时程序执行的关键代码如清单 1 所示:

清单 1. Gossiper.GossipTimerTask.run
 public void run(){ 
   synchronized( Gossiper.instance ){ 
       endPointStateMap_.get(localEndPoint_).getHeartBeatState().updateHeartBeat(); 
       List<GossipDigest> gDigests = new ArrayList<GossipDigest>(); 
       Gossiper.instance.makeRandomGossipDigest(gDigests); 
       if ( gDigests.size() > 0 ){ 
          Message message = makeGossipDigestSynMessage(gDigests); 
          boolean gossipedToSeed = doGossipToLiveMember(message); 
          doGossipToUnreachableMember(message); 
          if (!gossipedToSeed || liveEndpoints_.size() < seeds_.size()) 
                       doGossipToSeed(message); 
                        doStatusCheck(); 
          } 
       } 
 }

Cassandra 通过向其它节点发送心跳来证明自己仍然是活着的,心跳里面包含有当前的 generation,用来表示有的节点是不是死了又复活的。

本 地节点所保存的所有其它节点的状态信息都被放在了 GossipDigest 集合中。一个 GossipDigest 对象将包含这个节点的 generation、maxVersion 和节点地址。接着将会组装一个 Syn 消息(关于 Cassandra 中的消息格式将在后面介绍),同步一次状态信息 Cassandra 要进行三次会话,这三次会话分别是 Syn、Ack 和 Ack2。当组装成 Syn 消息后 Cassandra 将随机在当前活着的节点列表中选择一个向其发送消息。

Cassandra 中的消息格式如下:

  1. header:消息头 org.apache.cassandra.net.Header,消息头中包含五个属性:消息编号(messageId)、发送方地址(from)、消息类型(type)、所要做的动作(verb)和一个 map 结构(details)
  2. body:消息内容,是一个 byte 数组,用来存放序列化的消息主体。

可以用下面的图 8 更形象的表示:

图 8. message 消息结构

图 8. message 消息结构当组装成一个 message 后,再将这个消息按照 Gossip 协议组装成一个 pocket 发送到目的地址。关于这个 pocket 数据包的结构如下:

  1. header:包头,4 bytes。前两个是 serializer type;第三个是是否压缩包,默认是否;最后一个 byte 表示是否是 streaming mode。
  2. body:包体,message 的序列化字节数据。

这个 pocket 的序列化字节结构如下:

图 9. 通信协议包的结构

图 9. 通信协议包的结构当 另外一个节点接受到 Syn 消息后,反序列化 message 的 byte 数组,它会取出这个消息的 verb 执行相应的动作,Syn 的 verb 就是解析出发送节点传过来的节点的状态信息与本地节点的状态信息进行比对,看哪边的状态信息更新,如果发送方更新,将这个更新的状态所对应的节点加入请求 列表,如果本地更新,则将本地的状态再回传给发送方。回送的消息是 Ack,当发送方接受到这个 Ack 消息后,将接受方的状态信息更新的本地对应的节点。再将接收方请求的节点列表的状态发送给接受方,这个消息是 Ack2,接受方法接受到这个 Ack2 消息后将请求的节点的状态更新到本地,这样一次状态同步就完成了。

不管是发送方还是接受方每当节点的状态发生变化时都将通知感兴趣的观察者做出相应的反应。消息同步所涉及到的类由下面图 10 的关系图表示:

图 10. 节点状态同步相关类结构图

图 10. 节点状态同步相关类结构图节点的状态同步操作有点复杂,如果前面描述的还不是很清楚的话,再结合下面的时序图,你就会更加明白了,如图 11 所示:

图 11. 节点状态同步时序图

图 11. 节点状态同步时序图上图中省去了一部分重复的消息,还有节点是如何更新状态也没有在图中反映出来,这些部分在后面还有介绍,这里也无法完整的描述出来。

状态更新

前 面提到了消息的交换,它的目的就是可以根据交换的信息更新各自的状态。Cassandra 更新状态是通过观察者设计模式来完成的,订阅者被注册在 Gossiper 的集合中,当交换的消息中的节点的状态和本地节点不一致时,这时就会更新本地状态,更改本地状态本身并没有太大的意义,有意义的是状态发生变化这个动作, 这个动作发生时,就会通知订阅者来完成这个状态发生变化后应该做出那些相应的改动,例如,发现某个节点已经不在集群中时,那么对这个节点应该要在本地保存 的 Live 节点列表中移去,防止还会有数据发送到这个无法到达的节点。和状态相关的类如下:

图 12. 更新状态相关的类

图 12. 更新状态相关的类从 上图可以看出节点的状态信息由 ApplicationState 表示,并保存在 EndPointState 的集合中。状态的修改将会通知 IendPointStateChangeSubscriber,继而再更新 Subscriber 的具体实现类修改相应的状态。

下面是新节点加入的时序图,如图 13 所示:

图 13. 新加入节点的时序图

图 13. 新加入节点的时序图上 图基本描述了 Cassandra 更新状态的过程,需要说明的点是,Cassandra 为何要更新节点的状态,这实际上就是关于 Cassandra 对集群中节点的管理,它不是集中管理的方式,所以每个节点都必须保存集群中所有其它节点的最新状态,所以将本节点所持有的其它节点的状态与另外一个节点交 换,这样做有一个好处就是,并不需要和某个节点通信就能从其它节点获取它的状态信息,这样就加快了获取状态的时间,同时也减少了集群中节点交换信息的频 度。另外,节点状态信息的交换的根本还是为了控制集群中 Cassandra 所维护的一个 Token 环,这个 Token 是 Cassandra 集群管理的基础。因为数据的存储和数据流动都在这个 Token 环上进行,一旦环上的节点发生变化,Cassandra 就要马上调整这个 Token 环,只有这样才能始终保持整个集群正确运行。

到底哪些状态信息对整个集群是重要的,这个在 TokenMetadata 类中,它主要记录了当前这个集群中,哪些节点是 live 的哪些节点现在已经不可用了,哪些节点可能正在启动,以及每个节点它们的 Token 是多少。而这些信息都是为了能够精确控制集群中的那个 Token 环。只要每个集群中每个节点所保存的是同一个 Token 环,整个集群中的节点的状态就是同步的,反之,集群中节点的状态就没有同步。

当然 Cassandra 用这种集群管理方式有其优点,但也存在一些缺点。例如现在部分使用者在大规模集群(上千台服务器)的使用中发现不太稳定,这个跟 gossip 协议的本身也有关,所以这是 Cassandra 社区要致力解决的问题。

总结

本 文从配置文件开始介绍了 Cassandra 的启动过程,以及 Cassandra 是如何管理集群的。实际上 Cassandra 的启动和集群的管理是连在一起的,启动过程中的很多步骤都是集群管理的一部分,如节点以 AutoBootstrap 方式启动,在启动过程中就涉及到数据的重新分配,这个分配的过程正是在动态调整集群中 Token 环的过程。所以当你掌握了 Cassandra 是如何动态调整这个 Token 环,你也就掌握了 Cassandra 的集群是如何管理的了。下一篇将详细介绍 Cassandra 内部是如何组织数据和操作数据。

 

同类文章:

cassandra入门

Cassandra 分布式数据库 数据结构与数据读写

Cassandra 分布式数据库 配置、启动与集群

Apache Cassandra 数据库

分布式Key-Value存储系统Cassandra入门