redisson中连接对象创建及断线重连

在redisson中,由于使用了netty来封装对redis的协议访问,因此对于连接对象的创建和释放,也借由相应的connection来实现。由篇由masterSlave的角度,描述整个redisson中对于client的封装,以及在网络中断情况下,客户端会得到什么样的反馈,如何实现重连的情况。

本篇主要介绍master slave情况下各个对象的关系图信息,以及在具体创建时的一些处理问题.Redisson版本:2.1.1

1 对象关系图

整个关系如上所示,由下图进行描述

  1. 由MasterSlaveServersConfig负责配置连接的各项参数,比如master地址,slave地址,连接数大小等,这个对象是在redision创建时,由Config对象负责创建的
  2. 在调用Redission.create时,创建起相应的connectionManager对象,其持有相应的master连接信息,以及相应slave的连接信息
  3. 相应的connectionManager负责创建卢相应masterEntry以及slaveEntry信息,并且保存相应的映射信息
  4. connectionManager负责当前客户端对于具体服务器端的各项配置以下,比如转码器,连接池,使用的各项协议等,其根据这些信息,使用netty创建起相应的redisClient对象
  5. entry将已经创建好的redisClient交由connectionEntry负责持有,因此这里的client仅表示一个特有的客户端连接信息。在初始化时并不自动创建相应的连接
  6. entry因此持有相应的client,因此也自然根据当前的需要创建出相应的connection对象,这里entry的创建工作交由client负责,同时将创建好的connection管理起来

2 代码的创建过程

这里通过一个具体的API调用来描述创建过程,选择的API为 RedissonList.size.先列出相应的调用代码

1 //size方法,发起sizeAsync异步调用
    public int size() {
        return get(sizeAsync());
    }

2 //sizeAsync,使用命令执行器,发起 LLEN的异步调用
    public Future<Integer> sizeAsync() {
        return commandExecutor.readAsync(getName(), LLEN, getName());
    }

3 //异步调用,先计算具体用于连接的桶(用于集群调用),最终落到具体的客户端中
    public <T, R> Future<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) {
        Promise<R> mainPromise = connectionManager.newPromise();
        int slot = connectionManager.calcSlot(key);
        async(true, slot, null, codec, command, params, mainPromise, 0);
......
    }

4 //执行具体的异步调用逻辑
    protected <V, R> void async(final boolean readOnlyMode, final int slot, final MultiDecoder<Object> messageDecoder, final Codec codec, final RedisCommand<V> command,
                            final Object[] params, final Promise<R> mainPromise, final int attempt) {
......
            org.redisson.client.RedisConnection connection;
//创建起相应的对象
                connection = connectionManager.connectionWriteOp(slot);
            ChannelFuture future = connection.send(new CommandData<V, R>(attemptPromise, messageDecoder, codec, command, params));
......

//注册调用成功回调,以释放连接
                attemptPromise.addListener(connectionManager.createReleaseWriteListener(slot, connection, timeout));
......
        attemptPromise.addListener(new FutureListener<R>() {
            public void operationComplete(Future<R> future) throws Exception {
//成功回调,设置相应的值
                    mainPromise.setSuccess(future.getNow());
        });
    }

5 //这里回到创建写连接的过程,通过定位到具体的entry,返回相应的连接对象,
//这里即connection维护了相应entry对象,反向由entry来获取相应的连接
    public RedisConnection connectionWriteOp(int slot) {
        MasterSlaveEntry e = getEntry(slot);
        return e.connectionWriteOp();
    }

6 //获取写连接,先尝试从池中获取,不然就创建新的(这里受连接数限制,由Semaphore信号量控制)
    public RedisConnection connectionWriteOp() {
.....
        RedisConnection conn = masterEntry.getConnections().poll();
          //这里的masterEntry即,connectionEntry,转由connectionEntry来创建新连接
            return masterEntry.connect(config);
    }

7 //connectionEntry创建connection对象,其实这里就将相应的工作转由client来进行,同时负责持有相应connection的引用信息,以形成连接池
    public RedisConnection connect(MasterSlaveServersConfig config) {
        RedisConnection conn = client.connect();
          ......//负责绑定database,验证等
        return conn;
    }

8 //具体client创建connection的过程,即将相应的工作交由bootstrap来进行。而bootstrap即netty的一个简单客户端连接工具类
    public RedisConnection connect() {
        try {
            ChannelFuture future = bootstrap.connect();
            future.syncUninterruptibly();
            return new RedisConnection(this, future.channel());
        } catch (Exception e) {
            throw new RedisConnectionException("unable to connect", e);
        }
    }

经过以上的步骤,整个连接对象的创建即创建起来,并且最终交由connectionEntry持有,其内部的结构如下所示:

     //相应的底层连接client
    private final RedisClient client;

//缓存的连接信息
    private final Queue<RedisConnection> connections = new ConcurrentLinkedQueue<RedisConnection>();
//信号量,用于控制并发连接数信息
    private final Semaphore connectionsSemaphore;

可以看出,通过这个类,即可完成连接信息的存储以及创建,获取等过程

3 封装底层netty处理

在上面的第2部分的第8个步骤当中,可以看出。redisson将创建好的通道注册入collection中,那么一个connection通过即持有相应的IO通道信息,通过通道,即可达到发送命令以及接收数据的目的。相应的调用方法如下所示:

//等待,直到获取相应的结果信息
    public <R> R await(Future<R> cmd) {
        if (!cmd.awaitUninterruptibly(redisClient.getTimeout(), TimeUnit.MILLISECONDS)) {
            throw ex;
        }
        return cmd.getNow();
    }

//一个同步调用,通过在相应的异步方法作作await操作,以获取相应的调用
    public <T, R> R sync(Codec encoder, RedisCommand<T> command, Object ... params) {
        Future<R> r = async(encoder, command, params);
        return await(r);
    }

//一个异步调用,典型的promise调用
    public <T, R> Future<R> async(Codec encoder, RedisCommand<T> command, Object ... params) {
        Promise<R> promise = redisClient.getBootstrap().group().next().<R>newPromise();
        send(new CommandData<T, R>(promise, encoder, command, params));
        return promise;
    }

//发送命令,即直接往通道里写相应的数据信息
    public <T, R> ChannelFuture send(CommandData<T, R> data) {
        return channel.writeAndFlush(data);
    }

4    数据处理器

在创建connection时,并没有看到相应的数据处理器的注册操作,原因是redisson已经将整个注册工作提前。即在创建connection之前已经设置了相应的处理器,如下所示:

    public RedisClient(EventLoopGroup group, Class<? extends SocketChannel> socketChannelClass, String host, int port, int timeout) {
        addr = new InetSocketAddress(host, port);
        bootstrap = new Bootstrap().channel(socketChannelClass).group(group).remoteAddress(addr);
        bootstrap.handler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                ch.pipeline().addFirst(new ConnectionWatchdog(bootstrap, channels),
                                        new CommandEncoder(),
                                        new CommandsListEncoder(),
                                        new CommandsQueue(),
                                        new CommandDecoder());
            }
        });

        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout);
        this.timeout = timeout;
    }

这里注册了5个处理器,每个处理器的工作如下所示.

  • CommandEncoder     用于发送数据的编码操作,即将对象转换redis协议命令进行发送
  • CommaonsListEncoder     用于批量命令的编码操作
  • CommandsQueue     用于收集正在执行的命令列表,挨个发送相应的命令
  • CommandDecoder     用于对返回数据进行解码,同时通过commandsQueue继续发送下一个命令
  • ConnectionWatchDog     用于对连接进行检测,并在合适的时候进行连接重建

其中上面的commaondQueue作为发送命令的第一执行器,commandList和commandEncoder作为后续发送执行器。commandDecoder作为输入的惟一执行器

5 连接检测

在第4步中,我们看到一个特别的类ConnectionWatchdog,在google中的翻译为看门狗,可以理解为就是负责监控连接的网络情况,如果网络一旦中断,那么将立即进行连接重建,保证相应的命令发送不会被中断。

在这里,我们看到一个特别的数据结构,即之前提到的 RedisConnection对象。如果一个连接不可用,一般情况下,我们会丢弃这个对象,然后重新创建一个信息,来替代原来的数据。即一般为一个remove操作,然后加一个add操作。但是在这个操作中,实际上不可用是一个网络通道,作为上层的connection并没有出现不可用的情况(只要不发送命令)。那么,可以这样理解,即这里只需要替换掉不可用的channel,即达到一个链接重建的目的。相应的设计方法如下所示:

    public RedisConnection(RedisClient redisClient, Channel channel) {
        super();
        this.redisClient = redisClient;
        this.channel = channel;

          //在通道上绑定当前connection
        channel.attr(CONNECTION).set(this);
    }

//替换掉旧的通道,使用新的通道
    public void updateChannel(Channel channel) {
        this.channel = channel;
    }

那么在redisson中,负责进行通道的类,即为 watchdog。其负责监控通道的启动以及通道断开等事件,一旦发现通道断开,就负责创建新的链接。进而实现了连接的重建操作。
相应的代码如下所示:

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        RedisConnection connection = ctx.channel().attr(RedisConnection.CONNECTION).get();
        if (!connection.isClosed()) {
            EventLoopGroup group = ctx.channel().eventLoop().parent();
          //进行链接重建
            reconnect(group, connection);
        }
    }

//具体地重建操作,负责新建一个通道,并且替换旧的通道信息
    private void doReConnect(final EventLoopGroup group, final RedisConnection connection, final int attempts) {
......
        bootstrap.connect().addListener(new GenericFutureListener<ChannelFuture>() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    log.debug("{} connected to {}", connection, connection.getRedisClient().getAddr());
                    connection.updateChannel(future.channel());
                    return;
                }
......
        });
    }

备注:

在整个连接创建过程当中,对于断点重连这块,每个连接池框架都会有所涉及。在redisson中,由于使用了netty,因此可以从更底层的网络入手,通过对网络情况进行监控来达到判断连接是否有效的目的。
同时,在连接的重建当中,也并没有像传统的连接池处理,丢弃旧的,添加新的,而是使用了一个类似包装器将底层数据进行包装,通过简单替换就达到了重建对象的目的。这一点的设计与其使用的commandQueue机制有关,但思路很值得借鉴。

转载请标明出处:i flym
本文地址:https://www.iflym.com/index.php/code/201509040001.html

相关文章:

作者: flym

I am flym,the master of the site:)

发表评论

电子邮件地址不会被公开。 必填项已用*标注