Bonita Montero
unread,Nov 16, 2019, 1:36:43 AM11/16/19You do not have permission to delete messages in this group
Either email addresses are anonymous for this group or you need the view member email addresses permission to view the original message
to
Here, Amine, is a generic parallel merge-sort as well as a parallel
quicksort algorithm. The merge-sort algorithm is faster than the
Microsoft STL's stable_sort, but the quicksort is slower than the MS
STL's implementation. So improve the latter to make it faster than the
MS implementation! I don't know how to make it faster.
#include <algorithm>
#include <array>
#include <cstddef>
#include <cstdlib>
#include <exception>
#include <execution>
#include <functional>
#include <iterator>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>
/// Scope guard: calls a referenced callable when the guard is destroyed,
/// unless the call has already been made through invoke_and_disable().
///
/// The guard only stores a reference, so the callable must outlive it.
/// Copying is disabled because a copy would invoke the callable a second time.
template<typename T>
struct invoke_on_destruct
{
private:
    T &m_t;
    bool m_enabled;

public:
    /// @param t callable to invoke; held by reference, not copied
    explicit invoke_on_destruct( T &t ) :
        m_t( t ), m_enabled( true )
    {
    }

    invoke_on_destruct( invoke_on_destruct const & ) = delete;
    invoke_on_destruct &operator =( invoke_on_destruct const & ) = delete;

    /// Invokes the callable if it has not been run via invoke_and_disable().
    ~invoke_on_destruct()
    {
        if( m_enabled )
            m_t();
    }

    /// Invokes the callable now and suppresses the call in the destructor.
    /// The guard is disabled only after the callable returned; if it throws,
    /// the destructor will invoke it once more.
    void invoke_and_disable()
    {
        m_t();
        m_enabled = false;
    }
};
/// Merge sort that can split its work across several threads.
///
/// Constructing an object performs the sort; the object itself only carries
/// the comparator, the allocator and the exceptions collected from workers.
///
/// @tparam It        iterator type of the range to sort
/// @tparam Cmp       comparison functor, defaults to std::less of the value type
/// @tparam Allocator allocator used for the temporary buffers and, rebound,
///                   for the list of collected exceptions
template<typename It,
         typename Cmp = std::less<typename std::iterator_traits<It>::value_type>,
         typename Allocator = std::allocator<typename std::iterator_traits<It>::value_type>>
class merge_sort
{
private:
    // allocator rebound to std::exception_ptr and the container built on it
    using ep_allocator = typename std::allocator_traits<Allocator>::template rebind_alloc<std::exception_ptr>;
    using exceptions_vector = std::vector<std::exception_ptr, ep_allocator>;

public:
    /// Sorts [start, end) with cmp.
    /// @param nThreads number of threads to use; 0 selects the hardware thread count
    /// @param alloc    allocator for temporary storage
    merge_sort( It start, It end, Cmp const &cmp = Cmp(), unsigned nThreads = 0,
                Allocator const &alloc = Allocator() );

    /// Thrown when more than one exception was collected during a sort.
    /// Iterate begin()..end() to reach the individual std::exception_ptr
    /// values; catch by reference so the list is not copied.
    struct par_exception : public std::exception
    {
        using iterator = typename exceptions_vector::iterator;

        ~par_exception();
        iterator begin();
        iterator end();

    private:
        // only merge_sort may create this exception
        friend class merge_sort;
        par_exception( exceptions_vector &&exceptions );

        exceptions_vector m_exceptions;
    };

private:
    Cmp m_cmp;
    Allocator m_alloc;
    // guards m_exceptions, which worker threads append to
    std::mutex m_excMtx;
    exceptions_vector m_exceptions;

    // splits the range between threads, sorts the parts and merges them
    template<typename UpIt>
    void parRecursion( UpIt start, UpIt end, unsigned nThreads );

    // single-threaded merge sort using buf as scratch space
    template<typename UpIt, typename BufIt>
    void recursion( UpIt start, UpIt end, BufIt buf );

    // merges the runs [leftBuf, rightBuf) and [rightBuf, bufEnd) into start
    template<typename UpIt, typename BufIt>
    void merge( UpIt start, BufIt leftBuf, BufIt rightBuf, BufIt bufEnd );
};
/// Sorts [start, end) with cmp using up to nThreads threads.
///
/// @param nThreads 0 selects the hardware thread count; larger values are
///                 clamped to the hardware thread count
/// @throws the original exception if exactly one was raised during the sort
/// @throws par_exception if several exceptions were raised
template<typename It, typename Cmp, typename Allocator>
merge_sort<It, Cmp, Allocator>::merge_sort( It start, It end, Cmp const &cmp,
                                            unsigned nThreads, Allocator const &alloc ) :
    m_cmp( cmp ),
    m_alloc( alloc ),
    m_exceptions( ep_allocator( alloc ) )
{
    using namespace std;
    // nThreads == 0 -> number of threads = hardware threads
    unsigned hwThreads = thread::hardware_concurrency();
    if( hwThreads == 0 )
        hwThreads = 1;
    if( nThreads == 0 || nThreads > hwThreads )
        nThreads = hwThreads;
    // Reserve room up front so that recording an exception never allocates.
    // parRecursion may start up to 2 * nThreads - 2 worker threads in total
    // (both halves can get a thread of their own on every level), each of
    // which can record one exception, plus the one recorded below.
    m_exceptions.reserve( 2 * static_cast<size_t>( nThreads ) );
    try
    {
        parRecursion( start, end, nThreads );
    }
    catch( ... )
    {
        // all worker threads have been joined during unwinding
        if( m_exceptions.empty() )
            // single exception from this thread: rethrow it unchanged
            throw;
        // workers failed as well: report everything together
        m_exceptions.emplace_back( current_exception() );
        throw par_exception( move( m_exceptions ) );
    }
    // Report worker exceptions outside of the try-block; inside it, the
    // handler above would catch the rethrown exception and record it twice.
    if( m_exceptions.size() > 1 )
        throw par_exception( move( m_exceptions ) );
    if( m_exceptions.size() == 1 )
        rethrow_exception( m_exceptions.front() );
}
/// Single-threaded merge sort of [start, end).
///
/// The range is copied to buf, both halves of the copy are sorted
/// recursively (using the space behind the copy as their scratch area) and
/// the sorted halves are merged back into [start, end).
///
/// @param buf scratch space; must hold the copy of this range plus whatever
///            the nested calls need behind it
template<typename It, typename Cmp, typename Allocator>
template<typename UpIt, typename BufIt>
void merge_sort<It, Cmp, Allocator>::recursion( UpIt start, UpIt end, BufIt buf )
{
    // zero or one element is already sorted
    if( end - start <= 1 )
        return;

    std::copy( start, end, buf );

    std::size_t const count = end - start;
    BufIt const middle = buf + count / 2;
    BufIt const scratch = buf + count;

    // sort both halves of the copy, then merge them into the original range
    recursion( buf, middle, scratch );
    recursion( middle, scratch, scratch );
    merge( start, buf, middle, scratch );
}
/// Merges the sorted runs [leftBuf, rightBuf) and [rightBuf, bufEnd) into the
/// output range beginning at start.
///
/// Either run may be empty. Equal elements are taken from the left run
/// first, so the relative order of equivalent elements is preserved.
template<typename It, typename Cmp, typename Allocator>
template<typename UpIt, typename BufIt>
void merge_sort<It, Cmp, Allocator>::merge( UpIt start, BufIt leftBuf,
                                            BufIt rightBuf, BufIt bufEnd )
{
    BufIt leftBufEnd = rightBuf;
    UpIt wrtBack = start;
    // The main loop dereferences both run heads unconditionally, so an empty
    // run has to be handled before entering it.
    if( leftBuf == leftBufEnd || rightBuf == bufEnd )
    {
        for( ; leftBuf != leftBufEnd; ++leftBuf )
            *wrtBack++ = *leftBuf;
        for( ; rightBuf != bufEnd; ++rightBuf )
            *wrtBack++ = *rightBuf;
        return;
    }
    for( ; ; )
        // take from the right run only if its head is strictly smaller
        if( m_cmp( *rightBuf, *leftBuf ) )
        {
            *wrtBack++ = *rightBuf;
            if( ++rightBuf == bufEnd )
            {
                // right run exhausted: flush the rest of the left run
                // (faster for small numbers of elements than std::copy)
                do
                    *wrtBack++ = *leftBuf;
                while( ++leftBuf != leftBufEnd );
                break;
            }
        }
        else
        {
            *wrtBack++ = *leftBuf;
            if( ++leftBuf == leftBufEnd )
            {
                // left run exhausted: flush the rest of the right run
                do
                    *wrtBack++ = *rightBuf;
                while( ++rightBuf != bufEnd );
                break;
            }
        }
}
/// Sorts [start, end) using up to nThreads threads.
///
/// With one thread the range is sorted by recursion(). Otherwise the range
/// is copied into a split buffer whose two parts are sorted concurrently and
/// then merged back into [start, end). Exceptions raised in worker threads
/// are stored in m_exceptions; if any are present after joining, the merge
/// step is skipped and the range is left as it was on entry.
template<typename It, typename Cmp, typename Allocator>
template<typename UpIt>
void merge_sort<It, Cmp, Allocator>::parRecursion( UpIt start, UpIt end,
                                                   unsigned nThreads )
{
    using namespace std;
    using T = typename iterator_traits<It>::value_type;
    size_t n = end - start;
    // Nothing to do for fewer than two elements. This also guarantees that
    // both parts of the split below are non-empty.
    if( n < 2 )
        return;
    if( nThreads <= 1 )
    {
        vector<T, Allocator> buf( m_alloc );
        size_t bs = 0;
        // calculate buffer-/stack-size: every recursion level needs room for
        // its own copy, and the larger half determines the next level
        for( size_t split = n; split > 1; bs += split, split -= split / 2 );
        buf.resize( bs );
        recursion( start, end, buf.begin() );
        return;
    }
    // split-buffer holding a copy of the input
    vector<T, Allocator> buf( start, end, m_alloc );
    // array for left-recursion and right-recursion thread
    array<thread, 2> threads;
    // automatically join threads when an exception is thrown
    auto joinThreads = [&threads]()
    {
        for( thread &thr : threads )
            // try to join infinitely because the thread
            // will continue to access our buffer
            if( thr.get_id() != thread::id() )
                for( ; ; )
                    try
                    {
                        thr.join();
                        break;
                    }
                    catch( ... )
                    {
                    }
    };
    invoke_on_destruct<decltype(joinThreads)> iodJoin( joinThreads );
    // iterator-type for our split-buffer
    using BufIt = typename vector<T, Allocator>::iterator;
    // proxy thread-lambda for our new threads
    auto prProxy = [this]( BufIt start, BufIt end, unsigned nThreads )
    {
        try
        {
            parRecursion( start, end, nThreads );
        }
        catch( ... )
        {
            // remember exception in our exception-array
            unique_lock<mutex> excLock( m_excMtx );
            m_exceptions.emplace_back( current_exception() );
        }
    };
    unsigned rightThreads = nThreads / 2,
             leftThreads = nThreads - rightThreads;
    // if the number of threads is uneven the left side has one thread more,
    // so give it proportionally more input
    size_t left = (size_t)(n * ((double)leftThreads / nThreads));
    BufIt leftBuf = buf.begin(),
          leftBufEnd = buf.begin() + left,
          rightBuf = leftBufEnd,
          bufEnd = buf.begin() + n;
    // start left thread
    threads[0] = thread( prProxy, leftBuf, leftBufEnd, leftThreads );
    if( rightThreads > 1 )
        // start right thread
        threads[1] = thread( prProxy, rightBuf, bufEnd, rightThreads );
    else
        // there's only one thread right, so we do it on our own
        parRecursion( rightBuf, bufEnd, 1 );
    // join threads
    iodJoin.invoke_and_disable();
    // stop if there are any exceptions from the threads
    {
        unique_lock<mutex> excLock( m_excMtx );
        if( m_exceptions.size() )
            return;
    }
    // merge split-buffer back into input-buffer
    merge( start, leftBuf, rightBuf, bufEnd );
}
// Out-of-line destructor of merge_sort's aggregate exception.
template<typename It, typename Cmp, typename Allocator>
merge_sort<It, Cmp, Allocator>::par_exception::~par_exception() = default;

// Takes ownership of the list of exceptions collected during a sort.
template<typename It, typename Cmp, typename Allocator>
merge_sort<It, Cmp, Allocator>::par_exception::par_exception( exceptions_vector &&exceptions ) :
    m_exceptions( std::move( exceptions ) )
{
}

// Iterator to the first collected std::exception_ptr.
template<typename It, typename Cmp, typename Allocator>
auto merge_sort<It, Cmp, Allocator>::par_exception::begin() -> iterator
{
    return m_exceptions.begin();
}

// Iterator past the last collected std::exception_ptr.
template<typename It, typename Cmp, typename Allocator>
auto merge_sort<It, Cmp, Allocator>::par_exception::end() -> iterator
{
    return m_exceptions.end();
}
#if defined(_MSC_VER)
#pragma warning(disable: 26444)
#endif
/// Quicksort that can split its work across several threads.
///
/// Constructing an object performs the sort; the object itself only carries
/// the comparator, the allocator and the exceptions collected from workers.
///
/// @tparam It        iterator type of the range to sort
/// @tparam Cmp       comparison functor, defaults to std::less of the value type
/// @tparam Allocator allocator that, rebound, backs the list of collected
///                   exceptions
template<typename It,
         typename Cmp = std::less<typename std::iterator_traits<It>::value_type>,
         typename Allocator = std::allocator<typename std::iterator_traits<It>::value_type>>
class quick_sort
{
private:
    // allocator rebound to std::exception_ptr and the container built on it
    using ep_allocator = typename std::allocator_traits<Allocator>::template rebind_alloc<std::exception_ptr>;
    using exceptions_vector = std::vector<std::exception_ptr, ep_allocator>;

public:
    /// Sorts [start, end) with cmp.
    /// @param nThreads number of threads to use; 0 selects the hardware thread count
    /// @param alloc    allocator for the exception list
    quick_sort( It start, It end, Cmp const &cmp = Cmp(), unsigned nThreads = 0,
                Allocator const &alloc = Allocator() );

    /// Thrown when more than one exception was collected during a sort.
    /// Iterate begin()..end() to reach the individual std::exception_ptr
    /// values; catch by reference so the list is not copied.
    struct par_exception : public std::exception
    {
        using iterator = typename exceptions_vector::iterator;

        ~par_exception();
        iterator begin();
        iterator end();

    private:
        // only quick_sort may create this exception
        friend class quick_sort;
        par_exception( exceptions_vector &&exceptions );

        exceptions_vector m_exceptions;
    };

private:
    Cmp m_cmp;
    Allocator m_alloc;
    // guards m_exceptions, which worker threads append to
    std::mutex m_excMtx;
    exceptions_vector m_exceptions;

    // partitions once and hands both parts to threads
    void parRecursion( It left, It right, unsigned nThreads );
    // single-threaded quicksort
    void recursion( It left, It right );
    // partitions the range around a pivot and returns the split position
    It partition( It left, It right );
};
/// Sorts [start, end) with cmp using up to nThreads threads.
///
/// @param nThreads 0 selects the hardware thread count; larger values are
///                 clamped to the hardware thread count
/// @throws the original exception if exactly one was raised during the sort
/// @throws par_exception if several exceptions were raised
template<typename It, typename Cmp, typename Allocator>
quick_sort<It, Cmp, Allocator>::quick_sort( It start, It end, Cmp const &cmp,
                                            unsigned nThreads, Allocator const &alloc ) :
    m_cmp( cmp ),
    m_alloc( alloc ),
    m_exceptions( ep_allocator( alloc ) )
{
    using namespace std;
    // The recursion works on the inclusive range [start, end - 1]; for an
    // empty input end - 1 would point before the beginning of the range.
    if( start == end )
        return;
    // nThreads == 0 -> number of threads = hardware threads
    unsigned hwThreads = thread::hardware_concurrency();
    if( hwThreads == 0 )
        hwThreads = 1;
    if( nThreads == 0 || nThreads > hwThreads )
        nThreads = hwThreads;
    // Reserve room up front so that recording an exception never allocates.
    // parRecursion may start up to 2 * nThreads - 2 worker threads in total
    // (both parts can get a thread of their own on every level), each of
    // which can record one exception, plus the one recorded below.
    m_exceptions.reserve( 2 * static_cast<size_t>( nThreads ) );
    try
    {
        parRecursion( start, end - 1, nThreads );
    }
    catch( ... )
    {
        // all worker threads have been joined during unwinding
        if( m_exceptions.empty() )
            // single exception from this thread: rethrow it unchanged
            throw;
        // workers failed as well: report everything together
        m_exceptions.emplace_back( current_exception() );
        throw par_exception( move( m_exceptions ) );
    }
    // Report worker exceptions outside of the try-block; inside it, the
    // handler above would catch the rethrown exception and record it twice.
    if( m_exceptions.size() > 1 )
        throw par_exception( move( m_exceptions ) );
    if( m_exceptions.size() == 1 )
        rethrow_exception( m_exceptions.front() );
}
/// Sorts the inclusive range [left, right] using up to nThreads threads.
///
/// With exactly one thread the work is delegated to recursion(). Otherwise
/// the range is partitioned once; the left part always gets a thread of its
/// own, the right part gets one only if more than one thread is assigned to
/// it and is sorted on the calling thread otherwise. All started threads are
/// joined before the function returns or unwinds.
template<typename It, typename Cmp, typename Allocator>
void quick_sort<It, Cmp, Allocator>::parRecursion( It left, It right,
                                                   unsigned nThreads )
{
    if( nThreads == 1 )
    {
        recursion( left, right );
        return;
    }
    if( left >= right )
        return;

    It const split = partition( left, right );

    // slots for the left and the right worker thread
    std::array<std::thread, 2> workers;
    // Joins every started worker. A failing join is retried until it
    // succeeds, because the worker keeps accessing the caller's data.
    auto joinWorkers = [&workers]()
    {
        for( std::thread &worker : workers )
        {
            if( !worker.joinable() )
                continue;
            while( true )
            {
                try
                {
                    worker.join();
                    break;
                }
                catch( ... )
                {
                }
            }
        }
    };
    // joins on every exit path, including stack unwinding
    invoke_on_destruct<decltype(joinWorkers)> joinGuard( joinWorkers );

    // thread entry: sort a sub-range and record any exception instead of
    // letting it escape the thread
    auto threadMain = [this]( It first, It last, unsigned threadCount )
    {
        try
        {
            parRecursion( first, last, threadCount );
        }
        catch( ... )
        {
            std::unique_lock<std::mutex> guard( m_excMtx );
            m_exceptions.emplace_back( std::current_exception() );
        }
    };

    unsigned const threadsRight = nThreads / 2;
    unsigned const threadsLeft = nThreads - threadsRight;

    workers[0] = std::thread( threadMain, left, split, threadsLeft );
    if( threadsRight > 1 )
        workers[1] = std::thread( threadMain, split + 1, right, threadsRight );
    else
        // a single thread remains for the right part: use the current one
        recursion( split + 1, right );
}
/// Single-threaded quicksort of the inclusive range [left, right].
///
/// Only the smaller part is sorted by a recursive call; the larger part is
/// handled by the loop. This bounds the recursion depth logarithmically even
/// when the partitions are badly unbalanced.
template<typename It, typename Cmp, typename Allocator>
void quick_sort<It, Cmp, Allocator>::recursion( It left, It right )
{
    while( left < right )
    {
        // afterwards [left, p] and [p + 1, right] can be sorted independently
        It p = partition( left, right );
        if( p - left < right - p )
        {
            // left part is the smaller one
            recursion( left, p );
            left = p + 1;
        }
        else
        {
            recursion( p + 1, right );
            right = p;
        }
    }
}
/// Hoare partition of the inclusive range [left, right] around the value of
/// its middle element.
///
/// @return iterator p with left <= p < right such that no element of
///         [left, p] compares greater than any element of [p + 1, right]
///
/// Expects left < right. The scan starts on the first and last element, so
/// no iterator outside of [left, right] is ever formed.
template<typename It, typename Cmp, typename Allocator>
It quick_sort<It, Cmp, Allocator>::partition( It left, It right )
{
    using T = typename std::iterator_traits<It>::value_type;
    using std::swap;
    // copy the pivot: the element itself may be moved by the swaps below
    T pivot = left[(right - left) / 2];
    It i = left,
       j = right;
    for( ; ; )
    {
        // the pivot value (or an element swapped earlier) stops both scans
        while( m_cmp( *i, pivot ) )
            ++i;
        while( m_cmp( pivot, *j ) )
            --j;
        if( i >= j )
            return j;
        swap( *i, *j );
        // both elements are in place now, continue behind them
        ++i;
        --j;
    }
}
// Out-of-line destructor of quick_sort's aggregate exception.
template<typename It, typename Cmp, typename Allocator>
quick_sort<It, Cmp, Allocator>::par_exception::~par_exception() = default;

// Takes ownership of the list of exceptions collected during a sort.
template<typename It, typename Cmp, typename Allocator>
quick_sort<It, Cmp, Allocator>::par_exception::par_exception( exceptions_vector &&exceptions ) :
    m_exceptions( std::move( exceptions ) )
{
}

// Iterator to the first collected std::exception_ptr.
template<typename It, typename Cmp, typename Allocator>
auto quick_sort<It, Cmp, Allocator>::par_exception::begin() -> iterator
{
    return m_exceptions.begin();
}

// Iterator past the last collected std::exception_ptr.
template<typename It, typename Cmp, typename Allocator>
auto quick_sort<It, Cmp, Allocator>::par_exception::end() -> iterator
{
    return m_exceptions.end();
}
#include <iostream>
#include <random>
#include <chrono>
#include <limits>
#include <cassert>
using namespace std;
using namespace chrono;
int main( int argc, char **argv )
{
if( argc < 2 )
return EXIT_FAILURE;
size_t n = (size_t)(unsigned)atoi( argv[1]) * 1'000'000;
unsigned nThreads = argc < 3 ? 0 : (unsigned)atoi( argv[2] );
random_device rd;
uniform_int_distribution<int> uid( numeric_limits<int>::min(),
numeric_limits<int>::max() );
vector<int> rvRef;
rvRef.resize( n );
cout << "randomizing ..." << endl;
for( int &r : rvRef )
r = uid( rd );
vector<int> rvSort( rvRef );
cout << "sorting with merge_sort (multi-threaded) ..." << endl;
time_point<high_resolution_clock> start = high_resolution_clock::now();
using msInt = merge_sort<vector<int>::iterator>;
msInt( rvSort.begin(), rvSort.end(), less<int>(), nThreads );
double seconds = (double)duration_cast<nanoseconds>(
high_resolution_clock::now() - start ).count() / 1.0E9;
cout << "sorting-time: " << seconds << " seconds" << endl;
for( vector<int>::iterator scn = rvSort.begin(); scn < rvSort.end()
- 1; ++scn )
assert(scn[0] <= scn[1]);
cout << "sorting with merge_sort (single-threaded) ..." << endl;
rvSort = rvRef;
start = high_resolution_clock::now();
msInt( rvSort.begin(), rvSort.end(), less<int>(), 1 );
seconds = (double)duration_cast<nanoseconds>(
high_resolution_clock::now() - start ).count() / 1.0E9;
cout << "sorting-time: " << seconds << " seconds" << endl;
cout << "sorting with std::stable_sort (multi-threaded) ..." << endl;
rvSort = rvRef;
start = high_resolution_clock::now();
stable_sort( execution::parallel_policy(), rvSort.begin(),
rvSort.end(), less<int>() );
seconds = (double)duration_cast<nanoseconds>(
high_resolution_clock::now() - start ).count() / 1.0E9;
cout << "sorting-time: " << seconds << " seconds" << endl;
cout << "sorting with std::sort (singled-threaded) ..." << endl;
rvSort = rvRef;
start = high_resolution_clock::now();
sort( rvSort.begin(), rvSort.end(), less<int>() );
seconds = (double)duration_cast<nanoseconds>(
high_resolution_clock::now() - start ).count() / 1.0E9;
cout << "sorting-time: " << seconds << " seconds" << endl;
cout << "sorting with std::sort (multi-threaded) ..." << endl;
rvSort = rvRef;
start = high_resolution_clock::now();
sort( execution::parallel_policy(), rvSort.begin(), rvSort.end(),
less<int>() );
seconds = (double)duration_cast<nanoseconds>(
high_resolution_clock::now() - start ).count() / 1.0E9;
cout << "sorting-time: " << seconds << " seconds" << endl;
cout << "sorting with quick_sort (multi-threaded) ..." << endl;
rvSort = rvRef;
start = high_resolution_clock::now();
using qsInt = quick_sort<vector<int>::iterator>;
qsInt( rvSort.begin(), rvSort.end(), less<int>(), nThreads );
seconds = (double)duration_cast<nanoseconds>(
high_resolution_clock::now() - start ).count() / 1.0E9;
cout << "sorting-time: " << seconds << " seconds" << endl;
for( vector<int>::iterator scn = rvSort.begin(); scn < rvSort.end()
- 1; ++scn )
assert(scn[0] <= scn[1]);
return EXIT_SUCCESS;
}