GADGET-4
mpi_utils.h
/*******************************************************************************
 * \copyright This file is part of the GADGET4 N-body/SPH code developed
 * \copyright by Volker Springel. Copyright (C) 2014-2020 by Volker Springel
 * \copyright (vspringel@mpa-garching.mpg.de) and all contributing authors.
 *******************************************************************************/

#ifndef MPI_UTILS_H
#define MPI_UTILS_H

#include <mpi.h>
#include "../data/dtypes.h"
#include "../data/mymalloc.h"

#define TAG_TOPNODE_FREE 4
#define TAG_TOPNODE_OFFSET 5
#define TAG_TOPNODE_ALLOC 6
#define TAG_EWALD_ALLOC 7
#define TAG_TABLE_ALLOC 8
#define TAG_TABLE_FREE 9
#define TAG_N 10
#define TAG_HEADER 11
#define TAG_PDATA 12
#define TAG_SPHDATA 13
#define TAG_KEY 14
#define TAG_DMOM 15
#define TAG_NODELEN 16
#define TAG_HMAX 17
#define TAG_GRAV_A 18
#define TAG_GRAV_B 19
#define TAG_DIRECT_A 20
#define TAG_DIRECT_B 21
#define TAG_HYDRO_A 22
#define TAG_HYDRO_B 23
#define TAG_NFORTHISTASK 24
#define TAG_PERIODIC_A 25
#define TAG_PERIODIC_B 26
#define TAG_PERIODIC_C 27
#define TAG_PERIODIC_D 28
#define TAG_NONPERIOD_A 29
#define TAG_NONPERIOD_B 30
#define TAG_NONPERIOD_C 31
#define TAG_NONPERIOD_D 32
#define TAG_POTENTIAL_A 33
#define TAG_POTENTIAL_B 34
#define TAG_DENS_A 35
#define TAG_DENS_B 36
#define TAG_LOCALN 37
#define TAG_BH_A 38
#define TAG_BH_B 39
#define TAG_SMOOTH_A 40
#define TAG_SMOOTH_B 41
#define TAG_ENRICH_A 42
#define TAG_CONDUCT_A 43
#define TAG_CONDUCT_B 44
#define TAG_FOF_A 45
#define TAG_FOF_B 46
#define TAG_FOF_C 47
#define TAG_FOF_D 48
#define TAG_FOF_E 49
#define TAG_FOF_F 50
#define TAG_FOF_G 51
#define TAG_HOTNGB_A 52
#define TAG_HOTNGB_B 53
#define TAG_GRAD_A 54
#define TAG_GRAD_B 55

#define TAG_SE 56

#define TAG_SEARCH_A 58
#define TAG_SEARCH_B 59

#define TAG_INJECT_A 61

#define TAG_PDATA_SPH 70
#define TAG_KEY_SPH 71

#define TAG_PDATA_STAR 72
#define TAG_STARDATA 73
#define TAG_KEY_STAR 74

#define TAG_PDATA_BH 75
#define TAG_BHDATA 76
#define TAG_KEY_BH 77

#define TAG_GRAVCOST_A 79
#define TAG_GRAVCOST_B 80

#define TAG_PM_FOLD 81

#define TAG_BARRIER 82
#define TAG_PART_DATA 83
#define TAG_NODE_DATA 84
#define TAG_RESULTS 85
#define TAG_DRIFT_INIT 86
#define TAG_ALL_UPDATE 87
#define TAG_METDATA 500
#define TAG_FETCH_GRAVTREE 1000
#define TAG_FETCH_SPH_DENSITY 2000
#define TAG_FETCH_SPH_HYDRO 3000
#define TAG_FETCH_SPH_TREETIMESTEP 4000

void my_mpi_types_init(void);

int myMPI_Sendrecv(void *sendbuf, size_t sendcount, MPI_Datatype sendtype, int dest, int sendtag, void *recvbuf, size_t recvcount,
                   MPI_Datatype recvtype, int source, int recvtag, MPI_Comm comm, MPI_Status *status);

int myMPI_Alltoallv_new_prep(int *sendcnt, int *recvcnt, int *rdispls, MPI_Comm comm, int method);

void myMPI_Alltoallv_new(void *sendb, int *sendcounts, int *sdispls, MPI_Datatype sendtype, void *recvb, int *recvcounts, int *rdispls,
                         MPI_Datatype recvtype, MPI_Comm comm, int method);

void myMPI_Alltoallv(void *sendbuf, size_t *sendcounts, size_t *sdispls, void *recvbuf, size_t *recvcounts, size_t *rdispls, int len,
                     int big_flag, MPI_Comm comm);

void my_int_MPI_Alltoallv(void *sendb, int *sendcounts, int *sdispls, void *recvb, int *recvcounts, int *rdispls, int len,
                          int big_flag, MPI_Comm comm);

int myMPI_Allreduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm);

int myMPI_Allgatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int *recvcount, int *displs,
                     MPI_Datatype recvtype, MPI_Comm comm);

int myMPI_Alltoall(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype,
                   MPI_Comm comm);

void allreduce_sparse_double_sum(double *loc, double *glob, int N, MPI_Comm comm);

void minimum_large_ints(int n, long long *src, long long *res, MPI_Comm comm);
void sumup_longs(int n, long long *src, long long *res, MPI_Comm comm);
void sumup_large_ints(int n, int *src, long long *res, MPI_Comm comm);

extern MPI_Datatype MPI_MyIntPosType;

extern MPI_Op MPI_MIN_MyIntPosType;
extern MPI_Op MPI_MAX_MyIntPosType;
extern MPI_Op MPI_MIN_MySignedIntPosType;
extern MPI_Op MPI_MAX_MySignedIntPosType;

template <typename T>
void allreduce_sum(T *glob, int N, MPI_Comm Communicator)
{
  int ntask, thistask, ptask;
  MPI_Comm_size(Communicator, &ntask);
  MPI_Comm_rank(Communicator, &thistask);

  /* determine the smallest ptask with 2^ptask >= ntask; this sets the number of
   * pairwise exchange cycles in the hypercube communication pattern used below */
  for(ptask = 0; ntask > (1 << ptask); ptask++)
    ;

  // we are responsible for a certain stretch of the result, namely the one starting at loc_first_n, of length blocksize[thistask]

  int *blocksize  = (int *)Mem.mymalloc("blocksize", sizeof(int) * ntask);
  int *blockstart = (int *)Mem.mymalloc("blockstart", sizeof(int) * ntask);

  int blk     = N / ntask;
  int rmd     = N - blk * ntask; /* remainder */
  int pivot_n = rmd * (blk + 1);
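  /* illustrative example (not part of the original code): for N = 10 and ntask = 4 this gives
   * blk = 2, rmd = 2 and pivot_n = 6, i.e. block sizes {3, 3, 2, 2}; the first rmd tasks each
   * hold one extra element, and pivot_n is the global index where the larger blocks end */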

  int loc_first_n = 0;
  blockstart[0]   = 0;

  for(int task = 0; task < ntask; task++)
    {
      if(task < rmd)
        blocksize[task] = blk + 1;
      else
        blocksize[task] = blk;

      if(task < thistask)
        loc_first_n += blocksize[task];

      if(task > 0)
        blockstart[task] = blockstart[task - 1] + blocksize[task - 1];
    }

  /* here we store the local result */
  T *loc_data = (T *)Mem.mymalloc_clear("loc_data", blocksize[thistask] * sizeof(T));

  int *send_count = (int *)Mem.mymalloc("send_count", sizeof(int) * ntask);
  int *recv_count = (int *)Mem.mymalloc("recv_count", sizeof(int) * ntask);

  int *send_offset = (int *)Mem.mymalloc("send_offset", sizeof(int) * ntask);

  struct ind_data
  {
    int n;
    T val;
  };

  ind_data *export_data = NULL;
  int nexport           = 0;

  for(int rep = 0; rep < 2; rep++)
    {
      for(int j = 0; j < ntask; j++)
        send_count[j] = 0;

      /* find for each non-zero element the processor where it should go for being summed */
      for(int n = 0; n < N; n++)
        {
          if(glob[n] != 0)
            {
              int task;
              if(n < pivot_n)
                task = n / (blk + 1);
              else
                task = rmd + (n - pivot_n) / blk; /* note: if blk=0, then this case can not occur */

              if(rep == 0)
                send_count[task]++;
              else
                {
                  int index              = send_offset[task] + send_count[task]++;
                  export_data[index].n   = n;
                  export_data[index].val = glob[n];
                }
            }
        }

      if(rep == 0)
        {
          myMPI_Alltoall(send_count, 1, MPI_INT, recv_count, 1, MPI_INT, Communicator);

          send_offset[0] = 0;

          for(int j = 0; j < ntask; j++)
            {
              nexport += send_count[j];

              if(j > 0)
                send_offset[j] = send_offset[j - 1] + send_count[j - 1];
            }

          export_data = (ind_data *)Mem.mymalloc("export_data", nexport * sizeof(ind_data));
        }
      else
        {
          for(int ngrp = 0; ngrp < (1 << ptask); ngrp++) /* note: here we also have a transfer from each task to itself (for ngrp=0) */
            {
              int recvTask = thistask ^ ngrp;
              if(recvTask < ntask)
                if(send_count[recvTask] > 0 || recv_count[recvTask] > 0)
                  {
                    int nimport = recv_count[recvTask];

                    ind_data *import_data = (ind_data *)Mem.mymalloc("import_data", nimport * sizeof(ind_data));

                    myMPI_Sendrecv(&export_data[send_offset[recvTask]], send_count[recvTask] * sizeof(ind_data), MPI_BYTE, recvTask,
                                   TAG_DENS_B, import_data, recv_count[recvTask] * sizeof(ind_data), MPI_BYTE, recvTask, TAG_DENS_B,
                                   Communicator, MPI_STATUS_IGNORE);

                    for(int i = 0; i < nimport; i++)
                      {
                        int j = import_data[i].n - loc_first_n;

                        if(j < 0 || j >= blocksize[thistask])
                          Terminate("j=%d < 0 || j>= blocksize[thistask]=%d", j, blocksize[thistask]);

                        loc_data[j] += import_data[i].val;
                      }

                    Mem.myfree(import_data);
                  }
            }

          Mem.myfree(export_data);
        }
    }

  Mem.myfree(send_offset);
  Mem.myfree(recv_count);
  Mem.myfree(send_count);

  /* now share the result across all processors */
  for(int ngrp = 0; ngrp < (1 << ptask); ngrp++) /* note: here we also have a transfer from each task to itself (for ngrp=0) */
    {
      int recvTask = thistask ^ ngrp;
      if(recvTask < ntask)
        if(blocksize[thistask] > 0 || blocksize[recvTask] > 0)
          myMPI_Sendrecv(loc_data, blocksize[thistask] * sizeof(T), MPI_BYTE, recvTask, TAG_DENS_A, &glob[blockstart[recvTask]],
                         blocksize[recvTask] * sizeof(T), MPI_BYTE, recvTask, TAG_DENS_A, Communicator, MPI_STATUS_IGNORE);
    }

  Mem.myfree(loc_data);
  Mem.myfree(blockstart);
  Mem.myfree(blocksize);
}

#endif
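
The templated allreduce_sum() sums a distributed, sparsely populated array element by element: every task deposits its own contributions into the array, and after the call each task holds the global sum for all N entries. The following is a minimal usage sketch, not taken from the GADGET-4 sources; it assumes an initialized MPI environment and the global memory manager Mem, and the function name, array name, its length, the include path and the choice of MPI_COMM_WORLD are illustrative only.

/* hypothetical usage example for allreduce_sum(); names and sizes are made up */
#include <mpi.h>
#include "../data/mymalloc.h"
#include "../mpi_utils/mpi_utils.h" /* assumed relative path to this header */

void accumulate_example(void)
{
  int thistask;
  MPI_Comm_rank(MPI_COMM_WORLD, &thistask);

  const int N = 1000;

  /* start from a zeroed array and add only this task's (sparse) contributions */
  double *tab = (double *)Mem.mymalloc_clear("tab", N * sizeof(double));
  tab[thistask % N] += 1.0;
  tab[N - 1] += 0.5;

  /* element-wise sum over all tasks; afterwards every task sees the complete result in tab */
  allreduce_sum<double>(tab, N, MPI_COMM_WORLD);

  Mem.myfree(tab);
}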