address code review

fjy 2014-06-13 11:29:49 -07:00
parent a63cda3281
commit 5bd4ad3f0f
10 changed files with 33 additions and 25 deletions

View File

@@ -26,6 +26,7 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.metamx.common.Granularity;
 import com.metamx.common.guava.CloseQuietly;
+import com.metamx.common.parsers.ParseException;
 import com.metamx.emitter.EmittingLogger;
 import io.druid.data.input.Firehose;
 import io.druid.data.input.FirehoseFactory;
@@ -352,7 +353,7 @@ public class RealtimeIndexTask extends AbstractTask
           nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
         }
       }
-      catch (Exception e) {
+      catch (ParseException e) {
         log.warn(e, "unparseable line");
         fireDepartment.getMetrics().incrementUnparseable();
       }
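
Note: narrowing the catch from Exception to ParseException changes what gets counted as an unparseable row. Only failures raised by the parser are logged and added to the unparseable metric; any other error in the ingest loop now propagates and fails the task instead of being silently swallowed. A minimal illustrative sketch, not the actual task code (parser, process, metrics, and log are hypothetical names standing in for the real fields):

    try {
      final InputRow row = parser.parse(line);   // may throw ParseException on malformed input
      process(row);                              // downstream work; its failures are no longer swallowed here
      metrics.incrementProcessed();
    }
    catch (ParseException e) {
      // Only parser failures are treated as "bad rows".
      log.warn(e, "unparseable line");
      metrics.incrementUnparseable();
    }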

View File

@@ -22,7 +22,6 @@ package io.druid.firehose.kafka;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 import com.metamx.common.logger.Logger;
@@ -123,14 +122,8 @@ public class KafkaEightFirehoseFactory implements FirehoseFactory<ByteBufferInpu
         return null;
       }

-      try {
       return theParser.parse(ByteBuffer.wrap(message));
     }
-      catch (Exception e) {
-        log.error("Unparseable row! Error parsing[%s]", ByteBuffer.wrap(message));
-        throw Throwables.propagate(e);
-      }
-    }

     @Override
     public Runnable commit()

View File

@@ -21,7 +21,6 @@ package io.druid.firehose.kafka;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 import com.metamx.common.logger.Logger;
@@ -136,14 +135,8 @@ public class KafkaSevenFirehoseFactory implements FirehoseFactory<ByteBufferInpu
     public InputRow parseMessage(Message message)
     {
-      try {
       return theParser.parse(message.payload());
     }
-      catch (Exception e) {
-        log.error("Unparseable row! Error parsing[%s]", message.payload());
-        throw Throwables.propagate(e);
-      }
-    }

     @Override
     public Runnable commit()
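
Note: both Kafka firehose factories drop their local try/catch (and the now-unused Throwables import). A row that fails to parse no longer logs an error and rethrows inside the firehose; it surfaces as a ParseException to whoever is draining the firehose, where it is logged and counted as unparseable (see the realtime task and manager hunks in this commit). A rough sketch of the simplified parse path, with the Kafka message plumbing assumed rather than copied from the real class:

    @Override
    public InputRow nextRow()
    {
      final byte[] message = iter.next().message();   // hypothetical Kafka message iterator
      if (message == null) {
        return null;
      }
      // No try/catch here any more: a malformed payload propagates as ParseException
      // and is handled (logged and counted) by the consumer of the firehose.
      return theParser.parse(ByteBuffer.wrap(message));
    }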

View File

@@ -40,8 +40,8 @@
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <metamx.java-util.version>0.26.0-SNAPSHOT</metamx.java-util.version>
-        <apache.curator.version>2.4.0</apache.curator.version>
+        <metamx.java-util.version>0.26.5</metamx.java-util.version>
+        <apache.curator.version>2.5.0</apache.curator.version>
         <druid.api.version>0.2.4-SNAPSHOT</druid.api.version>
     </properties>

View File

@@ -23,12 +23,14 @@ import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Predicate;
 import com.google.common.base.Splitter;
+import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Floats;
 import com.metamx.common.ISE;
+import com.metamx.common.parsers.ParseException;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.impl.SpatialDimensionSchema;
@@ -134,15 +136,21 @@ public class SpatialDimensionRowFormatter
       }

       @Override
-      public Object getRaw(String dimension) {
+      public Object getRaw(String dimension)
+      {
         return row.getRaw(dimension);
       }

       @Override
       public float getFloatMetric(String metric)
       {
+        try {
           return row.getFloatMetric(metric);
         }
+        catch (ParseException e) {
+          throw Throwables.propagate(e);
+        }
+      }

       @Override
       public String toString()
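
Note: the wrapped row's getFloatMetric can now throw ParseException (for example when a metric value cannot be coerced to a float), and the new catch rethrows it through Guava's Throwables.propagate. For reference, propagate rethrows RuntimeExceptions and Errors as-is and wraps anything else in a RuntimeException, so the failure reaches the caller unchecked rather than being hidden. A standalone illustration of that contract, not Druid code:

    static float parseOrPropagate(String value)
    {
      try {
        return Float.parseFloat(value);
      }
      catch (NumberFormatException e) {
        // RuntimeExceptions and Errors are rethrown as-is;
        // a checked exception would be wrapped in a RuntimeException.
        throw Throwables.propagate(e);
      }
    }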

View File

@@ -108,7 +108,7 @@ public class ChainedExecutionQueryRunnerTest
             .build()
     );
-    Future resultFuture = Executors.newFixedThreadPool(1).submit(
+    Future resultFuture = Executors.newSingleThreadExecutor().submit(
         new Runnable()
         {
           @Override
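
Note: Executors.newFixedThreadPool(1) and Executors.newSingleThreadExecutor() both run tasks on a single worker thread, but the single-thread variant states the intent more clearly and returns a wrapper whose pool cannot be resized afterwards. A tiny standalone illustration of that practical difference (not from the test):

    ExecutorService fixed = Executors.newFixedThreadPool(1);
    ((ThreadPoolExecutor) fixed).setMaximumPoolSize(4);       // legal: the cast succeeds, the pool can be reconfigured

    ExecutorService single = Executors.newSingleThreadExecutor();
    // ((ThreadPoolExecutor) single).setMaximumPoolSize(4);   // would throw ClassCastException: the wrapper hides the pool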

View File

@@ -128,7 +128,7 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory<StringInputRowPa
   @Override
   public Firehose connect(StringInputRowParser firehoseParser) throws IOException
   {
-    final StringInputRowParser stringParser = (StringInputRowParser) firehoseParser;
+    final StringInputRowParser stringParser = firehoseParser;
     ConnectionOptions lyraOptions = new ConnectionOptions(this.connectionFactory);
     Config lyraConfig = new Config()

View File

@@ -406,6 +406,12 @@ public class DiscoveryModule implements Module
         return null;
       }

+      @Override
+      public Collection<ServiceInstance<T>> getAllInstances() throws Exception
+      {
+        return null;
+      }
+
       @Override
       public void noteError(ServiceInstance<T> tServiceInstance) {

View File

@@ -25,6 +25,7 @@ import org.apache.curator.x.discovery.ServiceInstance;
 import org.apache.curator.x.discovery.ServiceProvider;

 import java.io.IOException;
+import java.util.Collection;

 /**
 */
@@ -62,6 +63,12 @@ public class ServerDiscoveryFactory
         return null;
       }

+      @Override
+      public Collection<ServiceInstance<T>> getAllInstances() throws Exception
+      {
+        return null;
+      }
+
       @Override
       public void noteError(ServiceInstance<T> tServiceInstance) {
         // do nothing
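
Note: DiscoveryModule and ServerDiscoveryFactory add the same no-op override, presumably because the ServiceProvider interface in the Curator release bumped above (2.5.0) declares getAllInstances(), so these null-object providers must supply it. A sketch of the overall shape such a no-op provider takes (assumed layout, not the exact Druid inner class):

    private static class NoopServiceProvider<T> implements ServiceProvider<T>
    {
      @Override
      public void start() throws Exception {}

      @Override
      public ServiceInstance<T> getInstance() throws Exception
      {
        return null;
      }

      @Override
      public Collection<ServiceInstance<T>> getAllInstances() throws Exception
      {
        // Added here: report no known instances, matching the getInstance() stub.
        return null;
      }

      @Override
      public void noteError(ServiceInstance<T> instance)
      {
        // do nothing
      }

      @Override
      public void close() throws IOException {}
    }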

View File

@@ -27,6 +27,7 @@ import com.google.inject.Inject;
 import com.metamx.common.guava.CloseQuietly;
 import com.metamx.common.lifecycle.LifecycleStart;
 import com.metamx.common.lifecycle.LifecycleStop;
+import com.metamx.common.parsers.ParseException;
 import com.metamx.emitter.EmittingLogger;
 import io.druid.data.input.Firehose;
 import io.druid.data.input.InputRow;
@@ -213,12 +214,11 @@ public class RealtimeManager implements QuerySegmentWalker
           }
           metrics.incrementProcessed();
         }
-        catch (Exception e) {
+        catch (ParseException e) {
           if (inputRow != null) {
             log.error(e, "unparseable line: %s", inputRow);
           }
           metrics.incrementUnparseable();
-          continue;
         }
       }
     }
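
Note: as in the realtime task, the catch is narrowed so only ParseException is treated as a bad row, and the trailing continue is dropped because the catch block is already the last statement of the loop body. A minimal sketch of the resulting ingestion loop, with firehose and plumber as assumed names for the surrounding fields visible in the wider class:

    while (firehose.hasMore()) {
      InputRow inputRow = null;
      try {
        inputRow = firehose.nextRow();      // may throw ParseException on malformed input
        plumber.add(inputRow);              // assumed sink for the parsed row
        metrics.incrementProcessed();
      }
      catch (ParseException e) {
        if (inputRow != null) {
          log.error(e, "unparseable line: %s", inputRow);
        }
        metrics.incrementUnparseable();
        // no explicit continue needed: control falls to the end of the iteration anyway
      }
    }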