Newer
Older
#include <mpi.h>
#include "utils_parall.h"
#include "parall_constants.h"
#include "tensormatrix.h"
/*******************************************************************************
* Utilities for parallelism *
*******************************************************************************/
std::string linearize_expression( gi::ex expr ){
std::ostringstream oss;
oss << expr;
return oss.str();
}
gi::ex de_linearize_expression( std::string s, gi::lst symbols ){
return gi::ex( s, symbols );
}
parameters_t::parameters_t( unsigned int a4, unsigned int a2, unsigned int a1 ){
this->a4 = a4;
parameters_s_t::parameters_s_t( unsigned int a4 ){
this->a4 = a4;
}
parameters_2_1_t::parameters_2_1_t( unsigned int a4, unsigned int a2 ){
this->a4 = a4;
parameters_2_2_t::parameters_2_2_t( unsigned int a4, unsigned int a2, unsigned int a1, unsigned int a6 ){
this->a4 = a4;
this->a2 = a2;
MPI_Type_contiguous( 3, MPI_UNSIGNED, &DT_PARAMETERS );
void create_parameters_datatype_s(){
MPI_Type_contiguous( 1, MPI_UNSIGNED, &DT_PARAMETERS_S );
MPI_Type_commit( &DT_PARAMETERS_S );
}
void create_parameters_datatype_2_1(){
MPI_Type_contiguous( 2, MPI_UNSIGNED, &DT_PARAMETERS_2_1 );
MPI_Type_commit( &DT_PARAMETERS_2_1 );
}
void create_parameters_datatype_2_2(){
MPI_Type_contiguous( 4, MPI_UNSIGNED, &DT_PARAMETERS_2_2 );
MPI_Type_commit( &DT_PARAMETERS_2_2 );
}
void free_parameters_dt( ){
MPI_Type_free( &DT_PARAMETERS );
}
void free_parameters_2_1_dt( ){
MPI_Type_free( &DT_PARAMETERS_2_1 );
}
void free_parameters_2_2_dt( ){
MPI_Type_free( &DT_PARAMETERS_2_2 );
void free_parameters_s_dt( ){
MPI_Type_free( &DT_PARAMETERS_S );
}
gi::ex add_expressions( std::vector<std::string> expressions, gi::lst symbols ) {
gi::ex Tens = 0;
for( auto s: expressions ) {
gi::ex received = de_linearize_expression( s, symbols );
Tens += received;
}
return Tens;
}
/* M -> W: Send the end signal */
void send_end( int peer, MPI_Comm comm ) {
parameters_t para;
MPI_Send( ¶, 1, DT_PARAMETERS, peer, TAG_END, comm );
}
void send_end( int peer, parameters_2_1_t p, MPI_Comm comm ) {
/* The parameters_2_1_t argument is not used, but needed to distinguish between functions */
MPI_Send( &p, 1, DT_PARAMETERS_2_1, peer, TAG_END, comm );
}
void send_end( int peer, parameters_s_t p, MPI_Comm comm ) {
/* The parameters_s_t argument is not used, but needed to distinguish between functions */
MPI_Send( &p, 1, DT_PARAMETERS_S, peer, TAG_END, comm );
}
void send_end_batch( int peer, MPI_Comm comm ) {
parameters_t para;
MPI_Send( ¶, 1, DT_PARAMETERS_2_1, peer, TAG_END_BATCH, comm );
}
/* M -> W: Send some work: just a parameter set */
void send_work( std::vector<parameters_t>& input, int peer, MPI_Comm comm ){
parameters_t para = input.back();
input.pop_back();
MPI_Send( ¶, 1, DT_PARAMETERS, peer, TAG_WORK, comm );
}
void send_work( std::vector<parameters_2_1_t>& input, int peer, MPI_Comm comm ){
parameters_2_1_t para = input.back();
MPI_Send( ¶, 1, DT_PARAMETERS_2_1, peer, TAG_WORK, comm );
void send_work( std::vector<parameters_2_2_t>& input, int peer, MPI_Comm comm ){
parameters_2_2_t para = input.back();
input.pop_back();
MPI_Send( ¶, 1, DT_PARAMETERS_2_2, peer, TAG_WORK, comm );
}
void send_work( std::vector<parameters_s_t>& input, int peer, MPI_Comm comm ){
parameters_s_t para = input.back();
input.pop_back();
MPI_Send( ¶, 1, DT_PARAMETERS_S, peer, TAG_WORK, comm );
}
/* M -> W: Send a set of expressions to be added */
void send_expressions_to_add( std::vector<std::string>& results, int peer ) {
/* Fill a bogus parameter object */
int nb = results.size();
int i;
parameters_t p( nb, 0, 0 );
char* expr;
MPI_Send( &p, 1, DT_PARAMETERS, peer, TAG_ADD, MPI_COMM_WORLD );
/* Send the length of each string */
unsigned int* lengths = (unsigned int*) malloc( nb*sizeof( unsigned int ) );
for( i = 0 ; i < nb ; i++ ) {
lengths[i] = results[i].length();
}
MPI_Send( lengths, nb, MPI_INT, peer, TAG_ADD, MPI_COMM_WORLD );
/* Send the strings (should be nicely pipelined) */
for( i = 0 ; i < nb ; i++ ) {
expr = const_cast<char*>( results[i].c_str() );
MPI_Send( expr, results[i].length(), MPI_CHAR, peer, TAG_ADD, MPI_COMM_WORLD );
}
results.erase( results.begin(), results.end() );
free( lengths );
}
/* M -> W: Send either a set of expressions to add, or the end signal */
void send_add_or_end_addslave( std::vector<std::string>& results, int peer, int* running ){
/* Do I have a lot of results to be treated in the result queue? */
if( results.size() > maxresult ) {
/* if the result queue is too big, send it */
send_expressions_to_add( results, peer );
} else {
send_end( peer );
void send_add_or_end_addslave( std::vector<std::string>& results, int peer, int* running, parameters_s_t p ){
/* Do I have a lot of results to be treated in the result queue? */
if( results.size() > maxresult ) {
/* if the result queue is too big, send it */
send_expressions_to_add( results, peer );
} else {
send_end( peer, p );
(*running)--;
}
}
/* M -> W: Send work: either a set of expressions to add, or a parameter set */
void send_work_addslave( std::vector<parameters_t>& input, std::vector<std::string>& results, int peer ) {
if( results.size() > maxresult ) {
/* if the result queue is too big, send it */
send_expressions_to_add( results, peer );
} else {
send_work( input, peer );
}
}
void send_work_addslave( std::vector<parameters_s_t>& input, std::vector<std::string>& results, int peer ) {
if( results.size() > maxresult ) {
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
/* if the result queue is too big, send it */
send_expressions_to_add( results, peer );
} else {
send_work( input, peer );
}
}
/* W -> M: send the result of a computation */
void send_result( gi::ex T, MPI_Comm comm ){
unsigned int len;
char* expr_c;
if( T.is_zero() ) {
len = 0;
expr_c = NULL; // just so g++ does not complain that it believes we are using it uninitialized later
} else {
/* linearize the result */
std::string expr = linearize_expression( T );
/* Send it to the master */
len = expr.length() + 1;
expr_c = (char*) malloc( len );
memcpy( expr_c, expr.c_str(), len-1 );
expr_c[len-1] = '\0'; // C++ std::strings do not terminate with \0
}
MPI_Send( &len, 1, MPI_UNSIGNED, ROOT, TAG_RES, comm );
if( len != 0 ) {
MPI_Send( expr_c, len, MPI_CHAR, ROOT, TAG_EXPR, comm );
free( expr_c );
}
}