Implement a gRPC Client in Rust for OpenResty


Back in 2019, when I worked as an independent PostgreSQL/nginx consultant, many clients asked me whether they could do hybrid programming in nginx.

They said that nginx/OpenResty is powerful and high performance, but its most significant drawback is the lack of the ecosystems of mainstream programming languages, especially when they have to interact with popular frameworks, e.g. Kafka, Consul, Kubernetes, etc. The C/Lua ecosystem is slim, and even if they bite the bullet and write C/Lua modules, they face the awkward fact that most frameworks do not provide a C or Lua API.

As is well known, nginx is multi-process and single-threaded. In the main thread, it does nonblocking async event processing. OpenResty wraps this in a Lua API, combining coroutines and epoll into the cosocket concept.

Meanwhile, each mainstream programming language has its own threading model. So the problem is: how do you couple with them in a coroutine way? For example, when I call a function in Rust (maybe it runs in a thread, or maybe in a tokio async task, whatever), how do I make it nonblocking in Lua land, and make the call/return work in yield/resume fashion?

Of course, the traditional way is to spawn a separate process to run Rust and, on the Lua side, use a unix domain socket for communication, also known as the proxy model. But that way is inefficient (in both development and runtime). You need to encode/decode the messages in a TLV (type-length-value) format, and you need to maintain the nginx and proxy processes separately, as well as handle failover and load balancing.

Could I make it as simple as an FFI call, but with high performance? The answer is yes: I created lua-resty-ffi.

lua-resty-ffi

https://github.com/kingluo/lua-resty-ffi

lua-resty-ffi provides an efficient and generic API to do hybrid programming in openresty with mainstream languages (Go, Python, Java, Rust, etc.).

Features:

  • nonblocking, in coroutine style
  • simple but extensible interface, supporting any C ABI compliant language
  • once and for all: no need to write C/Lua code to do the coupling anymore
  • high performance, faster than the unix domain socket way
  • generic loader library for Python/Java
  • any serialization message format you like

Architecture

(Figure: lua-resty-ffi architecture)

The combination of a library and a configuration initializes a new runtime, which represents a set of threads or goroutines that do the jobs.

lua-resty-ffi has a high-performance IPC mechanism based on a request-response model.

The request from Lua is appended to a queue protected by a pthread mutex. The language runtime polls this queue; if it is empty, it waits on the queue via pthread_cond_wait(). In busy scenarios, almost all enqueue/dequeue operations happen in userspace, without futex system calls.

lua-resty-ffi makes full use of the nginx nonblocking event loop to dispatch responses from the language runtime. A response is injected into the global done queue of nginx, and the nginx main thread is notified via eventfd to handle it. In the main thread, the response handler sets up the return value and resumes the coroutine waiting on the response.

As is well known, Linux eventfd is high performance. It is just an accumulating counter, so multiple responses may be folded into one event.

Both request and response data are exchanged in userspace.

What does it look like?

Take Python as an example.

ffi/echo.py

from cffi import FFI
ffi = FFI()
ffi.cdef("""
char *strdup(const char *s);
void* ngx_http_lua_ffi_task_poll(void *p);
char* ngx_http_lua_ffi_get_req(void *tsk, int *len);
void ngx_http_lua_ffi_respond(void *tsk, int rc, char* rsp, int rsp_len);
""")
C = ffi.dlopen(None)

import threading

class State:
    def poll(self, tq):
        while True:
            task = C.ngx_http_lua_ffi_task_poll(ffi.cast("void*", tq))
            if task == ffi.NULL:
                break
            req = C.ngx_http_lua_ffi_get_req(task, ffi.NULL)
            res = C.strdup(req)
            C.ngx_http_lua_ffi_respond(task, 0, res, 0)
        print("exit python echo runtime")

def init(cfg, tq):
    st = State()
    t = threading.Thread(target=st.poll, args=(tq,))
    t.daemon = True
    t.start()
    return 0

Call it like a normal function in your Lua coroutine:

local demo = ngx.load_ffi("ffi_python3", "ffi.echo?,init",
    {is_global = true, unpin = true})
local ok, res = demo:echo("hello")
assert(ok)
assert(res == "hello")
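
For comparison, here is a minimal sketch of what the same echo runtime could look like in Rust. The exported init symbol name (libffi_init) and the loading convention are assumptions for illustration; check the examples in the repo for the real code.

use std::os::raw::{c_char, c_int, c_void};
use std::thread;

extern "C" {
    fn ngx_http_lua_ffi_task_poll(tq: *mut c_void) -> *mut c_void;
    fn ngx_http_lua_ffi_get_req(tsk: *mut c_void, len: *mut c_int) -> *mut c_char;
    fn ngx_http_lua_ffi_respond(tsk: *mut c_void, rc: c_int, rsp: *mut c_char, rsp_len: c_int);
}

// Entry point called when the runtime is loaded (symbol name is an assumption).
#[no_mangle]
pub extern "C" fn libffi_init(_cfg: *mut c_char, tq: *mut c_void) -> c_int {
    let tq_addr = tq as usize; // raw pointers are not Send; pass the address instead
    thread::spawn(move || {
        let tq = tq_addr as *mut c_void;
        loop {
            // Block until a task arrives; NULL means the queue was closed.
            let task = unsafe { ngx_http_lua_ffi_task_poll(tq) };
            if task.is_null() {
                break;
            }
            // Echo: duplicate the request buffer and hand it back as the response.
            unsafe {
                let req = ngx_http_lua_ffi_get_req(task, std::ptr::null_mut());
                ngx_http_lua_ffi_respond(task, 0, libc::strdup(req), 0);
            }
        }
        println!("exit rust echo runtime");
    });
    0
}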

Benchmark

(Figure: benchmark results, lua-resty-ffi vs. unix domain socket)

Send 100,000 requests, with payload lengths of 512 B, 4096 B, 10 KB, and 100 KB respectively. The results are in seconds; lower is better.

You can see that lua-resty-ffi is faster than the unix domain socket, and the gap grows with the payload length.

Check the benchmark for details.

Develop a gRPC client in Rust

Let’s use lua-resty-ffi to do some useful stuff.

In Kubernetes, gRPC is the standard RPC mechanism. How do we make gRPC calls in Lua?

Let's use Rust to develop a simple but complete gRPC client for Lua, covering unary and client/server/bidirectional streaming calls, as well as TLS/mTLS.

This example also shows the development efficiency you get with lua-resty-ffi.

Check https://github.com/kingluo/lua-resty-ffi/tree/main/examples/rust for the code.

tonic

tonic is a gRPC over HTTP/2 implementation focused on high performance, interoperability, and flexibility. This library was created to have first class support of async/await and to act as a core building block for production systems written in Rust.

tonic is my favourite Rust library. It is based on hyper and tokio, and everything works in async/await style.

Use low-level APIs

The normal steps to develop a gRPC client:

  • write a .proto file
  • compile it into a high-level API via build.rs
  • call the generated API

For example:

helloworld.proto

syntax = "proto3";

package helloworld;

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

client.rs

let request = tonic::Request::new(HelloRequest {
    name: "Tonic".into(),
});
let response = client.say_hello(request).await?;

But this is not suitable for Lua, because Lua is expected to call arbitrary protobuf interfaces without compiling .proto files.

So we have to look at the low-level API provided by tonic.

Let’s check the generated file:

helloworld.rs

#[derive(Clone, PartialEq, ::prost::Message)]
pub struct HelloRequest {
    #[prost(string, tag="1")]
    pub name: ::prost::alloc::string::String,
}
...
pub async fn say_hello(
    &mut self,
    request: impl tonic::IntoRequest<super::HelloRequest>,
) -> Result<tonic::Response<super::HelloReply>, tonic::Status> {
...
    let codec = tonic::codec::ProstCodec::default();
    let path = http::uri::PathAndQuery::from_static(
        "/helloworld.Greeter/SayHello",
    );
    self.inner.unary(request.into_request(), path, codec).await
}

message encode/decode

You can see that the request structure is marked with the ::prost::Message derive attribute, and that tonic::codec::ProstCodec::default() does the encoding.

This part can be done in Lua using lua-protobuf, which encodes/decodes based on the .proto file, just like JSON encode/decode.

Check grpc_client.lua for details.

But wait: if you do the encoding in Lua, how do you tell Rust to bypass its own encoding?

The answer is a customized codec, instead of ProstCodec, so that we can transfer the byte array "as is".

struct EncodedBytes(*mut c_char, usize, bool);
...
impl Encoder for MyCodec {
    type Item = EncodedBytes;
    type Error = Status;

    fn encode(&mut self, item: Self::Item, dst: &mut EncodeBuf<'_>) -> Result<(), Self::Error> {
        let slice = unsafe { slice::from_raw_parts(item.0 as *const u8, item.1) };
        dst.put_slice(slice);
...
    }
}

impl Decoder for MyCodec {
    type Item = EncodedBytes;
    type Error = Status;

    fn decode(&mut self, buf: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error> {
        let chunk = buf.chunk();
        let len = chunk.len();
        if len > 0 {
            let cbuf = unsafe { libc::malloc(len) };
            unsafe { libc::memcpy(cbuf, chunk.as_ptr() as *const c_void, len) };
            buf.advance(len);
            Ok(Some(EncodedBytes(cbuf as *mut c_char, len, false)))
...
    }
}

For encoding, we put the byte array already encoded by Lua into the request as-is.

For decoding, we copy the byte array out and let Lua decode it later.

Then we can use unary directly.
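
To make that concrete, here is a sketch of a unary call through tonic's generic client with the custom codec. The hardcoded path stands in for the path field sent from Lua, and cli/item are illustrative names:

// Sketch: send the Lua-encoded bytes through tonic's generic client.
// `cli` is a tonic::client::Grpc<Channel>; `item` wraps the request
// bytes produced by lua-protobuf on the Lua side.
async fn unary_call(
    cli: &mut tonic::client::Grpc<tonic::transport::Channel>,
    item: EncodedBytes,
) -> Result<EncodedBytes, tonic::Status> {
    cli.ready()
        .await
        .map_err(|e| tonic::Status::unknown(format!("service not ready: {}", e)))?;
    let path = http::uri::PathAndQuery::from_static("/helloworld.Greeter/SayHello");
    let response = cli.unary(tonic::Request::new(item), path, MyCodec).await?;
    Ok(response.into_inner())
}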

connection management

Each gRPC request is made on a connection object (interestingly, the connection in tonic has a retry mechanism). I use a hashmap to store connections and assign each connection a unique string, so that the Lua side can index it. In Lua, you can close a connection explicitly or let the Lua GC handle the close.

Each connection can be configured with TLS/mTLS info, e.g. CA certificate, client certificate, and private key.
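
A sketch of what such a registry could look like (the names and the use of once_cell are illustrative, not the actual code):

use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::sync::Mutex;
use tonic::client::Grpc;
use tonic::transport::Channel;

// Global registry: unique string id -> generic gRPC client.
// Lua keeps only the id and passes it back as the key of each command.
static CONNECTIONS: Lazy<Mutex<HashMap<String, Grpc<Channel>>>> =
    Lazy::new(|| Mutex::new(HashMap::new()));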

message format of request and response

Now let's look at the message format between Lua and Rust, which is the most important part.

request

#[derive(Deserialize, Debug)]
pub struct GrpcCommand {
    cmd: i32,
    key: String,
    host: Option<String>,
    ca: Option<String>,
    cert: Option<String>,
    priv_key: Option<String>,
    path: Option<String>,
    payload: Option<Base64>,
}

Since we need to handle different kinds of operations, e.g. creating a connection, making a gRPC unary call, etc., we obviously need a structure that describes all the information.

  • cmd: the command indicator, e.g. NEW_CONNECTION
  • key: depends on cmd; it could be the connection URL, connection id, or streaming id
  • host: the HTTP/2 host header, used for TLS
  • ca, cert, priv_key: TLS-related material
  • path: the fully qualified gRPC method string, e.g. /helloworld.Greeter/SayHello
  • payload: the base64-encoded, protobuf-encoded request byte array produced by Lua

Note that all optional fields use Option<>.

JSON is high performance in Lua, so there is no need to worry about the overhead.

Check my blog post for a benchmark of JSON versus protobuf if you are interested in this part.
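
On the Rust side, handling a task then boils down to a serde_json decode plus a match on the command code, roughly like this sketch (the numeric codes are illustrative):

// Sketch: decode the JSON command sent from Lua and dispatch on it.
let cmd: GrpcCommand = serde_json::from_slice(req_bytes).unwrap();
match cmd.cmd {
    0 /* NEW_CONNECTION */ => { /* build a channel, register it, return its id */ }
    1 /* UNARY */ => { /* look up the connection by cmd.key and run the call */ }
    _ => { /* streaming, close, etc. */ }
}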

response

This part is more straightforward than the request, because for each request type you can expect only one kind of result.

For example, NEW_CONNECTION returns a connection id string; UNARY returns the protobuf-encoded response from the network.

So returning the malloc()'d byte array is enough; there is no need to wrap it in JSON.
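
For instance, answering NEW_CONNECTION could look like this sketch, mirroring the strdup()/rsp_len=0 usage from the Python echo example:

// Sketch: respond with the connection id as a NUL-terminated C string.
let id = std::ffi::CString::new(conn_id).unwrap();
unsafe {
    ngx_http_lua_ffi_respond(task, 0, libc::strdup(id.as_ptr()), 0);
}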

gRPC streaming

Under the hood, no matter whether the call is a unary request, client-side streaming, server-side streaming, or bidirectional streaming, tonic encapsulates the request and response in Stream objects.

So we just need to build a bridge between Lua and the Stream, and then we can support gRPC streaming in Lua.

send direction

We create a channel pair and wrap the receiver half into the request Stream.

let (send_tx, send_rx) = mpsc::unbounded_channel::<EncodedBytes>();
let send_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(send_rx);
...
let mut stream = cli
    .streaming(tonic::Request::new(send_stream), path, MyCodec)
    .await
    .unwrap()
    .into_inner();

Then whatever is sent on the channel is transferred to the network directly.
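
So on the Rust side, a Lua stream:send() is roughly a single channel push (sketch):

// Forward the Lua-encoded message into the request stream; tonic pulls
// it from the wrapped receiver and writes it to the wire.
send_tx.send(item).expect("request stream already closed");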

recv direction

We spawn an async task to handle response retrieval.

tokio::spawn(async move {
...
    while let Some(task) = recv_rx.recv().await {
        if let Some(res) = stream.message().await.unwrap() {
            unsafe {
                ngx_http_lua_ffi_respond(task.0, 0, res.0, res.1 as i32);
            }
...
    }
});

close the send direction

Thanks to the channel semantics, when we drop the send half of the channel pair, the receiver half is closed, which in turn closes the send direction of the gRPC stream.
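
In code, that is nothing more than a drop:

// Dropping the sender half ends the UnboundedReceiverStream, which
// half-closes the send direction of the gRPC stream on the wire.
drop(send_tx);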

Final look

Unary call:

local ok, conn = grpc.connect("[::1]:50051",
    {
        host = "example.com",
        ca = grpc.readfile("/opt/tonic/examples/data/tls/ca.pem"),
        cert = grpc.readfile("/opt/tonic/examples/data/tls/client1.pem"),
        priv_key = grpc.readfile("/opt/tonic/examples/data/tls/client1.key"),
    }
)
assert(ok)

local ok, res = conn:unary(
    "/grpc.examples.echo.Echo/UnaryEcho",
    {message = "hello"}
)
assert(ok)
assert(res.message == "hello")

conn:close()

Streaming calls:

local ok, conn = grpc.connect("[::1]:10000")
assert(ok)

-- A Bidirectional streaming RPC.
local ok, stream = conn:new_stream("/routeguide.RouteGuide/RouteChat")
assert(ok)

for i=1,3 do
    local note = {
        location = {latitude = 409146138 + i*100, longitude = -746188906 + i*50},
        message = string.format("note-%d", i),
    }
    local ok = stream:send(note)
    assert(ok)

    local ok, res = stream:recv()
    assert(ok)
    ngx.say(cjson.encode(res))
end

local ok = stream:close()
assert(ok)

local ok = conn:close()
assert(ok)

Benchmark

The most important question: since we wrap tonic, how much overhead does it add?

I used an AWS EC2 t2.medium instance (Ubuntu 22.04) for the benchmark.

(Figure: gRPC client benchmark results, lua-resty-ffi vs. native tonic)

Send 100,000 calls, using the lua-resty-ffi client and the tonic helloworld client example respectively.

The results are in seconds; lower is better.

You can see that the overhead is 20% to 30%, depending on the CPU affinity. Not too bad, right?

Conclusion

With lua-resty-ffi, you can use your favourite mainstream programming language, e.g. Go, Java, Python, or Rust, to develop in OpenResty/nginx, so that you can enjoy those rich ecosystems directly.

It took only about 400 lines of code (Lua and Rust combined) to implement a complete gRPC client in OpenResty. And this is just an example; you can use lua-resty-ffi to do anything you want!

Discussion is welcome, and please star my GitHub repo:

https://github.com/kingluo/lua-resty-ffi