Creating a Condor Workflow that consists of a single job being executed multiple times

13 views
Skip to first unread message

shawncrawley

unread,
Jan 12, 2017, 6:54:20 PM1/12/17
to Tethys Platform
This question is mainly for Scott (+scottych...@gmail.com), but of course anyone who knows is welcome to answer. I am trying to define a Condor Workflow that would consist of a single job being executed multiple times. Here is what I have defined:

In app.py:
    bpi_wq_analysis_job_description = CondorJobDescription(condorpy_template_name='vanilla_transfer_files',
                                                               remote_input_files
=None,
                                                               executable
='$(APP_WORKSPACE)/job_executables/bpi_wq_executable.py')
   
    bpi_wq_analysis_job
= CondorWorkflowJobTemplate(name='bpi_wq_analysis_job',
                                                        job_description
=bpi_wq_analysis_job_description,
                                                        parents
=[],
                                                        post_script
='$(APP_WORKSPACE)/job_executables/update_status_in_db.py')


    job_templates
= (CondorWorkflowTemplate(name='bpi_wq_analysis_workflow',
                                                jobs
=[bpi_wq_analysis_job],
                                                scheduler
=None
                                               
)
                             
)



In custom_class.py

class BPIWaterQualityAnalysisWorkflow(object):
   
def __init__(self,
                 user
,
                 project_name
,
                 workspace_path
,
                 input_file
,
                 source_nodes
,
                 epanet_project_id
,
                 model_db_url
,
                 project_db_url
,
                 user_project_id
,
                 
):


       
self.user = user
       
self.project_name = project_name
       
self.workspace_path = workspace_path
       
self.input_file = input_file
       
self.source_nodes = source_nodes
       
self.epanet_project_id = epanet_project_id
       
self.model_db_url = model_db_url
       
self.project_db_url = project_db_url
       
self.user_project_id = user_project_id


       
self.workflow = None


   
def prepare(self):
       
"""
        Prepares jobs for processing into database.
        """

        job_manager
= app.get_job_manager()


       
# Set parameters for HTCondor job
       
self.workflow = job_manager.create_job(name=self.project_name,
                                               user
=self.user,
                                               template_name
='bpi_wq_analysis_workflow',
                                               workspace
=self.workspace_path)


       
for node in self.source_nodes:
           
params = node.bpi_input.source_parameters.replace('"', "'")
            source_label
= node.label


            bpi_wq_analysis_job
= self.workflow.get_job('bpi_wq_analysis_job')
            bpi_wq_analysis_job
.set_attribute('arguments', (self.workspace_path,
                                                           
self.input_file,
                                                            source_label
,
                                                           
params.replace('"', '\\"'),
                                                           
self.epanet_project_id,
                                                           
self.model_db_url,
                                                           
self.project_db_url,
                                                           
self.user_project_id
                                                           
)
                                             
)
            bpi_wq_analysis_job
.post_script_args = ' '.join((str(self.project_db_url),
                                                             str
(self.user_project_id),
                                                             
'BPI_WQ_ANALYSIS')
                                                           
)
            bpi_wq_analysis_job
.save()


       
# finalize
       
self.workflow.extended_properties['user_project_id'] = str(self.user_project_id)
       
self.workflow.save()


   
def run_job(self):
       
"""
        Executes the prepared job.
        """

       
self.prepare()
       
self.workflow.execute()



In script_that_executes_the_workflow.py:
   
source_nodes = [list, of, nodes]


    job
= BPIWaterQualityAnalysisWorkflow(
        user
=request.user,
        project_name
=project_name,
        workspace_path
=job_path,
        input_file
=temp_inp,
        source_nodes
=source_nodes,
        epanet_project_id
=epanet_project_id,
        model_db_url
=model_db.db_url,
        project_db_url
=project_db_url,
        user_project_id
=user_project_id
   
)


    job
.run_job()


The trouble I run into is in the prepare function inside of the BPIWaterQualityAnalysisWorkflow class in the custom_class.py. I am trying to loop through all of my nodes, and trigger a separate job for each node. However, only a single job is being triggered - the last one in the loop. I know this is because the self.workflow.get_job('bpi_wq_analysis_job'line always returns the same, unique object, and is thus overwritten in the loop. Is there a good way to work around this without having to literally define tons of different jobs that should really be the same job? Does that make sense? I am wanting to take advantage of the clustering for each of these jobs, and thus do not want to put the loop somewhere else where it will act synchronously. Let me know if you have any thoughts/ideas/questions/solutions. Thanks! 

sdc50

unread,
Jan 18, 2017, 1:11:52 PM1/18/17
to tethysp...@googlegroups.com
Unfortunately I didn't design the interface for CondorWorkflows to be that flexible (I hope that will change in the future). However, there is still a way to do what I think you want, but it requires a little bit of a hack. I'll first try to show that method and then I'll show an alternative that I think would provide a more elegant solution. 

Caution: I haven't actually tested any of this code. If you run into issues then let me know I can help you debug.

Solution1 (the hack): 

This first way requires that you access the CondorWorkflowJobTemplate directly in the app and instantiate it as may times as you need. Confusingly, a CondorWorkflowJob is referred to on the backend as a node. That makes the variable names a little bit confusing for this particular example, but hopefully you'll be able to follow (note: I've just put ellipses in place of code from your previous example that is unchanged and not needed for context):

In app.py:

Rename the CondorWorkflowJobTemplate so that it has an index. The key to creating multiple nodes (i.e. workflow jobs) in a workflow is that they each must have a unique name. The Job Manager will automatically instantiate the first node from the template so we give it the first index of 0. Later we will instantiate the rest of the nodes needed and rename them with the appropriate index.
    ...
    
    bpi_wq_analysis_job 
= CondorWorkflowJobTemplate(name='bpi_wq_analysis_job_0',
                                                    ...



In custom_class.py

class BPIWaterQualityAnalysisWorkflow(object):
    
...

    
def prepare(self):

        
"""
        Prepares jobs for processing into database.
        """

        job_manager 
= app.get_job_manager()


        
# Set parameters for HTCondor job
        
self.workflow = job_manager.create_job(name=self.project_name,
                                               user
=self.user,
                                               template_name
='bpi_wq_analysis_workflow',
                                               workspace
=self.workspace_path)
    
        # This is the hack to get the CondorWorkflowJobTemplate and instantiate more workflow nodes
        template = job_manager.job_templates[
'bpi_wq_analysis_workflow']
        node_templates = list(template.node_templates)
        node_template = node_templates[0]
        for i in range(1, len(self.source_nodes)):  Start with one because index 0 was already created by the job manager 
            node_template.name = 'bpi_wq_analysis_job_{0}'.format(i)
            node = node_template.create_node(workflow, app.get_app_workspace(), workspace_path)
  
        for index, node in enumerate(self.source_nodes):

            
params = node.bpi_input.source_parameters.replace('"', "'")
            source_label 
= node.
label


            bpi_wq_analysis_job 
= self.workflow.get_job('bpi_wq_analysis_job'.format(index))

            ...

            



Solution 2:


This second approach involves listing all of the parameters in a file and then setting num_jobs attribute on the single workflow job to execute multiple instances of the job. It requires changing your 'bpi_wq_executable.py' accept the job instance id (also known as the process id) as a single parameter, which will then be used to grab a set of parameters from the parameter file. First, you might go about creating the parameters file and setting the num_jobs like this:

In custom_class.py

class BPIWaterQualityAnalysisWorkflow(object):
    ...


    def prepare(self):
        """
        Prepares jobs for processing into database.
        """
        

        # generate a file that contains the parameters for each job
        parameters = os.path.join(self.workspace_path, 'paramters.txt')

        with open(paramters, 'w') as param_file:

            for node in self.source_nodes:
                params = node.bpi_input.source_parameters.replace('"', "'")
                source_label = node.label
                param_file.write(' '.join(self.workspace_path,
                                            self.input_file,
                                            source_label,
                                            params.replace('"', '\\"'),
                                            self.epanet_project_id,
                                            self.model_db_url,
                                            self.project_db_url,
                                            self.user_project_id))

        job_manager = app.get_job_manager()


        # Set parameters for HTCondor job
        self.workflow = job_manager.create_job(name=self.project_name,
                                               user=self.user,
                                               template_name='bpi_wq_analysis_workflow',
                                               workspace=self.workspace_path)

        bpi_wq_analysis_job = self.workflow.get_job('bpi_wq_analysis_job')

        bpi_wq_analysis_job.num_jobs = len(self.source_nodes)

        bpi_wq_analysis_job.set_attribute('transfer_input_files', 'parameters.txt')
        bpi_wq_analysis_job.set_attribute('arguments', '$(process)')


        bpi_wq_analysis_job.post_script_args = ' '.join((str(self.project_db_url),
                                                             str(self.user_project_id),
                                                             'BPI_WQ_ANALYSIS')
                                                            )
        bpi_wq_analysis_job.save()

        # finalize
        self.workflow.extended_properties['user_project_id'] = str(self.user_project_id)
        self.workflow.save()

    def run_job(self):
        """
        Executes the prepared job.
        """
        self.prepare()
        self.workflow.execute()


Now change your Python executable to read the parameters from the parameters file by doing something like this:

In bpi_wq_executable.py

import sys

line_number = sys.argv[1]  # this is the job instance id that gets passed in

with open('parameters.txt', 'r') as params_file:
    lines = params_file.readlines()
    params = lines[line_number].split()

# use params in the rest of your script


Hopefully you can make sense of this. Sorry about the code highlighting (Google Groups and I don't get along so well). Let me know if you have further questions.

Reply all
Reply to author
Forward
0 new messages