V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
pmispig
V2EX  ›  Python

python3 请问协程怎么 wait_for 一个 task list

  •  
  •   pmispig · 2019-12-07 10:19:02 +08:00 · 4800 次点击
    这是一个创建于 1824 天前的主题,其中的信息可能已经有所发展或是发生改变。

    版本 3.7.5

    taskList = []
    task = asyncio.create_task(nested())
    task1 = asyncio.create_task(nested())
    taskList.append(task)
    taskList.append(task1)
    await asyncio.wait_for(taskList timeout=10)
    

    然而 wait_for 不支持 task list.
    请问怎么创建一堆 task,给每个 task 设置一个超时时间,然后并发运行呢。 如果一个 async 函数里有 time.sleep() ,那么 wait_for 设置的超时时间就无效,请问这个怎么解决?当然正确的方法是用 asyncio.sleep().然而第三方的库里面,它用的是 time.sleep()呢,那么无法 wait_for()超时中断了吗,task 有没有 kill 机制呢

    第 1 条附言  ·  2019-12-07 11:58:08 +08:00
    import asyncio
    async def nested():
        await asyncio.sleep(10000)
    
    
    async def main():
        taskList = []
        waitList = []
        task = asyncio.create_task(nested())
        task1 = asyncio.create_task(nested())
        t1 = asyncio.create_task(asyncio.wait_for(task,timeout=10))
        t2 = asyncio.create_task(asyncio.wait_for(task1,timeout=10))
        waitList.append(t1)
        waitList.append(t2)
        await asyncio.gather(*waitList)
    
    asyncio.run(main())
    

    想到一种套娃实现。。可以了。。

    17 条回复    2019-12-07 18:22:29 +08:00
    wwqgtxx
        1
    wwqgtxx  
       2019-12-07 10:43:02 +08:00   ❤️ 1
    在 async 函数中如果使用了 time.sleep(),无论如何都不可能中断的,也不可能 kill,除非你 kill 掉这个 thread 再重启一个 loop
    如果一定要用该第三方库,请调用
    https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor
    把它单独丢到一个线程中(注意,python 中依然没有办法超时 Kill 一个 thread,这是非法的,当然你可以换一个 ProcessPoolExecutor 来把任务丢到一个新的进程,这样去 kill )
    ClericPy
        2
    ClericPy  
       2019-12-07 10:43:38 +08:00   ❤️ 1
    简单等待
    coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
    并发运行 aws 指定的 可等待对象 并阻塞线程直到满足 return_when 指定的条件。

    返回两个 Task/Future 集合: (done, pending)。

    用法:

    done, pending = await asyncio.wait(aws)
    如指定 timeout (float 或 int 类型) 则它将被用于控制返回之前等待的最长秒数。

    请注意此函数不会引发 asyncio.TimeoutError。当超时发生时,未完成的 Future 或 Task 将在指定秒数后被返回。


    time.sleep 本来就不是异步的, 非要用, 有三个方法吧

    1. 模仿 gevent 猴子补丁把 time.sleep 给 hack 掉, 不建议
    2. 动态语言可以加载以后动态修改原始函数, 把第三方的那个函数给修改掉成协程版本
    3. 这种本来就不是异步函数的, 直接套个 executor 然后 await loop.run_in_executor 就可以了, 这个最保险
    qcts33
        3
    qcts33  
       2019-12-07 11:06:15 +08:00
    等待一组 task 的话可以用 asyncio.gather()或 asyncio.as_completed()啊
    不要用 time.sleep(),这个是不是异步的,应该用专门的 asyncio.sleep()
    pmispig
        4
    pmispig  
    OP
       2019-12-07 11:28:50 +08:00
    @wwqgtxx 明白了,多谢
    pmispig
        5
    pmispig  
    OP
       2019-12-07 11:32:00 +08:00
    @qcts33
    @ClericPy
    asyncio.wait 不能设置超时时间,我现在是
    [await asyncio.wait_for(task,timeout=10) for task in taskList]
    这么操作的。。有点辣眼睛
    pmispig
        6
    pmispig  
    OP
       2019-12-07 11:39:02 +08:00
    @ClericPy 0.0 刚重新看了下文档,可以设置 timeout,与 wait_for() 不同,wait() 在超时发生时不会取消可等待对象。
    keepeye
        7
    keepeye  
       2019-12-07 11:41:49 +08:00
    taskList = []
    task = asyncio.wait_for(asyncio.create_task(nested()), 10)
    task1 = asyncio.wait_for(asyncio.create_task(nested()), 10)
    taskList.append(task)
    taskList.append(task1)
    await asyncio.gather(*taskList)
    ClericPy
        8
    ClericPy  
       2019-12-07 11:45:36 +08:00
    @pmispig #6 对的, asyncio 里大部分设计都比较明确, 就是 wait 和 gather 以及是否默认 cancel 这部分比较混乱. 又懒得装 trio 那些, 只能硬着头皮看源码看文档了
    pmispig
        9
    pmispig  
    OP
       2019-12-07 11:54:44 +08:00
    ```
    import asyncio
    async def nested():
    await asyncio.sleep(10000)

    async def main():
    taskList = []
    waitList = []
    task = asyncio.create_task(nested())
    task1 = asyncio.create_task(nested())
    t1 = asyncio.create_task(asyncio.wait_for(task,timeout=10))
    t2 = asyncio.create_task(asyncio.wait_for(task1,timeout=10))
    waitList.append(t1)
    waitList.append(t2)
    await asyncio.gather(*waitList)


    asyncio.run(main())
    ```
    想了一种套娃实现...
    0Y89tX3MgR4I
        10
    0Y89tX3MgR4I  
       2019-12-07 12:44:31 +08:00
    用 asyncio 不爽的一个地方就是所有的组件都要换成支持 asyncio 的
    0Y89tX3MgR4I
        11
    0Y89tX3MgR4I  
       2019-12-07 12:44:52 +08:00
    @0Y89tX3MgR4I 不支持的只能放到一个单独的线程里面
    jingcoco
        12
    jingcoco  
       2019-12-07 14:25:54 +08:00
    小白,用 python 3.8 运行楼主的 报错 ........
    pmispig
        13
    pmispig  
    OP
       2019-12-07 14:38:02 +08:00
    @jingcoco concurrent.futures._base.TimeoutError 报这个是正常的,目的就是为了引出这个异常
    wwqgtxx
        14
    wwqgtxx  
       2019-12-07 15:22:45 +08:00 via iPhone   ❤️ 1
    我自己写过一个给 task 加上 timeout 的辅助函数,你可以参考一下,不过这样输出的会是 asyncio.CancelledError
    https://github.com/wwqgtxx/wwqLyParse/blob/master/wwqLyParse/common/asyncio.py#L182
    wwqgtxx
        15
    wwqgtxx  
       2019-12-07 15:29:17 +08:00 via iPhone
    不过其实 python 官方的 wait_for 的实现也是类似思路
    https://github.com/python/cpython/blob/3.8/Lib/asyncio/tasks.py#L434
    pmispig
        16
    pmispig  
    OP
       2019-12-07 17:36:30 +08:00
    @wwqgtxx 3Q,要改也挺麻烦得,还是直接套娃算了,task 套 task...
    ClericPy
        17
    ClericPy  
       2019-12-07 18:22:29 +08:00
    @pmispig #16 可以借鉴 aiohttp 依赖的 async-timeout

    wait 之所以不主动 cancel 看它 api 就明白了, 区分了 pending 和 done, pending 的自己 cancel 就好了
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1016 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 32ms · UTC 21:02 · PVG 05:02 · LAX 13:02 · JFK 16:02
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.