I believe I have figured out the issue and have it working, although I'm not sure this is the best or most secure way to go about it.
An overview of what it took to get this working:
1. I changed the APP from a UserApp to a ClientApp
2. I have the CLIENT_ID and CLIENT_SECRET set as environment variables in my endpoint and use os.environ.get() to retrieve those values
3. Make sure the CLIENT_ID being used has write permissions on the search index (see the sketch after this list)
4. Monitor the ingest task and return SUCCESS, FAILED, or a timeout message depending on the outcome
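For step 3, the client identity needs a writer role on the index. Below is a minimal sketch of how that grant could be done once, as a user who already administers the index; the role document format and the identity-URN construction are my assumptions from the Globus Search docs, so verify them for your setup:

import globus_sdk

# One-time grant, run as a user identity that administers the index,
# not as the client itself. All IDs below are placeholders.
INDEX_ID = "XXXXXXXXXXXXXXXXXXXXXXXXX"
CLIENT_ID = "<the-ClientApp-client-id>"

admin_app = globus_sdk.UserApp("grant-writer", client_id="<your-native-app-client-id>")
search = globus_sdk.SearchClient(app=admin_app)

# A client-credentials identity has an identity ID equal to its client ID,
# so the principal URN can be built from CLIENT_ID (my assumption; verify).
search.create_role(INDEX_ID, {
    "role_name": "writer",
    "principal": f"urn:globus:auth:identity:{CLIENT_ID}",
})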
So now the relevant ingest portion of my compute function (which is run as part of a flow) looks like this:
import json
import os
import time

import globus_sdk

def prettyprint_json(obj, fp=None):
    if fp:
        return json.dump(obj, fp, indent=2, separators=(",", ": "), ensure_ascii=False)
    return json.dumps(obj, indent=2, separators=(",", ": "), ensure_ascii=False)

# Write the ingest data just for backup
# (ingest_file and entries are defined earlier in the function)
with open(ingest_file, "w") as fp:
    prettyprint_json(
        {"ingest_type": "GMetaList", "ingest_data": {"gmeta": entries}}, fp)

# Round-trip the document through JSON to get a plain, serializable payload
tmp_ingest = prettyprint_json(
    {"ingest_type": "GMetaList", "ingest_data": {"gmeta": entries}})
ingest_data = json.loads(tmp_ingest)

# Client credentials come from the endpoint's environment
CLIENT_ID = os.environ.get("GLOBUS_COMPUTE_CLIENT_ID")
SECRET = os.environ.get("GLOBUS_COMPUTE_CLIENT_SECRET")
INDEX_ID = "XXXXXXXXXXXXXXXXXXXXXXXXX"

APP = globus_sdk.ClientApp("ingest-app", client_id=CLIENT_ID, client_secret=SECRET)
SEARCH_CLIENT = globus_sdk.SearchClient(
    app=APP,
    app_scopes=[globus_sdk.Scope(globus_sdk.SearchClient.scopes.all)])

# Submit the ingest and poll the resulting task until it finishes or times out
ingest_res = SEARCH_CLIENT.ingest(INDEX_ID, ingest_data)
task_id = ingest_res["task_id"]
waited = 0
max_wait = 1200  # 20 minutes
while True:
    res = SEARCH_CLIENT.get_task(task_id)
    if res["state"] in ("SUCCESS", "FAILED"):
        return res["state"]
    # wait 1s and check for timeout
    waited += 1
    if waited >= max_wait:
        return "Ingest timed out"
    time.sleep(1)
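As a quick sanity check after a SUCCESS, querying the index should return the new entries. A minimal sketch, reusing SEARCH_CLIENT from above (the wildcard query and limit are just illustrative):

# Hypothetical post-ingest check: confirm the index now returns entries
result = SEARCH_CLIENT.search(INDEX_ID, q="*", limit=5)
print(f"{result['total']} entries match")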