tsp并行化实现

超级计算机原理

Posted by Birdie on May 2, 2022

一、实验题目

​ 实现tsp问题,要求实现一个串行版本和MPI,OpenMP,pthread中的任意两种版本。

​ tsp问题:树型搜索问题(Tree Search Problem),也叫做旅行商问题(Traveling Salesman Problem)。假设有一个旅行商人要拜访n个城市,他必须选择所要走的路径,路径的限制是每个城市只能拜访一次,而且最后要回到原来出发的城市。路径的选择目标是要求得的路径路程为所有路径之中的最小值。

二、实验内容

串行版本

​ 众所周知,搜索可以通过记忆化,减少重复的搜索内容,从而提高运行效率。而记忆化搜索,本质上是可以写成动态规划的形式的。因此,该实验使用动态规划来实现该TSP问题。具体来说,应该叫做状态压缩动态规划。

​ 因为始终是需要走一个环的,因此无论起点在哪个点都是等价的。不妨假设起点$start=0$,即从$0$号节点开始。设DP函数$f[state][now]$表示当前正在$now$节点,并且已经访问了$state$这个状态的最短路径的长度。具体来说,$state$是一个$n$位的二进制数,其中第$i$位为$1$/$0$分别表示$i$号节点是/否已经被访问。

​ 有了状态表示,可以很清晰地写出状态转移方程:\(f[state][now]=min\lbracef[state'][last]+d[last][now]\rbrace\)

满足:

  • $((1«now) \& state)>0$,即$now$这个节点必须已被$state$访问

  • $state’=state-(1«now)$,即$state’$是还未访问$now$前的状态

  • $((1«last) \& state’)>0$,即$last$这个节点必须已被状态$state’$访问

  • $d[last][now]$表示一条从$last$到$now$的直接路径

  • 初始值$f[1][0]=0$,表示已经访问$0$号节点的最短路径长度为$0$

​ 在状态转移的时候有一个小技巧,当然这个技巧本身和串行代码是没有关系的,为了保持和并行代码的一致性而修改的,具体缘由会在后面部分阐述。观察状态转移方程,可以发现$f[state][now]$实际上是从$f[state’][last]$转移过来的。而$state$和$state’$的关系是:$state$中$1$的数量会比$state’$中1的数量多$1$。因此,在枚举状态$state$的时候,采用$state$中$1$的数量从小到到枚举。

​ 通过递推填写函数$f$,即可计算出从$0$号节点出发遍历所有点的最短路径。由于该问题需要环绕一圈回到出发点,即需要回到$0$号点,最终答案为:$min_{0\leq k<n}\lbrace f[2^{n}-1][k]+d[k][0]\rbrace$

​ 由于问题还需要求解出最短路径的方案,因此考虑使用 $route[state][now]$ 表示在表示当前正在$now$节点,并且已经访问了 $state$ 这个状态的最短路径。假设在$f[state][now]$转移的时候,是从状态$f[state’][last]$转移过来的,则\(route[state][now]=\lbrace route[state'][last],now\rbrace\)

​ 代码及注释如下:

#include <bits/stdc++.h>
using namespace std;

const int maxn=17;								//点数
const int INF=1e9+7;							//无穷大
int d[maxn][maxn];								//两点间的距离
int ans,tsp[(1<<maxn)][maxn];					 //ans表示最短路径长度,tsp即上文中的f
vector <int> ans_route,route[(1<<maxn)][maxn];	   //ans_route是最优路径,route入上文
vector <int> num[maxn];							 //num[i]表示1的数量为i-1的state

int main() {
    //从tsp2.txt中读取点对的距离
    freopen("tsp2.txt","r",stdin);
    //将结果输出至serious_answer.txt
    freopen("serious_answer.txt","w",stdout);

    //规定点数为maxn
    int n=maxn;
    //将点对距离保存至数组d中
    for (int i=0;i<n;i++)
        for (int j=0;j<n;j++)
            scanf("%d",&d[i][j]);

    //将数组中的每个状态都设为无穷大
    int now,state;
    for (now=0;now<n;now++)
        for (state=0;state<(1<<n);state++)
            tsp[state][now]=INF;
    //初始答案为无穷大
    ans=INF;

    //cnt记录state中1的数量
    int cnt;
    for (state=0;state<(1<<n);state++) {
        cnt=0;
        //计算state中有多少个1
        for (now=0;now<n;now++)
            if (state & (1<<now)) cnt++;
        //将state加入到对应1的个数的队列
        num[cnt-1].push_back(state);
    }
    
    //开始计算记录时间
    clock_t startTime,endTime;
    startTime = clock();

    //起始点默认为0
    int start=0;
    int laststate,last,i,j;

    for (i=0;i<maxn;i++) {
        for (j=0;j<num[i].size();j++) {
            state=num[i][j];
            for (now=0;now<n;now++) {
                //state需要满足的条件
                if ((state & (1<<start)) == 0) continue;
                if ((state & (1<<now)) == 0) continue;

                //laststate即未访问now前的上一个状态
                laststate=state-(1<<now);

                //初始化起始点
                if (laststate == 0) {
                    tsp[state][start] = 0;
                    route[state][start].push_back(start);
                    continue;
                }

                //状态转移
                for (last=0;last<n;last++) {
                    if ((laststate & (1<<last)) == 0) continue;
                    if (tsp[state][now] > tsp[laststate][last] + d[last][now]) {
                        tsp[state][now] = tsp[laststate][last] + d[last][now];
                        route[state][now] = route[laststate][last];
                        route[state][now].push_back(now);
                    }
                }

                //更新答案
                if (state == (1<<n)-1) {
                    tsp[state][now] += d[now][start];
                    if (tsp[state][now] < ans) {
                        ans = tsp[state][now];
                        ans_route = route[state][now];
                    }
                }
            }
        }
    }

    //结束计时并输出结果
    endTime = clock();
    printf("The run time is: %lfs\n",(double)(endTime-startTime)/CLOCKS_PER_SEC);
    printf("The minimal route's length is: %d\n",ans);
    printf("The route is: ");
    for (auto i:ans_route) printf("%d->",i);
    printf("%d\n",ans_route[0]);
}

openMP版本

​ 考虑优化串行版本的代码。

​ 先研究串行版本的时间开销。串行版本的时间复杂度为:$O(2^nnn*n)$,实际上最后一个$n$是在复制路径的过程中产生的,当然可以避免,但是没有必要,因为三个版本的代码只需要保持结构的统一即可进行时间的比较。但是秉持科学严谨的态度,在此阐述一下如何优化掉最后的一个$n$。

​ 在复制路径的时候,我们是采取将$route[state’][last]$中记录的整整一条路径完全交付给$route[state][now]$。实际上,这个过程是没有必要的。考虑这条路径中那些节点是$route[state][now]$需要的,实际上,只需要知道$now$的上一个访问节点是$last$,即可推断出整条路径。原因为:

  1. 假设最优的答案是$f[state][now]$,则最后一个访问的节点必然是$now$。通过$route[state][now]$,可以得知$now$的上一个访问节点是$last$;

  2. 因此上一个状态为$f[state-(1«now)][last]$,不妨设该状态为新的一个$f[state][now]$。重复第一个步骤,我们又可以找到访问$last$前的上一个节点;

  3. 重复步骤1和步骤2

  4. 直到$state=0$,即可还原出整条路径。

​ 考虑到$n$其实很小,主要的时间开销在于$2^n$的时间花费,这个花费是用于枚举状态$state$产生的。但是不难发现,$state$是具有数据依赖性的,计算$f[state][now]$是需要依赖上一个状态$state’$的。这就意味着,我们无法简单地对下面的$for$语句并行:

	for (int state=0;state<(1<<n);state++)

​ 简单地切分会导致计算某一个$state$的时候,$state’$的答案并未被计算出来。因此就有了上文中的那个小技巧。因为计算$state$的结果的时候,仅仅只需要依赖于比$state$少一个$1$的$state’$的答案。因此,对于具有相同的$1$的个数的$state$,其计算过程都是没有数据依赖的。就是因为这个特性,上文才将所有的$state$根据$1$的数量划分成17个部分。

​ 因此,对于每一个部分的枚举$state$的$for$语句,都可以使用下属语句优化:

# pragma omp parallel for num_threads(thread_count)

​ $parallel for$指令可以生成一组线程来执行后面的结构化代码块。在该被$parallel for$指令并行化的$for$循环中,线程间的缺省划分方式是由系统决定的。大部分系统会粗略地使用块划分,即如果有$(1«n)$次迭代,则大约$(1«n)/threadcount$次迭代会被分配到线程$0$,接下来约$(1«n)/threadcount$次迭代会被分配到线程$1$,以此类推。由于我们并不关心具体是如何分配的,因此可以使用$parallel for$指令。

​ 值得注意的地方有两个:

  • 在$parallel for$指令并行化的$for$循环中,变量的作用域的问题。并行计算时循环变量应该是私有的,这样每个线程会有自己的变量副本,从而不会影响其他线程的运行。因此,在$for$循环块中使用的局部变量都应该声明为私有变量,在openMP中,使用$private$指令。

  • $ans$和$ansroute$每次只能被一个线程修改。这其实是一个竞争条件,多次线程试图访问并更新同一个共享资源,可能会导致错误。因此,我们需要将$ans$和$ansroute$更新的部分放入临界区内,在openMP中,使用$critical$指令。

​ 代码及注释如下:

#include <bits/stdc++.h>
#include <omp.h>
using namespace std;

const int maxn=17;
const int INF=1e9+7;
int d[maxn][maxn];
int ans,tsp[(1<<maxn)][maxn];
vector <int> ans_route,route[(1<<maxn)][maxn];
vector <int> num[maxn];

int main() {
    freopen("tsp2.txt","r",stdin);
    freopen("openmp_answer.txt","w",stdout);

    //使用六个线程
    int thread_count=6;

    int n=maxn;
    for (int i=0;i<n;i++)
        for (int j=0;j<n;j++)
            scanf("%d",&d[i][j]);

    int now,state;
    //初始化操作可以使用parallel for并行化
    //由于每个线程不会访问到tsp的同一个位置,不需要加入临界区
#  pragma omp parallel for num_threads(thread_count) private(state,now)
    for (now=0;now<n;now++)
        for (state=0;state<(1<<n);state++)
            tsp[state][now]=INF;
    ans=INF;

    int cnt;
    //同样的,state的分类也可以使用parallel for并行化
#  pragma omp parallel for num_threads(thread_count) private(cnt,state,now)
    for (state=0;state<(1<<n);state++) {
        cnt=0;
        for (now=0;now<n;now++)
            if (state & (1<<now)) cnt++;
        //num的不能同时被多个线程更新,需要加入临界区
        # pragma omp critical
        num[cnt-1].push_back(state);
    }

    clock_t startTime,endTime;
    startTime = clock();
    
    int start=0;
    int laststate,last,i,j;

    for (i=0;i<maxn;i++) {
        //对于每一类state,创建thread_count个线程并行化
        //结构块中的j/state/now/laststate/last变量都需要声明为线程的私有变量
        # pragma omp parallel for num_threads(thread_count) private(j,state,now,laststate,last)
        for (j=0;j<num[i].size();j++) {
            state=num[i][j];
            for (now=0;now<n;now++) {
                if ((state & (1<<start)) == 0) continue;
                if ((state & (1<<now)) == 0) continue;

                laststate=state-(1<<now);

                if (laststate == 0) {
                    tsp[state][start] = 0;
                    route[state][start].push_back(start);
                    continue;
                }

                for (last=0;last<n;last++) {
                    if ((laststate & (1<<last)) == 0) continue;
                    if (tsp[state][now] > tsp[laststate][last] + d[last][now]) {
                        tsp[state][now] = tsp[laststate][last] + d[last][now];
                        route[state][now] = route[laststate][last];
                        route[state][now].push_back(now);
                    }
                }

                if (state == (1<<n)-1) {
                    tsp[state][now] += d[now][start];
                    if (tsp[state][now] < ans) {
                        //更新答案的部分需要加入临界区
                        # pragma omp critical
                        ans = tsp[state][now];
                        # pragma omp critical
                        ans_route = route[state][now];
                    }
                }
            }
        }
    }

    endTime = clock();
    printf("Number of thread: %d\n",thread_count);
    printf("The run time is: %lfs\n",(double)(endTime-startTime)/CLOCKS_PER_SEC);
    printf("The minimal route's length is: %d\n",ans);
    printf("The route is: ");
    for (auto i:ans_route) printf("%d->",i);
    printf("%d\n",ans_route[0]);
}

pthread版本

​ 和openMP一样,考虑优化串行版本的代码。需要并行的部分其实和openMP相同,现在考虑将openMP改写成pthread。

​ 在写代码前,需要先了解一下pthread创建线程的方式。

  • 启动线程

    在pthread中,通常使用$pthread_create$函数来生成线程。其语法为:

    int pthread_create(
    	pthread_t*				thread_p				/* out */,
    	const pthread_attr_t*	 attr_p					 /* in  */,
    	void*				    (*start_rountine)(void*) /* in  */,
    	void*				    arg_p				    /* in  */);
    

    第一个参数是一个指针,指向对应的$pthread_t$对象。$pthread_t$数据结构用来存储线程的专有信息。

    第二个不用,在函数调用的时候把$NULL$传递给参数。

    第三个参数是线程即将要运行的函数。

    第四个参数也是一个指针,指向传给函数$start_rountine$的参数。

  • 停止线程

    pthread中,函数$pthread_join$将等待$pthread_t$对象所关联的那个线程结束。其语法为:

    int pthread_join(
    	pthread_t	thread		/* in  */
    	void** 		ret_val_p	/* out */);
    

    第二个参数可以接收任意有$pthread_t$对象所关联的那个线程的返回值。

​ 有了这些基础知识后,可以开始代码前的准备工作了。

​ 首先,需要创建$thread_count$个$pthread_t$对象,用于存储线程的专有信息。

pthread_t* thread_handles;
thread_handles=(pthread_t*)malloc(thread_count*sizeof(pthread_t));

​ 其次,将要并行的部分改写成函数的形式。

​ 在pthread中,设置临界区的方法有很多中,常用的有三种。

  • 忙等待。即当$i$号线程需要执行临界区中的语句的时候,先等待$i-1$号线程执行完成。

  • 互斥量。互斥量是一个特殊类型的变量,通过某些特殊类型的函数,限制每次只能有一个线程进入临界区。

    初始化函数如下:

    int pthread_mutex_init(
    	pthread_mutex_t*			mutex_p		/* out */
    	const pthread_mutexattr_r*	 attr_p		 /* in  */);
    

    使用完互斥量后,销毁函数如下:

    int pthread_mutex_destroy(pthread_mutex_t* mutex_p);
    

    具体使用方法如下:

    pthread_mutex_lock(&mutex);
    /*临界区*/
    pthread_mutex_unlock(&mutex);
    
  • 信号量。信号量可以认为是一种特殊的$unsigned int$无符号整型变量。大多数情况下,只给它们赋值$0$和$1$,这样的信号量称为二元信号量。粗略地讲,$0$对应于上了锁的互斥量,$1$对应于未上锁的互斥量。要把一个二元信号量用作互斥量的时候,需要先把信号量的初始值初始化为$1$,即开锁状态。在要保护的临界区前调用函数$sem_wait$,线程执行到$sem_wait$函数时,如果信号量为$0$,线程就会被阻塞。如果信号量是非$0$值,就减$1$后进入临界区。执行完临界区内对应的操作后,再调用$sem_post$对信号量的值加$1$。

​ 在这里,我们使用信号量对更新答案的部分设置临界区。

​ 代码及注释如下:

#include <bits/stdc++.h>
#include <pthread.h>
#include <semaphore.h>
using namespace std;

const int maxn=17;
const int INF=1e9+7;
int d[maxn][maxn];
int ans,tsp[(1<<maxn)][maxn];
vector <int> ans_route,route[(1<<maxn)][maxn];
vector <int> num[maxn];

int thread_count=6;
int n=maxn;
int start=0;
int number1;
sem_t sem,sem_num[maxn],sem_ans;

//初始化函数
void* clear(void* rank) {
    int my_rank=*(int*)(&rank);
    int local_n=(1<<n)/thread_count;
    int last_n=(1<<n)%thread_count;

    //计算该线程需要循环的部分
    //如果(1<<n)能够被thread_count整除,那该线程会被分配local_n次迭代
    //前last_n个线程还会额外被分配1次迭代
    int my_first_num=my_rank*local_n;
    my_first_num+=min(my_rank,last_n);
    int my_last_num=(my_rank+1)*local_n-1;
    my_last_num+=min(my_rank+1,last_n);

    for (int state=my_first_num;state<=my_last_num;state++)
        for (int now=0;now<n;now++)
            tsp[state][now]=INF;
    
    return NULL;
}

//state的分类函数
void* getnum(void* rank) {
    int my_rank=*(int*)(&rank);
    int local_n=(1<<n)/thread_count;
    int last_n=(1<<n)%thread_count;

    int my_first_num=my_rank*local_n;
    my_first_num+=min(my_rank,last_n);
    int my_last_num=(my_rank+1)*local_n-1;
    my_last_num+=min(my_rank+1,last_n);

    int cnt;
    for (int state=my_first_num;state<=my_last_num;state++) {
        cnt=0;
        for (int now=0;now<n;now++)
            if (state & (1<<now)) cnt++;
        if (cnt==0) continue;

        //对于每一类1的数量的state,都需要一个信号量
        //保证该类state被放入num的时候,不会被多个线程同时修改
        sem_wait(&sem_num[cnt-1]);
        num[cnt-1].push_back(state);
        sem_post(&sem_num[cnt-1]);
    }

    return NULL;
}

//动态规划计算答案部分
void* dp(void* rank) {
    int my_rank=*(int*)(&rank);
    int local_n=num[number1].size()/thread_count;
    int last_n=num[number1].size()%thread_count;

    int my_first_num=my_rank*local_n;
    my_first_num+=min(my_rank,last_n);
    int my_last_num=(my_rank+1)*local_n-1;
    my_last_num+=min(my_rank+1,last_n);

    for (int j=my_first_num;j<=my_last_num;j++) {
        int state=num[number1][j];
        for (int now=0;now<n;now++) {
            if ((state & (1<<start)) == 0) continue;
            if ((state & (1<<now)) == 0) continue;

            int laststate=state-(1<<now);

            if (laststate == 0) {
                tsp[state][start] = 0;
                route[state][start].push_back(start);
                continue;
            }

            for (int last=0;last<n;last++) {
                if ((laststate & (1<<last)) == 0) continue;
                if (tsp[state][now] > tsp[laststate][last] + d[last][now]) {
                    tsp[state][now] = tsp[laststate][last] + d[last][now];
                    route[state][now] = route[laststate][last];
                    route[state][now].push_back(now);
                }
            }

            if (state == (1<<n)-1) {
                tsp[state][now] += d[now][start];
                if (tsp[state][now] < ans) {
                    //使用信号量设置临界区
                    sem_wait(&sem_ans);
                    ans = tsp[state][now];
                    ans_route = route[state][now];
                    sem_post(&sem_ans);
                }
            }
        }
    }

    return NULL;
}

int main() {
    freopen("tsp2.txt","r",stdin);
    freopen("pthread_answer.txt","w",stdout);

    //创建并初始化pthread_t对象
    pthread_t* thread_handles;
    thread_handles=(pthread_t*)malloc(thread_count*sizeof(pthread_t));
    
    //初始化信号量
    sem_init(&sem, 0, 1);
    sem_init(&sem_ans, 0, 1);
    for (int i=0;i<n;i++)
        sem_init(&sem_num[i], 0, 1);

    for (int i=0;i<n;i++)
        for (int j=0;j<n;j++)
            scanf("%d",&d[i][j]);

    //初始化部分
    void* a1;
    int thread1;
    for (thread1=0;thread1<thread_count;thread1++)
        pthread_create(&thread_handles[thread1],NULL,clear,(void*)thread1);
    ans=INF;
    //ptrhead_join的意义是等待初始化完成后,再执行后面的部分
    for (thread1=0;thread1<thread_count;thread1++)
        pthread_join(thread_handles[thread1],&a1);
    //cerr<<"clear is finished!"<<endl;

    //state的分类部分
    void* a2;
    int thread2;
    for (thread2=0;thread2<thread_count;thread2++)
        pthread_create(&thread_handles[thread2],NULL,getnum,(void*)thread2);
    //ptrhead_join的意义是等待分类完成后,再执行后面的部分
    for (thread2=0;thread2<thread_count;thread2++)
        pthread_join(thread_handles[thread2],&a2);
    //cerr<<"getnum is finished!"<<endl;
    
    clock_t startTime,endTime;
    startTime = clock();

    //动态规划
    int thread3;
    for (number1=0;number1<maxn;number1++) {
        void* a3;
        for (thread3=0;thread3<thread_count;thread3++)
            pthread_create(&thread_handles[thread3],NULL,dp,(void*)thread3);
        for (thread3=0;thread3<thread_count;thread3++)
            pthread_join(thread_handles[thread3],&a3);
    }
    //cerr<<"dp is finished!"<<endl;

    endTime = clock();
    printf("The run time is: %lfs\n",(double)(endTime-startTime)/CLOCKS_PER_SEC);
    printf("The minimal route's length is: %d\n",ans);
    printf("The route is: ");
    for (auto i:ans_route) printf("%d->",i);
    printf("%d\n",ans_route[0]);

    return 0;
}

三、实验结果

使用小规模输入文件为tsp2.txt,文件内是城市之间的距离矩阵。

串行版本

​ 使用命令:

g++ -g -O2 -o serious serious.cpp

​ 生成可执行文件serious,运行:

./serious

​ 得到结果如下:

The run time is: 0.338000s
The minimal route's length is: 2085
The route is: 0->15->11->8->4->1->9->10->2->14->13->16->5->7->6->12->3->0

openMP版本

​ 使用命令:

g++ -g -O2 -fopenmp -o openmp openmp.cpp 

​ 生成可执行文件openmp,运行:

./openmp

​ 得到结果如下:

Number of thread: 6
The run time is: 0.161000s
The minimal route's length is: 2085
The route is: 0->15->11->8->4->1->9->10->2->14->13->16->5->7->6->12->3->0

pthread版本

​ 使用命令:

g++ -g -O2 -o pthread pthread.cpp -lpthread

​ 生成可执行文件pthread,运行:

./pthread

​ 得到结果如下:

The run time is: 0.296000s
The minimal route's length is: 2085
The route is: 0->15->11->8->4->1->9->10->2->14->13->16->5->7->6->12->3->0