Blob Blame History Raw
From 2859e9f94608b44c9c351c7ccfcff21665573627 Mon Sep 17 00:00:00 2001
From: Sky Ao <aoxiaojian@gmail.com>
Date: Thu, 15 Oct 2020 15:34:16 +0800
Subject: [PATCH 1/4] update grpc-go version to v1.32.0 which has some breaking
 api changes in naming and loadbalancer package

---
 clientv3/balancer/balancer.go                 |  39 ++-
 clientv3/balancer/picker/err.go               |   6 +-
 .../balancer/picker/roundrobin_balanced.go    |   7 +-
 clientv3/naming/grpc.go                       |  27 +-
 clientv3/naming/grpcnaming/naming.go          |  73 +++++
 proxy/grpcproxy/cluster.go                    |   2 +-
 proxy/grpcproxy/register.go                   |   2 +-
 7 files changed, 424 insertions(+), 46 deletions(-)
 create mode 100644 clientv3/naming/grpcnaming/naming.go

diff --git a/clientv3/balancer/balancer.go b/clientv3/balancer/balancer.go
index 3eecb9d1d2..d3e672c632 100644
--- a/clientv3/balancer/balancer.go
+++ b/clientv3/balancer/balancer.go
@@ -136,15 +136,34 @@ type baseBalancer struct {
 	connectivityRecorder connectivity.Recorder
 
 	picker picker.Picker
+
+	resolverErr error // the last error reported by the resolver; cleared on successful resolution
 }
 
-// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
-// gRPC sends initial or updated resolved addresses from "Build".
-func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
-	if err != nil {
-		bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
+func (bb *baseBalancer) ResolverError(err error) {
+	bb.resolverErr = err
+	if len(bb.addrToSc) == 0 {
+		bb.connectivityRecorder.RecordTransition(bb.connectivityRecorder.GetCurrentState(), grpcconnectivity.TransientFailure)
+	}
+
+	if bb.connectivityRecorder.GetCurrentState() != grpcconnectivity.TransientFailure {
+		// The picker will not change since the balancer does not currently
+		// report an error.
 		return
 	}
+	bb.updatePicker()
+	bb.currentConn.UpdateState(balancer.State{
+		ConnectivityState: bb.connectivityRecorder.GetCurrentState(),
+		Picker:            bb.picker,
+	})
+}
+
+// UpdateClientConnState implements "grpc/balancer.Balancer" interface.
+func (bb *baseBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
+	addrs := state.ResolverState.Addresses
+	// Successful resolution; clear resolver error and ensure we return nil.
+	bb.resolverErr = nil
+
 	bb.lg.Info("resolved",
 		zap.String("picker", bb.picker.String()),
 		zap.String("balancer-id", bb.id),
@@ -191,10 +210,14 @@ func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error)
 			// (DO NOT) delete(bb.scToSt, sc)
 		}
 	}
+
+	return nil
 }
 
-// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface.
-func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s grpcconnectivity.State) {
+// UpdateSubConnState implements "grpc/balancer.Balancer" interface.
+func (bb *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
+	s := state.ConnectivityState
+
 	bb.mu.Lock()
 	defer bb.mu.Unlock()
 
@@ -247,7 +270,7 @@ func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s grpcconn
 		bb.updatePicker()
 	}
 
-	bb.currentConn.UpdateBalancerState(bb.connectivityRecorder.GetCurrentState(), bb.picker)
+	bb.currentConn.UpdateState(balancer.State{ConnectivityState: bb.connectivityRecorder.GetCurrentState(), Picker: bb.picker})
 }
 
 func (bb *baseBalancer) updatePicker() {
diff --git a/clientv3/balancer/picker/err.go b/clientv3/balancer/picker/err.go
index f4b941d652..a37baa7bd2 100644
--- a/clientv3/balancer/picker/err.go
+++ b/clientv3/balancer/picker/err.go
@@ -15,8 +15,6 @@
 package picker
 
 import (
-	"context"
-
 	"google.golang.org/grpc/balancer"
 )
 
@@ -34,6 +32,6 @@ func (ep *errPicker) String() string {
 	return ep.p.String()
 }
 
-func (ep *errPicker) Pick(context.Context, balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) {
-	return nil, nil, ep.err
+func (ep *errPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
+	return balancer.PickResult{}, ep.err
 }
diff --git a/clientv3/balancer/picker/roundrobin_balanced.go b/clientv3/balancer/picker/roundrobin_balanced.go
index e3971ecc42..993c05ec13 100644
--- a/clientv3/balancer/picker/roundrobin_balanced.go
+++ b/clientv3/balancer/picker/roundrobin_balanced.go
@@ -15,7 +15,6 @@
 package picker
 
 import (
-	"context"
 	"sync"
 
 	"go.uber.org/zap"
@@ -52,12 +51,12 @@ type rrBalanced struct {
 func (rb *rrBalanced) String() string { return rb.p.String() }
 
 // Pick is called for every client request.
-func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) {
+func (rb *rrBalanced) Pick(opts balancer.PickInfo) (balancer.PickResult, error) {
 	rb.mu.RLock()
 	n := len(rb.scs)
 	rb.mu.RUnlock()
 	if n == 0 {
-		return nil, nil, balancer.ErrNoSubConnAvailable
+		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
 	}
 
 	rb.mu.Lock()
@@ -91,5 +90,5 @@ func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickInfo) (balance
 			rb.lg.Warn("balancer failed", fss...)
 		}
 	}
-	return sc, doneFunc, nil
+	return balancer.PickResult{SubConn: sc, Done: doneFunc}, nil
 }
diff --git a/clientv3/naming/grpc.go b/clientv3/naming/grpc.go
index b680274bb3..f7cd2bd8a6 100644
--- a/clientv3/naming/grpc.go
+++ b/clientv3/naming/grpc.go
@@ -20,9 +20,9 @@ import (
 	"fmt"

 	etcd "go.etcd.io/etcd/clientv3"
+	gnaming "go.etcd.io/etcd/clientv3/naming/grpcnaming"

 	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/naming"
 	"google.golang.org/grpc/status"
 )

@@ -34,15 +33,15 @@ type GRPCResolver struct {
 	Client *etcd.Client
 }
 
-func (gr *GRPCResolver) Update(ctx context.Context, target string, nm naming.Update, opts ...etcd.OpOption) (err error) {
+func (gr *GRPCResolver) Update(ctx context.Context, target string, nm gnaming.Update, opts ...etcd.OpOption) (err error) {
 	switch nm.Op {
-	case naming.Add:
+	case gnaming.Add:
 		var v []byte
 		if v, err = json.Marshal(nm); err != nil {
 			return status.Error(codes.InvalidArgument, err.Error())
 		}
 		_, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...)
-	case naming.Delete:
+	case gnaming.Delete:
 		_, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...)
 	default:
 		return status.Error(codes.InvalidArgument, "naming: bad naming op")
@@ -50,7 +49,7 @@ func (gr *GRPCResolver) Update(ctx context.Context, target string, nm naming.Upd
 	return err
 }
 
-func (gr *GRPCResolver) Resolve(target string) (naming.Watcher, error) {
+func (gr *GRPCResolver) Resolve(target string) (gnaming.Watcher, error) {
 	ctx, cancel := context.WithCancel(context.Background())
 	w := &gRPCWatcher{c: gr.Client, target: target + "/", ctx: ctx, cancel: cancel}
 	return w, nil
@@ -68,7 +67,7 @@ type gRPCWatcher struct {
 // Next gets the next set of updates from the etcd resolver.
 // Calls to Next should be serialized; concurrent calls are not safe since
 // there is no way to reconcile the update ordering.
-func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {
+func (gw *gRPCWatcher) Next() ([]*gnaming.Update, error) {
 	if gw.wch == nil {
 		// first Next() returns all addresses
 		return gw.firstNext()
@@ -87,17 +86,17 @@ func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {
 		return nil, gw.err
 	}
 
-	updates := make([]*naming.Update, 0, len(wr.Events))
+	updates := make([]*gnaming.Update, 0, len(wr.Events))
 	for _, e := range wr.Events {
-		var jupdate naming.Update
+		var jupdate gnaming.Update
 		var err error
 		switch e.Type {
 		case etcd.EventTypePut:
 			err = json.Unmarshal(e.Kv.Value, &jupdate)
-			jupdate.Op = naming.Add
+			jupdate.Op = gnaming.Add
 		case etcd.EventTypeDelete:
 			err = json.Unmarshal(e.PrevKv.Value, &jupdate)
-			jupdate.Op = naming.Delete
+			jupdate.Op = gnaming.Delete
 		default:
 			continue
 		}
@@ -108,7 +107,7 @@ func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {
 	return updates, nil
 }
 
-func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) {
+func (gw *gRPCWatcher) firstNext() ([]*gnaming.Update, error) {
 	// Use serialized request so resolution still works if the target etcd
 	// server is partitioned away from the quorum.
 	resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable())
@@ -116,9 +115,9 @@ func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) {
 		return nil, err
 	}
 
-	updates := make([]*naming.Update, 0, len(resp.Kvs))
+	updates := make([]*gnaming.Update, 0, len(resp.Kvs))
 	for _, kv := range resp.Kvs {
-		var jupdate naming.Update
+		var jupdate gnaming.Update
 		if err := json.Unmarshal(kv.Value, &jupdate); err != nil {
 			continue
 		}
diff --git a/clientv3/naming/grpcnaming/naming.go b/clientv3/naming/grpcnaming/naming.go
new file mode 100644
index 0000000000..9b1f20cc61
--- /dev/null
+++ b/clientv3/naming/grpcnaming/naming.go
@@ -0,0 +1,73 @@
+/*
+ *
+ * Copyright 2014 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package naming defines the naming API and related data structures for gRPC.
+//
+// This package is deprecated: please use package resolver instead.
+
+
+// Notice: this file is a copy of naming/naming.go from grpc-go v1.29.1.
+// The package of grpc naming is removed since grpc-go v1.30.0.
+// This is a work around to make etcd work with grpc new version (>=v1.30.0) without too many code change.
+package grpcnaming
+
+// Operation defines the corresponding operations for a name resolution change.
+//
+// Deprecated: please use package resolver.
+type Operation uint8
+
+const (
+	// Add indicates a new address is added.
+	Add Operation = iota
+	// Delete indicates an existing address is deleted.
+	Delete
+)
+
+// Update defines a name resolution update. Notice that it is not valid having both
+// empty string Addr and nil Metadata in an Update.
+//
+// Deprecated: please use package resolver.
+type Update struct {
+	// Op indicates the operation of the update.
+	Op Operation
+	// Addr is the updated address. It is empty string if there is no address update.
+	Addr string
+	// Metadata is the updated metadata. It is nil if there is no metadata update.
+	// Metadata is not required for a custom naming implementation.
+	Metadata interface{}
+}
+
+// Resolver creates a Watcher for a target to track its resolution changes.
+//
+// Deprecated: please use package resolver.
+type Resolver interface {
+	// Resolve creates a Watcher for target.
+	Resolve(target string) (Watcher, error)
+}
+
+// Watcher watches for the updates on the specified target.
+//
+// Deprecated: please use package resolver.
+type Watcher interface {
+	// Next blocks until an update or error happens. It may return one or more
+	// updates. The first call should get the full set of the results. It should
+	// return an error if and only if Watcher cannot recover.
+	Next() ([]*Update, error)
+	// Close closes the Watcher.
+	Close()
+}
\ No newline at end of file
diff --git a/proxy/grpcproxy/cluster.go b/proxy/grpcproxy/cluster.go
index 5f3ab76584..a786457a6a 100644
--- a/proxy/grpcproxy/cluster.go
+++ b/proxy/grpcproxy/cluster.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	gnaming "go.etcd.io/etcd/clientv3/naming/grpcnaming"
 	"os"
 	"sync"

@@ -27,7 +28,6 @@ import (
 	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"

 	"golang.org/x/time/rate"
-	gnaming "google.golang.org/grpc/naming"
 )

 // allow maximum 1 retry per second
diff --git a/proxy/grpcproxy/register.go b/proxy/grpcproxy/register.go
index e74cd5ca90..912e147d77 100644
--- a/proxy/grpcproxy/register.go
+++ b/proxy/grpcproxy/register.go
@@ -16,6 +16,7 @@ package grpcproxy

 import (
 	"encoding/json"
+	gnaming "go.etcd.io/etcd/clientv3/naming/grpcnaming"
 	"os"

 	"go.etcd.io/etcd/clientv3"
@@ -23,7 +24,6 @@ import (
 	"go.etcd.io/etcd/clientv3/naming"

 	"golang.org/x/time/rate"
-	gnaming "google.golang.org/grpc/naming"
 )

 // allow maximum 1 retry per second
From b1d600ee05737adf6fc2ba3a5c3d92c1037bb6a6 Mon Sep 17 00:00:00 2001
From: Sky Ao <aoxiaojian@gmail.com>
Date: Thu, 22 Oct 2020 11:20:17 +0800
Subject: [PATCH 3/4] add doc for method ResolveError()

---
 clientv3/balancer/balancer.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/clientv3/balancer/balancer.go b/clientv3/balancer/balancer.go
index d3e672c632..1488580331 100644
--- a/clientv3/balancer/balancer.go
+++ b/clientv3/balancer/balancer.go
@@ -140,6 +140,7 @@ type baseBalancer struct {
 	resolverErr error // the last error reported by the resolver; cleared on successful resolution
 }
 
+// ResolverError implements "grpc/balancer.Balancer" interface.
 func (bb *baseBalancer) ResolverError(err error) {
 	bb.resolverErr = err
 	if len(bb.addrToSc) == 0 {

From 5ba58f2283eda7247f51b4e9dd07c367c4d77235 Mon Sep 17 00:00:00 2001
From: Sky Ao <aoxiaojian@gmail.com>
Date: Thu, 22 Oct 2020 11:36:03 +0800
Subject: [PATCH 4/4] update to pass fmt checking

---
 clientv3/naming/grpcnaming/naming.go | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/clientv3/naming/grpcnaming/naming.go b/clientv3/naming/grpcnaming/naming.go
index 9b1f20cc61..a4415a6541 100644
--- a/clientv3/naming/grpcnaming/naming.go
+++ b/clientv3/naming/grpcnaming/naming.go
@@ -19,11 +19,6 @@
 // Package naming defines the naming API and related data structures for gRPC.
 //
 // This package is deprecated: please use package resolver instead.
-
-
-// Notice: this file is a copy of naming/naming.go from grpc-go v1.29.1.
-// The package of grpc naming is removed since grpc-go v1.30.0.
-// This is a work around to make etcd work with grpc new version (>=v1.30.0) without too many code change.
 package grpcnaming
 
 // Operation defines the corresponding operations for a name resolution change.
@@ -70,4 +65,4 @@ type Watcher interface {
 	Next() ([]*Update, error)
 	// Close closes the Watcher.
 	Close()
-}
\ No newline at end of file
+}