OpenMP生产者消费者问题(未完结)

OpenMP生产者消费者问题

本节将讨论一个不适合用parallel for指令或者for指令来并行化的问题。

1.队列

队列是一种抽象的数据结构,插入元素时将元素插入到队列“尾部”,而读取元素时,队列“头部”的元素被返回并从队列中被移除。队列可以看做是在超市中等待付款的消费者的抽象,队列中的元素是消费者。新的消费者到达时排在等待队列的尾部,下一个付款离开等待队列的是排在队列头部的消费者。

当一个新的元素插入到队列的尾部时,通常称这个新的元素“入队”了;当一个元素从队列的头部被移除时,通常称这个元素“出队”了。

队列在计算机科学中随处可见。例如,如果有多个进程,每个进程都试图向硬盘写入数据,为了确保每次只有一个进程在写硬盘,一种自然而然的方法是将进程组织为队列。换句话说,排在队列第一个的进程在当前进程结束对硬盘的使用后,第一个获得硬盘的访问权限;排在队列第二个的进程在排在队列第一个的进程使用完硬盘后获得硬盘的访问权限,依此类推。

队列也是在多线程应用程序中经常使用到的数据结构。例如,我们有几个“生产者”线程和几个“消费者”线程。生产者线程“产生”对服务器数据的请求———例如当前股票的价格,而消费者线程通过发现和生成数据(例如,当前股票的价格)来“消费”请求。生产者线程将请求入队,而消费者线程将请求从队列中移除。在这个例子中,只有当消费者线程将请求的数据发送给生产者线程时,进程才会结束。

2.消息传递

生产者和消费者问题模型的另外一个应用是在共享内存系统上实现消息传递。每一个线程有一个消息共享队列,当一个线程要向另一个线程“发送消息“时,他将消息放入目标线程的消息队列中。一个线程接受消息时只需从它的消息队列的头部取出消息。

这里我们将实现一个简单的消息传递程序,在这个程序中,每个线程随机产生整数”消息“和消息的日志目标线程。当创建一条消息后,线程将消息加入到合适的消息队列中。当发送消息之后,该线程查看它自己的消息队列以获知它是否收到了消息,如果它收到了消息,它将从队首的消息出队并打印该消息。每个线程交替发送和接受消息,用户需要指定每个线程发送消息的数目。当一个线程发送完所有消息后,该线程不断接受消息直到所有的线程都已完成,此时所有的线程都结束了。每个线程的伪代码如下。

1
2
3
4
5
6
7
for(send_msgs = 0; sent_msgs < send_max ;sent_msgs++){
Send_msg();
Try_receive();
}

while(!Done())
Try_receive();

3.发送消息

需要注意的是,访问消息队列并将消息入队,可能是一个临界区。尽管我们还没有深入地研究如何实现消息队列,但我们很有可能需要用一个变量来跟踪队列的尾部。例如,使用一个单链表来实现消息队列,链表的尾部对应着队列的尾部。然后,为了有效地进行入队操作,需要存储指向链表尾部的指针,当一条新消息入队时,需要检查和更新这个队尾指针。如果两个线程试图同时进行这些操作,那么可能会丢失一条已经由其中一个线程入队的消息.(画张图能够有助于理解这种情况!)两个操作的结果会发生冲突,因此入队操作形成了临界区。

Send_msg()函数的伪代码如下:

1
2
3
4
mesg = random();
dest = random() % thread_count;
#pragma omp critical
Enqueue(queue,dest,my_rank.mesg);

注意在上面的实现中,允许线程向它自己发送消息。

4.接受消息

接受消息的同步问题与发送消息有些不同。只有消息队列的拥有者(即目标线程)可以从给定的消息队列中获取消息。如果消息队列中至少有两条消息,那么只要每次只出队一条消息,那么出队操作和入队操作就不可能冲突。因此如果队列中至少有两条消息,通过跟踪队列的大小就可以避免任何同步(例如critical指令)

现在的问题是如何存储队列大小。如果只使用一个变量来存储队列的大小,那么对该变量的操作会形成临界区。然而可以使用两个变量:enqueued和dequeued,那么队列中消息的个数(队列的大小)就为

1
queue_size = enqueued - dequeued

并且,唯一能够更新dequeued的线程是消息队列的拥有者。可以看到在一个线程使用enqueued计算队列大小queue_size的同时,另外一个线程可以更新enqueued。为了解释这种情况,假如进程q正在计算queue_size,那么它将可能得到enqueued新的或者旧的值。当queue_size实际值是1或者2时,线程q可能会得到queue_size是0或者1。但这只会引起程序一定的延迟,而不会引起程序错误。如果queue_size本应该是1,却误计算为0,那么线程q延迟一段时间后会试图重新计算队列的大小;如果queue_size本应该是2,却误计算为1,那么线程q将执行临界区指令,虽然这本来是不必要的。

因此,可以按照如下的方式实现Try_receive:

1
2
3
4
5
6
7
8
queue_size = enqueued - dequeued;
if(queue_size == 0) return;
else if(queue_size == 1)
#pragma omp critical
Dequeue(queue,&src,&mesg);
else
Dequeue(queue,&src,&mesg);
Print_message(src,mesg);

5.终止检测

接下来,我们探讨如何实现Done函数。首先,我们给出一个”直接“的实现,但这个实现隐藏着问题:

1
2
3
4
5
queue_size = enqueued - dequeued;
if(queue_size == 0)
return True;
else
return False;

如果线程u执行这段代码,那么很有可能有些线程,如线程v,在线程u计算出queue_size = 0后向线程u发送一条消息。当然,线程u在得出queue_size = 0后将终止,那么线程v发送给它的消息就永远不会被接受到。

然而,在我们程序中,每个线程在执行完for循环后将不再发送任何消息。因此可以增加一个计数器done_sending,每个线程在for循环结束后将该计数器加1,Done的实现如下:

1
2
3
4
5
queue_size = enqueued - dequeued;
if(queue_size == 0 && done_sending == thread_count)
return TRUE;
else
rerun FALSE;

6.启动

当程序开始执行时,主线程将得到命令行参数并且分配一个数组空间给消息队列,每个线程对应着一个消息队列。由于每个线程可以向其他任意的下次线程发送消息,所以这个数组应该被所有线程共享,而且每个线程可以向任何一个消息队列插入一条消息。消息队列(至少)可以存储:

  1. 消息列表
  2. 队尾指针或索引
  3. 队首指针或索引
  4. 入队消息的数目
  5. 出队消息的数目

最好将队列存在消息队列的结构体中,为了减少参数传递时复制的开销,最好用指向结构体的指针数组来实现消息队列。因此,一旦主线程分配了队列数组,就可以使用parallel指令开始执行线程,每个线程可以为自己的队列分配存储空间。

这里一个重要的问题是:一个或者多个线程可能在其他线程之前完成它的队列分配。如果这种情况出现了,那么完成分配的线程可能会试图开始向那些还没有完成队列分配的线程发送消息,这将导致程序崩溃。因此,我们必须确保任何一个线程都必须在所有的线程都完成了队列分配后才开始发送消息。回想一下,之前我们见过一些OpenMP指令在结束时提供隐式路障,即任何一个线程都必须等到组中所有的线程完成了某个程序块后才可以接着执行后续代码。然而,在这个例子中,我们处于parallel块的中间,所以我们不能依赖于OpenMP提供的隐式路障——我们应当使用显式路障。幸运的是,OpenMP提供了相应的指令:

1
#pragma omp barrier

当线程遇到路障时,它将被阻塞,直到组中所有的线程都到达了这个路障。当组中所有的线程都到达了这个路障时,这些线程就可以接着往下执行。

7.atomic指令

发送完所有的消息后,每个线程在执行最后的循环以便接受消息之前,需要对done_sending加1.显然,对done_sending的增量操作是临界区,可以通过critical指令来保护它。然后,OpenMP提供了另外一种可能更加高效的指令:atomic指令:

1
#pragma omp atomic

与critical指令不同,它只能保护由一条C语言赋值语句所形成的临界区。此外,语句必须是一下几种形式之一:

1
2
3
4
5
x <op> = <expression>
x++;
++x;
x--;
--x;

可以是以下任意的二元操作符:

1
+,*,-,/,&,|,^,<<,or >>

这里要记住,不能引用x。

需要注意的是,只有x的装载和存储可以确保是受保护的,例如在下面的代码中:

1
2
#pragma omp atomic
x += y++;

其他线程对x的更新必须等到该线程对x的更新结束之后。但是对y的更新不受保护,因此程序的结果是不可预测的。

atomic指令的思想是许多处理器提供专门的装载-修改-存储(load-modify-store)指令。使用这种专门的指令而不使用保护临界区的通用结构,可以更高效地保护临界区。

8.临界区和锁

为了完成对消息传递程序的讨论,我们需要进一步仔细研究OpenMP critical指令的规范。在更早的例子中,程序最多只有一个临界区,critical指令强制所有的线程对该区域进行互斥访问。在这个程序中,临界区的使用将更加复杂。我们将在源代码中看到3个在critical或atomic指令后面的代码块:

  1. done_sending++
  2. Enqueue(q_p,my_rank,mesg);
  3. Dequeue(q_p,&src,&mesg);

然而,我们不需要强制对3个代码块都进行互斥访问,甚至不需要强制对第二个和第三个代码块进行完全的互斥访问。例如,线程0在向线程1的消息队列写消息的同时,线程1可以向线程2的消息队列写消息。但是OpenMP的规定第二个和第三个代码块是被critical指令保护的代码块。在OpenMP看来,我们的程序有两个不同的临界区;被atomic指令保护的done_sending++和“复合”临界区。在“复合”临界区中,程序读取和发送消息。

强制线程间的互斥会使程序的执行串行化。OpenMP默认的做法是将所有的临界区代码块作为复合临界区的一部分,这可能非常不利于程序的性能。OpenMP提供了向critical指令添加名字的选项:

1
#pragma omp critical(name)

采取这种方式,两个用不同名字的critical指令保护的代码块就可以同时执行。我们想为每一个线程的消息队列的临界区提供不同的名字,但是临界区的名字是在程序编译过程中设置的。因此,我们需要在程序执行的过程中设置临界区的名字。但是按照为我们的设置,当我们想让访问不同队列的线程可以同时访问相同的代码块时,被命名的critical指令就不能满足我们的要求了。

解决方案是使用锁(lock)。锁由一个数据结构和定义在这个数据结构上的函数组成,这些函数使得程序员可以显式地强制对临界区进行互斥访问。锁的使用可以大概用下面的伪代码描述:

1
2
3
4
5
6
7
8
9
10
/*Executed by one thread*/
initialize the lock data structure;
...
/*Executed by multiple threads*/
Attempt to lock or set the lock data structure;
Critical section;
Unlock or unset the lock data structure;
...
/*Executed by one thread*/
Destory the lock data structure;

锁的数据结构被执行临界区的线程所共享,这些线程中的某个线程(如主线程)会初始化锁。而当所有的线程都使用完锁后,某个线程应当负责销毁锁。

在一个线程进入临界区前,它尝试通过调用锁函数来上锁(set)。如果没有其他的线程正在执行临界区代码,那么它将获得锁并进入临界区。当该线程执行完临界区代码后,它调用解锁函数释放(relinquish或者unset)锁,以便其他线程可以获得锁。

当一个线程拥有锁时,其他线程都不能进入该临界区。其他线程尝试通过调用锁函数进入该临界区时会阻塞。如果有多个线程被锁函数阻塞,则当临界区的线程释放锁时,这些线程中的某个线程会获得锁,而其他线程仍被阻塞。

OpenMP有两种锁:简单(simple)锁和嵌套(nested)锁。简单锁在被释放前只能获得一次,而一个嵌套锁在被释放前可以被同一个线程获得多次。OpenMP简单锁的类型是omp_lock_t,定义简单锁的函数包括:

image-20230116165204803

相关的类型和函数在头文件omp.h中声明。第一个函数的作为是初始化锁,所以此时锁处于解锁状态,换句话说,此时没有线程拥有这个锁。第二个函数尝试获得锁,如果成功,调用该函数的线程可以继续执行;如果失败,调用该函数的线程将被阻塞,直到锁被其他线程释放。第三个函数释放锁,以便其他线程可以获得该锁。第四个函数销毁锁。


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!