Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 25 additions & 12 deletions doc/design/cluster_train/pserver_client.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,25 @@ typedef enum {
typedef struct {
char* name;
paddle_element_type element_type;
void* content;
unsigned char* content;
int content_len;
} paddle_parameter, paddle_gradient;

typedef struct paddle_pserver_client paddle_pserver_client;
typedef int paddle_pserver_client;

paddle_pserver_client* paddle_new_pserver_client();
void paddle_pserver_client_release(paddle_pserver_client* client);
/**
* @brief creates a pserver client that talks to etcd for coordination.
*/
paddle_pserver_client paddle_new_etcd_pserver_client(char* etcd_addr);

/**
* @brief creates a pserver client given pserver addresses.
*
* @param pserver_addrs comma-separated pserver addresses.
* @param selected whether the current pserver client is selected to initialize all parameter servers.
*/
paddle_pserver_client paddle_new_pserver_client(char* pserver_addrs, int selected);
void paddle_pserver_client_release(paddle_pserver_client c);

/**
* @brief paddle_begin_init_params begins to initialize parameters on
Expand All @@ -95,7 +106,7 @@ void paddle_pserver_client_release(paddle_pserver_client* client);
* @return 1 if the trainer is selected to initialize parameter
* servers, otherwise 0.
*/
int paddle_begin_init_params(paddle_pserver_client* client);
int paddle_begin_init_params(paddle_pserver_client client);

/**
* @brief paddle_init_param initializes the parameter on parameter
Expand All @@ -109,7 +120,7 @@ int paddle_begin_init_params(paddle_pserver_client* client);
* @paddle_begin_init_param). Or simply exit the program and wait for
* the cluster management system to restart the trainer.
*/
int paddle_init_param(paddle_pserver_client* client, paddle_parameter param, const unsigned char* param_config_proto, int config_len);
int paddle_init_param(paddle_pserver_client client, paddle_parameter param, const unsigned char* param_config_proto, int config_len);

/**
* @brief paddle_finish_init_params tells parameter servers client has
Expand All @@ -120,7 +131,7 @@ int paddle_init_param(paddle_pserver_client* client, paddle_parameter param, con
* @paddle_begin_init_param). Or simply exit the program and wait for
* the cluster management system to restart the trainer.
*/
int paddle_finish_init_params(paddle_pserver_client* client);
int paddle_finish_init_params(paddle_pserver_client client);

/**
* @brief paddle_send_grads sends gradients to parameter servers for
Expand All @@ -131,21 +142,23 @@ int paddle_finish_init_params(paddle_pserver_client* client);
* @param learning_rate the learning rate for the gradients.
* @return 0 if successful, otherwise -1.
*/
int paddle_send_grads(paddle_pserver_client* client, const paddle_gradient* grads, int len);
int paddle_send_grads(paddle_pserver_client client, const paddle_gradient* grads, int len);

/**
* @brief paddle_get_params gets parameters from parameter servers.
*
* paddle_get_params will block until parameters are initialized on
* the parameter servers.
*
* @param names the array of names of the parameters to get.
* @param dst the destination array of parameters to save to.
* @param dst the destination array of parameter pointers to save to.
* Each parameter pointer must be pre-populated with the required parameter name,
* and the content of each parameter must be pre-allocated with the size of the
* required parameter on the pserver.
* @param len the length of the names array and the paddle_parameter
* array.
* @return 0 if successful, otherwise -1.
*/
int paddle_get_params(paddle_pserver_client* client, const char** names, paddle_parameter* dst, int len);
int paddle_get_params(paddle_pserver_client client, paddle_parameter** dst, int len);

/**
* @brief paddle_save_model indicates parameters to save the parameter
Expand All @@ -154,5 +167,5 @@ int paddle_get_params(paddle_pserver_client* client, const char** names, paddle_
* @param path the path to save parameters.
* @return 0 if successful, otherwise -1.
*/
int paddle_save_model(paddle_pserver_client* client, const char* path);
int paddle_save_model(paddle_pserver_client client, const char* path);
```
132 changes: 64 additions & 68 deletions go/pserver/cclient/cclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,9 @@ typedef struct {
int content_len;
} paddle_parameter, paddle_gradient;

static inline void paddle_release_param(paddle_parameter* param) {
if (param != NULL) {
if (param->name != NULL) {
free(param->name);
}

if (param->content != NULL) {
free(param->content);
}

free(param);
}
}

typedef int client;
typedef int paddle_pserver_client;
#define PSERVER_ERROR -1
#define PSERVER_OK 0
*/
import "C"

Expand All @@ -48,10 +36,10 @@ import (

var nullPtr = unsafe.Pointer(uintptr(0))
var mu sync.Mutex
var handleMap = make(map[C.client]*pserver.Client)
var curHandle C.client
var handleMap = make(map[C.paddle_pserver_client]*pserver.Client)
var curHandle C.paddle_pserver_client

func add(c *pserver.Client) C.client {
func add(c *pserver.Client) C.paddle_pserver_client {
mu.Lock()
defer mu.Unlock()
client := curHandle
Expand All @@ -60,13 +48,13 @@ func add(c *pserver.Client) C.client {
return client
}

func get(client C.client) *pserver.Client {
func get(client C.paddle_pserver_client) *pserver.Client {
mu.Lock()
defer mu.Unlock()
return handleMap[client]
}

func remove(client C.client) *pserver.Client {
func remove(client C.paddle_pserver_client) *pserver.Client {
mu.Lock()
defer mu.Unlock()
h := handleMap[client]
Expand Down Expand Up @@ -100,7 +88,7 @@ func (l lister) List() []pserver.Server {
}

//export paddle_new_pserver_client
func paddle_new_pserver_client(addrs *C.char, selected int) C.client {
func paddle_new_pserver_client(addrs *C.char, selected int) C.paddle_pserver_client {
a := C.GoString(addrs)
as := strings.Split(a, ",")
servers := make([]pserver.Server, len(as))
Expand All @@ -113,27 +101,27 @@ func paddle_new_pserver_client(addrs *C.char, selected int) C.client {
}

//export paddle_new_etcd_pserver_client
func paddle_new_etcd_pserver_client(etcd_addr *C.char) C.client {
func paddle_new_etcd_pserver_client(etcd_addr *C.char) C.paddle_pserver_client {
// TODO(helin): fault tolerant pserver client using etcd.
panic("not implemented.")
}

//export paddle_pserver_client_release
func paddle_pserver_client_release(client C.client) {
func paddle_pserver_client_release(client C.paddle_pserver_client) {
remove(client)
}

//export paddle_begin_init_params
func paddle_begin_init_params(client C.client) C.int {
func paddle_begin_init_params(client C.paddle_pserver_client) C.int {
c := get(client)
if selected := c.BeginInitParams(); selected {
return 1
}
return 0
return C.PSERVER_OK
}

//export paddle_init_param
func paddle_init_param(client C.client, param C.paddle_parameter, param_config unsafe.Pointer, config_len C.int) C.int {
func paddle_init_param(client C.paddle_pserver_client, param C.paddle_parameter, param_config unsafe.Pointer, config_len C.int) C.int {
et := pserver.ElementType(param.element_type)
name := C.GoString(param.name)
content := cArrayToSlice(unsafe.Pointer(param.content), int(param.content_len))
Expand All @@ -143,28 +131,38 @@ func paddle_init_param(client C.client, param C.paddle_parameter, param_config u
}
c := get(client)
err := c.InitParam(pc)

if err != nil {
if err.Error() == pserver.AlreadyInitialized {
log.Printf("parameter %s already initialized, treat paddle_init_param as sucessful.\n", name)
return C.PSERVER_OK
}
log.Println(err)
return -1
return C.PSERVER_ERROR
}

return 0
return C.PSERVER_OK
}

//export paddle_finish_init_params
func paddle_finish_init_params(client C.client) C.int {
func paddle_finish_init_params(client C.paddle_pserver_client) C.int {
c := get(client)
err := c.FinishInitParams()
if err != nil {
if err.Error() == pserver.AlreadyInitialized {
log.Println("parameters already initialized, treat paddle_finish_init_params as sucessful.")
return C.PSERVER_OK
}

log.Println(err)
return -1
return C.PSERVER_ERROR
}

return 0
return C.PSERVER_OK
}

//export paddle_send_grads
func paddle_send_grads(client C.client, grads *C.paddle_gradient, total C.int) C.int {
func paddle_send_grads(client C.paddle_pserver_client, grads *C.paddle_gradient, total C.int) C.int {
var gs []pserver.Gradient
for i := 0; i < int(total); i++ {
grad := (*C.paddle_gradient)(unsafe.Pointer((uintptr(unsafe.Pointer(grads)) + uintptr(i)*unsafe.Sizeof(*grads))))
Expand All @@ -178,83 +176,81 @@ func paddle_send_grads(client C.client, grads *C.paddle_gradient, total C.int) C
err := c.SendGrads(gs)
if err != nil {
log.Println(err)
return -1
return C.PSERVER_ERROR
}

return 0
return C.PSERVER_OK
}

//export paddle_get_params
func paddle_get_params(client C.client, names **C.char, dst **C.paddle_parameter, total C.int) C.int {
func paddle_get_params(client C.paddle_pserver_client, dst **C.paddle_parameter, total C.int) C.int {
var ns []string
for i := 0; i < int(total); i++ {
name := *(**C.char)(unsafe.Pointer((uintptr(unsafe.Pointer(names)) + uintptr(i)*unsafe.Sizeof(*names))))
ns = append(ns, C.GoString(name))
param := *(**C.paddle_parameter)(unsafe.Pointer((uintptr(unsafe.Pointer(dst)) + uintptr(i)*unsafe.Sizeof(*dst))))
ns = append(ns, C.GoString(param.name))
}
c := get(client)
ps, err := c.GetParams(ns)
if err != nil {
log.Println(err)
return -1
return C.PSERVER_ERROR
}

for i := 0; i < int(total); i++ {
if i >= len(ps) {
break
if len(ps) != len(ns) {
pn := make([]string, len(ps))
for i, p := range ps {
pn[i] = p.Name
}
log.Printf("pserver returned wrong number of parameters. Requested: %s, returned: %s.\n", strings.Join(pn, ", "), strings.Join(ns, ", "))
return C.PSERVER_ERROR
}

for i := range ps {
if ns[i] != ps[i].Name {
pn := make([]string, len(ps))
for i, p := range ps {
pn[i] = p.Name
}
log.Printf("pserver returned wrong parameters, or not in requested order. Requested: %s, returned: %s.\n", strings.Join(pn, ", "), strings.Join(ns, ", "))
return C.PSERVER_ERROR
}
}

for i := 0; i < int(total); i++ {
p := ps[i]
param := *(**C.paddle_parameter)(unsafe.Pointer((uintptr(unsafe.Pointer(dst)) + uintptr(i)*unsafe.Sizeof(*dst))))
nameReady := false
contentAllocated := false

if unsafe.Pointer(param) == nullPtr {
param = (*C.paddle_parameter)(C.calloc(1, C.size_t(unsafe.Sizeof(*param))))
log.Println("must pre-allocate parameter.")
return C.PSERVER_ERROR
} else {
if unsafe.Pointer(param.name) != nullPtr {
if n := C.GoString(param.name); n != p.Name {
log.Println("Warning: the pre-allocated parameter name does not match the parameter name, it will be freed.", n, p.Name)
C.free(unsafe.Pointer(param.name))
} else {
nameReady = true
}
}

if unsafe.Pointer(param.content) != nullPtr {
if int(param.content_len) == len(p.Content) {
contentAllocated = true
} else {
log.Println("Warning: the pre-allocated content len does not match parameter content len, the pre-allocated content will be freed.", param.content_len, len(p.Content))
C.free(unsafe.Pointer(param.content))
if int(param.content_len) != len(p.Content) {
log.Printf("the pre-allocated content len does not match parameter content len. Pre-allocated len: %d, returned len: %d", param.content_len, len(p.Content))
return C.PSERVER_ERROR
}
}
}

if !nameReady {
param.name = C.CString(p.Name)
}
if !contentAllocated {
param.content = (*C.uchar)(C.malloc(C.size_t(len(p.Content))))
}
C.memcpy(unsafe.Pointer(param.content), unsafe.Pointer(&p.Content[0]), C.size_t(len(p.Content)))
param.content_len = C.int(len(p.Content))
param.element_type = C.paddle_element_type(p.ElementType)
}

return 0
return C.PSERVER_OK
}

//export paddle_save_model
func paddle_save_model(client C.client, path *C.char) C.int {
func paddle_save_model(client C.paddle_pserver_client, path *C.char) C.int {
p := C.GoString(path)
c := get(client)
err := c.Save(p)
if err != nil {
log.Println(err)
return -1
return C.PSERVER_ERROR
}

return 0
return C.PSERVER_OK
}

func main() {} // Required but ignored
2 changes: 2 additions & 0 deletions go/pserver/cclient/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,7 @@ add_dependencies(main client)

if(APPLE)
set(CMAKE_EXE_LINKER_FLAGS "-framework CoreFoundation -framework Security")
else()
set(CMAKE_EXE_LINKER_FLAGS "-pthread")
endif()
target_link_libraries(main ${CMAKE_BINARY_DIR}/libclient.a)
Loading