Browse Source

Remove carbonzipper dependency from carbonapi

Carbonapi will use the carbonzipper code that lives in this repo from
now on.
Gunnar Þór Magnússon 1 year ago
parent
commit
5fc8bc2d72
56 changed files with 89 additions and 4718 deletions
  1. 2
    2
      .gitignore
  2. 1
    31
      Gopkg.lock
  3. 0
    4
      Gopkg.toml
  4. 1
    1
      Makefile
  5. 1
    1
      cmd/carbonapi/http_handlers.go
  6. 4
    4
      cmd/carbonapi/main.go
  7. 2
    2
      cmd/carbonapi/main_test.go
  8. 3
    3
      cmd/carbonapi/zipper.go
  9. 8
    8
      cmd/carbonzipper/main.go
  10. 1
    1
      expr/functions/graphiteWeb/function.go
  11. 1
    1
      pathcache/pathcache.go
  12. 0
    25
      vendor/github.com/go-graphite/carbonzipper/LICENSE
  13. 0
    109
      vendor/github.com/go-graphite/carbonzipper/cache/cache.go
  14. 0
    59
      vendor/github.com/go-graphite/carbonzipper/intervalset/intervalset.go
  15. 0
    57
      vendor/github.com/go-graphite/carbonzipper/limiter/limiter.go
  16. 0
    45
      vendor/github.com/go-graphite/carbonzipper/mstats/mstats.go
  17. 0
    58
      vendor/github.com/go-graphite/carbonzipper/pathcache/pathcache.go
  18. 0
    50
      vendor/github.com/go-graphite/carbonzipper/util/apictx/ctx.go
  19. 0
    50
      vendor/github.com/go-graphite/carbonzipper/util/zipperctx/ctx.go
  20. 0
    602
      vendor/github.com/go-graphite/carbonzipper/zipper/broadcast/broadcast_group.go
  21. 0
    106
      vendor/github.com/go-graphite/carbonzipper/zipper/cache/query.go
  22. 0
    25
      vendor/github.com/go-graphite/carbonzipper/zipper/config/config.go
  23. 0
    94
      vendor/github.com/go-graphite/carbonzipper/zipper/errors/errors.go
  24. 0
    159
      vendor/github.com/go-graphite/carbonzipper/zipper/helper/requests.go
  25. 0
    9
      vendor/github.com/go-graphite/carbonzipper/zipper/httpHeaders/headers.go
  26. 0
    23
      vendor/github.com/go-graphite/carbonzipper/zipper/metadata/metadata.go
  27. 0
    233
      vendor/github.com/go-graphite/carbonzipper/zipper/protocols/auto/auto_group.go
  28. 0
    331
      vendor/github.com/go-graphite/carbonzipper/zipper/protocols/graphite/graphite_group.go
  29. 0
    21
      vendor/github.com/go-graphite/carbonzipper/zipper/protocols/graphite/msgpack/type.go
  30. 0
    600
      vendor/github.com/go-graphite/carbonzipper/zipper/protocols/graphite/msgpack/type_gen.go
  31. 0
    228
      vendor/github.com/go-graphite/carbonzipper/zipper/protocols/grpc/grpc_group.go
  32. 0
    340
      vendor/github.com/go-graphite/carbonzipper/zipper/protocols/v2/protobuf_group.go
  33. 0
    252
      vendor/github.com/go-graphite/carbonzipper/zipper/protocols/v3/protobuf_group.go
  34. 0
    58
      vendor/github.com/go-graphite/carbonzipper/zipper/types/backend.go
  35. 0
    21
      vendor/github.com/go-graphite/carbonzipper/zipper/types/errors.go
  36. 0
    43
      vendor/github.com/go-graphite/carbonzipper/zipper/types/interface.go
  37. 0
    66
      vendor/github.com/go-graphite/carbonzipper/zipper/types/lbmethod.go
  38. 0
    228
      vendor/github.com/go-graphite/carbonzipper/zipper/types/response.go
  39. 0
    35
      vendor/github.com/go-graphite/carbonzipper/zipper/types/stats.go
  40. 0
    12
      vendor/github.com/go-graphite/carbonzipper/zipper/types/timeouts.go
  41. 0
    656
      vendor/github.com/go-graphite/carbonzipper/zipper/zipper.go
  42. 5
    5
      zipper/broadcast/broadcast_group.go
  43. 3
    3
      zipper/broadcast/broadcast_group_test.go
  44. 1
    1
      zipper/config/config.go
  45. 2
    2
      zipper/dummy/dummy.go
  46. 5
    5
      zipper/helper/requests.go
  47. 3
    3
      zipper/metadata/metadata.go
  48. 7
    7
      zipper/protocols/auto/auto_group.go
  49. 7
    7
      zipper/protocols/graphite/graphite_group.go
  50. 4
    4
      zipper/protocols/grpc/grpc_group.go
  51. 6
    6
      zipper/protocols/v2/protobuf_group.go
  52. 6
    6
      zipper/protocols/v3/protobuf_group.go
  53. 1
    1
      zipper/types/interface.go
  54. 1
    1
      zipper/types/response.go
  55. 12
    12
      zipper/zipper.go
  56. 2
    2
      zipper/zipper_test.go

+ 2
- 2
.gitignore View File

@@ -1,3 +1,3 @@
1 1
 .idea
2
-carbonapi
3
-carbonzipper
2
+/carbonapi
3
+/carbonzipper

+ 1
- 31
Gopkg.lock View File

@@ -106,36 +106,6 @@
106 106
   revision = "c2828203cd70a50dcccfb2761f8b1f8ceef9a8e9"
107 107
   version = "v1.4.7"
108 108
 
109
-[[projects]]
110
-  branch = "gmagnusson/repo-merge"
111
-  name = "github.com/go-graphite/carbonzipper"
112
-  packages = [
113
-    "cache",
114
-    "intervalset",
115
-    "limiter",
116
-    "mstats",
117
-    "pathcache",
118
-    "util/apictx",
119
-    "util/zipperctx",
120
-    "zipper",
121
-    "zipper/broadcast",
122
-    "zipper/cache",
123
-    "zipper/config",
124
-    "zipper/dummy",
125
-    "zipper/errors",
126
-    "zipper/helper",
127
-    "zipper/httpHeaders",
128
-    "zipper/metadata",
129
-    "zipper/protocols/auto",
130
-    "zipper/protocols/graphite",
131
-    "zipper/protocols/graphite/msgpack",
132
-    "zipper/protocols/grpc",
133
-    "zipper/protocols/v2",
134
-    "zipper/protocols/v3",
135
-    "zipper/types"
136
-  ]
137
-  revision = "ef88d4c37691b67955d5d7ec79b459026930ff49"
138
-
139 109
 [[projects]]
140 110
   name = "github.com/go-graphite/protocol"
141 111
   packages = [
@@ -467,6 +437,6 @@
467 437
 [solve-meta]
468 438
   analyzer-name = "dep"
469 439
   analyzer-version = 1
470
-  inputs-digest = "d5624766b7e9c9f2da264b87de2b151e9c3cea48f6dd716b997f7fa1b81066e8"
440
+  inputs-digest = "4e4fa1c570cebf3bed6470f494925d5f56f679412f1133e749651741eeb9c089"
471 441
   solver-name = "gps-cdcl"
472 442
   solver-version = 1

+ 0
- 4
Gopkg.toml View File

@@ -69,10 +69,6 @@
69 69
   branch = "master"
70 70
   name = "github.com/facebookgo/pidfile"
71 71
 
72
-[[constraint]]
73
-  branch = "gmagnusson/repo-merge"
74
-  name = "github.com/go-graphite/carbonzipper"
75
-
76 72
 [[constraint]]
77 73
   branch = "master"
78 74
   name = "github.com/go-graphite/protocol"

+ 1
- 1
Makefile View File

@@ -19,7 +19,7 @@ debug:
19 19
 nocairo:
20 20
 	$(GO) build -ldflags '-X main.BuildVersion=$(VERSION)' $(PKG_CARBONAPI)
21 21
 
22
-carbonzipper: dep
22
+carbonzipper:
23 23
 	$(GO) build --ldflags '-X main.BuildVersion=$(VERSION)' $(PKG_CARBONZIPPER)
24 24
 
25 25
 test:

+ 1
- 1
cmd/carbonapi/http_handlers.go View File

@@ -19,7 +19,7 @@ import (
19 19
 	"github.com/go-graphite/carbonapi/expr/types"
20 20
 	"github.com/go-graphite/carbonapi/pkg/parser"
21 21
 	"github.com/go-graphite/carbonapi/util"
22
-	"github.com/go-graphite/carbonzipper/intervalset"
22
+	"github.com/go-graphite/carbonapi/intervalset"
23 23
 	pb "github.com/go-graphite/protocol/carbonapi_v3_pb"
24 24
 
25 25
 	"github.com/go-graphite/carbonapi/expr/metadata"

+ 4
- 4
cmd/carbonapi/main.go View File

@@ -23,10 +23,10 @@ import (
23 23
 	"github.com/go-graphite/carbonapi/expr/functions/cairo/png"
24 24
 	"github.com/go-graphite/carbonapi/expr/helper"
25 25
 	"github.com/go-graphite/carbonapi/pkg/parser"
26
-	"github.com/go-graphite/carbonzipper/cache"
27
-	"github.com/go-graphite/carbonzipper/mstats"
28
-	zipperCfg "github.com/go-graphite/carbonzipper/zipper/config"
29
-	zipperTypes "github.com/go-graphite/carbonzipper/zipper/types"
26
+	"github.com/go-graphite/carbonapi/cache"
27
+	"github.com/go-graphite/carbonapi/mstats"
28
+	zipperCfg "github.com/go-graphite/carbonapi/zipper/config"
29
+	zipperTypes "github.com/go-graphite/carbonapi/zipper/types"
30 30
 	"github.com/gorilla/handlers"
31 31
 	"github.com/peterbourgon/g2g"
32 32
 	"github.com/spf13/viper"

+ 2
- 2
cmd/carbonapi/main_test.go View File

@@ -9,8 +9,8 @@ import (
9 9
 	"testing"
10 10
 
11 11
 	"github.com/go-graphite/carbonapi/expr/types"
12
-	realZipper "github.com/go-graphite/carbonzipper/zipper"
13
-	zipperTypes "github.com/go-graphite/carbonzipper/zipper/types"
12
+	realZipper "github.com/go-graphite/carbonapi/zipper"
13
+	zipperTypes "github.com/go-graphite/carbonapi/zipper/types"
14 14
 	pb "github.com/go-graphite/protocol/carbonapi_v3_pb"
15 15
 	"github.com/lomik/zapwriter"
16 16
 	"github.com/stretchr/testify/assert"

+ 3
- 3
cmd/carbonapi/zipper.go View File

@@ -7,9 +7,9 @@ import (
7 7
 
8 8
 	"github.com/go-graphite/carbonapi/expr/types"
9 9
 	"github.com/go-graphite/carbonapi/util"
10
-	realZipper "github.com/go-graphite/carbonzipper/zipper"
11
-	zipperCfg "github.com/go-graphite/carbonzipper/zipper/config"
12
-	zipperTypes "github.com/go-graphite/carbonzipper/zipper/types"
10
+	realZipper "github.com/go-graphite/carbonapi/zipper"
11
+	zipperCfg "github.com/go-graphite/carbonapi/zipper/config"
12
+	zipperTypes "github.com/go-graphite/carbonapi/zipper/types"
13 13
 	pb "github.com/go-graphite/protocol/carbonapi_v3_pb"
14 14
 	"go.uber.org/zap"
15 15
 )

+ 8
- 8
cmd/carbonzipper/main.go View File

@@ -20,15 +20,15 @@ import (
20 20
 	"github.com/dgryski/httputil"
21 21
 	"github.com/facebookgo/grace/gracehttp"
22 22
 	"github.com/facebookgo/pidfile"
23
-	"github.com/go-graphite/carbonzipper/intervalset"
24
-	"github.com/go-graphite/carbonzipper/mstats"
23
+	"github.com/go-graphite/carbonapi/intervalset"
24
+	"github.com/go-graphite/carbonapi/mstats"
25 25
 	"github.com/spf13/viper"
26
-	// "github.com/go-graphite/carbonzipper/pathcache"
27
-	cu "github.com/go-graphite/carbonzipper/util/apictx"
28
-	util "github.com/go-graphite/carbonzipper/util/zipperctx"
29
-	"github.com/go-graphite/carbonzipper/zipper"
30
-	zipperConfig "github.com/go-graphite/carbonzipper/zipper/config"
31
-	"github.com/go-graphite/carbonzipper/zipper/types"
26
+	// "github.com/go-graphite/carbonapi/pathcache"
27
+	cu "github.com/go-graphite/carbonapi/util/apictx"
28
+	util "github.com/go-graphite/carbonapi/util/zipperctx"
29
+	"github.com/go-graphite/carbonapi/zipper"
30
+	zipperConfig "github.com/go-graphite/carbonapi/zipper/config"
31
+	"github.com/go-graphite/carbonapi/zipper/types"
32 32
 	protov2 "github.com/go-graphite/protocol/carbonapi_v2_pb"
33 33
 	protov3 "github.com/go-graphite/protocol/carbonapi_v3_pb"
34 34
 	"github.com/lomik/zapwriter"

+ 1
- 1
expr/functions/graphiteWeb/function.go View File

@@ -16,7 +16,7 @@ import (
16 16
 	"github.com/go-graphite/carbonapi/expr/metadata"
17 17
 	"github.com/go-graphite/carbonapi/expr/types"
18 18
 	"github.com/go-graphite/carbonapi/pkg/parser"
19
-	"github.com/go-graphite/carbonzipper/limiter"
19
+	"github.com/go-graphite/carbonapi/limiter"
20 20
 	pb "github.com/go-graphite/protocol/carbonapi_v3_pb"
21 21
 	"github.com/lomik/zapwriter"
22 22
 	"github.com/spf13/viper"

+ 1
- 1
pathcache/pathcache.go View File

@@ -2,7 +2,7 @@ package pathcache
2 2
 
3 3
 import (
4 4
 	"github.com/dgryski/go-expirecache"
5
-	"github.com/go-graphite/carbonzipper/zipper/types"
5
+	"github.com/go-graphite/carbonapi/zipper/types"
6 6
 
7 7
 	"time"
8 8
 )

+ 0
- 25
vendor/github.com/go-graphite/carbonzipper/LICENSE View File

@@ -1,25 +0,0 @@
1
-Copyright (c) 2013-2018 Damian Gryski <damian@gryski.com>
2
-              2018 Google LLC
3
-
4
-All rights reserved.
5
-
6
-Redistribution and use in source and binary forms, with or without
7
-modification, are permitted provided that the following conditions are met:
8
-
9
-* Redistributions of source code must retain the above copyright notice,
10
-this list of conditions and the following disclaimer.
11
-
12
-* Redistributions in binary form must reproduce the above copyright notice,
13
-this list of conditions and the following disclaimer in the documentation
14
-and/or other materials provided with the distribution.
15
-
16
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
17
-ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
-WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19
-DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
20
-FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21
-DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
22
-SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
23
-CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
24
-OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
25
-OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

+ 0
- 109
vendor/github.com/go-graphite/carbonzipper/cache/cache.go View File

@@ -1,109 +0,0 @@
1
-package cache
2
-
3
-import (
4
-	"crypto/sha1"
5
-	"encoding/hex"
6
-	"errors"
7
-	"sync/atomic"
8
-	"time"
9
-
10
-	"github.com/bradfitz/gomemcache/memcache"
11
-
12
-	"github.com/dgryski/go-expirecache"
13
-)
14
-
15
-var (
16
-	ErrTimeout  = errors.New("cache: timeout")
17
-	ErrNotFound = errors.New("cache: not found")
18
-)
19
-
20
-type BytesCache interface {
21
-	Get(k string) ([]byte, error)
22
-	Set(k string, v []byte, expire int32)
23
-}
24
-
25
-type NullCache struct{}
26
-
27
-func (NullCache) Get(string) ([]byte, error) { return nil, ErrNotFound }
28
-func (NullCache) Set(string, []byte, int32)  {}
29
-
30
-func NewExpireCache(maxsize uint64) BytesCache {
31
-	ec := expirecache.New(maxsize)
32
-	go ec.ApproximateCleaner(10 * time.Second)
33
-	return &ExpireCache{ec: ec}
34
-}
35
-
36
-type ExpireCache struct {
37
-	ec *expirecache.Cache
38
-}
39
-
40
-func (ec ExpireCache) Get(k string) ([]byte, error) {
41
-	v, ok := ec.ec.Get(k)
42
-
43
-	if !ok {
44
-		return nil, ErrNotFound
45
-	}
46
-
47
-	return v.([]byte), nil
48
-}
49
-
50
-func (ec ExpireCache) Set(k string, v []byte, expire int32) {
51
-	ec.ec.Set(k, v, uint64(len(v)), expire)
52
-}
53
-
54
-func (ec ExpireCache) Items() int { return ec.ec.Items() }
55
-
56
-func (ec ExpireCache) Size() uint64 { return ec.ec.Size() }
57
-
58
-func NewMemcached(prefix string, servers ...string) BytesCache {
59
-	return &MemcachedCache{prefix: prefix, client: memcache.New(servers...)}
60
-}
61
-
62
-type MemcachedCache struct {
63
-	prefix   string
64
-	client   *memcache.Client
65
-	timeouts uint64
66
-}
67
-
68
-func (m *MemcachedCache) Get(k string) ([]byte, error) {
69
-	key := sha1.Sum([]byte(k))
70
-	hk := hex.EncodeToString(key[:])
71
-	done := make(chan bool, 1)
72
-
73
-	var err error
74
-	var item *memcache.Item
75
-
76
-	go func() {
77
-		item, err = m.client.Get(m.prefix + hk)
78
-		done <- true
79
-	}()
80
-
81
-	timeout := time.After(50 * time.Millisecond)
82
-
83
-	select {
84
-	case <-timeout:
85
-		atomic.AddUint64(&m.timeouts, 1)
86
-		return nil, ErrTimeout
87
-	case <-done:
88
-	}
89
-
90
-	if err != nil {
91
-		// translate to internal cache miss error
92
-		if err == memcache.ErrCacheMiss {
93
-			err = ErrNotFound
94
-		}
95
-		return nil, err
96
-	}
97
-
98
-	return item.Value, nil
99
-}
100
-
101
-func (m *MemcachedCache) Set(k string, v []byte, expire int32) {
102
-	key := sha1.Sum([]byte(k))
103
-	hk := hex.EncodeToString(key[:])
104
-	go m.client.Set(&memcache.Item{Key: m.prefix + hk, Value: v, Expiration: expire})
105
-}
106
-
107
-func (m *MemcachedCache) Timeouts() uint64 {
108
-	return atomic.LoadUint64(&m.timeouts)
109
-}

+ 0
- 59
vendor/github.com/go-graphite/carbonzipper/intervalset/intervalset.go View File

@@ -1,59 +0,0 @@
1
-package intervalset
2
-
3
-// Copy paste from https://github.com/lomik/go-carbon/blob/master/carbonserver/pickle.go
4
-// Original license: MIT
5
-// Original author: Roman Lomonosov
6
-
7
-import (
8
-	"encoding/binary"
9
-	"math"
10
-)
11
-
12
-// Fake single interval set for graphite
13
-type IntervalSet struct {
14
-	Start int32
15
-	End   int32
16
-}
17
-
18
-func (i *IntervalSet) MarshalPickle() ([]byte, error) {
19
-	//     0: (    MARK
20
-	//     1: c        GLOBAL     'graphite.intervals IntervalSet'
21
-	//    33: o        OBJ        (MARK at 0)
22
-	//    34: }    EMPTY_DICT
23
-	//    35: (    MARK
24
-	//    36: U        SHORT_BINSTRING 'intervals'
25
-	//    47: ]        EMPTY_LIST
26
-	//    48: (        MARK
27
-	//    49: c            GLOBAL     'graphite.intervals Interval'
28
-	//    78: o            OBJ        (MARK at 48)
29
-	//    79: }        EMPTY_DICT
30
-	//    80: (        MARK
31
-	//    81: U            SHORT_BINSTRING 'start'
32
-	//    88: G            BINFLOAT   1322087998.393128
33
-	//    97: U            SHORT_BINSTRING 'size'
34
-	//   103: G            BINFLOAT   157679977.1475761
35
-	//   112: U            SHORT_BINSTRING 'end'
36
-	//   117: G            BINFLOAT   1479767975.540704
37
-	//   126: U            SHORT_BINSTRING 'tuple'
38
-	//   133: G            BINFLOAT   1322087998.393128
39
-	//   142: G            BINFLOAT   1479767975.540704
40
-	//   151: \x86         TUPLE2
41
-	//   152: u            SETITEMS   (MARK at 80)
42
-	//   153: b        BUILD
43
-	//   154: a        APPEND
44
-	//   155: U        SHORT_BINSTRING 'size'
45
-	//   161: G        BINFLOAT   157679977.1475761
46
-	//   170: u        SETITEMS   (MARK at 35)
47
-	//   171: b    BUILD
48
-	//   172: .    STOP
49
-	b := []byte("(cgraphite.intervals\nIntervalSet\no}(U\tintervals](cgraphite.intervals\nInterval\no}(U\x05startGA\xd3\xb3]\x8f\x99)\x02U\x04sizeGA\xa2\xcc\x02\xd2K\x8f\x18U\x03endGA\xd6\x0c\xdd\xe9\xe2\x9a\xe5U\x05tupleGA\xd3\xb3]\x8f\x99)\x02GA\xd6\x0c\xdd\xe9\xe2\x9a\xe5\x86ubaU\x04sizeGA\xa2\xcc\x02\xd2K\x8f\x18ub")
50
-
51
-	binary.BigEndian.PutUint64(b[89:97], uint64(math.Float64bits(float64(i.Start))))
52
-	binary.BigEndian.PutUint64(b[104:112], uint64(math.Float64bits(float64(i.End-i.Start))))
53
-	binary.BigEndian.PutUint64(b[118:126], uint64(math.Float64bits(float64(i.End))))
54
-	binary.BigEndian.PutUint64(b[134:142], uint64(math.Float64bits(float64(i.Start))))
55
-	binary.BigEndian.PutUint64(b[143:151], uint64(math.Float64bits(float64(i.End))))
56
-	binary.BigEndian.PutUint64(b[162:170], uint64(math.Float64bits(float64(i.End-i.Start))))
57
-
58
-	return b, nil
59
-}

+ 0
- 57
vendor/github.com/go-graphite/carbonzipper/limiter/limiter.go View File

@@ -1,57 +0,0 @@
1
-package limiter
2
-
3
-import (
4
-	"context"
5
-	"errors"
6
-)
7
-
8
-// ServerLimiter provides interface to limit amount of requests
9
-type ServerLimiter struct {
10
-	m   map[string]chan struct{}
11
-	cap int
12
-}
13
-
14
-// NewServerLimiter creates a limiter for specific servers list.
15
-func NewServerLimiter(servers []string, l int) *ServerLimiter {
16
-	if l == 0 {
17
-		return &ServerLimiter{}
18
-	}
19
-
20
-	sl := make(map[string]chan struct{})
21
-
22
-	for _, s := range servers {
23
-		sl[s] = make(chan struct{}, l)
24
-	}
25
-
26
-	return &ServerLimiter{
27
-		m:   sl,
28
-		cap: l,
29
-	}
30
-}
31
-
32
-func (sl ServerLimiter) Capacity() int {
33
-	return sl.cap
34
-}
35
-
36
-// Enter claims one of free slots or blocks until there is one.
37
-func (sl ServerLimiter) Enter(ctx context.Context, s string) error {
38
-	if sl.m == nil {
39
-		return nil
40
-	}
41
-
42
-	select {
43
-	case sl.m[s] <- struct{}{}:
44
-		return nil
45
-	case <-ctx.Done():
46
-		return errors.New("timeout exceeded")
47
-	}
48
-}
49
-
50
-// Frees a slot in limiter
51
-func (sl ServerLimiter) Leave(ctx context.Context, s string) {
52
-	if sl.m == nil {
53
-		return
54
-	}
55
-
56
-	<-sl.m[s]
57
-}

+ 0
- 45
vendor/github.com/go-graphite/carbonzipper/mstats/mstats.go View File

@@ -1,45 +0,0 @@
1
-package mstats
2
-
3
-import (
4
-	"runtime"
5
-	"strconv"
6
-	"sync/atomic"
7
-	"time"
8
-)
9
-
10
-// Var is an atomic variable satisfying expvar.Var
11
-type Var struct {
12
-	atomic.Value
13
-}
14
-
15
-func (a *Var) String() string {
16
-	v := a.Load()
17
-	if v == nil {
18
-		return "0"
19
-	}
20
-	return strconv.FormatUint(v.(uint64), 10)
21
-}
22
-
23
-// PauseNS is the total number of nanoseconds the GC has paused the application
24
-var PauseNS Var
25
-
26
-// NumGC is the number of collections
27
-var NumGC Var
28
-
29
-// Alloc is the number of bytes allocated and not yet freed by the application
30
-var Alloc Var
31
-
32
-// TotalAlloc is the total number of bytes allocated by the application
33
-var TotalAlloc Var
34
-
35
-// Start polls runtime.ReadMemStats with interval d and updates the package level variables
36
-func Start(d time.Duration) {
37
-	for range time.Tick(d) {
38
-		var m runtime.MemStats
39
-		runtime.ReadMemStats(&m)
40
-		PauseNS.Store(m.PauseTotalNs)
41
-		Alloc.Store(m.Alloc)
42
-		TotalAlloc.Store(m.TotalAlloc)
43
-		NumGC.Store(uint64(m.NumGC))
44
-	}
45
-}

+ 0
- 58
vendor/github.com/go-graphite/carbonzipper/pathcache/pathcache.go View File

@@ -1,58 +0,0 @@
1
-package pathcache
2
-
3
-import (
4
-	"github.com/dgryski/go-expirecache"
5
-	"github.com/go-graphite/carbonzipper/zipper/types"
6
-
7
-	"time"
8
-)
9
-
10
-// PathCache provides general interface to cache find and search queries
11
-type PathCache struct {
12
-	ec *expirecache.Cache
13
-
14
-	expireDelaySec int32
15
-}
16
-
17
-// NewPathCache initializes PathCache structure
18
-func NewPathCache(ExpireDelaySec int32) PathCache {
19
-
20
-	p := PathCache{
21
-		ec:             expirecache.New(0),
22
-		expireDelaySec: ExpireDelaySec,
23
-	}
24
-
25
-	go p.ec.ApproximateCleaner(10 * time.Second)
26
-
27
-	return p
28
-}
29
-
30
-// ECItems returns amount of items in the cache
31
-func (p *PathCache) ECItems() int {
32
-	return p.ec.Items()
33
-}
34
-
35
-// ECSize returns size of the cache
36
-func (p *PathCache) ECSize() uint64 {
37
-	return p.ec.Size()
38
-}
39
-
40
-// Set allows to set a key (k) to value (v).
41
-func (p *PathCache) Set(k string, v []types.ServerClient) {
42
-
43
-	var size uint64
44
-	for _, vv := range v {
45
-		size += uint64(len(vv.Backends()))
46
-	}
47
-
48
-	p.ec.Set(k, v, size, p.expireDelaySec)
49
-}
50
-
51
-// Get returns an an element by key. If not successful - returns also false in second var.
52
-func (p *PathCache) Get(k string) ([]types.ServerClient, bool) {
53
-	if v, ok := p.ec.Get(k); ok {
54
-		return v.([]types.ServerClient), true
55
-	}
56
-
57
-	return nil, false
58
-}

+ 0
- 50
vendor/github.com/go-graphite/carbonzipper/util/apictx/ctx.go View File

@@ -1,50 +0,0 @@
1
-package apictx
2
-
3
-import (
4
-	"context"
5
-	"net/http"
6
-)
7
-
8
-type key int
9
-
10
-const (
11
-	ctxHeaderUUID = "X-CTX-CarbonAPI-UUID"
12
-
13
-	uuidKey key = 0
14
-)
15
-
16
-func ifaceToString(v interface{}) string {
17
-	if v != nil {
18
-		return v.(string)
19
-	}
20
-	return ""
21
-}
22
-
23
-func getCtxString(ctx context.Context, k key) string {
24
-	return ifaceToString(ctx.Value(k))
25
-}
26
-
27
-func GetUUID(ctx context.Context) string {
28
-	return getCtxString(ctx, uuidKey)
29
-}
30
-
31
-func SetUUID(ctx context.Context, v string) context.Context {
32
-	return context.WithValue(ctx, uuidKey, v)
33
-}
34
-
35
-func ParseCtx(h http.HandlerFunc) http.HandlerFunc {
36
-	return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
37
-		uuid := req.Header.Get(ctxHeaderUUID)
38
-
39
-		ctx := req.Context()
40
-		ctx = SetUUID(ctx, uuid)
41
-
42
-		h.ServeHTTP(rw, req.WithContext(ctx))
43
-	})
44
-}
45
-
46
-func MarshalCtx(ctx context.Context, response *http.Request) *http.Request {
47
-	response.Header.Add(ctxHeaderUUID, GetUUID(ctx))
48
-
49
-	return response
50
-}

+ 0
- 50
vendor/github.com/go-graphite/carbonzipper/util/zipperctx/ctx.go View File

@@ -1,50 +0,0 @@
1
-package zipperctx
2
-
3
-import (
4
-	"context"
5
-	"net/http"
6
-)
7
-
8
-type key int
9
-
10
-const (
11
-	ctxHeaderUUID = "X-CTX-CarbonZipper-UUID"
12
-
13
-	uuidKey key = 0
14
-)
15
-
16
-func ifaceToString(v interface{}) string {
17
-	if v != nil {
18
-		return v.(string)
19
-	}
20
-	return ""
21
-}
22
-
23
-func getCtxString(ctx context.Context, k key) string {
24
-	return ifaceToString(ctx.Value(k))
25
-}
26
-
27
-func GetUUID(ctx context.Context) string {
28
-	return getCtxString(ctx, uuidKey)
29
-}
30
-
31
-func SetUUID(ctx context.Context, v string) context.Context {
32
-	return context.WithValue(ctx, uuidKey, v)
33
-}
34
-
35
-func ParseCtx(h http.HandlerFunc) http.HandlerFunc {
36
-	return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
37
-		uuid := req.Header.Get(ctxHeaderUUID)
38
-
39
-		ctx := req.Context()
40
-		ctx = SetUUID(ctx, uuid)
41
-
42
-		h.ServeHTTP(rw, req.WithContext(ctx))
43
-	})
44
-}
45
-
46
-func MarshalCtx(ctx context.Context, response *http.Request) *http.Request {
47
-	response.Header.Add(ctxHeaderUUID, GetUUID(ctx))
48
-
49
-	return response
50
-}

+ 0
- 602
vendor/github.com/go-graphite/carbonzipper/zipper/broadcast/broadcast_group.go View File

@@ -1,602 +0,0 @@
1
-package broadcast
2
-
3
-import (
4
-	"context"
5
-	"strconv"
6
-	"strings"
7
-
8
-	"github.com/go-graphite/carbonzipper/limiter"
9
-	"github.com/go-graphite/carbonzipper/pathcache"
10
-	"github.com/go-graphite/carbonzipper/zipper/cache"
11
-	"github.com/go-graphite/carbonzipper/zipper/errors"
12
-	"github.com/go-graphite/carbonzipper/zipper/types"
13
-	protov3 "github.com/go-graphite/protocol/carbonapi_v3_pb"
14
-
15
-	"go.uber.org/zap"
16
-)
17
-
18
-type BroadcastGroup struct {
19
-	limiter   *limiter.ServerLimiter
20
-	groupName string
21
-	timeout   types.Timeouts
22
-	clients   []types.ServerClient
23
-	servers   []string
24
-
25
-	pathCache pathcache.PathCache
26
-	logger    *zap.Logger
27
-
28
-	infoCache  *cache.QueryCache
29
-	findCache  *cache.QueryCache
30
-	fetchCache *cache.QueryCache
31
-	probeCache *cache.QueryCache
32
-}
33
-
34
-func NewBroadcastGroup(logger *zap.Logger, groupName string, servers []types.ServerClient, expireDelaySec int32, concurencyLimit int, timeout types.Timeouts) (*BroadcastGroup, *errors.Errors) {
35
-	if len(servers) == 0 {
36
-		return nil, errors.Fatal("no servers specified")
37
-	}
38
-	serverNames := make([]string, 0, len(servers))
39
-	for _, s := range servers {
40
-		serverNames = append(serverNames, s.Name())
41
-	}
42
-	pathCache := pathcache.NewPathCache(expireDelaySec)
43
-	limiter := limiter.NewServerLimiter(serverNames, concurencyLimit)
44
-
45
-	return NewBroadcastGroupWithLimiter(logger, groupName, servers, serverNames, pathCache, limiter, timeout)
46
-}
47
-
48
-func NewBroadcastGroupWithLimiter(logger *zap.Logger, groupName string, servers []types.ServerClient, serverNames []string, pathCache pathcache.PathCache, limiter *limiter.ServerLimiter, timeout types.Timeouts) (*BroadcastGroup, *errors.Errors) {
49
-	b := &BroadcastGroup{
50
-		timeout:   timeout,
51
-		groupName: groupName,
52
-		clients:   servers,
53
-		limiter:   limiter,
54
-		servers:   serverNames,
55
-
56
-		pathCache: pathCache,
57
-		logger:    logger.With(zap.String("type", "broadcastGroup"), zap.String("groupName", groupName)),
58
-
59
-		// TODO: remove hardcode
60
-		infoCache:  cache.NewQueryCache(1024, 5),
61
-		findCache:  cache.NewQueryCache(1024, 5),
62
-		fetchCache: cache.NewQueryCache(25600, 1),
63
-		probeCache: cache.NewQueryCache(1024, 10),
64
-	}
65
-
66
-	b.logger.Debug("created broadcast group",
67
-		zap.String("group_name", b.groupName),
68
-		zap.Strings("clients", b.servers),
69
-	)
70
-
71
-	return b, nil
72
-}
73
-
74
-func (bg BroadcastGroup) Name() string {
75
-	return bg.groupName
76
-}
77
-
78
-func (bg BroadcastGroup) Backends() []string {
79
-	return bg.servers
80
-}
81
-
82
-func (bg *BroadcastGroup) chooseServers(requests []string) []types.ServerClient {
83
-	var res []types.ServerClient
84
-
85
-	for _, request := range requests {
86
-		idx := strings.Index(request, ".")
87
-		if idx > 0 {
88
-			request = request[:idx]
89
-		}
90
-		if clients, ok := bg.pathCache.Get(request); ok && len(clients) > 0 {
91
-			res = append(res, clients...)
92
-		}
93
-	}
94
-
95
-	if len(res) != 0 {
96
-		return res
97
-	}
98
-	return bg.clients
99
-}
100
-
101
-func (bg BroadcastGroup) MaxMetricsPerRequest() int {
102
-	return 0
103
-}
104
-
105
-func fetchRequestToKey(prefix string, request *protov3.MultiFetchRequest) string {
106
-	key := []byte("prefix=" + prefix)
107
-	for _, r := range request.Metrics {
108
-		key = append(key, []byte("&"+r.Name+"&start="+strconv.FormatUint(uint64(r.StartTime), 10)+"&stop="+strconv.FormatUint(uint64(r.StopTime), 10)+"\n")...)
109
-	}
110
-
111
-	return string(key)
112
-}
113
-
114
-func (bg *BroadcastGroup) doSingleFetch(ctx context.Context, logger *zap.Logger, client types.ServerClient, request *protov3.MultiFetchRequest, doneCh chan<- string, resCh chan<- *types.ServerFetchResponse) {
115
-	logger.Debug("waiting for slot",
116
-		zap.Int("maxConns", bg.limiter.Capacity()),
117
-	)
118
-	err := bg.limiter.Enter(ctx, client.Name())
119
-	if err != nil {
120
-		logger.Debug("timeout waiting for a slot")
121
-		resCh <- &types.ServerFetchResponse{
122
-			Server: client.Name(),
123
-			Err:    errors.FromErrNonFatal(err),
124
-		}
125
-		doneCh <- client.Name()
126
-		return
127
-	}
128
-	logger.Debug("got slot")
129
-	defer bg.limiter.Leave(ctx, client.Name())
130
-
131
-	var requests []*protov3.MultiFetchRequest
132
-	maxMetricPerRequest := client.MaxMetricsPerRequest()
133
-	if maxMetricPerRequest == 0 {
134
-		logger.Debug("will do single request, cause MaxMetrics is 0")
135
-		requests = []*protov3.MultiFetchRequest{request}
136
-	} else {
137
-		logger.Debug("will do my best to split request",
138
-			zap.Int("max_metrics", maxMetricPerRequest),
139
-		)
140
-		for _, metric := range request.Metrics {
141
-			f, _, e := bg.Find(ctx, &protov3.MultiGlobRequest{Metrics: []string{metric.Name}})
142
-			if (e != nil && e.HaveFatalErrors && len(e.Errors) > 0) || f == nil || len(f.Metrics) == 0 {
143
-				continue
144
-			}
145
-			newRequest := &protov3.MultiFetchRequest{}
146
-
147
-			logger.Debug("will split request",
148
-				zap.Int("metrics", len(f.Metrics)),
149
-				zap.Int("max_metrics", maxMetricPerRequest),
150
-			)
151
-			for _, m := range f.Metrics {
152
-				for _, match := range m.Matches {
153
-					newRequest.Metrics = append(newRequest.Metrics, protov3.FetchRequest{
154
-						Name:            match.Path,
155
-						StartTime:       metric.StartTime,
156
-						StopTime:        metric.StopTime,
157
-						PathExpression:  metric.PathExpression,
158
-						FilterFunctions: metric.FilterFunctions,
159
-					})
160
-					if len(newRequest.Metrics) == maxMetricPerRequest {
161
-						requests = append(requests, newRequest)
162
-						newRequest = &protov3.MultiFetchRequest{}
163
-					}
164
-				}
165
-			}
166
-			if len(newRequest.Metrics) > 0 {
167
-				requests = append(requests, newRequest)
168
-			}
169
-			logger.Debug("spliited request",
170
-				zap.Int("amount_of_requests", len(requests)),
171
-			)
172
-		}
173
-	}
174
-
175
-	for _, req := range requests {
176
-		logger.Debug("sending request",
177
-			zap.String("client_name", client.Name()),
178
-		)
179
-		r := &types.ServerFetchResponse{
180
-			Server: client.Name(),
181
-		}
182
-		r.Response, r.Stats, r.Err = client.Fetch(ctx, req)
183
-		resCh <- r
184
-	}
185
-	doneCh <- client.Name()
186
-}
187
-
188
-func (bg *BroadcastGroup) Fetch(ctx context.Context, request *protov3.MultiFetchRequest) (*protov3.MultiFetchResponse, *types.Stats, *errors.Errors) {
189
-	requestNames := make([]string, 0, len(request.Metrics))
190
-	for i := range request.Metrics {
191
-		requestNames = append(requestNames, request.Metrics[i].Name)
192
-	}
193
-	logger := bg.logger.With(zap.String("type", "fetch"), zap.Strings("request", requestNames))
194
-	logger.Debug("will try to fetch data")
195
-
196
-	key := fetchRequestToKey(bg.groupName, request)
197
-	item := bg.fetchCache.GetQueryItem(key)
198
-	res, ok := item.FetchOrLock(ctx)
199
-	if ok {
200
-		if res == nil {
201
-			return nil, nil, errors.Fatal("timeout")
202
-		}
203
-		logger.Debug("cache hit")
204
-		result := res.(*types.ServerFetchResponse)
205
-		return result.Response, result.Stats, nil
206
-	}
207
-	defer item.StoreAbort()
208
-
209
-	// Now we have global lock for fetching data for this metric
210
-	resCh := make(chan *types.ServerFetchResponse, len(bg.clients))
211
-	doneCh := make(chan string, len(bg.clients))
212
-	ctx, cancel := context.WithTimeout(ctx, bg.timeout.Render)
213
-	defer cancel()
214
-
215
-	clients := bg.chooseServers(requestNames)
216
-	for _, client := range clients {
217
-		go bg.doSingleFetch(ctx, logger, client, request, doneCh, resCh)
218
-	}
219
-
220
-	result := &types.ServerFetchResponse{
221
-		Server:       "",
222
-		ResponsesMap: make(map[string][]protov3.FetchResponse),
223
-		Response:     &protov3.MultiFetchResponse{},
224
-		Stats:        &types.Stats{},
225
-		Err:          &errors.Errors{},
226
-	}
227
-	var err errors.Errors
228
-	answeredServers := make(map[string]struct{})
229
-	responseCounts := 0
230
-GATHER:
231
-	for {
232
-		if responseCounts == len(clients) && len(resCh) == 0 {
233
-			break GATHER
234
-		}
235
-		select {
236
-		case name := <-doneCh:
237
-			responseCounts++
238
-			answeredServers[name] = struct{}{}
239
-		case res := <-resCh:
240
-			if res.Err != nil {
241
-				err.Merge(res.Err)
242
-			}
243
-			result.Merge(res)
244
-		case <-ctx.Done():
245
-			noAnswer := make([]string, 0)
246
-			for _, s := range clients {
247
-				if _, ok := answeredServers[s.Name()]; !ok {
248
-					noAnswer = append(noAnswer, s.Name())
249
-				}
250
-			}
251
-			logger.Warn("timeout waiting for more responses",
252
-				zap.Strings("no_answers_from", noAnswer),
253
-			)
254
-			err.Add(types.ErrTimeoutExceeded)
255
-			break GATHER
256
-		}
257
-	}
258
-
259
-	if len(result.Response.Metrics) == 0 {
260
-		logger.Error("failed to get any response")
261
-
262
-		return nil, nil, err.Addf("failed to get any response from backend group: %v", bg.groupName)
263
-	}
264
-
265
-	logger.Debug("got some responses",
266
-		zap.Int("clients_count", len(bg.clients)),
267
-		zap.Int("response_count", responseCounts),
268
-		zap.Bool("have_errors", len(err.Errors) != 0),
269
-		zap.Any("errors", err.Errors),
270
-		zap.Int("response_count", len(result.Response.Metrics)),
271
-	)
272
-
273
-	item.StoreAndUnlock(result, uint64(result.Response.Size()))
274
-
275
-	return result.Response, result.Stats, &err
276
-}
277
-
278
-// Find request handling
279
-
280
-func findRequestToKey(prefix string, request *protov3.MultiGlobRequest) string {
281
-	return "prefix=" + prefix + "&" + strings.Join(request.Metrics, "&")
282
-}
283
-
284
-func (bg *BroadcastGroup) doFind(ctx context.Context, logger *zap.Logger, client types.ServerClient, request *protov3.MultiGlobRequest, resCh chan<- *types.ServerFindResponse) {
285
-	logger = logger.With(
286
-		zap.String("group_name", bg.groupName),
287
-		zap.String("client_name", client.Name()),
288
-	)
289
-	logger.Debug("waiting for a slot")
290
-
291
-	r := &types.ServerFindResponse{
292
-		Server: client.Name(),
293
-	}
294
-
295
-	err := bg.limiter.Enter(ctx, client.Name())
296
-	if err != nil {
297
-		logger.Debug("timeout waiting for a slot")
298
-		r.Err = errors.FromErrNonFatal(types.ErrTimeoutExceeded)
299
-		resCh <- r
300
-		return
301
-	}
302
-	defer bg.limiter.Leave(ctx, client.Name())
303
-
304
-	logger.Debug("got a slot")
305
-
306
-	r.Response, r.Stats, r.Err = client.Find(ctx, request)
307
-	logger.Debug("fetched response",
308
-		zap.Any("response", r),
309
-	)
310
-	resCh <- r
311
-}
312
-
313
-func (bg *BroadcastGroup) Find(ctx context.Context, request *protov3.MultiGlobRequest) (*protov3.MultiGlobResponse, *types.Stats, *errors.Errors) {
314
-	logger := bg.logger.With(zap.String("type", "find"), zap.Strings("request", request.Metrics))
315
-
316
-	key := findRequestToKey(bg.groupName, request)
317
-	item := bg.findCache.GetQueryItem(key)
318
-	res, ok := item.FetchOrLock(ctx)
319
-	if ok {
320
-		if res == nil {
321
-			return nil, nil, errors.Fatal("timeout")
322
-		}
323
-		result := res.(*types.ServerFindResponse)
324
-		logger.Debug("cache hit",
325
-			zap.Any("result", result),
326
-		)
327
-		return result.Response, result.Stats, nil
328
-	}
329
-	defer item.StoreAbort()
330
-
331
-	resCh := make(chan *types.ServerFindResponse, len(bg.clients))
332
-
333
-	logger.Debug("will do query with timeout",
334
-		zap.Float64("timeout", bg.timeout.Find.Seconds()),
335
-	)
336
-
337
-	ctx, cancel := context.WithTimeout(ctx, bg.timeout.Render)
338
-	defer cancel()
339
-	ctx = context.Background()
340
-
341
-	clients := bg.chooseServers(request.Metrics)
342
-	for _, client := range clients {
343
-		go bg.doFind(ctx, logger, client, request, resCh)
344
-	}
345
-
346
-	result := &types.ServerFindResponse{}
347
-	var err errors.Errors
348
-	responseCounts := 0
349
-	answeredServers := make(map[string]struct{})
350
-GATHER:
351
-	for {
352
-		select {
353
-		case r := <-resCh:
354
-			answeredServers[r.Server] = struct{}{}
355
-			responseCounts++
356
-			if r.Err != nil {
357
-				err.Merge(r.Err)
358
-			}
359
-			if result.Response == nil {
360
-				result = r
361
-			} else {
362
-				result.Merge(r)
363
-			}
364
-
365
-			if responseCounts == len(clients) {
366
-				break GATHER
367
-			}
368
-		case <-ctx.Done():
369
-			noAnswer := make([]string, 0)
370
-			for _, s := range clients {
371
-				if _, ok := answeredServers[s.Name()]; !ok {
372
-					noAnswer = append(noAnswer, s.Name())
373
-				}
374
-			}
375
-			logger.Warn("timeout waiting for more responses",
376
-				zap.Strings("no_answers_from", noAnswer),
377
-			)
378
-			err.Add(types.ErrTimeoutExceeded)
379
-			break GATHER
380
-		}
381
-	}
382
-	logger.Debug("got some responses",
383
-		zap.Int("clients_count", len(bg.clients)),
384
-		zap.Int("response_count", responseCounts),
385
-		zap.Bool("have_errors", len(err.Errors) != 0),
386
-		zap.Any("errors", err.Errors),
387
-		zap.Any("response", result.Response),
388
-	)
389
-
390
-	if result.Response == nil {
391
-		return &protov3.MultiGlobResponse{}, result.Stats, err.Addf("failed to fetch response from the server %v", bg.groupName)
392
-	}
393
-	item.StoreAndUnlock(result, uint64(result.Response.Size()))
394
-
395
-	return result.Response, result.Stats, &err
396
-}
397
-
398
-// Info request handling
399
-
400
-func infoRequestToKey(prefix string, request *protov3.MultiMetricsInfoRequest) string {
401
-	return "prefix=" + prefix + "&" + strings.Join(request.Names, "&")
402
-}
403
-
404
-func (bg *BroadcastGroup) doInfoRequest(ctx context.Context, logger *zap.Logger, request *protov3.MultiMetricsInfoRequest, client types.ServerClient, resCh chan<- *types.ServerInfoResponse) {
405
-	r := &types.ServerInfoResponse{
406
-		Server: client.Name(),
407
-	}
408
-	logger.Debug("waiting for a slot",
409
-		zap.String("group_name", bg.groupName),
410
-		zap.String("client_name", client.Name()),
411
-	)
412
-	err := bg.limiter.Enter(ctx, client.Name())
413
-	if err != nil {
414
-		logger.Debug("timeout waiting for a slot")
415
-		r.Err = errors.FromErrNonFatal(err)
416
-		resCh <- r
417
-		return
418
-	}
419
-	defer bg.limiter.Leave(ctx, client.Name())
420
-
421
-	logger.Debug("got a slot")
422
-	r.Response, r.Stats, r.Err = client.Info(ctx, request)
423
-	resCh <- r
424
-}
425
-
426
-func (bg *BroadcastGroup) Info(ctx context.Context, request *protov3.MultiMetricsInfoRequest) (*protov3.ZipperInfoResponse, *types.Stats, *errors.Errors) {
427
-	logger := bg.logger.With(zap.String("type", "info"), zap.Strings("request", request.Names))
428
-
429
-	key := infoRequestToKey(bg.groupName, request)
430
-	item := bg.infoCache.GetQueryItem(key)
431
-	res, ok := item.FetchOrLock(ctx)
432
-	if ok {
433
-		if res == nil {
434
-			return nil, nil, errors.Fatal("timeout")
435
-		}
436
-		logger.Debug("cache hit")
437
-		result := res.(*types.ServerInfoResponse)
438
-		return result.Response, result.Stats, nil
439
-	}
440
-	defer item.StoreAbort()
441
-
442
-	resCh := make(chan *types.ServerInfoResponse, len(bg.clients))
443
-	ctx, cancel := context.WithTimeout(ctx, bg.timeout.Find)
444
-	defer cancel()
445
-
446
-	clients := bg.chooseServers(request.Names)
447
-	for _, client := range clients {
448
-		go bg.doInfoRequest(ctx, logger, request, client, resCh)
449
-	}
450
-
451
-	result := &types.ServerInfoResponse{}
452
-	var err errors.Errors
453
-	responseCounts := 0
454
-	answeredServers := make(map[string]struct{})
455
-GATHER:
456
-	for {
457
-		select {
458
-		case res := <-resCh:
459
-			answeredServers[res.Server] = struct{}{}
460
-			responseCounts++
461
-			if res.Err != nil {
462
-				err.Merge(res.Err)
463
-			}
464
-			if result.Response == nil {
465
-				result = res
466
-			} else if res.Response != nil {
467
-				for k, v := range res.Response.Info {
468
-					result.Response.Info[k] = v
469
-				}
470
-			}
471
-
472
-			if responseCounts == len(clients) {
473
-				break GATHER
474
-			}
475
-		case <-ctx.Done():
476
-			noAnswer := make([]string, 0)
477
-			for _, s := range clients {
478
-				if _, ok := answeredServers[s.Name()]; !ok {
479
-					noAnswer = append(noAnswer, s.Name())
480
-				}
481
-			}
482
-			logger.Warn("timeout waiting for more responses",
483
-				zap.Strings("no_answers_from", noAnswer),
484
-			)
485
-			err.Add(types.ErrTimeoutExceeded)
486
-			break GATHER
487
-		}
488
-	}
489
-	logger.Debug("got some responses",
490
-		zap.Int("clients_count", len(bg.clients)),
491
-		zap.Int("response_count", responseCounts),
492
-		zap.Bool("have_errors", len(err.Errors) == 0),
493
-	)
494
-
495
-	item.StoreAndUnlock(result, uint64(result.Response.Size()))
496
-
497
-	return result.Response, result.Stats, &err
498
-}
499
-
500
-func (bg *BroadcastGroup) List(ctx context.Context) (*protov3.ListMetricsResponse, *types.Stats, *errors.Errors) {
501
-	return nil, nil, errors.FromErr(types.ErrNotImplementedYet)
502
-}
503
-func (bg *BroadcastGroup) Stats(ctx context.Context) (*protov3.MetricDetailsResponse, *types.Stats, *errors.Errors) {
504
-	return nil, nil, errors.FromErr(types.ErrNotImplementedYet)
505
-}
506
-
507
-type tldResponse struct {
508
-	server types.ServerClient
509
-	tlds   []string
510
-	err    *errors.Errors
511
-}
512
-
513
-func doProbe(ctx context.Context, client types.ServerClient, resCh chan<- tldResponse) {
514
-	res, err := client.ProbeTLDs(ctx)
515
-
516
-	resCh <- tldResponse{
517
-		server: client,
518
-		tlds:   res,
519
-		err:    err,
520
-	}
521
-}
522
-
523
-func (bg *BroadcastGroup) ProbeTLDs(ctx context.Context) ([]string, *errors.Errors) {
524
-	logger := bg.logger.With(zap.String("function", "prober"))
525
-
526
-	key := "*"
527
-	item := bg.probeCache.GetQueryItem(key)
528
-	res, ok := item.FetchOrLock(ctx)
529
-	if ok {
530
-		if res == nil {
531
-			return nil, errors.Fatal("timeout")
532
-		}
533
-		logger.Debug("cache hit")
534
-		result := res.([]string)
535
-
536
-		return result, nil
537
-	}
538
-	defer item.StoreAbort()
539
-
540
-	var tlds []string
541
-	resCh := make(chan tldResponse, len(bg.clients))
542
-	ctx, cancel := context.WithTimeout(context.Background(), bg.timeout.Find)
543
-	defer cancel()
544
-
545
-	for _, client := range bg.clients {
546
-		go doProbe(ctx, client, resCh)
547
-	}
548
-
549
-	responses := 0
550
-	size := uint64(0)
551
-	var err errors.Errors
552
-	answeredServers := make(map[string]struct{})
553
-	cache := make(map[string][]types.ServerClient)
554
-	tldMap := make(map[string]struct{})
555
-GATHER:
556
-	for {
557
-		if responses == len(bg.clients) {
558
-			break GATHER
559
-		}
560
-		select {
561
-		case r := <-resCh:
562
-			answeredServers[r.server.Name()] = struct{}{}
563
-			responses++
564
-			if r.err != nil && len(r.err.Errors) > 0 {
565
-				err.Merge(r.err)
566
-				continue
567
-			}
568
-			for _, tld := range r.tlds {
569
-				tldMap[tld] = struct{}{}
570
-			}
571
-			for _, tld := range r.tlds {
572
-				size += uint64(len(tld))
573
-				cache[tld] = append(cache[tld], r.server)
574
-			}
575
-		case <-ctx.Done():
576
-			noAnswer := make([]string, 0)
577
-			for _, s := range bg.clients {
578
-				if _, ok := answeredServers[s.Name()]; !ok {
579
-					noAnswer = append(noAnswer, s.Name())
580
-				}
581
-			}
582
-			logger.Warn("timeout waiting for more responses",
583
-				zap.Strings("no_answers_from", noAnswer),
584
-			)
585
-			err.Add(types.ErrTimeoutExceeded)
586
-			break GATHER
587
-		}
588
-	}
589
-	cancel()
590
-
591
-	for tld, _ := range tldMap {
592
-		tlds = append(tlds, tld)
593
-	}
594
-
595
-	item.StoreAndUnlock(tlds, size)
596
-
597
-	for k, v := range cache {
598
-		bg.pathCache.Set(k, v)
599
-	}
600
-
601
-	return tlds, &err
602
-}

+ 0
- 106
vendor/github.com/go-graphite/carbonzipper/zipper/cache/query.go View File

@@ -1,106 +0,0 @@
1
-package cache
2
-
3
-import (
4
-	"context"
5
-	"sync"
6
-	"sync/atomic"
7
-
8
-	"github.com/dgryski/go-expirecache"
9
-)
10
-
11
-const (
12
-	Empty uint64 = iota
13
-	QueryIsPending
14
-	DataIsAvailable
15
-)
16
-
17
-type QueryItem struct {
18
-	sync.RWMutex
19
-	Key           string
20
-	Data          atomic.Value
21
-	Flags         uint64 // DataIsAvailable or QueryIsPending
22
-	QueryFinished chan struct{}
23
-
24
-	parent *QueryCache
25
-}
26
-
27
-func (q *QueryItem) GetStatus() uint64 {
28
-	s := atomic.LoadUint64(&q.Flags)
29
-	return s
30
-}
31
-
32
-func (q *QueryItem) FetchOrLock(ctx context.Context) (interface{}, bool) {
33
-	d := q.Data.Load()
34
-	if d != nil {
35
-		return d, true
36
-	}
37
-
38
-	ok := atomic.CompareAndSwapUint64(&q.Flags, Empty, QueryIsPending)
39
-	if ok {
40
-		// We are the leader now and will be fetching the data
41
-		return nil, false
42
-	}
43
-
44
-	q.RLock()
45
-	defer q.RUnlock()
46
-
47
-	select {
48
-	case <-ctx.Done():
49
-		return nil, true
50
-	case <-q.QueryFinished:
51
-		break
52
-	}
53
-
54
-	return q.Data.Load(), true
55
-}
56
-
57
-func (q *QueryItem) StoreAbort() {
58
-	d := q.Data.Load()
59
-	if d != nil {
60
-		return
61
-	}
62
-	atomic.StoreUint64(&q.Flags, Empty)
63
-	close(q.QueryFinished)
64
-
65
-	q.Lock()
66
-	q.QueryFinished = make(chan struct{})
67
-	q.Unlock()
68
-}
69
-
70
-func (q *QueryItem) StoreAndUnlock(data interface{}, size uint64) {
71
-	q.Data.Store(data)
72
-	atomic.StoreUint64(&q.Flags, DataIsAvailable)
73
-	close(q.QueryFinished)
74
-	atomic.AddUint64(&q.parent.totalSize, size)
75
-}
76
-
77
-type QueryCache struct {
78
-	ec *expirecache.Cache
79
-
80
-	objectCount uint64
81
-	totalSize   uint64
82
-	expireTime  int32
83
-}
84
-
85
-func NewQueryCache(queryCacheSizeMB uint64, expireTime int32) *QueryCache {
86
-	return &QueryCache{
87
-		ec:         expirecache.New(queryCacheSizeMB),
88
-		expireTime: expireTime,
89
-	}
90
-}
91
-
92
-// TODO: Make size and expire configurable
93
-func (q *QueryCache) GetQueryItem(k string) *QueryItem {
94
-	objectCount := atomic.AddUint64(&q.objectCount, 1)
95
-	size := atomic.AddUint64(&q.totalSize, 1)
96
-	emptyQueryItem := &QueryItem{
97
-		Key:           k,
98
-		QueryFinished: make(chan struct{}),
99
-		Flags:         Empty,
100
-
101
-		parent: q,
102
-	}
103
-	item := q.ec.GetOrSet(k, emptyQueryItem, size/objectCount, q.expireTime).(*QueryItem)
104
-
105
-	return item
106
-}

+ 0
- 25
vendor/github.com/go-graphite/carbonzipper/zipper/config/config.go View File

@@ -1,25 +0,0 @@
1
-package config
2
-
3
-import (
4
-	"time"
5
-
6
-	"github.com/go-graphite/carbonzipper/zipper/types"
7
-)
8
-
9
-// Config is a structure that contains zipper-related configuration bits
10
-type Config struct {
11
-	ConcurrencyLimitPerServer int              `mapstructure:"concurrencyLimitPerServer"`
12
-	MaxIdleConnsPerHost       int              `mapstructure:"maxIdleConnsPerHost"`
13
-	Backends                  []string         `mapstructure:"backends"`
14
-	BackendsV2                types.BackendsV2 `mapstructure:"backendsv2"`
15
-	MaxGlobs                  int              `mapstructure:"maxGlobs"`
16
-	MaxTries                  int              `mapstructure:"maxTries"`
17
-
18
-	CarbonSearch   types.CarbonSearch
19
-	CarbonSearchV2 types.CarbonSearchV2
20
-
21
-	ExpireDelaySec       int32
22
-	InternalRoutingCache time.Duration
23
-	Timeouts             types.Timeouts
24
-	KeepAliveInterval    time.Duration `yaml:"keepAliveInterval"`
25
-}

+ 0
- 94
vendor/github.com/go-graphite/carbonzipper/zipper/errors/errors.go View File

@@ -1,94 +0,0 @@
1
-package errors
2
-
3
-import (
4
-	"fmt"
5
-
6
-	"errors"
7
-)
8
-
9
-type Errors struct {
10
-	HaveFatalErrors bool
11
-	Errors          []error
12
-}
13
-
14
-func FromErrNonFatal(err error) *Errors {
15
-	if err == nil {
16
-		return nil
17
-	}
18
-	return &Errors{
19
-		HaveFatalErrors: false,
20
-		Errors:          []error{err},
21
-	}
22
-}
23
-
24
-func FromErr(err error) *Errors {
25
-	if err == nil {
26
-		return nil
27
-	}
28
-	return &Errors{
29
-		HaveFatalErrors: true,
30
-		Errors:          []error{err},
31
-	}
32
-}
33
-
34
-func Fatal(err string) *Errors {
35
-	return &Errors{
36
-		HaveFatalErrors: true,
37
-		Errors:          []error{errors.New(err)},
38
-	}
39
-}
40
-
41
-func Fatalf(format string, args ...interface{}) *Errors {
42
-	return &Errors{
43
-		HaveFatalErrors: true,
44
-		Errors:          []error{fmt.Errorf(format, args)},
45
-	}
46
-}
47
-
48
-func Error(err string) *Errors {
49
-	return &Errors{
50
-		HaveFatalErrors: false,
51
-		Errors:          []error{errors.New(err)},
52
-	}
53
-}
54
-
55
-func Errorf(format string, args ...interface{}) *Errors {
56
-	return &Errors{
57
-		HaveFatalErrors: false,
58
-		Errors:          []error{fmt.Errorf(format, args)},
59
-	}
60
-}
61
-
62
-func (e *Errors) AddFatal(err error) *Errors {
63
-	if err == nil {
64
-		return e
65
-	}
66
-	e.HaveFatalErrors = true
67
-	e.Errors = append(e.Errors, err)
68
-	return e
69
-}
70
-
71
-func (e *Errors) Add(err error) *Errors {
72
-	if err == nil {
73
-		return e
74
-	}
75
-	e.Errors = append(e.Errors, err)
76
-	return e
77
-}
78
-
79
-func (e *Errors) Addf(format string, args ...interface{}) *Errors {
80
-	e.Errors = append(e.Errors, fmt.Errorf(format, args))
81
-	return e
82
-}
83
-
84
-func (e *Errors) Merge(e2 *Errors) *Errors {
85
-	if e2 == nil {
86
-		return e
87
-	}
88
-	if !e.HaveFatalErrors {
89
-		e.HaveFatalErrors = e2.HaveFatalErrors
90
-	}
91
-
92
-	e.Errors = append(e.Errors, e2.Errors...)
93
-	return e
94
-}

+ 0
- 159
vendor/github.com/go-graphite/carbonzipper/zipper/helper/requests.go View File

@@ -1,159 +0,0 @@
1
-package helper
2
-
3
-import (
4
-	"bytes"
5
-	"context"
6
-	"fmt"
7
-	"io"
8
-	"io/ioutil"
9
-	"net/http"
10
-	"net/url"
11
-	"sync/atomic"
12
-
13
-	"github.com/go-graphite/carbonzipper/limiter"
14
-	cu "github.com/go-graphite/carbonzipper/util/apictx"
15
-	util "github.com/go-graphite/carbonzipper/util/zipperctx"
16
-	"github.com/go-graphite/carbonzipper/zipper/errors"
17
-	"github.com/go-graphite/carbonzipper/zipper/types"
18
-	"go.uber.org/zap"
19
-)
20
-
21
-type ServerResponse struct {
22
-	Server   string
23
-	Response []byte
24
-}
25
-
26
-type HttpQuery struct {
27
-	groupName string
28
-	servers   []string
29
-	maxTries  int
30
-	logger    *zap.Logger
31
-	limiter   *limiter.ServerLimiter
32
-	client    *http.Client
33
-	encoding  string
34
-
35
-	counter uint64
36
-}
37
-
38
-func NewHttpQuery(logger *zap.Logger, groupName string, servers []string, maxTries int, limiter *limiter.ServerLimiter, client *http.Client, encoding string) *HttpQuery {
39
-	return &HttpQuery{
40
-		groupName: groupName,
41
-		servers:   servers,
42
-		maxTries:  maxTries,
43
-		logger:    logger.With(zap.String("action", "query")),
44
-		limiter:   limiter,
45
-		client:    client,
46
-		encoding:  encoding,
47
-	}
48
-}
49
-
50
-func (c *HttpQuery) pickServer() string {
51
-	if len(c.servers) == 1 {
52
-		// No need to do heavy operations here
53
-		return c.servers[0]
54
-	}
55
-	logger := c.logger.With(zap.String("function", "picker"))
56
-	counter := atomic.AddUint64(&(c.counter), 1)
57
-	idx := counter % uint64(len(c.servers))
58
-	srv := c.servers[int(idx)]
59
-	logger.Debug("picked",
60
-		zap.Uint64("counter", counter),
61
-		zap.Uint64("idx", idx),
62
-		zap.String("Server", srv),
63
-	)
64
-
65
-	return srv
66
-}
67
-
68
-func (c *HttpQuery) doRequest(ctx context.Context, uri string, body []byte) (*ServerResponse, error) {
69
-	server := c.pickServer()
70
-	c.logger.Debug("picked server",
71
-		zap.String("server", server),
72
-	)
73
-
74
-	u, err := url.Parse(server + uri)
75
-	if err != nil {
76
-		return nil, err
77
-	}
78
-
79
-	logger := c.logger.With(
80
-		zap.String("server", server),
81
-		zap.String("name", c.groupName),
82
-		zap.String("uri", u.String()),
83
-	)
84
-
85
-	var reader io.Reader
86
-	if body != nil {
87
-		reader = bytes.NewReader(body)
88
-	}
89
-	req, err := http.NewRequest("GET", u.String(), reader)
90
-	req.Header.Set("Accept", c.encoding)
91
-	if err != nil {
92
-		return nil, err
93
-	}
94
-	req = cu.MarshalCtx(ctx, util.MarshalCtx(ctx, req))
95
-
96
-	logger.Debug("trying to get slot")
97
-
98
-	err = c.limiter.Enter(ctx, c.groupName)
99
-	if err != nil {
100
-		logger.Debug("timeout waiting for a slot")
101
-		return nil, err
102
-	}
103
-	logger.Debug("got slot")
104
-
105
-	resp, err := c.client.Do(req.WithContext(ctx))
106
-	c.limiter.Leave(ctx, server)
107
-	if err != nil {
108
-		logger.Error("error fetching result",
109
-			zap.Error(err),
110
-		)
111
-		return nil, err
112
-	}
113
-	defer resp.Body.Close()
114
-
115
-	body, err = ioutil.ReadAll(resp.Body)
116
-	if err != nil {
117
-		logger.Error("error reading body",
118
-			zap.Error(err),
119
-		)
120
-		return nil, err
121
-	}
122
-
123
-	if resp.StatusCode != http.StatusOK {
124
-		logger.Error("status not ok",
125
-			zap.Int("status_code", resp.StatusCode),
126
-		)
127
-		return nil, fmt.Errorf(types.ErrFailedToFetchFmt, c.groupName, resp.StatusCode, string(body))
128
-	}
129
-
130
-	return &ServerResponse{Server: server, Response: body}, nil
131
-}
132
-
133
-func (c *HttpQuery) DoQuery(ctx context.Context, uri string, body []byte) (*ServerResponse, *errors.Errors) {
134
-	maxTries := c.maxTries
135
-	if len(c.servers) > maxTries {
136
-		maxTries = len(c.servers)
137
-	}
138
-
139
-	var e errors.Errors
140
-	for try := 0; try < maxTries; try++ {
141
-		res, err := c.doRequest(ctx, uri, body)
142
-		if err != nil {
143
-			c.logger.Error("have errors",
144
-				zap.Error(err),
145
-			)
146
-			e.Add(err)
147
-			if ctx.Err() != nil {
148
-				e.HaveFatalErrors = true
149
-				return nil, &e
150
-			}
151
-			continue
152
-		}
153
-
154
-		return res, nil
155
-	}
156
-
157
-	e.AddFatal(types.ErrMaxTriesExceeded)
158
-	return nil, &e
159
-}

+ 0
- 9
vendor/github.com/go-graphite/carbonzipper/zipper/httpHeaders/headers.go View File

@@ -1,9 +0,0 @@
1
-package httpHeaders
2
-
3
-const (
4
-	ContentTypeJSON          = "application/json"
5
-	ContentTypeProtobuf      = "application/x-protobuf"
6
-	ContentTypePickle        = "application/pickle"
7
-	ContentTypeCarbonAPIv3PB = "application/x-carbonapi-v3-pb"
8
-	ContentTypeCarbonAPIv2PB = "application/x-protobuf"
9
-)

+ 0
- 23
vendor/github.com/go-graphite/carbonzipper/zipper/metadata/metadata.go View File

@@ -1,23 +0,0 @@
1
-package metadata
2
-
3
-import (
4
-	"sync"
5
-
6
-	"github.com/go-graphite/carbonzipper/limiter"
7
-	"github.com/go-graphite/carbonzipper/zipper/errors"
8
-	"github.com/go-graphite/carbonzipper/zipper/types"
9
-	"go.uber.org/zap"
10
-)
11
-
12
-type md struct {
13
-	sync.RWMutex
14
-	SupportedProtocols       map[string]struct{}
15
-	ProtocolInits            map[string]func(*zap.Logger, types.BackendV2) (types.ServerClient, *errors.Errors)
16
-	ProtocolInitsWithLimiter map[string]func(*zap.Logger, types.BackendV2, *limiter.ServerLimiter) (types.ServerClient, *errors.Errors)
17
-}
18
-
19
-var Metadata = md{
20
-	SupportedProtocols:       make(map[string]struct{}),
21
-	ProtocolInits:            make(map[string]func(*zap.Logger, types.BackendV2) (types.ServerClient, *errors.Errors)),
22
-	ProtocolInitsWithLimiter: make(map[string]func(*zap.Logger, types.BackendV2, *limiter.ServerLimiter) (types.ServerClient, *errors.Errors)),
23
-}

+ 0
- 233
vendor/github.com/go-graphite/carbonzipper/zipper/protocols/auto/auto_group.go View File

@@ -1,233 +0,0 @@
1
-package auto
2
-
3
-import (
4
-	"context"
5
-	"net"
6
-	"net/http"
7
-	"net/url"
8
-	"time"
9
-
10
-	"github.com/go-graphite/carbonzipper/limiter"
11
-	"github.com/go-graphite/carbonzipper/zipper/broadcast"
12
-	"github.com/go-graphite/carbonzipper/zipper/errors"
13
-	"github.com/go-graphite/carbonzipper/zipper/helper"
14
-	"github.com/go-graphite/carbonzipper/zipper/httpHeaders"
15
-	"github.com/go-graphite/carbonzipper/zipper/metadata"
16
-	"github.com/go-graphite/carbonzipper/zipper/types"
17
-	protov3 "github.com/go-graphite/protocol/carbonapi_v3_pb"
18
-	"go.uber.org/zap"
19
-)
20
-
21
-func init() {
22
-	aliases := []string{"auto"}
23
-	metadata.Metadata.Lock()
24
-	for _, name := range aliases {
25
-		metadata.Metadata.SupportedProtocols[name] = struct{}{}
26
-		metadata.Metadata.ProtocolInits[name] = New
27
-		metadata.Metadata.ProtocolInitsWithLimiter[name] = NewWithLimiter
28
-	}
29
-	defer metadata.Metadata.Unlock()
30
-}
31
-
32
-type capabilityResponse struct {
33
-	server   string
34
-	protocol string
35
-}
36
-
37
-//_internal/capabilities/
38
-func doQuery(ctx context.Context, logger *zap.Logger, groupName string, httpClient *http.Client, limiter *limiter.ServerLimiter, server string, payload []byte, resChan chan<- capabilityResponse) {
39
-	httpQuery := helper.NewHttpQuery(logger, groupName, []string{server}, 1, limiter, httpClient, httpHeaders.ContentTypeCarbonAPIv3PB)
40
-	rewrite, _ := url.Parse("http://127.0.0.1/_internal/capabilities/")
41
-
42
-	res, e := httpQuery.DoQuery(ctx, rewrite.RequestURI(), payload)
43
-	if e != nil || res == nil || res.Response == nil || len(res.Response) == 0 {
44
-		logger.Info("will assume old protocol")
45
-		resChan <- capabilityResponse{
46
-			server:   server,
47
-			protocol: "protobuf",
48
-		}
49
-		return
50
-	}
51
-
52
-	response := protov3.CapabilityResponse{}
53
-	logger.Debug("response",
54
-		zap.Any("res", res),
55
-	)
56
-	err := response.Unmarshal(res.Response)
57
-
58
-	if err != nil {
59
-		resChan <- capabilityResponse{
60
-			server:   server,
61
-			protocol: "protobuf",
62
-		}
63
-		return
64
-	}
65
-
66
-	resChan <- capabilityResponse{
67
-		server:   server,
68
-		protocol: response.SupportedProtocols[0],
69
-	}
70
-
71
-}
72
-
73
-type CapabilityResponse struct {
74
-	ProtoToServers map[string][]string
75
-}
76
-
77
-func getBestSupportedProtocol(logger *zap.Logger, servers []string, concurencyLimit int) *CapabilityResponse {
78
-	response := &CapabilityResponse{
79
-		ProtoToServers: make(map[string][]string),
80
-	}
81
-	groupName := "capability query"
82
-	limiter := limiter.NewServerLimiter([]string{groupName}, concurencyLimit)
83
-
84
-	httpClient := &http.Client{
85
-		Transport: &http.Transport{
86
-			DialContext: (&net.Dialer{
87
-				// TODO: Make that configurable
88
-				Timeout:   200 * time.Millisecond,
89
-				KeepAlive: 30 * time.Second,
90
-				DualStack: true,
91
-			}).DialContext,
92
-		},
93
-	}
94
-
95
-	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
96
-	defer cancel()
97
-
98
-	request := protov3.CapabilityRequest{}
99
-	data, err := request.Marshal()
100
-	if err != nil {
101
-		return nil
102
-	}
103
-
104
-	resCh := make(chan capabilityResponse, len(servers))
105
-
106
-	for _, srv := range servers {
107
-		go doQuery(ctx, logger, groupName, httpClient, limiter, srv, data, resCh)
108
-	}
109
-
110
-	answeredServers := make(map[string]struct{})
111
-	responseCounts := 0
112
-GATHER:
113
-	for {
114
-		if responseCounts == len(servers) && len(resCh) == 0 {
115
-			break GATHER
116
-		}
117
-		select {
118
-		case res := <-resCh:
119
-			responseCounts++
120
-			answeredServers[res.server] = struct{}{}
121
-			if res.protocol == "" {
122
-				return nil
123
-			}
124
-			p := response.ProtoToServers[res.protocol]
125
-			response.ProtoToServers[res.protocol] = append(p, res.server)
126
-		case <-ctx.Done():
127
-			noAnswer := make([]string, 0)
128
-			for _, s := range servers {
129
-				if _, ok := answeredServers[s]; !ok {
130
-					noAnswer = append(noAnswer, s)
131
-				}
132
-			}
133
-			logger.Warn("timeout waiting for more responses",
134
-				zap.Strings("no_answers_from", noAnswer),
135
-			)
136
-			break GATHER
137
-		}
138
-	}
139
-
140
-	return response
141
-}
142
-
143
-// RoundRobin is used to connect to backends inside clientGroups, implements ServerClient interface
144
-type AutoGroup struct {
145
-	groupName string
146
-
147
-	httpQuery *helper.HttpQuery
148
-}
149
-
150
-func NewWithLimiter(logger *zap.Logger, config types.BackendV2, limiter *limiter.ServerLimiter) (types.ServerClient, *errors.Errors) {
151
-	return nil, errors.Fatal("auto group doesn't support anything useful except for New")
152
-}
153
-
154
-func New(logger *zap.Logger, config types.BackendV2) (types.ServerClient, *errors.Errors) {
155
-	logger = logger.With(zap.String("type", "autoGroup"), zap.String("name", config.GroupName))
156
-
157
-	limit := 100
158
-	if config.ConcurrencyLimit != nil {
159
-		limit = *config.ConcurrencyLimit
160
-	}
161
-	res := getBestSupportedProtocol(logger, config.Servers, limit)
162
-	if res == nil {
163
-		return nil, errors.Fatalf("can't query all backend")
164
-	}
165
-
166
-	var broadcastClients []types.ServerClient
167
-	for proto, servers := range res.ProtoToServers {
168
-		metadata.Metadata.RLock()
169
-		backendInit, ok := metadata.Metadata.ProtocolInits[proto]
170
-		metadata.Metadata.RUnlock()
171
-		if !ok {
172
-			var protocols []string
173
-			metadata.Metadata.RLock()
174
-			for p := range metadata.Metadata.SupportedProtocols {
175
-				protocols = append(protocols, p)
176
-			}
177
-			metadata.Metadata.RUnlock()
178
-			logger.Error("unknown backend protocol",
179
-				zap.Any("backend", config),
180
-				zap.String("requested_protocol", proto),
181
-				zap.Strings("supported_backends", protocols),
182
-			)
183
-			return nil, errors.Fatalf("unknown backend protocol '%v'", proto)
184
-		}
185
-
186
-		cfg := config
187
-		cfg.GroupName = config.GroupName + "_" + proto
188
-		cfg.Servers = servers
189
-		c, ePtr := backendInit(logger, cfg)
190
-		if ePtr != nil && ePtr.HaveFatalErrors {
191
-			return nil, ePtr
192
-		}
193
-
194
-		broadcastClients = append(broadcastClients, c)
195
-	}
196
-
197
-	return broadcast.NewBroadcastGroup(logger, config.GroupName+"_broadcast", broadcastClients, 600, limit, *config.Timeouts)
198
-}
199
-
200
-func (c AutoGroup) MaxMetricsPerRequest() int {
201
-	return -1
202
-}
203
-
204
-func (c AutoGroup) Name() string {
205
-	return c.groupName
206
-}
207
-
208
-func (c AutoGroup) Backends() []string {
209
-	return nil
210
-}
211
-
212
-func (c *AutoGroup) Fetch(ctx context.Context, request *protov3.MultiFetchRequest) (*protov3.MultiFetchResponse, *types.Stats, *errors.Errors) {
213
-	return nil, nil, errors.Fatal("auto group doesn't support fetch")
214
-}
215
-
216
-func (c *AutoGroup) Find(ctx context.Context, request *protov3.MultiGlobRequest) (*protov3.MultiGlobResponse, *types.Stats, *errors.Errors) {
217
-	return nil, nil, errors.Fatal("auto group doesn't support fetch")
218
-}
219
-
220
-func (c *AutoGroup) Info(ctx context.Context, request *protov3.MultiMetricsInfoRequest) (*protov3.ZipperInfoResponse, *types.Stats, *errors.Errors) {
221
-	return nil, nil, errors.Fatal("auto group doesn't support fetch")
222
-}
223
-
224
-func (c *AutoGroup) List(ctx context.Context) (*protov3.ListMetricsResponse, *types.Stats, *errors.Errors) {
225
-	return nil, nil, errors.Fatal("auto group doesn't support fetch")
226
-}
227
-func (c *AutoGroup) Stats(ctx context.Context) (*protov3.MetricDetailsResponse, *types.Stats, *errors.Errors) {
228
-	return nil, nil, errors.FromErr(types.ErrNotImplementedYet)
229
-}
230
-
231
-func (c *AutoGroup) ProbeTLDs(ctx context.Context) ([]string, *errors.Errors) {
232
-	return nil, errors.Fatal("auto group doesn't support fetch")
233
-}

+ 0
- 331
vendor/github.com/go-graphite/carbonzipper/zipper/protocols/graphite/graphite_group.go View File

@@ -1,331 +0,0 @@
1
-package v2
2
-
3
-import (
4
-	"context"
5
-	"math"
6
-	"net"
7
-	"net/http"
8
-	"net/url"
9
-	"strconv"
10
-
11
-	"github.com/go-graphite/carbonzipper/limiter"
12
-	"github.com/go-graphite/carbonzipper/zipper/errors"
13
-	"github.com/go-graphite/carbonzipper/zipper/helper"
14
-	"github.com/go-graphite/carbonzipper/zipper/httpHeaders"
15
-	"github.com/go-graphite/carbonzipper/zipper/metadata"
16
-	"github.com/go-graphite/carbonzipper/zipper/protocols/graphite/msgpack"
17
-	"github.com/go-graphite/carbonzipper/zipper/types"
18
-	protov2 "github.com/go-graphite/protocol/carbonapi_v2_pb"
19
-	protov3 "github.com/go-graphite/protocol/carbonapi_v3_pb"
20
-
21
-	"go.uber.org/zap"
22
-)
23
-
24
-func init() {
25
-	aliases := []string{"msgpack"}
26
-	metadata.Metadata.Lock()
27
-	for _, name := range aliases {
28
-		metadata.Metadata.SupportedProtocols[name] = struct{}{}
29
-		metadata.Metadata.ProtocolInits[name] = New
30
-		metadata.Metadata.ProtocolInitsWithLimiter[name] = NewWithLimiter
31
-	}
32
-	defer metadata.Metadata.Unlock()
33
-}
34
-
35
-// RoundRobin is used to connect to backends inside clientGroups, implements ServerClient interface
36
-type GraphiteGroup struct {
37
-	groupName string
38
-	servers   []string
39
-	protocol  string
40
-
41
-	client *http.Client
42
-
43
-	counter             uint64
44
-	maxIdleConnsPerHost int
45
-
46
-	limiter              *limiter.ServerLimiter
47
-	logger               *zap.Logger
48
-	timeout              types.Timeouts
49
-	maxTries             int
50
-	maxMetricsPerRequest int
51
-
52
-	httpQuery *helper.HttpQuery
53
-}
54
-
55
-func NewWithLimiter(logger *zap.Logger, config types.BackendV2, limiter *limiter.ServerLimiter) (types.ServerClient, *errors.Errors) {
56
-	logger = logger.With(zap.String("type", "graphite"), zap.String("protocol", config.Protocol), zap.String("name", config.GroupName))
57
-
58
-	httpClient := &http.Client{
59
-		Transport: &http.Transport{
60
-			MaxIdleConnsPerHost: *config.MaxIdleConnsPerHost,
61
-			DialContext: (&net.Dialer{
62
-				Timeout:   config.Timeouts.Connect,
63
-				KeepAlive: *config.KeepAliveInterval,
64
-				DualStack: true,
65
-			}).DialContext,
66
-		},
67
-	}
68
-
69
-	httpQuery := helper.NewHttpQuery(logger, config.GroupName, config.Servers, *config.MaxTries, limiter, httpClient, httpHeaders.ContentTypeCarbonAPIv2PB)
70
-
71
-	c := &GraphiteGroup{
72
-		groupName:            config.GroupName,
73
-		servers:              config.Servers,
74
-		protocol:             config.Protocol,
75
-		timeout:              *config.Timeouts,
76
-		maxTries:             *config.MaxTries,
77
-		maxMetricsPerRequest: config.MaxGlobs,
78
-
79
-		client:  httpClient,
80
-		limiter: limiter,
81
-		logger:  logger,
82
-
83
-		httpQuery: httpQuery,
84
-	}
85
-	return c, nil
86
-}
87
-
88
-func New(logger *zap.Logger, config types.BackendV2) (types.ServerClient, *errors.Errors) {
89
-	if config.ConcurrencyLimit == nil {
90
-		return nil, errors.Fatal("concurency limit is not set")
91
-	}
92
-	if len(config.Servers) == 0 {
93
-		return nil, errors.Fatal("no servers specified")
94
-	}
95
-	limiter := limiter.NewServerLimiter([]string{config.GroupName}, *config.ConcurrencyLimit)
96
-
97
-	return NewWithLimiter(logger, config, limiter)
98
-}
99
-
100
-func (c GraphiteGroup) MaxMetricsPerRequest() int {
101
-	return c.maxMetricsPerRequest
102
-}
103
-
104
-func (c GraphiteGroup) Name() string {
105
-	return c.groupName
106
-}
107
-
108
-func (c GraphiteGroup) Backends() []string {
109
-	return c.servers
110
-}
111
-
112
-func (c *GraphiteGroup) Fetch(ctx context.Context, request *protov3.MultiFetchRequest) (*protov3.MultiFetchResponse, *types.Stats, *errors.Errors) {
113
-	stats := &types.Stats{}
114
-	rewrite, _ := url.Parse("http://127.0.0.1/render/")
115
-
116
-	pathExprToTargets := make(map[string][]string)
117
-	for _, m := range request.Metrics {
118
-		targets := pathExprToTargets[m.PathExpression]
119
-		pathExprToTargets[m.PathExpression] = append(targets, m.Name)
120
-	}
121
-
122
-	var r protov3.MultiFetchResponse
123
-	for pathExpr, targets := range pathExprToTargets {
124
-		v := url.Values{
125
-			"target": targets,
126
-			"format": []string{c.protocol},
127
-			"from":   []string{strconv.Itoa(int(request.Metrics[0].StartTime))},
128
-			"until":  []string{strconv.Itoa(int(request.Metrics[0].StopTime))},
129
-		}
130
-		rewrite.RawQuery = v.Encode()
131
-		res, err := c.httpQuery.DoQuery(ctx, rewrite.RequestURI(), nil)
132
-		if err == nil {
133
-			err = &errors.Errors{}
134
-		}
135
-		if err.HaveFatalErrors {
136
-			err.HaveFatalErrors = false
137
-			return nil, stats, err
138
-		}
139
-
140
-		var metrics msgpack.MultiGraphiteFetchResponse
141
-		_, e := metrics.UnmarshalMsg(res.Response)
142
-		err.AddFatal(e)
143
-		if err.HaveFatalErrors {
144
-			return nil, stats, err
145
-		}
146
-
147
-		for _, m := range metrics {
148
-			vals := make([]float64, len(m.Values))
149
-			for i, vIface := range m.Values {
150
-				if v, ok := vIface.(float64); ok {
151
-					vals[i] = v
152
-				} else {
153
-					vals[i] = math.NaN()
154
-				}
155
-			}
156
-			r.Metrics = append(r.Metrics, protov3.FetchResponse{
157
-				Name:              m.Name,
158
-				PathExpression:    pathExpr,
159
-				ConsolidationFunc: "Average",
160
-				StopTime:          int64(m.End),
161
-				StartTime:         int64(m.Start),
162
-				StepTime:          int64(m.Step),
163
-				Values:            vals,
164
-				XFilesFactor:      0.0,
165
-			})
166
-		}
167
-	}
168
-
169
-	return &r, stats, nil
170
-}
171
-
172
-func (c *GraphiteGroup) Find(ctx context.Context, request *protov3.MultiGlobRequest) (*protov3.MultiGlobResponse, *types.Stats, *errors.Errors) {
173
-	logger := c.logger.With(zap.String("type", "find"), zap.Strings("request", request.Metrics))
174
-	stats := &types.Stats{}
175
-	rewrite, _ := url.Parse("http://127.0.0.1/metrics/find/")
176
-
177
-	var r protov3.MultiGlobResponse
178
-	r.Metrics = make([]protov3.GlobResponse, 0)
179
-	var e errors.Errors
180
-	for _, query := range request.Metrics {
181
-		v := url.Values{
182
-			"query":  []string{query},
183
-			"format": []string{c.protocol},
184
-		}
185
-		rewrite.RawQuery = v.Encode()
186
-		res, err := c.httpQuery.DoQuery(ctx, rewrite.RequestURI(), nil)
187
-		if err != nil {
188
-			e.Merge(err)
189
-			continue
190
-		}
191
-		var globs msgpack.MultiGraphiteGlobResponse
192
-		_, marshalErr := globs.UnmarshalMsg(res.Response)
193
-		if marshalErr != nil {
194
-			e.Add(marshalErr)
195
-			continue
196
-		}
197
-
198
-		stats.Servers = append(stats.Servers, res.Server)
199
-		matches := make([]protov3.GlobMatch, 0, len(globs))
200
-		for _, m := range globs {
201
-			matches = append(matches, protov3.GlobMatch{
202
-				Path:   m.Path,
203
-				IsLeaf: m.IsLeaf,
204
-			})
205
-		}
206
-		r.Metrics = append(r.Metrics, protov3.GlobResponse{
207
-			Name:    query,
208
-			Matches: matches,
209
-		})
210
-	}
211
-
212
-	if len(e.Errors) != 0 {
213
-		logger.Error("errors occurred while getting results",
214
-			zap.Any("errors", e.Errors),
215
-		)
216
-	}
217
-
218
-	if len(r.Metrics) == 0 {
219
-		return nil, stats, errors.FromErr(types.ErrNoResponseFetched)
220
-	}
221
-	return &r, stats, nil
222
-}
223
-
224
-func (c *GraphiteGroup) Info(ctx context.Context, request *protov3.MultiMetricsInfoRequest) (*protov3.ZipperInfoResponse, *types.Stats, *errors.Errors) {
225
-	logger := c.logger.With(zap.String("type", "info"))
226
-	stats := &types.Stats{}
227
-	rewrite, _ := url.Parse("http://127.0.0.1/info/")
228
-
229
-	var r protov3.ZipperInfoResponse
230
-	var e errors.Errors
231
-	r.Info = make(map[string]protov3.MultiMetricsInfoResponse)
232
-	data := protov3.MultiMetricsInfoResponse{}
233
-	server := c.groupName
234
-	if len(c.servers) == 1 {
235
-		server = c.servers[0]
236
-	}
237
-
238
-	for _, query := range request.Names {
239
-		v := url.Values{
240
-			"target": []string{query},
241
-			"format": []string{c.protocol},
242
-		}
243
-		rewrite.RawQuery = v.Encode()
244
-		res, e2 := c.httpQuery.DoQuery(ctx, rewrite.RequestURI(), nil)
245
-		if e2 != nil {
246
-			e.Merge(e2)
247
-			continue
248
-		}
249
-
250
-		var info protov2.InfoResponse
251
-		err := info.Unmarshal(res.Response)
252
-		if err != nil {
253
-			e.Add(err)
254
-			continue
255
-		}
256
-		stats.Servers = append(stats.Servers, res.Server)
257
-
258
-		if info.AggregationMethod == "" {
259
-			info.AggregationMethod = "average"
260
-		}
261
-		infoV3 := protov3.MetricsInfoResponse{
262
-			Name:              info.Name,
263
-			ConsolidationFunc: info.AggregationMethod,
264
-			XFilesFactor:      info.XFilesFactor,
265
-			MaxRetention:      int64(info.MaxRetention),
266
-		}
267
-
268
-		for _, r := range info.Retentions {
269
-			newR := protov3.Retention{
270
-				SecondsPerPoint: int64(r.SecondsPerPoint),
271
-				NumberOfPoints:  int64(r.NumberOfPoints),
272
-			}
273
-			infoV3.Retentions = append(infoV3.Retentions, newR)
274
-		}
275
-
276
-		data.Metrics = append(data.Metrics, infoV3)
277
-	}
278
-	r.Info[server] = data
279
-
280
-	if len(e.Errors) != 0 {
281
-		logger.Error("errors occurred while getting results",
282
-			zap.Any("errors", e.Errors),
283
-		)
284
-	}
285
-
286
-	if len(r.Info[server].Metrics) == 0 {
287
-		return nil, stats, errors.FromErr(types.ErrNoResponseFetched)
288
-	}
289
-
290
-	logger.Debug("got client response",
291
-		zap.Any("r", r),
292
-	)
293
-
294
-	return &r, stats, nil
295
-}
296
-
297
-func (c *GraphiteGroup) List(ctx context.Context) (*protov3.ListMetricsResponse, *types.Stats, *errors.Errors) {
298
-	return nil, nil, errors.FromErr(types.ErrNotImplementedYet)
299
-}
300
-func (c *GraphiteGroup) Stats(ctx context.Context) (*protov3.MetricDetailsResponse, *types.Stats, *errors.Errors) {
301
-	return nil, nil, errors.FromErr(types.ErrNotImplementedYet)
302
-}
303
-
304
-func (c *GraphiteGroup) ProbeTLDs(ctx context.Context) ([]string, *errors.Errors) {
305
-	logger := c.logger.With(zap.String("function", "prober"))
306
-	req := &protov3.MultiGlobRequest{
307
-		Metrics: []string{"*"},
308
-	}
309
-
310
-	logger.Debug("doing request",
311
-		zap.Strings("request", req.Metrics),
312
-	)
313
-
314
-	res, _, err := c.Find(ctx, req)
315
-	if err != nil {
316
-		return nil, err
317
-	}
318
-
319
-	var tlds []string
320
-	for _, m := range res.Metrics {
321
-		for _, v := range m.Matches {
322
-			tlds = append(tlds, v.Path)
323
-		}
324
-	}
325
-
326
-	logger.Debug("will return data",
327
-		zap.Strings("tlds", tlds),
328
-	)
329
-
330
-	return tlds, nil
331
-}

+ 0
- 21
vendor/github.com/go-graphite/carbonzipper/zipper/protocols/graphite/msgpack/type.go View File

@@ -1,21 +0,0 @@
1
-package msgpack
2
-
3
-//go:generate msgp
4
-
5
-type GraphiteFetchResponse struct {
6
-	Start          uint32    `msg:"start"`
7
-	End            uint32    `msg:"end"`
8
-	Step           uint32    `msg:"step"`
9
-	Name           string    `msg:"name"`
10
-	PathExpression string    `msg:"pathExpression"`
11
-	Values         []interface{} `msg:"values"`
12
-}
13
-
14
-type MultiGraphiteFetchResponse []GraphiteFetchResponse
15
-
16
-type GraphiteGlobResponse struct {
17
-	IsLeaf bool   `msg:"isLeaf"`
18
-	Path   string `msg:"path"`
19
-}
20
-
21
-type MultiGraphiteGlobResponse []GraphiteGlobResponse

+ 0
- 600
vendor/github.com/go-graphite/carbonzipper/zipper/protocols/graphite/msgpack/type_gen.go View File

@@ -1,600 +0,0 @@
1
-package msgpack
2
-
3
-// NOTE: THIS FILE WAS PRODUCED BY THE
4
-// MSGP CODE GENERATION TOOL (github.com/tinylib/msgp)
5
-// DO NOT EDIT
6
-
7
-import (
8
-	"github.com/tinylib/msgp/msgp"
9
-)
10
-
11
-// DecodeMsg implements msgp.Decodable
12
-func (z *GraphiteFetchResponse) DecodeMsg(dc *msgp.Reader) (err error) {
13
-	var field []byte
14
-	_ = field
15
-	var zb0001 uint32
16
-	zb0001, err = dc.ReadMapHeader()
17
-	if err != nil {
18
-		return
19
-	}
20
-	for zb0001 > 0 {
21
-		zb0001--
22
-		field, err = dc.ReadMapKeyPtr()
23
-		if err != nil {
24
-			return
25
-		}
26
-		switch msgp.UnsafeString(field) {
27
-		case "start":
28
-			z.Start, err = dc.ReadUint32()
29
-			if err != nil {
30
-				return
31
-			}
32
-		case "end":
33
-			z.End, err = dc.ReadUint32()
34
-			if err != nil {
35
-				return
36
-			}
37
-		case "step":
38
-			z.Step, err = dc.ReadUint32()
39
-			if err != nil {
40
-				return
41
-			}
42
-		case "name":
43
-			z.Name, err = dc.ReadString()
44
-			if err != nil {
45
-				return
46
-			}
47
-		case "pathExpression":
48
-			z.PathExpression, err = dc.ReadString()
49
-			if err != nil {
50
-				return
51
-			}
52
-		case "values":
53
-			var zb0002 uint32
54
-			zb0002, err = dc.ReadArrayHeader()
55
-			if err != nil {
56
-				return
57
-			}
58
-			if cap(z.Values) >= int(zb0002) {
59
-				z.Values = (z.Values)[:zb0002]
60
-			} else {
61
-				z.Values = make([]interface{}, zb0002)
62
-			}
63
-			for za0001 := range z.Values {
64
-				z.Values[za0001], err = dc.ReadIntf()
65
-				if err != nil {
66
-					return
67
-				}
68
-			}
69
-		default:
70
-			err = dc.Skip()
71
-			if err != nil {
72
-				return
73
-			}
74
-		}
75
-	}
76
-	return
77
-}
78
-
79
-// EncodeMsg implements msgp.Encodable
80
-func (z *GraphiteFetchResponse) EncodeMsg(en *msgp.Writer) (err error) {
81
-	// map header, size 6
82
-	// write "start"
83
-	err = en.Append(0x86, 0xa5, 0x73, 0x74, 0x61, 0x72, 0x74)
84
-	if err != nil {
85
-		return
86
-	}
87
-	err = en.WriteUint32(z.Start)
88
-	if err != nil {
89
-		return
90
-	}
91
-	// write "end"
92
-	err = en.Append(0xa3, 0x65, 0x6e, 0x64)
93
-	if err != nil {
94
-		return
95
-	}
96
-	err = en.WriteUint32(z.End)
97
-	if err != nil {
98
-		return
99
-	}
100
-	// write "step"
101
-	err = en.Append(0xa4, 0x73, 0x74, 0x65, 0x70)
102
-	if err != nil {
103
-		return
104
-	}
105
-	err = en.WriteUint32(z.Step)
106
-	if err != nil {
107
-		return
108
-	}
109
-	// write "name"
110
-	err = en.Append(0xa4, 0x6e, 0x61, 0x6d, 0x65)
111
-	if err != nil {
112
-		return
113
-	}
114
-	err = en.WriteString(z.Name)
115
-	if err != nil {
116
-		return
117
-	}
118
-	// write "pathExpression"
119
-	err = en.Append(0xae, 0x70, 0x61, 0x74, 0x68, 0x45, 0x78, 0x70, 0x72, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e)
120
-	if err != nil {
121
-		return
122
-	}
123
-	err = en.WriteString(z.PathExpression)
124
-	if err != nil {
125
-		return
126
-	}
127
-	// write "values"
128
-	err = en.Append(0xa6, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73)
129
-	if err != nil {
130
-		return
131
-	}
132
-	err = en.WriteArrayHeader(uint32(len(z.Values)))
133
-	if err != nil {
134
-		return
135
-	}
136
-	for za0001 := range z.Values {
137
-		err = en.WriteIntf(z.Values[za0001])
138
-		if err != nil {
139
-			return
140
-		}
141
-	}
142
-	return
143
-}
144
-
145
-// MarshalMsg implements msgp.Marshaler
146
-func (z *GraphiteFetchResponse) MarshalMsg(b []byte) (o []byte, err error) {
147
-	o = msgp.Require(b, z.Msgsize())
148
-	// map header, size 6
149
-	// string "start"
150
-	o = append(o, 0x86, 0xa5, 0x73, 0x74, 0x61, 0x72, 0x74)
151
-	o = msgp.AppendUint32(o, z.Start)
152
-	// string "end"
153
-	o = append(o, 0xa3, 0x65, 0x6e, 0x64)
154
-	o = msgp.AppendUint32(o, z.End)
155
-	// string "step"
156
-	o = append(o, 0xa4, 0x73, 0x74, 0x65, 0x70)
157
-	o = msgp.AppendUint32(o, z.Step)
158
-	// string "name"
159
-	o = append(o, 0xa4, 0x6e, 0x61, 0x6d, 0x65)
160
-	o = msgp.AppendString(o, z.Name)
161
-	// string "pathExpression"
162
-	o = append(o, 0xae, 0x70, 0x61, 0x74, 0x68, 0x45, 0x78, 0x70, 0x72, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e)
163
-	o = msgp.AppendString(o, z.PathExpression)
164
-	// string "values"
165
-	o = append(o, 0xa6, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73)
166
-	o = msgp.AppendArrayHeader(o, uint32(len(z.Values)))
167
-	for za0001 := range z.Values {
168
-		o, err = msgp.AppendIntf(o, z.Values[za0001])
169
-		if err != nil {
170
-			return
171
-		}
172
-	}
173
-	return
174
-}
175
-
176
-// UnmarshalMsg implements msgp.Unmarshaler
177
-func (z *GraphiteFetchResponse) UnmarshalMsg(bts []byte) (o []byte, err error) {
178
-	var field []byte
179
-	_ = field
180
-	var zb0001 uint32
181
-	zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
182
-	if err != nil {
183
-		return
184
-	}
185
-	for zb0001 > 0 {
186
-		zb0001--
187
-		field, bts, err = msgp.ReadMapKeyZC(bts)
188
-		if err != nil {
189
-			return
190
-		}
191
-		switch msgp.UnsafeString(field) {
192
-		case "start":
193
-			z.Start, bts, err = msgp.ReadUint32Bytes(bts)
194
-			if err != nil {
195
-				return
196
-			}
197
-		case "end":
198
-			z.End, bts, err = msgp.ReadUint32Bytes(bts)
199
-			if err != nil {
200
-				return
201
-			}
202
-		case "step":
203
-			z.Step, bts, err = msgp.ReadUint32Bytes(bts)
204
-			if err != nil {
205
-				return
206
-			}
207
-		case "name":
208
-			z.Name, bts, err = msgp.ReadStringBytes(bts)
209
-			if err != nil {
210
-				return
211
-			}
212
-		case "pathExpression":
213
-			z.PathExpression, bts, err = msgp.ReadStringBytes(bts)
214
-			if err != nil {
215
-				return
216
-			}
217
-		case "values":
218
-			var zb0002 uint32
219
-			zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts)
220
-			if err != nil {
221
-				return
222
-			}
223
-			if cap(z.Values) >= int(zb0002) {
224
-				z.Values = (z.Values)[:zb0002]
225
-			} else {
226
-				z.Values = make([]interface{}, zb0002)
227
-			}
228
-			for za0001 := range z.Values {
229
-				z.Values[za0001], bts, err = msgp.ReadIntfBytes(bts)
230
-				if err != nil {
231
-					return
232
-				}
233
-			}
234
-		default:
235
-			bts, err = msgp.Skip(bts)
236
-			if err != nil {
237
-				return
238
-			}
239
-		}
240
-	}
241
-	o = bts
242
-	return
243
-}
244
-