Exploring how to implement the server using gRPC async callbacks
What the callback interface gives us on the server side is a C++ virtual interface that lets us override methods to implement the RPCs in our proto file. That's exactly what I would expect from an RPC code generator for C++. I was a bit surprised when I first started to experiment with async gRPC some years ago - when there was only the legacy async interface.
So, have they gotten it right this time around? Yes and no.
Yes, because they now deal with all the details, so that we can deal with the RPC request directly. We implement an override for the interface created by rpcgen, and don't worry about what happens before that gets called, or after the request is complete. I emphasize request, because for the streaming RPC types, the callback is just the initiation of the workflow. Our implementation methods for the RPCs are called from a fixed-size thread pool owned by gRPC. If we do anything time consuming there, our server will not be very performant. So we have to return from the callback immediately. What we do to facilitate streaming RPCs is simply to instantiate an object that carries out the operations needed to complete the request. That's also what we did in the async implementation - only back then we pre-created one instance for each RPC method, and created a new instance as soon as the previous one had a request in progress. With the callbacks, we just get notified when there is a new RPC, and then it's up to us how we choose to proceed.
No, because we still have to create an implementation class to handle the state and event-flow of each streaming RPC.
One thing to keep in mind is that the callbacks may be called simultaneously from different threads. Our implementation must be thread-safe.
Simplified, what we implement here for the server-side streaming RPCs looks like the code below. We create an override for each RPC method, and we also create an implementation class for the reactor required to read/write the stream.
class CallbackImpl : public ::routeguide::RouteGuide::CallbackService {
public:

    class ServerWriteReactorImpl
        : public ::grpc::ServerWriteReactor< ::routeguide::Feature> {
    public:
        ServerWriteReactorImpl() { ... }

        // Override callbacks for events we need
        void OnDone() override { ... }
        void OnWriteDone(bool ok) override { ... }

    private:
        // Implement functionality we need
        void reply() { ... }
    };

    // Example of an RPC with one request in and a stream out
    ::grpc::ServerWriteReactor< ::routeguide::Feature>* ListFeatures(
        ::grpc::CallbackServerContext* ctx,
        const ::routeguide::Rectangle* req) override {

        return new ServerWriteReactorImpl();
    }
    ...
};

The really exciting good news is that unary RPCs (the ones without streams) are trivial to implement. I figure these will be the majority in most real use-cases.
GetFeature
This is the override implementation of the GetFeature() method.
class CallbackServiceImpl : public ::routeguide::RouteGuide::CallbackService {
public:
    ...

    grpc::ServerUnaryReactor *GetFeature(grpc::CallbackServerContext *ctx,
                                         const routeguide::Point *req,
                                         routeguide::Feature *resp) override {

        // Give a nice, thoughtful response
        resp->set_name("whatever");

        // We could have implemented our own reactor, but this is the recommended
        // way to do it in unary methods.
        auto* reactor = ctx->DefaultReactor();
        reactor->Finish(grpc::Status::OK);
        return reactor;
    }

    ...
};

Our callback will be called each time someone requests that RPC. Remember that we don't control the threads. gRPC will use its own thread pool, and we must expect our method to be called potentially many times in parallel. If we use shared resources, we must use a lock or some other synchronization strategy to handle race conditions.
Also remember that callbacks must return immediately. If you need to do some IO or heavy calculations, you must take the request, schedule it on another thread, and then return from the callback. You can call Finish() later. This will of course add complexity to your code. But it is what it is. Those are the rules we have to obey. That was, by the way, also the case in our async implementations.
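To make that concrete, here is a minimal sketch of my own (not from the example code) showing how a unary callback can hand slow work to another thread and call Finish() later. It assumes a detached std::thread is acceptable for the job; a real server would probably use a worker pool. The request and response buffers remain valid until we call Finish(), and Finish() may be called from any thread.

#include <thread>

grpc::ServerUnaryReactor *GetFeature(grpc::CallbackServerContext *ctx,
                                     const routeguide::Point *req,
                                     routeguide::Feature *resp) override {

    auto* reactor = ctx->DefaultReactor();

    // Hand the slow work to another thread and return immediately.
    std::thread([reactor, req, resp] {
        // ... do the slow lookup or IO here ...
        resp->set_name("whatever");

        // Complete the RPC from the worker thread.
        reactor->Finish(grpc::Status::OK);
    }).detach();

    return reactor;
}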
ListFeatures
Here we use the same pattern that the gRPC team uses in their example for the callback interface. We put the implementation of the stream class/workflow inside the callback method override.
    /*! RPC callback event for ListFeatures
     */
    ::grpc::ServerWriteReactor< ::routeguide::Feature>* ListFeatures(
        ::grpc::CallbackServerContext* ctx, const ::routeguide::Rectangle* req) override {

        // Now, we need to keep the state and buffers required to handle the stream
        // until the request is completed. We also need to return from this
        // method immediately.
        //
        // We solve these conflicting requirements by implementing a class that overrides
        // the async gRPC stream interface `ServerWriteReactor`, and then adding our
        // state and buffers there.
        //
        // This class will deal with the request. The method we are in will just
        // allocate an instance of our class on the heap and return.
        //
        // Yes - it's ugly. It's the best the gRPC team have been able to come up with :/
        class ServerWriteReactorImpl
            // Some shared code we need for convenience for all the RPC Request classes
            : public ReqBase<ServerWriteReactorImpl>
            // The interface to the gRPC async stream for this request.
            , public ::grpc::ServerWriteReactor< ::routeguide::Feature> {
        public:
            ServerWriteReactorImpl(CallbackSvc& owner)
                : owner_{owner} {

                // Start replying with the first message on the stream
                reply();
            }

            /*! Callback event when the RPC is completed */
            void OnDone() override {
                done();
            }

            /*! Callback event when a write operation is complete */
            void OnWriteDone(bool ok) override {
                if (!ok) [[unlikely]] {
                    LOG_WARN << "The write-operation failed.";

                    // We still need to call Finish or the request will remain stuck!
                    Finish({grpc::StatusCode::UNKNOWN, "stream write failed"});
                    return;
                }

                reply();
            }

        private:
            void reply() {
                // Reply with the number of messages specified in the config
                if (++replies_ <= owner_.config().num_stream_messages) {
                    reply_.Clear();

                    // Since it's a stream, it makes sense to return different data for each message.
                    reply_.set_name(std::string{"stream-reply #"} + std::to_string(replies_));

                    return StartWrite(&reply_);
                }

                // We have sent all the messages. All is done.
                Finish(grpc::Status::OK);
            }

            CallbackSvc& owner_;
            size_t replies_ = 0;
            ::routeguide::Feature reply_;
        };

        return createNew<ServerWriteReactorImpl>(owner_);
    }

As you can see, the logic is similar to our final async example. But it is simpler. Simple is good. It gives us fewer opportunities to mess up ;)
RecordRoute
Here we read from the stream until it ends (ok == false). Then we reply.
Just as above, the code is simpler than before.
    /*! RPC callback event for RecordRoute
     */
    ::grpc::ServerReadReactor< ::routeguide::Point>* RecordRoute(
        ::grpc::CallbackServerContext* ctx, ::routeguide::RouteSummary* reply) override {

        class ServerReadReactorImpl
            // Some shared code we need for convenience for all the RPC Request classes
            : public ReqBase<ServerReadReactorImpl>
            // The async gRPC stream interface for this RPC
            , public grpc::ServerReadReactor<::routeguide::Point> {
        public:
            ServerReadReactorImpl(CallbackSvc& owner, ::routeguide::RouteSummary* reply)
                : owner_{owner}, reply_{reply} {
                assert(reply_);

                // Initiate the first read operation
                StartRead(&req_);
            }

            /*! Callback event when the RPC is complete */
            void OnDone() override {
                done();
            }

            /*! Callback event when a read operation is complete */
            void OnReadDone(bool ok) override {
                if (ok) {
                    // We have read a message from the request.

                    LOG_TRACE << "Got message: longitude=" << req_.longitude()
                              << ", latitude=" << req_.latitude();

                    req_.Clear();

                    // Initiate the next async read
                    return StartRead(&req_);
                }

                LOG_TRACE << "The read-operation failed. It's probably not an error :)";

                // Let's compose an exciting reply to the client.
                reply_->set_point_count(100);
                reply_->set_distance(300);

                // Note that we set the reply (in the buffer we got from gRPC) and call
                // Finish in one go. We don't have to wait for a callback to acknowledge
                // the write operation.
                Finish(grpc::Status::OK);
                // When the client has received the last bits from us in regard to this
                // RPC, `OnDone()` will be the final event we receive.
            }

        private:
            CallbackSvc& owner_;

            // Our buffer for each of the incoming messages on the stream
            ::routeguide::Point req_;

            // Note that for this RPC type, gRPC owns the reply-buffer.
            // It's a bit inconsistent and confusing, as the interfaces mostly take
            // pointers, but usually our implementation owns the buffers.
            ::routeguide::RouteSummary *reply_ = {};
        };

        // This is all our method actually does. It just creates an instance
        // of the implementation class to deal with the request.
        return createNew<ServerReadReactorImpl>(owner_, reply);
    }

I'm a bit puzzled by the lack of typedefs in the generated headers from protobuf. Instead of using some nice, simple type names, we have to spell out the full template names and arguments in our code. That takes time. In general, all that stuff must be located and then copied and pasted from the generated headers.
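Nothing stops us from defining our own aliases next to the implementation, though. A minimal sketch; the alias names below are mine, not from the generated code:

// Our own shorthand for the generated reactor interfaces.
using FeatureWriter = ::grpc::ServerWriteReactor< ::routeguide::Feature>;
using PointReader   = ::grpc::ServerReadReactor< ::routeguide::Point>;
using RouteChatBidi = ::grpc::ServerBidiReactor< ::routeguide::RouteNote,
                                                 ::routeguide::RouteNote>;

// An override can then be declared with the alias, since it names the same type:
// FeatureWriter* ListFeatures(::grpc::CallbackServerContext* ctx,
//                             const ::routeguide::Rectangle* req) override;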
RouteChat
Again, this is the most complex RPC type that gRPC supports.
To quickly recap the proto definition:
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
When we use the callbacks, we must be prepared for simultaneous events in multiple threads. In our async code, we used only one thread in the event loop, and it handled all the events. So we did not have to make our event handlers thread-safe. With the callbacks, we may not only have new RPCs coming in simultaneously on different threads, we may also have IO events (read/write) on a bidirectional stream coming in simultaneously on different threads. So be careful if you have non-const data shared by the event handlers.
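For example, if the handlers update shared statistics, the shared state needs a mutex (or atomics). A minimal sketch, using a hypothetical shared object of my own design, not part of the example code:

#include <mutex>

// Hypothetical shared state, touched by reactor callbacks
// running on different threads.
class SharedStats {
public:
    void incrMessages() {
        std::lock_guard lock{mutex_};
        ++messages_;
    }

    size_t messages() const {
        std::lock_guard lock{mutex_};
        return messages_;
    }

private:
    mutable std::mutex mutex_;
    size_t messages_ = 0;
};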
This implementation borrows the core idea for how to do a "bidi" stream from our async code. It uses read() and write() methods to initiate the async operations. Then it overrides the IO event handlers in the ServerBidiReactor<> interface to handle the completions.
As before, we don't call Finish() until we are done reading and writing. That is handled by finishIfDone().
    /*! RPC callback event for RouteChat
     */
    ::grpc::ServerBidiReactor< ::routeguide::RouteNote, ::routeguide::RouteNote>*
    RouteChat(::grpc::CallbackServerContext* ctx) override {

        class ServerBidiReactorImpl
            // Some shared code we need for convenience for all the RPC Request classes
            : public ReqBase<ServerBidiReactorImpl>
            // The async gRPC stream interface for this RPC
            , public grpc::ServerBidiReactor<::routeguide::RouteNote, ::routeguide::RouteNote> {
        public:
            ServerBidiReactorImpl(CallbackSvc& owner)
                : owner_{owner} {

                /* There are multiple ways to handle the message-flow in a bidirectional stream.
                 *
                 * One party can send the first message, and the other party can respond with a message,
                 * until one or both parties gets bored.
                 *
                 * Both parties can wait for some event to occur, and send a message when appropriate.
                 *
                 * One party can send occasional updates (for example location data) and the other
                 * party can respond with one or more messages when appropriate.
                 *
                 * Both parties can start sending messages as soon as the connection is made.
                 * That's what we are doing (or at least preparing for) in this example.
                 */

                read();   // Initiate the read for the first incoming message
                write();  // Initiate the first write operation.
            }

            /*! Callback event when the RPC is complete */
            void OnDone() override {
                done();
            }

            /*! Callback event when a read operation is complete */
            void OnReadDone(bool ok) override {
                if (!ok) {
                    LOG_TRACE << me() << "- The read-operation failed. It's probably not an error :)";
                    done_reading_ = true;
                    return finishIfDone();
                }

                LOG_TRACE << "Incoming message: " << req_.message();
                read();
            }

            /*! Callback event when a write operation is complete */
            void OnWriteDone(bool ok) override {
                if (!ok) [[unlikely]] {
                    // The operation failed.
                    LOG_WARN << "The write-operation failed.";

                    // When ok is false here, we will not be able to write
                    // anything on this stream.
                    done_writing_ = true;

                    // This RPC call did not end well
                    status_ = {grpc::StatusCode::UNKNOWN, "write failed"};

                    return finishIfDone();
                }

                write();
            }

        private:
            void read() {
                // Start a new read
                req_.Clear();
                StartRead(&req_);
            }

            void write() {
                if (++replies_ > owner_.config().num_stream_messages) {
                    done_writing_ = true;

                    LOG_TRACE << me() << " - We are done writing to the stream.";
                    return finishIfDone();
                }

                // Prepare a new message to the stream
                reply_.Clear();

                // Shout something nice to the other side
                reply_.set_message(std::string{"Server Message #"} + std::to_string(replies_));

                // Start a new write on the stream
                StartWrite(&reply_);
            }

            void finishIfDone() {
                if (!sent_finish_ && done_reading_ && done_writing_) {
                    LOG_TRACE << me() << " - We are done reading and writing. Sending finish!";
                    Finish(status_);
                    sent_finish_ = true;
                    return;
                }
            }

            CallbackSvc& owner_;
            ::routeguide::RouteNote req_;
            ::routeguide::RouteNote reply_;
            grpc::Status status_;
            size_t replies_ = 0;
            bool done_reading_ = false;
            bool done_writing_ = false;
            bool sent_finish_ = false;
        };

        auto instance = createNew<ServerBidiReactorImpl>(owner_);
        LOG_TRACE << instance->me()
                  << " - Starting new bidirectional stream conversation with "
                  << ctx->peer();
        return instance;
    }

Even the bidi RPC is notably simpler than the async version. Things will undoubtedly be messier in a real use-case where we deal with actual functionality, and probably need to both queue outgoing messages (since we can only send one at a time) and relay requests and responses through our own worker-threads so the server can tolerate the delays caused by disk or network latency and database queries. But the code required to deal with RPCs and streams is now down to a level where it's manageable ;)
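To give an idea of what such queuing might look like, here is a minimal sketch of my own design (not from the example code), assuming a bidi reactor where any thread can enqueue an outgoing message:

#include <deque>
#include <mutex>

// Hypothetical sketch of queuing outgoing messages on a bidi stream,
// since only one StartWrite() may be in flight at a time.
class QueuedBidiReactor
    : public grpc::ServerBidiReactor<::routeguide::RouteNote, ::routeguide::RouteNote> {
public:
    // May be called from any thread, for example a worker that produced a result.
    void sendMessage(::routeguide::RouteNote msg) {
        std::lock_guard lock{mutex_};
        queue_.push_back(std::move(msg));
        if (!write_in_flight_) {
            startNextWriteLocked();
        }
    }

    /*! Callback event when a write operation is complete */
    void OnWriteDone(bool ok) override {
        std::lock_guard lock{mutex_};
        write_in_flight_ = false;
        if (ok && !queue_.empty()) {
            startNextWriteLocked();
        }
    }

    /*! Callback event when the RPC is complete */
    void OnDone() override {
        delete this;
    }

private:
    // Must be called with mutex_ held.
    void startNextWriteLocked() {
        current_ = std::move(queue_.front());
        queue_.pop_front();
        write_in_flight_ = true;

        // The buffer must remain valid until OnWriteDone() fires.
        StartWrite(&current_);
    }

    std::mutex mutex_;
    std::deque<::routeguide::RouteNote> queue_;
    ::routeguide::RouteNote current_;
    bool write_in_flight_ = false;
};

The key point is that StartWrite() is only called when no other write is in flight, and the message buffer stays owned by the reactor until OnWriteDone() confirms the write.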