One of the lessons learned from the previous articles is that there is a lot of repeated code to handle rpc requests. To reduce the repetition a bit, we will start this iteration by creating a templated base class for everything. We have realized that the event-loop in the server and the clients looks the same. So why not use the same code for both?
Of course, there are complications. The server code uses different variables than the client, and everything is based on code generated by the command-line utility protoc - based on our unique proto file.
To deal with this, the "everything"-base will consist of two parts:
A template class that contains the event-loop, a base class for the Requests, and a more mature Handle. The aim is to reduce the boilerplate code for an actual Request to a minimum - while still keeping the Request implementation simple to understand.
A template class that is different for servers and clients, but provides a uniform interface to the actual implementation. This class also takes a template argument for the code generated to implement our proto interface.
Simplified, it looks something like this:
template <class grpcT> struct ServerVars {};
template <class grpcT> struct ClientVars {};

template <typename T>
struct EventLoopBase {

    struct RequestBase {
        struct Handle {
            void proceed(bool ok); // state-machine
        }; // Handle
    }; // RequestBase

    void run(); // event-loop
}; // EventLoopBase

...

// Instantiate and run a server
EventLoopBase<ServerVars<::routeguide::RouteGuide>> server;
server.run();

// Instantiate and run a client
EventLoopBase<ClientVars<::routeguide::RouteGuide>> client;
client.run();
This gives us the opportunity to carefully write and test much of the code that would otherwise be copied and pasted around. If bugs are caught later on, we can fix them in one place. If we need to optimize our code, we can also do that in one place. All in all, a much better design than lots of almost repeated code ;)
The EventLoopBase implementation
Let's start with an outline of the EventLoopBase class.
template <typename T>
class EventLoopBase {
public:
    EventLoopBase(const Config& config)
        : config_{config} {}

    template <typename reqT, typename parenT>
    void createNew(parenT& parent) {

        // Use make_unique, so we destroy the object if it throws an exception
        // (for example out of memory).
        try {
            auto instance = std::make_unique<reqT>(parent);

            // If we got here, the instance should be fine, so let it handle itself.
            instance.release();
        } catch (const std::exception& ex) {
            LOG_ERROR << "Got exception while creating a new instance. "
                      << "This may end my ability to handle any further requests. "
                      << " Error: " << ex.what();
        }
    }

    void stop() {
        grpc_.stop();
    }

    auto& grpc() {
        return grpc_;
    }

    auto* cq() noexcept {
        return grpc_.cq();
    }

    const auto& config() noexcept {
        return config_;
    }

protected:
    const Config& config_;
    size_t num_open_requests_ = 0;
    T grpc_;
};
Note that we have a factory method createNew to create new instances of any one of our various requests. We also forward some methods to the template member grpc_. This gives us the flexibility to implement T as we want, as long as it provides the expected stop() and cq() methods. In the server, the actual queue is a std::unique_ptr<grpc::ServerCompletionQueue>, as required by gRPC. In the client, the type we use is ::grpc::CompletionQueue. By adding the auto * cq() method to both ServerVars and ClientVars, this implementation detail is hidden from the rest of our code. Also, auto * cq() will return a ServerCompletionQueue * in the server instantiation and a CompletionQueue * in the client instantiation - which is exactly what we require.
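To make that last point concrete, here is a minimal sketch (my own illustration, assuming the ServerVars and ClientVars shown later in this article) that verifies the deduced return types at compile time:

#include <type_traits>
#include <utility>

using ServerLoop = EventLoopBase<ServerVars<::routeguide::RouteGuide>>;
using ClientLoop = EventLoopBase<ClientVars<::routeguide::RouteGuide>>;

// auto* follows the template argument: ServerVars exposes a ServerCompletionQueue*,
// ClientVars a plain CompletionQueue*.
static_assert(std::is_same_v<decltype(std::declval<ServerLoop&>().cq()),
                             ::grpc::ServerCompletionQueue*>);
static_assert(std::is_same_v<decltype(std::declval<ClientLoop&>().cq()),
                             ::grpc::CompletionQueue*>);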
The event-loop is used for both the server and the client.
/*! Runs the event-loop
 *
 * The method returns when/if the loop is finished
 *
 * createNew<T> must have been called on all request-types that are used
 * prior to this call.
 */
void run() {
    // The inner event-loop
    while (num_open_requests_) {
        bool ok = true;
        void *tag = {};

        // FIXME: This is crazy. Figure out how to use stable clock!
        const auto deadline = std::chrono::system_clock::now()
                              + std::chrono::milliseconds(1000);

        // Get any IO operation that is ready.
        const auto status = cq()->AsyncNext(&tag, &ok, deadline);

        // So, here we deal with the first of the three states: The status from Next().
        switch (status) {
        case grpc::CompletionQueue::NextStatus::TIMEOUT:
            LOG_TRACE << "AsyncNext() timed out.";
            continue;

        case grpc::CompletionQueue::NextStatus::GOT_EVENT:
            {
                auto request = static_cast<typename RequestBase::Handle *>(tag);

                // Now, let the RequestBase::Handle state-machine deal with the event.
                request->proceed(ok);
            }
            break;

        case grpc::CompletionQueue::NextStatus::SHUTDOWN:
            LOG_INFO << "SHUTDOWN. Tearing down the gRPC connection(s) ";
            return;
        } // switch
    } // loop
}
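About the FIXME above: one possible approach (an assumption on my part, not what the code above does) is to build the deadline with gRPC's own time helpers from <grpc/support/time.h>, which can use a monotonic clock, and pass the resulting gpr_timespec straight to AsyncNext():

// Inside run(): the same loop body, but with a deadline built from gRPC's
// monotonic clock helpers instead of std::chrono::system_clock.
bool ok = true;
void *tag = {};

const auto deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
                                   gpr_time_from_millis(1000, GPR_TIMESPAN));

// AsyncNext() also accepts a gpr_timespec deadline.
const auto status = cq()->AsyncNext(&tag, &ok, deadline);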
In a server implementation we will always add a new request-handler when we start processing a new rpc request. So for the server, num_open_requests_ will always be >= the number of rpc request types we implement. In the client, we need to pre-load the requests we want to start with, and then add more as appropriate. When all the requests are finished, run() returns. If we wanted to use the client in a server app, for example a web-service or cache that used gRPC to fetch data occasionally, we could start a new client on demand, change while(num_open_requests_) to while(true), or just increment num_open_requests_ by one before we start (like adding "work" to asio's io_context).
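A minimal sketch of that last idea (the class name is hypothetical, just for illustration):

// A long-lived client event-loop that never runs out of "work".
class LongLivedClient : public EventLoopBase<ClientVars<::routeguide::RouteGuide>> {
public:
    LongLivedClient(const Config& config)
        : EventLoopBase(config) {

        // One extra "open request" that is never finished, so run() keeps
        // looping even when no real requests are in flight - much like a
        // work-guard on asio's io_context.
        ++num_open_requests_;
    }
};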
Our new RequestBase is the base for both server-side and client-side requests.
class RequestBase {
public:
    RequestBase(EventLoopBase& owner)
        : owner_{owner} {
        ++owner.num_open_requests_;
        LOG_TRACE << "Constructed request #" << client_id_ << " at address " << this;
    }

    virtual ~RequestBase() {
        --owner_.num_open_requests_;
    }

    template <typename reqT>
    static std::string me(reqT& req) {
        return boost::typeindex::type_id_runtime(req).pretty_name()
               + " #" + std::to_string(req.client_id_);
    }

protected:
    // The state required for all requests
    EventLoopBase& owner_;
    int ref_cnt_ = 0;
    const size_t client_id_ = getNewClientId();

private:
    void done() {
        // Ugly, ugly, ugly
        LOG_TRACE << "If the program crash now, it was a bad idea to delete this ;) #"
                  << client_id_ << " at address " << this;
        delete this;
    }
}; // RequestBase
So far, pretty simple code. The most interesting method may be me(), which is a general implementation of a method we used before to return the class name and request id for a request. This is very useful in log events.
Now, let's examine the new Handle implementation. This one is a bit different.
class RequestBase {
public:
    class Handle {
    public:
        // In this implementation, the operation is informative.
        // It has no side-effects.
        enum Operation {
            INVALID,
            CONNECT,
            READ,
            WRITE,
            WRITE_DONE,
            FINISH
        };

        using proceed_t = std::function<void(bool ok, Operation op)>;

        Handle(RequestBase& instance)
            : base_{instance} {}

        [[nodiscard]] void *tag(Operation op, proceed_t&& fn) noexcept {
            assert(op_ == Operation::INVALID);

            LOG_TRACE << "Handle::proceed() - " << base_.client_id_
                      << " initiating " << name(op) << " operation.";

            op_ = op;
            proceed_ = std::move(fn);
            return tag_();
        }

        void proceed(bool ok) {
            --base_.ref_cnt_;

            // We must reset the `op_` type before we call `proceed_`
            // See the comment below regarding `proceed()`.
            const auto current_op = op_;
            op_ = Operation::INVALID;

            if (proceed_) {
                // Move `proceed` to the stack.
                // There is a good probability that `proceed()` will call `tag()`,
                // which will overwrite the current value in the Handle's instance.
                auto proceed = std::move(proceed_);
                proceed(ok, current_op);
            }

            if (base_.ref_cnt_ == 0) {
                base_.done();
            }
        }

    private:
        [[nodiscard]] void *tag_() noexcept {
            ++base_.ref_cnt_;
            return this;
        }

        RequestBase& base_;
        Operation op_ = Operation::INVALID;
        proceed_t proceed_;
    }; // Handle
};
There are two major changes. First, we use a functor to directly implement the state-machine in response to an event. Second, we re-use the handle when appropriate. Normally we only need one or two handles at any one time, so it makes little sense to have up to four in our request implementation. We still have the Operation enum, but now it's just there to provide accurate information to log-events - or to get some useful meta-information in the debugger. tag() takes a functor, normally a lambda, to be executed when the async operation is completed.
Finally, let's take a brief look at the ServerVars and ClientVars implementations.
template <typename grpcT>
struct ServerVars {
    // An instance of our service, compiled from code generated by protoc
    typename grpcT::AsyncService service_;

    // This is the Queue. It's shared for all the requests.
    std::unique_ptr<grpc::ServerCompletionQueue> cq_;

    [[nodiscard]] auto* cq() noexcept {
        assert(cq_);
        return cq_.get();
    }

    // A gRPC server object
    std::unique_ptr<grpc::Server> server_;

    void stop() {
        LOG_INFO << "Shutting down ";
        server_->Shutdown();
        server_->Wait();
    }
};
template <typename grpcT>
struct ClientVars {
    // This is the Queue. It's shared for all the requests.
    ::grpc::CompletionQueue cq_;

    [[nodiscard]] auto* cq() noexcept {
        return &cq_;
    }

    // This is a connection to the gRPC server
    std::shared_ptr<grpc::Channel> channel_;

    // An instance of the client that was generated from our .proto file.
    std::unique_ptr<typename grpcT::Stub> stub_;

    void stop() {
        // We don't stop the client...
        assert(false);
    }
};
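For completeness, here is a sketch (my assumption, not code from this article) of how a client-side event-loop could wire up ClientVars' channel_ and stub_, mirroring what EverythingSvr does for the server below:

class EverythingClient
    : public EventLoopBase<ClientVars<::routeguide::RouteGuide>> {
public:
    EverythingClient(const Config& config)
        : EventLoopBase(config) {

        // Connect to the server and create the generated client stub.
        grpc_.channel_ = grpc::CreateChannel(config.address,
                                             grpc::InsecureChannelCredentials());
        grpc_.stub_ = ::routeguide::RouteGuide::NewStub(grpc_.channel_);
    }
};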
That's the generalization of the boilerplate code.
Before we get obsessed with the implementation details of the four Requests, let's look briefly at the implementation for the outer event-loop class.
class EverythingSvr
    : public EventLoopBase<ServerVars<::routeguide::RouteGuide>> {
public:
    EverythingSvr(const Config& config)
        : EventLoopBase(config) {

        grpc::ServerBuilder builder;
        builder.AddListeningPort(config_.address, grpc::InsecureServerCredentials());
        builder.RegisterService(&grpc_.service_);
        grpc_.cq_ = builder.AddCompletionQueue();

        // Finally assemble the server.
        grpc_.server_ = builder.BuildAndStart();

        LOG_INFO
            // Fancy way to print the class-name.
            // Useful when I copy/paste this code around ;)
            << boost::typeindex::type_id_runtime(*this).pretty_name()

            // The useful information
            << " listening on " << config_.address;

        // Prepare the first instances of request handlers
        createNew<GetFeatureRequest>(*this);
        createNew<ListFeaturesRequest>(*this);
        createNew<RecordRouteRequest>(*this);
        createNew<RouteChatRequest>(*this);
    }
};
As before in the server, we use a builder to set up the required variables and start the gRPC server. At the lowest level, this is an HTTP/2 server listening on the host address/IP and port number we specified.
Then we create one instance of each Request type we need to handle the four rpc requests in our proto file. These will be used to handle the first incoming rpc call of each type. In our implementations below, we create new instances as required to keep the gRPC service responsive. If we forget that, the client side will see its call "connect" normally - but the Finish event will never be triggered.
GetFeature
class GetFeatureRequest : public RequestBase {
public:
    GetFeatureRequest(EverythingSvr& owner)
        : RequestBase(owner) {

        // Register this instance with the event-queue and the service.
        // The first event received over the queue is that we have a request.
        owner_.grpc().service_.RequestGetFeature(&ctx_, &req_, &resp_, cq(), cq(),
            op_handle_.tag(Handle::Operation::CONNECT,
            [this, &owner](bool ok, Handle::Operation /* op */) {

                LOG_DEBUG << me(*this) << " - Processing a new connect from " << ctx_.peer();

                if (!ok) [[unlikely]] {
                    // The operation failed.
                    // Let's end it here.
                    LOG_WARN << "The request-operation failed. Assuming we are shutting down";
                    return;
                }

                // Before we do anything else, we must create a new instance of
                // GetFeatureRequest, so the service can handle a new request from a client.
                owner_.createNew<GetFeatureRequest>(owner);

                // This is where we have the request, and may formulate an answer.
                // If this was code for a framework, this is where we would have called
                // the `onRpcRequestGetFeature()` method, or unblocked the next statement
                // in a co-routine waiting for the next request.
                //
                // In our case, let's just return something.
                reply_.set_name("whatever");
                reply_.mutable_location()->CopyFrom(req_);

                // Initiate our next async operation.
                // That will complete when we have sent the reply, or replying failed.
                resp_.Finish(reply_, ::grpc::Status::OK,
                    op_handle_.tag(Handle::Operation::FINISH,
                    [this](bool ok, Handle::Operation /* op */) {

                        if (!ok) [[unlikely]] {
                            LOG_WARN << "The finish-operation failed.";
                        }

                    })); // FINISH operation lambda
            })); // CONNECT operation lambda
    }

private:
    Handle op_handle_{*this}; // We need only one handle for this operation.

    ::grpc::ServerContext ctx_;
    ::routeguide::Point req_;
    ::routeguide::Feature reply_;
    ::grpc::ServerAsyncResponseWriter<decltype(reply_)> resp_{&ctx_};
};
GetFeatureRequest was never very complicated. What we have done here is to remove the proceed() override and the switch statement on the current operation, and instead add a lambda expression as an argument to the handle's tag() method. Personally I find this code easier to read. It's still not a coroutine, but the flow of logic is presented in a way that makes it simple to read and comprehend.
ListFeaturesRequest
Remember, in ListFeatures we return a stream of messages. Here we use lambdas for the logic that is unique to a specific operation, and reply() to handle the shared reply logic, both from the connect event and from the write-completed event.
class ListFeaturesRequest : public RequestBase {
public:
    ListFeaturesRequest(EverythingSvr& owner)
        : RequestBase(owner) {

        owner_.grpc().service_.RequestListFeatures(&ctx_, &req_, &resp_, cq(), cq(),
            op_handle_.tag(Handle::Operation::CONNECT,
            [this, &owner](bool ok, Handle::Operation /* op */) {

                LOG_DEBUG << me(*this) << " - Processing a new connect from " << ctx_.peer();

                if (!ok) [[unlikely]] {
                    // The operation failed.
                    // Let's end it here.
                    LOG_WARN << "The request-operation failed. Assuming we are shutting down";
                    return;
                }

                // Before we do anything else, we must create a new instance
                // so the service can handle a new request from a client.
                owner_.createNew<ListFeaturesRequest>(owner);

                reply();
            }));
    }

private:
    void reply() {
        if (++replies_ > owner_.config().num_stream_messages) {
            // We have reached the desired number of replies
            resp_.Finish(::grpc::Status::OK,
                op_handle_.tag(Handle::Operation::FINISH,
                [this](bool ok, Handle::Operation /* op */) {

                    if (!ok) [[unlikely]] {
                        // The operation failed.
                        LOG_WARN << "The finish-operation failed.";
                    }
                }));
            return;
        }

        // This is where we have the request, and may formulate another answer.
        // If this was code for a framework, this is where we would have called
        // the `onRpcRequestListFeaturesOnceAgain()` method, or unblocked the next statement
        // in a co-routine awaiting the next state-change.
        //
        // In our case, let's just return something.

        // Prepare the reply-object to be re-used.
        // This is usually cheaper than creating a new one for each write operation.
        reply_.Clear();

        // Since it's a stream, it makes sense to return different data for each message.
        reply_.set_name(std::string{"stream-reply #"} + std::to_string(replies_));

        resp_.Write(reply_,
            op_handle_.tag(Handle::Operation::FINISH,
            [this](bool ok, Handle::Operation /* op */) {

                if (!ok) [[unlikely]] {
                    // The operation failed.
                    LOG_WARN << "The reply-operation failed.";
                    return;
                }

                reply();
            }));
    }

    Handle op_handle_{*this}; // We need only one handle for this operation.
    size_t replies_ = 0;

    ::grpc::ServerContext ctx_;
    ::routeguide::Rectangle req_;
    ::routeguide::Feature reply_;
    ::grpc::ServerAsyncWriter<decltype(reply_)> resp_{&ctx_};
};
This was a little more code than the previous one, but if you remove the comments, it's not many lines.
RecordRouteRequest
Now we will receive a stream of messages, and reply when the stream has dried up.
class RecordRouteRequest : public RequestBase {
public:
    RecordRouteRequest(EverythingSvr& owner)
        : RequestBase(owner) {

        owner_.grpc().service_.RequestRecordRoute(&ctx_, &io_, cq(), cq(),
            op_handle_.tag(Handle::Operation::CONNECT,
            [this, &owner](bool ok, Handle::Operation /* op */) {

                LOG_DEBUG << me(*this) << " - Processing a new connect from " << ctx_.peer();

                if (!ok) [[unlikely]] {
                    // The operation failed.
                    // Let's end it here.
                    LOG_WARN << "The request-operation failed. Assuming we are shutting down";
                    return;
                }

                // Before we do anything else, we must create a new instance
                // so the service can handle a new request from a client.
                owner_.createNew<RecordRouteRequest>(owner);

                read(true);
            }));
    }

private:
    void read(const bool first) {
        if (!first) {
            // This is where we have read a message from the request.
            // If this was code for a framework, this is where we would have called
            // the `onRpcRequestRecordRouteGotMessage()` method, or unblocked the next statement
            // in a co-routine awaiting the next state-change.
            //
            // In our case, let's just log it.
            LOG_TRACE << "Got message: longitude=" << req_.longitude()
                      << ", latitude=" << req_.latitude();

            // Reset the req_ message. This is cheaper than allocating a new one for each read.
            req_.Clear();
        }

        io_.Read(&req_,
            op_handle_.tag(Handle::Operation::READ,
            [this](bool ok, Handle::Operation /* op */) {

                if (!ok) [[unlikely]] {
                    // The operation failed.
                    // This is normal on an incoming stream, when there are no more messages.
                    // As far as I know, there is no way at this point to deduce if the false status is
                    // because the client is done sending messages, or because we encountered
                    // an error.
                    LOG_TRACE << "The read-operation failed. It's probably not an error :)";

                    // Initiate the finish operation

                    // This is where we have received the request, with all its parts,
                    // and may formulate another answer.
                    // If this was code for a framework, this is where we would have called
                    // the `onRpcRequestRecordRouteDone()` method, or unblocked the next statement
                    // in a co-routine awaiting the next state-change.
                    //
                    // In our case, let's just return something.
                    reply_.set_distance(100);
                    reply_.set_distance(300);

                    io_.Finish(reply_, ::grpc::Status::OK,
                        op_handle_.tag(Handle::Operation::FINISH,
                        [this](bool ok, Handle::Operation /* op */) {

                            if (!ok) [[unlikely]] {
                                LOG_WARN << "The finish-operation failed.";
                            }

                            // We are done
                        }));
                    return;
                } // ok != false

                read(false);
            }));
    }

    Handle op_handle_{*this}; // We need only one handle for this operation.

    ::grpc::ServerContext ctx_;
    ::routeguide::Point req_;
    ::routeguide::RouteSummary reply_;
    ::grpc::ServerAsyncReader<decltype(reply_), decltype(req_)> io_{&ctx_};
};
The logic here is similar to ListFeaturesRequest. Instead of reply() we have read(), and to re-use as much code as possible, we have a bool flag to tell us if this is the first call to read() - the one from the connect event. If it's not the first, then we have a read message to deal with before we start a new async Read operation on the stream.
Now, we will deal with the most complex rpc request type that gRPC supports, the bidirectional stream.
RouteChatRequest
In the proto-file we have specified that both the request and the reply are streams.
We have no way to specify the actual protocol initiation. Some servers may want an incoming message before they send an outgoing message. Others may not start reading on the stream before they have succeeded in sending a message. Some protocols play ping-pong, where both the server and the client respond to a message by sending a message. The messages in the stream can be strictly ordered, or they can be unrelated and flow in any direction whenever one side has something to send. The protocol in your implementation is entirely up to you. Just remember that you can only send one message at any given time, and can only have one Read operation active at any time. If you need to send a burst of messages, you must provide your own queue for the pending messages, and hand them to gRPC one by one as you get confirmation of the previous message's successful departure.
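Here is a minimal sketch of such an outgoing queue (an assumption on my part, not part of this article's implementation). It uses the same Handle/tag() pattern, the stream_ and out_handle_ members of the bidirectional request shown further down, and needs <queue>:

// Queue a message and send it as soon as no other write is in flight.
void sendMessage(::routeguide::RouteNote msg) {
    pending_.push(std::move(msg));
    writeNextIfIdle();
}

void writeNextIfIdle() {
    if (write_in_flight_ || pending_.empty()) {
        return; // Either a Write() is already pending, or there is nothing to send.
    }

    write_in_flight_ = true;
    current_ = std::move(pending_.front());
    pending_.pop();

    // `current_` must stay alive until the write completes.
    stream_.Write(current_, out_handle_.tag(Handle::Operation::WRITE,
        [this](bool ok, Handle::Operation /* op */) {
            write_in_flight_ = false;
            if (ok) {
                writeNextIfIdle(); // Hand the next queued message to gRPC, if any.
            }
        }));
}

std::queue<::routeguide::RouteNote> pending_;
::routeguide::RouteNote current_;
bool write_in_flight_ = false;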
Our RouteChat implementation will start sending and start reading as soon as a request is active. It says Chat in the name! Nobody expects anyone in a "Chat" on the Internet to be polite and obey any kind of "protocol" ;) So we will yell at the other party until we grow tired. We will read the incoming messages and discard them as they arrive, until the other end gets tired. When both parties are done, we get to say the final bits. We are the server implementation, so we define the Status for the conversation.
class RouteChatRequest : public RequestBase {
public:
    RouteChatRequest(EverythingSvr& owner)
        : RequestBase(owner) {

        owner_.grpc().service_.RequestRouteChat(&ctx_, &stream_, cq(), cq(),
            in_handle_.tag(Handle::Operation::CONNECT,
            [this, &owner](bool ok, Handle::Operation /* op */) {

                LOG_DEBUG << me(*this) << " - Processing a new connect from " << ctx_.peer();

                if (!ok) [[unlikely]] {
                    // The operation failed.
                    // Let's end it here.
                    LOG_WARN << "The request-operation failed. Assuming we are shutting down";
                    return;
                }

                // Before we do anything else, we must create a new instance
                // so the service can handle a new request from a client.
                owner_.createNew<RouteChatRequest>(owner);

                read(true);   // Initiate the read for the first incoming message
                write(true);  // Initiate the first write operation.
            }));
    }

private:
    void read(const bool first) {
        if (!first) {
            // This is where we have read a message from the stream.
            // If this was code for a framework, this is where we would have called
            // the `onRpcRequestRouteChatGotMessage()` method, or unblocked the next statement
            // in a co-routine awaiting the next state-change.
            //
            // In our case, let's just log it.
            LOG_TRACE << "Incoming message: " << req_.message();

            req_.Clear();
        }

        // Start new read
        stream_.Read(&req_,
            in_handle_.tag(Handle::Operation::READ,
            [this](bool ok, Handle::Operation /* op */) {

                if (!ok) [[unlikely]] {
                    // The operation failed.
                    // This is normal on an incoming stream, when there are no more messages.
                    // As far as I know, there is no way at this point to deduce if the false status is
                    // because the client is done sending messages, or because we encountered
                    // an error.
                    LOG_TRACE << "The read-operation failed. It's probably not an error :)";

                    done_reading_ = true;
                    return finishIfDone();
                }

                read(false); // Initiate the read for the next incoming message
            }));
    }

    void write(const bool first) {
        if (!first) {
            reply_.Clear();
        }

        if (++replies_ > owner_.config().num_stream_messages) {
            done_writing_ = true;

            LOG_TRACE << me(*this) << " - We are done writing to the stream.";
            return finishIfDone();
        }

        // This is where we are ready to write a new message.
        // If this was code for a framework, this is where we would have called
        // the `onRpcRequestRouteChatReadytoSendNewMessage()` method, or unblocked
        // the next statement in a co-routine awaiting the next state-change.
        reply_.set_message(std::string{"Server Message #"} + std::to_string(replies_));

        // Start new write
        stream_.Write(reply_,
            out_handle_.tag(Handle::Operation::WRITE,
            [this](bool ok, Handle::Operation /* op */) {

                if (!ok) [[unlikely]] {
                    // The operation failed.
                    LOG_WARN << "The write-operation failed.";

                    // When ok is false here, we will not be able to write
                    // anything on this stream.
                    done_writing_ = true;
                    return finishIfDone();
                }

                write(false); // Initiate the next write or finish
            }));
    }

    // We wait until all incoming messages are received and all outgoing messages are sent
    // before we send the finish message.
    void finishIfDone() {
        if (!sent_finish_ && done_reading_ && done_writing_) {
            LOG_TRACE << me(*this) << " - We are done reading and writing. Sending finish!";

            stream_.Finish(grpc::Status::OK,
                out_handle_.tag(Handle::Operation::FINISH,
                [this](bool ok, Handle::Operation /* op */) {

                    if (!ok) [[unlikely]] {
                        LOG_WARN << "The finish-operation failed.";
                    }

                    LOG_TRACE << me(*this) << " - We are done";
                }));

            sent_finish_ = true;
            return;
        }
    }

    bool done_reading_ = false;
    bool done_writing_ = false;
    bool sent_finish_ = false;
    size_t replies_ = 0;

    // We are streaming messages in and out simultaneously, so we need two handles.
    // One for each direction.
    Handle in_handle_{*this};
    Handle out_handle_{*this};

    ::grpc::ServerContext ctx_;
    ::routeguide::RouteNote req_;
    ::routeguide::RouteNote reply_;

    // Interestingly, the template class is named `*ReaderWriter`, while
    // the template argument order is first the Writer type and then the Reader type.
    // Lots of room for false assumptions and subtle errors here ;)
    ::grpc::ServerAsyncReaderWriter<decltype(reply_), decltype(req_)> stream_{&ctx_};
};
As you see, here we basically combine the logic in the last two examples above. Then we add some extra logic in finishIfDone() to allow both directions of the stream to finish before we set the final status for the rpc call and end it.