Netty를 통해 JVM 옵션을 여러가지 설정 할 수 있지만 핵심이라고 생각하는 4가지만 정리하였다.



1) 메모리 leak 검출


java -Dio.netty.leakDetection.level=advanced ...


 DISABLED

  메모리 릭 감지를 비활성화

 SIMPLE

  디폴트 설정이며 버퍼의 1%의 누출이 있는지 나타낸다.

 ADVANCED

  누출된 버퍼의 액세스 위치를 나타낸다.

 PARANOID

  모든 단일 버퍼라는 점을 제외하면 advanced와 동일. 

  leak 검출 시 빌드가 실패 될 수 있다.







2)  DirectBuffer 선호 여부 


java -Dio.netty.noPreferDirect=true ...


noPreferDirect 옵션은 netty는 ByteBufAllocator.buffer(..) 메소드가 호출 될 때, direct buffer는 선호되지 않는다고 개발자에게 알려준다. 그렇다고 해서 . ByteBufAllocator.directBuffer(...)가 직접적으로 directBuffer를 호출할 때 Direct Buffer가 사용되지 않는 것은 아니다. 여전히 사용 중이다.






3) ByteBuf allocator의 풀링 타입


java -Dio.netty.allocator.type=unpooled ...


풀링 여부에는 pooled / unpooled 가 존재한다. 여기서 말하는 풀은 DBCP의 Connection Pool  혹은 Thread Excutor의 Thread Pool과 같은 맥락이다.


즉, ByteBuf의 레퍼런스 카운트와 같은 자원관리를 Pool이 관리할 것이냐 아니면 unpooled 함으로 써 ByteBufAllocator에서 직접 얻어 올 것이냐로 나뉠 수 있다.






4) sun.misc.Unsafe의 사용을 중지하도록 허용


java -Dio.netty.tryUnsafe=false ...


Java 9 이상에서 sun.misc.unsafe 기능이 활성화 됨에 따라 NETTY에서 WARN 및 ERROR를 발생할 만한 요소가 있는데, unsafe 옵션을 비활성화 함으로써 사용을 중지 할 수 있다.


https://github.com/netty/netty/issues/272












Netty Base Http Client example Code





NettyHttpClient


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
 
public class NettyClient {
 
    ChannelFuture cf;
    EventLoopGroup group;
    
    public void connect(String host, int port) {
 
       group = new NioEventLoopGroup();
 
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
                    .handler(new NettyHttpChannelInit(group));
 
            cf = b.connect(host, port).sync();
//            cf.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
//post Request Encoder : attribute를 지정하고 싶다면, PostRquestEncoder를 통해 요청해야한다.
    public void createRequest(String host, int port, String url) throws Exception {
        
        
            HttpRequest request = null;
            HttpPostRequestEncoder postRequestEncoder = null;
            
            request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/create"
//                    ,Unpooled.copiedBuffer(url.getBytes(CharsetUtil.UTF_8))
            );
            request.headers().set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED);
            request.headers().set(HttpHeaderNames.HOST, host+":"+port);
            request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
//            request.headers().set(HttpHeaderNames.CONTENT_LENGTH, url.length());
            
            postRequestEncoder = new HttpPostRequestEncoder(request, false);
            postRequestEncoder.addBodyAttribute("url", url);
            request=postRequestEncoder.finalizeRequest();
            postRequestEncoder.close();
//            cf.channel().writeAndFlush(request).addListener(ChannelFutureListener.CLOSE);
            cf.channel().writeAndFlush(request);
//            System.out.println(request.toString());
    }
    
    public void close() {
        cf.channel().close();
        group.shutdownGracefully();
    }
    
    
}
 
 

cs



-> 가장 중요한 부분은 Request Format이다.  DefaultFullHttpRequest객체를 통해 http의 header, body를 직접 정의할 수 있다.

체크해야할 부분이 있다면, PostRequestEncoder. Post방식의 경우 요청 파라미터가 Url에 명시되지 않기 때문에, PostRequestEncoder를 통해 지정가능하다.








Channelinitializer 



1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
 
public class NettyHttpChannelInit extends ChannelInitializer<SocketChannel>{
    
    private boolean ssl = false;
    private EventLoopGroup group;
    
     public NettyHttpChannelInit(EventLoopGroup group) {
         this.group = group;
    }
    @Override
    protected void initChannel(SocketChannel sc) throws Exception {
        // TODO Auto-generated method stub
        
        ChannelPipeline p = sc.pipeline();
        if(ssl) {
            SslContext sslCtx = null;
            try {
                sslCtx = SslContextBuilder.forClient()
                        .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
                 p.addLast(sslCtx.newHandler(sc.alloc()));
            } catch (SSLException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
        }
    
        /**
         *
         * Message Decoder, FastLz...
        */
    
    //    p.addLast(new MessageDecoder());
 
        
        //chunked 된 응답을 집계하는 코덱
        p.addLast("chunked",new HttpObjectAggregator(1048576));
        p.addLast("codec",new HttpClientCodec());
        p.addLast(new NettyHttpHandler(group, sc));
    }
}
cs



-> 채널을 초기화하는 객체에서는 서버에서 데이터를  받기 위해 여러 핸들러를 정의했다. 

샘플 코드에서는 4개의 핸들러를 추가었다.



1) sslCtx.newHandler : SSL,TLS 성립된 서버와의 통신을 위해 추가된 코덱 (https에 해당한다.)


2)HttpObjectAggregator : chunked된 응답을 집계하는 코덱이다.



* Chunked

  - Chunk Response, "덩어리 응답" 전체 페이지를 가공하지 않는다.

  - 서버측에서 html 전부 생성한 후에 클라이언트에게 보내는 것이 아니라 html 덩어리(chunk) 단위로 쪼개서 보낼  있다.

  - 브라우저(클라이언트)에게 전체 컨텐츠 크기가 얼마나 큰지 알려주지 않아도된다는 특징이 있다. 

  - 따라서동적인 크기의 컨텐츠  스트리밍에 적합하고 Chunked transfer encoding 사용해야하며 netty의 경우 httpObjectAggregator 코덱에 해당된다.

 

3) httpClientCodec


송신시에는 HttpClientCodec을 사용해야 하며. HttpResponseEncoder로 대신 사용할 수 있다.

pipeline.addLast("encoder", new HttpClientCodec());

pipeline.addLast("encoder", new HttpResponseEncoder());


수신시에는 HttpServerCodec 사용해야 하며아래와 같이 HttpRequestDecoder대신 사용할 수 있다.

pipeline.addLast("decoder", new HttpServerCodec());

pipeline.addLast("decoder", new HttpRequestDecoder());

 


4) NettyHttpHandler : 사용자 정의 핸들러, SimpleChannelInboundHandler를 상속받은 사용자 정의 객체 입니다. 실제 메시지를 확인하기 위해 만들었다.





NettyClientHandler


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
 
 
public class NettyHttpHandler extends SimpleChannelInboundHandler<HttpObject> {
 
    private EventLoopGroup group;
    private SocketChannel sc;
    private int count=0;
    
 
    public NettyHttpHandler(EventLoopGroup group, SocketChannel sc) {
        this.group = group;
        this.sc =sc;
    }
 
 
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // TODO Auto-generated method stub
        super.channelActive(ctx);
//        System.out.println("connect:"+ctx.channel().isActive());
    }
    
 
 
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
        // TODO Auto-generated method stub
//        System.err.println(msg);
        
        if (msg instanceof HttpResponse) {
            HttpResponse response = (HttpResponse) msg;
            
            
            System.err.println("STATUS: " + response.status());
            System.err.println("VERSION: " + response.protocolVersion());
            
 
            if (!response.headers().isEmpty()) {
                for (CharSequence name : response.headers().names()) {
                    for (CharSequence value : response.headers().getAll(name)) {
                        System.err.println("HEADER: " + name + " = " + value);
                    }
                }
                System.err.println();
            }
 
            if (HttpUtil.isTransferEncodingChunked(response)) {
                System.err.println("CHUNKED CONTENT {");
            } else {
                System.err.println("CONTENT {");
            }
        }
        if (msg instanceof HttpContent) {
            count++;
            HttpContent content = (HttpContent) msg;
            System.err.println(count+". create url = "+content.content().toString(CharsetUtil.UTF_8));
            System.err.flush();
 
            if (content instanceof LastHttpContent) {
                System.err.println("} END OF CONTENT");
            }
        }
    }
 
        
 
 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
        sc.close();
        group.shutdownGracefully();
    }
cs





Echo Server


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package netty_framework;
 
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
 
import javax.swing.plaf.synth.SynthProgressBarUI;
 
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
 
public class EchoServer {
 
    // 서버 소켓 포트 번호를 지정합니다.
    //private static final int PORT = 30081;
 
    public static void main(String[] args) {
    /*
        NioEventLoop는 I/O 동작을 다루는 멀티스레드 이벤트 루프입니다.
        네티는 다양한 이벤트 루프를 제공합니다.
        이 예제에서는 두개의 Nio 이벤트 루프를 사용합니다.
        첫번째 'parent' 그룹은 인커밍 커넥션(incomming connection)을 액세스합니다.
        두번째 'child' 그룹은 액세스한 커넥션의 트래픽을 처리합니다.
        만들어진 채널에 매핑하고 스레드를 얼마나 사용할지는 EventLoopGroup 구현에 의존합니다.
        그리고 생성자를 통해서도 구성할 수 있습니다.
    */
        EventLoopGroup parentGroup = new NioEventLoopGroup(1);
        EventLoopGroup childGroup = new NioEventLoopGroup();
        try{
            // 서버 부트스트랩을 만듭니다. 이 클래스는 일종의 헬퍼 클래스입니다.
            // 이 클래스를 사용하면 서버에서 Channel을 직접 세팅 할 수 있습니다.
            
            ServerBootstrap sb = new ServerBootstrap();
        
            sb.group(parentGroup, childGroup)
            // 인커밍 커넥션을 액세스하기 위해 새로운 채널을 객체화 하는 클래스 지정합니다.
            .channel(NioServerSocketChannel.class)
            // 상세한 Channel 구현을 위해 옵션을 지정할 수 있습니다.
            //동시에 수용 가능한 클라이언트 연결 요청 개수를 서버 소켓에 설정 가능한 옵션
            .option(ChannelOption.SO_BACKLOG, 100)
            .handler(new LoggingHandler(LogLevel.INFO))
            // 새롭게 액세스된 Channel을 처리합니다.
            
            // ChannelInitializer는 특별한 핸들러로 새로운 Channel의
            // 환경 구성을 도와 주는 것이 목적입니다.
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    ChannelPipeline cp = sc.pipeline();
                    cp.addLast(new EchoServerHandler());
 
                }
            });
            
            //SocketAddress address = new InetSocketAddress("169.254.40.218",10081);
    //        SocketAddress address2 = new InetSocketAddress("172.10.11.20",10081);
            
            // 인커밍 커넥션을 액세스하기 위해 바인드하고 시작합니다.
            ChannelFuture cf = sb.bind(8080).sync();
//            cf = sb.bind(10087).sync();
            
            // 서버 소켓이 닫힐때까지 대기합니다.
            cf.channel().closeFuture().sync();
        //    cf2.channel().closeFuture().sync();
        }catch(Exception e){
            e.printStackTrace();
        }
        finally{
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}
cs






Server Inbound Handler


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
 
package netty_framework;
 
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
 
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
 
    // 채널을 읽을 때 동작할 코드를 정의 합니다.
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("server read : " + msg);
        System.out.println(ctx.channel().localAddress().toString() );
        ctx.write(msg); // 메시지를 그대로 다시 write 합니다.
    }
 
    // 채널 읽는 것을 완료했을 때 동작할 코드를 정의 합니다.
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush(); // 컨텍스트의 내용을 플러쉬합니다.
    };
 
    // 예외가 발생할 때 동작할 코드를 정의 합니다.
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace(); // 쌓여있는 트레이스를 출력합니다.
        ctx.close(); // 컨텍스트를 종료시킵니다.
    }
}
cs











Client


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package netty_framework;
 
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
 
public class EchoClient {
    // 호스트를 정의합니다. 로컬 루프백 주소를 지정합니다.
    private static final String HOST = "localhost";
    // 접속할 포트를 정의합니다.
    private static final int PORT = 8080;
    // 메시지 사이즈를 결정합니다.
    static final int MESSAGE_SIZE = 256;
 
    public static void main(String[] args) {
        EventLoopGroup group = new NioEventLoopGroup();
 
        try{
            Bootstrap b = new Bootstrap();
            b.group(group)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    ChannelPipeline cp = sc.pipeline();
                    cp.addLast(new EchoClientHandler());
                }
            });
 
            ChannelFuture cf = b.connect(HOST, PORT).sync();
            cf.channel().closeFuture().sync();
        }
        catch(Exception e){
            e.printStackTrace();
        }
        finally{
            group.shutdownGracefully();
        }
    }
}
cs



Client Inbound Handler


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package netty_framework;
 
import java.util.Arrays;
 
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
 
public class EchoClientHandler extends ChannelInboundHandlerAdapter{
    private final ByteBuf message;
 
    // 초기화
    public EchoClientHandler(){
        message = Unpooled.buffer(EchoClient.MESSAGE_SIZE);
        // 예제로 사용할 바이트 배열을 만듭니다.
        byte[] str = "abcefg".getBytes();
        // 예제 바이트 배열을 메시지에 씁니다.
        message.writeBytes(str);
        message.writeByte((byte)0x11);
 
    }
 
    // 채널이 활성화 되면 동작할 코드를 정의합니다.
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 메시지를 쓴 후 플러쉬합니다.
        ctx.writeAndFlush(message);
    }
    
    public byte[] getMessageID(byte[] body) {
 
        int readCount = 0;
        // ByteBuf buf = Unpooled.buffer(body.length);
        int bodySize = body.length;
 
        for (int i = 0; i < bodySize; i++) {
            if (body[i] == (byte)0x11) {
                break;
            }
            readCount++;
        }
 
        byte[] messageID = new byte[readCount];
 
        messageID = Arrays.copyOfRange(body, 0, readCount);
 
        return messageID;
 
    }
    
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {
        // 받은 메시지를 ByteBuf형으로 캐스팅합니다.
        ByteBuf byteBufMessage = (ByteBuf) msg;
        // 읽을 수 있는 바이트의 길이를 가져옵니다.
        int size = byteBufMessage.readableBytes();
 
        // 읽을 수 있는 바이트의 길이만큼 바이트 배열을 초기화합니다.
        byte [] byteMessage = new byte[size];
        // for문을 돌며 가져온 바이트 값을 연결합니다.
        for(int i = 0 ; i < size; i++){
            byteMessage[i] = byteBufMessage.getByte(i);
        }
        byte[] message =getMessageID(byteMessage);
        System.err.println(new String(message));
        // 바이트를 String 형으로 변환합니다.
//        String str = new String(byteMessage);
 
        // 결과를 콘솔에 출력합니다.
//        System.out.println(str);
 
        // 그후 컨텍스트를 종료합니다.
        ctx.close();
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
cs




'Framework > Netty' 카테고리의 다른 글

Nety JVM Option  (0) 2018.12.02
Netty HttpClient Example Code  (0) 2018.08.04
#Nettty Framework - Codec 종류 및 사용  (0) 2018.05.21
#Nettty Framework - ByteBuffer, ByteBuf  (0) 2018.05.09
#Nettty Framework - ChannelFutureListener  (0) 2018.05.09


Codec


앞서 정의 했듯, 코덱은 인코딩과 디코딩 과정을 거치는 알고리즘이다.


일반적으로 동영상 압축 알고리즘인 MPEG로 알고 있다. 인코딩 과정을 통해 동영상 용량을 줄일 수 있고, 마찬가지로 디코딩 과정으로 원본파일로 변환한다. 


MPEG 확장자를 갖는 동영상 뿐 아니라 jpg 확장자를 갖는 파일도 코덱 과정을 거친 것이다.



Netty에서는 inboundHandler와 outBoundHandler가 각각 인코더와 디코더에 해당하고, 데이터 송수신시 데이터와 패킷으로 각각 변환 시킬 수 있다.





기본 정의된 코덱


- base64 Codec 

 : Base64 인코딩 데이터에 대한 코덱   

 : Base64는 8비트 이진데이터를 문자 코드에 영향을 받지 않는 공통 ASCII 영역의 문자로 이루어진 일련의 문자열로 바꾸는 인코딩


- bytes Codec

 : 바이트 배열에 대한 코덱


- compression Codec

 : 송수신 데이터의 압축을 지원하는 코덱 

 : 네티 4.0에서는 zlib, gzip, snappy - 4.1에서는 bzip2, castle, l24, lzf의 압축 알고리즘이 추가


- mashaslling Codec

 : 마살량 혹은 언마샬링은 객체를 네트워크를 통해 송신 가능한 형태로 변환하는 과정


- ProtoBuf Codec

 : 구글의 포로토콜 버퍼를 사용한 데이터를 송수신을 지원하는 코덱


- rtsp Codec

 : 오디오 및 비디오 같은 실시간 데이터의 전달을 위해 만들어진 애플리케이션 레벨의 프로토콜 (real time streaming protocol)


- sctp Codec

 : TCP가 아닌 sctp 전송 계층을 사용하는 코덱

 : 이 코덱을 사용하려면 부트스트랩 채널에 NioSctpChannel 혹은 NioSctpServerChannel을 설정


- http Codec

 : http 프로로콜을 지원하는 코덱


- spdy Codec

 : Spdy는 기존의 http를 보완하는 프로토콜


- HTTP/2 Codec

  : HTTP/2 Protocol을 지원하는 코덱. 구글에서 spdy 프로토콜의 지원을 중단하고 http/2에 대한 공식 지원을 하는 중이다. 

  : 네티 4.1 버전에서 제공되고 있다.


-  String Codec

  : 문자열 송수신을 지원하는 코덱. 주로 Telnet이나 채팅 서버의 프로토콜에 이용


- Serialization 코덱

  : 객체를 네트워크로 직렬화 / 역직렬화를 지원하는 코덱


- MQTT Codec

 : MQTT Protocol을 지원하는 코덱


- HaProxy

 : Load Balance와 Proxy기능 을 제공하는 오픈 솔루션을 지원하는 코덱


- STOMP

 : STOMP Protocol을 지원하는 코덱






Http/2, Http Codec example


- 아주 심플하게 코덱 구성을 해보았다.



public class Http2ClientInit extends ChannelInitializer<SocketChannel> {
private HttpToHttp2ConnectionHandler connectionHandler;
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
Http2Connection connection = new DefaultHttp2Connection(false);
httpResponseHandler responseHandler = new httpResponseHandler();
connectionHandler = new HttpToHttp2ConnectionHandlerBuilder().connection(connection).build();
socketChannel.pipeline().addLast("connection",connectionHandler);
//기본 정의 codec
socketChannel.pipeline().addLast("response",new HttpResponseDecoder());
// 내가 정의한 사용자 정의 codec
socketChannel.pipeline().addLast("response", httpResponseHandler);
}
}




=> http2/0 으로 Connection을 구성하고 응답 온 패킷에 대해 http 데이터로 변환 후, 내가 정의한 핸들러에서 데이터를 정의







참고 : 자바 네트워크 소녀 Netty 


 






 



Java ByteBuffer



자바 NIO ByteBuffer는 바이트 데이터를 저장하고 읽는 저장소




내부 배열을 3가지 속성으로 관리한다.

(1)Capacity : 버퍼에 저장하는 최대 크기

(2)Position : 읽기 또는 쓰기가 작업 중인 위치

(3)Limit : 읽고 쓰는 버퍼의 최대 공간


 

생성에는 3가지 특징 : allocate , allocateDirect, wrap

(1)Allocate : jvm heap 영역에 생성.

(2)AllocateDirect : 운영체제의 커널 영역에 생성. ByteBuffer로만 생성할 수 있음.

(3)Wrap : 입력된 바이트 배열을 사용하여 바이트 버퍼를 생성.







Netty ByteBuf



Pure Java에서 Buffer와 그 하위 클래스들을 제공함에도 불구하고 네티에서는 자체적인 바이트 버퍼 API를 사용

 

특징

• 별도의 읽기 인덱스와 쓰기 인덱스

• flip 메서드 없이 읽기 쓰기 가능

• 가변 바이트 버퍼

• 프레임워크 레벨의 바이트 버퍼 풀 제공

• Java ByteBuffer와 호환됨

• 자료형에 따른 클래스를 제공하지 않고 별도의 method로 제공





Byte Buffer Pool


네티의 바이트버퍼의 특징은 바이트버퍼풀을 제공한다는 것이다. 이를 통해 생성된 바이트 버퍼를 재사용 할 수 있다.


•  다음은 풀링을 할 수 있는 버퍼를 생성하는 방법이다.


        PooledByteBufAllocator.DEFAULT.heapBuffer() 

         PooledByteBufAllocator.DEFAULT.directBuffer()


 

• 풀링을 하지 않는 버퍼 생성 방법.

        

        Unpooled.buffer(); 

         Unpooled.directBuffer();

 



바이트 버퍼 풀을 통해 매번 ByteBuf 메모리의 할당과 해제를 하지 않아도 됨.


 






















ChannelFutureListener 


.....


ChannelFutureListener future = ctx.write(msg);


//write가 완료됐을 때 이벤트를 받을 수 있는 리스너 추가

future.addListener(listener);


//io작업이 완료됐는지

future.isDone();


//io작업 대기

future.sync();






네티가 제공하는 기본 ChannelFutureListener 


- ChannelFutureListener.CLOSE;

- ChannelFutureListener.CLOSE_ON_FAILURE;

- ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE;



.....


ChannelFutureListener future = ctx.write(msg);



future.addlistener(ChannerFutureListener.CLOSE);







참고자료: https://www.slideshare.net/krisjeong/going-asynchronous-with-netty-soscon-2015









이벤트 핸들러

- 이벤트 처리를 다루는 객체


코덱

- 인코딩과 디코딩의 합성어, 이벤트 핸들러를 상속받아 구현된 구현체



인코더: 전송할 데이터를 전송 프로토콜에 맞춰 변환, ChannelOutboundHandler 인터페이스 상속

디코더: 수신한 데이터를 전송 프로토콜에 맞춰 변환, ChannelInboundHandler 인터페이스 상속



네티에는 수많은 코덱이 있다.


ex)loggingHandler, Mqtt, Base64, Https 등등..








파이프라인 

- 발생된 이벤트가 이동하는 통로






- 다음은 서버소켓으로부터 이벤트들이 채널로 들어온 상황을 가정한 그림이다.

클라이언트 부트스트랩에 등록한 채널에 이벤트가 들어온다면, 가장 먼저 등록한 핸들러로부터 이벤트를 처리할 것이다.







 

 






Netty Framework는 Multi Thread를 지원하고 Event loop 기반 위에서 수행됩니다. 









특징

- 네티의 이벤트는 채널에서 발생

- 채널에 event handler를 여러개 추가할 수 있다.

- multi core CPU 효율적으로 사용

- 이벤트 루프 객체는 이벤트 큐를 가지고 이벤트가 큐에 쌓임

- 네티의 채널은 하나의 이벤트 루프에 등록됨, 다중 이벤트루프 또한 지원

- 네티의 이벤트는 발생순서와 처리순서가 항상 일치한다.

- 이벤트 처리를 위해 SingleThreadEventExecutor 사용







=> 간단하게 요약하면, 클라이언트와 서버간의 입출력 이벤트는 채널에서 read/write 이벤트 루프객체 전달을 통해 처리된다,   






  


[아래 그림은 클라이언트에서 write 이벤트를 발생시키는 처리 과정이다.]





<Process>


1) 기본적으로 여러개의 쓰레드는 channelcontext 에 write 할 수 있다.


2) InboundHandlerAdapter는 채널파이프라인으로 전달된다. (파이프라인은 다음장에서.)



3) 채널에서 발생된 이벤트는 이벤트루프그룹 큐에 쌓이고 서버와 연결된 socket의 outputstream을 통해 write한다.












참고자료 : https://www.slideshare.net/krisjeong/going-asynchronous-with-netty-soscon-2015

+ Recent posts