标签归档:nosql

Riak与MongoDB的对比

最近在学习riak,搜索了一些文章, 尽管比较老了, 但是还是保存下来, 学习一下

原文地址:http://blog.nosqlfan.com/html/2705.html

本文来自Riak所属的Basho公司的技术WiKi,文章从几个方面对Riak和MongoDB进行了对比,这不是一篇PK文章,NoSQLFan翻译给大家,希望本文能让您对Riak和MongoDB有更多的了解。

来源地址:wiki.basho.com

riak-001-2

机制与概念上的异同

Riak和MongoDB在使用特性上有下面几个相同点:

  • 都是文档型的数据模型
  • 具体存储方式都不是以文档型进行存储
  • 写性能及写吞吐都很高

虽然上面几点看起来二者挺像,但在内部实现上两者却是相去甚远。比如Riak是一个分布式的存储,而MongoDB可以理解为是一个单一的数据库系 统,同时加上了Replication和Sharding功能。MongoDB的内部数据结构上还是文档,而Riak是不用关心存储内容的二进制。 MongoDB提供GridFS机制来存储二进制内容,而Riak的二进制内容与普通内容存储方式一样。MongoDB的写入方式是 in-place方式,修改一个文档是原子性的,而Riak是通过quormNRW的机制保证写入操作安全性的。

  • http://www.mongodb.org/display/DOCS/Home
  • http://blog.mongodb.org/post/248614779/fast-updates-with-mongodb-update-in-place
  • http://www.mongodb.org/display/DOCS/Updating#Updating-Update

复制备份及横向扩展

Riak主要通过一致性hash算法来实现其数据的复制及分片,一致性hash机制是Riak的核心思想之一。在Riak中,每个节点都是对等的,所以其不存在单点故障。

  • Add Nodes to Riak
  • Consistent Hashing

而MongoDB在1.6版本后也推出了强有力的复制备份功能

1.主从复制

  • http://www.mongodb.org/display/DOCS/Master+Slave

2.Replica Sets

Replica Sets是MongoDB的重头功能之一,它让几个节点组成一个集合,在这个集合中的节点中有一个主机提供写入,其它节点会从主机上备份数据,主机故障后会自动在从机中选取产生新的主机。

  • http://www.mongodb.org/display/DOCS/Replica+Sets

而在数据分片上,MongoDB提供了一种叫auto-sharding的机制,使数据在多个节点间可以均匀分布,提供动态添加删除节点的功能。

  • http://www.mongodb.org/display/DOCS/Sharding
  • http://www.mongodb.org/display/DOCS/Sharding+Introduction
  • http://en.wikipedia.org/wiki/Sharding

数据分片的自动调整

Riak基于一致性hash策略,在有节点从hash环上移除后,其数据会自动分摊整个环上的其它节点上。其负载也就被均匀分摊了。而MongoDB也支持在Sharding中摘除节点后的自动数据迁移,具体见此文:

  • http://www.mongodb.org/display/DOCS/Configuring+Sharding#ConfiguringSharding-Removingashard

性能对比

Riak的存储引擎本身是作为插件的形式挂载的,Riak支持BitCask,InnoDB和LevelDB等存储引擎,使用默认的BitCask 引擎,你可以在性能和数据持久化的选择上进行调节。相比之下,MongoDB由于采用了mmap机制,如果索引和热数据能被内存完全装下,那么其操作基本 上相当于内存操作,所以MongoDB的当机性能是相当高的。

  • http://www.mongodb.org/display/DOCS/Durability+and+Repair
  • http://blog.mongodb.org/post/381927266/what-about-durability

数据模型

Riak的数据存储没有特定的格式需求,它允许你存储不同体积的文档型数据,另外Riak还可以在数据间创建link来为数据建立关联。

  • Data Storage in Riak

MongoDB的数据是以BSON格式存储的,你可以在MongoDB中存储任意JSON格式的文档,在存储时会被转成BSON进行存储,另外二进制数据也可以转换成相应的一种BSON数据类型进行存储,GridFS正是基于这种类型来实现的。

查询语句及分布式操作

Riak只提供key-value式的数据操作接口,它支持key-value数据的各种操作,也支持link-walking和 MapReduce操作,像二级索引这种东西,在Riak里是不存在的,因为Riak根本不关心它存的数据是什么样的,value对它来说只是一串数据。

  • https://wiki.basho.com/display/RIAK/MapReduce

MongoDB提供与关系型数据库类似的各种数据操作(除了关联查询),其索引机制更是与关系型数据库几乎一模一样。同时MongoDB也提供MapReduce的操作接口,用以处理一些批量任务。

  • http://www.mongodb.org/display/DOCS/Indexes
  • http://www.mongodb.org/display/DOCS/Querying
  • http://www.mongodb.org/display/DOCS/MapReduce

冲突解决策略

Riak使用vector-clock机制来进行冲突检测,所以其冲突解决的选择权是留给应用层来做的。应用层可以决定两个用户对同一行数据的更新哪一个会胜出。

  • Vector Clocks

MongoDB使用的是最近更新者胜出的方式,相对来说更简单直接。

  • http://www.mongodb.org/display/DOCS/Atomic+Operations

API

Riak提供给非Erlang的客户端两种操作方式

  • 1. HTTP
  • 2. Protocol Buffers

MongoDB的协议是自己制定的一套特有协议,其客户端由其所属的10gen公司开发并维护,基本主流的语言都有相应的官方客户端。

  • http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol

Riak学习入门

学习nosql过程中, 偶然发现了riak,以前没接触过, 到网络上查找了写文章, 把文章转帖到这里, 方便阅读。

NoSQL数据库 这里有几个nosql的介绍

riak的下载地址:http://docs.basho.com/riak/latest/downloads/

一. Riak 简介

来源:http://www.ibm.com/developerworks/cn/opensource/os-riak1/

使用 Riak 的 HTTP 界面存储和检索数据

典型的现代关系数据库在某些类型的应用程序中表现平平,难以满足如今的互联网应用程序的性能和可扩展 性要求。因此,需要采用不同的方法。在过去几年中,一种新的数据存储类型变得非常流行,通常称为 NoSQL,因为它可以直接解决关系数据库的一些缺陷。Riak 就是这类数据存储类型中的一种。

Riak 并不是惟一的一种 NoSQL 数据存储。另外两种较流行的数据存储是 MongoDB 和 Cassandra。尽管在许多方面十分相似,但是它们之间也存在明显的不同。例如,Riak 是一种分布式系统,而 MongoDB 是一种单独的系统数据库,也就是说,Riak 没有主节点的概念,因此在处理故障方面有更好的弹性。尽管 Cassandra 同样是基于 Amazon 的 Dynamo 描述,但是它在组织数据方面摒弃了向量时钟和相容散列等特性。Riak 的数据模型更加灵活。在 Riak 中,在第一次访问 bucket 时会动态创建这些 bucket;Cassandra 的数据模型是在 XML 文件中定义的,因此在修改它们过后需要重启整个群集。

Riak 的另一个优势是它是用 Erlang 编写的。而 MongoDB 和 Cassandra 是用通用语言(分别为 C++和 Java)编写,因此 Erlang 从一开始就支持分布式、容错应用程序,所以更加适用于开发 NoSQL 数据存储等应用程序,这些应用程序与使用 Erlang 编写的应用程序有一些共同的特征。

Map/Reduce 作业只能使用 Erlang 或 JavaScript 编写。对于本文呢,我们选择使用 JavaScript 编写 mapreduce 函数,但是也可以用 Erlang 编写它们。虽然 Erlang 代码的执行速度可能稍快一些,然而我们选择 JavaScript 代码的理由是它的受众更广。参阅 参考资料 中的链接,详细了解 Erlang。

开始

如果您希望尝试本文中的一些示例,则需要在您的系统中安装 Riak(参阅 参考资料)和 Erlang。

您 还需要构建一个包含三个节点的群集并在您的本地机器上运行它。Riak 中保存的所有数据都被复制到群集的大量节点中。数据所在的 bucket 的一个属性 (n_val) 决定了将要复制的节点的数量。该属性的默认值为 3,因此,要完成本示例,我们需要创建一个至少包含三个节点的群集(之后您可以创建任意数量的节点)。

下载了源代码后,您需要进行构建。基本步骤如下:

  1. 解压缩源代码:$ tar xzvf riak-1.0.1.tar.gz
  2. 修改目录:$ cd riak-1.0.1
  3. 构建:$ make all rel

这 将构建 Riak (./rel/riak)。要在本地运行多个节点,则需要生成 ./rel/riak 的副本,对每个额外的节点使用一个副本。将 ./rel/riak 复制到 ./rel/riak2、./rel/riak3 等地方,然后对每个副本执行下面的修改:

  • 在 riakN/etc/app.config 中,修改下面的值:http{} 部分中指定的端口,handoff_port 和 pb_port,将它们修改为惟一值
  • 打开 riakN/etc/vm.args 并修改名称,同样是修改为惟一值,例如 -name riak2@127.0.0.1

现在依次启动每个节点,如 清单 1 所示。

清单 1. 清单 1. 启动每个节点
$ cd rel
$ ./riak/bin/riak start
$ ./riak2/bin/riak start
$ ./riak3/bin/riak start

最后,将节点连接起来形成群集,如 清单 2 所示。

清单 2. 清单 2. 形成群集
$ ./riak2/bin/riak-admin join riak@127.0.0.1
$ ./riak3/bin/riak-admin join riak@127.0.0.1

您现在应该创建了一个在本地运行的 3 节点群集。要进行测试,运行如下命令: $ ./riak/bin/riak-admin status | grep ring_members

您应当看到,每个节点都是刚刚创建的群集的一部分,例如 ring_members : ['riak2@127.0.0.1','riak3@127.0.0.1','riak@127.0.0.1']

Riak API

目 前有三种方式可以访问 Riak:HTTP API(RESTful 界面)、Protocol Buffers 和一个原生 Erlang 界面。提供多个界面使您能够选择如何集成应用程序。如果您使用 Erlang 编写应用程序,那么应当使用原生的 Erlang 界面,这样就可以将二者紧密地集成在一起。其他一些因素也会影响界面的选择,比如性能。例如,使用 Protocol Buffers 界面的客户端的性能要比使用 HTTP API 的客户端性能更高一些;从性能方面讲,数据通信量变小,解析所有这些 HTTP 标头的开销相对更高。然而,使用 HTTP API 的优点是,如今的大部分开发人员(特别是 Web 开发人员)非常熟悉 RESTful 界面,再加上大多数编程语言都有内置的原语,支持通过 HTTP 请求资源,例如,打开一个 URL,因此不需要额外的软件。在本文中,我们将重点介绍 HTTP API。

所有示例都将使用 curl 通过 HTTP 界面与 Riak 交互。这样做是为了更好地理解底层的 API。许多语言都提供了大量客户端库,在开发使用 Riak 作为数据存储的应用程序时,应当考虑使用这些客户端库。客户端库提供了与 Riak 连接的 API,可以轻松地与应用程序集成;您不必亲自编写代码来处理在使用 curl 时出现的响应。

API 支持常见的 HTTP 方法:GETPUTPOSTDELETE,它们将分别用于检索、更新、创建和删除对象。我们稍后将依次介绍每一种方法。

存储对象

您可以将 Riak 看成是创建键(字符串)与值(对象)的分布式映射。Riak 将值保存在 bucket 中。在保存对象之前,不需要显式地创建 bucket;如果将对象保存到一个不存在的 bucket 中,则会自动创建该 bucket。

Bucket 在 Riak 中是一个虚拟概念,主要是为了对相关对象分组而存在。bucket 还具有其他一些属性,这些属性的值定义了 Riak 对存储在其中的对象的处理。下面是 bucket 属性的一些示例:

  • n_val:对象在群集内进行复制的次数
  • allow_mult:是否允许并发更新

您可以通过对 bucket 发出 GET 请求查看 bucket 的属性(及其当前值)。

要存储对象,我们将对 清单 3 所示的其中一个 URL 执行 HTTP POST

清单 3. 清单 3. 存储对象
POST -> /riak/<bucket> (1)
POST -> /riak/<bucket>/<key> (2)

键可以由 Riak (1)自动分配,或由用户 (2) 定义。

当使用用户定义的键存储对象时,也可以向 (2) 执行一个 HTTP PUT 操作来创建对象。

Riak 的最新版本还支持以下 URL 格式:/buckets/<bucket>/keys/<key>,但是在本文中,我们将使用更旧的格式来维持与早期 Riak 版本的向后兼容性。

如果没有指定键,Riak 会自动为对象分配一个键。例如,我们将在 bucket “foo” 中存储一个明文对象,并且不会显式指定键(参见 清单 4)。

清单 4. 清单 4. 在不显式指定键的情况下存储一个明文对象
$ curl -i -H "Content-Type: plain/text" -d "Some text" \

http://localhost:8098/riak/foo/

HTTP/1.1 201 Created
Vary: Accept-Encoding
Location: /riak/foo/3vbskqUuCdtLZjX5hx2JHKD2FTK
Content-Type: plain/text
Content-Length: ...

通过检查 Location 标头,您可以看到 Riak 分配给对象的键。这样做不容易记忆,因此另一种选择是让用户提供键。让我们创建一个艺术家 bucket,并添加一个叫做 Bruce 的艺术家(参见 清单 5)。

清单 5. 清单 5. 创建一个艺术家 bucket 并添加一个艺术家
$ curl -i -d '{"name":"Bruce"}' -H "Content-Type: application/json" \

http://localhost:8098/riak/artists/Bruce

HTTP/1.1 204 No Content
Vary: Accept-Encoding
Content-Type: application/json
Content-Length: ...

如果使用我们指定的键成功存储了对象,我们将从服务器得到一个 204 No Content 响应。

在本例中,我们将对象的值保存为 JSON,但是它既可以是明文格式,也可以是其他格式。在存储对象时,需要注意正确设置 Content-Type 标头。例如,如果希望存储一个 JPEG 图像,那么您必须将内容类型设置为 image/jpeg。

检索对象

要检索已存储的对象,使用您希望检索的对象的键对 bucket 运行 GET 方法。如果对象存在,则会在响应的正文中返回对象,否则服务器会返回 404 Object Not Found 响应(参见 清单 6)。

清单 6. 清单 6. 在 bucket 上执行一个 GET 方法
$ curl http://localhost:8098/riak/artists/Bruce

HTTP/1.1 200 OK
...
{ "name" : "Bruce" }

更新对象

在更新对象时,和存储对象一样,需要用到 Content-Type 标头。例如,让我们来添加 Bruce 的别名,如 清单 7 所示。

清单 7. 清单 7. 添加 Bruce 的别名
$ curl -i -X PUT -d '{"name":"Bruce", "nickname":"The Boss"}' \
-H "Content-Type: application/json" http://localhost:8098/riak/artists/Bruce

如 前所述,Riak 自动创建了 bucket。这些 bucket 拥有一些属性,其中一个属性为 allow_mult,用于确定是否允许执行并发写操作。默认情况下,该属性被设置为 false;但是,如果允许进行并发更新,则需要向每个更新发送 X-Riak-Vclock 标头。应该将该标头的值设置为与客户端最后一次读取对象时看到的值相同。

Riak 使用向量时钟 (vector clock) 判断修改对象的原因。向量时钟的工作原理超出了本文的讨论范围,但是,在允许执行并发写操作时,可能会出现冲突,这时需要使用应用程序来解决这些冲突(参阅 参考资料)。

删除对象

删除对象的操作使用了一个与前面的命令类似的模式,我们只需要对希望删除的对象所对应的 URL 执行一个 HTTP DELETE 方法: $ curl -i -X DELETE http://localhost:8098/riak/artists/Bruce

如果成功删除对象,我们会从服务器获得一个 204 No Content 响应;如果试图删除的对象不存在,那么服务器会返回一个 404 Object Not Found 响应。

目前为止,我们已经了解了如何通过将对象与特定键相关联来存储对象,稍后可以使用此特定键来检索对象。如果能够将这个简单的模型进行扩展以表示对象如何(以及是否)与其他对象相关,那么这会非常有用。我们当然可以实现这一点,并且 Riak 是使用链接实现的。

那么,什么是链接?链接允许用户创建对象之间的关系。如果熟悉 UML 类图的话,您可以将链接看作是对象之间的某种关联,并用一个书签说明这种关系;在关系数据库中,该关系被表示为一个外键。

通过 “Link” 标头,以将链接 “依附” 到对象上。下面演示了链接标头看起来是什么样子。例如,关系的目标(即我们准备进行链接的对象)是尖括号中的内容。关系内容(本例中为 “performer”)是通过 riaktag 属性来表示的:Link: </riak/artists/Bruce>; riaktag="performer"

现在让我们添加一些专辑,并将它们与专辑的表演者艺术家 Bruce 关联起来(参见 清单 8)。

清单 8. 清单 8. 添加一些专辑
$ curl -H "Content-Type: text/plain" \
-H 'Link: </riak/artists/Bruce> riaktag="performer"' \
-d "The River" http://localhost:8098/riak/albums/TheRiver

$ curl -H "Content-Type: text/plain" \
-H 'Link: </riak/artists/Bruce> riaktag="performer"' \
-d "Born To Run" http://localhost:8098/riak/albums/BornToRun

现在我们已经设置了一些关系,接下来要通过 link walking 查询它们,link walking 是一个用于查询对象关系的进程。例如,要查找表演 River 专辑的艺术家,您应当这样做:$ curl -i http://localhost:8098/riak/albums/TheRiver/artists,performer,1

末尾的位是链接说明。链接查询的外观就是这个样子。第一个部分(artists)指定我们应当执行查询的 bucket。第二个部分(performer)指定了我们希望用于限制结果的标签,最后的 1 部分表示我们希望包含这个查询阶段的结果。

还可以发出过渡性查询。假设我们在专辑和艺术家之间建立了关系,如 图 1 所示。

图 1. 图 1. 专辑和艺术家之间的关系

图中的四个角为不同的专辑,Bruce 位于图中心,箭头表示 Bruce 与每个专辑的关系通过执行下面的命令,可以发出 “哪些艺术家与表演 The River 专辑的艺术家合作过” 之类的查询:$ curl -i http://localhost:8098/riak/albums/TheRiver/artists,_,0/artists,collaborator,1。链接说明中的下划线的作用类似于通配符,表示我们不关心具体的关系是什么。

运行 Map/Reduce 查询

Map/Reduce 是一个由 Google 推广的框架,用于在大型数据集上同时运行分布式计算。Riak 还提供 Map/Reduce 支持,它允许对群集中的数据运行功能更强大的查询。

Map/Reduce 函数包括一个 map 阶段和一个 reduce 阶段。map 阶段应用于某些数据并生成 0 个或多个结果;这在编程中类似于通过列表中的每一项映射函数。map 阶段是并行发生的。reduce 阶段将获取 map 阶段的所有结果,并将它们组合起来。

例如,计算某个单词在大量文档中出现的次数。每个 map 阶段都将计算每个单词在特定文档中出现的次数。这些中间计数在计算完后将发送到 reduce 函数,然后计算总数并得出在所有文档中的次数。参见 参考资料,获得有关 Google 的 Map/Reduce 文章的链接。

示例:分布式 grep

对于本文,我们将开发一个 Map/Reduce 函数,该函数将对 Riak 中存储的一组文档执行一次分布式 grep。和 grep 一样,最终的输出是一些匹配所提供模式的行。此外,每个结果还将表示文档中出现匹配时所在位置的行号。

要执行一个 Map/Reduce 查询,我们将对 /mapred 资源执行 POST 操作。请求的内容是查询的 JSON 表示;和前面的例子一样,必须提供 Content-Type 标头,并且始终将其设置为 application/json。清单 9 显示了我们为执行分布式 grep 而做的查询。后面将依次讨论查询的每一个部分。

清单 9. 清单 9. 示例 Map/Reduce 查询
{
  "inputs": [["documents","s1"],["documents","s2"]],
  "query": [
    { "map": { 
        "language": "javascript", 
        "name": "GrepUtils.map", 
        "keep": true, 
        "arg": "[s|S]herlock" } 
    },
    { "reduce": { "language": "javascript", "name": "GrepUtils.reduce" } }
  ]
}

每个查询都包含若干输入,例如,我们希望对之执行计算的文档,在 map 和 reduce 阶段运行的函数的名称。也可以直接在查询中包含 mapreduce 函数的源代码,只需要使用源属性替代名称即可,但是我在本例中没有这样做;然而,要使用指定的函数,则需要对 Riak 的默认配置进行一些修改。将清单 9 中的代码保存到某个目录中。对于群集中的每个节点,找到文件 etc/app.config,打开它并将属性 property js_source_dir 设置为您用于保存代码的目录。您需要重启群集中的所有节点使变更生效。

清单 10 中的代码包含将在 map 和 reduce 阶段执行的函数。map 函数将查看文档的每一行,确定是否与提供的模式(arg 参数)匹配。本例中的 reduce 函数并不会执行太多操作;它类似于一个恒等函数,仅仅用于返回输入。

清单 10. 清单 10. GrepUtils.js
var GrepUtils = {       
    map: function (v, k, arg) {
        var i, len, lines, r = [], re = new RegExp(arg);
        lines = v.values[0].data.split(/\r?\n/);  
        for (i = 0, len = lines.length; i < len; i += 1) {
            var match = re.exec(lines[i]);
            if (match) {
                r.push((i+1) + “. “ + lines[i]);
            }
        }
        return r;
    }, 
    reduce: function (v) {
        return [v];
    }    
};

在运行查询之前,我们需要一些数据。我从 Project Gutenberg Web 站点下载了 Sherlock Holmes 电子图书(参见 参考资料)。第一个文本存储在键 “s1” 下的 “documents” bucket 中;第二个文本位于同一个 bucket 中,键为 “s2”。

清单 11 展示了如何将这类文档上传到 Riak。

清单 11. 清单 11. 将文档上传到 Riak
$ curl -i -X POST http://localhost:8098/riak/documents/s1 \
-H “Content-Type: text/plain” --data-binary @s1.txt

上传文档后,我们现在可以对文档执行搜索。在本例中,我们想输出匹配常规表达式 "[s|S]herlock"(参见 清单 12)的所有行。

清单 12. 清单 12. 搜索文档
$ curl -X POST -H "Content-Type: application/json" \
http://localhost:8098/mapred --data @-<<\EOF
{
  "inputs": [["documents","s1"],["documents","s2"]],
  "query": [
    { "map": { 
        "language":"javascript", 
        "name":"GrepUtils.map",  
        "keep":true, 
        "arg": "[s|S]herlock" } 
    },
    { "reduce": { "language": "javascript", "name": "GrepUtils.reduce" } }
  ]
}
EOF

查询中的 arg 属性包含我们希望在文档中对其执行 grep 查询的模式;该值被作为 arg 参数传递给 map 函数。

清单 13 中显示了对样例数据运行 Map/Reduce 作业所产生的输出。

清单 13. 清单 13. 运行 Map/Reduce 作业的样例输出
[["1. Project Gutenberg's The Adventures of Sherlock Holmes, by Arthur Conan 
Doyle","9. Title: The Adventures of Sherlock Holmes","62. To Sherlock Holmes 
she is always THE woman. I have seldom heard","819. as I had pictured it from  
Sherlock Holmes' succinct description,","1017. \"Good-night, Mister Sherlock 
Holmes.\"","1034. \"You have really got it!\" he cried, grasping Sherlock 
Holmes by" …]]

流化 Map/Reduce

在关于 Map/Reduce 的最后部分中,我们将简单地了解 Riak 的 Map/Reduce 流化 (streaming) 特性。该特性对于包含 map 阶段并需要花一些时间完成这些阶段的作业非常有用,因为对结果进行流化允许您在生成每个 map 阶段的结果后立即访问它们,并且在执行 reduce 阶段之前访问它们。

我们可以对分布式 grep 查询应用这个特性。本例中的 reduce 步骤并没有多少实际操作。事实上,我们完全可以去掉 reduce 阶段,只需要将每个 map 阶段的结果直接发送到客户端即可。为了实现此目标,需要对查询进行修改,删除 reduce 步骤,将 ?chunked=true 添加到 URL 末尾,表示我们希望对结果进行流化(参见 清单 14)。

清单 14. 清单 14. 修改查询以流化结果
$ curl -X POST -H "Content-Type: application/json" \
http://localhost:8098/mapred?chunked=true --data @-<<\EOF
{ 
  "inputs": [["documents","s1"],["documents","s2"]],
  "query": [
        { "map": {
            "language": "javascript", 
            "name": "GrepUtils.map",
            "keep": true, "arg": "[s|S]herlock" } }
  ]
}
EOF

在完成 map 阶段后,会将每个 map 阶段的结果(在本例中为匹配查询字符串的行)返回给客户端。该方法可用于需要在查询的中间结果可用时就对它们进行处理的应用程序。

Riak 是基于 Amazon 的 Dynamo 文件中记载的规则的一种开源的、高度可扩展的键值存储库。Riak 非常易于部署和扩展。可以无缝地向群集添加额外的节点。link walking 之类的特性以及对 Map/Reduce 的支持允许实现更加复杂的查询。除了 HTTP API 外,Riak 还提供了一个原生 Erlang API 以及对 Protocol Buffer 的支持。在本系列的第 2 部分中,我们将探讨各种不同语言中的大量客户端库,并展示如何将 Riak 用作一种高度可扩展的缓存。

 

二. Riak 应用

来源:http://www.ibm.com/developerworks/cn/opensource/os-riak2/

将 Riak 集成为 Web 应用程序的重负荷缓存服务器, 使用 Riak 作为一个缓存服务器,帮助缓解应用程序和数据库服务器上的负载

某些类型的数据表现出使自己适合于被缓存的访问模式。例如,在线投注站点具有一个有趣的负载特征:用户常常请求提供赔率和投注单,而这些信息相对来说很少被更新。

本系列中的其他文章

查看 Riak 简介 系列中的更多文章。

这些情况需要具有以下特征的高度可扩展的系统,以应对高负荷的要求:

  • 该系统充当一个可靠的缓存,以减少对应用服务器和数据库的需求
  • 缓存项目是可搜索的,所以您可以更新它们或使它们失效
  • 任何解决方案都能被轻松地集成到现有站点

Riak 对于这样的解决方案是一个不错的选择。

Riak 对于实现这样一个缓存解决方案并非惟一的候选者;有许多不同的缓存可用。其中较为流行的一种是 memcached;然而,与 Riak 不同,memcached 不提供任何类型的数据复制,这意味着,如果保存特定项目的服务器停机,该项目会变得不可用。另一种流行的键/值存储是 Redis,它也可作为缓存使用,通过主从配置支持复制;Riak 没有一个主人(节点)的概念,因此,这使系统对故障更有弹性。

网站集成

任 何解决方案都需要很容易地被集成到现有网站。能够做到这一点很重要,因为并不一定有可能(或者甚至有需要)将您现有的全部数据迁移到 Riak。如前所述,某些类型的数据适合缓存,在一个键/值存储的情况下,如果您通过一个主键访问数据则更是如此。这是一种更适合迁移到 Riak 的数据。

正如在本系列的有关 Riak 的 Riak 简介,第 1 部分:与语言无关的 HTTP API 所述,PHP、Ruby 和 Java™ 等语言中提供了大量客户端库;这些库提供一个 API,使集成 Riak 非常简单。在本例中,我演示了 PHP 库的使用,以展示如何将 Riak 与现有网站集成。

图 1 显示了本例需要考虑的设置。我忽略了负载均衡、防火墙等细节。在本例中,服务器本身只是安装了一个 LAMP 堆栈的简单的前端箱。

我将假设,Riak 仅在内部使用(不能从外面访问它),且在一个非敌对的环境中运行,所以不存在身份验证等与安全相关的问题。该假设并不是像它看起来那么差劲,因为不管怎样 Riak 并没有任何内置的授权;您真的应该将身份验证等安全措施委托给应用程序。

图 1. 一个简单的网站集成

该图显示了服务器如何与关系数据库及 Riak 集群进行交互下面是一个基本示例,演示您可以如何将 Riak 集成到您的现有网站。您将创建一个简单的表单,在提交表单时,根据在表单中输入的值,该表单将使用 PHP 客户端存储 Riak 中的对象。

图 2 显示了一个简单的表单示例,管理员可能会使用它在系统中创建一个投注项。用 HTML 创建该表单,并让它对 清单 1 中的 PHP 脚本执行一个 POST;您可以将本文所附的 源代码 中的类似表单作为一个起点。表单中输入的 “key” 字段将被用作在桶中存储的对象的键。

图 2. 创建投注的示例表单

带有 Key、Odds 和 Description 输入字段及一个 Create 按钮的表单屏幕截图清单 1 的示例 PHP 代码显示了如何使用 PHP 客户端库来集成 Riak。将 PHP 客户端库路径(在 require_once 中指定)更改为您安装它的位置。在本例中,我只是将它与 PHP 脚本放在同一目录中。默认情况下,所有的客户端库都期待在端口 8098 上提供 Riak。

清单 1. 集成 Riak 的示例 PHP 代码
<?php

require_once('./riak.php');

# Could do check here to see if the current user has the
# appropriate credentials ? delegated to application.

$client = new RiakClient('192.168.1.1', 8098);
$bucket = $client->bucket('odds');

$bet = $bucket->newObject($_POST['key']);        
$data = array(
    'odds' => $_POST['odds'],
    'description' => $_POST['description']
);
$bet->setData($data);

# Save the object to Riak
$bet->store();

echo "Thanks!";
?>

将代码保存为一个 PHP 文件(按您喜欢的方式命名),将其和表单上传到您的网站上的某个位置,例如,http://www.yoursite.com/riak- test.php。填写示例表单,并提交它。为了证明它有效,尝试使用您在创建项目时在表单中输入的键直接从 Riak 中检索(参见 清单 2)。

清单 2. 从 Riak 中检索项目
$ curl -i http://localhost:8098/riak/odds/<key>
...
{ "odds":"", "description":"" }

虽然该集成示例使用了 PHP 客户端,但其方法与 Java 或 Ruby on Rails 等其他语言或应用程序框架类似。

直接向请求提供服务

除了使用客户端库将 Riak 集成到当前设置外,还可以从 Riak 向用户请求直接提供服务,并将它用作一个简单的 HTTP 引擎。为了演示这一点,我将创建一个简单的演示,向您展示如何从 Riak 直接请求页面。

下载本文的 源代码。请确保 Riak 正在运行,然后执行脚本 load.sh。这个脚本会将所有的 HTML 和 JavaScript 文件复制到一个名称为 demo 的桶中。本例使用 JavaScript 客户端。

要查看演示,请在您的浏览器中打开以下 URL:http://localhost:8098/riak/demo/demo.html

如果您在表单中输入了一些值来创建一个投注,并提交了表单,则会将一个 JSON 对象存储在 Riak 中。对象的属性将与表单中的字段对应。您会被重定向到一个显示您刚刚创建的对象值的页面。

清单 3 显示通过您输入的值来创建对象的代码。keyoddsdescription 等值来自在表单中输入的值。

清单 3. JavaScript 客户端库在 Riak 中的示例用法
client.bucket("odds", function(bucket) {
    var key = $('#key').val();
    bucket.get_or_new(key, function(status, object) {
        object.contentType = 'application/json';
        object.body = { 'odds': $('#odds').val(), 'description': $('#desc').val() };
        object.store(function(status, object, request) {
            if (status == 'ok') {
                window.location = "http://localhost:8098/riak/odds/"+key;
            } else {
            alert("Failed to create object.");
        }
        }); 
    });
});

如前所述,我假设,Riak 在一个可信的环境中运行。在这种情况下,Riak 中用于存储和检索项目所添加的页面就不会产生安全问题;但是,您并不希望这种功能在没有某种形式的身份验证的前提下就完全暴露在 Internet 中。

虽 然这是一个简单的示例,但它使您了解到了 Riak 如何可以直接向页面请求提供服务。例如,您可以使用 JSONP 或跨源资源共享(AJAX 请求被相同的域策略限制在页面所驻留的同一台服务器上)等技术,也可以代理通过服务器向 Riak 发送的请求,从而在您现有的 Web 页面中直接包括存储在 Riak 中的数据,以获取所需的数据。

使用 Riak 作为缓存

缓 存用于提供数据的快速访问。如果缓存中包含了请求的数据(缓存命中),应用程序可以通过从缓存中读取值来快速向请求提供服务,这比从数据库中检索值更快。 如果缓存中没有数据(缓存未命中),那么应用程序通常必须在数据库中检索数据。一般情况下,您可以从缓存中服务的请求越多,系统将会越快。Riak 具有多项特性,这使其成为缓存解决方案实现的一个不错的选择。

其中一个这样的 Riak 特性是其可插拔的 (pluggable) 存储后端;存储后端决定如何存储数据。有若干个可用的存储后端,但我不打算在这里全部一一介绍(有关更多信息,请参阅 参考资料)。默认存储后端是 Bitcask,这是一个 Erlang 应用程序,提供一个 API,用于存储和检索受散列表支持的数据,该散列表提供了数据的快速访问;数据是永久性的。

有 一个后端也许与本文关系更紧密:Memory 后端。Memory 后端使用一个内存表来存储其所有的数据(它在内部使用 Erlang 的 ets 表),并且,在启用时,使 Riak 的行为像一个设定了有效期的 LRU 缓存。比起必须在磁盘上检索数据,使用内存存储的优势在于它明显快得多。当数据被存储在内存中(它不是永久的)且一个节点出现故障时,在该节点中存储的数 据将丢失。若您将它用作缓存,这就不是一个问题了(应用程序总是可以从数据库检索数据),就像您将 Riak 用作主数据存储一样。Riak 在集群中跨多个节点复制数据,因此它仍然是可用的。

Riak 自带 Memory 后端。为了使用 Memory 后端,请打开集群中每个节点的 app.config,定位属性 storage_backend,并将其从 riak_kv_bitcask_backend 更改为 riak_kv_memory_backend。现在将 清单 4 中的代码添加到文件的末尾。

清单 4. 使用 Memory 后端
{memory_backend, [
    {max_memory, 4096},	%% 4GB of memory
    {ttl, 86400}        %% Time in seconds
]}

将值更改为适合于您的设置的值。重新启动集群中的节点。

在 Riak 集群内也可以运行多个存储后端。这非常有用,因为这意味着可以针对不同的桶使用不同的后端。例如,您可以配置一个桶(让我们称之为 cache)来使用 Memory 后端,但对于其他桶(那些应当保存数据的桶),则使用 Bitcask。

既然您已经让 Riak 设置的行为像缓存一样,那么您需要一些方法来访问集群中的数据,以便更新它,或出于某种原因使它失效(在它的有效期结束前)。

查找什么内容吗?

正如您已经看到的,当使用 HTTP 界面检索在 Riak 中存储的数据时,您要构造一个 URL,其中包括桶的名称以及您要检索的对象的键,然后在该 URL 上执行一个 HTTP GET。当您知道键是什么时,这就完全足够了!但是,有时您并不知道要检索的对象的键,或者您要检索满足一定条件的一组对象。那么,您需要一种方法来搜索在集群中保存的对象。

您 已经看到如何通过存储在集群中的文档运行一个 Map/Reduce 作业来查询数据。一般来说,执行查询的时间与集群中的文档数量成正比;文档越多,查询这些文档所需要的时间越长。对于时间不敏感的查询,这不是一个问题。 我这样说的意思是,用户并不指望立即得到答复的查询。对于像搜索这样的操作,每次都(动态)搜索所有文档是不可行的;获得结果的时间可能是几分钟,也可能 是几小时!

幸运的是,Riak 对该问题已经有一个解决方案:Riak Search。Riak Search 提供搜索存储在整个集群中的文档时所需要的功能。搜索这个主题对于本文来说过于庞大,无法深入讨论,但从高层次来说,它的工作方式是这样的:文档被标记化 (Riak Search 使用标准的 Lucene 分析器),并被添加到一个反向索引。然后,根据用户输入的搜索项查询该索引。当新文件被添加时,它们也被索引并添加到索引中。

Riak Search 默认被禁用。在您可以使用它之前,您需要先启用它。在集群中的每个节点上,打开 rel/riakN/etc/app.config,定位属性 riak_search 并将它设置为 true。您需要重新启动集群中的节点。

Riak 通过使用提交前 (pre-commit) 和提交后 (post-commit) 挂钩,允许您指定文档被添加到桶之前和之后要运行的函数的名称。例如,在将文档添加到桶之前,您可能要检查文档是否有特定的必需字段。要搜索一个文档,需 要先对其进行索引。要做到这一点,需在存储文档的桶上安装一个 pre-commit 挂钩。要做到这一点,请运行以下命令:$ rel/riak/bin/search-cmd install <bucket name>

这将在桶上安装一个提交前挂钩 riak_search_kv_hook。现在,每当文档被添加到该桶,它就会被分析,并被添加到索引。空白分析器是默认的分析器;它基于空白将字符处理成标记,然后标记被索引。有一些不同的分析器可供使用,您也可以定义自己的分析器。

在许多情况下,Riak Search 知道如何索引您的数据。例如,开箱即用的,如果一个 JSON 对象被添加到某个桶,每个属性的值将被索引,并且可以在查询字符串中使用属性名称来查询。搜索示例请参见 清单 5。对于更复杂的结构,您可以定义自己的模式,告诉 Riak Search 如何索引数据。

当您已索引一些文档后,您需要能够对它们发出查询。一种方法是从 Erlang shell 运行查询。例如,在 清单 5 中的查询搜索与赛马有关的所有投注的赔率桶;您通过查询存储项的 description 属性完成该搜索。

清单 5. 搜索与赛马有关的投注的赔率桶
$ rel/riak/bin/riak attach

search:search(<<"odds">>, <<"description:horse">>).

此 外,Riak Search 还为文档搜索提供了一个 Solr 兼容的 HTTP API。Apache Solr 是一个流行的企业搜索服务器,带有一个类似于 REST 的 API。通过使 API 与 Solr 兼容,应该可以断开 Solr(如果您使用它),并改为使用 Riak Search 支持搜索。例如,要使用 Solr 界面搜索特定活动的赔率,您可以这样做:$ curl "http:localhost:8098/solr/odds/select?start=0&q=description:horse"

利用搜索设置,您现在即使不知道正在查找的项目的主键,也可以在数据存储中定位这些项目。

 

三. riak的安装和使用

参考:http://blog.csdn.net/freewebsys/article/details/12609995

来源:http://blog.csdn.net/freewebsys/article/details/12615047
1. Riak的接口访问有两种方式:
HTTP
Protocol Buffers  基于http的和pb的类似。

2,基于PB方式的调用

工程采用 maven,引入依赖:

代码放在github上面了:

https://github.com/freewebsys/riak_demo

<dependencies>
<dependency>
<groupId>com.basho.riak</groupId>
<artifactId>riak-client</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>com.basho.riak.protobuf</groupId>
<artifactId>riak-pb</artifactId>
<version>1.4.0</version>
</dependency>
</dependencies>

3,简单的做一个表的CRUD

创建一个UserInfo类:

/**
* 用户信息.
*/
public class UserInfo {
private String uid;
private String name;
private String city;
private String nickName;
...get set 方法忽略

使用Riak进行CRUD:只是简单的将uid作为key存储,没有创建索引。

package com.demo;

import java.io.IOException;
import com.basho.riak.client.IRiakClient;
import com.basho.riak.client.RiakException;
import com.basho.riak.client.RiakFactory;
import com.basho.riak.client.RiakRetryFailedException;
import com.basho.riak.client.bucket.Bucket;

public class ClientTest {

public static void main(String[] args) throws IOException {

IRiakClient client = null;
try {// 使用pbc方式连接,而不是http,在/etc/riak/app.config
client = RiakFactory.pbcClient("127.0.0.1", 8087);
} catch (RiakException e) {
e.printStackTrace();
}
// 显示.
System.out.println(client);
Bucket myBucket = null;
String bucketName = "userInfo";
try {
myBucket = client.fetchBucket(bucketName).execute();
if (myBucket == null) {
myBucket = client.createBucket(bucketName).execute();
}
} catch (RiakRetryFailedException e) {
e.printStackTrace();
}
// ################保存数据 .
UserInfo info = new UserInfo();
info.setUid("001");
info.setName("张三");
info.setCity("北京");
try {
myBucket.store(info.getUid(), info).execute();
} catch (Exception e) {
e.printStackTrace();
}

// ################查询数据.
UserInfo fetchedUserInfo = null;
try {
fetchedUserInfo = myBucket.fetch("001", UserInfo.class).execute();
System.out.println(fetchedUserInfo);
} catch (Exception e) {
e.printStackTrace();
}
// ################修改数据.
try {
fetchedUserInfo = myBucket.fetch("001", UserInfo.class).execute();
fetchedUserInfo.setName("李四");
fetchedUserInfo.setNickName("老李");
myBucket.store(info.getUid(), info).execute();
// 保存 新数据
fetchedUserInfo = myBucket.fetch("001", UserInfo.class).execute();
System.out.println("新数据:" + fetchedUserInfo);
} catch (Exception e) {
e.printStackTrace();
}

// ################删除数据.
try {
myBucket.delete("001").execute();
fetchedUserInfo = myBucket.fetch("001", UserInfo.class).execute();
System.out.println("删除收数据." + fetchedUserInfo);
} catch (Exception e) {
e.printStackTrace();
}
// 关闭。
client.shutdown();

}

}

运行结果:

com.basho.riak.client.DefaultRiakClient@145edcf5
UserInfo [uid=001, name=张三, city=北京, nickName=null]
新数据:UserInfo [uid=001, name=张三, city=北京, nickName=null]
删除收数据.null

4,代码分析

在Riak当中,可以简单的把Bucket理解成一个表。
首先要创建一个这样的Bucket,然后把数据按照key放进去。

数据类型可以是字符串,基本类型,或是对象(如UserInfo)。

每次操作的时候都是通过执行Bucket的方法执行达到CRUD的操作。

StoreObject<IRiakObject> store(String key, byte[] value);
StoreObject<IRiakObject> store(String key, String value);
<T> StoreObject<T> store(T o);
<T> StoreObject<T> store(String key, T o);
FetchObject<IRiakObject> fetch(String key);
<T> FetchObject<T> fetch(String key, Class<T> type);
<T> FetchObject<T> fetch(T o);
MultiFetchObject<IRiakObject> multiFetch(String[] keys);
<T> MultiFetchObject<T> multiFetch(List<String> keys, Class<T> type);
<T> MultiFetchObject<T> multiFetch(List<T> o);
CounterObject counter(String counter);
<T> DeleteObject delete(T o);
DeleteObject delete(String key);
StreamingOperation<String> keys() throws RiakException;
<T> FetchIndex<T> fetchIndex(RiakIndex<T> index);

5,总结

java通过使用Protocol Buffers方式调用Riak服务,直接操作对象进行CRUD。

有了这些,可以做一个简单的评论系统了。评论系统上面不需要事物,并且数量会随着业务增长,使用Rick可以平稳的进行扩展。

这个只是简单的,对Rick服务进行CRUD。最没有用到其他功能,同时没有关于key的设计。

Rick的其他功能,以后继续研究。

 

 

cassandra入门

Cassandra是一套开源分布式NoSQL数据库系统。它最初由Facebook开发,用于储存收件箱等简单格式数据,集GoogleBigTable的数据模型与Amazon Dynamo的完全分布式的架构于一身Facebook于2008将 Cassandra 开源,此后,由于Cassandra良好的可扩放性,被Digg、Twitter等知名Web 2.0网站所采纳,成为了一种流行的分布式结构化数据存储方案。
Cassandra是一个混合型的非关系的数据库,类似于Google的BigTable。其主要功能比Dynamo (分布式的Key-Value存储系统)更丰富,但支持度却不如文档存储MongoDB(介于关系数据库和非关系数据库之间的开源产品,是非关系数据库当中功能最丰富,最像关系数据库的。支持的数据结构非常松散,是类似json的bjson格式,因此可以存储比较复杂的数据类型)。Cassandra最初由Facebook开发,后转变成了开源项目。它是一个网络社交云计算方面理想的数据库。以Amazon专有的完全分布式的Dynamo为基础,结合了Google BigTable基于列族(Column Family)的数据模型。P2P去中心化的存储。很多方面都可以称之为Dynamo 2.0

网站地址:http://cassandra.apache.org/

以下资料来源:http://asyty.iteye.com/blog/1202072

一、Cassandra框架

cassandra-001-1

图1 Cassandra

Cassandra是社交网络理想的数据库,适合于实时事务处理和提供交互型数据。以Amazon的完全分布式的Dynamo为基础,结合了Google BigTable基于列族(Column Family)的数据模型,P2P去中心化的存储,目前twitter和digg中都有使用。

在CAP特性上,HBase选择了CP,Cassandra更倾向于AP,而在一致性上有所减弱。

Cassandra的类Dynamo特性有以下几点:

l 对称的,P2P架构

n 无特殊节点,无单点故障

l 基于Gossip的分布式管理

l 通过分布式hash表放置数据

n 可插拔的分区

n 可插拔的拓扑发现

n 可配置的放置策略

l 可配置的,最终一致性

 

类BigTable特性:

l 列族数据模型

n 可配置,2级maps,Super Colum Family

l SSTable磁盘存储

n Append-only commit log

n Mentable (buffer and sort)

n 不可修改的SSTable文件

l 集成Hadoop

二、 Cassandra数据模型

Colum / Colum Family, SuperColum / SuperColum Family

Column是数据增量最底层(也就是最小)的部分。它是一个包含名称(name)、值(value)和时间戳(timestamp)的三重元组。

下面是一个用JSON格式表示的column:

{ // 这是一个Column

name: "emailAddress",

value: "arin@example.com",

timestamp: 123456789

}

需要注意的是,name和value都是二进制的(技术上指byte[]),并且可以是任意长度。

与HBase相比,除了Colum/Colum Family外,Cassandra还支持SuperColum/SuperColum Family。

SuperColum与Colum的区别就是,标准Column的value是一个“字符串”,而 SuperColumn的value是一个包含多个Column的map,另一个细微的差别是:SuperColumn没有时间戳。

{ // 这是一个SuperColumn

name: "homeAddress",

// 无限数量的Column

value: {

street: {name: "street", value: "1234 x street", timestamp: 123456789},

city: {name: "city", value: "san francisco", timestamp: 123456789},

zip: {name: "zip", value: "94107", timestamp: 123456789},

}

}

Column Family(CF)是某个特定Key的Colum集合,是一个行结构类型,每个CF物理上被存放在单独的文件中。从概念上看,CF像数据库中的Table。

SuperColum Family概念上和Column Family(CF)相似,只不过它是Super Colum的集合。

Colum排序

不同于数据库可以通过Order by定义排序规则,Cassandra取出的数据顺序是总是一定的,数据保存时已经按照定义的规则存放,所以取出来的顺序已经确定了。另外,Cassandra按照column name而不是column value来进行排序。

Cassandra可以通过Colum Family的CompareWith属性配置Colume值的排序,在SuperColum中,则是通过SuperColum Family的CompareSubcolumnsWith属性配置Colum的排序。

Cassandra提供了以下一些选:BytesType,UTF8Type,LexicalUUIDType,TimeUUIDType,AsciiType, Column name识别成为不同的类型,以此来达到灵活排序的目的。

三、分区策略

Token,Partitioner

Cassandra中,Token是用来分区数据的关键。每个节点都有一个第一无二的Token,表明该节点分配的数据范围。节点的Token形成一个Token环。例如使用一致性HASH进行分区时,键值对将根据一致性Hash值来判断数据应当属于哪个Token。

cassandra-002-2

 Token Ring

 

分区策略的不同,Token的类型和设置原则也有所不同。 Cassandra (0.6版本)本身支持三种分区策略:

RandomPartitioner:随机分区是一种hash分区策略,使用的Token是大整数型(BigInteger),范围为0~2^127,Cassandra采用了MD5作为hash函数,其结果是128位的整数值(其中一位是符号位,Token取绝对值为结果)。因此极端情况下,一个采用随机分区策略的Cassandra集群的节点可以达到2^127+1个节点。采用随机分区策略的集群无法支持针对Key的范围查询。

OrderPreservingPartitioner:如果要支持针对Key的范围查询,那么可以选择这种有序分区策略。该策略采用的是字符串类型的Token。每个节点的具体选择需要根据Key的情况来确定。如果没有指定InitialToken,则系统会使用一个长度为16的随机字符串作为Token,字符串包含大小写字符和数字。

CollatingOrderPreservingPartitioner:和OrderPreservingPartitioner一样是有序分区策略。只是排序的方式不一样,采用的是字节型Token,支持设置不同语言环境的排序方式,代码中默认是en_US。

分区策略和每个节点的Token(Initial Token)都可以在storage-conf.xml配置文件中设置。

 

bloom-filter, HASH

Bloom Filter是一种空间效率很高的随机数据结构,本质上就是利用一个位数组来表示一个集合,并能判断一个元素是否属于这个集合。Bloom Filter的这种高效是有误差的:在判断一个元素是否属于某个集合时,有可能会把不属于这个集合的元素误认为属于这个集合(false positive)。因此,Bloom Filter不适合那些“零错误”的应用场合,而在能容忍低错误率的场合下,Bloom Filter通过极少的错误换取了存储空间的极大节省。

原理:位数组 + K个独立hash(y)函数。将位数组中hash函数对应的值的位置设为1,查找时如果发现所有hash函数对应位都是1说明存在,很明显这个过程并不保证查找的结果是完全正确的。

 

在Cassandra中,每个键值对使用1Byte的位数组来实现bloom-filter。

cassandra-003

Bloom Filter

四、副本存储

Cassandra不像HBase是基于HDFS的分布式存储,它的数据是存在每个节点的本地文件系统中。

Cassandra有三种副本配置策略:

1) SimpleStrategy RackUnawareStrategy

副本不考虑机架的因素,按照Token放置在连续下几个节点。如图3所示,假如副本数为3,属于A节点的数据在B.C两个节点中也放置副本。

2) OldNetworkTopologyStrategy RackAwareStrategy:

考虑机架的因素,除了基本的数据外,先找一个处于不同数据中心的点放置一个副本,其余N-2个副本放置在同一数据中心的不同机架中。

3) NetworkTopologyStrategy DatacenterShardStrategy

将M个副本放置到其他的数据中心,将N-M-1的副本放置在同一数据中心的不同机架中。

五、网络嗅探

网络嗅探主要用来计算不同host的相对距离,进而告诉Cassandra网络拓扑结构,以便更高效地对用户请求进行路由。主要有三种配置策略:

1) org.apache.cassandra.locator.SimpleSnitch

将不同host逻辑上的距离(Cassandra Ring)作为他们之间的相对距离。

2) org.apache.cassandra.locator.RackInferringSnitch:

相对距离是由rack和data center决定的,分别对应ip的第3和第2个八位组。即,如果两个节点的ip的前3个八位组相同,则认为它们在同一个rack(同一个rack中不同节点,距离相同);如果两个节点的ip的前两个八位组相同,则认为它们在同一个数据中心(同一个data center中不同节点,距离相同)。

3) org.apache.cassandra.locator.PropertyFileSnitch:

相对距离是由rack和data center决定的,且它们是在配置文件cassandra-topology.properties中设置的。

六、一致性

在一致性上,Cassandra采用了最终一致性,可以根据具体情况来选择一个最佳的折衷,来满足特定操作的需求。Cassandra可以让用户指定读/插入/删除操作的一致性级别,一致性级别有多种,如图5所示。

 

cassandra-005-2

图5 Cassandra一致性级别

 

注:一致性级别是由副本数决定,而不是集群的节点数目决定。

 

Quorum NRW

  • N: 复制的节点数量,即副本数
  • R: 成功读操作的最小节点数
  • W: 成功写操作的最小节点数

Quorum协议中,R 代表一次成功的读取操作中最小参与节点数量,W 代表一次成功的写操作中最小参与节点数量。R + W>N ,则会产生类似quorum 的效果。该模型中的读(写)延迟由最慢的 R(W)复制决定,为得到比较小的延迟,R和W有的时候的和比N小。

Quorum协议中,只需W + R > N,就可以保证强一致性。因为读取数据的节点和被同步写入的节点是有重叠的。在一个RDBMS的复制模型中(Master/salve),假如N=2,那么W=2,R=1此时是一种强一致性,但是这样造成的问题就是可用性的减低,因为要想写操作成功,必须要等 2个节点的写操作都完成以后才可以。

在分布式系统中,一般都要有容错性,因此N一般大于3的,此时根据CAP理论,我们就需要在一致性和分区容错性之间做一平衡,如果要高的一致性,那么就配置N=W,R=1,这个时候可用性就会大大降低。如果想要高的可用性,那么此时就需要放松一致性的要求,此时可以配置W=1,这样使得写操作延迟最低,同时通过异步的机制更新剩余的N-W个节点。

当存储系统保证最终一致性时,存储系统的配置一般是W+R<=N,此时读取和写入操作是不重叠的,不一致性的窗口就依赖于存储系统的异步实现方式,不一致性的窗口大小也就等于从更新开始到所有的节点都异步更新完成之间的时间。

一般来说,Quorum中比较典型的NRW为(3,2,2)。

维护最终一致性

Cassandra 通过4个技术来维护数据的最终一致性,分别为逆熵(Anti-Entropy),读修复(Read Repair),提示移交(Hinted Handoff)和分布式删除。

1) 逆熵

这是一种备份之间的同步机制。节点之间定期互相检查数据对象的一致性,这里采用的检查不一致的方法是 Merkle Tree;

2) 读修复

客户端读取某个对象的时候,触发对该对象的一致性检查:

读取Key A的数据时,系统会读取Key A的所有数据副本,如果发现有不一致,则进行一致性修复。

如果读一致性要求为ONE,会立即返回离客户端最近的一份数据副本。然后会在后台执行Read Repair。这意味着第一次读取到的数据可能不是最新的数据;如果读一致性要求为QUORUM,则会在读取超过半数的一致性的副本后返回一份副本给客户端,剩余节点的一致性检查和修复则在后台执行;如果读一致性要求高(ALL),则只有Read Repair完成后才能返回一致性的一份数据副本给客户端。可见,该机制有利于减少最终一致的时间窗口。

3) 提示移交

对写操作,如果其中一个目标节点不在线,先将该对象中继到另一个节点上,中继节点等目标节点上线再把对象给它:

Key A按照规则首要写入节点为N1,然后复制到N2。假如N1宕机,如果写入N2能满足ConsistencyLevel要求,则Key A对应的RowMutation将封装一个带hint信息的头部(包含了目标为N1的信息),然后随机写入一个节点N3,此副本不可读。同时正常复制一份数据到N2,此副本可以提供读。如果写N2不满足写一致性要求,则写会失败。 等到N1恢复后,原本应该写入N1的带hint头的信息将重新写回N1。

4) 分布式删除

单机删除非常简单,只需要把数据直接从磁盘上去掉即可,而对于分布式,则不同,分布式删除的难点在于:如果某对象的一个备份节点 A 当前不在线,而其他备份节点删除了该对象,那么等 A 再次上线时,它并不知道该数据已被删除,所以会尝试恢复其他备份节点上的这个对象,这使得删除操作无效。Cassandra 的解决方案是:本地并不立即删除一个数据对象,而是给该对象标记一个hint,定期对标记了hint的对象进行垃圾回收。在垃圾回收之前,hint一直存在,这使得其他节点可以有机会由其他几个一致性保证机制得到这个hint。Cassandra 通过将删除操作转化为一个插入操作,巧妙地解决了这个问题。

七、存储机制

Cassandra的存储机制借鉴了Bigtable的设计,采用Memtable和SSTable的方式。

CommitLog

和HBase一样,Cassandra在写数据之前,也需要先记录日志,称之为Commit Log,然后数据才会写入到Column Family对应的MemTable中,且MemTable中的数据是按照key排序好的。SSTable一旦完成写入,就不可变更,只能读取。下一次Memtable需要刷新到一个新的SSTable文件中。所以对于Cassandra来说,可以认为只有顺序写,没有随机写操作。

MenTable

MemTable是一种内存结构,当数据量达到块大小时,将批量flush到磁盘上,存储为SSTable。这种机制,相当于缓存写回机制(Write-back Cache),优势在于将随机IO写变成顺序IO写,降低大量的写操作对于存储系统的压力。所以我们可以认为Cassandra中只有顺序写操作,没有随机写操作。

SSTable

SSTable是Read Only的,且一般情况下,一个CF会对应多个SSTable,当用户检索数据时,Cassandra使用了Bloom Filter,即通过多个hash函数将key映射到一个位图中,来快速判断这个key属于哪个SSTable。

为了减少大量SSTable带来的开销,Cassandra会定期进行compaction,简单的说,compaction就是将同一个CF的多个SSTable合并成一个SSTable。在Cassandra中,compaction主要完成的任务是:

1) 垃圾回收: cassandra并不直接删除数据,因此磁盘空间会消耗得越来越多,compaction 会把标记为删除的数据真正删除;

2) 合并SSTable:compaction 将多个 SSTable 合并为一个(合并的文件包括索引文件,数据文件,bloom filter文件),以提高读操作的效率;

3) 生成 MerkleTree:在合并的过程中会生成关于这个 CF 中数据的 MerkleTree,用于与其他存储节点对比以及修复数据。

详细存储数据结构参考 http://www.ibm.com/developerworks/cn/opensource/os-cn-cassandraxu2

单体、模块化

Cassandra和HBase的一个重要区别是, Cassandra在每个节点是是一个单 Java 进程,而完整的HBase 解决方案却由不同部分组成:有数据库进程本身,它可能会运行在多个模式;一个配置好的 hadoop HDFS 分布式文件系统,以及一个 Zookeeper 系统来协调不同的 HBase 进程。

 

同类文章:

cassandra入门

Cassandra 分布式数据库 数据结构与数据读写

Cassandra 分布式数据库 配置、启动与集群

Apache Cassandra 数据库

分布式Key-Value存储系统Cassandra入门

Cassandra 分布式数据库 数据结构与数据读写

Cassandra 的数据存储结构

Cassandra 中的数据主要分为三种:

  1. CommitLog:主要记录下客户端提交过来的数据以及操作。这个数据将被持久化到磁盘中,以便数据没有被持久化到磁盘时可以用来恢复。
  2. Memtable:用户写的数据在内存中的形式,它的对象结构在后面详细介绍。其实还有另外一种形式是 BinaryMemtable 这个格式目前 Cassandra 并没有使用,这里不再介绍了。
  3. SSTable:数据被持久化到磁盘,这又分为 Data、Index 和 Filter 三种数据格式。

CommitLog 数据格式

CommitLog 的数据只有一种,那就是按照一定格式组成 byte 组数,写到 IO 缓冲区中定时的被刷到磁盘中持久化,在上一篇的配置文件详解中已经有说到 CommitLog 的持久化方式有两种,一个是 Periodic 一个是 Batch,它们的数据格式都是一样的,只是前者是异步的,后者是同步的,数据被刷到磁盘的频繁度不一样。关于 CommitLog 的相关的类结构图如下:

图 1. CommitLog 的相关的类结构图

图 1. CommitLog 的相关的类结构图它 持久化的策略也很简单,就是首先将用户提交的数据所在的对象 RowMutation 序列化成 byte 数组,然后把这个对象和 byte 数组传给 LogRecordAdder 对象,由 LogRecordAdder 对象调用 CommitLogSegment 的 write 方法去完成写操作,这个 write 方法的代码如下:

清单 1. CommitLogSegment. write
public CommitLogSegment.CommitLogContext write(RowMutation rowMutation, 
 Object serializedRow){ 
        long currentPosition = -1L; 
 ... 
            Checksum checkum = new CRC32(); 
            if (serializedRow instanceof DataOutputBuffer){ 
                DataOutputBuffer buffer = (DataOutputBuffer) serializedRow; 
                logWriter.writeLong(buffer.getLength()); 
                logWriter.write(buffer.getData(), 0, buffer.getLength()); 
                checkum.update(buffer.getData(), 0, buffer.getLength()); 
            } 
            else{ 
                assert serializedRow instanceof byte[]; 
                byte[] bytes = (byte[]) serializedRow; 
                logWriter.writeLong(bytes.length); 
                logWriter.write(bytes); 
                checkum.update(bytes, 0, bytes.length); 
            } 
            logWriter.writeLong(checkum.getValue()); 
 ... 
 }

这个代码的主要作用就是如果当前这个根据 columnFamily 的 id 还没有被序列化过,将会根据这个 id 生成一个 CommitLogHeader 对象,记录下在当前的 CommitLog 文件中的位置,并将这个 header 序列化,覆盖以前的 header。这个 header 中可能包含多个没有被序列化到磁盘中的 RowMutation 对应的 columnFamily 的 id。如果已经存在,直接把 RowMutation 对象的序列化结果写到 CommitLog 的文件缓存区中后面再加一个 CRC32 校验码。Byte 数组的格式如下:

图 2. CommitLog 文件数组结构

图 2. CommitLog 文件数组结构上图中每个不同的 columnFamily 的 id 都包含在 header 中,这样做的目的是更容易的判断那些数据没有被序列化。

CommitLog 的作用是为恢复没有被写到磁盘中的数据,那如何根据 CommitLog 文件中存储的数据恢复呢?这段代码在 recover 方法中:

清单 2. CommitLog.recover
 public static void recover(File[] clogs) throws IOException{ 
 ... 
         final CommitLogHeader clHeader = CommitLogHeader.readCommitLogHeader(reader); 
         int lowPos = CommitLogHeader.getLowestPosition(clHeader); 
            if (lowPos == 0) break; 
            reader.seek(lowPos); 
            while (!reader.isEOF()){ 
                try{ 
                    bytes = new byte[(int) reader.readLong()]; 
                    reader.readFully(bytes); 
                    claimedCRC32 = reader.readLong(); 
                } 
 ... 
                ByteArrayInputStream bufIn = new ByteArrayInputStream(bytes); 
                Checksum checksum = new CRC32(); 
                checksum.update(bytes, 0, bytes.length); 
                if (claimedCRC32 != checksum.getValue()){continue;} 
            final RowMutation rm = 
              RowMutation.serializer().deserialize(new DataInputStream(bufIn));
            } 
 ... 
 }

这段代码的思路是:反序列化 CommitLog 文件的 header 为 CommitLogHeader 对象,寻找 header 对象中没有被回写的最小 RowMutation 位置,然后根据这个位置取出这个 RowMutation 对象的序列化数据,然后反序列化为 RowMutation 对象,然后取出 RowMutation 对象中的数据重新保存到 Memtable 中,而不是直接写到磁盘中。CommitLog 的操作过程可以用下图来清楚的表示:

图 3. CommitLog 数据格式的变化过程

图 3. CommitLog 数据格式的变化过程

Memtable 内存中数据结构

Memtable 内存中数据结构比较简单,一个 ColumnFamily 对应一个唯一的 Memtable 对象,所以 Memtable 主要就是维护一个 ConcurrentSkipListMap<DecoratedKey, ColumnFamily> 类型的数据结构,当一个新的 RowMutation 对象加进来时,Memtable 只要看看这个结构是否 <DecoratedKey, ColumnFamily> 集合已经存在,没有的话就加进来,有的话取出这个 Key 对应的 ColumnFamily,再把它们的 Column 合并。Memtable 相关的类结构图如下:

图 4. Memtable 相关的类结构图

图 4. Memtable 相关的类结构图Memtable 中的数据会根据配置文件中的相应配置参数刷到本地磁盘中。这些参数在上一篇中已经做了详细说明。

前 面已经多处提到了 Cassandra 的写的性能很好,好的原因就是因为 Cassandra 写到数据首先被写到 Memtable 中,而 Memtable 是内存中的数据结构,所以 Cassandra 的写是写内存的,下图基本上描述了一个 key/value 数据是怎么样写到 Cassandra 中的 Memtable 数据结构中的。

图 5. 数据被写到 Memtable

图 5. 数据被写到 Memtable

SSTable 数据格式

每添加一条数据到 Memtable 中,程序都会检查一下这个 Memtable 是否已经满足被写到磁盘的条件,如果条件满足这个 Memtable 就会写到磁盘中。先看一下这个过程涉及到的类。相关类图如图 6 所示:

图 6. SSTable 持久化类结构图

图 6. SSTable 持久化类结构图Memtable 的条件满足后,它会创建一个 SSTableWriter 对象,然后取出 Memtable 中所有的 <DecoratedKey, ColumnFamily> 集合,将 ColumnFamily 对象的序列化结构写到 DataOutputBuffer 中。接下去 SSTableWriter 根据 DecoratedKey 和 DataOutputBuffer 分别写到 Date、Index 和 Filter 三个文件中。

Data 文件格式如下:

图 7. SSTable 的 Data 文件结构

图 7. SSTable 的 Data 文件结构Data 文件就是按照上述 byte 数组来组织文件的,数据被写到 Data 文件中是接着就会往 Index 文件中写,Index 中到底写什么数据呢?

其实 Index 文件就是记录下所有 Key 和这个 Key 对应在 Data 文件中的启示地址,如图 8 所示:

图 8. Index 文件结构

图 8. Index 文件结构Index 文件实际上就是 Key 的一个索引文件,目前只对 Key 做索引,对 super column 和 column 都没有建索引,所以要匹配 column 相对来说要比 Key 更慢。

Index 文件写完后接着写 Filter 文件,Filter 文件存的内容就是 BloomFilter 对象的序列化结果。它的文件结构如图 9 所示:

图 9. Filter 文件结构

图 9. Filter 文件结构BloomFilter 对象实际上对应一个 Hash 算法,这个算法能够快速的判断给定的某个 Key 在不在当前这个 SSTable 中,而且每个 SSTable 对应的 BloomFilter 对象都在内存中,Filter 文件指示 BloomFilter 持久化的一个副本。三个文件对应的数据格式可以用下图来清楚的表示:

图 10. SSTable 数据格式转化

图 10. SSTable 数据格式转化

在 Memtable 往磁盘中写的过程中,这个 Memtable 被放到 memtablesPendingFlush 容器中,以保证在读时候它里面存的数据能被正确读到,这个在后面数据读取时还会介绍。

数据的写入

数据要写到 Cassandra 中有两个步骤:

  1. 找到应该保存这个数据的节点
  2. 往这个节点写数据。客户端写一条数据必须指定 Keyspace、ColumnFamily、Key、Column Name 和 Value,还可以指定 Timestamp,以及数据的安全等级。

数据写入涉及的主要相关类如下图所示:

图 11. Insert 相关类图

图 11. Insert 相关类图大慨的写入逻辑是这样的:

CassandraServer 接收到要写入的数据时,首先创建一个 RowMutation 对象,再创建一个 QueryPath 对象,这个对象中保存了 ColumnFamily、Column Name 或者 Super Column Name。接着把用户提交的所有数据保存在 RowMutation 对象的 Map<String, ColumnFamily> 结构中。接下去就是根据提交的 Key 计算集群中那个节点应该保存这条数据。这个计算的规则是:将 Key 转化成 Token,然后在整个集群的 Token 环中根据二分查找算法找到与给定的 Token 最接近的一个节点。如果用户指定了数据要保存多个备份,那么将会顺序在 Token 环中返回与备份数相等的节点。这是一个基本的节点列表,后面 Cassandra 会判断这些节点是否正常工作,如果不正常寻找替换节点。还有还要检查是否有节点正在启动,这种节点也是要在考虑的范围内,最终会形成一个目标节点列表。最 后把数据发送到这些节点。

接下去就是将数据保存到 Memtable 中和 CommitLog 中,关于结果的返回根据用户指定的安全等级不同,可以是异步的,也可以是同步的。如果某个节点返回失败,将会再次发送数据。下图是当 Cassandra 接收到一条数据时到将数据写到 Memtable 中的时序图。

图 12. Insert 操作的时序图

图 12. Insert 操作的时序图

数据的读取

Cassandra 的写的性能要好于读的性能,为何写的性能要比读好很多呢?原因是,Cassandra 的设计原则就是充分让写的速度更快、更方便而牺牲了读的性能。事实也的确如此,仅仅看 Cassandra 的数据的存储形式就能发现,首先是写到 Memtable 中,然后将 Memtable 中数据刷到磁盘中,而且都是顺序保存的不检查数据的唯一性,而且是只写不删(删除规则在后面介绍),最后才将顺序结构的多个 SSTable 文件合并。这每一步难道不是让 Cassandra 写的更快。这个设计想想对读会有什么影响。首先,数据结构的复杂性,Memtable 中和 SSTable 中数据结构肯定不同,但是返回给用户的肯定是一样的,这必然会要转化。其次,数据在多个文件中,要找的数据可能在 Memtable 中,也可能在某个 SSTable 中,如果有 10 个 SSTable,那么就要在到 10 个 SSTable 中每个找一遍,虽然使用了 BloomFilter 算法可以很快判断到底哪个 SSTable 中含有指定的 key。还有可能在 Memtable 到 SSTable 的转化过程中,这也是要检查一遍的,也就是数据有可能存在什么地方,就要到哪里去找一遍。还有找出来的数据可能是已经被删除的,但也没办法还是要取。

下面是读取数据的相关类图:

图 13. 读取相关类图

图 13. 读取相关类图根 据上面的类图读取的逻辑是,CassandraServer 创建 ReadCommand 对象,这个对象保存了用户要获取记录的所有必须指定的条件。然后交给 weakReadLocalCallable 这个线程去到 ColumnFamilyStore 对象中去搜索数据,包括 Memtable 和 SSTable。将找到的数据组装成 Row 返回,这样一个查询过程就结束了。这个查询逻辑可以用下面的时序图来表示:

图 14. 查询数据时序图

图 14. 查询数据时序图在 上图中还一个地方要说明的是,取得 key 对应的 ColumnFamily 要至少在三个地方查询,第一个就是 Memtable 中,第二个是 MemtablesPendingFlush,这个是将 Memtable 转化为 SSTable 之前的一个临时 Memtable。第三个是 SSTable。在 SSTable 中查询最为复杂,它首先将要查询的 key 与每个 SSTable 所对应的 Filter 做比较,这个 Filter 保存了所有这个 SSTable 文件中含有的所有 key 的 Hash 值,这个 Hsah 算法能快速判断指定的 key 在不在这个 SSTable 中,这个 Filter 的值在全部保存在内存中,这样能快速判断要查询的 key 在那个 SSTable 中。接下去就要在 SSTable 所对应的 Index 中查询 key 所对应的位置,从前面的 Index 文件的存储结构知道,Index 中保存了具体数据在 Data 文件中的 Offset。,拿到这个 Offset 后就可以直接到 Data 文件中取出相应的长度的字节数据,反序列化就可以达到目标的 ColumnFamily。由于 Cassandra 的存储方式,同一个 key 所对应的值可能存在于多个 SSTable 中,所以直到查找完所有的 SSTable 文件后再与前面的两个 Memtable 查找出来的结果合并,最终才是要查询的值。

另外,前面所描述的是最坏的情况,也就是查询在完全没有缓存的情况下,当然 Cassandra 在对查询操作也提供了多级缓存。第一级直接针对查询结果做缓存,这个缓存的设置的配置项是 Keyspace 下面的 RowsCached。查询的时候首先会在这个 Cache 中找。第二级 Cache 对应 SSTable 的 Index 文件,它可以直接缓存要查询 key 所对应的索引。这个配置项同样在 Keyspace 下面的 KeysCached 中,如果这个 Cache 能命中,将会省去 Index 文件的一次 IO 查询。最后一级 Cache 是做磁盘文件与内存文件的 mmap,这种方式可以提高磁盘 IO 的操作效率,鉴于索引大小的限制,如果 Data 文件太大只能在 64 位机器上使用这个技术。

数据的删除

从前面的数据写入规则可以想象,Cassandra 要想删除数据是一件麻烦的事,为何这样说?理由如下:

  1. 数据有多处 同时还可能在多个节点都有保存。
  2. 数据的结构有多种 数据会写在 CommitLog 中、Memtable 中、SSTable 中,它们的数据结构都不一样。
  3. 数据时效性不一致 由于是集群,所以数据在节点之间传输必然有延时。

除了这三点之外还有其它一些难点如 SSTable 持久化数据是顺序存储的,如果删除中间一段,那数据有如何移动,这些问题都非常棘手,如果设计不合理,性能将会非常之差。

本部分将讨论 Cassandra 是如何解决这些问题的。

CassandraServer 中删除数据的接口只有一个 remove,下面是 remove 方法的源码:

清单 3. CassandraServer.remove
public void remove(String table, String key, ColumnPath column_path, 
          long timestamp, ConsistencyLevel consistency_level){
        checkLoginDone();
        ThriftValidation.validateKey(key);
        ThriftValidation.validateColumnPathOrParent(table, column_path);
        RowMutation rm = new RowMutation(table, key);
        rm.delete(new QueryPath(column_path), timestamp);
        doInsert(consistency_level, rm);
    }

仔细和 insert 方法比较,发现只有一行不同:insert 方法调用的是 rm.add 而这里是 rm.delete。那么这个 rm.delete 又做了什么事情呢?下面是 delete 方法的源码:

清单 4. RowMutation. Delete
public void delete(QueryPath path, long timestamp){
...
        if (columnFamily == null)
            columnFamily = ColumnFamily.create(table_, cfName);
        if (path.superColumnName == null && path.columnName == null){
            columnFamily.delete(localDeleteTime, timestamp);
        }else if (path.columnName == null){
            SuperColumn sc = new SuperColumn(path.superColumnName, 
              DatabaseDescriptor.getSubComparator(table_, cfName));
            sc.markForDeleteAt(localDeleteTime, timestamp);
            columnFamily.addColumn(sc);
        }else{
            ByteBuffer bytes = ByteBuffer.allocate(4);
            bytes.putInt(localDeleteTime);
            columnFamily.addColumn(path, bytes.array(), timestamp, true);
        }
    }

这段代码的主要逻辑就是,如果是删除指定 Key 下的某个 Column,那么将这个 Key 所对应的 Column 的 vlaue 设置为当前系统时间,并将 Column 的 isMarkedForDelete 属性设置为 TRUE,如果是要删除这个 Key 下的所有 Column 则设置这个 ColumnFamily 的删除时间期限属性。然后将这个新增的一条数据按照 Insert 方法执行下去。

这个思路现在已经很明显了,它就是通过设置同一个 Key 下对应不同的数据来更新已经在 ConcurrentSkipListMap 集合中存在的数据。这种方法的确很好,它能够达到如下目的:

  1. 简化了数据的操作逻辑。将添加、修改和删除逻辑都统一起来。
  2. 解决了前面提到的三个难点。因为它就是按照数据产生的方式,来修改数据。有点以其人之道还治其人之身的意思。

但是这仍然有两个 问题:这个只是修改了指定的数据,它并没有删除这条数据;还有就是 SSTable 是根据 Memtable 中的数据保存的,很可能会出现不同的 SSTable 中保存相同的数据,这个又怎么解决?的确如此,Cassandra 并没有删除你要删除的数据,Cassandra 只是在你查询数据返回之前,过滤掉 isMarkedForDelete 为 TRUE 的记录。它能够保证你删除的数据你不能再查到,至于什么时候真正删除,你就不需要关心了。Cassandra 删除数据的过程很复杂,真正删除数据是在 SSTable 被压缩的过程中,SSTable 压缩的目的就是把同一个 Key 下对应的数据都统一到一个 SSTable 文件中,这样就解决了同一条数据在多处的问题。压缩的过程中 Cassandra 会根据判断规则判定哪些数据应该被删除。

SSTable 的压缩

数 据的压缩实际上是数据写入 Cassandra 的一个延伸,前面描述的数据写入和数据的读取都有一些限制,如:在写的过程中,数据会不停的将一定大小的 Memtable 刷到磁盘中,这样不停的刷,势必会产生很多的同样大小的 SSTable 文件,不可能这样无限下去。同样在读的过程中,如果太多的 SSTable 文件必然会影响读的效率,SSTable 越多就会越影响查询。还有一个 Key 对应的 Column 分散在多个 SSTable 同样也会是问题。还有我们知道 Cassandra 的删除同样也是一个写操作,同样要处理这些无效的数据。

鉴于以上问题,必然要对 SSTable 文件进行合并,合并的最终目的就是要将一个 Key 对应的所有 value 合并在一起。该组合的组合、该修改的修改,该删除的删除。然后将这个 Key 所对应的数据写在 SSTable 所对应的 Data 文件的一段连续的空间上。

何时压缩 SSTable 文件由 Cassandra 来控制,理想的 SSTable 文件个数在 4~32 个。当新增一个 SSTable 文件后 Cassandra 会计算当期的平均 SSTable 文件的大小当新增的 SSTable 大小在平均 SSTable 大小的 0.5~1.5 倍时 Cassandra 就会调用压缩程序压缩 SSTable 文件,导致的结果就是重新建立 Key 的索引。这个过程可以用下图描述:

图 15 数据压缩

图 15 数据压缩

总结

本文首先描述了 Cassandra 中数据的主要的存储格式,包括内存中和磁盘中数据的格式,接下去介绍了 Cassandra 处理这些数据的方式,包括数据的添加、删除和修改,本质上修改和删除是一个操作。最后介绍了数据的压缩。

接下去两篇将向软件开发人员介绍 Cassandra 中使用的设计模式、巧妙的设计方法和 Cassandra 的高级使用方法——利用 Cassandra 搭建存储与检索一体化的实时检索系统

 

同类文章:

cassandra入门

Cassandra 分布式数据库 数据结构与数据读写

Cassandra 分布式数据库 配置、启动与集群

Apache Cassandra 数据库

分布式Key-Value存储系统Cassandra入门

Cassandra 分布式数据库 配置、启动与集群

来源:http://www.ibm.com/developerworks/cn/opensource/os-cn-cassandraxu1/

本文首先介绍了 Cassandra 服务器的配置文件各个配置项的意义,接着讨论了它的启动过程,包括 Cassandra 在启动过程中主要都完成了那些操作,为什么要执行这些操作,最终达到什么状态等。接着介绍如果在集群情况下,集群中节点如何自治理,节点间如何通信、如何 控制数据在集群中的分布等关键问题。

Cassandra 的配置详解

了解一个软件的配置项的意义是使用这个软件的前提,这里详细介绍 Cassandra 的配置文件(storage-config.xml)中各个配置项的意义,这其中包含有很多配置参数,我们可以对其进行调整以达到理想的性能。为了节省篇 幅这里没有列出 storage-config.xml 文件的内容,你可以对照着这个文件看下面的内容。

ClusterName

Cluster Name 代表一个族的标识,它通常代表一个集群。这个配置项在 Cassandra 没有存储数据时就必须指定,当 Cassandra 第一次启动后,它就会被写到 Cassandra 的系统表中,如果你要修改 Cluster Name 必须要删除 Cassandra 中数据。

AutoBootstrap

这个配置项看起来十分简单,但是如果你对 Cassandra 没有深入了解的话,恐怕不知道当你改变这个配置项时 Cassandra 可能会发生什么?

我们知道 Cassandra 集群是通过维护一个自适应的 Token 环来达到集群中的节点的自治理,它们不仅要保证每台机器的状态的同步和一致性还要保证它们之间 Token 分布的合理性,通过重新划分 Token 来达到每台机器的负载的均衡性。

那 这个配置项与 Token 和负载又有何关联性?其实表面上看起来这个配置项是当这个节点启动时是否自动加入集群。但是,当你设置成 False 时它是不是就不加入集群呢?显然不是,这还要看你有没有配置 seeds,如果你配置了其它 seed,那么它仍然会去加入集群。

那么到底有何区别,通过分析其启动代码发现,这个配置项不仅跟 seed 配置项有关而且和 Cassandra 是否是第一次启动也有关。Cassandra 的启动规则大慨如下:

  1. 当 AutoBootstrap 设为 FALSE,第一次启动时 Cassandra 会在系统表中记录 AutoBootstrap=TRUE,以表示这是由系统自动设置的,其实这个只是作为一个标志来判断你以后的启动情况。
  2. 当 AutoBootstrap 设为 TRUE,第一次启动,Cassandra 会判断当前节点有没有被配置成 seed 节点,也就是在本机 ip 有没有在 seeds 中。如果在 seeds 中,Cassandra 的启动情况和 1 是一样的。
  3. 当 AutoBootstrap 设为 TRUE,第一次启动,并且没有配置为 seed,Cassandra 将会有一个漫长的启动过程,当然这个时间的长短和你的当前的集群的数据量有很大的关系。这时 Cassandra 将会根据当前集群的负载,来动态调整它们的均衡。调整均衡的方式就是根据当前的 Token 环分配一个合适的 Token 给这个节点,并将这个符合这个 Token 的数据传给它。

从以上分析可以看出,AutoBootstrap 设置的主要目的是是否调整当前集群中的负载均衡。这其实还有一个很重要的问题就是,如果按照第一种情况启动,如果没有指定 Token,这个节点的 Token 将会是随机生成的,那么问题就来了,当这个随机生成是 Token 加入集群的 Token 环时,Cassandra 如何保证 Token 和 Token 所对应的数据的一致性,这个问题将在后面说明。

Keyspaces

Cassandra 中 Keyspace 相当于关系数据库中的表空间的概念,可以理解为操作表的一个容器,它下面可以定义多个 ColumnFamily,这个 ColumnFamily 就相当于表了,它是存储数据的实体。

ColumnFamily 中几个属性的意义如下:

  • ColumnType。列的类型,有两种:Standard 和 Super,分别是标准列和超列,超列的含义是列还有一个父列。
  • CompareWith。表示的是列的排序规则,可以根据不同的数据类型进行排序如 TimeUUIDType,可以根据插入的时间排序
  • CompareSubcolumnsWith。子列的排序规则与 CompareWith 类似
  • RowsCached。查询时缓存的数据量,可以是多少条,也可以是百分比,如 10% 就是缓存 10% 的数据量,这个对查询性能影响很大,如果命中率高的话,可以显著提高查询效率。
  • KeysCached。缓存 ColumnFamily 中的 key,这个 key 就是对应到 Index.db 中的数据,如果没有在 RowsCached 中命中,那么就要到每个 SSTable 中查询,这时必然要查询 key,如果在 KeysCached 能命中就不需要到 Index.db 中查询了,省去了 IO 操作。

Cassandra 是一个 Key/Value 系统,从它的存储的逻辑结构来看分为:Keyspace、Key、ColumnFamily、Super Column 以及 Column 几个部分。很明显我们能看出每一对 Key/Value 都有一个寄生的容器,所以它实际上是由一个个 Map 容器构成的。这个容器结构可以用图 1 和图 2 来表示:

图 1. 标准的 Column 结构图

图 1. 标准的 Column 结构图

图 2. 含有 Super Column 的结构图

图 2. 含有 Super Column 的结构图

ReplicaPlacementStrategy

定 义数据复制策略,默认是 org.apache.cassandra.locator.RackUnawareStrategy,数据复制到其它节点没有特别的规定。 org.apache.cassandra.locator.RackAwareStrategy 是将节点分为不同的 Rack,这种方式不管是存数据还是查数据,都从不同的 Rack 的节点取数据或写数据。org.apache.cassandra.locator.DatacenterShardStategy 又将节点划分为不同的 Data Center,让数据放在不同数据中心,从而保证数据的安全性,例如可以按机房划分 Data Center,从而避免一个机房出现故障,会影响整个集群。

ReplicationFactor

定义数据要保存几个备份,结合 ReplicaPlacementStrategy 可以把数据放在不同的地方。

EndPointSnitch

org.apache.cassandra.locator.EndPointSnitch 可以根据当前的网络情况选择更好的节点路由,一般默认即可。

Authenticator

这个配置项可以控制数据访问的安全性,可以在 access.properties 和 passwd.properties 设置用户和密码。

Partitioner

控 制数据的分布规则,org.apache.cassandra.dht.RandomPartitioner 是随机分布,Cassandra 控制数据在不同的节点是通过 key 的来划分的,这个方式是将 key 进行 MD5 Hash,从而形成随机分布的 Token,然后根据这个 Token 将数据分布到不同的节点上。

org.apache.cassandra.dht.OrderPreservingPartitioner 是取 key 的 Ascii 字符来划分的,因此我们可以根据 key 来主动控制数据的分布,例如我们可以给 key 加一个前缀,相同前缀的 key 分布在同一个节点中。

InitialToken

给节点分配一个初始 Token,当节点第一次启动后这个 Token 就被写在系统表中。结合 Partitioner 就可以控制数据的分布。这个配置项可以让我们能调整集群的负载均衡。

CommitLogDirectory、DataFileDirectories

这两个配置项是设置 CommitLog 和 SSTable 存储的目录。

Seeds

关于 Seeds 节点的配置有这样几个疑问:

  1. 是不是集群中的所有节点都要配置在 seed 中。
  2. 本机需不需要配置在 seed 中。

关于第二个问题在前面中已经说明了,是否配置就决定是否作为 seed 节点来启动。关于第一个问题,答案是否定的,因为即使你把集群中的所有节点都配置在 seed 中,当 Cassandra 在启动时它也不会往每个 seed 发送心跳信息,而是随机选择一个节点与其同步集群中的其他所有节点状态。几个回合后这个节点同样能够获取集群中所有的节点的列表。这就是集群自治理的优 点,只要能发现其中一个节点就能发现全部节点。

ListenAddress

ListenAddress 这个配置是用来监听集群中其它节点与本节点交换状态信息和数据的地址。需要注意的是当你配置为本机的 ip 地址没有问题,不配置通常也没问题,但是如果你没有配置或者配置成主机名,而你又把你的主机名绑定到 127.0.0.1 时,这时将会导致本节点不能加入到集群中,因为它接受不到其他节点过来的任何信息,防止出错直接绑定本机 ip 最好。

ThriftAddress

监听 Client 的连接请求,不设或者配置成 0.0.0.0,监听所有地址的请求。

RowWarningThresholdInMB

当 Cassandra 压缩时,如果一个 row 超出了配置的大小时打印 warn 日志,没有任何其它作用。

SlicedBufferSizeInKB 和 ColumnIndexSizeInKB

分别是用来配置,根据 Slice 和 Column Name 来查询时 Cassandra 缓存数据的大小,当查询范围较小时可以适当设置大一点以提高命中率。

FlushDataBufferSizeInMB 和 FlushIndexBufferSizeInMB

这两个配置项是设置 Cassandra 在将内存中的数据写到磁盘时一次写入的缓存量,适当提高这个两个值可以提高 Cassandra 的写性能。

MemtableThroughputInMB、MemtableOperationsInMillions 和 MemtableFlushAfterMinutes

MemtableOperationsInMillions 是定义当前 Keyspace 对应的数据在内存中的缓存大小,Cassandra 默认是 64M,也就是当写到 Cassandra 的数据达到 64M 时,Cassandra 会将内存的数据写到本地磁盘中。

MemtableOperationsInMillions 是定义当前这个 Memtable 中所持有数据对象的个数,真实的个数是 MemtableOperationsInMillions*1024*1024。当超出这个数值时 Memtable 同样会被写到磁盘中。

MemtableFlushAfterMinutes 的作用是,当前两个条件都长时间不满足时,Memtable 中数据会一直不会写到磁盘,这也不合适,所以设置了一个时间限制,当超过这个时间长度时 Memtable 中的数据也会被写到磁盘中。

所以 Memtable 中的数据何时被写到写到磁盘是由这三个值决定,任何一个条件满足都会写到磁盘。

ConcurrentReads 和 ConcurrentWrites

这 两个是定义 Cassandra 用来处理 read 和 write 的线程池中线程的个数,根据当前的测试结果,读写的性能大慨是 1:10,适当的设置这两个值不仅要根据读写的性能,还要参考当前机器的处理性能。当机器的 load 很高,但是 cpu 的利用率却很低时,很明显是由于连接数过多,Cassandra 的已经处理不过来都处于等待状态。这样就可以适当增加读写的线程数,同样如果当读的请求大于写的请求时,也应该适当增加读的线程数,反之亦然。

CommitLogSync、CommitLogSyncPeriodInMS 和 CommitLogSyncBatchWindowInMS

我们知道 Cassandra 是先写到 CommitLog 中再写到 Memtable 和磁盘中。如果每写一条数据都要写一次到磁盘那样性能将会大打折扣。Cassandra 为了提高写 CommitLog 的性能提供了两种写的方式。

  1. Periodic。周期性的把 CommitLog 数据写到磁盘中,这个时间周期由 CommitLogSyncPeriodInMS 指定,默认是 10000MS, 如果是这种方式,可想而知 Cassandra 并不能完全保证写到 Cassandra 的数据不会丢失,最坏的情况就是在这个时间段的数据会被丢失,但是 Cassandra 的解释是通过数据的多个备份,来能提高安全性。但是如果是单机存储数据,最坏的情况仍然会丢失 10000MS 时间段写入的数据。可以说这种方式写 CommitLog 是完全的异步的方式。
  2. Batch。这种方式是等待数据被写到磁盘中才会返回,与前面相比安全性会得到保证,它能保证 100% 数据的正确性。但也并不是每写一条数据都立即写到磁盘中,而是有一个延迟时间,这个延迟时间就是由 CommitLogSyncBatchWindowInMS 指定的,也就是写一条数据到 CommitLog 的最大时间是 CommitLogSyncBatchWindowInMS 指定的时间,理想的时间范围是 0.1~10MS 之间。这个时间既要平衡客户端的相应时间也要考虑服务器写数据到磁盘的性能。

这两种方式各有好处,如果数据是存储在有多个备份的集群中,第一种情况下,丢数据的情况几乎为零,但是性能肯定会比第二种要好很多。如果是单机情况下,要保证数据的安全性第二种较合适。

GCGraceSeconds

这 个配置项不是 Java 中的 gc 回收内存,但是其功能类似于 jvm 中 gc,它也是回收已经没有被关联的数据,例如已经被标识为删除的数据,Cassandra 处理数据有点奇怪,即使数据被标识为删除,但是只要是没有超过 GCGraceSeconds 的时间这个数据仍然是存在的,也就是可以定制数据的实效时间,超出这个时间数据将会被回收。

Cassandra 的启动过程

Cassandra 的功能模块

按照我的理解我将 Cassandra 的功能模块划分为三个部分:

  1. 客户端协议解析。目前这个版本 Cassandra 支持两个客户端 avro 和 thrift,使用的较多的是后者,它们都是通过 socket 协议作为网络层协议,然后再包装一层应用层协议,这个应用层协议的包装和解析都是由它们的客户端和相应的服务端模块来完成的。这样设计的目的是解决多种多 样的客户端的连接方式,既可以是短连接也可以是长连接。既可以是 Java 程序调用也可以是 PHP 调用或者多种其它编程语言都可以调用。
  2. 集群 Gossip 协议。集群中节点之间相互通信是通过 Gossip 协议来完成的,它的实现都在 org.apache.cassandra.gms.Gossiper 类中。它的主要作用就是每个节点向集群中的其它节点发送心跳,心跳携带的信息是本身这个节点持有的其它节点的状态信息包括本节点的状态,如果发现两边的状 态信息不是不一致,则会用最新的状态信息替换,同时通过心跳来判断某个节点是否还在线,把这种状态变化通知感兴趣的事件监听者,以做出相应的修改,包括新 增节点、节点死去、节点复活等。除了维护节点状态信息外,还需做另外一些事,如集群之间的数据的转移,这些数据包括:读取的数据、写入的数据、状态检查的 数据、修复的数据等等。
  3. 数据的存储。数据的存储包括,内存中数据的组织形式,它又包括 CommitLog 和 Memtable。磁盘的数据组织方式,它又包括 date、filter 和 index 的数据。

其它剩下的就是如何读取和操作这些数据了,可以用下图来描述 Cassandra 是如何工作的:

图 3. Cassandra 的工作模型

图 3. Cassandra 的工作模型

Cassandra 的启动过程

这里将详细介绍 Cassandra 的启动过程。Cassandra 的启动过程大慨分为下面几个阶段:

storage-config.xml 配置文件的解析

配 置文件的读取和解析都是在 org.apache.cassandra.config.DatabaseDescriptor 类中完成的,这个类的作用非常简单,就是读取配置文件中各个配置项所定义的值,经过简单的验证,符合条件就将其值赋给 DatabaseDescriptor 的私有静态常量。值得注意的是关于 Keyspace 的解析,按照 ColumnFamily 的配置信息构建成 org.apache.cassandra.config.CFMetaData 对象,最后把这些所有 ColumnFamily 放入 Keyspace 的 HashMap 对象 org.apache.cassandra.config.KSMetaData 中,每个 Keyspace 就是一个 Table。这些信息都是作为基本的元信息,可以通过 DatabaseDescriptor 类直接获取。DatabaseDescriptor 类相关的类结构如下图 4 所示:

图 4. DatabaseDescriptor 类相关的类结构

图 4. DatabaseDescriptor 类相关的类结构创建每个 Table 的实例

创 建 Table 的实例将完成:1)获取该 Table 的元信息 TableMatedate。2)创建改 Table 下每个 ColumnFamily 的存储操作对象 ColumnFamilyStore。3)启动定时程序,检查该 ColumnFamily 的 Memtable 设置的 MemtableFlushAfterMinutes 是否已经过期,过期立即写到磁盘。与 Table 相关的类如图 5 所示:

图 5. Table 相关的类图

图 5. Table 相关的类图一 个 Keyspace 对应一个 Table,一个 Table 持有多个 ColumnFamilyStore,而一个 ColumnFamily 对应一个 ColumnFamilyStore。Table 并没有直接持有 ColumnFamily 的引用而是持有 ColumnFamilyStore,这是因为 ColumnFamilyStore 类中不仅定义了对 ColumnFamily 的各种操作而且它还持有 ColumnFamily 在各种状态下数据对象的引用,所以持有了 ColumnFamilyStore 就可以操作任何与 ColumnFamily 相关的数据了。与 ColumnFamilyStore 相关的类如图 6 所示

图 6. ColumnFamilyStore 相关的类

图 6. ColumnFamilyStore 相关的类CommitLog 日志恢复

这里主要完成这几个操作,发现是否有没有被写到磁盘的数据,恢复这个数据,构建新的日志文件。CommitLog 日志文件的恢复策略是,在头文件中发现没有被序列化的最新的

ColumnFamily Id,然后取出这个这个被序列化 RowMutation 对象的起始地址,反序列化成为 RowMutation 对象,后面的操作和新添一条数据的流程是一样的,如果这个 RowMutation 对象中的数据被成功写到磁盘中,那么会在 CommitLog 去掉已经被持久化的 ColumnFamily Id。关于 CommitLog 日志文件的存储格式以及数据如何写到 CommitLog 文件中,将在后面第三部分详细介绍。

启动存储服务

这里是启动过程中最重要的一步。这里将会启动一系列服务,主要包括如下步骤。

  1. 创建 StorageMetadata。StorageMetadata 将包含三个关键信息:本节点的 Token、当前 generation 以及 ClusterName,Cassandra 判断如果是第一次启动,Cassandra 将会创建三列分别存储这些信息并将它们存在在系统表的 LocationInfo ColumnFamily 中,key 是“L”。如果不是第一次启动将会更新这三个值。这里的 Token 是判断用户是否指定,如果指定了使用用户指定的,否则随机生成一个 Token。但是这个 Token 有可能在后面被修改。这三个信息被存在 StorageService 类的 storageMetadata_ 属性中,以便后面随时调用。
  2. GCInspector.instance.start 服务。主要是统计统计当前系统中资源的使用情况,将这个信息记录到日志文件中,这个可以作为系统的监控日志使用。
  3. 启动消息监听服务。这个消息监听服务就是监听整个集群中其它节点发送到本节点的所有消息,Cassandra 会根据每个消息的类型,做出相应的反应。关于消息的处理将在后面详细介绍。
  4. StorageLoadBalancer.instance.startBroadcasting 服务。这个服务是每个一段时间会收集当前这个节点所存的数据总量,也就是节点的 load 数据。把这个数据更新到本节点的 ApplicationState 中,然后就可以通过这个 state 来和其它节点交换信息。这个 load 信息在数据的存储和新节点加入的时候,会有参考价值。
  5. 启动 Gossiper 服务。在启动 Gossiper 服务之前,将 StorageService 注册为观察者,一旦节点的某些状态发生变化,而这些状态是 StorageService 感兴趣的,StorageService 的 onChange 方法就会触发。Gossiper 服务就是一个定时程序,它会向本节点加入一个 HeartBeatState 对象,这个对象标识了当前节点是 Live 的,并且记录当前心跳的 generation 和 version。这个 StorageMetadata 和前面的 StorageMetadata 存储的 generation 是一致的,version 是从 0 开始的。这个定时程序每隔一秒钟随机向 seed 中定义的节点发送一个消息,而这个消息是保持集群中节点状态一致的唯一途径。这个消息如何同步,将在后面详细介绍。
  6. 判断启动模式。是否是 AutoBootstrap 模式启动,又是如何判断的,以及应作出那些相应的操作,在前面的第一部分中已有介绍,这里不再赘述。这里主要说一下,当是 Bootstrap 模式启动时,Cassandra 都做了那些事情。这一步很重要,因为它关系到后面的很多操作,对 Cassandra 的性能也会有影响。

这个过程如下:

  1. 通过之前的消息同步获取集群中所有节点的 load 信息
  2. 找出 load 最大的节点的 ip 地址
  3. 向这个节点发送消息,获取其一半 key 范围所对应的 Token,这个 Token 是前半部分值。
  4. 将这个 Token 写到本地节点
  5. 本地节点会根据这个 Token 计算以及集群中的 Token 环,计算这个 Token 应该分摊集群中数据的一个范围(range)这个环应该就是,最大 load 节点的一半 key 的所对应的 range。
  6. 向这个 range 所在的节点请求数据。发送 STREAM-STAGE 类型的消息,要经过 STREAM_REQUEST、STREAM_INITIATE、STREAM_INITIATE_DONE、STREAM_FINISHED 几次握手,最终才将正确的数据传输到本节点。
  7. 数据传输完成时设置 SystemTable.setBootstrapped(true) 标记 Bootstrap 已经启动,这个标记的目的是防止再次重启时,Cassandra 仍然会执行相同的操作。

这个过程可以用下面的时序图来描述:

图 7. StorageService 服务启动时序图

图 7. StorageService 服务启动时序图以上是 AutoBootstrap 模式启动,如果是以非 AutoBootstrap 模式启动,那么启动将会非常简单,这个过程如下:

  1. 检查配置项 InitialToken 有没有指定,如果指定了初始 Token,使用用户指定的 Token,否则将根据 Partitioner 配置项指定的数据分配策略生成一个默认的 Token,并把它写到系统表中。
  2. 更新 generation=generation+1 到系统表中
  3. 设置 SystemTable.setBootstrapped(true),标记启动方式,防止用户再修改 AutoBootstrap 的启动模式。

Cassandra 集群中的节点状态的同步策略

我 们知道 Cassandra 集群中节点是通过自治理来对外提供服务的,它不像 Hadoop 这种 Master/Slave 形式的集群结构,会有一个主服务节点来管理所有节点中的原信息和对外提供服务的负载均衡。这种方式管理集群中的节点逻辑上比较简单也很方便,但是也有其弱 点,那就是这个 Master 容易形成瓶颈,其稳定性也是一种挑战。而 Cassandra 的集群管理方式就是一种自适应的管理方式,集群中的节点没有 Master、Slave 之分,它们都是平等的,每个节点都可以单独对外提供服务,某个节点 Crash 也不会影响到其它节点。但是一旦某个节点的状态发生变化,整个集群中的所有节点都要知道,并且都会执行预先设定好的应对方案,这会造成节点间要发送大量的 消息交换各自状态,这样也增加了集群中状态和数据一致性的复杂度,但是优点是它是一个高度自治的组织,健壮性比较好。

消息交换

那么 Cassandra 是如何做到这么高度自治的呢?这个问题的关键就是它们如何同步各自的状态信息,同步消息的前提是它们有一种约定的消息交换机制。这个机制就是 Gossip 协议,Cassandra 就是通过 Gossip 协议相互交换消息。

前 面在 Cassandra 服务启动时提到了 Gossiper 服务的启动,一旦 Cassandra 启动成功,Gossiper 服务就是一直执行下去,它是一个定时程序。这个服务的代码在 org.apache.cassandra.gms.Gossiper 类中,下面是定时程序执行的关键代码如清单 1 所示:

清单 1. Gossiper.GossipTimerTask.run
 public void run(){ 
   synchronized( Gossiper.instance ){ 
       endPointStateMap_.get(localEndPoint_).getHeartBeatState().updateHeartBeat(); 
       List<GossipDigest> gDigests = new ArrayList<GossipDigest>(); 
       Gossiper.instance.makeRandomGossipDigest(gDigests); 
       if ( gDigests.size() > 0 ){ 
          Message message = makeGossipDigestSynMessage(gDigests); 
          boolean gossipedToSeed = doGossipToLiveMember(message); 
          doGossipToUnreachableMember(message); 
          if (!gossipedToSeed || liveEndpoints_.size() < seeds_.size()) 
                       doGossipToSeed(message); 
                        doStatusCheck(); 
          } 
       } 
 }

Cassandra 通过向其它节点发送心跳来证明自己仍然是活着的,心跳里面包含有当前的 generation,用来表示有的节点是不是死了又复活的。

本 地节点所保存的所有其它节点的状态信息都被放在了 GossipDigest 集合中。一个 GossipDigest 对象将包含这个节点的 generation、maxVersion 和节点地址。接着将会组装一个 Syn 消息(关于 Cassandra 中的消息格式将在后面介绍),同步一次状态信息 Cassandra 要进行三次会话,这三次会话分别是 Syn、Ack 和 Ack2。当组装成 Syn 消息后 Cassandra 将随机在当前活着的节点列表中选择一个向其发送消息。

Cassandra 中的消息格式如下:

  1. header:消息头 org.apache.cassandra.net.Header,消息头中包含五个属性:消息编号(messageId)、发送方地址(from)、消息类型(type)、所要做的动作(verb)和一个 map 结构(details)
  2. body:消息内容,是一个 byte 数组,用来存放序列化的消息主体。

可以用下面的图 8 更形象的表示:

图 8. message 消息结构

图 8. message 消息结构当组装成一个 message 后,再将这个消息按照 Gossip 协议组装成一个 pocket 发送到目的地址。关于这个 pocket 数据包的结构如下:

  1. header:包头,4 bytes。前两个是 serializer type;第三个是是否压缩包,默认是否;最后一个 byte 表示是否是 streaming mode。
  2. body:包体,message 的序列化字节数据。

这个 pocket 的序列化字节结构如下:

图 9. 通信协议包的结构

图 9. 通信协议包的结构当 另外一个节点接受到 Syn 消息后,反序列化 message 的 byte 数组,它会取出这个消息的 verb 执行相应的动作,Syn 的 verb 就是解析出发送节点传过来的节点的状态信息与本地节点的状态信息进行比对,看哪边的状态信息更新,如果发送方更新,将这个更新的状态所对应的节点加入请求 列表,如果本地更新,则将本地的状态再回传给发送方。回送的消息是 Ack,当发送方接受到这个 Ack 消息后,将接受方的状态信息更新的本地对应的节点。再将接收方请求的节点列表的状态发送给接受方,这个消息是 Ack2,接受方法接受到这个 Ack2 消息后将请求的节点的状态更新到本地,这样一次状态同步就完成了。

不管是发送方还是接受方每当节点的状态发生变化时都将通知感兴趣的观察者做出相应的反应。消息同步所涉及到的类由下面图 10 的关系图表示:

图 10. 节点状态同步相关类结构图

图 10. 节点状态同步相关类结构图节点的状态同步操作有点复杂,如果前面描述的还不是很清楚的话,再结合下面的时序图,你就会更加明白了,如图 11 所示:

图 11. 节点状态同步时序图

图 11. 节点状态同步时序图上图中省去了一部分重复的消息,还有节点是如何更新状态也没有在图中反映出来,这些部分在后面还有介绍,这里也无法完整的描述出来。

状态更新

前 面提到了消息的交换,它的目的就是可以根据交换的信息更新各自的状态。Cassandra 更新状态是通过观察者设计模式来完成的,订阅者被注册在 Gossiper 的集合中,当交换的消息中的节点的状态和本地节点不一致时,这时就会更新本地状态,更改本地状态本身并没有太大的意义,有意义的是状态发生变化这个动作, 这个动作发生时,就会通知订阅者来完成这个状态发生变化后应该做出那些相应的改动,例如,发现某个节点已经不在集群中时,那么对这个节点应该要在本地保存 的 Live 节点列表中移去,防止还会有数据发送到这个无法到达的节点。和状态相关的类如下:

图 12. 更新状态相关的类

图 12. 更新状态相关的类从 上图可以看出节点的状态信息由 ApplicationState 表示,并保存在 EndPointState 的集合中。状态的修改将会通知 IendPointStateChangeSubscriber,继而再更新 Subscriber 的具体实现类修改相应的状态。

下面是新节点加入的时序图,如图 13 所示:

图 13. 新加入节点的时序图

图 13. 新加入节点的时序图上 图基本描述了 Cassandra 更新状态的过程,需要说明的点是,Cassandra 为何要更新节点的状态,这实际上就是关于 Cassandra 对集群中节点的管理,它不是集中管理的方式,所以每个节点都必须保存集群中所有其它节点的最新状态,所以将本节点所持有的其它节点的状态与另外一个节点交 换,这样做有一个好处就是,并不需要和某个节点通信就能从其它节点获取它的状态信息,这样就加快了获取状态的时间,同时也减少了集群中节点交换信息的频 度。另外,节点状态信息的交换的根本还是为了控制集群中 Cassandra 所维护的一个 Token 环,这个 Token 是 Cassandra 集群管理的基础。因为数据的存储和数据流动都在这个 Token 环上进行,一旦环上的节点发生变化,Cassandra 就要马上调整这个 Token 环,只有这样才能始终保持整个集群正确运行。

到底哪些状态信息对整个集群是重要的,这个在 TokenMetadata 类中,它主要记录了当前这个集群中,哪些节点是 live 的哪些节点现在已经不可用了,哪些节点可能正在启动,以及每个节点它们的 Token 是多少。而这些信息都是为了能够精确控制集群中的那个 Token 环。只要每个集群中每个节点所保存的是同一个 Token 环,整个集群中的节点的状态就是同步的,反之,集群中节点的状态就没有同步。

当然 Cassandra 用这种集群管理方式有其优点,但也存在一些缺点。例如现在部分使用者在大规模集群(上千台服务器)的使用中发现不太稳定,这个跟 gossip 协议的本身也有关,所以这是 Cassandra 社区要致力解决的问题。

总结

本 文从配置文件开始介绍了 Cassandra 的启动过程,以及 Cassandra 是如何管理集群的。实际上 Cassandra 的启动和集群的管理是连在一起的,启动过程中的很多步骤都是集群管理的一部分,如节点以 AutoBootstrap 方式启动,在启动过程中就涉及到数据的重新分配,这个分配的过程正是在动态调整集群中 Token 环的过程。所以当你掌握了 Cassandra 是如何动态调整这个 Token 环,你也就掌握了 Cassandra 的集群是如何管理的了。下一篇将详细介绍 Cassandra 内部是如何组织数据和操作数据。

 

同类文章:

cassandra入门

Cassandra 分布式数据库 数据结构与数据读写

Cassandra 分布式数据库 配置、启动与集群

Apache Cassandra 数据库

分布式Key-Value存储系统Cassandra入门