I am working with Flower which is a federated learning framework. In its (grpc connection)[
https://github.com/adap/flower/blob/main/src/py/flwr/client/grpc_client/connection.py#L91] file they are only creating 1 channel whereas I want 2-3 channels. But when I created 1 more channel with server_address `localhost:5040`, the previous channel with server address `localhost:8080` is getting overridden. How can I avoid that and use both the channels?
```
# Copyright 2020 Adap GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#
http://www.apache.org/licenses/LICENSE-2.0#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Contextmanager for a gRPC streaming channel to the Flower server."""
from contextlib import contextmanager
from logging import DEBUG
from pathlib import Path
from queue import Queue
from typing import Callable, Iterator, Optional, Tuple, Union
from flwr.common import GRPC_MAX_MESSAGE_LENGTH
from flwr.common.grpc import create_channel
from flwr.common.logger import log
from flwr.proto.transport_pb2 import ClientMessage, ServerMessage
from flwr.proto.transport_pb2_grpc import FlowerServiceStub
# The following flags can be uncommented for debugging. Other possible values:
#
https://github.com/grpc/grpc/blob/master/doc/environment_variables.md# import os
# os.environ["GRPC_VERBOSITY"] = "debug"
# os.environ["GRPC_TRACE"] = "tcp,http"
def on_channel_state_change(channel_connectivity: str) -> None:
"""Log channel connectivity."""
log(DEBUG, channel_connectivity)
@contextmanager
def grpc_connection(
server_address: str,
max_message_length: int = GRPC_MAX_MESSAGE_LENGTH,
root_certificates: Optional[Union[bytes, str]] = None,
) -> Iterator[Tuple[Callable[[], ServerMessage], Callable[[ClientMessage], None]]]:
"""Establish a gRPC connection to a gRPC server.
Parameters
----------
server_address : str
The IPv4 or IPv6 address of the server. If the Flower server runs on the same
machine on port 8080, then `server_address` would be `"
0.0.0.0:8080"` or
`"[::]:8080"`.
max_message_length : int
The maximum length of gRPC messages that can be exchanged with the Flower
server. The default should be sufficient for most models. Users who train
very large models might need to increase this value. Note that the Flower
server needs to be started with the same value
(see `flwr.server.start_server`), otherwise it will not know about the
increased limit and block larger messages.
(default: 536_870_912, this equals 512MB)
root_certificates : Optional[bytes] (default: None)
The PEM-encoded root certificates as a byte string or a path string.
If provided, a secure connection using the certificates will be
established to an SSL-enabled Flower server.
Returns
-------
receive, send : Callable, Callable
Examples
--------
Establishing a SSL-enabled connection to the server:
>>> from pathlib import Path
>>> with grpc_connection(
>>> server_address,
>>> max_message_length=max_message_length,
>>> root_certificates=Path("/crts/root.pem").read_bytes(),
>>> ) as conn:
>>> receive, send = conn
>>> server_message = receive()
>>> # do something here
>>> send(client_message)
"""
if isinstance(root_certificates, str):
root_certificates = Path(root_certificates).read_bytes()
channel = create_channel(
server_address='localhost:8080',
root_certificates=root_certificates,
max_message_length=max_message_length,
)
channel.subscribe(on_channel_state_change)
queue: Queue[ClientMessage] = Queue( # pylint: disable=unsubscriptable-object
maxsize=1
)
stub = FlowerServiceStub(channel)
server_message_iterator: Iterator[ServerMessage] = stub.Join(iter(queue.get, None))
# Adding one more channel over here. They are getting over-riden that is only the next one (localhost:5040)
#####################
channel_2 = create_channel(
server_address='localhost:5040',
root_certificates=root_certificates,
max_message_length=max_message_length,
)
channel_2.subscribe(on_channel_state_change)
queue: Queue[ClientMessage] = Queue( # pylint: disable=unsubscriptable-object
maxsize=1
)
stub = FlowerServiceStub(channel_2)
server_message_iterator: Iterator[ServerMessage] = stub.Join(iter(queue.get, None))
#####################
def receive() -> ServerMessage:
return next(server_message_iterator)
def send(msg: ClientMessage) -> None:
return queue.put(msg, block=False)
try:
yield (receive, send)
finally:
# Make sure to have a final
channel.close()
log(DEBUG, "gRPC channel closed")
```