Skip to content

Commit 163deaa

Browse files
Thananon Patinyasakdikul authored and bosilca committed
Enable Threading in the BTL TCP
Added an MCA parameter to turn the progress thread on/off. Added a flag to check whether a BTL progress thread is in use. Added a macro for the ob1 matching lock. Updated the AUTHORS file.
1 parent 0150d71 commit 163deaa

File tree

12 files changed

+193
-131
lines changed

12 files changed

+193
-131
lines changed

AUTHORS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ [email protected] Sylvain Jeaugey Bull
108108
[email protected] Terry Dontje Sun, Oracle
109109
[email protected] Todd Kordenbrock SNL
110110
[email protected] Tim Mattox IU, Cisco
111+
[email protected] Thananon Patinyasakdikul UTK
111112
[email protected] Tim Prins IU, LANL
112113
[email protected] Tim Woodall LANL
113114
[email protected] Vasily Filipov Mellanox

ompi/mca/pml/ob1/pml_ob1.h

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ typedef struct mca_pml_ob1_t mca_pml_ob1_t;
8989

9090
extern mca_pml_ob1_t mca_pml_ob1;
9191
extern int mca_pml_ob1_output;
92-
92+
extern bool mca_pml_ob1_matching_protection;
9393
/*
9494
* PML interface functions.
9595
*/
@@ -261,7 +261,25 @@ do { \
261261
OPAL_THREAD_UNLOCK(&mca_pml_ob1.lock); \
262262
} while(0)
263263

264+
#define OB1_MATCHING_LOCK(lock) \
265+
do { \
266+
if( mca_pml_ob1_matching_protection ) { \
267+
opal_mutex_lock(lock); \
268+
} \
269+
else { OPAL_THREAD_LOCK(lock); } \
270+
} while(0)
271+
272+
273+
#define OB1_MATCHING_UNLOCK(lock) \
274+
do { \
275+
if( mca_pml_ob1_matching_protection ) { \
276+
opal_mutex_unlock(lock); \
277+
} \
278+
else { OPAL_THREAD_UNLOCK(lock); } \
279+
} while(0)
280+
264281

282+
265283
int mca_pml_ob1_send_fin(ompi_proc_t* proc, mca_bml_base_btl_t* bml_btl,
266284
opal_ptr_t hdr_frag, uint64_t size, uint8_t order, int status);
267285

ompi/mca/pml/ob1/pml_ob1_component.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ mca_pml_ob1_component_init( int* priority, bool enable_progress_threads,
5454
static int mca_pml_ob1_component_fini(void);
5555
int mca_pml_ob1_output = 0;
5656
static int mca_pml_ob1_verbose = 0;
57+
bool mca_pml_ob1_matching_protection = false;
5758

5859
mca_pml_base_component_2_0_0_t mca_pml_ob1_component = {
5960
/* First, the mca_base_component_t struct containing meta
@@ -277,10 +278,15 @@ mca_pml_ob1_component_init( int* priority,
277278
OPAL_LIST_FOREACH(selected_btl, &mca_btl_base_modules_initialized, mca_btl_base_selected_module_t) {
278279
mca_btl_base_module_t *btl = selected_btl->btl_module;
279280

281+
if (btl->btl_flags & MCA_BTL_FLAGS_BTL_PROGRESS_THREAD_ENABLED) {
282+
mca_pml_ob1_matching_protection = true;
283+
}
284+
280285
if (btl->btl_flags & MCA_BTL_FLAGS_SINGLE_ADD_PROCS) {
281286
mca_pml_ob1.super.pml_flags |= MCA_PML_BASE_FLAG_REQUIRE_WORLD;
282287
break;
283288
}
289+
284290
}
285291

286292
/* Set this here (vs in component_open()) because

ompi/mca/pml/ob1/pml_ob1_recvfrag.c

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ void mca_pml_ob1_recv_frag_callback_match(mca_btl_base_module_t* btl,
161161
* end points) from being processed, and potentially "losing"
162162
* the fragment.
163163
*/
164-
OPAL_THREAD_LOCK(&comm->matching_lock);
164+
OB1_MATCHING_LOCK(&comm->matching_lock);
165165

166166
/* get sequence number of next message that can be processed */
167167
if(OPAL_UNLIKELY((((uint16_t) hdr->hdr_seq) != ((uint16_t) proc->expected_sequence)) ||
@@ -194,7 +194,7 @@ void mca_pml_ob1_recv_frag_callback_match(mca_btl_base_module_t* btl,
194194
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
195195

196196
/* release matching lock before processing fragment */
197-
OPAL_THREAD_UNLOCK(&comm->matching_lock);
197+
OB1_MATCHING_UNLOCK(&comm->matching_lock);
198198

199199
if(OPAL_LIKELY(match)) {
200200
bytes_received = segments->seg_len - OMPI_PML_OB1_MATCH_HDR_LEN;
@@ -247,7 +247,7 @@ void mca_pml_ob1_recv_frag_callback_match(mca_btl_base_module_t* btl,
247247
return;
248248

249249
slow_path:
250-
OPAL_THREAD_UNLOCK(&comm->matching_lock);
250+
OB1_MATCHING_UNLOCK(&comm->matching_lock);
251251
mca_pml_ob1_recv_frag_match(btl, hdr, segments,
252252
num_segments, MCA_PML_OB1_HDR_TYPE_MATCH);
253253
}
@@ -668,7 +668,7 @@ static int mca_pml_ob1_recv_frag_match( mca_btl_base_module_t *btl,
668668
* end points) from being processed, and potentially "losing"
669669
* the fragment.
670670
*/
671-
OPAL_THREAD_LOCK(&comm->matching_lock);
671+
OB1_MATCHING_LOCK(&comm->matching_lock);
672672

673673
/* get sequence number of next message that can be processed */
674674
next_msg_seq_expected = (uint16_t)proc->expected_sequence;
@@ -704,7 +704,7 @@ static int mca_pml_ob1_recv_frag_match( mca_btl_base_module_t *btl,
704704
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
705705

706706
/* release matching lock before processing fragment */
707-
OPAL_THREAD_UNLOCK(&comm->matching_lock);
707+
OB1_MATCHING_UNLOCK(&comm->matching_lock);
708708

709709
if(OPAL_LIKELY(match)) {
710710
switch(type) {
@@ -729,7 +729,7 @@ static int mca_pml_ob1_recv_frag_match( mca_btl_base_module_t *btl,
729729
* may now be used to form new matches
730730
*/
731731
if(OPAL_UNLIKELY(opal_list_get_size(&proc->frags_cant_match) > 0)) {
732-
OPAL_THREAD_LOCK(&comm->matching_lock);
732+
OB1_MATCHING_LOCK(&comm->matching_lock);
733733
if((frag = check_cantmatch_for_match(proc))) {
734734
hdr = &frag->hdr.hdr_match;
735735
segments = frag->segments;
@@ -738,7 +738,7 @@ static int mca_pml_ob1_recv_frag_match( mca_btl_base_module_t *btl,
738738
type = hdr->hdr_common.hdr_type;
739739
goto out_of_order_match;
740740
}
741-
OPAL_THREAD_UNLOCK(&comm->matching_lock);
741+
OB1_MATCHING_UNLOCK(&comm->matching_lock);
742742
}
743743

744744
return OMPI_SUCCESS;
@@ -749,7 +749,7 @@ static int mca_pml_ob1_recv_frag_match( mca_btl_base_module_t *btl,
749749
*/
750750
append_frag_to_list(&proc->frags_cant_match, btl, hdr, segments,
751751
num_segments, NULL);
752-
OPAL_THREAD_UNLOCK(&comm->matching_lock);
752+
OB1_MATCHING_UNLOCK(&comm->matching_lock);
753753
return OMPI_SUCCESS;
754754
}
755755

ompi/mca/pml/ob1/pml_ob1_recvreq.c

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ static int mca_pml_ob1_recv_request_cancel(struct ompi_request_t* ompi_request,
104104
mca_pml_ob1_comm_t *ob1_comm = comm->c_pml_comm;
105105

106106
/* The rest should be protected behind the match logic lock */
107-
OPAL_THREAD_LOCK(&ob1_comm->matching_lock);
107+
OB1_MATCHING_LOCK(&ob1_comm->matching_lock);
108108
if( true == request->req_match_received ) { /* way to late to cancel this one */
109109
OPAL_THREAD_UNLOCK(&ob1_comm->matching_lock);
110110
assert( OMPI_ANY_TAG != ompi_request->req_status.MPI_TAG ); /* not matched isn't it */
@@ -124,7 +124,7 @@ static int mca_pml_ob1_recv_request_cancel(struct ompi_request_t* ompi_request,
124124
* to true. Otherwise, the request will never be freed.
125125
*/
126126
request->req_recv.req_base.req_pml_complete = true;
127-
OPAL_THREAD_UNLOCK(&ob1_comm->matching_lock);
127+
OB1_MATCHING_UNLOCK(&ob1_comm->matching_lock);
128128

129129
OPAL_THREAD_LOCK(&ompi_request_lock);
130130
ompi_request->req_status._cancelled = true;
@@ -1177,7 +1177,7 @@ void mca_pml_ob1_recv_req_start(mca_pml_ob1_recv_request_t *req)
11771177

11781178
MCA_PML_BASE_RECV_START(&req->req_recv.req_base);
11791179

1180-
OPAL_THREAD_LOCK(&ob1_comm->matching_lock);
1180+
OB1_MATCHING_LOCK(&ob1_comm->matching_lock);
11811181
/**
11821182
* The lapse of time between the ACTIVATE event and the SEARCH_UNEX one includes
11831183
* the cost of the request lock.
@@ -1219,7 +1219,7 @@ void mca_pml_ob1_recv_req_start(mca_pml_ob1_recv_request_t *req)
12191219
it when the message comes in. */
12201220
append_recv_req_to_queue(queue, req);
12211221
req->req_match_received = false;
1222-
OPAL_THREAD_UNLOCK(&ob1_comm->matching_lock);
1222+
OB1_MATCHING_UNLOCK(&ob1_comm->matching_lock);
12231223
} else {
12241224
if(OPAL_LIKELY(!IS_PROB_REQ(req))) {
12251225
PERUSE_TRACE_COMM_EVENT(PERUSE_COMM_REQ_MATCH_UNEX,
@@ -1237,7 +1237,7 @@ void mca_pml_ob1_recv_req_start(mca_pml_ob1_recv_request_t *req)
12371237

12381238
opal_list_remove_item(&proc->unexpected_frags,
12391239
(opal_list_item_t*)frag);
1240-
OPAL_THREAD_UNLOCK(&ob1_comm->matching_lock);
1240+
OB1_MATCHING_UNLOCK(&ob1_comm->matching_lock);
12411241

12421242
switch(hdr->hdr_common.hdr_type) {
12431243
case MCA_PML_OB1_HDR_TYPE_MATCH:
@@ -1267,14 +1267,14 @@ void mca_pml_ob1_recv_req_start(mca_pml_ob1_recv_request_t *req)
12671267
restarted with this request during mrecv */
12681268
opal_list_remove_item(&proc->unexpected_frags,
12691269
(opal_list_item_t*)frag);
1270-
OPAL_THREAD_UNLOCK(&ob1_comm->matching_lock);
1270+
OB1_MATCHING_UNLOCK(&ob1_comm->matching_lock);
12711271

12721272
req->req_recv.req_base.req_addr = frag;
12731273
mca_pml_ob1_recv_request_matched_probe(req, frag->btl,
12741274
frag->segments, frag->num_segments);
12751275

12761276
} else {
1277-
OPAL_THREAD_UNLOCK(&ob1_comm->matching_lock);
1277+
OB1_MATCHING_UNLOCK(&ob1_comm->matching_lock);
12781278
mca_pml_ob1_recv_request_matched_probe(req, frag->btl,
12791279
frag->segments, frag->num_segments);
12801280
}

opal/mca/btl/btl.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,9 @@ typedef uint8_t mca_btl_base_tag_t;
241241
* BTLs should not set this flag. */
242242
#define MCA_BTL_FLAGS_SINGLE_ADD_PROCS 0x20000
243243

244+
/* The BTL is using a progress thread and needs protection on matching */
245+
#define MCA_BTL_FLAGS_BTL_PROGRESS_THREAD_ENABLED 0x40000
246+
244247
/* Default exclusivity levels */
245248
#define MCA_BTL_EXCLUSIVITY_HIGH (64*1024) /* internal loopback */
246249
#define MCA_BTL_EXCLUSIVITY_DEFAULT 1024 /* GM/IB/etc. */

opal/mca/btl/tcp/btl_tcp.h

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -43,19 +43,20 @@
4343

4444
/* Open MPI includes */
4545
#include "opal/mca/event/event.h"
46-
#include "ompi/class/ompi_free_list.h"
47-
#include "ompi/mca/btl/btl.h"
48-
#include "ompi/mca/btl/base/base.h"
49-
#include "ompi/mca/mpool/mpool.h"
46+
#include "opal/class/opal_free_list.h"
47+
#include "opal/mca/btl/btl.h"
48+
#include "opal/mca/btl/base/base.h"
49+
#include "opal/mca/mpool/mpool.h"
5050
#include "opal/class/opal_hash_table.h"
51+
#include "opal/util/fd.h"
5152

5253
#define MCA_BTL_TCP_STATISTICS 0
5354
BEGIN_C_DECLS
5455

5556
#if (HAVE_PTHREAD_H == 1)
56-
#define MCA_BTL_TCP_USES_PROGRESS_THREAD 1
57+
#define MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD 1
5758
#else
58-
#define MCA_BTL_TCP_USES_PROGRESS_THREAD 0
59+
#define MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD 0
5960
#endif /* (HAVE_PTHREAD_H == 1) */
6061

6162
extern opal_event_base_t* mca_btl_tcp_event_base;
@@ -80,10 +81,11 @@ extern opal_event_base_t* mca_btl_tcp_event_base;
8081
} \
8182
} while (0)
8283

83-
#if MCA_BTL_TCP_USES_PROGRESS_THREAD
84+
#if MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD
8485
extern opal_list_t mca_btl_tcp_ready_frag_pending_queue;
8586
extern opal_mutex_t mca_btl_tcp_ready_frag_mutex;
8687
extern int mca_btl_tcp_pipe_to_progress[2];
88+
extern int mca_btl_tcp_progress_thread_trigger;
8789

8890
#define MCA_BTL_TCP_CRITICAL_SECTION_ENTER(name) \
8991
opal_mutex_atomic_lock((name))
@@ -109,9 +111,14 @@ extern int mca_btl_tcp_pipe_to_progress[2];
109111
} while (0)
110112
#define MCA_BTL_TCP_ACTIVATE_EVENT(event, value) \
111113
do { \
112-
opal_event_t* _event = (opal_event_t*)(event); \
113-
opal_fd_write( mca_btl_tcp_pipe_to_progress[1], sizeof(opal_event_t*), \
114-
&_event); \
114+
if(0 < mca_btl_tcp_progress_thread_trigger) { \
115+
opal_event_t* _event = (opal_event_t*)(event); \
116+
opal_fd_write( mca_btl_tcp_pipe_to_progress[1], sizeof(opal_event_t*), \
117+
&_event); \
118+
} \
119+
else { \
120+
opal_event_add(event, (value)); \
121+
} \
115122
} while (0)
116123
#else
117124
#define MCA_BTL_TCP_CRITICAL_SECTION_ENTER(name)
@@ -124,7 +131,7 @@ extern int mca_btl_tcp_pipe_to_progress[2];
124131
do { \
125132
opal_event_add(event, (value)); \
126133
} while (0)
127-
#endif /* MCA_BTL_TCP_USES_PROGRESS_THREAD */
134+
#endif /* MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD */
128135

129136
/**
130137
* TCP BTL component.
@@ -143,6 +150,7 @@ struct mca_btl_tcp_component_t {
143150
int tcp_endpoint_cache; /**< amount of cache on each endpoint */
144151
opal_proc_table_t tcp_procs; /**< hash table of tcp proc structures */
145152
opal_mutex_t tcp_lock; /**< lock for accessing module state */
153+
opal_list_t tcp_events;
146154

147155
opal_event_t tcp_recv_event; /**< recv event for IPv4 listen socket */
148156
int tcp_listen_sd; /**< IPv4 listen socket for incoming connection requests */
@@ -169,7 +177,9 @@ struct mca_btl_tcp_component_t {
169177
opal_free_list_t tcp_frag_max;
170178
opal_free_list_t tcp_frag_user;
171179

172-
#if MCA_BTL_TCP_USES_PROGRESS_THREAD
180+
int tcp_enable_progress_thread; /** Support for tcp progress thread flag */
181+
182+
#if MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD
173183
opal_event_t tcp_recv_thread_async_event;
174184
opal_mutex_t tcp_frag_eager_mutex;
175185
opal_mutex_t tcp_frag_max_mutex;

0 commit comments

Comments
 (0)