一、Work Queue消息模型

该消息模型有一个生产者和多个消费者,多个消费者可以同时消费消息
Work Queue
这个消息模型的特点是RabbitMq会将生产者生产的消息一次性平均分配给消费者,也就是轮询

生产者

public class Provider {
    @Test
    public void test() throws IOException, InterruptedException {
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        for(int i=0;i<10;i++){
            channel.basicPublish("","work", MessageProperties.PERSISTENT_TEXT_PLAIN,(i+1+": hello workqueues").getBytes());
        }
        RabbitMqUtil.close(channel,connection);
    }
}

消费者一

public class Customer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        channel.basicConsume("work",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Customer1消费消息:"+new String(body));
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

消费者二

public class Customer2 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        channel.basicConsume("work",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Customer2消费消息:"+new String(body));
            }
        });
    }
}

进行消费

先对两个消费者进行开启,进入异步监听模式,然后让生产者生产10条消息,将消费者一线程休眠2秒,模拟该业务慢的情况

customer1:
消费者一

customer2:
消费者二
结果是无论是否当某个消费者处理缓慢时,还是一样地平均消费

二、消费机制

前面说到Work Queue是将生产者生产的消息一次性平均分配给消费者,当分配完消息后,它的自动确认机制会一次性全部确认,在官方文档中有这么一段解释:

Message acknowledgment
Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code once RabbitMQ delivers message to the consumer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.

当生产者生产了10个消息,2个消费者平均分到了5个消息,当消费者一消费完3个消息时不明原因宕机了,剩余的2个消息则会丢失,而我们希望由其他的消费者来对这些剩余的消息消费,要是在业务中出现消息丢失可能会造成很严重的后果,所以官方不推荐使用自动消息确认,下面来测试手动消息确认。

生产者

public class Provider {
    @Test
    public void test() throws IOException, InterruptedException {
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        for(int i=0;i<10;i++){
            channel.basicPublish("","work", MessageProperties.PERSISTENT_TEXT_PLAIN,(i+1+": hello workqueues").getBytes());
        }
        RabbitMqUtil.close(channel,connection);
    }
}

消费者一

public class Customer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        //每次只确认一条消息
        channel.basicQos(1);
        channel.basicConsume("work",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Customer1消费消息:"+new String(body));
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

消费者一通过线程进行了2秒的休眠,模拟处理业务慢的情况。

消费者二

public class Customer2 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        channel.basicQos(1);
        channel.basicConsume("work",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Customer2消费消息:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

这里我们通过basicQos()设置了每次只确认一次消息,消息被消费完后通过basicAck()手动确认,第一个参数为消息的标识,用来标识信道中投递的消息,RabbitMQ 推送消息给消费者时,会附带一个 Delivery Tag,以便 消费者可以在消息确认时告诉RabbitMQ到底是哪条消息被确认了,第二个参数为是否多消息确认。当某个消费者宕机了,也不会丢失消息,剩余的则分担到其他的消费者身上,这样的设置可以防止消息的丢失,保证了数据的完整性。

消费者消费

customer1:
消费者一
customer2:
消费者二
体现了能者多劳,处理效率快的消费者可以处理较多的消息。

最后修改:2021 年 02 月 04 日 08 : 09 PM
如果觉得我的文章对你有用,请随意赞赏