ActiveMQ 简介
ActiveMQ 是一种消息中间件,更精确地说,是基于 Java 消息服务(JMS, Java Message Service)API 接口协议的一种消息队列(MQ, message queue)的具体实现。
ActiveMQ 是 Apache 基金会下一个开源的、支持多协议的(STOMP、 AMQP、MQTT、 OpenWire 、REST、WS
),由 Java 实现的消息中间件,用于跨平台、跨语言的(C、C++、Java、Python、Go、Js、Ruby、Lua、R 等
)的消息传递,有利于降低系统的耦合度。
ActiveMQ 使用步骤
- 安装&启动 ActiveMQ
- 创建 queue/topic 类型的消息对象
- 使用指定的语言的 SDK,连接 ActiveMQ 服务, 再
生产/消费
指定的queue/topic
下面将说明每一步的具体操作步骤 ↓↓↓
ActiveMQ 安装与启动
Linux 下安装
- 下载地址: 点击Apache官方下载
- 解压:
tar -xvf apache-activemq-X.XX.X
- 解压后的文件列表:
- bin:相关可执行程序
- conf:配置文件
- data:存放数据
- docs:用户手册及文档
- examples:相关配置的例子
- lib:jar 包及扩展包
- webapps:web 页面。
- 将 apache-activemq-X.XX.X 里的 bin 目录加入到 PATH 中
Mac 安装
- 一键安装命令:
brew install activemq
ActiveMQ 启动与关闭
- 启动命令:
- 普通启动:
activemq start
- 带日志启动:
activemq start > {LOG_HOME}/{name}.log
- 普通启动:
- 关闭命令:
activemq stop
创建 queue & topic 消息类型的对象
ActiveMQ 有两种消息类型:queue
,topic
。二者区别如下:
- queue 中的每个消息,只能被消费一次,消费完即消失;而 topic 中的每个消息,可以被每一个订阅了该 topic 的消费者消费。
- queue 消费者可以消费任何时间段的消息; 而 topic 消费者只能消费订阅后才发布的消息。因此, topic 模式下,需要先启动消费者,再启动生产者。
queue 消息类型生成:
- 使用 ActiveMQ 自带的 Web 服务进行创建:
topic 消息类型生成:
- 使用 ActiveMQ 自带的 Web 服务进行创建:
此时,queue/topic 的消息类型对象已经创建完毕,接下来就可以通过特定语言的 SDK ,向指定的消息对象 生产/消费
消息。
连接 ActiveMQ & 生产/消费
消息(Golang 版本)
代码示例
1 | func TestTopic(t *testing.T) { |
效果图:
ActiveMQ 相关参数配置
ActiveMQ 的相关配置文件位于
${ActiveMQ_HOME}/conf
查看 ActiveMQ_HOME 位置: ps -ef |grep mq
配置目录如下所示:
其中 activemq.xml
是最主要配置文件,下面将对此文件重要参数进行解释。
activemq.xml 相关核心参数解释
destinationPolicy(消息目的地策略)
destinationPolicy 用于定义所有消息的相关策略(分发策略、回复策略、消息限制策略)。
destinationPolicy 子标签的包含关系如下:
1 | <!-- 每个具体的策略都被定义在 `policyEntry` 标签里, 被包含在 `destinationPolicy.policyMap.policyEntries` 内 --> |
策略实例如下:
1 | <destinationPolicy> |
下面,对各个策略进行讲解:
1. dispatchPolicy(转发策略)
此策略表明 Broker 将同一个消息发送给不同消费者的顺序,且只对 Topic 有效,转发策略常用的有四种子策略:
RoundRobinDispatchPolicy
:“轮询”,“订阅者”列表默认按照订阅的先后顺序排列,订阅者拿到消息之后,将会被移动到“订阅者”列表尾部,这也意味着“下一条”消息,将会较晚的转发给它。StrictOrderDispatchPolicy
:按照“订阅者”订阅的时间先后,消息依次发送给每个订阅者。它和 RoundRobin 最大的区别是,没有移动“订阅者”顺序的操作。PriorityDispatchPolicy
: 基于 “property” 权重对 “订阅者” 排序。它要求开发者首先需要对每个订阅者指定priority,默认情况下每个消费者的 property 值都一样的。SimpleDispatchPolicy
: 默认值,按照当前“订阅者”列表的顺序。其中 PriorityDispatchPolicy 是其子类。
以 RoundRobinDispatchPolicy
为例:
1 | <!-- 通配符 “>” 表示对所有 topic 均有效 --> |
2. SubscriptionRecoveryPolicy(恢复/回溯策略)
恢复策略只能用于 Topic 消息类型的消费者
默认情况下,订阅者只能获取“订阅”开始之后的消息,而恢复/回溯策略可以让订阅者可以获取其创建之前的消息。
注:该策略所缓存的消息只会发送给 retroactive
类型的消费者,并不会发送给持久订阅者。要想让恢复/回溯策略生效,在创建此 Topic 的消费者时,需要设置 retroactive = true
, 代码如下:
1 | topic = new ActiveMQTopic("TEST.Topic?consumer.retroactive=true"); |
恢复/回溯策略包括如下几种:
FixedSizedSubscriptionRecoveryPolicy
:保存固定内存空间的消息1
2
3
4
5<policyEntry topic=">">
<subscriptionRecoveryPolicy>
<fixedSizedSubscriptionRecoveryPolicy maximumSize="1024"/>
</subscriptionRecoveryPolicy>
</policyEntry>FixedCountSubscriptionRecoveryPolicy
:保存固定消息数的消息1
2
3
4
5<policyEntry topic=">">
<subscriptionRecoveryPolicy>
<fixedCountSubscriptionRecoveryPolicy maximumSize="100"/>
</subscriptionRecoveryPolicy>
</policyEntry>LastImageSubscriptionRecoveryPolicy
:保留最新的一条数据1
2
3
4
5<policyEntry topic=">">
<subscriptionRecoveryPolicy>
<lastImageSubscriptionRecoveryPolicy/>
</subscriptionRecoveryPolicy>
</policyEntry>QueryBasedSubscriptionRecoveryPolicy
:根据查询机制使用回溯,具体能够“恢复”多少消息,由底层存储机制决定;比如对于非持久化消息,只要内存中还存在,则都可以恢复。1
2
3
4
5<policyEntry topic=">">
<subscriptionRecoveryPolicy>
<queryBasedSubscriptionRecoveryPolicy query="Color='red' AND Name='tom'"/>
</subscriptionRecoveryPolicy>
</policyEntry>TimedSubscriptionRecoveryPolicy
: 保留指定时间内的消息1
2
3
4
5<policyEntry topic=">">
<subscriptionRecoveryPolicy>
<timedSubscriptionRecoveryPolicy recoverDuration="60000"/>
</subscriptionRecoveryPolicy>
</policyEntry>NoSubscriptionRecoveryPolicy
:关闭“恢复机制”,默认。1
2
3
4
5<policyEntry topic=">">
<subscriptionRecoveryPolicy>
<noSubscriptionRecoveryPolicy/>
</subscriptionRecoveryPolicy>
</policyEntry>- RetainedMessageSubscriptionRecoveryPolicy: 保留 ActiveMQ.Retain 属性值为 true 的最后1条消息
1
2
3
4
5<policyEntry topic=">">
<subscriptionRecoveryPolicy>
<retainedMessageSubscriptionRecoveryPolicy/>
</subscriptionRecoveryPolicy>
</policyEntry>
3. DeadLetterStrategy(死信策略)
当 ActiveMQ 多次向客户端发送消息失败后,broker 会将消息放入 死信通道(DeadLetter)
中。如不想抛弃死信队列,默认进入 ACTIVEMQ.DLQ
队列,且不会自动清除。
死信队列提供了可选的策略有: IndividualDeadLetterStrategy
(放入各自的死信通道)、 SharedDeadLetterStrategy
(保存在一个共享的队列(默认)):
IndividualDeadLetterStrategy
:把死信放入各自的死信通道中,死信通道默认前缀是ActiveMQ.DLQ.
。对于 queue 的死信通道前缀为 ActiveMQ.DLQ.Queue.; Topic 的死信前缀为 ActiveMQ.DLQ.Topic.。默认情况下,broker 使用Queue 保存死信,可通过配置指定死信类型为 Topic。1
2
3
4
5
6
7<policyEntry queue="order">
<deadLetterStrategy>
<!-- 从ActiveMQ 5.12开始,deadLetterStrategy 支持 expiration 属性,其值以毫秒为单位 -->
<individualDeadLetterStrategy
queuePrefix="DLQ." useQueueForQueueMessages="false" processExpired="true" expiration="30000" />
</deadLetterStrategy>
</policyEntry>SharedDeadLetterStrategy
:将所有的死信保存在一个共享的队列中,这是 ActiveMQ broker 端默认的策略。共享队列默认为 “ActiveMQ.DLQ”,可以通过 “deadLetterQueue” 属性来设定。1
2
3<deadLetterStrategy>
<sharedDeadLetterStrategy deadLetterQueue="DLQ-QUEUE"/>
</deadLetterStrategy>
注:如果开发者不关心死信,可以使用 AcitveMQ 提供的插件 DiscardingDLQBrokerPlugin
来直接抛弃死信,配置用例如下:
1 | <broker> |
4. PendingMessageLimitStrategy:消息限制策略(面向慢消费者)
此策略用于防止 Topic 类型消息发生积压时,而设定额定的消息保留量,此策略只对未持久化的 topic 消息类型有效。
- ConstantPendingMessageLimitStrategy:保留固定条数的消息,如果消息量超过 limit,将使用
MessageEvictionStrategy(消息剔除策略)
移除消息。1
2
3
4
5
6<policyEntry topic="ORDERS.>">
<!-- lets force old messages to be discarded for slow consumers -->
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="50"/>
</pendingMessageLimitStrategy>
</policyEntry> - PrefetchRatePendingMessageLimitStrategy: 保留 prefetchSize 条消息。
这种策略在客户端内存里缓存一定数量得消息, 当某个消费者的 prefetch buffer 已经达到上限,那么 broker 不会再向消费者分发消息,直到 broker 收到消费者发出的确认消息后,确认后的消息将会从缓存中去掉。prefetchSize 通过创建消费者的时候设置,代码如下:1
2
3
4
5
6<policyEntry topic="ORDERS.>">
<pendingMessageLimitStrategy>
<!-- 若 prefetchSize 为100,则保留2.5 * 100条消息 -->
<prefetchRatePendingMessageLimitStrategy multiplier="2.5" />
</pendingMessageLimitStrategy>
</policyEntry>1
2
3// java 版
queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
consumer = session.createConsumer(queue)
5. SlowConsumerStrategy:慢速消费者策略
对于慢消费者,broker 会启动一个后台线程用来检测所有的慢速消费者,并定期的关闭慢消费者。
AbortSlowConsumerStrategy
:中断并关闭慢速消费者1
2
3
4
5
6
7
8
9
10
11<policyEntry queue=">" producerFlowControl="true" memoryLimit="512mb">
<slowConsumerStrategy>
<!--
abortConnection:是否关闭底层的连接
(1)false:通过 transport 向 client 发送指令,client 接受后,调用 consumer.close;
(2)true:底层 transport 链接关闭。如果多个消费者共享同一个连接,那么这些消费者会被迫关闭。
maxTimeSinceLastAck: 最后一个ACK 与当前时间间隔的阀值,超过设置的阈值,则中断慢速消费者
-->
<abortSlowConsumerStrategy abortConnection="false" maxTimeSinceLastAck="30000"/>
</slowConsumerStrategy>
</policyEntry>
6. MessageEvictionStrategy:消息剔除策略(面向慢消费者)
对于多余的消息,ActiveMQ 提供了以下子策略来移除:
OldestMessageEvictionStrategy
:移除旧消息,默认策略OldestMessageWithLowestPriorityEvictionStrategy
:旧数据中权重较低的消息,将会被移除UniquePropertyMessageEvictionStrategy
:移除具有指定 property 的旧消息1
2
3
4
5
6
7
8
9
10<!-- 结合上文的 pendingMessageLimitStrategy 一起理解 -->
<policyEntry topic="ORDER.WEIGHT.>">
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
<!-- 对于前缀为 ‘ORDER.WEIGHT.’ 的 topic,最多只保留1000条消息,超出时,将 ORDER 值相同的消息列表中较旧的消息移除(只保留最新的一条消息) -->
<messageEvictionStrategy>
<uniquePropertyMessageEvictionStrategy propertyName="ORDER" />
</messageEvictionStrategy>
</policyEntry>
7. PendingQueueStoragePolicy:待消费消息策略(面向慢消费者)
当通道中存在大量的慢消费者时,此时便会产生大量的Pending Message(待消费消息)。对于这些Pending Message,ActiveMQ提供了几种Cursor来保存。
vmQueueCursor
:将待转发消息保存在额外的内存(JVM linkeList)的存储结构中。是“非持久化消息”的默认设置,如果Broker不支持Persistent,它是任何类型消息的默认设置。有OOM风险。fileQueueCursor
:将消息保存到临时文件中。文件存储方式有 broker 的 tempDataStore 属性决定。是“持久化消息”的默认设置。storeCursor
:“综合”设置,对于非持久化消息,将采用vmQueueCursor存储,对于持久化消息采用fileQueueCursor。这是强烈推荐的策略,也是效率最好的策略。1
2
3
4
5
6
7
8
9
10<!-- 内存限制为512M,如果超过阀值,则转存 -->
<policyEntry queue=">" producerFlowControl="true" memoryLimit="512mb">
<pendingQueuePolicy>
<storeCursor>
<nonPersistent>
<fileQueueCursor/>
</nonPersistent>
</storeCursor>
</pendingQueuePolicy>
</policyEntry>
systemUsage(存储配置)
用于设置信息在内存、磁盘中占用的大小
1 | <systemUsage> |
主备配置
ActiveMQ 主备主要分三种: Master/Slave
、 文件共享方式
、数据库共享方式
下面将介绍这三种方式的区别:
1. Master/Slave
Master/Slave 方式不需要对 Master Broker 做特殊的配置,只要在 Slave Broker 中指定 Master 即可。其中,指定 Master 有两种方式:
- 方法1:在 broker 节点中添加 :
1
masterConnectorURI=”tcp://localhost:61616″
- 方法2:添加 services 节点,并指定连接的用户名和密码,配置如下:注: 纯 Master/Slave 只允许一个 Slave 连接到 Master 上(即只可运行2台 MQ 的集群),若 Master 夯机,需要停止 Slave 来恢复负载。
1
2
3<services>
<masterConnector remoteURI= "tcp://localhost:61616" userName="system" password="manager"/>
</services>
2. 数据库共享方式
采用数据库做消息的持久化,支持多个 Slave,所有 broker 持久化数据源配置成同一个数据源,多个 slave 竞争同一个数据库锁 ,拿到数据库锁的 slave 会变成 master ;当 master 挂了之后,其中的一个 slave 将会立刻获得数据库锁成为 master,由于采用的是数据库做为持久化,它的性能是有限的
3. 文件共享方式
文件共享方式与数据库共享方式类似
,区别在于 broker 的持久化采用的是文件,slave 等待获取的锁是文件锁,它具有更高的性能,但是需要文件共享系统的支持。
Window下共享KahaDB持久化的目录,配置如下:
1 | <persistenceAdapter> |
负载均衡配置
ActiveMQ 可以实现多个 mq 之间进行路由,假设有两个 mq,分别为 brokerA 和 brokerB,当有一条消息发送到 brokerA 的队列 test 中,有一个客户端连接到 brokerB 上,并且要求获取 test 队列的消息时,brokerA 中队列 test 的消息就会路由到brokerB上, 反之brokerB 的消息也会路由到 brokerA。
负载均衡的静态路由的配置如下:
1 | <networkConnectors> |
networkConnector 属性的官方链接:http://activemq.apache.org/networks-of-brokers.html
持久化配置
持久化属性是 persistent
(默认 true ),同时发送的消息也应该是 persitent 类型。
ActiveMQ 消息持久化有三种方式:AMQ
、 KahaDB
、 JDBC
下面对上述三种持久化方式进行解释。
1. AMQ
AMQ 是一种文件存储形式,具有写入速度快和容易恢复的特点。消息存储在文件中,文件的默认大小为32兆,如果一条消息的大小超过了32 兆,那么这个值必须设置大点。当一个存储文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。默认配置如下:
1 | <persistenceAdapter> |
2. KahaDB
KahaDB 是基于文件的本地数据库储存形式,虽然没有 AMQ 的速度快,但是它具有强扩展性,恢复的时间比 AMQ 短,从 5.4 版本之后KahaDB 做为默认的持久化方式。默认配置如下:
1 | <persistenceAdapter> |
3. JDBC
配置JDBC适配器:
1 | <persistenceAdapter> |
SQL Server 的持久化配置
1
2
3
4
5
6
7<bean id="mssql-ds" class="net.sourceforge.jtds.jdbcx.JtdsDataSource" destroy-method="close">
<property name="serverName" value="SERVERNAME"/>
<property name="portNumber" value="PORTNUMBER"/>
<property name="databaseName" value="DATABASENAME"/>
<property name="user" value="USER"/>
<property name="password" value="PASSWORD"/>
</bean>Oracle 的持久化配置
1
2
3
4
5
6
7
8<bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>
<property name="url" value="jdbc:oracle:thin:@10.53.132.47:1521:activemq"/>
<property name="username" value="activemq"/>
<property name="password" value="activemq"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>DB2 的持久化配置:
1
2
3
4
5
6
7
8<bean id="db2-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.ibm.db2.jcc.DB2Driver"/>
<property name="url" value="jdbc:db2://hndb02.bf.ctc.com:50002/activemq"/>
<property name="username" value="activemq"/>
<property name="password" value="activemq"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>