What we have covered in the previous parts is all that some RPC frameworks can offer. One of the cool things about gRPC is that, in addition to a single request and reply, it can also handle streams of messages. In this article we will look at an async server that implements three RPC calls:
- GetFeature() as before.
- ListFeatures() takes a request and returns a stream of messages.
- RecordRoute() takes a stream of messages and returns a single reply when the last message in the stream is received.
So, compared to the first iteration, what's new is how to deal with multiple request types and a stream in one direction.
The proto file looks like:
    syntax = "proto3";

    package routeguide;

    // Interface exported by the server.
    service RouteGuide {
      rpc GetFeature(Point) returns (Feature) {}
      rpc ListFeatures(Rectangle) returns (stream Feature) {}
      rpc RecordRoute(stream Point) returns (RouteSummary) {}
    }

    message Point {
      int32 latitude = 1;
      int32 longitude = 2;
    }

    message Rectangle {
      Point lo = 1;
      Point hi = 2;
    }

    // A feature names something at a given point.
    message Feature {
      string name = 1;
      Point location = 2;
    }

    message RouteSummary {
      int32 point_count = 1;
      int32 feature_count = 2;
      int32 distance = 3;
      int32 elapsed_time = 4;
    }

You can see the full proto file with comments here.
Now, the first thing we must do is to generalize a bit. We'll try to keep the event-loop as simple as before, and we'll try to avoid copying and pasting code around to facilitate the various requests.
In order to use the this pointer of a request implementation as the tag, and to allow the event-loop to effortlessly call proceed(), we'll start by creating a base-class for the requests. I have also added a reference to the server class, so we can easily pass information all the way from the command line to the instance of a request-handler.
    /*! Base class for requests
     *
     * In order to use `this` as a tag and avoid any special processing in the
     * event-loop, the simplest approach in C++ is to let the request implementations
     * inherit from a base-class that contains the shared code they all need, and
     * a pure virtual method for the state-machine.
     */

As you can see, it's quite simple. The only things we need to implement in the derived classes are the constructor and proceed().
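The idea of using `this` as the tag can be tried out without gRPC at all. The following is a minimal sketch, not the article's actual base class: `Server`, `HelloRequest`, `Event`, `tag()` and `runEventLoop()` are hypothetical stand-ins, and the `std::queue` only imitates the completion queue. It shows how an event-loop can dispatch to any request type through one pure virtual `proceed()`.

```cpp
#include <cassert>
#include <queue>
#include <string>
#include <vector>

// Hypothetical stand-in for the server class that owns the requests.
struct Server {
    std::vector<std::string> log;
};

// Base class: shared state plus a pure virtual state-machine entry point.
class RequestBase {
public:
    explicit RequestBase(Server& owner) : owner_{owner} {}
    virtual ~RequestBase() = default;

    // Called by the event-loop each time an operation tagged with this
    // object completes.
    virtual void proceed(bool ok) = 0;

    // The tag is always produced from the base-class pointer, so the
    // event-loop can safely cast it back to RequestBase*.
    void* tag() { return static_cast<void*>(this); }

protected:
    Server& owner_;  // lets a handler reach server-wide information
};

// A trivial derived request, only here to give the loop something to call.
class HelloRequest : public RequestBase {
public:
    using RequestBase::RequestBase;

    void proceed(bool ok) override {
        owner_.log.push_back(ok ? "hello: ok" : "hello: failed");
    }
};

// Imitation of a completion-queue event: an opaque tag and a status flag.
struct Event {
    void* tag;
    bool ok;
};

// The event-loop needs no knowledge of the concrete request types.
inline void runEventLoop(std::queue<Event>& events) {
    while (!events.empty()) {
        const Event ev = events.front();
        events.pop();
        static_cast<RequestBase*>(ev.tag)->proceed(ev.ok);
    }
}
```

Producing the tag inside the base class is a deliberate choice in this sketch: the pointer that goes into the queue and the pointer the loop casts back to are then of the same type.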
GetFeature
The implementation of the GetFeature RPC call is almost exactly as it was in our first iteration.
We derive from the base class, declare our State enum and then we initialize with gRPC as before in the constructor.
    /*! Implementation for the `GetFeature()` RPC call. */

The only significant change in proceed() is the call to create a new instance to deal with the next request. Since we have more than one type, I have refactored createNew() to be a template method in the server class. We also call Clear() on the reply_ variable before re-use. That's required to avoid unintentionally leaking information from a previous stream-message to a new stream-message.
    ...
    // Before we do anything else, we must create a new instance
    // so the service can handle a new request from a client.
    createNew<ListFeaturesRequest>(...);
    ...
    // Prepare the reply-object to be re-used.
    // This is usually cheaper than creating a new one for each write operation.
    reply_.Clear();

Our new server implementation is the class UnaryAndSingleStreamSvc. It's quite similar to the class SimpleReqRespSvc from our first server-iteration. The event-loop is identical.
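To make the two points about createNew() and Clear() concrete, here is a small gRPC-free sketch. Everything in it is an assumption made for illustration: `Server`, `DemoRequest`, `FeatureReply`, `accept()` and `prepare()` are hypothetical names, and `FeatureReply` only imitates a generated protobuf message (which does provide `Clear()`). The real createNew() takes whatever arguments the real request constructors need.

```cpp
#include <cassert>
#include <string>

// Stand-in for a protobuf message with a field that is not always set.
struct FeatureReply {
    std::string name;
    std::string note;  // only filled in for some messages

    void Clear() {
        name.clear();
        note.clear();
    }
};

class Server {
public:
    // One factory for every request type. T only has to be constructible
    // from a Server&. The handler deletes itself when it is done.
    template <typename T>
    T* createNew() {
        ++created_;
        return new T(*this);
    }

    int created() const { return created_; }

private:
    int created_ = 0;
};

class DemoRequest {
public:
    explicit DemoRequest(Server& owner) : owner_{owner} {}

    // First step when a client arrives: create the successor that will
    // wait for the next client.
    DemoRequest* accept() { return owner_.createNew<DemoRequest>(); }

    // Re-use the same reply object for each outgoing message. Without the
    // Clear() call, `note` from an earlier message would leak into a later
    // message that does not set it.
    const FeatureReply& prepare(const std::string& name,
                                const std::string& note = {}) {
        reply_.Clear();
        reply_.name = name;
        if (!note.empty()) {
            reply_.note = note;
        }
        return reply_;
    }

    void done() { delete this; }

private:
    Server& owner_;
    FeatureReply reply_;
};
```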
ListFeatures
Let's take a look at how ListFeaturesRequest, the request-handler for ListFeatures(), is implemented.

    /*! Implementation for the `ListFeatures()` RPC call.
     *
     * This is a bit more advanced. We receive a normal request message,
     * but the reply is a stream of messages.
     */

As you may notice, it has an additional state compared to the simpler class GetFeatureRequest. That's because it may send many messages in response to a request, one at a time, until it sends the finish message. It also has a different type for the reply, a ServerAsyncWriter instead of a ServerAsyncResponseWriter. This allows us to write as many messages to the client over the reply-stream as we wish. Some servers, for example a "Breaking News" service, may never enter the Finish state.
As I promised before, the streams add complexity to the state-machine. This is the proceed() implementation for ListFeaturesRequest.
    // State-machine to deal with a single request
    // This works almost like a co-routine, where we work our way down for each
    // time we are called. The State_ could just as well have been an integer/counter;

Reading through the code above, keep in mind that the calls to Write() and Finish() just initiate an asynchronous operation. The methods return immediately, and the result will arrive as new events on the queue when they are completed.
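The flow of such a state-machine can be simulated without gRPC. The sketch below is an illustration under assumptions, not the article's proceed(): `FakeQueue`, `ListFeaturesSketch` and `drain()` are hypothetical, the state names REPLYING, FINISHING and DONE are my own labels, and `write()`/`finish()` only imitate asynchronous calls by posting a completion event that is delivered to proceed() later.

```cpp
#include <cassert>
#include <queue>
#include <string>
#include <vector>

// Imitation of the completion queue. Each "async" call posts one event
// that the loop later hands back to proceed(). Not a gRPC type.
struct FakeQueue {
    std::queue<bool> events;
    std::vector<std::string> sent;  // what would have gone to the client
};

class ListFeaturesSketch {
public:
    enum class State { CREATED, REPLYING, FINISHING, DONE };

    ListFeaturesSketch(FakeQueue& queue, int total)
        : queue_{queue}, total_{total} {
        queue_.events.push(true);  // imitates a client calling the RPC
    }

    void proceed(bool ok) {
        switch (state_) {
        case State::CREATED:
            if (!ok) { state_ = State::DONE; break; }
            state_ = State::REPLYING;
            writeNextOrFinish();
            break;
        case State::REPLYING:
            // Called once for every completed write.
            if (!ok) { state_ = State::DONE; break; }
            writeNextOrFinish();
            break;
        case State::FINISHING:
        case State::DONE:
            // The finish operation completed; a real handler would now
            // delete itself.
            state_ = State::DONE;
            break;
        }
    }

    State state() const { return state_; }

private:
    // Start exactly one operation, then return and wait for its event.
    void writeNextOrFinish() {
        if (written_ < total_) {
            write("feature " + std::to_string(++written_));
        } else {
            state_ = State::FINISHING;
            finish();
        }
    }

    void write(const std::string& msg) {
        queue_.sent.push_back(msg);
        queue_.events.push(true);
    }

    void finish() {
        queue_.sent.push_back("<finish>");
        queue_.events.push(true);
    }

    FakeQueue& queue_;
    int total_;
    int written_ = 0;
    State state_ = State::CREATED;
};

// Minimal event-loop for a single request.
inline void drain(FakeQueue& queue, ListFeaturesSketch& request) {
    while (!queue.events.empty()) {
        const bool ok = queue.events.front();
        queue.events.pop();
        request.proceed(ok);
    }
}
```

Note that only one operation is started per call to proceed() in this sketch; the next message is written when the event for the previous one arrives.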
RecordRoute
Finally in this article, we will look at the implementation of a request with an incoming stream and a single, normal reply. The proto definition for this request looks like this:
    rpc RecordRoute(stream Point) returns (RouteSummary) {}

As you can see from the implementation, it looks pretty similar to the previous one. We have a ::grpc::ServerAsyncReader instead of a ::grpc::ServerAsyncWriter, and we call RequestRecordRoute() to start the request-flow.
    /*! Implementation for the `RecordRoute()` RPC call.
     *
     * This is a bit more advanced. We receive a stream of messages,
     * and the reply is a single, normal message.
     */

The state-machine is also similar to the previous example. The big difference is that we don't know how many messages to read before the client is done and we can reply.
We started by initiating the RecordRoute request in our constructor. Then, when we have a client calling this method and the state-machine gets its first invocation (in state CREATED), we initiate the first Read() operation. Then we basically loop by initiating a new read operation each time the state-machine gets called with ok == true. When ok != true, we initiate the Finish() operation, and when proceed() is called next, we call done() to delete this instance of the request object.
As you may notice, we handle a new message, the end of the message-stream, and the initiation of Finish() all in the State::READING code-block.
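The read loop described above can also be imitated without gRPC. This is a sketch under stated assumptions: `FakeStream` and `RecordRouteSketch` are hypothetical, the `Point` struct only mirrors the proto message, the FINISHING and DONE state names are my own, and `read()` posts `ok == false` when the imitated client has no more messages.

```cpp
#include <cassert>
#include <deque>
#include <queue>

struct Point {
    int latitude;
    int longitude;
};

// Imitation of the client's stream plus the completion queue. Not gRPC.
struct FakeStream {
    std::deque<Point> incoming;  // messages the client will send
    std::queue<bool> events;     // completion events for proceed()
};

class RecordRouteSketch {
public:
    enum class State { CREATED, READING, FINISHING, DONE };

    explicit RecordRouteSketch(FakeStream& stream) : stream_{stream} {
        stream_.events.push(true);  // imitates a client starting the RPC
    }

    void proceed(bool ok) {
        switch (state_) {
        case State::CREATED:
            if (!ok) { state_ = State::DONE; break; }
            state_ = State::READING;
            read();  // initiate the first read
            break;
        case State::READING:
            if (ok) {
                ++pointCount_;  // a message arrived
                read();         // ask for the next one
            } else {
                // The client is done sending: send the single reply.
                state_ = State::FINISHING;
                finish();
            }
            break;
        case State::FINISHING:
        case State::DONE:
            state_ = State::DONE;  // a real handler would call done() here
            break;
        }
    }

    // Runs a minimal event-loop until no events are left.
    void drain() {
        while (!stream_.events.empty()) {
            const bool ok = stream_.events.front();
            stream_.events.pop();
            proceed(ok);
        }
    }

    State state() const { return state_; }
    int pointCount() const { return pointCount_; }
    bool replied() const { return replied_; }

private:
    void read() {
        if (stream_.incoming.empty()) {
            stream_.events.push(false);  // end of the stream
            return;
        }
        stream_.incoming.pop_front();
        stream_.events.push(true);
    }

    void finish() {
        replied_ = true;
        stream_.events.push(true);
    }

    FakeStream& stream_;
    State state_ = State::CREATED;
    int pointCount_ = 0;
    bool replied_ = false;
};
```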
In this code I don't really care about the data in the request or what we send in reply. The focus is entirely on the boilerplate code required in order to use the async interface for gRPC. If you implement something using async gRPC, your focus will probably be on the correct implementation of your RPC requests. That means that your code will probably be an order of magnitude more complex than this. I would recommend that you put your own logic entirely in classes or modules dedicated to that, and call from those into the gRPC classes or from the gRPC classes into your interfaces. Even that will probably end up as significantly more complex code than what I present here.
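One possible way to keep the business logic apart from the gRPC boilerplate is sketched below. It is only an illustration of the recommendation, with hypothetical names throughout (`RouteRecorder`, `InMemoryRouteRecorder`, `RecordRouteAdapter`), and the `Point` and `RouteSummary` structs are plain stand-ins for the generated protobuf types.

```cpp
#include <cassert>
#include <vector>

struct Point {
    int latitude;
    int longitude;
};

struct RouteSummary {
    int point_count = 0;
};

// Business logic behind an interface that knows nothing about gRPC.
class RouteRecorder {
public:
    virtual ~RouteRecorder() = default;
    virtual void addPoint(const Point& point) = 0;
    virtual RouteSummary summary() const = 0;
};

class InMemoryRouteRecorder : public RouteRecorder {
public:
    void addPoint(const Point& point) override { points_.push_back(point); }

    RouteSummary summary() const override {
        RouteSummary result;
        result.point_count = static_cast<int>(points_.size());
        return result;
    }

private:
    std::vector<Point> points_;
};

// The part that would live in the request handler: it only forwards
// events from the state-machine to the logic object.
class RecordRouteAdapter {
public:
    explicit RecordRouteAdapter(RouteRecorder& logic) : logic_{logic} {}

    void onMessage(const Point& point) { logic_.addPoint(point); }
    RouteSummary onStreamEnd() const { return logic_.summary(); }

private:
    RouteRecorder& logic_;
};
```

With a split like this, the logic class can be unit-tested without a running server, and the state-machine stays as small as the ones shown in this article.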