[cig-commits] [commit] devel: Replace Numerical Routines sorting code. (ee7076f)

cig_noreply at geodynamics.org cig_noreply at geodynamics.org
Thu May 1 00:50:13 PDT 2014


Repository : ssh://geoshell/specfem3d

On branch  : devel
Link       : https://github.com/geodynamics/specfem3d/compare/cb32c88d6155d7974561a6f72fc17aea596e2c4d...50aa953c1db3f565d76415f5305410a529996b75

>---------------------------------------------------------------

commit ee7076f9acb4d42f8ad60523ea9db2ad5722b451
Author: Elliott Sales de Andrade <esalesde at physics.utoronto.ca>
Date:   Thu Jan 23 18:54:48 2014 -0500

    Replace Numerical Routines sorting code.
    
    Numerical Routines source code is not re-distributable in any way at all
    and is 100% incompatible with the GPL.
    
    The replacement code originates from the heap_sort.f90 in the global
    code. It appears to be a fairly straight-forward translation of the heap
    sort algorithm (e.g., from RosettaCode) and should be unencumbered. I
    have added additional parameters in order to sort multiple arrays at the
    same time. Interestingly, this code is slightly faster than the
    previous version.


>---------------------------------------------------------------

ee7076f9acb4d42f8ad60523ea9db2ad5722b451
 src/decompose_mesh/fault_scotch.f90   |   6 +-
 src/generate_databases/get_MPI.f90    |  14 +-
 src/shared/get_global.f90             |  14 +-
 src/shared/sort_array_coordinates.f90 | 242 +++++++++++++++++-----------------
 4 files changed, 129 insertions(+), 147 deletions(-)

diff --git a/src/decompose_mesh/fault_scotch.f90 b/src/decompose_mesh/fault_scotch.f90
index fdeab7c..922cc9e 100644
--- a/src/decompose_mesh/fault_scotch.f90
+++ b/src/decompose_mesh/fault_scotch.f90
@@ -296,8 +296,8 @@ CONTAINS
   integer, intent(out) :: locval(nspec)
   double precision, intent(in) :: xyz_c(3,nspec)
 
-  double precision, dimension(nspec) :: work,xp,yp,zp
-  integer, dimension(nspec) :: ind,ninseg,iwork,ibool,iglob
+  double precision, dimension(nspec) :: xp,yp,zp
+  integer, dimension(nspec) :: ninseg,ibool,iglob
   integer :: nglob
   logical :: ifseg(nspec)
   double precision :: xtol
@@ -310,7 +310,7 @@ CONTAINS
   xtol = 1.d-10 * maxval( maxval(xyz_c,2) - minval(xyz_c,2) )
 
   call sort_array_coordinates(nspec,xp,yp,zp,ibool,iglob,locval,ifseg, &
-                              nglob,ind,ninseg,iwork,work,xtol)
+                              nglob,ninseg,xtol)
 
   end subroutine lex_order
 
diff --git a/src/generate_databases/get_MPI.f90 b/src/generate_databases/get_MPI.f90
index 7bf1126..8dfffdc 100644
--- a/src/generate_databases/get_MPI.f90
+++ b/src/generate_databases/get_MPI.f90
@@ -63,14 +63,13 @@
 
   !local parameters
   double precision, dimension(:), allocatable :: xp,yp,zp
-  double precision, dimension(:), allocatable :: work_ext_mesh
 
   integer, dimension(:), allocatable :: locval
   integer, dimension(:), allocatable :: nibool_interfaces_ext_mesh_true
 
   ! for MPI buffers
   integer, dimension(:), allocatable :: reorder_interface_ext_mesh, &
-    ind_ext_mesh,ninseg_ext_mesh,iwork_ext_mesh
+    ninseg_ext_mesh
   logical, dimension(:), allocatable :: ifseg
   integer :: iinterface,ilocnum
   integer :: num_points1, num_points2
@@ -113,14 +112,8 @@
     if( ier /= 0 ) stop 'error allocating array ifseg'
     allocate(reorder_interface_ext_mesh(nibool_interfaces_ext_mesh(iinterface)),stat=ier)
     if( ier /= 0 ) stop 'error allocating array reorder_interface_ext_mesh'
-    allocate(ind_ext_mesh(nibool_interfaces_ext_mesh(iinterface)),stat=ier)
-    if( ier /= 0 ) stop 'error allocating array ind_ext_mesh'
     allocate(ninseg_ext_mesh(nibool_interfaces_ext_mesh(iinterface)),stat=ier)
     if( ier /= 0 ) stop 'error allocating array ninseg_ext_mesh'
-    allocate(iwork_ext_mesh(nibool_interfaces_ext_mesh(iinterface)),stat=ier)
-    if( ier /= 0 ) stop 'error allocating array iwork_ext_mesh'
-    allocate(work_ext_mesh(nibool_interfaces_ext_mesh(iinterface)),stat=ier)
-    if( ier /= 0 ) stop 'error allocating array work_ext_mesh'
 
     ! gets x,y,z coordinates of global points on MPI interface
     do ilocnum = 1, nibool_interfaces_ext_mesh(iinterface)
@@ -135,7 +128,7 @@
          ibool_interfaces_ext_mesh(1:nibool_interfaces_ext_mesh(iinterface),iinterface), &
          reorder_interface_ext_mesh,locval,ifseg, &
          nibool_interfaces_ext_mesh_true(iinterface), &
-         ind_ext_mesh,ninseg_ext_mesh,iwork_ext_mesh,work_ext_mesh,SMALLVAL_TOL)
+         ninseg_ext_mesh,SMALLVAL_TOL)
 
     ! checks that number of MPI points are still the same
     num_points1 = num_points1 + nibool_interfaces_ext_mesh(iinterface)
@@ -153,10 +146,7 @@
     deallocate(locval)
     deallocate(ifseg)
     deallocate(reorder_interface_ext_mesh)
-    deallocate(ind_ext_mesh)
     deallocate(ninseg_ext_mesh)
-    deallocate(iwork_ext_mesh)
-    deallocate(work_ext_mesh)
 
   enddo
 
diff --git a/src/shared/get_global.f90 b/src/shared/get_global.f90
index 7f6a4ab..d34713d 100644
--- a/src/shared/get_global.f90
+++ b/src/shared/get_global.f90
@@ -48,8 +48,7 @@
   integer ispec,i,j,ier
   integer ieoff,ilocnum,nseg,ioff,iseg,ig
 
-  integer, dimension(:), allocatable :: ind,ninseg,iwork,idummy
-  double precision, dimension(:), allocatable :: work
+  integer, dimension(:), allocatable :: ninseg,idummy
 
 ! geometry tolerance parameter to calculate number of independent grid points
 ! small value for double precision and to avoid sensitivity to roundoff
@@ -59,25 +58,16 @@
   SMALLVALTOL = 1.d-10 * dabs(UTM_X_MAX - UTM_X_MIN)
 
 ! dynamically allocate arrays
-  allocate(ind(npointot),stat=ier)
-  if( ier /= 0 ) stop 'error allocating array ind'
   allocate(ninseg(npointot),stat=ier)
   if( ier /= 0 ) stop 'error allocating array ninseg'
-  allocate(iwork(npointot),stat=ier)
-  if( ier /= 0 ) stop 'error allocating array iwork'
-  allocate(work(npointot),stat=ier)
-  if( ier /= 0 ) stop 'error allocating array work'
   allocate(idummy(npointot),stat=ier)
   if( ier /= 0 ) stop 'error allocating array idummy'
 
   call sort_array_coordinates(npointot,xp,yp,zp,idummy,iglob,locval,ifseg, &
-                              nglob,ind,ninseg,iwork,work,SMALLVALTOL)
+                              nglob,ninseg,SMALLVALTOL)
 
 ! deallocate arrays
-  deallocate(ind)
   deallocate(ninseg)
-  deallocate(iwork)
-  deallocate(work)
   deallocate(idummy)
 
   end subroutine get_global
diff --git a/src/shared/sort_array_coordinates.f90 b/src/shared/sort_array_coordinates.f90
index a21e7f6..a0a9026 100644
--- a/src/shared/sort_array_coordinates.f90
+++ b/src/shared/sort_array_coordinates.f90
@@ -28,7 +28,7 @@
 ! subroutines to sort MPI buffers to assemble between chunks
 
   subroutine sort_array_coordinates(npointot,x,y,z,ibool,iglob,locval,ifseg, &
-                                    nglob,ind,ninseg,iwork,work,xtol)
+                                    nglob,ninseg,xtol)
 
 ! this routine MUST be in double precision to avoid sensitivity
 ! to roundoff errors in the coordinates of the points
@@ -39,190 +39,192 @@
 
   include "constants.h"
 
-  integer npointot,nglob
+  integer, intent(in) :: npointot
+  double precision, dimension(npointot), intent(inout) :: x, y, z
+  integer, dimension(npointot), intent(inout) :: ibool
+  integer, dimension(npointot), intent(out) :: iglob, locval, ninseg
+  logical, dimension(npointot), intent(out) :: ifseg
+  integer, intent(out) :: nglob
+  double precision, intent(in) :: xtol
 
-  integer ibool(npointot),iglob(npointot),locval(npointot)
-  integer ind(npointot),ninseg(npointot)
-  logical ifseg(npointot)
-  double precision x(npointot),y(npointot),z(npointot)
-  integer iwork(npointot)
-  double precision work(npointot)
   ! local parameters
-  integer ipoin,i,j
-  integer nseg,ioff,iseg,ig
-  double precision xtol
+  integer :: i, j
+  integer :: nseg, ioff, iseg, ig
 
-! establish initial pointers
-  do ipoin=1,npointot
-    locval(ipoin)=ipoin
+  ! establish initial pointers
+  do i=1,npointot
+    locval(i) = i
   enddo
 
-  ifseg(:)=.false.
+  ifseg(:) = .false.
 
-  nseg=1
-  ifseg(1)=.true.
-  ninseg(1)=npointot
+  nseg = 1
+  ifseg(1) = .true.
+  ninseg(1) = npointot
 
   do j=1,NDIM
-! sort within each segment
-    ioff=1
+    ! sort within each segment
+    ioff = 1
     do iseg=1,nseg
-      if(j == 1) then
+      if (j == 1) then
 
-        call rank_buffers(x(ioff),ind,ninseg(iseg))
+        call heap_sort_multi(ninseg(iseg), x(ioff), y(ioff), z(ioff), ibool(ioff), locval(ioff))
 
-      else if(j == 2) then
+      else if (j == 2) then
 
-        call rank_buffers(y(ioff),ind,ninseg(iseg))
+        call heap_sort_multi(ninseg(iseg), y(ioff), x(ioff), z(ioff), ibool(ioff), locval(ioff))
 
       else
 
-        call rank_buffers(z(ioff),ind,ninseg(iseg))
+        call heap_sort_multi(ninseg(iseg), z(ioff), x(ioff), y(ioff), ibool(ioff), locval(ioff))
 
       endif
 
-      call swap_all_buffers(ibool(ioff),locval(ioff), &
-              x(ioff),y(ioff),z(ioff),iwork,work,ind,ninseg(iseg))
-
-      ioff=ioff+ninseg(iseg)
+      ioff = ioff + ninseg(iseg)
     enddo
 
     ! check for jumps in current coordinate
     if (j == 1) then
       do i=2,npointot
-        if(dabs(x(i)-x(i-1)) > xtol) ifseg(i)=.true.
+        if (dabs(x(i) - x(i-1)) > xtol) ifseg(i) = .true.
       enddo
     else if (j == 2) then
       do i=2,npointot
-        if(dabs(y(i)-y(i-1)) > xtol) ifseg(i)=.true.
+        if (dabs(y(i) - y(i-1)) > xtol) ifseg(i) = .true.
       enddo
     else
       do i=2,npointot
-        if(dabs(z(i)-z(i-1)) > xtol) ifseg(i)=.true.
+        if (dabs(z(i) - z(i-1)) > xtol) ifseg(i) = .true.
       enddo
     endif
 
     ! count up number of different segments
-    nseg=0
+    nseg = 0
     do i=1,npointot
-      if(ifseg(i)) then
-        nseg=nseg+1
-        ninseg(nseg)=1
+      if (ifseg(i)) then
+        nseg = nseg + 1
+        ninseg(nseg) = 1
       else
-        ninseg(nseg)=ninseg(nseg)+1
+        ninseg(nseg) = ninseg(nseg) + 1
       endif
     enddo
   enddo
 
-! assign global node numbers (now sorted lexicographically)
-  ig=0
+  ! assign global node numbers (now sorted lexicographically)
+  ig = 0
   do i=1,npointot
-    if(ifseg(i)) ig=ig+1
-    iglob(locval(i))=ig
+    if (ifseg(i)) ig = ig + 1
+    iglob(locval(i)) = ig
   enddo
 
-  nglob=ig
+  nglob = ig
 
   end subroutine sort_array_coordinates
 
 ! -------------------- library for sorting routine ------------------
 
-! sorting routines put here in same file to allow for inlining
+  subroutine heap_sort_multi(N, dx, dy, dz, ia, ib)
 
-  subroutine rank_buffers(A,IND,N)
-!
-! Use Heap Sort (Numerical Recipes)
-!
   implicit none
+  integer, intent(in) :: N
+  double precision, dimension(N), intent(inout) :: dx
+  double precision, dimension(N), intent(inout) :: dy
+  double precision, dimension(N), intent(inout) :: dz
+  integer, dimension(N), intent(inout) :: ia
+  integer, dimension(N), intent(inout) :: ib
 
-  integer n
-  double precision A(n)
-  integer IND(n)
+  integer :: i
 
-  integer i,j,l,ir,indx
-  double precision q
+  ! checks if anything to do
+  if (N < 2) return
 
-  do j=1,n
-    IND(j)=j
+  ! builds heap
+  do i = N/2, 1, -1
+    call heap_sort_siftdown(i, n)
   enddo
 
-  if(n == 1) return
-
-  L=n/2+1
-  ir=n
-  100 CONTINUE
-   IF(l>1) THEN
-      l=l-1
-      indx=ind(l)
-      q=a(indx)
-   ELSE
-      indx=ind(ir)
-      q=a(indx)
-      ind(ir)=ind(1)
-      ir=ir-1
-      if (ir == 1) then
-         ind(1)=indx
-         return
-      endif
-   endif
-   i=l
-   j=l+l
-  200    CONTINUE
-   IF(J <= IR) THEN
-      IF(J < IR) THEN
-         IF(A(IND(j)) < A(IND(j+1))) j=j+1
-      endif
-      IF (q < A(IND(j))) THEN
-         IND(I)=IND(J)
-         I=J
-         J=J+J
-      ELSE
-         J=IR+1
-      endif
-   goto 200
-   endif
-   IND(I)=INDX
-  goto 100
-  end subroutine rank_buffers
+  ! sorts array
+  do i = N, 2, -1
+    ! swaps last and first entry in this section
+    call dswap(dx, 1, i)
+    call dswap(dy, 1, i)
+    call dswap(dz, 1, i)
+    call iswap(ia, 1, i)
+    call iswap(ib, 1, i)
+    call heap_sort_siftdown(1, i - 1)
+  enddo
 
-! -------------------------------------------------------------------
+  contains
 
-  subroutine swap_all_buffers(IA,IB,A,B,C,IW,W,ind,n)
-!
-! swap arrays IA, IB, A, B and C according to addressing in array IND
-!
-  implicit none
+    subroutine dswap(A, i, j)
 
-  integer n
+    double precision, dimension(:), intent(inout) :: A
+    integer, intent(in) :: i
+    integer, intent(in) :: j
 
-  integer IND(n)
-  integer IA(n),IB(n),IW(n)
-  double precision A(n),B(n),C(n),W(n)
+    double precision :: tmp
 
-  integer i
+    tmp = A(i)
+    A(i) = A(j)
+    A(j) = tmp
 
-  W(:) = A(:)
-  IW(:) = IA(:)
+    end subroutine
 
-  do i=1,n
-    A(i)=W(ind(i))
-    IA(i)=IW(ind(i))
-  enddo
+    subroutine iswap(A, i, j)
 
-  W(:) = B(:)
-  IW(:) = IB(:)
+    integer, dimension(:), intent(inout) :: A
+    integer, intent(in) :: i
+    integer, intent(in) :: j
 
-  do i=1,n
-    B(i)=W(ind(i))
-    IB(i)=IW(ind(i))
-  enddo
+    integer :: tmp
 
-  W(:) = C(:)
+    tmp = A(i)
+    A(i) = A(j)
+    A(j) = tmp
 
-  do i=1,n
-    C(i)=W(ind(i))
-  enddo
+    end subroutine
+
+    subroutine heap_sort_siftdown(start, bottom)
+
+    integer, intent(in) :: start
+    integer, intent(in) :: bottom
+
+    integer :: i, j
+    double precision :: xtmp, ytmp, ztmp
+    integer :: atmp, btmp
+
+    i = start
+    xtmp = dx(i)
+    ytmp = dy(i)
+    ztmp = dz(i)
+    atmp = ia(i)
+    btmp = ib(i)
+
+    j = 2 * i
+    do while (j <= bottom)
+      ! chooses larger value first in this section
+      if (j < bottom) then
+        if (dx(j) <= dx(j+1)) j = j + 1
+      endif
+
+      ! checks if section already smaller than initial value
+      if (dx(j) < xtmp) exit
+
+      dx(i) = dx(j)
+      dy(i) = dy(j)
+      dz(i) = dz(j)
+      ia(i) = ia(j)
+      ib(i) = ib(j)
+      i = j
+      j = 2 * i
+    enddo
 
-  end subroutine swap_all_buffers
+    dx(i) = xtmp
+    dy(i) = ytmp
+    dz(i) = ztmp
+    ia(i) = atmp
+    ib(i) = btmp
 
+    end subroutine heap_sort_siftdown
 
+  end subroutine heap_sort_multi



More information about the CIG-COMMITS mailing list