Redisson延迟队列是怎么做的?
发布时间:2022-01-17 02:03:56 所属栏目:动态 来源:互联网
导读:昨天,记录了Spring Boot基于Redisson实现订单状态延迟处理的思路后,就想着,还是要去搞清楚RedissonDelayedQueue的实现思路,所以,今天就忙里偷闲,去Github下载Redisson源码来大概查略一番。 如何创建RedissonDelayedQueue队列 在Test中,可以看到这样一
昨天,记录了Spring Boot基于Redisson实现订单状态延迟处理的思路后,就想着,还是要去搞清楚RedissonDelayedQueue的实现思路,所以,今天就忙里偷闲,去Github下载Redisson源码来大概查略一番。 如何创建RedissonDelayedQueue队列 在Test中,可以看到这样一段代码 RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("test"); //按名称获取一个阻塞队列实例 RDelayedQueue<Integer> dealyedQueue = redisson.getDelayedQueue(queue1); //按名称获取一个延迟队列实例。参数是`RQueue<V>` 在获取DelayedQueue队列时,会初始化两个队列名称redisson_delay_queue_{队列名}和redisson_delay_queue_timeout_{队列名},还会创建一个QueueTransferTask队列中转的定时任务, 添加队列 dealyedQueue.offer(3, 5, TimeUnit.SECONDS); //第一次参数是要发送给队列的数据,第二个参数是要延迟的时间,第三个参数是延迟的时间单位 这里,我们直接来到offerAsync方法 public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) { if (delay < 0) { throw new IllegalArgumentException("Delay can't be negative"); } long delayInMs = timeUnit.toMillis(delay); long timeout = System.currentTimeMillis() + delayInMs; long randomId = ThreadLocalRandom.current().nextLong(); return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_VOID, "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" + "redis.call('zadd', KEYS[2], ARGV[1], value);" + "redis.call('rpush', KEYS[3], value);" // if new object added to queue head when publish its startTime // to all scheduler workers + "local v = redis.call('zrange', KEYS[2], 0, 0); " + "if v[1] == value then " + "redis.call('publish', KEYS[4], ARGV[1]); " + "end;", Arrays.<Object>asList(getRawName(), timeoutSetName, queueName, channelName), timeout, randomId, encode(e)); } 在代码中,我们可以看到,最终执行的Lua脚本,其他的代码基本是一目了然,我们主要来分析一下这段Lua脚本, local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]); //将超时时间、随机Id和消息内容序列化为二进制 redis.call('zadd', KEYS[2], ARGV[1], value); //将序列化后的二进制内容按超时时间作为`score`存放到`redisson_delay_queue_timeout_{队列名}`这个有序集合(sorted set)中 redis.call('rpush', KEYS[3], value); //将序列化后的二进制内容添加到`redisson_delay_queue_{队列名}`列表(List)中 local v = redis.call('zrange', KEYS[2], 0, 0); //取出有序集合中的第一个元素 if v[1] == value then redis.call('publish', KEYS[4], ARGV[1]); //如果取到第一个元素,则`publish`到`channel`中 end; 当publish到channel中,此时会触发onSubscribe然后执行pushTask方法 } if (res != null) { //取到延迟时间,设置执行时间,到期时便去执行`pushTaskAsync`方法 scheduleTask(res); } }); 我们继续来看pushTaskAsync方法 protected RFuture<Long> pushTaskAsync() { return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG, "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); " + "if #expiredValues > 0 then " + "for i, v in ipairs(expiredValues) do " + "local randomId, value = struct.unpack('dLc0', v);" + "redis.call('rpush', KEYS[1], value);" + "redis.call('lrem', KEYS[3], 1, v);" + "end; " + "redis.call('zrem', KEYS[2], unpack(expiredValues));" + "end; " // get startTime from scheduler queue head task + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); " + "if v[1] ~= nil then " + "return v[2]; " + "end " + "return nil;", Arrays.<Object>asList(getRawName(), timeoutSetName, queueName), System.currentTimeMillis(), 100); } 这里,我们依然只看Lua脚本这部分 local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); //取出`redisson_delay_queue_timeout_{队列名}`中,分数小于当前时间戳的100条数据,意思就是取出到达延迟时间的数据 if #expiredValues > 0 then //如果有到期数据 for i, v in ipairs(expiredValues) do local randomId, value = struct.unpack('dLc0', v); //将二进制反序列化 redis.call('rpush', KEYS[1], value); //将反序列化后的数据放入到`队列名`中的集合(List)中 redis.call('lrem', KEYS[3], 1, v); //将数据从`redisson_delay_queue_{队列名}`中移除掉 end; (编辑:威海站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
站长推荐
热点阅读