GADGET-4
generic_comm.h
Go to the documentation of this file.
1/*******************************************************************************
2 * \copyright This file is part of the GADGET4 N-body/SPH code developed
3 * \copyright by Volker Springel. Copyright (C) 2014-2020 by Volker Springel
4 * \copyright (vspringel@mpa-garching.mpg.de) and all contributing authors.
5 *******************************************************************************/
6
12#ifndef GENERIC_COMM_H
13#define GENERIC_COMM_H
14
15#include "../domain/domain.h"
16#include "../logs/logs.h"
17#include "../mpi_utils/mpi_utils.h"
18
19#define EXTRA_SPACE 16384
20
22{
24};
25
26template <typename T_in, typename T_out, typename T_tree, typename T_domain, typename T_partset>
28{
29 public:
30 virtual void particle2in(T_in *in, int i) = 0; // Pure virtual function, *must* be overridden
31 virtual void out2particle(T_out *out, int i, int mode) = 0; // Pure virtual function, *must* be overridden
32 virtual int evaluate(int target, int mode, int thread_id, int action, T_in *in, int numnodes, node_info *firstnode,
33 T_out &out) = 0; // Pure virtual function, *must* be overridden
34
35 T_domain *D;
36 T_tree *Tree;
37 T_partset *Tp;
38
40
41 generic_comm(T_domain *dptr, T_tree *tptr, T_partset *pptr)
42 {
43 D = dptr;
44 Tree = tptr;
45 Tp = pptr;
46
47 set_MaxNexport(); /* initialisation */
48 }
49
50 /* some public diagnostic info */
51
52 long long SumNexport;
53
54 private:
55 enum logs::timers cpu_primary = logs::CPU_NONE;
56 enum logs::timers cpu_secondary = logs::CPU_NONE;
57 enum logs::timers cpu_imbalance = logs::CPU_NONE;
58
59 int Nactive;
60 int *Targets;
61 int actioncode;
62
63 T_in *DataIn, *DataGet;
64 T_out *DataResult, *DataOut;
65
66 struct send_recv_counts
67 {
68 int Count;
69 int CountNodes;
70 };
71 send_recv_counts *Send, *Recv;
72
75 int *Send_offset,
78 *Send_count,
81 *Recv_count,
84 *Recv_offset;
85
86 int *Send_offset_nodes, *Send_count_nodes, *Recv_count_nodes, *Recv_offset_nodes;
87
90 int *Exportflag;
93 int *Exportnodecount;
96 int *Exportindex;
97
98 size_t ExportSpace;
99 size_t MinSpace;
100 int NextParticle;
101 int Nexport, Nimport;
102 int NexportNodes, NimportNodes;
103
104 node_info *NodeInfoIn;
105 node_info *NodeInfoGet;
106
107 char callorigin[1000];
108
109 void primary_loop(int mode)
110 {
111 if(cpu_primary != logs::CPU_NONE)
112 Logs.timer_start(cpu_primary);
113
114 int idx;
115
116 int j;
117
118 for(j = 0; j < D->NTask; j++)
119 Thread.Exportflag[j] = -1;
120
121 while(1)
122 {
123 if(Thread.ExportSpace < MinSpace)
124 break;
125
126 idx = NextParticle++;
127
128 if(idx >= Nactive)
129 break;
130
131 int i = Targets[idx];
132 if(i < 0)
133 continue;
134
135 T_in local;
136 T_out out;
137 particle2in(&local, i);
138 local.Firstnode = 0;
139
140 int num = evaluate(i, mode, 0, actioncode, &local, 1, NULL, out);
141
142 out2particle(&out, i, mode);
143
144 Thread.Interactions += num;
145 }
146
147 if(cpu_primary != logs::CPU_NONE)
148 Logs.timer_stop(cpu_primary);
149 }
150
151 void secondary_loop(void)
152 {
153 if(cpu_secondary != logs::CPU_NONE)
154 Logs.timer_start(cpu_secondary);
155
156 /* now do the particles that were sent to us */
157 int i, cnt = 0;
158
159 {
160 while(1)
161 {
162 i = cnt++;
163
164 if(i >= Nimport)
165 break;
166
167 int numnodes;
168 node_info *firstnode;
169 generic_get_numnodes(i, &numnodes, &firstnode);
170
171 T_in *in = &DataGet[i];
172 T_out out;
173
174 int num = evaluate(i, MODE_IMPORTED_PARTICLES, 0, actioncode, in, numnodes, firstnode, out);
175
176 DataResult[i] = out;
177
178 Thread.Interactions += num;
179 }
180 }
181
182 if(cpu_secondary != logs::CPU_NONE)
183 Logs.timer_stop(cpu_secondary);
184 }
185
186 /* this function determines how much buffer space we may use based on the memory that is locally still free,
187 * and it computes how much memory may at most be needed to process a single particle. We will only continue with a particle
188 * if this can still be safely processed.
189 */
190
191 /* this function does the memory allocation at the beginning of a loop over the remaining local particles.
192 * The fields PartList[] and NodeList[] share the buffer space of size "ExportSpace" (in bytes).
193 * Here PartList will be filled in from the beginning, while NodeList will be filled in from the end.
194 * Since we do not know a priory the relative share of these two fields, we can make optimum use of
195 * the available space in this way.
196 */
197
198 void set_MaxNexport(const char *file = __FILE__, int line = __LINE__)
199 {
200 ExportSpace = 0.5 * (Mem.FreeBytes); /* we just grab at most half of the still available memory here */
201
202 if(ExportSpace <= Tp->NumPart * sizeof(int))
203 {
205 Terminate("It seems we have too little space left for properly sized ExportSpace... (%lld %lld) Need more memory.\n",
206 (long long)ExportSpace, (long long)Tp->NumPart * sizeof(int))
207 }
208
209 ExportSpace -= Tp->NumPart * sizeof(int); /* to account for the neighbor list buffer that the process allocated */
210
211 /* make the size a multiple both of data_partlist and data_nodelist */
212 ExportSpace /= (sizeof(data_partlist) * sizeof(data_nodelist));
213 ExportSpace *= (sizeof(data_partlist) * sizeof(data_nodelist));
214
215 MinSpace = (D->NTask - 1) * (sizeof(data_partlist) + sizeof(T_in) + sizeof(T_out)) +
216 D->NTopleaves * (sizeof(data_nodelist) + sizeof(int));
217
218 sprintf(callorigin, "%s|%d|", file, line);
219
220 if(ExportSpace < MinSpace)
221 {
223 Terminate(
224 "Bummer. Can't even safely process a single particle for the available memory. FreeBytes=%lld ExportSpace=%lld "
225 "MinSpace=%lld D->NTask=%d NTopleaves=%d",
226 (long long)Mem.FreeBytes, (long long)ExportSpace, (long long)MinSpace, D->NTask, D->NTopleaves);
227 }
228 }
229
230 /* this function does the memory allocation at the beginning of a loop over the remaining local particles.
231 * The fields PartList[] and NodeList[] share the buffer space of size "ExportSpace" (in bytes).
232 * Here PartList will be filled in from the beginning, while NodeList will be filled in from the end.
233 * Since we do not know a priory the relative share of these two fields, we can make optimum use of
234 * the available space in this way.
235 */
236 void generic_alloc_partlist_nodelist_ngblist_threadbufs(void)
237 {
238 Thread.Nexport = 0;
240 Thread.ExportSpace = ExportSpace;
241 Thread.InitialSpace = ExportSpace;
242 Thread.ItemSize = (sizeof(data_partlist) + sizeof(T_in) + sizeof(T_out));
243
244 Thread.PartList = (data_partlist *)Mem.mymalloc_movable_g(&Thread.PartList, "PartList", ExportSpace);
245 /* note: the NodeList array will be attached to the end of this buffer, growing backwards */
246
247 Thread.Ngblist = (int *)Mem.mymalloc_movable_g(&Thread.Ngblist, "Ngblist", Tp->NumPart * sizeof(int));
248 Thread.Shmranklist = (int *)Mem.mymalloc_movable_g(&Thread.Shmranklist, "Shmranklist", Tp->NumPart * sizeof(int));
249 Thread.Exportflag = Exportflag;
250 }
251
252 void generic_free_partlist_nodelist_ngblist_threadbufs(void)
253 {
254 Mem.myfree(Thread.Shmranklist);
255 Mem.myfree(Thread.Ngblist);
256 Mem.myfree(Thread.PartList);
257 Thread.Shmranklist = NULL;
258 Thread.Ngblist = NULL;
259 Thread.PartList = NULL;
260 }
261
262 void generic_prepare_export_counts(void)
263 {
264 for(int j = 0; j < D->NTask; j++)
265 {
266 Send[j].Count = 0;
267 Send[j].CountNodes = 0;
268 }
269
270 Nexport = 0;
271 NexportNodes = 0;
272
273 for(int j = 0; j < Thread.Nexport; j++)
274 Send[Thread.PartList[j].Task].Count++;
275
276 data_nodelist *nodelist = (data_nodelist *)(((char *)Thread.PartList) + Thread.InitialSpace);
277
278 for(int j = 0; j < Thread.NexportNodes; j++)
279 Send[nodelist[-1 - j].Task].CountNodes++;
280
281 Nexport += Thread.Nexport;
282 NexportNodes += Thread.NexportNodes;
283
284 SumNexport += Nexport;
285 }
286
287 /* establish the Recv counts from the Send counts (effectively a big transpose)
288 */
289 void generic_prepare_import_counts(void)
290 {
291 /* our standard approach for this is to use an all-to-all communication. For very large processor counts,
292 * this in principle becomes inefficient since mostly zeros need to be communicated.
293 * we have also two option experimental communication routines that use a sparse=communication pattern instead.
294 */
295 /* the default */
296 myMPI_Alltoall(Send, sizeof(send_recv_counts), MPI_BYTE, Recv, sizeof(send_recv_counts), MPI_BYTE, D->Communicator);
297 }
298
299 /* initialize offset tables that we need for the communication
300 */
301 void generic_prepare_export_offsets(void)
302 {
303 Send_offset[0] = 0;
304 Send_offset_nodes[0] = 0;
305
306 for(int j = 1; j < D->NTask; j++)
307 {
308 Send_offset[j] = Send_offset[j - 1] + Send[j - 1].Count;
309 Send_offset_nodes[j] = Send_offset_nodes[j - 1] + Send[j - 1].CountNodes;
310 }
311 }
312
313 /* organize the particle and node data for export in contiguous memory regions
314 */
315 void generic_prepare_particle_data_for_export(void)
316 {
317 int *rel_node_index = (int *)Mem.mymalloc_g("rel_node_index", D->NTask * sizeof(int));
318
319 for(int j = 0; j < D->NTask; j++)
320 {
321 Send[j].Count = 0;
322 Send[j].CountNodes = 0;
323 rel_node_index[j] = 0;
324 }
325
326 data_nodelist *nodelist = (data_nodelist *)(((char *)Thread.PartList) + Thread.InitialSpace);
327
328 for(int j = 0, jj = 0; j < Thread.Nexport; j++)
329 {
330 int task = Thread.PartList[j].Task;
331 int off = Send_offset[task] + Send[task].Count++;
332
333 int target = Thread.PartList[j].Index;
334
335 particle2in(&DataIn[off], target);
336 DataIn[off].Firstnode = rel_node_index[task];
337
338 if(j < Thread.Nexport - 1)
339 if(Thread.PartList[j].Index == Thread.PartList[j + 1].Index)
340 continue;
341
342 while(jj < Thread.NexportNodes && Thread.PartList[j].Index == nodelist[-1 - jj].Index)
343 {
344 int task = nodelist[-1 - jj].Task;
345 int off = Send_offset_nodes[task] + Send[task].CountNodes++;
346
347 NodeInfoIn[off] = nodelist[-1 - jj].NodeInfo;
348
349 rel_node_index[task]++;
350 jj++;
351 }
352 }
353
354 Mem.myfree(rel_node_index);
355 }
356
357 /* driver routine to process the results that we obtained for a particle from a remote processor
358 * by working on it with the supplied out2particle() routine
359 */
360 void generic_add_results_to_local(void)
361 {
362 for(int j = 0; j < D->NTask; j++)
363 Send[j].Count = 0;
364
365 for(int j = 0; j < Thread.Nexport; j++)
366 {
367 int task = Thread.PartList[j].Task;
368 int off = Send_offset[task] + Send[task].Count++;
369
370 int target = Thread.PartList[j].Index;
371
372 out2particle(&DataOut[off], target, MODE_IMPORTED_PARTICLES);
373 }
374 }
375
376#ifndef OMIT_GENERIC_GET_NUMNODES
377
378 /* this function is called in the actual tree walk routine to find out how the number and
379 * starting index of the section in the node-list that needs to be processed for the imported particle
380 */
381 void generic_get_numnodes(int target, int *numnodes, node_info **firstnode)
382 {
383 if(target == Nimport - 1)
384 *numnodes = NimportNodes - DataGet[target].Firstnode;
385 else
386 *numnodes = DataGet[target + 1].Firstnode - DataGet[target].Firstnode;
387
388 if(*numnodes > Tree->NumNodes)
389 {
390 Terminate(
391 "odd: target=%d Nimport=%d NimportNodes=%d numnodes=%d Tree->NumNodes=%d Tree->MaxNodes=%d "
392 "DataGet[target].Firstnode=%d\n",
393 target, Nimport, NimportNodes, *numnodes, Tree->NumNodes, Tree->MaxNodes, DataGet[target].Firstnode);
394 }
395
396 *firstnode = &NodeInfoGet[DataGet[target].Firstnode];
397 }
398
399#endif
400
401 /* calculate how many space we need to allocate to safely process a certain number of
402 * nodes and particles that are imported.
403 */
404 size_t generic_calc_import_storage(int nimport, int nimportnodes)
405 {
406 size_t needed = nimport * sizeof(T_in) + nimportnodes * sizeof(int) + nimport * sizeof(T_out);
407
408 /* add some extra space to not go to the last byte */
409 needed += EXTRA_SPACE;
410
411 return needed;
412 }
413
414 /* this routine carries out the communication step in several phases if needed
415 */
416 void generic_multiple_phases(void)
417 {
418 int ncycles;
419
420 for(int ngrpstart = 1; ngrpstart < (1 << D->PTask); ngrpstart += ncycles)
421 {
422 /* now decide how many cycles we can process in this iteration */
423 ncycles = (1 << D->PTask) - ngrpstart;
424
425 do
426 {
427 Nimport = 0;
428 NimportNodes = 0;
429
430 for(int ngrp = ngrpstart; ngrp < ngrpstart + ncycles; ngrp++)
431 {
432 int recvTask = D->ThisTask ^ ngrp;
433
434 if(recvTask < D->NTask)
435 {
436 if(Recv[recvTask].Count > 0)
437 {
438 Nimport += Recv[recvTask].Count;
439 NimportNodes += Recv[recvTask].CountNodes;
440 }
441 }
442 }
443
444 int flag = 0, flagall;
445
446 if(generic_calc_import_storage(Nimport, NimportNodes) > Mem.FreeBytes)
447 flag = 1;
448
449 MPI_Allreduce(&flag, &flagall, 1, MPI_INT, MPI_MAX, D->Communicator);
450
451 if(flagall)
452 ncycles /= 2;
453 else
454 break;
455 }
456 while(ncycles > 0);
457
458 if(ncycles == 0)
459 Terminate(
460 "Seems like we can't even do one cycle: ncycles=%d ngrpstart=%d Nimport=%d NimportNodes=%d FreeBytes=%lld needed "
461 "storage=%lld",
462 ncycles, ngrpstart, Nimport, NimportNodes, (long long)Mem.FreeBytes,
463 (long long)generic_calc_import_storage(Nimport, NimportNodes));
464
465 if(ngrpstart == 1 && ncycles != ((1 << D->PTask) - ngrpstart) && D->ThisTask == 0)
466 warn("need multiple import/export phases to avoid memory overflow");
467
468 /* now allocated the import and results buffers */
469
470 DataGet = (T_in *)Mem.mymalloc_movable_g(&DataGet, "DataGet", Nimport * sizeof(T_in));
471 NodeInfoGet = (node_info *)Mem.mymalloc_movable_g(&NodeInfoGet, "NodeInfoGet", NimportNodes * sizeof(node_info));
472 DataResult = (T_out *)Mem.mymalloc_movable_g(&DataResult, "DataResult", Nimport * sizeof(T_out));
473
474 Nimport = 0;
475 NimportNodes = 0;
476
477 /* exchange particle data */
478 for(int ngrp = ngrpstart; ngrp < ngrpstart + ncycles; ngrp++)
479 {
480 int recvTask = D->ThisTask ^ ngrp;
481
482 if(recvTask < D->NTask)
483 {
484 if(Send[recvTask].Count > 0 || Recv[recvTask].Count > 0)
485 {
486 size_t len = sizeof(T_in);
487
488 /* get the particles */
489 myMPI_Sendrecv(&DataIn[Send_offset[recvTask]], Send[recvTask].Count * len, MPI_BYTE, recvTask, TAG_HYDRO_A,
490 &DataGet[Nimport], Recv[recvTask].Count * len, MPI_BYTE, recvTask, TAG_HYDRO_A, D->Communicator,
491 MPI_STATUS_IGNORE);
492
493 /* get the node info */
494 myMPI_Sendrecv(&NodeInfoIn[Send_offset_nodes[recvTask]], Send[recvTask].CountNodes * sizeof(node_info), MPI_BYTE,
495 recvTask, TAG_GRAV_B, &NodeInfoGet[NimportNodes], Recv[recvTask].CountNodes * sizeof(node_info),
496 MPI_BYTE, recvTask, TAG_GRAV_B, D->Communicator, MPI_STATUS_IGNORE);
497
498 for(int k = 0; k < Recv[recvTask].Count; k++)
499 DataGet[Nimport + k].Firstnode += NimportNodes;
500
501 Nimport += Recv[recvTask].Count;
502 NimportNodes += Recv[recvTask].CountNodes;
503 }
504 }
505 }
506
507 /* now do the actual work for the imported points */
508 secondary_loop();
509
510 /* send the results */
511 Nimport = 0;
512 NimportNodes = 0;
513
514 for(int ngrp = ngrpstart; ngrp < ngrpstart + ncycles; ngrp++)
515 {
516 int recvTask = D->ThisTask ^ ngrp;
517 if(recvTask < D->NTask)
518 {
519 if(Send[recvTask].Count > 0 || Recv[recvTask].Count > 0)
520 {
521 size_t len = sizeof(T_out);
522
523 /* exchange the results */
524 myMPI_Sendrecv(&DataResult[Nimport], Recv[recvTask].Count * len, MPI_BYTE, recvTask, TAG_HYDRO_B,
525 &DataOut[Send_offset[recvTask]], Send[recvTask].Count * len, MPI_BYTE, recvTask, TAG_HYDRO_B,
526 D->Communicator, MPI_STATUS_IGNORE);
527
528 Nimport += Recv[recvTask].Count;
529 NimportNodes += Recv[recvTask].CountNodes;
530 }
531 }
532 }
533
534 Mem.myfree(DataResult);
535 Mem.myfree(NodeInfoGet);
536 Mem.myfree(DataGet);
537 }
538 }
539
540 /* this function deals with the communication step, and then processes the imported particles, and finally computes the results back.
541 * if there is not enough memory available to hold all the data sent to us from other processors, we process the incoming data in
542 * multiple stages, which will always be possible.
543 */
544 void generic_exchange(void)
545 {
546 /* set up Sendcount table */
547 generic_prepare_export_counts();
548
549 /* do the all-to-all exchange so that we have the Recvcount table as well */
550 generic_prepare_import_counts();
551
552 /* prepare offsets in export tables */
553 generic_prepare_export_offsets();
554
555 /* allocate particle data buffers */
556 DataIn = (T_in *)Mem.mymalloc_movable_g(&DataIn, "DataIn", Nexport * sizeof(T_in));
557 NodeInfoIn = (node_info *)Mem.mymalloc_movable_g(&NodeInfoIn, "NodeInfoIn", NexportNodes * sizeof(node_info));
558 DataOut = (T_out *)Mem.mymalloc_movable_g(&DataOut, "DataOut", Nexport * sizeof(T_out));
559
560 /* prepare particle data for export */
561 generic_prepare_particle_data_for_export();
562
563 /* export particles and process them, if needed in several installments */
564 generic_multiple_phases();
565
566 /* add the results to the local particles */
567 generic_add_results_to_local();
568
569 Mem.myfree(DataOut);
570 Mem.myfree(NodeInfoIn);
571 Mem.myfree(DataIn);
572 }
573
574 void generic_allocate_comm_tables(void)
575 {
576 ptrdiff_t off;
577
578 off = (char *)Tree->Nodes - Mem.Base;
579 MPI_Allgather(&off, sizeof(ptrdiff_t), MPI_BYTE, Tree->TreeNodes_offsets, sizeof(ptrdiff_t), MPI_BYTE, Tree->TreeSharedMemComm);
580
581 off = (char *)Tree->Points - Mem.Base;
582 MPI_Allgather(&off, sizeof(ptrdiff_t), MPI_BYTE, Tree->TreePoints_offsets, sizeof(ptrdiff_t), MPI_BYTE, Tree->TreeSharedMemComm);
583
584 off = (char *)Tree->Nextnode - Mem.Base;
585 MPI_Allgather(&off, sizeof(ptrdiff_t), MPI_BYTE, Tree->TreeNextnode_offsets, sizeof(ptrdiff_t), MPI_BYTE, Tree->TreeSharedMemComm);
586
587 off = (char *)Tp->P - Mem.Base;
588 MPI_Allgather(&off, sizeof(ptrdiff_t), MPI_BYTE, Tree->TreeP_offsets, sizeof(ptrdiff_t), MPI_BYTE, Tree->TreeSharedMemComm);
589
590 off = (char *)Tp->PS - Mem.Base;
591 MPI_Allgather(&off, sizeof(ptrdiff_t), MPI_BYTE, Tree->TreePS_offsets, sizeof(ptrdiff_t), MPI_BYTE, Tree->TreeSharedMemComm);
592
593 Exportflag = (int *)Mem.mymalloc("Exportflag", ((((D->NTask - 1) / 16) + 1) * 16) * sizeof(int));
594 Exportindex = (int *)Mem.mymalloc("Exportindex", D->NTask * sizeof(int));
595 Exportnodecount = (int *)Mem.mymalloc("Exportnodecount", D->NTask * sizeof(int));
596
597 Send = (send_recv_counts *)Mem.mymalloc("Send", sizeof(send_recv_counts) * D->NTask);
598 Recv = (send_recv_counts *)Mem.mymalloc("Recv", sizeof(send_recv_counts) * D->NTask);
599
600 Send_count = (int *)Mem.mymalloc("Send_count", sizeof(int) * D->NTask);
601 Send_offset = (int *)Mem.mymalloc("Send_offset", sizeof(int) * D->NTask);
602 Recv_count = (int *)Mem.mymalloc("Recv_count", sizeof(int) * D->NTask);
603 Recv_offset = (int *)Mem.mymalloc("Recv_offset", sizeof(int) * D->NTask);
604
605 Send_count_nodes = (int *)Mem.mymalloc("Send_count_nodes", sizeof(int) * D->NTask);
606 Send_offset_nodes = (int *)Mem.mymalloc("Send_offset_nodes", sizeof(int) * D->NTask);
607 Recv_count_nodes = (int *)Mem.mymalloc("Recv_count_nodes", sizeof(int) * D->NTask);
608 Recv_offset_nodes = (int *)Mem.mymalloc("Recv_offset_nodes", sizeof(int) * D->NTask);
609 }
610
611 void generic_free_comm_tables(void)
612 {
613 Mem.myfree(Recv_offset_nodes);
614 Mem.myfree(Recv_count_nodes);
615 Mem.myfree(Send_offset_nodes);
616 Mem.myfree(Send_count_nodes);
617 Mem.myfree(Recv_offset);
618 Mem.myfree(Recv_count);
619 Mem.myfree(Send_offset);
620 Mem.myfree(Send_count);
621 Mem.myfree(Recv);
622 Mem.myfree(Send);
623 Mem.myfree(Exportnodecount);
624 Mem.myfree(Exportindex);
625 Mem.myfree(Exportflag);
626 }
627
628 public:
629 /* Implements a repeated loop over the local particles in the list, processing them with the local kernel function,
630 * until we're done or the export buffer is full. Then we exchange the data, and process the imported ones with the provided kernel.
631 * We repeat if neeed until all processors are done.
632 */
633 int execute(int nactive, int *targetlist, int action)
634 {
635 generic_allocate_comm_tables();
636
637 Nactive = nactive;
638 Targets = targetlist;
639 actioncode = action;
640
642
643 int ndone_flag, ndone, iter = 0;
644
645 SumNexport = 0; /* can be queried as a book-keeping variable */
646
647 NextParticle = 0; /* first particle index for this task */
648
649 if(cpu_imbalance != logs::CPU_NONE)
650 Logs.timer_start(cpu_imbalance);
651
652 do
653 {
654 iter++;
655
656 /* allocate buffers to arrange communication */
657 generic_alloc_partlist_nodelist_ngblist_threadbufs();
658
659 /* do local particles */
660 if(action == MODE_DEFAULT)
661 {
662 primary_loop(MODE_LOCAL_PARTICLES);
663 }
664 else if(action == MODE_LOCAL_NO_EXPORT)
665 {
666 primary_loop(MODE_LOCAL_NO_EXPORT);
667 }
668 else
669 {
670 Terminate("unknown action code (%d) for primary loop", action);
671 }
672
673 /* do all necessary bookkeeping, data exchange, and processing of imported particles */
674 generic_exchange();
675
676 /* free the rest of the buffers */
677 generic_free_partlist_nodelist_ngblist_threadbufs();
678
679 /* check whether we are done */
680 if(NextParticle >= nactive)
681 ndone_flag = 1;
682 else
683 ndone_flag = 0;
684
685 MPI_Allreduce(&ndone_flag, &ndone, 1, MPI_INT, MPI_SUM, D->Communicator);
686 }
687 while(ndone < D->NTask);
688
689 generic_free_comm_tables();
690
691 if(cpu_imbalance != logs::CPU_NONE)
692 Logs.timer_stop(cpu_imbalance);
693
694 return iter;
695 }
696
697 int execute(int nactive, int *targetlist, int action, enum logs::timers a, enum logs::timers b, enum logs::timers c)
698 {
699 cpu_primary = a;
700 cpu_secondary = b;
701 cpu_imbalance = c;
702
703 return execute(nactive, targetlist, action);
704 }
705};
706
707#endif
virtual void out2particle(T_out *out, int i, int mode)=0
virtual void particle2in(T_in *in, int i)=0
T_partset * Tp
Definition: generic_comm.h:37
T_tree * Tree
Definition: generic_comm.h:36
long long SumNexport
Definition: generic_comm.h:52
int execute(int nactive, int *targetlist, int action)
Definition: generic_comm.h:633
T_domain * D
Definition: generic_comm.h:35
generic_comm(T_domain *dptr, T_tree *tptr, T_partset *pptr)
Definition: generic_comm.h:41
thread_data Thread
Definition: generic_comm.h:39
int execute(int nactive, int *targetlist, int action, enum logs::timers a, enum logs::timers b, enum logs::timers c)
Definition: generic_comm.h:697
virtual int evaluate(int target, int mode, int thread_id, int action, T_in *in, int numnodes, node_info *firstnode, T_out &out)=0
void timer_stop(enum timers counter)
Definition: logs.h:141
timers
Definition: logs.h:91
@ CPU_NONE
Definition: logs.h:92
void timer_start(enum timers counter)
Definition: logs.h:152
size_t FreeBytes
Definition: mymalloc.h:48
char * Base
Definition: mymalloc.h:49
void dump_memory_table(void)
Dump the buffer where the memory information is stored to the standard output.
Definition: mymalloc.cc:355
#define MODE_DEFAULT
Definition: constants.h:23
#define MODE_IMPORTED_PARTICLES
Definition: constants.h:22
#define MODE_LOCAL_NO_EXPORT
Definition: constants.h:24
#define MODE_LOCAL_PARTICLES
Definition: constants.h:21
#define EXTRA_SPACE
Definition: generic_comm.h:19
logs Logs
Definition: main.cc:43
#define Terminate(...)
Definition: macros.h:15
#define warn(...)
Definition: macros.h:30
int myMPI_Alltoall(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm)
Definition: myalltoall.cc:301
#define TAG_GRAV_B
Definition: mpi_utils.h:34
int myMPI_Sendrecv(void *sendbuf, size_t sendcount, MPI_Datatype sendtype, int dest, int sendtag, void *recvbuf, size_t recvcount, MPI_Datatype recvtype, int source, int recvtag, MPI_Comm comm, MPI_Status *status)
#define TAG_HYDRO_B
Definition: mpi_utils.h:38
#define TAG_HYDRO_A
Definition: mpi_utils.h:37
memory Mem
Definition: main.cc:44
node_info NodeInfo
Definition: tree.h:85
int Task
Definition: tree.h:83
Definition: tree.h:77
int * Shmranklist
Definition: dtypes.h:353
size_t ItemSize
Definition: dtypes.h:345
int NexportNodes
Definition: dtypes.h:336
int * Exportflag
Definition: dtypes.h:354
size_t InitialSpace
Definition: dtypes.h:344
size_t ExportSpace
Definition: dtypes.h:343
data_partlist * PartList
Definition: dtypes.h:351
double Interactions
Definition: dtypes.h:338
int Nexport
Definition: dtypes.h:335
int * Ngblist
Definition: dtypes.h:352