There's the Unix command wc, which counts words and lines. The wc
implementation in the current GNU coreutils contains an optional,
very tricky AVX implementation. It improves the speed of wc on my
Linux computer by a factor of 29.
I improved this algorithm further by partitioning the data into
three parts which I process interleaved, i.e. I handle one 32-byte
chunk from each part in lockstep and then increment the common
offset by 32. The point is that the original algorithm has a
dependency chain which limits out-of-order execution. I partitioned
the data into three rather than four parts because there wouldn't
be enough integer registers - I need 14 in my interleaving loop.
With four parts there would be so much register spilling and
reloading that the performance would be similar to that of the
original algorithm.
The speedup of the interleaved code over the original wc-algorithm
is about 60%.
I mention this because I benchmarked the code under different
conditions. I also extended the trivial algorithm with prefetching
and partitioning, and, like the AVX code, it can be run with either
switched on. These are the results on my Linux Ryzen 7 1800X:
trivial / non-interleaved / non-prefetched
1 thread: 468MB/s
trivial / non-interleaved / prefetched
1 thread: 492MB/s
trivial / interleaved / non-prefetched
1 thread: 778MB/s
trivial / interleaved / prefetched
1 thread: 694MB/s
AVX / non-interleaved / non-prefetched
1 thread: 13731MB/s
AVX / non-interleaved / prefetched
1 thread: 13757MB/s
AVX / interleaved / non-prefetched
1 thread: 19722MB/s
AVX / interleaved / prefetched
1 thread: 23558MB/s
As you can see, manual prefetching gives only a small gain for the
trivial non-interleaved code, and for the trivial interleaved code
the throughput even drops with prefetching compared to without it.
But for the AVX code there's a significant speedup of the
interleaved / prefetched variant over the interleaved /
non-prefetched variant. So there are cases where prefetching gives
a significant speedup.
With interleaving, the memory access patterns are more complex, and
I suspect that the hardware prefetcher doesn't work as well under
such conditions.
If you're interested in the code: the relevant functions are the
lambdas trivialSpaceCount and avxSpaceCount. The code compiles only
with C++20.
#if defined(_MSC_VER)
#include <Windows.h>
#elif defined(__unix__)
#include <pthread.h>
#endif
#include <iostream>
#include <utility>
#include <fstream>
#include <vector>
#include <cstdint>
#include <algorithm>
#include <chrono>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <vector>
#include <cstdlib>
#include <charconv>
#include <cmath>
#include <sstream>
#include <limits>
#include <cctype>
#include <functional>
#include <array>
#include <string.h>
#if defined(_MSC_VER)
#include <intrin.h>
#elif defined(__GNUC__)
#include <x86intrin.h>
#include <cpuid.h>
#endif
#if defined(_MSC_VER)
#pragma warning(disable: 26495)
#endif
using namespace std;
using namespace chrono;
// Command-line settings of the benchmark; filled in by parse().
// Every member has a default so that no field is ever read indeterminate,
// even if parse() bails out early.
struct cmline_params
{
	// Requested process priority (only acted upon in the _MSC_VER build of main).
	enum class priority_t : unsigned
	{
		UNSET, NORMAL, HIGH, REALTIME, BEST_AS_CAN
	};
	char const *fileName = nullptr;                // file whose contents fill the test block
	size_t blockSize = (size_t)256 * 1024 * 1024;  // size of the test block in bytes
	unsigned nCPUs = 0;                            // highest thread count that is benchmarked
	bool invert = false;                           // selects the alternate thread-to-CPU affinity mapping
	priority_t priority = priority_t::UNSET;
	// Parses the command line into this object; returns the error messages
	// (an empty vector means success).
	vector<string> parse( int argc, char const *const *argv );
};
// Helpers used by main() and cmline_params::parse(); their definitions follow main().
// Pins the thread behind `handle` to logical CPU number `affinity`.
static auto setThreadAffinity( thread::native_handle_type handle, unsigned affinity ) -> void;
// Number of set bits in a 32-bit value.
static auto popCnt32( uint32_t value ) -> unsigned;
// Returns a blockSize-byte buffer filled with repeated copies of the file's contents.
static auto readFileRepeated( char const *fileName, size_t blockSize ) -> vector<char>;
// Case-insensitive string comparison returning <0, 0 or >0.
static auto xstricmp( char const *a, char const *b ) -> int;
int main( int argc, char **argv )
{
	cmline_params params;
	vector<string> errs( params.parse( argc, argv ) );
	if( !errs.empty() )
	{
		for( string const &err : errs )
			cout << err << endl;
		return EXIT_FAILURE;
	}
#if defined(_MSC_VER)
	// Optionally change the process priority class (Windows build only).
	if( params.priority != cmline_params::priority_t::UNSET )
	{
		// Requests a priority class and reports whether the process actually has it
		// afterwards. SetPriorityClass's own return value is not relied upon (the
		// author observed it always returning false).
		auto setPriority = []( DWORD dwPriorityClass ) -> bool
		{
			SetPriorityClass( GetCurrentProcess(), dwPriorityClass );
			return GetPriorityClass( GetCurrentProcess() ) == dwPriorityClass;
		};
		static const
		struct prio_map_t
		{
			cmline_params::priority_t priority;
			DWORD dwPriorityClass;
		} prioMappings[] =
		{
			{ cmline_params::priority_t::NORMAL, NORMAL_PRIORITY_CLASS },
			{ cmline_params::priority_t::HIGH, HIGH_PRIORITY_CLASS },
			{ cmline_params::priority_t::REALTIME, REALTIME_PRIORITY_CLASS }
		};
		prio_map_t const *pMapping = nullptr;
		for( prio_map_t const &pm : prioMappings )
			if( pm.priority == params.priority )
			{
				pMapping = &pm;
				break;
			}
		if( pMapping )
		{
			if( !setPriority( pMapping->dwPriorityClass ) )
				return EXIT_FAILURE;
		}
		else
			// BEST_AS_CAN: try from the highest class downwards until one sticks
			for( ptrdiff_t p = (ptrdiff_t)std::size( prioMappings ) - 1; p >= 0; --p )
				if( setPriority( prioMappings[p].dwPriorityClass ) )
					break;
	}
#endif
	vector<char> block;
	try
	{
		block = readFileRepeated( params.fileName, params.blockSize );
	}
	catch( ... )
	{
		// any failure while reading is reported below, like an empty file
		block.clear();
	}
	if( block.empty() )
	{
		cout << "error reading file" << endl;
		return EXIT_FAILURE;
	}
	// Word and line counters of one counting pass.
	struct words_and_lines
	{
		size_t words, lines;
		words_and_lines( size_t words = 0, size_t lines = 0 ) :
			words( words ), lines( lines )
		{
		}
	};
	// Cursor of one partition: base pointer, whether the byte before the current
	// position was whitespace, and the partition's own counters.
	struct state_t
	{
		char *mem;
		bool wasSpace;
		words_and_lines counters;
		state_t( char *mem, bool wasSpace, words_and_lines const &counters ) :
			mem( mem ),
			wasSpace( wasSpace ),
			counters( counters )
		{
		}
	};
	// software-prefetch distance in bytes (2048)
	static size_t const PREFETCH_DISTANCE = 32 * 64;
	// A byte is whitespace when its unsigned value is <= ' '; the AVX path below
	// computes exactly the same predicate.
	static auto isSpaceChar = []( char c ) -> bool
	{
		return (unsigned char)c <= ' ';
	};
	// Counts words and '\n' characters in mem[0, count) byte by byte and adds them
	// to counters. A word is counted where whitespace follows non-whitespace.
	//   interleave - split the range into three partitions processed in lockstep
	//   prefetch   - issue prefetches PREFETCH_DISTANCE bytes ahead
	//   extend     - more data follows, so an unterminated trailing word is not
	//                counted yet; otherwise it is counted at the end
	//   pWasSpace  - optional in/out: whitespace state of the byte before mem
	//                (true if null, i.e. start of text) / of the last byte
	static
	auto trivialSpaceCount = []( bool interleave, bool prefetch,
		words_and_lines &counters, char *mem, size_t count, bool extend, bool *pWasSpace )
	{
		bool wasSpace = pWasSpace ? *pWasSpace : true;
		// Processes the byte at `offset` of one partition, using that partition's own state.
		auto stateBlock = []<bool doPrefetch>( state_t &state, size_t offset )
		{
			if constexpr( doPrefetch )
				_mm_prefetch( &state.mem[offset] + PREFETCH_DISTANCE, _MM_HINT_NTA );
			bool isSpace = isSpaceChar( state.mem[offset] );
			state.counters.words += isSpace && !state.wasSpace;
			state.counters.lines += state.mem[offset] == '\n';
			state.wasSpace = isSpace;
		};
		if( interleave && count >= 3 )
		{
			size_t const partitionSize = count / 3;
			char *ends[] = { mem + partitionSize, mem + partitionSize * 2 };
			// later partitions start with the whitespace state of the byte just before them
			state_t states[] =
			{
				state_t( mem, wasSpace, words_and_lines() ),
				state_t( ends[0], isSpaceChar( ends[0][-1] ), words_and_lines() ),
				state_t( ends[1], isSpaceChar( ends[1][-1] ), words_and_lines() ),
			};
			size_t offset = 0;
			if( prefetch )
				for( ; (ptrdiff_t)offset < (ptrdiff_t)(partitionSize - PREFETCH_DISTANCE); ++offset )
				{
					stateBlock.operator ()<true>( states[0], offset );
					stateBlock.operator ()<true>( states[1], offset );
					stateBlock.operator ()<true>( states[2], offset );
				}
			for( ; offset != partitionSize; ++offset )
			{
				stateBlock.operator ()<false>( states[0], offset );
				stateBlock.operator ()<false>( states[1], offset );
				stateBlock.operator ()<false>( states[2], offset );
			}
			mem += partitionSize * 3;
			count -= partitionSize * 3;
			for( state_t const &state : states )
			{
				counters.words += state.counters.words;
				counters.lines += state.counters.lines;
			}
			wasSpace = states[2].wasSpace;
		}
		if( count )
		{
			state_t state( mem, wasSpace, counters );
			size_t offset = 0;
			if( prefetch )
				for( ; (ptrdiff_t)offset < (ptrdiff_t)(count - PREFETCH_DISTANCE); ++offset )
					stateBlock.operator ()<true>( state, offset );
			for( ; offset != count; ++offset )
				stateBlock.operator ()<false>( state, offset );
			counters = state.counters;
			wasSpace = state.wasSpace;
		}
		// an unterminated trailing word is counted only once no more data follows
		counters.words += !extend && !wasSpace;
		if( pWasSpace )
			*pWasSpace = wasSpace;
	};
	// Same contract as trivialSpaceCount, but processes 32-byte aligned chunks with
	// 256-bit integer intrinsics (AVX2 instructions). The unaligned head and the
	// < 32-byte tail are delegated to trivialSpaceCount.
	static
	auto avxSpaceCount = []( bool interleave, bool prefetch,
		words_and_lines &counters, char *mem, size_t count, bool extend, bool *pWasSpace )
	{
		bool wasSpace = pWasSpace ? *pWasSpace : true;
		// bytes up to the next 32-byte boundary (_mm256_load_si256 needs alignment)
		size_t prefix = (((uintptr_t)mem + 31) & ~(uintptr_t)31) - (uintptr_t)mem;
		prefix = prefix <= count ? prefix : count;
		// the final trivialSpaceCount call below always follows, so the prefix extends
		trivialSpaceCount( interleave, prefetch, counters, mem, prefix, true, &wasSpace );
		mem += prefix;
		count -= prefix;
		if( count >= 32 )
		{
			__m256i const spaces = _mm256_set1_epi8( ' ' + 1 ),
				newlines = _mm256_set1_epi8( '\n' );
			// Processes the aligned 32-byte chunk at `offset` of one partition.
			auto stateBlock = [&]<bool doPrefetch>( state_t &state, size_t offset )
			{
				if constexpr( doPrefetch )
					_mm_prefetch( &state.mem[offset] + PREFETCH_DISTANCE, _MM_HINT_NTA );
				__m256i chunk = _mm256_load_si256( (__m256i *)&state.mem[offset] );
				// the sign bit of ~c & (c - 33) is set exactly for unsigned bytes 0..32;
				// wasSpaceMask shifts in the previous byte's state for each position
				uint32_t isSpaceMask = _mm256_movemask_epi8( _mm256_andnot_si256(
						chunk, _mm256_sub_epi8( chunk, spaces ) ) ),
					wasSpaceMask = isSpaceMask << 1 | (uint32_t)state.wasSpace,
					newlineMask = _mm256_movemask_epi8( _mm256_cmpeq_epi8(
						chunk, newlines ) );
				state.counters.words += popCnt32( isSpaceMask & ~wasSpaceMask );
				state.counters.lines += popCnt32( newlineMask );
				state.wasSpace = (int32_t)isSpaceMask < 0;
			};
			if( interleave && count >= (3 * 32) )
			{
				// three equally sized partitions, each a multiple of 32 bytes
				size_t const partitionSize = count / (3 * 32) * 32;
				char *ends[] = { mem + partitionSize, mem + partitionSize * 2 };
				state_t states[] =
				{
					state_t( mem, wasSpace, words_and_lines() ),
					state_t( ends[0], isSpaceChar( ends[0][-1] ), words_and_lines() ),
					state_t( ends[1], isSpaceChar( ends[1][-1] ), words_and_lines() ),
				};
				size_t offset = 0;
				if( prefetch )
					for( ; (ptrdiff_t)offset < (ptrdiff_t)(partitionSize - PREFETCH_DISTANCE); offset += 32 )
					{
						stateBlock.operator ()<true>( states[0], offset );
						stateBlock.operator ()<true>( states[1], offset );
						stateBlock.operator ()<true>( states[2], offset );
					}
				for( ; offset != partitionSize; offset += 32 )
				{
					stateBlock.operator ()<false>( states[0], offset );
					stateBlock.operator ()<false>( states[1], offset );
					stateBlock.operator ()<false>( states[2], offset );
				}
				mem += partitionSize * 3;
				count -= partitionSize * 3;
				for( state_t const &state : states )
				{
					counters.words += state.counters.words;
					counters.lines += state.counters.lines;
				}
				wasSpace = states[2].wasSpace;
			}
			if( count >= 32 )
			{
				size_t const alignedCount = count & ~(size_t)31;
				state_t state( mem, wasSpace, counters );
				size_t offset = 0;
				if( prefetch )
					for( ; (ptrdiff_t)offset < (ptrdiff_t)(alignedCount - PREFETCH_DISTANCE); offset += 32 )
						stateBlock.operator ()<true>( state, offset );
				for( ; offset != alignedCount; offset += 32 )
					stateBlock.operator ()<false>( state, offset );
				mem += alignedCount;
				count -= alignedCount;
				counters = state.counters;
				wasSpace = state.wasSpace;
			}
		}
		// remaining < 32 bytes; also counts an unterminated trailing word unless extend
		trivialSpaceCount( interleave, prefetch, counters, mem, count, extend, &wasSpace );
		if( pWasSpace )
			*pWasSpace = wasSpace;
	};
	using spacecount_fn_t = function<void( bool, bool, words_and_lines &,
		char *, size_t, bool, bool * )>;
	struct descr_count_fn
	{
		char const *descr;
		spacecount_fn_t countFn;
	};
	static
	array<descr_count_fn, 2> const descrCountFns =
	{ {
		{ "trivial", trivialSpaceCount },
		{ "AVX", avxSpaceCount }
	} };
	mutex mtx;
	unsigned ready;             // threads that have not checked in yet (guarded by mtx)
	condition_variable cvReady; // signalled by the last thread to check in
	bool run;                   // common start flag (guarded by mtx)
	condition_variable cvRun;   // signalled once run is true
	atomic_int64_t sumDur;      // sum of all threads' elapsed times in ns
	// Thread body: check in, wait for the common start, count the block `repeats`
	// times and add the elapsed time to sumDur.
	auto theThread = [&]( bool interleave, bool prefetch, spacecount_fn_t
		const &countFn, char *mem, size_t blockSize, size_t repeats )
	{
		unique_lock<mutex> lock( mtx );
		if( !--ready )
			cvReady.notify_one();
		cvRun.wait( lock, [&]() -> bool { return run; } );
		lock.unlock();
		auto start = high_resolution_clock::now();
		// results are written to a volatile so the counting is observable
		size_t volatile sum = 0;
		words_and_lines wordsAndLines;
		for( size_t r = repeats; r; --r )
		{
			wordsAndLines = words_and_lines();
			sum = 0;
			countFn( interleave, prefetch, wordsAndLines, mem, blockSize, false,
				nullptr );
			sum += wordsAndLines.words + wordsAndLines.lines;
		}
		sumDur += (int64_t)duration_cast<nanoseconds>(
			high_resolution_clock::now() - start ).count();
	};
	vector<thread> threads;
	threads.reserve( params.nCPUs );
#if defined(NDEBUG)
	double const MBS = 256.0;
#else
	double const MBS = 1.0;
#endif
	// every thread processes about MBS megabytes, but at least one block
	size_t repeats = (ptrdiff_t)(MBS * 1000 * 1000 /
		(ptrdiff_t)params.blockSize + 0.5);
	repeats += repeats == 0;
	unsigned hc = thread::hardware_concurrency();
	for( descr_count_fn const &dfn : descrCountFns )
		for( unsigned interleave = 0; interleave <= 1; ++interleave )
			for( unsigned prefetch = 0; prefetch <= 1; ++prefetch )
			{
				cout << dfn.descr;
				cout << (!interleave ? " / non-interleaved" : " / interleaved");
				cout << (!prefetch ? " / non-prefetched" : " / prefetched ") << endl;
				for( unsigned nThreads = 1; nThreads <= params.nCPUs; ++nThreads )
				{
					ready = nThreads;
					run = false;
					sumDur = 0;
					threads.clear();
					for( unsigned t = 0; t != nThreads; ++t )
					{
						threads.emplace_back( theThread, (bool)interleave, (bool)prefetch,
							ref( dfn.countFn ), block.data(), params.blockSize, repeats );
						// --invert alternates threads between the lower and upper half of the logical CPUs
						unsigned affinity = !params.invert ? t : (t % 2) * (hc / 2) + t / 2;
						setThreadAffinity( threads.back().native_handle(), affinity );
					}
					unique_lock<mutex> lock( mtx );
					cvReady.wait( lock, [&]() -> bool { return !ready; } );
					run = true;
					cvRun.notify_all();
					lock.unlock();
					for( thread &thr : threads )
						thr.join();
					static double const MEGABYTE = 1000.0 * 1000.0;
					// sumDur / nThreads is the mean per-thread duration
					double secs = sumDur.load() / (1.0e9 * nThreads),
						mbsPerSec = ((double)nThreads * (ptrdiff_t)params.blockSize *
							(ptrdiff_t)repeats / MEGABYTE) / secs;
					cout << "\t\t" << nThreads << (nThreads > 1 ? " threads: " : " thread: ")
						<< (int64_t)(mbsPerSec + 0.5) << "MB/s";
					cout << endl;
				}
			}
}
inline
unsigned popCnt32( uint32_t value )
{
	// Returns the number of set bits in value.
#if defined(_MSC_VER)
	return __popcnt( value );
#elif defined(__GNUC__)
	return __builtin_popcount( value );
#else
	// Portable SWAR fallback: without it, other compilers would fall off the end
	// of a non-void function.
	value = value - ((value >> 1) & 0x55555555u);
	value = (value & 0x33333333u) + ((value >> 2) & 0x33333333u);
	value = (value + (value >> 4)) & 0x0F0F0F0Fu;
	return (unsigned)((value * 0x01010101u) >> 24);
#endif
}
vector<string> cmline_params::parse( int argc, char const *const *argv )
{
	// Parses the options --file <name>, --size <n>[g|m|k|b], --bound <n>, --invert
	// and (MSVC only) --normal/--high/--realtime/--best. Option names are
	// case-insensitive. Returns all error messages; an empty vector means success.
	static char const NO_FILE_ERR[] = "supply filename !";
	vector<string> errs;
	// Defaults are assigned unconditionally so that no member stays indeterminate,
	// even when the hardware concurrency is unknown.
	unsigned hc = thread::hardware_concurrency();
	fileName = nullptr;
	blockSize = (size_t)256 * 1024 * 1024;
	nCPUs = hc;
	invert = false;
	priority = priority_t::UNSET;
	if( !hc )
		errs.emplace_back( "thread::hardware_concurrency() == 0" );
	// Appends an error of the form  <prefix>: "<value>".
	auto addErrString = [&]( char const *prefix, char const *value )
	{
		errs.emplace_back( string( prefix ) + ": \"" + value + "\"" );
	};
	// Parses "<number>[suffix]" into a byte count. The number is truncated toward
	// zero and must be >= 1. The optional single suffix character (g, m, k or b,
	// case-insensitive) is a decimal multiplier. Returns false and leaves `size`
	// untouched on any error.
	auto parseSize = []( char const *str, size_t &size ) -> bool
	{
		double value;
		from_chars_result fcr = from_chars( str, str + strlen( str ), value,
			chars_format::general );
		// written as !(x >= 1) so that NaN is rejected too
		if( fcr.ec != errc() || !((value = trunc( value )) >= 1.0) )
			return false;
		static const
		struct suffix_mult
		{
			char suffix;
			size_t mult;
		} sms[]
		{
			{ 'g', (size_t)1000 * 1000 * 1000 },
			{ 'm', (size_t)1000 * 1000 },
			{ 'k', (size_t)1000 },
			{ 'b', (size_t)1 }
		};
		char const *suffixPtr = fcr.ptr;
		if( *suffixPtr )
		{
			if( suffixPtr[1] )
				return false;
			// tolower() needs a value representable as unsigned char
			char suffix = (char)tolower( (unsigned char)*suffixPtr );
			suffix_mult const *pSm = nullptr;
			for( suffix_mult const &sm : sms )
				if( sm.suffix == suffix )
				{
					pSm = &sm;
					break;
				}
			if( !pSm )
				return false;
			value *= (double)(ptrdiff_t)pSm->mult;
		}
		// reject values at or beyond 2^53, where a double stops being exact
		if( value >= (double)((int64_t)1 << 53) )
			return false;
		size = (size_t)(ptrdiff_t)value;
		return true;
	};
	char const *const *param = argv + 1,
		*const *paramEnd = argv + argc;
	while( param < paramEnd )
	{
		if( xstricmp( *param, "--file" ) == 0 )
		{
			if( ++param == paramEnd )
			{
				errs.emplace_back( NO_FILE_ERR );
				break;
			}
			fileName = *param++;
			continue;
		}
		if( xstricmp( *param, "--size" ) == 0 )
		{
			if( ++param == paramEnd )
			{
				errs.emplace_back( "supply size !" );
				break;
			}
			char const *sizeParam = *param++;
			if( !parseSize( sizeParam, blockSize ) )
				addErrString( "invalid size", sizeParam );
			continue;
		}
		if( xstricmp( *param, "--bound" ) == 0 )
		{
			if( ++param == paramEnd )
			{
				errs.emplace_back( "supply CPU-bound !" );
				break;
			}
			char const *boundParam = *param++;
			unsigned cpuBound = 0;
			from_chars_result fcr = from_chars( boundParam, boundParam + strlen( boundParam ),
				cpuBound );
			// a bound of 0 would leave no thread count to benchmark
			if( fcr.ec == errc() && !*fcr.ptr && cpuBound )
				nCPUs = nCPUs <= cpuBound ? nCPUs : cpuBound;
			else
				addErrString( "invalid CPU-bound", boundParam );
			continue;
		}
#if defined(_MSC_VER)
		static const
		struct str_prio
		{
			char const *prioStr;
			priority_t priority;
		} prios[] =
		{
			{ "--normal", priority_t::NORMAL },
			{ "--high", priority_t::HIGH },
			{ "--realtime", priority_t::REALTIME },
			{ "--best", priority_t::BEST_AS_CAN },
		};
		bool prioSet = false;
		for( str_prio const &strPrio : prios )
			if( xstricmp( *param, strPrio.prioStr ) == 0 )
			{
				priority = strPrio.priority;
				++param;
				prioSet = true;
				break;
			}
		if( prioSet )
			continue;
#endif
		if( xstricmp( *param, "--invert" ) == 0 )
		{
			++param;
			int cpuIdRegs[2][4];
#if defined(_MSC_VER)
			__cpuid( cpuIdRegs[0], 0 );
			__cpuid( cpuIdRegs[1], 1 );
#elif defined(__GNUC__)
			__cpuid(0, cpuIdRegs[0][0], cpuIdRegs[0][1], cpuIdRegs[0][2],
				cpuIdRegs[0][3]);
			__cpuid(1, cpuIdRegs[1][0], cpuIdRegs[1][1], cpuIdRegs[1][2],
				cpuIdRegs[1][3]);
#endif
			// leaf 0 EAX = highest basic leaf; leaf 1 EDX bit 28 is the HTT flag
			if( (unsigned)cpuIdRegs[0][0] < 1 || !((unsigned)cpuIdRegs[1][3] & 1
				<< 28) )
			{
				errs.emplace_back( "inversion impossible - CPU hasn't SMT" );
				continue;
			}
			invert = true;
			continue;
		}
		addErrString( "invalid option", *param++ );
	}
	// a dangling "--file" has already reported the missing name; don't repeat it
	if( !fileName && find( errs.begin(), errs.end(), NO_FILE_ERR ) == errs.end() )
		errs.insert( errs.begin(), NO_FILE_ERR );
	return errs;
}
static
vector<char> readFileRepeated( char const *fileName, size_t blockSize )
{
	// Returns a blockSize-byte buffer filled with repeated copies of the file's
	// contents; the file is truncated if it is larger than blockSize. If the file
	// does not end in '\n', a '\n' is placed before every further copy, so the
	// last word of one copy never merges with the first word of the next.
	// Returns an empty vector for blockSize == 0 or an empty file. I/O errors throw
	// because failbit/badbit exceptions are enabled on the stream.
	if( !blockSize )
		return vector<char>();
	ifstream ifs;
	ifs.exceptions( ifstream::failbit | ifstream::badbit );
	ifs.open( fileName, ifstream::binary );
	ifs.seekg( 0, ios_base::end );
	streamoff const fileSize = ifs.tellg();
	if( fileSize <= 0 || (unsigned long long)fileSize > (size_t)-1 )
		return vector<char>();
	ifs.seekg( 0, ios_base::beg );
	vector<char> block( blockSize, 0 );
	size_t const repSize = (size_t)fileSize <= blockSize ? (size_t)fileSize : blockSize;
	ifs.read( block.data(), (streamsize)repSize );
	bool const separate = block[repSize - 1] != '\n';
	size_t filled = repSize;
	while( filled < blockSize )
	{
		// the separator goes directly after the previous copy, before the next one
		if( separate )
			block[filled++] = '\n';
		size_t const free = blockSize - filled;
		size_t const cpy = repSize <= free ? repSize : free;
		copy_n( block.begin(), cpy, block.begin() + filled );
		filled += cpy;
	}
	return block;
}
static
void setThreadAffinity( thread::native_handle_type handle, unsigned affinity )
{
	// Restricts the thread behind `handle` to the single logical CPU with index
	// `affinity`. The result of the OS call is not checked. On platforms that are
	// neither MSVC nor Unix this does nothing.
#if defined(_MSC_VER)
	DWORD_PTR const onlyThisCpu = (DWORD_PTR)1 << affinity;
	SetThreadAffinityMask( handle, onlyThisCpu );
#elif defined(__unix__)
	cpu_set_t onlyThisCpu;
	CPU_ZERO( &onlyThisCpu );
	CPU_SET( affinity, &onlyThisCpu );
	pthread_setaffinity_np( handle, sizeof( cpu_set_t ), &onlyThisCpu );
#endif
}
inline
int xstricmp( char const *a, char const *b )
{
	// Case-insensitive comparison of two NUL-terminated strings. Returns the
	// difference of the first pair of lowercased bytes that differ, or 0 when the
	// strings are equal ignoring case.
	using uchar_t = unsigned char;
	for( size_t i = 0; a[i] | b[i]; ++i )
	{
		// tolower() requires a value representable as unsigned char (or EOF);
		// passing a plain, possibly negative char is undefined behaviour.
		uchar_t lA = (uchar_t)tolower( (uchar_t)a[i] ),
			lB = (uchar_t)tolower( (uchar_t)b[i] );
		if( lA != lB )
			return (int)lA - (int)lB;
	}
	return 0;
}