喵星之旅-狂奔的兔子-rabbitmq和spring整合

一、创建maven项目

这里使用的是社区版idea和jdk8。

Alt text

二、导入依赖

包含spring、rabbitmq等

最后的pom文件内容为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>club.kittybunny</groupId>
<artifactId>springrabbitmq</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<!-- spring版本号 -->
<spring.version>4.3.14.RELEASE</spring.version>
<!-- log4j日志文件管理包版本 -->
<slf4j.version>1.6.6</slf4j.version>
<log4j.version>1.2.12</log4j.version>
<!-- junit版本号 -->
<junit.version>4.10</junit.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aspects</artifactId>
<version>${spring.version}</version>
</dependency>

<!--rabbitmq依赖 -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.3.5.RELEASE</version>
</dependency>
<!-- 日志文件管理包 -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<!-- log end -->

<!--单元测试依赖 -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>RELEASE</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>RELEASE</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>compile</scope>
</dependency>
</dependencies>

</project>

三、编写监听者(消费者)

创建类,实现接口MessageListener

这里创建了两个消费者类,其中message包含数据的全部内容,使用new String(message.getBody())可以获得传输的具体内容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package club.kittybunny.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

/**
* @Author: bunny
* @Description: 我是兔子我会喵,我叫喵星兔
*/
public class ConsumerOne implements MessageListener {
private Logger logger = LoggerFactory.getLogger(ConsumerOne.class);
public void onMessage(Message message) {
logger.info("第一个消费者收到的信息 : " + message);
logger.info("第一个消费者收到的信息 : " + new String(message.getBody()));
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package club.kittybunny.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

/**
* @Author: bunny
* @Description: 我是兔子我会喵,我叫喵星兔
*/
public class ConsumerTwo implements MessageListener {
private Logger logger = LoggerFactory.getLogger(ConsumerTwo.class);
public void onMessage(Message message) {
logger.info("第二个消费者收到的信息 : " + message);
logger.info("第二个消费者收到的信息 : " + new String(message.getBody()));
}
}

四、编写生产者类

这个类没有类型要求,@Service注册为bean,需要注入AmqpTemplate实现和交换机的连接,并使amqpTemplate.convertAndSend发送信息。

对于convertAndSend方法:

如果3个参数,第一个参数是交换机名字,第二个是路由key,第三个是发送的信息。

如果两个参数,就是省略了第一个参数,当没有默认交换机时会数据直接丢失。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package club.kittybunny.producer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

/**
* @Author: bunny
* @Description: 我是兔子我会喵,我叫喵星兔
*/
@Service
public class Producer {
private Logger logger = LoggerFactory.getLogger(Producer.class);

@Autowired
@Qualifier("amqpTemplate")
private AmqpTemplate amqpTemplate;

public void sendMessage(Object message) {
logger.info("Send message:" + message);
amqpTemplate.convertAndSend("MY_DIRECT_EXCHANGE", "FirstKey", "1" + message);
amqpTemplate.convertAndSend("MY_DIRECT_EXCHANGE", "SecondKey", "2" + message);
amqpTemplate.convertAndSend("SecondKey", "3" + message);//数据直接丢失
}
}

五、编写配置文件

首先是log4j.properties 其中linux的路径改成相应系统对应格式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
log4j.rootLogger=INFO,consoleAppender,fileAppender
log4j.category.ETTAppLogger=DEBUG, ettAppLogFile
log4j.appender.consoleAppender=org.apache.log4j.ConsoleAppender
log4j.appender.consoleAppender.Threshold=TRACE
log4j.appender.consoleAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.consoleAppender.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss SSS} ->[%t]--[%-5p]--[%c{1}]--%m%n
log4j.appender.fileAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.fileAppender.File=/home/bunny/logsforlog4j/debug1.log
log4j.appender.fileAppender.DatePattern='_'yyyy-MM-dd'.log'
log4j.appender.fileAppender.Threshold=TRACE
log4j.appender.fileAppender.Encoding=BIG5
log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.fileAppender.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss SSS}-->[%t]--[%-5p]--[%c{1}]--%m%n
log4j.appender.ettAppLogFile=org.apache.log4j.DailyRollingFileAppender
log4j.appender.ettAppLogFile.File=/home/bunny/logsforlog4j/ettdebug.log
log4j.appender.ettAppLogFile.DatePattern='_'yyyy-MM-dd'.log'
log4j.appender.ettAppLogFile.Threshold=DEBUG
log4j.appender.ettAppLogFile.layout=org.apache.log4j.PatternLayout
log4j.appender.ettAppLogFile.layout.ConversionPattern=%-d{yyyy-MM-dd HH\:mm\:ss SSS}-->[%t]--[%-5p]--[%c{1}]--%m%n

然后是applicationContext.xml 将rabbit的配置单独放在一个文件中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">
<!--将rabbit的配置单独放在一个文件中-->
<import resource="classpath*:rabbitMQ.xml" />

<!-- 扫描指定package下所有带有如 @Controller,@Service,@Resource 并把所注释的注册为Spring Beans -->
<context:component-scan base-package="club.kittybunny.*" />

<!-- 激活annotation功能 -->
<context:annotation-config />

<!-- 激活annotation功能 -->
<context:spring-configured />
</beans>

最后是最主要的rabbitMQ.xml ,需要配置的内容略了交换机的绑定。这部分单独配置的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd">

<!--配置connection-factory,指定连接rabbit server参数 -->
<rabbit:connection-factory id="connectionFactory" virtual-host="/mybunny" username="bunny" password="bunny" host="127.0.0.1" port="5672" />

<!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成
RabbitAdmin主要用于在Java代码中对理队和队列进行管理,用于创建、绑定、删除队列与交换机,发送消息等。-->
<rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />

<!--定义rabbit template用于数据的接收和发送 可使用exchange="MY_DIRECT_EXCHANGE" 指定默认交换机-->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>

<!--消息接收者 -->

<bean id="messageReceiver" class="club.kittybunny.consumer.ConsumerOne"></bean>

<!--定义queue -->

<rabbit:queue name="MY_FIRST_QUEUE" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" />

<!--queue listener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->

<rabbit:listener-container connection-factory="connectionFactory">

<rabbit:listener queues="MY_FIRST_QUEUE" ref="messageReceiver" />

</rabbit:listener-container>

<!-- 消息接收者 -->

<bean id="receiverSecond" class="club.kittybunny.consumer.ConsumerTwo"></bean>

<!--定义queue -->

<rabbit:queue name="MY_SECOND_QUEUE" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" />

<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->

<rabbit:listener-container connection-factory="connectionFactory">

<rabbit:listener queues="MY_SECOND_QUEUE" ref="receiverSecond" />

</rabbit:listener-container>

</beans>

六、测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package club.kittybunny.test;

import club.kittybunny.producer.Producer;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
* @Author: bunny
* @Description: 我是兔子我会喵,我叫喵星兔
*/
public class RabbitTest {

private ApplicationContext context = null;

@Test
public void sendMessage() {
context = new ClassPathXmlApplicationContext("applicationContext.xml");
Producer messageProducer = (Producer) context.getBean("producer");
int k = 100;
while (k > 0) {
messageProducer.sendMessage("第" + k + "次发送的消息");
k--;
try {
Thread.sleep(1000);
//Thread.sleep(Long.MAX_VALUE);
} catch (Exception e) {
e.printStackTrace();
}
//return;
}
}

}

至此结束。

最后的项目:

Alt text

如果想要配置交换机的绑定可以参考下面内容,绑定关系不应该由程序决定,应单独申请。

直连的:

1
2
3
4
5
6
<rabbit:direct-exchange name="MY_DIRECT_EXCHANGE" durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="MY_FIRST_QUEUE" key="FirstKey">
</rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>

topic:

1
2
3
4
5
6
<!-- 定义topic exchange,绑定MY_THIRD_QUEUE,注意关键词是pattern -->
<rabbit:topic-exchange name="MY_TOPIC_EXCHANGE" durable="true" auto-delete="false" declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="MY_THIRD_QUEUE" pattern="#.Third.#"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>

fanout:

1
2
3
4
5
6
7
<!-- 定义fanout exchange,绑定MY_FIRST_QUEUE 和 MY_FOURTH_QUEUE -->
<rabbit:fanout-exchange name="MY_FANOUT_EXCHANGE" auto-delete="false" durable="true" declared-by="connectAdmin" >
<rabbit:bindings>
<rabbit:binding queue="MY_FIRST_QUEUE"></rabbit:binding>
<rabbit:binding queue="MY_FOURTH_QUEUE"></rabbit:binding>
</rabbit:bindings>
</rabbit:fanout-exchange>

项目地址:

ps:项目地址 svn://www.kittybunny.cn/kitty/2%E3%80%81code/springrabbitmq   用户名密码:reader/reader

文章目录
  1. 一、创建maven项目
  2. 二、导入依赖
  3. 三、编写监听者(消费者)
  4. 四、编写生产者类
  5. 五、编写配置文件
  6. 六、测试类
|