Jarle Aase

Implementing the full routeguide async server

One of the lessons learned from the previous articles is that there is a lot of repetition in the code that handles rpc requests. To reduce that repetition a bit, we will start this iteration by creating a templated base class for everything. We have also realized that the event-loop in the server and the clients look the same. So why not use the same code for both?

Of course, there are complications. The server code uses different variables than the client, and everything is based on code generated by the command line utility protoc from our unique proto file.

To deal with this, the "everything"-base will consist of two parts:

  1. A template class that contains the event-loop, a base class for the Requests and a more mature Handle. The aim is to reduce the boilerplate code for an actual Request to a minimum - while still keeping the Request implementation simple to understand.

  2. A template class that is different for servers and clients, but provides a uniform interface to the actual implementation. This class also takes a template argument for the code generated to implement our proto interface.

Simplified, it looks something like this:

    template <class grpcT>
    struct ServerVars {
    };

    template <class grpcT>
    struct ClientVars {
    };

    template <typename T>
    struct EventLoopBase {

        struct RequestBase {

            struct Handle {
                void proceed(bool ok); // state-machine
            }; // Handle
        }; // RequestBase

        void run(); // event-loop
    }; // EventLoopBase

    ...

    // Instantiate and run a server
    EventLoopBase<ServerVars<::routeguide::RouteGuide>> server;
    server.run();

    // Instantiate and run a client
    EventLoopBase<ClientVars<::routeguide::RouteGuide>> client;
    client.run();

This gives us the opportunity to carefully write and test much of the code that would otherwise be copied and pasted around. If bugs are caught later on, we can fix them in one place. If we need to optimize our code, we can also do that in one place. All in all, a much better design than lots of almost repeated code ;)

The EventLoopBase implementation

Let's start with an outline of the EventLoopBase class.

    template <typename T>
    class EventLoopBase {
    public:

    EventLoopBase(const Config& config)
        : config_{config} {}

    template <typename reqT, typename parenT>
    void createNew(parenT& parent) {

        // Use make_unique, so we destroy the object if it throws an exception
        // (for example out of memory).
        try {
            auto instance = std::make_unique<reqT>(parent);

            // If we got here, the instance should be fine, so let it handle itself.
            instance.release();
        } catch(const std::exception& ex) {
            LOG_ERROR << "Got exception while creating a new instance. "
                      << "This may end my ability to handle any further requests. "
                      << " Error: " << ex.what();
        }
    }

    void stop() {
        grpc_.stop();
    }

    auto& grpc() {
        return grpc_;
    }

    auto * cq() noexcept {
        return grpc_.cq();
    }

    const auto& config() noexcept {
        return config_;
    }

    protected:
    const Config& config_;
    size_t num_open_requests_ = 0;
    T grpc_;

Note that we have a factory method createNew to create new instances of any one of our various requests. We also forward some methods to the template attribute grpc_. This gives us the flexibility to implement T as we want, as long as it provides the expected stop() and cq() methods. In the server, the actual queue is a std::unique_ptr<grpc::ServerCompletionQueue>, as required by gRPC. In the client, the type we use is ::grpc::CompletionQueue. By adding the auto * cq() method to both ServerVars and ClientVars, this implementation detail is hidden from the rest of our code. Also, auto * cq() will return a ServerCompletionQueue * in the server instantiation and a CompletionQueue * in the client instantiation - which is exactly what we require.

The event-loop is used by both the server and the client.

/*! Runs the event-loop
 *
 *  The method returns when/if the loop is finished
 *
 *  createNew<T> must have been called on all request-types that are used
 *  prior to this call.
 */
void run() {
    // The inner event-loop
    while(num_open_requests_) {

        bool ok = true;
        void *tag = {};

        // FIXME: This is crazy. Figure out how to use stable clock!
        const auto deadline = std::chrono::system_clock::now()
                              + std::chrono::milliseconds(1000);

        // Get any IO operation that is ready.
        const auto status = cq()->AsyncNext(&tag, &ok, deadline);

        // So, here we deal with the first of the three states: The status from Next().
        switch(status) {
        case grpc::CompletionQueue::NextStatus::TIMEOUT:
            LOG_TRACE << "AsyncNext() timed out.";
            continue;

        case grpc::CompletionQueue::NextStatus::GOT_EVENT:

            {
                auto request = static_cast<typename RequestBase::Handle *>(tag);

                // Now, let the RequestBase::Handle state-machine deal with the event.
                request->proceed(ok);
            }
            break;

        case grpc::CompletionQueue::NextStatus::SHUTDOWN:
            LOG_INFO << "SHUTDOWN. Tearing down the gRPC connection(s) ";
            return;
        } // switch
    } // loop
}

In a server implementation we will always add a new request-handler when we start processing a new rpc request. So for the server, num_open_requests_ will always be >= the number of rpc request types we implement. In the client, we need to pre-load the requests we want to start with, and then add more as appropriate. When all the requests are finished, run() returns. If we wanted to use the client in a server app, for example a web-service or cache that used gRPC to fetch data occasionally, we could start a new client on demand, change while(num_open_requests_) to while(true), or just increment num_open_requests_ by one before we start (like adding "work" to asio's io_context).
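
As an illustration of the last option, here is a minimal sketch of a hypothetical "work guard" for EventLoopBase. These methods are not part of the implementation above; they just show how bumping num_open_requests_ keeps run() polling the queue, much like a work guard keeps asio's io_context::run() from returning:

    // Hypothetical additions to EventLoopBase<T>, for illustration only.
    void addWork() noexcept {
        // Pretend one more request is in flight, so run() keeps polling
        // the completion-queue even when no real requests exist yet.
        ++num_open_requests_;
    }

    void removeWork() noexcept {
        // Allow run() to return once the real requests have drained.
        assert(num_open_requests_ > 0);
        --num_open_requests_;
    }

    // Hypothetical usage in a long-lived client application:
    //
    //   EventLoopBase<ClientVars<::routeguide::RouteGuide>> client{config};
    //   client.addWork();   // keep the event-loop alive between rpc calls
    //   client.run();       // returns only after removeWork() is called and
    //                       // all outstanding requests have completed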

Our new RequestBase is the base for both server-side and client-side requests.

    class RequestBase {
    public:
        RequestBase(EventLoopBase& owner)
            : owner_{owner} {
            ++owner.num_open_requests_;
            LOG_TRACE << "Constructed request #" << client_id_ << " at address" << this;
        }

        virtual ~RequestBase() {
            --owner_.num_open_requests_;
        }

        template <typename reqT>
        static std::string me(reqT& req) {
            return boost::typeindex::type_id_runtime(req).pretty_name()
                   + " #"
                   + std::to_string(req.client_id_);
        }

    protected:
        // The state required for all requests
        EventLoopBase& owner_;
        int ref_cnt_ = 0;
        const size_t client_id_ = getNewClientId();

    private:
        void done() {
            // Ugly, ugly, ugly
            LOG_TRACE << "If the program crash now, it was a bad idea to delete this ;)  #"
                      << client_id_ << " at address " << this;
            delete this;
        }

    }; // RequestBase;

So far, pretty simple code. The most interesting method may be me(), which is a general implementation of a method we used before to return the class name and request id for a request. This is very useful in log events.

Now, let's examine the new Handle implementation. This one is a bit different.

    class RequestBase {
    public:
        class Handle
        {
        public:
            // In this implementation, the operation is informative.
            // It has no side-effects.
            enum Operation {
                INVALID,
                CONNECT,
                READ,
                WRITE,
                WRITE_DONE,
                FINISH
            };

            using proceed_t = std::function<void(bool ok, Operation op)>;

            Handle(RequestBase& instance)
                : base_{instance} {}


            [[nodiscard]] void *tag(Operation op, proceed_t&& fn) noexcept {
                assert(op_ == Operation::INVALID);

                LOG_TRACE << "Handle::proceed() - " << base_.client_id_
                          << " initiating " << name(op) << " operation.";
                op_ = op;
                proceed_ = std::move(fn);
                return tag_();
            }

            void proceed(bool ok) {
                --base_.ref_cnt_;

                // We must reset the `op_` type before we call `proceed_`
                // See the comment below regarding `proceed()`.
                const auto current_op = op_;
                op_ = Operation::INVALID;

                if (proceed_) {
                    // Move `proceed` to the stack.
                    // There is a good probability that `proceed()` will call `tag()`,
                    // which will overwrite the current value in the Handle's instance.
                    auto proceed = std::move(proceed_);
                    proceed(ok, current_op);
                }

                if (base_.ref_cnt_ == 0) {
                    base_.done();
                }
            }

        private:
            [[nodiscard]] void *tag_() noexcept {
                ++base_.ref_cnt_;
                return this;
            }

            RequestBase& base_;
            Operation op_ = Operation::INVALID;
            proceed_t proceed_;
        };
    }; // RequestBase

There are two major changes. First, we use a functor to implement the state-machine directly in response to an event. Second, we re-use the handle when appropriate. Normally we only need one or two handles at any one time, so it makes little sense to have up to four in our request implementation. We still have the Operation enum, but now it's just there to provide accurate information to log-events - or to get some useful meta-information in the debugger. tag() takes a functor, normally a lambda, to be executed when the async operation completes.

Finally, let's take a brief look at the ServerVars and ClientVars implementations.

    template <typename grpcT>
    struct ServerVars {
        // An instance of our service, compiled from code generated by protoc
        typename grpcT::AsyncService service_;

        // This is the Queue. It's shared for all the requests.
        std::unique_ptr<grpc::ServerCompletionQueue> cq_;

        [[nodiscard]] auto * cq() noexcept {
            assert(cq_);
            return cq_.get();
        }

        // A gRPC server object
        std::unique_ptr<grpc::Server> server_;

        void stop() {
            LOG_INFO << "Shutting down ";
            server_->Shutdown();
            server_->Wait();
        }
    };

    template <typename grpcT>
    struct ClientVars {
        // This is the Queue. It's shared for all the requests.
        ::grpc::CompletionQueue cq_;

        [[nodiscard]] auto * cq() noexcept {
            return &cq_;
        }

        // This is a connection to the gRPC server
        std::shared_ptr<grpc::Channel> channel_;

        // An instance of the client that was generated from our .proto file.
        std::unique_ptr<typename grpcT::Stub> stub_;

        void stop() {
            // We don't stop the client...
            assert(false);
        }
    };

That's the generalization of the boilerplate code.

The complete source code.

The final async server implementation

Now, let's look at the actual server implementation of all four rpc calls in Google's "routeguide" proto-file example.

service RouteGuide {
  rpc GetFeature(Point) returns (Feature) {}
  rpc ListFeatures(Rectangle) returns (stream Feature) {}
  rpc RecordRoute(stream Point) returns (RouteSummary) {}
  rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}

Before we get obsessed with the implementation details of the four Requests, let's look briefly at the implementation for the outer event-loop class.

class EverythingSvr
    : public EventLoopBase<ServerVars<::routeguide::RouteGuide>> {
public:

    EverythingSvr(const Config& config)
        : EventLoopBase(config) {

        grpc::ServerBuilder builder;
        builder.AddListeningPort(config_.address, grpc::InsecureServerCredentials());
        builder.RegisterService(&grpc_.service_);
        grpc_.cq_ = builder.AddCompletionQueue();
        // Finally assemble the server.
        grpc_.server_ = builder.BuildAndStart();

        LOG_INFO
            // Fancy way to print the class-name.
            // Useful when I copy/paste this code around ;)
            << boost::typeindex::type_id_runtime(*this).pretty_name()

            // The useful information
            << " listening on " << config_.address;

        // Prepare the first instances of request handlers
        createNew<GetFeatureRequest>(*this);
        createNew<ListFeaturesRequest>(*this);
        createNew<RecordRouteRequest>(*this);
        createNew<RouteChatRequest>(*this);
    }
};

As before in the server, we use a builder to set up the required variables and start the gRPC server. At the lowest level, this is an HTTP/2 server listening on the host address/IP and port number we specified.

Then we create one instance of each Request type we need to handle the four rpc requests in our proto file. These will be used to handle the first incoming rpc call of each type. In our implementations below, we will create new instances as required to keep the gRPC service responsive. If we forget that, the client side will see its call "connect" normally - but the Finish event will never be triggered.

GetFeature

    class GetFeatureRequest : public RequestBase {
    public:

        GetFeatureRequest(EverythingSvr& owner)
            : RequestBase(owner) {

            // Register this instance with the event-queue and the service.
            // The first event received over the queue is that we have a request.
            owner_.grpc().service_.RequestGetFeature(&ctx_, &req_, &resp_, cq(), cq(),
                op_handle_.tag(Handle::Operation::CONNECT,
                [this, &owner](bool ok, Handle::Operation /* op */) {

                    LOG_DEBUG << me(*this) << " - Processing a new connect from " << ctx_.peer();

                    if (!ok) [[unlikely]] {
                        // The operation failed.
                        // Let's end it here.
                        LOG_WARN << "The request-operation failed. Assuming we are shutting down";
                        return;
                    }

                    // Before we do anything else, we must create a new instance of
                    // GetFeatureRequest, so the service can handle a new request from a client.
                    owner_.createNew<GetFeatureRequest>(owner);

                    // This is where we have the request, and may formulate an answer.
                    // If this was code for a framework, this is where we would have called
                    // the `onRpcRequestGetFeature()` method, or unblocked the next statement
                    // in a co-routine waiting for the next request.
                    //
                    // In our case, let's just return something.
                    reply_.set_name("whatever");
                    reply_.mutable_location()->CopyFrom(req_);

                    // Initiate our next async operation.
                    // That will complete when we have sent the reply, or replying failed.
                    resp_.Finish(reply_, ::grpc::Status::OK,
                        op_handle_.tag(Handle::Operation::FINISH,
                        [this](bool ok, Handle::Operation /* op */) {

                            if (!ok) [[unlikely]] {
                                LOG_WARN << "The finish-operation failed.";
                            }

                    }));// FINISH operation lambda
                })); // CONNECT operation lambda
        }

    private:
        Handle op_handle_{*this}; // We need only one handle for this operation.

        ::grpc::ServerContext ctx_;
        ::routeguide::Point req_;
        ::routeguide::Feature reply_;
        ::grpc::ServerAsyncResponseWriter<decltype(reply_)> resp_{&ctx_};
    };

GetFeatureRequest was never very complicated. What we have done here is to remove the proceed() override with its switch on the current operation, and instead add a lambda expression as an argument to the handle's tag() method. Personally I find this code easier to read. It's still not a coroutine, but the flow of logic is presented in a way that makes it simple to read and comprehend.

ListFeaturesRequest

Remember, in ListFeatures we return a stream of messages. Here we use lambdas for the logic that is unique to a specific operation, and reply() to handle the shared reply logic, both after the connect event and after each completed write.

    class ListFeaturesRequest : public RequestBase {
    public:

        ListFeaturesRequest(EverythingSvr& owner)
            : RequestBase(owner) {

            owner_.grpc().service_.RequestListFeatures(&ctx_, &req_, &resp_, cq(), cq(),
                op_handle_.tag(Handle::Operation::CONNECT,
                [this, &owner](bool ok, Handle::Operation /* op */) {

                    LOG_DEBUG << me(*this) << " - Processing a new connect from " << ctx_.peer();

                    if (!ok) [[unlikely]] {
                        // The operation failed.
                        // Let's end it here.
                        LOG_WARN << "The request-operation failed. Assuming we are shutting down";
                        return;
                    }

                    // Before we do anything else, we must create a new instance
                    // so the service can handle a new request from a client.
                    owner_.createNew<ListFeaturesRequest>(owner);

                    reply();
                }));
        }

    private:
        void reply() {
            if (++replies_ > owner_.config().num_stream_messages) {
                // We have reached the desired number of replies

                resp_.Finish(::grpc::Status::OK,
                    op_handle_.tag(Handle::Operation::FINISH,
                    [this](bool ok, Handle::Operation /* op */) {
                        if (!ok) [[unlikely]] {
                            // The operation failed.
                            LOG_WARN << "The finish-operation failed.";
                        }
                }));

                return;
            }

            // This is where we have the request, and may formulate another answer.
            // If this was code for a framework, this is where we would have called
            // the `onRpcRequestListFeaturesOnceAgain()` method, or unblocked the next statement
            // in a co-routine awaiting the next state-change.
            //
            // In our case, let's just return something.


            // Prepare the reply-object to be re-used.
            // This is usually cheaper than creating a new one for each write operation.
            reply_.Clear();

            // Since it's a stream, it make sense to return different data for each message.
            reply_.set_name(std::string{"stream-reply #"} + std::to_string(replies_));

            resp_.Write(reply_, op_handle_.tag(Handle::Operation::WRITE,
                [this](bool ok, Handle::Operation /* op */) {
                    if (!ok) [[unlikely]] {
                        // The operation failed.
                        LOG_WARN << "The reply-operation failed.";
                        return;
                    }

                    reply();
            }));
        }

        Handle op_handle_{*this}; // We need only one handle for this operation.
        size_t replies_ = 0;

        ::grpc::ServerContext ctx_;
        ::routeguide::Rectangle req_;
        ::routeguide::Feature reply_;
        ::grpc::ServerAsyncWriter<decltype(reply_)> resp_{&ctx_};
    };

This was a little more code than the previous one, but if you remove the comments, it's not many lines.

RecordRouteRequest

Now we will get a stream of messages, and reply when the stream has dried up.

    class RecordRouteRequest : public RequestBase {
    public:

        RecordRouteRequest(EverythingSvr& owner)
            : RequestBase(owner) {

            owner_.grpc().service_.RequestRecordRoute(&ctx_, &io_, cq(), cq(),
                op_handle_.tag(Handle::Operation::CONNECT,
                    [this, &owner](bool ok, Handle::Operation /* op */) {

                        LOG_DEBUG << me(*this) << " - Processing a new connect from " << ctx_.peer();

                        if (!ok) [[unlikely]] {
                            // The operation failed.
                            // Let's end it here.
                            LOG_WARN << "The request-operation failed. Assuming we are shutting down";
                            return;
                        }

                        // Before we do anything else, we must create a new instance
                        // so the service can handle a new request from a client.
                        owner_.createNew<RecordRouteRequest>(owner);

                        read(true);
                    }));
        }

    private:
        void read(const bool first) {

            if (!first) {
                // This is where we have read a message from the request.
                // If this was code for a framework, this is where we would have called
                // the `onRpcRequestRecordRouteGotMessage()` method, or unblocked the next statement
                // in a co-routine awaiting the next state-change.
                //
                // In our case, let's just log it.
                LOG_TRACE << "Got message: longitude=" << req_.longitude()
                          << ", latitude=" << req_.latitude();


                // Reset the req_ message. This is cheaper than allocating a new one for each read.
                req_.Clear();
            }

            io_.Read(&req_,  op_handle_.tag(Handle::Operation::READ,
                [this](bool ok, Handle::Operation /* op */) {
                    if (!ok) [[unlikely]] {
                        // The operation failed.
                        // This is normal on an incoming stream, when there are no more messages.
                        // As far as I know, there is no way at this point to deduce if the false status is
                        // because the client is done sending messages, or because we encountered
                        // an error.
                        LOG_TRACE << "The read-operation failed. It's probably not an error :)";

                        // Initiate the finish operation

                        // This is where we have received the request, with all it's parts,
                        // and may formulate another answer.
                        // If this was code for a framework, this is where we would have called
                        // the `onRpcRequestRecordRouteDone()` method, or unblocked the next statement
                        // in a co-routine awaiting the next state-change.
                        //
                        // In our case, let's just return something.

                        reply_.set_distance(100);
                        io_.Finish(reply_, ::grpc::Status::OK, op_handle_.tag(
                            Handle::Operation::FINISH,
                            [this](bool ok, Handle::Operation /* op */) {

                             if (!ok) [[unlikely]] {
                                LOG_WARN << "The finish-operation failed.";
                            }

                            // We are done
                        }));
                        return;
                    } // ok != false

                    read(false);
            }));

        }

        Handle op_handle_{*this}; // We need only one handle for this operation.

        ::grpc::ServerContext ctx_;
        ::routeguide::Point req_;
        ::routeguide::RouteSummary reply_;
        ::grpc::ServerAsyncReader< decltype(reply_), decltype(req_)> io_{&ctx_};
    };

The logic here is similar to ListFeaturesRequest. Instead of reply() we have read(), and to re-use as much code as possible, we have a bool flag to tell us if this is the first call to read() - the one from the connect event. If it's not the first call, then we have a read message to deal with before we start a new async Read operation on the stream.

Now, we will deal with the most complex rpc request type that gRPC supports, the bidirectional stream.

RouteChatRequest

In the proto-file we have specified that both the request and the reply are streams.

  rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

We have no way to specify the actual protocol initiation. Some servers may want an incoming message before they send an outgoing message. Others may not start reading on the stream before they have succeeded in sending a message. Some protocols will do tic-tac, where both the server and the client respond to a message by sending a message. The messages in the stream can be strictly ordered, or they can be unrelated and flow in either direction whenever one side has something to send. The protocol in your implementation is entirely up to you. Just remember that you can only send one message at any given time, and only have one Read operation active at any time. If you need to send a burst of messages, you must provide your own queue for the pending messages, and hand them to gRPC one by one, as you get confirmation of the previous message's successful departure.
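
To make the queuing idea concrete, here is a rough sketch, assuming we add it to a bidirectional-streaming request class like the RouteChatRequest shown below. The sendMessage() and writeNextIfIdle() names, and the pending_ queue, are made up for this example; they are not part of the actual implementation:

    // Hypothetical members of a bidirectional-streaming request class.
    // Only one Write may be outstanding at any time, so additional messages
    // are parked in pending_ and handed to gRPC one at a time.
    std::queue<::routeguide::RouteNote> pending_;  // requires <queue>
    bool write_in_flight_ = false;

    void sendMessage(::routeguide::RouteNote msg) {
        pending_.emplace(std::move(msg));
        writeNextIfIdle();
    }

    void writeNextIfIdle() {
        if (write_in_flight_ || pending_.empty()) {
            return; // A Write is already active, or we have nothing to send.
        }

        write_in_flight_ = true;

        // The message handed to Write() must stay alive until the operation
        // completes, so we keep it in the member variable reply_.
        reply_ = std::move(pending_.front());
        pending_.pop();

        stream_.Write(reply_, out_handle_.tag(Handle::Operation::WRITE,
            [this](bool ok, Handle::Operation /* op */) {
                write_in_flight_ = false;
                if (!ok) [[unlikely]] {
                    LOG_WARN << "The write-operation failed.";
                    return;
                }

                // The previous message departed successfully;
                // send the next one, if any.
                writeNextIfIdle();
        }));
    }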

Our RouteChat implementation will start sending and start reading as soon as a request is active. It says Chat in the name! Nobody expects anyone in a "Chat" on the Internet to be polite and obey any kind of "protocol" ;) So we will yell at the other party until we grow tired. We will read the incoming messages and discard them as they arrive, until the other end gets tired. When both parties are done, we get to say the final bits. We are the server implementation, so we define the Status for the conversation.

    class RouteChatRequest : public RequestBase {
    public:

        RouteChatRequest(EverythingSvr& owner)
            : RequestBase(owner) {

            owner_.grpc().service_.RequestRouteChat(&ctx_, &stream_, cq(), cq(),
                in_handle_.tag(Handle::Operation::CONNECT,
                    [this, &owner](bool ok, Handle::Operation /* op */) {

                        LOG_DEBUG << me(*this) << " - Processing a new connect from " << ctx_.peer();

                        if (!ok) [[unlikely]] {
                            // The operation failed.
                            // Let's end it here.
                            LOG_WARN << "The request-operation failed. Assuming we are shutting down";
                            return;
                        }

                        // Before we do anything else, we must create a new instance
                        // so the service can handle a new request from a client.
                        owner_.createNew<RouteChatRequest>(owner);

                        read(true);   // Initiate the read for the first incoming message
                        write(true);  // Initiate the first write operation.
            }));
        }

    private:
        void read(const bool first) {
            if (!first) {
                // This is where we have read a message from the stream.
                // If this was code for a framework, this is where we would have called
                // the `onRpcRequestRouteChatGotMessage()` method, or unblocked the next statement
                // in a co-routine awaiting the next state-change.
                //
                // In our case, let's just log it.

                LOG_TRACE << "Incoming message: " << req_.message();

                req_.Clear();
            }

            // Start new read
            stream_.Read(&req_, in_handle_.tag(
                Handle::Operation::READ,
                [this](bool ok, Handle::Operation /* op */) {
                    if (!ok) [[unlikely]] {
                        // The operation failed.
                        // This is normal on an incoming stream, when there are no more messages.
                        // As far as I know, there is no way at this point to deduce if the false status is
                        // because the client is done sending messages, or because we encountered
                        // an error.
                        LOG_TRACE << "The read-operation failed. It's probably not an error :)";

                        done_reading_ = true;
                        return finishIfDone();
                    }

                    read(false);  // Initiate the read for the next incoming message
            }));
        }

        void write(const bool first) {
            if (!first) {
                reply_.Clear();
            }

            if (++replies_ > owner_.config().num_stream_messages) {
                done_writing_ = true;

                LOG_TRACE << me(*this) << " - We are done writing to the stream.";
                return finishIfDone();
            }

            // This is where we are ready to write a new message.
            // If this was code for a framework, this is where we would have called
            // the `onRpcRequestRouteChatReadytoSendNewMessage()` method, or unblocked
            // the next statement in a co-routine awaiting the next state-change.

            reply_.set_message(std::string{"Server Message #"} + std::to_string(replies_));

            // Start new write
            stream_.Write(reply_, out_handle_.tag(
                Handle::Operation::WRITE,
                [this](bool ok, Handle::Operation /* op */) {
                    if (!ok) [[unlikely]] {
                        // The operation failed.
                        LOG_WARN << "The write-operation failed.";

                        // When ok is false here, we will not be able to write
                        // anything on this stream.
                        done_writing_ = true;
                        return finishIfDone();
                    }

                    write(false);  // Initiate the next write or finish
                }));
        }

        // We wait until all incoming messages are received and all outgoing messages are sent
        // before we send the finish message.
        void finishIfDone() {
            if (!sent_finish_ && done_reading_ && done_writing_) {
                LOG_TRACE << me(*this) << " - We are done reading and writing. Sending finish!";

                stream_.Finish(grpc::Status::OK, out_handle_.tag(
                    Handle::Operation::FINISH,
                    [this](bool ok, Handle::Operation /* op */) {

                        if (!ok) [[unlikely]] {
                            LOG_WARN << "The finish-operation failed.";
                        }

                        LOG_TRACE << me(*this) << " - We are done";
                }));
                sent_finish_ = true;
                return;
            }
        }

        bool done_reading_ = false;
        bool done_writing_ = false;
        bool sent_finish_ = false;
        size_t replies_ = 0;

        // We are streaming messages in and out simultaneously, so we need two handles.
        // One for each direction.
        Handle in_handle_{*this};
        Handle out_handle_{*this};

        ::grpc::ServerContext ctx_;
        ::routeguide::RouteNote req_;
        ::routeguide::RouteNote reply_;

        // Interestingly, the class is named `*ReaderWriter`, while the
        // template argument order is first the Writer type and then the Reader type.
        // Lots of room for false assumptions and subtle errors here ;)
        ::grpc::ServerAsyncReaderWriter< decltype(reply_), decltype(req_)> stream_{&ctx_};
    };

As you see, here we basically combine the logic in the last two examples above. Then we add some extra logic in finishIfDone() to allow both directions of the stream to finish before we set the final status for the rpc call and end it.

The complete source code.