1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
/*
 * Copyright 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

use std::net::SocketAddr;
use tonic::transport::Endpoint;

#[cfg(doc)]
use crate::filters::FilterFactory;

use crate::ShutdownRx;

pub use crate::components::proxy::Ready;

define_port!(7777);

const QCMP_PORT: u16 = 7600;

/// Run Quilkin as a UDP reverse proxy.
#[derive(clap::Args, Clone, Debug)]
pub struct Proxy {
    /// One or more `quilkin manage` endpoints to listen to for config changes
    #[clap(short, long, env = "QUILKIN_MANAGEMENT_SERVER", conflicts_with("to"))]
    pub management_server: Vec<Endpoint>,
    /// The remote URL or local file path to retrieve the Maxmind database.
    #[clap(long, env)]
    pub mmdb: Option<crate::net::maxmind_db::Source>,
    /// The port to listen on.
    #[clap(short, long, env = super::PORT_ENV_VAR, default_value_t = PORT)]
    pub port: u16,
    /// The port to listen on.
    #[clap(short, long, env = "QUILKIN_QCMP_PORT", default_value_t = QCMP_PORT)]
    pub qcmp_port: u16,
    /// One or more socket addresses to forward packets to.
    #[clap(short, long, env = "QUILKIN_DEST")]
    pub to: Vec<SocketAddr>,
    /// The interval in seconds at which the relay will send a discovery request
    /// to an management server after receiving no updates.
    #[clap(long, env = "QUILKIN_IDLE_REQUEST_INTERVAL_SECS")]
    pub idle_request_interval_secs: Option<u64>,
    /// Number of worker threads used to process packets. If not specified defaults
    /// to number of cpus.
    #[clap(short, long, env = "QUILKIN_WORKERS")]
    pub workers: Option<std::num::NonZeroUsize>,
}

impl Default for Proxy {
    fn default() -> Self {
        Self {
            management_server: <_>::default(),
            mmdb: <_>::default(),
            port: PORT,
            qcmp_port: QCMP_PORT,
            to: <_>::default(),
            idle_request_interval_secs: None,
            workers: None,
        }
    }
}

impl Proxy {
    /// Start and run a proxy.
    #[tracing::instrument(skip_all)]
    pub async fn run(
        self,
        config: std::sync::Arc<crate::Config>,
        ready: Ready,
        initialized: Option<tokio::sync::oneshot::Sender<()>>,
        shutdown_rx: ShutdownRx,
    ) -> crate::Result<()> {
        tracing::info!(
            port = self.port,
            proxy_id = &*config.id.load(),
            "Starting proxy"
        );

        // The number of worker tasks to spawn. Each task gets a dedicated queue to
        // consume packets off.
        let num_workers = self.workers.unwrap_or_else(|| {
            std::num::NonZeroUsize::new(num_cpus::get())
                .expect("num_cpus returned 0, which should be impossible")
        });

        let socket = crate::net::raw_socket_with_reuse(self.port)?;
        let qcmp = crate::net::raw_socket_with_reuse(self.qcmp_port)?;
        let phoenix = crate::net::TcpListener::bind(Some(self.qcmp_port))?;

        crate::components::proxy::Proxy {
            management_servers: self.management_server,
            mmdb: self.mmdb,
            to: self.to,
            num_workers,
            socket,
            qcmp,
            phoenix,
        }
        .run(
            crate::components::RunArgs {
                config,
                ready,
                shutdown_rx,
            },
            initialized,
        )
        .await
    }
}