mirror of https://github.com/apache/druid.git
remove line iterator factory because it is not needed
This commit is contained in:
parent
5d0d71250b
commit
30df53671e
|
@ -33,7 +33,6 @@ import io.druid.data.input.Firehose;
|
||||||
import io.druid.data.input.FirehoseFactory;
|
import io.druid.data.input.FirehoseFactory;
|
||||||
import io.druid.data.input.StringInputRowParser;
|
import io.druid.data.input.StringInputRowParser;
|
||||||
import io.druid.segment.realtime.firehose.FileIteratingFirehose;
|
import io.druid.segment.realtime.firehose.FileIteratingFirehose;
|
||||||
import io.druid.segment.realtime.firehose.LineIteratorFactory;
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.commons.io.LineIterator;
|
import org.apache.commons.io.LineIterator;
|
||||||
import org.jets3t.service.S3Service;
|
import org.jets3t.service.S3Service;
|
||||||
|
@ -44,6 +43,8 @@ import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.zip.GZIPInputStream;
|
import java.util.zip.GZIPInputStream;
|
||||||
|
|
||||||
|
@ -92,12 +93,22 @@ public class StaticS3FirehoseFactory implements FirehoseFactory
|
||||||
{
|
{
|
||||||
Preconditions.checkNotNull(s3Client, "null s3Client");
|
Preconditions.checkNotNull(s3Client, "null s3Client");
|
||||||
|
|
||||||
return new FileIteratingFirehose<URI>(
|
final LinkedList<URI> objectQueue = Lists.newLinkedList(uris);
|
||||||
new LineIteratorFactory<URI>()
|
|
||||||
|
return new FileIteratingFirehose(
|
||||||
|
new Iterator<LineIterator>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public LineIterator make(URI nextURI) throws Exception
|
public boolean hasNext()
|
||||||
{
|
{
|
||||||
|
return !objectQueue.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LineIterator next()
|
||||||
|
{
|
||||||
|
final URI nextURI = objectQueue.poll();
|
||||||
|
|
||||||
final String s3Bucket = nextURI.getAuthority();
|
final String s3Bucket = nextURI.getAuthority();
|
||||||
final S3Object s3Object = new S3Object(
|
final S3Object s3Object = new S3Object(
|
||||||
nextURI.getPath().startsWith("/")
|
nextURI.getPath().startsWith("/")
|
||||||
|
@ -121,7 +132,7 @@ public class StaticS3FirehoseFactory implements FirehoseFactory
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
catch (IOException e) {
|
catch (Exception e) {
|
||||||
log.error(
|
log.error(
|
||||||
e,
|
e,
|
||||||
"Exception reading from bucket[%s] object[%s]",
|
"Exception reading from bucket[%s] object[%s]",
|
||||||
|
@ -132,8 +143,13 @@ public class StaticS3FirehoseFactory implements FirehoseFactory
|
||||||
throw Throwables.propagate(e);
|
throw Throwables.propagate(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remove()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
},
|
},
|
||||||
Lists.newLinkedList(uris),
|
|
||||||
parser
|
parser
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,27 +27,25 @@ import io.druid.data.input.StringInputRowParser;
|
||||||
import org.apache.commons.io.LineIterator;
|
import org.apache.commons.io.LineIterator;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.NoSuchElementException;
|
import java.util.NoSuchElementException;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
public class FileIteratingFirehose<T> implements Firehose
|
public class FileIteratingFirehose implements Firehose
|
||||||
{
|
{
|
||||||
private final LineIteratorFactory<T> lineIteratorFactory;
|
private final Iterator<LineIterator> lineIterators;
|
||||||
private final Queue<T> objectQueue;
|
|
||||||
private final StringInputRowParser parser;
|
private final StringInputRowParser parser;
|
||||||
|
|
||||||
private LineIterator lineIterator = null;
|
private LineIterator lineIterator = null;
|
||||||
|
|
||||||
public FileIteratingFirehose(
|
public FileIteratingFirehose(
|
||||||
LineIteratorFactory lineIteratorFactory,
|
Iterator<LineIterator> lineIterators,
|
||||||
Queue<T> objectQueue,
|
|
||||||
StringInputRowParser parser
|
StringInputRowParser parser
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.lineIteratorFactory = lineIteratorFactory;
|
this.lineIterators = lineIterators;
|
||||||
this.objectQueue = objectQueue;
|
|
||||||
this.parser = parser;
|
this.parser = parser;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -55,30 +53,31 @@ public class FileIteratingFirehose<T> implements Firehose
|
||||||
public boolean hasMore()
|
public boolean hasMore()
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
nextFile();
|
return lineIterators.hasNext() || (lineIterator != null && lineIterator.hasNext());
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
throw Throwables.propagate(e);
|
throw Throwables.propagate(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
return lineIterator != null && lineIterator.hasNext();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public InputRow nextRow()
|
public InputRow nextRow()
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
nextFile();
|
if (lineIterator == null || !lineIterator.hasNext()) {
|
||||||
|
// Close old streams, maybe.
|
||||||
|
if (lineIterator != null) {
|
||||||
|
lineIterator.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
lineIterator = lineIterators.next();
|
||||||
|
}
|
||||||
|
|
||||||
|
return parser.parse(lineIterator.next());
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
throw Throwables.propagate(e);
|
throw Throwables.propagate(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (lineIterator == null) {
|
|
||||||
throw new NoSuchElementException();
|
|
||||||
}
|
|
||||||
|
|
||||||
return parser.parse(lineIterator.next());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -90,34 +89,8 @@ public class FileIteratingFirehose<T> implements Firehose
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException
|
public void close() throws IOException
|
||||||
{
|
{
|
||||||
objectQueue.clear();
|
|
||||||
if (lineIterator != null) {
|
if (lineIterator != null) {
|
||||||
lineIterator.close();
|
lineIterator.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Rolls over our streams and iterators to the next file, if appropriate
|
|
||||||
private void nextFile() throws Exception
|
|
||||||
{
|
|
||||||
|
|
||||||
if (lineIterator == null || !lineIterator.hasNext()) {
|
|
||||||
|
|
||||||
// Close old streams, maybe.
|
|
||||||
if (lineIterator != null) {
|
|
||||||
lineIterator.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Open new streams, maybe.
|
|
||||||
final T nextObj = objectQueue.poll();
|
|
||||||
if (nextObj != null) {
|
|
||||||
|
|
||||||
try {
|
|
||||||
lineIterator = lineIteratorFactory.make(nextObj);
|
|
||||||
}
|
|
||||||
catch (Exception e) {
|
|
||||||
throw Throwables.propagate(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,29 +0,0 @@
|
||||||
/*
|
|
||||||
* Druid - a distributed column store.
|
|
||||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
|
||||||
*
|
|
||||||
* This program is free software; you can redistribute it and/or
|
|
||||||
* modify it under the terms of the GNU General Public License
|
|
||||||
* as published by the Free Software Foundation; either version 2
|
|
||||||
* of the License, or (at your option) any later version.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful,
|
|
||||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
* GNU General Public License for more details.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU General Public License
|
|
||||||
* along with this program; if not, write to the Free Software
|
|
||||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package io.druid.segment.realtime.firehose;
|
|
||||||
|
|
||||||
import org.apache.commons.io.LineIterator;
|
|
||||||
|
|
||||||
/**
|
|
||||||
*/
|
|
||||||
public interface LineIteratorFactory<T>
|
|
||||||
{
|
|
||||||
public LineIterator make(T val) throws Exception;
|
|
||||||
}
|
|
|
@ -21,6 +21,7 @@ package io.druid.segment.realtime.firehose;
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import com.google.api.client.repackaged.com.google.common.base.Throwables;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import io.druid.data.input.Firehose;
|
import io.druid.data.input.Firehose;
|
||||||
import io.druid.data.input.FirehoseFactory;
|
import io.druid.data.input.FirehoseFactory;
|
||||||
|
@ -32,6 +33,8 @@ import java.io.File;
|
||||||
import java.io.FilenameFilter;
|
import java.io.FilenameFilter;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
|
@ -74,29 +77,47 @@ public class LocalFirehoseFactory implements FirehoseFactory
|
||||||
@Override
|
@Override
|
||||||
public Firehose connect() throws IOException
|
public Firehose connect() throws IOException
|
||||||
{
|
{
|
||||||
return new FileIteratingFirehose<File>(
|
final LinkedList<File> files = Lists.<File>newLinkedList(
|
||||||
new LineIteratorFactory<File>()
|
Arrays.<File>asList(
|
||||||
|
baseDir.listFiles(
|
||||||
|
new FilenameFilter()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public boolean accept(File file, String name)
|
||||||
|
{
|
||||||
|
return name.contains(filter);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
return new FileIteratingFirehose(
|
||||||
|
new Iterator<LineIterator>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public LineIterator make(File file) throws Exception
|
public boolean hasNext()
|
||||||
{
|
{
|
||||||
return FileUtils.lineIterator(file);
|
return !files.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LineIterator next()
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
return FileUtils.lineIterator(files.poll());
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remove()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Lists.<File>newLinkedList(
|
|
||||||
Arrays.<File>asList(
|
|
||||||
baseDir.listFiles(
|
|
||||||
new FilenameFilter()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public boolean accept(File file, String name)
|
|
||||||
{
|
|
||||||
return name.contains(filter);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
)
|
|
||||||
)
|
|
||||||
),
|
|
||||||
parser
|
parser
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue