Jarle Aase

Exploring how to implement the client using gRPC async callbacks

The client interface is different from the server interface. Here we don't override an interface. Instead, we call methods on a stub interface, much like we did in the async version.

The code that protoc (with the gRPC C++ plugin) generates for "callbacks" looks like this:

class async final :
    public StubInterface::async_interface {
    public:
    void GetFeature(::grpc::ClientContext* context, const ::routeguide::Point* request, ::routeguide::Feature* response, std::function<void(::grpc::Status)>) override;
    void GetFeature(::grpc::ClientContext* context, const ::routeguide::Point* request, ::routeguide::Feature* response, ::grpc::ClientUnaryReactor* reactor) override;
    void ListFeatures(::grpc::ClientContext* context, const ::routeguide::Rectangle* request, ::grpc::ClientReadReactor< ::routeguide::Feature>* reactor) override;
    void RecordRoute(::grpc::ClientContext* context, ::routeguide::RouteSummary* response, ::grpc::ClientWriteReactor< ::routeguide::Point>* reactor) override;
    void RouteChat(::grpc::ClientContext* context, ::grpc::ClientBidiReactor< ::routeguide::RouteNote,::routeguide::RouteNote>* reactor) override;
    private:
    friend class Stub;
    explicit async(Stub* stub): stub_(stub) { }
    Stub* stub() { return stub_; }
    Stub* stub_;
};

The unary RPC, GetFeature, has two variants, one taking a std::function<> as argument. We will use that in our example.

The methods we get will initiate an async request to the server. The various events that happen after that are handled by whatever we pass as the last argument. For the unary method where we supply a function, the function is called once, when the RPC is complete. That is convenient. The other methods require us to pass an instance of a class that overrides the event methods, so we can send and receive data over the stream.

In the code below, I have wrapped these methods in more convenient methods, where we can call the RPC and interact with its events via callbacks. That's a well-known pattern for most C++ developers.

I have also added methods that show how to use the wrapper methods.

Let's start with the initialization, up to the point where we have a usable stub_ that we can use to call RPCs.

class EverythingCallbackClient {
public:

EverythingCallbackClient(const Config& config)
    : config_{config} {

    LOG_INFO << "Connecting to gRPC service at: " << config.address;
    channel_ = grpc::CreateChannel(config.address, grpc::InsecureChannelCredentials());

    stub_ = ::routeguide::RouteGuide::NewStub(channel_);
    assert(stub_);
}

...

private:
    const Config config_;

    // This is a connection to the gRPC server
    std::shared_ptr<grpc::Channel> channel_;

    // An instance of the client that was generated from our .proto file.
    std::unique_ptr<::routeguide::RouteGuide::Stub> stub_;

GetFeature

Now, let's look at the simplest wrapper method, the unary one.

We use a class to keep the buffers and state for the RPC. This could have been handled by the caller in this case. However, if you want to call some RPCs from various places in your code, it becomes a burden to add shared pointers or other means to own the buffers. It's simpler to hide these details from the caller and deal with them in the wrapper.

    /// Callback function with the result of the unary RPC call
    using get_feature_cb_t = std::function<void(const grpc::Status& status,
                                                const ::routeguide::Feature& feature)>;

    /*! Naive implementation of `GetFeature` with an actual callback.
     */
    void getFeature(::routeguide::Point& point, get_feature_cb_t && fn) {

        // In order to keep the state for the duration of the async request,
        // we use this class.
        class Impl : Base {
        public:
            Impl(EverythingCallbackClient& owner,
                 ::routeguide::Point& point,
                 get_feature_cb_t && fn)
                : Base(owner), req_{std::move(point)}, caller_callback_{std::move(fn)} {

                LOG_TRACE << "getFeature starting async request.";

                owner_.stub_->async()->GetFeature(&ctx_, &req_, &reply_,
                                           [this](grpc::Status status) {

                    LOG_TRACE << "getFeature calling finished callback.";
                    caller_callback_(status, reply_);

                    // The RPC is complete. Nobody else owns this object,
                    // so it frees itself.
                    delete this;
                });
            }

        private:
            grpc::ClientContext ctx_;
            ::routeguide::Point req_;
            ::routeguide::Feature reply_;
            get_feature_cb_t caller_callback_;
        };

        // This is all our method actually does. It just creates an instance
        // of the implementation class to deal with the request.
        new Impl(*this, point, std::move(fn));
    } // getFeature
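
The ownership idea behind Impl can be tried out without gRPC. The sketch below is a toy, not gRPC code: FakeAsyncService, start_lookup, run_pending and live_requests are hypothetical names that I made up for the illustration. The fake service completes requests later, roughly the way a gRPC worker thread would call our lambda. The point is only to show a heap-allocated request object that keeps its own buffers alive, calls the user's callback once, and then deletes itself.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <string>
#include <utility>

// Hypothetical stand-in for an async stub: it stores completions and runs
// them later, when run_pending() is called.
class FakeAsyncService {
public:
    // The caller must keep `request` and `reply` alive until `done` is called.
    void lookup(const std::string* request, std::string* reply,
                std::function<void(bool)> done) {
        pending_.push([request, reply, done = std::move(done)] {
            *reply = "feature for " + *request;
            done(true);
        });
    }

    // Run all queued completions. Returns how many were executed.
    int run_pending() {
        int count = 0;
        while (!pending_.empty()) {
            auto task = std::move(pending_.front());
            pending_.pop();
            task();
            ++count;
        }
        return count;
    }

private:
    std::queue<std::function<void()>> pending_;
};

using lookup_cb_t = std::function<void(bool ok, const std::string& reply)>;

// Counts live request objects, only so that the sketch can be checked.
inline int live_requests = 0;

// Starts a lookup. The caller does not have to own any buffers.
void start_lookup(FakeAsyncService& service, std::string what, lookup_cb_t fn) {
    // Owns the request and reply buffers for the duration of the call.
    class Impl {
    public:
        Impl(FakeAsyncService& svc, std::string req, lookup_cb_t cb)
            : req_{std::move(req)}, caller_callback_{std::move(cb)} {
            assert(caller_callback_ && "a callback is required");
            ++live_requests;
            svc.lookup(&req_, &reply_, [this](bool ok) {
                caller_callback_(ok, reply_);
                delete this; // The request is complete; free the state.
            });
        }
        ~Impl() { --live_requests; }

    private:
        std::string req_;
        std::string reply_;
        lookup_cb_t caller_callback_;
    };

    new Impl(service, std::move(what), std::move(fn));
}
```

If you call start_lookup() and then run_pending() from a small main(), the callback fires during run_pending(), and live_requests is back at zero afterwards. The caller never sees the buffers.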

Then an example on how to use this method.

    /*! Example on how to use getFeature() */
    void nextGetFeature() {
        // Initiate a new request
        ::routeguide::Point point;
        point.set_latitude(10);
        point.set_longitude(100);

        getFeature(point, [this](const grpc::Status& status,
                                 const ::routeguide::Feature& feature) {
            if (status.ok()) {
                LOG_TRACE << "Received feature: " << feature.name();

            } else {
                LOG_TRACE << "getFeature failed: "
                          << status.error_message();
            }
        });
    }

Now these unary gRPC calls are becoming convenient to use, even when they are asynchronous. That's something new ;)

ListFeatures

We continue with a wrapper around ListFeatures.

    /*! Data for a callback function suitable for `ListFeatures`.
     *
     *  Here we use a union that either contains a Feature message received
     *  from the stream, or the final Status message. Alternatively we could have
     *  used two callbacks. However, std::variant (C++ unions) can be useful in
     *  such cases as this.
     *
     *  We use a pointer to the Feature so that we don't have to make a deep
     *  copy for each received message just for the purpose of "doing the right thing" ;)
     */
    using feature_or_status_t = std::variant<
            // I would have preferred a reference, but that doesn't work in variants in C++20
            const ::routeguide::Feature *,
            grpc::Status
        >;

    /*! A callback function suitable to handle the events following a ListFeatures RPC
     */
    using list_features_cb_t = std::function<void(feature_or_status_t)>;

    /*! Naive implementation of `ListFeatures` with a callback for the events.
     */
    void listFeatures(::routeguide::Rectangle& rect, list_features_cb_t&& fn) {

        /*! A class that deals with this RPC
         *
         *  It overrides the events we need, owns our buffers and implements
         *  our boilerplate code needed to call the user supplied callback.
         */
        class Impl
            // Our shared code for all the RPC's we implement here
            : Base
            // The async gRPC stream interface for this RPC
            , grpc::ClientReadReactor<::routeguide::Feature> {
        public:

            Impl(EverythingCallbackClient& owner,
                 ::routeguide::Rectangle& rect,
                 list_features_cb_t && fn)
                : Base(owner), req_{std::move(rect)}, caller_callback_{std::move(fn)} {

                LOG_TRACE << "listFeatures starting async request.";

                // Glue this instance of this class to an initiation
                // of the ListFeatures RPC.
                owner_.stub_->async()->ListFeatures(&ctx_, &req_, this);

                // Initiate the first async read.
                // Since StartCall is not yet called, the operation is
                // queued by gRPC.
                StartRead(&resp_);

                // Here we initiate the actual async RPC
                StartCall();
            }

            /*! Callback event when a read operation is complete */
            void OnReadDone(bool ok) override {

                // We go on reading while ok
                if (ok) {
                    LOG_TRACE << "Request successful. Message: " << resp_.name();

                    caller_callback_(&resp_);

                    resp_.Clear();
                    return StartRead(&resp_);
                }

                LOG_TRACE << "Read failed (end of stream?)";
            }

            /*! Callback event when the RPC is complete */
            void OnDone(const grpc::Status& s) override {
                if (s.ok()) {
                    LOG_TRACE << "Request succeeded.";
                } else {
                    LOG_WARN << "Request failed: " << s.error_message();
                }

                caller_callback_(s);
                delete this;
            }

        private:
            grpc::ClientContext ctx_;
            ::routeguide::Rectangle req_;
            ::routeguide::Feature resp_;
            list_features_cb_t caller_callback_;
        };

        // All this method actually does.
        new Impl(*this, rect, std::move(fn));
    } // listFeatures
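
The order of events the wrapper relies on (a read queued before the call starts, one OnReadDone per message, a final failed read at the end of the stream, then OnDone) can be followed in a toy model. To be clear, ToyReadReactor and Collector below are hypothetical classes written for this illustration. They are not the gRPC classes, they run synchronously on the calling thread, and they only mimic the sequence described in the comments above.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Toy model of a read reactor. This is NOT the gRPC class; it only mimics
// the order of events, and it runs everything synchronously.
class ToyReadReactor {
public:
    explicit ToyReadReactor(std::deque<std::string> incoming)
        : incoming_{std::move(incoming)} {}
    virtual ~ToyReadReactor() = default;

    // Request one read into `target`. Before start_call() it is only queued.
    void start_read(std::string* target) {
        target_ = target;
        read_pending_ = true;
        if (started_) pump();
    }

    // Start the call and run any queued read.
    void start_call() {
        started_ = true;
        pump();
    }

protected:
    virtual void on_read_done(bool ok) = 0;
    virtual void on_done() = 0;

private:
    void pump() {
        if (pumping_) return; // The loop further up the stack will continue.
        pumping_ = true;
        while (read_pending_) {
            read_pending_ = false;
            assert(target_ != nullptr);
            const bool ok = !incoming_.empty();
            if (ok) {
                *target_ = std::move(incoming_.front());
                incoming_.pop_front();
            }
            on_read_done(ok); // The handler may call start_read() again.
        }
        pumping_ = false;
        on_done(); // No read is pending any more: the call is complete.
    }

    std::deque<std::string> incoming_;
    std::string* target_ = nullptr;
    bool started_ = false;
    bool read_pending_ = false;
    bool pumping_ = false;
};

// Same read loop as in the wrapper: handle a message, then read the next one.
class Collector : public ToyReadReactor {
public:
    using ToyReadReactor::ToyReadReactor;

    void run() {
        start_read(&msg_); // Queued, the call has not started yet.
        events.push_back("read queued");
        start_call();      // Runs the queued read and the rest of the loop.
    }

    std::vector<std::string> events;

protected:
    void on_read_done(bool ok) override {
        if (ok) {
            events.push_back("read: " + msg_);
            msg_.clear();
            return start_read(&msg_);
        }
        events.push_back("end of stream");
    }

    void on_done() override { events.push_back("done"); }

private:
    std::string msg_;
};
```

Running a Collector over two messages records "read queued" first, then one "read: ..." entry per message, then "end of stream" and finally "done". In real gRPC the events arrive on gRPC's threads, so the toy is only useful for seeing the sequence.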

That was more code. Now let's look at how to use this wrapper:

    /*! Example on how to use listFeatures() */
    void nextListFeatures() {
        ::routeguide::Rectangle rect;
        rect.mutable_hi()->set_latitude(1);
        rect.mutable_hi()->set_longitude(2);

        LOG_TRACE << "Calling listFeatures";

        listFeatures(rect, [this](feature_or_status_t val) {

            if (std::holds_alternative<const ::routeguide::Feature *>(val)) {
                auto feature = std::get<const ::routeguide::Feature *>(val);

                LOG_TRACE << "nextListFeatures - Received feature: " << feature->name();
            } else if (std::holds_alternative<grpc::Status>(val)) {
                auto status = std::get<grpc::Status>(val);
                if (status.ok()) {
                    LOG_TRACE << "nextListFeatures done. Initiating next request ...";
                    nextRequest();
                } else {
                    LOG_TRACE << "nextListFeatures failed: " <<  status.error_message();
                }
            } else {
                assert(false && "unexpected value type in variant!");
            }

        });
    }
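
As a side note, the if / else-if chain on the variant could also be written with std::visit. The sketch below is one possible way to do that, not what the example project does. To keep it compilable without gRPC, Feature and Status are simplified stand-in structs (my assumption, not the generated protobuf class or grpc::Status), and describe() is a hypothetical helper.

```cpp
#include <cassert>
#include <string>
#include <variant>

// Simplified stand-ins for ::routeguide::Feature and grpc::Status (assumptions).
struct Feature {
    std::string name;
};

struct Status {
    bool success = true;
    std::string message;
    bool ok() const { return success; }
};

using feature_or_status_t = std::variant<const Feature*, Status>;

// Builds one callable from several lambdas (the common "overloaded" idiom).
template <class... Ts> struct overloaded : Ts... { using Ts::operator()...; };
template <class... Ts> overloaded(Ts...) -> overloaded<Ts...>;

// Describes one event. If an alternative of the variant is not handled by
// any of the lambdas, the code does not compile.
std::string describe(const feature_or_status_t& val) {
    return std::visit(overloaded{
        [](const Feature* feature) {
            assert(feature != nullptr);
            return "feature: " + feature->name;
        },
        [](const Status& status) {
            return status.ok() ? std::string{"done"}
                               : "failed: " + status.message;
        }
    }, val);
}
```

The benefit over holds_alternative is that the final "unexpected value type" branch is no longer needed. The compiler checks that every alternative has a handler.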

Now, look at that! That's a neat callback interface to a stream RPC!

RecordRoute

This wrapper is similar to the previous one, just with the stream in the other direction.

    /*! Definition of a callback function that needs to provide the data for a write.
     *
     *  The function must return immediately.
     *
     *  \param point. The buffer for the data.
     *  \return true if we are to write data, false if all data has been written and
     *      we are done.
     */
    using on_ready_to_write_point_cb_t = std::function<bool(::routeguide::Point& point)>;

    /*! Definition of a callback function that is called when the RPC is complete.
     *
     *  \param status. The status for the RPC.
     *  \param summary. The reply from the server. Only valid if `status.ok()` is true.
     */
    using on_done_route_summary_cb_t = std::function<
        void(const grpc::Status& status, ::routeguide::RouteSummary& summary)>;

    /*! Naive implementation of `RecordRoute` with callbacks for the events.
     */
    void recordRoute(on_ready_to_write_point_cb_t&& writerCb, on_done_route_summary_cb_t&& doneCb) {

        /*! A class that deals with this RPC
         *
         *  It overrides the events we need, owns our buffers and implements
         *  our boilerplate code needed to call the user supplied callback.
         */
        class Impl
            // Our shared code for all the RPC's we implement here
            : Base
            // The async gRPC stream interface for this RPC
            , grpc::ClientWriteReactor<::routeguide::Point> {
        public:
            Impl(EverythingCallbackClient& owner,
                 on_ready_to_write_point_cb_t& writerCb,
                 on_done_route_summary_cb_t& doneCb)
                : Base(owner), writer_cb_{std::move(writerCb)}
                , done_cb_{std::move(doneCb)}
            {
                LOG_TRACE << "recordRoute starting async request.";

                // Glue this instance of this class to an initiation
                // of the RecordRoute RPC.
                owner_.stub_->async()->RecordRoute(&ctx_, &resp_, this);

                // Start the first async write operation on the stream
                write();

                // Here we will initiate the actual async RPC
                StartCall();
            }

            /*! Callback event when a write operation is complete */
            void OnWriteDone(bool ok) override {
                if (!ok) [[unlikely]] {
                    LOG_WARN << "RecordRoute - Failed to write to the stream: ";

                    // Once failed here, we cannot write any more.
                    return StartWritesDone();
                }

                // One write operation was completed.
                // Off we go with the next one.
                write();
            }

            /*! Callback event when the RPC is complete */
            void OnDone(const grpc::Status& s) override {
                done_cb_(s, resp_);
                delete this;
            }

        private:

            // Initiate a new async write operation (if appropriate).
            void write() {
                // Get another message from the caller
                req_.Clear();
                if (writer_cb_(req_)) {

                    // Note that if we had implemented a delayed `StartWrite()`, for example
                    // by relaying the event to a task manager like `boost::asio::io_service::post()`,
                    // we would need to call `AddHold()` either here or in the constructor.
                    // Only when we had called `RemoveHold()` the same number of times, would
                    // gRPC consider calling the `OnDone()` event method.

                    return StartWrite(&req_);
                }

                // The caller doesn't have any further data to write
                StartWritesDone();
            }

            grpc::ClientContext ctx_;
            ::routeguide::Point req_;
            ::routeguide::RouteSummary resp_;
            on_ready_to_write_point_cb_t writer_cb_;
            on_done_route_summary_cb_t done_cb_;
        };

        // All this method actually does.
        new Impl(*this, writerCb, doneCb);
    } // recordRoute

Then an example on how to use the wrapper:

    /*! Example on how to use recordRoute() */
    void nextRecordRoute() {

        recordRoute(
            // Callback to provide data to send to the server
            // Note that we instantiate a local variable `count` that lives
            // in the scope of one instance of the lambda function.
            [this, count=size_t{0}](::routeguide::Point& point) mutable {
                if (++count > config_.num_stream_messages) [[unlikely]] {
                    // We are done
                    return false;
                }

                // Just pick some data to set.
                // In a real implementation we would have to get prepared data or
                // data from a quick calculation. We have to return immediately since
                // we are using one of gRPC's worker threads.
                // If we needed to do some work, like fetching from a database, we
                // would need another workflow where the event was dispatched to a
                // task manager or thread-pool, and the argument was a write functor
                // rather than the data object itself.
                point.set_latitude(count);
                point.set_longitude(100);

                LOG_TRACE << "RecordRoute request sending latitude " << count;

                return true;
            },

            // Callback to handle the completion of the request and its status/reply.
            [this](const grpc::Status& status, ::routeguide::RouteSummary& summary) mutable {
                if (!status.ok()) {
                    LOG_WARN << "RecordRoute request failed: " << status.error_message();
                    return;
                }

                LOG_TRACE << "RecordRoute request is done. Distance: "
                          << summary.distance();

                nextRequest();
            });
    }

In this example we have some logic to just send n messages. But as you can see, the use of the wrapper is fairly simple.
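
The contract between the wrapper and the writer callback (fill in the buffer and return true, or return false when there is nothing more to send) can be exercised without gRPC. In the sketch below, Point is a simplified stand-in struct, and drain_writer() and make_point_writer() are hypothetical helpers of mine. drain_writer() runs synchronously what the reactor does one completed write at a time.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Simplified stand-in for ::routeguide::Point (assumption).
struct Point {
    int latitude = 0;
    int longitude = 0;
};

using on_ready_to_write_point_cb_t = std::function<bool(Point&)>;

// Hypothetical driver that mimics the reactor's write loop synchronously:
// ask the callback for data, "send" it, then ask again until it returns false.
std::vector<Point> drain_writer(on_ready_to_write_point_cb_t writer_cb) {
    assert(writer_cb && "a writer callback is required");
    std::vector<Point> sent;
    Point req;
    while (true) {
        req = Point{};          // Same role as req_.Clear()
        if (!writer_cb(req)) {
            break;              // Same role as StartWritesDone()
        }
        sent.push_back(req);    // Same role as StartWrite(&req_)
    }
    return sent;
}

// Builds a callback that produces `limit` points and then reports "done".
on_ready_to_write_point_cb_t make_point_writer(std::size_t limit) {
    // `count` is part of the lambda object, so each callback instance
    // carries its own counter between invocations.
    return [limit, count = std::size_t{0}](Point& point) mutable {
        if (++count > limit) {
            return false;
        }
        point.latitude = static_cast<int>(count);
        point.longitude = 100;
        return true;
    };
}
```

Calling drain_writer(make_point_writer(3)) yields three points with latitudes 1, 2 and 3. The init-capture plus mutable is what lets the lambda keep state without any member variable in the client class.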

RouteChat

Finally, the wrapper over an RPC with a bidirectional stream using callbacks.

    /*! Definition of a callback function to provide the next outgoing message.
     *
     *  The function must return immediately.
     *
     *  \param msg Buffer for the data to send when the function returns.
     *  \return true if we are to send the message, false if we are done
     *      sending messages.
     */
    using on_say_something_cb_t = std::function<bool(::routeguide::RouteNote& msg)>;

    /*! Definition of a callback function regarding an incoming message.
     *
     *  The function must return immediately.
     */
    using on_got_message_cb_t = std::function<void(::routeguide::RouteNote&)>;

    /*! Definition of a callback function to notify us that the RPC is complete.
     *
     *  The function must return immediately.
     */
    using on_done_status_cb_t = std::function<void(const grpc::Status&)>;


    /*! Naive implementation of `RouteChat` with callbacks for the events.
     *
     *  As before, we are prepared for a shouting contest, and will start sending
     *  messages as soon as the RPC connection is established.
     *
     *  \param outgoing Callback function to provide new messages to send.
     *  \param incoming Callback function to notify us about an incoming message.
     *  \param done Callback to inform us that the RPC is completed, and if
     *      it was successful.
     */
    void routeChat(on_say_something_cb_t&& outgoing,
                   on_got_message_cb_t&& incoming,
                   on_done_status_cb_t&& done) {

        class Impl
            // Our shared code for all the RPC's we implement here
            : Base
            // The async gRPC stream interface for this RPC
            , grpc::ClientBidiReactor<::routeguide::RouteNote,
                                      ::routeguide::RouteNote>
        {
        public:
            Impl(EverythingCallbackClient& owner,
                 on_say_something_cb_t& outgoing,
                 on_got_message_cb_t& incoming,
                 on_done_status_cb_t& done)
                : Base(owner), outgoing_{std::move(outgoing)}
                , incoming_{std::move(incoming)}
                , done_{std::move(done)} {

                LOG_TRACE << "routeChat starting async request.";

                // Glue this instance of this class to an initiation
                // of the RouteChat RPC.
                owner_.stub_->async()->RouteChat(&ctx_, this);

                // Start receiving the first incoming message
                read();

                // Start sending the first outgoing message
                write();

                // Here we will initiate the actual async RPC
                // The queued requests for read and write will be initiated.
                StartCall();

            }

            /*! Callback event when a write operation is complete */
            void OnWriteDone(bool ok) override {
                write();
            }

            /*! Callback event when a read operation is complete */
            void OnReadDone(bool ok) override {
                if (ok) {
                    incoming_(in_);
                    read();
                }
            }

            /*! Callback event when the RPC is complete */
            void OnDone(const grpc::Status& s) override {
                done_(s);
                delete this;
            }

        private:
            // Initiate a new async read operation
            void read() {
                in_.Clear();
                StartRead(&in_);
            }

            // Initiate a new async write operation (if appropriate).
            void write() {
                out_.Clear();
                if (outgoing_(out_)) {
                    return StartWrite(&out_);
                }

                StartWritesDone();
            }

            grpc::ClientContext ctx_;
            ::routeguide::RouteNote in_;
            ::routeguide::RouteNote out_;
            on_say_something_cb_t outgoing_;
            on_got_message_cb_t incoming_;
            on_done_status_cb_t done_;
        };

        // All this method actually does.
        new Impl(*this, outgoing, incoming, done);
    } // routeChat

And the example on how to use the wrapper.

    /*! Example on how to use routeChat() */
    void nextRouteChat() {

        routeChat(
            // Compose an outgoing message
            [this, count=size_t{0}](::routeguide::RouteNote& msg) mutable {
                if (++count > config_.num_stream_messages) [[unlikely]] {
                    // We are done
                    return false;
                }

                // Say something thoughtful to make us look smart.
                // Something to print on T-shirts and make memes from ;)
                msg.set_message(std::string{"chat message "} + std::to_string(count));

                LOG_TRACE << "RouteChat request outgoing message " << count;

                return true;
            },
            // We received an incoming message
            [this](::routeguide::RouteNote& msg) {
                LOG_TRACE << "RouteChat request incoming message: " << msg.message();
            },
            // The conversation is over.
            [this](const grpc::Status& status) {
                if (!status.ok()) {
                    LOG_WARN << "RouteChat request failed: " << status.error_message();
                    return;
                }

                LOG_TRACE << "RouteChat request is done.";
                nextRequest();
            }
            );
    }

Now, with this abstraction, it's simple to use a bidirectional stream in an actual project.

I wrote the wrappers to highlight what I believe is a proper way to expose a "callback" interface.

When I write code, I try to keep one method (or class) focused on one thing. If I write a method that inserts data into a database, the method should compose the data. The algorithm in that method should be clear to anyone who takes a peek at the code. That means that the same method should not deal with the specifics of the database, like opening a handle, dealing with connects or re-connects, and closing the handle in all of the return paths. When I deal with the data, the database operation should be limited to something like db.write(data);.
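
To make that concrete, here is a minimal sketch of the separation. Database, MemoryDatabase and store_reading() are hypothetical names for the illustration. A real implementation would hide connection handling, re-connects and cleanup behind the write() method.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical storage interface. Opening handles, re-connecting and
// closing would live behind it, out of sight of the caller.
class Database {
public:
    virtual ~Database() = default;
    virtual void write(const std::string& row) = 0;
};

// In-memory implementation, only for the sketch.
class MemoryDatabase : public Database {
public:
    void write(const std::string& row) override { rows.push_back(row); }

    std::vector<std::string> rows;
};

// Composes the data. The only database detail visible here is db.write(data).
void store_reading(Database& db, const std::string& sensor, int value) {
    assert(!sensor.empty() && "a sensor name is required");
    const std::string data = sensor + "=" + std::to_string(value);
    db.write(data);
}
```

The wrappers above aim for the same shape: the method that uses an RPC sees a couple of callbacks, and the reactor details stay inside Impl.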

Likewise when using RPCs. Some methods may go into detail on how to use the gRPC stubs. But when I need to use one of those RPCs, I don't want the details. I want just what I need to implement my algorithm - in this case, for example, an event handler to provide more data, and an event handler for when it's done.

The complete source code.

That concludes the dive into the gRPC callback interface for now.