Code Structure and Parallelization

There are various routines with varying dependencies and access patterns between elements of the data structures. To minimize the inter-process communication required by these routines, we partition the data such that the number of stars held by each processor is a multiple of MIN_CHUNK_SIZE (an input parameter that defaults to 20; see Pattabiraman et al. (2013) for a justification of this choice and a detailed explanation). The remainder of stars left over after this division (which is less than MIN_CHUNK_SIZE) goes to the last processor.
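
As an illustration, the partitioning rule can be sketched as follows. This is only a schematic reading of the scheme described above (the actual implementation is in mpiFindIndicesCustom(), discussed later in this guide), and the function and variable names here are hypothetical.

#define MIN_CHUNK_SIZE 20

/* Hypothetical sketch: split N stars over procs processors so that every
   processor holds a multiple of MIN_CHUNK_SIZE stars, with the final
   remainder (< MIN_CHUNK_SIZE) absorbed by the last processor. */
void sketch_partition(long N, int procs, long *count)
{
    long chunks = N / MIN_CHUNK_SIZE;            /* whole chunks of MIN_CHUNK_SIZE stars */
    for (int i = 0; i < procs; i++) {
        count[i] = (chunks / procs) * MIN_CHUNK_SIZE;
        if (i < chunks % procs)
            count[i] += MIN_CHUNK_SIZE;          /* spread the leftover whole chunks */
    }
    count[procs - 1] += N % MIN_CHUNK_SIZE;      /* remainder goes to the last processor */
}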

Most routines do computations on a star-by-star basis and access only the properties of the star being processed. However, the gravitational potential, the masses of the stars, and their radial positions are accessed in a complex, data-dependent manner throughout the code. Hence, we store these values in separate arrays and duplicate/copy them across all processors. Since these properties change during a timestep, we synchronize these arrays at the end of each timestep.
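
A minimal sketch of such a synchronization step is shown below, assuming the duplicated quantity lives in a globally indexed array and that the per-processor slice lengths and offsets are available. The names r_global, mpiLen, and mpiDisp are used only for illustration (mpiLen and mpiDisp are described later in this guide), and the exact types in the code may differ.

#include <mpi.h>

/* Illustrative sketch only: re-synchronize a duplicated per-star array after
   each processor has updated its own slice in place. Every processor
   contributes its slice [mpiDisp[rank], mpiDisp[rank]+mpiLen[rank]) and
   receives everyone else's. */
void sync_duplicated_array(double *r_global, int *mpiLen, int *mpiDisp)
{
    MPI_Allgatherv(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
                   r_global, mpiLen, mpiDisp, MPI_DOUBLE, MPI_COMM_WORLD);
}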

As mentioned above, most routines perform computations on a star-by-star basis. The only routine that departs from this pattern is the sorting routine, which sorts all the stars by their radial positions. For this we use a parallel sorting algorithm called Sample Sort.

The parallel sorting routine sorts the stars and automatically redistributes the data across the processors; however, there is no control over the number of stars that end up on each processor. So, after the sort, we exchange data between processors to adhere to the data partitioning scheme described above. A sketch of the Sample Sort algorithm itself is given below.
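
For readers unfamiliar with Sample Sort, the following is a minimal, self-contained sketch of the algorithm on plain doubles (e.g. radial positions), assuming every processor starts with at least one value. The actual routine in the code sorts full star structures and is followed by the redistribution step just described, so this is only an illustration of the idea, not the code's implementation.

#include <stdlib.h>
#include <mpi.h>

static int cmp_double(const void *a, const void *b) {
    double x = *(const double *)a, y = *(const double *)b;
    return (x > y) - (x < y);
}

/* Sorts the n local values in data globally across the communicator; returns
   a newly allocated local array (caller frees) and writes its length to new_n. */
double *sample_sort(double *data, int n, int *new_n, MPI_Comm comm)
{
    int p, rank, i, j;
    MPI_Comm_size(comm, &p);
    MPI_Comm_rank(comm, &rank);

    /* 1. Sort locally and pick p-1 evenly spaced samples. */
    qsort(data, n, sizeof(double), cmp_double);
    double *samples = malloc((p - 1) * sizeof(double));
    for (i = 1; i < p; i++)
        samples[i - 1] = data[(long) i * n / p];

    /* 2. Gather all samples on every processor and choose p-1 global splitters. */
    double *all = malloc((size_t) p * (p - 1) * sizeof(double));
    MPI_Allgather(samples, p - 1, MPI_DOUBLE, all, p - 1, MPI_DOUBLE, comm);
    qsort(all, (size_t) p * (p - 1), sizeof(double), cmp_double);
    double *split = malloc((p - 1) * sizeof(double));
    for (i = 1; i < p; i++)
        split[i - 1] = all[i * (p - 1) - 1];

    /* 3. Count how many local values fall into each processor's bucket. */
    int *scnt = calloc(p, sizeof(int));
    for (i = 0, j = 0; i < n; i++) {
        while (j < p - 1 && data[i] > split[j]) j++;
        scnt[j]++;
    }

    /* 4. Exchange the counts, then the values themselves. */
    int *rcnt = malloc(p * sizeof(int));
    MPI_Alltoall(scnt, 1, MPI_INT, rcnt, 1, MPI_INT, comm);
    int *sdsp = malloc(p * sizeof(int)), *rdsp = malloc(p * sizeof(int));
    sdsp[0] = rdsp[0] = 0;
    for (i = 1; i < p; i++) {
        sdsp[i] = sdsp[i - 1] + scnt[i - 1];
        rdsp[i] = rdsp[i - 1] + rcnt[i - 1];
    }
    *new_n = rdsp[p - 1] + rcnt[p - 1];
    double *out = malloc((*new_n > 0 ? *new_n : 1) * sizeof(double));
    MPI_Alltoallv(data, scnt, sdsp, MPI_DOUBLE, out, rcnt, rdsp, MPI_DOUBLE, comm);

    /* 5. A final local sort of the received values completes the global sort. */
    qsort(out, *new_n, sizeof(double), cmp_double);

    free(samples); free(all); free(split);
    free(scnt); free(rcnt); free(sdsp); free(rdsp);
    return out;
}

Notice that after the all-to-all exchange the number of values per processor is determined by the splitters, which is exactly why the code follows the sort with the redistribution step described above.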

Programming Guidelines for Developers

It is essential to read Pattabiraman et al. (2013) before proceeding. The paper covers most of the code and parallelization aspects, and in itself serves as a guide for development. Here, we cover some lower-level coding details to aid a developer who wants to modify the code. An introduction to MPI is also very useful, and in fact necessary, for any non-trivial code development.

The parallelization follows an SPMD (single program, multiple data) model: every line of code is executed by all processors. Unlike in a sequential code, variables might have different values on different processors. A simple example is the variable myid, which stores the ID of the processor. If p processors are used, the IDs go from 0 to p-1, and each processor stores its own ID in this variable. On the other hand, the variable procs stores the total number of processors used. One can see that procs will have the same value on all processors, whereas myid will have a different value depending on the processor.
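
A minimal stand-alone illustration of this model is shown below. In the actual code myid and procs are global variables set during initialization, so the calls here are only meant to show where such values come from in MPI.

#include <stdio.h>
#include <mpi.h>

int main(int argc, char *argv[])
{
    int myid, procs;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);   /* 0 .. procs-1, different on every processor */
    MPI_Comm_size(MPI_COMM_WORLD, &procs);  /* same value on every processor */
    printf("I am processor %d of %d\n", myid, procs);
    MPI_Finalize();
    return 0;
}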

The data/stars are initially partitioned, after being read from the input file, across the user-specified number of processors. The partitioning scheme is described in detail in the paper, and is chosen so as to minimize the communication required during program execution. Binaries are partitioned among processors as well, and are indexed similarly to the serial code, i.e., the variable binind of a star, if greater than 0, refers to the index of the corresponding binary in the binary array. A sketch of this access pattern is shown below.
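
For illustration, a loop over the local stars that looks up binary properties through binind might look like the following fragment. The array names star and binary and the fields a and e follow the usage shown in the I/O example later in this guide, but treat the exact names as assumptions.

/* Sketch: binind == 0 means the star is single; otherwise it indexes the
   binary array (names assumed for illustration). */
for (i = 1; i <= clus.N_MAX_NEW; i++) {
    if (star[i].binind > 0) {
        double a = binary[star[i].binind].a;   /* semi-major axis of this star's binary */
        double e = binary[star[i].binind].e;   /* eccentricity */
        /* ... per-binary computation using a and e ... */
    }
}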

The variables mpiBegin and mpiEnd, and the arrays Start and End, are used to implement and store the data partitioning scheme. They are used in almost every routine of the code. Let us try to understand them with an example. Say we start with an initial set of 10000 stars and 4 processors. The data is partitioned, and each processor holds 2500 stars, stored in its star array. Note that in each timestep all the stars are sorted by their radial position, so the positions of all stars in processor 0 are smaller than those in processor 1, which in turn are smaller than those in processor 2, and so on.

Now, each local array is indexed from 1 to 2500. However, if you imagine an array containing the entire set of 10000 stars, the stars local to a processor would have different indices in this imaginary global array, depending on the processor ID. For instance, the local stars 1 to 2500 in processor 1 will have “global” indices 2501 to 5000. This “global” index is required at various points in the code, since some routines need certain quantities for the entire set of stars. These quantities (mass, positions and potential) are duplicated across all processors; please see Pattabiraman et al. (2013) for a detailed description.

The variables mpiBegin and mpiEnd store the global index of the first and the last star in a given processor. For our example, in processor 0 these will be 1 and 2500 (the 0th star is always left out as it is marked as a sentinel); in processor 1 these will be 2501 and 5000, and so on. The Start and End arrays store these values for all processors: Start[myid] is essentially the same as mpiBegin, and End[myid] has the same value as mpiEnd. In addition, there are the mpiDisp and mpiLen arrays, which store essentially the same information as displacement offsets and lengths, but these are used in only a very few places.

The total number of stars in a given processor can be obtained as mpiEnd-mpiBegin+1. clus.N_MAX represents the current total number of stars in the simulation, whereas clus.N_MAX_NEW is the total number of local stars in the processor, i.e. mpiEnd-mpiBegin+1. The function mpiFindIndicesCustom() sets mpiBegin and mpiEnd, whereas findLimits() populates the Start and End arrays. When iterating over the local stars, the global index of a local star i can be obtained with the function get_global_idx(i), as in the sketch below.
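
For example, a routine that loops over the local stars but also needs a duplicated, globally indexed quantity might look like the following sketch; the name star_phi for the duplicated potential array is an assumption used only for illustration.

/* Sketch: iterate over the local stars, mapping each local index to its
   index in the imaginary global (sorted) array. */
for (i = 1; i <= clus.N_MAX_NEW; i++) {
    long g = get_global_idx(i);   /* global index of local star i */
    double phi = star_phi[g];     /* duplicated potential, indexed globally (name assumed) */
    /* ... per-star computation using star[i] and phi ... */
}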

The random number generation is parallelized, and all details are hidden, so a developer does not have to deal with them as long as new rng calls follow the same pattern as the existing ones.

Example

While introducing any new code, think in parallel. For instance, say you are calculating some cumulative quantity like:

double Q = 0.0;
for(i=1; i<=clus.N_MAX_NEW; i++) {
    Q += star[i].a + star[i].b;
}

This might appear to be correct, but when it runs in parallel it is far from it: Q will only contain the local sum on each processor. After this, one needs to aggregate Q across all processors. For this you'll have to use either MPI_Reduce or MPI_Allreduce, depending on whether you want the aggregated quantity Q on one or on all of the processors. The above piece of code can be fixed as follows:

double Q, local_Q = 0.0;

for(i=1; i<=clus.N_MAX_NEW; i++) {
    local_Q += star[i].a + star[i].b;
}

MPI_Allreduce(&local_Q, &Q, 1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD);

Here we accumulate the local sum in a temporary variable local_Q, after which we aggregate the partial sums across all processors using the MPI_Allreduce call, which ensures that all processors receive the aggregated value in Q. Here, we use MPI_COMM_WORLD, which is the default communicator that includes all processors.

This is probably the simplest use of parallel programming with MPI. More complex operations involving complicated data dependencies, such as communicating neighbors/ghost particles, gathering and scattering data, etc., require more expertise in parallel programming with MPI, so a tutorial on MPI is strongly advised before adding any non-trivial piece of code. The most common MPI calls used in the code are MPI_Reduce/MPI_Allreduce, MPI_Gather/MPI_Allgather, MPI_Alltoall, MPI_Bcast, and MPI_Scan/MPI_Exscan. The signatures of all MPI calls can be found at http://www.mpich.org/static/docs/v3.1/www3/
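
As one more small example of these calls, here is a hypothetical use of MPI_Exscan to turn local star counts into a global starting index. This is only a sketch of the call, not how the code actually computes mpiBegin (that is done by mpiFindIndicesCustom()).

#include <mpi.h>

/* Hypothetical sketch: an exclusive prefix sum of the local star counts gives
   each processor the number of stars held by all lower-ranked processors. */
long global_start_from_count(long local_n)
{
    int myid;
    long offset = 0;
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Exscan(&local_n, &offset, 1, MPI_LONG, MPI_SUM, MPI_COMM_WORLD);
    if (myid == 0)
        offset = 0;     /* MPI_Exscan leaves the result undefined on rank 0 */
    return offset + 1;  /* +1 because star indices start at 1 */
}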

I/O

Output is another part one might need to add, and it is non-trivial to do in parallel. However, we have introduced a framework for parallel output so that the programmer doesn't have to delve into the MPI-IO details. If a file needs to be written by only one node, it's simple. Just do:

if(myid == <NODE_ID>) {
   FILE *fp;
   fp = fopen(...);
   fprintf(fp, ...);
   fclose(fp);
}

Sometimes the data that needs to be written out is distributed across nodes. Provided these are ASCII data files of a few KB to MB in size (such as log files, and not snapshots, which write out nearly the entire data set), one can use MPI-IO to write them out in parallel. An example is as follows:

/* MPI-IO file handle and per-processor buffers used by parafprintf();
   the names follow the mpi_<fptr_name>* convention described below. */
MPI_File mpi_binfp;
char mpi_binfp_buf[10000], mpi_binfp_wrbuf[10000000];
long long mpi_binfp_len=0, mpi_binfp_ofst_total=0;
sprintf(filename, "a_e2.%04ld.dat", tcount);

/* All processors open the file collectively and truncate it. */
MPI_File_open(MPI_COMM_MC, filename, MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &mpi_binfp);
MPI_File_set_size(mpi_binfp, 0);

/* Write the data via parafprintf(), which buffers it per processor
   instead of writing to the file directly. */
for (j=1; j<=clus.N_MAX; j++) {
   if (star_array[j].binind) {
      parafprintf(binfp, "%g %g\n", binary_array[star_array[j].binind].a, sqr(binary_array[star_array[j].binind].e));
   }
}

/* Flush the buffered data from all processors into the file in parallel. */
mpi_para_file_write(mpi_binfp_wrbuf, &mpi_binfp_len, &mpi_binfp_ofst_total, &mpi_binfp);
MPI_File_close(&mpi_binfp);

The files are opened by all processors using MPI-IO. Each processor writes its data into a string/char buffer, and at the end of the timestep all processors flush the data from their buffers into the corresponding files in parallel using MPI-IO. The code uses 5 variables for this process: the MPI-IO file pointer, which follows the format mpi_<fptr_name>; 2 char buffers, which have the formats mpi_<fptr_name>_buf and mpi_<fptr_name>_wrbuf; and int/long long variables that maintain the length of the buffer (format mpi_<fptr_name>_len) and the offset in the file (format mpi_<fptr_name>_ofst_total) where the data has to be written. Given all these variables, a call to the function parafprintf(<fptr_name>, <args>) follows a pattern similar to fprintf, but underneath writes the given args to the respective buffers and updates the offsets, etc. Once all writing has been done, each processor has different data in its respective buffer which needs to be flushed out into the file; a call to mpi_para_file_write() takes care of this.