From c10714d2b155c4b24ad463912b6f384cabf2434d Mon Sep 17 00:00:00 2001 From: Haibin Lin Date: Tue, 26 Oct 2021 10:58:54 -0700 Subject: [PATCH] ipc: patch RDMAVan IPC transport with BYTEPS_ENCODING_SCHEME_VERSION (#102) * ipc: patch RDMAVan IPC transport with BYTEPS_ENCODING_SCHEME_VERSION * fix lint --- src/rdma_transport.h | 27 ++++++++++++++++++++++++--- src/rdma_van.h | 8 ++++++-- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/src/rdma_transport.h b/src/rdma_transport.h index 1b8ca13d..477e1814 100644 --- a/src/rdma_transport.h +++ b/src/rdma_transport.h @@ -521,6 +521,9 @@ class IPCTransport : public RDMATransport { val = Environment::Get()->find("BYTEPS_PARTITION_BYTES"); byteps_partition_bytes_ = val ? atoi(val) : 4096000; + val = Environment::Get()->find("BYTEPS_ENCODING_SCHEME_VERSION"); + encoding_scheme_version_ = val ? atoi(val) : 0; + val = Environment::Get()->find("BYTEPS_LOCAL_SIZE"); auto byteps_local_size = val ? atoi(val) : 8; byteps_partition_bytes_ = RoundUp( @@ -637,6 +640,24 @@ class IPCTransport : public RDMATransport { std::lock_guard lock(shm_mu_); auto worker_key = DecodeWorkerKey(key); auto seq_num = worker_key % (1 << 16); + // Total key space is [0, 2^64 - 1] + // It will be divided to N PS servers, for now we assume N <= 2^16 + // Then we have 2^48 key space left. + // Encoding scheme version 0: + // Then we have 2^48 key space left (top 16 bits for different servers) + // MXNet server has a bug dealing with keys larger than 2^32 + // Below we support up to 2^16 tensors, and up to 2^16 partitions per + // tensor + // Encoding scheme version 1: + // Top 16 bits out of the 48 bits encodes the sender rank + // Mid 16 bits out of the 48 bits encodes the tensor id + // The next 6 bits encodes request types (pushpull, send, etc) + // The last 10 bits encodes the partition id + // Therefore, we support up to 2^16 tensors, and up to 2^10 partitions per + // tensor + if (encoding_scheme_version_ == 1) { + seq_num = worker_key % (1 << 10); + } auto base_key = worker_key - seq_num; uint64_t offset = byteps_partition_bytes_ * seq_num; if (key_shm_addr_.find(base_key) != key_shm_addr_.end()) { @@ -657,8 +678,8 @@ class IPCTransport : public RDMATransport { CHECK_NE(base_ptr, (void *)-1) << strerror(errno); key_shm_addr_[base_key] = base_ptr; - PS_VLOG(1) << "open Shared Memory: " << shm_name << ", offset=" << offset - << ", (in bytes) size=" << total_shm_size; + PS_VLOG(1) << "open Shared Memory: " << shm_name << " offset=" << offset + << " (in bytes) size=" << total_shm_size; return (void *)((char *)key_shm_addr_[base_key] + offset); } @@ -675,7 +696,7 @@ class IPCTransport : public RDMATransport { std::unordered_map key_shm_addr_; bool enable_async_copy_; - + int encoding_scheme_version_ = 0; }; // class IPCTransport }; // namespace ps diff --git a/src/rdma_van.h b/src/rdma_van.h index 3c066e95..05465c3d 100644 --- a/src/rdma_van.h +++ b/src/rdma_van.h @@ -41,8 +41,12 @@ class RDMAVan : public Van { auto val = Environment::Get()->find("BYTEPS_ENABLE_IPC"); disable_ipc_ = val ? !atoi(val) : true; - if (disable_ipc_) LOG(INFO) << "Shared memory IPC has been disabled"; - + if (disable_ipc_) { + LOG(INFO) << "Shared memory IPC has been disabled"; + } else { + std::string role = Environment::Get()->find("DMLC_ROLE"); + CHECK(role != "joint") << "RDMAVan in joint mode does not support IPC"; + } if (event_channel_ == nullptr) { event_channel_ = rdma_create_event_channel(); CHECK(event_channel_) << "Create RDMA event channel failed";