Реализация паттерна Producer/Consumer

1,00
р.
Паттерн producer/consumer достаточно часто встречается в многопоточном программировании. Его смысл состоит в том, что один или несколько потоков производят данные, и параллельно этому один или несколько потоков потребляют их.
Как правильно реализовать этот паттерн в популярных языках программирования? Задача сама по себе нетривиальна, поскольку включает синхронизацию между потоками, и потенциальную гонку между несколькими производителями и потребителями.

Справка.
Производящий поток (или потоки) называется «производитель», «поставщик» или просто «producer», потребляющий (-ие) — «потребитель» или «consumer».
Нетривиальность проблемы заключается в том, что потенциально как создание новых данных, так и их потребление могут занимать длительное время, и хотелось бы, чтобы обработка шла без простоев, на максимально возможной скорости.
Примеры:
Произведённые данные могут представлять вычислительно интенсивное задание. В этом случае разумно иметь единственный производящий поток, и несколько выполняющих потоков (например, столько, сколько в системе ядер процессора, если узкое место обработки — вычисления).
Или производящие потоки загружают данные из сети, а по окончанию загрузки выполняющие потоки производит разбор загруженных данных. В этом случае разумно иметь по одному производителю на сайт и, и ограничивать число производителей, если предел доступной скорости сети исчерпан.

Этот вопрос — адаптация одноименного исследования с Хэшкода.

Ответ
Реализация на C#
Для современных версий языка (начиная с C# 4.0), имеет смысл не писать реализацию вручную, а (руководствуясь советом @Flammable), воспользоваться классом BlockingCollection, представляющим нужную функциональность.
Для чтения в consumer-потоках используем просто циклы по последовательности, которую даёт GetConsumingEnumerable(). В producer-потоках пользуемся Add, и в конце не забываем CompleteAdding, чтобы consumer-потоки смогли остановиться.
Пример:
class Program { static public void Main() { new Program().Run() }
BlockingCollection q = new BlockingCollection()
void Run() { var threads = new [] { new Thread(Consumer), new Thread(Consumer) } foreach (var t in threads) t.Start()
string s while ((s = Console.ReadLine()).Length != 0) q.Add(s)
q.CompleteAdding() // останавливаем
foreach (var t in threads) t.Join() }
void Consumer() { foreach (var s in q.GetConsumingEnumerable()) { Console.WriteLine("Processing: {0}", s) Thread.Sleep(2000) Console.WriteLine("Processed: {0}", s) } } }
BlockingCollection позволяет ограничить количество элементов, так что попытка добавить элемент в переполненную очередь также может быть заблокирована до освобождения места.
Заметьте, что GetConsumingEnumerable корректно работает даже в случае, когда у вас много консьюмеров. Это не так уж и очевидно.

Если вы работаете со старой версией C#, вам придётся писать нужную функциональность вручную. Вы можете воспользоваться встроенным классом Monitor (который является аналогом mutex + condition variable из pthreads).
public class ProducerConsumer where T : class { object mutex = new object() Queue queue = new Queue() bool isDead = false
public void Enqueue(T task) { if (task == null) throw new ArgumentNullException("task") lock (mutex) { if (isDead) throw new InvalidOperationException("Queue already stopped") queue.Enqueue(task) Monitor.Pulse(mutex) } }
public T Dequeue() { lock (mutex) { while (queue.Count == 0 && !isDead) Monitor.Wait(mutex)
if (queue.Count == 0) return null
return queue.Dequeue() } }
public void Stop() { lock (mutex) { isDead = true Monitor.PulseAll(mutex) } } }
Использование (аналогичный пример):
class Program { static public void Main() { new Program().Run() }
ProducerConsumer q = new ProducerConsumer()
void Run() { var threads = new [] { new Thread(Consumer), new Thread(Consumer) } foreach (var t in threads) t.Start()
string s while ((s = Console.ReadLine()).Length != 0) q.Enqueue(s)
q.Stop()
foreach (var t in threads) t.Join() }
void Consumer() { while (true) { string s = q.Dequeue() if (s == null) break Console.WriteLine("Processing: {0}", s) Thread.Sleep(2000) Console.WriteLine("Processed: {0}", s) } } }