GADGET-4
allreduce_sparse_double_sum.cc
/*******************************************************************************
 * \copyright This file is part of the GADGET4 N-body/SPH code developed
 * \copyright by Volker Springel. Copyright (C) 2014-2020 by Volker Springel
 * \copyright (vspringel@mpa-garching.mpg.de) and all contributing authors.
 *******************************************************************************/

#include "gadgetconfig.h"

#include <math.h>
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "../data/allvars.h"
#include "../data/dtypes.h"
#include "../data/mymalloc.h"
#include "../mpi_utils/mpi_utils.h"

void allreduce_sparse_double_sum(double *loc, double *glob, int N, MPI_Comm Communicator)
{
  int ntask, thistask, ptask;
  MPI_Comm_size(Communicator, &ntask);
  MPI_Comm_rank(Communicator, &thistask);

  for(ptask = 0; ntask > (1 << ptask); ptask++)
    ;
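  /* ptask is now the smallest integer with ntask <= 2^ptask, i.e. the number of
   * pairwise exchange stages used further below */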

  int *send_count  = (int *)Mem.mymalloc("send_count", sizeof(int) * ntask);
  int *recv_count  = (int *)Mem.mymalloc("recv_count", sizeof(int) * ntask);
  int *send_offset = (int *)Mem.mymalloc("send_offset", sizeof(int) * ntask);
  int *recv_offset = (int *)Mem.mymalloc("recv_offset", sizeof(int) * ntask);
  int *blocksize   = (int *)Mem.mymalloc("blocksize", sizeof(int) * ntask);

  int blk     = N / ntask;
  int rmd     = N - blk * ntask; /* remainder */
  int pivot_n = rmd * (blk + 1);

  int loc_first_n = 0;
  for(int task = 0; task < ntask; task++)
    {
      if(task < rmd)
        blocksize[task] = blk + 1;
      else
        blocksize[task] = blk;

      if(task < thistask)
        loc_first_n += blocksize[task];
    }
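  /* at this point, blocksize[] describes the block decomposition of the N slots
   * (the first rmd tasks own blk+1 consecutive slots, the remaining tasks own blk),
   * pivot_n is the first global index falling into a smaller block, and
   * loc_first_n is the first global slot index owned by this task */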

  double *loc_data = (double *)Mem.mymalloc("loc_data", blocksize[thistask] * sizeof(double));
  memset(loc_data, 0, blocksize[thistask] * sizeof(double));

  for(int j = 0; j < ntask; j++)
    send_count[j] = 0;

  /* determine for each non-zero element the processor on which it should be summed up */
  for(int n = 0; n < N; n++)
    {
      if(loc[n] != 0)
        {
          int task;
          if(n < pivot_n)
            task = n / (blk + 1);
          else
            task = rmd + (n - pivot_n) / blk; /* note: if blk=0, then this case cannot occur */

          send_count[task]++;
        }
    }

  myMPI_Alltoall(send_count, 1, MPI_INT, recv_count, 1, MPI_INT, Communicator);

  int nimport = 0, nexport = 0;

  recv_offset[0] = 0, send_offset[0] = 0;

  for(int j = 0; j < ntask; j++)
    {
      nexport += send_count[j];
      nimport += recv_count[j];
      if(j > 0)
        {
          send_offset[j] = send_offset[j - 1] + send_count[j - 1];
          recv_offset[j] = recv_offset[j - 1] + recv_count[j - 1];
        }
    }
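  /* send_offset/recv_offset are now exclusive prefix sums of the counts, i.e. the
   * starting positions of each task's block in the export/import buffers */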

  struct ind_data
  {
    int n;
    double val;
  };
  ind_data *export_data, *import_data;

  export_data = (ind_data *)Mem.mymalloc("export_data", nexport * sizeof(ind_data));
  import_data = (ind_data *)Mem.mymalloc("import_data", nimport * sizeof(ind_data));

  for(int j = 0; j < ntask; j++)
    send_count[j] = 0;

  for(int n = 0; n < N; n++)
    {
      if(loc[n] != 0)
        {
          int task;

          if(n < pivot_n)
            task = n / (blk + 1);
          else
            task = rmd + (n - pivot_n) / blk; /* note: if blk=0, then this case cannot occur */

          int index = send_offset[task] + send_count[task]++;

          export_data[index].n   = n;
          export_data[index].val = loc[n];
        }
    }

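  /* exchange the data in a pairwise pattern: in stage ngrp, task i communicates with
   * task i ^ ngrp, so every pair of tasks is handled in exactly one stage */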
  for(int ngrp = 0; ngrp < (1 << ptask); ngrp++) /* note: here we also have a transfer from each task to itself (for ngrp=0) */
    {
      int recvTask = thistask ^ ngrp;
      if(recvTask < ntask)
        if(send_count[recvTask] > 0 || recv_count[recvTask] > 0)
          myMPI_Sendrecv(&export_data[send_offset[recvTask]], send_count[recvTask] * sizeof(ind_data), MPI_BYTE, recvTask, TAG_DENS_B,
                         &import_data[recv_offset[recvTask]], recv_count[recvTask] * sizeof(ind_data), MPI_BYTE, recvTask, TAG_DENS_B,
                         Communicator, MPI_STATUS_IGNORE);
    }

  for(int i = 0; i < nimport; i++)
    {
      int j = import_data[i].n - loc_first_n;

      if(j < 0 || j >= blocksize[thistask])
        Terminate("j=%d < 0 || j>= blocksize[thistask]=%d", j, blocksize[thistask]);

      loc_data[j] += import_data[i].val;
    }
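  /* loc_data now holds the fully accumulated values for the slots owned by this task */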

  Mem.myfree(import_data);
  Mem.myfree(export_data);

  /* now share the summed data across all processors */
  int *bytecounts = (int *)Mem.mymalloc("bytecounts", sizeof(int) * ntask);
  int *byteoffset = (int *)Mem.mymalloc("byteoffset", sizeof(int) * ntask);

  for(int task = 0; task < ntask; task++)
    bytecounts[task] = blocksize[task] * sizeof(double);

  byteoffset[0] = 0;
  for(int task = 1; task < ntask; task++)
    byteoffset[task] = byteoffset[task - 1] + bytecounts[task - 1];

  myMPI_Allgatherv(loc_data, bytecounts[thistask], MPI_BYTE, glob, bytecounts, byteoffset, MPI_BYTE, Communicator);
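  /* after this gather, glob on every task holds the complete length-N array of summed values */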

  Mem.myfree(byteoffset);
  Mem.myfree(bytecounts);

  Mem.myfree(loc_data);
  Mem.myfree(blocksize);
  Mem.myfree(recv_offset);
  Mem.myfree(send_offset);
  Mem.myfree(recv_count);
  Mem.myfree(send_count);
}
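
A minimal usage sketch, not part of the original file: each rank supplies a mostly-zero local array and receives the element-wise sum over all ranks in glob. The caller name example_sparse_sum, the array length N=1000, the fill pattern, and MPI_COMM_WORLD as the communicator are assumptions made purely for illustration.

/* illustrative caller (hypothetical); assumes the allocator and MPI setup of the surrounding code */
void example_sparse_sum(void)
{
  const int N = 1000; /* assumed array length for this example */

  double *loc  = (double *)Mem.mymalloc("loc", N * sizeof(double));
  double *glob = (double *)Mem.mymalloc("glob", N * sizeof(double));

  memset(loc, 0, N * sizeof(double));

  int thistask;
  MPI_Comm_rank(MPI_COMM_WORLD, &thistask);

  /* each rank touches only a few entries, so most of loc stays zero (the sparse case
   * this routine is designed for) */
  loc[thistask % N] = 1.0;

  allreduce_sparse_double_sum(loc, glob, N, MPI_COMM_WORLD);

  /* glob[i] now equals the sum of loc[i] over all ranks, on every rank */

  /* free in reverse order of allocation, as required by the Mem allocator */
  Mem.myfree(glob);
  Mem.myfree(loc);
}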