博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spring 集成RabbitMq
阅读量:4948 次
发布时间:2019-06-11

本文共 7560 字,大约阅读时间需要 25 分钟。

Spring 集成RabbitMq 

一、基本配置

1、pom添加以下jar

     
com.fasterxml.jackson.core
jackson-databind
2.7.5
org.springframework.amqp
spring-rabbit
2.1.7.RELEASE

2、spring配置文件springContext.xml添加以下配置

 

3、rabbitmq_producer.xml生产者配置如下(其中配置了exchange的三种类型:fanout,direct,topic)

 

 

3、rabbitmq_consumer.xml消费者配置如下:(其中定义了三种exchange类型对应队列的消费者 ,)

二、编写测试代码(在此只进行Direct类型 交换机测试代码的表写,其他类型仿照此示例即可)

1、定义消息生产者(DirectProducer)

package com.pinghengxing.direct;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;/**  * @author ww */public class DirectProducer {            private static ApplicationContext ac;    public static void sendMessage(String exchange,String routingKey,Object message){        ac = new ClassPathXmlApplicationContext("classpath:com/config/springContext.xml");        RabbitTemplate rt = ac.getBean(RabbitTemplate.class);                for(int i=0;i<10;i++){            rt.convertAndSend(exchange, routingKey, message+""+i);        }    }         public static void main(String[] args) {        DirectProducer.sendMessage("myDirectExchange","direct","路由模式");    }        }

2、定义消息消费者(DirectReceiver1,DirectReceiver1  )-多个消费者

消费者1

package com.pinghengxing.direct;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel;@Component("directReceiver")public class DirectReceiver implements ChannelAwareMessageListener{    @Override    public void onMessage(Message message, Channel channel) throws Exception {        System.out.println("************************direct111********************************");        System.out.println("路由模式direct111 接收信息:"+new String(message.getBody()));        System.out.println("********************************************************");        //设置手工应答//        if(true){//            throw new Exception();//        }        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);                        }    }

消费者2

package com.pinghengxing.direct;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel;@Component("directReceiver2")public class DirectReceiver2 implements ChannelAwareMessageListener{    @Override    public void onMessage(Message message, Channel channel) throws Exception {        System.out.println("************************direct222********************************");        System.out.println("路由模式direct222 接收信息:"+new String(message.getBody()));        System.out.println("********************************************************");        //设置手工应答//        if(true){//            throw new Exception();//        }        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);                        }    }

 

三、测试

1、进行测试,结果如下:(两个消费者都可以从队列中取到数据,且数据不重复)

 

四、confirm-callback监听(用于监听exchange是否接收成功)

1、在配置工厂连接的时候,设置publisher-confirms="true"

2、在定义rabbitmq模板时,指定confirm-callback的实现类

3、创建实现类ConfirmCallback,实现RabbitTemplate.ConfirmCallback接口

 

package com.pinghengxing.callback;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.stereotype.Component;/** * confirm-callback监听(用于监听exchange是否接收成功) * @author ww * */@Component("confirmCallback")public class ConfirmCallBack implements  RabbitTemplate.ConfirmCallback{    /**    * CorrelationData 是在发送消息时传入回调方法的参数,可以用于区分消息对象。 CorrelationData对象中只有一个属性 String id。    * 通过这个参数,我们可以区分当前是发送哪一条消息时的回调,并通过ack参数来进行失败重发功能    * @param correlationData 回调的相关数据.    * @param ack true for ack, false for nack    * @param cause 专门给NACK准备的一个可选的原因,其他情况为null。    */    public void confirm(CorrelationData correlationData, boolean ack,String cause) {        System.out.println("********************************************************");        System.out.println("exChange确认" + ack + "   " + cause);        System.out.println("********************************************************");    }    }

4、测试

五、returnCallback监听(basicpublish推送消息到queue失败时回调)

1、在配置工厂连接的时候,设置publisher-returns="true"

<rabbit:connection-factory id="rabbitConnectionFactory"

host="140.143.xx.xx" username="ww" password="ww" port="5672"
virtual-host="ww" channel-cache-size="25" cache-mode="CHANNEL"
publisher-confirms="true" publisher-returns="true" connection-timeout="200" />

2、在定义rabbitmq模板时,指定return-callback的实现类,并且设置mandatory="true"

3、创建实现类ReturnCallBack,实现RabbitTemplate.ReturnCallback接口

package com.pinghengxing.callback;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.stereotype.Component;/** * 推送消息到queue失败时回调 * @author ww * */@Component("returnCallback")public class ReturnCallBack implements RabbitTemplate.ReturnCallback {                @Override        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {            System.out.println("********************************************************");            System.out.println("失败确认:"+message+" | "+replyCode+" | "+replyText+" | "+exchange+" | "+routingKey);            System.out.println("********************************************************");        }}

 4、测试(更改routing_key的值为direct123,由于找不到对应的队列,报以下错误)

六、json转换(可以将map等自动转换成json格式)

1、pom.xml添加以下maven依赖

      
com.fasterxml.jackson.core
jackson-databind
2.7.5

 2、定义消息转换器,转成json格式

 

3、在定义rabbitmq模板时,指定转换器message-converter="jsonMessageConverter"

 

 4、测试,创建map,进行生产,消费者接收到的信息如下:为json格式

 

友情链接:

完整的项目配置下载地址如下:可下载参考

https://files.cnblogs.com/files/pinghengxing/spring_rabbitmq_test.zip

 

转载于:https://www.cnblogs.com/pinghengxing/p/11210295.html

你可能感兴趣的文章
ubuntu下访问windows共享文件夹
查看>>
Beta 答辩总结
查看>>
Code Page Identifiers zz
查看>>
JAVA设计模式初探之装饰者模式
查看>>
c语言中的隐式函数声明(转)
查看>>
洛谷P1402 酒店之王(二分图)
查看>>
微信jssdk实现分享到微信
查看>>
substr函数的基础使用
查看>>
MyEclipse XML & XML架构教程:XML Schema (XSD)编辑器
查看>>
WebLogic中"域"的概念
查看>>
zip文件解压或压缩
查看>>
PCB 线路板人生
查看>>
shell 通过EOF在脚本中输入需要的用户名或密码
查看>>
添加 Android Framework 到 Adt
查看>>
PencilWang博客目录
查看>>
Sentinel spring-cloud-gateway adapter(1.6)异常错误之@EnableCircuitBreaker
查看>>
position
查看>>
struct模块
查看>>
网络流(最大流) CodeForces 546E:Soldier and Traveling
查看>>
[转]dpkg 和 rpm 的常用方法比较
查看>>