Multi-threaded Implementation

Background

To speed up our example on symmetric multi-processing (SMP) machines, we need to use threads. We would like to keep the design from the previous example, which means finding a way for each object to have its own thread of execution. Unfortunately, mixing C++ and pthreads is non-trivial: pthread_create() expects a pointer to a function with C linkage, and it cannot be handed a non-static member function, because such a function needs an object to be called on. We have solved this problem by creating an accessory class with a static member function:

class Threaded {
public:
  Threaded() {};
  virtual ~Threaded() {};
  static void *entry (void *ptr);
  virtual void *run ()=0;
};

void *Threaded::entry(void *ptr){
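  // ptr is the Threaded* that was passed to pthread_create();
  // static_cast is the appropriate cast from void* back to an object pointer
  return static_cast<Threaded*>(ptr)->run();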
}

class CountPrimes : public Threaded {
public:
  CountPrimes(long start_, long stop_);
  long total();
  void *run();
private:
  long start;
  long stop;
  long count;
  bool counted;
  bool is_prime (long candidate);
};

void *CountPrimes::run(){
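  // the result is allocated on the heap so that it outlives the thread;
  // whoever calls pthread_join() is responsible for deleting it
  long *return_val=new long(total());
  return static_cast<void*>(return_val);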
}

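The remainder of the CountPrimes object implementation is the same. The points to note here are that entry() is static, so it has no this pointer and can be passed to pthread_create(); that entry() receives the object as a void* and simply calls that object's run(); that run() is pure virtual, so every class derived from Threaded must supply its own; and that CountPrimes::run() returns its result in a long allocated with new, which the joining thread must delete.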
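The pattern can also be tried in isolation. The following is a minimal sketch, not the code from the complete program: SumRange, threaded_entry() and run_and_join() are hypothetical names introduced only for illustration. It also swaps the static member function for a free function declared extern "C". A static member works with the common compilers, but as far as the language rules go, a function with C linkage is the form pthread_create() asks for.

```cpp
#include <cstddef>
#include <pthread.h>

// Base class: each derived object supplies run(), which executes in its own thread.
class Threaded {
public:
  virtual ~Threaded() {}
  virtual void *run() = 0;
};

// Trampoline with C linkage (hypothetical name; the article uses the
// static member Threaded::entry for the same job).
extern "C" void *threaded_entry(void *ptr) {
  return static_cast<Threaded *>(ptr)->run();
}

// Hypothetical worker that sums the integers in [lo, hi].
class SumRange : public Threaded {
public:
  SumRange(long lo_, long hi_) : lo(lo_), hi(hi_) {}
  void *run() {
    long *result = new long(0);
    for (long i = lo; i <= hi; i++)
      *result += i;
    return result;  // ownership passes to whoever joins the thread
  }
private:
  long lo;
  long hi;
};

// Start obj in a new thread, wait for it to finish, and return its result.
// Returns -1 if the thread could not be created.
long run_and_join(Threaded *obj) {
  pthread_t tid;
  if (pthread_create(&tid, NULL, threaded_entry, obj) != 0)
    return -1;
  void *ret = NULL;
  pthread_join(tid, &ret);
  long *p = static_cast<long *>(ret);
  long value = *p;
  delete p;  // the joiner frees what run() allocated
  return value;
}
```

With these definitions, constructing SumRange s(1, 100) and calling run_and_join(&s) runs the sum in a separate thread and hands the result back to the caller. Passing a Threaded* (rather than a pointer to the derived class) keeps the cast in the trampoline an exact inverse of the conversion to void*.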

Using Our New Object

We would like to retain the usage form of our serial implementation, so we add only one extra parameter, which controls how many threads are spawned to complete the task. Because we do not know beforehand how many objects (threads) will be needed, we allocate them dynamically:

int main (int argc, char *argv[]){
  int num_threads(2);
  // were we called properly?
  if (argc<3)
    usage(argv[0]);
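  if (argc==4)
    num_threads=atoi(argv[3]);
  // at least one thread is needed; this also avoids dividing by zero below
  if (num_threads<1)
    num_threads=1;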

  try {
    pthread_t *t_id=new pthread_t[num_threads];
    CountPrimes **counter=new CountPrimes*[num_threads];
    // start up the threads
    long start(atol(argv[1]));
    long stop(atol(argv[2]));
    long incr((stop-start)/num_threads);
    for (int i=0; i<num_threads; i++){
      counter[i]=new CountPrimes(start,(start+incr)>stop?stop:start+incr);
      start+=incr+1;
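      // entry() is static, so name it through the class rather than an object;
      // pass a Threaded* so that the cast inside entry() is an exact inverse
      pthread_create(&t_id[i],NULL,Threaded::entry,static_cast<Threaded*>(counter[i]));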
    }
    // now wait for the results
    long count=0;
    for (int i=0; i<num_threads; i++){
      void *return_val;
      pthread_join(t_id[i],&return_val);
      // run() returned a heap-allocated long: add it up, then free it
      long *result=static_cast<long*>(return_val);
      count+=*result;
      delete result;
    }
    for (int i=0; i<num_threads; i++)
      delete counter[i];

    delete[] counter;
    delete[] t_id;

    // use the singular form only for exactly one prime (zero takes the plural)
    if (count==1)
      cout << "There was " << count << " prime." << endl;
    else
      cout << "There were " << count << " primes." << endl;
  }
  catch (const range_error &e){
    cout << "Exception: " << e.what() << endl;
    exit(EXIT_FAILURE);
  }
  
  return EXIT_SUCCESS;
}

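There are a few more subtleties in this example. The range is divided into num_threads consecutive pieces of roughly equal size, with the end of each piece clamped so that it never runs past stop. Each thread hands back its count as a heap-allocated long, which main() adds to the total and deletes after pthread_join() returns. The CountPrimes objects themselves are deleted only after every thread has been joined, so no thread is ever left running on an object that has been destroyed.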
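One of those subtleties is the arithmetic that divides the range. The listing uses incr=(stop-start)/num_threads and advances start by incr+1, clamping each upper bound to stop; when there are more threads than numbers in the range, a later thread can end up with a start beyond stop. The following is a sketch of an alternative split, not the method used in the complete program: split_range() is a hypothetical helper that spreads the remainder over the first few pieces and never produces more pieces than there are numbers.

```cpp
#include <utility>
#include <vector>

// Split the inclusive range [start, stop] into at most n contiguous,
// non-overlapping inclusive sub-ranges whose sizes differ by at most one.
// Returns an empty vector if n < 1 or the range is empty.
std::vector<std::pair<long, long> > split_range(long start, long stop, int n) {
  std::vector<std::pair<long, long> > chunks;
  if (n < 1 || stop < start)
    return chunks;
  long total = stop - start + 1;   // how many numbers are in the range
  if (total < n)
    n = static_cast<int>(total);   // never more pieces than numbers
  long base = total / n;           // minimum size of each piece
  long extra = total % n;          // the first 'extra' pieces get one more
  long lo = start;
  for (int i = 0; i < n; i++) {
    long size = base + (i < extra ? 1 : 0);
    chunks.push_back(std::make_pair(lo, lo + size - 1));
    lo += size;
  }
  return chunks;
}
```

Each pair could then be passed to a CountPrimes constructor in place of the start and stop values computed inside the loop, with the number of threads taken from the size of the returned vector.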

The complete program can be found at ftp://ftp.cim.mcgill.ca/pub/people/ericb/primes.tar.bz2. Here is an example of compiling and using the program:

bash$ g++ -o primes_threaded primes_threaded.cpp -lpthread
bash$ ./primes_threaded 0 10000 4
There were 1229 primes.