diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/StaticS3FirehoseFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/index/StaticS3FirehoseFactory.java index 3337c3b2edc..69bdfa8d5ab 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/StaticS3FirehoseFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/StaticS3FirehoseFactory.java @@ -33,7 +33,6 @@ import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.StringInputRowParser; import io.druid.segment.realtime.firehose.FileIteratingFirehose; -import io.druid.segment.realtime.firehose.LineIteratorFactory; import org.apache.commons.io.IOUtils; import org.apache.commons.io.LineIterator; import org.jets3t.service.S3Service; @@ -44,6 +43,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.URI; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.zip.GZIPInputStream; @@ -92,12 +93,22 @@ public class StaticS3FirehoseFactory implements FirehoseFactory { Preconditions.checkNotNull(s3Client, "null s3Client"); - return new FileIteratingFirehose( - new LineIteratorFactory() + final LinkedList objectQueue = Lists.newLinkedList(uris); + + return new FileIteratingFirehose( + new Iterator() { @Override - public LineIterator make(URI nextURI) throws Exception + public boolean hasNext() { + return !objectQueue.isEmpty(); + } + + @Override + public LineIterator next() + { + final URI nextURI = objectQueue.poll(); + final String s3Bucket = nextURI.getAuthority(); final S3Object s3Object = new S3Object( nextURI.getPath().startsWith("/") @@ -121,7 +132,7 @@ public class StaticS3FirehoseFactory implements FirehoseFactory ) ); } - catch (IOException e) { + catch (Exception e) { log.error( e, "Exception reading from bucket[%s] object[%s]", @@ -132,8 +143,13 @@ public class StaticS3FirehoseFactory implements FirehoseFactory throw Throwables.propagate(e); } } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } }, - Lists.newLinkedList(uris), parser ); } diff --git a/realtime/src/main/java/io/druid/segment/realtime/firehose/FileIteratingFirehose.java b/realtime/src/main/java/io/druid/segment/realtime/firehose/FileIteratingFirehose.java index 1b1018d29d4..a8a6be14002 100644 --- a/realtime/src/main/java/io/druid/segment/realtime/firehose/FileIteratingFirehose.java +++ b/realtime/src/main/java/io/druid/segment/realtime/firehose/FileIteratingFirehose.java @@ -27,27 +27,25 @@ import io.druid.data.input.StringInputRowParser; import org.apache.commons.io.LineIterator; import java.io.IOException; +import java.util.Iterator; import java.util.NoSuchElementException; import java.util.Queue; /** */ -public class FileIteratingFirehose implements Firehose +public class FileIteratingFirehose implements Firehose { - private final LineIteratorFactory lineIteratorFactory; - private final Queue objectQueue; + private final Iterator lineIterators; private final StringInputRowParser parser; private LineIterator lineIterator = null; public FileIteratingFirehose( - LineIteratorFactory lineIteratorFactory, - Queue objectQueue, + Iterator lineIterators, StringInputRowParser parser ) { - this.lineIteratorFactory = lineIteratorFactory; - this.objectQueue = objectQueue; + this.lineIterators = lineIterators; this.parser = parser; } @@ -55,30 +53,31 @@ public class FileIteratingFirehose implements Firehose public boolean hasMore() { try { - nextFile(); + return lineIterators.hasNext() || (lineIterator != null && lineIterator.hasNext()); } catch (Exception e) { throw Throwables.propagate(e); } - - return lineIterator != null && lineIterator.hasNext(); } @Override public InputRow nextRow() { try { - nextFile(); + if (lineIterator == null || !lineIterator.hasNext()) { + // Close old streams, maybe. + if (lineIterator != null) { + lineIterator.close(); + } + + lineIterator = lineIterators.next(); + } + + return parser.parse(lineIterator.next()); } catch (Exception e) { throw Throwables.propagate(e); } - - if (lineIterator == null) { - throw new NoSuchElementException(); - } - - return parser.parse(lineIterator.next()); } @Override @@ -90,34 +89,8 @@ public class FileIteratingFirehose implements Firehose @Override public void close() throws IOException { - objectQueue.clear(); if (lineIterator != null) { lineIterator.close(); } } - - // Rolls over our streams and iterators to the next file, if appropriate - private void nextFile() throws Exception - { - - if (lineIterator == null || !lineIterator.hasNext()) { - - // Close old streams, maybe. - if (lineIterator != null) { - lineIterator.close(); - } - - // Open new streams, maybe. - final T nextObj = objectQueue.poll(); - if (nextObj != null) { - - try { - lineIterator = lineIteratorFactory.make(nextObj); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - } - } } diff --git a/realtime/src/main/java/io/druid/segment/realtime/firehose/LineIteratorFactory.java b/realtime/src/main/java/io/druid/segment/realtime/firehose/LineIteratorFactory.java deleted file mode 100644 index eb0c6f2d33b..00000000000 --- a/realtime/src/main/java/io/druid/segment/realtime/firehose/LineIteratorFactory.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package io.druid.segment.realtime.firehose; - -import org.apache.commons.io.LineIterator; - -/** - */ -public interface LineIteratorFactory -{ - public LineIterator make(T val) throws Exception; -} diff --git a/realtime/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java b/realtime/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java index 0e5397cf701..eccf17ff206 100644 --- a/realtime/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java +++ b/realtime/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java @@ -21,6 +21,7 @@ package io.druid.segment.realtime.firehose; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.api.client.repackaged.com.google.common.base.Throwables; import com.google.common.collect.Lists; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; @@ -32,6 +33,8 @@ import java.io.File; import java.io.FilenameFilter; import java.io.IOException; import java.util.Arrays; +import java.util.Iterator; +import java.util.LinkedList; /** */ @@ -74,29 +77,47 @@ public class LocalFirehoseFactory implements FirehoseFactory @Override public Firehose connect() throws IOException { - return new FileIteratingFirehose( - new LineIteratorFactory() + final LinkedList files = Lists.newLinkedList( + Arrays.asList( + baseDir.listFiles( + new FilenameFilter() + { + @Override + public boolean accept(File file, String name) + { + return name.contains(filter); + } + } + ) + ) + ); + + return new FileIteratingFirehose( + new Iterator() { @Override - public LineIterator make(File file) throws Exception + public boolean hasNext() { - return FileUtils.lineIterator(file); + return !files.isEmpty(); + } + + @Override + public LineIterator next() + { + try { + return FileUtils.lineIterator(files.poll()); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); } }, - Lists.newLinkedList( - Arrays.asList( - baseDir.listFiles( - new FilenameFilter() - { - @Override - public boolean accept(File file, String name) - { - return name.contains(filter); - } - } - ) - ) - ), parser ); }