Skip to content

Commit

Permalink
Merge branch 'redis:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
J0sueTM authored Aug 14, 2024
2 parents 288617c + cd9a1ab commit 3aa572a
Show file tree
Hide file tree
Showing 10 changed files with 207 additions and 6 deletions.
65 changes: 65 additions & 0 deletions src/main/java/redis/clients/jedis/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
Expand Down Expand Up @@ -37,6 +38,8 @@ public class Connection implements Closeable {
private int soTimeout = 0;
private int infiniteSoTimeout = 0;
private boolean broken = false;
private boolean strValActive;
private String strVal;

public Connection() {
this(Protocol.DEFAULT_HOST, Protocol.DEFAULT_PORT);
Expand Down Expand Up @@ -72,6 +75,63 @@ public String toString() {
return "Connection{" + socketFactory + "}";
}

public String toIdentityString() {
if (strValActive == broken && strVal != null) {
return strVal;
}

int id = hashCode();
String classInfo = getClass().toString();

if (socket == null) {
StringBuilder buf = new StringBuilder(56)
.append("[")
.append(classInfo)
.append(", id: 0x")
.append(id)
.append(']');
return buf.toString();
}

SocketAddress remoteAddr = socket.getRemoteSocketAddress();
SocketAddress localAddr = socket.getLocalSocketAddress();
if (remoteAddr != null) {
StringBuilder buf = new StringBuilder(101)
.append("[")
.append(classInfo)
.append(", id: 0x")
.append(id)
.append(", L:")
.append(localAddr)
.append(broken? " ! " : " - ")
.append("R:")
.append(remoteAddr)
.append(']');
strVal = buf.toString();
} else if (localAddr != null) {
StringBuilder buf = new StringBuilder(64)
.append("[")
.append(classInfo)
.append(", id: 0x")
.append(id)
.append(", L:")
.append(localAddr)
.append(']');
strVal = buf.toString();
} else {
StringBuilder buf = new StringBuilder(56)
.append("[")
.append(classInfo)
.append(", id: 0x")
.append(id)
.append(']');
strVal = buf.toString();
}

strValActive = broken;
return strVal;
}

public final RedisProtocol getRedisProtocol() {
return protocol;
}
Expand Down Expand Up @@ -435,6 +495,11 @@ private void initializeFromClientConfig(final JedisClientConfig config) {
}
}

// set readonly flag to ALL connections (including master nodes) when enable read from replica
if (config.isReadOnlyForReplica()) {
fireAndForgetMsg.add(new CommandArguments(Command.READONLY));
}

for (CommandArguments arg : fireAndForgetMsg) {
sendCommand(arg);
}
Expand Down
25 changes: 21 additions & 4 deletions src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ public final class DefaultJedisClientConfig implements JedisClientConfig {

private final ClientSetInfoConfig clientSetInfoConfig;

private final boolean readOnlyForReplica;

private DefaultJedisClientConfig(RedisProtocol protocol, int connectionTimeoutMillis, int soTimeoutMillis,
int blockingSocketTimeoutMillis, Supplier<RedisCredentials> credentialsProvider, int database,
String clientName, boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
HostnameVerifier hostnameVerifier, HostAndPortMapper hostAndPortMapper,
ClientSetInfoConfig clientSetInfoConfig) {
ClientSetInfoConfig clientSetInfoConfig, boolean readOnlyForReplica) {
this.redisProtocol = protocol;
this.connectionTimeoutMillis = connectionTimeoutMillis;
this.socketTimeoutMillis = soTimeoutMillis;
Expand All @@ -44,6 +46,7 @@ private DefaultJedisClientConfig(RedisProtocol protocol, int connectionTimeoutMi
this.hostnameVerifier = hostnameVerifier;
this.hostAndPortMapper = hostAndPortMapper;
this.clientSetInfoConfig = clientSetInfoConfig;
this.readOnlyForReplica = readOnlyForReplica;
}

@Override
Expand Down Expand Up @@ -122,6 +125,11 @@ public ClientSetInfoConfig getClientSetInfoConfig() {
return clientSetInfoConfig;
}

@Override
public boolean isReadOnlyForReplica() {
return readOnlyForReplica;
}

public static Builder builder() {
return new Builder();
}
Expand Down Expand Up @@ -149,6 +157,8 @@ public static class Builder {

private ClientSetInfoConfig clientSetInfoConfig = ClientSetInfoConfig.DEFAULT;

private boolean readOnlyForReplicas = false;

private Builder() {
}

Expand All @@ -160,7 +170,8 @@ public DefaultJedisClientConfig build() {

return new DefaultJedisClientConfig(redisProtocol, connectionTimeoutMillis, socketTimeoutMillis,
blockingSocketTimeoutMillis, credentialsProvider, database, clientName, ssl,
sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, clientSetInfoConfig);
sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, clientSetInfoConfig,
readOnlyForReplicas);
}

/**
Expand Down Expand Up @@ -255,6 +266,11 @@ public Builder clientSetInfoConfig(ClientSetInfoConfig setInfoConfig) {
this.clientSetInfoConfig = setInfoConfig;
return this;
}

public Builder readOnlyForReplicas() {
this.readOnlyForReplicas = true;
return this;
}
}

public static DefaultJedisClientConfig create(int connectionTimeoutMillis, int soTimeoutMillis,
Expand All @@ -264,7 +280,8 @@ public static DefaultJedisClientConfig create(int connectionTimeoutMillis, int s
return new DefaultJedisClientConfig(null,
connectionTimeoutMillis, soTimeoutMillis, blockingSocketTimeoutMillis,
new DefaultRedisCredentialsProvider(new DefaultRedisCredentials(user, password)), database,
clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, null);
clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, null,
false);
}

public static DefaultJedisClientConfig copyConfig(JedisClientConfig copy) {
Expand All @@ -273,6 +290,6 @@ public static DefaultJedisClientConfig copyConfig(JedisClientConfig copy) {
copy.getBlockingSocketTimeoutMillis(), copy.getCredentialsProvider(),
copy.getDatabase(), copy.getClientName(), copy.isSsl(), copy.getSslSocketFactory(),
copy.getSslParameters(), copy.getHostnameVerifier(), copy.getHostAndPortMapper(),
copy.getClientSetInfoConfig());
copy.getClientSetInfoConfig(), copy.isReadOnlyForReplica());
}
}
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/Jedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -3431,7 +3431,7 @@ public String shutdownAbort() {
* All the fields are in the form field:value
*
* <pre>
* edis_version:0.07
* redis_version:0.07
* connected_clients:1
* connected_slaves:0
* used_memory:3187
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/redis/clients/jedis/JedisClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ default HostAndPortMapper getHostAndPortMapper() {
return null;
}

default boolean isReadOnlyForReplica() {
return false;
}

/**
* Modify the behavior of internally executing CLIENT SETINFO command.
* @return CLIENT SETINFO config
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.executors.ClusterCommandExecutor;
import redis.clients.jedis.providers.ClusterConnectionProvider;
import redis.clients.jedis.util.JedisClusterCRC16;

Expand Down Expand Up @@ -266,4 +267,11 @@ public ClusterPipeline pipelined() {
public AbstractTransaction transaction(boolean doMulti) {
throw new UnsupportedOperationException();
}

public final <T> T executeCommandToReplica(CommandObject<T> commandObject) {
if (!(executor instanceof ClusterCommandExecutor)) {
throw new UnsupportedOperationException("Support only execute to replica in ClusterCommandExecutor");
}
return ((ClusterCommandExecutor) executor).executeCommandToReplica(commandObject);
}
}
34 changes: 34 additions & 0 deletions src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class JedisClusterInfoCache {
private final Map<String, ConnectionPool> nodes = new HashMap<>();
private final ConnectionPool[] slots = new ConnectionPool[Protocol.CLUSTER_HASHSLOTS];
private final HostAndPort[] slotNodes = new HostAndPort[Protocol.CLUSTER_HASHSLOTS];
private final List<ConnectionPool>[] replicaSlots;

private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
Expand Down Expand Up @@ -85,6 +86,11 @@ public JedisClusterInfoCache(final JedisClientConfig clientConfig,
topologyRefreshExecutor.scheduleWithFixedDelay(new TopologyRefreshTask(), topologyRefreshPeriod.toMillis(),
topologyRefreshPeriod.toMillis(), TimeUnit.MILLISECONDS);
}
if (clientConfig.isReadOnlyForReplica()) {
replicaSlots = new ArrayList[Protocol.CLUSTER_HASHSLOTS];
} else {
replicaSlots = null;
}
}

/**
Expand Down Expand Up @@ -144,6 +150,8 @@ public void discoverClusterNodesAndSlots(Connection jedis) {
setupNodeIfNotExist(targetNode);
if (i == MASTER_NODE_INDEX) {
assignSlotsToNode(slotNums, targetNode);
} else if (clientConfig.isReadOnlyForReplica()) {
assignSlotsToReplicaNode(slotNums, targetNode);
}
}
}
Expand Down Expand Up @@ -236,6 +244,8 @@ private void discoverClusterSlots(Connection jedis) {
setupNodeIfNotExist(targetNode);
if (i == MASTER_NODE_INDEX) {
assignSlotsToNode(slotNums, targetNode);
} else if (clientConfig.isReadOnlyForReplica()) {
assignSlotsToReplicaNode(slotNums, targetNode);
}
}
}
Expand Down Expand Up @@ -307,6 +317,21 @@ public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode)
}
}

public void assignSlotsToReplicaNode(List<Integer> targetSlots, HostAndPort targetNode) {
w.lock();
try {
ConnectionPool targetPool = setupNodeIfNotExist(targetNode);
for (Integer slot : targetSlots) {
if (replicaSlots[slot] == null) {
replicaSlots[slot] = new ArrayList<>();
}
replicaSlots[slot].add(targetPool);
}
} finally {
w.unlock();
}
}

public ConnectionPool getNode(String nodeKey) {
r.lock();
try {
Expand Down Expand Up @@ -338,6 +363,15 @@ public HostAndPort getSlotNode(int slot) {
}
}

public List<ConnectionPool> getSlotReplicaPools(int slot) {
r.lock();
try {
return replicaSlots[slot];
} finally {
r.unlock();
}
}

public Map<String, ConnectionPool> getNodes() {
r.lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ public final <T> T broadcastCommand(CommandObject<T> commandObject) {

@Override
public final <T> T executeCommand(CommandObject<T> commandObject) {
return doExecuteCommand(commandObject, false);
}

public final <T> T executeCommandToReplica(CommandObject<T> commandObject) {
return doExecuteCommand(commandObject, true);
}

private <T> T doExecuteCommand(CommandObject<T> commandObject, boolean toReplica) {
Instant deadline = Instant.now().plus(maxTotalRetriesDuration);

JedisRedirectionException redirect = null;
Expand All @@ -88,7 +96,8 @@ public final <T> T executeCommand(CommandObject<T> commandObject) {
connection.executeCommand(Protocol.Command.ASKING);
}
} else {
connection = provider.getConnection(commandObject.getArguments());
connection = toReplica ? provider.getReplicaConnection(commandObject.getArguments())
: provider.getConnection(commandObject.getArguments());
}

return execute(connection, commandObject);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.ClusterCommandArguments;
Expand Down Expand Up @@ -102,6 +104,11 @@ public Connection getConnection(CommandArguments args) {
return slot >= 0 ? getConnectionFromSlot(slot) : getConnection();
}

public Connection getReplicaConnection(CommandArguments args) {
final int slot = ((ClusterCommandArguments) args).getCommandHashSlot();
return slot >= 0 ? getReplicaConnectionFromSlot(slot) : getConnection();
}

@Override
public Connection getConnection() {
// In antirez's redis-rb-cluster implementation, getRandomConnection always
Expand Down Expand Up @@ -158,6 +165,25 @@ public Connection getConnectionFromSlot(int slot) {
}
}

public Connection getReplicaConnectionFromSlot(int slot) {
List<ConnectionPool> connectionPools = cache.getSlotReplicaPools(slot);
ThreadLocalRandom random = ThreadLocalRandom.current();
if (connectionPools != null && !connectionPools.isEmpty()) {
// pick up randomly a connection
int idx = random.nextInt(connectionPools.size());
return connectionPools.get(idx).getResource();
}

renewSlotCache();
connectionPools = cache.getSlotReplicaPools(slot);
if (connectionPools != null && !connectionPools.isEmpty()) {
int idx = random.nextInt(connectionPools.size());
return connectionPools.get(idx).getResource();
}

return getConnectionFromSlot(slot);
}

@Override
public Map<String, ConnectionPool> getConnectionMap() {
return Collections.unmodifiableMap(getNodes());
Expand Down
11 changes: 11 additions & 0 deletions src/test/java/redis/clients/jedis/ConnectionTest.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package redis.clients.jedis;

import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

import redis.clients.jedis.exceptions.JedisConnectionException;
Expand Down Expand Up @@ -40,4 +41,14 @@ public void checkCloseable() {
client.connect();
client.close();
}

@Test
public void checkIdentityString() {
client = new Connection("localhost", 6379);
Assert.assertFalse(client.toIdentityString().contains("6379"));
client.connect();
Assert.assertTrue(client.toIdentityString().contains("6379"));
client.close();
Assert.assertTrue(client.toIdentityString().contains("6379"));
}
}
Loading

0 comments on commit 3aa572a

Please sign in to comment.