Sub-workflows and execution directory


Dave W

May 2, 2014, 1:51:04 PM
to nipy...@googlegroups.com
Hello, 

I have run across some weird behavior with sub-workflows in a pipeline and I'd like to see if there is a better solution.  

My 'master' pipeline is per-subject with a per-scan session sub-workflow:

# master.py
import os

master_config = {'execution': {'create_report': True,
                               'crashdump_dir': os.getcwd(),
                               'hash_method': 'timestamp',
                               'job_finished_timeout' : 25,
                               'keep_inputs': True,
                               'local_hash_check': True,
                               'matplotlib_backend': 'Agg',
                               'plugin': 'Linear',
                               'remove_node_directories': False,
                               'remove_unnecessary_outputs': False,
                               'stop_on_first_crash': False,
                               'stop_on_first_rerun': False,
                               'use_relative_paths': False,
                               'stop_on_unknown_version': False,
                               'write_provenance': False,
                               'parameterize_dirs': True}
                }

from nipype import config
config.update_config(master_config)  # Set universal pipeline options
import nipype.pipeline.engine as pe
from nipype.interfaces.utility import IdentityInterface

from workflows.session import createSessionWorkflow

masterWF = pe.Workflow(name='master')  # Pipeline for a single subject
masterWF.base_dir = '/path/to/cache/directory'

inputsSpec = pe.Node(interface=IdentityInterface(fields=['xxx', ...]),
                     run_without_submitting=True, name='inputspec')

sessionWF = createSessionWorkflow(name=session, ...)

masterWF.connect([(inputsSpec, sessionWF, [('xxx', 'inputspec.yyy'), ...]),
                  ...])

masterWF.run()

Inside the subworkflow 'sessionWF', there is a segmentation node that is a wrapper around a SEMLikeCommandLine object, BRAINSABCext().  This object uses a relative path for the input 'outputDir'.  'sessionWF' also has a subworkflow 'subWF' (included as an e

# workflows/session.py

def createSessionWorkflow(name='', ...):
    import nipype.pipeline.engine as pe
    from nipype.interfaces.utility import IdentityInterface

    from segmentation import createSegmentationWorkflow

    workflow = pe.Workflow(name=name)
    inputsSpec = pe.Node(interface=IdentityInterface(fields=['yyy', ...]),
                         run_without_submitting=True, name='inputspec')
    
    segmentNode = pe.Node(interface=BRAINSABCext(), name="BABC")
   
    # Relative path for node
    segmentNode.inputs.outputDir = './'
    
    ... # more nodes, connections, etc.

    subWF = createSegmentationWorkflow(name='sub_sub_wf', ...)

    workflow.connect([(inputsSpec, segmentNode, [...]),
                      (segmentNode, subWF, [...]), 
                      ...])

    return workflow   

During testing, the master workflow would ALWAYS rerun the segmentNode instead of falling through.  I wrote a test workflow and discovered that the segmentNode was executing in a temporary directory on every rerun, even though the node's cache directory existed and had completed previously with success. 

The solution I found was to explicitly set every workflow's base_dir attribute at the masterWF level, e.g.:

# master.py

...
cache_dir = '/path/to/cache/directory'
masterWF.base_dir = cache_dir
sessionWF.base_dir = cache_dir
sessionWF.get_node('sub_sub_wf').base_dir = cache_dir


With this change, the execution directory and the cache directory are the same and the workflow falls through correctly.  :-)  I'm not sure exactly how the execution directory is determined at runtime, which makes me suspect there's a better way to do this.

Cheers,
Dave


Dave W

May 2, 2014, 3:49:45 PM
to nipy...@googlegroups.com
Well, I thought I had it.  My simple test case passes, but a more complex case still hangs up on the offending segmentNode.  Anyone have a suggestion?  

I can provide the full test case with downsampled images and the code for the 'full' workflow if necessary.

Dave

Satrajit Ghosh

May 2, 2014, 3:59:25 PM
to nipy-user
hi dave,

could you try to generate a flattened workflow graph for a single session? that might provide some clues.

wf = create_wf(...)
wf.write_graph(graph2use='flat')

and take a look at the graph.

the only place where wf.base_dir should be set is the outermost workflow.
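
to illustrate why, here is a rough sketch of how a nested node's working directory can be expected to follow from the outermost base_dir plus the chain of workflow names. this is a simplified stand-in, not nipype's actual code: the helper name expected_node_dir and its arguments are hypothetical, and the temp-directory fallback is an assumption that would match the behavior dave describes.

```python
import os
import tempfile


def expected_node_dir(base_dir, hierarchy, node_name):
    """Hypothetical helper: sketch of a nested node's working directory.

    base_dir  -- base_dir of the outermost workflow (or None if none is set)
    hierarchy -- dotted chain of enclosing workflow names, e.g. 'master.session'
    node_name -- name of the node itself, e.g. 'BABC'
    """
    if base_dir is None:
        # Assumption: without a base_dir the node lands in a fresh temporary
        # directory, so no earlier cache can be found and the node reruns.
        base_dir = tempfile.mkdtemp()
    parts = hierarchy.split('.') + [node_name]
    return os.path.abspath(os.path.join(base_dir, *parts))


cache_dir = '/path/to/cache/directory'

# Same base_dir on every run: the path is stable, so a cache can be reused.
print(expected_node_dir(cache_dir, 'master.session', 'BABC'))

# No base_dir: a different temporary path on every call.
print(expected_node_dir(None, 'master.session', 'BABC'))
```

if the directory logged for the BABC node at run time does not match the path built from the outermost base_dir, that mismatch would be the first thing to track down.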

cheers,

satra


Dave W

May 6, 2014, 1:07:21 PM
to nipy...@googlegroups.com
Hi Satra, 

Flat graph doesn't point to a smoking gun.  

Interesting - I've set 

config.enable_debug_mode()

but the workflow still reruns the node instead of stopping.  The node is a wrapper around BRAINSABC, but I don't think the wrapper is the culprit (this behavior began after some clean-up and refactoring of the nested pipelines).

Any ideas?    

Dave

Johnson, Hans J

May 6, 2014, 1:10:19 PM
to nipy...@googlegroups.com
Dave,

I will try to look at it.  There is not sufficient information for Satra to be able to help you from the e-mail below.  Please provide the github branch that I need to look at.


Hans
