Session 12 |
Cooperation among threads
Thread synchronization prevents race conditions by ensuring the mutual exclusion of threads in a critical region, but sometimes we also need a way for threads to cooperate. Synchronization is a difficult topic, but fortunately there is a simpler alternative: the exchange of messages between threads.
Using multiple threads at once forces the programmer to work hard to ensure that the code is correctly synchronized and that the threads do not interfere with each other. With message passing, threads never use the same objects at the same time. If one thread wants to make data visible to another thread, it sends a message to the thread that uses the resource. Messages are sent through special queues (blocking queues) whose methods are thread-safe, so the caller needs no additional synchronization.
Queues will be introduced next week in Part 13.3. A blocking queue blocks the calling thread when it tries to put an element into a full queue or to take an element from an empty queue. The BlockingQueue interface extends java.util.Queue and provides the thread-safe put and take methods, which add an element to the tail of the queue and remove an element from the head of the queue, respectively.
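The blocking behaviour described above can be sketched as follows. This is a minimal illustration, not part of the course material: the class name BlockingQueueDemo and the helper method transfer are hypothetical, while put, take, offer and poll are the standard BlockingQueue methods.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, introduced only for this illustration
class BlockingQueueDemo {

    // Passes the items through a bounded queue to a consumer thread and
    // returns them in the order in which the consumer received them.
    static List<String> transfer(List<String> items, int capacity)
            throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        List<String> received = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < items.size(); i++) {
                    // take() waits while the queue is empty
                    received.add(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        for (String item : items) {
            // put() waits while the queue is full
            queue.put(item);
        }
        // join() guarantees that the consumer's writes are visible here
        consumer.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        // three items pass through a queue that holds at most two
        System.out.println(transfer(Arrays.asList("a", "b", "c"), 2));

        // the non-blocking variants return immediately instead of waiting
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        System.out.println(queue.poll());     // null: the queue is empty
        System.out.println(queue.offer("x")); // true: there was room
        System.out.println(queue.offer("y")); // false: the queue is full
    }
}
```

Because there is a single producer and a single consumer, the items arrive in insertion order. The methods poll and offer come from java.util.Queue and never block, which is why they report an empty or a full queue through their return value.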
What messages should threads send to each other? If a thread wants to assign a task to another thread (e.g. read a file, calculate a value), the task provider puts the initial data for the task (a filename, some parameters) into the queue. The task handler then takes the initial data from the queue and starts the job. If there are several tasks, several threads can share the queue with the initial data. When a thread finishes a task, it can return the results to the sender of the task through another queue. In this case, the content of the message is the solution to the problem (the data read from the file, the calculated value). Such a system is often referred to as a producer-consumer model.
Let us look at an example that takes the sentences and reverses the order of the words in them.
// ReverseWords.java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;

public class ReverseWords implements Runnable {
    // queue for taking the messages
    private BlockingQueue<String> initialSentences;
    // queue for sending messages back
    private BlockingQueue<String> reversedSentences;

    public ReverseWords(
            BlockingQueue<String> initialSentences,
            BlockingQueue<String> reversedSentences) {
        this.initialSentences = initialSentences;
        this.reversedSentences = reversedSentences;
    }

    @Override
    public void run() {
        try {
            while (true) {
                // try to take the next message (poll does not wait)
                String sentence = initialSentences.poll();
                if (sentence == null)
                    break; // if there are no more messages, stop the job
                List<String> words = Arrays.asList(sentence.split(" "));
                Collections.reverse(words);
                String reversedSentence = String.join(" ", words);
                // send the message back
                reversedSentences.put(reversedSentence);
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

// ReverseWordsTest.java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ReverseWordsTest {
    public static void main(String[] args) throws InterruptedException {
        List<String> initialData = Arrays.asList(
                "A BlockingQueue does not accept null elements",
                "Designed to be used primarily for producer-consumer queues",
                "BlockingQueue implementations are thread-safe",
                "BlockingQueue can safely be used with multiple producers/consumers");

        // create queues and assign the maximum capacity
        BlockingQueue<String> initialSentences = new ArrayBlockingQueue<>(10);
        BlockingQueue<String> reversedSentences = new ArrayBlockingQueue<>(10);

        // add all sentences to the queue
        initialSentences.addAll(initialData);

        // start task handlers (both threads take tasks from one queue)
        new Thread(new ReverseWords(initialSentences, reversedSentences)).start();
        new Thread(new ReverseWords(initialSentences, reversedSentences)).start();

        for (int i = 0; i < initialData.size(); i++) {
            // take the next sentence from the result queue (wait if needed)
            System.out.println(reversedSentences.take());
        }
    }
}
This program does not use monitors or explicit synchronization at all, because the necessary synchronization is already implemented inside ArrayBlockingQueue.
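In the example, the task handlers stop as soon as poll returns null, which works only because every sentence is already in the queue before the threads start. When tasks arrive later, one common alternative is to wait with take and to send a special stop message to each handler. The sketch below illustrates that idea under stated assumptions: the class ReverseWordsWithStop, the STOP marker and the process method are hypothetical names, and LinkedBlockingQueue is used in place of ArrayBlockingQueue so that neither queue can fill up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical variant of the example, introduced only for this illustration
class ReverseWordsWithStop {
    // assumed stop marker; a distinct object that is compared by identity,
    // so an ordinary sentence "STOP" is not mistaken for it
    static final String STOP = new String("STOP");

    static String reverseWords(String sentence) {
        List<String> words = Arrays.asList(sentence.split(" "));
        Collections.reverse(words);
        return String.join(" ", words);
    }

    // Reverses all sentences with the given number of handler threads.
    // The order of the results depends on thread scheduling.
    static List<String> process(List<String> sentences, int handlers)
            throws InterruptedException {
        BlockingQueue<String> tasks = new LinkedBlockingQueue<>();
        BlockingQueue<String> results = new LinkedBlockingQueue<>();

        for (int i = 0; i < handlers; i++) {
            new Thread(() -> {
                try {
                    while (true) {
                        // wait until the next task arrives
                        String sentence = tasks.take();
                        if (sentence == STOP)
                            break; // stop message received
                        results.put(reverseWords(sentence));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }

        // the tasks are added after the handlers have started
        for (String sentence : sentences) {
            tasks.put(sentence);
        }
        // one stop message per handler
        for (int i = 0; i < handlers; i++) {
            tasks.put(STOP);
        }

        List<String> reversed = new ArrayList<>();
        for (int i = 0; i < sentences.size(); i++) {
            reversed.add(results.take());
        }
        return reversed;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> data = Arrays.asList(
                "BlockingQueue implementations are thread-safe",
                "A BlockingQueue does not accept null elements");
        for (String line : process(data, 2)) {
            System.out.println(line);
        }
    }
}
```

Since BlockingQueue does not accept null elements, the stop message has to be an ordinary object. Each handler consumes exactly one stop message, so the sender puts as many of them as there are handlers.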