asyncio使用举例

import time
import asyncio
from threading import Thread
import functools
import os
import signal


def exit_loop(sign_name):
    print(f"获取信号{sign_name}: exit")
    loop = asyncio.get_event_loop()
    loop.stop()
    print(f'loop.is_running {loop.is_running()}')


async def main():
    await asyncio.sleep(3)
    print('world2')
    await asyncio.sleep(2)
    print('world4')


async def main1():
    await asyncio.sleep(2)
    print('world1')
    await asyncio.sleep(2)
    print('world3')


def send_signal(num, s, loop):
    print(s)
    asyncio.run_coroutine_threadsafe(main(), loop)
    asyncio.run_coroutine_threadsafe(main1(), loop)
    time.sleep(num)
    print(s)
    os.kill(os.getpid(), signal.SIGINT)


if __name__ == '__main__':

    new_loop = asyncio.new_event_loop()
    for sign_name in ('SIGINT', 'SIGTERM'):
        new_loop.add_signal_handler(getattr(signal, sign_name),
                                    functools.partial(exit_loop, sign_name))

    t = Thread(target=send_signal, args=(6, 'hello', new_loop), )
    t.start()
    asyncio.set_event_loop(new_loop)
    new_loop.run_forever()
    new_loop.close()
    t.join()
    print(f'loop.is_running {new_loop.is_running()}')

版权声明:除特别注明外,本站所有文章均为王晨曦个人站点原创

转载请注明:出处来自王晨曦个人站点 » asyncio使用举例

点赞

发表评论

电子邮件地址不会被公开。 必填项已用*标注