using Microsoft.Extensions.Options; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using Yi.Framework.Common.IOCOptions; namespace Yi.Framework.Core { /// /// 一个Exchange----多个Queue-----弄个缓存映射关系,初始化+支持全新绑定 /// 全局单例使用 /// /// 关系应该是直接配置到RabbitMQ了---程序只是向某个位置写入即可 /// /// /// 全量更新--耗时---阻塞实时更新---换不同的exchange? /// public class RabbitMQInvoker { #region Identity private readonly RabbitMQOptions _rabbitMQOptions; private readonly string _HostName = null; private readonly string _UserName = null; private readonly string _Password = null; private readonly int _Port = 0; public RabbitMQInvoker(IOptionsMonitor optionsMonitor) : this(optionsMonitor.CurrentValue.HostName, optionsMonitor.CurrentValue.UserName, optionsMonitor.CurrentValue.Password,optionsMonitor.CurrentValue.Port) { this._rabbitMQOptions = optionsMonitor.CurrentValue; } public RabbitMQInvoker(string hostName, string userName = "cc", string password = "cc",int port= 5672) { this._HostName = hostName; this._UserName = userName; this._Password = password; this._Port = port; } #endregion #region Init private static object RabbitMQInvoker_BindQueueLock = new object(); private static Dictionary RabbitMQInvoker_ExchangeQueue = new Dictionary(); private void InitBindQueue(RabbitMQConsumerModel rabbitMQConsumerModel) { if (!RabbitMQInvoker_ExchangeQueue.ContainsKey($"InitBindQueue_{rabbitMQConsumerModel.ExchangeName}_{rabbitMQConsumerModel.QueueName}")) { lock (RabbitMQInvoker_BindQueueLock) { if (!RabbitMQInvoker_ExchangeQueue.ContainsKey($"InitBindQueue_{rabbitMQConsumerModel.ExchangeName}_{rabbitMQConsumerModel.QueueName}")) { this.InitConnection(); using (IModel channel = _CurrentConnection.CreateModel()) { channel.ExchangeDeclare(exchange: rabbitMQConsumerModel.ExchangeName, type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null); channel.QueueDeclare(queue: rabbitMQConsumerModel.QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null); channel.QueueBind(queue: rabbitMQConsumerModel.QueueName, exchange: rabbitMQConsumerModel.ExchangeName, routingKey: string.Empty, arguments: null); } RabbitMQInvoker_ExchangeQueue[$"InitBindQueue_{rabbitMQConsumerModel.ExchangeName}_{rabbitMQConsumerModel.QueueName}"] = true; } } } } /// /// 必须先声明exchange--检查+初始化 /// /// private void InitExchange(string exchangeName) { if (!RabbitMQInvoker_ExchangeQueue.ContainsKey($"InitExchange_{exchangeName}"))//没用api确认 { lock (RabbitMQInvoker_BindQueueLock) { if (!RabbitMQInvoker_ExchangeQueue.ContainsKey($"InitExchange_{exchangeName}")) { this.InitConnection(); using (IModel channel = _CurrentConnection.CreateModel()) { channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null); } RabbitMQInvoker_ExchangeQueue[$"InitExchange_{exchangeName}"] = true; } } } } //public void UnBindQueue(string exchangeName, string queueName) //{ //} private static object RabbitMQInvoker_InitLock = new object(); private static IConnection _CurrentConnection = null;//链接做成单例重用--channel是新的 private void InitConnection() { //https://blog.csdn.net/weixin_30646315/article/details/99101279 if (_CurrentConnection == null || !_CurrentConnection.IsOpen) { lock (RabbitMQInvoker_InitLock) { if (_CurrentConnection == null || !_CurrentConnection.IsOpen) { var factory = new ConnectionFactory() { HostName = this._HostName, Password = this._Password, UserName = this._UserName, Port=this._Port }; _CurrentConnection = factory.CreateConnection(); } } } } #endregion /// /// 只管exchange--- /// 4种路由类型? /// /// Send前完成交换机初始化 /// /// /// 建议Json格式 public void Send(RabbitMQConsumerModel rabbitMQConsumerModel, string message) { this.InitExchange(rabbitMQConsumerModel.ExchangeName); this.InitBindQueue(rabbitMQConsumerModel); if (_CurrentConnection == null || !_CurrentConnection.IsOpen) { this.InitConnection(); } using (var channel = _CurrentConnection.CreateModel())//开辟新的信道通信 { try { channel.TxSelect();//开启Tx事务---RabbitMQ协议级的事务-----强事务 var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: rabbitMQConsumerModel.ExchangeName, routingKey: string.Empty, basicProperties: null, body: body); channel.TxCommit();//提交 Console.WriteLine($" [x] Sent {body.Length}"); } catch (Exception ex) { Console.WriteLine(ex.Message); Console.WriteLine($"【{message}】发送到Broker失败!{ex.Message}"); channel.TxRollback(); //事务回滚--前面的所有操作就全部作废了。。。。 } } } /// /// 固定无消费队列名字---转移到目标队列---定好时间 /// /// /// /// public void SendDelay(string targetExchangeName, string message, int delaySecond) { this.InitExchange(targetExchangeName); if (_CurrentConnection == null || !_CurrentConnection.IsOpen) { this.InitConnection(); } using (var channel = _CurrentConnection.CreateModel())//开辟新的信道通信 { try { string delayExchangeName = "ZhaoxiMSA_DelayExchange"; //普通交换器 channel.ExchangeDeclare(delayExchangeName, "fanout", true, false, null); //参数设置 Dictionary args = new Dictionary(); args.Add("x-message-ttl", delaySecond * 1000);//TTL 毫秒 args.Add("x-dead-letter-exchange", targetExchangeName);//DLX args.Add("x-dead-letter-routing-key", "routingkey");//routingKey channel.QueueDeclare("ZhaoxiMSA_DelayQueue", true, false, false, args); channel.QueueBind(queue: "ZhaoxiMSA_DelayQueue", exchange: delayExchangeName, routingKey: string.Empty, arguments: null); ////DLX--- //死信队列绑定 //channel.ExchangeDeclare("ZhaoxiMSA_exchange_dlx", "fanout", true, false, null); //channel.QueueDeclare("ZhaoxiMSA_queue_dlx", true, false, false, null); //channel.QueueBind("ZhaoxiMSA_queue_dlx", "ZhaoxiMSA_exchange_dlx", "routingkey", null); channel.TxSelect();//开启Tx事务---RabbitMQ协议级的事务-----强事务 var properties = channel.CreateBasicProperties(); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: delayExchangeName, routingKey: string.Empty, basicProperties: properties, body: body); channel.TxCommit();//提交 Console.WriteLine($" [x] Sent {body.Length}"); } catch (Exception ex) { Console.WriteLine(ex.Message); Console.WriteLine($"【{message}】发送到Broker失败!{ex.Message}"); channel.TxRollback(); //事务回滚--前面的所有操作就全部作废了。。。。 } } } #region Receive /// /// 注册处理动作 /// /// /// public void RegistReciveAction(RabbitMQConsumerModel rabbitMQConsumerMode, Func func) { this.InitBindQueue(rabbitMQConsumerMode); Task.Run(() => { using (var channel = _CurrentConnection.CreateModel()) { var consumer = new EventingBasicConsumer(channel); channel.BasicQos(0, 0, true); consumer.Received += (sender, ea) => { string str = Encoding.UTF8.GetString(ea.Body.ToArray()); if (func(str)) { channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);//确认已消费 } else { //channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);//放回队列--重新包装信息,放入其他队列 } }; channel.BasicConsume(queue: rabbitMQConsumerMode.QueueName, autoAck: false,//不ACK consumer: consumer); Console.WriteLine($" Register Consumer To {rabbitMQConsumerMode.ExchangeName}-{rabbitMQConsumerMode.QueueName}"); Console.ReadLine(); Console.WriteLine($" After Register Consumer To {rabbitMQConsumerMode.ExchangeName}-{rabbitMQConsumerMode.QueueName}"); } }); } #endregion } }