月度归档:2015年03月

YARN应用开发流程

来源:http://my.oschina.net/u/1434348/blog/193374

1 概况

YARN是Hadoop系统上的资源统一管理平台,其主要作用是实现集群资源的统一管理和调度。YARN是一个高速发展中的资源管理与调度平台,目前还不是很完善,当前只支持CPU和内存的分配。作为资源调度器,YARN支持如下几个资源调度语义:

  • 获取指定节点的特定资源量,如node1上4个虚拟CPU核,1GB内存(YARN上的资源使用容器包装);
  • 获取指定机架上的特定资源量;
  • 支持资源黑名单(添加/删除);
  • 要求某些应用归还指定的资源,通常用于抢占场景。

YARN目前不支持的调度语义有(或者说支持得不是很好):

  • 获取任意节点上的特定资源量;
  • 获取任意机架上的特定资源量;
  • 获取一组或几组符合特定规则的资源量;
  • 细粒度资源分配,如获取主频大于2.4G的CPU等;
  • 动态调整资源容器容量(对长应用比较重要)。

YARN上的应用按其运行的生命周期长短,可以分为长应用和短应用,短应用通常是分析作业,作业从提交到完成,所耗的时间是有限的,作业完成后,其占用的资源就会被释放,归还给YARN进行再次分配。长应用通常是一些服务,应用启动后除非意外或人为终止,将一直运行下去。长应用通常长期占用集群上的一些资源,且运行期间对资源的需求也时常变化,因此,动态调整资源对长应用来说比较重要。目前,YARN对长应用的支持还不是很好,从社区讨论来看,受hortonworks的Hoya项目推动,YARN在2.20版本后加强了对长应用的支持。

 

2 应用开发

 

2.1 概述

YARN的应用开发主要过程如下:

图2.1 YARN应用开发流程

YARN主要由ResourceManager和NodeManager组成,ResourceManager负责资源的管理与分配,NodeManager则负责具体资源的隔离。YARN中,资源使用容器进行封装。用户在YARN上开发应用时,需要实现如下三个模块:

  • Application Client: 应用客户端用于将应用提交到YARN上,使应用运行在YARN上,同时,监控应用的运行状态,控制应用的运行;
  • Application Master: AM负责整个应用的运行控制,包括向YARN注册应用、申请资源、启动容器等,应用的实际工作在容器中进行;
  • Application Worker: 应用的实际工作,并不是所有的应用都需要编写worker。NodeManager启动AM发送过来的容器,容器内部封装了该应用worker运行所需的资源和启动命令。

实现上述模块,涉及如下2个RPC协议:

  • ApplicationClientProtocol: Client-RM之间的协议,主要用于应用的提交;
  • ApplicationMasterProtocol: AM-RM之间的协议,AM通过该协议向RM注册并申请资源;
  • ContainerManagementProtocol: AM-NM之间的协议,AM通过该协议控制NM启动容器。

上述协议的定义在hadoop-yarn-api工程中。

从业务的角度看,一个应用需要分两部分进行开发,一个是接入YARN平台,实现上述3个协议,通过YARN实现对集群资源的访问和利用;另一个是业务功能的实现,这个与YARN本身没有太大关系。下面主要阐述如何将一个应用接入YARN平台。

 

2.2 客户端开发

客户端开发流程如图2.2所示:

图2.2 YARN应用程序客户端开发

从上图可以看出,客户端的主要作用是提交(部署)应用和监控应用运行两个部分。

 

2.2.1 提交应用

提交应用涉及ApplicationClientProtocol协议中的两个方法:

GetNewApplicationResponse getNewApplication(GetNewApplicationRequest request)

SubmitApplicationResponse submitApplication(SubmitApplicationRequest request)

具体步骤如下:

  1. 客户端通过getNewApplication方法从RM上获取应用ID;
  2. 客户端将应用相关的运行配置封装到ApplicationSubmissionContext中,通过submitApplication方法将应用提交到RM上;
  3. RM根据ApplicationSubmissionContext上封装的内容启动AM;
  4. 客户端通过AM或RM获取应用的运行状态,并控制应用的运行过程。

通过getNewApplication可从RM上获取全局唯一的应用ID和最大可申请的资源量(内存和虚拟CPU核数),如下所示:

图2.3 getNewApplication方法的输入输出

在获取应用程序ID后,客户端封装应用相关的配置到ApplicationSubmissionContext中,通过submitApplication方法提交到RM上。

图2.4 submitApplication方法的输入输出

ApplicationSubmissionContext主要包括如下几个部分:

  • applicationId: 通过getNewApplication获取的应用ID;
  • applicationName: 应用名称,将显示在YARN的web界面上;
  • applicationType: 应用类型,默认为”YARN”;
  • priority: 应用优先级,数值越小,优先级越高;
  • queue: 应用所属队列,不同应用可以属于不同的队列,使用不同的调度算法;
  • unmanagedAM: 布尔类型,表示AM是否由客户端启动(AM既可以运行在YARN平台之上,也可以运行在YARN平台之外。运行在YARN平台之上的AM通过RM启动,其运行所需的资源受YARN控制);
  • cancelTokensWhenComplete: 应用完成后,是否取消安全令牌;
  • maxAppAttempts: AM启动失败后,最大的尝试重启次数;
  • resource: 启动AM所需的资源(虚拟CPU数/内存),虚拟CPU核数是一个归一化的值;
  • amContainerSpec: 启动AM容器的上下文,主要包括如下内容:
  • tokens: AM所持有的安全令牌;
  • serviceData: 应用私有的数据,是一个Map,键为数据名,值为数据的二进制块;
  • environment: AM使用的环境变量;
  • commands: 启动AM的命令列表;
  • applicationACLs:应程序访问控制列表;
  • localResource: AM启动需要的本地资源列表,主要是一些外部文件、压缩包等。

  • 监控应用运行状态

应用监控涉及ApplicationClientProtocol协议中的如下几个方法:

//强制杀死一个应用

KillApplicationResponse forceKillApplication(KillApplicationRequest request)

//获取应用状态,如进度等

GetApplicationReportResponse getApplicationReport(GetApplicationReportRequest request)

//获取集群度量

GetClusterMetricsResponse getClusterMetrics(GetClusterMetricsRequest request)

//获取符合条件的应用的状态(列表)

GetApplicationsResponse getApplications(GetApplicationsRequest request)

//获取集群中各个节点的状态

GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)

//获取RM中的队列信息

GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)

//获取当前用户的访问控制信息

GetQueueUserAclsInfoResponse getQueueUserAcls(GetQueueUserAclsInfoRequest request)

//获取委托令牌,使得容器可以使用这些令牌与服务通信

GetDelegationTokenResponse getDelegationToken(GetDelegationTokenRequest request)

//更新已存在的委托令牌

RenewDelegationTokenResponse renewDelegationToken(RenewDelegationTokenRequest request)

//需要已存在的委托令牌

CancelDelegationTokenResponse cancelDelegationToken(CancelDelegationTokenRequest request)

客户端既可以从RM上获取应用的信息,也可以通过AM获取。通常为了减少RM的压力,使用从AM获取应用运行状态的方式。客户端与AM之间的通信使用应用内部的私有协议,与YARN无关。

 

2.3 AM开发

AM的主要功能是按照业务需求,从RM处申请资源,并利用这些资源完成业务逻辑。因此,AM既需要与RM通信,又需要与NM通信。这里涉及两个协议,分别是AM-RM协议(ApplicationMasterProtocol)和AM-NM协议(ContainerManagementProtocol),如图2.5所示:

图2.5 AM-YARN接口协议

 

2.3.1 AM-RM协议

AM-RM之间使用ApplicationMasterProtocol协议进行通信,该协议提供如下几个方法:
//向RM注册AM

RegisterApplicationMasterResponse registerApplicationMaster(RegisterApplicationMasterRequest request)

//告知RM,应用已经结束

FinishApplicationMasterResponse finishApplicationMaster(FinishApplicationMasterRequest request)

//向RM申请/归还资源,维持心跳

AllocateResponse allocate(AllocateRequest request)

客户端向RM提交应用后,RM会根据提交的信息,分配一定的资源来启动AM,AM启动后调用ApplicationMasterProtocol协议的registerApplicationMaster方法主动向RM注册。完成注册后,AM通过ApplicationMasterProtocol协议的allocate方法向RM申请运行任务的资源,获取资源后,通过ContainerManagementProtocol在NM上启动资源容器,完成任务。应用完成后,AM通过ApplicationMasterProtocol协议的finishApplicationMaster方法向RM汇报应用的最终状态,并注销AM。主要过程如图2.6所示:

图2.6 AM-RM交互流程

需要注意的是,ApplicationMasterProtocol#allocate()方法还兼顾维持AM-RM心跳的作用,因此,即便应用运行过程中有一段时间无需申请任何资源,AM都需要周期性的调用相应该方法,以避免触发RM的容错机制。下面具体看一下每一步所传递的信息:

1 AM向RM注册

AM启动后会主动调用registerApplicationMaster方法向RM注册,注册信息中包括该AM所在节点和开放的RPC服务端口,以及一个应用状态跟踪Web接口(将在RM的Web页面上显示)。RM向AM返回一个对象,里面包含了应用最大可申请的单个容器容量、应用访控制列表和一个用于与客户端通信的安全令牌。

图2.7 registerApplicationMaster方法输入输出

2 AM向RM申请资源

AM通过allocate方法向RM申请或释放资源。AM向RM发送的信息被封装在AllocateRequest里,包括如下内容:

  • responseId: 相应ID,用于区分重复的响应;
  • askList:AM向RM申请的资源列表,是一个List<ResourceRequest>对象,其中ResourceRequest中一个资源请求的详细参数,包括优先级、容器个数、单个容器容量和分配策略(是否放宽本地化约束);
  • releaseList: AM主动释放的资源容器列表;
  • resourceBlacklistRequest: 要添加或删除的资源黑名单;
  • progress:应用的运行进度。

图2.8 AllocateRequest

RM接受到AM的请求后,扫描其上的资源镜像,按照调度算法分配全部或部分申请的资源给AM,返回一个AllocateResponse对象,里面内容包括:

  • responseId: 相应ID,用于区分重复的响应;
  • numClusterNodes: 集群规模大小;
  • updatedNodes: 状态被更新过的所有节点列表,每个节点的状态更新信息被分装在NodeReport对象中,包括以下内容:
  • nodeId: 节点唯一标识;
  • httpAddress: 节点的Web页面地址;
  • rackName: 节点所在机架名;
  • numContainers: 节点上当前运行的容器个数;
  • nodeState: 节点运行状态,是一个枚举类型;
  • used: 节点上已经使用的资源量;
  • capability: 节点总的资源量;
  • healthReport: 节点的健康诊断信息;
  • lastHealthReportTime: 最新的节点的健康诊断时戳;
  • availableResources: 集群的资源净空量;
  • AMCommand: RM给AM发送的控制命令,包括重连和关闭;
  • NMTokens: AM与NM之间的通信令牌;
  • allocatedContainers: RM新分配给AM的资源容器列表,这些资源被封装在资源容器(Container)中:
  • id: 容器ID,每个容器都具有全局唯一的ID;
  • priority: 优先级;
  • nodeId: 容器所在节点的ID;
  • nodeHttpAddress: 节点的Web页面地址;
  • containerToken: 容器的安全令牌;
  • resource: 该容器所持有的资源,包括内存和CPU。
  • completedContainersStatuses: 已完成的容器状态列表;
  • preemptionMessage: 资源抢占信息,包括两部分,强制收回部分和可自主调配部分:
  • strictContract: 强制收回部分,AM必须释放的容器列表;
  • preemptionContract: 可自主调配的部分,该部分包含了两个内容,分别是抢占资源需求和可抢占的资源列表,AM需要从可抢占的资源列表中选出部分资源进行释放,以满足抢占资源需求;

 

图2.9 AllocateResponse

3 AM通知RM应用已结束

在应用完成后,AM通知RM应用结束的消息,同时向RM提供应用的最终状态(成功/失败等)、一些失败时的诊断信息和应用跟踪地址,RM收到通知后注销相应的AM,并将注销结果发送给AM,AM收到注销成功的消息后,退出进程。AM通过调用ApplicationMasterProtocol#finishApplicationMaster方法通知RM,该方法的输入输出如下所示:

图2.10 finishApplicationMaster方法的I/O

 

2.3.2 AM-NM协议

AM通过ContainerManagementProtocol协议与NM交互,包括3个方面的功能:启动容器、查询容器状态、停止容器,分别对应协议中的三个方法:

//启动容器

StartContainersResponse startContainers(StartContainersRequest request)

//查询容器状态

GetContainerStatusesResponse getContainerStatuses(GetContainerStatusesRequest request)

//停止容器

StopContainersResponse stopContainers(StopContainersRequest request)、

AM-NM交互过程如图2.11所示:

图2.11 AM-NM交互流程

1 AM在NM上启动容器

AM通过ContainerManagementProtocol# startContainers()方法启动一个NM上的容器,AM通过该接口向NM提供启动容器的必要配置,包括分配到的资源、安全令牌、启动容器的环境变量和命令等,这些信息都被封装在StartContainersRequest中。NM收到请求后,会启动相应的容器,并返回启动成功的容器列表和失败的容器列表,同时还返回其上相应的辅助服务元数据。startContainers方法的输入输出如图2.12所示:

图2.12 startContainers的I/O

2 AM查询NM上的容器运行状态

在应用运行期间,AM需要实时掌握各个Container的运行状态,以便及时响应一些异常,如容器运行失败等。AM通过ContainerManagementProtocol# getContainerStatuses ()方法获取各个容器的运行状态,其输入输出如下图所示:

图2.13 getContainerStatuses I/O

3 AM停止NM上的容器

当一个容器运行完成后,分配给它的资源需要被回收。AM通过ContainerManagementProtocol# stopContainers()方法停止NM上的容器,释放相关资源,然后通过AM-RM协议,将释放的资源上报给RM,RM完成最终的资源回收。stopContainers的输入输出如下图所示:

图2.14 stopContainers I/O

 

2.4 使用YARN编程库开发应用

如2.3节所述,YARN上的应用开发分为平台接入和业务开发两个部分,其中平台接入就是实现上述三个RPC协议。直接实现上述协议的开发难度较高,需要处理很多细节和性能问题,如系统并发等。为此,YARN提供了一套应用程序编程库来简化应用的开发过程,该编程库是基于事件驱动机制的,利用了YARN内部的服务库、事件库和状态机库,分为三个部分,与上述三个协议一一对应。

 

2.4.1 YARN基础库

1 服务库

YARN中普遍采用基于服务的对象管理模型,将一些生命周期较长的对应服务化,YARN提供一套抽象的接口对服务进行了统一描述,该服务具有如下特点:

  • 具有标准状态,所有服务都具有4个状态,NOTINITED、INITED、STARTED、STOPPED;
  • 状态驱动,服务状态变化将触发一些动作,使其转变成另一种状态;
  • 服务嵌套,一个服务可以由其他服务组合嵌套而来。

YARN服务库如下所示:

图2.15 YARN服务库

2 事件库

YARN中大量采用了基于事件驱动的并发模型,该模型由事件、异步调度器和事件处理器三个模块组成。处理请求被抽象为事件,放入异步调度器的事件队列中,调度线程从事件队列中取出事件分发给不同的事件处理器,事件处理器处理事件,产生新的事件放入事件队列,如此循环,直到处理完成(完成事件)。

图2.16 YARN事件库

3 状态机库

YARN中使用{转换前状态、转换后状态、事件、回调函数}四元组来表示一个状态变换,一个或多个事件的到来,触发绑定在对象上状态转移函数,使对象的状态发生变化。状态机使得事件处理变得简单可控。

图2.17 状态机库

总的来说,YARN中的服务由一个或多个含有有限状态机的事件处理系统组成,总体框架如下图所示。

图2.18 YARN服务通用模型

 

2.4.2 YARN应用客户端库(CLIENT-RM编程库)

YARN的Client-RM编程库位于org.apache.hadoop.yarn.client.YarnClient(Hadoop-yarn-api项目),该库实现了通用的ApplicationClientProtocol协议,提供了重试机制。用户利用该库可以快速开发YARN应用的客户端程序,而不需要关心RPC等底层接口。如图所示:

图2.19 YarnClient

用户开发自己的应用客户端时,只要设置好ApplicationSubmissionContext对象,调用YarnClient的相关接口,即可实现应用的提交。

 

2.4.3 AM-RM编程库

AM-RM编程库主要简化了AM向RM申请资源过程的开发。YARN提供了两套AM-RM编程库,分别为阻塞式和非阻塞式模式。如图2.20所示。

图2.20 AM-RM编程库

其中,AMRMClient是阻塞式的,实现了ApplicationMasterProtocol协议,用户调用该类的相应接口,可实现AM与RM的通信。而AMRMClientAsync是AMRMClient的非阻塞式封装,所有响应通过回调函数的形式返回给用户,用户实现自己的AM时,只需要实现AMRMClientAsync的CallbackHandler即可。如图2.21所示:

图2.21 AM-RM编程库

 

2.4.4 NM编程库

NM编程库对AM和RM与NM之间的交互进行了封装,同样有阻塞式和非阻塞式两种封装(AM与NM和RM与NM的交互逻辑相似),如图2.22所示。

图2.22 NM编程库

同样的,对于异步编程库NMClientAsync,用户只需要在自己的AM上实现相应的回调函数,就可以控制NM上Container的启动/停止和状态监控了。如图2.23所示。

图2.23 NM编程库

 

2.5 总结

本文介绍了YARN平台应用开发的基本流程,总结如下:

  • YARN平台应用开发主要有两个工作:YARN平台接入和业务逻辑实现;
  • YARN平台应用开发主要需要开发三个组件:客户端、AM和worker;
  • YARN平台接入主要涉及三个协议,分别为ApplicationClientProtocol、ApplicationMasterProtocol和ContainerManagementProtocol,其中,客户端通过ApplicationClientProtocol协议与RM交互,提交(部署)应用并监控应用的运行;AM通过ApplicationMasterProtocol协议维持AM-RM心跳,并向RM申请YARN上的资源;AM通过ContainerManagementProtocol协议控制NM启动、停止申请到的容器,并监控容器的运行状态。容器是YARN对资源的封装,应用的Worker在容器中运行,只能使用容器中的资源,从而实现资源隔离;
  • YARN提供了Client-RM编程库、AM-RM编程库和NM编程库,从而简化了YARN上的应用开发(当然还不是很简单),需要注意的是,该编程库的接口还不是很稳定,以后还有可能发生变化(hadoop2.0与hadoop2.2.0中YARN的这些编程库不兼容)。

总得来说,YARN是一个资源管理平台,并不涉及业务逻辑,具体的业务逻辑需要用户自己去实现。YARN的核心作用就是分配资源、保证资源隔离。

Java多线程中公平锁

概要

本章对“公平锁”的获取锁机制进行介绍(本文的公平锁指的是互斥锁的公平锁),内容包括:
基本概念
ReentrantLock数据结构
参考代码
获取公平锁(基于JDK1.7.0_40)
一. tryAcquire()
二. addWaiter()
三. acquireQueued()
四. selfInterrupt()
“公平锁”的释放锁的机制在后面一章再做介绍,锁的使用示例请参考“Java多线程系列--“JUC锁”02之 互斥锁ReentrantLock”。

转载请注明出处:http://www.cnblogs.com/skywang12345/p/3496147.html

基本概念

本章,我们会讲解“线程获取公平锁”的原理;在讲解之前,需要了解几个基本概念。后面的内容,都是基于这些概念的;这些概念可能比较枯燥,但从这些概念中,能窥见“java锁”的一些架构,这对我们了解锁是有帮助的。
1. AQS -- 指AbstractQueuedSynchronizer类。
AQS是java中管理“锁”的抽象类,锁的许多公共方法都是在这个类中实现。AQS是独占锁(例如,ReentrantLock)和共享锁(例如,Semaphore)的公共父类。

2. AQS锁的类别 -- 分为“独占锁”和“共享锁”两种。
(01) 独占锁 -- 锁在一个时间点只能被一个线程锁占有。根据锁的获取机制,它又划分为“公平锁”和“非公平锁”。 公平锁,是按照通过CLH等待线程按照先来先得的规则,公平的获取锁;而非公平锁,则当线程要获取锁时,它会无视CLH等待队列而直接获取锁。独占锁的典 型实例子是ReentrantLock,此外,ReentrantReadWriteLock.WriteLock也是独占锁。
(02) 共享锁 -- 能被多个线程同时拥有,能被共享的锁。JUC包中的ReentrantReadWriteLock.ReadLock,CyclicBarrier, CountDownLatch和Semaphore都是共享锁。这些锁的用途和原理,在以后的章节再详细介绍。

3. CLH队列 -- Craig, Landin, and Hagersten lock queue
CLH队列是AQS中“等待锁”的线程队列。在多线程中,为了保护竞争资源不被多个线程同时操作而起来错误,我们常常需要通过锁来保护这些资源。在独占锁 中,竞争资源在一个时间点只能被一个线程锁访问;而其它线程则需要等待。CLH就是管理这些“等待锁”的线程的队列。
CLH是一个非阻塞的 FIFO 队列。也就是说往里面插入或移除一个节点的时候,在并发条件下不会阻塞,而是通过自旋锁和 CAS 保证节点插入和移除的原子性。

4. CAS函数 -- Compare And Swap
CAS函数,是比较并交换函数,它是原子操作函数;即,通过CAS操作的数据都是以原子方式进行的。例如,compareAndSetHead(), compareAndSetTail(), compareAndSetNext()等函数。它们共同的特点是,这些函数所执行的动作是以原子的方式进行的。

本章是围绕 “公平锁”如何获取锁而层次展开。“公平锁”涉及到的知识点比较多,但总的来说,不是特别难;如果读者能读懂AQS和 ReentrantLock.java这两个类的大致意思,理解锁的原理和机制也就不成问题了。本章只是作者本人对锁的一点点理解,希望这部分知识能帮助 您了解“公平锁”的获取过程,认识“锁”的框架。

ReentrantLock数据结构

ReentrantLock的UML类图

从图中可以看出:
(01) ReentrantLock实现了Lock接口。
(02) ReentrantLock与sync是组合关系。ReentrantLock中,包含了Sync对象;而且,Sync是AQS的子类;更重要的 是,Sync有两个子类FairSync(公平锁)和NonFairSync(非公平锁)。ReentrantLock是一个独占锁,至于它到底是公平锁 还是非公平锁,就取决于sync对象是"FairSync的实例"还是"NonFairSync的实例"。

参考代码

下面给出Java1.7.0_40版本中,ReentrantLock和AQS的源码,仅供参考!

ReentranLock.java

AQS(AbstractQueuedSynchronizer.java)

 

获取公平锁(基于JDK1.7.0_40)

通过前面“Java多线程系列--“JUC锁”02之 互斥锁ReentrantLock”的“示例1”,我们知道,获取锁是通过lock()函数。下面,我们以lock()对获取公平锁的过程进行展开。

1. lock()

lock()在ReentrantLock.java的FairSync类中实现,它的源码如下:

final void lock() {
    acquire(1);
}

说明:“当前线程”实际上是通过acquire(1)获取锁的。
这里说明一下“1”的含义,它是设置“锁的状态”的参数。对于“独占锁”而言,锁处于可获取状态时,它的状态值是0;锁被线程初次获取到了,它的状态值就变成了1。
由于ReentrantLock(公平锁/非公平锁)是可重入锁,所以“独占锁”可以被单个线程多此获取,每获取1次就将锁的状态+1。也就是说,初次获 取锁时,通过acquire(1)将锁的状态值设为1;再次获取锁时,将锁的状态值设为2;依次类推...这就是为什么获取锁时,传入的参数是1的原因 了。
可重入就是指锁可以被单个线程多次获取。

2. acquire()

acquire()在AQS中实现的,它的源码如下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

(01) “当前线程”首先通过tryAcquire()尝试获取锁。获取成功的话,直接返回;尝试失败的话,进入到等待队列排序等待(前面还有可能有需要线程在等待该锁)。
(02) “当前线程”尝试失败的情况下,先通过addWaiter(Node.EXCLUSIVE)来将“当前线程”加入到"CLH队列(非阻塞的FIFO队列)"末尾。CLH队列就是线程等待队列。
(03) 再执行完addWaiter(Node.EXCLUSIVE)之后,会调用acquireQueued()来获取锁。由于此时ReentrantLock是公平锁,它会根据公平性原则来获取锁。
(04) “当前线程”在执行acquireQueued()时,会进 入到CLH队列中休眠等待,直到获取锁了才返回!如果“当前线程”在休眠等待过程中被中断过,acquireQueued会返回true,此时"当前线 程"会调用selfInterrupt()来自己给自己产生一个中断。至于为什么要自己给自己产生一个中断,后面再介绍。

上面是对acquire()的概括性说明。下面,我们将该函数分为4部分来逐步解析。
一. tryAcquire()
二. addWaiter()
三. acquireQueued()
四. selfInterrupt()

一. tryAcquire()

1. tryAcquire()

公平锁的tryAcquire()在ReentrantLock.java的FairSync类中实现,源码如下:

protected final boolean tryAcquire(int acquires) {
    // 获取“当前线程”
    final Thread current = Thread.currentThread();
    // 获取“独占锁”的状态
    int c = getState();
    // c=0意味着“锁没有被任何线程锁拥有”,
    if (c == 0) {
        // 若“锁没有被任何线程锁拥有”,
        // 则判断“当前线程”是不是CLH队列中的第一个线程线程,
        // 若是的话,则获取该锁,设置锁的状态,并切设置锁的拥有者为“当前线程”。
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        // 如果“独占锁”的拥有者已经为“当前线程”,
        // 则将更新锁的状态。
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

说明:根据代码,我们可以分析出,tryAcquire()的作用就是尝试去获取锁。注意,这里只是尝试!
尝试成功的话,返回true;尝试失败的话,返回false,后续再通过其它办法来获取该锁。后面我们会说明,在尝试失败的情况下,是如何一步步获取锁的。

 

2. hasQueuedPredecessors()

hasQueuedPredecessors()在AQS中实现,源码如下:

public final boolean hasQueuedPredecessors() {
    Node t = tail; 
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

说明: 通过代码,能分析出,hasQueuedPredecessors() 是通过判断"当前线程"是不是在CLH队列的队首,来返回AQS中是不是有比“当前线程”等待更久的线程。下面对head、tail和Node进行说明。

 

3. Node的源码

Node就是CLH队列的节点。Node在AQS中实现,它的数据结构如下:

private transient volatile Node head;    // CLH队列的队首
private transient volatile Node tail;    // CLH队列的队尾

// CLH队列的节点
static final class Node {
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;

    // 线程已被取消,对应的waitStatus的值
    static final int CANCELLED =  1;
    // “当前线程的后继线程需要被unpark(唤醒)”,对应的waitStatus的值。
    // 一般发生情况是:当前线程的后继线程处于阻塞状态,而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。
    static final int SIGNAL    = -1;
    // 线程(处在Condition休眠状态)在等待Condition唤醒,对应的waitStatus的值
    static final int CONDITION = -2;
    // (共享锁)其它线程获取到“共享锁”,对应的waitStatus的值
    static final int PROPAGATE = -3;

    // waitStatus为“CANCELLED, SIGNAL, CONDITION, PROPAGATE”时分别表示不同状态,
    // 若waitStatus=0,则意味着当前线程不属于上面的任何一种状态。
    volatile int waitStatus;

    // 前一节点
    volatile Node prev;

    // 后一节点
    volatile Node next;

    // 节点所对应的线程
    volatile Thread thread;

    // nextWaiter是“区别当前CLH队列是 ‘独占锁’队列 还是 ‘共享锁’队列 的标记”
    // 若nextWaiter=SHARED,则CLH队列是“独占锁”队列;
    // 若nextWaiter=EXCLUSIVE,(即nextWaiter=null),则CLH队列是“共享锁”队列。
    Node nextWaiter;

    // “共享锁”则返回true,“独占锁”则返回false。
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    // 返回前一节点
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    // 构造函数。thread是节点所对应的线程,mode是用来表示thread的锁是“独占锁”还是“共享锁”。
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    // 构造函数。thread是节点所对应的线程,waitStatus是线程的等待状态。
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

说明
Node是CLH队列的节点,代表“等待锁的线程队列”。
(01) 每个Node都会一个线程对应。
(02) 每个Node会通过prev和next分别指向上一个节点和下一个节点,这分别代表上一个等待线程和下一个等待线程。
(03) Node通过waitStatus保存线程的等待状态。
(04) Node通过nextWaiter来区分线程是“独占锁”线程还是“共享锁”线程。如果是“独占锁”线程,则nextWaiter的值为EXCLUSIVE;如果是“共享锁”线程,则nextWaiter的值是SHARED。

 

4. compareAndSetState()

compareAndSetState()在AQS中实现。它的源码如下:

protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

说明: compareAndSwapInt() 是sun.misc.Unsafe类中的一个本地方法。对此,我们需要了解的是 compareAndSetState(expect, update) 是以原子的方式操作当前线程;若当前线程的状态为expect,则设置它的状态为update。

 

5. setExclusiveOwnerThread()

setExclusiveOwnerThread()在AbstractOwnableSynchronizer.java中实现,它的源码如下:

// exclusiveOwnerThread是当前拥有“独占锁”的线程
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread t) {
    exclusiveOwnerThread = t;
}

说明:setExclusiveOwnerThread()的作用就是,设置线程t为当前拥有“独占锁”的线程。

 

6. getState(), setState()

getState()和setState()都在AQS中实现,源码如下:

// 锁的状态
private volatile int state;
// 设置锁的状态
protected final void setState(int newState) {
    state = newState;
}
// 获取锁的状态
protected final int getState() {
    return state;
}

说明:state表示锁的状态,对于“独占锁”而已,state=0表示锁是可获取状态(即,锁没有被任何线程锁持有)。由于java中的独占锁是可重入的,state的值可以>1。

 

小结:tryAcquire()的作用就是让“当前线程”尝试获取锁。获取成功返回true,失败则返回false。

 

二. addWaiter(Node.EXCLUSIVE)

addWaiter(Node.EXCLUSIVE)的作用是,创建“当前线程”的Node节点,且Node中记录“当前线程”对应的锁是“独占锁”类型,并且将该节点添加到CLH队列的末尾。

1.addWaiter()

addWaiter()在AQS中实现,源码如下:

private Node addWaiter(Node mode) {
    // 新建一个Node节点,节点对应的线程是“当前线程”,“当前线程”的锁的模型是mode。
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    // 若CLH队列不为空,则将“当前线程”添加到CLH队列末尾
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 若CLH队列为空,则调用enq()新建CLH队列,然后再将“当前线程”添加到CLH队列中。
    enq(node);
    return node;
}

说明:对于“公平锁”而言,addWaiter(Node.EXCLUSIVE)会首先创建一个Node节点,节点的类型是“独占锁”(Node.EXCLUSIVE)类型。然后,再将该节点添加到CLH队列的末尾。

 

2. compareAndSetTail()

compareAndSetTail()在AQS中实现,源码如下:

private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

说明:compareAndSetTail也属于CAS函数,也是通过“本地方法”实现的。compareAndSetTail(expect, update)会以原子的方式进行操作,它的作用是判断CLH队列的队尾是不是为expect,是的话,就将队尾设为update。

 

3. enq()

enq()在AQS中实现,源码如下:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

说明: enq()的作用很简单。如果CLH队列为空,则新建一个CLH表头;然后将node添加到CLH末尾。否则,直接将node添加到CLH末尾。

 

小结:addWaiter()的作用,就是将当前线程添加到CLH队列中。这就意味着将当前线程添加到等待获取“锁”的等待线程队列中了。

 

三. acquireQueued()

前面,我们已经将当前线程添加到CLH队列中了。而 acquireQueued()的作用就是逐步的去执行CLH队列的线程,如果当前线程获取到了锁,则返回;否则,当前线程进行休眠,直到唤醒并重新获取 锁了才返回。下面,我们看看acquireQueued()的具体流程。

 

1. acquireQueued()

acquireQueued()在AQS中实现,源码如下:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        // interrupted表示在CLH队列的调度中,
        // “当前线程”在休眠时,有没有被中断过。
        boolean interrupted = false;
        for (;;) {
            // 获取上一个节点。
            // node是“当前线程”对应的节点,这里就意味着“获取上一个等待锁的线程”。
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

说明:acquireQueued()的目的是从队列中获取锁。

 

2. shouldParkAfterFailedAcquire()

shouldParkAfterFailedAcquire()在AQS中实现,源码如下:

// 返回“当前线程是否应该阻塞”
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 前继节点的状态
    int ws = pred.waitStatus;
    // 如果前继节点是SIGNAL状态,则意味这当前线程需要被unpark唤醒。此时,返回true。
    if (ws == Node.SIGNAL)
        return true;
    // 如果前继节点是“取消”状态,则设置 “当前节点”的 “当前前继节点”  为  “‘原前继节点’的前继节点”。
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 如果前继节点为“0”或者“共享锁”状态,则设置前继节点为SIGNAL状态。
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

说明
(01) 关于waitStatus请参考下表(中扩号内为waitStatus的值),更多关于waitStatus的内容,可以参考前面的Node类的介绍。

CANCELLED[1]  -- 当前线程已被取消
SIGNAL[-1]    -- “当前线程的后继线程需要被unpark(唤醒)”。一般发生情况是:当前线程的后继线程处于阻塞状态,而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。
CONDITION[-2] -- 当前线程(处在Condition休眠状态)在等待Condition唤醒
PROPAGATE[-3] -- (共享锁)其它线程获取到“共享锁”
[0]           -- 当前线程不属于上面的任何一种状态。

(02) shouldParkAfterFailedAcquire()通过以下规则,判断“当前线程”是否需要被阻塞。

规则1:如果前继节点状态为SIGNAL,表明当前节点需要被unpark(唤醒),此时则返回true。
规则2:如果前继节点状态为CANCELLED(ws>0),说明前继节点已经被取消,则通过先前回溯找到一个有效(非CANCELLED状态)的节点,并返回false。
规则3:如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL,并返回false。

如果“规则1”发生,即“前继节点是SIGNAL”状态,则意味着“当前线程”需要被阻塞。接下来会调用parkAndCheckInterrupt()阻塞当前线程,直到当前先被唤醒才从parkAndCheckInterrupt()中返回。

 

3. parkAndCheckInterrupt())

parkAndCheckInterrupt()在AQS中实现,源码如下:

private final boolean parkAndCheckInterrupt() {
    // 通过LockSupport的park()阻塞“当前线程”。
    LockSupport.park(this);
    // 返回线程的中断状态。
    return Thread.interrupted();
}

说明:parkAndCheckInterrupt()的作用是阻塞当前线程,并且返回“线程被唤醒之后”的中断状态。
它会先通过LockSupport.park()阻塞“当前线程”,然后通过Thread.interrupted()返回线程的中断状态。

这里介绍一下线程被阻塞之后如何唤醒。一般有2种情况:
第1种情况:unpark()唤醒。“前继节点对应的线程”使用完锁之后,通过unpark()方式唤醒当前线程。
第2种情况:中断唤醒。其它线程通过interrupt()中断当前线程。

补充:LockSupport()中的park(),unpark()的作用 和 Object中的wait(),notify()作用类似,是阻塞/唤醒。
它们的用法不同,park(),unpark()是轻量级的,而wait(),notify()是必须先通过Synchronized获取同步锁。
关于LockSupport,我们会在之后的章节再专门进行介绍!

 

4. 再次tryAcquire()

了解了shouldParkAfterFailedAcquire()和parkAndCheckInterrupt()函数之后。我们接着分析acquireQueued()的for循环部分。

final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return interrupted;
}

说明
(01) 通过node.predecessor()获取前继节点。predecessor()就是返回node的前继节点,若对此有疑惑可以查看下面关于Node类的介绍。
(02) p == head && tryAcquire(arg)
首先,判断“前继节点”是不是CHL表头。如果是的话,则通过tryAcquire()尝试获取锁。
其实,这样做的目的是为了“让当前线程获取锁”,但是为什么需要先判断p==head呢?理解这个对理解“公平锁”的机制很重要,因为这么做的原因就是为了保证公平性!
(a) 前面,我们在shouldParkAfterFailedAcquire()我们判断“当前线程”是否需要阻塞;
(b) 接着,“当前线程”阻塞的话,会调用parkAndCheckInterrupt()来阻塞线程。当线程被解除阻塞的时候,我们会返回线程的中断状态。而 线程被解决阻塞,可能是由于“线程被中断”,也可能是由于“其它线程调用了该线程的unpark()函数”。
(c) 再回到p==head这里。如果当前线程是因为其它线程调用了unpark()函数而被唤醒,那么唤醒它的线程,应该是它的前继节点所对应的线程(关于这 一点,后面在“释放锁”的过程中会看到)。 OK,是前继节点调用unpark()唤醒了当前线程!
此时,再来理解p==head就很简单了:当前继节点是CLH队列的头节点,并且它释放锁之后;就轮到当前节点获取锁了。然后,当前节点通过 tryAcquire()获取锁;获取成功的话,通过setHead(node)设置当前节点为头节点,并返回。
总之,如果“前继节点调用unpark()唤醒了当前线程”并且“前继节点是CLH表头”,此时就是满足p==head,也就是符合公平性原则的。否 则,如果当前线程是因为“线程被中断”而唤醒,那么显然就不是公平了。这就是为什么说p==head就是保证公平性!
小结:acquireQueued()的作用就是“当前线程”会根据公平性原则进行阻塞等待,直到获取锁为止;并且返回当前线程在等待过程中有没有并中断过。

 

四. selfInterrupt()

selfInterrupt()是AQS中实现,源码如下:

private static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

说明:selfInterrupt()的代码很简单,就是“当前线程”自己产生一个中断。但是,为什么需要这么做呢?
这必须结合acquireQueued()进行分析。如果在acquireQueued()中,当前线程被中断过,则执行selfInterrupt();否则不会执行。

在acquireQueued()中,即使是线程在阻塞状态被中断唤醒而获取到 cpu执行权利;但是,如果该线程的前面还有其它等待锁的线程,根据公平性原则,该线程依然无法获取到锁。它会再次阻塞! 该线程再次阻塞,直到该线程被它的前面等待锁的线程锁唤醒;线程才会获取锁,然后“真正执行起来”!
也 就是说,在该线程“成功获取锁并真正执行起来”之前,它的中断会被忽略并且中断标记会被清除! 因为在parkAndCheckInterrupt()中,我们线程的中断状态时调用了Thread.interrupted()。该函数不同于 Thread的isInterrupted()函数,isInterrupted()仅仅返回中断状态,而interrupted()在返回当前中断状态 之后,还会清除中断状态。 正因为之前的中断状态被清除了,所以这里需要调用selfInterrupt()重新产生一个中断!

 

小结:selfInterrupt()的作用就是当前线程自己产生一个中断。

 

总结

再回过头看看acquire()函数,它最终的目的是获取锁!

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

(01) 先是通过tryAcquire()尝试获取锁。获取成功的话,直接返回;尝试失败的话,再通过acquireQueued()获取锁。
(02) 尝试失败的情况下,会先通过addWaiter()来将“当前线程”加入到"CLH队列"末尾;然后调用acquireQueued(),在CLH队列中 排序等待获取锁,在此过程中,线程处于休眠状态。直到获取锁了才返回。 如果在休眠等待过程中被中断过,则调用selfInterrupt()来自己产生一个中断。

 

释放过程如下:

1. unlock()

unlock()在ReentrantLock.java中实现的,源码如下:

public void unlock() {
    sync.release(1);
}

说明
unlock()是解锁函数,它是通过AQS的release()函数来实现的。
在这里,“1”的含义和“获取锁的函数acquire(1)的含义”一样,它是设置“释放锁的状态”的参数。由于“公平锁”是可重入的,所以对于同一个线程,每释放锁一次,锁的状态-1。

关于AQS, ReentrantLock 和 sync的关系如下:

public class ReentrantLock implements Lock, java.io.Serializable {

    private final Sync sync;

    abstract static class Sync extends AbstractQueuedSynchronizer {
        ...
    }

    ...
}

从中,我们发现:sync是ReentrantLock.java中的成员对象,而Sync是AQS的子类。

 

2. release()

release()在AQS中实现的,源码如下:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

说明
release()会先调用tryRelease()来尝试释放当前线程锁持有的锁。成功的话,则唤醒后继等待线程,并返回true。否则,直接返回false。

 

3. tryRelease()

tryRelease()在ReentrantLock.java的Sync类中实现,源码如下:

protected final boolean tryRelease(int releases) {
    // c是本次释放锁之后的状态
    int c = getState() - releases;
    // 如果“当前线程”不是“锁的持有者”,则抛出异常!
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();

    boolean free = false;
    // 如果“锁”已经被当前线程彻底释放,则设置“锁”的持有者为null,即锁是可获取状态。
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    // 设置当前线程的锁的状态。
    setState(c);
    return free;
}

说明
tryRelease()的作用是尝试释放锁。
(01) 如果“当前线程”不是“锁的持有者”,则抛出异常。
(02) 如果“当前线程”在本次释放锁操作之后,对锁的拥有状态是0(即,当前线程彻底释放该“锁”),则设置“锁”的持有者为null,即锁是可获取状态。同时,更新当前线程的锁的状态为0。
getState(), setState()在前一章已经介绍过,这里不再说明。
getExclusiveOwnerThread(), setExclusiveOwnerThread()在AQS的父类AbstractOwnableSynchronizer.java中定义,源码如下:

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {

    // “锁”的持有线程
    private transient Thread exclusiveOwnerThread;

    // 设置“锁的持有线程”为t
    protected final void setExclusiveOwnerThread(Thread t) {
        exclusiveOwnerThread = t;
    }

    // 获取“锁的持有线程”
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
   
    ...
}

 

4. unparkSuccessor()

在release()中“当前线程”释放锁成功的话,会唤醒当前线程的后继线程。
根据CLH队列的FIFO规则,“当前线程”(即已经获取锁的线程)肯定是head;如果CLH队列非空的话,则唤醒锁的下一个等待线程。
下面看看unparkSuccessor()的源码,它在AQS中实现。

private void unparkSuccessor(Node node) {
    // 获取当前线程的状态
    int ws = node.waitStatus;
    // 如果状态<0,则设置状态=0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    //获取当前节点的“有效的后继节点”,无效的话,则通过for循环进行获取。
    // 这里的有效,是指“后继节点对应的线程状态<=0”
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 唤醒“后继节点对应的线程”
    if (s != null)
        LockSupport.unpark(s.thread);
}

说明
unparkSuccessor()的作用是“唤醒当前线程的后继线程”。后继线程被唤醒之后,就可以获取该锁并恢复运行了。
关于node.waitStatus的说明,请参考“上一章关于Node类的介绍”。

 

总结

“释放锁”的过程相对“获取锁”的过程比较简单。释放锁时,主要进行的操作,是更新当前线程对应的锁的状态。如果当前线程对锁已经彻底释放,则设置“锁”的持有线程为null,设置当前线程的状态为空,然后唤醒后继线程。

设计用于并发计算的数据结构

原文:http://www.ibm.com/developerworks/cn/aix/library/au-multithreaded_structures1/index.html

大家都在谈论并行计算;这个主题非常热门。本文是讨论多线程结构的两篇系列文章的第一篇,介绍如何使用 POSIX 库在多线程环境中设计并发数据结构。

简介

现在,您的计算机有四个 CPU 核;并行计算 是最时髦的主题,您急于掌握这种技术。但是,并行编程不只是在随便什么函数和方法中使用互斥锁和条件变量。C++ 开发人员必须掌握的关键技能之一是设计并发数据结构。本文是两篇系列文章的第一篇,讨论如何在多线程环境中设计并发数据结构。对于本文,我们使用 POSIX Threads 库(也称为 Pthreads;见 参考资料 中的链接),但是也可以使用 Boost Threads 等实现(见 参考资料 中的链接)。

本文假设您基本了解基本数据结构,比较熟悉 POSIX Threads 库。您还应该基本了解线程的创建、互斥锁和条件变量。在本文的示例中,会相当频繁地使用 pthread_mutex_lockpthread_mutex_unlockpthread_cond_waitpthread_cond_signalpthread_cond_broadcast

设计并发队列

我们首先扩展最基本的数据结构之一:队列。我们的队列基于链表。底层列表的接口基于 Standard Template Library(STL;见 参考资料)。多个控制线程可以同时在队列中添加数据或删除数据,所以需要用互斥锁对象管理同步。队列类的构造函数和析构函数负责创建和销毁互斥锁,见 清单 1

清单 1. 基于链表和互斥锁的并发队列
#include <pthread.h>
#include <list.h> // you could use std::list or your implementation 

namespace concurrent { 
template <typename T>
class Queue { 
public: 
   Queue( ) { 
       pthread_mutex_init(&_lock, NULL); 
    } 
    ~Queue( ) { 
       pthread_mutex_destroy(&_lock);
    } 
    void push(const T& data);
    T pop( ); 
private: 
    list<T> _list; 
    pthread_mutex_t _lock;
}

};

在并发队列中插入和删除数据

显然,把数据放到队列中就像是把数据添加到列表中,必须使用互斥锁保护这个操作。但是,如果多个线程都试图把数据添加到队列中,会发 生什么?第一个线程锁住互斥并把数据添加到队列中,而其他线程等待轮到它们操作。第一个线程解锁/释放互斥锁之后,操作系统决定接下来让哪个线程在队列中 添加数据。通常,在没有实时优先级线程的 Linux® 系统中,接下来唤醒等待时间最长的线程,它获得锁并把数据添加到队列中。清单 2 给出代码的第一个有效版本。

清单 2. 在队列中添加数据
void Queue<T>::push(const T& value ) { 
       pthread_mutex_lock(&_lock);
       _list.push_back(value);
       pthread_mutex_unlock(&_lock);
}

取出数据的代码与此类似,见 清单 3

清单 3. 从队列中取出数据
T Queue<T>::pop( ) { 
       if (_list.empty( )) { 
           throw ”element not found”;
       }
       pthread_mutex_lock(&_lock); 
       T _temp = _list.front( );
       _list.pop_front( );
       pthread_mutex_unlock(&_lock);
       return _temp;
}

清单 2清单 3 中的代码是有效的。但是,请考虑这样的情况:您有一个很长的队列(可能包含超过 100,000 个元素),而且在代码执行期间的某个时候,从队列中读取数据的线程远远多于添加数据的线程。因为添加和取出数据操作使用相同的互斥锁,所以读取数据的速度 会影响写数据的线程访问锁。那么,使用两个锁怎么样?一个锁用于读取操作,另一个用于写操作。清单 4 给出修改后的 Queue 类。

清单 4. 对于读和写操作使用单独的互斥锁的并发队列
template <typename T>
class Queue { 
public: 
   Queue( ) { 
       pthread_mutex_init(&_rlock, NULL); 
       pthread_mutex_init(&_wlock, NULL);
    } 
    ~Queue( ) { 
       pthread_mutex_destroy(&_rlock);
       pthread_mutex_destroy(&_wlock);
    } 
    void push(const T& data);
    T pop( ); 
private: 
    list<T> _list; 
    pthread_mutex_t _rlock, _wlock;
}

清单 5 给出 push/pop 方法的定义。

清单 5. 使用单独互斥锁的并发队列 Push/Pop 操作
void Queue<T>::push(const T& value ) { 
       pthread_mutex_lock(&_wlock);
       _list.push_back(value);
       pthread_mutex_unlock(&_wlock);
}

T Queue<T>::pop( ) { 
       if (_list.empty( )) { 
           throw ”element not found”;
       }
       pthread_mutex_lock(&_rlock);
       T _temp = _list.front( );
       _list.pop_front( );
       pthread_mutex_unlock(&_rlock);
       return _temp;
}

设计并发阻塞队列

目前,如果读线程试图从没有数据的队列读取数据,仅仅会抛出异常并继续执行。但是,这种做法不总是我们想要的,读线程很可能希望等待(即阻塞自身),直到有数据可用时为止。这种队列称为阻塞的队列。如何让读线程在发现队列是空的之后等待?一种做法是定期轮询队列。但是,因为这种做法不保证队列中有数据可用,它可能会导致浪费大量 CPU 周期。推荐的方法是使用条件变量 — 即 pthread_cond_t 类型的变量。在深入讨论语义之前,先来看一下修改后的队列定义,见 清单 6

清单 6. 使用条件变量的并发阻塞队列
template <typename T>
class BlockingQueue { 
public: 
   BlockingQueue ( ) { 
       pthread_mutex_init(&_lock, NULL); 
       pthread_cond_init(&_cond, NULL);
    } 
    ~BlockingQueue ( ) { 
       pthread_mutex_destroy(&_lock);
       pthread_cond_destroy(&_cond);
    } 
    void push(const T& data);
    T pop( ); 
private: 
    list<T> _list; 
    pthread_mutex_t _lock;
    pthread_cond_t _cond;
}

清单 7 给出阻塞队列的 pop 操作定义。

清单 7. 从队列中取出数据
T BlockingQueue<T>::pop( ) { 
       pthread_mutex_lock(&_lock);
       if (_list.empty( )) { 
           pthread_cond_wait(&_cond, &_lock) ;
       }
       T _temp = _list.front( );
       _list.pop_front( );
       pthread_mutex_unlock(&_lock);
       return _temp;
}

当队列是空的时候,读线程现在并不抛出异常,而是在条件变量上阻塞自身。pthread_cond_wait 还隐式地释放 mutex_lock。现在,考虑这个场景:有两个读线程和一个空的队列。第一个读线程锁住互斥锁,发现队列是空的,然后在 _cond 上阻塞自身,这会隐式地释放互斥锁。第二个读线程经历同样的过程。因此,最后两个读线程都等待条件变量,互斥锁没有被锁住。

现在,看一下 push() 方法的定义,见 清单 8

清单 8. 在阻塞队列中添加数据
void BlockingQueue <T>::push(const T& value ) { 
       pthread_mutex_lock(&_lock);
       const bool was_empty = _list.empty( );
       _list.push_back(value);
       pthread_mutex_unlock(&_lock);
       if (was_empty) 
           pthread_cond_broadcast(&_cond);
}

如果列表原来是空的,就调用 pthread_cond_broadcast 以宣告列表中已经添加了数据。这么做会唤醒所有等待条件变量 _cond 的读线程;读线程现在隐式地争夺互斥锁。操作系统调度程序决定哪个线程获得对互斥锁的控制权 — 通常,等待时间最长的读线程先读取数据。

并发阻塞队列设计有两个要注意的方面:

  • 可以不使用 pthread_cond_broadcast,而是使用 pthread_cond_signal。但是,pthread_cond_signal 会释放至少一个等待条件变量的线程,这个线程不一定是等待时间最长的读线程。尽管使用 pthread_cond_signal 不会损害阻塞队列的功能,但是这可能会导致某些读线程的等待时间过长。
  • 可能会出现虚假的线程唤醒。因此,在唤醒读线程之后,要确认列表非空,然后再继续处理。清单 9 给出稍加修改的 pop() 方法,强烈建议使用基于 while 循环的 pop() 版本。
清单 9. 能够应付虚假唤醒的 pop() 方法
T BlockingQueue<T>::pop( ) { 
       pthread_cond_wait(&_cond, &_lock) ;
       while(_list.empty( )) { 
           pthread_cond_wait(&_cond) ;
       }
       T _temp = _list.front( );
       _list.pop_front( );
       pthread_mutex_unlock(&_lock);
       return _temp;
}

设计有超时限制的并发阻塞队列

在许多系统中,如果无法在特定的时间段内处理新数据,就根本不处理数据了。例如,新闻频道的自动收报机显示来自金融交易所的实时股票行情,它每 n 秒收到一次新数据。如果在 n 秒内无法处理以前的一些数据,就应该丢弃这些数据并显示最新的信息。根据这个概念,我们来看看如何给并发队列的添加和取出操作增加超时限制。这意味着,如果系统无法在指定的时间限制内执行添加和取出操作,就应该根本不执行操作。清单 10 给出接口。

清单 10. 添加和取出操作有时间限制的并发队列
template <typename T>
class TimedBlockingQueue { 
public: 
   TimedBlockingQueue ( );
    ~TimedBlockingQueue ( );
    bool push(const T& data, const int seconds);
    T pop(const int seconds); 
private: 
    list<T> _list; 
    pthread_mutex_t _lock;
    pthread_cond_t _cond;
}

首先看看有时间限制的 push() 方法。push() 方法不依赖于任何条件变量,所以没有额外的等待。造成延迟的惟一原因是写线程太多,要等待很长时间才能获得锁。那么,为什么不提高写线程的优先级?原因 是,如果所有写线程的优先级都提高了,这并不能解决问题。相反,应该考虑创建少数几个调度优先级高的写线程,把应该确保添加到队列中的数据交给这些线程。清单 11 给出代码。

清单 11. 把数据添加到阻塞队列中,具有超时限制
bool TimedBlockingQueue <T>::push(const T& data, const int seconds) {
       struct timespec ts1, ts2;
       const bool was_empty = _list.empty( );
       clock_gettime(CLOCK_REALTIME, &ts1);
       pthread_mutex_lock(&_lock);
       clock_gettime(CLOCK_REALTIME, &ts2);
       if ((ts2.tv_sec – ts1.tv_sec) <seconds) {
       was_empty = _list.empty( );
       _list.push_back(value);
       {
       pthread_mutex_unlock(&_lock);
       if (was_empty) 
           pthread_cond_broadcast(&_cond);
}

clock_gettime 例程返回一个 timespec 结构,它是系统纪元以来经过的时间(更多信息见 参考资料)。在获取互斥锁之前和之后各调用这个例程一次,从而根据经过的时间决定是否需要进一步处理。

具有超时限制的取出数据操作比添加数据复杂;注意,读线程会等待条件变量。第一个检查与 push() 相似。如果在读线程能够获得互斥锁之前发生了超时,那么不需要进行处理。接下来,读线程需要确保(这是第二个检查)它等待条件变量的时间不超过指定的超时时间。如果到超时时间段结束时还没有被唤醒,读线程需要唤醒自身并释放互斥锁。

有了这些背景知识,我们来看看 pthread_cond_timedwait 函数,这个函数用于进行第二个检查。这个函数与 pthread_cond_wait 相似,但是第三个参数是绝对时间值,到达这个时间时读线程自愿放弃等待。如果在超时之前读线程被唤醒,pthread_cond_timedwait 的返回值是 0清单 12 给出代码。

清单 12. 从阻塞队列中取出数据,具有超时限制
T TimedBlockingQueue <T>::pop(const int seconds) { 
       struct timespec ts1, ts2; 
       clock_gettime(CLOCK_REALTIME, &ts1); 
       pthread_mutex_lock(&_lock);
       clock_gettime(CLOCK_REALTIME, &ts2);

       // First Check 
       if ((ts1.tv_sec – ts2.tv_sec) < seconds) { 
           ts2.tv_sec += seconds; // specify wake up time
           while(_list.empty( ) && (result == 0)) { 
               result = pthread_cond_timedwait(&_cond, &_lock, &ts2) ;
           }
           if (result == 0) { // Second Check 
               T _temp = _list.front( );
              _list.pop_front( );
              pthread_mutex_unlock(&_lock);
              return _temp;
          }
      }
      pthread_mutex_unlock(&lock);
      throw “timeout happened”;
}

清单 12 中的 while 循环确保正确地处理虚假的唤醒。最后,在某些 Linux 系统上,clock_gettime 可能是 librt.so 的组成部分,可能需要在编译器命令行中添加 –lrt 开关。

使用 pthread_mutex_timedlock API

清单 11清单 12 的缺点之一是,当线程最终获得锁时,可能已经超时了。因此,它只能释放锁。如果系统支持的话,可以使用 pthread_mutex_timedlock API 进一步优化这个场景(见 参考资料)。这个例程有两个参数,第二个参数是绝对时间值。如果在到达这个时间时还无法获得锁,例程会返回且状态码非零。因此,使用这个例程可以减少系统中等待的线程数量。下面是这个例程的声明:

int pthread_mutex_timedlock(pthread_mutex_t *mutex,
       const struct timespec *abs_timeout);

设计有大小限制的并发阻塞队列

最后,讨论有大小限制的并发阻塞队列。这种队列与并发阻塞队列相似,但是对队列的大小有限制。在许多内存有限的嵌入式系统中,确实需要有大小限制的队列。

对于阻塞队列,只有读线程需要在队列中没有数据时等待。对于有大小限制的阻塞队列,如果队列满了,写线程也需要等待。这种队列的外部接口与阻塞队列相似,见 清单 13。(注意,这里使用向量而不是列表。如果愿意,可以使用基本的 C/C++ 数组并把它初始化为指定的大小。)

清单 13. 有大小限制的并发阻塞队列
template <typename T>
class BoundedBlockingQueue { 
public: 
   BoundedBlockingQueue (int size) : maxSize(size) { 
       pthread_mutex_init(&_lock, NULL); 
       pthread_cond_init(&_rcond, NULL);
       pthread_cond_init(&_wcond, NULL);
       _array.reserve(maxSize);
    } 
    ~BoundedBlockingQueue ( ) { 
       pthread_mutex_destroy(&_lock);
       pthread_cond_destroy(&_rcond);
       pthread_cond_destroy(&_wcond);
    } 
    void push(const T& data);
    T pop( ); 
private: 
    vector<T> _array; // or T* _array if you so prefer
    int maxSize;
    pthread_mutex_t _lock;
    pthread_cond_t _rcond, _wcond;
}

在解释添加数据操作之前,看一下 清单 14 中的代码。

清单 14. 在有大小限制的阻塞队列中添加数据
void BoundedBlockingQueue <T>::push(const T& value ) { 
       pthread_mutex_lock(&_lock);
       const bool was_empty = _array.empty( );
       while (_array.size( ) == maxSize) { 
           pthread_cond_wait(&_wcond, &_lock);
       } 
       _ array.push_back(value);
      pthread_mutex_unlock(&_lock);
      if (was_empty) 
          pthread_cond_broadcast(&_rcond);
}

锁是否可以扩展到其他数据结构?

当 然可以。但这是最好的做法吗?不是。考虑一个应该允许多个线程使用的链表。与队列不同,列表没有单一的插入或删除点,使用单一互斥锁控制对列表的访问会导 致系统功能正常但相当慢。另一种实现是对每个节点使用锁,但是这肯定会增加系统的内存占用量。本系列的第二部分会讨论这些问题。

对于 清单 13清单 14,要注意的第一点是,这个阻塞队列有两个条件变量而不是一个。如果队列满了,写线程等待 _wcond 条件变量;读线程在从队列中取出数据之后需要通知所有线程。同样,如果队列是空的,读线程等待 _rcond 变量,写线程在把数据插入队列中之后向所有线程发送广播消息。如果在发送广播通知时没有线程在等待 _wcond_rcond,会发生什么?什么也不会发生;系统会忽略这些消息。还要注意,两个条件变量使用相同的互斥锁。清单 15 给出有大小限制的阻塞队列的 pop() 方法。

清单 15. 从有大小限制的阻塞队列中取出数据
T BoundedBlockingQueue<T>::pop( ) { 
       pthread_mutex_lock(&_lock);
       const bool was_full = (_array.size( ) == maxSize);
       while(_array.empty( )) { 
           pthread_cond_wait(&_rcond, &_lock) ;
       }
       T _temp = _array.front( );
       _array.erase( _array.begin( ));
       pthread_mutex_unlock(&_lock);
       if (was_full)
           pthread_cond_broadcast(&_wcond);
       return _temp;
}

注意,在释放互斥锁之后调用 pthread_cond_broadcast。这是一种好做法,因为这会减少唤醒之后读线程的等待时间。

结束语

本文讨论了几种并发队列及其实现。实际上,还可能实现其他变体。例如这样一个队列,它只允许读线程在数据插入队列经过指定的延时之后才能读取数据。请通过 参考资料 进一步了解 POSIX 线程和并发队列算法。

参考资料

学习

  • 阅读出色的 pthreads 的基本用法 -- 介绍 POSIX 线程
  • 了解关于 POSIX Thread 库 的更多信息。
  • 阅读 在 POSIX 线程编程中避免内存泄漏(Wei Dong Xie,developerWorks,2010 年 8 月),进一步了解 Pthread 编程。
  • 了解关于 并发队列算法 的更多信息。
  • 寻找关于 clock time 例程 的更多信息。
  • 了解关于 利用互斥锁进行时间锁定 的更多信息。
  • AIX and UNIX 专区:developerWorks 的“AIX and UNIX 专区”提供了大量与 AIX 系统管理的所有方面相关的信息,您可以利用它们来扩展自己的 UNIX 技能。
  • AIX and UNIX 新手入门:访问“AIX and UNIX 新手入门”页面可了解更多关于 AIX 和 UNIX 的内容。
  • AIX and UNIX 专题汇总:AIX and UNIX 专区已经为您推出了很多的技术专题,为您总结了很多热门的知识点。我们在后面还会继续推出很多相关的热门专题给您,为了方便您的访问,我们在这里为您把本专区的所有专题进行汇总,让您更方便的找到您需要的内容。
  • AIX and UNIX 下载中心:在这里你可以下载到可以运行在 AIX 或者是 UNIX 系统上的 IBM 服务器软件以及工具,让您可以提前免费试用他们的强大功能。
  • IBM Systems Magazine for AIX 中文版: 本杂志的内容更加关注于趋势和企业级架构应用方面的内容,同时对于新兴的技术、产品、应用方式等也有很深入的探讨。IBM Systems Magazine 的内容都是由十分资深的业内人士撰写的,包括 IBM 的合作伙伴、IBM 的主机工程师以及高级管理人员。所以,从这些内容中,您可以了解到更高层次的应用理念,让您在选择和应用 IBM 系统时有一个更好的认识。
  • 技术书店:浏览关于这些和其他技术主题的图书。

获得产品和技术

讨论

nginx的HeadersMoreNginxModule模块功能

模块地址:

https://github.com/openresty/headers-more-nginx-module

ngx_headers_more - Set and clear input and output headers...more than "add"!
设置和清除 http的输入和输出头

This module is not distributed with the Nginx source. See the installation instructions.
本模块不是标准模块, 安装需要参照最后的安装指令

Version

This document describes headers-more-nginx-module v0.25 released on 10 January 2014.
Synopsis

# set the Server output header  设置server输出header
more_set_headers 'Server: my-server';

# set and clear output headers       设置和清除输出header
location /bar {
more_set_headers 'X-MyHeader: blah' 'X-MyHeader2: foo';
more_set_headers -t 'text/plain text/css' 'Content-Type: text/foo';
more_set_headers -s '400 404 500 503' -s 413 'Foo: Bar';
more_clear_headers 'Content-Type';          清除header

# your proxy_pass/memcached_pass/or any other config goes here...
}

# set output headers
location /type {
more_set_headers 'Content-Type: text/plain';
# ...
}

# set input headers    设置输入 header, 由nginx传递给 应用服务器,例如tomcat
location /foo {
set $my_host 'my dog';
more_set_input_headers 'Host: $my_host';
more_set_input_headers -t 'text/plain' 'X-Foo: bah';

# now $host and $http_host have their new values...
# ...
}

# replace input header X-Foo *only* if it already exists
more_set_input_headers -r 'X-Foo: howdy';

Description

This module allows you to add, set, or clear any output or input header that you specify.
这个模块允许您 添加,设置,清除 任何输出(发给客户端浏览器的)和输入(有浏览器发过来的)流中的http头。

This is an enhanced version of the standard headers module because it provides more utilities like resetting or clearing "builtin

headers" like Content-Type, Content-Length, and Server.
这个是个增强版本的标准 头修改工具, 他提供了更多的功能比通常的内建的工具(理解的)

It also allows you to specify an optional HTTP status code criteria using the -s option and an optional content type criteria

using the -t option while modifying the output headers with the more_set_headers and more_clear_headers directives. For example,
模块也允许你设置一个可选的http状态码 通过使用-s选项, 和-t选项改变输出头  下面这个功能的例子:

more_set_headers -s 404 -t 'text/html' 'X-Foo: Bar';

Input headers can be modified as well. For example    输入的http头的例子

location /foo {
more_set_input_headers 'Host: foo' 'User-Agent: faked';
# now $host, $http_host, $user_agent, and
#   $http_user_agent all have their new values.
}

The option -t is also available in the more_set_input_headers and more_clear_input_headers directives (for request header

filtering) while the -s option is not allowed.

-t选项也可以在more_set_input_headers 和 more_clear_input_headers指令中使用

Unlike the standard headers module, this module's directives will by default apply to all the status codes, including 4xx and 5xx.
??

more_set_headers

syntax: more_set_headers [-t <content-type list>]... [-s <status-code list>]... <new-header>...

default: no

context: http, server, location, location if

phase: output-header-filter

Replaces (if any) or adds (if not any) the specified output headers when the response status code matches the codes specified by

the -s option AND the response content type matches the types specified by the -t option.

If either -s or -t is not specified or has an empty list value, then no match is required. Therefore, the following directive set

the Server output header to the custom value for any status code and any content type:

more_set_headers    "Server: my_server";

Existing response headers with the same name are always overridden. If you want to add headers incrementally, use the standard

add_header directive instead.

A single directive can set/add multiple output headers. For example

more_set_headers 'Foo: bar' 'Baz: bah';

Multiple occurrences of the options are allowed in a single directive. Their values will be merged together. For instance

more_set_headers -s 404 -s '500 503' 'Foo: bar';

is equivalent to

more_set_headers -s '404 500 503' 'Foo: bar';

The new header should be the one of the forms:

Name: Value
Name:
Name

The last two effectively clear the value of the header Name.

Nginx variables are allowed in header values. For example:

set $my_var "dog";
more_set_headers "Server: $my_var";

But variables won't work in header keys due to performance considerations.

Multiple set/clear header directives are allowed in a single location, and they're executed sequentially.

Directives inherited from an upper level scope (say, http block or server blocks) are executed before the directives in the

location block.

Note that although more_set_headers is allowed in location if blocks, it is not allowed in the server if blocks, as in

?  # This is NOT allowed!
?  server {
?      if ($args ~ 'download') {
?          more_set_headers 'Foo: Bar';
?      }
?      ...
?  }

Behind the scene, use of this directive and its friend more_clear_headers will (lazily) register an ouput header filter that

modifies r->headers_out the way you specify.

Back to TOC
more_clear_headers

syntax: more_clear_headers [-t <content-type list>]... [-s <status-code list>]... <new-header>...

default: no

context: http, server, location, location if

phase: output-header-filter

Clears the specified output headers.

In fact,

more_clear_headers -s 404 -t 'text/plain' Foo Baz;

is exactly equivalent to

more_set_headers -s 404 -t 'text/plain' "Foo: " "Baz: ";

or

more_set_headers -s 404 -t 'text/plain' Foo Baz

See more_set_headers for more details.

Wildcard * can also be used to specify a header name pattern. For example, the following directive effectively clears any output

headers starting by "X-Hidden-":

more_clear_headers 'X-Hidden-*';

The * wildcard support was first introduced in v0.09.

Back to TOC
more_set_input_headers

syntax: more_set_input_headers [-r] [-t <content-type list>]... <new-header>...

default: no

context: http, server, location, location if

phase: rewrite tail

Very much like more_set_headers except that it operates on input headers (or request headers) and it only supports the -t option.

Note that using the -t option in this directive means filtering by the Content-Type request header, rather than the response

header.

Behind the scene, use of this directive and its friend more_clear_input_headers will (lazily) register a rewrite phase handler

that modifies r->headers_in the way you specify. Note that it always run at the end of the rewrite so that it runs after the

standard rewrite module and works in subrequests as well.

If the -r option is specified, then the headers will be replaced to the new values only if they already exist.

Back to TOC
more_clear_input_headers

syntax: more_clear_input_headers [-t <content-type list>]... <new-header>...

default: no

context: http, server, location, location if

phase: rewrite tail

Clears the specified input headers.

In fact,

more_clear_input_headers -s 404 -t 'text/plain' Foo Baz;

is exactly equivalent to

more_set_input_headers -s 404 -t 'text/plain' "Foo: " "Baz: ";

or

more_set_input_headers -s 404 -t 'text/plain' Foo Baz

See more_set_input_headers for more details.

Back to TOC
Limitations

Unlike the standard headers module, this module does not automatically take care of the constraint among the Expires, Cache-

Control, and Last-Modified headers. You have to get them right yourself or use the headers module together with this module.
You cannot remove the Connection response header using this module because the Connection response header is generated by the

standard ngx_http_header_filter_module in the Nginx core, whose output header filter runs always after the filter of this module.

The only way to actually remove the Connection header is to patch the Nginx core, that is, editing the C function

ngx_http_header_filter in the src/http/ngx_http_header_filter_module.c file.

Back to TOC
Installation

Grab the nginx source code from nginx.org, for example, the version 1.7.7 (see nginx compatibility), and then build the source

with this module:

wget 'http://nginx.org/download/nginx-1.7.7.tar.gz'
tar -xzvf nginx-1.7.7.tar.gz
cd nginx-1.7.7/

# Here we assume you would install you nginx under /opt/nginx/.
./configure --prefix=/opt/nginx \
--add-module=/path/to/headers-more-nginx-module

make
make install

Download the latest version of the release tarball of this module from headers-more-nginx-module file list.

Also, this module is included and enabled by default in the ngx_openresty bundle.

Back to TOC
Compatibility

ppt学习资料-别告诉我你懂PPT

ppt是一种演示文档, 用户可以在投影仪或者计算机上进行演示,也可以将演示文稿打印出来,制作成胶片,以便应用到更广泛的领域中。利用Microsoft Office PowerPoint不仅可以创建演示文稿,还可以在互联网上召开面对面会议、远程会议或在网上给观众展示演示文稿。其格式后缀名为:ppt、pptx;或者也可以保存为:pdf、图片格式等。2010及以上版本中可保存为视频格式。演示文稿中的每一页就叫幻灯片,每张幻灯片都是演示文稿中既相互独立又相互联系的内容, 详细内容参见:
http://baike.baidu.com/link?url=2IbxEcHb6BXWKv4szR8QXUMu4qxe8uyoaNHe4-HfqdEkvuk1mq9q3Sn6-qykw2NH2yEam5EY_8WKwVskm8yMdX0TN9ACRAsNLjiJ5vJ8zoe

工作中, 经常接触PPT,可要自己研究并做出卓越的PPT还是有难度的,这就需要一份有效的技能学习指导。

《别告诉我你懂PPT》 是一个讲解如何制作ppt的一个好的书籍, 书籍配套作者还有个视频介绍,分享如下:http://pan.baidu.com/s/1jGrarP0   ak3w
因为PPT,英语长句子还说不利索,满分15分的论文答辩,她居然取得16分;因为PPT,在强手如云的全美研究生演讲比赛中,她照样摘取了榜眼;还是因为PPT,百事全球高级副总裁亲自面试她,并欣然高薪录用;与PPT结缘,她的人生变得精彩无限……
跟着著名外企才女,找到一个雷人的标题,创新一套与众不同的模板,使用5种武器,打造一鸣惊人的PPT。奇思妙想的真实案例,发人深省的职场感悟,一份精彩的PPT报告,助你提升工作效率,受到老板青睐,在职场上一步一步走向成功!
不同于其他PPT书籍,本书没有讲述加入动画、艺术字体等具体的PPT制作技巧,而是教你一种态度,一种让你爱上自己工作的热情,一种提高团队战斗力的工作方式。它是基于PPT之上的创意工作方法,不仅让你的PPT更加漂亮,还让你养成一种正确的职业态度和做人艺术,成为真正的“创新型人才”。
但愿这本书能够帮你开启快乐职场的大门,帮你work smart,工作起来不仅效率高,还不觉得累。打开这本书,找到属于你的大山,营造你的快乐气场,静静地听,那来自大山的回音……

pdf参考书籍地址:
http://vdisk.weibo.com/s/dxQ3BZm3wMmJe?sudaref=www.baidu.com