Skip to content

Commit

Permalink
ipc: patch RDMAVan IPC transport with BYTEPS_ENCODING_SCHEME_VERSION (d…
Browse files Browse the repository at this point in the history
…mlc#102)

* ipc: patch RDMAVan IPC transport with BYTEPS_ENCODING_SCHEME_VERSION

* fix lint
  • Loading branch information
eric-haibin-lin authored Oct 26, 2021
1 parent dafebf6 commit c10714d
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 5 deletions.
27 changes: 24 additions & 3 deletions src/rdma_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,9 @@ class IPCTransport : public RDMATransport {
val = Environment::Get()->find("BYTEPS_PARTITION_BYTES");
byteps_partition_bytes_ = val ? atoi(val) : 4096000;

val = Environment::Get()->find("BYTEPS_ENCODING_SCHEME_VERSION");
encoding_scheme_version_ = val ? atoi(val) : 0;

val = Environment::Get()->find("BYTEPS_LOCAL_SIZE");
auto byteps_local_size = val ? atoi(val) : 8;
byteps_partition_bytes_ = RoundUp(
Expand Down Expand Up @@ -637,6 +640,24 @@ class IPCTransport : public RDMATransport {
std::lock_guard<std::mutex> lock(shm_mu_);
auto worker_key = DecodeWorkerKey(key);
auto seq_num = worker_key % (1 << 16);
// The total key space is [0, 2^64 - 1].
// It is divided among N PS servers; for now we assume N <= 2^16,
// which leaves a key space of 2^48 for each server.
// Encoding scheme version 0:
// We have a 2^48 key space left (the top 16 bits distinguish the servers).
// The MXNet server has a bug dealing with keys larger than 2^32.
// Here we support up to 2^16 tensors, and up to 2^16 partitions per
// tensor.
// Encoding scheme version 1:
// The top 16 bits of the 48 bits encode the sender rank.
// The middle 16 bits of the 48 bits encode the tensor id.
// The next 6 bits encode the request type (pushpull, send, etc.).
// The last 10 bits encode the partition id.
// Therefore, we support up to 2^16 tensors, and up to 2^10 partitions per
// tensor.
if (encoding_scheme_version_ == 1) {
seq_num = worker_key % (1 << 10);
}
auto base_key = worker_key - seq_num;
uint64_t offset = byteps_partition_bytes_ * seq_num;
if (key_shm_addr_.find(base_key) != key_shm_addr_.end()) {
Expand All @@ -657,8 +678,8 @@ class IPCTransport : public RDMATransport {
CHECK_NE(base_ptr, (void *)-1) << strerror(errno);
key_shm_addr_[base_key] = base_ptr;

PS_VLOG(1) << "open Shared Memory: " << shm_name << ", offset=" << offset
<< ", (in bytes) size=" << total_shm_size;
PS_VLOG(1) << "open Shared Memory: " << shm_name << " offset=" << offset
<< " (in bytes) size=" << total_shm_size;
return (void *)((char *)key_shm_addr_[base_key] + offset);
}

Expand All @@ -675,7 +696,7 @@ class IPCTransport : public RDMATransport {
std::unordered_map<uint64_t, void *> key_shm_addr_;

bool enable_async_copy_;

int encoding_scheme_version_ = 0;
}; // class IPCTransport

}; // namespace ps
Expand Down
8 changes: 6 additions & 2 deletions src/rdma_van.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,12 @@ class RDMAVan : public Van {

auto val = Environment::Get()->find("BYTEPS_ENABLE_IPC");
disable_ipc_ = val ? !atoi(val) : true;
if (disable_ipc_) LOG(INFO) << "Shared memory IPC has been disabled";

if (disable_ipc_) {
LOG(INFO) << "Shared memory IPC has been disabled";
} else {
std::string role = Environment::Get()->find("DMLC_ROLE");
CHECK(role != "joint") << "RDMAVan in joint mode does not support IPC";
}
if (event_channel_ == nullptr) {
event_channel_ = rdma_create_event_channel();
CHECK(event_channel_) << "Create RDMA event channel failed";
Expand Down

0 comments on commit c10714d

Please sign in to comment.