Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ib_async.sleep should not create a new async loop to run asyncio.sleep() #103

Open
GonzRon opened this issue Jan 16, 2025 · 12 comments
Open

Comments

@GonzRon
Copy link

GonzRon commented Jan 16, 2025

  self._ib_client.sleep(2)
    ~~~~~~~~~~~~~~~~~~~~~^^^
  File "/lib/python3.13/site-packages/ib_async/util.py", line 405, in sleep
    run(asyncio.sleep(secs))
    ~~~^^^^^^^^^^^^^^^^^^^^^
  File "/lib/python3.13/site-packages/ib_async/util.py", line 360, in run
    result = loop.run_until_complete(task)
  File "/usr/lib64/python3.13/asyncio/base_events.py", line 696, in run_until_complete
    self._check_running()
    ~~~~~~~~~~~~~~~~~~~^^
      File "/usr/lib64/python3.13/asyncio/base_events.py", line 632, in _check_running
    raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running

Disclaimer: Perhaps it is my fault for using the builtin sleep function - maybe I did that a long time ago when I was first learning ib_async and asyncio in general. Perhaps, it is not intended to be used in async contexts, or when trying to use ib_async to manage the event loop.

On with the proposed description - but again, perhaps I am wrong here so take it with a grain of salt:

This library should not use a loop to run asyncio.sleep(). That's because you can cause an event loop error that's difficult to determine the root cause for in large async environments where many components are using the same event loop.

I actually think the problem is in util.py line 484

def getLoop():
    """Get the asyncio event loop for the current thread."""
    return asyncio.get_event_loop_policy().get_event_loop()

gpt analysis:

Given the function's intended behavior, it should return the existing running event loop if one is active. However, the error indicates that the code is attempting to run the event loop again while it's already active. Here's why this might be happening:

  1. Mixing Synchronous and Asynchronous Code
    Your run function in ib_async/util.py is designed to handle both synchronous and asynchronous contexts. Here's the problematic part:
> def run(*awaitables: Awaitable, timeout: Optional[float] = None):
>     loop = getLoop()
>     # ...
>     result = loop.run_until_complete(task)
>     # ...

When run is called from within an asynchronous context (i.e., when the event loop is already running), loop.run_until_complete(task) attempts to start the event loop again, leading to the RuntimeError.

  1. The sleep Function's Implementation
    The sleep function is defined as a synchronous function that internally calls run(asyncio.sleep(secs)):
> def sleep(secs: float = 0.02) -> bool:
>     run(asyncio.sleep(secs))
>     return True

When sleep is invoked within an asynchronous function (which is likely the case in your stack trace), it triggers the run function to attempt to start the event loop again, causing the error.

@GonzRon GonzRon changed the title ib_async.sleep should not use a loop to run asyncio.sleep() ib_async.sleep should not use a create a new async loop to run asyncio.sleep() Jan 16, 2025
@gnzsnz
Copy link
Contributor

gnzsnz commented Jan 17, 2025

did you run ib_async.util.startLoop()?

@GonzRon
Copy link
Author

GonzRon commented Jan 17, 2025

here now my ignorance will be highlighted most prominently:

did you run ib_async.util.startLoop()?

I did not. Was I supposed to? It appears that my code works without it. If you can point to documentation that states otherwise, please direct me there. I run my wrapper class inside an existing event loop. I don't want ib_async to run its own event loop; I want it to use the existing (running) event loop instead. So that everything, including my ASGI server, is running in the event loop in the main thread (for obvious reasons).

it appears that docs state that startLoop is for nesting an event loop in for example jupyter Notebook contexts:
https://ib-api-reloaded.github.io/ib_async/api.html#ib_async.util.startLoop

Use nested asyncio event loop for Jupyter notebooks.

so then, is ib_sleep not meant for traditional async event loops? That's why I asked, it would appear that perhaps not all methods are meant to be used in all contexts, and that's OK, I was just wanting to understand.

@gnzsnz
Copy link
Contributor

gnzsnz commented Jan 17, 2025

getLoop does exactly what you are describing, get the current running one or start a new one if it's not running. the problem is unlikely to come from here

ib_async.util.startLoop() is needed in ipython/jupyter environments otherwise it will through the error you are showing. thus is the first thing that came to my mind. see --> https://nbviewer.org/github/ib-api-reloaded/ib_async/blob/main/notebooks/basics.ipynb

this works fine

import ib_async
import asyncio

ib_async.util.startLoop()

ib_async.util.run(asyncio.sleep(1))
print('long time no see')

asyncio get's tricky when you mix sync and async code, and ib_async(and eventkit) does a great job hiding the complexity. but it's not magic. i would suggest you to start with a simple piece of code and build up slowly until you find the issue

@GonzRon
Copy link
Author

GonzRon commented Jan 17, 2025

Perfect, thanks for the response.

For now, my issue has been resolved by using asyncio.sleep instead of ib_client.sleep()

I believe there is still an issue in the getLoop() functionality not returning the current running_event_loop (but admittedly I could be 100% wrong).

Your absolutely right that asyncio is tricky!

Give me a couple days and I'll investigate further, and update you with my testing results. Thank you for your patience and support!

@GonzRon GonzRon changed the title ib_async.sleep should not use a create a new async loop to run asyncio.sleep() ib_async.sleep should not create a new async loop to run asyncio.sleep() Jan 17, 2025
@gnzsnz
Copy link
Contributor

gnzsnz commented Jan 18, 2025

take into account that this works as well

ib_async.util.sleep(1)
print('long time no see')

which is equivalent(sort of) to

await asyncio.sleep(1)
print('long time no see')

@geepeem
Copy link

geepeem commented Jan 19, 2025

I have also had issues with ib.sleep() throwing the same runtime error. Initially, my code was not async and used a while loop with a call to ib.sleep(30) to control the flow but still allow Events to trigger in the background. The problems started when I added code in a different module that used ib_async Events; say a filledEvent was triggered and I want to place a new trade in the method triggered by the async filledEvent. Adding a call to ib.sleep() here would throw the runtime error (presumably since it happened asynchronously when sleep() was already running in the main loop??).

I tried using @mattsta s nest_asyncio library but that just caused a different error, to do with entry points.

I tried running a "custom sleep" using asyncio.sleep(), this would not cause the runtime error, but it was blocking the main loop so that no Events were triggered during the sleep so was basically useless.

In the end, I rewrote all the code for the main entry point as async using asyncio and now I am no longer using ib.sleep() in the main loop, rather waiting on Events from the trading bot to trigger the main loop to shutdown. I dunno if this is considered "best practice" but seems to work.

Perhaps @mattsta could weigh in on this topic since it pops up quite often? (and is damn confusing for non-pro coders like me!)

PS: @gnzsnz could you clarify whether ib.util.startLoop() is required in contexts other than running a Jupyter notebook (like a regular .py script)?

@GonzRon
Copy link
Author

GonzRon commented Jan 19, 2025

@gnzsnz I dug into the issue a bit further using the PyCharm remote debugger. I was able to determine that getLoop is (of course!!!!) returning the correct event loop. Using debug logging, I confirmed everything is running in the main thread, which, of course, means that there is a running event loop, i.e. I have already started an event loop to run my app (which incorporates ib_async) using asyncio.run(main()).

Debug Logging:

Event loop id: 140250283730064
thread_id:140251678893888

patched ib_async::util.py debug statement

get_event_loop_policy returned 140250283730064

This confirms that the problem is not that ib_async is somehow not returning the correct running event loop; it's actually exactly the opposite, that it is in fact returning the main event loop.

So attempting to call loop.run_until_complete in a thread that is of course already running an event loop results in the RuntimeError: This event loop is already running.

only one event loop can run in a thread at a time. Attempting to start another loop within the same thread leads to conflicts.

run_until_complete Behavior: The method loop.run_until_complete(coro) runs the event loop until the given coroutine completes. However, if the event loop is already running, calling this method raises a RuntimeError.

The problem IMHO is that ib_async is not handling correctly when it is being run in a pure async environment, i.e. in a thread that is already running event loop. startLoop() patches asyncio to allow nesting of other event loops inside the running event loop as a workaround. This of course is 100% not necessary in a pure async environment (unless you are in Jupyter etc. as everyone uses nested event loops in jupyter for this reason).

So to make a long story short you can maybe do something like this instead, however after thinking about it, what would be the point of running asyncio sleep in a task? LOL I'm not sure at all. I think the point of ib_async sleep is to not block the ib_async event loop nested in the Jupyter event loop? (I'm not exactly sure here?).

def run(*awaitables: Awaitable, timeout: Optional[float] = None) -> Any:
    """
    Execute awaitables in the current event loop.

    - If the event loop is not running, run the awaitables until complete.
    - If the event loop is running, schedule the awaitables as tasks.

    Args:
        *awaitables: Coroutines or futures to execute.
        timeout (Optional[float]): Maximum time to wait for the awaitables.

    Returns:
        The result of the awaitables, or the scheduled task if the loop is running.
    """
    loop = getLoop()
    if not awaitables:
        if loop.is_running():
            return

        try:
            loop.run_forever()
        finally:
            # Clean up tasks when the loop is stopped
            if sys.version_info >= (3, 7):
                all_tasks = asyncio.all_tasks(loop)
            else:
                all_tasks = asyncio.Task.all_tasks(loop)

            if all_tasks:
                # Cancel pending tasks
                f = asyncio.gather(*all_tasks)
                f.cancel()
                try:
                    loop.run_until_complete(f)
                except asyncio.CancelledError:
                    pass
    else:
        if len(awaitables) == 1:
            future = awaitables[0]
        else:
            future = asyncio.gather(*awaitables)

        if timeout:
            future = asyncio.wait_for(future, timeout)
        task = asyncio.ensure_future(future)

        def onError(_):
            task.cancel()

        globalErrorEvent.connect(onError)
        try:
            if loop.is_running():
                # Event loop is running; return the task for the caller to handle
                return task
            else:
                # Event loop is not running; run the task until complete
                result = loop.run_until_complete(task)
        except asyncio.CancelledError as e:
            raise globalErrorEvent.value() or e
        finally:
            globalErrorEvent.disconnect(onError)

        return result

This returns an awaitable which cannot be awaited from run because run is not an async method, and also run would have to return the awaitable to the caller, i.e. await ib_async.sleep(1), which is why asyncio.sleep() is being run in a new event loop in the first place.

In other words, if you're already running ib_async in an event loop there is no need to call ib.sleep(), you can simply call asyncio.sleep() from your application to make sure you don't block the main event loop.

Admittedly, there is a lot of trickery in ib_async to deal with running in both sync and async environments. Even though the library is called ib_async, and so it's focus should be on async compatibility, the tws_api implements both sync and async calls.

I'm not an expert in asyncio and neither in ib_async internals, so take this again with a grain of salt, but there you go.

Do what thou wilt :)

@mattsta
Copy link
Contributor

mattsta commented Jan 26, 2025

We should really provide a better training workflow for onboarding people into the ecosystem here since these problems are the most common confusion around usage.

ib_async is best used in a 100% async environment (i.e. the user calling await ib.X() for API calls everywhere), but because "fully async" environments can be confusing, ib_async also provides more traditional synchronous wrappers around all internal async operations.

The problem happens when users try to combine the wrapper helper functions which are managing the event loop and async coroutines for you with your own externally created event loop and async API patterns.

Usage should either be 100% user-controlled event loop and using python-level async operations, or 100% using the ib_async helper wrappers so ib_async can manage the event loop in the background for the user.

More complex situations like users wanting multiple threads with each thread having their own event loops are currently unsupported throughout the entire ib_async ecosystem (such a thing could work, but you would need to manually create a new clientId connection for each participating thread since they would be 100% isolated event loops without any data sharing).

@GonzRon
Copy link
Author

GonzRon commented Jan 26, 2025

We should really provide a better training workflow for onboarding people into the ecosystem here since these problems are the most common confusion around usage.

@mattsta 👍

ib_async is best used in a 100% async environment (i.e. the user calling await ib.X() for API calls everywhere),

Perfect, thanks for confirming

but because "fully async" environments can be confusing, ib_async also provides more traditional synchronous wrappers around all internal async operations.

Yup, I figured this one out. Maybe a blurb about this at the beginning would clear up a lot of confusion for new users. Although, it might be useful to split up the wrapper functions into a totally different class that distinguishes the (wrapped) sync method calls from the async ones, e.g. ib_client.sync.method(), versus the regular async method : ib_client.method(). IDK, just throwing the idea out there.

The problem happens when users try to combine the wrapper helper functions which are managing the event loop and async coroutines for you with your own externally created event loop and async API patterns.

Yes, but, also, some of the method declarations are a bit confusing, example:

    def reqAccountSummaryAsync(self) -> Awaitable[None]:
        reqId = self.client.getReqId()
        future = self.wrapper.startReq(reqId)
        tags = (
            "AccountType,NetLiquidation,TotalCashValue,SettledCash,"
            "AccruedCash,BuyingPower,EquityWithLoanValue,"
            "PreviousDayEquityWithLoanValue,GrossPositionValue,RegTEquity,"
            "RegTMargin,SMA,InitMarginReq,MaintMarginReq,AvailableFunds,"
            "ExcessLiquidity,Cushion,FullInitMarginReq,FullMaintMarginReq,"
            "FullAvailableFunds,FullExcessLiquidity,LookAheadNextChange,"
            "LookAheadInitMarginReq,LookAheadMaintMarginReq,"
            "LookAheadAvailableFunds,LookAheadExcessLiquidity,"
            "HighestSeverity,DayTradesRemaining,DayTradesRemainingT+1,"
            "DayTradesRemainingT+2,DayTradesRemainingT+3,"
            "DayTradesRemainingT+4,Leverage,$LEDGER:ALL"
        )
        self.client.reqAccountSummary(reqId, "All", tags)
        return future

the method declaration says async, and I see it returns an awaitable (future), but it's not defined as async, i.e. not: async def reqAccountSummaryAsync(self). This is a minor nitpick however.

But more importantly:
while reqAccountSummaryAsync does track the reqId internally for us - there's no implementation of cancelAccountSummary. I mean there is, but it's not complementary to the Async method. So how does one cancel? I ended up using reqAccountSummary, tracking reqId myself, and using it later to cancel:

Note: Here I just want to connect directly to the event Handler, so elsewhere I'm doing: self.ib_client.accountSummaryEvent += self.event_handler

reqId = self.ib_client.client.getReqId()
self.ib_client.client.reqAccountSummary(reqId, "All", tags)
self.ib_client.client.cancelAccountSummary(reqId)

Although admittedly this is technically not 100% directly related to the Async stuff, there are numerous accountSummary related calls in ib.py, e.g.:

async def accountSummaryAsync
def reqAccountSummaryAsync(
def accountSummary
def reqAccountSummary
"accountSummaryEvent",

If I want to be notified of account summary updates, should I just await accountSummaryAsync in a while True loop, subscribe to the accountSummaryEvent with my own eventHandler, or? I mean, I know I'm being somewhat rhetorical here, but I am actually looking for the answer to this one.

Usage should either be 100% user-controlled event loop and using python-level async operations, or 100% using the ib_async helper wrappers so ib_async can manage the event loop in the background for the user.

I surmised this :)

More complex situations like users wanting multiple threads with each thread having their own event loops are currently unsupported throughout the entire ib_async ecosystem (such a thing could work, but you would need to manually create a new clientId connection for each participating thread since they would be 100% isolated event loops without any data sharing).

Yeah, that sounds complex - not something I personally want to do but at least now I have an inkling of how it might be done now (hindsight is always 20/20).

@mattsta
Copy link
Contributor

mattsta commented Jan 27, 2025

the method declaration says async, and I see it returns an awaitable (future), but it's not defined as async, i.e. not: async def reqAccountSummaryAsync(self).

This is actually perfectly valid and a normal use case.

These are all equivalent:

# "Normal" usage

async def doSomething():
    return 3

await doSomething()


# "Wrapper" usage

async def doItAgain():
    return await doSomething()

await doItAgain()


# "Reduced Wrapper" usage

def doItDifferently():
    return doSomething()

await doItDifferently()

All of those functions return coroutines when called even if not defined with async at the declaration.

Before Python 3.12, using the doItDifferently() pattern was more efficient because there is only one await in the call. For Python >= 3.12, you can enable a thing called "eager task factory" which will follow every async call immediately and only yield to the event loop if an actual blocking operation is performed. So, in 3.12+ with "eager task factory" enabled, all 3 of those approaches are (roughly) equivalent (minus one extra function call hop).

I ended up using reqAccountSummary, tracking reqId myself, and using it later to cancel:

I'd say you may be over-thinking it? I don't know why anybody would want to stop receiving account value updates (that's also kinda the entire purpose of the library — to keep all values updated as quickly as possible 🙃).

If I want to be notified of account summary updates, should I just await accountSummaryAsync in a while True loop, subscribe to the accountSummaryEvent with my own eventHandler,

Again, probably over-thinking it a bit. When you subscribe to account summary updates, your account values are now updated in real time for the lifetime of your connection.

When running accountSummaryAsync(), you start receiving streaming account value updates and the updates continue as your account data changes, with no stop condition until your program exits.

You can subscribe to each account value update tick using the regular built-in event handlers:

ib_async/ib_async/ib.py

Lines 197 to 201 in 38cf54a

* ``accountValueEvent`` (value: :class:`.AccountValue`):
An account value has changed.
* ``accountSummaryEvent`` (value: :class:`.AccountValue`):
An account value has changed.

@GonzRon
Copy link
Author

GonzRon commented Jan 27, 2025

I'd say you may be over-thinking it?

hah! Story of my life!

I don't know why anybody would want to stop receiving account value updates (that's also kind of the entire purpose of the library—to keep all values updated as quickly as possible 🙃).

because it's a server, and there are no active front-end clients. I mean I totally get it right, who doesn't want this data all the time?. I guess I can just subscribe and then connect/disconnect the accountSummaryUpdate event listener—no big deal.

I just think it's wasteful to request/receive the data if it's not being used. I am a bit of a pedant, you know, fastidious. However, I do know that the accountSummaryUpdate event triggers for every tag as it comes in, and it would be useful to have access to the start and end signals (i.e. the start update and end update) for the accountSummary events (I see it gets wrapped in the accountSummaryEnd in wrapper).

This way I would know to wait for the end signal before accessing the obj. However, if there's an easier way to do this for an event-driven framework, then the high-level requirement is, "Wait for an update to finish, when it comes in, send it". Maybe that's already the implementation, but I try to avoid polling and instead rely on events to signal when I can update the client.

Before Python 3.12, using the doItDifferently() pattern was more efficient because there is only one await in the call. For Python >= 3.12, you can enable a thing called "eager task factory" which will follow every async call immediately and only yield to the event loop if an actual blocking operation is performed. So, in 3.12+ with "eager task factory" enabled, all 3 of those approaches are (roughly) equivalent (minus one extra function call hop).

Now that is good information. Thank you, I researched the topic and the implementation makes a lot of sense. A lot of my async code doesn't block, so that avoids the context switching overhead. Very nice indeed.

BTW, if gratitude didn't come across in my previous comments, let me fix that now. Thank you for your responses, and also more generally, I want to thank the contributors and maintainers of the project for all their hard work. Let's not let this get lost in minor technical gripes. Truly great work on the library, it's easy to work with, and there's a lot of work put in to make our lives easier.

@mattsta
Copy link
Contributor

mattsta commented Jan 27, 2025

wait for the end signal before accessing the obj.

That's an important part to understand too!

When requesting account data (or any data with the IBKR APIs) every field is populated async.

The operation is "request account data updates" not "FETCH ALL ACCOUNT DATA IMMEDIATELY."

When you request account data updates, your program asks the IBKR API to "please start sending account data when you have a free moment," then each of the maybe 50+ account data fields receives an independent initial sync then receives future updates only when they change.

You can see here where the account data gets populated as just individually received name-value pairs over time, so there is practically no one final "the account object is fully populated" moment.

def updateAccountValue(self, tag: str, val: str, currency: str, account: str):
key = (account, tag, currency, "")
acctVal = AccountValue(account, tag, val, currency, "")
self.accountValues[key] = acctVal
self.ib.accountValueEvent.emit(acctVal)

After you begin receiving account updates, all the values will independently sync up-front for an initial summary population, then each value will continue changing one-by-one as the underlying value updates are pushed from IBKR -> gateway -> ib_async -> your program.

(this is obviously for the "streaming account data" which is more valuable, i think, than the static "fetch account value as batch" thing also with a closing event here:

def accountDownloadEnd(self, _account: str):
# sent after updateAccountValue and updatePortfolio both finished
self._endReq("accountValues")
)

Original docs here of course, and if we are missing the explicit cancel summary API call, we should add it eventually just for completeness:

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants