Skip to content

Commit 658837d

Browse files
committed
implement net.Socket
- support TCP sockets for now, i.e. no IPC - extra features like keep-alive, no-delay etc. are absent due to limitations of uSockets - fix `jest` to treat `done(nullish)` as success
1 parent 4eb9e8b commit 658837d

File tree

3 files changed

+430
-3
lines changed

3 files changed

+430
-3
lines changed

src/bun.js/net.exports.js

Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,255 @@ export function isIP(s) {
5353
return 0;
5454
}
5555

56+
const { Bun, createFIFO } = import.meta.primordials;
57+
const { connect: bunConnect } = Bun;
58+
const { Duplex } = import.meta.require("node:stream");
59+
60+
export class Socket extends Duplex {
61+
static #Handlers = {
62+
close: Socket.#Close,
63+
data(socket, buffer) {
64+
const self = socket.data;
65+
self.bytesRead += buffer.length;
66+
if (self.#readQueue.isEmpty()) {
67+
if (self.push(buffer)) return;
68+
}
69+
self.#readQueue.push(buffer);
70+
},
71+
drain: Socket.#Drain,
72+
end: Socket.#Close,
73+
error(socket, error) {
74+
const self = socket.data;
75+
const callback = self.#writeCallback;
76+
if (callback) {
77+
self.#writeCallback = null;
78+
callback(error);
79+
}
80+
self.emit("error", error);
81+
},
82+
open(socket) {
83+
const self = socket.data;
84+
socket.timeout(self.timeout);
85+
self.#socket = socket;
86+
self.connecting = false;
87+
self.emit("connect");
88+
Socket.#Drain(socket);
89+
},
90+
timeout() {
91+
const self = socket.data;
92+
self.emit("timeout");
93+
},
94+
};
95+
96+
static #Close(socket) {
97+
const self = socket.data;
98+
if (self.#closed) return;
99+
self.#closed = true;
100+
if (self.#readQueue.isEmpty()) {
101+
if (self.push(null)) return;
102+
}
103+
self.#readQueue.push(null);
104+
}
105+
106+
static #Drain(socket) {
107+
const self = socket.data;
108+
const callback = self.#writeCallback;
109+
if (callback) {
110+
const chunk = self.#writeChunk;
111+
const written = socket.write(chunk);
112+
self.bytesWritten += written;
113+
if (written < chunk.length) {
114+
self.#writeChunk = chunk.slice(written);
115+
} else {
116+
self.#writeCallback = null;
117+
self.#writeChunk = null;
118+
callback(null);
119+
}
120+
}
121+
}
122+
123+
bytesRead = 0;
124+
bytesWritten = 0;
125+
#closed = false;
126+
connecting = false;
127+
localAddress = "127.0.0.1";
128+
#readQueue = createFIFO();
129+
remotePort;
130+
#socket;
131+
timeout = 0;
132+
#writeCallback;
133+
#writeChunk;
134+
135+
constructor(options) {
136+
super({
137+
allowHalfOpen: options?.allowHalfOpen || false,
138+
readable: true,
139+
writable: true,
140+
});
141+
options?.signal?.once("abort", () => this.destroy());
142+
this.once("connect", () => this.emit("ready"));
143+
// TODO support `options.fd`
144+
}
145+
146+
address() {
147+
return {
148+
address: this.localAddress,
149+
family: this.localFamily,
150+
port: this.localPort,
151+
};
152+
}
153+
154+
get bufferSize() {
155+
return this.writableLength;
156+
}
157+
158+
connect(port, host, connectListener) {
159+
// TODO support IPC sockets
160+
if (typeof host == "function") {
161+
connectListener = host;
162+
host = undefined;
163+
}
164+
if (typeof port == "object") {
165+
var {
166+
port,
167+
host,
168+
// TODOs
169+
localAddress,
170+
localPort,
171+
family,
172+
hints,
173+
lookup,
174+
noDelay,
175+
keepAlive,
176+
keepAliveInitialDelay,
177+
} = port;
178+
}
179+
this.connecting = true;
180+
this.remotePort = port;
181+
if (connectListener) this.on("connect", connectListener);
182+
bunConnect({
183+
data: this,
184+
hostname: host || "localhost",
185+
port: port,
186+
socket: Socket.#Handlers,
187+
});
188+
return this;
189+
}
190+
191+
_destroy(err, callback) {
192+
this.#socket?.end();
193+
callback(err);
194+
}
195+
196+
_final(callback) {
197+
this.#socket.end();
198+
callback();
199+
}
200+
201+
get localAddress() {
202+
return "127.0.0.1";
203+
}
204+
205+
get localFamily() {
206+
return "IPv4";
207+
}
208+
209+
get localPort() {
210+
return this.#socket?.localPort;
211+
}
212+
213+
get pending() {
214+
return this.connecting;
215+
}
216+
217+
_read(size) {
218+
const queue = this.#readQueue;
219+
let chunk;
220+
while (chunk = queue.peek()) {
221+
if (!this.push(chunk)) break;
222+
queue.shift();
223+
}
224+
}
225+
226+
get readyState() {
227+
if (this.connecting) return "opening";
228+
if (this.readable) {
229+
return this.writable ? "open" : "readOnly";
230+
} else {
231+
return this.writable ? "writeOnly" : "closed";
232+
}
233+
}
234+
235+
ref() {
236+
this.#socket?.ref();
237+
}
238+
239+
get remoteAddress() {
240+
return this.#socket.remoteAddress;
241+
}
242+
243+
get remoteFamily() {
244+
return "IPv4";
245+
}
246+
247+
resetAndDestroy() {
248+
this.#socket?.end();
249+
}
250+
251+
setKeepAlive(enable = false, initialDelay = 0) {
252+
// TODO
253+
}
254+
255+
setNoDelay(noDelay = true) {
256+
// TODO
257+
}
258+
259+
setTimeout(timeout, callback) {
260+
this.#socket?.timeout(timeout);
261+
this.timeout = timeout;
262+
if (callback) this.once("timeout", callback);
263+
return this;
264+
}
265+
266+
unref() {
267+
this.#socket?.unref();
268+
}
269+
270+
_write(chunk, encoding, callback) {
271+
if (typeof chunk == "string" && encoding !== "utf8") chunk = Buffer.from(chunk, encoding);
272+
var written = this.#socket?.write(chunk);
273+
if (written == chunk.length) {
274+
callback();
275+
} else if (this.#writeCallback) {
276+
callback(new Error("overlapping _write()"));
277+
} else {
278+
if (written > 0) chunk = chunk.slice(written);
279+
this.#writeCallback = callback;
280+
this.#writeChunk = chunk;
281+
}
282+
}
283+
}
284+
285+
export function createConnection(port, host, connectListener) {
286+
if (typeof host == "function") {
287+
connectListener = host;
288+
host = undefined;
289+
}
290+
var options = typeof port == "object" ? port : {
291+
host: host,
292+
port: port,
293+
};
294+
return new Socket(options).connect(options, connectListener);
295+
}
296+
297+
export const connect = createConnection;
298+
56299
export default {
300+
createConnection,
301+
connect,
57302
isIP,
58303
isIPv4,
59304
isIPv6,
305+
Socket,
60306
[Symbol.for("CommonJS")]: 0,
61307
};

src/bun.js/test/jest.zig

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1275,8 +1275,12 @@ pub const TestScope = struct {
12751275
JSC.setFunctionData(function, null);
12761276
if (args.len > 0) {
12771277
const err = args.ptr[0];
1278-
globalThis.bunVM().runErrorHandlerWithDedupe(err, null);
1279-
task.handleResult(.{ .fail = active_test_expectation_counter.actual }, .callback);
1278+
if (err.isEmptyOrUndefinedOrNull()) {
1279+
task.handleResult(.{ .pass = active_test_expectation_counter.actual }, .callback);
1280+
} else {
1281+
globalThis.bunVM().runErrorHandlerWithDedupe(err, null);
1282+
task.handleResult(.{ .fail = active_test_expectation_counter.actual }, .callback);
1283+
}
12801284
} else {
12811285
task.handleResult(.{ .pass = active_test_expectation_counter.actual }, .callback);
12821286
}
@@ -1510,7 +1514,9 @@ pub const DescribeScope = struct {
15101514
JSC.setFunctionData(function, null);
15111515
if (args.len > 0) {
15121516
const err = args.ptr[0];
1513-
ctx.bunVM().runErrorHandlerWithDedupe(err, null);
1517+
if (!err.isEmptyOrUndefinedOrNull()) {
1518+
ctx.bunVM().runErrorHandlerWithDedupe(err, null);
1519+
}
15141520
}
15151521
scope.done = true;
15161522
}

0 commit comments

Comments
 (0)