1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.dbunit.util.concurrent;
18
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39 public class LinkedQueue implements Channel {
40
41
42
43
44 private static final Logger logger = LoggerFactory.getLogger(LinkedQueue.class);
45
46
47
48
49
50
51 protected LinkedNode head_;
52
53
54
55
56 protected final Object putLock_ = new Object();
57
58
59
60
61 protected LinkedNode last_;
62
63
64
65
66
67
68
69
70 protected int waitingForTake_ = 0;
71
72 public LinkedQueue() {
73 head_ = new LinkedNode(null);
74 last_ = head_;
75 }
76
77
78 protected void insert(Object x) {
79 logger.debug("insert(x=" + x + ") - start");
80
81 synchronized(putLock_) {
82 LinkedNode p = new LinkedNode(x);
83 synchronized(last_) {
84 last_.next = p;
85 last_ = p;
86 }
87 if (waitingForTake_ > 0)
88 putLock_.notify();
89 }
90 }
91
92
93 protected synchronized Object extract() {
94 logger.debug("extract() - start");
95
96 synchronized(head_) {
97 Object x = null;
98 LinkedNode first = head_.next;
99 if (first != null) {
100 x = first.value;
101 first.value = null;
102 head_ = first;
103 }
104 return x;
105 }
106 }
107
108
109 public void put(Object x) throws InterruptedException {
110 logger.debug("put(x=" + x + ") - start");
111
112 if (x == null) throw new IllegalArgumentException();
113 if (Thread.interrupted()) throw new InterruptedException();
114 insert(x);
115 }
116
117 public boolean offer(Object x, long msecs) throws InterruptedException {
118 logger.debug("offer(x=" + x + ", msecs=" + msecs + ") - start");
119
120 if (x == null) throw new IllegalArgumentException();
121 if (Thread.interrupted()) throw new InterruptedException();
122 insert(x);
123 return true;
124 }
125
126 public Object take() throws InterruptedException {
127 logger.debug("take() - start");
128
129 if (Thread.interrupted()) throw new InterruptedException();
130
131 Object x = extract();
132 if (x != null)
133 return x;
134 else {
135 synchronized(putLock_) {
136 try {
137 ++waitingForTake_;
138 for (;;) {
139 x = extract();
140 if (x != null) {
141 --waitingForTake_;
142 return x;
143 }
144 else {
145 putLock_.wait();
146 }
147 }
148 }
149 catch(InterruptedException ex) {
150 --waitingForTake_;
151 putLock_.notify();
152 throw ex;
153 }
154 }
155 }
156 }
157
158 public Object peek() {
159 logger.debug("peek() - start");
160
161 synchronized(head_) {
162 LinkedNode first = head_.next;
163 if (first != null)
164 return first.value;
165 else
166 return null;
167 }
168 }
169
170
171 public boolean isEmpty() {
172 logger.debug("isEmpty() - start");
173
174 synchronized(head_) {
175 return head_.next == null;
176 }
177 }
178
179 public Object poll(long msecs) throws InterruptedException {
180 logger.debug("poll(msecs=" + msecs + ") - start");
181
182 if (Thread.interrupted()) throw new InterruptedException();
183 Object x = extract();
184 if (x != null)
185 return x;
186 else {
187 synchronized(putLock_) {
188 try {
189 long waitTime = msecs;
190 long start = (msecs <= 0)? 0 : System.currentTimeMillis();
191 ++waitingForTake_;
192 for (;;) {
193 x = extract();
194 if (x != null || waitTime <= 0) {
195 --waitingForTake_;
196 return x;
197 }
198 else {
199 putLock_.wait(waitTime);
200 waitTime = msecs - (System.currentTimeMillis() - start);
201 }
202 }
203 }
204 catch(InterruptedException ex) {
205 --waitingForTake_;
206 putLock_.notify();
207 throw ex;
208 }
209 }
210 }
211 }
212 }
213
214