Newbie trouble scheduling a job with AsyncIOScheduler


ExStock

May 8, 2020, 1:55:03 PM
to APScheduler
(I'm a noob with APScheduler, and just barely past noob with Python itself, so please feel free to assume I'm probably making bonehead errors!)

I'm trying to use APScheduler with ib_insync, and for my task, it seems that I need to go the AsyncIOScheduler route in order to play nice with ib_insync, even though I actually want to run my tasks in order rather than concurrently.  I've managed to get several of my tasks scheduled properly, but those were all tasks that were capable of being performed more or less instantaneously.  However, I've got a key task that takes several seconds to perform, and APScheduler seems to give up on it before it has the chance to complete, giving me this error readout:

Job "opquote (trigger: or[cron[day_of_week='mon-fri', hour='16', minute='17']], next run at: 2020-05-07 16:17:00 CDT)" raised an exception
Traceback (most recent call last):
  File "c:\users\e\appdata\local\programs\python\python37\lib\site-packages\apscheduler\executors\base_py3.py", line 29, in run_coroutine_job
    retval = await job.func(*job.args, **job.kwargs)
  File "", line 4, in opquote
    ib.qualifyContracts(gld)
  File "c:\users\e\appdata\local\programs\python\python37\lib\site-packages\ib_insync\ib.py", line 537, in qualifyContracts
    return self._run(self.qualifyContractsAsync(*contracts))
  File "c:\users\e\appdata\local\programs\python\python37\lib\site-packages\ib_insync\ib.py", line 292, in _run
    return util.run(*awaitables, timeout=self.RequestTimeout)
  File "c:\users\e\appdata\local\programs\python\python37\lib\site-packages\ib_insync\util.py", line 311, in run
    result = loop.run_until_complete(task)
  File "c:\users\e\appdata\local\programs\python\python37\lib\asyncio\base_events.py", line 579, in run_until_complete
    return future.result()
  File "c:\users\e\appdata\local\programs\python\python37\lib\asyncio\futures.py", line 178, in result
    raise self._exception
  File "c:\users\e\appdata\local\programs\python\python37\lib\asyncio\tasks.py", line 249, in __step
    result = coro.send(None)
  File "c:\users\e\appdata\local\programs\python\python37\lib\site-packages\ib_insync\ib.py", line 1630, in qualifyContractsAsync
    *(self.reqContractDetailsAsync(c) for c in contracts))
  File "c:\users\e\appdata\local\programs\python\python37\lib\site-packages\ib_insync\ib.py", line 1630, in <genexpr>
    *(self.reqContractDetailsAsync(c) for c in contracts))
  File "c:\users\e\appdata\local\programs\python\python37\lib\site-packages\ib_insync\ib.py", line 1744, in reqContractDetailsAsync
    reqId = self.client.getReqId()
  File "c:\users\e\appdata\local\programs\python\python37\lib\site-packages\ib_insync\client.py", line 157, in getReqId
    raise ConnectionError('Not connected')
ConnectionError: Not connected


Is there something within APScheduler that I'm missing--maybe something similar to misfire_grace_time--that will tell it to wait until the task completes?  

I've started looking into asyncio await as a solution, but haven't made any progress with it yet.  I've also tried to tackle it from the ib_insync side, but that doesn't seem to be the answer; the ib_insync stuff works fine on its own, and only breaks if I try to schedule its time-consuming tasks (but continues to work fine with AsyncIOScheduler for the <~1 second type tasks).

Alex Grönholm

May 10, 2020, 10:24:29 AM
to apsch...@googlegroups.com, ExStock

I think you have misunderstood misfire_grace_time. It only makes sure that the task is started within the grace time. It has no effect on the completion of the task – there is no way to set a deadline for the completion of a job in APScheduler.
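Since the scheduler does not cap how long a job runs, any deadline has to live inside the job itself. A minimal sketch using only the standard library's asyncio.wait_for; the names slow_fetch and job_with_deadline and the timing values are hypothetical and appear nowhere in the thread or in APScheduler:

```python
import asyncio


async def slow_fetch(delay: float) -> str:
    """Hypothetical stand-in for a request that takes `delay` seconds."""
    await asyncio.sleep(delay)
    return "quote"


async def job_with_deadline(delay: float, deadline: float) -> str:
    """Run slow_fetch, but give up once `deadline` seconds have passed."""
    try:
        return await asyncio.wait_for(slow_fetch(delay), timeout=deadline)
    except asyncio.TimeoutError:
        # wait_for cancels the inner coroutine before raising.
        return "timed out"


if __name__ == "__main__":
    print(asyncio.run(job_with_deadline(0.01, 1.0)))  # finishes in time
    print(asyncio.run(job_with_deadline(1.0, 0.05)))  # hits the deadline
```

A coroutine function shaped like job_with_deadline could be handed to AsyncIOScheduler's add_job in place of the bare job, with its arguments supplied through add_job's args parameter.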

Without seeing your code, it's next to impossible to know why it raises ConnectionError('Not connected'). It's your code, so maybe you should figure that out first? I doubt the scheduler has anything to do with that. If you want further help, try constructing a minimum viable representation of the problem that others could run and then identify the problem.

--
You received this message because you are subscribed to the Google Groups "APScheduler" group.
To unsubscribe from this group and stop receiving emails from it, send an email to apscheduler...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/apscheduler/163011a1-6f58-4b0d-bd7b-1a1fe207e06e%40googlegroups.com.

ExStock

May 12, 2020, 4:29:06 PM
to APScheduler
Thanks for your response.  I said something LIKE misfire_grace_time.  Much like how misfire_grace_time gives the task time to start, I'm looking for something that gives the task time to finish.  I'm not expecting misfire_grace_time to do that for me, but seeing that it existed made me hope that what I'm looking for also exists.

Okay, code now--the reason I didn't include code before was because to give you a reasonably full picture I have to include kind of a lot to skim through. I've tried to snip out as much as I can.  (The ib_insync specific parts might be distracting if you're not familiar with it, but hey, you've been warned.)
Here's the code snippet (see the next one if you need to see the import statements) that I want to run on a schedule.  It works fine on its own:

fpath = 'Z://dedup/e/Python/csv/'
stamp = datetime.today().strftime('%Y%m%d')
exch = 'SMART'
sfx = stamp + 'TESTgldoptions' + exch + '.csv'

gld = Stock('GLD', exch, currency='USD')
ib.qualifyContracts(gld)

ib.reqMarketDataType(4)

[ticker] = ib.reqTickers(gld)
gldValue = ticker.marketPrice()

chains = ib.reqSecDefOptParams(gld.symbol, '', gld.secType, gld.conId)
chain = next(c for c in chains if c.tradingClass == 'GLD' and c.exchange == 'SMART')
strikes = [strike for strike in chain.strikes
        if strike % 1 == 0
        and gldValue - 1 < strike < gldValue + 1]
expirations = sorted(exp for exp in chain.expirations)[:10]
rights = ['P', 'C']

contracts = [Option('GLD', expiration, strike, right, 'SMART', tradingClass='GLD')
        for right in rights
        for expiration in expirations
        for strike in strikes]

contracts = ib.qualifyContracts(*contracts)

tickers = ib.reqTickers(*contracts)
util.df(tickers)

df_gld = util.df(tickers)

fs = fpath + sfx

with open(fs, 'a') as f:
    df_gld.to_csv(f, mode='a', header=True)

Since my first test of running that as an async def resulted in ...absolutely nothing happening, no errors, no csv file, just zilch, I tried leaving it out of the def and just moving it bit by bit back into the def until I could get an error.  This is the point at which I got the error seen in my previous message, with the import statements etc. included:

from ib_insync import *
util.patchAsyncio()
from datetime import datetime
import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.combining import OrTrigger
from apscheduler.triggers.cron import CronTrigger
import pandas as pd
util.startLoop()

ib = IB()

ib.connect('127.0.0.1', 7497, clientId=1)

async def opquote():
    exch = 'SMART'
    gld = Stock('GLD', exch, currency='USD')
    ib.qualifyContracts(gld)

    ib.reqMarketDataType(2)

    now = datetime.now()

    [ticker] = ib.reqTickers(gld)
    print(now, '\n', ticker)

# placeholder task representing regex to format csv file:
async def testrun2():
    ib.sleep(20) # placeholder representing code to make sure the csv file is present
    now = datetime.now()
    print ('This is a second task at', now)

sched = AsyncIOScheduler(timezone="America/Chicago")
trigger = OrTrigger([
    CronTrigger(day_of_week='mon-fri', hour='8', minute='31, 37, 43, 49, 55'),
    CronTrigger(day_of_week='mon-fri', hour='9-14', minute='1, 7, 13, 19, 25, 31, 37, 43, 49, 55'),
    CronTrigger(day_of_week='mon-fri', hour='14', minute='15'),
])
sched.add_job(opquote, trigger)
sched.add_job(testrun2, trigger)
sched.start()

Note #1: I didn't get any error complaints about qualifyContracts(gld) until I also asked it to get the [ticker], in spite of what the error readout is saying. Qualifying the contract does take a second or so to perform; getting ticker info takes noticeably more (usually ~5-15 seconds).
Note #2: If the [ticker] part is left outside of the async def, then the data that's returned is from the time that the code was initiated, rather than the time the task was triggered, which makes it useless.
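The behaviour in Note #2 is ordinary Python evaluation order rather than anything scheduler-specific: a statement at module level runs once when the script starts, while a statement inside the job function runs on every trigger. A small sketch with a made-up data source (fetch_price and the counter behind it are hypothetical stand-ins, not ib_insync calls):

```python
import itertools

# Hypothetical stand-in for a data source whose value changes over time.
_feed = itertools.count(start=100)


def fetch_price() -> int:
    """Return the next value from the fake feed."""
    return next(_feed)


# Evaluated once, when the script starts: every later run sees this same value.
price_at_startup = fetch_price()


def job_stale() -> int:
    """Reuses the value captured at startup, however often it is triggered."""
    return price_at_startup


def job_fresh() -> int:
    """Fetches inside the function, so each trigger gets current data."""
    return fetch_price()
```

Calling job_stale() repeatedly keeps returning the startup value, while each call to job_fresh() returns a newer one, which is why the request has to stay inside the scheduled function.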

All that said, I'm exploring ways to get APScheduler to give my task the time it needs in order to complete, and I'm investigating as many avenues as I can currently think of, such as learning how to use asyncio await or figuring out APScheduler's ProcessPoolExecutor and/or ThreadPoolExecutor, etc.  I'm open to any way, elegant or clunky, that makes it possible to run this task on a schedule.

Alex Grönholm

May 13, 2020, 4:15:29 AM
to apsch...@googlegroups.com

I'm pretty sure you should not try to use AsyncIOScheduler here. You don't have a single "await" in your code which leads me to believe you are not actually using any asynchronous code. Instead, use BlockingScheduler or BackgroundScheduler as appropriate, and without "async" in the function definitions.
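To illustrate the point about "await" in general terms: a function declared with async def but containing no await holds the event loop until it returns, so two such jobs simply run back to back. The sketch below uses only the standard library; blocking_job, cooperative_job, and run_pair are hypothetical names, and it is not a model of how ib_insync behaves internally:

```python
import asyncio
import time


async def blocking_job(name: str, log: list) -> None:
    """Declared async, but never awaits: it holds the event loop until done."""
    log.append(f"{name} start")
    time.sleep(0.05)  # blocks the whole loop
    log.append(f"{name} end")


async def cooperative_job(name: str, log: list) -> None:
    """Yields to the event loop while waiting, so other jobs can run."""
    log.append(f"{name} start")
    await asyncio.sleep(0.05)
    log.append(f"{name} end")


async def run_pair(job) -> list:
    """Start two copies of `job` together and return the order of events."""
    log: list = []
    await asyncio.gather(job("a", log), job("b", log))
    return log


if __name__ == "__main__":
    print(asyncio.run(run_pair(blocking_job)))     # "a" finishes before "b" starts
    print(asyncio.run(run_pair(cooperative_job)))  # both start before either ends
```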


ExStock

May 13, 2020, 1:15:01 PM
to APScheduler
Sadly, it appears that I do need to use the AsyncIOScheduler instead of Blocking or Background, due to how ib_insync works.  My newbie attempts at implementing await properly have not worked yet--which is why you didn't see it in my code--but I just found what may be my missing key: there's an ib.qualifyContractsAsync() that I should have been using.
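A rough sketch of what awaiting the async variants could look like. To keep it runnable without a TWS connection, the ib object here is a hypothetical FakeIB whose method bodies are invented; only the two method names, qualifyContractsAsync and reqTickersAsync, are taken from ib_insync:

```python
import asyncio


class FakeIB:
    """Hypothetical stand-in so the sketch runs without a TWS connection.

    Only the two method names mirror ib_insync; the bodies are made up.
    """

    async def qualifyContractsAsync(self, *contracts):
        await asyncio.sleep(0.01)
        return list(contracts)

    async def reqTickersAsync(self, *contracts):
        await asyncio.sleep(0.01)
        return [f"ticker for {c}" for c in contracts]


async def opquote(ib, contract):
    """Job body that awaits each request instead of calling the blocking form."""
    await ib.qualifyContractsAsync(contract)
    [ticker] = await ib.reqTickersAsync(contract)
    return ticker


if __name__ == "__main__":
    print(asyncio.run(opquote(FakeIB(), "GLD")))
```

With a real connection, the connected IB instance and a Stock contract would presumably take the place of FakeIB() and "GLD", passed to the job through add_job's args parameter.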

At least now I know that the solution to my problem is not something about APScheduler that I'm missing, which helps me to concentrate my efforts in the right direction. Thanks for your help.


Alex Grönholm

May 14, 2020, 2:22:23 AM
to apsch...@googlegroups.com

When writing async code, you really need to be mindful about not blocking the event loop thread for long periods of time. This line in particular looked highly dubious:

ib.sleep(20)
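For work that genuinely cannot be awaited, one standard-library option is to push it onto a thread with loop.run_in_executor so the event loop stays free. A minimal sketch; blocking_work, heartbeat, and the timings are hypothetical, and a plain wait would more simply be written as await asyncio.sleep(...):

```python
import asyncio
import time


def blocking_work(seconds: float) -> str:
    """Hypothetical stand-in for code that cannot be awaited."""
    time.sleep(seconds)
    return "done"


async def heartbeat(ticks: list, stop: asyncio.Event) -> None:
    """Appends to `ticks` whenever the loop is free to run it."""
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def main() -> tuple:
    loop = asyncio.get_running_loop()
    ticks: list = []
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(ticks, stop))
    # Run the blocking call in the default thread pool; the loop stays responsive.
    result = await loop.run_in_executor(None, blocking_work, 0.1)
    stop.set()
    await beat
    return result, len(ticks)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The heartbeat keeps ticking while blocking_work sleeps in its thread, which would not happen if time.sleep were called directly inside a coroutine.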


ExStock

May 14, 2020, 3:22:44 PM
to APScheduler
Good to hear; I've run across that warning before, but I've never seen it specified how long is too long.  I'll take it that 20 seconds is too long!  Since that was just a placeholder for actual code, I can easily get rid of that.  

...And good grief.  While cut & pasting so that I could respond with a bunch more code-that-should-work-but-doesn't (complete with awaits!) something caught my eye, and I found the line that was causing the errors: ib.disconnect() was in the wrooooong place.  *facepalm*  It works like a charm now--thanks! 
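The thread does not show where ib.disconnect() originally sat, so the following is only an assumption about how such a mistake can produce "Not connected": scheduling a job returns immediately, and a disconnect placed right after it runs before the job does. The sketch uses a hypothetical FakeConnection class rather than ib_insync:

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a client connection; not ib_insync's API."""

    def __init__(self) -> None:
        self.connected = False

    def connect(self) -> None:
        self.connected = True

    def disconnect(self) -> None:
        self.connected = False

    def request(self) -> str:
        if not self.connected:
            raise ConnectionError("Not connected")
        return "ok"


async def job(conn: FakeConnection) -> str:
    await asyncio.sleep(0.01)  # the job fires some time after it was scheduled
    return conn.request()


async def disconnect_too_early() -> str:
    conn = FakeConnection()
    conn.connect()
    task = asyncio.create_task(job(conn))  # scheduling returns immediately
    conn.disconnect()                      # runs before the job does
    try:
        return await task
    except ConnectionError as exc:
        return f"failed: {exc}"


async def disconnect_after_jobs() -> str:
    conn = FakeConnection()
    conn.connect()
    try:
        return await asyncio.create_task(job(conn))
    finally:
        conn.disconnect()  # only once the job has finished
```

Keeping the disconnect in a finally block that runs after the scheduled work has ended is one way to make the ordering explicit.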