Loading...
My Blog 我的工作和学习笔记

使用Spring AMQP动态创建RabbitMQ连接进行消息的发送和接收

RabbitMQ笔记 2015/12/18 Spring Framework , RabbitMQ

使用Spring AMQP动态创建RabbitMQ连接进行消息的发送和接收


未完待续~~



使用Spring AMQP进行RabbitMQ消息的发送和接收

RabbitMQ笔记 2015/12/18 Spring Framework , RabbitMQ

Spring Framework项目集成Spring AMQP

#安装好RabbitMQ 3.5.6

安装的介绍在这篇文章:http://imethan.cn/blog/article/78


#通过eclipse创建简单的maven项目

#编辑pom.xml,指定编译器的版本

<build>

<plugins>

<plugin>

<groupId>org.apache.maven.plugins</groupId>

<artifactId>maven-compiler-plugin</artifactId>

<version>3.1</version>

<configuration>

<source>1.8</source>

<target>1.8</target>

<encoding>UTF-8</encoding>

</configuration>

</plugin>

</plugins>

</build>

#编辑pom.xml,添加maven依赖

<properties>

<spring-rabbit-version>1.4.3.RELEASE</spring-rabbit-version>

<rabbitmq-version>3.5.0</rabbitmq-version>

</properties>

<dependencies>

<!-- rabbitmq begin -->

<dependency>

<groupId>org.springframework.amqp</groupId>

<artifactId>spring-rabbit</artifactId>

<version>${spring-rabbit-version}</version>

</dependency>

<dependency>

<groupId>com.rabbitmq</groupId>

<artifactId>amqp-client</artifactId>

<version>${rabbitmq-version}</version>

</dependency>

<!-- rabbitmq end -->

<dependency>

<groupId>org.springframework</groupId>

<artifactId>spring-test</artifactId>

<version>4.1.7.RELEASE</version>

<scope>test</scope>

</dependency>

<dependency>

<groupId>junit</groupId>

<artifactId>junit</artifactId>

<version>4.12</version>

<scope>test</scope>

</dependency>

<dependency>

<groupId>commons-logging</groupId>

<artifactId>commons-logging</artifactId>

<version>1.2</version>

</dependency>

</dependencies>


#添加rabbitmq.properties文件

#rabbitmq connection information

mq.host=192.168.42.133

mq.port=5673

mq.username=spring

mq.password=spring.com

mq.virtualhost=spring

mq.defaultqueue=defaultqueue


#添加Spring Rabbitmq配置文件“applicationContext-rabbitmq.xml”

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"

xmlns:p="http://www.springframework.org/schema/p" 

xmlns:rabbit="http://www.springframework.org/schema/rabbit"

xsi:schemaLocation="

http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd

http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd

http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd" 

default-lazy-init="true">

<description>Spring AMQP</description>

<context:annotation-config />

<context:component-scan base-package="cn.imethan" />

<context:property-placeholder location="classpath:/rabbitmq/rabbitmq.properties" />

<!-- 发送确认监听 -->

<bean id="messageConfirmCallback" class="cn.imethan.rabbitmq.callback.MessageConfirmCallback"/>

<!-- 失败回调监听 -->

<bean id="messageReturnCallback" class="cn.imethan.rabbitmq.callback.MessageReturnCallback"/>


<!-- 连接工厂设置 -->

<bean id="rabbitConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">

  <property name="username" value="${mq.username}"/>

  <property name="password" value="${mq.password}"/>

  <property name="virtualHost" value="${mq.virtualhost}"/>

   <property name="host" value="${mq.host}"/>

   <property name="port" value="${mq.port}"/>

   <property name="channelCacheSize" value="25"/>    

   <property name="publisherConfirms" value="true"/>

   <property name="publisherReturns" value="true"/>

</bean>

<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">

<property name="connectionFactory" ref="rabbitConnectionFactory"/>

<!-- 设置确认监听 -->

<property name="confirmCallback" ref="messageConfirmCallback"/>

<!-- 如果mandatory设置为true,但是不能被路由,则由returnCallback监听获取消息 -->

<property name="mandatory" value="true"/>

<!-- 设置失败监听 -->

<property name="returnCallback" ref="messageReturnCallback"/>

   <property name="exchange" value="remoting.exchange"/>    

   <property name="routingKey" value="${mq.defaultqueue}"/>

</bean>

<!-- 队列创建,并且绑定 -->

<rabbit:admin id="admin" connection-factory="rabbitConnectionFactory" />

<rabbit:queue name="${mq.defaultqueue}" declared-by="admin" />

<!-- 创建交换机并且绑定 -->

<rabbit:direct-exchange name="remoting.exchange" declared-by="admin">

<rabbit:bindings>

<rabbit:binding queue="${mq.defaultqueue}" key="${mq.defaultqueue}" />

</rabbit:bindings>

</rabbit:direct-exchange>

<!-- 监听设置 -->

<rabbit:listener-container connection-factory="rabbitConnectionFactory">

<rabbit:listener ref="awareRecviveListener" method="listen" queue-names="${mq.defaultqueue}"  />

</rabbit:listener-container>

<!-- 接收监听器 -->

<bean id="awareRecviveListener" class="cn.imethan.rabbitmq.listener.AwareRecviveListener" />

<bean id="simpleRecviveListener" class="cn.imethan.rabbitmq.listener.SimpleRecviveListener" />

</beans>



#创建几个重要的类

MessageConfirmCallback:

public class MessageConfirmCallback implements ConfirmCallback {

@Override

public void confirm(CorrelationData correlationData, boolean ack,String cause) {

System.out.println("[RabbitMQ] MessageConfirmCallback correlationData:"+correlationData);

System.out.println("[RabbitMQ] MessageConfirmCallback             ack:"+ack);

System.out.println("[RabbitMQ] MessageConfirmCallback           cause:"+cause);

if(ack){

System.out.println("[RabbitMQ] send message to rabbitmq success !");

}else{

System.out.println("[RabbitMQ] send message to rabbitmq fail !");

}

}

}

MessageReturnCallback :

public class MessageReturnCallback implements ReturnCallback {

@Override

public void returnedMessage(Message message, int replyCode,

String replyText, String exchange, String routingKey) {

System.out.println("[RabbitMQ] MessageReturnCallback message:"+message);

System.out.println("[RabbitMQ] MessageReturnCallback replyCode:"+replyCode);

System.out.println("[RabbitMQ] MessageReturnCallback replyText:"+replyText);

System.out.println("[RabbitMQ] MessageReturnCallback exchange:"+exchange);

System.out.println("[RabbitMQ] MessageReturnCallback routingKey:"+routingKey);

}

}


AwareRecviveListener:

@Service

public class AwareRecviveListener implements ChannelAwareMessageListener  {

@Autowired

private RabbitTemplate rabbitTemplate;

@Override

public void onMessage(Message message, Channel channel) throws Exception {

MessageProperties messageProperties = message.getMessageProperties() ;

String encoding = messageProperties.getContentEncoding();//编码

String content = new String(message.getBody(),encoding);//内容

String appId = messageProperties.getAppId();//发送时设置的发送方标示appid

System.out.println("[RabbitMQ] recv message -appId:"+appId +" -message:"+content);

String vhost = rabbitTemplate.getConnectionFactory().getVirtualHost();//当前虚拟主机名称

String consumerQueue = messageProperties.getConsumerQueue();//当前接收的队列名称

//业务处理

this.handle(content);

}


/**

* 业务处理

* @param message 消息

*

* @author Ethan Wong

* @create-time 2015年5月9日 下午5:15:03

*/

private void handle(String message) {

}

}


SimpleRecviveListener:

@Service

public class SimpleRecviveListener {


/**

* 消息监听

* @param message 消息

*

* @author Ethan Wong

* @create-time 2015年5月9日 下午5:14:36

*/

public void listen(String message) {

this.handle(message);

}


/**

* 消息监听

* @param message 消息

* @throws UnsupportedEncodingException

*

* @author Ethan Wong

* @create-time 2015年5月9日 下午5:14:45

*/

public void listen(byte[] message) throws UnsupportedEncodingException {

this.handle(new String(message, "UTF-8"));

}


/**

* 业务处理

* @param message 消息

*

* @author Ethan Wong

* @create-time 2015年5月9日 下午5:15:03

*/

private void handle(String message) {

System.out.println("[RabbitMQ] recv message:"+message);


}

}


RabbitMqSender:

@Service

public class RabbitMqSender {

@Autowired

private RabbitTemplate rabbitTemplate;

/**

* 发送消息

* @param queue

* @param content

*

* @author Ethan

* @datetime 2015年11月9日 下午8:24:00

*/

public void sendMessage(String queue,String content){

try {

String routingKey = queue;

//发送消息

rabbitTemplate.setQueue(queue);

rabbitTemplate.setRoutingKey(routingKey);

rabbitTemplate.send(this.generateMessage(content,queue));

} catch (AmqpException e) {

e.printStackTrace();

}

}

/**

* 生成消息

* @param content 消息内容

* @param queueName 队列名称

* @return

*

* @author Ethan

* @create-time 2015年5月6日 下午4:20:50

*/

private Message generateMessage(String content,String queueName){

String appId = rabbitTemplate.getConnectionFactory().getVirtualHost()+"/"+queueName;//将虚拟主机和队列名称设置为appId

//发送消息

MessageProperties props = MessagePropertiesBuilder.newInstance()

.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)

.setContentEncoding("UTF-8")

.setTimestamp(new Date()).setHeader("", "")

.setAppId(appId)//设置发送中心系统识别

.build();

Message message = MessageBuilder.withBody(content.getBytes())

.andProperties(props)

.build();

return message;

}

}


#到此处,所有的准备工作都做好了,开始测试,测试类如下:

RabbitMqTest:

@RunWith(SpringJUnit4ClassRunner.class)

@ContextConfiguration(locations={"classpath:rabbitmq/applicationContext-rabbitmq.xml"}) 

public class RabbitMqTest{  

@Autowired

private RabbitMqSender rabbitMqSender;

@BeforeClass

public static void initRabbitMq(){

}

/**

* 测试向MQ发送消息

*

* @author Ethan Wong

* @create-time 2015年5月6日 下午2:47:34

*/

@Test

public void testSend(){

rabbitMqSender.sendMessage("defaultqueue", "向RabbitMQ发送消息!!!");

}

@AfterClass

public static void testRecv(){

}

}


源代码后续将和其他部分整理完整后,提交GitHub.


#参考文档

http://projects.spring.io/spring-amqp/

http://docs.spring.io/spring-amqp/docs/1.5.1.RELEASE/reference/html/



CentOS 6.7 配置文件的方式部署RabbitMQ消息中间件集群

RabbitMQ笔记 2015/12/18 RabbitMQ , CentOS

CentOS 6.7 配置文件的方式部署RabbitMQ消息中间件

#准备工作

分别在名为centos1、centos2的Centos 6.7 Linux上安装部署独立节点的RabbitMQ:http://imethan.cn/blog/article/78


#安装完成后停止RabbitMQ节点服务

[root@centos1 bin]# rabbitmqctl stop

[root@centos2 bin]# rabbitmqctl stop


#配置文件方式部署说明

默认安装部署的RabbitMQ并没有生成配置文件,采用的是运行时参数配置和策略(runtime parameters and policies);自定义配置时,需要手动的创建配置文:rabbitmq-env.conf(environment variables)、rabbitmq.config(a configuration file),创建的配置文件置于“/etc/rabbitmq”目录下。


#创建环境变量配置文件:rabbitmq-env.conf(这步配置可以略过)

将节点名称更改为“imethan@centos1”


vi /etc/rabbitmq/rabbitmq-env.conf

#example rabbitmq-env.conf file entries

#Rename the node

NODENAME=rabbit@centos1

#Config file location and new filename bunnies.config

#CONFIG_FILE=/etc/rabbitmq/testdir/bunnies


#创建配置文件:rabbitmq.config

分别在centos1、centos2上创建如下配置文件:

vi /etc/rabbitmq/rabbitmq.config


[

    {mnesia, [{dump_log_write_threshold, 1000}]},

    {rabbit, [{tcp_listeners, [5673]},{cluster_nodes,{['rabbit@centos1','rabbit@centos2'], disc}}]},

    {rabbitmq_management, [{listener, [{port, 45672}]}]}

].


#创建完成后启动centos1上的RabbitMQ Master节点和重置centos2上的RabbitMQ Slave节点

#在Master节点上执行

[root@centos1 bin]# rabbitmq-server -detached


#在Slave节点上执行

[root@centos2 bin]# rabbitmqctl stop_app

[root@centos2 bin]# rabbitmqctl reset

[root@centos2 bin]# rabbitmqctl start_app


#确认集群配置是否成功

[root@centos2 bin]# rabbitmqctl cluster_status

Cluster status of node rabbit@centos2 ...

[{nodes,[{disc,[rabbit@centos1,rabbit@centos2]}]},

 {running_nodes,[rabbit@centos1,rabbit@centos2]},

 {cluster_name,<<"rabbit@centos1.imethan.cn">>},

 {partitions,[]}]


NameFile descriptors (?)Socket descriptors (?)Erlang processesMemoryDisk spaceInfo+/-
rabbit@centos1
20
1024 available
1
829 available
176
1048576 available
38MB
395MB high watermark
13GB
48MB low watermark
Disc 1 Stats
rabbit@centos2
20
1024 available
1
829 available
175
1048576 available
38MB
395MB high watermark
12GB
48MB low watermark
Disc 1




#参考文档

http://www.rabbitmq.com/configure.html

http://jessesnet.com/development-notes/2015/rabbitmq-cluster/



CentOS 6.7 部署RabbitMQ消息中间件集群

RabbitMQ笔记 2015/12/18 RabbitMQ , CentOS

CentOS 6.7 部署RabbitMQ消息中间件集群

#准备工作

分别在两台名centos1和centos2的CentOS6.7安装RabbitMQ消息中间件节点:详情步骤请查看:http://imethan.cn/blog/article/78

将节点服务更新为“stop”状态


#集群说明

默认安装的RabbitMQ节点会自动生成“Erlang cookie”,该cookie用于识别是否是集群中的一个节点,集群中的全部节点的cookie必须保持一致,否则没办法成为集群中的一个节点。

默认安装生成的cookie位于“/var/lib/rabbitmq/.erlang.cookie”路径或者是“$HOME/.erlang.cookie”路径


#将centos1中的“/var/lib/rabbitmq/.erlang.cookie”复制到centos2的“/var/lib/rabbitmq/”目录下


#分别启动CentOS的RabbitMQ服务

cd /usr/lib/rabbitmq/bin

centos1$ rabbitmq-server -detached

centos2$ rabbitmq-server -detached


#启动过程中出现如下问题:

"Cookie file /var/lib/rabbitmq/.erlang.cookie must be accessible by owner only"

chmod 600 /var/lib/rabbitmq/.erlang.cookie


“Error when reading /var/lib/rabbitmq/.erlang.cookie: eacces”

vi /var/lib/rabbitmq/.erlang.cookie

将centos1中的cookie内容copy到centos2中的cookie


#两个节点独立启动后查看状态

centos1$ rabbitmqctl cluster_status

Cluster status of node rabbit@centos1 ...

[{nodes,[{disc,[rabbit@centos1]}]},

 {running_nodes,[rabbit@centos1]},

 {cluster_name,<<"rabbit@centos1.imethan.cn">>},

 {partitions,[]}]


centos2$ rabbitmqctl cluster_status

Cluster status of node rabbit@centos2 ...

[{nodes,[{disc,[rabbit@centos2]}]},

 {running_nodes,[rabbit@centos2]},

 {cluster_name,<<"rabbit@centos2.imethan.cn">>},

 {partitions,[]}]


#创建集群,将centos2节点加入centos1集群

centos2$ rabbitmqctl stop_app

Stopping node rabbit@centos2 ...

centos2$ rabbitmqctl join_cluster rabbit@centos1

Clustering node rabbit@centos2 with rabbit@centos1 ...

centos2$ rabbitmqctl start_app

Starting node rabbit@centos2 ...


#查看集群状态

centos1$ rabbitmqctl cluster_status

Cluster status of node rabbit@centos1 ...

[{nodes,[{disc,[rabbit@centos1,rabbit@centos2]}]},

 {running_nodes,[rabbit@centos2,rabbit@centos1]},

 {cluster_name,<<"rabbit@centos1.imethan.cn">>},

 {partitions,[]}]


centos2$ rabbitmqctl cluster_status

Cluster status of node rabbit@centos2 ...

[{nodes,[{disc,[rabbit@centos1,rabbit@centos2]}]},

 {running_nodes,[rabbit@centos1,rabbit@centos2]},

 {cluster_name,<<"rabbit@centos1.imethan.cn">>},

 {partitions,[]}]


登录WEB管理界面首页查看集群状态

NameFile descriptors (?)Socket descriptors (?)Erlang processesMemoryDisk spaceInfo+/-
rabbit@centos1
25
1024 available
1
829 available
181
1048576 available
39MB
395MB high watermark
13GB
48MB low watermark
Disc 1 Stats
rabbit@centos2
20
1024 available
1
829 available
175
1048576 available
38MB
395MB high watermark
12GB
48MB low watermark
Disc 1


至此,centos1和centos2的RabbitMQ集群部署完成。



#其他管理操作

#重启节点,本例子集群中只有两个节点,如果将其中的主节点停止,集群的另外节点将切换为主节点

centos1$ rabbitmqctl stop

Stopping and halting node rabbit@centos1 ...


centos2$ rabbitmqctl cluster_status

[{nodes,[{disc,[rabbit@centos1,rabbit@centos2]}]},

 {running_nodes,[rabbit@centos2]},

 {cluster_name,<<"rabbit@centos1.imethan.cn">>},

 {partitions,[]}]


NameFile descriptors (?)Socket descriptors (?)Erlang processesMemoryDisk spaceInfo+/-
rabbit@centos1
Node not running
rabbit@centos2
25
1024 available
1
829 available
181
1048576 available
38MB
395MB high watermark
12GB
48MB low watermark
Disc 1 Stats


centos1$ rabbitmq-server -detached

centos1$ rabbitmqctl cluster_status

Cluster status of node rabbit@centos1 ...

[{nodes,[{disc,[rabbit@centos1,rabbit@centos2]}]},

 {running_nodes,[rabbit@centos2,rabbit@centos1]},

 {cluster_name,<<"rabbit@centos1.imethan.cn">>},

 {partitions,[]}]


NameFile descriptors (?)Socket descriptors (?)Erlang processesMemoryDisk spaceInfo+/-
rabbit@centos1
25
1024 available
1
829 available
179
1048576 available
38MB
395MB high watermark
13GB
48MB low watermark
Disc 1
rabbit@centos2
25
1024 available
1
829 available
181
1048576 available
38MB
395MB high watermark
12GB
48MB low watermark
Disc 1 Stats


#从集群中移除一个节点

停止节点

centos2$ rabbitmqctl stop_app

Stopping node rabbit@centos2 ...

重置节点

centos2$ rabbitmqctl reset

Resetting node rabbit@rabbit2 ...done.

启动节点

centos2$ rabbitmqctl start_app

Starting node rabbit@rabbit2 ...done.


查看centos2节点是否已经移除

centos1$ rabbitmqctl cluster_status

Cluster status of node rabbit@centos1 ...

[{nodes,[{disc,[rabbit@centos1]}]},

 {running_nodes,[rabbit@centos1]},

 {cluster_name,<<"rabbit@centos1.imethan.cn">>},

 {partitions,[]}]


#默认安装的集群节点是“Disc”类型,可以设置节点类型为“Ram”

centos2$ rabbitmqctl stop_app

Stopping node rabbit@centos2 ...done.

centos2$ rabbitmqctl join_cluster --ram rabbit@centos1

Clustering node rabbit@centos2 with [rabbit@centos1] ...done.

centos2$ rabbitmqctl start_app

Starting node rabbit@centos2 ...done.


centos2$ rabbitmqctl cluster_status

Cluster status of node rabbit@centos2 ...

[{nodes,[{disc,[rabbit@centos1]},{ram,[rabbit@centos2]}]},

 {running_nodes,[rabbit@centos1,rabbit@centos2]},

 {cluster_name,<<"rabbit@centos1.imethan.cn">>},

 {partitions,[]}]


#修改centos2节点类型为“Ram”,集群中必须存在一个“Disc”类型的节点

centos2$ rabbitmqctl stop_app

Stopping node rabbit@centos2 ...

centos2$ rabbitmqctl change_cluster_node_type disc

Turning rabbit@rabbit2 into a ram node ...

centos2$ rabbitmqctl start_app

Starting node rabbit@centos2 ...


查看状态

centos2$ rabbitmqctl cluster_status

Cluster status of node rabbit@centos2 ...

[{nodes,[{disc,[rabbit@centos1,rabbit@centos2]}]},

 {running_nodes,[rabbit@centos1,rabbit@centos2]},

 {cluster_name,<<"rabbit@centos1.imethan.cn">>},

 {partitions,[]}]






#参考文档

http://www.rabbitmq.com/clustering.html

http://www.cnblogs.com/avril/archive/2010/03/23/1692809.html



CentOS 6.7安装RabbitMQ消息中间件

RabbitMQ笔记 2015/12/18 RabbitMQ , CentOS , Linux

#RabbitMQ是基于Erlang开发的,所以首先安装Erlang环境

下载:erlang-17.4-1.el6.x86_64.rpm 下载地址:http://www.rabbitmq.com/releases/erlang/


#rpm安装erlang

rpm -ivh erlang-17.4-1.el6.x86_64.rpm


#安装RabbitMQ Server

下载:rabbitmq-server-3.5.6-1.noarch.rpm 下载地址:http://www.rabbitmq.com/install-rpm.html


#rpm安装rabbitmq-server

rpm -ivh rabbitmq-server-3.5.6-1.noarch.rpm


#启动rabbitmq-server

/sbin/service rabbitmq-server start


#设置开机启动

chkconfig rabbitmq-server on


#开放防火墙端口

SELinux and similar mechanisms may prevent RabbitMQ from binding to a port. When that happens, RabbitMQ will fail to start. Make sure the following ports can be opened:

  • 4369 (epmd), 25672 (Erlang distribution)
  • 5672, 5671 (AMQP 0-9-1 without and with TLS)
  • 15672 (if management plugin is enabled)
  • 61613, 61614 (if STOMP is enabled)
  • 1883, 8883 (if MQTT is enabled)


#默认安装说明

默认安装日志路径:/var/log/rabbitmq

默认安装路径:/usr/lib/rabbitmq

默认安装配置文件存放路径:/etc/rabbitmq


#默认访问用户和虚拟主机

默认安装的rabbitmq服务,默认拥有一个账号和密码都是“guest”的用户,并且绑定到虚拟主机“/”,只能在本机(localhost)访问rabbitmq,正式发布时,建议删除“guest”账号和虚拟主机“/


#管理RabbitMQ节点

进入到bin目录:cd /usr/lib/rabbitmq/bin

停止服务:./rabbitmqctl stop

查看状态: ./rabbitmqctl status

更多rabbitmqctl命令:http://www.rabbitmq.com/man/rabbitmqctl.1.man.html


#查看日志

进入到日志目录:cd /var/log/rabbitmq

浏览日志文件:ls

rabbit@centos1.log  rabbit@centos1-sasl.log  startup_err  startup_log

查看日志文件:cat rabbit@centos1.log


#开启UI管理界面插件“Management Plugin

cd /usr/lib/rabbitmq/bin

rabbitmq-plugins enable rabbitmq_management


URL访问地址:http://server-name:15672


#创建管理员用户,负责整个MQ的运维

rabbitmqctl add_user  admin admin  

rabbitmqctl set_user_tags admin administrator  

更多用户管理命令:http://www.rabbitmq.com/man/rabbitmqctl.1.man.html


创建好后,便可以用账号为admin,密码为admin登录http://server-name:15672 进行MQ的维护



#参考文档

http://www.rabbitmq.com/