Межпотоковые коммуникации

Предыдущие примеры, безусловно, блокировали другие потоки от асинхронного доступа к некоторым методам. Это использование неявных мониторов объектов Java является мощным средством, но вы можете достичь более тонкого уровня контроля посредством межпроцессных коммуникаций. Как вы увидите, это особенно просто в Java.

Как обсуждалось ранее, многопоточность заменила программирование на основе циклов событий за счет разделения ваших задач на дискретные, логически обособленные единицы. Потоки также предоставляют вторичную выгоду: они исключают опрос. Опрос обычно реализуется в виде цикла, используемого для периодической проверки некоторого условия. Как только условие истинно, выполняется определенное действие. Это расходует время процессора. Например, рассмотрим классическую проблему, когда один поток генерирует некоторые данные, а другой принимает их. Чтобы сделать проблему более интересной, предположим, что поставщик данных должен ожидать, когда потребитель завершит, прежде чем поставщик сгенерирует новые данных. В системах с опросом потребитель данных тратит много циклов процессора на ожидание данных от поставщика. Как только поставщик завершает, он должен начать опрос, расходующий циклы процессора в ожидании завершения работы потребителя данных, и так далее. Понятно, что такая ситуация нежелательна.

Чтобы избежать опроса, Java включает элегантный механизм межпроцессных коммуникаций посредством методов wait (), notify () и notifyAll (). Эти методы реализованы как final в классе Object, поэтому они доступны всем классам. Все три метода могут быть вызваны только из synchronized-контекста. Хотя с точки зрения компьютерной науки они концептуально сложны, правила применения этих методов достаточно просты.

  • wait () принуждает вызывающий поток отдать монитор и приостановить выполнение до тех пор, пока какой-нибудь другой поток не войдет в тот же монитор и не вызовет notify ().
  • notifу () возобновляет работу потока, который вызвал wait () на том же самом объекте.
  • notifyAll () возобновляет работу всех протоков, который вызвали wait () на том же самом объекте. Одному из потоков дается доступ.

Эти методы объявлены в Object, как показано ниже:

final void wait ()
throws InterruptedException final void notify ()
final void notifyAll()

Существуют дополнительные формы wait (), позволяющие указать время ожидания. Прежде чем рассматривать пример, демонстрирующий межпотоковое взаимодействие, необходимо сделать одно важное замечание. Хотя wait () обычно ожидает до тех пор, пока не будет вызван notify () или notifyAll (), существует вероятность, что в очень редких случаях ожидающий поток может быть разбужен поддельным сигналом. При этом ожидающий поток возобновляется без вызова notify () или notifyAll (). (По сути, поток возобновляется без явных причин.) Из-за этой маловероятной возможности Sun рекомендует выполнять вызовы wait () внутри цикла, проверяющего условие, по которому поток ожидает. В приведенном ниже примере показан такой подход.

А пока рассмотрим пример, использующий wait () и notify (). Для начала проанализируем следующий простой пример программы, некорректно реализующий задачу "поставщик/потребитель". Она состоит из четырех классов: Q — очередь, которую нужно синхронизировать, Producer — объект-поток, который генерирует элементы очереди, Consumer — объект-поток, принимающий элементы очереди, и PC — крошечный класс, который создает объекты Q, Producer и Consumer.

// Неправильная реализация поставщика и потребителя.
class Q {
int n;
synchronized int get() {
System.out.println("Получено: " + n) ;
return n;
}
synchronized void put(int n) {
this.n = n;
System.out.println("Отправлено: " + n) ;
}
}
class Producer implements Runnable {
Q q;
Producer(Q q) {
this. q = q;
new Thread(this, "Поставщик").start ();
}
public void run() {
int i = 0;
while (true) {
q.put(i++);
}
}
}
class Consumer implements Runnable {
Q q;
Consumer(Q q) {
this.q = q;
new Thread(this,"Потребитель").start();
) public void run() {v while(true) {
q.get () ;
}
}
}
class PC {
public static void main(String args[]) {
Q q = new Q () ;
new Producer(q);
new Consumer(q);
System.out.println("Для останова нажмите Control-C.");
}
}

Несмотря на то что методы put () и get () в Q синхронизированы, ничто не остановит переполнение потребителя поставщиком, как и ничто не помешает потребителю извлечь один и тот же компонент очереди дважды. То есть вы получите неверный результат, показанный ниже (точная последовательность может быть другой, в зависимости от скорости процессора и загрузки).

Отправлено: 1
Получено: 1
Получено: 1
Получено: 1
Получено: 1
Получено: 1
Отправлено: 2
Отправлено: 3
Отправлено: 4
Отправлено: 5
Отправлено: 6
Отправлено: 7
Получено: 7

Как видите, после того, как поставщик отправляет 1, запускается потребитель и получает это же значение 1 пять раз подряд. Затем поставщик продолжает работу и поставляет значения от 2 до 7, не давая возможности потребителю получить их.

Правильный способ написания этой программы на Java заключается в том, чтобы применить wait () и notify (), чтобы передавать сигналы в обоих направлениях, как показано ниже:

// Правильная реализация поставщика и потребителя.
class Q {
int n;
boolean valueSet = false;
synchronized int get() {
while(!valueSet) try {
wait () ;
}
catch(InterruptedException e) {
System.out.println("InterruptedException перехвачено");
}
System.out.println("Получено: " + n) ;
valueSet = false;
notify();
return n;
}
synchronized void put(int n) {
while(valueSet) try {
wait () ;
} catch(InterruptedException e) {
System.out.println("InterruptedException перехвачено");
}
this.n = n;
valueSet = true;
System.out.println("Отправлено: " + n) ;
notify();v }
}
class Producer implements Runnable {
Q q;
Producer(Q q) {
this. q = q;
new Thread(this,"Поставщик").start ();
}
public void run() {
int i = 0;
while(true) {
q.put(i++);
}
}
}
class Consumer implements Runnable {
Q q;
Consumer(Q q) {
this.q = q;
new Thread(this, "Потребитель").start ();
}
public void run() {
while(true) {
q.get () ;
}
}
}
class PCFixed {
public static void main(String args[]) {
Q q = new Q();
new Producer (q) ;
new Consumer (q) ;
System.out.println("Для останова нажмите Control-C.");
}
}

Внутри get () вызывается wait (). Это приостанавливает работу потока до тех пор, пока Producer не известит вас о том, что данные прочитаны. Когда это случается, выполнение внутри get () продолжается. После получения данных get () вызывает notify ()

Это сообщает Producer, что все в порядке, и можно помещать в очередь следующий элемент данных. Внутри put () метод wait () приостанавливает выполнение до тех пор, пока Consumer не извлечет элемент из очереди. Когда выполнение возобновится, следующий элемент данных помещается в очередь и вызывается notify (). Это сообщает Consumer, что он теперь может извлечь его.

Ниже приведен вывод программы, который доказывает, что теперь синхронизация работает корректно.

Отправлено: 1
Получено: 1
Отправлено: 2
Получено: 2
Отправлено: 3
Получено: 3
Отправлено: 4
Получено: 4
Отправлено: 5
Получено: 5