延时队列在日常的应用场景还是非常多的,例如:
用户下单后,如果24小时未支付则自动取消订单
订单完成后,如果15天未评论则自动默认好评
像这些场景,不使用延时队列也可以实现,例如定期检查数据库找到符合条件的记录,再取消订单或默认好评也可以实现,而且更简单。这个在数据量比较小的时候,是没有差异的,当数据达到一定数量后,定期扫表就会给系统带来压力负担,这时就可以引入延时队列来降低系统的压力。
延时队列与普通队列的主要区别就在延时触发指定事件上。实现延时队列的方式也有很多,这里基于Redis的实现。
基于Redis的实现,是使用Redis有序集合(ZSET),思路是将时间转为数字作为ZSET的score,每次从集合中取出score小于当前时间数字的内容。
Redis 有序集合(sorted set)集合详解见这里
详细代码
DelayedQueue.cs
/// <summary>
/// 延时队列
/// </summary>
/// <typeparam name="T"></typeparam>
public class DelayedQueue<T>
{
#region 私有变量
private Func<T, bool> checkData = t => true;
private Action<T> action = t => { Console.WriteLine("请调用SetAction配置具体操作"); };
private string queueName = null;
private long dbId = 0;
private Timer timer = null;
private int dueTime = 1000;
private int period = 500;
#endregion
#region 构建一个新的延时队列
/// <summary>
/// 构建一个新的延时队列
/// </summary>
/// <param name="queueName">队列名称</param>
public DelayedQueue(string queueName, long dbId = 0)
{
this.queueName = queueName;
this.dbId = dbId;
}
#endregion
#region 向队列中添加一个记录
/// <summary>
/// 向队列中添加一个记录
/// </summary>
/// <param name="t">添加的内容</param>
/// <param name="delayed">延时时间</param>
/// <returns></returns>
public bool Push(T t, TimeSpan delayed)
{
return RedisCache.SortedSet_Add<T>(queueName, t, DateTime.Now.Add(delayed).Ticks, dbId);
}
#endregion
#region 将数据移出队列
/// <summary>
/// 将数据移出队列
/// </summary>
/// <param name="t"></param>
/// <returns></returns>
public bool Remove(T t)
{
return RedisCache.SortedSet_Remove(queueName, t, dbId);
}
#endregion
#region 设置检查是否需要执行默认操作
/// <summary>
/// 设置检查是否需要执行默认操作
/// </summary>
/// <returns>当返回True时,执行默认操作</returns>
public DelayedQueue<T> SetCheck(Func<T, bool> func)
{
checkData = func;
return this;
}
#endregion
#region 设置操作
/// <summary>
/// 设置操作
/// </summary>
public DelayedQueue<T> SetAction(Action<T> action)
{
this.action = action;
return this;
}
#endregion
#region 更改Time的调度时间
/// <summary>
/// 更改Time的调度时间
/// </summary>
/// <param name="dueTime"></param>
/// <param name="period"></param>
/// <returns></returns>
public DelayedQueue<T> ChangeTime(int dueTime = 1000, int period = 500)
{
this.dueTime = dueTime;
this.period = period;
timer?.Change(dueTime, period);
return this;
}
#endregion
#region 开始监听队列
/// <summary>
/// 开始监听数据
/// </summary>
public void Listen()
{
Stop();
timer = new Timer(TimerAction, null, dueTime, period);
}
/// <summary>
/// 获取到期记录
/// </summary>
/// <returns></returns>
private IEnumerable<T> GetDelayeds()
{
var dict = RedisCache.SortedSet_GetListByLowestScore<T>(queueName, 0, DateTime.Now.Ticks, dbId);
return dict?.Keys;
}
private int isRunning = 0;
/// <summary>
/// Timer定期触发
/// </summary>
/// <param name="obj"></param>
private void TimerAction(object obj)
{
if (Interlocked.Exchange(ref isRunning, 1) == 0)
{
try
{
IEnumerable<T> list = null;
do
{
list = GetDelayeds();
if (!(list?.Any() ?? false)) break;
list.ToList().ForEach(item =>
{
if (checkData(item))
{
action(item);
Remove(item);
}
});
} while (true);
}
finally
{
Interlocked.Exchange(ref isRunning, 0);
}
}
}
#endregion
#region 停止监听队列
/// <summary>
/// 停止监听队列
/// </summary>
public void Stop()
{
timer?.Dispose();
timer = null;
}
#endregion
}
RedisCache:
public class RedisCache
{
#region -- 连接信息 --
/// <summary>
/// redis配置文件信息
/// </summary>
private static RedisConfigInfo redisConfigInfo = RedisConfigInfo.GetConfig();
/// <summary>
/// 创建链接池管理对象
/// </summary>
private static PooledRedisClientManager CreateManager(long dbId)
{
string[] writeServerList = SplitString(redisConfigInfo.WriteServerList, ",");
string[] readServerList = SplitString(redisConfigInfo.ReadServerList, ",");
return new PooledRedisClientManager(readServerList, writeServerList,
new RedisClientManagerConfig
{
MaxWritePoolSize = redisConfigInfo.MaxWritePoolSize,
MaxReadPoolSize = redisConfigInfo.MaxReadPoolSize,
AutoStart = redisConfigInfo.AutoStart,
DefaultDb = dbId
});
}
#endregion
/// <summary>
/// 添加数据到 SortedSet
/// </summary>
/// <typeparam name="T">类型</typeparam>
/// <param name="key">集合id</param>
/// <param name="t">数值</param>
/// <param name="score">排序码</param>
/// <param name="dbId">库</param>
public static bool SortedSet_Add<T>(string key, T t, double score, long dbId = 0)
{
using (IRedisClient redis = CreateManager(dbId).GetClient())
{
string value = ServiceStack.Text.JsonSerializer.SerializeToString<T>(t);
return redis.AddItemToSortedSet(key, value, score);
}
}
/// <summary>
/// 移除数据从SortedSet
/// </summary>
/// <typeparam name="T">类型</typeparam>
/// <param name="key">集合id</param>
/// <param name="t">数值</param>
/// <param name="dbId">库</param>
/// <returns></returns>
public static bool SortedSet_Remove<T>(string key, T t, long dbId = 0)
{
using (IRedisClient redis = CreateManager(dbId).GetClient())
{
string value = ServiceStack.Text.JsonSerializer.SerializeToString<T>(t);
return redis.RemoveItemFromSortedSet(key, value);
}
}
/// <summary>
/// 按Score升序获取数据
/// </summary>
/// <param name="key">键值</param>
/// <param name="dbId">库</param>
/// <returns></returns>
public static IDictionary<T, double> SortedSet_GetListByLowestScore<T>(string key, double fromScore, double toScore, long dbId = 0)
{
using (IRedisClient redis = CreateManager(dbId).GetClient())
{
var list = redis.GetRangeWithScoresFromSortedSetByLowestScore(key, fromScore, toScore);
if (list != null && list.Count > 0)
{
var dict = new Dictionary<T, double>();
foreach (var item in list)
{
var data = ServiceStack.Text.JsonSerializer.DeserializeFromString<T>(item.Key);
dict.Add(data, item.Value);
}
return dict;
}
}
return null;
}
}
这个实现版本是通过内部Timer定期检查ZSET中是否存在过期数据,存在则检查是否需要调用默认操作。由于内部使用的是Redis,也拥有Redis带来的高性能、分布式等种种好处,但也由于使用的是Timer定期检查,因此不适用对时效性要求很高的场景。在实际使用中,可按如下过程应用:
1、用户下单后,将订单号加到延时队列中
2、用户支付后,将订单号从延时队列中移出
3、延时队列检测到过期数据后,将订单取消并从队列中移出
测试代码
DelayedQueue<String> queue = new DelayedQueue<string>("delayedQueue")
.SetAction(s =>
Console.WriteLine(s)
);
queue.Push("A", TimeSpan.FromSeconds(20));
queue.Push("B", TimeSpan.FromSeconds(8));
queue.Push("C", TimeSpan.FromSeconds(4));
queue.Push("D", TimeSpan.FromSeconds(25));
queue.Listen();
测试结果将会定时输出:
C
B
A
D