netty 사용시 메모리 누수 추척 문의드립니다..!

565 views
Skip to first unread message

ss

unread,
Jul 6, 2023, 11:27:02 PM7/6/23
to Netty Korean User Group
안녕하세요. 네티 데이터 발송 중에 메모리 누수를 추적하고 있습니다.

전에는 heap space 메모리 에러가발생하고 서버가 다운되었었는데요,
이에 코드를 검토에 조금 수정하였더니 서버가 죽지는 않으나,
사용 메모리는 계속 누적으로 올라가고,
특정 시점에는 장애가 발생하기 시작합니다.

장애 내용은,
네티 통신 중에 client가 서버와 한번만 주고 받게 되는 것입니다.

기존에는
클라이언트 <-> 서버 간에 정의 된 프로토콜에 의하여 마지막 데이터가 올때까지 계속 서로 요청과 응답을 주고 받는네요.

장애 발생시점 부터는 클라이언트 -> 서버 요청 후 서버->클라이언트 응답 처음까지는 정상인데, 그 후 두번째부터는 클라이언트 -> 서버 요청 후 바로  ReadComplete가 떨어지면서 끝나버립니다.



다음은 장애 시점과 재기동 시점의 메모리 모니터링 화면입니다.

장애시점 메모리.PNG



장애가 발생 중이던 시점에 특정 nioEventLoop에 대한 로그를 캐치해보면 아래와 같습니다. 추가적으로 데이터를 주고 받을 것이 있는데, 바로 ReadComplete가 떨어지는 현상입니다.

추가패킷이있는데ReadCmplete발생.PNG


서버가 힙 메모리 에러에 의해 죽지 않아서 덤프파일도 생성이 안되고..

어떻게 추적해야할지 모르겠습니다.ㅠㅠ




아래는 핸들러 부분 소스입니다.

public class PollingClientHandler extends ByteToMessageDecoder {


private static Logger logger = LoggerFactory.getLogger(PollingClientHandler.class);
private ChannelHandlerContext ctx;

private byte[] sendMsg;
private int totalSize;
private ByteBuf buffer;
Bootstrap bs;

public void SendMsg(byte[] sendMsg) {
ctx.write(Unpooled.copiedBuffer(sendMsg));
ctx.flush();
logger.info("[sendMsg][ip:{}] 발송 메시지 : {}", ctx.channel().remoteAddress(), DataUtil.byteArrayToHexString(sendMsg));
}

public PollingClientHandler(byte[] msg, Bootstrap bs) {
this.sendMsg = msg;
this.bs = bs;
}

private PollingClientAction action_;

public void setCallBackClientHandler(PollingClientAction action) {
this.action_ = action;
}

private byte[] receiveMsg;
public byte[] getReceiveMsg() {
return this.receiveMsg;
}

@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
logger.info("채널 등록");
}

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
logger.info("채널 연결이 종료됨.");
bs.group().shutdownGracefully();
logger.info("shutdownGracefully..");
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("채널이 메시지 발송할 준비가 됨.");
this.ctx = ctx;
ctx.write(Unpooled.copiedBuffer(sendMsg));
ctx.flush();
//logger.info("[channelActive] 발송 메시지 : {}", DataUtil.byteArrayToHexString(sendMsg));
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
logger.info("핸들러 등록");
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
logger.info("메시지를 받는 메소드. {} ", msg);
buffer = ctx.alloc().buffer(); // Client일 경우는 이 위치에서 수행되어야함
ByteBuf b = (ByteBuf)msg;
try{
buffer.writeBytes(b);
int readableByte = buffer.readableBytes();
  if (readableByte > 29 && totalSize == 0) {
 
      byte[] bytes = new byte[2];
      buffer.getBytes(27, bytes);
      //logger.info("####### {}, readesize :{}",FrameUtil.byteArrayToHexString(bytes),readableByte);
      this.totalSize = ((bytes[0] & 0xff) << 8) | (bytes[1] & 0xff);
  }
  if (readableByte == totalSize + 30) {
 
      byte[] bytes = new byte[buffer.readableBytes()];
      buffer.readBytes(bytes);
      //logger.info("Received Message : {}", FrameUtil.byteArrayToHexString(bytes));
      this.receiveMsg = bytes;
      this.totalSize = 0;
     
      if(this.action_ != null) {
action_.receive(PollingClientHandler.this);
      }
      clearBuffer();
  }
}catch(Exception e) {
logger.error("channelRead error.", e);
clearBuffer();
} finally {
      ReferenceCountUtil.safeRelease(msg);
    }
}

private void clearBuffer() {
  if (buffer != null) {
    buffer.release();
    buffer = null;
  }
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
logger.info("메시지를 받는 동작이 끝나면 동작하는 메소드. ");
}

@Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    logger.error("exceptionCaught", cause);
    ctx.close();
  }

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// TODO Auto-generated method stub

}



아래는 클라이언트 부분입니다.

public class PollingClient {

private static Logger logger = LoggerFactory.getLogger(PollingClient.class);

private Bootstrap bs = new Bootstrap();
private SocketAddress addr_;
private Channel channel_;
private List<byte[]> msgArr;
private int idx;
private NioEventLoopGroup group;

@Autowired FepMeteringDAO fmDAO;
@Autowired AutowireCapableBeanFactory autowireCapableBeanFactory;
@Autowired KafkaHandler kafkaHandler;
@Autowired MeteringDataProcessor meteringDataProcessor;
@Autowired DeviceStatusProcessor deviceStatusProcessor;

@Value("${fep.protocol.sec}")
private int bSec;

@Value("${fep.protocol.ver}")
private int pVer;

@Value("${fep.protocol.fepid}")
private String pFepId;

byte[] totalData;

public PollingClient(SocketAddress addr, List<byte[]> msgArr) {
this.addr_ = addr;
this.msgArr = msgArr;
}

public PollingClient(String host, int port, List<byte[]> msgArr) {
this(new InetSocketAddress(host, port), msgArr);
}

//실제로 동작시킬 메소드 Bootstrap 연결 옵션 설정 및 연결 처리
public void run() {
if(this.addr_ == null) {
logger.error("주소 정보가 없습니다.");
}else if(this.msgArr == null || this.msgArr.size() == 0) {
logger.error("보낼 메시지가 없습니다.");
}

group = new NioEventLoopGroup(1);
bs.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true);

doConnect();
}

private void doConnect() {
handlerSet();

ChannelFuture f = bs.connect(addr_);
channel_ = f.channel();
logger.info(addr_ + " connect()");
}

private void handlerSet() {
if(bs != null) {
bs.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(2048));
PollingClientHandler handler = new PollingClientHandler(msgArr.get(idx), bs);
handler.setCallBackClientHandler(new PollingClientAction() {

public void close(PollingClientHandler handler) {
//종료 처리 후 더 보낼게 존재한다면 기존 옵션으로 재 연결처리를 하는 콜백 메소드
closeAndContinue();
}

public void receive(PollingClientHandler handler) {
//응답 받은 메시지 콜백 메소드
byte[] receiveMsg = handler.getReceiveMsg();
handleMessage(receiveMsg, handler);
}

});

ch.pipeline().addLast("PollingClientHandler", handler);
}
});
}
}

/**
 * Actual Message handling and reply to server.
 */
private boolean handleMessage(byte[] msg, PollingClientHandler handler) {
FepFrameParseConstants parsingResult = FepConstants.FepFrameParseConstants.FRAME_PARSE_ERROR_ETC;
FepPacket fepPacket = new FepPacket();
byte[] zKey = null;
boolean bResponse = true;
try {
// 1.Header parsing
parsingResult = fepPacket.parsingBase(msg);

if( ! (parsingResult == FepConstants.FepFrameParseConstants.FRAME_PARSE_SUCCESS) ) { // parsing 실패
// 헤더 파싱 에러 발생 시 예외 발생 시킴
throw new Exception("Fep Packet Parsing failed. : "
+ parsingResult.name());
}

if(fepPacket.ctlSmf == FepHeaderConst.FEP_SC_SEC_ENABLE.getCode()) {
// 암호화 모듈 사용시
String sorcueId = new String(fepPacket.sid, "UTF-8").trim(); //  source-Id
logger.info("sourceId : {} ", sorcueId);
DcuVO dcu = fmDAO.selectDcuByDcuId(sorcueId); // dcu 정보 조회

if( dcu == null ) parsingResult = FepConstants.FepFrameParseConstants.FRAME_PARSE_ERROR_ETC;
else {
try{
zKey = DataUtil.hexStringToByteArray(dcu.getFEP_KEY());
fepPacket.doDecryptData(zKey);
}catch(Exception e) {
parsingResult = FepConstants.FepFrameParseConstants.FRAME_PARSE_ERROR_ENDECRYPT;
logger.error("LP Data Decrypt is failed.", e);
}
}
}
} catch (Exception e) {
logger.error("message parsing failed. ", e);
}

// 3. kafka큐에 put
/////////////////////// 카프카 되면 그때 수정
//kafkaHandler.setFepMessageToKafka(fepPacket);
//////////////////////////////////////////////

/// 임시로직
Message message = new Message();
message.setCommand(fepPacket.command);
message.setData(fepPacket.Data);
message.setSourceId(fepPacket.sourceID);


FepCommandList command = FepConstants.FepCommandList.getCommand(fepPacket.command);
    switch(command) {
    case FEP_CMD_ACK :
    case FEP_CMD_NOT_ACK :
    logger.info("[{}] {} 수신.", command.name(), fepPacket.sourceID);
    bResponse = false; // 서버가 ACK 혹은 NACK를 수신한 경우에는 Response 없이 TCP Connection를 종료한다
    break;
    case FEP_CMD_DCU_STATUS_RSP :
    case FEP_CMD_METER_STATUS_RSP :
    case FEP_CMD_METER_NEW_CHANGE_RES :
    deviceStatusProcessor.saveDeviceData(message);
    break;
    case FEP_CMD_AMIGO_DATA_RSP :
    meteringDataProcessor.saveMeteringData(message);
    break;
default:
break;
    }
     
   
if(bResponse) {
// 5. Ack 혹은 Nack 응답 패킷 생성
    //TODO FSN(Fragment 된 패킷 처리 하는 부분 작성 필요)
FepPacket replyPacket = new FepPacket();
replyPacket.initialize(bSec, pVer, pFepId);

if(parsingResult == FepConstants.FepFrameParseConstants.FRAME_PARSE_SUCCESS) {
replyPacket.makeAckPacket(fepPacket.sourceID);
} else {
// error code 송신
replyPacket.Data = new byte[1];
replyPacket.Data[0] = parsingResult.getCode();
int Data_enc_len = fepPacket.doEncryptData(zKey);

// 데이터 파싱 실패 시 NACK 송신
replyPacket.setFepToDcuPacket(FepConstants.FepCommandList.FEP_CMD_NOT_ACK, fepPacket.sourceID, Data_enc_len, fepPacket.qsn, 0/*msn*/);
replyPacket.makePacketHeader();
replyPacket.generateCRC();
}

//6. 응답 패킷 전송 ('A'/'a')
byte[] packet = replyPacket.getFepPacket();

if(fepPacket.ctlMpacket == FepConstants.FepHeaderConst.FEP_M_PACKET_NOT_LAST.getCode()) {
logger.info("M-Packet is 0 (not last Packet.)");
if(bResponse) {
handler.SendMsg(packet);
}

} else {
logger.info("M-Packet is 1 (last Packet.)");

// 7. 'K' 응답 시 Last Packet을 수신한 후에 DCU Polling 송신 결과 저장
// (여러 패킷 송신올때마다 Update 하면 비효율적이므로)
if(command == FepConstants.FepCommandList.FEP_CMD_AMIGO_DATA_RSP) updateDcuCommStats(fepPacket.sourceID, "Response");

if(bResponse) handler.SendMsg(packet);
closeAndContinue();
}
}
return true;
}

/**
 * DCU에 검침 요청한 이력 통계를 저장한다.
 * @param dcuId
 */
public void updateDcuCommStats(String dcuId, String gubun) {
logger.info("[{}] Select updateDcuCommStats.", dcuId);
String isExist = fmDAO.selectDcuCommLog(dcuId);

if(isExist != null) {
if(gubun.equals("Request")) {
logger.info("[{}] Request updateDcuCommStats.", dcuId);
fmDAO.updateDcuCommLogT(dcuId);
}
else {
logger.info("[{}] Response updateDcuCommStats.", dcuId);
fmDAO.updateDcuCommLogA(dcuId);
}
}
else fmDAO.insertDcuCommLog(dcuId);
}

private void closeAndContinue() {
try {
if(msgArr.size() > ++idx) { //보낼 메시지가 남았으면 재연결 처리
logger.info("msgArr length = {} / idx = {} ", msgArr.size(), idx);
doConnect();
}else { //보낼 메시지가 없다면 종료
logger.info("closing thread and channel...");
bs.group().shutdownGracefully(); //eventLoop에 등록된 Thread를 종료 처리한다.
channel_.close().sync(); //현재의 채널을 닫는다.
}

} catch (InterruptedException e) {
e.printStackTrace();
}
}



3주간 분석해도 도저히 알수가 없어 문의드립니다 ㅠ

바쁘시겠지만 검토해주시면 큰 도움이 될 것 같습니다.
감사합니다.

Joo Sing

unread,
Jul 9, 2023, 10:32:51 PM7/9/23
to Netty Korean User Group
안녕하세요. 

1. 런타임 힙 메모리 모니터링

VisualVM 이라는 도구를 설치하면 런타임에 JVM의 힙 메모리 덤프를 실시간으로 떠서 확인할 수 있습니다. 

1. VisualVm 설치
2. Sampler > Memory 샘플링 시작
3. 샘플링 결과 제일 상단에 가장 많은 메모리를 사용하는 byte[] 같은 메모리가 보이는데 Name 열을 패키지 이름순으로 정렬하면 우리 애플리케이션 객체의 사용량 변화도 확인할 수 있습니다. 

Visual VM.png

2. 힙 메모리 누수 원인

저도 메모리(쓰레드) 누수 문제로 고생한 적이 있었는데 3주 동안.... 어려움이 크시겠어요. 

저는 문제가 되었을 때 EventLoopGroup을 TCP 연결을 수행할 때 마다 생성하고 (연결이 실패해도 생성된 채로 두어서) 해당 그룹을 제대로 닫아주지 않아서 (shutdownGracefully 사용) OutOfMemoryError를 만난적이 있습니다. 질문자님의 캡처 이미지를 보면 이벤트 루프그룹 번호가 6만대까지 올라간 걸로 봐서 혹시 저와 비슷한 실수를 하고 있지 않으신지 체크해 보시면 좋을 것 같아요.

이후에 알게 된 건데 EventLoopGroup은 연결할 때 마다 생성하고 해제해도 되지만, 그 보다는 애플리케이션 실행 시 한 번 생성하고 계속 재사용할 수가 있더라구요. EventLoopGroup은 이름 그대로 N개의 EventLoop를 그룹핑하고 있고, EventLoop는 다시 내부적으로 채널 I/O를 처리하는 쓰레드 1개를 가집니다. 그리고 EvenLoop 쓰레드는 멀티플렉싱을 활용해 다중 채널의 I/O를 병행해서 처리할 수 있습니다. 그래서 연결 당 EventLoopGroup을 생성하는 건 굉장히 오버해서 자원을 사용하는 거였더라구요. 이 부분을 같이 리뷰해 보시면 좋을 것 같습니다. 

이벤트루프 그룹.png

3. TCP 메시지 바운더리 문제

그리고 이건 메모리 누수와 관련있는 문제는 아니지만 Netty 사용자 가이드 문서에 나오는 Dealing with a Stream-based Transport 절을 읽어보시면 TCP 프로토콜은 스트림 기반 프로토콜이라서 메시지 경계를 애플리케이션에서 구분해서 읽어주어야 한다고 합니다. 그래서 질문자님과 같이 30바이트 헤더가 수신되고 마지막 2바이트가 가변 본문 길이를 표현하는 경우에는 LengthFieldBasedFrameDecoder 핸들러를 인바운드 파이프라인 앞단에 추가하여 안정적인 읽기 동작이 가능할 것 같습니다. 참고하시면 좋겠습니다. 

2023년 7월 7일 금요일 오후 12시 27분 2초 UTC+9에 sho9...@gmail.com님이 작성:

권석훈

unread,
Sep 22, 2023, 6:12:51 AM9/22/23
to nett...@googlegroups.com
안녕하세요.
소스만 봐서 명확하지는 않지만 
가이드에 있는 형태로 구현은 되어 있지 않아 보입니다.


tcp는 데이터가 한번에 오는 게 아니라 띄엄띄엄 올 수 있기 때문에 데이터의 처음과 끝이 어디인지 알아서 
데이터의 끝이 올 때까지 데이터를 쌓아놓는 역할을 수행하는 코드가 있어야 합니다.

ByteToMessageDecoder 를 상속받아 구현은 하셨는데..
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)  메소드를 구현하지 않았는데요.
이 메소드를 구현해야 저 tcp 문제를 해결하는 걸로 알고 있어요..

가이드 다시 한번 읽어보시고
샘플 보시고 공부해 보시면 딱히 어렵지는 않아요..




2023년 7월 7일 (금) 오후 12:27, ss <sho9...@gmail.com>님이 작성:
--
이 메일은 Google 그룹스 'Netty Korean User Group' 그룹에 가입한 분들에게 전송되는 메시지입니다.
이 그룹에서 탈퇴하고 더 이상 이메일을 받지 않으려면 netty-ko+u...@googlegroups.com에 이메일을 보내세요.
웹에서 이 토론을 보려면 https://groups.google.com/d/msgid/netty-ko/d061025d-a464-4863-91b3-3c8d6b988343n%40googlegroups.com을(를) 방문하세요.
Reply all
Reply to author
Forward
0 new messages