macro_rules! uring_spawn {
($span:expr, $future:expr) => {{
let (tx, rx) = tokio::sync::oneshot::channel::<crate::Result<()>>();
use tracing::Instrument as _;
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
let dispatcher = tracing::dispatcher::get_default(|d| d.clone());
std::thread::Builder::new().name("io-uring".into()).spawn(move || {
let _guard = tracing::dispatcher::set_default(&dispatcher);
match tokio_uring::Runtime::new(&tokio_uring::builder().entries(2048)) {
Ok(runtime) => {
let _ = tx.send(Ok(()));
if let Some(span) = $span {
runtime.block_on($future.instrument(span));
} else {
runtime.block_on($future);
}
}
Err(error) => {
let _ = tx.send(Err(error.into()));
}
};
}).expect("failed to spawn io-uring thread");
} else {
use tracing::instrument::WithSubscriber as _;
let fut = async move {
let _ = tx.send(Ok(()));
$future.await
};
if let Some(span) = $span {
tokio::spawn(fut.instrument(span).with_current_subscriber());
} else {
tokio::spawn(fut.with_current_subscriber());
}
}
}
rx
}};
}
macro_rules! uring_inner_spawn {
($future:expr) => {
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
tokio_uring::spawn($future);
} else {
tokio::spawn($future);
}
}
};
}
macro_rules! uring_span {
($span:expr) => {{
cfg_if::cfg_if! {
if #[cfg(debug_assertions)] {
Some($span)
} else {
None
}
}
}};
}
pub mod cluster;
pub mod endpoint;
pub(crate) mod maxmind_db;
pub mod phoenix;
pub mod xds;
use std::{
io,
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
};
use socket2::{Protocol, Socket, Type};
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
use tokio_uring::net::UdpSocket;
} else {
use tokio::net::UdpSocket;
}
}
pub use self::{
cluster::ClusterMap,
endpoint::{Endpoint, EndpointAddress},
};
fn socket_with_reuse_and_address(addr: SocketAddr) -> std::io::Result<UdpSocket> {
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
raw_socket_with_reuse_and_address(addr)
.map(From::from)
.map(UdpSocket::from_std)
} else {
epoll_socket_with_reuse_and_address(addr)
}
}
}
fn epoll_socket_with_reuse(port: u16) -> std::io::Result<tokio::net::UdpSocket> {
raw_socket_with_reuse_and_address((Ipv6Addr::UNSPECIFIED, port).into())
.map(From::from)
.and_then(tokio::net::UdpSocket::from_std)
}
fn epoll_socket_with_reuse_and_address(addr: SocketAddr) -> std::io::Result<tokio::net::UdpSocket> {
raw_socket_with_reuse_and_address(addr)
.map(From::from)
.and_then(tokio::net::UdpSocket::from_std)
}
#[inline]
pub fn raw_socket_with_reuse(port: u16) -> std::io::Result<Socket> {
raw_socket_with_reuse_and_address((Ipv6Addr::UNSPECIFIED, port).into())
}
pub fn raw_socket_with_reuse_and_address(addr: SocketAddr) -> std::io::Result<Socket> {
let domain = match addr {
SocketAddr::V4(_) => socket2::Domain::IPV4,
SocketAddr::V6(_) => socket2::Domain::IPV6,
};
let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
enable_reuse(&sock)?;
sock.set_nonblocking(true)?;
if domain == socket2::Domain::IPV6 {
sock.set_only_v6(false)?;
}
sock.bind(&addr.into())?;
Ok(sock)
}
#[inline]
pub fn socket_port(socket: &socket2::Socket) -> u16 {
match socket.local_addr().unwrap().as_socket().unwrap() {
SocketAddr::V4(addr) => addr.port(),
SocketAddr::V6(addr) => addr.port(),
}
}
#[cfg(not(target_family = "windows"))]
fn enable_reuse(sock: &Socket) -> io::Result<()> {
sock.set_reuse_port(true)?;
Ok(())
}
#[cfg(target_family = "windows")]
fn enable_reuse(sock: &Socket) -> io::Result<()> {
sock.set_reuse_address(true)?;
Ok(())
}
pub struct DualStackLocalSocket {
socket: UdpSocket,
local_addr: SocketAddr,
}
impl DualStackLocalSocket {
pub fn from_raw(socket: Socket) -> Self {
let socket: std::net::UdpSocket = socket.into();
let local_addr = socket.local_addr().unwrap();
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
let socket = UdpSocket::from_std(socket);
} else {
let socket = UdpSocket::from_std(socket).unwrap();
}
}
Self { socket, local_addr }
}
pub fn new(port: u16) -> std::io::Result<Self> {
raw_socket_with_reuse(port).map(Self::from_raw)
}
pub fn bind_local(port: u16) -> std::io::Result<Self> {
let local_addr = (Ipv6Addr::LOCALHOST, port).into();
let socket = socket_with_reuse_and_address(local_addr)?;
Ok(Self { socket, local_addr })
}
pub fn local_ipv4_addr(&self) -> io::Result<SocketAddr> {
Ok(match self.local_addr {
SocketAddr::V4(_) => self.local_addr,
SocketAddr::V6(_) => (Ipv4Addr::UNSPECIFIED, self.local_addr.port()).into(),
})
}
pub fn local_ipv6_addr(&self) -> io::Result<SocketAddr> {
Ok(match self.local_addr {
SocketAddr::V4(v4addr) => SocketAddr::new(
IpAddr::V6(v4addr.ip().to_ipv6_mapped()),
self.local_addr.port(),
),
SocketAddr::V6(_) => self.local_addr,
})
}
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
pub async fn recv_from<B: tokio_uring::buf::IoBufMut>(&self, buf: B) -> (io::Result<(usize, SocketAddr)>, B) {
self.socket.recv_from(buf).await
}
pub async fn send_to<B: tokio_uring::buf::IoBuf>(&self, buf: B, target: SocketAddr) -> (io::Result<usize>, B) {
self.socket.send_to(buf, target).await
}
} else {
pub async fn recv_from<B: std::ops::DerefMut<Target = [u8]>>(&self, mut buf: B) -> (io::Result<(usize, SocketAddr)>, B) {
let result = self.socket.recv_from(&mut buf).await;
(result, buf)
}
pub async fn send_to<B: std::ops::Deref<Target = [u8]>>(&self, buf: B, target: SocketAddr) -> (io::Result<usize>, B) {
let result = self.socket.send_to(&buf, target).await;
(result, buf)
}
}
}
pub fn make_refcnt(self) -> DualStackLocalSocketRc {
DualStackLocalSocketRc::new(self)
}
}
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
pub type DualStackLocalSocketRc = std::rc::Rc<DualStackLocalSocket>;
} else {
pub type DualStackLocalSocketRc = std::sync::Arc<DualStackLocalSocket>;
}
}
#[derive(Debug)]
pub struct DualStackEpollSocket {
socket: tokio::net::UdpSocket,
}
impl DualStackEpollSocket {
pub fn new(port: u16) -> std::io::Result<Self> {
Ok(Self {
socket: epoll_socket_with_reuse(port)?,
})
}
pub fn bind_local(port: u16) -> std::io::Result<Self> {
Ok(Self {
socket: epoll_socket_with_reuse_and_address((Ipv6Addr::LOCALHOST, port).into())?,
})
}
pub(crate) fn new_with_address(addr: SocketAddr) -> std::io::Result<Self> {
Ok(Self {
socket: epoll_socket_with_reuse_and_address(addr)?,
})
}
pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
self.socket.recv_from(buf).await
}
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.socket.local_addr()
}
pub fn local_ipv4_addr(&self) -> io::Result<SocketAddr> {
let addr = self.socket.local_addr()?;
match addr {
SocketAddr::V4(_) => Ok(addr),
SocketAddr::V6(_) => Ok((Ipv4Addr::UNSPECIFIED, addr.port()).into()),
}
}
pub fn local_ipv6_addr(&self) -> io::Result<SocketAddr> {
let addr = self.socket.local_addr()?;
match addr {
SocketAddr::V4(v4addr) => Ok(SocketAddr::new(
IpAddr::V6(v4addr.ip().to_ipv6_mapped()),
addr.port(),
)),
SocketAddr::V6(_) => Ok(addr),
}
}
pub async fn send_to<A: tokio::net::ToSocketAddrs>(
&self,
buf: &[u8],
target: A,
) -> io::Result<usize> {
self.socket.send_to(buf, target).await
}
}
pub struct TcpListener {
inner: std::net::TcpListener,
}
impl TcpListener {
#[inline]
pub fn bind(port: Option<u16>) -> io::Result<Self> {
std::net::TcpListener::bind((std::net::Ipv6Addr::UNSPECIFIED, port.unwrap_or_default()))
.map(|inner| Self { inner })
}
#[inline]
pub fn port(&self) -> u16 {
self.inner.local_addr().expect("failed to bind").port()
}
#[inline]
pub fn local_addr(&self) -> SocketAddr {
self.inner.local_addr().expect("failed to bind")
}
#[inline]
pub fn into_stream(self) -> io::Result<tokio_stream::wrappers::TcpListenerStream> {
self.inner.set_nonblocking(true)?;
let tl = tokio::net::TcpListener::from_std(self.inner)?;
Ok(tokio_stream::wrappers::TcpListenerStream::new(tl))
}
}
impl From<TcpListener> for std::net::TcpListener {
#[inline]
fn from(value: TcpListener) -> Self {
value.inner
}
}
#[cfg(test)]
mod tests {
use std::{
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
time::Duration,
};
use tokio::time::timeout;
use crate::net::endpoint::address::AddressKind;
use crate::test::{available_addr, AddressType, TestHelper};
#[tokio::test]
async fn dual_stack_socket_reusable() {
let expected = available_addr(AddressType::Random).await;
let socket = super::DualStackEpollSocket::new(expected.port()).unwrap();
let addr = socket.local_ipv4_addr().unwrap();
match expected {
SocketAddr::V4(_) => assert_eq!(expected, socket.local_ipv4_addr().unwrap()),
SocketAddr::V6(_) => assert_eq!(expected, socket.local_ipv6_addr().unwrap()),
}
assert_eq!(expected.port(), socket.local_ipv4_addr().unwrap().port());
assert_eq!(expected.port(), socket.local_ipv6_addr().unwrap().port());
let socket = super::DualStackEpollSocket::new(expected.port()).unwrap();
match expected {
SocketAddr::V4(_) => assert_eq!(expected, socket.local_ipv4_addr().unwrap()),
SocketAddr::V6(_) => assert_eq!(expected, socket.local_ipv6_addr().unwrap()),
}
assert_eq!(addr.port(), socket.local_ipv4_addr().unwrap().port());
assert_eq!(addr.port(), socket.local_ipv6_addr().unwrap().port());
}
#[tokio::test]
async fn dual_stack_socket() {
let mut t = TestHelper::default();
let echo_addr = t.run_echo_server(AddressType::Random).await;
let (mut rx, socket) = t.open_socket_and_recv_multiple_packets().await;
let msg = "hello";
let addr = echo_addr.to_socket_addr().await.unwrap();
socket.send_to(msg.as_bytes(), &addr).await.unwrap();
assert_eq!(
msg,
timeout(Duration::from_secs(5), rx.recv())
.await
.expect("should not timeout")
.unwrap()
);
let ipv4_echo_addr = (Ipv4Addr::UNSPECIFIED, echo_addr.port).into();
let opp_addr: SocketAddr = match echo_addr.host {
AddressKind::Ip(ip) => match ip {
IpAddr::V4(_) => (Ipv6Addr::UNSPECIFIED, echo_addr.port).into(),
IpAddr::V6(_) => ipv4_echo_addr,
},
AddressKind::Name(_) => unreachable!(),
};
socket.send_to(msg.as_bytes(), &opp_addr).await.unwrap();
assert_eq!(
msg,
timeout(Duration::from_secs(5), rx.recv())
.await
.expect("should not timeout")
.unwrap()
);
let (mut rx, socket) = t.open_ipv4_socket_and_recv_multiple_packets().await;
socket
.send_to(msg.as_bytes(), &ipv4_echo_addr)
.await
.unwrap();
assert_eq!(
msg,
timeout(Duration::from_secs(5), rx.recv())
.await
.expect("should not timeout")
.unwrap()
);
}
}