안녕하세요. 네티 데이터 발송 중에 메모리 누수를 추적하고 있습니다.
특정 시점에는 장애가 발생하기 시작합니다.
네티 통신 중에 client가 서버와 한번만 주고 받게 되는 것입니다.
클라이언트 <-> 서버 간에 정의 된 프로토콜에 의하여 마지막 데이터가 올때까지 계속 서로 요청과 응답을 주고 받는네요.
장애 발생시점 부터는 클라이언트 -> 서버 요청 후 서버->클라이언트 응답 처음까지는 정상인데, 그 후 두번째부터는 클라이언트 -> 서버 요청 후 바로 ReadComplete가 떨어지면서 끝나버립니다.
다음은 장애 시점과 재기동 시점의 메모리 모니터링 화면입니다.
장애가 발생 중이던 시점에 특정 nioEventLoop에 대한 로그를 캐치해보면 아래와 같습니다. 추가적으로 데이터를 주고 받을 것이 있는데, 바로 ReadComplete가 떨어지는 현상입니다.
서버가 힙 메모리 에러에 의해 죽지 않아서 덤프파일도 생성이 안되고..
public class PollingClientHandler extends ByteToMessageDecoder {
private static Logger logger = LoggerFactory.getLogger(PollingClientHandler.class);
private ChannelHandlerContext ctx;
private byte[] sendMsg;
private int totalSize;
private ByteBuf buffer;
Bootstrap bs;
public void SendMsg(byte[] sendMsg) {
ctx.write(Unpooled.copiedBuffer(sendMsg));
ctx.flush();
logger.info("[sendMsg][ip:{}] 발송 메시지 : {}", ctx.channel().remoteAddress(), DataUtil.byteArrayToHexString(sendMsg));
}
public PollingClientHandler(byte[] msg, Bootstrap bs) {
this.sendMsg = msg;
this.bs = bs;
}
private PollingClientAction action_;
public void setCallBackClientHandler(PollingClientAction action) {
this.action_ = action;
}
private byte[] receiveMsg;
public byte[] getReceiveMsg() {
return this.receiveMsg;
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
logger.info("채널 등록");
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
logger.info("채널 연결이 종료됨.");
bs.group().shutdownGracefully();
logger.info("shutdownGracefully..");
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("채널이 메시지 발송할 준비가 됨.");
this.ctx = ctx;
ctx.write(Unpooled.copiedBuffer(sendMsg));
ctx.flush();
//
logger.info("[channelActive] 발송 메시지 : {}", DataUtil.byteArrayToHexString(sendMsg));
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
logger.info("핸들러 등록");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
logger.info("메시지를 받는 메소드. {} ", msg);
buffer = ctx.alloc().buffer(); // Client일 경우는 이 위치에서 수행되어야함
ByteBuf b = (ByteBuf)msg;
try{
buffer.writeBytes(b);
int readableByte = buffer.readableBytes();
if (readableByte > 29 && totalSize == 0) {
byte[] bytes = new byte[2];
buffer.getBytes(27, bytes);
//
logger.info("####### {}, readesize :{}",FrameUtil.byteArrayToHexString(bytes),readableByte);
this.totalSize = ((bytes[0] & 0xff) << 8) | (bytes[1] & 0xff);
}
if (readableByte == totalSize + 30) {
byte[] bytes = new byte[buffer.readableBytes()];
buffer.readBytes(bytes);
//
logger.info("Received Message : {}", FrameUtil.byteArrayToHexString(bytes));
this.receiveMsg = bytes;
this.totalSize = 0;
if(this.action_ != null) {
action_.receive(PollingClientHandler.this);
}
clearBuffer();
}
}catch(Exception e) {
logger.error("channelRead error.", e);
clearBuffer();
} finally {
ReferenceCountUtil.safeRelease(msg);
}
}
private void clearBuffer() {
if (buffer != null) {
buffer.release();
buffer = null;
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
logger.info("메시지를 받는 동작이 끝나면 동작하는 메소드. ");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("exceptionCaught", cause);
ctx.close();
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// TODO Auto-generated method stub
}
public class PollingClient {
private static Logger logger = LoggerFactory.getLogger(PollingClient.class);
private Bootstrap bs = new Bootstrap();
private SocketAddress addr_;
private Channel channel_;
private List<byte[]> msgArr;
private int idx;
private NioEventLoopGroup group;
@Autowired FepMeteringDAO fmDAO;
@Autowired AutowireCapableBeanFactory autowireCapableBeanFactory;
@Autowired KafkaHandler kafkaHandler;
@Autowired MeteringDataProcessor meteringDataProcessor;
@Autowired DeviceStatusProcessor deviceStatusProcessor;
@Value("${fep.protocol.sec}")
private int bSec;
@Value("${fep.protocol.ver}")
private int pVer;
@Value("${fep.protocol.fepid}")
private String pFepId;
byte[] totalData;
public PollingClient(SocketAddress addr, List<byte[]> msgArr) {
this.addr_ = addr;
this.msgArr = msgArr;
}
public PollingClient(String host, int port, List<byte[]> msgArr) {
this(new InetSocketAddress(host, port), msgArr);
}
//실제로 동작시킬 메소드 Bootstrap 연결 옵션 설정 및 연결 처리
public void run() {
if(this.addr_ == null) {
logger.error("주소 정보가 없습니다.");
}else if(this.msgArr == null || this.msgArr.size() == 0) {
logger.error("보낼 메시지가 없습니다.");
}
group = new NioEventLoopGroup(1);
bs.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true);
doConnect();
}
private void doConnect() {
handlerSet();
ChannelFuture f = bs.connect(addr_);
channel_ = f.channel();
logger.info(addr_ + " connect()");
}
private void handlerSet() {
if(bs != null) {
bs.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(2048));
PollingClientHandler handler = new PollingClientHandler(msgArr.get(idx), bs);
handler.setCallBackClientHandler(new PollingClientAction() {
public void close(PollingClientHandler handler) {
//종료 처리 후 더 보낼게 존재한다면 기존 옵션으로 재 연결처리를 하는 콜백 메소드
closeAndContinue();
}
public void receive(PollingClientHandler handler) {
//응답 받은 메시지 콜백 메소드
byte[] receiveMsg = handler.getReceiveMsg();
handleMessage(receiveMsg, handler);
}
});
ch.pipeline().addLast("PollingClientHandler", handler);
}
});
}
}
/**
* Actual Message handling and reply to server.
*/
private boolean handleMessage(byte[] msg, PollingClientHandler handler) {
FepFrameParseConstants parsingResult = FepConstants.FepFrameParseConstants.FRAME_PARSE_ERROR_ETC;
FepPacket fepPacket = new FepPacket();
byte[] zKey = null;
boolean bResponse = true;
try {
// 1.Header parsing
parsingResult = fepPacket.parsingBase(msg);
if( ! (parsingResult == FepConstants.FepFrameParseConstants.FRAME_PARSE_SUCCESS) ) { // parsing 실패
// 헤더 파싱 에러 발생 시 예외 발생 시킴
throw new Exception("Fep Packet Parsing failed. : "
+ parsingResult.name());
}
if(fepPacket.ctlSmf == FepHeaderConst.FEP_SC_SEC_ENABLE.getCode()) {
// 암호화 모듈 사용시
String sorcueId = new String(fepPacket.sid, "UTF-8").trim(); // source-Id
logger.info("sourceId : {} ", sorcueId);
DcuVO dcu = fmDAO.selectDcuByDcuId(sorcueId); // dcu 정보 조회
if( dcu == null ) parsingResult = FepConstants.FepFrameParseConstants.FRAME_PARSE_ERROR_ETC;
else {
try{
zKey = DataUtil.hexStringToByteArray(dcu.getFEP_KEY());
fepPacket.doDecryptData(zKey);
}catch(Exception e) {
parsingResult = FepConstants.FepFrameParseConstants.FRAME_PARSE_ERROR_ENDECRYPT;
logger.error("LP Data Decrypt is failed.", e);
}
}
}
} catch (Exception e) {
logger.error("message parsing failed. ", e);
}
// 3. kafka큐에 put
/////////////////////// 카프카 되면 그때 수정
//kafkaHandler.setFepMessageToKafka(fepPacket);
//////////////////////////////////////////////
/// 임시로직
Message message = new Message();
message.setCommand(fepPacket.command);
message.setData(fepPacket.Data);
message.setSourceId(fepPacket.sourceID);
FepCommandList command = FepConstants.FepCommandList.getCommand(fepPacket.command);
switch(command) {
case FEP_CMD_ACK
:
case FEP_CMD_NOT_ACK
:
logger.info("[{}] {} 수신.",
command.name(), fepPacket.sourceID);
bResponse = false; // 서버가 ACK 혹은 NACK를 수신한 경우에는 Response 없이 TCP Connection를 종료한다
break;
case FEP_CMD_DCU_STATUS_RSP
:
case FEP_CMD_METER_STATUS_RSP
:
case FEP_CMD_METER_NEW_CHANGE_RES
:
deviceStatusProcessor.saveDeviceData(message);
break;
case FEP_CMD_AMIGO_DATA_RSP
:
meteringDataProcessor.saveMeteringData(message);
break;
default:
break;
}
if(bResponse) {
// 5. Ack 혹은 Nack 응답 패킷 생성
//TODO FSN(Fragment 된 패킷 처리 하는 부분 작성 필요)
FepPacket replyPacket = new FepPacket();
replyPacket.initialize(bSec, pVer, pFepId);
if(parsingResult == FepConstants.FepFrameParseConstants.FRAME_PARSE_SUCCESS) {
replyPacket.makeAckPacket(fepPacket.sourceID);
} else {
// error code 송신
replyPacket.Data = new byte[1];
replyPacket.Data[0] = parsingResult.getCode();
int Data_enc_len = fepPacket.doEncryptData(zKey);
// 데이터 파싱 실패 시 NACK 송신
replyPacket.setFepToDcuPacket(FepConstants.FepCommandList.FEP_CMD_NOT_ACK, fepPacket.sourceID, Data_enc_len, fepPacket.qsn, 0/*msn*/);
replyPacket.makePacketHeader();
replyPacket.generateCRC();
}
//6. 응답 패킷 전송 ('A'/'a')
byte[] packet = replyPacket.getFepPacket();
if(fepPacket.ctlMpacket == FepConstants.FepHeaderConst.FEP_M_PACKET_NOT_LAST.getCode()) {
logger.info("M-Packet is 0 (not last Packet.)");
if(bResponse) {
handler.SendMsg(packet);
}
} else {
logger.info("M-Packet is 1 (last Packet.)");
// 7. 'K' 응답 시 Last Packet을 수신한 후에 DCU Polling 송신 결과 저장
// (여러 패킷 송신올때마다 Update 하면 비효율적이므로)
if(command == FepConstants.FepCommandList.FEP_CMD_AMIGO_DATA_RSP) updateDcuCommStats(fepPacket.sourceID, "Response");
if(bResponse) handler.SendMsg(packet);
closeAndContinue();
}
}
return true;
}
/**
* DCU에 검침 요청한 이력 통계를 저장한다.
* @param dcuId
*/
public void updateDcuCommStats(String dcuId, String gubun) {
logger.info("[{}] Select updateDcuCommStats.", dcuId);
String isExist = fmDAO.selectDcuCommLog(dcuId);
if(isExist != null) {
if(gubun.equals("Request")) {
logger.info("[{}] Request updateDcuCommStats.", dcuId);
fmDAO.updateDcuCommLogT(dcuId);
}
else {
logger.info("[{}] Response updateDcuCommStats.", dcuId);
fmDAO.updateDcuCommLogA(dcuId);
}
}
else fmDAO.insertDcuCommLog(dcuId);
}
private void closeAndContinue() {
try {
if(msgArr.size() > ++idx) { //보낼 메시지가 남았으면 재연결 처리
logger.info("msgArr length = {} / idx = {} ", msgArr.size(), idx);
doConnect();
}else { //보낼 메시지가 없다면 종료
logger.info("closing thread and channel...");
bs.group().shutdownGracefully(); //eventLoop에 등록된 Thread를 종료 처리한다.
channel_.close().sync(); //현재의 채널을 닫는다.
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
바쁘시겠지만 검토해주시면 큰 도움이 될 것 같습니다.
감사합니다.