消息队列的原理与实现

  categories:资料  tags:  author:

来源:http://www.springload.cn/springload/detail/312

简介

消息驱动机制是 GUI 系统的基础,消息驱动的底层基础设施之一是消息队列,它是整个 GUI 系统运转中枢,本文介绍了一个基于环形队列的消息队列实现方法,给出了它的数据结构、主要操作流程和核心代码。

环形队列

环行队列是一种首尾相连的队列数据结构,遵循先进先出原则,如下图所示:

ring buffer 示意图

在环形队列中用一组连续地址的存储单元依次存放从队列头到队列尾的元素,通过两个指针 read_pos 和 write_pos 分别指向读取位置和写入位置。

初始化队列时,令 read_pos = write_pos = 0,每当写入一个新元素时,write_pos 增 1;每当读取一个元素时,read_pos 增 1 。若队列已满,不能往队列写入数据;若队列为空,则不能读取数据。判断对列是否为满的的方法是看(write_pos + 1)% QUEUE_SIZE == read_pos 是否成立,判断队列是否为空的方法是看 write_pos == read_pos 是否成立。

鉴于多个线程同时访问环形队列,需要考虑线程之间的互斥和同步问题,拟采用锁控制多个线程互斥访问环形队列,使用信号量控制线程之间的同步。

一段时间内只能有一个线程获得锁,当它持有锁时,其它线程要访问环形队列必须等待,直到前者释放锁。由此,锁可以保证多个线程互斥的访问环形队列。

线程从队列对数据前首先判断信号量是否大于 1 ,若是,则从队列读数据;否则,进入等待状态,直到信号量大于 1 为止;线程往队列写入一个数据后,会将信号量增 1 ,若有线程在等待,则会被唤醒。由此,信号量实现了多线程同步访问环形队列。

流程图

下图是环形缓冲区的初始化、读数据、写数据的主要流程。

ring buffer 流程图

  • 初始化时为环形队列分配内存空间,并完成锁和信号量的初始化;
  • 若往环形队列写数据,首先要获得锁, 若锁已被占用,则进入等待状态,否则进一步去判断环形队列是否已满。若满了,则释放锁并返回;若队列未满,将数据写入 write_pos 位置write_pos 增 1,释放锁并将信号量增 1,表示已写入一个数据;
  • 若从环形队列读数据,首先判断信号量是否大于 1 ,若不是,则等待,否则去获取锁,若锁已被占用,则等待,否则从 read_pos 位置读取数据,将read_pos 增 1 ,释放锁,读取完毕。

数据结构

环形队列的数据结构如下所示:

01 typedef _MSG {
02 int message;
03 void* param;
04 } MSG;
05
06 typedef _MSGQUE {
07 pthread_mutex_t lock;
08 sem_t  wait;
09
10 MSG* msg;
11 int size;
12
13 int read_ops;
14 int write_ops;
15 } MSGQUEUE;

环形队列包括如下数据:

  • lock:互斥锁;
  • wait:信号量
  • msg:指向数据区的指针;
  • size:环形队列数据最大个数;
  • read_ops:读取位置;
  • write_ops:写入位置。

队列初始化

初始化主要完成三个任务:

  • 为环形队列分配内存;
  • 初始化互斥锁,用 pthread_mutex_init 完成;
  • 初始化信号量,用 sem_init 完成。
01 /* Create message queue */
02 _msg_queue = malloc (sizeof (MSGQUEUE));
03
04 /* init lock and sem */
05 pthread_mutex_init (&_msg_queue->lock, NULL);
06 sem_init (&_msg_queue->wait, 0, 0);
07
08 /* allocate message memory */
09 _msg_queue -> msg = malloc (sizeof(MSG) * nr_msg);
10 _msg_queue -> size = nr_msg;

写操作

如上面的流程图介绍,写操作主要包括如下几步:

  • 获取锁;
  • 判断队列是否已满;
  • 若没满,将数据写入 write_pos 处,将 write_pos 增 1,并判断 write_pos 是否越界;
  • 释放锁,并将信号量增 1。
01 /* lock the message queue */
02 pthread_mutex_lock (_msg_queue->lock);
03
04 /* check if the queue is full. */
05 if ((_msg_queue->write_pos + 1)% _msg_queue->size == _msg_queue->read_pos) {
06 /* Message queue is full. */
07 pthread_mutex_unlock (_msg_queue->lock);
08 return;
09 }
10
11 /* write a data to write_pos. */
12 _msg_queue -> msg [write_pos] = *msg;
13 write_pos ++;
14
15 /* check if write_pos if overflow. */
16 if (_msg_queue->write_pos >= _msg_queue->size)
17 _msg_queue->write_pos = 0;
18
19 /* release lock */
20 pthread_mutex_unlock (_msg_queue->lock);
21
22 sem_post (_msg_queue->wait);

读操作

同理,读操作分如下几个步骤:

  • 检查信号量;
  • 获取锁;
  • 判断队列是否为空;
  • 若不为空,则读取 read_ops 处的数据,将 read_ops 增 1,并判断 read_pos 是否越界;
  • 并释放锁。
01 sem_wait (_msg_queue->wait);
02
03 /* lock the message queue */
04 pthread_mutex_lock (_msg_queue->lock);
05
06 /* check if queue is empty */
07 if (_msg_queue->read_pos != _msg_queue->write_pos) {
08 msg = _msg_queue->msg + _msg_queue->read_pos;
09
10 /* read a data and check if read_pos is overflow */
11 _msg_queue->read_pos ++;
12 if (_msg_queue->read_pos >= _msg_queue->size)
13 _msg_queue->read_pos = 0;
14
15 return;
16 }
17
18 /* release lock*/
19 pthread_mutex_unlock (_msg_queue->lock);

问题

  • 本文采用的环形队列是固定长度的,还可进一步改进,设计成可变长度的环形队列;
  • 本文的消息队列是“先进先出”原则,没有考虑带优先级的消息,但这种场合是存在的;
  • 本文重点介绍了消息队列的原理和实现,对于一个 GUI 程序来讲,还需要一个消息循环与消息队列一起工作,消息循环将单独总结。


快乐成长 每天进步一点点      京ICP备18032580号-1