could I use tortoise with pika together?

33 views
Skip to first unread message

赵勇

unread,
Nov 14, 2023, 10:09:44 PM11/14/23
to Pika
I try to use pika with tortoise, but I found I can't call a async method in pika, I tried to do it like this :

loop = asyncio.get_running_loop()
loop.call_later(self.import_auto_image(new_img, address))

import_auto_image method is a async database update action, after calling neither couldn't update data nor there was a error  "there is a running loop" , How can i solve these problem?

Luke Bakken

unread,
Nov 15, 2023, 8:36:17 AM11/15/23
to Pika
Hello,

Please provide a complete code sample that I can run that demonstrates what you're trying to do.

Mark.Zhao

unread,
Nov 16, 2023, 6:34:30 AM11/16/23
to Pika
Thanks for reply. I paste a code sample below, I just remain a key code sample. As you can see that I used tornado adapter directly.

```python
def on_message(self, unused_channel, basic_deliver, properties, body):
logger.info('Received message # %s from %s: %s',
basic_deliver.delivery_tag, properties.app_id, body)

img_path = json.loads(body)
address = ""

# insert into database
self.import_auto_image(img_path, address)
self.acknowledge_message(basic_deliver.delivery_tag)



def import_auto_image(self, file_path, address):
data_dict = {
"file_path": file_path,
"address": address,
}
try:
loop = asyncio.get_running_loop()
if loop.is_running():
task = loop.create_task(self.into_pgsql(data_dict))
task.add_done_callback(logg.info(f"insert to DB success!{pano_id=}"))
except Exception as err_:
logg.error(f"insert to DB fail:{pano_id=} {err_}")


@staticmethod
async def into_pgsql(data):
await Tortoise.init(get_orm_conf(TestSetting))
try:
result = ImagesModel.filter(image_id=data['image_id'])
if result:
result.update(**data)
else:
await ImagesModel.create(**data)
except Exception as err_:
raise err_


```

Mark.Zhao

unread,
Nov 16, 2023, 6:47:59 AM11/16/23
to Pika
def on_message(self, unused_channel, basic_deliver, properties, body): logger.info('Received message # %s from %s: %s', basic_deliver.delivery_tag, properties.app_id, body) img_path = json.loads(body) address = "" # insert into database self.import_auto_image(img_path, address) self.acknowledge_message(basic_deliver.delivery_tag) def import_auto_image(self, file_path, address): data_dict = { "file_path": file_path, "address": address, } try: loop = asyncio.get_running_loop() if loop.is_running(): task = loop.create_task(self.into_pgsql(data_dict)) task.add_done_callback(logg.info(f"insert to DB success!{pano_id=}")) except Exception as err_: logg.error(f"insert to DB fail:{pano_id=} {err_}") @staticmethod async def into_pgsql(data): await Tortoise.init(get_orm_conf(TestSetting)) try: result = ImagesModel.filter(image_id=data['image_id']) if result: result.update(**data) else: await ImagesModel.create(**data) except Exception as err_: raise err_
I just beautified the code, which have same content with last post.

Luke Bakken

unread,
Nov 16, 2023, 10:36:11 AM11/16/23
to Pika
Hello,

I really can't comment on an incomplete set of code, and I don't have the time to guess what may be going on.

If you'd like me to assist you, provide a runnable program that demonstrates your issue.

Thanks,
Luke
Reply all
Reply to author
Forward
0 new messages