1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.dbunit.dataset.stream;
22
23 import org.dbunit.dataset.AbstractTable;
24 import org.dbunit.dataset.DataSetException;
25 import org.dbunit.dataset.ITable;
26 import org.dbunit.dataset.ITableIterator;
27 import org.dbunit.dataset.ITableMetaData;
28 import org.dbunit.dataset.RowOutOfBoundsException;
29 import org.dbunit.util.concurrent.BoundedBuffer;
30 import org.dbunit.util.concurrent.Channel;
31 import org.dbunit.util.concurrent.Puttable;
32 import org.dbunit.util.concurrent.Takable;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36
37
38
39
40
41
42
43
44 public class StreamingIterator implements ITableIterator
45 {
46
47
48
49
50 private static final Logger logger = LoggerFactory.getLogger(StreamingIterator.class);
51
52 private static final Object EOD = new Object();
53
54 private final Takable _channel;
55 private StreamingTable _activeTable;
56 private Object _taken = null;
57 private boolean _eod = false;
58
59
60
61 private Exception _asyncException;
62
63
64
65
66
67
68
69
70
71 public StreamingIterator(IDataSetProducer source) throws DataSetException
72 {
73 Channel channel = new BoundedBuffer(30);
74 _channel = channel;
75
76 AsynchronousConsumer consumer = new AsynchronousConsumer(source, channel, this);
77 Thread thread = new Thread(consumer, "StreamingIterator");
78 thread.setDaemon(true);
79 thread.start();
80
81
82 try
83 {
84 _taken = _channel.take();
85 }
86 catch (InterruptedException e)
87 {
88 logger.debug("Thread '" + Thread.currentThread() + "' was interrupted");
89 throw resolveException(e);
90 }
91 }
92
93 private DataSetException resolveException(InterruptedException cause) throws DataSetException
94 {
95 String msg = "Current thread was interrupted (Thread=" + Thread.currentThread() + ")";
96 if(this._asyncException != null)
97 {
98 return new DataSetException(msg, this._asyncException);
99 }
100 else
101 {
102 return new DataSetException(msg, cause);
103 }
104 }
105
106
107
108
109 public boolean next() throws DataSetException
110 {
111 logger.debug("next() - start");
112
113
114 if (_eod)
115 {
116 return false;
117 }
118
119
120 while (_activeTable != null && _activeTable.next())
121 ;
122
123
124 if (_taken == EOD)
125 {
126 _eod = true;
127 _activeTable = null;
128
129 logger.debug("End of iterator.");
130 return false;
131 }
132
133
134 if (_taken instanceof ITableMetaData)
135 {
136 _activeTable = new StreamingTable((ITableMetaData)_taken);
137 return true;
138 }
139
140 throw new IllegalStateException(
141 "Unexpected object taken from asyncronous handler: " + _taken);
142 }
143
144 public ITableMetaData getTableMetaData() throws DataSetException
145 {
146 logger.debug("getTableMetaData() - start");
147
148 return _activeTable.getTableMetaData();
149 }
150
151 public ITable getTable() throws DataSetException
152 {
153 logger.debug("getTable() - start");
154
155 return _activeTable;
156 }
157
158 private void handleException(Exception e)
159 {
160
161 this._asyncException = e;
162 }
163
164
165
166
167 private class StreamingTable extends AbstractTable
168 {
169
170
171
172
173 private final Logger logger = LoggerFactory.getLogger(StreamingTable.class);
174
175 private ITableMetaData _metaData;
176 private int _lastRow = -1;
177 private boolean _eot = false;
178 private Object[] _rowValues;
179
180 public StreamingTable(ITableMetaData metaData)
181 {
182 _metaData = metaData;
183 }
184
185 boolean next() throws DataSetException
186 {
187 logger.debug("next() - start");
188
189
190 if (_eot)
191 {
192 return false;
193 }
194
195 try
196 {
197 _taken = _channel.take();
198 if (!(_taken instanceof Object[]))
199 {
200 _eot = true;
201 return false;
202 }
203
204 _lastRow++;
205 _rowValues = (Object[])_taken;
206 return true;
207 }
208 catch (InterruptedException e)
209 {
210 throw resolveException(e);
211 }
212 }
213
214
215
216
217 public ITableMetaData getTableMetaData()
218 {
219 logger.debug("getTableMetaData() - start");
220
221 return _metaData;
222 }
223
224 public int getRowCount()
225 {
226 logger.debug("getRowCount() - start");
227
228 throw new UnsupportedOperationException();
229 }
230
231 public Object getValue(int row, String columnName) throws DataSetException
232 {
233 if(logger.isDebugEnabled())
234 logger.debug("getValue(row={}, columnName={}) - start", Integer.toString(row), columnName);
235
236
237 while (!_eot && row > _lastRow)
238 {
239 next();
240 }
241
242 if (row < _lastRow)
243 {
244 throw new UnsupportedOperationException("Cannot go backward!");
245 }
246
247 if (_eot || row > _lastRow)
248 {
249 throw new RowOutOfBoundsException(row + " > " + _lastRow);
250 }
251
252 return _rowValues[getColumnIndex(columnName)];
253 }
254
255 }
256
257
258
259
260 private static class AsynchronousConsumer implements Runnable, IDataSetConsumer
261 {
262
263
264
265
266 private static final Logger logger = LoggerFactory.getLogger(AsynchronousConsumer.class);
267
268 private final IDataSetProducer _producer;
269 private final Puttable _channel;
270 private final StreamingIterator _exceptionHandler;
271 private final Thread _invokerThread;
272
273 public AsynchronousConsumer(IDataSetProducer source, Puttable channel, StreamingIterator exceptionHandler)
274 {
275 _producer = source;
276 _channel = channel;
277 _exceptionHandler = exceptionHandler;
278 _invokerThread = Thread.currentThread();
279 }
280
281
282
283
284 public void run()
285 {
286 logger.debug("run() - start");
287
288 try
289 {
290 _producer.setConsumer(this);
291 _producer.produce();
292 }
293 catch (Exception e)
294 {
295 _exceptionHandler.handleException(e);
296
297 _invokerThread.interrupt();
298 }
299
300 logger.debug("End of thread " + Thread.currentThread());
301 }
302
303
304
305
306 public void startDataSet() throws DataSetException
307 {
308 }
309
310 public void endDataSet() throws DataSetException
311 {
312 logger.debug("endDataSet() - start");
313
314 try
315 {
316 _channel.put(EOD);
317 }
318 catch (InterruptedException e)
319 {
320 throw new DataSetException("Operation was interrupted");
321 }
322 }
323
324 public void startTable(ITableMetaData metaData) throws DataSetException
325 {
326 logger.debug("startTable(metaData={}) - start", metaData);
327
328 try
329 {
330 _channel.put(metaData);
331 }
332 catch (InterruptedException e)
333 {
334 throw new DataSetException("Operation was interrupted");
335 }
336 }
337
338 public void endTable() throws DataSetException
339 {
340 }
341
342 public void row(Object[] values) throws DataSetException
343 {
344 logger.debug("row(values={}) - start", values);
345
346 try
347 {
348 _channel.put(values);
349 }
350 catch (InterruptedException e)
351 {
352 throw new DataSetException("Operation was interrupted");
353 }
354 }
355 }
356
357 }