From a675fd04ef9875324020e2730aa7ec796bd4c94f Mon Sep 17 00:00:00 2001 From: Frank Denis Date: Wed, 22 Jun 2022 19:47:30 +0200 Subject: [PATCH] Up --- Cargo.toml | 1 + src/globals.rs | 6 +++--- src/main.rs | 32 +++++++++++++++++++------------- 3 files changed, 23 insertions(+), 16 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 87f20e5..ce4a267 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,6 +44,7 @@ serde = "1.0.137" serde_derive = "1.0.137" serde-big-array = "0.4.1" siphasher = "0.3.10" +slabigator = "0.1.0" tokio = { version = "1.19.2", features = [ "net", "io-std", diff --git a/src/globals.rs b/src/globals.rs index 68418ea..5e9cbbf 100644 --- a/src/globals.rs +++ b/src/globals.rs @@ -1,4 +1,3 @@ -use std::collections::vec_deque::VecDeque; use std::net::{IpAddr, SocketAddr}; use std::path::PathBuf; use std::sync::atomic::AtomicU32; @@ -7,6 +6,7 @@ use std::time::Duration; use parking_lot::{Mutex, RwLock}; use siphasher::sip128::SipHasher13; +use slabigator::Slab; use tokio::runtime::Handle; use tokio::sync::oneshot; @@ -35,8 +35,8 @@ pub struct Globals { pub tcp_concurrent_connections: Arc, pub udp_max_active_connections: u32, pub tcp_max_active_connections: u32, - pub udp_active_connections: Arc>>>, - pub tcp_active_connections: Arc>>>, + pub udp_active_connections: Arc>>>, + pub tcp_active_connections: Arc>>>, pub key_cache_capacity: usize, pub hasher: SipHasher13, pub cache: Cache, diff --git a/src/main.rs b/src/main.rs index 03328f9..f5691eb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -35,7 +35,6 @@ mod resolver; #[cfg(feature = "metrics")] mod varz; -use std::collections::vec_deque::VecDeque; use std::convert::TryFrom; use std::fs::File; use std::io::prelude::*; @@ -67,6 +66,7 @@ use parking_lot::RwLock; use privdrop::PrivDrop; use rand::prelude::*; use siphasher::sip128::SipHasher13; +use slabigator::Slab; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpSocket, TcpStream, UdpSocket}; use tokio::runtime::Handle; @@ -286,14 +286,14 @@ async fn tcp_acceptor(globals: Arc, tcp_listener: TcpListener) -> Resul let active_connections = globals.tcp_active_connections.clone(); while let Ok((mut client_connection, _client_addr)) = tcp_listener.accept().await { let (tx, rx) = oneshot::channel::<()>(); - { + let tx_channel_index = { let mut active_connections = active_connections.lock(); - if active_connections.len() >= globals.tcp_max_active_connections as _ { + if active_connections.is_full() { let tx_oldest = active_connections.pop_back().unwrap(); let _ = tx_oldest.send(()); } - active_connections.push_front(tx); - } + active_connections.push_front(tx)? + }; let _count = concurrent_connections.fetch_add(1, Ordering::Relaxed); #[cfg(feature = "metrics")] let varz = globals.varz.clone(); @@ -304,6 +304,7 @@ async fn tcp_acceptor(globals: Arc, tcp_listener: TcpListener) -> Resul } client_connection.set_nodelay(true)?; let globals = globals.clone(); + let active_connections = active_connections.clone(); let concurrent_connections = concurrent_connections.clone(); let fut = async { let mut binlen = [0u8, 0]; @@ -328,6 +329,8 @@ async fn tcp_acceptor(globals: Arc, tcp_listener: TcpListener) -> Resul let _count = concurrent_connections.fetch_sub(1, Ordering::Relaxed); #[cfg(feature = "metrics")] varz.inflight_tcp_queries.set(_count.saturating_sub(1) as _); + let mut active_connections = active_connections.lock(); + _ = active_connections.remove(tx_channel_index); })); } Ok(()) @@ -356,14 +359,14 @@ async fn udp_acceptor( client_addr, }); let (tx, rx) = oneshot::channel::<()>(); - { + let tx_channel_index = { let mut active_connections = active_connections.lock(); - if active_connections.len() >= globals.udp_max_active_connections as _ { + if active_connections.is_full() { let tx_oldest = active_connections.pop_back().unwrap(); let _ = tx_oldest.send(()); } - active_connections.push_front(tx); - } + active_connections.push_front(tx)? + }; let _count = concurrent_connections.fetch_add(1, Ordering::Relaxed); #[cfg(feature = "metrics")] let varz = globals.varz.clone(); @@ -373,6 +376,7 @@ async fn udp_acceptor( varz.client_queries_udp.inc(); } let globals = globals.clone(); + let active_connections = active_connections.clone(); let concurrent_connections = concurrent_connections.clone(); let fut = handle_client_query(globals, client_ctx, packet); let fut_abort = rx; @@ -381,6 +385,8 @@ async fn udp_acceptor( let _count = concurrent_connections.fetch_sub(1, Ordering::Relaxed); #[cfg(feature = "metrics")] varz.inflight_udp_queries.set(_count.saturating_sub(1) as _); + let mut active_connections = active_connections.lock(); + _ = active_connections.remove(tx_channel_index); })); } } @@ -717,12 +723,12 @@ fn main() -> Result<(), Error> { tcp_concurrent_connections: Arc::new(AtomicU32::new(0)), udp_max_active_connections: config.udp_max_active_connections, tcp_max_active_connections: config.tcp_max_active_connections, - udp_active_connections: Arc::new(Mutex::new(VecDeque::with_capacity( + udp_active_connections: Arc::new(Mutex::new(Slab::with_capacity( config.udp_max_active_connections as _, - ))), - tcp_active_connections: Arc::new(Mutex::new(VecDeque::with_capacity( + )?)), + tcp_active_connections: Arc::new(Mutex::new(Slab::with_capacity( config.tcp_max_active_connections as _, - ))), + )?)), key_cache_capacity, hasher, cache,