import time
import asyncio
from threading import Thread
import functools
import os
import signal
def exit_loop(sign_name):
print(f"获取信号{sign_name}: exit")
loop = asyncio.get_event_loop()
loop.stop()
print(f'loop.is_running {loop.is_running()}')
async def main():
await asyncio.sleep(3)
print('world2')
await asyncio.sleep(2)
print('world4')
async def main1():
await asyncio.sleep(2)
print('world1')
await asyncio.sleep(2)
print('world3')
def send_signal(num, s, loop):
print(s)
asyncio.run_coroutine_threadsafe(main(), loop)
asyncio.run_coroutine_threadsafe(main1(), loop)
time.sleep(num)
print(s)
os.kill(os.getpid(), signal.SIGINT)
if __name__ == '__main__':
new_loop = asyncio.new_event_loop()
for sign_name in ('SIGINT', 'SIGTERM'):
new_loop.add_signal_handler(getattr(signal, sign_name),
functools.partial(exit_loop, sign_name))
t = Thread(target=send_signal, args=(6, 'hello', new_loop), )
t.start()
asyncio.set_event_loop(new_loop)
new_loop.run_forever()
new_loop.close()
t.join()
print(f'loop.is_running {new_loop.is_running()}')
版权声明:除特别注明外,本站所有文章均为王晨曦个人站点原创
转载请注明:出处来自王晨曦个人站点 » asyncio使用举例