Browse Source

Backport changes from v5

tags/v6.0.0-rc2
Oliver Eilhard 2 years ago
parent
commit
ad643b571a

+ 1
- 0
.travis.yml.off View File

@@ -1,5 +1,6 @@
sudo: required
language: go
script: go test -race -v . ./config
go:
- 1.7
- 1.8

+ 8
- 0
CONTRIBUTORS View File

@@ -15,8 +15,10 @@ Alex [@akotlar](https://github.com/akotlar)
Alexandre Olivier [@aliphen](https://github.com/aliphen)
Alexey Sharov [@nizsheanez](https://github.com/nizsheanez)
AndreKR [@AndreKR](https://github.com/AndreKR)
André Bierlein [@ligustah](https://github.com/ligustah)
Andrew Dunham [@andrew-d](https://github.com/andrew-d)
Andrew Gaul [@andrewgaul](https://github.com/andrewgaul)
Andy Walker [@alaska](https://github.com/alaska)
Arquivei [@arquivei](https://github.com/arquivei)
Benjamin Fernandes [@LotharSee](https://github.com/LotharSee)
Benjamin Zarzycki [@kf6nux](https://github.com/kf6nux)
@@ -26,7 +28,9 @@ Bryan Conklin [@bmconklin](https://github.com/bmconklin)
Bruce Zhou [@brucez-isell](https://github.com/brucez-isell)
cforbes [@cforbes](https://github.com/cforbes)
Chris M [@tebriel](https://github.com/tebriel)
Chris Rice [@donutmonger](https://github.com/donutmonger)
Christophe Courtaut [@kri5](https://github.com/kri5)
Connor Peet [@connor4312](https://github.com/connor4312)
Conrad Pankoff [@deoxxa](https://github.com/deoxxa)
Corey Scott [@corsc](https://github.com/corsc)
Daniel Barrett [@shendaras](https://github.com/shendaras)
@@ -59,6 +63,7 @@ John Goodall [@jgoodall](https://github.com/jgoodall)
John Stanford [@jxstanford](https://github.com/jxstanford)
jun [@coseyo](https://github.com/coseyo)
Junpei Tsuji [@jun06t](https://github.com/jun06t)
Keith Hatton [@khatton-ft](https://github.com/khatton-ft)
Kenta SUZUKI [@suzuken](https://github.com/suzuken)
Kevin Mulvey [@kmulvey](https://github.com/kmulvey)
Kyle Brandt [@kylebrandt](https://github.com/kylebrandt)
@@ -77,6 +82,7 @@ Naoya Tsutsumi [@tutuming](https://github.com/tutuming)
Nicholas Wolff [@nwolff](https://github.com/nwolff)
Nick K [@utrack](https://github.com/utrack)
Nick Whyte [@nickw444](https://github.com/nickw444)
Nicolae Vartolomei [@nvartolomei](https://github.com/nvartolomei)
Orne Brocaar [@brocaar](https://github.com/brocaar)
Pete C [@peteclark-ft](https://github.com/peteclark-ft)
Radoslaw Wesolowski [r--w](https://github.com/r--w)
@@ -88,6 +94,7 @@ Stephen Kubovic [@stephenkubovic](https://github.com/stephenkubovic)
Stuart Warren [@Woz](https://github.com/stuart-warren)
Sulaiman [@salajlan](https://github.com/salajlan)
Sundar [@sundarv85](https://github.com/sundarv85)
Swarlston [@Swarlston](https://github.com/Swarlston)
Take [ww24](https://github.com/ww24)
Tetsuya Morimoto [@t2y](https://github.com/t2y)
TimeEmit [@TimeEmit](https://github.com/timeemit)
@@ -99,3 +106,4 @@ Wyndham Blanton [@wyndhblb](https://github.com/wyndhblb)
Yarden Bar [@ayashjorden](https://github.com/ayashjorden)
zakthomas [@zakthomas](https://github.com/zakthomas)
singham [@zhaochenxiao90](https://github.com/zhaochenxiao90)
Roman Colohanin [@zuzmic](https://github.com/zuzmic)

+ 53
- 3
client.go View File

@@ -9,15 +9,19 @@ import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"net/http/httputil"
"net/url"
"os"
"regexp"
"strings"
"sync"
"time"

"github.com/pkg/errors"

"gopkg.in/olivere/elastic.v5/config"
)

const (
@@ -288,6 +292,47 @@ func NewClient(options ...ClientOptionFunc) (*Client, error) {
return c, nil
}

// NewClientFromConfig initializes a client from a configuration.
func NewClientFromConfig(cfg *config.Config) (*Client, error) {
var options []ClientOptionFunc
if cfg != nil {
if cfg.URL != "" {
options = append(options, SetURL(cfg.URL))
}
if cfg.Errorlog != "" {
f, err := os.OpenFile(cfg.Errorlog, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return nil, errors.Wrap(err, "unable to initialize error log")
}
l := log.New(f, "", 0)
options = append(options, SetErrorLog(l))
}
if cfg.Tracelog != "" {
f, err := os.OpenFile(cfg.Tracelog, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return nil, errors.Wrap(err, "unable to initialize trace log")
}
l := log.New(f, "", 0)
options = append(options, SetTraceLog(l))
}
if cfg.Infolog != "" {
f, err := os.OpenFile(cfg.Infolog, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return nil, errors.Wrap(err, "unable to initialize info log")
}
l := log.New(f, "", 0)
options = append(options, SetInfoLog(l))
}
if cfg.Username != "" || cfg.Password != "" {
options = append(options, SetBasicAuth(cfg.Username, cfg.Password))
}
if cfg.Sniff != nil {
options = append(options, SetSniff(*cfg.Sniff))
}
}
return NewClient(options...)
}

// NewSimpleClient creates a new short-lived Client that can be used in
// use cases where you need e.g. one client per request.
//
@@ -1233,6 +1278,9 @@ func (c *Client) PerformRequest(ctx context.Context, method, path string, params
defer res.Body.Close()
}

// Tracing
c.dumpResponse(res)

// Check for errors
if err := checkResponse((*http.Request)(req), res, ignoreErrors...); err != nil {
// No retry if request succeeded
@@ -1241,9 +1289,6 @@ func (c *Client) PerformRequest(ctx context.Context, method, path string, params
return resp, err
}

// Tracing
c.dumpResponse(res)

// We successfully made a request with this connection
conn.MarkAsHealthy()

@@ -1620,6 +1665,11 @@ func (c *Client) TasksList() *TasksListService {
return NewTasksListService(c)
}

// TasksGetTask retrieves a task running on the cluster.
func (c *Client) TasksGetTask() *TasksGetTaskService {
return NewTasksGetTaskService(c)
}

// TODO Pending cluster tasks
// TODO Cluster Reroute
// TODO Cluster Update Settings

+ 31
- 1
client_test.go View File

@@ -16,6 +16,7 @@ import (
"reflect"
"regexp"
"strings"
"sync"
"testing"
"time"

@@ -294,6 +295,7 @@ func TestClientHealthcheckTimeoutLeak(t *testing.T) {
// leaks via leaktest.
mux := http.NewServeMux()

var reqDoneMu sync.Mutex
var reqDone bool
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
cn, ok := w.(http.CloseNotifier)
@@ -301,7 +303,9 @@ func TestClientHealthcheckTimeoutLeak(t *testing.T) {
t.Fatalf("Writer is not CloseNotifier, but %v", reflect.TypeOf(w).Name())
}
<-cn.CloseNotify()
reqDoneMu.Lock()
reqDone = true
reqDoneMu.Unlock()
})

lis, err := net.Listen("tcp", "127.0.0.1:0")
@@ -346,9 +350,12 @@ func TestClientHealthcheckTimeoutLeak(t *testing.T) {
}

<-time.After(time.Second)
reqDoneMu.Lock()
if !reqDone {
reqDoneMu.Unlock()
t.Fatal("Request wasn't canceled or stopped")
}
reqDoneMu.Unlock()
}

// -- NewSimpleClient --
@@ -552,6 +559,7 @@ func TestClientSniffTimeoutLeak(t *testing.T) {
// leaks via leaktest.
mux := http.NewServeMux()

var reqDoneMu sync.Mutex
var reqDone bool
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
cn, ok := w.(http.CloseNotifier)
@@ -559,7 +567,9 @@ func TestClientSniffTimeoutLeak(t *testing.T) {
t.Fatalf("Writer is not CloseNotifier, but %v", reflect.TypeOf(w).Name())
}
<-cn.CloseNotify()
reqDoneMu.Lock()
reqDone = true
reqDoneMu.Unlock()
})

lis, err := net.Listen("tcp", "127.0.0.1:0")
@@ -605,9 +615,12 @@ func TestClientSniffTimeoutLeak(t *testing.T) {
}

<-time.After(time.Second)
reqDoneMu.Lock()
if !reqDone {
reqDoneMu.Unlock()
t.Fatal("Request wasn't canceled or stopped")
}
reqDoneMu.Unlock()
}

func TestClientExtractHostname(t *testing.T) {
@@ -973,6 +986,22 @@ func TestPerformRequestWithLoggerAndTracer(t *testing.T) {
t.Errorf("expected tracer output; got: %q", tgot)
}
}
func TestPerformRequestWithTracerOnError(t *testing.T) {
var tw bytes.Buffer
tout := log.New(&tw, "TRACER ", log.LstdFlags)

client, err := NewClient(SetTraceLog(tout), SetSniff(false))
if err != nil {
t.Fatal(err)
}

client.PerformRequest(context.TODO(), "GET", "/no-such-index", nil, nil)

tgot := tw.String()
if tgot == "" {
t.Errorf("expected tracer output; got: %q", tgot)
}
}

type customLogger struct {
out bytes.Buffer
@@ -1179,7 +1208,8 @@ func TestPerformRequestWithTimeout(t *testing.T) {
res *Response
err error
}
ctx, _ := context.WithTimeout(context.Background(), 1*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

resc := make(chan result, 1)
go func() {

+ 90
- 0
config/config.go View File

@@ -0,0 +1,90 @@
// Copyright 2012-present Oliver Eilhard. All rights reserved.
// Use of this source code is governed by a MIT-license.
// See http://olivere.mit-license.org/license.txt for details.

package config

import (
"fmt"
"net/url"
"strconv"
"strings"
)

// Config represents an Elasticsearch configuration.
type Config struct {
URL string
Index string
Username string
Password string
Shards int
Replicas int
Sniff *bool
Infolog string
Errorlog string
Tracelog string
}

// Parse returns the Elasticsearch configuration by extracting it
// from the URL, its path, and its query string.
//
// Example:
// http://127.0.0.1:9200/store-blobs?shards=1&replicas=0&sniff=false&tracelog=elastic.trace.log
//
// The code above will return a URL of http://127.0.0.1:9200, an index name
// of store-blobs, and the related settings from the query string.
func Parse(elasticURL string) (*Config, error) {
cfg := &Config{
Shards: 1,
Replicas: 0,
Sniff: nil,
}

uri, err := url.Parse(elasticURL)
if err != nil {
return nil, fmt.Errorf("error parsing elastic parameter %q: %v", elasticURL, err)
}
index := uri.Path
if strings.HasPrefix(index, "/") {
index = index[1:]
}
if strings.HasSuffix(index, "/") {
index = index[:len(index)-1]
}
if index == "" {
return nil, fmt.Errorf("missing index in elastic parameter %q", elasticURL)
}
if uri.User != nil {
cfg.Username = uri.User.Username()
cfg.Password, _ = uri.User.Password()
}
uri.User = nil

if i, err := strconv.Atoi(uri.Query().Get("shards")); err == nil {
cfg.Shards = i
}
if i, err := strconv.Atoi(uri.Query().Get("replicas")); err == nil {
cfg.Replicas = i
}
if s := uri.Query().Get("sniff"); s != "" {
if b, err := strconv.ParseBool(s); err == nil {
cfg.Sniff = &b
}
}
if s := uri.Query().Get("infolog"); s != "" {
cfg.Infolog = s
}
if s := uri.Query().Get("errorlog"); s != "" {
cfg.Errorlog = s
}
if s := uri.Query().Get("tracelog"); s != "" {
cfg.Tracelog = s
}

uri.Path = ""
uri.RawQuery = ""
cfg.URL = uri.String()
cfg.Index = index

return cfg, nil
}

+ 45
- 0
config/config_test.go View File

@@ -0,0 +1,45 @@
// Copyright 2012-present Oliver Eilhard. All rights reserved.
// Use of this source code is governed by a MIT-license.
// See http://olivere.mit-license.org/license.txt for details.

package config

import "testing"

func TestParse(t *testing.T) {
urls := "http://user:pwd@elastic:19220/store-blobs?shards=5&replicas=2&sniff=true&errorlog=elastic.error.log&infolog=elastic.info.log&tracelog=elastic.trace.log"
cfg, err := Parse(urls)
if err != nil {
t.Fatal(err)
}
if want, got := "http://elastic:19220", cfg.URL; want != got {
t.Fatalf("expected URL = %q, got %q", want, got)
}
if want, got := "store-blobs", cfg.Index; want != got {
t.Fatalf("expected Index = %q, got %q", want, got)
}
if want, got := "user", cfg.Username; want != got {
t.Fatalf("expected Username = %q, got %q", want, got)
}
if want, got := "pwd", cfg.Password; want != got {
t.Fatalf("expected Password = %q, got %q", want, got)
}
if want, got := 5, cfg.Shards; want != got {
t.Fatalf("expected Shards = %v, got %v", want, got)
}
if want, got := 2, cfg.Replicas; want != got {
t.Fatalf("expected Replicas = %v, got %v", want, got)
}
if want, got := true, *cfg.Sniff; want != got {
t.Fatalf("expected Sniff = %v, got %v", want, got)
}
if want, got := "elastic.error.log", cfg.Errorlog; want != got {
t.Fatalf("expected Errorlog = %q, got %q", want, got)
}
if want, got := "elastic.info.log", cfg.Infolog; want != got {
t.Fatalf("expected Infolog = %q, got %q", want, got)
}
if want, got := "elastic.trace.log", cfg.Tracelog; want != got {
t.Fatalf("expected Tracelog = %q, got %q", want, got)
}
}

+ 9
- 0
config/doc.go View File

@@ -0,0 +1,9 @@
// Copyright 2012-present Oliver Eilhard. All rights reserved.
// Use of this source code is governed by a MIT-license.
// See http://olivere.mit-license.org/license.txt for details.

/*
Package config allows parsing a configuration for Elasticsearch
from a URL.
*/
package config

+ 12
- 3
delete.go View File

@@ -7,6 +7,7 @@ package elastic
import (
"context"
"fmt"
"net/http"
"net/url"

"gopkg.in/olivere/elastic.v6/uritemplates"
@@ -168,7 +169,9 @@ func (s *DeleteService) Validate() error {
return nil
}

// Do executes the operation.
// Do executes the operation. If the document is not found (404), Elasticsearch will
// still return a response. This response is serialized and returned as well. In other
// words, for HTTP status code 404, both an error and a response might be returned.
func (s *DeleteService) Do(ctx context.Context) (*DeleteResponse, error) {
// Check pre-conditions
if err := s.Validate(); err != nil {
@@ -182,7 +185,7 @@ func (s *DeleteService) Do(ctx context.Context) (*DeleteResponse, error) {
}

// Get HTTP response
res, err := s.client.PerformRequest(ctx, "DELETE", path, params, nil)
res, err := s.client.PerformRequest(ctx, "DELETE", path, params, nil, http.StatusNotFound)
if err != nil {
return nil, err
}
@@ -192,6 +195,12 @@ func (s *DeleteService) Do(ctx context.Context) (*DeleteResponse, error) {
if err := s.client.decoder.Decode(res.Body, ret); err != nil {
return nil, err
}

// If we have a 404, we return both a result and an error, just like ES does
if res.StatusCode == http.StatusNotFound {
return ret, &Error{Status: http.StatusNotFound}
}

return ret, nil
}

@@ -204,7 +213,7 @@ type DeleteResponse struct {
Id string `json:"_id"`
Version int64 `json:"_version"`
Shards *shardsInfo `json:"_shards"`
Result bool `json:"string,omitempty"`
Result string `json:"result,omitempty"`
ForcedRefresh bool `json:"forced_refresh,omitempty"`
Found bool `json:"found"`
}

+ 12
- 11
delete_by_query.go View File

@@ -614,23 +614,24 @@ func (s *DeleteByQueryService) Do(ctx context.Context) (*BulkIndexByScrollRespon
// BulkIndexByScrollResponse is the outcome of executing Do with
// DeleteByQueryService and UpdateByQueryService.
type BulkIndexByScrollResponse struct {
Took int64 `json:"took"`
TimedOut bool `json:"timed_out"`
Total int64 `json:"total"`
Updated int64 `json:"updated"`
Created int64 `json:"created"`
Deleted int64 `json:"deleted"`
Batches int64 `json:"batches"`
VersionConflicts int64 `json:"version_conflicts"`
Noops int64 `json:"noops"`
Took int64 `json:"took"`
SliceId *int64 `json:"slice_id,omitempty"`
TimedOut bool `json:"timed_out"`
Total int64 `json:"total"`
Updated int64 `json:"updated,omitempty"`
Created int64 `json:"created,omitempty"`
Deleted int64 `json:"deleted"`
Batches int64 `json:"batches"`
VersionConflicts int64 `json:"version_conflicts"`
Noops int64 `json:"noops"`
Retries struct {
Bulk int64 `json:"bulk"`
Search int64 `json:"search"`
} `json:"retries"`
} `json:"retries,omitempty"`
Throttled string `json:"throttled"`
ThrottledMillis int64 `json:"throttled_millis"`
RequestsPerSecond float64 `json:"requests_per_second"`
Canceled string `json:"canceled"`
Canceled string `json:"canceled,omitempty"`
ThrottledUntil string `json:"throttled_until"`
ThrottledUntilMillis int64 `json:"throttled_until_millis"`
Failures []bulkIndexByScrollResponseFailure `json:"failures"`

+ 22
- 4
delete_test.go View File

@@ -69,13 +69,31 @@ func TestDelete(t *testing.T) {
// Delete non existent document 99
res, err = client.Delete().Index(testIndexName).Type("tweet").Id("99").Refresh("true").Do(context.TODO())
if err == nil {
t.Fatalf("expected error; got: %v", err)
t.Fatal("expected error")
}
if !IsNotFound(err) {
t.Errorf("expected NotFound error; got %v", err)
t.Fatalf("expected 404, got: %v", err)
}
if res != nil {
t.Fatalf("expected no response; got: %v", res)
if _, ok := err.(*Error); !ok {
t.Fatalf("expected error type *Error, got: %T", err)
}
if res == nil {
t.Fatal("expected response")
}
if res.Found {
t.Errorf("expected Found = false; got %v", res.Found)
}
if have, want := res.Id, "99"; have != want {
t.Errorf("expected _id = %q, got %q", have, want)
}
if have, want := res.Index, testIndexName; have != want {
t.Errorf("expected _index = %q, got %q", have, want)
}
if have, want := res.Type, "tweet"; have != want {
t.Errorf("expected _type = %q, got %q", have, want)
}
if have, want := res.Result, "not_found"; have != want {
t.Errorf("expected result = %q, got %q", have, want)
}

count, err = client.Count(testIndexName).Do(context.TODO())

+ 21
- 15
errors.go View File

@@ -93,32 +93,38 @@ func (e *Error) Error() string {
// returned HTTP status 404. The err parameter can be of type *elastic.Error,
// elastic.Error, *http.Response or int (indicating the HTTP status code).
func IsNotFound(err interface{}) bool {
switch e := err.(type) {
case *http.Response:
return e.StatusCode == http.StatusNotFound
case *Error:
return e.Status == http.StatusNotFound
case Error:
return e.Status == http.StatusNotFound
case int:
return e == http.StatusNotFound
}
return false
return IsStatusCode(err, http.StatusNotFound)
}

// IsTimeout returns true if the given error indicates that Elasticsearch
// returned HTTP status 408. The err parameter can be of type *elastic.Error,
// elastic.Error, *http.Response or int (indicating the HTTP status code).
func IsTimeout(err interface{}) bool {
return IsStatusCode(err, http.StatusRequestTimeout)
}

// IsConflict returns true if the given error indicates that the Elasticsearch
// operation resulted in a version conflict. This can occur in operations like
// `update` or `index` with `op_type=create`. The err parameter can be of
// type *elastic.Error, elastic.Error, *http.Response or int (indicating the
// HTTP status code).
func IsConflict(err interface{}) bool {
return IsStatusCode(err, http.StatusConflict)
}

// IsStatusCode returns true if the given error indicates that the Elasticsearch
// operation returned the specified HTTP status code. The err parameter can be of
// type *http.Response, *Error, Error, or int (indicating the HTTP status code).
func IsStatusCode(err interface{}, code int) bool {
switch e := err.(type) {
case *http.Response:
return e.StatusCode == http.StatusRequestTimeout
return e.StatusCode == code
case *Error:
return e.Status == http.StatusRequestTimeout
return e.Status == code
case Error:
return e.Status == http.StatusRequestTimeout
return e.Status == code
case int:
return e == http.StatusRequestTimeout
return e == code
}
return false
}

+ 93
- 0
errors_test.go View File

@@ -200,3 +200,96 @@ func TestIsTimeout(t *testing.T) {
t.Errorf("expected %v; got: %v", want, got)
}
}

func TestIsConflict(t *testing.T) {
if got, want := IsConflict(nil), false; got != want {
t.Errorf("expected %v; got: %v", want, got)
}
if got, want := IsConflict(""), false; got != want {
t.Errorf("expected %v; got: %v", want, got)
}
if got, want := IsConflict(200), false; got != want {
t.Errorf("expected %v; got: %v", want, got)
}
if got, want := IsConflict(http.StatusConflict), true; got != want {
t.Errorf("expected %v; got: %v", want, got)
}

if got, want := IsConflict(&Error{Status: 409}), true; got != want {
t.Errorf("expected %v; got: %v", want, got)
}
if got, want := IsConflict(&Error{Status: 200}), false; got != want {
t.Errorf("expected %v; got: %v", want, got)
}

if got, want := IsConflict(Error{Status: 409}), true; got != want {
t.Errorf("expected %v; got: %v", want, got)
}
if got, want := IsConflict(Error{Status: 200}), false; got != want {
t.Errorf("expected %v; got: %v", want, got)
}

if got, want := IsConflict(&http.Response{StatusCode: 409}), true; got != want {
t.Errorf("expected %v; got: %v", want, got)
}
if got, want := IsConflict(&http.Response{StatusCode: 200}), false; got != want {
t.Errorf("expected %v; got: %v", want, got)
}
}

func TestIsStatusCode(t *testing.T) {
tests := []struct {
Error interface{}
Code int
Want bool
}{
// #0
{
Error: nil,
Code: 200,
Want: false,
},
// #1
{
Error: "",
Code: 200,
Want: false,
},
// #2
{
Error: http.StatusConflict,
Code: 409,
Want: true,
},
// #3
{
Error: http.StatusConflict,
Code: http.StatusInternalServerError,
Want: false,
},
// #4
{
Error: &Error{Status: http.StatusConflict},
Code: 409,
Want: true,
},
// #5
{
Error: Error{Status: http.StatusConflict},
Code: 409,
Want: true,
},
// #6
{
Error: &http.Response{StatusCode: http.StatusConflict},
Code: 409,
Want: true,
},
}

for i, tt := range tests {
if have, want := IsStatusCode(tt.Error, tt.Code), tt.Want; have != want {
t.Errorf("#%d: have %v, want %v", i, have, want)
}
}
}

config/elasticsearch.yml → etc/elasticsearch.yml View File


config/jvm.options → etc/jvm.options View File


config/log4j2.properties → etc/log4j2.properties View File


config/scripts/.gitkeep → etc/scripts/.gitkeep View File


+ 37
- 21
fetch_source_context.go View File

@@ -9,13 +9,20 @@ import (
"strings"
)

// FetchSourceContext enables source filtering, i.e. it allows control
// over how the _source field is returned with every hit. It is used
// with various endpoints, e.g. when searching for documents, retrieving
// individual documents, or even updating documents.
//
// See https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-request-source-filtering.html
// for details.
type FetchSourceContext struct {
fetchSource bool
transformSource bool
includes []string
excludes []string
fetchSource bool
includes []string
excludes []string
}

// NewFetchSourceContext returns a new FetchSourceContext.
func NewFetchSourceContext(fetchSource bool) *FetchSourceContext {
return &FetchSourceContext{
fetchSource: fetchSource,
@@ -24,51 +31,60 @@ func NewFetchSourceContext(fetchSource bool) *FetchSourceContext {
}
}

// FetchSource indicates whether to return the _source.
func (fsc *FetchSourceContext) FetchSource() bool {
return fsc.fetchSource
}

// SetFetchSource specifies whether to return the _source.
func (fsc *FetchSourceContext) SetFetchSource(fetchSource bool) {
fsc.fetchSource = fetchSource
}

// Include indicates to return specific parts of the _source.
// Wildcards are allowed here.
func (fsc *FetchSourceContext) Include(includes ...string) *FetchSourceContext {
fsc.includes = append(fsc.includes, includes...)
return fsc
}

// Exclude indicates to exclude specific parts of the _source.
// Wildcards are allowed here.
func (fsc *FetchSourceContext) Exclude(excludes ...string) *FetchSourceContext {
fsc.excludes = append(fsc.excludes, excludes...)
return fsc
}

func (fsc *FetchSourceContext) TransformSource(transformSource bool) *FetchSourceContext {
fsc.transformSource = transformSource
return fsc
}

// Source returns the JSON-serializable data to be used in a body.
func (fsc *FetchSourceContext) Source() (interface{}, error) {
if !fsc.fetchSource {
return false, nil
}
return map[string]interface{}{
"includes": fsc.includes,
"excludes": fsc.excludes,
}, nil
if len(fsc.includes) == 0 && len(fsc.excludes) == 0 {
return true, nil
}
src := make(map[string]interface{})
if len(fsc.includes) > 0 {
src["includes"] = fsc.includes
}
if len(fsc.excludes) > 0 {
src["excludes"] = fsc.excludes
}
return src, nil
}

// Query returns the parameters in a form suitable for a URL query string.
func (fsc *FetchSourceContext) Query() url.Values {
params := url.Values{}
if !fsc.fetchSource {
if fsc.fetchSource {
if len(fsc.includes) > 0 {
params.Add("_source_include", strings.Join(fsc.includes, ","))
}
if len(fsc.excludes) > 0 {
params.Add("_source_exclude", strings.Join(fsc.excludes, ","))
}
} else {
params.Add("_source", "false")
return params
}
if len(fsc.includes) > 0 {
params.Add("_source_include", strings.Join(fsc.includes, ","))
}
if len(fsc.excludes) > 0 {
params.Add("_source_exclude", strings.Join(fsc.excludes, ","))
}
return params
}

+ 2
- 2
fetch_source_context_test.go View File

@@ -54,7 +54,7 @@ func TestFetchSourceContextFetchSource(t *testing.T) {
t.Fatalf("marshaling to JSON failed: %v", err)
}
got := string(data)
expected := `{"excludes":[],"includes":[]}`
expected := `true`
if got != expected {
t.Errorf("expected\n%s\n,got:\n%s", expected, got)
}
@@ -71,7 +71,7 @@ func TestFetchSourceContextFetchSourceWithIncludesOnly(t *testing.T) {
t.Fatalf("marshaling to JSON failed: %v", err)
}
got := string(data)
expected := `{"excludes":[],"includes":["a","b"]}`
expected := `{"includes":["a","b"]}`
if got != expected {
t.Errorf("expected\n%s\n,got:\n%s", expected, got)
}

+ 1
- 1
get_test.go View File

@@ -45,7 +45,7 @@ func TestGet(t *testing.T) {
}

func TestGetWithSourceFiltering(t *testing.T) {
client := setupTestClientAndCreateIndex(t)
client := setupTestClientAndCreateIndex(t) // , SetTraceLog(log.New(os.Stdout, "", 0)))

tweet1 := tweet{User: "olivere", Message: "Welcome to Golang and Elasticsearch."}
_, err := client.Index().Index(testIndexName).Type("tweet").Id("1").BodyJson(&tweet1).Do(context.TODO())

+ 7
- 7
indices_put_mapping.go View File

@@ -27,7 +27,7 @@ type IndicesPutMappingService struct {
ignoreUnavailable *bool
allowNoIndices *bool
expandWildcards string
ignoreConflicts *bool
updateAllTypes *bool
timeout string
bodyJson map[string]interface{}
bodyString string
@@ -94,10 +94,10 @@ func (s *IndicesPutMappingService) ExpandWildcards(expandWildcards string) *Indi
return s
}

// IgnoreConflicts specifies whether to ignore conflicts while updating
// the mapping (default: false).
func (s *IndicesPutMappingService) IgnoreConflicts(ignoreConflicts bool) *IndicesPutMappingService {
s.ignoreConflicts = &ignoreConflicts
// UpdateAllTypes, if true, indicates that all fields that span multiple indices
// should be updated (default: false).
func (s *IndicesPutMappingService) UpdateAllTypes(updateAllTypes bool) *IndicesPutMappingService {
s.updateAllTypes = &updateAllTypes
return s
}

@@ -153,8 +153,8 @@ func (s *IndicesPutMappingService) buildURL() (string, url.Values, error) {
if s.expandWildcards != "" {
params.Set("expand_wildcards", s.expandWildcards)
}
if s.ignoreConflicts != nil {
params.Set("ignore_conflicts", fmt.Sprintf("%v", *s.ignoreConflicts))
if s.updateAllTypes != nil {
params.Set("update_all_types", fmt.Sprintf("%v", *s.updateAllTypes))
}
if s.timeout != "" {
params.Set("timeout", s.timeout)

+ 124
- 1
reindex.go View File

@@ -280,6 +280,48 @@ func (s *ReindexService) Do(ctx context.Context) (*BulkIndexByScrollResponse, er
return ret, nil
}

// DoAsync executes the reindexing operation asynchronously by starting a new task.
// Callers need to use the Task Management API to watch the outcome of the reindexing
// operation.
func (s *ReindexService) DoAsync(ctx context.Context) (*StartTaskResult, error) {
// Check pre-conditions
if err := s.Validate(); err != nil {
return nil, err
}

// DoAsync only makes sense with WaitForCompletion set to true
if s.waitForCompletion != nil && *s.waitForCompletion {
return nil, fmt.Errorf("cannot start a task with WaitForCompletion set to true")
}
f := false
s.waitForCompletion = &f

// Get URL for request
path, params, err := s.buildURL()
if err != nil {
return nil, err
}

// Setup HTTP request body
body, err := s.getBody()
if err != nil {
return nil, err
}

// Get HTTP response
res, err := s.client.PerformRequest(ctx, "POST", path, params, body)
if err != nil {
return nil, err
}

// Return operation response
ret := new(StartTaskResult)
if err := s.client.decoder.Decode(res.Body, ret); err != nil {
return nil, err
}
return ret, nil
}

// -- Source of Reindex --

// ReindexSource specifies the source of a Reindex process.
@@ -295,6 +337,7 @@ type ReindexSource struct {
sorts []SortInfo
sorters []Sorter
searchSource *SearchSource
remoteInfo *ReindexRemoteInfo
}

// NewReindexSource creates a new ReindexSource.
@@ -359,12 +402,18 @@ func (s *ReindexSource) SortWithInfo(info SortInfo) *ReindexSource {
return s
}

// SortBy adds a sort order.
// SortBy adds a sort order.
func (s *ReindexSource) SortBy(sorter ...Sorter) *ReindexSource {
s.sorters = append(s.sorters, sorter...)
return s
}

// RemoteInfo sets up reindexing from a remote cluster.
func (s *ReindexSource) RemoteInfo(ri *ReindexRemoteInfo) *ReindexSource {
s.remoteInfo = ri
return s
}

// Source returns a serializable JSON request for the request.
func (r *ReindexSource) Source() (interface{}, error) {
source := make(map[string]interface{})
@@ -415,6 +464,14 @@ func (r *ReindexSource) Source() (interface{}, error) {
source["scroll"] = r.scroll
}

if r.remoteInfo != nil {
src, err := r.remoteInfo.Source()
if err != nil {
return nil, err
}
source["remote"] = src
}

if len(r.sorters) > 0 {
var sortarr []interface{}
for _, sorter := range r.sorters {
@@ -440,6 +497,72 @@ func (r *ReindexSource) Source() (interface{}, error) {
return source, nil
}

// ReindexRemoteInfo contains information for reindexing from a remote cluster.
type ReindexRemoteInfo struct {
host string
username string
password string
socketTimeout string // e.g. "1m" or "30s"
connectTimeout string // e.g. "1m" or "30s"
}

// NewReindexRemoteInfo creates a new ReindexRemoteInfo.
func NewReindexRemoteInfo() *ReindexRemoteInfo {
return &ReindexRemoteInfo{}
}

// Host sets the host information of the remote cluster.
// It must be of the form "http(s)://<hostname>:<port>"
func (ri *ReindexRemoteInfo) Host(host string) *ReindexRemoteInfo {
ri.host = host
return ri
}

// Username sets the username to authenticate with the remote cluster.
func (ri *ReindexRemoteInfo) Username(username string) *ReindexRemoteInfo {
ri.username = username
return ri
}

// Password sets the password to authenticate with the remote cluster.
func (ri *ReindexRemoteInfo) Password(password string) *ReindexRemoteInfo {
ri.password = password
return ri
}

// SocketTimeout sets the socket timeout to connect with the remote cluster.
// Use ES compatible values like e.g. "30s" or "1m".
func (ri *ReindexRemoteInfo) SocketTimeout(timeout string) *ReindexRemoteInfo {
ri.socketTimeout = timeout
return ri
}

// ConnectTimeout sets the connection timeout to connect with the remote cluster.
// Use ES compatible values like e.g. "30s" or "1m".
func (ri *ReindexRemoteInfo) ConnectTimeout(timeout string) *ReindexRemoteInfo {
ri.connectTimeout = timeout
return ri
}

// Source returns the serializable JSON data for the request.
func (ri *ReindexRemoteInfo) Source() (interface{}, error) {
res := make(map[string]interface{})
res["host"] = ri.host
if len(ri.username) > 0 {
res["username"] = ri.username
}
if len(ri.password) > 0 {
res["password"] = ri.password
}
if len(ri.socketTimeout) > 0 {
res["socket_timeout"] = ri.socketTimeout
}
if len(ri.connectTimeout) > 0 {
res["connect_timeout"] = ri.connectTimeout
}
return res, nil
}

// -source Destination of Reindex --

// ReindexDestination is the destination of a Reindex API call.

+ 110
- 0
reindex_test.go View File

@@ -82,6 +82,31 @@ func TestReindexSourceWithSourceAndDestinationAndVersionType(t *testing.T) {
}
}

func TestReindexSourceWithSourceAndRemoteAndDestination(t *testing.T) {
client := setupTestClient(t)
src := NewReindexSource().Index("twitter").RemoteInfo(
NewReindexRemoteInfo().Host("http://otherhost:9200").
Username("alice").
Password("secret").
ConnectTimeout("10s").
SocketTimeout("1m"),
)
dst := NewReindexDestination().Index("new_twitter")
out, err := client.Reindex().Source(src).Destination(dst).getBody()
if err != nil {
t.Fatal(err)
}
b, err := json.Marshal(out)
if err != nil {
t.Fatal(err)
}
got := string(b)
want := `{"dest":{"index":"new_twitter"},"source":{"index":"twitter","remote":{"connect_timeout":"10s","host":"http://otherhost:9200","password":"secret","socket_timeout":"1m","username":"alice"}}}`
if got != want {
t.Fatalf("\ngot %s\nwant %s", got, want)
}
}

func TestReindexSourceWithSourceAndDestinationAndOpType(t *testing.T) {
client := setupTestClient(t)
src := NewReindexSource().Index("twitter")
@@ -289,3 +314,88 @@ func TestReindex(t *testing.T) {
t.Fatalf("expected %d documents; got: %d", sourceCount, targetCount)
}
}

func TestReindexAsync(t *testing.T) {
client := setupTestClientAndCreateIndexAndAddDocs(t) //, SetTraceLog(log.New(os.Stdout, "", 0)))
esversion, err := client.ElasticsearchVersion(DefaultURL)
if err != nil {
t.Fatal(err)
}
if esversion < "2.3.0" {
t.Skipf("Elasticsearch %v does not support Reindex API yet", esversion)
}

sourceCount, err := client.Count(testIndexName).Do(context.TODO())
if err != nil {
t.Fatal(err)
}
if sourceCount <= 0 {
t.Fatalf("expected more than %d documents; got: %d", 0, sourceCount)
}

targetCount, err := client.Count(testIndexName2).Do(context.TODO())
if err != nil {
t.Fatal(err)
}
if targetCount != 0 {
t.Fatalf("expected %d documents; got: %d", 0, targetCount)
}

// Simple copying
src := NewReindexSource().Index(testIndexName)
dst := NewReindexDestination().Index(testIndexName2)
res, err := client.Reindex().Source(src).Destination(dst).DoAsync(context.TODO())
if err != nil {
t.Fatal(err)
}
if res == nil {
t.Fatal("expected result != nil")
}
if res.TaskId == "" {
t.Errorf("expected a task id, got %+v", res)
}

tasksGetTask := client.TasksGetTask()
taskStatus, err := tasksGetTask.TaskId(res.TaskId).Do(context.TODO())
if err != nil {
t.Fatal(err)
}
if taskStatus == nil {
t.Fatal("expected task status result != nil")
}
}

func TestReindexWithWaitForCompletionTrueCannotBeStarted(t *testing.T) {
client := setupTestClientAndCreateIndexAndAddDocs(t)
esversion, err := client.ElasticsearchVersion(DefaultURL)
if err != nil {
t.Fatal(err)
}
if esversion < "2.3.0" {
t.Skipf("Elasticsearch %v does not support Reindex API yet", esversion)
}

sourceCount, err := client.Count(testIndexName).Do(context.TODO())
if err != nil {
t.Fatal(err)
}
if sourceCount <= 0 {
t.Fatalf("expected more than %d documents; got: %d", 0, sourceCount)
}

targetCount, err := client.Count(testIndexName2).Do(context.TODO())
if err != nil {
t.Fatal(err)
}
if targetCount != 0 {
t.Fatalf("expected %d documents; got: %d", 0, targetCount)
}

// DoAsync should fail when WaitForCompletion is true
src := NewReindexSource().Index(testIndexName)
dst := NewReindexDestination().Index(testIndexName2)
_, err = client.Reindex().Source(src).Destination(dst).WaitForCompletion(true).DoAsync(context.TODO())
if err == nil {
t.Fatal("error should have been returned")
}
}

+ 3
- 3
retry_test.go View File

@@ -21,14 +21,14 @@ func TestRetry(t *testing.T) {
// This function is successfull on "successOn" calls.
f := func() error {
i++
t.Logf("function is called %d. time\n", i)
// t.Logf("function is called %d. time\n", i)

if i == successOn {
t.Log("OK")
// t.Log("OK")
return nil
}

t.Log("error")
// t.Log("error")
return errors.New("error")
}


+ 12
- 0
search_aggs_bucket_date_range.go View File

@@ -192,20 +192,32 @@ func (a *DateRangeAggregation) Source() (interface{}, error) {
switch from := ent.From.(type) {
case int, int16, int32, int64, float32, float64:
r["from"] = from
case *int, *int16, *int32, *int64, *float32, *float64:
r["from"] = from
case time.Time:
r["from"] = from.Format(time.RFC3339)
case *time.Time:
r["from"] = from.Format(time.RFC3339)
case string:
r["from"] = from
case *string:
r["from"] = from
}
}
if ent.To != nil {
switch to := ent.To.(type) {
case int, int16, int32, int64, float32, float64:
r["to"] = to
case *int, *int16, *int32, *int64, *float32, *float64:
r["to"] = to
case time.Time:
r["to"] = to.Format(time.RFC3339)
case *time.Time:
r["to"] = to.Format(time.RFC3339)
case string:
r["to"] = to
case *string:
r["to"] = to
}
}
ranges = append(ranges, r)

+ 25
- 0
search_aggs_bucket_date_range_test.go View File

@@ -29,6 +29,31 @@ func TestDateRangeAggregation(t *testing.T) {
}
}

func TestDateRangeAggregationWithPointers(t *testing.T) {
d1 := "2012-12-31"
d2 := "2013-01-01"
d3 := "2013-12-31"
d4 := "2014-01-01"

agg := NewDateRangeAggregation().Field("created_at")
agg = agg.AddRange(nil, &d1)
agg = agg.AddRange(d2, &d3)
agg = agg.AddRange(d4, nil)
src, err := agg.Source()
if err != nil {
t.Fatal(err)
}
data, err := json.Marshal(src)
if err != nil {
t.Fatalf("marshaling to JSON failed: %v", err)
}
got := string(data)
expected := `{"date_range":{"field":"created_at","ranges":[{"to":"2012-12-31"},{"from":"2013-01-01","to":"2013-12-31"},{"from":"2014-01-01"}]}}`
if got != expected {
t.Errorf("expected\n%s\n,got:\n%s", expected, got)
}
}

func TestDateRangeAggregationWithUnbounded(t *testing.T) {
agg := NewDateRangeAggregation().Field("created_at").
AddUnboundedFrom("2012-12-31").

+ 4
- 0
search_aggs_bucket_geo_distance.go View File

@@ -156,6 +156,8 @@ func (a *GeoDistanceAggregation) Source() (interface{}, error) {
r["from"] = from
case string:
r["from"] = from
case *string:
r["from"] = from
}
}
if ent.To != nil {
@@ -166,6 +168,8 @@ func (a *GeoDistanceAggregation) Source() (interface{}, error) {
r["to"] = to
case string:
r["to"] = to
case *string:
r["to"] = to
}
}
ranges = append(ranges, r)

+ 22
- 0
search_aggs_bucket_geo_distance_test.go View File

@@ -29,6 +29,28 @@ func TestGeoDistanceAggregation(t *testing.T) {
}
}

func TestGeoDistanceAggregationWithPointers(t *testing.T) {
hundred := 100
threeHundred := 300
agg := NewGeoDistanceAggregation().Field("location").Point("52.3760, 4.894")
agg = agg.AddRange(nil, &hundred)
agg = agg.AddRange(hundred, &threeHundred)
agg = agg.AddRange(threeHundred, nil)
src, err := agg.Source()
if err != nil {
t.Fatal(err)
}
data, err := json.Marshal(src)
if err != nil {
t.Fatalf("marshaling to JSON failed: %v", err)
}
got := string(data)
expected := `{"geo_distance":{"field":"location","origin":"52.3760, 4.894","ranges":[{"to":100},{"from":100,"to":300},{"from":300}]}}`
if got != expected {
t.Errorf("expected\n%s\n,got:\n%s", expected, got)
}
}

func TestGeoDistanceAggregationWithUnbounded(t *testing.T) {
agg := NewGeoDistanceAggregation().Field("location").Point("52.3760, 4.894")
agg = agg.AddUnboundedFrom(100)

+ 12
- 0
search_aggs_bucket_range.go View File

@@ -191,20 +191,32 @@ func (a *RangeAggregation) Source() (interface{}, error) {
switch from := ent.From.(type) {
case int, int16, int32, int64, float32, float64:
r["from"] = from
case *int, *int16, *int32, *int64, *float32, *float64:
r["from"] = from
case time.Time:
r["from"] = from.Format(time.RFC3339)
case *time.Time:
r["from"] = from.Format(time.RFC3339)
case string:
r["from"] = from
case *string:
r["from"] = from
}
}
if ent.To != nil {
switch to := ent.To.(type) {
case int, int16, int32, int64, float32, float64:
r["to"] = to
case *int, *int16, *int32, *int64, *float32, *float64:
r["to"] = to
case time.Time:
r["to"] = to.Format(time.RFC3339)
case *time.Time:
r["to"] = to.Format(time.RFC3339)
case string:
r["to"] = to
case *string:
r["to"] = to
}
}
ranges = append(ranges, r)

+ 22
- 0
search_aggs_bucket_range_test.go View File

@@ -29,6 +29,28 @@ func TestRangeAggregation(t *testing.T) {
}
}

func TestRangeAggregationWithPointers(t *testing.T) {
fifty := 50
hundred := 100
agg := NewRangeAggregation().Field("price")
agg = agg.AddRange(nil, &fifty)
agg = agg.AddRange(fifty, &hundred)
agg = agg.AddRange(hundred, nil)
src, err := agg.Source()
if err != nil {
t.Fatal(err)
}
data, err := json.Marshal(src)
if err != nil {
t.Fatalf("marshaling to JSON failed: %v", err)
}
got := string(data)
expected := `{"range":{"field":"price","ranges":[{"to":50},{"from":50,"to":100},{"from":100}]}}`
if got != expected {
t.Errorf("expected\n%s\n,got:\n%s", expected, got)
}
}

func TestRangeAggregationWithUnbounded(t *testing.T) {
agg := NewRangeAggregation().Field("field_name").
AddUnboundedFrom(50).

+ 1
- 1
search_aggs_metrics_top_hits_test.go View File

@@ -24,7 +24,7 @@ func TestTopHitsAggregation(t *testing.T) {
t.Fatalf("marshaling to JSON failed: %v", err)
}
got := string(data)
expected := `{"top_hits":{"_source":{"excludes":[],"includes":["title"]},"size":1,"sort":[{"last_activity_date":{"order":"desc"}}]}}`
expected := `{"top_hits":{"_source":{"includes":["title"]},"size":1,"sort":[{"last_activity_date":{"order":"desc"}}]}}`
if got != expected {
t.Errorf("expected\n%s\n,got:\n%s", expected, got)
}

+ 2
- 1
search_queries_terms.go View File

@@ -20,7 +20,8 @@ type TermsQuery struct {
// NewTermsQuery creates and initializes a new TermsQuery.
func NewTermsQuery(name string, values ...interface{}) *TermsQuery {
q := &TermsQuery{
name: name,
name: name,
values: make([]interface{}, 0),
}
if len(values) > 0 {
q.values = append(q.values, values...)

+ 18
- 0
search_queries_terms_test.go View File

@@ -26,6 +26,24 @@ func TestTermsQuery(t *testing.T) {
}
}

func TestTermsQueryWithEmptyArray(t *testing.T) {
included := make([]interface{}, 0)
q := NewTermsQuery("tags", included...)
src, err := q.Source()
if err != nil {
t.Fatal(err)
}
data, err := json.Marshal(src)
if err != nil {
t.Fatalf("marshaling to JSON failed: %v", err)
}
got := string(data)
expected := `{"terms":{"tags":[]}}`
if got != expected {
t.Errorf("expected\n%s\n,got:\n%s", expected, got)
}
}

func TestTermsQueryWithTermsLookup(t *testing.T) {
q := NewTermsQuery("user").
TermsLookup(NewTermsLookup().Index("users").Type("user").Id("2").Path("followers"))

+ 1
- 1
suggester_term.go View File

@@ -214,7 +214,7 @@ func (q *TermSuggester) Source(includeName bool) (interface{}, error) {
suggester["max_term_freq"] = *q.maxTermFreq
}
if q.prefixLength != nil {
suggester["prefix_len"] = *q.prefixLength
suggester["prefix_length"] = *q.prefixLength
}
if q.minWordLength != nil {
suggester["min_word_len"] = *q.minWordLength

+ 20
- 0
suggester_term_test.go View File

@@ -27,3 +27,23 @@ func TestTermSuggesterSource(t *testing.T) {
t.Errorf("expected\n%s\n,got:\n%s", expected, got)
}
}

func TestTermSuggesterWithPrefixLengthSource(t *testing.T) {
s := NewTermSuggester("name").
Text("n").
Field("suggest").
PrefixLength(0)
src, err := s.Source(true)
if err != nil {
t.Fatal(err)
}
data, err := json.Marshal(src)
if err != nil {
t.Fatalf("marshaling to JSON failed: %v", err)
}
got := string(data)
expected := `{"name":{"text":"n","term":{"field":"suggest","prefix_length":0}}}`
if got != expected {
t.Errorf("expected\n%s\n,got:\n%s", expected, got)
}
}

+ 104
- 0
tasks_get_task.go View File

@@ -0,0 +1,104 @@
package elastic

import (
"context"
"fmt"
"net/url"

"gopkg.in/olivere/elastic.v5/uritemplates"
)

// TasksGetTaskService retrieves the state of a task in the cluster. It is part of the Task Management API
// documented at http://www.elastic.co/guide/en/elasticsearch/reference/5.2/tasks-list.html.
//
// It is supported as of Elasticsearch 2.3.0.
type TasksGetTaskService struct {
client *Client
pretty bool
taskId string
waitForCompletion *bool
}

// NewTasksGetTaskService creates a new TasksGetTaskService.
func NewTasksGetTaskService(client *Client) *TasksGetTaskService {
return &TasksGetTaskService{
client: client,
}
}

// TaskId indicates to return the task with specified id.
func (s *TasksGetTaskService) TaskId(taskId string) *TasksGetTaskService {
s.taskId = taskId
return s
}

// WaitForCompletion indicates whether to wait for the matching tasks
// to complete (default: false).
func (s *TasksGetTaskService) WaitForCompletion(waitForCompletion bool) *TasksGetTaskService {
s.waitForCompletion = &waitForCompletion
return s
}

// Pretty indicates that the JSON response be indented and human readable.
func (s *TasksGetTaskService) Pretty(pretty bool) *TasksGetTaskService {
s.pretty = pretty
return s
}

// buildURL builds the URL for the operation.
func (s *TasksGetTaskService) buildURL() (string, url.Values, error) {
// Build URL
path, err := uritemplates.Expand("/_tasks/{task_id}", map[string]string{
"task_id": s.taskId,
})
if err != nil {
return "", url.Values{}, err
}

// Add query string parameters
params := url.Values{}
if s.pretty {
params.Set("pretty", "1")
}
if s.waitForCompletion != nil {
params.Set("wait_for_completion", fmt.Sprintf("%v", *s.waitForCompletion))
}
return path, params, nil
}

// Validate checks if the operation is valid.
func (s *TasksGetTaskService) Validate() error {
return nil
}

// Do executes the operation.
func (s *TasksGetTaskService) Do(ctx context.Context) (*TasksGetTaskResponse, error) {
// Check pre-conditions
if err := s.Validate(); err != nil {
return nil, err
}

// Get URL for request
path, params, err := s.buildURL()
if err != nil {
return nil, err
}

// Get HTTP response
res, err := s.client.PerformRequest(ctx, "GET", path, params, nil)
if err != nil {
return nil, err
}

// Return operation response
ret := new(TasksGetTaskResponse)
if err := s.client.decoder.Decode(res.Body, ret); err != nil {
return nil, err
}
return ret, nil
}

type TasksGetTaskResponse struct {
Completed bool `json:"completed"`
Task *TaskInfo `json:"task,omitempty"`
}

+ 43
- 0
tasks_get_task_test.go View File

@@ -0,0 +1,43 @@
// Copyright 2012-present Oliver Eilhard. All rights reserved.
// Use of this source code is governed by a MIT-license.
// See http://olivere.mit-license.org/license.txt for details.

package elastic

import (
"testing"
)

func TestTasksGetTaskBuildURL(t *testing.T) {
client := setupTestClient(t)

// Get specific task
got, _, err := client.TasksGetTask().TaskId("123").buildURL()
if err != nil {
t.Fatal(err)
}
want := "/_tasks/123"
if got != want {
t.Errorf("want %q; got %q", want, got)
}
}

/*
func TestTasksGetTask(t *testing.T) {
client := setupTestClientAndCreateIndexAndAddDocs(t)
esversion, err := client.ElasticsearchVersion(DefaultURL)
if err != nil {
t.Fatal(err)
}
if esversion < "2.3.0" {
t.Skipf("Elasticsearch %v does not support Tasks Management API yet", esversion)
}
res, err := client.TasksGetTask().TaskId("123").Do(context.TODO())
if err != nil {
t.Fatal(err)
}
if res == nil {
t.Fatal("response is nil")
}
}
*/

+ 9
- 2
tasks_list.go View File

@@ -204,11 +204,18 @@ type TaskInfo struct {
Id int64 `json:"id"` // the task id
Type string `json:"type"`
Action string `json:"action"`
Status interface{} `json:"status"`
Description interface{} `json:"description"`
Status interface{} `json:"status"` // has separate implementations of Task.Status in Java for reindexing, replication, and "RawTaskStatus"
Description interface{} `json:"description"` // same as Status
StartTime string `json:"start_time"`
StartTimeInMillis int64 `json:"start_time_in_millis"`
RunningTime string `json:"running_time"`
RunningTimeInNanos int64 `json:"running_time_in_nanos"`
Cancellable bool `json:"cancellable"`
ParentTaskId string `json:"parent_task_id"` // like "YxJnVYjwSBm_AUbzddTajQ:12356"
}

// StartTaskResult is used in cases where a task gets started asynchronously and
// the operation simply returnes a TaskID to watch for via the Task Management API.
type StartTaskResult struct {
TaskId string `json:"task"`
}

+ 25
- 0
update.go View File

@@ -25,6 +25,7 @@ type UpdateService struct {
parent string
script *Script
fields []string
fsc *FetchSourceContext
version *int64
versionType string
retryOnConflict *int
@@ -172,6 +173,23 @@ func (b *UpdateService) Pretty(pretty bool) *UpdateService {
return b
}

// FetchSource asks Elasticsearch to return the updated _source in the response.
func (s *UpdateService) FetchSource(fetchSource bool) *UpdateService {
if s.fsc == nil {
s.fsc = NewFetchSourceContext(fetchSource)
} else {
s.fsc.SetFetchSource(fetchSource)
}
return s
}

// FetchSourceContext indicates that _source should be returned in the response,
// allowing wildcard patterns to be defined via FetchSourceContext.
func (s *UpdateService) FetchSourceContext(fetchSourceContext *FetchSourceContext) *UpdateService {
s.fsc = fetchSourceContext
return s
}

// url returns the URL part of the document request.
func (b *UpdateService) url() (string, url.Values, error) {
// Build url
@@ -250,6 +268,13 @@ func (b *UpdateService) body() (interface{}, error) {
if b.detectNoop != nil {
source["detect_noop"] = *b.detectNoop
}
if b.fsc != nil {
src, err := b.fsc.Source()
if err != nil {
return nil, err
}
source["_source"] = src
}

return source, nil
}

+ 68
- 0
update_test.go View File

@@ -5,6 +5,7 @@
package elastic

import (
"context"
"encoding/json"
"net/url"
"testing"
@@ -231,3 +232,70 @@ func TestUpdateViaDocAndUpsert(t *testing.T) {
t.Errorf("expected\n%s\ngot:\n%s", expected, got)
}
}

func TestUpdateViaDocAndUpsertAndFetchSource(t *testing.T) {
client := setupTestClient(t)
update := client.Update().
Index("test").Type("type1").Id("1").
Doc(map[string]interface{}{"name": "new_name"}).
DocAsUpsert(true).
Timeout("1s").
Refresh("true").
FetchSource(true)
path, params, err := update.url()
if err != nil {
t.Fatalf("expected to return URL, got: %v", err)
}
expectedPath := `/test/type1/1/_update`
if expectedPath != path {
t.Errorf("expected URL path\n%s\ngot:\n%s", expectedPath, path)
}
expectedParams := url.Values{
"refresh": []string{"true"},
"timeout": []string{"1s"},
}
if expectedParams.Encode() != params.Encode() {
t.Errorf("expected URL parameters\n%s\ngot:\n%s", expectedParams.Encode(), params.Encode())
}
body, err := update.body()
if err != nil {
t.Fatalf("expected to return body, got: %v", err)
}
data, err := json.Marshal(body)
if err != nil {
t.Fatalf("expected to marshal body as JSON, got: %v", err)
}
got := string(data)
expected := `{"_source":true,"doc":{"name":"new_name"},"doc_as_upsert":true}`
if got != expected {
t.Errorf("expected\n%s\ngot:\n%s", expected, got)
}
}

func TestUpdateAndFetchSource(t *testing.T) {
client := setupTestClientAndCreateIndexAndAddDocs(t) // , SetTraceLog(log.New(os.Stdout, "", 0)))
res, err := client.Update().
Index(testIndexName).Type("tweet").Id("1").
Doc(map[string]interface{}{"user": "sandrae"}).
DetectNoop(true).
FetchSource(true).
Do(context.Background())
if err != nil {
t.Fatal(err)
}
if res == nil {
t.Fatal("expected response != nil")
}
if res.GetResult == nil {
t.Fatal("expected GetResult != nil")
}
data, err := json.Marshal(res.GetResult.Source)
if err != nil {
t.Fatalf("expected to marshal body as JSON, got: %v", err)
}
got := string(data)
expected := `{"user":"sandrae","message":"Welcome to Golang and Elasticsearch.","retweets":0,"created":"0001-01-01T00:00:00Z"}`
if got != expected {
t.Errorf("expected\n%s\ngot:\n%s", expected, got)
}
}

Loading…
Cancel
Save