1. 介绍
归并排序(Merge Sort)是一种比较高效、稳定的排序算法,它的基本思想是将两个有序的子序列合并成一个大的有序序列。它的时间复杂度为O(nlogn),比较适合于大数据量的排序。在本文中,我们将使用多线程在C++中实现归并排序,从而提高排序效率。
2. 归并排序算法原理
归并排序的基本思想是采用分治法,将待排序的序列分成若干个子序列,每个子序列都是有序的,然后再将有序的子序列合并成一个有序的序列,直到整个序列有序为止。归并排序的具体实现可以用递归或非递归的方式来实现。
2.1. 递归实现
递归实现归并排序,可以将序列反复的二分,直到每个分区只有一个元素,然后进行合并。
void mergeSort(int a[], int left, int right)
{
if(left < right)
{
int mid = (left + right) / 2;
mergeSort(a, left, mid);
mergeSort(a, mid + 1, right);
merge(a, left, mid, right);
}
}
其中,merge()函数是将两个有序的子序列合并为一个有序的序列。
2.2. 非递归实现
非递归实现归并排序,可以使用迭代的方式来实现,可以用一个变量记录序列的长度,每次将子序列长度倍增,并且对这些子序列进行两两合并,形成更大的有序子序列,直到整个序列有序。
void mergeSort(int a[], int len)
{
int left, mid, right;
int i;
for(i=1; i
{
left = 0;
while(left+i < len)
{
mid = left + i - 1;
right = mid + i;
if(right > len-1)
right = len-1;
merge(a, left, mid, right);
left = right + 1;
}
}
}
3. 多线程实现归并排序
归并排序是一个适合多线程并行的算法,因为在排序的过程中,每个子序列是相互独立的,彼此之间不会产生相互依赖的问题。这就为我们使用多线程来实现归并排序提供了机会,可以提高排序效率。
3.1. 线程池
在多线程实现中,我们可以使用线程池来管理线程,这样不仅可以减少线程的创建和销毁,还可以增加线程的局部性,提高运行速度。
线程池是一组已经创建好的线程集合,其中线程的数量是固定的。当需要使用线程时,就从线程池中取出一个线程来执行任务;当线程完成任务后,就将它归还到线程池里面,以备下次使用。
// 线程池类
class ThreadPool
{
public:
ThreadPool(int size);
~ThreadPool();
void Submit(function f);
private:
queue > que_; // 任务队列
vector threads_; // 工作线程组
mutex mtx_; // 保护任务队列的锁
condition_variable cv_; // 信号量
bool stop_; // 线程池是否停止
};
线程池的实现比较简单,就是一个任务队列,以及一组工作线程。当有新的任务加入到队列中时,就通知其中一个空闲的线程来执行,如果队列为空,就让线程等待信号。
3.2. 多线程归并排序
在多线程实现中,我们将原序列分成若干个子序列,并在不同的线程中对这些子序列进行归并排序,然后再将这些有序的子序列依次合并为一个有序的序列。
将序列分成若干个子序列:
void divide(int a[], int n, vector>& list, int thread_num)
{
int len = n / thread_num;
for(int i=0; i
{
vector tmp;
int left = i * len;
int right = (i + 1) * len - 1;
if(i == thread_num - 1)
right = n - 1;
for(int j=left; j<=right; j++)
tmp.push_back(a[j]);
list[i] = tmp; // 存储子序列
}
}
对每个子序列在不同的线程中进行归并排序:
void sort(vector>& list, ThreadPool& pool)
{
for(int i=0; i
{
function f = bind(&mergeSort, list[i].data(), 0, list[i].size()-1);
pool.Submit(f); // 提交任务到线程池
}
}
将有序的子序列合并为一个有序的序列:
void merge(vector>& list, int* result)
{
int thread_num = list.size();
int len = list[0].size();
for(int i=0, j=1; j<=len*thread_num; i = (i+2)%len, j++)
{
int a = list[i/len][i%len];
int b = list[(i+1)/len][(i+1)%len];
if(a > b)
{
swap(a, b); // 保证a小于b
}
int k = j / thread_num;
result[k] = a;
if(j == len * thread_num)
{
result[k+1] = b;
break;
}
if(i == len - 2)
{
int thread_id = (j / len) % thread_num;
int index = i + 1 + thread_id * len;
a = list[thread_id][index];
b = list[thread_id+1][0];
if(a > b)
{
swap(a, b); // 保证a小于b
}
result[k+1] = a;
}
}
}
4. 示例代码
下面是完整的归并排序的C++代码:
#include <iostream>
#include <vector>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>
using namespace std;
// 线程池类
class ThreadPool
{
public:
ThreadPool(int size);
~ThreadPool();
void Submit(function<void()> f);
private:
queue<function<void()>> que_; // 任务队列
vector<thread> threads_; // 工作线程组
mutex mtx_; // 保护任务队列的锁
condition_variable cv_; // 信号量
bool stop_; // 线程池是否停止
};
ThreadPool::ThreadPool(int size): stop_(false)
{
for(int i=0; i<size; i++)
{
threads_.emplace_back([this]()
{
while(1)
{
unique_lock<mutex> lock(mtx_);
while(!stop_ && que_.empty())
cv_.wait(lock);
if(stop_) return;
auto task = que_.front();
que_.pop();
lock.unlock();
task();
}
});
}
}
ThreadPool::~ThreadPool()
{
{
lock_guard<mutex> lock(mtx_);
stop_ = true;
}
cv_.notify_all();
for(auto& t:threads_)
{
t.join();
}
}
void ThreadPool::Submit(function<void()> f)
{
{
lock_guard<mutex> lock(mtx_);
que_.emplace(f);
}
cv_.notify_one();
}
// 归并函数
void merge(int a[], int left, int mid, int right)
{
int* tmp = new int[right - left + 1];
int i = left, j = mid + 1, k = 0;
while(i <= mid && j <= right)
{
if(a[i] <= a[j])
tmp[k++] = a[i++];
else
tmp[k++] = a[j++];
}
while(i <= mid)
tmp[k++] = a[i++];
while(j <= right)
tmp[k++] = a[j++];
for(i=0; i
{
a[left+i] = tmp[i];
}
delete[] tmp;
}
// 归并排序函数
void mergeSort(int a[], int left, int right)
{
if(left < right)
{
int mid = (left + right) / 2;
mergeSort(a, left, mid);
mergeSort(a, mid + 1, right);
merge(a, left, mid, right);
}
}
void divide(int a[], int n, vector<vector<int>>& list, int thread_num)
{
int len = n / thread_num;
for(int i=0; i<thread_num; i++)
{
vector<int> tmp;
int left = i * len;
int right = (i + 1) * len - 1;
if(i == thread_num - 1)
right = n - 1;
for(int j=left; j<=right; j++)
tmp.push_back(a[j]);
list[i] = tmp; // 存储子序列
}
}
void sort(vector<vector<int>>& list, ThreadPool& pool)
{
for(int i=0; i<list.size(); i++)
{
function<void()> f = bind(&mergeSort, list[i].data(), 0, list[i].size()-1);
pool.Submit(f); // 提交任务到线程池
}
}
void merge(vector<vector<int>>& list, int* result)
{
int thread_num = list.size();
int len = list[0].size();
for(int i=0, j=1; j<=len*thread_num; i = (i+2)%len, j++)
{
int a = list[i/len][i%len];
int b = list[(i+1)/len][(i+1)%len];
if(a > b)
{
swap(a, b); // 保证a小于b
}
int k = j / thread_num;
result[k] = a;
if(j == len * thread_num)
{
result[k+1] = b;
break;
}
if(i == len - 2)
{
int thread_id = (j / len) % thread_num;
int index = i + 1 + thread_id * len;
a = list[thread_id][index];
b = list[thread_id+1][0];
if(a > b)
{
swap(a, b); // 保证a小于b
}
result[k+1] = a;
}
}
}
void mergeSortMT(int a[], int n, int thread_num)
{
ThreadPool pool(thread_num);
vector<vector<int>> list(thread_num);
divide(a, n, list, thread_num);
sort(list, pool);
int* result = new int[n];
merge(list, result);
for(int i=0; i<n; i++)
{
a[i] = result[i];
}
delete[] result;
}
int main()
{
int a[] = {38, 27, 43, 3, 9, 82, 10};
int n = sizeof(a) / sizeof(int);
mergeSortMT(a, n, 4);
for(int i=0; i<n; i++) cout << a[i] << " ";
return 0;
}
5. 总结
本文介绍了归并排序算法的基本原理和多线程实现方法。在实现的过程中,我们使用线程池管理线程,同时将序列分成若干个子序列,然后在不同的线程中对子序列进行归并排序。最后,将有序的子序列进行合并,形成一个有序的序列。多线程实现有助于提高排序效率,同时也体现了程序员对于并行化程序设计的执着追求。