Enforce Indentation with Checkstyle (#4799)

This commit is contained in:
Roman Leventov 2017-09-21 15:06:48 -05:00 committed by Charles Allen
parent d8b3bfa63c
commit e267f3901b
142 changed files with 2471 additions and 2359 deletions

View File

@ -22,6 +22,7 @@ package io.druid.data.input;
import io.druid.guice.annotations.ExtensionPoint;
import java.io.Closeable;
/**
* This is an interface that holds onto the stream of incoming data. Realtime data ingestion is built around this
* abstraction. In order to add a new type of source for realtime data ingestion, all you need to do is implement
@ -40,52 +41,50 @@ import java.io.Closeable;
* This object acts a lot like an Iterator, but it doesn't extend the Iterator interface because it extends
* Closeable and it is very important that the close() method doesn't get forgotten, which is easy to do if this
* gets passed around as an Iterator.
* <p>
*
* The implementation of this interface only needs to be minimally thread-safe. The methods ##start(), ##advance(),
* ##currRow() and ##makeCommitter() are all called from the same thread. ##makeCommitter(), however, returns a callback
* which will be called on another thread, so the operations inside of that callback must be thread-safe.
* </p>
*/
@ExtensionPoint
public interface FirehoseV2 extends Closeable
{
/**
* For initial start
* */
void start() throws Exception;
/**
* For initial start
*/
void start() throws Exception;
/**
* Advance the firehose to the next offset. Implementations of this interface should make sure that
* if advance() is called and throws out an exception, the next call to currRow() should return a
* null value.
*
* @return true if and when there is another row available, false if the stream has dried up
*/
public boolean advance();
/**
* Advance the firehose to the next offset. Implementations of this interface should make sure that
* if advance() is called and throws out an exception, the next call to currRow() should return a
* null value.
*
* @return true if and when there is another row available, false if the stream has dried up
*/
public boolean advance();
/**
* @return The current row
*/
public InputRow currRow();
/**
* @return The current row
*/
public InputRow currRow();
/**
* Returns a Committer that will "commit" everything read up to the point at which makeCommitter() is called.
*
* This method is called when the main processing loop starts to persist its current batch of things to process.
* The returned committer will be run when the current batch has been successfully persisted
* and the metadata the committer carries can also be persisted along with segment data. There is usually
* some time lag between when this method is called and when the runnable is run. The Runnable is also run on
* a separate thread so its operation should be thread-safe.
*
* Note that "correct" usage of this interface will always call advance() before commit() if the current row
* is considered in the commit.
*
* The Runnable is essentially just a lambda/closure that is run() after data supplied by this instance has
* been committed on the writer side of this interface protocol.
* <p>
* A simple implementation of this interface might do nothing when run() is called,
* and save proper commit information in metadata
* </p>
*/
public Committer makeCommitter();
/**
* Returns a Committer that will "commit" everything read up to the point at which makeCommitter() is called.
*
* This method is called when the main processing loop starts to persist its current batch of things to process.
* The returned committer will be run when the current batch has been successfully persisted
* and the metadata the committer carries can also be persisted along with segment data. There is usually
* some time lag between when this method is called and when the runnable is run. The Runnable is also run on
* a separate thread so its operation should be thread-safe.
*
* Note that "correct" usage of this interface will always call advance() before commit() if the current row
* is considered in the commit.
*
* The Runnable is essentially just a lambda/closure that is run() after data supplied by this instance has
* been committed on the writer side of this interface protocol.
*
* A simple implementation of this interface might do nothing when run() is called,
* and save proper commit information in metadata
*/
public Committer makeCommitter();
}

View File

@ -32,7 +32,7 @@ import java.util.List;
*/
public class CommaListJoinDeserializer extends StdScalarDeserializer<List<String>>
{
protected CommaListJoinDeserializer()
protected CommaListJoinDeserializer()
{
super(List.class);
}

View File

@ -25,10 +25,7 @@ public class SingleElementPartitionChunk<T> implements PartitionChunk<T>
{
private final T element;
public SingleElementPartitionChunk
(
T element
)
public SingleElementPartitionChunk(T element)
{
this.element = element;
}

View File

@ -268,18 +268,14 @@ public class ConditionalMultibindTest
@Override
public void configure(Binder binder)
{
ConditionalMultibind.create(props, binder,
new TypeLiteral<Set<Animal>>()
{
}
).addConditionBinding(ANIMAL_TYPE, Predicates.equalTo("pets"), set1
).addConditionBinding(ANIMAL_TYPE, Predicates.equalTo("pets"), set2);
ConditionalMultibind
.create(props, binder, new TypeLiteral<Set<Animal>>() {})
.addConditionBinding(ANIMAL_TYPE, Predicates.equalTo("pets"), set1)
.addConditionBinding(ANIMAL_TYPE, Predicates.equalTo("pets"), set2);
ConditionalMultibind.create(props, binder,
new TypeLiteral<Zoo<Animal>>()
{
}
).addConditionBinding(ANIMAL_TYPE, Predicates.equalTo("pets"), zoo1);
ConditionalMultibind
.create(props, binder, new TypeLiteral<Zoo<Animal>>() {})
.addConditionBinding(ANIMAL_TYPE, Predicates.equalTo("pets"), zoo1);
}
},
new Module()
@ -287,46 +283,31 @@ public class ConditionalMultibindTest
@Override
public void configure(Binder binder)
{
ConditionalMultibind.create(props, binder,
new TypeLiteral<Set<Animal>>()
{
}
).addConditionBinding(ANIMAL_TYPE, Predicates.equalTo("pets"), set3);
ConditionalMultibind
.create(props, binder, new TypeLiteral<Set<Animal>>() {})
.addConditionBinding(ANIMAL_TYPE, Predicates.equalTo("pets"), set3);
ConditionalMultibind.create(props, binder,
new TypeLiteral<Set<Animal>>()
{
},
SanDiego.class
).addConditionBinding(ANIMAL_TYPE, Predicates.equalTo("pets"), union);
ConditionalMultibind
.create(props, binder, new TypeLiteral<Set<Animal>>() {}, SanDiego.class)
.addConditionBinding(ANIMAL_TYPE, Predicates.equalTo("pets"), union);
ConditionalMultibind.create(props, binder,
new TypeLiteral<Zoo<Animal>>()
{
}
).addBinding(new TypeLiteral<Zoo<Animal>>()
{
});
ConditionalMultibind
.create(props, binder, new TypeLiteral<Zoo<Animal>>() {})
.addBinding(new TypeLiteral<Zoo<Animal>>() {});
}
}
);
Set<Set<Animal>> actualAnimalSet = injector.getInstance(Key.get(new TypeLiteral<Set<Set<Animal>>>()
{
}));
Set<Set<Animal>> actualAnimalSet = injector.getInstance(Key.get(new TypeLiteral<Set<Set<Animal>>>() {}));
Assert.assertEquals(3, actualAnimalSet.size());
Assert.assertEquals(ImmutableSet.of(set1, set2, set3), actualAnimalSet);
actualAnimalSet = injector.getInstance(Key.get(new TypeLiteral<Set<Set<Animal>>>()
{
}, SanDiego.class));
actualAnimalSet = injector.getInstance(Key.get(new TypeLiteral<Set<Set<Animal>>>() {}, SanDiego.class));
Assert.assertEquals(1, actualAnimalSet.size());
Assert.assertEquals(ImmutableSet.of(union), actualAnimalSet);
final Set<Zoo<Animal>> actualZooSet = injector.getInstance(Key.get(new TypeLiteral<Set<Zoo<Animal>>>()
{
}));
final Set<Zoo<Animal>> actualZooSet = injector.getInstance(Key.get(new TypeLiteral<Set<Zoo<Animal>>>() {}));
Assert.assertEquals(2, actualZooSet.size());
Assert.assertEquals(ImmutableSet.of(zoo1, zoo2), actualZooSet);
}

View File

@ -179,7 +179,7 @@ public class IndexPersistBenchmark
}
finally {
FileUtils.deleteDirectory(tmpDir);
FileUtils.deleteDirectory(tmpDir);
}
}
}

View File

@ -48,6 +48,9 @@
<suppress checks="LeftCurly" files="[\\/]target[\\/]generated-test-sources[\\/]" />
<suppress checks="RightCurly" files="[\\/]target[\\/]generated-test-sources[\\/]" />
<suppress checks="Indentation" files="[\\/]target[\\/]generated-test-sources[\\/]" />
<suppress checks="Indentation" files="ProtoTestEventWrapper.java" />
<!-- extendedset is a fork of Alessandro Colantonio's CONCISE (COmpressed 'N' Composable Integer SEt) repository and licensed to Metamarkets under a CLA is not true. -->
<suppress checks="Header" files="[\\/]extendedset[\\/]" />
</suppressions>

View File

@ -90,6 +90,11 @@
<property name="allowLineBreaks" value="false"/>
</module>
<module name="Indentation">
<property name="basicOffset" value="2"/>
<property name="caseIndent" value="2"/>
</module>
<module name="Regexp">
<property name="format" value="com\.google\.common\.io\.Closer"/>
<property name="illegalPattern" value="true"/>

View File

@ -19,12 +19,12 @@
package io.druid.collections;
import java.util.Comparator;
import java.util.Iterator;
import io.druid.java.util.common.guava.MergeIterable;
import io.druid.java.util.common.guava.nary.BinaryFn;
import java.util.Comparator;
import java.util.Iterator;
/**
*/
public class CombiningIterable<InType> implements Iterable<InType>
@ -66,9 +66,9 @@ public class CombiningIterable<InType> implements Iterable<InType>
}
public static <InType> CombiningIterable<InType> create(
Iterable<InType> it,
Comparator<InType> comparator,
BinaryFn<InType, InType, InType> fn
Iterable<InType> it,
Comparator<InType> comparator,
BinaryFn<InType, InType, InType> fn
)
{
return new CombiningIterable<InType>(it, comparator, fn);

View File

@ -19,8 +19,6 @@
package io.druid.timeline.partition;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
@ -121,14 +119,8 @@ public class PartitionHolder<T> implements Iterable<PartitionChunk<T>>
public PartitionChunk<T> getChunk(final int partitionNum)
{
final Iterator<PartitionChunk<T>> retVal = Iterators.filter(
holderSet.iterator(), new Predicate<PartitionChunk<T>>()
{
@Override
public boolean apply(PartitionChunk<T> input)
{
return input.getChunkNumber() == partitionNum;
}
}
holderSet.iterator(),
input -> input.getChunkNumber() == partitionNum
);
return retVal.hasNext() ? retVal.next() : null;
@ -142,17 +134,7 @@ public class PartitionHolder<T> implements Iterable<PartitionChunk<T>>
public Iterable<T> payloads()
{
return Iterables.transform(
this,
new Function<PartitionChunk<T>, T>()
{
@Override
public T apply(PartitionChunk<T> input)
{
return input.getObject();
}
}
);
return Iterables.transform(this, PartitionChunk::getObject);
}
@Override

View File

@ -56,15 +56,15 @@ public class AmbariMetricsEmitter extends AbstractTimelineMetricsSink implements
private final AmbariMetricsEmitterConfig config;
private final String collectorURI;
private static final long DEFAULT_FLUSH_TIMEOUT_MILLIS = 60000; // default flush wait 1 min
private final ScheduledExecutorService exec = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("AmbariMetricsEmitter-%s")
.build()); // Thread pool of two in order to schedule flush runnable
private final ScheduledExecutorService exec = Executors.newScheduledThreadPool(
2, // Thread pool of two in order to schedule flush runnable
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AmbariMetricsEmitter-%s").build()
);
private final AtomicLong countLostEvents = new AtomicLong(0);
public AmbariMetricsEmitter(
AmbariMetricsEmitterConfig config,
List<Emitter> emitterList
AmbariMetricsEmitterConfig config,
List<Emitter> emitterList
)
{
this.config = config;
@ -72,11 +72,11 @@ public class AmbariMetricsEmitter extends AbstractTimelineMetricsSink implements
this.timelineMetricConverter = config.getDruidToTimelineEventConverter();
this.eventsQueue = new LinkedBlockingQueue<>(config.getMaxQueueSize());
this.collectorURI = StringUtils.format(
"%s://%s:%s%s",
config.getProtocol(),
config.getHostname(),
config.getPort(),
WS_V1_TIMELINE_METRICS
"%s://%s:%s%s",
config.getProtocol(),
config.getHostname(),
config.getPort(),
WS_V1_TIMELINE_METRICS
);
}
@ -90,10 +90,10 @@ public class AmbariMetricsEmitter extends AbstractTimelineMetricsSink implements
loadTruststore(config.getTrustStorePath(), config.getTrustStoreType(), config.getTrustStorePassword());
}
exec.scheduleAtFixedRate(
new ConsumerRunnable(),
config.getFlushPeriod(),
config.getFlushPeriod(),
TimeUnit.MILLISECONDS
new ConsumerRunnable(),
config.getFlushPeriod(),
config.getFlushPeriod(),
TimeUnit.MILLISECONDS
);
started.set(true);
}
@ -114,15 +114,15 @@ public class AmbariMetricsEmitter extends AbstractTimelineMetricsSink implements
}
try {
final boolean isSuccessful = eventsQueue.offer(
timelineEvent,
config.getEmitWaitTime(),
TimeUnit.MILLISECONDS
timelineEvent,
config.getEmitWaitTime(),
TimeUnit.MILLISECONDS
);
if (!isSuccessful) {
if (countLostEvents.getAndIncrement() % 1000 == 0) {
log.error(
"Lost total of [%s] events because of emitter queue is full. Please increase the capacity or/and the consumer frequency",
countLostEvents.get()
"Lost total of [%s] events because of emitter queue is full. Please increase the capacity or/and the consumer frequency",
countLostEvents.get()
);
}
}
@ -163,16 +163,16 @@ public class AmbariMetricsEmitter extends AbstractTimelineMetricsSink implements
while (eventsQueue.size() > 0 && !exec.isShutdown()) {
try {
final TimelineMetric metricEvent = eventsQueue.poll(
config.getWaitForEventTime(),
TimeUnit.MILLISECONDS
config.getWaitForEventTime(),
TimeUnit.MILLISECONDS
);
if (metricEvent != null) {
metrics.addOrMergeTimelineMetric(metricEvent);
if (metrics.getMetrics().size() == batchSize) {
emitMetrics(metrics);
log.debug(
"sent [%d] events",
metrics.getMetrics().size()
"sent [%d] events",
metrics.getMetrics().size()
);
metrics = new TimelineMetrics();
}
@ -186,8 +186,8 @@ public class AmbariMetricsEmitter extends AbstractTimelineMetricsSink implements
if (metrics.getMetrics().size() > 0) {
emitMetrics(metrics);
log.debug(
"sent [%d] events",
metrics.getMetrics().size()
"sent [%d] events",
metrics.getMetrics().size()
);
}
}

View File

@ -35,7 +35,7 @@ public class GoogleStorage
}
public void insert(final String bucket, final String path, AbstractInputStreamContent mediaContent) throws IOException
{
{
Storage.Objects.Insert insertObject = storage.objects().insert(bucket, null, mediaContent);
insertObject.setName(path);
insertObject.getMediaHttpUploader().setDirectUploadEnabled(false);

View File

@ -104,7 +104,7 @@ public class GoogleStorageDruidModule implements DruidModule
@Provides
@LazySingleton
public GoogleStorage getGoogleStorage(final GoogleAccountConfig config)
throws IOException, GeneralSecurityException
throws IOException, GeneralSecurityException
{
LOG.info("Building Cloud Storage Client...");

View File

@ -121,12 +121,14 @@ public class OrcIndexGeneratorJobTest
private final Interval interval = Intervals.of("2014-10-22T00:00:00Z/P1D");
private File dataRoot;
private File outputRoot;
private Integer[][][] shardInfoForEachSegment = new Integer[][][]{{
{0, 4},
{1, 4},
{2, 4},
{3, 4}
}};
private Integer[][][] shardInfoForEachSegment = new Integer[][][]{
{
{0, 4},
{1, 4},
{2, 4},
{3, 4}
}
};
private final InputRowParser inputRowParser = new OrcHadoopInputRowParser(
new TimeAndDimsParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),

View File

@ -37,154 +37,154 @@ import java.util.concurrent.atomic.AtomicLong;
public class RedisCache implements Cache
{
private static final Logger log = new Logger(RedisCache.class);
private static final Logger log = new Logger(RedisCache.class);
private JedisPool pool;
private RedisCacheConfig config;
private JedisPool pool;
private RedisCacheConfig config;
private final AtomicLong hitCount = new AtomicLong(0);
private final AtomicLong missCount = new AtomicLong(0);
private final AtomicLong timeoutCount = new AtomicLong(0);
private final AtomicLong errorCount = new AtomicLong(0);
private final AtomicLong hitCount = new AtomicLong(0);
private final AtomicLong missCount = new AtomicLong(0);
private final AtomicLong timeoutCount = new AtomicLong(0);
private final AtomicLong errorCount = new AtomicLong(0);
private final AtomicLong priorRequestCount = new AtomicLong(0);
// both getput and getBulk will increase request count by 1
private final AtomicLong totalRequestCount = new AtomicLong(0);
private final AtomicLong priorRequestCount = new AtomicLong(0);
// both getput and getBulk will increase request count by 1
private final AtomicLong totalRequestCount = new AtomicLong(0);
private RedisCache(JedisPool pool, RedisCacheConfig config)
{
this.pool = pool;
this.config = config;
private RedisCache(JedisPool pool, RedisCacheConfig config)
{
this.pool = pool;
this.config = config;
}
public static RedisCache create(final RedisCacheConfig config)
{
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(config.getMaxTotalConnections());
poolConfig.setMaxIdle(config.getMaxIdleConnections());
poolConfig.setMinIdle(config.getMinIdleConnections());
JedisPool pool = new JedisPool(poolConfig, config.getHost(), config.getPort(), config.getTimeout());
return new RedisCache(pool, config);
}
@Override
public byte[] get(NamedKey key)
{
totalRequestCount.incrementAndGet();
try (Jedis jedis = pool.getResource()) {
byte[] bytes = jedis.get(key.toByteArray());
if (bytes == null) {
missCount.incrementAndGet();
return null;
} else {
hitCount.incrementAndGet();
return bytes;
}
}
public static RedisCache create(final RedisCacheConfig config)
{
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(config.getMaxTotalConnections());
poolConfig.setMaxIdle(config.getMaxIdleConnections());
poolConfig.setMinIdle(config.getMinIdleConnections());
JedisPool pool = new JedisPool(poolConfig, config.getHost(), config.getPort(), config.getTimeout());
return new RedisCache(pool, config);
catch (JedisException e) {
if (e.getMessage().contains("Read timed out")) {
timeoutCount.incrementAndGet();
} else {
errorCount.incrementAndGet();
}
log.warn(e, "Exception pulling item from cache");
return null;
}
}
@Override
public byte[] get(NamedKey key)
{
totalRequestCount.incrementAndGet();
@Override
public void put(NamedKey key, byte[] value)
{
totalRequestCount.incrementAndGet();
try (Jedis jedis = pool.getResource()) {
byte[] bytes = jedis.get(key.toByteArray());
if (bytes == null) {
missCount.incrementAndGet();
return null;
} else {
hitCount.incrementAndGet();
return bytes;
}
}
catch (JedisException e) {
if (e.getMessage().contains("Read timed out")) {
timeoutCount.incrementAndGet();
} else {
errorCount.incrementAndGet();
}
log.warn(e, "Exception pulling item from cache");
return null;
try (Jedis jedis = pool.getResource()) {
jedis.psetex(key.toByteArray(), config.getExpiration(), value);
}
catch (JedisException e) {
errorCount.incrementAndGet();
log.warn(e, "Exception pushing item to cache");
}
}
@Override
public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys)
{
totalRequestCount.incrementAndGet();
Map<NamedKey, byte[]> results = new HashMap<>();
try (Jedis jedis = pool.getResource()) {
List<NamedKey> namedKeys = Lists.newArrayList(keys);
List<byte[]> byteKeys = Lists.transform(namedKeys, NamedKey::toByteArray);
List<byte[]> byteValues = jedis.mget(byteKeys.toArray(new byte[0][]));
for (int i = 0; i < byteValues.size(); ++i) {
if (byteValues.get(i) != null) {
results.put(namedKeys.get(i), byteValues.get(i));
}
}
hitCount.addAndGet(results.size());
missCount.addAndGet(namedKeys.size() - results.size());
}
catch (JedisException e) {
if (e.getMessage().contains("Read timed out")) {
timeoutCount.incrementAndGet();
} else {
errorCount.incrementAndGet();
}
log.warn(e, "Exception pulling items from cache");
}
@Override
public void put(NamedKey key, byte[] value)
{
totalRequestCount.incrementAndGet();
return results;
}
try (Jedis jedis = pool.getResource()) {
jedis.psetex(key.toByteArray(), config.getExpiration(), value);
}
catch (JedisException e) {
errorCount.incrementAndGet();
log.warn(e, "Exception pushing item to cache");
}
@Override
public void close(String namespace)
{
// no resources to cleanup
}
@Override
public CacheStats getStats()
{
return new CacheStats(
hitCount.get(),
missCount.get(),
0,
0,
0,
timeoutCount.get(),
errorCount.get()
);
}
@Override
public boolean isLocal()
{
return false;
}
@Override
public void doMonitor(ServiceEmitter emitter)
{
final long priorCount = priorRequestCount.get();
final long totalCount = totalRequestCount.get();
final ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
emitter.emit(builder.build("query/cache/redis/total/requests", totalCount));
emitter.emit(builder.build("query/cache/redis/delta/requests", totalCount - priorCount));
if (!priorRequestCount.compareAndSet(priorCount, totalCount)) {
log.error("Prior value changed while I was reporting! updating anyways");
priorRequestCount.set(totalCount);
}
}
@Override
public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys)
{
totalRequestCount.incrementAndGet();
Map<NamedKey, byte[]> results = new HashMap<>();
try (Jedis jedis = pool.getResource()) {
List<NamedKey> namedKeys = Lists.newArrayList(keys);
List<byte[]> byteKeys = Lists.transform(namedKeys, NamedKey::toByteArray);
List<byte[]> byteValues = jedis.mget(byteKeys.toArray(new byte[0][]));
for (int i = 0; i < byteValues.size(); ++i) {
if (byteValues.get(i) != null) {
results.put(namedKeys.get(i), byteValues.get(i));
}
}
hitCount.addAndGet(results.size());
missCount.addAndGet(namedKeys.size() - results.size());
}
catch (JedisException e) {
if (e.getMessage().contains("Read timed out")) {
timeoutCount.incrementAndGet();
} else {
errorCount.incrementAndGet();
}
log.warn(e, "Exception pulling items from cache");
}
return results;
}
@Override
public void close(String namespace)
{
// no resources to cleanup
}
@Override
public CacheStats getStats()
{
return new CacheStats(
hitCount.get(),
missCount.get(),
0,
0,
0,
timeoutCount.get(),
errorCount.get()
);
}
@Override
public boolean isLocal()
{
return false;
}
@Override
public void doMonitor(ServiceEmitter emitter)
{
final long priorCount = priorRequestCount.get();
final long totalCount = totalRequestCount.get();
final ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
emitter.emit(builder.build("query/cache/redis/total/requests", totalCount));
emitter.emit(builder.build("query/cache/redis/delta/requests", totalCount - priorCount));
if (!priorRequestCount.compareAndSet(priorCount, totalCount)) {
log.error("Prior value changed while I was reporting! updating anyways");
priorRequestCount.set(totalCount);
}
}
@VisibleForTesting
static RedisCache create(final JedisPool pool, final RedisCacheConfig config)
{
return new RedisCache(pool, config);
}
@VisibleForTesting
static RedisCache create(final JedisPool pool, final RedisCacheConfig config)
{
return new RedisCache(pool, config);
}
}

View File

@ -23,64 +23,64 @@ import com.fasterxml.jackson.annotation.JsonProperty;
public class RedisCacheConfig
{
@JsonProperty
private String host;
@JsonProperty
private String host;
@JsonProperty
private int port;
@JsonProperty
private int port;
// milliseconds, default to one day
@JsonProperty
private long expiration = 24 * 3600 * 1000;
// milliseconds, default to one day
@JsonProperty
private long expiration = 24 * 3600 * 1000;
// milliseconds, the type is 'int' because current Jedis only accept 'int' for timeout
@JsonProperty
private int timeout = 2000;
// milliseconds, the type is 'int' because current Jedis only accept 'int' for timeout
@JsonProperty
private int timeout = 2000;
// max connections of redis connection pool
@JsonProperty
private int maxTotalConnections = 8;
// max connections of redis connection pool
@JsonProperty
private int maxTotalConnections = 8;
// max idle connections of redis connection pool
@JsonProperty
private int maxIdleConnections = 8;
// max idle connections of redis connection pool
@JsonProperty
private int maxIdleConnections = 8;
// min idle connections of redis connection pool
@JsonProperty
private int minIdleConnections = 0;
// min idle connections of redis connection pool
@JsonProperty
private int minIdleConnections = 0;
public String getHost()
{
return host;
}
public String getHost()
{
return host;
}
public int getPort()
{
return port;
}
public int getPort()
{
return port;
}
public long getExpiration()
{
return expiration;
}
public long getExpiration()
{
return expiration;
}
public int getTimeout()
{
return timeout;
}
public int getTimeout()
{
return timeout;
}
public int getMaxTotalConnections()
{
return maxTotalConnections;
}
public int getMaxTotalConnections()
{
return maxTotalConnections;
}
public int getMaxIdleConnections()
{
return maxIdleConnections;
}
public int getMaxIdleConnections()
{
return maxIdleConnections;
}
public int getMinIdleConnections()
{
return minIdleConnections;
}
public int getMinIdleConnections()
{
return minIdleConnections;
}
}

View File

@ -24,9 +24,9 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
@JsonTypeName("redis")
public class RedisCacheProvider extends RedisCacheConfig implements CacheProvider
{
@Override
public Cache get()
{
return RedisCache.create(this);
}
@Override
public Cache get()
{
return RedisCache.create(this);
}
}

View File

@ -29,18 +29,15 @@ import java.util.List;
public class RedisDruidModule implements DruidModule
{
@Override
public void configure(Binder binder)
{
@Override
public void configure(Binder binder)
{
// do nothing
}
}
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of(
new SimpleModule("DruidRedisCache")
.registerSubtypes(RedisCacheProvider.class)
);
}
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of(new SimpleModule("DruidRedisCache").registerSubtypes(RedisCacheProvider.class));
}
}

View File

@ -43,169 +43,170 @@ import java.util.UUID;
public class RedisCacheTest
{
private static final byte[] HI = StringUtils.toUtf8("hiiiiiiiiiiiiiiiiiii");
private static final byte[] HO = StringUtils.toUtf8("hooooooooooooooooooo");
private static final byte[] HI = StringUtils.toUtf8("hiiiiiiiiiiiiiiiiiii");
private static final byte[] HO = StringUtils.toUtf8("hooooooooooooooooooo");
private RedisCache cache;
private final RedisCacheConfig cacheConfig = new RedisCacheConfig()
private RedisCache cache;
private final RedisCacheConfig cacheConfig = new RedisCacheConfig()
{
@Override
public int getTimeout()
{
@Override
public int getTimeout()
{
return 10;
}
return 10;
}
@Override
public long getExpiration()
{
return 3600000;
}
};
@Before
public void setUp() throws Exception
@Override
public long getExpiration()
{
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(cacheConfig.getMaxTotalConnections());
poolConfig.setMaxIdle(cacheConfig.getMaxIdleConnections());
poolConfig.setMinIdle(cacheConfig.getMinIdleConnections());
return 3600000;
}
};
MockJedisPool pool = new MockJedisPool(poolConfig, "localhost");
// orginal MockJedis do not support 'milliseconds' in long type,
// for test we override to support it
pool.setClient(new MockJedis("localhost") {
@Override
public String psetex(byte[] key, long milliseconds, byte[] value)
{
return this.psetex(key, (int) milliseconds, value);
@Before
public void setUp() throws Exception
{
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(cacheConfig.getMaxTotalConnections());
poolConfig.setMaxIdle(cacheConfig.getMaxIdleConnections());
poolConfig.setMinIdle(cacheConfig.getMinIdleConnections());
MockJedisPool pool = new MockJedisPool(poolConfig, "localhost");
// orginal MockJedis do not support 'milliseconds' in long type,
// for test we override to support it
pool.setClient(new MockJedis("localhost")
{
@Override
public String psetex(byte[] key, long milliseconds, byte[] value)
{
return this.psetex(key, (int) milliseconds, value);
}
});
cache = RedisCache.create(pool, cacheConfig);
}
@Test
public void testBasicInjection() throws Exception
{
final RedisCacheConfig config = new RedisCacheConfig();
Injector injector = Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjector(), ImmutableList.of(
binder -> {
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test/redis");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1);
binder.bind(RedisCacheConfig.class).toInstance(config);
binder.bind(Cache.class).toProvider(RedisCacheProviderWithConfig.class).in(ManageLifecycle.class);
}
});
cache = RedisCache.create(pool, cacheConfig);
)
);
Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
lifecycle.start();
try {
Cache cache = injector.getInstance(Cache.class);
Assert.assertEquals(RedisCache.class, cache.getClass());
}
@Test
public void testBasicInjection() throws Exception
{
final RedisCacheConfig config = new RedisCacheConfig();
Injector injector = Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjector(), ImmutableList.of(
binder -> {
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test/redis");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1);
binder.bind(RedisCacheConfig.class).toInstance(config);
binder.bind(Cache.class).toProvider(RedisCacheProviderWithConfig.class).in(ManageLifecycle.class);
}
)
);
Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
lifecycle.start();
try {
Cache cache = injector.getInstance(Cache.class);
Assert.assertEquals(RedisCache.class, cache.getClass());
}
finally {
lifecycle.stop();
}
finally {
lifecycle.stop();
}
}
@Test
public void testSimpleInjection()
{
final String uuid = UUID.randomUUID().toString();
System.setProperty(uuid + ".type", "redis");
final Injector injector = Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjector(), ImmutableList.of(
binder -> {
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test/redis");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1);
@Test
public void testSimpleInjection()
{
final String uuid = UUID.randomUUID().toString();
System.setProperty(uuid + ".type", "redis");
final Injector injector = Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjector(), ImmutableList.of(
binder -> {
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test/redis");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1);
binder.bind(Cache.class).toProvider(CacheProvider.class);
JsonConfigProvider.bind(binder, uuid, CacheProvider.class);
}
)
);
final CacheProvider cacheProvider = injector.getInstance(CacheProvider.class);
Assert.assertNotNull(cacheProvider);
Assert.assertEquals(RedisCacheProvider.class, cacheProvider.getClass());
}
binder.bind(Cache.class).toProvider(CacheProvider.class);
JsonConfigProvider.bind(binder, uuid, CacheProvider.class);
}
)
);
final CacheProvider cacheProvider = injector.getInstance(CacheProvider.class);
Assert.assertNotNull(cacheProvider);
Assert.assertEquals(RedisCacheProvider.class, cacheProvider.getClass());
}
@Test
public void testSanity() throws Exception
{
Assert.assertNull(cache.get(new Cache.NamedKey("a", HI)));
put(cache, "a", HI, 0);
Assert.assertEquals(0, get(cache, "a", HI));
Assert.assertNull(cache.get(new Cache.NamedKey("the", HI)));
@Test
public void testSanity() throws Exception
{
Assert.assertNull(cache.get(new Cache.NamedKey("a", HI)));
put(cache, "a", HI, 0);
Assert.assertEquals(0, get(cache, "a", HI));
Assert.assertNull(cache.get(new Cache.NamedKey("the", HI)));
put(cache, "the", HI, 1);
Assert.assertEquals(0, get(cache, "a", HI));
Assert.assertEquals(1, get(cache, "the", HI));
put(cache, "the", HI, 1);
Assert.assertEquals(0, get(cache, "a", HI));
Assert.assertEquals(1, get(cache, "the", HI));
put(cache, "the", HO, 10);
Assert.assertEquals(0, get(cache, "a", HI));
Assert.assertNull(cache.get(new Cache.NamedKey("a", HO)));
Assert.assertEquals(1, get(cache, "the", HI));
Assert.assertEquals(10, get(cache, "the", HO));
put(cache, "the", HO, 10);
Assert.assertEquals(0, get(cache, "a", HI));
Assert.assertNull(cache.get(new Cache.NamedKey("a", HO)));
Assert.assertEquals(1, get(cache, "the", HI));
Assert.assertEquals(10, get(cache, "the", HO));
cache.close("the");
Assert.assertEquals(0, get(cache, "a", HI));
Assert.assertNull(cache.get(new Cache.NamedKey("a", HO)));
}
cache.close("the");
Assert.assertEquals(0, get(cache, "a", HI));
Assert.assertNull(cache.get(new Cache.NamedKey("a", HO)));
}
@Test
public void testGetBulk() throws Exception
{
Assert.assertNull(cache.get(new Cache.NamedKey("the", HI)));
@Test
public void testGetBulk() throws Exception
{
Assert.assertNull(cache.get(new Cache.NamedKey("the", HI)));
put(cache, "the", HI, 1);
put(cache, "the", HO, 10);
put(cache, "the", HI, 1);
put(cache, "the", HO, 10);
Cache.NamedKey key1 = new Cache.NamedKey("the", HI);
Cache.NamedKey key2 = new Cache.NamedKey("the", HO);
Cache.NamedKey key3 = new Cache.NamedKey("a", HI);
Cache.NamedKey key1 = new Cache.NamedKey("the", HI);
Cache.NamedKey key2 = new Cache.NamedKey("the", HO);
Cache.NamedKey key3 = new Cache.NamedKey("a", HI);
Map<Cache.NamedKey, byte[]> result = cache.getBulk(
Lists.newArrayList(
key1,
key2,
key3
)
);
Map<Cache.NamedKey, byte[]> result = cache.getBulk(
Lists.newArrayList(
key1,
key2,
key3
)
);
Assert.assertEquals(1, Ints.fromByteArray(result.get(key1)));
Assert.assertEquals(10, Ints.fromByteArray(result.get(key2)));
Assert.assertEquals(null, result.get(key3));
}
Assert.assertEquals(1, Ints.fromByteArray(result.get(key1)));
Assert.assertEquals(10, Ints.fromByteArray(result.get(key2)));
Assert.assertEquals(null, result.get(key3));
}
public void put(Cache cache, String namespace, byte[] key, Integer value)
{
cache.put(new Cache.NamedKey(namespace, key), Ints.toByteArray(value));
}
public void put(Cache cache, String namespace, byte[] key, Integer value)
{
cache.put(new Cache.NamedKey(namespace, key), Ints.toByteArray(value));
}
public int get(Cache cache, String namespace, byte[] key)
{
return Ints.fromByteArray(cache.get(new Cache.NamedKey(namespace, key)));
}
public int get(Cache cache, String namespace, byte[] key)
{
return Ints.fromByteArray(cache.get(new Cache.NamedKey(namespace, key)));
}
}
class RedisCacheProviderWithConfig extends RedisCacheProvider
{
private final RedisCacheConfig config;
private final RedisCacheConfig config;
@Inject
public RedisCacheProviderWithConfig(RedisCacheConfig config)
{
this.config = config;
}
@Inject
public RedisCacheProviderWithConfig(RedisCacheConfig config)
{
this.config = config;
}
@Override
public Cache get()
{
return RedisCache.create(config);
}
@Override
public Cache get()
{
return RedisCache.create(config);
}
}

View File

@ -42,34 +42,33 @@ public class StatsDEmitter implements Emitter
private final static String DRUID_METRIC_SEPARATOR = "\\/";
private final static String STATSD_SEPARATOR = ":|\\|";
static final StatsDEmitter of(StatsDEmitterConfig config, ObjectMapper mapper)
{
NonBlockingStatsDClient client = new NonBlockingStatsDClient(
config.getPrefix(),
config.getHostname(),
config.getPort(),
new StatsDClientErrorHandler()
{
private int exceptionCount = 0;
@Override
public void handle(Exception exception)
{
if (exceptionCount % 1000 == 0) {
log.error(exception, "Error sending metric to StatsD.");
}
exceptionCount += 1;
}
}
);
return new StatsDEmitter(config, mapper, client);
}
private final StatsDClient statsd;
private final StatsDEmitterConfig config;
private final DimensionConverter converter;
public StatsDEmitter(StatsDEmitterConfig config, ObjectMapper mapper)
{
this(config, mapper,
new NonBlockingStatsDClient(
config.getPrefix(),
config.getHostname(),
config.getPort(),
new StatsDClientErrorHandler()
{
private int exceptionCount = 0;
@Override
public void handle(Exception exception)
{
if (exceptionCount % 1000 == 0) {
log.error(exception, "Error sending metric to StatsD.");
}
exceptionCount += 1;
}
}
)
);
}
public StatsDEmitter(StatsDEmitterConfig config, ObjectMapper mapper, StatsDClient client)
{
this.config = config;

View File

@ -55,6 +55,6 @@ public class StatsDEmitterModule implements DruidModule
@Named(EMITTER_TYPE)
public Emitter getEmitter(StatsDEmitterConfig config, ObjectMapper mapper)
{
return new StatsDEmitter(config, mapper);
return StatsDEmitter.of(config, mapper);
}
}

View File

@ -62,7 +62,9 @@ public class KerberosHttpClient extends AbstractHttpClient
@Override
public <Intermediate, Final> ListenableFuture<Final> go(
Request request, HttpResponseHandler<Intermediate, Final> httpResponseHandler, Duration duration
Request request,
HttpResponseHandler<Intermediate, Final> httpResponseHandler,
Duration duration
)
{
final SettableFuture<Final> retVal = SettableFuture.create();
@ -72,10 +74,10 @@ public class KerberosHttpClient extends AbstractHttpClient
private <Intermediate, Final> void inner_go(
final Request request,
final HttpResponseHandler<Intermediate, Final> httpResponseHandler,
final Duration duration,
final SettableFuture<Final> future
final Request request,
final HttpResponseHandler<Intermediate, Final> httpResponseHandler,
final Duration duration,
final SettableFuture<Final> future
)
{
try {
@ -92,9 +94,9 @@ public class KerberosHttpClient extends AbstractHttpClient
if (DruidKerberosUtil.needToSendCredentials(cookieManager.getCookieStore(), uri)) {
// No Cookies for requested URI, authenticate user and add authentication header
log.debug(
"No Auth Cookie found for URI[%s]. Existing Cookies[%s] Authenticating... ",
uri,
cookieManager.getCookieStore().getCookies()
"No Auth Cookie found for URI[%s]. Existing Cookies[%s] Authenticating... ",
uri,
cookieManager.getCookieStore().getCookies()
);
DruidKerberosUtil.authenticateIfRequired(internalClientPrincipal, internalClientKeytab);
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
@ -114,13 +116,11 @@ public class KerberosHttpClient extends AbstractHttpClient
}
ListenableFuture<RetryResponseHolder<Final>> internalFuture = delegate.go(
request,
new RetryIfUnauthorizedResponseHandler<Intermediate, Final>(new ResponseCookieHandler(
request.getUrl().toURI(),
cookieManager,
httpResponseHandler
)),
duration
request,
new RetryIfUnauthorizedResponseHandler<Intermediate, Final>(
new ResponseCookieHandler(request.getUrl().toURI(), cookieManager, httpResponseHandler)
),
duration
);
Futures.addCallback(internalFuture, new FutureCallback<RetryResponseHolder<Final>>()

View File

@ -71,9 +71,7 @@ public class ResponseCookieHandler<Intermediate, Final> implements HttpResponseH
}
@Override
public ClientResponse<Intermediate> handleChunk(
ClientResponse<Intermediate> clientResponse, HttpChunk httpChunk
)
public ClientResponse<Intermediate> handleChunk(ClientResponse<Intermediate> clientResponse, HttpChunk httpChunk)
{
return delegate.handleChunk(clientResponse, httpChunk);
}

View File

@ -27,15 +27,13 @@ import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
public class RetryIfUnauthorizedResponseHandler<Intermediate, Final>
implements HttpResponseHandler<RetryResponseHolder<Intermediate>, RetryResponseHolder<Final>>
implements HttpResponseHandler<RetryResponseHolder<Intermediate>, RetryResponseHolder<Final>>
{
private static final Logger log = new Logger(RetryIfUnauthorizedResponseHandler.class);
private final HttpResponseHandler<Intermediate, Final> httpResponseHandler;
public RetryIfUnauthorizedResponseHandler(
HttpResponseHandler<Intermediate, Final> httpResponseHandler
)
public RetryIfUnauthorizedResponseHandler(HttpResponseHandler<Intermediate, Final> httpResponseHandler)
{
this.httpResponseHandler = httpResponseHandler;
}
@ -55,7 +53,8 @@ public class RetryIfUnauthorizedResponseHandler<Intermediate, Final>
@Override
public ClientResponse<RetryResponseHolder<Intermediate>> handleChunk(
ClientResponse<RetryResponseHolder<Intermediate>> clientResponse, HttpChunk httpChunk
ClientResponse<RetryResponseHolder<Intermediate>> clientResponse,
HttpChunk httpChunk
)
{
if (clientResponse.getObj().shouldRetry()) {

View File

@ -43,24 +43,24 @@ public class SpnegoFilterConfigTest
public void testserde()
{
Injector injector = Guice.createInjector(
new Module()
{
@Override
public void configure(Binder binder)
new Module()
{
binder.install(new PropertiesModule(Arrays.asList("test.runtime.properties")));
binder.install(new ConfigModule());
binder.install(new DruidGuiceExtensions());
JsonConfigProvider.bind(binder, "druid.hadoop.security.spnego", SpnegoFilterConfig.class);
}
@Override
public void configure(Binder binder)
{
binder.install(new PropertiesModule(Arrays.asList("test.runtime.properties")));
binder.install(new ConfigModule());
binder.install(new DruidGuiceExtensions());
JsonConfigProvider.bind(binder, "druid.hadoop.security.spnego", SpnegoFilterConfig.class);
}
@Provides
@LazySingleton
public ObjectMapper jsonMapper()
{
return new DefaultObjectMapper();
@Provides
@LazySingleton
public ObjectMapper jsonMapper()
{
return new DefaultObjectMapper();
}
}
}
);
Properties props = injector.getInstance(Properties.class);

View File

@ -382,23 +382,23 @@ public class HdfsDataSegmentPusherTest
try {
schema = objectMapper.readValue(
"{\n"
+ " \"dataSchema\": {\n"
+ " \"dataSource\": \"source\",\n"
+ " \"metricsSpec\": [],\n"
+ " \"granularitySpec\": {\n"
+ " \"type\": \"uniform\",\n"
+ " \"segmentGranularity\": \"hour\",\n"
+ " \"intervals\": [\"2012-07-10/P1D\"]\n"
+ " }\n"
+ " },\n"
+ " \"ioConfig\": {\n"
+ " \"type\": \"hadoop\",\n"
+ " \"segmentOutputPath\": \"hdfs://server:9100/tmp/druid/datatest\"\n"
+ " }\n"
+ "}",
HadoopIngestionSpec.class
);
}
+ " \"dataSchema\": {\n"
+ " \"dataSource\": \"source\",\n"
+ " \"metricsSpec\": [],\n"
+ " \"granularitySpec\": {\n"
+ " \"type\": \"uniform\",\n"
+ " \"segmentGranularity\": \"hour\",\n"
+ " \"intervals\": [\"2012-07-10/P1D\"]\n"
+ " }\n"
+ " },\n"
+ " \"ioConfig\": {\n"
+ " \"type\": \"hadoop\",\n"
+ " \"segmentOutputPath\": \"hdfs://server:9100/tmp/druid/datatest\"\n"
+ " }\n"
+ "}",
HadoopIngestionSpec.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}

View File

@ -1183,11 +1183,7 @@ public class ApproximateHistogram
public boolean canStoreCompact()
{
final long exactCount = getExactCount();
return (
size <= Short.MAX_VALUE
&& exactCount <= Byte.MAX_VALUE
&& (count - exactCount) <= Byte.MAX_VALUE
);
return (size <= Short.MAX_VALUE && exactCount <= Byte.MAX_VALUE && (count - exactCount) <= Byte.MAX_VALUE);
}
/**

View File

@ -19,7 +19,6 @@
package io.druid.query.aggregation.histogram;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@ -75,14 +74,7 @@ public class ApproximateHistogramTopNQueryTest
new TopNQueryRunnerFactory(
new StupidPool<ByteBuffer>(
"TopNQueryRunnerFactory-bufferPool",
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(2000);
}
}
() -> ByteBuffer.allocate(2000)
),
new TopNQueryQueryToolChest(
new TopNQueryConfig(),

View File

@ -343,8 +343,7 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
return !(getKafkaTopic().equals(that.getKafkaTopic())
&& getKafkaProperties().equals(that.getKafkaProperties())
&& getConnectTimeout() == that.getConnectTimeout()
&& isInjective() == that.isInjective()
);
&& isInjective() == that.isInjective());
}
@Nullable

View File

@ -1496,22 +1496,25 @@ public class KafkaIndexTaskTest
private QueryRunnerFactoryConglomerate makeTimeseriesOnlyConglomerate()
{
IntervalChunkingQueryRunnerDecorator queryRunnerDecorator = new IntervalChunkingQueryRunnerDecorator(
null,
null,
null
)
{
@Override
public <T> QueryRunner<T> decorate(
QueryRunner<T> delegate, QueryToolChest<T, ? extends Query<T>> toolChest
)
{
return delegate;
}
};
return new DefaultQueryRunnerFactoryConglomerate(
ImmutableMap.<Class<? extends Query>, QueryRunnerFactory>of(
TimeseriesQuery.class,
new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(
new IntervalChunkingQueryRunnerDecorator(null, null, null)
{
@Override
public <T> QueryRunner<T> decorate(
QueryRunner<T> delegate, QueryToolChest<T, ? extends Query<T>> toolChest
)
{
return delegate;
}
}
),
new TimeseriesQueryQueryToolChest(queryRunnerDecorator),
new TimeseriesQueryEngine(),
new QueryWatcher()
{
@ -1613,6 +1616,14 @@ public class KafkaIndexTaskTest
final LocalDataSegmentPusherConfig dataSegmentPusherConfig = new LocalDataSegmentPusherConfig();
dataSegmentPusherConfig.storageDirectory = getSegmentDirectory();
final DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(dataSegmentPusherConfig, objectMapper);
SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig()
{
@Override
public List<StorageLocationConfig> getLocations()
{
return Lists.newArrayList();
}
};
toolboxFactory = new TaskToolboxFactory(
taskConfig,
taskActionClientFactory,
@ -1628,17 +1639,7 @@ public class KafkaIndexTaskTest
MoreExecutors.sameThreadExecutor(), // queryExecutorService
EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(
new SegmentLoaderLocalCacheManager(
null,
new SegmentLoaderConfig()
{
@Override
public List<StorageLocationConfig> getLocations()
{
return Lists.newArrayList();
}
}, testUtils.getTestObjectMapper()
)
new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, testUtils.getTestObjectMapper())
),
testUtils.getTestObjectMapper(),
testUtils.getTestIndexIO(),

View File

@ -406,7 +406,9 @@ public class NamespaceLookupExtractorFactoryTest
final NamespaceLookupExtractorFactory f1 = new NamespaceLookupExtractorFactory(
en1,
scheduler
), f2 = new NamespaceLookupExtractorFactory(en2, scheduler), f1b = new NamespaceLookupExtractorFactory(
);
final NamespaceLookupExtractorFactory f2 = new NamespaceLookupExtractorFactory(en2, scheduler);
final NamespaceLookupExtractorFactory f1b = new NamespaceLookupExtractorFactory(
en1,
scheduler
);

View File

@ -269,17 +269,12 @@ public class JdbcExtractionNamespaceTest
}
);
try (final Closeable closeable =
new Closeable()
{
@Override
public void close() throws IOException
{
if (!setupFuture.isDone() && !setupFuture.cancel(true) && !setupFuture.isDone()) {
throw new IOException("Unable to stop future");
}
}
}) {
Closeable closeable = () -> {
if (!setupFuture.isDone() && !setupFuture.cancel(true) && !setupFuture.isDone()) {
throw new IOException("Unable to stop future");
}
};
try (final Closeable c = closeable) {
handleRef = setupFuture.get(10, TimeUnit.SECONDS);
}
Assert.assertNotNull(handleRef);

View File

@ -55,6 +55,6 @@ public class LookupExtractionModule implements DruidModule
public static byte[] getRandomCacheKey()
{
return StringUtils.toUtf8(UUID.randomUUID().toString());
return StringUtils.toUtf8(UUID.randomUUID().toString());
}
}

View File

@ -124,8 +124,7 @@ public class OffHeapLoadingCache<K, V> implements LoadingCache<K, V>
public Map<K, V> getAllPresent(final Iterable<K> keys)
{
ImmutableMap.Builder builder = ImmutableMap.builder();
for (K key : keys
) {
for (K key : keys) {
V value = getIfPresent(key);
if (value != null) {
builder.put(key, value);

View File

@ -105,22 +105,12 @@ public class JdbcDataFetcher implements DataFetcher<String, String>
@Override
public Iterable<Map.Entry<String, String>> fetchAll()
{
return inReadOnlyTransaction(new TransactionCallback<List<Map.Entry<String, String>>>()
{
@Override
public List<Map.Entry<String, String>> inTransaction(
Handle handle,
TransactionStatus status
) throws Exception
{
return handle.createQuery(fetchAllQuery)
.setFetchSize(streamingFetchSize)
.map(new KeyValueResultSetMapper(keyColumn, valueColumn))
.list();
}
}
);
return inReadOnlyTransaction((handle, status) -> {
return handle.createQuery(fetchAllQuery)
.setFetchSize(streamingFetchSize)
.map(new KeyValueResultSetMapper(keyColumn, valueColumn))
.list();
});
}
@Override

View File

@ -46,14 +46,8 @@ public class LoadingCacheTest
public static Collection<Object[]> inputData()
{
return Arrays.asList(new Object[][]{
{new OnHeapLoadingCache<>(4, 1000, null, null, null)}, {
new OffHeapLoadingCache(
0,
0L,
0L,
0L
)
}
{new OnHeapLoadingCache<>(4, 1000, null, null, null)},
{new OffHeapLoadingCache(0, 0L, 0L, 0L)}
});
}

View File

@ -112,8 +112,7 @@ public class JdbcDataFetcherTest
public void testFetchKeys()
{
ImmutableMap.Builder<String, String> mapBuilder = ImmutableMap.builder();
for (Map.Entry<String, String> entry: jdbcDataFetcher.fetch(lookupMap.keySet())
) {
for (Map.Entry<String, String> entry: jdbcDataFetcher.fetch(lookupMap.keySet())) {
mapBuilder.put(entry.getKey(), entry.getValue());
}

View File

@ -92,10 +92,8 @@ public class PostgreSQLConnector extends SQLMetadataConnector
{
if (canUpsert == null) {
DatabaseMetaData metaData = handle.getConnection().getMetaData();
canUpsert = metaData.getDatabaseMajorVersion() > 9 || (
metaData.getDatabaseMajorVersion() == 9 &&
metaData.getDatabaseMinorVersion() >= 5
);
canUpsert = metaData.getDatabaseMajorVersion() > 9 ||
(metaData.getDatabaseMajorVersion() == 9 && metaData.getDatabaseMinorVersion() >= 5);
}
return canUpsert;
}

View File

@ -354,8 +354,7 @@ public class S3DataSegmentFinderTest
};
ImmutableList<String> keys = ImmutableList.copyOf(
Ordering.natural().sortedCopy(Iterables.filter(keysOrigin, prefixFilter)
)
Ordering.natural().sortedCopy(Iterables.filter(keysOrigin, prefixFilter))
);
int startOffset = 0;

View File

@ -137,9 +137,8 @@ public class DetermineHashedPartitionsJob implements Jobby
throw new ISE("Path[%s] didn't exist!?", intervalInfoPath);
}
List<Interval> intervals = config.JSON_MAPPER.readValue(
Utils.openInputStream(groupByJob, intervalInfoPath), new TypeReference<List<Interval>>()
{
}
Utils.openInputStream(groupByJob, intervalInfoPath),
new TypeReference<List<Interval>>() {}
);
config.setGranularitySpec(
new UniformGranularitySpec(
@ -162,9 +161,8 @@ public class DetermineHashedPartitionsJob implements Jobby
}
if (Utils.exists(groupByJob, fileSystem, partitionInfoPath)) {
final Long numRows = config.JSON_MAPPER.readValue(
Utils.openInputStream(groupByJob, partitionInfoPath), new TypeReference<Long>()
{
}
Utils.openInputStream(groupByJob, partitionInfoPath),
new TypeReference<Long>() {}
);
log.info("Found approximately [%,d] rows in data.", numRows);
@ -272,11 +270,9 @@ public class DetermineHashedPartitionsJob implements Jobby
}
interval = maybeInterval.get();
}
hyperLogLogs.get(interval)
.add(
hashFunction.hashBytes(HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsBytes(groupKey))
.asBytes()
);
hyperLogLogs
.get(interval)
.add(hashFunction.hashBytes(HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsBytes(groupKey)).asBytes());
}
@Override

View File

@ -54,9 +54,8 @@ import java.util.Map;
public class JobHelperTest
{
public final
@Rule
TemporaryFolder temporaryFolder = new TemporaryFolder();
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
private HadoopDruidIndexerConfig config;
private File tmpDir;

View File

@ -36,7 +36,8 @@ public class MetadataStorageUpdaterJobSpecTest
"jdbc:mysql://localhost/druid",
"druid",
"\"nothing\"",
"nothing");
"nothing"
);
}
@Test
@ -48,17 +49,18 @@ public class MetadataStorageUpdaterJobSpecTest
"jdbc:mysql://localhost/druid",
"druid",
"{\"type\":\"default\",\"password\":\"nothing\"}",
"nothing");
"nothing"
);
}
private void testMetadataStorageUpdaterJobSpec(
String segmentTable,
String type,
String connectURI,
String user,
String pwdString,
String pwd
) throws Exception
String segmentTable,
String type,
String connectURI,
String user,
String pwdString,
String pwd
) throws Exception
{
MetadataStorageUpdaterJobSpec spec = jsonMapper.readValue(
"{" +
@ -68,7 +70,8 @@ public class MetadataStorageUpdaterJobSpecTest
"\"password\": " + pwdString + ",\n" +
"\"segmentTable\": \"" + segmentTable + "\"\n" +
"}",
MetadataStorageUpdaterJobSpec.class);
MetadataStorageUpdaterJobSpec.class
);
Assert.assertEquals(segmentTable, spec.getSegmentTable());
Assert.assertEquals(type, spec.getType());

View File

@ -171,7 +171,7 @@ public class MetadataTaskStorage implements TaskStorage
@Override
public Optional<Task> getTask(final String taskId)
{
return handler.getEntry(taskId);
return handler.getEntry(taskId);
}
@Override

View File

@ -515,16 +515,7 @@ public class TaskLockbox
}
return ImmutableList.copyOf(
Iterables.filter(
searchSpace, new Predicate<TaskLockPosse>()
{
@Override
public boolean apply(TaskLockPosse taskLock)
{
return taskLock.getTaskIds().contains(task.getId());
}
}
)
Iterables.filter(searchSpace, taskLock -> taskLock.getTaskIds().contains(task.getId()))
);
}
finally {

View File

@ -77,7 +77,7 @@ public class TaskMaster
final SupervisorManager supervisorManager,
final OverlordHelperManager overlordHelperManager,
@IndexingService final DruidLeaderSelector overlordLeaderSelector
)
)
{
this.supervisorManager = supervisorManager;
this.taskActionClientFactory = taskActionClientFactory;

View File

@ -94,10 +94,10 @@ public class ExecutorLifecycleConfig
public InputStream getParentStream()
{
if ("stdin".equals(parentStreamName)) {
return System.in;
} else {
throw new ISE("Unknown stream name[%s]", parentStreamName);
}
if ("stdin".equals(parentStreamName)) {
return System.in;
} else {
throw new ISE("Unknown stream name[%s]", parentStreamName);
}
}
}

View File

@ -29,12 +29,8 @@ import io.druid.indexing.common.task.TaskResource;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.PlumberSchool;
import java.io.File;
@ -58,18 +54,13 @@ public class TestRealtimeTask extends RealtimeIndexTask
id,
taskResource,
new FireDepartment(
new DataSchema(dataSource, null, new AggregatorFactory[]{}, null, mapper), new RealtimeIOConfig(
new LocalFirehoseFactory(new File("lol"), "rofl", null), new PlumberSchool()
{
@Override
public Plumber findPlumber(
DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics
)
{
return null;
}
}, null
), null
new DataSchema(dataSource, null, new AggregatorFactory[]{}, null, mapper),
new RealtimeIOConfig(
new LocalFirehoseFactory(new File("lol"), "rofl", null),
(schema, config, metrics) -> null,
null
),
null
),
null
);

View File

@ -599,7 +599,7 @@ public class IndexTaskTest
File tmpDir = temporaryFolder.newFolder();
File tmpFile = File.createTempFile("druid", "index", tmpDir);
populateRollupTestData(tmpFile);
populateRollupTestData(tmpFile);
IndexTask indexTask = new IndexTask(
null,
@ -885,73 +885,95 @@ public class IndexTaskTest
indexTask.run(
new TaskToolbox(
null, new TaskActionClient()
{
@Override
public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException
{
if (taskAction instanceof LockListAction) {
return (RetType) Collections.singletonList(
new TaskLock(
"", "", null, DateTimes.nowUtc().toString()
)
);
}
null,
new TaskActionClient()
{
@Override
public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException
{
if (taskAction instanceof LockListAction) {
return (RetType) Collections.singletonList(
new TaskLock(
"", "", null, DateTimes.nowUtc().toString()
)
);
}
if (taskAction instanceof LockAcquireAction) {
return (RetType) new TaskLock(
"groupId",
"test",
((LockAcquireAction) taskAction).getInterval(),
DateTimes.nowUtc().toString()
);
}
if (taskAction instanceof LockAcquireAction) {
return (RetType) new TaskLock(
"groupId",
"test",
((LockAcquireAction) taskAction).getInterval(),
DateTimes.nowUtc().toString()
);
}
if (taskAction instanceof SegmentTransactionalInsertAction) {
return (RetType) new SegmentPublishResult(
((SegmentTransactionalInsertAction) taskAction).getSegments(),
true
);
}
if (taskAction instanceof SegmentTransactionalInsertAction) {
return (RetType) new SegmentPublishResult(
((SegmentTransactionalInsertAction) taskAction).getSegments(),
true
);
}
if (taskAction instanceof SegmentAllocateAction) {
SegmentAllocateAction action = (SegmentAllocateAction) taskAction;
Interval interval = action.getPreferredSegmentGranularity().bucket(action.getTimestamp());
ShardSpec shardSpec = new NumberedShardSpec(segmentAllocatePartitionCounter++, 0);
return (RetType) new SegmentIdentifier(action.getDataSource(), interval, "latestVersion", shardSpec);
}
if (taskAction instanceof SegmentAllocateAction) {
SegmentAllocateAction action = (SegmentAllocateAction) taskAction;
Interval interval = action.getPreferredSegmentGranularity().bucket(action.getTimestamp());
ShardSpec shardSpec = new NumberedShardSpec(segmentAllocatePartitionCounter++, 0);
return (RetType) new SegmentIdentifier(action.getDataSource(), interval, "latestVersion", shardSpec);
}
return null;
}
}, null, new DataSegmentPusher()
{
@Deprecated
@Override
public String getPathForHadoop(String dataSource)
{
return getPathForHadoop();
}
return null;
}
},
null,
new DataSegmentPusher()
{
@Deprecated
@Override
public String getPathForHadoop(String dataSource)
{
return getPathForHadoop();
}
@Override
public String getPathForHadoop()
{
return null;
}
@Override
public String getPathForHadoop()
{
return null;
}
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
{
segments.add(segment);
return segment;
}
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
{
segments.add(segment);
return segment;
}
@Override
public Map<String, Object> makeLoadSpec(URI uri)
{
throw new UnsupportedOperationException();
}
}, null, null, null, null, null, null, null, null, null, null, jsonMapper, temporaryFolder.newFolder(),
indexIO, null, null, indexMergerV9, null, null, null, null
@Override
public Map<String, Object> makeLoadSpec(URI uri)
{
throw new UnsupportedOperationException();
}
},
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
jsonMapper,
temporaryFolder.newFolder(),
indexIO,
null,
null,
indexMergerV9,
null,
null,
null,
null
)
);

View File

@ -958,22 +958,25 @@ public class RealtimeIndexTaskTest
taskStorage,
taskActionToolbox
);
IntervalChunkingQueryRunnerDecorator queryRunnerDecorator = new IntervalChunkingQueryRunnerDecorator(
null,
null,
null
)
{
@Override
public <T> QueryRunner<T> decorate(
QueryRunner<T> delegate, QueryToolChest<T, ? extends Query<T>> toolChest
)
{
return delegate;
}
};
final QueryRunnerFactoryConglomerate conglomerate = new DefaultQueryRunnerFactoryConglomerate(
ImmutableMap.<Class<? extends Query>, QueryRunnerFactory>of(
TimeseriesQuery.class,
new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(
new IntervalChunkingQueryRunnerDecorator(null, null, null)
{
@Override
public <T> QueryRunner<T> decorate(
QueryRunner<T> delegate, QueryToolChest<T, ? extends Query<T>> toolChest
)
{
return delegate;
}
}
),
new TimeseriesQueryQueryToolChest(queryRunnerDecorator),
new TimeseriesQueryEngine(),
new QueryWatcher()
{
@ -1023,6 +1026,14 @@ public class RealtimeIndexTaskTest
}
};
final TestUtils testUtils = new TestUtils();
SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig()
{
@Override
public List<StorageLocationConfig> getLocations()
{
return Lists.newArrayList();
}
};
final TaskToolboxFactory toolboxFactory = new TaskToolboxFactory(
taskConfig,
taskActionClientFactory,
@ -1038,17 +1049,7 @@ public class RealtimeIndexTaskTest
MoreExecutors.sameThreadExecutor(), // queryExecutorService
EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(
new SegmentLoaderLocalCacheManager(
null,
new SegmentLoaderConfig()
{
@Override
public List<StorageLocationConfig> getLocations()
{
return Lists.newArrayList();
}
}, testUtils.getTestObjectMapper()
)
new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, testUtils.getTestObjectMapper())
),
testUtils.getTestObjectMapper(),
testUtils.getTestIndexIO(),

View File

@ -140,101 +140,121 @@ public class SameIntervalMergeTaskTest
mergeTask.run(
new TaskToolbox(
null, new TaskActionClient()
{
@Override
public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException
{
if (taskAction instanceof LockListAction) {
Assert.assertNotNull("taskLock should be acquired before list", taskLock);
return (RetType) Arrays.asList(taskLock);
}
if (taskAction instanceof SegmentListUsedAction) {
List<DataSegment> segments = ImmutableList.of(
DataSegment.builder()
.dataSource(mergeTask.getDataSource())
.interval(Intervals.of("2010-01-01/PT1H"))
.version("oldVersion")
.shardSpec(new LinearShardSpec(0))
.build(),
DataSegment.builder()
.dataSource(mergeTask.getDataSource())
.interval(Intervals.of("2010-01-01/PT1H"))
.version("oldVersion")
.shardSpec(new LinearShardSpec(0))
.build(),
DataSegment.builder()
.dataSource(mergeTask.getDataSource())
.interval(Intervals.of("2010-01-01/PT2H"))
.version("oldVersion")
.shardSpec(new LinearShardSpec(0))
.build()
);
return (RetType) segments;
}
if (taskAction instanceof SegmentInsertAction) {
publishCountDown.countDown();
return null;
}
null,
new TaskActionClient()
{
@Override
public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException
{
if (taskAction instanceof LockListAction) {
Assert.assertNotNull("taskLock should be acquired before list", taskLock);
return (RetType) Arrays.asList(taskLock);
}
if (taskAction instanceof SegmentListUsedAction) {
List<DataSegment> segments = ImmutableList.of(
DataSegment.builder()
.dataSource(mergeTask.getDataSource())
.interval(Intervals.of("2010-01-01/PT1H"))
.version("oldVersion")
.shardSpec(new LinearShardSpec(0))
.build(),
DataSegment.builder()
.dataSource(mergeTask.getDataSource())
.interval(Intervals.of("2010-01-01/PT1H"))
.version("oldVersion")
.shardSpec(new LinearShardSpec(0))
.build(),
DataSegment.builder()
.dataSource(mergeTask.getDataSource())
.interval(Intervals.of("2010-01-01/PT2H"))
.version("oldVersion")
.shardSpec(new LinearShardSpec(0))
.build()
);
return (RetType) segments;
}
if (taskAction instanceof SegmentInsertAction) {
publishCountDown.countDown();
return null;
}
return null;
}
}, new NoopServiceEmitter(), new DataSegmentPusher()
{
@Deprecated
@Override
public String getPathForHadoop(String dataSource)
{
return getPathForHadoop();
}
return null;
}
},
new NoopServiceEmitter(), new DataSegmentPusher()
{
@Deprecated
@Override
public String getPathForHadoop(String dataSource)
{
return getPathForHadoop();
}
@Override
public String getPathForHadoop()
{
return null;
}
@Override
public String getPathForHadoop()
{
return null;
}
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
{
// the merged segment is pushed to storage
segments.add(segment);
return segment;
}
@Override
public Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath)
{
return null;
}
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
{
// the merged segment is pushed to storage
segments.add(segment);
return segment;
}
@Override
public Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath)
{
return null;
}
}, null, null, null, null, null, null, null, null, null, new SegmentLoader()
{
@Override
public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException
{
return false;
}
},
null,
null,
null,
null,
null,
null,
null,
null,
null,
new SegmentLoader()
{
@Override
public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException
{
return false;
}
@Override
public Segment getSegment(DataSegment segment) throws SegmentLoadingException
{
return null;
}
@Override
public Segment getSegment(DataSegment segment) throws SegmentLoadingException
{
return null;
}
@Override
public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
{
// dummy file to represent the downloaded segment's dir
return new File("" + segment.getShardSpec().getPartitionNum());
}
@Override
public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
{
// dummy file to represent the downloaded segment's dir
return new File("" + segment.getShardSpec().getPartitionNum());
}
@Override
public void cleanup(DataSegment segment) throws SegmentLoadingException
{
}
}, jsonMapper, temporaryFolder.newFolder(),
indexIO, null, null, EasyMock.createMock(IndexMergerV9.class),
null, null, null, null
@Override
public void cleanup(DataSegment segment) throws SegmentLoadingException
{
}
},
jsonMapper,
temporaryFolder.newFolder(),
indexIO,
null,
null,
EasyMock.createMock(IndexMergerV9.class),
null,
null,
null,
null
)
);

View File

@ -473,15 +473,15 @@ public class TaskSerdeTest
),
new RealtimeIOConfig(
new LocalFirehoseFactory(new File("lol"), "rofl", null), new PlumberSchool()
{
@Override
public Plumber findPlumber(
DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics
)
{
return null;
}
},
{
@Override
public Plumber findPlumber(
DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics
)
{
return null;
}
},
null
),

View File

@ -202,6 +202,14 @@ public class IngestSegmentFirehoseFactoryTest
SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class);
EasyMock.replay(notifierFactory);
SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig()
{
@Override
public List<StorageLocationConfig> getLocations()
{
return Lists.newArrayList();
}
};
final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory(
new TaskConfig(tmpDir.getAbsolutePath(), null, null, 50000, null, false, null, null),
tac,
@ -277,17 +285,7 @@ public class IngestSegmentFirehoseFactoryTest
null, // query executor service
null, // monitor scheduler
new SegmentLoaderFactory(
new SegmentLoaderLocalCacheManager(
null,
new SegmentLoaderConfig()
{
@Override
public List<StorageLocationConfig> getLocations()
{
return Lists.newArrayList();
}
}, MAPPER
)
new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, MAPPER)
),
MAPPER,
INDEX_IO,

View File

@ -296,6 +296,14 @@ public class IngestSegmentFirehoseFactoryTimelineTest
};
SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class);
EasyMock.replay(notifierFactory);
SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig()
{
@Override
public List<StorageLocationConfig> getLocations()
{
return Lists.newArrayList();
}
};
final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory(
new TaskConfig(testCase.tmpDir.getAbsolutePath(), null, null, 50000, null, false, null, null),
new TaskActionClientFactory()
@ -318,17 +326,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest
null, // query executor service
null, // monitor scheduler
new SegmentLoaderFactory(
new SegmentLoaderLocalCacheManager(
null,
new SegmentLoaderConfig()
{
@Override
public List<StorageLocationConfig> getLocations()
{
return Lists.newArrayList();
}
}, MAPPER
)
new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, MAPPER)
),
MAPPER,
INDEX_IO,

View File

@ -89,18 +89,16 @@ public class RealtimeishTask extends AbstractTask
Assert.assertEquals("locks2", ImmutableList.of(lock1, lock2), locks2);
// Push first segment
toolbox.getTaskActionClient()
.submit(
new SegmentInsertAction(
ImmutableSet.of(
DataSegment.builder()
.dataSource("foo")
.interval(interval1)
.version(lock1.getVersion())
.build()
)
)
);
SegmentInsertAction firstSegmentInsertAction = new SegmentInsertAction(
ImmutableSet.of(
DataSegment.builder()
.dataSource("foo")
.interval(interval1)
.version(lock1.getVersion())
.build()
)
);
toolbox.getTaskActionClient().submit(firstSegmentInsertAction);
// Release first lock
toolbox.getTaskActionClient().submit(new LockReleaseAction(interval1));
@ -110,18 +108,16 @@ public class RealtimeishTask extends AbstractTask
Assert.assertEquals("locks3", ImmutableList.of(lock2), locks3);
// Push second segment
toolbox.getTaskActionClient()
.submit(
new SegmentInsertAction(
ImmutableSet.of(
DataSegment.builder()
.dataSource("foo")
.interval(interval2)
.version(lock2.getVersion())
.build()
)
)
);
SegmentInsertAction secondSegmentInsertAction = new SegmentInsertAction(
ImmutableSet.of(
DataSegment.builder()
.dataSource("foo")
.interval(interval2)
.version(lock2.getVersion())
.build()
)
);
toolbox.getTaskActionClient().submit(secondSegmentInsertAction);
// Release second lock
toolbox.getTaskActionClient().submit(new LockReleaseAction(interval2));

View File

@ -172,9 +172,8 @@ public class TaskLifecycleTest
this.taskStorageType = taskStorageType;
}
public final
@Rule
TemporaryFolder temporaryFolder = new TemporaryFolder();
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
private static final Ordering<DataSegment> byIntervalOrdering = new Ordering<DataSegment>()
{
@ -529,6 +528,14 @@ public class TaskLifecycleTest
File tmpDir = temporaryFolder.newFolder();
taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null);
SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig()
{
@Override
public List<StorageLocationConfig> getLocations()
{
return Lists.newArrayList();
}
};
return new TaskToolboxFactory(
taskConfig,
tac,
@ -590,17 +597,7 @@ public class TaskLifecycleTest
MoreExecutors.sameThreadExecutor(), // query executor service
monitorScheduler, // monitor scheduler
new SegmentLoaderFactory(
new SegmentLoaderLocalCacheManager(
null,
new SegmentLoaderConfig()
{
@Override
public List<StorageLocationConfig> getLocations()
{
return Lists.newArrayList();
}
}, new DefaultObjectMapper()
)
new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, new DefaultObjectMapper())
),
MAPPER,
INDEX_IO,

View File

@ -19,7 +19,6 @@
package io.druid.indexing.overlord.autoscaling;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
@ -347,12 +346,8 @@ public class PendingTaskBasedProvisioningStrategyTest
new TestZkWorker(testTask).toImmutable()
)
).times(2);
EasyMock.expect(runner.markWorkersLazy((Predicate<ImmutableWorkerInfo>) EasyMock.anyObject(), EasyMock.anyInt()))
.andReturn(
Arrays.<Worker>asList(
new TestZkWorker(testTask).getWorker()
)
);
EasyMock.expect(runner.markWorkersLazy(EasyMock.anyObject(), EasyMock.anyInt()))
.andReturn(Collections.singletonList(new TestZkWorker(testTask).getWorker()));
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<Worker>newArrayList());
EasyMock.replay(runner);
@ -386,12 +381,8 @@ public class PendingTaskBasedProvisioningStrategyTest
)
).times(2);
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<Worker>newArrayList()).times(2);
EasyMock.expect(runner.markWorkersLazy((Predicate<ImmutableWorkerInfo>) EasyMock.anyObject(), EasyMock.anyInt()))
.andReturn(
Arrays.asList(
new TestZkWorker(testTask).toImmutable().getWorker()
)
);
EasyMock.expect(runner.markWorkersLazy(EasyMock.anyObject(), EasyMock.anyInt()))
.andReturn(Collections.singletonList(new TestZkWorker(testTask).toImmutable().getWorker()));
EasyMock.replay(runner);
Provisioner provisioner = strategy.makeProvisioner(runner);
@ -439,10 +430,8 @@ public class PendingTaskBasedProvisioningStrategyTest
EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig());
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<Worker>newArrayList());
EasyMock.expect(runner.markWorkersLazy((Predicate<ImmutableWorkerInfo>) EasyMock.anyObject(), EasyMock.anyInt()))
.andReturn(
Collections.<Worker>emptyList()
);
EasyMock.expect(runner.markWorkersLazy(EasyMock.anyObject(), EasyMock.anyInt()))
.andReturn(Collections.emptyList());
EasyMock.replay(runner);
Provisioner provisioner = strategy.makeProvisioner(runner);
@ -486,10 +475,8 @@ public class PendingTaskBasedProvisioningStrategyTest
EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()).times(2);
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<Worker>newArrayList());
EasyMock.expect(runner.markWorkersLazy((Predicate<ImmutableWorkerInfo>) EasyMock.anyObject(), EasyMock.anyInt()))
.andReturn(
Collections.<Worker>emptyList()
);
EasyMock.expect(runner.markWorkersLazy(EasyMock.anyObject(), EasyMock.anyInt()))
.andReturn(Collections.emptyList());
EasyMock.replay(runner);
Provisioner provisioner = strategy.makeProvisioner(runner);

View File

@ -280,11 +280,7 @@ public class SimpleProvisioningStrategyTest
)
).times(2);
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<ImmutableWorkerInfo>>anyObject(), EasyMock.anyInt()))
.andReturn(
Collections.<Worker>singletonList(
new TestZkWorker(testTask).getWorker()
)
);
.andReturn(Collections.singletonList(new TestZkWorker(testTask).getWorker()));
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<Worker>newArrayList());
EasyMock.replay(runner);
@ -324,12 +320,8 @@ public class SimpleProvisioningStrategyTest
)
).times(2);
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<Worker>newArrayList()).times(2);
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<ImmutableWorkerInfo>>anyObject(), EasyMock.anyInt()))
.andReturn(
Collections.singletonList(
new TestZkWorker(testTask).getWorker()
)
);
EasyMock.expect(runner.markWorkersLazy(EasyMock.anyObject(), EasyMock.anyInt()))
.andReturn(Collections.singletonList(new TestZkWorker(testTask).getWorker()));
EasyMock.replay(runner);
Provisioner provisioner = strategy.makeProvisioner(runner);
@ -376,10 +368,8 @@ public class SimpleProvisioningStrategyTest
)
).times(2);
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<Worker>newArrayList());
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<ImmutableWorkerInfo>>anyObject(), EasyMock.anyInt()))
.andReturn(
Collections.<Worker>emptyList()
);
EasyMock.expect(runner.markWorkersLazy(EasyMock.anyObject(), EasyMock.anyInt()))
.andReturn(Collections.emptyList());
EasyMock.replay(runner);
Provisioner provisioner = strategy.makeProvisioner(runner);
@ -422,10 +412,8 @@ public class SimpleProvisioningStrategyTest
)
).times(3);
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<Worker>newArrayList());
EasyMock.expect(runner.markWorkersLazy(EasyMock.<Predicate<ImmutableWorkerInfo>>anyObject(), EasyMock.anyInt()))
.andReturn(
Collections.<Worker>emptyList()
);
EasyMock.expect(runner.markWorkersLazy(EasyMock.anyObject(), EasyMock.anyInt()))
.andReturn(Collections.emptyList());
EasyMock.replay(runner);
Provisioner provisioner = strategy.makeProvisioner(runner);

View File

@ -41,6 +41,14 @@ public class EqualDistributionWithAffinityWorkerSelectStrategyTest
new AffinityConfig(ImmutableMap.of("foo", ImmutableSet.of("localhost1", "localhost2", "localhost3")), false)
);
NoopTask noopTask = new NoopTask(null, 1, 0, null, null, null)
{
@Override
public String getDataSource()
{
return "foo";
}
};
ImmutableWorkerInfo worker = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
@ -73,14 +81,7 @@ public class EqualDistributionWithAffinityWorkerSelectStrategyTest
DateTimes.nowUtc()
)
),
new NoopTask(null, 1, 0, null, null, null)
{
@Override
public String getDataSource()
{
return "foo";
}
}
noopTask
);
Assert.assertEquals("localhost1", worker.getWorker().getHost());
}

View File

@ -61,16 +61,16 @@ public class TaskAnnouncementTest
new DataSchema("foo", null, new AggregatorFactory[0], null, new DefaultObjectMapper()),
new RealtimeIOConfig(
new LocalFirehoseFactory(new File("lol"), "rofl", null), new PlumberSchool()
{
@Override
public Plumber findPlumber(
DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics
)
{
return null;
}
{
@Override
public Plumber findPlumber(
DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics
)
{
return null;
}
},
},
null
),
null

View File

@ -34,7 +34,7 @@ public class IntegrationTestingCuratorConfig extends CuratorConfig
@Inject
public IntegrationTestingCuratorConfig (IntegrationTestingConfig config)
{
this.config = config;
this.config = config;
}
@Override

View File

@ -49,8 +49,9 @@ public class CoordinatorResourceTestClient
@Inject
CoordinatorResourceTestClient(
ObjectMapper jsonMapper,
@TestClient HttpClient httpClient, IntegrationTestingConfig config
ObjectMapper jsonMapper,
@TestClient HttpClient httpClient,
IntegrationTestingConfig config
)
{
this.jsonMapper = jsonMapper;

View File

@ -58,9 +58,9 @@ public class OverlordResourceTestClient
@Inject
OverlordResourceTestClient(
ObjectMapper jsonMapper,
@TestClient HttpClient httpClient,
IntegrationTestingConfig config
ObjectMapper jsonMapper,
@TestClient HttpClient httpClient,
IntegrationTestingConfig config
)
{
this.jsonMapper = jsonMapper;

View File

@ -65,11 +65,10 @@ public class DruidTestModule implements Module
@Provides
@TestClient
public HttpClient getHttpClient(
IntegrationTestingConfig config,
Lifecycle lifecycle,
@EscalatedClient HttpClient delegate
)
throws Exception
IntegrationTestingConfig config,
Lifecycle lifecycle,
@EscalatedClient HttpClient delegate
) throws Exception
{
if (config.getUsername() != null) {
return new CredentialedHttpClient(new BasicCredentials(config.getUsername(), config.getPassword()), delegate);

View File

@ -31,19 +31,19 @@ public class LoggerListener extends TestListenerAdapter
@Override
public void onTestFailure(ITestResult tr)
{
LOG.info ("[%s] -- Test method failed", tr.getName());
LOG.info ("[%s] -- Test method failed", tr.getName());
}
@Override
public void onTestSkipped(ITestResult tr)
{
LOG.info ("[%s] -- Test method skipped", tr.getName());
LOG.info ("[%s] -- Test method skipped", tr.getName());
}
@Override
public void onTestSuccess(ITestResult tr)
{
LOG.info ("[%s] -- Test method passed", tr.getName());
LOG.info ("[%s] -- Test method passed", tr.getName());
}
@Override

View File

@ -42,16 +42,13 @@ import org.testng.xml.XmlTest;
import java.net.URL;
import java.util.List;
import java.util.concurrent.Callable;
public class DruidTestRunnerFactory implements ITestRunnerFactory
{
private static final Logger LOG = new Logger(DruidTestRunnerFactory.class);
@Override
public TestRunner newTestRunner(
ISuite suite, XmlTest test, List<IInvokedMethodListener> listeners
)
public TestRunner newTestRunner(ISuite suite, XmlTest test, List<IInvokedMethodListener> listeners)
{
IConfiguration configuration = TestNG.getDefault().getConfiguration();
String outputDirectory = suite.getOutputDirectory();
@ -72,13 +69,13 @@ public class DruidTestRunnerFactory implements ITestRunnerFactory
{
protected DruidTestRunner(
IConfiguration configuration,
ISuite suite,
XmlTest test,
String outputDirectory,
IAnnotationFinder finder,
boolean skipFailedInvocationCounts,
List<IInvokedMethodListener> invokedMethodListeners
IConfiguration configuration,
ISuite suite,
XmlTest test,
String outputDirectory,
IAnnotationFinder finder,
boolean skipFailedInvocationCounts,
List<IInvokedMethodListener> invokedMethodListeners
)
{
super(configuration, suite, test, outputDirectory, finder, skipFailedInvocationCounts, invokedMethodListeners);
@ -122,38 +119,22 @@ public class DruidTestRunnerFactory implements ITestRunnerFactory
{
final StatusResponseHandler handler = new StatusResponseHandler(Charsets.UTF_8);
RetryUtil.retryUntilTrue(
new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
() -> {
try {
StatusResponseHolder response = client.go(
new Request(
HttpMethod.GET,
new URL(
StringUtils.format(
"%s/status",
host
)
)
),
handler
new Request(HttpMethod.GET, new URL(StringUtils.format("%s/status", host))),
handler
).get();
LOG.info("%s %s", response.getStatus(), response.getContent());
if (response.getStatus().equals(HttpResponseStatus.OK)) {
return true;
} else {
return false;
}
return response.getStatus().equals(HttpResponseStatus.OK);
}
catch (Throwable e) {
LOG.error(e, "");
return false;
}
}
}, "Waiting for instance to be ready: [" + host + "]"
},
"Waiting for instance to be ready: [" + host + "]"
);
}
}

View File

@ -55,17 +55,17 @@ public abstract class AbstractIndexerTest
protected void unloadAndKillData(final String dataSource) throws Exception
{
ArrayList<String> intervals = coordinator.getSegmentIntervals(dataSource);
ArrayList<String> intervals = coordinator.getSegmentIntervals(dataSource);
// each element in intervals has this form:
// 2015-12-01T23:15:00.000Z/2015-12-01T23:16:00.000Z
// we'll sort the list (ISO dates have lexicographic order)
// then delete segments from the 1st date in the first string
// to the 2nd date in the last string
Collections.sort (intervals);
String first = intervals.get(0).split("/")[0];
String last = intervals.get(intervals.size() - 1).split("/")[1];
unloadAndKillData (dataSource, first, last);
// each element in intervals has this form:
// 2015-12-01T23:15:00.000Z/2015-12-01T23:16:00.000Z
// we'll sort the list (ISO dates have lexicographic order)
// then delete segments from the 1st date in the first string
// to the 2nd date in the last string
Collections.sort (intervals);
String first = intervals.get(0).split("/")[0];
String last = intervals.get(intervals.size() - 1).split("/")[1];
unloadAndKillData (dataSource, first, last);
}
protected void unloadAndKillData(final String dataSource, String start, String end) throws Exception
@ -92,15 +92,13 @@ public abstract class AbstractIndexerTest
protected void waitForAllTasksToComplete()
{
RetryUtil.retryUntilTrue(
new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
return (indexer.getPendingTasks().size() + indexer.getRunningTasks().size() + indexer.getWaitingTasks()
.size()) == 0;
}
}, "Waiting for Tasks Completion"
() -> {
int numTasks = indexer.getPendingTasks().size() +
indexer.getRunningTasks().size() +
indexer.getWaitingTasks().size();
return numTasks == 0;
},
"Waiting for Tasks Completion"
);
}

View File

@ -19,7 +19,6 @@
package io.druid.java.util.common;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@ -97,16 +96,7 @@ public class JodaUtils
public static boolean overlaps(final Interval i, Iterable<Interval> intervals)
{
return Iterables.any(
intervals, new Predicate<Interval>()
{
@Override
public boolean apply(Interval input)
{
return input.overlaps(i);
}
}
);
return Iterables.any(intervals, input -> input.overlaps(i));
}

View File

@ -40,16 +40,7 @@ public class ConcatSequence<T> implements Sequence<T>
@Override
public <OutType> OutType accumulate(OutType initValue, final Accumulator<OutType, T> accumulator)
{
return baseSequences.accumulate(
initValue, new Accumulator<OutType, Sequence<T>>()
{
@Override
public OutType accumulate(OutType accumulated, Sequence<T> in)
{
return in.accumulate(accumulated, accumulator);
}
}
);
return baseSequences.accumulate(initValue, (accumulated, in) -> in.accumulate(accumulated, accumulator));
}
@Override

View File

@ -92,10 +92,9 @@ final class LimitedSequence<T> extends YieldingSequenceBase<T>
@Override
public boolean isDone()
{
return subYielder.isDone() || (
!limitedAccumulator.withinThreshold() && (!limitedAccumulator.yielded()
|| limitedAccumulator.isInterruptYield())
);
return subYielder.isDone() ||
(!limitedAccumulator.withinThreshold() &&
(!limitedAccumulator.yielded() || limitedAccumulator.isInterruptYield()));
}
@Override

View File

@ -56,23 +56,24 @@ public class SequenceTestHelper
{
Iterator<Integer> numsIter = nums.iterator();
Yielder<Integer> yielder = seq.toYielder(
0, new YieldingAccumulator<Integer, Integer>()
{
final Iterator<Integer> valsIter = nums.iterator();
int count = 0;
0,
new YieldingAccumulator<Integer, Integer>()
{
final Iterator<Integer> valsIter = nums.iterator();
int count = 0;
@Override
public Integer accumulate(Integer accumulated, Integer in)
{
if (++count >= numToTake) {
count = 0;
yield();
@Override
public Integer accumulate(Integer accumulated, Integer in)
{
if (++count >= numToTake) {
count = 0;
yield();
}
Assert.assertEquals(prefix, valsIter.next(), in);
return accumulated + in;
}
}
Assert.assertEquals(prefix, valsIter.next(), in);
return accumulated + in;
}
}
);
int expectedSum = 0;
@ -105,17 +106,18 @@ public class SequenceTestHelper
}
int sum = seq.accumulate(
0, new Accumulator<Integer, Integer>()
{
final Iterator<Integer> valsIter = nums.iterator();
0,
new Accumulator<Integer, Integer>()
{
final Iterator<Integer> valsIter = nums.iterator();
@Override
public Integer accumulate(Integer accumulated, Integer in)
{
Assert.assertEquals(prefix, valsIter.next(), in);
return accumulated + in;
}
}
@Override
public Integer accumulate(Integer accumulated, Integer in)
{
Assert.assertEquals(prefix, valsIter.next(), in);
return accumulated + in;
}
}
);
Assert.assertEquals(prefix, expectedSum, sum);

View File

@ -89,30 +89,30 @@ class JodaStuff
private static class DateTimeDeserializer extends StdDeserializer<DateTime>
{
public DateTimeDeserializer()
{
super(DateTime.class);
}
public DateTimeDeserializer()
{
super(DateTime.class);
}
@Override
public DateTime deserialize(JsonParser jp, DeserializationContext ctxt)
throws IOException, JsonProcessingException
{
JsonToken t = jp.getCurrentToken();
if (t == JsonToken.VALUE_NUMBER_INT) {
return DateTimes.utc(jp.getLongValue());
}
if (t == JsonToken.VALUE_STRING) {
String str = jp.getText().trim();
if (str.length() == 0) { // [JACKSON-360]
return null;
}
// make sure to preserve time zone information when parsing timestamps
return ISODateTimeFormat.dateTimeParser()
.withOffsetParsed()
.parseDateTime(str);
}
throw ctxt.mappingException(getValueClass());
@Override
public DateTime deserialize(JsonParser jp, DeserializationContext ctxt)
throws IOException, JsonProcessingException
{
JsonToken t = jp.getCurrentToken();
if (t == JsonToken.VALUE_NUMBER_INT) {
return DateTimes.utc(jp.getLongValue());
}
if (t == JsonToken.VALUE_STRING) {
String str = jp.getText().trim();
if (str.length() == 0) { // [JACKSON-360]
return null;
}
// make sure to preserve time zone information when parsing timestamps
return ISODateTimeFormat.dateTimeParser()
.withOffsetParsed()
.parseDateTime(str);
}
throw ctxt.mappingException(getValueClass());
}
}
}

View File

@ -56,7 +56,7 @@ public class DoubleLastAggregatorFactory extends AggregatorFactory
public DoubleLastAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") final String fieldName
)
)
{
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");

View File

@ -56,7 +56,7 @@ public class FloatLastAggregatorFactory extends AggregatorFactory
public FloatLastAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") final String fieldName
)
)
{
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");

View File

@ -73,13 +73,13 @@ public class IdentityExtractionFn implements ExtractionFn
@Override
public String toString()
{
return "Identity";
return "Identity";
}
@Override
public boolean equals(Object o)
{
return o != null && o instanceof IdentityExtractionFn;
return o != null && o instanceof IdentityExtractionFn;
}
public static final IdentityExtractionFn getInstance()

View File

@ -292,7 +292,7 @@ public class ByteBufferHashTable
final int startBucket = keyHash % buckets;
int bucket = startBucket;
outer:
outer:
while (true) {
final int bucketOffset = bucket * bucketSizeWithHash;

View File

@ -525,7 +525,7 @@ public class RowBasedGrouperHelper
}
private static class InputRawSupplierColumnSelectorStrategyFactory
implements ColumnSelectorStrategyFactory<InputRawSupplierColumnSelectorStrategy>
implements ColumnSelectorStrategyFactory<InputRawSupplierColumnSelectorStrategy>
{
@Override
public InputRawSupplierColumnSelectorStrategy makeColumnSelectorStrategy(

View File

@ -45,7 +45,7 @@ public class DoubleGroupByColumnSelectorStrategy implements GroupByColumnSelecto
@Override
public void initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] values)
{
values[columnIndex] = selector.getDouble();
values[columnIndex] = selector.getDouble();
}
@Override

View File

@ -83,7 +83,7 @@ public class LookupSnapshotTaker
public synchronized void takeSnapshot(List<LookupBean> lookups)
{
try {
objectMapper.writeValue(persistFile, lookups);
objectMapper.writeValue(persistFile, lookups);
}
catch (IOException e) {
throw new ISE(e, "Exception during serialization of lookups using file [%s]", persistFile.getAbsolutePath());

View File

@ -98,8 +98,7 @@ public class PooledTopNAlgorithm
new Generic2AggPooledTopNScannerPrototype();
private static final Historical1AggPooledTopNScanner defaultHistorical1SimpleDoubleAggScanner =
new Historical1SimpleDoubleAggPooledTopNScannerPrototype();
private static final
Historical1AggPooledTopNScanner defaultHistoricalSingleValueDimSelector1SimpleDoubleAggScanner =
private static final Historical1AggPooledTopNScanner defaultHistoricalSingleValueDimSelector1SimpleDoubleAggScanner =
new HistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopNScannerPrototype();
private interface ScanAndAggregate

View File

@ -95,17 +95,17 @@ public class TopNQueryBuilder
public TopNQueryBuilder(final TopNQuery query)
{
this.dataSource = query.getDataSource();
this.virtualColumns = query.getVirtualColumns();
this.dimensionSpec = query.getDimensionSpec();
this.topNMetricSpec = query.getTopNMetricSpec();
this.threshold = query.getThreshold();
this.querySegmentSpec = query.getQuerySegmentSpec();
this.dimFilter = query.getDimensionsFilter();
this.granularity = query.getGranularity();
this.aggregatorSpecs = query.getAggregatorSpecs();
this.postAggregatorSpecs = query.getPostAggregatorSpecs();
this.context = query.getContext();
this.dataSource = query.getDataSource();
this.virtualColumns = query.getVirtualColumns();
this.dimensionSpec = query.getDimensionSpec();
this.topNMetricSpec = query.getTopNMetricSpec();
this.threshold = query.getThreshold();
this.querySegmentSpec = query.getQuerySegmentSpec();
this.dimFilter = query.getDimensionsFilter();
this.granularity = query.getGranularity();
this.aggregatorSpecs = query.getAggregatorSpecs();
this.postAggregatorSpecs = query.getPostAggregatorSpecs();
this.context = query.getContext();
}
public DataSource getDataSource()

View File

@ -138,7 +138,9 @@ public final class DimensionHandlerUtils
* @return An array of ColumnSelectorPlus objects, in the order of the columns specified in dimensionSpecs
*/
public static <ColumnSelectorStrategyClass extends ColumnSelectorStrategy>
//CHECKSTYLE.OFF: Indentation
ColumnSelectorPlus<ColumnSelectorStrategyClass>[] createColumnSelectorPluses(
//CHECKSTYLE.ON: Indentation
ColumnSelectorStrategyFactory<ColumnSelectorStrategyClass> strategyFactory,
List<DimensionSpec> dimensionSpecs,
ColumnSelectorFactory columnSelectorFactory

View File

@ -555,7 +555,7 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
@Override
public LongColumnSelector makeLongColumnSelector(TimeAndDimsHolder currEntry, IncrementalIndex.DimensionDesc desc)
{
return ZeroLongColumnSelector.instance();
return ZeroLongColumnSelector.instance();
}
@Override

View File

@ -300,14 +300,8 @@ public class StringDimensionMergerV9 implements DimensionMergerV9<int[]>
// these buffers are used by dictIdSeeker in mergeBitmaps() below. The iterator is created and only used
// in writeMergedValueMetadata(), but the buffers are still used until after mergeBitmaps().
Closeable toCloseDictionaryMergeIterator = dictionaryMergeIterator;
Closeable dimValsMappedUnmapper = new Closeable()
{
@Override
public void close()
{
ByteBufferUtils.unmap(dimValsMapped);
}
}) {
Closeable dimValsMappedUnmapper = () -> ByteBufferUtils.unmap(dimValsMapped)
) {
Indexed<String> dimVals = GenericIndexed.read(dimValsMapped, GenericIndexed.STRING_STRATEGY);
BitmapFactory bmpFactory = bitmapSerdeFactory.getBitmapFactory();

View File

@ -65,22 +65,22 @@ public class BlockLayoutIndexedFloatSupplier implements Supplier<IndexedFloats>
final int rem = sizePer - 1;
final boolean powerOf2 = sizePer == (1 << div);
if (powerOf2) {
return new BlockLayoutIndexedFloats()
return new BlockLayoutIndexedFloats()
{
@Override
public float get(int index)
{
@Override
public float get(int index)
{
// optimize division and remainder for powers of 2
final int bufferNum = index >> div;
// optimize division and remainder for powers of 2
final int bufferNum = index >> div;
if (bufferNum != currIndex) {
loadBuffer(bufferNum);
}
final int bufferIndex = index & rem;
return floatBuffer.get(floatBuffer.position() + bufferIndex);
if (bufferNum != currIndex) {
loadBuffer(bufferNum);
}
};
final int bufferIndex = index & rem;
return floatBuffer.get(floatBuffer.position() + bufferIndex);
}
};
} else {
return new BlockLayoutIndexedFloats();
}

View File

@ -33,9 +33,7 @@ public class IndexedIterable<T> implements Iterable<T>
return new IndexedIterable<T>(indexed);
}
public IndexedIterable(
Indexed<T> indexed
)
public IndexedIterable(Indexed<T> indexed)
{
this.indexed = indexed;
}

View File

@ -31,7 +31,7 @@ import io.druid.query.monomorphicprocessing.HotLoopCallee;
*/
public interface ReadableOffset extends HotLoopCallee
{
@CalledFromHotLoop
int getOffset();
@CalledFromHotLoop
int getOffset();
}

View File

@ -104,20 +104,21 @@ public class ComplexColumnPartSerde implements ColumnPartSerde
public ComplexColumnPartSerde build()
{
return new ComplexColumnPartSerde(
typeName, new Serializer()
{
@Override
public long numBytes()
{
return delegate.getSerializedSize();
}
typeName,
new Serializer()
{
@Override
public long numBytes()
{
return delegate.getSerializedSize();
}
@Override
public void write(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
{
delegate.writeToChannel(channel, smoosher);
}
}
@Override
public void write(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
{
delegate.writeToChannel(channel, smoosher);
}
}
);
}
}

View File

@ -302,15 +302,13 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
rMultiValuedColumn = null;
}
builder.setHasMultipleValues(hasMultipleValues)
.setDictionaryEncodedColumn(
new DictionaryEncodedColumnSupplier(
rDictionary,
rSingleValuedColumn,
rMultiValuedColumn,
columnConfig.columnCacheSizeBytes()
)
);
DictionaryEncodedColumnSupplier dictionaryEncodedColumnSupplier = new DictionaryEncodedColumnSupplier(
rDictionary,
rSingleValuedColumn,
rMultiValuedColumn,
columnConfig.columnCacheSizeBytes()
);
builder.setHasMultipleValues(hasMultipleValues).setDictionaryEncodedColumn(dictionaryEncodedColumnSupplier);
GenericIndexed<ImmutableBitmap> rBitmaps = GenericIndexed.read(
buffer, bitmapSerdeFactory.getObjectStrategy(), builder.getFileMapper()

View File

@ -68,13 +68,13 @@ public class DoubleGenericColumnPartSerde implements ColumnPartSerde
{
return (buffer, builder, columnConfig) -> {
final CompressedDoublesIndexedSupplier column = CompressedDoublesIndexedSupplier.fromByteBuffer(
buffer,
byteOrder,
builder.getFileMapper()
);
builder.setType(ValueType.DOUBLE)
.setHasMultipleValues(false)
.setGenericColumn(new DoubleGenericColumnSupplier(column));
buffer,
byteOrder,
builder.getFileMapper()
);
builder.setType(ValueType.DOUBLE)
.setHasMultipleValues(false)
.setGenericColumn(new DoubleGenericColumnSupplier(column));
};
}
@ -89,8 +89,7 @@ public class DoubleGenericColumnPartSerde implements ColumnPartSerde
private ByteOrder byteOrder = null;
private DoubleColumnSerializer delegate = null;
public
SerializerBuilder withByteOrder(final ByteOrder byteOrder)
public SerializerBuilder withByteOrder(final ByteOrder byteOrder)
{
this.byteOrder = byteOrder;
return this;

View File

@ -86,20 +86,21 @@ public class FloatGenericColumnPartSerde implements ColumnPartSerde
public FloatGenericColumnPartSerde build()
{
return new FloatGenericColumnPartSerde(
byteOrder, new Serializer()
{
@Override
public long numBytes()
{
return delegate.getSerializedSize();
}
byteOrder,
new Serializer()
{
@Override
public long numBytes()
{
return delegate.getSerializedSize();
}
@Override
public void write(WritableByteChannel channel, FileSmoosher fileSmoosher) throws IOException
{
delegate.writeToChannel(channel, fileSmoosher);
}
}
@Override
public void write(WritableByteChannel channel, FileSmoosher fileSmoosher) throws IOException
{
delegate.writeToChannel(channel, fileSmoosher);
}
}
);
}
}

View File

@ -86,20 +86,21 @@ public class LongGenericColumnPartSerde implements ColumnPartSerde
public LongGenericColumnPartSerde build()
{
return new LongGenericColumnPartSerde(
byteOrder, new Serializer()
{
@Override
public long numBytes()
{
return delegate.getSerializedSize();
}
byteOrder,
new Serializer()
{
@Override
public long numBytes()
{
return delegate.getSerializedSize();
}
@Override
public void write(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
{
delegate.writeToChannel(channel, smoosher);
}
}
@Override
public void write(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
{
delegate.writeToChannel(channel, smoosher);
}
}
);
}
}

View File

@ -36,7 +36,6 @@ import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.aggregation.AggregationTestHelper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
@ -73,6 +72,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -156,14 +156,7 @@ public class MultiValuedDimensionTest
.setQuerySegmentSpec(new LegacySegmentSpec("1970/3000"))
.setGranularity(Granularities.ALL)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("tags", "tags")))
.setAggregatorSpecs(
Arrays.asList(
new AggregatorFactory[]
{
new CountAggregatorFactory("count")
}
)
)
.setAggregatorSpecs(Collections.singletonList(new CountAggregatorFactory("count")))
.build();
Sequence<Row> result = helper.runQueryOnSegmentsObjs(
@ -197,17 +190,8 @@ public class MultiValuedDimensionTest
.setQuerySegmentSpec(new LegacySegmentSpec("1970/3000"))
.setGranularity(Granularities.ALL)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("tags", "tags")))
.setAggregatorSpecs(
Arrays.asList(
new AggregatorFactory[]
{
new CountAggregatorFactory("count")
}
)
)
.setDimFilter(
new SelectorDimFilter("tags", "t3", null)
)
.setAggregatorSpecs(Collections.singletonList(new CountAggregatorFactory("count")))
.setDimFilter(new SelectorDimFilter("tags", "t3", null))
.build();
Sequence<Row> result = helper.runQueryOnSegmentsObjs(
@ -245,17 +229,8 @@ public class MultiValuedDimensionTest
)
)
)
.setAggregatorSpecs(
Arrays.asList(
new AggregatorFactory[]
{
new CountAggregatorFactory("count")
}
)
)
.setDimFilter(
new SelectorDimFilter("tags", "t3", null)
)
.setAggregatorSpecs(Collections.singletonList(new CountAggregatorFactory("count")))
.setDimFilter(new SelectorDimFilter("tags", "t3", null))
.build();
Sequence<Row> result = helper.runQueryOnSegmentsObjs(
@ -286,13 +261,7 @@ public class MultiValuedDimensionTest
))
.metric("count")
.intervals(QueryRunnerTestHelper.fullOnInterval)
.aggregators(
Arrays.asList(
new AggregatorFactory[]
{
new CountAggregatorFactory("count")
}
))
.aggregators(Collections.singletonList(new CountAggregatorFactory("count")))
.threshold(5)
.filters(new SelectorDimFilter("tags", "t3", null)).build();

View File

@ -78,9 +78,7 @@ public class FilteredAggregatorTest
new SelectorDimFilter("dim", "a", null)
);
FilteredAggregator agg = (FilteredAggregator) factory.factorize(
makeColumnSelector(selector)
);
FilteredAggregator agg = (FilteredAggregator) factory.factorize(makeColumnSelector(selector));
double expectedFirst = new Float(values[0]).doubleValue();
double expectedSecond = new Float(values[1]).doubleValue() + expectedFirst;

View File

@ -36,7 +36,6 @@ import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
@ -51,6 +50,7 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -70,14 +70,7 @@ public class GroupByQueryRunnerFactoryTest
.setQuerySegmentSpec(new LegacySegmentSpec("1970/3000"))
.setGranularity(Granularities.ALL)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("tags", "tags")))
.setAggregatorSpecs(
Arrays.asList(
new AggregatorFactory[]
{
new CountAggregatorFactory("count")
}
)
)
.setAggregatorSpecs(Collections.singletonList(new CountAggregatorFactory("count")))
.build();
final QueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(new GroupByQueryConfig());

View File

@ -2898,13 +2898,11 @@ public class GroupByQueryRunnerTest
);
// Now try it with an expression based aggregator.
builder.setLimit(Integer.MAX_VALUE)
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
new DoubleSumAggregatorFactory("idx", null, "index / 2 + indexMin", TestExprMacroTable.INSTANCE)
)
);
List<AggregatorFactory> aggregatorSpecs = Arrays.asList(
QueryRunnerTestHelper.rowsCount,
new DoubleSumAggregatorFactory("idx", null, "index / 2 + indexMin", TestExprMacroTable.INSTANCE)
);
builder.setLimit(Integer.MAX_VALUE).setAggregatorSpecs(aggregatorSpecs);
expectedResults = GroupByQueryRunnerTestHelper.createExpectedRows(
new String[]{"__time", "alias", "rows", "idx"},
@ -2930,16 +2928,17 @@ public class GroupByQueryRunnerTest
);
// Now try it with an expression virtual column.
builder.setLimit(Integer.MAX_VALUE)
.setVirtualColumns(
new ExpressionVirtualColumn("expr", "index / 2 + indexMin", ValueType.FLOAT, TestExprMacroTable.INSTANCE)
)
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
new DoubleSumAggregatorFactory("idx", "expr")
)
);
ExpressionVirtualColumn expressionVirtualColumn = new ExpressionVirtualColumn(
"expr",
"index / 2 + indexMin",
ValueType.FLOAT,
TestExprMacroTable.INSTANCE
);
List<AggregatorFactory> aggregatorSpecs2 = Arrays.asList(
QueryRunnerTestHelper.rowsCount,
new DoubleSumAggregatorFactory("idx", "expr")
);
builder.setLimit(Integer.MAX_VALUE).setVirtualColumns(expressionVirtualColumn).setAggregatorSpecs(aggregatorSpecs2);
TestHelper.assertExpectedObjects(
expectedResults,

View File

@ -40,19 +40,23 @@ import static org.junit.Assert.assertTrue;
public class HavingSpecTest
{
private static final Row ROW = new MapBasedInputRow(0, new ArrayList<>(), ImmutableMap.of("metric", Float.valueOf(10)));
private static final Row ROW = new MapBasedInputRow(
0,
new ArrayList<>(),
ImmutableMap.of("metric", Float.valueOf(10))
);
@Test
public void testHavingClauseSerde() throws Exception
{
List<HavingSpec> havings = Arrays.<HavingSpec>asList(
new GreaterThanHavingSpec("agg", Double.valueOf(1.3)),
new OrHavingSpec(
Arrays.<HavingSpec>asList(
new LessThanHavingSpec("lessAgg", Long.valueOf(1L)),
new NotHavingSpec(new EqualToHavingSpec("equalAgg", Double.valueOf(2)))
new GreaterThanHavingSpec("agg", Double.valueOf(1.3)),
new OrHavingSpec(
Arrays.<HavingSpec>asList(
new LessThanHavingSpec("lessAgg", Long.valueOf(1L)),
new NotHavingSpec(new EqualToHavingSpec("equalAgg", Double.valueOf(2)))
)
)
)
);
HavingSpec andHavingSpec = new AndHavingSpec(havings);
@ -97,7 +101,7 @@ public class HavingSpecTest
"value", 1.3
);
ObjectMapper mapper = new DefaultObjectMapper();
HavingSpec spec = mapper.convertValue (greaterMap, HavingSpec.class);
HavingSpec spec = mapper.convertValue(greaterMap, HavingSpec.class);
}
@ -161,6 +165,7 @@ public class HavingSpecTest
private final AtomicInteger counter;
private final boolean value;
private CountingHavingSpec(AtomicInteger counter, boolean value)
{
this.counter = counter;
@ -180,10 +185,10 @@ public class HavingSpecTest
{
AtomicInteger counter = new AtomicInteger(0);
AndHavingSpec spec = new AndHavingSpec(ImmutableList.of(
(HavingSpec) new CountingHavingSpec(counter, true),
new CountingHavingSpec(counter, false),
new CountingHavingSpec(counter, true),
new CountingHavingSpec(counter, false)
(HavingSpec) new CountingHavingSpec(counter, true),
new CountingHavingSpec(counter, false),
new CountingHavingSpec(counter, true),
new CountingHavingSpec(counter, false)
));
spec.eval(ROW);
@ -196,10 +201,10 @@ public class HavingSpecTest
{
AtomicInteger counter = new AtomicInteger(0);
AndHavingSpec spec = new AndHavingSpec(ImmutableList.of(
(HavingSpec) new CountingHavingSpec(counter, true),
new CountingHavingSpec(counter, true),
new CountingHavingSpec(counter, true),
new CountingHavingSpec(counter, true)
(HavingSpec) new CountingHavingSpec(counter, true),
new CountingHavingSpec(counter, true),
new CountingHavingSpec(counter, true),
new CountingHavingSpec(counter, true)
));
spec.eval(ROW);
@ -208,10 +213,10 @@ public class HavingSpecTest
counter.set(0);
spec = new AndHavingSpec(ImmutableList.of(
(HavingSpec) new CountingHavingSpec(counter, false),
new CountingHavingSpec(counter, true),
new CountingHavingSpec(counter, true),
new CountingHavingSpec(counter, true)
(HavingSpec) new CountingHavingSpec(counter, false),
new CountingHavingSpec(counter, true),
new CountingHavingSpec(counter, true),
new CountingHavingSpec(counter, true)
));
spec.eval(ROW);
@ -224,10 +229,10 @@ public class HavingSpecTest
{
AtomicInteger counter = new AtomicInteger(0);
OrHavingSpec spec = new OrHavingSpec(ImmutableList.of(
(HavingSpec) new CountingHavingSpec(counter, true),
new CountingHavingSpec(counter, true),
new CountingHavingSpec(counter, true),
new CountingHavingSpec(counter, false)
(HavingSpec) new CountingHavingSpec(counter, true),
new CountingHavingSpec(counter, true),
new CountingHavingSpec(counter, true),
new CountingHavingSpec(counter, false)
));
spec.eval(ROW);
@ -240,10 +245,10 @@ public class HavingSpecTest
{
AtomicInteger counter = new AtomicInteger(0);
OrHavingSpec spec = new OrHavingSpec(ImmutableList.of(
(HavingSpec) new CountingHavingSpec(counter, false),
new CountingHavingSpec(counter, false),
new CountingHavingSpec(counter, false),
new CountingHavingSpec(counter, false)
(HavingSpec) new CountingHavingSpec(counter, false),
new CountingHavingSpec(counter, false),
new CountingHavingSpec(counter, false),
new CountingHavingSpec(counter, false)
));
spec.eval(ROW);
@ -252,10 +257,10 @@ public class HavingSpecTest
counter.set(0);
spec = new OrHavingSpec(ImmutableList.of(
(HavingSpec) new CountingHavingSpec(counter, false),
new CountingHavingSpec(counter, false),
new CountingHavingSpec(counter, false),
new CountingHavingSpec(counter, true)
(HavingSpec) new CountingHavingSpec(counter, false),
new CountingHavingSpec(counter, false),
new CountingHavingSpec(counter, false),
new CountingHavingSpec(counter, true)
));
spec.eval(ROW);

View File

@ -55,27 +55,27 @@ public class TopNSequenceTest
@Parameterized.Parameters
public static Collection<Object[]> makeTestData()
{
Object[][] data = new Object[][] {
{ASC, RAW_ASC, RAW_ASC.size() - 2},
{ASC, RAW_ASC, RAW_ASC.size()},
{ASC, RAW_ASC, RAW_ASC.size() + 2},
{ASC, RAW_ASC, 0},
{ASC, SINGLE, 0},
{ASC, SINGLE, 1},
{ASC, SINGLE, 2},
{ASC, SINGLE, 3},
{ASC, EMPTY, 0},
{ASC, EMPTY, 1},
{DESC, RAW_DESC, RAW_DESC.size() - 2},
{DESC, RAW_DESC, RAW_DESC.size()},
{DESC, RAW_DESC, RAW_DESC.size() + 2},
{DESC, RAW_DESC, 0},
{DESC, RAW_DESC, 0},
{DESC, SINGLE, 1},
{DESC, SINGLE, 2},
{DESC, SINGLE, 3},
{DESC, EMPTY, 0},
{DESC, EMPTY, 1},
Object[][] data = new Object[][]{
{ASC, RAW_ASC, RAW_ASC.size() - 2},
{ASC, RAW_ASC, RAW_ASC.size()},
{ASC, RAW_ASC, RAW_ASC.size() + 2},
{ASC, RAW_ASC, 0},
{ASC, SINGLE, 0},
{ASC, SINGLE, 1},
{ASC, SINGLE, 2},
{ASC, SINGLE, 3},
{ASC, EMPTY, 0},
{ASC, EMPTY, 1},
{DESC, RAW_DESC, RAW_DESC.size() - 2},
{DESC, RAW_DESC, RAW_DESC.size()},
{DESC, RAW_DESC, RAW_DESC.size() + 2},
{DESC, RAW_DESC, 0},
{DESC, RAW_DESC, 0},
{DESC, SINGLE, 1},
{DESC, SINGLE, 2},
{DESC, SINGLE, 3},
{DESC, EMPTY, 0},
{DESC, EMPTY, 1}
};
return Arrays.asList(data);

View File

@ -225,39 +225,40 @@ public class RegisteredLookupExtractionFnTest
{
EasyMock.expect(manager.get(EasyMock.eq(LOOKUP_NAME))).andReturn(
new LookupExtractorFactoryContainer(
"v0", new LookupExtractorFactory()
{
@Override
public boolean start()
{
return false;
}
"v0",
new LookupExtractorFactory()
{
@Override
public boolean start()
{
return false;
}
@Override
public boolean replaces(@Nullable LookupExtractorFactory other)
{
return false;
}
@Override
public boolean replaces(@Nullable LookupExtractorFactory other)
{
return false;
}
@Override
public boolean close()
{
return false;
}
@Override
public boolean close()
{
return false;
}
@Nullable
@Override
public LookupIntrospectHandler getIntrospectHandler()
{
return null;
}
@Nullable
@Override
public LookupIntrospectHandler getIntrospectHandler()
{
return null;
}
@Override
public LookupExtractor get()
{
return LOOKUP_EXTRACTOR;
}
}
@Override
public LookupExtractor get()
{
return LOOKUP_EXTRACTOR;
}
}
)
).anyTimes();
}

View File

@ -112,7 +112,7 @@ public class TimeBoundaryQueryQueryToolChestTest
);
for (int i = 0; i < segments.size(); i++) {
Assert.assertEquals(segments.get(i).getInterval(), expected.get(i).getInterval());
Assert.assertEquals(segments.get(i).getInterval(), expected.get(i).getInterval());
}
}

View File

@ -667,14 +667,14 @@ public class TimeseriesQueryRunnerTest
new Interval(DateTimes.of("2011-04-14T01"), DateTimes.of("2011-04-15"))
);
for (Interval interval : iterable) {
lotsOfZeroes.add(
new Result<>(
interval.getStart(),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of("rows", 0L, "idx", 0L)
)
)
);
lotsOfZeroes.add(
new Result<>(
interval.getStart(),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of("rows", 0L, "idx", 0L)
)
)
);
}
List<Result<TimeseriesResultValue>> expectedResults1 = Lists.newArrayList(

Some files were not shown because too many files have changed in this diff Show More