diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java index f83453fafe9..f9395165f27 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java @@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.TaskActionClient; -import io.druid.indexing.firehose.IngestTask; import io.druid.query.Query; import io.druid.query.QueryRunner; @@ -54,8 +53,7 @@ import io.druid.query.QueryRunner; @JsonSubTypes.Type(name = "index_realtime", value = RealtimeIndexTask.class), @JsonSubTypes.Type(name = "noop", value = NoopTask.class), @JsonSubTypes.Type(name = "version_converter", value = VersionConverterTask.class), - @JsonSubTypes.Type(name = "version_converter_sub", value = VersionConverterTask.SubTask.class), - @JsonSubTypes.Type(name = "ingest-task", value = IngestTask.class) + @JsonSubTypes.Type(name = "version_converter_sub", value = VersionConverterTask.SubTask.class) }) public interface Task { diff --git a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index 1358d3d48c3..6987a24e97f 100644 --- a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -42,12 +42,10 @@ import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.impl.InputRowParser; import io.druid.granularity.QueryGranularity; -import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.TaskToolboxFactory; import io.druid.indexing.common.actions.SegmentListUsedAction; -import io.druid.indexing.common.actions.TaskActionClient; -import io.druid.indexing.common.task.AbstractTask; +import io.druid.indexing.common.task.NoopTask; import io.druid.query.filter.DimFilter; import io.druid.query.select.EventHolder; import io.druid.segment.Cursor; @@ -139,10 +137,11 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory { + private static final EmittingLogger log = new EmittingLogger(CombiningFirehoseFactory.class); + private final List delegateFactoryList; @JsonCreator @@ -86,10 +89,20 @@ public class CombiningFirehoseFactory implements FirehoseFactory if (currentFirehose != null) { currentFirehose.close(); } + currentFirehose = firehoseFactoryIterator.next().connect(parser); } catch (IOException e) { - Throwables.propagate(e); + if (currentFirehose != null) { + try { + currentFirehose.close(); + } + catch (IOException e2) { + log.error(e, "Unable to close currentFirehose!"); + throw Throwables.propagate(e2); + } + } + throw Throwables.propagate(e); } } } diff --git a/server/src/test/java/io/druid/realtime/firehose/CombiningFirehoseFactoryTest.java b/server/src/test/java/io/druid/realtime/firehose/CombiningFirehoseFactoryTest.java index 210386d2134..3efc191ef3b 100644 --- a/server/src/test/java/io/druid/realtime/firehose/CombiningFirehoseFactoryTest.java +++ b/server/src/test/java/io/druid/realtime/firehose/CombiningFirehoseFactoryTest.java @@ -44,16 +44,15 @@ public class CombiningFirehoseFactoryTest public void testCombiningfirehose() throws IOException { List list1 = Arrays.asList(makeRow(1, 1), makeRow(2, 2)); - List list2 = Arrays.asList(makeRow(3, 3), makeRow(4, 4)); + List list2 = Arrays.asList(makeRow(3, 3), makeRow(4, 4), makeRow(5, 5)); FirehoseFactory combiningFactory = new CombiningFirehoseFactory( Arrays.asList( - new ListFirehoseFactory( - list1 - ), new ListFirehoseFactory(list2) + new ListFirehoseFactory(list1), + new ListFirehoseFactory(list2) ) ); final Firehose firehose = combiningFactory.connect(null); - for (int i = 1; i < 5; i++) { + for (int i = 1; i < 6; i++) { Assert.assertTrue(firehose.hasMore()); final InputRow inputRow = firehose.nextRow(); Assert.assertEquals(i, inputRow.getTimestampFromEpoch()); @@ -84,12 +83,6 @@ public class CombiningFirehoseFactoryTest return new DateTime(timestamp); } - @Override - public int compareTo(Row o) - { - return 0; - } - @Override public List getDimension(String dimension) { @@ -108,6 +101,11 @@ public class CombiningFirehoseFactoryTest return null; } + @Override + public int compareTo(Row o) + { + return 0; + } }; } @@ -147,7 +145,7 @@ public class CombiningFirehoseFactoryTest @Override public void close() throws IOException { - // + // Do nothing } }; }