[cig-commits] [commit] devel: Replace Numerical Routines sorting code. (ee7076f)
cig_noreply at geodynamics.org
cig_noreply at geodynamics.org
Thu May 1 00:50:13 PDT 2014
Repository : ssh://geoshell/specfem3d
On branch : devel
Link : https://github.com/geodynamics/specfem3d/compare/cb32c88d6155d7974561a6f72fc17aea596e2c4d...50aa953c1db3f565d76415f5305410a529996b75
>---------------------------------------------------------------
commit ee7076f9acb4d42f8ad60523ea9db2ad5722b451
Author: Elliott Sales de Andrade <esalesde at physics.utoronto.ca>
Date: Thu Jan 23 18:54:48 2014 -0500
Replace Numerical Routines sorting code.
Numerical Routines source code is not re-distributable in any way at all
and is 100% incompatible with the GPL.
The replacement code originates from the heap_sort.f90 in the global
code. It appears to be a fairly straight-forward translation of the heap
sort algorithm (e.g., from RosettaCode) and should be unencumbered. I
have added additional parameters in order to sort multiple arrays at the
same time. Interestingly, this code is slightly faster than the
previous version.
>---------------------------------------------------------------
ee7076f9acb4d42f8ad60523ea9db2ad5722b451
src/decompose_mesh/fault_scotch.f90 | 6 +-
src/generate_databases/get_MPI.f90 | 14 +-
src/shared/get_global.f90 | 14 +-
src/shared/sort_array_coordinates.f90 | 242 +++++++++++++++++-----------------
4 files changed, 129 insertions(+), 147 deletions(-)
diff --git a/src/decompose_mesh/fault_scotch.f90 b/src/decompose_mesh/fault_scotch.f90
index fdeab7c..922cc9e 100644
--- a/src/decompose_mesh/fault_scotch.f90
+++ b/src/decompose_mesh/fault_scotch.f90
@@ -296,8 +296,8 @@ CONTAINS
integer, intent(out) :: locval(nspec)
double precision, intent(in) :: xyz_c(3,nspec)
- double precision, dimension(nspec) :: work,xp,yp,zp
- integer, dimension(nspec) :: ind,ninseg,iwork,ibool,iglob
+ double precision, dimension(nspec) :: xp,yp,zp
+ integer, dimension(nspec) :: ninseg,ibool,iglob
integer :: nglob
logical :: ifseg(nspec)
double precision :: xtol
@@ -310,7 +310,7 @@ CONTAINS
xtol = 1.d-10 * maxval( maxval(xyz_c,2) - minval(xyz_c,2) )
call sort_array_coordinates(nspec,xp,yp,zp,ibool,iglob,locval,ifseg, &
- nglob,ind,ninseg,iwork,work,xtol)
+ nglob,ninseg,xtol)
end subroutine lex_order
diff --git a/src/generate_databases/get_MPI.f90 b/src/generate_databases/get_MPI.f90
index 7bf1126..8dfffdc 100644
--- a/src/generate_databases/get_MPI.f90
+++ b/src/generate_databases/get_MPI.f90
@@ -63,14 +63,13 @@
!local parameters
double precision, dimension(:), allocatable :: xp,yp,zp
- double precision, dimension(:), allocatable :: work_ext_mesh
integer, dimension(:), allocatable :: locval
integer, dimension(:), allocatable :: nibool_interfaces_ext_mesh_true
! for MPI buffers
integer, dimension(:), allocatable :: reorder_interface_ext_mesh, &
- ind_ext_mesh,ninseg_ext_mesh,iwork_ext_mesh
+ ninseg_ext_mesh
logical, dimension(:), allocatable :: ifseg
integer :: iinterface,ilocnum
integer :: num_points1, num_points2
@@ -113,14 +112,8 @@
if( ier /= 0 ) stop 'error allocating array ifseg'
allocate(reorder_interface_ext_mesh(nibool_interfaces_ext_mesh(iinterface)),stat=ier)
if( ier /= 0 ) stop 'error allocating array reorder_interface_ext_mesh'
- allocate(ind_ext_mesh(nibool_interfaces_ext_mesh(iinterface)),stat=ier)
- if( ier /= 0 ) stop 'error allocating array ind_ext_mesh'
allocate(ninseg_ext_mesh(nibool_interfaces_ext_mesh(iinterface)),stat=ier)
if( ier /= 0 ) stop 'error allocating array ninseg_ext_mesh'
- allocate(iwork_ext_mesh(nibool_interfaces_ext_mesh(iinterface)),stat=ier)
- if( ier /= 0 ) stop 'error allocating array iwork_ext_mesh'
- allocate(work_ext_mesh(nibool_interfaces_ext_mesh(iinterface)),stat=ier)
- if( ier /= 0 ) stop 'error allocating array work_ext_mesh'
! gets x,y,z coordinates of global points on MPI interface
do ilocnum = 1, nibool_interfaces_ext_mesh(iinterface)
@@ -135,7 +128,7 @@
ibool_interfaces_ext_mesh(1:nibool_interfaces_ext_mesh(iinterface),iinterface), &
reorder_interface_ext_mesh,locval,ifseg, &
nibool_interfaces_ext_mesh_true(iinterface), &
- ind_ext_mesh,ninseg_ext_mesh,iwork_ext_mesh,work_ext_mesh,SMALLVAL_TOL)
+ ninseg_ext_mesh,SMALLVAL_TOL)
! checks that number of MPI points are still the same
num_points1 = num_points1 + nibool_interfaces_ext_mesh(iinterface)
@@ -153,10 +146,7 @@
deallocate(locval)
deallocate(ifseg)
deallocate(reorder_interface_ext_mesh)
- deallocate(ind_ext_mesh)
deallocate(ninseg_ext_mesh)
- deallocate(iwork_ext_mesh)
- deallocate(work_ext_mesh)
enddo
diff --git a/src/shared/get_global.f90 b/src/shared/get_global.f90
index 7f6a4ab..d34713d 100644
--- a/src/shared/get_global.f90
+++ b/src/shared/get_global.f90
@@ -48,8 +48,7 @@
integer ispec,i,j,ier
integer ieoff,ilocnum,nseg,ioff,iseg,ig
- integer, dimension(:), allocatable :: ind,ninseg,iwork,idummy
- double precision, dimension(:), allocatable :: work
+ integer, dimension(:), allocatable :: ninseg,idummy
! geometry tolerance parameter to calculate number of independent grid points
! small value for double precision and to avoid sensitivity to roundoff
@@ -59,25 +58,16 @@
SMALLVALTOL = 1.d-10 * dabs(UTM_X_MAX - UTM_X_MIN)
! dynamically allocate arrays
- allocate(ind(npointot),stat=ier)
- if( ier /= 0 ) stop 'error allocating array ind'
allocate(ninseg(npointot),stat=ier)
if( ier /= 0 ) stop 'error allocating array ninseg'
- allocate(iwork(npointot),stat=ier)
- if( ier /= 0 ) stop 'error allocating array iwork'
- allocate(work(npointot),stat=ier)
- if( ier /= 0 ) stop 'error allocating array work'
allocate(idummy(npointot),stat=ier)
if( ier /= 0 ) stop 'error allocating array idummy'
call sort_array_coordinates(npointot,xp,yp,zp,idummy,iglob,locval,ifseg, &
- nglob,ind,ninseg,iwork,work,SMALLVALTOL)
+ nglob,ninseg,SMALLVALTOL)
! deallocate arrays
- deallocate(ind)
deallocate(ninseg)
- deallocate(iwork)
- deallocate(work)
deallocate(idummy)
end subroutine get_global
diff --git a/src/shared/sort_array_coordinates.f90 b/src/shared/sort_array_coordinates.f90
index a21e7f6..a0a9026 100644
--- a/src/shared/sort_array_coordinates.f90
+++ b/src/shared/sort_array_coordinates.f90
@@ -28,7 +28,7 @@
! subroutines to sort MPI buffers to assemble between chunks
subroutine sort_array_coordinates(npointot,x,y,z,ibool,iglob,locval,ifseg, &
- nglob,ind,ninseg,iwork,work,xtol)
+ nglob,ninseg,xtol)
! this routine MUST be in double precision to avoid sensitivity
! to roundoff errors in the coordinates of the points
@@ -39,190 +39,192 @@
include "constants.h"
- integer npointot,nglob
+ integer, intent(in) :: npointot
+ double precision, dimension(npointot), intent(inout) :: x, y, z
+ integer, dimension(npointot), intent(inout) :: ibool
+ integer, dimension(npointot), intent(out) :: iglob, locval, ninseg
+ logical, dimension(npointot), intent(out) :: ifseg
+ integer, intent(out) :: nglob
+ double precision, intent(in) :: xtol
- integer ibool(npointot),iglob(npointot),locval(npointot)
- integer ind(npointot),ninseg(npointot)
- logical ifseg(npointot)
- double precision x(npointot),y(npointot),z(npointot)
- integer iwork(npointot)
- double precision work(npointot)
! local parameters
- integer ipoin,i,j
- integer nseg,ioff,iseg,ig
- double precision xtol
+ integer :: i, j
+ integer :: nseg, ioff, iseg, ig
-! establish initial pointers
- do ipoin=1,npointot
- locval(ipoin)=ipoin
+ ! establish initial pointers
+ do i=1,npointot
+ locval(i) = i
enddo
- ifseg(:)=.false.
+ ifseg(:) = .false.
- nseg=1
- ifseg(1)=.true.
- ninseg(1)=npointot
+ nseg = 1
+ ifseg(1) = .true.
+ ninseg(1) = npointot
do j=1,NDIM
-! sort within each segment
- ioff=1
+ ! sort within each segment
+ ioff = 1
do iseg=1,nseg
- if(j == 1) then
+ if (j == 1) then
- call rank_buffers(x(ioff),ind,ninseg(iseg))
+ call heap_sort_multi(ninseg(iseg), x(ioff), y(ioff), z(ioff), ibool(ioff), locval(ioff))
- else if(j == 2) then
+ else if (j == 2) then
- call rank_buffers(y(ioff),ind,ninseg(iseg))
+ call heap_sort_multi(ninseg(iseg), y(ioff), x(ioff), z(ioff), ibool(ioff), locval(ioff))
else
- call rank_buffers(z(ioff),ind,ninseg(iseg))
+ call heap_sort_multi(ninseg(iseg), z(ioff), x(ioff), y(ioff), ibool(ioff), locval(ioff))
endif
- call swap_all_buffers(ibool(ioff),locval(ioff), &
- x(ioff),y(ioff),z(ioff),iwork,work,ind,ninseg(iseg))
-
- ioff=ioff+ninseg(iseg)
+ ioff = ioff + ninseg(iseg)
enddo
! check for jumps in current coordinate
if (j == 1) then
do i=2,npointot
- if(dabs(x(i)-x(i-1)) > xtol) ifseg(i)=.true.
+ if (dabs(x(i) - x(i-1)) > xtol) ifseg(i) = .true.
enddo
else if (j == 2) then
do i=2,npointot
- if(dabs(y(i)-y(i-1)) > xtol) ifseg(i)=.true.
+ if (dabs(y(i) - y(i-1)) > xtol) ifseg(i) = .true.
enddo
else
do i=2,npointot
- if(dabs(z(i)-z(i-1)) > xtol) ifseg(i)=.true.
+ if (dabs(z(i) - z(i-1)) > xtol) ifseg(i) = .true.
enddo
endif
! count up number of different segments
- nseg=0
+ nseg = 0
do i=1,npointot
- if(ifseg(i)) then
- nseg=nseg+1
- ninseg(nseg)=1
+ if (ifseg(i)) then
+ nseg = nseg + 1
+ ninseg(nseg) = 1
else
- ninseg(nseg)=ninseg(nseg)+1
+ ninseg(nseg) = ninseg(nseg) + 1
endif
enddo
enddo
-! assign global node numbers (now sorted lexicographically)
- ig=0
+ ! assign global node numbers (now sorted lexicographically)
+ ig = 0
do i=1,npointot
- if(ifseg(i)) ig=ig+1
- iglob(locval(i))=ig
+ if (ifseg(i)) ig = ig + 1
+ iglob(locval(i)) = ig
enddo
- nglob=ig
+ nglob = ig
end subroutine sort_array_coordinates
! -------------------- library for sorting routine ------------------
-! sorting routines put here in same file to allow for inlining
+ subroutine heap_sort_multi(N, dx, dy, dz, ia, ib)
- subroutine rank_buffers(A,IND,N)
-!
-! Use Heap Sort (Numerical Recipes)
-!
implicit none
+ integer, intent(in) :: N
+ double precision, dimension(N), intent(inout) :: dx
+ double precision, dimension(N), intent(inout) :: dy
+ double precision, dimension(N), intent(inout) :: dz
+ integer, dimension(N), intent(inout) :: ia
+ integer, dimension(N), intent(inout) :: ib
- integer n
- double precision A(n)
- integer IND(n)
+ integer :: i
- integer i,j,l,ir,indx
- double precision q
+ ! checks if anything to do
+ if (N < 2) return
- do j=1,n
- IND(j)=j
+ ! builds heap
+ do i = N/2, 1, -1
+ call heap_sort_siftdown(i, n)
enddo
- if(n == 1) return
-
- L=n/2+1
- ir=n
- 100 CONTINUE
- IF(l>1) THEN
- l=l-1
- indx=ind(l)
- q=a(indx)
- ELSE
- indx=ind(ir)
- q=a(indx)
- ind(ir)=ind(1)
- ir=ir-1
- if (ir == 1) then
- ind(1)=indx
- return
- endif
- endif
- i=l
- j=l+l
- 200 CONTINUE
- IF(J <= IR) THEN
- IF(J < IR) THEN
- IF(A(IND(j)) < A(IND(j+1))) j=j+1
- endif
- IF (q < A(IND(j))) THEN
- IND(I)=IND(J)
- I=J
- J=J+J
- ELSE
- J=IR+1
- endif
- goto 200
- endif
- IND(I)=INDX
- goto 100
- end subroutine rank_buffers
+ ! sorts array
+ do i = N, 2, -1
+ ! swaps last and first entry in this section
+ call dswap(dx, 1, i)
+ call dswap(dy, 1, i)
+ call dswap(dz, 1, i)
+ call iswap(ia, 1, i)
+ call iswap(ib, 1, i)
+ call heap_sort_siftdown(1, i - 1)
+ enddo
-! -------------------------------------------------------------------
+ contains
- subroutine swap_all_buffers(IA,IB,A,B,C,IW,W,ind,n)
-!
-! swap arrays IA, IB, A, B and C according to addressing in array IND
-!
- implicit none
+ subroutine dswap(A, i, j)
- integer n
+ double precision, dimension(:), intent(inout) :: A
+ integer, intent(in) :: i
+ integer, intent(in) :: j
- integer IND(n)
- integer IA(n),IB(n),IW(n)
- double precision A(n),B(n),C(n),W(n)
+ double precision :: tmp
- integer i
+ tmp = A(i)
+ A(i) = A(j)
+ A(j) = tmp
- W(:) = A(:)
- IW(:) = IA(:)
+ end subroutine
- do i=1,n
- A(i)=W(ind(i))
- IA(i)=IW(ind(i))
- enddo
+ subroutine iswap(A, i, j)
- W(:) = B(:)
- IW(:) = IB(:)
+ integer, dimension(:), intent(inout) :: A
+ integer, intent(in) :: i
+ integer, intent(in) :: j
- do i=1,n
- B(i)=W(ind(i))
- IB(i)=IW(ind(i))
- enddo
+ integer :: tmp
- W(:) = C(:)
+ tmp = A(i)
+ A(i) = A(j)
+ A(j) = tmp
- do i=1,n
- C(i)=W(ind(i))
- enddo
+ end subroutine
+
+ subroutine heap_sort_siftdown(start, bottom)
+
+ integer, intent(in) :: start
+ integer, intent(in) :: bottom
+
+ integer :: i, j
+ double precision :: xtmp, ytmp, ztmp
+ integer :: atmp, btmp
+
+ i = start
+ xtmp = dx(i)
+ ytmp = dy(i)
+ ztmp = dz(i)
+ atmp = ia(i)
+ btmp = ib(i)
+
+ j = 2 * i
+ do while (j <= bottom)
+ ! chooses larger value first in this section
+ if (j < bottom) then
+ if (dx(j) <= dx(j+1)) j = j + 1
+ endif
+
+ ! checks if section already smaller than initial value
+ if (dx(j) < xtmp) exit
+
+ dx(i) = dx(j)
+ dy(i) = dy(j)
+ dz(i) = dz(j)
+ ia(i) = ia(j)
+ ib(i) = ib(j)
+ i = j
+ j = 2 * i
+ enddo
- end subroutine swap_all_buffers
+ dx(i) = xtmp
+ dy(i) = ytmp
+ dz(i) = ztmp
+ ia(i) = atmp
+ ib(i) = btmp
+ end subroutine heap_sort_siftdown
+ end subroutine heap_sort_multi
More information about the CIG-COMMITS
mailing list