不积跬步,无以至千里;不积小流,无以成江海。

Dean's blog

  • Join Us on Facebook!
  • Follow Us on Twitter!
  • LinkedIn
  • Subcribe to Our RSS Feed

使用 Redis 实现延时队列

延时队列在日常的应用场景还是非常多的,例如:

用户下单后,如果24小时未支付则自动取消订单
订单完成后,如果15天未评论则自动默认好评

像这些场景,不使用延时队列也可以实现,例如定期检查数据库找到符合条件的记录,再取消订单或默认好评也可以实现,而且更简单。这个在数据量比较小的时候,是没有差异的,当数据达到一定数量后,定期扫表就会给系统带来压力负担,这时就可以引入延时队列来降低系统的压力。

延时队列与普通队列的主要区别就在延时触发指定事件上。实现延时队列的方式也有很多,这里基于Redis的实现。

基于Redis的实现,是使用Redis有序集合(ZSET),思路是将时间转为数字作为ZSET的score,每次从集合中取出score小于当前时间数字的内容。

Redis 有序集合(sorted set)集合详解见这里

详细代码

DelayedQueue.cs

    /// <summary>
    /// 延时队列
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public class DelayedQueue<T>
    {
        #region 私有变量
        private Func<T, bool> checkData = t => true;
        private Action<T> action = t => { Console.WriteLine("请调用SetAction配置具体操作"); };
        private string queueName = null;
        private long dbId = 0;
        private Timer timer = null;
        private int dueTime = 1000;
        private int period = 500;
        #endregion

        #region 构建一个新的延时队列
        /// <summary>
        /// 构建一个新的延时队列
        /// </summary>
        /// <param name="queueName">队列名称</param>
        public DelayedQueue(string queueName, long dbId = 0)
        {
            this.queueName = queueName;
            this.dbId = dbId;
        }
        #endregion

        #region 向队列中添加一个记录
        /// <summary>
        /// 向队列中添加一个记录
        /// </summary>
        /// <param name="t">添加的内容</param>
        /// <param name="delayed">延时时间</param>
        /// <returns></returns>
        public bool Push(T t, TimeSpan delayed)
        {
            return RedisCache.SortedSet_Add<T>(queueName, t, DateTime.Now.Add(delayed).Ticks, dbId);
        }
        #endregion

        #region 将数据移出队列
        /// <summary>
        /// 将数据移出队列
        /// </summary>
        /// <param name="t"></param>
        /// <returns></returns>
        public bool Remove(T t)
        {
            return RedisCache.SortedSet_Remove(queueName, t, dbId);
        }
        #endregion

        #region 设置检查是否需要执行默认操作
        /// <summary>
        /// 设置检查是否需要执行默认操作
        /// </summary>
        /// <returns>当返回True时,执行默认操作</returns>
        public DelayedQueue<T> SetCheck(Func<T, bool> func)
        {
            checkData = func;
            return this;
        }
        #endregion

        #region 设置操作
        /// <summary>
        /// 设置操作
        /// </summary>
        public DelayedQueue<T> SetAction(Action<T> action)
        {
            this.action = action;
            return this;
        }
        #endregion

        #region 更改Time的调度时间
        /// <summary>
        /// 更改Time的调度时间
        /// </summary>
        /// <param name="dueTime"></param>
        /// <param name="period"></param>
        /// <returns></returns>
        public DelayedQueue<T> ChangeTime(int dueTime = 1000, int period = 500)
        {
            this.dueTime = dueTime;
            this.period = period;
            timer?.Change(dueTime, period);
            return this;
        }
        #endregion

        #region 开始监听队列
        /// <summary>
        /// 开始监听数据
        /// </summary>
        public void Listen()
        {
            Stop();
            timer = new Timer(TimerAction, null, dueTime, period);
        }

        /// <summary>
        /// 获取到期记录
        /// </summary>
        /// <returns></returns>
        private IEnumerable<T> GetDelayeds()
        {
            var dict = RedisCache.SortedSet_GetListByLowestScore<T>(queueName, 0, DateTime.Now.Ticks, dbId);
            return dict?.Keys;
        }

        private int isRunning = 0;
        /// <summary>
        /// Timer定期触发
        /// </summary>
        /// <param name="obj"></param>
        private void TimerAction(object obj)
        {
            if (Interlocked.Exchange(ref isRunning, 1) == 0)
            {
                try
                {
                    IEnumerable<T> list = null;
                    do
                    {
                        list = GetDelayeds();
                        if (!(list?.Any() ?? false)) break;

                        list.ToList().ForEach(item =>
                        {
                            if (checkData(item))
                            {
                                action(item);
                                Remove(item);
                            }
                        });
                    } while (true);
                }
                finally
                {
                    Interlocked.Exchange(ref isRunning, 0);
                }
            }
        }
        #endregion

        #region 停止监听队列
        /// <summary>
        /// 停止监听队列
        /// </summary>
        public void Stop()
        {
            timer?.Dispose();
            timer = null;
        }
        #endregion
    }

RedisCache:

    public class RedisCache
    {
        #region -- 连接信息 --
        /// <summary>
        /// redis配置文件信息
        /// </summary>
        private static RedisConfigInfo redisConfigInfo = RedisConfigInfo.GetConfig();
        /// <summary>
        /// 创建链接池管理对象
        /// </summary>
        private static PooledRedisClientManager CreateManager(long dbId)
        {
            string[] writeServerList = SplitString(redisConfigInfo.WriteServerList, ",");
            string[] readServerList = SplitString(redisConfigInfo.ReadServerList, ",");

            return new PooledRedisClientManager(readServerList, writeServerList,
                             new RedisClientManagerConfig
                             {
                                 MaxWritePoolSize = redisConfigInfo.MaxWritePoolSize,
                                 MaxReadPoolSize = redisConfigInfo.MaxReadPoolSize,
                                 AutoStart = redisConfigInfo.AutoStart,
                                 DefaultDb = dbId
                             });
        }
        #endregion

        /// <summary>
        ///  添加数据到 SortedSet
        /// </summary>
        /// <typeparam name="T">类型</typeparam>
        /// <param name="key">集合id</param>
        /// <param name="t">数值</param>
        /// <param name="score">排序码</param>
        /// <param name="dbId">库</param>
        public static bool SortedSet_Add<T>(string key, T t, double score, long dbId = 0)
        {
            using (IRedisClient redis = CreateManager(dbId).GetClient())
            {
                string value = ServiceStack.Text.JsonSerializer.SerializeToString<T>(t);
                return redis.AddItemToSortedSet(key, value, score);
            }
        }

        /// <summary>
        /// 移除数据从SortedSet
        /// </summary>
        /// <typeparam name="T">类型</typeparam>
        /// <param name="key">集合id</param>
        /// <param name="t">数值</param>
        /// <param name="dbId">库</param>
        /// <returns></returns>
        public static bool SortedSet_Remove<T>(string key, T t, long dbId = 0)
        {
            using (IRedisClient redis = CreateManager(dbId).GetClient())
            {
                string value = ServiceStack.Text.JsonSerializer.SerializeToString<T>(t);
                return redis.RemoveItemFromSortedSet(key, value);
            }
        }

        /// <summary>
        /// 按Score升序获取数据
        /// </summary>
        /// <param name="key">键值</param>
        /// <param name="dbId">库</param>
        /// <returns></returns>
        public static IDictionary<T, double> SortedSet_GetListByLowestScore<T>(string key, double fromScore, double toScore, long dbId = 0)
        {
            using (IRedisClient redis = CreateManager(dbId).GetClient())
            {
                var list = redis.GetRangeWithScoresFromSortedSetByLowestScore(key, fromScore, toScore);
                if (list != null && list.Count > 0)
                {
                    var dict = new Dictionary<T, double>();
                    foreach (var item in list)
                    {
                        var data = ServiceStack.Text.JsonSerializer.DeserializeFromString<T>(item.Key);
                        dict.Add(data, item.Value);
                    }
                    return dict;
                }
            }
            return null;
        }
    }

这个实现版本是通过内部Timer定期检查ZSET中是否存在过期数据,存在则检查是否需要调用默认操作。由于内部使用的是Redis,也拥有Redis带来的高性能、分布式等种种好处,但也由于使用的是Timer定期检查,因此不适用对时效性要求很高的场景。在实际使用中,可按如下过程应用:

1、用户下单后,将订单号加到延时队列中
2、用户支付后,将订单号从延时队列中移出
3、延时队列检测到过期数据后,将订单取消并从队列中移出

测试代码

DelayedQueue<String> queue = new DelayedQueue<string>("delayedQueue")
    .SetAction(s =>
    Console.WriteLine(s)
);
queue.Push("A", TimeSpan.FromSeconds(20));
queue.Push("B", TimeSpan.FromSeconds(8));
queue.Push("C", TimeSpan.FromSeconds(4));
queue.Push("D", TimeSpan.FromSeconds(25));
queue.Listen();

测试结果将会定时输出:

C
B
A
D

 

不允许评论
粤ICP备17049187号-1