동일한 채널로 동일한 패킷이 무한대로 유입되는 현상에 대해 경험하신분 계신가요?

92 views
Skip to first unread message

총상

unread,
Jul 14, 2021, 8:39:47 AM7/14/21
to Netty Korean User Group
netty 4.1.65final + kotlin 1.5 + spring boot 2.5 이렇게 프로젝트 셋팅하고 tcp 전문 통신 개발중에 격은 현상입니다.

{"log_level":"INFO","time":"2021-07-14T03:22:31.376Z","log_type":"TCP_LOG","msg":"@@ duplicate packet. channel close"} {"log_level":"INFO","time":"2021-07-14T03:22:31.376Z","log_type":"TCP_LOG","data_type":"PACKET","byte_array":"AAAA0024450560000000455400002026403695A46000FFFFCBAC","remote_ip":"/10.240.2.129:37508","channel_id":"a437de98"} {"log_level":"INFO","time":"2021-07-14T03:22:31.388Z","log_type":"TCP_LOG","transfer_type":"SEND","data_type":"HEX","byte_array":"aaaa000c458560000000115a"} {"log_level":"INFO","time":"2021-07-14T03:22:31.388Z","log_type":"TCP_LOG","msg":"@@ duplicate packet. channel close"} {"log_level":"INFO","time":"2021-07-14T03:22:31.388Z","log_type":"TCP_LOG","data_type":"PACKET","byte_array":"AAAA0024450560000000455400002026403695A46000FFFFCBAC","remote_ip":"/10.240.2.129:37508","channel_id":"a437de98"} {"log_level":"INFO","time":"2021-07-14T03:22:31.393Z","log_type":"TCP_LOG","transfer_type":"SEND","data_type":"HEX","byte_array":"aaaa000c458560000000115a"}


위와 같이 동일한 패킷이 계속 유입이 되어 제가만든 서버에서도 계속 응답을 보내고 있는 현상이 발견되었는데 해당 패킷이 진짜 외부에서 유입이 되는건지 네티 자체적으로 무한루프가 도는건지 확인할 방법을 찾지 못하고 있내요.

디버깅하면서 발견한건 로그보면 37508 포트가 열려있다고 나와서 app이 배포된 pod 에 들어가서 netstat -an 으로 필터링을 해봐도 37508 포트는 안나와 있습니다.
정상적으로 tcp 커넥션이 연결된 포트는 검색이 되구요.

상황적으로 봤을때 네티 내부에서 패킷이 무한루프 도는것 같긴한데 확실치가 않아서 도움이 필요합니다.

서버 운영환경이 공인IP -> 내부 proxy server --> kubernetes ingress --> pod 이렇게 구성되어있습니다.

혹시 이와 비슷한 현상을 겪으신분이 계신가요??

총상

unread,
Jul 14, 2021, 9:56:01 PM7/14/21
to Netty Korean User Group

몇일 디버깅하면서 발견한것도 공유드립니다.

동일한 패킷이 들어오는 현상에 대해서 동일한 패킷이 들어오는 경우 netty의 채널을 강제로 닫도록 했는데도 불구하고 해당 채널이 닫히지 않는 현상이 이었습니다.

채널을 닫을때는 
channel.flush()
channel.close()

이렇게 닫도록 했는데 채널이 닫히지 않고 해당 채널로 계속 트래픽이 유입되는것은 외부에서 들어오는게 아니라 내부에서 loop이 돌고 있는걸까요?
2021년 7월 14일 수요일 오후 9시 39분 47초 UTC+9에 총상님이 작성:

Joo Sing

unread,
Jul 15, 2021, 4:23:57 AM7/15/21
to Netty Korean User Group
안녕하세요. 신기한 현상 같아서 지나가다 관심을 가지고 읽어보았어요. 

첨부해주신 메시지의 time 필드 값들이 바뀌어서 수신되는 걸로봐서는 Netty 에서 루프를 돌고 있다고 보기는 힘들지 않을까요? 
Netty 프레임워크는 메시지 구조를 모르기 때문에 정확하게 저 필드 값을 바꾸어 주면서까지 루프를 돌 수는 없을 것 같아요. 

"time":"2021-07-14T03:22:31.376Z"
"time":"2021-07-14T03:22:31.393Z"

netstat -an 해도 안나오는건 gracefully 하게 close handshake가 끝나서 표시되지 않는 것일 수도 있을 것 같아요. 
그리고 역시 첨부해주신 메시지를 보면 ("remote_ip":"/10.240.2.129:37508") 37508은 서버쪽 포트가 아니라 원격지 포트 같은데 원격지 포트쪽도 필터링 해보신거죠? 
정확하게 알려면, 적절한 위치에서 Wireshark 에서 캡처하면서 필터링을 해보면 알 수 있을 것 같습니다. 
( ip.addr == 10.240.2.129 and tcp.port == 37508 )

근본적인 문제를 해결하려면 전체적인 상황을 알아야 할 것 같긴 한데. 회사에서 코드리뷰를 하면서 비슷한 트러블 슈팅을 한 경험이 있는데요.

파이프라인을 구성하고 있는 수신 핸들러에서 ByteBuf 를 readBytes() 등의 메쏘드로 읽어 readerIndex를 증가시켜 주지 않고(버퍼를 비워주지 않고), toString() 같은 메쏘드(readerIndex를 바꿔주지 않아 버퍼에 그대로 있습니다)를 사용해서 해당 데이터를 처리해서 계속 데이터가 들어온적이 있어요. 비슷한 상황이신지는 모르겠으나 혹시 도움이 될까해서 남겨봅니다. 

꼭 해결하시기를!!

2021년 7월 15일 목요일 오전 10시 56분 1초 UTC+9에 osuj...@gmail.com님이 작성:

총상

unread,
Jul 15, 2021, 4:42:08 AM7/15/21
to Netty Korean User Group
의견 주셔서 감사합니다~

계속 디버깅하면서 많이 좁혀지는것 같긴하내요.

우선 의견 주신것에대해 답을 드리면 
정상적으로 연결된 경우에는 netstat -an 통해서 로그에 찍힌 포트가 established 상태로 잘 나오더라고요. 그래서 포트번호 자체가 검색 안되는게 의심 스러웠습니다. 그리고 무한루프 현상이 발생한 서버보면 close_wait 가 계속 생성 되는 현상도 발견이 되내요.

로그에 time 필드는 logback 통해서 출력해주는거라 계속 값이 바뀌고 있습니다.

지금 소스에서 ReplayingDecoder 통해서 패킷 수신을 처리하는데 

val bodyPacket = buf.readBytes(length) 이부분에서 length 부분이 문제가 생겨서 루핑이 도는게 아닌가 싶어서 관련코드 테스트 중에 있습니다.

의견주신것중에 "readerIndex를 증가시켜 주지 않고 " 이부분때문에 비슷한 현상을 격으신것 같다고 하시니 이부분이 문제가 아닌가 하는 가능성이 더 높아져 보이내요 ㅎㅎ

귀중한 답변 감사드립니다.

@ExperimentalUnsignedTypes
class PacketReceiverHandler : ReplayingDecoder<PacketDecoderState>(PacketDecoderState.READ_HEADER) {
private var length: Int = 0
override fun decode(ctx: ChannelHandlerContext, buf: ByteBuf, out: MutableList<Any>) {
when (state()) {
PacketDecoderState.READ_HEADER -> {
println("READ HEAD :: ${ctx.channel().remoteAddress()} | ${ctx.channel().id()} | $this | ${Thread.currentThread().name}")
val headerPacket = buf.readBytes(HeaderSize.TOTAL_HEAD_SIZE)

val startCode = ByteBufUtil.getBytes(headerPacket.readBytes(HeaderSize.START_CODE.length))
startCode.takeIf { !it.contentEquals(START_CODE) }?.let {
ctx.channel().close()
val array = ByteArray(buf.readableBytes())
buf.getBytes(buf.readerIndex(), array)
throw TcpPacketValidationCheckException(TcpError(errorCode = ErrorCode.P005, byteArray = array))
}
length = headerPacket.readBytes(HeaderSize.PACKET_LENGTH.length).getUnsignedShort(0)
checkpoint(PacketDecoderState.READ_WHOLE)

// 헤더에 length 필드는 패킷 전체 사이즈 나타내기 때문에 read point를 reset 하여 전체 패킷이 다음 핸들러로 전송될수 있도록 한다.
buf.resetReaderIndex()
}

PacketDecoderState.READ_WHOLE -> {
println("READ_WHOLE :: $length | ${ctx.channel().remoteAddress()} | ${ctx.channel().id()} | $this | ${Thread.currentThread().name} ")

val bodyPacket = buf.readBytes(length)
checkpoint(PacketDecoderState.READ_HEADER)
out.add(bodyPacket)
}
else -> throw ProtocolsException(AppError(ErrorCode.P000))
}
}

}



2021년 7월 15일 목요일 오후 5시 23분 57초 UTC+9에 joosi...@gmail.com님이 작성:

Joo Sing

unread,
Jul 15, 2021, 7:02:10 AM7/15/21
to nett...@googlegroups.com
저도 자세히 설명해 주셔서 조금더 이해가 되었네요. 감사합니다. 덕분에  Kotlin 코드도 처음 봤네요...

해결하시면 어떻게 해결하셨는지 남겨주실 수 있으시면 남겨주세요...  궁금하네요...^^; 

저도 Netty와 TCP에 대해서 공부하고 있는데, 첨부해 주신 코드 보면서 몇 가지 궁금한게 있는데... 바쁘시면 답변 안주셔도 됩니다.. ㅎㅎ 혼자 공부한다는 생각으로 질문도 막 남깁니다... 

1. 헤더를 읽고 resetReaderIndex() 메쏘드를 호출하시잖아요. RelayingDecoder 문서와 샘플코드에는 markReaderIndex() 를 호출했던 지점의 readerIndex 로 돌아간다고 되어있는데, 혹시 markReaderIndex() 를 호출안하고  resetReaderIndex() 을 호출하면 실제로 어디로 포인터가 이동되는건지 궁금하네요. 
2. 문제랑 관계 없는 얘기지만 길이 필드로 메시지를 구분해 주는 로직을 직접 짜셨는데, LengthFieldBasedFrameDecoder 안 쓰시고 직접 짜신 이유가 있을까요?  LengthFieldBasedFrameDecoder 를 사용하고 헤더유효성 체크하는 부분은 별도의 Verify 핸들러를 정의해주시면 파이프라인도 구조화 되고 더 심플해 질 것 같다는 생각이 드네요.
3. 순수하게 End to End 가 TCP로만 연결되었다면( 중간에 Serial이나 UDP를 지나서 온다거나 하지 않으면 ) 헤더 값이 유효한 값인지 검사를 해야할 필요가 있을까요? TCP에서 값이 변형되지 않고, 순서가 바뀌지 않음을 보장해 주는 걸로 알고 있는데 혹시 다른 이유가 있을까요? 

개인적인 질문이었습니다. 

화이팅입니다. 

총상

unread,
Jul 16, 2021, 2:58:57 AM7/16/21
to Netty Korean User Group

로컬환경에서도 무한루프 현상을 재현해서 무한루프가 발생하는 원인을 찾았습니다.

서버가 재기동 하거나 서버가 동작중인 상태에서 특정 서버 이벤트(ex. healthCheck 등) 가 연속으로 두번 들어오게 되는 경우에 발생이 됩니다.헬스체크를 예로 들면 헬스 체크는 총 36 바이트이고 앞의 10바이트는 헤더입니다.

AAAA00244505F4000000754E672F4A78435562716B6D71755A636F3253496A673D3D51EA
정상적인 패킷은 위와 같이 딱 36바이트만 오고 말건데
서버가 재기동하면서 패킷을 따로 처리할수가 없거나 어떤 이유에서 패킷이 버퍼되어 쌓였다가 한번에 서버로 들어는 경우에는
AAAA00244505F4000000754E672F4A78435562716B6D71755A636F3253496A673D3D51EAAAAA00244505F4000000754E672F4A78435562716B6D71755A636F3253496A673D3D51EA
위와 같이 동일한 패킷이 연속으로 들어오게 됩니다. 이렇게 되면 패킷의 사이즈는 36*2 = 72 가 됩니다.이부분때문에 묹제가 발생하게 되는데 ReplayingDecoder 에서 ByteBuf에 수신된 패킷을 한번 처리하면 byteBuf 의 readIndex정보가
ReplayingDecoderByteBuf(ridx=36, widx=72)
이렇게 되고 아직 36바이트가 더 남아 있기 때문에 ReplayingDecoder 내부에서 decode 를 호출하는 코드를 살펴보면
@Override
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
replayable.setCumulation(in);
try {
while (in.isReadable()) {
int oldReaderIndex = checkpoint = in.readerIndex();
while 문안에 byteBuf 에 남아있는 데이터가 있으면 계속 while 문을 돌면서 decode 를 수행하도록 합니다.총 72 바이트에서 먼저 들어온 36 바이트를 읽고 남아있는 패킷이 있기 때문에 37번째 바이트부터 다시 읽기 시작하는데 37번째 바이트부터 헤더 정보(10 바이트)를 읽어보니 정상적인 양식이니 헤더를 다시 읽습니다.그리고 제가 짠 코드에서 buf.resetReaderIndex() 가 있는데 ByteBuf의 readindex를 0으로 만들어 header + body 정보를 읽을수 있도록 하고 있습니다.두번째 헤더 정보를 읽은 후 ByteBuf의 readindex 값은 46입니다.
ReplayingDecoderByteBuf(ridx=46, widx=72)
그리고 header 정보를 읽고난뒤 buf.resetReaderIndex() 수행하면 readindex = 0 으로 초기화 되고
ReplayingDecoderByteBuf(ridx=0, widx=72)

ReplayingDecorder 내부의 ByteBuf의 read index 값은 0으로 초기화 되면서 무한루프에 빠지게 되었내요.
2021년 7월 15일 목요일 오후 8시 2분 10초 UTC+9에 joosi...@gmail.com님이 작성:
Reply all
Reply to author
Forward
0 new messages