Browse Source

Add experimental support for tracing with OpenCensus

See `github.com/olivere/trace/opencensus`.

Close #912
tags/v6.2.7
Oliver Eilhard 1 year ago
parent
commit
2ddc762ab5
3 changed files with 367 additions and 0 deletions
  1. 222
    0
      trace/opencensus/transport.go
  2. 131
    0
      trace/opencensus/transport_test.go
  3. 14
    0
      trace/opencensus/util.go

+ 222
- 0
trace/opencensus/transport.go View File

@@ -0,0 +1,222 @@
// Copyright 2012-present Oliver Eilhard. All rights reserved.
// Use of this source code is governed by a MIT-license.
// See http://olivere.mit-license.org/license.txt for details.

package opencensus

import (
"context"
"fmt"
"net/http"
"net/url"

"github.com/pkg/errors"
"go.opencensus.io/trace"
)

// Transport for tracing Elastic operations.
type Transport struct {
rt http.RoundTripper
defaultAttributes []trace.Attribute
}

// Option signature for specifying options, e.g. WithRoundTripper.
type Option func(t *Transport)

// WithRoundTripper specifies the http.RoundTripper to call
// next after this transport. If it is nil (default), the
// transport will use http.DefaultTransport.
func WithRoundTripper(rt http.RoundTripper) Option {
return func(t *Transport) {
t.rt = rt
}
}

// WithDefaultAttributes specifies default attributes to add
// to each span.
func WithDefaultAttributes(attrs ...trace.Attribute) Option {
return func(t *Transport) {
t.defaultAttributes = attrs
}
}

// NewTransport specifies a transport that will trace Elastic
// and report back via OpenTracing.
func NewTransport(opts ...Option) *Transport {
t := &Transport{}
for _, o := range opts {
o(t)
}
return t
}

// RoundTrip captures the request and starts an OpenTracing span
// for Elastic PerformRequest operation.
func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) {
_, span := trace.StartSpan(req.Context(), "elastic:PerformRequest")
attrs := append([]trace.Attribute(nil), t.defaultAttributes...)
attrs = append(attrs,
trace.StringAttribute("Component", "github.com/olivere/elastic/v6"),
trace.StringAttribute("Method", req.Method),
trace.StringAttribute("URL", req.URL.String()),
trace.StringAttribute("Hostname", req.URL.Hostname()),
trace.Int64Attribute("Port", atoi64(req.URL.Port())),
)
span.AddAttributes(attrs...)

var (
resp *http.Response
err error
)
defer func() {
setSpanStatus(span, err)
span.End()
}()
if t.rt != nil {
resp, err = t.rt.RoundTrip(req)
} else {
resp, err = http.DefaultTransport.RoundTrip(req)
}
return resp, err
}

// See https://github.com/opencensus-integrations/ocsql/blob/master/driver.go#L749
func setSpanStatus(span *trace.Span, err error) {
var status trace.Status
switch {
case err == nil:
status.Code = trace.StatusCodeOK
span.SetStatus(status)
return
case err == context.Canceled:
status.Code = trace.StatusCodeCancelled
case err == context.DeadlineExceeded:
status.Code = trace.StatusCodeDeadlineExceeded
case isConnErr(err):
status.Code = trace.StatusCodeUnavailable
case isNotFound(err):
status.Code = trace.StatusCodeNotFound
case isConflict(err):
status.Code = trace.StatusCodeFailedPrecondition
case isForbidden(err):
status.Code = trace.StatusCodePermissionDenied
case isTimeout(err):
status.Code = trace.StatusCodeResourceExhausted
default:
status.Code = trace.StatusCodeUnknown
}
status.Message = err.Error()
span.SetStatus(status)
}

// Copied from elastic to prevent cyclic dependencies.
type elasticError struct {
Status int `json:"status"`
Details *errorDetails `json:"error,omitempty"`
}

// errorDetails encapsulate error details from Elasticsearch.
// It is used in e.g. elastic.Error and elastic.BulkResponseItem.
type errorDetails struct {
Type string `json:"type"`
Reason string `json:"reason"`
ResourceType string `json:"resource.type,omitempty"`
ResourceId string `json:"resource.id,omitempty"`
Index string `json:"index,omitempty"`
Phase string `json:"phase,omitempty"`
Grouped bool `json:"grouped,omitempty"`
CausedBy map[string]interface{} `json:"caused_by,omitempty"`
RootCause []*errorDetails `json:"root_cause,omitempty"`
FailedShards []map[string]interface{} `json:"failed_shards,omitempty"`
}

// Error returns a string representation of the error.
func (e *elasticError) Error() string {
if e.Details != nil && e.Details.Reason != "" {
return fmt.Sprintf("elastic: Error %d (%s): %s [type=%s]", e.Status, http.StatusText(e.Status), e.Details.Reason, e.Details.Type)
}
return fmt.Sprintf("elastic: Error %d (%s)", e.Status, http.StatusText(e.Status))
}

// isContextErr returns true if the error is from a context that was canceled or deadline exceeded
func isContextErr(err error) bool {
if err == context.Canceled || err == context.DeadlineExceeded {
return true
}
// This happens e.g. on redirect errors, see https://golang.org/src/net/http/client_test.go#L329
if ue, ok := err.(*url.Error); ok {
if ue.Temporary() {
return true
}
// Use of an AWS Signing Transport can result in a wrapped url.Error
return isContextErr(ue.Err)
}
return false
}

// isConnErr returns true if the error indicates that Elastic could not
// find an Elasticsearch host to connect to.
func isConnErr(err error) bool {
if err == nil {
return false
}
if err.Error() == "no Elasticsearch node available" {
return true
}
innerErr := errors.Cause(err)
if innerErr == nil {
return false
}
if innerErr.Error() == "no Elasticsearch node available" {
return true
}
return false
}

// isNotFound returns true if the given error indicates that Elasticsearch
// returned HTTP status 404. The err parameter can be of type *elastic.Error,
// elastic.Error, *http.Response or int (indicating the HTTP status code).
func isNotFound(err interface{}) bool {
return isStatusCode(err, http.StatusNotFound)
}

// isTimeout returns true if the given error indicates that Elasticsearch
// returned HTTP status 408. The err parameter can be of type *elastic.Error,
// elastic.Error, *http.Response or int (indicating the HTTP status code).
func isTimeout(err interface{}) bool {
return isStatusCode(err, http.StatusRequestTimeout)
}

// isConflict returns true if the given error indicates that the Elasticsearch
// operation resulted in a version conflict. This can occur in operations like
// `update` or `index` with `op_type=create`. The err parameter can be of
// type *elastic.Error, elastic.Error, *http.Response or int (indicating the
// HTTP status code).
func isConflict(err interface{}) bool {
return isStatusCode(err, http.StatusConflict)
}

// isForbidden returns true if the given error indicates that Elasticsearch
// returned HTTP status 403. This happens e.g. due to a missing license.
// The err parameter can be of type *elastic.Error, elastic.Error,
// *http.Response or int (indicating the HTTP status code).
func isForbidden(err interface{}) bool {
return isStatusCode(err, http.StatusForbidden)
}

// isStatusCode returns true if the given error indicates that the Elasticsearch
// operation returned the specified HTTP status code. The err parameter can be of
// type *http.Response, *Error, Error, or int (indicating the HTTP status code).
func isStatusCode(err interface{}, code int) bool {
switch e := err.(type) {
case *http.Response:
return e.StatusCode == code
case *elasticError:
return e.Status == code
case elasticError:
return e.Status == code
case int:
return e == code
}
return false
}

+ 131
- 0
trace/opencensus/transport_test.go View File

@@ -0,0 +1,131 @@
// Copyright 2012-present Oliver Eilhard. All rights reserved.
// Use of this source code is governed by a MIT-license.
// See http://olivere.mit-license.org/license.txt for details.

package opencensus

import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"testing"

"go.opencensus.io/trace"

"github.com/olivere/elastic"
)

func init() {
// Always sample
trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()})
}

type testExporter struct {
spans []*trace.SpanData
}

func (t *testExporter) ExportSpan(s *trace.SpanData) {
t.spans = append(t.spans, s)
}

func TestTransport(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/":
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, `{
"name" : "Qg28M36",
"cluster_name" : "docker-cluster",
"cluster_uuid" : "rwHa7BBnRC2h8KoDfCbmuQ",
"version" : {
"number" : "6.3.2",
"build_flavor" : "oss",
"build_type" : "tar",
"build_hash" : "053779d",
"build_date" : "2018-07-20T05:20:23.451332Z",
"build_snapshot" : false,
"lucene_version" : "7.3.1",
"minimum_wire_compatibility_version" : "5.6.0",
"minimum_index_compatibility_version" : "5.0.0"
},
"tagline" : "You Know, for Search"
}`)
return
default:
w.WriteHeader(http.StatusInternalServerError)
return
}
}))
defer ts.Close()

// Register test exporter
var te testExporter
trace.RegisterExporter(&te)

// Setup a simple transport
tr := NewTransport(
WithDefaultAttributes(
trace.StringAttribute("Opaque-Id", "12345"),
),
)
httpClient := &http.Client{
Transport: tr,
}

// Create a simple Ping request via Elastic
client, err := elastic.NewClient(
elastic.SetURL(ts.URL),
elastic.SetHttpClient(httpClient),
elastic.SetHealthcheck(false),
elastic.SetSniff(false),
)
if err != nil {
t.Fatal(err)
}
res, code, err := client.Ping(ts.URL).Do(context.Background())
if err != nil {
t.Fatal(err)
}
if want, have := http.StatusOK, code; want != have {
t.Fatalf("want Status=%d, have %d", want, have)
}
if want, have := "You Know, for Search", res.TagLine; want != have {
t.Fatalf("want TagLine=%q, have %q", want, have)
}
trace.UnregisterExporter(&te)

// Check the data written into tracer
spans := te.spans
if want, have := 1, len(spans); want != have {
t.Fatalf("want %d finished spans, have %d", want, have)
}
span := spans[0]
if want, have := "elastic:PerformRequest", span.Name; want != have {
t.Fatalf("want Span.Name=%q, have %q", want, have)
}
if attr, ok := span.Attributes["Component"].(string); !ok {
t.Fatalf("attribute %q not found", "Component")
} else if want, have := "github.com/olivere/elastic/v6", attr; want != have {
t.Fatalf("want attribute=%q, have %q", want, have)
}
if attr, ok := span.Attributes["Method"].(string); !ok {
t.Fatalf("attribute %q not found", "Method")
} else if want, have := "GET", attr; want != have {
t.Fatalf("want attribute=%q, have %q", want, have)
}
if attr, ok := span.Attributes["URL"].(string); !ok || attr == "" {
t.Fatalf("attribute %q not found", "URL")
}
if attr, ok := span.Attributes["Hostname"].(string); !ok || attr == "" {
t.Fatalf("attribute %q not found", "Hostname")
}
if port, ok := span.Attributes["Port"].(int64); !ok || port <= 0 {
t.Fatalf("attribute %q not found", "Port")
}
if attr, ok := span.Attributes["Opaque-Id"].(string); !ok {
t.Fatalf("attribute %q not found", "Opaque-Id")
} else if want, have := "12345", attr; want != have {
t.Fatalf("want attribute=%q, have %q", want, have)
}
}

+ 14
- 0
trace/opencensus/util.go View File

@@ -0,0 +1,14 @@
// Copyright 2012-present Oliver Eilhard. All rights reserved.
// Use of this source code is governed by a MIT-license.
// See http://olivere.mit-license.org/license.txt for details.

package opencensus

import (
"strconv"
)

func atoi64(s string) int64 {
i, _ := strconv.ParseInt(s, 10, 64)
return i
}

Loading…
Cancel
Save