/* mw_addslave.cpp */
#include <iostream>
#include <mpi.h>
#include <ginac/ginac.h>

#include "products.h"
#include "utils_parall.h"
#include "parall_constants.h"
#include "parall_internal.h"
#include "utils.h"
#include "profiling.h"

namespace gi = GiNaC;

/*******************************************************************************
 *         Parallel 1-level decomposition with addition on a slave             *
 *******************************************************************************/
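
/* Overview of the message flow implemented below:
 *   slave  -> master: TAG_PULL (initial request for work), then, for each
 *                     computed product, its serialized length (TAG_RES)
 *                     followed by the expression itself (TAG_EXPR);
 *   master -> slave:  TAG_WORK (one parameters_t describing a product),
 *                     TAG_ADD (a batch of result strings for the slave to
 *                     sum), or TAG_END (terminate).
 */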

gi::ex multiply_1level_master_addslave( tensor3D_t& T, unsigned int size, MPI_Comm comm = MPI_COMM_WORLD ) { 
    gi::ex Tens = 0;
    unsigned int a1, a2, a4;
    gi::ex A;
    gi::lst symbols;

    MPI_Status status;
    char* expr_c;
    size_t expr_c_size = 0;
    int src, np, running = 0;
    unsigned int len;

    MPI_Comm_size( comm, &np );

    expr_c_size = 3279;  /* initial allocation; the buffer grows on demand below */
    expr_c = (char*) malloc( expr_c_size );

    int i, j;
    i = 0;
    j = 0;

    int receivedresults = 0;
    unsigned int N = size/2;

    std::vector<parameters_t> input;
    std::vector<std::string> results; /* serialized expressions received from the slaves */

    double t1 = getTime();
    
    /* Build a list of argument sets */
    
    for( a4 = 0 ; a4 < N ; a4++ ){
        i = i + 1;
        for( a2 = 0 ; a2 < N ; a2++ ){
            j = j + 1;
            for( a1 = 0 ; a1 < N ; a1++ ){
                parameters_t p( a4, a2, a1 );
                input.push_back( p );
            }
        }
    }

    /* Compute the set of symbols: they are needed to parse the serialized
       expressions received from the slaves back into GiNaC objects.
       This could be done while the first slave is working. */
    
    symbols = all_symbols_3D( size );

    double t2 = getTime();
    
    /* Distribute the work */

    while( input.size() > 0 ) {
        MPI_Recv( &len, 1, MPI_UNSIGNED, MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &status );
        
        if( status.MPI_TAG == TAG_PULL ) {
            /* Nothing else is coming from this slave: just send some work */
            src = status.MPI_SOURCE;
            send_work( input, src );
            
        } else {
            if( status.MPI_TAG == TAG_RES ){
                src = status.MPI_SOURCE;

                /* The first message contains the length of what is coming next */
                if( len != 0 ) {
                    if( len > expr_c_size ) {
                        expr_c_size = len;
                        if( NULL != expr_c ) free( expr_c );
                        expr_c = (char*)malloc( expr_c_size ); // The \0 was added by the slave
                    }
                    
                    /* Receive the result */
                    MPI_Recv( expr_c, len, MPI_CHAR, src, TAG_EXPR, comm, &status );

                    /* Put it in the result queue */
                    results.push_back( std::string( expr_c ) );
                    receivedresults++;
                }
                    
                /* Send more work  */
                send_work_addslave( input, results, src );
            } else {
                std::cerr << "Wrong tag received " << status.MPI_TAG << std::endl;
            }
            
        }
    }

    double t3 = getTime();

    /* Wait until everyone is done */

    running = np - 1; // all the slaves are running
    while( running > 0 ) {
        /* TODO: all we should receive here is either TAG_RES, or TAG_PULL if the input was too small */
        MPI_Recv( &len, 1, MPI_UNSIGNED, MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &status );
        src = status.MPI_SOURCE;
        
        if( len != 0 ) {
            if( len > expr_c_size ) {
                expr_c_size = len;
                if( NULL != expr_c ) free( expr_c );
                expr_c = (char*)malloc( expr_c_size ); // The \0 was added by the slave
            }

            /* Receive the result */
            MPI_Recv( expr_c, len, MPI_CHAR, src, TAG_EXPR, comm, &status );

            /* Put it in the result queue */
            results.push_back( std::string( expr_c ) );
            receivedresults++;
        }
        send_add_or_end_addslave( results, src, &running );
    }
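
    /* A minimal sketch of what send_add_or_end_addslave() presumably does (its
       real definition lives in the parallel utility module): when result strings
       are pending, delegate their addition to the idle slave (TAG_ADD), otherwise
       tell it to terminate (TAG_END). The slave-side receive below expects the
       number of strings in the a4 field of a parameters_t, then the lengths,
       then the strings themselves. The _sketch name and the pending-work
       threshold are assumptions, not the actual implementation.

       void send_add_or_end_addslave_sketch( std::vector<std::string>& results,
                                             int src, int* running ) {
           if( results.size() > 1 ) {            // threshold is a guess: leave leftovers to the master
               int nb = results.size();
               parameters_t p( nb, 0, 0 );       // nb travels in the a4 field
               MPI_Send( &p, 1, DT_PARAMETERS, src, TAG_ADD, MPI_COMM_WORLD );
               std::vector<unsigned int> lengths( nb );
               for( int k = 0 ; k < nb ; k++ ) lengths[k] = results[k].size();
               MPI_Send( lengths.data(), nb, MPI_UNSIGNED, src, TAG_ADD, MPI_COMM_WORLD );
               for( int k = 0 ; k < nb ; k++ )
                   MPI_Send( results[k].c_str(), lengths[k], MPI_CHAR, src, TAG_ADD, MPI_COMM_WORLD );
               results.clear();
           } else {
               parameters_t p( 0, 0, 0 );
               MPI_Send( &p, 1, DT_PARAMETERS, src, TAG_END, MPI_COMM_WORLD );
               (*running)--;
           }
       }
    */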

    double t4 = getTime();

    /* Add whatever I have left */
    Tens = add_expressions( results, symbols );
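
    /* A minimal sketch of what add_expressions() presumably does (its real
       definition lives in the utility module): parse each serialized string back
       into a GiNaC expression using the symbol table, and accumulate the sum.
       The _sketch name is hypothetical; the gi::ex( string, lst ) parsing
       constructor is standard GiNaC.

       gi::ex add_expressions_sketch( const std::vector<std::string>& res,
                                      const gi::lst& symbols ) {
           gi::ex sum = 0;
           for( const std::string& s : res ) {
               sum += gi::ex( s, symbols );   // parse, then add
           }
           return sum;
       }
    */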

    double t5 = getTime();
    std::cout << "Init: " << t2 - t1 << std::endl;
    std::cout << "Loop: " << t3 - t2 << std::endl;
    std::cout << "Fini: " << t4 - t3 << std::endl;
    std::cout << "Add:  " << t5 - t4 << std::endl;

#if DEBUG
    std::cout << "Received " << receivedresults << " results" << std::endl;

    std::cout << "Tpara=" << Tens << ";" << std::endl;
#endif
    
    if( NULL != expr_c ) free( expr_c );
    return Tens;
}

void multiply_1level_slave_addslave( tensor3D_t& T, unsigned int size, MPI_Comm comm = MPI_COMM_WORLD ) {
    gi::ex Tens;
    int a1, a2, a4;
    //    gi::ex A;
    unsigned int len = 0;
    
    parameters_t params;
    MPI_Status status;
    char* expr_c;

    int rank;
    MPI_Comm_rank( comm, &rank );

    /* Ask for some work */
    
    MPI_Send( &len, 1, MPI_UNSIGNED, ROOT, TAG_PULL, comm );

    /* Compute the set of symbols */
    
    gi::lst symbols = all_symbols_3D( size );

    while( true ){
        /* Receive a set of parameters */
        MPI_Recv( &params, 1, DT_PARAMETERS, ROOT, MPI_ANY_TAG, comm, &status );
        
        if( status.MPI_TAG == TAG_WORK ){
            a4 = params.a4;
            a2 = params.a2;
            a1 = params.a1;

            Tens = one_level1_product( &T, size, a4, a2, a1 );
            send_result( Tens );

        } else {
            if( status.MPI_TAG == TAG_ADD ) {
                /* Receive a set of expressions to add */

                /* Number of expressions received */
                int nb = params.a4;
                
                /* Length of each string */

                unsigned int* lengths = (unsigned int*) malloc( nb*sizeof( unsigned int ) );
                MPI_Recv( lengths, nb, MPI_UNSIGNED, ROOT, TAG_ADD, comm, &status ); // unsigned: matches the buffer type
                std::vector<std::string> results_s;
                char* c_str;
                int i;
                int len;
                for( i = 0 ; i < nb ; i++ ) {
                    len = lengths[i] + 1;
                    c_str = (char*) malloc( len );
                    MPI_Recv( c_str, len, MPI_CHAR, ROOT, TAG_ADD, comm, &status );
                    c_str[len-1] = '\0';    // The master sends C++ strings, which do not contain the final '\0'
                    results_s.push_back( std::string( c_str ) );
                    free( c_str );
                }
                free( lengths );

                /* Delinearize all the expressions and add them */

                Tens = add_expressions( results_s, symbols );
                
                /* Send the result */

                send_result( Tens );

            } else {
                if( status.MPI_TAG == TAG_END ){
                    return;
                } else {
                    std::cerr << "Wrong tag received on slave " << status.MPI_TAG << std::endl;
                }
            }
        }
    }
}

/* Communication protocol:
   M -> W: always the same size, therefore a single communication
   W -> M: send an unsigned int (the size of the expression), then the expression itself (array of chars)
*/
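
/* A minimal sketch of the W -> M half of this protocol, i.e. what send_result()
   presumably does (its real definition lives in the parallel utility module);
   the serialization format (stream output of the expression) and the _sketch
   name are assumptions:

   void send_result_sketch( const gi::ex& Tens ) {
       std::ostringstream oss;              // needs <sstream>
       oss << Tens;                         // GiNaC expressions print to streams
       std::string s = oss.str();
       unsigned int len = s.size() + 1;     // include the final '\0': the master expects it
       MPI_Send( &len, 1, MPI_UNSIGNED, ROOT, TAG_RES, MPI_COMM_WORLD );
       MPI_Send( s.c_str(), len, MPI_CHAR, ROOT, TAG_EXPR, MPI_COMM_WORLD );
   }
*/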
        
gi::ex multiply_1level_mw_addslave( tensor3D_t& T, int size ) {  // simpler: same dimension everywhere
    int rank;
    gi::ex Tens = 0;
    MPI_Comm_rank( MPI_COMM_WORLD, &rank );

    /* Create a new datatype for the parameters */
    
    create_parameters_datatype();
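
    /* A minimal sketch, assuming parameters_t is a plain struct of three
       unsigned ints (a4, a2, a1), as its use above suggests; the real
       create_parameters_datatype() presumably builds DT_PARAMETERS along
       these lines (the _sketch name is hypothetical):

       MPI_Datatype DT_PARAMETERS;
       void create_parameters_datatype_sketch( void ) {
           MPI_Type_contiguous( 3, MPI_UNSIGNED, &DT_PARAMETERS );
           MPI_Type_commit( &DT_PARAMETERS );
       }
    */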

    /* Here we go */

    if( 0 == rank ) {
        Tens = multiply_1level_master_addslave( T, size );
    } else {
        multiply_1level_slave_addslave( T, size );
    }

    /* Finalize */
    
    free_parameters_dt();
    return Tens;
}