GADGET-4
test_io_bandwidth.cc
Go to the documentation of this file.
1/*******************************************************************************
2 * \copyright This file is part of the GADGET4 N-body/SPH code developed
3 * \copyright by Volker Springel. Copyright (C) 2014-2020 by Volker Springel
4 * \copyright (vspringel@mpa-garching.mpg.de) and all contributing authors.
5 *******************************************************************************/
6
12#include "gadgetconfig.h"
13
14#include <math.h>
15#include <mpi.h>
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19#include <sys/stat.h>
20#include <unistd.h>
21
22#include "../data/allvars.h"
23#include "../data/dtypes.h"
24#include "../data/mymalloc.h"
25#include "../domain/domain.h"
26#include "../io/io.h"
27#include "../io/test_io_bandwidth.h"
28#include "../lightcone/lightcone.h"
29#include "../logs/timer.h"
30#include "../main/simulation.h"
31#include "../mpi_utils/mpi_utils.h"
32#include "../ngbtree/ngbtree.h"
33#include "../system/system.h"
34#include "../time_integration/timestep.h"
35
37{
38 /* create directory for test data */
39 if(ThisTask == 0)
40 {
41 char buf[MAXLEN_PATH_EXTRA];
42 sprintf(buf, "%s/testdata", All.OutputDir);
43 mkdir(buf, 02755);
44 }
45 MPI_Barrier(Communicator);
46
48
50 {
51 write_test_data();
52
54 }
55
56 mpi_printf("\n\nTEST: Completed.\n");
57
58 fflush(stdout);
59}
60
61void test_io_bandwidth::write_test_data(void)
62{
63 double t0 = Logs.second();
65
66 mpi_printf("TEST: Writing test data...\n");
67
68 /* now work the I/O of the files, controlled by scheduler to achieve optimum I/O bandwidth under the constraint of a maximum number
69 * for the concurrent file access */
70 work_files(MODUS_WRITE);
71
72 long long byte_count = get_io_byte_count(), byte_count_all;
73 sumup_longs(1, &byte_count, &byte_count_all, Communicator);
74
75 double t1 = Logs.second();
76
78 "TEST: done. MaxFilesWithConcurrentIO=%6d load/save took %g sec, total size %g MB, corresponds to effective I/O rate of %g "
79 "MB/sec\n",
80 All.MaxFilesWithConcurrentIO, Logs.timediff(t0, t1), byte_count_all / (1024.0 * 1024.0),
81 byte_count_all / (1024.0 * 1024.0) / Logs.timediff(t0, t1));
82
83 /* now delete test data */
84 char buf[MAXLEN_PATH_EXTRA];
85 sprintf(buf, "%s/testdata/%s.%d", All.OutputDir, "testdata", ThisTask);
86 unlink(buf);
87 MPI_Barrier(Communicator);
88}
89
90void test_io_bandwidth::polling(int modus)
91{
92 if(ThisTask == 0)
93 if(files_completed < NTask)
94 {
95 MPI_Status status;
96 int flag;
97
98 /* now check for a completion message */
99 MPI_Iprobe(MPI_ANY_SOURCE, TAG_KEY, Communicator, &flag, &status);
100
101 if(flag)
102 {
103 int source = status.MPI_SOURCE;
104
105 int dummy;
106 MPI_Recv(&dummy, 1, MPI_INT, source, TAG_KEY, Communicator, MPI_STATUS_IGNORE);
107 files_completed++;
108
109 if(files_started < NTask)
110 {
111 /* send start signal */
112 MPI_Ssend(&ThisTask, 1, MPI_INT, seq[files_started++].thistask, TAG_N, Communicator);
113 }
114 }
115 }
116}
117
118void test_io_bandwidth::work_files(int modus)
119{
120 if(ThisTask == 0)
121 if(!(seq = (seq_data *)malloc(NTask * sizeof(seq_data))))
122 Terminate("can't allocate seq_data");
123
124 seq_data seq_loc;
125 seq_loc.thistask = ThisTask;
126 seq_loc.rankinnode = RankInThisNode;
127 seq_loc.thisnode = ThisNode;
128
129 MPI_Gather(&seq_loc, sizeof(seq_data), MPI_BYTE, seq, sizeof(seq_data), MPI_BYTE, 0, Communicator);
130
131 if(ThisTask == 0)
132 {
133 std::sort(seq, seq + NTask);
134
135 files_started = 0;
136 files_completed = 0;
137
138 for(int i = 1; i < All.MaxFilesWithConcurrentIO; i++)
139 {
140 files_started++;
141 MPI_Ssend(&ThisTask, 1, MPI_INT, seq[i].thistask, TAG_N, Communicator);
142 }
143
144 files_started++;
145 contents_restart_file(modus);
146 files_completed++;
147
148 if(files_started < NTask)
149 {
150 /* send start signal */
151 MPI_Ssend(&ThisTask, 1, MPI_INT, seq[files_started++].thistask, TAG_N, Communicator);
152 }
153
154 while(files_completed < NTask)
155 polling(modus);
156
157 free(seq);
158 }
159 else
160 {
161 /* wait for start signal */
162 int dummy;
163 MPI_Recv(&dummy, 1, MPI_INT, 0, TAG_N, Communicator, MPI_STATUS_IGNORE); /* wait until we are told to start */
164
165 contents_restart_file(modus);
166
167 /* send back completion notice */
168 MPI_Ssend(&ThisTask, 1, MPI_INT, 0, TAG_KEY, Communicator);
169 }
170}
171
172void test_io_bandwidth::contents_restart_file(int modus)
173{
174 char buf[MAXLEN_PATH_EXTRA];
175 sprintf(buf, "%s/testdata/%s.%d", All.OutputDir, "testdata", ThisTask);
176
177 if(modus == MODUS_READ)
178 {
179 if(!(fd = fopen(buf, "r")))
180 {
181 Terminate("TEST: File '%s' not found.\n", buf);
182 }
183 }
184 else if(modus == MODUS_WRITE)
185 {
186 if(!(fd = fopen(buf, "w")))
187 {
188 Terminate("TEST: File '%s' cannot be opened.\n", buf);
189 }
190 }
191 else
192 Terminate("unknown modus\n");
193
194 size_t len = BUF_IN_MB * 1024LL * 1024LL;
195
196 char *p = (char *)Mem.mymalloc("p", len);
197
198 byten(p, len, modus);
199
200 Mem.myfree(p);
201
202 fclose(fd);
203}
204
205void test_io_bandwidth::byten(void *x, size_t n, int modus)
206{
207 char *p = (char *)x;
208
209 while(n > BLKSIZE)
210 {
211 byten_doit(p, BLKSIZE, modus);
212 p += BLKSIZE;
213 n -= BLKSIZE;
214 polling(modus);
215 }
216
217 if(n > 0)
218 byten_doit(p, n, modus);
219}
220
223void test_io_bandwidth::byten_doit(void *x, size_t n, int modus)
224{
225 if(modus == MODUS_READ)
226 my_fread(x, n, 1, fd);
227 else
228 my_fwrite(x, n, 1, fd);
229}
global_data_all_processes All
Definition: main.cc:40
long long byte_count
size_t my_fread(void *ptr, size_t size, size_t nmemb, FILE *stream)
A wrapper for the fread() function.
void reset_io_byte_count(void)
long long get_io_byte_count(void)
size_t my_fwrite(const void *ptr, size_t size, size_t nmemb, FILE *stream)
A wrapper for the fwrite() function.
double timediff(double t0, double t1)
Definition: logs.cc:488
double second(void)
Definition: logs.cc:471
int ThisNode
Definition: setcomm.h:36
void mpi_printf(const char *fmt,...)
Definition: setcomm.h:55
int ThisTask
Definition: setcomm.h:33
int NTask
Definition: setcomm.h:32
int RankInThisNode
Definition: setcomm.h:39
MPI_Comm Communicator
Definition: setcomm.h:31
void measure_io_bandwidth(void)
#define MAXLEN_PATH_EXTRA
Definition: constants.h:301
logs Logs
Definition: main.cc:43
#define Terminate(...)
Definition: macros.h:15
#define TAG_N
Definition: mpi_utils.h:25
#define TAG_KEY
Definition: mpi_utils.h:29
void sumup_longs(int n, long long *src, long long *res, MPI_Comm comm)
memory Mem
Definition: main.cc:44
#define MODUS_READ
Definition: restart.h:16
#define MODUS_WRITE
Definition: restart.h:15
#define BLKSIZE
Definition: restart.h:18
char OutputDir[MAXLEN_PATH]
Definition: allvars.h:272
#define BUF_IN_MB