Question about globus compute and flows

Anthony Weaver

Feb 20, 2025, 11:24:21 AM
to Discuss
I've set up a Globus Compute endpoint and I'm writing a flow that includes a compute step.  For now I'm focusing on just the compute step, and I'm stuck trying to get the program arguments passed from the flow to the actual compute function.  As an example, consider this compute function:

def hello(firstname, lastname):
    """Say hello to someone."""
    return 'Hello {} {}'.format(firstname, lastname)

func_id = gcc.register_function(hello)
task_id = gcc.run("Bob", "Smith", endpoint_id=tutorial_endpoint, function_id=func_id)

try:
   print(gcc.get_result(task_id))
except Exception as e:
   print("Exception: {}".format(e))

Instead of hard coding "Bob" and "Smith", how would I modify this code to get the information from a field or fields as defined in the flow?

Anthony Weaver

Feb 20, 2025, 2:06:07 PM
to Discuss, Anthony Weaver
Let me be clearer about my setup.
My function:
def run_cameratrap(image_dir, output_dir, json_file, threshold, batch_size):
    import subprocess

    image_dir = "$DATADIR/" + image_dir
    output_dir = "$DATADIR/" + output_dir
    threshold = "--threshold " + threshold
    batch_size = "--batch_size " + batch_size

    # With shell=True the command must be one string; given a list, the shell would run only "run_detector".
    cmd = " ".join(["run_detector", image_dir, output_dir, json_file, threshold, batch_size])
    return subprocess.run(cmd, shell=True)
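An alternative to the function above is to avoid the shell and build the argument list directly. This is only a sketch under assumptions: `build_detector_argv` is a hypothetical helper, DATADIR is assumed to be set in the environment where the function runs, and `run_detector` is assumed to accept `--threshold` and `--batch_size` as separate argv elements.

```python
import os


def build_detector_argv(image_dir, output_dir, json_file, threshold, batch_size):
    """Build an argv list for run_detector without going through a shell."""
    # Assumption: DATADIR is set in the environment of the endpoint worker.
    data_dir = os.environ.get("DATADIR", "")
    return [
        "run_detector",
        os.path.join(data_dir, image_dir),
        os.path.join(data_dir, output_dir),
        json_file,
        "--threshold", str(threshold),
        "--batch_size", str(batch_size),
    ]


os.environ["DATADIR"] = "/camtrapdata"  # example value only
argv = build_detector_argv("in/DCIM", "out", "tony.json", "0.5", "64")
print(argv)
```

The resulting list could then be passed to subprocess.run(argv) without shell=True, so each value reaches the program as its own argument.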

definition.json
{
  "Comment": "Transfer and process files by invoking a Globus Compute function",
  "StartAt": "ProcessImages",
  "States": {
    "ProcessImages": {
      "Comment": "Process cameratrap images using pytorch wildlife",
      "Type": "Action",
      "Parameters": {
        "tasks": [
          {
            "image_dir.=": "getattr('image_dir')",
            "output_dir.=": "getattr('output_dir')",
            "json_file.=": "getattr('json_file')",
            "threshold.=": "getattr('threshold')",
            "batch_size.=": "getattr('batch_size')"
          }
        ],
        "endpoint_id": "c82c8b96-7548-432b-82e8-4d8d0e58b415",
        "function_id": "2c936e5c-8594-4f20-aa58-adc2b5f1fd7b"
      },
      "ResultPath": "$.ProcessImages",
      "WaitTime": 180,
      "End": true
    }
  }
}

schema.json
{
  "type": "object",
  "required": [
    "image_dir",
    "output_dir",
    "json_file",
    "threshold",
    "batch_size"
  ],
  "properties": {
    "image_dir": {
      "type": "string",
      "title": "Image directory (excluding $DATADIR)"
    },
    "output_dir": {
      "type": "string",
      "title": "Output directory (excluding $DATADIR)"
    },
    "json_file": {
      "type": "string",
      "title": "Name of the JSON file"
    },
    "threshold": {
      "type": "string",
      "default": "0.5",
      "title": "Detection threshold"
    },
    "batch_size": {
      "type": "string",
      "default": "64",
      "title": "Batch size"
    }
  },
  "additionalProperties": false
}

The error when I run the flow
{
  "exception": "ActionUnableToRun",
  "cause": {
    "message": "Field '$.tasks[0].function_id' (category: 'value_error.missing'): field required; Field '$.tasks[0].batch_size' (category: 'value_error.extra'): extra fields not permitted; Field '$.tasks[0].image_dir' (category: 'value_error.extra'): extra fields not permitted; Field '$.tasks[0].json_file' (category: 'value_error.extra'): extra fields not permitted; Field '$.tasks[0].output_dir' (category: 'value_error.extra'): extra fields not permitted; Field '$.tasks[0].threshold' (category: 'value_error.extra'): extra fields not permitted"
  },
  "state_name": "ProcessImages"
}

I am certain it's just something I am doing wrong, either in the .py file or in definition.json; I'm just not sure what it is.  I read through the action provider documentation and the compute setup, but it's unclear to me how to link the two when it comes to getting the information from the flow into the Python code.

As always, I am appreciative for all help

Lei Wang

Feb 20, 2025, 5:16:36 PM
to Anthony Weaver, Discuss
Hi Anthony,

The issue seems to be the structure you are passing into the Compute action step.  Notably, each task needs to specify the function_id, thus '$.tasks[0].function_id' (category: 'value_error.missing'), and args/kwargs need to be passed in as arguments named 'args' and 'kwargs'.  See https://globus-compute.readthedocs.io/en/stable/actionprovider.html for the v3 schema.  The root level of what the action provider accepts is about the overall submission (endpoint_id, resource_specification etc), not an argument list to a specific function.

An example of the structure:

      "Parameters": {
        "tasks": [
          {
            "args.=": "getattr('my_schema_input')",
            "kwargs.=": "getattr('schema_json_object')",
            "function_id.$": "$.function_id_input"   #   "function_id": "abc123ab-8594-4f20-aa58-adc2b5f1fd7b"
          }
        ],
        "endpoint_id": "4b116d3c-1703-4f8f-9f6f-39921e5864df"
      }
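
To make the reshaping concrete, here is a small Python sketch that takes the per-argument fields from the earlier definition.json and wraps them in a "kwargs" object next to "function_id". The helper name `to_task` is hypothetical, and the sketch assumes the action provider accepts expression keys nested inside "kwargs"; treat it as an illustration of the structure rather than a verified definition.

```python
import json

# Flat fields as they appeared inside tasks[0] in the original definition.
flat_fields = {
    "image_dir.=": "getattr('image_dir')",
    "output_dir.=": "getattr('output_dir')",
    "json_file.=": "getattr('json_file')",
    "threshold.=": "getattr('threshold')",
    "batch_size.=": "getattr('batch_size')",
}


def to_task(function_id, fields):
    """Place function_id on the task and move the arguments under 'kwargs'."""
    return {"function_id": function_id, "kwargs": dict(fields)}


parameters = {
    "tasks": [to_task("2c936e5c-8594-4f20-aa58-adc2b5f1fd7b", flat_fields)],
    "endpoint_id": "c82c8b96-7548-432b-82e8-4d8d0e58b415",
}
print(json.dumps(parameters, indent=2))
```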
Let me know if you have any further questions,
Lei

Anthony Weaver

Feb 24, 2025, 11:35:06 AM
to Discuss, l...@globus.org, Discuss, Anthony Weaver

Thank you for your last reply.  I placed the function_id in the proper place in the JSON structure but I'm still struggling mightily with the args portion.  To try and simplify a bit I am using named arguments:

image_dir,
output_dir,
json_file,
threshold,
batch_size

These get passed to my compute state as:
{
  "input": {
    "threshold": "0.5",
    "batch_size": "64",
    "image_dir": "tony_compute_test/AKI_25_1_CAKI1/DCIM",
    "output_dir": "tony_compute_test/AKI_25_1_CAKI1/output",
    "json_file": "tony.json"
  }
}

I have tried many different things to get args set properly, but each attempt does one of the following:
1. Errors out while validating the .json file, with varying errors, OR
2. Errors out during the run of the flow, with a wide variety of errors, OR
3. Runs without error but does not pass the proper value to my Python code.
Most often it treats what I put inside the [ ] as a string instead of passing on the actual value.
For example, if I try:
"args.=": "[ '$.input.input_dir', '$.input.output_dir', '$.input.json_file', '$.input.threshold', '$.input.batch_size' ]",

In my flow results I see things like
"result": [
          "/camtrapdata/$.input.image_dir\n/camtrapdata/$.input.output_dir\n$.input.json_file\n--threshold $.input.threshold\n--batch_size $.input.batch_size"
        ],
        "results": [
          {
            "output": "/camtrapdata/$.input.image_dir\n/camtrapdata/$.input.output_dir\n$.input.json_file\n--threshold $.input.threshold\n--batch_size $input.batch_size",
            "task_id": "897326f1-b813-4a51-9377-5414e7f79212"

The flow runs without error but the compute function actually fails because those values are gibberish
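
One way to read the result above, sketched in plain Python: if the expression language treats quoted text the way Python does (an assumption about Flows, though consistent with the output shown), then each quoted '$.input...' item is just a string literal, not a reference to the input.

```python
import ast

# The bracketed expression from the attempt above, shortened to two items.
expr = "[ '$.input.image_dir', '$.input.output_dir' ]"

# Evaluated as a literal, the list simply contains the quoted text itself.
values = ast.literal_eval(expr)
print(values)
```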

Lei Wang

Feb 24, 2025, 11:39:44 AM
to Anthony Weaver, Discuss
Anthony,

Our apologies that you are having trouble passing arguments through the flow.  We will improve the documentation to list concrete syntax and examples.

Meanwhile, I will reproduce your flow with a function similar to yours and send you the syntax I used as soon as possible.  Thanks for your patience!

Lei

Anthony Weaver

Feb 24, 2025, 11:46:39 AM
to Discuss, l...@globus.org, Discuss, Anthony Weaver
Lei,

Thank you so much for your help.  If it would be useful, the name of the flow I'm working on is Ardia Cameratrap Flow.  I don't know if you have access to edit it or if I should give you access.
I plan on making this a 2-step flow.  The first step will be a data transfer, where the endpoint and path the data gets transferred to will be my image_dir, and the user will also pick an endpoint and path for the output to go to.  Right now I'm just trying to get the compute step to run with essentially hard-coded values from the flow form.

Lei Wang

Feb 24, 2025, 11:49:44 AM
to Anthony Weaver, Discuss
Anthony,

No worries about the flow access, what you posted in the original message should be enough for me to write something similar enough to diagnose (but I'll let you know otherwise).

Lei

Lei Wang

Feb 24, 2025, 3:09:45 PM
to Anthony Weaver, Discuss
Hello Anthony,

I created two one-step flows and ran them successfully with custom inputs. Both were tested with this function that has 2 arguments and 3 kwargs:

def my_test_func(arg1, arg2, k1: str = '', k2: str = '', k3: str = ''):
    print(f"{arg1}..{arg2}..{k1}..{k2}..{k3}")

The syntax should be easily adaptable to your use case (both the function and endpoint UUIDs are hard coded like yours, but they can be taken from arguments similarly).

Please let me know if this allows you to get your flow running successfully.  Meanwhile, we will also add these examples to the Globus Compute Action Provider docs ASAP.
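
As a local illustration of how a task's "args" and "kwargs" line up with the function signature, here is a sketch that assumes the endpoint calls the function as func(*args, **kwargs). The function returns the string instead of printing it, purely so the result is easy to inspect; the `task` values are made-up examples.

```python
def my_test_func(arg1, arg2, k1: str = "", k2: str = "", k3: str = ""):
    # Same shape as the test function above, but returning the text.
    return f"{arg1}..{arg2}..{k1}..{k2}..{k3}"


# Example values only: a two-item args array and a partial kwargs object.
task = {"args": [123, "hello world"], "kwargs": {"k1": "a", "k2": "b"}}

# Positional values come from "args"; named values come from "kwargs".
result = my_test_func(*task["args"], **task["kwargs"])
print(result)
```

Any kwarg left out of the object (k3 here) falls back to the default in the function signature.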

Here are the flow and schema JSON for the first version, which takes args and kwargs as raw JSON-formatted inputs in the flow form, like this:
Screenshot 2025-02-24 at 2.56.42 PM.png
Flow:
{
  "Comment": "First test version 1",
  "StartAt": "RunTasks",
  "States": {
    "RunTasks": {

      "Type": "Action",
      "ActionUrl": "https://compute.actions.globus.org/v3",
      "Parameters": {
        "tasks": [
          {
            "args.=": "getattr('my_args')",
            "kwargs.=": "getattr('my_kwargs')",
            "function_id": "abcd1234-fd0d-48c1-a975-10a23e7ba839"
          }
        ],
        "endpoint_id": "abcd1234-2a98-415a-883b-0f40d973abc8"
      },
      "ResultPath": "$.RunResult",
      "End": true
    }
  }
}

Schema:
{
  "type": "object",
  "required": [
  ],
  "properties": {
    "my_args": {
      "type": "array",
      "description": "args to the task, e.g. [123, \"hello world\"]"
    },
    "my_kwargs": {
      "type": "object",
      "description": "kwargs to the task, e.g. {\"first\": 123, \"second\": \"my string\"}"
    }
  },
  "propertyOrder": [
    "my_args",
    "my_kwargs"
  ],
  "additionalProperties": false
}


And the second version still uses the arguments as an array but now has individual kwargs as in your function, with individual inputs:
Screenshot 2025-02-24 at 2.46.51 PM.png
Flow JSON:
{
  "Comment": "Second flow V2",
  "StartAt": "RunTasks",
  "States": {
    "RunTasks": {

      "Type": "Action",
      "ActionUrl": "https://compute.actions.globus.org/v3",
      "Parameters": {
        "tasks": [
          {
            "function_id": "abcd1234-fd0d-48c1-a975-10a23e7ba839",
            "args.=": "getattr('my_args')",
            "kwargs": {
              "k1.=": "getattr('my_kwarg_1')",
              "k2.=": "getattr('my_kwarg_2')",
              "k3.=": "getattr('my_kwarg_3')"
            }
          }
        ],
        "endpoint_id": "abcd1234-2a98-415a-883b-0f40d973abc8"
      },
      "ResultPath": "$.RunResult",
      "End": true
    }
  }
}

Schema:
{
  "type": "object",
  "required": [
  ],
  "properties": {
    "my_args": {
      "type": "array",
      "description": "argument args"
    },
    "my_kwarg_1": {
      "type": "string",
      "description": "kwarg 1"
    },
    "my_kwarg_2": {
      "type": "string",
      "description": "kwarg 2"
    },
    "my_kwarg_3": {
      "type": "string",
      "description": "kwarg 3"
    }
  },
  "propertyOrder": [
    "my_args",
    "my_kwarg_1",
    "my_kwarg_2",
    "my_kwarg_3"
  ],
  "additionalProperties": false
}

Anthony Weaver

Feb 24, 2025, 4:54:15 PM
to Lei Wang, Discuss
Lei,

Thank you so much for these examples.  I just want to check that I understand them correctly.  It would seem that if I want to use args, then the person executing the flow has to enter them in the form in a single field, enclosed in [ ].  If that is the correct interpretation, then it may be better in my case to use kwargs instead.

With the kwargs setup, if my image directory is going to be a path on an endpoint the user selects, then I might have a part of the schema that looks like:

"destination": {
      "type": "object",
      "title": "Destination",
      "format": "globus-collection",
      "required": [
        "id",
        "path"
      ],
      "properties": {
        "id": {
          "type": "string",
          "const": "6a72df7a-91ee-49d9-bee7-624746e2adca",
          "title": "Destination Collection ID",
          "format": "uuid",
          "description": "The UUID for the collection which serves as the destination for the two-stage Transfer"
        },
        "path": {
          "type": "string",
          "title": "Destination Collection Path",
          "default": "Type your GSU campusid and click Browse",
          "description": "The path on the destination collection where the data will be stored"
        }
      },

And in the flow I would do something like
"image_dir.=": "getattr('destination.path')" 

Anthony Weaver

Feb 25, 2025, 11:05:02 AM
to Discuss, Anthony Weaver, Discuss, l...@globus.org
Lei,

This certainly moved me a good way in the right direction toward getting the flow to work.  Thank you.

Tony

Lei Wang

Feb 25, 2025, 4:55:43 PM
to Anthony Weaver, Discuss
> It would seem that if I want to use args
> then in the form, the person executing the flow has to enter them in a single field enclosed in [ ].

The flow example I posted does require a string input that is a JSON formatted array with brackets.  The syntax would have to be remembered, possibly with the help of the description of the field shown to the user.
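
For anyone who wants to check such an input locally before starting a run, here is a minimal sketch. `parse_args_field` is a hypothetical helper, not part of Flows; it only confirms that the typed text is a JSON array.

```python
import json


def parse_args_field(text):
    """Parse what a user typed into the args field and require a JSON array."""
    value = json.loads(text)
    if not isinstance(value, list):
        raise ValueError('args must be a JSON array, e.g. [123, "hello world"]')
    return value


parsed = parse_args_field('[123, "hello world"]')
print(parsed)
```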

I'll get back to you on whether individual args can be passed in as separate (string) fields in the schema, instead of as an array.

> And in the flow I would do something like
> "image_dir.=": "getattr('destination.path')"

Another syntax is "image_dir.$": "$.destination.path"
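
To picture what a "$.destination.path" reference selects, here is a toy resolver in Python. It handles only simple dotted paths, which is far less than real JSONPath supports, and the "path" value in `state` is a made-up example.

```python
def resolve_path(path, state):
    """Resolve a simple '$.a.b' reference against a nested dict (toy version)."""
    if not path.startswith("$."):
        raise ValueError("expected a path starting with '$.'")
    value = state
    for key in path[2:].split("."):
        value = value[key]
    return value


# Shaped like the "destination" object from the schema fragment above.
state = {
    "destination": {
        "id": "6a72df7a-91ee-49d9-bee7-624746e2adca",
        "path": "/some/dir/",  # example value only
    }
}
image_dir = resolve_path("$.destination.path", state)
print(image_dir)
```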
 

Lei Wang

Feb 25, 2025, 6:41:13 PM
to Anthony Weaver, Discuss
Anthony,

I just tested a flow that takes individual arg inputs and combines them into args.  The key is to put the ".=" suffix on the "args" key and to write the value as a string containing square brackets, with a getattr() call for each element.
This should work for individual string arguments.  (kwargs were already shown in the earlier flows.)

Here are the relevant sections:
Flow Definition:
...
            "args.=": "[getattr('my_arg_1'), getattr('my_arg_2')]",
            "kwargs": {
   ...


The schema:
...
    "my_arg_1": {
      "type": "string",
      "description": "argument args"
    },
    "my_arg_2": {
      "type": "string",
      "description": "argument args"
    },
...
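
The difference between quoting a name and looking it up can be mimicked locally. The sketch below is a toy model only: it uses Python's eval with a stand-in `getattr` that reads from an input dict, and it makes no claim about how Flows evaluates expressions internally. The `evaluate` helper and the input values are hypothetical.

```python
def evaluate(expr, flow_input):
    """Toy stand-in for a '.=' expression: getattr('name') reads an input field."""
    def lookup(name):
        return flow_input[name]

    # eval is used only for this illustration; never eval untrusted input.
    return eval(expr, {"__builtins__": {}}, {"getattr": lookup})


flow_input = {"my_arg_1": "first", "my_arg_2": "second"}

# Quoted names stay literal strings; getattr() calls are replaced by the values.
quoted = evaluate("['my_arg_1', 'my_arg_2']", flow_input)
looked_up = evaluate("[getattr('my_arg_1'), getattr('my_arg_2')]", flow_input)
print(quoted)
print(looked_up)
```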

The input in the Flows web UI:
Screenshot 2025-02-25 at 6.33.55 PM.png
The output from the function execution in Flows:
Screenshot 2025-02-25 at 6.34.09 PM.png

Hope this gives you better options for your users, and thank you for your patience!

Anthony Weaver

Feb 25, 2025, 6:49:38 PM
to Lei Wang, Discuss
Lei,

Thank you so much.  Your version was probably the only thing I didn't try with args.  

I did get the compute portion of my flow running this afternoon using the kwargs version, but I may
switch to the args version as I develop the rest of the steps I want to do in this flow. 

As an aside, I ran into some trouble running compute when the executable I was trying to run was in a conda environment.  I worked through that, but it might be helpful to note how to do it somewhere in the documentation.

Thank you again for working through all this and providing much needed help

Tony

Lei Wang

Feb 25, 2025, 6:53:55 PM
to Anthony Weaver, Discuss
Anthony,

Agreed on the documentation and examples.  Small syntax differences can mean the difference between working well and 'not sure if it can be done at all'.  Better-labeled examples of these usage building blocks are a priority for us as well, and thanks again for bringing up the pain points so we are aware of them.

Lei