Job Queue

The Concept

When a problem cannot easily be divided among the processes a priori, job scheduling offers a way to keep all the processes busy. The master process is responsible for scheduling tasks to the other processes; when one of the nodes finishes, it reports its result to the master process, which then assigns a new job to that slave process. This way, all the processes stay busy as long as there are jobs left. Bookkeeping by the master node prevents any job from being assigned more than once. It is important to remember the overhead of assigning jobs: network traffic should be kept to a minimum, so if the individual jobs are too small, the job queue scheme will not be beneficial.

The Code

The program computes c = A*b by first broadcasting b to all the slaves and then assigning rows of A to available processes. When a process has computed an element of c, it returns it to the master and is assigned another row as long as there are rows left. Each process writes a log of its communication to a file called file.[rank]. The result c is written to standard output.

$ cd basic_mpi/04_job_queue/
$ cat src/job_queue.c
/******************************************************************************
 *                                                                            *
 *  Basic MPI Example - Job Queue                                             *
 *                                                                            *
 *  Demonstrates job scheduling through performing the matrix vector product  *
 *  c = A*b. The master assigns rows of A to the slaves to compute            *
 *  each entry of c.                                                          *
 *                                                                            *
 ******************************************************************************
 *                                                                            *
 *  The original code was written by Gustav at University of Indiana in 2003. *
 *                                                                            *
 *  The current version has been tested/updated by the HPC department at      *
 *  the Norwegian University of Science and Technology in 2011.               *
 *                                                                            *
 ******************************************************************************/
#include <stdio.h>     /* [fs]printf, fopen and fclose defined here */
#include <stdlib.h>    /* exit defined here */
#include <sys/types.h> /* chmod defined here */
#include <sys/stat.h>  /* chmod defined here */
#include <mpi.h>
 
#define COLS 100
#define ROWS 100
#define TRUE 1
#define FALSE 0
#define MASTER_RANK 0
 
int main ( int argc, char **argv )
{
   int pool_size, my_rank, destination;
   int i_am_the_master = FALSE;
   int A[ROWS][COLS], b[COLS], c[ROWS], i, j;
   int int_buffer[BUFSIZ];
   char my_logfile_name[BUFSIZ];
   FILE *my_logfile;
   MPI_Status status;
    
 
   MPI_Init(&argc, &argv);
   MPI_Comm_size(MPI_COMM_WORLD, &pool_size);
   MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
 
   if (my_rank == MASTER_RANK) i_am_the_master = TRUE;
 
   sprintf(my_logfile_name, "output/file.%d", my_rank);
   my_logfile = fopen(my_logfile_name, "w");
   (void) chmod(my_logfile_name, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
 
   if (i_am_the_master) {
 
      int row, count, sender;
 
      for (j = 0; j < COLS; j++) {
         b[j] = 1;
         for (i = 0; i < ROWS; i++) A[i][j] = i;
      }
 
      MPI_Bcast(b, COLS, MPI_INT, MASTER_RANK, MPI_COMM_WORLD);
 
      count = 0;
      for (destination = 0; destination < pool_size; destination++) {
         if (destination != my_rank) {
            for (j = 0; j < COLS; j++) int_buffer[j] = A[count][j];
            MPI_Send(int_buffer, COLS, MPI_INT, destination, count,
                     MPI_COMM_WORLD);
            fprintf(my_logfile, "sent row %d to %d\n", count, destination);
            count = count + 1;
         }
      }
 
      for (i = 0; i < ROWS; i++) {
         MPI_Recv (int_buffer, BUFSIZ, MPI_INT, MPI_ANY_SOURCE,
                   MPI_ANY_TAG, MPI_COMM_WORLD, &status);
         sender = status.MPI_SOURCE;
         row = status.MPI_TAG;
         c[row] = int_buffer[0];
         fprintf(my_logfile, "\treceived row %d from %d\n", row, sender);
         if (count < ROWS) {
            for (j = 0; j < COLS; j++) int_buffer[j] = A[count][j];
            MPI_Send(int_buffer, COLS, MPI_INT, sender, count,
                     MPI_COMM_WORLD);
            fprintf(my_logfile, "sent row %d to %d\n", count, sender);
            count = count + 1;
         }
         else {
            MPI_Send(NULL, 0, MPI_INT, sender, ROWS, MPI_COMM_WORLD);
            fprintf(my_logfile, "terminated process %d with tag %d\n", sender, ROWS);
         }
      }
      for (row = 0; row < ROWS; row++) printf("%d ", c[row]);
      printf("\n");
 
   }
   else { /* I am not the master */
 
      int sum, row;
 
      MPI_Bcast(b, COLS, MPI_INT, MASTER_RANK, MPI_COMM_WORLD);
      fprintf(my_logfile, "received broadcast from %d\n", MASTER_RANK);
      MPI_Recv(int_buffer, COLS, MPI_INT, MASTER_RANK, MPI_ANY_TAG,
                    MPI_COMM_WORLD, &status);
      fprintf(my_logfile, "received a message from %d, tag %d\n",
                   status.MPI_SOURCE, status.MPI_TAG);
      while (status.MPI_TAG != ROWS) { /* The job is not finished */
         row = status.MPI_TAG; sum = 0;
         for (i = 0; i < COLS; i++) sum = sum + int_buffer[i] * b[i];
         int_buffer[0] = sum;
         MPI_Send (int_buffer, 1, MPI_INT, MASTER_RANK, row, MPI_COMM_WORLD);
         fprintf(my_logfile, "sent row %d to %d\n", row, MASTER_RANK);
         MPI_Recv (int_buffer, COLS, MPI_INT, MASTER_RANK, MPI_ANY_TAG,
                   MPI_COMM_WORLD, &status);
         fprintf(my_logfile, "received a message from %d, tag %d\n",
                 status.MPI_SOURCE, status.MPI_TAG);
      }
      fprintf(my_logfile, "exiting on tag %d\n", status.MPI_TAG);
   }
 
   fclose (my_logfile);
 
   MPI_Finalize ();
 
   exit (0);
}
Previous instructions

MPI_Init() and MPI_Finalize() are used to initialize and end the MPI program.

MPI_Bcast() is used to broadcast the vector b to all processes.

MPI_Comm_size() is used to find the number of processes in the pool.

MPI_Comm_rank() is used to distinguish between the master and slave processes.

MPI_Send() and MPI_Recv() are used to send rows of A to the slaves and elements of c to the master. The row index of A is used as the tag in the communication. When there are no rows left, the master sets the tag equal to the total number of rows, indicating to the slave that it should finish its log and exit.

New instructions

None

Compile & Run

If you have not already done so, obtain all the example code here.

Switch to the Intel compiler (optional, only necessary once in each terminal session)

$ module load intel

Compile the program using

$ make

Submit the job to the queue

$ make submit

The output files from the program execution are placed in the output folder

$ ls output/
131416.vilje.hpc.ntnu.no.ER  file.0  file.10  file.12  file.14  file.2  file.4  file.6  file.8
131416.vilje.hpc.ntnu.no.OU  file.1  file.11  file.13  file.15  file.3  file.5  file.7  file.9

The standard out is placed in the .OU file

$ cat output/*OU
0 100 200 300 400 500 600 700 800 900 1000 1100 1200 1300 1400 1500 1600 1700 1800 1900 2000
2100 2200 2300 2400 2500 2600 2700 2800 2900 3000 3100 3200 3300 3400 3500 3600 3700 3800
3900 4000 4100 4200 4300 4400 4500 4600 4700 4800 4900 5000 5100 5200 5300 5400 5500 5600
5700 5800 5900 6000 6100 6200 6300 6400 6500 6600 6700 6800 6900 7000 7100 7200 7300 7400
7500 7600 7700 7800 7900 8000 8100 8200 8300 8400 8500 8600 8700 8800 8900 9000 9100 9200
9300 9400 9500 9600 9700 9800 9900