2.  Clustering


우선 클러스터링에 대해 정의하면, 여러 대의 서버가 하나의 서버가 처리하는 것처럼 병렬 처리 된 서버들 간의 확립된 연결(Establishing Connectivity) 입니다.

더 간단히 요약하면, 군집화 혹은 서버 이중화로 정의하면 되겠습니다.


클러스터링은 장애시스템 조치(fail-over), 로드 밸런싱(부하 분산) 시스템에 병렬 처리 가능한 기술입니다. 








- activeMQ에서는 클러스터링 기술을 적용하기 위해 Broker의 Connector, Persistence, Network of Broker, Master/Slave 특징을 먼저 살펴봐야 합니다.





(1) Connector


- activeMQ에는 Transport Connection과 Network Connection 2가지가 종류가 있다.


 

Transport Connection : 브로커와 클라이언트간 메시지 송수신을 위한 전송 커넥션입니다.

Network Connection: 브로커와 브로커간 서버 클러스터링을 구현하기 위해 필요한 커넥션입니다.








(2) Persistence


- 지속성은 Master Slave 특징과 관련있습니다.

- 브로커 대기열의 데이터의 지속적인 전송을 유지하기 위해 DB 및 file System에 데이터를 저장하고 공유합니다.


( 버전별 라이브러리 사용)

ActiveMQ 5.9 – Replicated LevelDB Store (Apaach Zookeeper)

ActiveMQ 5.8 – LevelDB Store 

ActiveMQ 5.3 – KahaDB

ActiveMQ 4 - JDBC



(3) Network Of Broker


- 아래와 같은 2가지 방법으로 구현할 수 있습니다.



- networkConnector 요소 사용  //정적

String networkConnectorURIs[] = {

"static:(tcp://localhost:61616,tcp://localhost:61617)?maxReconnectDelay=5000&useExponentialBackOff=false" };


String transportConnectorURIs[] = { "tcp://localhost:61602" };


broker3.setBrokerName("Broker3");

broker3.setPersistent(true);

broker3.setUseJmx(true);

broker3.setPersistenceAdapter(persistenceAdapter);

broker3.setTransportConnectorURIs(transportConnectorURIs);

broker3.setNetworkConnectorURIs(networkConnectorURIs);





- Discovery를 사용하여 브로커를 탐지 //동적


List<TransportConnector> transportList = new ArrayList<>();

//

TransportConnector transport = new TransportConnector();

transport.setDiscoveryUri(new URI("multicast://default"));

transport.setUri(new URI("tcp://localhost:0"));

transportList.add(transport);

String networkConnectorURIs[] = {"multicast://default"};

broker3.setBrokerName("Broker3");

broker3.setPersistent(false);

broker3.setUseJmx(false);

broker3.setNetworkConnectorURIs(networkConnectorURIs);

broker3.setTransportConnectors(transportList);




(4)  Master / Slave


MasterSlave : 메시지가 Slave 브로커에 복제되므로 마스터 시스템파일 시스템 또는 데이터 센터의 치명적인 하드웨어 오류가 발생해도 메시지 손실없이 즉시 Slave로 페일 오버가 수행됩니다.

 - pure master/slave

 - shared filesystem master/slave

 - db matser/slave


 pure master/slave 는 5.8 버전 이후로 삭제되었기 때문에, DB 혹은 파일시스템을  공유하는 master/slave를 사용해야 합니다.





Figure


activeMQ에는 language, clustering, jmx monitoring, message, embedded broker, Persistence, DB 등 여러 특징이 있습니다.


그 중 embedded Broker, clustering, Message 등의 특징을 활용하여 제공되는 activeMQ Server 대신 직접 서버를 구현하며 여러 특징들을 실습해보았습니다.


 버전: activeMQ-all-5.15.2.jar

 jdk: jdk 1.8




1.  Embeded Broker


Java에서 activeMQ Broker Server를 직접 구하겠다면 다음과 같이 3가지 방법으로 구현할 수 있겠습니다.
=> Maven(Spring), xBean, Broker 객체를 직접 생성

 이처럼 Broker 객체를 직접 생성하여 서버를 구현해 보았습니다. ( 사실 xbean까지 사용했다면, Spring으로 구성하는 것은 문제 없다고 봅니다.)



(1) Broker 객체 생성 및 설정

public class MQServer {


public static void main(String[] args) {


try {

 

// ActiveMQConnectionFactory를 사용하고 VM 커넥터를 URI로 사용하여 내장 브로커를 만들 수 있습니다.

//BrokerService broker= ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");

//BrokerService broker = BrokerFactory.createBroker("broker:()/master");

 

BrokerService broker = new BrokerService();

broker.setBrokerName("Broker1");

broker.setUseJmx(true); // check true or false

broker.setPersistent(true);

// broker.addConnector("tcp://localhost:61616");


/// start() 메소드가 저장소 잠금 보류를 차단할 때 유용합니다 (예 : 슬레이브 시작).

TransportConnector conn = new TransportConnector();

conn.setUri(new URI("tcp://localhost:61616"));


// failover:// 브로커 클러스터에 대한 클라이언트 연결을 업데이트

// conn.setUpdateClusterClients(true);

broker.addConnector(conn);

broker.start();


} catch (Exception e1) {

// TODO Auto-generated catch block

e1.printStackTrace();

}



=> Java 코드를 활용하여 Broker 객체를 직접 만들고, JMX 설정, 메시지가 전송될 TransprotConnector Port  지정, Persistence 설정 등을 할 수 있습니다.





(2) xbean을 통한 Broker 설정


<config.xml>

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd

  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />

<broker useJmx="true" xmlns="http://activemq.apache.org/schema/core">

<persistenceAdapter>

<kahaDB directory="/123/sharedBrokerData" />

<!-- <levelDB directory="/sharedFileSystem/sharedBrokerData"/> -->

<!-- <amqPersistenceAdapter directory="/sharedFileSystem/sharedBrokerData"/> -->

</persistenceAdapter>


<destinationPolicy>

<policyMap>

<policyEntries>

<policyEntry queue=">" producerFlowControl="true" memoryLimit="5mb">

<deadLetterStrategy>

<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" />

</deadLetterStrategy>

</policyEntry>

</policyEntries>

</policyMap>

</destinationPolicy>

<systemUsage>

<systemUsage>

<memoryUsage>

<memoryUsage limit="420 mb" />

</memoryUsage>

<storeUsage>

<storeUsage limit="1 gb" />

</storeUsage>

<tempUsage>

<tempUsage limit="250 mb" />

</tempUsage>

</systemUsage>

</systemUsage>



<transportConnectors> <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->

<transportConnector uri="tcp://localhost:61636" />

<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />

<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />

<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />

<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />

<!-- <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" /> -->

</transportConnectors> <!-- destroy the spring context on shutdown to stop jetty -->


<plugins>

<destinationsPlugin location="/workspace/destinations" />

</plugins>

</broker>

</beans>


=> xml에서 Broker에 대한 설정.





<MQServer_xBean.java>


public class MQSever_Xbean {

public static void main(String[] args) {

try {



//xml file을 직접 읽어 들어와서 Broker를 실행

BrokerService broker = (new XBeanBrokerFactory()).createBroker(new URI("xbean:"+MQSever_Xbean.class.getResource("").getPath()+"../../conf/activeMQ.xml"));

broker.start();


} catch (URISyntaxException e) {

// TODO Auto-generated catch block

e.printStackTrace();

} catch (Exception e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}



=> 위와 같이 제대로 설정 되었다면 다음과 같이 서버가 구동되는 것을 확인할 수 있다.






+ Recent posts