HelloWorld消息模型

HelloWorld模型是RabbitMq七大消息模型中最简单也是最容易理解的消息模型,在RabbitMq中,生产者生产消息,消费者消费消息主要结构如下:
生产者消费者

RabbitMq中,所有程序都运行在RabbitMq Server中,这里的Virtaul Host是虚拟主机,类似于关系型数据库中的库概念,每个VirtualHost之间是相互隔离的,生产者生产消息和消费者消费消息首先是通过TCP虚拟信道传输,一个信道就是一个线程,一个TCP被多个线程共享,RabbitMqTCP端口号为5672,这里的exchange交换机就类似一张路由信息表,Queue则是队列。

为什么不直接使用connection连接而引入信道:
一个应用有多个线程需要从rabbitmq中消费,或是生产消息,那么必然会建立很多个connection ,也就是多个tcp连接,对操作系统而言,建立和销毁tcp连接是很昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现,rabbitmq采用类似nio的做法,连接tcp连接复用,不仅可以减少性能开销,同时也便于管理。每个线程都把持一个信道,所以信道复用了TCP连接。同时rabbitmq可以确保每个线程的私密性,就像拥有独立的连接一样。

Helloworld消息模型:
Helloworld消息模型

P是生产者,C是消费者。中间的红色区域则是消息队列Queue,它本质上就是一个无限大小的缓冲区。多个生产者可以发送消息到同一个队列中,多个消费者也可以从同一个队列中消费消息。此模型类似于点对点传输,生产者将消息放入队列中,消费者直接从队列中消费,不涉及到交换机。

实现Helloworld消息模型

1、导入amqp-client依赖

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.9.0</version>
        </dependency>

2、RabbitMq工具类(建立connection以及关闭通道和连接)

public class RabbitMqUtil {
    private static ConnectionFactory connectionFactory;

    static {
        connectionFactory = new ConnectionFactory();
        //连接Host主机
        connectionFactory.setHost("121.5.29.121");
        //设置端口号
        connectionFactory.setPort(5672);
        //设置连接的虚拟主机
        connectionFactory.setVirtualHost("/ems");
        //设置访问虚拟主机用户名和密码
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("123456");
    }

    /**
     * 获取RabbitMq工具类连接对象
     * @return
     */
    public static Connection getConnection(){
        try {
            return connectionFactory.newConnection();
        }catch (Exception e){
            e.printStackTrace();
        }
        return null;
    }

    public static void close(Channel channel,Connection connection){
        try {
            if(channel != null){
                channel.close();
            }

            if(connection != null){
                connection.close();
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

3、生产者生产消息

public class Provider {
    /**
     * 生产消息
     */
    @Test
    public void testSendMessage() throws IOException{
        Connection connection = RabbitMqUtil.getConnection();
        //获取连接通道
        Channel channel = connection.createChannel();

        /**
         * 通道绑定对应的消息队列
         * 参数一queue:队列名称,没有则自动创建
         * 参数二durable:队列是否持久化
         * 参数三exclusive:是否排外,是否只允许一个消费者访问该队列
         * 参数四autoDelete:是否在消费完后自动删除队列
         */
        channel.queueDeclare("hello",true,false,true,null);

        //发布消息
        /**
         * 参数一:交换机名称
         * 参数二:队列名称
         * 参数三:传递消息的额外设置    MessageProperties.PERSISTENT_TEXT_PLAIN:消息持久化
         * 参数四:消息的具体内容
         */
        channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes());
        RabbitMqUtil.close(channel,connection);
    }
}

在通道绑定对应队列时,如果不设置队列持久化(也就是为false)时,当执行systemctl restart rabbitmq-server rabbitmq服务重启时,队列和消息都会删除,如果设置消费完自动删除后,当消费者这个进程结束时,队列自动删除。

注意:生产者和消费者绑定的队列参数必须一样,否则会抛异常

4、消费者消费消息

public class Customer {
    //消费消息
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtil.getConnection();
        //获取通道
        Channel channel = connection.createChannel();
        //通道绑定队列
        channel.queueDeclare("hello",true,false,true,null);
        //消费消息
        channel.basicConsume("hello",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("new String(body)"+new String(body));
            }
        });
    }
}
最后修改:2021 年 02 月 03 日 05 : 04 PM
如果觉得我的文章对你有用,请随意赞赏