#include <mpi.h>
#include <string>
#include <sstream>
#include <vector>
#include <cstdlib>
#include <cstring>
#include "utils_parall.h"
#include "parall_constants.h"
#include "tensormatrix.h"
/*******************************************************************************
* Utilities for parallelism *
*******************************************************************************/
std::string linearize_expression( gi::ex expr ){
std::ostringstream oss;
oss << expr;
return oss.str();
}
gi::ex de_linearize_expression( std::string s, gi::lst symbols ){
return gi::ex( s, symbols );
}
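/* Illustrative round-trip through the two helpers above (a sketch, not part of
 * the original file). It assumes, as elsewhere in this file, that "gi" aliases
 * the GiNaC namespace (presumably via tensormatrix.h). Note that the string
 * parser needs the symbols in the lst so that names map back to the same
 * symbol objects. */
void example_linearize_roundtrip(){
    gi::symbol x( "x" ), y( "y" );
    gi::lst symbols;
    symbols.append( x );
    symbols.append( y );
    gi::ex e = gi::pow( x + y, 2 );
    std::string wire = linearize_expression( e );            // e.g. "(x+y)^2"
    gi::ex back = de_linearize_expression( wire, symbols );  // same expression, rebuilt
}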
parameters_t::parameters_t( unsigned int a4, unsigned int a2, unsigned int a1 ){
this->a4 = a4;
this->a2 = a2;
this->a1 = a1;
}
parameters_2_1_t::parameters_2_1_t( unsigned int a4, unsigned int a2 ){
this->a4 = a4;
this->a2 = a2;
}
parameters_2_2_t::parameters_2_2_t( unsigned int a6, unsigned int a1 ){
this->a6 = a6;
this->a1 = a1;
}
void create_parameters_datatype(){
MPI_Type_contiguous( 3, MPI_UNSIGNED, &DT_PARAMETERS );
MPI_Type_commit( &DT_PARAMETERS );
}
void create_parameters_datatype_2_1(){
MPI_Type_contiguous( 2, MPI_UNSIGNED, &DT_PARAMETERS_2_1 );
MPI_Type_commit( &DT_PARAMETERS_2_1 );
}
void create_parameters_datatype_2_2(){
MPI_Type_contiguous( 2, MPI_UNSIGNED, &DT_PARAMETERS_2_2 );
MPI_Type_commit( &DT_PARAMETERS_2_2 );
}
void free_parameters_dt( ){
MPI_Type_free( &DT_PARAMETERS );
}
void free_parameters_2_1_dt( ){
MPI_Type_free( &DT_PARAMETERS_2_1 );
}
void free_parameters_2_2_dt( ){
MPI_Type_free( &DT_PARAMETERS_2_2 );
}
gi::ex add_expressions( std::vector<std::string> expressions, gi::lst symbols ) {
gi::ex Tens = 0;
for( auto s: expressions ) {
gi::ex received = de_linearize_expression( s, symbols );
Tens += received;
}
return Tens;
}
/* M -> W: Send the end signal */
void send_end( int peer, MPI_Comm comm ) {
parameters_t para;
MPI_Send( &para, 1, DT_PARAMETERS, peer, TAG_END, comm );
}
void send_end_batch( int peer, MPI_Comm comm ) {
parameters_t para;
MPI_Send( &para, 1, DT_PARAMETERS_2_1, peer, TAG_END_BATCH, comm );
}
/* M -> W: Send some work: just a parameter set */
void send_work( std::vector<parameters_t>& input, int peer, MPI_Comm comm ){
parameters_t para = input.back();
input.pop_back();
MPI_Send( &para, 1, DT_PARAMETERS, peer, TAG_WORK, comm );
}
void send_work( std::vector<parameters_2_1_t>& input, int peer, MPI_Comm comm ){
parameters_2_1_t para = input.back();
input.pop_back();
MPI_Send( &para, 1, DT_PARAMETERS_2_1, peer, TAG_WORK, comm );
}
/* M -> W: Send a set of expressions to be added */
void send_expressions_to_add( std::vector<std::string>& results, int peer ) {
/* Fill a bogus parameter object */
int nb = results.size();
int i;
parameters_t p( nb, 0, 0 );
char* expr;
MPI_Send( &p, 1, DT_PARAMETERS, peer, TAG_ADD, MPI_COMM_WORLD );
/* Send the length of each string */
unsigned int* lengths = (unsigned int*) malloc( nb*sizeof( unsigned int ) );
for( i = 0 ; i < nb ; i++ ) {
lengths[i] = results[i].length();
}
MPI_Send( lengths, nb, MPI_UNSIGNED, peer, TAG_ADD, MPI_COMM_WORLD );
/* Send the strings (should be nicely pipelined) */
for( i = 0 ; i < nb ; i++ ) {
expr = const_cast<char*>( results[i].c_str() );
MPI_Send( expr, results[i].length(), MPI_CHAR, peer, TAG_ADD, MPI_COMM_WORLD );
}
results.clear();
free( lengths );
}
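/* Hypothetical receive-side counterpart to send_expressions_to_add (a sketch,
 * not part of the original file). It assumes the worker has already received
 * the parameters_t header sent with TAG_ADD, whose a4 field carries the number
 * of expressions (see the constructor call above), and that ROOT names the
 * master's rank, as in send_result below. */
gi::ex receive_expressions_to_add_sketch( int nb, gi::lst symbols, MPI_Comm comm ){
    std::vector<std::string> expressions;
    /* Receive the length of each string */
    unsigned int* lengths = (unsigned int*) malloc( nb*sizeof( unsigned int ) );
    MPI_Recv( lengths, nb, MPI_UNSIGNED, ROOT, TAG_ADD, comm, MPI_STATUS_IGNORE );
    /* Receive the strings themselves */
    for( int i = 0 ; i < nb ; i++ ) {
        std::string s( lengths[i], '\0' );
        MPI_Recv( &s[0], lengths[i], MPI_CHAR, ROOT, TAG_ADD, comm, MPI_STATUS_IGNORE );
        expressions.push_back( s );
    }
    free( lengths );
    /* De-linearize and sum them with the helper defined above */
    return add_expressions( expressions, symbols );
}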
/* M -> W: Send either a set of expressions to add, or the end signal */
void send_add_or_end_addslave( std::vector<std::string>& results, int peer, int* running ){
/* Do I have a lot of results to be treated in the result queue? */
if( results.size() > maxresult ) {
/* if the result queue is too big, send it */
send_expressions_to_add( results, peer );
} else {
send_end( peer, MPI_COMM_WORLD );
}
}
/* M -> W: Send work: either a set of expressions to add, or a parameter set */
void send_work_addslave( std::vector<parameters_t>& input, std::vector<std::string>& results, int peer ) {
if( results.size() > maxresult ) {
/* if the result queue is too big, send it */
send_expressions_to_add( results, peer );
} else {
send_work( input, peer, MPI_COMM_WORLD );
}
}
/* W -> M: send the result of a computation */
void send_result( gi::ex T, MPI_Comm comm ){
unsigned int len;
char* expr_c;
if( T.is_zero() ) {
len = 0;
expr_c = NULL; // silence g++'s warning about a possibly uninitialized use below
} else {
/* linearize the result */
std::string expr = linearize_expression( T );
/* Send it to the master */
len = expr.length() + 1;
expr_c = (char*) malloc( len );
memcpy( expr_c, expr.c_str(), len-1 );
expr_c[len-1] = '\0'; // std::string::length() does not count a terminating \0, so add one explicitly
}
MPI_Send( &len, 1, MPI_UNSIGNED, ROOT, TAG_RES, comm );
if( len != 0 ) {
MPI_Send( expr_c, len, MPI_CHAR, ROOT, TAG_EXPR, comm );
free( expr_c );
}
}
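/* Hypothetical master-side counterpart to send_result (a sketch, not part of
 * the original file): receive the length with TAG_RES and, if it is non-zero,
 * the linearized expression with TAG_EXPR, then rebuild the GiNaC expression. */
gi::ex receive_result_sketch( int peer, gi::lst symbols, MPI_Comm comm ){
    unsigned int len;
    MPI_Recv( &len, 1, MPI_UNSIGNED, peer, TAG_RES, comm, MPI_STATUS_IGNORE );
    if( 0 == len ) {
        return gi::ex( 0 );          /* the worker's contribution was zero */
    }
    char* expr_c = (char*) malloc( len );
    MPI_Recv( expr_c, len, MPI_CHAR, peer, TAG_EXPR, comm, MPI_STATUS_IGNORE );
    std::string expr( expr_c );      /* the sender appended the trailing '\0' */
    free( expr_c );
    return de_linearize_expression( expr, symbols );
}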