Writing Custom Filters

Quilkin provides an extensible implementation of Filters that allows us to plug in custom implementations to fit our needs. This document provides an overview of the API and how we can go about writing our own Filters.

API Components

The following components make up Quilkin's implementation of filters.

Filter

A trait representing an actual Filter instance in the pipeline.

  • An implementation provides a read and a write method.
  • Both methods are invoked by the proxy when it consults the filter chain - their arguments contain information about the packet being processed.
  • read is invoked when a packet is received on the local downstream port and is to be sent to an upstream endpoint while write is invoked in the opposite direction when a packet is received from an upstream endpoint and is to be sent to a downstream client.

FilterFactory

A trait representing a type that knows how to create instances of a particular type of Filter.

  • An implementation provides a name and create_filter method.
  • create_filter takes in configuration for the filter to create and returns a FilterInstance type containing a new instance of its filter type.
    name returns the Filter name - a unique identifier of filters of the created type (e.g quilkin.extensions.filters.debug.v1alpha1.Debug).

FilterRegistry

A struct representing the set of all filter types known to the proxy. It contains all known implementations of FilterFactory, each identified by their name.

These components come together to form the filter chain.

Note that when using dynamic configuration, the process repeats in a similar manner - new filter instances are created according to the updated filter configuration and a new filter chain is re-created while the old one is dropped.

Creating Custom Filters

To extend Quilkin's code with our own custom filter, we need to do the following:

  1. Import the Quilkin crate.
  2. Implement the Filter trait with our custom logic, as well as a FilterFactory that knows how to create instances of the Filter implementation.
  3. Start the proxy with the custom FilterFactory implementation.

The full source code used in this example can be found here

1. Import the Quilkin crate

# Start with a new crate
cargo new --bin quilkin-filter-example

Add Quilkin as a dependency in Cargo.toml.

[dependencies]
quilkin = "0.2.0"

2. Implement the filter traits

It's not terribly important what the filter in this example does so let's write a Greet filter that appends Hello to every packet in one direction and Goodbye to packets in the opposite direction.

We start with the Filter implementation

#![allow(unused)]
fn main() {

// src/main.rs
use quilkin::filters::prelude::*;
 
struct Greet;

impl Filter for Greet {
    fn read(&self, mut ctx: ReadContext) -> Option<ReadResponse> {
        ctx.contents.splice(0..0, String::from("Hello ").into_bytes());
        Some(ctx.into())
    }
    fn write(&self, mut ctx: WriteContext) -> Option<WriteResponse> {
        ctx.contents.splice(0..0, String::from("Goodbye ").into_bytes());
        Some(ctx.into())
    }
}
}

Next, we implement a FilterFactory for it and give it a name:

#![allow(unused)]
fn main() {

struct Greet;
impl Filter for Greet {}
use quilkin::filters::Filter;
// src/main.rs
use quilkin::filters::prelude::*;

pub const NAME: &str = "greet.v1";

pub fn factory() -> DynFilterFactory {
    Box::from(GreetFilterFactory)
}

struct GreetFilterFactory;
impl FilterFactory for GreetFilterFactory {
    fn name(&self) -> &'static str {
        NAME
    }
    fn create_filter(&self, _: CreateFilterArgs) -> Result<FilterInstance, Error> {
        let filter: Box<dyn Filter> = Box::new(Greet);
        Ok(FilterInstance::new(serde_json::Value::Null, filter))
    }
}
}

3. Start the proxy

We can run the proxy in the exact manner as the default Quilkin binary using the run function, passing in our custom FilterFactory. Let's add a main function that does that. Quilkin relies on the Tokio async runtime, so we need to import that crate and wrap our main function with it.

Add Tokio as a dependency in Cargo.toml.

[dependencies]
quilkin = "0.2.0"
tokio = { version = "1", features = ["full"]}

Add a main function that starts the proxy.

// src/main.rs
#[tokio::main]
async fn main() {
    quilkin::run(vec![self::factory()].into_iter())
        .await
        .unwrap();
}

Now, let's try out the proxy. The following configuration starts our extended version of the proxy at port 7001 and forwards all packets to an upstream server at port 4321.

# config.yaml
version: v1alpha1
proxy:
  port: 7001
static:
  filters:
  - name: greet.v1
  endpoints:
  - address: 127.0.0.1:4321
  • Start the proxy

    cargo run -- -c config.yaml
    
  • Start a UDP listening server on the configured port

    nc -lu 127.0.0.1 4321
    
  • Start an interactive UDP client that sends packet to the proxy

    nc -u 127.0.0.1 7001
    

Whatever we pass to the client should now show up with our modification on the listening server's standard output. For example typing Quilkin in the client prints Hello Quilkin on the server.

4. Working with Filter Configuration

Let's extend the Greet filter to require a configuration that contains what greeting to use.

The Serde crate is used to describe static YAML configuration in code while Prost is used to describe dynamic configuration as Protobuf messages when talking to the management server.

Static Configuration

First let's create the config for our static configuration:

1. Add the yaml parsing crates to Cargo.toml:
  [dependencies]
  # ...
  serde = "1.0"
  serde_yaml = "0.8"
2. Define a struct representing the config:
// src/main.rs
#[derive(Serialize, Deserialize, Debug)]
struct Config {
    greeting: String,
}
3. Update the Greet Filter to take in greeting as a parameter:
// src/main.rs
struct Greet(String);

impl Filter for Greet {
    fn read(&self, mut ctx: ReadContext) -> Option<ReadResponse> {
        ctx.contents
            .splice(0..0, format!("{} ", self.0).into_bytes());
        Some(ctx.into())
    }
    fn write(&self, mut ctx: WriteContext) -> Option<WriteResponse> {
        ctx.contents
            .splice(0..0, format!("{} ", self.0).into_bytes());
        Some(ctx.into())
    }
}
4. Finally, update GreetFilterFactory to extract the greeting from the passed in configuration and forward it onto the Greet Filter.
// src/main.rs
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug)]
struct Config {
    greeting: String,
}
use quilkin::filters::prelude::*;
struct Greet(String);
impl Filter for Greet { }
use quilkin::config::ConfigType;

pub const NAME: &str = "greet.v1";

pub fn factory() -> DynFilterFactory {
    Box::from(GreetFilterFactory)
}

struct GreetFilterFactory;
impl FilterFactory for GreetFilterFactory {
    fn name(&self) -> &'static str {
        NAME
    }
    fn create_filter(&self, args: CreateFilterArgs) -> Result<FilterInstance, Error> {
        let config = match args.config.unwrap() {
          ConfigType::Static(config) => {
              serde_yaml::from_str::<Config>(serde_yaml::to_string(config).unwrap().as_str())
                .unwrap()
          }
          ConfigType::Dynamic(_) => unimplemented!("dynamic config is not yet supported for this filter"),
        };
        let filter: Box<dyn Filter> = Box::new(Greet(config.greeting));
        Ok(FilterInstance::new(serde_json::Value::Null, filter))
    }
}

And with these changes we have wired up static configuration for our filter. Try it out with the following config.yaml:

# config.yaml
version: v1alpha1
proxy:
  port: 7001
static:
  filters:
  - name: greet.v1
    config:
      greeting: Hey
  endpoints:
  - address: 127.0.0.1:4321
Dynamic Configuration

You might have noticed while adding static configuration support, that the config argument passed into our FilterFactory has a Dynamic variant.

let config = match args.config.unwrap() {
    ConfigType::Static(config) => {
        serde_yaml::from_str::<Config>(serde_yaml::to_string(config).unwrap().as_str())
         .unwrap()
    }
    ConfigType::Dynamic(_) => unimplemented!("dynamic config is not yet supported for this filter"),
};

The Dynamic contains the serialized Protobuf message received from the management server for the Filter to create. As a result, its contents are entirely opaque to Quilkin and it is represented with the Prost Any type so the FilterFactory can interpret its contents however it wishes.
However, it usually contains a Protobuf equivalent of the filter's static configuration.

1. Add the proto parsing crates to Cargo.toml:
[dependencies]
# ...
tonic = "0.5.0"
prost = "0.7"
prost-types = "0.7"
2. Create a Protobuf equivalent of the static configuration:
// src/greet.proto
syntax = "proto3";

package greet;

message Greet {
  string greeting = 1;
}
3. Generate Rust code from the proto file:

There are a few ways to generate Prost code from proto, we will use the prost_build crate in this example.

Add the required crates to Cargo.toml:

[dependencies]
# ...
bytes = "1.0"

[build-dependencies]
prost-build = "0.7"

Add a build script to generate the Rust code during compilation:

// src/build.rs
fn main() {
    prost_build::compile_protos(&["src/greet.proto"], &["src/"]).unwrap();
}

To include the generated code, we'll use a convenience macro include_proto, which imports the generated code, while recreating the grpc package name as Rust modules:

// src/main.rs
quilkin::include_proto!("greet");
use greet::Greet as ProtoGreet;
4. Decode the serialized proto message into a config:

If the message contains a Protobuf equivalent of the filter's static configuration, we can leverage the deserialize method to deserialize either a static or dynamic config. The function automatically deserializes and converts from the Protobuf type if the input contains a dynamic configuration.
As a result, the function requires that the std::convert::TryFrom is implemented from our dynamic config type to a static equivalent.

// src/main.rs
impl TryFrom<ProtoGreet> for Config {
    type Error = ConvertProtoConfigError;

    fn try_from(p: ProtoGreet) -> Result<Self, Self::Error> {
        Ok(Config {
            greeting: p.greeting,
        })
    }
}

With our conversion implementation, we can to extract a greeting from any configuration type and forward it onto the Greet Filter.

// src/main.rs
pub const NAME: &str = "greet.v1";

pub fn factory() -> DynFilterFactory {
    Box::from(GreetFilterFactory)
}

struct GreetFilterFactory;
impl FilterFactory for GreetFilterFactory {
    fn name(&self) -> &'static str {
        NAME
    }
    fn create_filter(&self, args: CreateFilterArgs) -> Result<FilterInstance, Error> {
        let (config_json, config) = self
            .require_config(args.config)?
            .deserialize::<Config, ProtoGreet>(self.name())?;
        let filter: Box<dyn Filter> = Box::new(Greet(config.greeting));
        Ok(FilterInstance::new(config_json, filter))
    }
}