ActiveMQ 的安装&简单使用&参数详解(Golang版)

ActiveMQ 简介

ActiveMQ 是一种消息中间件,更精确地说,是基于 Java 消息服务(JMS, Java Message Service)API 接口协议的一种消息队列(MQ, message queue)的具体实现。
ActiveMQ 是 Apache 基金会下一个开源的、支持多协议的(STOMP、 AMQP、MQTT、 OpenWire 、REST、WS),由 Java 实现的消息中间件,用于跨平台、跨语言的(C、C++、Java、Python、Go、Js、Ruby、Lua、R 等)的消息传递,有利于降低系统的耦合度。

ActiveMQ 使用步骤

  1. 安装&启动 ActiveMQ
  2. 创建 queue/topic 类型的消息对象
  3. 使用指定的语言的 SDK,连接 ActiveMQ 服务, 再 生产/消费 指定的 queue/topic

下面将说明每一步的具体操作步骤 ↓↓↓

ActiveMQ 安装与启动

Linux 下安装

  • 下载地址: 点击Apache官方下载
  • 解压: tar -xvf apache-activemq-X.XX.X
  • 解压后的文件列表:
    • bin:相关可执行程序
    • conf:配置文件
    • data:存放数据
    • docs:用户手册及文档
    • examples:相关配置的例子
    • lib:jar 包及扩展包
    • webapps:web 页面。
  • 将 apache-activemq-X.XX.X 里的 bin 目录加入到 PATH 中

Mac 安装

  • 一键安装命令: brew install activemq

ActiveMQ 启动与关闭

  • 启动命令:
    • 普通启动: activemq start
    • 带日志启动:activemq start > {LOG_HOME}/{name}.log
  • 关闭命令:activemq stop

创建 queue & topic 消息类型的对象

ActiveMQ 有两种消息类型:queuetopic。二者区别如下:

  • queue 中的每个消息,只能被消费一次,消费完即消失;而 topic 中的每个消息,可以被每一个订阅了该 topic 的消费者消费。
  • queue 消费者可以消费任何时间段的消息; 而 topic 消费者只能消费订阅后才发布的消息。因此, topic 模式下,需要先启动消费者,再启动生产者。

queue 消息类型生成:

  • 使用 ActiveMQ 自带的 Web 服务进行创建:
    2ae1093bdf78dff2bfe5270d2aaae56d.jpeg

topic 消息类型生成:

  • 使用 ActiveMQ 自带的 Web 服务进行创建:
    c6a824c7a74224ca326848d8b9ef2fe9.png

此时,queue/topic 的消息类型对象已经创建完毕,接下来就可以通过特定语言的 SDK ,向指定的消息对象 生产/消费 消息。

连接 ActiveMQ & 生产/消费 消息(Golang 版本)

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
func TestTopic(t *testing.T)  {
//本例是使用 stomp 协议与 ActiveMQ 进行连接,stomp 的默认端口是 61613;
stompConn, err := stomp.Dial("tcp", "10.18.23.10:61613")
defer stompConn.Disconnect()

// 订阅指定的消息对象,并消费对象
/*
ActiveMQ 通过订阅对象的前缀名来判断是 queue 类型还是 topic 类型
例如:
(1)"/queue/zhaobiao" 则是 queue 类型
(2)"/topic/zhaobiao" 则是 topic 类型
*/
destination := "/topic/zhaobiao"
// 非独占式消费,会使得 Messages Enqueued 增加, Messages Dequeued 也增加
sub, err := stompConn.Subscribe(destination, stomp.AckAuto)
// 独占式消费,会使得 Messages Enqueued 增加, 但 Messages Dequeued 不增加
//sub, err := stompConn.Subscribe(topic, stomp.AckClientIndividual, stomp.SubscribeOpt.Header("activemq.exclusive", "true"))
if err != nil {
fmt.Println("subscribe topic error: ", err.Error())
return
}
go func() {
for {
select {
case v := <-sub.C:
fmt.Println("接收到消息: ", string(v.Body))
case <-time.After(time.Second * 3):
fmt.Println("已经有3秒钟没有收到消息了")
return
}
}
}()
time.Sleep(time.Second)

// 生产消息到指定的 消息对象
c := make(chan string)
go func() {
for {
err := stompConn.Send(destination, "text/plain", []byte(<-c))
if err != nil {
fmt.Println("ActiveMq message send error: " + err.Error())
}
}
}()

for i := 0; i < 1000; i++ {
//c <- "this is a msg " + strconv.Itoa(i)
c <- "发出第 " + strconv.Itoa(i) + " 个消息"
}
time.Sleep(time.Second*30)
}

效果图:

7efbddbbc9738bb633b21cb7831c6fd4.jpeg

ActiveMQ 相关参数配置

ActiveMQ 的相关配置文件位于 ${ActiveMQ_HOME}/conf

查看 ActiveMQ_HOME 位置ps -ef |grep mq
223d8fee7daa7bd959cdd4f9e35b15c4.jpeg

配置目录如下所示:

0e8597566253c974ccb5ad43f2f90472.jpeg
其中 activemq.xml 是最主要配置文件,下面将对此文件重要参数进行解释。

activemq.xml 相关核心参数解释

destinationPolicy(消息目的地策略)

destinationPolicy 用于定义所有消息的相关策略(分发策略、回复策略、消息限制策略)。
destinationPolicy 子标签的包含关系如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<!-- 每个具体的策略都被定义在 `policyEntry` 标签里, 被包含在 `destinationPolicy.policyMap.policyEntries` 内 -->
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry>
<dispatchPolicy>
<!-- strictOrderDispatchPolicy/ -->
</dispatchPolicy>
<subscriptionRecoveryPolicy>
<!-- lastImageSubscriptionRecoveryPolicy/ -->
</subscriptionRecoveryPolicy>
<pendingMessageLimitStrategy>
<!-- constantPendingMessageLimitStrategy limit="1000"/ -->
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>

策略实例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<destinationPolicy>
<policyMap>
<policyEntries>
<!-- queue 消息类型的策略定义 -->
<!-- producerFlowControl = true, 表示“启用监控流量;当消息的内存消耗达到 memoryLimit 时,ActiveMQ 会减慢消息的产生甚至阻塞”;如果为 false, 表示将消息发送到磁盘中,以防止内存溢出
-->
<policyEntry queue=">" producerFlowControl="true" memoryLimit="5mb"/>
<!-- topic 消息类型的策略定义 -->
<!-- 每个 policyEntry 可以包含多种策略(分发策略、恢复策略等,详见下文) -->
<policyEntry topic=">" producerFlowControl="true" memoryLimit="5mb">
<!-- 分发策略 -->
<dispatchPolicy>
<!-- 每个消费者会以相同的顺序接收消息,但有性能损失 -->
<strictOrderDispatchPolicy/>
</dispatchPolicy>
<!-- 恢复策略 -->
<subscriptionRecoveryPolicy>
<!-- 只保留最新的一条数据 -->
<lastImageSubscriptionRecoveryPolicy/>
</subscriptionRecoveryPolicy>
<!-- 消息限制策略(面向慢消费者) -->
<pendingMessageLimitStrategy>
<!-- 若消息保留量超过 limit,将使用 MessageEvictionStrategy 剔除旧的消息 -->
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>

下面,对各个策略进行讲解:

1. dispatchPolicy(转发策略)

此策略表明 Broker 将同一个消息发送给不同消费者的顺序,且只对 Topic 有效,转发策略常用的有四种子策略:

  • RoundRobinDispatchPolicy:“轮询”,“订阅者”列表默认按照订阅的先后顺序排列,订阅者拿到消息之后,将会被移动到“订阅者”列表尾部,这也意味着“下一条”消息,将会较晚的转发给它。
  • StrictOrderDispatchPolicy:按照“订阅者”订阅的时间先后,消息依次发送给每个订阅者。它和 RoundRobin 最大的区别是,没有移动“订阅者”顺序的操作。
  • PriorityDispatchPolicy: 基于 “property” 权重对 “订阅者” 排序。它要求开发者首先需要对每个订阅者指定priority,默认情况下每个消费者的 property 值都一样的。
  • SimpleDispatchPolicy: 默认值,按照当前“订阅者”列表的顺序。其中 PriorityDispatchPolicy 是其子类。

RoundRobinDispatchPolicy 为例:

1
2
3
4
5
6
<!-- 通配符 “>” 表示对所有 topic 均有效 -->
<policyEntry topic=">">
<dispatchPolicy>
<roundRobinDispatchPolicy/>
</dispatchPolicy>
</policyEntry>
2. SubscriptionRecoveryPolicy(恢复/回溯策略)

恢复策略只能用于 Topic 消息类型的消费者
默认情况下,订阅者只能获取“订阅”开始之后的消息,而恢复/回溯策略可以让订阅者可以获取其创建之前的消息。
注:该策略所缓存的消息只会发送给 retroactive 类型的消费者,并不会发送给持久订阅者。要想让恢复/回溯策略生效,在创建此 Topic 的消费者时,需要设置 retroactive = true, 代码如下:

1
2
topic = new ActiveMQTopic("TEST.Topic?consumer.retroactive=true");
consumer = session.createConsumer(topic);

恢复/回溯策略包括如下几种:

  • FixedSizedSubscriptionRecoveryPolicy:保存固定内存空间的消息
    1
    2
    3
    4
    5
    <policyEntry topic=">">
    <subscriptionRecoveryPolicy>
    <fixedSizedSubscriptionRecoveryPolicy maximumSize="1024"/>
    </subscriptionRecoveryPolicy>
    </policyEntry>
  • FixedCountSubscriptionRecoveryPolicy:保存固定消息数的消息
    1
    2
    3
    4
    5
    <policyEntry topic=">">
    <subscriptionRecoveryPolicy>
    <fixedCountSubscriptionRecoveryPolicy maximumSize="100"/>
    </subscriptionRecoveryPolicy>
    </policyEntry>
  • LastImageSubscriptionRecoveryPolicy:保留最新的一条数据
    1
    2
    3
    4
    5
    <policyEntry topic=">">
    <subscriptionRecoveryPolicy>
    <lastImageSubscriptionRecoveryPolicy/>
    </subscriptionRecoveryPolicy>
    </policyEntry>
  • QueryBasedSubscriptionRecoveryPolicy:根据查询机制使用回溯,具体能够“恢复”多少消息,由底层存储机制决定;比如对于非持久化消息,只要内存中还存在,则都可以恢复。
    1
    2
    3
    4
    5
    <policyEntry topic=">">
    <subscriptionRecoveryPolicy>
    <queryBasedSubscriptionRecoveryPolicy query="Color='red' AND Name='tom'"/>
    </subscriptionRecoveryPolicy>
    </policyEntry>
  • TimedSubscriptionRecoveryPolicy: 保留指定时间内的消息
    1
    2
    3
    4
    5
    <policyEntry topic=">">
    <subscriptionRecoveryPolicy>
    <timedSubscriptionRecoveryPolicy recoverDuration="60000"/>
    </subscriptionRecoveryPolicy>
    </policyEntry>
  • NoSubscriptionRecoveryPolicy:关闭“恢复机制”,默认。
    1
    2
    3
    4
    5
    <policyEntry topic=">">
    <subscriptionRecoveryPolicy>
    <noSubscriptionRecoveryPolicy/>
    </subscriptionRecoveryPolicy>
    </policyEntry>
  • RetainedMessageSubscriptionRecoveryPolicy: 保留 ActiveMQ.Retain 属性值为 true 的最后1条消息
    1
    2
    3
    4
    5
    <policyEntry topic=">">
    <subscriptionRecoveryPolicy>
    <retainedMessageSubscriptionRecoveryPolicy/>
    </subscriptionRecoveryPolicy>
    </policyEntry>
3. DeadLetterStrategy(死信策略)

当 ActiveMQ 多次向客户端发送消息失败后,broker 会将消息放入 死信通道(DeadLetter) 中。如不想抛弃死信队列,默认进入 ACTIVEMQ.DLQ 队列,且不会自动清除。

死信队列提供了可选的策略有: IndividualDeadLetterStrategy(放入各自的死信通道)、 SharedDeadLetterStrategy (保存在一个共享的队列(默认)):

  • IndividualDeadLetterStrategy:把死信放入各自的死信通道中,死信通道默认前缀是 ActiveMQ.DLQ.。对于 queue 的死信通道前缀为 ActiveMQ.DLQ.Queue.; Topic 的死信前缀为 ActiveMQ.DLQ.Topic.。默认情况下,broker 使用Queue 保存死信,可通过配置指定死信类型为 Topic。
    1
    2
    3
    4
    5
    6
    7
    <policyEntry queue="order">  
    <deadLetterStrategy>
    <!-- 从ActiveMQ 5.12开始,deadLetterStrategy 支持 expiration 属性,其值以毫秒为单位 -->
    <individualDeadLetterStrategy
    queuePrefix="DLQ." useQueueForQueueMessages="false" processExpired="true" expiration="30000" />
    </deadLetterStrategy>
    </policyEntry>
  • SharedDeadLetterStrategy:将所有的死信保存在一个共享的队列中,这是 ActiveMQ broker 端默认的策略。共享队列默认为 “ActiveMQ.DLQ”,可以通过 “deadLetterQueue” 属性来设定。
    1
    2
    3
    <deadLetterStrategy>  
    <sharedDeadLetterStrategy deadLetterQueue="DLQ-QUEUE"/>
    </deadLetterStrategy>

注:如果开发者不关心死信,可以使用 AcitveMQ 提供的插件 DiscardingDLQBrokerPlugin 来直接抛弃死信,配置用例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
<broker>
...
<!-- 插件定义在 broker 中 -->
<plugins>
<!-- 丢弃所有死信-->
<discardingDLQBrokerPlugindropAll="true" dropTemporaryTopics="true" dropTemporaryQueues="true" />
<!-- 丢弃指定死信-->
<!-- <discardingDLQBrokerPlugindropOnly="MY.EXAMPLE.TOPIC.29 MY.EXAMPLE.QUEUE.87" reportInterval="1000" />-->
<!--使用丢弃正则匹配到死信-->
<!--<discardingDLQBrokerPlugindropOnly="MY.EXAMPLE.TOPIC.[0-9]{3} MY.EXAMPLE.QUEUE.[0-9]{3}" reportInterval="3000"/>-->
</plugins>
...
</broker>
4. PendingMessageLimitStrategy:消息限制策略(面向慢消费者)

此策略用于防止 Topic 类型消息发生积压时,而设定额定的消息保留量,此策略只对未持久化的 topic 消息类型有效。

  • ConstantPendingMessageLimitStrategy:保留固定条数的消息,如果消息量超过 limit,将使用 MessageEvictionStrategy(消息剔除策略)移除消息。
    1
    2
    3
    4
    5
    6
    <policyEntry topic="ORDERS.>">  
    <!-- lets force old messages to be discarded for slow consumers -->
    <pendingMessageLimitStrategy>
    <constantPendingMessageLimitStrategy limit="50"/>
    </pendingMessageLimitStrategy>
    </policyEntry>
  • PrefetchRatePendingMessageLimitStrategy: 保留 prefetchSize 条消息。
    这种策略在客户端内存里缓存一定数量得消息, 当某个消费者的 prefetch buffer 已经达到上限,那么 broker 不会再向消费者分发消息,直到 broker 收到消费者发出的确认消息后,确认后的消息将会从缓存中去掉。
    1
    2
    3
    4
    5
    6
    <policyEntry topic="ORDERS.>">   
    <pendingMessageLimitStrategy>
    <!-- 若 prefetchSize 为100,则保留2.5 * 100条消息 -->
    <prefetchRatePendingMessageLimitStrategy multiplier="2.5" />
    </pendingMessageLimitStrategy>
    </policyEntry>
    prefetchSize 通过创建消费者的时候设置,代码如下:
    1
    2
    3
    // java 版
    queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
    consumer = session.createConsumer(queue)
5. SlowConsumerStrategy:慢速消费者策略

对于慢消费者,broker 会启动一个后台线程用来检测所有的慢速消费者,并定期的关闭慢消费者。

  • AbortSlowConsumerStrategy:中断并关闭慢速消费者
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    <policyEntry queue=">" producerFlowControl="true" memoryLimit="512mb">               
    <slowConsumerStrategy>
    <!--
    abortConnection:是否关闭底层的连接
    (1)false:通过 transport 向 client 发送指令,client 接受后,调用 consumer.close;
    (2)true:底层 transport 链接关闭。如果多个消费者共享同一个连接,那么这些消费者会被迫关闭。
    maxTimeSinceLastAck: 最后一个ACK 与当前时间间隔的阀值,超过设置的阈值,则中断慢速消费者
    -->
    <abortSlowConsumerStrategy abortConnection="false" maxTimeSinceLastAck="30000"/>
    </slowConsumerStrategy>
    </policyEntry>
6. MessageEvictionStrategy:消息剔除策略(面向慢消费者)

对于多余的消息,ActiveMQ 提供了以下子策略来移除:

  • OldestMessageEvictionStrategy:移除旧消息,默认策略
  • OldestMessageWithLowestPriorityEvictionStrategy:旧数据中权重较低的消息,将会被移除
  • UniquePropertyMessageEvictionStrategy:移除具有指定 property 的旧消息
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    <!-- 结合上文的 pendingMessageLimitStrategy 一起理解 -->
    <policyEntry topic="ORDER.WEIGHT.>">
    <pendingMessageLimitStrategy>
    <constantPendingMessageLimitStrategy limit="1000"/>
    </pendingMessageLimitStrategy>
    <!-- 对于前缀为 ‘ORDER.WEIGHT.’ 的 topic,最多只保留1000条消息,超出时,将 ORDER 值相同的消息列表中较旧的消息移除(只保留最新的一条消息) -->
    <messageEvictionStrategy>
    <uniquePropertyMessageEvictionStrategy propertyName="ORDER" />
    </messageEvictionStrategy>
    </policyEntry>
7. PendingQueueStoragePolicy:待消费消息策略(面向慢消费者)

当通道中存在大量的慢消费者时,此时便会产生大量的Pending Message(待消费消息)。对于这些Pending Message,ActiveMQ提供了几种Cursor来保存。

  • vmQueueCursor:将待转发消息保存在额外的内存(JVM linkeList)的存储结构中。是“非持久化消息”的默认设置,如果Broker不支持Persistent,它是任何类型消息的默认设置。有OOM风险。
  • fileQueueCursor:将消息保存到临时文件中。文件存储方式有 broker 的 tempDataStore 属性决定。是“持久化消息”的默认设置。
  • storeCursor:“综合”设置,对于非持久化消息,将采用vmQueueCursor存储,对于持久化消息采用fileQueueCursor。这是强烈推荐的策略,也是效率最好的策略。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    <!-- 内存限制为512M,如果超过阀值,则转存 -->
    <policyEntry queue=">" producerFlowControl="true" memoryLimit="512mb">
    <pendingQueuePolicy>
    <storeCursor>
    <nonPersistent>
    <fileQueueCursor/>
    </nonPersistent>
    </storeCursor>
    </pendingQueuePolicy>
    </policyEntry>
systemUsage(存储配置)

用于设置信息在内存、磁盘中占用的大小

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<systemUsage>
<systemUsage>
<!-- memoryUsage 表示 MQ 使用的内存,这个值要大于等于 destinationPolicy 中设置的所有队列的内存之和 -->
<memoryUsage>
<memoryUsage limit="20 mb"/>
</memoryUsage>
<!-- storeUsage 表示持久化存储文件的大小 -->
<storeUsage>
<storeUsage limit="1 gb"/>
</storeUsage>
<!-- tempUsage 表示非持久化消息存储的临时内存大小 -->
<tempUsage>
<tempUsage limit="100 mb"/>
</tempUsage>
</systemUsage>
</systemUsage>
主备配置

ActiveMQ 主备主要分三种: Master/Slave文件共享方式数据库共享方式
下面将介绍这三种方式的区别:

1. Master/Slave

Master/Slave 方式不需要对 Master Broker 做特殊的配置,只要在 Slave Broker 中指定 Master 即可。其中,指定 Master 有两种方式:

  • 方法1:在 broker 节点中添加 :
    1
    masterConnectorURI=”tcp://localhost:61616″
  • 方法2:添加 services 节点,并指定连接的用户名和密码,配置如下:
    1
    2
    3
    <services>
    <masterConnector remoteURI= "tcp://localhost:61616" userName="system" password="manager"/>
    </services>
    注: 纯 Master/Slave 只允许一个 Slave 连接到 Master 上(即只可运行2台 MQ 的集群),若 Master 夯机,需要停止 Slave 来恢复负载。
2. 数据库共享方式

采用数据库做消息的持久化,支持多个 Slave,所有 broker 持久化数据源配置成同一个数据源,多个 slave 竞争同一个数据库锁 ,拿到数据库锁的 slave 会变成 master ;当 master 挂了之后,其中的一个 slave 将会立刻获得数据库锁成为 master,由于采用的是数据库做为持久化,它的性能是有限的

3. 文件共享方式

文件共享方式与数据库共享方式类似,区别在于 broker 的持久化采用的是文件,slave 等待获取的锁是文件锁,它具有更高的性能,但是需要文件共享系统的支持。
Window下共享KahaDB持久化的目录,配置如下:

1
2
3
<persistenceAdapter>
<kahaDB directory="//172.16.1.202/mqdata/kahadb"/>
</persistenceAdapter>
负载均衡配置

ActiveMQ 可以实现多个 mq 之间进行路由,假设有两个 mq,分别为 brokerA 和 brokerB,当有一条消息发送到 brokerA 的队列 test 中,有一个客户端连接到 brokerB 上,并且要求获取 test 队列的消息时,brokerA 中队列 test 的消息就会路由到brokerB上, 反之brokerB 的消息也会路由到 brokerA。
负载均衡的静态路由的配置如下:

1
2
3
<networkConnectors>
<networkConnector uri="static:(tcp://host1:61616,tcp://host2:61616,tcp://..)"/>
</networkConnectors>

networkConnector 属性的官方链接:http://activemq.apache.org/networks-of-brokers.html

持久化配置

持久化属性是 persistent (默认 true ),同时发送的消息也应该是 persitent 类型。
ActiveMQ 消息持久化有三种方式:AMQKahaDBJDBC
下面对上述三种持久化方式进行解释。

1. AMQ

AMQ 是一种文件存储形式,具有写入速度快和容易恢复的特点。消息存储在文件中,文件的默认大小为32兆,如果一条消息的大小超过了32 兆,那么这个值必须设置大点。当一个存储文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。默认配置如下:

1
2
3
<persistenceAdapter>
<amqPersistenceAdapter directory="activemq-data" maxFileLength="32mb"/>
</persistenceAdapter>
2. KahaDB

KahaDB 是基于文件的本地数据库储存形式,虽然没有 AMQ 的速度快,但是它具有强扩展性,恢复的时间比 AMQ 短,从 5.4 版本之后KahaDB 做为默认的持久化方式。默认配置如下:

1
2
3
<persistenceAdapter>
<kahaDB directory="activemq-data" journalMaxFileLength="32mb"/>
</persistenceAdapter>
3. JDBC

配置JDBC适配器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 <persistenceAdapter>
<!--- dataSource 指定持久化数据库的 bean 标签,createTablesOnStartup 是否在启动的时候创建数据表,默认值是 true -->
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false" />
</persistenceAdapter>
```
**JDBC 具体的持久化配置是在 `bean` 标签中定义**

下面介绍基于 MYSQL、SQL Server、Oracle、DB2 的持久化配置方案:
* MYSQL 持久化配置如下:
```xml
<!--- 这里的 mysql-ds 与 persistenceAdapter.jdbcPersistenceAdapter.dataSource 对应 -->
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="activemq"/>
<property name="password" value="activemq"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
  • SQL Server 的持久化配置

    1
    2
    3
    4
    5
    6
    7
    <bean id="mssql-ds" class="net.sourceforge.jtds.jdbcx.JtdsDataSource" destroy-method="close">
    <property name="serverName" value="SERVERNAME"/>
    <property name="portNumber" value="PORTNUMBER"/>
    <property name="databaseName" value="DATABASENAME"/>
    <property name="user" value="USER"/>
    <property name="password" value="PASSWORD"/>
    </bean>
  • Oracle 的持久化配置

    1
    2
    3
    4
    5
    6
    7
    8
    <bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>
    <property name="url" value="jdbc:oracle:thin:@10.53.132.47:1521:activemq"/>
    <property name="username" value="activemq"/>
    <property name="password" value="activemq"/>
    <property name="maxActive" value="200"/>
    <property name="poolPreparedStatements" value="true"/>
    </bean>
  • DB2 的持久化配置:

    1
    2
    3
    4
    5
    6
    7
    8
    <bean id="db2-ds" class="org.apache.commons.dbcp.BasicDataSource"  destroy-method="close">
    <property name="driverClassName" value="com.ibm.db2.jcc.DB2Driver"/>
    <property name="url" value="jdbc:db2://hndb02.bf.ctc.com:50002/activemq"/>
    <property name="username" value="activemq"/>
    <property name="password" value="activemq"/>
    <property name="maxActive" value="200"/>
    <property name="poolPreparedStatements" value="true"/>
    </bean>