12#include "gadgetconfig.h"
20#include "../data/allvars.h"
21#include "../data/dtypes.h"
22#include "../data/mymalloc.h"
23#include "../mpi_utils/mpi_utils.h"
27 int ntask, thistask, ptask;
28 MPI_Comm_size(Communicator, &ntask);
29 MPI_Comm_rank(Communicator, &thistask);
31 for(ptask = 0; ntask > (1 << ptask); ptask++)
34 int *send_count = (
int *)
Mem.mymalloc(
"send_count",
sizeof(
int) * ntask);
35 int *recv_count = (
int *)
Mem.mymalloc(
"recv_count",
sizeof(
int) * ntask);
36 int *send_offset = (
int *)
Mem.mymalloc(
"send_offset",
sizeof(
int) * ntask);
37 int *recv_offset = (
int *)
Mem.mymalloc(
"recv_offset",
sizeof(
int) * ntask);
38 int *blocksize = (
int *)
Mem.mymalloc(
"blocksize",
sizeof(
int) * ntask);
41 int rmd = N - blk * ntask;
42 int pivot_n = rmd * (blk + 1);
45 for(
int task = 0; task < ntask; task++)
48 blocksize[task] = blk + 1;
50 blocksize[task] = blk;
53 loc_first_n += blocksize[task];
56 double *loc_data = (
double *)
Mem.mymalloc(
"loc_data", blocksize[thistask] *
sizeof(
double));
57 memset(loc_data, 0, blocksize[thistask] *
sizeof(
double));
59 for(
int j = 0; j < ntask; j++)
63 for(
int n = 0; n < N; n++)
71 task = rmd + (n - pivot_n) / blk;
77 myMPI_Alltoall(send_count, 1, MPI_INT, recv_count, 1, MPI_INT, Communicator);
79 int nimport = 0, nexport = 0;
81 recv_offset[0] = 0, send_offset[0] = 0;
83 for(
int j = 0; j < ntask; j++)
85 nexport += send_count[j];
86 nimport += recv_count[j];
89 send_offset[j] = send_offset[j - 1] + send_count[j - 1];
90 recv_offset[j] = recv_offset[j - 1] + recv_count[j - 1];
99 ind_data *export_data, *import_data;
101 export_data = (ind_data *)
Mem.mymalloc(
"export_data", nexport *
sizeof(ind_data));
102 import_data = (ind_data *)
Mem.mymalloc(
"import_data", nimport *
sizeof(ind_data));
104 for(
int j = 0; j < ntask; j++)
107 for(
int n = 0; n < N; n++)
114 task = n / (blk + 1);
116 task = rmd + (n - pivot_n) / blk;
118 int index = send_offset[task] + send_count[task]++;
119 export_data[index].n = n;
120 export_data[index].val = loc[n];
124 for(
int ngrp = 0; ngrp < (1 << ptask); ngrp++)
126 int recvTask = thistask ^ ngrp;
128 if(send_count[recvTask] > 0 || recv_count[recvTask] > 0)
129 myMPI_Sendrecv(&export_data[send_offset[recvTask]], send_count[recvTask] *
sizeof(ind_data), MPI_BYTE, recvTask,
TAG_DENS_B,
130 &import_data[recv_offset[recvTask]], recv_count[recvTask] *
sizeof(ind_data), MPI_BYTE, recvTask,
TAG_DENS_B,
131 Communicator, MPI_STATUS_IGNORE);
134 for(
int i = 0; i < nimport; i++)
136 int j = import_data[i].n - loc_first_n;
138 if(j < 0 || j >= blocksize[thistask])
139 Terminate(
"j=%d < 0 || j>= blocksize[thistask]=%d", j, blocksize[thistask]);
141 loc_data[j] += import_data[i].val;
144 Mem.myfree(import_data);
145 Mem.myfree(export_data);
148 int *bytecounts = (
int *)
Mem.mymalloc(
"bytecounts",
sizeof(
int) * ntask);
149 int *byteoffset = (
int *)
Mem.mymalloc(
"byteoffset",
sizeof(
int) * ntask);
151 for(
int task = 0; task < ntask; task++)
152 bytecounts[task] = blocksize[task] *
sizeof(
double);
155 for(
int task = 1; task < ntask; task++)
156 byteoffset[task] = byteoffset[task - 1] + bytecounts[task - 1];
158 myMPI_Allgatherv(loc_data, bytecounts[thistask], MPI_BYTE, glob, bytecounts, byteoffset, MPI_BYTE, Communicator);
160 Mem.myfree(byteoffset);
161 Mem.myfree(bytecounts);
163 Mem.myfree(loc_data);
164 Mem.myfree(blocksize);
165 Mem.myfree(recv_offset);
166 Mem.myfree(send_offset);
167 Mem.myfree(recv_count);
168 Mem.myfree(send_count);
void allreduce_sparse_double_sum(double *loc, double *glob, int N, MPI_Comm Communicator)
int myMPI_Allgatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int *recvcount, int *displs, MPI_Datatype recvtype, MPI_Comm comm)
int myMPI_Alltoall(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm)
int myMPI_Sendrecv(void *sendbuf, size_t sendcount, MPI_Datatype sendtype, int dest, int sendtag, void *recvbuf, size_t recvcount, MPI_Datatype recvtype, int source, int recvtag, MPI_Comm comm, MPI_Status *status)