from fireworks import LaunchPad, PyTask, ScriptTask, Workflow
from fireworks.core.firework import FWAction, Firework, FireTaskBase, Workflow
from fireworks.core.rocket_launcher import rapidfire
from fireworks.utilities.fw_utilities import explicit_serialize
import time
@explicit_serialize
class exceptionTask(FireTaskBase):
    """Firetask whose only behavior is to fail with a ``NameError``."""

    def run_task(self, fw_spec):
        """Raise ``NameError`` unconditionally.

        Args:
            fw_spec: Spec passed in by the caller; it is never read here.

        Raises:
            NameError: Always, with a fixed message.
        """
        message = "just to throw an exception"
        raise NameError(message)
@explicit_serialize
class testTask(FireTaskBase):
    """Firetask that returns a three-step chain of sleep Fireworks as detours."""

    def run_task(self, fw_spec):
        """Build the Fireworks "sleep 1" -> "sleep 2" -> "sleep 3" and return them.

        Each Firework runs the script ``sleep 15`` with ``_priority`` 10. Every
        Firework after the first names its predecessor as its parent, and the
        same ordering is also passed to the Workflow through ``links_dict``.

        Args:
            fw_spec: Spec passed in by the caller; it is never read here.

        Returns:
            FWAction: An action whose ``detours`` is the Workflow built here.
        """
        chain = []
        for index in (1, 2, 3):
            options = {"spec": {"_priority": 10}, "name": "sleep %d" % index}
            if chain:
                # Only the first Firework is created without a parent.
                options["parents"] = [chain[-1]]
            chain.append(Firework(ScriptTask.from_str("sleep 15"), **options))

        # Map each Firework to the one that follows it; the last has no entry.
        successor_of = dict(zip(chain, chain[1:]))

        return FWAction(detours=Workflow(chain, links_dict=successor_of))
@explicit_serialize
class endTask(FireTaskBase):
    """Firetask that blocks for a fixed 30 seconds and does nothing else."""

    def run_task(self, fw_spec):
        """Sleep for 30 seconds.

        Args:
            fw_spec: Spec passed in by the caller; it is never read here.

        Returns:
            None: No action object is returned.
        """
        pause_seconds = 30
        time.sleep(pause_seconds)
# LaunchPad built with its default constructor arguments.
launchpad = LaunchPad()

# Three Fireworks chained through ``parents``: start -> detours -> end.
# The middle one runs testTask, which returns extra Fireworks as detours.
sfw = Firework(ScriptTask.from_str("sleep 15"), name="start", spec={"_priority": 1})
dfw = Firework(testTask(), name="detours", parents=[sfw])
efw = Firework(
    ScriptTask.from_str("sleep 15"),
    name="end",
    spec={"_priority": 10},
    parents=[dfw],
)
fireworks = [sfw, dfw, efw]

wf = Workflow(fireworks, name="test detours")

# The same Workflow object is deliberately submitted five times.
for _ in range(5):
    launchpad.add_wf(wf)

# Hand the populated launchpad to rapidfire for execution.
rapidfire(launchpad)