@@ -96,7 +96,6 @@ subroutine MPP_GATHER_1DV_(sbuf, ssize, rbuf, rsize, pelist)
9696 deallocate(displs)
9797end subroutine MPP_GATHER_1DV_
9898
99-
10099subroutine MPP_GATHER_PELIST_2D_(is, ie, js, je, pelist, array_seg, gather_data, is_root_pe, &
101100 ishift, jshift)
102101 integer, intent(in) :: is, ie, js, je
@@ -106,6 +105,26 @@ subroutine MPP_GATHER_PELIST_2D_(is, ie, js, je, pelist, array_seg, gather_data,
106105 logical, intent(in) :: is_root_pe
107106 integer, optional, intent(in) :: ishift, jshift
108107
108+ integer, dimension(3) :: dim_order
109+
110+ dim_order = (/1,2,3/)
111+
112+ call mpp_gather(is, ie, js, je, pelist, array_seg, gather_data, dim_order, is_root_pe, &
113+ ishift, jshift)
114+ return
115+
116+ end subroutine MPP_GATHER_PELIST_2D_
117+
118+ subroutine MPP_GATHER_PELIST_GEN_2D_(is, ie, js, je, pelist, array_seg, gather_data, dim_order, is_root_pe, &
119+ ishift, jshift)
120+ integer, intent(in) :: is, ie, js, je
121+ integer, dimension(:), intent(in) :: pelist
122+ MPP_TYPE_, dimension(:,:), contiguous, target, intent(in) :: array_seg
123+ MPP_TYPE_, dimension(:,:), contiguous, target, intent(inout) :: gather_data
124+ integer, dimension(3), intent(in) :: dim_order
125+ logical, intent(in) :: is_root_pe
126+ integer, optional, intent(in) :: ishift, jshift
127+
109128 MPP_TYPE_, pointer :: arr3D(:,:,:)
110129 MPP_TYPE_, pointer :: data3D(:,:,:)
111130
@@ -116,11 +135,11 @@ subroutine MPP_GATHER_PELIST_2D_(is, ie, js, je, pelist, array_seg, gather_data,
116135 data3D => null()
117136 endif
118137
119- call mpp_gather(is, ie, js, je, 1, pelist, arr3D, data3D, is_root_pe, &
138+ call mpp_gather(is, ie, js, je, 1, pelist, arr3D, data3D, dim_order, is_root_pe, &
120139 ishift, jshift)
121140 return
122141
123- end subroutine MPP_GATHER_PELIST_2D_
142+ end subroutine MPP_GATHER_PELIST_GEN_2D_
124143
125144
126145subroutine MPP_GATHER_PELIST_3D_(is, ie, js, je, nk, pelist, array_seg, gather_data, is_root_pe, &
@@ -132,16 +151,43 @@ subroutine MPP_GATHER_PELIST_3D_(is, ie, js, je, nk, pelist, array_seg, gather_d
132151 logical, intent(in) :: is_root_pe
133152 integer, optional, intent(in) :: ishift, jshift
134153
135- integer :: i, j, k
154+ integer, dimension(3) :: dim_order
155+
156+ dim_order = (/1, 2, 3/)
157+
158+ call mpp_gather(is, ie, js, je, nk, pelist, array_seg, gather_data, dim_order, is_root_pe, &
159+ ishift, jshift)
160+ return
161+
162+ end subroutine MPP_GATHER_PELIST_3D_
163+
164+ subroutine MPP_GATHER_PELIST_GEN_3D_(is, ie, js, je, nk, pelist, array_seg, gather_data, dim_order, is_root_pe, &
165+ ishift, jshift)
166+ integer, intent(in) :: is, ie, js, je, nk
167+ integer, dimension(:), intent(in) :: pelist
168+ MPP_TYPE_, dimension(:,:,:), intent(in) :: array_seg
169+ MPP_TYPE_, dimension(:,:,:), intent(inout) :: gather_data
170+ integer, dimension(3), intent(in) :: dim_order
171+ logical, intent(in) :: is_root_pe
172+ integer, optional, intent(in) :: ishift, jshift
173+
136174 integer :: root_pe, root_pe_test
175+ integer :: k, us, ue, vs, ve, ws, we
137176 integer :: i1, i2, j1, j2, ioff, joff
138177 integer :: base_idx, send_count, msg_start
139- integer :: blocksize_i, blocksize_j, blocksize
178+ integer :: blocksize_u, blocksize_v, blocksize_w, blocksize
179+ integer, dimension(3) :: start_idx, stop_idx
140180 integer, dimension(:), allocatable :: gind, counts
141181 MPP_TYPE_, dimension(:), allocatable :: rbuf
142182
143183 if (.not.ANY(mpp_pe().eq.pelist(:))) return
144184
185+ ! Check dim_order is a permutation of 1..3
186+ if ( any(dim_order < 1) .or. any(dim_order > 3) ) call mpp_error(FATAL, &
187+ "fms_io(mpp_gather_pelist): dim_order entries must be in {1,2,3}")
188+ if ( dim_order(1) == dim_order(2) .or. dim_order(1) == dim_order(3) .or. dim_order(2) == dim_order(3) ) &
189+ call mpp_error(FATAL, "fms_io(mpp_gather_pelist): dim_order must be a permutation of 1,2,3")
190+
145191 if (is_root_pe) then
146192 root_pe = mpp_pe()
147193 root_pe_test = 999
@@ -160,7 +206,6 @@ subroutine MPP_GATHER_PELIST_3D_(is, ie, js, je, nk, pelist, array_seg, gather_d
160206 if ((is_root_pe) .and. (mpp_pe().ne.root_pe)) call mpp_error(FATAL, &
161207 "fms_io(mpp_gather_pelist): too many root_pes specified")
162208
163-
164209 ioff=0
165210 joff=0
166211 if (present(ishift)) ioff=ishift
@@ -170,7 +215,7 @@ subroutine MPP_GATHER_PELIST_3D_(is, ie, js, je, nk, pelist, array_seg, gather_d
170215 if (is_root_pe) allocate(gind(4*size(pelist)))
171216 call mpp_gather((/is, ie, js, je/), gind, pelist)
172217
173- ! Compute and allocate counts and 1d recv buffer (rbuf)
218+ ! Compute recv counts and allocate 1d recv buffer (rbuf)
174219 if (is_root_pe) then
175220 allocate(counts(size(pelist)))
176221
@@ -191,8 +236,14 @@ subroutine MPP_GATHER_PELIST_3D_(is, ie, js, je, nk, pelist, array_seg, gather_d
191236
192237 send_count = (ie-is+1)*(je-js+1)*nk
193238
239+ ! Get generalized stop indicies for array_seg
240+ stop_idx = (/ie-is+1, je-js+1, nk/)
241+ ue = stop_idx(dim_order(1))
242+ ve = stop_idx(dim_order(2))
243+ we = stop_idx(dim_order(3))
244+
194245 ! gather data into 1d recv buffer
195- call mpp_gather(reshape(array_seg(is:ie,js:je ,1:nk ),[send_count]), send_count, rbuf, counts, pelist)
246+ call mpp_gather(reshape(array_seg(1:ue,1:ve ,1:we ),[send_count]), send_count, rbuf, counts, pelist)
196247
197248 ! Unpack recv buffer into return array (gather_data)
198249 if (is_root_pe) then
@@ -202,12 +253,22 @@ subroutine MPP_GATHER_PELIST_3D_(is, ie, js, je, nk, pelist, array_seg, gather_d
202253 i1 = gind( base_idx + 1 ) + ioff ;; i2 = gind( base_idx + 2 ) + ioff
203254 j1 = gind( base_idx + 3 ) + joff ;; j2 = gind( base_idx + 4 ) + joff
204255
205- blocksize_i = i2 - i1 + 1
206- blocksize_j = j2 - j1 + 1
207- blocksize = blocksize_i * blocksize_j * nk
256+ ! Get generalized start/stop indicies
257+ start_idx = (/i1,j1,1/)
258+ stop_idx = (/i2,j2,nk/)
259+
260+ us = start_idx(dim_order(1)) ;; ue = stop_idx(dim_order(1))
261+ vs = start_idx(dim_order(2)) ;; ve = stop_idx(dim_order(2))
262+ ws = start_idx(dim_order(3)) ;; we = stop_idx(dim_order(3))
208263
209- gather_data(i1:i2, j1:j2, 1:nk) = reshape(rbuf(msg_start:msg_start+blocksize-1), &
210- [blocksize_i, blocksize_j, nk])
264+ ! Compute block sizes
265+ blocksize_u = ue - us + 1
266+ blocksize_v = ve - vs + 1
267+ blocksize_w = we - ws + 1
268+ blocksize = blocksize_u * blocksize_v * blocksize_w
269+
270+ gather_data(us:ue, vs:ve, ws:we) = reshape(rbuf(msg_start:msg_start+blocksize-1), &
271+ [blocksize_u, blocksize_v, blocksize_w])
211272
212273 msg_start = msg_start + blocksize
213274 enddo
@@ -219,5 +280,5 @@ subroutine MPP_GATHER_PELIST_3D_(is, ie, js, je, nk, pelist, array_seg, gather_d
219280
220281 call mpp_sync_self()
221282
222- end subroutine MPP_GATHER_PELIST_3D_
283+ end subroutine MPP_GATHER_PELIST_GEN_3D_
223284!> @}
0 commit comments