import multiprocessing
def producer(pipe):
for i in range(5):
print(f"Producer: {i}")
pipe.send(i)
def consumer(pipe):
while True:
item = pipe.recv()
print(f"Consumer: {item}")
if __name__ == "__main__":
parent_conn, child_conn = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=producer, args=(parent_conn,))
p2 = multiprocessing.Process(target=consumer, args=(child_conn,))
p1.start()
p2.start()
p1.join()
p2.terminate()
import multiprocessing
def consumer(q, id):
while True:
item = q.get()
if item == "STOP":
break
print(f"Consumer {id}: {item}")
if __name__ == "__main__":
q = multiprocessing.Queue()
# Start multiple consumers
consumers = [multiprocessing.Process(target=consumer, args=(q, i)) for i in range(3)]
for c in consumers:
c.start()
# Producer
for i in range(5):
q.put(i)
# Stop consumers
for c in consumers:
q.put("STOP")
for c in consumers:
c.join()
import multiprocessing
def task1(pipe_out):
pipe_out.send("Hello")
pipe_out.send("World")
def task2(pipe_in, pipe_out):
while True:
msg = pipe_in.recv()
if msg == "END":
break
print(f"Task 2 received: {msg}")
pipe_out.send(f"Processed {msg}")
def task3(pipe_in):
while True:
msg = pipe_in.recv()
if msg == "END":
break
print(f"Task 3 processed: {msg}")
if __name__ == "__main__":
pipe1_out, pipe1_in = multiprocessing.Pipe()
pipe2_out, pipe2_in = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=task1, args=(pipe1_out,))
p2 = multiprocessing.Process(target=task2, args=(pipe1_in, pipe2_out))
p3 = multiprocessing.Process(target=task3, args=(pipe2_in,))
p1.start()
p2.start()
p3.start()
p1.join()
pipe1_out.send("END") # End signal for task2
p2.join()
pipe2_out.send("END") # End signal for task3
p3.join()
import multiprocessing
import time
def task_with_error(queue, pipe):
try:
for i in range(5):
if i == 3:
raise ValueError("An error occurred in task")
queue.put(i)
time.sleep(1)
except Exception as e:
pipe.send(str(e))
def handle_errors(pipe):
error_message = pipe.recv()
print(f"Error received: {error_message}")
if __name__ == "__main__":
queue = multiprocessing.Queue()
pipe = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=task_with_error, args=(queue, pipe))
p2 = multiprocessing.Process(target=handle_errors, args=(pipe,))
p1.start()
p2.start()
p1.join()
p2.join()
import multiprocessing
import time
def worker(queue):
while True:
item = queue.get()
if item is None:
break
print(f"Processing item: {item}")
time.sleep(1)
if __name__ == "__main__":
queue = multiprocessing.Queue()
workers = [multiprocessing.Process(target=worker, args=(queue,)) for _ in range(3)]
for w in workers:
w.start()
for item in range(5):
queue.put(item)
for _ in workers:
queue.put(None) # Signal to stop workers
for w in workers:
w.join()
import multiprocessing
def worker(shared_data):
shared_data[0] += 1
print(f"Shared data: {shared_data[0]}")
if __name__ == "__main__":
shared_data = multiprocessing.Array('i', [0]) # Shared memory array
workers = [multiprocessing.Process(target=worker, args=(shared_data,)) for _ in range(5)]
for w in workers:
w.start()
for w in workers:
w.join()
print(f"Final shared data: {shared_data[0]}")
import multiprocessing
def process_task(queue):
while True:
task = queue.get()
if task == "STOP":
break
print(f"Processing: {task}")
if __name__ == "__main__":
queue = multiprocessing.Queue()
workers = [multiprocessing.Process(target=process_task, args=(queue,)) for _ in range(3)]
for w in workers:
w.start()
tasks = ["task1", "task2", "task3", "task4", "task5"]
for task in tasks:
queue.put(task)
for _ in workers:
queue.put("STOP") # Stop signal
for w in workers:
w.join()