Redis消息队列类 引用using ServiceStack.Redis;
转载请保留:http://www.luofenming.com/show.aspx?id=ART2020062700001
转载请保留:http://www.luofenming.com/show.aspx?id=ART2020062700001
public class RedisMessageQueue : IDisposable
{
/// <summary>
/// redis客户端
/// </summary>
public RedisClient redisClient { get; }
public RedisMessageQueue(string redisHost)
{
redisClient = new RedisClient(redisHost);
}
/// <summary>
/// 入队(生产者)
/// </summary>
/// <param name="Qkey">消息key </param>
/// <param name="Qmessage"> 消息内容</param>
public void EnQueue(string Qkey, string Qmessage)
{
// 1、转换内容
byte[] bytes = Encoding.UTF8.GetBytes(Qmessage);
if (bytes != null)
{
// 2、入队
redisClient.LPush(Qkey, bytes);
}
}
/// <summary>
/// 出队(消费者) === 阻塞
/// </summary>
public string DeQueue(string Qkey, int Timeout)
{
// 1、阻塞的方式从右端出数据
byte[] bytes = redisClient.BRPopValue(Qkey, Timeout);
if (bytes != null)
{
//2、转换成数据
string message = Encoding.UTF8.GetString(bytes);
return message;
}
else
{
return "无数据";
}
}
/// <summary>
/// 获取队列数量
/// </summary>
/// <param name="QKey">队列key</param>
/// <returns></returns>
public long GetQueueCount(string QKey)
{
return redisClient.GetListCount(QKey);
}
/// <summary>
/// 释放资源
/// </summary>
public void Dispose()
{
// 1、关闭redis
redisClient.Dispose();
}
}
队列生产服务端(注意:启动前打开redis)
static void Main(string[] args)
{
Console.WriteLine("订单生成器开始执行......");
while (true)
{
//统计时间
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
Thread.Sleep(1000);
// 生产者 localhost:6379 redis连接地址
using (RedisMessageQueue redisMessageQueue = new RedisMessageQueue("localhost:6379"))
{
Random ran = new Random();
string order_sn = "R" + DateTime.Now.ToString("yyyyMMddHHmmssfff") + ran.Next(1000, 9999).ToString();
// 1、生产积分消息
redisMessageQueue.EnQueue("order_points", order_sn);
Console.WriteLine($"生成订单:{order_sn}");
}
stopwatch.Stop();
Console.WriteLine($"订单完成耗时:{stopwatch.ElapsedMilliseconds}ms");
}
}
队列消费端
static void Main(string[] args)
{
Console.WriteLine("******************短息服务********************");
Console.WriteLine("短息消费者.......");
using (RedisMessageQueue redisMessageQueue = new RedisMessageQueue("localhost:6379"))
{
// 多次消费
while (true)
{
string order_sn = redisMessageQueue.DeQueue("order_points", 60);
Console.WriteLine($"获取短息消息:{order_sn}");
Thread.Sleep(1000);
Console.WriteLine($"发送短信;order_sn:{order_sn}成功");
}
}
}