RabbitMQ 消息持久化、事务、Publisher的消息确认机制



RabbitMQ  消息持久化、事务、Publisher的消息确认机制

1. 声明MessageQueue
在RabbitMQ中,无论是生产者发送消息还是消费者接受消息,都首先需要声明一个MessageQueue。
这就存在一个问题,是生产者声明还是消费者声明呢?要解决这个问题,首先需要明确:
a)消费者是无法订阅或者获取不存在的MessageQueue中的信息。
b)消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃。

在明白了上述两点以后,就容易理解:
如果是消费者去声明Queue,就有可能会出现在声明Queue之前,生产者已发送的消息被丢弃的隐患;
如果应用能够通过消息重发的机制允许消息丢失,则使用此方案没有任何问题。
但是如果不能接受该方案,这就需要无论是生产者还是消费者,在发送或者接受消息前,都需要去尝试建立消息队列。
这里有一点需要明确,如果客户端尝试建立一个已经存在的消息队列,Rabbit MQ不会做任何事情,并返回客户端建立成功的。

如果一个消费者在一个信道中正在监听某一个队列的消息,Rabbit MQ是不允许该消费者在同一个channel去声明其他队列的。

Rabbit MQ中,可以通过queue.declare命令声明一个队列,可以设置该队列以下属性:
a)Exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。
  这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一个连接创建的排他队列的。
  其二,“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。
  其三,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。
  这种队列适用于只限于一个客户端发送读取消息的应用场景。
b)Auto-delete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
c)Durable:持久化,这个会在后面作为专门一个章节讨论。
d)其他选项,例如如果用户仅仅想查询某一个队列是否已存在,如果不存在,不想建立该队列,
仍然可以调用queue.declare,只不过需要将参数passive设为true,传给queue.declare,
如果该队列已存在,则会返回true;如果不存在,则会返回Error,但是不会创建新的队列。

2. 生产者发送消息
3. 消费者订阅消息    

4. 持久化

   RabbitMQ默认是不持久队列、Exchange、Binding以及队列中的消息的,这意味着一旦消息服务器重启,
   所有已声明的队列,Exchange,Binding以及队列中的消息都会丢失。
   通过设置Exchange和MessageQueue的durable属性为true,可以使得队列和Exchange持久化,
   但是这还不能使得队列中的消息持久化,还需要生产者在发送消息的时候,
   将basicPublish()方法的BasicProperties props参数中deliveryMode设置为2【使用basicPublish(MessageProperties.PERSISTENT_BASIC)】,
   只有这3个全部设置完成后,才能保证服务器重启不会对现有的队列造成影响。
   这里需要注意的是,只有durable为true的Exchange和durable为ture的Queues才能绑定,
   否则在绑定时,RabbitMQ都会抛错的。持久化会对RabbitMQ的性能造成比较大的影响,可能会下降10倍不止。

5. 事务
   对事务的支持是AMQP协议的一个重要特性。假设当生产者将一个持久化消息发送给服务器时,
   因为consume命令本身没有任何Response返回,所以即使服务器崩溃,没有持久化该消息,生产者也无法获知该消息已经丢失。
   如果此时使用事务,即通过txSelect()开启一个事务,然后发送消息给服务器,然后通过txCommit()提交该事务,即可以保证,
   如果txCommit()提交了,则该消息一定会持久化,如果txCommit()还未提交即服务器崩溃,则该消息不会服务器就收。
   当然RabbitMQ也提供了txRollback()命令用于回滚某一个事务。

6. Confirm机制(Publisher Acknowledgements)
   使用事务固然可以保证只有提交的事务,才会被服务器执行。但是这样同时也将客户端与消息服务器同步起来,
   这背离了消息队列解耦的本质。RabbitMQ提供了一个更加轻量级的机制来保证生产者可以感知服务器消息是否
   已被路由到正确的队列中——Confirm。如果设置channel为confirm状态,则通过该channel发送的消息都会被分配一个唯一的ID,
   然后一旦该消息被正确的路由到匹配的队列中后,服务器会返回给生产者一个Confirm,该Confirm包含该消息的ID,
   这样生产者就会知道该消息已被正确分发。对于持久化消息,只有该消息被持久化后,才会返回Confirm。
   Confirm机制的最大优点在于异步,生产者在发送消息以后,即可继续执行其他任务。而服务器返回Confirm后,
   会触发生产者的回调函数,生产者在回调函数中处理Confirm信息。如果消息服务器发生异常,导致该消息丢失,
   会返回给生产者一个nack,表示消息已经丢失,这样生产者就可以通过重发消息,保证消息不丢失。
   Confirm机制在性能上要比事务优越很多。但是Confirm机制,无法进行回滚,就是一旦服务器崩溃,生产者无法得到Confirm信息,
   生产者其实本身也不知道该消息吃否已经被持久化,只有继续重发来保证消息不丢失,但是如果原先已经持久化的消息,
   并不会被回滚,这样队列中就会存在两条相同的消息,系统需要支持去重。

Java客户端Confirm Mode
[java]  view plain  copy
  1. import java.io.IOException;  
  2. import java.util.concurrent.CountDownLatch;  
  3.   
  4. import com.rabbitmq.client.AMQP.BasicProperties;  
  5. import com.rabbitmq.client.Channel;  
  6. import com.rabbitmq.client.Connection;  
  7. import com.rabbitmq.client.ConnectionFactory;  
  8. import com.rabbitmq.client.DefaultConsumer;  
  9. import com.rabbitmq.client.Envelope;  
  10. import com.rabbitmq.client.MessageProperties;  
  11. import com.rabbitmq.client.QueueingConsumer;  
  12.   
  13. public class ConfirmDontLoseMessages {  
  14.     private final static int msgCount = 10000// 默认发送10000条消息  
  15.     private final static String QUEUE_NAME = "confirm-test2"// 队列名称  
  16.     private static ConnectionFactory connectionFactory;  
  17.   
  18.     public static void main(String[] args)  
  19.             throws IOException, InterruptedException {  
  20.         connectionFactory = new ConnectionFactory();  
  21.         connectionFactory.setHost("10.59.79.37");  
  22.         connectionFactory.setUsername("admin");  
  23.         connectionFactory.setPassword("admin");  
  24.         connectionFactory.setPort(5672);  
  25.   
  26.         Thread consumerThread = new Thread(new Consumer());  
  27.         Thread publisherThread = new Thread(new Publisher());  
  28.         // 开启消费者线程  
  29.   
  30.         consumerThread.start();  
  31.         // 开启生产者线程  
  32.         publisherThread.start();  
  33.     }  
  34.   
  35.     // 消息发布者  
  36.     static class Publisher implements Runnable {  
  37.         public void run() {  
  38.             try {  
  39.                 long startTime = System.currentTimeMillis();  
  40.                 // 连接并非线程安全的,所以要每线程一个连接  
  41.                 Connection conn = connectionFactory.newConnection();  
  42.                 Channel ch = conn.createChannel();  
  43.                 // 创建一个持久化的,非独享,不自动删除的队列  
  44.                 ch.queueDeclare(QUEUE_NAME, truefalsefalsenull);  
  45.                 // 开启通道上的 publisher acknowledgements  
  46.                 ch.confirmSelect();  
  47.   
  48.                 // 发送持久化消息,消息内容为helloWorld  
  49.                 for (long i = 0; i < msgCount; ++i) {  
  50.                     ch.basicPublish("", QUEUE_NAME,  
  51.                             MessageProperties.PERSISTENT_BASIC,  
  52.                             "helloWorld".getBytes());  
  53.                 }  
  54.   
  55.                 // 等待所有消息都被ack或者nack,如果某个消息被nack,则抛出IOException  
  56.                 ch.waitForConfirmsOrDie();  
  57.                 long endTime = System.currentTimeMillis();  
  58.                 System.out.printf("Test took %.3fs\n", (float) (endTime - startTime) / 1000);  
  59.                           
  60.                 // 删除队列,不论是否在使用中  
  61.                 ch.queueDelete(QUEUE_NAME);  
  62.                 ch.close();  
  63.                 conn.close();  
  64.             } catch (Throwable e) {  
  65.                 System.out.println("damn fuck! error detected :(");  
  66.                 System.out.print(e);  
  67.             }  
  68.         }  
  69.     }  
  70.   
  71.     // 消息消费者  
  72.     static class Consumer implements Runnable {  
  73.         public void run() {  
  74.             try {  
  75.                 // 每线程一个连接  
  76.                 Connection conn = connectionFactory.newConnection();  
  77.                 Channel ch = conn.createChannel();  
  78.                 ch.queueDeclare(QUEUE_NAME, truefalsefalsenull);  
  79.   
  80.                 // 创建消息消费者  
  81.                 QueueingConsumer qc = new QueueingConsumer(ch);  
  82.                 ch.basicConsume(QUEUE_NAME, true, qc);  
  83.                 for (int i = 0; i < msgCount; ++i) {  
  84.                     qc.nextDelivery();  
  85.                 }  
  86.                 // 关闭通道和连接  
  87.                 System.out.println("consumer done");  
  88.                 ch.close();  
  89.                 conn.close();  
  90.             } catch (Throwable e) {  
  91.                 System.out.println("damn fuck! some error happened!");  
  92.                 System.out.print(e);  
  93.             }  
  94.         }  
  95.     }  
  96.   
  97. }  
  98.   
  99. /** Wait until all messages published since the last call have 
  100.   * been either ack'd or nack'd by the broker.  If any of the 
  101.   * messages were nack'd, waitForConfirmsOrDie will throw an 
  102.   * IOException.  When called on a non-Confirm channel, it will 
  103.   * throw an IllegalStateException. 
  104.   * @throws java.lang.IllegalStateException 
  105.   */  
  106. void waitForConfirmsOrDie() throws IOException, InterruptedException;  

原文:
http://www.360doc.com/content/14/0608/22/834950_384932402.shtml    RabbitMQ持久化、事务、消息确认

http://www.tuicool.com/articles/bMJ7r2   RabbitMQ  Publisher的消息确认机制




RabbitMQ  消息持久化、事务、Publisher的消息确认机制

1. 声明MessageQueue
在RabbitMQ中,无论是生产者发送消息还是消费者接受消息,都首先需要声明一个MessageQueue。
这就存在一个问题,是生产者声明还是消费者声明呢?要解决这个问题,首先需要明确:
a)消费者是无法订阅或者获取不存在的MessageQueue中的信息。
b)消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃。

在明白了上述两点以后,就容易理解:
如果是消费者去声明Queue,就有可能会出现在声明Queue之前,生产者已发送的消息被丢弃的隐患;
如果应用能够通过消息重发的机制允许消息丢失,则使用此方案没有任何问题。
但是如果不能接受该方案,这就需要无论是生产者还是消费者,在发送或者接受消息前,都需要去尝试建立消息队列。
这里有一点需要明确,如果客户端尝试建立一个已经存在的消息队列,Rabbit MQ不会做任何事情,并返回客户端建立成功的。

如果一个消费者在一个信道中正在监听某一个队列的消息,Rabbit MQ是不允许该消费者在同一个channel去声明其他队列的。

Rabbit MQ中,可以通过queue.declare命令声明一个队列,可以设置该队列以下属性:
a)Exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。
  这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一个连接创建的排他队列的。
  其二,“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。
  其三,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。
  这种队列适用于只限于一个客户端发送读取消息的应用场景。
b)Auto-delete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
c)Durable:持久化,这个会在后面作为专门一个章节讨论。
d)其他选项,例如如果用户仅仅想查询某一个队列是否已存在,如果不存在,不想建立该队列,
仍然可以调用queue.declare,只不过需要将参数passive设为true,传给queue.declare,
如果该队列已存在,则会返回true;如果不存在,则会返回Error,但是不会创建新的队列。

2. 生产者发送消息
3. 消费者订阅消息    

4. 持久化

   RabbitMQ默认是不持久队列、Exchange、Binding以及队列中的消息的,这意味着一旦消息服务器重启,
   所有已声明的队列,Exchange,Binding以及队列中的消息都会丢失。
   通过设置Exchange和MessageQueue的durable属性为true,可以使得队列和Exchange持久化,
   但是这还不能使得队列中的消息持久化,还需要生产者在发送消息的时候,
   将basicPublish()方法的BasicProperties props参数中deliveryMode设置为2【使用basicPublish(MessageProperties.PERSISTENT_BASIC)】,
   只有这3个全部设置完成后,才能保证服务器重启不会对现有的队列造成影响。
   这里需要注意的是,只有durable为true的Exchange和durable为ture的Queues才能绑定,
   否则在绑定时,RabbitMQ都会抛错的。持久化会对RabbitMQ的性能造成比较大的影响,可能会下降10倍不止。

5. 事务
   对事务的支持是AMQP协议的一个重要特性。假设当生产者将一个持久化消息发送给服务器时,
   因为consume命令本身没有任何Response返回,所以即使服务器崩溃,没有持久化该消息,生产者也无法获知该消息已经丢失。
   如果此时使用事务,即通过txSelect()开启一个事务,然后发送消息给服务器,然后通过txCommit()提交该事务,即可以保证,
   如果txCommit()提交了,则该消息一定会持久化,如果txCommit()还未提交即服务器崩溃,则该消息不会服务器就收。
   当然RabbitMQ也提供了txRollback()命令用于回滚某一个事务。

6. Confirm机制(Publisher Acknowledgements)
   使用事务固然可以保证只有提交的事务,才会被服务器执行。但是这样同时也将客户端与消息服务器同步起来,
   这背离了消息队列解耦的本质。RabbitMQ提供了一个更加轻量级的机制来保证生产者可以感知服务器消息是否
   已被路由到正确的队列中——Confirm。如果设置channel为confirm状态,则通过该channel发送的消息都会被分配一个唯一的ID,
   然后一旦该消息被正确的路由到匹配的队列中后,服务器会返回给生产者一个Confirm,该Confirm包含该消息的ID,
   这样生产者就会知道该消息已被正确分发。对于持久化消息,只有该消息被持久化后,才会返回Confirm。
   Confirm机制的最大优点在于异步,生产者在发送消息以后,即可继续执行其他任务。而服务器返回Confirm后,
   会触发生产者的回调函数,生产者在回调函数中处理Confirm信息。如果消息服务器发生异常,导致该消息丢失,
   会返回给生产者一个nack,表示消息已经丢失,这样生产者就可以通过重发消息,保证消息不丢失。
   Confirm机制在性能上要比事务优越很多。但是Confirm机制,无法进行回滚,就是一旦服务器崩溃,生产者无法得到Confirm信息,
   生产者其实本身也不知道该消息吃否已经被持久化,只有继续重发来保证消息不丢失,但是如果原先已经持久化的消息,
   并不会被回滚,这样队列中就会存在两条相同的消息,系统需要支持去重。

Java客户端Confirm Mode
[java]  view plain  copy
  1. import java.io.IOException;  
  2. import java.util.concurrent.CountDownLatch;  
  3.   
  4. import com.rabbitmq.client.AMQP.BasicProperties;  
  5. import com.rabbitmq.client.Channel;  
  6. import com.rabbitmq.client.Connection;  
  7. import com.rabbitmq.client.ConnectionFactory;  
  8. import com.rabbitmq.client.DefaultConsumer;  
  9. import com.rabbitmq.client.Envelope;  
  10. import com.rabbitmq.client.MessageProperties;  
  11. import com.rabbitmq.client.QueueingConsumer;  
  12.   
  13. public class ConfirmDontLoseMessages {  
  14.     private final static int msgCount = 10000// 默认发送10000条消息  
  15.     private final static String QUEUE_NAME = "confirm-test2"// 队列名称  
  16.     private static ConnectionFactory connectionFactory;  
  17.   
  18.     public static void main(String[] args)  
  19.             throws IOException, InterruptedException {  
  20.         connectionFactory = new ConnectionFactory();  
  21.         connectionFactory.setHost("10.59.79.37");  
  22.         connectionFactory.setUsername("admin");  
  23.         connectionFactory.setPassword("admin");  
  24.         connectionFactory.setPort(5672);  
  25.   
  26.         Thread consumerThread = new Thread(new Consumer());  
  27.         Thread publisherThread = new Thread(new Publisher());  
  28.         // 开启消费者线程  
  29.   
  30.         consumerThread.start();  
  31.         // 开启生产者线程  
  32.         publisherThread.start();  
  33.     }  
  34.   
  35.     // 消息发布者  
  36.     static class Publisher implements Runnable {  
  37.         public void run() {  
  38.             try {  
  39.                 long startTime = System.currentTimeMillis();  
  40.                 // 连接并非线程安全的,所以要每线程一个连接  
  41.                 Connection conn = connectionFactory.newConnection();  
  42.                 Channel ch = conn.createChannel();  
  43.                 // 创建一个持久化的,非独享,不自动删除的队列  
  44.                 ch.queueDeclare(QUEUE_NAME, truefalsefalsenull);  
  45.                 // 开启通道上的 publisher acknowledgements  
  46.                 ch.confirmSelect();  
  47.   
  48.                 // 发送持久化消息,消息内容为helloWorld  
  49.                 for (long i = 0; i < msgCount; ++i) {  
  50.                     ch.basicPublish("", QUEUE_NAME,  
  51.                             MessageProperties.PERSISTENT_BASIC,  
  52.                             "helloWorld".getBytes());  
  53.                 }  
  54.   
  55.                 // 等待所有消息都被ack或者nack,如果某个消息被nack,则抛出IOException  
  56.                 ch.waitForConfirmsOrDie();  
  57.                 long endTime = System.currentTimeMillis();  
  58.                 System.out.printf("Test took %.3fs\n", (float) (endTime - startTime) / 1000);  
  59.                           
  60.                 // 删除队列,不论是否在使用中  
  61.                 ch.queueDelete(QUEUE_NAME);  
  62.                 ch.close();  
  63.                 conn.close();  
  64.             } catch (Throwable e) {  
  65.                 System.out.println("damn fuck! error detected :(");  
  66.                 System.out.print(e);  
  67.             }  
  68.         }  
  69.     }  
  70.   
  71.     // 消息消费者  
  72.     static class Consumer implements Runnable {  
  73.         public void run() {  
  74.             try {  
  75.                 // 每线程一个连接  
  76.                 Connection conn = connectionFactory.newConnection();  
  77.                 Channel ch = conn.createChannel();  
  78.                 ch.queueDeclare(QUEUE_NAME, truefalsefalsenull);  
  79.   
  80.                 // 创建消息消费者  
  81.                 QueueingConsumer qc = new QueueingConsumer(ch);  
  82.                 ch.basicConsume(QUEUE_NAME, true, qc);  
  83.                 for (int i = 0; i < msgCount; ++i) {  
  84.                     qc.nextDelivery();  
  85.                 }  
  86.                 // 关闭通道和连接  
  87.                 System.out.println("consumer done");  
  88.                 ch.close();  
  89.                 conn.close();  
  90.             } catch (Throwable e) {  
  91.                 System.out.println("damn fuck! some error happened!");  
  92.                 System.out.print(e);  
  93.             }  
  94.         }  
  95.     }  
  96.   
  97. }  
  98.   
  99. /** Wait until all messages published since the last call have 
  100.   * been either ack'd or nack'd by the broker.  If any of the 
  101.   * messages were nack'd, waitForConfirmsOrDie will throw an 
  102.   * IOException.  When called on a non-Confirm channel, it will 
  103.   * throw an IllegalStateException. 
  104.   * @throws java.lang.IllegalStateException 
  105.   */  
  106. void waitForConfirmsOrDie() throws IOException, InterruptedException;  

原文:
http://www.360doc.com/content/14/0608/22/834950_384932402.shtml    RabbitMQ持久化、事务、消息确认
http://www.tuicool.com/articles/bMJ7r2    RabbitMQ  Publisher的消息确认机制

http://blog.csdn.net/u010558660/article/details/53424394

RabbitMQ  消息持久化、事务、Publisher的消息确认机制

1. 声明MessageQueue
在RabbitMQ中,无论是生产者发送消息还是消费者接受消息,都首先需要声明一个MessageQueue。
这就存在一个问题,是生产者声明还是消费者声明呢?要解决这个问题,首先需要明确:
a)消费者是无法订阅或者获取不存在的MessageQueue中的信息。
b)消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃。

在明白了上述两点以后,就容易理解:
如果是消费者去声明Queue,就有可能会出现在声明Queue之前,生产者已发送的消息被丢弃的隐患;
如果应用能够通过消息重发的机制允许消息丢失,则使用此方案没有任何问题。
但是如果不能接受该方案,这就需要无论是生产者还是消费者,在发送或者接受消息前,都需要去尝试建立消息队列。
这里有一点需要明确,如果客户端尝试建立一个已经存在的消息队列,Rabbit MQ不会做任何事情,并返回客户端建立成功的。

如果一个消费者在一个信道中正在监听某一个队列的消息,Rabbit MQ是不允许该消费者在同一个channel去声明其他队列的。

Rabbit MQ中,可以通过queue.declare命令声明一个队列,可以设置该队列以下属性:
a)Exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。
  这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一个连接创建的排他队列的。
  其二,“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。
  其三,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。
  这种队列适用于只限于一个客户端发送读取消息的应用场景。
b)Auto-delete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
c)Durable:持久化,这个会在后面作为专门一个章节讨论。
d)其他选项,例如如果用户仅仅想查询某一个队列是否已存在,如果不存在,不想建立该队列,
仍然可以调用queue.declare,只不过需要将参数passive设为true,传给queue.declare,
如果该队列已存在,则会返回true;如果不存在,则会返回Error,但是不会创建新的队列。

2. 生产者发送消息
3. 消费者订阅消息    

4. 持久化

   RabbitMQ默认是不持久队列、Exchange、Binding以及队列中的消息的,这意味着一旦消息服务器重启,
   所有已声明的队列,Exchange,Binding以及队列中的消息都会丢失。
   通过设置Exchange和MessageQueue的durable属性为true,可以使得队列和Exchange持久化,
   但是这还不能使得队列中的消息持久化,还需要生产者在发送消息的时候,
   将basicPublish()方法的BasicProperties props参数中deliveryMode设置为2【使用basicPublish(MessageProperties.PERSISTENT_BASIC)】,
   只有这3个全部设置完成后,才能保证服务器重启不会对现有的队列造成影响。
   这里需要注意的是,只有durable为true的Exchange和durable为ture的Queues才能绑定,
   否则在绑定时,RabbitMQ都会抛错的。持久化会对RabbitMQ的性能造成比较大的影响,可能会下降10倍不止。

5. 事务
   对事务的支持是AMQP协议的一个重要特性。假设当生产者将一个持久化消息发送给服务器时,
   因为consume命令本身没有任何Response返回,所以即使服务器崩溃,没有持久化该消息,生产者也无法获知该消息已经丢失。
   如果此时使用事务,即通过txSelect()开启一个事务,然后发送消息给服务器,然后通过txCommit()提交该事务,即可以保证,
   如果txCommit()提交了,则该消息一定会持久化,如果txCommit()还未提交即服务器崩溃,则该消息不会服务器就收。
   当然RabbitMQ也提供了txRollback()命令用于回滚某一个事务。

6. Confirm机制(Publisher Acknowledgements)
   使用事务固然可以保证只有提交的事务,才会被服务器执行。但是这样同时也将客户端与消息服务器同步起来,
   这背离了消息队列解耦的本质。RabbitMQ提供了一个更加轻量级的机制来保证生产者可以感知服务器消息是否
   已被路由到正确的队列中——Confirm。如果设置channel为confirm状态,则通过该channel发送的消息都会被分配一个唯一的ID,
   然后一旦该消息被正确的路由到匹配的队列中后,服务器会返回给生产者一个Confirm,该Confirm包含该消息的ID,
   这样生产者就会知道该消息已被正确分发。对于持久化消息,只有该消息被持久化后,才会返回Confirm。
   Confirm机制的最大优点在于异步,生产者在发送消息以后,即可继续执行其他任务。而服务器返回Confirm后,
   会触发生产者的回调函数,生产者在回调函数中处理Confirm信息。如果消息服务器发生异常,导致该消息丢失,
   会返回给生产者一个nack,表示消息已经丢失,这样生产者就可以通过重发消息,保证消息不丢失。
   Confirm机制在性能上要比事务优越很多。但是Confirm机制,无法进行回滚,就是一旦服务器崩溃,生产者无法得到Confirm信息,
   生产者其实本身也不知道该消息吃否已经被持久化,只有继续重发来保证消息不丢失,但是如果原先已经持久化的消息,
   并不会被回滚,这样队列中就会存在两条相同的消息,系统需要支持去重。

Java客户端Confirm Mode
[java]  view plain  copy
  1. import java.io.IOException;  
  2. import java.util.concurrent.CountDownLatch;  
  3.   
  4. import com.rabbitmq.client.AMQP.BasicProperties;  
  5. import com.rabbitmq.client.Channel;  
  6. import com.rabbitmq.client.Connection;  
  7. import com.rabbitmq.client.ConnectionFactory;  
  8. import com.rabbitmq.client.DefaultConsumer;  
  9. import com.rabbitmq.client.Envelope;  
  10. import com.rabbitmq.client.MessageProperties;  
  11. import com.rabbitmq.client.QueueingConsumer;  
  12.   
  13. public class ConfirmDontLoseMessages {  
  14.     private final static int msgCount = 10000// 默认发送10000条消息  
  15.     private final static String QUEUE_NAME = "confirm-test2"// 队列名称  
  16.     private static ConnectionFactory connectionFactory;  
  17.   
  18.     public static void main(String[] args)  
  19.             throws IOException, InterruptedException {  
  20.         connectionFactory = new ConnectionFactory();  
  21.         connectionFactory.setHost("10.59.79.37");  
  22.         connectionFactory.setUsername("admin");  
  23.         connectionFactory.setPassword("admin");  
  24.         connectionFactory.setPort(5672);  
  25.   
  26.         Thread consumerThread = new Thread(new Consumer());  
  27.         Thread publisherThread = new Thread(new Publisher());  
  28.         // 开启消费者线程  
  29.   
  30.         consumerThread.start();  
  31.         // 开启生产者线程  
  32.         publisherThread.start();  
  33.     }  
  34.   
  35.     // 消息发布者  
  36.     static class Publisher implements Runnable {  
  37.         public void run() {  
  38.             try {  
  39.                 long startTime = System.currentTimeMillis();  
  40.                 // 连接并非线程安全的,所以要每线程一个连接  
  41.                 Connection conn = connectionFactory.newConnection();  
  42.                 Channel ch = conn.createChannel();  
  43.                 // 创建一个持久化的,非独享,不自动删除的队列  
  44.                 ch.queueDeclare(QUEUE_NAME, truefalsefalsenull);  
  45.                 // 开启通道上的 publisher acknowledgements  
  46.                 ch.confirmSelect();  
  47.   
  48.                 // 发送持久化消息,消息内容为helloWorld  
  49.                 for (long i = 0; i < msgCount; ++i) {  
  50.                     ch.basicPublish("", QUEUE_NAME,  
  51.                             MessageProperties.PERSISTENT_BASIC,  
  52.                             "helloWorld".getBytes());  
  53.                 }  
  54.   
  55.                 // 等待所有消息都被ack或者nack,如果某个消息被nack,则抛出IOException  
  56.                 ch.waitForConfirmsOrDie();  
  57.                 long endTime = System.currentTimeMillis();  
  58.                 System.out.printf("Test took %.3fs\n", (float) (endTime - startTime) / 1000);  
  59.                           
  60.                 // 删除队列,不论是否在使用中  
  61.                 ch.queueDelete(QUEUE_NAME);  
  62.                 ch.close();  
  63.                 conn.close();  
  64.             } catch (Throwable e) {  
  65.                 System.out.println("damn fuck! error detected :(");  
  66.                 System.out.print(e);  
  67.             }  
  68.         }  
  69.     }  
  70.   
  71.     // 消息消费者  
  72.     static class Consumer implements Runnable {  
  73.         public void run() {  
  74.             try {  
  75.                 // 每线程一个连接  
  76.                 Connection conn = connectionFactory.newConnection();  
  77.                 Channel ch = conn.createChannel();  
  78.                 ch.queueDeclare(QUEUE_NAME, truefalsefalsenull);  
  79.   
  80.                 // 创建消息消费者  
  81.                 QueueingConsumer qc = new QueueingConsumer(ch);  
  82.                 ch.basicConsume(QUEUE_NAME, true, qc);  
  83.                 for (int i = 0; i < msgCount; ++i) {  
  84.                     qc.nextDelivery();  
  85.                 }  
  86.                 // 关闭通道和连接  
  87.                 System.out.println("consumer done");  
  88.                 ch.close();  
  89.                 conn.close();  
  90.             } catch (Throwable e) {  
  91.                 System.out.println("damn fuck! some error happened!");  
  92.                 System.out.print(e);  
  93.             }  
  94.         }  
  95.     }  
  96.   
  97. }  
  98.   
  99. /** Wait until all messages published since the last call have 
  100.   * been either ack'd or nack'd by the broker.  If any of the 
  101.   * messages were nack'd, waitForConfirmsOrDie will throw an 
  102.   * IOException.  When called on a non-Confirm channel, it will 
  103.   * throw an IllegalStateException. 
  104.   * @throws java.lang.IllegalStateException 
  105.   */  
  106. void waitForConfirmsOrDie() throws IOException, InterruptedException;  

原文:
http://www.360doc.com/content/14/0608/22/834950_384932402.shtml    RabbitMQ持久化、事务、消息确认
http://www.tuicool.com/articles/bMJ7r2    RabbitMQ  Publisher的消息确认机制
相关文章
相关标签/搜索