1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
use std::net::SocketAddr;
#[cfg(doc)]
use crate::filters::FilterFactory;
/// Command-line arguments for the `quilkin run` subcommand.
///
/// Each field is either applied on top of the shared `crate::Config` or used
/// to start a background task when [`Run::run`] is called.
#[derive(clap::Args, Clone)]
#[non_exhaustive]
pub struct Run {
/// One or more management server addresses. When non-empty, these replace
/// the management servers held in the configuration. May also be set via
/// `QUILKIN_MANAGEMENT_SERVER`. Cannot be combined with `to`.
#[clap(short, long, env = "QUILKIN_MANAGEMENT_SERVER", conflicts_with("to"))]
pub management_server: Vec<String>,
/// Source of the MaxMind database. When set, a background task keeps
/// trying to update the database from this source until it succeeds.
#[clap(long, env)]
pub mmdb: Option<crate::maxmind_db::Source>,
/// Port to store in the configuration, overriding its current value.
/// May also be set via `QUILKIN_PORT`.
#[clap(short, long, env = "QUILKIN_PORT")]
pub port: Option<u16>,
/// One or more socket addresses that, when given, replace the localities
/// of the configuration's default cluster. May also be set via
/// `QUILKIN_DEST`.
#[clap(short, long, env = "QUILKIN_DEST")]
pub to: Vec<SocketAddr>,
}
impl Run {
pub async fn run(
&self,
config: std::sync::Arc<crate::Config>,
shutdown_rx: tokio::sync::watch::Receiver<()>,
) -> crate::Result<()> {
if let Some(port) = self.port {
config.port.store(port.into());
}
let _mmdb_task = self.mmdb.clone().map(|source| {
tokio::spawn(async move {
use crate::config::BACKOFF_INITIAL_DELAY_MILLISECONDS;
while let Err(error) =
tryhard::retry_fn(|| crate::MaxmindDb::update(source.clone()))
.retries(10)
.exponential_backoff(std::time::Duration::from_millis(
BACKOFF_INITIAL_DELAY_MILLISECONDS,
))
.await
{
tracing::warn!(%error, "error updating maxmind database");
}
})
});
if !self.to.is_empty() {
config.clusters.modify(|clusters| {
clusters.default_cluster_mut().localities = vec![self.to.clone().into()].into();
});
}
if !self.management_server.is_empty() {
config.management_servers.modify(|servers| {
*servers = self
.management_server
.iter()
.map(ToOwned::to_owned)
.map(|address| crate::config::ManagementServer { address })
.collect();
});
} else if config.clusters.load().endpoints().count() == 0
&& config.management_servers.load().is_empty()
{
return Err(eyre::eyre!(
"`quilkin run` requires at least one `to` address or `management_server` endpoint."
));
}
crate::Proxy::try_from(config)?.run(shutdown_rx).await
}
}