安装RabbitMQ
环境说明:
1.操作系统:centos 7
2.RabbitMQ:3.7.3
3.erlang:v20.2.3
4.centos已经关闭防火墙
|
|
下载erlang
下载rabbitmq
开始安装
|
|
使用
[root@localhost rabbitmq]# erl
Erlang/OTP 20 [erts-9.2.1] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:10] [hipe] [kernel-poll:false]
Eshell V9.2.1 (abort with ^G)
1>
|
|
[root@localhost rabbitmq]# service rabbitmq-server start
Redirecting to /bin/systemctl start rabbitmq-server.service
[root@localhost rabbitmq]#
|
|
[root@localhost rabbitmq]# service rabbitmq-server status
Redirecting to /bin/systemctl status rabbitmq-server.service
● rabbitmq-server.service - RabbitMQ broker
Loaded: loaded (/usr/lib/systemd/system/rabbitmq-server.service; disabled; vendor preset: disabled)
Active: active (running) since 三 2018-03-07 10:25:08 CST; 1min 14s ago
Main PID: 1325 (beam.smp)
Status: "Initialized"
CGroup: /system.slice/rabbitmq-server.service
├─1325 /usr/lib64/erlang/erts-9.2.1/bin/beam.smp -W w -A 64 -P 1048576 -t 5000000 -stbt db -zdbbl 128000…
├─1507 /usr/lib64/erlang/erts-9.2.1/bin/epmd -daemon
├─1639 erl_child_setup 1024
├─1659 inet_gethost 4
└─1660 inet_gethost 4
3月 07 10:25:04 localhost.localdomain rabbitmq-server[1325]: ## ##
3月 07 10:25:04 localhost.localdomain rabbitmq-server[1325]: ## ## RabbitMQ 3.7.3. Copyright (C) 2007-201…nc.
3月 07 10:25:04 localhost.localdomain rabbitmq-server[1325]: ########## Licensed under the MPL. See http://ww...om/
3月 07 10:25:04 localhost.localdomain rabbitmq-server[1325]: ###### ##
3月 07 10:25:04 localhost.localdomain rabbitmq-server[1325]: ########## Logs: /var/log/rabbitmq/rabbit@localhost.log
3月 07 10:25:04 localhost.localdomain rabbitmq-server[1325]: /var/log/rabbitmq/rabbit@localhost_upgrade.log
3月 07 10:25:04 localhost.localdomain rabbitmq-server[1325]: Starting broker…
3月 07 10:25:08 localhost.localdomain rabbitmq-server[1325]: systemd unit for activation check: "rabbitmq-serve…ce"
3月 07 10:25:08 localhost.localdomain systemd[1]: Started RabbitMQ broker.
3月 07 10:25:10 localhost.localdomain rabbitmq-server[1325]: completed with 3 plugins.
Hint: Some lines were ellipsized, use -l to show in full.
[root@localhost rabbitmq]#
|
|
[root@localhost rabbitmq]# cd /var/log/rabbitmq/
[root@localhost rabbitmq]# ls
erl_crash.dump rabbit@localhost.log rabbit@localhost_upgrade.log
log rabbit@localhost.log-20180306.gz rabbit@localhost_upgrade.log-20180306.gz
|
[root@localhost rabbitmq]# cat rabbit@localhost.log
Starting RabbitMQ 3.7.3 on Erlang 20.2.3
Copyright (C) 2007-2018 Pivotal Software, Inc.
Licensed under the MPL. See http://www.rabbitmq.com/
2018-03-07 10:25:04.157 [info] <0.247.0>
node : rabbit@localhost
home dir : /var/lib/rabbitmq
config file(s) : /etc/rabbitmq/rabbitmq.config
cookie hash : kdRZlH6EzW+h0o2u3onyUg==
log(s) : /var/log/rabbitmq/rabbit@localhost.log
: /var/log/rabbitmq/rabbit@localhost_upgrade.log
database dir : /var/lib/rabbitmq/mnesia/rabbit@localhost
2018-03-07 10:25:08.222 [info] <0.255.0> Memory high watermark set to 390 MiB (409041305 bytes) of 975 MiB (1022603264 bytes) total
|
|
[root@localhost rabbitmq]# cd /etc/rabbitmq/
[root@localhost rabbitmq]# vim rabbitmq.config
[
{rabbit, [{tcp_listeners, [5672]}, {loopback_users, ["admin"]}]}
].
~
~
~
~
~
~
~
~
~
~
~
~
~
~
~
~
~
~
~
~
~
"rabbitmq.config" 3L, 70C 3,1 全部
|
|
# Show the users currently defined on the broker.
rabbitmqctl list_users
# Create the user "admin" with password "admin".
rabbitmqctl add_user admin admin
# Grant admin configure, write and read permissions (one regex each, in that
# order) on every resource in the "/" virtual host.
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
# Tag admin as an administrator.
rabbitmqctl set_user_tags admin administrator
# Enable the management plugin.
rabbitmq-plugins enable rabbitmq_management
|
|
|
|
/**
 * Publishes ten text messages to a durable queue through a durable direct
 * exchange.
 *
 * <p>Connections and channels are obtained from the project helper
 * {@code RabbitMqBase}, which is defined outside this class.
 */
public class DirectProducer {
    /** Name of the queue the messages end up in. */
    public static final String QUEUE_NAME = "com.szl.direct.que";
    /** Name of the direct exchange the messages are published to. */
    public static final String DIRECT_EXCHANGE_NAME = "com.szl.direct.exchange";
    /** Routing key that binds {@link #QUEUE_NAME} to {@link #DIRECT_EXCHANGE_NAME}. */
    public static final String ROUTING_KEY = "com.szl.direct.que.routing";

    /**
     * Declares the exchange, the queue and their binding, then publishes
     * messages "Hello rabbitmq(1)" through "Hello rabbitmq(10)".
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Create the connection factory.
        ConnectionFactory factory = RabbitMqBase.getConnectionFactoryInstance();
        Connection connection = null;
        Channel channel = null;
        // Point the factory at the broker and set the credentials.
        factory.setHost("192.168.56.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("admin");
        try {
            // Open a connection.
            connection = RabbitMqBase.getConnection(factory);
            // Open a channel on that connection.
            channel = RabbitMqBase.getChannel(connection);
            // Declare a durable exchange of type DIRECT.
            channel.exchangeDeclare(DIRECT_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
            // Declare a durable, non-exclusive, non-auto-delete queue.
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            // Bind the queue to the exchange.
            channel.queueBind(QUEUE_NAME, DIRECT_EXCHANGE_NAME, ROUTING_KEY);
            // Publish the messages.
            System.out.println("-----写入消息队列开始------");
            for (int i = 1; i < 11; i++) {
                String message = "Hello rabbitmq(" + i + ")";
                // Encode explicitly as UTF-8 so the payload bytes do not
                // depend on the JVM's platform default charset.
                channel.basicPublish(DIRECT_EXCHANGE_NAME, ROUTING_KEY, null,
                        message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            }
            System.out.println("-----写入消息队列完成------");
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } finally {
            // Always hand the connection and channel back to the helper; either
            // may still be null if setup failed early.
            try {
                RabbitMqBase.close(connection, channel);
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
}
|
|
/**
 * Consumes messages from {@code DirectProducer.QUEUE_NAME} with automatic
 * acknowledgement.
 *
 * <p>The consumer object comes from {@code defaultConsumer(channel)}, which is
 * not defined in this class (presumably inherited from {@code CustomerBase}).
 */
public class DirectCustomer extends CustomerBase {
    /**
     * Declares the exchange and queue, then registers a consumer on the queue.
     * On success the connection is left open so deliveries keep arriving.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Create the connection factory.
        ConnectionFactory factory = RabbitMqBase.getConnectionFactoryInstance();
        // Point the factory at the broker.
        factory.setHost("192.168.56.128");
        factory.setPort(5672);
        // Open a connection.
        Connection connection = null;
        try {
            connection = RabbitMqBase.getConnection(factory);
            // Open a channel on that connection.
            Channel channel = RabbitMqBase.getChannel(connection);
            // Declare a durable exchange of type DIRECT.
            channel.exchangeDeclare(DirectProducer.DIRECT_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
            // Declare a durable, non-exclusive, non-auto-delete queue.
            channel.queueDeclare(DirectProducer.QUEUE_NAME, true, false, false, null);
            // Build the consumer that will receive deliveries on this channel.
            Consumer consumer = defaultConsumer(channel);
            // Start consuming with auto-acknowledge enabled (second argument),
            // i.e. RabbitMQ's message acknowledgement is handled automatically.
            channel.basicConsume(DirectProducer.QUEUE_NAME, true, consumer);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
            // Setup failed: release the connection so it is not leaked.
            // Closing a connection also closes the channels opened on it.
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException | RuntimeException closeError) {
                    // Best-effort cleanup; report and carry on.
                    closeError.printStackTrace();
                }
            }
        }
    }
}
|
|
|
|
<?xml version="1.0" encoding="UTF-8"?>
|
|
<?xml version="1.0" encoding="UTF-8"?>
|
|
@Service(“mqProvider”)
public class MqProvider {
@Resource(name="amqpTemplate")
private AmqpTemplate amqpTemplate;
public void sendMsg() throws UnsupportedEncodingException {
System.out.println("spring:开始写入消息队列");
for (int i = 0; i < 10 ; i++) {
String msg = "Hello Spring-RabbitMq("+i+")";
Message message = MessageBuilder.withBody(msg.getBytes("utf-8"))
.setMessageId(System.currentTimeMillis() + "")
.build();
amqpTemplate.send(message);
}
System.out.println("spring:写入消息队列完成");
}
}
|
|
/**
 * Message listener that prints the body of every received message to
 * standard output.
 */
public class MqCustomer implements MessageListener {
    /**
     * Decodes the message body as UTF-8 text and prints it.
     *
     * @param message the received message
     */
    public void onMessage(Message message) {
        // Decode explicitly as UTF-8 rather than with the platform default
        // charset, so non-ASCII text is printed the same on every machine.
        System.out.println(new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));
    }
}
|
|
/**
 * Integration-style test: loads the Spring context from
 * "applicationContext.xml", sends the demo messages and then waits so the
 * listener has time to process them.
 */
public class RabbitMqTest {
    /** Spring context, rebuilt before each test. */
    private ApplicationContext context = null;

    /** Loads the application context from the classpath. */
    @Before
    public void init() {
        this.context = new ClassPathXmlApplicationContext("applicationContext.xml");
    }

    /**
     * Looks up the "mqProvider" bean, sends its messages and sleeps for six
     * seconds.
     *
     * @throws InterruptedException if the sleep is interrupted
     * @throws UnsupportedEncodingException propagated from {@code sendMsg()}
     */
    @Test
    public void mqTest() throws InterruptedException, UnsupportedEncodingException {
        Object bean = this.context.getBean("mqProvider");
        MqProvider provider = (MqProvider) bean;
        provider.sendMsg();
        // Pause so the consumer side gets a chance to handle the messages.
        final long waitMillis = 6000;
        Thread.sleep(waitMillis);
    }
}
```