|
|
|
@ -35,7 +35,6 @@ mod resolver;
|
|
|
|
|
#[cfg(feature = "metrics")]
|
|
|
|
|
mod varz;
|
|
|
|
|
|
|
|
|
|
use std::collections::vec_deque::VecDeque;
|
|
|
|
|
use std::convert::TryFrom;
|
|
|
|
|
use std::fs::File;
|
|
|
|
|
use std::io::prelude::*;
|
|
|
|
@ -67,6 +66,7 @@ use parking_lot::RwLock;
|
|
|
|
|
use privdrop::PrivDrop;
|
|
|
|
|
use rand::prelude::*;
|
|
|
|
|
use siphasher::sip128::SipHasher13;
|
|
|
|
|
use slabigator::Slab;
|
|
|
|
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
|
|
|
|
use tokio::net::{TcpListener, TcpSocket, TcpStream, UdpSocket};
|
|
|
|
|
use tokio::runtime::Handle;
|
|
|
|
@ -286,14 +286,14 @@ async fn tcp_acceptor(globals: Arc<Globals>, tcp_listener: TcpListener) -> Resul
|
|
|
|
|
let active_connections = globals.tcp_active_connections.clone();
|
|
|
|
|
while let Ok((mut client_connection, _client_addr)) = tcp_listener.accept().await {
|
|
|
|
|
let (tx, rx) = oneshot::channel::<()>();
|
|
|
|
|
{
|
|
|
|
|
let tx_channel_index = {
|
|
|
|
|
let mut active_connections = active_connections.lock();
|
|
|
|
|
if active_connections.len() >= globals.tcp_max_active_connections as _ {
|
|
|
|
|
if active_connections.is_full() {
|
|
|
|
|
let tx_oldest = active_connections.pop_back().unwrap();
|
|
|
|
|
let _ = tx_oldest.send(());
|
|
|
|
|
}
|
|
|
|
|
active_connections.push_front(tx);
|
|
|
|
|
}
|
|
|
|
|
active_connections.push_front(tx)?
|
|
|
|
|
};
|
|
|
|
|
let _count = concurrent_connections.fetch_add(1, Ordering::Relaxed);
|
|
|
|
|
#[cfg(feature = "metrics")]
|
|
|
|
|
let varz = globals.varz.clone();
|
|
|
|
@ -304,6 +304,7 @@ async fn tcp_acceptor(globals: Arc<Globals>, tcp_listener: TcpListener) -> Resul
|
|
|
|
|
}
|
|
|
|
|
client_connection.set_nodelay(true)?;
|
|
|
|
|
let globals = globals.clone();
|
|
|
|
|
let active_connections = active_connections.clone();
|
|
|
|
|
let concurrent_connections = concurrent_connections.clone();
|
|
|
|
|
let fut = async {
|
|
|
|
|
let mut binlen = [0u8, 0];
|
|
|
|
@ -328,6 +329,8 @@ async fn tcp_acceptor(globals: Arc<Globals>, tcp_listener: TcpListener) -> Resul
|
|
|
|
|
let _count = concurrent_connections.fetch_sub(1, Ordering::Relaxed);
|
|
|
|
|
#[cfg(feature = "metrics")]
|
|
|
|
|
varz.inflight_tcp_queries.set(_count.saturating_sub(1) as _);
|
|
|
|
|
let mut active_connections = active_connections.lock();
|
|
|
|
|
_ = active_connections.remove(tx_channel_index);
|
|
|
|
|
}));
|
|
|
|
|
}
|
|
|
|
|
Ok(())
|
|
|
|
@ -356,14 +359,14 @@ async fn udp_acceptor(
|
|
|
|
|
client_addr,
|
|
|
|
|
});
|
|
|
|
|
let (tx, rx) = oneshot::channel::<()>();
|
|
|
|
|
{
|
|
|
|
|
let tx_channel_index = {
|
|
|
|
|
let mut active_connections = active_connections.lock();
|
|
|
|
|
if active_connections.len() >= globals.udp_max_active_connections as _ {
|
|
|
|
|
if active_connections.is_full() {
|
|
|
|
|
let tx_oldest = active_connections.pop_back().unwrap();
|
|
|
|
|
let _ = tx_oldest.send(());
|
|
|
|
|
}
|
|
|
|
|
active_connections.push_front(tx);
|
|
|
|
|
}
|
|
|
|
|
active_connections.push_front(tx)?
|
|
|
|
|
};
|
|
|
|
|
let _count = concurrent_connections.fetch_add(1, Ordering::Relaxed);
|
|
|
|
|
#[cfg(feature = "metrics")]
|
|
|
|
|
let varz = globals.varz.clone();
|
|
|
|
@ -373,6 +376,7 @@ async fn udp_acceptor(
|
|
|
|
|
varz.client_queries_udp.inc();
|
|
|
|
|
}
|
|
|
|
|
let globals = globals.clone();
|
|
|
|
|
let active_connections = active_connections.clone();
|
|
|
|
|
let concurrent_connections = concurrent_connections.clone();
|
|
|
|
|
let fut = handle_client_query(globals, client_ctx, packet);
|
|
|
|
|
let fut_abort = rx;
|
|
|
|
@ -381,6 +385,8 @@ async fn udp_acceptor(
|
|
|
|
|
let _count = concurrent_connections.fetch_sub(1, Ordering::Relaxed);
|
|
|
|
|
#[cfg(feature = "metrics")]
|
|
|
|
|
varz.inflight_udp_queries.set(_count.saturating_sub(1) as _);
|
|
|
|
|
let mut active_connections = active_connections.lock();
|
|
|
|
|
_ = active_connections.remove(tx_channel_index);
|
|
|
|
|
}));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -717,12 +723,12 @@ fn main() -> Result<(), Error> {
|
|
|
|
|
tcp_concurrent_connections: Arc::new(AtomicU32::new(0)),
|
|
|
|
|
udp_max_active_connections: config.udp_max_active_connections,
|
|
|
|
|
tcp_max_active_connections: config.tcp_max_active_connections,
|
|
|
|
|
udp_active_connections: Arc::new(Mutex::new(VecDeque::with_capacity(
|
|
|
|
|
udp_active_connections: Arc::new(Mutex::new(Slab::with_capacity(
|
|
|
|
|
config.udp_max_active_connections as _,
|
|
|
|
|
))),
|
|
|
|
|
tcp_active_connections: Arc::new(Mutex::new(VecDeque::with_capacity(
|
|
|
|
|
)?)),
|
|
|
|
|
tcp_active_connections: Arc::new(Mutex::new(Slab::with_capacity(
|
|
|
|
|
config.tcp_max_active_connections as _,
|
|
|
|
|
))),
|
|
|
|
|
)?)),
|
|
|
|
|
key_cache_capacity,
|
|
|
|
|
hasher,
|
|
|
|
|
cache,
|
|
|
|
|