From ba978d8b795914055ee0737acefe3f3fdb8d39e4 Mon Sep 17 00:00:00 2001 From: fjy Date: Thu, 17 Jul 2014 13:05:59 -0700 Subject: [PATCH] some minor cleanups to ingest firehose --- .../java/io/druid/indexing/common/task/Task.java | 4 ++-- .../firehose/IngestSegmentFirehoseFactory.java | 7 ++----- .../{IngestTask.java => ReIngestTask.java} | 6 +++--- .../firehose/CombiningFirehoseFactory.java | 15 ++++++++++++++- .../firehose/CombiningFirehoseFactoryTest.java | 11 +++++------ 5 files changed, 26 insertions(+), 17 deletions(-) rename indexing-service/src/main/java/io/druid/indexing/firehose/{IngestTask.java => ReIngestTask.java} (94%) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java index f83453fafe9..9cf0caa2da8 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java @@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.TaskActionClient; -import io.druid.indexing.firehose.IngestTask; +import io.druid.indexing.firehose.ReIngestTask; import io.druid.query.Query; import io.druid.query.QueryRunner; @@ -55,7 +55,7 @@ import io.druid.query.QueryRunner; @JsonSubTypes.Type(name = "noop", value = NoopTask.class), @JsonSubTypes.Type(name = "version_converter", value = VersionConverterTask.class), @JsonSubTypes.Type(name = "version_converter_sub", value = VersionConverterTask.SubTask.class), - @JsonSubTypes.Type(name = "ingest-task", value = IngestTask.class) + @JsonSubTypes.Type(name = "reingest", value = ReIngestTask.class) }) public interface Task { diff --git a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index 1358d3d48c3..d1b5ccff7ec 100644 --- a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -42,12 +42,9 @@ import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.impl.InputRowParser; import io.druid.granularity.QueryGranularity; -import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.TaskToolboxFactory; import io.druid.indexing.common.actions.SegmentListUsedAction; -import io.druid.indexing.common.actions.TaskActionClient; -import io.druid.indexing.common.task.AbstractTask; import io.druid.query.filter.DimFilter; import io.druid.query.select.EventHolder; import io.druid.segment.Cursor; @@ -139,10 +136,10 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory { + private static final EmittingLogger log = new EmittingLogger(CombiningFirehoseFactory.class); + private final List delegateFactoryList; @JsonCreator @@ -86,10 +89,20 @@ public class CombiningFirehoseFactory implements FirehoseFactory if (currentFirehose != null) { currentFirehose.close(); } + currentFirehose = firehoseFactoryIterator.next().connect(parser); } catch (IOException e) { - Throwables.propagate(e); + if (currentFirehose != null) { + try { + currentFirehose.close(); + } + catch (IOException e2) { + log.error(e, "Unable to close currentFirehose!"); + throw Throwables.propagate(e2); + } + } + throw Throwables.propagate(e); } } } diff --git a/server/src/test/java/io/druid/realtime/firehose/CombiningFirehoseFactoryTest.java b/server/src/test/java/io/druid/realtime/firehose/CombiningFirehoseFactoryTest.java index a69327b86be..ba46a2182eb 100644 --- a/server/src/test/java/io/druid/realtime/firehose/CombiningFirehoseFactoryTest.java +++ b/server/src/test/java/io/druid/realtime/firehose/CombiningFirehoseFactoryTest.java @@ -42,16 +42,15 @@ public class CombiningFirehoseFactoryTest public void testCombiningfirehose() throws IOException { List list1 = Arrays.asList(makeRow(1, 1), makeRow(2, 2)); - List list2 = Arrays.asList(makeRow(3, 3), makeRow(4, 4)); + List list2 = Arrays.asList(makeRow(3, 3), makeRow(4, 4), makeRow(5, 5)); FirehoseFactory combiningFactory = new CombiningFirehoseFactory( Arrays.asList( - new ListFirehoseFactory( - list1 - ), new ListFirehoseFactory(list2) + new ListFirehoseFactory(list1), + new ListFirehoseFactory(list2) ) ); final Firehose firehose = combiningFactory.connect(null); - for (int i = 1; i < 5; i++) { + for (int i = 1; i < 6; i++) { Assert.assertTrue(firehose.hasMore()); final InputRow inputRow = firehose.nextRow(); Assert.assertEquals(i, inputRow.getTimestampFromEpoch()); @@ -133,7 +132,7 @@ public class CombiningFirehoseFactoryTest @Override public void close() throws IOException { - // + // Do nothing } }; }