#include "../eSkel.c"

// Parameters of the model
#define PROCS 2     // Number of processes; not used here, kept for information
#define STAGES 2    // Number of pipeline stages
// !! When changing STAGES, be careful with the assignment of processes to stages !!
#define SUBSTAGES 2 // Number of stages of the transiently nested pipeline
#define LIMIT 29    // Parameter to filter the input data

// Nested pipeline, stage 1
// Repeatedly takes an item, prints its length and elements, adds 1000 to
// every element and passes the item on with Give() (to substage 2).
// The loop ends when Take() returns a null pointer.
void Substage1 (void)
{
    int i;
    int *len;              // filled in by Take(); len[0] is used as the element count
    eSkel_molecule_t item; // local holder for the current item's data and length

    while ((item.data = Take (&len)) != NULL) {
        int *values;
        int count;

        item.len = len;
        values = (int *) item.data[0];
        count = item.len[0];

        printf ("Substage1 ... len = %d ... data = ", count);
        for (i = 0; i < count; i++) {
            printf ("%d ", values[i]); // print the value before it is modified
            values[i] += 1000;
        }
        printf ("\n");

        // Send the output
        Give (item.data, item.len);
    }
}

// Nested pipeline, stage 2
// Repeatedly takes an item, prints its length and elements and adds 1000 to
// every element. Each input produces two outputs: the item as just updated,
// then the same buffer with a further 10000 added to every element.
// The loop ends when Take() returns a null pointer.
void Substage2 (void)
{
    int i;
    int *len;              // filled in by Take(); len[0] is used as the element count
    eSkel_molecule_t item; // local holder for the current item's data and length

    while ((item.data = Take (&len)) != NULL) {
        int *values;
        int count;

        item.len = len;
        values = (int *) item.data[0];
        count = item.len[0];

        printf ("Substage2 ... len = %d ... data = ", count);
        for (i = 0; i < count; i++) {
            printf ("%d ", values[i]); // print the value before it is modified
            values[i] += 1000;
        }
        printf ("\n");

        // We generate 2 outputs
        // First Give
        Give (item.data, item.len);

        // Modify the data in place and perform the second Give
        for (i = 0; i < count; i++)
            values[i] += 10000;
        Give (item.data, item.len);
    }
}

// Main pipeline, Stage 1
// In some cases, transiently calls the nested pipeline.
// NOTE(review): the text below is extraction-damaged and is kept exactly as found.
//  - Physical line breaks do not follow statements: one break falls inside the
//    "Stage2... len = %d ... data = " string literal, and every `//` comment is
//    followed on the same physical line by code.
//  - Every loop header reads `for (i=0;ilen[0];i++)`; `ilen` is declared nowhere.
//    Presumably the bound was `i<tempitem->len[0]` -- TODO confirm against the
//    pristine file.
//
// Stage1: loops on Take(len), printing each item's len[0] and its elements.
//   If the first element is greater than LIMIT, it allocates `inputs`, copies that
//   first element into inputs[0], then fills the item with results[0] and Gives it,
//   then fills it with results[1] and Gives it again. Otherwise the item is Given
//   unchanged.
//   NOTE(review): `results` is never assigned in the visible text, and spreads,
//   types, imodes, outmul, mystagenum and substages are declared but never used.
//   The nested-pipeline set-up and call that should produce `results` appear to
//   be missing -- restore them before building.
//
// Stage2: loops on Take(len), prints each item, adds 1000 to every element and
//   Gives the item.
//
// main: calls MPI_Init and SkelLibInit, prints a welcome message when myrank()
//   is 0, stores MPI_Wtime() in `secs`, and fills 8 inputs with
//   myrank() + 10*(i+1). The definition continues past the end of the visible
//   text, which stops in the middle of a `for` header.
void Stage1 (void) { int i, **len; eSkel_molecule_t *tempitem; // For the nested pipeline spread_t spreads[SUBSTAGES+1]; MPI_Datatype types[SUBSTAGES+1]; Imode_t imodes[SUBSTAGES]; int outmul, mystagenum, *inputs, *results; eSkel_molecule_t *(*substages[SUBSTAGES])(eSkel_molecule_t *); tempitem = (eSkel_molecule_t *) malloc (sizeof (eSkel_molecule_t)); len = (int **) malloc (sizeof (int *)); while (tempitem->data = Take(len)) { tempitem->len = *len; printf("Stage1 ... len = %d ... data = ",tempitem->len[0]); for (i=0;ilen[0];i++) printf("%d ", ((int *) tempitem->data[0])[i]); printf("\n"); // upper than the limit: process the data and generate new one if (((int*) tempitem->data[0])[0] > LIMIT) { inputs = (int *) malloc (sizeof(int)); //for (i=0; ilen[0]; i++) inputs[0] = ((int*) tempitem->data[0])[0]; for (i=0; ilen[0];i++) ((int *) tempitem->data[0])[i] = results[0]; Give (tempitem->data, tempitem->len); // Send second result for (i=0;ilen[0];i++) ((int *) tempitem->data[0])[i] = results[1]; Give (tempitem->data, tempitem->len); } // In this case, no filtering of data, we just transmit it else Give (tempitem->data, tempitem->len); } } // Main pipeline, Stage 2 void Stage2 (void) { int i; int **len; eSkel_molecule_t *tempitem; len = (int **) malloc (sizeof (int *)); tempitem = (eSkel_molecule_t *) malloc (sizeof (eSkel_molecule_t)); while (tempitem->data = Take(len)) { tempitem->len = *len; printf("Stage2... len = %d ... 
data = ",tempitem->len[0]); for (i=0;ilen[0];i++) { printf("%d ", ((int *) tempitem->data[0])[i]); ((int *) tempitem->data[0])[i] += 1000; } printf("\n"); // Generate some outputs Give(tempitem->data, tempitem->len); } } // Main procedure, defining the Pipeline parameters and calling the Pipeline int main (int argc, char *argv[]) { int i; spread_t spreads[STAGES+1]; MPI_Datatype types[STAGES+1]; Imode_t imodes[STAGES]; int outmul, mystagenum; int *inputs, *results; double secs; eSkel_molecule_t *(*stages[STAGES])(eSkel_molecule_t *); MPI_Init(&argc, &argv); SkelLibInit(); // Welcome message if (myrank()==0) printf("*** Entering Example 2 ***\n"); secs = MPI_Wtime(); // Define the input data inputs = (int *) malloc (sizeof(int)*8); for (i=0; i<8; i++) inputs[i] = myrank() + 10*(i+1); // Define the stage parameters for (i=0; i