Merge branch 'metrics'

* metrics:
  Some initial metrics
  Prometheus metrics
  Add metrics
commit 47890e7e5a by Frank Denis, 5 years ago (pull/5/head)

@@ -37,6 +37,20 @@ tokio = "=0.2.0-alpha.6"
tokio-net = "=0.2.0-alpha.6"
toml = "0.5.3"
[dependencies.hyper]
optional = true
version = "0.13.0-alpha.3"
default_features = false
[dependencies.prometheus]
optional = true
version = "0.7.0"
default_features = false
[features]
default = ["metrics"]
metrics = ["hyper", "prometheus"]
[profile.release]
codegen-units = 1
incremental = false
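
The `metrics` feature is on by default and is what pulls in the optional hyper and prometheus dependencies. A minimal, hypothetical sketch of how such a cfg gate behaves when the crate is built with or without the feature (the function is illustrative, not from this codebase):

// Compiled only when building with the `metrics` feature (the default here).
#[cfg(feature = "metrics")]
fn metrics_compiled_in() -> bool {
    true
}

// Fallback used when building with `--no-default-features`.
#[cfg(not(feature = "metrics"))]
fn metrics_compiled_in() -> bool {
    false
}

fn main() {
    println!("metrics support compiled in: {}", metrics_compiled_in());
}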

@@ -166,3 +166,17 @@ key_cache_capacity = 10000
[filtering]
# domain_blacklist = "/etc/domain_blacklist.txt"

#######################################
#        Server-side filtering        #
#######################################

[metrics]
type = "prometheus"
listen_addr = "0.0.0.0:9100"
path = "/metrics"

@@ -9,7 +9,15 @@ use std::net::{IpAddr, SocketAddr};
use std::path::{Path, PathBuf};
use tokio::prelude::*;
#[derive(Serialize, Deserialize, Debug)]
#[cfg(feature = "metrics")]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct MetricsConfig {
pub r#type: String,
pub listen_addr: SocketAddr,
pub path: String,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct DNSCryptConfig {
pub provider_name: String,
pub key_cache_capacity: usize,
@@ -18,23 +26,23 @@ pub struct DNSCryptConfig {
pub no_logs: bool,
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TLSConfig {
pub upstream_addr: Option<SocketAddr>,
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ListenAddrConfig {
pub local: SocketAddr,
pub external: SocketAddr,
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FilteringConfig {
pub domain_blacklist: Option<PathBuf>,
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Config {
pub listen_addrs: Vec<ListenAddrConfig>,
pub external_addr: IpAddr,
@@ -57,6 +65,8 @@ pub struct Config {
pub daemonize: bool,
pub pid_file: Option<PathBuf>,
pub log_file: Option<PathBuf>,
#[cfg(feature = "metrics")]
pub metrics: Option<MetricsConfig>,
}
impl Config {
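
The `[metrics]` table shown in the example configuration deserializes into this MetricsConfig struct through serde. A minimal standalone sketch of that mapping, assuming the toml and serde_derive crates already used by this project (the local struct mirrors the one above; `main` is illustrative):

use serde_derive::Deserialize;
use std::net::SocketAddr;

#[derive(Deserialize, Debug)]
struct MetricsConfig {
    r#type: String,        // serde maps the raw identifier to the TOML key `type`
    listen_addr: SocketAddr,
    path: String,
}

fn main() -> Result<(), toml::de::Error> {
    let cfg: MetricsConfig = toml::from_str(
        r#"
        type = "prometheus"
        listen_addr = "0.0.0.0:9100"
        path = "/metrics"
        "#,
    )?;
    println!("{:?}", cfg);
    Ok(())
}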

@@ -2,6 +2,8 @@ use crate::blacklist::*;
use crate::cache::*;
use crate::crypto::*;
use crate::dnscrypt_certs::*;
#[cfg(feature = "metrics")]
use crate::varz::*;
use parking_lot::{Mutex, RwLock};
use siphasher::sip128::SipHasher13;
@@ -14,7 +16,8 @@ use std::time::Duration;
use tokio::runtime::Runtime;
use tokio::sync::oneshot;
#[derive(Debug, Clone)]
#[derive(Clone, Derivative)]
#[derivative(Debug)]
pub struct Globals {
pub runtime: Arc<Runtime>,
pub state_file: PathBuf,
@@ -37,4 +40,7 @@ pub struct Globals {
pub hasher: SipHasher13,
pub cache: Cache,
pub blacklist: Option<BlackList>,
#[cfg(feature = "metrics")]
#[derivative(Debug = "ignore")]
pub varz: Varz,
}
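
Derivative is used here because `Varz` is an `Arc` over `varz::Inner`, which does not implement Debug, so the field has to be skipped when Globals is formatted. A small standalone sketch of the ignore attribute, assuming the derivative crate; the types below are illustrative stand-ins, not the project's:

use derivative::Derivative;

// Stand-in for a field type that does not implement Debug (like varz::Inner).
struct Handles;

#[derive(Derivative)]
#[derivative(Debug)]
struct Globals {
    state_file: String,
    #[derivative(Debug = "ignore")]
    varz: Handles, // omitted from the {:?} output
}

fn main() {
    let globals = Globals {
        state_file: "encrypted-dns.state".into(),
        varz: Handles,
    };
    // Prints something like: Globals { state_file: "encrypted-dns.state" }
    println!("{:?}", globals);
}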

@@ -18,6 +18,9 @@ extern crate log;
extern crate serde_derive;
#[macro_use]
extern crate serde_big_array;
#[cfg(feature = "metrics")]
#[macro_use]
extern crate prometheus;
mod blacklist;
mod cache;
@@ -28,7 +31,11 @@ mod dnscrypt;
mod dnscrypt_certs;
mod errors;
mod globals;
#[cfg(feature = "metrics")]
mod metrics;
mod resolver;
#[cfg(feature = "metrics")]
mod varz;
use blacklist::*;
use cache::*;
@@ -39,6 +46,8 @@ use dnscrypt::*;
use dnscrypt_certs::*;
use errors::*;
use globals::*;
#[cfg(feature = "metrics")]
use varz::*;
use byteorder::{BigEndian, ByteOrder};
use clap::Arg;
@@ -245,7 +254,15 @@ async fn tcp_acceptor(globals: Arc<Globals>, tcp_listener: TcpListener) -> Resul
}
active_connections.push_front(tx);
}
concurrent_connections.fetch_add(1, Ordering::Relaxed);
let _count = concurrent_connections.fetch_add(1, Ordering::Relaxed);
#[cfg(feature = "metrics")]
let varz = globals.varz.clone();
#[cfg(feature = "metrics")]
{
varz.inflight_tcp_queries
.set(_count.saturating_add(1) as f64);
varz.client_queries_tcp.inc();
}
client_connection.set_nodelay(true)?;
let globals = globals.clone();
let concurrent_connections = concurrent_connections.clone();
@@ -269,7 +286,10 @@ async fn tcp_acceptor(globals: Arc<Globals>, tcp_listener: TcpListener) -> Resul
let fut_abort = rx;
let fut_all = future::select(fut.boxed(), fut_abort).timeout(timeout);
runtime.spawn(fut_all.map(move |_| {
concurrent_connections.fetch_sub(1, Ordering::Relaxed);
let _count = concurrent_connections.fetch_sub(1, Ordering::Relaxed);
#[cfg(feature = "metrics")]
varz.inflight_tcp_queries
.set(_count.saturating_sub(1) as f64);
}));
}
Ok(())
@@ -306,14 +326,25 @@ async fn udp_acceptor(
}
active_connections.push_front(tx);
}
concurrent_connections.fetch_add(1, Ordering::Relaxed);
let _count = concurrent_connections.fetch_add(1, Ordering::Relaxed);
#[cfg(feature = "metrics")]
let varz = globals.varz.clone();
#[cfg(feature = "metrics")]
{
varz.inflight_udp_queries
.set(_count.saturating_add(1) as f64);
varz.client_queries_udp.inc();
}
let globals = globals.clone();
let concurrent_connections = concurrent_connections.clone();
let fut = handle_client_query(globals, client_ctx, packet);
let fut_abort = rx;
let fut_all = future::select(fut.boxed(), fut_abort).timeout(timeout);
runtime.spawn(fut_all.map(move |_| {
concurrent_connections.fetch_sub(1, Ordering::Relaxed);
let _count = concurrent_connections.fetch_sub(1, Ordering::Relaxed);
#[cfg(feature = "metrics")]
varz.inflight_udp_queries
.set(_count.saturating_sub(1) as f64);
}));
}
}
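
The gauge math above relies on `fetch_add` and `fetch_sub` returning the value the counter held before the update, so the number of in-flight queries after the update is that value plus or minus one, clamped with `saturating_add`/`saturating_sub`. A standalone illustration of the pattern:

use std::sync::atomic::{AtomicUsize, Ordering};

fn main() {
    let concurrent_connections = AtomicUsize::new(0);

    // fetch_add returns the value *before* the increment.
    let previous = concurrent_connections.fetch_add(1, Ordering::Relaxed);
    let inflight = previous.saturating_add(1); // what the gauge would be set to
    assert_eq!(inflight, 1);

    // fetch_sub also returns the value before the decrement.
    let previous = concurrent_connections.fetch_sub(1, Ordering::Relaxed);
    let inflight = previous.saturating_sub(1);
    assert_eq!(inflight, 0);
}
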
@@ -559,11 +590,30 @@ fn main() -> Result<(), Error> {
hasher,
cache,
blacklist,
#[cfg(feature = "metrics")]
varz: Varz::default(),
});
let updater = DNSCryptEncryptionParamsUpdater::new(globals.clone());
if !state_is_new {
updater.update();
}
#[cfg(feature = "metrics")]
{
if let Some(metrics_config) = config.metrics {
runtime.spawn(
metrics::prometheus_service(
globals.varz.clone(),
metrics_config.clone(),
runtime.clone(),
)
.map_err(|e| {
error!("Unable to start the metrics service: [{}]", e);
std::process::exit(1);
})
.map(|_| ()),
);
}
}
runtime.spawn(
start(globals, runtime.clone(), listeners)
.map_err(|e| {
@@ -573,6 +623,5 @@ fn main() -> Result<(), Error> {
.map(|_| ()),
);
runtime.block_on(updater.run());
Ok(())
}

@@ -0,0 +1,57 @@
use crate::config::*;
use crate::errors::*;
use crate::varz::*;
use futures::prelude::*;
use hyper::header::CONTENT_TYPE;
use hyper::server::conn::Http;
use hyper::service::service_fn;
use hyper::{Body, Request, Response, StatusCode};
use prometheus::{self, Encoder, TextEncoder};
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::runtime::Runtime;

async fn handle_client_connection(
    req: Request<Body>,
    varz: Varz,
    path: Arc<String>,
) -> Result<Response<Body>, Error> {
    let mut buffer = vec![];
    if req.uri().path() != path.as_str() {
        let response = Response::builder()
            .status(StatusCode::NOT_FOUND)
            .body(Body::empty())?;
        return Ok(response);
    }
    let StartInstant(start_instant) = varz.start_instant;
    let uptime = start_instant.elapsed().as_secs();
    varz.uptime.set(uptime as f64);
    let client_queries = varz.client_queries_udp.get() + varz.client_queries_tcp.get();
    varz.client_queries.set(client_queries);
    let metric_families = prometheus::gather();
    let encoder = TextEncoder::new();
    encoder.encode(&metric_families, &mut buffer)?;
    let response = Response::builder()
        .header(CONTENT_TYPE, encoder.format_type())
        .body(buffer.into())?;
    Ok(response)
}

pub async fn prometheus_service(
    varz: Varz,
    metrics_config: MetricsConfig,
    runtime: Arc<Runtime>,
) -> Result<(), Error> {
    let path = Arc::new(metrics_config.path);
    let mut stream = TcpListener::bind(metrics_config.listen_addr).await?;
    loop {
        let (client, _client_addr) = stream.accept().await?;
        let path = path.clone();
        let varz = varz.clone();
        let service =
            service_fn(move |req| handle_client_connection(req, varz.clone(), path.clone()));
        let connection = Http::new().serve_connection(client, service);
        runtime.spawn(connection.map(|_| {}));
    }
}
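
For reference, a minimal standalone sketch of the register, gather and text-encode flow that `handle_client_connection` performs, assuming the prometheus 0.7 API; the metric name below is illustrative rather than one of the server's:

use prometheus::{Counter, Encoder, TextEncoder};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Register a counter with the default registry and bump it once.
    let counter = Counter::new("example_requests_total", "Example counter")?;
    prometheus::register(Box::new(counter.clone()))?;
    counter.inc();

    // Render every registered metric in the Prometheus text exposition format,
    // which is what the /metrics handler sends back.
    let mut buffer = Vec::new();
    TextEncoder::new().encode(&prometheus::gather(), &mut buffer)?;
    print!("{}", String::from_utf8(buffer)?);
    Ok(())
}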

@@ -0,0 +1,184 @@
use coarsetime::Instant;
use prometheus::{Counter, Gauge, Histogram};
use std::sync::Arc;

pub struct StartInstant(pub Instant);

pub struct Inner {
    pub start_instant: StartInstant,
    pub uptime: Gauge,
    pub cache_frequent_len: Gauge,
    pub cache_recent_len: Gauge,
    pub cache_test_len: Gauge,
    pub cache_inserted: Gauge,
    pub cache_evicted: Gauge,
    pub client_queries: Gauge,
    pub client_queries_udp: Counter,
    pub client_queries_tcp: Counter,
    pub client_queries_cached: Counter,
    pub client_queries_expired: Counter,
    pub client_queries_offline: Counter,
    pub client_queries_errors: Counter,
    pub inflight_udp_queries: Gauge,
    pub inflight_tcp_queries: Gauge,
    pub upstream_errors: Counter,
    pub upstream_sent: Counter,
    pub upstream_received: Counter,
    pub upstream_timeout: Counter,
    pub upstream_response_sizes: Histogram,
}

pub type Varz = Arc<Inner>;

pub fn new() -> Varz {
    Arc::new(Inner::new())
}

impl Inner {
    pub fn new() -> Inner {
        Inner {
            start_instant: StartInstant::default(),
            uptime: register_gauge!(opts!(
                "encrypted_dns_uptime",
                "Uptime",
                labels! {"handler" => "all",}
            ))
            .unwrap(),
            cache_frequent_len: register_gauge!(opts!(
                "encrypted_dns_cache_frequent_len",
                "Number of entries in the cached set of \
                 frequent items",
                labels! {"handler" => "all",}
            ))
            .unwrap(),
            cache_recent_len: register_gauge!(opts!(
                "encrypted_dns_cache_recent_len",
                "Number of entries in the cached set of \
                 recent items",
                labels! {"handler" => "all",}
            ))
            .unwrap(),
            cache_test_len: register_gauge!(opts!(
                "encrypted_dns_cache_test_len",
                "Number of entries in the cached set of \
                 staged items",
                labels! {"handler" => "all",}
            ))
            .unwrap(),
            cache_inserted: register_gauge!(opts!(
                "encrypted_dns_cache_inserted",
                "Number of entries added to the cache",
                labels! {"handler" => "all",}
            ))
            .unwrap(),
            cache_evicted: register_gauge!(opts!(
                "encrypted_dns_cache_evicted",
                "Number of entries evicted from the cache",
                labels! {"handler" => "all",}
            ))
            .unwrap(),
            client_queries: register_gauge!(opts!(
                "encrypted_dns_client_queries",
                "Number of client queries received",
                labels! {"handler" => "all",}
            ))
            .unwrap(),
            client_queries_udp: register_counter!(opts!(
                "encrypted_dns_client_queries_udp",
                "Number of client queries received \
                 using UDP",
                labels! {"handler" => "all",}
            ))
            .unwrap(),
            client_queries_tcp: register_counter!(opts!(
                "encrypted_dns_client_queries_tcp",
                "Number of client queries received \
                 using TCP",
                labels! {"handler" => "all",}
            ))
            .unwrap(),
            client_queries_cached: register_counter!(opts!(
                "encrypted_dns_client_queries_cached",
                "Number of client queries sent from \
                 the cache",
                labels! {"handler" => "all",}
            ))
            .unwrap(),
            client_queries_expired: register_counter!(opts!(
                "encrypted_dns_client_queries_expired",
                "Number of expired client queries",
                labels! {"handler" => "all",}
            ))
            .unwrap(),
            client_queries_offline: register_counter!(opts!(
                "encrypted_dns_client_queries_offline",
                "Number of client queries answered \
                 while upstream resolvers are \
                 unresponsive",
                labels! {"handler" => "all",}
            ))
            .unwrap(),
            client_queries_errors: register_counter!(opts!(
                "encrypted_dns_client_queries_errors",
                "Number of bogus client queries",
                labels! {"handler" => "all",}
            ))
            .unwrap(),
            inflight_udp_queries: register_gauge!(opts!(
                "encrypted_dns_inflight_udp_queries",
                "Number of UDP queries currently waiting for a response",
                labels! {"handler" => "all",}
            ))
            .unwrap(),
            inflight_tcp_queries: register_gauge!(opts!(
                "encrypted_dns_inflight_tcp_queries",
                "Number of TCP queries currently waiting for a response",
                labels! {"handler" => "all",}
            ))
            .unwrap(),
            upstream_errors: register_counter!(opts!(
                "encrypted_dns_upstream_errors",
                "Number of bogus upstream servers responses",
                labels! {"handler" => "all",}
            ))
            .unwrap(),
            upstream_sent: register_counter!(opts!(
                "encrypted_dns_upstream_sent",
                "Number of upstream servers queries sent",
                labels! {"handler" => "all",}
            ))
            .unwrap(),
            upstream_received: register_counter!(opts!(
                "encrypted_dns_upstream_received",
                "Number of upstream servers responses received",
                labels! {"handler" => "all",}
            ))
            .unwrap(),
            upstream_timeout: register_counter!(opts!(
                "encrypted_dns_upstream_timeout",
                "Number of upstream servers responses \
                 having timed out",
                labels! {"handler" => "all",}
            ))
            .unwrap(),
            upstream_response_sizes: register_histogram!(histogram_opts!(
                "encrypted_dns_upstream_response_sizes",
                "Response size in bytes",
                vec![64.0, 128.0, 192.0, 256.0, 512.0, 1024.0, 2048.0]
            ))
            .unwrap(),
        }
    }
}

impl Default for Inner {
    fn default() -> Self {
        Self::new()
    }
}

impl Default for StartInstant {
    fn default() -> StartInstant {
        StartInstant(Instant::now())
    }
}