• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

grpc.go

原作者: [db:作者] 来自: [db:来源] 收藏 邀请
package naming

import (
    "encoding/json"

    etcd "github.com/coreos/etcd/clientv3"
    "golang.org/x/net/context"

    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/naming"
)

// GRPCResolver creates a grpc.Watcher for a target to track its resolution changes.
//GRPCResolver  创建一个 grpc.Watcher对于目标对象,追踪目标对象的改变
type GRPCResolver struct {
    // Client is an initialized etcd client.
    Client *etcd.Client
}
//监听对象更新
func (gr *GRPCResolver) Update(ctx context.Context, target string, nm naming.Update, opts ...etcd.OpOption) (err error) {
    switch nm.Op {
    case naming.Add:
        var v []byte
        if v, err = json.Marshal(nm); err != nil {
            return grpc.Errorf(codes.InvalidArgument, err.Error())
        }
        _, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...)
    case naming.Delete:
        _, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...)
    default:
        return grpc.Errorf(codes.InvalidArgument, "naming: bad naming op")
    }
    return err
}

func (gr *GRPCResolver) Resolve(target string) (naming.Watcher, error) {
    ctx, cancel := context.WithCancel(context.Background())
    w := &gRPCWatcher{c: gr.Client, target: target + "/", ctx: ctx, cancel: cancel}
    return w, nil
}

type gRPCWatcher struct {
    c      *etcd.Client
    target string
    ctx    context.Context
    cancel context.CancelFunc
    wch    etcd.WatchChan
    err    error
}

// Next gets the next set of updates from the etcd resolver.
// Calls to Next should be serialized; concurrent calls are not safe since
// there is no way to reconcile the update ordering.
func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {
    if gw.wch == nil {
        // first Next() returns all addresses
        return gw.firstNext()
    }
    if gw.err != nil {
        return nil, gw.err
    }

    // process new events on target/*
    wr, ok := <-gw.wch
    if !ok {
        gw.err = grpc.Errorf(codes.Unavailable, "naming: watch closed")
        return nil, gw.err
    }
    if gw.err = wr.Err(); gw.err != nil {
        return nil, gw.err
    }

    updates := make([]*naming.Update, 0, len(wr.Events))
    for _, e := range wr.Events {
        var jupdate naming.Update
        var err error
        switch e.Type {
        case etcd.EventTypePut:
            err = json.Unmarshal(e.Kv.Value, &jupdate)
            jupdate.Op = naming.Add
        case etcd.EventTypeDelete:
            err = json.Unmarshal(e.PrevKv.Value, &jupdate)
            jupdate.Op = naming.Delete
        }
        if err == nil {
            updates = append(updates, &jupdate)
        }
    }
    return updates, nil
}

func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) {
    // Use serialized request so resolution still works if the target etcd
    // server is partitioned away from the quorum.
    resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable())
    if gw.err = err; err != nil {
        return nil, err
    }

    updates := make([]*naming.Update, 0, len(resp.Kvs))
    for _, kv := range resp.Kvs {
        var jupdate naming.Update
        if err := json.Unmarshal(kv.Value, &jupdate); err != nil {
            continue
        }
        updates = append(updates, &jupdate)
    }

    opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}
    gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...)
    return updates, nil
}

func (gw *gRPCWatcher) Close() { gw.cancel() }


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
使用go-mysql-server开发自己的mysqlserver发布时间:2022-07-10
下一篇:
golangrestful框架之go-swagger发布时间:2022-07-10
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap