RabbitMQ
MongoDB in CSharp DataLayer
RabbitMQMassTransit
Consumer
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Discount.Domain.Base
{
public abstract class BaseEntity<TKey>
{
public TKey Id { get; set; }
public DateTime CreationDateTime { get; set; }
public DateTime ModificationDateTime { get; set; }
}
public abstract class BaseEntity : BaseEntity<int>
{
}
}
Entity , DTO , ICouponRepository
using System;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Discount.Domain.Enums;
using Discount.Domain.Base;
namespace Discount.Domain.Coupons
{
public class Coupon:BaseEntity
{
[Required]
public int ProductId { get; set; }
[MaxLength(300)]
public string ProductTitle { get; set; }
public DiscountType DiscountType { get; set; }
public int Value { get; set; }
public DateTime StartDate { get; set; }
public DateTime EndDate { get; set; }
}
}
using Discount.Domain.Enums;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Discount.Domain.Coupons
{
public class CouponDto
{
public int Id { get; set; }
public int ProductId { get; set; }
public string ProductTitle { get; set; }
public DiscountType DiscountType { get; set; }
public int Value { get; set; }
public DateTime StartDate { get; set; }
public DateTime EndDate { get; set; }
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore.ChangeTracking;
namespace Discount.Domain.Coupons
{
public interface ICouponRepository
{
Task<EntityEntry<Coupon>> AddAsync(Coupon coupon);
void Update(Coupon coupon);
Task<Coupon> GetAsync(int couponId);
Task<IEnumerable<Coupon>> GetAllAsync();
Task CommitAsync();
Task Delete(Coupon coupon);
}
}
using Discount.Domain.Coupons;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.ChangeTracking;
namespace Discount.Infrastructure.Domain.Coupons
{
public class CouponRepository : ICouponRepository
{
private readonly DiscountDbContext _context;
public CouponRepository(DiscountDbContext context)
{
_context = context;
}
public async Task<EntityEntry<Coupon>> AddAsync(Coupon coupon)
{
coupon.CreationDateTime= DateTime.UtcNow;
coupon.ModificationDateTime = DateTime.UtcNow;
return await _context.Coupons.AddAsync(coupon);
}
public async Task Delete(Coupon coupon)
{
_context.Coupons.Remove(coupon);
await _context.SaveChangesAsync();
}
public async Task<IEnumerable<Coupon>> GetAllAsync()
{
return await _context.Coupons.ToListAsync();
}
public async Task<Coupon> GetAsync(int couponId)
{
return await _context.Coupons.FindAsync(couponId);
}
public void Update(Coupon coupon)
{
coupon.ModificationDateTime= DateTime.UtcNow;
_context.Coupons.Update(coupon);
}
public async Task CommitAsync()
{
await _context.SaveChangesAsync();
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using AutoMapper;
using Discount.Domain.Coupons;
namespace Discount.Infrastructure.Domain.Coupons
{
public class CouponMappingProfile:Profile
{
public CouponMappingProfile()
{
CreateMap<Coupon, CouponDto>().ReverseMap();
}
}
}
Application - Use Messages of RabbitMQ
using Discount.Domain.Enums;
using MediatR;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Discount.Domain.Coupons;
namespace Discount.Application.Coupons.Create
{
public class CreateCouponCommand: CouponDto,IRequest<int>
{
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using AutoMapper;
using Discount.Domain.Coupons;
using MediatR;
namespace Discount.Application.Coupons.Create
{
public class CreateCouponCommandHandler : IRequestHandler<CreateCouponCommand, int>
{
private readonly ICouponRepository _couponRepository;
private readonly IMapper _mapper;
public CreateCouponCommandHandler(ICouponRepository couponRepository, IMapper mapper)
{
_couponRepository = couponRepository;
_mapper = mapper;
}
public async Task<int> Handle(CreateCouponCommand request, CancellationToken cancellationToken)
{
var coupon = _mapper.Map<Coupon>(request);
var addedCoupon = await _couponRepository.AddAsync(coupon);
await _couponRepository.CommitAsync();
return addedCoupon.Entity.Id;
}
}
}
Package API:
“MassTransit.Extensions.DependencyInjection”
“MassTransit.RabbitMQ”
Api
using Discount.Application.Coupons.Create;
using Discount.Domain.Coupons;
using System.Reflection;
using Discount.Infrastructure;
namespace Products.Api
{
public static class Assemblies
{
public static readonly Assembly EntityAssembly = typeof(Coupon).Assembly;
public static readonly Assembly ApplicationAssembly = typeof(CreateCouponCommand).Assembly;
public static readonly Assembly InfrastructureAssembly = typeof(DiscountDbContext).Assembly;
}
}
using System.Text.Json.Serialization;
using System.Text.Json;
using Discount.Domain.Coupons;
using MediatR;
using FluentValidation;
using Products.Api;
using Discount.Infrastructure;
using Discount.Infrastructure.Domain.Coupons;
using Microsoft.EntityFrameworkCore;
using MassTransit;
using Discount.Application.EventBusConsumers;
using EventBus.Messages.Common;
namespace Discount.Api
{
public static class ServiceRegistery
{
public static IServiceCollection AddServiceRegistery(this WebApplicationBuilder builder)
{
builder.Services.AddControllers().AddJsonOptions(options =>
{
options.JsonSerializerOptions.ReferenceHandler= ReferenceHandler.IgnoreCycles;
options.JsonSerializerOptions.WriteIndented = true;
});
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
return builder.Services;
}
public static IServiceCollection AddInfrastructureServices(this WebApplicationBuilder builder)
{
builder.Services.AddAutoMapper(Assemblies.InfrastructureAssembly);
builder.Services.AddDbContext<DiscountDbContext>(option =>
option.UseNpgsql(builder.Configuration.GetConnectionString("DiscountDbConn")));
builder.Services.AddScoped<ICouponRepository, CouponRepository>();
return builder.Services;
}
public static IServiceCollection AddApplicationServices(this WebApplicationBuilder builder)
{
builder.Services.AddMediatR(Assemblies.ApplicationAssembly);
return builder.Services;
}
public static IServiceCollection AddMessagingConfiguration(this WebApplicationBuilder builder)
{
builder.Services.AddMassTransit(
config =>
{
config.AddConsumer<AddProductConsumer>();
config.UsingRabbitMq((ctx, cfg) =>
{
cfg.Host(builder.Configuration["EventBusSettings:HostAddress"]);
cfg.ReceiveEndpoint(EventBusConstants.AddProductQueue, c =>
{
c.ConfigureConsumer<AddProductConsumer>(ctx);
});
}
);
});
return builder.Services;
}
}
}
Producer
یک پروژه می سازیم : EventBus.Messages
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace EventBus.Messages.Events
{
public class IntegrationBaseEvent
{
public IntegrationBaseEvent()
{
Id = Guid.NewGuid();
CreationDate = DateTime.UtcNow;
}
public IntegrationBaseEvent(Guid id, DateTime createDate)
{
Id = id;
CreationDate = createDate;
}
public Guid Id { get; private set; }
public DateTime CreationDate { get; private set; }
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace EventBus.Messages.Events
{
public class AddProductEvent:IntegrationBaseEvent
{
public int ProductId { get; set; }
public string ProductTitle { get; set; }
}
}
مشخص کردن نام صف ها
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace EventBus.Messages.Common
{
public class EventBusConstants
{
public const string AddProductQueue = “addproduct-queue”;
}
}
Consumer
Producer
Products.Api
Package :
“MassTransit.Extensions.DependencyInjection”
“MassTransit.RabbitMQ”
Api
"EventBusSettings": {
"HostAddress": "amqp://guest:guest@localhost:5672"
}
public static IServiceCollection AddMessagingConfiguration(this WebApplicationBuilder builder)
{
builder.Services.AddMassTransit(x =>
{
x.UsingRabbitMq((ctx, cfg) =>
{
cfg.Host(builder.Configuration["EventBusSettings:HostAddress"]);
});
تمام کلاس هایی که از این کلاس به ارث رفته باشند ، به عنوان Consumer شناخته می شوند.
x.AddConsumers(typeof(IntegrationBaseEvent).Assembly);
});
// OPTIONAL, but can be used to configure the bus options
builder.Services.AddOptions<MassTransitHostOptions>()
.Configure(options =>
{
// if specified, waits until the bus is started before
// returning from IHostedService.StartAsync
// default is false
options.WaitUntilStarted = true;
// if specified, limits the wait time when starting the bus
options.StartTimeout = TimeSpan.FromSeconds(10);
// if specified, limits the wait time when stopping the bus
options.StopTimeout = TimeSpan.FromSeconds(30);
});
return builder.Services;
}
builder.AddMessagingConfiguration();
Publish Event:
Package Application:
“MassTransit”
Application
using Products.Domain.Products;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using MediatR;
namespace Products.Application.Products.Commands.Create
{
public class AddProductCommand:ProductReqDto,IRequest<ProductResDto>
{
}
}
using MediatR;
using Products.Domain;
using Products.Domain.Products;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using AutoMapper;
using MassTransit;
using EventBus.Messages.Events;
namespace Products.Application.Products.Commands.Create
{
public class AddProductCommandHandler : IRequestHandler<AddProductCommand, ProductResDto>
{
private readonly IWriteUnitOfWork _writeUnitOfWork;
private readonly IMapper _mapper;
private readonly ILogger<AddProductCommandHandler> _logger;
private readonly IPublishEndpoint _publishEndPoint; //mastransit
public AddProductCommandHandler(IWriteUnitOfWork writeUnitOfWork, IMapper mapper,IPublishEndpoint publishEndpoint, ILogger<AddProductCommandHandler> logger)
{
_writeUnitOfWork = writeUnitOfWork;
_mapper = mapper;
_logger = logger;
_publishEndPoint = publishEndpoint;
}
public async Task<ProductResDto> Handle(AddProductCommand request, CancellationToken cancellationToken)
{
var newProduct = _mapper.Map<Domain.Products.Product>(request);
var addedProduct = await _writeUnitOfWork.ProductWriteRepository.AddAsync(newProduct);
_logger.LogInformation($"Product {addedProduct.Id} is successfully created.");
ساختن Event
var addProductEvent = new AddProductEvent
{
ProductId = addedProduct.Id,
ProductTitle = addedProduct.Title
};
Event مون رو publish میکنیم :
await _publishEndPoint.Publish(addProductEvent);
return _mapper.Map<ProductResDto>(addedProduct);
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using FluentValidation;
using Products.Domain;
namespace Products.Application.Products.Commands.Create
{
public class AddProductCommandValidator:AbstractValidator<AddProductCommand>
{
public AddProductCommandValidator()
{
RuleFor(p => p.Title)
.NotEmpty().WithMessage("{Title} is Required")
.NotNull()
.MaximumLength(200).WithMessage("{Title} must not exceed 200 characters");
RuleFor(p => p.Description)
.NotEmpty().WithMessage("{Description} is Required")
.NotNull()
.MaximumLength(5000).WithMessage("{Description} must not exceed 5000 characters");
RuleFor(p => p.CategoryId).NotEmpty().WithMessage("{CategoryId} is Required")
.NotEqual(0).WithMessage("{Category} must not be zero");
RuleFor(p => p.Price)
.NotNull().WithMessage("{Price} is Required")
.GreaterThanOrEqualTo(0).WithMessage("{Price} must not be less than zero");
}
}
}
Consumer
دریافت اطلاعات:
Api
"EventBusSettings": {
"HostAddress": "amqp://guest:guest@localhost:5672"
},
public static IServiceCollection AddMessagingConfiguration(this WebApplicationBuilder builder)
{
builder.Services.AddMassTransit(
config =>
{
چه Event هایی باید Consume شوند :
config.AddConsumer<AddProductConsumer>();
config.UsingRabbitMq((ctx, cfg) =>
{
cfg.Host(builder.Configuration["EventBusSettings:HostAddress"]);
از صف AddProductQueue ، کانتکست رو کانسومر که در اینجا AddProductConsumer هستش .
cfg.ReceiveEndpoint(EventBusConstants.AddProductQueue, c =>
{
c.ConfigureConsumer<AddProductConsumer>(ctx);
});
}
);
});
return builder.Services;
}
builder.AddApplicationServices(); برای مدی ایت آر
builder.AddMessagingConfiguration();
Package Application:
“MassTransit”
Application
using AutoMapper;
using Discount.Application.Coupons.Create;
using EventBus.Messages.Events;
using MassTransit;
using MediatR;
namespace Discount.Application.EventBusConsumers
{
اون Event ی که می خوایم Consume ش کنیم رو کلاسش رو میدیم:
public class AddProductConsumer : IConsumer<AddProductEvent>
{
private readonly IMapper _mapper;
private readonly IMediator _mediator;
public AddProductConsumer(IMapper mapper, IMediator mediator)
{
_mapper = mapper;
_mediator = mediator;
}
public async Task Consume(ConsumeContext<AddProductEvent> context)
{
CreateCouponCommand createCouponCommand = new CreateCouponCommand
{
ProductId = context.Message.ProductId,
ProductTitle = context.Message.ProductTitle
};
var result = await _mediator.Send(createCouponCommand);
}
}
}