I recently installed OpenQuake 3.0 and 3.1 from source on our HPC cluster. OpenQuake 3.0 (and all previous versions) works fine, but OpenQuake 3.1 fails with an error raised from the celery and amqp modules.
For example, with OpenQuake 3.0 I can run the demo 'LogicTreeCase2ClassicalPSHA' on 2 nodes (32 cores each) without problems, but OpenQuake 3.1 fails with the error message below. OpenQuake 3.1 appears to use celery 4.1.0, whereas OpenQuake 3.0 stays with celery 3.1.20, so I suspect the newer celery version is the cause. When I tested it, OpenQuake 3.1 no longer seemed to work with celery 3.1.20. Similar failures occur in some other test cases.
Many thanks for any help in solving this issue.
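To compare the two installations, it may help to print the versions of the messaging packages that each virtualenv actually provides. The snippet below is only a sketch: the helper name `installed_versions` is made up for illustration, and the package list (celery, kombu, amqp) is taken from the module paths in the traceback. It falls back to `pkg_resources` on older interpreters such as the Python 3.5 used here.

```python
try:  # Python 3.8 and later
    from importlib.metadata import PackageNotFoundError as _NotFound
    from importlib.metadata import version as _version
except ImportError:  # older interpreters, e.g. Python 3.5
    import pkg_resources

    _NotFound = pkg_resources.DistributionNotFound

    def _version(name):
        return pkg_resources.get_distribution(name).version


def installed_versions(names):
    """Return {package name: version string, or None if not installed}."""
    found = {}
    for name in names:
        try:
            found[name] = _version(name)
        except _NotFound:
            found[name] = None
    return found


if __name__ == "__main__":
    # Package names taken from the site-packages paths in the traceback.
    for name, ver in sorted(installed_versions(["celery", "kombu", "amqp"]).items()):
        print(name, ver or "not installed")
```

Running it once with the 3.0 environment activated and once with the 3.1 environment should show which of the three packages differ between the two installs.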
Regards,
Rui
Error message from OpenQuake 3.1:
-bash-4.1$ oq engine --run job.ini
Using celery@r861, celery@r862, 64 cores
INFO:root:zipping the input files
[2018-07-09 03:47:05,960 #4 INFO] Running ./LogicTreeCase2ClassicalPSHA/job.ini
[2018-07-09 03:47:06,054 #4 INFO] Using engine version 3.1.0-gite54577f
[2018-07-09 03:47:06,116 #4 INFO] Reading the risk model if present
[2018-07-09 03:47:06,190 #4 INFO] Read 1 hazard sites
[2018-07-09 03:47:06,287 #4 INFO] Reading /short/zs6/rxy900/openquake_job/3.1/LogicTreeCase2ClassicalPSHA/source_model.xml
[2018-07-09 03:47:06,412 #4 INFO] Processed source model 1 with 4 potential gsim path(s)
...
[2018-07-09 03:47:22,057 #4 INFO] /short/zs6/rxy900/openquake_job/3.1/LogicTreeCase2ClassicalPSHA/source_model.xml has been considered 81 times
[2018-07-09 03:47:22,156 #4 INFO] Splitting sources
[2018-07-09 03:47:24,281 #4 INFO] Submitting 96 "RtreeFilter" tasks
[2018-07-09 03:47:24,452 #4 INFO] Sent 1.18 MB of data in 96 task(s)
[2018-07-09 03:47:24,544 #4 INFO] RtreeFilter 1%
...
[2018-07-09 03:47:34,674 #4 INFO] RtreeFilter 98%
[2018-07-09 03:47:34,736 #4 INFO] RtreeFilter 100%
[2018-07-09 03:47:34,811 #4 INFO] Received 1.29 MB of data, maximum per task 15.01 KB
[2018-07-09 03:47:34,923 #4 INFO] Using maxweight=441
[2018-07-09 03:47:35,291 #4 INFO] Submitting "classical" tasks
[2018-07-09 03:47:41,278 #4 INFO] Sent 5238 sources in 208 tasks
[2018-07-09 03:47:41,333 #4 INFO] Sent 1.82 MB of data in 208 task(s)
[2018-07-09 03:47:41,537 #4 INFO] classical 1%
...
[2018-07-09 03:47:51,271 #4 INFO] classical 99%
[2018-07-09 03:47:51,365 #4 INFO] classical 100%
[2018-07-09 03:47:51,408 #4 INFO] Received 244.3 KB of data, maximum per task 2.03 KB
[2018-07-09 03:47:51,475 #4 INFO] Effective sites per task: 1
[2018-07-09 03:47:52,192 #4 INFO] Reading PoEs on 1 sites
[2018-07-09 03:47:52,410 #4 INFO] Submitting "build_hcurves_and_stats" tasks
[2018-07-09 03:47:52,493 #4 CRITICAL]
Traceback (most recent call last):
File "/apps/hpc-opt/openquake/3.1/src/oq-engine/openquake/calculators/base.py", line 189, in run
self.result = self.execute()
File "/apps/hpc-opt/openquake/3.1/src/oq-engine/openquake/calculators/classical.py", line 353, in execute
self.core_task.__func__, self.gen_args()
File "/apps/hpc-opt/openquake/3.1/src/oq-engine/openquake/baselib/parallel.py", line 615, in submit_all
num_tasks = next(it)
File "/apps/hpc-opt/openquake/3.1/src/oq-engine/openquake/baselib/parallel.py", line 652, in _iter_celery
res = safetask.delay(self.task_func, piks)
File "/apps/hpc-opt/openquake/3.1/oqenv/lib/python3.5/site-packages/celery/app/task.py", line 413, in delay
return self.apply_async(args, kwargs)
File "/apps/hpc-opt/openquake/3.1/oqenv/lib/python3.5/site-packages/celery/app/task.py", line 536, in apply_async
**options
File "/apps/hpc-opt/openquake/3.1/oqenv/lib/python3.5/site-packages/celery/app/base.py", line 737, in send_task
amqp.send_task_message(P, name, message, **options)
File "/apps/hpc-opt/openquake/3.1/oqenv/lib/python3.5/site-packages/celery/app/amqp.py", line 554, in send_task_message
**properties
File "/apps/hpc-opt/openquake/3.1/oqenv/lib/python3.5/site-packages/kombu/messaging.py", line 181, in publish
exchange_name, declare,
File "/apps/hpc-opt/openquake/3.1/oqenv/lib/python3.5/site-packages/kombu/connection.py", line 494, in _ensured
return fun(*args, **kwargs)
File "/apps/hpc-opt/openquake/3.1/oqenv/lib/python3.5/site-packages/kombu/messaging.py", line 203, in _publish
mandatory=mandatory, immediate=immediate,
File "/apps/hpc-opt/openquake/3.1/oqenv/lib/python3.5/site-packages/amqp/channel.py", line 1734, in _basic_publish
(0, exchange, routing_key, mandatory, immediate), msg
File "/apps/hpc-opt/openquake/3.1/oqenv/lib/python3.5/site-packages/amqp/abstract_channel.py", line 50, in send_method
conn.frame_writer(1, self.channel_id, sig, args, content)
File "/apps/hpc-opt/openquake/3.1/oqenv/lib/python3.5/site-packages/amqp/method_framing.py", line 163, in write_frame
3, channel, framelen, str_to_bytes(body), 0xce)
struct.error: pack_into requires a buffer of at least 296911 bytes
[2018-07-09 03:47:52,570 #4 CRITICAL] Traceback (most recent call last):
File "/apps/hpc-opt/openquake/3.1/src/oq-engine/openquake/engine/engine.py", line 330, in run_calc
close=False, **kw) # don't close the datastore too soon
File "/apps/hpc-opt/openquake/3.1/src/oq-engine/openquake/calculators/base.py", line 189, in run
self.result = self.execute()
File "/apps/hpc-opt/openquake/3.1/src/oq-engine/openquake/calculators/classical.py", line 353, in execute
self.core_task.__func__, self.gen_args()
File "/apps/hpc-opt/openquake/3.1/src/oq-engine/openquake/baselib/parallel.py", line 615, in submit_all
num_tasks = next(it)
File "/apps/hpc-opt/openquake/3.1/src/oq-engine/openquake/baselib/parallel.py", line 652, in _iter_celery
res = safetask.delay(self.task_func, piks)
File "/apps/hpc-opt/openquake/3.1/oqenv/lib/python3.5/site-packages/celery/app/task.py", line 413, in delay
return self.apply_async(args, kwargs)
File "/apps/hpc-opt/openquake/3.1/oqenv/lib/python3.5/site-packages/celery/app/task.py", line 536, in apply_async
**options
File "/apps/hpc-opt/openquake/3.1/oqenv/lib/python3.5/site-packages/celery/app/base.py", line 737, in send_task
amqp.send_task_message(P, name, message, **options)
File "/apps/hpc-opt/openquake/3.1/oqenv/lib/python3.5/site-packages/celery/app/amqp.py", line 554, in send_task_message
**properties
File "/apps/hpc-opt/openquake/3.1/oqenv/lib/python3.5/site-packages/kombu/messaging.py", line 181, in publish
exchange_name, declare,
File "/apps/hpc-opt/openquake/3.1/oqenv/lib/python3.5/site-packages/kombu/connection.py", line 494, in _ensured
return fun(*args, **kwargs)
File "/apps/hpc-opt/openquake/3.1/oqenv/lib/python3.5/site-packages/kombu/messaging.py", line 203, in _publish
mandatory=mandatory, immediate=immediate,
File "/apps/hpc-opt/openquake/3.1/oqenv/lib/python3.5/site-packages/amqp/channel.py", line 1734, in _basic_publish
(0, exchange, routing_key, mandatory, immediate), msg
File "/apps/hpc-opt/openquake/3.1/oqenv/lib/python3.5/site-packages/amqp/abstract_channel.py", line 50, in send_method
conn.frame_writer(1, self.channel_id, sig, args, content)
File "/apps/hpc-opt/openquake/3.1/oqenv/lib/python3.5/site-packages/amqp/method_framing.py", line 163, in write_frame
3, channel, framelen, str_to_bytes(body), 0xce)
struct.error: pack_into requires a buffer of at least 296911 bytes
Traceback (most recent call last):
File "/apps/hpc-opt/openquake/3.1/oqenv/bin/oq_real", line 6, in <module>
exec(compile(open(__file__).read(), __file__, 'exec'))
File "/apps/hpc-opt/openquake/3.1/src/oq-engine/bin/oq", line 23, in <module>
main.oq()
File "/apps/hpc-opt/openquake/3.1/src/oq-engine/openquake/commands/__main__.py", line 46, in oq
parser.callfunc()
File "/apps/hpc-opt/openquake/3.1/src/oq-engine/openquake/baselib/sap.py", line 186, in callfunc
return self.func(**vars(namespace))
File "/apps/hpc-opt/openquake/3.1/src/oq-engine/openquake/baselib/sap.py", line 245, in main
return func(**kw)
File "/apps/hpc-opt/openquake/3.1/src/oq-engine/openquake/commands/engine.py", line 170, in engine
exports, hazard_calculation_id=hc_id)
File "/apps/hpc-opt/openquake/3.1/src/oq-engine/openquake/commands/engine.py", line 66, in run_job
hazard_calculation_id=hazard_calculation_id, **kw)
File "/apps/hpc-opt/openquake/3.1/src/oq-engine/openquake/engine/engine.py", line 330, in run_calc
close=False, **kw) # don't close the datastore too soon
File "/apps/hpc-opt/openquake/3.1/src/oq-engine/openquake/calculators/base.py", line 189, in run
self.result = self.execute()
File "/apps/hpc-opt/openquake/3.1/src/oq-engine/openquake/calculators/classical.py", line 353, in execute
self.core_task.__func__, self.gen_args()
File "/apps/hpc-opt/openquake/3.1/src/oq-engine/openquake/baselib/parallel.py", line 615, in submit_all
num_tasks = next(it)
File "/apps/hpc-opt/openquake/3.1/src/oq-engine/openquake/baselib/parallel.py", line 652, in _iter_celery
res = safetask.delay(self.task_func, piks)
File "/apps/hpc-opt/openquake/3.1/oqenv/lib/python3.5/site-packages/celery/app/task.py", line 413, in delay
return self.apply_async(args, kwargs)
File "/apps/hpc-opt/openquake/3.1/oqenv/lib/python3.5/site-packages/celery/app/task.py", line 536, in apply_async
**options
File "/apps/hpc-opt/openquake/3.1/oqenv/lib/python3.5/site-packages/celery/app/base.py", line 737, in send_task
amqp.send_task_message(P, name, message, **options)
File "/apps/hpc-opt/openquake/3.1/oqenv/lib/python3.5/site-packages/celery/app/amqp.py", line 554, in send_task_message
**properties
File "/apps/hpc-opt/openquake/3.1/oqenv/lib/python3.5/site-packages/kombu/messaging.py", line 181, in publish
exchange_name, declare,
File "/apps/hpc-opt/openquake/3.1/oqenv/lib/python3.5/site-packages/kombu/connection.py", line 494, in _ensured
return fun(*args, **kwargs)
File "/apps/hpc-opt/openquake/3.1/oqenv/lib/python3.5/site-packages/kombu/messaging.py", line 203, in _publish
mandatory=mandatory, immediate=immediate,
File "/apps/hpc-opt/openquake/3.1/oqenv/lib/python3.5/site-packages/amqp/channel.py", line 1734, in _basic_publish
(0, exchange, routing_key, mandatory, immediate), msg
File "/apps/hpc-opt/openquake/3.1/oqenv/lib/python3.5/site-packages/amqp/abstract_channel.py", line 50, in send_method
conn.frame_writer(1, self.channel_id, sig, args, content)
File "/apps/hpc-opt/openquake/3.1/oqenv/lib/python3.5/site-packages/amqp/method_framing.py", line 163, in write_frame
3, channel, framelen, str_to_bytes(body), 0xce)
struct.error: pack_into requires a buffer of at least 296911 bytes
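For reference, this kind of `struct.error` is what Python raises when `struct.pack_into` is asked to write more bytes than the target buffer can hold. The sketch below does not reproduce the amqp code. The function name `write_body_frame`, the 64-byte buffer, and the format string `>BHI%dsB` are assumptions chosen to resemble the arguments visible in the last traceback frame (frame type 3, channel, length, body, end marker 0xce).

```python
import struct


def write_body_frame(buf, channel, body):
    """Pack one frame into buf at offset 0 and return the bytes written.

    Assumed layout: 1-byte type, 2-byte channel, 4-byte length,
    the body itself, and a 1-byte end marker.
    """
    fmt = ">BHI%dsB" % len(body)
    struct.pack_into(fmt, buf, 0, 3, channel, len(body), body, 0xCE)
    return struct.calcsize(fmt)


buf = bytearray(64)  # deliberately small, fixed-size buffer

# A 16-byte body fits: 7 header bytes + 16 body bytes + 1 end marker.
written = write_body_frame(buf, 1, b"x" * 16)

# A 1024-byte body does not fit into 64 bytes, so pack_into raises.
try:
    write_body_frame(buf, 1, b"x" * 1024)
    overflow_error = None
except struct.error as exc:
    overflow_error = exc

print(written, type(overflow_error).__name__)
```

If the same mechanism is at work here, the message sent for the "build_hcurves_and_stats" task would be larger than the buffer the frame writer had available, but I have not confirmed this.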
Normal output from OpenQuake 3.0:
-bash-4.1$ oq engine --run job.ini
INFO:root:Using celery@r3010, celery@r3022, 64 cores
[2018-07-09 04:13:06,406 #2 INFO] Running ./LogicTreeCase2ClassicalPSHA/job.ini
[2018-07-09 04:13:06,484 #2 INFO] Using engine version 3.0.1-git4a00a9d
[2018-07-09 04:13:06,568 #2 INFO] There are 1 hazard site(s)
[2018-07-09 04:13:06,634 #2 INFO] Reading the risk model if present
[2018-07-09 04:13:06,708 #2 INFO] Reading /short/zs6/rxy900/openquake_job/3.0/LogicTreeCase2ClassicalPSHA/source_model.xml
[2018-07-09 04:13:06,815 #2 INFO] Processed source model 1 with 4 potential gsim path(s) and 2 sources
...
[2018-07-09 04:13:38,462 #2 INFO] classical 99%
[2018-07-09 04:13:38,560 #2 INFO] classical 100%
[2018-07-09 04:13:38,658 #2 INFO] Received 244.3 KB of data, maximum per task 2.03 KB
[2018-07-09 04:13:38,717 #2 INFO] Effective sites per task: 1
[2018-07-09 04:13:39,363 #2 INFO] Reading PoEs on 1 sites
[2018-07-09 04:13:39,549 #2 INFO] Submitting "build_hcurves_and_stats" tasks
[2018-07-09 04:13:39,625 #2 INFO] Sent 210.1 KB of data in 1 task(s)
[2018-07-09 04:13:39,715 #2 INFO] build_hcurves_and_stats 100%
[2018-07-09 04:13:39,794 #2 INFO] Received 1.21 KB of data, maximum per task 1.21 KB
[2018-07-09 04:13:42,221 #2 INFO] Calculation 2 finished correctly in 33 seconds
id | name
7 | Full Report
8 | Hazard Curves
9 | Hazard Maps
10 | Realizations