[cig-commits] [commit] devel, master: Replace Numerical Routines sorting code. (55b14dc)

cig_noreply at geodynamics.org cig_noreply at geodynamics.org
Thu Nov 6 08:20:15 PST 2014


Repository : https://github.com/geodynamics/specfem3d_globe

On branches: devel,master
Link       : https://github.com/geodynamics/specfem3d_globe/compare/bc58e579b3b0838a0968725a076f5904845437ca...be63f20cbb6f462104e949894dbe205d2398cd7f

>---------------------------------------------------------------

commit 55b14dc030e988ee5712f6ef79108603108c14f2
Author: Elliott Sales de Andrade <esalesde at physics.utoronto.ca>
Date:   Sun Jun 1 01:56:22 2014 -0400

    Replace Numerical Routines sorting code.
    
    Numerical Routines source code is not re-distributable in any way at all
    and is 100% incompatible with the GPL.
    
    The replacement code originates from the heap_sort.f90 in the mesher
    code. It appears to be a fairly straight-forward translation of the heap
    sort algorithm (e.g., from RosettaCode) and should be unencumbered. I
    have added additional parameters in order to sort multiple arrays at the
    same time. Interestingly, this code is slightly faster than the
    previous version.


>---------------------------------------------------------------

55b14dc030e988ee5712f6ef79108603108c14f2
 src/meshfem3D/create_chunk_buffers.f90 |  13 +-
 src/meshfem3D/get_MPI_interfaces.f90   |  13 +-
 src/shared/get_global.f90              |  16 +--
 src/shared/sort_array_coordinates.f90  | 230 ++++++++++++++++-----------------
 4 files changed, 119 insertions(+), 153 deletions(-)

diff --git a/src/meshfem3D/create_chunk_buffers.f90 b/src/meshfem3D/create_chunk_buffers.f90
index 70d0e87..03f0223 100644
--- a/src/meshfem3D/create_chunk_buffers.f90
+++ b/src/meshfem3D/create_chunk_buffers.f90
@@ -87,9 +87,8 @@
   double precision, dimension(:), allocatable :: xstore_selected,ystore_selected,zstore_selected
 
   ! arrays for sorting routine
-  integer, dimension(:), allocatable :: ind,ninseg,iglob,locval,iwork
+  integer, dimension(:), allocatable :: ninseg,iglob,locval
   logical, dimension(:), allocatable :: ifseg
-  double precision, dimension(:), allocatable :: work
 
   ! pairs generated theoretically
 
@@ -239,13 +238,10 @@
           xstore_selected(NGLOB2DMAX_XY), &
           ystore_selected(NGLOB2DMAX_XY), &
           zstore_selected(NGLOB2DMAX_XY), &
-          ind(NGLOB2DMAX_XY), &
           ninseg(NGLOB2DMAX_XY), &
           iglob(NGLOB2DMAX_XY), &
           locval(NGLOB2DMAX_XY), &
           ifseg(NGLOB2DMAX_XY), &
-          iwork(NGLOB2DMAX_XY), &
-          work(NGLOB2DMAX_XY), &
           stat=ier)
   if( ier /= 0 ) call exit_MPI(myrank,'error allocating temporary arrays in create_chunk_buffers')
 
@@ -724,7 +720,7 @@
             ! sort on x, y and z, the other arrays will be swapped as well
 
             call sort_array_coordinates(npoin2D,xstore_selected,ystore_selected,zstore_selected, &
-              ibool_selected,iglob,locval,ifseg,nglob,ind,ninseg,iwork,work)
+                                        ibool_selected,iglob,locval,ifseg,nglob,ninseg)
 
             ! check that no duplicate has been detected
             if(nglob /= npoin2D) call exit_MPI(myrank,'duplicates detected in buffer')
@@ -1023,7 +1019,7 @@
         ! sort array read based upon the coordinates of the points
         ! to ensure conforming matching with other buffers from neighbors
         call sort_array_coordinates(NGLOB1D_RADIAL,xread1D,yread1D,zread1D, &
-                      ibool1D,iglob,locval,ifseg,nglob,ind,ninseg,iwork,work)
+                                    ibool1D,iglob,locval,ifseg,nglob,ninseg)
 
         ! check that no duplicates have been found
         if(nglob /= NGLOB1D_RADIAL) then
@@ -1087,13 +1083,10 @@
   deallocate(xstore_selected)
   deallocate(ystore_selected)
   deallocate(zstore_selected)
-  deallocate(ind)
   deallocate(ninseg)
   deallocate(iglob)
   deallocate(locval)
   deallocate(ifseg)
-  deallocate(iwork)
-  deallocate(work)
 
   deallocate(mask_ibool)
 
diff --git a/src/meshfem3D/get_MPI_interfaces.f90 b/src/meshfem3D/get_MPI_interfaces.f90
index 91e5249..9382c38 100644
--- a/src/meshfem3D/get_MPI_interfaces.f90
+++ b/src/meshfem3D/get_MPI_interfaces.f90
@@ -592,10 +592,9 @@
 
   ! local parameters
   ! arrays for sorting routine
-  double precision, dimension(:), allocatable :: work
   double precision, dimension(:), allocatable :: xstore_selected,ystore_selected,zstore_selected
   integer, dimension(:), allocatable :: ibool_selected
-  integer, dimension(:), allocatable :: ind,ninseg,iglob,locval,iwork
+  integer, dimension(:), allocatable :: ninseg,iglob,locval
   logical, dimension(:), allocatable :: ifseg
   integer :: nglob_selected,i,ipoin,ier
 
@@ -604,13 +603,10 @@
           xstore_selected(npoin), &
           ystore_selected(npoin), &
           zstore_selected(npoin), &
-          ind(npoin), &
           ninseg(npoin), &
           iglob(npoin), &
           locval(npoin), &
-          ifseg(npoin), &
-          iwork(npoin), &
-          work(npoin),stat=ier)
+          ifseg(npoin),stat=ier)
   if( ier /= 0 ) call exit_MPI(myrank,'error sort MPI interface: allocating temporary sorting arrays')
 
   ! sets up working arrays
@@ -627,8 +623,7 @@
   ! sort buffer obtained to be conforming with neighbor in other chunk
   ! sort on x, y and z, the other arrays will be swapped as well
   call sort_array_coordinates(npoin,xstore_selected,ystore_selected,zstore_selected, &
-                             ibool_selected,iglob,locval,ifseg,nglob_selected, &
-                             ind,ninseg,iwork,work)
+                              ibool_selected,iglob,locval,ifseg,nglob_selected,ninseg)
 
   ! check that no duplicate has been detected
   if(nglob_selected /= npoin) call exit_MPI(myrank,'error sort MPI interface: duplicates detected in buffer')
@@ -638,7 +633,7 @@
 
   ! frees array memory
   deallocate(ibool_selected,xstore_selected,ystore_selected,zstore_selected, &
-            ind,ninseg,iglob,locval,ifseg,iwork,work)
+             ninseg,iglob,locval,ifseg)
 
 
   end subroutine sort_MPI_interface
diff --git a/src/shared/get_global.f90 b/src/shared/get_global.f90
index dc4b018..441d230 100644
--- a/src/shared/get_global.f90
+++ b/src/shared/get_global.f90
@@ -48,31 +48,21 @@
   integer, intent(out) :: nglob
 
   ! local variables
-  double precision, dimension(:), allocatable :: work
-  integer, dimension(:), allocatable :: ind,ninseg,iwork,idummy
+  integer, dimension(:), allocatable :: ninseg,idummy
   integer :: ier
 
   ! dynamically allocate arrays
-  allocate(ind(npointot),stat=ier)
-  if (ier /= 0) stop 'error while allocating ind'
-
   allocate(ninseg(npointot),stat=ier)
   if (ier /= 0) stop 'error while allocating ninseg'
 
   allocate(idummy(npointot),stat=ier)
   if (ier /= 0) stop 'error while allocating idummy'
 
-  allocate(iwork(npointot),stat=ier)
-  if (ier /= 0) stop 'error while allocating iwork'
-
-  allocate(work(npointot),stat=ier)
-  if (ier /= 0) stop 'error while allocating work'
-
   call sort_array_coordinates(npointot,xp,yp,zp,idummy,iglob,locval,ifseg, &
-                              nglob,ind,ninseg,iwork,work)
+                              nglob,ninseg)
 
   ! deallocate arrays
-  deallocate(ind,ninseg,idummy,iwork,work)
+  deallocate(ninseg,idummy)
 
   end subroutine get_global
 
diff --git a/src/shared/sort_array_coordinates.f90 b/src/shared/sort_array_coordinates.f90
index fb6521b..bc23aae 100644
--- a/src/shared/sort_array_coordinates.f90
+++ b/src/shared/sort_array_coordinates.f90
@@ -27,9 +27,8 @@
 
 ! subroutines to sort MPI buffers to assemble between chunks
 
-  subroutine sort_array_coordinates(npointot,x,y,z, &
-                                    ibool,iglob,locval,ifseg,nglob, &
-                                    ind,ninseg,iwork,work)
+  subroutine sort_array_coordinates(npointot,x,y,z,ibool,iglob, &
+                                    locval,ifseg,nglob,ninseg)
 
 ! this routine MUST be in double precision to avoid sensitivity
 ! to roundoff errors in the coordinates of the points
@@ -38,198 +37,187 @@
 
   implicit none
 
-  integer :: npointot,nglob
-
-  double precision,dimension(npointot) :: x,y,z
-
-  integer,dimension(npointot) :: ibool,iglob,locval
-  integer,dimension(npointot) :: ind,ninseg
-  logical,dimension(npointot) :: ifseg
-
-  integer,dimension(npointot) :: iwork
-  double precision,dimension(npointot) :: work
+  integer, intent(in) :: npointot
+  double precision, dimension(npointot), intent(inout) :: x, y, z
+  integer, dimension(npointot), intent(inout) :: ibool
+  integer, dimension(npointot), intent(out) :: iglob, locval, ninseg
+  logical, dimension(npointot), intent(out) :: ifseg
+  integer, intent(out) :: nglob
 
   ! local parameters
-  integer :: ipoin,i,j
-  integer :: nseg,ioff,iseg,ig
+  integer :: i, j
+  integer :: nseg, ioff, iseg, ig
 
   ! establish initial pointers
-  do ipoin=1,npointot
-    locval(ipoin)=ipoin
+  do i=1,npointot
+    locval(i) = i
   enddo
 
-  ifseg(:)=.false.
+  ifseg(:) = .false.
 
-  nseg=1
-  ifseg(1)=.true.
-  ninseg(1)=npointot
+  nseg = 1
+  ifseg(1) = .true.
+  ninseg(1) = npointot
 
   do j=1,NDIM
-
     ! sort within each segment
-    ioff=1
+    ioff = 1
     do iseg=1,nseg
       if (j == 1) then
-        call rank_buffers(x(ioff),ind,ninseg(iseg))
+        call heap_sort_multi(ninseg(iseg), x(ioff), y(ioff), z(ioff), ibool(ioff), locval(ioff))
       else if (j == 2) then
-        call rank_buffers(y(ioff),ind,ninseg(iseg))
+        call heap_sort_multi(ninseg(iseg), y(ioff), x(ioff), z(ioff), ibool(ioff), locval(ioff))
       else
-        call rank_buffers(z(ioff),ind,ninseg(iseg))
+        call heap_sort_multi(ninseg(iseg), z(ioff), x(ioff), y(ioff), ibool(ioff), locval(ioff))
       endif
 
-      call swap_all_buffers(ibool(ioff),locval(ioff), &
-              x(ioff),y(ioff),z(ioff),iwork,work,ind,ninseg(iseg))
-
-      ioff=ioff+ninseg(iseg)
+      ioff = ioff + ninseg(iseg)
     enddo
 
     ! check for jumps in current coordinate
     ! define a tolerance, normalized radius is 1., so let's use a small value
     if (j == 1) then
       do i=2,npointot
-        if (dabs(x(i)-x(i-1)) > SMALLVALTOL) ifseg(i)=.true.
+        if (dabs(x(i) - x(i-1)) > SMALLVALTOL) ifseg(i) = .true.
       enddo
     else if (j == 2) then
       do i=2,npointot
-        if (dabs(y(i)-y(i-1)) > SMALLVALTOL) ifseg(i)=.true.
+        if (dabs(y(i) - y(i-1)) > SMALLVALTOL) ifseg(i) = .true.
       enddo
     else
       do i=2,npointot
-        if (dabs(z(i)-z(i-1)) > SMALLVALTOL) ifseg(i)=.true.
+        if (dabs(z(i) - z(i-1)) > SMALLVALTOL) ifseg(i) = .true.
       enddo
     endif
 
     ! count up number of different segments
-    nseg=0
+    nseg = 0
     do i=1,npointot
       if (ifseg(i)) then
-        nseg=nseg+1
-        ninseg(nseg)=1
+        nseg = nseg + 1
+        ninseg(nseg) = 1
       else
-        ninseg(nseg)=ninseg(nseg)+1
+        ninseg(nseg) = ninseg(nseg) + 1
       endif
     enddo
 
   enddo
 
   ! assign global node numbers (now sorted lexicographically)
-  ig=0
+  ig = 0
   do i=1,npointot
-    if (ifseg(i)) ig=ig+1
-    iglob(locval(i))=ig
+    if (ifseg(i)) ig = ig + 1
+    iglob(locval(i)) = ig
   enddo
 
-  nglob=ig
+  nglob = ig
 
   end subroutine sort_array_coordinates
 
 ! -------------------- library for sorting routine ------------------
 
-! sorting routines put here in same file to allow for inlining
+  subroutine heap_sort_multi(N, dx, dy, dz, ia, ib)
 
-  subroutine rank_buffers(A,IND,N)
-!
-! Use Heap Sort (Numerical Recipes)
-!
   implicit none
+  integer, intent(in) :: N
+  double precision, dimension(N), intent(inout) :: dx
+  double precision, dimension(N), intent(inout) :: dy
+  double precision, dimension(N), intent(inout) :: dz
+  integer, dimension(N), intent(inout) :: ia
+  integer, dimension(N), intent(inout) :: ib
 
-  integer :: n
-  double precision,dimension(n) :: A
-  integer,dimension(n) :: IND
+  integer :: i
 
-  ! local parameters
-  integer :: i,j,l,ir,indx
-  double precision :: q
+  ! checks if anything to do
+  if (N < 2) return
 
-  do j=1,n
-    IND(j)=j
+  ! builds heap
+  do i = N/2, 1, -1
+    call heap_sort_siftdown(i, n)
   enddo
 
-  if(n == 1) return
-
-  L = n/2 + 1
-  ir = n
-
-  do while( .true. )
+  ! sorts array
+  do i = N, 2, -1
+    ! swaps last and first entry in this section
+    call dswap(dx, 1, i)
+    call dswap(dy, 1, i)
+    call dswap(dz, 1, i)
+    call iswap(ia, 1, i)
+    call iswap(ib, 1, i)
+    call heap_sort_siftdown(1, i - 1)
+  enddo
 
-    IF ( l > 1 ) THEN
-      l = l-1
-      indx = ind(l)
-      q = a(indx)
-    ELSE
-      indx = ind(ir)
-      q = a(indx)
-      ind(ir) = ind(1)
-      ir = ir-1
+  contains
 
-      ! checks exit criterion
-      if (ir == 1) then
-         ind(1) = indx
-         return
-      endif
+    subroutine dswap(A, i, j)
 
-    endif
+    double precision, dimension(:), intent(inout) :: A
+    integer, intent(in) :: i
+    integer, intent(in) :: j
 
-    i = l
-    j = l+l
+    double precision :: tmp
 
-    do while( J <= IR )
-      IF ( J < IR ) THEN
-        IF ( A(IND(j)) < A(IND(j+1)) ) j=j+1
-      endif
-      IF ( q < A(IND(j)) ) THEN
-        IND(I) = IND(J)
-        I = J
-        J = J+J
-      ELSE
-        J = IR+1
-      endif
+    tmp = A(i)
+    A(i) = A(j)
+    A(j) = tmp
 
-    enddo
+    end subroutine
 
-    IND(I)=INDX
-  enddo
+    subroutine iswap(A, i, j)
 
-  end subroutine rank_buffers
+    integer, dimension(:), intent(inout) :: A
+    integer, intent(in) :: i
+    integer, intent(in) :: j
 
-! -------------------------------------------------------------------
+    integer :: tmp
 
-  subroutine swap_all_buffers(IA,IB,A,B,C,IW,W,ind,n)
-!
-! swap arrays IA, IB, A, B and C according to addressing in array IND
-!
-  implicit none
+    tmp = A(i)
+    A(i) = A(j)
+    A(j) = tmp
 
-  integer :: n
-  integer,dimension(n) :: IND
-  integer,dimension(n) :: IA,IB,IW
-  double precision,dimension(n) :: A,B,C,W
+    end subroutine
 
-  ! local parameter
-  integer :: i
+    subroutine heap_sort_siftdown(start, bottom)
 
-  W(:) = A(:)
-  IW(:) = IA(:)
+    integer, intent(in) :: start
+    integer, intent(in) :: bottom
 
-  do i=1,n
-    A(i)=W(ind(i))
-    IA(i)=IW(ind(i))
-  enddo
+    integer :: i, j
+    double precision :: xtmp, ytmp, ztmp
+    integer :: atmp, btmp
 
-  W(:) = B(:)
-  IW(:) = IB(:)
+    i = start
+    xtmp = dx(i)
+    ytmp = dy(i)
+    ztmp = dz(i)
+    atmp = ia(i)
+    btmp = ib(i)
 
-  do i=1,n
-    B(i)=W(ind(i))
-    IB(i)=IW(ind(i))
-  enddo
+    j = 2 * i
+    do while (j <= bottom)
+      ! chooses larger value first in this section
+      if (j < bottom) then
+        if (dx(j) <= dx(j+1)) j = j + 1
+      endif
 
-  W(:) = C(:)
+      ! checks if section already smaller than initial value
+      if (dx(j) < xtmp) exit
 
-  do i=1,n
-    C(i)=W(ind(i))
-  enddo
+      dx(i) = dx(j)
+      dy(i) = dy(j)
+      dz(i) = dz(j)
+      ia(i) = ia(j)
+      ib(i) = ib(j)
+      i = j
+      j = 2 * i
+    enddo
 
-  end subroutine swap_all_buffers
+    dx(i) = xtmp
+    dy(i) = ytmp
+    dz(i) = ztmp
+    ia(i) = atmp
+    ib(i) = btmp
 
+    end subroutine heap_sort_siftdown
 
+  end subroutine heap_sort_multi



More information about the CIG-COMMITS mailing list