ParameterServerStrategy Performance Issues


joe montana

Jun 7, 2021, 5:55:20 AM6/7/21
to Discuss

I am training Keras models (tf-operator, TensorFlow 2.5.0) with ParameterServerStrategy, and I think the worker nodes are extremely underutilized.
Is there any sample code to reference? I've checked all the guides, tutorials, GitHub issues, etc., and have not found anything useful. There are also no log messages on the worker nodes. How can I debug this issue? I've profiled my model pipeline on a single instance, and the pipeline is not input-bound; the profiler reports:
  • Your program is NOT input-bound because only 0.0% of the total step time sampled is waiting for input. Therefore, you should focus on reducing other time.
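
For context, the single-instance profile was collected roughly like this (the log directory, batch range, and model/dataset names are placeholders for my actual pipeline):

import tensorflow as tf

# Profile a handful of training steps; the message above comes from the
# Profile tab in TensorBoard.
tb_cb = tf.keras.callbacks.TensorBoard(log_dir="/tmp/profile", profile_batch=(10, 20))
model.fit(train_ds, epochs=1, callbacks=[tb_cb])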

Thanks a lot.

Yuefeng Zhou

Jun 7, 2021, 2:25:58 PM6/7/21
to joe montana, Discuss
Hi Joe,

Are you following this guide to use PSStrategy: https://www.tensorflow.org/tutorials/distribute/parameter_server_training? It has a section that talks about performance. If you can provide a profile for the coordinator, I can probably give more suggestions.
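
In case it helps, a minimal way to capture a trace on the coordinator looks roughly like this (the log directory, model, and dataset names are placeholders; wrap just a few training steps so the trace stays small):

import tensorflow as tf

# Start tracing on the coordinator, run a few steps, then stop and
# inspect the result in TensorBoard's Profile tab.
tf.profiler.experimental.start("/tmp/coordinator_profile")
model.fit(dataset, epochs=1, steps_per_epoch=20)
tf.profiler.experimental.stop()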

Thanks,
Yuefeng


joe montana

Jun 7, 2021, 11:58:08 PM6/7/21
to Yuefeng Zhou, Discuss
Thanks for the quick response. I will run profiling on the chief node. I also suspect I may have misconfigured something on the PS side, so I am pasting the PS-specific code here.

import os
import time
import logging
from datetime import datetime

import tensorflow as tf
from tensorflow.distribute.cluster_resolver import TFConfigClusterResolver


def dist_train(args):
    os.environ["GRPC_FAIL_FAST"] = "use_caller"
    cluster_resolver = TFConfigClusterResolver()

    if cluster_resolver.task_type in ('ps', 'worker'):
        # PS and worker tasks just run a standard tf.distribute.Server and block.
        logging.info(
            "[{}] Start {}({})...".format(
                datetime.now(), cluster_resolver.task_type, cluster_resolver.task_id))
        server = tf.distribute.Server(
            cluster_resolver.cluster_spec(),
            job_name=cluster_resolver.task_type,
            task_index=cluster_resolver.task_id,
            protocol=cluster_resolver.rpc_layer or "grpc",
            start=True)
        print(cluster_resolver.cluster_spec())
        server.join()

    if cluster_resolver.task_type == 'chief':
        NUM_PS = len(cluster_resolver.cluster_spec().as_dict().get("ps", ()))
        # Wait for the PS and worker servers to come up.
        time.sleep(5)
        logging.info(
            "[{}] Start {}({})...".format(
                datetime.now(), cluster_resolver.task_type, cluster_resolver.task_id))

        # Shard each variable across all parameter servers.
        variable_partitioner = (
            tf.distribute.experimental.partitioners.FixedShardsPartitioner(
                num_shards=NUM_PS))

        strategy = tf.distribute.experimental.ParameterServerStrategy(
            cluster_resolver,
            variable_partitioner=variable_partitioner)

        # Model creation and training run on the chief, which acts as the coordinator.
        with strategy.scope():
            keras_model = make_model(args)
            keras_model.compile(args)
            keras_model.fit(sharded_ds)
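
For reference, the TFConfigClusterResolver above reads the TF_CONFIG environment variable that tf-operator sets on each pod; for one of the workers it looks roughly like this (hostnames, ports, and counts are placeholders for my actual cluster):

import json
import os

# Illustrative TF_CONFIG for one worker pod; tf-operator populates this
# automatically from the TFJob spec.
os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "chief": ["chief-0:2222"],
        "worker": ["worker-0:2222", "worker-1:2222"],
        "ps": ["ps-0:2222"],
    },
    "task": {"type": "worker", "index": 0},
})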
