Jarle Aase

Implementing an async client with one message and one stream.


So far, our code has dealt with only one async operation in flight, and the tag has been a pointer to an instance of our request class.

When we deal with incoming streams in the client, we would like to keep two async requests in flight: one to connect, and then to perform the next read or write operation from/to the stream (depending on the stream's direction), and one to get the final reply and status from the server. That means that we need at least two distinct tag addresses. We could be creative and use this for one of the tags, and for example const auto tag = reinterpret_cast<void *>(reinterpret_cast<uint64_t>(this) + 1); for the other. Since this is returned from a normal memory allocation, we can be quite confident that it is aligned on a 4 or 8 byte boundary (depending on the target binary type, 32 or 64 bit). Therefore, we could look at the tag and see if we need to round it down 1 byte to reach an aligned address. This would probably work, but it would be a typical example of premature optimization. A safer approach, in my opinion, is to use an intermediate variable, which in turn holds a pointer or reference to the Request object.
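For illustration only, below is a minimal sketch of how such a bit-stuffed tag could be decoded in the event-loop. The Request type and its proceed(bool) method are hypothetical stand-ins, and this is the trick I just argued against; the rest of the article uses the intermediate-variable approach.

#include <cstdint>

struct Request {
    // Hypothetical state-machine; `second` tells it which of the two
    // pending operations the event belongs to.
    void proceed(bool second); // State-machine, defined elsewhere.
};

// Decode a tag where the lowest bit was "borrowed" to distinguish two
// operations on the same Request. This relies on heap allocations being
// at least 2-byte aligned, so bit 0 of a real pointer is always 0.
void dispatch(void *tag) {
    const auto addr = reinterpret_cast<std::uintptr_t>(tag);
    const bool second = (addr & 1) != 0;          // Was 1 added to `this`?
    auto *request = reinterpret_cast<Request *>(addr & ~std::uintptr_t{1}); // Round down.
    request->proceed(second);
}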

Below is a simplified example of the intermediate-variable idea.

// Request object
struct Request {

    // intermediate type
    struct Tag {
        Request& request_;

        // Just relay the event to the Request instance
        void proceed() {
            request_.proceed();
        }
    };

    // State-machine for the request
    void proceed();

    // Two tags with different addresses
    Tag first_{*this};
    Tag second_{*this};
};

...
// Address of first_ is used as the "tag".
StartAsyncSomething(cq_, &first_);

// Address of second_ is used as the "tag".
StartAsyncSomethingElse(cq_, &second_);

...

// event-loop
while(true) {
    void *tag = {};
    bool ok = false;

    // Block until the next async operation completes.
    cq_.Next(&tag, &ok);

    // Call proceed() on the intermediate type.
    static_cast<Request::Tag *>(tag)->proceed();
}

...

// Our queue
grpc::CompletionQueue cq_;

Request Base class

In order to deal with the complexity of more than one request type, and more than one tag for each request instance, we start the client implementation by creating a base-class for the request handler.

We keep the state that is shared by all the request types in the base class. In our case, we also keep a reference to the "parent" class, so we can get access to its state.

The most interesting declaration is the pure virtual method proceed(). As you may notice, it has an argument in addition to ok. Since we intend to have multiple async operations in flight, and these can complete in seemingly random order (at least, the order in which we get the result from Read() and Finish() appears random to me), it makes sense to add the type of operation that proceed() should deal with.

    /*! Base class for requests
     *
     *  In order to use `this` as a tag and avoid any special processing in the
     *  event-loop, the simplest approach in C++ is to let the request implementations
     *  inherit from a base-class that contains the shared code they all need, and
     *  a pure virtual method for the state-machine.
     */
    class RequestBase {
    public:

        RequestBase(UnaryAndSingleStreamClient& parent)
            : parent_{parent} {
            LOG_TRACE << "Constructed request #" << client_id_ << " at address" << this;
        }

        virtual ~RequestBase() = default;

        // The state-machine
        virtual void proceed(bool ok, Handle::Operation op) = 0;

    protected:
        // The state required for all requests
        UnaryAndSingleStreamClient& parent_;
        int ref_cnt_ = 0;
        ::grpc::ClientContext ctx_;

    private:
        void done() {
            // Ugly, ugly, ugly
            LOG_TRACE << "If the program crash now, it was a bad idea to delete this ;)  #"
                      << client_id_ << " at address " << this;

            // Reference-counting of instances of requests in flight
            parent_.decCounter();
            delete this;
        }
    };

To this base-class, we add a Handle type that deals with the unique tags. All the async operations we have initiated will hit our proceed() method, so it makes sense to use reference counting to decide when to delete the request object.

I have added Handle as a sub-class to RequestBase. I think of them as one entity. That means that Handle can access protected and private variables and methods in RequestBase. This is a pattern that works well to solve some problems (for example containers and their iterators). It's not something I recommend as a general approach. In this case, I think it's effective.

    /*! Tag
     *
     *  In order to allow tags for multiple async operations simultaneously,
     *  we use this "Handle". It points to the request owning the
     *  operation, and it is associated with a type of operation.
     */
    class Handle {
    public:
        enum Operation {
            CONNECT,
            READ,
            WRITE,
            WRITE_DONE,
            FINISH
        };

        Handle(RequestBase& instance, Operation op)
            : instance_{instance}, op_{op} {}

        /*! Return a tag for an async operation.
         *
         *  Note that we use this method for reference-counting
         *  the pending async operations, so it cannot be called
         *  for other purposes!
         */
        [[nodiscard]] void *tag() {
            ++instance_.ref_cnt_;
            return this;
        }

        void proceed(bool ok) {
            --instance_.ref_cnt_;

            instance_.proceed(ok, op_);

            if (instance_.ref_cnt_ == 0) {
                instance_.done();
            }
        }

    private:
        RequestBase& instance_;
        const Operation op_;
    };

Finally, since we have abstracted away the new complexity, the client class and its event-loop remain simple.

For clarity, I have removed a few lines of code that deal mostly with creating new request instances (a hedged sketch of what they might look like follows after the class).

class UnaryAndSingleStreamClient {
public:


    class RequestBase {
        class Handle {...};
        ...
    };

    UnaryAndSingleStreamClient(const Config& config)
        : config_{config} {}

    // Run the event-loop.
    // Returns when there are no more requests to send
    void run() {

        LOG_INFO << "Connecting to gRPC service at: " << config_.address;
        channel_ = grpc::CreateChannel(config_.address, grpc::InsecureChannelCredentials());

        stub_ = ::routeguide::RouteGuide::NewStub(channel_);

        while(pending_requests_) {
            // FIXME: This is crazy. Figure out how to use stable clock!
            const auto deadline = std::chrono::system_clock::now()
                                + std::chrono::milliseconds(500);

            // Get any IO operation that is ready.
            void * tag = {};
            bool ok = true;

            // Wait for the next event to complete in the queue
            const auto status = cq_.AsyncNext(&tag, &ok, deadline);

            // So, here we deal with the first of the three states: The status of Next().
            switch(status) {
            case grpc::CompletionQueue::NextStatus::TIMEOUT:
                LOG_TRACE << "AsyncNext() timed out.";
                continue;

            case grpc::CompletionQueue::NextStatus::GOT_EVENT:
                // Use a scope to allow a new variable inside a case statement.
                {
                    auto handle = static_cast<RequestBase::Handle *>(tag);

                    // Now, let the relevant state-machine deal with the event.
                    // We could have done it here, but that code would smell **really** bad!
                    handle->proceed(ok);
                }
                break;

            case grpc::CompletionQueue::NextStatus::SHUTDOWN:
                LOG_INFO << "SHUTDOWN. Tearing down the gRPC connection(s).";
                return;
            } // switch
        } // event-loop

        LOG_DEBUG << "exiting event-loop";
        close();
    }

    void incCounter() {
        ++pending_requests_;
    }

    void decCounter() {
        --pending_requests_;
    }

private:
    // This is the Queue. It's shared for all the requests.
    ::grpc::CompletionQueue cq_;

    // This is a connection to the gRPC server
    std::shared_ptr<grpc::Channel> channel_;

    // An instance of the client that was generated from our .proto file.
    std::unique_ptr<::routeguide::RouteGuide::Stub> stub_;

    size_t pending_requests_{0};
    const Config config_;
};

The event-loop above can deal with any number of request types and request instances. It's totally generic, as long as every tag is a pointer to a RequestBase::Handle.
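The request-creation code I removed could look roughly like the sketch below. nextRequest() is referenced by GetFeatureRequest in the next section, but its body and the requests_left_ member here are my assumptions, not the original code; what does follow from the code above is that a request must be allocated with new, since it deletes itself when its last pending operation completes.

    // Hypothetical sketch, not the original code: start another request if we
    // still have quota left. The request object registers its first tags with
    // the completion-queue in its constructor, bumps `pending_requests_` via
    // incCounter(), and deletes itself (RequestBase::done()) when its last
    // pending operation has completed.
    void nextRequest() {
        if (requests_left_ > 0) {
            --requests_left_;
            new GetFeatureRequest(*this); // Owns itself; freed in done().
        }
    }

    size_t requests_left_{1}; // Assumed member, only for this sketch.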

GetFeature

Now let's take a look at how the implementation of our familiar GetFeature request looks.

    /*! Implementation for the `GetFeature()` RPC request.
     */
    class GetFeatureRequest : public RequestBase {
    public:
        GetFeatureRequest(UnaryAndSingleStreamClient& parent)
            : RequestBase(parent) {

            // Initiate the async request.
            rpc_ = parent_.stub_->AsyncGetFeature(&ctx_, req_, &parent_.cq_);
            assert(rpc_);

            // Add the operation to the queue, so we get notified when
            // the request is completed.
            // Note that we use our handle's `this` as the tag. We don't really need the
            // handle in this unary call, but the event-loop expects all tags to be
            // Handle pointers, so it can deal with the other request classes as well.
            rpc_->Finish(&reply_, &status_, handle_.tag());

            // Reference-counting of instances of requests in flight
            parent.incCounter();
        }

        void proceed(bool ok, Handle::Operation /*op */) override {
            if (!ok) [[unlikely]] {
                LOG_WARN << boost::typeindex::type_id_runtime(*this).pretty_name()
                         << " - The request failed. Status: " << status_.error_message();
                return;
            }

            if (status_.ok()) {
                // Initiate a new request
                parent_.nextRequest();
            } else {
                LOG_WARN << boost::typeindex::type_id_runtime(*this).pretty_name()
                         << " - The request failed with error-message: " << status_.error_message();
            }

            // The reply is a single message, so at this time we are done.
        }

    private:
        Handle handle_{*this, Handle::Operation::FINISH};

        // We need quite a few variables to perform our single RPC call.
        ::routeguide::Point req_;
        ::routeguide::Feature reply_;
        ::grpc::Status status_;
        std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::routeguide::Feature>> rpc_;
    };

The handle variable, Handle handle_{*this, Handle::Operation::FINISH}; contains a reference to the request handler, and the only operation we need, Handle::Operation::FINISH. Note that the operation enum is ours, and has no meaning to gRPC.

Since the handle uses reference-counting to delete the request instance, it will always be deleted just after GetFeatureRequest::proceed() returns.

ListFeatures

So far, the functionality is exactly as it was in the original client, although we have more abstractions and our new shiny Handle ;)

When we use gRPC streams, things get more interesting.

While GetFeature() only ever issues one async operation, ListFeatures() takes a single argument as the request and allows the server to send any number of messages in a stream as the reply - in addition to the grpc::Status when it is done.

Let's recap the proto-definition for ListFeatures().

    rpc ListFeatures(Rectangle) returns (stream Feature) {}

A normal work-flow for this client, if the stream has two incoming messages, will go through these states: a successful connect, a successful read for each of the two messages, a failed read (FAILED_READ) when the stream is exhausted, and finally a finish that delivers the server's status.

Note that the FAILED_READ is not an error in this case. If the protocol allows a variable number of messages in the stream, you will typically just start a new read operation for each successful read completion, until the read operation fails. The grpc::Status returned when the finish operation completes can tell you if there was an actual error.

The ordering of the last two events, the failed read and the finish, appears random. Since we use reference-counting to decide when the request object has handled all its pending operations, we don't need to care about the order.

Let's take a look at the implementation.

/*! Implementation for the `ListFeatures()` RPC request.
 */
class ListFeaturesRequest : public RequestBase {
public:
    // Now we are implementing an actual, trivial state-machine, as
    // we will read an unknown number of messages.

    ListFeaturesRequest(UnaryAndSingleStreamClient& parent)
        : RequestBase(parent) {

        // Initiate the async request.
        // Note that this time, we have to supply the tag to the gRPC initiation method.
        // That's because we will get an event that the request is in progress
        // before we should (can?) start reading the replies.
        rpc_ = parent_.stub_->AsyncListFeatures(&ctx_, req_, &parent_.cq_, connect_handle.tag());

        // Also register a Finish handler, so we know when we are
        // done or failed. This is where we get the server's status when we deal with
        // streams.
        rpc_->Finish(&status_, finish_handle.tag());
    }

    ...
private:
    // We need quite a few variables to perform our single RPC call.

    Handle connect_handle   {*this, Handle::Operation::CONNECT};
    Handle read_handle      {*this, Handle::Operation::READ};
    Handle finish_handle    {*this, Handle::Operation::FINISH};

    ::routeguide::Rectangle req_;
    ::routeguide::Feature reply_;
    ::grpc::Status status_;
    std::unique_ptr< ::grpc::ClientAsyncReader< ::routeguide::Feature>> rpc_;
};

Note that this request's initiating method, AsyncListFeatures(), takes a tag. That's because we will be reading from a stream, and we can't start reading until we are "connected", that is, until we have seen that tag in our proceed() method with ok == true.

In addition to starting the connect with a tag from our connect_handle variable, we also register a handler for Finish(). That means that if we run into an error during connect or read, we should get a value in status_ that we can examine when that tag is handed to proceed().

The state-machine has to deal with any one of the three events we may get (depending on the success of the connect).

        // As promised, the state-machine gets more complex when we have
        // streams. In this case, we have three states to deal with on each invocation:
        // 1) The state of the instance - how many async operations have we started?
        //    This is handled by reference-counting, so we don't have to deal with it in
        //    the loop. This greatly reduces the code below.
        // 2) The operation
        // 3) The ok boolean value.
        void proceed(bool ok, Handle::Operation op) override {

            switch(op) {

            case Handle::Operation::CONNECT:
                if (!ok) [[unlikely]] {
                    LOG_WARN << me() << " - The request failed.";
                    return;
                }

                // Now, register a read operation.
                rpc_->Read(&reply_, read_handle.tag());
                break;

            case Handle::Operation::READ:
                if (!ok) [[unlikely]] {
                    LOG_TRACE << me() << " - Failed to read a message.";
                    return;
                }

                // This is where we have an actual message from the server.
                // If this was a framework, this is where we would have called
                // `onListFeatureReceivedOneMessage()` or unblocked the next statement
                // in a co-routine waiting for the next request.

                // In our case, let's just log it.
                LOG_TRACE << me() << " - Request successful. Message: " << reply_.name();


                // Prepare the reply-object to be re-used.
                // This is usually cheaper than creating a new one for each read operation.
                reply_.Clear();

                // Now, lets register another read operation
                rpc_->Read(&reply_, read_handle.tag());
                break;

            case Handle::Operation::FINISH:
                if (!ok) [[unlikely]] {
                    LOG_WARN << me() << " - Failed to FINISH! Status: " << status_.error_message();
                    return;
                }

                if (!status_.ok()) {
                    LOG_WARN << me() << " - The request finished with error-message: " << status_.error_message();
                }
                break;

            default:
                LOG_ERROR << me()
                          << " - Unexpected operation in state-machine: "
                          << static_cast<int>(op);

            } // state
        }

        std::string me() const {
            return boost::typeindex::type_id_runtime(*this).pretty_name()
                   + " #" + std::to_string(client_id_);
        }

RecordRoute

The last request we will deal with in this article is RecordRoute.

    rpc RecordRoute(stream Point) returns (RouteSummary) {}

Let's start with the request class, without its state-machine.

/*! Implementation for the `RecordRoute()` RPC request.
 */
class RecordRouteRequest : public RequestBase {
public:
    // Now we are implementing an actual, trivial state-machine, as
    // we will send a fixed number of messages.

    RecordRouteRequest(UnaryAndSingleStreamClient& parent)
        : RequestBase(parent) {

        // Initiate the async request.
        // Note that this time, we have to supply the tag to the gRPC initiation method.
        // That's because we will get an event that the request is in progress
        // before we should (can?) start writing the requests.
        rpc_ = parent_.stub_->AsyncRecordRoute(&ctx_, &reply_, &parent_.cq_, connect_handle.tag());

        // Initiate a `Finish()` operation so we get the reply-message and a status from the server.
        rpc_->Finish(&status_, finish_handle.tag());
    }

    ...

private:
    // We need quite a few variables to perform our single RPC call.
    size_t sent_messages_ = 0;

    Handle connect_handle   {*this, Handle::Operation::CONNECT};
    Handle write_handle     {*this, Handle::Operation::WRITE};
    Handle write_done_handle{*this, Handle::Operation::WRITE_DONE};
    Handle finish_handle    {*this, Handle::Operation::FINISH};

    ::routeguide::Point req_;
    ::routeguide::RouteSummary reply_;
    ::grpc::Status status_;
    std::unique_ptr< ::grpc::ClientAsyncWriter< ::routeguide::Point>> rpc_;
};

The first thing you might notice is that we now have four handles. Only two of them will be in use at any given time.

Like before, we start two operations in our constructor: connecting to the server with the correct request, and telling gRPC to call our finish tag when the library has received the server's final Status update.

We are giving the reply_ buffer to AsyncRecordRoute(), and the status_ buffer to Finish(). I'm not sure when the reply_ buffer is filled in. My assumption is that it is not safe for use until we are processing the finish event.

The state-machine is the most complex so far.

    void proceed(bool ok, Handle::Operation op) override {

        switch(op) {

        case Handle::Operation::CONNECT:
            if (!ok) [[unlikely]] {
                LOG_WARN << me() << " - The request failed.";
                break;
            }

            // We are ready to send the first message to the server.
            // If this was a framework, this is where we would have called
            // `onRecordRouteReadyToSendFirst()` or unblocked the next statement
            // in a co-routine waiting for the next state
            req_.set_latitude(50);
            req_.set_longitude(sent_messages_);
            rpc_->Write(req_, write_handle.tag());
            break;

        case Handle::Operation::WRITE:
            if (!ok) [[unlikely]] {
                LOG_TRACE << me() << " - Failed to write a message.";
                break;
            }

            // This is where we have sent an actual message to the server.
            // If this was a framework, this is where we would have called
            // `onRecordRouteReadyToSendNext()` or unblocked the next statement
            // in a co-routine waiting for the next state

            if (++sent_messages_ >= parent_.config_.num_stream_messages) {
                LOG_TRACE << me() << " - We are done sending messages.";
                rpc_->WritesDone(write_done_handle.tag());

                // Now we have two pending requests, write done and finish.
                break;
            }

            // Prepare the message-object to be re-used.
            // This is usually cheaper than creating a new one for each write operation.
            req_.Clear();

            req_.set_latitude(100);
            req_.set_longitude(sent_messages_);

            // Now, lets register another write operation
            rpc_->Write(req_, write_handle.tag());
            break;

        case Handle::Operation::WRITE_DONE:
            if (!ok) [[unlikely]] {
                LOG_WARN << me() << " - Failed to notify the server that we are done.";
            }
            break;

        case Handle::Operation::FINISH:
            if (!ok) [[unlikely]] {
                LOG_WARN << me() << " - Failed to FINISH! Status: " << status_.error_message();
                break;
            }

            // This is where we have sent all the messages to the server.
            // If this was a framework, this is where we would have called
            // `onRecordRouteGotReply()` or unblocked the next statement
            // in a co-routine waiting for the next state

            if (!status_.ok()) {
                LOG_WARN << me() << " - The request finished with error-message: " << status_.error_message();
            }
            break;

        default:
            LOG_ERROR << me()
                        << " - Unexpected operation in state-machine: "
                        << static_cast<int>(op);

            assert(false);

        } // state
    }

    std::string me() const {
        return boost::typeindex::type_id_runtime(*this).pretty_name()
                + " #" + std::to_string(client_id_);
    }

The extra state comes from the need to call WritesDone() when we are out of messages to send. This initiates another async operation. At that point we will get one event for WRITE_DONE and one event for FINISH. Then the reference count will be 0, and the instance will be deleted.
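To tie it all together, here is a hedged sketch of how the client could be driven. The main() function and the Config field values are my assumptions; the real wiring is in the complete source referenced below.

// Hypothetical usage sketch, not taken from the complete source.
int main() {
    Config config;
    config.address = "localhost:10123";   // Where the RouteGuide server listens.
    config.num_stream_messages = 5;       // Used by RecordRouteRequest above.

    UnaryAndSingleStreamClient client{config};
    // The (elided) request-creation code must queue the first request(s) here,
    // so that pending_requests_ is non-zero before run() starts waiting.
    client.run();                          // Returns when all requests are done.
}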

The complete source code.