Define the protobuf file
The protocol buffer (.proto) file defines the messages and the service contract shared by the server and the client.
// protobuf syntax version
syntax = "proto3";

// protobuf package
package calculator;

// Java package for the generated classes
option java_package = "com.thebytecloud.calculator";

// Generate each class in its own .java file.
// If this is false, all classes are generated inside a single Java file.
option java_multiple_files = true;

// Messages for bidirectional streaming
message FindMaxRequest {
  int32 number = 1;
}

message FindMaxResponse {
  int32 response = 1;
}

// A service is like a class whose methods define the communication.
service CalculatorService {
  // Bidirectional streaming RPC
  rpc FindMax(stream FindMaxRequest) returns (stream FindMaxResponse) {};
}
Generating the Java classes
Running the command below in a terminal generates the Java files in the package com.thebytecloud.calculator (the value of option java_package).
mvn clean generate-sources
Implementing the gRPC service methods
package com.thebytecloud.server;

import com.thebytecloud.calculator.*;
import io.grpc.stub.StreamObserver;

public class CalculatorServiceImpl extends CalculatorServiceGrpc.CalculatorServiceImplBase {

    @Override
    public StreamObserver<FindMaxRequest> findMax(StreamObserver<FindMaxResponse> responseObserver) {
        // A new observer is created for each call, so currentMax is per-stream state.
        StreamObserver<FindMaxRequest> requestObserver = new StreamObserver<FindMaxRequest>() {
            // Starts at 0, so only numbers greater than 0 can become the maximum.
            int currentMax = 0;

            @Override
            public void onNext(FindMaxRequest findMaxRequest) {
                int currentNumber = findMaxRequest.getNumber();
                System.out.println("Received number = " + currentNumber);
                // Reply only when a new maximum is seen.
                if (currentNumber > currentMax) {
                    currentMax = currentNumber;
                    responseObserver.onNext(FindMaxResponse.newBuilder()
                            .setResponse(currentMax)
                            .build());
                }
            }

            @Override
            public void onError(Throwable throwable) {
                // Log the failure instead of ignoring it silently.
                System.err.println("findMax stream failed: " + throwable.getMessage());
            }

            @Override
            public void onCompleted() {
                // The client has finished sending: send the final maximum, then close the response stream.
                responseObserver.onNext(FindMaxResponse.newBuilder()
                        .setResponse(currentMax)
                        .build());
                responseObserver.onCompleted();
            }
        };
        return requestObserver;
    }
}
Client Implementation
package com.thebytecloud.client;

import com.thebytecloud.calculator.*;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CalculatorClient {

    private final ManagedChannel managedChannel;

    public CalculatorClient(ManagedChannel managedChannel) {
        this.managedChannel = managedChannel;
    }

    public static void main(String[] args) throws InterruptedException {
        // Defaults, overridable through the SERVER and SERVER_PORT environment variables.
        String server = "localhost";
        int serverPort = 7070;
        if (System.getenv("SERVER_PORT") != null)
            serverPort = Integer.parseInt(System.getenv("SERVER_PORT"));
        if (System.getenv("SERVER") != null)
            server = System.getenv("SERVER");
        System.out.println("server = " + server + ":" + serverPort);
        // Brief pause, presumably to give the server time to start.
        Thread.sleep(2000);
        ManagedChannel managedChannel = ManagedChannelBuilder.forAddress(server, serverPort)
                .usePlaintext() // no TLS; suitable for local testing only
                .build();
        CalculatorClient calculatorClient = new CalculatorClient(managedChannel);
        calculatorClient.biDirectionalStreamingCall();
        // Release the channel's resources once the call is finished.
        managedChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
    }

    private void biDirectionalStreamingCall() {
        // Streaming calls need the asynchronous stub.
        final CalculatorServiceGrpc.CalculatorServiceStub asyncStub = CalculatorServiceGrpc.newStub(managedChannel);
        // Released when the server completes the stream or the call fails.
        final CountDownLatch latch = new CountDownLatch(1);
        StreamObserver<FindMaxResponse> responseObserver = new StreamObserver<FindMaxResponse>() {
            @Override
            public void onNext(FindMaxResponse findMaxResponse) {
                System.out.println("Got maximum from server = " + findMaxResponse.getResponse());
            }

            @Override
            public void onError(Throwable throwable) {
                System.err.println("Stream failed: " + throwable.getMessage());
                latch.countDown();
            }

            @Override
            public void onCompleted() {
                System.out.println("Server is done with messages");
                latch.countDown();
            }
        };
        StreamObserver<FindMaxRequest> requestObserver = asyncStub.findMax(responseObserver);
        Arrays.asList(3, 5, 17, 9, 8, 30, 12).forEach(number -> {
            requestObserver.onNext(FindMaxRequest.newBuilder()
                    .setNumber(number)
                    .build());
            try {
                // Small delay so the interleaving of requests and responses is visible.
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // Tell the server that no more numbers will be sent.
        requestObserver.onCompleted();
        try {
            if (!latch.await(3, TimeUnit.SECONDS)) {
                System.err.println("Timed out waiting for the server to finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
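The client relies on a CountDownLatch to keep the calling thread alive until the asynchronous response stream finishes or a timeout expires. Below is a minimal sketch of that waiting pattern using only the JDK. The class name LatchWaitSketch and the method awaitCompletion are hypothetical names chosen for illustration, and a plain background thread stands in for the gRPC callback thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchWaitSketch {

    /**
     * Runs asyncWork on a background thread and waits up to timeoutMillis for it to finish.
     * Returns true if the work signalled completion in time, false on timeout.
     */
    static boolean awaitCompletion(Runnable asyncWork, long timeoutMillis) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                asyncWork.run();
            } finally {
                // Plays the role of onCompleted()/onError() in the client: release the waiter.
                latch.countDown();
            }
        });
        worker.setDaemon(true); // do not keep the JVM alive if the work never finishes
        worker.start();
        // await returns false when the timeout elapses before countDown() is called.
        return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        boolean finished = awaitCompletion(() -> System.out.println("stream completed"), 3000);
        System.out.println("finished in time = " + finished);
    }
}
```

Checking the boolean returned by await distinguishes a normal completion from a timeout, which is easy to miss when the return value is discarded.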
Executing server and client
The server and client can be run from an IDE or from the command line. The following commands run them in a terminal.
gRPC Server
grpc-java-examples$ mvn clean install
grpc-java-examples$ java -cp target/grpc-java-examples-1.0-SNAPSHOT-jar-with-dependencies.jar com.thebytecloud.server.CalculatorServer
serverBindPort = 7070
Starting gRPC Server...!
Received number = 3
Received number = 5
Received number = 17
Received number = 9
Received number = 8
Received number = 30
Received number = 12
gRPC Client
grpc-java-examples$ java -cp target/grpc-java-examples-1.0-SNAPSHOT-jar-with-dependencies.jar com.thebytecloud.client.CalculatorClient
server = localhost:7070
Got maximum from server = 3
Got maximum from server = 5
Got maximum from server = 17
Got maximum from server = 30
Got maximum from server = 30
Server is done with messages
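Here the client streams several numbers to the server one at a time. On the server side, each time a number exceeds the current maximum, the server immediately streams the new maximum back to the client. When the client completes its stream, the server sends the final maximum once more and then completes, which is why 30 appears twice in the client output.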
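The server's reply rule can be checked without any networking. The sketch below is a plain-Java simulation, not the gRPC service itself: the class RunningMaxSketch and the method simulate are hypothetical names, and the list returned stands in for the messages the server would stream back.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RunningMaxSketch {

    /** Returns the values the server would send back for the given request stream. */
    static List<Integer> simulate(List<Integer> requests) {
        List<Integer> responses = new ArrayList<>();
        int currentMax = 0; // same starting value as the server implementation
        for (int number : requests) {
            // Mirrors onNext: reply only when a new maximum is seen.
            if (number > currentMax) {
                currentMax = number;
                responses.add(currentMax);
            }
        }
        // Mirrors onCompleted: the final maximum is sent once more before closing.
        responses.add(currentMax);
        return responses;
    }

    public static void main(String[] args) {
        System.out.println(simulate(Arrays.asList(3, 5, 17, 9, 8, 30, 12)));
        // prints [3, 5, 17, 30, 30]
    }
}
```

The result matches the client output shown above: 9, 8 and 12 produce no reply because they never exceed the running maximum.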