标签归档:cassandra

cassandra入门

Cassandra是一套开源分布式NoSQL数据库系统。它最初由Facebook开发,用于储存收件箱等简单格式数据,集GoogleBigTable的数据模型与Amazon Dynamo的完全分布式的架构于一身Facebook于2008将 Cassandra 开源,此后,由于Cassandra良好的可扩放性,被Digg、Twitter等知名Web 2.0网站所采纳,成为了一种流行的分布式结构化数据存储方案。
Cassandra是一个混合型的非关系的数据库,类似于Google的BigTable。其主要功能比Dynamo (分布式的Key-Value存储系统)更丰富,但支持度却不如文档存储MongoDB(介于关系数据库和非关系数据库之间的开源产品,是非关系数据库当中功能最丰富,最像关系数据库的。支持的数据结构非常松散,是类似json的bjson格式,因此可以存储比较复杂的数据类型)。Cassandra最初由Facebook开发,后转变成了开源项目。它是一个网络社交云计算方面理想的数据库。以Amazon专有的完全分布式的Dynamo为基础,结合了Google BigTable基于列族(Column Family)的数据模型。P2P去中心化的存储。很多方面都可以称之为Dynamo 2.0

网站地址:http://cassandra.apache.org/

以下资料来源:http://asyty.iteye.com/blog/1202072

一、Cassandra框架

cassandra-001-1

图1 Cassandra

Cassandra是社交网络理想的数据库,适合于实时事务处理和提供交互型数据。以Amazon的完全分布式的Dynamo为基础,结合了Google BigTable基于列族(Column Family)的数据模型,P2P去中心化的存储,目前twitter和digg中都有使用。

在CAP特性上,HBase选择了CP,Cassandra更倾向于AP,而在一致性上有所减弱。

Cassandra的类Dynamo特性有以下几点:

l 对称的,P2P架构

n 无特殊节点,无单点故障

l 基于Gossip的分布式管理

l 通过分布式hash表放置数据

n 可插拔的分区

n 可插拔的拓扑发现

n 可配置的放置策略

l 可配置的,最终一致性

 

类BigTable特性:

l 列族数据模型

n 可配置,2级maps,Super Colum Family

l SSTable磁盘存储

n Append-only commit log

n Mentable (buffer and sort)

n 不可修改的SSTable文件

l 集成Hadoop

二、 Cassandra数据模型

Colum / Colum Family, SuperColum / SuperColum Family

Column是数据增量最底层(也就是最小)的部分。它是一个包含名称(name)、值(value)和时间戳(timestamp)的三重元组。

下面是一个用JSON格式表示的column:

{ // 这是一个Column

name: "emailAddress",

value: "arin@example.com",

timestamp: 123456789

}

需要注意的是,name和value都是二进制的(技术上指byte[]),并且可以是任意长度。

与HBase相比,除了Colum/Colum Family外,Cassandra还支持SuperColum/SuperColum Family。

SuperColum与Colum的区别就是,标准Column的value是一个“字符串”,而 SuperColumn的value是一个包含多个Column的map,另一个细微的差别是:SuperColumn没有时间戳。

{ // 这是一个SuperColumn

name: "homeAddress",

// 无限数量的Column

value: {

street: {name: "street", value: "1234 x street", timestamp: 123456789},

city: {name: "city", value: "san francisco", timestamp: 123456789},

zip: {name: "zip", value: "94107", timestamp: 123456789},

}

}

Column Family(CF)是某个特定Key的Colum集合,是一个行结构类型,每个CF物理上被存放在单独的文件中。从概念上看,CF像数据库中的Table。

SuperColum Family概念上和Column Family(CF)相似,只不过它是Super Colum的集合。

Colum排序

不同于数据库可以通过Order by定义排序规则,Cassandra取出的数据顺序是总是一定的,数据保存时已经按照定义的规则存放,所以取出来的顺序已经确定了。另外,Cassandra按照column name而不是column value来进行排序。

Cassandra可以通过Colum Family的CompareWith属性配置Colume值的排序,在SuperColum中,则是通过SuperColum Family的CompareSubcolumnsWith属性配置Colum的排序。

Cassandra提供了以下一些选:BytesType,UTF8Type,LexicalUUIDType,TimeUUIDType,AsciiType, Column name识别成为不同的类型,以此来达到灵活排序的目的。

三、分区策略

Token,Partitioner

Cassandra中,Token是用来分区数据的关键。每个节点都有一个第一无二的Token,表明该节点分配的数据范围。节点的Token形成一个Token环。例如使用一致性HASH进行分区时,键值对将根据一致性Hash值来判断数据应当属于哪个Token。

cassandra-002-2

 Token Ring

 

分区策略的不同,Token的类型和设置原则也有所不同。 Cassandra (0.6版本)本身支持三种分区策略:

RandomPartitioner:随机分区是一种hash分区策略,使用的Token是大整数型(BigInteger),范围为0~2^127,Cassandra采用了MD5作为hash函数,其结果是128位的整数值(其中一位是符号位,Token取绝对值为结果)。因此极端情况下,一个采用随机分区策略的Cassandra集群的节点可以达到2^127+1个节点。采用随机分区策略的集群无法支持针对Key的范围查询。

OrderPreservingPartitioner:如果要支持针对Key的范围查询,那么可以选择这种有序分区策略。该策略采用的是字符串类型的Token。每个节点的具体选择需要根据Key的情况来确定。如果没有指定InitialToken,则系统会使用一个长度为16的随机字符串作为Token,字符串包含大小写字符和数字。

CollatingOrderPreservingPartitioner:和OrderPreservingPartitioner一样是有序分区策略。只是排序的方式不一样,采用的是字节型Token,支持设置不同语言环境的排序方式,代码中默认是en_US。

分区策略和每个节点的Token(Initial Token)都可以在storage-conf.xml配置文件中设置。

 

bloom-filter, HASH

Bloom Filter是一种空间效率很高的随机数据结构,本质上就是利用一个位数组来表示一个集合,并能判断一个元素是否属于这个集合。Bloom Filter的这种高效是有误差的:在判断一个元素是否属于某个集合时,有可能会把不属于这个集合的元素误认为属于这个集合(false positive)。因此,Bloom Filter不适合那些“零错误”的应用场合,而在能容忍低错误率的场合下,Bloom Filter通过极少的错误换取了存储空间的极大节省。

原理:位数组 + K个独立hash(y)函数。将位数组中hash函数对应的值的位置设为1,查找时如果发现所有hash函数对应位都是1说明存在,很明显这个过程并不保证查找的结果是完全正确的。

 

在Cassandra中,每个键值对使用1Byte的位数组来实现bloom-filter。

cassandra-003

Bloom Filter

四、副本存储

Cassandra不像HBase是基于HDFS的分布式存储,它的数据是存在每个节点的本地文件系统中。

Cassandra有三种副本配置策略:

1) SimpleStrategy RackUnawareStrategy

副本不考虑机架的因素,按照Token放置在连续下几个节点。如图3所示,假如副本数为3,属于A节点的数据在B.C两个节点中也放置副本。

2) OldNetworkTopologyStrategy RackAwareStrategy:

考虑机架的因素,除了基本的数据外,先找一个处于不同数据中心的点放置一个副本,其余N-2个副本放置在同一数据中心的不同机架中。

3) NetworkTopologyStrategy DatacenterShardStrategy

将M个副本放置到其他的数据中心,将N-M-1的副本放置在同一数据中心的不同机架中。

五、网络嗅探

网络嗅探主要用来计算不同host的相对距离,进而告诉Cassandra网络拓扑结构,以便更高效地对用户请求进行路由。主要有三种配置策略:

1) org.apache.cassandra.locator.SimpleSnitch

将不同host逻辑上的距离(Cassandra Ring)作为他们之间的相对距离。

2) org.apache.cassandra.locator.RackInferringSnitch:

相对距离是由rack和data center决定的,分别对应ip的第3和第2个八位组。即,如果两个节点的ip的前3个八位组相同,则认为它们在同一个rack(同一个rack中不同节点,距离相同);如果两个节点的ip的前两个八位组相同,则认为它们在同一个数据中心(同一个data center中不同节点,距离相同)。

3) org.apache.cassandra.locator.PropertyFileSnitch:

相对距离是由rack和data center决定的,且它们是在配置文件cassandra-topology.properties中设置的。

六、一致性

在一致性上,Cassandra采用了最终一致性,可以根据具体情况来选择一个最佳的折衷,来满足特定操作的需求。Cassandra可以让用户指定读/插入/删除操作的一致性级别,一致性级别有多种,如图5所示。

 

cassandra-005-2

图5 Cassandra一致性级别

 

注:一致性级别是由副本数决定,而不是集群的节点数目决定。

 

Quorum NRW

  • N: 复制的节点数量,即副本数
  • R: 成功读操作的最小节点数
  • W: 成功写操作的最小节点数

Quorum协议中,R 代表一次成功的读取操作中最小参与节点数量,W 代表一次成功的写操作中最小参与节点数量。R + W>N ,则会产生类似quorum 的效果。该模型中的读(写)延迟由最慢的 R(W)复制决定,为得到比较小的延迟,R和W有的时候的和比N小。

Quorum协议中,只需W + R > N,就可以保证强一致性。因为读取数据的节点和被同步写入的节点是有重叠的。在一个RDBMS的复制模型中(Master/salve),假如N=2,那么W=2,R=1此时是一种强一致性,但是这样造成的问题就是可用性的减低,因为要想写操作成功,必须要等 2个节点的写操作都完成以后才可以。

在分布式系统中,一般都要有容错性,因此N一般大于3的,此时根据CAP理论,我们就需要在一致性和分区容错性之间做一平衡,如果要高的一致性,那么就配置N=W,R=1,这个时候可用性就会大大降低。如果想要高的可用性,那么此时就需要放松一致性的要求,此时可以配置W=1,这样使得写操作延迟最低,同时通过异步的机制更新剩余的N-W个节点。

当存储系统保证最终一致性时,存储系统的配置一般是W+R<=N,此时读取和写入操作是不重叠的,不一致性的窗口就依赖于存储系统的异步实现方式,不一致性的窗口大小也就等于从更新开始到所有的节点都异步更新完成之间的时间。

一般来说,Quorum中比较典型的NRW为(3,2,2)。

维护最终一致性

Cassandra 通过4个技术来维护数据的最终一致性,分别为逆熵(Anti-Entropy),读修复(Read Repair),提示移交(Hinted Handoff)和分布式删除。

1) 逆熵

这是一种备份之间的同步机制。节点之间定期互相检查数据对象的一致性,这里采用的检查不一致的方法是 Merkle Tree;

2) 读修复

客户端读取某个对象的时候,触发对该对象的一致性检查:

读取Key A的数据时,系统会读取Key A的所有数据副本,如果发现有不一致,则进行一致性修复。

如果读一致性要求为ONE,会立即返回离客户端最近的一份数据副本。然后会在后台执行Read Repair。这意味着第一次读取到的数据可能不是最新的数据;如果读一致性要求为QUORUM,则会在读取超过半数的一致性的副本后返回一份副本给客户端,剩余节点的一致性检查和修复则在后台执行;如果读一致性要求高(ALL),则只有Read Repair完成后才能返回一致性的一份数据副本给客户端。可见,该机制有利于减少最终一致的时间窗口。

3) 提示移交

对写操作,如果其中一个目标节点不在线,先将该对象中继到另一个节点上,中继节点等目标节点上线再把对象给它:

Key A按照规则首要写入节点为N1,然后复制到N2。假如N1宕机,如果写入N2能满足ConsistencyLevel要求,则Key A对应的RowMutation将封装一个带hint信息的头部(包含了目标为N1的信息),然后随机写入一个节点N3,此副本不可读。同时正常复制一份数据到N2,此副本可以提供读。如果写N2不满足写一致性要求,则写会失败。 等到N1恢复后,原本应该写入N1的带hint头的信息将重新写回N1。

4) 分布式删除

单机删除非常简单,只需要把数据直接从磁盘上去掉即可,而对于分布式,则不同,分布式删除的难点在于:如果某对象的一个备份节点 A 当前不在线,而其他备份节点删除了该对象,那么等 A 再次上线时,它并不知道该数据已被删除,所以会尝试恢复其他备份节点上的这个对象,这使得删除操作无效。Cassandra 的解决方案是:本地并不立即删除一个数据对象,而是给该对象标记一个hint,定期对标记了hint的对象进行垃圾回收。在垃圾回收之前,hint一直存在,这使得其他节点可以有机会由其他几个一致性保证机制得到这个hint。Cassandra 通过将删除操作转化为一个插入操作,巧妙地解决了这个问题。

七、存储机制

Cassandra的存储机制借鉴了Bigtable的设计,采用Memtable和SSTable的方式。

CommitLog

和HBase一样,Cassandra在写数据之前,也需要先记录日志,称之为Commit Log,然后数据才会写入到Column Family对应的MemTable中,且MemTable中的数据是按照key排序好的。SSTable一旦完成写入,就不可变更,只能读取。下一次Memtable需要刷新到一个新的SSTable文件中。所以对于Cassandra来说,可以认为只有顺序写,没有随机写操作。

MenTable

MemTable是一种内存结构,当数据量达到块大小时,将批量flush到磁盘上,存储为SSTable。这种机制,相当于缓存写回机制(Write-back Cache),优势在于将随机IO写变成顺序IO写,降低大量的写操作对于存储系统的压力。所以我们可以认为Cassandra中只有顺序写操作,没有随机写操作。

SSTable

SSTable是Read Only的,且一般情况下,一个CF会对应多个SSTable,当用户检索数据时,Cassandra使用了Bloom Filter,即通过多个hash函数将key映射到一个位图中,来快速判断这个key属于哪个SSTable。

为了减少大量SSTable带来的开销,Cassandra会定期进行compaction,简单的说,compaction就是将同一个CF的多个SSTable合并成一个SSTable。在Cassandra中,compaction主要完成的任务是:

1) 垃圾回收: cassandra并不直接删除数据,因此磁盘空间会消耗得越来越多,compaction 会把标记为删除的数据真正删除;

2) 合并SSTable:compaction 将多个 SSTable 合并为一个(合并的文件包括索引文件,数据文件,bloom filter文件),以提高读操作的效率;

3) 生成 MerkleTree:在合并的过程中会生成关于这个 CF 中数据的 MerkleTree,用于与其他存储节点对比以及修复数据。

详细存储数据结构参考 http://www.ibm.com/developerworks/cn/opensource/os-cn-cassandraxu2

单体、模块化

Cassandra和HBase的一个重要区别是, Cassandra在每个节点是是一个单 Java 进程,而完整的HBase 解决方案却由不同部分组成:有数据库进程本身,它可能会运行在多个模式;一个配置好的 hadoop HDFS 分布式文件系统,以及一个 Zookeeper 系统来协调不同的 HBase 进程。

 

同类文章:

cassandra入门

Cassandra 分布式数据库 数据结构与数据读写

Cassandra 分布式数据库 配置、启动与集群

Apache Cassandra 数据库

分布式Key-Value存储系统Cassandra入门

Cassandra 分布式数据库 数据结构与数据读写

Cassandra 的数据存储结构

Cassandra 中的数据主要分为三种:

  1. CommitLog:主要记录下客户端提交过来的数据以及操作。这个数据将被持久化到磁盘中,以便数据没有被持久化到磁盘时可以用来恢复。
  2. Memtable:用户写的数据在内存中的形式,它的对象结构在后面详细介绍。其实还有另外一种形式是 BinaryMemtable 这个格式目前 Cassandra 并没有使用,这里不再介绍了。
  3. SSTable:数据被持久化到磁盘,这又分为 Data、Index 和 Filter 三种数据格式。

CommitLog 数据格式

CommitLog 的数据只有一种,那就是按照一定格式组成 byte 组数,写到 IO 缓冲区中定时的被刷到磁盘中持久化,在上一篇的配置文件详解中已经有说到 CommitLog 的持久化方式有两种,一个是 Periodic 一个是 Batch,它们的数据格式都是一样的,只是前者是异步的,后者是同步的,数据被刷到磁盘的频繁度不一样。关于 CommitLog 的相关的类结构图如下:

图 1. CommitLog 的相关的类结构图

图 1. CommitLog 的相关的类结构图它 持久化的策略也很简单,就是首先将用户提交的数据所在的对象 RowMutation 序列化成 byte 数组,然后把这个对象和 byte 数组传给 LogRecordAdder 对象,由 LogRecordAdder 对象调用 CommitLogSegment 的 write 方法去完成写操作,这个 write 方法的代码如下:

清单 1. CommitLogSegment. write
public CommitLogSegment.CommitLogContext write(RowMutation rowMutation, 
 Object serializedRow){ 
        long currentPosition = -1L; 
 ... 
            Checksum checkum = new CRC32(); 
            if (serializedRow instanceof DataOutputBuffer){ 
                DataOutputBuffer buffer = (DataOutputBuffer) serializedRow; 
                logWriter.writeLong(buffer.getLength()); 
                logWriter.write(buffer.getData(), 0, buffer.getLength()); 
                checkum.update(buffer.getData(), 0, buffer.getLength()); 
            } 
            else{ 
                assert serializedRow instanceof byte[]; 
                byte[] bytes = (byte[]) serializedRow; 
                logWriter.writeLong(bytes.length); 
                logWriter.write(bytes); 
                checkum.update(bytes, 0, bytes.length); 
            } 
            logWriter.writeLong(checkum.getValue()); 
 ... 
 }

这个代码的主要作用就是如果当前这个根据 columnFamily 的 id 还没有被序列化过,将会根据这个 id 生成一个 CommitLogHeader 对象,记录下在当前的 CommitLog 文件中的位置,并将这个 header 序列化,覆盖以前的 header。这个 header 中可能包含多个没有被序列化到磁盘中的 RowMutation 对应的 columnFamily 的 id。如果已经存在,直接把 RowMutation 对象的序列化结果写到 CommitLog 的文件缓存区中后面再加一个 CRC32 校验码。Byte 数组的格式如下:

图 2. CommitLog 文件数组结构

图 2. CommitLog 文件数组结构上图中每个不同的 columnFamily 的 id 都包含在 header 中,这样做的目的是更容易的判断那些数据没有被序列化。

CommitLog 的作用是为恢复没有被写到磁盘中的数据,那如何根据 CommitLog 文件中存储的数据恢复呢?这段代码在 recover 方法中:

清单 2. CommitLog.recover
 public static void recover(File[] clogs) throws IOException{ 
 ... 
         final CommitLogHeader clHeader = CommitLogHeader.readCommitLogHeader(reader); 
         int lowPos = CommitLogHeader.getLowestPosition(clHeader); 
            if (lowPos == 0) break; 
            reader.seek(lowPos); 
            while (!reader.isEOF()){ 
                try{ 
                    bytes = new byte[(int) reader.readLong()]; 
                    reader.readFully(bytes); 
                    claimedCRC32 = reader.readLong(); 
                } 
 ... 
                ByteArrayInputStream bufIn = new ByteArrayInputStream(bytes); 
                Checksum checksum = new CRC32(); 
                checksum.update(bytes, 0, bytes.length); 
                if (claimedCRC32 != checksum.getValue()){continue;} 
            final RowMutation rm = 
              RowMutation.serializer().deserialize(new DataInputStream(bufIn));
            } 
 ... 
 }

这段代码的思路是:反序列化 CommitLog 文件的 header 为 CommitLogHeader 对象,寻找 header 对象中没有被回写的最小 RowMutation 位置,然后根据这个位置取出这个 RowMutation 对象的序列化数据,然后反序列化为 RowMutation 对象,然后取出 RowMutation 对象中的数据重新保存到 Memtable 中,而不是直接写到磁盘中。CommitLog 的操作过程可以用下图来清楚的表示:

图 3. CommitLog 数据格式的变化过程

图 3. CommitLog 数据格式的变化过程

Memtable 内存中数据结构

Memtable 内存中数据结构比较简单,一个 ColumnFamily 对应一个唯一的 Memtable 对象,所以 Memtable 主要就是维护一个 ConcurrentSkipListMap<DecoratedKey, ColumnFamily> 类型的数据结构,当一个新的 RowMutation 对象加进来时,Memtable 只要看看这个结构是否 <DecoratedKey, ColumnFamily> 集合已经存在,没有的话就加进来,有的话取出这个 Key 对应的 ColumnFamily,再把它们的 Column 合并。Memtable 相关的类结构图如下:

图 4. Memtable 相关的类结构图

图 4. Memtable 相关的类结构图Memtable 中的数据会根据配置文件中的相应配置参数刷到本地磁盘中。这些参数在上一篇中已经做了详细说明。

前 面已经多处提到了 Cassandra 的写的性能很好,好的原因就是因为 Cassandra 写到数据首先被写到 Memtable 中,而 Memtable 是内存中的数据结构,所以 Cassandra 的写是写内存的,下图基本上描述了一个 key/value 数据是怎么样写到 Cassandra 中的 Memtable 数据结构中的。

图 5. 数据被写到 Memtable

图 5. 数据被写到 Memtable

SSTable 数据格式

每添加一条数据到 Memtable 中,程序都会检查一下这个 Memtable 是否已经满足被写到磁盘的条件,如果条件满足这个 Memtable 就会写到磁盘中。先看一下这个过程涉及到的类。相关类图如图 6 所示:

图 6. SSTable 持久化类结构图

图 6. SSTable 持久化类结构图Memtable 的条件满足后,它会创建一个 SSTableWriter 对象,然后取出 Memtable 中所有的 <DecoratedKey, ColumnFamily> 集合,将 ColumnFamily 对象的序列化结构写到 DataOutputBuffer 中。接下去 SSTableWriter 根据 DecoratedKey 和 DataOutputBuffer 分别写到 Date、Index 和 Filter 三个文件中。

Data 文件格式如下:

图 7. SSTable 的 Data 文件结构

图 7. SSTable 的 Data 文件结构Data 文件就是按照上述 byte 数组来组织文件的,数据被写到 Data 文件中是接着就会往 Index 文件中写,Index 中到底写什么数据呢?

其实 Index 文件就是记录下所有 Key 和这个 Key 对应在 Data 文件中的启示地址,如图 8 所示:

图 8. Index 文件结构

图 8. Index 文件结构Index 文件实际上就是 Key 的一个索引文件,目前只对 Key 做索引,对 super column 和 column 都没有建索引,所以要匹配 column 相对来说要比 Key 更慢。

Index 文件写完后接着写 Filter 文件,Filter 文件存的内容就是 BloomFilter 对象的序列化结果。它的文件结构如图 9 所示:

图 9. Filter 文件结构

图 9. Filter 文件结构BloomFilter 对象实际上对应一个 Hash 算法,这个算法能够快速的判断给定的某个 Key 在不在当前这个 SSTable 中,而且每个 SSTable 对应的 BloomFilter 对象都在内存中,Filter 文件指示 BloomFilter 持久化的一个副本。三个文件对应的数据格式可以用下图来清楚的表示:

图 10. SSTable 数据格式转化

图 10. SSTable 数据格式转化

在 Memtable 往磁盘中写的过程中,这个 Memtable 被放到 memtablesPendingFlush 容器中,以保证在读时候它里面存的数据能被正确读到,这个在后面数据读取时还会介绍。

数据的写入

数据要写到 Cassandra 中有两个步骤:

  1. 找到应该保存这个数据的节点
  2. 往这个节点写数据。客户端写一条数据必须指定 Keyspace、ColumnFamily、Key、Column Name 和 Value,还可以指定 Timestamp,以及数据的安全等级。

数据写入涉及的主要相关类如下图所示:

图 11. Insert 相关类图

图 11. Insert 相关类图大慨的写入逻辑是这样的:

CassandraServer 接收到要写入的数据时,首先创建一个 RowMutation 对象,再创建一个 QueryPath 对象,这个对象中保存了 ColumnFamily、Column Name 或者 Super Column Name。接着把用户提交的所有数据保存在 RowMutation 对象的 Map<String, ColumnFamily> 结构中。接下去就是根据提交的 Key 计算集群中那个节点应该保存这条数据。这个计算的规则是:将 Key 转化成 Token,然后在整个集群的 Token 环中根据二分查找算法找到与给定的 Token 最接近的一个节点。如果用户指定了数据要保存多个备份,那么将会顺序在 Token 环中返回与备份数相等的节点。这是一个基本的节点列表,后面 Cassandra 会判断这些节点是否正常工作,如果不正常寻找替换节点。还有还要检查是否有节点正在启动,这种节点也是要在考虑的范围内,最终会形成一个目标节点列表。最 后把数据发送到这些节点。

接下去就是将数据保存到 Memtable 中和 CommitLog 中,关于结果的返回根据用户指定的安全等级不同,可以是异步的,也可以是同步的。如果某个节点返回失败,将会再次发送数据。下图是当 Cassandra 接收到一条数据时到将数据写到 Memtable 中的时序图。

图 12. Insert 操作的时序图

图 12. Insert 操作的时序图

数据的读取

Cassandra 的写的性能要好于读的性能,为何写的性能要比读好很多呢?原因是,Cassandra 的设计原则就是充分让写的速度更快、更方便而牺牲了读的性能。事实也的确如此,仅仅看 Cassandra 的数据的存储形式就能发现,首先是写到 Memtable 中,然后将 Memtable 中数据刷到磁盘中,而且都是顺序保存的不检查数据的唯一性,而且是只写不删(删除规则在后面介绍),最后才将顺序结构的多个 SSTable 文件合并。这每一步难道不是让 Cassandra 写的更快。这个设计想想对读会有什么影响。首先,数据结构的复杂性,Memtable 中和 SSTable 中数据结构肯定不同,但是返回给用户的肯定是一样的,这必然会要转化。其次,数据在多个文件中,要找的数据可能在 Memtable 中,也可能在某个 SSTable 中,如果有 10 个 SSTable,那么就要在到 10 个 SSTable 中每个找一遍,虽然使用了 BloomFilter 算法可以很快判断到底哪个 SSTable 中含有指定的 key。还有可能在 Memtable 到 SSTable 的转化过程中,这也是要检查一遍的,也就是数据有可能存在什么地方,就要到哪里去找一遍。还有找出来的数据可能是已经被删除的,但也没办法还是要取。

下面是读取数据的相关类图:

图 13. 读取相关类图

图 13. 读取相关类图根 据上面的类图读取的逻辑是,CassandraServer 创建 ReadCommand 对象,这个对象保存了用户要获取记录的所有必须指定的条件。然后交给 weakReadLocalCallable 这个线程去到 ColumnFamilyStore 对象中去搜索数据,包括 Memtable 和 SSTable。将找到的数据组装成 Row 返回,这样一个查询过程就结束了。这个查询逻辑可以用下面的时序图来表示:

图 14. 查询数据时序图

图 14. 查询数据时序图在 上图中还一个地方要说明的是,取得 key 对应的 ColumnFamily 要至少在三个地方查询,第一个就是 Memtable 中,第二个是 MemtablesPendingFlush,这个是将 Memtable 转化为 SSTable 之前的一个临时 Memtable。第三个是 SSTable。在 SSTable 中查询最为复杂,它首先将要查询的 key 与每个 SSTable 所对应的 Filter 做比较,这个 Filter 保存了所有这个 SSTable 文件中含有的所有 key 的 Hash 值,这个 Hsah 算法能快速判断指定的 key 在不在这个 SSTable 中,这个 Filter 的值在全部保存在内存中,这样能快速判断要查询的 key 在那个 SSTable 中。接下去就要在 SSTable 所对应的 Index 中查询 key 所对应的位置,从前面的 Index 文件的存储结构知道,Index 中保存了具体数据在 Data 文件中的 Offset。,拿到这个 Offset 后就可以直接到 Data 文件中取出相应的长度的字节数据,反序列化就可以达到目标的 ColumnFamily。由于 Cassandra 的存储方式,同一个 key 所对应的值可能存在于多个 SSTable 中,所以直到查找完所有的 SSTable 文件后再与前面的两个 Memtable 查找出来的结果合并,最终才是要查询的值。

另外,前面所描述的是最坏的情况,也就是查询在完全没有缓存的情况下,当然 Cassandra 在对查询操作也提供了多级缓存。第一级直接针对查询结果做缓存,这个缓存的设置的配置项是 Keyspace 下面的 RowsCached。查询的时候首先会在这个 Cache 中找。第二级 Cache 对应 SSTable 的 Index 文件,它可以直接缓存要查询 key 所对应的索引。这个配置项同样在 Keyspace 下面的 KeysCached 中,如果这个 Cache 能命中,将会省去 Index 文件的一次 IO 查询。最后一级 Cache 是做磁盘文件与内存文件的 mmap,这种方式可以提高磁盘 IO 的操作效率,鉴于索引大小的限制,如果 Data 文件太大只能在 64 位机器上使用这个技术。

数据的删除

从前面的数据写入规则可以想象,Cassandra 要想删除数据是一件麻烦的事,为何这样说?理由如下:

  1. 数据有多处 同时还可能在多个节点都有保存。
  2. 数据的结构有多种 数据会写在 CommitLog 中、Memtable 中、SSTable 中,它们的数据结构都不一样。
  3. 数据时效性不一致 由于是集群,所以数据在节点之间传输必然有延时。

除了这三点之外还有其它一些难点如 SSTable 持久化数据是顺序存储的,如果删除中间一段,那数据有如何移动,这些问题都非常棘手,如果设计不合理,性能将会非常之差。

本部分将讨论 Cassandra 是如何解决这些问题的。

CassandraServer 中删除数据的接口只有一个 remove,下面是 remove 方法的源码:

清单 3. CassandraServer.remove
public void remove(String table, String key, ColumnPath column_path, 
          long timestamp, ConsistencyLevel consistency_level){
        checkLoginDone();
        ThriftValidation.validateKey(key);
        ThriftValidation.validateColumnPathOrParent(table, column_path);
        RowMutation rm = new RowMutation(table, key);
        rm.delete(new QueryPath(column_path), timestamp);
        doInsert(consistency_level, rm);
    }

仔细和 insert 方法比较,发现只有一行不同:insert 方法调用的是 rm.add 而这里是 rm.delete。那么这个 rm.delete 又做了什么事情呢?下面是 delete 方法的源码:

清单 4. RowMutation. Delete
public void delete(QueryPath path, long timestamp){
...
        if (columnFamily == null)
            columnFamily = ColumnFamily.create(table_, cfName);
        if (path.superColumnName == null && path.columnName == null){
            columnFamily.delete(localDeleteTime, timestamp);
        }else if (path.columnName == null){
            SuperColumn sc = new SuperColumn(path.superColumnName, 
              DatabaseDescriptor.getSubComparator(table_, cfName));
            sc.markForDeleteAt(localDeleteTime, timestamp);
            columnFamily.addColumn(sc);
        }else{
            ByteBuffer bytes = ByteBuffer.allocate(4);
            bytes.putInt(localDeleteTime);
            columnFamily.addColumn(path, bytes.array(), timestamp, true);
        }
    }

这段代码的主要逻辑就是,如果是删除指定 Key 下的某个 Column,那么将这个 Key 所对应的 Column 的 vlaue 设置为当前系统时间,并将 Column 的 isMarkedForDelete 属性设置为 TRUE,如果是要删除这个 Key 下的所有 Column 则设置这个 ColumnFamily 的删除时间期限属性。然后将这个新增的一条数据按照 Insert 方法执行下去。

这个思路现在已经很明显了,它就是通过设置同一个 Key 下对应不同的数据来更新已经在 ConcurrentSkipListMap 集合中存在的数据。这种方法的确很好,它能够达到如下目的:

  1. 简化了数据的操作逻辑。将添加、修改和删除逻辑都统一起来。
  2. 解决了前面提到的三个难点。因为它就是按照数据产生的方式,来修改数据。有点以其人之道还治其人之身的意思。

但是这仍然有两个 问题:这个只是修改了指定的数据,它并没有删除这条数据;还有就是 SSTable 是根据 Memtable 中的数据保存的,很可能会出现不同的 SSTable 中保存相同的数据,这个又怎么解决?的确如此,Cassandra 并没有删除你要删除的数据,Cassandra 只是在你查询数据返回之前,过滤掉 isMarkedForDelete 为 TRUE 的记录。它能够保证你删除的数据你不能再查到,至于什么时候真正删除,你就不需要关心了。Cassandra 删除数据的过程很复杂,真正删除数据是在 SSTable 被压缩的过程中,SSTable 压缩的目的就是把同一个 Key 下对应的数据都统一到一个 SSTable 文件中,这样就解决了同一条数据在多处的问题。压缩的过程中 Cassandra 会根据判断规则判定哪些数据应该被删除。

SSTable 的压缩

数 据的压缩实际上是数据写入 Cassandra 的一个延伸,前面描述的数据写入和数据的读取都有一些限制,如:在写的过程中,数据会不停的将一定大小的 Memtable 刷到磁盘中,这样不停的刷,势必会产生很多的同样大小的 SSTable 文件,不可能这样无限下去。同样在读的过程中,如果太多的 SSTable 文件必然会影响读的效率,SSTable 越多就会越影响查询。还有一个 Key 对应的 Column 分散在多个 SSTable 同样也会是问题。还有我们知道 Cassandra 的删除同样也是一个写操作,同样要处理这些无效的数据。

鉴于以上问题,必然要对 SSTable 文件进行合并,合并的最终目的就是要将一个 Key 对应的所有 value 合并在一起。该组合的组合、该修改的修改,该删除的删除。然后将这个 Key 所对应的数据写在 SSTable 所对应的 Data 文件的一段连续的空间上。

何时压缩 SSTable 文件由 Cassandra 来控制,理想的 SSTable 文件个数在 4~32 个。当新增一个 SSTable 文件后 Cassandra 会计算当期的平均 SSTable 文件的大小当新增的 SSTable 大小在平均 SSTable 大小的 0.5~1.5 倍时 Cassandra 就会调用压缩程序压缩 SSTable 文件,导致的结果就是重新建立 Key 的索引。这个过程可以用下图描述:

图 15 数据压缩

图 15 数据压缩

总结

本文首先描述了 Cassandra 中数据的主要的存储格式,包括内存中和磁盘中数据的格式,接下去介绍了 Cassandra 处理这些数据的方式,包括数据的添加、删除和修改,本质上修改和删除是一个操作。最后介绍了数据的压缩。

接下去两篇将向软件开发人员介绍 Cassandra 中使用的设计模式、巧妙的设计方法和 Cassandra 的高级使用方法——利用 Cassandra 搭建存储与检索一体化的实时检索系统

 

同类文章:

cassandra入门

Cassandra 分布式数据库 数据结构与数据读写

Cassandra 分布式数据库 配置、启动与集群

Apache Cassandra 数据库

分布式Key-Value存储系统Cassandra入门

Cassandra 分布式数据库 配置、启动与集群

来源:http://www.ibm.com/developerworks/cn/opensource/os-cn-cassandraxu1/

本文首先介绍了 Cassandra 服务器的配置文件各个配置项的意义,接着讨论了它的启动过程,包括 Cassandra 在启动过程中主要都完成了那些操作,为什么要执行这些操作,最终达到什么状态等。接着介绍如果在集群情况下,集群中节点如何自治理,节点间如何通信、如何 控制数据在集群中的分布等关键问题。

Cassandra 的配置详解

了解一个软件的配置项的意义是使用这个软件的前提,这里详细介绍 Cassandra 的配置文件(storage-config.xml)中各个配置项的意义,这其中包含有很多配置参数,我们可以对其进行调整以达到理想的性能。为了节省篇 幅这里没有列出 storage-config.xml 文件的内容,你可以对照着这个文件看下面的内容。

ClusterName

Cluster Name 代表一个族的标识,它通常代表一个集群。这个配置项在 Cassandra 没有存储数据时就必须指定,当 Cassandra 第一次启动后,它就会被写到 Cassandra 的系统表中,如果你要修改 Cluster Name 必须要删除 Cassandra 中数据。

AutoBootstrap

这个配置项看起来十分简单,但是如果你对 Cassandra 没有深入了解的话,恐怕不知道当你改变这个配置项时 Cassandra 可能会发生什么?

我们知道 Cassandra 集群是通过维护一个自适应的 Token 环来达到集群中的节点的自治理,它们不仅要保证每台机器的状态的同步和一致性还要保证它们之间 Token 分布的合理性,通过重新划分 Token 来达到每台机器的负载的均衡性。

那 这个配置项与 Token 和负载又有何关联性?其实表面上看起来这个配置项是当这个节点启动时是否自动加入集群。但是,当你设置成 False 时它是不是就不加入集群呢?显然不是,这还要看你有没有配置 seeds,如果你配置了其它 seed,那么它仍然会去加入集群。

那么到底有何区别,通过分析其启动代码发现,这个配置项不仅跟 seed 配置项有关而且和 Cassandra 是否是第一次启动也有关。Cassandra 的启动规则大慨如下:

  1. 当 AutoBootstrap 设为 FALSE,第一次启动时 Cassandra 会在系统表中记录 AutoBootstrap=TRUE,以表示这是由系统自动设置的,其实这个只是作为一个标志来判断你以后的启动情况。
  2. 当 AutoBootstrap 设为 TRUE,第一次启动,Cassandra 会判断当前节点有没有被配置成 seed 节点,也就是在本机 ip 有没有在 seeds 中。如果在 seeds 中,Cassandra 的启动情况和 1 是一样的。
  3. 当 AutoBootstrap 设为 TRUE,第一次启动,并且没有配置为 seed,Cassandra 将会有一个漫长的启动过程,当然这个时间的长短和你的当前的集群的数据量有很大的关系。这时 Cassandra 将会根据当前集群的负载,来动态调整它们的均衡。调整均衡的方式就是根据当前的 Token 环分配一个合适的 Token 给这个节点,并将这个符合这个 Token 的数据传给它。

从以上分析可以看出,AutoBootstrap 设置的主要目的是是否调整当前集群中的负载均衡。这其实还有一个很重要的问题就是,如果按照第一种情况启动,如果没有指定 Token,这个节点的 Token 将会是随机生成的,那么问题就来了,当这个随机生成是 Token 加入集群的 Token 环时,Cassandra 如何保证 Token 和 Token 所对应的数据的一致性,这个问题将在后面说明。

Keyspaces

Cassandra 中 Keyspace 相当于关系数据库中的表空间的概念,可以理解为操作表的一个容器,它下面可以定义多个 ColumnFamily,这个 ColumnFamily 就相当于表了,它是存储数据的实体。

ColumnFamily 中几个属性的意义如下:

  • ColumnType。列的类型,有两种:Standard 和 Super,分别是标准列和超列,超列的含义是列还有一个父列。
  • CompareWith。表示的是列的排序规则,可以根据不同的数据类型进行排序如 TimeUUIDType,可以根据插入的时间排序
  • CompareSubcolumnsWith。子列的排序规则与 CompareWith 类似
  • RowsCached。查询时缓存的数据量,可以是多少条,也可以是百分比,如 10% 就是缓存 10% 的数据量,这个对查询性能影响很大,如果命中率高的话,可以显著提高查询效率。
  • KeysCached。缓存 ColumnFamily 中的 key,这个 key 就是对应到 Index.db 中的数据,如果没有在 RowsCached 中命中,那么就要到每个 SSTable 中查询,这时必然要查询 key,如果在 KeysCached 能命中就不需要到 Index.db 中查询了,省去了 IO 操作。

Cassandra 是一个 Key/Value 系统,从它的存储的逻辑结构来看分为:Keyspace、Key、ColumnFamily、Super Column 以及 Column 几个部分。很明显我们能看出每一对 Key/Value 都有一个寄生的容器,所以它实际上是由一个个 Map 容器构成的。这个容器结构可以用图 1 和图 2 来表示:

图 1. 标准的 Column 结构图

图 1. 标准的 Column 结构图

图 2. 含有 Super Column 的结构图

图 2. 含有 Super Column 的结构图

ReplicaPlacementStrategy

定 义数据复制策略,默认是 org.apache.cassandra.locator.RackUnawareStrategy,数据复制到其它节点没有特别的规定。 org.apache.cassandra.locator.RackAwareStrategy 是将节点分为不同的 Rack,这种方式不管是存数据还是查数据,都从不同的 Rack 的节点取数据或写数据。org.apache.cassandra.locator.DatacenterShardStategy 又将节点划分为不同的 Data Center,让数据放在不同数据中心,从而保证数据的安全性,例如可以按机房划分 Data Center,从而避免一个机房出现故障,会影响整个集群。

ReplicationFactor

定义数据要保存几个备份,结合 ReplicaPlacementStrategy 可以把数据放在不同的地方。

EndPointSnitch

org.apache.cassandra.locator.EndPointSnitch 可以根据当前的网络情况选择更好的节点路由,一般默认即可。

Authenticator

这个配置项可以控制数据访问的安全性,可以在 access.properties 和 passwd.properties 设置用户和密码。

Partitioner

控 制数据的分布规则,org.apache.cassandra.dht.RandomPartitioner 是随机分布,Cassandra 控制数据在不同的节点是通过 key 的来划分的,这个方式是将 key 进行 MD5 Hash,从而形成随机分布的 Token,然后根据这个 Token 将数据分布到不同的节点上。

org.apache.cassandra.dht.OrderPreservingPartitioner 是取 key 的 Ascii 字符来划分的,因此我们可以根据 key 来主动控制数据的分布,例如我们可以给 key 加一个前缀,相同前缀的 key 分布在同一个节点中。

InitialToken

给节点分配一个初始 Token,当节点第一次启动后这个 Token 就被写在系统表中。结合 Partitioner 就可以控制数据的分布。这个配置项可以让我们能调整集群的负载均衡。

CommitLogDirectory、DataFileDirectories

这两个配置项是设置 CommitLog 和 SSTable 存储的目录。

Seeds

关于 Seeds 节点的配置有这样几个疑问:

  1. 是不是集群中的所有节点都要配置在 seed 中。
  2. 本机需不需要配置在 seed 中。

关于第二个问题在前面中已经说明了,是否配置就决定是否作为 seed 节点来启动。关于第一个问题,答案是否定的,因为即使你把集群中的所有节点都配置在 seed 中,当 Cassandra 在启动时它也不会往每个 seed 发送心跳信息,而是随机选择一个节点与其同步集群中的其他所有节点状态。几个回合后这个节点同样能够获取集群中所有的节点的列表。这就是集群自治理的优 点,只要能发现其中一个节点就能发现全部节点。

ListenAddress

ListenAddress 这个配置是用来监听集群中其它节点与本节点交换状态信息和数据的地址。需要注意的是当你配置为本机的 ip 地址没有问题,不配置通常也没问题,但是如果你没有配置或者配置成主机名,而你又把你的主机名绑定到 127.0.0.1 时,这时将会导致本节点不能加入到集群中,因为它接受不到其他节点过来的任何信息,防止出错直接绑定本机 ip 最好。

ThriftAddress

监听 Client 的连接请求,不设或者配置成 0.0.0.0,监听所有地址的请求。

RowWarningThresholdInMB

当 Cassandra 压缩时,如果一个 row 超出了配置的大小时打印 warn 日志,没有任何其它作用。

SlicedBufferSizeInKB 和 ColumnIndexSizeInKB

分别是用来配置,根据 Slice 和 Column Name 来查询时 Cassandra 缓存数据的大小,当查询范围较小时可以适当设置大一点以提高命中率。

FlushDataBufferSizeInMB 和 FlushIndexBufferSizeInMB

这两个配置项是设置 Cassandra 在将内存中的数据写到磁盘时一次写入的缓存量,适当提高这个两个值可以提高 Cassandra 的写性能。

MemtableThroughputInMB、MemtableOperationsInMillions 和 MemtableFlushAfterMinutes

MemtableOperationsInMillions 是定义当前 Keyspace 对应的数据在内存中的缓存大小,Cassandra 默认是 64M,也就是当写到 Cassandra 的数据达到 64M 时,Cassandra 会将内存的数据写到本地磁盘中。

MemtableOperationsInMillions 是定义当前这个 Memtable 中所持有数据对象的个数,真实的个数是 MemtableOperationsInMillions*1024*1024。当超出这个数值时 Memtable 同样会被写到磁盘中。

MemtableFlushAfterMinutes 的作用是,当前两个条件都长时间不满足时,Memtable 中数据会一直不会写到磁盘,这也不合适,所以设置了一个时间限制,当超过这个时间长度时 Memtable 中的数据也会被写到磁盘中。

所以 Memtable 中的数据何时被写到写到磁盘是由这三个值决定,任何一个条件满足都会写到磁盘。

ConcurrentReads 和 ConcurrentWrites

这 两个是定义 Cassandra 用来处理 read 和 write 的线程池中线程的个数,根据当前的测试结果,读写的性能大慨是 1:10,适当的设置这两个值不仅要根据读写的性能,还要参考当前机器的处理性能。当机器的 load 很高,但是 cpu 的利用率却很低时,很明显是由于连接数过多,Cassandra 的已经处理不过来都处于等待状态。这样就可以适当增加读写的线程数,同样如果当读的请求大于写的请求时,也应该适当增加读的线程数,反之亦然。

CommitLogSync、CommitLogSyncPeriodInMS 和 CommitLogSyncBatchWindowInMS

我们知道 Cassandra 是先写到 CommitLog 中再写到 Memtable 和磁盘中。如果每写一条数据都要写一次到磁盘那样性能将会大打折扣。Cassandra 为了提高写 CommitLog 的性能提供了两种写的方式。

  1. Periodic。周期性的把 CommitLog 数据写到磁盘中,这个时间周期由 CommitLogSyncPeriodInMS 指定,默认是 10000MS, 如果是这种方式,可想而知 Cassandra 并不能完全保证写到 Cassandra 的数据不会丢失,最坏的情况就是在这个时间段的数据会被丢失,但是 Cassandra 的解释是通过数据的多个备份,来能提高安全性。但是如果是单机存储数据,最坏的情况仍然会丢失 10000MS 时间段写入的数据。可以说这种方式写 CommitLog 是完全的异步的方式。
  2. Batch。这种方式是等待数据被写到磁盘中才会返回,与前面相比安全性会得到保证,它能保证 100% 数据的正确性。但也并不是每写一条数据都立即写到磁盘中,而是有一个延迟时间,这个延迟时间就是由 CommitLogSyncBatchWindowInMS 指定的,也就是写一条数据到 CommitLog 的最大时间是 CommitLogSyncBatchWindowInMS 指定的时间,理想的时间范围是 0.1~10MS 之间。这个时间既要平衡客户端的相应时间也要考虑服务器写数据到磁盘的性能。

这两种方式各有好处,如果数据是存储在有多个备份的集群中,第一种情况下,丢数据的情况几乎为零,但是性能肯定会比第二种要好很多。如果是单机情况下,要保证数据的安全性第二种较合适。

GCGraceSeconds

这 个配置项不是 Java 中的 gc 回收内存,但是其功能类似于 jvm 中 gc,它也是回收已经没有被关联的数据,例如已经被标识为删除的数据,Cassandra 处理数据有点奇怪,即使数据被标识为删除,但是只要是没有超过 GCGraceSeconds 的时间这个数据仍然是存在的,也就是可以定制数据的实效时间,超出这个时间数据将会被回收。

Cassandra 的启动过程

Cassandra 的功能模块

按照我的理解我将 Cassandra 的功能模块划分为三个部分:

  1. 客户端协议解析。目前这个版本 Cassandra 支持两个客户端 avro 和 thrift,使用的较多的是后者,它们都是通过 socket 协议作为网络层协议,然后再包装一层应用层协议,这个应用层协议的包装和解析都是由它们的客户端和相应的服务端模块来完成的。这样设计的目的是解决多种多 样的客户端的连接方式,既可以是短连接也可以是长连接。既可以是 Java 程序调用也可以是 PHP 调用或者多种其它编程语言都可以调用。
  2. 集群 Gossip 协议。集群中节点之间相互通信是通过 Gossip 协议来完成的,它的实现都在 org.apache.cassandra.gms.Gossiper 类中。它的主要作用就是每个节点向集群中的其它节点发送心跳,心跳携带的信息是本身这个节点持有的其它节点的状态信息包括本节点的状态,如果发现两边的状 态信息不是不一致,则会用最新的状态信息替换,同时通过心跳来判断某个节点是否还在线,把这种状态变化通知感兴趣的事件监听者,以做出相应的修改,包括新 增节点、节点死去、节点复活等。除了维护节点状态信息外,还需做另外一些事,如集群之间的数据的转移,这些数据包括:读取的数据、写入的数据、状态检查的 数据、修复的数据等等。
  3. 数据的存储。数据的存储包括,内存中数据的组织形式,它又包括 CommitLog 和 Memtable。磁盘的数据组织方式,它又包括 date、filter 和 index 的数据。

其它剩下的就是如何读取和操作这些数据了,可以用下图来描述 Cassandra 是如何工作的:

图 3. Cassandra 的工作模型

图 3. Cassandra 的工作模型

Cassandra 的启动过程

这里将详细介绍 Cassandra 的启动过程。Cassandra 的启动过程大慨分为下面几个阶段:

storage-config.xml 配置文件的解析

配 置文件的读取和解析都是在 org.apache.cassandra.config.DatabaseDescriptor 类中完成的,这个类的作用非常简单,就是读取配置文件中各个配置项所定义的值,经过简单的验证,符合条件就将其值赋给 DatabaseDescriptor 的私有静态常量。值得注意的是关于 Keyspace 的解析,按照 ColumnFamily 的配置信息构建成 org.apache.cassandra.config.CFMetaData 对象,最后把这些所有 ColumnFamily 放入 Keyspace 的 HashMap 对象 org.apache.cassandra.config.KSMetaData 中,每个 Keyspace 就是一个 Table。这些信息都是作为基本的元信息,可以通过 DatabaseDescriptor 类直接获取。DatabaseDescriptor 类相关的类结构如下图 4 所示:

图 4. DatabaseDescriptor 类相关的类结构

图 4. DatabaseDescriptor 类相关的类结构创建每个 Table 的实例

创 建 Table 的实例将完成:1)获取该 Table 的元信息 TableMatedate。2)创建改 Table 下每个 ColumnFamily 的存储操作对象 ColumnFamilyStore。3)启动定时程序,检查该 ColumnFamily 的 Memtable 设置的 MemtableFlushAfterMinutes 是否已经过期,过期立即写到磁盘。与 Table 相关的类如图 5 所示:

图 5. Table 相关的类图

图 5. Table 相关的类图一 个 Keyspace 对应一个 Table,一个 Table 持有多个 ColumnFamilyStore,而一个 ColumnFamily 对应一个 ColumnFamilyStore。Table 并没有直接持有 ColumnFamily 的引用而是持有 ColumnFamilyStore,这是因为 ColumnFamilyStore 类中不仅定义了对 ColumnFamily 的各种操作而且它还持有 ColumnFamily 在各种状态下数据对象的引用,所以持有了 ColumnFamilyStore 就可以操作任何与 ColumnFamily 相关的数据了。与 ColumnFamilyStore 相关的类如图 6 所示

图 6. ColumnFamilyStore 相关的类

图 6. ColumnFamilyStore 相关的类CommitLog 日志恢复

这里主要完成这几个操作,发现是否有没有被写到磁盘的数据,恢复这个数据,构建新的日志文件。CommitLog 日志文件的恢复策略是,在头文件中发现没有被序列化的最新的

ColumnFamily Id,然后取出这个这个被序列化 RowMutation 对象的起始地址,反序列化成为 RowMutation 对象,后面的操作和新添一条数据的流程是一样的,如果这个 RowMutation 对象中的数据被成功写到磁盘中,那么会在 CommitLog 去掉已经被持久化的 ColumnFamily Id。关于 CommitLog 日志文件的存储格式以及数据如何写到 CommitLog 文件中,将在后面第三部分详细介绍。

启动存储服务

这里是启动过程中最重要的一步。这里将会启动一系列服务,主要包括如下步骤。

  1. 创建 StorageMetadata。StorageMetadata 将包含三个关键信息:本节点的 Token、当前 generation 以及 ClusterName,Cassandra 判断如果是第一次启动,Cassandra 将会创建三列分别存储这些信息并将它们存在在系统表的 LocationInfo ColumnFamily 中,key 是“L”。如果不是第一次启动将会更新这三个值。这里的 Token 是判断用户是否指定,如果指定了使用用户指定的,否则随机生成一个 Token。但是这个 Token 有可能在后面被修改。这三个信息被存在 StorageService 类的 storageMetadata_ 属性中,以便后面随时调用。
  2. GCInspector.instance.start 服务。主要是统计统计当前系统中资源的使用情况,将这个信息记录到日志文件中,这个可以作为系统的监控日志使用。
  3. 启动消息监听服务。这个消息监听服务就是监听整个集群中其它节点发送到本节点的所有消息,Cassandra 会根据每个消息的类型,做出相应的反应。关于消息的处理将在后面详细介绍。
  4. StorageLoadBalancer.instance.startBroadcasting 服务。这个服务是每个一段时间会收集当前这个节点所存的数据总量,也就是节点的 load 数据。把这个数据更新到本节点的 ApplicationState 中,然后就可以通过这个 state 来和其它节点交换信息。这个 load 信息在数据的存储和新节点加入的时候,会有参考价值。
  5. 启动 Gossiper 服务。在启动 Gossiper 服务之前,将 StorageService 注册为观察者,一旦节点的某些状态发生变化,而这些状态是 StorageService 感兴趣的,StorageService 的 onChange 方法就会触发。Gossiper 服务就是一个定时程序,它会向本节点加入一个 HeartBeatState 对象,这个对象标识了当前节点是 Live 的,并且记录当前心跳的 generation 和 version。这个 StorageMetadata 和前面的 StorageMetadata 存储的 generation 是一致的,version 是从 0 开始的。这个定时程序每隔一秒钟随机向 seed 中定义的节点发送一个消息,而这个消息是保持集群中节点状态一致的唯一途径。这个消息如何同步,将在后面详细介绍。
  6. 判断启动模式。是否是 AutoBootstrap 模式启动,又是如何判断的,以及应作出那些相应的操作,在前面的第一部分中已有介绍,这里不再赘述。这里主要说一下,当是 Bootstrap 模式启动时,Cassandra 都做了那些事情。这一步很重要,因为它关系到后面的很多操作,对 Cassandra 的性能也会有影响。

这个过程如下:

  1. 通过之前的消息同步获取集群中所有节点的 load 信息
  2. 找出 load 最大的节点的 ip 地址
  3. 向这个节点发送消息,获取其一半 key 范围所对应的 Token,这个 Token 是前半部分值。
  4. 将这个 Token 写到本地节点
  5. 本地节点会根据这个 Token 计算以及集群中的 Token 环,计算这个 Token 应该分摊集群中数据的一个范围(range)这个环应该就是,最大 load 节点的一半 key 的所对应的 range。
  6. 向这个 range 所在的节点请求数据。发送 STREAM-STAGE 类型的消息,要经过 STREAM_REQUEST、STREAM_INITIATE、STREAM_INITIATE_DONE、STREAM_FINISHED 几次握手,最终才将正确的数据传输到本节点。
  7. 数据传输完成时设置 SystemTable.setBootstrapped(true) 标记 Bootstrap 已经启动,这个标记的目的是防止再次重启时,Cassandra 仍然会执行相同的操作。

这个过程可以用下面的时序图来描述:

图 7. StorageService 服务启动时序图

图 7. StorageService 服务启动时序图以上是 AutoBootstrap 模式启动,如果是以非 AutoBootstrap 模式启动,那么启动将会非常简单,这个过程如下:

  1. 检查配置项 InitialToken 有没有指定,如果指定了初始 Token,使用用户指定的 Token,否则将根据 Partitioner 配置项指定的数据分配策略生成一个默认的 Token,并把它写到系统表中。
  2. 更新 generation=generation+1 到系统表中
  3. 设置 SystemTable.setBootstrapped(true),标记启动方式,防止用户再修改 AutoBootstrap 的启动模式。

Cassandra 集群中的节点状态的同步策略

我 们知道 Cassandra 集群中节点是通过自治理来对外提供服务的,它不像 Hadoop 这种 Master/Slave 形式的集群结构,会有一个主服务节点来管理所有节点中的原信息和对外提供服务的负载均衡。这种方式管理集群中的节点逻辑上比较简单也很方便,但是也有其弱 点,那就是这个 Master 容易形成瓶颈,其稳定性也是一种挑战。而 Cassandra 的集群管理方式就是一种自适应的管理方式,集群中的节点没有 Master、Slave 之分,它们都是平等的,每个节点都可以单独对外提供服务,某个节点 Crash 也不会影响到其它节点。但是一旦某个节点的状态发生变化,整个集群中的所有节点都要知道,并且都会执行预先设定好的应对方案,这会造成节点间要发送大量的 消息交换各自状态,这样也增加了集群中状态和数据一致性的复杂度,但是优点是它是一个高度自治的组织,健壮性比较好。

消息交换

那么 Cassandra 是如何做到这么高度自治的呢?这个问题的关键就是它们如何同步各自的状态信息,同步消息的前提是它们有一种约定的消息交换机制。这个机制就是 Gossip 协议,Cassandra 就是通过 Gossip 协议相互交换消息。

前 面在 Cassandra 服务启动时提到了 Gossiper 服务的启动,一旦 Cassandra 启动成功,Gossiper 服务就是一直执行下去,它是一个定时程序。这个服务的代码在 org.apache.cassandra.gms.Gossiper 类中,下面是定时程序执行的关键代码如清单 1 所示:

清单 1. Gossiper.GossipTimerTask.run
 public void run(){ 
   synchronized( Gossiper.instance ){ 
       endPointStateMap_.get(localEndPoint_).getHeartBeatState().updateHeartBeat(); 
       List<GossipDigest> gDigests = new ArrayList<GossipDigest>(); 
       Gossiper.instance.makeRandomGossipDigest(gDigests); 
       if ( gDigests.size() > 0 ){ 
          Message message = makeGossipDigestSynMessage(gDigests); 
          boolean gossipedToSeed = doGossipToLiveMember(message); 
          doGossipToUnreachableMember(message); 
          if (!gossipedToSeed || liveEndpoints_.size() < seeds_.size()) 
                       doGossipToSeed(message); 
                        doStatusCheck(); 
          } 
       } 
 }

Cassandra 通过向其它节点发送心跳来证明自己仍然是活着的,心跳里面包含有当前的 generation,用来表示有的节点是不是死了又复活的。

本 地节点所保存的所有其它节点的状态信息都被放在了 GossipDigest 集合中。一个 GossipDigest 对象将包含这个节点的 generation、maxVersion 和节点地址。接着将会组装一个 Syn 消息(关于 Cassandra 中的消息格式将在后面介绍),同步一次状态信息 Cassandra 要进行三次会话,这三次会话分别是 Syn、Ack 和 Ack2。当组装成 Syn 消息后 Cassandra 将随机在当前活着的节点列表中选择一个向其发送消息。

Cassandra 中的消息格式如下:

  1. header:消息头 org.apache.cassandra.net.Header,消息头中包含五个属性:消息编号(messageId)、发送方地址(from)、消息类型(type)、所要做的动作(verb)和一个 map 结构(details)
  2. body:消息内容,是一个 byte 数组,用来存放序列化的消息主体。

可以用下面的图 8 更形象的表示:

图 8. message 消息结构

图 8. message 消息结构当组装成一个 message 后,再将这个消息按照 Gossip 协议组装成一个 pocket 发送到目的地址。关于这个 pocket 数据包的结构如下:

  1. header:包头,4 bytes。前两个是 serializer type;第三个是是否压缩包,默认是否;最后一个 byte 表示是否是 streaming mode。
  2. body:包体,message 的序列化字节数据。

这个 pocket 的序列化字节结构如下:

图 9. 通信协议包的结构

图 9. 通信协议包的结构当 另外一个节点接受到 Syn 消息后,反序列化 message 的 byte 数组,它会取出这个消息的 verb 执行相应的动作,Syn 的 verb 就是解析出发送节点传过来的节点的状态信息与本地节点的状态信息进行比对,看哪边的状态信息更新,如果发送方更新,将这个更新的状态所对应的节点加入请求 列表,如果本地更新,则将本地的状态再回传给发送方。回送的消息是 Ack,当发送方接受到这个 Ack 消息后,将接受方的状态信息更新的本地对应的节点。再将接收方请求的节点列表的状态发送给接受方,这个消息是 Ack2,接受方法接受到这个 Ack2 消息后将请求的节点的状态更新到本地,这样一次状态同步就完成了。

不管是发送方还是接受方每当节点的状态发生变化时都将通知感兴趣的观察者做出相应的反应。消息同步所涉及到的类由下面图 10 的关系图表示:

图 10. 节点状态同步相关类结构图

图 10. 节点状态同步相关类结构图节点的状态同步操作有点复杂,如果前面描述的还不是很清楚的话,再结合下面的时序图,你就会更加明白了,如图 11 所示:

图 11. 节点状态同步时序图

图 11. 节点状态同步时序图上图中省去了一部分重复的消息,还有节点是如何更新状态也没有在图中反映出来,这些部分在后面还有介绍,这里也无法完整的描述出来。

状态更新

前 面提到了消息的交换,它的目的就是可以根据交换的信息更新各自的状态。Cassandra 更新状态是通过观察者设计模式来完成的,订阅者被注册在 Gossiper 的集合中,当交换的消息中的节点的状态和本地节点不一致时,这时就会更新本地状态,更改本地状态本身并没有太大的意义,有意义的是状态发生变化这个动作, 这个动作发生时,就会通知订阅者来完成这个状态发生变化后应该做出那些相应的改动,例如,发现某个节点已经不在集群中时,那么对这个节点应该要在本地保存 的 Live 节点列表中移去,防止还会有数据发送到这个无法到达的节点。和状态相关的类如下:

图 12. 更新状态相关的类

图 12. 更新状态相关的类从 上图可以看出节点的状态信息由 ApplicationState 表示,并保存在 EndPointState 的集合中。状态的修改将会通知 IendPointStateChangeSubscriber,继而再更新 Subscriber 的具体实现类修改相应的状态。

下面是新节点加入的时序图,如图 13 所示:

图 13. 新加入节点的时序图

图 13. 新加入节点的时序图上 图基本描述了 Cassandra 更新状态的过程,需要说明的点是,Cassandra 为何要更新节点的状态,这实际上就是关于 Cassandra 对集群中节点的管理,它不是集中管理的方式,所以每个节点都必须保存集群中所有其它节点的最新状态,所以将本节点所持有的其它节点的状态与另外一个节点交 换,这样做有一个好处就是,并不需要和某个节点通信就能从其它节点获取它的状态信息,这样就加快了获取状态的时间,同时也减少了集群中节点交换信息的频 度。另外,节点状态信息的交换的根本还是为了控制集群中 Cassandra 所维护的一个 Token 环,这个 Token 是 Cassandra 集群管理的基础。因为数据的存储和数据流动都在这个 Token 环上进行,一旦环上的节点发生变化,Cassandra 就要马上调整这个 Token 环,只有这样才能始终保持整个集群正确运行。

到底哪些状态信息对整个集群是重要的,这个在 TokenMetadata 类中,它主要记录了当前这个集群中,哪些节点是 live 的哪些节点现在已经不可用了,哪些节点可能正在启动,以及每个节点它们的 Token 是多少。而这些信息都是为了能够精确控制集群中的那个 Token 环。只要每个集群中每个节点所保存的是同一个 Token 环,整个集群中的节点的状态就是同步的,反之,集群中节点的状态就没有同步。

当然 Cassandra 用这种集群管理方式有其优点,但也存在一些缺点。例如现在部分使用者在大规模集群(上千台服务器)的使用中发现不太稳定,这个跟 gossip 协议的本身也有关,所以这是 Cassandra 社区要致力解决的问题。

总结

本 文从配置文件开始介绍了 Cassandra 的启动过程,以及 Cassandra 是如何管理集群的。实际上 Cassandra 的启动和集群的管理是连在一起的,启动过程中的很多步骤都是集群管理的一部分,如节点以 AutoBootstrap 方式启动,在启动过程中就涉及到数据的重新分配,这个分配的过程正是在动态调整集群中 Token 环的过程。所以当你掌握了 Cassandra 是如何动态调整这个 Token 环,你也就掌握了 Cassandra 的集群是如何管理的了。下一篇将详细介绍 Cassandra 内部是如何组织数据和操作数据。

 

同类文章:

cassandra入门

Cassandra 分布式数据库 数据结构与数据读写

Cassandra 分布式数据库 配置、启动与集群

Apache Cassandra 数据库

分布式Key-Value存储系统Cassandra入门

Apache Cassandra 数据库

来源:http://www.ibm.com/developerworks/cn/opensource/os-apache-cassandra/

NoSQL 存储提供了关系数据库的一个灵活的、可扩展的替换物,而且在众多诸如此类的存储中,Cassandra 是广受欢迎的选择之一。本文将超越众所周知的一些细节,探讨与 Cassandra 相关的不太明显的细节。您将检查 Cassandra 数据模型、存储模式设计、架构,以及与 Cassandra 相关的潜在惊喜。

Apache Cassandra 简介在数据库历史文章 “What Goes Around Comes Around”中,Michal Stonebraker 详细描述了存储技术是如何随着时间的推移而发展的。实现关系模型之前,开发人员曾尝试过其他模型,比如层次图和有向图。值得注意的是,基于 SQL 的关系模型(即使到现在也仍然是事实上的标准)已经盛行了大约 30 年。鉴于计算机科学的短暂历史及其快速发展的步伐,这是一项非凡的成就。关系模型建立已久,以至于许多年来,解决方案架构师很容易为应用程序选择数据存 储。他们的选择总是关系数据库。

诸如增加系统、移动设备、扩展的用户在线状态、云计算和多核系统的用户群之类的开发已经导致产生越来越多的大型系统。Google 和 Amazon 之类的高科技公司都是首批触及规模问题的公司。他们很快就发现关系数据库并不足以支持大型系统。

为了避免这些挑战,Google 和 Amazon 提出了两个可供选择的解决方案:Big Table 和 Dynamo,他们可以由此放松关系数据模型提供的保证,从而实现更高的可扩展性。Eric Brewer 的 “CAP Theorem”后来官方化了这些观察结果。它宣称,对于可扩展性系统,一致性、可用性和分区容错性都是权衡因素,因为根本不可能构建包含所有这些属性的系统。不久之后, 根据 Google 和 Amazon 早期的工作,以及所获得的对可扩展性系统的理解,计划创建一种新的存储系统。这些系统被命名为 “NoSQL” 系统。该名称最初的意思是 “如果想缩放就不要使用 SQL”,后来被重新定义为 “不只是 SQL”,意思是说,除了基于 SQL 的解决方案外,还有其他的解决方案。

有许多 NoSQL 系统,而且每一个系统都缓和或改变了关系模型的某些方面。值得注意的是,没有一个 NoSQL 解决方案适用于所有的场景。每一个解决方案都优于关系模型,且针对一些用例子集进行了缩放。我的早期文章 “在 Data Storage Haystack 中为您的应用程序寻找正确的数据解决方案” 讨论了如何使应用程序需求和 NoSQL 解决方案相匹配。

Apache Cassandra是其中一个最早也是最广泛使用的 NoSQL 解决方案。本文详细介绍了 Cassandra,并指出了一些首次使用 Cassandra 时不容易发现的细节和复杂之处。

 

Apache Cassandra

Cassandra 是一个 NoSQL 列族 (column family) 实现,使用由 Amazon Dynamo 引入的架构方面的特性来支持 Big Table 数据模型。Cassandra 的一些优势如下所示:

  • 高度可扩展性和高度可用性,没有单点故障
  • NoSQL 列族实现
  • 非常高的写入吞吐量和良好的读取吞吐量
  • 类似 SQL 的查询语言(从 0.8 起),并通过二级索引支持搜索
  • 可调节的一致性和对复制的支持
  • 灵活的模式

这些优点很容易让人们推荐使用 Cassandra,但是,对于开发人员来说,至关重要的一点是要深入探究 Cassandra 的细节和复杂之处,从而掌握该程序的复杂性。

Cassandra 根据列族数据模型存储数据,如 图 1 所示。

图 1. Cassandra 数据模型

该图显示了密钥空间中列和行之间的关系

什么是列?

有点用词不当,使用名称单元格 很可能更容易理解一些。我会坚持使用,因为这是一种习惯用法。

Cassandra 数据模型包括列、行、列族和密钥空间 (keyspace)。让我们逐一进行详细介绍它们。

  • 列:Cassandra 数据模型中最基本的单元,每一个列包括一个名称、一个值和一个时间戳。在本文的讨论中,我们忽略了时间戳,您可以将一个列表示为一个名称值对(例如 author="Asimov")。
  • 行:用一个名称标记的列的集合。例如,清单 1 显示了如何表示一个行:
    清单 1. 行的示例
        "Second Foundation"-> {
        author="Asimov", 
        publishedDate="..",
        tag1="sci-fi", tag2="Asimov"
        }

    Cassandra 包括许多存储节点,并且在单个存储节点内存储每一个行。在每一行内,Cassandra 总是存储按照列名称排序的列。使用这种排序顺序,Cassandra 支持切片查询,在该查询中,给定了一个行,用户可以检索属于给定的列名称范围内的列的子集。例如,范围 tag0 到 tag9999 内的切片查询会获得所有名称范围在 tag0 和 tag9999 内的列。

  • 列族:用一个名称标记的行的集合。清单 2 显示了样例数据的可能形式:
    清单 2. 列族示例
        Books->{
        "Foundation"->{author="Asimov", publishedDate=".."},
        "Second Foundation"->{author="Asimov", publishedDate=".."},
        …
        }

    人们常说列族就像是关系模型中的一个表格。如下例所示,相似点将不复存在。

  • 密钥空间:许多列族共同形成的一个组。它只是列族的一个逻辑组合,并为名称提供独立的范围。

最后,超级列位于一个列族中,该列族对一个密钥下的多个列进行分组。正如开发人员不赞成使用超级列一样,在此,我对此也不作任何讨论。

Cassandra 与 RDBMS 数据模型

根据以上对 Cassandra 数据模型的描述,数据被放入每一个列族的二维 (2D) 空间中。要想在列族中检索数据,用户需要两个密钥:行名称和列名称。从这个意义上来说,尽管还存在多处至关重要的差异,关系模型和 Cassandra 仍然非常相似。

  • 关系列均匀分布在表中的所有行之间。数据项之间通常有明显的纵向关系,但这种情况并不适用于 Cassandra 列。这就是 Cassandra 使用各个数据项(列)来存储列名称的原因。
  • 有了关系模型,2D 数据空间就完整了。2D 空间内的每一个点至少应当拥有存储在此处的 null 值。另外,这种情况不适用于 Cassandra,Cassandra 可以拥有只包括少数项的行,而其他行可以拥有数百万个项。
  • 有了关系模型,就可以对模式进行预定义,而且在运行时不可以更改模式,而 Cassandra 允许用户在运行时更改模式。
  • Cassandra 始终存储数据,这样就可以根据其名称对列进行排序。这使得使用切片查询在列中搜索数据变得很容易,但在行中搜索数据变得很困难,除非您使用的是保序分区程序。
  • 另一个重要差异是,RDMBS 中的列名称表示与数据有关的元数据,但绝不是数据。而在 Cassandra 中,列名称可以包括数据。因此,Cassandra 行可以拥有数百万个列,而关系模型通常只有数十个列。
  • 关系模型使用定义良好的不可变模式来支持复杂的查询,这些查询中包括 JOIN 和聚合等。使用关系模型,用户无需担心查询就可定义数据模式。Cassandra 不支持 JOIN 和大多数 SQL 搜索方法。因此,模式必须满足应用程序的查询要求。

为了探讨上述差异,可以考虑一个书籍评分站点,用户可以在该站点添加书籍(作者、等级、价格和链接)、评论(文本、时间和名称),对这些添加的内容进行标记。应用程序需要支持用户的以下操作:

  • 添加书籍
  • 添加书籍评论
  • 添加书籍标记
  • 列出按等级排序的书籍
  • 列出给定一个标记的书籍
  • 列出给定一个书籍 ID 的评论

使用关系模型实现以上应用程序几乎微不足道。图 2 展示了数据库设计的实体关系 (ER) 图。

图 2. 书籍评分站点的 ER 模型

书籍站点数据模型的流程图让我们看看使用 Cassandra 数据模型如何实现此项操作。清单 3 展示了 Cassandra 的可能模式,其中第一行表示 “Book" 列族(拥有多个行),每一行拥有和列相同的书籍属性。<TS1> 和 <TS2> 表示时间戳。

清单 3. 用于书籍评分的 Cassandra 模式样例
Books[BookID->(author, rank, price, link, tag<TS1>, tag<TS2> .., 
    cmt+<TS1>= text + "-" + author) …] 
Tags2BooksIndex[TagID->(<TS1>=bookID1, <TS2>=bookID2, ..) ] 
Tags2AuthorsIndex[TagID->(<TS1>=bookID1, <TS2>=bookID2, ..) ]
RanksIndex["RANK" -> (rank<TS1>=bookID)]

表 1 是按照模式表示的样例数据集。

表 1. 书籍评分站点的样例数据
列族名称样例数据集
Books"Foundation" -> ("author"="Asimov", "rank"=9, "price"=14, "tag1"="sci-fi", "tag2"="future", "cmt1311031405922"="best book-sanjiva", "cmt1311031405923"="well I disagree-srinath")
"I Robot" -> ("author"="Asimov", "rank"=7, "price"=14, "tag1"="sci-fi" "tag2"="robots", "cmt1311031405924"="Asimov's best-srinath", "cmt1311031405928"="I like foundation better-sanjiva")
RanksIndex"Rank" -> (9="Foundation", 7="I Robot")
Tags2BooksIndex"sci-fi" -> ("1311031405918"="Foundation", "1311031405919"="I Robot"
"future" -> …
Tags2AuthorsIndex"sci-fi" -> (1311031405920="Asimov")
"future" -> …

本示例展示了关系模型和 Cassandra 模型之间的几个设计差异。Cassandra 模型在一个名为 “Book" 的单个列族内存储书籍数据,而其他三个列族是构建用来支持查询的索引。

请仔细看一下 “Books” 列族,该模型使用了一个行来表示书籍名称是行 ID 的每本书。有关书籍的细节被表示为存储在行中的列。

再 仔细看看,您可能会发现,已存储的数据项(比如评论、与书籍关系为 1:M 的标记)也位于单个行中。为了实现这一点,可以将时间戳附加在列名称上,以便进行标记和评论。这种方法在同一列中存储所有的数据。这样的操作避免了必须执 行 JOIN 才可检索数据的问题。Cassandra 弥补了通过此方法支持 JOIN 的不足。

这也提供了一些优势。

  • 通过使用单个查询读取完整行的方法,您可以读取书籍的所有数据。
  • 您可以通过使用切片查询来检索评论和标记,无需执行 JOIN,该切片查询的起始范围和终止范围分别为 cmt0-cmt9999 和 tag0-tag9999。

由 于 Cassandra 存储按照其列名称排序的列,这就使得切片查询非常快就能完成。值得注意的是,在单个行中存储所有的数据细节并使用排序顺序是 Cassandra 数据设计时最重要的理念。大多数 Cassandra 数据模型根据这些理念的某些形式进行设计。用户在存储数据和构建索引时可以使用排序顺序。例如,给列名称附加时间戳的另一个副作用是:就像列名称按照排序 顺序进行存储一样,评论也有使用时间戳后缀的列名称,并按照创建它们的顺序进行存储,且搜索结果也具有相同的顺序。

Cassandra 不支持基础设计的任何搜索方法。尽管其支持二级索引,这些方法还是通过使用后来构建的索引来提供支持,而且二级索引有一些局限性,不支持范围查询。

因 此,要实现 Cassandra 数据设计最好的结果,需要用户通过构建定制索引并使用列和行排序顺序来实现搜索。其他三个列族(Tags2BooksIndex、 Tags2AuthorsIndex 和 RankIndex)也这样做。由于用户需要搜索具有给定标记的书籍,通过将标记名称存储为行 ID,并将使用该标记进行标记的所有书籍存储为该行下的列,“Tags2BooksIndex” 列族构建了一个索引。如该例所示,时间戳被添加为列密钥,但是也是将要提供的惟一的列 ID。通过按照标记名称查找行并通过读取存储在该 rowID 内的所有列来找到匹配项,搜索实现仅读取索引。

表 2 讨论了应用程序要求的每个查询是如何使用上述 Cassandra 索引来实现的。

表 2. 查询实现的比较
查询描述SQL 查询Cassandra 实现
列出根据等级存储的书籍运行查询
"Select * from Books order by rank",然后在每个结果上执行 "Select tag from Tags where bookid=?" and "Select comment from Comments where bookid=?"
在 “RankIndex” 列族上进行切片查询,接收已排序的书籍列表,并在 “Books” 对每一个书籍执行切片查询,以便读取书籍的详细信息。
给定一个标记,查找具有给定标记的书籍的作者。Select distinct author from Tags, Books where Tags.bookid=Books.bookid and tag=?使用切片查询在 Tags2Authors 中读取给定标记的所有列。
给定一个标记,列出具有给定标记的书籍。Select bookid from Tags where tag=? 使用切片查询在 Tags2BooksIndex 中读取具有给定标记的所有列。
给定一个书籍,创建评论时,按时间的排序对列出该书籍的评论进行排序。Select text, time, user from Comments where bookid=? Order by time在 “Books” 列族中,在与给定书籍对应的行中执行切片查询。它们是按排序顺序的,这是因为将时间戳用作了列名称。

尽管上述设计可以高效支持由书籍评分站点要求的查询,但它只能支持为专用查询设计但不支持专用查询的查询。例如,如果没有构建新的索引,它就不能支持以下查询。

  • Select * from Books where price > 50;
  • Select * from Books where author="Asimov"

将设计更改为支持这些和其他查询是有可能的,通过构建适当的索引或编写代码来遍历所有数据即可实现此操作。但是,需要定制代码来支持新的查询,与关系模型相比,这是一种局限性,因为在关系模型中添加新查询通常不需要更改模式。

在 0.8 发行版中,Cassandra 支持次级索引,用户可以在此根据给定属性指定搜索,而且 Cassandra 可以自动构建索引来根据该属性进行搜索。但是,该模型的灵活性不大。例如,次级索引不支持范围查询,也没有为结果的排序顺序提供保证。

在 Java 环境中使用 Cassandra

Cassandra 具有许多用不同语言编写的客户端。本文将重点介绍 Hector 客户端(参阅 参考资料),这是最广泛用于 Cassandra 的 Java 客户端。用户可以通过向应用程序类路径添加 Hector JAR 向应用程序添加 Hector 客户端节点。清单 4 展示了一个样例 Hector 客户端。

首先,连接到 Cassandra 集群。然后使用 Cassandra Getting Started Page(参阅 参考资料)中的指令来建立一个 Cassandra 节点。除非更改了配置,否则通常在端口 9160 之上运行该指令。其次,要定义一个密钥空间,这可以通过客户端或 conf/cassandra.yaml 配置文件来完成。

清单 4. Cassandra 的样例 Hector 客户端节点
Cluster cluster = HFactory.createCluster('TestCluster', 
        new CassandraHostConfigurator("localhost:9160"));

//define a keyspace
Keyspace keyspace = HFactory.createKeyspace("BooksRating", cluster);

//Now let's add a new column. 
String rowID = "Foundation"; 
String columnFamily = "Books";

Mutator<String>
 mutator = HFactory.createMutator(keyspace, user);
mutator.insert(rowID, columnFamily, 
        HFactory.createStringColumn("author", "Asimov"));

//Now let's read the column back 
ColumnQuery<String, String, String>
        columnQuery = HFactory.createStringColumnQuery(keyspace);
columnQuery.setColumnFamily(columnFamily).setKey(”wso2”).setName("address");
QueryResult<HColumn<String, String>
 result = columnQuery.execute();
System.out.println("received "+ result.get().getName() + "= " 
        + result.get().getValue() + " ts = "+ result.get().getClock());

在 Download 中查找书籍评分示例的完整节点。包括切片查询的样例和其他复杂的操作。

Cassandra 架构

查看过 Cassandra 的数据模型之后,让我们返回到 Cassandra 的架构,从分布式系统的角度了解它的优缺点。

图 3 展示了 Cassandra 集群的架构。首先观察到得是 Cassandra 是一个分布式系统。Cassandra 包括多个节点,并跨这些节点来分发数据(用数据库的术语来说就是,将数据分成很多份)。

图 3. Cassandra 集群

该图显示了 cassandra 集群中每一个节点在一个回路中的连接方式Cassandra 使用一致的散列算法给节点分配数据项。简言之,Cassandra 使用一个散列算法来计算存储在 Cassandra (例如,列名称和行 ID)中的每个数据项的密钥散列。散列范围或所有可能的散列值(又称为密钥空间)是在 Cassandra 集群中的节点之间进行分配的。然后,Cassandra 向该节点分配每一个数据项,而该节点负责存储或管理数据项。论文 “Cassandra - A Decentralized Structured Storage System”(参阅 参考资料)提供了有关 Cassandra 架构的详细讨论。

由此产生的架构提供了以下属性:

  • Cassandra 在其节点之间分发数据,这对用户是透明的。任何节点可以接收任何请求(读取、写入或删除)并将请求路由至正确的节点,即使数据没有存储在该节点中。
  • 用户可以定义所需的副本的数量,而且 Cassandra 会透明地处理副本的创建和管理。
  • 可 调节的一致性:在存储和读取数据时,用户可以选择所期望的每项操作的一致性级别。例如,如果 “quorum” 一致性级别是在执行写入或读取操作时使用,那么可以对来自集群中一半以上的节点的数据进行写入和读取操作。支持可调节的一致性使用户能够选择最适合其用例 的一致性级别。
  • Cassandra 提供非常快速的写入速度,比每个节点以每秒 80-360MB 的速度传输数据时的读取速度还要快。它通过使用两项技术实现这一目的。
    • Cassandra 在其负责的节点上保留内存中的大多数数据,而且所有更新都在内存中完成,并以一种懒惰的方式写入永久存储(文件系统)。但是,为了避免丢失数 据,Cassandra 将所有的事务写入磁盘中的提交日志。与在磁盘中更新数据不同,向提交日志写入数据是追加数据,因此在向磁盘写入输入时可以避免旋转延迟。有关磁盘驱动性能 特征的更多信息,请参阅 参考资料。
    • 除非要求写入操作是完全一致的,否则 Cassandra 无需解决任何数据不一致性(只在首次读取时解决不一致性)问题即可将数据写入足够多的节点。这个过程称作 “读取修复”。

由 此产生的架构具有高可缩放性。您可以构建一个具有数十至数百个节点的 Cassandra 集群,能够处理数 TB 到数 PB 字节的数据。分布式系统有一个权衡,而且缩放几乎从来不会免费提供。如前所述,用户在从关系数据库迁移到 Cassandra 时会遇到许多惊喜。下一部分将讨论这些问题。

Cassandra 可能带来的惊喜

从关系数据库迁移到 Cassandra 时要意识到这些差异。

不支持事务,就不支持 JOIN

众所周知,Cassandra 不支持 ACID 事务。尽管其有一个批处理操作,但还是不能保证批处理操作内的子操作是以原子的方式进行的。在 失败操作可能产生变更 中会对此进行详细讨论。

此外,Cassandra 不支持 JOIN。如果用户需要连接两个列族,就必须以编程方式检索和连接数据。对于大型数据集来说,这通常代价高昂且非常耗时。Cassandra 通过在同一行中存储尽更多的数据来巧妙地避免这种局限性,如示例中所述。

没有任何外键和键是不可变的

Cassandra 不支持外键,所以 Cassandra 不可能代表用户来管理数据的一致性。因此,应用程序应当处理数据一致性。此外,用户不能更改键。推荐使用具有需要对多个键进行更改的用例的代理键(生成多个键而非一个键,并将键作为属性进行管理)。

键必须是惟一的

每个键(例如行键和列键)在此范围内都必须是惟一的,而且如果同一个键使用过两次,则需要重写数据。

对 于这个问题有两种解决方案。第一个是,您可以使用一个组合键。也就是说,通过组合多个字段来创建键,而且这个解决方案通常和行键一起使用。第二个解决方案 是,当出现同一个键被使用两次的危险时,使用任意值或时间戳作为该键的后缀。在索引将某个值存储为列名称并且使用这些索引时,通常会发生这种情况。例如, 在书籍评分应用程序中,等级用作列名称。为了避免有两个条目因具有相同的等级而具有相同的列名称,时间戳作为后缀添加到等级中。

失败的操作可能导致发生更改

正如前面已经解释过的,Cassandra 不支持原子操作。相反,它支持幂等操作。不论执行多少次操作,幂等操作都将系统保持为相同的状态。所有的 Cassandra 操作都是幂等的。如果操作失败,您可以果断进行重试。这就提供了一种从暂时性故障中恢复的机制。

Cassandra 还支持批处理操作,但也不提供任何原子性保证。因为操作是幂等的,所以客户端可以一直重试,直到所有批处理操作成功完成为止。

幂 等操作并不等同于原子操作。如果一个操作成功了,一切都很顺利,结果与原子操作是相同的;如果某个操作失败,客户端可以进行重试,如果重试成功了,那么再 次一切顺利。但是,如果在重试后操作仍然失败(与原子操作不同),则会产生副作用。不幸的是,在使用 Cassandra 的时候,这是程序员必须亲自处理的一项复杂事物。

搜索变得复杂

搜索并没有构建为 Cassandra 架构的核心,而且如前所述,搜索机制使用了排序顺序,划分在分层结构的顶部。Cassandra 支持次级索引,系统可以利用一些受限的功能在此自动构建次级索引。当次级索引不工作时,用户必须了解数据模型,并使用排序顺序和切片来构建索引。

与构建搜索方法相关的三种类型的复杂领域:

  1. 在一定程度上,构建定制搜索方法需要程序员了解索引和存储的细节。因此,Cassandra 需要的是更高水平的技能熟练的开发人员,而不是关系模型。
  2. 定制索引很大程度上取决于排序顺序,而且被复杂化。有两种类型的排序顺序:第一,列始终根据名称进行排序,第二,行排序顺序只在使用保序分区程序(参阅 参考资料)时起作用。
  3. 添加一个新查询通常需要新的索引以及与关系模型不同的代码更改。这就要求开发人员先分析查询,然后再存储数据。

不赞成使用超级列和保序分区程序

Cassandra 超级列在建模多层数据时非常有用,它可以向层次结构再添加一个级别。然而,可以与超级列一起建模的任何事物也通过列进行支持。因此,超级列不提供附加能 力,也不支持次级索引。因此,Cassandra 开发人员不赞成使用超级列。尽管没有固定的中断支持的日期,但这种情况会发生在将来的版本中。

Cassandra 中的分区程序决定了以何种方式在 Cassandra 节点之间分发(分开)数据,而且有多种实现方式。如果使用保序分区程序,那么 rowID 会根据排序顺序进行存储,而且 Cassandra 也可以跨各个 rowID 进行切片(搜索)。然而,该分区程序并不是在其节点之间均匀地分发数据,如果使用大数据库,一些节点可能负担很重,而其他节点则是空载的。因此,开发人员 也不赞成使用保序分区程序。

手动执行故障修复

如果 Cassandra 集群中的一个节点已经失败,如果您有副本的话,则该集群将继续工作。完整恢复,用来重新分发数据并弥补缺少的副本,是一项通过名为节点工具(参阅 参考资料)执行的手动操作。而且,执行手动操作时,系统是不可用的。

记得删除

Cassandra 被设计为:即使节点发生故障(或断开连接)但随后又恢复,节点仍会继续工作,不会出现任何问题。其中一个结果就是这使得数据删除复杂化。例如,假设一个节 点出现故障。出现故障时,数据项已经从副本中删除。当不可用的节点恢复的时候,如果 Cassandra 记得该数据项已经删除,它会在同步流程中再次引入已删除的数据项。

因此,Cassandra 必须记得该数据项已经删除。在 0.8 发行版中,Cassandra 记得所有数据(即使已经删除)。这就使得进行集中更新操作时磁盘使用率持续增长。Cassandra 不需要记得所有已删除的数据,但事实恰好已经删除了一个数据项。在以后的 Cassandra 发行版中会执行此项修复。

结束语

本 文深入研究了在考虑 Cassandra 时不太明显的一些细节。我描述了 Cassandra 数据模型,将它与关系数据模型进行了比较,并演示了一个使用 Cassandra 设计的典型模式。其中一个重要的观察结果是,与关系模型有所不同,Cassandra 将数据分解成许多表格,并试图在同一行内保留尽可能多的数据,从而避免连接该数据进行检索。

您还可以看看基于 Cassandra 的方法的一些局限性。但是,这些局限性对于大多数 NoSQL 解决方案来说很常见,而且常常是支持高可扩展性时需要留意的一些设计权衡。

 

同类文章:

cassandra入门

Cassandra 分布式数据库 数据结构与数据读写

Cassandra 分布式数据库 配置、启动与集群

Apache Cassandra 数据库

分布式Key-Value存储系统Cassandra入门

分布式Key-Value存储系统Cassandra入门

Cassandra 的数据存储结构

Cassandra 的数据模型是基于列族(Column Family)的四维或五维模型。它借鉴了 Amazon 的 Dynamo 和 Google's BigTable 的数据结构和功能特点,采用 Memtable 和 SSTable 的方式进行存储。在 Cassandra 写入数据之前,需要先记录日志 ( CommitLog ),然后数据开始写入到 Column Family 对应的 Memtable 中,Memtable 是一种按照 key 排序数据的内存结构,在满足一定条件时,再把 Memtable 的数据批量的刷新到磁盘上,存储为 SSTable 。

图 1. Cassandra 的数据模型图:

  1. Cassandra 的数据模型的基本概念:
  2. 1. Cluster : Cassandra 的节点实例,它可以包含多个 Keyspace
    2. Keyspace : 用于存放 ColumnFamily 的容器,相当于关系数据库中的 Schema 或 database3. ColumnFamily : 用于存放 Column 的容器,类似关系数据库中的 table 的概念 4. SuperColumn :它是一个特列殊的 Column, 它的 Value 值可以包函多个 Column5. Columns:Cassandra 的最基本单位。由 name , value , timestamp 组成

下面是关于数据模型实例分析 :

图 2. 数据模型实例分析

图 2. 数据模型实例分析

Cassandra 节点的安装和配置

获取 Cassandra

 # wget  http://labs.renren.com/apache-mirror/cassandra/0.6.0/apache- 
 cassandra-0.6.0-rc1-bin.tar.gz 
 # tar -zxvf apache-cassandra-0.6.0-rc1-bin.tar.gz 
 # mv  apache-cassandra-0.6.0-rc1 cassandra 
 # ls Cassandra
Cassandra 的目录说明
bin存放与 Cassandra 操作的相关脚本
conf存放配置文件的目录
interfaceCassandra 的 Thrift 接口定义文件,可以用于生成各种编程语言的接口代码
Javadoc源代码的 javadoc
libCassandra 运行时所需的 jar 包

配制 Cassandra 节点的数据存储目录和日志目录

修改配制文件 storage-conf.xml:

默认的内容
 <CommitLogDirectory>/var/lib/cassandra/commitlog</CommitLogDirectory> 
	 <DataFileDirectories> 
 <DataFileDirectory>/var/lib/cassandra/data</DataFileDirectory> 
	 </DataFileDirectories>
配置后的内容
	 <CommitLogDirectory>/data3/db/lib/cassandra/commitlog</CommitLogDirectory> 
	 <DataFileDirectories> 
 <DataFileDirectory>/data3/db/lib/cassandra/data</DataFileDirectory> 
	 </DataFileDirectories>

修改日志配制文件 log4j.properties:

log4j.properties 配置
 # 日志路径
 #log4j.appender.R.File=/var/log/cassandra/system.log 
 # 配置后的日志路径 : 
 log4j.appender.R.File=/data3/db/log/cassandra/system.log

创建文件存放数据和日志的目录

 # mkdir – p /data3/db/lib/cassandra 
 # mkdir – p /data3/db/log/Cassandra

配制完成后,启动 Cassandra

 # bin/Cassandra

显示信息

 INFO 09:29:12,888 Starting up server gossip 
 INFO 09:29:12,992 Binding thrift service to localhost/127.0.0.1:9160

看到这两行启动回显信息时,说明 Cassandra 已启动成功。

连接到 Cassandra 并添加、获取数据

Cassandra 的 bin 目录已自带了命令行连接工具 cassandra-cli,可使用它连接到 Cassandra,并添加、读取数据。

连接到 Cassandra,并添加、读取数据
 # bin/cassandra-cli --host localhost --port 9160 
 Connected to: "Test Cluster" on localhost/9160 
 Welcome to cassandra CLI. 
 Type 'help' or '?' for help. Type 'quit' or 'exit' to quit. 
 cassandra> 
 cassandra> set Keyspace1.Standard2['studentA']['age'] = '18'
 Value inserted 
 cassandra> get Keyspace1.Standard2['studentA'] 
 => (column=age, value=18, timestamp=1272357045192000) 
 Returned 1 results

停止 Cassandra 服务

查出 Cassandra 的 pid:16328
 # ps -ef | grep cassandra 
 # kill 16328

Cassandra 配制文件 storage-conf.xml 相关配制介绍

清单 1. storage-conf.xml 节点配制说明清单
 <!-- 集群时显示的节点名称 --> 
 <ClusterName>Test Cluster</ClusterName> 
 <!-- 节点启动时,是否自动加入到集群中,默认为 false --> 
 <AutoBootstrap>false</AutoBootstrap> 
 <!-- 集群的节点配制 --> 
 <Seeds> 
 <Seed>127.0.0.1</Seed> 
 </Seeds> 
 <!-- 节点之间通迅的监听地址 --> 
 <ListenAddress>localhost</ListenAddress> 
 <!-- 
      基于 Thrift 的 cassandra 客户端监听地址,
集群时设为:0.0.0.0 表示侦听所有客户端 , 默认为:localhost 
 --> 
 <ThriftAddress>localhost</ThriftAddress> 
 <!-- 客户端连接的端口 --> 
 <ThriftPort>9160</ThriftPort> 
 <!-- 
   FlushDataBufferSizeInMB   将 memtables 上的数据写入在 Disk 上,
超过设定好的限制大小时 ( 默认 32M),则将数据写入磁盘,
   FlushIndexBufferSizeInMB  超过设定的时长(默认 8 分钟)后,
将 memtables 由的数据写入磁盘中
 --> 
 <FlushDataBufferSizeInMB>32</FlushDataBufferSizeInMB> 
 <FlushIndexBufferSizeInMB>8</FlushIndexBufferSizeInMB> 
 <!-- 
 节点之间的日志记录同步模式。
 默认:periodic, 对应配制 CommitLogSyncPeriodInMS 
 启动 batch 时,则对应的配制 CommitLogSyncBatchWindowInMS 
 --> 
 <CommitLogSync>periodic</CommitLogSync> 
 <!-- 	默认为每 10 秒同步一次日志记录 --> 
 <CommitLogSyncPeriodInMS>10000</CommitLogSyncPeriodInMS> 
 <!-- 
 <CommitLogSyncBatchWindowInMS>1</CommitLogSyncBatchWindowInMS> -->

常用编程语言使用 Cassandra 来存储数据

在 使用 Cassandra 时,通常情况下都需要使用第三方插件 Thrift 来生成与 Cassandra 相关的库文件 , 您可以在 http://incubator.apache.org/thrift 下载此插件,并学习它的使用方法。以下是分别在 Java、PHP、Python、C#、Ruby 五种常用编程语言中使用 Cassandra:

Java 程序使用 Cassandra

把 libthrift-r917130.jar,apache-cassandra-0.6.0-rc1.jar 加入到 Eclipse 的编译路径中。

建立数据库连接:使用 libthrift-r917130.jar 的 TTransport 的 open 方法建立起与 Cassandra 服务端 (IP:192.168.10.2 端口:9160) 的连接。

数据库操作:使用 Cassandra.Client 创建一个客户端实例。调用 Client 实例的 insert 方法写入数据,通过 get 方法获取数据。

关闭数据库连接:使用 TTransport 的 close 方法断开与 Cassandra 服务端的连接。

清单 2. Java 连接 Cassandra,写入并读取数据。
 package com.test.cassandra;| 
 import java.io.UnsupportedEncodingException; 
 import org.apache.thrift.transport.TTransport; 
 import org.apache.thrift.transport.TSocket; 
 import org.apache.thrift.protocol.TProtocol; 
 import org.apache.thrift.protocol.TBinaryProtocol; 
 import org.apache.thrift.TException; 
 import org.apache.cassandra.thrift.Cassandra; 
 import org.apache.cassandra.thrift.Column; 
 import org.apache.cassandra.thrift.ColumnOrSuperColumn; 
 import org.apache.cassandra.thrift.ColumnPath; 
 import org.apache.cassandra.thrift.ConsistencyLevel; 
 import org.apache.cassandra.thrift.InvalidRequestException; 
 import org.apache.cassandra.thrift.NotFoundException; 
 import org.apache.cassandra.thrift.TimedOutException; 
 import org.apache.cassandra.thrift.UnavailableException; 
 /* 
 * 使 Java 客户端连接 Cassandra 并进行读写操作
 * @author jimmy 
 * @date 2010-04-10 
 */ 
 public class JCassandraClient{ 
 public static void main(String[] args) throws InvalidRequestException, 
 NotFoundException, UnavailableException, TimedOutException, 
 TException, UnsupportedEncodingException { 

 // 建立数据库连接
 TTransport tr = new TSocket("192.168.10.2", 9160); 
 TProtocol proto = new TBinaryProtocol(tr); 
 Cassandra.Client client = new Cassandra.Client(proto); 
 tr.open(); 
 String keyspace = "Keyspace1"; 
 String cf = "Standard2"; 
 String key = "studentA"; 
 // 插入数据
 long timestamp = System.currentTimeMillis(); 
 ColumnPath path = new ColumnPath(cf); 
 path.setColumn("age".getBytes("UTF-8")); 
 client.insert(keyspace,key,path,"18".getBytes("UTF-8"), 
 timestamp,ConsistencyLevel.ONE); 
 path.setColumn("height".getBytes("UTF-8")); 
 client.insert(keyspace,key,path,"172cm".getBytes("UTF-8"), 
 timestamp,ConsistencyLevel.ONE); 
 // 读取数据
 path.setColumn("height".getBytes("UTF-8")); 
 ColumnOrSuperColumn cc = client.get(keyspace, key, path, ConsistencyLevel.ONE); 
 Column c = cc.getColumn(); 
 String v = new String(c.value, "UTF-8"); 
        // 关闭数据库连接
 tr.close(); 
 } 
 }

PHP 程序使用 Cassandra

在 PHP 代码中使用 Cassandra,需要借助 Thrift 来生成需要的 PHP 文件,通过使用 thrift --gen php interface/cassandra.thrift 生成所需要的 PHP 文件,生成的 PHP 文件中提供了与 Cassandra 建立连接、读写数据时所需要的函数。

清单 3. PHP 连接 Cassandra,写入并读取数据。
 <?php 
 $GLOBALS['THRIFT_ROOT'] = '/usr/share/php/Thrift'; 
 require_once 
 $GLOBALS['THRIFT_ROOT'].'/packages/cassandra/Cassandra.php'; 
 require_once 
 $GLOBALS['THRIFT_ROOT'].'/packages/cassandra/cassandra_types.php'; 
 require_once $GLOBALS['THRIFT_ROOT'].'/transport/TSocket.php'; 
 require_once $GLOBALS['THRIFT_ROOT'].'/protocol/TBinaryProtocol.php'; 
 require_once 
 $GLOBALS['THRIFT_ROOT'].'/transport/TFramedTransport.php'; 
 require_once 
 $GLOBALS['THRIFT_ROOT'].'/transport/TBufferedTransport.php'; 
 try { 
 // 建立 Cassandra 连接
 $socket = new TSocket('192.168.10.2', 9160); 
 $transport = new TBufferedTransport($socket, 1024, 1024); 
 $protocol = new TBinaryProtocolAccelerated($transport); 
 $client = new CassandraClient($protocol); 
 $transport->open(); 
 $keyspace = 'Keyspace1'; 
 $keyUser = "studentA"; 
 $columnPath = new cassandra_ColumnPath(); 
 $columnPath->column_family = 'Standard1'; 
 $columnPath->super_column = null; 
 $columnPath->column = 'age'; 
 $consistency_level = cassandra_ConsistencyLevel::ZERO; 
 $timestamp = time(); 
 $value = "18"; 
 // 写入数据
 $client->insert($keyspace, $keyUser, $columnPath, $value, 
                            $timestamp, $consistency_level); 
 $columnParent = new cassandra_ColumnParent(); 
 $columnParent->column_family = "Standard1"; 
 $columnParent->super_column = NULL; 
 $sliceRange = new cassandra_SliceRange(); 
 $sliceRange->start = ""; 
 $sliceRange->finish = ""; 
 $predicate = new cassandra_SlicePredicate(); 
 list() = $predicate->column_names; 
 $predicate->slice_range = $sliceRange; 
 $consistency_level = cassandra_ConsistencyLevel::ONE; 
 $keyUser = studentA; 
 // 查询数据
 $result = $client->get_slice($keyspace, $keyUser, $columnParent, 
             $predicate, $consistency_level); 
 // 关闭连接
 $transport->close(); 
 } catch (TException $tx) { 
 }?>

Python 程序使用 Cassandra

在 Python 中使用 Cassandra 需要 Thrift 来生成第三方 Python 库,生成方式: thrift --gen py interface/cassandra.thrift, 然后在 Python 代码中引入所需的 Python 库,生成的 Python 库提供了与 Cassandra 建立连接、读写数据时所需要的方法。

清单 4. Python 连接 Cassandra,写入并读取数据。
 from thrift import Thrift 
 from thrift.transport import TTransport 
 from thrift.transport import TSocket 
 from thrift.protocol.TBinaryProtocol import 
 TBinaryProtocolAccelerated 
 from cassandra import Cassandra 
 from cassandra.ttypes import * 
 import time 
 import pprint 
 def main(): 
 socket = TSocket.TSocket("192.168.10.2", 9160) 
 transport = TTransport.TBufferedTransport(socket) 
 protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport) 
 client = Cassandra.Client(protocol) 
 pp = pprint.PrettyPrinter(indent=2) 
 keyspace = "Keyspace1"
 column_path = ColumnPath(column_family="Standard1", column="age") 
 key = "studentA"
 value = "18 "
 timestamp = time.time() 
 try: 
 # 打开数据库连接
 transport.open() 
 # 写入数据
 client.insert(keyspace,key,column_path, 
 value,timestamp,ConsistencyLevel.ZERO) 
 # 查询数据
 column_parent = ColumnParent(column_family="Standard1") 
 slice_range = SliceRange(start="", finish="") 
 predicate = SlicePredicate(slice_range=slice_range) 
 result = client.get_slice(keyspace,key,column_parent, 
 predicate,ConsistencyLevel.ONE) 
 pp.pprint(result) 
 except Thrift.TException, tx: 
 print 'Thrift: %s' % tx.message 
 finally: 
 # 关闭连接
 transport.close() 
 if __name__ == '__main__': 
 main()

C# 使用 Cassandra

在 C# 中使用 Cassandra 需要 Thrift.exe 来生成动态链接库,使用 ./thrift.exe --gen csharp interface/cassandra.thrift 生成所需要的 DLL 文件,生成的 DLL 提供了与 Cassandra 建立连接,读写数据等所需的类和方法,在编程环境中引入生成的 DLL,即可使用。

清单 5. C# 连接 Cassandra,写入并读取数据。
 namespace CshareCassandra{ 
 using System; 
 using System.Collections.Generic; 
 using System.Diagnostics; 
 using Apache.Cassandra; 
 using Thrift.Protocol; 
 using Thrift.Transport; 
 class CassandraClient{ 
 static void Main(string[] args){ 
 // 建立数据库连接
 TTransport transport = new TSocket("192.168.10.2", 9160); 
 TProtocol protocol = new TBinaryProtocol(transport); 
 Cassandra.Client client = new Cassandra.Client(protocol); 
 transport.Open(); 
 System.Text.Encoding utf8Encoding = System.Text.Encoding.UTF8; 
 long timeStamp = DateTime.Now.Millisecond; 
 ColumnPath nameColumnPath = new ColumnPath(){ 
 Column_family = "Standard1", 
 Column = utf8Encoding.GetBytes("age")}; 
   // 写入数据
 client.insert("Keyspace1","studentA",nameColumnPath, 
 utf8Encoding.GetBytes("18"),timeStamp, ConsistencyLevel.ONE); 
 // 读取数据
 ColumnOrSuperColumn returnedColumn = client.get("Keyspace1", 
            "studentA", nameColumnPath, ConsistencyLevel.ONE); 
        Console.WriteLine("Keyspace1/Standard1: age: {0}, value: {1}", 
                     utf8Encoding.GetString(returnedColumn.Column.Name), 
                     utf8Encoding.GetString(returnedColumn.Column.Value)); 
 // 关闭连接
 transport.Close(); 
 } 
 }}

Ruby 使用 Cassandra

在 Ruby 中使用 Cassandra 需要先安装 gem,安装命令:gem install cassandra

安装完成后,打开 Ruby 的 irb 开始使用 Cassandra。

清单 6. Ruby 连接 Cassandra,写入并读取数据
 > require 'rubygems' 
 > require 'cassandra'
 # 建立数据库连接 
 > cdb = Cassandra.new('Keyspace1',"192.168.10.1:9160", :retries => 3) 
 # 写入数据
 > cdb.insert(:Standard1, 'studentA', {'age' => '18'}) 
 # 读取数据
 > cdb.get(:Standard1, :studentA) 
 # 关闭连接
 > cdb.disconnect

搭建 Cassandra 集群环境

Cassandra 的集群是没有中心节点的,各个节点的地位完全相同,节点之间是通过 gossip 的协议来维护集群的状态。

以下是两台安装了 Linux 系统的服务器,且初步设置了 Cassandra 环境和启用了端口 7000,9160:
服务器名端口IP 地址
ServiceA7000,9160192.168.10.3
ServiceB7000,9160192.168.10.2

配制服务器 ServiceA、ServiceB 的 storage-conf.xml 文件

ServiceA 的配置
 <Seeds> 
 <Seed>192.168.10.3</Seed> 
 </Seeds> 
 <ListenAddress>192.168.10.2</ListenAddress> 
 <ThriftAddress>0.0.0.0</ThriftAddress>
ServiceB 的配置
 <Seeds> 
 <Seed>192.168.10.3</Seed> 
 <Seed>192.168.10.2</Seed> 
 </Seeds> 
 <ListenAddress>192.168.10.2</ListenAddress> 
 <ThriftAddress>0.0.0.0</ThriftAddress>

配制完成后,分别启动 ServiceA 和 ServiceB 上的 Cassandra 服务。

查看 ServiceA 和 ServiceB 是否集群成功,可使用 Cassandra 自带的客户端命令

 bin/nodetool --host 192.168.10.2 ring
集群成功则会返回以下类似信息:
 Address Status Load Range Ring 
                                       106218876142754404016344802054916108445 
 192.168.10.2  Up         2.55 KB       31730917190839729088079827277059909532     |<--| 
 192.168.10.3  Up         3.26 KB       106218876142754404016344802054916108445    |-->|

使用 Cassandra 命令行工具进行集群测试

从 ServiceB 连接到 ServiceA,可使用命令:

 cassandra-cli -host 192.168.10.3 -port 9160
集群测试一
写入集群数据
 ServiceA 连接到 ServiceA: 

 # set Keyspace1.Standard2['studentAA']['A2A'] = 'a2a'

 ServiceB 连接到 ServiceA: 

 # set Keyspace1.Standard2['studentBA']['B2A'] = 'b2a'

 ServiceA 连接到 ServiceB: 
 # set Keyspace1.Standard2['studentAB']['A2B'] = 'a2b'

获取集群数据:

 ServiceA 连接到 ServiceA : 
 # get Keyspace1.Standard2['studentAA'], 
   get Keyspace1.Standard2['studentBA'], 
    get Keyspace1.Standard2['studentAB'] 

 ServiceB 连接到 ServiceA : 
 # get Keyspace1.Standard2['studentAA'], 
    get Keyspace1.Standard2['studentBA'], 
    get Keyspace1.Standard2['studentAB'] 

 ServiceA 连接到 ServiceB : 
 # get Keyspace1.Standard2['studentAA'], 
  get Keyspace1.Standard2['studentBA'], 
  get Keyspace1.Standard2['studentAB']

清单 8. 集群测试清单二

ServiceA 停止 Cassandra 服务,ServiceA 连接到 ServiceB 并写入数据

 # set Keyspace1.Standard2['studentAR']['A2R'] = 'a2R'

启动 ServiceA,并链接到 ServiceA 本身,读取刚才在 ServiceB 写入的数据

 # bin/cassandra-cli -host 192.168.10.3 -port 9160 
 # get Keyspace1.Standard2['studentAR']

总结

以 上我们介绍了 Cassandra 的数据模型、节点安装和配置、常用编程语言中使用 Cassandra 以及 Cassandra 的集群和测试。Cassandra 是一个高性能的 P2P 去中心化的非关系型数据库,可以分布式进行读写操作。在系统运行时可以随意的添加或删降字段,是 SNS 应用的理想数据库。

 

同类文章:

cassandra入门

Cassandra 分布式数据库 数据结构与数据读写

Cassandra 分布式数据库 配置、启动与集群

Apache Cassandra 数据库

分布式Key-Value存储系统Cassandra入门