#include "../domain/domain.h"
#include "../logs/logs.h"
#include "../mpi_utils/mpi_utils.h"

#define EXTRA_SPACE 16384
template <typename T_in, typename T_out, typename T_tree, typename T_domain, typename T_partset>
class generic_comm
{
 public:
  virtual void particle2in(T_in *in, int i)              = 0; /* fill the communication structure for local particle i */
  virtual void out2particle(T_out *out, int i, int mode) = 0; /* fold a received result back into local particle i */
  virtual int evaluate(int target, int mode, int thread_id, int action, T_in *in, int numnodes, node_info *firstnode,
                       T_out &out) = 0; /* carry out the actual interaction computation */
  T_in *DataIn, *DataGet;
  T_out *DataResult, *DataOut;
  node_info *NodeInfoIn, *NodeInfoGet;

  struct send_recv_counts
  {
    int Count;
    int CountNodes;
  };
  send_recv_counts *Send, *Recv;

  int *Send_offset, *Send_count, *Recv_count, *Recv_offset;
  int *Send_offset_nodes, *Send_count_nodes, *Recv_count_nodes, *Recv_offset_nodes;

  int Nexport, Nimport;
  int NexportNodes, NimportNodes;

  char callorigin[1000];
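// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the original header): the intended usage
// pattern of this class. A concrete module derives from the communication
// class and only supplies particle2in() / out2particle() / evaluate(); the
// generic driver loops are inherited. Everything below (toy_comm, my_density,
// in_data/out_data) is an invented, single-process stand-in for the template
// machinery, not code from the surrounding class.
// ---------------------------------------------------------------------------
#include <cstdio>

struct particle { double pos, result; };
struct in_data  { double pos; };
struct out_data { double val; };

class toy_comm
{
 public:
  virtual void particle2in(in_data *in, int i)              = 0; /* pack local particle i for evaluation */
  virtual void out2particle(out_data *out, int i, int mode) = 0; /* fold a partial result back into particle i */
  virtual int evaluate(int i, in_data *in, out_data *out)   = 0; /* the actual interaction for one target */
  virtual ~toy_comm() {}

  /* the inherited driver: pack, evaluate, unpack for every active target */
  void execute(int nactive)
  {
    for(int i = 0; i < nactive; i++)
      {
        in_data in;
        out_data out;
        particle2in(&in, i);
        evaluate(i, &in, &out);
        out2particle(&out, i, 0);
      }
  }
};

class my_density : public toy_comm
{
 public:
  particle *P = nullptr;
  void particle2in(in_data *in, int i) override { in->pos = P[i].pos; }
  void out2particle(out_data *out, int i, int mode) override { P[i].result += out->val; }
  int evaluate(int i, in_data *in, out_data *out) override
  {
    out->val = 2.0 * in->pos; /* placeholder "physics" */
    return 0;
  }
};

int main(void)
{
  particle p[3] = {{1.0, 0.0}, {2.0, 0.0}, {3.0, 0.0}};
  my_density comm;
  comm.P = p;
  comm.execute(3);
  for(int i = 0; i < 3; i++)
    printf("particle %d: result=%g\n", i, p[i].result);
  return 0;
}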
  void primary_loop(int mode)
  {
    /* ... */
    for(j = 0; j < D->NTask; j++)
      /* ... */

    /* ... */
    idx = NextParticle++;
    /* ... */
    int i = Targets[idx];
    /* ... */
    int num = evaluate(i, mode, 0, actioncode, &local, 1, NULL, out);
    /* ... */
  }
  void secondary_loop(void)
  {
    /* ... */
    generic_get_numnodes(i, &numnodes, &firstnode);

    T_in *in = &DataGet[i];
    /* ... */
  }
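// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the original header): why there are two
// loops. primary_loop() handles a rank's own particles (MODE_LOCAL_PARTICLES)
// and flags the ones that also need data held on other ranks; secondary_loop()
// then evaluates the imported copies on behalf of their home rank
// (MODE_IMPORTED_PARTICLES), and the partial results are added back afterwards.
// The two-element arrays below fake two "ranks" in one process; all names and
// numbers are invented for the example.
// ---------------------------------------------------------------------------
#include <cstdio>
#include <vector>

int main(void)
{
  /* each "rank" owns some values; a target particle needs the sum of the values on both ranks */
  std::vector<double> owned[2] = {{1.0, 2.0}, {10.0, 20.0, 30.0}};
  double result[2] = {0.0, 0.0}; /* one target particle per rank, for brevity */

  /* primary pass: every rank accumulates its local contribution and "exports" the particle */
  for(int rank = 0; rank < 2; rank++)
    for(double v : owned[rank])
      result[rank] += v;

  /* secondary pass: every rank evaluates the particle imported from the other rank */
  double imported_result[2];
  for(int rank = 0; rank < 2; rank++)
    {
      int from = 1 - rank;
      imported_result[from] = 0.0;
      for(double v : owned[rank])
        imported_result[from] += v;
    }

  /* add the returned partial results to the local particles */
  for(int rank = 0; rank < 2; rank++)
    result[rank] += imported_result[rank];

  for(int rank = 0; rank < 2; rank++)
    printf("rank %d particle: total=%g (expect 63)\n", rank, result[rank]);
  return 0;
}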
  void set_MaxNexport(const char *file = __FILE__, int line = __LINE__)
  {
    /* ... */
    if(ExportSpace <= Tp->NumPart * sizeof(int))
      Terminate("It seems we have too little space left for properly sized ExportSpace... (%lld %lld) Need more memory.\n",
                (long long)ExportSpace, (long long)Tp->NumPart * sizeof(int));

    ExportSpace -= Tp->NumPart * sizeof(int);
    /* ... */
    MinSpace = (D->NTask - 1) * (sizeof(data_partlist) + sizeof(T_in) + sizeof(T_out)) + /* ... */;

    sprintf(callorigin, "%s|%d|", file, line);

    if(ExportSpace < MinSpace)
      Terminate(
          "Bummer. Can't even safely process a single particle for the available memory. FreeBytes=%lld ExportSpace=%lld "
          "MinSpace=%lld D->NTask=%d NTopleaves=%d",
          (long long)Mem.FreeBytes, (long long)ExportSpace, (long long)MinSpace, D->NTask, D->NTopleaves);
  }
  void generic_alloc_partlist_nodelist_ngblist_threadbufs(void)
  {
    /* ... */
  }

  void generic_free_partlist_nodelist_ngblist_threadbufs(void)
  {
    /* ... */
  }
  void generic_prepare_export_counts(void)
  {
    for(int j = 0; j < D->NTask; j++)
      {
        /* ... */
        Send[j].CountNodes = 0;
      }

    /* ... */
    Send[nodelist[-1 - j].Task].CountNodes++;
    /* ... */
  }
  void generic_prepare_import_counts(void)
  {
    /* ... */
    myMPI_Alltoall(Send, sizeof(send_recv_counts), MPI_BYTE, Recv, sizeof(send_recv_counts), MPI_BYTE, D->Communicator);
    /* ... */
  }
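// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the original header): how an all-to-all of
// the per-rank send counters yields the matching receive counters, here with
// plain MPI_Alltoall on a two-int struct transferred as raw bytes, analogous
// to the myMPI_Alltoall call in generic_prepare_import_counts(). The struct is
// redefined locally and the payload values are invented for the demo.
// ---------------------------------------------------------------------------
#include <mpi.h>
#include <cstdio>
#include <vector>

struct send_recv_counts
{
  int Count;      /* number of particles destined for a given rank */
  int CountNodes; /* number of tree-node entries destined for that rank */
};

int main(int argc, char **argv)
{
  MPI_Init(&argc, &argv);
  int ntask, thistask;
  MPI_Comm_size(MPI_COMM_WORLD, &ntask);
  MPI_Comm_rank(MPI_COMM_WORLD, &thistask);

  std::vector<send_recv_counts> Send(ntask), Recv(ntask);
  for(int j = 0; j < ntask; j++)
    {
      Send[j].Count      = thistask + j; /* arbitrary demo payload */
      Send[j].CountNodes = 2 * j;
    }

  /* after this call, Recv[j] holds what rank j put into its Send[thistask] slot */
  MPI_Alltoall(Send.data(), sizeof(send_recv_counts), MPI_BYTE, Recv.data(), sizeof(send_recv_counts), MPI_BYTE, MPI_COMM_WORLD);

  for(int j = 0; j < ntask; j++)
    printf("rank %d: from rank %d expect %d particles, %d nodes\n", thistask, j, Recv[j].Count, Recv[j].CountNodes);

  MPI_Finalize();
  return 0;
}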
  void generic_prepare_export_offsets(void)
  {
    Send_offset[0]       = 0;
    Send_offset_nodes[0] = 0;

    for(int j = 1; j < D->NTask; j++)
      {
        Send_offset[j]       = Send_offset[j - 1] + Send[j - 1].Count;
        Send_offset_nodes[j] = Send_offset_nodes[j - 1] + Send[j - 1].CountNodes;
      }
  }
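// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the original header): the exclusive prefix
// sum used above turns per-destination counts into contiguous buffer offsets,
// so that the block for rank j starts at offset[j] and has length count[j].
// Plain arrays stand in for Send[].Count / Send_offset[]; values are invented.
// ---------------------------------------------------------------------------
#include <cstdio>

int main(void)
{
  const int ntask  = 4;
  int count[ntask] = {3, 0, 2, 5}; /* particles to export to each rank */
  int offset[ntask];

  offset[0] = 0;
  for(int j = 1; j < ntask; j++)
    offset[j] = offset[j - 1] + count[j - 1];

  for(int j = 0; j < ntask; j++)
    printf("rank %d: offset=%d count=%d\n", j, offset[j], count[j]); /* offsets 0, 3, 3, 5 */

  return 0;
}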
  void generic_prepare_particle_data_for_export(void)
  {
    int *rel_node_index = (int *)Mem.mymalloc_g("rel_node_index", D->NTask * sizeof(int));

    for(int j = 0; j < D->NTask; j++)
      {
        /* ... */
        Send[j].CountNodes = 0;
        rel_node_index[j]  = 0;
      }

    /* ... */
    int off = Send_offset[task] + Send[task].Count++;
    /* ... */
    DataIn[off].Firstnode = rel_node_index[task];
    /* ... */
    int task = nodelist[-1 - jj].Task;
    int off  = Send_offset_nodes[task] + Send[task].CountNodes++;

    NodeInfoIn[off] = nodelist[-1 - jj].NodeInfo;
    /* ... */
    rel_node_index[task]++;
    /* ... */

    Mem.myfree(rel_node_index);
  }
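// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the original header): the bucket-fill
// pattern used above. With offsets from the prefix sum and the counters reset
// to zero, "off = offset[task] + count[task]++" places each exported item into
// the contiguous slice of the send buffer belonging to its destination rank.
// Item values and destinations are invented for the example.
// ---------------------------------------------------------------------------
#include <algorithm>
#include <cstdio>
#include <utility>
#include <vector>

int main(void)
{
  const int ntask = 3;
  /* (value, destination task) pairs in arbitrary order, as a tree walk would produce them */
  std::vector<std::pair<int, int>> items = {{10, 2}, {11, 0}, {12, 2}, {13, 1}, {14, 0}};

  std::vector<int> count(ntask, 0), offset(ntask, 0);
  for(auto &it : items)
    count[it.second]++;
  for(int j = 1; j < ntask; j++)
    offset[j] = offset[j - 1] + count[j - 1];

  std::vector<int> sendbuf(items.size());
  std::fill(count.begin(), count.end(), 0); /* reuse the counters as running fill positions */
  for(auto &it : items)
    {
      int task     = it.second;
      int off      = offset[task] + count[task]++; /* same idiom as the DataIn[off] fill above */
      sendbuf[off] = it.first;
    }

  for(size_t k = 0; k < sendbuf.size(); k++)
    printf("sendbuf[%zu] = %d\n", k, sendbuf[k]); /* 11 14 | 13 | 10 12 */

  return 0;
}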
  void generic_add_results_to_local(void)
  {
    for(int j = 0; j < D->NTask; j++)
      /* ... */

    /* ... */
    int off = Send_offset[task] + Send[task].Count++;
    /* ... */
  }
#ifndef OMIT_GENERIC_GET_NUMNODES
  void generic_get_numnodes(int target, int *numnodes, node_info **firstnode)
  {
    if(target == Nimport - 1)
      *numnodes = NimportNodes - DataGet[target].Firstnode;
    else
      *numnodes = DataGet[target + 1].Firstnode - DataGet[target].Firstnode;

    if(*numnodes > Tree->NumNodes)
      Terminate(
          "odd: target=%d Nimport=%d NimportNodes=%d numnodes=%d Tree->NumNodes=%d Tree->MaxNodes=%d "
          "DataGet[target].Firstnode=%d\n",
          target, Nimport, NimportNodes, *numnodes, Tree->NumNodes, Tree->MaxNodes, DataGet[target].Firstnode);

    *firstnode = &NodeInfoGet[DataGet[target].Firstnode];
  }
  /* ... */
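// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the original header): how a per-particle
// "Firstnode" index into one flat node array encodes a variable-length node
// list per imported particle, recovered the same way generic_get_numnodes()
// does it (the last particle owns everything up to the end of the array).
// The index values are invented for the example.
// ---------------------------------------------------------------------------
#include <cstdio>

int main(void)
{
  const int nimport         = 3;
  const int nimportnodes    = 7;
  int firstnode_of[nimport] = {0, 2, 5}; /* stands in for DataGet[i].Firstnode */

  for(int target = 0; target < nimport; target++)
    {
      int numnodes;
      if(target == nimport - 1)
        numnodes = nimportnodes - firstnode_of[target];
      else
        numnodes = firstnode_of[target + 1] - firstnode_of[target];

      printf("imported particle %d: nodes [%d, %d)\n", target, firstnode_of[target], firstnode_of[target] + numnodes);
    }
  return 0;
}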
  size_t generic_calc_import_storage(int nimport, int nimportnodes)
  {
    size_t needed = nimport * sizeof(T_in) + nimportnodes * sizeof(int) + nimport * sizeof(T_out);
    /* ... */
    return needed;
  }
  void generic_multiple_phases(void)
  {
    int ncycles;

    for(int ngrpstart = 1; ngrpstart < (1 << D->PTask); ngrpstart += ncycles)
      {
        /* start by assuming we can do all remaining cycles in one go */
        ncycles = (1 << D->PTask) - ngrpstart;

        /* ... */

        /* count how much we would have to import for this set of cycles */
        Nimport      = 0;
        NimportNodes = 0;

        for(int ngrp = ngrpstart; ngrp < ngrpstart + ncycles; ngrp++)
          {
            int recvTask = D->ThisTask ^ ngrp;

            if(recvTask < D->NTask)
              if(Recv[recvTask].Count > 0)
                {
                  Nimport += Recv[recvTask].Count;
                  NimportNodes += Recv[recvTask].CountNodes;
                }
          }

        /* check whether the required import storage fits into the available memory */
        int flag = 0, flagall;

        if(generic_calc_import_storage(Nimport, NimportNodes) > Mem.FreeBytes)
          flag = 1;

        MPI_Allreduce(&flag, &flagall, 1, MPI_INT, MPI_MAX, D->Communicator);

        /* ... */
        Terminate(
            "Seems like we can't even do one cycle: ncycles=%d ngrpstart=%d Nimport=%d NimportNodes=%d FreeBytes=%lld needed "
            /* ... */,
            ncycles, ngrpstart, Nimport, NimportNodes, (long long)Mem.FreeBytes,
            (long long)generic_calc_import_storage(Nimport, NimportNodes));
        /* ... */

        if(ngrpstart == 1 && ncycles != ((1 << D->PTask) - ngrpstart) && D->ThisTask == 0)
          warn("need multiple import/export phases to avoid memory overflow");

        /* allocate the import buffers for this set of cycles */
        DataGet     = (T_in *)Mem.mymalloc_movable_g(&DataGet, "DataGet", Nimport * sizeof(T_in));
        NodeInfoGet = (node_info *)Mem.mymalloc_movable_g(&NodeInfoGet, "NodeInfoGet", NimportNodes * sizeof(node_info));
        DataResult  = (T_out *)Mem.mymalloc_movable_g(&DataResult, "DataResult", Nimport * sizeof(T_out));

        /* exchange the particle data and the accompanying tree-node info */
        Nimport      = 0;
        NimportNodes = 0;

        for(int ngrp = ngrpstart; ngrp < ngrpstart + ncycles; ngrp++)
          {
            int recvTask = D->ThisTask ^ ngrp;

            if(recvTask < D->NTask)
              if(Send[recvTask].Count > 0 || Recv[recvTask].Count > 0)
                {
                  size_t len = sizeof(T_in);

                  myMPI_Sendrecv(&DataIn[Send_offset[recvTask]], Send[recvTask].Count * len, MPI_BYTE, recvTask, TAG_HYDRO_A,
                                 &DataGet[Nimport], Recv[recvTask].Count * len, MPI_BYTE, recvTask, TAG_HYDRO_A, D->Communicator,
                                 MPI_STATUS_IGNORE);

                  myMPI_Sendrecv(&NodeInfoIn[Send_offset_nodes[recvTask]], Send[recvTask].CountNodes * sizeof(node_info), MPI_BYTE,
                                 recvTask, TAG_GRAV_B, &NodeInfoGet[NimportNodes], Recv[recvTask].CountNodes * sizeof(node_info),
                                 MPI_BYTE, recvTask, TAG_GRAV_B, D->Communicator, MPI_STATUS_IGNORE);

                  /* make the Firstnode indices relative to the concatenated NodeInfoGet buffer */
                  for(int k = 0; k < Recv[recvTask].Count; k++)
                    DataGet[Nimport + k].Firstnode += NimportNodes;

                  Nimport += Recv[recvTask].Count;
                  NimportNodes += Recv[recvTask].CountNodes;
                }
          }

        /* ... evaluate the imported particles, then send the results back ... */

        Nimport      = 0;
        NimportNodes = 0;

        for(int ngrp = ngrpstart; ngrp < ngrpstart + ncycles; ngrp++)
          {
            int recvTask = D->ThisTask ^ ngrp;
            if(recvTask < D->NTask)
              if(Send[recvTask].Count > 0 || Recv[recvTask].Count > 0)
                {
                  size_t len = sizeof(T_out);

                  myMPI_Sendrecv(&DataResult[Nimport], Recv[recvTask].Count * len, MPI_BYTE, recvTask, TAG_HYDRO_B,
                                 &DataOut[Send_offset[recvTask]], Send[recvTask].Count * len, MPI_BYTE, recvTask, TAG_HYDRO_B,
                                 D->Communicator, MPI_STATUS_IGNORE);

                  Nimport += Recv[recvTask].Count;
                  NimportNodes += Recv[recvTask].CountNodes;
                }
          }

        Mem.myfree(DataResult);
        Mem.myfree(NodeInfoGet);
        /* ... */
      }
  }
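// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the original header): the hypercube pairing
// "recvTask = ThisTask ^ ngrp" used in the loops above. For ngrp = 1 .. 2^PTask-1
// every pair of ranks is matched exactly once, and both partners pick each
// other in the same iteration, which is what allows the paired Sendrecv calls.
// This small program just prints the pairing; no MPI is needed.
// ---------------------------------------------------------------------------
#include <cstdio>

int main(void)
{
  const int PTask = 3;          /* smallest power of two >= NTask; here NTask = 8 */
  const int NTask = 1 << PTask;

  for(int ngrp = 1; ngrp < (1 << PTask); ngrp++)
    {
      printf("ngrp=%d:", ngrp);
      for(int ThisTask = 0; ThisTask < NTask; ThisTask++)
        {
          int recvTask = ThisTask ^ ngrp;
          if(recvTask < NTask && ThisTask < recvTask) /* print each pair once */
            printf("  %d<->%d", ThisTask, recvTask);
        }
      printf("\n");
    }
  return 0;
}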
  void generic_exchange(void)
  {
    /* set up the send counts */
    generic_prepare_export_counts();

    /* all-to-all exchange of the send/receive counts */
    generic_prepare_import_counts();

    /* prepare the offsets into the send buffers */
    generic_prepare_export_offsets();

    /* allocate the export buffers */
    DataIn     = (T_in *)Mem.mymalloc_movable_g(&DataIn, "DataIn", Nexport * sizeof(T_in));
    NodeInfoIn = (node_info *)Mem.mymalloc_movable_g(&NodeInfoIn, "NodeInfoIn", NexportNodes * sizeof(node_info));
    DataOut    = (T_out *)Mem.mymalloc_movable_g(&DataOut, "DataOut", Nexport * sizeof(T_out));

    /* fill the export buffers with the particle data */
    generic_prepare_particle_data_for_export();

    /* export the particles and process them, if needed in several installments */
    generic_multiple_phases();

    /* add the returned results to the local particles */
    generic_add_results_to_local();

    /* ... */
    Mem.myfree(NodeInfoIn);
    /* ... */
  }
  void generic_allocate_comm_tables(void)
  {
    /* ... */
    MPI_Allgather(&off, sizeof(ptrdiff_t), MPI_BYTE, Tree->TreeNodes_offsets, sizeof(ptrdiff_t), MPI_BYTE, Tree->TreeSharedMemComm);
    /* ... */
    MPI_Allgather(&off, sizeof(ptrdiff_t), MPI_BYTE, Tree->TreePoints_offsets, sizeof(ptrdiff_t), MPI_BYTE, Tree->TreeSharedMemComm);
    /* ... */
    MPI_Allgather(&off, sizeof(ptrdiff_t), MPI_BYTE, Tree->TreeNextnode_offsets, sizeof(ptrdiff_t), MPI_BYTE, Tree->TreeSharedMemComm);
    /* ... */
    MPI_Allgather(&off, sizeof(ptrdiff_t), MPI_BYTE, Tree->TreeP_offsets, sizeof(ptrdiff_t), MPI_BYTE, Tree->TreeSharedMemComm);
    /* ... */
    MPI_Allgather(&off, sizeof(ptrdiff_t), MPI_BYTE, Tree->TreePS_offsets, sizeof(ptrdiff_t), MPI_BYTE, Tree->TreeSharedMemComm);

    Exportflag      = (int *)Mem.mymalloc("Exportflag", ((((D->NTask - 1) / 16) + 1) * 16) * sizeof(int));
    Exportindex     = (int *)Mem.mymalloc("Exportindex", D->NTask * sizeof(int));
    Exportnodecount = (int *)Mem.mymalloc("Exportnodecount", D->NTask * sizeof(int));

    Send = (send_recv_counts *)Mem.mymalloc("Send", sizeof(send_recv_counts) * D->NTask);
    Recv = (send_recv_counts *)Mem.mymalloc("Recv", sizeof(send_recv_counts) * D->NTask);

    Send_count  = (int *)Mem.mymalloc("Send_count", sizeof(int) * D->NTask);
    Send_offset = (int *)Mem.mymalloc("Send_offset", sizeof(int) * D->NTask);
    Recv_count  = (int *)Mem.mymalloc("Recv_count", sizeof(int) * D->NTask);
    Recv_offset = (int *)Mem.mymalloc("Recv_offset", sizeof(int) * D->NTask);

    Send_count_nodes  = (int *)Mem.mymalloc("Send_count_nodes", sizeof(int) * D->NTask);
    Send_offset_nodes = (int *)Mem.mymalloc("Send_offset_nodes", sizeof(int) * D->NTask);
    Recv_count_nodes  = (int *)Mem.mymalloc("Recv_count_nodes", sizeof(int) * D->NTask);
    Recv_offset_nodes = (int *)Mem.mymalloc("Recv_offset_nodes", sizeof(int) * D->NTask);
  }
  void generic_free_comm_tables(void)
  {
    Mem.myfree(Recv_offset_nodes);
    Mem.myfree(Recv_count_nodes);
    Mem.myfree(Send_offset_nodes);
    Mem.myfree(Send_count_nodes);
    Mem.myfree(Recv_offset);
    Mem.myfree(Recv_count);
    Mem.myfree(Send_offset);
    Mem.myfree(Send_count);
    Mem.myfree(Recv);
    Mem.myfree(Send);
    Mem.myfree(Exportnodecount);
    Mem.myfree(Exportindex);
    Mem.myfree(Exportflag);
  }
  int execute(int nactive, int *targetlist, int action)
  {
    generic_allocate_comm_tables();

    /* ... */
    Targets = targetlist;
    /* ... */
    int ndone_flag, ndone, iter = 0;

    do
      {
        /* ... */
        generic_alloc_partlist_nodelist_ngblist_threadbufs();

        /* ... process local particles and build the export list ... */
        Terminate("unknown action code (%d) for primary loop", action);
        /* ... */

        generic_free_partlist_nodelist_ngblist_threadbufs();

        if(NextParticle >= nactive)
          ndone_flag = 1;
        else
          ndone_flag = 0;

        MPI_Allreduce(&ndone_flag, &ndone, 1, MPI_INT, MPI_SUM, D->Communicator);
      }
    while(ndone < D->NTask);

    generic_free_comm_tables();
    /* ... */
  }

  int execute(int nactive, int *targetlist, int action, enum logs::timers a, enum logs::timers b, enum logs::timers c)
  {
    /* ... */
    return execute(nactive, targetlist, action);
  }
  /* ... */
};
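// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the original header): the termination
// pattern of execute(). Each rank keeps joining the collective communication
// rounds even after its own work list is exhausted, until an Allreduce of the
// per-rank done flags shows that every rank has finished. The per-rank work
// sizes and the per-round chunk are invented for the example.
// ---------------------------------------------------------------------------
#include <mpi.h>
#include <cstdio>

int main(int argc, char **argv)
{
  MPI_Init(&argc, &argv);
  int ntask, thistask;
  MPI_Comm_size(MPI_COMM_WORLD, &ntask);
  MPI_Comm_rank(MPI_COMM_WORLD, &thistask);

  int nactive      = 2 + 3 * thistask; /* pretend ranks have different amounts of work */
  int nextparticle = 0;
  int chunk        = 4; /* how much each rank can process per round (buffer-limited) */
  int ndone, iter  = 0;

  do
    {
      iter++;
      for(int k = 0; k < chunk && nextparticle < nactive; k++)
        nextparticle++; /* stand-in for one primary_loop() pass */

      int ndone_flag = (nextparticle >= nactive) ? 1 : 0;

      /* everyone must take part in this collective, even ranks that are already done */
      MPI_Allreduce(&ndone_flag, &ndone, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
    }
  while(ndone < ntask);

  printf("rank %d finished after %d rounds\n", thistask, iter);
  MPI_Finalize();
  return 0;
}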
/* Related declarations referenced in this listing: */

virtual void particle2in(T_in *in, int i) = 0;
virtual void out2particle(T_out *out, int i, int mode) = 0;
virtual int evaluate(int target, int mode, int thread_id, int action, T_in *in, int numnodes, node_info *firstnode, T_out &out) = 0;
generic_comm(T_domain *dptr, T_tree *tptr, T_partset *pptr);
int execute(int nactive, int *targetlist, int action);
int execute(int nactive, int *targetlist, int action, enum logs::timers a, enum logs::timers b, enum logs::timers c);

void timer_start(enum timers counter);
void timer_stop(enum timers counter);
void dump_memory_table(void); /* dump the buffer where the memory information is stored to the standard output */

#define MODE_LOCAL_PARTICLES
#define MODE_IMPORTED_PARTICLES
#define MODE_LOCAL_NO_EXPORT

int myMPI_Alltoall(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype,
                   MPI_Comm comm);
int myMPI_Sendrecv(void *sendbuf, size_t sendcount, MPI_Datatype sendtype, int dest, int sendtag, void *recvbuf, size_t recvcount,
                   MPI_Datatype recvtype, int source, int recvtag, MPI_Comm comm, MPI_Status *status);