15#include "../data/dtypes.h"
16#include "../data/mymalloc.h"
/*
 * Message tags. The TAG_DENS_* members of this family are passed as the
 * sendtag/recvtag arguments of myMPI_Sendrecv() in this file, so these are
 * presumably MPI message tags. All values defined below are distinct.
 *
 * NOTE(review): TAG_DENS_A and TAG_DENS_B, used by allreduce_sum(), are not
 * defined in this list -- confirm they are defined elsewhere.
 * NOTE(review): the TAG_FETCH_* values are 1000 apart, presumably so that a
 * per-request offset can be added to each base -- confirm at the use sites.
 */
#define TAG_TOPNODE_FREE 4
#define TAG_TOPNODE_OFFSET 5
#define TAG_TOPNODE_ALLOC 6
#define TAG_EWALD_ALLOC 7
#define TAG_TABLE_ALLOC 8
#define TAG_TABLE_FREE 9
#define TAG_DIRECT_A 20
#define TAG_DIRECT_B 21
#define TAG_NFORTHISTASK 24
#define TAG_PERIODIC_A 25
#define TAG_PERIODIC_B 26
#define TAG_PERIODIC_C 27
#define TAG_PERIODIC_D 28
#define TAG_NONPERIOD_A 29
#define TAG_NONPERIOD_B 30
#define TAG_NONPERIOD_C 31
#define TAG_NONPERIOD_D 32
#define TAG_POTENTIAL_A 33
#define TAG_POTENTIAL_B 34
#define TAG_SMOOTH_A 40
#define TAG_SMOOTH_B 41
#define TAG_ENRICH_A 42
#define TAG_CONDUCT_A 43
#define TAG_CONDUCT_B 44
#define TAG_HOTNGB_A 52
#define TAG_HOTNGB_B 53
#define TAG_SEARCH_A 58
#define TAG_SEARCH_B 59
#define TAG_INJECT_A 61
#define TAG_PDATA_SPH 70
#define TAG_PDATA_STAR 72
#define TAG_STARDATA 73
#define TAG_KEY_STAR 74
#define TAG_PDATA_BH 75
#define TAG_GRAVCOST_A 79
#define TAG_GRAVCOST_B 80
#define TAG_PART_DATA 83
#define TAG_NODE_DATA 84
#define TAG_DRIFT_INIT 86
#define TAG_ALL_UPDATE 87
#define TAG_METDATA 500
#define TAG_FETCH_GRAVTREE 1000
#define TAG_FETCH_SPH_DENSITY 2000
#define TAG_FETCH_SPH_HYDRO 3000
#define TAG_FETCH_SPH_TREETIMESTEP 4000
109int myMPI_Sendrecv(
void *sendbuf,
size_t sendcount, MPI_Datatype sendtype,
int dest,
int sendtag,
void *recvbuf,
size_t recvcount,
110 MPI_Datatype recvtype,
int source,
int recvtag, MPI_Comm comm, MPI_Status *status);
114void myMPI_Alltoallv_new(
void *sendb,
int *sendcounts,
int *sdispls, MPI_Datatype sendtype,
void *recvb,
int *recvcounts,
int *rdispls,
115 MPI_Datatype recvtype, MPI_Comm comm,
int method);
117void myMPI_Alltoallv(
void *sendbuf,
size_t *sendcounts,
size_t *sdispls,
void *recvbuf,
size_t *recvcounts,
size_t *rdispls,
int len,
118 int big_flag, MPI_Comm comm);
120void my_int_MPI_Alltoallv(
void *sendb,
int *sendcounts,
int *sdispls,
void *recvb,
int *recvcounts,
int *rdispls,
int len,
121 int big_flag, MPI_Comm comm);
123int myMPI_Allreduce(
const void *sendbuf,
void *recvbuf,
int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm);
125int myMPI_Allgatherv(
void *sendbuf,
int sendcount, MPI_Datatype sendtype,
void *recvbuf,
int *recvcount,
int *displs,
126 MPI_Datatype recvtype, MPI_Comm comm);
128int myMPI_Alltoall(
const void *sendbuf,
int sendcount, MPI_Datatype sendtype,
void *recvbuf,
int recvcount, MPI_Datatype recvtype,
134void sumup_longs(
int n,
long long *src,
long long *res, MPI_Comm comm);
/*
 * allreduce_sum (body fragment): sums the array glob[0..N-1] over all ranks
 * of Communicator and writes the summed result back into glob.
 *
 * Scheme visible below:
 *   1. Split the N elements into one contiguous block per task.
 *   2. Route each local element to the task that owns its block, which
 *      accumulates it into loc_data.
 *   3. Send every owned block back to all tasks.
 *
 * NOTE(review): this listing is incomplete. The template header, the
 * initialisation of blk / loc_first_n / nexport, the if/else lines and all
 * braces are on elided lines. The comments below describe only the visible
 * statements.
 * NOTE(review): TAG_DENS_A / TAG_DENS_B are not among the visible TAG_*
 * definitions -- confirm they are defined.
 */
147 int ntask, thistask, ptask;
148 MPI_Comm_size(Communicator, &ntask);
149 MPI_Comm_rank(Communicator, &thistask);
// ptask ends as the smallest exponent with (1 << ptask) >= ntask; the loop
// body (presumably an empty statement) is on an elided line.
151 for(ptask = 0; ntask > (1 << ptask); ptask++)
// Block partition of the N result elements: blocksize[task] elements
// starting at global index blockstart[task]. blk is computed on an elided
// line (presumably N / ntask); rmd is the remainder.
156 int *blocksize = (
int *)
Mem.mymalloc(
"blocksize",
sizeof(
int) * ntask);
157 int *blockstart = (
int *)
Mem.mymalloc(
"blockstart",
sizeof(
int) * ntask);
160 int rmd = N - blk * ntask;
// First global index handled by the blk-sized blocks; indices below it lie
// in blocks of size blk + 1.
161 int pivot_n = rmd * (blk + 1);
166 for(
int task = 0; task < ntask; task++)
// The conditions choosing blk + 1 vs blk (presumably task < rmd) and
// guarding the accumulations below are on elided lines.
169 blocksize[task] = blk + 1;
171 blocksize[task] = blk;
// loc_first_n: global index of this task's first owned element.
174 loc_first_n += blocksize[task];
177 blockstart[task] = blockstart[task - 1] + blocksize[task - 1];
// Accumulator for this task's block. mymalloc_clear presumably zero-fills,
// as the += accumulation below requires.
181 T *loc_data = (T *)
Mem.mymalloc_clear(
"loc_data", blocksize[thistask] *
sizeof(T));
183 int *send_count = (
int *)
Mem.mymalloc(
"send_count",
sizeof(
int) * ntask);
184 int *recv_count = (
int *)
Mem.mymalloc(
"recv_count",
sizeof(
int) * ntask);
186 int *send_offset = (
int *)
Mem.mymalloc(
"send_offset",
sizeof(
int) * ntask);
// export_data is allocated only after the per-task counts are known.
194 ind_data *export_data = NULL;
// Two passes over glob. One pass counts elements per destination task; the
// other fills export_data at send_offset[task] and exchanges it. The
// rep-dependent branch lines are elided.
197 for(
int rep = 0; rep < 2; rep++)
199 for(
int j = 0; j < ntask; j++)
203 for(
int n = 0; n < N; n++)
// Owning task of global index n under the block partition. The branch
// condition (presumably n < pivot_n) is elided.
209 task = n / (blk + 1);
211 task = rmd + (n - pivot_n) / blk;
// send_count[task] doubles as the fill cursor within task's export slice.
217 int index = send_offset[task] + send_count[task]++;
218 export_data[index].n = n;
219 export_data[index].val = glob[n];
// Exchange per-destination counts so each task learns recv_count.
226 myMPI_Alltoall(send_count, 1, MPI_INT, recv_count, 1, MPI_INT, Communicator);
// Total export size and exclusive prefix-sum offsets into export_data.
230 for(
int j = 0; j < ntask; j++)
232 nexport += send_count[j];
235 send_offset[j] = send_offset[j - 1] + send_count[j - 1];
238 export_data = (ind_data *)
Mem.mymalloc(
"export_data", nexport *
sizeof(ind_data));
// Pairwise exchange: in round ngrp this task pairs with thistask ^ ngrp
// (ngrp == 0 is the transfer to itself).
// NOTE(review): thistask ^ ngrp can be >= ntask when ntask is not a power of
// two, which would index send_count/recv_count out of bounds. Any guard is
// on an elided line -- confirm it exists.
242 for(
int ngrp = 0; ngrp < (1 << ptask); ngrp++)
244 int recvTask = thistask ^ ngrp;
246 if(send_count[recvTask] > 0 || recv_count[recvTask] > 0)
248 int nimport = recv_count[recvTask];
250 ind_data *import_data = (ind_data *)
Mem.mymalloc(
"import_data", nimport *
sizeof(ind_data));
252 myMPI_Sendrecv(&export_data[send_offset[recvTask]], send_count[recvTask] *
sizeof(ind_data), MPI_BYTE, recvTask,
253 TAG_DENS_B, import_data, recv_count[recvTask] *
sizeof(ind_data), MPI_BYTE, recvTask,
TAG_DENS_B,
254 Communicator, MPI_STATUS_IGNORE);
256 for(
int i = 0; i < nimport; i++)
// Convert the imported global index to an offset within this task's block;
// an out-of-range index aborts via Terminate.
258 int j = import_data[i].n - loc_first_n;
260 if(j < 0 || j >= blocksize[thistask])
261 Terminate(
"j=%d < 0 || j>= blocksize[thistask]=%d", j, blocksize[thistask]);
263 loc_data[j] += import_data[i].val;
266 Mem.myfree(import_data);
270 Mem.myfree(export_data);
// Frees occur in reverse order of allocation (presumably required by the
// Mem allocator -- confirm).
274 Mem.myfree(send_offset);
275 Mem.myfree(recv_count);
276 Mem.myfree(send_count);
// Distribute the reduced blocks: each task sends its loc_data and receives
// every other task's block into glob[blockstart[recvTask]].
// NOTE(review): same recvTask >= ntask concern as the loop above.
279 for(
int ngrp = 0; ngrp < (1 << ptask); ngrp++)
281 int recvTask = thistask ^ ngrp;
283 if(blocksize[thistask] > 0 || blocksize[recvTask] > 0)
284 myMPI_Sendrecv(loc_data, blocksize[thistask] *
sizeof(T), MPI_BYTE, recvTask,
TAG_DENS_A, &glob[blockstart[recvTask]],
285 blocksize[recvTask] *
sizeof(T), MPI_BYTE, recvTask,
TAG_DENS_A, Communicator, MPI_STATUS_IGNORE);
288 Mem.myfree(loc_data);
289 Mem.myfree(blockstart);
290 Mem.myfree(blocksize);
/*
 * NOTE(review): the lines below are bare signatures, with no bodies and no
 * terminating semicolons, so they are not valid C/C++ as written. Several
 * repeat declarations found elsewhere in this file: myMPI_Sendrecv,
 * my_int_MPI_Alltoallv, myMPI_Alltoall (here with its full trailing
 * `MPI_Comm comm` parameter) and sumup_longs. allreduce_sum(T *glob, int N,
 * MPI_Comm Communicator) names the templated body that precedes them. They
 * look like a symbol index of this header's API.
 *
 * Confirm whether to delete them or turn them into proper declarations:
 *   - `extern` for the MPI_Datatype / MPI_Op objects;
 *   - `;`-terminated prototypes for the functions;
 *   - `template <typename T>` for allreduce_sum.
 */
MPI_Datatype MPI_MyIntPosType
void my_int_MPI_Alltoallv(void *sendb, int *sendcounts, int *sdispls, void *recvb, int *recvcounts, int *rdispls, int len, int big_flag, MPI_Comm comm)
MPI_Op MPI_MAX_MySignedIntPosType
MPI_Op MPI_MIN_MySignedIntPosType
void allreduce_sparse_double_sum(double *loc, double *glob, int N, MPI_Comm comm)
void myMPI_Alltoallv_new(void *sendb, int *sendcounts, int *sdispls, MPI_Datatype sendtype, void *recvb, int *recvcounts, int *rdispls, MPI_Datatype recvtype, MPI_Comm comm, int method)
void my_mpi_types_init(void)
void allreduce_sum(T *glob, int N, MPI_Comm Communicator)
void sumup_large_ints(int n, int *src, long long *res, MPI_Comm comm)
int myMPI_Alltoall(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm)
int myMPI_Alltoallv_new_prep(int *sendcnt, int *recvcnt, int *rdispls, MPI_Comm comm, int method)
MPI_Op MPI_MAX_MyIntPosType
int myMPI_Sendrecv(void *sendbuf, size_t sendcount, MPI_Datatype sendtype, int dest, int sendtag, void *recvbuf, size_t recvcount, MPI_Datatype recvtype, int source, int recvtag, MPI_Comm comm, MPI_Status *status)
int myMPI_Allreduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
int myMPI_Allgatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int *recvcount, int *displs, MPI_Datatype recvtype, MPI_Comm comm)
void minimum_large_ints(int n, long long *src, long long *res, MPI_Comm comm)
MPI_Op MPI_MIN_MyIntPosType
void sumup_longs(int n, long long *src, long long *res, MPI_Comm comm)
void myMPI_Alltoallv(void *sendbuf, size_t *sendcounts, size_t *sdispls, void *recvbuf, size_t *recvcounts, size_t *rdispls, int len, int big_flag, MPI_Comm comm)