These snippets demonstrate the use of coroutines for multitasking and cooperative execution, including sending data, handling exceptions, and managing state. They also illustrate how coroutines can be integrated with asyncio for asynchronous programming.
1. Basic Coroutine Example
defsimple_coroutine():print("Coroutine started") x =yieldprint(f"Received: {x}")coro =simple_coroutine()next(coro)# Start the coroutinecoro.send(42)# Output: Coroutine started \n Received: 42
2. Coroutine with Multiple yield Statements
defmulti_yield_coroutine():print("Coroutine started") x =yield"First yield"print(f"Received: {x}") y =yield"Second yield"print(f"Received: {y}")coro =multi_yield_coroutine()print(next(coro))# Output: Coroutine started \n First yieldprint(coro.send(10))# Output: Received: 10 \n Second yieldcoro.send(20)# Output: Received: 20
3. Coroutine for Accumulating Values
4. Coroutine for Data Filtering
5. Coroutine for String Processing
6. Coroutine with Exception Handling
7. Coroutine with Closing Behavior
8. Coroutine for Logging Messages
9. Coroutine for Producing Fibonacci Numbers
10. Coroutine with Asynchronous Behavior Using asyncio
def accumulator():
total = 0
while True:
value = yield total
total += value
coro = accumulator()
next(coro) # Start the coroutine
print(coro.send(10)) # Output: 10
print(coro.send(5)) # Output: 15
print(coro.send(20)) # Output: 35
def data_filter(threshold):
while True:
value = yield
if value > threshold:
print(f"Value {value} is above the threshold {threshold}")
coro = data_filter(10)
next(coro) # Start the coroutine
coro.send(5) # No output
coro.send(15) # Output: Value 15 is above the threshold 10
coro.send(8) # No output
def string_processor():
result = ""
while True:
string = yield result
result += string.upper()
coro = string_processor()
next(coro) # Start the coroutine
print(coro.send("hello")) # Output: HELLO
print(coro.send(" world")) # Output: HELLO WORLD
def exception_handling_coroutine():
try:
while True:
value = yield
print(f"Received: {value}")
except ValueError as e:
print(f"Caught exception: {e}")
coro = exception_handling_coroutine()
next(coro) # Start the coroutine
coro.send(10) # Output: Received: 10
coro.throw(ValueError, "An error occurred!") # Output: Caught exception: An error occurred!
def fibonacci_coroutine():
a, b = 0, 1
while True:
yield a
a, b = b, a + b
coro = fibonacci_coroutine()
for _ in range(6):
print(next(coro), end=" ") # Output: 0 1 1 2 3 5