A Distributed Implementation

MPI

MPI (Message Passing Interface) is a standard API for implementing distributed programs. Its main advantage is that programs are compatible at the source level regardless of which MPI implementation is being used.

For the rest of this discussion, we will assume the availability of LAM, an MPI implementation from Notre Dame. Here are some reasons for preferring MPI to an alternative like PVM.

Master/Slave Model

A very common model used for distributed programming is the master/slave model. In this model, there is one process called the master, which creates work and distributes it to the slaves. The slaves respond to the master with their completed work, and ask for more if it is available. This conceptually simple model works very well for problems which do not require a lot of synchronization, and whose slaves can be completely autonomous. These types of problems are often referred to as embarrassingly parallel.
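To make "embarrassingly parallel" concrete for our problem, here is a minimal sketch showing that a prime count over a range is just the sum of the counts over disjoint sub-ranges, so each sub-range can be handed to a different slave. The names is_prime() and count_primes() are hypothetical, and the naive trial division is only a stand-in for illustration, not the test used by the threaded implementation.

```cpp
#include <cassert>

// Naive trial division; a stand-in for illustration only, not the
// prime test used by the threaded implementation.
bool is_prime(long n) {
  if (n < 2) return false;
  for (long d = 2; d * d <= n; ++d)
    if (n % d == 0) return false;
  return true;
}

// Counts the primes in the inclusive range [lo, hi].
long count_primes(long lo, long hi) {
  assert(lo <= hi);
  long count = 0;
  for (long n = lo; n <= hi; ++n)
    if (is_prime(n)) ++count;
  return count;
}
```

Both count_primes(0, 100) and count_primes(0, 50) + count_primes(51, 100) give 25, and the two halves never need to exchange any information while they are being computed.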

Implementation

In order to build on our threaded implementation, we need to decide how to reformulate our implementation in terms of a master/slave model and add the required calls to MPI in order to distribute our problem and collect the results. Here are the changes to main():

int main (int argc, char *argv[]){
  int num_threads(2);
  int my_rank;
  int nprocs;

  // initialize MPI
  MPI_Init(&argc,&argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);

  // were we called properly?
  if (argc<3)
    usage(argv[0]);
  if (argc==4)
    num_threads=atol(argv[3]);
  long start(atol(argv[1]));
  long stop(atol(argv[2]));

  if (my_rank == 0)
    master(start,stop,nprocs);
  else
    slave(num_threads);

  MPI_Finalize();

  return EXIT_SUCCESS;
}

We need to call MPI_Init() at the beginning of our distributed program in order to connect to the multicomputer. The next two function calls establish our rank and the total number of processes which will be involved in the computation.

MPI will start the same program on every computer in the multicomputer. This is why we need to establish at run time what our rank is so that we can decide if we are a master or a slave. Depending on our rank, we either call master(), or slave().

After we have finished our computations, we must call MPI_Finalize() to release our connection to the multicomputer.

The Slaves

Our slave() function takes only one argument, namely the number of threads to use. This allows us to fully utilize the processing power of SMP machines in a cluster.

The purpose of the slave is to sit and wait for work, perform the work, and then return the results. It will continue to do this until it receives a signal that there is no more work to do, at which point it will return:

void slave(int num_threads){
  long result;
  long bounds[2];
  MPI_Status status;

  while(true){
    MPI_Recv(bounds, 2, MPI_LONG, 0, MPI_ANY_TAG,
             MPI_COMM_WORLD, &status);
    if (status.MPI_TAG == KILL)
      return;

    try {
      long start(bounds[0]);
      long stop(bounds[1]);

      ... same code as in threaded version's main() ...

      MPI_Send(&count,1,MPI_LONG, 0, 0, MPI_COMM_WORLD);
    }
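    catch (const range_error &e){
      cout << "Exception: " << e.what() << endl;
      // still answer the master so it does not wait forever for this unit
      long none(0);
      MPI_Send(&none,1,MPI_LONG, 0, 0, MPI_COMM_WORLD);
    }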
  }
}

The bulk of the code in the slave() function is similar to main() in our threaded example. The only difference is how the slave gets the bounds it is supposed to count the primes in, and how it returns those results.

The slave goes into an endless loop waiting for work from the master which it gets via MPI_Recv(). This function gets two longs which are sent by the master and stores them in the bounds array. After receiving from the master, the slave checks the status of the message to see if the work is done (the KILL message), and if so returns. Otherwise, we rename the variables so that we can use exactly the same code as in the threaded version. The only remaining step is to send our results back to the master via MPI_Send(). Here we send back one long which contains the count found by this slave.
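The listings use two message tags, WORK and KILL, whose definitions are not shown in this section. A minimal sketch, assuming they are plain integer constants: the values below are placeholders rather than the ones from the original program, and should_stop() is a hypothetical helper that mirrors the slave's check on status.MPI_TAG. MPI itself only requires tags to be non-negative integers.

```cpp
#include <cassert>

// Hypothetical tag values; the original program's definitions are not
// shown in the text. Any two distinct non-negative integers would do.
enum MessageTag {
  WORK = 1,  // message carries two longs: the bounds of a work unit
  KILL = 2   // empty message: no more work, the slave should return
};

// Hypothetical helper mirroring the slave's test on status.MPI_TAG.
inline bool should_stop(int tag) {
  assert(tag >= 0);  // MPI tags are never negative
  return tag == KILL;
}
```

Because the slave receives with MPI_ANY_TAG, a single MPI_Recv() can accept either kind of message, and the tag alone decides whether the bounds array holds meaningful data.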

The Master

The job of the master is slightly more complicated as it must decide how to break up the work to be sent out to the slaves, and how to collect the results. The first part of the master sends the initial work units out to the slaves, and waits for results to come back in. When the master receives a result, it sends another work unit out to the same process if there is still work to be done. After there is no more work to be sent out, each process is polled once more for any remaining results, and then each slave is told to quit. Here is the listing:

void master(long start, long stop, long nprocs){
  long work[2];
  long result;
  long total(0);
  long current(start);  // begin at the requested lower bound
  int outstanding(0);   // work units sent but not yet answered
  MPI_Status status;

  // dish out some work
  for (int rank=1; rank<nprocs; ++rank){
    // set the bounds for this work
    if (make_work(work,&current,stop)){
      MPI_Send(work,2,MPI_LONG,rank,WORK,MPI_COMM_WORLD);
      ++outstanding;
    }
  }
  // keep sending work out while there is some
  while(make_work(work,&current,stop)){
    MPI_Recv(&result,1,MPI_LONG,MPI_ANY_SOURCE,MPI_ANY_TAG,MPI_COMM_WORLD,&status);
    total+=result;
    MPI_Send(work,2,MPI_LONG,status.MPI_SOURCE,WORK,MPI_COMM_WORLD);
  }
  // receive any outstanding results (only from slaves that were given work)
  for (; outstanding>0; --outstanding){
    MPI_Recv(&result,1,MPI_LONG,MPI_ANY_SOURCE,MPI_ANY_TAG,MPI_COMM_WORLD,&status);
    total+=result;
  }
  // tell all slaves to exit
  for (int rank=1; rank<nprocs; ++rank){
    MPI_Send(0,0,MPI_INT,rank,KILL,MPI_COMM_WORLD);
  }
  cout << "There were " << total << " primes." << endl;
}
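The three phases of master() can be traced without MPI by replacing the messages with a queue. The sketch below is only a model: trace_master() is a hypothetical name, the work is reduced to a count of units, and we assume that slaves answer in the order in which they were handed work, which a real cluster does not guarantee.

```cpp
#include <cassert>
#include <queue>
#include <vector>

// Trace of the master's three phases with the MPI calls replaced by a FIFO.
// Assumption (not from the original): slaves answer in the order in which
// they were handed work, so the queue front is always the next result.
// Returns how many work units each slave (ranks 1..nslaves) processed;
// index 0 stands for the master and stays at zero.
std::vector<long> trace_master(long units, int nslaves) {
  assert(units >= 0 && nslaves >= 1);
  std::vector<long> handled(nslaves + 1, 0);
  std::queue<int> in_flight;  // ranks that still owe the master a result
  long remaining = units;

  // Phase 1: one unit to each slave, as long as units exist.
  for (int rank = 1; rank <= nslaves && remaining > 0; ++rank) {
    in_flight.push(rank);
    --remaining;
  }
  // Phase 2: every result that comes in is answered with a new unit.
  while (remaining > 0) {
    int rank = in_flight.front();
    in_flight.pop();
    ++handled[rank];
    in_flight.push(rank);
    --remaining;
  }
  // Phase 3: collect the results that are still outstanding.
  while (!in_flight.empty()) {
    ++handled[in_flight.front()];
    in_flight.pop();
  }
  return handled;
}
```

With 7 units and 3 slaves the trace gives 3, 2 and 2 units to ranks 1, 2 and 3. With 2 units and 3 slaves, rank 3 is never handed any work, so only two results are outstanding at the end; the final collection phase must therefore wait only for slaves that were actually sent a unit.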

The make_work() function is responsible for deciding when the work is done, and how to break it up. We have chosen a simple sequential model where the size of the chunks is determined by STEP_SIZE:

bool make_work(long *work, long *current, long stop){
  if (*current>=stop)
    return false; // no work left

  work[0]=*current;
  work[1]=*current+STEP_SIZE;
  *current=*current+STEP_SIZE+1;

  if (work[1]>stop)
    work[1]=stop; // take care of overstepping last boundary
  return true;
}

The STEP_SIZE variable is key to controlling the load balancing between the machines. If it is too big, some machines may sit idle while a few machines work through the numbers at the higher end of the range. If it is too small, communication overhead dominates. A good value is generally easiest to find through experimentation. These details are further explored in the Performance section.
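To get a feel for the communication side of this trade-off, the following sketch reuses the arithmetic of make_work() with the chunk size passed in as a parameter and counts how many work units a given range produces. The names make_work_with_step() and count_work_units() are hypothetical, and the chunk sizes tried below are arbitrary examples, not the value used by the original program.

```cpp
#include <cassert>

// Same arithmetic as make_work(), with the chunk size passed in as a
// parameter instead of being read from STEP_SIZE.
bool make_work_with_step(long *work, long *current, long stop, long step) {
  if (*current >= stop)
    return false;  // no work left
  work[0] = *current;
  work[1] = *current + step;
  *current = *current + step + 1;
  if (work[1] > stop)
    work[1] = stop;  // take care of overstepping the last boundary
  return true;
}

// Hypothetical helper: how many work units the master would hand out
// between start and stop for a given chunk size.
long count_work_units(long start, long stop, long step) {
  assert(step >= 0);
  long work[2];
  long current = start;
  long units = 0;
  while (make_work_with_step(work, &current, stop, step))
    ++units;
  return units;
}
```

For the range 0 to 10000000, a chunk size of 100000 yields 100 work units, while a chunk size of 1000 yields 9991. Each unit costs the master one MPI_Send() and one MPI_Recv(), so the smaller chunk size means roughly a hundred times as many messages for the same amount of computation.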

The complete program can be found at ftp://ftp.cim.mcgill.ca/pub/people/ericb/primes.tar.bz2.

Compiling and Running the Distributed Version

MPI programs are compiled with mpicc or mpiCC, depending on whether you are compiling C or C++ code respectively. To run the distributed program, you must first boot the multicomputer via lamboot, and then you can run your program using the mpirun command. When you are finished with an MPI session, you can shut down the multicomputer with wipe:

bash$ mpiCC -O -o primes_mpi primes_mpi.cpp -lpthread
bash$ lamboot
 
LAM 6.3.2/MPI 2 C++/ROMIO - University of Notre Dame
 
bash$ mpirun -O -np 16 primes_mpi -- 0 10000000
There were 664579 primes.
bash$ wipe

If you are having difficulty getting lamboot to run successfully, you can use the recon command to find out what is causing the trouble.

Note

If recon fails, it is possible that you are not able to run commands on remote machines without typing a password. If you are using ssh, make sure you have set LAMRSH to reflect that:

bash$ export LAMRSH=`which ssh`

The arguments to mpicc are essentially the same as what you would normally pass directly to your compiler. One exception is the -O to both mpicc and mpirun which specifies that the multicomputer is homogeneous, and that endianness translations need not be performed. The -np argument to mpirun specifies the number of processes to start (usually the number of nodes in the multicomputer). All arguments after the double minus (--) are passed as arguments to the main program being run.