Skip to content

Commit c4c9ee3

Browse files
committed
Add support for ip4s NetworkInterface
1 parent 855070e commit c4c9ee3

File tree

19 files changed

+195
-140
lines changed

19 files changed

+195
-140
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,7 @@ lazy val io = crossProject(JVMPlatform, JSPlatform, NativePlatform)
428428
.settings(
429429
name := "fs2-io",
430430
tlVersionIntroduced ~= { _.updated("3", "3.1.0") },
431-
libraryDependencies += "com.comcast" %%% "ip4s-core" % "3.7.0",
431+
libraryDependencies += "com.comcast" %%% "ip4s-core" % "3.8.0-M3",
432432
tlJdkRelease := None
433433
)
434434
.jvmSettings(

io/js-jvm/src/test/scala/fs2/io/net/udp/UdpSuite.scala

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import com.comcast.ip4s._
3131

3232
import scala.concurrent.duration._
3333

34-
class UdpSuite extends Fs2Suite with UdpSuitePlatform {
34+
class UdpSuite extends Fs2Suite {
3535
private def sendAndReceive(socket: DatagramSocket[IO], toSend: Datagram): IO[Datagram] =
3636
socket
3737
.write(toSend) >> socket.read.timeoutTo(1.second, IO.defer(sendAndReceive(socket, toSend)))
@@ -114,25 +114,45 @@ class UdpSuite extends Fs2Suite with UdpSuitePlatform {
114114
val group = mip"239.10.10.10"
115115
val groupJoin = MulticastJoin.asm(group)
116116
val msg = Chunk.array("Hello, world!".getBytes)
117+
val outgoingInterface =
118+
// Get first non-loopback interface with an IPv4 address
119+
Network[IO].interfaces.getAll.map { interfaces =>
120+
interfaces.values
121+
.filterNot(_.isLoopback)
122+
.flatMap(iface =>
123+
iface.addresses.filter(_.address.fold(_ => true, _ => false)).as(iface)
124+
)
125+
.head
126+
}
117127
Stream
118-
.resource(
119-
Network[IO]
120-
.bindDatagramSocket(
121-
options = List(SocketOption.multicastTtl(1))
128+
.eval(outgoingInterface)
129+
.flatMap { out =>
130+
Stream
131+
.resource(
132+
Network[IO]
133+
.bindDatagramSocket(
134+
options = List(SocketOption.multicastTtl(1), SocketOption.multicastInterface(out))
135+
)
136+
.evalMap { serverSocket =>
137+
Network[IO].interfaces.getAll.flatMap { interfaces =>
138+
interfaces.values.toList
139+
.filter(iface =>
140+
iface.addresses.exists(_.address.fold(_ => true, _ => false))
141+
)
142+
.traverse_(iface => serverSocket.join(groupJoin, iface))
143+
.as(serverSocket)
144+
}
145+
}
122146
)
123-
.evalMap { serverSocket =>
124-
v4Interfaces
125-
.traverse_(interface => serverSocket.join(groupJoin, interface))
126-
.as(serverSocket)
147+
.flatMap { serverSocket =>
148+
val server = serverSocket.reads.foreach(packet => serverSocket.write(packet))
149+
val client =
150+
Stream.resource(Network[IO].bindDatagramSocket()).flatMap { clientSocket =>
151+
val to = SocketAddress(group.address, serverSocket.address.asIpUnsafe.port)
152+
Stream.eval(clientSocket.write(msg, to) >> clientSocket.read)
153+
}
154+
client.concurrently(server)
127155
}
128-
)
129-
.flatMap { serverSocket =>
130-
val server = serverSocket.reads.foreach(packet => serverSocket.write(packet))
131-
val client = Stream.resource(Network[IO].bindDatagramSocket()).flatMap { clientSocket =>
132-
val to = SocketAddress(group.address, serverSocket.address.asIpUnsafe.port)
133-
Stream.eval(clientSocket.write(msg, to) >> clientSocket.read)
134-
}
135-
client.concurrently(server)
136156
}
137157
.compile
138158
.lastOrError

io/js/src/main/scala/fs2/io/internal/facade/dgram.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ private[io] object dgram {
9797
@js.native
9898
trait AddressInfo extends js.Object {
9999
def address: String = js.native
100-
def family: Int = js.native
100+
def family: String = js.native
101101
def port: Int = js.native
102102
}
103103

@@ -109,7 +109,7 @@ private[io] object dgram {
109109
@js.native
110110
trait RemoteInfo extends js.Object {
111111
def address: String = js.native
112-
def family: Int = js.native
112+
def family: String = js.native
113113
def port: Int = js.native
114114
def size: Int = js.native
115115
}

io/js/src/main/scala/fs2/io/internal/facade/os.scala

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,18 +38,7 @@ private[io] object os {
3838
@JSImport("os", "type")
3939
def `type`(): String = js.native
4040

41-
@js.native
42-
@JSImport("os", "networkInterfaces")
43-
def networkInterfaces(): js.Dictionary[js.Array[NetworkInterfaceInfo]] = js.native
44-
4541
@js.native
4642
@JSImport("os", "EOL")
4743
def EOL: String = js.native
48-
49-
@js.native
50-
trait NetworkInterfaceInfo extends js.Object {
51-
def family: String = js.native
52-
def address: String = js.native
53-
}
54-
5544
}

io/js/src/main/scala/fs2/io/net/DatagramSocketOption.scala

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ package io
2424
package net
2525

2626
import cats.effect.kernel.Sync
27+
import com.comcast.ip4s.NetworkInterface
2728
import fs2.io.internal.facade
2829

2930
/** Specifies a socket option on a TCP/UDP socket.
@@ -65,10 +66,13 @@ object DatagramSocketOption {
6566
override private[net] def toSocketOption: SocketOption.Key[Boolean] = SocketOption.Broadcast
6667
}
6768

68-
object MulticastInterface extends Key[String] {
69-
override private[net] def set[F[_]: Sync](sock: facade.dgram.Socket, value: String): F[Unit] =
70-
Sync[F].delay(sock.setMulticastInterface(value))
71-
override private[net] def toSocketOption: SocketOption.Key[String] =
69+
object MulticastInterface extends Key[NetworkInterface] {
70+
override private[net] def set[F[_]: Sync](
71+
sock: facade.dgram.Socket,
72+
value: NetworkInterface
73+
): F[Unit] =
74+
SocketOption.MulticastInterface.set(sock, value)
75+
override private[net] def toSocketOption: SocketOption.Key[NetworkInterface] =
7276
SocketOption.MulticastInterface
7377
}
7478

@@ -116,7 +120,8 @@ object DatagramSocketOption {
116120
}
117121

118122
def broadcast(value: Boolean): DatagramSocketOption = apply(Broadcast, value)
119-
def multicastInterface(value: String): DatagramSocketOption = apply(MulticastInterface, value)
123+
def multicastInterface(value: NetworkInterface): DatagramSocketOption =
124+
apply(MulticastInterface, value)
120125
def multicastLoopback(value: Boolean): DatagramSocketOption = apply(MulticastLoopback, value)
121126
def multicastTtl(value: Int): DatagramSocketOption = apply(MulticastTtl, value)
122127
def receiveBufferSize(value: Int): DatagramSocketOption = apply(ReceiveBufferSize, value)

io/js/src/main/scala/fs2/io/net/DatagramSocketPlatform.scala

Lines changed: 55 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,16 @@ import cats.effect.std.Dispatcher
3131
import cats.effect.std.Queue
3232
import cats.effect.syntax.all._
3333
import cats.syntax.all._
34-
import com.comcast.ip4s.AnySourceMulticastJoin
35-
import com.comcast.ip4s.GenSocketAddress
36-
import com.comcast.ip4s.IpAddress
37-
import com.comcast.ip4s.MulticastJoin
38-
import com.comcast.ip4s.Port
39-
import com.comcast.ip4s.SocketAddress
40-
import com.comcast.ip4s.SourceSpecificMulticastJoin
34+
import com.comcast.ip4s.{
35+
AnySourceMulticastJoin,
36+
GenSocketAddress,
37+
IpAddress,
38+
MulticastJoin,
39+
NetworkInterface => Ip4sNetworkInterface,
40+
Port,
41+
SocketAddress,
42+
SourceSpecificMulticastJoin
43+
}
4144
import fs2.io.internal.facade
4245

4346
import scala.scalajs.js
@@ -48,6 +51,8 @@ private[net] trait DatagramSocketPlatform[F[_]] {
4851
}
4952

5053
private[net] trait DatagramSocketCompanionPlatform {
54+
55+
@deprecated("Use com.comcast.ip4s.NetworkInterface", "3.13.0")
5156
type NetworkInterface = String
5257

5358
private[net] def forAsync[F[_]](
@@ -142,7 +147,49 @@ private[net] trait DatagramSocketCompanionPlatform {
142147

143148
override def join(
144149
join: MulticastJoin[IpAddress],
145-
interface: NetworkInterface
150+
interface: Ip4sNetworkInterface
151+
): F[GroupMembership] = F
152+
.delay {
153+
val interfaceAddress = interface.addresses
154+
.collectFirst { case c if c.address.fold(_ => true, _ => false) => c.address }
155+
.getOrElse(
156+
throw new IllegalArgumentException("specified interface does not have ipv4 address")
157+
)
158+
.toString
159+
join match {
160+
case AnySourceMulticastJoin(group) =>
161+
sock.addMembership(group.address.toString, interfaceAddress)
162+
case SourceSpecificMulticastJoin(source, group) =>
163+
sock.addSourceSpecificMembership(
164+
source.toString,
165+
group.address.toString,
166+
interfaceAddress
167+
)
168+
}
169+
interfaceAddress
170+
}
171+
.map { interfaceAddress =>
172+
new GroupMembership {
173+
174+
override def drop: F[Unit] = F.delay {
175+
join match {
176+
case AnySourceMulticastJoin(group) =>
177+
sock.dropMembership(group.address.toString, interfaceAddress)
178+
case SourceSpecificMulticastJoin(source, group) =>
179+
sock.dropSourceSpecificMembership(
180+
source.toString,
181+
group.address.toString,
182+
interfaceAddress
183+
)
184+
}
185+
}
186+
}
187+
}
188+
189+
@deprecated("Use overload that takes a com.comcast.ip4s.NetworkInterface", "3.13.0")
190+
override def join(
191+
join: MulticastJoin[IpAddress],
192+
interface: DatagramSocket.NetworkInterface
146193
): F[GroupMembership] = F
147194
.delay {
148195
join match {

io/js/src/main/scala/fs2/io/net/SocketOptionPlatform.scala

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
package fs2.io.net
2323

2424
import cats.effect.kernel.Sync
25+
import com.comcast.ip4s.NetworkInterface
2526
import fs2.io.internal.facade
2627

2728
import scala.annotation.nowarn
@@ -118,11 +119,30 @@ private[net] trait SocketOptionCompanionPlatform { self: SocketOption.type =>
118119
}
119120
def broadcast(value: Boolean): SocketOption = apply(Broadcast, value)
120121

121-
object MulticastInterface extends Key[String] {
122-
override private[net] def set[F[_]: Sync](sock: facade.dgram.Socket, value: String): F[Unit] =
123-
Sync[F].delay(sock.setMulticastInterface(value))
122+
object MulticastInterface extends Key[NetworkInterface] {
123+
override private[net] def set[F[_]: Sync](
124+
sock: facade.dgram.Socket,
125+
value: NetworkInterface
126+
): F[Unit] =
127+
Sync[F].delay {
128+
val mi = sock.address().family match {
129+
case "IPv4" =>
130+
value.addresses
131+
.collectFirst {
132+
case c if c.address.fold(_ => true, _ => false) => c.address.toString
133+
}
134+
.getOrElse(
135+
throw new IllegalArgumentException(
136+
"socket is IPv4 but specified interface does not have an IPv4 address"
137+
)
138+
)
139+
case "IPv6" => "::%" + value.name
140+
case other => throw new IllegalStateException(s"unexpected socket family: $other")
141+
}
142+
sock.setMulticastInterface(mi)
143+
}
124144
}
125-
def multicastInterface(value: String): SocketOption = apply(MulticastInterface, value)
145+
def multicastInterface(value: NetworkInterface): SocketOption = apply(MulticastInterface, value)
126146

127147
object MulticastLoop extends Key[Boolean] {
128148
override private[net] def set[F[_]: Sync](sock: facade.dgram.Socket, value: Boolean): F[Unit] =

io/js/src/test/scala/fs2/io/net/udp/UdpSuitePlatform.scala

Lines changed: 0 additions & 42 deletions
This file was deleted.

io/jvm-native/src/main/scala/fs2/io/net/SocketOptionPlatform.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ import java.net.{SocketOption => JSocketOption}
2525
import java.net.StandardSocketOptions
2626

2727
import java.lang.{Boolean => JBoolean, Integer => JInt}
28-
import java.net.NetworkInterface
28+
import java.net.{NetworkInterface => JNetworkInterface}
29+
30+
import com.comcast.ip4s.NetworkInterface
2931

3032
private[net] trait SocketOptionCompanionPlatform {
3133
type Key[A] = JSocketOption[A]
@@ -38,6 +40,9 @@ private[net] trait SocketOptionCompanionPlatform {
3840

3941
val MulticastInterface = StandardSocketOptions.IP_MULTICAST_IF
4042
def multicastInterface(value: NetworkInterface): SocketOption =
43+
SocketOption(MulticastInterface, JNetworkInterface.getByName(value.name))
44+
45+
def multicastInterface(value: JNetworkInterface): SocketOption =
4146
SocketOption(MulticastInterface, value)
4247

4348
val MulticastLoop = StandardSocketOptions.IP_MULTICAST_LOOP

io/jvm/src/main/scala/fs2/io/net/AsyncIpDatagramSocketsProvider.scala

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,13 @@ import java.net.InetSocketAddress
2727

2828
import cats.syntax.all._
2929
import cats.effect.{Async, Resource}
30-
import com.comcast.ip4s.{Dns, Host, SocketAddress}
30+
import com.comcast.ip4s.{Dns, Host, NetworkInterface, SocketAddress}
3131

3232
import fs2.internal.ThreadFactories
3333
import java.net.StandardProtocolFamily
3434
import java.nio.channels.DatagramChannel
3535
import com.comcast.ip4s.*
36-
import java.net.NetworkInterface
36+
import java.net.{NetworkInterface => JNetworkInterface}
3737
import CollectionCompat.*
3838

3939
private[net] object AsyncIpDatagramSocketsProvider {
@@ -131,10 +131,11 @@ private[net] object AsyncIpDatagramSocketsProvider {
131131
interface: NetworkInterface
132132
): F[GroupMembership] =
133133
Async[F].delay {
134+
val jinterface = JNetworkInterface.getByName(interface.name)
134135
val membership = join.fold(
135-
j => channel.join(j.group.address.toInetAddress, interface),
136+
j => channel.join(j.group.address.toInetAddress, jinterface),
136137
j =>
137-
channel.join(j.group.address.toInetAddress, interface, j.source.toInetAddress)
138+
channel.join(j.group.address.toInetAddress, jinterface, j.source.toInetAddress)
138139
)
139140
new GroupMembership {
140141
def drop = Async[F].delay(membership.drop)
@@ -146,6 +147,12 @@ private[net] object AsyncIpDatagramSocketsProvider {
146147
}
147148
}
148149

150+
override def join(
151+
j: MulticastJoin[IpAddress],
152+
interface: JNetworkInterface
153+
): F[GroupMembership] =
154+
join(j, NetworkInterface.fromJava(interface))
155+
149156
override def supportedOptions: F[Set[SocketOption.Key[?]]] =
150157
Async[F].delay {
151158
channel.supportedOptions.asScala.toSet

io/jvm/src/main/scala/fs2/io/net/DatagramSocketPlatform.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,6 @@ private[net] trait DatagramSocketPlatform[F[_]] {
3737
}
3838

3939
private[net] trait DatagramSocketCompanionPlatform {
40-
// TODO deprecate and replace with real cross-platform type
40+
@deprecated("Use com.comcast.ip4s.NetworkInterface", "3.13.0")
4141
type NetworkInterface = java.net.NetworkInterface
4242
}

0 commit comments

Comments
 (0)