bpi_wq_analysis_job_description = CondorJobDescription(condorpy_template_name='vanilla_transfer_files',
                                                       remote_input_files=None,
                                                       executable='$(APP_WORKSPACE)/job_executables/bpi_wq_executable.py')

bpi_wq_analysis_job = CondorWorkflowJobTemplate(name='bpi_wq_analysis_job',
                                                job_description=bpi_wq_analysis_job_description,
                                                parents=[],
                                                post_script='$(APP_WORKSPACE)/job_executables/update_status_in_db.py')

job_templates = (CondorWorkflowTemplate(name='bpi_wq_analysis_workflow',
                                        jobs=[bpi_wq_analysis_job],
                                        scheduler=None),
                 )
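For context, here is a minimal sketch of where these definitions plug in, assuming the Tethys 1.x/2.x-style job_templates hook on the app class; the import path and class name are assumptions, so verify them against your Tethys version:

# Sketch only: exposing the templates above to the Tethys job manager.
# Assumes the Tethys 1.x/2.x job_templates hook and that the definitions
# above live at module level in the app's app.py.
from tethys_sdk.base import TethysAppBase


class BpiApp(TethysAppBase):
    # ... name, package, and other required app metadata ...

    def job_templates(cls):
        # Return the tuple built above so job_manager.create_job() can
        # find 'bpi_wq_analysis_workflow' by template_name.
        return job_templates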
class BPIWaterQualityAnalysisWorkflow(object):
    def __init__(self,
                 user,
                 project_name,
                 workspace_path,
                 input_file,
                 source_nodes,
                 epanet_project_id,
                 model_db_url,
                 project_db_url,
                 user_project_id,
                 ):
        self.user = user
        self.project_name = project_name
        self.workspace_path = workspace_path
        self.input_file = input_file
        self.source_nodes = source_nodes
        self.epanet_project_id = epanet_project_id
        self.model_db_url = model_db_url
        self.project_db_url = project_db_url
        self.user_project_id = user_project_id
        self.workflow = None
    def prepare(self):
        """
        Prepares jobs for processing into database.
        """
        job_manager = app.get_job_manager()

        # Set parameters for HTCondor job
        self.workflow = job_manager.create_job(name=self.project_name,
                                               user=self.user,
                                               template_name='bpi_wq_analysis_workflow',
                                               workspace=self.workspace_path)

        for node in self.source_nodes:
            params = node.bpi_input.source_parameters.replace('"', "'")
            source_label = node.label

            bpi_wq_analysis_job = self.workflow.get_job('bpi_wq_analysis_job')
            bpi_wq_analysis_job.set_attribute('arguments', (self.workspace_path,
                                                            self.input_file,
                                                            source_label,
                                                            params.replace('"', '\\"'),
                                                            self.epanet_project_id,
                                                            self.model_db_url,
                                                            self.project_db_url,
                                                            self.user_project_id))
            bpi_wq_analysis_job.post_script_args = ' '.join((str(self.project_db_url),
                                                             str(self.user_project_id),
                                                             'BPI_WQ_ANALYSIS'))
            bpi_wq_analysis_job.save()

        # finalize
        self.workflow.extended_properties['user_project_id'] = str(self.user_project_id)
        self.workflow.save()
    def run_job(self):
        """
        Executes the prepared job.
        """
        self.prepare()
        self.workflow.execute()
source_nodes = [...]  # list of source node objects
job = BPIWaterQualityAnalysisWorkflow(
    user=request.user,
    project_name=project_name,
    workspace_path=job_path,
    input_file=temp_inp,
    source_nodes=source_nodes,
    epanet_project_id=epanet_project_id,
    model_db_url=model_db.db_url,
    project_db_url=project_db_url,
    user_project_id=user_project_id,
)
job.run_job()
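On the executable side of this first approach, the eight arguments arrive positionally. Here is a minimal sketch of bpi_wq_executable.py, assuming only the argument order from set_attribute('arguments', ...) above; the analysis body is app-specific and omitted:

#!/usr/bin/env python
# Hypothetical sketch of bpi_wq_executable.py. Only the argument order
# comes from set_attribute('arguments', ...) above; everything else here
# is illustrative.
import sys


def main():
    (workspace_path, input_file, source_label, params,
     epanet_project_id, model_db_url, project_db_url,
     user_project_id) = sys.argv[1:9]

    # Run the water quality analysis for this source node here.
    print('Running BPI WQ analysis for source {0}'.format(source_label))


if __name__ == '__main__':
    main()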
...
bpi_wq_analysis_job = CondorWorkflowJobTemplate(name='bpi_wq_analysis_job_0',
...
class BPIWaterQualityAnalysisWorkflow(object):
    ...

    def prepare(self):
        """
        Prepares jobs for processing into database.
        """
        job_manager = app.get_job_manager()

        # Set parameters for HTCondor job
        self.workflow = job_manager.create_job(name=self.project_name,
                                               user=self.user,
                                               template_name='bpi_wq_analysis_workflow',
                                               workspace=self.workspace_path)

        # This is the hack to get the CondorWorkflowJobTemplate and instantiate more workflow nodes
        template = job_manager.job_templates['bpi_wq_analysis_workflow']
        node_templates = list(template.node_templates)
        node_template = node_templates[0]

        for i in range(1, len(self.source_nodes)):  # start at 1 because index 0 was already created by the job manager
            node_template.name = 'bpi_wq_analysis_job_{0}'.format(i)
            node = node_template.create_node(self.workflow, app.get_app_workspace(), self.workspace_path)
        for index, node in enumerate(self.source_nodes):
            params = node.bpi_input.source_parameters.replace('"', "'")
            source_label = node.label

            bpi_wq_analysis_job = self.workflow.get_job('bpi_wq_analysis_job_{0}'.format(index))
            ...
class BPIWaterQualityAnalysisWorkflow(object):
    ...

    def prepare(self):
        """
        Prepares jobs for processing into database.
        """
        # generate a file that contains the parameters for each job
        parameters = os.path.join(self.workspace_path, 'parameters.txt')

        with open(parameters, 'w') as param_file:
            for node in self.source_nodes:
                params = node.bpi_input.source_parameters.replace('"', "'")
                source_label = node.label
                param_file.write(' '.join(map(str, (self.workspace_path,
                                                    self.input_file,
                                                    source_label,
                                                    params.replace('"', '\\"'),
                                                    self.epanet_project_id,
                                                    self.model_db_url,
                                                    self.project_db_url,
                                                    self.user_project_id))) + '\n')

        job_manager = app.get_job_manager()

        # Set parameters for HTCondor job
        self.workflow = job_manager.create_job(name=self.project_name,
                                               user=self.user,
                                               template_name='bpi_wq_analysis_workflow',
                                               workspace=self.workspace_path)

        bpi_wq_analysis_job = self.workflow.get_job('bpi_wq_analysis_job')
        bpi_wq_analysis_job.num_jobs = len(self.source_nodes)
        bpi_wq_analysis_job.set_attribute('transfer_input_files', 'parameters.txt')
        bpi_wq_analysis_job.set_attribute('arguments', '$(process)')
        bpi_wq_analysis_job.post_script_args = ' '.join((str(self.project_db_url),
                                                         str(self.user_project_id),
                                                         'BPI_WQ_ANALYSIS'))
        bpi_wq_analysis_job.save()

        # finalize
        self.workflow.extended_properties['user_project_id'] = str(self.user_project_id)
        self.workflow.save()

    def run_job(self):
        """
        Executes the prepared job.
        """
        self.prepare()
        self.workflow.execute()
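Inside bpi_wq_executable.py, the HTCondor $(process) value can then be mapped back to the matching line of parameters.txt: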
import sys

line_number = int(sys.argv[1])  # this is the job instance id ($(process)) that gets passed in

with open('parameters.txt', 'r') as params_file:
    lines = params_file.readlines()
    params = lines[line_number].split()

# use params in the rest of your script
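The post script can pick up the post_script_args in the same positional way. A minimal sketch of update_status_in_db.py, assuming only the argument order (project DB URL, user project id, job type label) set above; the status update itself is app-specific and the print is a stand-in:

#!/usr/bin/env python
# Hypothetical sketch of update_status_in_db.py. Only the argument order
# comes from post_script_args above; the actual database update is
# app-specific and left out.
import sys


def main():
    project_db_url, user_project_id, job_type = sys.argv[1:4]

    # Connect to project_db_url and mark the job_type record for
    # user_project_id as complete.
    print('Marking {0} complete for project {1}'.format(job_type, user_project_id))


if __name__ == '__main__':
    main()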