From 53698a135a07c755f84b637e547c09fcb1f1d5c9 Mon Sep 17 00:00:00 2001 From: fjy Date: Mon, 30 Sep 2013 18:00:59 -0700 Subject: [PATCH] add interface to new firehose as per code review comments --- .../common/index/StaticS3FirehoseFactory.java | 83 ++++++++++--------- .../firehose}/FileIteratingFirehose.java | 29 ++++--- .../firehose/LineIteratorFactory.java | 29 +++++++ .../firehose}/LocalFirehoseFactory.java | 19 +++-- .../src/main/java/io/druid/cli/CliPeon.java | 2 +- .../java/io/druid/guice/FirehoseModule.java | 2 +- 6 files changed, 102 insertions(+), 62 deletions(-) rename {indexing-service/src/main/java/io/druid/indexing/common/index => realtime/src/main/java/io/druid/segment/realtime/firehose}/FileIteratingFirehose.java (80%) create mode 100644 realtime/src/main/java/io/druid/segment/realtime/firehose/LineIteratorFactory.java rename {indexing-service/src/main/java/io/druid/indexing/common/index => realtime/src/main/java/io/druid/segment/realtime/firehose}/LocalFirehoseFactory.java (90%) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/StaticS3FirehoseFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/index/StaticS3FirehoseFactory.java index 6c099b9683f..3337c3b2edc 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/StaticS3FirehoseFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/StaticS3FirehoseFactory.java @@ -32,6 +32,8 @@ import com.metamx.common.logger.Logger; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.StringInputRowParser; +import io.druid.segment.realtime.firehose.FileIteratingFirehose; +import io.druid.segment.realtime.firehose.LineIteratorFactory; import org.apache.commons.io.IOUtils; import org.apache.commons.io.LineIterator; import org.jets3t.service.S3Service; @@ -91,47 +93,48 @@ public class StaticS3FirehoseFactory implements FirehoseFactory Preconditions.checkNotNull(s3Client, "null s3Client"); return new FileIteratingFirehose( + new LineIteratorFactory() + { + @Override + public LineIterator make(URI nextURI) throws Exception + { + final String s3Bucket = nextURI.getAuthority(); + final S3Object s3Object = new S3Object( + nextURI.getPath().startsWith("/") + ? nextURI.getPath().substring(1) + : nextURI.getPath() + ); + + log.info("Reading from bucket[%s] object[%s] (%s)", s3Bucket, s3Object.getKey(), nextURI); + + try { + final InputStream innerInputStream = s3Client.getObject(s3Bucket, s3Object.getKey()) + .getDataInputStream(); + + final InputStream outerInputStream = s3Object.getKey().endsWith(".gz") + ? new GZIPInputStream(innerInputStream) + : innerInputStream; + + return IOUtils.lineIterator( + new BufferedReader( + new InputStreamReader(outerInputStream, Charsets.UTF_8) + ) + ); + } + catch (IOException e) { + log.error( + e, + "Exception reading from bucket[%s] object[%s]", + s3Bucket, + s3Object.getKey() + ); + + throw Throwables.propagate(e); + } + } + }, Lists.newLinkedList(uris), parser - ) - { - @Override - public LineIterator makeLineIterator(URI nextURI) throws Exception - { - final String s3Bucket = nextURI.getAuthority(); - final S3Object s3Object = new S3Object( - nextURI.getPath().startsWith("/") - ? nextURI.getPath().substring(1) - : nextURI.getPath() - ); - - log.info("Reading from bucket[%s] object[%s] (%s)", s3Bucket, s3Object.getKey(), nextURI); - - try { - final InputStream innerInputStream = s3Client.getObject(s3Bucket, s3Object.getKey()) - .getDataInputStream(); - - final InputStream outerInputStream = s3Object.getKey().endsWith(".gz") - ? new GZIPInputStream(innerInputStream) - : innerInputStream; - - return IOUtils.lineIterator( - new BufferedReader( - new InputStreamReader(outerInputStream, Charsets.UTF_8) - ) - ); - } - catch (IOException e) { - log.error( - e, - "Exception reading from bucket[%s] object[%s]", - s3Bucket, - s3Object.getKey() - ); - - throw Throwables.propagate(e); - } - } - }; + ); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/FileIteratingFirehose.java b/realtime/src/main/java/io/druid/segment/realtime/firehose/FileIteratingFirehose.java similarity index 80% rename from indexing-service/src/main/java/io/druid/indexing/common/index/FileIteratingFirehose.java rename to realtime/src/main/java/io/druid/segment/realtime/firehose/FileIteratingFirehose.java index b6f1070d4bf..1b1018d29d4 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/FileIteratingFirehose.java +++ b/realtime/src/main/java/io/druid/segment/realtime/firehose/FileIteratingFirehose.java @@ -17,7 +17,7 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package io.druid.indexing.common.index; +package io.druid.segment.realtime.firehose; import com.google.common.base.Throwables; import io.druid.common.guava.Runnables; @@ -32,27 +32,32 @@ import java.util.Queue; /** */ -public abstract class FileIteratingFirehose implements Firehose +public class FileIteratingFirehose implements Firehose { + private final LineIteratorFactory lineIteratorFactory; private final Queue objectQueue; private final StringInputRowParser parser; private LineIterator lineIterator = null; - protected FileIteratingFirehose(Queue objectQueue, StringInputRowParser parser) + public FileIteratingFirehose( + LineIteratorFactory lineIteratorFactory, + Queue objectQueue, + StringInputRowParser parser + ) { + this.lineIteratorFactory = lineIteratorFactory; this.objectQueue = objectQueue; this.parser = parser; } - public abstract LineIterator makeLineIterator(T val) throws Exception; - @Override public boolean hasMore() { try { nextFile(); - } catch(Exception e) { + } + catch (Exception e) { throw Throwables.propagate(e); } @@ -64,11 +69,12 @@ public abstract class FileIteratingFirehose implements Firehose { try { nextFile(); - } catch(Exception e) { + } + catch (Exception e) { throw Throwables.propagate(e); } - if(lineIterator == null) { + if (lineIterator == null) { throw new NoSuchElementException(); } @@ -85,7 +91,7 @@ public abstract class FileIteratingFirehose implements Firehose public void close() throws IOException { objectQueue.clear(); - if(lineIterator != null) { + if (lineIterator != null) { lineIterator.close(); } } @@ -106,8 +112,9 @@ public abstract class FileIteratingFirehose implements Firehose if (nextObj != null) { try { - lineIterator = makeLineIterator(nextObj); - } catch (Exception e) { + lineIterator = lineIteratorFactory.make(nextObj); + } + catch (Exception e) { throw Throwables.propagate(e); } } diff --git a/realtime/src/main/java/io/druid/segment/realtime/firehose/LineIteratorFactory.java b/realtime/src/main/java/io/druid/segment/realtime/firehose/LineIteratorFactory.java new file mode 100644 index 00000000000..eb0c6f2d33b --- /dev/null +++ b/realtime/src/main/java/io/druid/segment/realtime/firehose/LineIteratorFactory.java @@ -0,0 +1,29 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.segment.realtime.firehose; + +import org.apache.commons.io.LineIterator; + +/** + */ +public interface LineIteratorFactory +{ + public LineIterator make(T val) throws Exception; +} diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/LocalFirehoseFactory.java b/realtime/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java similarity index 90% rename from indexing-service/src/main/java/io/druid/indexing/common/index/LocalFirehoseFactory.java rename to realtime/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java index 9918073aa6d..0e5397cf701 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/LocalFirehoseFactory.java +++ b/realtime/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java @@ -17,7 +17,7 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package io.druid.indexing.common.index; +package io.druid.segment.realtime.firehose; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -75,6 +75,14 @@ public class LocalFirehoseFactory implements FirehoseFactory public Firehose connect() throws IOException { return new FileIteratingFirehose( + new LineIteratorFactory() + { + @Override + public LineIterator make(File file) throws Exception + { + return FileUtils.lineIterator(file); + } + }, Lists.newLinkedList( Arrays.asList( baseDir.listFiles( @@ -90,13 +98,6 @@ public class LocalFirehoseFactory implements FirehoseFactory ) ), parser - ) - { - @Override - public LineIterator makeLineIterator(File file) throws Exception - { - return FileUtils.lineIterator(file); - } - }; + ); } } diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index 77e140bbd8f..960c0b2fd67 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -53,7 +53,7 @@ import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.index.ChatHandlerProvider; import io.druid.indexing.common.index.EventReceiverFirehoseFactory; import io.druid.indexing.common.index.EventReceivingChatHandlerProvider; -import io.druid.indexing.common.index.LocalFirehoseFactory; +import io.druid.segment.realtime.firehose.LocalFirehoseFactory; import io.druid.indexing.common.index.NoopChatHandlerProvider; import io.druid.indexing.common.index.StaticS3FirehoseFactory; import io.druid.indexing.coordinator.TaskRunner; diff --git a/services/src/main/java/io/druid/guice/FirehoseModule.java b/services/src/main/java/io/druid/guice/FirehoseModule.java index 5026d93e752..8312149f1c1 100644 --- a/services/src/main/java/io/druid/guice/FirehoseModule.java +++ b/services/src/main/java/io/druid/guice/FirehoseModule.java @@ -28,7 +28,7 @@ import druid.examples.rand.RandomFirehoseFactory; import druid.examples.twitter.TwitterSpritzerFirehoseFactory; import druid.examples.web.WebFirehoseFactory; import io.druid.indexing.common.index.EventReceiverFirehoseFactory; -import io.druid.indexing.common.index.LocalFirehoseFactory; +import io.druid.segment.realtime.firehose.LocalFirehoseFactory; import io.druid.indexing.common.index.StaticS3FirehoseFactory; import io.druid.initialization.DruidModule; import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;