Redis消息队列类 引用using ServiceStack.Redis;
转载请保留:http://www.luofenming.com/show.aspx?id=ART2020062700001
转载请保留:http://www.luofenming.com/show.aspx?id=ART2020062700001
public class RedisMessageQueue : IDisposable { /// <summary> /// redis客户端 /// </summary> public RedisClient redisClient { get; } public RedisMessageQueue(string redisHost) { redisClient = new RedisClient(redisHost); } /// <summary> /// 入队(生产者) /// </summary> /// <param name="Qkey">消息key </param> /// <param name="Qmessage"> 消息内容</param> public void EnQueue(string Qkey, string Qmessage) { // 1、转换内容 byte[] bytes = Encoding.UTF8.GetBytes(Qmessage); if (bytes != null) { // 2、入队 redisClient.LPush(Qkey, bytes); } } /// <summary> /// 出队(消费者) === 阻塞 /// </summary> public string DeQueue(string Qkey, int Timeout) { // 1、阻塞的方式从右端出数据 byte[] bytes = redisClient.BRPopValue(Qkey, Timeout); if (bytes != null) { //2、转换成数据 string message = Encoding.UTF8.GetString(bytes); return message; } else { return "无数据"; } } /// <summary> /// 获取队列数量 /// </summary> /// <param name="QKey">队列key</param> /// <returns></returns> public long GetQueueCount(string QKey) { return redisClient.GetListCount(QKey); } /// <summary> /// 释放资源 /// </summary> public void Dispose() { // 1、关闭redis redisClient.Dispose(); } }队列生产服务端(注意:启动前打开redis)
static void Main(string[] args) { Console.WriteLine("订单生成器开始执行......"); while (true) { //统计时间 Stopwatch stopwatch = new Stopwatch(); stopwatch.Start(); Thread.Sleep(1000); // 生产者 localhost:6379 redis连接地址 using (RedisMessageQueue redisMessageQueue = new RedisMessageQueue("localhost:6379")) { Random ran = new Random(); string order_sn = "R" + DateTime.Now.ToString("yyyyMMddHHmmssfff") + ran.Next(1000, 9999).ToString(); // 1、生产积分消息 redisMessageQueue.EnQueue("order_points", order_sn); Console.WriteLine($"生成订单:{order_sn}"); } stopwatch.Stop(); Console.WriteLine($"订单完成耗时:{stopwatch.ElapsedMilliseconds}ms"); } }队列消费端
static void Main(string[] args) { Console.WriteLine("******************短息服务********************"); Console.WriteLine("短息消费者......."); using (RedisMessageQueue redisMessageQueue = new RedisMessageQueue("localhost:6379")) { // 多次消费 while (true) { string order_sn = redisMessageQueue.DeQueue("order_points", 60); Console.WriteLine($"获取短息消息:{order_sn}"); Thread.Sleep(1000); Console.WriteLine($"发送短信;order_sn:{order_sn}成功"); } } }