Jarle Aase

Implementing a unary, async client

In the server, we used two threads: one to run the server's event-loop, and one to handle signals. I did it that way because the server is designed to run until it is stopped. The client, on the other hand, will quit when it has finished its work. For some clients, like command-line tools, that makes sense. You may also write a client that is part of a server and needs to be prepared to handle RPCs at any time. In that case, just don't exit the event-loop when you run out of work, and you are fine ;)

The test-program that uses this code takes three inputs: the server address, the total number of requests to execute, and the number of parallel requests to run.
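The Config type referenced in the code below is not shown in this excerpt. Here is a minimal sketch of what it could look like, given the three inputs above; address and parallel_requests appear later in the code, while num_requests is an assumed name for the total request count.

// A sketch of the Config type used by the client.
// `address` and `parallel_requests` are referenced later in this article;
// `num_requests` is an assumed name for the total number of requests.
struct Config {
    std::string address = "localhost:10123";
    size_t num_requests = 1;      // Total requests to execute.
    size_t parallel_requests = 1; // Requests kept in flight at any time.
};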

The client's initialization is even simpler than the server's:

class SimpleReqResClient {
public:

    // Run the event-loop.
    // Returns when there are no more requests to send
    void run() {

        LOG_INFO << "Connecting to gRPC service at: " << config_.address;
        channel_ = grpc::CreateChannel(config_.address, grpc::InsecureChannelCredentials());

        stub_ = ::routeguide::RouteGuide::NewStub(channel_);

        ...
    }

private:
    // This is the Queue. It's shared for all the requests.
    ::grpc::CompletionQueue cq_;

    // This is a connection to the gRPC server
    std::shared_ptr<grpc::Channel> channel_;

    // An instance of the client that was generated from our .proto file.
    std::unique_ptr<::routeguide::RouteGuide::Stub> stub_;

    const Config& config_;
    std::atomic_size_t pending_requests_{0};
    std::atomic_size_t request_count{0};
};

Since our code is just meant for testing, we do some work in run() before we enter the event-loop. The requests are added to the queue and executed in whatever order pleases gRPC.

// Add request(s)
for(auto i = 0; i < config_.parallel_requests; ++i) {
    createRequest();
}

...
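createRequest() itself is not shown in this excerpt. A minimal sketch of what it could do, assuming the num_requests field from the Config sketch above and that each OneRequest instance is allocated with new and deletes itself when it is done:

// A sketch of createRequest(), assuming a `num_requests` limit in Config.
// It starts a new OneRequest unless we have already started the total
// number of requests we were asked to execute.
void createRequest() {
    if (++request_count > config_.num_requests) {
        --request_count;
        return; // We have started all the requests we were asked to.
    }

    // The OneRequest constructor initiates the async RPC and calls
    // incCounter(), so `pending_requests_` tracks requests in flight.
    new OneRequest(*this);
}

void incCounter() {
    ++pending_requests_;
}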

The event-loop itself is identical to the event-loop in the server, except for the while() condition.

    while(pending_requests_) {
        // FIXME: This is crazy. Figure out how to use stable clock!
        const auto deadline = std::chrono::system_clock::now()
                                + std::chrono::milliseconds(500);

        // Get any IO operation that is ready.
        void * tag = {};
        bool ok = true;

        // Wait for the next event to complete in the queue
        const auto status = cq_.AsyncNext(&tag, &ok, deadline);

        // So, here we deal with the first of the three states: The status of Next().
        switch(status) {
        case grpc::CompletionQueue::NextStatus::TIMEOUT:
            LOG_DEBUG << "AsyncNext() timed out.";
            continue;

        case grpc::CompletionQueue::NextStatus::GOT_EVENT:
            LOG_TRACE << "AsyncNext() returned an event. The boolean status is "
                        << (ok ? "OK" : "FAILED");

            // Use a scope to allow a new variable inside a case statement.
            {
                auto request = static_cast<OneRequest *>(tag);

                // Now, let the OneRequest state-machine deal with the event.
                // We could have done it here, but that code would smell really nasty.
                request->proceed(ok);
            }
            break;

        case grpc::CompletionQueue::NextStatus::SHUTDOWN:
            LOG_INFO << "SHUTDOWN. Tearing down the gRPC connection(s) ";
            return;
        } // switch

We also have a class for each RPC request.

class OneRequest {
public:
    OneRequest(SimpleReqResClient& parent)
        : parent_{parent} {

        // Initiate the async request.
        rpc_ = parent_.stub_->AsyncGetFeature(&ctx_, req_, &parent_.cq_);
        assert(rpc_);

        // Add the operation to the queue, so we get notified when
        // the request is completed.
        // Note that we use `this` as tag.
        rpc_->Finish(&reply_, &status_, this);

        // Reference-counting of instances of requests in flight
        parent.incCounter();
    }
    ...
private:
    SimpleReqResClient& parent_;

    // We need quite a few variables to perform our single RPC call.
    ::routeguide::Point req_;
    ::routeguide::Feature reply_;
    ::grpc::Status status_;
    std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::routeguide::Feature>> rpc_;
    ::grpc::ClientContext ctx_;
};

In the constructor, we call AsyncGetFeature(), which is a "stub" method generated for us by protoc (with the gRPC plugin) from our proto-file. Note that we don't add a tag there. Instead, we call a method, Finish(), on the object that is returned, and supply our tag there. In this case we will get only one event: either we have a successful reply from the server, or we have a failure. So, a pretty simple state-machine this time. Don't worry. It will get more complex when we start playing with streams ;)

The arguments to AsyncGetFeature() are a pointer to ctx_, a client context for gRPC (passing pointers like this is a common pattern in C programming), then the request argument, which is the request or message we send to the server. The request instance must stay alive until the request is sent over the wire (or longer), so we use a class variable req_ for this. The last argument is a pointer to our queue. As mentioned before, I'm not too excited about using pointers for mandatory arguments in C++.

Note that we provide Finish() with a pointer to reply_. This is an instance of a protobuf message of the return-type for this RPC request. It's where gRPC will store the reply from the server. We also provide it with a pointer to status_, which is an instance of ::grpc::Status, a class wrapper around an enum that can identify a handful of common error-conditions. In the server, we supply a ::grpc::Status in its Finish() call. So it's my understanding that our code in the server can use this state to tell the client about some common problems. However, some of the available error-codes, like UNAVAILABLE and CANCELLED, suggest that gRPC itself may return an error-status to the client. So I would not place any bets on where a status_ error originates from. I'll just try to deal with them as well as possible.
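As an illustration (this is not part of the original code), the client could branch on status_.error_code() to distinguish some of those conditions:

// Illustration only: distinguishing a few common error codes.
switch (status_.error_code()) {
case grpc::StatusCode::OK:
    break; // The server handled the request.
case grpc::StatusCode::UNAVAILABLE:
    LOG_WARN << "The server (or the connection) is unavailable.";
    break;
case grpc::StatusCode::CANCELLED:
    LOG_WARN << "The request was cancelled.";
    break;
default:
    LOG_WARN << "The request failed: " << status_.error_message();
}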

This is what the RPC request state-machine looks like:

void proceed(bool ok) {
    if (!ok) [[unlikely]] {
        LOG_WARN << "OneRequest: The request failed.";
        return done();
    }

    // Initiate a new request
    parent_.createRequest();

    if (status_.ok()) {
        LOG_TRACE << "Request successful. Message: " << reply_.name();
    } else {
        LOG_WARN << "OneRequest: The request failed with error-message: " << status_.error_message();
    }

    // The reply is a single message, so at this time we are done.
    done();
}

Note that we have to deal with two separate potential error states: the ok variable and the status_ member. Only if both are okay can we expect the reply to contain any valid or useful information for us.
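done() is not shown in this excerpt either. A minimal sketch of what it could look like, assuming each OneRequest is allocated with new (as in the createRequest() sketch above) and that the parent exposes a decCounter() counterpart to incCounter() (an assumed name):

// A sketch of done(), assuming `new`-allocated OneRequest instances and
// a decCounter() counterpart to incCounter() in the parent.
void done() {
    parent_.decCounter(); // One less request in flight; when the
                          // counter reaches zero, run() returns.
    delete this;          // This state-machine is finished.
}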

The complete source code.