FeedBackService



:메시지전달 실패에 대한 정보를 제공하는 서비스이다.


- 특정 장비에서 이미 삭제된 애플리케이션으로 Push 메시지를 전달하려고 시도할 경우 해당 장비는 Push 메시지 수신을 거부한다. 

- 원격알림을 전송할 수 없는 디바이스 토큰을 피드백리스트 목록에 올려놓는다. 

- 전송실패 혹은 오류에 대한 알림은 피드백서비스에 영향을 미치지 않는다.

 

- feedback.push.apple.com의 도메인과 2196포트로의 연결을 통해 피드백서비스를 사용할 수 있다.

   (개발서버는 feeback.sandbox.push.apple.com/2196으로 연결한다.)


- 피드백서비스를 사용하여 전달할 수없는 원격 알림 전송을 중지하면 불필요한 메시지 오버 헤드가 줄어들고 전체 시스템 성능을 향상 할 수 있다.

- end-to-end의 TLS/SSL 연결만이 허용된다.


 

-개발서버에서 어플에 해당하는 인증서를 갖고 피드백 서비스에 연결하면 즉시 피드백 서비스에서 리스트업 된 디바이스 토큰 정보가 포함된 데이터를 전송한다.(연결 후 서버에서 따로 해 주어야 할 내용은 없다) 


- 피드백 서비스에서 저장된 디바이스 토큰 리스트의 데이터를 다 받게 되면 서비스 내의 토큰들은 모두 삭제된다.


-개발자 문서에서는 적어도 하루에 한번 피드백 서비스를 확인하라고 명시되어 있으며 상황에 따라 유동적으로 확인이 필요하다.


- 어떠한 요청도 보낼 필요 없이, 연결 즉시 응답이 오며 Connection은 종료된다.





Format






 Timestmap : 데이터에 포함된 타임스탬프 정보를 갖고 서버에 저장된 디바이스 토큰의 갱신여부를 판단할 수 있다. 

 Token Lenth : 디바이스 토큰의 길이

 device Token : 디바이스 토큰 값







예제



1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.UnknownHostException;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.Arrays;
 
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;
 
public class Main {
    public static void main(String[] args) {
        BufferedInputStream bis = null;
        SSLSocket feedSock = null;
        try {
 
            KeyStore ks = null;
            ks = KeyStore.getInstance("PKCS12");
            ks.load(new FileInputStream("path.."), "password".toCharArray());
 
            System.out.println("key size:" + ks.size() + "/key type:" + ks.getType());
 
            /// SUNX 509 or PKIX
            // 인증서 인코딩 알고리즘
            KeyManagerFactory kmf = KeyManagerFactory.getInstance("sunx509");
            kmf.init(ks, "password..".toCharArray());
 
            TrustManagerFactory tmf = TrustManagerFactory.getInstance("sunx509");
            tmf.init((KeyStore) null);
 
            // TLS or SSL protocol get
            SSLContext sc = SSLContext.getInstance("TLS");
            // SSLContext sc = SSLContext.getInstance("SSL");
 
            sc.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
            SSLSocketFactory ssf = sc.getSocketFactory();
 
            feedSock = (SSLSocket) ssf.createSocket("feedback.sandbox.push.apple.com"2196);
            String[] cipherSuites = feedSock.getSupportedCipherSuites();
            feedSock.setEnabledCipherSuites(cipherSuites);
 
            // TLS HandShake
            feedSock.startHandshake();
 
            ;
            System.out.println("피드백 서비스 연결:" + feedSock.isConnected());
 
            int a = 0;
            bis = new BufferedInputStream(feedSock.getInputStream());
            while (true) {
                byte[] tuple = new byte[38];
                a = bis.read(tuple, 0, tuple.length);
                System.out.println(a);
 
                byte[] time_t = Arrays.copyOfRange(tuple, 04);
                byte[] token_length = Arrays.copyOfRange(tuple, 46);
                byte[] token = Arrays.copyOfRange(tuple, 6, tuple.length);
                if (a > 0) {
                    // System.out.println("time_t:" + byteArrayToInt(time_t));
                    // System.out.println("token_length:" + byteArrayToInt(token_length));
                    // System.out.println("token:" + byteArrayToHex(token));
                } else {
                    break;
                }
            }
 
        } catch (UnknownHostException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (KeyStoreException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (KeyManagementException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (NoSuchAlgorithmException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (CertificateException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (UnrecoverableKeyException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
 
        finally {
            try {
                bis.close();
                feedSock.close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
}
cs








'Cloud & NoSQL & Middleware > Apns' 카테고리의 다른 글

Binary Provider API 구현  (0) 2018.06.11
HTTP/2 - based Apns Provider API  (0) 2018.05.31
Binary Provider API - Request, Response Format  (0) 2018.05.25
APNS Provider Protocol  (0) 2018.05.25
APNS Notification Payload  (0) 2018.05.15


Request Format









여러 알림에 대한 일괄 처리를 최적의 성능과 함께 수행하는 인터페이스이다.


Command : 1 byte

Frame length : 4 byte

Frame data : 일련의 아이템으로 구성된 가변 데이터









     


프레임은 여러 아이템이 들어있는 뼈대이다.


Item ID : 1byte

Item length : 2 byte

Item data : 가변 데이터



Item ID

 Item length

Item data

 1  ( Device Token)

100 byte 까지

등록된 디바이스의 binary 형태의 값
(
하나이상의 token보내져야함)

 2  (payLoad)

2 kilobyte (2048 byte)

 json 형태의 payload

(하나 이상의 payload가 보내져야 하며

Null로 끝나서는 안됨)

 3  (Notificatin Identifier)

4 byte

Push notification 의 식별 값

 4  (expiry)

4 byte

UTC 로 표현된 날짜 값

( 0 이상이면 적어도 한번 보내고 0이면 즉시 만료되어 알림이 저장되지 않음)

 5  priority

1 byte

 notification의 우선순위

10 아니면 5 중 하나 선택.

10: 즉시 전송

5: 전원의 배터리를 고려하여 전송









Response Format





Apns가  Device에게 성공적으로 알림을 보냈다면, Response Packet을 Provider에게 보내지 않는다.



- command : 1byte이며 Command Number 8

- status code : 발생한 에러에 따라 응답코드를 반환한다.

- Identifier : Provider에서 알림을 보낼 때 포함한 식별자 4byte 값을 반환한다.








참고

https://developer.apple.com/library/content/documentation/NetworkingInternet/Conceptual/RemoteNotificationsPG/BinaryProviderAPI.html#//apple_ref/doc/uid/TP40008194-CH13-SW1






'Cloud & NoSQL & Middleware > Apns' 카테고리의 다른 글

HTTP/2 - based Apns Provider API  (0) 2018.05.31
FeedbackService - Format, Packet  (0) 2018.05.29
APNS Provider Protocol  (0) 2018.05.25
APNS Notification Payload  (0) 2018.05.15
APNS Architecture - Connection & Push Flow  (0) 2018.05.11


Protocol


애플을 통해 푸쉬 알림을 보내기 위한 방식은 TCP 방식과 HTTP 방식이 있다.



애플에서 TCP 기반으로 보내는 API를 Binary Provider API , HTTPS 기반으로 보내는 방식을 APNS Provider API라고 명칭하고 있다.






Compare


 

 Binary Provider API

APNS Provider/Notification API 

 protocol

 TCP

HTTP/2 

 host / port

 

 gateway.sandbox.push.apple.com  // 2195      gateway.push.apple.com  // 2195



feedback.sandbox.push.apple.com  // 2196

: feedback.push.apple.com  // 2196 

api.development.push.apple.com:443

api.push.apple.com:443

Port is 443 or 2197

 data

 2KB

4KB 

 figure

High Capacity

 High Security, High Speed

 certificate

 .p12

 .p8, .p12

 feedback service

 O




(1) protocol : Binary Provider API에서는 TCP, APNS Provider API에서는 HTTP/2 Protocol 기반에서 수행된다.


(2) host / port : sandbox가 붙으면 개발서버, 아니면 운영서버이고 Binary Provider API는 피드백서비스를 활성화 해야한다.


(3) data : Payload의 용량 제한은 위와 같다. (VolP Notification 을 보낼 시 최대 5KB까지 가능.)


(4) figure : Binary Provider API 에서는 비동기 처리가 가능하고 생산성이 좋은 반면, HTTP/2 방식은 보안과 속도 측면에서 뛰어나다는 장점이 있다.


(5) certificate: TCP 방식은 .p12 인증서 파일만 사용가능하며, HTTP/2 방식으로 했을 경우에는 Token 방식을 사용할 수 있기 때문에 .p8 파일을 이용하여 여러 앱에 푸쉬 알림을 보낼 수 있다.


(6) feedback service:  feedback 서비스는 메시지전달 실패에 대한 정보를 제공하는 서비스이다. 자세한 정보는 추가적으로 다룰 예정






Payload


공급자 서버가 APN (ApplePushNotification Service)에 보내는 각 알림에는 페이로드가 포함되어 있다.


알림을 보내기 위해 메시지를 정의하고, 옵션을 선택하는 등의 역할을 하는  json 형태의 데이터다.





특징


- Json 형태의 데이터

- 디바이스로 알림을 보내기 위한 옵션 및 메시지 설정

- TCP Binary API 를 이용하여 보낼 시 최대 2KB 전송 가능

- HTTP/2 API 로 보낼 시 최대 4KB 데이터 전송 가능

- VolP 알림으로 보낼 시 최대 5KB 데이터 전송 가능






ex)


{ "aps" : { "category" : "NEW_MESSAGE_CATEGORY" "alert" : { "body" : "Acme message received from Johnny Appleseed", }, "badge" : 3, "sound" : “chime.aiff" } }







※ Aps Key에 대한 Json Dictionary



key

Value 

설명

Alert

String , 혹은 json dictionary

 

Badge

Number

0의 값은 뱃지를 제거

Sound

String

Apple 에서 지정한 string 문자열

Content-available

Number

1의 값은 백그라운드에서 앱을 깨우고 알림을 전달

category

String

알림의 식별자 값 지정

Thread-id

String

그룹화 알림을위한 앱 별 식별자를 나타내는 문자열 값을이 키에 입력하십시오. 알림 콘텐츠 추가 앱 정보를 제공하는 경우이 값을 사용하여 알림을 그룹화 할 수 있습니다.







※ Alert Key에 대한 Json Dictionary



key 

Value

설명

Title

String

알림의 제목

Body

String

 알림의 내용

Title-loc-key

String or null

로컬 라이즈 용의 파일 내의 타이틀 캐릭터 라인의 키

Title-loc-args

Array of String or null

가변 문자열 값의 형식 지정자의 장소에 표시

Action-loc-key

String or nullString

문자열을 지정하면 닫기 및보기 단추가 포함 된 경고가 표시됩니다.

Loc-key

String

현재 지역화 파일 의 경고 메시지 문자열에 대한 키

Loc-args

Array of string

가변 문자열 값의 형식 지정자의 장소에 표시

Launch-image

String

파일 이름 확장자의 유무에 관계없이 앱 번들에있는 이미지 파일의 파일 이름입니다. 사용자가 작업 버튼을 누르거나 작업 슬라이더를 움직일 때 이미지가 실행 이미지로 사용됩니다.
















Apns Connection Server


HTTP/2


Development enviroment : api.development.push.apple.com: 443 
production enviroment : api.push.apple.com: 443

(APN과 통신 할 때  2197 포트도 사용가능)


HTTP / 2 PING프레임을 사용하여 연결 상태 확인

 HTTP / 2 연결을 종료하기로 결정하면 GOAWAY프레임으로 확인 가능


    TCP Binary


Development enviroment : gateway.sandbox.push.apple.com : 2195 port

production enviroment : gateway.push.apple.com  : 2195 port


Development enviroment : feedback.sandbox.push.apple.com : 2196 port

production enviroment : feedback.push.apple.com  : 2196 port







Apns Connection Trust

APN에 연결할 때 공급자가 TLS 1.2 이상으로 연결 (Http 방식에서만) , 기본적으로 TLS 및 SSL 방식에서 연결
APN 공급자 apns 인증서 없이 연결하려면 개발자 계정을 통해 제공되는 키로 서명 된 공급자 인증 토큰을 만들어야 함. (Json Web Token)
APN은 각 연결에 대해 여러 개의 동시 스트림을 허용.

인증서가 아닌 토큰을 사용하여 APN에 대한 연결을 설정할 때 유효한 공급자 인증 토큰을 사용하여 푸시 메시지를 보낼 때까지 하나의 스트림에 연결 

-> 즉, 개발자 계정의 인증서 파일 혹은 개발자 인증 토큰을 통해 연결을 해야함.




   Device to Apns Connection




1.APNS 서버에 보안연결을 요청. (암호화 보안 프로토콜 SSLTLS) 

2.APNS 인증

3.APNS 인증서의 유효성 검사 
4. 디바이스 단말 인증
5. Device 인증서 유효성 검사 확인 이후 연결





  Provider to Apns Connection




1.APNS 서버에 보안연결을 요청. (암호화 보안 프로토콜 SSL과 TLS) 
2.APNS 인증
3.APNS 인증서의 유효성 검사 
4. 푸쉬 메시지 전송 ( 전송할 단말의 디바이스 토큰 값과 함께)
5. Provider의 인증토큰 값을 검사 (token Trust)
        6. Requst의 대한 응답

=> 기본적으로 http 방식으로 APNS와 연결하고자 할 때, JWT로 구성된 token 값이 포함되어야 한다. ( Device의 고유한 token과는 별개의 개념)
=> TCP 방식으로 구현하고자 할 때 token 값 대신 인증서가 있어야 한다.





Provider to Device Push Notification



메시지 공급자에서 iOS 단말 디바이스로 메시지를 보내는 플로우는 다음과 같다.



1. iOS 디바이스 토큰 값이 Apns 서버에 먼저 등록되 있어야 한다.


2. 메시지 공급자는 디바이스 단말의 토큰과 보내고자 하는 메시지를 포함하여 APNs 서버에 요청한다. 


3.  위의 방식은  http 방식의 요청이기 때문에 token 값을 포함하여 인증해야 한다.


4. apns 서버는 인증 토큰을 토큰 키와 함께 복호화 하여 payload에 담겨진 notification message를 디바이스에 push한다.












-> 정리하면 Provider에서 APNS에 Connection 하기 위해 HTTP / TCP 방식으로 구현 가능하며, 각 각 인증 토큰 값과 인증서 파일이 필요하다.

-> 디바이스 단말 token 값은 Apns 서버에 먼저 등록되어야 하며, 등록된 고유 디바이스 단말 token으로 인증 token과 함께 notification을 push한다.






개요 



Apple Push Notification Service


- apns는 보안 연결을 통해 third-party server에서 앱이 설치된 사용자 디바이스로 푸쉬 알림을 보낼 수 있는 클라우드 서비스이다.




iOS 단말에 모바일 알림 서비스를 이용하려면 애플에서 자체적으로 구축된 푸쉬서버를 이용해야한다.







특징


Apnsend-to-end에서 시행한다.


- Provider는 메시지를 전달하는 공급자이며, 주로 3rd Party Server에서 수행된다. 


Apns에 연결하기 위해서는 인증 토큰 혹은 인증서를 사용해야 한다. 

  (인증토큰과 인증서는 https://developer.apple.com/account/ 에서 확인가능)



Apnstwo levels of trust와 함께 암호 검증 및 인증을 한다.(connection Trust token trust)
: Connection Trust는 연결 level에서 수행되는 암호 검증 단계
: token Trust는 단말기의 고유 deviceToken 인증 단계

- QoS를 지원한다.

 

Quality of Service(QoS)  구성요소 :  Stored-and-Forward, Coalesced Notification




  • QoS: 다른 응용 프로그램, 사용자, 데이터 흐름 등에 우선 순위를 정하여, 데이터 전송에 특정 수준의 성능을 보장하기 위한 능력

  • Stored-and-Forward: Apns가 알림을 전달하려고 할 때 장치가 오프라인이라면 일정 기간 동안 저장하고 장치가 다시 사용될 때 전달한다. 장치가 오프라인이라면 최신 알림만 보내고 이전 알림은 삭제하며, 오랫동안 오프라인이라면 모든 알림을 삭제된다.
  • Coalesced Notification :유사한 알림을 통합 할 수 있도록 알림 요청 내에 축소 식별자를 포함 할 수 있다.

  • 예를 들어 동일한 헤드 라인을 두 번 보내는 뉴스 서비스는 두 요청에 동일한 축소 식별자 값을 사용하여 통합된 알림으로 전송할 수 있다.







참고사이트: https://developer.apple.com/library/content/documentation/NetworkingInternet/Conceptual/RemoteNotificationsPG/APNSOverview.html












테스트



- 다음과 같은 목표로 구현하였습니다.

- Broker1과  Broker2는 Broker3과 NetworkConnection되어있다. 

- Broker3은 Master 1,2는 slave 브로커로 구성한다.



시나리오

-> Broker3에 연결된 Producer가 메시지를 생산한다면, Broker1 과  2에 연결된 Consumer들 또한 메시지를 소비할 수 있어야 한다. 




<Server>


public class MQServer {


public static void main(String[] args) {


// TODO Auto-generated method stub


//persistence & master slave 

// DB 및 Persistence 설정

KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();

// LevelDBPersistenceAdapter persistenceAdapter = new LevelDBPersistenceAdapter();

persistenceAdapter.setDirectory(new File("ActiveMQ_Persistence","shared"));

persistenceAdapter.setUseLock(false);

// jmx managementcontext 생성

ManagementContext managementContext = new ManagementContext();

managementContext.setConnectorPath("/jmxrmi2");

//managementContext.setMBeanServer();

managementContext.setConnectorPort(1099);

managementContext.setJmxDomainName("ActiveMQ Sever");

managementContext.setUseMBeanServer(true);

// 대형 메시징 패브릭의 대규모 확장 성을 제공하기 위해 일반적으로 많은 브로커를 네트워크에 연결하여 모든 논리적으로 연결된 클라이언트를

// 원하는만큼 보유 할 수 있습니다. 필요에 따라 많은 메시지 브로커를 실행할 수 있습니다.

try {

//BrokerService broker = new BrokerService();

BrokerService broker3 = BrokerFactory.createBroker("broker:()/master");

BrokerService broker = BrokerFactory.createBroker("broker:()/slave");

BrokerService broker2 = BrokerFactory.createBroker("broker:()/slave");


//jmx

broker.setManagementContext(managementContext);

broker2.setManagementContext(managementContext);

broker3.setManagementContext(managementContext);

///advisory

List<Object> entries = new ArrayList<>();

PolicyEntry entry1 = new PolicyEntry();

entry1.setAdvisoryForConsumed(true);

entry1.setTopic("TEST");

PolicyEntry entry2 = new PolicyEntry();

entry2.setAdvisoryForConsumed(true);

entry2.setQueue("TEST");

entries.add(entry1);

entries.add(entry2);

PolicyMap policyMap = new PolicyMap();

policyMap.setPolicyEntries(entries);

// advisory Message setting

broker.setAdvisorySupport(true);

broker.setDestinationPolicy(policyMap);

broker2.setAdvisorySupport(true);

broker2.setDestinationPolicy(policyMap);

broker3.setAdvisorySupport(true);

broker3.setDestinationPolicy(policyMap);

try {


// broker.addConnector("tcp://localhost:61616");

// java Management Extention


broker.setBrokerName("Broker1");

broker.setUseJmx(true); // check true or false

broker.setPersistent(true);

// broker.addConnector("tcp://localhost:61616");


/// start() 메소드가 저장소 잠금 보류를 차단할 때 유용합니다 (예 : 슬레이브 시작).

TransportConnector conn = new TransportConnector();

conn.setUri(new URI("tcp://localhost:61616"));


// failover:// 브로커 클러스터에 대한 클라이언트 연결을 업데이트

// conn.setUpdateClusterClients(true);

broker.setPersistenceAdapter(persistenceAdapter);

broker.addConnector(conn);

broker.start();

broker2.setBrokerName("Broker2");

broker2.setUseJmx(true); // check true or false

broker2.setPersistent(true);

broker2.setShutdownOnMasterFailure(true);

//broker2.addConnector("tcp://172.10.11.20:61617");

TransportConnector conn2 = new TransportConnector();

conn2.setUri(new URI("tcp://localhost:61617"));

broker2.setPersistenceAdapter(persistenceAdapter);

broker2.addConnector(conn2);

broker2.start();


///////////////////////////////// Network Connect 사용


// String networkConnectorURIs[] =

// {"static:(tcp://localhost:61617)","static:(tcp://localhost:61616)"};


/// 속성

// property default description


// initialReconnectDelay 1000 재접속을 시도하기까지 대기 할 시간 (ms) (useExponentialBackOff가

// false 인 경우)

// maxReconnectDelay 30000 다시 연결을 시도하기 전에 대기 할 시간 (ms)

// useExponentialBackOff true 재 연결 시퀀스의 모든 실패에 대해 재 연결 사이의 시간을 증가시킵니다.

// backOffMultiplier 2 대기 시간 증가 지수에 곱함 useExponentialBackOff true이면


// 정적 발견

/* String networkConnectorURIs[] = {

"static:(tcp://172.10.11.20:61617,tcp://172.10.11.20:61616)?maxReconnectDelay=5000&useExponentialBackOff=false" };

*/

/// static == masterslave

String networkConnectorURIs[] = {

"static:(tcp://localhost:61616,tcp://localhost:61617)?maxReconnectDelay=5000&useExponentialBackOff=false" };


String transportConnectorURIs[] = { "tcp://localhost:61602" };


broker3.setBrokerName("Broker3");

broker3.setPersistent(true);

broker3.setUseJmx(true);

broker3.setPersistenceAdapter(persistenceAdapter);

broker3.setTransportConnectorURIs(transportConnectorURIs);

broker3.setNetworkConnectorURIs(networkConnectorURIs);

//// persistenceAdapter share file system

// broker3.deleteAllMessages();


//


// // 일반적으로 네트워크 브로커 시작은 순차적으로 시작함 . 비동기 시작을 위한 설정.

broker3.setNetworkConnectorStartAsync(true);

broker3.start();



} catch (Exception e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

} catch (Exception e1) {

// TODO Auto-generated catch block

e1.printStackTrace();

}

}


}


=> 서버를 실행하면 61602포트는 Broker3의 메시지 송수신 포트, 61616, 61617 포트는 각각  Broker1,2 와 커넥션 할 준비가 되어있습니다. 또한 서버 내 Broker3은 Network 연결된 것을 확인할 수 있습니다.








<Producer>

-> failover Protocol은 커넥션이 실패한다면 자동으로 다음 로컬주소를 발견하여 커넥션 시도를 합니다. 

public class Text_Send {


// URL of the JMS server. DEFAULT_BROKER_URL will just mean that JMS server is

// on localhost

//private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

///61613 에 실패

private static String url = "failover:(tcp://localhost:61602)?randomize=false";

// default broker URL is : tcp://localhost:61616"

private static String subject = "TEST"; // Queue Name.You can create any/many queue names as per your

// requirement.


public static void main(String[] args) throws JMSException {

// Getting JMS connection from the server and starting it

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

Connection connection = connectionFactory.createConnection();


connection.start();


// Creating a non transactional session to send/receive JMS message.

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);


// Destination represents here our queue 'JCG_QUEUE' on the JMS server.

// The queue will be created automatically on the server.

//Destination destination = session.createQueue(subject);

Destination destination = session.createTopic(subject);

// MessageProducer is used for sending messages to the queue.

MessageProducer producer = session.createProducer(destination);


for(int i=0; i<100; i++) {

try {

Thread.sleep(2000);

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

// We will send a small text message saying 'Hello World!!!'

//TextMessage message = session.createTextMessage("Hello !!!Send Message 1 Test");

TextMessage message = session.createTextMessage("Hello !!!Send Message 1 Test");

// Here we are sending our message!

producer.send(message);

System.out.println("JMS printing@@ '" + message.getText() + "'");

}


connection.close();

}



<Consumer>


public class Receive2 {


// URL of the JMS server

// private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

private static String url ="failover:(tcp://localhost:61617,tcp://localhost:61616,tcp://localhost:61602)";

// default broker URL is : tcp://localhost:61616"


// Name of the queue we will receive messages from

private static String subject = "TEST";


public static void main(String[] args) throws JMSException {

// Getting JMS connection from the server

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

Connection connection = connectionFactory.createConnection();

connection.start();


// Creating session for seding messages

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);


// Getting the queue 'JCG_QUEUE'

//Destination destination = session.createQueue(subject);

Destination destination = session.createTopic(subject);

// MessageConsumer is used for receiving (consuming) messages

MessageConsumer consumer = session.createConsumer(destination);

while(true) {

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

// Here we receive the message. /// block mode

Message message = consumer.receive();


// We will be using TestMessage in our example. MessageProducer sent us a

// TextMessage

// so we must cast to it to get access to its .getText() method.

if (message instanceof TextMessage) {

TextMessage textMessage = (TextMessage) message;

System.out.println("Received message '" + textMessage.getText() + "'");

}


}

}



=> 차례로 실행하면 61602 포트로 Broker3에 생산된 메시지를 , Broker2에 61616 포트로 연결된 Consumer가 메시지를 소비할 수 있습니다.


<결과>





간단하게 브로커를 직접 만들고 Clustering 기술을 활용하여 서버를 구현하고 테스트하는 실습하였습니다.

이외에도 jmx monitoring , HermesJMS 같은 third party tool과 연계, java 이외에 c, c++ 클라이언트와의 연결 advisory message, Object message, blob message 등의 특성을 활용할 수 있습니다.



'Cloud & NoSQL & Middleware > ActiveMQ' 카테고리의 다른 글

#activeMQ (5) - Clustering  (0) 2018.03.03
#activeMQ (4) - Broker  (0) 2018.03.02
#activeMQ (3) - ActiveMQ란?  (0) 2018.03.01
#activeMQ (2) - 용어  (0) 2018.02.25
#activeMQ (1) - 버전 정보  (0) 2018.02.24


2.  Clustering


우선 클러스터링에 대해 정의하면, 여러 대의 서버가 하나의 서버가 처리하는 것처럼 병렬 처리 된 서버들 간의 확립된 연결(Establishing Connectivity) 입니다.

더 간단히 요약하면, 군집화 혹은 서버 이중화로 정의하면 되겠습니다.


클러스터링은 장애시스템 조치(fail-over), 로드 밸런싱(부하 분산) 시스템에 병렬 처리 가능한 기술입니다. 








- activeMQ에서는 클러스터링 기술을 적용하기 위해 Broker의 Connector, Persistence, Network of Broker, Master/Slave 특징을 먼저 살펴봐야 합니다.





(1) Connector


- activeMQ에는 Transport Connection과 Network Connection 2가지가 종류가 있다.


 

Transport Connection : 브로커와 클라이언트간 메시지 송수신을 위한 전송 커넥션입니다.

Network Connection: 브로커와 브로커간 서버 클러스터링을 구현하기 위해 필요한 커넥션입니다.








(2) Persistence


- 지속성은 Master Slave 특징과 관련있습니다.

- 브로커 대기열의 데이터의 지속적인 전송을 유지하기 위해 DB 및 file System에 데이터를 저장하고 공유합니다.


( 버전별 라이브러리 사용)

ActiveMQ 5.9 – Replicated LevelDB Store (Apaach Zookeeper)

ActiveMQ 5.8 – LevelDB Store 

ActiveMQ 5.3 – KahaDB

ActiveMQ 4 - JDBC



(3) Network Of Broker


- 아래와 같은 2가지 방법으로 구현할 수 있습니다.



- networkConnector 요소 사용  //정적

String networkConnectorURIs[] = {

"static:(tcp://localhost:61616,tcp://localhost:61617)?maxReconnectDelay=5000&useExponentialBackOff=false" };


String transportConnectorURIs[] = { "tcp://localhost:61602" };


broker3.setBrokerName("Broker3");

broker3.setPersistent(true);

broker3.setUseJmx(true);

broker3.setPersistenceAdapter(persistenceAdapter);

broker3.setTransportConnectorURIs(transportConnectorURIs);

broker3.setNetworkConnectorURIs(networkConnectorURIs);





- Discovery를 사용하여 브로커를 탐지 //동적


List<TransportConnector> transportList = new ArrayList<>();

//

TransportConnector transport = new TransportConnector();

transport.setDiscoveryUri(new URI("multicast://default"));

transport.setUri(new URI("tcp://localhost:0"));

transportList.add(transport);

String networkConnectorURIs[] = {"multicast://default"};

broker3.setBrokerName("Broker3");

broker3.setPersistent(false);

broker3.setUseJmx(false);

broker3.setNetworkConnectorURIs(networkConnectorURIs);

broker3.setTransportConnectors(transportList);




(4)  Master / Slave


MasterSlave : 메시지가 Slave 브로커에 복제되므로 마스터 시스템파일 시스템 또는 데이터 센터의 치명적인 하드웨어 오류가 발생해도 메시지 손실없이 즉시 Slave로 페일 오버가 수행됩니다.

 - pure master/slave

 - shared filesystem master/slave

 - db matser/slave


 pure master/slave 는 5.8 버전 이후로 삭제되었기 때문에, DB 혹은 파일시스템을  공유하는 master/slave를 사용해야 합니다.




+ Recent posts