No Description

grpc.go 3.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "time"
  7. protov3grpc "github.com/go-graphite/protocol/carbonapi_v3_grpc"
  8. pb "github.com/go-graphite/protocol/carbonapi_v3_pb"
  9. gpb "github.com/golang/protobuf/ptypes/empty"
  10. "github.com/lomik/zapwriter"
  11. "go.uber.org/zap"
  12. "google.golang.org/grpc"
  13. )
  14. var errNotImplementedYet = fmt.Errorf("feature not implemented yet")
  15. var errNoDataInResponse = fmt.Errorf("no data in response")
  16. var errEmptyRequest = fmt.Errorf("empty request")
  17. var errUnknownError = fmt.Errorf("unknown error")
  18. type GRPCServer struct {
  19. listener net.Listener
  20. server *grpc.Server
  21. }
  22. func (srv *GRPCServer) serve() {
  23. srv.server.Serve(srv.listener)
  24. }
  25. func (srv GRPCServer) GetVersion(ctx context.Context, in *gpb.Empty) (*protov3grpc.ProtocolVersionResponse, error) {
  26. return &protov3grpc.ProtocolVersionResponse{
  27. Version: 1,
  28. }, nil
  29. }
  30. func (srv GRPCServer) FetchMetrics(ctx context.Context, in *pb.MultiFetchRequest) (*pb.MultiFetchResponse, error) {
  31. t0 := time.Now()
  32. memoryUsage := 0
  33. logger := zapwriter.Logger("grpc_find").With(
  34. zap.String("handler", "find"),
  35. )
  36. logger.Debug("got find request",
  37. zap.String("request", "grpc"),
  38. )
  39. Metrics.FindRequests.Add(1)
  40. grpcLogger := zapwriter.Logger("grpc_access").With(
  41. zap.String("handler", "render"),
  42. zap.String("format", "grpc"),
  43. )
  44. ctx, cancel := context.WithTimeout(ctx, config.Timeouts.Render)
  45. defer cancel()
  46. grpcLogger.Debug("got render request",
  47. zap.Any("request", in.Metrics),
  48. )
  49. Metrics.RenderRequests.Add(1)
  50. response, stats, err := config.zipper.FetchProtoV3(ctx, in)
  51. sendStats(stats)
  52. if err != nil {
  53. grpcLogger.Error("failed to fetch data",
  54. zap.Int("memory_usage_bytes", memoryUsage),
  55. zap.Error(err),
  56. zap.Any("request", in),
  57. zap.Duration("runtime_seconds", time.Since(t0)),
  58. )
  59. return nil, err
  60. }
  61. if len(response.Metrics) == 0 {
  62. return nil, errNoDataInResponse
  63. }
  64. grpcLogger.Info("request served",
  65. zap.Int("memory_usage_bytes", memoryUsage),
  66. zap.Duration("runtime_seconds", time.Since(t0)),
  67. )
  68. return response, nil
  69. }
  70. func (srv GRPCServer) FindMetrics(ctx context.Context, in *pb.MultiGlobRequest) (*pb.MultiGlobResponse, error) {
  71. t0 := time.Now()
  72. logger := zapwriter.Logger("grpc_find").With(
  73. zap.String("handler", "find"),
  74. )
  75. logger.Debug("got find request",
  76. zap.String("request", "grpc"),
  77. )
  78. Metrics.FindRequests.Add(1)
  79. grpcLogger := zapwriter.Logger("grpc_access").With(
  80. zap.String("handler", "find"),
  81. zap.String("format", "grpc"),
  82. )
  83. ctx, cancel := context.WithTimeout(ctx, config.Timeouts.Find)
  84. defer cancel()
  85. response, stats, err := config.zipper.FindProtoV3(ctx, in)
  86. sendStats(stats)
  87. if err != nil {
  88. grpcLogger.Error("find error",
  89. zap.Strings("query", in.Metrics),
  90. zap.String("reason", err.Error()),
  91. zap.Duration("runtime_seconds", time.Since(t0)),
  92. )
  93. return nil, err
  94. }
  95. if len(response.Metrics) == 0 {
  96. return nil, errNoDataInResponse
  97. }
  98. grpcLogger.Info("request served",
  99. zap.Duration("runtime_seconds", time.Since(t0)),
  100. )
  101. return response, nil
  102. }
  103. func (srv GRPCServer) MetricsInfo(ctx context.Context, in *pb.MultiMetricsInfoRequest) (*pb.MultiMetricsInfoResponse, error) {
  104. return nil, errNotImplementedYet
  105. }
  106. func (srv GRPCServer) ListMetrics(ctx context.Context, in *gpb.Empty) (*pb.ListMetricsResponse, error) {
  107. return nil, errNotImplementedYet
  108. }
  109. func (srv GRPCServer) Stats(ctx context.Context, in *gpb.Empty) (*pb.MetricDetailsResponse, error) {
  110. return nil, errNotImplementedYet
  111. }
  112. func NewGRPCServer(address string) (*GRPCServer, error) {
  113. listener, err := net.Listen("tcp", address)
  114. if err != nil {
  115. return nil, err
  116. }
  117. srv := GRPCServer{
  118. listener: listener,
  119. server: grpc.NewServer(),
  120. }
  121. protov3grpc.RegisterCarbonV1Server(srv.server, srv)
  122. go srv.serve()
  123. return &srv, nil
  124. }