Post

grpc-java

grpc-java를 이용해 실습 진행해보기

grpc-java

공식문서 를 참고하여 실습을 진행하며 학습한 내용을 정리한 글입니다.

gRPC overview

gRPC에서 클라이언트 애플리케이션에서 다른 머신에 있는 서버 애플리케이션의 메소드를 로컬 오브젝트처럼 직접적으로 호출할 수 있고, 이것은 분산 애플리케이션과 서비스를 만드는 것을 용이하게 해주었습니다. 많은 RPC 시스템처럼, gRPC는 서비스를 정의하고, 원격으로 호출할 메소드의 파라미터, 리턴 타입을 정의하는 방식으로 동작합니다.

서버 사이드에서, 서버는 인터페이스를 구현하고, 클라이언트의 요청을 처리하는 gRPC 서버를 실행합니다. 클라이언트 사이드에서는 클라이언트는 서버와 동일한 메소드를 제공하는 stub를 가지고 있습니다.

https://grpc.io/img/landing-2.svg

gRPC 클라이언트와 서버는 서로 다른 환경속에서도 사용 가능하고, gRPC가 지원하는 어떤 프로그래밍 언어로도 작성 가능합니다.

  • 자바로 gRPC 서버를 실행하고, 클라이언트로는 go, python, ruby를 사용할 수 있습니다.

protocol buffers

기본적으로 gRPC는 ProtocolBuffers를 사용합니다. ProtocolBuffers는 구조화된 데이터를 직렬화하는 구글의 오픈소스 메커니즘입니다.

protocl buffers를 사용하기 위해서는 직렬화할 데이터의 구조를 proto file에 정의해야합니다. proto file은 보통 텍스트 파일과 비슷하지만, .proto 확장자를 가지는 파일입니다. protocol buffer 데이터는 메세지로 구조화됩니다. 각 메세지는 fields라는 네임-벨류 쌍을 가집니다.

1
2
3
4
5
message Person {
  string name = 1;
  int32 id = 2;
  bool has_ponycopter = 3;
}

.proto파일에 사용할 데이터 자료구조를 명시하고, protoc protocol buffer compiler를 사용해서 사용한 언어에 맞는 데이터 접근 클래스를 생성할 수 있습니다. proto file에 다음과 같이 gRPC 서비스를 선언하고, 사용할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// The greeter service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

gRPC tutorial

git clone -b v1.66.0 --depth 1 https://github.com/grpc/grpc-java 명령어로 실습 코드를 다운 받을 수 있습니다.

defining the service

가장 첫번째로 gRPC 서비스를 정의해야하고 protocol buffers를 이용한 request와 response를 정의해야합니다. 이 예제에서는 자바 코드를 생성하는 것이므로, .proto 파일에 java_package 파일 옵션을 명시합니다.

1
option java_package = "io.grpc.examples.routeguide";

java_packages옵션으로 생성된 자바 클래스에 패키지를 지정합니다.

서비스를 정의하기 위해서는 .proto 파일에 다음과 같이 정의할 수 있습니다.

1
2
3
service RouteGuide {
  ...
}

service내부에서 rpc 메서드들을 정의할 수 있습니다. gRPC에서는 4가지 종류의 서비스 메소드를 정의할 수 있습니다.

  • simpleRPC
    • 클라이언트는 서버에 stub을 이용해 요청을 전송하고 리스폰스를 기다립니다. 일반 함수와 동일하게 동작합니다.
      1
      2
      3
      
      service SampleRPC {
      rpc GetFeature(Point) returns (Feature) {}
      }
      
  • server-side streaming RPC
    • 클라이언트는 서버에 요청을 보내고, 스트림을 리턴 받습니다.
    • 클라이언트는 스트림에 메세지가 없을 때까지 스트림으로부터 데이터를 읽어옵니다.
      1
      2
      3
      4
      5
      
      // Obtains the Features available within the given Rectangle.  Results are
      // streamed rather than returned at once (e.g. in a response message with a
      // repeated field), as the rectangle may cover a large area and contain a
      // huge number of features.
      rpc ListFeatures(Rectangle) returns (stream Feature) {}
      
  • client-side streaming RPC
    • 클라이언트가 메세지 시퀀스를 제공받은 스트림을 활용해 서버로 전송합니다.
    • 클라이언트가 메세지 작성을 마치면, 서버가 모든 메세지를 읽고 응답하기를 대기합니다.
      1
      2
      3
      
      // Accepts a stream of Points on a route being traversed, returning a
      // RouteSummary when traversal is completed.
      rpc RecordRoute(stream Point) returns (RouteSummary) {}
      
  • bidirectional streaming RPC
    • 양쪽에서 read-write 스트림을 이용해서 메세지의 시퀀스를 보낼 수 있습니다.
    • 두 스트림은 독립적으로 동작합니다.
      • 서버는 리스폰스를 생성하기 전에, 모든 클라이언트의 메세지를 전달받도록 대기할 수 있습니다. 다른 읽기, 쓰기의 조합도 가능합니다.
        1
        2
        3
        
        // Accepts a stream of RouteNotes sent while a route is being traversed,
        // while receiving other RouteNotes (e.g. from other users).
        rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
        

gRPC 서버 & 클라이언트

.proto에서부터 서버와 클라이언트 인터페이스를 생성해야합니다. protoc라는 protocol buffer compiler를 이용해서 gRPC 서비스를 생성할 수 있습니다.

gradle 혹은 maven을 사용하면, protoc 빌드 플러그인이 proto파일로부터 코드들을 생성해줍니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
plugins {
    id 'com.google.protobuf' version '0.9.4'
}

protobuf {
  protoc {
    artifact = "com.google.protobuf:protoc:3.25.3"
  }
  plugins {
    grpc {
      artifact = 'io.grpc:protoc-gen-grpc-java:1.66.0'
    }
  }
  generateProtoTasks {
    all()*.plugins {
      grpc {}
    }
  }
}

예제 코드를 확인해보면, build/generated/source/proto/main/grpc 하위 디렉토리들에 proto파일로부터 생성된 자바 클래스들을 확인할 수 있습니다.

예제에서 사용할 proto 파일의 내용은 다음과 같습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.routeguide";
option java_outer_classname = "RouteGuideProto";
option objc_class_prefix = "RTG";

package routeguide;

// Interface exported by the server.
service RouteGuide {
  // A simple RPC.
  //
  // Obtains the feature at a given position.
  //
  // A feature with an empty name is returned if there's no feature at the given
  // position.
  rpc GetFeature(Point) returns (Feature) {}

  // A server-to-client streaming RPC.
  //
  // Obtains the Features available within the given Rectangle.  Results are
  // streamed rather than returned at once (e.g. in a response message with a
  // repeated field), as the rectangle may cover a large area and contain a
  // huge number of features.
  rpc ListFeatures(Rectangle) returns (stream Feature) {}

  // A client-to-server streaming RPC.
  //
  // Accepts a stream of Points on a route being traversed, returning a
  // RouteSummary when traversal is completed.
  rpc RecordRoute(stream Point) returns (RouteSummary) {}

  // A Bidirectional streaming RPC.
  //
  // Accepts a stream of RouteNotes sent while a route is being traversed,
  // while receiving other RouteNotes (e.g. from other users).
  rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}

// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

// A latitude-longitude rectangle, represented as two diagonally opposite
// points "lo" and "hi".
message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

// A feature names something at a given point.
//
// If a feature could not be named, the name is empty.
message Feature {
  // The name of the feature.
  string name = 1;

  // The point where the feature is detected.
  Point location = 2;
}

// Not used in the RPC.  Instead, this is here for the form serialized to disk.
message FeatureDatabase {
  repeated Feature feature = 1;
}

// A RouteNote is a message sent while at a given point.
message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

// A RouteSummary is received in response to a RecordRoute rpc.
//
// It contains the number of individual points received, the number of
// detected features, and the total distance covered as the cumulative sum of
// the distance between each point.
message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

서버 코드

서버 코드를 확인해보면 proto 파일로 생성한 RouteGuideGrpc.RouteGuideImplBaseRouteGuideService 클래스로 구현한 것을 알 수 있습니다.

1
2
3
private static class RouteGuideService extends RouteGuideGrpc.RouteGuideImplBase {
  // ~~~
}

RouteGuideService에서 구현한 메소드들은 다음과 같습니다.

Simple RPC

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
    /**
     * Gets the {@link Feature} at the requested {@link Point}. If no feature at that location
     * exists, an unnamed feature is returned at the provided location.
     *
     * @param request the requested location for the feature.
     * @param responseObserver the observer that will receive the feature at the requested point.
     */
    @Override
    public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
      responseObserver.onNext(checkFeature(request));
      responseObserver.onCompleted();
    }
    /**
     * Gets the feature at the given point.
     *
     * @param location the location to check.
     * @return The feature object at the point. Note that an empty name indicates no feature.
     */
    private Feature checkFeature(Point location) {
      for (Feature feature : features) {
        if (feature.getLocation().getLatitude() == location.getLatitude()
            && feature.getLocation().getLongitude() == location.getLongitude()) {
          return feature;
        }
      }

      // No feature was found, return an unnamed feature.
      return Feature.newBuilder().setName("").setLocation(location).build();
    }

GetFeature()메소드는 클라이언트로부터 Point를 전달받아 그에 해당하는 Feature 정보를 리턴하는 메소드입니다.

getFeature() 메소드는 두가지 파라미터를 전달 받습니다.

  • Point : 클라이언트 리퀘스트
  • StreamObserver<Feature> : 서버가 응답을 호출할 때 사용하는 특별한 인터페이스인 응답 옵저버

리스폰스를 리턴하고 호출을 종료하기 위해서는 다음과 같은 과정이 필요합니다.

  1. 서비스 정의에 따라 클라이언트에게 반환할 Feature 응답 객체를 생성하고 데이터를 채웁니다. 예시에서는 이를 별도의 메소드인 checkFeature()에서 처리합니다.
  2. 응답 옵저버의 onNext() 메소드를 사용해 Feature를 반환합니다.
  3. 응답 옵저버의 onCompleted() 메소드를 사용해 RPC 처리가 완료되었음을 알립니다.

Server-side streaming RPC

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
    private final Collection<Feature> features;

    /**
     * Gets all features contained within the given bounding {@link Rectangle}.
     *
     * @param request the bounding rectangle for the requested features.
     * @param responseObserver the observer that will receive the features.
     */
    @Override
    public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
      int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
      int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
      int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
      int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());

      for (Feature feature : features) {
        if (!RouteGuideUtil.exists(feature)) {
          continue;
        }

        int lat = feature.getLocation().getLatitude();
        int lon = feature.getLocation().getLongitude();
        if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
          responseObserver.onNext(feature);
        }
      }
      responseObserver.onCompleted();
    }

listFeatures 메소드는 서버 사이드 스트리밍 RPC로 클라이언트에게 다수의 Feature를 리턴합니다.

simple rpc에서처럼 리퀘스트 오브젝트와 StreamObserver를 파라미터로 전달받습니다. 클라이언트에게 리턴할 Feature들을 onNext()를 이용해 전달할 수 있습니다.

Client-side streaming RPC

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
    /**
     * Gets a stream of points, and responds with statistics about the "trip": number of points,
     * number of known features visited, total distance traveled, and total time spent.
     *
     * @param responseObserver an observer to receive the response summary.
     * @return an observer to receive the requested route points.
     */
    @Override
    public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
      return new StreamObserver<Point>() {
        int pointCount;
        int featureCount;
        int distance;
        Point previous;
        final long startTime = System.nanoTime();

        @Override
        public void onNext(Point point) {
          pointCount++;
          if (RouteGuideUtil.exists(checkFeature(point))) {
            featureCount++;
          }
          // For each point after the first, add the incremental distance from the previous point to
          // the total distance value.
          if (previous != null) {
            distance += calcDistance(previous, point);
          }
          previous = point;
        }

        @Override
        public void onError(Throwable t) {
          logger.log(Level.WARNING, "recordRoute cancelled");
        }

        @Override
        public void onCompleted() {
          long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
          responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
              .setFeatureCount(featureCount).setDistance(distance)
              .setElapsedTime((int) seconds).build());
          responseObserver.onCompleted();
        }
      };
    }

recordRoute() 메소드는 클라이언트 사이드 스트리밍 메소드입니다. 클라이언트로부터 Point의 스트림을 전달받고, 전달받은 스트림으로부터 한개의 RouteSummary를 생성해 리턴합니다.

이전 메소드와 동일하게 StreamObserver 오브젝트를 파라미터로 전달받는 것을 확인할 수 있습니다. 다른 점은 리턴 타입이 StreamObserver입니다.

메서드 본문에서 반환할 익명 StreamObserver 객체를 인스턴스화하고, 다음과 같이 처리합니다:

  • onNext() 메서드를 오버라이드하여 클라이언트가 메시지 스트림에 Point를 쓸 때마다 특징(feature) 및 기타 정보를 얻습니다.
  • onCompleted() 메서드를 오버라이드하여(클라이언트가 메시지 작성을 완료했을 때 호출됨), RouteSummary를 채우고 빌드합니다. 그런 다음, 메서드 내의 응답 옵저버의 onNext()를 호출해 RouteSummary를 반환하고, 마지막으로 onCompleted() 메서드를 호출하여 서버 측에서 호출을 완료합니다.

Bidirectional streaming RPC

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
    /**
     * Receives a stream of message/location pairs, and responds with a stream of all previous
     * messages at each of those locations.
     *
     * @param responseObserver an observer to receive the stream of previous messages.
     * @return an observer to handle requested message/location pairs.
     */
    @Override
    public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
      return new StreamObserver<RouteNote>() {
        @Override
        public void onNext(RouteNote note) {
          List<RouteNote> notes = getOrCreateNotes(note.getLocation());

          // Respond with all previous notes at this location.
          for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
            responseObserver.onNext(prevNote);
          }

          // Now add the new note to the list
          notes.add(note);
        }

        @Override
        public void onError(Throwable t) {
          logger.log(Level.WARNING, "routeChat cancelled");
        }

        @Override
        public void onCompleted() {
          responseObserver.onCompleted();
        }
      };
    }

클라이언트 측 스트리밍 예제와 마찬가지로, 우리는 StreamObserver 응답 옵저버를 받고 반환합니다. 단, 이번에는 클라이언트가 메시지 스트림에 메시지를 작성하는 동안, 메서드의 응답 옵저버를 통해 값을 반환합니다. 이때 읽고 쓰는 구문은 클라이언트 스트리밍 및 서버 스트리밍 메서드와 정확히 동일합니다. 양측은 서로의 메시지를 작성된 순서대로 수신하지만, 클라이언트와 서버는 어느 순서로든 자유롭게 읽고 쓸 수 있습니다. 두 스트림은 완전히 독립적으로 동작합니다.

This post is licensed under CC BY 4.0 by the author.