Loading...
使用Spring AMQP进行RabbitMQ消息的发送和接收

RabbitMQ笔记 2015/12/18 Spring Framework , RabbitMQ

Spring Framework项目集成Spring AMQP

#安装好RabbitMQ 3.5.6

安装步骤请参考这篇文章:http://imethan.cn/blog/article/78


#通过eclipse创建简单的maven项目

#编辑pom.xml,指定编译器的版本

<build>
  <plugins>
    <!-- Fix the Java source/target level and the source file encoding used by the compiler. -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.1</version>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
        <encoding>UTF-8</encoding>
      </configuration>
    </plugin>
  </plugins>
</build>

#编辑pom.xml,添加maven依赖

<!-- Versions of the RabbitMQ-related artifacts, referenced by the dependencies below. -->
<properties>
  <spring-rabbit-version>1.4.3.RELEASE</spring-rabbit-version>
  <rabbitmq-version>3.5.0</rabbitmq-version>
</properties>

<dependencies>
  <!-- rabbitmq begin -->
  <dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>${spring-rabbit-version}</version>
  </dependency>
  <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>${rabbitmq-version}</version>
  </dependency>
  <!-- rabbitmq end -->

  <!-- Test-only dependencies: Spring test context support and JUnit 4. -->
  <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-test</artifactId>
    <version>4.1.7.RELEASE</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
  </dependency>

  <!-- Logging API -->
  <dependency>
    <groupId>commons-logging</groupId>
    <artifactId>commons-logging</artifactId>
    <version>1.2</version>
  </dependency>
</dependencies>


#添加rabbitmq.properties文件

# RabbitMQ connection information, resolved as ${...} placeholders in applicationContext-rabbitmq.xml

# Broker host name or IP address
mq.host=192.168.42.133

# Broker port. NOTE(review): RabbitMQ's default AMQP port is 5672 - confirm 5673 matches the broker's listener
mq.port=5673

# User name used to open the connection
mq.username=spring

# Password for mq.username
mq.password=spring.com

# Virtual host the connection is opened against
mq.virtualhost=spring

# Name of the default queue; the Spring context also uses it as the binding key and the template's default routing key
mq.defaultqueue=defaultqueue


#添加Spring Rabbitmq配置文件“applicationContext-rabbitmq.xml”

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
         http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd"
       default-lazy-init="true">

  <description>Spring AMQP</description>

  <context:annotation-config />
  <context:component-scan base-package="cn.imethan" />

  <!-- Supplies the ${mq.*} values used below. -->
  <context:property-placeholder location="classpath:/rabbitmq/rabbitmq.properties" />

  <!-- Publisher confirm callback -->
  <bean id="messageConfirmCallback" class="cn.imethan.rabbitmq.callback.MessageConfirmCallback"/>

  <!-- Callback for returned (unroutable) messages -->
  <bean id="messageReturnCallback" class="cn.imethan.rabbitmq.callback.MessageReturnCallback"/>

  <!-- Connection factory settings -->
  <bean id="rabbitConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <property name="username" value="${mq.username}"/>
    <property name="password" value="${mq.password}"/>
    <property name="virtualHost" value="${mq.virtualhost}"/>
    <property name="host" value="${mq.host}"/>
    <property name="port" value="${mq.port}"/>
    <property name="channelCacheSize" value="25"/>
    <!-- Both flags must be enabled for the confirm/return callbacks on the template to be invoked. -->
    <property name="publisherConfirms" value="true"/>
    <property name="publisherReturns" value="true"/>
  </bean>

  <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <property name="connectionFactory" ref="rabbitConnectionFactory"/>
    <!-- Register the confirm callback -->
    <property name="confirmCallback" ref="messageConfirmCallback"/>
    <!-- When mandatory is true, a message that cannot be routed is handed to the returnCallback -->
    <property name="mandatory" value="true"/>
    <!-- Register the return callback -->
    <property name="returnCallback" ref="messageReturnCallback"/>
    <!-- Default exchange and routing key used when a send call does not specify them -->
    <property name="exchange" value="remoting.exchange"/>
    <property name="routingKey" value="${mq.defaultqueue}"/>
  </bean>

  <!-- Declare the queue -->
  <rabbit:admin id="admin" connection-factory="rabbitConnectionFactory" />
  <rabbit:queue name="${mq.defaultqueue}" declared-by="admin" />

  <!-- Declare the exchange and bind the queue to it, using the queue name as the binding key -->
  <rabbit:direct-exchange name="remoting.exchange" declared-by="admin">
    <rabbit:bindings>
      <rabbit:binding queue="${mq.defaultqueue}" key="${mq.defaultqueue}" />
    </rabbit:bindings>
  </rabbit:direct-exchange>

  <!-- Listener container -->
  <rabbit:listener-container connection-factory="rabbitConnectionFactory">
    <!--
      AwareRecviveListener implements ChannelAwareMessageListener, so the container calls its
      onMessage(Message, Channel) directly. No "method" attribute is given because the class
      declares no "listen" method. To use the POJO-style SimpleRecviveListener instead, reference
      it here with method="listen".
    -->
    <rabbit:listener ref="awareRecviveListener" queue-names="${mq.defaultqueue}" />
  </rabbit:listener-container>

  <!-- Receiving listeners -->
  <bean id="awareRecviveListener" class="cn.imethan.rabbitmq.listener.AwareRecviveListener" />
  <bean id="simpleRecviveListener" class="cn.imethan.rabbitmq.listener.SimpleRecviveListener" />

</beans>



#创建几个重要的类

MessageConfirmCallback:

/**
 * Publisher-confirm callback that reports every confirm result on standard output.
 */
public class MessageConfirmCallback implements ConfirmCallback {

    /**
     * Prints the received arguments, then one line stating whether the send succeeded.
     *
     * @param correlationData correlation data passed to the callback
     * @param ack             {@code true} selects the success line, {@code false} the failure line
     * @param cause           cause passed to the callback
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        final java.io.PrintStream out = System.out;
        out.println("[RabbitMQ] MessageConfirmCallback correlationData:" + correlationData);
        out.println("[RabbitMQ] MessageConfirmCallback             ack:" + ack);
        out.println("[RabbitMQ] MessageConfirmCallback           cause:" + cause);

        final String outcome = ack
                ? "[RabbitMQ] send message to rabbitmq success !"
                : "[RabbitMQ] send message to rabbitmq fail !";
        out.println(outcome);
    }
}

MessageReturnCallback:

/**
 * Return callback that prints the details of a returned message on standard output.
 */
public class MessageReturnCallback implements ReturnCallback {

    /**
     * Prints each argument of the return notification on its own line.
     *
     * @param message    the returned message
     * @param replyCode  reply code passed to the callback
     * @param replyText  reply text passed to the callback
     * @param exchange   exchange passed to the callback
     * @param routingKey routing key passed to the callback
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        final java.io.PrintStream out = System.out;
        out.println("[RabbitMQ] MessageReturnCallback message:" + message);
        out.println("[RabbitMQ] MessageReturnCallback replyCode:" + replyCode);
        out.println("[RabbitMQ] MessageReturnCallback replyText:" + replyText);
        out.println("[RabbitMQ] MessageReturnCallback exchange:" + exchange);
        out.println("[RabbitMQ] MessageReturnCallback routingKey:" + routingKey);
    }
}


AwareRecviveListener:

@Service

public class AwareRecviveListener implements ChannelAwareMessageListener  {

@Autowired

private RabbitTemplate rabbitTemplate;

@Override

public void onMessage(Message message, Channel channel) throws Exception {

MessageProperties messageProperties = message.getMessageProperties() ;

String encoding = messageProperties.getContentEncoding();//编码

String content = new String(message.getBody(),encoding);//内容

String appId = messageProperties.getAppId();//发送时设置的发送方标示appid

System.out.println("[RabbitMQ] recv message -appId:"+appId +" -message:"+content);

String vhost = rabbitTemplate.getConnectionFactory().getVirtualHost();//当前虚拟主机名称

String consumerQueue = messageProperties.getConsumerQueue();//当前接收的队列名称

//业务处理

this.handle(content);

}


/**

* 业务处理

* @param message 消息

*

* @author Ethan Wong

* @create-time 2015年5月9日 下午5:15:03

*/

private void handle(String message) {

}

}


SimpleRecviveListener:

/**
 * POJO listener exposing {@code listen} overloads for text and raw-byte payloads.
 */
@Service
public class SimpleRecviveListener {

    /**
     * Receives a text payload.
     *
     * @param message the message text
     * @author Ethan Wong
     */
    public void listen(String message) {
        handle(message);
    }

    /**
     * Receives a raw payload and decodes it as UTF-8 before processing.
     *
     * @param message the message bytes
     * @throws UnsupportedEncodingException declared by the charset-name based String constructor
     * @author Ethan Wong
     */
    public void listen(byte[] message) throws UnsupportedEncodingException {
        final String decoded = new String(message, "UTF-8");
        handle(decoded);
    }

    /**
     * Business processing: prints the received text to standard output.
     *
     * @param message the message text
     * @author Ethan Wong
     */
    private void handle(String message) {
        System.out.println("[RabbitMQ] recv message:" + message);
    }
}


RabbitMqSender:

/**
 * Sends plain-text messages through the shared {@link RabbitTemplate}.
 */
@Service
public class RabbitMqSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends {@code content} to the template's configured exchange, using the queue name as the
     * routing key. Send failures are reported on standard error and not propagated.
     *
     * @param queue   target queue name; also used as the routing key
     * @param content message text
     * @author Ethan
     */
    public void sendMessage(String queue, String content) {
        try {
            // Pass the routing key per call. Mutating the template's routing key/queue would
            // change shared state and race with other threads sending at the same time.
            rabbitTemplate.send(queue, this.generateMessage(content, queue));
        } catch (AmqpException e) {
            // Best effort: keep the caller running, but say which send failed.
            System.err.println("[RabbitMQ] send message to queue '" + queue + "' fail: " + e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * Builds a UTF-8 text/plain message whose appId is {@code <virtual host>/<queue name>}.
     *
     * @param content   message text
     * @param queueName queue name, used as the second part of the appId
     * @return the message ready to be sent
     * @author Ethan
     */
    private Message generateMessage(String content, String queueName) {
        // The virtual host plus the queue name identifies the sending side.
        String appId = rabbitTemplate.getConnectionFactory().getVirtualHost() + "/" + queueName;

        MessageProperties props = MessagePropertiesBuilder.newInstance()
                .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                .setContentEncoding("UTF-8")
                .setTimestamp(new Date())
                .setAppId(appId)
                .build();

        // Encode with the same charset that is declared in the content encoding above;
        // the no-arg getBytes() would use the platform default charset instead.
        return MessageBuilder.withBody(content.getBytes(java.nio.charset.StandardCharsets.UTF_8))
                .andProperties(props)
                .build();
    }
}


#到此处,所有的准备工作都做好了,开始测试,测试类如下:

RabbitMqTest:

@RunWith(SpringJUnit4ClassRunner.class)

@ContextConfiguration(locations={"classpath:rabbitmq/applicationContext-rabbitmq.xml"}) 

public class RabbitMqTest{  

@Autowired

private RabbitMqSender rabbitMqSender;

@BeforeClass

public static void initRabbitMq(){

}

/**

* 测试向MQ发送消息

*

* @author Ethan Wong

* @create-time 2015年5月6日 下午2:47:34

*/

@Test

public void testSend(){

rabbitMqSender.sendMessage("defaultqueue", "向RabbitMQ发送消息!!!");

}

@AfterClass

public static void testRecv(){

}

}


源代码将在与其他部分一并整理完成后提交到GitHub。


#参考文档

http://projects.spring.io/spring-amqp/

http://docs.spring.io/spring-amqp/docs/1.5.1.RELEASE/reference/html/




Comments