Published:

Jarle Aase

Implementing the full routeguide async client

bookmark 5 min read

Now, let's re-use the abstractions we created for the server to implement the final client using the gRPC async interface (or "stub" to be more accurate).

Before we look at the request implementation, lets take brief look at the client override for the event-loop.

    class EverythingClient
    : public EventLoopBase<ClientVars<::routeguide::RouteGuide>> {
    public:

        ...
        EverythingClient(const Config& config)
            : EventLoopBase(config) {


            LOG_INFO << "Connecting to gRPC service at: " << config.address;
            grpc_.channel_ = grpc::CreateChannel(config.address, grpc::InsecureChannelCredentials());

            grpc_.stub_ = ::routeguide::RouteGuide::NewStub(grpc_.channel_);
            assert(grpc_.stub_);

            // Add request(s)
            LOG_DEBUG << "Creating " << config_.parallel_requests
                    << " initial request(s) of type " << config_.request_type;

            for(auto i = 0; i < config_.parallel_requests;  ++i) {
                nextRequest();
            }
        }

    private:
        size_t request_count_{0};

In the constructor we set up the connection to the gRPC server, we create in instance of the "stub" the code generator created from our proto file and finally we call nextRequest() to initialize the first batch of outgoing requests. I have omitted the code for request-creation here, as it's irrelevant for the much more interesting gRPC Request code. The complete source code, including the test-client and test-server that consume and allow us to execute all the code we have been trough, is available on github.

GetFeature

Let's start with GetFeature as before.

    class GetFeatureRequest : public RequestBase {
    public:

        GetFeatureRequest(EverythingClient& owner)
            : RequestBase(owner) {

            // Initiate the async request.
            rpc_ = owner.grpc().stub_->AsyncGetFeature(&ctx_, req_, cq());

            // Add the operation to the queue. We will be notified when
            // the request is completed.
            rpc_->Finish(&reply_, &status_, handle_.tag(
                Handle::Operation::FINISH,
                [this, &owner](bool ok, Handle::Operation /* op */) {

                    if (!ok) [[unlikely]] {
                    LOG_WARN << me(*this) << " - The request failed.";
                        return;
                    }

                    if (!status_.ok()) {
                        LOG_WARN << me(*this) << " - The request failed with error-message: "
                                 << status_.error_message();
                    }
                }));
        }

    private:
        Handle handle_{*this};

        // We need quite a few variables to perform our single RPC call.
        ::grpc::ClientContext ctx_;
        ::routeguide::Point req_;
        ::routeguide::Feature reply_;
        ::grpc::Status status_;
        std::unique_ptr< ::grpc::ClientAsyncResponseReader<decltype(reply_)>> rpc_;

    }; // GetFeatureRequest

It's quite simple, with the bulk of the code in one lamda function dealing with the Finish event.

There is still a little to much boilerplate code to declare the required variables and initiation. We could add another layer of abstraction by creating a Request template for the unary rpc request type. However, the gRPC code generator gives us little help to achieve that. It would have been nice if it gave us typedefs for the three variable types for req_, reply_ and rpc_. We could probably deduce the types from the initiator and finish method using some insane template meta-programming hacks, but I'm not going down that rabbit hole today. It would have been so much easier if the code generator just added the using statements for us :/

Let's continue with ListFeatures.

ListFeatures

We follow the same pattern as we did in the servers stream methods. We put the code to deal with an event-type in the lambda for that event. We shuffle the logic shared between connect and read in the read() method.

In the constructor, we initiate both a connect/request operation and a Finish operation. Hence, we have two Handle variables.

    class ListFeaturesRequest : public RequestBase {
    public:

        ListFeaturesRequest(EverythingClient& owner)
            : RequestBase(owner) {

            // Initiate the async request.
            rpc_ = owner.grpc().stub_->AsyncListFeatures(&ctx_, req_, cq(), op_handle_.tag(
                Handle::Operation::CONNECT,
                [this](bool ok, Handle::Operation /* op */) {
                    if (!ok) [[unlikely]] {
                        LOG_WARN << me(*this) << " - The request failed (connect).";
                        return;
                    }

                    read(true);
            }));

            rpc_->Finish(&status_, finish_handle_.tag(
                Handle::Operation::FINISH,
                [this](bool ok, Handle::Operation /* op */) mutable {
                    if (!ok) [[unlikely]] {
                        LOG_WARN << me(*this) << " - The request failed (connect).";
                        return;
                    }

                    if (!status_.ok()) {
                        LOG_WARN << me(*this) << " - The request finished with error-message: "
                                 << status_.error_message();
                    }
            }));
        }

    private:
        void read(const bool first) {

            if (!first) {
                // This is where we have an actual message from the server.
                // If this was a framework, this is where we would have called
                // `onListFeatureReceivedOneMessage()` or or unblocked the next statement
                // in a co-routine waiting for the next request

                // In our case, let's just log it.
                LOG_TRACE << me(*this) << " - Request successful. Message: " << reply_.name();

                // Prepare the reply-object to be re-used.
                // This is usually cheaper than creating a new one for each read operation.
                reply_.Clear();
            }

            // Now, lets register another read operation
            rpc_->Read(&reply_, op_handle_.tag(
                                    Handle::Operation::READ,
                [this](bool ok, Handle::Operation /* op */) {
                    if (!ok) [[unlikely]] {
                        LOG_TRACE << me(*this) << " - The read-request failed.";
                        return;
                    }

                    read(false);
                }));
        }

        Handle op_handle_{*this};
        Handle finish_handle_{*this};

        ::grpc::ClientContext ctx_;
        ::routeguide::Rectangle req_;
        ::routeguide::Feature reply_;
        ::grpc::Status status_;
        std::unique_ptr< ::grpc::ClientAsyncReader< decltype(reply_)>> rpc_;
    }; // ListFeaturesRequest

RecordRouteRequest

This is very similar to the previous example. We are just sending in stead of receiving over the stream.

    class RecordRouteRequest : public RequestBase {
    public:

        RecordRouteRequest(EverythingClient& owner)
            : RequestBase(owner) {

            // Initiate the async request (connect).
            rpc_ = owner.grpc().stub_->AsyncRecordRoute(&ctx_, &reply_, cq(), io_handle_.tag(
                Handle::Operation::CONNECT,
                [this](bool ok, Handle::Operation /* op */) {
                    if (!ok) [[unlikely]] {
                        LOG_WARN << me(*this) << " - The request failed (connect).";
                        return;
                    }

                    // The server will not send anything until we are done writing.
                    // So let's get started.

                    write(true);
               }));

            // Register a handler to be called when the server has sent a reply and final status.
            rpc_->Finish(&status_, finish_handle_.tag(
                Handle::Operation::FINISH,
                [this](bool ok, Handle::Operation /* op */) mutable {
                    if (!ok) [[unlikely]] {
                        LOG_WARN << me(*this) << " - The request failed (connect).";
                        return;
                    }

                    if (!status_.ok()) {
                        LOG_WARN << me(*this) << " - The request finished with error-message: "
                                 << status_.error_message();
                    }
               }));
        }

    private:
        void write(const bool first) {

            if (!first) {
                req_.Clear();
            }

            if (++sent_messages_ > owner_.config().num_stream_messages) {

                LOG_TRACE << me(*this) << " - We are done writing to the stream.";

                rpc_->WritesDone(io_handle_.tag(
                    Handle::Operation::WRITE_DONE,
                    [this](bool ok, Handle::Operation /* op */) {
                        if (!ok) [[unlikely]] {
                            LOG_TRACE << me(*this) << " - The writes-done request failed.";
                            return;
                        }

                        LOG_TRACE << me(*this) << " - We have told the server that we are done writing.";
                    }));

                return;
            }

            // Send some data to the server
            req_.set_latitude(100);
            req_.set_longitude(sent_messages_);

            // Now, lets register another write operation
            rpc_->Write(req_, io_handle_.tag(
                Handle::Operation::WRITE,
                [this](bool ok, Handle::Operation /* op */) {
                    if (!ok) [[unlikely]] {
                        LOG_TRACE << me(*this) << " - The write-request failed.";
                        return;
                    }

                    write(false);
                }));
        }

        Handle io_handle_{*this};
        Handle finish_handle_{*this};
        size_t sent_messages_ = 0;

        ::grpc::ClientContext ctx_;
        ::routeguide::Point req_;
        ::routeguide::RouteSummary reply_;
        ::grpc::Status status_;
        std::unique_ptr<  ::grpc::ClientAsyncWriter< ::routeguide::Point>> rpc_;
    }; // RecordRouteRequest

The final example is the bidirectional stream. Like in the server, we implement a Real Internet Chat (tm), where we just yell at the receiver, until we have yelled what was on our mind. Then we finish and wait for the server to say its final bits (the Status). Simultaneously, we read the messages from the server and discard them (like Real Internet Discussion Participants) until they have the decency to shut up.

class RouteChatRequest : public RequestBase {
public:

    RouteChatRequest(EverythingClient& owner)
        : RequestBase(owner) {

        // Initiate the async request.
        rpc_ = owner.grpc().stub_->AsyncRouteChat(&ctx_, cq(), in_handle_.tag(
            Handle::Operation::CONNECT,
            [this](bool ok, Handle::Operation /* op */) {
                if (!ok) [[unlikely]] {
                    LOG_WARN << me(*this) << " - The request failed (connect).";
                    return;
                }

                // We are initiating both reading and writing.
                // Some clients may initiate only a read or a write at this time,
                // depending on the use-case.
                read(true);
                write(true);
            }));

        rpc_->Finish(&status_, finish_handle_.tag(
            Handle::Operation::FINISH,
            [this](bool ok, Handle::Operation /* op */) mutable {
                if (!ok) [[unlikely]] {
                    LOG_WARN << me(*this) << " - The request failed (finish).";
                    return;
                }

                if (!status_.ok()) {
                    LOG_WARN << me(*this) << " - The request finished with error-message: "
                             << status_.error_message();
               }
            }));
    }

private:
    void read(const bool first) {

        if (!first) {
            // This is where we have an actual message from the server.
            // If this was a framework, this is where we would have called
            // `onListFeatureReceivedOneMessage()` or or unblocked the next statement
            // in a co-routine waiting for the next request

            // In our case, let's just log it.
            LOG_TRACE << me(*this) << " - Request successful. Message: " << reply_.message();
            reply_.Clear();
        }

        // Now, lets register another read operation
        rpc_->Read(&reply_, in_handle_.tag(
            Handle::Operation::READ,
            [this](bool ok, Handle::Operation /* op */) {
                if (!ok) [[unlikely]] {
                    LOG_TRACE << me(*this) << " - The read-request failed.";
                    return;
                }

                read(false);
            }));
    }

    void write(const bool first) {

        if (!first) {
            req_.Clear();
        }

        if (++sent_messages_ > owner_.config().num_stream_messages) {

            LOG_TRACE << me(*this) << " - We are done writing to the stream.";

            rpc_->WritesDone(out_handle_.tag(
                Handle::Operation::WRITE_DONE,
                [this](bool ok, Handle::Operation /* op */) {
                    if (!ok) [[unlikely]] {
                        LOG_TRACE << me(*this) << " - The writes-done request failed.";
                        return;
                    }

                    LOG_TRACE << me(*this) << " - We have told the server that we are done writing.";
              }));

            return;
        }

        // Now, lets register another write operation
        rpc_->Write(req_, out_handle_.tag(
            Handle::Operation::WRITE,
            [this](bool ok, Handle::Operation /* op */) {
                if (!ok) [[unlikely]] {
                    LOG_TRACE << me(*this) << " - The write-request failed.";
                    return;
                }

                write(false);
            }));
    }

    Handle in_handle_{*this};
    Handle out_handle_{*this};
    Handle finish_handle_{*this};
    size_t sent_messages_ = 0;

    ::grpc::ClientContext ctx_;
    ::routeguide::RouteNote req_;
    ::routeguide::RouteNote reply_;
    ::grpc::Status status_;
    std::unique_ptr<  ::grpc::ClientAsyncReaderWriter< ::routeguide::RouteNote, ::routeguide::RouteNote>> rpc_;
};

The code is similar to the server implementation, except that we don't get the final bits to say ;)

The complete source code.

That concludes our walk-trough of how to use the gRPC async interfaces/stub.

The next (planned) articles will look at the callback interface to gRPC.