I am trying to integrate Caffe with Celery so I can run distributed workers across several CPU/GPU machines. Here is my tasks.py:
from celery import Celery
import os
import sys
import time
import numpy as np
os.environ['GLOG_minloglevel'] = '3'
CAFFE_ROOT='<path to caffe>'
CAFFE_HOME=os.path.join(CAFFE_ROOT,'python')
sys.path.insert(0,CAFFE_HOME)
import caffe
model_path='<>'
app = Celery('tasks', broker='amqp://guest@localhost//')
img = caffe.io.load_image(os.path.join(CAFFE_ROOT,'examples/images/cat.jpg'))
def get_caffenet_model(model_path=os.path.join(CAFFE_ROOT, 'models/')):
    model_config = os.path.join(model_path, 'bvlc_reference_caffenet/deploy.prototxt')
    pretrained_model = os.path.join(model_path, 'bvlc_reference_caffenet/bvlc_reference_caffenet.caffemodel')
    # No leading slash on the second argument: os.path.join discards
    # everything before an absolute component, which would drop CAFFE_HOME.
    image_mean = np.load(os.path.join(CAFFE_HOME, 'caffe/imagenet/ilsvrc_2012_mean.npy')).mean(1).mean(1)
    return model_config, pretrained_model, image_mean
def network(mode='cpu', mtype='caffe'):
    model_config, pretrained_model, image_mean = get_caffenet_model()
    if mode == 'cpu':
        caffe.set_mode_cpu()
    else:
        caffe.set_device(0)
        caffe.set_mode_gpu()
    net = caffe.Classifier(model_config, pretrained_model, mean=image_mean,
                           channel_swap=(2, 1, 0), raw_scale=255, image_dims=(256, 256))
    return net
caffenet = network(mode='gpu')  # built at import time, i.e. in the parent process

@app.task
def add():
    caffenet.predict([img])
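(A side note on the multi-machine goal: img is loaded at import time on the calling machine, and the Classifier object itself never crosses the broker, so I would eventually make the task take a serializable argument instead. predict_image below is just a hypothetical sketch, assuming each worker machine can reach the image at the given path:

@app.task
def predict_image(image_path):
    # Load the image inside the worker and return a plain int,
    # which any result serializer can handle.
    image = caffe.io.load_image(image_path)
    return int(caffenet.predict([image])[0].argmax())

For now the no-argument add() task keeps the reproduction minimal.)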
Run the Celery worker, then open an IPython shell:

celery -A tasks worker -c 1 --loglevel=info
ipython
In [5]: from tasks import add
In [6]: add()
forward: 0.0167691707611
So calling add() directly works fine in GPU mode. Now dispatch it through the worker:
In [7]: add.delay()
Out[7]: <AsyncResult: ac91262d-4ffd-494b-99bb-44ab174293a1>
The delayed call crashes the worker:
[2015-08-25 20:53:27,196: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2015-08-25 20:53:27,203: INFO/MainProcess] mingle: searching for neighbors
[2015-08-25 20:53:28,208: INFO/MainProcess] mingle: all alone
[2015-08-25 20:53:28,214: WARNING/MainProcess] celery@gpu1 ready.
[2015-08-25 20:53:29,204: INFO/MainProcess] Received task: tasks.add[ac91262d-4ffd-494b-99bb-44ab174293a1]
WARNING: Logging before InitGoogleLogging() is written to STDERR
F0825 20:53:29.231441  9142 syncedmem.cpp:57] Check failed: error == cudaSuccess (3 vs. 0)  initialization error
*** Check failure stack trace: ***
[2015-08-25 20:53:29,320: ERROR/MainProcess] Process 'Worker-1' pid:9142 exited with 'signal 6 (SIGIOT)'
[2015-08-25 20:53:29,331: ERROR/MainProcess] Task tasks.add[ac91262d-4ffd-494b-99bb-44ab174293a1] raised unexpected: WorkerLostError('Worker exited prematurely: signal 6 (SIGIOT).',)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/billiard/pool.py", line 1171, in mark_as_worker_lost
    human_status(exitcode)),
WorkerLostError: Worker exited prematurely: signal 6 (SIGIOT).
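My guess is that the CUDA context is created at import time (caffenet = network(mode='gpu') runs in the parent process) and the forked billiard worker cannot reuse it, which would explain the cudaSuccess (3 vs. 0) initialization error from syncedmem.cpp. A minimal sketch of the workaround I am considering, assuming that is the cause: defer network construction until after the fork with Celery's worker_process_init signal (init_caffenet is just a name for the handler; network() and img are from the listing above).

from celery.signals import worker_process_init

caffenet = None  # constructed lazily, inside each forked worker

@worker_process_init.connect
def init_caffenet(**kwargs):
    # Runs once in every forked worker process, so the CUDA context
    # is created by the child instead of inherited from the parent.
    global caffenet
    caffenet = network(mode='gpu')

@app.task
def add():
    global caffenet
    if caffenet is None:  # keeps the direct, non-worker call working
        caffenet = network(mode='gpu')
    caffenet.predict([img])

If that diagnosis is right, each of the -c worker processes would build its own network and CUDA context after the fork.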