mirror of https://github.com/apache/druid.git
Catch some incorrect method parameter or call argument formatting patterns with checkstyle (#6461)
* Catch some incorrect method parameter or call argument formatting patterns with checkstyle * Fix DiscoveryModule * Inline parameters_and_arguments.txt * Fix a bug in PolyBind * Fix formatting
This commit is contained in:
parent
c5bf4e7503
commit
84ac18dc1b
|
@ -22,7 +22,6 @@ package org.apache.druid.benchmark;
|
|||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Ordering;
|
||||
import com.google.common.primitives.Ints;
|
||||
import org.apache.druid.java.util.common.guava.Accumulator;
|
||||
import org.apache.druid.java.util.common.guava.MergeSequence;
|
||||
import org.apache.druid.java.util.common.guava.Sequence;
|
||||
import org.apache.druid.java.util.common.guava.Sequences;
|
||||
|
@ -94,25 +93,14 @@ public class MergeSequenceBenchmark
|
|||
}
|
||||
|
||||
if (!toMerge.isEmpty()) {
|
||||
partialMerged.add(new MergeSequence<Integer>(Ordering.natural(), Sequences.simple(toMerge)));
|
||||
partialMerged.add(new MergeSequence<>(Ordering.natural(), Sequences.simple(toMerge)));
|
||||
}
|
||||
MergeSequence<Integer> mergeSequence = new MergeSequence(
|
||||
Ordering.<Integer>natural(),
|
||||
MergeSequence<Integer> mergeSequence = new MergeSequence<>(
|
||||
Ordering.natural(),
|
||||
Sequences.simple(partialMerged)
|
||||
);
|
||||
Integer accumulate = mergeSequence.accumulate(
|
||||
0, new Accumulator<Integer, Integer>()
|
||||
{
|
||||
@Override
|
||||
public Integer accumulate(Integer accumulated, Integer in)
|
||||
{
|
||||
return accumulated + in;
|
||||
}
|
||||
}
|
||||
);
|
||||
Integer accumulate = mergeSequence.accumulate(0, Integer::sum);
|
||||
blackhole.consume(accumulate);
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
|
@ -120,20 +108,8 @@ public class MergeSequenceBenchmark
|
|||
@OutputTimeUnit(TimeUnit.MILLISECONDS)
|
||||
public void mergeFlat(final Blackhole blackhole)
|
||||
{
|
||||
MergeSequence<Integer> mergeSequence = new MergeSequence(Ordering.<Integer>natural(), Sequences.simple(sequences));
|
||||
Integer accumulate = mergeSequence.accumulate(
|
||||
0, new Accumulator<Integer, Integer>()
|
||||
{
|
||||
@Override
|
||||
public Integer accumulate(Integer accumulated, Integer in)
|
||||
{
|
||||
return accumulated + in;
|
||||
}
|
||||
}
|
||||
);
|
||||
MergeSequence<Integer> mergeSequence = new MergeSequence<>(Ordering.natural(), Sequences.simple(sequences));
|
||||
Integer accumulate = mergeSequence.accumulate(0, Integer::sum);
|
||||
blackhole.consume(accumulate);
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -42,11 +42,8 @@ public class QueryBenchmarkUtil
|
|||
Segment adapter
|
||||
)
|
||||
{
|
||||
return new FinalizeResultsQueryRunner<T>(
|
||||
new BySegmentQueryRunner<T>(
|
||||
segmentId, adapter.getDataInterval().getStart(),
|
||||
factory.createRunner(adapter)
|
||||
),
|
||||
return new FinalizeResultsQueryRunner<>(
|
||||
new BySegmentQueryRunner<>(segmentId, adapter.getDataInterval().getStart(), factory.createRunner(adapter)),
|
||||
(QueryToolChest<T, Query<T>>) factory.getToolchest()
|
||||
);
|
||||
}
|
||||
|
|
|
@ -50,6 +50,7 @@
|
|||
|
||||
<suppress checks="Indentation" files="[\\/]target[\\/]generated-test-sources[\\/]" />
|
||||
<suppress checks="Indentation" files="ProtoTestEventWrapper.java" />
|
||||
<suppress checks="Regex" files="ProtoTestEventWrapper.java" />
|
||||
|
||||
<suppress checks="OneStatementPerLine" files="[\\/]target[\\/]generated-test-sources[\\/]" />
|
||||
|
||||
|
|
|
@ -205,5 +205,63 @@
|
|||
<property name="message" value="ObjectColumnSelector, LongColumnSelector, FloatColumnSelector
|
||||
and DoubleColumnSelector must not be used in an instanceof statement, see Javadoc of those interfaces."/>
|
||||
</module>
|
||||
|
||||
<module name="Regexp">
|
||||
<property
|
||||
name="format"
|
||||
value='(?<!ImmutableMap.of|Types.mapOf|orderedMap|makeSelectResults|makeListOfPairs)\(\s*\n +([^,\n\(\{"</]+|[^,\n\(\{" /]+> [a-zA-Z0-9_]+)\, ++[^,\n/]+'
|
||||
/>
|
||||
<property name="illegalPattern" value="true"/>
|
||||
<property name="message" value="
|
||||
According to the Druid code style, if a method or constructor declaration or a call
|
||||
doesn't fit a single line, each parameter or argument should be on it's own, e. g:
|
||||
|
||||
MyReturnType myMethodWithAVeryLongName(
|
||||
MyParamTypeA myParamAWithAVeryLongName,
|
||||
MyParamTypeB myParamBWithAVeryLongName
|
||||
)
|
||||
|
||||
or
|
||||
|
||||
MyValueType myValue = myMethodWithAVeryLongName(
|
||||
myVeryLongArgA,
|
||||
myVeryLongArgB
|
||||
)
|
||||
|
||||
The exceptions from this rule are map-like and pair-accepting constructors and methods,
|
||||
for those it's preferred to put each pair on it's own line, e. g:
|
||||
|
||||
Map<MyKeyType, MyValueType> myMap = ImmutableMap.of(
|
||||
myKey1, myValue1,
|
||||
myKey2, myValue2
|
||||
)
|
||||
|
||||
Always prefer to fit a declaration or a method or constructor call into a single line
|
||||
(less than 120 cols), if possible.
|
||||
|
||||
Two things to note to avoid unnecessary breakdown:
|
||||
|
||||
1) Exceptions declared for a method could be broken to the next line separately, e. g:
|
||||
|
||||
MyReturnType myMethodWithAVeryLongName(MyParamTypeA myParamA, MyParamTypeB myParamB)
|
||||
throws MyExceptionTypeAWithVeryLongName, MyExceptionTypeBWithVeryLongName
|
||||
|
||||
2) In a variable, field or constant assignment, it's often more readable to break the
|
||||
whole right hand side expression to the next line, instead of breaking the expression
|
||||
arguments, e. g:
|
||||
|
||||
MyTypeWithAVeryLongName myVariableWithAVeryLongName =
|
||||
myMethodWithAVeryLongName(myArgA, myArgB);
|
||||
|
||||
Also note that this checkstyle rule (the one that caused this message to be printed)
|
||||
doesn't spot all violations of the corresponding Druid code style rule. If you see
|
||||
a place where method or constructor parameters or call arguments are not properly
|
||||
located each on it's own line, but this checkstyle rule is silent, if doesn't mean
|
||||
that the code is formatted correctly. Fix it anyway.
|
||||
|
||||
If you encouter a map-like or a pair-accepting method that is reported by this
|
||||
checkstyle rule, you should add it as an exception in the corresponding rule in
|
||||
codestyle/checkstyle.xml. "/>
|
||||
</module>
|
||||
</module>
|
||||
</module>
|
||||
|
|
|
@ -80,7 +80,10 @@ public class ConfigManager
|
|||
|
||||
poller = new PollingCallable();
|
||||
ScheduledExecutors.scheduleWithFixedDelay(
|
||||
exec, new Duration(0), config.get().getPollDuration().toStandardDuration(), poller
|
||||
exec,
|
||||
new Duration(0),
|
||||
config.get().getPollDuration().toStandardDuration(),
|
||||
poller
|
||||
);
|
||||
|
||||
started = true;
|
||||
|
|
|
@ -72,9 +72,8 @@ public class CombiningSequence<T> implements Sequence<T>
|
|||
@Override
|
||||
public <OutType> Yielder<OutType> toYielder(OutType initValue, final YieldingAccumulator<OutType, T> accumulator)
|
||||
{
|
||||
final CombiningYieldingAccumulator<OutType, T> combiningAccumulator = new CombiningYieldingAccumulator<>(
|
||||
ordering, mergeFn, accumulator
|
||||
);
|
||||
final CombiningYieldingAccumulator<OutType, T> combiningAccumulator =
|
||||
new CombiningYieldingAccumulator<>(ordering, mergeFn, accumulator);
|
||||
|
||||
combiningAccumulator.setRetVal(initValue);
|
||||
Yielder<T> baseYielder = baseSequence.toYielder(null, combiningAccumulator);
|
||||
|
|
|
@ -122,9 +122,7 @@ public class DimensionsSpec
|
|||
@JsonIgnore
|
||||
public List<SpatialDimensionSchema> getSpatialDimensions()
|
||||
{
|
||||
Iterable<NewSpatialDimensionSchema> filteredList = Iterables.filter(
|
||||
dimensions, NewSpatialDimensionSchema.class
|
||||
);
|
||||
Iterable<NewSpatialDimensionSchema> filteredList = Iterables.filter(dimensions, NewSpatialDimensionSchema.class);
|
||||
|
||||
Iterable<SpatialDimensionSchema> transformedList = Iterables.transform(
|
||||
filteredList,
|
||||
|
|
|
@ -91,12 +91,8 @@ public class DruidSecondaryModule implements Module
|
|||
|
||||
mapper.setInjectableValues(new GuiceInjectableValues(injector));
|
||||
mapper.setAnnotationIntrospectors(
|
||||
new AnnotationIntrospectorPair(
|
||||
guiceIntrospector, mapper.getSerializationConfig().getAnnotationIntrospector()
|
||||
),
|
||||
new AnnotationIntrospectorPair(
|
||||
guiceIntrospector, mapper.getDeserializationConfig().getAnnotationIntrospector()
|
||||
)
|
||||
new AnnotationIntrospectorPair(guiceIntrospector, mapper.getSerializationConfig().getAnnotationIntrospector()),
|
||||
new AnnotationIntrospectorPair(guiceIntrospector, mapper.getDeserializationConfig().getAnnotationIntrospector())
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.druid.guice.annotations.PublicApi;
|
|||
import org.apache.druid.java.util.common.StringUtils;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.lang.annotation.Annotation;
|
||||
import java.lang.reflect.ParameterizedType;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
@ -122,13 +123,10 @@ public class PolyBind
|
|||
final TypeLiteral<T> interfaceType = interfaceKey.getTypeLiteral();
|
||||
|
||||
if (interfaceKey.getAnnotation() != null) {
|
||||
return MapBinder.newMapBinder(
|
||||
binder, TypeLiteral.get(String.class), interfaceType, interfaceKey.getAnnotation()
|
||||
);
|
||||
return MapBinder.newMapBinder(binder, TypeLiteral.get(String.class), interfaceType, interfaceKey.getAnnotation());
|
||||
} else if (interfaceKey.getAnnotationType() != null) {
|
||||
return MapBinder.newMapBinder(
|
||||
binder, TypeLiteral.get(String.class), interfaceType, interfaceKey.getAnnotationType()
|
||||
);
|
||||
Class<? extends Annotation> annotationType = interfaceKey.getAnnotationType();
|
||||
return MapBinder.newMapBinder(binder, TypeLiteral.get(String.class), interfaceType, annotationType);
|
||||
} else {
|
||||
return MapBinder.newMapBinder(binder, TypeLiteral.get(String.class), interfaceType);
|
||||
}
|
||||
|
@ -178,7 +176,7 @@ public class PolyBind
|
|||
if (key.getAnnotation() != null) {
|
||||
implsMap = (Map<String, Provider<T>>) injector.getInstance(Key.get(mapType, key.getAnnotation()));
|
||||
} else if (key.getAnnotationType() != null) {
|
||||
implsMap = (Map<String, Provider<T>>) injector.getInstance(Key.get(mapType, key.getAnnotation()));
|
||||
implsMap = (Map<String, Provider<T>>) injector.getInstance(Key.get(mapType, key.getAnnotationType()));
|
||||
} else {
|
||||
implsMap = (Map<String, Provider<T>>) injector.getInstance(Key.get(mapType));
|
||||
}
|
||||
|
|
|
@ -131,7 +131,12 @@ public class Execs
|
|||
queue = new SynchronousQueue<>();
|
||||
}
|
||||
return new ThreadPoolExecutor(
|
||||
1, 1, 0L, TimeUnit.MILLISECONDS, queue, makeThreadFactory(nameFormat, priority),
|
||||
1,
|
||||
1,
|
||||
0L,
|
||||
TimeUnit.MILLISECONDS,
|
||||
queue,
|
||||
makeThreadFactory(nameFormat, priority),
|
||||
new RejectedExecutionHandler()
|
||||
{
|
||||
@Override
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package org.apache.druid.java.util.common.concurrent;
|
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.joda.time.Duration;
|
||||
|
@ -180,8 +179,6 @@ public class ScheduledExecutors
|
|||
|
||||
public static ScheduledExecutorService fixed(int corePoolSize, String nameFormat)
|
||||
{
|
||||
return Executors.newScheduledThreadPool(
|
||||
corePoolSize, new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build()
|
||||
);
|
||||
return Executors.newScheduledThreadPool(corePoolSize, Execs.makeThreadFactory(nameFormat));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,7 +22,6 @@ package org.apache.druid.java.util.common.granularity;
|
|||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.core.JsonGenerator;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.JsonSerializable;
|
||||
import com.fasterxml.jackson.databind.SerializerProvider;
|
||||
import com.fasterxml.jackson.databind.jsontype.TypeSerializer;
|
||||
|
@ -428,9 +427,8 @@ public class PeriodGranularity extends Granularity implements JsonSerializable
|
|||
}
|
||||
|
||||
@Override
|
||||
public void serialize(
|
||||
JsonGenerator jsonGenerator, SerializerProvider serializerProvider
|
||||
) throws IOException, JsonProcessingException
|
||||
public void serialize(JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
|
||||
throws IOException
|
||||
{
|
||||
// Retain the same behavior as before #3850.
|
||||
// i.e. when Granularity class was an enum.
|
||||
|
@ -448,8 +446,10 @@ public class PeriodGranularity extends Granularity implements JsonSerializable
|
|||
|
||||
@Override
|
||||
public void serializeWithType(
|
||||
JsonGenerator jsonGenerator, SerializerProvider serializerProvider, TypeSerializer typeSerializer
|
||||
) throws IOException, JsonProcessingException
|
||||
JsonGenerator jsonGenerator,
|
||||
SerializerProvider serializerProvider,
|
||||
TypeSerializer typeSerializer
|
||||
) throws IOException
|
||||
{
|
||||
serialize(jsonGenerator, serializerProvider);
|
||||
}
|
||||
|
|
|
@ -48,9 +48,8 @@ public class FilteredSequence<T> implements Sequence<T>
|
|||
@Override
|
||||
public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator)
|
||||
{
|
||||
final FilteringYieldingAccumulator<OutType, T> filteringAccumulator = new FilteringYieldingAccumulator<>(
|
||||
pred, accumulator
|
||||
);
|
||||
final FilteringYieldingAccumulator<OutType, T> filteringAccumulator =
|
||||
new FilteringYieldingAccumulator<>(pred, accumulator);
|
||||
|
||||
return wrapYielder(baseSequence.toYielder(initValue, filteringAccumulator), filteringAccumulator);
|
||||
}
|
||||
|
|
|
@ -82,9 +82,7 @@ public class JSONToLowerParser implements Parser<String, Object>
|
|||
|
||||
private ArrayList<String> fieldNames;
|
||||
|
||||
public JSONToLowerParser(
|
||||
ObjectMapper objectMapper, Iterable<String> fieldNames, Iterable<String> exclude
|
||||
)
|
||||
public JSONToLowerParser(ObjectMapper objectMapper, Iterable<String> fieldNames, Iterable<String> exclude)
|
||||
{
|
||||
this.objectMapper = objectMapper;
|
||||
if (fieldNames != null) {
|
||||
|
|
|
@ -71,9 +71,7 @@ public class FullResponseHandler implements HttpResponseHandler<FullResponseHold
|
|||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(
|
||||
ClientResponse<FullResponseHolder> clientResponse, Throwable e
|
||||
)
|
||||
public void exceptionCaught(ClientResponse<FullResponseHolder> clientResponse, Throwable e)
|
||||
{
|
||||
// Its safe to Ignore as the ClientResponse returned in handleChunk were unfinished
|
||||
}
|
||||
|
|
|
@ -70,9 +70,7 @@ public class StatusResponseHandler implements HttpResponseHandler<StatusResponse
|
|||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(
|
||||
ClientResponse<StatusResponseHolder> clientResponse, Throwable e
|
||||
)
|
||||
public void exceptionCaught(ClientResponse<StatusResponseHolder> clientResponse, Throwable e)
|
||||
{
|
||||
// Its safe to Ignore as the ClientResponse returned in handleChunk were unfinished
|
||||
}
|
||||
|
|
|
@ -313,7 +313,8 @@ public class SysMonitor extends FeedDefiningMonitor
|
|||
}
|
||||
if (du != null) {
|
||||
final Map<String, Long> stats = diff.to(
|
||||
name, ImmutableMap.<String, Long>builder()
|
||||
name,
|
||||
ImmutableMap.<String, Long>builder()
|
||||
.put("sys/disk/read/size", du.getReadBytes())
|
||||
.put("sys/disk/read/count", du.getReads())
|
||||
.put("sys/disk/write/size", du.getWriteBytes())
|
||||
|
@ -379,7 +380,8 @@ public class SysMonitor extends FeedDefiningMonitor
|
|||
}
|
||||
if (netstat != null) {
|
||||
final Map<String, Long> stats = diff.to(
|
||||
name, ImmutableMap.<String, Long>builder()
|
||||
name,
|
||||
ImmutableMap.<String, Long>builder()
|
||||
.put("sys/net/read/size", netstat.getRxBytes())
|
||||
.put("sys/net/read/packets", netstat.getRxPackets())
|
||||
.put("sys/net/read/errors", netstat.getRxErrors())
|
||||
|
@ -434,7 +436,8 @@ public class SysMonitor extends FeedDefiningMonitor
|
|||
final Cpu cpu = cpus[i];
|
||||
final String name = Integer.toString(i);
|
||||
final Map<String, Long> stats = diff.to(
|
||||
name, ImmutableMap.<String, Long>builder()
|
||||
name,
|
||||
ImmutableMap.<String, Long>builder()
|
||||
.put("user", cpu.getUser()) // user = Δuser / Δtotal
|
||||
.put("sys", cpu.getSys()) // sys = Δsys / Δtotal
|
||||
.put("nice", cpu.getNice()) // nice = Δnice / Δtotal
|
||||
|
|
|
@ -61,7 +61,9 @@ public interface DataSegmentFinder
|
|||
* @param segmentModifiedAt segment modified timestamp
|
||||
*/
|
||||
static void putInMapRetainingNewest(
|
||||
Map<String, Pair<DataSegment, Long>> timestampedSegments, DataSegment dataSegment, long segmentModifiedAt
|
||||
Map<String, Pair<DataSegment, Long>> timestampedSegments,
|
||||
DataSegment dataSegment,
|
||||
long segmentModifiedAt
|
||||
)
|
||||
{
|
||||
timestampedSegments.merge(
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package org.apache.druid;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonParser;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.DeserializationContext;
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.MapperFeature;
|
||||
|
@ -55,12 +54,12 @@ public class TestObjectMapper extends ObjectMapper
|
|||
{
|
||||
addSerializer(Interval.class, ToStringSerializer.instance);
|
||||
addDeserializer(
|
||||
Interval.class, new StdDeserializer<Interval>(Interval.class)
|
||||
Interval.class,
|
||||
new StdDeserializer<Interval>(Interval.class)
|
||||
{
|
||||
@Override
|
||||
public Interval deserialize(
|
||||
JsonParser jsonParser, DeserializationContext deserializationContext
|
||||
) throws IOException, JsonProcessingException
|
||||
public Interval deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
|
||||
throws IOException
|
||||
{
|
||||
return Intervals.of(jsonParser.getText());
|
||||
}
|
||||
|
|
|
@ -57,7 +57,8 @@ public class UUIDUtilsTest
|
|||
args.add(new String[]{possibleArg});
|
||||
}
|
||||
return Collections2.transform(
|
||||
args, new Function<String[], Object[]>()
|
||||
args,
|
||||
new Function<String[], Object[]>()
|
||||
{
|
||||
@Override
|
||||
public Object[] apply(String[] input)
|
||||
|
|
|
@ -65,7 +65,10 @@ public class JsonConfiguratorTest
|
|||
|
||||
@Override
|
||||
public <T> Set<ConstraintViolation<T>> validateValue(
|
||||
Class<T> beanType, String propertyName, Object value, Class<?>... groups
|
||||
Class<T> beanType,
|
||||
String propertyName,
|
||||
Object value,
|
||||
Class<?>... groups
|
||||
)
|
||||
{
|
||||
return ImmutableSet.of();
|
||||
|
|
|
@ -24,17 +24,13 @@ package org.apache.druid.java.util.common.guava;
|
|||
public class UnsupportedSequence implements Sequence<Integer>
|
||||
{
|
||||
@Override
|
||||
public <OutType> OutType accumulate(
|
||||
OutType initValue, Accumulator<OutType, Integer> accumulator
|
||||
)
|
||||
public <OutType> OutType accumulate(OutType initValue, Accumulator<OutType, Integer> accumulator)
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <OutType> Yielder<OutType> toYielder(
|
||||
OutType initValue, YieldingAccumulator<OutType, Integer> accumulator
|
||||
)
|
||||
public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, Integer> accumulator)
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
|
|
@ -246,7 +246,8 @@ public class FriendlyServersTest
|
|||
final ListenableFuture<StatusResponseHolder> response2 = skepticalClient
|
||||
.go(
|
||||
new Request(
|
||||
HttpMethod.GET, new URL(StringUtils.format("https://localhost:%d/", sslConnector.getLocalPort()))
|
||||
HttpMethod.GET,
|
||||
new URL(StringUtils.format("https://localhost:%d/", sslConnector.getLocalPort()))
|
||||
),
|
||||
new StatusResponseHandler(StandardCharsets.UTF_8)
|
||||
);
|
||||
|
|
|
@ -122,12 +122,8 @@ public class EvalTest
|
|||
Assert.assertFalse(evalLong("!9223372036854775807", bindings) > 0);
|
||||
|
||||
Assert.assertEquals(3037000499L, evalLong("cast(sqrt(9223372036854775807), 'long')", bindings));
|
||||
Assert.assertEquals(
|
||||
1L, evalLong("if(x == 9223372036854775807, 1, 0)", bindings)
|
||||
);
|
||||
Assert.assertEquals(
|
||||
0L, evalLong("if(x - 1 == 9223372036854775807, 1, 0)", bindings)
|
||||
);
|
||||
Assert.assertEquals(1L, evalLong("if(x == 9223372036854775807, 1, 0)", bindings));
|
||||
Assert.assertEquals(0L, evalLong("if(x - 1 == 9223372036854775807, 1, 0)", bindings));
|
||||
|
||||
Assert.assertEquals(1271030400000L, evalLong("timestamp('2010-04-12')", bindings));
|
||||
Assert.assertEquals(1270998000000L, evalLong("timestamp('2010-04-12T+09:00')", bindings));
|
||||
|
|
|
@ -29,9 +29,7 @@ import java.util.Map;
|
|||
public class NoopDataSegmentMover implements DataSegmentMover
|
||||
{
|
||||
@Override
|
||||
public DataSegment move(
|
||||
DataSegment segment, Map<String, Object> targetLoadSpec
|
||||
)
|
||||
public DataSegment move(DataSegment segment, Map<String, Object> targetLoadSpec)
|
||||
{
|
||||
return segment;
|
||||
}
|
||||
|
|
|
@ -97,8 +97,7 @@ public class StaticCloudFilesFirehoseFactory extends PrefetchableTextFilesFireho
|
|||
log.info("Retrieving file from region[%s], container[%s] and path [%s]",
|
||||
region, container, path
|
||||
);
|
||||
CloudFilesObjectApiProxy objectApi = new CloudFilesObjectApiProxy(
|
||||
cloudFilesApi, region, container);
|
||||
CloudFilesObjectApiProxy objectApi = new CloudFilesObjectApiProxy(cloudFilesApi, region, container);
|
||||
return new CloudFilesByteSource(objectApi, path);
|
||||
}
|
||||
|
||||
|
|
|
@ -64,8 +64,10 @@ public class CloudFilesDataSegmentPuller
|
|||
}
|
||||
catch (IOException ioe) {
|
||||
log.warn(
|
||||
ioe, "Failed to remove output directory [%s] for segment pulled from [%s]",
|
||||
outDir.getAbsolutePath(), path
|
||||
ioe,
|
||||
"Failed to remove output directory [%s] for segment pulled from [%s]",
|
||||
outDir.getAbsolutePath(),
|
||||
path
|
||||
);
|
||||
}
|
||||
throw new SegmentLoadingException(e, e.getMessage());
|
||||
|
|
|
@ -92,7 +92,9 @@ public class CloudFilesDataSegmentPusher implements DataSegmentPusher
|
|||
return CloudFilesUtils.retryCloudFilesOperation(
|
||||
() -> {
|
||||
CloudFilesObject segmentData = new CloudFilesObject(
|
||||
segmentPath, outFile, objectApi.getRegion(),
|
||||
segmentPath,
|
||||
outFile,
|
||||
objectApi.getRegion(),
|
||||
objectApi.getContainer()
|
||||
);
|
||||
|
||||
|
@ -103,8 +105,10 @@ public class CloudFilesDataSegmentPusher implements DataSegmentPusher
|
|||
// runtime, and because Guava deletes methods over time, that causes incompatibilities.
|
||||
Files.write(descFile.toPath(), jsonMapper.writeValueAsBytes(inSegment));
|
||||
CloudFilesObject descriptorData = new CloudFilesObject(
|
||||
segmentPath, descFile,
|
||||
objectApi.getRegion(), objectApi.getContainer()
|
||||
segmentPath,
|
||||
descFile,
|
||||
objectApi.getRegion(),
|
||||
objectApi.getContainer()
|
||||
);
|
||||
log.info("Pushing %s.", descriptorData.getPath());
|
||||
objectApi.put(descriptorData);
|
||||
|
|
|
@ -68,8 +68,10 @@ public class GoogleDataSegmentPuller implements URIDataPuller
|
|||
}
|
||||
catch (IOException ioe) {
|
||||
LOG.warn(
|
||||
ioe, "Failed to remove output directory [%s] for segment pulled from [%s]",
|
||||
outDir.getAbsolutePath(), path
|
||||
ioe,
|
||||
"Failed to remove output directory [%s] for segment pulled from [%s]",
|
||||
outDir.getAbsolutePath(),
|
||||
path
|
||||
);
|
||||
}
|
||||
throw new SegmentLoadingException(e, e.getMessage());
|
||||
|
|
|
@ -153,12 +153,14 @@ public class KafkaEightSimpleConsumerFirehoseFactory implements
|
|||
final Map<Integer, Long> lastOffsets = loadOffsetFromPreviousMetaData(lastCommit);
|
||||
|
||||
for (Integer partition : partitionIdList) {
|
||||
final KafkaSimpleConsumer kafkaSimpleConsumer = new KafkaSimpleConsumer(
|
||||
feed, partition, clientId, brokerList, earliest
|
||||
);
|
||||
final KafkaSimpleConsumer kafkaSimpleConsumer =
|
||||
new KafkaSimpleConsumer(feed, partition, clientId, brokerList, earliest);
|
||||
Long startOffset = lastOffsets.get(partition);
|
||||
PartitionConsumerWorker worker = new PartitionConsumerWorker(
|
||||
feed, kafkaSimpleConsumer, partition, startOffset == null ? -1 : startOffset
|
||||
feed,
|
||||
kafkaSimpleConsumer,
|
||||
partition,
|
||||
startOffset == null ? -1 : startOffset
|
||||
);
|
||||
consumerWorkers.add(worker);
|
||||
}
|
||||
|
|
|
@ -196,9 +196,7 @@ public class KafkaSimpleConsumer
|
|||
earliest ? kafka.api.OffsetRequest.EarliestTime() : kafka.api.OffsetRequest.LatestTime(), 1
|
||||
)
|
||||
);
|
||||
OffsetRequest request = new OffsetRequest(
|
||||
requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientId
|
||||
);
|
||||
OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientId);
|
||||
OffsetResponse response = null;
|
||||
try {
|
||||
response = consumer.getOffsetsBefore(request);
|
||||
|
@ -323,7 +321,11 @@ public class KafkaSimpleConsumer
|
|||
catch (Exception e) {
|
||||
ensureNotInterrupted(e);
|
||||
log.warn(
|
||||
e, "error communicating with Kafka Broker [%s] to find leader for [%s] - [%s]", broker, topic, partitionId
|
||||
e,
|
||||
"error communicating with Kafka Broker [%s] to find leader for [%s] - [%s]",
|
||||
broker,
|
||||
topic,
|
||||
partitionId
|
||||
);
|
||||
}
|
||||
finally {
|
||||
|
|
|
@ -261,18 +261,14 @@ public class DatasourceOptimizerTest extends CuratorTestBase
|
|||
|
||||
private void setupViews() throws Exception
|
||||
{
|
||||
baseView = new BatchServerInventoryView(
|
||||
zkPathsConfig,
|
||||
curator,
|
||||
jsonMapper,
|
||||
Predicates.alwaysTrue()
|
||||
)
|
||||
baseView = new BatchServerInventoryView(zkPathsConfig, curator, jsonMapper, Predicates.alwaysTrue())
|
||||
{
|
||||
@Override
|
||||
public void registerSegmentCallback(Executor exec, final SegmentCallback callback)
|
||||
{
|
||||
super.registerSegmentCallback(
|
||||
exec, new SegmentCallback()
|
||||
exec,
|
||||
new SegmentCallback()
|
||||
{
|
||||
@Override
|
||||
public CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
|
||||
|
|
|
@ -203,9 +203,7 @@ public class OrcIndexGeneratorJobTest
|
|||
Map.class
|
||||
),
|
||||
aggs,
|
||||
new UniformGranularitySpec(
|
||||
Granularities.DAY, Granularities.NONE, ImmutableList.of(this.interval)
|
||||
),
|
||||
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, ImmutableList.of(this.interval)),
|
||||
null,
|
||||
mapper
|
||||
),
|
||||
|
|
|
@ -95,8 +95,10 @@ public class DruidParquetReadSupport extends AvroReadSupport<GenericRecord>
|
|||
|
||||
@Override
|
||||
public RecordMaterializer<GenericRecord> prepareForRead(
|
||||
Configuration configuration, Map<String, String> keyValueMetaData,
|
||||
MessageType fileSchema, ReadContext readContext
|
||||
Configuration configuration,
|
||||
Map<String, String> keyValueMetaData,
|
||||
MessageType fileSchema,
|
||||
ReadContext readContext
|
||||
)
|
||||
{
|
||||
MessageType parquetSchema = readContext.getRequestedSchema();
|
||||
|
|
|
@ -45,9 +45,8 @@ public class AvroValueInputFormat extends FileInputFormat<NullWritable, GenericR
|
|||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public RecordReader<NullWritable, GenericRecord> createRecordReader(
|
||||
InputSplit split, TaskAttemptContext context
|
||||
) throws IOException
|
||||
public RecordReader<NullWritable, GenericRecord> createRecordReader(InputSplit split, TaskAttemptContext context)
|
||||
throws IOException
|
||||
{
|
||||
Schema readerSchema = AvroJob.getInputValueSchema(context.getConfiguration());
|
||||
|
||||
|
|
|
@ -90,7 +90,9 @@ public class InlineSchemaAvroBytesDecoder implements AvroBytesDecoder
|
|||
catch (EOFException eof) {
|
||||
// waiting for avro v1.9.0 (#AVRO-813)
|
||||
throw new ParseException(
|
||||
eof, "Avro's unnecessary EOFException, detail: [%s]", "https://issues.apache.org/jira/browse/AVRO-813"
|
||||
eof,
|
||||
"Avro's unnecessary EOFException, detail: [%s]",
|
||||
"https://issues.apache.org/jira/browse/AVRO-813"
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
|
|
|
@ -122,7 +122,9 @@ public class InlineSchemasAvroBytesDecoder implements AvroBytesDecoder
|
|||
catch (EOFException eof) {
|
||||
// waiting for avro v1.9.0 (#AVRO-813)
|
||||
throw new ParseException(
|
||||
eof, "Avro's unnecessary EOFException, detail: [%s]", "https://issues.apache.org/jira/browse/AVRO-813"
|
||||
eof,
|
||||
"Avro's unnecessary EOFException, detail: [%s]",
|
||||
"https://issues.apache.org/jira/browse/AVRO-813"
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
|
|
|
@ -84,7 +84,9 @@ public class SchemaRepoBasedAvroBytesDecoder<SUBJECT, ID> implements AvroBytesDe
|
|||
catch (EOFException eof) {
|
||||
// waiting for avro v1.9.0 (#AVRO-813)
|
||||
throw new ParseException(
|
||||
eof, "Avro's unnecessary EOFException, detail: [%s]", "https://issues.apache.org/jira/browse/AVRO-813"
|
||||
eof,
|
||||
"Avro's unnecessary EOFException, detail: [%s]",
|
||||
"https://issues.apache.org/jira/browse/AVRO-813"
|
||||
);
|
||||
}
|
||||
catch (IOException e) {
|
||||
|
|
|
@ -281,7 +281,8 @@ public class AvroStreamInputRowParserTest
|
|||
// towards Map avro field as druid dimension, need to convert its toString() back to HashMap to check equality
|
||||
assertEquals(1, inputRow.getDimension("someIntValueMap").size());
|
||||
assertEquals(
|
||||
SOME_INT_VALUE_MAP_VALUE, new HashMap<CharSequence, Integer>(
|
||||
SOME_INT_VALUE_MAP_VALUE,
|
||||
new HashMap<CharSequence, Integer>(
|
||||
Maps.transformValues(
|
||||
Splitter.on(",")
|
||||
.withKeyValueSeparator("=")
|
||||
|
@ -299,7 +300,8 @@ public class AvroStreamInputRowParserTest
|
|||
)
|
||||
);
|
||||
assertEquals(
|
||||
SOME_STRING_VALUE_MAP_VALUE, new HashMap<CharSequence, CharSequence>(
|
||||
SOME_STRING_VALUE_MAP_VALUE,
|
||||
new HashMap<CharSequence, CharSequence>(
|
||||
Splitter.on(",")
|
||||
.withKeyValueSeparator("=")
|
||||
.split(inputRow.getDimension("someIntValueMap").get(0).replaceAll("[\\{\\} ]", ""))
|
||||
|
|
|
@ -68,9 +68,7 @@ public class SketchModule implements DruidModule
|
|||
new NamedType(SketchSetPostAggregator.class, THETA_SKETCH_SET_OP_POST_AGG),
|
||||
new NamedType(SketchConstantPostAggregator.class, THETA_SKETCH_CONSTANT_POST_AGG)
|
||||
)
|
||||
.addSerializer(
|
||||
SketchHolder.class, new SketchHolderJsonSerializer()
|
||||
)
|
||||
.addSerializer(SketchHolder.class, new SketchHolderJsonSerializer())
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -231,9 +231,7 @@ public class CommonCacheNotifier
|
|||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(
|
||||
ClientResponse<StatusResponseHolder> clientResponse, Throwable e
|
||||
)
|
||||
public void exceptionCaught(ClientResponse<StatusResponseHolder> clientResponse, Throwable e)
|
||||
{
|
||||
// Its safe to Ignore as the ClientResponse returned in handleChunk were unfinished
|
||||
log.error(e, "exceptionCaught in CommonCacheNotifier ResponseHandler.");
|
||||
|
|
|
@ -152,9 +152,8 @@ public class BasicHTTPAuthenticator implements Authenticator
|
|||
|
||||
|
||||
@Override
|
||||
public void doFilter(
|
||||
ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain
|
||||
) throws IOException, ServletException
|
||||
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain)
|
||||
throws IOException, ServletException
|
||||
{
|
||||
HttpServletResponse httpResp = (HttpServletResponse) servletResponse;
|
||||
|
||||
|
|
|
@ -68,9 +68,7 @@ public class BytesFullResponseHandler implements HttpResponseHandler<FullRespons
|
|||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(
|
||||
ClientResponse<FullResponseHolder> clientResponse, Throwable e
|
||||
)
|
||||
public void exceptionCaught(ClientResponse<FullResponseHolder> clientResponse, Throwable e)
|
||||
{
|
||||
// Its safe to Ignore as the ClientResponse returned in handleChunk were unfinished
|
||||
}
|
||||
|
|
|
@ -67,9 +67,7 @@ public class BasicRoleBasedAuthorizer implements Authorizer
|
|||
}
|
||||
|
||||
@Override
|
||||
public Access authorize(
|
||||
AuthenticationResult authenticationResult, Resource resource, Action action
|
||||
)
|
||||
public Access authorize(AuthenticationResult authenticationResult, Resource resource, Action action)
|
||||
{
|
||||
if (authenticationResult == null) {
|
||||
throw new IAE("WTF? authenticationResult should never be null.");
|
||||
|
|
|
@ -356,9 +356,7 @@ public class CoordinatorBasicAuthorizerMetadataStorageUpdater implements BasicAu
|
|||
}
|
||||
|
||||
@Override
|
||||
public void setPermissions(
|
||||
String prefix, String roleName, List<ResourceAction> permissions
|
||||
)
|
||||
public void setPermissions(String prefix, String roleName, List<ResourceAction> permissions)
|
||||
{
|
||||
Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
|
||||
setPermissionsInternal(prefix, roleName, permissions);
|
||||
|
|
|
@ -241,9 +241,7 @@ public class CoordinatorBasicAuthorizerResourceHandler implements BasicAuthorize
|
|||
}
|
||||
|
||||
@Override
|
||||
public Response setRolePermissions(
|
||||
String authorizerName, String roleName, List<ResourceAction> permissions
|
||||
)
|
||||
public Response setRolePermissions(String authorizerName, String roleName, List<ResourceAction> permissions)
|
||||
{
|
||||
final BasicRoleBasedAuthorizer authorizer = authorizerMap.get(authorizerName);
|
||||
if (authorizer == null) {
|
||||
|
|
|
@ -126,9 +126,7 @@ public class DefaultBasicAuthorizerResourceHandler implements BasicAuthorizerRes
|
|||
}
|
||||
|
||||
@Override
|
||||
public Response setRolePermissions(
|
||||
String authorizerName, String roleName, List<ResourceAction> permissions
|
||||
)
|
||||
public Response setRolePermissions(String authorizerName, String roleName, List<ResourceAction> permissions)
|
||||
{
|
||||
return NOT_FOUND_RESPONSE;
|
||||
}
|
||||
|
|
|
@ -40,9 +40,7 @@ public class BloomFilterSerializersModule extends SimpleModule
|
|||
|
||||
public BloomFilterSerializersModule()
|
||||
{
|
||||
registerSubtypes(
|
||||
new NamedType(BloomDimFilter.class, BLOOM_FILTER_TYPE_NAME)
|
||||
);
|
||||
registerSubtypes(new NamedType(BloomDimFilter.class, BLOOM_FILTER_TYPE_NAME));
|
||||
addSerializer(BloomKFilter.class, new BloomKFilterSerializer());
|
||||
addDeserializer(BloomKFilter.class, new BloomKFilterDeserializer());
|
||||
addDeserializer(BloomKFilterHolder.class, new BloomKFilterHolderDeserializer());
|
||||
|
@ -50,16 +48,14 @@ public class BloomFilterSerializersModule extends SimpleModule
|
|||
|
||||
private static class BloomKFilterSerializer extends StdSerializer<BloomKFilter>
|
||||
{
|
||||
|
||||
BloomKFilterSerializer()
|
||||
{
|
||||
super(BloomKFilter.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serialize(
|
||||
BloomKFilter bloomKFilter, JsonGenerator jsonGenerator, SerializerProvider serializerProvider
|
||||
) throws IOException
|
||||
public void serialize(BloomKFilter bloomKFilter, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
|
||||
throws IOException
|
||||
{
|
||||
jsonGenerator.writeBinary(bloomKFilterToBytes(bloomKFilter));
|
||||
}
|
||||
|
@ -67,16 +63,14 @@ public class BloomFilterSerializersModule extends SimpleModule
|
|||
|
||||
private static class BloomKFilterDeserializer extends StdDeserializer<BloomKFilter>
|
||||
{
|
||||
|
||||
BloomKFilterDeserializer()
|
||||
{
|
||||
super(BloomKFilter.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BloomKFilter deserialize(
|
||||
JsonParser jsonParser, DeserializationContext deserializationContext
|
||||
) throws IOException
|
||||
public BloomKFilter deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
|
||||
throws IOException
|
||||
{
|
||||
return bloomKFilterFromBytes(jsonParser.getBinaryValue());
|
||||
}
|
||||
|
@ -90,9 +84,8 @@ public class BloomFilterSerializersModule extends SimpleModule
|
|||
}
|
||||
|
||||
@Override
|
||||
public BloomKFilterHolder deserialize(
|
||||
JsonParser jsonParser, DeserializationContext deserializationContext
|
||||
) throws IOException
|
||||
public BloomKFilterHolder deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
|
||||
throws IOException
|
||||
{
|
||||
return BloomKFilterHolder.fromBytes(jsonParser.getBinaryValue());
|
||||
}
|
||||
|
|
|
@ -91,8 +91,7 @@ public class DruidKerberosAuthenticationHandler extends KerberosAuthenticationHa
|
|||
// specifically configured
|
||||
final String[] spnegoPrincipals;
|
||||
if ("*".equals(principal)) {
|
||||
spnegoPrincipals = KerberosUtil.getPrincipalNames(
|
||||
keytab, Pattern.compile("HTTP/.*"));
|
||||
spnegoPrincipals = KerberosUtil.getPrincipalNames(keytab, Pattern.compile("HTTP/.*"));
|
||||
if (spnegoPrincipals.length == 0) {
|
||||
throw new ServletException("Principals do not exist in the keytab");
|
||||
}
|
||||
|
|
|
@ -229,9 +229,8 @@ public class KerberosAuthenticator implements Authenticator
|
|||
}
|
||||
|
||||
@Override
|
||||
public void doFilter(
|
||||
ServletRequest request, ServletResponse response, FilterChain filterChain
|
||||
) throws IOException, ServletException
|
||||
public void doFilter(ServletRequest request, ServletResponse response, FilterChain filterChain)
|
||||
throws IOException, ServletException
|
||||
{
|
||||
HttpServletRequest httpReq = (HttpServletRequest) request;
|
||||
|
||||
|
@ -474,7 +473,9 @@ public class KerberosAuthenticator implements Authenticator
|
|||
|
||||
@Override
|
||||
public void decorateProxyRequest(
|
||||
HttpServletRequest clientRequest, HttpServletResponse proxyResponse, Request proxyRequest
|
||||
HttpServletRequest clientRequest,
|
||||
HttpServletResponse proxyResponse,
|
||||
Request proxyRequest
|
||||
)
|
||||
{
|
||||
Object cookieToken = clientRequest.getAttribute(SIGNED_TOKEN_ATTRIBUTE);
|
||||
|
@ -674,8 +675,11 @@ public class KerberosAuthenticator implements Authenticator
|
|||
* long, boolean, boolean)
|
||||
*/
|
||||
private static void tokenToAuthCookie(
|
||||
HttpServletResponse resp, String token,
|
||||
String domain, String path, long expires,
|
||||
HttpServletResponse resp,
|
||||
String token,
|
||||
String domain,
|
||||
String path,
|
||||
long expires,
|
||||
boolean isCookiePersistent,
|
||||
boolean isSecure
|
||||
)
|
||||
|
|
|
@ -378,7 +378,8 @@ public class HdfsDataSegmentPusherTest
|
|||
addSerializer(Interval.class, ToStringSerializer.instance);
|
||||
addSerializer(NumberedShardSpec.class, ToStringSerializer.instance);
|
||||
addDeserializer(
|
||||
Interval.class, new StdDeserializer<Interval>(Interval.class)
|
||||
Interval.class,
|
||||
new StdDeserializer<Interval>(Interval.class)
|
||||
{
|
||||
@Override
|
||||
public Interval deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
|
||||
|
|
|
@ -575,8 +575,15 @@ public class ApproximateHistogram
|
|||
}
|
||||
|
||||
int mergedBinCount = combineBins(
|
||||
this.binCount, this.positions, this.bins, h.binCount, h.positions, h.bins,
|
||||
mergedPositions, mergedBins, deltas
|
||||
this.binCount,
|
||||
this.positions,
|
||||
this.bins,
|
||||
h.binCount,
|
||||
h.positions,
|
||||
h.bins,
|
||||
mergedPositions,
|
||||
mergedBins,
|
||||
deltas
|
||||
);
|
||||
if (mergedBinCount == 0) {
|
||||
return this;
|
||||
|
@ -631,14 +638,26 @@ public class ApproximateHistogram
|
|||
if (this.binCount + h.binCount <= this.size) {
|
||||
// no need to merge bins
|
||||
mergedBinCount = combineBins(
|
||||
this.binCount, this.positions, this.bins,
|
||||
h.binCount, h.positions, h.bins,
|
||||
mergedPositions, mergedBins, null
|
||||
this.binCount,
|
||||
this.positions,
|
||||
this.bins,
|
||||
h.binCount,
|
||||
h.positions,
|
||||
h.bins,
|
||||
mergedPositions,
|
||||
mergedBins,
|
||||
null
|
||||
);
|
||||
} else {
|
||||
mergedBinCount = ruleCombineBins(
|
||||
this.binCount, this.positions, this.bins, h.binCount, h.positions, h.bins,
|
||||
mergedPositions, mergedBins
|
||||
this.binCount,
|
||||
this.positions,
|
||||
this.bins,
|
||||
h.binCount,
|
||||
h.positions,
|
||||
h.bins,
|
||||
mergedPositions,
|
||||
mergedBins
|
||||
);
|
||||
}
|
||||
for (int i = 0; i < mergedBinCount; ++i) {
|
||||
|
@ -653,9 +672,14 @@ public class ApproximateHistogram
|
|||
}
|
||||
|
||||
protected int ruleCombineBins(
|
||||
int leftBinCount, float[] leftPositions, long[] leftBins,
|
||||
int rightBinCount, float[] rightPositions, long[] rightBins,
|
||||
float[] mergedPositions, long[] mergedBins
|
||||
int leftBinCount,
|
||||
float[] leftPositions,
|
||||
long[] leftBins,
|
||||
int rightBinCount,
|
||||
float[] rightPositions,
|
||||
long[] rightBins,
|
||||
float[] mergedPositions,
|
||||
long[] mergedBins
|
||||
)
|
||||
{
|
||||
final float cutoff;
|
||||
|
@ -855,7 +879,8 @@ public class ApproximateHistogram
|
|||
* @return the last valid index into the mergedPositions and mergedBins arrays
|
||||
*/
|
||||
private static void mergeBins(
|
||||
int mergedBinCount, float[] mergedPositions,
|
||||
int mergedBinCount,
|
||||
float[] mergedPositions,
|
||||
long[] mergedBins,
|
||||
float[] deltas,
|
||||
int numMerge,
|
||||
|
@ -1049,9 +1074,15 @@ public class ApproximateHistogram
|
|||
* @return the number of combined bins
|
||||
*/
|
||||
private static int combineBins(
|
||||
int leftBinCount, float[] leftPositions, long[] leftBins,
|
||||
int rightBinCount, float[] rightPositions, long[] rightBins,
|
||||
float[] mergedPositions, long[] mergedBins, float[] deltas
|
||||
int leftBinCount,
|
||||
float[] leftPositions,
|
||||
long[] leftBins,
|
||||
int rightBinCount,
|
||||
float[] rightPositions,
|
||||
long[] rightBins,
|
||||
float[] mergedPositions,
|
||||
long[] mergedBins,
|
||||
float[] deltas
|
||||
)
|
||||
{
|
||||
int i = 0;
|
||||
|
|
|
@ -242,7 +242,8 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
|
|||
}
|
||||
);
|
||||
Futures.addCallback(
|
||||
future, new FutureCallback<Object>()
|
||||
future,
|
||||
new FutureCallback<Object>()
|
||||
{
|
||||
@Override
|
||||
public void onSuccess(Object result)
|
||||
|
|
|
@ -95,7 +95,10 @@ public class KafkaLookupExtractorFactoryTest
|
|||
{
|
||||
@Override
|
||||
public Object findInjectableValue(
|
||||
Object valueId, DeserializationContext ctxt, BeanProperty forProperty, Object beanInstance
|
||||
Object valueId,
|
||||
DeserializationContext ctxt,
|
||||
BeanProperty forProperty,
|
||||
Object beanInstance
|
||||
)
|
||||
{
|
||||
if ("org.apache.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager".equals(valueId)) {
|
||||
|
|
|
@ -67,7 +67,11 @@ public class KafkaIndexTaskClient extends IndexTaskClient
|
|||
|
||||
try {
|
||||
final FullResponseHolder response = submitRequestWithEmptyContent(
|
||||
id, HttpMethod.POST, "stop", publish ? "publish=true" : null, true
|
||||
id,
|
||||
HttpMethod.POST,
|
||||
"stop",
|
||||
publish ? "publish=true" : null,
|
||||
true
|
||||
);
|
||||
return isSuccess(response);
|
||||
}
|
||||
|
|
|
@ -2195,9 +2195,9 @@ public class KafkaSupervisor implements Supervisor
|
|||
Map<Integer, Long> currentOffsets = entry.getValue().currentOffsets;
|
||||
Long remainingSeconds = null;
|
||||
if (startTime != null) {
|
||||
remainingSeconds = Math.max(
|
||||
0, ioConfig.getTaskDuration().getMillis() - (System.currentTimeMillis() - startTime.getMillis())
|
||||
) / 1000;
|
||||
long elapsedMillis = System.currentTimeMillis() - startTime.getMillis();
|
||||
long remainingMillis = Math.max(0, ioConfig.getTaskDuration().getMillis() - elapsedMillis);
|
||||
remainingSeconds = TimeUnit.MILLISECONDS.toSeconds(remainingMillis);
|
||||
}
|
||||
|
||||
taskReports.add(
|
||||
|
|
|
@ -2102,9 +2102,7 @@ public class KafkaIndexTaskTest
|
|||
)
|
||||
{
|
||||
@Override
|
||||
public <T> QueryRunner<T> decorate(
|
||||
QueryRunner<T> delegate, QueryToolChest<T, ? extends Query<T>> toolChest
|
||||
)
|
||||
public <T> QueryRunner<T> decorate(QueryRunner<T> delegate, QueryToolChest<T, ? extends Query<T>> toolChest)
|
||||
{
|
||||
return delegate;
|
||||
}
|
||||
|
@ -2204,7 +2202,9 @@ public class KafkaIndexTaskTest
|
|||
{
|
||||
@Override
|
||||
public boolean registerSegmentHandoffCallback(
|
||||
SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable
|
||||
SegmentDescriptor descriptor,
|
||||
Executor exec,
|
||||
Runnable handOffRunnable
|
||||
)
|
||||
{
|
||||
if (doHandoff) {
|
||||
|
|
|
@ -134,9 +134,7 @@ public class KafkaSupervisorIOConfigTest
|
|||
+ " \"ssl.key.password\":\"mykeypassword\"}\n"
|
||||
+ "}";
|
||||
|
||||
KafkaSupervisorIOConfig config = mapper.readValue(
|
||||
jsonStr, KafkaSupervisorIOConfig.class
|
||||
);
|
||||
KafkaSupervisorIOConfig config = mapper.readValue(jsonStr, KafkaSupervisorIOConfig.class);
|
||||
Properties props = new Properties();
|
||||
KafkaIndexTask.addConsumerPropertiesFromConfig(props, mapper, config.getConsumerProperties());
|
||||
|
||||
|
|
|
@ -91,7 +91,10 @@ public class NamespaceLookupExtractorFactoryTest
|
|||
{
|
||||
@Override
|
||||
public Object findInjectableValue(
|
||||
Object valueId, DeserializationContext ctxt, BeanProperty forProperty, Object beanInstance
|
||||
Object valueId,
|
||||
DeserializationContext ctxt,
|
||||
BeanProperty forProperty,
|
||||
Object beanInstance
|
||||
)
|
||||
{
|
||||
if (CacheScheduler.class.getName().equals(valueId)) {
|
||||
|
@ -524,7 +527,9 @@ public class NamespaceLookupExtractorFactoryTest
|
|||
public void configure(Binder binder)
|
||||
{
|
||||
JsonConfigProvider.bindInstance(
|
||||
binder, Key.get(DruidNode.class, Self.class), new DruidNode("test-inject", null, false, null, null, true, false)
|
||||
binder,
|
||||
Key.get(DruidNode.class, Self.class),
|
||||
new DruidNode("test-inject", null, false, null, null, true, false)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -68,10 +68,12 @@ public class UriExtractionNamespaceTest
|
|||
final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector();
|
||||
mapper.setAnnotationIntrospectors(
|
||||
new AnnotationIntrospectorPair(
|
||||
guiceIntrospector, mapper.getSerializationConfig().getAnnotationIntrospector()
|
||||
guiceIntrospector,
|
||||
mapper.getSerializationConfig().getAnnotationIntrospector()
|
||||
),
|
||||
new AnnotationIntrospectorPair(
|
||||
guiceIntrospector, mapper.getDeserializationConfig().getAnnotationIntrospector()
|
||||
guiceIntrospector,
|
||||
mapper.getDeserializationConfig().getAnnotationIntrospector()
|
||||
)
|
||||
);
|
||||
return mapper;
|
||||
|
|
|
@ -49,7 +49,9 @@ public class OffHeapNamespaceExtractionCacheManagerTest
|
|||
public void configure(Binder binder)
|
||||
{
|
||||
JsonConfigProvider.bindInstance(
|
||||
binder, Key.get(DruidNode.class, Self.class), new DruidNode("test-inject", null, false, null, null, true, false)
|
||||
binder,
|
||||
Key.get(DruidNode.class, Self.class),
|
||||
new DruidNode("test-inject", null, false, null, null, true, false)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -49,7 +49,9 @@ public class OnHeapNamespaceExtractionCacheManagerTest
|
|||
public void configure(Binder binder)
|
||||
{
|
||||
JsonConfigProvider.bindInstance(
|
||||
binder, Key.get(DruidNode.class, Self.class), new DruidNode("test-inject", null, false, null, null, true, false)
|
||||
binder,
|
||||
Key.get(DruidNode.class, Self.class),
|
||||
new DruidNode("test-inject", null, false, null, null, true, false)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,10 +39,7 @@ public class LookupCacheStats
|
|||
/**
|
||||
* Constructs a new {@code CacheStats} instance.
|
||||
*/
|
||||
public LookupCacheStats(
|
||||
long hitCount, long missCount,
|
||||
long evictionCount
|
||||
)
|
||||
public LookupCacheStats(long hitCount, long missCount, long evictionCount)
|
||||
{
|
||||
Preconditions.checkArgument(hitCount >= 0);
|
||||
Preconditions.checkArgument(missCount >= 0);
|
||||
|
|
|
@ -62,7 +62,8 @@ public class OffHeapPollingCache<K, V> implements PollingCache<K, V>
|
|||
|
||||
final Set<V> setOfValues = setOfValuesBuilder.build();
|
||||
reverseCache.putAll(Maps.asMap(
|
||||
setOfValues, new Function<V, List<K>>()
|
||||
setOfValues,
|
||||
new Function<V, List<K>>()
|
||||
{
|
||||
@Override
|
||||
public List<K> apply(final V input)
|
||||
|
@ -80,7 +81,8 @@ public class OffHeapPollingCache<K, V> implements PollingCache<K, V>
|
|||
}
|
||||
}).keySet());
|
||||
}
|
||||
}));
|
||||
}
|
||||
));
|
||||
started.getAndSet(true);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -93,7 +93,8 @@ public class S3DataSegmentMover implements DataSegmentMover
|
|||
ImmutableMap.<String, Object>builder()
|
||||
.putAll(
|
||||
Maps.filterKeys(
|
||||
loadSpec, new Predicate<String>()
|
||||
loadSpec,
|
||||
new Predicate<String>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(String input)
|
||||
|
|
|
@ -40,16 +40,22 @@ import java.util.Map;
|
|||
public class S3DataSegmentArchiverTest
|
||||
{
|
||||
private static final ObjectMapper MAPPER = new DefaultObjectMapper()
|
||||
.setInjectableValues(new InjectableValues()
|
||||
{
|
||||
@Override
|
||||
public Object findInjectableValue(
|
||||
Object valueId, DeserializationContext ctxt, BeanProperty forProperty, Object beanInstance
|
||||
)
|
||||
{
|
||||
return PULLER;
|
||||
}
|
||||
}).registerModule(new SimpleModule("s3-archive-test-module").registerSubtypes(S3LoadSpec.class));
|
||||
.setInjectableValues(
|
||||
new InjectableValues()
|
||||
{
|
||||
@Override
|
||||
public Object findInjectableValue(
|
||||
Object valueId,
|
||||
DeserializationContext ctxt,
|
||||
BeanProperty forProperty,
|
||||
Object beanInstance
|
||||
)
|
||||
{
|
||||
return PULLER;
|
||||
}
|
||||
}
|
||||
)
|
||||
.registerModule(new SimpleModule("s3-archive-test-module").registerSubtypes(S3LoadSpec.class));
|
||||
private static final S3DataSegmentArchiverConfig ARCHIVER_CONFIG = new S3DataSegmentArchiverConfig()
|
||||
{
|
||||
@Override
|
||||
|
|
|
@ -50,8 +50,10 @@ public class PvaluefromZscorePostAggregatorTest
|
|||
public void testSerde() throws Exception
|
||||
{
|
||||
DefaultObjectMapper mapper = new DefaultObjectMapper();
|
||||
PvaluefromZscorePostAggregator postAggregator1 = mapper.readValue(mapper.writeValueAsString(
|
||||
pvaluefromZscorePostAggregator), PvaluefromZscorePostAggregator.class);
|
||||
PvaluefromZscorePostAggregator postAggregator1 = mapper.readValue(
|
||||
mapper.writeValueAsString(pvaluefromZscorePostAggregator),
|
||||
PvaluefromZscorePostAggregator.class
|
||||
);
|
||||
|
||||
Assert.assertEquals(pvaluefromZscorePostAggregator, postAggregator1);
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package org.apache.druid.hll;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.Collections2;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.hash.HashFunction;
|
||||
|
@ -31,7 +30,6 @@ import org.junit.Assert;
|
|||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.security.MessageDigest;
|
||||
import java.util.Arrays;
|
||||
|
@ -763,19 +761,8 @@ public class HyperLogLogCollectorTest
|
|||
);
|
||||
|
||||
List<HyperLogLogCollector> collectors = Lists.transform(
|
||||
objects, new Function<String, HyperLogLogCollector>()
|
||||
{
|
||||
@Nullable
|
||||
@Override
|
||||
public HyperLogLogCollector apply(
|
||||
@Nullable String s
|
||||
)
|
||||
{
|
||||
return HyperLogLogCollector.makeCollector(
|
||||
ByteBuffer.wrap(Base64.decodeBase64(s))
|
||||
);
|
||||
}
|
||||
}
|
||||
objects,
|
||||
s -> HyperLogLogCollector.makeCollector(ByteBuffer.wrap(Base64.decodeBase64(s)))
|
||||
);
|
||||
|
||||
Collection<List<HyperLogLogCollector>> permutations = Collections2.permutations(collectors);
|
||||
|
|
|
@ -167,7 +167,7 @@ public class DetermineHashedPartitionsJob implements Jobby
|
|||
if (Utils.exists(groupByJob, fileSystem, partitionInfoPath)) {
|
||||
final Long numRows = config.JSON_MAPPER.readValue(
|
||||
Utils.openInputStream(groupByJob, partitionInfoPath),
|
||||
new TypeReference<Long>() {}
|
||||
Long.class
|
||||
);
|
||||
|
||||
log.info("Found approximately [%,d] rows in data.", numRows);
|
||||
|
@ -381,19 +381,12 @@ public class DetermineHashedPartitionsJob implements Jobby
|
|||
|
||||
intervals.add(interval);
|
||||
final Path outPath = config.makeSegmentPartitionInfoPath(interval);
|
||||
final OutputStream out = Utils.makePathAndOutputStream(
|
||||
context, outPath, config.isOverwriteFiles()
|
||||
);
|
||||
final OutputStream out = Utils.makePathAndOutputStream(context, outPath, config.isOverwriteFiles());
|
||||
|
||||
try {
|
||||
HadoopDruidIndexerConfig.JSON_MAPPER.writerWithType(
|
||||
new TypeReference<Long>()
|
||||
{
|
||||
}
|
||||
).writeValue(
|
||||
out,
|
||||
aggregate.estimateCardinalityRound()
|
||||
);
|
||||
HadoopDruidIndexerConfig.JSON_MAPPER
|
||||
.writerWithType(Long.class)
|
||||
.writeValue(out, aggregate.estimateCardinalityRound());
|
||||
}
|
||||
finally {
|
||||
Closeables.close(out, false);
|
||||
|
@ -407,9 +400,7 @@ public class DetermineHashedPartitionsJob implements Jobby
|
|||
super.run(context);
|
||||
if (determineIntervals) {
|
||||
final Path outPath = config.makeIntervalInfoPath();
|
||||
final OutputStream out = Utils.makePathAndOutputStream(
|
||||
context, outPath, config.isOverwriteFiles()
|
||||
);
|
||||
final OutputStream out = Utils.makePathAndOutputStream(context, outPath, config.isOverwriteFiles());
|
||||
|
||||
try {
|
||||
HadoopDruidIndexerConfig.JSON_MAPPER.writerWithType(
|
||||
|
|
|
@ -360,9 +360,7 @@ public class DeterminePartitionsJob implements Jobby
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void map(
|
||||
BytesWritable key, NullWritable value, Context context
|
||||
) throws IOException, InterruptedException
|
||||
protected void map(BytesWritable key, NullWritable value, Context context) throws IOException, InterruptedException
|
||||
{
|
||||
final List<Object> timeAndDims = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(key.getBytes(), List.class);
|
||||
|
||||
|
@ -529,9 +527,8 @@ public class DeterminePartitionsJob implements Jobby
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void reduce(
|
||||
BytesWritable key, Iterable<Text> values, Context context
|
||||
) throws IOException, InterruptedException
|
||||
protected void reduce(BytesWritable key, Iterable<Text> values, Context context)
|
||||
throws IOException, InterruptedException
|
||||
{
|
||||
SortableBytes keyBytes = SortableBytes.fromBytesWritable(key);
|
||||
|
||||
|
@ -540,7 +537,9 @@ public class DeterminePartitionsJob implements Jobby
|
|||
}
|
||||
|
||||
protected abstract void innerReduce(
|
||||
Context context, SortableBytes keyBytes, Iterable<DimValueCount> combinedIterable
|
||||
Context context,
|
||||
SortableBytes keyBytes,
|
||||
Iterable<DimValueCount> combinedIterable
|
||||
) throws IOException, InterruptedException;
|
||||
|
||||
private Iterable<DimValueCount> combineRows(Iterable<Text> input)
|
||||
|
@ -586,9 +585,8 @@ public class DeterminePartitionsJob implements Jobby
|
|||
public static class DeterminePartitionsDimSelectionCombiner extends DeterminePartitionsDimSelectionBaseReducer
|
||||
{
|
||||
@Override
|
||||
protected void innerReduce(
|
||||
Context context, SortableBytes keyBytes, Iterable<DimValueCount> combinedIterable
|
||||
) throws IOException, InterruptedException
|
||||
protected void innerReduce(Context context, SortableBytes keyBytes, Iterable<DimValueCount> combinedIterable)
|
||||
throws IOException, InterruptedException
|
||||
{
|
||||
for (DimValueCount dvc : combinedIterable) {
|
||||
write(context, keyBytes.getGroupKey(), dvc);
|
||||
|
@ -602,9 +600,8 @@ public class DeterminePartitionsJob implements Jobby
|
|||
private static final int HIGH_CARDINALITY_THRESHOLD = 3000000;
|
||||
|
||||
@Override
|
||||
protected void innerReduce(
|
||||
Context context, SortableBytes keyBytes, Iterable<DimValueCount> combinedIterable
|
||||
) throws IOException
|
||||
protected void innerReduce(Context context, SortableBytes keyBytes, Iterable<DimValueCount> combinedIterable)
|
||||
throws IOException
|
||||
{
|
||||
final ByteBuffer groupKey = ByteBuffer.wrap(keyBytes.getGroupKey());
|
||||
groupKey.position(4); // Skip partition
|
||||
|
@ -807,14 +804,8 @@ public class DeterminePartitionsJob implements Jobby
|
|||
: minDistancePartitions;
|
||||
|
||||
final List<ShardSpec> chosenShardSpecs = Lists.transform(
|
||||
chosenPartitions.partitions, new Function<DimPartition, ShardSpec>()
|
||||
{
|
||||
@Override
|
||||
public ShardSpec apply(DimPartition dimPartition)
|
||||
{
|
||||
return dimPartition.shardSpec;
|
||||
}
|
||||
}
|
||||
chosenPartitions.partitions,
|
||||
dimPartition -> dimPartition.shardSpec
|
||||
);
|
||||
|
||||
log.info("Chosen partitions:");
|
||||
|
@ -950,13 +941,8 @@ public class DeterminePartitionsJob implements Jobby
|
|||
)
|
||||
throws IOException, InterruptedException
|
||||
{
|
||||
context.write(
|
||||
new SortableBytes(
|
||||
groupKey, TAB_JOINER.join(dimValueCount.dim, dimValueCount.value).getBytes(
|
||||
HadoopDruidIndexerConfig.JAVA_NATIVE_CHARSET
|
||||
)
|
||||
).toBytesWritable(),
|
||||
dimValueCount.toText()
|
||||
);
|
||||
byte[] sortKey = TAB_JOINER.join(dimValueCount.dim, dimValueCount.value)
|
||||
.getBytes(HadoopDruidIndexerConfig.JAVA_NATIVE_CHARSET);
|
||||
context.write(new SortableBytes(groupKey, sortKey).toBytesWritable(), dimValueCount.toText());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -109,7 +109,9 @@ public class HadoopDruidIndexerConfig
|
|||
public void configure(Binder binder)
|
||||
{
|
||||
JsonConfigProvider.bindInstance(
|
||||
binder, Key.get(DruidNode.class, Self.class), new DruidNode("hadoop-indexer", null, false, null, null, true, false)
|
||||
binder,
|
||||
Key.get(DruidNode.class, Self.class),
|
||||
new DruidNode("hadoop-indexer", null, false, null, null, true, false)
|
||||
);
|
||||
JsonConfigProvider.bind(binder, "druid.hadoop.security.kerberos", HadoopKerberosConfig.class);
|
||||
}
|
||||
|
@ -162,9 +164,7 @@ public class HadoopDruidIndexerConfig
|
|||
{
|
||||
try {
|
||||
return fromMap(
|
||||
HadoopDruidIndexerConfig.JSON_MAPPER.readValue(
|
||||
file, JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
|
||||
)
|
||||
HadoopDruidIndexerConfig.JSON_MAPPER.readValue(file, JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT)
|
||||
);
|
||||
}
|
||||
catch (IOException e) {
|
||||
|
@ -178,9 +178,7 @@ public class HadoopDruidIndexerConfig
|
|||
// This is a map to try and prevent dependency screwbally-ness
|
||||
try {
|
||||
return fromMap(
|
||||
HadoopDruidIndexerConfig.JSON_MAPPER.readValue(
|
||||
str, JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
|
||||
)
|
||||
HadoopDruidIndexerConfig.JSON_MAPPER.readValue(str, JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT)
|
||||
);
|
||||
}
|
||||
catch (IOException e) {
|
||||
|
@ -197,9 +195,7 @@ public class HadoopDruidIndexerConfig
|
|||
Reader reader = new InputStreamReader(fs.open(pt), StandardCharsets.UTF_8);
|
||||
|
||||
return fromMap(
|
||||
HadoopDruidIndexerConfig.JSON_MAPPER.readValue(
|
||||
reader, JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
|
||||
)
|
||||
HadoopDruidIndexerConfig.JSON_MAPPER.readValue(reader, JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT)
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
|
|
|
@ -420,11 +420,9 @@ public class IndexGeneratorJob implements Jobby
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void reduce(
|
||||
final BytesWritable key, Iterable<BytesWritable> values, final Context context
|
||||
) throws IOException, InterruptedException
|
||||
protected void reduce(final BytesWritable key, Iterable<BytesWritable> values, final Context context)
|
||||
throws IOException, InterruptedException
|
||||
{
|
||||
|
||||
Iterator<BytesWritable> iter = values.iterator();
|
||||
BytesWritable first = iter.next();
|
||||
|
||||
|
@ -587,9 +585,8 @@ public class IndexGeneratorJob implements Jobby
|
|||
final ProgressIndicator progressIndicator
|
||||
) throws IOException
|
||||
{
|
||||
return HadoopDruidIndexerConfig.INDEX_MERGER_V9.persist(
|
||||
index, interval, file, config.getIndexSpec(), progressIndicator, null
|
||||
);
|
||||
return HadoopDruidIndexerConfig.INDEX_MERGER_V9
|
||||
.persist(index, interval, file, config.getIndexSpec(), progressIndicator, null);
|
||||
}
|
||||
|
||||
protected File mergeQueryableIndex(
|
||||
|
@ -600,9 +597,8 @@ public class IndexGeneratorJob implements Jobby
|
|||
) throws IOException
|
||||
{
|
||||
boolean rollup = config.getSchema().getDataSchema().getGranularitySpec().isRollup();
|
||||
return HadoopDruidIndexerConfig.INDEX_MERGER_V9.mergeQueryableIndex(
|
||||
indexes, rollup, aggs, file, config.getIndexSpec(), progressIndicator, null
|
||||
);
|
||||
return HadoopDruidIndexerConfig.INDEX_MERGER_V9
|
||||
.mergeQueryableIndex(indexes, rollup, aggs, file, config.getIndexSpec(), progressIndicator, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -624,9 +620,8 @@ public class IndexGeneratorJob implements Jobby
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void reduce(
|
||||
BytesWritable key, Iterable<BytesWritable> values, final Context context
|
||||
) throws IOException, InterruptedException
|
||||
protected void reduce(BytesWritable key, Iterable<BytesWritable> values, final Context context)
|
||||
throws IOException, InterruptedException
|
||||
{
|
||||
SortableBytes keyBytes = SortableBytes.fromBytesWritable(key);
|
||||
Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs;
|
||||
|
@ -775,9 +770,7 @@ public class IndexGeneratorJob implements Jobby
|
|||
|
||||
log.info("starting merge of intermediate persisted segments.");
|
||||
long mergeStartTime = System.currentTimeMillis();
|
||||
mergedBase = mergeQueryableIndex(
|
||||
indexes, aggregators, new File(baseFlushFile, "merged"), progressIndicator
|
||||
);
|
||||
mergedBase = mergeQueryableIndex(indexes, aggregators, new File(baseFlushFile, "merged"), progressIndicator);
|
||||
log.info(
|
||||
"finished merge of intermediate persisted segments. time taken [%d] ms.",
|
||||
(System.currentTimeMillis() - mergeStartTime)
|
||||
|
|
|
@ -412,7 +412,8 @@ public class JobHelper
|
|||
final FileSystem outputFS = FileSystem.get(finalIndexZipFilePath.toUri(), configuration);
|
||||
final AtomicLong size = new AtomicLong(0L);
|
||||
final DataPusher zipPusher = (DataPusher) RetryProxy.create(
|
||||
DataPusher.class, new DataPusher()
|
||||
DataPusher.class,
|
||||
new DataPusher()
|
||||
{
|
||||
@Override
|
||||
public long push() throws IOException
|
||||
|
@ -469,7 +470,8 @@ public class JobHelper
|
|||
throws IOException
|
||||
{
|
||||
final DataPusher descriptorPusher = (DataPusher) RetryProxy.create(
|
||||
DataPusher.class, new DataPusher()
|
||||
DataPusher.class,
|
||||
new DataPusher()
|
||||
{
|
||||
@Override
|
||||
public long push() throws IOException
|
||||
|
@ -694,7 +696,8 @@ public class JobHelper
|
|||
}
|
||||
|
||||
final DataPusher zipPusher = (DataPusher) RetryProxy.create(
|
||||
DataPusher.class, new DataPusher()
|
||||
DataPusher.class,
|
||||
new DataPusher()
|
||||
{
|
||||
@Override
|
||||
public long push() throws IOException
|
||||
|
|
|
@ -115,10 +115,7 @@ public class SortableBytes
|
|||
int b1Length = ByteBuffer.wrap(b1, s1 + 4, l1 - 4).getInt();
|
||||
int b2Length = ByteBuffer.wrap(b2, s2 + 4, l2 - 4).getInt();
|
||||
|
||||
final int retVal = compareBytes(
|
||||
b1, s1 + 8, b1Length,
|
||||
b2, s2 + 8, b2Length
|
||||
);
|
||||
final int retVal = compareBytes(b1, s1 + 8, b1Length, b2, s2 + 8, b2Length);
|
||||
|
||||
return retVal;
|
||||
}
|
||||
|
@ -138,15 +135,16 @@ public class SortableBytes
|
|||
int b1Length = ByteBuffer.wrap(b1, s1 + 4, l1 - 4).getInt();
|
||||
int b2Length = ByteBuffer.wrap(b2, s2 + 4, l2 - 4).getInt();
|
||||
|
||||
int retVal = compareBytes(
|
||||
b1, s1 + 8, b1Length,
|
||||
b2, s2 + 8, b2Length
|
||||
);
|
||||
int retVal = compareBytes(b1, s1 + 8, b1Length, b2, s2 + 8, b2Length);
|
||||
|
||||
if (retVal == 0) {
|
||||
retVal = compareBytes(
|
||||
b1, s1 + 8 + b1Length, l1 - 8 - b1Length,
|
||||
b2, s2 + 8 + b2Length, l2 - 8 - b2Length
|
||||
b1,
|
||||
s1 + 8 + b1Length,
|
||||
l1 - 8 - b1Length,
|
||||
b2,
|
||||
s2 + 8 + b2Length,
|
||||
l2 - 8 - b2Length
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -118,14 +118,9 @@ public class Utils
|
|||
);
|
||||
}
|
||||
|
||||
public static void storeStats(
|
||||
JobContext job, Path path, Map<String, Object> stats
|
||||
) throws IOException
|
||||
public static void storeStats(JobContext job, Path path, Map<String, Object> stats) throws IOException
|
||||
{
|
||||
jsonMapper.writeValue(
|
||||
makePathAndOutputStream(job, path, true),
|
||||
stats
|
||||
);
|
||||
jsonMapper.writeValue(makePathAndOutputStream(job, path, true), stats);
|
||||
}
|
||||
|
||||
public static String getFailureMessage(Job failedJob, ObjectMapper jsonMapper)
|
||||
|
|
|
@ -111,9 +111,7 @@ public class DatasourcePathSpec implements PathSpec
|
|||
}
|
||||
|
||||
@Override
|
||||
public Job addInputPaths(
|
||||
HadoopDruidIndexerConfig config, Job job
|
||||
) throws IOException
|
||||
public Job addInputPaths(HadoopDruidIndexerConfig config, Job job) throws IOException
|
||||
{
|
||||
if (segments == null || segments.isEmpty()) {
|
||||
if (ingestionSpec.isIgnoreWhenNoSegments()) {
|
||||
|
|
|
@ -43,9 +43,7 @@ public class MetadataStoreBasedUsedSegmentLister implements UsedSegmentLister
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<DataSegment> getUsedSegmentsForIntervals(
|
||||
String dataSource, List<Interval> intervals
|
||||
)
|
||||
public List<DataSegment> getUsedSegmentsForIntervals(String dataSource, List<Interval> intervals)
|
||||
{
|
||||
return indexerMetadataStorageCoordinator.getUsedSegmentsForIntervals(dataSource, intervals);
|
||||
}
|
||||
|
|
|
@ -46,9 +46,7 @@ public class MultiplePathSpec implements PathSpec
|
|||
}
|
||||
|
||||
@Override
|
||||
public Job addInputPaths(
|
||||
HadoopDruidIndexerConfig config, Job job
|
||||
) throws IOException
|
||||
public Job addInputPaths(HadoopDruidIndexerConfig config, Job job) throws IOException
|
||||
{
|
||||
for (PathSpec spec : children) {
|
||||
spec.addInputPaths(config, job);
|
||||
|
|
|
@ -302,7 +302,8 @@ public class HadoopConverterJob
|
|||
}
|
||||
final List<DataSegment> returnList = ImmutableList.copyOf(
|
||||
Lists.transform(
|
||||
goodPaths, new Function<Path, DataSegment>()
|
||||
goodPaths,
|
||||
new Function<Path, DataSegment>()
|
||||
{
|
||||
@Nullable
|
||||
@Override
|
||||
|
@ -486,10 +487,7 @@ public class HadoopConverterJob
|
|||
private static final String TMP_FILE_LOC_KEY = "org.apache.druid.indexer.updater.converter.reducer.tmpDir";
|
||||
|
||||
@Override
|
||||
protected void map(
|
||||
String key, String value,
|
||||
final Context context
|
||||
) throws IOException, InterruptedException
|
||||
protected void map(String key, String value, final Context context) throws IOException, InterruptedException
|
||||
{
|
||||
final InputSplit split = context.getInputSplit();
|
||||
if (!(split instanceof DatasourceInputSplit)) {
|
||||
|
@ -623,7 +621,8 @@ public class HadoopConverterJob
|
|||
throw new IOException("Bad config, missing segments");
|
||||
}
|
||||
return Lists.transform(
|
||||
segments, new Function<DataSegment, InputSplit>()
|
||||
segments,
|
||||
new Function<DataSegment, InputSplit>()
|
||||
{
|
||||
@Nullable
|
||||
@Override
|
||||
|
@ -637,7 +636,8 @@ public class HadoopConverterJob
|
|||
|
||||
@Override
|
||||
public RecordReader<String, String> createRecordReader(
|
||||
final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext
|
||||
final InputSplit inputSplit,
|
||||
final TaskAttemptContext taskAttemptContext
|
||||
)
|
||||
{
|
||||
return new RecordReader<String, String>()
|
||||
|
|
|
@ -64,7 +64,9 @@ public class HadoopDruidConverterConfig
|
|||
public void configure(Binder binder)
|
||||
{
|
||||
JsonConfigProvider.bindInstance(
|
||||
binder, Key.get(DruidNode.class, Self.class), new DruidNode("hadoop-converter", null, false, null, null, true, false)
|
||||
binder,
|
||||
Key.get(DruidNode.class, Self.class),
|
||||
new DruidNode("hadoop-converter", null, false, null, null, true, false)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -462,9 +462,7 @@ public class BatchDeltaIngestionTest
|
|||
new LongSumAggregatorFactory("visited_sum", "visited_num"),
|
||||
new HyperUniquesAggregatorFactory("unique_hosts", "host2")
|
||||
},
|
||||
new UniformGranularitySpec(
|
||||
Granularities.DAY, Granularities.NONE, ImmutableList.of(INTERVAL_FULL)
|
||||
),
|
||||
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, ImmutableList.of(INTERVAL_FULL)),
|
||||
null,
|
||||
MAPPER
|
||||
),
|
||||
|
|
|
@ -237,7 +237,9 @@ public class DeterminePartitionsJobTest
|
|||
),
|
||||
new AggregatorFactory[]{new LongSumAggregatorFactory("visited_num", "visited_num")},
|
||||
new UniformGranularitySpec(
|
||||
Granularities.DAY, Granularities.NONE, ImmutableList.of(Intervals.of(interval))
|
||||
Granularities.DAY,
|
||||
Granularities.NONE,
|
||||
ImmutableList.of(Intervals.of(interval))
|
||||
),
|
||||
null,
|
||||
HadoopDruidIndexerConfig.JSON_MAPPER
|
||||
|
|
|
@ -81,7 +81,9 @@ public class IndexGeneratorCombinerTest
|
|||
new HyperUniquesAggregatorFactory("unique_hosts", "host")
|
||||
},
|
||||
new UniformGranularitySpec(
|
||||
Granularities.DAY, Granularities.NONE, ImmutableList.of(Intervals.of("2010/2011"))
|
||||
Granularities.DAY,
|
||||
Granularities.NONE,
|
||||
ImmutableList.of(Intervals.of("2010/2011"))
|
||||
),
|
||||
null,
|
||||
HadoopDruidIndexerConfig.JSON_MAPPER
|
||||
|
|
|
@ -502,9 +502,7 @@ public class IndexGeneratorJobTest
|
|||
Map.class
|
||||
),
|
||||
aggs,
|
||||
new UniformGranularitySpec(
|
||||
Granularities.DAY, Granularities.NONE, ImmutableList.of(this.interval)
|
||||
),
|
||||
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, ImmutableList.of(this.interval)),
|
||||
null,
|
||||
mapper
|
||||
),
|
||||
|
|
|
@ -92,9 +92,7 @@ public class JobHelperTest
|
|||
Map.class
|
||||
),
|
||||
new AggregatorFactory[]{new LongSumAggregatorFactory("visited_num", "visited_num")},
|
||||
new UniformGranularitySpec(
|
||||
Granularities.DAY, Granularities.NONE, ImmutableList.of(this.interval)
|
||||
),
|
||||
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, ImmutableList.of(this.interval)),
|
||||
null,
|
||||
HadoopDruidIndexerConfig.JSON_MAPPER
|
||||
),
|
||||
|
|
|
@ -48,18 +48,13 @@ public class SortableBytesTest
|
|||
|
||||
Assert.assertEquals(
|
||||
-1,
|
||||
WritableComparator.compareBytes(
|
||||
thingie1Bytes, 0, thingie1Bytes.length,
|
||||
thingie2Bytes, 0, thingie2Bytes.length
|
||||
)
|
||||
WritableComparator.compareBytes(thingie1Bytes, 0, thingie1Bytes.length, thingie2Bytes, 0, thingie2Bytes.length)
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
0,
|
||||
new SortableBytes.SortableBytesGroupingComparator().compare(
|
||||
thingie1Bytes, 0, thingie1Bytes.length,
|
||||
thingie2Bytes, 0, thingie2Bytes.length
|
||||
)
|
||||
new SortableBytes.SortableBytesGroupingComparator()
|
||||
.compare(thingie1Bytes, 0, thingie1Bytes.length, thingie2Bytes, 0, thingie2Bytes.length)
|
||||
);
|
||||
|
||||
SortableBytes reconThingie1 = SortableBytes.fromBytes(thingie1Bytes, 4, thingie1Bytes.length - 4);
|
||||
|
|
|
@ -151,35 +151,27 @@ public class DatasourceInputFormatTest
|
|||
Path path4 = new Path(JobHelper.getURIFromSegment(segments2.get(0).getSegment()));
|
||||
|
||||
// dummy locations for test
|
||||
locations = ImmutableList.of(
|
||||
new LocatedFileStatus(
|
||||
1000, false, 0, 0, 0, 0, null, null, null, null, path1,
|
||||
new BlockLocation[]{
|
||||
new BlockLocation(null, new String[]{"s1", "s2"}, 0, 600),
|
||||
new BlockLocation(null, new String[]{"s2", "s3"}, 600, 400)
|
||||
}
|
||||
),
|
||||
new LocatedFileStatus(
|
||||
4000, false, 0, 0, 0, 0, null, null, null, null, path2,
|
||||
new BlockLocation[]{
|
||||
new BlockLocation(null, new String[]{"s1", "s2"}, 0, 1000),
|
||||
new BlockLocation(null, new String[]{"s1", "s3"}, 1000, 1200),
|
||||
new BlockLocation(null, new String[]{"s2", "s3"}, 2200, 1100),
|
||||
new BlockLocation(null, new String[]{"s1", "s2"}, 3300, 700)
|
||||
}
|
||||
),
|
||||
new LocatedFileStatus(
|
||||
500, false, 0, 0, 0, 0, null, null, null, null, path3,
|
||||
new BlockLocation[]{
|
||||
new BlockLocation(null, new String[]{"s2", "s3"}, 0, 500)
|
||||
}
|
||||
),
|
||||
new LocatedFileStatus(
|
||||
500, false, 0, 0, 0, 0, null, null, null, null, path4,
|
||||
new BlockLocation[]{
|
||||
new BlockLocation(null, new String[]{"s2", "s3"}, 0, 500)
|
||||
}
|
||||
)
|
||||
BlockLocation[] locations1 = {
|
||||
new BlockLocation(null, new String[]{"s1", "s2"}, 0, 600),
|
||||
new BlockLocation(null, new String[]{"s2", "s3"}, 600, 400)
|
||||
};
|
||||
BlockLocation[] locations2 = {
|
||||
new BlockLocation(null, new String[]{"s1", "s2"}, 0, 1000),
|
||||
new BlockLocation(null, new String[]{"s1", "s3"}, 1000, 1200),
|
||||
new BlockLocation(null, new String[]{"s2", "s3"}, 2200, 1100),
|
||||
new BlockLocation(null, new String[]{"s1", "s2"}, 3300, 700)
|
||||
};
|
||||
BlockLocation[] locations3 = {
|
||||
new BlockLocation(null, new String[]{"s2", "s3"}, 0, 500)
|
||||
};
|
||||
BlockLocation[] locations4 = {
|
||||
new BlockLocation(null, new String[]{"s2", "s3"}, 0, 500)
|
||||
};
|
||||
this.locations = ImmutableList.of(
|
||||
new LocatedFileStatus(1000, false, 0, 0, 0, 0, null, null, null, null, path1, locations1),
|
||||
new LocatedFileStatus(4000, false, 0, 0, 0, 0, null, null, null, null, path2, locations2),
|
||||
new LocatedFileStatus(500, false, 0, 0, 0, 0, null, null, null, null, path3, locations3),
|
||||
new LocatedFileStatus(500, false, 0, 0, 0, 0, null, null, null, null, path4, locations4)
|
||||
);
|
||||
|
||||
config = populateConfiguration(new JobConf(), segments1, 0);
|
||||
|
|
|
@ -342,7 +342,9 @@ public class DatasourcePathSpecTest
|
|||
new LongSumAggregatorFactory("visited_sum", "visited")
|
||||
},
|
||||
new UniformGranularitySpec(
|
||||
Granularities.DAY, Granularities.NONE, ImmutableList.of(Intervals.of("2000/3000"))
|
||||
Granularities.DAY,
|
||||
Granularities.NONE,
|
||||
ImmutableList.of(Intervals.of("2000/3000"))
|
||||
),
|
||||
null,
|
||||
HadoopDruidIndexerConfig.JSON_MAPPER
|
||||
|
|
|
@ -93,9 +93,7 @@ public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
|
|||
}
|
||||
|
||||
@Override
|
||||
public Boolean perform(
|
||||
Task task, TaskActionToolbox toolbox
|
||||
)
|
||||
public Boolean perform(Task task, TaskActionToolbox toolbox)
|
||||
{
|
||||
return toolbox.getSupervisorManager().checkPointDataSourceMetadata(
|
||||
supervisorId,
|
||||
|
|
|
@ -59,9 +59,7 @@ public class ResetDataSourceMetadataAction implements TaskAction<Boolean>
|
|||
}
|
||||
|
||||
@Override
|
||||
public Boolean perform(
|
||||
Task task, TaskActionToolbox toolbox
|
||||
)
|
||||
public Boolean perform(Task task, TaskActionToolbox toolbox)
|
||||
{
|
||||
return toolbox.getSupervisorManager().resetSupervisor(dataSource, resetMetadata);
|
||||
}
|
||||
|
|
|
@ -63,9 +63,7 @@ public class SegmentMetadataUpdateAction implements TaskAction<Void>
|
|||
}
|
||||
|
||||
@Override
|
||||
public Void perform(
|
||||
Task task, TaskActionToolbox toolbox
|
||||
)
|
||||
public Void perform(Task task, TaskActionToolbox toolbox)
|
||||
{
|
||||
TaskActionPreconditions.checkLockCoversSegments(task, toolbox.getTaskLockbox(), segments);
|
||||
|
||||
|
|
|
@ -242,9 +242,8 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
|
|||
RealtimeAppenderatorTuningConfig tuningConfig = spec.getTuningConfig()
|
||||
.withBasePersistDirectory(toolbox.getPersistDir());
|
||||
|
||||
final FireDepartment fireDepartmentForMetrics = new FireDepartment(
|
||||
dataSchema, new RealtimeIOConfig(null, null, null), null
|
||||
);
|
||||
final FireDepartment fireDepartmentForMetrics =
|
||||
new FireDepartment(dataSchema, new RealtimeIOConfig(null, null, null), null);
|
||||
|
||||
final TaskRealtimeMetricsMonitor metricsMonitor = TaskRealtimeMetricsMonitorBuilder.build(
|
||||
this,
|
||||
|
|
|
@ -890,9 +890,8 @@ public class IndexTask extends AbstractTask implements ChatHandler
|
|||
) throws IOException, InterruptedException
|
||||
{
|
||||
final GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
|
||||
final FireDepartment fireDepartmentForMetrics = new FireDepartment(
|
||||
dataSchema, new RealtimeIOConfig(null, null, null), null
|
||||
);
|
||||
final FireDepartment fireDepartmentForMetrics =
|
||||
new FireDepartment(dataSchema, new RealtimeIOConfig(null, null, null), null);
|
||||
buildSegmentsFireDepartmentMetrics = fireDepartmentForMetrics.getMetrics();
|
||||
|
||||
if (toolbox.getMonitorScheduler() != null) {
|
||||
|
|
|
@ -289,9 +289,8 @@ public class ParallelIndexSubTask extends AbstractTask
|
|||
{
|
||||
final DataSchema dataSchema = ingestionSchema.getDataSchema();
|
||||
final GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
|
||||
final FireDepartment fireDepartmentForMetrics = new FireDepartment(
|
||||
dataSchema, new RealtimeIOConfig(null, null, null), null
|
||||
);
|
||||
final FireDepartment fireDepartmentForMetrics =
|
||||
new FireDepartment(dataSchema, new RealtimeIOConfig(null, null, null), null);
|
||||
final FireDepartmentMetrics fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics();
|
||||
|
||||
if (toolbox.getMonitorScheduler() != null) {
|
||||
|
|
|
@ -43,9 +43,7 @@ public class OverlordActionBasedUsedSegmentLister implements UsedSegmentLister
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<DataSegment> getUsedSegmentsForIntervals(
|
||||
String dataSource, List<Interval> intervals
|
||||
) throws IOException
|
||||
public List<DataSegment> getUsedSegmentsForIntervals(String dataSource, List<Interval> intervals) throws IOException
|
||||
{
|
||||
return toolbox
|
||||
.getTaskActionClient()
|
||||
|
|
|
@ -222,7 +222,9 @@ public class HeapMemoryTaskStorage implements TaskStorage
|
|||
|
||||
@Override
|
||||
public List<TaskInfo<Task, TaskStatus>> getRecentlyFinishedTaskInfo(
|
||||
@Nullable Integer maxTaskStatuses, @Nullable Duration duration, @Nullable String datasource
|
||||
@Nullable Integer maxTaskStatuses,
|
||||
@Nullable Duration duration,
|
||||
@Nullable String datasource
|
||||
)
|
||||
{
|
||||
giant.lock();
|
||||
|
|
|
@ -780,7 +780,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
|
|||
ImmutableMap.copyOf(
|
||||
Maps.transformEntries(
|
||||
Maps.filterEntries(
|
||||
zkWorkers, new Predicate<Map.Entry<String, ZkWorker>>()
|
||||
zkWorkers,
|
||||
new Predicate<Map.Entry<String, ZkWorker>>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(Map.Entry<String, ZkWorker> input)
|
||||
|
@ -791,16 +792,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
|
|||
}
|
||||
}
|
||||
),
|
||||
new Maps.EntryTransformer<String, ZkWorker, ImmutableWorkerInfo>()
|
||||
{
|
||||
@Override
|
||||
public ImmutableWorkerInfo transformEntry(
|
||||
String key, ZkWorker value
|
||||
)
|
||||
{
|
||||
return value.toImmutable();
|
||||
}
|
||||
}
|
||||
(String key, ZkWorker value) -> value.toImmutable()
|
||||
)
|
||||
),
|
||||
task
|
||||
|
|
|
@ -329,7 +329,8 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
|
|||
ImmutableMap.copyOf(
|
||||
Maps.transformEntries(
|
||||
Maps.filterEntries(
|
||||
workers, new Predicate<Map.Entry<String, WorkerHolder>>()
|
||||
workers,
|
||||
new Predicate<Map.Entry<String, WorkerHolder>>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(Map.Entry<String, WorkerHolder> input)
|
||||
|
@ -340,16 +341,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
|
|||
}
|
||||
}
|
||||
),
|
||||
new Maps.EntryTransformer<String, WorkerHolder, ImmutableWorkerInfo>()
|
||||
{
|
||||
@Override
|
||||
public ImmutableWorkerInfo transformEntry(
|
||||
String key, WorkerHolder value
|
||||
)
|
||||
{
|
||||
return value.toImmutable();
|
||||
}
|
||||
}
|
||||
(String key, WorkerHolder value) -> value.toImmutable()
|
||||
)
|
||||
),
|
||||
task
|
||||
|
@ -778,9 +770,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
|
|||
}
|
||||
|
||||
@Override
|
||||
public Collection<Worker> markWorkersLazy(
|
||||
Predicate<ImmutableWorkerInfo> isLazyWorker, int maxWorkers
|
||||
)
|
||||
public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxWorkers)
|
||||
{
|
||||
synchronized (statusLock) {
|
||||
Iterator<String> iterator = workers.keySet().iterator();
|
||||
|
|
|
@ -698,9 +698,8 @@ public class OverlordResource
|
|||
final Interval theInterval = Intervals.of(interval.replace("_", "/"));
|
||||
duration = theInterval.toDuration();
|
||||
}
|
||||
final List<TaskInfo<Task, TaskStatus>> taskInfoList = taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(
|
||||
maxCompletedTasks, duration, dataSource
|
||||
);
|
||||
final List<TaskInfo<Task, TaskStatus>> taskInfoList =
|
||||
taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(maxCompletedTasks, duration, dataSource);
|
||||
final List<TaskStatusPlus> completedTasks = taskInfoList.stream()
|
||||
.map(completeTaskTransformFunc::apply)
|
||||
.collect(Collectors.toList());
|
||||
|
|
|
@ -74,7 +74,9 @@ public class JavaScriptWorkerSelectStrategy implements WorkerSelectStrategy
|
|||
|
||||
@Override
|
||||
public ImmutableWorkerInfo findWorkerForTask(
|
||||
WorkerTaskRunnerConfig config, ImmutableMap<String, ImmutableWorkerInfo> zkWorkers, Task task
|
||||
WorkerTaskRunnerConfig config,
|
||||
ImmutableMap<String, ImmutableWorkerInfo> zkWorkers,
|
||||
Task task
|
||||
)
|
||||
{
|
||||
fnSelector = fnSelector == null ? compileSelectorFunction() : fnSelector;
|
||||
|
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue