
Commit dc538e9

Merge pull request #1177 from bosilca/topic/large_msg
Topic/large msg
2 parents: 62739c6 + d10522a, commit dc538e9

File tree: 10 files changed, +50 -46 lines

ompi/mca/pml/ob1/pml_ob1.h

Lines changed: 2 additions & 2 deletions

@@ -55,8 +55,8 @@ struct mca_pml_ob1_t {
     int free_list_num;   /* initial size of free list */
     int free_list_max;   /* maximum size of free list */
     int free_list_inc;   /* number of elements to grow free list */
-    size_t send_pipeline_depth;
-    size_t recv_pipeline_depth;
+    int32_t send_pipeline_depth;
+    int32_t recv_pipeline_depth;
     size_t rdma_retries_limit;
     int max_rdma_per_request;
     int max_send_per_range;
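Note: the pipeline depth fields only count fragments currently in flight, a number bounded by the small send_pipeline_depth/recv_pipeline_depth MCA parameters, so 32 bits are more than enough; narrowing them to int32_t lets the PML update them with the plain 32-bit atomic macros (OPAL_THREAD_ADD32) used in the .c changes below. A minimal sketch of that pattern, written with C11 atomics as a stand-in for the OPAL macros (the names here are illustrative, not the OPAL API):

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

/* Illustrative stand-in for the ob1 pipeline throttle: a 32-bit in-flight
 * counter updated atomically, compared against a small configured limit
 * (e.g. the send_pipeline_depth MCA parameter, default 3). */
typedef struct {
    _Atomic int32_t depth;   /* fragments currently in flight */
    int32_t         limit;   /* e.g. mca_pml_ob1.send_pipeline_depth */
} pipeline_t;

static bool pipeline_try_start(pipeline_t *p)
{
    if (atomic_load(&p->depth) >= p->limit) {
        return false;                 /* throttled: too many fragments outstanding */
    }
    atomic_fetch_add(&p->depth, 1);   /* analogous to OPAL_THREAD_ADD32(&depth, 1) */
    return true;
}

static void pipeline_fragment_complete(pipeline_t *p)
{
    atomic_fetch_add(&p->depth, -1);  /* analogous to OPAL_THREAD_ADD32(&depth, -1) */
}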

ompi/mca/pml/ob1/pml_ob1_component.c

Lines changed: 2 additions & 2 deletions

@@ -184,8 +184,8 @@ static int mca_pml_ob1_component_register(void)
     mca_pml_ob1_param_register_int("free_list_max", -1, &mca_pml_ob1.free_list_max);
     mca_pml_ob1_param_register_int("free_list_inc", 64, &mca_pml_ob1.free_list_inc);
     mca_pml_ob1_param_register_int("priority", 20, &mca_pml_ob1.priority);
-    mca_pml_ob1_param_register_sizet("send_pipeline_depth", 3, &mca_pml_ob1.send_pipeline_depth);
-    mca_pml_ob1_param_register_sizet("recv_pipeline_depth", 4, &mca_pml_ob1.recv_pipeline_depth);
+    mca_pml_ob1_param_register_int("send_pipeline_depth", 3, &mca_pml_ob1.send_pipeline_depth);
+    mca_pml_ob1_param_register_int("recv_pipeline_depth", 4, &mca_pml_ob1.recv_pipeline_depth);

     /* NTH: we can get into a live-lock situation in the RDMA failure path so disable
        RDMA retries for now. Falling back to send may suck but it is better than

ompi/mca/pml/ob1/pml_ob1_recvreq.c

Lines changed: 8 additions & 12 deletions

@@ -190,15 +190,15 @@ static void mca_pml_ob1_put_completion (mca_pml_ob1_rdma_frag_t *frag, int64_t r
     mca_pml_ob1_recv_request_t* recvreq = (mca_pml_ob1_recv_request_t *) frag->rdma_req;
     mca_bml_base_btl_t *bml_btl = frag->rdma_bml;

-    OPAL_THREAD_SUB_SIZE_T(&recvreq->req_pipeline_depth, 1);
+    OPAL_THREAD_ADD32(&recvreq->req_pipeline_depth, -1);

     MCA_PML_OB1_RDMA_FRAG_RETURN(frag);

     if (OPAL_LIKELY(0 < rdma_size)) {
         assert ((uint64_t) rdma_size == frag->rdma_length);

         /* check completion status */
-        OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, (size_t) rdma_size);
+        OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, rdma_size);
         if (recv_request_pml_complete_check(recvreq) == false &&
             recvreq->req_rdma_offset < recvreq->req_send_offset) {
             /* schedule additional rdma operations */

@@ -951,7 +951,7 @@ int mca_pml_ob1_recv_request_schedule_once( mca_pml_ob1_recv_request_t* recvreq,
     }

     while(bytes_remaining > 0 &&
-          recvreq->req_pipeline_depth < mca_pml_ob1.recv_pipeline_depth) {
+          recvreq->req_pipeline_depth < mca_pml_ob1.recv_pipeline_depth) {
         mca_pml_ob1_rdma_frag_t *frag = NULL;
         mca_btl_base_module_t *btl;
         int rc, rdma_idx;

@@ -983,14 +983,10 @@ int mca_pml_ob1_recv_request_schedule_once( mca_pml_ob1_recv_request_t* recvreq,
         } while(!size);
         btl = bml_btl->btl;

-        /* NTH: This conditional used to check if there was a registration in
-         * recvreq->req_rdma[rdma_idx].btl_reg. If once existed it was due to
-         * the btl not needed registration (equivalent to btl->btl_register_mem
-         * != NULL. This new check is equivalent. Note: I feel this protocol
-         * needs work to better improve resource usage when running with a
-         * leave pinned protocol. */
-        if (btl->btl_register_mem && (btl->btl_rdma_pipeline_frag_size != 0) &&
-            (size > btl->btl_rdma_pipeline_frag_size)) {
+        /* NTH: Note: I feel this protocol needs work to better improve resource
+         * usage when running with a leave pinned protocol. */
+        /* GB: We should always abide by the BTL RDMA pipeline fragment limit (if one is set) */
+        if ((btl->btl_rdma_pipeline_frag_size != 0) && (size > btl->btl_rdma_pipeline_frag_size)) {
             size = btl->btl_rdma_pipeline_frag_size;
         }

@@ -1028,7 +1024,7 @@ int mca_pml_ob1_recv_request_schedule_once( mca_pml_ob1_recv_request_t* recvreq,
         if (OPAL_LIKELY(OMPI_SUCCESS == rc)) {
             /* update request state */
             recvreq->req_rdma_offset += size;
-            OPAL_THREAD_ADD_SIZE_T(&recvreq->req_pipeline_depth, 1);
+            OPAL_THREAD_ADD32(&recvreq->req_pipeline_depth, 1);
             recvreq->req_rdma[rdma_idx].length -= size;
             bytes_remaining -= size;
         } else {
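Note: the behavioral change in the scheduler is that every RDMA fragment is now clamped to the BTL's btl_rdma_pipeline_frag_size whenever that limit is non-zero, not only when the BTL requires memory registration. A self-contained sketch of that slicing logic follows; the 2^31 - 1024 value mirrors the TCP BTL default set later in this commit, but the function itself is illustrative, not ob1 code:

#include <stdint.h>
#include <stdio.h>

/* Hypothetical fragment limit, matching the TCP BTL default set in this commit. */
#define BTL_RDMA_PIPELINE_FRAG_SIZE (((uint64_t)1 << 31) - 1024)

static void schedule_rdma(uint64_t bytes_remaining)
{
    while (bytes_remaining > 0) {
        uint64_t size = bytes_remaining;

        /* Always abide by the BTL RDMA pipeline fragment limit (if one is set). */
        if (BTL_RDMA_PIPELINE_FRAG_SIZE != 0 && size > BTL_RDMA_PIPELINE_FRAG_SIZE) {
            size = BTL_RDMA_PIPELINE_FRAG_SIZE;
        }

        printf("issue RDMA fragment of %llu bytes\n", (unsigned long long) size);
        bytes_remaining -= size;
    }
}

int main(void)
{
    /* A 5 GiB message splits into three fragments under the 2^31 - 1024 limit. */
    schedule_rdma((uint64_t)5 * 1024 * 1024 * 1024);
    return 0;
}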

ompi/mca/pml/ob1/pml_ob1_recvreq.h

Lines changed: 6 additions & 6 deletions

@@ -41,12 +41,12 @@ BEGIN_C_DECLS
 struct mca_pml_ob1_recv_request_t {
     mca_pml_base_recv_request_t req_recv;
     opal_ptr_t remote_req_send;
-    int32_t req_lock;
-    size_t  req_pipeline_depth;
-    size_t  req_bytes_received;  /**< amount of data transferred into the user buffer */
-    size_t  req_bytes_expected;  /**< local size of the data as suggested by the user */
-    size_t  req_rdma_offset;
-    size_t  req_send_offset;
+    int32_t req_lock;
+    int32_t req_pipeline_depth;
+    size_t  req_bytes_received;  /**< amount of data transferred into the user buffer */
+    size_t  req_bytes_expected;  /**< local size of the data as suggested by the user */
+    size_t  req_rdma_offset;
+    size_t  req_send_offset;
     uint32_t req_rdma_cnt;
     uint32_t req_rdma_idx;
     bool req_pending;

ompi/mca/pml/ob1/pml_ob1_sendreq.c

Lines changed: 5 additions & 5 deletions

@@ -313,7 +313,7 @@ mca_pml_ob1_frag_completion( mca_btl_base_module_t* btl,
                              des->des_segment_count,
                              sizeof(mca_pml_ob1_frag_hdr_t));

-    OPAL_THREAD_SUB_SIZE_T(&sendreq->req_pipeline_depth, 1);
+    OPAL_THREAD_ADD32(&sendreq->req_pipeline_depth, -1);
     OPAL_THREAD_ADD_SIZE_T(&sendreq->req_bytes_delivered, req_bytes_delivered);

     if(send_request_pml_complete_check(sendreq) == false) {

@@ -913,13 +913,13 @@ mca_pml_ob1_send_request_schedule_once(mca_pml_ob1_send_request_t* sendreq)

     /* check pipeline_depth here before attempting to get any locks */
     if(true == sendreq->req_throttle_sends &&
-       sendreq->req_pipeline_depth >= mca_pml_ob1.send_pipeline_depth)
+       sendreq->req_pipeline_depth >= mca_pml_ob1.send_pipeline_depth)
         return OMPI_SUCCESS;

     range = get_send_range(sendreq);

     while(range && (false == sendreq->req_throttle_sends ||
-           sendreq->req_pipeline_depth < mca_pml_ob1.send_pipeline_depth)) {
+           sendreq->req_pipeline_depth < mca_pml_ob1.send_pipeline_depth)) {
         mca_pml_ob1_frag_hdr_t* hdr;
         mca_btl_base_descriptor_t* des;
         int rc, btl_idx;

@@ -1044,7 +1044,7 @@ mca_pml_ob1_send_request_schedule_once(mca_pml_ob1_send_request_t* sendreq)
            range->range_btls[btl_idx].length -= size;
            range->range_send_length -= size;
            range->range_send_offset += size;
-           OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth, 1);
+           OPAL_THREAD_ADD32(&sendreq->req_pipeline_depth, 1);
            if(range->range_send_length == 0) {
                range = get_next_send_range(sendreq, range);
                prev_bytes_remaining = 0;

@@ -1060,7 +1060,7 @@ mca_pml_ob1_send_request_schedule_once(mca_pml_ob1_send_request_t* sendreq)
            range->range_btls[btl_idx].length -= size;
            range->range_send_length -= size;
            range->range_send_offset += size;
-           OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth, 1);
+           OPAL_THREAD_ADD32(&sendreq->req_pipeline_depth, 1);
            if(range->range_send_length == 0) {
                range = get_next_send_range(sendreq, range);
                prev_bytes_remaining = 0;

ompi/mca/pml/ob1/pml_ob1_sendreq.h

Lines changed: 5 additions & 5 deletions

@@ -45,11 +45,11 @@ struct mca_pml_ob1_send_request_t {
     mca_pml_base_send_request_t req_send;
     mca_bml_base_endpoint_t* req_endpoint;
     opal_ptr_t req_recv;
-    int32_t req_state;
-    int32_t req_lock;
-    bool req_throttle_sends;
-    size_t req_pipeline_depth;
-    size_t req_bytes_delivered;
+    int32_t req_state;
+    int32_t req_lock;
+    bool req_throttle_sends;
+    int32_t req_pipeline_depth;
+    size_t req_bytes_delivered;
     uint32_t req_rdma_cnt;
     mca_pml_ob1_send_pending_t req_pending;
     opal_mutex_t req_send_range_lock;

opal/include/opal/sys/atomic.h

Lines changed: 2 additions & 2 deletions

@@ -447,7 +447,7 @@ int64_t opal_atomic_sub_64(volatile int64_t *addr, int64_t delta);
  */
 #if defined(DOXYGEN) || OPAL_ENABLE_DEBUG
 static inline size_t
-opal_atomic_add_size_t(volatile size_t *addr, int delta)
+opal_atomic_add_size_t(volatile size_t *addr, size_t delta)
 {
 #if SIZEOF_SIZE_T == 4
     return (size_t) opal_atomic_add_32((int32_t*) addr, delta);

@@ -458,7 +458,7 @@ opal_atomic_add_size_t(volatile size_t *addr, int delta)
 #endif
 }
 static inline size_t
-opal_atomic_sub_size_t(volatile size_t *addr, int delta)
+opal_atomic_sub_size_t(volatile size_t *addr, size_t delta)
 {
 #if SIZEOF_SIZE_T == 4
     return (size_t) opal_atomic_sub_32((int32_t*) addr, delta);
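Note: widening the delta parameter from int to size_t matters for large messages: the byte counts ob1 feeds into these size_t atomics (for example req_bytes_delivered updates) can exceed INT_MAX, and an int parameter cannot carry such a value without implementation-defined wrap-around. A small illustration of the range mismatch (not OPAL code):

#include <limits.h>
#include <stddef.h>
#include <stdio.h>

int main(void)
{
    /* A single RDMA completion on a large message can contribute more than
     * INT_MAX bytes; funnelling it through an 'int delta' corrupts the count. */
    size_t chunk = (size_t)3 * 1024 * 1024 * 1024;   /* 3 GiB */

    printf("chunk        = %zu bytes\n", chunk);
    printf("INT_MAX      = %d\n", INT_MAX);
    printf("fits in int?   %s\n", chunk <= (size_t) INT_MAX ? "yes" : "no");
    printf("(int) chunk  = %d  (implementation-defined, typically negative)\n", (int) chunk);
    return 0;
}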

opal/mca/btl/tcp/btl_tcp_component.c

Lines changed: 11 additions & 2 deletions

@@ -318,7 +318,12 @@ static int mca_btl_tcp_component_register(void)
     mca_btl_tcp_module.super.btl_rndv_eager_limit = 64*1024;
     mca_btl_tcp_module.super.btl_max_send_size = 128*1024;
     mca_btl_tcp_module.super.btl_rdma_pipeline_send_length = 128*1024;
-    mca_btl_tcp_module.super.btl_rdma_pipeline_frag_size = INT_MAX;
+    /* Some OSes have hard coded limits on how many bytes can be manipulated
+     * by each writev operation. Force a reasonable limit, to prevent overflowing
+     * a signed 32-bit integer (limit comes from BSD and OS X). We remove 1k to
+     * make some room for our internal headers.
+     */
+    mca_btl_tcp_module.super.btl_rdma_pipeline_frag_size = ((1UL<<31) - 1024);
     mca_btl_tcp_module.super.btl_min_rdma_pipeline_size = 0;
     mca_btl_tcp_module.super.btl_flags = MCA_BTL_FLAGS_PUT |
                                          MCA_BTL_FLAGS_SEND_INPLACE |

@@ -335,7 +340,11 @@ static int mca_btl_tcp_component_register(void)

     mca_btl_base_param_register(&mca_btl_tcp_component.super.btl_version,
                                 &mca_btl_tcp_module.super);
-
+    if (mca_btl_tcp_module.super.btl_rdma_pipeline_frag_size > ((1UL<<31) - 1024) ) {
+        /* Assume a hard limit. A test in configure would be a better solution, but until then
+         * kicking-in the pipeline RDMA for extremely large data is good enough. */
+        mca_btl_tcp_module.super.btl_rdma_pipeline_frag_size = ((1UL<<31) - 1024);
+    }
     mca_btl_tcp_param_register_int ("disable_family", NULL, 0, OPAL_INFO_LVL_2, &mca_btl_tcp_component.tcp_disable_family);

     return mca_btl_tcp_component_verify();
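Note: both the default and any user-supplied value for btl_rdma_pipeline_frag_size are now capped at 2^31 - 1024 bytes, because BSD-derived kernels (including OS X) reject writev calls whose total length exceeds a signed 32-bit integer, and the 1 KiB of slack leaves room for the BTL's own headers. A minimal sketch of the clamp, with an illustrative constant name:

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/* Illustrative constant: one writev must stay below 2^31 bytes, with 1 KiB of
 * headroom reserved for the TCP BTL's internal headers. */
#define TCP_FRAG_SIZE_MAX (((uint64_t)1 << 31) - 1024)

static uint64_t clamp_frag_size(uint64_t requested)
{
    return requested > TCP_FRAG_SIZE_MAX ? TCP_FRAG_SIZE_MAX : requested;
}

int main(void)
{
    printf("default value : %" PRIu64 "\n", clamp_frag_size(TCP_FRAG_SIZE_MAX));
    printf("8 GiB request : %" PRIu64 "\n", clamp_frag_size((uint64_t)8 << 30));
    return 0;
}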

opal/mca/btl/tcp/btl_tcp_frag.c

Lines changed: 7 additions & 8 deletions

@@ -112,11 +112,11 @@ size_t mca_btl_tcp_frag_dump(mca_btl_tcp_frag_t* frag, char* msg, char* buf, siz

 bool mca_btl_tcp_frag_send(mca_btl_tcp_frag_t* frag, int sd)
 {
-    ssize_t cnt = -1;
+    ssize_t cnt;
     size_t i, num_vecs;

     /* non-blocking write, but continue if interrupted */
-    while(cnt < 0) {
+    do {
         cnt = writev(sd, frag->iov_ptr, frag->iov_cnt);
         if(cnt < 0) {
             switch(opal_socket_errno) {

@@ -140,11 +140,11 @@ bool mca_btl_tcp_frag_send(mca_btl_tcp_frag_t* frag, int sd)
                 return false;
             }
         }
-    }
+    } while(cnt < 0);

     /* if the write didn't complete - update the iovec state */
     num_vecs = frag->iov_cnt;
-    for(i=0; i<num_vecs; i++) {
+    for( i = 0; i < num_vecs; i++) {
         if(cnt >= (ssize_t)frag->iov_ptr->iov_len) {
             cnt -= frag->iov_ptr->iov_len;
             frag->iov_ptr++;

@@ -166,8 +166,8 @@ bool mca_btl_tcp_frag_send(mca_btl_tcp_frag_t* frag, int sd)
 bool mca_btl_tcp_frag_recv(mca_btl_tcp_frag_t* frag, int sd)
 {
     mca_btl_base_endpoint_t* btl_endpoint = frag->endpoint;
-    int i, num_vecs, dont_copy_data = 0;
     ssize_t cnt;
+    int32_t i, num_vecs, dont_copy_data = 0;

 repeat:
     num_vecs = frag->iov_cnt;

@@ -208,8 +208,7 @@ bool mca_btl_tcp_frag_recv(mca_btl_tcp_frag_t* frag, int sd)
 #endif /* MCA_BTL_TCP_ENDPOINT_CACHE */

     /* non-blocking read, but continue if interrupted */
-    cnt = -1;
-    while( cnt < 0 ) {
+    do {
         cnt = readv(sd, frag->iov_ptr, num_vecs);
         if( 0 < cnt ) goto advance_iov_position;
         if( cnt == 0 ) {

@@ -247,7 +246,7 @@ bool mca_btl_tcp_frag_recv(mca_btl_tcp_frag_t* frag, int sd)
         mca_btl_tcp_endpoint_close(btl_endpoint);
         return false;
     }
-    }
+    } while( cnt < 0 );

 advance_iov_position:
     /* if the read didn't complete - update the iovec state */
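Note: the restart loops around writev/readv change from a while loop primed with a fake cnt = -1 to a do/while, dropping the sentinel initialization while preserving the retry-on-interrupt behavior for non-blocking sockets. A generic POSIX sketch of the same pattern (not the BTL code itself):

#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/uio.h>
#include <unistd.h>

/* Write an iovec array, retrying when interrupted by a signal (EINTR) and
 * treating EAGAIN/EWOULDBLOCK on a non-blocking socket as "nothing written". */
static ssize_t write_vec(int fd, struct iovec *iov, int iovcnt)
{
    ssize_t cnt;

    do {
        cnt = writev(fd, iov, iovcnt);
        if (cnt < 0) {
            if (errno == EINTR) {
                continue;                     /* interrupted: retry the writev */
            }
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                return 0;                     /* descriptor not ready right now */
            }
            return -1;                        /* hard error */
        }
    } while (cnt < 0);

    return cnt;
}

int main(void)
{
    char hello[] = "hello ", world[] = "world\n";
    struct iovec iov[2] = {
        { .iov_base = hello, .iov_len = strlen(hello) },
        { .iov_base = world, .iov_len = strlen(world) },
    };

    ssize_t written = write_vec(STDOUT_FILENO, iov, 2);
    fprintf(stderr, "wrote %zd bytes\n", written);
    return written < 0;
}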

opal/mca/btl/tcp/btl_tcp_frag.h

Lines changed: 2 additions & 2 deletions

@@ -53,8 +53,8 @@ struct mca_btl_tcp_frag_t {
     mca_btl_tcp_hdr_t hdr;
     struct iovec iov[MCA_BTL_TCP_FRAG_IOVEC_NUMBER + 1];
     struct iovec *iov_ptr;
-    size_t iov_cnt;
-    size_t iov_idx;
+    uint32_t iov_cnt;
+    uint32_t iov_idx;
     size_t size;
     uint16_t next_step;
     int rc;
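Note: shrinking iov_cnt and iov_idx from size_t to uint32_t is safe because writev and readv take the vector count as a plain int, bounded by the system's IOV_MAX, so a 32-bit field already covers every legal count. A small illustration of that bound (generic POSIX, not BTL code):

#include <stdint.h>
#include <stdio.h>
#include <unistd.h>      /* sysconf, _SC_IOV_MAX */

int main(void)
{
    /* writev() is declared as: ssize_t writev(int fd, const struct iovec *iov, int iovcnt);
     * The count parameter is an int capped by IOV_MAX, so a 32-bit iov_cnt
     * field can represent every count the kernel will accept. */
    long iov_max = sysconf(_SC_IOV_MAX);
    uint32_t iov_cnt = 3;    /* e.g. BTL header plus payload pieces */

    if (iov_max > 0 && iov_cnt > (uint32_t) iov_max) {
        fprintf(stderr, "too many iovecs for a single writev/readv call\n");
        return 1;
    }
    printf("IOV_MAX (via sysconf) = %ld, using %u iovecs\n", iov_max, (unsigned) iov_cnt);
    return 0;
}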
