아래 클라이언트와 netty 서버간 connection 유지가 안되는 문제 관련

233 views
Skip to first unread message

류중욱

unread,
Dec 18, 2023, 2:46:37 AM12/18/23
to Netty Korean User Group

안녕하세요.
아래 클라이언트와 netty 서버간 connection 유지가 안되는 문제 관련해서 질문을 했었는데요.
몇 분께서 좋은 답변을 해주셨고 이것 저것 해보았지만 아직 해결이 안된 상태입니다.
그래서 좀 더 질문을 드려볼려고 합니다.


1) NTRIP이라는 위치 관련 위성의 정보를 받아서 파싱 및 가공하여 캐쉬(redis)에 저장하는 부분이 있고
2) 이 정보를 주기적으로(현재는 1초단위) 읽어서 접속한 클라이언트들에게 전송하는 서버(이하 Caster)를 만들고 있습니다.
3) 이 서버에 접속하는 클라이언트들을 에뮬레이트하는 에뮬레이터(이하 client)를 만들고 있습니다.

이 client에서 데이터를 한번 받으면
두번째 받을 때는
int r = is.read(bytesRead);
에서 r이 -1이 되면서 연결이 끊긴다는 것을 앞의 대화에 썼었는데요...


이 Caster는 Spring boot에서 netty를 사용하여 클라이언트 접속을 받도록 구현하였구요.
channel pipeline을 아래와 같이 하였습니다.

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {

private final MessageHandler handler;
private final ResDataEncoder encorder;

@Override
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline pipeline = socketChannel.pipeline();

ReqDataDecoder decoder = new ReqDataDecoder();

pipeline.addLast(decoder);
pipeline.addLast(encorder);
pipeline.addLast(handler);        
}
}


제가 이 handler 관계를 아직도 정확히(?) 모르겠네요.

일단 client가 접속을 하면
ReqDataDecoder의 decode 함수에서 받은 데이터를 파싱하여 
요청 message를 만들어서 아래와 같이 넘깁니다.
    out.add(message);

그럼 그 다음에는 MessageToMessageDecoder를 상속받은 MessageHandler의 decode 함수에서 요청 메시지를 넘겨받아
아래와 같이 로그인 여부에 따라 clientInfo를 설정하고

@Override
    protected void decode(ChannelHandlerContext ctx, RequestMessage msg, List<Object> out) throws Exception {
        this.requestMessage = (RequestMessage) msg;
       
        // 요청 메시지로 부터 로긴 메시지 생성  
        LoginMessage loginMessage = new LoginMessage(requestMessage.getContent(), configProperties.getServerInfo(), configProperties.getSourceTablePath(),
            mountPointConfig, ctx);        
        // 로그인이 아니거나 실패한 경우
        if (loginMessage.isLogin() == false) {  
            // 로그인 메시지의 데이터를 전송
            ctx.writeAndFlush(loginMessage);    
            ctx.close();                        // 접속 종료
        }
        else    // 로그인한 경우
        {
            loginMessage.getClientInfo().setRemoteAddress(requestMessage.getRemoteAddress());
            this.clientInfo = loginMessage.getClientInfo();            
        }        
    }



MessageHandler의 channelActive에서 eventLoop의 scheduleAtFixedRate를 이용하여
client들에게 메시지를 만들어서 주기적으로 데이터를 보내도록 하고 있습니다.
(앞의 대화에서 조언을 받아 구조를 수정하였습니다.)

@Override
    public void channelActive(ChannelHandlerContext ctx) {
        //log.info("channelActive ctx = " + ctx);      
        // 접속중인 client channel
        dBManager.getChannelGroup().add(ctx.channel());

        ScheduledFuture<?> future = ctx.channel().eventLoop().scheduleAtFixedRate(new Runnable(){
            @Override
            public void run()
            {
                try {

                    if(clientInfo != null && ctx.channel().isActive()){
                        // 최신 RTCM 데이터를 캐쉬에서 가져오기
                        RtcmMessage rtcmMessage = new RtcmMessage(clientInfo, dBManager);
                        log.info("channelActive rtcm size = " + rtcmMessage.getSize());
                        if(rtcmMessage.getSize() > 0){
                            ctx.writeAndFlush(rtcmMessage);  
                        }else{
                            // 이런 경우 encode가 호출되지 않으므로 여기서 content release
                            rtcmMessage.getContent().release();
                        }
                    }

                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                }

            }
        }, 1, 1, TimeUnit.SECONDS);
        userTaskFutures.add(future);
    }



위에서 ctx.writeAndFlush(rtcmMessage)를 호출하면
ResDataEncoder의 encode함수에서
넘겨받은 ByteBuf out 메시지를
    msg.encode(out);
으로 실제 client에게 전송하는 것으로 이해를 했는데요.


한번 접속을 해서 데이터를 받고 연결을 끊는 구조에서는 문제 없이 돌아가는
다른 서버 소스를 재사용해서 만든 것인데요.

혹시나 이 구조가 주기적으로 데이터를 전송해야 하는 구조에서는
잘못된 것은 아닌가 생각이 들어서요.
이것을
decoder => handler => encode의 pipeline에서

handler나 다른 pipeline에서 ctx.close()를 하지않고
handler에서 1초마다 메시지를 보내면 연결이 안끊기고 client가 계속 받을 수 있는 것이 맞을까요?

int r = is.read(bytesRead);
에서 r이 -1이 되는 것은
스트림이 파일의 끝에 있기 때문에 사용할 수 있는 바이트가 없으면 값 -1이 반환된다고 하던데요.

EOF를 따로 보내지는 않으니 여기서 한번 보내고 스트림의 연결이 끊기는 것 같은데요...

기존에 제가 정확히 알지도 못하면서 그냥 이렇게 저렇게 해서 되고 
그렇게 넘어가고 그런 소스라서...

많은 조언 부탁드립니다.
미리 감사드립니다.


권석훈

unread,
Dec 18, 2023, 4:14:13 AM12/18/23
to nett...@googlegroups.com
일단  MessageHandler 는  MessageToMessageDecoder 같은  Decoder가 아닌 SimpleChannelInboundHandler같은 클래스를 상속받아 구현해야 합니다.
아래 메소드에서 decode된 객체를 비즈니스 처리하는 로직을 구현하면 되요.

protected void channelRead0(ChannelHandlerContext ctx, Object packet) throws Exception

2023년 12월 18일 (월) 오후 4:46, 류중욱 <ncr...@gmail.com>님이 작성:
--
이 메일은 Google 그룹스 'Netty Korean User Group' 그룹에 가입한 분들에게 전송되는 메시지입니다.
이 그룹에서 탈퇴하고 더 이상 이메일을 받지 않으려면 netty-ko+u...@googlegroups.com에 이메일을 보내세요.
웹에서 이 토론을 보려면 https://groups.google.com/d/msgid/netty-ko/29fc0119-5b65-4437-9a02-45485f6b3482n%40googlegroups.com을(를) 방문하세요.

류중욱

unread,
Dec 21, 2023, 4:43:28 AM12/21/23
to Netty Korean User Group
말씀하신대로  MessageHandler를  MessageToMessageDecoder 같은  Decoder가 아닌
SimpleChannelInboundHandler를 상속받아 구현해 보았는데 똑같은 현상이 발생합니다.


@Component
@Slf4j
public class SimpleMessageHandler extends SimpleChannelInboundHandler<Object> {
 
    private final ConfigProperties configProperties;
    private final MountPointConfig mountPointConfig;
    private final DBManager dBManager;

    private RequestMessage requestMessage;
    private ClientInfo clientInfo;
    private final CopyOnWriteArrayList<ScheduledFuture<?>> userTaskFutures = new CopyOnWriteArrayList<>();

    public SimpleMessageHandler(DBManager dBManager, ConfigProperties configProperties, MountPointConfig mountPointConfig){
        this.dBManager = dBManager;
        this.configProperties = configProperties;
        this.mountPointConfig = mountPointConfig;
    }

   
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
       
        dBManager.getChannelGroup().add(ctx.channel());

        ScheduledFuture<?> future = ctx.channel().eventLoop().scheduleAtFixedRate(new Runnable(){
            @Override
            public void run()
            {
                try {

                    if(clientInfo != null && ctx.channel().isActive()){
                        // 최신 RTCM 데이터를 가져오기
                        RtcmMessage rtcmMessage = new RtcmMessage(clientInfo, dBManager);
                        log.info("channelActive rtcm size = " + rtcmMessage.getSize());
                        if(rtcmMessage.getSize() > 0){
                            ctx.writeAndFlush(rtcmMessage);  
                        }else{
                            // 이런 경우 encode가 호출되지 않으므로 여기서 content release
                            rtcmMessage.getContent().release();
                        }
                    }

                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                }

            }
        }, 1, 1, TimeUnit.SECONDS);
        userTaskFutures.add(future);

        super.channelActive(ctx);

    }


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object obj) throws Exception {
       
        this.requestMessage = (RequestMessage) obj;
        log.info(" [request] = " + requestMessage.toString());
       
        // 요청 메시지로부터 로긴 메시지 생성   // Todo  requestMessage의 Type이 REQ_SOURCE인 경우 처리?
        LoginMessage loginMessage = new LoginMessage(requestMessage.getContent(), configProperties.getServerInfo(), configProperties.getSourceTablePath(),
            mountPointConfig, ctx);
       
        // 로그인이 아니거나 실패한 경우
        if (loginMessage.isLogin() == false) {  
            // 로그인 메시지의 데이터를 전송
            ctx.writeAndFlush(loginMessage);    
            ctx.close();                        // 접속 종료
        }
        else    // 로그인한 경우
        {
            loginMessage.getClientInfo().setRemoteAddress(requestMessage.getRemoteAddress());

            this.clientInfo = loginMessage.getClientInfo();            
        }        
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {    
        ctx.close();
    }


}


로그인을 한 경우 접속이 유지되면서 channelActive의 eventLoop에서 1초 주기로 
데이터를 보내는 것인데...

log.info("channelActive rtcm size = " + rtcmMessage.getSize());

이게  아래처럼 Caster 쪽에는 이 로그가 1초간격으로 계속 찍히기는 하거든요.

14:20:24.030 [nioEventLoopGroup-5-1] INFO  c.g.n.n.handler.SimpleMessageHandler - channelActive rtcm size = 2673
14:20:25.042 [nioEventLoopGroup-5-1] INFO  c.g.n.n.handler.SimpleMessageHandler - channelActive rtcm size = 2673
14:20:26.003 [nioEventLoopGroup-5-1] INFO  c.g.n.n.handler.SimpleMessageHandler - channelActive rtcm size = 2673
14:20:26.997 [nioEventLoopGroup-5-1] INFO  c.g.n.n.handler.SimpleMessageHandler - channelActive rtcm size = 2797
14:20:28.021 [nioEventLoopGroup-5-1] INFO  c.g.n.n.handler.SimpleMessageHandler - channelActive rtcm size = 2673
14:20:28.977 [nioEventLoopGroup-5-1] INFO  c.g.n.n.handler.SimpleMessageHandler - channelActive rtcm size = 2673


근데 클라이언트 쪽에서 읽은 바이트 수가 -1이 되면서 루프를 벗어나는데
아직도 갈피를 못잡겠네요.

클라이언트 쪽에는 아래처럼  for 루프 안에서 HttpURLConnection의 getInputStream으로 얻은 is를 인자로 fillup을 호출해서 데이터를 읽어서 0보다 크면 계속 루프를 돌거든요. 

private void decodeData(InputStream is) throws IOException{
        for (int r = fillUp(is); r >= 0; r = fillUp(is)) {
            log.info(mountPoint + " Received data : {} B", r);
            // we have read something, reset reconnection attempts counters        
            nbAttempts = 0;
            delay      = reconnectDelay;

            if (stop.get()) {
                // stop monitoring immediately
                // (returning closes the input stream automatically)
                return;
            }          
           

            // RTCM 메시지 하나당 header(3) + Message(최대 1023) + CRC(3)
            int bufferSize = 0;
            while (bufferSize() >= 3) {
                if (peekByte(0) != PREAMBLE) {      //header 3bytes 중 8bit는 PREAMBLE, 6bit는 reserved  
                    // we are out of synch with respect to frame structure
                    // drop the unknown byte
                    moveRead(1);
                    log.info(mountPoint + " PREAMBLE 불일치 => 1byte 이동");
                } else {
                    final int messageSize = (peekByte(1) & 0x03) << 8 | peekByte(2);   //header 3bytes 중 뒤쪽의 10bit는 메시지의 사이즈 = 최대 1023
                    bufferSize = bufferSize();
                    if (bufferSize >= PREAMBLE_SIZE + messageSize + CRC_SIZE) {
                        // check CRC
                        final int crc = (peekByte(PREAMBLE_SIZE + messageSize)     << 16) |
                                        (peekByte(PREAMBLE_SIZE + messageSize + 1) <<  8) |
                                            peekByte(PREAMBLE_SIZE + messageSize + 2);
                        //log.info("\n\n\n size = {} crc = {}", size, crc);
                        if (crc == computeCRC(PREAMBLE_SIZE + messageSize)) {
                            // we have a complete and consistent frame
                            // we can extract the message it contains
                            messageEndIndex = (readIndex + PREAMBLE_SIZE + messageSize) % BUFFER_SIZE;
                            //log.info("readIndex : {}, messageEndIndex : {}", readIndex, messageEndIndex);
                            moveRead(PREAMBLE_SIZE);
                            start();
                           
                            // get the message number as a String
                            int messageNum = (int) extractBits(12);
                            extractBits(4);
                            moveRead(messageSize - 2);
                           
                            // jump to expected message end, in case the message was corrupted
                            // and parsing did not reach message end
                            readIndex = (messageEndIndex + CRC_SIZE) % BUFFER_SIZE;

                            log.info(mountPoint + " RTCM {} : {} B, remaining = {} B", messageNum, messageSize, (bufferSize-messageSize-6));                                                        
                        } else {
                            // CRC is not consistent, we are probably not really synched
                            // and the preamble byte was just a random byte
                            // we drop this single byte and continue looking for sync
                            moveRead(1);
                            log.info(mountPoint + " CRC 불일치 => 1byte 이동");
                        }
                    } else {
                        // the frame is not complete, we need more data
                        break;
                    }
                }
            }

        }
    }


fillup은 읽은 bytes를 buffer에 데이터를 넣는 것이고

private int fillUp(final InputStream is) throws IOException {
        log.info(mountPoint + " fillUp 0");
        final int max = bufferMaxWrite();
        log.info(mountPoint + " fillUp max = {}", max);
        if (max == 0) {
            // this should never happen
            // the buffer is large enough for almost 16 encoded messages, including wrapping frame
            throw new OrekitInternalError(null);
        }
       
        byte[] bytesRead = new byte[max];
       
        int r = is.read(bytesRead);
        //while (r < 0) {
        //    r = is.read(bytesRead);
        //}
        log.info(mountPoint + " fillUp bytesRead r = {}", r);

        if (r >= 0) {
            System.arraycopy(bytesRead, 0, buffer, writeIndex, r);
            writeIndex = (writeIndex + r) % BUFFER_SIZE;
        }

        log.info(mountPoint + " fillUp return");
        return r;
    }


이 클라이언트를 해외 Ntrip Caster에 연결하면 r이 -1이 되는 문제없이 잘 읽어서
제가 만든 Caster가 문제가 있는 거라고 생각을 했는데
사실 이런  HttpURLConnection의 getInputStream으로 얻은 is를 가지고 루프를 도는것도 맞는 구조인지 모르겠습니다.
네트웍 쪽은 제가 얕아서...

어쨌든 계속적인 조언 감사드립니다. 

2023년 12월 18일 월요일 오후 6시 14분 13초 UTC+9에 필그림님이 작성:

권석훈

unread,
Dec 25, 2023, 1:28:28 AM12/25/23
to nett...@googlegroups.com
네티를 이용하여 일반적인 구조로 개발하였는가에 대해서 조언을 드리고 있어요. ( 물론 일반적이지 않은 형태로도 구현이 가능하겠지만... )
아직 몇가지가 눈에 띄는데요.. 

1. channel을 통해 쓰고 읽는 객체가 몇가지가 있는데 모두 decoder, encoder에서 처리가능한 클래스인가요?  
RtcmMessage
LoginMessage
RequestMessage

2. 서버는 netty  decoder,encoder 를 사용해서 구현했는데.. 클라이언트도 서버처럼 netty 의 decoder, encoder 를 사용하여 구현하지 않은 이유는 무엇인가요?
유입된 패킷을 처리할 수 있는 decoder를 구현하였다고 클라이언트에서도 동일한 구조로 처리가 가능해 보입니다.
(물론 클라이언트를 netty로 구현할 필요는 없지만서두..) 





2023년 12월 21일 (목) 오후 6:43, 류중욱 <ncr...@gmail.com>님이 작성:

류중욱

unread,
Dec 26, 2023, 1:09:12 AM12/26/23
to nett...@googlegroups.com

1. channel을 통해 쓰고 읽는 객체가 몇가지가 있는데 모두 decoder, encoder에서 처리가능한 클래스인가요?  
RtcmMessage
LoginMessage
RequestMessage

=> 
네. 제가 생각하기에는요. 
RtcmMessage, LoginMessage, RequestMessage  모두 ByteBuf content를 멤버변수로 가지고 있는 CommonMessage를 상속해서 구현하였습니다.
이 content라는 멤버변수에 클라이언트에 보낼 데이터를 아래처럼 byte로 wirte 하고 있고요.

    public RtcmMessage(ClientInfo clientInfo, DBManager dbManager){
        this.clientInfo = clientInfo;
        this.dbManager = dbManager;
        this.size = 0;
       
        // 인코딩된 한 epoch의 RTCM 데이터를 가져온다.
        ByteBuf buff = dbManager.getRtcmBundleData(clientInfo);

        if(buff != null){
            // 헤더 부분
            writeHttpHeader(200, "OK");
            writeLine("ICY 200 OK");
            writeLine("Ntrip-Version: Ntrip/2.0");
            writeLine("Cache-Control: no-store, no-cache, max-age=0");
            writeLine("Pragma: no-cache");
            //writeLine("Connection: close");
            writeLine("Connection: keep-alive");
            writeLine("Keep-Alive: timeout=30, max=100");            
            writeLine("Content-Type: gnss/data");            
           
            writeLine(String.format("Content-Length: %d\r\n", buff.readableBytes()));

            this.size = buff.readableBytes();

            // 바디 부분 - RTCM 메시지 데이터
            content.writeBytes(buff);
            buff.release();
        }
    }

writeHttpHeader, writeLine 함수도 결국 content에 데이터를 wirte하는 것이고요.

    protected void writeHttpHeader(int error, String msg)
    {
        writeLine(String.format("HTTP/1.1 %d %s", error, msg));
        writeLine(String.format("Server: Geosoft NtripCaster %s/%s", version, ntripVersion));
    }

    protected void writeLine(String msg)
    {
        content.writeCharSequence(msg + "\r\n", CharsetUtil.UTF_8);
    }



그래서 ResDataEncoder에서는 RtcmMessage를 CommonMessage로 받아서 아래와 같이 msg.encode(out)을 호출하면

    @Override
    protected void encode(ChannelHandlerContext ctx, CommonMessage msg, ByteBuf out) throws Exception {
       
        // CommonMessage의 encode를 호출하여 메시지를 bytes로 변환
        msg.encode(out);

        //log.info(" [encoded] = " + out);
    }

CommonMessage의 encode에서는 그냥 out에 자신의 ByteBuf content를 write 하여 데이터를 client에게 보냅니다.

// 송신 데이터 byte encoding
    public void encode(ByteBuf out){
       
        // out에 content 데이터 write
        out.writeBytes(this.content);

        // eoncoding 후 content ByteBuf는 release
        this.content.release();
    }

혹시 이 구조가 잘못된 부분이 있을까요?
기존의 다른 서버에서도 이 구조를 그대로 쓰고 있는데 다른거라면 거기는 한번 접속해서 한번 데이터를 주고 끝난다는 것이구요.
여기서는 연결 유지하고 데이터를 1초단위로 계속 보내야 한다는 거거든요.



2. 서버는 netty  decoder,encoder 를 사용해서 구현했는데.. 클라이언트도 서버처럼 netty 의 decoder, encoder 를 사용하여 구현하지 않은 이유는 무엇인가요?
유입된 패킷을 처리할 수 있는 decoder를 구현하였다고 클라이언트에서도 동일한 구조로 처리가 가능해 보입니다.
(물론 클라이언트를 netty로 구현할 필요는 없지만서두..) 

=>
원래 NTRIP과 RTCM이라는 위치 관련(위성) 데이터 송수신 규격이 있구요.
그것을 제가 spring boot로 구현을 하다 보니까 기존의 서버 구조(netty)를 가져다가 Caster를 구현을 한 것이구요.
NTRIP Client라는 거는 다른 오픈소스를 가져다가 수정해서 구현하다가 보니까 그렇게 되었습니다.

Keep alive 관련 설정 부분이 문제인가 해서 아래 부분도 이렇게 했다가 저렇게 했다가 하면서 수정해 보았는데 안되더라구요.

@Bean(name = "serverBootstrap")
    public ServerBootstrap bootstrap(ServerChannelInitializer serverChannelInitializer) {
        ServerBootstrap sbs = new ServerBootstrap();
        sbs.group(bossGroup(), workerGroup())
                .channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler(LogLevel.DEBUG))
        .option(ChannelOption.SO_BACKLOG, nettyProperties.getBacklog())
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000)
        .childOption(ChannelOption.TCP_NODELAY, true)
        .childOption(ChannelOption.SO_LINGER, 0)
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        .childOption(ChannelOption.SO_REUSEADDR, true)
        .childHandler(serverChannelInitializer);
   
        return sbs;
    }


아직 해결은 안되었지만 계속된 관심 감사드립니다.
즐거운 연말 되시고 새해 복 많이 받으세요. ^^


2023년 12월 25일 (월) 오후 3:28, 권석훈 <lloy...@gmail.com>님이 작성:
이 메일은 Google 그룹스 'Netty Korean User Group' 그룹의 주제에 가입한 분들에게 전송되는 메시지입니다.
이 주제에서 탈퇴하려면 https://groups.google.com/d/topic/netty-ko/J7WUrIOPRBY/unsubscribe을(를) 방문하세요.
이 그룹 및 그룹의 모든 주제에서 탈퇴하려면 netty-ko+u...@googlegroups.com에 이메일을 보내세요.
웹에서 이 토론을 보려면 https://groups.google.com/d/msgid/netty-ko/CAHGh1N5iVdEy2cewjhOMXH-26YnBB2vgFcp_j%3DYdgGaTJn2ieg%40mail.gmail.com을(를) 방문하세요.

권석훈

unread,
Dec 26, 2023, 3:02:32 AM12/26/23
to nett...@googlegroups.com
일단 의심되는 것은 
encoder에서 ctx.flush() 를 맨 마지막에 추가를 해 보시겠어요?
@Override
    protected void encode(ChannelHandlerContext ctx, CommonMessage msg, ByteBuf out) throws Exception {
       
        // CommonMessage의 encode를 호출하여 메시지를 bytes로 변환
        msg.encode(out);
                  ctx.flush();    // 추가

        //log.info(" [encoded] = " + out);
    }

주어진 코드를 봐서는 딱히 눈에 띄는 것들은 더 이상 보이지 않네요.
( 한계가 있는 거는 어쩔 수 없네요..) 



2023년 12월 26일 (화) 오후 3:09, 류중욱 <ncr...@gmail.com>님이 작성:

류중욱

unread,
Dec 27, 2023, 1:58:52 AM12/27/23
to nett...@googlegroups.com

드디어 해결의 기미가 보입니다.
왜 이 생각을 진작에 못했었는지...

주기적으로 메시지를 보낼때 앞에서도 말씀드렸던 RtcmMessage라는 클래스에서 아래처럼
ByteBuf content라는 멤버변수에 HTTP 헤더부분과 바디부분인 RTCM 메시지 데이터를 write하여 encoder에서 매 주기마다 보냈는데요.
특히 Content-Length라는 헤더를 같이 보냈는데 그것이 연결을 끊어버리는 결과가 되었나 봅니다.

    public RtcmMessage(ClientInfo clientInfo, DBManager dbManager){
        this.clientInfo = clientInfo;
        this.dbManager = dbManager;
        this.size = 0;
       
        // 인코딩된 한 epoch의 RTCM 데이터를 가져온다.
        ByteBuf buff = dbManager.getRtcmBundleData(clientInfo);

        if(buff != null){
            // 헤더 부분
            writeHttpHeader(200, "OK");
            writeLine("ICY 200 OK");
            writeLine("Ntrip-Version: Ntrip/2.0");
            writeLine("Cache-Control: no-store, no-cache, max-age=0");
            writeLine("Pragma: no-cache");
            //writeLine("Connection: close");
            writeLine("Connection: keep-alive");
            writeLine("Keep-Alive: timeout=30, max=100");            
            writeLine("Content-Type: gnss/data");            
           
            writeLine(String.format("Content-Length: %d\r\n", buff.readableBytes()));

            this.size = buff.readableBytes();

            // 바디 부분 - RTCM 메시지 데이터
            content.writeBytes(buff);
            buff.release();
        }
    }

위와 다르게 접속후 Content-Length를 포함하지 않는 HTTP 헤더 부분을 한번만 보내고
주기적으로는 바디부분인 RTCM 메시지 데이터만 보내니까 연결이 끊기지 않고 계속 받아지더라구요 ㅡㅡ;

위의 RtcmMessage도 NTRIP의 RTCM 메시지 구조를 검색해서 만든것인데 버전이 다른지, 아니면 뭔가 번역이 미흡했는지 문제가 있었네요.

어쨌든 그동안 알려주신 내용들 감사드리구요.
저에게는 netty의 여러가지에 대해서 이것 저것 시도해보는 시간이었네요.



2023년 12월 26일 (화) 오후 5:02, 권석훈 <lloy...@gmail.com>님이 작성:

권석훈

unread,
Dec 27, 2023, 2:18:12 AM12/27/23
to nett...@googlegroups.com
처리되어 다행입니다. 

2023년 12월 27일 (수) 15:58, 류중욱 <ncr...@gmail.com>님이 작성:
Reply all
Reply to author
Forward
0 new messages