Skip to content

Cleanup the output-filename options so they work as expected. #1388

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 19, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion orte/mca/iof/base/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_iof_read_event_t);
typedef struct {
opal_list_item_t super;
orte_process_name_t name;
orte_iof_sink_t *stdin;
orte_iof_sink_t *stdinev;
orte_iof_read_event_t *revstdout;
orte_iof_read_event_t *revstderr;
orte_iof_read_event_t *revstddiag;
Expand Down Expand Up @@ -202,6 +202,7 @@ ORTE_DECLSPEC extern orte_iof_base_t orte_iof_base;
ORTE_DECLSPEC int orte_iof_base_write_output(orte_process_name_t *name, orte_iof_tag_t stream,
unsigned char *data, int numbytes,
orte_iof_write_event_t *channel);
ORTE_DECLSPEC void orte_iof_base_static_dump_output(orte_iof_read_event_t *rev);
ORTE_DECLSPEC void orte_iof_base_write_handler(int fd, short event, void *cbdata);

END_C_DECLS
Expand Down
6 changes: 3 additions & 3 deletions orte/mca/iof/base/iof_base_frame.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ OBJ_CLASS_INSTANCE(orte_iof_job_t,

static void orte_iof_base_proc_construct(orte_iof_proc_t* ptr)
{
ptr->stdin = NULL;
ptr->stdinev = NULL;
ptr->revstdout = NULL;
ptr->revstderr = NULL;
ptr->revstddiag = NULL;
Expand All @@ -81,8 +81,8 @@ static void orte_iof_base_proc_construct(orte_iof_proc_t* ptr)
}
static void orte_iof_base_proc_destruct(orte_iof_proc_t* ptr)
{
if (NULL != ptr->stdin) {
OBJ_RELEASE(ptr->stdin);
if (NULL != ptr->stdinev) {
OBJ_RELEASE(ptr->stdinev);
}
if (NULL != ptr->revstdout) {
OBJ_RELEASE(ptr->revstdout);
Expand Down
26 changes: 26 additions & 0 deletions orte/mca/iof/base/iof_base_output.c
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,32 @@ int orte_iof_base_write_output(orte_process_name_t *name, orte_iof_tag_t stream,
return num_buffered;
}

void orte_iof_base_static_dump_output(orte_iof_read_event_t *rev)
{
bool dump;
int num_written;
orte_iof_write_event_t *wev;
orte_iof_write_output_t *output;

if (NULL != rev->sink) {
wev = rev->sink->wev;
if (NULL != wev && !opal_list_is_empty(&wev->outputs)) {
dump = false;
/* make one last attempt to write this out */
while (NULL != (output = (orte_iof_write_output_t*)opal_list_remove_first(&wev->outputs))) {
if (!dump) {
num_written = write(wev->fd, output->data, output->numbytes);
if (num_written < output->numbytes) {
/* don't retry - just cleanout the list and dump it */
dump = true;
}
}
OBJ_RELEASE(output);
}
}
}
}

void orte_iof_base_write_handler(int fd, short event, void *cbdata)
{
orte_iof_sink_t *sink = (orte_iof_sink_t*)cbdata;
Expand Down
52 changes: 34 additions & 18 deletions orte/mca/iof/hnp/iof_hnp.c
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,10 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
*/
if (ORTE_VPID_WILDCARD == dst_name->vpid) {
/* if wildcard, define a sink with that info so it gets sent out */
ORTE_IOF_SINK_DEFINE(&proct->stdin, dst_name, -1, ORTE_IOF_STDIN,
ORTE_IOF_SINK_DEFINE(&proct->stdinev, dst_name, -1, ORTE_IOF_STDIN,
stdin_write_handler);
proct->stdin->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
proct->stdin->daemon.vpid = ORTE_VPID_WILDCARD;
proct->stdinev->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
proct->stdinev->daemon.vpid = ORTE_VPID_WILDCARD;
} else {
/* no - lookup the proc's daemon and set that into sink */
if (NULL == (jdata = orte_get_job_data_object(dst_name->jobid))) {
Expand All @@ -252,10 +252,10 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
}
/* if it is me, then don't set this up - we'll get it on the pull */
if (ORTE_PROC_MY_NAME->vpid != proc->node->daemon->name.vpid) {
ORTE_IOF_SINK_DEFINE(&proct->stdin, dst_name, -1, ORTE_IOF_STDIN,
ORTE_IOF_SINK_DEFINE(&proct->stdinev, dst_name, -1, ORTE_IOF_STDIN,
stdin_write_handler);
proct->stdin->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
proct->stdin->daemon.vpid = proc->node->daemon->name.vpid;
proct->stdinev->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
proct->stdinev->daemon.vpid = proc->node->daemon->name.vpid;
}
}

Expand Down Expand Up @@ -370,10 +370,10 @@ static int hnp_pull(const orte_process_name_t* dst_name,
opal_list_append(&mca_iof_hnp_component.procs, &proct->super);

SETUP:
ORTE_IOF_SINK_DEFINE(&proct->stdin, dst_name, fd, ORTE_IOF_STDIN,
ORTE_IOF_SINK_DEFINE(&proct->stdinev, dst_name, fd, ORTE_IOF_STDIN,
stdin_write_handler);
proct->stdin->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
proct->stdin->daemon.vpid = ORTE_PROC_MY_NAME->vpid;
proct->stdinev->daemon.jobid = ORTE_PROC_MY_NAME->jobid;
proct->stdinev->daemon.vpid = ORTE_PROC_MY_NAME->vpid;

return ORTE_SUCCESS;
}
Expand All @@ -392,25 +392,28 @@ static int hnp_close(const orte_process_name_t* peer,
OPAL_LIST_FOREACH(proct, &mca_iof_hnp_component.procs, orte_iof_proc_t) {
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, peer)) {
if (ORTE_IOF_STDIN & source_tag) {
if (NULL != proct->stdin) {
OBJ_RELEASE(proct->stdin);
if (NULL != proct->stdinev) {
OBJ_RELEASE(proct->stdinev);
}
++cnt;
}
if (ORTE_IOF_STDOUT & source_tag) {
if (NULL != proct->revstdout) {
orte_iof_base_static_dump_output(proct->revstdout);
OBJ_RELEASE(proct->revstdout);
}
++cnt;
}
if (ORTE_IOF_STDERR & source_tag) {
if (NULL != proct->revstderr) {
orte_iof_base_static_dump_output(proct->revstderr);
OBJ_RELEASE(proct->revstderr);
}
++cnt;
}
if (ORTE_IOF_STDDIAG & source_tag) {
if (NULL != proct->revstddiag) {
orte_iof_base_static_dump_output(proct->revstddiag);
OBJ_RELEASE(proct->revstddiag);
}
++cnt;
Expand All @@ -428,19 +431,18 @@ static int hnp_close(const orte_process_name_t* peer,

static int finalize(void)
{
opal_list_item_t* item;
orte_iof_write_output_t *output;
orte_iof_write_event_t *wev;
int num_written;
orte_iof_proc_t *proct;
bool dump;
orte_iof_write_output_t *output;
int num_written;

/* check if anything is still trying to be written out */
wev = orte_iof_base.iof_write_stdout->wev;
if (!opal_list_is_empty(&wev->outputs)) {
dump = false;
/* make one last attempt to write this out */
while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
output = (orte_iof_write_output_t*)item;
while (NULL != (output = (orte_iof_write_output_t*)opal_list_remove_first(&wev->outputs))) {
if (!dump) {
num_written = write(wev->fd, output->data, output->numbytes);
if (num_written < output->numbytes) {
Expand All @@ -457,8 +459,7 @@ static int finalize(void)
if (!opal_list_is_empty(&wev->outputs)) {
dump = false;
/* make one last attempt to write this out */
while (NULL != (item = opal_list_remove_first(&wev->outputs))) {
output = (orte_iof_write_output_t*)item;
while (NULL != (output = (orte_iof_write_output_t*)opal_list_remove_first(&wev->outputs))) {
if (!dump) {
num_written = write(wev->fd, output->data, output->numbytes);
if (num_written < output->numbytes) {
Expand All @@ -471,6 +472,21 @@ static int finalize(void)
}
}

/* cycle thru the procs and ensure all their output was delivered
* if they were writing to files */
while (NULL != (proct = (orte_iof_proc_t*)opal_list_remove_first(&mca_iof_hnp_component.procs))) {
if (NULL != proct->revstdout) {
orte_iof_base_static_dump_output(proct->revstdout);
}
if (NULL != proct->revstderr) {
orte_iof_base_static_dump_output(proct->revstderr);
}
if (NULL != proct->revstddiag) {
orte_iof_base_static_dump_output(proct->revstddiag);
}
OBJ_RELEASE(proct);
}
OBJ_DESTRUCT(&mca_iof_hnp_component.procs);
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_HNP);

return ORTE_SUCCESS;
Expand Down
35 changes: 24 additions & 11 deletions orte/mca/iof/hnp/iof_hnp_read.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
return;
}
/* if the daemon is me, then this is a local sink */
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, ORTE_PROC_MY_NAME, &proct->stdin->daemon)) {
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, ORTE_PROC_MY_NAME, &proct->stdinev->daemon)) {
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s read %d bytes from stdin - writing to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
Expand All @@ -151,8 +151,8 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
* down the pipe so it forces out any preceding data before
* closing the output stream
*/
if (NULL != proct->stdin->wev) {
if (ORTE_IOF_MAX_INPUT_BUFFERS < orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, proct->stdin->wev)) {
if (NULL != proct->stdinev->wev) {
if (ORTE_IOF_MAX_INPUT_BUFFERS < orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, proct->stdinev->wev)) {
/* getting too backed up - stop the read event for now if it is still active */

OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
Expand All @@ -162,9 +162,9 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
}
} else {
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s sending %d bytes from stdin to daemon %s",
"%s sending %d bytes from stdinev to daemon %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numbytes,
ORTE_NAME_PRINT(&proct->stdin->daemon)));
ORTE_NAME_PRINT(&proct->stdinev->daemon)));

/* send the data to the daemon so it can
* write it to the proc's fd - in this case,
Expand All @@ -174,7 +174,7 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
* sent - this will tell the daemon to close
* the fd for stdin to that proc
*/
if( ORTE_SUCCESS != (rc = orte_iof_hnp_send_data_to_endpoint(&proct->stdin->daemon, &proct->stdin->name, ORTE_IOF_STDIN, data, numbytes))) {
if( ORTE_SUCCESS != (rc = orte_iof_hnp_send_data_to_endpoint(&proct->stdinev->daemon, &proct->stdinev->name, ORTE_IOF_STDIN, data, numbytes))) {
/* if the addressee is unknown, remove the sink from the list */
if( ORTE_ERR_ADDRESSEE_UNKNOWN == rc ) {
OBJ_RELEASE(rev->sink);
Expand Down Expand Up @@ -244,10 +244,13 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
* nothing to output - release the appropriate event.
* This will delete the read event and close the file descriptor */
if (rev->tag & ORTE_IOF_STDOUT) {
orte_iof_base_static_dump_output(proct->revstdout);
OBJ_RELEASE(proct->revstdout);
} else if (rev->tag & ORTE_IOF_STDERR) {
orte_iof_base_static_dump_output(proct->revstderr);
OBJ_RELEASE(proct->revstderr);
} else if (rev->tag & ORTE_IOF_STDDIAG) {
orte_iof_base_static_dump_output(proct->revstddiag);
OBJ_RELEASE(proct->revstddiag);
}
/* check to see if they are all done */
Expand All @@ -262,11 +265,16 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
return;
}

if (!exclusive) {
/* see if the user wanted the output directed to files */
if (NULL != rev->sink && !(ORTE_IOF_STDIN & rev->sink->tag)) {
/* output to the corresponding file */
orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, rev->sink->wev);
if (proct->copy) {
if (NULL != proct->subscribers) {
if (!exclusive) {
/* output this to our local output */
if (ORTE_IOF_STDOUT & rev->tag || orte_xml_output) {
orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, orte_iof_base.iof_write_stdout->wev);
} else {
orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, orte_iof_base.iof_write_stderr->wev);
}
}
} else {
/* output this to our local output */
if (ORTE_IOF_STDOUT & rev->tag || orte_xml_output) {
Expand All @@ -276,6 +284,11 @@ void orte_iof_hnp_read_local_handler(int fd, short event, void *cbdata)
}
}
}
/* see if the user wanted the output directed to files */
if (NULL != rev->sink && !(ORTE_IOF_STDIN & rev->sink->tag)) {
/* output to the corresponding file */
orte_iof_base_write_output(&proct->name, rev->tag, data, numbytes, rev->sink->wev);
}

/* re-add the event */
opal_event_add(rev->ev, 0);
Expand Down
4 changes: 4 additions & 0 deletions orte/mca/iof/hnp/iof_hnp_receive.c
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,10 @@ void orte_iof_hnp_recv(int status, orte_process_name_t* sender,
}
}
}
/* if the user doesn't want a copy written to the screen, then we are done */
if (!proct->copy) {
return;
}

/* output this to our local output unless one of the sinks was exclusive */
if (!exclusive) {
Expand Down
28 changes: 24 additions & 4 deletions orte/mca/iof/orted/iof_orted.c
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ static int orted_pull(const orte_process_name_t* dst_name,
opal_list_append(&mca_iof_orted_component.procs, &proct->super);

SETUP:
ORTE_IOF_SINK_DEFINE(&proct->stdin, dst_name, fd, ORTE_IOF_STDIN,
ORTE_IOF_SINK_DEFINE(&proct->stdinev, dst_name, fd, ORTE_IOF_STDIN,
stdin_write_handler);

return ORTE_SUCCESS;
Expand All @@ -270,25 +270,28 @@ static int orted_close(const orte_process_name_t* peer,
OPAL_LIST_FOREACH(proct, &mca_iof_orted_component.procs, orte_iof_proc_t) {
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &proct->name, peer)) {
if (ORTE_IOF_STDIN & source_tag) {
if (NULL != proct->stdin) {
OBJ_RELEASE(proct->stdin);
if (NULL != proct->stdinev) {
OBJ_RELEASE(proct->stdinev);
}
++cnt;
}
if (ORTE_IOF_STDOUT & source_tag) {
if (NULL != proct->revstdout) {
orte_iof_base_static_dump_output(proct->revstdout);
OBJ_RELEASE(proct->revstdout);
}
++cnt;
}
if (ORTE_IOF_STDERR & source_tag) {
if (NULL != proct->revstderr) {
orte_iof_base_static_dump_output(proct->revstderr);
OBJ_RELEASE(proct->revstderr);
}
++cnt;
}
if (ORTE_IOF_STDDIAG & source_tag) {
if (NULL != proct->revstddiag) {
orte_iof_base_static_dump_output(proct->revstddiag);
OBJ_RELEASE(proct->revstddiag);
}
++cnt;
Expand All @@ -307,7 +310,24 @@ static int orted_close(const orte_process_name_t* peer,

static int finalize(void)
{
OPAL_LIST_DESTRUCT(&mca_iof_orted_component.procs);
orte_iof_proc_t *proct;

/* cycle thru the procs and ensure all their output was delivered
* if they were writing to files */
while (NULL != (proct = (orte_iof_proc_t*)opal_list_remove_first(&mca_iof_orted_component.procs))) {
if (NULL != proct->revstdout) {
orte_iof_base_static_dump_output(proct->revstdout);
}
if (NULL != proct->revstderr) {
orte_iof_base_static_dump_output(proct->revstderr);
}
if (NULL != proct->revstddiag) {
orte_iof_base_static_dump_output(proct->revstddiag);
}
OBJ_RELEASE(proct);
}
OBJ_DESTRUCT(&mca_iof_orted_component.procs);

/* Cancel the RML receive */
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_IOF_PROXY);
return ORTE_SUCCESS;
Expand Down
Loading