테스트
- 다음과 같은 목표로 구현하였습니다.
- Broker3은 Network Connector를 통해 Broker1, Broker2와 연결되어 있다.
- Broker3은 master, Broker1과 Broker2는 slave 브로커로 구성한다.
시나리오
-> Broker3에 연결된 Producer가 메시지를 생산한다면, Broker1 과 2에 연결된 Consumer들 또한 메시지를 소비할 수 있어야 한다.
<Server>
/**
 * Starts three embedded ActiveMQ brokers in a single JVM for a network-of-brokers test.
 *
 * <ul>
 *   <li>Broker1 accepts clients on {@code tcp://localhost:61616}.</li>
 *   <li>Broker2 accepts clients on {@code tcp://localhost:61617}.</li>
 *   <li>Broker3 accepts clients on {@code tcp://localhost:61602} and opens a static
 *       network connector towards Broker1 and Broker2.</li>
 * </ul>
 *
 * <p>All three brokers share one JMX {@link ManagementContext} and enable the
 * "consumed" advisory for the destination named {@code TEST} (as topic and as queue).
 *
 * <p>Each broker gets its own KahaDB adapter and its own store directory under
 * {@code ActiveMQ_Persistence/<brokerName>}. The three brokers are active at the same
 * time, so they must not write to a single shared store with locking switched off;
 * a shared store is only meant for shared-storage master/slave, where the store lock
 * guarantees that one broker at a time is active (see the ActiveMQ documentation).
 */
public class MQServer {

    /** Destination name the advisory policy entries are registered for. */
    private static final String DESTINATION_NAME = "TEST";

    /** Parent directory that holds one KahaDB store directory per broker. */
    private static final String PERSISTENCE_ROOT = "ActiveMQ_Persistence";

    /**
     * Creates, wires and starts Broker1, Broker2 and Broker3 in that order.
     *
     * @param args unused
     * @throws Exception if any broker cannot be created or started; brokers that were
     *                   already started are stopped before the exception propagates
     */
    public static void main(String[] args) throws Exception {
        ManagementContext managementContext = createManagementContext();

        // Brokers are tracked so that a failure half-way through does not leave a
        // partial topology running in the background.
        List<BrokerService> trackedBrokers = new ArrayList<>();
        try {
            BrokerService broker1 = createBroker("broker:()/slave", "Broker1", managementContext);
            broker1.addConnector(createConnector("tcp://localhost:61616"));
            trackedBrokers.add(broker1);
            broker1.start();

            BrokerService broker2 = createBroker("broker:()/slave", "Broker2", managementContext);
            // NOTE(review): this looks like a legacy pure master/slave setting and
            // presumably has no effect in this topology; kept as in the original - confirm.
            broker2.setShutdownOnMasterFailure(true);
            broker2.addConnector(createConnector("tcp://localhost:61617"));
            trackedBrokers.add(broker2);
            broker2.start();

            // Static discovery of Broker1 and Broker2. The URI options are passed through
            // unchanged: maxReconnectDelay=5000 and useExponentialBackOff=false.
            String[] networkConnectorURIs = {
                "static:(tcp://localhost:61616,tcp://localhost:61617)?maxReconnectDelay=5000&useExponentialBackOff=false" };
            String[] transportConnectorURIs = { "tcp://localhost:61602" };

            BrokerService broker3 = createBroker("broker:()/master", "Broker3", managementContext);
            broker3.setTransportConnectorURIs(transportConnectorURIs);
            broker3.setNetworkConnectorURIs(networkConnectorURIs);
            // Network connectors are normally started sequentially; request asynchronous start.
            broker3.setNetworkConnectorStartAsync(true);
            trackedBrokers.add(broker3);
            broker3.start();
        } catch (Exception startupFailure) {
            stopAll(trackedBrokers);
            throw startupFailure;
        }
    }

    /**
     * Builds the JMX management context that all three brokers share.
     *
     * @return a context configured for connector path {@code /jmxrmi2} on port 1099
     */
    private static ManagementContext createManagementContext() {
        ManagementContext context = new ManagementContext();
        context.setConnectorPath("/jmxrmi2");
        context.setConnectorPort(1099);
        // NOTE(review): "Sever" is probably a typo for "Server". The value is left
        // untouched because JMX clients may already look up this domain name.
        context.setJmxDomainName("ActiveMQ Sever");
        context.setUseMBeanServer(true);
        return context;
    }

    /**
     * Creates a persistent, JMX-enabled broker with advisory support and its own store.
     *
     * @param brokerUri         broker URI handed to {@link BrokerFactory#createBroker(String)}
     * @param brokerName        name assigned to the broker; also used as its store directory name
     * @param managementContext shared JMX management context
     * @return the configured broker, not yet started and without any connector
     * @throws Exception if the broker cannot be created or the persistence adapter is rejected
     */
    private static BrokerService createBroker(String brokerUri, String brokerName,
            ManagementContext managementContext) throws Exception {
        BrokerService broker = BrokerFactory.createBroker(brokerUri);
        broker.setBrokerName(brokerName);
        broker.setManagementContext(managementContext);
        broker.setUseJmx(true);
        broker.setAdvisorySupport(true);
        broker.setDestinationPolicy(createAdvisoryPolicy());
        broker.setPersistent(true);
        broker.setPersistenceAdapter(createPersistenceAdapter(brokerName));
        return broker;
    }

    /**
     * Creates a KahaDB adapter that stores data in {@code ActiveMQ_Persistence/<brokerName>}.
     * Store locking is left at the adapter's default instead of being disabled, because
     * the directory is no longer shared between brokers.
     */
    private static KahaDBPersistenceAdapter createPersistenceAdapter(String brokerName) {
        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
        adapter.setDirectory(new File(PERSISTENCE_ROOT, brokerName));
        return adapter;
    }

    /** Builds a policy map that enables the "consumed" advisory for topic and queue TEST. */
    private static PolicyMap createAdvisoryPolicy() {
        PolicyEntry topicEntry = new PolicyEntry();
        topicEntry.setAdvisoryForConsumed(true);
        topicEntry.setTopic(DESTINATION_NAME);

        PolicyEntry queueEntry = new PolicyEntry();
        queueEntry.setAdvisoryForConsumed(true);
        queueEntry.setQueue(DESTINATION_NAME);

        List<Object> entries = new ArrayList<>();
        entries.add(topicEntry);
        entries.add(queueEntry);

        PolicyMap policyMap = new PolicyMap();
        policyMap.setPolicyEntries(entries);
        return policyMap;
    }

    /** Creates a transport connector bound to the given URI. */
    private static TransportConnector createConnector(String uri) throws Exception {
        TransportConnector connector = new TransportConnector();
        connector.setUri(new URI(uri));
        return connector;
    }

    /**
     * Stops the given brokers in reverse order. A failure to stop one broker is
     * reported and does not prevent the remaining brokers from being stopped.
     */
    private static void stopAll(List<BrokerService> brokers) {
        for (int i = brokers.size() - 1; i >= 0; i--) {
            try {
                brokers.get(i).stop();
            } catch (Exception stopFailure) {
                stopFailure.printStackTrace();
            }
        }
    }
}
=> 서버를 실행하면 Broker3은 61602 포트에서, Broker1과 Broker2는 각각 61616, 61617 포트에서 클라이언트 연결을 받을 준비가 됩니다. 또한 Broker3이 Broker1, Broker2와 Network Connector로 연결된 것을 확인할 수 있습니다.
<Producer>
-> failover 프로토콜은 연결이 실패하면 URI 목록에 있는 다음 주소로 자동으로 재연결을 시도합니다.
public class Text_Send {
// URL of the JMS server. DEFAULT_BROKER_URL will just mean that JMS server is
// on localhost
//private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
///61613 에 실패
private static String url = "failover:(tcp://localhost:61602)?randomize=false";
// default broker URL is : tcp://localhost:61616"
private static String subject = "TEST"; // Queue Name.You can create any/many queue names as per your
// requirement.
public static void main(String[] args) throws JMSException {
// Getting JMS connection from the server and starting it
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
connection.start();
// Creating a non transactional session to send/receive JMS message.
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Destination represents here our queue 'JCG_QUEUE' on the JMS server.
// The queue will be created automatically on the server.
//Destination destination = session.createQueue(subject);
Destination destination = session.createTopic(subject);
// MessageProducer is used for sending messages to the queue.
MessageProducer producer = session.createProducer(destination);
for(int i=0; i<100; i++) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// We will send a small text message saying 'Hello World!!!'
//TextMessage message = session.createTextMessage("Hello !!!Send Message 1 Test");
TextMessage message = session.createTextMessage("Hello !!!Send Message 1 Test");
// Here we are sending our message!
producer.send(message);
System.out.println("JMS printing@@ '" + message.getText() + "'");
}
connection.close();
}
<Consumer>
public class Receive2 {
// URL of the JMS server
// private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private static String url ="failover:(tcp://localhost:61617,tcp://localhost:61616,tcp://localhost:61602)";
// default broker URL is : tcp://localhost:61616"
// Name of the queue we will receive messages from
private static String subject = "TEST";
public static void main(String[] args) throws JMSException {
// Getting JMS connection from the server
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
connection.start();
// Creating session for seding messages
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Getting the queue 'JCG_QUEUE'
//Destination destination = session.createQueue(subject);
Destination destination = session.createTopic(subject);
// MessageConsumer is used for receiving (consuming) messages
MessageConsumer consumer = session.createConsumer(destination);
while(true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// Here we receive the message. /// block mode
Message message = consumer.receive();
// We will be using TestMessage in our example. MessageProducer sent us a
// TextMessage
// so we must cast to it to get access to its .getText() method.
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("Received message '" + textMessage.getText() + "'");
}
}
}
=> 차례로 실행하면 61602 포트로 Broker3에 생산된 메시지를, Broker1(61616 포트) 또는 Broker2(61617 포트)에 연결된 Consumer가 소비할 수 있습니다.
<결과>
간단하게 브로커를 직접 만들고 Clustering 기술을 활용하여 서버를 구현하고 테스트하는 실습을 하였습니다.
이외에도 JMX monitoring, HermesJMS 같은 third party tool과의 연계, Java 이외에 C, C++ 클라이언트와의 연결, advisory message, Object message, blob message 등의 특성을 활용할 수 있습니다.