标签归档:redis

配置redis为自动启动的服务

本文是讲述, 如何在centos6.x 系统下如何配置 redis为一个自动启动的服务的过程

在操作过程中 始终需要 root权限

一. Redis安装

redis-service-install-002

 

特别注意 redis的安装目录 如上图

  1. 下载redis 安装程序

wget http://download.redis.io/releases/redis-2.8.19.tar.gz

  1. 解压缩redis安装源代码

tar xzvf redis-2.8.19.tar.gz

cd redis-2.8.19/

  1. 安装redis

make PREFIX=/usr/local/redis install

一定要安装到特定目录中

  1. 拷贝redis配置文件到安装目录

cp   redis.conf    /usr/local/redis/

vi /usr/local/redis/redis.conf

 

二.安装Redis启动服务的辅助程序

  1. 下载

wget http://developer.axis.com/download/distribution/apps-sys-utils-start-stop-daemon-IR1_9_18-2.tar.gz

  1. 解压缩

tar zxf apps-sys-utils-start-stop-daemon-IR1_9_18-2.tar.gz

  1. 编译安装

cd apps/sys-utils/start-stop-daemon-IR1_9_18-2/

gcc start-stop-daemon.c -o start-stop-daemon

cp start-stop-daemon /usr/local/bin/start-stop-daemon

也可以从这里下载, 然后上传, 最后用上面步骤安装

apps-sys-utils-start-stop-daemon-IR1_9_18-2.tar

三.Redis启动服务的安装

1. 下载 安装代码

redis-as-service.v0.1.tar

从上面地址下载源代码

2. 上传到服务器

3. 解压缩源代码

4. 运行安装命令安装

sh install.sh

如下图

redis-service-install-001

5. 配置启动自动运行

chkconfig --level 2 redis-server on && chkconfig --level 3 redis-server on && chkconfig --level 4 redis-server on && chkconfig --level 5  redis-server on

最后检查一下是否设置成功

chkconfig  --list

显示如下:

redis-server              0:off  1:off  2:on   3:on   4:on   5:on   6:off

 

安装完成

 

srcache_nginx redis 构建缓存系统应用一例

redis是一种高效的key-value存储。srcache_nginx模块相关参数介绍,可以参见《memc_nginx+srcache_nginx+memcached构建透明的动态页面缓存》。

下面举一例应用,看配置:

upstream redis {
server 127.0.0.1:6380;
keepalive 512;
}server {
listen       80 backlog=1024 default;
server_name  www.ttlsa.com;
index index.html index.htm index.php;
root  /data/wwwroot/www.ttlsa.com/webroot;

location / {
set $flag 0;
if ($uri ~ /thumb/[0-9]+_160.jpg$){
set $flag "${flag}1";
}
if ($arg_unitid = 42012){
set $flag "${flag}1";
}
if (!-e $request_filename) {
rewrite ^/(.*)$ /index.php?kohana_uri=$1 last;
}
}
location ~ .*\.php?$ {
srcache_store_private on;
srcache_methods GET;
srcache_response_cache_control off;
if ($flag = "011"){
set $key $request_uri;
set_escape_uri $escaped_key $key;
srcache_fetch GET /redis $key;
srcache_default_expire 172800;
srcache_store PUT /redis2 key=$escaped_key&exptime=$srcache_expire;

add_header X-flag $flag;
add_header X-Cached-From $srcache_fetch_status;
add_header X-Cached-Store $srcache_store_status;
add_header X-Key $key;
set_md5 $md5key $key;
add_header X-md5-key $md5key;
add_header X-Query_String $query_string;
add_header X-expire $srcache_expire;
}
include fastcgi_params;
fastcgi_pass  127.0.0.1:10080;
fastcgi_index index.php;
fastcgi_connect_timeout 300;
fastcgi_send_timeout 300;
fastcgi_read_timeout 300;
fastcgi_buffer_size 128k;
fastcgi_buffers 4 256k;
fastcgi_busy_buffers_size 256k;
fastcgi_temp_file_write_size 256k;
fastcgi_intercept_errors on;
fastcgi_param  SCRIPT_FILENAME  $document_root$fastcgi_script_name;
}

location = /redis {
internal;
set_md5 $redis_key $args;
redis_pass redis;
}

location = /redis2 {
internal;

set_unescape_uri $exptime $arg_exptime;
set_unescape_uri $key $arg_key;
set_md5 $key;

redis2_query set $key $echo_request_body;
redis2_query expire $key $exptime;
redis2_pass redis;
}
}

测试:memcached

redis实例下:

memcached

可以记录下日志来测试加缓存前后的耗时。日志格式如下:

log_format srcache_log '$remote_addr - $remote_user [$time_local] "$request" '
'"$status" $body_bytes_sent $request_time $bytes_sent $request_length '
'[$upstream_response_time] [$srcache_fetch_status] [$srcache_store_status] [$srcache_expire]';

转载请注明来自运维生存时间: http://www.ttlsa.com/html/3952.html

Kafka+Spark Streaming+Redis实时计算整合实践

基于Spark通用计算平台,可以很好地扩展各种计算类型的应用,尤其是Spark提供了内建的计算库支持,像Spark Streaming、Spark SQL、MLlib、GraphX,这些内建库都提供了高级抽象,可以用非常简洁的代码实现复杂的计算逻辑、这也得益于Scala编程语言的简洁性。这 里,我们基于1.3.0版本的Spark搭建了计算平台,实现基于Spark Streaming的实时计算。
我们的应用场景是分析用户使用手机App的行为,描述如下所示:

  • 手机客户端会收集用户的行为事件(我们以点击事件为例),将数据发送到数据服务器,我们假设这里直接进入到Kafka消息队列
  • 后端的实时服务会从Kafka消费数据,将数据读出来并进行实时分析,这里选择Spark Streaming,因为Spark Streaming提供了与Kafka整合的内置支持
  • 经过Spark Streaming实时计算程序分析,将结果写入Redis,可以实时获取用户的行为数据,并可以导出进行离线综合统计分析

Spark Streaming介绍

Spark Streaming提供了一个叫做DStream(Discretized Stream)的高级抽象,DStream表示一个持续不断输入的数据流,可以基于Kafka、TCP Socket、Flume等输入数据流创建。在内部,一个DStream实际上是由一个RDD序列组成的。Sparking Streaming是基于Spark平台的,也就继承了Spark平台的各种特性,如容错(Fault-tolerant)、可扩展 (Scalable)、高吞吐(High-throughput)等。
在Spark Streaming中,每个DStream包含了一个时间间隔之内的数据项的集合,我们可以理解为指定时间间隔之内的一个batch,每一个batch就 构成一个RDD数据集,所以DStream就是一个个batch的有序序列,时间是连续的,按照时间间隔将数据流分割成一个个离散的RDD数据集,如图所 示(来自官网):
spark-cluster-overview-streaming-dstream
我们都知道,Spark支持两种类型操作:Transformations和Actions。Transformation从一个已知的RDD数据集经过 转换得到一个新的RDD数据集,这些Transformation操作包括map、filter、flatMap、union、join等,而且 Transformation具有lazy的特性,调用这些操作并没有立刻执行对已知RDD数据集的计算操作,而是在调用了另一类型的Action操作才 会真正地执行。Action执行,会真正地对RDD数据集进行操作,返回一个计算结果给Driver程序,或者没有返回结果,如将计算结果数据进行持久 化,Action操作包括reduceByKey、count、foreach、collect等。关于Transformations和Actions 更详细内容,可以查看官网文档。
同样、Spark Streaming提供了类似Spark的两种操作类型,分别为Transformations和Output操作,它们的操作对象是DStream,作 用也和Spark类似:Transformation从一个已知的DStream经过转换得到一个新的DStream,而且Spark Streaming还额外增加了一类针对Window的操作,当然它也是Transformation,但是可以更灵活地控制DStream的大小(时间 间隔大小、数据元素个数),例如window(windowLength, slideInterval)、countByWindow(windowLength, slideInterval)、reduceByWindow(func, windowLength, slideInterval)等。Spark Streaming的Output操作允许我们将DStream数据输出到一个外部的存储系统,如数据库或文件系统等,执行Output操作类似执行 Spark的Action操作,使得该操作之前lazy的Transformation操作序列真正地执行。

Kafka+Spark Streaming+Redis编程实践

下面,我们根据上面提到的应用场景,来编程实现这个实时计算应用。首先,写了一个Kafka Producer模拟程序,用来模拟向Kafka实时写入用户行为的事件数据,数据是JSON格式,示例如下:
{"uid":"068b746ed4620d25e26055a9f804385f","event_time":"1430204612405","os_type":"Android","click_count":6}

一个事件包含4个字段:

  • uid:用户编号
  • event_time:事件发生时间戳
  • os_type:手机App操作系统类型
  • click_count:点击次数

下面是我们实现的代码,如下所示:
package org.shirdrn.spark.streaming.utils

import java.util.Properties
import scala.util.Properties
import org.codehaus.jettison.json.JSONObject
import kafka.javaapi.producer.Producer
import kafka.producer.KeyedMessage
import kafka.producer.KeyedMessage
import kafka.producer.ProducerConfig
import scala.util.Random

object KafkaEventProducer {

private val users = Array(
"4A4D769EB9679C054DE81B973ED5D768", "8dfeb5aaafc027d89349ac9a20b3930f",
"011BBF43B89BFBF266C865DF0397AA71", "f2a8474bf7bd94f0aabbd4cdd2c06dcf",
"068b746ed4620d25e26055a9f804385f", "97edfc08311c70143401745a03a50706",
"d7f141563005d1b5d0d3dd30138f3f62", "c8ee90aade1671a21336c721512b817a",
"6b67c8c700427dee7552f81f3228c927", "a95f22eabc4fd4b580c011a3161a9d9d")

private val random = new Random()

private var pointer = -1

def getUserID() : String = {
pointer = pointer + 1
if(pointer >= users.length) {
pointer = 0
users(pointer)
} else {
users(pointer)
}
}

def click() : Double = {
random.nextInt(10)
}

// bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --create --topic user_events --replication-factor 2 --partitions 2
// bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --list
// bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --describe user_events
// bin/kafka-console-consumer.sh --zookeeper zk1:2181,zk2:2181,zk3:22181/kafka --topic test_json_basis_event --from-beginning
def main(args: Array[String]): Unit = {
val topic = "user_events"
val brokers = "10.10.4.126:9092,10.10.4.127:9092"
val props = new Properties()
props.put("metadata.broker.list", brokers)
props.put("serializer.class", "kafka.serializer.StringEncoder")

val kafkaConfig = new ProducerConfig(props)
val producer = new Producer[String, String](kafkaConfig)

while(true) {
// prepare event data
val event = new JSONObject()
event
.put("uid", getUserID)
.put("event_time", System.currentTimeMillis.toString)
.put("os_type", "Android")
.put("click_count", click)

// produce event message
producer.send(new KeyedMessage[String, String](topic, event.toString))
println("Message sent: " + event)

Thread.sleep(200)
}
}
}

通过控制上面程序最后一行的时间间隔来控制模拟写入速度。下面我们来讨论实现实时统计每个用户的点击次数,它是按照用户分组进行累加次数,逻辑比较简单,关键是在实现过程中要注意一些问题,如对象序列化等。先看实现代码,稍后我们再详细讨论,代码实现如下所示:
object UserClickCountAnalytics {

def main(args: Array[String]): Unit = {
var masterUrl = "local[1]"
if (args.length > 0) {
masterUrl = args(0)
}

// Create a StreamingContext with the given master URL
val conf = new SparkConf().setMaster(masterUrl).setAppName("UserClickCountStat")
val ssc = new StreamingContext(conf, Seconds(5))

// Kafka configurations
val topics = Set("user_events")
val brokers = "10.10.4.126:9092,10.10.4.127:9092"
val kafkaParams = Map[String, String](
"metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")

val dbIndex = 1
val clickHashKey = "app::users::click"

// Create a direct stream
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

val events = kafkaStream.flatMap(line => {
val data = JSONObject.fromObject(line._2)
Some(data)
})

// Compute user click times
val userClicks = events.map(x => (x.getString("uid"), x.getInt("click_count"))).reduceByKey(_ + _)
userClicks.foreachRDD(rdd => {
rdd.foreachPartition(partitionOfRecords => {
partitionOfRecords.foreach(pair => {
val uid = pair._1
val clickCount = pair._2
val jedis = RedisClient.pool.getResource
jedis.select(dbIndex)
jedis.hincrBy(clickHashKey, uid, clickCount)
RedisClient.pool.returnResource(jedis)
})
})
})

ssc.start()
ssc.awaitTermination()

}
}

上面代码使用了Jedis客户端来操作Redis,将分组计数结果数据累加写入Redis存储,如果其他系统需要实时获取该数据,直接从Redis实时读取即可。RedisClient实现代码如下所示:
object RedisClient extends Serializable {
val redisHost = "10.10.4.130"
val redisPort = 6379
val redisTimeout = 30000
lazy val pool = new JedisPool(new GenericObjectPoolConfig(), redisHost, redisPort, redisTimeout)

lazy val hook = new Thread {
override def run = {
println("Execute hook thread: " + this)
pool.destroy()
}
}
sys.addShutdownHook(hook.run)
}

上面代码我们分别在local[K]和Spark Standalone集群模式下运行通过。
如果我们是在开发环境进行调试的时候,也就是使用local[K]部署模式,在本地启动K个Worker线程来计算,这K个Worker在同一个JVM实 例里,上面的代码默认情况是,如果没有传参数则是local[K]模式,所以如果使用这种方式在创建Redis连接池或连接的时候,可能非常容易调试通 过,但是在使用Spark Standalone、YARN Client(YARN Cluster)或Mesos集群部署模式的时候,就会报错,主要是由于在处理Redis连接池或连接的时候出错了。我们可以看一下Spark架构,如图 所示(来自官网):
spark-cluster-overview-0001无论是在本地模式、Standalone模式,还是在Mesos或YARN模式下,整个Spark集群的结构都可以用上图抽象表示,只是各个组件的运行环 境不同,导致组件可能是分布式的,或本地的,或单个JVM实例的。如在本地模式,则上图表现为在同一节点上的单个进程之内的多个组件;而在YARN Client模式下,Driver程序是在YARN集群之外的一个节点上提交Spark Application,其他的组件都运行在YARN集群管理的节点上。
在Spark集群环境部署Application后,在进行计算的时候会将作用于RDD数据集上的函数(Functions)发送到集群中Worker上 的Executor上(在Spark Streaming中是作用于DStream的操作),那么这些函数操作所作用的对象(Elements)必须是可序列化的,通过Scala也可以使用 lazy引用来解决,否则这些对象(Elements)在跨节点序列化传输后,无法正确地执行反序列化重构成实际可用的对象。上面代码我们使用lazy引 用(Lazy Reference)来实现的,代码如下所示:
// lazy pool reference
lazy val pool = new JedisPool(new GenericObjectPoolConfig(), redisHost, redisPort, redisTimeout)
...
partitionOfRecords.foreach(pair => {
val uid = pair._1
val clickCount = pair._2
val jedis = RedisClient.pool.getResource
jedis.select(dbIndex)
jedis.hincrBy(clickHashKey, uid, clickCount)
RedisClient.pool.returnResource(jedis)
})

另一种方式,我们将代码修改为,把对Redis连接的管理放在操作DStream的Output操作范围之内,因为我们知道它是在特定的Executor中进行初始化的,使用一个单例的对象来管理,如下所示:
package org.shirdrn.spark.streaming

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.kafka.KafkaUtils

import kafka.serializer.StringDecoder
import net.sf.json.JSONObject
import redis.clients.jedis.JedisPool

object UserClickCountAnalytics {

def main(args: Array[String]): Unit = {
var masterUrl = "local[1]"
if (args.length > 0) {
masterUrl = args(0)
}

// Create a StreamingContext with the given master URL
val conf = new SparkConf().setMaster(masterUrl).setAppName("UserClickCountStat")
val ssc = new StreamingContext(conf, Seconds(5))

// Kafka configurations
val topics = Set("user_events")
val brokers = "10.10.4.126:9092,10.10.4.127:9092"
val kafkaParams = Map[String, String](
"metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")

val dbIndex = 1
val clickHashKey = "app::users::click"

// Create a direct stream
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

val events = kafkaStream.flatMap(line => {
val data = JSONObject.fromObject(line._2)
Some(data)
})

// Compute user click times
val userClicks = events.map(x => (x.getString("uid"), x.getInt("click_count"))).reduceByKey(_ + _)
userClicks.foreachRDD(rdd => {
rdd.foreachPartition(partitionOfRecords => {
partitionOfRecords.foreach(pair => {

/**
* Internal Redis client for managing Redis connection {@link Jedis} based on {@link RedisPool}
*/
object InternalRedisClient extends Serializable {

@transient private var pool: JedisPool = null

def makePool(redisHost: String, redisPort: Int, redisTimeout: Int,
maxTotal: Int, maxIdle: Int, minIdle: Int): Unit = {
makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle, true, false, 10000)
}

def makePool(redisHost: String, redisPort: Int, redisTimeout: Int,
maxTotal: Int, maxIdle: Int, minIdle: Int, testOnBorrow: Boolean,
testOnReturn: Boolean, maxWaitMillis: Long): Unit = {
if(pool == null) {
val poolConfig = new GenericObjectPoolConfig()
poolConfig.setMaxTotal(maxTotal)
poolConfig.setMaxIdle(maxIdle)
poolConfig.setMinIdle(minIdle)
poolConfig.setTestOnBorrow(testOnBorrow)
poolConfig.setTestOnReturn(testOnReturn)
poolConfig.setMaxWaitMillis(maxWaitMillis)
pool = new JedisPool(poolConfig, redisHost, redisPort, redisTimeout)

val hook = new Thread{
override def run = pool.destroy()
}
sys.addShutdownHook(hook.run)
}
}

def getPool: JedisPool = {
assert(pool != null)
pool
}
}

// Redis configurations
val maxTotal = 10
val maxIdle = 10
val minIdle = 1
val redisHost = "10.10.4.130"
val redisPort = 6379
val redisTimeout = 30000
val dbIndex = 1
InternalRedisClient.makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle)

val uid = pair._1
val clickCount = pair._2
val jedis =InternalRedisClient.getPool.getResource
jedis.select(dbIndex)
jedis.hincrBy(clickHashKey, uid, clickCount)
InternalRedisClient.getPool.returnResource(jedis)
})
})
})

ssc.start()
ssc.awaitTermination()

}
}

上面代码实现,得益于Scala语言的特性,可以在代码中任何位置进行class或object的定义,我们将用来管理Redis连接的代码放在了 特定操作的内部,就避免了瞬态(Transient)对象跨节点序列化的问题。这样做还要求我们能够了解Spark内部是如何操作RDD数据集的,更多可 以参考RDD或Spark相关文档。
在集群上,以Standalone模式运行,执行如下命令:
1    cd /usr/local/spark
2    ./bin/spark-submit --class org.shirdrn.spark.streaming.UserClickCountAnalytics --master spark://hadoop1:7077 --executor-memory 1G --total-executor-cores 2 ~/spark-0.0.SNAPSHOT.jar spark://hadoop1:7077

可以查看集群中各个Worker节点执行计算任务的状态,也可以非常方便地通过Web页面查看。
下面,看一下我们存储到Redis中的计算结果,如下所示:
127.0.0.1:6379[1]> HGETALL app::users::click
1) "4A4D769EB9679C054DE81B973ED5D768"
2) "7037"
3) "8dfeb5aaafc027d89349ac9a20b3930f"
4) "6992"
5) "011BBF43B89BFBF266C865DF0397AA71"
6) "7021"
7) "97edfc08311c70143401745a03a50706"
8) "6874"
9) "d7f141563005d1b5d0d3dd30138f3f62"
10) "7057"
11) "a95f22eabc4fd4b580c011a3161a9d9d"
12) "7092"
13) "6b67c8c700427dee7552f81f3228c927"
14) "7266"
15) "f2a8474bf7bd94f0aabbd4cdd2c06dcf"
16) "7188"
17) "c8ee90aade1671a21336c721512b817a"
18) "6950"
19) "068b746ed4620d25e26055a9f804385f"

有关更多关于Spark Streaming的详细内容,可以参考官方文档。

附录

这里,附上前面开发的应用所对应的依赖,以及打包Spark Streaming应用程序的Maven配置,以供参考。如果使用maven-shade-plugin插件,配置有问题的话,打包后在Spark集群上 提交Application时候可能会报错Invalid signature file digest for Manifest main attributes。参考的Maven配置,如下所示:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.shirdrn.spark</groupId>
<artifactId>spark</artifactId>
<version>0.0.1-SNAPSHOT</version>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<version>2.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.2</version>
</dependency>
</dependencies>

<build>
<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
<resources>
<resource>
<directory>${basedir}/src/main/resources</directory>
</resource>
</resources>
<testResources>
<testResource>
<directory>${basedir}/src/test/resources</directory>
</testResource>
</testResources>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.2</version>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<includes>
<include>*:*</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
<resource>log4j.properties</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

来源:http://shiyanjun.cn/archives/1097.html

Memcached、Redis OR Tair

一、前言

非关系型数据库(NoSQL = Not Only SQL)的产品非常多,常见的有Memcached、Redis、MongoDB等优秀开源项目,相关概念和资料网上也非常丰富,不再重复描述,本文主要 引入Memcached和Redis与淘宝开源Tair分布式存储进行对比测试,由于各自适用场景不同,且每个产品的可配置参数繁多,涉及缓存策略、分布 算法、序列化方式、数据压缩技术、通信方式、并发、超时等诸多方面因素,都会对测试结果产生影响,单纯的性能对比存在非常多的局限性和不合理性,所以不能 作为任何评估依据,仅供参考,加深对各自产品的理解。以下是一些基本认识:

1、尽管 Memcached 和 Redis 都标识为Distribute,但从Server端本身而言它们并不提供分布式的解决方案,需要Client端实现一定的分布算法将数据存储到各个节点, 从而实现分布式存储,两者都提供了Replication功能(Master-Slave)保障可靠性。

2、Tair 则本身包含 Config Server 和 Data Server 采用一致性哈希算法分布数据存储,由ConfigSever来管理所有数据节点,理论上服务器端节点的维护对前端应用不会产生任何影响,同时数据能按指定 复制到不同的DataServer保障可靠性,从Cluster角度来看属于一个整体Solution,组件图参照上一篇博文( http://www.cnblogs.com/lengfo/p/4171655.html )。

基于此,本文设定了实验环境都使用同一台机器进行 Memcached、Redis 和 Tair 的单Server部署测试。

二、前置条件

1、虚拟机环境(OS :CentOS6.5,CPU:2 Core,Memory:4G)

2、软件环境

 Sever Client
MemcachedMemcached 1.4.21Xmemcached 2.0.0
RedisRedis 2.8.19Jedis 2.8.5
TairTair 2.3Tair Client 2.3.1

3、服务器配置,单一服务器通过配置尽可能让资源分配一致(由于各个产品服务器端的配置相对复杂,不再单独列出,以下仅描述内存、连接等基本配置)

 IP_Port Memory_Size Max_Connection 备注
Memcached10.129.221.70:120001024MB2048
Redis10.129.221.70:63791gb(1000000000byte)10000(默认)
Tair Config Server10.129.221.70:5198
Tair Data Server10.129.221.70:51911024MB使用mdb存储引擎

三、用例场景,分别使用单线程和多线程进行测试

1、从数据库读取一组数据缓存(SET)到每个缓存服务器,其中对于每个Server的写入数据是完全一致的,不设置过期时间,进行如下测试。

1)单线程进行1次写入

2)单线程进行500次写入

3)单线程进行2000次写入

4)并行500个线程,每个线程进行1次写入

5)并行500个线程,每个线程进行5次写入

6)并行2000个线程,每个线程进行1次写入

2、分别从每个缓存服务器读取(GET)数据,其中对于每个Server的读取数据大小是完全一致的,进行如下测试。

1)单线程进行1次读取

2)单线程进行500次读取

3)单线程进行2000次读取

4)并行500个线程,每个线程进行1次读取

5)并行500个线程,每个线程进行5次读取

6)并行2000个线程,每个线程进行1次读取

四、单线程测试

1、缓存Model对象(OrderInfo)的定义参照tbOrder表(包括单据号、制单日期、商品、数量等字段)

2、单线程的读写操作对于代码的要求相对较低,不需要考虑Pool,主要代码如下:

1)Memcached单线程读写,使用二进制方式序列化,不启用压缩。

 1 public static void putItems2Memcache(List<OrderInfo> orders) throws Exception {
 2         MemcachedClient memcachedClient = null;
 3         try {
 4             MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses("10.129.221.70:12000"));
 5             builder.setCommandFactory(new BinaryCommandFactory());
 6             memcachedClient = builder.build();
 7 
 8             for (OrderInfo order : orders) {
 9                 boolean isSuccess = memcachedClient.set("order_" + order.BillNumber, 0, order);
10                 if (!isSuccess) {
11                     System.out.println("put: order_" + order.BillNumber + "  " + isSuccess);
12                 }
13             }
14         } catch (Exception ex) {
15             ex.printStackTrace();
16         } finally {
17             memcachedClient.shutdown();
18         }
19     }
20 
21     public static void getItemsFromMemcache(List<String> billNumbers) throws Exception {
22         MemcachedClient memcachedClient = null;
23         try {
24             MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses("10.129.221.70:12000"));
25             builder.setCommandFactory(new BinaryCommandFactory());
26             memcachedClient = builder.build();
27 
28             for (String billnumber : billNumbers) {
29                 OrderInfo result = memcachedClient.get(billnumber);
30 
31                 if (result == null) {
32                     System.out.println(" get failed : " + billnumber + " not exist ");
33                 }
34             }
35         } catch (Exception ex) {
36             ex.printStackTrace();
37         } finally {
38             memcachedClient.shutdown();
39         }
40     }

View Code

2)Redis单线程读写,由于Jedis Client 不支持对象的序列化,需要自行实现对象序列化(本文使用二进制方式)。

 1 public static void putItems2Redis(List<OrderInfo> orders) {
 2         Jedis jedis = new Jedis("10.129.221.70", 6379);
 3 
 4         try {
 5             jedis.connect();
 6 
 7             for (OrderInfo order : orders) {
 8                 String StatusCode = jedis.set(("order_" + order.BillNumber).getBytes(), SerializeUtil.serialize(order));
 9                 if (!StatusCode.equals("OK")) {
10                     System.out.println("put: order_" + order.BillNumber + "  " + StatusCode);
11                 }
12             }
13         } catch (Exception ex) {
14             ex.printStackTrace();
15         } finally {
16             jedis.close();
17         }
18     }
19 
20     public static void getItemsFromRedis(List<String> billNumbers) {
21         Jedis jedis = new Jedis("10.129.221.70", 6379);
22 
23         try {
24             jedis.connect();
25 
26             for (String billnumber : billNumbers) {
27                 byte[] result = jedis.get(billnumber.getBytes());
28                 if (result.length > 0) {
29                     OrderInfo order = (OrderInfo) SerializeUtil.unserialize(result);
30                     if (order == null) {
31                         System.out.println(" unserialize failed : " + billnumber);
32                     }
33                 } else {
34                     System.out.println(" get failed : " + billnumber + " not exist ");
35                 }
36             }
37         } catch (Exception ex) {
38             ex.printStackTrace();
39         } finally {
40             jedis.close();
41         }
42     }

序列化代码

 1 package common;
 2 
 3 import java.io.ByteArrayInputStream;
 4 import java.io.ByteArrayOutputStream;
 5 import java.io.ObjectInputStream;
 6 import java.io.ObjectOutputStream;
 7 
 8 public class SerializeUtil {
 9 
10     /**
11      * 序列化
12      * @param object
13      * @return
14      */
15     public static byte[] serialize(Object object) {
16         ObjectOutputStream oos = null;
17         ByteArrayOutputStream baos = null;
18 
19         try { 
20             baos = new ByteArrayOutputStream();
21             oos = new ObjectOutputStream(baos);
22             oos.writeObject(object);
23             byte[] bytes = baos.toByteArray();
24             return bytes;
25         } catch (Exception e) {
26             e.printStackTrace();
27         }
28         return null;
29     }
30 
31     /**
32      * 反序列化
33      * @param bytes
34      * @return
35      */
36     public static Object unserialize(byte[] bytes) {
37         ByteArrayInputStream bais = null;
38         try {
39             bais = new ByteArrayInputStream(bytes);
40             ObjectInputStream ois = new ObjectInputStream(bais);
41             return ois.readObject();
42         } catch (Exception e) {
43             e.printStackTrace();
44         }
45 
46         return null;
47     }
48 }

3)Tair单线程读写,使用Java序列化,默认压缩阀值为8192字节,但本文测试的每个写入项都不会超过这个阀值,所以不受影响。

 1 public static void putItems2Tair(List<OrderInfo> orders) {
 2         try {
 3             List<String> confServers = new ArrayList<String>();
 4             confServers.add("10.129.221.70:5198");
 5             //confServers.add("10.129.221.70:5200");
 6 
 7             DefaultTairManager tairManager = new DefaultTairManager();
 8             tairManager.setConfigServerList(confServers);
 9             tairManager.setGroupName("group_1");
10             tairManager.init();
11 
12             for (OrderInfo order : orders) {
13                 ResultCode result = tairManager.put(0, "order_" + order.BillNumber, order);
14                 if (!result.isSuccess()) {
15                     System.out.println("put: order_" + order.BillNumber + "  " + result.isSuccess() + " code:" + result.getCode());
16                 }
17             }
18         } catch (Exception ex) {
19             ex.printStackTrace();
20         }
21     }
22 
23     public static void getItemsFromTair(List<String> billNumbers) {
24         try {
25             List<String> confServers = new ArrayList<String>();
26             confServers.add("10.129.221.70:5198");
27             //confServers.add("10.129.221.70:5200");
28 
29             DefaultTairManager tairManager = new DefaultTairManager();
30             tairManager.setConfigServerList(confServers);
31             tairManager.setGroupName("group_1");
32             tairManager.init();
33 
34             for (String billnumber : billNumbers) {
35                 Result<DataEntry> result = tairManager.get(0, billnumber);
36                 if (result.isSuccess()) {
37                     DataEntry entry = result.getValue();
38                     if (entry == null) {
39                         System.out.println(" get failed : " + billnumber + " not exist ");
40                     }
41                 } else {
42                     System.out.println(result.getRc().getMessage());
43                 }
44             }
45         } catch (Exception ex) {
46             ex.printStackTrace();
47         }
48     }

3、测试结果,每项重复测试取平均值

五、多线程测试

1、除了多线程相关代码外的公共代码和单线程基本一致,多线程测试主要增加了Client部分代码对ConnectionPool、TimeOut相关设置,池策略、大小都会对性能产生很大影响,为了达到更高的性能,不同的使用场景下都需要有科学合理的测算。

2、主要测试代码

1)每个读写测试线程任务完成后统一调用公共Callback,在每批测试任务完成后记录消耗时间

 1 package common;
 2 
 3 public class ThreadCallback {
 4 
 5     public static int CompleteCounter = 0;
 6     public static int failedCounter = 0;
 7 
 8     public static synchronized void OnException() {
 9         failedCounter++;
10     }
11 
12     public static synchronized void OnComplete(String msg, int totalThreadCount, long startMili) {
13         CompleteCounter++;
14         if (CompleteCounter == totalThreadCount) {
15             long endMili = System.currentTimeMillis();
16             System.out.println("(总共" + totalThreadCount + "个线程 ) " + msg + "  ,总耗时为:" + (endMili - startMili) + "毫秒 ,发生异常线程数:" + failedCounter);
17             CompleteCounter = 0;
18             failedCounter = 0;
19         }
20     }
21 }

View Code

2)Memcached多线程读写,使用XMemcached客户端连接池,主要设置连接池大小ConnectionPoolSize=5,连接超时时间ConnectTimeout=2000ms,测试结果要求没有超时异常线程。

测试方法

 1         /*-------------------Memcached(多线程初始化)--------------------*/
 2         MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses("192.168.31.191:12000"));
 3         builder.setCommandFactory(new BinaryCommandFactory());
 4         builder.setConnectionPoolSize(5);
 5         builder.setConnectTimeout(2000);
 6         MemcachedClient memcachedClient = builder.build();
 7         memcachedClient.setOpTimeout(2000);
 8 
 9         /*-------------------Memcached(多线程写入)--------------------*/
10         orders = OrderBusiness.loadOrders(5);
11         startMili = System.currentTimeMillis();
12         totalThreadCount = 500;
13         for (int i = 1; i <= totalThreadCount; i++) {
14             MemcachePutter putter = new MemcachePutter();
15             putter.OrderList = orders;
16             putter.Namesapce = i;
17             putter.startMili = startMili;
18             putter.TotalThreadCount = totalThreadCount;
19             putter.memcachedClient = memcachedClient;
20 
21             Thread th = new Thread(putter);
22             th.start();
23         }
24 
25                 //读取代码基本一致

线程任务类

 1 public class MemcachePutter implements Runnable {
 2     public List<OrderInfo> OrderList;
 3     public int Namesapce;
 4     public int TotalThreadCount;
 5     public long startMili;
 6     public MemcachedClient memcachedClient = null; // 线程安全的?
 7 
 8     @Override
 9     public void run() {
10         try {
11             for (OrderInfo order : OrderList) {
12                 boolean isSuccess = memcachedClient.set("order_" + order.BillNumber, 0, order);
13                  if (!isSuccess) {
14                 System.out.println("put: order_" + order.BillNumber + "  " + isSuccess);
15                 }
16             }
17         } catch (Exception ex) {
18             ex.printStackTrace();
19             ThreadCallback.OnException();
20         } finally {
21             ThreadCallback.OnComplete("Memcached 每个线程进行" + OrderList.size() + "次 [写入] ", TotalThreadCount, startMili);
22         }
23     }
24 }
25 
26 
27 
28 public class MemcacheGetter implements Runnable {
29 
30     public List<String> billnumbers;
31     public long startMili;
32     public int TotalThreadCount;
33     public MemcachedClient memcachedClient = null; // 线程安全的?
34 
35     @Override
36     public void run() {
37         try {
38             for (String billnumber : billnumbers) {
39                 OrderInfo result = memcachedClient.get(billnumber);
40                 if (result == null) {
41                     System.out.println(" get failed : " + billnumber + " not exist ");
42                 }
43             }
44         } catch (Exception ex) {
45             ex.printStackTrace();
46             ThreadCallback.OnException();
47         } finally {
48             ThreadCallback.OnComplete("Memcached 每个线程进行" + billnumbers.size() + "次 [读取] ", TotalThreadCount, startMili);
49         }
50     }
51 }

3)Redis多线程读写,使用Jedis客户端连接池,从源码可以看出依赖与Apache.Common.Pool2,主要设置连接池MaxTotal=5,连接超时时间Timeout=2000ms,测试结果要求没有超时异常线程。

测试方法

 1         /*-------------------Redis(多线程初始化)--------------------*/
 2         GenericObjectPoolConfig config = new GenericObjectPoolConfig();
 3         config.setMaxTotal(5);
 4         JedisPool jpool = new JedisPool(config, "192.168.31.191", 6379, 2000);
 5 
 6         /*-------------------Redis(多线程写入)--------------------*/
 7         totalThreadCount = 2000;
 8         orders = OrderBusiness.loadOrders(1);
 9         startMili = System.currentTimeMillis();
10         for (int i = 1; i <= totalThreadCount; i++) {
11             RedisPutter putter = new RedisPutter();
12             putter.OrderList = orders;
13             putter.Namesapce = i;
14             putter.startMili = startMili;
15             putter.TotalThreadCount = totalThreadCount;
16             putter.jpool = jpool;
17 
18             Thread th = new Thread(putter);
19             th.start();
20         }

线程任务类

 1 public class RedisPutter implements Runnable {
 2 
 3     public List<OrderInfo> OrderList;
 4     public int Namesapce;
 5     public int TotalThreadCount;
 6     public long startMili;
 7     public JedisPool jpool;
 8 
 9     @Override
10     public void run() {
11         Jedis jedis = jpool.getResource();
12 
13         try {
14             jedis.connect();
15 
16             for (OrderInfo order : OrderList) {
17                 String StatusCode = jedis.set(("order_" + order.BillNumber).getBytes(), SerializeUtil.serialize(order));
18                 if (!StatusCode.equals("OK")) {
19                     System.out.println("put: order_" + order.BillNumber + "  " + StatusCode);
20                 }
21             }
22         } catch (Exception ex) {
23             // ex.printStackTrace();
24             jpool.returnBrokenResource(jedis);
25             ThreadCallback.OnException();
26         } finally {
27             jpool.returnResource(jedis);
28             ThreadCallback.OnComplete("Redis 每个线程进行" + OrderList.size() + "次 [写入] ", TotalThreadCount, startMili);
29         }
30     }
31 }
32 
33 
34 
35 public class RedisGetter implements Runnable {
36     public List<String> billnumbers;
37     public long startMili;
38     public int TotalThreadCount;
39     public JedisPool jpool;
40 
41     @Override
42     public void run() {
43         Jedis jedis = jpool.getResource();
44 
45         try {
46             jedis.connect();
47             for (String billnumber : billnumbers) {
48                 byte[] result = jedis.get(billnumber.getBytes());
49                 if (result.length > 0) {
50                     OrderInfo order = (OrderInfo) SerializeUtil.unserialize(result);
51                     if (order == null) {
52                         System.out.println(" unserialize failed : " + billnumber);
53                     }
54                 } else {
55                     System.out.println(" get failed : " + billnumber + " not exist ");
56                 }
57             }
58         } catch (Exception ex) {
59             // ex.printStackTrace();
60             jpool.returnBrokenResource(jedis);
61             ThreadCallback.OnException();
62         } finally {
63             jpool.returnResource(jedis);
64             ThreadCallback.OnComplete("Redis 每个线程进行" + billnumbers.size() + "次 [读取] ", TotalThreadCount, startMili);
65         }
66     }
67 }

View Code

4)Tair多线程读写,使用官方Tair-Client,可设置参数MaxWaitThread主要指最大等待线程数,当超过这个数量的线程在 等待时,新的请求将直接返回超时,本文测试设置MaxWaitThread=100,连接超时时间Timeout=2000ms,测试结果要求没有超时异 常线程。

测试方法

 1      /*-------------------Tair(多线程初始化tairManager)--------------------*/
 2         List<String> confServers = new ArrayList<String>();
 3         confServers.add("192.168.31.191:5198");
 4         DefaultTairManager tairManager = new DefaultTairManager();
 5         tairManager.setConfigServerList(confServers);
 6         tairManager.setGroupName("group_1");
 7         tairManager.setMaxWaitThread(100);// 最大等待线程数,当超过这个数量的线程在等待时,新的请求将直接返回超时
 8         tairManager.setTimeout(2000);// 请求的超时时间,单位为毫秒
 9         tairManager.init();
10 
11         /*-------------------Tair(多线程写入)--------------------*/
12         orders = OrderBusiness.loadOrders(5);
13         startMili = System.currentTimeMillis();
14         totalThreadCount = 500;
15         for (int i = 1; i <= totalThreadCount; i++) {
16             TairPutter putter = new TairPutter();
17             putter.OrderList = orders;
18             putter.Namesapce = i;
19             putter.startMili = startMili;
20             putter.TotalThreadCount = totalThreadCount;
21             putter.tairManager = tairManager;
22 
23             Thread th = new Thread(putter);
24             th.start();
25         }
26      /*-------------------Tair(多线程读取)--------------------*/
27         //读取代码基本一致

线程任务类

 1 public class TairGetter implements Runnable {
 2     public List<String> billnumbers;
 3     public long startMili;
 4     public int TotalThreadCount;
 5     public DefaultTairManager tairManager;
 6 
 7     @Override
 8     public void run() {
 9         try {
10             for (String billnumber : billnumbers) {
11                 Result<DataEntry> result = tairManager.get(0, billnumber);
12                 if (result.isSuccess()) {
13                     DataEntry entry = result.getValue();
14                     if (entry == null) {
15                         System.out.println(" get failed : " + billnumber + " not exist ");
16                     }
17                 } else {
18                     System.out.println(result.getRc().getMessage());
19                 }
20             }
21         } catch (Exception ex) {
22             // ex.printStackTrace();
23             ThreadCallback.OnException();
24         } finally {
25             ThreadCallback.OnComplete("Tair 每个线程进行" + billnumbers.size() + "次 [读取] ", TotalThreadCount, startMili);
26         }
27     }
28 }
29 
30 
31 
32 public class TairPutter implements Runnable {
33 
34     public List<OrderInfo> OrderList;
35     public int Namesapce;
36     public int TotalThreadCount;
37     public long startMili;
38     public DefaultTairManager tairManager;
39 
40     @Override
41     public void run() {
42         try {
43             for (OrderInfo order : OrderList) {
44                 ResultCode result = tairManager.put(0, "order_" + order.BillNumber, order);
45                 if (!result.isSuccess()) {
46                     System.out.println("put: order_" + order.BillNumber + "  " + result.isSuccess() + " code:" + result.getCode());
47                 }
48             }
49         } catch (Exception ex) {
50             // ex.printStackTrace();
51             ThreadCallback.OnException();
52         } finally {
53             ThreadCallback.OnComplete("Tair 每个线程进行" + OrderList.size() + "次 [写入] ", TotalThreadCount, startMili);
54         }
55     }
56 }

3、测试结果,每项重复测试取平均值

六、Memcached、Redis、Tair 都非常优秀

Redis在单线程环境下的性能表现非常突出,但在并行环境下则没有很大的优势,是JedisPool或者CommonPool的性能瓶颈还是我测试代码的问题请麻烦告之,过程中修改setMaxTotal,setMaxIdle都没有太大的改观。

Tair由于需要在服务器端实现数据分布等相关算法,所以在测试对比中性能有所损耗应该也很好理解。

如之前所言,每个技术本身的原理、策略、适用场景各不相同,尽管以上测试方法已经考虑了很多影响因素,但任然可能存在不足之处,所以类似的对比缺乏合理 性,Tair还有2种存储引擎没有测试,而且以上都基于单机环境测试,在Cluster环境下可能也会有差别,所以结果仅供参考,不作任何评估依据。

 

来源:http://www.tuicool.com/articles/mmEz6zq

redis使用与学习

redis是一个非常优秀的分布式缓存系统, 以前主要是用memcached做缓存, 后来有些数据需要在缓存中做排序等操作, memcached就不方便了。因此后来采用redis系统。

在没有采用redis系统前,快乐成长blog经常非常缓慢,每次用户请求都要用php进行解析处理, 因此效率最低下了。

为了解决php下调用redis问题, 查阅了redis的网站,然后搜索一下文章来检查php访问redis方法, 参考如下:

试用Redis安装、php环境连接、测试    介绍php中进行redis连接的一些资料

PHP-redis中文文档    php中访问redis的资料, 比较详细

Redis与phpredis的安装  php中访问redis的资料, 同前面类似

redis是如此好的一个分布式缓存, 非常值得好好学习一下,学习redis应该以实践为基础, 多写代码,多思考。 正常redis是运行在linux中,对我们日常学习和开发有些影响。 其实redis是可以在windows中运行的, 这个同windows中的nginx一样仅仅是用来做开发的。一般实际使用中,还是在linux中的。

      Windows下Redis的安装使用          通过这个资料可以找到redis的下载地址, 以及如何在windows中启动redis等

      windows xp下PHP中redis的使用  包括xp中使用php访问redis的一些资料,比较简单。

redis的原理等一些介绍,可以在下面一些资料中找到

Redis 起步   简单介绍redis

redis缓存技术学习   比较详细,全面的介绍了redis系统, 说的很明白

再有redis的高级特性, 包括分布式缓存,事物,内存等在下面有些资料, 有兴趣可以去参考一下。

8种NoSQL数据库对比    对比一些常用的 缓存、数据库等

PHP-redis中文文档    redis的php文档

Redis内存使用优化与存储  redis内存结构,数据存储等, 很重要

Redis内存存储结构分析 从代码的级别进行了分析, 很好, 但是很难懂, 有兴趣可以去阅读。(资料都是从互联网上搜集整理的)

使用Redis的五个注意事项

Redis集群明细文档

Redis复制与可扩展集群搭建      redis的主从复制是比较重要的, 但是主从是异步复制, 有主从不一致的问题, 使用时要注意相关问题对逻辑的影响。

Redis主从复制配置及工作过程

Redis高级实用特性:安全性与主从复制

Redis事务学习

解密Redis持久化

Redis的Sorted-Sets数据类型学习

Redis服务器安装

Redis的Key操作命令学习

redis 排序

memcached&redis性能测试