Scheduling, sections, and sparse matrix-vector multiplication

Parallel and Distributed Systems

Posted by Birdie on May 2, 2022

Scheduling

  1. Consider a simple loop that calls a function dummy containing a programmable delay (sleep). All invocations of the function are independent of the others. Partition this loop across four threads using static, dynamic, and guided scheduling. Use different parameters for static and guided scheduling. Document the result of this experiment as the delay within the dummy function becomes large.

Solution

Theoretical analysis

In OpenMP, loop scheduling mainly matters for parallel for loops. When the amount of work differs between iterations, simply giving every thread the same number of iterations leads to load imbalance: some threads finish early while others are still working, leaving CPU cores idle and hurting performance.

The schedule clause

  schedule(type[, chunk])

  The type parameter selects the scheduling policy and can be static, dynamic, or guided.

  The optional chunk parameter is the number of iterations handed out in each scheduling decision; it must be an integer.

Static scheduling (static)

With static scheduling, the mapping of iterations to threads is fixed before the loop starts executing. Without a chunk argument, each thread receives one contiguous block of $\lceil\frac{N}{t}\rceil$ iterations; with a chunk argument, blocks of chunk iterations are dealt out to the threads in round-robin order.
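
For example, with 10 iterations and 4 threads, schedule(static) gives each thread one contiguous block of roughly $\lceil 10/4 \rceil = 3$ iterations (the exact split is implementation defined), whereas schedule(static, 2) deals out the blocks {0,1}, {2,3}, {4,5}, {6,7}, {8,9} to the threads in round-robin order, so thread 0 receives iterations 0, 1, 8 and 9.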

Dynamic scheduling (dynamic)

With dynamic scheduling, the assignment of iterations to threads is decided at run time: whenever a thread finishes the work it was given, it goes back and grabs the next available block. Because thread start and completion times are unpredictable, which thread executes which iteration cannot be known in advance.

Guided scheduling (guided)

Guided scheduling is also dynamic: a thread that finishes its assigned block goes back for more work. The difference is that the block size shrinks as the loop progresses.

The first block is roughly number_of_iterations / number_of_threads, and each subsequent block is roughly number_of_iterations_remaining / number_of_threads, never smaller than chunk except possibly for the last block.
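
To make the shrinking block size concrete, the following small sketch (not part of the assignment code) follows the commonly used rule block = max(ceil(remaining / threads), chunk); the exact sizes are implementation defined, so this is only an approximation:

#include <stdio.h>

// Simulate the sequence of block sizes handed out by guided scheduling.
int main(void) {
    int remaining = 10, nthreads = 4, min_chunk = 2;
    while (remaining > 0) {
        int block = (remaining + nthreads - 1) / nthreads;  // ceil(remaining / nthreads)
        if (block < min_chunk) block = min_chunk;
        if (block > remaining) block = remaining;           // the last block may be smaller
        printf("next block: %d iterations\n", block);
        remaining -= block;
    }
    return 0;
}

With 10 iterations, 4 threads and chunk = 2 this prints block sizes 3, 2, 2, 2, 1, which is the pattern that shows up in the guided trace measured below.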

Implementation

Code skeleton:

#pragma omp parallel for schedule(schedule_type, chunk) num_threads(4)
for i : 1 to times do
	dummy(delay_time)
end

To get clear experimental results, chunk is varied from 1 to 5.

The number of iterations, times, is set to 10.

The delay delay_time is set to $i \times 100$ (milliseconds): large enough to expose the differences between the scheduling policies, but small enough that the program does not run for too long.

Results

[Figure: execution time (ms) versus chunk size for static, dynamic, and guided scheduling]

When the chunk size is 1, static and dynamic are slightly better than guided; when the chunk size is 2, guided is considerably more efficient than static and dynamic. As the chunk grows larger, the three policies show almost no difference.

Guided scheduling hands out large blocks of work at the start and small blocks near the end, when little work is left, which balances the load across threads. It is therefore not surprising that it is the best policy at chunk = 2.

When the chunk becomes large, the blocks are so big that the three policies end up with essentially the same assignment, so there is hardly any difference between them. A relatively small chunk therefore gives the best performance.

The per-thread traces below were recorded with chunk = 2:

static
i = 0 calls from thread 0
i = 1 calls from thread 0
i = 2 calls from thread 1
i = 4 calls from thread 2
i = 3 calls from thread 1
i = 6 calls from thread 3
i = 8 calls from thread 0
i = 5 calls from thread 2
i = 7 calls from thread 3
i = 9 calls from thread 0
dynamic
i = 0 calls from thread 1
i = 1 calls from thread 1
i = 2 calls from thread 3
i = 4 calls from thread 2
i = 3 calls from thread 3
i = 6 calls from thread 0
i = 5 calls from thread 2
i = 8 calls from thread 1
i = 7 calls from thread 0
i = 9 calls from thread 1
guided
i = 0 calls from thread 3
i = 1 calls from thread 3
i = 3 calls from thread 2
i = 2 calls from thread 3
i = 5 calls from thread 1
i = 7 calls from thread 0
i = 4 calls from thread 2
i = 6 calls from thread 1
i = 9 calls from thread 3
i = 8 calls from thread 0

In the static trace, thread 0 executed iterations 0, 1, 8 and 9, thread 1 executed 2 and 3, thread 2 executed 4 and 5, and thread 3 executed 6 and 7. The block size handed to each thread is indeed 2, and static scheduling assigns the blocks in order.

Similarly, in the dynamic trace the block size is also 2, but the blocks are assigned dynamically, depending on which thread becomes free first.

In the guided trace the assignment is dynamic as well, and in addition the block size shrinks as the loop proceeds: the first block contains three iterations (0-2, all on thread 3), the following blocks contain two, and the last block only one. Each thread therefore receives large blocks first and progressively smaller ones, decreasing roughly geometrically.

section

  1. Implement a producer-consumer framework in OpenMP using sections to create a single producer task and a single consumer task. Ensure appropriate synchronization using locks. Test your program for a varying number of producers and consumers.

Solution

Theoretical analysis

A section directive is used inside a sections construct to divide the enclosed code into separate blocks. sections can be thought of as task parallelism: each section block is handed to a different thread, and the blocks run in parallel.

#pragma omp parallel sections
{
    #pragma omp section
    {
        producer();
    }
    #pragma omp section
    {
        consumer();
    }
}

This way the producer and the consumer live in the two section blocks of one sections construct and can run in parallel.

With multiple producers and consumers, the framework looks like this:

#pragma omp parallel num_threads(p_count + c_count)
{
    int id = omp_get_thread_num();
    #pragma omp parallel sections
    {
        #pragma omp section
        {
            if (id < p_count)
                producer(id);
        }
        #pragma omp section
        {
            if (id >= p_count)
                consumer(id);
        }
    }
}

A team of p_count + c_count threads is created first, and the thread id determines whether a thread acts as a producer or as a consumer; the producers and consumers inside the sections can then run in parallel.

Implementation

A queue stores the items generated by the producers: producers push items into the queue and consumers pop them off for use. The queue is therefore a shared resource, and a semaphore is needed so that producers and consumers do not access it at the same time. In addition, if a consumer tries to pop from an empty queue, the access goes out of bounds, so a second semaphore counts the items in the queue and keeps consumers from popping when it is empty.

The core of the code:

void consume(int i, int id) {
    printf("source %d is consumed by thread %d.\n", i, id);
}

void producer(int id) {
    while (true) {
        int x = produce();   
        sem_wait(&s);
        source.push(x);
        printf("source %d is produced by thread %d.\n", x, id);
        sem_post(&s);
        sem_post(&n);
    }
}

void consumer(int id) {
    while (true) {
        int x;
        sem_wait(&n);
        sem_wait(&s);
        x = source.front();
        source.pop();
        sem_post(&s);
        consume(x, id);
    }
}

Here n is the semaphore that counts the items in the queue, and s is the semaphore that prevents concurrent access to it. The queue is initially empty, so n starts at 0; the queue may be accessed right away, so s starts at 1.
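
Concretely, the two semaphores are initialized as in the full program in the appendix:

sem_init(&n, 0, 0);  // n counts items in the queue: starts at 0 (queue empty)
sem_init(&s, 0, 1);  // s guards the queue: starts at 1 (queue free to access)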

Results

With one producer and one consumer, the first few lines of the output are:

source 1 is produced by thread 0.
source 2 is produced by thread 0.
source 1 is consumed by thread 1.
source 3 is produced by thread 0.
source 2 is consumed by thread 1.
source 4 is produced by thread 0.
source 3 is consumed by thread 1.
source 5 is produced by thread 0.
source 4 is consumed by thread 1.
source 6 is produced by thread 0.
source 5 is consumed by thread 1.
source 7 is produced by thread 0.
source 6 is consumed by thread 1.
source 8 is produced by thread 0.
source 7 is consumed by thread 1.
source 9 is produced by thread 0.
source 8 is consumed by thread 1.
source 10 is produced by thread 0.
source 9 is consumed by thread 1.
source 11 is produced by thread 0.

The producer keeps producing while the consumer keeps consuming, and the two run concurrently.

With several producers and consumers, the first few lines of the output are:

source 1 is produced by thread 2.
source 2 is produced by thread 0.
source 2 is produced by thread 1.
source 2 is produced by thread 3.
source 2 is produced by thread 2.
source 1 is consumed by thread 4.
source 3 is produced by thread 0.
source 2 is consumed by thread 7.
source 3 is produced by thread 1.
source 2 is consumed by thread 5.
source 3 is produced by thread 3.
source 2 is consumed by thread 6.
source 3 is produced by thread 2.
source 2 is consumed by thread 4.
source 4 is produced by thread 0.
source 3 is consumed by thread 7.
source 4 is produced by thread 1.
source 3 is consumed by thread 5.
source 4 is produced by thread 3.

Several threads work at the same time, and the queue operations themselves never race because they are protected by s. (The repeated values in the output, such as several threads producing source 2, come from produce() reading source.back() outside the critical section, so two producers can compute the same value; the queue itself stays consistent.)

Sparse matrix-vector multiplication

  1. Consider a sparse matrix stored in the compressed row format (you may find a description of this format on the web or any suitable text on sparse linear algebra). Write an OpenMP program for computing the product of this matrix with a vector. Download sample matrices from the Matrix Market (http://math.nist.gov/MatrixMarket/) and test the performance of your implementation as a function of matrix size and number of threads.

Solution

Theoretical analysis

Compressed row storage

A sparse matrix contains many zeros, so it is usually stored in compressed form to save space: only the nonzero elements and their positions need to be kept.

The Compressed Row Storage (CRS) format compresses the row information: only the position of the first nonzero element of each row is stored explicitly.

For a sparse matrix A we create three arrays: one floating-point array val and two integer arrays (col_ind and row_ptr).

  • The val array has one entry per nonzero element of A and stores the nonzero values, traversing the matrix row by row from top to bottom and left to right within each row.

  • The col_ind array has the same length as val and stores the column index of each entry of val. Formally:

    if $val(k) = a(i,j)$, then $col\_ind(k) = j$.

  • The row_ptr array has one entry per row of A (plus one terminating entry) and stores, for each row, the index in val of the row's first nonzero element. Formally:

    if $val(k) = a(i,j)$, then $row\_ptr(i) \leq k < row\_ptr(i+1)$. By convention $row\_ptr(n+1) = nnz + 1$, where $nnz$ is the number of nonzero elements of A.

This format needs only $2\,nnz + n + 1$ storage locations instead of $n^2$.
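
A small made-up example makes the three arrays concrete (0-based indices here, whereas the formulas above follow the 1-based convention):

/* Example matrix (illustrative only):
 *   | 10  0  0  2 |
 *   |  3  9  0  0 |
 *   |  0  7  8  0 |
 *   |  0  0  0  5 |
 */
double val[]     = {10, 2, 3, 9, 7, 8, 5};   // the nnz = 7 nonzeros, row by row
int    col_ind[] = { 0, 3, 0, 1, 1, 2, 3};   // column index of each value
int    row_ptr[] = { 0, 2, 4, 6, 7};         // row i lives in val[row_ptr[i] .. row_ptr[i+1]-1]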

Implementation

A struct stores the val and col_ind arrays. Because the values of each row are already stored contiguously, the computation parallelizes naturally across rows.

Code skeleton:

for i : 0 to row_size do
	from <- row_ptr[i]
	end <- row_ptr[i+1]
	for j : from to end do
		result[i] <- result[i] + val[j] * vector[col_ind[j]]
	end
end

In the actual implementation, from and end are stored in separate local variables, which reduces dependences inside the loop.

As for parallelization: one might expect the updates to result to need a critical section, but different iterations of the outer loop write to different entries of the result vector, so there is no data race between them. The outer loop can therefore be parallelized without any extra locking overhead.
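
Putting the pieces together, a minimal OpenMP sketch of the kernel looks as follows (illustrative, using the 0-based arrays from the CRS example above; not necessarily line-for-line the program that produced the timings):

// result = A * x for a CRS matrix with n rows.
// Each outer iteration writes only result[i], so no locking is needed.
void spmv_crs(int n, const int *row_ptr, const int *col_ind,
              const double *val, const double *x, double *result)
{
    #pragma omp parallel for schedule(static)
    for (int i = 0; i < n; i++) {
        int from = row_ptr[i], to = row_ptr[i + 1];   // row bounds read once
        double sum = 0.0;
        for (int j = from; j < to; j++)
            sum += val[j] * x[col_ind[j]];
        result[i] = sum;
    }
}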

Results

The matrix used:

[Figure: the test matrix downloaded from the Matrix Market (s3dkq4m2)]

The measured times of the serial and the parallel run:

Serious time of matrix and vector is : 16 ms
Serious time of matrix and vector is : 9 ms

No ERROR was reported, so the computed result is correct.

Difficulties encountered

Difficulty 1

At first, I suspected that I had not understood question 1 correctly. Why did I think so? Look at the initial experimental results:

[Figure: initial experiment, the three scheduling policies take almost the same time]

The three scheduling policies showed almost no difference. That was due to how I had set things up: every sleep inside dummy lasted 1 ms and only the number of loop iterations was varied. That seemed odd, so I fixed the iteration count and varied the sleep time instead, and still got the same result. Why?

Neither variant can expose the difference between the scheduling policies, because every iteration of the for loop takes the same amount of time; with uniform iterations the assignment policy barely matters. To observe a difference, the iterations must take different amounts of time, as in the lecture example, so that how the iterations are distributed over the threads actually affects the total running time.

Difficulty 2

After finishing the parallel code, I compared it with the serial program:

Serious time of matrix and vector is : 0 ms
Serious time of matrix and vector is : 17 ms

Serious time of matrix and vector is : 0 ms
Serious time of matrix and vector is : 20 ms

Serious time of matrix and vector is : 16 ms
Serious time of matrix and vector is : 13 ms

Serious time of matrix and vector is : 16 ms
Serious time of matrix and vector is : 9 ms

Serious time of matrix and vector is : 16 ms
Serious time of matrix and vector is : 7 ms

Serious time of matrix and vector is : 16 ms
Serious time of matrix and vector is : 12 ms

Serious time of matrix and vector is : 0 ms
Serious time of matrix and vector is : 19 ms

I repeated the measurement many times; the parallelization overhead is probably too large, so the expected speedup over the serial version did not materialize.

Difficulty 3

This one is a follow-up: I later noticed that the matrices downloaded from that site are stored in compressed column format. Since the matrix is square, reading the file as if it were row-compressed simply yields the transpose of the original matrix, which does not affect the cost of the computation.

Appendix

Question 1

Code

#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <windows.h>
#include <omp.h>

void dummy(int t) {
    Sleep(t);
}

int main() {
    clock_t start, finish;
    double Total_time;
    FILE *fp1 = fopen("static.txt", "w");
    FILE *fp2 = fopen("dynamic.txt", "w");
    FILE *fp3 = fopen("guided.txt", "w");

    for (int p = 1; p <= 5; p++) {
        // static, p
        start = clock();
        #pragma omp parallel for schedule(static, p) num_threads(4)
        for (int i = 0; i < 10; i++)
            dummy(i * 100);
        finish = clock();
        Total_time = (double)(finish - start);
        fprintf(fp1, "%f\n", Total_time);
        
        // dynamic, p
        start = clock();
        #pragma omp parallel for schedule(dynamic, p) num_threads(4)
        for (int i = 0; i < 10; i++)
            dummy(i * 100);
        finish = clock();
        Total_time = (double)(finish - start);
        fprintf(fp2, "%f\n", Total_time);

        // guided, p
        start = clock();
        #pragma omp parallel for schedule(guided, p) num_threads(4)
        for (int i = 0; i < 10; i++)
            dummy(i * 100);
        finish = clock();
        Total_time = (double)(finish - start);
        fprintf(fp3, "%f\n", Total_time);
    }

    printf("static\n");
#pragma omp parallel for schedule(static, 2) num_threads(4)
    for (int i = 0; i < 10; i++) {
        dummy(i * 100);
        printf("i = %d calls from thread %d\n", i, omp_get_thread_num());
    }
    
    printf("dynamic\n");
#pragma omp parallel for schedule(dynamic, 2) num_threads(4)
    for (int i = 0; i < 10; i++) {
        dummy(i * 100);
        printf("i = %d calls from thread %d\n", i, omp_get_thread_num());
    }

    printf("guided\n");
#pragma omp parallel for schedule(guided, 2) num_threads(4)
    for (int i = 0; i < 10; i++) {
        dummy(i * 100);
        printf("i = %d calls from thread %d\n", i, omp_get_thread_num());
    }
}

Plotting code

import matplotlib.pyplot as plt
import numpy

fp1 = open('static.txt', 'r')
ls1 = []
for line in fp1:
    line = line.strip('\n')
    ls1.append(line.split(' ')[0])
ls1 = numpy.array(ls1, dtype = float)
fp1.close()

fp2 = open('dynamic.txt', 'r')
ls2 = []
for line in fp2:
    line = line.strip('\n')
    ls2.append(line.split(' ')[0])
ls2 = numpy.array(ls2, dtype = float)
fp2.close()

fp3 = open('guided.txt', 'r')
ls3 = []
for line in fp3:
    line = line.strip('\n')
    ls3.append(line.split(' ')[0])
ls3 = numpy.array(ls3, dtype = float)
fp3.close()


input_values = numpy.arange(1, 6, 1, dtype = int)
plt.plot(input_values, ls1)
plt.plot(input_values, ls2)
plt.plot(input_values, ls3)
plt.title("openmp for scheduling time delay", fontsize = 24)
plt.xticks(input_values)
plt.xlabel("chunk", fontsize = 14)
plt.ylabel("execute time (ms)", fontsize = 14)
plt.legend(['static', 'dynamic', 'guided'], loc = 'upper left')
# plt.tick_params(axis='both', labelsize = 14)
plt.show()

Compile and run:

g++ -g -o 1 1.c -fopenmp
python plt.py

Question 2

Code

#include <stdio.h>
#include <stdlib.h>
#include <queue>
#include <windows.h>
#include <omp.h>
#include <cassert>
#include <semaphore.h>
using namespace std;

sem_t n, s;
queue <int> source;

int produce() {
    Sleep(0.05);  // note: Sleep() takes integer milliseconds, so 0.05 truncates to 0
    // source.back() is read outside the critical section, so two producers can
    // occasionally compute the same value (visible as duplicates in the output).
    if (source.empty())
        return 1;
    else
        return source.back() + 1;
}

void consume(int i, int id) {
    printf("source %d is consumed by thread %d.\n", i, id);
}

void producer(int id) {
    while (true) {
        int x = produce();   
        sem_wait(&s);
        source.push(x);
        printf("source %d is produced by thread %d.\n", x, id);
        sem_post(&s);
        sem_post(&n);
    }
}

void consumer(int id) {
    while (true) {
        int x;
        sem_wait(&n);
        sem_wait(&s);
        x = source.front();
        source.pop();
        sem_post(&s);
        consume(x, id);
    }
}

int main() {
    sem_init(&n, 0, 0);
    sem_init(&s, 0, 1);
    
    // a varying number of producers and consumers
    int p_count = 4;
    int c_count = 4;
    // a single producer task and a single consumer task
    // int p_count = 1;
    // int c_count = 1;

	#pragma omp parallel num_threads(p_count + c_count)
	{
	    int id = omp_get_thread_num();
        printf("%d\n", id);
	    #pragma omp parallel sections
	    {
            #pragma omp section
            {
                if (id < p_count)
                    producer(id);
            }
            #pragma omp section
            {
                if (id >= p_count)
                    consumer(id);
            }
	    }
	}
}

Compile command:

g++ -g -o 2 2.c -fopenmp

Question 3

Code

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#define BUFLEN 1000000

char buf[BUFLEN];

int main(int argc, char **argv)
{ 
    int my_id = 0;
    int p;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &my_id);
    MPI_Comm_size(MPI_COMM_WORLD, &p);
    MPI_Status status_p;
    MPI_Barrier(MPI_COMM_WORLD);
    double my_start,my_end,my_elapsed,elapsed;
    my_start = MPI_Wtime();
    if(my_id == 0){
        for(int i = 1;i < p;i++){
            MPI_Send(buf,BUFLEN,MPI_CHAR,i,i,MPI_COMM_WORLD);
        }
    }
    else{
        MPI_Recv(buf,BUFLEN,MPI_CHAR,0,my_id,MPI_COMM_WORLD,&status_p);
    }
    my_end = MPI_Wtime();

    my_elapsed = my_end - my_start;

    MPI_Reduce(&my_elapsed,&elapsed,1,MPI_DOUBLE,MPI_MAX,0,MPI_COMM_WORLD);
    if(my_id == 0){
	printf("Time delay is %f s\n",elapsed);
        printf("Bandwidth is %f Mbit/s\n", BUFLEN * 1.0 / (1048576* elapsed));
    }
    MPI_Finalize();


    return 0;
}

Matrix file: s3dkq4m2.dat

Compile command:

g++ -g -o 3 3.c -fopenmp