From 2859e9f94608b44c9c351c7ccfcff21665573627 Mon Sep 17 00:00:00 2001
From: Sky Ao <aoxiaojian@gmail.com>
Date: Thu, 15 Oct 2020 15:34:16 +0800
Subject: [PATCH 1/4] update grpc-go version to v1.32.0 which has some breaking
api changes in naming and loadbalancer package
---
clientv3/balancer/balancer.go | 39 ++-
clientv3/balancer/picker/err.go | 6 +-
.../balancer/picker/roundrobin_balanced.go | 7 +-
clientv3/naming/grpc.go | 27 +-
clientv3/naming/grpcnaming/naming.go | 73 +++++
proxy/grpcproxy/cluster.go | 2 +-
proxy/grpcproxy/register.go | 2 +-
7 files changed, 424 insertions(+), 46 deletions(-)
create mode 100644 clientv3/naming/grpcnaming/naming.go
diff --git a/clientv3/balancer/balancer.go b/clientv3/balancer/balancer.go
index 3eecb9d1d2..d3e672c632 100644
--- a/clientv3/balancer/balancer.go
+++ b/clientv3/balancer/balancer.go
@@ -136,15 +136,34 @@ type baseBalancer struct {
connectivityRecorder connectivity.Recorder
picker picker.Picker
+
+ resolverErr error // the last error reported by the resolver; cleared on successful resolution
}
-// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
-// gRPC sends initial or updated resolved addresses from "Build".
-func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
- if err != nil {
- bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
+func (bb *baseBalancer) ResolverError(err error) {
+ bb.resolverErr = err
+ if len(bb.addrToSc) == 0 {
+ bb.connectivityRecorder.RecordTransition(bb.connectivityRecorder.GetCurrentState(), grpcconnectivity.TransientFailure)
+ }
+
+ if bb.connectivityRecorder.GetCurrentState() != grpcconnectivity.TransientFailure {
+ // The picker will not change since the balancer does not currently
+ // report an error.
return
}
+ bb.updatePicker()
+ bb.currentConn.UpdateState(balancer.State{
+ ConnectivityState: bb.connectivityRecorder.GetCurrentState(),
+ Picker: bb.picker,
+ })
+}
+
+// UpdateClientConnState implements "grpc/balancer.Balancer" interface.
+func (bb *baseBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
+ addrs := state.ResolverState.Addresses
+ // Successful resolution; clear resolver error and ensure we return nil.
+ bb.resolverErr = nil
+
bb.lg.Info("resolved",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
@@ -191,10 +210,14 @@ func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error)
// (DO NOT) delete(bb.scToSt, sc)
}
}
+
+ return nil
}
-// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface.
-func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s grpcconnectivity.State) {
+// UpdateSubConnState implements "grpc/balancer.Balancer" interface.
+func (bb *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
+ s := state.ConnectivityState
+
bb.mu.Lock()
defer bb.mu.Unlock()
@@ -247,7 +270,7 @@ func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s grpcconn
bb.updatePicker()
}
- bb.currentConn.UpdateBalancerState(bb.connectivityRecorder.GetCurrentState(), bb.picker)
+ bb.currentConn.UpdateState(balancer.State{ConnectivityState: bb.connectivityRecorder.GetCurrentState(), Picker: bb.picker})
}
func (bb *baseBalancer) updatePicker() {
diff --git a/clientv3/balancer/picker/err.go b/clientv3/balancer/picker/err.go
index f4b941d652..a37baa7bd2 100644
--- a/clientv3/balancer/picker/err.go
+++ b/clientv3/balancer/picker/err.go
@@ -15,8 +15,6 @@
package picker
import (
- "context"
-
"google.golang.org/grpc/balancer"
)
@@ -34,6 +32,6 @@ func (ep *errPicker) String() string {
return ep.p.String()
}
-func (ep *errPicker) Pick(context.Context, balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) {
- return nil, nil, ep.err
+func (ep *errPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
+ return balancer.PickResult{}, ep.err
}
diff --git a/clientv3/balancer/picker/roundrobin_balanced.go b/clientv3/balancer/picker/roundrobin_balanced.go
index e3971ecc42..993c05ec13 100644
--- a/clientv3/balancer/picker/roundrobin_balanced.go
+++ b/clientv3/balancer/picker/roundrobin_balanced.go
@@ -15,7 +15,6 @@
package picker
import (
- "context"
"sync"
"go.uber.org/zap"
@@ -52,12 +51,12 @@ type rrBalanced struct {
func (rb *rrBalanced) String() string { return rb.p.String() }
// Pick is called for every client request.
-func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) {
+func (rb *rrBalanced) Pick(opts balancer.PickInfo) (balancer.PickResult, error) {
rb.mu.RLock()
n := len(rb.scs)
rb.mu.RUnlock()
if n == 0 {
- return nil, nil, balancer.ErrNoSubConnAvailable
+ return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}
rb.mu.Lock()
@@ -91,5 +90,5 @@ func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickInfo) (balance
rb.lg.Warn("balancer failed", fss...)
}
}
- return sc, doneFunc, nil
+ return balancer.PickResult{SubConn: sc, Done: doneFunc}, nil
}
diff --git a/clientv3/naming/grpc.go b/clientv3/naming/grpc.go
index b680274bb3..f7cd2bd8a6 100644
--- a/clientv3/naming/grpc.go
+++ b/clientv3/naming/grpc.go
@@ -20,9 +20,9 @@ import (
"fmt"
etcd "go.etcd.io/etcd/clientv3"
+ gnaming "go.etcd.io/etcd/clientv3/naming/grpcnaming"
"google.golang.org/grpc/codes"
- "google.golang.org/grpc/naming"
"google.golang.org/grpc/status"
)
@@ -34,15 +33,15 @@ type GRPCResolver struct {
Client *etcd.Client
}
-func (gr *GRPCResolver) Update(ctx context.Context, target string, nm naming.Update, opts ...etcd.OpOption) (err error) {
+func (gr *GRPCResolver) Update(ctx context.Context, target string, nm gnaming.Update, opts ...etcd.OpOption) (err error) {
switch nm.Op {
- case naming.Add:
+ case gnaming.Add:
var v []byte
if v, err = json.Marshal(nm); err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
_, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...)
- case naming.Delete:
+ case gnaming.Delete:
_, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...)
default:
return status.Error(codes.InvalidArgument, "naming: bad naming op")
@@ -50,7 +49,7 @@ func (gr *GRPCResolver) Update(ctx context.Context, target string, nm naming.Upd
return err
}
-func (gr *GRPCResolver) Resolve(target string) (naming.Watcher, error) {
+func (gr *GRPCResolver) Resolve(target string) (gnaming.Watcher, error) {
ctx, cancel := context.WithCancel(context.Background())
w := &gRPCWatcher{c: gr.Client, target: target + "/", ctx: ctx, cancel: cancel}
return w, nil
@@ -68,7 +67,7 @@ type gRPCWatcher struct {
// Next gets the next set of updates from the etcd resolver.
// Calls to Next should be serialized; concurrent calls are not safe since
// there is no way to reconcile the update ordering.
-func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {
+func (gw *gRPCWatcher) Next() ([]*gnaming.Update, error) {
if gw.wch == nil {
// first Next() returns all addresses
return gw.firstNext()
@@ -87,17 +86,17 @@ func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {
return nil, gw.err
}
- updates := make([]*naming.Update, 0, len(wr.Events))
+ updates := make([]*gnaming.Update, 0, len(wr.Events))
for _, e := range wr.Events {
- var jupdate naming.Update
+ var jupdate gnaming.Update
var err error
switch e.Type {
case etcd.EventTypePut:
err = json.Unmarshal(e.Kv.Value, &jupdate)
- jupdate.Op = naming.Add
+ jupdate.Op = gnaming.Add
case etcd.EventTypeDelete:
err = json.Unmarshal(e.PrevKv.Value, &jupdate)
- jupdate.Op = naming.Delete
+ jupdate.Op = gnaming.Delete
default:
continue
}
@@ -108,7 +107,7 @@ func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {
return updates, nil
}
-func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) {
+func (gw *gRPCWatcher) firstNext() ([]*gnaming.Update, error) {
// Use serialized request so resolution still works if the target etcd
// server is partitioned away from the quorum.
resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable())
@@ -116,9 +115,9 @@ func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) {
return nil, err
}
- updates := make([]*naming.Update, 0, len(resp.Kvs))
+ updates := make([]*gnaming.Update, 0, len(resp.Kvs))
for _, kv := range resp.Kvs {
- var jupdate naming.Update
+ var jupdate gnaming.Update
if err := json.Unmarshal(kv.Value, &jupdate); err != nil {
continue
}
diff --git a/clientv3/naming/grpcnaming/naming.go b/clientv3/naming/grpcnaming/naming.go
new file mode 100644
index 0000000000..9b1f20cc61
--- /dev/null
+++ b/clientv3/naming/grpcnaming/naming.go
@@ -0,0 +1,73 @@
+/*
+ *
+ * Copyright 2014 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package naming defines the naming API and related data structures for gRPC.
+//
+// This package is deprecated: please use package resolver instead.
+
+
+// Notice: this file is a copy of naming/naming.go from grpc-go v1.29.1.
+// The package of grpc naming is removed since grpc-go v1.30.0.
+// This is a work around to make etcd work with grpc new version (>=v1.30.0) without too many code change.
+package grpcnaming
+
+// Operation defines the corresponding operations for a name resolution change.
+//
+// Deprecated: please use package resolver.
+type Operation uint8
+
+const (
+ // Add indicates a new address is added.
+ Add Operation = iota
+ // Delete indicates an existing address is deleted.
+ Delete
+)
+
+// Update defines a name resolution update. Notice that it is not valid having both
+// empty string Addr and nil Metadata in an Update.
+//
+// Deprecated: please use package resolver.
+type Update struct {
+ // Op indicates the operation of the update.
+ Op Operation
+ // Addr is the updated address. It is empty string if there is no address update.
+ Addr string
+ // Metadata is the updated metadata. It is nil if there is no metadata update.
+ // Metadata is not required for a custom naming implementation.
+ Metadata interface{}
+}
+
+// Resolver creates a Watcher for a target to track its resolution changes.
+//
+// Deprecated: please use package resolver.
+type Resolver interface {
+ // Resolve creates a Watcher for target.
+ Resolve(target string) (Watcher, error)
+}
+
+// Watcher watches for the updates on the specified target.
+//
+// Deprecated: please use package resolver.
+type Watcher interface {
+ // Next blocks until an update or error happens. It may return one or more
+ // updates. The first call should get the full set of the results. It should
+ // return an error if and only if Watcher cannot recover.
+ Next() ([]*Update, error)
+ // Close closes the Watcher.
+ Close()
+}
\ No newline at end of file
diff --git a/proxy/grpcproxy/cluster.go b/proxy/grpcproxy/cluster.go
index 5f3ab76584..a786457a6a 100644
--- a/proxy/grpcproxy/cluster.go
+++ b/proxy/grpcproxy/cluster.go
@@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
+ gnaming "go.etcd.io/etcd/clientv3/naming/grpcnaming"
"os"
"sync"
@@ -27,7 +28,6 @@ import (
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"golang.org/x/time/rate"
- gnaming "google.golang.org/grpc/naming"
)
// allow maximum 1 retry per second
diff --git a/proxy/grpcproxy/register.go b/proxy/grpcproxy/register.go
index e74cd5ca90..912e147d77 100644
--- a/proxy/grpcproxy/register.go
+++ b/proxy/grpcproxy/register.go
@@ -16,6 +16,7 @@ package grpcproxy
import (
"encoding/json"
+ gnaming "go.etcd.io/etcd/clientv3/naming/grpcnaming"
"os"
"go.etcd.io/etcd/clientv3"
@@ -23,7 +24,6 @@ import (
"go.etcd.io/etcd/clientv3/naming"
"golang.org/x/time/rate"
- gnaming "google.golang.org/grpc/naming"
)
// allow maximum 1 retry per second
From b1d600ee05737adf6fc2ba3a5c3d92c1037bb6a6 Mon Sep 17 00:00:00 2001
From: Sky Ao <aoxiaojian@gmail.com>
Date: Thu, 22 Oct 2020 11:20:17 +0800
Subject: [PATCH 3/4] add doc for method ResolveError()
---
clientv3/balancer/balancer.go | 1 +
1 file changed, 1 insertion(+)
diff --git a/clientv3/balancer/balancer.go b/clientv3/balancer/balancer.go
index d3e672c632..1488580331 100644
--- a/clientv3/balancer/balancer.go
+++ b/clientv3/balancer/balancer.go
@@ -140,6 +140,7 @@ type baseBalancer struct {
resolverErr error // the last error reported by the resolver; cleared on successful resolution
}
+// ResolverError implements "grpc/balancer.Balancer" interface.
func (bb *baseBalancer) ResolverError(err error) {
bb.resolverErr = err
if len(bb.addrToSc) == 0 {
From 5ba58f2283eda7247f51b4e9dd07c367c4d77235 Mon Sep 17 00:00:00 2001
From: Sky Ao <aoxiaojian@gmail.com>
Date: Thu, 22 Oct 2020 11:36:03 +0800
Subject: [PATCH 4/4] update to pass fmt checking
---
clientv3/naming/grpcnaming/naming.go | 7 +------
1 file changed, 1 insertion(+), 6 deletions(-)
diff --git a/clientv3/naming/grpcnaming/naming.go b/clientv3/naming/grpcnaming/naming.go
index 9b1f20cc61..a4415a6541 100644
--- a/clientv3/naming/grpcnaming/naming.go
+++ b/clientv3/naming/grpcnaming/naming.go
@@ -19,11 +19,6 @@
// Package naming defines the naming API and related data structures for gRPC.
//
// This package is deprecated: please use package resolver instead.
-
-
-// Notice: this file is a copy of naming/naming.go from grpc-go v1.29.1.
-// The package of grpc naming is removed since grpc-go v1.30.0.
-// This is a work around to make etcd work with grpc new version (>=v1.30.0) without too many code change.
package grpcnaming
// Operation defines the corresponding operations for a name resolution change.
@@ -70,4 +65,4 @@ type Watcher interface {
Next() ([]*Update, error)
// Close closes the Watcher.
Close()
-}
\ No newline at end of file
+}