This example demonstrates how to cancel an asyncio task gracefully.
import asyncio

async def long_task():
    print("Task started")
    try:
        await asyncio.sleep(5)
    except asyncio.CancelledError:
        print("Task was cancelled")
        raise  # Re-raise so the task is marked as cancelled

async def main():
    task = asyncio.create_task(long_task())
    await asyncio.sleep(2)  # Wait for a while
    task.cancel()  # Cancel the task
    try:
        await task
    except asyncio.CancelledError:
        print("Handled cancellation")

asyncio.run(main())
✅ Fix: The CancelledError is caught, and the task is properly cleaned up after cancellation.
🔹 2. Task Cancellation with Timeout
You can cancel a task after a specific timeout using asyncio.wait_for.
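A minimal sketch of this pattern, assuming a hypothetical `slow_operation` coroutine as the slow piece of work. The `delay` and `timeout` parameters are illustrative and not part of any library API.

```python
import asyncio


async def slow_operation(delay):
    # Hypothetical coroutine that simulates slow work by sleeping
    await asyncio.sleep(delay)
    return "done"


async def main(delay=5.0, timeout=1.0):
    try:
        # wait_for cancels slow_operation() if it runs longer than `timeout`
        result = await asyncio.wait_for(slow_operation(delay), timeout=timeout)
    except asyncio.TimeoutError:
        print("Task timed out and was cancelled")
        return None
    print(f"Task finished with result: {result}")
    return result


if __name__ == "__main__":
    asyncio.run(main())
```

With the default arguments the operation needs 5 seconds but is only given 1, so the timeout branch runs.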
✅ Fix: If the task takes longer than the specified timeout, it’s cancelled automatically and asyncio.TimeoutError is raised to the caller.
🔹 3. Cancelling Multiple Tasks in a Group
You can cancel multiple tasks within a task group.
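One way this might look, assuming the group is a plain list of tasks collected with `asyncio.gather`. The `worker` coroutine and the `run_for` parameter are hypothetical names used only for illustration.

```python
import asyncio


async def worker(name, delay):
    # Hypothetical worker that simulates work by sleeping
    try:
        await asyncio.sleep(delay)
        return f"{name} finished"
    except asyncio.CancelledError:
        print(f"{name} was cancelled")
        raise  # Re-raise so the task is marked as cancelled


async def main(run_for=1.0):
    tasks = [asyncio.create_task(worker(f"worker-{i}", 5)) for i in range(3)]
    await asyncio.sleep(run_for)  # Let the workers run for a while
    for task in tasks:
        task.cancel()
    # With return_exceptions=True, each CancelledError is returned
    # in the results list instead of being raised here
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for result in results:
        print(f"Result: {result!r}")
    return results


if __name__ == "__main__":
    asyncio.run(main())
```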
✅ Fix: All tasks in the group are cancelled after a certain period, and asyncio.gather(..., return_exceptions=True) returns each CancelledError as a result instead of raising it.
🔹 4. Cancelling a Task with Cleanup
It’s essential to perform cleanup operations when a task is cancelled.
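A minimal sketch of cleanup on cancellation, using `try`/`finally` so the cleanup runs whether the task finishes or is cancelled. The `Resource` class is a hypothetical stand-in for a connection or file handle, and `run_for` is an illustrative parameter.

```python
import asyncio


class Resource:
    """Hypothetical resource, standing in for a connection or file handle."""

    def __init__(self):
        self.is_open = False
        self.close_calls = 0

    async def open(self):
        self.is_open = True

    async def close(self):
        await asyncio.sleep(0)  # Simulate an asynchronous close
        self.is_open = False
        self.close_calls += 1


async def use_resource(resource):
    await resource.open()
    try:
        await asyncio.sleep(5)  # Simulated work with the resource
    except asyncio.CancelledError:
        print("Task cancelled, cleaning up")
        raise  # Re-raise after logging so cancellation propagates
    finally:
        # Runs on normal completion and on cancellation
        await resource.close()


async def main(run_for=1.0):
    resource = Resource()
    task = asyncio.create_task(use_resource(resource))
    await asyncio.sleep(run_for)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Resource still open:", resource.is_open)
    return resource


if __name__ == "__main__":
    asyncio.run(main())
```

Keeping the release step in `finally` rather than in the `except` branch means the same cleanup path is used when the task completes normally.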
✅ Fix: The task handles cancellation during cleanup and ensures resources are released properly.
🔹 5. Handling Cancellation in Long-Running Loops
If your task runs in a loop, cancel the loop gracefully by catching CancelledError.
✅ Fix: The task is cancelled during its execution, with proper handling of CancelledError.
🔹 6. Using asyncio.shield() to Prevent Task Cancellation
You can use asyncio.shield() to protect a task from cancellation of the code that awaits it.
✅ Fix: asyncio.shield() keeps the inner task running when the shielded await is cancelled. Calling cancel() directly on the inner task still cancels it.
🔹 7. Cancelling Tasks in a Context Manager
Using an async context manager allows you to manage task cancellation in a clean way.
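✅ Fix: The task group waits for every task when the block exits and cancels the remaining tasks if one of them fails. Individual tasks can be cancelled through the handles returned by create_task().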
🔹 8. Task Cancellation with CancelledError Handling
You can handle task cancellations explicitly using try-except blocks.
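A short sketch of explicit handling on both sides: inside the task and in the caller. The `fetch_data` coroutine and the `run_for` parameter are hypothetical and used only for illustration.

```python
import asyncio


async def fetch_data():
    # Hypothetical coroutine that simulates a slow fetch
    try:
        await asyncio.sleep(5)
        return "data"
    except asyncio.CancelledError:
        print("fetch_data: cancellation requested, stopping")
        raise  # Re-raise so the task is marked as cancelled


async def main(run_for=1.0):
    task = asyncio.create_task(fetch_data())
    await asyncio.sleep(run_for)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("main: task was cancelled")
    # task.cancelled() is True only because fetch_data re-raised the error
    return task.cancelled()


if __name__ == "__main__":
    asyncio.run(main())
```

If `fetch_data` swallowed the exception instead of re-raising it, the task would finish normally and `task.cancelled()` would report False.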
✅ Fix: The CancelledError is explicitly handled to cleanly cancel the task.
🔹 9. Handling Task Cancellation in Multiple Coroutine Functions
You can manage cancellations in tasks that are waiting for multiple async functions.
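A possible sketch with two different coroutine functions running as separate tasks. The names `fetch_users` and `fetch_orders` and the `run_for` parameter are hypothetical.

```python
import asyncio


async def fetch_users():
    # Hypothetical coroutine; sleeps to simulate a slow request
    await asyncio.sleep(5)
    return "users"


async def fetch_orders():
    # Hypothetical coroutine; sleeps to simulate a slow request
    await asyncio.sleep(4)
    return "orders"


async def main(run_for=1.0):
    users_task = asyncio.create_task(fetch_users())
    orders_task = asyncio.create_task(fetch_orders())
    await asyncio.sleep(run_for)
    users_task.cancel()
    orders_task.cancel()
    # Collect both outcomes; cancelled tasks show up as CancelledError objects
    results = await asyncio.gather(
        users_task, orders_task, return_exceptions=True
    )
    for name, result in zip(("users", "orders"), results):
        if isinstance(result, asyncio.CancelledError):
            print(f"{name}: cancelled")
        else:
            print(f"{name}: {result}")
    return results


if __name__ == "__main__":
    asyncio.run(main())
```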
✅ Fix: Both tasks are cancelled, and exceptions are managed with return_exceptions=True.
🔹 10. Task Cancellation with Custom Timeout
You can implement custom timeout handling by cancelling the task yourself once a deadline passes.
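A minimal sketch of a hand-rolled timeout built on `asyncio.wait`, which, unlike `asyncio.wait_for`, does not cancel anything when its timeout expires. The helper `run_with_timeout` and the coroutine `slow_operation` are hypothetical names, not library functions.

```python
import asyncio


async def slow_operation(delay):
    # Hypothetical coroutine that simulates slow work by sleeping
    await asyncio.sleep(delay)
    return "done"


async def run_with_timeout(coro, timeout):
    # Hypothetical helper: run `coro` as a task and cancel it on timeout
    task = asyncio.create_task(coro)
    done, pending = await asyncio.wait({task}, timeout=timeout)
    if task in pending:
        task.cancel()  # asyncio.wait leaves the task running, so cancel it
        try:
            await task  # Give the task a chance to run its cleanup
        except asyncio.CancelledError:
            pass
        raise asyncio.TimeoutError(f"operation exceeded {timeout} seconds")
    return task.result()


async def main(delay=5.0, timeout=1.0):
    try:
        return await run_with_timeout(slow_operation(delay), timeout)
    except asyncio.TimeoutError as exc:
        print(f"Timed out: {exc}")
        return None


if __name__ == "__main__":
    asyncio.run(main())
```

Doing the cancellation by hand leaves room for custom behaviour at the timeout point, such as logging or retrying, before the error reaches the caller.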
✅ Fix: The task is cancelled automatically after the specified timeout.
💡 Best Practices for Efficient Task Cancellation:
Catch CancelledError: Catch CancelledError to stop the task cleanly and release any resources, then re-raise it so the task is actually marked as cancelled.
Use asyncio.wait_for: Set a timeout for tasks that may take longer than expected.
Graceful Cleanup: Ensure tasks clean up resources when they are cancelled.
Task Groups: Use asyncio.TaskGroup (Python 3.11+) for managing and cancelling multiple tasks together.
Shield Important Tasks: Use asyncio.shield() to keep critical work running when the coroutine awaiting it is cancelled.
By properly handling task cancellations, you can avoid resource leaks, improve the reliability of your async code, and ensure that tasks don’t leave your system in an inconsistent state.
# Example for section 5: handling cancellation in a long-running loop
import asyncio

async def long_running_loop():
    i = 0
    try:
        while i < 5:
            await asyncio.sleep(1)
            i += 1
            print(f"Task running... iteration {i}")
    except asyncio.CancelledError:
        print(f"Loop cancelled after {i} iterations")
        raise  # Re-raise so the task is marked as cancelled

async def main():
    task = asyncio.create_task(long_running_loop())
    await asyncio.sleep(3)  # Let it run for a while
    task.cancel()  # Cancel the task
    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled during loop execution")

asyncio.run(main())

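# Example for section 6: using asyncio.shield()
import asyncio

async def long_running_task():
    await asyncio.sleep(5)
    print("Task finished")

async def main():
    task = asyncio.create_task(long_running_task())
    shielded = asyncio.shield(task)  # Outer future that protects the inner task
    await asyncio.sleep(2)
    shielded.cancel()  # Cancels only the outer future, not the inner task
    try:
        await shielded
    except asyncio.CancelledError:
        print("Shielded wait was cancelled; inner task keeps running")
    await task  # Wait for the inner task to complete

asyncio.run(main())
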
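# Example for section 7: cancelling tasks in a TaskGroup (Python 3.11+)
import asyncio

async def long_task():
    print("Long task started")
    await asyncio.sleep(5)
    print("Long task finished")

async def main():
    async with asyncio.TaskGroup() as group:
        task = group.create_task(long_task())
        await asyncio.sleep(2)
        # asyncio.TaskGroup has no cancel_all() method,
        # so cancel each task through its handle
        task.cancel()
    print("Task cancelled:", task.cancelled())

asyncio.run(main())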