From 5bd4ad3f0f29d6b9e8a955f0b582eaacb6238582 Mon Sep 17 00:00:00 2001 From: fjy Date: Fri, 13 Jun 2014 11:29:49 -0700 Subject: [PATCH] address code review --- .../indexing/common/task/RealtimeIndexTask.java | 3 ++- .../firehose/kafka/KafkaEightFirehoseFactory.java | 9 +-------- .../firehose/kafka/KafkaSevenFirehoseFactory.java | 9 +-------- pom.xml | 4 ++-- .../incremental/SpatialDimensionRowFormatter.java | 12 ++++++++++-- .../druid/query/ChainedExecutionQueryRunnerTest.java | 2 +- .../firehose/rabbitmq/RabbitMQFirehoseFactory.java | 2 +- .../io/druid/curator/discovery/DiscoveryModule.java | 6 ++++++ .../curator/discovery/ServerDiscoveryFactory.java | 7 +++++++ .../io/druid/segment/realtime/RealtimeManager.java | 4 ++-- 10 files changed, 33 insertions(+), 25 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index bd04596f047..84ef51a9c4f 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -26,6 +26,7 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.metamx.common.Granularity; import com.metamx.common.guava.CloseQuietly; +import com.metamx.common.parsers.ParseException; import com.metamx.emitter.EmittingLogger; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; @@ -352,7 +353,7 @@ public class RealtimeIndexTask extends AbstractTask nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); } } - catch (Exception e) { + catch (ParseException e) { log.warn(e, "unparseable line"); fireDepartment.getMetrics().incrementUnparseable(); } diff --git a/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java b/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java index 70d373d7319..a97f18ae1b7 100644 --- a/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java +++ b/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java @@ -22,7 +22,6 @@ package io.druid.firehose.kafka; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; import com.metamx.common.logger.Logger; @@ -123,13 +122,7 @@ public class KafkaEightFirehoseFactory implements FirehoseFactory UTF-8 - 0.26.0-SNAPSHOT - 2.4.0 + 0.26.5 + 2.5.0 0.2.4-SNAPSHOT diff --git a/processing/src/main/java/io/druid/segment/incremental/SpatialDimensionRowFormatter.java b/processing/src/main/java/io/druid/segment/incremental/SpatialDimensionRowFormatter.java index ca98d740dc3..82311045d27 100644 --- a/processing/src/main/java/io/druid/segment/incremental/SpatialDimensionRowFormatter.java +++ b/processing/src/main/java/io/druid/segment/incremental/SpatialDimensionRowFormatter.java @@ -23,12 +23,14 @@ import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Predicate; import com.google.common.base.Splitter; +import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.primitives.Floats; import com.metamx.common.ISE; +import com.metamx.common.parsers.ParseException; import io.druid.data.input.InputRow; import io.druid.data.input.impl.SpatialDimensionSchema; @@ -134,14 +136,20 @@ public class SpatialDimensionRowFormatter } @Override - public Object getRaw(String dimension) { + public Object getRaw(String dimension) + { return row.getRaw(dimension); } @Override public float getFloatMetric(String metric) { - return row.getFloatMetric(metric); + try { + return row.getFloatMetric(metric); + } + catch (ParseException e) { + throw Throwables.propagate(e); + } } @Override diff --git a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java index f2555dd7214..54b474830b2 100644 --- a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java @@ -108,7 +108,7 @@ public class ChainedExecutionQueryRunnerTest .build() ); - Future resultFuture = Executors.newFixedThreadPool(1).submit( + Future resultFuture = Executors.newSingleThreadExecutor().submit( new Runnable() { @Override diff --git a/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java index 71e53b5089c..c2e43e3917a 100644 --- a/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java +++ b/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java @@ -128,7 +128,7 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory> getAllInstances() throws Exception + { + return null; + } + @Override public void noteError(ServiceInstance tServiceInstance) { diff --git a/server/src/main/java/io/druid/curator/discovery/ServerDiscoveryFactory.java b/server/src/main/java/io/druid/curator/discovery/ServerDiscoveryFactory.java index 0e66df0b9ed..d8e5814e0d6 100644 --- a/server/src/main/java/io/druid/curator/discovery/ServerDiscoveryFactory.java +++ b/server/src/main/java/io/druid/curator/discovery/ServerDiscoveryFactory.java @@ -25,6 +25,7 @@ import org.apache.curator.x.discovery.ServiceInstance; import org.apache.curator.x.discovery.ServiceProvider; import java.io.IOException; +import java.util.Collection; /** */ @@ -62,6 +63,12 @@ public class ServerDiscoveryFactory return null; } + @Override + public Collection> getAllInstances() throws Exception + { + return null; + } + @Override public void noteError(ServiceInstance tServiceInstance) { // do nothing diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java index f249d5e3570..a1cfb220972 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java @@ -27,6 +27,7 @@ import com.google.inject.Inject; import com.metamx.common.guava.CloseQuietly; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; +import com.metamx.common.parsers.ParseException; import com.metamx.emitter.EmittingLogger; import io.druid.data.input.Firehose; import io.druid.data.input.InputRow; @@ -213,12 +214,11 @@ public class RealtimeManager implements QuerySegmentWalker } metrics.incrementProcessed(); } - catch (Exception e) { + catch (ParseException e) { if (inputRow != null) { log.error(e, "unparseable line: %s", inputRow); } metrics.incrementUnparseable(); - continue; } } }