Here are 10 Python snippets that demonstrate concurrency using the concurrent.futures module with ThreadPoolExecutor and ProcessPoolExecutor.
1. Using ThreadPoolExecutor for Basic Task Execution
Copy
from concurrent.futures import ThreadPoolExecutor
def task(n):
return f"Task {n} completed."
with ThreadPoolExecutor(max_workers=3) as executor:
results = list(executor.map(task, range(5)))
print(results)
2. Using ProcessPoolExecutor for CPU-Intensive Tasks
Copy
from concurrent.futures import ProcessPoolExecutor
import math
def compute_factorial(n):
return math.factorial(n)
with ProcessPoolExecutor(max_workers=3) as executor:
results = list(executor.map(compute_factorial, [5, 10, 15, 20]))
print(results)
3. Submitting Tasks Individually
Copy
4. Using as_completed to Process Results as They Finish
Copy
5. Handling Exceptions in Concurrent Tasks
Copy
6. Cancelling Pending Tasks
Copy
7. Using ProcessPoolExecutor for Parallel File I/O
Copy
8. Mixing ThreadPoolExecutor with I/O Bound Tasks
Copy
9. Timing Concurrent Tasks
Copy
10. Using ProcessPoolExecutor for Large Dataset Processing
Copy
These snippets demonstrate the power and versatility of concurrent.futures for handling concurrency in both I/O-bound tasks (using ThreadPoolExecutor) and CPU-bound tasks (using ProcessPoolExecutor).
from concurrent.futures import ThreadPoolExecutor
def task(n):
return f"Task {n} completed."
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(task, i) for i in range(5)]
for future in futures:
print(future.result())
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def task(n):
time.sleep(n)
return f"Task {n} completed after {n} seconds."
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(task, i) for i in range(1, 4)]
for future in as_completed(futures):
print(future.result())
from concurrent.futures import ThreadPoolExecutor
def task(n):
if n == 2:
raise ValueError("Error in task!")
return f"Task {n} completed."
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(task, i) for i in range(5)]
for future in futures:
try:
print(future.result())
except Exception as e:
print(f"Exception: {e}")
from concurrent.futures import ThreadPoolExecutor
def task(n):
import time
time.sleep(n)
return f"Task {n} completed."
with ThreadPoolExecutor(max_workers=3) as executor:
future = executor.submit(task, 5)
print("Cancelling task...")
future.cancel()
print("Cancelled:", future.cancelled())
from concurrent.futures import ProcessPoolExecutor
def read_file(file_path):
with open(file_path, "r") as f:
return f.read()
file_paths = ["file1.txt", "file2.txt", "file3.txt"] # Replace with actual files
with ProcessPoolExecutor(max_workers=3) as executor:
results = list(executor.map(read_file, file_paths))
print(results)
from concurrent.futures import ThreadPoolExecutor
import requests
def fetch_url(url):
response = requests.get(url)
return f"{url} returned {len(response.content)} bytes."
urls = ["https://example.com", "https://httpbin.org/get", "https://api.github.com"]
with ThreadPoolExecutor(max_workers=3) as executor:
results = list(executor.map(fetch_url, urls))
print(results)
from concurrent.futures import ThreadPoolExecutor
import time
def task(n):
time.sleep(n)
return f"Task {n} done."
start = time.time()
with ThreadPoolExecutor(max_workers=3) as executor:
results = list(executor.map(task, [2, 3, 1]))
end = time.time()
print("Results:", results)
print(f"Total time: {end - start:.2f} seconds")
from concurrent.futures import ProcessPoolExecutor
def square(n):
return n * n
data = list(range(1, 11))
with ProcessPoolExecutor(max_workers=4) as executor:
results = list(executor.map(square, data))
print("Squared values:", results)