"e p chandler" <
ep...@juno.com> wrote in message
news:kar1k7$7b9$1...@dont-email.me...
>e p chandler wrote:
>> A simple exchange sort with flag (bubble sort).
>> Replacing that with a comb sort multiplied the run time by 5.
> Thomas Koenig replied:
>>Take this piece of advice from Numerical Recipes:
>># We _will_ draw the line, however, at the inefficient N^2 algorithm,
>># beloved of elementary computer science texts, called _bubble sort_.
>># If you know what bubble sort is, wipe it from your mind; if you
>># don't know, make a point of never finding out!
>>(I know NR has lots of weak spots, but this piece of advice
>>is part of The Truth, together with their advice on evaluating
>>polynomials :-)
> After some additional refinements, using a simple exchange sort *is*
> justifiable.
The O(N**2) may here be applied to sweeping through arrays xb, yb, and zb
5000 times. With an octree, nowhere near as much access to these arrays
will be necessary.
Actually your solution and Michael Metcalf's are indistinguishable.
Consider that the top ten array is only going to be updated about
100-120 times per [x,y,z] value here, while NB = 1000000, and that
the top ten array (here d(NC) and m(NC)) isn't very large so
the difference in run times is negligible. Using either your
method or Michael Metcalf's or a heap to represent the top ten
array resulted in a run time of about 15 seconds on my Core 2 Duo,
provided I split the inner loop into two parts, one to fill up the
top ten data structure and one to handle the general case so as to
remove a conditional from the inner loop.
Since all the time is spent on the two lines
> q=(x-xb(j))**2 + (y-yb(j))**2 + (z-zb(j))**2
> if (q >= d(NC)) cycle
I tried writing these in assembly language:
C:\gfortran\clf\neighbor>type inner.asm
format MS64 coff
section 'CODE' code readable executable align 16
align 16
;interface
; subroutine
inner(xArray,yArray,zArray,xyz,size,heap,heapsize,work,firstindex
,callback) bind(C,name='inner')
; use ISO_C_BINDING
; implicit none
; integer(C_SIZE_T), value :: size ! Size in units of 4 REALs
; real(C_FLOAT) xArray(size) ! Aligned
; real(C_FLOAT) yArray(size) ! Aligned
; real(C_FLOAT) zArray(size) ! Aligned
; real(C_FLOAT) xyz(3)
; integer(C_SIZE_T), value :: heapsize
; type, bind(C) :: heap_entry
; real(C_FLOAT) value
; integer(C_INT) index
; end type heap_entry
; type(heap_entry) heap(heapsize) ! Aligned
; real(C_FLOAT) work(4) ! Aligned
; integer(C_SIZE_T), value :: firstindex
; interface
; function callback(compares,heap,heapsize,firstindex) bind(C)
; import
; implicit none
; real(C_FLOAT) callback
; real(C_FLOAT) compares(4)
; integer(C_SIZE_T), value :: heapsize
; type(heap_entry) heap(heapsize)
; integer(C_SIZE_T), value :: firstindex
; end function callback
; end interface
; end subroutine inner
;end interface
public inner
inner:
sub rsp, 120
movaps [rsp+32], xmm7
movaps [rsp+48], xmm8
movaps [rsp+64], xmm9
mov [rsp+88], r15
mov [rsp+104], rdi
mov [rsp+112], rsi
mov [rsp+80], rbx
movaps [rsp+128], xmm6
mov rax, [rsp+160] ; size
shl rax, 4
lea rbx, [rcx+rax] ; xArray
lea rsi, [rdx+rax] ; yArray
lea rdi, [r8+rax] ; zArray
movups xmm6, [r9] ; xyz
pshufd xmm7, xmm6, 55h ; y
pshufd xmm8, xmm6, 0AAh ; z
pshufd xmm6, xmm6, 00h ; x
mov rcx, [rsp+168] ; heap
movss xmm0, [rcx]
pshufd xmm9, xmm0, 00h ; compare value
mov r15, [rsp+192] ; firstindex
lea r15, [rax+4*r15]
neg rax
align 16
main_loop:
movaps xmm0, [rax+rbx]
movaps xmm1, [rax+rsi]
movaps xmm2, [rax+rdi]
;jmp cleanup
subps xmm0, xmm6
subps xmm1, xmm7
subps xmm2, xmm8
mulps xmm0, xmm0
mulps xmm1, xmm1
mulps xmm2, xmm2
addps xmm0, xmm1
addps xmm0, xmm2
movaps xmm1, xmm0
cmpltps xmm0, xmm9
pmovmskb ecx, xmm0
test ecx, ecx
jnz insert
cleanup:
add rax, 16
js main_loop
movaps xmm7, [rsp+32]
movaps xmm8, [rsp+48]
movaps xmm9, [rsp+64]
mov r15, [rsp+88]
mov rdi, [rsp+104]
mov rsi, [rsp+112]
mov rbx, [rsp+80]
movaps xmm6, [rsp+128]
add rsp, 120
ret
insert:
mov [rsp+96], rax
mov rcx, [rsp+184] ; work
movaps [rcx], xmm1
mov rdx, [rsp+168] ; heap
mov r8, [rsp+176] ; heapsize
lea r9, [rax+r15]; firstindex
shr r9, 2
mov rax, [rsp+200] ; callback
call rax
pshufd xmm9, xmm0, 00h
mov rax, [rsp+96]
jmp cleanup
C:\gfortran\clf\neighbor>type heap3.f90
module funcs
use ISO_C_BINDING
implicit none
private
public inner, callback
type, bind(C), public :: heap_entry
real(C_FLOAT) value
integer(C_INT) index
end type heap_entry
interface
recursive subroutine
inner(xArray,yArray,zArray,xyz,size,heap,heapsize, &
work,firstindex,callback) bind(C,name='inner')
import
implicit none
integer(C_SIZE_T), value :: size ! Size in units of 4 REALs
real(C_FLOAT) xArray(size) ! Aligned
real(C_FLOAT) yArray(size) ! Aligned
real(C_FLOAT) zArray(size) ! Aligned
real(C_FLOAT) xyz(3)
integer(C_SIZE_T), value :: heapsize
type(heap_entry) heap(heapsize) ! Aligned
real(C_FLOAT) work(4) ! Aligned
integer(C_SIZE_T), value :: firstindex
interface
recursive function callback(compares,heap,heapsize,firstindex)
bind(
C)
import
implicit none
real(C_FLOAT) callback
real(C_FLOAT) compares(4)
integer(C_SIZE_T), value :: heapsize
type(heap_entry) heap(heapsize)
integer(C_SIZE_T), value :: firstindex
end function callback
end interface
end subroutine inner
end interface
contains
recursive function callback(compares,heap,heapsize,firstindex) bind(C)
real(C_FLOAT) callback
real(C_FLOAT) compares(4)
integer(C_SIZE_T), value :: heapsize
type(heap_entry) heap(heapsize)
integer(C_SIZE_T), value :: firstindex
integer i
integer current, left, right, child
do i = 1, 4
if(compares(i) < heap(1)%value) then
heap(1) = heap_entry(compares(i),firstindex+i-1)
current = 1
left = 2*current
right = 2*current+1
do while(left <= heapsize)
child = left
if(right <= heapsize) then
if(heap(right)%value > heap(left)%value) then
child = right
end if
end if
if(heap(child)%value > heap(current)%value) then
heap([current,child]) = heap([child,current])
current = child
left = 2*current
right = 2*current+1
else
exit
end if
end do
end if
end do
callback = heap(1)%value
end function callback
end module funcs
program test
use funcs
use ISO_C_BINDING
implicit none
integer, parameter :: n1 = 5000, n2 = 1000000, n3 = 10
real a(n1, 3)
real, target :: b(n2,3)
type(heap_entry) points(n3)
real dist2, t1, t2
integer i, j
integer current, parent, left, right, child
integer(C_INTPTR_T) address
real xyz(3)
real, target :: work(7)
real, pointer :: pwork(:)
open(10,file='test.dat',status='replace')
call cpu_time(t1)
call random_seed()
call random_number(a)
call random_number(b)
address = transfer(C_LOC(work(1)),address)
pwork => work(1+IAND(-int(address),3):)
do i = 1, n1
xyz = a(i,:)
do j = 1, n3
! dist2 = sum((a(i,:)-b(j,:))**2)
dist2 = (a(i,1)-b(j,1))**2+(a(i,2)-b(j,2))**2+(a(i,3)-b(j,3))**2
points(j) = heap_entry(dist2,j)
current = j
do while(current > 1)
parent = current/2
if(points(current)%value > points(parent)%value) then
points([current,parent]) = points([parent,current])
current = parent
else
exit
end if
end do
end do
address = transfer(C_LOC(b(j,1)),address)/4
do j = j, j+IAND(-int(address),3)-1
! dist2 = sum((a(i,:)-b(j,:))**2)
dist2 = (a(i,1)-b(j,1))**2+(a(i,2)-b(j,2))**2+(a(i,3)-b(j,3))**2
if(dist2 < points(1)%value) then
points(1) = heap_entry(dist2,j)
current = 1
left = 2*current
right = 2*current+1
do while(left <= n3)
child = left
if(right <= n3) then
if(points(right)%value > points(left)%value) then
child = right
end if
end if
if(points(child)%value > points(current)%value) then
points([current,child]) = points([child,current])
current = child
left = 2*current
right = 2*current+1
else
exit
end if
end do
end if
end do
call inner(b(j,1),b(j,2),b(j,3),xyz,int((n2-j+1)/4,C_SIZE_T), &
points,int(n3,C_SIZE_T),pwork,int(j,C_SIZE_T),callback)
do j = j+(n2-j+1)/4*4, n2
! dist2 = sum((a(i,:)-b(j,:))**2)
dist2 = (a(i,1)-b(j,1))**2+(a(i,2)-b(j,2))**2+(a(i,3)-b(j,3))**2
if(dist2 < points(1)%value) then
points(1) = heap_entry(dist2,j)
current = 1
left = 2*current
right = 2*current+1
do while(left <= n3)
child = left
if(right <= n3) then
if(points(right)%value > points(left)%value) then
child = right
end if
end if
if(points(child)%value > points(current)%value) then
points([current,child]) = points([child,current])
current = child
left = 2*current
right = 2*current+1
else
exit
end if
end do
end if
end do
if(mod(i,1000) == 0) then
write(*,'(a,i0,a,10(1x,i0))') 'Ten closest points to a(', i, ') are
',
points%index
write(*,'(a,10(1x,f0.4))') 'Distances are ', sqrt(points%value)
end if
end do
call cpu_time(t2)
write(*,'(a,f0.2)') 'Time used in seconds: ', t2-t1
end program test
C:\gfortran\clf\neighbor>fasm inner.asm
flat assembler version 1.70 (1578954 kilobytes memory)
2 passes, 400 bytes.
C:\gfortran\clf\neighbor>gfortran -O2 heap3.f90 inner.obj -oheap3
C:\gfortran\clf\neighbor>heap3
Ten closest points to a(1000) are 439860 730552 732299 114619 474475 381987
350
023 276078 492099 535861
Distances are .0115 .0108 .0109 .0106 .0070 .0048 .0069 .0100 .0061 .0034
Ten closest points to a(2000) are 402266 755814 950446 685965 749339 992119
511
450 523805 791419 312908
Distances are .0138 .0136 .0130 .0113 .0081 .0104 .0129 .0087 .0113 .0072
Ten closest points to a(3000) are 312774 409874 983250 37724 760385 636340
4280
8 601657 149218 481998
Distances are .0118 .0106 .0117 .0105 .0077 .0082 .0089 .0070 .0101 .0076
Ten closest points to a(4000) are 493 322619 67245 578194 552804 439767
858292
460116 725680 296431
Distances are .0132 .0130 .0105 .0126 .0111 .0083 .0056 .0118 .0120 .0093
Ten closest points to a(5000) are 137180 180546 116382 953344 694621 570112
664
350 831770 396725 180879
Distances are .0129 .0106 .0123 .0095 .0104 .0106 .0089 .0061 .0088 .0036
Time used in seconds: 10.28
So even though my SSE code is vectorized and gfortran's compiler output is
probably not, I'm only getting a 33% speedup. The reason for this can be
seen if I uncomment the jmp cleanup instruction in the inner loop:
main_loop:
movaps xmm0, [rax+rbx]
movaps xmm1, [rax+rsi]
movaps xmm2, [rax+rdi]
jmp cleanup ; Skip all calculation!
subps xmm0, xmm6
subps xmm1, xmm7
subps xmm2, xmm8
mulps xmm0, xmm0
mulps xmm1, xmm1
mulps xmm2, xmm2
addps xmm0, xmm1
addps xmm0, xmm2
movaps xmm1, xmm0
cmpltps xmm0, xmm9
pmovmskb ecx, xmm0
test ecx, ecx
jnz insert
cleanup:
add rax, 16
js main_loop
In that case, no calculation will get done but all the 60 GB of memory
are read into the processor, so the program takes 9.61 seconds. Thus
we may conclude that pretty much all the time is taken up in
transferring data from memory and almost none in useful calculation.
Going to octrees should make this exercise almost instantaneous, but
I don't have any octree code handy.
--
write(*,*) transfer((/17.392111325966148d0,6.5794487871554595D-85, &
6.0134700243160014d-154/),(/'x'/)); end