Merge pull request #578 from metamx/new-guava

Update guava, java-util, and druid-api
This commit is contained in:
fjy 2014-06-18 14:23:32 -06:00
commit 0bc1915067
56 changed files with 183 additions and 216 deletions

View File

@ -24,6 +24,7 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Ordering; import com.google.common.collect.Ordering;
import com.google.common.io.Closeables; import com.google.common.io.Closeables;
import com.metamx.common.guava.Accumulator; import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Yielder; import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.Yielders; import com.metamx.common.guava.Yielders;
@ -70,7 +71,7 @@ public class OrderedMergeSequence<T> implements Sequence<T>
return yielder.get(); return yielder.get();
} }
finally { finally {
Closeables.closeQuietly(yielder); CloseQuietly.close(yielder);
} }
} }

View File

@ -46,14 +46,9 @@ public class SerializerUtils
public void writeString(OutputSupplier<? extends OutputStream> supplier, String name) throws IOException public void writeString(OutputSupplier<? extends OutputStream> supplier, String name) throws IOException
{ {
OutputStream out = null; try (OutputStream out = supplier.getOutput()) {
try {
out = supplier.getOutput();
writeString(out, name); writeString(out, name);
} }
finally {
Closeables.closeQuietly(out);
}
} }
public void writeString(WritableByteChannel out, String name) throws IOException public void writeString(WritableByteChannel out, String name) throws IOException

View File

@ -21,6 +21,7 @@ package io.druid.storage.hdfs;
import com.google.common.io.Closeables; import com.google.common.io.Closeables;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.guava.CloseQuietly;
import io.druid.segment.loading.DataSegmentPuller; import io.druid.segment.loading.DataSegmentPuller;
import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
@ -52,22 +53,17 @@ public class HdfsDataSegmentPuller implements DataSegmentPuller
final FileSystem fs = checkPathAndGetFilesystem(path); final FileSystem fs = checkPathAndGetFilesystem(path);
FSDataInputStream in = null;
try {
if (path.getName().endsWith(".zip")) { if (path.getName().endsWith(".zip")) {
in = fs.open(path); try {
try (FSDataInputStream in = fs.open(path)) {
CompressionUtils.unzip(in, dir); CompressionUtils.unzip(in, dir);
in.close();
}
else {
throw new SegmentLoadingException("Unknown file type[%s]", path);
} }
} }
catch (IOException e) { catch (IOException e) {
throw new SegmentLoadingException(e, "Some IOException"); throw new SegmentLoadingException(e, "Some IOException");
} }
finally { } else {
Closeables.closeQuietly(in); throw new SegmentLoadingException("Unknown file type[%s]", path);
} }
} }
@ -85,7 +81,8 @@ public class HdfsDataSegmentPuller implements DataSegmentPuller
} }
} }
private Path getPath(DataSegment segment) { private Path getPath(DataSegment segment)
{
return new Path(String.valueOf(segment.getLoadSpec().get("path"))); return new Path(String.valueOf(segment.getLoadSpec().get("path")));
} }

View File

@ -26,6 +26,7 @@ import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables; import com.google.common.io.Closeables;
import com.google.common.io.OutputSupplier; import com.google.common.io.OutputSupplier;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.segment.SegmentUtils; import io.druid.segment.SegmentUtils;
import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.loading.DataSegmentPusher;
@ -78,17 +79,10 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
fs.mkdirs(outFile.getParent()); fs.mkdirs(outFile.getParent());
log.info("Compressing files from[%s] to [%s]", inDir, outFile); log.info("Compressing files from[%s] to [%s]", inDir, outFile);
FSDataOutputStream out = null;
long size; long size;
try { try (FSDataOutputStream out = fs.create(outFile)) {
out = fs.create(outFile);
size = CompressionUtils.zip(inDir, out); size = CompressionUtils.zip(inDir, out);
out.close();
}
finally {
Closeables.closeQuietly(out);
} }
return createDescriptorFile( return createDescriptorFile(

View File

@ -31,6 +31,7 @@ import com.google.common.io.Closeables;
import com.google.common.primitives.Longs; import com.google.common.primitives.Longs;
import com.metamx.common.IAE; import com.metamx.common.IAE;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.StringInputRowParser;
@ -425,7 +426,7 @@ public class IndexGeneratorJob implements Jobby
if (caughtException == null) { if (caughtException == null) {
Closeables.close(out, false); Closeables.close(out, false);
} else { } else {
Closeables.closeQuietly(out); CloseQuietly.close(out);
throw Throwables.propagate(caughtException); throw Throwables.propagate(caughtException);
} }
} }
@ -605,7 +606,7 @@ public class IndexGeneratorJob implements Jobby
} }
} }
finally { finally {
Closeables.closeQuietly(in); CloseQuietly.close(in);
} }
out.closeEntry(); out.closeEntry();
context.progress(); context.progress();

View File

@ -24,9 +24,9 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.io.Closeables;
import com.metamx.common.Granularity; import com.metamx.common.Granularity;
import com.metamx.common.exception.FormattedException; import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.parsers.ParseException;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Firehose; import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory; import io.druid.data.input.FirehoseFactory;
@ -44,8 +44,8 @@ import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChest;
import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.RealtimeIOConfig; import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.realtime.FireDepartment; import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.FireDepartmentConfig; import io.druid.segment.realtime.FireDepartmentConfig;
@ -353,7 +353,7 @@ public class RealtimeIndexTask extends AbstractTask
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
} }
} }
catch (FormattedException e) { catch (ParseException e) {
log.warn(e, "unparseable line"); log.warn(e, "unparseable line");
fireDepartment.getMetrics().incrementUnparseable(); fireDepartment.getMetrics().incrementUnparseable();
} }
@ -375,7 +375,7 @@ public class RealtimeIndexTask extends AbstractTask
log.makeAlert(e, "Failed to finish realtime task").emit(); log.makeAlert(e, "Failed to finish realtime task").emit();
} }
finally { finally {
Closeables.closeQuietly(firehose); CloseQuietly.close(firehose);
toolbox.getMonitorScheduler().removeMonitor(metricsMonitor); toolbox.getMonitorScheduler().removeMonitor(metricsMonitor);
} }
} }

View File

@ -585,7 +585,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
// Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running // Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running
// on a worker - this avoids overflowing a worker with tasks // on a worker - this avoids overflowing a worker with tasks
Stopwatch timeoutStopwatch = new Stopwatch(); Stopwatch timeoutStopwatch = Stopwatch.createUnstarted();
timeoutStopwatch.start(); timeoutStopwatch.start();
synchronized (statusLock) { synchronized (statusLock) {
while (!isWorkerRunningTask(theWorker, task.getId())) { while (!isWorkerRunningTask(theWorker, task.getId())) {

View File

@ -63,7 +63,7 @@ public class TestUtils
public static boolean conditionValid(IndexingServiceCondition condition) public static boolean conditionValid(IndexingServiceCondition condition)
{ {
try { try {
Stopwatch stopwatch = new Stopwatch(); Stopwatch stopwatch = Stopwatch.createUnstarted();
stopwatch.start(); stopwatch.start();
while (!condition.isValid()) { while (!condition.isValid()) {
Thread.sleep(100); Thread.sleep(100);

View File

@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.metamx.common.exception.FormattedException;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.data.input.ByteBufferInputRowParser; import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.Firehose; import io.druid.data.input.Firehose;
@ -115,7 +114,7 @@ public class KafkaEightFirehoseFactory implements FirehoseFactory<ByteBufferInpu
} }
@Override @Override
public InputRow nextRow() throws FormattedException public InputRow nextRow()
{ {
final byte[] message = iter.next().message(); final byte[] message = iter.next().message();
@ -123,16 +122,8 @@ public class KafkaEightFirehoseFactory implements FirehoseFactory<ByteBufferInpu
return null; return null;
} }
try {
return theParser.parse(ByteBuffer.wrap(message)); return theParser.parse(ByteBuffer.wrap(message));
} }
catch (Exception e) {
throw new FormattedException.Builder()
.withErrorCode(FormattedException.ErrorCode.UNPARSABLE_ROW)
.withMessage(String.format("Error parsing[%s], got [%s]", ByteBuffer.wrap(message), e.toString()))
.build();
}
}
@Override @Override
public Runnable commit() public Runnable commit()

View File

@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.metamx.common.exception.FormattedException;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.data.input.ByteBufferInputRowParser; import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.Firehose; import io.druid.data.input.Firehose;
@ -123,7 +122,7 @@ public class KafkaSevenFirehoseFactory implements FirehoseFactory<ByteBufferInpu
} }
@Override @Override
public InputRow nextRow() throws FormattedException public InputRow nextRow()
{ {
final Message message = iter.next().message(); final Message message = iter.next().message();
@ -134,18 +133,10 @@ public class KafkaSevenFirehoseFactory implements FirehoseFactory<ByteBufferInpu
return parseMessage(message); return parseMessage(message);
} }
public InputRow parseMessage(Message message) throws FormattedException public InputRow parseMessage(Message message)
{ {
try {
return theParser.parse(message.payload()); return theParser.parse(message.payload());
} }
catch (Exception e) {
throw new FormattedException.Builder()
.withErrorCode(FormattedException.ErrorCode.UNPARSABLE_ROW)
.withMessage(String.format("Error parsing[%s], got [%s]", message.payload(), e.toString()))
.build();
}
}
@Override @Override
public Runnable commit() public Runnable commit()

10
pom.xml
View File

@ -30,7 +30,7 @@
<connection>scm:git:ssh://git@github.com/metamx/druid.git</connection> <connection>scm:git:ssh://git@github.com/metamx/druid.git</connection>
<developerConnection>scm:git:ssh://git@github.com/metamx/druid.git</developerConnection> <developerConnection>scm:git:ssh://git@github.com/metamx/druid.git</developerConnection>
<url>http://www.github.com/metamx/druid</url> <url>http://www.github.com/metamx/druid</url>
<tag>druid-0.6.107-SNAPSHOT</tag> <tag>druid-0.6.117-SNAPSHOT</tag>
</scm> </scm>
<prerequisites> <prerequisites>
@ -39,9 +39,9 @@
<properties> <properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<metamx.java-util.version>0.25.6</metamx.java-util.version> <metamx.java-util.version>0.26.5</metamx.java-util.version>
<apache.curator.version>2.4.0</apache.curator.version> <apache.curator.version>2.5.0</apache.curator.version>
<druid.api.version>0.2.3</druid.api.version> <druid.api.version>0.2.4-SNAPSHOT</druid.api.version>
</properties> </properties>
<modules> <modules>
@ -198,7 +198,7 @@
<dependency> <dependency>
<groupId>com.google.guava</groupId> <groupId>com.google.guava</groupId>
<artifactId>guava</artifactId> <artifactId>guava</artifactId>
<version>14.0.1</version> <version>17.0</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.google.inject</groupId> <groupId>com.google.inject</groupId>

View File

@ -28,10 +28,8 @@ import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors; import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage; import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.InvalidProtocolBufferException;
import com.metamx.common.exception.FormattedException;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.JSONParseSpec; import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.MapInputRowParser; import io.druid.data.input.impl.MapInputRowParser;
import io.druid.data.input.impl.ParseSpec; import io.druid.data.input.impl.ParseSpec;
@ -94,7 +92,7 @@ public class ProtoBufInputRowParser implements ByteBufferInputRowParser
} }
@Override @Override
public InputRow parse(ByteBuffer input) throws FormattedException public InputRow parse(ByteBuffer input)
{ {
// We should really create a ProtoBufBasedInputRow that does not need an intermediate map but accesses // We should really create a ProtoBufBasedInputRow that does not need an intermediate map but accesses
// the DynamicMessage directly... // the DynamicMessage directly...

View File

@ -19,7 +19,7 @@
package io.druid.query; package io.druid.query;
import com.google.common.io.Closeables; import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.ResourceClosingSequence; import com.metamx.common.guava.ResourceClosingSequence;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import io.druid.segment.ReferenceCountingSegment; import io.druid.segment.ReferenceCountingSegment;
@ -52,7 +52,7 @@ public class ReferenceCountingSegmentQueryRunner<T> implements QueryRunner<T>
return new ResourceClosingSequence<T>(baseSequence, closeable); return new ResourceClosingSequence<T>(baseSequence, closeable);
} }
catch (RuntimeException e) { catch (RuntimeException e) {
Closeables.closeQuietly(closeable); CloseQuietly.close(closeable);
throw e; throw e;
} }
} }

View File

@ -54,7 +54,7 @@ public class CardinalityAggregator implements Aggregator
// nothing to add to hasher if size == 0, only handle size == 1 and size != 0 cases. // nothing to add to hasher if size == 0, only handle size == 1 and size != 0 cases.
if (size == 1) { if (size == 1) {
final String value = selector.lookupName(row.get(0)); final String value = selector.lookupName(row.get(0));
hasher.putString(value != null ? value : NULL_STRING); hasher.putUnencodedChars(value != null ? value : NULL_STRING);
} else if (size != 0) { } else if (size != 0) {
final String[] values = new String[size]; final String[] values = new String[size];
for (int i = 0; i < size; ++i) { for (int i = 0; i < size; ++i) {
@ -67,7 +67,7 @@ public class CardinalityAggregator implements Aggregator
if (i != 0) { if (i != 0) {
hasher.putChar(SEPARATOR); hasher.putChar(SEPARATOR);
} }
hasher.putString(values[i]); hasher.putUnencodedChars(values[i]);
} }
} }
} }
@ -79,7 +79,7 @@ public class CardinalityAggregator implements Aggregator
for (final DimensionSelector selector : selectors) { for (final DimensionSelector selector : selectors) {
for (final Integer index : selector.getRow()) { for (final Integer index : selector.getRow()) {
final String value = selector.lookupName(index); final String value = selector.lookupName(index);
collector.add(hashFn.hashString(value == null ? NULL_STRING : value).asBytes()); collector.add(hashFn.hashUnencodedChars(value == null ? NULL_STRING : value).asBytes());
} }
} }
} }

View File

@ -25,12 +25,12 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators; import com.google.common.collect.Iterators;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.IAE; import com.metamx.common.IAE;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.guava.BaseSequence; import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.FunctionalIterator; import com.metamx.common.guava.FunctionalIterator;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Sequences;
@ -123,7 +123,7 @@ public class GroupByQueryEngine
@Override @Override
public void cleanup(RowIterator iterFromMake) public void cleanup(RowIterator iterFromMake)
{ {
Closeables.closeQuietly(iterFromMake); CloseQuietly.close(iterFromMake);
} }
} }
); );
@ -135,7 +135,7 @@ public class GroupByQueryEngine
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
Closeables.closeQuietly(bufferHolder); CloseQuietly.close(bufferHolder);
} }
} }
) )

View File

@ -19,8 +19,8 @@
package io.druid.query.topn; package io.druid.query.topn;
import com.google.common.io.Closeables;
import com.metamx.common.Pair; import com.metamx.common.Pair;
import com.metamx.common.guava.CloseQuietly;
import io.druid.collections.ResourceHolder; import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidPool; import io.druid.collections.StupidPool;
import io.druid.query.aggregation.BufferAggregator; import io.druid.query.aggregation.BufferAggregator;
@ -233,7 +233,7 @@ public class PooledTopNAlgorithm
if (resultsBufHolder != null) { if (resultsBufHolder != null) {
resultsBufHolder.get().clear(); resultsBufHolder.get().clear();
} }
Closeables.closeQuietly(resultsBufHolder); CloseQuietly.close(resultsBufHolder);
} }
public static class PooledTopNParams extends TopNParams public static class PooledTopNParams extends TopNParams

View File

@ -19,8 +19,8 @@
package io.druid.segment; package io.druid.segment;
import com.google.common.io.Closeables;
import com.metamx.collections.spatial.ImmutableRTree; import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.common.guava.CloseQuietly;
import io.druid.query.filter.BitmapIndexSelector; import io.druid.query.filter.BitmapIndexSelector;
import io.druid.segment.column.Column; import io.druid.segment.column.Column;
import io.druid.segment.column.DictionaryEncodedColumn; import io.druid.segment.column.DictionaryEncodedColumn;
@ -95,7 +95,7 @@ public class ColumnSelectorBitmapIndexSelector implements BitmapIndexSelector
return column.length(); return column.length();
} }
finally { finally {
Closeables.closeQuietly(column); CloseQuietly.close(column);
} }
} }

View File

@ -165,15 +165,10 @@ public class IndexIO
} }
final File indexFile = new File(inDir, "index.drd"); final File indexFile = new File(inDir, "index.drd");
InputStream in = null;
int version; int version;
try { try (InputStream in = new FileInputStream(indexFile)) {
in = new FileInputStream(indexFile);
version = in.read(); version = in.read();
} }
finally {
Closeables.closeQuietly(in);
}
return version; return version;
} }

View File

@ -38,6 +38,7 @@ import com.metamx.collections.spatial.RTree;
import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy; import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy;
import com.metamx.common.IAE; import com.metamx.common.IAE;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.guava.MergeIterable; import com.metamx.common.guava.MergeIterable;
import com.metamx.common.guava.nary.BinaryFn; import com.metamx.common.guava.nary.BinaryFn;
@ -438,9 +439,9 @@ public class IndexMerger
serializerUtils.writeString(channel, String.format("%s/%s", minTime, maxTime)); serializerUtils.writeString(channel, String.format("%s/%s", minTime, maxTime));
} }
finally { finally {
Closeables.closeQuietly(channel); CloseQuietly.close(channel);
channel = null; channel = null;
Closeables.closeQuietly(fileOutputStream); CloseQuietly.close(fileOutputStream);
fileOutputStream = null; fileOutputStream = null;
} }
IndexIO.checkFileSize(indexFile); IndexIO.checkFileSize(indexFile);
@ -881,7 +882,7 @@ public class IndexMerger
); );
} }
finally { finally {
Closeables.closeQuietly(channel); CloseQuietly.close(channel);
channel = null; channel = null;
} }
IndexIO.checkFileSize(indexFile); IndexIO.checkFileSize(indexFile);

View File

@ -20,7 +20,7 @@
package io.druid.segment; package io.druid.segment;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.io.Closeables; import com.metamx.common.guava.CloseQuietly;
import io.druid.segment.data.ConciseCompressedIndexedInts; import io.druid.segment.data.ConciseCompressedIndexedInts;
import io.druid.segment.data.Indexed; import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedFloats; import io.druid.segment.data.IndexedFloats;
@ -118,9 +118,9 @@ public class MMappedIndexAdapter implements IndexableAdapter
{ {
final boolean hasNext = currRow < numRows; final boolean hasNext = currRow < numRows;
if (!hasNext && !done) { if (!hasNext && !done) {
Closeables.closeQuietly(timestamps); CloseQuietly.close(timestamps);
for (IndexedFloats floatMetric : floatMetrics) { for (IndexedFloats floatMetric : floatMetrics) {
Closeables.closeQuietly(floatMetric); CloseQuietly.close(floatMetric);
} }
done = true; done = true;
} }

View File

@ -20,11 +20,11 @@
package io.druid.segment; package io.druid.segment;
import com.google.common.io.ByteStreams; import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.InputSupplier; import com.google.common.io.InputSupplier;
import com.google.common.io.OutputSupplier; import com.google.common.io.OutputSupplier;
import com.metamx.common.IAE; import com.metamx.common.IAE;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.guava.CloseQuietly;
import io.druid.common.utils.SerializerUtils; import io.druid.common.utils.SerializerUtils;
import io.druid.segment.data.CompressedFloatsIndexedSupplier; import io.druid.segment.data.CompressedFloatsIndexedSupplier;
import io.druid.segment.data.CompressedFloatsSupplierSerializer; import io.druid.segment.data.CompressedFloatsSupplierSerializer;
@ -84,8 +84,8 @@ public class MetricHolder
ByteStreams.copy(in, out); ByteStreams.copy(in, out);
} }
finally { finally {
Closeables.closeQuietly(out); CloseQuietly.close(out);
Closeables.closeQuietly(in); CloseQuietly.close(in);
} }
} }

View File

@ -22,8 +22,8 @@ package io.druid.segment;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.io.Closeables;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.segment.column.BitmapIndex; import io.druid.segment.column.BitmapIndex;
import io.druid.segment.column.Column; import io.druid.segment.column.Column;
@ -208,10 +208,10 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
{ {
final boolean hasNext = currRow < numRows; final boolean hasNext = currRow < numRows;
if (!hasNext && !done) { if (!hasNext && !done) {
Closeables.closeQuietly(timestamps); CloseQuietly.close(timestamps);
for (Object metric : metrics) { for (Object metric : metrics) {
if (metric instanceof Closeable) { if (metric instanceof Closeable) {
Closeables.closeQuietly((Closeable) metric); CloseQuietly.close((Closeable) metric);
} }
} }
done = true; done = true;

View File

@ -23,7 +23,7 @@ import com.google.common.base.Function;
import com.google.common.base.Predicates; import com.google.common.base.Predicates;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.io.Closeables; import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Sequences;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
@ -109,7 +109,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
return new DateTime(column.getLongSingleValueRow(0)); return new DateTime(column.getLongSingleValueRow(0));
} }
finally { finally {
Closeables.closeQuietly(column); CloseQuietly.close(column);
} }
} }
@ -122,7 +122,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
return new DateTime(column.getLongSingleValueRow(column.length() - 1)); return new DateTime(column.getLongSingleValueRow(column.length() - 1));
} }
finally { finally {
Closeables.closeQuietly(column); CloseQuietly.close(column);
} }
} }
@ -535,16 +535,16 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
Closeables.closeQuietly(timestamps); CloseQuietly.close(timestamps);
for (GenericColumn column : genericColumnCache.values()) { for (GenericColumn column : genericColumnCache.values()) {
Closeables.closeQuietly(column); CloseQuietly.close(column);
} }
for (ComplexColumn complexColumn : complexColumnCache.values()) { for (ComplexColumn complexColumn : complexColumnCache.values()) {
Closeables.closeQuietly(complexColumn); CloseQuietly.close(complexColumn);
} }
for (Object column : objectColumnCache.values()) { for (Object column : objectColumnCache.values()) {
if(column instanceof Closeable) { if(column instanceof Closeable) {
Closeables.closeQuietly((Closeable) column); CloseQuietly.close((Closeable) column);
} }
} }
} }
@ -962,16 +962,16 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
Closeables.closeQuietly(timestamps); CloseQuietly.close(timestamps);
for (GenericColumn column : genericColumnCache.values()) { for (GenericColumn column : genericColumnCache.values()) {
Closeables.closeQuietly(column); CloseQuietly.close(column);
} }
for (ComplexColumn complexColumn : complexColumnCache.values()) { for (ComplexColumn complexColumn : complexColumnCache.values()) {
Closeables.closeQuietly(complexColumn); CloseQuietly.close(complexColumn);
} }
for (Object column : objectColumnCache.values()) { for (Object column : objectColumnCache.values()) {
if (column instanceof Closeable) { if (column instanceof Closeable) {
Closeables.closeQuietly((Closeable) column); CloseQuietly.close((Closeable) column);
} }
} }
} }

View File

@ -20,7 +20,7 @@
package io.druid.segment.column; package io.druid.segment.column;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.io.Closeables; import com.metamx.common.guava.CloseQuietly;
/** /**
*/ */
@ -68,7 +68,7 @@ class SimpleColumn implements Column
return column.length(); return column.length();
} }
finally { finally {
Closeables.closeQuietly(column); CloseQuietly.close(column);
} }
} }

View File

@ -25,6 +25,7 @@ import com.google.common.io.Closeables;
import com.google.common.primitives.Floats; import com.google.common.primitives.Floats;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import com.metamx.common.IAE; import com.metamx.common.IAE;
import com.metamx.common.guava.CloseQuietly;
import io.druid.collections.ResourceHolder; import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidResourceHolder; import io.druid.collections.StupidResourceHolder;
@ -123,7 +124,7 @@ public class CompressedFloatsIndexedSupplier implements Supplier<IndexedFloats>
private void loadBuffer(int bufferNum) private void loadBuffer(int bufferNum)
{ {
Closeables.closeQuietly(holder); CloseQuietly.close(holder);
holder = baseFloatBuffers.get(bufferNum); holder = baseFloatBuffers.get(bufferNum);
buffer = holder.get(); buffer = holder.get();
currIndex = bufferNum; currIndex = bufferNum;

View File

@ -106,17 +106,11 @@ public class CompressedFloatsSupplierSerializer
flattener.close(); flattener.close();
OutputStream out = null; try (OutputStream out = consolidatedOut.getOutput()) {
try {
out = consolidatedOut.getOutput();
out.write(CompressedFloatsIndexedSupplier.version); out.write(CompressedFloatsIndexedSupplier.version);
out.write(Ints.toByteArray(numInserted)); out.write(Ints.toByteArray(numInserted));
out.write(Ints.toByteArray(sizePer)); out.write(Ints.toByteArray(sizePer));
ByteStreams.copy(flattener.combineStreams(), out); ByteStreams.copy(flattener.combineStreams(), out);
} }
finally {
Closeables.closeQuietly(out);
}
} }
} }

View File

@ -25,6 +25,7 @@ import com.google.common.io.Closeables;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs; import com.google.common.primitives.Longs;
import com.metamx.common.IAE; import com.metamx.common.IAE;
import com.metamx.common.guava.CloseQuietly;
import io.druid.collections.ResourceHolder; import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidResourceHolder; import io.druid.collections.StupidResourceHolder;
@ -122,7 +123,7 @@ public class CompressedLongsIndexedSupplier implements Supplier<IndexedLongs>
private void loadBuffer(int bufferNum) private void loadBuffer(int bufferNum)
{ {
Closeables.closeQuietly(holder); CloseQuietly.close(holder);
holder = baseLongBuffers.get(bufferNum); holder = baseLongBuffers.get(bufferNum);
buffer = holder.get(); buffer = holder.get();
currIndex = bufferNum; currIndex = bufferNum;

View File

@ -100,17 +100,11 @@ public class CompressedLongsSupplierSerializer
flattener.close(); flattener.close();
OutputStream out = null; try (OutputStream out = consolidatedOut.getOutput()) {
try {
out = consolidatedOut.getOutput();
out.write(CompressedLongsIndexedSupplier.version); out.write(CompressedLongsIndexedSupplier.version);
out.write(Ints.toByteArray(numInserted)); out.write(Ints.toByteArray(numInserted));
out.write(Ints.toByteArray(sizePer)); out.write(Ints.toByteArray(sizePer));
ByteStreams.copy(flattener.combineStreams(), out); ByteStreams.copy(flattener.combineStreams(), out);
} }
finally {
Closeables.closeQuietly(out);
}
} }
} }

View File

@ -20,7 +20,7 @@
package io.druid.segment.data; package io.druid.segment.data;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.io.Closeables; import com.metamx.common.guava.CloseQuietly;
import com.ning.compress.lzf.ChunkEncoder; import com.ning.compress.lzf.ChunkEncoder;
import com.ning.compress.lzf.LZFChunk; import com.ning.compress.lzf.LZFChunk;
import com.ning.compress.lzf.LZFDecoder; import com.ning.compress.lzf.LZFDecoder;
@ -74,7 +74,7 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
buf.put(outputBytes, 0, numDecompressedBytes); buf.put(outputBytes, 0, numDecompressedBytes);
buf.flip(); buf.flip();
Closeables.closeQuietly(outputBytesHolder); CloseQuietly.close(outputBytesHolder);
return new ResourceHolder<T>() return new ResourceHolder<T>()
{ {
@ -105,7 +105,7 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
final ResourceHolder<ChunkEncoder> encoder = CompressedPools.getChunkEncoder(); final ResourceHolder<ChunkEncoder> encoder = CompressedPools.getChunkEncoder();
LZFChunk chunk = encoder.get().encodeChunk(buf.array(), 0, buf.array().length); LZFChunk chunk = encoder.get().encodeChunk(buf.array(), 0, buf.array().length);
Closeables.closeQuietly(encoder); CloseQuietly.close(encoder);
return chunk.getData(); return chunk.getData();
} }

View File

@ -21,9 +21,9 @@ package io.druid.segment.data;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.collect.Ordering; import com.google.common.collect.Ordering;
import com.google.common.io.Closeables;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import com.metamx.common.IAE; import com.metamx.common.IAE;
import com.metamx.common.guava.CloseQuietly;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.Closeable; import java.io.Closeable;
@ -73,14 +73,14 @@ public class GenericIndexed<T> implements Indexed<T>
allowReverseLookup = false; allowReverseLookup = false;
} }
if (prevVal instanceof Closeable) { if (prevVal instanceof Closeable) {
Closeables.closeQuietly((Closeable) prevVal); CloseQuietly.close((Closeable) prevVal);
} }
prevVal = next; prevVal = next;
++count; ++count;
} }
if (prevVal instanceof Closeable) { if (prevVal instanceof Closeable) {
Closeables.closeQuietly((Closeable) prevVal); CloseQuietly.close((Closeable) prevVal);
} }
ByteArrayOutputStream headerBytes = new ByteArrayOutputStream(4 + (count * 4)); ByteArrayOutputStream headerBytes = new ByteArrayOutputStream(4 + (count * 4));
@ -98,7 +98,7 @@ public class GenericIndexed<T> implements Indexed<T>
valueBytes.write(bytes); valueBytes.write(bytes);
if (object instanceof Closeable) { if (object instanceof Closeable) {
Closeables.closeQuietly((Closeable) object); CloseQuietly.close((Closeable) object);
} }
} }
} }

View File

@ -22,7 +22,7 @@ package io.druid.segment.data;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.io.Closeables; import com.metamx.common.guava.CloseQuietly;
import io.druid.collections.ResourceHolder; import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidResourceHolder; import io.druid.collections.StupidResourceHolder;
@ -153,7 +153,7 @@ public class InMemoryCompressedFloats implements IndexedFloats
private void loadBuffer(int bufferNum) private void loadBuffer(int bufferNum)
{ {
loadBuffer = null; loadBuffer = null;
Closeables.closeQuietly(holder); CloseQuietly.close(holder);
final byte[] compressedBytes = compressedBuffers.get(bufferNum); final byte[] compressedBytes = compressedBuffers.get(bufferNum);
holder = strategy.fromByteBuffer(ByteBuffer.wrap(compressedBytes), compressedBytes.length); holder = strategy.fromByteBuffer(ByteBuffer.wrap(compressedBytes), compressedBytes.length);
loadBuffer = holder.get(); loadBuffer = holder.get();
@ -191,6 +191,6 @@ public class InMemoryCompressedFloats implements IndexedFloats
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
Closeables.closeQuietly(holder); CloseQuietly.close(holder);
} }
} }

View File

@ -23,6 +23,7 @@ import com.google.common.base.Function;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.io.Closeables; import com.google.common.io.Closeables;
import com.metamx.common.guava.CloseQuietly;
import io.druid.collections.ResourceHolder; import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidResourceHolder; import io.druid.collections.StupidResourceHolder;
@ -163,7 +164,7 @@ public class InMemoryCompressedLongs implements IndexedLongs
private void loadBuffer(int bufferNum) private void loadBuffer(int bufferNum)
{ {
loadBuffer = null; loadBuffer = null;
Closeables.closeQuietly(holder); CloseQuietly.close(holder);
final byte[] compressedBytes = compressedBuffers.get(bufferNum); final byte[] compressedBytes = compressedBuffers.get(bufferNum);
holder = strategy.fromByteBuffer(ByteBuffer.wrap(compressedBytes), compressedBytes.length); holder = strategy.fromByteBuffer(ByteBuffer.wrap(compressedBytes), compressedBytes.length);
loadBuffer = holder.get(); loadBuffer = holder.get();

View File

@ -23,12 +23,14 @@ import com.google.common.base.Function;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.base.Splitter; import com.google.common.base.Splitter;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.primitives.Floats; import com.google.common.primitives.Floats;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.parsers.ParseException;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
import io.druid.data.input.impl.SpatialDimensionSchema; import io.druid.data.input.impl.SpatialDimensionSchema;
@ -134,15 +136,21 @@ public class SpatialDimensionRowFormatter
} }
@Override @Override
public Object getRaw(String dimension) { public Object getRaw(String dimension)
{
return row.getRaw(dimension); return row.getRaw(dimension);
} }
@Override @Override
public float getFloatMetric(String metric) public float getFloatMetric(String metric)
{ {
try {
return row.getFloatMetric(metric); return row.getFloatMetric(metric);
} }
catch (ParseException e) {
throw Throwables.propagate(e);
}
}
@Override @Override
public String toString() public String toString()

View File

@ -180,6 +180,7 @@ public class TestIndex
new TimestampSpec("ts", "iso"), new TimestampSpec("ts", "iso"),
new DimensionsSpec(Arrays.asList(DIMENSIONS), null, null), new DimensionsSpec(Arrays.asList(DIMENSIONS), null, null),
"\t", "\t",
"\u0001",
Arrays.asList(COLUMNS) Arrays.asList(COLUMNS)
), ),
null, null, null, null null, null, null, null

View File

@ -19,8 +19,8 @@
package io.druid.segment.data; package io.druid.segment.data;
import com.google.common.io.Closeables;
import com.google.common.primitives.Longs; import com.google.common.primitives.Longs;
import com.metamx.common.guava.CloseQuietly;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -47,7 +47,7 @@ public class CompressedLongsIndexedSupplierTest
@Before @Before
public void setUp() throws Exception public void setUp() throws Exception
{ {
Closeables.closeQuietly(indexed); CloseQuietly.close(indexed);
indexed = null; indexed = null;
supplier = null; supplier = null;
vals = null; vals = null;
@ -56,7 +56,7 @@ public class CompressedLongsIndexedSupplierTest
@After @After
public void tearDown() throws Exception public void tearDown() throws Exception
{ {
Closeables.closeQuietly(indexed); CloseQuietly.close(indexed);
} }
private void setupSimple() private void setupSimple()
@ -247,7 +247,7 @@ public class CompressedLongsIndexedSupplierTest
stopLatch.await(); stopLatch.await();
} }
finally { finally {
Closeables.closeQuietly(indexed2); CloseQuietly.close(indexed2);
} }
if (failureHappened.get()) { if (failureHappened.get()) {

View File

@ -128,7 +128,7 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory<StringInputRowPa
@Override @Override
public Firehose connect(StringInputRowParser firehoseParser) throws IOException public Firehose connect(StringInputRowParser firehoseParser) throws IOException
{ {
final StringInputRowParser stringParser = (StringInputRowParser) firehoseParser; final StringInputRowParser stringParser = firehoseParser;
ConnectionOptions lyraOptions = new ConnectionOptions(this.connectionFactory); ConnectionOptions lyraOptions = new ConnectionOptions(this.connectionFactory);
Config lyraConfig = new Config() Config lyraConfig = new Config()

View File

@ -21,7 +21,6 @@ package io.druid.storage.s3;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.io.ByteStreams; import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.Files; import com.google.common.io.Files;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.ISE; import com.metamx.common.ISE;
@ -95,9 +94,7 @@ public class S3DataSegmentPuller implements DataSegmentPuller
try { try {
s3Obj = s3Client.getObject(s3Coords.bucket, s3Coords.path); s3Obj = s3Client.getObject(s3Coords.bucket, s3Coords.path);
InputStream in = null; try (InputStream in = s3Obj.getDataInputStream()) {
try {
in = s3Obj.getDataInputStream();
final String key = s3Obj.getKey(); final String key = s3Obj.getKey();
if (key.endsWith(".zip")) { if (key.endsWith(".zip")) {
CompressionUtils.unzip(in, outDir); CompressionUtils.unzip(in, outDir);
@ -113,9 +110,6 @@ public class S3DataSegmentPuller implements DataSegmentPuller
catch (IOException e) { catch (IOException e) {
throw new IOException(String.format("Problem decompressing object[%s]", s3Obj), e); throw new IOException(String.format("Problem decompressing object[%s]", s3Obj), e);
} }
finally {
Closeables.closeQuietly(in);
}
} }
finally { finally {
S3Utils.closeStreamsQuietly(s3Obj); S3Utils.closeStreamsQuietly(s3Obj);
@ -127,7 +121,8 @@ public class S3DataSegmentPuller implements DataSegmentPuller
catch (Exception e) { catch (Exception e) {
try { try {
FileUtils.deleteDirectory(outDir); FileUtils.deleteDirectory(outDir);
} catch (IOException ioe) { }
catch (IOException ioe) {
log.warn( log.warn(
ioe, ioe,
"Failed to remove output directory for segment[%s] after exception: %s", "Failed to remove output directory for segment[%s] after exception: %s",

View File

@ -29,7 +29,6 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
@ -37,6 +36,7 @@ import com.metamx.common.IAE;
import com.metamx.common.Pair; import com.metamx.common.Pair;
import com.metamx.common.RE; import com.metamx.common.RE;
import com.metamx.common.guava.BaseSequence; import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
@ -242,7 +242,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
@Override @Override
public void cleanup(JsonParserIterator<T> iterFromMake) public void cleanup(JsonParserIterator<T> iterFromMake)
{ {
Closeables.closeQuietly(iterFromMake); CloseQuietly.close(iterFromMake);
} }
} }
); );
@ -285,7 +285,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
return false; return false;
} }
if (jp.getCurrentToken() == JsonToken.END_ARRAY) { if (jp.getCurrentToken() == JsonToken.END_ARRAY) {
Closeables.closeQuietly(jp); CloseQuietly.close(jp);
return false; return false;
} }

View File

@ -23,10 +23,10 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker; import com.google.common.collect.MapMaker;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.io.Closeables;
import com.metamx.common.IAE; import com.metamx.common.IAE;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.Pair; import com.metamx.common.Pair;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
@ -112,7 +112,7 @@ public class Announcer
started = false; started = false;
for (Map.Entry<String, PathChildrenCache> entry : listeners.entrySet()) { for (Map.Entry<String, PathChildrenCache> entry : listeners.entrySet()) {
Closeables.closeQuietly(entry.getValue()); CloseQuietly.close(entry.getValue());
} }
for (Map.Entry<String, ConcurrentMap<String, byte[]>> entry : announcements.entrySet()) { for (Map.Entry<String, ConcurrentMap<String, byte[]>> entry : announcements.entrySet()) {
@ -353,7 +353,7 @@ public class Announcer
cache.start(); cache.start();
} }
catch (Exception e) { catch (Exception e) {
Closeables.closeQuietly(cache); CloseQuietly.close(cache);
throw Throwables.propagate(e); throw Throwables.propagate(e);
} }
} }

View File

@ -406,6 +406,12 @@ public class DiscoveryModule implements Module
return null; return null;
} }
@Override
public Collection<ServiceInstance<T>> getAllInstances() throws Exception
{
return null;
}
@Override @Override
public void noteError(ServiceInstance<T> tServiceInstance) { public void noteError(ServiceInstance<T> tServiceInstance) {

View File

@ -25,6 +25,7 @@ import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceProvider; import org.apache.curator.x.discovery.ServiceProvider;
import java.io.IOException; import java.io.IOException;
import java.util.Collection;
/** /**
*/ */
@ -62,6 +63,12 @@ public class ServerDiscoveryFactory
return null; return null;
} }
@Override
public Collection<ServiceInstance<T>> getAllInstances() throws Exception
{
return null;
}
@Override @Override
public void noteError(ServiceInstance<T> tServiceInstance) { public void noteError(ServiceInstance<T> tServiceInstance) {
// do nothing // do nothing

View File

@ -23,11 +23,11 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.exception.FormattedException; import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.parsers.ParseException;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Firehose; import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
@ -95,7 +95,7 @@ public class RealtimeManager implements QuerySegmentWalker
public void stop() public void stop()
{ {
for (FireChief chief : chiefs.values()) { for (FireChief chief : chiefs.values()) {
Closeables.closeQuietly(chief); CloseQuietly.close(chief);
} }
} }
@ -185,7 +185,7 @@ public class RealtimeManager implements QuerySegmentWalker
long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
while (firehose.hasMore()) { while (firehose.hasMore()) {
final InputRow inputRow; InputRow inputRow = null;
try { try {
try { try {
inputRow = firehose.nextRow(); inputRow = firehose.nextRow();
@ -214,10 +214,11 @@ public class RealtimeManager implements QuerySegmentWalker
} }
metrics.incrementProcessed(); metrics.incrementProcessed();
} }
catch (FormattedException e) { catch (ParseException e) {
log.info(e, "unparseable line: %s", e.getDetails()); if (inputRow != null) {
log.error(e, "unparseable line: %s", inputRow);
}
metrics.incrementUnparseable(); metrics.incrementUnparseable();
continue;
} }
} }
} }
@ -237,7 +238,7 @@ public class RealtimeManager implements QuerySegmentWalker
throw e; throw e;
} }
finally { finally {
Closeables.closeQuietly(firehose); CloseQuietly.close(firehose);
if (normalExit) { if (normalExit) {
plumber.finishJob(); plumber.finishJob();
plumber = null; plumber = null;

View File

@ -31,18 +31,13 @@ import com.ircclouds.irc.api.domain.messages.ChannelPrivMsg;
import com.ircclouds.irc.api.listeners.VariousMessageListenerAdapter; import com.ircclouds.irc.api.listeners.VariousMessageListenerAdapter;
import com.ircclouds.irc.api.state.IIRCState; import com.ircclouds.irc.api.state.IIRCState;
import com.metamx.common.Pair; import com.metamx.common.Pair;
import com.metamx.common.exception.FormattedException;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.Firehose; import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory; import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.ParseSpec;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;

View File

@ -24,12 +24,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.annotation.JsonTypeName;
import com.ircclouds.irc.api.domain.messages.ChannelPrivMsg; import com.ircclouds.irc.api.domain.messages.ChannelPrivMsg;
import com.metamx.common.Pair; import com.metamx.common.Pair;
import com.metamx.common.exception.FormattedException;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser; import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.ParseSpec; import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import org.joda.time.DateTime; import org.joda.time.DateTime;
/** /**
@ -52,7 +49,7 @@ public class IrcParser implements InputRowParser<Pair<DateTime, ChannelPrivMsg>>
} }
@Override @Override
public InputRow parse(Pair<DateTime, ChannelPrivMsg> msg) throws FormattedException public InputRow parse(Pair<DateTime, ChannelPrivMsg> msg)
{ {
return decoder.decodeMessage(msg.lhs, msg.rhs.getChannelName(), msg.rhs.getText()); return decoder.decodeMessage(msg.lhs, msg.rhs.getChannelName(), msg.rhs.getText());
} }

View File

@ -22,20 +22,16 @@ package io.druid.server;
import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter; import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams; import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Accumulators;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.Yielder; import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator; import com.metamx.common.guava.YieldingAccumulator;
import com.metamx.common.guava.YieldingAccumulators;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.emitter.service.ServiceMetricEvent;
@ -51,7 +47,6 @@ import org.joda.time.DateTime;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE; import javax.ws.rs.DELETE;
import javax.ws.rs.POST; import javax.ws.rs.POST;
import javax.ws.rs.Path; import javax.ws.rs.Path;
@ -59,12 +54,10 @@ import javax.ws.rs.PathParam;
import javax.ws.rs.Produces; import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException; import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context; import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput; import javax.ws.rs.core.StreamingOutput;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.UUID; import java.util.UUID;
/** /**
@ -231,10 +224,20 @@ public class QueryResource
new DateTime(), new DateTime(),
req.getRemoteAddr(), req.getRemoteAddr(),
query, query,
new QueryStats(ImmutableMap.<String, Object>of("success", false, "interrupted", true, "reason", e.toString())) new QueryStats(
ImmutableMap.<String, Object>of(
"success",
false,
"interrupted",
true,
"reason",
e.toString()
)
)
) )
); );
} catch (Exception e2) { }
catch (Exception e2) {
log.error(e2, "Unable to log query [%s]!", query); log.error(e2, "Unable to log query [%s]!", query);
} }
return Response.serverError().entity( return Response.serverError().entity(

View File

@ -23,11 +23,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
@ -338,7 +338,7 @@ public class DruidClusterBridge
log.makeAlert(e, "Exception becoming leader") log.makeAlert(e, "Exception becoming leader")
.emit(); .emit();
final LeaderLatch oldLatch = createNewLeaderLatch(); final LeaderLatch oldLatch = createNewLeaderLatch();
Closeables.closeQuietly(oldLatch); CloseQuietly.close(oldLatch);
try { try {
leaderLatch.get().start(); leaderLatch.get().start();
} }

View File

@ -27,12 +27,12 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.io.Closeables;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.IAE; import com.metamx.common.IAE;
import com.metamx.common.Pair; import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.Comparators; import com.metamx.common.guava.Comparators;
import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;
@ -571,7 +571,7 @@ public class DruidCoordinator
log.makeAlert(e, "Unable to become leader") log.makeAlert(e, "Unable to become leader")
.emit(); .emit();
final LeaderLatch oldLatch = createNewLeaderLatch(); final LeaderLatch oldLatch = createNewLeaderLatch();
Closeables.closeQuietly(oldLatch); CloseQuietly.close(oldLatch);
try { try {
leaderLatch.get().start(); leaderLatch.get().start();
} }

View File

@ -21,9 +21,9 @@ package io.druid.server.initialization;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.io.Closeables;
import com.google.inject.Binder; import com.google.inject.Binder;
import com.google.inject.Module; import com.google.inject.Module;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
@ -80,7 +80,7 @@ public class PropertiesModule implements Module
log.wtf(e, "This can only happen if the .exists() call lied. That's f'd up."); log.wtf(e, "This can only happen if the .exists() call lied. That's f'd up.");
} }
finally { finally {
Closeables.closeQuietly(stream); CloseQuietly.close(stream);
} }
binder.bind(Properties.class).toInstance(props); binder.bind(Properties.class).toInstance(props);

View File

@ -21,8 +21,8 @@ package io.druid.server.log;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.io.Closeables;
import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
import io.druid.server.RequestLogLine; import io.druid.server.RequestLogLine;
@ -83,7 +83,7 @@ public class FileRequestLogger implements RequestLogger
try { try {
synchronized (lock) { synchronized (lock) {
Closeables.closeQuietly(fileWriter); CloseQuietly.close(fileWriter);
fileWriter = new FileWriter(new File(baseDir, currentDay.toString()), true); fileWriter = new FileWriter(new File(baseDir, currentDay.toString()), true);
} }
} }
@ -105,7 +105,7 @@ public class FileRequestLogger implements RequestLogger
public void stop() public void stop()
{ {
synchronized (lock) { synchronized (lock) {
Closeables.closeQuietly(fileWriter); CloseQuietly.close(fileWriter);
} }
} }

View File

@ -27,7 +27,7 @@ import com.google.common.base.Function;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.io.Closeables; import com.metamx.common.guava.CloseQuietly;
import io.druid.data.input.Row; import io.druid.data.input.Row;
import io.druid.jackson.DefaultObjectMapper; import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Druids; import io.druid.query.Druids;
@ -222,6 +222,6 @@ public class SQLRunner
} }
} }
Closeables.closeQuietly(stdInput); CloseQuietly.close(stdInput);
} }
} }

View File

@ -335,7 +335,7 @@ public class BatchServerInventoryViewTest
private static void waitForSync(BatchServerInventoryView batchServerInventoryView, Set<DataSegment> testSegments) throws Exception private static void waitForSync(BatchServerInventoryView batchServerInventoryView, Set<DataSegment> testSegments) throws Exception
{ {
Stopwatch stopwatch = new Stopwatch().start(); Stopwatch stopwatch = Stopwatch.createStarted();
while (Iterables.isEmpty(batchServerInventoryView.getInventory()) while (Iterables.isEmpty(batchServerInventoryView.getInventory())
|| Iterables.get(batchServerInventoryView.getInventory(), 0).getSegments().size() != testSegments.size()) { || Iterables.get(batchServerInventoryView.getInventory(), 0).getSegments().size() != testSegments.size()) {
Thread.sleep(500); Thread.sleep(500);

View File

@ -19,7 +19,7 @@
package io.druid.curator; package io.druid.curator;
import com.google.common.io.Closeables; import com.metamx.common.guava.CloseQuietly;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime; import org.apache.curator.retry.RetryOneTime;
@ -51,7 +51,7 @@ public class CuratorTestBase
protected void tearDownServerAndCurator() protected void tearDownServerAndCurator()
{ {
Closeables.closeQuietly(curator); CloseQuietly.close(curator);
Closeables.closeQuietly(server); CloseQuietly.close(server);
} }
} }

View File

@ -137,7 +137,7 @@ public class RealtimeManagerTest
{ {
realtimeManager.start(); realtimeManager.start();
Stopwatch stopwatch = new Stopwatch().start(); Stopwatch stopwatch = Stopwatch.createStarted();
while (realtimeManager.getMetrics("test").processed() != 1) { while (realtimeManager.getMetrics("test").processed() != 1) {
Thread.sleep(100); Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) { if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {

View File

@ -26,7 +26,6 @@ import com.google.common.io.Files;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.Granularity; import com.metamx.common.Granularity;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.exception.FormattedException;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.FilteredServerView; import io.druid.client.FilteredServerView;
import io.druid.client.ServerView; import io.druid.client.ServerView;
@ -85,7 +84,7 @@ public class RealtimePlumberSchoolTest
new InputRowParser() new InputRowParser()
{ {
@Override @Override
public InputRow parse(Object input) throws FormattedException public InputRow parse(Object input)
{ {
return null; return null;
} }
@ -177,7 +176,7 @@ public class RealtimePlumberSchoolTest
} }
); );
Stopwatch stopwatch = new Stopwatch().start(); Stopwatch stopwatch = Stopwatch.createStarted();
while (!committed.booleanValue()) { while (!committed.booleanValue()) {
Thread.sleep(100); Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) { if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {

View File

@ -71,7 +71,7 @@ public class DruidCoordinatorBalancerProfiler
public void bigProfiler() public void bigProfiler()
{ {
Stopwatch watch = new Stopwatch(); Stopwatch watch = Stopwatch.createUnstarted();
int numSegments = 55000; int numSegments = 55000;
int numServers = 50; int numServers = 50;
EasyMock.expect(manager.getAllRules()).andReturn(ImmutableMap.<String, List<Rule>>of("test", rules)).anyTimes(); EasyMock.expect(manager.getAllRules()).andReturn(ImmutableMap.<String, List<Rule>>of("test", rules)).anyTimes();
@ -184,7 +184,7 @@ public class DruidCoordinatorBalancerProfiler
public void profileRun() public void profileRun()
{ {
Stopwatch watch = new Stopwatch(); Stopwatch watch = Stopwatch.createUnstarted();
LoadQueuePeonTester fromPeon = new LoadQueuePeonTester(); LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
LoadQueuePeonTester toPeon = new LoadQueuePeonTester(); LoadQueuePeonTester toPeon = new LoadQueuePeonTester();

View File

@ -22,7 +22,7 @@ package io.druid.cli.convert;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.io.Closeables; import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.airlift.command.Command; import io.airlift.command.Command;
import io.airlift.command.Option; import io.airlift.command.Option;
@ -196,7 +196,7 @@ public class ConvertProperties implements Runnable
} }
finally { finally {
if (out != null) { if (out != null) {
Closeables.closeQuietly(out); CloseQuietly.close(out);
} }
} }