An Observer pattern implementation and illiterate programming

This posting is an experiment in documenting Java code using (in my case) illiterate programming. The Java source code is at The Observer Pattern and the Perl code that converted the Java code is at Simpleminded Java + Javadoc to HTML Converter.

The Observer pattern is used to create a relationship between two objects. The relationship is usually unidirectional with the observer waiting for notices from the observed. The relationships is a loose one as the observed only needs to know of the interest of the observer and the observer only needs to know of the set of and ordering of notice the observed will send. A downside of this looseness is that all notices pass through a single observer method for further dispatching.

 10 package com.andrewgilmartin.common.observation;
 11 
 12 import java.util.Set;
 13 import java.util.concurrent.BlockingQueue;
 14 import java.util.concurrent.CopyOnWriteArraySet;
 15 import java.util.concurrent.LinkedBlockingQueue;
 16 

The Observation class is only used to group all the related interfaces and classes into one file. In a typical development environment each interface and class would be in its own file.

 22 public class Observation {
 23 

The Observable is the object that is being watched. As changes are prepared and then made the observable will send consent and information notices to the observers. The notices are normally specialized classes that hold some context about the change. For example, if the notice concerns the addition of new inventory to the warehouse then the notice's class could have a method for enumerating the new inventory.

 32     public interface Observable {
 33 

If it is necessary for the observable to have approval before making a change then a consent notification is sent to the observers. Each observer will be notified and if any observer opposes the change then it must return false. The current thread will be used to notify the observers and so all the observed must wait for all observers to consent.

 42         boolean consentNotification(Object notice);
 43 

An information notification is normally sent after a change. Since the change has already occurred the notices are typically sent asynchronously by a background thread. The order of the notices is preserved, however.

 50         void informationNotification(Object notice);
 51     }
 52 

The observer is the object that is notified by the observable. There is no typed relationship beyond the Observable and Observer classes. As mentioned earlier, the notices are usually of specialized classes where each notice instance holds data relevant to the change.

 59     public interface Observer {
 60 
 61         boolean notice(Observable observable, Object notice);
 62     }
 63 

A registry is the means of establishing the relationship between the observable and the observer. This interface is distinct from Observable as it is sometimes useful to indirectly register an observer via, for example, a registrar.

 70     public interface Registry {
 71 

The order of the observers is undefined.

 75         Set<Observer> getObservers();
 76 

Adds the observer to the set of observers. Returns true if the observer was added.
 81         boolean addObserver(Observer observer);
 82 

Removes the observer from the set of observers. Returns true if the observer was among the observers and was removed.

 87         boolean deleteObserver(Observer observer);
 88     }
 89 

There is often very little difference between implementations of Observable and Registry and so this base implementation can be widely employed by any class that wants to be observed.

When extending this class make sure to document the set of notices, their consent or information role, and what is their ordering.

 98     public static class ObservableBase implements Observable, Registry {
 99 

The management of the set of observers needs to be thread-safe. The set is expected to be mostly stable over the life of the observed and so copy-on-write semantics is appropriate here.

105         private final Set<Observer> observers = new CopyOnWriteArraySet<Observer>();

Information notification notices will be sent by a background thread. A blocking queue will be used to coordinate the passing of notices from the observed to this background thread.

111         private final BlockingQueue<Object> informationNotices = new LinkedBlockingQueue<Object>();
112 
113         public ObservableBase() {

This implementation of the information notification background thread is quite simple and so uses an anonymous class for the implementation.

119             Thread informationEventsDispatcher = new Thread(new Runnable() {
120                 @Override
121                 public void run() {
122                     try {

Here the thread waits for a new notice on the queue and then sends it to each of the current observers.

127                         for (;;) {
128                             Object notice = informationNotices.take();
129                             for (Observer observer : observers) {
130                                 observer.notice(ObservableBase.this, notice);
131                             }
132                         }
133                     }
134                     catch (InterruptedException e) {
135                         // empty
136                     }
137                 }
138             });
139             informationEventsDispatcher.setDaemon(true);
140             informationEventsDispatcher.start();
141         }
142 
143         @Override
144         public boolean consentNotification(Object notice) {

As mentioned earlier, consent notifications are performed by the observed's thread. In this way, as soon as any observer opposes the change the observed must reject the change.

150             for (Observer observer : observers) {
151                 if (!observer.notice(this, notice)) {
152                     return false;
153                 }
154             }
155             return true;
156         }
157 
158         @Override
159         public void informationNotification(Object notice) {

Pass along the notice to the background thread.

163             informationNotices.add(notice);
164         }
165 
166         @Override
167         public Set<Observer> getObservers() {
168             return observers;
169         }
170 
171         @Override
172         public boolean addObserver(Observer observer) {
173             return observers.add(observer);
174         }
175 
176         @Override
177         public boolean deleteObserver(Observer observer) {
178             return observers.remove(observer);
179         }
180     }
181 

Here is a small example of using the observation interfaces and classes.

185     public static void main(String... args) throws Exception {
186 
187         class Notice {
188 
189             private int senderId;
190             private int sequenceNumber;
191 
192             public Notice(int senderId, int sequenceNumber) {
193                 this.senderId = senderId;
194                 this.sequenceNumber = sequenceNumber;
195             }
196 
197             public int getSenderId() {
198                 return senderId;
199             }
200 
201             public int getSequenceNumber() {
202                 return sequenceNumber;
203             }
204         }
205 

The Sender is an observed. All that it does it to send a stream of notices consisting of sender-id & sequence-number pairs.

210         class Sender extends ObservableBase implements Runnable {
211 
212             private int senderId;
213 
214             public Sender(int id) {
215                 this.senderId = id;
216             }
217 
218             @Override
219             public void run() {
220                 for (int sequenceNumber = 0;; sequenceNumber++) {
221                     informationNotification(new Notice(senderId, sequenceNumber));
222                     sleep(); // add some randomness to the processing.
223                 }
224             }
225         }
226 

The Receiver is an observer. All that it does is to print the notice's facts. In this example all notices are information notifications and so the notice() return value does not matter. However, as a matter of course, notice() should always return true unless it is well sure of the consequences of opposing the change.

234         class Receiver implements Observer {
235 
236             private int receiverId;
237 
238             public Receiver(int id) {
239                 this.receiverId = id;
240             }
241 
242             @Override
243             public boolean notice(Observable observable, Object notice) {
244                 if (notice instanceof Notice) {
245                     Notice n = (Notice) notice;
246                     System.out.printf("notice %d %d %d\n", receiverId, n.getSenderId(), n.getSequenceNumber());
247                     sleep(); // add some randomness to the processing.
248                 }
249                 return true;
250             }
251         }
252 

Create a few senders.

256         Sender[] senders = new Sender[5];
257         for (int i = 0; i < senders.length; i++) {
258             senders[i] = new Sender(i);
259         }
260 

Create a few receivers.

264         Receiver[] receivers = new Receiver[3];
265         for (int i = 0; i < receivers.length; i++) {
266             receivers[i] = new Receiver(i);
267         }
268 

Have each receiver observe each sender

272         for (Receiver r : receivers) {
273             for (Sender s : senders) {
274                 s.addObserver(r);
275             }
276         }
277 

Startup the senders

281         for (Sender s : senders) {
282             Thread t = new Thread(s);
283             t.setDaemon(false);
284             t.start();
285         }
286     }
287 

Add some sleep of random duration to the current thread.

291     static void sleep() {
292         try {
293             Thread.sleep(Math.round(Math.random() * 25));
294         }
295         catch (InterruptedException e) {
296             // empty
297         }
298     }
299 }

To build and run this from the command line, first compile using

303 javac -d /tmp ./src/com/andrewgilmartin/common/observation/Observation

And then run using

308 java -classpath /tmp com.andrewgilmartin.common.observation.Observation

You can show to yourself that the events are ordered by sorting the output on the receiver id and you will see that the events are in numeric order.

314 java -classpath /tmp com.andrewgilmartin.common.observation.Observation | head -20 | sort -k 2 -n -s
317 
318 // END