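ActiveMQ has many features, including language support, clustering, JMX monitoring, messaging, an embedded broker, persistence, and DB support.
Instead of using the ActiveMQ server that ships with the distribution, I implemented a server myself using the embedded broker, clustering, and messaging features, and tried several of them hands-on.
Version: activeMQ-all-5.15.2.jar
JDK: jdk 1.8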
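1. Embedded Broker
I implemented the server by creating the Broker object directly. (In fact, once you have gone as far as using xbean, configuring it through Spring should not be a problem.)
(1) Creating and configuring the Broker object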
import java.net.URI;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;

public class MQServer {
    public static void main(String[] args) {
        try {
            // An embedded broker can also be created by using ActiveMQConnectionFactory with a VM connector URI:
            // ConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
            // BrokerService broker = BrokerFactory.createBroker("broker:()/master");
            BrokerService broker = new BrokerService();
            broker.setBrokerName("Broker1");
            broker.setUseJmx(true); // set to true or false as needed
            broker.setPersistent(true);
            // broker.addConnector("tcp://localhost:61616");
            // Useful when start() blocks while waiting for the store lock (e.g. when starting a slave).
            TransportConnector conn = new TransportConnector();
            conn.setUri(new URI("tcp://localhost:61616"));
            // Send broker cluster updates to clients connected via failover://
            // conn.setUpdateClusterClients(true);
            broker.addConnector(conn);
            broker.start();
        } catch (Exception e1) {
            e1.printStackTrace();
        }
    }
}
=> With plain Java code you can create the Broker object directly and configure JMX, the TransportConnector port that messages are sent over, persistence, and so on.
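One quick way to confirm that the TransportConnector is actually accepting connections is to open a plain TCP socket to it. The sketch below is not part of the original server: `PortCheck` and `isListening` are hypothetical helper names, and the host and port are assumed to match the `tcp://localhost:61616` connector configured above. It only checks that something is listening on the port, not that it speaks OpenWire.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    public static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed connector address; adjust to whatever the broker was started with
        System.out.println("61616 listening: " + isListening("localhost", 61616, 1000));
    }
}
```

Run it once while MQServer is up and once after stopping it to see the difference.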
(2) Configuring the Broker via xbean
<config.xml>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
<broker useJmx="true" xmlns="http://activemq.apache.org/schema/core">
<persistenceAdapter>
<kahaDB directory="/123/sharedBrokerData" />
<!-- <levelDB directory="/sharedFileSystem/sharedBrokerData"/> -->
<!-- <amqPersistenceAdapter directory="/sharedFileSystem/sharedBrokerData"/> -->
</persistenceAdapter>
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" producerFlowControl="true" memoryLimit="5mb">
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" />
</deadLetterStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="420 mb" />
</memoryUsage>
<storeUsage>
<storeUsage limit="1 gb" />
</storeUsage>
<tempUsage>
<tempUsage limit="250 mb" />
</tempUsage>
</systemUsage>
</systemUsage>
<transportConnectors> <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector uri="tcp://localhost:61636" />
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
<!-- <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" /> -->
</transportConnectors> <!-- destroy the spring context on shutdown to stop jetty -->
<plugins>
<destinationsPlugin location="/workspace/destinations" />
</plugins>
</broker>
</beans>
=> The Broker configuration, defined in XML.
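Because the connector URIs sit inside XML attributes, a `&` between query parameters has to be written as `&amp;`, otherwise the file is not well-formed XML and cannot be parsed. The following sketch uses only the JDK's built-in XML parser to check a fragment. `XmlWellFormedCheck` and `isWellFormed` are hypothetical names introduced for illustration, and the check covers well-formedness only, not the ActiveMQ schema.

```java
import java.io.StringReader;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;

import org.xml.sax.InputSource;
import org.xml.sax.helpers.DefaultHandler;

public class XmlWellFormedCheck {

    /** Returns true if the given string parses as well-formed XML. */
    public static boolean isWellFormed(String xml) {
        try {
            DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
            // DefaultHandler rethrows fatal errors instead of printing them to stderr
            builder.setErrorHandler(new DefaultHandler());
            builder.parse(new InputSource(new StringReader(xml)));
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String raw = "<transportConnector uri=\"tcp://0.0.0.0:61616"
                + "?maximumConnections=1000&wireFormat.maxFrameSize=104857600\"/>";
        String escaped = raw.replace("&", "&amp;");
        System.out.println("raw &   : " + isWellFormed(raw));
        System.out.println("escaped : " + isWellFormed(escaped));
    }
}
```

The raw variant should be rejected and the escaped one accepted, which is a cheap check to run before handing a config file to the broker.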
<MQServer_xBean.java>
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.xbean.XBeanBrokerFactory;

public class MQServer_xBean {
    public static void main(String[] args) {
        try {
            // Read the XML file directly and start the Broker from it
            BrokerService broker = new XBeanBrokerFactory().createBroker(
                    new URI("xbean:" + MQServer_xBean.class.getResource("").getPath() + "../../conf/activeMQ.xml"));
            broker.start();
        } catch (URISyntaxException e) {
            // The xbean URI could not be parsed
            e.printStackTrace();
        } catch (Exception e) {
            // The Broker could not be created or started
            e.printStackTrace();
        }
    }
}
=> If everything above is configured correctly, you can confirm that the server starts up.
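Concatenating `getResource("").getPath()` with a relative path works when the classes are loaded from a plain directory, but it may be fragile if the path contains spaces or the classes are packaged in a jar. One alternative, sketched here on the assumption that the config file is addressed by its file system path, is to build the `xbean:` URI from a `java.nio.file.Path`. `XBeanUri` and `forFile` are hypothetical helper names, and `conf/activeMQ.xml` is an assumed location relative to the working directory.

```java
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;

public class XBeanUri {

    /** Builds an "xbean:file:..." URI for the given config file path. */
    public static URI forFile(Path configFile) {
        // normalize() removes "." and ".." segments; toUri() percent-encodes spaces
        URI fileUri = configFile.toAbsolutePath().normalize().toUri();
        return URI.create("xbean:" + fileUri);
    }

    public static void main(String[] args) {
        // Assumed layout: a conf directory under the working directory
        System.out.println(forFile(Paths.get("conf", "activeMQ.xml")));
    }
}
```

The resulting URI can be passed to `new XBeanBrokerFactory().createBroker(...)` in place of the concatenated string.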