Ver código fonte

mq处理订单 锁测试代码

dinglan 2 anos atrás
pai
commit
cd99913b77

+ 1 - 1
src/main/java/com/sw/activemq/consumer/MessageConsumerService.java

@@ -41,7 +41,7 @@ public class MessageConsumerService {
                 snatchMaskService.doDecrAndCreateOrder(uid, mid);
                 MsgPara msgPara = new MsgPara(uid+"",mid+"");
                 //订单生成后发送延时消息,处理订单状态
-				iMessageProducerService.delayMessage(new ActiveMQQueue("delay.mask.queue"),msgPara, (long) 60000);
+				iMessageProducerService.delayMessage(new ActiveMQQueue("delay.mask.queue"),msgPara, (long) 1000);
             } catch (JMSException e) {
                 e.printStackTrace();
             }

+ 11 - 14
src/main/java/com/sw/service/impl/SnatchMaskServiceImpl.java

@@ -81,12 +81,14 @@ public class SnatchMaskServiceImpl implements SnatchMaskService {
     public Boolean checkAndDecrRemaining(Integer mid) {
         RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
         Boolean flag = false;
+        final byte[] key = redisTemplate.getKeySerializer().serialize("mask-setnx-mq-" + mid);
+        final byte[] value = redisTemplate.getValueSerializer().serialize(1);
         try {
 //
 //            while (!connection.setNX(redisTemplate.getKeySerializer().serialize("mask-setnx-mq-" + mid), redisTemplate.getValueSerializer().serialize(1))) {
 //                Thread.sleep(3);
 //            }
-            if(connection.setNX(redisTemplate.getKeySerializer().serialize("mask-setnx-mq-" + mid), redisTemplate.getValueSerializer().serialize(1))){
+            if(redisTemplate.opsForValue().setIfAbsent("mask-setnx-mq-" + mid, Thread.currentThread().getName(), 1000, TimeUnit.MILLISECONDS)) {
                 Integer remaining = Integer.valueOf(redisTemplate.opsForHash().get("usableMasks", "mask-" + mid).toString());
                 System.out.println("库存扣减前:" + remaining + "---" + Thread.currentThread());
                 if (remaining.compareTo(amount) >= 0) {
@@ -96,7 +98,7 @@ public class SnatchMaskServiceImpl implements SnatchMaskService {
                 } else {
                     flag = false;
                 }
-                connection.del(redisTemplate.getKeySerializer().serialize("mask-setnx-mq-" + mid));
+                connection.del(key);
                 connection.close();
                 return flag;
             }else {
@@ -158,11 +160,10 @@ public class SnatchMaskServiceImpl implements SnatchMaskService {
     public void doDecrAndCreateOrder(Integer uid, Integer maskId) {
         RLock lock = redissonClient.getLock("mask-" + maskId);
         String threadName = Thread.currentThread().getName();
-        log.info("线程:{} 正在尝试获取锁。。。", threadName);
-        boolean b = false;
+
         try {
-            b = lock.tryLock(3000, TimeUnit.MILLISECONDS);
-            if (b) {
+            while (lock.tryLock(1000, TimeUnit.MILLISECONDS)) {
+                log.info("线程:{} 获取锁", threadName);
                 //1.get the amount of remaining masks
                 TMask mask = maskService.getById(maskId);
                 System.out.println(mask.getMaskStock() + "----" + Thread.currentThread());
@@ -172,7 +173,6 @@ public class SnatchMaskServiceImpl implements SnatchMaskService {
                     //4.decrease the stock of mask
                     Boolean flag = maskService.decrease(maskId, mask.getMaskStock() - amount);
                     if (res > 0 && flag) {
-
                         //3. add to successUserUids
                         redisTemplate.opsForSet().add("successUserUids", uid);
                     }
@@ -180,15 +180,12 @@ public class SnatchMaskServiceImpl implements SnatchMaskService {
                     // TODO 如果sql中库存不够 扣减失败之后 消息已经消费了 或者是事物回滚了  如何处理???
                     return;
                 }
+                lock.unlock();
                 log.info("{}:业务执行完成", threadName);
-            } else {
-                log.info("{}:没有获取到锁,锁已被占用", threadName);
+                break;
             }
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        } finally {
-            lock.unlock();
-            log.info("线程:{},释放了锁", threadName);
+        } catch(InterruptedException e){
+                e.printStackTrace();
         }
     }
 }