Published:

Jarle Aase

Exploring how to implement the client using gRPC async callbacks


The client interface is different from the server interface. Here we don't override an interface. Instead, we call methods on a stub interface, much like we did in the async version.

The code generated by protoc (with the gRPC C++ plugin) for "callbacks" looks like this:

class async final :
  public StubInterface::async_interface {
 public:
  void GetFeature(::grpc::ClientContext* context, const ::routeguide::Point* request, ::routeguide::Feature* response, std::function<void(::grpc::Status)>) override;
  void GetFeature(::grpc::ClientContext* context, const ::routeguide::Point* request, ::routeguide::Feature* response, ::grpc::ClientUnaryReactor* reactor) override;
  void ListFeatures(::grpc::ClientContext* context, const ::routeguide::Rectangle* request, ::grpc::ClientReadReactor< ::routeguide::Feature>* reactor) override;
  void RecordRoute(::grpc::ClientContext* context, ::routeguide::RouteSummary* response, ::grpc::ClientWriteReactor< ::routeguide::Point>* reactor) override;
  void RouteChat(::grpc::ClientContext* context, ::grpc::ClientBidiReactor< ::routeguide::RouteNote,::routeguide::RouteNote>* reactor) override;
 private:
  friend class Stub;
  explicit async(Stub* stub): stub_(stub) { }
  Stub* stub() { return stub_; }
  Stub* stub_;
};

The unary RPC, GetFeature, has two variants, one taking a std::function<> as an argument. We will use that in our example.

The methods we get will initiate an async request to the server. The various events that happen then are handled by whatever we pass as the last argument. For the unary method where we supply a function, it will be called once when the RPC is complete. That is convenient. The other methods require us to pass a class that overrides the events so we can send/receive data over the stream.

In the code below, I have wrapped these methods in more convenient methods, where we can call the RPC and interact with its events via callbacks. That's a well-known pattern for most C++ developers.

I have also added methods that show how to use the wrapper methods.

Let's start with the initialization, until we have a usable stub_ on which we can call RPCs.

class EverythingCallbackClient {
public:

EverythingCallbackClient(const Config& config)
    : config_{config} {

    LOG_INFO << "Connecting to gRPC service at: " << config.address;
    channel_ = grpc::CreateChannel(config.address, grpc::InsecureChannelCredentials());

    stub_ = ::routeguide::RouteGuide::NewStub(channel_);
    assert(stub_);
}

...

private:
    const Config config_;

    // This is a connection to the gRPC server
    std::shared_ptr<grpc::Channel> channel_;

    // An instance of the client that was generated from our .proto file.
    std::unique_ptr<::routeguide::RouteGuide::Stub> stub_;

GetFeature

Now, let's look at the simplest, unary wrapper method.

We use a class to keep the buffers and state for the RPC. This could have been handled by the caller in this case. However, if you want to call some RPCs from various places in your code, it may become a burden to add shared pointers or other means to own the buffers. It's simpler to hide these details from the caller and deal with them in the wrapper.

/// Callback function with the result of the unary RPC call
using get_feature_cb_t = std::function<void(const grpc::Status& status,
                                            const ::routeguide::Feature& feature)>;

/*! Naive implementation of `GetFeature` with an actual callback.
 */
void getFeature(::routeguide::Point& point, get_feature_cb_t && fn) {

    // In order to keep the state for the duration of the async request,
    // we use this class.
    class Impl : Base {
    public:
        Impl(EverythingCallbackClient& owner,
             ::routeguide::Point& point,
             get_feature_cb_t && fn)
            : Base(owner), req_{std::move(point)}, caller_callback_{std::move(fn)} {

            LOG_TRACE << "getFeature starting async request.";

            owner_.stub_->async()->GetFeature(&ctx_, &req_, &reply_,
                                       [this](grpc::Status status) {

                LOG_TRACE << "getFeature calling finished callback.";
                caller_callback_(status, reply_);
                delete this;
            });
        }

    private:
        grpc::ClientContext ctx_;
        ::routeguide::Point req_;
        ::routeguide::Feature reply_;
        get_feature_cb_t caller_callback_;
    };

    // This is all our method actually does. It just creates an instance
    // of the implementation class to deal with the request.
    new Impl(*this, point, std::move(fn));
} // getFeature

Then an example on how to use this method.

/*! Example on how to use getFeature() */
void nextGetFeature() {
    // Initiate a new request
    ::routeguide::Point point;
    point.set_latitude(10);
    point.set_longitude(100);

    getFeature(point, [this](const grpc::Status& status,
                             const ::routeguide::Feature& feature) {
        if (status.ok()) {
            LOG_TRACE << "Received feature: " << feature.name();

        } else {
            LOG_TRACE << "getFeature failed: "
                      << status.error_message();
        }
    });
}

Now these unary gRPC calls are becoming convenient to use, even when they are asynchronous. That's something new ;)
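To see the ownership idea in isolation, here is a minimal sketch that uses only the standard library. `FakeTransport`, `startRequest` and `demoSelfOwningRequest` are hypothetical names made up for this illustration and are not part of gRPC. The fake transport simply queues the completion and runs it later, which is roughly the situation we are in when gRPC calls us back from one of its threads.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <string>
#include <utility>

// Hypothetical stand-in for an async transport: completions are queued
// and executed later, when run() is called.
class FakeTransport {
public:
    void post(std::function<void()> completion) {
        assert(completion && "completion must not be empty");
        pending_.push(std::move(completion));
    }

    // Run all queued completions. Returns how many were executed.
    std::size_t run() {
        std::size_t count = 0;
        while (!pending_.empty()) {
            auto fn = std::move(pending_.front());
            pending_.pop();
            fn();
            ++count;
        }
        return count;
    }

private:
    std::queue<std::function<void()>> pending_;
};

using reply_cb_t = std::function<void(bool ok, const std::string& reply)>;

// Starts a fake request. The caller owns nothing after this returns.
void startRequest(FakeTransport& transport, std::string request, reply_cb_t fn) {
    // Owns the request and reply buffers for the lifetime of the call.
    class Impl {
    public:
        Impl(FakeTransport& transport, std::string request, reply_cb_t fn)
            : req_{std::move(request)}, cb_{std::move(fn)} {
            transport.post([this] {
                reply_ = "echo: " + req_;
                cb_(true, reply_);
                delete this; // The request is complete; release the state.
            });
        }

    private:
        std::string req_;
        std::string reply_;
        reply_cb_t cb_;
    };

    new Impl(transport, std::move(request), std::move(fn));
}

// The caller's request string is gone before the completion runs.
std::string demoSelfOwningRequest() {
    FakeTransport transport;
    std::string result;
    {
        std::string request = "hello";
        startRequest(transport, request,
                     [&result](bool ok, const std::string& reply) {
            if (ok) {
                result = reply;
            }
        });
    } // `request` is destroyed here, but Impl holds its own copy.
    transport.run();
    return result;
}
```

The point of the sketch is that the caller never has to keep the request or reply buffers alive. The price is that the object must be deleted exactly once, on the single path where the request completes.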

ListFeatures

We continue with a wrapper around ListFeatures.

/*! Data for a callback function suitable for `ListFeatures`.
 *
 *  Here we use a union that either contains a Feature message received
 *  from the stream, or the final Status message. Alternatively we could have
 *  used two callbacks. However, std::variant (C++ unions) can be useful in
 *  such cases as this.
 *
 *  We use a pointer to the Feature so that we don't have to make a deep
 *  copy for each received message just for the purpose of "doing the right thing" ;)
 */
using feature_or_status_t = std::variant<
        // I would have preferred a reference, but std::variant cannot hold references (as of C++20)
        const ::routeguide::Feature *,
        grpc::Status
    >;

/*! A callback function suitable to handle the events following a ListFeatures RPC
 */
using list_features_cb_t = std::function<void(feature_or_status_t)>;

/*! Naive implementation of `ListFeatures` with a callback for the events.
 */
void listFeatures(::routeguide::Rectangle& rect, list_features_cb_t&& fn) {

    /*! A class that deals with this RPC
     *
     *  It overrides the events we need, owns our buffers and implements
     *  our boilerplate code needed to call the user supplied callback.
     */
    class Impl
        // Our shared code for all the RPC's we implement here
        : Base
        // The async gRPC stream interface for this RPC
        , grpc::ClientReadReactor<::routeguide::Feature> {
    public:

        Impl(EverythingCallbackClient& owner,
             ::routeguide::Rectangle& rect,
             list_features_cb_t && fn)
            : Base(owner), req_{std::move(rect)}, caller_callback_{std::move(fn)} {

            LOG_TRACE << "listFeatures starting async request.";

            // Glue this instance of this class to an initiation
            // of the ListFeatures RPC.
            owner_.stub_->async()->ListFeatures(&ctx_, &req_, this);

            // Initiate the first async read.
            // Since StartCall is not yet called, the operation is
            // queued by gRPC.
            StartRead(&resp_);

            // Here we initiate the actual async RPC
            StartCall();
        }

        /*! Callback event when a read operation is complete */
        void OnReadDone(bool ok) override {

            // We go on reading while ok
            if (ok) {
                LOG_TRACE << "Request successful. Message: " << resp_.name();

                caller_callback_(&resp_);

                resp_.Clear();
                return StartRead(&resp_);
            }

            LOG_TRACE << "Read failed (end of stream?)";
        }

        /*! Callback event when the RPC is complete */
        void OnDone(const grpc::Status& s) override {
            if (s.ok()) {
                LOG_TRACE << "Request succeeded.";
            } else {
                LOG_WARN << "Request failed: " << s.error_message();
            }

            caller_callback_(s);
            delete this;
        }

    private:
        grpc::ClientContext ctx_;
        ::routeguide::Rectangle req_;
        ::routeguide::Feature resp_;
        list_features_cb_t caller_callback_;
    };

    // All this method actually does.
    new Impl(*this, rect, std::move(fn));
} // listFeatures

That was more code. Now let's look at how to use this wrapper:

/*! Example on how to use listFeatures() */
void nextListFeatures() {
    ::routeguide::Rectangle rect;
    rect.mutable_hi()->set_latitude(1);
    rect.mutable_hi()->set_longitude(2);

    LOG_TRACE << "Calling listFeatures";

    listFeatures(rect, [this](feature_or_status_t val) {

        if (std::holds_alternative<const ::routeguide::Feature *>(val)) {
            auto feature = std::get<const ::routeguide::Feature *>(val);

            LOG_TRACE << "nextListFeatures - Received feature: " << feature->name();
        } else if (std::holds_alternative<grpc::Status>(val)) {
            auto status = std::get<grpc::Status>(val);
            if (status.ok()) {
                LOG_TRACE << "nextListFeatures done. Initiating next request ...";
                nextRequest();
            } else {
                LOG_TRACE << "nextListFeatures failed: " <<  status.error_message();
            }
        } else {
            assert(false && "unexpected value type in variant!");
        }

    });
}

Now, look at that! That's a neat callback interface to a stream RPC!
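As an aside, the chain of `std::holds_alternative` checks can also be written with `std::visit`. The following is only a sketch of that alternative, not what the example code uses. `Feature` and `Status` are simplified, hypothetical stand-ins for `::routeguide::Feature` and `grpc::Status`, and `overloaded` and `describe` are helper names introduced here.

```cpp
#include <cassert>
#include <string>
#include <variant>

// Hypothetical stand-ins for ::routeguide::Feature and grpc::Status.
struct Feature {
    std::string name;
};

struct Status {
    bool is_ok = true;
    std::string message;
    bool ok() const { return is_ok; }
};

using feature_or_status_t = std::variant<const Feature*, Status>;

// Helper that merges several lambdas into one overload set.
template <class... Ts>
struct overloaded : Ts... {
    using Ts::operator()...;
};
template <class... Ts>
overloaded(Ts...) -> overloaded<Ts...>;

// Returns a short description of whatever the variant currently holds.
std::string describe(const feature_or_status_t& val) {
    return std::visit(overloaded{
        [](const Feature* feature) {
            assert(feature != nullptr);
            return "feature: " + feature->name;
        },
        [](const Status& status) {
            return status.ok() ? std::string{"done"}
                               : "failed: " + status.message;
        }
    }, val);
}
```

With `std::visit`, forgetting to handle one of the alternatives becomes a compile error, so the `assert(false)` branch for an unexpected type is no longer needed.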

RecordRoute

This wrapper is similar to the previous one, just with the stream in the other direction.

/*! Definition of a callback function that needs to provide the data for a write.
 *
 *  The function must return immediately.
 *
 *  \param point. The buffer for the data.
 *  \return true if we are to write data, false if all data has been written and
 *      we are done.
 */
using on_ready_to_write_point_cb_t = std::function<bool(::routeguide::Point& point)>;

/*! Definition of a callback function that is called when the RPC is complete.
 *
 *  \param status. The status for the RPC.
 *  \param summary. The reply from the server. Only valid if `status.ok()` is true.
 */
using on_done_route_summary_cb_t = std::function<
    void(const grpc::Status& status, ::routeguide::RouteSummary& summary)>;

/*! Naive implementation of `RecordRoute` with callbacks for the events.
 */
void recordRoute(on_ready_to_write_point_cb_t&& writerCb, on_done_route_summary_cb_t&& doneCb) {

    /*! A class that deals with this RPC
     *
     *  It overrides the events we need, owns our buffers and implements
     *  our boilerplate code needed to call the user supplied callback.
     */
    class Impl
        // Our shared code for all the RPC's we implement here
        : Base
        // The async gRPC stream interface for this RPC
        , grpc::ClientWriteReactor<::routeguide::Point> {
    public:
        Impl(EverythingCallbackClient& owner,
             on_ready_to_write_point_cb_t& writerCb,
             on_done_route_summary_cb_t& doneCb)
            : Base(owner), writer_cb_{std::move(writerCb)}
            , done_cb_{std::move(doneCb)}
        {
            LOG_TRACE << "recordRoute starting async request.";

            // Glue this instance of this class to an initiation
            // of the RecordRoute RPC.
            owner_.stub_->async()->RecordRoute(&ctx_, &resp_, this);

            // Start the first async write operation on the stream
            write();

            // Here we will initiate the actual async RPC
            StartCall();
        }

        /*! Callback event when a write operation is complete */
        void OnWriteDone(bool ok) override {
            if (!ok) [[unlikely]] {
                LOG_WARN << "RecordRoute - Failed to write to the stream: ";

                // Once failed here, we cannot write any more.
                return StartWritesDone();
            }

            // One write operation was completed.
            // Off we go with the next one.
            write();
        }

        /*! Callback event when the RPC is complete */
        void OnDone(const grpc::Status& s) override {
            done_cb_(s, resp_);
            delete this;
        }

    private:

        // Initiate a new async write operation (if appropriate).
        void write() {
            // Get another message from the caller
            req_.Clear();
            if (writer_cb_(req_)) {

                // Note that if we had implemented a delayed `StartWrite()`, for example
                // by relaying the event to a task manager like `boost::asio::io_service::post()`,
                // we would need to call `AddHold()` either here or in the constructor.
                // Only when we had called `RemoveHold()` the same number of times, would
                // gRPC consider calling the `OnDone()` event method.

                return StartWrite(&req_);
            }

            // The caller doesn't have any further data to write
            StartWritesDone();
        }

        grpc::ClientContext ctx_;
        ::routeguide::Point req_;
        ::routeguide::RouteSummary resp_;
        on_ready_to_write_point_cb_t writer_cb_;
        on_done_route_summary_cb_t done_cb_;
    };

    // All this method actually does.
    new Impl(*this, writerCb, doneCb);
} // recordRoute

Then an example on how to use the wrapper:

/*! Example on how to use recordRoute() */
void nextRecordRoute() {

    recordRoute(
        // Callback to provide data to send to the server
        // Note that we instantiate a local variable `count` that lives
        // in the scope of one instance of the lambda function.
        [this, count=size_t{0}](::routeguide::Point& point) mutable {
            if (++count > config_.num_stream_messages) [[unlikely]] {
                // We are done
                return false;
            }

            // Just pick some data to set.
            // In a real implementation we would have to get prepared data or
            // data from a quick calculation. We have to return immediately since
            // we are using one of gRPC's worker threads.
            // If we needed to do some work, like fetching from a database, we
            // would need another workflow where the event was dispatched to a
            // task manager or thread-pool, and the argument was a write functor
            // rather than the data object itself.
            point.set_latitude(count);
            point.set_longitude(100);

            LOG_TRACE << "RecordRoute request sending latitude " << count;

            return true;
        },

        // Callback to handle the completion of the request and its status/reply.
        [this](const grpc::Status& status, ::routeguide::RouteSummary& summary) mutable {
            if (!status.ok()) {
                LOG_WARN << "RecordRoute request failed: " << status.error_message();
                return;
            }

            LOG_TRACE << "RecordRoute request is done. Distance: "
                      << summary.distance();

            nextRequest();
        });
}

In this example we have some logic to just send n messages. But as you can see, the use of the wrapper is fairly simple.
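The way the wrapper pulls messages from a stateful lambda can be tried out without gRPC. The sketch below is a simplified, synchronous model of the `write()` loop, under the assumption that every write succeeds. `Point` is a hypothetical stand-in for `::routeguide::Point`, and `pump` and `demoPump` are names invented for this illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for ::routeguide::Point.
struct Point {
    int latitude = 0;
    int longitude = 0;
};

using on_ready_to_write_point_cb_t = std::function<bool(Point& point)>;

// Synchronous model of the wrapper's write loop: ask the caller for a
// message, "send" it, and repeat until the callback returns false.
std::vector<Point> pump(on_ready_to_write_point_cb_t writerCb) {
    assert(writerCb && "a writer callback is required");

    std::vector<Point> sent;
    Point req;
    while (true) {
        req = Point{};         // Same role as req_.Clear()
        if (!writerCb(req)) {
            break;             // Same role as StartWritesDone()
        }
        sent.push_back(req);   // Same role as StartWrite(&req_)
    }
    return sent;
}

// Sends `num_messages` points, using a counter stored in the lambda.
std::vector<Point> demoPump(std::size_t num_messages) {
    // `count` is initialized once and lives inside the lambda object.
    return pump([num_messages, count = std::size_t{0}](Point& point) mutable {
        if (++count > num_messages) {
            return false; // We are done
        }
        point.latitude = static_cast<int>(count);
        point.longitude = 100;
        return true;
    });
}
```

One detail to keep in mind: the counter lives in the lambda object, so copying the `std::function` also copies the counter. Moving the callback into the object that drives the stream, as the wrapper does, keeps a single copy of that state.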

RouteChat

Finally, the wrapper over an RPC with a bidirectional stream using callbacks.

/*! Definition of a callback function to provide the next outgoing message.
 *
 *  The function must return immediately.
 *
 *  \param msg Buffer for the data to send when the function returns.
 *  \return true if we are to send the message, false if we are done
 *      sending messages.
 */
using on_say_something_cb_t = std::function<bool(::routeguide::RouteNote& msg)>;

/*! Definition of a callback function regarding an incoming message.
 *
 *  The function must return immediately.
 */
using on_got_message_cb_t = std::function<void(::routeguide::RouteNote&)>;

/*! Definition of a callback function to notify us that the RPC is complete.
 *
 *  The function must return immediately.
 */
using on_done_status_cb_t = std::function<void(const grpc::Status&)>;


/*! Naive implementation of `RouteChat` with callbacks for the events.
 *
 *  As before, we are prepared for a shouting contest, and will start sending
 *  messages as soon as the RPC connection is established.
 *
 *  \param outgoing Callback function to provide new messages to send.
 *  \param incoming Callback function to notify us about an incoming message.
 *  \param done Callback to inform us that the RPC is completed, and if
 *      it was successful.
 */
void routeChat(on_say_something_cb_t&& outgoing,
               on_got_message_cb_t&& incoming,
               on_done_status_cb_t&& done) {

    class Impl
        // Our shared code for all the RPC's we implement here
        : Base
        // The async gRPC stream interface for this RPC
        , grpc::ClientBidiReactor<::routeguide::RouteNote,
                                  ::routeguide::RouteNote>
    {
    public:
        Impl(EverythingCallbackClient& owner,
             on_say_something_cb_t& outgoing,
             on_got_message_cb_t& incoming,
             on_done_status_cb_t& done)
            : Base(owner), outgoing_{std::move(outgoing)}
            , incoming_{std::move(incoming)}
            , done_{std::move(done)} {

            LOG_TRACE << "routeChat starting async request.";

            // Glue this instance of this class to an initiation
            // of the RouteChat RPC.
            owner_.stub_->async()->RouteChat(&ctx_, this);

            // Start receiving the first incoming message
            read();

            // Start sending the first outgoing message
            write();

            // Here we will initiate the actual async RPC
            // The queued requests for read and write will be initiated.
            StartCall();

        }

        /*! Callback event when a write operation is complete */
        void OnWriteDone(bool ok) override {
            write();
        }

        /*! Callback event when a read operation is complete */
        void OnReadDone(bool ok) override {
            if (ok) {
                incoming_(in_);
                read();
            }
        }

        /*! Callback event when the RPC is complete */
        void OnDone(const grpc::Status& s) override {
            done_(s);
            delete this;
        }

    private:
        // Initiate a new async read operation
        void read() {
            in_.Clear();
            StartRead(&in_);
        }

        // Initiate a new async write operation (if appropriate).
        void write() {
            out_.Clear();
            if (outgoing_(out_)) {
                return StartWrite(&out_);
            }

            StartWritesDone();
        }

        grpc::ClientContext ctx_;
        ::routeguide::RouteNote in_;
        ::routeguide::RouteNote out_;
        on_say_something_cb_t outgoing_;
        on_got_message_cb_t incoming_;
        on_done_status_cb_t done_;
    };

    // All this method actually does.
    new Impl(*this, outgoing, incoming, done);
} // routeChat

And the example on how to use the wrapper.

/*! Example on how to use routeChat() */
void nextRouteChat() {

    routeChat(
        // Compose an outgoing message
        [this, count=size_t{0}](::routeguide::RouteNote& msg) mutable {
            if (++count > config_.num_stream_messages) [[unlikely]] {
                // We are done
                return false;
            }

            // Say something thoughtful to make us look smart.
            // Something to print on T-shirts and make memes from ;)
            msg.set_message(std::string{"chat message "} + std::to_string(count));

            LOG_TRACE << "RouteChat request outgoing message " << count;

            return true;
        },
        // We received an incoming message
        [this](::routeguide::RouteNote& msg) {
            LOG_TRACE << "RouteChat request incoming message: " << msg.message();
        },
        // The conversation is over.
        [this](const grpc::Status& status) {
            if (!status.ok()) {
                LOG_WARN << "RouteChat request failed: " << status.error_message();
                return;
            }

            LOG_TRACE << "RouteChat request is done.";
            nextRequest();
        }
        );
}

Now, with this abstraction, it's simple to use a bidirectional stream in an actual project.

I wrote the wrappers to highlight what I believe is a proper way to expose a "callback" interface.

When I write code, I try to keep one method (or class) focused on one thing. If I write a method that inserts data into a database, the method should compose the data. The algorithm in that method should be clear to anyone who takes a peek at the code. That means that the same method should not deal with the specifics of the database, like opening a handle, dealing with connects or re-connects, and closing the handle in all of the return paths. When I deal with the data, the database operation should be limited to something like db.write(data);.
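A tiny sketch of that split could look like the following. `Db` and `storeReading` are hypothetical names used only for illustration, and the "database" is just an in-memory vector so the example stays self-contained.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical database facade. Handle management, reconnects and cleanup
// would live in here, out of sight of the code that composes the data.
class Db {
public:
    void write(const std::string& row) {
        assert(!row.empty() && "refusing to store an empty row");
        // A real implementation would talk to a database here.
        // This sketch only records the row in memory.
        rows_.push_back(row);
    }

    const std::vector<std::string>& rows() const { return rows_; }

private:
    std::vector<std::string> rows_;
};

// The method with the actual algorithm: compose the data, hand it over.
void storeReading(Db& db, const std::string& sensor, int value) {
    const std::string data = sensor + "=" + std::to_string(value);
    db.write(data);
}
```

Anyone reading `storeReading` sees what is stored, and nothing about how the storage works.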

Likewise when using RPCs. Some methods may go into detail on how to use the gRPC stubs. But when I need to use one of those RPCs, I don't want the details. I want just what I need to implement my algorithm: in this case, for example, an event handler to provide more data and an event handler for when it's done.

The complete source code.

That concludes the dive into the gRPC callback interface for now.