Kaynağa Gözat

mq处理订单

dinglan 2 yıl önce
ebeveyn
işleme
86a7271caa

+ 21 - 0
src/main/java/com/sw/activemq/consumer/MessageConsumerService.java

@@ -1,17 +1,38 @@
 package com.sw.activemq.consumer;
 
+import com.sw.service.SnatchMaskService;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.jms.annotation.JmsListener;
 import org.springframework.stereotype.Service;
 
+import javax.jms.JMSException;
 import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import java.util.Date;
+import java.util.Map;
 
 @Service
 @Slf4j
 public class MessageConsumerService {
 
+	@Autowired
+	private SnatchMaskService snatchMaskService;
+
 	@JmsListener(destination="mask")
 	public void receiveMessage(Message info) {	// 进行消息接收处理
+		System.out.println(new Date());
 		log.info("【*** 接收消息 ***】--->info", info);
+		if(info instanceof ObjectMessage){
+			ObjectMessage objectMessage=(ObjectMessage) info;
+			try {
+				Map<String, Object> mesg = (Map) objectMessage.getObject();
+				Integer mid = Integer.valueOf(mesg.get("mid").toString());
+				Integer uid = Integer.valueOf(mesg.get("uid").toString());
+				snatchMaskService.doDecrAndCreateOrder(uid, mid);
+			} catch (JMSException e) {
+				e.printStackTrace();
+			}
+		}
 	}
 }

+ 4 - 0
src/main/java/com/sw/activemq/producer/IMessageProducerService.java

@@ -1,6 +1,10 @@
 package com.sw.activemq.producer;
 
+import java.util.Map;
+
 public interface IMessageProducerService {
 
 	void sendMessage(String message);
+
+	void sendMessage(Map<String, Object> message);
 }

+ 6 - 0
src/main/java/com/sw/activemq/producer/impl/MessageProducerServiceImpl.java

@@ -6,6 +6,7 @@ import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
 import javax.jms.Queue;
+import java.util.Map;
 
 @Service
 public class MessageProducerServiceImpl implements IMessageProducerService {
@@ -19,4 +20,9 @@ public class MessageProducerServiceImpl implements IMessageProducerService {
 	public void sendMessage(String message) {
 		jmsMessagingTemplate.convertAndSend(this.queue, message);
 	}
+
+	@Override
+	public void sendMessage(Map<String, Object> message) {
+		jmsMessagingTemplate.convertAndSend(this.queue, message);
+	}
 }

+ 5 - 0
src/main/java/com/sw/config/RedisConfig.java

@@ -4,9 +4,12 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.data.redis.connection.RedisConnectionFactory;
 import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
 import org.springframework.data.redis.serializer.RedisSerializer;
 import org.springframework.data.redis.serializer.StringRedisSerializer;
 
+import java.nio.charset.Charset;
+
 /**
  * @author yegang
  * @create 2022-02-18 10:30
@@ -23,6 +26,8 @@ public class RedisConfig {
         RedisSerializer<String> redisSerializer = new StringRedisSerializer();// Long类型不可以会出现异常信息;
         redisTemplate.setKeySerializer(redisSerializer);
         redisTemplate.setHashKeySerializer(redisSerializer);
+        RedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
+        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
         return redisTemplate;
     }
 

+ 27 - 0
src/main/java/com/sw/controller/SnatchMaskController.java

@@ -4,10 +4,12 @@ import com.sw.service.SnatchMaskService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 
 import javax.annotation.Resource;
+import java.util.Date;
 
 /**
  * @Description 抢口罩入口
@@ -36,4 +38,29 @@ public class SnatchMaskController {
             return "非法输入";
         }
     }
+
+    /**
+     * 库存保存redis  订单交给mq稍后异步处理
+     * @param uid
+     * @param mid
+     * @return
+     */
+    @GetMapping("/{mid}")
+    private String snatchMQ(Integer uid, @PathVariable("mid") Integer mid){
+        if(redisTemplate.opsForSet().isMember("userUids", uid)){
+            if(redisTemplate.opsForSet().isMember("successUserUids", uid)){
+                return "你已经抢到口罩了";
+            }else{
+                Boolean flag = snatchMaskService.checkAndDecrRemaining(mid);
+                if(flag){
+                    System.out.println(new Date());
+                    return snatchMaskService.doSnatchMQ(uid, mid);
+                }else{
+                    return  "库存不足 抢购失败 ";
+                }
+            }
+        }else{
+            return "非法输入";
+        }
+    }
 }

+ 6 - 0
src/main/java/com/sw/service/SnatchMaskService.java

@@ -7,4 +7,10 @@ package com.sw.service;
  **/
 public interface SnatchMaskService {
     String doSnatch(Integer uid);
+
+    Boolean checkAndDecrRemaining(Integer mid);
+
+    String doSnatchMQ(Integer uid, Integer mid);
+
+    void doDecrAndCreateOrder(Integer uid, Integer mid);
 }

+ 59 - 3
src/main/java/com/sw/service/impl/SnatchMaskServiceImpl.java

@@ -1,16 +1,20 @@
 package com.sw.service.impl;
 
 import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.sw.activemq.producer.IMessageProducerService;
 import com.sw.domain.TMask;
 import com.sw.service.SnatchMaskService;
 import com.sw.service.TMaskService;
 import com.sw.service.TOrderService;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
 import javax.annotation.Resource;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * @Description TODO
@@ -28,17 +32,23 @@ public class SnatchMaskServiceImpl implements SnatchMaskService {
     @Resource
     private RedisTemplate redisTemplate;
 
+    @Resource
+    private IMessageProducerService mqService;
+
+    @Value("${amount}")
+    private Integer amount;
+
     @Override
     @Transactional
     public String doSnatch(Integer uid) {
         Integer maskId = 1;
         //1.get the amount of remaining masks
         TMask mask = maskService.getById(maskId);
-        if(mask.getMaskStock() > 0){
+        if(mask.getMaskStock() > amount){
             //3.create order
             int res = orderService.creatOrder(uid, maskId);
             //4.decrease the stock of mask
-            Boolean flag = maskService.decrease(maskId, mask.getMaskStock() - 5);
+            Boolean flag = maskService.decrease(maskId, mask.getMaskStock() - amount);
             if(res > 0 && flag){
 
                 //3. add to successUserUids
@@ -48,6 +58,52 @@ public class SnatchMaskServiceImpl implements SnatchMaskService {
                 return  "抢购失败 稍后重试";
             }
         }
-        return "抢购失败 稍后重试";
+        return "库存不足 抢购失败";
+    }
+
+    /**
+     * 查看redis库存是否充足
+     */
+    @Override
+    public Boolean checkAndDecrRemaining(Integer mid) {
+        Integer remaining = Integer.valueOf(redisTemplate.opsForHash().get("usableMasks", "mask-" + mid).toString());
+        Long res = redisTemplate.opsForHash().increment("usableMasks", "mask-" + mid, -amount);
+        System.out.println("increment res : "+res);//扣减之后的库存数量
+        return  remaining.compareTo(amount) > 0;
+    }
+
+    @Override
+    public String doSnatchMQ(Integer uid, Integer mid) {
+        Map<String, Object> agrs = new HashMap<>();
+        agrs.put("uid", uid);
+        agrs.put("mid", mid);
+        mqService.sendMessage(agrs);
+        return "抢购成功 请稍后查看订单";
+    }
+
+    /**
+     * mq消费端 调用方法 扣减sql库存 创建未支付订单
+     * @param uid
+     * @param maskId
+     */
+    @Override
+    @Transactional
+    public void doDecrAndCreateOrder(Integer uid, Integer maskId) {
+        //1.get the amount of remaining masks
+        TMask mask = maskService.getById(maskId);
+        if(mask.getMaskStock() > amount) {
+            //3.create order
+            int res = orderService.creatOrder(uid, maskId);
+            //4.decrease the stock of mask
+            Boolean flag = maskService.decrease(maskId, mask.getMaskStock() - amount);
+            if (res > 0 && flag) {
+
+                //3. add to successUserUids
+                redisTemplate.opsForSet().add("successUserUids", uid);
+            }
+        }else {
+            // TODO 如果sql中库存不够 扣减失败之后 消息已经消费了 或者是事物回滚了  如何处理???
+            return;
+        }
     }
 }

+ 2 - 0
src/main/java/com/sw/service/impl/TMaskServiceImpl.java

@@ -9,6 +9,7 @@ import com.sw.domain.TMask;
 import com.sw.domain.TOrder;
 import com.sw.service.TMaskService;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
@@ -32,6 +33,7 @@ public class TMaskServiceImpl extends ServiceImpl<TMaskDao, TMask> implements TM
     private TOrderDao tOrderDao;
     @Autowired
     private RedisTemplate redisTemplate;
+
     @Override
     /*
      * 方法描述:秒杀service

+ 2 - 2
src/test/java/com/sw/AddUserTest.java

@@ -22,9 +22,9 @@ public class AddUserTest {
     private SysUserDao userDao;
 
     @Test
-    private void addUser(){
+    public void addUser(){
         for (int i = 0;i < 1000; i++){
-            SysUser user = new SysUser("" + i + 1);
+            SysUser user = new SysUser("" + (i + 1));
             userDao.insert(user);
         }
     }

+ 20 - 2
src/test/java/com/sw/RedisTest.java

@@ -23,6 +23,9 @@ public class RedisTest {
     @Autowired
     private RedisTemplate redisTemplate;
 
+    /**
+     * 初始化合法用户uid 到redis
+     */
     @Test
     @Order(2)
     public void addSetTest(){
@@ -30,8 +33,8 @@ public class RedisTest {
         for (int i = 0;i < 1000; i++){
             uids.add( i + 1);
         }
-        redisTemplate.opsForSet().add("userUid", uids.toArray());
-        Assert.assertEquals(1000l, redisTemplate.opsForSet().size("userUid").longValue());
+        redisTemplate.opsForSet().add("userUids", uids.toArray());
+        Assert.assertEquals(1000l, redisTemplate.opsForSet().size("userUids").longValue());
     }
 
     @Test
@@ -49,4 +52,19 @@ public class RedisTest {
 //        System.out.println(redisTemplate.opsForSet().isMember("userUid", 18));
         System.out.println(redisTemplate.opsForSet().isMember("successUserUids", 18));
     }
+
+    @Test
+    public void addMas(){
+        redisTemplate.opsForHash().put("usableMasks", "mask-1", 100);
+        redisTemplate.opsForHash().put("usableMasks", "mask-2", 100);
+
+//        System.out.printf("res: "  + redisTemplate.opsForHash().get("usableMask", "mask-1").toString());
+
+    }
+    @Test
+    public void decr(){
+        Long res = redisTemplate.opsForHash().increment("usableMasks", "mask-1", -5l);
+        System.out.println(res);
+
+    }
 }