Jarle Aase

Exploring how to implement the server using gRPC async callbacks


What the callback interface gives us on the server side is a C++ virtual interface that lets us override methods to implement the RPCs in our proto file. That's exactly what I would expect from an RPC code generator for C++. I was a bit surprised when I first started to experiment with async gRPC some years ago - when there was only the legacy async interface.

So, have they gotten it right, this time around? Yes and no.

Yes, because they now deal with all the details, so that we can deal with the RPC request directly. We implement an override for the interface created by rpcgen, and don't worry about what happens before that gets called, or after the request is complete. I emphasize request, because for the streaming RPC types, the callback is just the initiation of the work-flow. Our implementation methods for the RPCs are called from a fixed-size thread pool owned by gRPC. If we do anything time consuming there, our server will not be very performant. So we have to return from the callback immediately. What we do to facilitate streaming RPCs is to instantiate an object that carries out the operations required to complete the request. That's also what we did in the async implementation - only back then we pre-created one instance for each RPC method, and created a replacement instance as soon as a new RPC request was in progress. With the callbacks, we just get notified when there is a new RPC, and then it's up to us how we choose to proceed.

No, because we still have to create an implementation class for the state and event-flow of each streaming RPC.

One thing to keep in mind is that the callbacks may be called simultaneously from different threads. Our implementation must be thread-safe.

Simplified, what we implement for the server-side streaming RPCs looks like the code below. We create overrides for each RPC method, and we also create an implementation class for the reactor required to read/write the stream.

class CallbackImpl : public ::routeguide::RouteGuide::CallbackService {
public:

    class ServerWriteReactorImpl
        : public ::grpc::ServerWriteReactor< ::routeguide::Feature> {
    public:
        ServerWriteReactorImpl() { ... }

        // Override callbacks for events we need
        void OnDone() override { ... }
        void OnWriteDone(bool ok) override { ... }

    private:
        // Implement functionality we need
        void reply() { ... }
    };

    // Example of an RPC with one request in and a stream out
    ::grpc::ServerWriteReactor< ::routeguide::Feature>* ListFeatures(
        ::grpc::CallbackServerContext* ctx,
        const ::routeguide::Rectangle* req) override {

        return new ServerWriteReactorImpl();
    }
    ...
};

The really exciting good news is that unary RPCs (the ones without streams) are trivial to implement. I figure these will be the majority in most real use-cases.

GetFeature

This is the override implementation of the GetFeature() method.

class CallbackServiceImpl : public ::routeguide::RouteGuide::CallbackService {
public:
    ...

    grpc::ServerUnaryReactor *GetFeature(grpc::CallbackServerContext *ctx,
                                         const routeguide::Point *req,
                                         routeguide::Feature *resp) override {

        // Give a nice, thoughtful response
        resp->set_name("whatever");

        // We could have implemented our own reactor, but this is the recommended
        // way to do it in unary methods.
        auto* reactor = ctx->DefaultReactor();
        reactor->Finish(grpc::Status::OK);
        return reactor;
    }

    ...
};

Our callback will be called each time someone requests that RPC. Remember that we don't control the threads. gRPC uses its own thread-pool, and we must expect our method to be called potentially many times in parallel. If we use shared resources, we must use a lock or some other synchronization strategy to handle race conditions.

Also remember that callbacks must return immediately. If you need to do some IO or heavy calculations, you must take the request, schedule it on another thread, and then return from the callback. You can call Finish() later. This will of course add complexity to your code. But it is what it is. Those are the rules we have to abide by. That was, by the way, also the case in our async implementations.

ListFeatures

Here we use the same pattern that the gRPC team uses in their example for the callback interface. We put the implementation for the stream class/work-flow inside the callback method override.

/*! RPC callback event for ListFeatures
 */
::grpc::ServerWriteReactor< ::routeguide::Feature>* ListFeatures(
    ::grpc::CallbackServerContext* ctx, const ::routeguide::Rectangle* req) override {

    // Now, we need to keep the state and buffers required to handle the stream
    // until the request is completed. We also need to return from this
    // method immediately.
    //
    // We solve these conflicting requirements by implementing a class that overrides
    // the async gRPC stream interface `ServerWriteReactor`, and then adding our
    // state and buffers here.
    //
    // This class will deal with the request. The method we are in will just
    // allocate an instance of our class on the heap and return.
    //
    // Yes - it's ugly. It's the best the gRPC team have been able to come up with :/
    class ServerWriteReactorImpl
        // Some shared code we need for convenience for all the RPC Request classes
        : public ReqBase<ServerWriteReactorImpl>
        // The interface to the gRPC async stream for this request.
        , public ::grpc::ServerWriteReactor< ::routeguide::Feature> {
    public:
        ServerWriteReactorImpl(CallbackSvc& owner)
            : owner_{owner} {

            // Start replying with the first message on the stream
            reply();
        }

        /*! Callback event when the RPC is completed */
        void OnDone() override {
            done();
        }

        /*! Callback event when a write operation is complete */
        void OnWriteDone(bool ok) override {
            if (!ok) [[unlikely]] {
                LOG_WARN << "The write-operation failed.";

                // We still need to call Finish or the request will remain stuck!
                Finish({grpc::StatusCode::UNKNOWN, "stream write failed"});
                return;
            }

            reply();
        }

    private:
        void reply() {
            // Reply with the number of messages specified in the config
            if (++replies_ <= owner_.config().num_stream_messages) {
                reply_.Clear();

                // Since it's a stream, it makes sense to return different data for each message.
                reply_.set_name(std::string{"stream-reply #"} + std::to_string(replies_));

                return StartWrite(&reply_);
            }

            // Nothing left to write; all is done.
            Finish(grpc::Status::OK);
        }

        CallbackSvc& owner_;
        size_t replies_ = 0;
        ::routeguide::Feature reply_;
    };

    return createNew<ServerWriteReactorImpl>(owner_);
};

As you can see, the logic is similar to our final async example. But it is simpler. Simple is good. It gives us less opportunity to mess up ;)

RecordRoute

Here we read from the stream until it ends (ok == false). Then we reply.

Just as above, the code is simpler than before.

 /*! RPC callback event for RecordRoute
  */
::grpc::ServerReadReactor< ::routeguide::Point>* RecordRoute(
    ::grpc::CallbackServerContext* ctx, ::routeguide::RouteSummary* reply) override {

    class ServerReadReactorImpl
        // Some shared code we need for convenience for all the RPC Request classes
        : public ReqBase<ServerReadReactorImpl>
        // The async gRPC stream interface for this RPC
        , public grpc::ServerReadReactor<::routeguide::Point> {
    public:
        ServerReadReactorImpl(CallbackSvc& owner, ::routeguide::RouteSummary* reply)
            : owner_{owner}, reply_{reply} {
            assert(reply_);

            // Initiate the first read operation
            StartRead(&req_);
        }

        /*! Callback event when the RPC is complete */
        void OnDone() override {
            done();
        }

        /*! Callback event when a read operation is complete */
        void OnReadDone(bool ok) override {
            if (ok) {
                // We have read a message from the request.

                LOG_TRACE << "Got message: longitude=" << req_.longitude()
                            << ", latitude=" << req_.latitude();

                req_.Clear();

                // Initiate the next async read
                return StartRead(&req_);
            }

            LOG_TRACE << "The read-operation failed. It's probably not an error :)";

            // Let's compose an exciting reply to the client.
            reply_->set_distance(100);
            reply_->set_elapsed_time(300);

            // Note that we set the reply (in the buffer we got from gRPC) and call
            // Finish in one go. We don't have to wait for a callback to acknowledge
            // the write operation.
            Finish(grpc::Status::OK);
            // When the client has received the last bits from us in regard of this
            // RPC, `OnDone()` will be the final event we receive.
        }

    private:
        CallbackSvc& owner_;

        // Our buffer for each of the incoming messages on the stream
        ::routeguide::Point req_;

        // Note that for this RPC type, gRPC owns the reply-buffer.
        // It's a bit inconsistent and confusing, as the interfaces mostly take
        // pointers, but usually our implementation owns the buffers.
        ::routeguide::RouteSummary *reply_ = {};
    };

    // This is all our method actually does. It just creates an instance
    // of the implementation class to deal with the request.
    return createNew<ServerReadReactorImpl>(owner_, reply);
};

I'm a bit puzzled by the lack of typedefs in the generated headers from protobuf. Instead of using some nice, simple type names, we have to spell out the full template names and arguments in our code. That takes time. In general, all that stuff must be located in the generated headers and then copied and pasted into our code.

RouteChat

Again, this is the most complex RPC type that gRPC supports.

To quickly recap the proto definition:

    rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

When we use the callbacks, we must be prepared for simultaneous events in multiple threads. In our async code, we used only one thread in the event loop, and that handled all the events, so we did not have to make our event handlers thread-safe. With the callbacks, we may not only have new RPCs coming in simultaneously on different threads - we may also have IO events (read/write) on a bidirectional stream coming in simultaneously on different threads. So be careful if you have non-const data shared by the event handlers.

This implementation borrows the core idea for how to do a "bidi" stream from our async code. It uses read() and write() methods to initiate the async operations. Then it overrides the IO event handlers in the ServerBidiReactor<> interface to handle the completions.

As before, we don't call Finish() until we are done reading and writing. That is handled by finishIfDone().

/*! RPC callback event for RouteChat
 */
::grpc::ServerBidiReactor< ::routeguide::RouteNote, ::routeguide::RouteNote>*
    RouteChat(::grpc::CallbackServerContext* ctx) override {

    class ServerBidiReactorImpl
        // Some shared code we need for convenience for all the RPC Request classes
        : public ReqBase<ServerBidiReactorImpl>
        // The async gRPC stream interface for this RPC
        , public grpc::ServerBidiReactor<::routeguide::RouteNote, ::routeguide::RouteNote> {
    public:
        ServerBidiReactorImpl(CallbackSvc& owner)
            : owner_{owner} {

           /* There are multiple ways to handle the message-flow in a bidirectional stream.
            *
            * One party can send the first message, and the other party can respond with a message,
            * until one or both parties gets bored.
            *
            * Both parties can wait for some event to occur, and send a message when appropriate.
            *
            * One party can send occasional updates (for example location data) and the other
            * party can respond with one or more messages when appropriate.
            *
            * Both parties can start sending messages as soon as the connection is made.
            * That's what we are doing (or at least preparing for) in this example.
            */

            read();   // Initiate the read for the first incoming message
            write();  // Initiate the first write operation.
        }

        /*! Callback event when the RPC is complete */
        void OnDone() override {
            done();
        }

        /*! Callback event when a read operation is complete */
        void OnReadDone(bool ok) override {
            if (!ok) {
                LOG_TRACE << me() << "- The read-operation failed. It's probably not an error :)";
                done_reading_ = true;
                return finishIfDone();
            }

            LOG_TRACE << "Incoming message: " << req_.message();
            read();
        }

        /*! Callback event when a write operation is complete */
        void OnWriteDone(bool ok) override {
            if (!ok) [[unlikely]] {
                // The operation failed.
                LOG_WARN << "The write-operation failed.";

                // When ok is false here, we will not be able to write
                // anything on this stream.
                done_writing_ = true;

                // This RPC call did not end well
                status_ = {grpc::StatusCode::UNKNOWN, "write failed"};

                return finishIfDone();
            }

            write();
        }

    private:
        void read() {
            // Start a new read
            req_.Clear();
            StartRead(&req_);
        }

        void write() {
            if (++replies_ > owner_.config().num_stream_messages) {
                done_writing_ = true;

                LOG_TRACE << me() << " - We are done writing to the stream.";
                return finishIfDone();
            }

            // Prepare a new message to the stream
            reply_.Clear();

            // Shout something nice to the other side
            reply_.set_message(std::string{"Server Message #"} + std::to_string(replies_));

            // Start new write on the stream
            StartWrite(&reply_);
        }

        void finishIfDone() {
            if (!sent_finish_ && done_reading_ && done_writing_) {
                LOG_TRACE << me() << " - We are done reading and writing. Sending finish!";
                Finish(status_);
                sent_finish_ = true;
                return;
            }
        }

        CallbackSvc& owner_;
        ::routeguide::RouteNote req_;
        ::routeguide::RouteNote reply_;
        grpc::Status status_;
        size_t replies_ = 0;
        bool done_reading_ = false;
        bool done_writing_ = false;
        bool sent_finish_ = false;
    };


    auto instance = createNew<ServerBidiReactorImpl>(owner_);
    LOG_TRACE << instance->me()
                << " - Starting new bidirectional stream conversation with "
                << ctx->peer();
    return instance;
}

Even the bidi RPC is notably simpler than the async version. Things will undoubtedly be messier in a real use-case where we deal with actual functionality and probably need to both queue outgoing messages (since we can only send one at a time) and relay requests and responses through our own worker-threads so the server can tolerate the delays caused by disk or network latency and database queries. But the code required to deal with RPCs and streams is now down to a level where it's manageable ;)

The complete source code.