테스트



- 다음과 같은 목표로 구현하였습니다.

- Broker1과  Broker2는 Broker3과 NetworkConnection되어있다. 

- Broker3은 Master 1,2는 slave 브로커로 구성한다.



시나리오

-> Broker3에 연결된 Producer가 메시지를 생산한다면, Broker1 과  2에 연결된 Consumer들 또한 메시지를 소비할 수 있어야 한다. 




<Server>


public class MQServer {


public static void main(String[] args) {


// TODO Auto-generated method stub


//persistence & master slave 

// DB 및 Persistence 설정

KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();

// LevelDBPersistenceAdapter persistenceAdapter = new LevelDBPersistenceAdapter();

persistenceAdapter.setDirectory(new File("ActiveMQ_Persistence","shared"));

persistenceAdapter.setUseLock(false);

// jmx managementcontext 생성

ManagementContext managementContext = new ManagementContext();

managementContext.setConnectorPath("/jmxrmi2");

//managementContext.setMBeanServer();

managementContext.setConnectorPort(1099);

managementContext.setJmxDomainName("ActiveMQ Sever");

managementContext.setUseMBeanServer(true);

// 대형 메시징 패브릭의 대규모 확장 성을 제공하기 위해 일반적으로 많은 브로커를 네트워크에 연결하여 모든 논리적으로 연결된 클라이언트를

// 원하는만큼 보유 할 수 있습니다. 필요에 따라 많은 메시지 브로커를 실행할 수 있습니다.

try {

//BrokerService broker = new BrokerService();

BrokerService broker3 = BrokerFactory.createBroker("broker:()/master");

BrokerService broker = BrokerFactory.createBroker("broker:()/slave");

BrokerService broker2 = BrokerFactory.createBroker("broker:()/slave");


//jmx

broker.setManagementContext(managementContext);

broker2.setManagementContext(managementContext);

broker3.setManagementContext(managementContext);

///advisory

List<Object> entries = new ArrayList<>();

PolicyEntry entry1 = new PolicyEntry();

entry1.setAdvisoryForConsumed(true);

entry1.setTopic("TEST");

PolicyEntry entry2 = new PolicyEntry();

entry2.setAdvisoryForConsumed(true);

entry2.setQueue("TEST");

entries.add(entry1);

entries.add(entry2);

PolicyMap policyMap = new PolicyMap();

policyMap.setPolicyEntries(entries);

// advisory Message setting

broker.setAdvisorySupport(true);

broker.setDestinationPolicy(policyMap);

broker2.setAdvisorySupport(true);

broker2.setDestinationPolicy(policyMap);

broker3.setAdvisorySupport(true);

broker3.setDestinationPolicy(policyMap);

try {


// broker.addConnector("tcp://localhost:61616");

// java Management Extention


broker.setBrokerName("Broker1");

broker.setUseJmx(true); // check true or false

broker.setPersistent(true);

// broker.addConnector("tcp://localhost:61616");


/// start() 메소드가 저장소 잠금 보류를 차단할 때 유용합니다 (예 : 슬레이브 시작).

TransportConnector conn = new TransportConnector();

conn.setUri(new URI("tcp://localhost:61616"));


// failover:// 브로커 클러스터에 대한 클라이언트 연결을 업데이트

// conn.setUpdateClusterClients(true);

broker.setPersistenceAdapter(persistenceAdapter);

broker.addConnector(conn);

broker.start();

broker2.setBrokerName("Broker2");

broker2.setUseJmx(true); // check true or false

broker2.setPersistent(true);

broker2.setShutdownOnMasterFailure(true);

//broker2.addConnector("tcp://172.10.11.20:61617");

TransportConnector conn2 = new TransportConnector();

conn2.setUri(new URI("tcp://localhost:61617"));

broker2.setPersistenceAdapter(persistenceAdapter);

broker2.addConnector(conn2);

broker2.start();


///////////////////////////////// Network Connect 사용


// String networkConnectorURIs[] =

// {"static:(tcp://localhost:61617)","static:(tcp://localhost:61616)"};


/// 속성

// property default description


// initialReconnectDelay 1000 재접속을 시도하기까지 대기 할 시간 (ms) (useExponentialBackOff가

// false 인 경우)

// maxReconnectDelay 30000 다시 연결을 시도하기 전에 대기 할 시간 (ms)

// useExponentialBackOff true 재 연결 시퀀스의 모든 실패에 대해 재 연결 사이의 시간을 증가시킵니다.

// backOffMultiplier 2 대기 시간 증가 지수에 곱함 useExponentialBackOff true이면


// 정적 발견

/* String networkConnectorURIs[] = {

"static:(tcp://172.10.11.20:61617,tcp://172.10.11.20:61616)?maxReconnectDelay=5000&useExponentialBackOff=false" };

*/

/// static == masterslave

String networkConnectorURIs[] = {

"static:(tcp://localhost:61616,tcp://localhost:61617)?maxReconnectDelay=5000&useExponentialBackOff=false" };


String transportConnectorURIs[] = { "tcp://localhost:61602" };


broker3.setBrokerName("Broker3");

broker3.setPersistent(true);

broker3.setUseJmx(true);

broker3.setPersistenceAdapter(persistenceAdapter);

broker3.setTransportConnectorURIs(transportConnectorURIs);

broker3.setNetworkConnectorURIs(networkConnectorURIs);

//// persistenceAdapter share file system

// broker3.deleteAllMessages();


//


// // 일반적으로 네트워크 브로커 시작은 순차적으로 시작함 . 비동기 시작을 위한 설정.

broker3.setNetworkConnectorStartAsync(true);

broker3.start();



} catch (Exception e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

} catch (Exception e1) {

// TODO Auto-generated catch block

e1.printStackTrace();

}

}


}


=> 서버를 실행하면 61602포트는 Broker3의 메시지 송수신 포트, 61616, 61617 포트는 각각  Broker1,2 와 커넥션 할 준비가 되어있습니다. 또한 서버 내 Broker3은 Network 연결된 것을 확인할 수 있습니다.








<Producer>

-> failover Protocol은 커넥션이 실패한다면 자동으로 다음 로컬주소를 발견하여 커넥션 시도를 합니다. 

public class Text_Send {


// URL of the JMS server. DEFAULT_BROKER_URL will just mean that JMS server is

// on localhost

//private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

///61613 에 실패

private static String url = "failover:(tcp://localhost:61602)?randomize=false";

// default broker URL is : tcp://localhost:61616"

private static String subject = "TEST"; // Queue Name.You can create any/many queue names as per your

// requirement.


public static void main(String[] args) throws JMSException {

// Getting JMS connection from the server and starting it

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

Connection connection = connectionFactory.createConnection();


connection.start();


// Creating a non transactional session to send/receive JMS message.

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);


// Destination represents here our queue 'JCG_QUEUE' on the JMS server.

// The queue will be created automatically on the server.

//Destination destination = session.createQueue(subject);

Destination destination = session.createTopic(subject);

// MessageProducer is used for sending messages to the queue.

MessageProducer producer = session.createProducer(destination);


for(int i=0; i<100; i++) {

try {

Thread.sleep(2000);

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

// We will send a small text message saying 'Hello World!!!'

//TextMessage message = session.createTextMessage("Hello !!!Send Message 1 Test");

TextMessage message = session.createTextMessage("Hello !!!Send Message 1 Test");

// Here we are sending our message!

producer.send(message);

System.out.println("JMS printing@@ '" + message.getText() + "'");

}


connection.close();

}



<Consumer>


public class Receive2 {


// URL of the JMS server

// private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

private static String url ="failover:(tcp://localhost:61617,tcp://localhost:61616,tcp://localhost:61602)";

// default broker URL is : tcp://localhost:61616"


// Name of the queue we will receive messages from

private static String subject = "TEST";


public static void main(String[] args) throws JMSException {

// Getting JMS connection from the server

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

Connection connection = connectionFactory.createConnection();

connection.start();


// Creating session for seding messages

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);


// Getting the queue 'JCG_QUEUE'

//Destination destination = session.createQueue(subject);

Destination destination = session.createTopic(subject);

// MessageConsumer is used for receiving (consuming) messages

MessageConsumer consumer = session.createConsumer(destination);

while(true) {

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

// Here we receive the message. /// block mode

Message message = consumer.receive();


// We will be using TestMessage in our example. MessageProducer sent us a

// TextMessage

// so we must cast to it to get access to its .getText() method.

if (message instanceof TextMessage) {

TextMessage textMessage = (TextMessage) message;

System.out.println("Received message '" + textMessage.getText() + "'");

}


}

}



=> 차례로 실행하면 61602 포트로 Broker3에 생산된 메시지를 , Broker2에 61616 포트로 연결된 Consumer가 메시지를 소비할 수 있습니다.


<결과>





간단하게 브로커를 직접 만들고 Clustering 기술을 활용하여 서버를 구현하고 테스트하는 실습하였습니다.

이외에도 jmx monitoring , HermesJMS 같은 third party tool과 연계, java 이외에 c, c++ 클라이언트와의 연결 advisory message, Object message, blob message 등의 특성을 활용할 수 있습니다.



'Cloud & NoSQL & Middleware > ActiveMQ' 카테고리의 다른 글

#activeMQ (5) - Clustering  (0) 2018.03.03
#activeMQ (4) - Broker  (0) 2018.03.02
#activeMQ (3) - ActiveMQ란?  (0) 2018.03.01
#activeMQ (2) - 용어  (0) 2018.02.25
#activeMQ (1) - 버전 정보  (0) 2018.02.24


2.  Clustering


우선 클러스터링에 대해 정의하면, 여러 대의 서버가 하나의 서버가 처리하는 것처럼 병렬 처리 된 서버들 간의 확립된 연결(Establishing Connectivity) 입니다.

더 간단히 요약하면, 군집화 혹은 서버 이중화로 정의하면 되겠습니다.


클러스터링은 장애시스템 조치(fail-over), 로드 밸런싱(부하 분산) 시스템에 병렬 처리 가능한 기술입니다. 








- activeMQ에서는 클러스터링 기술을 적용하기 위해 Broker의 Connector, Persistence, Network of Broker, Master/Slave 특징을 먼저 살펴봐야 합니다.





(1) Connector


- activeMQ에는 Transport Connection과 Network Connection 2가지가 종류가 있다.


 

Transport Connection : 브로커와 클라이언트간 메시지 송수신을 위한 전송 커넥션입니다.

Network Connection: 브로커와 브로커간 서버 클러스터링을 구현하기 위해 필요한 커넥션입니다.








(2) Persistence


- 지속성은 Master Slave 특징과 관련있습니다.

- 브로커 대기열의 데이터의 지속적인 전송을 유지하기 위해 DB 및 file System에 데이터를 저장하고 공유합니다.


( 버전별 라이브러리 사용)

ActiveMQ 5.9 – Replicated LevelDB Store (Apaach Zookeeper)

ActiveMQ 5.8 – LevelDB Store 

ActiveMQ 5.3 – KahaDB

ActiveMQ 4 - JDBC



(3) Network Of Broker


- 아래와 같은 2가지 방법으로 구현할 수 있습니다.



- networkConnector 요소 사용  //정적

String networkConnectorURIs[] = {

"static:(tcp://localhost:61616,tcp://localhost:61617)?maxReconnectDelay=5000&useExponentialBackOff=false" };


String transportConnectorURIs[] = { "tcp://localhost:61602" };


broker3.setBrokerName("Broker3");

broker3.setPersistent(true);

broker3.setUseJmx(true);

broker3.setPersistenceAdapter(persistenceAdapter);

broker3.setTransportConnectorURIs(transportConnectorURIs);

broker3.setNetworkConnectorURIs(networkConnectorURIs);





- Discovery를 사용하여 브로커를 탐지 //동적


List<TransportConnector> transportList = new ArrayList<>();

//

TransportConnector transport = new TransportConnector();

transport.setDiscoveryUri(new URI("multicast://default"));

transport.setUri(new URI("tcp://localhost:0"));

transportList.add(transport);

String networkConnectorURIs[] = {"multicast://default"};

broker3.setBrokerName("Broker3");

broker3.setPersistent(false);

broker3.setUseJmx(false);

broker3.setNetworkConnectorURIs(networkConnectorURIs);

broker3.setTransportConnectors(transportList);




(4)  Master / Slave


MasterSlave : 메시지가 Slave 브로커에 복제되므로 마스터 시스템파일 시스템 또는 데이터 센터의 치명적인 하드웨어 오류가 발생해도 메시지 손실없이 즉시 Slave로 페일 오버가 수행됩니다.

 - pure master/slave

 - shared filesystem master/slave

 - db matser/slave


 pure master/slave 는 5.8 버전 이후로 삭제되었기 때문에, DB 혹은 파일시스템을  공유하는 master/slave를 사용해야 합니다.





Figure


activeMQ에는 language, clustering, jmx monitoring, message, embedded broker, Persistence, DB 등 여러 특징이 있습니다.


그 중 embedded Broker, clustering, Message 등의 특징을 활용하여 제공되는 activeMQ Server 대신 직접 서버를 구현하며 여러 특징들을 실습해보았습니다.


 버전: activeMQ-all-5.15.2.jar

 jdk: jdk 1.8




1.  Embeded Broker


Java에서 activeMQ Broker Server를 직접 구하겠다면 다음과 같이 3가지 방법으로 구현할 수 있겠습니다.
=> Maven(Spring), xBean, Broker 객체를 직접 생성

 이처럼 Broker 객체를 직접 생성하여 서버를 구현해 보았습니다. ( 사실 xbean까지 사용했다면, Spring으로 구성하는 것은 문제 없다고 봅니다.)



(1) Broker 객체 생성 및 설정

public class MQServer {


public static void main(String[] args) {


try {

 

// ActiveMQConnectionFactory를 사용하고 VM 커넥터를 URI로 사용하여 내장 브로커를 만들 수 있습니다.

//BrokerService broker= ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");

//BrokerService broker = BrokerFactory.createBroker("broker:()/master");

 

BrokerService broker = new BrokerService();

broker.setBrokerName("Broker1");

broker.setUseJmx(true); // check true or false

broker.setPersistent(true);

// broker.addConnector("tcp://localhost:61616");


/// start() 메소드가 저장소 잠금 보류를 차단할 때 유용합니다 (예 : 슬레이브 시작).

TransportConnector conn = new TransportConnector();

conn.setUri(new URI("tcp://localhost:61616"));


// failover:// 브로커 클러스터에 대한 클라이언트 연결을 업데이트

// conn.setUpdateClusterClients(true);

broker.addConnector(conn);

broker.start();


} catch (Exception e1) {

// TODO Auto-generated catch block

e1.printStackTrace();

}



=> Java 코드를 활용하여 Broker 객체를 직접 만들고, JMX 설정, 메시지가 전송될 TransprotConnector Port  지정, Persistence 설정 등을 할 수 있습니다.





(2) xbean을 통한 Broker 설정


<config.xml>

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd

  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />

<broker useJmx="true" xmlns="http://activemq.apache.org/schema/core">

<persistenceAdapter>

<kahaDB directory="/123/sharedBrokerData" />

<!-- <levelDB directory="/sharedFileSystem/sharedBrokerData"/> -->

<!-- <amqPersistenceAdapter directory="/sharedFileSystem/sharedBrokerData"/> -->

</persistenceAdapter>


<destinationPolicy>

<policyMap>

<policyEntries>

<policyEntry queue=">" producerFlowControl="true" memoryLimit="5mb">

<deadLetterStrategy>

<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" />

</deadLetterStrategy>

</policyEntry>

</policyEntries>

</policyMap>

</destinationPolicy>

<systemUsage>

<systemUsage>

<memoryUsage>

<memoryUsage limit="420 mb" />

</memoryUsage>

<storeUsage>

<storeUsage limit="1 gb" />

</storeUsage>

<tempUsage>

<tempUsage limit="250 mb" />

</tempUsage>

</systemUsage>

</systemUsage>



<transportConnectors> <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->

<transportConnector uri="tcp://localhost:61636" />

<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />

<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />

<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />

<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />

<!-- <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" /> -->

</transportConnectors> <!-- destroy the spring context on shutdown to stop jetty -->


<plugins>

<destinationsPlugin location="/workspace/destinations" />

</plugins>

</broker>

</beans>


=> xml에서 Broker에 대한 설정.





<MQServer_xBean.java>


public class MQSever_Xbean {

public static void main(String[] args) {

try {



//xml file을 직접 읽어 들어와서 Broker를 실행

BrokerService broker = (new XBeanBrokerFactory()).createBroker(new URI("xbean:"+MQSever_Xbean.class.getResource("").getPath()+"../../conf/activeMQ.xml"));

broker.start();


} catch (URISyntaxException e) {

// TODO Auto-generated catch block

e.printStackTrace();

} catch (Exception e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}



=> 위와 같이 제대로 설정 되었다면 다음과 같이 서버가 구동되는 것을 확인할 수 있다.







activeMQ

  • Apache ActiveMQ 는 가장 대중적이고 강력한 오픈 소스 메세징 그리고 통합 패턴 서버입니다.
  • Apache ActiveMQ는 빠르며, 다양한 언어간의 클라이언트 및 프로토콜을 지원하고, 사용하기 쉬운 엔터프라이즈 통합 패턴 및 많은 고급 기능을 제공하면서 JMS 1.1 J2EE 1.4를 완벽하게 지원합니다.
  • MOM(메시지 지향 미들웨어)입니다.
  • ActiveMQJMS를 지원하는 클라이언트를 포함하는 브로커, 자바 뿐만 아니라 다양한 언어를이용하는시스템간의 통신을할수있게해줍니다또한 클러스터링기능 및 DB 그리고 FileSystem을 통해 각 시스템간의 일관성 및 지속성을 유지 시켜줍니다.
  • 간단히 정의하면  클라이언트 간 메시지를 송수신 할 수 있는 오픈 소스 Broker(JMS 서버)입니다.  



=>즉, activeMQ에서 JMS는 핵심요소입니다.






JMS

  • JMS 는 자바 기반의 MOM(메시지 지향 미들웨어) API 이며 둘 이상의 클라이언트 간의 메시지를 보냅니다.

  • JMS 는 자바 플랫폼엔터프라이즈 에디션(EE) 기반이며메시지 생성송수신읽기를 합니다.  또한 비동기적이며 신뢰할 만하고 느슨하게 연결된 서로 다른 분산 어플리케이션 컴포넌트 간의 통신을 허용합니다.
  • JMS의 핵심 개념은 Message Broker 와 Destination 입니다.  


Message Broker : 목적지에 안전하게 메시지를 건네주는 중개자 역할 

Destination: 목적지에 배달될 2가지 메시지 모델 QUEUE, TOPIC 



  • -  Queue: Point to Point ( Consumer 는 메시지를 받기 위해 경쟁합니다.)

  • -  Topic: Publish to Subscribe







ActiveMQ 메시지 처리 구조


 


 => 기본적으로 Message를 생산하는 Producer, activeMQ Broker(Server), Message를 소비하는 Consumer로  구성되어 있습니다.









- QUEUE 모델의 경우 메시지를 받는 Consumer가  다수일 때 연결된 순서로 메시지는 제공됩니다.

- TOPIC 모델의 경우 메시지를 받는 Consumer가 다수일 때 메시지는 모두에게 제공됩니다.





-  Consumer 에서는 메시지를 받을 때까지 block 상태인  Receive() 메소드와 리스너를 통해 nonblack 상태로 메시지를 가져올 수 있습니다.










ActiveMQ의 장점



(1) 분리: 대기열은 시스템 사이에 있으며, 하나의 시스템 장애는 다른 대기열에 영향을 주지 않습니다. 메시지 통신은 대기열을 통해 이루어집니다. 시스템이 가동 중일 때도 계속 작동합니다.


( 클라이언트와 서버간의 연결과 큐 대기열의 역할이 분리)



(2) 복구 지원: 큐의 처리가 실패하면 나중에 메시지를 복원 할 수 있습니다.



(3) 신뢰성: 클라이언트 요청을 처리하는 시스템을 생각해보십시오.정상적인 경우 시스템은 분당 100 건의 요청을 받습니다. 이 시스템은 요청 수가 평균을 넘어서는 경우 신뢰할 수 없습니다. 이 경우 Queue는 요청을 관리 할 수 있으며 시스템 처리량을 기초로 주기적으로 메시지를 전달할 수 있습니다.


(큐에서 들어온 메시지에 대한 처리를 관리하기 때문에 신뢰할 수 있는 시스템)




(4) 비동기 처리: 클라이언트와 서버 통신이 비 차단입니다. 클라이언트가 서버에 요청을 보내면 응답을 기다리지 않고 다른 작업을 수행 할 수 있습니다. 응답을 받으면 클라이언트는 언제든지 처리 할 수 있습니다.







참고 : http://activemq.apache.org




activeMQ를 살펴보기 전에 필요한 기본적인 용어




- 메시지 지향 미들웨어(Message Oriented Middleware : MOM) 

분산 시스템 간 메시지를 주고 받는 기능을 지원하는 소프트웨어나 하드웨어 인프라

 

- 메시지 큐(Message Queue : MQ)

: MOM을 구현한 시스템

 

- 브로커(Broker)

: Message Queue 시스템

 

- AMQP(Advanced Message Queueing Protocol)

: 메시지 지향 미들웨어를 위한 프로토콜


-JMS(Java Message Service)

: 자바 메시지 서비스(Java Message Service; JMS)는 자바 프로그램이 네트워크를 통해 데이터를 송수신하는 자바 API이다.








=> activeMQ는 JMS로 구현한 메시지 지향 미들웨어(MOM)이다.






activeMQ 버전별 Java 최소 기준


 version

Minimum Java 

 ActiveMQ 4.x

 Java 5

 ActiveMQ 5.0 - 5.7

 Java 5 

 ActiveMQ 5.8 - 5.10

 Java 6 

 ActiveMQ 5.11

 Java 7 

 ActiveMQ 5.15.0

 Java 8  


+ Recent posts