标签归档:hadoop

flume中sink到hdfs示例

一。 什么flume

     Flume是一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
当前Flume有两个版本Flume 0.9X版本的统称Flume-og,Flume1.X版本的统称Flume-ng。由于Flume-ng经过重大重构,与Flume-og有很大不同,使用时请注意区分。
现在这个项目是 apache的一个顶级项目,地址如下:http://flume.apache.org/
原文信息:

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

Agent component diagram

二。参见用法

1. Setting multi-agent flow(设置多流)
Two agents communicating over Avro RPC

In order to flow the data across multiple agents or hops, the sink of the previous agent and source of the current hop need to be avro type with the sink pointing to the hostname (or IP address) and port of the source.

2. Consolidation(合并)

A very common scenario in log collection is a large number of log producing clients sending data to a few consumer agents that are attached to the storage subsystem. For example, logs collected from hundreds of web servers sent to a dozen of agents that write to HDFS cluster.

A fan-in flow using Avro RPC to consolidate events in one place

This can be achieved in Flume by configuring a number of first tier agents with an avro sink, all pointing to an avro source of single agent (Again you could use the thrift sources/sinks/clients in such a scenario). This source on the second tier agent consolidates the received events into a single channel which is consumed by a sink to its final destination.

3. Multiplexing the flow(复用流

Flume supports multiplexing the event flow to one or more destinations. This is achieved by defining a flow multiplexer that can replicate or selectively route an event to one or more channels.

A fan-out flow using a (multiplexing) channel selector

The above example shows a source from agent “foo” fanning out the flow to three different channels. This fan out can be replicating or multiplexing. In case of replicating flow, each event is sent to all three channels. For the multiplexing case, an event is delivered to a subset of available channels when an event’s attribute matches a preconfigured value. For example, if an event attribute called “txnType” is set to “customer”, then it should go to channel1 and channel3, if it’s “vendor” then it should go to channel2, otherwise channel3. The mapping can be set in the agent’s configuration file.

三。 用法举例

1. Flume 收集Nginx日志到Hdfs Tail-to-hdfs sink

nginx,access.log日志约8000条/s,每100w条数据约253M,需要2min

agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure spooldir source1
#agent1.sources.source1.type = spooldir
#agent1.sources.source1.spoolDir = /var/log/apache/flumeSpool1
#agent1.sources.source1.fileHeader = true

# Describe/configure tail -F source1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /tmp/log.log
agent1.sources.source1.channels = channel1

# Describe/configure nc source1
#agent1.sources.source1.type = netcat
#agent1.sources.source1.bind = localhost
#agent1.sources.source1.port = 44444

#configure host for source
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
agent1.sources.source1.interceptors.i1.hostHeader = hostname

# Describe sink1
#agent1.sinks.sink1.type = logger

agent1.sinks.sink1.type = hdfs
#a1.sinks.k1.channel = c1
#agent1.sinks.sink1.hdfs.path =hdfs://xxx:9000/tmp/tail/%y-%m-%d/%H%M%S
agent1.sinks.sink1.hdfs.path =hdfs://xxx:9000/tmp/tail/%y-%m-%d/%H
agent1.sinks.sink1.hdfs.filePrefix = %{hostname}/events-
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize= 500
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
agent1.sinks.sink1.hdfs.rollSize = 0
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 600
#agent1.sinks.sink1.hdfs.round = true
#agent1.sinks.sink1.hdfs.roundValue = 10
#agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
2.flume中sink到hdfs,文件系统频繁产生文件,文件滚动配置不起作用

在测试hdfs的sink,发现sink端的文件滚动配置项起不到任何作用,配置如下:

a1.sinks.k1.type=hdfs
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.sinks.k1.hdfs.path=hdfs://192.168.11.177:9000/flume/events/%Y/%m/%d/%H/%M
a1.sinks.k1.hdfs.filePrefix=XXX
a1.sinks.k1.hdfs.rollInterval=60
a1.sinks.k1.hdfs.rollSize=0
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.idleTimeout=0

这里配置的是60秒,文件滚动一次,也就每隔60秒,会新产生一个文件【前提,flume的source端有数据来】但是当我启动flume的时候,运行十几秒,不断写入数据,发现hdfs端频繁的产生文件,每隔几秒就有新文件产生
而且在flume的日志输出可以频繁看到这句:

[WARN] Block Under-replication detected. Rotating file.

只要有这句,就会产生一个新的文件

意思就是检测到复制块正在滚动文件,结合源码看下:

private boolean shouldRotate() {
    boolean doRotate = false;

    if (writer.isUnderReplicated()) {
      this.isUnderReplicated = true;
      doRotate = true;
    } else {
      this.isUnderReplicated = false;
    }

    if ((rollCount > 0) && (rollCount <= eventCounter)) {
      LOG.debug("rolling: rollCount: {}, events: {}", rollCount, eventCounter);
      doRotate = true;
    }

    if ((rollSize > 0) && (rollSize <= processSize)) {
      LOG.debug("rolling: rollSize: {}, bytes: {}", rollSize, processSize);
      doRotate = true;
    }

    return doRotate;
  }

这是判断是否滚动文件,但是这里面的第一判断条件是判断是否当前的HDFSWriter正在复制块

public boolean isUnderReplicated() {
    try {
      int numBlocks = getNumCurrentReplicas();
      if (numBlocks == -1) {
        return false;
      }
      int desiredBlocks;
      if (configuredMinReplicas != null) {
        desiredBlocks = configuredMinReplicas;
      } else {
        desiredBlocks = getFsDesiredReplication();
      }
      return numBlocks < desiredBlocks;
    } catch (IllegalAccessException e) {
      logger.error("Unexpected error while checking replication factor", e);
    } catch (InvocationTargetException e) {
      logger.error("Unexpected error while checking replication factor", e);
    } catch (IllegalArgumentException e) {
      logger.error("Unexpected error while checking replication factor", e);
    }
    return false;
  }

通过读取的配置复制块数量和当前正在复制的块比较,判断是否正在被复制

if (shouldRotate()) {
      boolean doRotate = true;

      if (isUnderReplicated) {
        if (maxConsecUnderReplRotations > 0 &&
            consecutiveUnderReplRotateCount >= maxConsecUnderReplRotations) {
          doRotate = false;
          if (consecutiveUnderReplRotateCount == maxConsecUnderReplRotations) {
            LOG.error("Hit max consecutive under-replication rotations ({}); " +
                "will not continue rolling files under this path due to " +
                "under-replication", maxConsecUnderReplRotations);
          }
        } else {
          LOG.warn("Block Under-replication detected. Rotating file.");
        }
        consecutiveUnderReplRotateCount++;
      } else {
        consecutiveUnderReplRotateCount = 0;
      }

以上方法,入口是shouldRotate()方法,也就是如果你配置了rollcount,rollsize大于0,会按照你的配置来滚动的,但是在入口进来后,发现,又去判断了是否有块在复制;

里面就读取了一个固定变量maxConsecUnderReplRotations=30,也就是正在复制的块,最多之能滚动出30个文件,如果超过了30次,该数据块如果还在复制中,那么数据也不会滚动了,doRotate=false,不会滚动了,所以有的人发现自己一旦运行一段时间,会出现30个文件再结合上面的源码看一下:

如果你配置了10秒滚动一次,写了2秒,恰好这时候该文件内容所在的块在复制中,那么虽然没到10秒,依然会给你滚动文件的,文件大小,事件数量的配置同理了。

为了解决上述问题,我们只要让程序感知不到写的文件所在块正在复制就行了,怎么做呢??

只要让isUnderReplicated()方法始终返回false就行了

该方法是通过当前正在被复制的块和配置中读取的复制块数量比较的,我们能改的就只有配置项中复制块的数量,而官方给出的flume配置项中有该项

hdfs.minBlockReplicas
Specify minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath.

默认读的是hadoop中的dfs.replication属性,该属性默认值是3这里我们也不去该hadoop中的配置,在flume中添加上述属性为1即可配置如下:

a1.sinks.k1.type=hdfs
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.sinks.k1.hdfs.path=hdfs://192.168.11.177:9000/flume/events/%Y/%m/%d/%H/%M
a1.sinks.k1.hdfs.filePrefix=cmcc
a1.sinks.k1.hdfs.minBlockReplicas=1
#a1.sinks.k1.hdfs.fileType=DataStream
#a1.sinks.k1.hdfs.writeFormat=Text
a1.sinks.k1.hdfs.rollInterval=60
a1.sinks.k1.hdfs.rollSize=0
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.idleTimeout=0

这样程序就永远不会因为文件所在块的复制而滚动文件了,只会根据你的配置项来滚动文件了,试试吧!!

hadoop 2.0 yarn应用程序的执行流程

YARN程序的执行流程

Yarn是一个资源管理系统,负责整个集群资源的管理和分配。如果需要在一个yarn集群上运行程序:

  1. 首先得有个客户端client来提交job到ResourceManager(RM)申请资源。Client通过RMProtocol协议来与 RM通信,将应用程序运行所需的一些信息,比如local file/jars,执行的命令,参数,环境变量等提供给RM来运行应用的第一个container也就是 ApplicationMaster(AppMaster)。
  2. 如果申请到了资源,RM就在第一个container上启动AppMaster。AppMaster然后通过AMRMProtocol协议与ResourceManager通讯,注册自身,然后继续申请资源。
  3. 如果获得了containers,AppMaster会通过ContainerManager类与NodeManager通信,为任务启动 container。AppMaster同时也会提供启动container需要的一些信息,比如命令行,环境变量等。任务完成后,AppMaster会 通过AMRMProtocol::finishApplicationMaster来通知RM任务完成。同时,client可以通过查询RM来获取job 的状态信息,或者如果AppMaster支持也可以直接从AppMaster查询信息。如果需要,client可以通过 ClientRMProtocol::forceKillApplication来kill掉application。

整个执行流程可以参考下图(来源网络):

hadoop-yarn-job-run-flow-diagram

三个角色

  1. client 即客户端,负责将应用程序提交到RM。
  2. AppMaster 即整个应用程序的核心,负责与RM通信,申请资源,启动containers。并监控containers的执行情况,在container执行失败后做failover的处理。
  3. container 就是具体干活的,和具体业务相关的一些处理逻辑。

三个RPC协议

  1. ClientRMProtocol(Client<–>ResourceManager):客户端与RM通信的协议,可以启动AppMater,查询或者kill AppMaster。
  2. AMRMProtocol(ApplicationMaster<–>ResourceManager):AppMaster与RM通信,AppMaster可以向RM注册和注销自己,也可以向RM申请资源以启动container。
  3. ContainerManager(ApplicationMaster<–> NodeManager):AppMaster与NM通信,可以启动或者停止一个container,也可以获取container的执行状态。

Distributed shell

编写yarn应用程序的详细步骤可以直接参考源码自带的distributed shell的例子。distributed shell是在每个节点上执行一条shell命令或者一个脚本,对于理解基本的概念还是很有帮助的。

YARN编程框架的开发

可以看到,一个YARN应用程序的编写,大量的工作是在编写客户端和AppMaster。而AppMaster要处理资源申请,启动和监控container,尤其是container的fail over,这才是真正值得关注的地方。对于大量的应用程序来说,AppMaster的工作机制可能相同,这就可以抽象出一个通用的AppMaster框架。框架的使用者只需要关心自己的具体应用逻辑即container就可以了,可以大大减少开发成本。

其实Yarn已经提供了一个可以直接使用的客户端-MRClienService和AppMaster-MRAppMater。MapReduce 也只是Yarn上的一种通用的框架,所以完全可以参考MRAppMaster来实现自己的框架。如类似storm的流式计算框架,或者调度RPC Service的框架,或者支持MPI的框架。目前github上已经有类似的项目出现了,相信不久就会出现一些通用的框架。

 

来源:http://www.rigongyizu.com/how-to-write-hadoop-0-23-yarn-mapreduce-and-other-applications/

spark基于hadoop集群环境的搭建

spark和hadoop都是两个炙手可热的两个项目,都是大名鼎鼎, hadoop的联邦高可用集群已经搭建完毕。

今天基于新版的spark1.3.1并且构建在hadoop2.7上,安装过程如下:

安装前提, 安装好hadoop集群,略。

一. 安装scala

1. 进入下载页面 http://www.scala-lang.org/download/2.11.7.html

2. 下载如下scala版本 http://downloads.typesafe.com/scala/2.11.7/scala-2.11.7.tgz?_ga=1.131438735.1254462619.1436380164

3. 上传的服务器上相关目录

4. 解压缩scala文件 tar -xvf scala-2.11.7.tgz

5. 验证scala,  输入scala -version
Scala code runner version 2.11.7 -- Copyright 2002-2013, LAMP/EPFL

--- $ scala
Welcome to Scala version 2.11.7 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_37).
Type in expressions to have them evaluated.
Type :help for more information.

 

二. 下载spark

1. 进入下载页面  http://spark.apache.org/downloads.html  选择如下选项

install-spark-download

2. 上传下载的spark程序到master服务器

3. 进入spark的master服务器,解压缩 tar xzvf spark-1.3.1-bin-hadoop2.6.tgz

4. 修改spark的相关目录, 修改为 spark1.3

三. 配置环境变量

1. 编辑环境变量 vi ~/.bash_profile

在文件最后添加如下内容:

HADOOP_PREFIX=/export/hh/hadoop-ha-zk
export HADOOP_PREFIX
export JAVA_HOME=/export/hh/jdk1.7.0_79
export CLASSPATH=.:$JAVA_HOME/jre/lib:$JAVA_HOME/lib:$JAVA_HOME/lib/tools.jar
export SCALA_HOME=/export/hh/scala
export HADOOP_HOME=/export/hh/hadoop-ha-zk
export SPARK_HOME=/export/hh/spark1.3
export PATH=$PATH:$HOME/bin:$JAVA_HOME/bin:${SCALA_HOME}/bin:${SPARK_HOME}/bin:${HADOOP_HOME}/bin

保存退出,

2. 运行 source ~/.bash_profile  是配置 在本机生效

3. 拷贝.bash_profile 文件到其他服务器

scp .bash_profile hh@mq66:~

。。。。。

四. 配置spark的环境变量

1. 进入spark的配置目录  cd /export/hh/spark1.3/conf    请根据自己的解压缩目录调整命令

2. 执行下面命令, cp spark-env.sh.template  spark-env.sh 进行配置

3. 编辑vi  spark-env.sh 文件, 在文件最后添加如下内容

export JAVA_HOME=/export/hh/jdk1.7.0_79
export SCALA_HOME=/export/hh/scala
export SPARK_WORKER_MEMORY=4g
export SPARK_MASTER_IP=mq65
export MASTER=spark://mq65:7077

保存退出

4. 拷贝配置文件到其他服务器, 执行下面命令

scp spark-env.sh hh@mq66:/export/hh/spark1.3/conf/

。。。。。

5. 编辑slaves文件, 输入如下命令

cp slaves.template slaves
vi slaves

添加如下内容:

mq65
mq66
。。。。。

保存退出

6. 拷贝配置文件 到其他服务器

scp slaves hh@mq66:/export/hh/spark1.3/conf

。。。。。

五. 启动spark集群

1. 启动spark分布式集群并查看信息

用下面命令启动spark集群 sbin/start-all.sh

用jps查看进程

jps
23016 NodeManager
21909 JournalNode
22120 DFSZKFailoverController
21678 DataNode
21559 NameNode
5870 Worker
5185 Master
6507 Jps

2. 页面查看集群状况:

http://mq65:8080/

进去spark集群的web管理页面,访问

install-spark-download-2

 

3. 我们进入spark的bin目录,启动spark-shell控制台

bin/spark-shell

现在我们已经顺利进入spark-shell的世界了

访问http://mq65:4040/,我们可以看到spark WEBUI页面

install-spark-download-3

spark集群环境搭建成功了

Hadoop MapReduce工作原理

1.剖析MapReduce作业运行机制

1).经典MapReduce--MapReduce1.0

整个过程有有4个独立的实体

  • 客户端:提交MapReduce
  • JobTracker:协调作业的运行
  • TaskTracker:运行作业划分后的任务
  • HDFS:用来在其他实体之间共享作业文件

以下为运行整体图

A.作业的提交

JobClient的runJob是用于新建JobClient实例并调用其submitJob()方法的便捷方式,提交Job后,runJob()每秒轮询检测作业的进度,随时监控Job的运行状态。

其中JobClient的submitJob()方法所实现的作业提交过程:

  • 向JobTracker请求一个新的作业ID
  • 检查作业的输出说明
  • 计算作业的输入分片
  • 将运行作业所需要的资源(Jar文件,配置文件和计算所得输入分片)复制到一个作业ID命名的目录下JobTracker的文件系统中。

B.作业的初始化

JobTracker接收对其提交的作业后,会把这个调用放入一个队列,交由作业调度器调度,初始化。初始化包括创建一个表示正在运行作业的对象---封装任务和记录信息,以便跟踪任务的状态和进程

C.任务的分配

TaskTracker运行简单的循环来对JobTracker发送心跳,告知自己的是否存活,同时交互信息,对于map任务和reduce任务,TaskTracker会分配适当的固定数量的任务槽,理想状态一般遵循数据本地化,和机架本地化

D.任务的执行

第一步:TaskTracker拷贝JAR文件到本地,第二部:TaskTracker新建本地目录,将JAR文件加压到其下面;第三步:TaskTracker新建一个TaskRunner实例运行该任务。

Streaming和Pipes可运行特殊的Map和Reduce任务,Streaming支持多语言的编写,Pipes还可以与C++进程通信,如下图:

E.进程和状态的更新

通过Job的Status属性对Job进行检测,例如作业云习惯状态,map和reduce运行的进度、Job计数器的值、状态消息描述等等,尤其对计数器Counter(计数器)属性的检查。状态更新在MapReduce系统中的传递流程如下

F.作业的完成

当JobTracker收到Job最后一个Task完成的消息时候便把Job的状态设置为”完成“,JobClient得知后,从runJob()方法返回

2).Yarn(MapReduce 2.0)

Yarn出现在Hadoop 0.23和2.0版本中,相对前面 MapReduce的性能有不少的提高

相比较MapReduce1.0,JobTracker在MRv2 中被拆分成了两个主要的功能使用守护进程执行:资源管理和任务的调度与监视。这个想法创建一个全局的资源管理(global ResourceManager (RM))和为每个应用创建一个应用管理(ApplicationMaster (AM))。一个应用可以使一个MR jobs的经典场景或者是一串连续的Job

ResourceManager 和每个slave节点的NodeManager (NM)构成一个资源估算框架。ResourceManager 对在系统中所有应用的资源分配拥有最终的最高级别仲裁权。其中ResourceManager 拥有两个主要的组件:调度器(Scheduler) 和资源管理器(ApplicationsManager)

实际上每个应用的ApplicationMaster(AM)是资源估算框架具体用到的lib包,被用来和ResourceManager 进行资源谈判,并且为NodeManager执行和监控task

整体如下图:

综上,在Hadoop Yarn中有5个独立的实体

  • 客户端:用来提交MapReduce作业(Job)的
  • Yarn ResourcesManager:用来管理协调分配集群中的资源
  • Yarn NodeManager:用来启动和监控本地计算机资源单位Container的利用情况
  • MapReduce Application Master:用来协调MapReduce Job下的Task的运行。它和MapReduce Task 都运行在 Container中,这个Container由RM(ResourcesManager)调度并有NM(NodeManager)管理
  • HDFS:用来在其他实体之间共享作业文件

整体如下:

A.作业的提交

Job提交相似于MapReduce1.0.当在配置文件中设置mapreduce.framework.name为yarn时 候,MapReduce2.0继承接口ClientProtocol的模式就激活了,RM生成新的Job ID(从Yarn的角度看是Application ID---步骤2),接着Job客户端计算输入分片,拷贝资源(包括Job JAR文件、配置文件,分片信息)到HDFS(步骤3),最后用submitApplication函数提交Job给RM(步骤4)

B.作业的初始化

RM接受到由上面的A提交过来的调用,将其请求给调度器处理,调度器分配Container,同时RM在NM上启动Application Master进程(步骤5a和5b),AM主函数MRAppMatser会初始化一定数量的记录对象(bookkeeping)来跟踪Job的运行进度, 并收取task的进度和完成情况(步骤6),接着MRAppMaster收集计算后的输入分片

之后与MapReduce1.0又有所不同,此时Application Master会决定如何组织运行MapReduce Job,如果Job很小,能在同一个JVM,同一个Node运行的话,则用uber模式运行(参见源码)

C.任务的分配

如果不在uber模式下运行,则Application Master会为所有的map和reducer task向RM请求Container,所有的请求都通过heartbeat(心跳)传递,心跳也传递其他信息,例如关于map数据本地化的信息,分片所 在的主机和机架地址信息,这信息帮主调度器来做出调度的决策,调度器尽可能遵循数据本地化或者机架本地化的原则分配Container

在Yarn中,不像MapReduce1.0中那样限制map或者reduce的slot个数,这样就限制了资源是利用率,Yarn中非配资源更具 有灵活性,可以在配置文件中设置最大分配资源和最小分配资源,例如,用yarn.scheduler.capacity.minimum- allocation-mb设置最小申请资源1G,用yarn.scheduler.capacity.maximum-allocation-mb设置 最大可申请资源10G 这样一个Task申请的资源内存可以灵活的在1G~10G范围内

D.任务的执行

分配给Task任务Container后,NM上的Application Master就联系NM启动(starts)Container,Task最后被一个叫YarnChild的main类执行,不过在此之前各个资源文件已 经从分布式缓存拷贝下来,这样才能开始运行map Task或者reduce Task。PS:YarnChild是一个(dedicated)的JVM

Streaming 和 Pipes 运行机制与MapReduce1.0一样

E.进程和状态的更新

当Yarn运行同时,Task和Container会报告它的进度和状态给Application Master,客户端会每秒轮询检测Application Master,这样就随时收到更新信息,这些信息也哭通过Web UI来查看

F.作业的完成

客户端每5秒轮询检查Job是否完成,期间需要调用函数Job类下waitForCompletion()方法,Job结束后该方法返回。轮询时间间隔可以用配置文件的属性mapreduce.client.completion.pollinterval来设置

2.失败情况

1)经典MapReduce---MapReduce1.0

A.TasK失败

第一种情况:map或reduce任务中的用户代码抛出运行异常,此时子进程JVM进程会在退出之前想TaskTracker发送错误报告,错误报 告被记录错误日志,TaskTracker会将这个任务(Task)正在运行的Task Attempt标记为失败,释放一个任务槽去运行另外一个Task Attempt

第二种情况:子进程JVM突然退出Task Tracker会注意到JVM退出,并将此Task Attempt标记为失败

JobTracker通过心跳得知一个Task Attempt失败后,会重启调度该Task的执行,默认情况下如果失败4次不会重试(通过mapred.map.max.attempts可改变这个次数),整个Job也会标记为失败

B.TaskTracker失败

如果TaskTracker由于崩溃或者运行过慢失败,则停止向JobTracker发送心跳,JobTracker会注意到这点并将这个TaskTracker从等待任务调度TaskTracker池中移除

即使TaskTracker没有失败,也有可能因为失败任务次数远远高于集群的平均失败次数,这种情况会被列入黑名单,在重启后才将此TaskTracker移出黑名单

C.JobTracker失败

JobTracker失败是是最严重的是爱,此时只得重新开始提交运行

2).Yarn失败

A.Task(任务)的失败

情况与MapReduce1.0相似,其中Task Attempt失败这个消息会通知Application Master,由Application Master标记其为失败。当Task失败的比例大于mapreduce.map.failures.maxpercent(map)或者 mapreduce.reduce.failures.maxpercent(reduece)时候,Job失败

B.Application Master的失败

与前面相似,当Application Master失败,会被标记为失败,这是RM会立刻探寻到AM(Application Master)的失败,并新实例化一个AM和借助NM建造新的相应的Container,在设置 yarn.app.mapreduce.am.job.recovery.enable为true情况下失败的AM能够恢复,并且恢复后并不返回。默认情 况下,失败AM会让所有的Task返回

如果客户端轮询得知AM失败后,经过一段时间AM失败状态仍然没有改变,则重新想RM申请相应的资源

C.Node Manager的失败

NM失败时,会停止向RM发送心跳,则RM会将这个NM从可用的NM池中移出,心态间隔时间可由yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms设置,默认是10分钟

如果NM上Application失败次数过高,此NM会被列入黑名单,AM会在不同Node上运行Task

D.Resources Manager的失败

RM的失败是最严重,离开了RM整个系统将陷入瘫痪,所以它失败后,会利用checkpoint机制来重新构建一个RM,详细配置见权威指南和Hadoop官网

3.作业的调度

1).FIFO Scheduler

这个调度是Hadoop默认d ,使用FIFO调度算法来运行Job,并且可以设置优先级,但是这个FIFO调度并不支持抢占,所以后来的高优先级的Job仍然会被先来低优先级的Job所阻塞

2)Fair Scheduler

Fair Scheduler的目标是让每个用户公平的享有集群当中的资源,多个Job提交过来后,空闲的任务槽资源可以以"让每个用户公平共享集群"的原则被分配,某个用户一个很短的Job将在合理时间内完成,即便另一个用户有一个很长的Job正在运行

一般Job都放在Job池当中,默认时,每个用户都有自己的Job池,当一个用户提交的Job数超过另一个用户时,不会因此得到更多的集群资源

另外Fair Scheduler支持抢占,如果一个池的资源未在一段时间内公平得到集群资源,那么Fair Scheduler会从终止得到多余集群资源的Task,分给前者。

3).Capacity Scheduler

Capacity Scheduler中,集群资源会有很多队列,每个队列有一定的分配能力,在每个队列内会按照FIFO Scheduler去分配集群资源。

4.shuffle和排序

在Hadoop Job运行时,MapReduce会确保每个reducer的输入都按键排序,并且执行这个排序过程---将map的输出所谓reducer的输入---称为shuffle,从许多方面看来,shuffle正是map的心脏。

以下掩盖了一些细节,并且新版Hadoop,在这一块有修改

1).map端

MapTask都一个环形的内存缓冲区,之后当缓冲区被占用内到达一定比例(比如80%),会启用spill线程将缓冲区中的数据写入磁盘,写入磁 盘前,spill线程会据根据最终要送达的reduce将数据划分为相应的partition,每个partition中,线程会按照键进行内排序 (Haoop2.0显示的用的是快排序),当spill线程执行处理最后一批MapTask的输出结构后,启用merger合并spill文件,如果设置 Combiner,那接下来执行Combine函数,合并本地相同键的文件

2).reduce端

接下来运行ReduceTask,其中的Fetch的线程会从Map端以HTTP方式获取相应的文件分区,完成复制map的输出后,reducer 就开始排序最后运行merger把复制过来的文件存储在本地磁盘。(PS:在Yarn中 Map和Reduce之间的数据传输用到了Netty以及Java NIO 详见源代码)

这里需要注意的是:每趟合并目标是合并最小数量的文件以便满足最后一趟的合并系数,eg:有40个文件,我们不会再4趟中,每趟合并10个文件然后 得到4个文件,相反第一堂只合并4个文件,最后的三趟每次合并10个文件,在最后的一趟中4个已经合并的文件和余下的6个文件(未合并)进行10个文件的 合并(见下图),其实这里并没有改变合并次数,它只是一个优化措施,尽量减少写到磁盘的数据量,因为最后一趟总是合并到reduce(?这个地方合并来源来自内存和磁盘,减少了从内存的文件数,所以减少最后一次写到磁盘的数据量)

从Map到Reducer数据整体传输过程如下:

3)配置的调优

调优总的原则给shuffle过程尽量多提供内存空间,在map端,可以通过避免多次溢出写磁盘来获得最佳性能(相关配置 io.sort.*,io.sort.mb),在reduce端,中间数据全部驻留在内存时,就能获得最佳性能,但是默认情况下,这是不可能发生的,因为 一般情况所有内存都预留给reduce含函数(如需修改 需要配置mapred.inmem.merge.threshold,mapred.job.reduce.input.buffer.percent)

5.Task的执行

1).任务执行环境

Hadoop为MapTask和ReduceTask提供了运行环境相关信息,例如MapTask可以找到他所处理文件的名称,通过为mapper和reducer提供一个configure()方法实现,表可获得下图中的Job的配置信息。

Hadoop设置Job的配置参数可以作为Streaming程序的环境变量。

2).推测执行(Speculative Execution)

Speculative Execution机制的为了解决Hadoop中出现缓慢某些Task拖延整个Job运行的问题,Speculative Execution会针对那些慢于平均进度的Task启动Speculative Task,此时如果原Task在Speculative Task前完成,则Speculative Task会被终止,同样的,如果Speculative Task先于原Task完成则原来的Task会被终止

默认情况下Speculative Execution是启用的,下面的属性可以控制是否开启该功能:

3).Output Committers

Hadoop MapReduce利用commit协议确保在Job或者Task运行期间插入是适当的操作,无论他们成功完成或者失败,在新的API中OutputCommitter由接口OutputFormat决定,

下面是OutputCommitter的API:

public abstract class OutputCommitter {

    public abstract void setupJob(JobContext jobContext) throws IOException;

    public void commitJob(JobContext jobContext) throws IOException {
    }

    public void abortJob(JobContext jobContext, JobStatus.State state)
            throws IOException {
    }

    public abstract void setupTask(TaskAttemptContext taskContext)
            throws IOException;

    public abstract boolean needsTaskCommit(TaskAttemptContext taskContext)
            throws IOException;

    public abstract void commitTask(TaskAttemptContext taskContext)
            throws IOException;

    public abstract void abortTask(TaskAttemptContext taskContext)
            throws IOException;

}

其中当将要运行Job时候运行setupJob()方法;当Job运行成功后调用commitJob()方法,并且输出目录后缀_SUCCESS; 当Job没有成功运行,则调用abortJob(),并且删除相应的输出文件;相类似Task执行成功调用commitTask()方法,Task执行失 败调用abortTask()方法,并且删掉相应生成的文件

4).Task JVM重用

启动JVM重用后,可以让不同Task重用一个JVM,节省了重建销毁JVM的时间,在Hadoop2.0中默认情况不提供这种功能

5).跳过坏记录

在大数据中经常有一些损坏的记录,当Job开始处理这个损坏的记录时候,导致Job失败,如果这些记录并不明显影响运行结果,我们就可以跳过损坏记录让Job成功运行

一般情况,当任务失败失败两次后会启用skipping mode,对于一直在某记录失败的Task,NM或者TaskTracker将会运行一下TaskAttempt

A.任务失败

B.任务失败

C.开启skipping mode。任务失败,失败记录由TaskTracker或者NM保存

D.仍然启用skipping mode,任务继续运行,但是跳过上一次尝试失败的坏记录

在默认情况下,skipping mode是关闭的 可以用SkipBadRecords类来启用该功能

 

来源:http://www.cnblogs.com/biyeymyhjob/archive/2012/08/11/2631750.html

Hadoop的kerberos的实践部署

根据原文,结合从网络上查到的资料汇总起来,形成便于阅读的本文

一. Hadoop的认证机制

相关hadoop安全问题参考:大数据安全Hadoop安全模型的演进

hadoop安全认证主要涉及kerberos和HTTP SPNEGO, kerberos下面有介绍, HTTP SPNEGO 和kerberos 的介绍如下:Kerberos and SPNEGO 以及 SPNEGO

另外, 下面是一些早期网络上的有关使用SPNEGO文章,可以部分辅助理解SPNEGO
管理SPNEGO TAI:关于使用Kerberos服务主体名称的提示
如何在Notes中通过account使用SPNEGO单点登录
Kerberos原理
基于 SAML 的 WebSphere Application Server 单点登录的场景设计
跨 KDC 域的 WebSphere Web Services Security 应用中的 Kerberos 加密算法
测试驱动的单点登录

hadoop官网上的 相关安全文档地址:http://hadoop.apache.org/docs/r2.7.0/hadoop-auth/Examples.html

二. hadoop-kerberos介绍

Kerberos能解决的Hadoop安全认证问题

kerberos实现的是机器级别的安全认证,也就是前面提到的服务到服务的认证问题。事先对集群中确定的机器由管理员手动添加到kerberos 数据库中,在KDC上分别产生主机与各个节点的keytab(包含了host和对应节点的名字,还有他们之间的密钥),并将这些keytab分发到对应的 节点上。通过这些keytab文件,节点可以从KDC上获得与目标节点通信的密钥,进而被目标节点所认证,提供相应的服务,防止了被冒充的可能性。

  • 解决服务器到服务器的认证

由于kerberos对集群里的所有机器都分发了keytab,相互之间使用密钥进行通信,确保不会冒充服务器的情况。集群中的机器就是它们所宣称的,是可靠的。

防止了用户伪装成Datanode,Tasktracker,去接受JobTracker,Namenode的任务指派。

  • 解决client到服务器的认证

Kerberos对可信任的客户端提供认证,确保他们可以执行作业的相关操作。防止用户恶意冒充client提交作业的情况。

用户无法伪装成其他用户入侵到一个HDFS 或者MapReduce集群上

用户即使知道datanode的相关信息,也无法读取HDFS上的数据

用户无法发送对于作业的操作到JobTracker上

  • 对用户级别上的认证并没有实现

无法控制用户提交作业的操作。不能够实现限制用户提交作业的权限。不能控制哪些用户可以提交该类型的作业,哪些用户不能提交该类型的作业。这些由ACL模块控

Kerberos工作原理介绍

基本概念

Princal(安全个体):被认证的个体,有一个名字和口令

KDC(key distribution center ) : 是一个网络服务,提供ticket 和临时会话密钥

Ticket:一个记录,客户用它来向服务器证明自己的身份,包括客户标识、会话密钥、时间戳。

AS (Authentication Server): 认证服务器

TSG(Ticket Granting Server): 许可证服务器

kerberos 工作原理

Kerberos协议

Kerberos可以分为两个部分:

  • Client向KDC发送自己的身份信息,KDC从Ticket Granting Service得到TGT(ticket-granting ticket), 并用协议开始前Client与KDC之间的密钥将TGT加密回复给Client。此时只有真正的Client才能利用它与KDC之间的 密钥将加密后的TGT解密,从而获得TGT。(此过程避免了Client直接向KDC发送密码,以求通过验证的不安全方式)
  • Client利用之前获得的TGT向KDC请求其他Service的Ticket,从而通过其他Service的身份鉴别

Kerberos认证过程

Kerberos协议的重点在于第二部分(即认证过程):

(1)Client将之前获得TGT和要请求的服务信息(服务名等)发送给KDC,KDC中的Ticket Granting Service将为Client和Service之间生成一个Session Key用于Service对Client的身份鉴别。然后KDC将这个Session Key和用户名,用户地址(IP),服务名,有效期, 时间戳一起包装成一个Ticket(这些信息最终用于Service对Client的身份鉴别)发 送给Service, 不过Kerberos协议并没有直接将Ticket发送给Service,而是通过Client转发给Service,所以有了第 二步。

(2)此时KDC将刚才的Ticket转发给Client。由于这个Ticket是要给Service的,不能让Client看到,所以KDC用协 议开始前KDC与Service之间的密钥将Ticket加密后再发送给Client。同时为了让Client和Service之间共享那个密钥(KDC 在第一步为它们创建的Session Key),KDC用Client与它之间的密钥将Session Key加密随加密的Ticket一起返回给Client。

(3)为了完成Ticket的传递,Client将刚才收到的Ticket转发到Service. 由于Client不知道KDC与Service 之间的密钥,所以它无法算改Ticket中的信息。同时Client将收到的Session Key解密出来,然后将自己的用户名,用户地址(IP)打包成Authenticator用Session Key加密也发送给Service。

(4)Service 收到Ticket后利用它与KDC之间的密钥将Ticket中的信息解密出来,从而获得Session Key和用户名,用户地址(IP),服务名,有效期。然后再用Session Key将Authenticator解密从而获得用户名,用户地址(IP)将其与之前Ticket中解密出来的用户名,用户地址(IP)做比较从而验证 Client的身份。

(5)如果Service有返回结果,将其返回给Client。

kerberos在Hadoop上的应用

Hadoop集群内部使用Kerberos进行认证

具体的执行过程可以举例如下:

使用kerberos进行验证的原因

  • 可靠 Hadoop 本身并没有认证功能和创建用户组功能,使用依靠外围的认证系统
  • 高效 Kerberos使用对称钥匙操作,比SSL的公共密钥快
  • 操作简单 用户可以方便进行操作,不需要很复杂的指令。比如废除一个用户只需要从Kerbores的KDC数据库中删除即可。

简单来说,没有做kerberos认证的Hadoop,只要有client端就能够连接上。而且,通过一个有root的权限的内网机器,通过创建对应的linux用户,就能够得到Hadoop集群上对应的权限。

而实行Kerberos后,任意机器的任意用户都必须现在Kerberos的KDC中有记录,才允许和集群中其它的模块进行通信。

三. Java的安全机制

详细介绍请参考JAAS:灵活的Java安全机制

简单来说,用户首先使用LoginContext的接口进行登录验证。LoginContext可以配置使用不同的验证协议。验证通过后,用户得到 一个subject,里面包含凭证,公私钥等。之后,在涉及到需要进行权限认证的地方(例如,资源访问,外部链接校验,协议访问等),使用doAs函数 ()代替直接执行。

这样,java的权限认证就和用户的业务逻辑分离了。

    //一段典型的代码如下
    LoginContext lc = new LoginContext("MyExample");
    try {
    lc.login();
    } catch (LoginException) {
    // Authentication failed.
    }

    // Authentication successful, we can now continue.
    // We can use the returned Subject if we like.
    Subject sub = lc.getSubject();
    Subject.doAs(sub, new MyPrivilegedAction());

Kerberos认证协议

Kerberos是一种网络认证协议,其设计目标是通过密钥系统为客户机 / 服务器应用程序提供强大的认证服务。

简单介绍

使用Kerberos时,一个客户端需要经过三个步骤来获取服务:

  1. 认证:客户端向认证服务器发送一条报文,并获取一个含时间戳的Ticket-Granting Ticket(TGT)。
  2. 授权:客户端使用TGT向Ticket-Granting Server(TGS)请求一个服务Ticket。
  3. 服务请求:客户端向服务器出示服务Ticket,以证实自己的合法性。该服务器提供客户端所需服务,在Hadoop应用中,服务器可以是namenode或jobtracker。

为此,Kerberos需要The Key Distribution Centers(KDC)来进行认证。KDC只有一个Master,可以带多个slaves机器。slaves机器仅进行普通验证。Mater上做的修改需要自动同步到slaves。

另外,KDC需要一个admin,来进行日常的管理操作。这个admin可以通过远程或者本地方式登录。

搭建Kerberos

环境:假设我们有5个机器,分别是hadoop1~hadoop5。选择hadoop1,hadoop2,hadoop3组成分布式的KDC。hadoop1作为Master机器。

1.安装:通过yum安装即可,组成KDC。

yum install -y krb5-server krb5-lib krb5-workstation

2.配置:Kerberos的配置文件只有两个。在Hadoop1中创建以下两个文件,并同步/etc/krb5.conf到所有机器。

  1. /var/kerberos/krb5kdc/kdc.conf:包括KDC的配置信息。默认放在 /usr/local/var/krb5kdc。或者通过覆盖KRB5_KDC_PROFILE环境变量修改配置文件位置。配置示例:
    [kdcdefaults]
     kdc_ports = 88
     kdc_tcp_ports = 88
    
    [realms]
     HADOOP.COM = {
      master_key_type = aes128-cts
      acl_file = /var/kerberos/krb5kdc/kadm5.acl
      dict_file = /usr/share/dict/words
      admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab
      max_renewable_life = 7d
      supported_enctypes = aes128-cts:normal des3-hmac-sha1:normal arcfour-hmac:normal des-hmac-sha1:normal des-cbc-md5:normal des-cbc-crc:normal
     }
    

    说明:

    HADOOP.COM:是设定的realms。名字随意。Kerberos可以支持多个realms,会增加复杂度。本文不探讨。大小写敏感,一般为了识别使用全部大写。这个realms跟机器的host没有大关系。
    
    max_renewable_life = 7d 涉及到是否能进行ticket的renwe必须配置。
    
    master_key_type:和supported_enctypes默认使用aes256-cts。由于,JAVA使用aes256-cts验证方式需要安装额外的jar包。推荐不使用。
    
    acl_file:标注了admin的用户权限,需要用户自己创建。文件格式是 
         Kerberos_principal permissions [target_principal]  [restrictions]
        支持通配符等。最简单的写法是
        */admin@HADOOP.COM      *
        代表名称匹配*/admin@HADOOP.COM 都认为是admin,权限是 *。代表全部权限。
    
    admin_keytab:KDC进行校验的keytab。后文会提及如何创建。
    
    supported_enctypes:支持的校验方式。注意把aes256-cts去掉。
    
  2. /etc/krb5.conf:包含Kerberos的配置信息。例如,KDC的位置,Kerberos的admin的realms 等。需要所有使用的Kerberos的机器上的配置文件都同步。这里仅列举需要的基本配置。详细介绍参考:krb5conf配置示例:
    [logging]
     default = FILE:/var/log/krb5libs.log
     kdc = FILE:/var/log/krb5kdc.log
     admin_server = FILE:/var/log/kadmind.log
    
    [libdefaults]
     default_realm = HADOOP.COM
     dns_lookup_realm = false
     dns_lookup_kdc = false
     ticket_lifetime = 24h
     renew_lifetime = 7d
     max_life = 12h 0m 0s
     forwardable = true
     udp_preference_limit = 1
    
    [realms]
     HADOOP.COM = {
      kdc = hadoop1:88
      admin_server = hadoop1:749
      default_domain = HADOOP.COM
     }
    
    [appdefaults]
    

    说明:

    [logging]:表示server端的日志的打印位置
    [libdefaults]:每种连接的默认配置,需要注意以下几个关键的小配置
       default_realm = HADOOP.COM 默认的realm,必须跟要配置的realm的名称一致。
       udp_preference_limit = 1 禁止使用udp可以防止一个Hadoop中的错误
    [realms]:列举使用的realm。
       kdc:代表要kdc的位置。格式是 机器:端口
       admin_server:代表admin的位置。格式是 机器:端口
       default_domain:代表默认的域名
    
    [appdefaults]:可以设定一些针对特定应用的配置,覆盖默认配置。
    
  3. 初始化并启动:完成上面两个配置文件后,就可以进行初始化并启动了。A.初始化数据库:在hadoop1上运行命令。其中-r指定对应realm。
    kdb5_util create -r HADOOP.COM -s
    

    如果遇到数据库已经存在的提示,可以把/var/kerberos/krb5kdc/目录下的principal的相关文件都删除掉。默认的数据库名字都是principal。可以使用-d指定数据库名字。(尚未测试多数据库的情况)。

    B.启动kerberos。如果想开机自启动,需要stash文件。

     /usr/local/sbin/krb5kdc
     /usr/local/sbin/kadmind
    

    至此kerberos,搭建完毕。

  4. 搭建Slave KDCs为了在生产环境中获得高可用的KDC。还需要搭建Slave KDCs。 TODO 经过各种努力还是不能成功同步,先放下。
  5. 测试kerberos,搭建完毕后,进行以下步骤测试Kerberos是否可用。A. 进入kadmin在kadmin上添加一个超级管理员账户,需要输入passwd
    kadmin.local
    addprinc admin/admin
    

    B. 在其它机器尝试通过kadmin连接,需要输入密码

    kinit admin/admin
    kadmin 
    

    如果能成功进入,则搭建成功。

kerberos日常操作

  • 管理员操作
    1. 登录到管理员账户: 如果在本机上,可以通过kadmin.local直接登录。其它机器的,先使用kinit进行验证。
      kadmin.local  
      
      kinit admin/admin
      kadmin 
      
    2. 增删改查账户:在管理员的状态下使用addprinc,delprinc,modprinc,listprincs命令。使用?可以列出所有的命令。
      kamdin:addprinc -randkey hdfs/hadoop1
      kamdin:delprinc hdfs/hadoop1
      kamdin:listprincs命令
      
    3. 生成keytab:使用xst命令或者ktadd命令
      kadmin:xst -k /xxx/xxx/kerberos.keytab hdfs/hadoop1
      
  • 用户操作
    1. 查看当前的认证用户:klist
    2. 认证用户:kinit -kt /xx/xx/kerberos.keytab hdfs/hadoop1
    3. 删除当前的认证的缓存: kdestroy

四. 在CM上使用Kerberos认证

在CM上使用Kerberos认证,它会帮我们创建所有的需要的Kerberos账户,并且在启动的时候自动生成keytab存放到对应的启动目录,在配置文件中添加对应的keytab文件配置和用户名。

所以,只需要给CM创建一个拥有管理员权限的账户。CM就能够完成大部分的初始化工作。

初始化部署

  1. 为CM添加一个账户,并生成keytab文件kadmin kadmin:addprinc -randkey cloudera-scm/admin@HADOOP.COM kadmin:xst -k cmf.keytab cloudera-scm/admin@HADOOP.COM
  2. 将上文产生的keytab文件移到cloudera-scm的配置目录,添加cmf.principal文件并写入账户的名称,最后修改文件权限。
    mv cmf.keytab /etc/cloudera-scm-server/
    echo "cloudera-scm/admin@HADOOP.COM" >> /etc/cloudera-scm-server/cmf.principal
    chown cloudera-scm:cloudera-scm cmf.keytab 
    chmod 600 cmf.keytab 
    chown cloudera-scm:cloudera-scm cmf.principal
    chmod 600 cmf.principal
    

    默认配置目录在/etc/cloudera-scm-server/,但是我们修改为/home/cloudera-manager/cm-4.6.3/etc/cloudera-scm-server/

  3. 设置CM的default Realm :在界面上顶部的Administrator-setting-security-Kerberos Security Realm 填入 HADOOP.COM
  4. 针对所有服务开启security选项
    • Zookeeper:
      • 勾选 Zookeeper Service > Configuration > Enable Zookeeper Security
    • HDFS:
      • 勾选 HDFS Service > Configuration > Authentication
      • 勾选 HDFS Service > Configuration > Authorization
      • 修改Datanode Transceiver Port 到1004
      • 修改Datanode HTTP Web UI Port 到1006
    • HBASE:
      • 勾选HBase Service > Configuration > Authentication

      - 勾选HBase Service > Configuration > Authorization

  1. 启动即可

五. 非CM下的keytab配置

检查:如果JAVA的版本在1.6.21或以前的,会遇到客户端需要renew ticket,才能通过认证。而renwe ticket必须保证kdc的配置文件包含max_renewable_life = 7d项。

创建账户

创建所有账户,生成keytab(我们使用hadoop账户启动所有的服务,所以,只生成hadoop和HTTP账户就足够了)

kadmin:addprinc -randkey hadoop/hadoop1@HADOOP.COM
...
kadmin:addprinc -randkey hadoop/hadoop5@HADOOP.COM
kadmin:addprinc -randkey HTTP/hadoop1@HADOOP.COM
...
kadmin:addprinc -randkey HTTP/hadoop5@HADOOP.COM
kadmin:xst -k /xxx/hadoop.keytab hadoop/hadoop1 HTTP/hadoop1
...
kadmin:xst -k /xxx/hadoop.keytab hadoop/hadoop5 HTTP/hadoop5

说明:一共添加了10个账户分别是hadoop的hadoop1到hadoop5的账户和HTTP的hadoop1到hadoop5的账户。导出账户的时候,把hadoop1机器的hadoop账户和HTTP账户导入到同一个keytab文件中。

在标准的情况中,依据不同服务的启动者的不同,会创建不同的账户,导出不同的keytab文件。由于我们使用的是hadoop用户启动所有服务的状 况,所以一个hadoop.keytab就足够使用了。如果像ClouderaManager那样的一个用户启动一种服务,就要创建不同的用户,导出不同 的keytab。例如:hadoop1的zookeeper配置文件中需要zookeeper.keytab,当中含有 zookeeper/hadoop1这个账户

下文提到的配置文件中添加keytab文件,都要求不同机器含有对应的机器名和启动用户的keytab文件。要测试这个机器的keytab文件是否可用,可使用以下命令进行测试:

kinit -kt /xx/xx/hadoop.keytab hadoop/hadoop1
klist 

为ZK添加认证

  • 修改zoo.cfg添加配置
    • authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
    • jaasLoginRenew=3600000
  • 在配置目录中添加对应账户的keytab文件且创建jaas.conf配置文件,内容如下:
    Server {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        keyTab="/XX/XX/hadoop.keytab"
        storeKey=true
        useTicketCache=true
        principal="hadoop/hadoop3@HADOOP.COM";
    };
    

    其中keytab填写真实的keytab的绝对路径,principal填写对应的认证的用户和机器名称。

  • 在配置目录中添加java.env的配置文件,内容如下:
    export JVMFLAGS="-Djava.security.auth.login.config=/xx/xx/jaas.conf"
    
  • 每个zookeeper的机器都进行以上的修改
  • 启动方式和平常无异,如成功使用安全方式启动,日志中看到如下日志:
    2013-11-18 10:23:30,067 ... - successfully logged in.
    

为HDFS添加认证

  • 增加基本配置包括各种的princal和keytab文件的配置(生成的hdfs的keytab和HTTP的keytab最好放一起,容易配置。下面的配置中keytab文件使用绝对路径,principal使用_HOST,Hadoop会自动替换为对应的域名。)。
    • core-site.xml
      • hadoop.security.authorization:true
      • hadoop.security.authentication:kerberos
    • hdfs-site.xml
      • dfs.block.access.token.enable:true
      • dfs.namenode.keytab.file: /xx/xx/hadoop.keytab
      • dfs.namenode.kerberos.principal: hadoop/_HOST@HADOOP.COM
      • dfs.namenode.kerberos.internal.spnego.principal: HTTP/_HOST@HADOOP.COM
      • dfs.datanode.keytab.file: /xx/xx/hadoop.keytab
      • dfs.datanode.kerberos.principal: hadoop/_HOST@HADOOP.COM
      • dfs.datanode.address: 1004 (小于1024)
      • dfs.datanode.http.address: 1006 (小于1024)
      • dfs.journalnode.keytab.file: /xx/xx/hadoop.keytab
      • dfs.journalnode.kerberos.principal: hadoop/_HOST@HADOOP.COM
      • dfs.journalnode.kerberos.internal.spnego.principal: HTTP/_HOST@HADOOP.COM
    • hadoop-env.sh
      export HADOOP_SECURE_DN_USER=hadoop
      export HADOOP_SECURE_DN_PID_DIR=/home/hadoop/hadoop/pids
      export HADOOP_SECURE_DN_LOG_DIR=/home/hadoop/hadoop/logs
      export JSVC_HOME=/usr/bin
      #如果root下没有JAVA_HOME配置,则需要指定JAVA_HOME
      export JAVA_HOME=/home/hadoop/java/jdk 
      
  • 启动:设置了Security后,NameNode,QJM,ZKFC可以通过start-dfs.sh启动。DataNode需要使用root权 限启动。设置了HADOOP_SECURE_DN_USER的环境变量后,start-dfs.sh的启动脚本将会自动跳过DATANODE的启动。所 以,整个启动过程分为以下两步:
    • 启动NameNode,QJM,ZKFC
      start-dfs.sh
      

      说明:查看QJM的日志和ZKFC的日志。检查有无exception。QJM的报错不会有明显的提示。如果启动不成功检查以下几点是否做好:

      • QJM和NameNode对应的keytab文件是否包含hadoop账户和HTTP账户对应该机器的kerberos账户。
      • keytab使用绝对路径,可以避免一些问题。

      疑惑:ZKFC中有日志,但是工作正常,大胆预测连接zookeeper不需要强制通过jaas验证。TODO:验证此猜想。

      INFO org.apache.zookeeper.ClientCnxn: Opening socket connection to server hadoop3/10.1.74.46:59181. Will not attempt to authenticate using SASL (无法定位登录配置)
      
    • 启动DataNode:
      • 配置JSVC:DataNode需要JSVC启动。首先安装JSVC,然后配置的hadoop-env.sh的JSVC_HOME变量。JSVC运行还需要一个commons-daemon-xxx.jar包。从commons/daemon下载一个最新版本的jar包。当前,JSVC启动的时候遇到一个奇怪的bug,就是JSVC的classpath不支持*匹配。详细修改如下:
        #添加commons-daemon的jar包,并替换路径为绝对路径
        export CLASSPATH=$CLASSPATH:/xxx/commons-daemon-1.0.15.jar
        temp=${CLASSPATH//':'/' '}
        t=`echo $temp`
        export CLASSPATH=${t//' '/':'}
        
      • mv问题:由于权限问题,在移动日志文件启动的时候,会询问是否覆盖只读的日志文件。这个会导致使用start-secure-dns.sh启动的时候不顺畅。推荐修改hadoop-daemon.sh的74行:
        mv "$log" "$log.$num"; -->修改为--> mv -f "$log" "$log.$num";         
        
      • 启动:
        • 切换到root用户,需要配置这个root用户免密码登陆到其它的机器。
          #自动登陆并启动datanode
          sh /home/xx/hadoop/sbin/start-secure-dns.sh
          
        • 否则,需要单独登陆到所有机器启动datanode。
          #如果单独登陆启动datanode
          sh /home/xx/hadoop/sbin/hadoop-daemon.sh datanode start
          
  • 测试:使用任意用户通过keytab文件进行认证,运行hdfs相关命令。
    kinit -kt /xx/xx/qiujw/keytab qiujw/hadoopN
    #对于java1.6_26以下版本的需要renew ticket
    kinit -R
    klist
    hdfs dfs -ls /tmp
    

六. 为YARN添加认证配置

  • 添加配置
    • yarn.xml:
      • yarn.resourcemanager.keytab:/xx/xx/hadoop.keytab
      • yarn.resourcemanager.principal:hadoop/_HOST@HADOOP.COM
      • yarn.nodemanager.keytab:/xx/xx/hadoop.keytab
      • yarn.nodemanager.principal:hadoop/_HOST@HADOOP.COM
      • yarn.nodemanager.container-executor.class:org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor
      • yarn.nodemanager.linux-container-executor.group:hadoop
    • mapred.xml:
      • mapreduce.jobhistory.keytab:/xx/xx/hadoop.keytab
      • >mapreduce.jobhistory.principal:hadoop/_HOST@HADOOP.COM
  • 修改container-executor.conf.dir,重新编译container-executor:
    cp ~/hadoop/src
    mvn package -Pdist,native -DskipTests -Dtar -Dcontainer-executor.conf.dir=/etc
    cp ./hadoop-yarn-project/target/hadoop-yarn-project-2.0.0-cdh4.2.1/bin/container-executor ~/hadoop/bin
    #以下命令查看编译是否成功
    strings ~/hadoop/bin/container-executor|grep etc
    #修改权限
    sudo chown root:hadoop  /xx/hadoop/bin/container-executor
    sudo chmod 4750 /xx/hadoop/bin/container-executor
    

    说明:为什么要编译container-executor?

    答:因为container-executor要求container-executor.cfg这个文件及其所有父目录都属于root用户,且权 限小于755。配置文件container-executor.cfg默认的路径在../etc/hadoop/container- executor.cfg。如果,按照默认的路径修改所有父目录都属于root,显然不现实。于是,把路径编译到/etc/container- executor.cfg中。

  • 创建/etc/container-executor.cfg文件,文件内容如下:
    #运行container的用户
    yarn.nodemanager.linux-container-executor.group=hadoop
    #这个是允许运行应用的用户列表,默认是全部可以运行
    #banned.users=
    #这个是允许提交job的最小的userid的值。centos中一般用户的id在500以上。
    min.user.id=500
    

    修改/etc/container-executor.cfg的权限

    sudo chown root:root /etc/container-executor.cfg
    sudo chmod 600 /etc/container-executor.cfg
    
  • 启动,使用hadoop用户直接启动即可
    start-yarn.sh
    
  • 检查Nodemanager和Resourcemanager的日志是否有异常。
    • 一般异常都是因为container-executor.cfg的权限和container-executor的权限问题。请仔细核对:
      [hadoop@hadoop2 hadoop]$ ls ~/hadoop/bin/container-executor  -l
      

      -rwsr-x--- 1 root hadoop 89206 Nov 18 16:18 /home/hadoop/hadoop/bin/container-executor [hadoop@hadoop2 hadoop]$ ls /etc/container-executor.cfg -l -rw------- 1 root root 240 Nov 18 16:31 /etc/container-executor.cfg

  • 测试:使用任意用户通过keytab文件进行认证,运行yarn相关命令。
    kinit -kt /xx/xx/qiujw/keytab qiujw/hadoopN
    #对于java1.6_26以下版本的需要renew ticket
    kinit -R
    klist
    yarn jar /xx/xx/hadoop-mapreduce-examples-xx.jar pi 10 100
    

七. 为hbase添加认证

  • 添加配置:
    • hbase-site.xml:以下添加到client和server端
      • hbase.security.authentication:kerberos
      • hbase.rpc.engine: org.apache.hadoop.hbase.ipc.SecureRpcEngine
    • hbase-site.xml:以下添加到server端
      • hbase.regionserver.kerberos.principal:hadoop/_HOST@HADOOP.COM
      • hbase.regionserver.keytab.file: /xx/xx/hadoop.keytab
      • hbase.master.kerberos.principal: hadoop/_HOST@HADOOP.COM
      • hbase.master.keytab.file: /xx/xx/hadoop.keytab
  • 添加hbase连接secure的zookeeper:
    • 创建zk-jaas.conf配置文件,内容如下:
      Client {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        useTicketCache=false
        keyTab="/xx/hadoop.keytab"
        principal="hadoop/hadoopN@HADOOP.COM";
      };
      
    • 修改hbase-env.sh:
      export HBASE_OPTS="$HBASE_OPTS -Djava.security.auth.login.config=/xx/zk-jaas.conf"
      export HBASE_MANAGES_ZK=false
      
    • 确保以下配置项是正确的:
      • hbase-site.xml:
        • hbase.zookeeper.quorum: hadoopN,...,hadoopN
        • hbase.cluster.distributed: true
    • 添加以下项目到zoo.cfg中:
      • kerberos.removeHostFromPrincipal: true
      • kerberos.removeRealmFromPrincipal: true
  • 启动:如往常启动即可
    start-hbase.sh
    
  • TroubleShooting笔者在启动hbase后,在zookeeper的日志中大量发现这种信息:
     Client failed to SASL authenticate: javax.security.sas     l.SaslException: GSS initiate failed [Caused by GSSException: Failure unspecified at GSS-API level (Mechanism level: Specified version of key is not available (44     ))]
    

    在多次调整无果后,怀疑是因为我的一些老旧的账户的renewmax属性还是0.于是,把所有相关账户都删除,生成后,再次启动。这个错误就消失了。

Hbase的权限控制

  • 启动hbase的用户是超级用户拥有所有的权限。
  • hbase支持4个权限
    • R :读权限 Get, Scan, or Exists calls
    • W :写权限 Put, Delete, LockRow, UnlockRow, IncrementColumnValue, CheckAndDelete, CheckAndPut, Flush, or Compact
    • C :创建权限 Create, Alter, or Drop
    • A :管理员权限 Enable, Disable, MajorCompact, Grant, Revoke, and Shutdown.
  • 权限控制语句:
    grant <user> <permissions>[ <table>[ <column family>[ <column qualifier> ] ] ]
    revoke <user> <permissions> [ <table> [ <column family> [ <column qualifier> ]]]
    alter <table> {OWNER => <user>} # sets the table owner
    user_permission <table>  # displays existing permissions
    
  • 创建表的用户拥有该表的所有权限
  • 如果赋予权限的时候没有针对某个表或者CF进行赋予,就会对全局获得权限。请小心。

Hive的权限

  • Hive的客户端的权限和普通的客户端的一致就可以了。

客户端配置

使用者要和实行了kerberos的集群进行通信。要kerberos的管理员创建对应的账户。并且生成keytab返回给使用者,使用者通过kinit命令认证后,就跟平常使用Hadoop的方式一致地使用即可。以下是一个例子:

kadmin:addprinc qiujw/hadoop1
kadmin:xst -k qiujw.keytab qiujw/hadoop1
#将qiujw.keytab交给用户
#在hadoop1机器上
kinit -kt qiujw.keytab qiujw/hadoop1
klist
    Ticket cache: FILE:/tmp/krb5cc_512
    Default principal: qiujw/hadoop2@HADOOP.COM

    Valid starting     Expires            Service principal
    11/19/13 10:53:54  11/20/13 10:53:54  krbtgt/HADOOP.COM@HADOOP.COM
            renew until 11/26/13 10:44:10

说明:Expires下面的是这个认证的过期的日志。renew until后面的是续约期。
意思是,如果这个缓存过了认证的过期时间,就会失效。在续约期期间通过使用kinit -R可以续约这个认证。但是,过了续约期。必须要使用keytab重新认证。

Hadoop等的服务中,都会使用keytab自动做续约不会存在过期的情况。如果客户端需要长久运行不过期,需要在程序中使用keytab做认证。

协议控制

Hadoop的框架中支持针对不同的协议开启权限控制。不再本次探究范围内。服务协议控制

参考阅读:

管理SPNEGO TAI:关于使用Kerberos服务主体名称的提示
如何在Notes中通过account使用SPNEGO单点登录
Kerberos原理
基于 SAML 的 WebSphere Application Server 单点登录的场景设计
跨 KDC 域的 WebSphere Web Services Security 应用中的 Kerberos 加密算法
测试驱动的单点登录
大数据安全Hadoop安全模型的演进
Kerberos and SPNEGO 以及 SPNEGO
Hadoop MapReduce工作原理
Hadoop学习入门
hadoop2.x本地伪分布环境实践yarn
Hadoop MapReduceV2(Yarn) 框架简介
Hadoop2.0NameNode HA实践
Hadoop2 HA方案之QJM
Hadoop应用构建企业级的安全解决方案
vmware虚拟机下hadoop集群安装过程

 

来源:http://blog.csdn.net/xiao_jun_0820/article/details/39375819