add interface to new firehose as per code review comments

This commit is contained in:
fjy 2013-09-30 18:00:59 -07:00
parent f55a5199b1
commit 53698a135a
6 changed files with 102 additions and 62 deletions

View File

@ -32,6 +32,8 @@ import com.metamx.common.logger.Logger;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.StringInputRowParser;
import io.druid.segment.realtime.firehose.FileIteratingFirehose;
import io.druid.segment.realtime.firehose.LineIteratorFactory;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator;
import org.jets3t.service.S3Service;
@ -91,47 +93,48 @@ public class StaticS3FirehoseFactory implements FirehoseFactory
Preconditions.checkNotNull(s3Client, "null s3Client");
return new FileIteratingFirehose<URI>(
new LineIteratorFactory<URI>()
{
@Override
public LineIterator make(URI nextURI) throws Exception
{
final String s3Bucket = nextURI.getAuthority();
final S3Object s3Object = new S3Object(
nextURI.getPath().startsWith("/")
? nextURI.getPath().substring(1)
: nextURI.getPath()
);
log.info("Reading from bucket[%s] object[%s] (%s)", s3Bucket, s3Object.getKey(), nextURI);
try {
final InputStream innerInputStream = s3Client.getObject(s3Bucket, s3Object.getKey())
.getDataInputStream();
final InputStream outerInputStream = s3Object.getKey().endsWith(".gz")
? new GZIPInputStream(innerInputStream)
: innerInputStream;
return IOUtils.lineIterator(
new BufferedReader(
new InputStreamReader(outerInputStream, Charsets.UTF_8)
)
);
}
catch (IOException e) {
log.error(
e,
"Exception reading from bucket[%s] object[%s]",
s3Bucket,
s3Object.getKey()
);
throw Throwables.propagate(e);
}
}
},
Lists.newLinkedList(uris),
parser
)
{
@Override
public LineIterator makeLineIterator(URI nextURI) throws Exception
{
final String s3Bucket = nextURI.getAuthority();
final S3Object s3Object = new S3Object(
nextURI.getPath().startsWith("/")
? nextURI.getPath().substring(1)
: nextURI.getPath()
);
log.info("Reading from bucket[%s] object[%s] (%s)", s3Bucket, s3Object.getKey(), nextURI);
try {
final InputStream innerInputStream = s3Client.getObject(s3Bucket, s3Object.getKey())
.getDataInputStream();
final InputStream outerInputStream = s3Object.getKey().endsWith(".gz")
? new GZIPInputStream(innerInputStream)
: innerInputStream;
return IOUtils.lineIterator(
new BufferedReader(
new InputStreamReader(outerInputStream, Charsets.UTF_8)
)
);
}
catch (IOException e) {
log.error(
e,
"Exception reading from bucket[%s] object[%s]",
s3Bucket,
s3Object.getKey()
);
throw Throwables.propagate(e);
}
}
};
);
}
}

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.index;
package io.druid.segment.realtime.firehose;
import com.google.common.base.Throwables;
import io.druid.common.guava.Runnables;
@ -32,27 +32,32 @@ import java.util.Queue;
/**
*/
public abstract class FileIteratingFirehose<T> implements Firehose
public class FileIteratingFirehose<T> implements Firehose
{
private final LineIteratorFactory<T> lineIteratorFactory;
private final Queue<T> objectQueue;
private final StringInputRowParser parser;
private LineIterator lineIterator = null;
protected FileIteratingFirehose(Queue<T> objectQueue, StringInputRowParser parser)
public FileIteratingFirehose(
LineIteratorFactory lineIteratorFactory,
Queue<T> objectQueue,
StringInputRowParser parser
)
{
this.lineIteratorFactory = lineIteratorFactory;
this.objectQueue = objectQueue;
this.parser = parser;
}
public abstract LineIterator makeLineIterator(T val) throws Exception;
@Override
public boolean hasMore()
{
try {
nextFile();
} catch(Exception e) {
}
catch (Exception e) {
throw Throwables.propagate(e);
}
@ -64,11 +69,12 @@ public abstract class FileIteratingFirehose<T> implements Firehose
{
try {
nextFile();
} catch(Exception e) {
}
catch (Exception e) {
throw Throwables.propagate(e);
}
if(lineIterator == null) {
if (lineIterator == null) {
throw new NoSuchElementException();
}
@ -85,7 +91,7 @@ public abstract class FileIteratingFirehose<T> implements Firehose
public void close() throws IOException
{
objectQueue.clear();
if(lineIterator != null) {
if (lineIterator != null) {
lineIterator.close();
}
}
@ -106,8 +112,9 @@ public abstract class FileIteratingFirehose<T> implements Firehose
if (nextObj != null) {
try {
lineIterator = makeLineIterator(nextObj);
} catch (Exception e) {
lineIterator = lineIteratorFactory.make(nextObj);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}

View File

@ -0,0 +1,29 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.realtime.firehose;
import org.apache.commons.io.LineIterator;
/**
*/
public interface LineIteratorFactory<T>
{
public LineIterator make(T val) throws Exception;
}

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.index;
package io.druid.segment.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
@ -75,6 +75,14 @@ public class LocalFirehoseFactory implements FirehoseFactory
public Firehose connect() throws IOException
{
return new FileIteratingFirehose<File>(
new LineIteratorFactory<File>()
{
@Override
public LineIterator make(File file) throws Exception
{
return FileUtils.lineIterator(file);
}
},
Lists.<File>newLinkedList(
Arrays.<File>asList(
baseDir.listFiles(
@ -90,13 +98,6 @@ public class LocalFirehoseFactory implements FirehoseFactory
)
),
parser
)
{
@Override
public LineIterator makeLineIterator(File file) throws Exception
{
return FileUtils.lineIterator(file);
}
};
);
}
}

View File

@ -53,7 +53,7 @@ import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.index.ChatHandlerProvider;
import io.druid.indexing.common.index.EventReceiverFirehoseFactory;
import io.druid.indexing.common.index.EventReceivingChatHandlerProvider;
import io.druid.indexing.common.index.LocalFirehoseFactory;
import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
import io.druid.indexing.common.index.NoopChatHandlerProvider;
import io.druid.indexing.common.index.StaticS3FirehoseFactory;
import io.druid.indexing.coordinator.TaskRunner;

View File

@ -28,7 +28,7 @@ import druid.examples.rand.RandomFirehoseFactory;
import druid.examples.twitter.TwitterSpritzerFirehoseFactory;
import druid.examples.web.WebFirehoseFactory;
import io.druid.indexing.common.index.EventReceiverFirehoseFactory;
import io.druid.indexing.common.index.LocalFirehoseFactory;
import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
import io.druid.indexing.common.index.StaticS3FirehoseFactory;
import io.druid.initialization.DruidModule;
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;