import trio
async def long_running_task():
print("Task started...")
try:
await trio.sleep(5)
print("Task completed!")
except trio.Cancelled:
print("Task was cancelled!")
async def main():
async with trio.open_nursery() as nursery:
with trio.CancelScope() as cancel_scope:
nursery.start_soon(long_running_task)
await trio.sleep(2) # Wait for a while
cancel_scope.cancel() # Cancel the task
trio.run(main)
import trio
import time
def blocking_task():
time.sleep(2) # Simulates a slow blocking operation
return "Blocking task done!"
async def main():
result = await trio.to_thread.run_sync(blocking_task)
print(result)
trio.run(main)
import trio
async def producer(send_channel):
async with send_channel:
for i in range(3):
print(f"Sending: {i}")
await send_channel.send(i)
await trio.sleep(1)
async def consumer(receive_channel):
async with receive_channel:
async for value in receive_channel:
print(f"Received: {value}")
async def main():
send_channel, receive_channel = trio.open_memory_channel(3)
async with trio.open_nursery() as nursery:
nursery.start_soon(producer, send_channel)
nursery.start_soon(consumer, receive_channel)
trio.run(main)
import trio
async def long_task():
await trio.sleep(5) # Simulating a long task
return "Completed"
async def main():
with trio.move_on_after(2): # Timeout after 2 seconds
result = await long_task()
print(result) # Won't print because it times out
trio.run(main)
import trio
async def faulty_task():
raise ValueError("Something went wrong!")
async def main():
async with trio.open_nursery() as nursery:
try:
nursery.start_soon(faulty_task)
except Exception as e:
print(f"Caught exception: {e}")
trio.run(main)
import trio
import signal
async def signal_handler():
print("Signal received. Cleaning up...")
await trio.sleep(1)
print("Cleanup done!")
async def main():
with trio.open_signal_receiver(signal.SIGINT) as signal_stream:
async for _ in signal_stream:
await signal_handler()
trio.run(main)