Jarle Aase

Implementing the full routeguide async server

13 min read

One of the lessons learned from the previous articles is that there is a lot of repetition in the code that handles rpc requests. To reduce that repetition a bit, we will start this iteration by creating a templated base class for everything. We have realized that the event-loop in the server and the clients look the same. So why not use the same code for both?

Of course, there are complications. The server code uses different variables than the client, and everything is based on code generated by the command line utility protoc - based on our unique proto file.

To deal with this, the "everything"-base will consist of two parts:

  1. A template class that contains the event-loop, a base class for the Requests, and a more mature Handle. The aim is to reduce the boilerplate code for an actual Request to a minimum - while still keeping the Request implementation simple to understand.

  2. A template class that is different for servers and clients, but provides a uniform interface to the actual implementation. This class also takes a template argument for the code generated to implement our proto interface.

Simplified, it looks something like this:

    template <class grpcT>
    struct ServerVars {
    };

    template <class grpcT>
    struct ClientVars {
    };

    template <typename T>
    struct EventLoopBase {

        struct RequestBase {

            struct Handle {
                void proceed(bool ok); // state-machine
            }; // Handle
        }; // RequestBase

        void run(); // event-loop
    }; // EventLoopBase

    ...

    // Instantiate and run a server
    EventLoopBase<ServerVars<::routeguide::RouteGuide>> server;
    server.run();

    // Instantiate and run a client
    EventLoopBase<ClientVars<::routeguide::RouteGuide>> client;
    client.run();


This gives us the opportunity to carefully write and test much of the code that would otherwise be copied and pasted around. If bugs are caught later on, we can fix them in one place. If we need to optimize our code, we can also do that in one place. All in all, a much better design than lots of almost repeated code ;)

The EventLoopBase implementation

Let's start with an outline of the EventLoopBase class.

    template <typename T>
    class EventLoopBase {
    public:

        EventLoopBase(const Config& config)
            : config_{config} {}

        template <typename reqT, typename parenT>
        void createNew(parenT& parent) {

            // Use make_unique, so we destroy the object if it throws an exception
            // (for example out of memory).
            try {
                auto instance = std::make_unique<reqT>(parent);

                // If we got here, the instance should be fine, so let it handle itself.
                instance.release();
            } catch(const std::exception& ex) {
                LOG_ERROR << "Got exception while creating a new instance. "
                          << "This may end my ability to handle any further requests. "
                          << " Error: " << ex.what();
            }
        }

        void stop() {
            grpc_.stop();
        }

        auto& grpc() {
            return grpc_;
        }

        auto * cq() noexcept {
            return grpc_.cq();
        }

        const auto& config() noexcept {
            return config_;
        }

    protected:
        const Config& config_;
        size_t num_open_requests_ = 0;
        T grpc_;
    }; // EventLoopBase

Note that we have a factory method createNew to create new instances of any one of our various requests. We also forward some methods to the template attribute grpc_. This gives us the flexibility to implement T as we want, as long as it provides the expected stop() and cq() methods. In the server, the actual queue is held as a std::unique_ptr<grpc::ServerCompletionQueue>, as required by gRPC. In the client, the type we use is ::grpc::CompletionQueue. By adding the auto * cq() method to both ServerVars and ClientVars, this implementation detail is hidden from the rest of our code. Also, the use of auto * cq() will return a ServerCompletionQueue * in the server instantiation and a CompletionQueue * in the client instantiation - which is exactly what we require.
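
To make the type deduction concrete, here is a small compile-time check. It is not part of the article's code; it is just an illustration, assuming <type_traits> and <utility> are included.

    // Illustration only: which pointer type does cq() deduce for each instantiation?
    using ServerLoop = EventLoopBase<ServerVars<::routeguide::RouteGuide>>;
    using ClientLoop = EventLoopBase<ClientVars<::routeguide::RouteGuide>>;

    static_assert(std::is_same_v<decltype(std::declval<ServerLoop&>().cq()),
                                 ::grpc::ServerCompletionQueue *>);
    static_assert(std::is_same_v<decltype(std::declval<ClientLoop&>().cq()),
                                 ::grpc::CompletionQueue *>);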

The event-loop is used for both the server and the client.

    /*! Runs the event-loop
     *
     *  The method returns when/if the loop is finished
     *
     *  createNew<T> must have been called on all request-types that are used
     *  prior to this call.
     */
    void run() {
        // The inner event-loop
        while(num_open_requests_) {

            bool ok = true;
            void *tag = {};

            // FIXME: This is crazy. Figure out how to use stable clock!
            const auto deadline = std::chrono::system_clock::now()
                                  + std::chrono::milliseconds(1000);

            // Get any IO operation that is ready.
            const auto status = cq()->AsyncNext(&tag, &ok, deadline);

            // So, here we deal with the first of the three states: The status from Next().
            switch(status) {
            case grpc::CompletionQueue::NextStatus::TIMEOUT:
                LOG_TRACE << "AsyncNext() timed out.";
                continue;

            case grpc::CompletionQueue::NextStatus::GOT_EVENT:
                {
                    auto request = static_cast<typename RequestBase::Handle *>(tag);

                    // Now, let the RequestBase::Handle state-machine deal with the event.
                    request->proceed(ok);
                }
                break;

            case grpc::CompletionQueue::NextStatus::SHUTDOWN:
                LOG_INFO << "SHUTDOWN. Tearing down the gRPC connection(s) ";
                return;
            } // switch
        } // loop
    }


In a server implementation we will always add a new request-handler when we start processing a new rpc request. So for the server, num_open_requests_ will always be >= the number of rpc request types we implement. In the client, we need to pre-load the requests we want to start with, and then add more as appropriate. When all the requests are finished, run() returns. If we wanted to use the client in a server app - for example a web-service or cache that uses gRPC to fetch data occasionally - we could start a new client on demand, change while(num_open_requests_) to while(true), or just increment num_open_requests_ by one before we start (like adding "work" to asio's io_context).
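
As a hypothetical sketch of that last idea: the article's class keeps num_open_requests_ protected, so this assumes we expose a small helper for it. The names addWork() and config are assumptions, not part of the code above.

    // Hypothetical helper, not in the article's code:
    //     void addWork() noexcept { ++num_open_requests_; }
    //
    // With it, a long-lived client loop could look like this:
    EventLoopBase<ClientVars<::routeguide::RouteGuide>> client{config};
    client.addWork();   // like adding "work" to asio's io_context
    client.run();       // now returns only when the queue shuts down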

Our new RequestBase is the base for both server-side and client-side requests.

    class RequestBase {
    public:
        RequestBase(EventLoopBase& owner)
            : owner_{owner} {
            ++owner.num_open_requests_;
            LOG_TRACE << "Constructed request #" << client_id_ << " at address " << this;
        }

        virtual ~RequestBase() {
            --owner_.num_open_requests_;
        }

        template <typename reqT>
        static std::string me(reqT& req) {
            return boost::typeindex::type_id_runtime(req).pretty_name()
                   + " #"
                   + std::to_string(req.client_id_);
        }

    protected:
        // The state required for all requests
        EventLoopBase& owner_;
        int ref_cnt_ = 0;
        const size_t client_id_ = getNewClientId();

    private:
        void done() {
            // Ugly, ugly, ugly
            LOG_TRACE << "If the program crash now, it was a bad idea to delete this ;)  #"
                      << client_id_ << " at address " << this;
            delete this;
        }

    }; // RequestBase


So far, pretty simple code. The most interesting method may be me(), which is a general implementation of a method we used before to return the class name and request id for a request. This is very useful in log events.
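
For example, a log statement like the ones used in the request implementations later in this article:

    LOG_DEBUG << me(*this) << " - Processing a new connect from " << ctx_.peer();

    // ... produces a log event along the lines of (hypothetical values):
    //   GetFeatureRequest #42 - Processing a new connect from ipv4:127.0.0.1:54321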

Now, let's examine the new Handle implementation. This one is a bit different.

    class RequestBase {
    public:
        class Handle
        {
        public:
            // In this implementation, the operation is informative.
            // It has no side-effects.
            enum Operation {
                INVALID,
                CONNECT,
                READ,
                WRITE,
                WRITE_DONE,
                FINISH
            };

            using proceed_t = std::function<void(bool ok, Operation op)>;

            Handle(RequestBase& instance)
                : base_{instance} {}


            [[nodiscard]] void *tag(Operation op, proceed_t&& fn) noexcept {
                assert(op_ == Operation::INVALID);

                LOG_TRACE << "Handle::proceed() - " << base_.client_id_
                          << " initiating " << name(op) << " operation.";
                op_ = op;
                proceed_ = std::move(fn);
                return tag_();
            }

            void proceed(bool ok) {
                --base_.ref_cnt_;

                // We must reset the `op_` type before we call `proceed_`
                // See the comment below regarding `proceed()`.
                const auto current_op = op_;
                op_ = Operation::INVALID;

                if (proceed_) {
                    // Move `proceed` to the stack.
                    // There is a good probability that `proceed()` will call `tag()`,
                    // which will overwrite the current value in the Handle's instance.
                    auto proceed = std::move(proceed_);
                    proceed(ok, current_op);
                }

                if (base_.ref_cnt_ == 0) {
                    base_.done();
                }
            }

        private:
            [[nodiscard]] void *tag_() noexcept {
                ++base_.ref_cnt_;
                return this;
            }

            RequestBase& base_;
            Operation op_ = Operation::INVALID;
            proceed_t proceed_;
        };
    }; // RequestBase

There are two major changes. First, we use a functor to directly implement the state-machine in response to an event. Second, we re-use the handle when appropriate. Normally we only need one or two handles at any one time, so it makes little sense to have up to four in our request implementation. We still have the Operation enum, but now it's just there to provide accurate information to log-events - or to give us some useful meta-information in the debugger. tag() takes a functor, normally a lambda, to be executed when the async operation completes.
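
A minimal sketch of how one handle is re-armed for consecutive operations. The member names (stream_, req_, op_handle_) are borrowed from the server requests later in the article; this is an illustration, not code from the article.

    // tag() requires op_ to be INVALID, and proceed() resets op_ *before* it
    // calls the lambda, so the lambda is free to call tag() on the same handle.
    stream_.Read(&req_, op_handle_.tag(Handle::Operation::READ,
        [this](bool ok, Handle::Operation /* op */) {
            if (!ok) {
                return; // no more messages (or an error)
            }
            // Re-arm the *same* handle for the next read.
            stream_.Read(&req_, op_handle_.tag(Handle::Operation::READ,
                [this](bool ok, Handle::Operation /* op */) {
                    // ... and so on, until the stream ends.
                }));
        }));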

Finally, let's take a brief look at the ServerVars and ClientVars implementations.

    template <typename grpcT>
    struct ServerVars {
        // An instance of our service, compiled from code generated by protoc
        typename grpcT::AsyncService service_;

        // This is the Queue. It's shared for all the requests.
        std::unique_ptr<grpc::ServerCompletionQueue> cq_;

        [[nodiscard]] auto * cq() noexcept {
            assert(cq_);
            return cq_.get();
        }

        // A gRPC server object
        std::unique_ptr<grpc::Server> server_;

        void stop() {
            LOG_INFO << "Shutting down ";
            server_->Shutdown();
            server_->Wait();
        }
    };

    template <typename grpcT>
    struct ClientVars {
        // This is the Queue. It's shared for all the requests.
        ::grpc::CompletionQueue cq_;

        [[nodiscard]] auto * cq() noexcept {
            return &cq_;
        }

        // This is a connection to the gRPC server
        std::shared_ptr<grpc::Channel> channel_;

        // An instance of the client that was generated from our .proto file.
        std::unique_ptr<typename grpcT::Stub> stub_;

        void stop() {
            // We don't stop the client...
            assert(false);
        }
    };

That's the generalization of the boilerplate code.

The complete source code.

The final async server implementation

Now, let's look at the actual server implementation of all four rpc calls in Google's "routeguide" proto file example.

    service RouteGuide {
      rpc GetFeature(Point) returns (Feature) {}
      rpc ListFeatures(Rectangle) returns (stream Feature) {}
      rpc RecordRoute(stream Point) returns (RouteSummary) {}
      rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
    }

Before we get obsessed with the implementation details of the four Requests, let's look briefly at the implementation for the outer event-loop class.

    class EverythingSvr
        : public EventLoopBase<ServerVars<::routeguide::RouteGuide>> {
    public:

        EverythingSvr(const Config& config)
            : EventLoopBase(config) {

            grpc::ServerBuilder builder;
            builder.AddListeningPort(config_.address, grpc::InsecureServerCredentials());
            builder.RegisterService(&grpc_.service_);
            grpc_.cq_ = builder.AddCompletionQueue();
            // Finally assemble the server.
            grpc_.server_ = builder.BuildAndStart();

            LOG_INFO
                // Fancy way to print the class-name.
                // Useful when I copy/paste this code around ;)
                << boost::typeindex::type_id_runtime(*this).pretty_name()

                // The useful information
                << " listening on " << config_.address;

            // Prepare the first instances of request handlers
            createNew<GetFeatureRequest>(*this);
            createNew<ListFeaturesRequest>(*this);
            createNew<RecordRouteRequest>(*this);
            createNew<RouteChatRequest>(*this);
        }
    };


As before in the server, we use a builder to set up the required variables and start the gRPC server. At the lowest level, this is an HTTP/2 server listening on the host address/IP and port number we specified.

Then we create one instance of each Request type we need to handle the four rpc requests in our proto file. These will be used to handle the first incoming rpc call of each type. In our implementations below, we will create new instances as required to keep the gRPC service responsive. If we forget that, the client side will see a call "connect" normally - but the Finish event will never be triggered.

GetFeature

    class GetFeatureRequest : public RequestBase {
    public:

        GetFeatureRequest(EverythingSvr& owner)
            : RequestBase(owner) {

            // Register this instance with the event-queue and the service.
            // The first event received over the queue is that we have a request.
            owner_.grpc().service_.RequestGetFeature(&ctx_, &req_, &resp_, cq(), cq(),
                op_handle_.tag(Handle::Operation::CONNECT,
                [this, &owner](bool ok, Handle::Operation /* op */) {

                    LOG_DEBUG << me(*this) << " - Processing a new connect from " << ctx_.peer();

                    if (!ok) [[unlikely]] {
                        // The operation failed.
                        // Let's end it here.
                        LOG_WARN << "The request-operation failed. Assuming we are shutting down";
                        return;
                    }

                    // Before we do anything else, we must create a new instance of
                    // GetFeatureRequest, so the service can handle a new request from a client.
                    owner_.createNew<GetFeatureRequest>(owner);

                    // This is where we have the request, and may formulate an answer.
                    // If this was code for a framework, this is where we would have called
                    // the `onRpcRequestGetFeature()` method, or unblocked the next statement
                    // in a co-routine waiting for the next request.
                    //
                    // In our case, let's just return something.
                    reply_.set_name("whatever");
                    reply_.mutable_location()->CopyFrom(req_);

                    // Initiate our next async operation.
                    // That will complete when we have sent the reply, or replying failed.
                    resp_.Finish(reply_, ::grpc::Status::OK,
                        op_handle_.tag(Handle::Operation::FINISH,
                        [this](bool ok, Handle::Operation /* op */) {

                            if (!ok) [[unlikely]] {
                                LOG_WARN << "The finish-operation failed.";
                            }

                    })); // FINISH operation lambda
                })); // CONNECT operation lambda
        }

    private:
        Handle op_handle_{*this}; // We need only one handle for this operation.

        ::grpc::ServerContext ctx_;
        ::routeguide::Point req_;
        ::routeguide::Feature reply_;
        ::grpc::ServerAsyncResponseWriter<decltype(reply_)> resp_{&ctx_};
    };

GetFeatureRequest was never very complicated. What we have done here is to remove the proceed() override with its switch on the current operation, and instead pass a lambda expression as an argument to the handle's tag() method. Personally I find this code easier to read. It's still not a coroutine, but the flow of logic is presented in a way that makes it simple to read and comprehend.

ListFeaturesRequest

Remember, in ListFeatures we return a stream of messages. Here we use lambdas for the logic that is unique to a specific operation, and reply() to handle the shared reply logic, both from the connect event and from the completed write event.

    class ListFeaturesRequest : public RequestBase {
    public:

        ListFeaturesRequest(EverythingSvr& owner)
            : RequestBase(owner) {

            owner_.grpc().service_.RequestListFeatures(&ctx_, &req_, &resp_, cq(), cq(),
                op_handle_.tag(Handle::Operation::CONNECT,
                [this, &owner](bool ok, Handle::Operation /* op */) {

                    LOG_DEBUG << me(*this) << " - Processing a new connect from " << ctx_.peer();

                    if (!ok) [[unlikely]] {
                        // The operation failed.
                        // Let's end it here.
                        LOG_WARN << "The request-operation failed. Assuming we are shutting down";
                        return;
                    }

                    // Before we do anything else, we must create a new instance
                    // so the service can handle a new request from a client.
                    owner_.createNew<ListFeaturesRequest>(owner);

                    reply();
            }));
        }

    private:
        void reply() {
            if (++replies_ > owner_.config().num_stream_messages) {
                // We have reached the desired number of replies

                resp_.Finish(::grpc::Status::OK,
                    op_handle_.tag(Handle::Operation::FINISH,
                    [this](bool ok, Handle::Operation /* op */) {
                        if (!ok) [[unlikely]] {
                            // The operation failed.
                            LOG_WARN << "The finish-operation failed.";
                        }
                }));

                return;
            }

            // This is where we have the request, and may formulate another answer.
            // If this was code for a framework, this is where we would have called
            // the `onRpcRequestListFeaturesOnceAgain()` method, or unblocked the next statement
            // in a co-routine awaiting the next state-change.
            //
            // In our case, let's just return something.


            // Prepare the reply-object to be re-used.
            // This is usually cheaper than creating a new one for each write operation.
            reply_.Clear();

            // Since it's a stream, it makes sense to return different data for each message.
            reply_.set_name(std::string{"stream-reply #"} + std::to_string(replies_));

            resp_.Write(reply_, op_handle_.tag(Handle::Operation::WRITE,
                [this](bool ok, Handle::Operation /* op */) {
                    if (!ok) [[unlikely]] {
                        // The operation failed.
                        LOG_WARN << "The reply-operation failed.";
                        return;
                    }

                    reply();
            }));
        }

        Handle op_handle_{*this}; // We need only one handle for this operation.
        size_t replies_ = 0;

        ::grpc::ServerContext ctx_;
        ::routeguide::Rectangle req_;
        ::routeguide::Feature reply_;
        ::grpc::ServerAsyncWriter<decltype(reply_)> resp_{&ctx_};
    };


This was a little more code than the previous one, but if you remove the comments, it's not many lines.

RecordRouteRequest

Now we will get a stream of messages, and reply when the stream has dried out.

    class RecordRouteRequest : public RequestBase {
    public:

        RecordRouteRequest(EverythingSvr& owner)
            : RequestBase(owner) {

            owner_.grpc().service_.RequestRecordRoute(&ctx_, &io_, cq(), cq(),
                op_handle_.tag(Handle::Operation::CONNECT,
                    [this, &owner](bool ok, Handle::Operation /* op */) {

                        LOG_DEBUG << me(*this) << " - Processing a new connect from " << ctx_.peer();

                        if (!ok) [[unlikely]] {
                            // The operation failed.
                            // Let's end it here.
                            LOG_WARN << "The request-operation failed. Assuming we are shutting down";
                            return;
                        }

                        // Before we do anything else, we must create a new instance
                        // so the service can handle a new request from a client.
                        owner_.createNew<RecordRouteRequest>(owner);

                        read(true);
                    }));
        }

    private:
        void read(const bool first) {

            if (!first) {
                // This is where we have read a message from the request.
                // If this was code for a framework, this is where we would have called
                // the `onRpcRequestRecordRouteGotMessage()` method, or unblocked the next statement
                // in a co-routine awaiting the next state-change.
                //
                // In our case, let's just log it.
                LOG_TRACE << "Got message: longitude=" << req_.longitude()
                          << ", latitude=" << req_.latitude();


                // Reset the req_ message. This is cheaper than allocating a new one for each read.
                req_.Clear();
            }

            io_.Read(&req_, op_handle_.tag(Handle::Operation::READ,
                [this](bool ok, Handle::Operation /* op */) {
                    if (!ok) [[unlikely]] {
                        // The operation failed.
                        // This is normal on an incoming stream, when there are no more messages.
                        // As far as I know, there is no way at this point to deduce if the false status is
                        // because the client is done sending messages, or because we encountered
                        // an error.
                        LOG_TRACE << "The read-operation failed. It's probably not an error :)";

                        // Initiate the finish operation

                        // This is where we have received the request, with all its parts,
                        // and may formulate another answer.
                        // If this was code for a framework, this is where we would have called
                        // the `onRpcRequestRecordRouteDone()` method, or unblocked the next statement
                        // in a co-routine awaiting the next state-change.
                        //
                        // In our case, let's just return something.

                        reply_.set_distance(100);
                        reply_.set_distance(300);
                        io_.Finish(reply_, ::grpc::Status::OK, op_handle_.tag(
                            Handle::Operation::FINISH,
                            [this](bool ok, Handle::Operation /* op */) {

                                if (!ok) [[unlikely]] {
                                    LOG_WARN << "The finish-operation failed.";
                                }

                                // We are done
                        }));
                        return;
                    } // ok != false

                    read(false);
            }));

        }

        Handle op_handle_{*this}; // We need only one handle for this operation.

        ::grpc::ServerContext ctx_;
        ::routeguide::Point req_;
        ::routeguide::RouteSummary reply_;
        ::grpc::ServerAsyncReader<decltype(reply_), decltype(req_)> io_{&ctx_};
    };


The logic here is similar to ListFeaturesRequest. Instead of reply() we have read(), and to re-use as much code as possible, we have a bool flag telling us if this is the first call to read() - the one from the connect event. If it's not the first, then we have a read message to deal with before we start a new async Read operation on the stream.

Now, we will deal with the most complex rpc request type that gRPC supports, the bidirectional stream.

RouteChatRequest

In the proto-file we have specified that both the request and the reply are streams.

    rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

We have no way to specify the actual protocol initiation. Some servers may want an incoming message before they send an outgoing message. Others may not start reading on the stream before they have succeeded in sending a message. Some protocols will do tic-tac, where both the server and the client respond to a message by sending a message. The messages in the stream can be strictly ordered, or they can be unrelated and flow in any direction when one side has something to send. The protocol in your implementation is entirely up to you. Just remember that you can only send one message at any given time, and only have one Read operation active at any time. If you need to send a burst of messages, you must provide your own queue for the pending messages, and hand them to gRPC one by one, as you get confirmation of the previous message's successful departure. A sketch of such an outgoing queue follows below.
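
Here is a minimal sketch of such a queue. It is not part of the article's code; it borrows the member names (reply_, stream_, out_handle_, done_writing_, finishIfDone()) from the RouteChatRequest implementation below, and adds two hypothetical members of its own.

    // Hypothetical outgoing-message queue: only one Write is in flight at a time.
    // Requires <queue>. The other members are from RouteChatRequest below.
    std::queue<::routeguide::RouteNote> pending_;
    bool write_in_flight_ = false;

    void sendMessage(::routeguide::RouteNote msg) {
        pending_.push(std::move(msg));
        writeNextIfIdle();
    }

    void writeNextIfIdle() {
        if (write_in_flight_ || pending_.empty()) {
            return;
        }

        write_in_flight_ = true;
        reply_ = std::move(pending_.front());
        pending_.pop();

        // The message handed to gRPC (reply_) must stay alive until the
        // write-completion event arrives.
        stream_.Write(reply_, out_handle_.tag(Handle::Operation::WRITE,
            [this](bool ok, Handle::Operation /* op */) {
                write_in_flight_ = false;
                if (!ok) {
                    // The stream is broken; we cannot write any more messages.
                    done_writing_ = true;
                    return finishIfDone();
                }
                writeNextIfIdle();  // Hand the next queued message to gRPC, if any.
            }));
    }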

Our RouteChat implementation will start sending and start reading as soon as a request is active. It says Chat in the name! Nobody expects anyone in a "Chat" on the Internet to be polite and obey any kind of "protocol" ;) So we will yell at the other party until we grow tired. We will read the incoming messages and discard them as they arrive, until the other end gets tired. When both parties are done, we get to say the final bits. We are the server implementation, so we define the Status for the conversation.

    class RouteChatRequest : public RequestBase {
    public:

        RouteChatRequest(EverythingSvr& owner)
            : RequestBase(owner) {

            owner_.grpc().service_.RequestRouteChat(&ctx_, &stream_, cq(), cq(),
                in_handle_.tag(Handle::Operation::CONNECT,
                    [this, &owner](bool ok, Handle::Operation /* op */) {

                        LOG_DEBUG << me(*this) << " - Processing a new connect from " << ctx_.peer();

                        if (!ok) [[unlikely]] {
                            // The operation failed.
                            // Let's end it here.
                            LOG_WARN << "The request-operation failed. Assuming we are shutting down";
                            return;
                        }

                        // Before we do anything else, we must create a new instance
                        // so the service can handle a new request from a client.
                        owner_.createNew<RouteChatRequest>(owner);

                        read(true);   // Initiate the read for the first incoming message
                        write(true);  // Initiate the first write operation.
            }));
        }

    private:
        void read(const bool first) {
            if (!first) {
                // This is where we have read a message from the stream.
                // If this was code for a framework, this is where we would have called
                // the `onRpcRequestRouteChatGotMessage()` method, or unblocked the next statement
                // in a co-routine awaiting the next state-change.
                //
                // In our case, let's just log it.

                LOG_TRACE << "Incoming message: " << req_.message();

                req_.Clear();
            }

            // Start new read
            stream_.Read(&req_, in_handle_.tag(
                Handle::Operation::READ,
                [this](bool ok, Handle::Operation /* op */) {
                    if (!ok) [[unlikely]] {
                        // The operation failed.
                        // This is normal on an incoming stream, when there are no more messages.
                        // As far as I know, there is no way at this point to deduce if the false status is
                        // because the client is done sending messages, or because we encountered
                        // an error.
                        LOG_TRACE << "The read-operation failed. It's probably not an error :)";

                        done_reading_ = true;
                        return finishIfDone();
                    }

                    read(false);  // Initiate the read for the next incoming message
            }));
        }

        void write(const bool first) {
            if (!first) {
                reply_.Clear();
            }

            if (++replies_ > owner_.config().num_stream_messages) {
                done_writing_ = true;

                LOG_TRACE << me(*this) << " - We are done writing to the stream.";
                return finishIfDone();
            }

            // This is where we are ready to write a new message.
            // If this was code for a framework, this is where we would have called
            // the `onRpcRequestRouteChatReadytoSendNewMessage()` method, or unblocked
            // the next statement in a co-routine awaiting the next state-change.

            reply_.set_message(std::string{"Server Message #"} + std::to_string(replies_));

            // Start new write
            stream_.Write(reply_, out_handle_.tag(
                Handle::Operation::WRITE,
                [this](bool ok, Handle::Operation /* op */) {
                    if (!ok) [[unlikely]] {
                        // The operation failed.
                        LOG_WARN << "The write-operation failed.";

                        // When ok is false here, we will not be able to write
                        // anything on this stream.
                        done_writing_ = true;
                        return finishIfDone();
                    }

                    write(false);  // Initiate the next write or finish
                }));
        }

        // We wait until all incoming messages are received and all outgoing messages are sent
        // before we send the finish message.
        void finishIfDone() {
            if (!sent_finish_ && done_reading_ && done_writing_) {
                LOG_TRACE << me(*this) << " - We are done reading and writing. Sending finish!";

                stream_.Finish(grpc::Status::OK, out_handle_.tag(
                    Handle::Operation::FINISH,
                    [this](bool ok, Handle::Operation /* op */) {

                        if (!ok) [[unlikely]] {
                            LOG_WARN << "The finish-operation failed.";
                        }

                        LOG_TRACE << me(*this) << " - We are done";
                }));
                sent_finish_ = true;
                return;
            }
        }

        bool done_reading_ = false;
        bool done_writing_ = false;
        bool sent_finish_ = false;
        size_t replies_ = 0;

        // We are streaming messages in and out simultaneously, so we need two handles.
        // One for each direction.
        Handle in_handle_{*this};
        Handle out_handle_{*this};

        ::grpc::ServerContext ctx_;
        ::routeguide::RouteNote req_;
        ::routeguide::RouteNote reply_;

        // Interestingly, the class is named `*ReaderWriter`, while the template
        // argument order is first the Writer type and then the Reader type.
        // Lots of room for false assumptions and subtle errors here ;)
        ::grpc::ServerAsyncReaderWriter<decltype(reply_), decltype(req_)> stream_{&ctx_};
    };


As you see, here we basically combine the logic in the last two examples above. Then we add some extra logic in finishIfDone() to allow both directions of the stream to finish before we set the final status for the rpc call and end it.

The complete source code.