调度方式
- Consider a simple loop that calls a function
dummy
containing a programmable delay (sleep). All invocations of the function are independent of the others. Partition this loop across four threads usingstatic
,dynamic
, andguided
scheduling. Use different parameters for static and guided scheduling. Document the result of this experiment as the delay within thedummy
function becomes large.
解决方法
理论分析
OpenMP中,任务调度主要用于并行的for循环中,当循环中每次迭代的计算量不相等时,如果简单地给各个线程分配相同次数的迭代的话,会造成各个线程计算负载不均衡,这会使得有些线程先执行完,有些后执行完,造成某些CPU核空闲,影响程序性能。
schedule子句
schedule(type[, chunk])
参数type
是指调度的类型,可以取值为static
,dynamic
,guided
。
参数chunk
表示每次调度的迭代数量,必须是整数。该参数是可选的。
静态调度static
在编译的时候就已经确定了,哪些循环由哪些线程执行。当不使用 chunk 时,将给每个线程分配$\lceil\frac{N}{t}\rceil$个迭代。当使用 chunk 时,将每次给线程分配 chunk 次迭代。
动态调度dynamic
动态调度依赖于运行时的状态动态确定线程所执行的迭代,也就是线程执行完已经分配的任务后,会去领取还有的任务。由于线程启动和执行完的时间不确定,所以迭代被分配到哪个线程是无法事先知道的。
启发式调度guided
动态调度依赖于运行时的状态动态确定线程所执行的迭代,也就是线程执行完已经分配的任务后,会去领取还有的任务。
初始大小是 number_of_iterations / number_of_threads,每一次迭代会是 number_of_iterations_remaining / number_of_threads。
实现方法
代码框架:
#pragma omp parallel for schedule(schedule_type, chunk) num_threads(4)
for i : 1 to times do
dummy(delay_time)
end
为了观察到清晰的实验结果,chunk
我选择从 1 迭代到 5。
迭代次数times
选择了 10。
delay_time
一方面为了更好的观测调度方式的不同,另一方面不希望程序跑太久,选择了$i\times100$。
实验结果
可以看到,在 chunk 的 size 为 1 的时候,static
和dynamic
要略优于guided
;而当 chunk 的 size 为 2 的时候,guided
方式的效率要远高于static
和dynamic
。当 chunk 变大的时候,三种调度方式几乎没有差异。
当有大量任务需要循环时,刚开始为线程分配大量任务,最后任务不多时,给每个线程少量任务,可以达到线程任务均衡。不难理解,在 chunk 为 2 的时候,他是最优的。
当 chunk 变大的时候,由于块的大小比较大,使得三种分配方式几乎没有差异。因此,采取比较小的 chunk 能提供最好的性能。
选取了 chunk 为 2 的时候,线程执行的情况:
static
i = 0 calls from thread 0
i = 1 calls from thread 0
i = 2 calls from thread 1
i = 4 calls from thread 2
i = 3 calls from thread 1
i = 6 calls from thread 3
i = 8 calls from thread 0
i = 5 calls from thread 2
i = 7 calls from thread 3
i = 9 calls from thread 0
dynamic
i = 0 calls from thread 1
i = 1 calls from thread 1
i = 2 calls from thread 3
i = 4 calls from thread 2
i = 3 calls from thread 3
i = 6 calls from thread 0
i = 5 calls from thread 2
i = 8 calls from thread 1
i = 7 calls from thread 0
i = 9 calls from thread 1
guided
i = 0 calls from thread 3
i = 1 calls from thread 3
i = 3 calls from thread 2
i = 2 calls from thread 3
i = 5 calls from thread 1
i = 7 calls from thread 0
i = 4 calls from thread 2
i = 6 calls from thread 1
i = 9 calls from thread 3
i = 8 calls from thread 0
static
方式中,线程 0 执行了0\1\8\9,线程 1 执行了2\3,线程 2 执行了4\5,线程 3 执行了6\7。可以发现,分配给线程的每一块的大小确实是 2,而且静态调度确实是按顺序划分的。
同样的,在dynamic
方式中,块的大小也是 2,但是他是按照线程执行的情况动态划分的。
而guided
中,不仅是按照线程执行的情况动态划分的,块的大小也从 2 逐步变成 1。开始时每个线程会分配到较大的迭代块,之后分配到的迭代块会逐渐递减。每个任务分配的任务是先大后小,指数下降。
section
- Implement a producer-consumer framework in OpenMP using sections to create a single producer task and a single consumer task. Ensure appropriate synchronization using locks. Test your program for a varying number of producers and consumers.
解决方法
理论分析
section语句是用在 sections 语句里用来将 sections 语句里的代码划分成几个不同的段。section语句可以理解成为任务并行,将不同的 section 交给不同的线程并行处理。
#pragma omp parallel sections
{
#pragma omp section
{
producer();
}
#pragma omp section
{
consumer();
}
}
这样,生产者和消费者在 sections 中的两个 section 里面,他们可以并行操作。
当有多个生产者和消费者的时候,框架如下:
#pragma omp parallel num_threads(p_count + c_count)
{
int id = omp_get_thread_num();
#pragma omp parallel sections
{
#pragma omp section
{
if (id < p_count)
producer(id);
}
#pragma omp section
{
if (id >= p_count)
consumer(id);
}
}
}
首先创建多个线程,划分相应的线程给生产者和消费者,这样,生产者和消费者在 sections 中的里面,是可以并行操作的。
实现方法
采用一个队列来存储生产者生产的资源,生产者将资源放入资源队列中,然后消费者取出队列进 行使用。在这种操作下,队列是一个临界资源, 需要用一个信号量来使得资源不会同时被生产者和消费者同时访问。特别需要注意的是,如果队列为空的时候,消费者试图从队列中取出资源,由于队列并没有资源,会出现越界的错误,需要使用一个信号量来表示是否为空,避免消费者从空队列中取出资源。
代码的核心部分如下:
void consume(int i, int id) {
printf("source %d is consumed by thread %d.\n", i, id);
}
void producer(int id) {
while (true) {
int x = produce();
sem_wait(&s);
source.push(x);
printf("source %d is produced by thread %d.\n", x, id);
sem_post(&s);
sem_post(&n);
}
}
void consumer(int id) {
while (true) {
int x;
sem_wait(&n);
sem_wait(&s);
x = source.front();
source.pop();
sem_post(&s);
consume(x, id);
}
}
这里 n 是控制队列是否为空的信号量,s 是控制避免重复访问队列的信号量。一开始队列为空,因此 n 的初值是 0,而初始时可以访问队列,s 的初值是 1。
实验结果
一个生产者和消费者的时候,截取了实验结果开头的那几行。
source 1 is produced by thread 0.
source 2 is produced by thread 0.
source 1 is consumed by thread 1.
source 3 is produced by thread 0.
source 2 is consumed by thread 1.
source 4 is produced by thread 0.
source 3 is consumed by thread 1.
source 5 is produced by thread 0.
source 4 is consumed by thread 1.
source 6 is produced by thread 0.
source 5 is consumed by thread 1.
source 7 is produced by thread 0.
source 6 is consumed by thread 1.
source 8 is produced by thread 0.
source 7 is consumed by thread 1.
source 9 is produced by thread 0.
source 8 is consumed by thread 1.
source 10 is produced by thread 0.
source 9 is consumed by thread 1.
source 11 is produced by thread 0.
可以发现,生产者在生产,而消费者在消费,两者同时进行。
而当有多个生产者和消费者的时候,实验结果前几行如下:
source 1 is produced by thread 2.
source 2 is produced by thread 0.
source 2 is produced by thread 1.
source 2 is produced by thread 3.
source 2 is produced by thread 2.
source 1 is consumed by thread 4.
source 3 is produced by thread 0.
source 2 is consumed by thread 7.
source 3 is produced by thread 1.
source 2 is consumed by thread 5.
source 3 is produced by thread 3.
source 2 is consumed by thread 6.
source 3 is produced by thread 2.
source 2 is consumed by thread 4.
source 4 is produced by thread 0.
source 3 is consumed by thread 7.
source 4 is produced by thread 1.
source 3 is consumed by thread 5.
source 4 is produced by thread 3.
多个线程正在同时工作,而且没有发生竞争。
稀疏矩阵乘法
- Consider a sparse matrix stored in the compressed row format (you may find a description of this format on the web or any suitable text on sparse linear algebra). Write an OpenMP program for computing the product of this matrix with a vector. Download sample matrices from the Matrix Market (http://math.nist.gov/MatrixMarket/) and test the performance of your implementation as a function of matrix size and number of threads.
解决方法
理论分析
行存储
稀疏矩阵由于有很多0,为了节省空间,一般压缩存储,通常只需要保存非零元素及其位置即可。
Compressed Row Storage(CRS)格式是把行的信息压缩存储了,只显式保留每行第一个非零元素的位置。
假设有稀疏矩阵A,我们需要创建三个数组,一个浮点型数组val,另外两个为整型数组(col_ind, row_ptr)。
val数组,大小为矩阵A的非零元素的个数,保存矩阵A的非零元素(按从上往下,从左往右的行遍历方式访问元素)。
col_ind数组,和val数组一样,大小为矩阵A的非零元素的个数,保存val数组中元素的列索引。其数学表示为:
如果 $val(k)=a(i,j)$,则 $col_ind(k)=j$。
row_ptr数组,大小为矩阵A的行数,保存矩阵A的每行第一个非零元素在val中的索引。其数学表示为:
如果$val(k)=a(i,j)$,则$row_ptr(i)\leq k < row_ptr(i+1)$。 按照惯例,一般定义$row_ptr(n+1) = nnz + 1$,而$nnz$是 A 中非零元素的个数。
该方法可以节省很多空间,只需要2nnz + n + 1个存储单元,而不是n的平方个单元。
实现方法
首先用一个结构体存储 val 数组和 col_ind 数组。由于相同行的 val 已经聚合在一起,这样能够很好地实现并行操作。
代码框架:
for i : 0 to row_size do
from <- row_ptr[i]
end <- row_ptr[i+1]
for j : from to end do
result[i] <- result[i] + val[j] * vector[j]
end
end
具体实现的时候,可以将 from 和 end 单独存储起来,减少依赖。
考虑并行的时候,reslut 需要做的 critical 操作。第一层循环在不同的循环体之间不存在数据竞争,不同的循环体对结果向量的不同的值进行修改。因此对第一层循环做并行化,不需要做额外地锁的开销。
实验结果
采用的矩阵如下:
实验结果如下:
Serious time of matrix and vector is : 16 ms
Serious time of matrix and vector is : 9 ms
没有出现 ERROR,计算结果正确。
实验中遇到的困难
困难一
一开始,受限于我的读题水平,我感觉我可能没有理解正确第一题的题意。为什么会产生这样的思想呢?我们来看看一开始的实验结果:
三种调度方式几乎没有任何区别。因为我是这样实现的,dummp
里面每个sleep
是睡眠1 ms
,而我更改了循环次数。后来我觉得奇奇怪怪的,于是我固定了循环次数,更改了sleep
的时间。结果还是得到一样的结果。这是为什么呢?
因为这两种做法显然都不能观察到三种调度方式的区别,原因在于,for里面每一步执行的效率都是一样的,因此无论调度方式是什么,都不会有显著效果的差异。需要观察到三种调度方式的差异,就应该和老师上课一样,for里面每一步的操作需要的时间是不一样的,有梯度差异,这样才能体会到分配线程的调度方式对效率的影响。
困难二
实现完成并行的代码后,与串行程序进行比较:
Serious time of matrix and vector is : 0 ms
Serious time of matrix and vector is : 17 ms
Serious time of matrix and vector is : 0 ms
Serious time of matrix and vector is : 20 ms
Serious time of matrix and vector is : 16 ms
Serious time of matrix and vector is : 13 ms
Serious time of matrix and vector is : 16 ms
Serious time of matrix and vector is : 9 ms
Serious time of matrix and vector is : 16 ms
Serious time of matrix and vector is : 7 ms
Serious time of matrix and vector is : 16 ms
Serious time of matrix and vector is : 12 ms
Serious time of matrix and vector is : 0 ms
Serious time of matrix and vector is : 19 ms
反复测量了很多次,可能是并行的开销有点大,实际并没有达到应有的计算效果。
困难三
这个也算后续,因为我后来发现,这个网站下载的矩阵是按列压缩的。不过由于矩阵的形状是方阵,采用按行读取的结果是原矩阵的转置,不会影响计算的复杂度。
附件
Question 1
代码
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <windows.h>
#include <omp.h>
void dummy(int t) {
Sleep(t);
}
int main() {
clock_t start, finish;
double Total_time;
FILE *fp1 = fopen("static.txt", "w");
FILE *fp2 = fopen("dynamic.txt", "w");
FILE *fp3 = fopen("guided.txt", "w");
for (int p = 1; p <= 5; p++) {
// static, p
start = clock();
#pragma omp parallel for schedule(static, p) num_threads(4)
for (int i = 0; i < 10; i++)
dummy(i * 100);
finish = clock();
Total_time = (double)(finish - start);
fprintf(fp1, "%f\n", Total_time);
// dynamic, p
start = clock();
#pragma omp parallel for schedule(dynamic, p) num_threads(4)
for (int i = 0; i < 10; i++)
dummy(i * 100);
finish = clock();
Total_time = (double)(finish - start);
fprintf(fp2, "%f\n", Total_time);
// guided, p
start = clock();
#pragma omp parallel for schedule(guided, p) num_threads(4)
for (int i = 0; i < 10; i++)
dummy(i * 100);
finish = clock();
Total_time = (double)(finish - start);
fprintf(fp3, "%f\n", Total_time);
}
printf("static\n");
#pragma omp parallel for schedule(static, 2) num_threads(4)
for (int i = 0; i < 10; i++) {
dummy(i * 100);
printf("i = %d calls from thread %d\n", i, omp_get_thread_num());
}
printf("dynamic\n");
#pragma omp parallel for schedule(dynamic, 2) num_threads(4)
for (int i = 0; i < 10; i++) {
dummy(i * 100);
printf("i = %d calls from thread %d\n", i, omp_get_thread_num());
}
printf("guided\n");
#pragma omp parallel for schedule(guided, 2) num_threads(4)
for (int i = 0; i < 10; i++) {
dummy(i * 100);
printf("i = %d calls from thread %d\n", i, omp_get_thread_num());
}
}
画图代码
import matplotlib.pyplot as plt
import numpy
fp1 = open('static.txt', 'r')
ls1 = []
for line in fp1:
line = line.strip('\n')
ls1.append(line.split(' ')[0])
ls1 = numpy.array(ls1, dtype = float)
fp1.close()
fp2 = open('dynamic.txt', 'r')
ls2 = []
for line in fp2:
line = line.strip('\n')
ls2.append(line.split(' ')[0])
ls2 = numpy.array(ls2, dtype = float)
fp2.close()
fp3 = open('guided.txt', 'r')
ls3 = []
for line in fp3:
line = line.strip('\n')
ls3.append(line.split(' ')[0])
ls3 = numpy.array(ls3, dtype = float)
fp3.close()
input_values = numpy.arange(1, 6, 1, dtype = int)
plt.plot(input_values, ls1)
plt.plot(input_values, ls2)
plt.plot(input_values, ls3)
plt.title("openmp for scheduling time delay", fontsize = 24)
plt.xticks(input_values)
plt.xlabel("chunk", fontsize = 14)
plt.ylabel("execute time (ms)", fontsize = 14)
plt.legend(['static', 'dynamic', 'guided'], loc = 'upper left')
# plt.tick_params(axis='both', labelsize = 14)
plt.show()
编译参数:
g++ -g -o 1 1.c -fopenmp
python plt.py
Question 2
代码
#include <stdio.h>
#include <stdlib.h>
#include <queue>
#include <windows.h>
#include <omp.h>
#include <cassert>
#include <semaphore.h>
using namespace std;
sem_t n, s;
queue <int> source;
int produce() {
Sleep(0.05);
if (source.empty())
return 1;
else
return source.back() + 1;
}
void consume(int i, int id) {
printf("source %d is consumed by thread %d.\n", i, id);
}
void producer(int id) {
while (true) {
int x = produce();
sem_wait(&s);
source.push(x);
printf("source %d is produced by thread %d.\n", x, id);
sem_post(&s);
sem_post(&n);
}
}
void consumer(int id) {
while (true) {
int x;
sem_wait(&n);
sem_wait(&s);
x = source.front();
source.pop();
sem_post(&s);
consume(x, id);
}
}
int main() {
sem_init(&n, 0, 0);
sem_init(&s, 0, 1);
// a varying number of producers and consumers
int p_count = 4;
int c_count = 4;
// a single producer task and a single consumer task
// int p_count = 1;
// int c_count = 1;
#pragma omp parallel num_threads(p_count + c_count)
{
int id = omp_get_thread_num();
printf("%d\n", id);
#pragma omp parallel sections
{
#pragma omp section
{
int qwq = omp_get_thread_num();
assert(qwq == id);
exit(0);
if (id < p_count)
producer(id);
}
#pragma omp section
{
int qwq = omp_get_thread_num();
assert(qwq == id);
exit(0);
if (id >= p_count)
consumer(id);
}
}
}
}
编译参数:
g++ -g -o 2 2.c -fopenmp
Question 3
代码
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#define BUFLEN 1000000
char buf[BUFLEN];
int main(int argc, char **argv)
{
int my_id = 0;
int p;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &my_id);
MPI_Comm_size(MPI_COMM_WORLD, &p);
MPI_Status status_p;
MPI_Barrier(MPI_COMM_WORLD);
double my_start,my_end,my_elapsed,elapsed;
my_start = MPI_Wtime();
if(my_id == 0){
for(int i = 1;i < p;i++){
MPI_Send(buf,BUFLEN,MPI_CHAR,i,i,MPI_COMM_WORLD);
}
}
else{
MPI_Recv(buf,BUFLEN,MPI_CHAR,0,my_id,MPI_COMM_WORLD,&status_p);
}
my_end = MPI_Wtime();
my_elapsed = my_end - my_start;
MPI_Reduce(&my_elapsed,&elapsed,1,MPI_DOUBLE,MPI_MAX,0,MPI_COMM_WORLD);
if(my_id == 0){
printf("Time delay is %f s\n",elapsed);
printf("Bandwidth is %f Mbit/s\n", BUFLEN * 1.0 / (1048576* elapsed));
}
MPI_Finalize();
return 0;
}
矩阵文件:s3dkq4m2.dat
编译参数:
g++ -g -o 3 3.c -fopenmp