1 /*
2 File: BoundedLinkedQueue.java
3
4 Originally written by Doug Lea and released into the public domain.
5 This may be used for any purposes whatsoever without acknowledgment.
6 Thanks for the assistance and support of Sun Microsystems Labs,
7 and everyone contributing, testing, and using this code.
8
9 History:
10 Date Who What
11 11Jun1998 dl Create public version
12 17Jul1998 dl Simplified by eliminating wait counts
13 25aug1998 dl added peek
14 10oct1999 dl lock on node object to ensure visibility
15 27jan2000 dl setCapacity forces immediate permit reconcile
16 */
17
18 package org.dbunit.util.concurrent;
19
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22
23 /**
24 * A bounded variant of
25 * LinkedQueue
26 * class. This class may be
27 * preferable to
28 * BoundedBuffer
29 * because it allows a bit more
30 * concurrency among puts and takes, because it does not
31 * pre-allocate fixed storage for elements, and allows
32 * capacity to be dynamically reset.
33 * On the other hand, since it allocates a node object
34 * on each put, it can be slow on systems with slow
35 * allocation and GC.
36 * Also, it may be
37 * preferable to
38 * LinkedQueue
39 * when you need to limit
40 * the capacity to prevent resource exhaustion. This protection
41 * normally does not hurt much performance-wise: When the
42 * queue is not empty or full, most puts and
43 * takes are still usually able to execute concurrently.
44 * @see LinkedQueue
45 * @see BoundedBuffer
46 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] <p>
47 *
48 * @author Doug Lea
49 * @author Last changed by: $Author: gommma $
50 * @version $Revision: 766 $ $Date: 2008-08-01 13:05:20 +0200 (ven, 01 ago 2008) $
51 * @since ? (pre 2.1)
52 */
53 public class BoundedLinkedQueue implements BoundedChannel {
54
55 /**
56 * Logger for this class
57 */
58 private static final Logger logger = LoggerFactory.getLogger(BoundedLinkedQueue.class);
59
60 /*
61 * It might be a bit nicer if this were declared as
62 * a subclass of LinkedQueue, or a sibling class of
63 * a common abstract class. It shares much of the
64 * basic design and bookkeeping fields. But too
65 * many details differ to make this worth doing.
66 */
67
68
69
70 /**
71 * Dummy header node of list. The first actual node, if it exists, is always
72 * at head_.next. After each take, the old first node becomes the head.
73 **/
74 protected LinkedNode head_;
75
76 /**
77 * The last node of list. Put() appends to list, so modifies last_
78 **/
79 protected LinkedNode last_;
80
81
82 /**
83 * Helper monitor. Ensures that only one put at a time executes.
84 **/
85
86 protected final Object putGuard_ = new Object();
87
88 /**
89 * Helper monitor. Protects and provides wait queue for takes
90 **/
91
92 protected final Object takeGuard_ = new Object();
93
94
95 /** Number of elements allowed **/
96 protected int capacity_;
97
98
99 /**
100 * One side of a split permit count.
101 * The counts represent permits to do a put. (The queue is full when zero).
102 * Invariant: putSidePutPermits_ + takeSidePutPermits_ = capacity_ - length.
103 * (The length is never separately recorded, so this cannot be
104 * checked explicitly.)
105 * To minimize contention between puts and takes, the
106 * put side uses up all of its permits before transfering them from
107 * the take side. The take side just increments the count upon each take.
108 * Thus, most puts and take can run independently of each other unless
109 * the queue is empty or full.
110 * Initial value is queue capacity.
111 **/
112
113 protected int putSidePutPermits_;
114
115 /** Number of takes since last reconcile **/
116 protected int takeSidePutPermits_ = 0;
117
118
119 /**
120 * Create a queue with the given capacity
121 * @exception IllegalArgumentException if capacity less or equal to zero
122 **/
123 public BoundedLinkedQueue(int capacity) {
124 if (capacity <= 0) throw new IllegalArgumentException();
125 capacity_ = capacity;
126 putSidePutPermits_ = capacity;
127 head_ = new LinkedNode(null);
128 last_ = head_;
129 }
130
131 /**
132 * Create a queue with the current default capacity
133 **/
134
135 public BoundedLinkedQueue() {
136 this(DefaultChannelCapacity.get());
137 }
138
139 /**
140 * Move put permits from take side to put side;
141 * return the number of put side permits that are available.
142 * Call only under synch on puGuard_ AND this.
143 **/
144 protected final int reconcilePutPermits() {
145 logger.debug("reconcilePutPermits() - start");
146
147 putSidePutPermits_ += takeSidePutPermits_;
148 takeSidePutPermits_ = 0;
149 return putSidePutPermits_;
150 }
151
152
153 /** Return the current capacity of this queue **/
154 public synchronized int capacity() {
155 logger.debug("capacity() - start");
156 return capacity_; }
157
158
159 /**
160 * Return the number of elements in the queue.
161 * This is only a snapshot value, that may be in the midst
162 * of changing. The returned value will be unreliable in the presence of
163 * active puts and takes, and should only be used as a heuristic
164 * estimate, for example for resource monitoring purposes.
165 **/
166 public synchronized int size() {
167 logger.debug("size() - start");
168
169 /*
170 This should ideally synch on putGuard_, but
171 doing so would cause it to block waiting for an in-progress
172 put, which might be stuck. So we instead use whatever
173 value of putSidePutPermits_ that we happen to read.
174 */
175 return capacity_ - (takeSidePutPermits_ + putSidePutPermits_);
176 }
177
178
179 /**
180 * Reset the capacity of this queue.
181 * If the new capacity is less than the old capacity,
182 * existing elements are NOT removed, but
183 * incoming puts will not proceed until the number of elements
184 * is less than the new capacity.
185 * @exception IllegalArgumentException if capacity less or equal to zero
186 **/
187
188 public void setCapacity(int newCapacity) {
189 logger.debug("setCapacity(newCapacity=" + newCapacity + ") - start");
190
191 if (newCapacity <= 0) throw new IllegalArgumentException();
192 synchronized (putGuard_) {
193 synchronized(this) {
194 takeSidePutPermits_ += (newCapacity - capacity_);
195 capacity_ = newCapacity;
196
197 // Force immediate reconcilation.
198 reconcilePutPermits();
199 notifyAll();
200 }
201 }
202 }
203
204
205 /** Main mechanics for take/poll **/
206 protected synchronized Object extract() {
207 logger.debug("extract() - start");
208
209 synchronized(head_) {
210 Object x = null;
211 LinkedNode first = head_.next;
212 if (first != null) {
213 x = first.value;
214 first.value = null;
215 head_ = first;
216 ++takeSidePutPermits_;
217 notify();
218 }
219 return x;
220 }
221 }
222
223 public Object peek() {
224 logger.debug("peek() - start");
225
226 synchronized(head_) {
227 LinkedNode first = head_.next;
228 if (first != null)
229 return first.value;
230 else
231 return null;
232 }
233 }
234
235 public Object take() throws InterruptedException {
236 logger.debug("take() - start");
237
238 if (Thread.interrupted()) throw new InterruptedException();
239 Object x = extract();
240 if (x != null)
241 return x;
242 else {
243 synchronized(takeGuard_) {
244 try {
245 for (;;) {
246 x = extract();
247 if (x != null) {
248 return x;
249 }
250 else {
251 takeGuard_.wait();
252 }
253 }
254 }
255 catch(InterruptedException ex) {
256 takeGuard_.notify();
257 throw ex;
258 }
259 }
260 }
261 }
262
263 public Object poll(long msecs) throws InterruptedException {
264 logger.debug("poll(msecs=" + msecs + ") - start");
265
266 if (Thread.interrupted()) throw new InterruptedException();
267 Object x = extract();
268 if (x != null)
269 return x;
270 else {
271 synchronized(takeGuard_) {
272 try {
273 long waitTime = msecs;
274 long start = (msecs <= 0)? 0: System.currentTimeMillis();
275 for (;;) {
276 x = extract();
277 if (x != null || waitTime <= 0) {
278 return x;
279 }
280 else {
281 takeGuard_.wait(waitTime);
282 waitTime = msecs - (System.currentTimeMillis() - start);
283 }
284 }
285 }
286 catch(InterruptedException ex) {
287 takeGuard_.notify();
288 throw ex;
289 }
290 }
291 }
292 }
293
294 /** Notify a waiting take if needed **/
295 protected final void allowTake() {
296 logger.debug("allowTake() - start");
297
298 synchronized(takeGuard_) {
299 takeGuard_.notify();
300 }
301 }
302
303
304 /**
305 * Create and insert a node.
306 * Call only under synch on putGuard_
307 **/
308 protected void insert(Object x) {
309 logger.debug("insert(x=" + x + ") - start");
310
311 --putSidePutPermits_;
312 LinkedNode p = new LinkedNode(x);
313 synchronized(last_) {
314 last_.next = p;
315 last_ = p;
316 }
317 }
318
319
320 /*
321 put and offer(ms) differ only in policy before insert/allowTake
322 */
323
324 public void put(Object x) throws InterruptedException {
325 logger.debug("put(x=" + x + ") - start");
326
327 if (x == null) throw new IllegalArgumentException();
328 if (Thread.interrupted()) throw new InterruptedException();
329
330 synchronized(putGuard_) {
331
332 if (putSidePutPermits_ <= 0) { // wait for permit.
333 synchronized(this) {
334 if (reconcilePutPermits() <= 0) {
335 try {
336 for(;;) {
337 wait();
338 if (reconcilePutPermits() > 0) {
339 break;
340 }
341 }
342 }
343 catch (InterruptedException ex) {
344 notify();
345 throw ex;
346 }
347 }
348 }
349 }
350 insert(x);
351 }
352 // call outside of lock to loosen put/take coupling
353 allowTake();
354 }
355
356 public boolean offer(Object x, long msecs) throws InterruptedException {
357 logger.debug("offer(x=" + x + ", msecs=" + msecs + ") - start");
358
359 if (x == null) throw new IllegalArgumentException();
360 if (Thread.interrupted()) throw new InterruptedException();
361
362 synchronized(putGuard_) {
363
364 if (putSidePutPermits_ <= 0) {
365 synchronized(this) {
366 if (reconcilePutPermits() <= 0) {
367 if (msecs <= 0)
368 return false;
369 else {
370 try {
371 long waitTime = msecs;
372 long start = System.currentTimeMillis();
373
374 for(;;) {
375 wait(waitTime);
376 if (reconcilePutPermits() > 0) {
377 break;
378 }
379 else {
380 waitTime = msecs - (System.currentTimeMillis() - start);
381 if (waitTime <= 0) {
382 return false;
383 }
384 }
385 }
386 }
387 catch (InterruptedException ex) {
388 notify();
389 throw ex;
390 }
391 }
392 }
393 }
394 }
395
396 insert(x);
397 }
398
399 allowTake();
400 return true;
401 }
402
403 public boolean isEmpty() {
404 logger.debug("isEmpty() - start");
405
406 synchronized(head_) {
407 return head_.next == null;
408 }
409 }
410
411 }