Jedis 连接池

连接池是创建和管理一个连接的缓冲池的技术,这些连接准备好被任何需要它们的线程使用,使用连接池可以减少每次创建、消费连接消耗的时间,提高程序性能。

在上一章节介绍了两种方式通过 Jedis 去操作 Redis,可见 Jedis 的基本使用非常简单,只需要创建 Jedis 对象的时候指定 host,port, password 即可。当然,Jedis 对象又提供了很多构造方法,都大同小异,只是对应和 Redis 连接的 socket 的参数不一样而已。构造方法声明如下:

Jedis()
Jedis(Connection connection)
Jedis(HostAndPort hp)
Jedis(HostAndPort hostPort, JedisClientConfig config)
Jedis(JedisSocketFactory jedisSocketFactory)
Jedis(JedisSocketFactory jedisSocketFactory, JedisClientConfig clientConfig)
Jedis(String url)
Jedis(String host, int port)
Jedis(String host, int port, boolean ssl)
Jedis(String host, int port, boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier)
Jedis(String host, int port, int timeout)
Jedis(String host, int port, int timeout, boolean ssl)
Jedis(String host, int port, int timeout, boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier)
Jedis(String host, int port, int connectionTimeout, int soTimeout)
Jedis(String host, int port, int connectionTimeout, int soTimeout, boolean ssl)
Jedis(String host, int port, int connectionTimeout, int soTimeout, boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier)
Jedis(String host, int port, int connectionTimeout, int soTimeout, int infiniteSoTimeout)
Jedis(String host, int port, int connectionTimeout, int soTimeout, int infiniteSoTimeout, boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier)
Jedis(String host, int port, JedisClientConfig config)
Jedis(URI uri)
Jedis(URI uri, int timeout)
Jedis(URI uri, int connectionTimeout, int soTimeout)
Jedis(URI uri, int connectionTimeout, int soTimeout, int infiniteSoTimeout, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier)
Jedis(URI uri, int connectionTimeout, int soTimeout, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier)
Jedis(URI uri, int timeout, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier)
Jedis(URI uri, JedisClientConfig config)
Jedis(URI uri, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier)

虽然,Jedis 基本使用十分简单,在每次使用时,构建 Jedis 对象即可。在 Jedis 对象构建好之后,Jedis 底层会打开一条 Socket 通道和 Redis 服务进行连接。所以在使用完 Jedis 对象之后,需要调用 Jedis.close() 方法把连接关闭,才不会占用系统资源。

当然,如果应用非常频繁的创建和销毁 Jedis 对象,对应用的性能是很大影响的,因为构建 Socket 通道是很耗时的(类似数据库连接)。此时,我们需要改用 Jedis 连接池来减少 Socket 对象的创建和销毁过程。

Jedis 连接池是基于 apache-commons-pool2 实现的。在构建连接池对象的时候,需要提供连接池对象的配置对象,及 JedisPoolConfig(继承自 GenericObjectPoolConfig)。我们可以通过这个配置对象对连接池进行相关参数的配置,如最大连接数、最大空闲数等等。如下:

JedisPoolConfig config = new JedisPoolConfig();
// 设置最大空闲数
config.setMaxIdle(8);
// 设置最大连接数
config.setMaxTotal(18);

// 创建连接池
JedisPool pool = new JedisPool(config, "127.0.0.1", 6379, 2000, "password");
// 获取 Jedis 对象
Jedis jedis = pool.getResource();
String value = jedis.get("key");

// ...

// 释放资源
jedis.close();
pool.close();

使用 Jedis 连接池之后,在每次用完连接对象后一定要记得把连接归还给连接池。Jedis 对 close 方法进行了改造,如果是连接池中的连接对象,调用 close() 方法将会是把连接对象返回到对象池,若不是则关闭连接。源码如下:

@Override
public void close() {
 if (dataSource != null) {
   JedisPoolAbstract pool = this.dataSource;
   this.dataSource = null;
   if (client.isBroken()) {
     pool.returnBrokenResource(this);
   } else {
     pool.returnResource(this);
   }
 } else {
   super.close(); // 没有使用连接池,则直接关闭
 }
}

另外,当我们调用 JedisPool.getResource() 方法时,从对象池中获取 Jedis 连接时,将会对 dataSource 进行设置,设置成 JedisPool。源码如下:

@Override
public Jedis getResource() {
 Jedis jedis = super.getResource();
 jedis.setDataSource(this);
 return jedis;
}

注意:上面介绍的连接池仅仅指定了一个 Redis 服务的 Host 和 端口,属于单点 Redis 使用方式。

高可用连接

我们知道,连接池可以大大提高应用访问 Reids 服务的性能,减去大量的 Socket的 创建和销毁过程。

Redis 为了保障高可用,服务一般都是 Sentinel(哨兵)部署方式。当 Redis 服务中的主服务挂掉之后,会仲裁出另外一台 Slaves 服务充当 Master。此时,我们的应用将无法连接新的 Master 服务,因为旧的 Master 服务已经挂了。

为了解决这个问题,Jedis 提供了相应的 Sentinel 实现,能够在 Redis Sentinel 主从切换时候,通知我们的应用,把我们的应用连接到新的 Master 服务。

先看下怎么使用,如下:

Set<String> sentinels = new HashSet<>();
sentinels.add("172.18.18.207:26379");
sentinels.add("172.18.18.208:26379");

JedisPoolConfig config = new JedisPoolConfig();
config.setMaxIdle(5);
config.setMaxTotal(20);

JedisSentinelPool pool = new JedisSentinelPool("mymaster", sentinels, config);

Jedis jedis = pool.getResource();
jedis.set("jedis", "jedis");
//...

jedis.close();
pool.close();

注意:Jedis 版本必须 2.4.2 或更新版本。

Jedis Sentinel 的使用也是十分简单的,只是在 JedisPool 中添加了 Sentinel 和 MasterName 参数。Jedis Sentinel 底层是基于 Redis 订阅实现 Redis 主从服务的切换通知。当 Redis 发生主从切换时,Sentinel 会发送通知,主动通知 Jedis 进行连接的切换。JedisSentinelPool 在每次从连接池中获取连接对象的时候,都要对连接对象进行检测,如果此连接和 Sentinel 的 Master 服务连接参数不一致,则会关闭此连接,重新获取新的 Jedis 连接对象。

我们先看看 JedisSentinelPool 的 getResource() 方法源码,如下:

@Override
public Jedis getResource() {
 while (true) {
   Jedis jedis = super.getResource();
   jedis.setDataSource(this);

   // get a reference because it can change concurrently
   final HostAndPort master = currentHostMaster;
   final HostAndPort connection = new HostAndPort(jedis.getClient().getHost(), jedis.getClient()
       .getPort());

   if (master.equals(connection)) {
     // connected to the correct master
     return jedis;
   } else {
     returnBrokenResource(jedis);
   }
 }
}

当然,JedisSentinelPool 对象要时时监控 RedisSentinel 的主从切换。在其内部通过 Reids 的订阅实现。具体的实现看 JedisSentinelPool 的 initSentinels() 方法,如下:

// 初始化哨兵
// sentinels 哨兵集合
// masterName Master名称
private HostAndPort initSentinels(Set<String> sentinels, final String masterName) {

 HostAndPort master = null;
 boolean sentinelAvailable = false;

 // 试图从可用的哨兵中选择一个 master
 log.info("Trying to find master from available Sentinels...");

 for (String sentinel : sentinels) {
   final HostAndPort hap = HostAndPort.parseString(sentinel);

   // 连接到哨兵
   log.fine("Connecting to Sentinel " + hap);

   Jedis jedis = null;
   try {
     jedis = new Jedis(hap.getHost(), hap.getPort());

     // 根据 master 名称获取地址和端口信息
     // 127.0.0.1:26381> sentinel get-master-addr-by-name mymaster
     // 1) "127.0.0.1"
     // 2) "6379"
     List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);

     // connected to sentinel...
     sentinelAvailable = true;

     if (masterAddr == null || masterAddr.size() != 2) {
       log.warning("Can not get master addr, master name: " + masterName + ". Sentinel: " + hap
           + ".");
       continue;
     }

     // 找到了 master,将 IP 和端口组装成 HostAndPort
     master = toHostAndPort(masterAddr);
     log.fine("Found Redis master at " + master);
     break;
   } catch (JedisException e) {
     // resolves #1036, it should handle JedisException there's another chance
     // of raising JedisDataException
     log.warning("Cannot get master address from sentinel running @ " + hap + ". Reason: " + e
         + ". Trying next one.");
   } finally {
     if (jedis != null) {
       jedis.close();
     }
   }
 }

 // 没有获取到 master
 if (master == null) {
   if (sentinelAvailable) {
     // can connect to sentinel, but master name seems to not
     // monitored
     throw new JedisException("Can connect to sentinel, but " + masterName
         + " seems to be not monitored...");
   } else {
     throw new JedisConnectionException("All sentinels down, cannot determine where is "
         + masterName + " master is running...");
   }
 }

 log.info("Redis master running at " + master + ", starting Sentinel listeners...");

 //启动后台线程监控 RedisSentinal 的主从切换通知
 for (String sentinel : sentinels) {
   final HostAndPort hap = HostAndPort.parseString(sentinel);
   // protected class MasterListener extends Thread {...}
   MasterListener masterListener = new MasterListener(masterName, hap.getHost(), hap.getPort());
   // whether MasterListener threads are alive or not, process can be stopped
   masterListener.setDaemon(true);
   masterListeners.add(masterListener);
   masterListener.start();
 }

 return master;
}

可以看到,JedisSentinel 监控时使用 MasterListener 这个对象来实现的。看对应源码可以发现是基于 Redis 的订阅实现的,其订阅频道为 "+switch-master"。源码如下:

protected class MasterListener extends Thread {
 //...
 @Override
 public void run() {
   running.set(true);
   while (running.get()) {
     // 创建 Redis 连接
     j = new Jedis(host, port);
     try {
       // double check that it is not being shutdown
       if (!running.get()) {
         break;
       }

       // 订阅 "+switch-master" 频道
       j.subscribe(new JedisPubSub() {
         @Override
         public void onMessage(String channel, String message) {
           log.fine("Sentinel " + host + ":" + port + " published: " + message + ".");

           String[] switchMasterMsg = message.split(" ");
           if (switchMasterMsg.length > 3) {

             if (masterName.equals(switchMasterMsg[0])) {
               // 初始化连接池
               initPool(toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4])));
             } else {
               log.fine("Ignoring message on +switch-master for master name "
                   + switchMasterMsg[0] + ", our master name is " + masterName);
             }

           } else {
             log.severe("Invalid message received on Sentinel " + host + ":" + port
                 + " on channel +switch-master: " + message);
           }
         }
       }, "+switch-master");

     } catch (JedisConnectionException e) {

       if (running.get()) {
         log.log(Level.SEVERE, "Lost connection to Sentinel at " + host + ":" + port
             + ". Sleeping 5000ms and retrying.", e);
         try {
           Thread.sleep(subscribeRetryWaitTimeMillis);
         } catch (InterruptedException e1) {
           log.log(Level.SEVERE, "Sleep interrupted: ", e1);
         }
       } else {
         log.fine("Unsubscribing from Sentinel at " + host + ":" + port);
       }
     } finally {
       j.close();
     }
   }
 }
 //...
}

由上可知,当 MasterListener 接收到 switch-master 消息时候,会使用新的 Host 和 port 调用 initPool()。这样对连接池中的连接对象清除,重新创建新的连接指向新的 Master 服务。initPool() 源码如下:

private void initPool(HostAndPort master) {
 // 当传递的 master 和当前正在使用的 master 不一致时,初始化新的连接工厂
 if (!master.equals(currentHostMaster)) {
   currentHostMaster = master;
   if (factory == null) {
     factory = new JedisFactory(master.getHost(), master.getPort(), connectionTimeout,
         soTimeout, password, database, clientName, false, null, null, null);
     initPool(poolConfig, factory);
   } else {
     factory.setHostAndPort(currentHostMaster);
     // although we clear the pool, we still have to check the
     // returned object
     // in getResource, this call only clears idle instances, not
     // borrowed instances
     internalPool.clear();
   }

   log.info("Created JedisPool to master at " + master);
 }
}
说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
公众号