From e38ee61e660b10f17468c2cc6e629e73192582f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BD=95=E6=96=87=E5=A9=B7?= Date: Wed, 23 Sep 2020 15:32:35 +0800 Subject: [PATCH 1/4] support ipv6 --- src/rdma_van.h | 36 +++++++++++++++++++++++++----------- 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/src/rdma_van.h b/src/rdma_van.h index 119601bf..2f388df5 100755 --- a/src/rdma_van.h +++ b/src/rdma_van.h @@ -18,6 +18,7 @@ #ifdef DMLC_USE_RDMA +#include #include "rdma_utils.h" #include "rdma_transport.h" @@ -125,26 +126,39 @@ class RDMAVan : public Van { struct sockaddr_in addr; memset(&addr, 0, sizeof(addr)); + int af = PF_INET; + int ret = -EINVAL; + struct addrinfo *res; + auto val = Environment::Get()->find("DMLC_NODE_HOST"); + std::string val_str = std::string(val); if (val) { - PS_VLOG(1) << "bind to DMLC_NODE_HOST: " << std::string(val); + PS_VLOG(1) << "bind to DMLC_NODE_HOST: " << val_str; + std::size_t n = std::count(val_str.begin(), val_str.end(), ':'); + if (n > 1) { + af = PF_INET6; + } addr.sin_addr.s_addr = inet_addr(val); } - addr.sin_family = AF_INET; + // addr.sin_family = AF_INET; + addr.sin_family = af; int port = node.port; + addr.sin_port = htons(port); + ret = getaddrinfo(val_str.c_str(), std::to_string(port).c_str(), NULL, &res); + CHECK(ret >= 0) << "could not getaddrinfo address " << val_str << " error code " << ret; unsigned seed = static_cast(time(NULL) + port); for (int i = 0; i < max_retry + 1; ++i) { - addr.sin_port = htons(port); - if (rdma_bind_addr(listener_, - reinterpret_cast(&addr)) == 0) { + // if (rdma_bind_addr(listener_, + // reinterpret_cast(&addr)) == 0) { + if (rdma_bind_addr(listener_, res->ai_addr) == 0) { break; } - if (i == max_retry) { - port = -1; - } else { + // if (i == max_retry) { + // port = -1; + // } else { port = 10000 + rand_r(&seed) % 40000; - } + // } } CHECK(rdma_listen(listener_, kRdmaListenBacklog) == 0) << "Listen RDMA connection failed: " << strerror(errno); @@ -207,11 +221,11 @@ class RDMAVan : public Van { CHECK_EQ(rc, 0) << "getaddrinfo failed: " << gai_strerror(rc); CHECK_EQ(rdma_resolve_addr(endpoint->cm_id, addr->ai_addr, - remote_addr->ai_addr, kTimeoutms), 0) + (struct sockaddr *)remote_addr->ai_addr, kTimeoutms), 0) << "Resolve RDMA address failed with errno: " << strerror(errno); } else { CHECK_EQ(rdma_resolve_addr(endpoint->cm_id, nullptr, - remote_addr->ai_addr, kTimeoutms), + (struct sockaddr *)remote_addr->ai_addr, kTimeoutms), 0) << "Resolve RDMA address failed with errno: " << strerror(errno); } From d9d592feefd7a5dca56affefa156ced11238ae30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BD=95=E6=96=87=E5=A9=B7?= Date: Wed, 14 Oct 2020 14:55:54 +0800 Subject: [PATCH 2/4] support ipv6 for non-RDMA case --- src/rdma_van.h | 12 ++++++------ src/zmq_van.h | 28 +++++++++++++++++++++++----- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/src/rdma_van.h b/src/rdma_van.h index 2f388df5..4c8d958d 100755 --- a/src/rdma_van.h +++ b/src/rdma_van.h @@ -126,7 +126,7 @@ class RDMAVan : public Van { struct sockaddr_in addr; memset(&addr, 0, sizeof(addr)); - int af = PF_INET; + int af = AF_INET; int ret = -EINVAL; struct addrinfo *res; @@ -136,7 +136,7 @@ class RDMAVan : public Van { PS_VLOG(1) << "bind to DMLC_NODE_HOST: " << val_str; std::size_t n = std::count(val_str.begin(), val_str.end(), ':'); if (n > 1) { - af = PF_INET6; + af = AF_INET6; } addr.sin_addr.s_addr = inet_addr(val); } @@ -154,11 +154,11 @@ class RDMAVan : public Van { if (rdma_bind_addr(listener_, res->ai_addr) == 0) { break; } - // if (i == max_retry) { - // port = -1; - // } else { + if (i == max_retry) { + port = -1; + } else { port = 10000 + rand_r(&seed) % 40000; - // } + } } CHECK(rdma_listen(listener_, kRdmaListenBacklog) == 0) << "Listen RDMA connection failed: " << strerror(errno); diff --git a/src/zmq_van.h b/src/zmq_van.h index f7b58e72..39c6da03 100644 --- a/src/zmq_van.h +++ b/src/zmq_van.h @@ -5,6 +5,7 @@ #define PS_ZMQ_VAN_H_ #include #include +#include #include #include #include @@ -98,15 +99,21 @@ class ZMQVan : public Van { int Bind(const Node& node, int max_retry) override { receiver_ = zmq_socket(context_, ZMQ_ROUTER); int option = 1; + std::string hostname = node.hostname.empty() ? "*" : node.hostname; + size_t n = std::count(hostname.begin(), hostname.end(), ':'); CHECK(!zmq_setsockopt(receiver_, ZMQ_ROUTER_MANDATORY, &option, sizeof(option))) << zmq_strerror(errno); CHECK(receiver_ != NULL) << "create receiver socket failed: " << zmq_strerror(errno); int local = GetEnv("DMLC_LOCAL", 0); - std::string hostname = node.hostname.empty() ? "*" : node.hostname; int use_kubernetes = GetEnv("DMLC_USE_KUBERNETES", 0); if (use_kubernetes > 0 && node.role == Node::SCHEDULER) { - hostname = "0.0.0.0"; + hostname = (n > 1) ? "::/0" : "0.0.0.0"; + } + if (n > 1) { + CHECK(!zmq_setsockopt(receiver_, ZMQ_IPV6, &option, sizeof(option))) + << zmq_strerror(errno); + PS_VLOG(1) << "bind with ipv6 socket with host " << hostname; } std::string addr = local ? "ipc:///tmp/" : "tcp://" + hostname + ":"; int port = node.port; @@ -117,9 +124,9 @@ class ZMQVan : public Van { if (ret == 0) break; if (i == max_retry) { port = -1; - int zmq_err = zmq_errno(); - LOG(FATAL) << "Reached max retry for bind: " << zmq_strerror(zmq_err) - << ". errno = " << zmq_err; + int zmq_err = zmq_errno(); + LOG(FATAL) << "Reached max retry for bind: " << zmq_strerror(zmq_err) + << ". errno = " << zmq_err; } else { port = 10000 + rand_r(&seed) % 40000; } @@ -137,6 +144,7 @@ class ZMQVan : public Van { CHECK_NE(node.port, node.kEmpty); CHECK(node.hostname.size()); int id = node.id; + bool is_ipv6 = false; mu_.lock(); auto it = senders_.find(id); if (it != senders_.end()) { @@ -155,6 +163,16 @@ class ZMQVan : public Van { << zmq_strerror(errno) << ". it often can be solved by \"sudo ulimit -n 65536\"" << " or edit /etc/security/limits.conf"; + std::string hostname = node.hostname.empty() ? "*" : node.hostname; + size_t n = std::count(hostname.begin(), hostname.end(), ':'); + PS_VLOG(1) << "connect to host " << hostname; + if (n > 1) { + int option = 1; + is_ipv6 = true; + PS_VLOG(1) << "connect with ipv6 socket"; + CHECK(!zmq_setsockopt(sender, ZMQ_IPV6, &option, sizeof(option))) + << zmq_strerror(errno); + } if (my_node_.id != Node::kEmpty) { std::string my_id = "ps" + std::to_string(my_node_.id); zmq_setsockopt(sender, ZMQ_IDENTITY, my_id.data(), my_id.size()); From 60139b5d0c5c6538e4aea39fa932701cea9884d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BD=95=E6=96=87=E5=A9=B7?= Date: Wed, 14 Oct 2020 15:03:18 +0800 Subject: [PATCH 3/4] change AF_INET to PF_INET --- src/rdma_van.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rdma_van.h b/src/rdma_van.h index 4c8d958d..0cba5be3 100755 --- a/src/rdma_van.h +++ b/src/rdma_van.h @@ -126,7 +126,7 @@ class RDMAVan : public Van { struct sockaddr_in addr; memset(&addr, 0, sizeof(addr)); - int af = AF_INET; + int af = PF_INET; int ret = -EINVAL; struct addrinfo *res; @@ -136,7 +136,7 @@ class RDMAVan : public Van { PS_VLOG(1) << "bind to DMLC_NODE_HOST: " << val_str; std::size_t n = std::count(val_str.begin(), val_str.end(), ':'); if (n > 1) { - af = AF_INET6; + af = PF_INET6; } addr.sin_addr.s_addr = inet_addr(val); } From 739a193874ac382efe3838a796b360c8056d43aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BD=95=E6=96=87=E5=A9=B7?= Date: Mon, 19 Oct 2020 14:01:48 +0800 Subject: [PATCH 4/4] address comment --- src/zmq_van.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/zmq_van.h b/src/zmq_van.h index 39c6da03..dc6b42b5 100644 --- a/src/zmq_van.h +++ b/src/zmq_van.h @@ -113,7 +113,7 @@ class ZMQVan : public Van { if (n > 1) { CHECK(!zmq_setsockopt(receiver_, ZMQ_IPV6, &option, sizeof(option))) << zmq_strerror(errno); - PS_VLOG(1) << "bind with ipv6 socket with host " << hostname; + PS_VLOG(1) << "bind IPv6 socket to host " << hostname; } std::string addr = local ? "ipc:///tmp/" : "tcp://" + hostname + ":"; int port = node.port;