参考视频:雷神谷粒商城项目


项目名称
- 书阁”图书商城管理系统、微盟电子商城网络交易系统、高校闲置资源交易系统
- 购物在“e”零售商城平台、惠农通—智慧农资商城 、农产品轻量级微商城系统
- 精美鞋业贸易系统
项目简介
- 本系统采用微服务架构设计,在分布式环境下利用Spring Cloud框架,通过业务划分,设计独立模块的微服务,拆分为订单服务、购物车服务、支付服务、用户管理服务、商品管理服务、文件上传服务等模块,结合了当前比较流行的互联网电商模式,为消费者提供商品贸易平台。
系统架构
- 该系用采用SpringCloud架构,利用SpringBoot构建应用,Nacos作为服务的注册、配置中心,OpenFeign实现与其他模块进行交互,Sentinel实现熔断降级和错误处理,Seata分布式事务解决方案,Gateway作为服务网关,Sleuth链路追踪,RabbitMQ实现延迟队列,Redis作为缓存解决读多写少的场景,MySQL进行持久化,MyBatisPlus作为持久化框架。
我的职责
- 完成平台商品、购物车、订单、库存、优惠券、支付、文件上传等服务模块的接口开发;
- 使用RabbitMQ延时队列实现未付款订单,超过一定时间后,系统自动取消订单并解锁库存;
- 使用redis+lua脚本防止重复提交攻击,解决了用户利用浏览器刷新和回退重复提交订单的问题;
- 基于Redisson分布式限流:Semaphore信号量实现秒杀和一人一单功能,通过逐步改进分布式锁的方案,解决在多线程情况下用户重复提交订单的幂等性问题;
- 基于 Token 的认证授权机制:JWT,通过对登录用户颁发登录凭证,实现登录模块认证授权功能;
- 使用ElasticSearch分布式全文搜索引擎,对冷数据,商品信息数据建立索引,保证冷数据,商品数据的查询性能;
- 利用Jmeter工具进行压测,找到在多线程情况下造成的内存泄漏,并发与同步等问题,保证了系统在线上的处理能力和稳定性维持在一个标准范围内;
- 使用Redis进行热点信息缓存,比如附近购物车信息和登录信息,提高服务器的性能;
- 使用Spring Schedule定时任务提前上架抢购商品信息到Redis缓存中实现库存预热功能
- 使用Redisson分布式锁解决分布式系统下商品重复上架的幂等性问题;
- 使用Spring Cache方法级别缓存技术,实现已经被调用过的指定的目标方法,直接从缓存中获取方法调用后的结果返回,提高系统的响应速度;
- 使用CompletableFuture 异步编排解决查询商品详情⻚响应速度慢的问题;
- 使用Nacos作为注册中心与配置中心,将服务名称及其对应地址进行存储,实现服务地址的注册与发现以及配置动态加载等功能
- 使用Seata中的TCC事务模式,把一个完整的业务逻辑拆分成三个阶段,通过事务管理器进行管理,保证分布式系统数据一致性问题;
- 整合第三方文件上传服务,如阿里云对象存储,基于服务端签名后直传,保证文件传输的安全性;
- 整合OAuth2.0协议授权,使用AccessToken调用开发API获取用户信息,支持微信、QQ、微博、Gitee、Github等第三方登陆;
- 使用RSA算法保证数据加密安全,成功对接第三方支付功能,订单付款支持支付宝、微信支付等第三方支付服务。
1、梳理自己项目的难点或亮点是什么? 2、项目中,为什么用 xx 技术点,用 yy 的可以吗?或者为什么这么设计?
关于第一点,这个内容即使面试官没问,我们也可以在自我介绍时候表述出来。
如果你觉得自己的项目的确没什么厉害的东西,都是业务的 curd。那就挑一个值得说过的优化,或者设计方案也行。
毕竟高大上的东西的确只有少数人接触到,都是理解的。
接下来关于第二点,目的是考察你对自己项目的理解是不是真的知其所以然,还是说自己只是一个无情的 curd 机器。
项目切入点:分布式锁的实现、分布式Session、分布式缓存、缓存一致性、跨域、异步编排等,重点把握秒杀模块和 订单模块的设计及流程
项目概述
本系统是专门为精美鞋业有限公司制定的对外贸易系统,大部分商品出售给越南地区,供那边的厂商进行二次加工。 本系统采用微服务架构设计,在分布式环境下利用Spring Cloud框架,通过业务划分,设计独立模块的微服务,拆分为订单服务、购物车服务、支付服务、用户管理服务、商品管理服务、文件上传服务等模块,为客户提供半成品商品贸易平台。
核心业务一:购物车
购物车Redis数据结构选型:
双层 Map:Map<String,Map<String,String>>
第一层 Map,Key 是用户 id
第二层 Map,Key 是购物车中商品 id,值是购物项数据
购物车两个核心功能:新增商品到购物车、查询购物车
新增商品:判断是否登录
- 是:则添加商品到后台 Redis 中,把 user 的唯一标识符作为 key。
- 否:则添加商品到后台 Redis 中,使用随机生成的 user-key 作为 key。
查询购物车列表:判断是否登录
- 否:直接根据 user-key 查询 Redis 中数据并展示
- 是:已登录,则需要先根据 user-key 查询 Redis 是否有数据。
- 有:需要提交到后台添加到 Redis ,合并数据,而后查询。
- 否:直接去后台查询 Redis ,而后返回
核心业务二:订单
订单中心
- 电商系统涉及到 3 流,分别时信息流,资金流,物流,而订单系统作为中枢将三者有机的集 合起来。
- 订单模块是电商系统的枢纽,在订单这个环节上需求获取多个模块的数据和信息,同时对这 些信息进行加工处理后流向下个环节,这一系列就构成了订单的信息流通。
订单构成

订单状态
- 待付款
- 用户提交订单后,订单进行预下单,目前主流电商网站都会唤起支付,便于用户快速完成支 付,需要注意的是待付款状态下可以对库存进行锁定,锁定库存需要配置支付超时时间,超 时后将自动取消订单,订单变更关闭状态。
- 已付款/ 待发货
- 用户完成订单支付,订单系统需要记录支付时间,支付流水单号便于对账,订单下放到 WMS 系统,仓库进行调拨,配货,分拣,出库等操作。
- 待收货/ 已发货
- 仓储将商品出库后,订单进入物流环节,订单系统需要同步物流信息,便于用户实时知悉物 品物流状态
- 已完成
- 用户确认收货后,订单交易完成。后续支付侧进行结算,如果订单存在问题进入售后状态
- 已取消
- 付款之前取消订单。包括超时未付款或用户商户取消订单都会产生这种订单状态。
订单流程
- 正常的网购步骤:订单生成–>支付订单–>卖家发货–>确认收货–>交易成功
1.订单创建与支付
http://order.gulimall.com/toTrade

http://order.gulimall.com/submitOrder
com.klaus.gulimall.order.web.OrderWebController#submitOrder
/**
*
* @param vo 上一个页面携带的数据
* @param model
* @param redirectAttributes
* @return
*/
@PostMapping("/submitOrder")
public String submitOrder(OrderSubmitVo vo, Model model, RedirectAttributes redirectAttributes){
//捕获异常
try {
SubmitOrderResponseVo responseVo = orderService.submitOrder(vo);
//下单:去创建订单,验令牌,验价格,锁库存....
System.out.println("订单提交的数据..." + vo);
if (responseVo.getCode() == 0){
//下单成功来到支付选择页
model.addAttribute("submitOrderResponse", responseVo);
return "pay";
}else {
//下单失败重定向到订单确认页
String msg = "下单失败:";
switch (responseVo.getCode()){
case 1: msg+= "订单信息过期,请刷新再次提交"; break;
case 2: msg+= "订单商品价格发生了变化,请刷新再次提交"; break;
case 3: msg+= "库存锁定失败,商品库存不足"; break;
}
redirectAttributes.addFlashAttribute("msg", msg);
return "redirect:http://order.gulimall.com/toTrade";
}
}catch (Exception e) {
if (e instanceof NoStockException) {
String message = ((NoStockException) e).getMessage();
//抛出无库存异常后页面显示此异常信息NO_STOCK_EXCEPTION(21000, "商品库存不足");
redirectAttributes.addFlashAttribute("msg", message);
}
//抛出任何异常后重定向会订单确认页
return "redirect:http://order.gulimall.com/toTrade";
}
}com.klaus.gulimall.order.service.impl.OrderServiceImpl#submitOrder
/**
* 下单
* 加入统一事务
* @param vo
* @return
* @Transactional 本地事务,在分布式系统下,只能控制自己的回滚,控制不了其他服务的回滚
* (isolation = Isolation.REPEATABLE_READ)默认隔离级别
* ->分布式事务: 最大原因是网络问题+分布式机器
*
*/
//@GlobalTransactional 高并发 此注解会加很多全局锁,导致下单只能等别人先下完单才能下,高并发环境不可取
@Transactional
@Override
public SubmitOrderResponseVo submitOrder(OrderSubmitVo vo) {
submitVoThreadLocal.set(vo);
SubmitOrderResponseVo responseVo = new SubmitOrderResponseVo();
//从拦截器里获取用户信息
MemberRespVo memberRespVo = LoginUserInterceptor.loginUser.get();
responseVo.setCode(0);
//1、验证令牌【令牌的对比和删除必须保证原子性】
//0令牌失败 - 1删除成功(校验成功)
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
String orderToken = vo.getOrderToken();
//原子验证令牌和删除令牌
Long result = redisTemplate.execute(
//脚本返回类型->0,1
new DefaultRedisScript<Long>(script, Long.class),
//将缓存中将要比对的key转为集合
Arrays.asList(OrderConstant.USER_ORDER_TOKEN_PREFIX + memberRespVo.getId()),
//传入要校验的值
orderToken);
if (result == 0L) {
//验证失败,设置错误状态码为1,key过期等情况
responseVo.setCode(1);
return responseVo;
} else {
//验证成功
//下单:去创建订单,验令牌,验价格,锁库存....
//1、创建订单,订单项等信息
OrderCreateTo order = createOrder();
// todo 观察者模式: 发布商品下单事件
ApplicationContextHolder.getInstance().publishEvent(new OrderCreateEvent(this, order));
//2、验价,拿到应付金额与页面提交的金额进行校验
BigDecimal payAmount = order.getOrder().getPayAmount();
BigDecimal payPrice = vo.getPayPrice();
//两者相减的绝对值小于0.01就算校验通过
if (Math.abs(payAmount.subtract(payPrice).doubleValue()) < 0.01){
//校验通过
//TODO 3、保存订单
saveOrder(order);
//4、库存锁定,这样能避免库存不足等现象发生
//只要有异常就回滚订单数据(订单号,所有订单项(skuId,skuName,num))
WareSkuLockVo lockVo = new WareSkuLockVo();
//设置指定库存的订单号
lockVo.setOrderSn(order.getOrder().getOrderSn());
List<OrderItemVo> locks = order.getOrderItems().stream().map(item -> {
OrderItemVo orderItemVo = new OrderItemVo();
//订单已经保存了,可以直接设置
orderItemVo.setSkuId(item.getSkuId());
orderItemVo.setCount(item.getSkuQuantity());
orderItemVo.setTitle(item.getSkuName());
return orderItemVo;
}).collect(Collectors.toList());
//锁定订单项数据
lockVo.setLocks(locks);
//todo 4、远程锁库存
//库存锁定成功了,但是网络原因超时了,订单回滚,库存不回滚
//为了保证高并发,库存服务自己回滚,可以发消息给库存服务
//库存服务本身也可以使用自动解锁模式->消息队列
R r = wmsFeignService.orderLockStock(lockVo);
//判断远程调用是否成功
if (r.getCode() == 0){
//远程调用成功,锁成功了
responseVo.setOrder(order.getOrder());
//todo 5、远程扣减积分 出异常 引入seata后业务标注@GlobalTransactional订单回滚,库存回滚
// int i = 10/0;//不标注@GlobalTransactional 订单回滚,库存不回滚
//todo 订单创建成功发送消息给MQ
rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", order.getOrder());
return responseVo;
}else {
//远程调用失败,锁定失败
//锁定失败
String msg = (String) r.get("msg");
//抛出异常就不能设置错误状态码了
throw new NoStockException(msg);
// responseVo.setCode(3);
// return responseVo;
}
}else {
//校验失败
responseVo.setCode(2);
return responseVo;
}
}
}com.klaus.gulimall.order.service.impl.OrderServiceImpl#createOrder
/**
* 创建订单聚合方法,含构建订单和所有订单项
* @return
*/
private OrderCreateTo createOrder() {
//获取当前登录的用户信息
MemberRespVo memberRespVo = LoginUserInterceptor.loginUser.get();
OrderCreateTo createTo = new OrderCreateTo();
//1、todo 生成订单号(使用雪花算法根据用户id创建订单号)
// String orderSn = IdWorker.getTimeId();
String orderSn = SnowflakeIdUtil.nextIdStrByService(memberRespVo.getId().toString());
//创建订单
OrderEntity orderEntity = buildOrder(orderSn);
//2、获取到所有的订单项
List<OrderItemEntity> itemEntities = buildOrderItems(orderSn);
//3、验价,计算价格、积分等相关
computePrice(orderEntity, itemEntities);
// createTo.setOrderItems(itemEntities);
// createTo.setOrder(orderEntity);
// 构建订单聚合根
OrderCreateTo order = createTo.builder()
.order(orderEntity)
.orderItems(itemEntities)
.fare(orderEntity.getFreightAmount())
.payPrice(orderEntity.getPayAmount())
.build();
return order;
}
支付服务,提供支付宝、微信、银行卡等支付方式,支持支付查询以及退款等功能。
配置项修改
详情见 支付宝沙箱对接 中需要修改配置项。
核心流程
核心流程(刚果商城项目参考):
1)通过策略模式封装支付渠道和支付场景,用户支付时动态选择对应的支付组件。
- 代码地址:
org.opengoofy.congomall.biz.pay.application.service.impl.PayServiceImpl#commonPay2)通过策略模式封装支付回调场景,三方支付平台回调时动态选择对应的支付回调组件。
- 代码地址:
org.opengoofy.congomall.biz.pay.application.service.impl.PayServiceImpl#callback
支付宝沙箱对接
项目中已对接支付宝沙箱环境,以下介绍如何对接支付宝沙箱。
- 注册账号
注册支付宝开发者账户,进入开发者控制台。
https://openhome.alipay.com/platform/developerIndex.htmopen in new window
直接进入沙箱环境地址。
https://open.alipay.com/develop/sandbox/appopen in new window
沙箱应用页面如下:

- 获取支付参数
拿到 APPID 替换 application.properties 文件中 alipay.app-id。

为了图方便,这里直接使用支付宝自定义公钥和私钥。
我这里是已经开启过的,所以启用按钮是置灰的。第一次使用应该是蓝色可点击的状态。

复制支付宝公钥替换 application.properties 文件中 alipay.alipay_public_key。
复制应用私钥替换 application.properties 文件中 alipay.private_key。

至此,就可以调用支付接口进行支付宝支付功能了。
- 调用支付宝接口
访问支付服务 API,调用支付宝支付接口。
调用支付接口http://order.gulimall.com/payOrder?orderSn=202304031500558581642784227126714369后,新创建 pay.html 空文件,复制控制台日志中前端文件代码到 pay.html。
com.klaus.gulimall.order.service.impl.OrderServiceImpl#getOrderPay
/**
* 获取当前订单的支付信息
* @param orderSn
* @return
*/
@Override
public PayVo getOrderPay(String orderSn) {
PayVo payVo = new PayVo();
OrderEntity entity = this.getOrderByOrderSn(orderSn);
//保留2位小数,有小数就想上取值
BigDecimal bigDecimal = entity.getTotalAmount().setScale(2, BigDecimal.ROUND_UP);
payVo.setTotal_amount(bigDecimal.toString());
payVo.setOut_trade_no(entity.getOrderSn());
List<OrderItemEntity> itemEntities = orderItemService.list(new QueryWrapper<OrderItemEntity>().eq("order_sn", orderSn));
OrderItemEntity item = itemEntities.get(0);
payVo.setSubject(item.getSkuName());
payVo.setBody(item.getSkuAttrsVals());
return payVo;
}com.klaus.gulimall.order.web.PayWebController#payOrder
/**
* 1、将支付页让浏览器展示
* 2、支付成功以后,我们要跳到用户的订单列表页
* @param orderSn
* @return
* @throws AlipayApiException
*/
@ResponseBody
@GetMapping(value = "/payOrder", produces = "text/html")
public String payOrder(@RequestParam("orderSn") String orderSn) throws AlipayApiException {
PayVo payVo = orderService.getOrderPay(orderSn);
//返回的是一个页面,将此页面直接交给浏览器就行
String pay = alipayTemplate.pay(payVo);
// System.out.println(pay);
return pay;
}com.klaus.gulimall.order.config.AlipayTemplate#pay
public String pay(PayVo vo) throws AlipayApiException {
//AlipayClient alipayClient = new DefaultAlipayClient(AlipayTemplate.gatewayUrl, AlipayTemplate.app_id, AlipayTemplate.merchant_private_key, "json", AlipayTemplate.charset, AlipayTemplate.alipay_public_key, AlipayTemplate.sign_type);
//1、根据支付宝的配置生成一个支付客户端
AlipayClient alipayClient = new DefaultAlipayClient(gatewayUrl,
app_id, merchant_private_key, "json",
charset, alipay_public_key, sign_type);
//2、创建一个支付请求 //设置请求参数
AlipayTradePagePayRequest alipayRequest = new AlipayTradePagePayRequest();
alipayRequest.setReturnUrl(return_url);
alipayRequest.setNotifyUrl(notify_url);
//商户订单号,商户网站订单系统中唯一订单号,必填
String out_trade_no = vo.getOut_trade_no();
//付款金额,必填
String total_amount = vo.getTotal_amount();
//订单名称,必填
String subject = vo.getSubject();
//商品描述,可空
String body = vo.getBody();
alipayRequest.setBizContent("{\"out_trade_no\":\""+ out_trade_no +"\","
+ "\"total_amount\":\""+ total_amount +"\","
+ "\"subject\":\""+ subject +"\","
+ "\"body\":\""+ body +"\","
+ "\"timeout_express\":\""+timeout+"\","
+ "\"product_code\":\"FAST_INSTANT_TRADE_PAY\"}");
String result = alipayClient.pageExecute(alipayRequest).getBody();
//会收到支付宝的响应,响应的是一个页面,只要浏览器显示这个页面,就会自动来到支付宝的收银台页面
System.out.println("支付宝的响应:"+result);
return result;
}
通过谷歌浏览器打开 pay.html即可,看到以下页面即为正常现象。

账户名和支付密码从沙箱账号处复制买方信息。如果买方账户余额为 0,可随意充值金额。

使用账户余额支付即可。

支付结果显示成功。

- 支付结果回调
如果希望在沙箱环境中得知具体支付结果,我们需要通过 Natapp 开通内网穿透,开通教程详细查看下述官方文档:
https://natapp.cn/article/natapp_newbieopen in new window
开通后,将内网穿透端口为 80,本地地址改为虚拟机映射本地host的地址(192.168.10.103=>order.gulimall.com)即支付服务项目端口。

将 Natapp 内网穿透地址替换 application.properties 文件下 alipay.notify_url 属性。
再重复调用下支付宝支付接口流程,即可看到回调效果。
- 配置以上内网穿透配置才能完成订单支付,否则订单支付失败,订单自动取消,库存自动解锁
#异步通知:支付宝会悄悄的给我们发送一个请求,告诉我们支付成功的信息
alipay.notify_url=http://mqbb4a.natappfree.cc/paid/notify
com.klaus.gulimall.order.service.impl.OrderServiceImpl#handlePayResult
/**
* 处理支付宝的支付结果
* @param vo
* @return
*/
@Override
public String handlePayResult(PayAsyncVo vo) {
//1、保存交易流水
PaymentInfoEntity infoEntity = new PaymentInfoEntity();
infoEntity.setAlipayTradeNo(vo.getTrade_no());
infoEntity.setOrderSn(vo.getOut_trade_no());
infoEntity.setPaymentStatus(vo.getTrade_status());
infoEntity.setCallbackTime(vo.getNotify_time());
paymentInfoService.save(infoEntity);
//2、修改订单的状态信息
if (vo.getTrade_status().equals("TRADE_SUCCESS") || vo.getTrade_status().equals("TRADE_FINISHED")){
//支付成功状态
String outTradeNo = vo.getOut_trade_no();
this.baseMapper.updateOrderStatus(outTradeNo, OrderStatusEnum.PAYED.getCode());
}
return "success";
}com.klaus.gulimall.order.listener.OrderPaidListener#handleAlipaid
/**
* 支付成功异步通知
* @param vo
* @param request
* @return
*/
@PostMapping("/paid/notify")
public String handleAlipaid(PayAsyncVo vo, HttpServletRequest request) throws Exception {
//只要我们收到了支付宝给我们异步的通知,告诉我们订单支付成功,返回success,支付宝就再也不通知
// Map<String, String[]> map = request.getParameterMap();
// for (String key : map.keySet()) {
// String value = request.getParameter(key);
// System.out.println("参数名:"+key+"==>参数值:"+ value);
// }
// System.out.println("支付宝通知到位了...数据:"+map);
//验签
Map<String,String> params = new HashMap<String,String>();
Map<String,String[]> requestParams = request.getParameterMap();
for (Iterator<String> iter = requestParams.keySet().iterator(); iter.hasNext();) {
String name = (String) iter.next();
String[] values = (String[]) requestParams.get(name);
String valueStr = "";
for (int i = 0; i < values.length; i++) {
valueStr = (i == values.length - 1) ? valueStr + values[i]
: valueStr + values[i] + ",";
}
//乱码解决,这段代码在出现乱码时使用
// valueStr = new String(valueStr.getBytes("ISO-8859-1"), "utf-8");
params.put(name, valueStr);
}
boolean signVerified = AlipaySignature.rsaCheckV1(params, alipayTemplate.getAlipay_public_key(), alipayTemplate.getCharset(), alipayTemplate.getSign_type()); //
if (signVerified){
//验签成功
System.out.println("签名验证成功.....");
String result = orderService.handlePayResult(vo);
return result;
}else {
System.out.println("签名验证失败.....");
return "error";
}
}页面跳转同步通知页面路径 需http://格式的完整路径,不能加?id=123这类自定义参数,必须外网可以正常访问;订单支付完成成功返回以下页面(同步通知,支付成功,一般跳转到成功页):
private String return_url = "http://member.gulimall.com/memberOrder.html";com.klaus.gulimall.member.Web.MemberWebController#memberOrderPage
@GetMapping("/memberOrder.html")
public String memberOrderPage(@RequestParam(value = "pageNum", defaultValue = "1") Integer pageNum,
Model model) {
//查出当前登录的用户的所有订单列表数据
Map<String, Object> page = new HashMap<>();
page.put("page", pageNum.toString());
R r = orderFeignService.listWithItem(page);
System.out.println(JSON.toJSONString(r));
model.addAttribute("orders", r);
return "orderList";
}- 远程调用订单服务查出当前登录的用户的所有订单列表数据
com.klaus.gulimall.order.controller.OrderController#listWithItem
/**
* todo 远程调用都要全用@POSTMapping+@RequestBody(请求体只支持post)
* 分页查询当前登录用户的所有订单
* @param params
* @return
*/
@PostMapping("/listWithItem")
//@RequiresPermissions("order:order:list")
public R listWithItem(@RequestBody Map<String, Object> params){
PageUtils page = orderService.queryPageWithItem(params);
return R.ok().put("page", page);
}com.klaus.gulimall.order.service.impl.OrderServiceImpl#queryPageWithItem
@Override
public PageUtils queryPageWithItem(Map<String, Object> params) {
//获取当前登录的用户信息
MemberRespVo memberRespVo = LoginUserInterceptor.loginUser.get();
IPage<OrderEntity> page = this.page(
new Query<OrderEntity>().getPage(params), //根据id降序
new QueryWrapper<OrderEntity>().eq("member_id", memberRespVo.getId()).orderByDesc("id")
);
List<OrderEntity> orderEntities = page.getRecords().stream().map(order -> {
//封装订单项到订单
List<OrderItemEntity> itemEntities = orderItemService.list(new QueryWrapper<OrderItemEntity>().eq("order_sn", order.getOrderSn()));
order.setItemEntities(itemEntities);
return order;
}).collect(Collectors.toList());
page.setRecords(orderEntities);
return new PageUtils(page);
}
2.订单创建前需要预览订单,选择收货信息等
http://cart.gulimall.com/cart.html

http://order.gulimall.com/toTrade
com.klaus.gulimall.order.web.OrderWebController#toTrade
/**
* OrderConfirmVo = [List<MemberAddressVo> address, //收货地址,
* List<OrderItemVo> items, //所有选中的购物项
* Integer integration, //积分...
* Map<Long, Boolean> stocks, //库存信息
* String orderToken //防重令牌]
*/
@GetMapping("/toTrade")
public String toTrade(Model model, HttpServletRequest request) throws ExecutionException, InterruptedException {
OrderConfirmVo confirmVo = orderService.confirmOrder();
model.addAttribute("orderConfirmData", confirmVo);
//展示订单确认的数据(页面跳转)
return "confirm";
}com.klaus.gulimall.order.service.impl.OrderServiceImpl#confirmOrder注:使用异步编排时用到远程调用,如果不进行请求上下文共享将造成Feign异步情况丢失上下文问题
/**
* 订单确认页返回需要用到的数据
* @return
*/
@Override
public OrderConfirmVo confirmOrder() throws ExecutionException, InterruptedException {
OrderConfirmVo confirmVo = new OrderConfirmVo();
//从拦截器里获取用户信息
MemberRespVo memberRespVo = LoginUserInterceptor.loginUser.get();
System.out.println("主线程...." + Thread.currentThread().getId());
//获取原来请求上下文请求属性
RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
//进行异步编排
CompletableFuture<Void> getAddressFuture = CompletableFuture.runAsync(() -> {
//1、远程查询所有的收货地址列表
System.out.println("member线程...." + Thread.currentThread().getId());
//在Feign异步调用前将请求上下文进行共享操作(之前的请求数据都来共享每一个线程)
RequestContextHolder.setRequestAttributes(requestAttributes);
List<MemberAddressVo> address = memberFeignService.getAddress(memberRespVo.getId());
confirmVo.setAddress(address);
}, executor);
CompletableFuture<Void> cartFuture = CompletableFuture.runAsync(() -> {
//2、远程查询购物车所有选中的购物项
System.out.println("cart线程...." + Thread.currentThread().getId());
//在Feign异步调用前将请求上下文进行共享操作
RequestContextHolder.setRequestAttributes(requestAttributes);
List<OrderItemVo> items = cartFeignService.currentUserCartItems();
confirmVo.setItems(items);
//feign在远程调用之前要构造请求,调用很多的拦截器
//for(RequestInterceptor interceptor:requestInterceptors)
}, executor).thenRunAsync(() -> {
List<OrderItemVo> items = confirmVo.getItems();
List<Long> skuIds = items.stream().map(item -> item.getSkuId()).collect(Collectors.toList());
//远程查询库存
R r = wmsFeignService.getSkusHasStock(skuIds);
List<SkuStockVo> data = r.getData(new TypeReference<List<SkuStockVo>>() {
});
if (data != null) {
// Map<SkuId, hasStock> map
Map<Long, Boolean> map = data.stream().collect(Collectors.toMap(SkuStockVo::getSkuId, SkuStockVo::getHasStock));
// Map<Long, Boolean> stocks;
confirmVo.setStocks(map);
}
}, executor);
//3、查询用户积分
Integer integration = memberRespVo.getIntegration();
confirmVo.setIntegration(integration);
//4、其他数据自动计算
//todo 5、防重令牌
String token = UUID.randomUUID().toString().replace("-", "");
//给服务器一个令牌
redisTemplate.opsForValue().set(OrderConstant.USER_ORDER_TOKEN_PREFIX + memberRespVo.getId(), token, 30, TimeUnit.MINUTES);
//给页面一个令牌
confirmVo.setOrderToken(token);
CompletableFuture.allOf(getAddressFuture, cartFuture).get();
return confirmVo;
}

3.订单创建需要锁定库存,库存有才可创建,否则不能创建
- 查询sku是否有库存
com.klaus.gulimall.ware.controller.WareSkuController#getSkusHasStock
//查询sku是否有库存
@PostMapping("/hasstock")
public R getSkusHasStock(@RequestBody List<Long> skuIds) {
//sku_id stock
List<SkuHasStockVo> vos = wareSkuService.getSkusHasStock(skuIds);
return R.ok().setData(vos);
}com.klaus.gulimall.ware.service.impl.WareSkuServiceImpl#getSkusHasStock
@Override
public List<SkuHasStockVo> getSkusHasStock(List<Long> skuIds) {
List<SkuHasStockVo> collect = skuIds.stream().map(skuId -> {
SkuHasStockVo vo = new SkuHasStockVo();
//查询当前sku的总库存量
//SELECT SUM(stock-stock_locked) FROM `wms_ware_sku` WHERE sku_id=1
//获取查询到的库存记录数
Long count = baseMapper.getSkuStock(skuId);
vo.setSkuId(skuId);
//记录数大于0表示有库存
vo.setHasStock(count == null ? false : count > 0);
return vo;
}).collect(Collectors.toList());
return collect;
}- SQL
<select id="getSkuStock" resultType="java.lang.Long">
SELECT SUM(stock-stock_locked) FROM `wms_ware_sku` WHERE sku_id=#{skuId}
</select>
- 创建订单进行库存锁定
com.klaus.gulimall.ware.controller.WareSkuController#orderLockStock
@RequestMapping("/lock/order")
public R orderLockStock(@RequestBody WareSkuLockVo vo){
Boolean lockStock = null;
try {
lockStock = wareSkuService.orderLockStock(vo);
return R.ok();
} catch (NoStockException e) {
//通过测试发现商品库存不足将会在页面显示以下错误信息(http://order.gulimall.com/toTrade)
return R.error(BizCodeEnum.NO_STOCK_EXCEPTION.getCode(), BizCodeEnum.NO_STOCK_EXCEPTION.getMsg());
}
}com.klaus.gulimall.ware.service.impl.WareSkuServiceImpl#orderLockStock
/**
* 为某个订单锁定库存
* 加入统一事务
* (rollbackFor = NoStockException.class )
* 默认只要是运行时异常都会回滚
* 库存解锁的场景
* 1)、下订单成功,订单过期没有支付被系统自动取消、被用户手动取消,都要解锁库存
* 2)、下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚
* 之前锁定的库存就要自动解锁
* @param vo
* @return
*/
@Transactional
@Override
public Boolean orderLockStock(WareSkuLockVo vo) {
/**
* 保存库存工作单的详情的两张表
* 追溯
*/
WareOrderTaskEntity taskEntity = new WareOrderTaskEntity();
taskEntity.setOrderSn(vo.getOrderSn());
wareOrderTaskService.save(taskEntity);
//按照下单的收货地址,找到一个就近仓库,锁定库存
//1、找到每个商品在哪个仓库都有库存
List<OrderItemVo> locks = vo.getLocks();
List<SkuWareHasStock> collect = locks.stream().map(item -> {
SkuWareHasStock stock = new SkuWareHasStock();
Long skuId = item.getSkuId();
stock.setSkuId(skuId);
stock.setNum(item.getCount());
//查询这个商品在哪里有库存
List<Long> wareIds = wareSkuDao.listWareIdHasSkuStock(skuId);
stock.setWareIds(wareIds);
return stock;
}).collect(Collectors.toList());
//2、锁定库存
for (SkuWareHasStock hasStock : collect) {
Boolean skuStocked = false;//lock_status=0
Long skuId = hasStock.getSkuId();
List<Long> wareIds = hasStock.getWareIds();
if (wareIds == null || wareIds.size() == 0) {
//没有任何仓库有这个商品的库存
throw new NoStockException(skuId);
}
//1、如果每一个商品都锁定成功,将当前商品锁定了几件的工作单记录发给MQ
//2、锁定失败。前面保存的工作单信息就回滚了,即使要解锁记录,由于去数据库查不到id,所以就不用解锁
// log:skuId-num-ware: 1:1-2-1(v) 2:2-1-3(v) 3:3-1-1(x)
// 既然锁成功了就应该扣减,之后就应该解锁,因为3记录锁失败,1,2记录就回滚了,相当于工作单回滚了,库存没回滚库存扣减
// 只传id工作单是不知道当时库存锁了多少个
for (Long wareId : wareIds) {
//成功就返回1,否则就是0
Long count = wareSkuDao.lockSkuStock(skuId, wareId, hasStock.getNum());
if (count == 1) {
//锁成功了==>lock_status=1
skuStocked = true;
//退出当前循环
//todo 告诉MQ库存锁定成功
WareOrderTaskDetailEntity detailEntity = new WareOrderTaskDetailEntity(null, skuId, "", hasStock.getNum(), taskEntity.getId(), wareId, 1);
//保存工作单详情
wareOrderTaskDetailService.save(detailEntity);
StockLockedTo lockedTo = new StockLockedTo();
//属性拷贝到to
StockDetailTo stockDetailTo = new StockDetailTo();
BeanUtils.copyProperties(detailEntity, stockDetailTo);
//只传工作单id还不够,防止回滚以后找不到数据
lockedTo.setId(taskEntity.getId());
lockedTo.setDetailTo(stockDetailTo);
//发送锁定库存消息;提交订单后,出现异常,订单回滚,库存不回滚,延时队列有2个任务等待死信时间结束后发送消息
rabbitTemplate.convertAndSend("stock-event-exchange", "stock.locked", lockedTo);
break;
} else {
//当前仓库锁失败了,重试下一个仓库
}
}
if (skuStocked == false) {
//当前商品所有仓库都没有锁住
throw new NoStockException(skuId);
}
}
//3、肯定全部都是锁定成功过的
return true;
}- 库存工作单表
wms_ware_order_task

- 库存工作单详情
wms_ware_order_task_detail 

//查询这个商品在哪里有库存
//com.klaus.gulimall.ware.dao.WareSkuDao#listWareIdHasSkuStock
List<Long> wareIds = wareSkuDao.listWareIdHasSkuStock(skuId);- SQL
<select id="listWareIdHasSkuStock" resultType="java.lang.Long">
SELECT ware_id FROM `wms_ware_sku` WHERE sku_id=#{skuId} AND stock-stock_locked >0
</select>
//库存锁定成功就返回1,否则就是0
//com.klaus.gulimall.ware.dao.WareSkuDao#lockSkuStock
Long count = wareSkuDao.lockSkuStock(skuId, wareId, hasStock.getNum());- SQL
<update id="lockSkuStock">
UPDATE `wms_ware_sku` SET stock_locked = stock_locked + #{num}
WHERE sku_id =#{skuId} AND ware_id=#{wareId} AND stock-stock_locked>=#{num}
</update>
4.订单创建后超时未支付需要解锁库存
库存解锁分两种情况,一种是订单正常支付超时库存自动解锁;一种是订单服务异常/宕机库存自动解锁
com.klaus.gulimall.ware.service.impl.WareSkuServiceImpl#unlockStock(com.klaus.common.to.mq.StockLockedTo)
/**
* 1、库存自动解锁
* 下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚。之前锁定的库存就要自动解锁
* 2、下订单失败
* 锁库存失败
* 只要解锁库存的消息失败,一定要告诉服务器解锁失败
* @param to
* @param message
*/
@Override
public void unlockStock(StockLockedTo to) {
StockDetailTo detail = to.getDetailTo();
Long detailId = detail.getId();
//1、查询数据库关于这个订单的锁定库存信息
// 有: 证明库存锁定成功了
// 解锁:订单情况
// 1、没有这个订单,必须解锁
// 2、有这个订单,就不是解锁库存
// 订单状态: 已取消:解锁库存
// 没取消:不能接受
// 没有:库存锁定失败了,库存回滚了。这种情况无需解锁
WareOrderTaskDetailEntity taskDetailEntity = wareOrderTaskDetailService.getById(detailId);
if (taskDetailEntity != null) {
//有工作详情单,但只能证明库存服务在锁库存的时候锁成功后返回出去的,
// 要看整个订单有没有下单成功,如果订单远程调用的其他服务崩了导致订单回滚,这跟库存服务没有关系,直接解锁是有问题的,因为订单都没有
//解锁
Long id = to.getId();//库存工作单的id
//获取订单状态
WareOrderTaskEntity taskEntity = wareOrderTaskService.getById(id);
String orderSn = taskEntity.getOrderSn();
//根据订单号查询订单的状态
R r = orderFeignService.getOrderStatus(orderSn);
if (r.getCode() == 0) {
//远程调用成功,订单数据返回成功
OrderVo data = r.getData(new TypeReference<OrderVo>() {
});
if (data == null || data.getStatus() == 4) {
//订单已经被取消或订单不存在,才能解锁库存
//问题:订单服务由于网络等原因迟迟没有将订单解锁(data.getStatus()==0)导致库存永远释放不了锁
if (taskDetailEntity.getLockStatus() == 1){
//当前库存工作单详情,状态为1 已锁定但是未解锁才可以解锁
unLockStock(detail.getSkuId(), detail.getWareId(), detail.getSkuNum(), detailId);
}
}
} else {
//远程调用失败直接抛异常
throw new RuntimeException("远程服务失败");
}
} else {
//无需解锁
}
}远程调用订单服务获取订单状态
com.klaus.gulimall.order.controller.OrderController#getOrderStatus
@GetMapping("/status/{orderSn}")
public R getOrderStatus(@PathVariable("orderSn") String orderSn){
OrderEntity orderEntity = orderService.getOrderByOrderSn(orderSn);
return R.ok().setData(orderEntity);
}@Override
public OrderEntity getOrderByOrderSn(String orderSn) {
OrderEntity orderEntity = this.getOne(new QueryWrapper<OrderEntity>().eq("order_sn", orderSn));
return orderEntity;
}com.klaus.gulimall.ware.service.impl.WareSkuServiceImpl#unlockStock(com.klaus.common.to.mq.OrderTo)
/**
* 防止订单服务卡顿,导致更改订单状态的操作无法完成以及更改的mq消息没有得到执行,解锁库存的消息优先到期,扫描订单状态是新建状态,就会什么都不做
* 导致卡顿的订单永远不能解锁库存
* @param orderTo
* 加入统一事务
*/
@Transactional
@Override
public void unlockStock(OrderTo orderTo) {
String orderSn = orderTo.getOrderSn();
//查一下最新库存的状态,防止重复解锁库存
WareOrderTaskEntity task = wareOrderTaskService.getOrderTaskByOrderSn(orderSn);
Long id = task.getId();
//按照工作单找到所有 没有解锁的库存,进行解锁
List<WareOrderTaskDetailEntity> detailEntities = wareOrderTaskDetailService.list(new QueryWrapper<WareOrderTaskDetailEntity>().
eq("task_id", id).
//新建的还没被解锁的库存
eq("lock_status", 1));
for (WareOrderTaskDetailEntity entity : detailEntities) {
//此解锁方式可以解决高并发问题和整个系统的异构(两个服务开发语言不同。如java,php)
unLockStock(entity.getSkuId(), entity.getWareId(), entity.getSkuNum(), entity.getId());
}
}- 库存解锁方法
com.klaus.gulimall.ware.service.impl.WareSkuServiceImpl#unLockStock
private void unLockStock(Long skuId, Long wareId, Integer num, Long taskDetailId) {
//库存解锁
wareSkuDao.unLockStock(skuId, wareId, num);
//更新库存工作单的状态
WareOrderTaskDetailEntity entity = new WareOrderTaskDetailEntity();
entity.setId(taskDetailId);
entity.setLockStatus(2);//变为已解锁
wareOrderTaskDetailService.updateById(entity);
}- SQL
<update id="unLockStock">
UPDATE `wms_ware_sku` SET stock_locked=stock_locked-#{num}
WHERE sku_id=#{skuId} AND ware_id=#{wareId}
</update>
5.支付成功后,需要进行拆单,根据商品打包方式,所在仓库,物流等进行拆单(待开发)
6.支付的每笔流水都需要记录,以待查账

com.klaus.gulimall.order.service.impl.OrderServiceImpl#handlePayResult
/**
* 处理支付宝的支付结果
* @param vo
* @return
*/
@Override
public String handlePayResult(PayAsyncVo vo) {
//1、保存交易流水
PaymentInfoEntity infoEntity = new PaymentInfoEntity();
infoEntity.setAlipayTradeNo(vo.getTrade_no());
infoEntity.setOrderSn(vo.getOut_trade_no());
infoEntity.setPaymentStatus(vo.getTrade_status());
infoEntity.setCallbackTime(vo.getNotify_time());
paymentInfoService.save(infoEntity);
//2、修改订单的状态信息
if (vo.getTrade_status().equals("TRADE_SUCCESS") || vo.getTrade_status().equals("TRADE_FINISHED")){
//支付成功状态
String outTradeNo = vo.getOut_trade_no();
this.baseMapper.updateOrderStatus(outTradeNo, OrderStatusEnum.PAYED.getCode());
}
return "success";
}7.订单创建,支付成功(待开发)等状态都需要给 MQ 发送消息,方便其他系统感知订阅
- 添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>- 订单创建:(MQ)
- 订单创建采用延迟队列
com.klaus.gulimall.order.config.MyMQConfig
@Bean
public Queue orderDelayQueue(){
Map<String, Object> arguments = new HashMap<>();
/**
* x-dead-letter-exchange: order-event-exchange
* x-dead-letter-routing-key: order.release.order
* x-message-ttl: 60000
*/
arguments.put("x-dead-letter-exchange", "order-event-exchange");
arguments.put("x-dead-letter-routing-key", "order.release.order");
arguments.put("x-message-ttl", 60000);
//String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
return queue;
}
@Bean
public Exchange orderEventExchange(){
//String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
return new TopicExchange("order-event-exchange", true, false);
}
@Bean
public Binding orderCreateOrderBinding(){
//String destination, Binding.DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments
return new Binding("order.delay.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.create.order", null);
}
- 创建订单 进入路由键-order.create.order- 进入交换机order-event-exchange, 根据路由键会转发到延时队列order.delay.queue 30min(60s)过期时间,过期时间到了之后,根据死信路由键-order.release.order-到达释放订单队列order.release.order.queue,监听这个队列的方法判断订单是不是已支付。

com.klaus.gulimall.order.service.impl.OrderServiceImpl#submitOrder
@Transactional
@Override
public SubmitOrderResponseVo submitOrder(OrderSubmitVo vo) {
...
//todo 订单创建成功发送消息给MQ
rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", order.getOrder());
...
}库存锁定:(MQ)
库存锁定采用延迟队列
com.klaus.gulimall.ware.config.MyRabbitConfig
@Bean
public Exchange stockEventExchange(){
return new TopicExchange("stock-event-exchange", true, false);
}
@Bean
public Queue stockDelayQueue(){
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "stock-event-exchange");
arguments.put("x-dead-letter-routing-key", "stock.release");
arguments.put("x-message-ttl", 120000);
//String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
return new Queue("stock.delay.queue", true, false, false, arguments);
}
@Bean
public Binding stockLockedBinding(){
//String destination, Binding.DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments
return new Binding("stock.delay.queue",
Binding.DestinationType.QUEUE,
"stock-event-exchange",
"stock.locked", null);
}
- 库存锁定成功 进入路由键-stock.locked- 进入交换机stock-event-exchange, 根据路由键会转发到延时队列stock.delay.queue 50min(120s)过期时间,延迟时间到,根据死信路由键-stock.release-到达释放订单队列stock.release.stock.queue,检查订单状态,确认是否需要解锁。

订单创建成功后库存锁定也成功就发送消息给MQ
com.klaus.gulimall.ware.service.impl.WareSkuServiceImpl#orderLockStock
@Transactional
@Override
public Boolean orderLockStock(WareSkuLockVo vo) {
...
//发送锁定库存消息;提交订单后,出现异常,订单回滚,库存不回滚,延时队列有2个任务等待死信时间结束后发送消息
rabbitTemplate.convertAndSend("stock-event-exchange", "stock.locked", lockedTo);
...
}防止超卖:数据库 unsigned int 做最后的保证。 (范围0~65535)
- 在mysql数据库中,unsigned表面含义是 '无符号’的意思,unsigned既为非负数,用此类型可以增加数据长度。
自动关单:订单超时未支付,需要取消订单 (MQ)
com.klaus.gulimall.order.config.MyMQConfig
@Bean
public Queue orderReleaseOrderQueue(){
Queue queue = new Queue("order.release.order.queue", true, false, false);
return queue;
}
@Bean
public Exchange orderEventExchange(){
//String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
return new TopicExchange("order-event-exchange", true, false);
}
@Bean
public Binding orderReleaseOrderBinding(){
return new Binding("order.release.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.order", null);
}
- 订单创建延迟时间到了之后,根据路由键-order.release.order-到达释放订单队列order.release.order.queue, 监听这个队列的方法 先是判断订单是不是已支付,如果不是就关闭订单,
关闭订单之后,根据路由键order.release.other 发送关闭的订单数据消息到交换机order-event-exchange,交换机再转发到order.release.coupon.queue优惠券队列,返回优惠券,与之通过 也是根据路由键order.release.other(待开发)

com.klaus.gulimall.order.listener.OrderCloseListener- 添加配置文件
application.properties中的RabbitMQ配置项
spring.rabbitmq.host=192.168.10.103
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
# 开启发送端确认
spring.rabbitmq.publisher-confirms=true
# 开启发送端消息抵达队列的确认
spring.rabbitmq.publisher-returns=true
# 只要抵达队列,以异步发送优先回调我们这个returnConfirm
spring.rabbitmq.template.mandatory=true
# 手动ack消息
spring.rabbitmq.listener.simple.acknowledge-mode=MANUAL/**
* 订单释放(关单)监听器
*/
@Service
@RabbitListener(queues = "order.release.order.queue")
public class OrderCloseListener {
@Autowired
OrderService orderService;
@RabbitHandler
public void listener(OrderEntity entity, Channel channel, Message message) throws IOException {
//等待死信时间到后收到订单消息
System.out.println("收到过期的订单信息,准备关闭订单"+entity.getOrderSn());
try {
orderService.closeOrder(entity);
//手动调用支付宝收单
//签收订单
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}com.klaus.gulimall.order.service.impl.OrderServiceImpl#closeOrder
/**
* 关单方法,将订单新建(待付款)状态设置为已取消状态并发送消息给MQ确认关单完成
* @param entity
*/
@Override
public void closeOrder(OrderEntity entity) {
//查询当前这个订单的最新状态
//将要发送的消息队列实体(释放订单服务前)
OrderEntity orderEntity = this.getById(entity.getId());
//status=0
if (orderEntity.getStatus() == OrderStatusEnum.CREATE_NEW.getCode()){
//在待付款状态下才关单(消息队列的实体有可能过期了,所以创建新实体)
OrderEntity update = new OrderEntity();
update.setId(entity.getId());
// status=4
update.setStatus(OrderStatusEnum.CANCLED.getCode());
this.updateById(update);
OrderTo orderTo = new OrderTo();
BeanUtils.copyProperties(orderEntity, orderTo);
try {
//todo 保证消息一定会发送出去,每一个消息都可以做好日志记录(给数据库保存每一个消息的详细信息)
//todo 定期扫描数据库将失败的消息再发送一遍;
//只要解锁成功了就【立即】发消息给MQ
rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo);
} catch (Exception e) {
//todo 将没发送成功的消息进行重试发送 (while)
}
}
}解锁库存: (MQ)
com.klaus.gulimall.ware.config.MyRabbitConfig
@Bean
public Exchange stockEventExchange(){
return new TopicExchange("stock-event-exchange", true, false);
}
@Bean
public Queue stockReleaseStockQueue(){
return new Queue("stock.release.stock.queue", true, false, false);
}
@Bean
public Binding stockReleaseBinding(){
return new Binding("stock.release.stock.queue",
Binding.DestinationType.QUEUE,
"stock-event-exchange",
"stock.release.#", null);
}
关闭订单之后,根据路由键order.release.other 发送关闭的订单数据到交换机order-event-exchange,交换机再转发到order.release.coupon.queue优惠券队列,返回优惠券,与之通过 也是根据路由键order.release.other,和数据一起转发到解锁库存队列stock.release.stock.queue 解锁库存(待开发)。

订单关闭,需要解锁已经占用的库存
com.klaus.gulimall.ware.listener.StockReleaseListener
/**
* 监听库存解锁队列
*/
@RabbitListener(queues = "stock.release.stock.queue")
@Service
public class StockReleaseListener {
@Autowired
WareSkuService wareSkuService;
/**
* 订单正常关闭(订单服务正常运行),需要解锁已经占用的库存
* @param to 库存工作单(含工作单详情)
*/
@RabbitHandler
public void handleStockLockedRelease(StockLockedTo to, Message message, Channel channel) throws IOException {
System.out.println("收到解锁库存的消息...");
try {
//当前消息是否被第二次及以后(重新)派发过来了
// Boolean redelivered = message.getMessageProperties().getRedelivered();
wareSkuService.unlockStock(to);
//只要解锁成功了,就签收消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
//有任何异常直接拒绝签收;消息拒绝以后重新放到队列里面,让别人继续消费解锁
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}com.klaus.gulimall.ware.service.impl.WareSkuServiceImpl#unlockStock(com.klaus.common.to.mq.StockLockedTo)
@Override
public void unlockStock(StockLockedTo to) {
...
//问题:订单服务由于网络等原因迟迟没有将订单解锁(data.getStatus()==0)导致库存永远释放不了锁
if (taskDetailEntity.getLockStatus() == 1){
//当前库存工作单详情,状态为1 已锁定但是未解锁才可以解锁
unLockStock(detail.getSkuId(), detail.getWareId(), detail.getSkuNum(), detailId);
...
}- 库存锁定成功,订单回滚,保证最终一致性,也需要库存自动解锁;库存锁定成功,订单服务全部炸了,订单都没有创建好,就必须有自动解锁功能
com.klaus.gulimall.ware.listener.StockReleaseListener
/**
* 监听库存解锁队列
*/
@RabbitListener(queues = "stock.release.stock.queue")
@Service
public class StockReleaseListener {
@Autowired
WareSkuService wareSkuService;
/**
* 订单异常关闭(订单服务发送运行时异常或宕机),需要解锁已经占用的库存
* 如果库存锁定成功,订单服务全部炸了,订单都没有创建好,就必须有自动解锁功能
* @param orderTo 完整订单信息
* @param message
* @param channel
* @throws IOException
*/
@RabbitHandler
public void handleOrderCloseRelease(OrderTo orderTo, Message message, Channel channel) throws IOException {
System.out.println("订单关闭准备解锁库存....");
try {
wareSkuService.unlockStock(orderTo);
//只要解锁成功了,就签收消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
//有任何异常直接拒绝签收;消息拒绝以后重新放到队列里面,让别人继续消费解锁
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}com.klaus.gulimall.ware.service.impl.WareSkuServiceImpl#unlockStock(com.klaus.common.to.mq.OrderTo)+#unLockStock
/**
* 防止订单服务卡顿,导致更改订单状态的操作无法完成以及更改的mq消息没有得到执行,解锁库存的消息优先到期,扫描订单状态是新建状态,就会什么都不做
* 导致卡顿的订单永远不能解锁库存
* @param orderTo
* 加入统一事务
*/
@Transactional
@Override
public void unlockStock(OrderTo orderTo) {
String orderSn = orderTo.getOrderSn();
//查一下最新库存的状态,防止重复解锁库存
WareOrderTaskEntity task = wareOrderTaskService.getOrderTaskByOrderSn(orderSn);
Long id = task.getId();
//按照工作单找到所有 没有解锁的库存,进行解锁
List<WareOrderTaskDetailEntity> detailEntities = wareOrderTaskDetailService.list(new QueryWrapper<WareOrderTaskDetailEntity>().
eq("task_id", id).
//新建的还没被解锁的库存
eq("lock_status", 1));
for (WareOrderTaskDetailEntity entity : detailEntities) {
//此解锁方式可以解决高并发问题和整个系统的异构(两个服务开发语言不同。如java,php)
unLockStock(entity.getSkuId(), entity.getWareId(), entity.getSkuNum(), entity.getId());
}
}
private void unLockStock(Long skuId, Long wareId, Integer num, Long taskDetailId) {
//库存解锁
wareSkuDao.unLockStock(skuId, wareId, num);
//更新库存工作单的状态
WareOrderTaskDetailEntity entity = new WareOrderTaskDetailEntity();
entity.setId(taskDetailId);
entity.setLockStatus(2);//变为已解锁
wareOrderTaskDetailService.updateById(entity);
}订单确认页流程

电商订单流程图

消息队列流程图

支付宝支付流程图

项目难点
订单服务与库存的联动(延时队列的使用)
订单释放&库存解锁

解决问题
使用CompletableFuture 异步编排解决查询商品详情⻚响应速度慢的问题;
- 假如商品详情页的每个查询,需要如下标注的时间才能完成 那么,用户需要 5.5s 后才能看到商品详情页的内容。很显然是不能接受的。 如果有多个线程同时完成这 6 步操作,也许只需要 1.5s 即可完成响应。
使用Spring Cache方法级别缓存技术,实现已经被调用过的指定的目标方法,直接从缓存中获取方法调用后的结果返回,提高系统的响应速度;
幂等解决方案
token 机制
1、服务端提供了发送 token 的接口。我们在分析业务的时候,哪些业务是存在幂等问题的,就必须在执行业务前,先去获取 token,服务器会把token 保存到 redis 中。
2、然后调用业务接口请求时,把 token 携带过去,一般放在请求头部。
3、服务器判断 token 是否存在 redis 中,存在表示第一次请求,然后删除 token,继续执行业务。
4、如果判断 token 不存在 redis 中,就表示是重复操作,直接返回重复标记给 client,这样就保证了业务代码,不被重复执行。
危险性:
1、先删除 token 还是后删除 token;
(1) 先删除可能导致,业务确实没有执行,重试还带上之前 token,由于防重设计导致,请求还是不能执行。
(2) 后删除可能导致,业务处理成功,但是服务闪断,出现超时,没有删除 token,别人继续重试,导致业务被执行两遍
(3) 我们最好设计为先删除 token,如果业务调用失败,就重新获取 token 再次请求。
2、Token 获取、比较和删除必须是原子性
(1) redis.get(token) 、token.equals、redis.del(token)如果这两个操作不是原子,可能导致,高并发下,都 get 到同样的数据,判断都成功,继续业务并发执行
(2) 可以在 redis 使用 lua 脚本完成这个操作
shellif redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 endSession共享问题解决-不同服务,子域session共享
- jsessionid这个cookie默认是当前系统域名的。当我们分拆服务,不同域名部署的时候,我们可以使用如下解决方案;
java@Configuration public class GulimallSessionConfig { @Bean public CookieSerializer cookieSerializer(){ DefaultCookieSerializer cookieSerializer = new DefaultCookieSerializer(); cookieSerializer.setDomainName("gulimall.com"); cookieSerializer.setCookieName("KLAUSSESSION"); return cookieSerializer; } @Bean public RedisSerializer<Object> springSessionDefaultRedisSerializer() { return new GenericJackson2JsonRedisSerializer(); } }- SpringSession核心原理
java/** * 加入依赖 * <dependency> * <groupId>org.springframework.boot</groupId> * <artifactId>spring-boot-starter-data-redis</artifactId> * </dependency> * * <!--整合Springsession完成session共享问题--> * <dependency> * <groupId>org.springframework.session</groupId> * <artifactId>spring-session-data-redis</artifactId> * </dependency> * 核心原理 * 1、@EnableRedisHttpSession导入RedisHttpSessionConfiguration配置 * 1)、给容器中添加了一个组件 * 传入SessionRepository==》【RedisOperationsSessionRepository】===》redis操作session,session的增删改查封装类 * 2)、SessionRepositoryFilter==》Filter(web):session存储过滤器;每个请求过来都必须经过Filter * I)、创建的时候,就自动从容器中获取了SessionRepository * II)、原始的HttpServletRequest request, HttpServletResponse response都被分别包装成了SessionRepositoryRequestWrapper,SessionRepositoryResponseWrapper * III)、以后获取session-》request.getSession();(原始的) * //SessionRepositoryRequestWrapper重写getSession方法 * IV)、wrapperRequest.getSession();==> SessionRepository中获取的到 * 装饰者模式; * * 自动延期;redis中的数据也是有过期时间的 */ @EnableRedisHttpSession //整合redis作为session存储 @SpringBootApplication public class GulimallAuthServerApplication { public static void main(String[] args) { SpringApplication.run(GulimallAuthServerApplication.class, args); } }
ThreadLocal-同一个线程共享数据

Feign远程调用丢失请求头问题

com.klaus.gulimall.member.config
@Configuration
public class GulimallFeignConfig {
@Bean("requestInterceptor")
public RequestInterceptor requestInterceptor() {
// RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
//RequestContextHolder.setRequestAttributes(requestAttributes);
// 进行共享操作,拦截器才会有请求上下文的所有数据
return new RequestInterceptor() {
@Override
public void apply(RequestTemplate template) {
//1、RequestContextHolder拿到刚进来的这个请求
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
if (attributes != null){
System.out.println("RequestInterceptor线程...."+Thread.currentThread().getId());
HttpServletRequest request = attributes.getRequest();//老请求
if (request != null){
//请求不为空
//同步请求头数据,Cookie
String cookie = request.getHeader("Cookie");
// System.out.println("feign远程之前先进行RequestInterceptor.apply");
//给新请求同步了老请求的Cookie
template.header("Cookie", cookie);
}
}
}
};
}
}Feign异步情况丢失上下文问题

com.klaus.gulimall.order.service.impl.OrderServiceImpl#confirmOrder
//获取原来请求上下文请求属性
RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
//进行异步编排
CompletableFuture<Void> getAddressFuture = CompletableFuture.runAsync(() -> {
//1、远程查询所有的收货地址列表
System.out.println("member线程...." + Thread.currentThread().getId());
//在Feign异步调用前将请求上下文进行共享操作(之前的请求数据都来共享每一个线程)
RequestContextHolder.setRequestAttributes(requestAttributes);
List<MemberAddressVo> address = memberFeignService.getAddress(memberRespVo.getId());
confirmVo.setAddress(address);
}, executor);
CompletableFuture<Void> cartFuture = CompletableFuture.runAsync(() -> {
//2、远程查询购物车所有选中的购物项
System.out.println("cart线程...." + Thread.currentThread().getId());
//在Feign异步调用前将请求上下文进行共享操作
RequestContextHolder.setRequestAttributes(requestAttributes);
List<OrderItemVo> items = cartFeignService.currentUserCartItems();
confirmVo.setItems(items);
//feign在远程调用之前要构造请求,调用很多的拦截器
//for(RequestInterceptor interceptor:requestInterceptors)
}, executor).thenRunAsync(() -> {
List<OrderItemVo> items = confirmVo.getItems();
List<Long> skuIds = items.stream().map(item -> item.getSkuId()).collect(Collectors.toList());
//远程查询库存
R r = wmsFeignService.getSkusHasStock(skuIds);
List<SkuStockVo> data = r.getData(new TypeReference<List<SkuStockVo>>() {
});
if (data != null) {
// Map<SkuId, hasStock> map
Map<Long, Boolean> map = data.stream().collect(Collectors.toMap(SkuStockVo::getSkuId, SkuStockVo::getHasStock));
// Map<Long, Boolean> stocks;
confirmVo.setStocks(map);
}
}, executor);项目优化
- 使用动态线程池管理和监控,定时获取线程池的运行数据
Hippo4j 提供了应用线程池运行时变更核心参数的功能。而且,如果应用是集群部署,可以选择修改线程池某一实例,或者修改集群全部实例,运行时生效,不需要再重启服务。
压测时可以使用 Hippo4j 动态调整线程池参数,判断线程池核心参数设置是否合理。对于开发测试来说,如果不满足可以随时调整。
CompletableFuture异步编排
概念
- CompletableFuture是JDK1.8里面引入的一个异步回调类,就是说当前使用异步线程去执行一个任务时候,我们希望在这个任务结束以后,触发一个后续的动作,而CompletableFuture就可以实现这样的功能。
业务场景
- 查询商品详情页的逻辑比较复杂,有些数据还需要远程调用,必然需要花费更多的时间。

- 假如商品详情页的每个查询,需要如下标注的时间才能完成 那么,用户需要 5.5s 后才能看到商品详情页的内容。很显然是不能接受的。 如果有多个线程同时完成这 6 步操作,也许只需要 1.5s 即可完成响应。
- 在 Java 8 中, 新增加了一个包含 50 个方法左右的类: CompletableFuture,提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以 通过回调的方式处理计算结果,并且提供了转换和组合 CompletableFuture 的方法。
项目代码
com.klaus.gulimall.product.service.impl.SkuInfoServiceImpl#item
//四、线程串行化方法
/*
thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前
任务的返回值。
thenAccept 方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。
thenRun 方法:只要上面的任务执行完成,就开始执行 thenRun,只是处理完任务后,执行
thenRun 的后续操作
带有 Async 默认是异步执行的。同之前。
以上都要前置任务成功完成。
*/
@Override
public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {
SkuItemVo skuItemVo = new SkuItemVo();
//一、创建异步对象
/*
runAsync和supplyAsync runXxxx 都是没有返回结果的,supplyXxx 都是可以获取返回结果的
可以传入自定义的线程池,否则就用默认的线程池;
*/
CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
//1.sku基本信息获取 pms_sku_info
SkuInfoEntity info = this.getById(skuId);
skuItemVo.setInfo(info);
return info;
}, executor);
CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync((res) -> {
//3、 获取spu的销售属性组合
List<SkuItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.getSaleAttrBySpuId(res.getSpuId());
skuItemVo.setSaleAttr(saleAttrVos);
}, executor);
CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync((res) -> {
//4.获取spu的介绍 pms_spu_info_desc
SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(res.getSpuId());
skuItemVo.setDesc(spuInfoDescEntity);
}, executor);
CompletableFuture<Void> baseAttrFuture = infoFuture.thenAcceptAsync((res) -> {
//5.获取spu的规格参数信息
List<SpuItemAttrGroupVo> attrGroupVos = attrGroupService.getAttrGroupWithAttrsBySpuId(res.getSpuId(), res.getCatalogId());
skuItemVo.setGroupAttrs(attrGroupVos);
}, executor);
//一、创建异步对象
/*
runAsync和supplyAsync runXxxx 都是没有返回结果的,supplyXxx 都是可以获取返回结果的
可以传入自定义的线程池,否则就用默认的线程池;
*/
//2.sku图片信息 pms_spu_images
CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {
List<SkuImagesEntity> imagesEntities = skuImagesService.getImagesBySkuId(skuId);
skuItemVo.setImages(imagesEntities);
}, executor);
//TODO //3、远程调用查询当前sku是否参与秒杀优惠活动
CompletableFuture<Void> seckillFuture = CompletableFuture.runAsync(() -> {
//3.远程调用查询当前sku是否参与秒杀优惠活动
R skuSeckilInfo = seckillFeignService.getSkuSeckilInfo(skuId);
if (skuSeckilInfo.getCode() == 0) {
//查询成功
SeckillSkuVo skuSeckilInfoData = skuSeckilInfo.getData("data", new TypeReference<SeckillSkuVo>() {
});
skuItemVo.setSeckillSkuVo(skuSeckilInfoData);
if (skuSeckilInfoData != null) {
long currentTime = System.currentTimeMillis();
if (currentTime > skuSeckilInfoData.getEndTime()) {
skuItemVo.setSeckillSkuVo(null);
}
}
}
}, executor);
//七、多任务组合
/*
allOf:等待所有任务完成
anyOf:只要有一个任务完成
*/
//等到所有任务完成 //TODO 秒杀优惠活动Future
CompletableFuture.allOf(saleAttrFuture,descFuture,baseAttrFuture,imageFuture,seckillFuture).get();
return skuItemVo;
}RabbitMQ延时队列(实现定时任务)
场景:

比如未付款订单,超过一定时间后,系统自动取消订单并释放占有物品。
常用解决方案:spring的 schedule 定时任务轮询数据库
缺点:消耗系统内存、增加了数据库的压力、存在较大的时间误差
定时任务的时效性问题
定时任务开启的第一次扫描,订单未创建,1min后创建完成,30min第二次扫描订单未过期,无事发生,31min订单过期,直到60min第三次扫描才扫到过期订单

解决:rabbitmq的消息TTL和死信Exchange结合
订单释放&库存解锁

消息的TTL(Time To Live)
- 消息的TTL就是消息的存活时间。
- RabbitMQ可以对队列和消息分别设置TTL。
- 对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。
- 如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队 列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的 TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是一样的效果。
Dead Letter Exchanges(DLX)
一个消息在满足如下条件下,会进
死信路由
,记住这里是路由而不是队列,
一个路由可以对应很多队列。(什么是死信)
- 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不 会被再次放在队列里,被其他消费者使用。 ( basic.reject/ basic.nack ) requeue=false
- 上面的消息的TTL到了,消息过期了。
- 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上
Dead Letter Exchange其实就是一种普通的exchange,和创建其他 exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有 消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。
我们既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息 被路由到某一个指定的交换机,结合二者,其实就可以实现一个延时队列
手动ack&异常消息统一放在一个队列处理建议的两种方式
catch异常后,手动发送到指定队列,然后使用channel给rabbitmq确认消息已消费
给Queue绑定死信队列,使用nack(requque为false)确认消息消费失败
延时队列实现
- 给队列设置过期时间

- 给消息设置过期时间

- 推荐:给队列设置过期时间,因为给消息设置过期时间的话,RabbitMQ采用的是惰性检查机制,假设队列里面的消息过期时间不同,后面的消息必须等前面的消息过期,最终导致消息不能按照规定的过期时间过期。
SpringBoot中使用延时队列
- 1、Queue、Exchange、Binding可以@Bean进去
- 2、监听消息的方法可以有三种参数(不分数量,顺序)
- Object content, Message message, Channel channel
- 3、channel可以用来拒绝消息,否则自动ack;
消息队列流程
- 使用RabbitMQ延时队列实现未付款订单,超过一定时间后,系统自动取消订单并解锁库存。
- PS:创建订单 进入路由键-order.create.order- 进入交换机order-event-exchange, 根据路由键会转发到延时队列order.delay.queue 1min过期时间,过期时间到了之后,根据路由键order.release.order-到达释放订单队列order.release.order.queue, 监听这个队列的方法 先是判断订单是不是已支付,如果不是 就关闭订单,关闭订单之后,根据路由键order.release.other 发送关闭的订单数据到交换机order-event-exchange,交换机再转发到order.release.coupon.queue优惠券队列,返回优惠券,与之通过 也是根据路由键order.release.other,和数据一起转发到解锁库存队列stock.release.stock.queue 解锁库存。

Redis缓存中的数据结构
分类菜单
- 类型:String
- value: 菜单信息
登陆信息-SpringSession
购物车
- 类型:Hash
- key: gulimall:cart:2 // 2为用户ID
- field: 商品 id
- value: 购物项数据
- 综上所述,我们的购物车结构是一个双层 Map:Map<String,Map<String,String>>
- 第一层 Map,Key 是用户 id
- 第二层 Map,Key 是购物车中商品 id,值是购物项数据
秒杀上架 (幂等性处理)
- 缓存秒杀活动信息 (seckill:session:)
- 类型: List
- key: seckill:session:start_endtime // 开始时间的时间戳_结束时间的时间戳
- value: 场次ID_商品ID
- 缓存秒杀活动所关联的商品信息 (seckill:skus)
- 类型:Hash
- key: seckill:skus
- field: 场次ID_商品ID
- value: 秒杀商品信息
- 秒杀库存信号量
- 类型: String
- key: seckIll:stock:UUID
- value: 库存数量
幂等性保证
- 缓存商品信息前,先判断有没有这个key
String redisKey = seckillSkuVo.getPromotionSessionId().toString() + "-" + seckillSkuVo.getSkuId().toString();
if (!operations.hasKey(redisKey)) {
//判断Redis中是否有该信息,如果没有才进行添加
Boolean hasKey = redisTemplate.hasKey(key);
//缓存活动信息
if (!hasKey) {Redission分布式锁
- 设置分布式锁以及信号量-关键代码
//秒杀商品上架功能的锁
private final String upload_lock = "seckill:upload:lock";
//分布式锁 .可重入锁(Reentrant Lock)
RLock lock = redissonClient.getLock(upload_lock);
try {
//加锁
lock.lock(10, TimeUnit.SECONDS);
seckillService.uploadSeckillSkuLatest3Days();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
//如果当前这个场次的商品库存信息已经上架就不需要上架
//5、使用库存作为分布式Redisson信号量(限流)
// 使用库存作为分布式信号量
RSemaphore semaphore = redissonClient.getSemaphore(SKU_STOCK_SEMAPHORE + token);
// 商品可以秒杀的数量作为信号量
semaphore.trySetPermits(seckillSkuVo.getSeckillCount());分布式事务

- 事务保证:
- 订单服务异常,库存锁定不运行,全部回滚,撤销操作
- 库存服务事务自治,锁定失败全部回滚,订单感受到,继续回滚
- 库存服务锁定成功了,但是网络原因返回数据途中问题?
- 库存服务锁定成功了,库存服务下面的逻辑发生故障,订单回滚了,怎么处理?
- 利用消息队列实现最终一致
库存服务锁定成功后发给消息队列消息(当前库存工作单),过段时间自动解锁,解锁时先查询订单的支付状态。解锁成功修改库存工作单详情项状态为已解锁
- 1、远程服务假失败:
- 远程服务其实成功了,由于网络故障等没有返回
- 导致:订单回滚,库存却扣减
- 2、远程服务执行完成,下面的其他方法出现问题
- 导致:已执行的远程请求,肯定不能回滚
事务的坑
- 在同一个类里面,编写两个方法,内部调用的时候,会导致事务设置失效。原因是没有用到代理对象的缘故。
- 解决:
- 0)、导入 spring-boot-starter-aop
- 1)、@EnableTransactionManagement(proxyTargetClass = true)
- 2)、@EnableAspectJAutoProxy(exposeProxy=true)
- 3)、AopContext.currentProxy() 调用方法
CAP理论
- C是一致性,分布式系统的数据要保持一致
- A是可用性,分布式系统能进行故障转移
- P是分区容错性,分布式系统出现网络问题能正常运行
- CAP理论是指分布式系统中不能保证三者同时存在,只能两两组合
BASE 理论
- 是对 CAP 理论的延伸,思想是即使无法做到强一致性(CAP 的一致性就是强一致性),但可 以采用适当的采取弱一致性,即最终一致性。
分布式事务几种方案
- 2PC 模式
- 数据库支持的 2PC【2 phase commit 二阶提交】,又叫做 XA Transactions。
- 第一阶段:事务协调器要求每个涉及到事务的数据库预提交(precommit)此操作,并反映是 否可以提交.
- 第二阶段:事务协调器要求每个数据库提交数据。
- 性能不理想
- 数据库支持的 2PC【2 phase commit 二阶提交】,又叫做 XA Transactions。
- 柔性事务-TCC 事务补偿型方案
- 一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。
- 二阶段 commit 行为:调用 自定义 的 commit 逻辑。
- 二阶段 rollback 行为:调用 自定义 的 rollback 逻辑。
- 所谓 TCC 模式,是指支持把 自定义 的分支事务纳入到全局事务的管理中
- 柔性事务-最大努力通知型方案
- 按规律进行通知,不保证数据一定能通知成功,但会提供可查询操作接口进行核对。
- 这种 方案主要用在与第三方系统通讯时,比如:调用微信或支付宝支付后的支付结果通知。
- 这种 方案也是结合 MQ 进行实现,例如:通过 MQ 发送 http 请求,设置最大通知次数。达到通知次数后即不再通知。
- 柔性事务-可靠消息+最终一致性方案(异步确保型)
- 实现:业务处理服务在业务事务提交之前,向实时消息服务请求发送消息,实时消息服务只 记录消息数据,而不是真正的发送。业务处理服务在业务事务提交之后,向实时消息服务确 认发送。只有在得到确认发送指令后,实时消息服务才会真正发送。
- 防止消息丢失:
- 1、做好消息确认机制(pulisher,consumer【手动 ack】)
- 2、每一个发送的消息都在数据库做好记录。定期将失败的消息再次发送一 遍
刚性事务:遵循 ACID 原则,强一致性。
柔性事务:遵循 BASE 理论,最终一致性;
与刚性事务不同,柔性事务允许一定时间内,不同节点的数据不一致,但要求最终一致。
Seata
Seata的理解
- Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。
- AT模式。是一种基于本地事务+二阶段协议来实现的最终数据一致性方案,也是Seata默认的解决方案。
- TCC模式,TCC事务是Try,Confirm,Cancel 三个词语的缩写,简单理解就是把一个完整的业务逻辑拆分成三个阶段,然后通过事务管理器在业务逻辑层面,根据每个分支事务的执行情况分别调用该业务的Confirm 或者Cacel方法。
- Saga模式,Saga模式是SEATA提供长事务解决方案,在Saga模式中,业务流程中每个参与者都提交本地事务,当出现某一个参与者失败则补偿前面已经成功的参与者。
- XA模式,XA可以认为是一种强一致性的事务解决方法,它利用事务资源(数据库,消息服务等)对XA协议的支持,以XA协议的机制来管理分支事务的一种事务模式。
TC (Transaction Coordinator) - 事务协调者
- 维护全局和分支事务的状态,驱动全局事务提交或回滚。
TM (Transaction Manager) - 事务管理器
- 定义全局事务的范围:开始全局事务、提交或回滚全局事务。
RM (Resource Manager) - 资源管理器
- 管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
Seata项目整合
- Seata控制分布式事务
- 1)、每一个微服务先必须创建 undo_log;
- 2)、安装事务协调器;seata-server: https://github.com/seata/seata/releases
- 3)、整合
- 1、导入依赖 spring-cloud-starter-alibaba-seata seata-all-0.7.1
- 2、解压并启动seata-server;
- registry.conf: 注册中心配置; 修改registry type=nacos
- file.conf:
- 3、所有想要用到分布式事务的微服务使用seata DataSourceProxy代理自己的数据源
- 4、每个微服务,都必须导入
- registry.conf
- file.conf vgroup_mapping.{application.name}-fescar-service-group = “default”
- 5、启动测试分布式事务
- 6、给分布式大事务的入口标注@GlobalTransactional
- 7、每一个远程的小事务用 @Transactional
应用场景:
- 订单服务提交订单,库存服务扣减库存
- seta-提交订单接口 OrderServiceImpl 类submitOrder()方法 加 @GlobalTransactional
- (下单是高并发场景,不推荐seata分布式事务。推荐用消息队列 最终一致性)
- 商品服务保持商品信息,远程调用优惠服务保持商品优惠券信息
- 开启seta全局事务-SpuInfoServiceImpl类saveSpuInfo()方法 (推荐)
Sentinel
介绍 · alibaba/Sentinel Wiki · GitHub
Sentinel理解
- 随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。
- 我们说的资源,可以是任何东西,服务,服务里的方法,甚至是一段代码。使用 Sentinel 来进行资源保护,主要分为几个步骤:
- 定义资源
- 定义规则
- 检验规则是否生效
- Sentinel 分为两个部分:
- 核心库(Java 客户端)不依赖任何框架/库,能够运行于所有 Java 运行时环境,同时 对 Dubbo / Spring Cloud 等框架也有较好的支持。
- 控制台(Dashboard)基于 Spring Boot 开发,打包后可以直接运行,不需要额外的 Tomcat 等应用容器
熔断降级限流
- 熔断
- A 服务调用 B 服务的某个功能,B服务卡死,功能时间超长,我们就将B断路,返回降级数据。
- 降级
- 流量高峰期,服务器压力剧增,对一些页面和服务做策略的降级(停止服务,调用直接返回降级数据)
- 熔断、降级区别
- 相同点
- 保证服务的可用性
- 不同点
- 1、熔断是被调用方故障,触发的系统主动规则
- 2、降级是基于全局考虑,停止一些正常服务,释放资源
- 相同点
- 限流
- 对打入服务的请求流量进行控制,使服务能够承担不超过自己能力的流量压力
项目整合步骤
/**
* 1、整合Sentinel
* 1)、导入依赖 spring-cloud-starter-alibaba-sentinel
* 2)、下载sentinel的控制台
* 3)、配置sentinel控制台地址信息
* 4) 、在控制台调整参数。【默认所有的流控设置保存在内存中,重启失效】
*
*
* 2、每一个微服务都导入 actuator ();并配合management.endpoints.web.exposure.include=*
* 3、自定义sentinel流控返回数据
*
* 4、使用Sentinel来保护feign远程调用:熔断;
* 1)、调用方的熔断保护:feign.sentinel.enabled=true
* 2)、调用方手动指定远程服务的降级策略。远程服务被降级处理。触发我们的熔断回调方法
* 3)、超大浏览的时候,必须牺牲一些远程服务。在服务的提供方(远程服务)指定降级策略;
* 提供方是在运行。但是不运行自己的业务逻辑,返回的是默认的降级数据(限流的数据),
*
* 5、自定义受保护的资源
* 1)、代码
* try(Entry entry = SphU.entry("seckillSkus")){
* //业务逻辑
* }
* catch(Execption e){}
*
* 2)、基于注解。
* @SentinelResource(value = "getCurrentSeckillSkusResource",blockHandler = "blockHandler")
*
* 无论是1,2方式一定要配置被限流以后的默认返回.
* url请求可以设置统一返回:WebCallbackManager
*
*
*/整合 Feign+Sentinel 测试熔断降级
引入依赖:>spring-cloud-starter-openfeign、spring-cloud-starter-alibaba-sentinel<
2、使用 Nacos 注册中心 spring-cloud-starter-alibaba-nacos-discovery
定义 fallbackfactory 并放在容器中
java/** * @Author zly * @Date 2022/7/26 16:22 */ @Slf4j @Component public class SeckillFeignServiceFallBack implements SeckillFeignService { @Override public R getSkuSeckilInfo(Long skuId) { log.info("熔断方法调用.."); return R.error(BizCodeEnum.TO_MANY_REQUEST.getCode(),BizCodeEnum.TO_MANY_REQUEST.getMessage()); } }远程接口配置 feign 客户端容错
java/** * @Author zly * @Date 2022/7/26 16:21 */ @FeignClient(value = "gulimall-seckill",fallback = SeckillFeignServiceFallBack.class) public interface SeckillFeignService { /** * 根据skuId查询商品是否参加秒杀活动 * @param skuId * @return */ @GetMapping(value = "/sku/seckill/{skuId}") R getSkuSeckilInfo(@PathVariable("skuId") Long skuId); }开启 sentinel 代理 feign 功能;在 application.properties 中配置
- feign.sentinel.enabled=true
注解支持 · alibaba/Sentinel Wiki · GitHub
1、使用@SentinelResource,并定义 fallback
javacom.atguigu.gulimall.seckill.service.impl.SeckillServiceImpl @SentinelResource(value = "getCurrentSeckillSkusResource",blockHandler = "blockHandler") @Override public List<SeckillSkuRedisTo> getCurrentSeckillSkus() { }
Nacos
项目整合
/**
* 1、如何使用Nacos作为配置中心统一管理配置
*
* 1)、引入依赖,
* <dependency>
* <groupId>com.alibaba.cloud</groupId>
* <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
* </dependency>
* 2)、创建一个bootstrap.properties。
* spring.application.name=gulimall-coupon
* spring.cloud.nacos.config.server-addr=127.0.0.1:8848
* 3)、需要给配置中心默认添加一个叫 数据集(Data Id)gulimall-coupon.properties。默认规则,应用名.properties
* 4)、给 应用名.properties 添加任何配置
* 5)、动态获取配置。
* @RefreshScope:动态获取并刷新配置
* @Value("${配置项的名}"):获取到配置。
* 如果配置中心和当前应用的配置文件中都配置了相同的项,优先使用配置中心的配置。
*
* 2、细节
* 1)、命名空间:配置隔离;
* 默认:public(保留空间);默认新增的所有配置都在public空间。
* 1、开发,测试,生产:利用命名空间来做环境隔离。
* 注意:在bootstrap.properties;配置上,需要使用哪个命名空间下的配置,
* spring.cloud.nacos.config.namespace=9de62e44-cd2a-4a82-bf5c-95878bd5e871
* 2、每一个微服务之间互相隔离配置,每一个微服务都创建自己的命名空间,只加载自己命名空间下的所有配置
*
* 2)、配置集:所有的配置的集合
*
* 3)、配置集ID:类似文件名。
* Data ID:类似文件名
*
* 4)、配置分组:
* 默认所有的配置集都属于:DEFAULT_GROUP;
* 1111,618,1212
*
* 项目中的使用:每个微服务创建自己的命名空间,使用配置分组区分环境,dev,test,prod
*
* 3、同时加载多个配置集
* 1)、微服务任何配置信息,任何配置文件都可以放在配置中心中
* 2)、只需要在bootstrap.properties说明加载配置中心中哪些配置文件即可
* 3)、@Value,@ConfigurationProperties。。。
* 以前SpringBoot任何方法从配置文件中获取值,都能使用。
* 配置中心有的优先使用配置中心中的,
*
/**
* 1、开启服务注册发现
* (配置nacos的注册中心地址)
* 2、编写网关配置文件
*/Nacos作为注册中心:
将微服务注册到 nacos 中
1、首先,修改 pom.xml 文件,引入 Nacos Discovery Starter。
2、在应用的 /src/main/resources/application.properties 配置文 件中配置 Nacos Server 地址
propertiesspring.cloud.nacos.discovery.server-addr=127.0.0.1:88483、使用@EnableDiscoveryClient 开启服务注册发现功能
4、启动应用,观察 nacos 服务列表是否已经注册上服务
注意:每一个应用都应该有名字,这样才能注册上去。修改 application.properties 文件
spring.application.name=service-provider
server.port=8000
5、注册更多的服务上去,测试使用 feign 远程调用
Nacos 使用三步
1、导包 nacos-discovery
2、写配置,指定 nacos 地址,指定应用的名字
3、开启服务注册发现功能@EnableDiscoveryClient
Feign 使用三步
1、导包 openfeign
2、开启@EnableFeignClients 功能
3、编写接口,进行远程调用
Nacos作为配置中心
1、pom.xml 引入 Nacos Config Starter。
2、在应用的 /src/main/resources/bootstrap.properties 配 置文件中配置 Nacos Config 元数据
propertiesspring.application.name=nacos-config-example spring.cloud.nacos.config.server-addr=127.0.0.1:8848 #指定命名空间 spring.cloud.nacos.config.namespace=af7b6638-68dd-43a0-8bef-19dbaf2ae5c3 #指定分组 默认DEFAULT_GROUP spring.cloud.nacos.config.group=dev #主要配置应用名和配置中心地址3、在 nacos 中添加配置
在 nacos 中创建一个 应用名.properties 配置文件并编写配置
Nacos Config 数据结构
Nacos Config 主要通过 dataId 和 group 来唯一确定一条配置。
Nacos Client 从 Nacos Server 端获取数据时,调用的是此接口 ConfigService.getConfig(String dataId, String group, long timeoutMs)。
Spring Cloud 应用获取数据
dataID:
在 Nacos Config Starter 中,dataId 的拼接格式如下
- ${prefix} - ${spring.profiles.active} . ${file-extension} prefix 默认为 spring.application.name 的值,也可以通过配置项 spring.cloud.nacos.config.prefix 来配置。
- spring.profiles.active 即为当前环境对应的 profile
注意,当 activeprofile 为空时,对应的连接符 - 也将不存在,dataId 的拼接格式变成 prefix . {prefix}.prefix.
file-extension 为配置内容的数据格式,可以通过配置项 spring.cloud.nacos.config.file-extension 来配置。 目前只支持 properties 类型。
Group:
Group 默认为 DEFAULT_GROUP,可以通过 spring.cloud.nacos.config.group 配置。
4、在应用中使用@Value 和@RefreshScope
完成上述两步后,应用会从 Nacos Config 中获取相应的配置,并添加在 Spring Environment
的 PropertySources 中 。 这 里 我 们 使 用 @Value 注 解 来 将 对 应 的 配 置 注 入 到
SampleController 的 userName 和 age 字段,并添加 @RefreshScope 打开动态刷新功能
@RefreshScope
class SampleController {
@Value(“${user.name}”)
String userName;
@Value(“${user.age}”)
int age;
}
Nacos进阶
核心概念
命名空间:配置隔离
用于进行租户粒度的配置隔离。不同的命名空间下,可以存在相同的 Group 或 Data ID 的
配置。Namespace 的常用场景之一是不同环境的配置的区分隔离,例如开发测试环境和生
产环境的资源(如配置、服务)隔离等
配置集:所有配置的集合
一组相关或者不相关的配置项的集合称为配置集。在系统中,一个配置文件通常就是一个配
置集,包含了系统各个方面的配置。例如,一个配置集可能包含了数据源、线程池、日志级
别等配置项。
配置集ID:类似文件名。
Nacos 中的某个配置集的 ID。配置集 ID 是组织划分配置的维度之一。Data ID 通常用于组
织划分系统的配置集。一个系统或者应用可以包含多个配置集,每个配置集都可以被一个有
意义的名称标识。Data ID 通常采用类 Java 包(如 com.taobao.tc.refund.log.level)的命名
规则保证全局唯一性。此命名规则非强制。
配置分组:默认所有的配置集都属于:DEFAULT_GROUP;
Nacos 中的一组配置集,是组织配置的维度之一。通过一个有意义的字符串(如 Buy 或
Trade )对配置集进行分组,从而区分 Data ID 相同的配置集。当您在 Nacos 上创建一个
配置时,如果未填写配置分组的名称,则配置分组的名称默认采用 DEFAULT_GROUP 。配置
分组的常见场景:不同的应用或组件使用了相同的配置类型,如 database_url 配置和
MQ_topic 配置。
原理
- 自动注入:
- NacosConfigStarter 实现了 org.springframework.cloud.bootstrap.config.PropertySourceLocator 接口,并将优先级设置成了最高。 在 Spring Cloud 应用启动阶段,会主动从 Nacos Server 端获取对应的数据,并将获取到的 数据转换成 PropertySource 且注入到 Environment 的 PropertySources 属性中,所以使用 @Value 注解也能直接获取 Nacos Server 端配置的内容。
- 动态刷新:
- Nacos Config Starter 默认为所有获取数据成功的 Nacos 的配置项添加了监听功能,在监听 到服务端配置发生变化时会实时触发 org.springframework.cloud.context.refresh.ContextRefresher 的 refresh 方法 。
- 如果需要对 Bean 进行动态刷新,请参照 Spring 和 Spring Cloud 规范。推荐给类添加**@RefreshScope** 或 @ConfigurationProperties 注解,
- 自动注入:
3、加载多配置文件
propertiesspring.cloud.nacos.config.server-addr=127.0.0.1:8848 spring.cloud.nacos.config.namespace=31098de9-fa28-41c9-b0bd-c754ce319ed4 spring.cloud.nacos.config.ext-config[0].data-id=gulimall-datasource.yml #动态刷新 spring.cloud.nacos.config.ext-config[0].refresh=false spring.cloud.nacos.config.ext-config[0].group=dev4、namespace 与 group 最佳实践
- 每个微服务创建自己的 namespace 进行隔离,group 来区分 dev,beta,prod 等环境
Gateway
- 简介
- 网关作为流量的入口,常用功能包括路由转发、权限校验、限流控制等。
- 特点
- 基于 Spring5,支持响应式编程和 SpringBoot2.0
- 支持使用任何请求属性进行路由匹配
- 特定于路由的断言和过滤器
- 集成 Hystrix 进行断路保护
- 集成服务发现功能
- 易于编写 Predicates 和 Filters
- 支持请求速率限制
- 支持路径重写
- API网关优点
- 易于监控。可以在网关收集监控数据并将其推送到外部系统进行分析。
- 易于认证。可以在网关上进行认证,然后再将请求转发到后端的微服务,而无须在 每个微服务中进行认证。
- 减少了客户端与各个微服务之间的交互次数。
- 核心概率
- 路由
- 路由是网关最基础的部分,路由信息有一个 ID、一个目的 URL、一组断言和一组 Filter 组成。如果断言路由为真,则说明请求的 URL 和配置匹配
- 断言
- Java8 中的断言函数。Spring Cloud Gateway 中的断言函数输入类型是 Spring5.0 框 架中的 ServerWebExchange。Spring Cloud Gateway 中的断言函数允许开发者去定义匹配 来自于 http request 中的任何信息,比如请求头和参数等。
- 过滤器
- 一个标准的 Spring webFilter。Spring cloud gateway 中的 filter 分为两种类型的 Filter,分别是 Gateway Filter 和 Global Filter。过滤器 Filter 将会对请求和响应进行修改 处理
- 满足某些断言(predicates)就路由到指定的地址(uri),使用指定的过滤器(filter)
- 路由
yamlspring: cloud: gateway: routes: - id: test_route uri: https://www.baidu.com predicates: - Query=url,baidu - id: product_route uri: lb://gulimall-product predicates: - Path=/api/product/** filters: - RewritePath=/api/?(?<segment>.*),/$\{segment} - id: gulimall_host_route uri: lb://gulimall-product predicates: - Host=gulimall.com,item.gulimall.com
Feign
Feign 声明式远程调用
简介
- Feign 是一个声明式的 HTTP 客户端,它的目的就是让远程调用更加简单。
- Feign 整合了 Ribbon(负载均衡)和 Hystrix(服务熔断),
使用
1、引入依赖 spring-cloud-starter-openfeign
2、开启 feign 功能 @EnableFeignClients(basePackages = “com.atguigu.gulimall.member.feign”)
3、声明远程接口
java/** * @Author zly * @Date 2022/7/25 12:04 */ @FeignClient("gulimall-order") public interface OrderFeignService { /** * 分页查询当前登录用户的所有订单信息 * @param params * @return */ @PostMapping("/order/order/listWithItem") R listWithItem(@RequestBody Map<String, Object> params); }
Feign远程调用丢失请求头问题

Feign异步情况丢失上下文问题

SpringCloud Alibaba 常用组件端口号总结
Nacos 默认端口号:8848
properties#*************** Spring Boot Related Configurations ***************# ### Default web context path: server.servlet.contextPath=/nacos ### Default web server port: server.port=8848Seata默认端口号:8091
Sentinel控制台默认运行在8080端口上
Elasticsearch 默认端口 9200 搜索引擎 不属于SpringCloud Alibaba
支付
加密-对称加密

加密-非对称加密

支付宝验签

收单
- 1、订单在支付页,不支付,一直刷新,订单过期了才支付,订单状态改为已支付了,但是库存解锁了。
- 使用支付宝自动收单功能解决。只要一段时间不支付,就不能支付了。
- 2、由于时延等问题。订单解锁完成,正在解锁库存的时候,异步通知才到
- 订单解锁,手动调用收单
- 3、网络阻塞问题,订单支付成功的异步通知一直不到达
- 查询订单列表时,ajax获取当前未支付的订单状态,查询订单状态时,再获取一下支付宝此订单的状态
- 4、其他各种问题
- 每天晚上闲时下载支付宝对账单,一一进行对账
MD5&MD5盐值加密
- MD5
- Message Digest algorithm 5,信息摘要算法
- 压缩性:任意长度的数据,算出的MD5值长度都是固定的。
- 容易计算:从原数据计算出MD5值很容易。
- 抗修改性:对原数据进行任何改动,哪怕只修改1个字节,所得到的MD5值都有很大区别。
- 强抗碰撞:想找到两个不同的数据,使它们具有相同的MD5值,是非常困难的。
- 不可逆
- Message Digest algorithm 5,信息摘要算法
- 加盐:
- 通过生成随机数与MD5生成字符串进行组合
- 数据库同时存储MD5值与salt值。验证正确性时使用salt进行MD5即可
项目微服务

Nginx+Windows搭建域名访问环境

域名映射效果
- 请求接口 gulimall.com
- 请求页面 gulimall.com
- nginx直接代理给网关,网关判断
- 如果/api/****,转交给对应的服务器
- 如果是 满足域名,转交给对应的服务
正向代理与反向代理
- 正向代理:如科学上网,隐藏客户端信息

- 反向代理:屏蔽内网服务器信息,负载均衡访问

Nginx配置文件

Nginx动静分离

Nginx转发效果

内网穿透


内网穿透联调

配置
- Nginx配置
…
listen 80; server_name gulimall.com *.gulimall.com 497n86m7k7.52http.net; #charset koi8-r; #access_log /var/log/nginx/log/host.access.log main; location /static/ { root /usr/share/nginx/html; } location /payed/ { proxy_set_header Host order.gulimall.com; proxy_pass http://gulimall; }
…
缓存
本地缓存

分布式缓存-本地模式在分布式下的问题

分布式缓存

高并发下缓存失效问题-缓存穿透

- 缓存穿透:
- 指查询一个一定不存在的数据,由于缓存是不命中,将去查询数据库,但是数据库也无此记录,我们没有将这次查询的null写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义
- 风险:
- 利用不存在的数据进行攻击,数据库瞬时压力增大,最终导致崩溃
- 解决:
- null结果缓存,并加入短暂过期时间
高并发下缓存失效问题-缓存雪崩

缓存雪崩:
- 缓存雪崩是指在我们设置缓存时key采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重雪崩。
解决:
- 原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。
高并发下缓存失效问题-缓存击穿

缓存穿透:
- 对于一些设置了过期时间的key,如果这些key可能会在某些时间点被超高并发地访问,是一种非常"热点"的数据。如果这个key在大量请求同时进来前正好失效,那么所有对这个key的数据查询都落到db,我们称为缓存击穿。
解决:
- 加锁
- 大量并发只让一个去查,其他人等待,查到以后释放锁,其他人获取到锁,先查缓存,就会有数据,不用去db
- 加锁
分布式下如何加锁?

- 本地锁,只能锁住当前进程,所以我们需要分布式锁
锁-时序问题

分布式锁演进-基本原理

我们可以同时去一个地方“占坑”,如果占到,就执行逻辑。否则就必须等待,直到释放锁。 “占坑”可以去redis,可以去数据库,可以去任何大家都能访问的地方。 等待可以自旋的方式。
分布式锁演进-阶段一

分布式锁演进-阶段二

分布式锁演进-阶段三

分布式锁演进-阶段四

分布式锁演进-阶段五-最终形态

缓存数据一致性-双写模式

缓存数据一致性-失效模式

缓存数据一致性-解决方案
- 无论是双写模式还是失效模式,都会导致缓存的不一致问题。即多个实例同时更新会出事。怎么办?
- 1、如果是用户纬度数据(订单数据、用户数据),这种并发几率非常小,不用考虑这个问题,缓存数据加上过期时间,每隔一段时间触发读的主动更新即可
- 2、如果是菜单,商品介绍等基础数据,也可以去使用canal订阅binlog的方式。
- 3、缓存数据+过期时间也足够解决大部分业务对于缓存的要求。
- 4、通过加锁保证并发读写,写写的时候按顺序排好队。读读无所谓。所以适合使用读写锁。(业务不关心脏数据,允许临时脏数据可忽略);
- 总结:
- 我们能放入缓存的数据本就不应该是实时性、一致性要求超高的。所以缓存数据的时候加上过期时间,保证每天拿到当前最新数据即可。
- 我们不应该过度设计,增加系统的复杂性
- 遇到实时性、一致性要求高的数据,就应该查数据库,即使慢点。
缓存数据一致性-解决-Canal

Session共享问题
session原理

分布式下session共享问题

解决-session复制

- 优点
- web-server(Tomcat)原生支持,只需要修改配置文件
- 缺点
- session同步需要数据传输,占用大量网络带宽,降低了服务器群的业务处理能力
- 任意一台web-server保存的数据都是所有web-server的session总和,受到内存限制无法水平扩展更多的web-server
- 大型分布式集群情况下,由于所有web-server都全量保存数据,所以此方案不可取。
解决-客户端存储

优点
- 服务器不需存储session,用户保存自己的session信息到cookie中。节省服务端资源
缺点
- 都是缺点,这只是一种思路。
- 具体如下:
- 每次http请求,携带用户在cookie中的完整信息,浪费网络带宽
- session数据放在cookie中,cookie有长度限制4K,不能保存大量信息
- session数据放在cookie中,存在泄漏、篡改、窃取等安全隐患
这种方式不会使用。
解决-hash一致性

- 优点:
- 只需要改nginx配置,不需要修改应用代码
- 负载均衡,只要hash属性的值分布是均匀的,多台web-server的负载是均衡的
- 可以支持web-server水平扩展(session同步法是不行的,受内存限制)
- 缺点
- session还是存在web-server中的,所以web-server重启可能导致部分session丢失,影响业务,如部分用户需要重新登录
- 如果web-server水平扩展,rehash后session重新分布,也会有一部分用户路由不到正确的session
- 但是以上缺点问题也不是很大,因为session本来都是有有效期的。所以这两种反向代理的方式可以使用
解决-统一存储

- 优点:
- 没有安全隐患
- 可以水平扩展,数据库/缓存水平切分即可
- web-server重启或者扩容都不会有session丢失
- 不足
- 增加了一次网络调用,并且需要修改应用代码;如将所有的getSession方法替换为从Redis查数据的方式。redis获取数据比内存慢很多
- 上面缺点可以用SpringSession完美解决
解决-不同服务,子域session共享
jsessionid这个cookie默认是当前系统域名的。当我们分拆服务,不同域名部署的时候,我们可以使用 如下解决方案;

SpringSession核心原理

购物车
购物车数据结构

- Map<String k1,Map<String k2,CartItemInfo>> 在redis中
- k1:标识每一个用户的购物车 key:用户标识
- k2:购物项的商品id value:Hash(k:商品id,v:购物项详情)
消息队列RabbitMQ
RabbitMQ概念

rabbitmq 是一个开源的消息代理和队列服务器,通过普通的协议(Amqp 协议)来完成不同应用之间的数据共享,rabbitMq 是通过 erlang 语言来开发的基于 amqp 协议。
什么AMQP协议
- 是一个二进制协议
- amqp 是一个应用层协议的规范(定义了很多规范),可以有很多不同的消息中间件产品(需要遵循该规范)
server: 是消息队列节点
virtual host: 虚拟主机
exchange: 交换机(消息投递到交换机上)
message queue: 被消费者监听消息
交换机和队列是有一个绑定的关系

AMQP的核心概念
- server : 又称为broker,接受客户端连接,实现amqp实体服务
- connection: 连接,应用程序与broker建立网络连接
- channel: 网络通道,几乎所有的操作都是在 channel 中进行的,是进行消息对象的通道,客户端可以建立多个通道,每一个 channnel 表示一个会话任务。
- Message: 服务器和应用程序之间传递数据的载体,有 properties(消息属性,用来修饰消息,比如消息的优先级,延时投递)和 Body (消息体)
- virtual host(虚拟主机):是一个逻辑概念,最上层的消息路由,一个虚拟主机中可以包含多个 exchange 和 queue 但是一个虚拟主机中不能有名称相同的 exchange 和 queue
- exchange (交换机): 消息直接投递到交换机上,然后交换机根据消息的路由 key 来路由到对应绑定的队列上。
- binding (绑定): 绑定 exchange 与 queue 的虚拟连接, binding 中可以包含route_key
- route_key(路由key):它的作用是在交换机上通过 route_key 来把消息路由到那个队列上。
- queue 队列(队列): 用来保存消息的载体,有消费者监听,然后消费消息。
RabbitMq的消息是如何流转的?

RabbitMq 交换机详解
作用:
- 接受生产者的消息,然后根据路由键把消息投递到跟交换机绑定的对应的队列上。

交换机属性
- Name: 交换机名称
- Type: 交换机类型,Direct、topic、fanout、headers
- Durablity:是否需要持久化
- autodelete: 假如没有队列绑定到该交换机,那么交换机会自动删除。
- internal: 当前交换机交换机是否用户 RabbitMq 内部使用不常用,默认为false
- Argurements: 扩展参数,用户扩展AMQP定制化协议。
交换机类型
- 直连交换机(Direct exchange)
所有发送的 Direct exchange 的消息都会被投递到与 Routekey名称(与队列名称)相同的queue上
direct 模式下,可以使用 RabbitMq 自定exchange —> default exchange 所以不需要交换机和任何队列绑定,消息将会投递到route_key名称和队列名称相同的队列上。

- 主题交换机 TopicExchange
就是在队列上绑到 top 交换机上的路由key 可以是通过通配符来匹配的通配符的规则是
比如:log.#: 可以是匹配一个单词 也可以匹配多个单词 比如 log.# 可以匹配log.a log.a.b log.* : 可以匹配一个单词,比如log.* ,可以匹配log.a 但是不可以匹配log.a.b。

- 扇形交换机(fanout exchange)
就是消息通过丛交换机到队列上不会通过路由key 所以该模式的速度是最快的 只要和交换机绑定的那么消息就会被分发到与之绑定的队列上。

队列、绑定主机、消息
绑定: exchange 与 之间的连接关系(通过路由规则)
队列: 用来存储消息的实体
队列的属性: durablility 是否被持久化
AutoDelete: 表示最后一个监听被移除那么该队列就会删除。
消息: 用来生产者 和 消费者之间传递数据的
消息属性: 包括 消息体Body 和 属性 properties
常用属性: delivery mode,headers,content_type(消息类型) content_encoding(消息编码),priporty(消息优先级) correntlation_id( 最终消息唯一的ID)、reply_to(消息失败重回队列),expiretion(消息的过期时 间),message_id(消息id);timestamp,type,user_id ,app_id,cluster_id等
Q.E.D.
