I've been experiencing a constant, continuous memory leak in my gRPC async-server implementation. I've changed my code to allocate all of my messages on an Arena that is backed by a static buffer, and yet I still see the leak, so it seems to me that the source of the leak is the internal implementation of the gRPC server classes/messages.
I assume that I'm missing something simple (do I need to call some cleanup callback or something of that sort?), but so far I couldn't figure out what it is (I've looked in the documentation, the gRPC code, and valgrind output...).
I'm attaching a simplified version of my code. I'd appreciate if anyone can have a look and maybe spot my mistake.
I'm basically running the server, and then running a loop that calls the client every x seconds, and I can see that the server's RAM usage keeps growing.
Thanks.
#include <chrono>
#include <csignal>
#include <iostream>
#include <thread>
// Wire contract for the example service: a ping/pong RPC and a version query.
syntax = "proto3";
package example.v1;
// Ping payload; it is echoed back inside ServerPong.
message ServerPing {
// Value chosen by the sender to identify this ping.
uint64 ping_generation = 1;
}
// Pong payload returned for a ping.
message ServerPong {
// Copy of the ping this pong answers.
ServerPing ping = 1;
// Counter of pings reported by the server.
uint64 pings_so_far = 2;
}
// Request message of the ServerPing RPC.
message ServerPingRequest {
ServerPing ping = 1;
}
// Response message of the ServerPing RPC.
// Note: the only field uses number 2; number 1 is not declared here.
message ServerPingResponse {
ServerPong pong = 2;
}
// Request message of the VersionGet RPC; carries no fields.
message VersionGetRequest {
}
// Response message of the VersionGet RPC.
message VersionGetResponse {
// Version string of the server.
string version = 1;
// Commit hash string of the server build.
string commit_hash = 2;
}
// Unary RPCs exposed by the example server.
service ExampleService {
rpc VersionGet (VersionGetRequest) returns (VersionGetResponse) {}
rpc ServerPing (ServerPingRequest) returns (ServerPingResponse) {}
}
#include "server.h"
using std::cout;
using std::endl;
// SIGTERM hook: announces itself on stdout and asks the server singleton to
// close. The signal number is accepted but not used.
void term_signal_handler(int signum) {
    static_cast<void>(signum);
    std::cout << "Starting sigterm handler" << std::endl;
    auto &&grpc_server = server::get_server();
    grpc_server.close_server();
}
// SA_SIGINFO-style handler: logs the signal, runs the SIGTERM hook when the
// signal is SIGTERM, then restores the default disposition and re-raises the
// signal. `siginfo` and `context` are not inspected.
// NOTE(review): stream output and the server shutdown performed from here are
// not on the POSIX async-signal-safe list - confirm this is acceptable.
void signal_handler(int signum, siginfo_t *siginfo, void *context) {
    cout << "Interrupt signal " << signum << " received" << endl;
    if (SIGTERM == signum) {
        term_signal_handler(signum);
    }

    // Put the default action back and deliver the signal again.
    struct sigaction default_action;
    sigemptyset(&default_action.sa_mask);
    default_action.sa_flags = 0;
    default_action.sa_handler = SIG_DFL;
    sigaction(signum, &default_action, nullptr);
    raise(signum);
}
void register_signals() {
struct sigaction sa;
sa.sa_sigaction = signal_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = SA_SIGINFO;
// Ignore SIGPIPE
struct sigaction ignore_sa;
ignore_sa.sa_handler = SIG_IGN;
sigemptyset(&ignore_sa.sa_mask);
ignore_sa.sa_flags = 0;
sigaction(SIGPIPE, &ignore_sa, nullptr);
std::vector<int> signals_to_handle = { SIGINT, SIGTERM, SIGQUIT, SIGABRT, SIGSEGV, SIGFPE, SIGILL, SIGBUS, SIGSYS, SIGSTOP, SIGUSR1 };
for (int signum : signals_to_handle) {
sigaction(signum, &sa, nullptr);
}
}
int main() {
register_signals();
server::get_server().init_server();
while (true) {
std::this_thread::yield();
}
server::get_server().close_server();
return 0;
}
//
// Created by Dan Cohen on 11/11/2024.
//
#include "server.h"
#include "handlers.h"
#include <chrono>
#include <sstream>
#include <memory>
#include <cstdlib>
#include <iostream>
constexpr int MAX_RETRY_COUNT = 10;
using namespace grpc;
using namespace std::chrono_literals;
using namespace google::protobuf;
using std::endl;
using std::cout;
void server::init_rpc_handlers() {
// Create the command handlers
auto _cmd_ping = (handler_base *)handlers_pool::get_pool().allocator().allocate_node();
_cmd_ping = new (_cmd_ping) handler_server_ping(_completion_queue.get(), _service);
auto _cmd_get_version = (handler_base *)handlers_pool::get_pool().allocator().allocate_node();
_cmd_get_version = new (_cmd_get_version) handler_version_get(_completion_queue.get(), _service);
_cmd_ping->init_rpc_handler();
_cmd_get_version->init_rpc_handler();
}
// Event loop run on the server thread: pulls tags from the completion queue
// and dispatches each one to the handler it identifies.
//   * Next() returning false ends the loop.
//   * A failed event completes its handler (when a tag is present), ends the
//     loop if a shutdown is in progress, and otherwise counts as a retry with
//     a 5ms pause; exceeding MAX_RETRY_COUNT consecutive failures aborts.
//   * A successful event resets the retry counter and is handed to the handler.
void server::handle_requests_queue() {
    void *tag = nullptr; // identifies the handler the event belongs to
    bool ok = false;
    uint16_t consecutive_failures = 0;

    for (;;) {
        const auto got_event = _completion_queue->Next(&tag, &ok);
        if (!got_event) {
            cout << "completion_queue next method indicates that the gRPC server is shutting down, ret=" << got_event << ", rpc_status=" << ok << ", did_we_initiate=" << is_server_shutting_down() << endl;
            break;
        }

        if (ok) {
            consecutive_failures = 0;
            if (tag == nullptr) {
                cout << "invalid RPC request moving to next request" << endl;
            }
            else {
                static_cast<handler_base *>(tag)->handle_rpc_request();
            }
            continue;
        }

        // Failed event.
        cout << "completion_queue next method indicates that an RPC request failed, moving to next request, retry_count=" << consecutive_failures << endl;
        if (tag != nullptr) {
            auto *failed_handler = static_cast<handler_base *>(tag);
            cout << "Failed rpc request details-> " << failed_handler->get_request_debug_message() << endl;
            failed_handler->complete_request();
        }
        if (is_server_shutting_down()) {
            cout << "completion_queue next method indicates that the gRPC server is shutting down, ret=" << got_event << ", rpc_status=" << ok << endl;
            break;
        }
        if (consecutive_failures >= MAX_RETRY_COUNT) {
            cout << "Retry count exceeded the configured max, can't recover - killing the agent" << endl;
            ::abort();
        }
        ++consecutive_failures;
        std::this_thread::sleep_for(std::chrono::milliseconds(5));
    }
}
bool server::init_server() {
if (_did_init) {
return true;
}
_shutting_down.store(false);
_service = std::make_shared<ExampleService::AsyncService>();
auto server_address = "0.0.0.0";
auto server_port = 6212;
std::stringstream stream;
stream << server_address << ":" << server_port;
std::string server_address_str(stream.str());
auto max_message_size = 10 * 1024 * 1024;
ServerBuilder builder;
// Set max message size.
builder.SetMaxMessageSize(max_message_size);
builder.SetMaxSendMessageSize(max_message_size);
builder.SetMaxReceiveMessageSize(max_message_size);
// Listen on the given address without any authentication mechanism.
builder.AddListeningPort(server_address_str, grpc::InsecureServerCredentials());
builder.RegisterService(_service.get());
// Get hold of the completion queue used for the asynchronous communication
// with the gRPC runtime.
_completion_queue = builder.AddCompletionQueue();
// Finally assemble the server.
_server = builder.BuildAndStart();
_server_thread = std::make_unique<thread>(std::function<void(server *)>(&server::handle_requests_queue), this);
init_rpc_handlers();
_did_init = true;
return true;
}
// Stops the gRPC server and releases everything init_server() created.
// Safe to call when the server was never initialized (it does nothing then).
// Order: flag the shutdown, shut the server down, shut the completion queue
// down, join the queue thread, drain whatever is left in the queue, and then
// release the server before the queue it was built with.
void server::close_server() {
    if (!_did_init) {
        return;
    }
    cout << "Closing the grpc server..." << endl;
    _shutting_down.store(true);
    if (_server) {
        _server->Shutdown();
    }
    if (_completion_queue) {
        _completion_queue->Shutdown();
    }
    if (_server_thread && _server_thread->joinable()) {
        _server_thread->join();
    }
    // Make sure the queue is empty before destroying it.
    if (_completion_queue) {
        void *ignored_tag = nullptr;
        bool ignored_ok = false;
        while (_completion_queue->Next(&ignored_tag, &ignored_ok)) { }
    }
    _server_thread.reset();
    // The server object was previously never released; drop it first so it
    // does not outlive the completion queue and service it refers to.
    _server.reset();
    _completion_queue.reset();
    _service.reset();
    _did_init = false;
}
// Destructor: makes sure the server is closed when the object goes away.
server::~server() { close_server(); }
//
// Created by Dan Cohen on 11/11/2024.
//
#ifndef GRPC_EXAMPLE_HANDLERS_H
#define GRPC_EXAMPLE_HANDLERS_H
#include <iostream>
#include <grpcpp/grpcpp.h>
#include <google/protobuf/arena.h>
#include "memory_pool.h"
using namespace google::protobuf;
using namespace example::v1;
using completion_queue_ptr = grpc::ServerCompletionQueue*;
using completion_queue_sptr = std::unique_ptr<grpc::ServerCompletionQueue>;
using service_ptr = std::shared_ptr<ExampleService::AsyncService>;
using std::endl;
using std::cout;
// Lifecycle states of an RPC handler. Values are consecutive starting at -1.
enum request_state_e {
    REQUEST_STATE_INVALID = -1, // not a valid state
    REQUEST_STATE_CREATE,       // 0: handler constructed
    REQUEST_STATE_PROCESS,      // 1: waiting for / processing a request
    REQUEST_STATE_COMPLETE,     // 2: response issued, waiting to be recycled
    REQUEST_STATE_LAST          // 3: one past the last real state
};
class handler_base {
public:
handler_base(completion_queue_ptr completion_queue, service_ptr service) : _completion_queue(completion_queue), _service(service), _arena(nullptr) {
_state = REQUEST_STATE_CREATE;
}
virtual void init_rpc_handler() = 0;
virtual const std::string get_request_debug_message() = 0;
virtual void handle_rpc_request() {
if (_state == REQUEST_STATE_PROCESS) {
cout << "Recieved rpc: "<< get_request_debug_message() << std::endl;
if (process_request()) {
reset_and_prepare_handler_for_next_request();
_state = REQUEST_STATE_COMPLETE;
}
}
else if (_state == REQUEST_STATE_COMPLETE) {
complete_request();
}
}
virtual ~handler_base() {}
virtual void complete_request() {
handlers_pool::get_pool().allocator().deallocate_node(reinterpret_cast<char*>(this));
}
protected:
virtual void reset_and_prepare_handler_for_next_request() {
handler_base *next_handler = new_rpc_handler();
next_handler->init_rpc_handler();
}
virtual bool process_request() = 0;
virtual handler_base *new_rpc_handler() = 0;
void setup_arena_options(google::protobuf::ArenaOptions &arena_options, void *(*alloc_func)(size_t), void (*dealloc_func)(void *,size_t), size_t size) {
arena_options.block_alloc = alloc_func;
arena_options.block_dealloc = dealloc_func;
arena_options.start_block_size = 0;
arena_options.max_block_size = size;
arena_options.initial_block = nullptr;
arena_options.initial_block_size = 0;
}
completion_queue_ptr _completion_queue;
service_ptr _service;
grpc::ServerContext _ctx;
request_state_e _state;
std::unique_ptr<Arena> _arena;
private:
handler_base(const handler_base &other) = delete;
};
class handler_server_ping : public handler_base {
public:
handler_server_ping(completion_queue_ptr completion_queue, service_ptr service) : handler_base(completion_queue, service), _responder(&_ctx) {
memory_allocator::reset(_offset, _buffer, 4096);
auto alloc = [](size_t size){ return memory_allocator::allocate(_buffer, _offset, size, 4096);};
auto dealloc = [](void *ptr, size_t size){ cout << "Freeing " << size << " bytes" << endl; memory_allocator::dealloc(_buffer, _offset, size);};
ArenaOptions options;
setup_arena_options(options, alloc, dealloc, 4096);
_arena = std::make_unique<Arena>(options);
}
void init_rpc_handler() override {
_server_ping_request = Arena::Create<ServerPingRequest>(_arena.get());
_service->RequestServerPing(&_ctx, _server_ping_request, &_responder, _completion_queue, _completion_queue, this);
_state = REQUEST_STATE_PROCESS;
}
const std::string get_request_debug_message() override {
return "[" + ServerPingRequest::descriptor()->name() + "] " + _server_ping_request->ShortDebugString();
}
protected:
bool process_request() override {
if (_state == REQUEST_STATE_PROCESS) {
auto *response = Arena::Create<ServerPingResponse>(_arena.get());
*response->mutable_pong()->mutable_ping() = _server_ping_request->ping();
_ping_counter++;
response->mutable_pong()->set_pings_so_far(_ping_counter);
_responder.Finish(*response, grpc::Status::OK, this);
}
return true;
}
handler_base *new_rpc_handler() override {
auto new_cmd = (handler_base *)handlers_pool::get_pool().allocator().allocate_node();
return new (new_cmd) handler_server_ping(_completion_queue, _service);
}
private:
static inline char _buffer[4096] = {};
static inline size_t _offset = 0;
ServerPingRequest *_server_ping_request;
grpc::ServerAsyncResponseWriter<ServerPingResponse> _responder;
static inline uint64_t _ping_counter{0};
};
class handler_version_get : public handler_base {
public:
handler_version_get(completion_queue_ptr completion_queue, service_ptr service) : handler_base(completion_queue, service),
_responder(&_ctx) {
memory_allocator::reset(_offset, _buffer, 4096);
auto alloc = [](size_t size){ return memory_allocator::allocate(_buffer, _offset, size, 4096);};
auto dealloc = [](void *ptr, size_t size){ memory_allocator::dealloc(_buffer, _offset, size);};
ArenaOptions options;
setup_arena_options(options, alloc, dealloc, 4096);
_arena = std::make_unique<Arena>(options);
}
void init_rpc_handler() override {
_version_get_request = Arena::Create<VersionGetRequest>(_arena.get());
_service->RequestVersionGet(&_ctx, _version_get_request, &_responder, _completion_queue, _completion_queue, this);
_state = REQUEST_STATE_PROCESS;
}
const std::string get_request_debug_message() override {
return "[" + VersionGetRequest::descriptor()->name() + "] " + _version_get_request->ShortDebugString();
}
protected:
bool process_request() override {
if (_state == REQUEST_STATE_PROCESS) {
auto *response = Arena::Create<VersionGetResponse>(_arena.get());
response->set_version("v1.1");
response->set_commit_hash("abc123");
_responder.Finish(*response, grpc::Status::OK, this);
}
return true;
}
handler_base *new_rpc_handler() override {
auto new_cmd = (handler_base *)handlers_pool::get_pool().allocator().allocate_node();;
return new (new_cmd) handler_version_get(_completion_queue, _service);
}
private:
static inline char _buffer[4096] = {};
static inline size_t _offset = 0;
VersionGetRequest *_version_get_request;
grpc::ServerAsyncResponseWriter<VersionGetResponse> _responder;
};
#endif //GRPC_EXAMPLE_HANDLERS_H
#ifndef GRPC_EXAMPLE_MEMORY_POOL_H
#define GRPC_EXAMPLE_MEMORY_POOL_H
#include <cstdlib>
#include <cstdio>
#include <string>
#include <mutex>
#include <iostream>
using std::cout;
using std::endl;
// Stateless helpers implementing a bump allocator over a caller-owned buffer.
// The caller keeps the buffer and the running offset; nothing is stored here.
class memory_allocator {
public:
    // Hands out the next `alloc_size` bytes of `buffer`.
    //   buffer:     start of the backing storage.
    //   offset:     bytes already handed out; advanced on success.
    //   alloc_size: number of bytes requested.
    //   max_size:   total size of the backing storage.
    // Returns a pointer into `buffer`, or nullptr when the buffer is null or
    // the request does not fit. `offset` is left untouched on failure.
    static void* allocate(char *buffer, size_t &offset, size_t alloc_size, size_t max_size) {
        if (buffer == nullptr || offset > max_size) {
            return nullptr;
        }
        // Compare against the remaining space rather than computing
        // offset + alloc_size, which wraps around for very large requests.
        if (alloc_size > max_size - offset) {
            return nullptr; // Not enough space
        }
        void *ptr = buffer + offset;
        offset += alloc_size;
        return ptr;
    }

    // Intentionally a no-op: individual allocations are never returned, the
    // whole buffer is recycled with reset().
    static void dealloc(void *ptr, size_t &offset, size_t dealloc_size) {
        static_cast<void>(ptr);
        static_cast<void>(offset);
        static_cast<void>(dealloc_size);
    }

    // Rewinds `offset` to 0 and fills the first `size` bytes of `buffer` with
    // 0x20. A null buffer only rewinds the offset.
    static void reset(size_t &offset, char buffer[], size_t size) {
        offset = 0;
        if (buffer == nullptr) {
            return;
        }
        for (size_t i = 0; i < size; ++i) {
            buffer[i] = 0x20;
        }
    }
};
// Fixed-size node pool: one contiguous allocation split into `count` nodes of
// `size` bytes each. Free and allocated nodes are tracked with two intrusive
// doubly-linked lists; every public operation is serialized by `_mutex`.
class mem_pool {
public:
    // Minimal intrusive doubly-linked list. N must expose `next`, `prev` and
    // `lst` members. erase() and push_back() take the list's own mutex.
    template <class N>
    struct basic_list {
        basic_list() : head(nullptr), tail(nullptr), size(0), _m() {}
        N* head;
        N* tail;
        uint32_t size;
        std::mutex _m;

        // Takes over the nodes of `o` and leaves `o` empty. The nodes'
        // back-pointers are retargeted to this list.
        basic_list(basic_list&& o) : head(nullptr), tail(nullptr), size(0), _m() {
            std::lock_guard<std::mutex> lk(o._m);
            head = o.head;
            tail = o.tail;
            size = o.size;
            o.head = nullptr;
            o.tail = nullptr;
            o.size = 0;
            for (N* node = head; node != nullptr; node = node->next) {
                node->lst = this;
            }
        }

        struct itererator {
            N* head;
            N* tail;
            basic_list* lst;
        };

        // Unlinks `node` from this list and returns it with cleared links.
        N* erase(N* node) {
            std::lock_guard<std::mutex> lk(_m);
            if (node->next) {
                node->next->prev = node->prev;
            }
            if (node->prev) {
                node->prev->next = node->next;
            }
            if (head == node) {
                head = node->next;
            }
            if (tail == node) {
                tail = node->prev;
            }
            size--;
            node->prev = node->next = nullptr;
            node->lst = nullptr;
            return node;
        }

        // Appends `node` at the tail and marks it as owned by this list.
        void push_back(N* node) {
            std::lock_guard<std::mutex> lk(_m);
            node->prev = node->next = nullptr;
            node->lst = this;
            size++;
            if (tail == nullptr) {
                tail = node;
                head = node;
                return;
            }
            tail->next = node;
            node->prev = tail;
            tail = node;
        }

        // Unlinks every node.
        void clear() {
            while (head) {
                erase(head);
            }
        }
    };

    // Bookkeeping record of one pool node; `index` selects its slice of the
    // pool's storage and `lst` names the list currently holding it.
    struct mem_node {
        typedef basic_list<mem_node> list;
        mem_node *next = nullptr, *prev = nullptr;
        list* lst = nullptr;
        uint64_t index = 0;
    };

    mem_pool() : _did_init(false), _name("unnamed_pool"), _allocated(nullptr), _nodes(nullptr), _free_list(), _allocated_list(), _size(0), _count(0), _mutex() {}

    // Releases the storage if close() was not called explicitly.
    ~mem_pool() {
        close();
    }

    mem_pool(const mem_pool&) = delete;
    mem_pool& operator=(const mem_pool&) = delete;

    // Allocates storage for `count` nodes of `size` bytes and puts every node
    // on the free list. Does nothing when the pool is already initialized,
    // when `size` or `count` is zero, when count * size does not fit in
    // size_t, or when the storage cannot be allocated; in those cases the pool
    // stays uninitialized and allocate_node() keeps returning nullptr.
    // An empty `name` keeps the default pool name.
    void init(uint64_t size, uint64_t count, const std::string& name) {
        std::unique_lock<std::mutex> g(_mutex);
        if (_did_init) {
            return;
        }
        if (size == 0 || count == 0) {
            return;
        }
        if (count > static_cast<size_t>(-1) / size) {
            return; // count * size would overflow
        }
        // Nodes first: if this throws, nothing has been malloc'ed yet.
        mem_node* nodes = new mem_node[count];
        char* storage = static_cast<char*>(malloc(count * size));
        if (storage == nullptr) {
            delete[] nodes;
            return;
        }
        _size = size;
        _count = count;
        _allocated = storage;
        _nodes = nodes;
        for (uint64_t i = 0; i < count; ++i) {
            _nodes[i].index = i;
            _free_list.push_back(_nodes + i);
        }
        if (!name.empty()) {
            _name = name;
        }
        _did_init = true;
    }

    // Returns the storage of a free node, or nullptr when none is available
    // (including when the pool is not initialized).
    char* allocate_node() {
        std::unique_lock<std::mutex> g(_mutex);
        if (_free_list.head == nullptr) {
            return nullptr;
        }
        mem_node* n = _free_list.erase(_free_list.head);
        _allocated_list.push_back(n);
        return _allocated + (_size * n->index);
    }

    // Returns the node containing `ptr` to the free list.
    // Returns false, and changes nothing, when the pool is not initialized,
    // when `ptr` is null or outside the pool's storage, or when the node is
    // not currently allocated (for example on a double free).
    bool deallocate_node(char* ptr) {
        std::unique_lock<std::mutex> g(_mutex);
        if (!_did_init || ptr == nullptr) {
            return false;
        }
        // Compare as integers: ordering pointers into unrelated objects is
        // not defined.
        const uintptr_t base = reinterpret_cast<uintptr_t>(_allocated);
        const uintptr_t addr = reinterpret_cast<uintptr_t>(ptr);
        if (addr < base || addr - base >= _size * _count) {
            return false; // not inside this pool
        }
        const uint64_t index = (addr - base) / _size;
        if (_nodes[index].lst == &_allocated_list) {
            _allocated_list.erase(_nodes + index);
            _free_list.push_back(_nodes + index);
            return true;
        }
        // not allocated
        return false;
    }

    // Releases the storage and bookkeeping. Safe to call more than once.
    // Pointers previously returned by allocate_node() become invalid.
    void close() {
        std::unique_lock<std::mutex> g(_mutex);
        if (!_did_init) {
            return;
        }
        _free_list.clear();
        _allocated_list.clear();
        free(_allocated);
        delete[] _nodes;
        _allocated = nullptr;
        _nodes = nullptr;
        _did_init = false;
    }

protected:
    bool _did_init;
    std::string _name;
    char* _allocated;             // backing storage, _count * _size bytes
    mem_node* _nodes;             // one bookkeeping record per node
    mem_node::list _free_list;
    mem_node::list _allocated_list;
    uint64_t _size;               // bytes per node
    uint64_t _count;              // number of nodes
    std::mutex _mutex;
};
// Process-wide singleton owning the mem_pool that RPC handlers are
// constructed in: 100 nodes of 2048 bytes each, named "handlers".
class handlers_pool {
public:
    // Returns the single instance, created on first use.
    static handlers_pool &get_pool() {
        static handlers_pool the_pool;
        return the_pool;
    }

    // The pool handlers are allocated from.
    mem_pool &allocator() {
        return _pool;
    }

    // Not copyable: a copy would share and then close the same storage twice.
    handlers_pool(const handlers_pool &) = delete;
    handlers_pool &operator=(const handlers_pool &) = delete;

private:
    handlers_pool() {
        _pool.init(2048, 100, "handlers");
    }

    ~handlers_pool() {
        _pool.close();
    }

    // Held by value, so its lifetime matches the singleton's without manual
    // new/delete.
    mem_pool _pool;
};
#endif // GRPC_EXAMPLE_MEMORY_POOL_H
//
// Created by Dan Cohen on 11/11/2024.
//
#include <string>
#include <iostream>
#include <grpcpp/grpcpp.h>
#include "protos/example/v1/example.grpc.pb.h"
using std::cout;
using std::endl;
using namespace example::v1;
// Prints the command-line help of the client to stdout.
void usage() {
    cout << "Usage: grpc_client [p|v]" << endl
         << "p - send a ping request to the server" << endl
         << "v - ask the server version" << endl;
}
void version() {
cout << "Asking for version..." << endl;
auto address = std::string("127.0.0.1");
auto port = std::string("6212");
// Starting a grpc client to run the version command.
std::unique_ptr<ExampleService::Stub> stub;
std::shared_ptr<grpc::Channel> channel;
std::string server_address = address + ":" + port;
channel = grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials());
stub = ExampleService::NewStub(channel);
// Sending the ping request.
VersionGetRequest request;
VersionGetResponse response;
grpc::ClientContext context;
auto ret = stub->VersionGet(&context, request, &response);
if (ret.ok()) {
cout << response.DebugString() << endl;
}
else {
std::cout << "VersionGet failed with the following error: error_code=" << ret.error_code() << std::endl;
std::cout << "Error message: '" << ret.error_message() << "'" << std::endl;
std::cout << "Error details: '" << ret.error_details() << "'" << std::endl;
}
}
void ping() {
cout << "Sending ping..." << endl;
auto address = std::string("127.0.0.1");
auto port = std::string("6212");
// Starting a grpc client to run the ping command.
std::unique_ptr<ExampleService::Stub> stub;
std::shared_ptr<grpc::Channel> channel;
std::string server_address = address + ":" + port;
channel = grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials());
stub = ExampleService::NewStub(channel);
// Sending the ping request.
ServerPingRequest request;
request.mutable_ping()->set_ping_generation(time(nullptr));
cout << "Sending ping request with generation=" << request.ping().ping_generation() << endl;
ServerPingResponse response;
grpc::ClientContext context;
auto ret = stub->ServerPing(&context, request, &response);
if (ret.ok()) {
cout << "Received ping response with generation=" << response.pong().ping().ping_generation() << endl;
cout << "Server pong:" << response.DebugString() << endl;
}
else {
cout << "Ping failed with the following error: error_code=" << ret.error_code() << endl;
cout << "Error message: '" << ret.error_message() << "'" << endl;
cout << "Error details: '" << ret.error_details() << "'" << endl;
}
}
// Client entry point. Expects exactly one argument: "p" sends a ping and "v"
// asks for the server version. Anything else prints the usage text.
// Returns 0 after running a command and 1 on a usage error.
int main(int argc, char **argv) {
    // An empty command never matches, so a wrong argument count falls through
    // to the usage path below.
    const std::string command = (argc == 2) ? argv[1] : "";

    if (command == "p") {
        ping();
        return 0;
    }
    if (command == "v") {
        version();
        return 0;
    }

    usage();
    return 1;
}