View Javadoc

1   /*
2    *
3    * The DbUnit Database Testing Framework
4    * Copyright (C)2002-2004, DbUnit.org
5    *
6    * This library is free software; you can redistribute it and/or
7    * modify it under the terms of the GNU Lesser General Public
8    * License as published by the Free Software Foundation; either
9    * version 2.1 of the License, or (at your option) any later version.
10   *
11   * This library is distributed in the hope that it will be useful,
12   * but WITHOUT ANY WARRANTY; without even the implied warranty of
13   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14   * Lesser General Public License for more details.
15   *
16   * You should have received a copy of the GNU Lesser General Public
17   * License along with this library; if not, write to the Free Software
18   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19   *
20   */
21  package org.dbunit.dataset.stream;
22  
23  import org.dbunit.dataset.AbstractTable;
24  import org.dbunit.dataset.DataSetException;
25  import org.dbunit.dataset.ITable;
26  import org.dbunit.dataset.ITableIterator;
27  import org.dbunit.dataset.ITableMetaData;
28  import org.dbunit.dataset.RowOutOfBoundsException;
29  import org.dbunit.util.concurrent.BoundedBuffer;
30  import org.dbunit.util.concurrent.Channel;
31  import org.dbunit.util.concurrent.Puttable;
32  import org.dbunit.util.concurrent.Takable;
33  import org.slf4j.Logger;
34  import org.slf4j.LoggerFactory;
35  
36  /**
37   * Asynchronous table iterator that uses a new Thread for asynchronous processing.
38   * 
39   * @author Manuel Laflamme
40   * @author Last changed by: $Author: gommma $
41   * @version $Revision: 780 $ $Date: 2008-08-10 23:00:11 +0200 (dom, 10 ago 2008) $
42   * @since Apr 17, 2003
43   */
44  public class StreamingIterator implements ITableIterator
45  {
46  
47      /**
48       * Logger for this class
49       */
50      private static final Logger logger = LoggerFactory.getLogger(StreamingIterator.class);
51  
52      private static final Object EOD = new Object(); // end of dataset marker
53  
54      private final Takable _channel;
55      private StreamingTable _activeTable;
56      private Object _taken = null;
57      private boolean _eod = false;
58      /**
59       * Variable to store an exception that might occur in the asynchronous consumer
60       */
61  	private Exception _asyncException;
62  
63  	
64      /**
65       * Iterator that creates a table iterator by reading the input from
66       * the given source in an asynchronous way. Therefore a Thread is
67       * created.
68       * @param source The source of the data
69       * @throws DataSetException
70       */
71      public StreamingIterator(IDataSetProducer source) throws DataSetException
72      {
73          Channel channel = new BoundedBuffer(30);
74          _channel = channel;
75  
76          AsynchronousConsumer consumer = new AsynchronousConsumer(source, channel, this);
77          Thread thread = new Thread(consumer, "StreamingIterator");
78          thread.setDaemon(true);
79          thread.start();
80  
81          // Take first element from asynchronous handler
82          try
83          {
84              _taken = _channel.take();
85          }
86          catch (InterruptedException e)
87          {
88          	logger.debug("Thread '" + Thread.currentThread() + "' was interrupted");
89          	throw resolveException(e);
90          }
91      }
92  
93      private DataSetException resolveException(InterruptedException cause) throws DataSetException 
94      {
95      	String msg = "Current thread was interrupted (Thread=" + Thread.currentThread() + ")";
96      	if(this._asyncException != null)
97      	{
98              return new DataSetException(msg, this._asyncException);
99      	}
100     	else 
101     	{
102     		return new DataSetException(msg, cause);
103     	}
104 	}
105 
106 	////////////////////////////////////////////////////////////////////////////
107     // ITableIterator interface
108 
109     public boolean next() throws DataSetException
110     {
111         logger.debug("next() - start");
112 
113         // End of dataset has previously been reach
114         if (_eod)
115         {
116             return false;
117         }
118 
119         // Iterate to the end of current table.
120         while (_activeTable != null && _activeTable.next())
121             ;
122 
123         // End of dataset is reach
124         if (_taken == EOD)
125         {
126             _eod = true;
127             _activeTable = null;
128 
129             logger.debug("End of iterator.");
130             return false;
131         }
132 
133         // New table
134         if (_taken instanceof ITableMetaData)
135         {
136             _activeTable = new StreamingTable((ITableMetaData)_taken);
137             return true;
138         }
139 
140         throw new IllegalStateException(
141                 "Unexpected object taken from asyncronous handler: " + _taken);
142     }
143 
144     public ITableMetaData getTableMetaData() throws DataSetException
145     {
146         logger.debug("getTableMetaData() - start");
147 
148         return _activeTable.getTableMetaData();
149     }
150 
151     public ITable getTable() throws DataSetException
152     {
153         logger.debug("getTable() - start");
154 
155         return _activeTable;
156     }
157 
158 	private void handleException(Exception e)
159 	{
160 		// Is invoked when the asynchronous thread reports an exception
161 		this._asyncException = e;
162 	}
163 
164     ////////////////////////////////////////////////////////////////////////////
165     // StreamingTable class
166 
167     private class StreamingTable extends AbstractTable
168     {
169 
170         /**
171          * Logger for this class
172          */
173         private final Logger logger = LoggerFactory.getLogger(StreamingTable.class);
174 
175         private ITableMetaData _metaData;
176         private int _lastRow = -1;
177         private boolean _eot = false;
178         private Object[] _rowValues;
179 
180         public StreamingTable(ITableMetaData metaData)
181         {
182             _metaData = metaData;
183         }
184 
185         boolean next() throws DataSetException
186         {
187             logger.debug("next() - start");
188 
189             // End of table has previously been reach
190             if (_eot)
191             {
192                 return false;
193             }
194 
195             try
196             {
197                 _taken = _channel.take();
198                 if (!(_taken instanceof Object[]))
199                 {
200                     _eot = true;
201                     return false;
202                 }
203 
204                 _lastRow++;
205                 _rowValues = (Object[])_taken;
206                 return true;
207             }
208             catch (InterruptedException e)
209             {
210             	throw resolveException(e);
211             }
212         }
213 
214         ////////////////////////////////////////////////////////////////////////
215         // ITable interface
216 
217         public ITableMetaData getTableMetaData()
218         {
219             logger.debug("getTableMetaData() - start");
220 
221             return _metaData;
222         }
223 
224         public int getRowCount()
225         {
226             logger.debug("getRowCount() - start");
227 
228             throw new UnsupportedOperationException();
229         }
230 
231         public Object getValue(int row, String columnName) throws DataSetException
232         {
233             if(logger.isDebugEnabled())
234                 logger.debug("getValue(row={}, columnName={}) - start", Integer.toString(row), columnName);
235 
236             // Iterate up to specified row
237             while (!_eot && row > _lastRow)
238             {
239                 next();
240             }
241 
242             if (row < _lastRow)
243             {
244                 throw new UnsupportedOperationException("Cannot go backward!");
245             }
246 
247             if (_eot || row > _lastRow)
248             {
249                 throw new RowOutOfBoundsException(row + " > " + _lastRow);
250             }
251 
252             return _rowValues[getColumnIndex(columnName)];
253         }
254 
255     }
256 
257     ////////////////////////////////////////////////////////////////////////////
258     // AsynchronousConsumer class
259 
260     private static class AsynchronousConsumer implements Runnable, IDataSetConsumer
261     {
262 
263         /**
264          * Logger for this class
265          */
266         private static final Logger logger = LoggerFactory.getLogger(AsynchronousConsumer.class);
267 
268         private final IDataSetProducer _producer;
269         private final Puttable _channel;
270         private final StreamingIterator _exceptionHandler;
271         private final Thread _invokerThread;
272 
273         public AsynchronousConsumer(IDataSetProducer source, Puttable channel, StreamingIterator exceptionHandler)
274         {
275             _producer = source;
276             _channel = channel;
277             _exceptionHandler = exceptionHandler;
278             _invokerThread = Thread.currentThread();
279         }
280 
281         ////////////////////////////////////////////////////////////////////////
282         // Runnable interface
283 
284         public void run()
285         {
286             logger.debug("run() - start");
287 
288             try
289             {
290                 _producer.setConsumer(this);
291                 _producer.produce();
292             }
293             catch (Exception e)
294             {
295             	_exceptionHandler.handleException(e);
296             	// Since the invoker thread probably waits tell it that we have finished here
297             	_invokerThread.interrupt();
298             }
299             
300             logger.debug("End of thread " + Thread.currentThread());
301         }
302 
303         ////////////////////////////////////////////////////////////////////////
304         // IDataSetConsumer interface
305 
306         public void startDataSet() throws DataSetException
307         {
308         }
309 
310         public void endDataSet() throws DataSetException
311         {
312             logger.debug("endDataSet() - start");
313 
314             try
315             {
316                 _channel.put(EOD);
317             }
318             catch (InterruptedException e)
319             {
320                 throw new DataSetException("Operation was interrupted");
321             }
322         }
323 
324         public void startTable(ITableMetaData metaData) throws DataSetException
325         {
326             logger.debug("startTable(metaData={}) - start", metaData);
327 
328             try
329             {
330                 _channel.put(metaData);
331             }
332             catch (InterruptedException e)
333             {
334                 throw new DataSetException("Operation was interrupted");
335             }
336         }
337 
338         public void endTable() throws DataSetException
339         {
340         }
341 
342         public void row(Object[] values) throws DataSetException
343         {
344             logger.debug("row(values={}) - start", values);
345 
346             try
347             {
348                 _channel.put(values);
349             }
350             catch (InterruptedException e)
351             {
352                 throw new DataSetException("Operation was interrupted");
353             }
354         }
355     }
356 
357 }