1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/*
 * Copyright 2022 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

use std::sync::Arc;

use notify::Watcher;
use tracing::Instrument;

use crate::Config;

/// Loads the configuration stored in the file at `path` into `config`, then
/// watches that path and re-applies the file whenever its data is modified.
///
/// The file contents are deserialized with `serde_yaml` and handed to
/// [`Config::update_from_json`] together with a clone of `locality`.
///
/// This function only returns on failure; while the watch is healthy it keeps
/// running.
///
/// # Errors
///
/// Returns an error if:
/// - the filesystem watcher cannot be created or cannot watch `path`,
/// - the file cannot be read or deserialized, or applying it to `config`
///   fails (for the initial load as well as for every later change),
/// - the watcher reports an error event,
/// - the event channel closes, i.e. the watch stopped unexpectedly.
pub async fn watch(
    config: Arc<Config>,
    path: impl Into<std::path::PathBuf>,
    locality: Option<crate::endpoint::Locality>,
) -> crate::Result<()> {
    let path = path.into();
    let span = tracing::info_span!("config_provider", path = %path.display());
    tracing::info!("discovering configuration through filesystem");
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
    // Creating the watcher acquires OS resources and can fail; report that to
    // the caller instead of panicking.
    let mut watcher = notify::RecommendedWatcher::new(
        move |res| {
            // `send` only fails once the receiving half is gone, which means
            // this function is already returning. Panicking inside the
            // watcher's callback would achieve nothing, so the event is
            // simply discarded.
            let _ = tx.send(res);
        },
        <_>::default(),
    )?;

    tracing::trace!("reading file");
    let buf = tokio::fs::read(&path).await?;
    tracing::info!("applying initial configuration");
    config.update_from_json(serde_yaml::from_slice(&buf)?, locality.clone())?;
    watcher.watch(&path, notify::RecursiveMode::Recursive)?;
    tracing::info!("watching file");

    // `transpose` turns an error event from the watcher into an early return,
    // while a closed channel (`None`) ends the loop.
    while let Some(event) = rx.recv().instrument(span.clone()).await.transpose()? {
        tracing::trace!(event = ?event.kind, "new file event");

        // Only content changes are relevant; every other event kind is ignored.
        if !matches!(
            event.kind,
            notify::EventKind::Modify(notify::event::ModifyKind::Data(_))
        ) {
            continue;
        }

        for path in event.paths {
            // At least on macOS it's not always safe to
            // immediately read file after the change, a small
            // delay fixes that.
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            tracing::info!(path = %path.display(), "file changed, updating config");
            let buf = tokio::fs::read(path).await?;
            config.update_from_json(serde_yaml::from_slice(&buf)?, locality.clone())?;
        }
    }

    // The sender lives inside the watcher's callback, so reaching this point
    // means no further events can arrive.
    Err(eyre::eyre!("filesystem watch unexpectedly stopped"))
}

#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;

    /// Writes a config file, starts `watch` on it, rewrites the file with an
    /// extra endpoint and checks that the watched config ends up equal to the
    /// config that was written.
    #[tokio::test]
    async fn basic() {
        let source = Arc::new(crate::Config::default());
        let dest = Arc::new(crate::Config::default());
        // Keep the `TempDir` guard bound for the whole test so the directory
        // is removed when the test finishes instead of being left on disk.
        let tmp_dir = tempfile::tempdir().unwrap();
        let file_path = tmp_dir.path().join("config.yaml");
        tokio::fs::write(&file_path, serde_yaml::to_string(&source).unwrap())
            .await
            .unwrap();
        let _handle = tokio::spawn(watch(dest.clone(), file_path.clone(), None));
        // Give the spawned task time to load the file and start watching.
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        source.clusters.modify(|clusters| {
            clusters
                .default_cluster_mut()
                .insert(crate::endpoint::Endpoint::with_metadata(
                    (std::net::Ipv4Addr::LOCALHOST, 4321).into(),
                    crate::endpoint::Metadata {
                        tokens: <_>::from([Vec::from(*b"1x7ijy6")]),
                    },
                ));
        });

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        tokio::fs::write(&file_path, serde_yaml::to_string(&source).unwrap())
            .await
            .unwrap();
        // Give the watcher time to observe the change and apply it to `dest`.
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        assert_eq!(source, dest);
    }
}