Skip to content
Snippets Groups Projects
utils_parall.cpp 7.08 KiB
Newer Older
  • Learn to ignore specific revisions
  • Camille Coti's avatar
    Camille Coti committed
    #include <iostream>
    
    Camille Coti's avatar
    Camille Coti committed
    #include <sstream>
    
    Camille Coti's avatar
    Camille Coti committed
    #include <mpi.h>
    
    #include "utils_parall.h"
    #include "parall_constants.h"
    #include "tensormatrix.h"
    
    /*******************************************************************************
     *                         Utilities for parallelism                           *
     *******************************************************************************/
    
    /* Serialize a GiNaC expression into a string using its stream output operator,
       so it can be shipped over MPI as plain characters. */
    std::string linearize_expression( gi::ex expr ){
        std::ostringstream buffer;
        buffer << expr;
        return buffer.str();
    }
    
    /* Parse a serialized expression back into a GiNaC ex; names in the string
       are resolved against the given symbol list. */
    gi::ex de_linearize_expression( std::string s, gi::lst symbols ){
        gi::ex parsed( s, symbols );
        return parsed;
    }
    
    
    /* Full (a4, a2, a1) parameter set describing one unit of work.
       NOTE(review): the tail of this constructor was lost in extraction; the a1
       assignment and closing brace are reconstructed — DT_PARAMETERS is three
       MPI_UNSIGNED, which matches three members. Confirm against the header. */
    parameters_t::parameters_t( unsigned int a4,  unsigned int a2, unsigned int a1 ){
        this->a4 = a4;
        this->a2 = a2;
        this->a1 = a1;
    }
    
    /* Single-index parameter set: carries only a4. */
    parameters_s_t::parameters_s_t( unsigned int a4 ) : a4( a4 ) {
    }
    
    
    /* Two-index (a4, a2) parameter set.
       NOTE(review): the closing brace was lost in extraction and is restored here;
       DT_PARAMETERS_2_1 is two MPI_UNSIGNED, matching the two assignments. */
    parameters_2_1_t::parameters_2_1_t( unsigned int a4, unsigned int a2 ){
        this->a4 = a4;
        this->a2 = a2;
    }
    
    /* Four-index (a4, a2, a1, a6) parameter set.
       NOTE(review): the a1 assignment and closing brace were lost in extraction
       and are reconstructed — DT_PARAMETERS_2_2 is four MPI_UNSIGNED, which
       matches four members. Confirm against the header. */
    parameters_2_2_t::parameters_2_2_t( unsigned int a4, unsigned int a2, unsigned int a1, unsigned int a6 ){
        this->a4 = a4;
        this->a2 = a2;
        this->a1 = a1;
        this->a6 = a6;
    }
    
    void create_parameters_datatype(){
    
        MPI_Type_contiguous( 3, MPI_UNSIGNED, &DT_PARAMETERS );
    
    Camille Coti's avatar
    Camille Coti committed
        MPI_Type_commit( &DT_PARAMETERS );
    }
    
    
    /* Build and commit DT_PARAMETERS_S: a single unsigned int,
       mirroring parameters_s_t. Pair with free_parameters_s_dt(). */
    void create_parameters_datatype_s(){
        MPI_Type_contiguous( 1, MPI_UNSIGNED, &DT_PARAMETERS_S );
        MPI_Type_commit( &DT_PARAMETERS_S );
    }
    
    /* Build and commit DT_PARAMETERS_2_1: two contiguous unsigned ints,
       mirroring parameters_2_1_t. Pair with free_parameters_2_1_dt(). */
    void create_parameters_datatype_2_1(){
        MPI_Type_contiguous( 2, MPI_UNSIGNED, &DT_PARAMETERS_2_1 );
        MPI_Type_commit( &DT_PARAMETERS_2_1 );
    }
    
    
    /* Build and commit DT_PARAMETERS_2_2: four contiguous unsigned ints,
       mirroring parameters_2_2_t. Pair with free_parameters_2_2_dt(). */
    void create_parameters_datatype_2_2(){
        MPI_Type_contiguous( 4, MPI_UNSIGNED, &DT_PARAMETERS_2_2 );
        MPI_Type_commit( &DT_PARAMETERS_2_2 );
    }
    
    /* Release the committed DT_PARAMETERS datatype (counterpart of
       create_parameters_datatype). */
    void free_parameters_dt( ){
        MPI_Type_free( &DT_PARAMETERS );
    }
    
    
    /* Release the committed DT_PARAMETERS_2_1 datatype (counterpart of
       create_parameters_datatype_2_1). */
    void free_parameters_2_1_dt( ){
        MPI_Type_free( &DT_PARAMETERS_2_1 );
    }
    
    /* Release the committed DT_PARAMETERS_2_2 datatype (counterpart of
       create_parameters_datatype_2_2). The closing brace was lost in
       extraction and is restored here. */
    void free_parameters_2_2_dt( ){
        MPI_Type_free( &DT_PARAMETERS_2_2 );
    }
    /* Release the committed DT_PARAMETERS_S datatype (counterpart of
       create_parameters_datatype_s). */
    void free_parameters_s_dt( ){
        MPI_Type_free( &DT_PARAMETERS_S );
    }
    
    
    Camille Coti's avatar
    Camille Coti committed
    /* De-linearize every serialized expression and return their sum.
       (Signature kept by-value to match the header declaration.) */
    gi::ex add_expressions( std::vector<std::string> expressions, gi::lst symbols ) {
        gi::ex Tens = 0;
        /* const ref: the original copied each std::string on every iteration */
        for( const auto& s: expressions ) {
            gi::ex received = de_linearize_expression( s, symbols );
            Tens += received;
        }
        return Tens;
    }
    
    /* M -> W: Send the end signal */
    
    void send_end( int peer, MPI_Comm comm ) {
        parameters_t para;
        MPI_Send( &para, 1, DT_PARAMETERS, peer, TAG_END, comm );
    }
    
    
    /* M -> W: end signal, parameters_2_1_t protocol.
       The parameter only selects this overload; its content is not used
       by the receiver — only the TAG_END tag matters. */
    void send_end( int peer, parameters_2_1_t p, MPI_Comm comm ) {
        MPI_Send( &p, 1, DT_PARAMETERS_2_1, peer, TAG_END, comm );
    }
    
    
    /* M -> W: end signal, parameters_s_t protocol.
       The parameter only selects this overload; its content is not used
       by the receiver — only the TAG_END tag matters. */
    void send_end( int peer, parameters_s_t p, MPI_Comm comm ) {
        MPI_Send( &p, 1, DT_PARAMETERS_S, peer, TAG_END, comm );
    }
    
    
    Camille Coti's avatar
    Camille Coti committed
    /* M -> W: signal the end of a batch; only the TAG_END_BATCH tag matters. */
    void send_end_batch( int peer, MPI_Comm comm ) {
        parameters_t para;
        /* NOTE(review): a parameters_t (3 unsigned) is sent with the
           DT_PARAMETERS_2_1 datatype (2 unsigned). Harmless for a pure signal
           whose content is ignored, but the receiver must post a matching
           DT_PARAMETERS_2_1 receive — confirm against the worker code. */
        MPI_Send( &para, 1, DT_PARAMETERS_2_1, peer, TAG_END_BATCH, comm );
    }
    
    /* M -> W: Send some work: just a parameter set */
    
    void send_work( std::vector<parameters_t>& input, int peer, MPI_Comm comm ){
        parameters_t para = input.back();
        input.pop_back();
        MPI_Send( &para, 1, DT_PARAMETERS, peer, TAG_WORK, comm );
    }
    
    
    void send_work( std::vector<parameters_2_1_t>& input, int peer, MPI_Comm comm ){
        parameters_2_1_t para = input.back();
    
    Camille Coti's avatar
    Camille Coti committed
        input.pop_back();
    
        MPI_Send( &para, 1, DT_PARAMETERS_2_1, peer, TAG_WORK, comm );
    
    void send_work( std::vector<parameters_2_2_t>& input, int peer, MPI_Comm comm ){
        parameters_2_2_t para = input.back();
        input.pop_back();
        MPI_Send( &para, 1, DT_PARAMETERS_2_2, peer, TAG_WORK, comm );
    }
    
    
    void send_work( std::vector<parameters_s_t>& input, int peer, MPI_Comm comm ){
        parameters_s_t para = input.back();
        input.pop_back();
        MPI_Send( &para, 1, DT_PARAMETERS_S, peer, TAG_WORK, comm );
    }
    
    
    Camille Coti's avatar
    Camille Coti committed
    /* M -> W: Send a set of expressions to be added */
    
    void send_expressions_to_add( std::vector<std::string>& results, int peer ) {
    
        /* Fill a bogus parameter object */
        int nb = results.size();
        int i;
    
    Camille Coti's avatar
    Camille Coti committed
        char* expr;
    
        MPI_Send( &p, 1, DT_PARAMETERS, peer, TAG_ADD, MPI_COMM_WORLD );
        
        /* Send the length of each string */
        unsigned int* lengths = (unsigned int*) malloc( nb*sizeof( unsigned int ) );
        for( i = 0 ; i < nb ; i++ ) {
            lengths[i] = results[i].length();
        }
        MPI_Send( lengths, nb, MPI_INT, peer, TAG_ADD, MPI_COMM_WORLD );
    
        /* Send the strings (should be nicely pipelined) */
        for( i = 0 ; i < nb ; i++ ) {
            expr = const_cast<char*>( results[i].c_str() );
            MPI_Send( expr, results[i].length(), MPI_CHAR, peer, TAG_ADD, MPI_COMM_WORLD );
        }
    
    Camille Coti's avatar
    Camille Coti committed
        results.erase( results.begin(), results.end() );
       
        free( lengths );
    }
    
    /* M -> W: Send either a set of expressions to add, or the end signal */
    
    
    void send_add_or_end_addslave(  std::vector<std::string>& results, int peer, int* running ){
    
    Camille Coti's avatar
    Camille Coti committed
        
        /* Do I have a lot of results to be treated in the result queue? */
    
    Camille Coti's avatar
    Camille Coti committed
            /* if the result queue is too big, send it */
            send_expressions_to_add( results, peer );
        } else {
            send_end( peer );
    
    /* M -> W (parameters_s_t protocol): flush a too-large result queue to
       <peer> for addition; otherwise signal the end and decrement the count
       of running workers. */
    void send_add_or_end_addslave(  std::vector<std::string>& results, int peer, int* running, parameters_s_t p ){
        const bool queue_too_big = results.size() > maxresult;
        if( queue_too_big ) {
            /* large backlog: ship it to the add-slave */
            send_expressions_to_add( results, peer );
        } else {
            send_end( peer, p );
            (*running)--;
        }
    }
    
    
    Camille Coti's avatar
    Camille Coti committed
    /* M -> W: Send work: either a set of expressions to add, or a parameter set */
    
    void send_work_addslave(  std::vector<parameters_t>& input, std::vector<std::string>& results, int peer ) {
    
    
            /* if the result queue is too big, send it */
            send_expressions_to_add( results, peer );
        } else {
            send_work( input, peer );
        }    
    }
    
    void send_work_addslave(  std::vector<parameters_s_t>& input, std::vector<std::string>& results, int peer ) {
    
        if( results.size() > maxresult ) {
    
    Camille Coti's avatar
    Camille Coti committed
            /* if the result queue is too big, send it */
            send_expressions_to_add( results, peer );
        } else {
            send_work( input, peer );
        }    
    }
    
    /* W -> M: send the result of a computation */
    
    /* W -> M: send the result of a computation to the root.
       Wire format: the length (unsigned, 0 if the result is zero) with tag
       TAG_RES, then — only when non-zero — the NUL-terminated serialized
       expression (len chars, including the '\0') with tag TAG_EXPR.
       The original malloc/memcpy copy is gone: std::string::c_str() is already
       NUL-terminated, and the file never included <cstring>/<cstdlib> for
       memcpy/malloc in the first place. */
    void send_result( gi::ex T, MPI_Comm comm ){
        unsigned int len;
        std::string expr;   /* must outlive the second MPI_Send */

        if( T.is_zero() ) {
            len = 0;   /* zero result: send only the length, no expression message */
        } else {
            /* linearize the result */
            expr = linearize_expression( T );
            len = expr.length() + 1;   /* +1: ship the terminating '\0' too */
        }

        MPI_Send( &len, 1, MPI_UNSIGNED, ROOT, TAG_RES, comm );
        if( len != 0 ) {
            MPI_Send( const_cast<char*>( expr.c_str() ), len, MPI_CHAR, ROOT, TAG_EXPR, comm );
        }
    }