Skip to content

Commit e7f0d53

Browse files
authored
ADAPT: fallback to previous coll module on non-commutative operations (open-mpi#30)
Signed-off-by: Joseph Schuchart <[email protected]>
1 parent 5c271b9 commit e7f0d53

File tree

4 files changed

+82
-0
lines changed

4 files changed

+82
-0
lines changed

ompi/mca/coll/adapt/coll_adapt.h

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,44 @@ typedef struct mca_coll_adapt_component_t {
7373

7474
} mca_coll_adapt_component_t;
7575

76+
/*
77+
* Structure used to store what is necessary for the collective operations
78+
* routines in case of fallback.
79+
*/
80+
typedef struct mca_coll_adapt_collective_fallback_s {
81+
union {
82+
mca_coll_base_module_reduce_fn_t reduce;
83+
mca_coll_base_module_ireduce_fn_t ireduce;
84+
} previous_routine;
85+
mca_coll_base_module_t *previous_module;
86+
} mca_coll_adapt_collective_fallback_t;
87+
88+
89+
typedef enum mca_coll_adapt_colltype {
90+
ADAPT_REDUCE = 0,
91+
ADAPT_IREDUCE = 1,
92+
ADAPT_COLLCOUNT
93+
} mca_coll_adapt_colltype_t;
94+
95+
/*
96+
* Some defines to stick to the naming used in the other components in terms of
97+
* fallback routines
98+
*/
99+
#define previous_reduce previous_routines[ADAPT_REDUCE].previous_routine.reduce
100+
#define previous_ireduce previous_routines[ADAPT_IREDUCE].previous_routine.ireduce
101+
102+
#define previous_reduce_module previous_routines[ADAPT_REDUCE].previous_module
103+
#define previous_ireduce_module previous_routines[ADAPT_IREDUCE].previous_module
104+
105+
76106
/* Coll adapt module per communicator*/
77107
struct mca_coll_adapt_module_t {
78108
/* Base module */
79109
mca_coll_base_module_t super;
80110

111+
/* To be able to fallback when the cases are not supported */
112+
struct mca_coll_adapt_collective_fallback_s previous_routines[ADAPT_COLLCOUNT];
113+
81114
/* Whether this module has been lazily initialized or not yet */
82115
bool adapt_enabled;
83116
};

ompi/mca/coll/adapt/coll_adapt_ireduce.c

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,18 @@ int ompi_coll_adapt_ireduce(const void *sbuf, void *rbuf, int count, struct ompi
521521
struct ompi_op_t *op, int root, struct ompi_communicator_t *comm,
522522
ompi_request_t ** request, mca_coll_base_module_t * module)
523523
{
524+
525+
/* Fall back to the previously selected module if the operation is non-commutative */
526+
if (!ompi_op_is_commute(op)){
527+
mca_coll_adapt_module_t *adapt_module = (mca_coll_adapt_module_t *) module;
528+
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
529+
"ADAPT cannot handle reduce with this (non-commutative) operation. It needs to fall back on another component\n"));
530+
return adapt_module->previous_ireduce(sbuf, rbuf, count, dtype, op, root,
531+
comm, request,
532+
adapt_module->previous_ireduce_module); /* module saved together with previous_ireduce */
533+
}
534+
535+
524536
OPAL_OUTPUT_VERBOSE((10, mca_coll_adapt_component.adapt_output,
525537
"ireduce root %d, algorithm %d, coll_adapt_ireduce_segment_size %zu, coll_adapt_ireduce_max_send_requests %d, coll_adapt_ireduce_max_recv_requests %d\n",
526538
root, mca_coll_adapt_component.adapt_ireduce_algorithm,

ompi/mca/coll/adapt/coll_adapt_module.c

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,37 @@ OBJ_CLASS_INSTANCE(mca_coll_adapt_module_t,
6969
adapt_module_construct,
7070
adapt_module_destruct);
7171

72+
/*
73+
* In this macro, the following variables are supposed to have been declared
74+
* in the caller:
75+
* . ompi_communicator_t *comm
76+
* . mca_coll_adapt_module_t *adapt_module
77+
*/
78+
#define ADAPT_SAVE_PREV_COLL_API(__api) \
79+
do { \
80+
adapt_module->previous_ ## __api = comm->c_coll->coll_ ## __api; \
81+
adapt_module->previous_ ## __api ## _module = comm->c_coll->coll_ ## __api ## _module; \
82+
if (!comm->c_coll->coll_ ## __api || !comm->c_coll->coll_ ## __api ## _module) { \
83+
opal_output_verbose(1, ompi_coll_base_framework.framework_output, \
84+
"(%d/%s): no underlying " # __api"; disqualifying myself", \
85+
comm->c_contextid, comm->c_name); \
86+
return OMPI_ERROR; \
87+
} \
88+
OBJ_RETAIN(adapt_module->previous_ ## __api ## _module); \
89+
} while(0)
90+
91+
7292
/*
7393
* Init module on the communicator
7494
*/
7595
static int adapt_module_enable(mca_coll_base_module_t * module,
7696
struct ompi_communicator_t *comm)
7797
{
98+
mca_coll_adapt_module_t * adapt_module = (mca_coll_adapt_module_t*) module;
99+
100+
ADAPT_SAVE_PREV_COLL_API(reduce);
101+
ADAPT_SAVE_PREV_COLL_API(ireduce);
102+
78103
return OMPI_SUCCESS;
79104
}
80105

ompi/mca/coll/adapt/coll_adapt_reduce.c

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
* $HEADER$
1010
*/
1111

12+
13+
#include "ompi/op/op.h"
1214
#include "coll_adapt.h"
1315
#include "coll_adapt_algorithms.h"
1416

@@ -17,6 +19,16 @@ int ompi_coll_adapt_reduce(const void *sbuf, void *rbuf, int count, struct ompi_
1719
struct ompi_op_t *op, int root, struct ompi_communicator_t *comm,
1820
mca_coll_base_module_t * module)
1921
{
22+
/* Fall back to the previously selected module if the operation is non-commutative */
23+
if (!ompi_op_is_commute(op)){
24+
mca_coll_adapt_module_t *adapt_module = (mca_coll_adapt_module_t *) module;
25+
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output,
26+
"ADAPT cannot handle reduce with this (non-commutative) operation. It needs to fall back on another component\n"));
27+
return adapt_module->previous_reduce(sbuf, rbuf, count, dtype, op, root,
28+
comm,
29+
adapt_module->previous_reduce_module);
30+
}
31+
2032
ompi_request_t *request = NULL;
2133
int err = ompi_coll_adapt_ireduce(sbuf, rbuf, count, dtype, op, root, comm, &request, module);
2234
if( MPI_SUCCESS != err ) {

0 commit comments

Comments
 (0)