How to dynamically insert workflow

54 views
Skip to first unread message

Marc Jäger

unread,
Mar 12, 2019, 11:51:25 AM3/12/19
to fireworkflows
Hello,

In the detours argument of FWAction, it says you can give a list of FWs/WFs. I am trying to insert a mini-workflow dynamically but something is wrong. 
This workflow runs just fine.

------
from fireworks import Firework, Workflow, LaunchPad, ScriptTask, PyTask
from fireworks.core.rocket_launcher import rapidfire

launchpad = LaunchPad()

launchpad.reset('', require_password=False)

# create the individual FireWorks and Workflow

#fw0 = Firework(ScriptTask.from_str("echo start"), name = "start")
#fw1 = Firework(WorkflowDetourClass(), name = "split")
#fw2 = Firework(ScriptTask.from_str("echo finish"), name = "finish")
#wf = Workflow([fw0, fw1, fw2], {fw0 : [fw1], fw1: [fw2], }, name="test workflow")
links = {}
new_fws_lst = []
for i in range(3):
    hello_fw = Firework(ScriptTask.from_str("echo hello " + str(i)))
    goodbye_fw = Firework(ScriptTask.from_str("echo goodbye " + str(i)))
    new_fws_lst.append(hello_fw)
    new_fws_lst.append(goodbye_fw)
    links.update({hello_fw : [goodbye_fw]})

wf = Workflow(new_fws_lst, links)
launchpad.add_wf(wf)
rapidfire(launchpad)

-----

But if the same workflow is inserted in another workflow through detours, this error is thrown:
  File ".local/lib/python3.5/site-packages/fireworks/core/launchpad.py", line 1467, in _refresh_wf
    updated_ids = wf.refresh(fw_id)
  File ".local/lib/python3.5/site-packages/fireworks/core/firework.py", line 1005, in refresh
    updated_ids = updated_ids.union(self.apply_action(m_action, fw.fw_id))
  File ".local/lib/python3.5/site-packages/fireworks/core/firework.py", line 834, in apply_action
    new_updates = self.append_wf(wf, [fw_id], detour=True, pull_spec_mods=False)
  File ".local/lib/python3.5/site-packages/fireworks/core/firework.py", line 947, in append_wf
    updated_ids = self.refresh(new_fw.fw_id, set(updated_ids))
  File ".local/lib/python3.5/site-packages/fireworks/core/firework.py", line 979, in refresh
    if self.fw_states[parent] not in completed_parent_states:
KeyError: -8

----
from fireworks import explicit_serialize, FiretaskBase, FWAction, ScriptTask
import sys 
from fireworks import Workflow, Firework

@explicit_serialize
class WorkflowDetourClass(FiretaskBase):
    _fw_name = 'WorkflowDetourTask'
    required_params = []
    optional_params = []

    def run_task(self, fw_spec):
        links = {}
        new_fws_lst = []

        for i in range(3):
            hello_fw = Firework(ScriptTask.from_str("echo hello " + str(i)))
            goodbye_fw = Firework(ScriptTask.from_str("echo goodbye " + str(i)))

            new_fws_lst.append(hello_fw)
            new_fws_lst.append(goodbye_fw)

            links.update({hello_fw : [goodbye_fw]})

        wf = Workflow(new_fws_lst, links)
        return FWAction(detours = wf) 
---


fw0 = Firework(ScriptTask.from_str("echo start"), name = "start")
fw1 = Firework(WorkflowDetourClass(), name = "split")
fw2 = Firework(ScriptTask.from_str("echo finish"), name = "finish")
wf = Workflow([fw0, fw1, fw2], {fw0 : [fw1], fw1: [fw2], }, name="test workflow")

----

What am I doing wrong here? Does anyone have a working example of an inserted workflow?
Cheers
Marc

Anubhav Jain

unread,
Mar 25, 2019, 4:02:56 PM3/25/19
to fireworkflows
Hi Marc,

Thanks for including detailed code for your problem. I ran your code (specifically the code pasted below) with the latest FWS v1.8.7 and did not hit any errors. I've also attached a screenshot of the web gui after executing the code, which shows everything executing OK. So, I am not sure what might be wrong.

========
from fireworks import explicit_serialize, FiretaskBase, FWAction, ScriptTask, \
    LaunchPad
from fireworks import Workflow, Firework
from fireworks.core.rocket_launcher import rapidfire


@explicit_serialize
class WorkflowDetourClass(FiretaskBase):
    _fw_name = 'WorkflowDetourTask'
    required_params = []
    optional_params = []

    def run_task(self, fw_spec):
        links = {}
        new_fws_lst = []

        for i in range(3):
            hello_fw = Firework(ScriptTask.from_str("echo hello " + str(i)))
            goodbye_fw = Firework(ScriptTask.from_str("echo goodbye " + str(i)))

            new_fws_lst.append(hello_fw)
            new_fws_lst.append(goodbye_fw)

            links.update({hello_fw : [goodbye_fw]})

        wf = Workflow(new_fws_lst, links)
        return FWAction(detours = wf)


fw0 = Firework(ScriptTask.from_str("echo start"), name = "start")
fw1 = Firework(WorkflowDetourClass(), name = "split")
fw2 = Firework(ScriptTask.from_str("echo finish"), name = "finish")
wf = Workflow([fw0, fw1, fw2], {fw0 : [fw1], fw1: [fw2], }, name="test workflow")

launchpad = LaunchPad()
launchpad.reset('', require_password=False)
launchpad.add_wf(wf)
rapidfire(launchpad)
========
Screen Shot 2019-03-25 at 12.59.38 PM.png
Screen Shot 2019-03-25 at 1.02.39 PM.png

Marc Jäger

unread,
Mar 29, 2019, 1:37:05 PM3/29/19
to fireworkflows
Hi Anubhav,

thanks for running and checking my code, it helped to find a solution. I managed to get it working on python 3.6.7 and 3.6.8.
As soon as python is downgraded to 3.5.3 or 3.5.6, though, the error posted above occurs. It is tested on 2 different machines.

I will investigate this and post a solution for python 3.5 when I find out what is happening here. In the meantime, could you verify that you get the same error with python 3.5?

Cheers
Marc

P.S: Everything is run in a virtualenv

Marc Jäger

unread,
Apr 2, 2019, 8:48:03 AM4/2/19
to fireworkflows
Problem solved for python 3.5, Firework 1.8.7

Fireworks is using OrderedDict - but not strictly everywhere. The reason for the different behaviour with respect to different versions lies in how python treats regular dictionaries. From python 3.6 onwards, they are insertion ordered - which is usually not required in Fireworks. Except when a Workflow is added dynamically which is where python 3.5 failed to execute, since it tried to add a FW which had a parent which had not been added before (In the example: goodbye before hello).

When initializing Workflow() an easy fix is possible by enforcing insertion order. I added a line (bold) in core/firework.py

---
        # main dict containing mapping of an id to a Firework object
        self.id_fw = {} 
        for fw in fireworks:
            if fw.fw_id in self.id_fw:
                raise ValueError('FW ids must be unique!')
            self.id_fw[fw.fw_id] = fw 

            if fw.fw_id not in links_dict and fw not in links_dict:
                links_dict[fw.fw_id] = [] 
        self.id_fw = OrderedDict([(fw.fw_id, fw) for fw in fireworks])
        self.links = Workflow.Links(links_dict)
---

Anubhav Jain

unread,
Apr 2, 2019, 12:20:48 PM4/2/19
to Marc Jäger, fireworkflows
Hi Marc

Thanks! 

Two questions:

1. I understand you are trying to enforce insertion order for "id_fw" in Workflow, but it's unclear to me which portion of the code requires that Workflow.id_fw is in a particular order (vs any unordered dict). Do you happen to know what part of the code is actually assuming that Workflow.id_fw is in a specific order?

2. For a fix, does the below work? i.e., instead of reordering self.id_fw afterwards, to just maintain insertion order at the time of constructing it.
---
        # main dict containing mapping of an id to a Firework object
        self.id_fw = OrderedDict()
        for fw in fireworks:
            if fw.fw_id in self.id_fw:
                raise ValueError('FW ids must be unique!')
            self.id_fw[fw.fw_id] = fw 

            if fw.fw_id not in links_dict and fw not in links_dict:
                links_dict[fw.fw_id] = [] 
        self.links = Workflow.Links(links_dict)
---

Best,
Anubhav


--
You received this message because you are subscribed to the Google Groups "fireworkflows" group.
To unsubscribe from this group and stop receiving emails from it, send an email to fireworkflow...@googlegroups.com.
To post to this group, send email to firewo...@googlegroups.com.
Visit this group at https://groups.google.com/group/fireworkflows.
For more options, visit https://groups.google.com/d/optout.


--
Best,
Anubhav

Marc Jäger

unread,
Apr 3, 2019, 7:42:37 AM4/3/19
to fireworkflows

 I replied to you by email, I only saw this post here now.
 

Anubhav Jain

unread,
Apr 5, 2019, 7:42:52 PM4/5/19
to fireworkflows
Thanks! The issue should be fixed (with your help) in FWS v1.8.8, let me know if you still see problems
Reply all
Reply to author
Forward
0 new messages