C# 实现Redis消息队列功能

首次发布:2020-06-27 16:07
Redis消息队列类 引用using ServiceStack.Redis;
转载请保留:http://www.luofenming.com/show.aspx?id=ART2020062700001
public class RedisMessageQueue : IDisposable
{
    /// <summary>
    /// redis客户端
    /// </summary>
    public RedisClient redisClient { get; }

    public RedisMessageQueue(string redisHost)
    {
        redisClient = new RedisClient(redisHost);
    }


    /// <summary>
    /// 入队(生产者)
    /// </summary>
    /// <param name="Qkey">消息key </param>
    /// <param name="Qmessage"> 消息内容</param>
    public void EnQueue(string Qkey, string Qmessage)
    {
        // 1、转换内容
        byte[] bytes = Encoding.UTF8.GetBytes(Qmessage);
        if (bytes != null)
        {
            // 2、入队
            redisClient.LPush(Qkey, bytes);
        }
    }


    /// <summary>
    /// 出队(消费者) === 阻塞
    /// </summary>
    public string DeQueue(string Qkey, int Timeout)
    {
        // 1、阻塞的方式从右端出数据
        byte[] bytes = redisClient.BRPopValue(Qkey, Timeout);
        if (bytes != null)
        {
            //2、转换成数据
            string message = Encoding.UTF8.GetString(bytes);
            return message;
        }
        else
        {
            return "无数据";
        }
    }

    /// <summary>
    /// 获取队列数量
    /// </summary>
    /// <param name="QKey">队列key</param>
    /// <returns></returns>
    public long GetQueueCount(string QKey)
    {
        return redisClient.GetListCount(QKey);
    }

    /// <summary>
    /// 释放资源
    /// </summary>
    public void Dispose()
    {
        // 1、关闭redis
        redisClient.Dispose();
    }
}
队列生产服务端(注意:启动前打开redis)
static void Main(string[] args)
{
    Console.WriteLine("订单生成器开始执行......");
    while (true)
    {
        //统计时间
        Stopwatch stopwatch = new Stopwatch();
        stopwatch.Start();

        Thread.Sleep(1000);
        // 生产者  localhost:6379 redis连接地址
        using (RedisMessageQueue redisMessageQueue = new RedisMessageQueue("localhost:6379"))
        {
            Random ran = new Random();
            string order_sn = "R" + DateTime.Now.ToString("yyyyMMddHHmmssfff") + ran.Next(1000, 9999).ToString();
            // 1、生产积分消息
            redisMessageQueue.EnQueue("order_points", order_sn);
            Console.WriteLine($"生成订单:{order_sn}");
        }
        stopwatch.Stop();
        Console.WriteLine($"订单完成耗时:{stopwatch.ElapsedMilliseconds}ms");
    }
}
队列消费端
static void Main(string[] args)
{
    Console.WriteLine("******************短息服务********************");
    Console.WriteLine("短息消费者.......");
    using (RedisMessageQueue redisMessageQueue = new RedisMessageQueue("localhost:6379"))
    {
        // 多次消费
        while (true)
        {
            string order_sn = redisMessageQueue.DeQueue("order_points", 60);

            Console.WriteLine($"获取短息消息:{order_sn}");
            Thread.Sleep(1000);
            Console.WriteLine($"发送短信;order_sn:{order_sn}成功");
        }
    }
}