개념


I/O (입출력) - 입력 , 출력 / 두 대상 간의 데이터를 주고 받는 것



Stream -  데이터를 운반 하는데 사용되는 연결통로

         -  연속적인 흐름

         -  입출력을 동시에 수행하려면, 2개의 스트림이 필요하다.





간단한 코드지만 아래의 코드를 살펴보자.


import java.io.FileInputStream;

import java.io.FileNotFoundException;

import java.io.FileOutputStream;

import java.io.IOException;


public class FileTest {


public static void main(String args[]) {


FileInputStream fis = null;

FileOutputStream fos = null;


try {

fis = new FileInputStream("log.txt");

fos = new FileOutputStream("back_log.txt");


int data = 0;

byte[] b = new byte[1];


while ((data = fis.read(b)) != -1) {

System.out.println(data);

fos.write(b);

fos.flush();

}


} catch (FileNotFoundException e) {

// TODO Auto-generated catch block

System.out.println("File Not found");

e.printStackTrace();

} catch (IOException e1) {

// TODO Auto-generated catch block

System.out.println("File Not found");

e1.printStackTrace();

}finally {

if(fos!=null) {

try {

fos.close();

} catch (IOException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

if(fis!=null) {

try {

fis.close();

} catch (IOException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}


}

}




네트워크, 특히 서버에서 예외 처리와 close() 메소드는 매우 중요하다.


만일 close() 메소드를 try 문에 포함시켜서 한다고 가정하자. 


서버에서 예외가 발생하면 스트림은 닫히지 않고 메모리 누수가 발생할 요소가 있다.


꼼꼼한 예외처리와 try catch, finally 는 자바 네트워크에서 중요한 요소이다.





스트림의 종류


바이트 기반 스트림 


  - inputstream , outputstream


  - 1byte 단위




 문자 기반 스트림

 

  - reader, writer

  -  문자 단위














 개념



Process - 실행중일 프로그램.  쓰레드와 자원으로 구성


Thread -   하나의 작업을 실행하는 작업 단위.




interruptedexception - 서버가 도중에 끊키거나 서버에 문제가 생겼을 때 발생하는 예외



Thread 의 스케쥴링은 자바  jvm 내에서 작업의 순서가 저장된다.


- > 작업의 우선순위를 보장받지 못한다는 말과 같다.





 예제를 보면서 이해해보자..


import Thread.Thread1;

import Thread.Thread2;


public class Test {


public static void main(String[] args) {


Thread1 thread1 = new Thread1();

thread1.start();


Thread thread2 = new Thread(new Thread2());

thread2.start();

Thread thread3 = new Thread();

thread3.start();

int sum = 0;


for (int i = 0; i < 10; i++) {

sum = +i;

try {

Thread.sleep(500);

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

System.out.println("Main Thread = " + sum);


}


}

}






코드를 아래와 같은 그림으로 나타 낼 수 있다.








=> 메인 쓰레드에서 쓰레드 3개가 돌고 있다...         쉽다.





또한  setPriority()  메소드를 통해 우선순위를 설정할 수 있다.







멀티쓰레드의 장 단점



 장점

  - 자원을 보다 효율적으로 사용한다.

  - 사용자에 대한 응답성이 향상된다.

  - 작업이 분리되어 코드가 간결해진다.

 단점

  - 동기화에 주의

  - 교착상태가 발생하지 않게 조심 해야한다.

  - 쓰레드가 효율적으로 실행되게끔 고려해야한다.











Thread Groop 


- 관련된 쓰레드를 그룹으로 다루는 것이다.


- 쓰레드 그룹은 반드시 하나 이상 포함된다. 포함하지 않으면 자연스럽게 메인 쓰레드에 포함된다.


- 쓰레드 그룹으로 부터 우선순위를 상속받기 때문에, 같은 쓰레드 그룹에 있는 쓰레드들은 우선순위를 공유한다.. 


(쉽다)










 Daemon Thread



-    어렵게 생각할 것 없다. 데몬은 쓰레드가 계속 실행되고 있는 상태이다.

-    가장 간단하게 while 문으로 무한 루프 돌리면 된다.. 























Netty Framework


네티는 전 세계에서 많은 개발자가 사용하는 범용 자바 네트워크 애플리케이션 프레임워크입니다.


다양한 오픈소스 프레임워크 내부에서 사용되고 있으며, 카카오/라인/애플/트위터 등 서비스 제공 업체에서도 사용됩니다.


네티는 Non-block 비동기 처리가 가능하기에 고성능으로 시스템을 유지할 수 있습니다.


또한, 상당 부분 프로그래머의 귀찮은 작업을 네티가 알아서 처리해줍니다.. ( 멀티 쓰레드 처리와 같은..)


훌륭하게도, 최소 10만 이상의 클라이언트의 접속이 가능합니다. 


Netty의 구조와 주요특징을 정확하게 알고 사용법을 익힌다면 , 효과적으로 네티 프레임워크를 사용할 수 있을 것입니다.




https://netty.io/


네티 공식 사이트에서 네티를 다음과 같이 정의하고 있습니다.



Netty는 비동기 이벤트 기반 네트워크 응용 프로그램 프레임 워크입니다.

유지 보수가 가능한 고성능 프로토콜 서버 및 클라이언트를 신속하게 개발할 수 있습니다.


Netty는 프로토콜 서버 및 클라이언트와 같은 네트워크 응용 프로그램을 빠르고 쉽게 개발할 수있는 NIO 클라이언트 서버 프레임 워크입니다.

 TCP 및 UDP 소켓 서버와 같은 네트워크 프로그래밍을 크게 간소화하고 간소화합니다.








주요 특징



네티 공식 홈페이지에선, 네티의 주요 특징을 첫줄에 정의하고 있습니다.

1. 비동기 이고 , 2. block&non-block 이 가능한, 3. 이벤트 기반 네트워크 프레임워크이다.





(1) 동기 / 비동기 처리 

- 동기: 특정 서비스를 호출하면 처리가 완료될 때까지 기다렸다가 결과를 받는 방식

-  비동기: 서비스를 호출하여 즉시 응답을 받고, 다른 작업을 하다가 처리가 완료되었는지 확인하여 결과를 받는 방식. NettryReactor 패턴 사용하고 있습니다.



(2)블로킹/논블로킹 소켓

블로킹 : read, write, accept 등의 메서드가 호출되면 완료될 때까지 쓰레드가 멈춤

- 논블로킹: 하나의 스레드로 여러 클라이언트 대응 가능




(3)이벤트 기반 프로그래밍

- 네트워크 이벤트의 주체는 소켓

- 데이터를 소켓에 전달하기 위해 데이터 핸들러 이용

- 로직 분리, 코드 재사용성 증가, 에러 처리 부담 완화


이벤트의 예) 연결 요청 , 데이터 전송, 데이터 수신







=> 다음과 같은 3가지 특징과 구조를 생각하며 , 네티를 사용한다면 조금 더 구조적으로 혹은 단계적으로 손쉽게 접근이 가능 할 것 같습니다.




참고

- https://netty.io/

- 자바 네트워크 소녀 Netty 


















Proxy



proxy (프록시)는 단어 그대로 '대리인'의 역할을 하는 서버이다. 


웹 환경에서 프록시 서버의 역할은 웹 클라이언트와 웹 서버 사이에서 요청한 데이터를 전달하는 것입니다.


이 때 프록시 서버는 웹서버에서 가져온 데이터를 웹클라아인트에 전송한 후 캐시에 데이터를 저장합니다.


그리고 웹 클라이언트가 같은 데이터를 요청하면 캐시에서 해당 데이터를 보냅니다. 이를 통해 클라이언트에게 빠르게 필요한 내용을 보여줄 수 있겠죠.

             





Proxy의 목적


여러가지 목적이 있지만 크게 다음과 같은 2가지 목적으로 사용됩니다.


(1) 속도 : 캐시를 사용하기 때문에 리소스에 대한 접근이 빠릅니다. 프록시서버에서는 자주 가는 웹사이트에 대한 리소스들이 캐시에 쌓여있고 웹 서버에서 직접 가져오는 것보다 좀 더 빠른 속도로 접근할 수 있습니다.


(2) 보안 : 익명의 사용자가 서버에 접근하는 것을 막을 수 있습니다. 프록시 서버는 요청 서버의 대리인의 역할을 수행함으로써 외부 서버망으로의 접근을 막을수 있습니다. 







Proxy 서버의 종류




1. forward Proxy


- 가장 일반적으로 사용하는 proxy Server 입니다. Proxy 서버를 클라이언트와 원격 서버에 있는 리소스 사이에 위치 시키는 방법입니다.




-> 일반적으로 사내망 (Private Network) 에서 외부로 나갈 때 사용하는 프록시입니다.





2 Reverse Proxy


- 프록시 서버를 인터넷 혹은 인트라넷 리소스 앞에 위치 시킵니다.

- 이 방식을 사용한다면 클라이언트가 프록시 서버에 연결되었다는 것을 알지 못합니다.

- 보안의 이유가 가장 큽니다. 예를 들어 내부망에 웹 데이터를 요청하여 내부망에 접근하였다면 DB 데이터 혹은 파일 서버에 있는 데이터의 접근 또한 가능하게 될 것입니다. 

 









데이터를 가져오는 과정은 다음과 같습니다.


(1) 클라이언트 1은 웹서버에 데이터를 요청합니다.

(2) 프록시 서버에서 데이터를 우선 요청하고. 캐쉬에 데이터를 저장한 후 클라이언트에게 데이터를 전송합니다.

(3) 클라이언트 2,3이 같은 데이터를 요청한다면 프록시 서버는 서버에 요청하지 않고 캐쉬에 있던 데이터를 가져와서 전송합니다.






테스트



- 다음과 같은 목표로 구현하였습니다.

- Broker1과  Broker2는 Broker3과 NetworkConnection되어있다. 

- Broker3은 Master 1,2는 slave 브로커로 구성한다.



시나리오

-> Broker3에 연결된 Producer가 메시지를 생산한다면, Broker1 과  2에 연결된 Consumer들 또한 메시지를 소비할 수 있어야 한다. 




<Server>


public class MQServer {


public static void main(String[] args) {


// TODO Auto-generated method stub


//persistence & master slave 

// DB 및 Persistence 설정

KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();

// LevelDBPersistenceAdapter persistenceAdapter = new LevelDBPersistenceAdapter();

persistenceAdapter.setDirectory(new File("ActiveMQ_Persistence","shared"));

persistenceAdapter.setUseLock(false);

// jmx managementcontext 생성

ManagementContext managementContext = new ManagementContext();

managementContext.setConnectorPath("/jmxrmi2");

//managementContext.setMBeanServer();

managementContext.setConnectorPort(1099);

managementContext.setJmxDomainName("ActiveMQ Sever");

managementContext.setUseMBeanServer(true);

// 대형 메시징 패브릭의 대규모 확장 성을 제공하기 위해 일반적으로 많은 브로커를 네트워크에 연결하여 모든 논리적으로 연결된 클라이언트를

// 원하는만큼 보유 할 수 있습니다. 필요에 따라 많은 메시지 브로커를 실행할 수 있습니다.

try {

//BrokerService broker = new BrokerService();

BrokerService broker3 = BrokerFactory.createBroker("broker:()/master");

BrokerService broker = BrokerFactory.createBroker("broker:()/slave");

BrokerService broker2 = BrokerFactory.createBroker("broker:()/slave");


//jmx

broker.setManagementContext(managementContext);

broker2.setManagementContext(managementContext);

broker3.setManagementContext(managementContext);

///advisory

List<Object> entries = new ArrayList<>();

PolicyEntry entry1 = new PolicyEntry();

entry1.setAdvisoryForConsumed(true);

entry1.setTopic("TEST");

PolicyEntry entry2 = new PolicyEntry();

entry2.setAdvisoryForConsumed(true);

entry2.setQueue("TEST");

entries.add(entry1);

entries.add(entry2);

PolicyMap policyMap = new PolicyMap();

policyMap.setPolicyEntries(entries);

// advisory Message setting

broker.setAdvisorySupport(true);

broker.setDestinationPolicy(policyMap);

broker2.setAdvisorySupport(true);

broker2.setDestinationPolicy(policyMap);

broker3.setAdvisorySupport(true);

broker3.setDestinationPolicy(policyMap);

try {


// broker.addConnector("tcp://localhost:61616");

// java Management Extention


broker.setBrokerName("Broker1");

broker.setUseJmx(true); // check true or false

broker.setPersistent(true);

// broker.addConnector("tcp://localhost:61616");


/// start() 메소드가 저장소 잠금 보류를 차단할 때 유용합니다 (예 : 슬레이브 시작).

TransportConnector conn = new TransportConnector();

conn.setUri(new URI("tcp://localhost:61616"));


// failover:// 브로커 클러스터에 대한 클라이언트 연결을 업데이트

// conn.setUpdateClusterClients(true);

broker.setPersistenceAdapter(persistenceAdapter);

broker.addConnector(conn);

broker.start();

broker2.setBrokerName("Broker2");

broker2.setUseJmx(true); // check true or false

broker2.setPersistent(true);

broker2.setShutdownOnMasterFailure(true);

//broker2.addConnector("tcp://172.10.11.20:61617");

TransportConnector conn2 = new TransportConnector();

conn2.setUri(new URI("tcp://localhost:61617"));

broker2.setPersistenceAdapter(persistenceAdapter);

broker2.addConnector(conn2);

broker2.start();


///////////////////////////////// Network Connect 사용


// String networkConnectorURIs[] =

// {"static:(tcp://localhost:61617)","static:(tcp://localhost:61616)"};


/// 속성

// property default description


// initialReconnectDelay 1000 재접속을 시도하기까지 대기 할 시간 (ms) (useExponentialBackOff가

// false 인 경우)

// maxReconnectDelay 30000 다시 연결을 시도하기 전에 대기 할 시간 (ms)

// useExponentialBackOff true 재 연결 시퀀스의 모든 실패에 대해 재 연결 사이의 시간을 증가시킵니다.

// backOffMultiplier 2 대기 시간 증가 지수에 곱함 useExponentialBackOff true이면


// 정적 발견

/* String networkConnectorURIs[] = {

"static:(tcp://172.10.11.20:61617,tcp://172.10.11.20:61616)?maxReconnectDelay=5000&useExponentialBackOff=false" };

*/

/// static == masterslave

String networkConnectorURIs[] = {

"static:(tcp://localhost:61616,tcp://localhost:61617)?maxReconnectDelay=5000&useExponentialBackOff=false" };


String transportConnectorURIs[] = { "tcp://localhost:61602" };


broker3.setBrokerName("Broker3");

broker3.setPersistent(true);

broker3.setUseJmx(true);

broker3.setPersistenceAdapter(persistenceAdapter);

broker3.setTransportConnectorURIs(transportConnectorURIs);

broker3.setNetworkConnectorURIs(networkConnectorURIs);

//// persistenceAdapter share file system

// broker3.deleteAllMessages();


//


// // 일반적으로 네트워크 브로커 시작은 순차적으로 시작함 . 비동기 시작을 위한 설정.

broker3.setNetworkConnectorStartAsync(true);

broker3.start();



} catch (Exception e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

} catch (Exception e1) {

// TODO Auto-generated catch block

e1.printStackTrace();

}

}


}


=> 서버를 실행하면 61602포트는 Broker3의 메시지 송수신 포트, 61616, 61617 포트는 각각  Broker1,2 와 커넥션 할 준비가 되어있습니다. 또한 서버 내 Broker3은 Network 연결된 것을 확인할 수 있습니다.








<Producer>

-> failover Protocol은 커넥션이 실패한다면 자동으로 다음 로컬주소를 발견하여 커넥션 시도를 합니다. 

public class Text_Send {


// URL of the JMS server. DEFAULT_BROKER_URL will just mean that JMS server is

// on localhost

//private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

///61613 에 실패

private static String url = "failover:(tcp://localhost:61602)?randomize=false";

// default broker URL is : tcp://localhost:61616"

private static String subject = "TEST"; // Queue Name.You can create any/many queue names as per your

// requirement.


public static void main(String[] args) throws JMSException {

// Getting JMS connection from the server and starting it

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

Connection connection = connectionFactory.createConnection();


connection.start();


// Creating a non transactional session to send/receive JMS message.

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);


// Destination represents here our queue 'JCG_QUEUE' on the JMS server.

// The queue will be created automatically on the server.

//Destination destination = session.createQueue(subject);

Destination destination = session.createTopic(subject);

// MessageProducer is used for sending messages to the queue.

MessageProducer producer = session.createProducer(destination);


for(int i=0; i<100; i++) {

try {

Thread.sleep(2000);

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

// We will send a small text message saying 'Hello World!!!'

//TextMessage message = session.createTextMessage("Hello !!!Send Message 1 Test");

TextMessage message = session.createTextMessage("Hello !!!Send Message 1 Test");

// Here we are sending our message!

producer.send(message);

System.out.println("JMS printing@@ '" + message.getText() + "'");

}


connection.close();

}



<Consumer>


public class Receive2 {


// URL of the JMS server

// private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

private static String url ="failover:(tcp://localhost:61617,tcp://localhost:61616,tcp://localhost:61602)";

// default broker URL is : tcp://localhost:61616"


// Name of the queue we will receive messages from

private static String subject = "TEST";


public static void main(String[] args) throws JMSException {

// Getting JMS connection from the server

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

Connection connection = connectionFactory.createConnection();

connection.start();


// Creating session for seding messages

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);


// Getting the queue 'JCG_QUEUE'

//Destination destination = session.createQueue(subject);

Destination destination = session.createTopic(subject);

// MessageConsumer is used for receiving (consuming) messages

MessageConsumer consumer = session.createConsumer(destination);

while(true) {

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

// Here we receive the message. /// block mode

Message message = consumer.receive();


// We will be using TestMessage in our example. MessageProducer sent us a

// TextMessage

// so we must cast to it to get access to its .getText() method.

if (message instanceof TextMessage) {

TextMessage textMessage = (TextMessage) message;

System.out.println("Received message '" + textMessage.getText() + "'");

}


}

}



=> 차례로 실행하면 61602 포트로 Broker3에 생산된 메시지를 , Broker2에 61616 포트로 연결된 Consumer가 메시지를 소비할 수 있습니다.


<결과>





간단하게 브로커를 직접 만들고 Clustering 기술을 활용하여 서버를 구현하고 테스트하는 실습하였습니다.

이외에도 jmx monitoring , HermesJMS 같은 third party tool과 연계, java 이외에 c, c++ 클라이언트와의 연결 advisory message, Object message, blob message 등의 특성을 활용할 수 있습니다.



'Cloud & NoSQL & Middleware > ActiveMQ' 카테고리의 다른 글

#activeMQ (5) - Clustering  (0) 2018.03.03
#activeMQ (4) - Broker  (0) 2018.03.02
#activeMQ (3) - ActiveMQ란?  (0) 2018.03.01
#activeMQ (2) - 용어  (0) 2018.02.25
#activeMQ (1) - 버전 정보  (0) 2018.02.24


2.  Clustering


우선 클러스터링에 대해 정의하면, 여러 대의 서버가 하나의 서버가 처리하는 것처럼 병렬 처리 된 서버들 간의 확립된 연결(Establishing Connectivity) 입니다.

더 간단히 요약하면, 군집화 혹은 서버 이중화로 정의하면 되겠습니다.


클러스터링은 장애시스템 조치(fail-over), 로드 밸런싱(부하 분산) 시스템에 병렬 처리 가능한 기술입니다. 








- activeMQ에서는 클러스터링 기술을 적용하기 위해 Broker의 Connector, Persistence, Network of Broker, Master/Slave 특징을 먼저 살펴봐야 합니다.





(1) Connector


- activeMQ에는 Transport Connection과 Network Connection 2가지가 종류가 있다.


 

Transport Connection : 브로커와 클라이언트간 메시지 송수신을 위한 전송 커넥션입니다.

Network Connection: 브로커와 브로커간 서버 클러스터링을 구현하기 위해 필요한 커넥션입니다.








(2) Persistence


- 지속성은 Master Slave 특징과 관련있습니다.

- 브로커 대기열의 데이터의 지속적인 전송을 유지하기 위해 DB 및 file System에 데이터를 저장하고 공유합니다.


( 버전별 라이브러리 사용)

ActiveMQ 5.9 – Replicated LevelDB Store (Apaach Zookeeper)

ActiveMQ 5.8 – LevelDB Store 

ActiveMQ 5.3 – KahaDB

ActiveMQ 4 - JDBC



(3) Network Of Broker


- 아래와 같은 2가지 방법으로 구현할 수 있습니다.



- networkConnector 요소 사용  //정적

String networkConnectorURIs[] = {

"static:(tcp://localhost:61616,tcp://localhost:61617)?maxReconnectDelay=5000&useExponentialBackOff=false" };


String transportConnectorURIs[] = { "tcp://localhost:61602" };


broker3.setBrokerName("Broker3");

broker3.setPersistent(true);

broker3.setUseJmx(true);

broker3.setPersistenceAdapter(persistenceAdapter);

broker3.setTransportConnectorURIs(transportConnectorURIs);

broker3.setNetworkConnectorURIs(networkConnectorURIs);





- Discovery를 사용하여 브로커를 탐지 //동적


List<TransportConnector> transportList = new ArrayList<>();

//

TransportConnector transport = new TransportConnector();

transport.setDiscoveryUri(new URI("multicast://default"));

transport.setUri(new URI("tcp://localhost:0"));

transportList.add(transport);

String networkConnectorURIs[] = {"multicast://default"};

broker3.setBrokerName("Broker3");

broker3.setPersistent(false);

broker3.setUseJmx(false);

broker3.setNetworkConnectorURIs(networkConnectorURIs);

broker3.setTransportConnectors(transportList);




(4)  Master / Slave


MasterSlave : 메시지가 Slave 브로커에 복제되므로 마스터 시스템파일 시스템 또는 데이터 센터의 치명적인 하드웨어 오류가 발생해도 메시지 손실없이 즉시 Slave로 페일 오버가 수행됩니다.

 - pure master/slave

 - shared filesystem master/slave

 - db matser/slave


 pure master/slave 는 5.8 버전 이후로 삭제되었기 때문에, DB 혹은 파일시스템을  공유하는 master/slave를 사용해야 합니다.





Figure


activeMQ에는 language, clustering, jmx monitoring, message, embedded broker, Persistence, DB 등 여러 특징이 있습니다.


그 중 embedded Broker, clustering, Message 등의 특징을 활용하여 제공되는 activeMQ Server 대신 직접 서버를 구현하며 여러 특징들을 실습해보았습니다.


 버전: activeMQ-all-5.15.2.jar

 jdk: jdk 1.8




1.  Embeded Broker


Java에서 activeMQ Broker Server를 직접 구하겠다면 다음과 같이 3가지 방법으로 구현할 수 있겠습니다.
=> Maven(Spring), xBean, Broker 객체를 직접 생성

 이처럼 Broker 객체를 직접 생성하여 서버를 구현해 보았습니다. ( 사실 xbean까지 사용했다면, Spring으로 구성하는 것은 문제 없다고 봅니다.)



(1) Broker 객체 생성 및 설정

public class MQServer {


public static void main(String[] args) {


try {

 

// ActiveMQConnectionFactory를 사용하고 VM 커넥터를 URI로 사용하여 내장 브로커를 만들 수 있습니다.

//BrokerService broker= ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");

//BrokerService broker = BrokerFactory.createBroker("broker:()/master");

 

BrokerService broker = new BrokerService();

broker.setBrokerName("Broker1");

broker.setUseJmx(true); // check true or false

broker.setPersistent(true);

// broker.addConnector("tcp://localhost:61616");


/// start() 메소드가 저장소 잠금 보류를 차단할 때 유용합니다 (예 : 슬레이브 시작).

TransportConnector conn = new TransportConnector();

conn.setUri(new URI("tcp://localhost:61616"));


// failover:// 브로커 클러스터에 대한 클라이언트 연결을 업데이트

// conn.setUpdateClusterClients(true);

broker.addConnector(conn);

broker.start();


} catch (Exception e1) {

// TODO Auto-generated catch block

e1.printStackTrace();

}



=> Java 코드를 활용하여 Broker 객체를 직접 만들고, JMX 설정, 메시지가 전송될 TransprotConnector Port  지정, Persistence 설정 등을 할 수 있습니다.





(2) xbean을 통한 Broker 설정


<config.xml>

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd

  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />

<broker useJmx="true" xmlns="http://activemq.apache.org/schema/core">

<persistenceAdapter>

<kahaDB directory="/123/sharedBrokerData" />

<!-- <levelDB directory="/sharedFileSystem/sharedBrokerData"/> -->

<!-- <amqPersistenceAdapter directory="/sharedFileSystem/sharedBrokerData"/> -->

</persistenceAdapter>


<destinationPolicy>

<policyMap>

<policyEntries>

<policyEntry queue=">" producerFlowControl="true" memoryLimit="5mb">

<deadLetterStrategy>

<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" />

</deadLetterStrategy>

</policyEntry>

</policyEntries>

</policyMap>

</destinationPolicy>

<systemUsage>

<systemUsage>

<memoryUsage>

<memoryUsage limit="420 mb" />

</memoryUsage>

<storeUsage>

<storeUsage limit="1 gb" />

</storeUsage>

<tempUsage>

<tempUsage limit="250 mb" />

</tempUsage>

</systemUsage>

</systemUsage>



<transportConnectors> <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->

<transportConnector uri="tcp://localhost:61636" />

<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />

<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />

<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />

<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />

<!-- <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" /> -->

</transportConnectors> <!-- destroy the spring context on shutdown to stop jetty -->


<plugins>

<destinationsPlugin location="/workspace/destinations" />

</plugins>

</broker>

</beans>


=> xml에서 Broker에 대한 설정.





<MQServer_xBean.java>


public class MQSever_Xbean {

public static void main(String[] args) {

try {



//xml file을 직접 읽어 들어와서 Broker를 실행

BrokerService broker = (new XBeanBrokerFactory()).createBroker(new URI("xbean:"+MQSever_Xbean.class.getResource("").getPath()+"../../conf/activeMQ.xml"));

broker.start();


} catch (URISyntaxException e) {

// TODO Auto-generated catch block

e.printStackTrace();

} catch (Exception e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}



=> 위와 같이 제대로 설정 되었다면 다음과 같이 서버가 구동되는 것을 확인할 수 있다.







activeMQ

  • Apache ActiveMQ 는 가장 대중적이고 강력한 오픈 소스 메세징 그리고 통합 패턴 서버입니다.
  • Apache ActiveMQ는 빠르며, 다양한 언어간의 클라이언트 및 프로토콜을 지원하고, 사용하기 쉬운 엔터프라이즈 통합 패턴 및 많은 고급 기능을 제공하면서 JMS 1.1 J2EE 1.4를 완벽하게 지원합니다.
  • MOM(메시지 지향 미들웨어)입니다.
  • ActiveMQJMS를 지원하는 클라이언트를 포함하는 브로커, 자바 뿐만 아니라 다양한 언어를이용하는시스템간의 통신을할수있게해줍니다또한 클러스터링기능 및 DB 그리고 FileSystem을 통해 각 시스템간의 일관성 및 지속성을 유지 시켜줍니다.
  • 간단히 정의하면  클라이언트 간 메시지를 송수신 할 수 있는 오픈 소스 Broker(JMS 서버)입니다.  



=>즉, activeMQ에서 JMS는 핵심요소입니다.






JMS

  • JMS 는 자바 기반의 MOM(메시지 지향 미들웨어) API 이며 둘 이상의 클라이언트 간의 메시지를 보냅니다.

  • JMS 는 자바 플랫폼엔터프라이즈 에디션(EE) 기반이며메시지 생성송수신읽기를 합니다.  또한 비동기적이며 신뢰할 만하고 느슨하게 연결된 서로 다른 분산 어플리케이션 컴포넌트 간의 통신을 허용합니다.
  • JMS의 핵심 개념은 Message Broker 와 Destination 입니다.  


Message Broker : 목적지에 안전하게 메시지를 건네주는 중개자 역할 

Destination: 목적지에 배달될 2가지 메시지 모델 QUEUE, TOPIC 



  • -  Queue: Point to Point ( Consumer 는 메시지를 받기 위해 경쟁합니다.)

  • -  Topic: Publish to Subscribe







ActiveMQ 메시지 처리 구조


 


 => 기본적으로 Message를 생산하는 Producer, activeMQ Broker(Server), Message를 소비하는 Consumer로  구성되어 있습니다.









- QUEUE 모델의 경우 메시지를 받는 Consumer가  다수일 때 연결된 순서로 메시지는 제공됩니다.

- TOPIC 모델의 경우 메시지를 받는 Consumer가 다수일 때 메시지는 모두에게 제공됩니다.





-  Consumer 에서는 메시지를 받을 때까지 block 상태인  Receive() 메소드와 리스너를 통해 nonblack 상태로 메시지를 가져올 수 있습니다.










ActiveMQ의 장점



(1) 분리: 대기열은 시스템 사이에 있으며, 하나의 시스템 장애는 다른 대기열에 영향을 주지 않습니다. 메시지 통신은 대기열을 통해 이루어집니다. 시스템이 가동 중일 때도 계속 작동합니다.


( 클라이언트와 서버간의 연결과 큐 대기열의 역할이 분리)



(2) 복구 지원: 큐의 처리가 실패하면 나중에 메시지를 복원 할 수 있습니다.



(3) 신뢰성: 클라이언트 요청을 처리하는 시스템을 생각해보십시오.정상적인 경우 시스템은 분당 100 건의 요청을 받습니다. 이 시스템은 요청 수가 평균을 넘어서는 경우 신뢰할 수 없습니다. 이 경우 Queue는 요청을 관리 할 수 있으며 시스템 처리량을 기초로 주기적으로 메시지를 전달할 수 있습니다.


(큐에서 들어온 메시지에 대한 처리를 관리하기 때문에 신뢰할 수 있는 시스템)




(4) 비동기 처리: 클라이언트와 서버 통신이 비 차단입니다. 클라이언트가 서버에 요청을 보내면 응답을 기다리지 않고 다른 작업을 수행 할 수 있습니다. 응답을 받으면 클라이언트는 언제든지 처리 할 수 있습니다.







참고 : http://activemq.apache.org


+ Recent posts