在redisson中,由于使用了netty来封装对redis的协议访问,因此对于连接对象的创建和释放,也借由相应的connection来实现。由篇由masterSlave的角度,描述整个redisson中对于client的封装,以及在网络中断情况下,客户端会得到什么样的反馈,如何实现重连的情况。
本篇主要介绍master slave情况下各个对象的关系图信息,以及在具体创建时的一些处理问题.Redisson版本:2.1.1
1 对象关系图
整个关系如上所示,由下图进行描述
- 由MasterSlaveServersConfig负责配置连接的各项参数,比如master地址,slave地址,连接数大小等,这个对象是在redision创建时,由Config对象负责创建的
- 在调用Redission.create时,创建起相应的connectionManager对象,其持有相应的master连接信息,以及相应slave的连接信息
- 相应的connectionManager负责创建卢相应masterEntry以及slaveEntry信息,并且保存相应的映射信息
- connectionManager负责当前客户端对于具体服务器端的各项配置以下,比如转码器,连接池,使用的各项协议等,其根据这些信息,使用netty创建起相应的redisClient对象
- entry将已经创建好的redisClient交由connectionEntry负责持有,因此这里的client仅表示一个特有的客户端连接信息。在初始化时并不自动创建相应的连接
- entry因此持有相应的client,因此也自然根据当前的需要创建出相应的connection对象,这里entry的创建工作交由client负责,同时将创建好的connection管理起来
2 代码的创建过程
这里通过一个具体的API调用来描述创建过程,选择的API为 RedissonList.size.先列出相应的调用代码
1 //size方法,发起sizeAsync异步调用 public int size() { return get(sizeAsync()); } 2 //sizeAsync,使用命令执行器,发起 LLEN的异步调用 public Future<Integer> sizeAsync() { return commandExecutor.readAsync(getName(), LLEN, getName()); } 3 //异步调用,先计算具体用于连接的桶(用于集群调用),最终落到具体的客户端中 public <T, R> Future<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) { Promise<R> mainPromise = connectionManager.newPromise(); int slot = connectionManager.calcSlot(key); async(true, slot, null, codec, command, params, mainPromise, 0); ...... } 4 //执行具体的异步调用逻辑 protected <V, R> void async(final boolean readOnlyMode, final int slot, final MultiDecoder<Object> messageDecoder, final Codec codec, final RedisCommand<V> command, final Object[] params, final Promise<R> mainPromise, final int attempt) { ...... org.redisson.client.RedisConnection connection; //创建起相应的对象 connection = connectionManager.connectionWriteOp(slot); ChannelFuture future = connection.send(new CommandData<V, R>(attemptPromise, messageDecoder, codec, command, params)); ...... //注册调用成功回调,以释放连接 attemptPromise.addListener(connectionManager.createReleaseWriteListener(slot, connection, timeout)); ...... attemptPromise.addListener(new FutureListener<R>() { public void operationComplete(Future<R> future) throws Exception { //成功回调,设置相应的值 mainPromise.setSuccess(future.getNow()); }); } 5 //这里回到创建写连接的过程,通过定位到具体的entry,返回相应的连接对象, //这里即connection维护了相应entry对象,反向由entry来获取相应的连接 public RedisConnection connectionWriteOp(int slot) { MasterSlaveEntry e = getEntry(slot); return e.connectionWriteOp(); } 6 //获取写连接,先尝试从池中获取,不然就创建新的(这里受连接数限制,由Semaphore信号量控制) public RedisConnection connectionWriteOp() { ..... RedisConnection conn = masterEntry.getConnections().poll(); //这里的masterEntry即,connectionEntry,转由connectionEntry来创建新连接 return masterEntry.connect(config); } 7 //connectionEntry创建connection对象,其实这里就将相应的工作转由client来进行,同时负责持有相应connection的引用信息,以形成连接池 public RedisConnection connect(MasterSlaveServersConfig config) { RedisConnection conn = client.connect(); ......//负责绑定database,验证等 return conn; } 8 //具体client创建connection的过程,即将相应的工作交由bootstrap来进行。而bootstrap即netty的一个简单客户端连接工具类 public RedisConnection connect() { try { ChannelFuture future = bootstrap.connect(); future.syncUninterruptibly(); return new RedisConnection(this, future.channel()); } catch (Exception e) { throw new RedisConnectionException("unable to connect", e); } }
经过以上的步骤,整个连接对象的创建即创建起来,并且最终交由connectionEntry持有,其内部的结构如下所示:
//相应的底层连接client private final RedisClient client; //缓存的连接信息 private final Queue<RedisConnection> connections = new ConcurrentLinkedQueue<RedisConnection>(); //信号量,用于控制并发连接数信息 private final Semaphore connectionsSemaphore;
可以看出,通过这个类,即可完成连接信息的存储以及创建,获取等过程
3 封装底层netty处理
在上面的第2部分的第8个步骤当中,可以看出。redisson将创建好的通道注册入collection中,那么一个connection通过即持有相应的IO通道信息,通过通道,即可达到发送命令以及接收数据的目的。相应的调用方法如下所示:
//等待,直到获取相应的结果信息 public <R> R await(Future<R> cmd) { if (!cmd.awaitUninterruptibly(redisClient.getTimeout(), TimeUnit.MILLISECONDS)) { throw ex; } return cmd.getNow(); } //一个同步调用,通过在相应的异步方法作作await操作,以获取相应的调用 public <T, R> R sync(Codec encoder, RedisCommand<T> command, Object ... params) { Future<R> r = async(encoder, command, params); return await(r); } //一个异步调用,典型的promise调用 public <T, R> Future<R> async(Codec encoder, RedisCommand<T> command, Object ... params) { Promise<R> promise = redisClient.getBootstrap().group().next().<R>newPromise(); send(new CommandData<T, R>(promise, encoder, command, params)); return promise; } //发送命令,即直接往通道里写相应的数据信息 public <T, R> ChannelFuture send(CommandData<T, R> data) { return channel.writeAndFlush(data); }
4 数据处理器
在创建connection时,并没有看到相应的数据处理器的注册操作,原因是redisson已经将整个注册工作提前。即在创建connection之前已经设置了相应的处理器,如下所示:
public RedisClient(EventLoopGroup group, Class<? extends SocketChannel> socketChannelClass, String host, int port, int timeout) { addr = new InetSocketAddress(host, port); bootstrap = new Bootstrap().channel(socketChannelClass).group(group).remoteAddress(addr); bootstrap.handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addFirst(new ConnectionWatchdog(bootstrap, channels), new CommandEncoder(), new CommandsListEncoder(), new CommandsQueue(), new CommandDecoder()); } }); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout); this.timeout = timeout; }
这里注册了5个处理器,每个处理器的工作如下所示.
- CommandEncoder 用于发送数据的编码操作,即将对象转换redis协议命令进行发送
- CommaonsListEncoder 用于批量命令的编码操作
- CommandsQueue 用于收集正在执行的命令列表,挨个发送相应的命令
- CommandDecoder 用于对返回数据进行解码,同时通过commandsQueue继续发送下一个命令
- ConnectionWatchDog 用于对连接进行检测,并在合适的时候进行连接重建
其中上面的commaondQueue作为发送命令的第一执行器,commandList和commandEncoder作为后续发送执行器。commandDecoder作为输入的惟一执行器
5 连接检测
在第4步中,我们看到一个特别的类ConnectionWatchdog,在google中的翻译为看门狗,可以理解为就是负责监控连接的网络情况,如果网络一旦中断,那么将立即进行连接重建,保证相应的命令发送不会被中断。
在这里,我们看到一个特别的数据结构,即之前提到的 RedisConnection对象。如果一个连接不可用,一般情况下,我们会丢弃这个对象,然后重新创建一个信息,来替代原来的数据。即一般为一个remove操作,然后加一个add操作。但是在这个操作中,实际上不可用是一个网络通道,作为上层的connection并没有出现不可用的情况(只要不发送命令)。那么,可以这样理解,即这里只需要替换掉不可用的channel,即达到一个链接重建的目的。相应的设计方法如下所示:
public RedisConnection(RedisClient redisClient, Channel channel) { super(); this.redisClient = redisClient; this.channel = channel; //在通道上绑定当前connection channel.attr(CONNECTION).set(this); } //替换掉旧的通道,使用新的通道 public void updateChannel(Channel channel) { this.channel = channel; }
那么在redisson中,负责进行通道的类,即为 watchdog。其负责监控通道的启动以及通道断开等事件,一旦发现通道断开,就负责创建新的链接。进而实现了连接的重建操作。
相应的代码如下所示:
@Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { RedisConnection connection = ctx.channel().attr(RedisConnection.CONNECTION).get(); if (!connection.isClosed()) { EventLoopGroup group = ctx.channel().eventLoop().parent(); //进行链接重建 reconnect(group, connection); } } //具体地重建操作,负责新建一个通道,并且替换旧的通道信息 private void doReConnect(final EventLoopGroup group, final RedisConnection connection, final int attempts) { ...... bootstrap.connect().addListener(new GenericFutureListener<ChannelFuture>() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { log.debug("{} connected to {}", connection, connection.getRedisClient().getAddr()); connection.updateChannel(future.channel()); return; } ...... }); }
备注:
在整个连接创建过程当中,对于断点重连这块,每个连接池框架都会有所涉及。在redisson中,由于使用了netty,因此可以从更底层的网络入手,通过对网络情况进行监控来达到判断连接是否有效的目的。
同时,在连接的重建当中,也并没有像传统的连接池处理,丢弃旧的,添加新的,而是使用了一个类似包装器将底层数据进行包装,通过简单替换就达到了重建对象的目的。这一点的设计与其使用的commandQueue机制有关,但思路很值得借鉴。
转载请标明出处:i flym
本文地址:https://www.iflym.com/index.php/code/201509040001.html