前言

Jedis客户端提供的API比较全面,方法名与redis的命令名非常相似,了解redis的命令也就能熟练使用Jedis客户端。由于提供的功能偏低层,springboot不支持依赖自动装配,且存取对象需要自己手动组装或转化,因此很少有直接使用jedis。

1.依赖

1
2
3
4
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>

2.单机版配置

1.1 配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# IP地址
spring.jedis.host=127.0.0.1

# 端口
spring.jedis.port=6379

# 连接密码
spring.jedis.password=123456

# 连接超时时间
spring.jedis.timeout=2000

# 连接池最大连接数(使用负值表示没有限制)
spring.jedis.pool.max-total=200

# 连接池最大空闲连接
spring.jedis.pool.max-idle=10

# 连接池最小空闲连接
spring.jedis.max-idle=5

# 获取连接最大等待时间
spring.jedis.max-wait-millis=10000

1.2 Java配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Configuration
public class JedisConfig{

@Value("${spring.jedis.host}")
private String host;

@Value("${spring.jedis.port}")
private int port;

@Value("${spring.jedis.password}")
private String password;

@Value("${spring.jedis.timeout}")
private int timeout;

@Value("${spring.jedis.pool.max-total}")
private int maxTotal;

@Value("${spring.jedis.pool.max-idle}")
private int maxIdle;

@Value("${spring.jedis.pool.min-idle}")
private int minIdle;

@Value("${spring.jedis.max-wait-millis}")
private long maxWaitMillis;

@Bean
public JedisPool jedisPool(){
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(maxTotal);
jedisPoolConfig.setMaxIdle(maxIdle);
jedisPoolConfig.setMinIdle(minIdle);
jedisPoolConfig.setMaxWaitMillis(maxWaitMillis);
return new JedisPool(jedisPoolConfig,host,port,timeout,password);
}
}

1.3 读写分离

单机、主从运行模式都是连接固定服务器地址,因此实现读写分只需要创建两个JedisPool即可。

2.哨兵版配置

2.1 配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 哨兵节点
spring.jedis.sentinel.nodes=127.0.0.1:26379,127.0.0.2:26379,127.0.0.3:26379

# 哨兵主节点名称,用于指定连接的某个主从组合
spring.jedis.sentinel.master.name=abc

# 连接密码
spring.jedis.password=123456

# 连接超时时间
spring.jedis.timeout=2000

# 连接池最大连接数(使用负值表示没有限制)
spring.jedis.pool.max-total=200

# 连接池最大空闲连接
spring.jedis.pool.max-idle=10

# 连接池最小空闲连接
spring.jedis.max-idle=5

# 获取连接最大等待时间
spring.jedis.max-wait-millis=10000

2.1 Java配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Configuration
public class JedisConfig{

@Value("#{'${spring.jedis.sentinel.nodes}'.split(',')}")
private List<String> nodeList;

@Value("${spring.jedis.sentinel.master.name}")
private String masterName;

@Value("${spring.jedis.password}")
private String password;

@Value("${spring.jedis.timeout}")
private int timeout;

@Value("${spring.jedis.pool.max-total}")
private int maxTotal;

@Value("${spring.jedis.pool.max-idle}")
private int maxIdle;

@Value("${spring.jedis.pool.min-idle}")
private int minIdle;

@Value("${spring.jedis.max-wait-millis}")
private long maxWaitMillis;

@Bean
public JedisSentinelPool jedisPoolConfig(){

// 连接池参数
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(maxTotal);
jedisPoolConfig.setMaxIdle(maxIdle);
jedisPoolConfig.setMinIdle(minIdle);
jedisPoolConfig.setMaxWaitMillis(maxWaitMillis);

// 哨兵节点
Set<String> sentinels = new HashSet<>(nodeList);

// 哨兵连接池
return new JedisSentinelPool(masterName, sentinels, jedisPoolConfig, password);
}
}

2.3 感知节点变化

哨兵模式的配置中,JedisSentinelPool对象的构造器会为每个masterName下的每个sentinel节点开启一个线程,通过pub/sub模式订阅master节点的变化,接收到变化信息后重置jedis连接池信息。

监听器为JedisSentinelPool的内部类MasterListener,线程的run()源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public void run() {

// 1.设置线程正常运行
this.running.set(true);

// 2.正常运行情况下,进入循环
while(this.running.get()) {

// 3.通过哨兵IP、端口与服务器建立连接
this.j = new Jedis(this.host, this.port);

try {

// 4.再次判断是否满足运行条件
if (!this.running.get()) {
break;
}

// 5.订阅master节点变化频道
this.j.subscribe(new JedisPubSub() {

// 订阅到消息后的处理逻辑
public void onMessage(String channel, String message) {

// 打印日志
JedisSentinelPool.this.log.fine("Sentinel " + MasterListener.this.host + ":" + MasterListener.this.port + " published: " + message + ".");

// 5.1 解析订阅返回的消息字符串,格式:
String[] switchMasterMsg = message.split(" ");

// 5.2 分割后的数据长度小于3代表正常格式,大于等于3代表无效返回结果
if (switchMasterMsg.length > 3) {

// 5.3 再次判断masterName是否为自己监听的,估计是怕乱传数据
if (MasterListener.this.masterName.equals(switchMasterMsg[0])) {

// 5.4 提取新master节点的IP、端口,作为参数调用initPool()方法重置连接
JedisSentinelPool.this.initPool(JedisSentinelPool.this.toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4])));
} else {
JedisSentinelPool.this.log.fine("Ignoring message on +switch-master for master name " + switchMasterMsg[0] + ", our master name is " + MasterListener.this.masterName);
}
} else {
JedisSentinelPool.this.log.severe("Invalid message received on Sentinel " + MasterListener.this.host + ":" + MasterListener.this.port + " on channel +switch-master: " + message);
}
}

// "+switch-master",就是订阅的channel名称
}, new String[]{"+switch-master"});
} catch (JedisConnectionException var8) {

// 6.如果发生连接异常,且监听逻辑仍然允许运行
if (this.running.get()) {

// 打印日志
JedisSentinelPool.this.log.log(Level.SEVERE, "Lost connection to Sentinel at " + this.host + ":" + this.port + ". Sleeping 5000ms and retrying.", var8);

// 7.暂停固定时间后,进入下次while循环继续监听
try {
Thread.sleep(this.subscribeRetryWaitTimeMillis);
} catch (InterruptedException var7) {
JedisSentinelPool.this.log.log(Level.SEVERE, "Sleep interrupted: ", var7);
}
} else {
JedisSentinelPool.this.log.fine("Unsubscribing from Sentinel at " + this.host + ":" + this.port);
}
} finally {
// 8.关闭连接
this.j.close();
}
}
}

再来看看initPool()方法源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private void initPool(HostAndPort master) {

// 只有订阅返回的master信息,与当前的不一致,才会进行重置
if (!master.equals(this.currentHostMaster)) {

// 重新赋值当前master节点指向实例
this.currentHostMaster = master;

// 重置Jedis工厂信息
if (this.factory == null) {
this.factory = new JedisFactory(master.getHost(), master.getPort(), this.connectionTimeout, this.soTimeout, this.password, this.database, this.clientName, false, (SSLSocketFactory)null, (SSLParameters)null, (HostnameVerifier)null);
this.initPool(this.poolConfig, this.factory);
} else {
this.factory.setHostAndPort(this.currentHostMaster);
this.internalPool.clear();
}

// 打印变化日志
this.log.info("Created JedisPool to master at " + master);
}

}

2.4 读写分离

之前在写Redis运行模式的时候有讲过,Redis服务端不支持读写分离功能,需要客户端自己去实现。然而Jedis客户端也不会自动对请求进行读写分离,具体情况还得看源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 获取一个Jedis连接对象
public Jedis getResource() {
while(true) {

// 从池子里获取一个Jedis连接(连接池全都是master连接)
Jedis jedis = (Jedis)super.getResource();
jedis.setDataSource(this);

// 获取当前master信息
HostAndPort master = this.currentHostMaster;

// 如果连接池取出的Jedis连接信息,与当前master的连接一致时,才返回
HostAndPort connection = new HostAndPort(jedis.getClient().getHost(), jedis.getClient().getPort());
if (master.equals(connection)) {
return jedis;
}

// 到这里说明不一致,肯定是master挂掉了,选举出了新master,连接池的数据还没来得及更新,
// 特殊处理后进入下次循环,确保返回的一定是master的Jedis连接
this.returnBrokenResource(jedis);
}
}

源码可以看出,实现读写分离只能手动配置。实现方案也很简单,Jedis提供了sentinelSlaves()方法,可以通过哨兵节点获取到所有Slave节点信息,然后为每个Slave节点创建JedisPool对象,多个Slave就自己写代码实现负载均衡,另外照抄一份MasterListener类的订阅代码,及时更新从节点信息即可。

由于Jedis客户端很少直接使用,再加上Redis读写分离的应用不是很普遍,这里就不自己写代码实现了。

3.集群版配置

3.1 配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 集群节点
spring.jedis.nodes=127.0.0.1:6379,127.0.0.2:6379,127.0.0.3:6379

# 连接密码
spring.jedis.password=123456

# 连接超时时间
spring.jedis.timeout=2000

# 连接池最大连接数(使用负值表示没有限制)
spring.jedis.pool.max-total=200

# 连接池最大空闲连接
spring.jedis.pool.max-idle=10

# 连接池最小空闲连接
spring.jedis.max-idle=5

# 获取连接最大等待时间
spring.jedis.max-wait-millis=10000

3.2 Java配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Configuration
public class JedisConfig{

@Value("${spring.jedis.nodes}")
private String nodes;

@Value("${spring.jedis.password}")
private String password;

@Value("${spring.jedis.timeout}")
private int timeout;

@Value("${spring.jedis.pool.max-total}")
private int maxTotal;

@Value("${spring.jedis.pool.max-idle}")
private int maxIdle;

@Value("${spring.jedis.pool.min-idle}")
private int minIdle;

@Value("${spring.jedis.max-wait-millis}")
private long maxWaitMillis;

@Bean
public JedisCluster jedisCluster(){

JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(maxTotal);
jedisPoolConfig.setMaxIdle(maxIdle);
jedisPoolConfig.setMinIdle(minIdle);
jedisPoolConfig.setMaxWaitMillis(maxWaitMillis);

String[] nodesArray = nodes.split(",");
Set<HostAndPort> nodeSet = new HashSet<>();
for(int i = 0; i < nodesArray.length; i++){
String[] nodeInfo = nodesArray[i].split(":");
String host = nodeInfo[0];
Integer port = Integer.valueOf(nodeInfo[1]);
nodeSet.add(new HostAndPort(host, port));
}

return new JedisPool(nodeSet,timeout,jedisPoolConfig);
}
}

3.3 读写分离

集群模式的读写分离,除了要考虑主节点的变化外,还需要考虑到卡槽的路由,总之实现起来较为复杂,以后有空再写吧。

4.线程安全

使用Jedis对象直连服务端,多线程调用情况下会造成线程安全问题。Jedis发送请求命令是通过Connection的connect()方法实现,具体源码:
图片

4.1 传输流安全

源码中可以看出方法内的outputStream、inputStream均为Jedis对象的成员变量,并且俩成员变量的赋值没有任何加锁措施,这意味着多个线程同时调用一个Jedis对象发送命令时,输入输出流的值会被不停覆盖引起数据错乱,也就是出现了线程安全问题。

4.2 网络连接安全

在较低版本的Jedis依赖中,Socket的创建并没有单独抽成一个方法,而是写在connect()方法内部,具体源码:
图片

源码中可以看出Socket对象是先创建并赋值给socket成员变量,然后再建立长连接,最后处理连接的输入输出流。假设线程A创建完Socket对象并成功建立长连接,但是还没有调用getXXStream()方法,此时线程B也执行connect方法创建Socket对象并重新赋值socket,由于线程B还没来得及建立长连接,线程A就开始调用getXXStream()方法,因此会抛出java.net.SocketException: Socket is not connected异常。

4.3 解决方案

解决线程安全的方案是通过JedisPool连接池去管理Jedis实例,线程向服务器发送redis命令时,先通过连接池获取一个Jedis实例,使用完毕后归还连接池,这样就保证了一个Jedis实例同一时刻只会被一个线程调用。

评论

Powered By Valine
v1.4.14