1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
use crate::discovery::{DeltaDiscoveryRequest, DeltaDiscoveryResponse, Resource};
use std::{
    collections::{BTreeSet, HashMap},
    sync::Arc,
    time::Duration,
};

pub(crate) const BACKOFF_INITIAL_DELAY: Duration = Duration::from_millis(500);
pub(crate) const BACKOFF_MAX_DELAY: Duration = Duration::from_secs(30);
pub(crate) const BACKOFF_MAX_JITTER: Duration = Duration::from_millis(2000);
pub(crate) const CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);

/// Returns the configured maximum allowed message size for gRPC messages.
/// When using State Of The World xDS, the message size can get large enough
/// that it can exceed the default limits.
pub fn max_grpc_message_size() -> usize {
    std::env::var("QUILKIN_MAX_GRPC_MESSAGE_SIZE")
        .as_deref()
        .ok()
        .and_then(|var| var.parse().ok())
        .unwrap_or(256 * 1024 * 1024)
}

/// Keeps tracking of the local versions of each resource sent from the management
/// server, allowing reconnections to the same/new management servers to send initial
/// versions to reduce the initial response size
pub struct LocalVersions {
    versions: Vec<(&'static str, parking_lot::Mutex<HashMap<String, String>>)>,
}

impl LocalVersions {
    #[inline]
    pub fn new(types: impl Iterator<Item = &'static str>) -> Self {
        Self {
            versions: types.map(|ty| (ty, Default::default())).collect(),
        }
    }

    #[inline]
    pub fn get(&self, ty: &str) -> parking_lot::MutexGuard<'_, HashMap<String, String>> {
        self.versions
            .iter()
            .find_map(|(t, hm)| (*t == ty).then_some(hm))
            .unwrap()
            .lock()
    }
}

pub type VersionMap = HashMap<String, String>;

pub struct ClientState {
    pub resource_type: String,
    pub versions: VersionMap,
    pub subscribed: BTreeSet<String>,
}

impl ClientState {
    pub fn new(resource_type: String) -> Self {
        Self {
            resource_type,
            versions: Default::default(),
            subscribed: Default::default(),
        }
    }

    pub fn reset(&mut self, versions: VersionMap) {
        let _ = std::mem::replace(&mut self.versions, versions);
        self.subscribed.clear();
    }

    pub fn update(&mut self, req: DeltaDiscoveryRequest) {
        // If the request has filled out the initial_versions field, it means the connected management servers has
        // already had a connection with a control plane, so hard reset our state to what it says it has
        if !req.initial_resource_versions.is_empty() {
            self.reset(req.initial_resource_versions);
        }

        // From the spec:
        // A resource_names_subscribe field may contain resource names that
        // the server believes the client is already subscribed to, and
        // furthermore has the most recent versions of. However, the server
        // must still provide those resources in the response; due to
        // implementation details hidden from the server, the client may
        // have “forgotten” those resources despite apparently remaining subscribed.
        if !req.resource_names_subscribe.is_empty() {
            for sub in req.resource_names_subscribe {
                self.versions.remove(&sub);
                self.subscribed.insert(sub);
            }
        }

        if !req.resource_names_unsubscribe.is_empty() {
            for sub in req.resource_names_unsubscribe {
                self.versions.remove(&sub);
                self.subscribed.remove(&sub);
            }
        }
    }

    pub fn version_matches(&self, key: &str, value: &str) -> bool {
        self.versions
            .get(key)
            .map(|v| *v == value)
            .unwrap_or_default()
    }
}

pub struct AwaitingAck {
    pub type_url: String,
    pub versions: HashMap<String, String>,
    pub removed: std::collections::HashSet<String>,
}

pub struct ClientTracker {
    pub node_id: String,
    ack_map: HashMap<uuid::Uuid, AwaitingAck>,
    states: HashMap<String, ClientState>,
}

impl ClientTracker {
    pub fn track_client(node_id: String) -> Self {
        Self {
            node_id,
            ack_map: Default::default(),
            states: Default::default(),
        }
    }

    pub fn get_state(&mut self, rt: &str) -> Option<&mut ClientState> {
        self.states.get_mut(rt)
    }

    pub fn track_state(&mut self, rt: String) -> &mut ClientState {
        self.states
            .entry(rt.clone())
            .or_insert_with(|| ClientState::new(rt))
    }

    pub fn tracked_resources(&self) -> impl Iterator<Item = String> + '_ {
        self.states.keys().cloned()
    }

    pub fn needs_ack(&mut self, ack: AwaitingAck) -> eyre::Result<uuid::Uuid> {
        // Validate that no items are both updated/added and removed
        for rem in &ack.removed {
            eyre::ensure!(
                !ack.versions.contains_key(rem),
                "{rem} is both in the removed list and version map"
            );
        }

        let uuid = uuid::Uuid::new_v4();
        self.ack_map.insert(uuid, ack);
        Ok(uuid)
    }

    pub fn apply_ack(&mut self, uuid: uuid::Uuid) -> eyre::Result<()> {
        let Some(ack_state) = self.ack_map.remove(&uuid) else {
            eyre::bail!("unknown nonce");
        };
        let Some(cs) = self.get_state(&ack_state.type_url) else {
            eyre::bail!("unknown type url");
        };

        for removed in ack_state.removed {
            cs.subscribed.remove(&removed);
            cs.versions.remove(&removed);
        }

        cs.versions.extend(ack_state.versions);
        Ok(())
    }
}

pub trait Configuration: Send + Sync + Sized + 'static {
    fn identifier(&self) -> String;

    fn apply_delta(
        &self,
        resource_type: &str,
        resources: Vec<Resource>,
        removed_resources: &[String],
        remote_addr: Option<std::net::SocketAddr>,
    ) -> crate::Result<()>;

    fn allow_request_processing(&self, resource_type: &str) -> bool;

    fn delta_discovery_request(
        &self,
        client_state: &ClientState,
    ) -> crate::Result<DeltaDiscoveryRes>;

    fn on_changed(
        &self,
        subscribed: crate::server::ControlPlane<Self>,
    ) -> impl std::future::Future<Output = ()> + Send + 'static;

    fn interested_resources(&self) -> impl Iterator<Item = (&'static str, Vec<String>)>;
}

pub struct DeltaDiscoveryRes {
    pub resources: Vec<crate::discovery::Resource>,
    pub removed: std::collections::HashSet<String>,
}

/// Processes responses from management servers, applying resources to the proxy
#[tracing::instrument(skip_all, fields(identifier))]
pub fn handle_delta_discovery_responses<C: Configuration>(
    identifier: String,
    stream: impl futures::Stream<Item = tonic::Result<DeltaDiscoveryResponse>> + 'static + Send,
    config: Arc<C>,
    local: Arc<LocalVersions>,
    remote_addr: Option<std::net::SocketAddr>,
    mut notifier: Option<tokio::sync::mpsc::UnboundedSender<String>>,
) -> std::pin::Pin<Box<dyn futures::Stream<Item = crate::Result<DeltaDiscoveryRequest>> + Send>> {
    Box::pin(async_stream::try_stream! {
        let _stream_metrics = crate::metrics::StreamConnectionMetrics::new(identifier.clone());
        tracing::trace!("awaiting delta response");
        for await response in stream
        {
            let response = match response {
                Ok(response) => response,
                Err(error) => {
                    tracing::warn!(%error, "Error from xDS server");
                    break;
                }
            };

            if response.type_url == "ignore-me" {
                continue;
            }

            let control_plane_identifier = response.control_plane.as_ref().map(|cp| cp.identifier.as_str()).unwrap_or_default();

            crate::metrics::delta_discovery_responses(control_plane_identifier, &response.type_url).inc();
            tracing::trace!(
                version = &*response.system_version_info,
                r#type = &*response.type_url,
                nonce = &*response.nonce,
                "received delta response"
            );

            let type_url = response.type_url;

            let result = {
                tracing::trace!(num_resources = response.resources.len(), kind = type_url, "applying delta resources");

                let version_map: Vec<_> = response.resources.iter().map(|res| (res.name.clone(), res.version.clone())).collect();

                let res = config.apply_delta(&type_url, response.resources, &response.removed_resources, remote_addr);

                if res.is_ok() {
                    let mut lock = local.get(&type_url);

                    // Remove any resources the upstream server has removed/doesn't have,
                    // we do this before applying any new/updated resources in case a
                    // resource is in both lists, though really that would be a bug in
                    // the upstream server
                    for removed in response.removed_resources {
                        lock.remove(&removed);
                    }

                    for (k, v) in version_map {
                        lock.insert(k, v);
                    }
                }

                res
            };

            if let Some(note) = &notifier {
                if note.send(type_url.clone()).is_err() {
                    notifier = None;
                }
            }

            let error_detail = if let Err(error) = result {
                crate::metrics::nacks(control_plane_identifier, &type_url).inc();
                Some(quilkin_proto::generated::google::rpc::Status {
                    code: 3,
                    message: error.to_string(),
                    ..Default::default()
                })
            } else {
                crate::metrics::acks(control_plane_identifier, &type_url).inc();
                None
            };

            yield DeltaDiscoveryRequest {
                type_url,
                response_nonce: response.nonce,
                error_detail,
                ..Default::default()
            }
        }
    })
}