You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

287 lines
11 KiB

use crate::{constants::SERVER_WATCHER_CAPACITY, data::ChangeKind, ConnectionId};
use log::*;
use notify::{
Config as WatcherConfig, Error as WatcherError, Event as WatcherEvent, RecommendedWatcher,
RecursiveMode, Watcher,
use std::{
path::{Path, PathBuf},
use tokio::{
mpsc::{self, error::TrySendError},
mod path;
pub use path::*;
/// Holds information related to watched paths on the server
///
/// Pairs the channel used to talk to the background watcher task with the
/// handle of that task; `initialize` builds both from the same `mpsc` channel.
pub struct WatcherState {
/// Channel used to submit messages to the watcher task
channel: WatcherChannel,
/// Handle of the task spawned by `initialize` to run `watcher_task`
task: JoinHandle<()>,
impl Drop for WatcherState {
/// Aborts the task that handles watcher path operations and management
// NOTE(review): the body of `drop` is not visible in this view; per the doc
// comment above it is expected to abort `self.task` -- confirm.
fn drop(&mut self) {
impl WatcherState {
/// Will create a watcher and initialize watched paths to be empty
///
/// Builds a bounded channel of `SERVER_WATCHER_CAPACITY` messages, creates a
/// `notify` watcher whose callback forwards every result into that channel,
/// makes a best-effort attempt to enable precise and notice events, and
/// spawns `watcher_task` to consume the channel.
///
/// # Errors
///
/// The watcher-construction block ends in a `map_err` to
/// `io::ErrorKind::Other` followed by `?`, so a failure there is returned to
/// the caller. Failures while configuring the watcher are only logged.
pub fn initialize() -> io::Result<Self> {
// NOTE: Cannot be something small like 1 as this seems to cause a deadlock sometimes
// with a large volume of watch requests
let (tx, rx) = mpsc::channel(SERVER_WATCHER_CAPACITY);
let mut watcher = {
// Clone the sender so the callback owns one while `tx` itself is stored
// in the returned `WatcherChannel`
let tx = tx.clone();
notify::recommended_watcher(move |res| {
// `try_send` does not wait for capacity, so the callback is never
// blocked by a full channel: the message is queued or rejected at once
match tx.try_send(match res {
Ok(x) => InnerWatcherMsg::Event { ev: x },
Err(x) => InnerWatcherMsg::Error { err: x },
}) {
Ok(_) => (),
// Channel is at capacity: the message is discarded
Err(TrySendError::Full(_)) => {
"Reached watcher capacity of {}! Dropping watcher event!",
// Receiving side is gone: nothing is left to consume events
Err(TrySendError::Closed(_)) => {
warn!("Skipping watch event because watcher channel closed");
// Convert the watcher's own error type into an `io::Error`
.map_err(|x| io::Error::new(io::ErrorKind::Other, x))?
// Attempt to configure watcher, but don't fail if these configurations fail
match watcher.configure(WatcherConfig::PreciseEvents(true)) {
Ok(true) => debug!("Watcher configured for precise events"),
Ok(false) => debug!("Watcher not configured for precise events",),
Err(x) => error!("Watcher configuration for precise events failed: {}", x),
// Attempt to configure watcher, but don't fail if these configurations fail
match watcher.configure(WatcherConfig::NoticeEvents(true)) {
Ok(true) => debug!("Watcher configured for notice events"),
Ok(false) => debug!("Watcher not configured for notice events",),
Err(x) => error!("Watcher configuration for notice events failed: {}", x),
// The watcher is moved into the spawned task together with the receiver;
// the state keeps only the sender and the task handle
Ok(Self {
channel: WatcherChannel { tx },
task: tokio::spawn(watcher_task(watcher, rx)),
/// Returns a `WatcherChannel` for talking to the watcher task
// NOTE(review): body not visible in this view; presumably a clone of
// `self.channel` -- confirm.
pub fn clone_channel(&self) -> WatcherChannel {
/// Aborts the watcher task
pub fn abort(&self) {
/// Exposes the `WatcherChannel` API directly on `WatcherState`, so methods
/// such as `watch` and `unwatch` can be called on the state itself.
impl Deref for WatcherState {
type Target = WatcherChannel;
// NOTE(review): body not visible in this view; presumably returns
// `&self.channel` -- confirm.
fn deref(&self) -> &Self::Target {
/// Handle used to send requests to the watcher task
pub struct WatcherChannel {
/// Sending half of the channel carrying `InnerWatcherMsg` values
tx: mpsc::Sender<InnerWatcherMsg>,
impl Default for WatcherChannel {
/// Creates a new channel that is closed by default
fn default() -> Self {
// The receiving half is bound to `_` and therefore dropped right away,
// which is what leaves the channel closed: no task ever reads from it
let (tx, _) = mpsc::channel(1);
Self { tx }
impl WatcherChannel {
/// Watch a path for a specific connection denoted by the id within the registered path
///
/// A oneshot channel is created so the watcher task can report the outcome
/// of the request back to the caller.
///
/// # Errors
///
/// The visible error mappings produce `io::ErrorKind::Other` with the
/// message "Internal watcher task closed" or "Response to watch dropped".
pub async fn watch(&self, registered_path: RegisteredPath) -> io::Result<()> {
// `cb` is for the task's reply, `rx` is where that reply is read
let (cb, rx) = oneshot::channel();
.send(InnerWatcherMsg::Watch {
.map_err(|_| io::Error::new(io::ErrorKind::Other, "Internal watcher task closed"))?;
.map_err(|_| io::Error::new(io::ErrorKind::Other, "Response to watch dropped"))?
/// Unwatch a path for a specific connection denoted by the id
///
/// # Errors
///
/// The visible error mappings produce `io::ErrorKind::Other` with the
/// message "Internal watcher task closed" or "Response to unwatch dropped".
pub async fn unwatch(&self, id: ConnectionId, path: impl AsRef<Path>) -> io::Result<()> {
// `cb` is for the task's reply, `rx` is where that reply is read
let (cb, rx) = oneshot::channel();
// Prefer the canonical form of the path; if canonicalization fails, fall
// back to the path exactly as it was given
let path = tokio::fs::canonicalize(path.as_ref())
.unwrap_or_else(|_| path.as_ref().to_path_buf());
.send(InnerWatcherMsg::Unwatch { id, path, cb })
.map_err(|_| io::Error::new(io::ErrorKind::Other, "Internal watcher task closed"))?;
.map_err(|_| io::Error::new(io::ErrorKind::Other, "Response to unwatch dropped"))?
/// Internal message to pass to our task below to perform some action
enum InnerWatcherMsg {
/// Request to start watching a registered path
Watch {
/// Registration describing what should be watched
registered_path: RegisteredPath,
/// Where the task sends the outcome of the request
cb: oneshot::Sender<io::Result<()>>,
/// Request to stop watching a path on behalf of one connection
Unwatch {
/// Connection whose registration should be removed
id: ConnectionId,
/// Path to stop watching
path: PathBuf,
/// Where the task sends the outcome of the request
cb: oneshot::Sender<io::Result<()>>,
/// Successful result delivered by the `notify` watcher callback
Event {
/// The event reported by the watcher
ev: WatcherEvent,
/// Failed result delivered by the `notify` watcher callback
Error {
/// The error reported by the watcher
err: WatcherError,
/// Background loop that owns the `notify` watcher and serves every message
/// sent over the watcher channel.
///
/// Runs until `rx.recv()` returns `None`. Watch and unwatch requests are
/// answered through the oneshot sender carried in the message; events and
/// errors coming from the watcher are forwarded to the registered paths.
async fn watcher_task(mut watcher: RecommendedWatcher, mut rx: mpsc::Receiver<InnerWatcherMsg>) {
// TODO: Optimize this in some way to be more performant than
// checking every path whenever an event comes in
// Registrations that incoming events and errors are forwarded to
let mut registered_paths: Vec<RegisteredPath> = Vec::new();
// Per-path count, used to decide whether the underlying watcher has to be
// asked to watch or unwatch a path
let mut path_cnt: HashMap<PathBuf, usize> = HashMap::new();
while let Some(msg) = rx.recv().await {
match msg {
InnerWatcherMsg::Watch {
} => {
// Check if we are tracking the path across any connection
if let Some(cnt) = path_cnt.get_mut(registered_path.path()) {
// Increment the count of times we are watching that path
*cnt += 1;
// Store the registered path in our collection without worry
// since we are already watching a path that impacts this one
// Send an okay because we always succeed in this case
let _ = cb.send(Ok(()));
} else {
// First registration for this path: the underlying watcher has to
// be asked to watch it, recursively or not depending on the request
let res = watcher
if registered_path.is_recursive() {
} else {
.map_err(|x| io::Error::new(io::ErrorKind::Other, x));
// If we succeeded, store our registered path and set the tracking cnt to 1
if res.is_ok() {
path_cnt.insert(registered_path.path().to_path_buf(), 1);
// Send the result of the watch, but don't worry if the channel was closed
let _ = cb.send(res);
InnerWatcherMsg::Unwatch { id, path, cb } => {
// Check if we are tracking the path across any connection
// NOTE(review): the count is only read here (`get`), and no line visible
// in this view lowers it or removes the entry after an unwatch --
// confirm the count cannot go stale.
if let Some(cnt) = path_cnt.get(path.as_path()) {
// Cycle through and remove all paths that match the given id and path,
// capturing how many paths we removed
let removed_cnt = {
let old_len = registered_paths.len();
.retain(|p| != id || (p.path() != path && p.raw_path() != path));
let new_len = registered_paths.len();
old_len - new_len
// 1. If we are now at zero cnt for our path, we want to actually unwatch the
// path with our watcher
// 2. If we removed nothing from our path list, we want to return an error
// 3. Otherwise, we return okay because we succeeded
if *cnt <= removed_cnt {
let _ = cb.send(
.map_err(|x| io::Error::new(io::ErrorKind::Other, x)),
} else if removed_cnt == 0 {
// Send a failure as there was nothing to unwatch for this connection
let _ = cb.send(Err(io::Error::new(
format!("{:?} is not being watched", path),
} else {
// Send a success as we removed some paths
let _ = cb.send(Ok(()));
} else {
// Send a failure as there was nothing to unwatch
let _ = cb.send(Err(io::Error::new(
format!("{:?} is not being watched", path),
InnerWatcherMsg::Event { ev } => {
// Convert the watcher's event kind once, then offer the event to every
// registration; a failure to forward is logged and does not stop the loop
let kind = ChangeKind::from(ev.kind);
for registered_path in registered_paths.iter() {
match registered_path.filter_and_send(kind, &ev.paths).await {
Ok(_) => (),
Err(x) => error!(
"[Conn {}] Failed to forward changes to paths: {}",,
InnerWatcherMsg::Error { err } => {
// Render the error once so the same text is logged and forwarded
let msg = err.to_string();
error!("Watcher encountered an error {} for {:?}", msg, err.paths);
for registered_path in registered_paths.iter() {
match registered_path
// The last argument is true when the error carries at least one path;
// presumably it tells the callee whether to filter by path -- confirm
.filter_and_send_error(&msg, &err.paths, !err.paths.is_empty())
Ok(_) => (),
Err(x) => error!(
"[Conn {}] Failed to forward changes to paths: {}",,