티스토리 뷰

728x90
반응형

Opencensus를 이용한 gRPC Global Tracing

Overview

Opencensus를 사용해서 시스템의 추적 및 매트릭을 수집하고 선택한 백엔드로 내보내 분산 시스템에 대한 관찰성을 제공할 수 있다.

간단하게 gRPC 상에서 추적을 검증하기 위한 용도이므로 단순히 바이트를 포함하는 페이로드를 받아서 대문자 처리를 하는 서비스를 대상으로 검토해 보도록 한다.

Requirements

Creating our gRPC Service

추적을 활성화하기 위해서는 다음과 같은 패키지가 필요하다.

  • Opencensus gRPC 지원 : go.opencensus.io/plugin/ocgrpc
  • Opencensus Trace 지원 : go.opencensus.io/trace

매트릭을 활성화하기 위해서는 다음과 같은 패키지가 필요하다.

Protobuf 정의

rpc 폴더를 생성하고 defs.proto 파일에 아래와 같이 정의를 구성한다.

syntax = "proto3";

package rpc;

message Payload {
    int32 id = 1;
    bytes data = 2;
}

service Fetch {
    rpc Capitalize(Payload) returns (Payload) {}
}

gRPC 서비스 생성

정의한 파일을 protoc로 컴파일 한다.

# protoc 
$ protoc -I rpc rpc/defs.proto --go_out=plugins=grpc:rpc

정상적으로 수행되면 defs.pb.go 파일이 생성된다.

서버 구현

server.go 파일을 생성하고 아래와 같이 구성한다.

package main

import (
    "bytes"
    "context"
    "log"
    "net"
    "google.golang.org/grpc"
    "oc.tutorials/ocgrpc/rpc"
)

type fetchIt int

// Compile time assertion that fetchIt implements FetchServer.
var _ rpc.FetchServer = (*fetchIt)(nil)

func (fi *fetchIt) Capitalize(ctx context.Context, in *rpc.Payload) (*rpc.Payload, error) {
    out :=  &rpc.Payload{
        Data: bytes.ToUpper(in.Data),
    }

    return out, nil
}

func main() {
    addr := ":9988"
    ln, err := net.Listen("tcp", addr)
    if nil != err {
        log.Fatalf("gRPC server: failed to listen: %v", err)
    }

    srv := grpc.NewServer()
    rpc.RegisterFetchServer(srv, new(fetchIt))

    log.Printf("fetchIt gRPC server serving at %q", addr)
    if err := srv.Serve(ln); nil != err {
        log.Fatalf("gRPC server: error serving: %v", err)
    }
}

이제 실행해서 클라이언트 호출에 대기한다.

$ go run server.go

클라이언트 구현

클라이언트는 gRPC 채널을 통해서 서버와 통신하며 바이트 단위로 전송하고 대문자 처리된 결과를 받게 된다.

client.go 파일을 생성하고 아래와 같이 구성한다.

package main

import (
    "bufio"
    "context"
    "fmt"
    "log"
    "os"

    "google.golang.org/grpc"
    "oc.tutorials/ocgrpc/rpc"
)

func main() {
    serverAddr := ":9988"
    cc, err := grpc.Dial(serverAddr, grpc.WithInsecure())
    if nil != err {
        log.Fatalf("fetchIt gRPC client failed to dial to server: %v", err)
    }

    fc := rpc.NewFetchClient(cc)

    fIn := bufio.NewReader(os.Stdin)
    for {
        fmt.Print("> ")
        line, _, err := fIn.ReadLine()
        if nil != err {
            log.Printf("Failed to read a line in: %v", err)
            return
        }

        ctx := context.Background()
        out, err := fc.Capitialize(ctx, &rpc.Payload{Data: line})
        if nil != err {
            log.Printf("fetchIt gRPC client got error from server: %v", err)
            continue
        }
        fmt.Printf("< %s\n\n", out.Data)
    }
}

이제 실행해서 입력한 결과를 확인할 수 있다.

$ go run client.go
> foo
< FOO

> bar
< BAR

Instrumentation

위의 예제에 추적과 매트릭에 대한 계측을 추가한다.

서버 계측

  • 추적

    추적을 측정하기 위해서는 grpc.StatsHandler에 ServerHandler를 등록해서 gRPC 측정항목을 추출하고 추적할 수 있도록 추가해 줘야 한다.

    import (
      ...
      "go.opencensus.io/trace"
    )
    
    func (fi *fetchIt) Capitalize(ctx context.Context, in *rpc.Payload) (*rpc.Payload, error) {
      ctx, span := trace.StartSpan(ctx, "oc.tutorials.grpc.Capitalize")
      defer span.End()
    
      ...
    }
    
    func main() {
      ...
      srv := grpc.NewServer(grpc.StatsHandler(&ocgrpc.ServerHandler{}))
      ...
    }
  • 매트릭

    매트릭을 측정하기 위해서는 필요한 View 정보를 추가해 줘야 한다.

    import (
      ...
      "go.opencensus.io/plugin/ocgrpc"
      "go.opencensus.io/stats/view"
    )
    
    func main() {
      ...
      if err := view.Register(ocgrpc.DefaultServerViews...); nil != err {
        log.Fatalf("Failed to register ocgrpc server views: %v", err)
      }
    
      srv := grpc.NewServer(grpc.StatsHandler(&ocgrpc.ServerHandler{}))
      ...
    }
  • 완성된 최종 서버 코드

    추출 및 수집된 데이터를 확인하기 위해서 stackdriver를 Exporter로 등록하는 정보를 추가해 줘야 한다. (Exporter는 다양하므로 선호하는 것을 활용해도 된다)

    package main
    
    import (
      "bytes"
      "context"
      "log"
      "net"
    
      "go.opencensus.io/plugin/ocgrpc"
      "go.opencensus.io/stats/view"
      "go.opencensus.io/trace"
      "google.golang.org/grpc"
    
      "oc.tutorials/ocgrpc/rpc"
    
      // The exporter to extract our metrics and traces
      "contrib.go.opencensus.io/exporter/stackdriver"
    )
    
    type fetchIt int
    
    // Compile time assertion that fetchIt implements FetchServer.
    var _ rpc.FetchServer = (*fetchIt)(nil)
    
    func (fi *fetchIt) Capitalize(ctx context.Context, in *rpc.Payload) (*rpc.Payload, error) {
      ctx, span := trace.StartSpan(ctx, "oc.tutorials.grpc.Capitalize")
      defer span.End()
    
      out := &rpc.Payload{
        Data: bytes.ToUpper(in.Data),
      }
      return out, nil
    }
    
    func main() {
      if err := view.Register(ocgrpc.DefaultServerViews...); nil != err {
        log.Fatalf("Failed to register ocgrpc server views: %v", err)
      }
    
      // Create and register the exporter
      sd, err := stackdriver.NewExporter(stackdriver.Options{
        ProjectID:    "census-demos", // Insert your projectID here
        MetricPrefix: "ocgrpctutorial",
      })
      if nil != err {
        log.Fatalf("Failed to create Stackdriver exporter: %v", err)
      }
      defer sd.Flush()
      trace.RegisterExporter(sd)
      sd.StartMetricsExporter()
      defer sd.StopMetricsExporter()
      // For demo purposes let's always sample
      trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()})
    
      addr := ":9988"
      ln, err := net.Listen("tcp", addr)
      if nil != err {
        log.Fatalf("gRPC server: failed to listen: %v", err)
      }
      srv := grpc.NewServer(grpc.StatsHandler(&ocgrpc.ServerHandler{}))
      rpc.RegisterFetchServer(srv, new(fetchIt))
      log.Printf("fetchIt gRPC server serving at %q", addr)
      if err := srv.Serve(ln); nil != err {
        log.Fatalf("gRPC server: error serving: %v", err)
      }
    }

클라이언트 계측

  • 추적

    추적을 측정하기 위해서는 grpc.StatsHandler에 ClientHandler를 등록해서 gRPC 측정항목을 추출하고 추적할 수 있도록 추가해 줘야 한다.

    import (
      ...
      "go.opencensus.io/trace"
    )
    
    func main() {
      ...
      cc, err := grpc.Dial(serverAddr, grpc.WithInsecure(), grpc.WithStatsHandler(new(ocgrpc.ClientHander)))
      ...
      ctx, span := trace.StartSpan(context.Background(), "oc.tutorials.grpc.ClientCapitalize")
      out, err := fc.Capitalize(ctx, &rpc.Payload{Data: line})
      if nil != err {
        span.SetStatus(trace.Status{Code: trace.StatusCodeInternal, Message: err.Error()})
        log.Printf("fetchIt gRPC client got error from server: %v", err)
      } else {
        fmt.Printf("< %v\n\n", out.Data)
      }
      span.End()
      ...
    }
  • 매트릭

    매트릭을 측정하기 위해서는 필요한 View 정보를 추가해 줘야 한다.

    import (
      ...
      "go.opencensus.io/plugin/ocgrpc"
      "go.opencensus.io/stats/view"
      "go.opencensus.io/trace"
    )
    
    func main() {
      ...
      if err := view.Register(ocgrpc.DefaultClientViews...); nil != err {
        log.Fatalf("Failed to register ocgrpc client views: %v", err)
      }
      cc, err := grpc.Dial(serverAddr, grpc.WithInsecure(), grpc.WithStatsHandler(new(ocgrpc.ClientHander)))
    }
  • 완성된 클라이언트 코드

    추출 및 수집된 데이터를 확인하기 위해서 stackdriver를 Exporter로 등록하는 정보를 추가해 줘야 한다. (Exporter는 다양하므로 선호하는 것을 활용해도 된다)

    package main
    
    import (
      "bufio"
      "context"
      "fmt"
      "log"
      "os"
    
      "go.opencensus.io/plugin/ocgrpc"
      "go.opencensus.io/stats/view"
      "go.opencensus.io/trace"
      "google.golang.org/grpc"
    
      "oc.tutorials/ocgrpc/rpc"
    
      // The exporter to extract our metrics and traces
      "contrib.go.opencensus.io/exporter/stackdriver"
    )
    
    func main() {
      if err := view.Register(ocgrpc.DefaultClientViews...); nil != err {
        log.Fatalf("Failed to register ocgrpc client views: %v", err)
      }
    
      // Create and register the exporter
      sd, err := stackdriver.NewExporter(stackdriver.Options{
        ProjectID:    "census-demos", // Insert your projectID here
        MetricPrefix: "ocgrpctutorial",
      })
      if nil != err {
        log.Fatalf("Failed to create Stackdriver exporter: %v", err)
      }
      defer sd.Flush()
      trace.RegisterExporter(sd)
      sd.StartMetricsExporter()
      defer sd.StopMetricsExporter()
      // For demo purposes let's always sample
      trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()})
    
      serverAddr := ":9988"
      cc, err := grpc.Dial(serverAddr, grpc.WithInsecure(), grpc.WithStatsHandler(new(ocgrpc.ClientHandler)))
      if nil != err {
        log.Fatalf("fetchIt gRPC client failed to dial to server: %v", err)
      }
      fc := rpc.NewFetchClient(cc)
    
      fIn := bufio.NewReader(os.Stdin)
      for {
        fmt.Print("> ")
        line, _, err := fIn.ReadLine()
        if nil != err {
          log.Printf("Failed to read a line in: %v", err)
          return
        }
    
        ctx, span := trace.StartSpan(context.Background(), "oc.tutorials.grpc.ClientCapitalize")
        out, err := fc.Capitalize(ctx, &rpc.Payload{Data: line})
        if nil != err {
          span.SetStatus(trace.Status{Code: trace.StatusCodeInternal, Message: err.Error()})
          log.Printf("fetchIt gRPC client got error from server: %v", err)
        } else {
          fmt.Printf("< %s\n\n", out.Data)
        }
        span.End()
      }
    }

Example Traces

GCP와 연계를 했고 stackdriver를 선택해서 추적 정보를 Export 했다면 Goolgle Cloud Platform Trace 정보 를 통해서 처리된 작업을 확인할 수 있다.

Example Metrics

GCP와 연계를 했고 stackdriver를 선택해서 매트릭 정보를 Export 했다면 Google Cloud Platform Metrics 정보 를 통해서 처리된 작업을 확인할 수 있다.

참고 자료

728x90
반응형
댓글
250x250
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함