use std::{
collections::BTreeMap,
env,
net::SocketAddr,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use futures::{AsyncBufReadExt, TryStreamExt};
use k8s_openapi::{
api::{
apps::v1::{Deployment, DeploymentSpec},
core::v1::{
ConfigMap, Container, EnvVar, Event, HTTPGetAction, Namespace, Pod, PodSpec,
PodTemplateSpec, Probe, ResourceRequirements, ServiceAccount, VolumeMount,
},
core::v1::{ContainerPort, Node},
rbac::{
v1::PolicyRule,
v1::{ClusterRole, RoleBinding, RoleRef, Subject},
},
},
apimachinery::pkg::{
api::resource::Quantity,
apis::meta::v1::{LabelSelector, ObjectMeta},
util::intstr::IntOrString,
},
chrono,
};
use kube::{
api::{DeleteParams, ListParams, LogParams, PostParams},
runtime::wait::{await_condition, Condition},
Api, Resource, ResourceExt,
};
use tokio::{sync::OnceCell, time::timeout};
use tracing::debug;
use quilkin::config::providers::k8s::agones::{
Fleet, FleetSpec, GameServer, GameServerPort, GameServerSpec, GameServerState,
GameServerTemplateSpec,
};
mod pod;
mod provider;
mod relay;
mod sidecar;
#[allow(dead_code)]
static CLIENT: OnceCell<Client> = OnceCell::const_new();
#[allow(dead_code)]
const IMAGE_TAG: &str = "IMAGE_TAG";
const DELETE_DELAY_SECONDS: &str = "DELETE_DELAY_SECONDS";
pub const GAMESERVER_IMAGE: &str =
"us-docker.pkg.dev/agones-images/examples/simple-game-server:0.16";
pub const TOKEN_KEY: &str = "quilkin.dev/tokens";
#[derive(Clone)]
pub struct Client {
pub kubernetes: kube::Client,
pub namespace: String,
pub quilkin_image: String,
}
impl Client {
pub async fn new() -> Client {
let mut client = CLIENT
.get_or_init(|| async {
let client = kube::Client::try_default()
.await
.expect("Kubernetes client to be created");
Client {
kubernetes: client.clone(),
namespace: setup_namespace(client).await,
quilkin_image: env::var(IMAGE_TAG).unwrap(),
}
})
.await
.clone();
client.kubernetes = kube::Client::try_default()
.await
.expect("Kubernetes client to be created");
client
}
pub fn namespaced_api<K: Resource<Scope = kube::core::NamespaceResourceScope>>(&self) -> Api<K>
where
<K as Resource>::DynamicType: Default,
{
Api::namespaced(self.kubernetes.clone(), self.namespace.as_str())
}
}
#[allow(dead_code)]
async fn setup_namespace(client: kube::Client) -> String {
let namespaces: Api<Namespace> = Api::all(client.clone());
let lp = ListParams::default().labels("owner=quilkin-test");
let nss = namespaces.list(&lp).await.unwrap();
let dp = DeleteParams::default();
let delay = env::var(DELETE_DELAY_SECONDS)
.ok()
.and_then(|s| s.parse::<i64>().ok())
.map(chrono::Duration::seconds);
for ns in nss {
let name = ns.name_unchecked();
let delete = delay
.and_then(|duration| {
let expiry = ns.creation_timestamp()?.0 + duration;
Some(chrono::Utc::now() > expiry)
})
.unwrap_or(true);
if delete {
if let Err(err) = namespaces.delete(name.as_str(), &dp).await {
println!("Failure attempting to deleted namespace: {:?}, {err}", name);
}
}
}
let name = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs()
.to_string();
let metadata = ObjectMeta {
name: Some(name),
labels: Some(BTreeMap::from([("owner".into(), "quilkin-test".into())])),
..Default::default()
};
let test_namespace = Namespace {
metadata,
spec: None,
status: None,
};
let pp = PostParams::default();
namespaces
.create(&pp, &test_namespace)
.await
.expect("namespace to be created");
add_agones_service_account(client, test_namespace.name_unchecked()).await;
test_namespace.name_unchecked()
}
async fn add_agones_service_account(client: kube::Client, namespace: String) {
let service_accounts: Api<ServiceAccount> = Api::namespaced(client.clone(), namespace.as_str());
let role_bindings: Api<RoleBinding> = Api::namespaced(client, namespace.as_str());
let pp = PostParams::default();
let labels = BTreeMap::from([("app".to_string(), "agones".to_string())]);
let service_account = ServiceAccount {
metadata: ObjectMeta {
name: Some("agones-sdk".into()),
namespace: Some(namespace.clone()),
labels: Some(labels.clone()),
..Default::default()
},
..Default::default()
};
let service_account = service_accounts
.create(&pp, &service_account)
.await
.unwrap();
let role_binding = RoleBinding {
metadata: ObjectMeta {
name: Some("agones-sdk-access".into()),
namespace: Some(namespace.clone()),
labels: Some(labels),
..Default::default()
},
role_ref: RoleRef {
api_group: "rbac.authorization.k8s.io".into(),
kind: "ClusterRole".into(),
name: "agones-sdk".into(),
},
subjects: Some(vec![Subject {
kind: "ServiceAccount".into(),
name: service_account.name_unchecked(),
namespace: Some(namespace),
api_group: None,
}]),
};
let _ = role_bindings.create(&pp, &role_binding).await.unwrap();
}
pub async fn create_agones_rbac_read_account(
client: &Client,
service_accounts: Api<ServiceAccount>,
cluster_roles: Api<ClusterRole>,
role_bindings: Api<RoleBinding>,
) -> String {
let pp = PostParams::default();
let rbac_name = "quilkin-agones";
if service_accounts.get(rbac_name).await.is_ok() {
return rbac_name.into();
}
let rbac_meta = ObjectMeta {
name: Some(rbac_name.into()),
..Default::default()
};
let service_account = ServiceAccount {
metadata: rbac_meta.clone(),
..Default::default()
};
service_accounts
.create(&pp, &service_account)
.await
.unwrap();
match cluster_roles
.delete(rbac_name, &DeleteParams::default())
.await
{
Ok(_) => {}
Err(err) => println!("Cluster role not found: {err}"),
};
let cluster_role = ClusterRole {
metadata: rbac_meta.clone(),
rules: Some(vec![
PolicyRule {
api_groups: Some(vec!["agones.dev".into()]),
resources: Some(vec!["gameservers".into()]),
verbs: ["get", "list", "watch"].map(String::from).to_vec(),
..Default::default()
},
PolicyRule {
api_groups: Some(vec!["".into()]),
resources: Some(vec!["configmaps".into()]),
verbs: ["get", "list", "watch"].map(String::from).to_vec(),
..Default::default()
},
]),
..Default::default()
};
cluster_roles.create(&pp, &cluster_role).await.unwrap();
let binding = RoleBinding {
metadata: rbac_meta,
subjects: Some(vec![Subject {
kind: "User".into(),
name: format!("system:serviceaccount:{}:{rbac_name}", client.namespace),
api_group: Some("rbac.authorization.k8s.io".into()),
..Default::default()
}]),
role_ref: RoleRef {
api_group: "rbac.authorization.k8s.io".into(),
kind: "ClusterRole".into(),
name: rbac_name.into(),
},
};
role_bindings.create(&pp, &binding).await.unwrap();
rbac_name.into()
}
pub async fn quilkin_proxy_deployment(
client: &Client,
deployments: Api<Deployment>,
name: String,
host_port: u16,
management_server: String,
) -> SocketAddr {
let pp = PostParams::default();
let mut container = quilkin_container(
client,
Some(vec![
"proxy".into(),
format!("--management-server={management_server}"),
]),
None,
);
container.ports = Some(vec![ContainerPort {
container_port: 7777,
host_port: Some(host_port as i32),
protocol: Some("UDP".into()),
..Default::default()
}]);
let labels = BTreeMap::from([("role".to_string(), name.clone())]);
let deployment = Deployment {
metadata: ObjectMeta {
name: Some(name),
labels: Some(labels.clone()),
..Default::default()
},
spec: Some(DeploymentSpec {
replicas: Some(1),
selector: LabelSelector {
match_expressions: None,
match_labels: Some(labels.clone()),
},
template: PodTemplateSpec {
metadata: Some(ObjectMeta {
labels: Some(labels.clone()),
..Default::default()
}),
spec: Some(PodSpec {
containers: vec![container],
..Default::default()
}),
},
..Default::default()
}),
..Default::default()
};
let deployment = deployments.create(&pp, &deployment).await.unwrap();
let name = deployment.name_unchecked();
let result = timeout(
Duration::from_secs(3),
await_condition(deployments.clone(), name.as_str(), is_deployment_ready()),
)
.await;
assert!(result.is_err());
let pods = client.namespaced_api::<Pod>();
let list = pods
.list(&ListParams {
label_selector: Some(format!("role={name}")),
..Default::default()
})
.await
.unwrap();
assert_eq!(1, list.items.len());
let nodes: Api<Node> = Api::all(client.kubernetes.clone());
let name = list.items[0]
.spec
.as_ref()
.unwrap()
.node_name
.as_ref()
.unwrap();
let node = nodes.get(name.as_str()).await.unwrap();
let external_ip = node
.status
.unwrap()
.addresses
.unwrap()
.iter()
.find(|addr| addr.type_ == "ExternalIP")
.unwrap()
.address
.clone();
SocketAddr::new(external_ip.parse().unwrap(), host_port)
}
pub async fn create_tokenised_gameserver(
fleets: Api<Fleet>,
gameservers: Api<GameServer>,
token: &str,
) -> GameServer {
let pp = PostParams::default();
let fleet = fleet();
let fleet = fleets.create(&pp, &fleet).await.unwrap();
let name = fleet.name_unchecked();
timeout(
Duration::from_secs(30),
await_condition(fleets.clone(), name.as_str(), is_fleet_ready()),
)
.await
.expect("Fleet should be ready")
.unwrap();
let lp = ListParams {
label_selector: Some(format!("agones.dev/fleet={}", fleet.name_unchecked())),
..Default::default()
};
let list = gameservers.list(&lp).await.unwrap();
let mut gs = list.items[0].clone();
assert_eq!(3, token.as_bytes().len());
gs.metadata
.annotations
.get_or_insert(Default::default())
.insert(
TOKEN_KEY.into(),
base64::Engine::encode(&base64::engine::general_purpose::STANDARD, token),
);
gameservers
.replace(gs.name_unchecked().as_str(), &pp, &gs)
.await
.unwrap();
gs
}
pub fn game_server() -> GameServer {
let mut resources = BTreeMap::new();
resources.insert("cpu".into(), Quantity("30m".into()));
resources.insert("memory".into(), Quantity("32Mi".into()));
GameServer {
metadata: ObjectMeta {
generate_name: Some("gameserver-".into()),
..Default::default()
},
spec: GameServerSpec {
ports: vec![GameServerPort {
container_port: 7654,
host_port: None,
name: "udp-port".into(),
port_policy: Default::default(),
container: None,
protocol: Default::default(),
}],
template: PodTemplateSpec {
spec: Some(PodSpec {
containers: vec![Container {
name: "game-server".into(),
image: Some(GAMESERVER_IMAGE.into()),
resources: Some(ResourceRequirements {
limits: Some(resources.clone()),
requests: Some(resources),
claims: None,
}),
..Default::default()
}],
..Default::default()
}),
..Default::default()
},
..Default::default()
},
status: None,
}
}
pub fn fleet() -> Fleet {
let gs = game_server();
Fleet {
metadata: ObjectMeta {
generate_name: Some("fleet-".into()),
..Default::default()
},
spec: FleetSpec {
replicas: Some(3),
template: GameServerTemplateSpec {
metadata: None,
spec: gs.spec,
},
..Default::default()
},
status: None,
}
}
pub fn is_gameserver_ready() -> impl Condition<GameServer> {
|obj: Option<&GameServer>| {
obj.and_then(|gs| gs.status.clone())
.map(|status| matches!(status.state, GameServerState::Ready))
.unwrap_or(false)
}
}
pub fn is_pod_ready() -> impl Condition<Pod> {
|obj: Option<&Pod>| {
if let Some(pod) = obj {
return pod
.status
.as_ref()
.and_then(|status| status.conditions.as_ref())
.and_then(|conditions| {
conditions
.iter()
.find(|condition| condition.type_ == "Ready" && condition.status == "True")
})
.is_some();
}
false
}
}
pub fn is_deployment_ready() -> impl Condition<Deployment> {
|obj: Option<&Deployment>| {
if let Some(deployment) = obj {
let expected = deployment.spec.as_ref().unwrap().replicas.as_ref().unwrap();
return deployment
.status
.as_ref()
.and_then(|status| status.ready_replicas)
.map(|replicas| &replicas == expected)
.unwrap_or(false);
}
false
}
}
pub fn is_fleet_ready() -> impl Condition<Fleet> {
|obj: Option<&Fleet>| {
if let Some(fleet) = obj {
let expected = fleet.spec.replicas.as_ref().unwrap();
return fleet
.status
.as_ref()
.and_then(|status| status.ready_replicas)
.map(|replicas| &replicas == expected)
.unwrap_or(false);
}
false
}
}
pub fn quilkin_container(
client: &Client,
args: Option<Vec<String>>,
volume_mount: Option<String>,
) -> Container {
let mut container = Container {
name: "quilkin".into(),
image: Some(client.quilkin_image.clone()),
args,
env: Some(vec![EnvVar {
name: "RUST_LOG".to_string(),
value: Some("quilkin=trace".into()),
value_from: None,
}]),
liveness_probe: Some(Probe {
http_get: Some(HTTPGetAction {
path: Some("/live".into()),
port: IntOrString::Int(8000),
..Default::default()
}),
initial_delay_seconds: Some(3),
period_seconds: Some(2),
..Default::default()
}),
readiness_probe: Some(Probe {
http_get: Some(HTTPGetAction {
path: Some("/ready".into()),
port: IntOrString::Int(8000),
..Default::default()
}),
initial_delay_seconds: Some(3),
period_seconds: Some(1),
..Default::default()
}),
..Default::default()
};
if let Some(name) = volume_mount {
container.volume_mounts = Some(vec![VolumeMount {
name,
mount_path: "/etc/quilkin".into(),
..Default::default()
}])
};
container
}
pub fn quilkin_config_map(config: &str) -> ConfigMap {
ConfigMap {
metadata: ObjectMeta {
generate_name: Some("quilkin-config-".into()),
..Default::default()
},
data: Some(BTreeMap::from([(
"quilkin.yaml".to_string(),
config.to_string(),
)])),
..Default::default()
}
}
pub async fn create_token_router_config(config_maps: &Api<ConfigMap>) -> ConfigMap {
let pp = PostParams::default();
let config = r#"
version: v1alpha1
filters:
- name: quilkin.filters.capture.v1alpha1.Capture # Capture and remove the authentication token
config:
suffix:
size: 3
remove: true
- name: quilkin.filters.token_router.v1alpha1.TokenRouter
"#;
let mut config_map = quilkin_config_map(config);
config_map
.metadata
.labels
.get_or_insert(Default::default())
.insert("quilkin.dev/configmap".into(), "true".into());
config_maps.create(&pp, &config_map).await.unwrap()
}
pub fn gameserver_address(gs: &GameServer) -> String {
let status = gs.status.as_ref().unwrap();
let address = format!(
"{}:{}",
status.address,
status.ports.as_ref().unwrap()[0].port
);
address
}
pub async fn debug_pods(client: &Client, labels: String) {
debug!(labels, "🪓 Debug output for Selector");
let pods: Api<Pod> = client.namespaced_api();
let events: Api<Event> = client.namespaced_api();
let params = ListParams::default();
let event_list = events.list(¶ms).await.unwrap();
let pod_list = pods
.list(&ListParams {
label_selector: Some(labels),
..Default::default()
})
.await
.unwrap();
let params = LogParams::default();
for pod in pod_list {
let name = pod.name_unchecked();
let pod_events: Vec<&Event> = event_list
.iter()
.filter(|item| {
item.involved_object.kind == Some("Pod".into())
&& item.involved_object.name == Some(name.clone())
})
.collect();
debug!(pod = name, "🗓️ Pod Events");
for event in pod_events {
debug!(
pod = name,
type_ = event.type_,
reason = event.reason,
message = event.message,
count = event.count
);
}
debug!(pod = name, "📃 Pod Logs");
let mut logs = pods
.log_stream(name.as_str(), ¶ms)
.await
.unwrap()
.lines();
while let Some(line) = logs.try_next().await.unwrap() {
debug!(pod = name, line);
}
}
}