yegang 2 年 前
コミット
363642432f

+ 10 - 1
pom.xml

@@ -13,6 +13,7 @@
     <artifactId>mask</artifactId>
     <version>0.0.1-SNAPSHOT</version>
     <name>mask</name>
+    <packaging>jar</packaging>
     <description>口罩秒杀</description>
     <properties>
         <java.version>1.8</java.version>
@@ -78,6 +79,14 @@
 <!--            <version>4.5.7</version>-->
 <!--        </dependency>-->
     </dependencies>
-
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+                <version>2.6.0</version>
+            </plugin>
+        </plugins>
+    </build>
 
 </project>

+ 54 - 17
src/main/java/com/sw/activemq/consumer/MessageConsumerService.java

@@ -1,7 +1,12 @@
 package com.sw.activemq.consumer;
 
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.sw.activemq.producer.IMessageProducerService;
+import com.sw.domain.TOrder;
 import com.sw.service.SnatchMaskService;
+import com.sw.service.TOrderService;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.activemq.command.ActiveMQQueue;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.jms.annotation.JmsListener;
 import org.springframework.stereotype.Service;
@@ -9,28 +14,60 @@ import org.springframework.stereotype.Service;
 import javax.jms.JMSException;
 import javax.jms.MapMessage;
 import javax.jms.Message;
-import javax.jms.ObjectMessage;
-import java.util.Date;
+import java.util.HashMap;
 import java.util.Map;
 
 @Service
 @Slf4j
 public class MessageConsumerService {
 
-	@Autowired
-	private SnatchMaskService snatchMaskService;
+    @Autowired
+    private SnatchMaskService snatchMaskService;
+    @Autowired
+    private IMessageProducerService iMessageProducerService;
+    @Autowired
+    private TOrderService tOrderService;
 
-	@JmsListener(destination="mask")
-	public void receiveMessage(Message info) {	// 进行消息接收处理
-		if(info instanceof MapMessage){
-			MapMessage objectMessage=(MapMessage) info;
-			try {
-				Integer mid = objectMessage.getInt("mid");
-				Integer uid = objectMessage.getInt("uid");
-				snatchMaskService.doDecrAndCreateOrder(uid, mid);
-			} catch (JMSException e) {
-				e.printStackTrace();
-			}
-		}
-	}
+    @JmsListener(destination = "mask")
+    public void receiveMessage(Message info) {    // 进行消息接收处理
+        if (info instanceof MapMessage) {
+            MapMessage objectMessage = (MapMessage) info;
+            try {
+                Integer mid = objectMessage.getInt("mid");
+                Integer uid = objectMessage.getInt("uid");
+                Map<String,Integer> map = new HashMap<>();
+                map.put("uid",uid);
+                map.put("mid",mid);
+                snatchMaskService.doDecrAndCreateOrder(uid, mid);
+                MsgPara msgPara = new MsgPara(uid+"",mid+"");
+				iMessageProducerService.delayMessage(new ActiveMQQueue("delay.mask.queue"),msgPara, (long) 60000);
+            } catch (JMSException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    @JmsListener(destination = "delay.mask.queue")
+    public void delayMask(MsgPara msgPara) {    // 进行消息接收处理
+        if (null !=msgPara) {
+            Integer mid =Integer.parseInt(msgPara.getMid());
+            Integer uid = Integer.parseInt(msgPara.getUid());
+            QueryWrapper<TOrder> queryWrapper = new QueryWrapper<>();
+            queryWrapper.eq("uid", uid);
+            queryWrapper.eq("mid", mid);
+            TOrder order = tOrderService.getOne(queryWrapper);
+            if (order != null) {
+                if ("2".equals(order.getState())) {
+                    System.out.println("订单已处理");
+                    return;
+                } else if("1".equals(order.getState())) {
+                    order.setState("3");
+                    tOrderService.updateById(order);
+                    System.out.println("订单已过期");
+                }
+            }else{
+                System.out.println("没有符合条件的订单");
+            }
+        }
+    }
 }

+ 23 - 0
src/main/java/com/sw/activemq/consumer/MsgPara.java

@@ -0,0 +1,23 @@
+package com.sw.activemq.consumer;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+/**
+ * 延迟队列消息参数
+ *
+ * @author yegang
+ * @create 2022-02-22 15:58
+ **/
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+@Builder
+public class MsgPara  implements Serializable {
+    String uid;
+    String mid;
+}

+ 6 - 0
src/main/java/com/sw/activemq/producer/IMessageProducerService.java

@@ -1,5 +1,9 @@
 package com.sw.activemq.producer;
 
+
+
+import javax.jms.Destination;
+import java.io.Serializable;
 import java.util.Map;
 
 public interface IMessageProducerService {
@@ -7,4 +11,6 @@ public interface IMessageProducerService {
 	void sendMessage(String message);
 
 	void sendMessage(Map<String, Object> message);
+
+	<T extends Serializable>void delayMessage(Destination destination, T data, Long time);
 }

+ 45 - 1
src/main/java/com/sw/activemq/producer/impl/MessageProducerServiceImpl.java

@@ -1,11 +1,14 @@
 package com.sw.activemq.producer.impl;
 
 import com.sw.activemq.producer.IMessageProducerService;
+import org.apache.activemq.ScheduledMessage;
+import org.springframework.boot.autoconfigure.jms.JmsProperties;
 import org.springframework.jms.core.JmsMessagingTemplate;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
-import javax.jms.Queue;
+import javax.jms.*;
+import java.io.Serializable;
 import java.util.Map;
 
 @Service
@@ -25,4 +28,45 @@ public class MessageProducerServiceImpl implements IMessageProducerService {
 	public void sendMessage(Map<String, Object> message) {
 		jmsMessagingTemplate.convertAndSend(this.queue, message);
 	}
+	@Override
+	public <T extends Serializable> void delayMessage(Destination destination, T data, Long time) {
+		Connection connection =null;
+		Session session =null;
+		MessageProducer messageProducer =null;
+		ConnectionFactory connectionFactory = jmsMessagingTemplate.getConnectionFactory();
+		try {
+			//获取连接
+			connection = connectionFactory.createConnection();
+			//开始
+			connection.start();
+			//获取事务
+			session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
+			//创建一个消息队列
+			messageProducer = session.createProducer(destination);
+			messageProducer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue());
+			ObjectMessage objectMessage = session.createObjectMessage( data);
+			// 设置延迟时间
+			objectMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,time);
+			messageProducer.send(objectMessage);
+			session.commit();
+		} catch (JMSException e) {
+			e.printStackTrace();
+		}finally {
+			try{
+				if(messageProducer!=null){
+					messageProducer.close();
+				}
+				if(session!=null){
+					session.close();
+				}
+				if(connection!=null){
+					connection.close();
+				}
+
+			}catch (Exception e){
+				e.printStackTrace();
+			}
+		}
+
+	}
 }

+ 8 - 1
src/main/java/com/sw/config/ActiveMqConfig.java

@@ -9,6 +9,8 @@ import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
 import org.springframework.jms.core.JmsMessagingTemplate;
 
 import javax.jms.ConnectionFactory;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * @author yegang
@@ -28,7 +30,12 @@ public class ActiveMqConfig {
 
     @Bean
     public ConnectionFactory connectionFactory(){
-        return new ActiveMQConnectionFactory(username, password, brokerUrl);
+
+        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(username, password, brokerUrl);
+        List<String> models = new ArrayList<>();
+        models.add("com.sw.activemq.consumer");
+        activeMQConnectionFactory.setTrustedPackages(models);
+        return activeMQConnectionFactory;
     }
 
     @Bean

+ 33 - 10
src/main/java/com/sw/service/impl/SnatchMaskServiceImpl.java

@@ -16,6 +16,9 @@ import org.springframework.web.bind.annotation.RequestMapping;
 import javax.annotation.Resource;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.LockSupport;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * @Description TODO
@@ -37,26 +40,29 @@ public class SnatchMaskServiceImpl implements SnatchMaskService {
     private IMessageProducerService mqService;
 
     @Value("${amount}")
-    private  Integer amount;
+    private Integer amount;
     Object o = new Object();
+    ReentrantLock reentrantLock = new ReentrantLock();
+    Condition condition = reentrantLock.newCondition();
+
     @Override
     @Transactional
     public String doSnatch(Integer uid) {
         Integer maskId = 1;
         //1.get the amount of remaining masks
         TMask mask = maskService.getById(maskId);
-        if(mask.getMaskStock() > amount){
+        if (mask.getMaskStock() > amount) {
             //3.create order
             int res = orderService.creatOrder(uid, maskId);
             //4.decrease the stock of mask
             Boolean flag = maskService.decrease(maskId, mask.getMaskStock() - amount);
-            if(res > 0 && flag){
+            if (res > 0 && flag) {
 
                 //3. add to successUserUids
                 redisTemplate.opsForSet().add("successUserUids", uid);
                 return "恭喜你 抢购成功";
-            }else{
-                return  "抢购失败 稍后重试";
+            } else {
+                return "抢购失败 稍后重试";
             }
         }
         return "库存不足 抢购失败";
@@ -66,9 +72,25 @@ public class SnatchMaskServiceImpl implements SnatchMaskService {
      * 查看redis库存是否充足
      */
     @Override
-    public  Boolean checkAndDecrRemaining(Integer mid) {
+    public Boolean checkAndDecrRemaining(Integer mid) {
 
-        synchronized (o) {
+        reentrantLock.lock();
+        try {
+           Integer remaining = Integer.valueOf(redisTemplate.opsForHash().get("usableMasks", "mask-" + mid).toString());
+           System.out.println("库存扣减前:" + remaining + "---" + Thread.currentThread());
+           if (remaining.compareTo(amount) >= 0) {
+               Long res = redisTemplate.opsForHash().increment("usableMasks", "mask-" + mid, -amount);
+               System.out.println("库存扣减后:" + "increment res : " + res + "---" + Thread.currentThread());//扣减之后的库存数量
+               return true;
+           } else {
+               return false;
+           }
+       } finally {
+           reentrantLock.unlock();
+       }
+
+
+   /*     synchronized (o) {
 //            Integer  remaining = 0;
             Integer   remaining = Integer.valueOf(redisTemplate.opsForHash().get("usableMasks", "mask-" + mid).toString());
             System.out.println("库存扣减前:"+remaining+"---" + Thread.currentThread());
@@ -79,7 +101,7 @@ public class SnatchMaskServiceImpl implements SnatchMaskService {
             }else{
                 return false;
             }
-        }
+        }*/
     }
 
     @Override
@@ -93,6 +115,7 @@ public class SnatchMaskServiceImpl implements SnatchMaskService {
 
     /**
      * mq消费端 调用方法 扣减sql库存 创建未支付订单
+     *
      * @param uid
      * @param maskId
      */
@@ -102,7 +125,7 @@ public class SnatchMaskServiceImpl implements SnatchMaskService {
         //1.get the amount of remaining masks
         TMask mask = maskService.getById(maskId);
         System.out.println(mask.getMaskStock() + "----" + Thread.currentThread());
-        if(mask.getMaskStock() >= amount) {
+        if (mask.getMaskStock() >= amount) {
             //3.create order
             int res = orderService.creatOrder(uid, maskId);
             //4.decrease the stock of mask
@@ -112,7 +135,7 @@ public class SnatchMaskServiceImpl implements SnatchMaskService {
                 //3. add to successUserUids
                 redisTemplate.opsForSet().add("successUserUids", uid);
             }
-        }else {
+        } else {
             // TODO 如果sql中库存不够 扣减失败之后 消息已经消费了 或者是事物回滚了  如何处理???
             return;
         }