From 2859e9f94608b44c9c351c7ccfcff21665573627 Mon Sep 17 00:00:00 2001 From: Sky Ao Date: Thu, 15 Oct 2020 15:34:16 +0800 Subject: [PATCH 1/4] update grpc-go version to v1.32.0 which has some breaking api changes in naming and loadbalancer package --- clientv3/balancer/balancer.go | 39 ++- clientv3/balancer/picker/err.go | 6 +- .../balancer/picker/roundrobin_balanced.go | 7 +- clientv3/naming/grpc.go | 27 +- clientv3/naming/grpcnaming/naming.go | 73 +++++ proxy/grpcproxy/cluster.go | 2 +- proxy/grpcproxy/register.go | 2 +- 7 files changed, 424 insertions(+), 46 deletions(-) create mode 100644 clientv3/naming/grpcnaming/naming.go diff --git a/clientv3/balancer/balancer.go b/clientv3/balancer/balancer.go index 3eecb9d1d2..d3e672c632 100644 --- a/clientv3/balancer/balancer.go +++ b/clientv3/balancer/balancer.go @@ -136,15 +136,34 @@ type baseBalancer struct { connectivityRecorder connectivity.Recorder picker picker.Picker + + resolverErr error // the last error reported by the resolver; cleared on successful resolution } -// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface. -// gRPC sends initial or updated resolved addresses from "Build". -func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { - if err != nil { - bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err)) +func (bb *baseBalancer) ResolverError(err error) { + bb.resolverErr = err + if len(bb.addrToSc) == 0 { + bb.connectivityRecorder.RecordTransition(bb.connectivityRecorder.GetCurrentState(), grpcconnectivity.TransientFailure) + } + + if bb.connectivityRecorder.GetCurrentState() != grpcconnectivity.TransientFailure { + // The picker will not change since the balancer does not currently + // report an error. return } + bb.updatePicker() + bb.currentConn.UpdateState(balancer.State{ + ConnectivityState: bb.connectivityRecorder.GetCurrentState(), + Picker: bb.picker, + }) +} + +// UpdateClientConnState implements "grpc/balancer.Balancer" interface. +func (bb *baseBalancer) UpdateClientConnState(state balancer.ClientConnState) error { + addrs := state.ResolverState.Addresses + // Successful resolution; clear resolver error and ensure we return nil. + bb.resolverErr = nil + bb.lg.Info("resolved", zap.String("picker", bb.picker.String()), zap.String("balancer-id", bb.id), @@ -191,10 +210,14 @@ func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) // (DO NOT) delete(bb.scToSt, sc) } } + + return nil } -// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface. -func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s grpcconnectivity.State) { +// UpdateSubConnState implements "grpc/balancer.Balancer" interface. +func (bb *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { + s := state.ConnectivityState + bb.mu.Lock() defer bb.mu.Unlock() @@ -247,7 +270,7 @@ func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s grpcconn bb.updatePicker() } - bb.currentConn.UpdateBalancerState(bb.connectivityRecorder.GetCurrentState(), bb.picker) + bb.currentConn.UpdateState(balancer.State{ConnectivityState: bb.connectivityRecorder.GetCurrentState(), Picker: bb.picker}) } func (bb *baseBalancer) updatePicker() { diff --git a/clientv3/balancer/picker/err.go b/clientv3/balancer/picker/err.go index f4b941d652..a37baa7bd2 100644 --- a/clientv3/balancer/picker/err.go +++ b/clientv3/balancer/picker/err.go @@ -15,8 +15,6 @@ package picker import ( - "context" - "google.golang.org/grpc/balancer" ) @@ -34,6 +32,6 @@ func (ep *errPicker) String() string { return ep.p.String() } -func (ep *errPicker) Pick(context.Context, balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) { - return nil, nil, ep.err +func (ep *errPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) { + return balancer.PickResult{}, ep.err } diff --git a/clientv3/balancer/picker/roundrobin_balanced.go b/clientv3/balancer/picker/roundrobin_balanced.go index e3971ecc42..993c05ec13 100644 --- a/clientv3/balancer/picker/roundrobin_balanced.go +++ b/clientv3/balancer/picker/roundrobin_balanced.go @@ -15,7 +15,6 @@ package picker import ( - "context" "sync" "go.uber.org/zap" @@ -52,12 +51,12 @@ type rrBalanced struct { func (rb *rrBalanced) String() string { return rb.p.String() } // Pick is called for every client request. -func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) { +func (rb *rrBalanced) Pick(opts balancer.PickInfo) (balancer.PickResult, error) { rb.mu.RLock() n := len(rb.scs) rb.mu.RUnlock() if n == 0 { - return nil, nil, balancer.ErrNoSubConnAvailable + return balancer.PickResult{}, balancer.ErrNoSubConnAvailable } rb.mu.Lock() @@ -91,5 +90,5 @@ func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickInfo) (balance rb.lg.Warn("balancer failed", fss...) } } - return sc, doneFunc, nil + return balancer.PickResult{SubConn: sc, Done: doneFunc}, nil } diff --git a/clientv3/naming/grpc.go b/clientv3/naming/grpc.go index b680274bb3..f7cd2bd8a6 100644 --- a/clientv3/naming/grpc.go +++ b/clientv3/naming/grpc.go @@ -20,9 +20,9 @@ import ( "fmt" etcd "go.etcd.io/etcd/clientv3" + gnaming "go.etcd.io/etcd/clientv3/naming/grpcnaming" "google.golang.org/grpc/codes" - "google.golang.org/grpc/naming" "google.golang.org/grpc/status" ) @@ -34,15 +33,15 @@ type GRPCResolver struct { Client *etcd.Client } -func (gr *GRPCResolver) Update(ctx context.Context, target string, nm naming.Update, opts ...etcd.OpOption) (err error) { +func (gr *GRPCResolver) Update(ctx context.Context, target string, nm gnaming.Update, opts ...etcd.OpOption) (err error) { switch nm.Op { - case naming.Add: + case gnaming.Add: var v []byte if v, err = json.Marshal(nm); err != nil { return status.Error(codes.InvalidArgument, err.Error()) } _, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...) - case naming.Delete: + case gnaming.Delete: _, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...) default: return status.Error(codes.InvalidArgument, "naming: bad naming op") @@ -50,7 +49,7 @@ func (gr *GRPCResolver) Update(ctx context.Context, target string, nm naming.Upd return err } -func (gr *GRPCResolver) Resolve(target string) (naming.Watcher, error) { +func (gr *GRPCResolver) Resolve(target string) (gnaming.Watcher, error) { ctx, cancel := context.WithCancel(context.Background()) w := &gRPCWatcher{c: gr.Client, target: target + "/", ctx: ctx, cancel: cancel} return w, nil @@ -68,7 +67,7 @@ type gRPCWatcher struct { // Next gets the next set of updates from the etcd resolver. // Calls to Next should be serialized; concurrent calls are not safe since // there is no way to reconcile the update ordering. -func (gw *gRPCWatcher) Next() ([]*naming.Update, error) { +func (gw *gRPCWatcher) Next() ([]*gnaming.Update, error) { if gw.wch == nil { // first Next() returns all addresses return gw.firstNext() @@ -87,17 +86,17 @@ func (gw *gRPCWatcher) Next() ([]*naming.Update, error) { return nil, gw.err } - updates := make([]*naming.Update, 0, len(wr.Events)) + updates := make([]*gnaming.Update, 0, len(wr.Events)) for _, e := range wr.Events { - var jupdate naming.Update + var jupdate gnaming.Update var err error switch e.Type { case etcd.EventTypePut: err = json.Unmarshal(e.Kv.Value, &jupdate) - jupdate.Op = naming.Add + jupdate.Op = gnaming.Add case etcd.EventTypeDelete: err = json.Unmarshal(e.PrevKv.Value, &jupdate) - jupdate.Op = naming.Delete + jupdate.Op = gnaming.Delete default: continue } @@ -108,7 +107,7 @@ func (gw *gRPCWatcher) Next() ([]*naming.Update, error) { return updates, nil } -func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) { +func (gw *gRPCWatcher) firstNext() ([]*gnaming.Update, error) { // Use serialized request so resolution still works if the target etcd // server is partitioned away from the quorum. resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable()) @@ -116,9 +115,9 @@ func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) { return nil, err } - updates := make([]*naming.Update, 0, len(resp.Kvs)) + updates := make([]*gnaming.Update, 0, len(resp.Kvs)) for _, kv := range resp.Kvs { - var jupdate naming.Update + var jupdate gnaming.Update if err := json.Unmarshal(kv.Value, &jupdate); err != nil { continue } diff --git a/clientv3/naming/grpcnaming/naming.go b/clientv3/naming/grpcnaming/naming.go new file mode 100644 index 0000000000..9b1f20cc61 --- /dev/null +++ b/clientv3/naming/grpcnaming/naming.go @@ -0,0 +1,73 @@ +/* + * + * Copyright 2014 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package naming defines the naming API and related data structures for gRPC. +// +// This package is deprecated: please use package resolver instead. + + +// Notice: this file is a copy of naming/naming.go from grpc-go v1.29.1. +// The package of grpc naming is removed since grpc-go v1.30.0. +// This is a work around to make etcd work with grpc new version (>=v1.30.0) without too many code change. +package grpcnaming + +// Operation defines the corresponding operations for a name resolution change. +// +// Deprecated: please use package resolver. +type Operation uint8 + +const ( + // Add indicates a new address is added. + Add Operation = iota + // Delete indicates an existing address is deleted. + Delete +) + +// Update defines a name resolution update. Notice that it is not valid having both +// empty string Addr and nil Metadata in an Update. +// +// Deprecated: please use package resolver. +type Update struct { + // Op indicates the operation of the update. + Op Operation + // Addr is the updated address. It is empty string if there is no address update. + Addr string + // Metadata is the updated metadata. It is nil if there is no metadata update. + // Metadata is not required for a custom naming implementation. + Metadata interface{} +} + +// Resolver creates a Watcher for a target to track its resolution changes. +// +// Deprecated: please use package resolver. +type Resolver interface { + // Resolve creates a Watcher for target. + Resolve(target string) (Watcher, error) +} + +// Watcher watches for the updates on the specified target. +// +// Deprecated: please use package resolver. +type Watcher interface { + // Next blocks until an update or error happens. It may return one or more + // updates. The first call should get the full set of the results. It should + // return an error if and only if Watcher cannot recover. + Next() ([]*Update, error) + // Close closes the Watcher. + Close() +} \ No newline at end of file diff --git a/proxy/grpcproxy/cluster.go b/proxy/grpcproxy/cluster.go index 5f3ab76584..a786457a6a 100644 --- a/proxy/grpcproxy/cluster.go +++ b/proxy/grpcproxy/cluster.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + gnaming "go.etcd.io/etcd/clientv3/naming/grpcnaming" "os" "sync" @@ -27,7 +28,6 @@ import ( pb "go.etcd.io/etcd/etcdserver/etcdserverpb" "golang.org/x/time/rate" - gnaming "google.golang.org/grpc/naming" ) // allow maximum 1 retry per second diff --git a/proxy/grpcproxy/register.go b/proxy/grpcproxy/register.go index e74cd5ca90..912e147d77 100644 --- a/proxy/grpcproxy/register.go +++ b/proxy/grpcproxy/register.go @@ -16,6 +16,7 @@ package grpcproxy import ( "encoding/json" + gnaming "go.etcd.io/etcd/clientv3/naming/grpcnaming" "os" "go.etcd.io/etcd/clientv3" @@ -23,7 +24,6 @@ import ( "go.etcd.io/etcd/clientv3/naming" "golang.org/x/time/rate" - gnaming "google.golang.org/grpc/naming" ) // allow maximum 1 retry per second From b1d600ee05737adf6fc2ba3a5c3d92c1037bb6a6 Mon Sep 17 00:00:00 2001 From: Sky Ao Date: Thu, 22 Oct 2020 11:20:17 +0800 Subject: [PATCH 3/4] add doc for method ResolveError() --- clientv3/balancer/balancer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/clientv3/balancer/balancer.go b/clientv3/balancer/balancer.go index d3e672c632..1488580331 100644 --- a/clientv3/balancer/balancer.go +++ b/clientv3/balancer/balancer.go @@ -140,6 +140,7 @@ type baseBalancer struct { resolverErr error // the last error reported by the resolver; cleared on successful resolution } +// ResolverError implements "grpc/balancer.Balancer" interface. func (bb *baseBalancer) ResolverError(err error) { bb.resolverErr = err if len(bb.addrToSc) == 0 { From 5ba58f2283eda7247f51b4e9dd07c367c4d77235 Mon Sep 17 00:00:00 2001 From: Sky Ao Date: Thu, 22 Oct 2020 11:36:03 +0800 Subject: [PATCH 4/4] update to pass fmt checking --- clientv3/naming/grpcnaming/naming.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/clientv3/naming/grpcnaming/naming.go b/clientv3/naming/grpcnaming/naming.go index 9b1f20cc61..a4415a6541 100644 --- a/clientv3/naming/grpcnaming/naming.go +++ b/clientv3/naming/grpcnaming/naming.go @@ -19,11 +19,6 @@ // Package naming defines the naming API and related data structures for gRPC. // // This package is deprecated: please use package resolver instead. - - -// Notice: this file is a copy of naming/naming.go from grpc-go v1.29.1. -// The package of grpc naming is removed since grpc-go v1.30.0. -// This is a work around to make etcd work with grpc new version (>=v1.30.0) without too many code change. package grpcnaming // Operation defines the corresponding operations for a name resolution change. @@ -70,4 +65,4 @@ type Watcher interface { Next() ([]*Update, error) // Close closes the Watcher. Close() -} \ No newline at end of file +}