一、实验题目
实现tsp问题,要求实现一个串行版本和MPI,OpenMP,pthread中的任意两种版本。
tsp问题:树型搜索问题(Tree Search Problem),也叫做旅行商问题(Traveling Salesman Problem)。假设有一个旅行商人要拜访n个城市,他必须选择所要走的路径,路径的限制是每个城市只能拜访一次,而且最后要回到原来出发的城市。路径的选择目标是要求得的路径路程为所有路径之中的最小值。
二、实验内容
串行版本
众所周知,搜索可以通过记忆化,减少重复的搜索内容,从而提高运行效率。而记忆化搜索,本质上是可以写成动态规划的形式的。因此,该实验使用动态规划来实现该TSP问题。具体来说,应该叫做状态压缩动态规划。
因为始终是需要走一个环的,因此无论起点在哪个点都是等价的。不妨假设起点$start=0$,即从$0$号节点开始。设DP函数$f[state][now]$表示当前正在$now$节点,并且已经访问了$state$这个状态的最短路径的长度。具体来说,$state$是一个$n$位的二进制数,其中第$i$位为$1$/$0$分别表示$i$号节点是/否已经被访问。
有了状态表示,可以很清晰地写出状态转移方程:\(f[state][now]=min\lbracef[state'][last]+d[last][now]\rbrace\)
满足:
-
$((1«now) \& state)>0$,即$now$这个节点必须已被$state$访问
-
$state’=state-(1«now)$,即$state’$是还未访问$now$前的状态
-
$((1«last) \& state’)>0$,即$last$这个节点必须已被状态$state’$访问
-
$d[last][now]$表示一条从$last$到$now$的直接路径
-
初始值$f[1][0]=0$,表示已经访问$0$号节点的最短路径长度为$0$
在状态转移的时候有一个小技巧,当然这个技巧本身和串行代码是没有关系的,为了保持和并行代码的一致性而修改的,具体缘由会在后面部分阐述。观察状态转移方程,可以发现$f[state][now]$实际上是从$f[state’][last]$转移过来的。而$state$和$state’$的关系是:$state$中$1$的数量会比$state’$中1的数量多$1$。因此,在枚举状态$state$的时候,采用$state$中$1$的数量从小到到枚举。
通过递推填写函数$f$,即可计算出从$0$号节点出发遍历所有点的最短路径。由于该问题需要环绕一圈回到出发点,即需要回到$0$号点,最终答案为:$min_{0\leq k<n}\lbrace f[2^{n}-1][k]+d[k][0]\rbrace$
由于问题还需要求解出最短路径的方案,因此考虑使用 $route[state][now]$ 表示在表示当前正在$now$节点,并且已经访问了 $state$ 这个状态的最短路径。假设在$f[state][now]$转移的时候,是从状态$f[state’][last]$转移过来的,则\(route[state][now]=\lbrace route[state'][last],now\rbrace\)
代码及注释如下:
#include <bits/stdc++.h>
using namespace std;
const int maxn=17; //点数
const int INF=1e9+7; //无穷大
int d[maxn][maxn]; //两点间的距离
int ans,tsp[(1<<maxn)][maxn]; //ans表示最短路径长度,tsp即上文中的f
vector <int> ans_route,route[(1<<maxn)][maxn]; //ans_route是最优路径,route入上文
vector <int> num[maxn]; //num[i]表示1的数量为i-1的state
int main() {
//从tsp2.txt中读取点对的距离
freopen("tsp2.txt","r",stdin);
//将结果输出至serious_answer.txt
freopen("serious_answer.txt","w",stdout);
//规定点数为maxn
int n=maxn;
//将点对距离保存至数组d中
for (int i=0;i<n;i++)
for (int j=0;j<n;j++)
scanf("%d",&d[i][j]);
//将数组中的每个状态都设为无穷大
int now,state;
for (now=0;now<n;now++)
for (state=0;state<(1<<n);state++)
tsp[state][now]=INF;
//初始答案为无穷大
ans=INF;
//cnt记录state中1的数量
int cnt;
for (state=0;state<(1<<n);state++) {
cnt=0;
//计算state中有多少个1
for (now=0;now<n;now++)
if (state & (1<<now)) cnt++;
//将state加入到对应1的个数的队列
num[cnt-1].push_back(state);
}
//开始计算记录时间
clock_t startTime,endTime;
startTime = clock();
//起始点默认为0
int start=0;
int laststate,last,i,j;
for (i=0;i<maxn;i++) {
for (j=0;j<num[i].size();j++) {
state=num[i][j];
for (now=0;now<n;now++) {
//state需要满足的条件
if ((state & (1<<start)) == 0) continue;
if ((state & (1<<now)) == 0) continue;
//laststate即未访问now前的上一个状态
laststate=state-(1<<now);
//初始化起始点
if (laststate == 0) {
tsp[state][start] = 0;
route[state][start].push_back(start);
continue;
}
//状态转移
for (last=0;last<n;last++) {
if ((laststate & (1<<last)) == 0) continue;
if (tsp[state][now] > tsp[laststate][last] + d[last][now]) {
tsp[state][now] = tsp[laststate][last] + d[last][now];
route[state][now] = route[laststate][last];
route[state][now].push_back(now);
}
}
//更新答案
if (state == (1<<n)-1) {
tsp[state][now] += d[now][start];
if (tsp[state][now] < ans) {
ans = tsp[state][now];
ans_route = route[state][now];
}
}
}
}
}
//结束计时并输出结果
endTime = clock();
printf("The run time is: %lfs\n",(double)(endTime-startTime)/CLOCKS_PER_SEC);
printf("The minimal route's length is: %d\n",ans);
printf("The route is: ");
for (auto i:ans_route) printf("%d->",i);
printf("%d\n",ans_route[0]);
}
openMP版本
考虑优化串行版本的代码。
先研究串行版本的时间开销。串行版本的时间复杂度为:$O(2^nnn*n)$,实际上最后一个$n$是在复制路径的过程中产生的,当然可以避免,但是没有必要,因为三个版本的代码只需要保持结构的统一即可进行时间的比较。但是秉持科学严谨的态度,在此阐述一下如何优化掉最后的一个$n$。
在复制路径的时候,我们是采取将$route[state’][last]$中记录的整整一条路径完全交付给$route[state][now]$。实际上,这个过程是没有必要的。考虑这条路径中那些节点是$route[state][now]$需要的,实际上,只需要知道$now$的上一个访问节点是$last$,即可推断出整条路径。原因为:
-
假设最优的答案是$f[state][now]$,则最后一个访问的节点必然是$now$。通过$route[state][now]$,可以得知$now$的上一个访问节点是$last$;
-
因此上一个状态为$f[state-(1«now)][last]$,不妨设该状态为新的一个$f[state][now]$。重复第一个步骤,我们又可以找到访问$last$前的上一个节点;
-
重复步骤1和步骤2
-
直到$state=0$,即可还原出整条路径。
考虑到$n$其实很小,主要的时间开销在于$2^n$的时间花费,这个花费是用于枚举状态$state$产生的。但是不难发现,$state$是具有数据依赖性的,计算$f[state][now]$是需要依赖上一个状态$state’$的。这就意味着,我们无法简单地对下面的$for$语句并行:
for (int state=0;state<(1<<n);state++)
简单地切分会导致计算某一个$state$的时候,$state’$的答案并未被计算出来。因此就有了上文中的那个小技巧。因为计算$state$的结果的时候,仅仅只需要依赖于比$state$少一个$1$的$state’$的答案。因此,对于具有相同的$1$的个数的$state$,其计算过程都是没有数据依赖的。就是因为这个特性,上文才将所有的$state$根据$1$的数量划分成17个部分。
因此,对于每一个部分的枚举$state$的$for$语句,都可以使用下属语句优化:
# pragma omp parallel for num_threads(thread_count)
$parallel for$指令可以生成一组线程来执行后面的结构化代码块。在该被$parallel for$指令并行化的$for$循环中,线程间的缺省划分方式是由系统决定的。大部分系统会粗略地使用块划分,即如果有$(1«n)$次迭代,则大约$(1«n)/threadcount$次迭代会被分配到线程$0$,接下来约$(1«n)/threadcount$次迭代会被分配到线程$1$,以此类推。由于我们并不关心具体是如何分配的,因此可以使用$parallel for$指令。
值得注意的地方有两个:
-
在$parallel for$指令并行化的$for$循环中,变量的作用域的问题。并行计算时循环变量应该是私有的,这样每个线程会有自己的变量副本,从而不会影响其他线程的运行。因此,在$for$循环块中使用的局部变量都应该声明为私有变量,在openMP中,使用$private$指令。
-
$ans$和$ansroute$每次只能被一个线程修改。这其实是一个竞争条件,多次线程试图访问并更新同一个共享资源,可能会导致错误。因此,我们需要将$ans$和$ansroute$更新的部分放入临界区内,在openMP中,使用$critical$指令。
代码及注释如下:
#include <bits/stdc++.h>
#include <omp.h>
using namespace std;
const int maxn=17;
const int INF=1e9+7;
int d[maxn][maxn];
int ans,tsp[(1<<maxn)][maxn];
vector <int> ans_route,route[(1<<maxn)][maxn];
vector <int> num[maxn];
int main() {
freopen("tsp2.txt","r",stdin);
freopen("openmp_answer.txt","w",stdout);
//使用六个线程
int thread_count=6;
int n=maxn;
for (int i=0;i<n;i++)
for (int j=0;j<n;j++)
scanf("%d",&d[i][j]);
int now,state;
//初始化操作可以使用parallel for并行化
//由于每个线程不会访问到tsp的同一个位置,不需要加入临界区
# pragma omp parallel for num_threads(thread_count) private(state,now)
for (now=0;now<n;now++)
for (state=0;state<(1<<n);state++)
tsp[state][now]=INF;
ans=INF;
int cnt;
//同样的,state的分类也可以使用parallel for并行化
# pragma omp parallel for num_threads(thread_count) private(cnt,state,now)
for (state=0;state<(1<<n);state++) {
cnt=0;
for (now=0;now<n;now++)
if (state & (1<<now)) cnt++;
//num的不能同时被多个线程更新,需要加入临界区
# pragma omp critical
num[cnt-1].push_back(state);
}
clock_t startTime,endTime;
startTime = clock();
int start=0;
int laststate,last,i,j;
for (i=0;i<maxn;i++) {
//对于每一类state,创建thread_count个线程并行化
//结构块中的j/state/now/laststate/last变量都需要声明为线程的私有变量
# pragma omp parallel for num_threads(thread_count) private(j,state,now,laststate,last)
for (j=0;j<num[i].size();j++) {
state=num[i][j];
for (now=0;now<n;now++) {
if ((state & (1<<start)) == 0) continue;
if ((state & (1<<now)) == 0) continue;
laststate=state-(1<<now);
if (laststate == 0) {
tsp[state][start] = 0;
route[state][start].push_back(start);
continue;
}
for (last=0;last<n;last++) {
if ((laststate & (1<<last)) == 0) continue;
if (tsp[state][now] > tsp[laststate][last] + d[last][now]) {
tsp[state][now] = tsp[laststate][last] + d[last][now];
route[state][now] = route[laststate][last];
route[state][now].push_back(now);
}
}
if (state == (1<<n)-1) {
tsp[state][now] += d[now][start];
if (tsp[state][now] < ans) {
//更新答案的部分需要加入临界区
# pragma omp critical
ans = tsp[state][now];
# pragma omp critical
ans_route = route[state][now];
}
}
}
}
}
endTime = clock();
printf("Number of thread: %d\n",thread_count);
printf("The run time is: %lfs\n",(double)(endTime-startTime)/CLOCKS_PER_SEC);
printf("The minimal route's length is: %d\n",ans);
printf("The route is: ");
for (auto i:ans_route) printf("%d->",i);
printf("%d\n",ans_route[0]);
}
pthread版本
和openMP一样,考虑优化串行版本的代码。需要并行的部分其实和openMP相同,现在考虑将openMP改写成pthread。
在写代码前,需要先了解一下pthread创建线程的方式。
-
启动线程
在pthread中,通常使用$pthread_create$函数来生成线程。其语法为:
int pthread_create( pthread_t* thread_p /* out */, const pthread_attr_t* attr_p /* in */, void* (*start_rountine)(void*) /* in */, void* arg_p /* in */);
第一个参数是一个指针,指向对应的$pthread_t$对象。$pthread_t$数据结构用来存储线程的专有信息。
第二个不用,在函数调用的时候把$NULL$传递给参数。
第三个参数是线程即将要运行的函数。
第四个参数也是一个指针,指向传给函数$start_rountine$的参数。
-
停止线程
pthread中,函数$pthread_join$将等待$pthread_t$对象所关联的那个线程结束。其语法为:
int pthread_join( pthread_t thread /* in */ void** ret_val_p /* out */);
第二个参数可以接收任意有$pthread_t$对象所关联的那个线程的返回值。
有了这些基础知识后,可以开始代码前的准备工作了。
首先,需要创建$thread_count$个$pthread_t$对象,用于存储线程的专有信息。
pthread_t* thread_handles;
thread_handles=(pthread_t*)malloc(thread_count*sizeof(pthread_t));
其次,将要并行的部分改写成函数的形式。
在pthread中,设置临界区的方法有很多中,常用的有三种。
-
忙等待。即当$i$号线程需要执行临界区中的语句的时候,先等待$i-1$号线程执行完成。
-
互斥量。互斥量是一个特殊类型的变量,通过某些特殊类型的函数,限制每次只能有一个线程进入临界区。
初始化函数如下:
int pthread_mutex_init( pthread_mutex_t* mutex_p /* out */ const pthread_mutexattr_r* attr_p /* in */);
使用完互斥量后,销毁函数如下:
int pthread_mutex_destroy(pthread_mutex_t* mutex_p);
具体使用方法如下:
pthread_mutex_lock(&mutex); /*临界区*/ pthread_mutex_unlock(&mutex);
-
信号量。信号量可以认为是一种特殊的$unsigned int$无符号整型变量。大多数情况下,只给它们赋值$0$和$1$,这样的信号量称为二元信号量。粗略地讲,$0$对应于上了锁的互斥量,$1$对应于未上锁的互斥量。要把一个二元信号量用作互斥量的时候,需要先把信号量的初始值初始化为$1$,即开锁状态。在要保护的临界区前调用函数$sem_wait$,线程执行到$sem_wait$函数时,如果信号量为$0$,线程就会被阻塞。如果信号量是非$0$值,就减$1$后进入临界区。执行完临界区内对应的操作后,再调用$sem_post$对信号量的值加$1$。
在这里,我们使用信号量对更新答案的部分设置临界区。
代码及注释如下:
#include <bits/stdc++.h>
#include <pthread.h>
#include <semaphore.h>
using namespace std;
const int maxn=17;
const int INF=1e9+7;
int d[maxn][maxn];
int ans,tsp[(1<<maxn)][maxn];
vector <int> ans_route,route[(1<<maxn)][maxn];
vector <int> num[maxn];
int thread_count=6;
int n=maxn;
int start=0;
int number1;
sem_t sem,sem_num[maxn],sem_ans;
//初始化函数
void* clear(void* rank) {
int my_rank=*(int*)(&rank);
int local_n=(1<<n)/thread_count;
int last_n=(1<<n)%thread_count;
//计算该线程需要循环的部分
//如果(1<<n)能够被thread_count整除,那该线程会被分配local_n次迭代
//前last_n个线程还会额外被分配1次迭代
int my_first_num=my_rank*local_n;
my_first_num+=min(my_rank,last_n);
int my_last_num=(my_rank+1)*local_n-1;
my_last_num+=min(my_rank+1,last_n);
for (int state=my_first_num;state<=my_last_num;state++)
for (int now=0;now<n;now++)
tsp[state][now]=INF;
return NULL;
}
//state的分类函数
void* getnum(void* rank) {
int my_rank=*(int*)(&rank);
int local_n=(1<<n)/thread_count;
int last_n=(1<<n)%thread_count;
int my_first_num=my_rank*local_n;
my_first_num+=min(my_rank,last_n);
int my_last_num=(my_rank+1)*local_n-1;
my_last_num+=min(my_rank+1,last_n);
int cnt;
for (int state=my_first_num;state<=my_last_num;state++) {
cnt=0;
for (int now=0;now<n;now++)
if (state & (1<<now)) cnt++;
if (cnt==0) continue;
//对于每一类1的数量的state,都需要一个信号量
//保证该类state被放入num的时候,不会被多个线程同时修改
sem_wait(&sem_num[cnt-1]);
num[cnt-1].push_back(state);
sem_post(&sem_num[cnt-1]);
}
return NULL;
}
//动态规划计算答案部分
void* dp(void* rank) {
int my_rank=*(int*)(&rank);
int local_n=num[number1].size()/thread_count;
int last_n=num[number1].size()%thread_count;
int my_first_num=my_rank*local_n;
my_first_num+=min(my_rank,last_n);
int my_last_num=(my_rank+1)*local_n-1;
my_last_num+=min(my_rank+1,last_n);
for (int j=my_first_num;j<=my_last_num;j++) {
int state=num[number1][j];
for (int now=0;now<n;now++) {
if ((state & (1<<start)) == 0) continue;
if ((state & (1<<now)) == 0) continue;
int laststate=state-(1<<now);
if (laststate == 0) {
tsp[state][start] = 0;
route[state][start].push_back(start);
continue;
}
for (int last=0;last<n;last++) {
if ((laststate & (1<<last)) == 0) continue;
if (tsp[state][now] > tsp[laststate][last] + d[last][now]) {
tsp[state][now] = tsp[laststate][last] + d[last][now];
route[state][now] = route[laststate][last];
route[state][now].push_back(now);
}
}
if (state == (1<<n)-1) {
tsp[state][now] += d[now][start];
if (tsp[state][now] < ans) {
//使用信号量设置临界区
sem_wait(&sem_ans);
ans = tsp[state][now];
ans_route = route[state][now];
sem_post(&sem_ans);
}
}
}
}
return NULL;
}
int main() {
freopen("tsp2.txt","r",stdin);
freopen("pthread_answer.txt","w",stdout);
//创建并初始化pthread_t对象
pthread_t* thread_handles;
thread_handles=(pthread_t*)malloc(thread_count*sizeof(pthread_t));
//初始化信号量
sem_init(&sem, 0, 1);
sem_init(&sem_ans, 0, 1);
for (int i=0;i<n;i++)
sem_init(&sem_num[i], 0, 1);
for (int i=0;i<n;i++)
for (int j=0;j<n;j++)
scanf("%d",&d[i][j]);
//初始化部分
void* a1;
int thread1;
for (thread1=0;thread1<thread_count;thread1++)
pthread_create(&thread_handles[thread1],NULL,clear,(void*)thread1);
ans=INF;
//ptrhead_join的意义是等待初始化完成后,再执行后面的部分
for (thread1=0;thread1<thread_count;thread1++)
pthread_join(thread_handles[thread1],&a1);
//cerr<<"clear is finished!"<<endl;
//state的分类部分
void* a2;
int thread2;
for (thread2=0;thread2<thread_count;thread2++)
pthread_create(&thread_handles[thread2],NULL,getnum,(void*)thread2);
//ptrhead_join的意义是等待分类完成后,再执行后面的部分
for (thread2=0;thread2<thread_count;thread2++)
pthread_join(thread_handles[thread2],&a2);
//cerr<<"getnum is finished!"<<endl;
clock_t startTime,endTime;
startTime = clock();
//动态规划
int thread3;
for (number1=0;number1<maxn;number1++) {
void* a3;
for (thread3=0;thread3<thread_count;thread3++)
pthread_create(&thread_handles[thread3],NULL,dp,(void*)thread3);
for (thread3=0;thread3<thread_count;thread3++)
pthread_join(thread_handles[thread3],&a3);
}
//cerr<<"dp is finished!"<<endl;
endTime = clock();
printf("The run time is: %lfs\n",(double)(endTime-startTime)/CLOCKS_PER_SEC);
printf("The minimal route's length is: %d\n",ans);
printf("The route is: ");
for (auto i:ans_route) printf("%d->",i);
printf("%d\n",ans_route[0]);
return 0;
}
三、实验结果
使用小规模输入文件为tsp2.txt,文件内是城市之间的距离矩阵。
串行版本
使用命令:
g++ -g -O2 -o serious serious.cpp
生成可执行文件serious,运行:
./serious
得到结果如下:
The run time is: 0.338000s
The minimal route's length is: 2085
The route is: 0->15->11->8->4->1->9->10->2->14->13->16->5->7->6->12->3->0
openMP版本
使用命令:
g++ -g -O2 -fopenmp -o openmp openmp.cpp
生成可执行文件openmp,运行:
./openmp
得到结果如下:
Number of thread: 6
The run time is: 0.161000s
The minimal route's length is: 2085
The route is: 0->15->11->8->4->1->9->10->2->14->13->16->5->7->6->12->3->0
pthread版本
使用命令:
g++ -g -O2 -o pthread pthread.cpp -lpthread
生成可执行文件pthread,运行:
./pthread
得到结果如下:
The run time is: 0.296000s
The minimal route's length is: 2085
The route is: 0->15->11->8->4->1->9->10->2->14->13->16->5->7->6->12->3->0