Define the protobuf file
The protocol buffer (.proto) file is the contract between the server and the client: it declares the messages they exchange and the service methods the client can call.
// protobuf syntax version
syntax = "proto3";

// protobuf package
package calculator;

// Java package for the generated classes
option java_package = "com.thebytecloud.calculator";

// Generate each class in its own .java file.
// If this is false, all classes are nested inside a single outer class file.
option java_multiple_files = true;

// Client streaming messages
message ComputeAverageRequest {
  int32 number = 1;
}

message ComputeAverageResponse {
  double average = 1;
}

// A service is like a class whose methods define the communication.
service CalculatorService {
  // Client streaming RPC: the client sends a stream of requests and the server replies once.
  rpc ComputeAverage(stream ComputeAverageRequest) returns (ComputeAverageResponse) {};
}
Generating the Java classes
Running the command below in a terminal generates the Java files in the package com.thebytecloud.calculator (set by option java_package).
mvn clean generate-sources
Implementing the protobuf service methods
package com.thebytecloud.server;

import com.thebytecloud.calculator.*;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;

public class CalculatorServiceImpl extends CalculatorServiceGrpc.CalculatorServiceImplBase {

    @Override
    public StreamObserver<ComputeAverageRequest> computeAverage(StreamObserver<ComputeAverageResponse> responseObserver) {
        // The returned observer receives every message the client streams.
        return new StreamObserver<ComputeAverageRequest>() {
            // long avoids int overflow when many large numbers are streamed
            long sum = 0;
            int count = 0;

            @Override
            public void onNext(ComputeAverageRequest computeAverageRequest) {
                // Called once per streamed request: accumulate, do not respond yet.
                sum += computeAverageRequest.getNumber();
                count++;
            }

            @Override
            public void onError(Throwable throwable) {
                // The client aborted the stream; there is nothing to send back.
            }

            @Override
            public void onCompleted() {
                // The client finished streaming: send the single response.
                if (count == 0) {
                    // Avoid 0/0 (NaN) when the client sent no numbers.
                    responseObserver.onError(Status.INVALID_ARGUMENT
                            .withDescription("No numbers received")
                            .asRuntimeException());
                    return;
                }
                double average = (double) sum / count;
                responseObserver.onNext(ComputeAverageResponse.newBuilder()
                        .setAverage(average).build());
                responseObserver.onCompleted();
            }
        };
    }
}
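The request/response flow of the service above can be tried without a running gRPC server. The following is a minimal sketch, not the generated gRPC code: Observer is a hypothetical stand-in for io.grpc.stub.StreamObserver, the class and method names are made up for illustration, and returning 0.0 for an empty stream is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

class ClientStreamingSketch {

    // Hypothetical stand-in for io.grpc.stub.StreamObserver (same three callbacks).
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    // Same shape as computeAverage: takes the response observer, returns the request observer.
    static Observer<Integer> computeAverage(Observer<Double> responseObserver) {
        return new Observer<Integer>() {
            private long sum = 0;
            private long count = 0;

            @Override
            public void onNext(Integer number) {
                sum += number;   // accumulate only; no response per message
                count++;
            }

            @Override
            public void onError(Throwable t) {
                // The sender gave up; nothing is sent back.
            }

            @Override
            public void onCompleted() {
                // Assumption of this sketch: an empty stream yields 0.0 rather than NaN.
                double average = count == 0 ? 0.0 : (double) sum / count;
                responseObserver.onNext(average);   // the single response
                responseObserver.onCompleted();
            }
        };
    }

    // Streams 0..n-1 through the observer pair and returns every response received.
    static List<Double> streamNumbers(int n) {
        List<Double> responses = new ArrayList<>();
        Observer<Integer> requestObserver = computeAverage(new Observer<Double>() {
            @Override public void onNext(Double average) { responses.add(average); }
            @Override public void onError(Throwable t) { }
            @Override public void onCompleted() { }
        });
        for (int i = 0; i < n; i++) {
            requestObserver.onNext(i);
        }
        requestObserver.onCompleted();
        return responses;
    }

    public static void main(String[] args) {
        System.out.println(streamNumbers(10000)); // prints [4999.5]
    }
}
```

However many requests are streamed, the returned list holds exactly one response. That is the defining property of a client streaming call.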
Client Implementation
package com.thebytecloud.client;

import com.thebytecloud.calculator.*;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CalculatorClient {

    private final ManagedChannel managedChannel;

    public CalculatorClient(ManagedChannel managedChannel) {
        this.managedChannel = managedChannel;
    }

    public static void main(String[] args) throws InterruptedException {
        String server = "localhost";
        int serverPort = 7070;
        // The SERVER and SERVER_PORT environment variables override the defaults.
        if (System.getenv("SERVER_PORT") != null)
            serverPort = Integer.parseInt(System.getenv("SERVER_PORT"));
        if (System.getenv("SERVER") != null)
            server = System.getenv("SERVER");
        System.out.println("server = " + server + ":" + serverPort);
        // Pause briefly before connecting, e.g. to give a freshly started server time to come up.
        Thread.sleep(2000);
        ManagedChannel managedChannel = ManagedChannelBuilder.forAddress(server, serverPort)
                .usePlaintext()
                .build();
        CalculatorClient calculatorClient = new CalculatorClient(managedChannel);
        calculatorClient.clientStreamingCall();
        // Release the channel's resources once the call is done.
        managedChannel.shutdown();
    }

    private void clientStreamingCall() {
        // Client streaming needs the asynchronous stub.
        final CalculatorServiceGrpc.CalculatorServiceStub asyncStub = CalculatorServiceGrpc.newStub(managedChannel);
        CountDownLatch latch = new CountDownLatch(1);

        // Response observer: called when the server sends its response.
        StreamObserver<ComputeAverageResponse> responseObserver = new StreamObserver<ComputeAverageResponse>() {
            @Override
            public void onNext(ComputeAverageResponse computeAverageResponse) {
                System.out.println("Received average = " + computeAverageResponse.getAverage());
            }

            @Override
            public void onError(Throwable throwable) {
                System.err.println("Call failed: " + throwable.getMessage());
                latch.countDown();
            }

            @Override
            public void onCompleted() {
                System.out.println("Server has completed sending response!");
                latch.countDown();
            }
        };

        // Request observer: used to send the request messages.
        StreamObserver<ComputeAverageRequest> requestObserver = asyncStub.computeAverage(responseObserver);

        // Send 10000 messages to the server (client streaming).
        for (int i = 0; i < 10000; i++) {
            requestObserver.onNext(ComputeAverageRequest.newBuilder()
                    .setNumber(i)
                    .build());
        }

        // Done streaming: onCompleted() tells the server to compute and respond.
        requestObserver.onCompleted();

        try {
            // Wait up to 3 seconds for the response; await() returns false on timeout.
            if (!latch.await(3, TimeUnit.SECONDS)) {
                System.err.println("Timed out waiting for the server response");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
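The client waits on a CountDownLatch because the asynchronous stub returns immediately and the response arrives later on another thread. Below is a minimal sketch of that wait-with-timeout pattern in plain Java. The method waitForReply and the sleeping worker thread are hypothetical stand-ins for the gRPC call, not part of the client above. The point it illustrates is that await returns a boolean, so a reply can be told apart from a timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchWaitSketch {

    // Returns true if the simulated reply arrived before the timeout, false otherwise.
    static boolean waitForReply(long replyDelayMillis, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);

        // Hypothetical worker standing in for the thread that delivers the server response.
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(replyDelayMillis);
                latch.countDown();   // plays the role of onCompleted()/onError() in the client
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);      // do not keep the JVM alive for a late reply
        worker.start();

        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("quick reply arrived in time: " + waitForReply(10, 2000));
        System.out.println("slow reply arrived in time: " + waitForReply(2000, 50));
    }
}
```

Checking the return value of await lets the caller report a timeout instead of silently continuing.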
Running the server and client
The server and client can be run from an IDE or from the command line. The terminal commands are shown below.
gRPC Server
grpc-java-examples$ mvn clean install
grpc-java-examples$ java -cp target/grpc-java-examples-1.0-SNAPSHOT-jar-with-dependencies.jar com.thebytecloud.server.CalculatorServer
serverBindPort = 7070
Starting gRPC Server...!
gRPC Client
grpc-java-examples$ java -cp target/grpc-java-examples-1.0-SNAPSHOT-jar-with-dependencies.jar com.thebytecloud.client.CalculatorClient
server = localhost:7070
Received average = 4999.5
Server has completed sending response!
grpc-java-examples$
Here the client streamed 10000 numbers (0 to 9999) one at a time. Once the stream completed, the server responded with a single message containing the average of those numbers, 4999.5.
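As a quick check of the reported value, assuming the client sends the integers 0 through 9999 as in the client loop:

```latex
\[
\text{average}
  = \frac{1}{10000}\sum_{i=0}^{9999} i
  = \frac{1}{10000}\cdot\frac{9999 \cdot 10000}{2}
  = \frac{9999}{2}
  = 4999.5
\]
```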