一:介绍
1.介绍
在前面的说的模式中会出现一个问题。
就是生产者将消息发送出去到底有没有到达rabbitMq,默认情况下是不知道。
有两种解决方式。
AMQP实现事务机制
Confirm机制。
这里先说明第一种实现方式。
2.事务机制
txSelect:用于将当前的channel设置成transation模式。
txCommit:用于提交事务
txRollback:回滚事务
3.缺点
很耗时,降低吞吐量。
二:程序
1.生产者
1 package com.mq.TxCommit; 2 3 import com.mq.utils.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 import java.util.Map; 8 import java.util.concurrent.TimeoutException; 9 10 public class Send {11 private static final String QUEUE_NAME="test_queue_tx";12 public static void main(String[] args)throws Exception{13 Connection connection= ConnectionUtil.getConnection();14 Channel channel=connection.createChannel();15 channel.queueDeclare(QUEUE_NAME,false,false,false,null);16 String msg="tx msg";17 try {18 channel.txSelect();19 channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());20 System.out.println("msg:"+msg);21 //营造一个可以回退的语句22 int a=1/0;23 channel.txCommit();24 }catch (Exception e){25 channel.txRollback();26 }27 }28 }
2.消费者
1 package com.mq.TxCommit; 2 3 import com.mq.utils.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 public class Receive { 9 private static final String QUEUE_NAME="test_queue_tx";10 public static void main(String[] args)throws Exception {11 Connection connection = ConnectionUtil.getConnection();12 Channel channel = connection.createChannel();13 channel.queueDeclare(QUEUE_NAME, false, false, false, null);14 channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){15 @Override16 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {17 System.out.println(new String(body,"utf-8"));18 }19 });20 }21 }
3.现象
就不粘图了,用文字说明一番。
如果没有粘贴可以造成异常的语句,就可以收到消息;如果有,就收不到消息。