The standard set of yrmcds consists of a master server and two slave servers. A set of yrmcds will be prepared for each server farm.
To elect the master server automatically, keepalived or a similar virtual IP address allocation tool is used. The server owning the virtual IP address becomes the master server. The others are slaves.
Slaves connect to the master via the virtual IP address. The master will not connect to slaves. A new slave will receive all data from the master server once the connection has been established.
If, for some reason, the master suddenly notices that it is no longer the master (i.e. the server has lost the virtual IP address), it kills itself so that it is reborn as a new slave (restarted by upstart).
If a slave gets disconnected from the master, it waits for some seconds to see if the virtual IP address is assigned to it. If assigned, the slave promotes itself to the new master. If not, the slave forgets all replicated data and then tries to connect to the new master. The new master should close the established connection to the old master, if any.
The replication protocol is the same as the binary protocol of memcached. Specifically, slaves receive only "SetQ" and "DeleteQ" requests.
This allows any memcached-compatible program to become a yrmcds slave with slight modifications.
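As an illustration of what a slave receives, the following sketch builds "SetQ" and "DeleteQ" request packets using the 24-byte header layout of the memcached binary protocol. The function names (`make_request`, `make_setq`, `make_deleteq`) are hypothetical and are not taken from the yrmcds sources.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Opcodes defined by the memcached binary protocol.
constexpr std::uint8_t kOpSetQ = 0x11;
constexpr std::uint8_t kOpDeleteQ = 0x14;

// Append an unsigned integer in network (big-endian) byte order.
template <typename T>
void put_be(std::vector<std::uint8_t>& out, T value) {
    for (int shift = static_cast<int>(sizeof(T) - 1) * 8; shift >= 0; shift -= 8)
        out.push_back(static_cast<std::uint8_t>(value >> shift));
}

// Build a request: 24-byte header, then extras, key, and value.
std::vector<std::uint8_t> make_request(std::uint8_t opcode, const std::string& key,
                                       const std::vector<std::uint8_t>& extras,
                                       const std::string& value) {
    std::vector<std::uint8_t> out;
    out.push_back(0x80);                                        // magic: request
    out.push_back(opcode);
    put_be<std::uint16_t>(out, static_cast<std::uint16_t>(key.size()));
    out.push_back(static_cast<std::uint8_t>(extras.size()));    // extras length
    out.push_back(0);                                           // data type
    put_be<std::uint16_t>(out, 0);                              // reserved
    put_be<std::uint32_t>(out, static_cast<std::uint32_t>(
        extras.size() + key.size() + value.size()));            // total body length
    put_be<std::uint32_t>(out, 0);                              // opaque
    put_be<std::uint64_t>(out, 0);                              // CAS
    out.insert(out.end(), extras.begin(), extras.end());
    out.insert(out.end(), key.begin(), key.end());
    out.insert(out.end(), value.begin(), value.end());
    return out;
}

// SetQ carries 8 bytes of extras: flags and expiration time.
std::vector<std::uint8_t> make_setq(const std::string& key, const std::string& value,
                                    std::uint32_t flags, std::uint32_t exptime) {
    std::vector<std::uint8_t> extras;
    put_be<std::uint32_t>(extras, flags);
    put_be<std::uint32_t>(extras, exptime);
    return make_request(kOpSetQ, key, extras, value);
}

// DeleteQ has neither extras nor a value.
std::vector<std::uint8_t> make_deleteq(const std::string& key) {
    return make_request(kOpDeleteQ, key, {}, "");
}
```

Because both opcodes are "quiet" variants, the sender does not expect a response on success, which suits one-way replication.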
The master will have tens of thousands of client connections from all the PHP-FPM processes in a server farm. A thread-per-connection model is therefore not very efficient.
This encourages us to adopt asynchronous sockets and the reactor pattern, combined with a thread pool for the best performance.
The reactor is implemented with asynchronous sockets and epoll in edge-triggered mode. With edge-triggered mode, a socket needs to be read until recv returns an EWOULDBLOCK error. If, for some reason such as fairness, not all available data are received from a socket, the reactor needs to remember that socket and handle it manually later.
To remember such sockets, the reactor manages a list of readable sockets internally.
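The read loop described above can be sketched as follows. This is a minimal example, not the yrmcds implementation: `drain`, `DrainResult`, and the `max_bytes` fairness budget are names assumed for illustration. A `MoreData` result is the case where the reactor would have to remember the socket in its readable list.

```cpp
#include <algorithm>
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <string>
#include <sys/socket.h>
#include <sys/types.h>

enum class DrainResult { Drained, MoreData, Closed, Error };

// Read from a non-blocking socket until recv reports EWOULDBLOCK,
// or until max_bytes (a hypothetical fairness budget) has been read.
DrainResult drain(int fd, std::string& out, std::size_t max_bytes) {
    char buf[4096];
    std::size_t total = 0;
    while (total < max_bytes) {
        std::size_t want = std::min(sizeof(buf), max_bytes - total);
        ssize_t n = ::recv(fd, buf, want, 0);
        if (n > 0) {
            out.append(buf, static_cast<std::size_t>(n));
            total += static_cast<std::size_t>(n);
            continue;
        }
        if (n == 0) return DrainResult::Closed;   // peer closed the connection
        if (errno == EINTR) continue;             // interrupted; retry
        if (errno == EAGAIN || errno == EWOULDBLOCK)
            return DrainResult::Drained;          // nothing left; epoll will notify again
        return DrainResult::Error;
    }
    // Budget exhausted: data may remain, and edge-triggered epoll will not
    // report it again, so the caller must remember this socket.
    return DrainResult::MoreData;
}
```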
When the reactor detects some data are available for a socket, it dispatches the socket to an idle worker thread. The worker then receives and processes the data. While a worker is handling a socket, that socket must not be passed to another worker to keep the protocol semantics.
To implement these, the following properties are required:
- Every worker thread has an idle flag.
- Every socket has a busy (in-use) flag.
Both flags are set by the reactor thread, and cleared by a worker thread.
If the reactor finds a socket is readable but the socket is busy or there are no idle workers, the socket is remembered in the previously stated readable socket list. This keeps the reactor thread from being blocked.
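A minimal sketch of this dispatch rule, assuming hypothetical `Worker` and `Socket` types with atomic flags. The reactor sets both flags and the worker clears them, as stated above. The actual hand-off of the job to the worker thread is omitted.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <vector>

struct Worker {
    std::atomic<bool> idle{true};
    int current_fd = -1;          // socket assigned by the reactor
};

struct Socket {
    explicit Socket(int f) : fd(f) {}
    int fd;
    std::atomic<bool> busy{false};
};

// Reactor side: hand the socket to an idle worker, or remember it in
// the readable list when the socket is busy or no worker is idle.
bool dispatch(Socket& s, std::vector<Worker>& workers, std::vector<Socket*>& readable) {
    if (!s.busy.load(std::memory_order_acquire)) {
        for (Worker& w : workers) {
            if (w.idle.load(std::memory_order_acquire)) {
                s.busy.store(true, std::memory_order_relaxed);
                w.current_fd = s.fd;
                w.idle.store(false, std::memory_order_release);
                return true;
            }
        }
    }
    readable.push_back(&s);       // never blocks; retried in a later loop
    return false;
}

// Worker side: clear both flags once the job is done.
void finish(Worker& w, Socket& s) {
    s.busy.store(false, std::memory_order_release);
    w.idle.store(true, std::memory_order_release);
}
```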
Readable lists used in the reactor can be implemented with very rare memory allocations by preparing two lists that pre-allocate a certain amount of memory. The two lists are swapped at each iteration of the epoll loop.
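The two-list scheme might look like the sketch below. `ReadableLists` and its methods are hypothetical names. Since `clear()` keeps a vector's capacity and `swap()` only exchanges buffers, no allocation happens unless a list outgrows its reserved size.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

class ReadableLists {
public:
    explicit ReadableLists(std::size_t reserve) {
        current_.reserve(reserve);
        next_.reserve(reserve);
    }

    // Remember a socket that must be handled in the next loop iteration.
    void remember(int fd) { next_.push_back(fd); }

    // Called at the start of each epoll loop iteration: sockets remembered
    // during the previous iteration become the list to process now.
    const std::vector<int>& begin_loop() {
        current_.clear();         // keeps the allocated capacity
        current_.swap(next_);     // exchanges buffers without copying
        return current_;
    }

private:
    std::vector<int> current_;
    std::vector<int> next_;
};
```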
To notify an idle worker of a new job (or to wait for a new job), we use the Linux-specific eventfd rather than pipes or pthread condition variables. eventfd is lighter than a pipe. Unlike condition variables, eventfd remembers events, so workers never fail to catch them.
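The "remembers events" property can be seen in a small sketch. The helper names are assumptions; only `eventfd`, `read`, and `write` are real system calls. A notification written before the worker starts waiting is kept in the kernel counter and returned by the next read.

```cpp
#include <cassert>
#include <cstdint>
#include <sys/eventfd.h>
#include <sys/types.h>
#include <unistd.h>

// Create an eventfd with an initial counter of zero.
int make_event() { return ::eventfd(0, EFD_CLOEXEC); }

// Reactor side: add one to the counter, waking a blocked reader if any.
bool notify(int efd) {
    std::uint64_t one = 1;
    return ::write(efd, &one, sizeof(one)) == static_cast<ssize_t>(sizeof(one));
}

// Worker side: block until the counter is non-zero, then return its
// value and reset it to zero. Returns 0 on error.
std::uint64_t wait_event(int efd) {
    std::uint64_t n = 0;
    if (::read(efd, &n, sizeof(n)) != static_cast<ssize_t>(sizeof(n))) return 0;
    return n;
}
```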
The biggest and the most important data structure in yrmcds is clearly the hash map of objects. To reduce contention between worker threads, the hash does not use a single lock to protect the data. Instead, each bucket has a lock to protect objects in the bucket and the bucket itself.
One drawback is that resizing the hash is almost impossible. In the real implementation, yrmcds statically allocates a large number of buckets.
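A minimal sketch of a hash map with one lock per bucket and a fixed bucket count. `BucketedMap` is a hypothetical name, and the per-bucket chain is a plain vector for brevity. The real object layout in yrmcds is not shown in this document.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

class BucketedMap {
public:
    // The number of buckets is fixed at construction; no resizing.
    explicit BucketedMap(std::size_t n_buckets) : buckets_(n_buckets) {}

    void set(const std::string& key, std::string value) {
        Bucket& b = bucket_for(key);
        std::lock_guard<std::mutex> guard(b.lock);   // locks this bucket only
        for (auto& kv : b.items) {
            if (kv.first == key) { kv.second = std::move(value); return; }
        }
        b.items.emplace_back(key, std::move(value));
    }

    bool get(const std::string& key, std::string& out) {
        Bucket& b = bucket_for(key);
        std::lock_guard<std::mutex> guard(b.lock);
        for (const auto& kv : b.items) {
            if (kv.first == key) { out = kv.second; return true; }
        }
        return false;
    }

    bool remove(const std::string& key) {
        Bucket& b = bucket_for(key);
        std::lock_guard<std::mutex> guard(b.lock);
        for (std::size_t i = 0; i < b.items.size(); ++i) {
            if (b.items[i].first == key) {
                b.items.erase(b.items.begin() + static_cast<std::ptrdiff_t>(i));
                return true;
            }
        }
        return false;
    }

private:
    struct Bucket {
        std::mutex lock;
        std::vector<std::pair<std::string, std::string>> items;
    };

    Bucket& bucket_for(const std::string& key) {
        return buckets_[std::hash<std::string>{}(key) % buckets_.size()];
    }

    std::vector<Bucket> buckets_;
};
```

Two workers touching keys in different buckets never contend. The price is that the bucket array cannot be reallocated while other threads may hold bucket locks.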
yrmcds is essentially an object cache system. Objects whose lifetime has expired need to be removed from the cache. If the cache is full of objects, least-recently-used (LRU) objects should be removed.
For such housekeeping, a dedicated thread called the GC thread is used. It scans the whole hash to detect and remove expired objects.
To implement the LRU cache, each object has a counter that is incremented at every GC. The counter is reset to zero when a worker accesses the object. When the cache is full, the GC thread removes objects whose LRU counter value is high.
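The counter-based LRU can be sketched in single-threaded form as below. The names `Object`, `touch`, `gc`, and the `max_age` threshold are assumptions for illustration. How yrmcds decides that a counter is "high" is not specified in this document, and locking is omitted.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <unordered_map>

struct Object {
    std::string data;
    long expires_at = 0;   // 0 means "never expires"
    unsigned age = 0;      // number of GC passes since the last access
};

using Table = std::unordered_map<std::string, Object>;

// Worker side: accessing an object resets its LRU counter.
void touch(Object& o) { o.age = 0; }

// One GC pass: increment every counter, remove expired objects, and,
// when the cache is full, remove objects whose counter exceeds max_age.
// Returns the number of removed objects.
std::size_t gc(Table& table, long now, bool cache_full, unsigned max_age) {
    std::size_t removed = 0;
    for (auto it = table.begin(); it != table.end();) {
        Object& o = it->second;
        ++o.age;
        bool expired = o.expires_at != 0 && o.expires_at <= now;
        bool stale = cache_full && o.age > max_age;
        if (expired || stale) {
            it = table.erase(it);
            ++removed;
        } else {
            ++it;
        }
    }
    return removed;
}
```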
A GC thread is started by the reactor thread only when there is no running GC thread. The reactor thread passes the current list of slaves to the GC thread in order to replicate object removals. If a slave is added while a GC thread is running, that slave may fail to remove some objects, which is not a big problem.
When a slave connects to the master, it needs to receive all data in the master. yrmcds reuses the GC thread for this initial replication because the GC thread scans the entire hash.
While GC is running, the reactor and worker threads also keep running. The GC thread and worker threads therefore need to be synchronized when sending data of the same object. This can be achieved by keeping the lock of a hash bucket acquired while an object is being sent.
Sockets connected to slaves are used to send a lot of replication data. Further, worker threads serving client requests need to access replication sockets to send updates to slaves.
Another difference from client sockets is the possible number of sockets. The number of replication sockets can be limited to a fairly small value, say 5.
Worker threads need to have references to all replication sockets. This can be done without any locks; the reactor passes a copy of the references along with the socket and received data. However, this can introduce a race between workers and the GC thread.
If a new GC thread starts the initial replication process for a new slave while some worker threads holding old copies of the replication sockets are still executing their jobs, those workers would fail to send some objects to the new slave.
To resolve this race, we need to guarantee that all worker threads are idle or have the latest copies of replication sockets. This can be implemented as:
- Accept a new replication socket and add it to the list of replication sockets. At this point, the GC thread must not begin the initial replication for this new slave.
- The reactor thread puts a synchronization request. The request will be satisfied once the reactor thread observes that every worker thread has become idle.
- The reactor thread adds the new replication socket to the confirmed list.
- At the next GC, the reactor thread requests initial replication for sockets stored in the confirmed list.
As yrmcds needs to handle a lot of clients, giving every socket a large send buffer is not a good idea. Sockets connected to clients can therefore have only temporarily allocated memory for pending data.
On the other hand, sockets connected to slaves are few, hence they can statically allocate large memory for pending send data.
In either case, a sending thread needs to wait for the reactor to send pending data and clear the buffer when there is not enough room.
To achieve the best TCP performance, we use the TCP_CORK option, although it is Linux-specific.
With TCP_CORK turned on, the TCP send buffer needs to be flushed explicitly. The buffer should be flushed only when all data are successfully sent without seeing EWOULDBLOCK.
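On Linux, queued partial frames are sent when TCP_CORK is cleared, so one way to flush explicitly is to turn the option off and on again. The helpers below are a sketch under that assumption. `set_cork` and `flush_cork` are hypothetical names, and error handling is reduced to a boolean.

```cpp
#include <cassert>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>

// Enable or disable TCP_CORK on a TCP socket.
bool set_cork(int fd, bool on) {
    int value = on ? 1 : 0;
    return ::setsockopt(fd, IPPROTO_TCP, TCP_CORK, &value, sizeof(value)) == 0;
}

// Flush corked data by clearing the option, then cork again so that
// subsequent small writes are coalesced as before.
bool flush_cork(int fd) {
    return set_cork(fd, false) && set_cork(fd, true);
}
```

A sender would call `flush_cork` only after the whole response has been handed to the kernel without hitting EWOULDBLOCK.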
To put a complete response data with one call, the socket object should provide an API similar to writev.
Sockets connected to clients may be shared by the reactor thread and a worker thread. Sockets connected to slaves may be shared by the reactor thread, worker threads, and the initial replication thread.
For this reason, only the reactor thread closes the file descriptors of sockets. This is because the reactor thread manages a mapping between file descriptors and resource objects including sockets. If another thread closes a file descriptor of a resource, the same file descriptor number may be reused by the operating system, which would break the mapping. Sockets are therefore closed in the following steps:
- Sockets can be invalidated by any thread. Invalidated sockets refuse further access to them.
- Inform the reactor thread of invalidated sockets.
- The reactor thread
  - removes invalidated sockets from the internal mapping,
  - closes the file descriptor, then
  - adds them to a list of resources pending destruction.
At some point when there is no running GC thread, the reactor thread can put a new synchronization request. To optimize memory allocations, the reactor thread will not put such a request if there are any pending requests. This way, the reactor can save the current pending destruction list by swapping contents with a pre-allocated save list.
Once the reactor observes that every worker thread has been idle, resources in the save list can be destructed safely.
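The pending and save lists could be sketched as follows. `PendingDestruction`, `Resource`, and the method names are hypothetical. The destruction counter exists only to make the behavior observable in this example.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Stand-in for a closed socket or other resource awaiting destruction.
struct Resource {
    static int destroyed;
    ~Resource() { ++destroyed; }
};
int Resource::destroyed = 0;

class PendingDestruction {
public:
    explicit PendingDestruction(std::size_t reserve) {
        pending_.reserve(reserve);
        saved_.reserve(reserve);
    }

    // Called by the reactor after it has closed the file descriptor.
    void add(std::unique_ptr<Resource> r) { pending_.push_back(std::move(r)); }

    // Put a synchronization request unless one is already pending.
    // The current pending list is saved by swapping buffers.
    bool request_sync() {
        if (sync_pending_ || pending_.empty()) return false;
        pending_.swap(saved_);    // saved_ is empty at this point
        sync_pending_ = true;
        return true;
    }

    // Called once every worker has been observed idle: no worker can
    // still hold a reference, so the saved resources are destructed.
    void on_all_workers_idle() {
        saved_.clear();
        sync_pending_ = false;
    }

private:
    std::vector<std::unique_ptr<Resource>> pending_;
    std::vector<std::unique_ptr<Resource>> saved_;
    bool sync_pending_ = false;
};
```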
To avoid excessive contention between the reactor and worker threads, a blocking job queue is not used between them. Instead, the reactor thread loops over atomic flags for each worker thread to identify if a worker is idle or busy.
For the same reason, we do not use barrier synchronization between the reactor and worker threads. Instead, when there is a synchronization request, the reactor thread checks whether every worker thread is idle or becomes idle. Once it confirms that every worker thread has been idle at least once after the synchronization request, it starts the job associated with the request.
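A sketch of such a synchronization request, assuming a hypothetical `SyncRequest` class that the reactor polls on each loop iteration. A worker only needs to be seen idle once; it may become busy again before the other workers are seen idle.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

struct WorkerState {
    std::atomic<bool> idle{true};
};

class SyncRequest {
public:
    SyncRequest(std::size_t n_workers, std::function<void()> job)
        : seen_idle_(n_workers, false), job_(std::move(job)) {}

    // Called by the reactor in its loop. Runs the job exactly once, after
    // every worker has been observed idle at least once since the request.
    bool poll(const std::vector<WorkerState>& workers) {
        if (done_) return true;
        bool all_seen = true;
        for (std::size_t i = 0; i < workers.size(); ++i) {
            if (!seen_idle_[i] && workers[i].idle.load(std::memory_order_acquire))
                seen_idle_[i] = true;
            all_seen = all_seen && seen_idle_[i];
        }
        if (all_seen) {
            job_();
            done_ = true;
        }
        return done_;
    }

private:
    std::vector<bool> seen_idle_;
    std::function<void()> job_;
    bool done_ = false;
};
```

The reactor never blocks here: an unsatisfied request simply stays pending until a later loop iteration.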
Slaves do accept connection requests from memcache clients, but they close the connections immediately. This way, slaves can essentially be single-threaded. As long as the server is a slave, the internal hash need not be guarded by locks. For this reason, the internal hash should provide lock-free versions of its methods.
All threads are started by the reactor thread. The reactor thread is therefore responsible for joining them.
All threads except for the reactor thread may block on either a condition variable of a socket or the eventfd file descriptor. For sockets, the reactor thread signals the condition variable to unblock threads when it closes the socket. For eventfd descriptors, the reactor thread simply writes to it.
When the program exits, the reactor thread executes the following termination process:
- sets termination flags of all worker threads,
- writes to eventfd descriptors to unblock worker threads,
- invalidates all resources to unblock worker threads,
- joins with the GC thread, if any, then
- destructs resources.
Data that are too large should be stored in a temporary file. The temporary file should be unlinked soon after creation so that it is removed automatically when the program exits.
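One common way to get this behavior on POSIX systems is to create the file with mkstemp and unlink it right away. The open descriptor keeps the data accessible until it is closed or the process exits. The function name and the `yrmcds_XXXXXX` file name template below are assumptions for illustration.

```cpp
#include <cassert>
#include <stdlib.h>
#include <string>
#include <unistd.h>

// Create a temporary file under dir and unlink it immediately.
// Returns an open file descriptor, or -1 on failure.
int open_unlinked_tempfile(const std::string& dir) {
    std::string path = dir + "/yrmcds_XXXXXX";   // hypothetical name template
    int fd = ::mkstemp(&path[0]);                // replaces XXXXXX in place
    if (fd < 0) return -1;
    ::unlink(path.c_str());                      // the name goes away; the data stays
    return fd;
}
```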
The master runs the following threads:
- the reactor thread,
- worker threads to process client requests, and
- a GC thread.
Slaves run the reactor (main) thread only.
The master has the following sockets:
- a listening socket for clients,
- a listening socket for slaves,
- sockets connected to clients, and
- sockets connected to slaves.
A slave has the following sockets:
- a listening socket for clients, and
- a socket connected to the master.
- Connected sockets.
- The object hash map.
- Close request queue in the reactor.
- Other less significant resources such as statistics counters.
- The reactor thread
  - acquires a lock of a socket to send data.
  - acquires a spinlock of socket close request queue.
- A worker thread
  - acquires a lock of a hash bucket.
  - acquires a lock of a socket.
  - acquires a lock of a socket to send data independent of cached objects.
  - acquires a spinlock of the reactor to put socket close request.
  - acquires a lock of a hash bucket.
- A GC thread
  - acquires a lock of a hash bucket.
  - acquires locks of sockets connected to slaves.
  - acquires a spinlock of the reactor to put socket close request.
  - acquires a lock of a hash bucket.