.NET6 项目使用RabbitMQ实现基于事件总线EventBus通信

这篇具有很好参考价值的文章主要介绍了.NET6 项目使用RabbitMQ实现基于事件总线EventBus通信。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、概念及介绍

        通常通过使用事件总线实现来执行此发布/订阅系统。 事件总线可以设计为包含 API 的接口,该 API 是订阅和取消订阅事件和发布事件所需的。 它还可以包含一个或多个基于跨进程或消息通信的实现,例如支持异步通信和发布/订阅模型的消息队列或服务总线。

        本问介绍如何使用RabbitMQ通用事件总线接口实现这种与 .NET 的通信,并结合项目代码实践演示。 存在多种可能的实现,每种实现使用不同的技术或基础结构,例如 RabbitMQ、Azure 服务总线或任何其他第三方开源或商用服务总线。

二、什么是观察者模式?

观察者模式又名发布-订阅模式,具体概念就是定义对象间的一种一对多的依赖关系,当一个对象的状态发生变化时,所有依赖的对象都得到通知并被自动更新。

三、创建下发订单的项目(发布服务)

        使用Visual Studio2022新建web api 项目, 该项目主要实现简易的订单和工单发布的测试样例

1、新增IntegrationEvent类
public abstract class IntegrationEvent
{
    public string? Status { get; set; }
    public DateTime OccurredOn { get; set; }
}
2、新建IEventBusServices.cs,定义发布、订阅、取消订阅接口的泛型参数接口

参数说明:

        TEvent:表示要订阅的事件类型。它是泛型参数,必须继承自 IntegrationEvent 类

        TEventHandler:表示要处理该事件的事件处理程序类型。它是泛型参数,必须实现 IIntegrationEventHandler<TEvent> 接口

public interface IEventBusServices
{
    /// <summary>
    /// 发布消息
    /// </summary>
    /// <typeparam name="TEvent">表示要订阅的事件类型。它是泛型参数,必须继承自 IntegrationEvent 类</typeparam>
    /// <param name="event">事件</param>
    void Publish<TEvent>(TEvent @event) where TEvent : IntegrationEvent;
    /// <summary>
    /// 订阅消息
    /// </summary>
    /// <typeparam name="TEvent"></typeparam>
    /// <typeparam name="TEventHandler">表示要处理该事件的事件处理程序类型。它是泛型参数,必须实现 IIntegrationEventHandler<TEvent> 接口</typeparam>
    void Subscribe<TEvent, TEventHandler>() 
        where TEvent : IntegrationEvent 
        where TEventHandler: IIntegrationEventHandler<TEvent>;
    /// <summary>
    /// 取消订阅
    /// </summary>
    /// <typeparam name="TEvent"></typeparam>
    /// <typeparam name="TEventHandler"></typeparam>
    void Unsubscribe<TEvent,TEventHandler>() 
        where TEvent : IntegrationEvent
        where TEventHandler : IIntegrationEventHandler<TEvent>;
}
3、新建IIntegrationEventHandler.cs
 public interface IIntegrationEventHandler<in TEvent> where TEvent : IntegrationEvent
 {
     Task Handle(TEvent @event);
 }
4、创建RabbitMQConfigs.cs 抽象基类

        通过实例化 ConnectionFactory 对象,并设置其属性,来创建一个 RabbitMQ 连接工厂。然后,将从配置中获取的相关信息分配给连接工厂的属性,例如主机名、端口、用户名和密码。最后,返回创建的连接工厂对象。

public abstract class RabbitMQConfigs
{
    protected ConnectionFactory GreateConnectionFactory(IConfiguration configuration)
    {
        // 在这里设置ConnectionFactory的属性
        var factory = new ConnectionFactory
        {
            // 设置连接属性
            HostName = configuration["RabbitMQ:HostName"],
            Port = int.Parse(configuration["RabbitMQ:Port"]),
            UserName = configuration["RabbitMQ:UserName"],
            Password = configuration["RabbitMQ:Password"]
        };
        return factory;
    }
}
5、新建EventBusServices.cs

        通过继承 RabbitMQConfigs 类,并调用 GreateConnectionFactory 方法,创建和配置 RabbitMQ 的连接工厂对象,并创建RabbitMQ通道,实现消息发布、订阅、取消订阅接口

public class EventBusServices : RabbitMQConfigs,IEventBusServices
{
    private readonly IConnection _connection;
    private readonly IModel _channel;
    public EventBusServices(IConfiguration configuration) {
        var factory = GreateConnectionFactory(configuration);
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();
    }
    public void Publish<TEvent>(TEvent @event) where TEvent : IntegrationEvent
    {
        var eventName = @event.GetType().Name;
        var message = JsonSerializer.Serialize(@event);
        var body = Encoding.UTF8.GetBytes(message);
        _channel.ExchangeDeclare(exchange: "events", type: ExchangeType.Direct);
        _channel.BasicPublish(exchange: "events",
                              routingKey: eventName,
                              basicProperties: null,
                              body: body);
    }

    public void Subscribe<TEvent, TEventHandler>()
        where TEvent : IntegrationEvent
        where TEventHandler : IIntegrationEventHandler<TEvent>
    {
        var eventName = typeof(TEvent).Name;
        _channel.ExchangeDeclare(exchange: "events", type: ExchangeType.Direct);
        _channel.QueueDeclare(queue: eventName, durable: true, exclusive: false, autoDelete: false, arguments: null);
        _channel.QueueBind(queue: eventName, exchange: "events", routingKey: eventName);

        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += async (model, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
            var @event = JsonSerializer.Deserialize<TEvent>(message);
            var handler = Activator.CreateInstance<TEventHandler>();
            if(@event != null)
            {
                await handler.Handle(@event);
            }                
        };

        _channel.BasicConsume(queue:eventName,autoAck:true,consumer:consumer);
    }

    public void Unsubscribe<TEvent, TEventHandler>()
        where TEvent : IntegrationEvent
        where TEventHandler : IIntegrationEventHandler<TEvent>
    {
        var eventName = typeof(TEvent).Name;
        _channel.QueueUnbind(queue: eventName, exchange: "events", routingKey: eventName);
    }
}
6、创建测试订单服务OrderServices.cs

        通过构造函数依赖注入IEventBusServices事件总线服务,使用事件总线实现订单消息的发布

 public class OrderServices: IOrderServices
 {
     private readonly IEventBusServices _eventBusServices;
     public OrderServices(IEventBusServices eventBusServices) 
     {
         _eventBusServices = eventBusServices;
     }

     public void PlaceOrder(OrderCreatedEvent order)
     {
         //处理订单
         order.OccurredOn = DateTime.Now;

         //发布订单消息
         _eventBusServices.Publish(order);
         
     }

     public void PlaceWorkOrder(WorkOrderCreatedEvent workOrder)
     {
         //处理
         workOrder.OccurredOn = DateTime.Now;

         //发布消息
         _eventBusServices.Publish(workOrder);
     }
 }

public class OrderCreatedEvent : IntegrationEvent
{
    public int OrderId { get; set; }

    public string? OrderName { get; set; }
}

public class WorkOrderCreatedEvent : IntegrationEvent
{
    public int WorkOrderId { get; set; }

    public string? WorkOrderName { get; set; }
}
7、实现模拟订单发送请求的代码
[Route("api/createOrder")]
[HttpPut]
public void PutOrder()
{
    //创建一个子线程:发布订单到MQ中(发布服务)
    Task.Run(() =>
    {
        // 调用订单服务的方法
        for (int i = 0; i< 8; i++)
        {
            var order = new OrderCreatedEvent { OrderId = i, OrderName =$"测试订单{i}", Status ="下单中" };
            _orderService.PlaceOrder(order);                    
            Console.WriteLine($"订单创建完成,订单ID:{order.OrderId};订单:{order.OrderName}{order.Status}");
            Thread.Sleep(1000);
        }
    });
    
    //创建一个子线程:发布工单到MQ中(发布服务)
    Task.Run(() =>
    {
        // 调用订单服务的方法
        for (int i = 0; i< 8; i++)
        {
            var workorder = new WorkOrderCreatedEvent { WorkOrderId = i, WorkOrderName =$"测试工单{i}", Status ="下单中" };
            _orderService.PlaceWorkOrder(workorder);                    
            Console.WriteLine($"工单创建完成,工单ID: {workorder.WorkOrderId} ;工单: {workorder.WorkOrderName}{workorder.Status}");
            Thread.Sleep(1000);
        }
    });
    
}

四、创建接收订单并处理的项目(订阅服务)

        使用Visual Studio2022新建控制台应用项目, 该项目主要实现订阅监听订单服务,接收订单服务发布的订单消息,并实现对订单的处理完成

1、新增EventBusClass类
public class EventBusClass
{
    public abstract class IntegrationEvent
    {
        public string? Status { get; set; }
        public DateTime OccurredOn { get; set; }
    }
    public class OrderCreatedEvent : IntegrationEvent
    {
        public int OrderId { get; set; }

        public string? OrderName { get; set; }
    }

    public class WorkOrderCreatedEvent : IntegrationEvent
    {
        public int WorkOrderId { get; set; }

        public string? WorkOrderName { get; set; }
    }
}
2、新建IEventBusServices.cs,定义发布、订阅、取消订阅接口的泛型参数接口

参数说明:

        TEvent:表示要订阅的事件类型。它是泛型参数,必须继承自 IntegrationEvent 类

        TEventHandler:表示要处理该事件的事件处理程序类型。它是泛型参数,必须实现 IIntegrationEventHandler<TEvent> 接口

public interface IEventBusServices
{
    /// <summary>
    /// 发布消息
    /// </summary>
    /// <typeparam name="TEvent">表示要订阅的事件类型。它是泛型参数,必须继承自 IntegrationEvent 类</typeparam>
    /// <param name="event">事件</param>
    void Publish<TEvent>(TEvent @event) where TEvent : IntegrationEvent;
    /// <summary>
    /// 订阅消息
    /// </summary>
    /// <typeparam name="TEvent"></typeparam>
    /// <typeparam name="TEventHandler">表示要处理该事件的事件处理程序类型。它是泛型参数,必须实现 IIntegrationEventHandler<TEvent> 接口</typeparam>
    void Subscribe<TEvent, TEventHandler>() 
        where TEvent : IntegrationEvent 
        where TEventHandler: IIntegrationEventHandler<TEvent>;
    /// <summary>
    /// 取消订阅
    /// </summary>
    /// <typeparam name="TEvent"></typeparam>
    /// <typeparam name="TEventHandler"></typeparam>
    void Unsubscribe<TEvent,TEventHandler>() 
        where TEvent : IntegrationEvent
        where TEventHandler : IIntegrationEventHandler<TEvent>;
}
 3、新建IIntegrationEventHandler.cs
 public interface IIntegrationEventHandler<in TEvent> where TEvent : IntegrationEvent
 {
     Task Handle(TEvent @event);
 }
4、新建EventBusServices.cs
public class EventBusServices : IEventBusServices
{
    private readonly IConnection _connection;
    private readonly IModel _channel;
    public EventBusServices() {
        var factory = new ConnectionFactory
        {
            // 设置连接属性
            HostName = "localhost",
            Port = 5672,
            UserName = "my",
            Password = "123456"
        }; ;
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();
    }
    public void Publish<TEvent>(TEvent @event) where TEvent : IntegrationEvent
    {
        var eventName = @event.GetType().Name;
        var message = JsonSerializer.Serialize(@event);
        var body = Encoding.UTF8.GetBytes(message);
        _channel.ExchangeDeclare(exchange: "events", type: ExchangeType.Direct);
        _channel.BasicPublish(exchange: "events",
                              routingKey: eventName,
                              basicProperties: null,
                              body: body);
    }

    public void Subscribe<TEvent, TEventHandler>()
        where TEvent : IntegrationEvent
        where TEventHandler : IIntegrationEventHandler<TEvent>
    {
        var eventName = typeof(TEvent).Name;
        _channel.ExchangeDeclare(exchange: "events", type: ExchangeType.Direct);
        _channel.QueueDeclare(queue: eventName, durable: true, exclusive: false, autoDelete: false, arguments: null);
        _channel.QueueBind(queue: eventName, exchange: "events", routingKey: eventName);

        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += async (model, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
            var @event = JsonSerializer.Deserialize<TEvent>(message);
            var handler = Activator.CreateInstance<TEventHandler>();
            if(@event != null)
            {
                await handler.Handle(@event);
            }                
        };

        _channel.BasicConsume(queue:eventName,autoAck:true,consumer:consumer);
    }

    public void Unsubscribe<TEvent, TEventHandler>()
        where TEvent : IntegrationEvent
        where TEventHandler : IIntegrationEventHandler<TEvent>
    {
        var eventName = typeof(TEvent).Name;
        _channel.QueueUnbind(queue: eventName, exchange: "events", routingKey: eventName);
    }
}
4、新建订单处理集成事件OrderHandlerServices.cs

        除了事件订阅逻辑外,还需要实现集成事件处理程序的内部代码(例如回调方法)。 在事件处理程序中,可指定接收和处理某种事件消息的位置

public class OrderCreatedEventHandler : IIntegrationEventHandler<OrderCreatedEvent>
{
    public async Task Handle(OrderCreatedEvent @event)
    {
        await Task.Run(() =>
        {
            // 处理订单创建事件逻辑...                
            @event.Status = "完成";
            Console.WriteLine($"订单创建事件处理完成,订单ID:{@event.OrderId};订单:{@event.OrderName}{@event.Status}");
        });
    }
}

public class WorkOrderCreatedEventHandler : IIntegrationEventHandler<WorkOrderCreatedEvent>
{
    public async Task Handle(WorkOrderCreatedEvent @event)
    {
        await Task.Run(() =>
        {
            // 处理订单创建事件逻辑...                
            @event.Status = "完成";
            Console.WriteLine($"工单创建事件处理完成,工单ID:{@event.WorkOrderId};工单:{@event.WorkOrderName}{@event.Status}");
        });
    }
}
5、修改Program.cs程序主入口,实现订阅
using rabbirmqtestReciver;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using study_project.IServices.EventBus;
using study_project.Services.EventBus;
using System.Text;

EventBusServices eventBusServices = new EventBusServices();


//订阅订单服务
eventBusServices.Subscribe<OrderCreatedEvent, OrderCreatedEventHandler>();
//订阅工单服务
eventBusServices.Subscribe<WorkOrderCreatedEvent, WorkOrderCreatedEventHandler>();

Console.WriteLine("按任意键退出...");
Console.ReadKey();

五、测试运行与结果

分别启动运行创建的api项目和控制台应用项目,请求api的发送订单接口api/createOrder

发布服务:

.NET6 项目使用RabbitMQ实现基于事件总线EventBus通信,rabbitmq,分布式,.net,c#

订阅服务:

.NET6 项目使用RabbitMQ实现基于事件总线EventBus通信,rabbitmq,分布式,.net,c#

结果对比:

.NET6 项目使用RabbitMQ实现基于事件总线EventBus通信,rabbitmq,分布式,.net,c#

六、结语

        至此,本文已经演示了使用基于RabbitMQ实现的事件总线,展示了2个实例:订单和工单发布的案例,通过使用EventBus实现发布与订阅,指定事件类型和集成事件,便可以实现发布和订阅,减少了使用RabbitMQ实现信息发布订阅的部分代码编程;如果是分布式应用中,可以使用EventBus实现应用的业务逻辑模块与RabbitMQ发布订阅模块的解耦。文章来源地址https://www.toymoban.com/news/detail-861225.html

到了这里,关于.NET6 项目使用RabbitMQ实现基于事件总线EventBus通信的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 基于.net6的WPF程序使用SignalR进行通信

    之前写的SignalR通信,是基于.net6api,BS和CS进行通信的。 .net6API使用SignalR+vue3聊天+WPF聊天_signalr wpf_故里2130的博客-CSDN博客 今天写一篇关于CS客户端的SignalR通信,后台服务使用.net6api 。其实和之前写的差不多,主要在于服务端以后台进程的方式存在,而客户端以exe方式存在,

    2024年02月16日
    浏览(22)
  • 如何使用Docker将.Net6项目部署到Linux服务器(一)

    目录 一 配置服务器环境 1.1 配置yum                                                                                                                     1.1.1 更新yum包 1.1.2 yum命令 1.2 配置docker                                                           

    2024年02月04日
    浏览(27)
  • 如何在linux上使用docker发布.net6.0的webApi项目

    打开vs2022,创建一个新的webapi项目。 默认选项,一直下一步就ok。注意框架要使用.net6.0,不要选择.netframework(不支持跨平台)。 创建完后,已经生成完整的示例代码了,这个项目是可以直接运行的。这里我们在Program.cs文件中,稍作修改,在最后一行Run方法指定api的端口号,

    2024年02月11日
    浏览(25)
  • .Net6 记一次RabbitMq消息订阅/发布优化

             首先介绍一下项目情况,项目需要设备在线实时采集,最高采集频率为1次/秒,设备上传数据时,协议规定的是10条/包,服务端通过rabbitMq接收消息,并进行存储、预警、推送等进行多层处理,因为web端要求数据实时展示,且延时不得超过1分钟,因数据量较大,

    2024年01月18日
    浏览(24)
  • .Net6使用SignalR实现前后端实时通信

    后端代码 (Asp.net core web api,用的.net6) Program.cs 代码运行逻辑: ​1. 通过 WebApplication.CreateBuilder(args) 创建一个 ASP.NET Core 应用程序建造器。 2. 使用 builder.Services.AddControllers() 添加 MVC 控制器服务和 builder.Services.AddSignalR() 添加 SignalR 服务。 3. 注册 Swagger 和 Cors 跨域设置的服务

    2024年02月01日
    浏览(26)
  • .net6中, 用数据属性事件触发 用httpclient向服务器提交Mes工单

    MES开发中, 客户往往会要求 工单开始时记录工艺数据, 工单结束时将这些工艺数据回传到更上一级的WES系统中. 因为MES系统和PLC 是多线程读取, 所以加锁, 事件触发是常用手段.

    2024年02月10日
    浏览(22)
  • C#/.NET6项目的搭建

    目录 一.下载Visual Studio: 二.下载.NET6的SDK文件: 三:下载Visual Studio Code: 四.新建一个项目  1.根据自己的需求选择项目类型  2.项目的启动​编辑 五.创建MVC项目   1.项目结构 1.文件配置信息: 2.根目录 存储着静态文件: 3.启动项:  4.MVC三层: Controllers appsettings.json Prog

    2024年02月04日
    浏览(23)
  • .NET6项目连接数据库方式方法

    接上一篇Linux系统下创建dotnet项目,这一篇我们聊聊.NET6环境下dotnet项目连接数据库的方式方法,包括数据库字符串该如何配置。看了很多博主写的文章,连接数据库字符串配置的方式和位置五花八门,这篇文章给大家介绍一下连接数据库字符串的配置方式方法,顺便介绍下一

    2024年02月04日
    浏览(27)
  • 【.NET6+WPF】WPF使用prism框架+Unity IOC容器实现MVVM双向绑定和依赖注入

    前言:在C/S架构上,WPF无疑已经是“桌面一霸”了。在.NET生态环境中,很多小伙伴还在使用Winform开发C/S架构的桌面应用。但是WPF也有很多年的历史了,并且基于MVVM的开发模式,受到了很多开发者的喜爱。 并且随着工业化的进展,以及几年前微软对.NET平台的开源,国内大多

    2024年02月06日
    浏览(26)
  • c# .net6 在线条码打印基于

    条码打印基于:BarTender、ORM EF架构 UI展示: 主页代码: 1.条码打印实现类 实体类:

    2024年02月08日
    浏览(19)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包