Commit 3dd5b115 authored by Camille Coti

Missing files

parent dc0e6c99
#include <iostream>
#include <mpi.h>
#include <ginac/ginac.h>
#include <cstdlib> // malloc, free
#include <cstring> // memcpy
#include <string>
#include <vector>
#include "products.h"
#include "utils_parall.h"
#include "parall_constants.h"
#include "parall_internal.h"
#include "utils.h"
namespace gi = GiNaC;
/*******************************************************************************
* Parallel hierarchical decomposition *
*******************************************************************************/
/*******************************************************************************
* Foreman *
*******************************************************************************/
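/* Foreman side of one batch: the master fixes a4 (in params); the foreman builds
the (a4, a2) parameter sets for its team, hands them out on demand, sums the
partial results returned by the workers and returns that sum. */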
gi::ex multiply_2levels_foreman_hierarch_distribute_work2( tensor3D_t& T, int size, parameters_s_t params, gi::lst symbols, MPI_Comm comm_team, int rank_foreman /* DEBUG */ ) {
gi::ex Tens = 0;
int a1, a2, a4, a6;
int N = size / 2;
MPI_Status status;
unsigned int len;
char* expr_c;
size_t expr_c_size = 0;
int src, np, running = 0;
parameters_2_1_t pzero( 0, 0 );
MPI_Comm_size( comm_team, &np );
expr_c = NULL;
/* Compute the points that need to be computed */
a4 = params.a4;
std::vector<parameters_2_1_t> input;
std::vector<gi::ex> results;
for( a2 = 0 ; a2 < N ; a2++ ) {
parameters_2_1_t p( a4, a2 );
input.push_back( p );
}
/* Distribute the work */
/* Largely copy/pasted from multiply_2levels_master -> possible refactoring here */
while( input.size() > 0 ) {
MPI_Recv( &len, 1, MPI_UNSIGNED, MPI_ANY_SOURCE, MPI_ANY_TAG, comm_team, &status );
src = status.MPI_SOURCE;
if( status.MPI_TAG == TAG_PULL ) {
/* Nothing else will come: just send some work */
send_work( input, src, comm_team );
} else {
if( status.MPI_TAG == TAG_RES ){
/* The first message contains the length of what is coming next */
if( len != 0 ) {
if( len > expr_c_size ) {
expr_c_size = len;
if( NULL != expr_c ) free( expr_c );
expr_c = (char*)malloc( expr_c_size ); // The \0 was added by the slave
}
/* Receive the result */
MPI_Recv( expr_c, len, MPI_CHAR, src, TAG_EXPR, comm_team, &status );
/* put it in the result queue */
std::string s( expr_c );
/* Send work */
send_work( input, src, comm_team );
/* Process what I have just received */
/* Could be given to a slave... */
gi::ex received = de_linearize_expression( s, symbols );
Tens += received;
} else {
/* Send more work */
send_work( input, src, comm_team );
}
} else{
std::cerr << "Wrong tag received " << status.MPI_TAG << std::endl;
}
}
}
/* This batch is done */
running = np - 1; // all the slaves are running
while( running > 0 ) {
/* Here we might also receive a TAG_PULL if the data set is too small */
MPI_Recv( &len, 1, MPI_UNSIGNED, MPI_ANY_SOURCE, /* MPI_ANY_TAG */ TAG_RES, comm_team, &status );
src = status.MPI_SOURCE;
if( len != 0 ) {
if( len > expr_c_size ) {
expr_c_size = len;
if( NULL != expr_c ) free( expr_c );
expr_c = (char*)malloc( expr_c_size ); // The \0 was added by the slave
}
/* Receive the result */
MPI_Recv( expr_c, len, MPI_CHAR, src, TAG_EXPR, comm_team, &status );
/* And send the END_BATCH signal, not the END one */
send_end_batch( src, pzero, comm_team );
running--;
/* Process what I have just received */
/* Could be given to a slave... */
/* put it in the result queue */
std::string s( expr_c );
gi::ex received = de_linearize_expression( s, symbols );
Tens += received;
} else {
send_end_batch( src, pzero, comm_team );
running--;
}
}
if( NULL != expr_c) free( expr_c );
return Tens;
}
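/* End of the whole computation: answer every worker's next request with TAG_END
so that the team shuts down. */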
void multiply_2levels_foreman_hierarch_finalize2( MPI_Comm comm_team ) {
int src, np, running = 0;
unsigned int len;
MPI_Status status;
parameters_2_1_t pzero( 0, 0 );
MPI_Comm_size( comm_team, &np );
running = np - 1;
while( running > 0 ) {
MPI_Recv( &len, 1, MPI_UNSIGNED, MPI_ANY_SOURCE, MPI_ANY_TAG, comm_team, &status );
src = status.MPI_SOURCE;
send_end( src, pzero, comm_team );
running--;
}
return;
}
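/* Foreman main loop: pull parameter sets from the master, have the team compute
the corresponding partial sum, send it back, until TAG_END is received. */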
void multiply_2levels_foreman_hierarch2( tensor3D_t& T, int size, MPI_Comm comm_foremen, MPI_Comm comm_team ) {
gi::ex Tens;
gi::lst symbols;
unsigned int len = 0;
parameters_s_t params;
MPI_Status status;
char* expr_c;
int rank;
MPI_Comm_rank( comm_foremen, &rank );
/* Ask for some work */
MPI_Send( &len, 1, MPI_UNSIGNED, ROOT, TAG_PULL, comm_foremen );
/* Compute the set of symbols */
symbols = all_symbols_3D( size );
while( true ){
/* Receive a set of parameters */
MPI_Recv( &params, 1, DT_PARAMETERS_S, ROOT, MPI_ANY_TAG, comm_foremen, &status );
if( status.MPI_TAG == TAG_WORK ){
/* Distribute the work on my workers */
Tens = multiply_2levels_foreman_hierarch_distribute_work2( T, size, params, symbols, comm_team, rank /* DEBUG */ );
/* Send the result to the master */
send_result( Tens, comm_foremen );
} else {
if( status.MPI_TAG == TAG_END ){
/* The end: forward the signal to our workers */
multiply_2levels_foreman_hierarch_finalize2( comm_team );
return;
} else {
std::cerr << "Wrong tag received on slave " << status.MPI_TAG << std::endl;
}
}
}
}
/*******************************************************************************
* Worker *
*******************************************************************************/
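/* Worker: pull (a4, a2) parameter sets from the foreman, compute the corresponding
partial product, and send back its linearized form (length first, then the string).
TAG_END_BATCH sends us back to the pull state; TAG_END terminates. */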
void multiply_2levels_slave_hierarch2( tensor3D_t& T, int size, MPI_Comm comm ) {
gi::ex Tens;
int a4, a2;
unsigned int len = 0;
parameters_2_1_t params;
MPI_Status status;
char* expr_c;
int rank;
MPI_Comm_rank( MPI_COMM_WORLD, &rank );
/* Ask for some work */
pull:
MPI_Send( &len, 1, MPI_UNSIGNED, ROOT, TAG_PULL, comm );
while( true ){
/* Receive a set of parameters */
MPI_Recv( &params, 1, DT_PARAMETERS_2_1, ROOT, MPI_ANY_TAG, comm, &status );
if( status.MPI_TAG == TAG_WORK ){
a4 = params.a4;
a2 = params.a2;
Tens = two_level2_product( &T, size, a4, a2 );
if( Tens.is_zero() ) {
len = 0;
expr_c = NULL; // just so g++ does not warn that it may be used uninitialized later
} else {
/* linearize the result */
std::string expr = linearize_expression( Tens );
/* Send it to the master */
len = expr.length() + 1;
expr_c = (char*) malloc( len );
memcpy( expr_c, expr.c_str(), len-1 );
expr_c[len-1] = '\0'; // make sure the buffer is null-terminated
}
MPI_Send( &len, 1, MPI_UNSIGNED, ROOT, TAG_RES, comm );
if( len != 0 ) {
MPI_Send( expr_c, len, MPI_CHAR, ROOT, TAG_EXPR, comm );
free( expr_c );
}
} else {
if( status.MPI_TAG == TAG_END_BATCH ){
/* End of this batch -- wait for another one to start */
goto pull;
} else {
if( status.MPI_TAG == TAG_END ){
return;
} else {
std::cerr << "Wrong tag received on slave " << status.MPI_TAG << std::endl;
}
}
}
}
}
/*******************************************************************************
* Master *
*******************************************************************************/
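/* Master: distributes one a4 value per request to the foremen, sums the partial
results they send back, and sends the end signal once the whole range is done. */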
gi::ex multiply_2levels_master2( tensor3D_t& T, unsigned int size, MPI_Comm comm = MPI_COMM_WORLD ) {
gi::ex Tens = 0;
unsigned int a4;
gi::lst symbols;
MPI_Status status;
char* expr_c;
size_t expr_c_size = 0;
int src, np, running = 0;
unsigned int len;
parameters_s_t pzero( 0 );
MPI_Comm_size( comm, &np );
expr_c = (char*) malloc( 3279 ); // TMP: initial buffer, grown on demand below
expr_c_size = 3279;
int i, j;
i = 0;
j = 0;
int receivedresults = 0;
unsigned int N = size/2;
std::vector<parameters_s_t> input;
std::vector<std::string> results_s;
std::vector<gi::ex> results;
/* Build a list of argument sets */
for( a4 = 0 ; a4 < N ; a4++ ){
parameters_s_t p( a4 );
input.push_back( p );
}
/* Compute the set of symbols */
/* Could be done while the first slave is working */
symbols = all_symbols_3D( size );
/* Distribute the work */
while( input.size() > 0 ) {
MPI_Recv( &len, 1, MPI_UNSIGNED, MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &status );
src = status.MPI_SOURCE;
if( status.MPI_TAG == TAG_PULL ) {
/* Nothing else will come: just send some work */
send_work( input, src, comm );
running++;
} else {
if( status.MPI_TAG == TAG_RES ){
src = status.MPI_SOURCE;
/* The first message contains the length of what is coming next */
if( len != 0 ) {
if( len > expr_c_size ) {
expr_c_size = len;
if( NULL != expr_c ) free( expr_c );
expr_c = (char*)malloc( expr_c_size ); // The \0 was added by the slave
}
/* Receive the result */
MPI_Recv( expr_c, len, MPI_CHAR, src, TAG_EXPR, comm, &status );
/* put it in the result queue */
std::string s( expr_c );
send_work( input, src, comm );
/* Process what I have just received */
/* Could be given to a slave... */
gi::ex received = de_linearize_expression( s, symbols );
Tens += received;
#if DEBUG
results.push_back( received );
results_s.push_back( s );
receivedresults++;
#endif
} else {
/* Send more work */
send_work( input, src, comm );
}
} else{
std::cerr << "Wrong tag received " << status.MPI_TAG << std::endl;
}
}
}
/* Wait until everyone is done */
// running = np - 1; // all the foremen are running
while( running > 0 ) {
MPI_Recv( &len, 1, MPI_UNSIGNED, MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &status );
src = status.MPI_SOURCE;
if( len != 0 ) {
if( len > expr_c_size ) {
expr_c_size = len;
if( NULL != expr_c ) free( expr_c );
expr_c = (char*)malloc( expr_c_size ); // The \0 was added by the slave
}
/* Receive the result */
MPI_Recv( expr_c, len, MPI_CHAR, src, TAG_EXPR, comm, &status );
/* And send the END signal */
send_end( src, pzero, comm );
running--;
/* Process what I have just received */
/* Could be given to a slave... */
/* put it in the result queue */
std::string s( expr_c );
gi::ex received = de_linearize_expression( s, symbols );
Tens += received;
#if DEBUG
results.push_back( received );
results_s.push_back( s );
receivedresults++;
#endif
} else {
send_end( src, pzero, comm );
running--;
}
}
#if DEBUG
std::cout << "Received " << receivedresults << " results" << std::endl;
std::cout << "Tpara=" << Tens << ";" << std::endl;
#endif
if( NULL != expr_c) free( expr_c );
return Tens;
}
/* The master sends a parameter set to a foreman.
The foreman computes a set of parameter sets and distributes them to the workers.
The foreman adds the results and, when everything is done, sends the total to the master.
Possible evolution: ask a worker to perform the addition (_addslave).
4 loops on the M, 4 loops on the F, 4 loops on the W.
Communication protocol:
M -> F: Send a parameter set. Always the same size, therefore a single message.
F -> W: Same.
W -> F: send an unsigned int (the size of the expression), then the expression itself (array of chars).
F -> M: Same.
*/
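/* Entry point: rank 0 of MPI_COMM_WORLD acts as the master; within each team
communicator, rank 0 acts as the foreman and the other ranks as workers. */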
gi::ex multiply_2levels_mw_hierarch2( tensor3D_t& T, int size ) { // simpler: same dimension everywhere
int rank;
gi::ex Tens = 0;
MPI_Comm COMM_FOREMEN, COMM_TEAM;
MPI_Comm_rank( MPI_COMM_WORLD, &rank );
#if 0 // BEGIN DEBUG
// Save original std::cin, std::cout
std::streambuf *coutbuf = std::cout.rdbuf();
std::ostringstream oss;
oss << "output_" << rank << ".txt";
std::ofstream out( oss.str().c_str());
// Redirect std::cout to output_<rank>.txt
std::cout.rdbuf(out.rdbuf());
#endif // END DEBUG
/* Create new datatypes for the parameters */
create_parameters_datatype_s();
create_parameters_datatype_2_1();
/* Create the communicators */
create_communicators_hierarch( COMM_FOREMEN, COMM_TEAM );
/* Here we go*/
if( 0 == rank ) {
Tens = multiply_2levels_master2( T, size, COMM_FOREMEN );
} else {
int rank_foreman;
MPI_Comm_rank( COMM_TEAM, &rank_foreman );
if( 0 == rank_foreman ) {
multiply_2levels_foreman_hierarch2( T, size, COMM_FOREMEN, COMM_TEAM );
} else {
multiply_2levels_slave_hierarch2( T, size, COMM_TEAM );
}
}
#if 0 // BEGIN DEBUG
// Restore the original std::cout buffer.
std::cout.rdbuf(coutbuf);
#endif // END DEBUG
/* Finalize */
free_parameters_s_dt( );
free_parameters_2_1_dt( );
return Tens;
}
#include <iostream>
#include <mpi.h>
#include <ginac/ginac.h>
#include <cstdlib> // malloc, free
#include <string>
#include <vector>
#include "products.h"
#include "utils_parall.h"
#include "parall_constants.h"
#include "parall_internal.h"
#include "utils.h"
namespace gi = GiNaC;
/*******************************************************************************
* Parallel 1-level decomposition *
*******************************************************************************/
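/* Master of the 1-level decomposition: same pull-based protocol as above, but the
a4 values are sent directly to the workers, without foremen. */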
gi::ex multiply_1level_master2( tensor3D_t& T, unsigned int size, MPI_Comm comm = MPI_COMM_WORLD ) {
gi::ex Tens = 0;
unsigned int a4;
gi::ex A;
gi::lst symbols;
MPI_Status status;
char* expr_c;
size_t expr_c_size = 0;
int src, np, running = 0;
unsigned int len;
parameters_s_t pzero( 0 );
MPI_Comm_size( comm, &np );
expr_c = (char*) malloc( 3279 ); // TMP: initial buffer, grown on demand below
expr_c_size = 3279;
int i, j;
i = 0;
j = 0;
int receivedresults = 0;
unsigned int N = size/2;
std::vector<parameters_s_t> input;
std::vector<std::string> results_s;
std::vector<gi::ex> results;
/* Build a list of argument sets */
for( a4 = 0 ; a4 < N ; a4++ ){
parameters_s_t p( a4 );
input.push_back( p );
}
/* Compute the set of symbols */
/* Could be done while the first slave is working */
symbols = all_symbols_3D( size );
/* Distribute the work */
while( input.size() > 0 ) {
MPI_Recv( &len, 1, MPI_UNSIGNED, MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &status );
src = status.MPI_SOURCE;
if( status.MPI_TAG == TAG_PULL ) {
/* Nothing else will come: just send some work */
send_work( input, src, comm );
} else {
if( status.MPI_TAG == TAG_RES ){
src = status.MPI_SOURCE;
/* The first message contains the length of what is coming next */
if( len != 0 ) {
if( len > expr_c_size ) {
expr_c_size = len;
if( NULL != expr_c ) free( expr_c );
expr_c = (char*)malloc( expr_c_size ); // The \0 was added by the slave
}
/* Receive the result */
MPI_Recv( expr_c, len, MPI_CHAR, src, TAG_EXPR, comm, &status );
/* put it in the result queue */
std::string s( expr_c );
send_work( input, src, comm );
/* Process what I have just received */
/* Could be given to a slave... */
gi::ex received = de_linearize_expression( s, symbols );
Tens += received;
#if DEBUG
results.push_back( received );
results_s.push_back( s );
receivedresults++;
#endif
} else {
/* Send more work */
send_work( input, src, comm );
}
} else{
std::cerr << "Wrong tag received " << status.MPI_TAG << std::endl;
}
}
}
/* Wait until everyone is done */
running = np - 1; // all the slaves are running
while( running > 0 ) {
MPI_Recv( &len, 1, MPI_UNSIGNED, MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &status );
src = status.MPI_SOURCE;
if( len != 0 ) {
if( len > expr_c_size ) {
expr_c_size = len;
if( NULL != expr_c ) free( expr_c );
expr_c = (char*)malloc( expr_c_size ); // The \0 was added by the slave
}
/* Receive the result */
MPI_Recv( expr_c, len, MPI_CHAR, src, TAG_EXPR, comm, &status );
/* And send the END signal */
send_end( src, pzero, comm );
running--;
/* Process what I have just received */
/* Could be given to a slave... */
/* put it in the result queue */
std::string s( expr_c );
gi::ex received = de_linearize_expression( s, symbols );
Tens += received;
#if DEBUG
results.push_back( received );
results_s.push_back( s );
receivedresults++;
#endif
} else {
send_end( src, pzero, comm );
running--;
}
}