Skip to content
This repository was archived by the owner on Oct 8, 2025. It is now read-only.

Commit 029942f

Browse files
committed
I/O operations refactoring.
1 parent 059a864 commit 029942f

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+1143
-1214
lines changed

auto/sources

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ NXT_LIB_SRCS=" \
126126
src/nxt_job_resolve.c \
127127
src/nxt_sockaddr.c \
128128
src/nxt_listen_socket.c \
129+
src/nxt_upstream_round_robin.c \
129130
"
130131

131132
NXT_LIB_SRC0=" \
@@ -190,7 +191,6 @@ NXT_LIB_UNIT_TEST_SRCS=" \
190191
test/nxt_rbtree_unit_test.c \
191192
test/nxt_term_parse_unit_test.c \
192193
test/nxt_msec_diff_unit_test.c \
193-
test/nxt_exp_approximation.c \
194194
test/nxt_mem_cache_pool_unit_test.c \
195195
test/nxt_mem_zone_unit_test.c \
196196
test/nxt_lvlhsh_unit_test.c \
@@ -330,6 +330,7 @@ NXT_SRCS=" \
330330
src/nxt_cycle.c \
331331
src/nxt_port.c \
332332
src/nxt_application.c \
333+
src/nxt_stream_module.c \
333334
src/nxt_master_process.c \
334335
src/nxt_worker_process.c \
335336
"

src/nxt_aix_send_file.c

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ nxt_aix_event_conn_io_send_file(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit)
3232
sb.size = 0;
3333
sb.limit = limit;
3434

35-
nhd = nxt_sendbuf_mem_coalesce(&sb);
35+
nhd = nxt_sendbuf_mem_coalesce(c->socket.task, &sb);
3636

3737
if (nhd == 0 && sb.sync) {
3838
return 0;
@@ -53,7 +53,7 @@ nxt_aix_event_conn_io_send_file(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit)
5353
sb.iobuf = &tr;
5454
sb.nmax = 1;
5555

56-
ntr = nxt_sendbuf_mem_coalesce(&sb);
56+
ntr = nxt_sendbuf_mem_coalesce(c->socket.task, &sb);
5757

5858
nxt_memzero(&sfp, sizeof(struct sf_parms));
5959

@@ -71,17 +71,16 @@ nxt_aix_event_conn_io_send_file(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit)
7171
sfp.trailer_length = tr.iov_len;
7272
}
7373

74-
nxt_log_debug(c->socket.log, "send_file(%d) fd:%FD @%O:%O hd:%ui tr:%ui",
75-
c->socket.fd, fb->file->fd, fb->file_pos, file_size,
76-
nhd, ntr);
74+
nxt_debug(c->socket.task, "send_file(%d) fd:%FD @%O:%O hd:%ui tr:%ui",
75+
c->socket.fd, fb->file->fd, fb->file_pos, file_size, nhd, ntr);
7776

7877
n = send_file(&c->socket.fd, &sfp, 0);
7978

8079
err = (n == -1) ? nxt_errno : 0;
8180
sent = sfp.bytes_sent;
8281

83-
nxt_log_debug(c->socket.log, "send_file(%d): %d sent:%O",
84-
c->socket.fd, n, sent);
82+
nxt_debug(c->socket.task, "send_file(%d): %d sent:%O",
83+
c->socket.fd, n, sent);
8584

8685
/*
8786
* -1 an error has occurred, errno contains the error code;
@@ -102,16 +101,15 @@ nxt_aix_event_conn_io_send_file(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit)
102101

103102
default:
104103
c->socket.error = err;
105-
nxt_log_error(nxt_socket_error_level(err, c->socket.log_error),
106-
c->socket.log, "send_file(%d) failed %E \"%FN\" "
107-
"fd:%FD @%O:%O hd:%ui tr:%ui", c->socket.fd, err,
108-
fb->file->name, fb->file->fd, fb->file_pos,
109-
file_size, nhd, ntr);
104+
nxt_log(c->socket.task, nxt_socket_error_level(err),
105+
"send_file(%d) failed %E \"%FN\" fd:%FD @%O:%O hd:%ui tr:%ui",
106+
c->socket.fd, err, fb->file->name, fb->file->fd, fb->file_pos,
107+
file_size, nhd, ntr);
110108

111109
return NXT_ERROR;
112110
}
113111

114-
nxt_log_debug(c->socket.log, "sendfile() %E", err);
112+
nxt_debug(c->socket.task, "sendfile() %E", err);
115113

116114
return sent;
117115
}

src/nxt_application.c

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ static void nxt_app_conn_update(nxt_thread_t *thr, nxt_event_conn_t *c,
2020
nxt_log_t *log);
2121
static nxt_int_t nxt_app_write_finish(nxt_app_request_t *r);
2222
static void nxt_app_buf_send(nxt_event_conn_t *c, nxt_buf_t *out);
23-
static void nxt_app_buf_completion(nxt_task_t *task, void *obj, void *data);
2423
static void nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data);
2524
static void nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data);
2625
static void nxt_app_delivery_completion(nxt_task_t *task, void *obj,
@@ -683,7 +682,7 @@ nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t length)
683682

684683
nxt_buf_mem_init(b, start, 4096);
685684

686-
b->completion_handler = nxt_app_buf_completion;
685+
b->completion_handler = NULL;
687686

688687
nxt_app_buf_current_number++;
689688
}
@@ -713,7 +712,7 @@ nxt_app_write_finish(nxt_app_request_t *r)
713712
return NXT_ERROR;
714713
}
715714

716-
b->completion_handler = nxt_app_buf_completion;
715+
b->completion_handler = NULL;
717716
b->parent = (nxt_buf_t *) r;
718717

719718
out = r->output_buf;
@@ -745,20 +744,6 @@ nxt_app_buf_send(nxt_event_conn_t *c, nxt_buf_t *out)
745744
}
746745

747746

748-
static void
749-
nxt_app_buf_completion(nxt_task_t *task, void *obj, void *data)
750-
{
751-
nxt_buf_t *b;
752-
753-
b = obj;
754-
755-
nxt_debug(task, "app buf completion");
756-
757-
b->next = nxt_app_buf_done;
758-
nxt_app_buf_done = b;
759-
}
760-
761-
762747
static void
763748
nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data)
764749
{
@@ -797,14 +782,14 @@ nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data)
797782
c->write = b;
798783
c->write_state = &nxt_app_delivery_write_state;
799784

800-
nxt_event_conn_write(task, c);
785+
nxt_event_conn_write(task->thread->engine, c);
801786
}
802787

803788

804789
static const nxt_event_conn_state_t nxt_app_delivery_write_state
805790
nxt_aligned(64) =
806791
{
807-
NXT_EVENT_BUF_PROCESS,
792+
NXT_EVENT_NO_BUF_PROCESS,
808793
NXT_EVENT_TIMER_AUTORESET,
809794

810795
nxt_app_delivery_ready,
@@ -820,12 +805,26 @@ static const nxt_event_conn_state_t nxt_app_delivery_write_state
820805
static void
821806
nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data)
822807
{
808+
nxt_buf_t *b, *next;
823809
nxt_event_conn_t *c;
824810

825811
c = obj;
826812

827813
nxt_debug(task, "app delivery ready");
828814

815+
for (b = c->write; b != NULL; b = next) {
816+
817+
if (nxt_buf_is_mem(b)) {
818+
if (b->mem.pos != b->mem.free) {
819+
break;
820+
}
821+
}
822+
823+
next = b->next;
824+
b->next = nxt_app_buf_done;
825+
nxt_app_buf_done = b;
826+
}
827+
829828
nxt_work_queue_add(c->write_work_queue,
830829
nxt_app_delivery_completion, task, c, NULL);
831830
}

0 commit comments

Comments
 (0)