Frank Denis 2 years ago
parent 9374f340a7
commit a675fd04ef

@ -44,6 +44,7 @@ serde = "1.0.137"
serde_derive = "1.0.137"
serde-big-array = "0.4.1"
siphasher = "0.3.10"
slabigator = "0.1.0"
tokio = { version = "1.19.2", features = [
"net",
"io-std",

@ -1,4 +1,3 @@
use std::collections::vec_deque::VecDeque;
use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf;
use std::sync::atomic::AtomicU32;
@ -7,6 +6,7 @@ use std::time::Duration;
use parking_lot::{Mutex, RwLock};
use siphasher::sip128::SipHasher13;
use slabigator::Slab;
use tokio::runtime::Handle;
use tokio::sync::oneshot;
@ -35,8 +35,8 @@ pub struct Globals {
pub tcp_concurrent_connections: Arc<AtomicU32>,
pub udp_max_active_connections: u32,
pub tcp_max_active_connections: u32,
pub udp_active_connections: Arc<Mutex<VecDeque<oneshot::Sender<()>>>>,
pub tcp_active_connections: Arc<Mutex<VecDeque<oneshot::Sender<()>>>>,
pub udp_active_connections: Arc<Mutex<Slab<oneshot::Sender<()>>>>,
pub tcp_active_connections: Arc<Mutex<Slab<oneshot::Sender<()>>>>,
pub key_cache_capacity: usize,
pub hasher: SipHasher13,
pub cache: Cache,

@ -35,7 +35,6 @@ mod resolver;
#[cfg(feature = "metrics")]
mod varz;
use std::collections::vec_deque::VecDeque;
use std::convert::TryFrom;
use std::fs::File;
use std::io::prelude::*;
@ -67,6 +66,7 @@ use parking_lot::RwLock;
use privdrop::PrivDrop;
use rand::prelude::*;
use siphasher::sip128::SipHasher13;
use slabigator::Slab;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpSocket, TcpStream, UdpSocket};
use tokio::runtime::Handle;
@ -286,14 +286,14 @@ async fn tcp_acceptor(globals: Arc<Globals>, tcp_listener: TcpListener) -> Resul
let active_connections = globals.tcp_active_connections.clone();
while let Ok((mut client_connection, _client_addr)) = tcp_listener.accept().await {
let (tx, rx) = oneshot::channel::<()>();
{
let tx_channel_index = {
let mut active_connections = active_connections.lock();
if active_connections.len() >= globals.tcp_max_active_connections as _ {
if active_connections.is_full() {
let tx_oldest = active_connections.pop_back().unwrap();
let _ = tx_oldest.send(());
}
active_connections.push_front(tx);
}
active_connections.push_front(tx)?
};
let _count = concurrent_connections.fetch_add(1, Ordering::Relaxed);
#[cfg(feature = "metrics")]
let varz = globals.varz.clone();
@ -304,6 +304,7 @@ async fn tcp_acceptor(globals: Arc<Globals>, tcp_listener: TcpListener) -> Resul
}
client_connection.set_nodelay(true)?;
let globals = globals.clone();
let active_connections = active_connections.clone();
let concurrent_connections = concurrent_connections.clone();
let fut = async {
let mut binlen = [0u8, 0];
@ -328,6 +329,8 @@ async fn tcp_acceptor(globals: Arc<Globals>, tcp_listener: TcpListener) -> Resul
let _count = concurrent_connections.fetch_sub(1, Ordering::Relaxed);
#[cfg(feature = "metrics")]
varz.inflight_tcp_queries.set(_count.saturating_sub(1) as _);
let mut active_connections = active_connections.lock();
_ = active_connections.remove(tx_channel_index);
}));
}
Ok(())
@ -356,14 +359,14 @@ async fn udp_acceptor(
client_addr,
});
let (tx, rx) = oneshot::channel::<()>();
{
let tx_channel_index = {
let mut active_connections = active_connections.lock();
if active_connections.len() >= globals.udp_max_active_connections as _ {
if active_connections.is_full() {
let tx_oldest = active_connections.pop_back().unwrap();
let _ = tx_oldest.send(());
}
active_connections.push_front(tx);
}
active_connections.push_front(tx)?
};
let _count = concurrent_connections.fetch_add(1, Ordering::Relaxed);
#[cfg(feature = "metrics")]
let varz = globals.varz.clone();
@ -373,6 +376,7 @@ async fn udp_acceptor(
varz.client_queries_udp.inc();
}
let globals = globals.clone();
let active_connections = active_connections.clone();
let concurrent_connections = concurrent_connections.clone();
let fut = handle_client_query(globals, client_ctx, packet);
let fut_abort = rx;
@ -381,6 +385,8 @@ async fn udp_acceptor(
let _count = concurrent_connections.fetch_sub(1, Ordering::Relaxed);
#[cfg(feature = "metrics")]
varz.inflight_udp_queries.set(_count.saturating_sub(1) as _);
let mut active_connections = active_connections.lock();
_ = active_connections.remove(tx_channel_index);
}));
}
}
@ -717,12 +723,12 @@ fn main() -> Result<(), Error> {
tcp_concurrent_connections: Arc::new(AtomicU32::new(0)),
udp_max_active_connections: config.udp_max_active_connections,
tcp_max_active_connections: config.tcp_max_active_connections,
udp_active_connections: Arc::new(Mutex::new(VecDeque::with_capacity(
udp_active_connections: Arc::new(Mutex::new(Slab::with_capacity(
config.udp_max_active_connections as _,
))),
tcp_active_connections: Arc::new(Mutex::new(VecDeque::with_capacity(
)?)),
tcp_active_connections: Arc::new(Mutex::new(Slab::with_capacity(
config.tcp_max_active_connections as _,
))),
)?)),
key_cache_capacity,
hasher,
cache,

Loading…
Cancel
Save