
Jarle Aase

Implementing an async server with one message and one stream.


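The previous parts covered everything that some RPC frameworks have to offer: a request and a reply. One of the cool things with gRPC is that, in addition to a request and a reply, it can also handle streams of messages. In this article we will look at an async server that implements three RPC calls: GetFeature (one request, one reply), ListFeatures (one request, a stream of replies) and RecordRoute (a stream of requests, one reply).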

So, what's new compared to the first iteration is how to deal with multiple request types, and with a stream in one direction.

The proto file looks like:

syntax = "proto3";
package routeguide;

// Interface exported by the server.
service RouteGuide {
  rpc GetFeature(Point) returns (Feature) {}
  rpc ListFeatures(Rectangle) returns (stream Feature) {}
  rpc RecordRoute(stream Point) returns (RouteSummary) {}
}

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

message Rectangle {
  Point lo = 1;
  Point hi = 2;
}

// A feature names something at a given point.
message Feature {
  string name = 1;
  Point location = 2;
}

message RouteSummary {
  int32 point_count = 1;
  int32 feature_count = 2;
  int32 distance = 3;
  int32 elapsed_time = 4;
}

You can see the full proto-file with comments here.

Now, the first thing we must do is to generalize a bit. We'll try to keep the event-loop as simple as before, and we'll try to avoid copying and pasting code around to facilitate the various requests.

In order to use the this pointer of a request's implementation as the tag, and to allow the event-loop to effortlessly call proceed(), we'll start by creating a base-class for the requests. I have also added a reference to the server class, so we can easily pass information all the way from the command-line to the instance of a request-handler.

    /*! Base class for requests
     *
     *  In order to use `this` as a tag and avoid any special processing in the
     *  event-loop, the simplest approach in C++ is to let the request implementations
     *  inherit from a base-class that contains the shared code they all need, and
     *  a pure virtual method for the state-machine.
     */
    class RequestBase {
    public:
        RequestBase(UnaryAndSingleStreamSvc& parent,
                    ::routeguide::RouteGuide::AsyncService& service,
                    ::grpc::ServerCompletionQueue& cq)
            : parent_{parent}, service_{service}, cq_{cq} {}

        virtual ~RequestBase() = default;

        // The state-machine
        virtual void proceed(bool ok) = 0;

        void done() {
            // Ugly, ugly, ugly
            LOG_TRACE << "If the program crashes now, it was a bad idea to delete this ;)";
            delete this;
        }


    protected:
        // The state required for all requests
        UnaryAndSingleStreamSvc& parent_;
        ::routeguide::RouteGuide::AsyncService& service_;
        ::grpc::ServerCompletionQueue& cq_;
        ::grpc::ServerContext ctx_;
    };

As you can see, it's quite simple. The only thing we need to implement in the derived classes is the constructor and proceed().
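To make the idea of using this as the tag more concrete, here is a minimal sketch that does not use gRPC at all. The names Handler, FakeQueue, CountingHandler and runLoop() are hypothetical stand-ins invented for this illustration. FakeQueue::next() only mimics the shape of the completion queue's Next(void** tag, bool* ok) method. In the real server, the loop would call Next() on the ::grpc::ServerCompletionQueue and cast the tag to RequestBase.

```cpp
#include <cassert>
#include <deque>
#include <utility>

// Hypothetical stand-in for RequestBase: one virtual entry point for the state-machine.
class Handler {
public:
    virtual ~Handler() = default;
    virtual void proceed(bool ok) = 0;
};

// Hypothetical stand-in for the completion queue. Each event is a (tag, ok) pair.
class FakeQueue {
public:
    // The handler pointer is stored as an opaque tag, just like `this` in the article.
    void post(Handler* handler, bool ok) {
        events_.emplace_back(static_cast<void*>(handler), ok);
    }

    // Hands out the next event. Returns false when there are no more events.
    bool next(void** tag, bool* ok) {
        if (events_.empty()) {
            return false;
        }
        *tag = events_.front().first;
        *ok = events_.front().second;
        events_.pop_front();
        return true;
    }

private:
    std::deque<std::pair<void*, bool>> events_;
};

// A trivial request type that only counts the events it receives.
class CountingHandler : public Handler {
public:
    void proceed(bool ok) override {
        if (ok) {
            ++ok_events;
        } else {
            ++failed_events;
        }
    }

    int ok_events = 0;
    int failed_events = 0;
};

// The event-loop. It knows nothing about the concrete request types.
// Returns the number of events it dispatched.
int runLoop(FakeQueue& queue) {
    int dispatched = 0;
    void* tag = nullptr;
    bool ok = false;
    while (queue.next(&tag, &ok)) {
        assert(tag != nullptr);
        static_cast<Handler*>(tag)->proceed(ok);
        ++dispatched;
    }
    return dispatched;
}
```

The point of the design is that the loop only ever sees the base class. Adding a new request type means adding a new derived class, not touching the loop.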

GetFeature

The implementation of the GetFeature RPC call is almost exactly as it was in our first iteration.

We derive from the base class, declare our State enum and then we initialize with gRPC as before in the constructor.

    /*! Implementation for the `GetFeature()` RPC call.
     */
    class GetFeatureRequest : public RequestBase {
    public:
        enum class State {
            CREATED,
            REPLIED,
            DONE
        };

        GetFeatureRequest(UnaryAndSingleStreamSvc& parent,
                          ::routeguide::RouteGuide::AsyncService& service,
                          ::grpc::ServerCompletionQueue& cq)
            : RequestBase(parent, service, cq) {

            // Register this instance with the event-queue and the service.
            // The first event received over the queue is that we have a request.
            service_.RequestGetFeature(&ctx_, &req_, &resp_, &cq_, &cq_, this);
        }

        ...

    private:
        ::routeguide::Point req_;
        ::routeguide::Feature reply_;
        ::grpc::ServerAsyncResponseWriter<::routeguide::Feature> resp_{&ctx_};
        State state_ = State::CREATED;

The only significant change in proceed() is the call to create a new instance to deal with the next request. Since we have more than one request type, I have refactored createNew() to be a template-method in the server class. We also call Clear() on the reply_ variable before re-use. That's required to avoid unintentionally leaking information from a previous stream-message to a new stream-message.

    ...
    // Before we do anything else, we must create a new instance
    // so the service can handle a new request from a client.
    createNew<ListFeaturesRequest>(parent_, service_, cq_);
    ...

    // Prepare the reply-object to be re-used.
    // This is usually cheaper than creating a new one for each write operation.
    reply_.Clear();

Our new server implementation is the class UnaryAndSingleStreamSvc. It's quite similar to the class SimpleReqRespSvc from our first server-iteration. The event-loop is identical.
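The createNew() template-method itself is not shown in this article. As a rough sketch only, and not the actual implementation, such a helper could look something like the code below. Svc, Service, Queue, DemoRequest and FailingRequest are hypothetical stand-ins for the server class, the generated service, the completion queue and the request types. The bool return value and the exception handling are also assumptions.

```cpp
#include <cassert>
#include <exception>
#include <iostream>
#include <stdexcept>

// Hypothetical stand-ins for the server, the generated service and the queue.
struct Svc {};
struct Service {};
struct Queue {
    void* last_tag = nullptr; // The most recently registered tag
    int live = 0;             // Number of request instances alive
};

// Creates a self-owning request instance of type T on the heap.
// The instance is expected to delete itself when its state-machine is done.
// Returns false if the constructor threw an exception.
template <typename T, typename... Args>
bool createNew(Args&... args) {
    try {
        new T(args...);
        return true;
    } catch (const std::exception& ex) {
        std::cerr << "Failed to create request instance: " << ex.what() << '\n';
        return false;
    }
}

// A request type that registers itself with the queue, using `this` as the tag.
class DemoRequest {
public:
    DemoRequest(Svc&, Service&, Queue& cq) : cq_{cq} {
        cq_.last_tag = this;
        ++cq_.live;
    }

    ~DemoRequest() { --cq_.live; }

    void done() {
        assert(cq_.live > 0);
        delete this;
    }

private:
    Queue& cq_;
};

// A request type whose constructor always fails.
class FailingRequest {
public:
    FailingRequest(Svc&, Service&, Queue&) {
        throw std::runtime_error{"registration failed"};
    }
};
```

The pointer returned by new is deliberately not stored. The constructor hands this to the queue as the tag, and the instance deletes itself in done().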

ListFeatures

Let's take a look at how ListFeaturesRequest, the request-handler for ListFeatures(), is implemented.

    /*! Implementation for the `ListFeatures()` RPC call.
     *
     *  This is a bit more advanced. We receive a normal request message,
     *  but the reply is a stream of messages.
     */
    class ListFeaturesRequest : public RequestBase {
    public:
        enum class State {
            CREATED,
            REPLYING,
            FINISHING,
            DONE
        };

        ListFeaturesRequest(UnaryAndSingleStreamSvc& parent,
                            ::routeguide::RouteGuide::AsyncService& service,
                            ::grpc::ServerCompletionQueue& cq)
            : RequestBase(parent, service, cq) {

            // Register this instance with the event-queue and the service.
            // The first event received over the queue is that we have a request.
            service_.RequestListFeatures(&ctx_, &req_, &resp_, &cq_, &cq_, this);
        }

        ...
    private:
        ::routeguide::Rectangle req_;
        ::routeguide::Feature reply_;
        ::grpc::ServerAsyncWriter<::routeguide::Feature> resp_{&ctx_};
        State state_ = State::CREATED;
        size_t replies_ = 0;

As you may notice, it has an additional state compared to the simpler class GetFeatureRequest. That's because it may send many messages in response to a request, one at a time, until it sends the finish message. It also has a different type for the reply, a ServerAsyncWriter instead of a ServerAsyncResponseWriter. This allows us to write as many messages to the client over the reply-stream as we wish. Some servers, for example a "Breaking News" service, may never enter the FINISHING state.

As I promised before, the streams add complexity to the state-machine. This is the proceed() implementation for ListFeaturesRequest.

        // State-machine to deal with a single request.
        // This works almost like a co-routine, where we work our way down each
        // time we are called. The state_ could just as well have been an integer/counter.
        void proceed(bool ok) override {
            switch(state_) {
            case State::CREATED:
                if (!ok) [[unlikely]] {
                    // The operation failed.
                    // Let's end it here.
                    LOG_WARN << "The request-operation failed. Assuming we are shutting down";
                    return done();
                }

                // Before we do anything else, we must create a new instance
                // so the service can handle a new request from a client.
                createNew<ListFeaturesRequest>(parent_, service_, cq_);

                state_ = State::REPLYING;
                [[fallthrough]];

            case State::REPLYING:
                if (!ok) [[unlikely]] {
                    // The operation failed.
                    LOG_WARN << "The reply-operation failed.";
                }

                if (++replies_ > parent_.config_.num_stream_messages) {
                    // We have reached the desired number of replies
                    state_ = State::FINISHING;

                    // *Finish* will relay the event that the write is completed on the queue, using *this* as tag.
                    resp_.Finish(::grpc::Status::OK, this);

                    // Now, wait for the client to be aware of us finishing.
                    break;
                }

                // This is where we have the request, and may formulate another answer.
                // If this was code for a framework, this is where we would have called
                // the `onRpcRequestListFeaturesOnceAgain()` method, or unblocked the next statement
                // in a co-routine awaiting the next state-change.
                //
                // In our case, let's just return something.

                // Prepare the reply-object to be re-used.
                // This is usually cheaper than creating a new one for each write operation.
                reply_.Clear();

                // Since it's a stream, it makes sense to return different data for each message.
                reply_.set_name(std::string{"stream-reply #"} + std::to_string(replies_));

                // *Write* will relay the event that the write is completed on the queue, using *this* as tag.
                resp_.Write(reply_, this);

                // Now, we wait for the write to complete
                break;

            case State::FINISHING:
                if (!ok) [[unlikely]] {
                    // The operation failed.
                    LOG_WARN << "The finish-operation failed.";
                }

                state_ = State::DONE; // Not required, but may be useful if we investigate a crash.

                // We are done. There will be no further events for this instance.
                return done();

            default:
                LOG_ERROR << "Logic error / unexpected state in proceed()!";
            } // switch
        }

When reading through the code above, keep in mind that the calls to Write() and Finish() just initiate asynchronous operations. The methods return immediately, and the results arrive as new events on the queue when the operations have completed.
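To see how the events drive the state-machine, the flow can be simulated without gRPC. The sketch below is an illustration under my own assumptions: EventQueue, FakeWriter, StreamHandler and drive() are hypothetical names, and the fake Write() and Finish() simply queue a successful completion event instead of doing any network I/O. It requires C++17.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Hypothetical event queue: each entry is the `ok` flag of a completed operation.
// There is only one handler in this sketch, so no tag is needed.
struct EventQueue {
    std::deque<bool> completions;
};

// Hypothetical stand-in for ::grpc::ServerAsyncWriter. Write() and Finish()
// only record the operation and queue a completion event, then return at once.
struct FakeWriter {
    explicit FakeWriter(EventQueue& q) : queue{q} {}

    void Write(const std::string& msg) {
        assert(!finished); // Writing after Finish() is a programming error.
        sent.push_back(msg);
        queue.completions.push_back(true);
    }

    void Finish() {
        finished = true;
        queue.completions.push_back(true);
    }

    EventQueue& queue;
    std::vector<std::string> sent;
    bool finished = false;
};

// Same states as ListFeaturesRequest, without any gRPC types.
class StreamHandler {
public:
    enum class State { CREATED, REPLYING, FINISHING, DONE };

    StreamHandler(FakeWriter& writer, std::size_t num_messages)
        : writer_{writer}, num_messages_{num_messages} {}

    void proceed(bool ok) {
        switch (state_) {
        case State::CREATED:
            if (!ok) { state_ = State::DONE; return; }
            state_ = State::REPLYING;
            [[fallthrough]];
        case State::REPLYING:
            if (++replies_ > num_messages_) {
                state_ = State::FINISHING;
                writer_.Finish();
                break;
            }
            writer_.Write("stream-reply #" + std::to_string(replies_));
            break;
        case State::FINISHING:
            state_ = State::DONE;
            break;
        case State::DONE:
            break;
        }
    }

    State state() const { return state_; }

private:
    FakeWriter& writer_;
    std::size_t num_messages_;
    std::size_t replies_ = 0;
    State state_ = State::CREATED;
};

// Drives the handler until the queue is drained. Returns the number of events handled.
std::size_t drive(EventQueue& queue, StreamHandler& handler) {
    std::size_t events = 0;
    queue.completions.push_back(true); // The initial "we have a request" event.
    while (!queue.completions.empty()) {
        const bool ok = queue.completions.front();
        queue.completions.pop_front();
        handler.proceed(ok);
        ++events;
    }
    return events;
}
```

With three messages to send, the handler in this simulation sees five events: the incoming request, one completion for each of the three writes, and the completion of the finish operation.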

RecordRoute

Finally in this article, we will look at the implementation of a request with an incoming stream and a single normal reply. The proto-definition for this request looks like this:

    rpc RecordRoute(stream Point) returns (RouteSummary) {}

As you can see from the implementation, it looks pretty similar to the previous one. We have a ::grpc::ServerAsyncReader instead of a ::grpc::ServerAsyncWriter, and we call RequestRecordRoute() to start the request-flow.

    /*! Implementation for the `RecordRoute()` RPC call.
     *
     *  This is a bit more advanced. We receive a stream of request messages,
     *  and the reply is a single normal message.
     */
    class RecordRouteRequest : public RequestBase {
    public:
        enum class State {
            CREATED,
            READING,
            FINISHING,
            DONE
        };

        RecordRouteRequest(UnaryAndSingleStreamSvc& parent,
                           ::routeguide::RouteGuide::AsyncService& service,
                           ::grpc::ServerCompletionQueue& cq)
            : RequestBase(parent, service, cq) {

            // Register this instance with the event-queue and the service.
            // The first event received over the queue is that we have a request.
            service_.RequestRecordRoute(&ctx_, &reader_, &cq_, &cq_, this);
        }

        ...

    private:
        ::routeguide::Point req_;
        ::routeguide::RouteSummary reply_;
        ::grpc::ServerAsyncReader< ::routeguide::RouteSummary, ::routeguide::Point> reader_{&ctx_};
        State state_ = State::CREATED;

The state-machine is also similar to the previous example. The big difference is that we don't know how many messages to read before the client is done and we can reply.

We started by initiating the RecordRoute request in our constructor. Then, when we have a client calling this method and the state-machine gets its first invocation (in state CREATED), we initiate the first Read() operation. Then we basically loop by initiating a new read operation each time the state-machine gets called with ok == true. When ok != true, we initiate the Finish() operation, and when proceed() is called next, we call done() to delete this instance of the request object.

    void proceed(bool ok) override {
        switch(state_) {
        case State::CREATED:
            if (!ok) [[unlikely]] {
                // The operation failed.
                // Let's end it here.
                LOG_WARN << "The request-operation failed. Assuming we are shutting down";
                return done();
            }

            // Before we do anything else, we must create a new instance
            // so the service can handle a new request from a client.
            createNew<RecordRouteRequest>(parent_, service_, cq_);

            // Initiate the first read operation
            reader_.Read(&req_, this);
            state_ = State::READING;
            break;

        case State::READING:
            if (!ok) [[unlikely]] {
                // The operation failed.
                // This is normal on an incoming stream, when there are no more messages.
                // As far as I know, there is no way at this point to deduce if the false status is
                // because the client is done sending messages, or because we encountered
                // an error.
                LOG_TRACE << "The read-operation failed. It's probably not an error :)";

                // Initiate the finish operation

                // This is where we have received the request, with all its parts,
                // and may formulate the answer.
                // If this was code for a framework, this is where we would have called
                // the `onRpcRequestRecordRouteDone()` method, or unblocked the next statement
                // in a co-routine awaiting the next state-change.
                //
                // In our case, let's just return something.

                reply_.set_distance(300);
                reader_.Finish(reply_, ::grpc::Status::OK, this);
                state_ = State::FINISHING;
                break;
            }

            // This is where we have read a message from the request.
            // If this was code for a framework, this is where we would have called
            // the `onRpcRequestRecordRouteGotMessage()` method, or unblocked the next statement
            // in a co-routine awaiting the next state-change.
            //
            // In our case, let's just log it.
            LOG_TRACE << "Got message: longitude=" << req_.longitude()
                        << ", latitude=" << req_.latitude();

            // Prepare the request-object to be re-used.
            // This is usually cheaper than creating a new one for each read operation.
            req_.Clear();

            // *Read* will relay the event that the read is completed on the queue, using *this* as tag.
            // Initiate the next read operation
            reader_.Read(&req_, this);

            // Now, we wait for the read to complete
            break;

        case State::FINISHING:
            if (!ok) [[unlikely]] {
                // The operation failed.
                LOG_WARN << "The finish-operation failed.";
            }

            state_ = State::DONE; // Not required, but may be useful if we investigate a crash.

            // We are done. There will be no further events for this instance.
            return done();

        default:
            LOG_ERROR << "Logic error / unexpected state in proceed()!";
        } // switch
    }


As you may notice, the State::READING code-block handles both a new message and the end of the message-stream, where it initiates the Finish() operation.
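The read-loop can also be simulated without gRPC, which makes it easy to trace how ok == false ends the stream. This is only a sketch under my own assumptions: Point is a plain struct rather than the protobuf message, and FakeReader, ReadHandler and simulateRecordRoute() are hypothetical names. The fake reader reports ok == false when it runs out of messages, and never simulates a real error.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <utility>
#include <vector>

// Plain stand-in for the protobuf message ::routeguide::Point.
struct Point {
    int latitude = 0;
    int longitude = 0;
};

// Hypothetical stand-in for ::grpc::ServerAsyncReader. Each operation only
// queues a completion event (the `ok` flag) and returns immediately.
class FakeReader {
public:
    FakeReader(std::vector<Point> incoming, std::deque<bool>& events)
        : incoming_(std::move(incoming)), events_(events) {}

    // The completion is queued with ok == false when there are no more messages.
    // For simplicity, the data is copied right away, not when the event is handled.
    void Read(Point* target) {
        if (next_ < incoming_.size()) {
            *target = incoming_[next_++];
            events_.push_back(true);
        } else {
            events_.push_back(false);
        }
    }

    void Finish(int point_count) {
        assert(!finished_); // Finishing twice is a programming error.
        finished_ = true;
        reported_count_ = point_count;
        events_.push_back(true);
    }

    int reportedCount() const { return reported_count_; }

private:
    std::vector<Point> incoming_;
    std::deque<bool>& events_;
    std::size_t next_ = 0;
    bool finished_ = false;
    int reported_count_ = -1;
};

// Same states as RecordRouteRequest, without any gRPC types.
class ReadHandler {
public:
    enum class State { CREATED, READING, FINISHING, DONE };

    explicit ReadHandler(FakeReader& reader) : reader_(reader) {}

    void proceed(bool ok) {
        switch (state_) {
        case State::CREATED:
            if (!ok) { state_ = State::DONE; return; }
            reader_.Read(&req_);
            state_ = State::READING;
            break;
        case State::READING:
            if (!ok) {
                // No more messages. Send the single reply.
                reader_.Finish(points_seen_);
                state_ = State::FINISHING;
                break;
            }
            ++points_seen_;
            req_ = Point{};
            reader_.Read(&req_);
            break;
        case State::FINISHING:
            state_ = State::DONE;
            break;
        case State::DONE:
            break;
        }
    }

private:
    FakeReader& reader_;
    Point req_;
    int points_seen_ = 0;
    State state_ = State::CREATED;
};

// Feeds the points through the state-machine and returns the count sent in the reply.
int simulateRecordRoute(std::vector<Point> points) {
    std::deque<bool> events{true}; // The initial "we have a request" event.
    FakeReader reader(std::move(points), events);
    ReadHandler handler(reader);
    while (!events.empty()) {
        const bool ok = events.front();
        events.pop_front();
        handler.proceed(ok);
    }
    return reader.reportedCount();
}
```

Note that an empty stream still works. The very first read completes with ok == false, and the handler goes straight to the finish operation.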

In this code I don't really care about the data in the request or what we send in reply. The focus is entirely on the boilerplate code required in order to use the async interface for gRPC. If you implement something using async gRPC, your focus will probably be on the correct implementation of your RPC requests. That means that your code will probably be an order of magnitude more complex than this. I would recommend that you put your own logic entirely in classes or modules dedicated to that, and call from those into the gRPC classes or from the gRPC classes into your interfaces. Even that will probably end up as significantly more complex code than what I present here.
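As a small illustration of that separation, here is one possible shape for it. Everything in the sketch is hypothetical: RouteLogic, InMemoryRouteLogic and handleGetFeature() are names I made up, and Point is a plain struct rather than the protobuf message. The idea is only that the application logic lives behind an interface that knows nothing about gRPC, and that the request-handler translates and delegates. It requires C++17.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>
#include <utility>

// Plain stand-in for the protobuf message ::routeguide::Point.
struct Point {
    int latitude = 0;
    int longitude = 0;
};

// Hypothetical interface for the application logic. No gRPC types here.
class RouteLogic {
public:
    virtual ~RouteLogic() = default;

    // Returns the name of the feature at the given point, if there is one.
    virtual std::optional<std::string> featureAt(const Point& p) const = 0;
};

// A simple implementation that keeps the features in memory.
class InMemoryRouteLogic : public RouteLogic {
public:
    void add(const Point& p, std::string name) {
        assert(!name.empty()); // A feature must have a name.
        features_[std::make_pair(p.latitude, p.longitude)] = std::move(name);
    }

    std::optional<std::string> featureAt(const Point& p) const override {
        const auto it = features_.find(std::make_pair(p.latitude, p.longitude));
        if (it == features_.end()) {
            return std::nullopt;
        }
        return it->second;
    }

private:
    std::map<std::pair<int, int>, std::string> features_;
};

// What the request-handler would do in its state-machine:
// take the request, delegate to the logic, and fill in the reply.
// An unknown point gives an empty name.
std::string handleGetFeature(const RouteLogic& logic, const Point& request) {
    return logic.featureAt(request).value_or("");
}
```

A side effect of this split is that the logic can be unit-tested without starting a gRPC server.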

The complete source code.