基于Spring的发布订阅模式
在我们使用spring开发应用时,经常会碰到要去解耦合一些依赖调用,比如我们在做代码的发布流程中,需要去通知相关的测试,开发人员关注发布中的错误信息。而且通知这个操作又不希望强耦合在主业务流程中,这个时候我们很容易就想到了观察者设计模式,而spring恰好提供了事件-监听机制,让我们看一下他们是具体怎么实现的吧。
事件-监听机制:
- 首先是一种对象间的一对多的关系;最简单的如交通信号灯,信号灯是目标(一方),行人注视着信号灯(多方);
- 当目标发送改变(发布),观察者(订阅者)就可以接收到改变;
- 观察者如何处理(如行人如何走,是快走/慢走/不走,目标不会管的),目标无需干涉;所以就松散耦合了它们之间的关系。
Spring提供的事件驱动模型/观察者抽象
其实整个模型就有三个角色,事件,目标(发布者),监听者,我们看一下spring中如何实现这三者
事件:
具体代表者是:ApplicationEvent:
1、 我们可以看到spring中ApplicationEvent该抽象类继承自JDK的EventObject。JDK要求所有事件将继承它,并通过source得到事件源。
package org.springframework.context; import java.util.EventObject; /** * Class to be extended by all application events. Abstract as it * doesn't make sense for generic events to be published directly. * * @author Rod Johnson * @author Juergen Hoeller */ public abstract class ApplicationEvent extends EventObject { /** use serialVersionUID from Spring 1.2 for interoperability */ private static final long serialVersionUID = 7099057708183571937L; /** System time when the event happened */ private final long timestamp; /** * Create a new ApplicationEvent. * @param source the object on which the event initially occurred (never {@code null}) */ public ApplicationEvent(Object source) { super(source); this.timestamp = System.currentTimeMillis(); } /** * Return the system time in milliseconds when the event happened. */ public final long getTimestamp() { return this.timestamp; } }
目标(发布者)
具体代表者是具体代表者是:ApplicationEventPublisher及ApplicationEventMulticaster。ApplicationContext该接口继承了ApplicationEventPublisher,并在AbstractApplicationContext实现了具体代码,实际执行是委托给ApplicationEventMulticaster(可以认为是多播):
ApplicationContext继承自ApplicationEventPublisher
public interface ApplicationContext extends EnvironmentCapable, ListableBeanFactory, HierarchicalBeanFactory, MessageSource, ApplicationEventPublisher, ResourcePatternResolver { }
ApplicationEventPublisher定义了publishEvent方法
public interface ApplicationEventPublisher { /** * Notify all <strong>matching</strong> listeners registered with this * application of an application event. Events may be framework events * (such as RequestHandledEvent) or application-specific events. * @param event the event to publish * @see org.springframework.web.context.support.RequestHandledEvent */ void publishEvent(ApplicationEvent event); /** * Notify all <strong>matching</strong> listeners registered with this * application of an event. * <p>If the specified {@code event} is not an {@link ApplicationEvent}, * it is wrapped in a {@link PayloadApplicationEvent}. * @param event the event to publish * @since 4.2 * @see PayloadApplicationEvent */ void publishEvent(Object event); }
在AbstractApplicationContext实现了具体代码,实际执行是委托给ApplicationEventMulticaster(可以认为是多播)
protected void publishEvent(Object event, ResolvableType eventType) { Assert.notNull(event, "Event must not be null"); if (logger.isTraceEnabled()) { logger.trace("Publishing event in " + getDisplayName() + ": " + event); } // Decorate event as an ApplicationEvent if necessary ApplicationEvent applicationEvent; if (event instanceof ApplicationEvent) { applicationEvent = (ApplicationEvent) event; } else { applicationEvent = new PayloadApplicationEvent<Object>(this, event); if (eventType == null) { eventType = ((PayloadApplicationEvent)applicationEvent).getResolvableType(); } } // Multicast right now if possible - or lazily once the multicaster is initialized if (this.earlyApplicationEvents != null) { this.earlyApplicationEvents.add(applicationEvent); } else { getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType); } // Publish event via parent context as well... if (this.parent != null) { if (this.parent instanceof AbstractApplicationContext) { ((AbstractApplicationContext) this.parent).publishEvent(event, eventType); } else { this.parent.publishEvent(event); } } }
ApplicationContext自动到本地容器里找一个ApplicationEventMulticaster实现,如果没有自己new一个SimpleApplicationEventMulticaster。
public void multicastEvent(final ApplicationEvent event, ResolvableType eventType) { ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event)); for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) { Executor executor = getTaskExecutor(); if (executor != null) { executor.execute(new Runnable() { @Override public void run() { invokeListener(listener, event); } }); } else { invokeListener(listener, event); } } }
大家可以看到如果给它一个executor(java.util.concurrent.Executor),它就可以异步支持发布事件了。
所以我们发送事件只需要通过ApplicationContext.publishEvent即可
监听器
具体代表者是:ApplicationListener
1、其继承自JDK的EventListener
package org.springframework.context; import java.util.EventListener; /** * Interface to be implemented by application event listeners. * Based on the standard {@code java.util.EventListener} interface * for the Observer design pattern. * * <p>As of Spring 3.0, an ApplicationListener can generically declare the event type * that it is interested in. When registered with a Spring ApplicationContext, events * will be filtered accordingly, with the listener getting invoked for matching event * objects only. * * @author Rod Johnson * @author Juergen Hoeller * @param <E> the specific ApplicationEvent subclass to listen to * @see org.springframework.context.event.ApplicationEventMulticaster */ public interface ApplicationListener<E extends ApplicationEvent> extends EventListener { /** * Handle an application event. * @param event the event to respond to */ void onApplicationEvent(E event); }
我们在spring中自己实现的demo:
1.定义事件
@Data public class OrderCreateEvent extends ApplicationEvent { private String tradeOrderNo; private Long userId; public OrderCreateEvent(Object source, String tradeOrderNo, Long userId) { super(source); this.tradeOrderNo = tradeOrderNo; this.userId = userId; } }
2.实现目标(发布者)
public interface OrderPublisher { default void publish(ApplicationEvent event) { ApplicationContext context = SpringBeanUtil.getContext(); context.publishEvent(event); } }
3.实现监听器
方式一
@Component @Slf4j public class OrderEventHandler { private static final Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create(); @Async @EventListener public void createHandle(OrderCreateEvent event) { RedisKey redisKey = new RedisKey("order", "create", event.getTradeOrderNo()); if (redisTemplate.hasKey(redisKey.toString())) { return; } Order order = (Order) event.getSource(); //调用 HSF 接口创建订单落 ES try { Boolean syncResult = orderSyncEsServiceProxy.createOrder(order); if (!syncResult) { log.error("同步订单 tradeOrderNo:[{}],创建订单失败", order.getTradeOrderNo()); } } catch (Throwable e) { log.error("同步订单 tradeOrderNo:[{}],创建订单异常:", order.getTradeOrderNo(), e); } String messageId = mqProductConfig.sendMessage( mqPropertiesConfig.getGroupNameMap().get(MqGroupConstant.GroupType.GID_CREATE_ORDER), MqGroupConstant.Tag.CREATE_ORDER_TAG, event.getTradeOrderNo(), 60 * 30L); redisTemplate.opsForValue().set(redisKey.toString(), gson.toJson(event), 60 * 60 * 24, TimeUnit.SECONDS); // 锁定用户权益 benefitServiceProxy.lockCouponBenefit(order); // 用户信息 UserInfoDTO userInfoDTO = userServiceProxy.queryUserInfo(order.getUserId(), order.getSubOrders().get(0).getAddressId(), order.getTradeChannel(), null); // 创建订单埋点 youShuTrackService.track(TrackFactory.createOrderFactory(order, userInfoDTO), YouShuApiEnum.ORDER_ADD); log.error("[{}] OrderEventHandler.createHandle.event , messageId [{}] ", event.getTradeOrderNo(), messageId); } @Async @EventListener public void paidHandle(OrderPaidEvent event) { RedisKey redisKey = new RedisKey("order", "paid", event.getTradeOrderNo()); if (redisTemplate.hasKey(redisKey.toString())) { return; } redisTemplate.opsForValue().set(redisKey.toString(), gson.toJson(event), 60 * 60 * 24, TimeUnit.SECONDS); // 拼团付款成功后 开团或者加团接口 Order order = (Order) event.getSource(); if (BusinessTypeEnum.AID_GROUP_ORDER.getCode().equals(order.getBizType()) || BusinessTypeEnum.COMMUNITY_GROUP_ORDER.getCode().equals(order.getBizType())) { aidGroupServiceProxy.createAndJoinGroup(order); } // 使用用户权益 benefitServiceProxy.useCouponBenefit(order); } @Async @EventListener public void confirmHandle(OrderConfirmEvent event) { RedisKey redisKey = new RedisKey("order", "confirm", event.getTradeOrderNo()); if (redisTemplate.hasKey(redisKey.toString())) { return; } String messageId = mqProductConfig.sendMessage( mqPropertiesConfig.getGroupNameMap().get(MqGroupConstant.GroupType.GID_CONFIRM_ORDER), MqGroupConstant.Tag.CONFIRM_ORDER_TAG + evn, gson.toJson(event.getSource()), 0L); Map<String, String> eventMap = Maps.newHashMap(); eventMap.put("event", gson.toJson(event)); eventMap.put("source", gson.toJson(event.getSource())); eventMap.put("messageId", messageId); redisTemplate.opsForHash().putAll(redisKey.toString(), eventMap); redisTemplate.expire(redisKey.toString(), 60 * 60 * 24, TimeUnit.SECONDS); Order source = (Order) event.getSource(); source.getSubOrders().forEach(subOrder -> { itemLimitHelper.addItemLimitNum(subOrder.getUserId(), subOrder.getBizItemId(), subOrder.getBuyAmount().intValue(), subOrder.getTradeChannel(), source.getBizType()); }); // 订单 - 确单,埋点 youShuTrackService.track(TrackFactory.updateOrderFactory(event.getTradeOrderNo(), TrackOrderStatusEnum.UN_TRANSPORT), YouShuApiEnum.ORDER_UPDATE); log.error("[{}]OrderEventHandler.confirmHandle success , messageId=[{}]", event.getTradeOrderNo(), messageId); } @Async @EventListener public void refundHandle(OrderRefundEvent event) { RefundOrderDTO refundOrderDTO = event.getRefundOrderDTO(); log.info("[{}] OrderEventHandler.refundHandle", event.getTradeOrderNo()); log.info("refundOrderDTO is{}", JSON.toJSONString(refundOrderDTO)); RedisKey redisKey = new RedisKey("order", "refund", event.getTradeOrderNo()); if (refundOrderDTO != null) { redisKey = new RedisKey("order", "refund", event.getTradeOrderNo(), CollectionUtils.isEmpty(refundOrderDTO.getTradeSubOrderIdList()) ? "" : Joiner.on(",").join(refundOrderDTO.getTradeSubOrderIdList())); } if (redisTemplate.hasKey(redisKey.toString())) { return; } RefundMqDTO refundMqDTO = new RefundMqDTO(); refundMqDTO.setTradeOrderNo(event.getTradeOrderNo()); if (refundOrderDTO != null) { refundMqDTO.setRefundFee(refundOrderDTO.getRefundFee()); refundMqDTO.setSubOrderId(refundOrderDTO.getTradeSubOrderId()); refundMqDTO.setSubOrderIdList(refundOrderDTO.getTradeSubOrderIdList()); } String messageId = mqProductConfig.sendMessage( mqPropertiesConfig.getGroupNameMap().get(MqGroupConstant.GroupType.GID_REFUND_PAYMENT), MqGroupConstant.Tag.REFUND_PAYMENT, GsonUtils.gson().toJson(refundMqDTO), 0L); redisTemplate.opsForValue().set(redisKey.toString(), messageId, 3, TimeUnit.DAYS); Order order = (Order) event.getSource(); //退款根据 benefitServiceProxy.returnCouponBenefit(order); if (Objects.nonNull(refundOrderDTO) && Objects.nonNull(refundOrderDTO.getBenefitRefundFee()) && refundOrderDTO.getBenefitRefundFee() > 0) { benefitServiceProxy.returnBenefitDiscountFee(order.getUserId(), order.getTradeOrderNo(), refundOrderDTO.getTradeSubOrderIdList(), refundOrderDTO.getBenefitRefundFee()); } //库存回退 if (TradeChannelEnum.WECHAT_SUBSCRIPTION.getCode().equals(order.getTradeChannel())) { Date startDeliveryDate = order.getSubOrders().get(0).getSubOrderLine().getStartDeliveryDate(); saleStockOptProxy.returnStock(SaleStockOptProxy.groupBizItemByOrder(order), startDeliveryDate); } //全部退款进行调用es更新兼容拼团退款 if (Objects.isNull(refundOrderDTO) || CollectionUtils.isEmpty(refundOrderDTO.getTradeSubOrderIdList())) { orderSyncEsServiceProxy.changeOrderStatus(order.getTradeOrderNo(), OrderPayStatusEnum.CLOSED); } // 退款埋点 youShuTrackService.track(TrackFactory.returnOrderFactory(order), YouShuApiEnum.ORDER_RETURN); log.error("[{}]OrderEventHandler.refundHandle success , messageId=[{}]", event.getTradeOrderNo(), messageId); } @Async @EventListener public void cancelHandle(OrderCancelEvent event) { log.info("[{}] OrderEventHandler.cancelHandle", event.getTradeOrderNo()); RedisKey redisKey = new RedisKey("order", "cancel", event.getTradeOrderNo()); if (redisTemplate.hasKey(redisKey.toString())) { return; } redisTemplate.opsForValue().set(redisKey.toString(), System.currentTimeMillis() + "", 3, TimeUnit.DAYS); Order order = (Order) event.getSource(); //权益解锁 benefitServiceProxy.unLockCouponBenefit(order); //库存回退 if (TradeChannelEnum.WECHAT_SUBSCRIPTION.getCode().equals(order.getTradeChannel())) { log.info("startReturnStock : {}", JSON.toJSONString(SaleStockOptProxy.groupBizItemByOrder(order))); Date startDeliveryDate = order.getSubOrders().get(0).getSubOrderLine().getStartDeliveryDate(); saleStockOptProxy.returnStock(SaleStockOptProxy.groupBizItemByOrder(order), startDeliveryDate); } String freezerNum = AttributesUtils.getAttrByKey(order.getAttributes(), "appoint_freezer"); String freezerBoxNum = AttributesUtils.getAttrByKey(order.getAttributes(), "appoint_freezer_box"); freezerOperationProxy.unlockBox(freezerNum, freezerBoxNum); // 接龙活动套餐库存 if (BusinessTypeEnum.JIE_LONG_ORDER.getCode().equals(order.getBizType())) { Long storeId = order.getSubOrders().get(0).getStoreId(); String comboId = AttributesUtils.getAttrByKey(order.getAttributes(), "combo_id"); if (comboId == null) { log.error("Order content error {}", JSON.toJSONString(order)); throw new BusinessException(OrderErrorCode.ITEM_NOT_EXISTS); } comboStoreServiceProxy.returnStock(Long.parseLong(comboId), storeId); } orderSyncEsServiceProxy.changeOrderStatus(order.getTradeOrderNo(), OrderPayStatusEnum.PAID_OUTTIME); // 订单 - 取消,埋点 youShuTrackService.track(TrackFactory.updateOrderFactory(event.getTradeOrderNo(), TrackOrderStatusEnum.CANCEL_UN_PAY), YouShuApiEnum.ORDER_UPDATE); log.info("[{}]OrderEventHandler.cancelHandle success", event.getTradeOrderNo()); } }
好了,通过这3步,我们就可以在spring中愉快的实现事件-监听机制了。在我们需要发送事件时,只要调用 EventService的发送方法即可
@Slf4j @Builder @Data @AllArgsConstructor @NoArgsConstructor public class Order extends BaseDO implements OrderPublisher { private Long orderId; private String bizType; private String tradeOrderNo; private Long totalFee; private Long actualPaidFee; private Long discountFee; private Long refundFee; private Date payTime; private Date refundTime; private Integer status; private Long userId; private String tradeChannel; private String attributes; private Long deliveryFee; private List<SubOrder> subOrders; private List<Promotion> promotions; public void create() { this.publish(new OrderCreateEvent(this, this.tradeOrderNo, this.userId)); } public void confirm() { this.publish(new OrderConfirmEvent(this, this.tradeOrderNo, this.userId)); } public void paid() { this.publish(new OrderPaidEvent(this, this.tradeOrderNo, this.userId)); } public void refund(RefundOrderDTO subOrderId) { this.publish(new OrderRefundEvent(this, this.tradeOrderNo, this.userId,subOrderId)); } public void canceled() { this.publish(new OrderCancelEvent(this, this.tradeOrderNo, this.userId)); } public void finishPaid() { if (this.status == OrderPayStatusEnum.PAID_DONE.getValue() || this.status == OrderPayStatusEnum.PAID_DONE_SHIPPED.getValue()) { return; } if (this.status != OrderPayStatusEnum.UN_PAID.getValue()) { throw new BusinessException(OrderErrorCode.ORDER_STATUS_IS_ERROR); } this.status = OrderPayStatusEnum.PAID_DONE.getValue(); this.payTime = new Date(); } public void confirmOrder() { if (this.status == OrderPayStatusEnum.PAID_DONE_SHIPPED.getValue()) { return; } if (this.status == OrderPayStatusEnum.CLOSED.getValue()) { return; } if (this.status != OrderPayStatusEnum.PAID_DONE.getValue()) { throw new BusinessException(OrderErrorCode.ORDER_STATUS_IS_ERROR); } this.status = OrderPayStatusEnum.PAID_DONE_SHIPPED.getValue(); subOrders.forEach(subOrder -> { if (subOrder.getStatus() != SubOrderStatusEnum.INIT.getValue()) { throw new BusinessException(OrderErrorCode.ORDER_STATUS_IS_ERROR); } subOrder.setStatus(SubOrderStatusEnum.WAIT.getValue()); }); } public void cancel() { if (this.status != OrderPayStatusEnum.UN_PAID.getValue()) { return; } if (this.status == OrderPayStatusEnum.PAID_OUTTIME.getValue()) { return; } this.status = OrderPayStatusEnum.PAID_OUTTIME.getValue(); } public void complete(Long subOrderId) { subOrders.stream().filter(e -> e.getSubOrderId().equals(subOrderId)).forEach(subOrder -> { subOrder.setStatus(SubOrderStatusEnum.END.getValue()); }); if (subOrders.stream().allMatch(subOrder -> SubOrderStatusEnum.END.getValue().equals(subOrder.getStatus()) || SubOrderStatusEnum.TXP_CLOSE.getValue().equals(subOrder.getStatus()) )) { this.status = OrderPayStatusEnum.SUCCESS.getValue(); } } public void refundOrder(Long refundFee, Date refundDate) { if (this.status == OrderPayStatusEnum.UN_PAID.getValue()) { throw new BusinessException(OrderErrorCode.ORDER_STATUS_IS_ERROR); } if (this.status == OrderPayStatusEnum.CLOSED.getValue()) { throw new BusinessException(RefundErrorCode.ORDER_IS_REFUND); } refundOrder(refundFee, refundDate, null, null); } }
使用@EventListener还支持SPEL表达式
@EventListener(condition = "#jkRecordConfirmEvent.jkRecordConfirmEventDTO.cwConfirmAndReturnFlag") @Async("asyncListenerExecutor")
我们可以看到在监听器中,实现的方法使用了@Async注解。在spring3提供了@Aync注解来完成异步调用。我们可以使用这个新特性来完成异步调用。在实际项目中,我们一般也是把这两者结合来使用的,特别是监听事件是一件耗时过程时,这种方式降低了代码的耦合性,非常好用。
标签:
留言评论