-
Notifications
You must be signed in to change notification settings - Fork 5.9k
Trainer library discover master by etcd #2551
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
helinwang
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great PR! Could you move the code to https://github.com/PaddlePaddle/Paddle/blob/develop/go/master/c/client.go#L58 ? We already have function paddle_new_master_client which takes master address rather than etcd address, we can just add a new function paddle_new_etcd_master_client. Sorry that I have not mentioned to you earlier!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Related to #2559?
I think this PR will implement in the master client with paddle_new_etcd_master_client
--Yancey1989
|
Updated with @helinwang 's comment. |
go/master/client.go
Outdated
|
|
||
| mAddr := kvs[0].Value | ||
| log.Debugf("Fetched master address: %s\n", mAddr) | ||
| return NewClient(MasterAddresser(mAddr), bufSize) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you take a look at this piece of code: https://github.com/PaddlePaddle/Paddle/blob/develop/go/master/client.go#L80
The idea is the client takes a Addresser and query it to know if it have changed.
This current implementation is create a "constant" Addresser that returns the same address every time. The thing is master server could be killed and restarts with a different address.
So I think a better approach could be this etcd query function be the implementation of the Addresser interface. So the master server will know the updated address when calling Address.
Another detail is when implementing the Addresser, it don't need to create a client in every call to Address, it should reuse the same client.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, move GET operator to Address method.
go/master/client.go
Outdated
| ch chan []byte | ||
| } | ||
|
|
||
| // MasterAddresser provide master address |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment function name must be the same to function name masterAddresser
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
Delete MasterAddresser.
go/master/client.go
Outdated
| time.Sleep(m.timeout) | ||
| continue | ||
| } | ||
| // reconnect to etcd server |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seem that etcd have a some kind of retry method for grpc but don't know how to use yet, may be help here https://github.com/coreos/etcd/blob/master/clientv3/retry.go#L28
go/master/client.go
Outdated
| func (m masterAddresser) Address() string { | ||
| for { | ||
| ctx, cancel := context.WithTimeout(context.Background(), m.timeout) | ||
| resp, err := m.client.Get(ctx, masterAddrPath) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clients should use watch to retrieve notification about master node events. masterMonitor should also change to handle these events.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clients should use watch to retrieve notification about master node events
I think it's a good idea, it will reduce the count forGEToperator to etcd.
masterMonitor should also change to handle these events
I do not think masterMonitor will handle the events, we can handle these in a new method named masterAddresser.init(), and masterAddresser.Addresser() only return the latest master address.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I originally thought client can poll etcd to get the latest master address. But I think @typhoonzero 's idea is better.
Because Addresser is an polling interface, to get notification about the change rather than polling to get the change. We need to change the interface. One possible solution could be:
Change:
func NewClient(addr Addresser, bufSize int)To:
func NewClient(addrCh <-chan string, bufSize int)And maybe change moniterMaster to:
func (c *Client) monitorMaster(addrCh <-chan string) {
lastMaster := ""
for curMaster := range addrCh {
// connect to the new address once address changed.
if curMaster != lastMaster {
if curMaster == "" {
err := c.conn.Close()
if err != nil {
log.Errorln(err)
}
} else {
err := c.conn.Connect(curMaster)
if err != nil {
log.Errorln(err)
// connect to addr failed, set
// to last known addr in order
// to retry next time.
curMaster = lastMaster
}
}
}
lastMaster = curMaster
}
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
I think we also need an interface with etcd operator, so that we can do some unit test easier, I'll add this feature in the next PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Yancey1989 Thanks!
We can use interface to make unit test easier. For example, the master server is taking an interface Store as the parameter: https://github.com/PaddlePaddle/Paddle/blob/develop/go/master/service.go#L94.
The definition of store is:
type Store interface {
Save([]byte) error
Load() ([]byte, error)
}We have an implementation of Store using etcd. But to test the master server, it's easy, we don't to run a etcd cluster. We can just write a stub interface for Store.
However this only tests master service. Does not test the etcd Store implementation. But that's a good start.
go/.gitignore
Outdated
| @@ -0,0 +1,4 @@ | |||
| CMakeCache.txt | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we still need this files? Since the build result is now moved to build dir
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, done.
go/master/client.go
Outdated
| ch chan []byte | ||
| } | ||
|
|
||
| // EtcdClient is the client of |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
complete this comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
Move to etcd_client.go
go/master/c/client.go
Outdated
| //export paddle_new_etcd_master_client | ||
| func paddle_new_etcd_master_client(etcdEndpoints *C.char, bufSize int) C.paddle_master_client { | ||
| p := C.GoString(etcdEndpoints) | ||
| c := master.NewEtcdClient(addresser(p), bufSize) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe just c := master.NewEtcdClient(p), bufSize) (remove addresser)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
go/master/client.go
Outdated
| if curMaster != lastMaster { | ||
| if curMaster == "" { | ||
| err := c.conn.Close() | ||
| fmt.Printf("close conn error: %s", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use log.Errorf instead of fmt.Printf.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
go/master/client.go
Outdated
| }) | ||
| if err != nil { | ||
| log.Errorf("Init etcd connection failed: %v", err) | ||
| panic(err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please return error here instead of panic, and don't need to log error. Caller will get the error and log it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
go/master/client.go
Outdated
| } | ||
| etcdClient.ch = make(chan string) | ||
| c := NewClient(etcdClient.ch, bufSize) | ||
| //go etcdClient.monitorMasterAddr() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No commented out code please.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
go/master/client.go
Outdated
| } | ||
|
|
||
| // EtcdClient is the client of | ||
| type EtcdClient struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps put EtcdClient into etcd_client.go.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
go/master/client.go
Outdated
| e.ch <- mAddr | ||
| break | ||
| } | ||
| fmt.Println("init master addr finished.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use log rather than fmt.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
go/master/client.go
Outdated
| fmt.Println("init master addr finished.") | ||
| } | ||
| func (e *EtcdClient) monitorMasterAddr() { | ||
| rch := e.client.Watch(context.Background(), masterAddrPath) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
第一次调用watch的时候能返回现有的值吗,如果可以的话,就不需要initMasterAddr这个函数了。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
看起来是不行的,还是得先Get再watch。
initMasterAddr 改为了 Get这样更通用一些。
| func paddle_new_etcd_master_client(etcdEndpoints *C.char, bufSize int) C.paddle_master_client { | ||
| p := C.GoString(etcdEndpoints) | ||
| c := master.NewEtcdClient(addresser(p), bufSize) | ||
| return add(c) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
感觉把NewEtcdClient里面的c := NewClient(etcdClient.ch, bufSize)放在这里比较好。这样master.EtcdClient就和master.Client不耦合了。EtcdClient负责和etcd交互,Client只读chan string里的新地址。这样unit test也比较好写。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
直接用了etcd_clint.go中的EtcdClient,因为想要复用NewEtcdClient所以将原有初始化时写入master地址的逻辑独立到一个Put函数中。
go/master/c/client.go
Outdated
| func paddle_new_master_client(addr *C.char, bufSize int) C.paddle_master_client { | ||
| a := C.GoString(addr) | ||
| c := master.NewClient(addresser(a), bufSize) | ||
| ch := make(chan string) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better use:
ch := make(chan string, 1)
ch <- a
c := master.NewClient(ch, bufSize)Otherwise this function will not return unless master.Client read the value. We don't want to add unnecessary requirement.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good comment, thanks @helinwang !!
Done.
go/master/client_internal_test.go
Outdated
| c.conn = connection.New() | ||
| go c.monitorMaster(TestAddresser(fmt.Sprintf(":%d", p))) | ||
| addr := fmt.Sprintf(":%d", p) | ||
| ch := make(chan string) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe ch := make(chan string, 1) is better, so ch <- addr does not need to wait for channel to be read.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
go/master/client_test.go
Outdated
| f.Close() | ||
|
|
||
| c := master.NewClient(master.TestAddresser(fmt.Sprintf(":%d", p)), 10) | ||
| curAddr := make(chan string) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe curAddr := make(chan string, 1)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
go/master/client.go
Outdated
| } | ||
|
|
||
| // NewEtcdMasterClient creates a new Client by etcd | ||
| func NewEtcdMasterClient(db DatabaseOperator, bufSize int) *Client { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
db is only used here to create ch, which is used as input for NewClient. this is a sign that maybe we don't need the function NewEtcdMasterClient.
I think we need to follow rule: less code is better.
Here is a possible way to do it:
func paddle_new_master_client(...) {
p := C.GoString(etcdEndpoints)
e, err := master.NewEtcdClientWithoutLock(strings.Split(p, ","))
if err != nil {
panic(err)
}
ch := make(chan string, 1)
ch <- string(db.WaitMasterReady(DefaultAddrPath, 3))
go db.WatchWithKey(DefaultAddrPath, ch)
c := master.NewClient(ch, bufSize)
return add(c)
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Delete function NewEtcdMasterClient
go/master/etcd_client.go
Outdated
|
|
||
| // EtcdClientWithoutLock is the etcd client the master users | ||
| // for service discovery | ||
| type EtcdClientWithoutLock struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
EtcdClientWithoutLock is a very confusing name, it's hard to figure out what does "WithoutLock" means.
Considering the two function are only few lines, maybe just remove this type, and add two helper functions in go/master/etcd_client.go:
func GetKey(c *clientv3.Client, key string) string {
// ...
}
func WatchKey(c *clientv3.Client, key string, ch chan<- string) {
// ...
}So paddle_new_etcd_master_client could be changed to:
func paddle_new_etcd_master_client(...) {
p := C.GoString(etcdEndpoints)
cli, err := clientv3.New(...)
if err != nil {
panic(err)
}
ch := make(chan string, 1)
ch <- master.GetKey(cli, DefaultAddrPath)
go master.WatchKey(cli, DefaultAddrPath, ch)
c := master.NewClient(ch, bufSize)
return add(c)
}Just an idea, feel free to modify!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
WaitMasterReady => GetKey, and I also think this function does not need to be blocked, if the master address is empty, the function monitorMaster will not connect to master.
go/master/etcd_client.go
Outdated
| } | ||
|
|
||
| // WaitMasterReady will wait for master is ready | ||
| func (e *EtcdClientWithoutLock) WaitMasterReady(key string, timeout int) []byte { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WaitMasterReady is very long, maybe Getkey would be sufficient?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
go/master/etcd_client.go
Outdated
| } | ||
|
|
||
| // WatchWithKey watch the specify key and send to valChan | ||
| func (e *EtcdClientWithoutLock) WatchWithKey(key string, valChan chan string) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WatchWithKey is very long, maybe WatchKey would be sufficient?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
go/master/etcd_client.go
Outdated
| } | ||
|
|
||
| // WatchWithKey watch the specify key and send to valChan | ||
| func (e *EtcdClientWithoutLock) WatchWithKey(key string, valChan chan string) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
valChan chan string can be changed to valChan chan<- string
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
go/master/etcd_client.go
Outdated
|
|
||
| // DatabaseOperator is an interface fo database operator, | ||
| // it's useful for unittest | ||
| type DatabaseOperator interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please see my other comment on "removing NewEtcdMasterClient". Maybe we don't need this interface anymore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
helinwang
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM++ after the doc string has been fixed.
go/master/etcd_client.go
Outdated
| } | ||
|
|
||
| // NewEtcdClient creates a new EtcdClient. | ||
| // NewEtcdClient creates a new EtcdClient |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Subsequent sentences and/or paragraphs can give more details. Sentences should be properly punctuated.
https://github.com/golang/go/wiki/Comments
The comment should be a sentence (ends with ".").
go/master/etcd_client.go
Outdated
| return state, nil | ||
| } | ||
|
|
||
| // GetKey gets the value by the specify key |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Subsequent sentences and/or paragraphs can give more details. Sentences should be properly punctuated.
https://github.com/golang/go/wiki/Comments
The comment should be a sentence (ends with ".").
go/master/etcd_client.go
Outdated
| return string(v), nil | ||
| } | ||
|
|
||
| // WatchKey watches the specify key and send to valChan if there is some event |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Subsequent sentences and/or paragraphs can give more details. Sentences should be properly punctuated.
https://github.com/golang/go/wiki/Comments
The comment should be a sentence (ends with ".").
Fixed #2514