Google Groups no longer supports new Usenet posts or subscriptions. Historical content remains viewable.
Dismiss

Task for Amine

3 views
Skip to first unread message

Bonita Montero

unread,
Nov 16, 2019, 1:36:43 AM11/16/19
to
Here, Amine, is a generic parallel merge-sort algorithm as well as a
parallel quicksort algorithm. The merge-sort algorithm is faster than the
Microsoft STL's stable_sort, but the quicksort is slower than the MS
STL's implementation. So improve the latter to make it faster than the
MS implementation! I don't know how to make it faster.

#include <algorithm>
#include <array>
#include <cstddef>
#include <cstdlib>
#include <exception>
#include <execution>
#include <functional>
#include <iterator>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Scope guard: calls the referenced callable when the guard is destroyed,
// unless the callable has already been run through invoke_and_disable().
//
// The guard only stores a reference, so the callable must outlive the guard.
// The guard is not copyable: a copy would run the callable a second time.
// The destructor is implicitly noexcept, so a callable that throws while the
// guard is being destroyed terminates the program.
template<typename T>
struct invoke_on_destruct
{
private:
    T &m_t;
    bool m_enabled;

public:
    // Arms the guard for the callable t.
    explicit invoke_on_destruct( T &t ) :
        m_t( t ), m_enabled( true )
    {
    }

    invoke_on_destruct( invoke_on_destruct const & ) = delete;
    invoke_on_destruct &operator =( invoke_on_destruct const & ) = delete;

    // Runs the callable if the guard is still armed.
    ~invoke_on_destruct()
    {
        if( m_enabled )
            m_t();
    }

    // Runs the callable now and disarms the guard. The guard is disarmed
    // only after the callable returned, so if the callable throws, the
    // destructor will try once more.
    void invoke_and_disable()
    {
        m_t();
        m_enabled = false;
    }
};

// Stable merge sort over a random-access range that can spread the work
// over several threads.
//
// The whole sort runs inside the constructor; constructing a (temporary)
// merge_sort object sorts [start, end) in place.
//
// Template parameters:
//   It        - random-access iterator of the range to sort
//   Cmp       - strict weak ordering, called as cmp( a, b )
//   Allocator - allocator for the scratch buffers and the exception list
//
// The value type must be default-constructible, copy-constructible and
// copy-assignable because the elements are copied through scratch buffers.
//
// Errors: if exactly one exception is raised while sorting, that exception
// is rethrown unchanged. If several threads raise exceptions, they are
// collected and a par_exception holding all of them is thrown. After an
// exception the content of the range is unspecified.
template<typename It,
    typename Cmp = std::less<typename std::iterator_traits<It>::value_type>,
    typename Allocator = std::allocator<typename std::iterator_traits<It>::value_type>>
class merge_sort
{
private:
    using ep_allocator = typename std::allocator_traits<Allocator>::template rebind_alloc<std::exception_ptr>;
    using exceptions_vector = std::vector<std::exception_ptr, ep_allocator>;

public:
    // Sorts [start, end) with cmp.
    // nThreads == 0 selects the number of hardware threads; any other value
    // is capped at the number of hardware threads.
    merge_sort( It start, It end, Cmp const &cmp = Cmp(), unsigned nThreads = 0,
        Allocator const &alloc = Allocator() );

    // Thrown when more than one exception was raised while sorting.
    // begin() / end() iterate over the collected std::exception_ptr objects.
    // Catch it by reference; a copy duplicates the whole exception list.
    struct par_exception : public std::exception
    {
        using iterator = typename exceptions_vector::iterator;

        iterator begin()
        {
            return m_exceptions.begin();
        }

        iterator end()
        {
            return m_exceptions.end();
        }

        char const *what() const noexcept override
        {
            return "merge_sort: more than one exception was thrown while sorting";
        }

    private:
        friend class merge_sort;

        explicit par_exception( exceptions_vector &&exceptions ) :
            m_exceptions( std::move( exceptions ) )
        {
        }

        exceptions_vector m_exceptions;
    };

private:
    // Joins the still joinable threads of a thread pair when it goes out of
    // scope, so that no worker keeps using a scratch buffer that is about to
    // be destroyed, even when an exception unwinds the stack.
    struct thread_joiner
    {
        explicit thread_joiner( std::array<std::thread, 2> &threads ) :
            m_threads( threads )
        {
        }

        thread_joiner( thread_joiner const & ) = delete;
        thread_joiner &operator =( thread_joiner const & ) = delete;

        ~thread_joiner()
        {
            for( std::thread &thr : m_threads )
                if( thr.joinable() )
                    thr.join();
        }

    private:
        std::array<std::thread, 2> &m_threads;
    };

    Cmp m_cmp;
    Allocator m_alloc;
    // mutex that protects the exception list
    std::mutex m_excMtx;
    exceptions_vector m_exceptions;

    template<typename UpIt>
    void parRecursion( UpIt start, UpIt end, unsigned nThreads );
    template<typename UpIt, typename BufIt>
    void recursion( UpIt start, UpIt end, BufIt buf );
    template<typename UpIt, typename BufIt>
    void merge( UpIt start, BufIt leftBuf, BufIt rightBuf, BufIt bufEnd );
};

template<typename It, typename Cmp, typename Allocator>
merge_sort<It, Cmp, Allocator>::merge_sort( It start, It end, Cmp const &cmp,
    unsigned nThreads, Allocator const &alloc ) :
    m_cmp( cmp ),
    m_alloc( alloc ),
    m_exceptions( ep_allocator( alloc ) )
{
    // nThreads == 0 -> use all hardware threads; never use more than that
    unsigned const hwThreads = std::max( 1u, std::thread::hardware_concurrency() );
    nThreads = nThreads == 0 ? hwThreads : std::min( nThreads, hwThreads );
    // Reserve the exception slots up front so that recording an exception
    // never has to allocate. A budget of k threads starts fewer than 2 * k
    // workers, and each worker records at most one exception.
    m_exceptions.reserve( 2 * static_cast<std::size_t>( nThreads ) );
    try
    {
        parRecursion( start, end, nThreads );
    }
    catch( ... )
    {
        // exception of the calling thread; all workers are joined by now
        m_exceptions.emplace_back( std::current_exception() );
    }
    if( m_exceptions.empty() )
        return;
    if( m_exceptions.size() == 1 )
        // a single failure is reported as the original exception
        std::rethrow_exception( m_exceptions.front() );
    throw par_exception( std::move( m_exceptions ) );
}

// Sequential merge sort of [start, end).
// buf is scratch space; the caller must provide enough room for this level
// (end - start elements) plus all deeper levels behind it.
template<typename It, typename Cmp, typename Allocator>
template<typename UpIt, typename BufIt>
void merge_sort<It, Cmp, Allocator>::recursion( UpIt start, UpIt end, BufIt buf )
{
    auto const n = end - start;
    if( n <= 1 )
        return;
    // work on a copy: sort both halves of the copy, using the scratch space
    // behind the copy, then merge them back into the source range
    std::copy( start, end, buf );
    BufIt const mid = buf + n / 2,
        bufEnd = buf + n;
    recursion( buf, mid, bufEnd );
    recursion( mid, bufEnd, bufEnd );
    merge( start, buf, mid, bufEnd );
}

// Merges the sorted runs [leftBuf, rightBuf) and [rightBuf, bufEnd) into the
// range beginning at start. Either run may be empty. On ties the element of
// the left run is written first, which keeps the sort stable.
template<typename It, typename Cmp, typename Allocator>
template<typename UpIt, typename BufIt>
void merge_sort<It, Cmp, Allocator>::merge( UpIt start, BufIt leftBuf,
    BufIt rightBuf, BufIt bufEnd )
{
    BufIt const leftBufEnd = rightBuf;
    UpIt wrtBack = start;
    if( leftBuf != leftBufEnd && rightBuf != bufEnd )
        // only the run that just advanced can have run out, so a single
        // end-check per written element is enough
        for( ; ; )
            if( m_cmp( *rightBuf, *leftBuf ) )
            {
                *wrtBack++ = *rightBuf;
                if( ++rightBuf == bufEnd )
                    break;
            }
            else
            {
                *wrtBack++ = *leftBuf;
                if( ++leftBuf == leftBufEnd )
                    break;
            }
    // at most one of the runs still has elements; plain loops because the
    // remainders are usually tiny
    while( leftBuf != leftBufEnd )
        *wrtBack++ = *leftBuf++;
    while( rightBuf != bufEnd )
        *wrtBack++ = *rightBuf++;
}

// Sorts [start, end) with a budget of nThreads threads (the calling thread
// included). With a budget of one thread, or fewer than two elements, the
// range is sorted sequentially.
template<typename It, typename Cmp, typename Allocator>
template<typename UpIt>
void merge_sort<It, Cmp, Allocator>::parRecursion( UpIt start, UpIt end,
    unsigned nThreads )
{
    using T = typename std::iterator_traits<It>::value_type;
    using buffer = std::vector<T, Allocator>;
    using BufIt = typename buffer::iterator;
    using BufDiff = typename buffer::difference_type;

    std::size_t const n = static_cast<std::size_t>( end - start );
    // Never use more threads than elements. This guarantees that every
    // parallel split below has two non-empty halves.
    if( nThreads > n )
        nThreads = static_cast<unsigned>( n );
    if( nThreads <= 1 )
    {
        // scratch space for all recursion levels: n + ceil(n/2) + ...
        std::size_t bufSize = 0;
        for( std::size_t split = n; split > 1; split -= split / 2 )
            bufSize += split;
        buffer buf( m_alloc );
        buf.resize( bufSize );
        recursion( start, end, buf.begin() );
        return;
    }

    // split-buffer: both halves are sorted inside this copy and then merged
    // back into the input range
    buffer buf( start, end, m_alloc );
    unsigned const rightThreads = nThreads / 2,
        leftThreads = nThreads - rightThreads;
    // if the number of threads is uneven, give the side with more threads
    // proportionally more input; floor( n * leftThreads / nThreads ) in
    // integer arithmetic without overflowing the product
    std::size_t const left = (n / nThreads) * leftThreads
        + ((n % nThreads) * leftThreads) / nThreads;
    BufIt const leftBuf = buf.begin(),
        rightBuf = leftBuf + static_cast<BufDiff>( left ),
        bufEnd = buf.end();

    // worker entry point: sorts one part and records any exception instead
    // of letting it escape from the thread
    auto sortPart = [this]( BufIt partStart, BufIt partEnd, unsigned partThreads )
    {
        try
        {
            this->parRecursion( partStart, partEnd, partThreads );
        }
        catch( ... )
        {
            std::lock_guard<std::mutex> excLock( m_excMtx );
            m_exceptions.emplace_back( std::current_exception() );
        }
    };

    {
        // declared after buf, so the workers are always joined before the
        // split-buffer is destroyed
        std::array<std::thread, 2> threads;
        thread_joiner joiner( threads );
        threads[0] = std::thread( sortPart, leftBuf, rightBuf, leftThreads );
        if( rightThreads > 1 )
            threads[1] = std::thread( sortPart, rightBuf, bufEnd, rightThreads );
        else
            // only one thread is left for the right part: do it ourselves
            parRecursion( rightBuf, bufEnd, 1 );
    }   // workers are joined here

    {
        // nothing sensible to merge if any worker has failed
        std::lock_guard<std::mutex> excLock( m_excMtx );
        if( !m_exceptions.empty() )
            return;
    }
    // merge split-buffer back into input range
    merge( start, leftBuf, rightBuf, bufEnd );
}

#if defined(_MSC_VER)
#pragma warning(disable: 26444)
#endif

// Quicksort (Hoare partitioning, middle element as pivot) over a
// random-access range that can spread the work over several threads.
//
// The whole sort runs inside the constructor; constructing a (temporary)
// quick_sort object sorts [start, end) in place. The sort is not stable.
//
// Template parameters:
//   It        - random-access iterator of the range to sort
//   Cmp       - strict weak ordering, called as cmp( a, b )
//   Allocator - allocator for the exception list
//
// Errors: if exactly one exception is raised while sorting, that exception
// is rethrown unchanged. If several threads raise exceptions, they are
// collected and a par_exception holding all of them is thrown. After an
// exception the content of the range is unspecified.
template<typename It,
    typename Cmp = std::less<typename std::iterator_traits<It>::value_type>,
    typename Allocator = std::allocator<typename std::iterator_traits<It>::value_type>>
class quick_sort
{
private:
    using ep_allocator = typename std::allocator_traits<Allocator>::template rebind_alloc<std::exception_ptr>;
    using exceptions_vector = std::vector<std::exception_ptr, ep_allocator>;

public:
    // Sorts [start, end) with cmp.
    // nThreads == 0 selects the number of hardware threads; any other value
    // is capped at the number of hardware threads.
    quick_sort( It start, It end, Cmp const &cmp = Cmp(), unsigned nThreads = 0,
        Allocator const &alloc = Allocator() );

    // Thrown when more than one exception was raised while sorting.
    // begin() / end() iterate over the collected std::exception_ptr objects.
    // Catch it by reference; a copy duplicates the whole exception list.
    struct par_exception : public std::exception
    {
        using iterator = typename exceptions_vector::iterator;

        iterator begin()
        {
            return m_exceptions.begin();
        }

        iterator end()
        {
            return m_exceptions.end();
        }

        char const *what() const noexcept override
        {
            return "quick_sort: more than one exception was thrown while sorting";
        }

    private:
        friend class quick_sort;

        explicit par_exception( exceptions_vector &&exceptions ) :
            m_exceptions( std::move( exceptions ) )
        {
        }

        exceptions_vector m_exceptions;
    };

private:
    // Joins the still joinable threads of a thread pair when it goes out of
    // scope, so that parRecursion never returns (normally or through an
    // exception) while a worker is still sorting.
    struct thread_joiner
    {
        explicit thread_joiner( std::array<std::thread, 2> &threads ) :
            m_threads( threads )
        {
        }

        thread_joiner( thread_joiner const & ) = delete;
        thread_joiner &operator =( thread_joiner const & ) = delete;

        ~thread_joiner()
        {
            for( std::thread &thr : m_threads )
                if( thr.joinable() )
                    thr.join();
        }

    private:
        std::array<std::thread, 2> &m_threads;
    };

    Cmp m_cmp;
    Allocator m_alloc;
    // mutex that protects the exception list
    std::mutex m_excMtx;
    exceptions_vector m_exceptions;

    // All three helpers work on the inclusive range [left, right].
    void parRecursion( It left, It right, unsigned nThreads );
    void recursion( It left, It right );
    It partition( It left, It right );
};

template<typename It, typename Cmp, typename Allocator>
quick_sort<It, Cmp, Allocator>::quick_sort( It start, It end, Cmp const &cmp,
    unsigned nThreads, Allocator const &alloc ) :
    m_cmp( cmp ),
    m_alloc( alloc ),
    m_exceptions( ep_allocator( alloc ) )
{
    // an empty range has no last element to pass on as inclusive bound
    if( start == end )
        return;
    // nThreads == 0 -> use all hardware threads; never use more than that
    unsigned const hwThreads = std::max( 1u, std::thread::hardware_concurrency() );
    nThreads = nThreads == 0 ? hwThreads : std::min( nThreads, hwThreads );
    // Reserve the exception slots up front so that recording an exception
    // never has to allocate. A budget of k threads starts fewer than 2 * k
    // workers, and each worker records at most one exception.
    m_exceptions.reserve( 2 * static_cast<std::size_t>( nThreads ) );
    try
    {
        parRecursion( start, end - 1, nThreads );
    }
    catch( ... )
    {
        // exception of the calling thread; all workers are joined by now
        m_exceptions.emplace_back( std::current_exception() );
    }
    if( m_exceptions.empty() )
        return;
    if( m_exceptions.size() == 1 )
        // a single failure is reported as the original exception
        std::rethrow_exception( m_exceptions.front() );
    throw par_exception( std::move( m_exceptions ) );
}

// Sorts the inclusive range [left, right] with a budget of nThreads threads
// (the calling thread included). The range is partitioned once; the left
// part goes to a new thread, the right part goes to a second thread or, if
// only one thread is left for it, is sorted by the calling thread.
template<typename It, typename Cmp, typename Allocator>
void quick_sort<It, Cmp, Allocator>::parRecursion( It left, It right,
    unsigned nThreads )
{
    if( nThreads <= 1 )
    {
        recursion( left, right );
        return;
    }
    if( left >= right )
        return;
    It const p = partition( left, right );

    // worker entry point: sorts one part and records any exception instead
    // of letting it escape from the thread
    auto sortPart = [this]( It partLeft, It partRight, unsigned partThreads )
    {
        try
        {
            this->parRecursion( partLeft, partRight, partThreads );
        }
        catch( ... )
        {
            std::lock_guard<std::mutex> excLock( m_excMtx );
            m_exceptions.emplace_back( std::current_exception() );
        }
    };

    unsigned const rightThreads = nThreads / 2,
        leftThreads = nThreads - rightThreads;
    std::array<std::thread, 2> threads;
    // joins the workers when this function is left, whichever way
    thread_joiner joiner( threads );
    threads[0] = std::thread( sortPart, left, p, leftThreads );
    if( rightThreads > 1 )
        threads[1] = std::thread( sortPart, p + 1, right, rightThreads );
    else
        // only one thread is left for the right part: do it ourselves
        recursion( p + 1, right );
}

// Sequential quicksort of the inclusive range [left, right].
// Only the smaller part is handled by a recursive call; the larger part is
// handled by the loop, which limits the recursion depth to O(log n).
template<typename It, typename Cmp, typename Allocator>
void quick_sort<It, Cmp, Allocator>::recursion( It left, It right )
{
    while( left < right )
    {
        It const p = partition( left, right );
        // parts are [left, p] and [p + 1, right]
        if( p - left < right - p )
        {
            recursion( left, p );
            left = p + 1;
        }
        else
        {
            recursion( p + 1, right );
            right = p;
        }
    }
}

// Hoare partitioning of the inclusive range [left, right], left < right,
// around a copy of the middle element.
// Returns p with left <= p < right such that no element of [left, p]
// compares greater than the pivot and no element of [p + 1, right]
// compares less than it.
// The scan never forms an iterator outside of [left, right].
template<typename It, typename Cmp, typename Allocator>
It quick_sort<It, Cmp, Allocator>::partition( It left, It right )
{
    using T = typename std::iterator_traits<It>::value_type;
    using std::swap;
    T const pivot = left[(right - left) / 2];
    It i = left,
        j = right;
    for( ; ; )
    {
        // the pivot itself, or an element swapped earlier, stops both scans
        while( m_cmp( *i, pivot ) )
            ++i;
        while( m_cmp( pivot, *j ) )
            --j;
        if( i >= j )
            return j;
        swap( *i, *j );
        // i < j held before the swap, so both steps stay inside the range
        ++i;
        --j;
    }
}

#include <iostream>
#include <random>
#include <chrono>
#include <limits>
#include <cassert>

using namespace std;
using namespace chrono;

int main( int argc, char **argv )
{
if( argc < 2 )
return EXIT_FAILURE;
size_t n = (size_t)(unsigned)atoi( argv[1]) * 1'000'000;
unsigned nThreads = argc < 3 ? 0 : (unsigned)atoi( argv[2] );
random_device rd;
uniform_int_distribution<int> uid( numeric_limits<int>::min(),
numeric_limits<int>::max() );
vector<int> rvRef;
rvRef.resize( n );
cout << "randomizing ..." << endl;
for( int &r : rvRef )
r = uid( rd );
vector<int> rvSort( rvRef );
cout << "sorting with merge_sort (multi-threaded) ..." << endl;
time_point<high_resolution_clock> start = high_resolution_clock::now();
using msInt = merge_sort<vector<int>::iterator>;
msInt( rvSort.begin(), rvSort.end(), less<int>(), nThreads );
double seconds = (double)duration_cast<nanoseconds>(
high_resolution_clock::now() - start ).count() / 1.0E9;
cout << "sorting-time: " << seconds << " seconds" << endl;
for( vector<int>::iterator scn = rvSort.begin(); scn < rvSort.end()
- 1; ++scn )
assert(scn[0] <= scn[1]);
cout << "sorting with merge_sort (single-threaded) ..." << endl;
rvSort = rvRef;
start = high_resolution_clock::now();
msInt( rvSort.begin(), rvSort.end(), less<int>(), 1 );
seconds = (double)duration_cast<nanoseconds>(
high_resolution_clock::now() - start ).count() / 1.0E9;
cout << "sorting-time: " << seconds << " seconds" << endl;
cout << "sorting with std::stable_sort (multi-threaded) ..." << endl;
rvSort = rvRef;
start = high_resolution_clock::now();
stable_sort( execution::parallel_policy(), rvSort.begin(),
rvSort.end(), less<int>() );
seconds = (double)duration_cast<nanoseconds>(
high_resolution_clock::now() - start ).count() / 1.0E9;
cout << "sorting-time: " << seconds << " seconds" << endl;
cout << "sorting with std::sort (singled-threaded) ..." << endl;
rvSort = rvRef;
start = high_resolution_clock::now();
sort( rvSort.begin(), rvSort.end(), less<int>() );
seconds = (double)duration_cast<nanoseconds>(
high_resolution_clock::now() - start ).count() / 1.0E9;
cout << "sorting-time: " << seconds << " seconds" << endl;
cout << "sorting with std::sort (multi-threaded) ..." << endl;
rvSort = rvRef;
start = high_resolution_clock::now();
sort( execution::parallel_policy(), rvSort.begin(), rvSort.end(),
less<int>() );
seconds = (double)duration_cast<nanoseconds>(
high_resolution_clock::now() - start ).count() / 1.0E9;
cout << "sorting-time: " << seconds << " seconds" << endl;
cout << "sorting with quick_sort (multi-threaded) ..." << endl;
rvSort = rvRef;
start = high_resolution_clock::now();
using qsInt = quick_sort<vector<int>::iterator>;
qsInt( rvSort.begin(), rvSort.end(), less<int>(), nThreads );
seconds = (double)duration_cast<nanoseconds>(
high_resolution_clock::now() - start ).count() / 1.0E9;
cout << "sorting-time: " << seconds << " seconds" << endl;
for( vector<int>::iterator scn = rvSort.begin(); scn < rvSort.end()
- 1; ++scn )
assert(scn[0] <= scn[1]);
return EXIT_SUCCESS;
}
0 new messages