1 /*
2 File: SynchronousChannel.java
3
4 Originally written by Doug Lea and released into the public domain.
5 This may be used for any purposes whatsoever without acknowledgment.
6 Thanks for the assistance and support of Sun Microsystems Labs,
7 and everyone contributing, testing, and using this code.
8
9 History:
10 Date Who What
11 11Jun1998 dl Create public version
12 17Jul1998 dl Disabled direct semaphore permit check
13 31Jul1998 dl Replaced main algorithm with one with
14 better scaling and fairness properties.
15 25aug1998 dl added peek
16 24Nov2001 dl Replaced main algorithm with faster one.
17 */
18
19 package org.dbunit.util.concurrent;
20
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23
24 /**
25 * A rendezvous channel, similar to those used in CSP and Ada. Each
26 * put must wait for a take, and vice versa. Synchronous channels
27 * are well suited for handoff designs, in which an object running in
28 * one thread must synch up with an object running in another thread
29 * in order to hand it some information, event, or task.
30 * <p> If you only need threads to synch up without
31 * exchanging information, consider using a Barrier. If you need
32 * bidirectional exchanges, consider using a Rendezvous. <p>
33 *
34 * <p>Read the
35 * <a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html">introduction to this package</a>
36 * for more details.
37 *
38 * @author Doug Lea
39 * @author Last changed by: $Author: gommma $
40 * @version $Revision: 766 $ $Date: 2008-08-01 13:05:20 +0200 (ven, 01 ago 2008) $
41 * @since ? (pre 2.1)
42 * @see <a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/Rendezvous.html">Rendezvous</a>
43 * @see <a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/CyclicBarrier.html">CyclicBarrier</a>
44 */
45 public class SynchronousChannel implements BoundedChannel {
46
47 /**
48 * Logger for this class
49 */
50 private static final Logger logger = LoggerFactory.getLogger(SynchronousChannel.class);
51
52 /*
53 This implementation divides actions into two cases for puts:
54
55 * An arriving putter that does not already have a waiting taker
56 creates a node holding item, and then waits for a taker to take it.
57 * An arriving putter that does already have a waiting taker fills
58 the slot node created by the taker, and notifies it to continue.
59
60 And symmetrically, two for takes:
61
62 * An arriving taker that does not already have a waiting putter
63 creates an empty slot node, and then waits for a putter to fill it.
64 * An arriving taker that does already have a waiting putter takes
65 item from the node created by the putter, and notifies it to continue.
66
67 This requires keeping two simple queues: waitingPuts and waitingTakes.
68
69 When a put or take waiting for the actions of its counterpart
70 aborts due to interruption or timeout, it marks the node
71 it created as "CANCELLED", which causes its counterpart to retry
72 the entire put or take sequence.
73 */
74
75 /**
76 * Special marker used in queue nodes to indicate that
77 * the thread waiting for a change in the node has timed out
78 * or been interrupted.
79 **/
80 protected static final Object CANCELLED = new Object();
81
82 /**
83 * Simple FIFO queue class to hold waiting puts/takes.
84 **/
85 protected static class Queue {
86
87 /**
88 * Logger for this class
89 */
90 private static final Logger logger = LoggerFactory.getLogger(Queue.class);
91
92 protected LinkedNode head;
93 protected LinkedNode last;
94
95 protected void enq(LinkedNode p) {
96 logger.debug("enq(p={}) - start", p);
97
98 if (last == null)
99 last = head = p;
100 else
101 last = last.next = p;
102 }
103
104 protected LinkedNode deq() {
105 logger.debug("deq() - start");
106
107 LinkedNode p = head;
108 if (p != null && (head = p.next) == null)
109 last = null;
110 return p;
111 }
112 }
113
114 protected final Queue waitingPuts = new Queue();
115 protected final Queue waitingTakes = new Queue();
116
117 /**
118 * @return zero --
119 * Synchronous channels have no internal capacity.
120 **/
121 public int capacity() {
122 logger.debug("capacity() - start");
123 return 0; }
124
125 /**
126 * @return null --
127 * Synchronous channels do not hold contents unless actively taken
128 **/
129 public Object peek() {
130 logger.debug("peek() - start");
131 return null; }
132
133
134 public void put(Object x) throws InterruptedException {
135 logger.debug("put(x={}) - start", x);
136
137 if (x == null) throw new IllegalArgumentException();
138
139 // This code is conceptually straightforward, but messy
140 // because we need to intertwine handling of put-arrives first
141 // vs take-arrives first cases.
142
143 // Outer loop is to handle retry due to canceled waiting taker
144 for (;;) {
145
146 // Get out now if we are interrupted
147 if (Thread.interrupted()) throw new InterruptedException();
148
149 // Exactly one of item or slot will be not-null at end of
150 // synchronized block, depending on whether a put or a take
151 // arrived first.
152 LinkedNode slot;
153 LinkedNode item = null;
154
155 synchronized(this) {
156 // Try to match up with a waiting taker; fill and signal it below
157 slot = waitingTakes.deq();
158
159 // If no takers yet, create a node and wait below
160 if (slot == null)
161 waitingPuts.enq(item = new LinkedNode(x));
162 }
163
164 if (slot != null) { // There is a waiting taker.
165 // Fill in the slot created by the taker and signal taker to
166 // continue.
167 synchronized(slot) {
168 if (slot.value != CANCELLED) {
169 slot.value = x;
170 slot.notify();
171 return;
172 }
173 // else the taker has canceled, so retry outer loop
174 }
175 }
176
177 else {
178 // Wait for a taker to arrive and take the item.
179 synchronized(item) {
180 try {
181 while (item.value != null)
182 item.wait();
183 return;
184 }
185 catch (InterruptedException ie) {
186 // If item was taken, return normally but set interrupt status
187 if (item.value == null) {
188 Thread.currentThread().interrupt();
189 return;
190 }
191 else {
192 item.value = CANCELLED;
193 throw ie;
194 }
195 }
196 }
197 }
198 }
199 }
200
201 public Object take() throws InterruptedException {
202 logger.debug("take() - start");
203
204 // Entirely symmetric to put()
205
206 for (;;) {
207 if (Thread.interrupted()) throw new InterruptedException();
208
209 LinkedNode item;
210 LinkedNode slot = null;
211
212 synchronized(this) {
213 item = waitingPuts.deq();
214 if (item == null)
215 waitingTakes.enq(slot = new LinkedNode());
216 }
217
218 if (item != null) {
219 synchronized(item) {
220 Object x = item.value;
221 if (x != CANCELLED) {
222 item.value = null;
223 item.next = null;
224 item.notify();
225 return x;
226 }
227 }
228 }
229
230 else {
231 synchronized(slot) {
232 try {
233 for (;;) {
234 Object x = slot.value;
235 if (x != null) {
236 slot.value = null;
237 slot.next = null;
238 return x;
239 }
240 else
241 slot.wait();
242 }
243 }
244 catch(InterruptedException ie) {
245 Object x = slot.value;
246 if (x != null) {
247 slot.value = null;
248 slot.next = null;
249 Thread.currentThread().interrupt();
250 return x;
251 }
252 else {
253 slot.value = CANCELLED;
254 throw ie;
255 }
256 }
257 }
258 }
259 }
260 }
261
262 /*
263 Offer and poll are just like put and take, except even messier.
264 */
265
266
267 public boolean offer(Object x, long msecs) throws InterruptedException {
268 if(logger.isDebugEnabled())
269 logger.debug("offer(x={}, msecs={}) - start", x, String.valueOf(msecs));
270
271 if (x == null) throw new IllegalArgumentException();
272 long waitTime = msecs;
273 long startTime = 0; // lazily initialize below if needed
274
275 for (;;) {
276 if (Thread.interrupted()) throw new InterruptedException();
277
278 LinkedNode slot;
279 LinkedNode item = null;
280
281 synchronized(this) {
282 slot = waitingTakes.deq();
283 if (slot == null) {
284 if (waitTime <= 0)
285 return false;
286 else
287 waitingPuts.enq(item = new LinkedNode(x));
288 }
289 }
290
291 if (slot != null) {
292 synchronized(slot) {
293 if (slot.value != CANCELLED) {
294 slot.value = x;
295 slot.notify();
296 return true;
297 }
298 }
299 }
300
301 long now = System.currentTimeMillis();
302 if (startTime == 0)
303 startTime = now;
304 else
305 waitTime = msecs - (now - startTime);
306
307 if (item != null) {
308 synchronized(item) {
309 try {
310 for (;;) {
311 if (item.value == null)
312 return true;
313 if (waitTime <= 0) {
314 item.value = CANCELLED;
315 return false;
316 }
317 item.wait(waitTime);
318 waitTime = msecs - (System.currentTimeMillis() - startTime);
319 }
320 }
321 catch (InterruptedException ie) {
322 if (item.value == null) {
323 Thread.currentThread().interrupt();
324 return true;
325 }
326 else {
327 item.value = CANCELLED;
328 throw ie;
329 }
330 }
331 }
332 }
333 }
334 }
335
336 public Object poll(long msecs) throws InterruptedException {
337 if(logger.isDebugEnabled())
338 logger.debug("poll(msecs={}) - start", String.valueOf(msecs));
339
340 long waitTime = msecs;
341 long startTime = 0;
342
343 for (;;) {
344 if (Thread.interrupted()) throw new InterruptedException();
345
346 LinkedNode item;
347 LinkedNode slot = null;
348
349 synchronized(this) {
350 item = waitingPuts.deq();
351 if (item == null) {
352 if (waitTime <= 0)
353 return null;
354 else
355 waitingTakes.enq(slot = new LinkedNode());
356 }
357 }
358
359 if (item != null) {
360 synchronized(item) {
361 Object x = item.value;
362 if (x != CANCELLED) {
363 item.value = null;
364 item.next = null;
365 item.notify();
366 return x;
367 }
368 }
369 }
370
371 long now = System.currentTimeMillis();
372 if (startTime == 0)
373 startTime = now;
374 else
375 waitTime = msecs - (now - startTime);
376
377 if (slot != null) {
378 synchronized(slot) {
379 try {
380 for (;;) {
381 Object x = slot.value;
382 if (x != null) {
383 slot.value = null;
384 slot.next = null;
385 return x;
386 }
387 if (waitTime <= 0) {
388 slot.value = CANCELLED;
389 return null;
390 }
391 slot.wait(waitTime);
392 waitTime = msecs - (System.currentTimeMillis() - startTime);
393 }
394 }
395 catch(InterruptedException ie) {
396 Object x = slot.value;
397 if (x != null) {
398 slot.value = null;
399 slot.next = null;
400 Thread.currentThread().interrupt();
401 return x;
402 }
403 else {
404 slot.value = CANCELLED;
405 throw ie;
406 }
407 }
408 }
409 }
410 }
411 }
412
413 }