gRPC – Client Streaming

Define the protobuf file

The protocol buffer (.proto) file is the contract between the server and the client: it defines the messages they exchange and the service methods the client can call.

//protobuf syntax version
syntax = "proto3";
//protobuf package
package calculator;
//package for java
option java_package = "com.thebytecloud.calculator";
//Generate each message and service class in its own .java file.
//If this is false, all classes are nested inside a single outer Java class.
option java_multiple_files = true;
//Client streaming messages
message ComputeAverageRequest {
    int32 number = 1;
}
message ComputeAverageResponse {
    double average = 1;
}
//A service groups the RPC methods, much like a class groups methods.
service CalculatorService {
    //Client streaming RPC: the client sends many requests, the server returns one response
    rpc ComputeAverage(stream ComputeAverageRequest) returns (ComputeAverageResponse) {};
}

Generating the Java classes

Running the command below in the terminal generates the Java files in the package com.thebytecloud.calculator (set by option java_package).

mvn clean generate-sources

Implementing the protobuf service methods

package com.thebytecloud.server;
import com.thebytecloud.calculator.*;
import io.grpc.stub.StreamObserver;
public class CalculatorServiceImpl extends CalculatorServiceGrpc.CalculatorServiceImplBase {
    @Override
    public StreamObserver<ComputeAverageRequest> computeAverage(StreamObserver<ComputeAverageResponse> responseObserver) {
        //The returned observer receives every message the client streams.
        return new StreamObserver<ComputeAverageRequest>() {
            //long avoids int overflow when many large numbers are streamed
            long sum = 0;
            long count = 0;
            @Override
            public void onNext(ComputeAverageRequest computeAverageRequest) {
                //Called once per streamed request: accumulate only, no response yet
                sum += computeAverageRequest.getNumber();
                count++;
            }
            @Override
            public void onError(Throwable throwable) {
                //The client cancelled or the stream broke, so no response can be sent
                System.err.println("Client stream failed: " + throwable);
            }
            @Override
            public void onCompleted() {
                //The client has finished streaming: send the single response and close.
                //An empty stream yields 0.0 instead of NaN (0.0 / 0).
                double average = count == 0 ? 0.0 : (double) sum / count;
                responseObserver.onNext(ComputeAverageResponse.newBuilder()
                        .setAverage(average).build());
                responseObserver.onCompleted();
            }
        };
    }
}
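
The bookkeeping done by the request observer (add each number, count it, divide once the stream ends) can be isolated in a plain class and exercised without a gRPC server. The following is only a sketch: RunningAverage is a hypothetical name that does not exist in the example project, and the long sum and the 0.0 result for an empty stream are choices made here for illustration.

```java
// Hypothetical helper, not part of the example project. It isolates the
// sum/count bookkeeping that the server-side request observer performs.
final class RunningAverage {
    private long sum = 0;    // long avoids int overflow on long streams
    private long count = 0;

    // Mirrors onNext(): fold one streamed number into the running totals.
    void add(int number) {
        sum += number;
        count++;
    }

    // Mirrors onCompleted(): an empty stream gives 0.0 instead of NaN.
    double average() {
        return count == 0 ? 0.0 : (double) sum / count;
    }

    public static void main(String[] args) {
        RunningAverage avg = new RunningAverage();
        System.out.println("empty stream = " + avg.average()); // prints 0.0
        for (int n : new int[] {1, 2, 3, 4}) {
            avg.add(n);
        }
        System.out.println("average = " + avg.average());      // prints 2.5
    }
}
```

Keeping the arithmetic in its own class means the edge cases (no messages, very large sums) can be checked with ordinary unit tests, while the observer only forwards calls.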

Client Implementation

package com.thebytecloud.client;
import com.thebytecloud.calculator.*;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class CalculatorClient {
    private final ManagedChannel managedChannel;
    public CalculatorClient(ManagedChannel managedChannel) {
        this.managedChannel = managedChannel;
    }
    public static void main(String[] args) throws InterruptedException {
        String server = "localhost";
        int serverPort = 7070;
        if(System.getenv("SERVER_PORT") != null)
            serverPort = Integer.parseInt(System.getenv("SERVER_PORT"));
        if(System.getenv("SERVER") != null)
            server = System.getenv("SERVER");
        System.out.println("server = " + server+":"+serverPort);
        Thread.sleep(2000);
        ManagedChannel managedChannel = ManagedChannelBuilder.forAddress(server, serverPort)
                .usePlaintext()
                .build();
        CalculatorClient calculatorClient = new CalculatorClient(managedChannel);
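        calculatorClient.clientStreamingCall();
        //Release the channel's threads and connections once the call is done
        managedChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS);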
    }
    private void clientStreamingCall() {
        final CalculatorServiceGrpc.CalculatorServiceStub asyncStub = CalculatorServiceGrpc.newStub(managedChannel);
        CountDownLatch latch = new CountDownLatch(1);
        //Create the response observer. It is called when the server sends its response.
        StreamObserver<ComputeAverageResponse> responseObserver = new StreamObserver<ComputeAverageResponse>() {
            @Override
            public void onNext(ComputeAverageResponse computeAverageResponse) {
                System.out.println("Received average = " + computeAverageResponse.getAverage());
            }
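            @Override
            public void onError(Throwable throwable) {
                //Report the failure instead of ending the call silently
                System.err.println("Call failed: " + throwable);
                latch.countDown();
            }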
            @Override
            public void onCompleted() {
                System.out.println("Server has completed sending response!");
                latch.countDown();
            }
        };
        //Getting request observer to send the request messages
        StreamObserver<ComputeAverageRequest> requestObserver = asyncStub.computeAverage(responseObserver);
        //sending 10000 messages to server (Client streaming)
        for (int i = 0; i < 10000; i++) {
            requestObserver.onNext(ComputeAverageRequest.newBuilder()
                    .setNumber(i)
                    .build());
        }
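        //Done streaming. onCompleted() tells the server to compute and send the response.
        requestObserver.onCompleted();
        try {
            //await returns false if the server did not finish within the timeout
            if (!latch.await(3, TimeUnit.SECONDS)) {
                System.err.println("Timed out waiting for the server response");
            }
        } catch (InterruptedException e) {
            //Restore the interrupt flag instead of swallowing the interruption
            Thread.currentThread().interrupt();
        }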
    }
}
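
Because the async stub returns immediately, the client relies on a CountDownLatch to keep the calling thread alive until the response observer has been notified. The sketch below shows that waiting pattern on its own, with a background thread standing in for the server. LatchAwaitDemo and waitForResponse are hypothetical names used only for illustration, and the delays are arbitrary.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical standalone demo, not part of the example project.
final class LatchAwaitDemo {
    // Starts a worker that "responds" after delayMillis, then waits up to
    // timeoutMillis for it. Returns true only if the worker finished in time.
    static boolean waitForResponse(long delayMillis, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);  // stands in for the server's work
                latch.countDown();          // stands in for onCompleted()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);             // do not keep the JVM alive
        worker.start();
        try {
            // await returns false when the timeout elapses first
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("fast response arrived: " + waitForResponse(10, 2000));
        System.out.println("slow response arrived: " + waitForResponse(2000, 50));
    }
}
```

With these delays the first call should report true and the second false. The boolean returned by await is the only way to tell a completed call from a timeout, so it is worth checking rather than discarding.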

Executing server and client

The server and client can be run from an IDE or from the command line. The following commands run them in a terminal.

gRPC Server

grpc-java-examples$ mvn clean install
grpc-java-examples$ java -cp target/grpc-java-examples-1.0-SNAPSHOT-jar-with-dependencies.jar com.thebytecloud.server.CalculatorServer
serverBindPort = 7070
Starting gRPC Server...!

gRPC Client

grpc-java-examples$ java -cp target/grpc-java-examples-1.0-SNAPSHOT-jar-with-dependencies.jar com.thebytecloud.client.CalculatorClient
server = localhost:7070
Received average = 4999.5
Server has completed sending response!
grpc-java-examples$

Here the client streamed 10000 numbers (0 through 9999) one at a time. Once the client completed the stream, the server responded with the average of the numbers it received: 4999.5.
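
The reported value can be checked by hand: the integers 0 through 9999 sum to 9999 × 10000 / 2 = 49995000, and dividing by 10000 gives 4999.5. The same check as a small sketch, independent of gRPC (AverageCheck and its methods are hypothetical names, not part of the example project):

```java
import java.util.stream.IntStream;

// Hypothetical standalone check, not part of the example project.
final class AverageCheck {
    // Average of the integers 0 .. n-1, the same values the client streams.
    static double averageOfFirst(int n) {
        return IntStream.range(0, n).average().orElse(0.0);
    }

    // Closed form for n >= 1: the sum n(n-1)/2 divided by n is (n-1)/2.
    static double closedForm(int n) {
        return (n - 1) / 2.0;
    }

    public static void main(String[] args) {
        System.out.println(averageOfFirst(10000)); // prints 4999.5
        System.out.println(closedForm(10000));     // prints 4999.5
    }
}
```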

Nanthakumar

I’m a curious engineer, interested in various aspects of software engineering.