簡介
目前的.net 生態中,最終一致性組件的選擇一直是一個問題。本地事務表(cap) 需要在每個服務的數據庫中插入消息表,而且做不了此類事務 比如:創建訂單需要 餘額滿足+庫存滿足,庫存和餘額處於兩個服務中。 masstransit 是我目前主要用的方案。以往一般都用 masstransit 中的 sagas 來實現 最終一致性,但是隨着併發的增加必定會對sagas 持久化的數據庫造成很大的壓力,根據stackoverflow 中的一個回答 我發現了 一個用 Request/Response 與 Courier 功能 實現最終一致性的方案 Demo地址。
Masstransit 中 Resquest/Response 功能
消息DTO
public class SampleMessageCommand
{
}
消費者
public class SampleMessageCommandHandler : IConsumer<SampleMessageCommand>
{
public async Task Consume(ConsumeContext<SampleMessageCommand> context)
{
await context.RespondAsync(new SampleMessageCommandResult() { Data = "Sample" });
}
}
返回結果DTO
public class SampleMessageCommandResult
{
public string Data { get; set; }
}
調用方式與註冊方式略過,詳情請看 官方文檔。
本質上使用消息隊列實現 Resquest/Response,客戶端(生產者)將請求消息發送至指定消息隊列並賦予RequestId和ResponseAddress(臨時隊列 rabbitmq),服務端(消費者)消費消息並把 需要返回的消息放入指定ResponseAddress,客戶端收到 Response message 通過匹配 RequestId 找到 指定Request,最後返回信息。
Masstransit 中 Courier 功能
通過有序組合一系列的Activity,得到一個routing slip。每個 activity(忽略 Execute Activities ) 都有 Execute 和 Compensate 兩個方法。Compensate 用來執撤銷 Execute 方法產生的影響(就是回退 Execute 方法)。每個 Activity Execute 最後都會 調用 Completed 方法把 回退所需要的的信息記錄在message中,最後持久化到消息隊列的某一個消息中。
餘額扣減的Activity ,這裏的 DeductBalanceModel 是請求扣減的數據模型,DeductBalanceLog 是回退時需要用到的信息。
public class DeductBalanceActivity : IActivity<DeductBalanceModel, DeductBalanceLog>
{
private readonly ILogger<DeductBalanceActivity> logger;
public DeductBalanceActivity(ILogger<DeductBalanceActivity> logger)
{
this.logger = logger;
}
public async Task<CompensationResult> Compensate(CompensateContext<DeductBalanceLog> context)
{
logger.LogInformation("還原餘額");
var log = context.Log; //可以獲取 所有execute 完成時保存的信息
//throw new ArgumentException("some things were wrong");
return context.Compensated();
}
public async Task<ExecutionResult> Execute(ExecuteContext<DeductBalanceModel> context)
{
logger.LogInformation("扣減餘額");
await Task.Delay(100);
return context.Completed(new DeductBalanceLog() { Price = 100 });
}
}
扣減庫存 Activity
public class DeductStockActivity : IActivity<DeductStockModel, DeductStockLog>
{
private readonly ILogger<DeductStockActivity> logger;
public DeductStockActivity(ILogger<DeductStockActivity> logger)
{
this.logger = logger;
}
public async Task<CompensationResult> Compensate(CompensateContext<DeductStockLog> context)
{
var log = context.Log;
logger.LogInformation("還原庫存");
return context.Compensated();
}
public async Task<ExecutionResult> Execute(ExecuteContext<DeductStockModel> context)
{
var argument = context.Arguments;
logger.LogInformation("扣減庫存");
await Task.Delay(100);
return context.Completed(new DeductStockLog() { ProductId = argument.ProductId, Amount = 1 });
}
}
生成訂單 Execute Activity
public class CreateOrderActivity : IExecuteActivity<CreateOrderModel>
{
private readonly ILogger<CreateOrderActivity> logger;
public CreateOrderActivity(ILogger<CreateOrderActivity> logger)
{
this.logger = logger;
}
public async Task<ExecutionResult> Execute(ExecuteContext<CreateOrderModel> context)
{
logger.LogInformation("創建訂單");
await Task.Delay(100);
//throw new CommonActivityExecuteFaildException("當日訂單已達到上限");
return context.CompletedWithVariables(new CreateOrderResult { OrderId="111122",Message="創建訂單成功" });
}
}
組裝 以上 Activity 生成一個 Routing Slip,這是一個有序的組合,扣減庫存=》扣減餘額=》生成訂單
var builder = new RoutingSlipBuilder(NewId.NextGuid());
builder.AddActivity("DeductStock", new Uri($"{configuration["RabbitmqConfig:HostUri"]}/DeductStock_execute"), new DeductStockModel { ProductId = request.Message.ProductId });
builder.AddActivity("DeductBalance", new Uri($"{configuration["RabbitmqConfig:HostUri"]}/DeductBalance_execute"), new DeductBalanceModel { CustomerId = request.Message.CustomerId, Price = request.Message.Price });
builder.AddActivity("CreateOrder", new Uri($"{configuration["RabbitmqConfig:HostUri"]}/CreateOrder_execute"), new CreateOrderModel { Price = request.Message.Price, CustomerId = request.Message.CustomerId, ProductId = request.Message.ProductId });
var routingSlip = builder.Build();
執行 Routing Slip
await bus.Execute(routingSlip);
這裡是沒有任何返回值的,所有activity都是 異步執行,雖然所有的activity可以執行完成或者由於某個Activity執行出錯而全部回退。(其實這裡有一種更壞的情況就是 Compensate 出錯,默認情況下 Masstransit 只會發送一個回退錯誤的消息,後面講到創建訂單的時候我會把它塞到錯誤隊列里,這樣我們可以通過修改 Compensate bug后重新導入到正常隊列來修正數據 ),這個功能完全滿足不了 創建訂單這個需求,執行 await bus.Execute(routingSlip) 后我們完全不知道訂單到底創建成功,還是由於庫存或餘額不足而失敗了(異步)。
還好 routing slip 在執行過程中產生很多消息,比如 RoutingSlipCompleted ,RoutingSlipCompensationFailed ,RoutingSlipActivityCompleted,RoutingSlipActivityFaulted 等,具體文檔,我們可以訂閱這些事件,再結合Request/Response 實現 創建訂單的功能。
)
實現創建訂單(庫存滿足+餘額滿足)長流程
創建訂單 command
/// <summary>
/// 長流程 分佈式事務
/// </summary>
public class CreateOrderCommand
{
public string ProductId { get ; set ; }
public string CustomerId { get ; set ; }
public int Price { get ; set ; }
}
事務第一步,扣減庫存相關 代碼
public class DeductStockActivity : IActivity<DeductStockModel, DeductStockLog>
{
private readonly ILogger<DeductStockActivity> logger;
public DeductStockActivity(ILogger<DeductStockActivity> logger)
{
this .logger = logger;
}
public async Task<CompensationResult> Compensate(CompensateContext<DeductStockLog> context)
{
var log = context.Log;
logger.LogInformation( " 還原庫存 " );
return context.Compensated();
}
public async Task<ExecutionResult> Execute(ExecuteContext<DeductStockModel> context)
{
var argument = context.Arguments;
logger.LogInformation( " 扣減庫存 " );
await Task.Delay(100 );
return context.Completed(new DeductStockLog() { ProductId = argument.ProductId, Amount = 1 });
}
}
public class DeductStockModel
{
public string ProductId { get ; set ; }
}
public class DeductStockLog
{
public string ProductId { get ; set ; }
public int Amount { get ; set ; }
}
事務第二步,扣減餘額相關代碼
public class DeductBalanceActivity : IActivity<DeductBalanceModel, DeductBalanceLog>
{
private readonly ILogger<DeductBalanceActivity> logger;
public DeductBalanceActivity(ILogger<DeductBalanceActivity> logger)
{
this .logger = logger;
}
public async Task<CompensationResult> Compensate(CompensateContext<DeductBalanceLog> context)
{
logger.LogInformation( " 還原餘額 " );
var log = context.Log;
// throw new ArgumentException("some things were wrong");
return context.Compensated();
}
public async Task<ExecutionResult> Execute(ExecuteContext<DeductBalanceModel> context)
{
logger.LogInformation( " 扣減餘額 " );
await Task.Delay(100 );
return context.Completed(new DeductBalanceLog() { Price = 100 });
}
}
public class DeductBalanceModel
{
public string CustomerId { get ; set ; }
public int Price { get ; set ; }
}
public class DeductBalanceLog
{
public int Price { get ; set ; }
}
事務第三步,創建訂單相關代碼
public class CreateOrderActivity : IExecuteActivity<CreateOrderModel>
{
private readonly ILogger<CreateOrderActivity> logger;
public CreateOrderActivity(ILogger<CreateOrderActivity> logger)
{
this .logger = logger;
}
public async Task<ExecutionResult> Execute(ExecuteContext<CreateOrderModel> context)
{
logger.LogInformation( " 創建訂單 " );
await Task.Delay(100 );
// throw new CommonActivityExecuteFaildException("當日訂單已達到上限");
return context.CompletedWithVariables(new CreateOrderResult { OrderId=" 111122 " ,Message=" 創建訂單成功 " });
}
}
public class CreateOrderModel
{
public string ProductId { get ; set ; }
public string CustomerId { get ; set ; }
public int Price { get ; set ; }
}
public class CreateOrderResult
{
public string OrderId { get ; set ; }
public string Message { get ; set ; }
}
我通過 消費 創建訂單 request,獲取 request 的 response 地址與 RequestId,這兩個值 返回 response 時需要用到,我把這些信息存到 RoutingSlip中,並且訂閱 RoutingSlipEvents.Completed | RoutingSlipEvents.Faulted | RoutingSlipEvents.CompensationFailed 三種事件,當這三種消息出現時 我會根據 事件類別 和RoutingSlip中 之前加入的 (response 地址與 RequestId)生成 Response ,整個過程大概就是這麼個意思,沒理解可以看demo。這裏由於每一個事物所需要用到的 RoutingSlip + Request/Response 步驟都類似 可以抽象一下(模板方法),把Activity 的組裝 延遲到派生類去解決,這個代理類Masstransit有 ,但是官方沒有顧及到 CompensationFailed 的情況,所以我乾脆自己再寫一個。
public abstract class RoutingSlipDefaultRequestProxy<TRequest> :
IConsumer <TRequest>
where TRequest : class
{
public async Task Consume(ConsumeContext<TRequest> context)
{
var builder = new RoutingSlipBuilder(NewId.NextGuid());
builder.AddSubscription(context.ReceiveContext.InputAddress, RoutingSlipEvents.Completed | RoutingSlipEvents.Faulted | RoutingSlipEvents.CompensationFailed);
builder.AddVariable( " RequestId " , context.RequestId);
builder.AddVariable( " ResponseAddress " , context.ResponseAddress);
builder.AddVariable( " FaultAddress " , context.FaultAddress);
builder.AddVariable( " Request " , context.Message);
await BuildRoutingSlip(builder, context);
var routingSlip = builder.Build();
await context.Execute(routingSlip).ConfigureAwait(false );
}
protected abstract Task BuildRoutingSlip(RoutingSlipBuilder builder, ConsumeContext<TRequest> request);
}
這個 是派生類 Routing slip 的拼裝過程
public class CreateOrderRequestProxy : RoutingSlipDefaultRequestProxy<CreateOrderCommand>
{
private readonly IConfiguration configuration;
public CreateOrderRequestProxy(IConfiguration configuration)
{
this .configuration = configuration;
}
protected override Task BuildRoutingSlip(RoutingSlipBuilder builder, ConsumeContext<CreateOrderCommand> request)
{
builder.AddActivity( " DeductStock " , new Uri($" {configuration[ " RabbitmqConfig:HostUri" ]}/DeductStock_execute " ), new DeductStockModel { ProductId = request.Message.ProductId });
builder.AddActivity( " DeductBalance " , new Uri($" {configuration[ " RabbitmqConfig:HostUri" ]}/DeductBalance_execute " ), new DeductBalanceModel { CustomerId = request.Message.CustomerId, Price = request.Message.Price });
builder.AddActivity( " CreateOrder " , new Uri($" {configuration[ " RabbitmqConfig:HostUri" ]}/CreateOrder_execute " ), new CreateOrderModel { Price = request.Message.Price, CustomerId = request.Message.CustomerId, ProductId = request.Message.ProductId });
return Task.CompletedTask;
}
}
構造response 基類,主要是對三種情況做處理。
public abstract class RoutingSlipDefaultResponseProxy<TRequest, TResponse, TFaultResponse> : IConsumer<RoutingSlipCompensationFailed>, IConsumer<RoutingSlipCompleted>,
IConsumer <RoutingSlipFaulted>
where TRequest : class
where TResponse : class
where TFaultResponse : class
{
public async Task Consume(ConsumeContext<RoutingSlipCompleted> context)
{
var request = context.Message.GetVariable<TRequest>(" Request " );
var requestId = context.Message.GetVariable<Guid>(" RequestId " );
Uri responseAddress = null ;
if (context.Message.Variables.ContainsKey(" ResponseAddress " ))
responseAddress = context.Message.GetVariable<Uri>(" ResponseAddress " );
if (responseAddress == null )
throw new ArgumentException($" The response address could not be found for the faulted routing slip: {context.Message.TrackingNumber} " );
var endpoint = await context.GetResponseEndpoint<TResponse>(responseAddress, requestId).ConfigureAwait(false );
var response = await CreateResponseMessage(context, request);
await endpoint.Send(response).ConfigureAwait(false );
}
public async Task Consume(ConsumeContext<RoutingSlipFaulted> context)
{
var request = context.Message.GetVariable<TRequest>(" Request " );
var requestId = context.Message.GetVariable<Guid>(" RequestId " );
Uri faultAddress = null ;
if (context.Message.Variables.ContainsKey(" FaultAddress " ))
faultAddress = context.Message.GetVariable<Uri>(" FaultAddress " );
if (faultAddress == null && context.Message.Variables.ContainsKey(" ResponseAddress " ))
faultAddress = context.Message.GetVariable<Uri>(" ResponseAddress " );
if (faultAddress == null )
throw new ArgumentException($" The fault/response address could not be found for the faulted routing slip: {context.Message.TrackingNumber} " );
var endpoint = await context.GetFaultEndpoint<TResponse>(faultAddress, requestId).ConfigureAwait(false );
var response = await CreateFaultedResponseMessage(context, request, requestId);
await endpoint.Send(response).ConfigureAwait(false );
}
public async Task Consume(ConsumeContext<RoutingSlipCompensationFailed> context)
{
var request = context.Message.GetVariable<TRequest>(" Request " );
var requestId = context.Message.GetVariable<Guid>(" RequestId " );
Uri faultAddress = null ;
if (context.Message.Variables.ContainsKey(" FaultAddress " ))
faultAddress = context.Message.GetVariable<Uri>(" FaultAddress " );
if (faultAddress == null && context.Message.Variables.ContainsKey(" ResponseAddress " ))
faultAddress = context.Message.GetVariable<Uri>(" ResponseAddress " );
if (faultAddress == null )
throw new ArgumentException($" The fault/response address could not be found for the faulted routing slip: {context.Message.TrackingNumber} " );
var endpoint = await context.GetFaultEndpoint<TResponse>(faultAddress, requestId).ConfigureAwait(false );
var response = await CreateCompensationFaultedResponseMessage(context, request, requestId);
await endpoint.Send(response).ConfigureAwait(false );
}
protected abstract Task<TResponse> CreateResponseMessage(ConsumeContext<RoutingSlipCompleted> context, TRequest request);
protected abstract Task<TFaultResponse> CreateFaultedResponseMessage(ConsumeContext<RoutingSlipFaulted> context, TRequest request, Guid requestId);
protected abstract Task<TFaultResponse> CreateCompensationFaultedResponseMessage(ConsumeContext<RoutingSlipCompensationFailed> context, TRequest request, Guid requestId);
}
Response 派生類 ,這裏邏輯可以隨自己定義,我也是隨便寫了個 CommonResponse和一個業務錯誤拋錯(犧牲了一點性能)。
public class CreateOrderResponseProxy :
RoutingSlipDefaultResponseProxy <CreateOrderCommand, CommonCommandResponse<CreateOrderResult>, CommonCommandResponse<CreateOrderResult>>
{
protected override Task<CommonCommandResponse<CreateOrderResult>> CreateResponseMessage(ConsumeContext<RoutingSlipCompleted> context, CreateOrderCommand request)
{
return Task.FromResult(new CommonCommandResponse<CreateOrderResult>
{
Status = 1 ,
Result = new CreateOrderResult
{
Message = context.Message.Variables.TryGetAndReturn(nameof(CreateOrderResult.Message))?.ToString(),
OrderId = context.Message.Variables.TryGetAndReturn(nameof(CreateOrderResult.OrderId))?.ToString(),
}
});
}
protected override Task<CommonCommandResponse<CreateOrderResult>> CreateFaultedResponseMessage(ConsumeContext<RoutingSlipFaulted> context, CreateOrderCommand request, Guid requestId)
{
var commonActivityExecuteFaildException = context.Message.ActivityExceptions.FirstOrDefault(m => m.ExceptionInfo.ExceptionType == typeof (CommonActivityExecuteFaildException).FullName);
if (commonActivityExecuteFaildException != null )
{
return Task.FromResult(new CommonCommandResponse<CreateOrderResult>
{
Status = 2 ,
Message = commonActivityExecuteFaildException.ExceptionInfo.Message
});
}
// system error log here
return Task.FromResult(new CommonCommandResponse<CreateOrderResult>
{
Status = 3 ,
Message = " System error "
});
}
protected override Task<CommonCommandResponse<CreateOrderResult>> CreateCompensationFaultedResponseMessage(ConsumeContext<RoutingSlipCompensationFailed> context, CreateOrderCommand request, Guid requestId)
{
var exception = context.Message.ExceptionInfo;
// lg here context.Message.ExceptionInfo
return Task.FromResult(new CommonCommandResponse<CreateOrderResult>
{
Status = 3 ,
Message = " System error "
});
}
}
對於 CompensationFailed 的處理 通過 ActivityCompensateErrorTransportFilter 實現 發送到錯誤消息隊列,後續通過prometheus + rabbitmq-exporter + alertmanager 觸發告警 通知相關人員處理。
public class ActivityCompensateErrorTransportFilter<TActivity, TLog> : IFilter<CompensateActivityContext<TActivity, TLog>>
where TActivity : class , ICompensateActivity<TLog>
where TLog : class
{
public void Probe(ProbeContext context)
{
context.CreateFilterScope( " moveFault " );
}
public async Task Send(CompensateActivityContext<TActivity, TLog> context, IPipe<CompensateActivityContext<TActivity, TLog>> next)
{
try
{
await next.Send(context).ConfigureAwait(false );
}
catch (Exception ex)
{
if (!context.TryGetPayload(out IErrorTransport transport))
throw new TransportException(context.ReceiveContext.InputAddress, $" The {nameof(IErrorTransport)} was not available on the {nameof(ReceiveContext)}. " );
var exceptionReceiveContext = new RescueExceptionReceiveContext(context.ReceiveContext, ex);
await transport.Send(exceptionReceiveContext);
}
}
}
註冊 filter
public class RoutingSlipCompensateErrorSpecification<TActivity, TLog> : IPipeSpecification<CompensateActivityContext<TActivity, TLog>>
where TActivity : class , ICompensateActivity<TLog>
where TLog : class
{
public void Apply(IPipeBuilder<CompensateActivityContext<TActivity, TLog>> builder)
{
builder.AddFilter( new ActivityCompensateErrorTransportFilter<TActivity, TLog>());
}
public IEnumerable<ValidationResult> Validate()
{
yield return this .Success(" success " );
}
}
cfg.ReceiveEndpoint( " DeductStock_compensate " , ep =>
{
ep.PrefetchCount = 100 ;
ep.CompensateActivityHost <DeductStockActivity, DeductStockLog>(context.Container, conf =>
{
conf.AddPipeSpecification( new RoutingSlipCompensateErrorSpecification<DeductStockActivity, DeductStockLog>());
});
});
實現創建產品(創建完成+添加庫存)
實現了 創建訂單的功能,整個流程其實是同步的,我在想能不能實現最為簡單的最終一致性 比如 創建一個產品 ,然後異步生成它的庫存 ,我發現是可以的,因為我們可以監聽到每一個Execute Activity 的完成事件,並且把出錯時的信息通過 filter 塞到 錯誤隊列中。
這裏的代碼就不貼了,詳情請看 demo
本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】
※新北清潔 公司,居家、辦公、裝潢細清專業服務
※別再煩惱如何寫文案 ,掌握八大原則!
※網頁設計 一頭霧水該從何著手呢? 台北網頁設計 公司幫您輕鬆架站!
※超省錢租車 方案
※教你寫出一流的銷售文案 ?