gRPC – Bi-directional Streaming

Define protobuff file

Protocol buffer file is the base template for the communication between the server and the client.

//protobuff version
syntax = "proto3";
//package for protobuff
package calculator;
//package for java
option java_package = "com.thebytecloud.calculator";
//This option will create classes as separate files.
//If this is false, all classes will be created in the same java file.
option java_multiple_files = true;
//Bi directional streaming messages
message FindMaxRequest {
    int32 number = 1;
}
message FindMaxResponse {
    int32 response = 1;
}
//Service is like a class which has methods for the communication.
service CalculatorService {
    //Bi directional rpc
    rpc FindMax(stream FindMaxRequest) returns (stream FindMaxResponse) {};
}

Generating the java classes

Running below command in the terminal will generate java files in package com.thebytecloud.calculator (option java_package)

mvn clean generate-sources

Implementing protobuff service methods

package com.thebytecloud.server;
import com.thebytecloud.calculator.*;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
public class CalculatorServiceImpl extends CalculatorServiceGrpc.CalculatorServiceImplBase {
    @Override
    public StreamObserver<FindMaxRequest> findMax(StreamObserver<FindMaxResponse> responseObserver) {
        StreamObserver<FindMaxRequest> requestObserver = new StreamObserver<FindMaxRequest>() {
            int currentMax = 0;
            @Override
            public void onNext(FindMaxRequest findMaxRequest) {
                int currentNumber = findMaxRequest.getNumber();
                System.out.println("Received number = " + currentNumber);
                if(currentNumber > currentMax) {
                    currentMax = currentNumber;
                    responseObserver.onNext(FindMaxResponse.newBuilder()
                            .setResponse(currentMax)
                            .build());
                }
            }
            @Override
            public void onError(Throwable throwable) {
            }
            @Override
            public void onCompleted() {
                responseObserver.onNext(FindMaxResponse.newBuilder()
                        .setResponse(currentMax)
                        .build());
                responseObserver.onCompleted();
            }
        };
        return requestObserver;
    }
}

Client Implementation

package com.thebytecloud.client;
import com.thebytecloud.calculator.*;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class CalculatorClient {
    private final ManagedChannel managedChannel;
    public CalculatorClient(ManagedChannel managedChannel) {
        this.managedChannel = managedChannel;
    }
    public static void main(String[] args) throws InterruptedException {
        String server = "localhost";
        int serverPort = 7070;
        if(System.getenv("SERVER_PORT") != null)
            serverPort = Integer.parseInt(System.getenv("SERVER_PORT"));
        if(System.getenv("SERVER") != null)
            server = System.getenv("SERVER");
        System.out.println("server = " + server+":"+serverPort);
        Thread.sleep(2000);
        ManagedChannel managedChannel = ManagedChannelBuilder.forAddress(server, serverPort)
                .usePlaintext()
                .build();
        CalculatorClient calculatorClient = new CalculatorClient(managedChannel);
        calculatorClient.biDirectionalStreamingCall();
    }
    private void biDirectionalStreamingCall() {
        final CalculatorServiceGrpc.CalculatorServiceStub asyncStub = CalculatorServiceGrpc.newStub(managedChannel);
        final CountDownLatch latch = new CountDownLatch(1);
        StreamObserver<FindMaxResponse> responseObserver = new StreamObserver<FindMaxResponse>() {
            @Override
            public void onNext(FindMaxResponse findMaxResponse) {
                System.out.println("Got maximum from server = "+findMaxResponse.getResponse());
            }
            @Override
            public void onError(Throwable throwable) {
                latch.countDown();
            }
            @Override
            public void onCompleted() {
                System.out.println("Server is done with messages");
                latch.countDown();
            }
        };
        StreamObserver<FindMaxRequest> requestObserver = asyncStub.findMax(responseObserver);
        Arrays.asList(3,5,17,9,8,30,12).forEach(number -> {
            requestObserver.onNext(FindMaxRequest.newBuilder()
                    .setNumber(number)
                    .build());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        requestObserver.onCompleted();
        try {
            latch.await(3, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Executing server and client

Executing server and client can be done via IDE or command line. Following are the commands to execute in terminal.

gRPC Server

grpc-java-examples$ mvn clean install
grpc-java-examples$ java -cp target/grpc-java-examples-1.0-SNAPSHOT-jar-with-dependencies.jar com.thebytecloud.server.CalculatorServer
serverBindPort = 7070
Starting gRPC Server...!
Received number = 3
Received number = 5
Received number = 17
Received number = 9
Received number = 8
Received number = 30
Received number = 12

gRPC Client

grpc-java-examples$ java -cp target/grpc-java-examples-1.0-SNAPSHOT-jar-with-dependencies.jar com.thebytecloud.client.CalculatorClient
server = localhost:7070
Got maximum from server = 3
Got maximum from server = 5
Got maximum from server = 17
Got maximum from server = 30
Got maximum from server = 30
Server is done with messages
nantha grpc-java-examples$ 

Here client sent a multiple numbers one by one as a streaming. At the same time in server side, as soon as it sees maximum number it sends back to the client as a server streaming.

Share your love
Nanthakumar
Nanthakumar

I’m a curious engineer, interested in various aspects of software engineering.

Articles: 7