Merge branch 'master' of github.com:druid-io/druid into system-table
commit 9efbe9609e
@@ -16,6 +16,12 @@ cache:

matrix:
  include:
    # license checks
    - env:
        - NAME="license checks"
      install: true
      script: MAVEN_OPTS='-Xmx3000m' mvn clean verify -Prat -DskipTests -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn

    # strict compilation
    - env:
        - NAME="strict compilation"
@@ -45,7 +45,6 @@
        <!-- the default value is a repeated flag from the command line, since blank value is not allowed -->
        <druid.distribution.pulldeps.opts>--clean</druid.distribution.pulldeps.opts>
    </properties>

    <profiles>
        <profile>
            <id>dist</id>
@@ -91,6 +90,8 @@
                        <argument>-c</argument>
                        <argument>org.apache.druid.extensions:druid-avro-extensions</argument>
                        <argument>-c</argument>
                        <argument>org.apache.druid.extensions:druid-bloom-filter</argument>
                        <argument>-c</argument>
                        <argument>org.apache.druid.extensions:druid-datasketches</argument>
                        <argument>-c</argument>
                        <argument>org.apache.druid.extensions:druid-hdfs-storage</argument>
docs/content/development/extensions-core/bloom-filter.md (new file, 45 lines)
@@ -0,0 +1,45 @@
---
layout: doc_page
---

# Druid Bloom Filter

Make sure to [include](../../operations/including-extensions.html) `druid-bloom-filter` as an extension.

A Bloom filter is a probabilistic data structure for set membership checks. Some characteristics of Bloom filters:

- Bloom filters are highly space efficient compared to using a HashSet.
- Because of their probabilistic nature, false positives are possible: test() may return true for an element that was never added.
- False negatives are not possible: if an element is present, test() will never return false.
- The false positive probability is configurable (default: 5%), and the storage requirement increases or decreases depending on it.
- The lower the false positive probability, the greater the space requirement.
- Bloom filters are sensitive to the number of elements that will be inserted: the expected number of entries must be specified when the filter is created. If the number of insertions exceeds that number, the false positive probability increases accordingly.

Internally, this implementation uses the fast non-cryptographic Murmur3 hash algorithm.

### JSON Representation of the Bloom Filter

```json
{
  "type" : "bloom",
  "dimension" : <dimension_name>,
  "bloomKFilter" : <serialized_bytes_for_BloomKFilter>,
  "extractionFn" : <extraction_fn>
}
```

|Property|Description|Required?|
|--------|-----------|---------|
|`type`|Filter type. Should always be `bloom`.|yes|
|`dimension`|The dimension to filter over.|yes|
|`bloomKFilter`|Base64-encoded binary representation of `org.apache.hive.common.util.BloomKFilter`.|yes|
|`extractionFn`|[Extraction function](./../dimensionspecs.html#extraction-functions) to apply to the dimension values.|no|

### Serialized Format for BloomKFilter

A serialized BloomKFilter consists of:

- 1 byte for the number of hash functions.
- 1 big-endian int (that is how OutputStream works) for the number of longs in the bitset.
- The big-endian longs of the BloomKFilter bitset.

Note: `org.apache.hive.common.util.BloomKFilter` provides a `serialize` method which can be used to serialize Bloom filters to an OutputStream.
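The sketch below shows one way to produce the Base64 value for the `bloomKFilter` property. It is a minimal illustration, not part of this change; it relies only on the `BloomKFilter` API used elsewhere in this patch (the constructor that takes the expected number of entries, `addString`, and the static `serialize(OutputStream, BloomKFilter)` method) plus the standard `java.util.Base64` encoder. The class and variable names are hypothetical.

```java
import org.apache.hive.common.util.BloomKFilter;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Base64;

public class BloomFilterSpecExample
{
  public static void main(String[] args) throws IOException
  {
    // The expected number of entries controls sizing; exceeding it raises the false positive rate.
    BloomKFilter filter = new BloomKFilter(1500);
    filter.addString("myTestString");

    // Serialize to the byte layout described above, then Base64-encode for the JSON "bloomKFilter" field.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BloomKFilter.serialize(out, filter);
    String base64 = Base64.getEncoder().encodeToString(out.toByteArray());

    System.out.println("\"bloomKFilter\" : \"" + base64 + "\"");
  }
}
```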
@@ -23,6 +23,7 @@ Core extensions are maintained by Druid committers.

|----|-----------|----|
|druid-avro-extensions|Support for data in Apache Avro data format.|[link](../development/extensions-core/avro.html)|
|druid-basic-security|Support for Basic HTTP authentication and role-based access control.|[link](../development/extensions-core/druid-basic-security.html)|
|druid-bloom-filter|Support for providing Bloom filters in druid queries.|[link](../development/extensions-core/bloom-filter.html)|
|druid-caffeine-cache|A local cache implementation backed by Caffeine.|[link](../development/extensions-core/caffeine-cache.html)|
|druid-datasketches|Support for approximate counts and set operations with [DataSketches](http://datasketches.github.io/).|[link](../development/extensions-core/datasketches-extension.html)|
|druid-hdfs-storage|HDFS deep storage.|[link](../development/extensions-core/hdfs.html)|
@@ -187,7 +187,7 @@ handle all formatting decisions on their own, without using the ParseSpec.

| Field | Type | Description | Required |
|-------|------|-------------|----------|
| column | String | The column of the timestamp. | yes |
- | format | String | iso, millis, posix, auto or any [Joda time](http://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html) format. | no (default == 'auto') |
+ | format | String | iso, posix, millis, micro, nano, auto or any [Joda time](http://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html) format. | no (default == 'auto') |

### DimensionsSpec
@@ -368,9 +368,13 @@ Retrieve the status of a task.

Retrieve information about the segments of a task.

+ <div class="note caution">
+ This API is deprecated and will be removed in future releases.
+ </div>

* `/druid/indexer/v1/task/{taskId}/reports`

Retrieve a [task completion report](../ingestion/reports.html) for a task. Only works for completed tasks.

#### POST
@@ -17,7 +17,7 @@ When a rule is updated, the change may not be reflected until the next time the

Load Rules
----------

- Load rules indicate how many replicas of a segment should exist in a server tier.
+ Load rules indicate how many replicas of a segment should exist in a server tier. **Please note**: If a Load rule is used to retain only data from a certain interval or period, it must be accompanied by a Drop rule. If a Drop rule is not included, data not within the specified interval or period will be retained by the default rule (loadForever).

### Forever Load Rule
@@ -66,6 +66,14 @@
      <artifactId>commons-validator</artifactId>
      <version>1.4.0</version>
    </dependency>
+   <dependency>
+     <groupId>com.ircclouds.irc</groupId>
+     <artifactId>irc-api</artifactId>
+   </dependency>
+   <dependency>
+     <groupId>com.maxmind.geoip2</groupId>
+     <artifactId>geoip2</artifactId>
+   </dependency>

    <!-- For tests! -->
    <dependency>
@@ -24,6 +24,8 @@ import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import org.apache.druid.examples.twitter.TwitterSpritzerFirehoseFactory;
+ import org.apache.druid.examples.wikipedia.IrcFirehoseFactory;
+ import org.apache.druid.examples.wikipedia.IrcInputRowParser;
import org.apache.druid.initialization.DruidModule;

import java.util.Collections;
@@ -39,7 +41,9 @@ public class ExamplesDruidModule implements DruidModule
    return Collections.<Module>singletonList(
        new SimpleModule("ExamplesModule")
            .registerSubtypes(
-               new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer")
+               new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"),
+               new NamedType(IrcFirehoseFactory.class, "irc"),
+               new NamedType(IrcInputRowParser.class, "irc")
            )
    );
  }
@@ -17,7 +17,7 @@
 * under the License.
 */

- package org.apache.druid.segment.realtime.firehose;
+ package org.apache.druid.examples.wikipedia;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
@@ -17,7 +17,7 @@
 * under the License.
 */

- package org.apache.druid.segment.realtime.firehose;
+ package org.apache.druid.examples.wikipedia;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -234,13 +234,8 @@ public class IrcFirehoseFactory implements FirehoseFactory<InputRowParser<Pair<D
  @Override
  public Runnable commit()
  {
-   return new Runnable()
-   {
-     @Override
-     public void run()
-     {
-       // nothing to see here
-     }
+   return () -> {
+     // nothing to see here
    };
  }
@@ -17,7 +17,7 @@
 * under the License.
 */

- package org.apache.druid.segment.realtime.firehose;
+ package org.apache.druid.examples.wikipedia;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -17,7 +17,7 @@
 * under the License.
 */

- package org.apache.druid.segment.realtime.firehose;
+ package org.apache.druid.examples.wikipedia;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -29,6 +29,7 @@
      },
      "dimensionsSpec": {
        "dimensions": [
+         "timestamp",
          "col1",
          "col2"
        ],
@@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.serde2.SerDeException;
+ import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
@@ -81,7 +82,10 @@ public class OrcHadoopInputRowParser implements InputRowParser<OrcStruct>
  {
    this.parseSpec = parseSpec;
    this.typeString = typeString == null ? typeStringFromParseSpec(parseSpec) : typeString;
-   this.mapFieldNameFormat = mapFieldNameFormat == null || mapFieldNameFormat.indexOf(MAP_PARENT_TAG) < 0 || mapFieldNameFormat.indexOf(MAP_CHILD_TAG) < 0 ? DEFAULT_MAP_FIELD_NAME_FORMAT : mapFieldNameFormat;
+   this.mapFieldNameFormat =
+       mapFieldNameFormat == null ||
+       mapFieldNameFormat.indexOf(MAP_PARENT_TAG) < 0 ||
+       mapFieldNameFormat.indexOf(MAP_CHILD_TAG) < 0 ? DEFAULT_MAP_FIELD_NAME_FORMAT : mapFieldNameFormat;
    this.mapParentFieldNameFormat = this.mapFieldNameFormat.replace(MAP_PARENT_TAG, "%s");
    this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames();
    this.oip = makeObjectInspector(this.typeString);
@@ -226,9 +230,16 @@ public class OrcHadoopInputRowParser implements InputRowParser<OrcStruct>
  {
    StringBuilder builder = new StringBuilder("struct<");
    builder.append(parseSpec.getTimestampSpec().getTimestampColumn()).append(":string");
+   // the typeString seems positionally dependent, so repeated timestamp column causes incorrect mapping
    if (parseSpec.getDimensionsSpec().getDimensionNames().size() > 0) {
      builder.append(",");
-     builder.append(String.join(":string,", parseSpec.getDimensionsSpec().getDimensionNames()));
+     builder.append(String.join(
+         ":string,",
+         parseSpec.getDimensionsSpec()
+                  .getDimensionNames()
+                  .stream()
+                  .filter(s -> !s.equals(parseSpec.getTimestampSpec().getTimestampColumn()))
+                  .collect(Collectors.toList())));
      builder.append(":string");
    }
    builder.append(">");
@@ -241,6 +252,8 @@ public class OrcHadoopInputRowParser implements InputRowParser<OrcStruct>
    if (object instanceof HiveDecimalWritable) {
      // inspector on HiveDecimal rounds off to integer for some reason.
      return ((HiveDecimalWritable) object).getHiveDecimal().doubleValue();
+   } else if (object instanceof DateWritable) {
+     return object.toString();
    } else {
      return inspector.getPrimitiveJavaObject(object);
    }
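The change above skips dimensions that repeat the timestamp column when building the ORC `struct<...>` type string, because the type string maps columns positionally and a duplicated column would shift the mapping. The following is a minimal, self-contained sketch of that filtering idea using plain JDK types only; the column names are hypothetical and it does not use Druid's parse-spec API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class TypeStringSketch
{
  public static void main(String[] args)
  {
    // Hypothetical columns: "timestamp" is both the timestamp column and listed as a dimension.
    String timestampColumn = "timestamp";
    List<String> dimensions = Arrays.asList("timestamp", "col1", "col2");

    // Skip the timestamp column so it appears only once in the positional struct definition.
    String typeString = "struct<" + timestampColumn + ":string,"
        + dimensions.stream()
                    .filter(d -> !d.equals(timestampColumn))
                    .collect(Collectors.joining(":string,"))
        + ":string>";

    System.out.println(typeString); // struct<timestamp:string,col1:string,col2:string>
  }
}
```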
|
@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
|
||||
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
|
||||
import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
|
||||
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
|
||||
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
|
||||
import org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat;
|
||||
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
|
||||
@ -43,6 +44,7 @@ import org.apache.orc.CompressionKind;
|
||||
import org.apache.orc.OrcFile;
|
||||
import org.apache.orc.TypeDescription;
|
||||
import org.apache.orc.Writer;
|
||||
import org.joda.time.DateTime;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
@ -52,6 +54,7 @@ import org.junit.rules.TemporaryFolder;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class DruidOrcInputFormatTest
|
||||
{
|
||||
@ -110,6 +113,36 @@ public class DruidOrcInputFormatTest
|
||||
reader.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadDateColumn() throws IOException, InterruptedException
|
||||
{
|
||||
File testFile2 = makeOrcFileWithDate();
|
||||
Path path = new Path(testFile2.getAbsoluteFile().toURI());
|
||||
FileSplit split = new FileSplit(path, 0, testFile2.length(), null);
|
||||
|
||||
InputFormat inputFormat = ReflectionUtils.newInstance(OrcNewInputFormat.class, job.getConfiguration());
|
||||
|
||||
TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
|
||||
RecordReader reader = inputFormat.createRecordReader(split, context);
|
||||
InputRowParser<OrcStruct> parser = (InputRowParser<OrcStruct>) config.getParser();
|
||||
|
||||
reader.initialize(split, context);
|
||||
|
||||
reader.nextKeyValue();
|
||||
|
||||
OrcStruct data = (OrcStruct) reader.getCurrentValue();
|
||||
|
||||
MapBasedInputRow row = (MapBasedInputRow) parser.parseBatch(data).get(0);
|
||||
|
||||
Assert.assertTrue(row.getEvent().keySet().size() == 4);
|
||||
Assert.assertEquals(DateTimes.of(timestamp), row.getTimestamp());
|
||||
Assert.assertEquals(parser.getParseSpec().getDimensionsSpec().getDimensionNames(), row.getDimensions());
|
||||
Assert.assertEquals(col1, row.getEvent().get("col1"));
|
||||
Assert.assertEquals(Arrays.asList(col2), row.getDimension("col2"));
|
||||
|
||||
reader.close();
|
||||
}
|
||||
|
||||
private File makeOrcFile() throws IOException
|
||||
{
|
||||
final File dir = temporaryFolder.newFolder();
|
||||
@ -157,4 +190,52 @@ public class DruidOrcInputFormatTest
|
||||
|
||||
return testOrc;
|
||||
}
|
||||
|
||||
private File makeOrcFileWithDate() throws IOException
|
||||
{
|
||||
final File dir = temporaryFolder.newFolder();
|
||||
final File testOrc = new File(dir, "test-2.orc");
|
||||
TypeDescription schema = TypeDescription.createStruct()
|
||||
.addField("timestamp", TypeDescription.createDate())
|
||||
.addField("col1", TypeDescription.createString())
|
||||
.addField("col2", TypeDescription.createList(TypeDescription.createString()))
|
||||
.addField("val1", TypeDescription.createFloat());
|
||||
Configuration conf = new Configuration();
|
||||
Writer writer = OrcFile.createWriter(
|
||||
new Path(testOrc.getPath()),
|
||||
OrcFile.writerOptions(conf)
|
||||
.setSchema(schema)
|
||||
.stripeSize(100000)
|
||||
.bufferSize(10000)
|
||||
.compress(CompressionKind.ZLIB)
|
||||
.version(OrcFile.Version.CURRENT)
|
||||
);
|
||||
VectorizedRowBatch batch = schema.createRowBatch();
|
||||
batch.size = 1;
|
||||
DateTime ts = DateTimes.of(timestamp);
|
||||
|
||||
// date is stored as long column vector with number of days since epoch
|
||||
((LongColumnVector) batch.cols[0]).vector[0] =
|
||||
TimeUnit.MILLISECONDS.toDays(ts.minus(DateTimes.EPOCH.getMillis()).getMillis());
|
||||
|
||||
((BytesColumnVector) batch.cols[1]).setRef(0, StringUtils.toUtf8(col1), 0, col1.length());
|
||||
|
||||
ListColumnVector listColumnVector = (ListColumnVector) batch.cols[2];
|
||||
listColumnVector.childCount = col2.length;
|
||||
listColumnVector.lengths[0] = 3;
|
||||
for (int idx = 0; idx < col2.length; idx++) {
|
||||
((BytesColumnVector) listColumnVector.child).setRef(
|
||||
idx,
|
||||
StringUtils.toUtf8(col2[idx]),
|
||||
0,
|
||||
col2[idx].length()
|
||||
);
|
||||
}
|
||||
|
||||
((DoubleColumnVector) batch.cols[3]).vector[0] = val1;
|
||||
writer.addRowBatch(batch);
|
||||
writer.close();
|
||||
|
||||
return testOrc;
|
||||
}
|
||||
}
|
||||
|
extensions-core/druid-bloom-filter/pom.xml (new file, 65 lines)
@@ -0,0 +1,65 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one
  ~ or more contributor license agreements.  See the NOTICE file
  ~ distributed with this work for additional information
  ~ regarding copyright ownership.  The ASF licenses this file
  ~ to you under the Apache License, Version 2.0 (the
  ~ "License"); you may not use this file except in compliance
  ~ with the License.  You may obtain a copy of the License at
  ~
  ~   http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing,
  ~ software distributed under the License is distributed on an
  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  ~ KIND, either express or implied.  See the License for the
  ~ specific language governing permissions and limitations
  ~ under the License.
  -->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.apache.druid.extensions</groupId>
  <artifactId>druid-bloom-filter</artifactId>
  <name>druid-bloom-filter</name>
  <description>druid-bloom-filter</description>

  <parent>
    <groupId>org.apache.druid</groupId>
    <artifactId>druid</artifactId>
    <version>0.13.0-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>

  <dependencies>
    <dependency>
      <groupId>org.apache.druid</groupId>
      <artifactId>druid-processing</artifactId>
      <version>${project.parent.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-storage-api</artifactId>
      <version>2.7.0</version>
    </dependency>

    <!-- Tests -->
    <dependency>
      <groupId>org.apache.druid</groupId>
      <artifactId>druid-processing</artifactId>
      <version>${project.parent.version}</version>
      <scope>test</scope>
      <type>test-jar</type>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

</project>
@@ -20,32 +20,24 @@
package org.apache.druid.guice;

import com.fasterxml.jackson.databind.Module;
- import com.fasterxml.jackson.databind.jsontype.NamedType;
- import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import org.apache.druid.initialization.DruidModule;
- import org.apache.druid.segment.realtime.firehose.IrcInputRowParser;

import java.util.Collections;
import java.util.List;

/**
 */
- public class ParsersModule implements DruidModule
+ public class BloomFilterExtensionModule implements DruidModule
{
- @Override
- public void configure(Binder binder)
- {
- }

  @Override
  public List<? extends Module> getJacksonModules()
  {
-   return Collections.<Module>singletonList(
-       new SimpleModule("ParsersModule")
-           .registerSubtypes(
-               new NamedType(IrcInputRowParser.class, "irc")
-           )
-   );
+   return Collections.singletonList(new BloomFilterSerializersModule());
  }

+ @Override
+ public void configure(Binder binder)
+ {
+
+ }
}
@@ -0,0 +1,88 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.druid.guice;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import org.apache.druid.query.filter.BloomDimFilter;
import org.apache.hive.common.util.BloomKFilter;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class BloomFilterSerializersModule extends SimpleModule
{
  public static String BLOOM_FILTER_TYPE_NAME = "bloom";

  public BloomFilterSerializersModule()
  {
    registerSubtypes(
        new NamedType(BloomDimFilter.class, BLOOM_FILTER_TYPE_NAME)
    );
    addSerializer(BloomKFilter.class, new BloomKFilterSerializer());
    addDeserializer(BloomKFilter.class, new BloomKFilterDeserializer());
  }

  public static class BloomKFilterSerializer extends StdSerializer<BloomKFilter>
  {
    public BloomKFilterSerializer()
    {
      super(BloomKFilter.class);
    }

    @Override
    public void serialize(
        BloomKFilter bloomKFilter, JsonGenerator jsonGenerator, SerializerProvider serializerProvider
    ) throws IOException
    {
      ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
      BloomKFilter.serialize(byteArrayOutputStream, bloomKFilter);
      byte[] bytes = byteArrayOutputStream.toByteArray();
      jsonGenerator.writeBinary(bytes);
    }
  }

  public static class BloomKFilterDeserializer extends StdDeserializer<BloomKFilter>
  {
    protected BloomKFilterDeserializer()
    {
      super(BloomKFilter.class);
    }

    @Override
    public BloomKFilter deserialize(
        JsonParser jsonParser, DeserializationContext deserializationContext
    ) throws IOException, JsonProcessingException
    {
      byte[] bytes = jsonParser.getBinaryValue();
      return BloomKFilter.deserialize(new ByteArrayInputStream(bytes));
    }
  }
}
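The sketch below shows how this module is intended to be used; it mirrors the test setup that appears later in this change (registering the module on Druid's `DefaultObjectMapper`) and adds a simple round trip of a `BloomKFilter` through JSON. The class and variable names are illustrative only.

```java
import org.apache.druid.guice.BloomFilterSerializersModule;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.hive.common.util.BloomKFilter;

public class SerializersModuleUsage
{
  public static void main(String[] args) throws Exception
  {
    DefaultObjectMapper mapper = new DefaultObjectMapper();
    // Registering the module installs the BloomKFilter (de)serializers and the "bloom" filter subtype.
    mapper.registerModule(new BloomFilterSerializersModule());

    BloomKFilter filter = new BloomKFilter(1500);
    filter.addString("myTestString");

    // Round-trip through JSON: the filter is written as Base64 binary and read back.
    byte[] json = mapper.writeValueAsBytes(filter);
    BloomKFilter roundTripped = mapper.readValue(json, BloomKFilter.class);
    System.out.println(roundTripped.testString("myTestString")); // true
  }
}
```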
@ -0,0 +1,236 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.filter;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.collect.RangeSet;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.query.cache.CacheKeyBuilder;
|
||||
import org.apache.druid.query.extraction.ExtractionFn;
|
||||
import org.apache.druid.segment.filter.DimensionPredicateFilter;
|
||||
import org.apache.hive.common.util.BloomKFilter;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.HashSet;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class BloomDimFilter implements DimFilter
|
||||
{
|
||||
|
||||
private final String dimension;
|
||||
private final BloomKFilter bloomKFilter;
|
||||
private final ExtractionFn extractionFn;
|
||||
|
||||
@JsonCreator
|
||||
public BloomDimFilter(
|
||||
@JsonProperty("dimension") String dimension,
|
||||
@JsonProperty("bloomKFilter") BloomKFilter bloomKFilter,
|
||||
@JsonProperty("extractionFn") ExtractionFn extractionFn
|
||||
)
|
||||
{
|
||||
Preconditions.checkArgument(dimension != null, "dimension must not be null");
|
||||
Preconditions.checkNotNull(bloomKFilter);
|
||||
this.dimension = dimension;
|
||||
this.bloomKFilter = bloomKFilter;
|
||||
this.extractionFn = extractionFn;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getCacheKey()
|
||||
{
|
||||
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
|
||||
try {
|
||||
BloomKFilter.serialize(byteArrayOutputStream, bloomKFilter);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new IllegalStateException(StringUtils.format("Exception when generating cache key for [%s]", this), e);
|
||||
}
|
||||
byte[] bloomFilterBytes = byteArrayOutputStream.toByteArray();
|
||||
return new CacheKeyBuilder(DimFilterUtils.BLOOM_DIM_FILTER_CACHE_ID)
|
||||
.appendString(dimension)
|
||||
.appendByte(DimFilterUtils.STRING_SEPARATOR)
|
||||
.appendByteArray(extractionFn == null ? new byte[0] : extractionFn.getCacheKey())
|
||||
.appendByte(DimFilterUtils.STRING_SEPARATOR)
|
||||
.appendByteArray(bloomFilterBytes)
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public DimFilter optimize()
|
||||
{
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Filter toFilter()
|
||||
{
|
||||
return new DimensionPredicateFilter(
|
||||
dimension,
|
||||
new DruidPredicateFactory()
|
||||
{
|
||||
@Override
|
||||
public Predicate<String> makeStringPredicate()
|
||||
{
|
||||
return str -> {
|
||||
if (str == null) {
|
||||
return bloomKFilter.testBytes(null, 0, 0);
|
||||
}
|
||||
return bloomKFilter.testString(str);
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public DruidLongPredicate makeLongPredicate()
|
||||
{
|
||||
return new DruidLongPredicate()
|
||||
{
|
||||
@Override
|
||||
public boolean applyLong(long input)
|
||||
{
|
||||
return bloomKFilter.testLong(input);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean applyNull()
|
||||
{
|
||||
return bloomKFilter.testBytes(null, 0, 0);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public DruidFloatPredicate makeFloatPredicate()
|
||||
{
|
||||
return new DruidFloatPredicate()
|
||||
{
|
||||
@Override
|
||||
public boolean applyFloat(float input)
|
||||
{
|
||||
return bloomKFilter.testFloat(input);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean applyNull()
|
||||
{
|
||||
return bloomKFilter.testBytes(null, 0, 0);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public DruidDoublePredicate makeDoublePredicate()
|
||||
{
|
||||
return new DruidDoublePredicate()
|
||||
{
|
||||
@Override
|
||||
public boolean applyDouble(double input)
|
||||
{
|
||||
return bloomKFilter.testDouble(input);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean applyNull()
|
||||
{
|
||||
return bloomKFilter.testBytes(null, 0, 0);
|
||||
}
|
||||
};
|
||||
}
|
||||
},
|
||||
extractionFn
|
||||
);
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getDimension()
|
||||
{
|
||||
return dimension;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public BloomKFilter getBloomKFilter()
|
||||
{
|
||||
return bloomKFilter;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public ExtractionFn getExtractionFn()
|
||||
{
|
||||
return extractionFn;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
if (extractionFn != null) {
|
||||
return StringUtils.format("%s(%s) = %s", extractionFn, dimension, bloomKFilter);
|
||||
} else {
|
||||
return StringUtils.format("%s = %s", dimension, bloomKFilter);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
BloomDimFilter that = (BloomDimFilter) o;
|
||||
|
||||
if (!dimension.equals(that.dimension)) {
|
||||
return false;
|
||||
}
|
||||
if (bloomKFilter != null ? !bloomKFilter.equals(that.bloomKFilter) : that.bloomKFilter != null) {
|
||||
return false;
|
||||
}
|
||||
return extractionFn != null ? extractionFn.equals(that.extractionFn) : that.extractionFn == null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RangeSet<String> getDimensionRangeSet(String dimension)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HashSet<String> getRequiredColumns()
|
||||
{
|
||||
return Sets.newHashSet(dimension);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
int result = dimension.hashCode();
|
||||
result = 31 * result + (bloomKFilter != null ? bloomKFilter.hashCode() : 0);
|
||||
result = 31 * result + (extractionFn != null ? extractionFn.hashCode() : 0);
|
||||
return result;
|
||||
}
|
||||
}
|
@@ -0,0 +1 @@
org.apache.druid.guice.BloomFilterExtensionModule
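This one-line file is a standard `META-INF/services` registration; assuming the usual Druid convention, the file is named after the `org.apache.druid.initialization.DruidModule` interface so the extension loader can discover the module through `java.util.ServiceLoader`. A minimal sketch of that discovery mechanism (illustrative only):

```java
import org.apache.druid.initialization.DruidModule;

import java.util.ServiceLoader;

public class ModuleDiscoverySketch
{
  public static void main(String[] args)
  {
    // ServiceLoader reads META-INF/services entries on the classpath and instantiates each listed class.
    for (DruidModule module : ServiceLoader.load(DruidModule.class)) {
      System.out.println("Discovered module: " + module.getClass().getName());
    }
  }
}
```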
@ -0,0 +1,388 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.filter;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.druid.common.config.NullHandling;
|
||||
import org.apache.druid.data.input.InputRow;
|
||||
import org.apache.druid.data.input.impl.DimensionsSpec;
|
||||
import org.apache.druid.data.input.impl.InputRowParser;
|
||||
import org.apache.druid.data.input.impl.MapInputRowParser;
|
||||
import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
|
||||
import org.apache.druid.data.input.impl.TimestampSpec;
|
||||
import org.apache.druid.guice.BloomFilterSerializersModule;
|
||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||
import org.apache.druid.java.util.common.DateTimes;
|
||||
import org.apache.druid.java.util.common.Pair;
|
||||
import org.apache.druid.query.extraction.MapLookupExtractor;
|
||||
import org.apache.druid.query.extraction.TimeDimExtractionFn;
|
||||
import org.apache.druid.query.lookup.LookupExtractionFn;
|
||||
import org.apache.druid.query.lookup.LookupExtractor;
|
||||
import org.apache.druid.segment.IndexBuilder;
|
||||
import org.apache.druid.segment.StorageAdapter;
|
||||
import org.apache.druid.segment.filter.BaseFilterTest;
|
||||
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
|
||||
import org.apache.hive.common.util.BloomKFilter;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class BloomDimFilterTest extends BaseFilterTest
|
||||
{
|
||||
private static final String TIMESTAMP_COLUMN = "timestamp";
|
||||
|
||||
private static final InputRowParser<Map<String, Object>> PARSER = new MapInputRowParser(
|
||||
new TimeAndDimsParseSpec(
|
||||
new TimestampSpec(TIMESTAMP_COLUMN, "iso", DateTimes.of("2000")),
|
||||
new DimensionsSpec(
|
||||
DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim0", "dim1", "dim2", "dim3", "dim6")),
|
||||
null,
|
||||
null
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
private static final List<InputRow> ROWS = ImmutableList.of(
|
||||
PARSER.parseBatch(ImmutableMap.of(
|
||||
"dim0",
|
||||
"0",
|
||||
"dim1",
|
||||
"",
|
||||
"dim2",
|
||||
ImmutableList.of("a", "b"),
|
||||
"dim6",
|
||||
"2017-07-25"
|
||||
)).get(0),
|
||||
PARSER.parseBatch(ImmutableMap.of("dim0", "1", "dim1", "10", "dim2", ImmutableList.of(), "dim6", "2017-07-25"))
|
||||
.get(0),
|
||||
PARSER.parseBatch(ImmutableMap.of("dim0", "2", "dim1", "2", "dim2", ImmutableList.of(""), "dim6", "2017-05-25"))
|
||||
.get(0),
|
||||
PARSER.parseBatch(ImmutableMap.of("dim0", "3", "dim1", "1", "dim2", ImmutableList.of("a"))).get(0),
|
||||
PARSER.parseBatch(ImmutableMap.of("dim0", "4", "dim1", "def", "dim2", ImmutableList.of("c"))).get(0),
|
||||
PARSER.parseBatch(ImmutableMap.of("dim0", "5", "dim1", "abc")).get(0)
|
||||
);
|
||||
|
||||
public BloomDimFilterTest(
|
||||
String testName,
|
||||
IndexBuilder indexBuilder,
|
||||
Function<IndexBuilder, Pair<StorageAdapter, Closeable>> finisher,
|
||||
boolean cnf,
|
||||
boolean optimize
|
||||
)
|
||||
{
|
||||
super(
|
||||
testName,
|
||||
ROWS,
|
||||
indexBuilder.schema(
|
||||
new IncrementalIndexSchema.Builder()
|
||||
.withDimensionsSpec(PARSER.getParseSpec().getDimensionsSpec()).build()
|
||||
),
|
||||
finisher,
|
||||
cnf,
|
||||
optimize
|
||||
);
|
||||
}
|
||||
|
||||
private static DefaultObjectMapper mapper = new DefaultObjectMapper();
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass()
|
||||
{
|
||||
mapper.registerModule(new BloomFilterSerializersModule());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception
|
||||
{
|
||||
BaseFilterTest.tearDown(BloomDimFilterTest.class.getName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSerde() throws IOException
|
||||
{
|
||||
BloomKFilter bloomFilter = new BloomKFilter(1500);
|
||||
bloomFilter.addString("myTestString");
|
||||
BloomDimFilter bloomDimFilter = new BloomDimFilter(
|
||||
"abc",
|
||||
bloomFilter,
|
||||
new TimeDimExtractionFn("yyyy-MM-dd", "yyyy-MM", true)
|
||||
);
|
||||
DimFilter filter = mapper.readValue(mapper.writeValueAsBytes(bloomDimFilter), DimFilter.class);
|
||||
Assert.assertTrue(filter instanceof BloomDimFilter);
|
||||
BloomDimFilter serde = (BloomDimFilter) filter;
|
||||
Assert.assertEquals(bloomDimFilter.getDimension(), serde.getDimension());
|
||||
Assert.assertEquals(bloomDimFilter.getExtractionFn(), serde.getExtractionFn());
|
||||
Assert.assertTrue(bloomDimFilter.getBloomKFilter().testString("myTestString"));
|
||||
Assert.assertFalse(bloomDimFilter.getBloomKFilter().testString("not_match"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithTimeExtractionFnNull()
|
||||
{
|
||||
assertFilterMatches(new BloomDimFilter(
|
||||
"dim0",
|
||||
bloomKFilter(1000, null, ""),
|
||||
new TimeDimExtractionFn("yyyy-MM-dd", "yyyy-MM", true)
|
||||
), ImmutableList.of());
|
||||
assertFilterMatches(new BloomDimFilter(
|
||||
"dim6",
|
||||
bloomKFilter(1000, null, ""),
|
||||
new TimeDimExtractionFn("yyyy-MM-dd", "yyyy-MM", true)
|
||||
), ImmutableList.of("3", "4", "5"));
|
||||
assertFilterMatches(new BloomDimFilter(
|
||||
"dim6",
|
||||
bloomKFilter(1000, "2017-07"),
|
||||
new TimeDimExtractionFn("yyyy-MM-dd", "yyyy-MM", true)
|
||||
), ImmutableList.of("0", "1"));
|
||||
assertFilterMatches(new BloomDimFilter(
|
||||
"dim6",
|
||||
bloomKFilter(1000, "2017-05"),
|
||||
new TimeDimExtractionFn("yyyy-MM-dd", "yyyy-MM", true)
|
||||
), ImmutableList.of("2"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSingleValueStringColumnWithoutNulls()
|
||||
{
|
||||
assertFilterMatches(new BloomDimFilter("dim0", bloomKFilter(1000, (String) null), null), ImmutableList.of());
|
||||
assertFilterMatches(new BloomDimFilter("dim0", bloomKFilter(1000, ""), null), ImmutableList.of());
|
||||
assertFilterMatches(new BloomDimFilter("dim0", bloomKFilter(1000, "0"), null), ImmutableList.of("0"));
|
||||
assertFilterMatches(new BloomDimFilter("dim0", bloomKFilter(1000, "1"), null), ImmutableList.of("1"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSingleValueStringColumnWithNulls()
|
||||
{
|
||||
if (NullHandling.replaceWithDefault()) {
|
||||
assertFilterMatches(new BloomDimFilter("dim1", bloomKFilter(1000, (String) null), null), ImmutableList.of("0"));
|
||||
} else {
|
||||
assertFilterMatches(new BloomDimFilter("dim1", bloomKFilter(1000, (String) null), null), ImmutableList.of());
|
||||
assertFilterMatches(new BloomDimFilter("dim1", bloomKFilter(1000, ""), null), ImmutableList.of("0"));
|
||||
}
|
||||
assertFilterMatches(new BloomDimFilter("dim1", bloomKFilter(1000, "10"), null), ImmutableList.of("1"));
|
||||
assertFilterMatches(new BloomDimFilter("dim1", bloomKFilter(1000, "2"), null), ImmutableList.of("2"));
|
||||
assertFilterMatches(new BloomDimFilter("dim1", bloomKFilter(1000, "1"), null), ImmutableList.of("3"));
|
||||
assertFilterMatches(new BloomDimFilter("dim1", bloomKFilter(1000, "def"), null), ImmutableList.of("4"));
|
||||
assertFilterMatches(new BloomDimFilter("dim1", bloomKFilter(1000, "abc"), null), ImmutableList.of("5"));
|
||||
assertFilterMatches(new BloomDimFilter("dim1", bloomKFilter(1000, "ab"), null), ImmutableList.of());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiValueStringColumn()
|
||||
{
|
||||
if (NullHandling.replaceWithDefault()) {
|
||||
assertFilterMatches(
|
||||
new BloomDimFilter("dim2", bloomKFilter(1000, (String) null), null),
|
||||
ImmutableList.of("1", "2", "5")
|
||||
);
|
||||
} else {
|
||||
assertFilterMatches(
|
||||
new BloomDimFilter("dim2", bloomKFilter(1000, (String) null), null),
|
||||
ImmutableList.of("1", "5")
|
||||
);
|
||||
assertFilterMatches(new BloomDimFilter("dim2", bloomKFilter(1000, ""), null), ImmutableList.of("2"));
|
||||
}
|
||||
assertFilterMatches(new BloomDimFilter("dim2", bloomKFilter(1000, "a"), null), ImmutableList.of("0", "3"));
|
||||
assertFilterMatches(new BloomDimFilter("dim2", bloomKFilter(1000, "b"), null), ImmutableList.of("0"));
|
||||
assertFilterMatches(new BloomDimFilter("dim2", bloomKFilter(1000, "c"), null), ImmutableList.of("4"));
|
||||
assertFilterMatches(new BloomDimFilter("dim2", bloomKFilter(1000, "d"), null), ImmutableList.of());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMissingColumnSpecifiedInDimensionList()
|
||||
{
|
||||
assertFilterMatches(
|
||||
new BloomDimFilter("dim3", bloomKFilter(1000, (String) null), null),
|
||||
ImmutableList.of("0", "1", "2", "3", "4", "5")
|
||||
);
|
||||
assertFilterMatches(new BloomDimFilter("dim3", bloomKFilter(1000, ""), null), ImmutableList.of());
|
||||
assertFilterMatches(new BloomDimFilter("dim3", bloomKFilter(1000, "a"), null), ImmutableList.of());
|
||||
assertFilterMatches(new BloomDimFilter("dim3", bloomKFilter(1000, "b"), null), ImmutableList.of());
|
||||
assertFilterMatches(new BloomDimFilter("dim3", bloomKFilter(1000, "c"), null), ImmutableList.of());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMissingColumnNotSpecifiedInDimensionList()
|
||||
{
|
||||
assertFilterMatches(
|
||||
new BloomDimFilter("dim4", bloomKFilter(1000, (String) null), null),
|
||||
ImmutableList.of("0", "1", "2", "3", "4", "5")
|
||||
);
|
||||
assertFilterMatches(new BloomDimFilter("dim4", bloomKFilter(1000, ""), null), ImmutableList.of());
|
||||
assertFilterMatches(new BloomDimFilter("dim4", bloomKFilter(1000, "a"), null), ImmutableList.of());
|
||||
assertFilterMatches(new BloomDimFilter("dim4", bloomKFilter(1000, "b"), null), ImmutableList.of());
|
||||
assertFilterMatches(new BloomDimFilter("dim4", bloomKFilter(1000, "c"), null), ImmutableList.of());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExpressionVirtualColumn()
|
||||
{
|
||||
assertFilterMatches(
|
||||
new BloomDimFilter("expr", bloomKFilter(1000, 1.1F), null),
|
||||
ImmutableList.of("0", "1", "2", "3", "4", "5")
|
||||
);
|
||||
assertFilterMatches(new BloomDimFilter("expr", bloomKFilter(1000, 1.2F), null), ImmutableList.of());
|
||||
assertFilterMatches(
|
||||
new BloomDimFilter("exprDouble", bloomKFilter(1000, 2.1D), null),
|
||||
ImmutableList.of("0", "1", "2", "3", "4", "5")
|
||||
);
|
||||
assertFilterMatches(new BloomDimFilter("exprDouble", bloomKFilter(1000, 2.2D), null), ImmutableList.of());
|
||||
assertFilterMatches(
|
||||
new BloomDimFilter("exprLong", bloomKFilter(1000, 3L), null),
|
||||
ImmutableList.of("0", "1", "2", "3", "4", "5")
|
||||
);
|
||||
assertFilterMatches(new BloomDimFilter("exprLong", bloomKFilter(1000, 4L), null), ImmutableList.of());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSelectorWithLookupExtractionFn()
|
||||
{
|
||||
final Map<String, String> stringMap = ImmutableMap.of(
|
||||
"1", "HELLO",
|
||||
"a", "HELLO",
|
||||
"def", "HELLO",
|
||||
"abc", "UNKNOWN"
|
||||
);
|
||||
LookupExtractor mapExtractor = new MapLookupExtractor(stringMap, false);
|
||||
LookupExtractionFn lookupFn = new LookupExtractionFn(mapExtractor, false, "UNKNOWN", false, true);
|
||||
|
||||
assertFilterMatches(new BloomDimFilter("dim0", bloomKFilter(1000, "HELLO"), lookupFn), ImmutableList.of("1"));
|
||||
assertFilterMatches(
|
||||
new BloomDimFilter("dim0", bloomKFilter(1000, "UNKNOWN"), lookupFn),
|
||||
ImmutableList.of("0", "2", "3", "4", "5")
|
||||
);
|
||||
|
||||
assertFilterMatches(new BloomDimFilter("dim1", bloomKFilter(1000, "HELLO"), lookupFn), ImmutableList.of("3", "4"));
|
||||
assertFilterMatches(
|
||||
new BloomDimFilter("dim1", bloomKFilter(1000, "UNKNOWN"), lookupFn),
|
||||
ImmutableList.of("0", "1", "2", "5")
|
||||
);
|
||||
|
||||
assertFilterMatches(new BloomDimFilter("dim2", bloomKFilter(1000, "HELLO"), lookupFn), ImmutableList.of("0", "3"));
|
||||
assertFilterMatches(
|
||||
new BloomDimFilter("dim2", bloomKFilter(1000, "UNKNOWN"), lookupFn),
|
||||
ImmutableList.of("0", "1", "2", "4", "5")
|
||||
);
|
||||
|
||||
assertFilterMatches(new BloomDimFilter("dim3", bloomKFilter(1000, "HELLO"), lookupFn), ImmutableList.of());
|
||||
assertFilterMatches(
|
||||
new BloomDimFilter("dim3", bloomKFilter(1000, "UNKNOWN"), lookupFn),
|
||||
ImmutableList.of("0", "1", "2", "3", "4", "5")
|
||||
);
|
||||
|
||||
assertFilterMatches(new BloomDimFilter("dim4", bloomKFilter(1000, "HELLO"), lookupFn), ImmutableList.of());
|
||||
assertFilterMatches(
|
||||
new BloomDimFilter("dim4", bloomKFilter(1000, "UNKNOWN"), lookupFn),
|
||||
ImmutableList.of("0", "1", "2", "3", "4", "5")
|
||||
);
|
||||
|
||||
final Map<String, String> stringMap2 = ImmutableMap.of(
|
||||
"2", "5"
|
||||
);
|
||||
LookupExtractor mapExtractor2 = new MapLookupExtractor(stringMap2, false);
|
||||
LookupExtractionFn lookupFn2 = new LookupExtractionFn(mapExtractor2, true, null, false, true);
|
||||
assertFilterMatches(new BloomDimFilter("dim0", bloomKFilter(1000, "5"), lookupFn2), ImmutableList.of("2", "5"));
|
||||
|
||||
final Map<String, String> stringMap3 = ImmutableMap.of(
|
||||
"1", ""
|
||||
);
|
||||
LookupExtractor mapExtractor3 = new MapLookupExtractor(stringMap3, false);
|
||||
LookupExtractionFn lookupFn3 = new LookupExtractionFn(mapExtractor3, false, null, false, true);
|
||||
if (NullHandling.replaceWithDefault()) {
|
||||
// Nulls and empty strings are considered equivalent
|
||||
assertFilterMatches(
|
||||
new BloomDimFilter("dim0", bloomKFilter(1000, (String) null), lookupFn3),
|
||||
ImmutableList.of("0", "1", "2", "3", "4", "5")
|
||||
);
|
||||
} else {
|
||||
assertFilterMatches(
|
||||
new BloomDimFilter("dim0", bloomKFilter(1000, (String) null), lookupFn3),
|
||||
ImmutableList.of("0", "2", "3", "4", "5")
|
||||
);
|
||||
assertFilterMatches(
|
||||
new BloomDimFilter("dim0", bloomKFilter(1000, ""), lookupFn3),
|
||||
ImmutableList.of("1")
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private static BloomKFilter bloomKFilter(int expectedEntries, String... values)
|
||||
{
|
||||
BloomKFilter filter = new BloomKFilter(expectedEntries);
|
||||
for (String value : values) {
|
||||
if (value == null) {
|
||||
filter.addBytes(null, 0, 0);
|
||||
} else {
|
||||
filter.addString(value);
|
||||
}
|
||||
}
|
||||
return filter;
|
||||
}
|
||||
|
||||
private static BloomKFilter bloomKFilter(int expectedEntries, Float... values)
|
||||
{
|
||||
BloomKFilter filter = new BloomKFilter(expectedEntries);
|
||||
for (Float value : values) {
|
||||
if (value == null) {
|
||||
filter.addBytes(null, 0, 0);
|
||||
} else {
|
||||
filter.addFloat(value);
|
||||
}
|
||||
}
|
||||
return filter;
|
||||
}
|
||||
|
||||
private static BloomKFilter bloomKFilter(int expectedEntries, Double... values)
|
||||
{
|
||||
BloomKFilter filter = new BloomKFilter(expectedEntries);
|
||||
for (Double value : values) {
|
||||
if (value == null) {
|
||||
filter.addBytes(null, 0, 0);
|
||||
} else {
|
||||
filter.addDouble(value);
|
||||
}
|
||||
}
|
||||
return filter;
|
||||
}
|
||||
|
||||
private static BloomKFilter bloomKFilter(int expectedEntries, Long... values)
|
||||
{
|
||||
BloomKFilter filter = new BloomKFilter(expectedEntries);
|
||||
for (Long value : values) {
|
||||
if (value == null) {
|
||||
filter.addBytes(null, 0, 0);
|
||||
} else {
|
||||
filter.addLong(value);
|
||||
}
|
||||
}
|
||||
return filter;
|
||||
}
|
||||
}
|
@@ -367,8 +367,12 @@ public class KafkaSupervisor implements Supervisor
        exec.submit(
            () -> {
              try {
-               while (!Thread.currentThread().isInterrupted()) {
-                 final Notice notice = notices.take();
+               long pollTimeout = Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS);
+               while (!Thread.currentThread().isInterrupted() && !stopped) {
+                 final Notice notice = notices.poll(pollTimeout, TimeUnit.MILLISECONDS);
+                 if (notice == null) {
+                   continue;
+                 }

                try {
                  notice.handle();
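The switch from a blocking `take()` to `poll(...)` with a timeout lets the notice-handling loop periodically re-check the `stopped` flag instead of blocking indefinitely. Below is a minimal, self-contained sketch of that pattern using plain JDK types; the queue contents, timeout value, and class names are illustrative and not Druid's.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class PollLoopSketch
{
  public static void main(String[] args) throws InterruptedException
  {
    BlockingQueue<Runnable> notices = new LinkedBlockingQueue<>();
    AtomicBoolean stopped = new AtomicBoolean(false);

    Thread worker = new Thread(() -> {
      try {
        while (!Thread.currentThread().isInterrupted() && !stopped.get()) {
          // Waking up on a timeout (instead of blocking forever) lets the loop observe "stopped".
          Runnable notice = notices.poll(100, TimeUnit.MILLISECONDS);
          if (notice == null) {
            continue;
          }
          notice.run();
        }
      }
      catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    worker.start();
    notices.put(() -> System.out.println("handled a notice"));
    Thread.sleep(300);
    stopped.set(true); // the worker exits within one poll timeout
    worker.join();
  }
}
```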
@ -783,7 +787,7 @@ public class KafkaSupervisor implements Supervisor
|
||||
// defend against consecutive reset requests from replicas
|
||||
// as well as the case where the metadata store do not have an entry for the reset partitions
|
||||
boolean doReset = false;
|
||||
for (Map.Entry<Integer, Long> resetPartitionOffset : resetKafkaMetadata.getKafkaPartitions()
|
||||
for (Entry<Integer, Long> resetPartitionOffset : resetKafkaMetadata.getKafkaPartitions()
|
||||
.getPartitionOffsetMap()
|
||||
.entrySet()) {
|
||||
final Long partitionOffsetInMetadataStore = currentMetadata == null
|
||||
@ -866,7 +870,7 @@ public class KafkaSupervisor implements Supervisor
|
||||
// checkTaskDuration() will be triggered. This is better than just telling these tasks to publish whatever they
|
||||
// have, as replicas that are supposed to publish the same segment may not have read the same set of offsets.
|
||||
for (TaskGroup taskGroup : taskGroups.values()) {
|
||||
for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
|
||||
for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
|
||||
if (taskInfoProvider.getTaskLocation(entry.getKey()).equals(TaskLocation.unknown())) {
|
||||
killTask(entry.getKey());
|
||||
} else {
|
||||
@ -914,7 +918,7 @@ public class KafkaSupervisor implements Supervisor
|
||||
{
|
||||
StringBuilder sb = new StringBuilder();
|
||||
|
||||
for (Map.Entry<Integer, Long> entry : startPartitions.entrySet()) {
|
||||
for (Entry<Integer, Long> entry : startPartitions.entrySet()) {
|
||||
sb.append(StringUtils.format("+%d(%d)", entry.getKey(), entry.getValue()));
|
||||
}
|
||||
String partitionOffsetStr = sb.toString().substring(1);
|
||||
@ -1073,7 +1077,7 @@ public class KafkaSupervisor implements Supervisor
|
||||
// existing) so that the next tasks will start reading from where this task left off
|
||||
Map<Integer, Long> publishingTaskEndOffsets = taskClient.getEndOffsets(taskId);
|
||||
|
||||
for (Map.Entry<Integer, Long> entry : publishingTaskEndOffsets.entrySet()) {
|
||||
for (Entry<Integer, Long> entry : publishingTaskEndOffsets.entrySet()) {
|
||||
Integer partition = entry.getKey();
|
||||
Long offset = entry.getValue();
|
||||
ConcurrentHashMap<Integer, Long> partitionOffsets = partitionGroups.get(
|
||||
@ -1380,7 +1384,7 @@ public class KafkaSupervisor implements Supervisor
|
||||
|
||||
// update status (and startTime if unknown) of current tasks in taskGroups
|
||||
for (TaskGroup group : taskGroups.values()) {
|
||||
for (Map.Entry<String, TaskData> entry : group.tasks.entrySet()) {
|
||||
for (Entry<String, TaskData> entry : group.tasks.entrySet()) {
|
||||
final String taskId = entry.getKey();
|
||||
final TaskData taskData = entry.getValue();
|
||||
|
||||
@ -1423,7 +1427,7 @@ public class KafkaSupervisor implements Supervisor
|
||||
// update status of pending completion tasks in pendingCompletionTaskGroups
|
||||
for (List<TaskGroup> taskGroups : pendingCompletionTaskGroups.values()) {
|
||||
for (TaskGroup group : taskGroups) {
|
||||
for (Map.Entry<String, TaskData> entry : group.tasks.entrySet()) {
|
||||
for (Entry<String, TaskData> entry : group.tasks.entrySet()) {
|
||||
entry.getValue().status = taskStorage.getStatus(entry.getKey()).get();
|
||||
}
|
||||
}
|
||||
@ -1446,7 +1450,7 @@ public class KafkaSupervisor implements Supervisor
|
||||
final List<ListenableFuture<Map<Integer, Long>>> futures = Lists.newArrayList();
|
||||
final List<Integer> futureGroupIds = Lists.newArrayList();
|
||||
|
||||
for (Map.Entry<Integer, TaskGroup> entry : taskGroups.entrySet()) {
|
||||
for (Entry<Integer, TaskGroup> entry : taskGroups.entrySet()) {
|
||||
Integer groupId = entry.getKey();
|
||||
TaskGroup group = entry.getValue();
|
||||
|
||||
@ -1479,7 +1483,7 @@ public class KafkaSupervisor implements Supervisor
|
||||
pendingCompletionTaskGroups.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>()).add(group);
|
||||
|
||||
// set endOffsets as the next startOffsets
|
||||
for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
|
||||
for (Entry<Integer, Long> entry : endOffsets.entrySet()) {
|
||||
partitionGroups.get(groupId).put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
} else {
|
||||
@ -1505,9 +1509,9 @@ public class KafkaSupervisor implements Supervisor
{
if (finalize) {
// 1) Check if any task completed (in which case we're done) and kill unassigned tasks
Iterator<Map.Entry<String, TaskData>> i = taskGroup.tasks.entrySet().iterator();
Iterator<Entry<String, TaskData>> i = taskGroup.tasks.entrySet().iterator();
while (i.hasNext()) {
Map.Entry<String, TaskData> taskEntry = i.next();
Entry<String, TaskData> taskEntry = i.next();
String taskId = taskEntry.getKey();
TaskData task = taskEntry.getValue();

@ -1569,7 +1573,7 @@ public class KafkaSupervisor implements Supervisor
taskGroup.tasks.remove(taskId);

} else { // otherwise build a map of the highest offsets seen
for (Map.Entry<Integer, Long> offset : result.entrySet()) {
for (Entry<Integer, Long> offset : result.entrySet()) {
if (!endOffsets.containsKey(offset.getKey())
|| endOffsets.get(offset.getKey()).compareTo(offset.getValue()) < 0) {
endOffsets.put(offset.getKey(), offset.getValue());
@ -1647,7 +1651,7 @@ public class KafkaSupervisor implements Supervisor
{
List<ListenableFuture<?>> futures = Lists.newArrayList();

for (Map.Entry<Integer, CopyOnWriteArrayList<TaskGroup>> pendingGroupList : pendingCompletionTaskGroups.entrySet()) {
for (Entry<Integer, CopyOnWriteArrayList<TaskGroup>> pendingGroupList : pendingCompletionTaskGroups.entrySet()) {

boolean stopTasksInTaskGroup = false;
Integer groupId = pendingGroupList.getKey();
@ -1728,9 +1732,9 @@ public class KafkaSupervisor implements Supervisor
private void checkCurrentTaskState() throws ExecutionException, InterruptedException, TimeoutException
{
List<ListenableFuture<?>> futures = Lists.newArrayList();
Iterator<Map.Entry<Integer, TaskGroup>> iTaskGroups = taskGroups.entrySet().iterator();
Iterator<Entry<Integer, TaskGroup>> iTaskGroups = taskGroups.entrySet().iterator();
while (iTaskGroups.hasNext()) {
Map.Entry<Integer, TaskGroup> taskGroupEntry = iTaskGroups.next();
Entry<Integer, TaskGroup> taskGroupEntry = iTaskGroups.next();
Integer groupId = taskGroupEntry.getKey();
TaskGroup taskGroup = taskGroupEntry.getValue();

@ -1742,9 +1746,9 @@ public class KafkaSupervisor implements Supervisor

log.debug("Task group [%d] pre-pruning: %s", groupId, taskGroup.taskIds());

Iterator<Map.Entry<String, TaskData>> iTasks = taskGroup.tasks.entrySet().iterator();
Iterator<Entry<String, TaskData>> iTasks = taskGroup.tasks.entrySet().iterator();
while (iTasks.hasNext()) {
Map.Entry<String, TaskData> task = iTasks.next();
Entry<String, TaskData> task = iTasks.next();
String taskId = task.getKey();
TaskData taskData = task.getValue();

@ -1817,7 +1821,7 @@ public class KafkaSupervisor implements Supervisor

// iterate through all the current task groups and make sure each one has the desired number of replica tasks
boolean createdTask = false;
for (Map.Entry<Integer, TaskGroup> entry : taskGroups.entrySet()) {
for (Entry<Integer, TaskGroup> entry : taskGroups.entrySet()) {
TaskGroup taskGroup = entry.getValue();
Integer groupId = entry.getKey();

@ -1910,7 +1914,7 @@ public class KafkaSupervisor implements Supervisor
private ImmutableMap<Integer, Long> generateStartingOffsetsForPartitionGroup(int groupId)
{
ImmutableMap.Builder<Integer, Long> builder = ImmutableMap.builder();
for (Map.Entry<Integer, Long> entry : partitionGroups.get(groupId).entrySet()) {
for (Entry<Integer, Long> entry : partitionGroups.get(groupId).entrySet()) {
Integer partition = entry.getKey();
Long offset = entry.getValue();

@ -2035,7 +2039,7 @@ public class KafkaSupervisor implements Supervisor
}

final List<ListenableFuture<Void>> futures = Lists.newArrayList();
for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
final String taskId = entry.getKey();
final TaskData taskData = entry.getValue();
if (taskData.status == null) {
@ -2121,7 +2125,7 @@ public class KafkaSupervisor implements Supervisor

try {
for (TaskGroup taskGroup : taskGroups.values()) {
for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
String taskId = entry.getKey();
@Nullable
DateTime startTime = entry.getValue().startTime;
@ -2149,7 +2153,7 @@ public class KafkaSupervisor implements Supervisor

for (List<TaskGroup> taskGroups : pendingCompletionTaskGroups.values()) {
for (TaskGroup taskGroup : taskGroups) {
for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
String taskId = entry.getKey();
@Nullable
DateTime startTime = entry.getValue().startTime;
@ -2218,7 +2222,7 @@ public class KafkaSupervisor implements Supervisor
.stream()
.flatMap(taskGroup -> taskGroup.tasks.entrySet().stream())
.flatMap(taskData -> taskData.getValue().currentOffsets.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Long::max));
.collect(Collectors.toMap(Entry::getKey, Entry::getValue, Long::max));
}

private Map<Integer, Long> getLagPerPartition(Map<Integer, Long> currentOffsets)
@ -2228,7 +2232,7 @@ public class KafkaSupervisor implements Supervisor
.stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
Entry::getKey,
e -> latestOffsetsFromKafka != null
&& latestOffsetsFromKafka.get(e.getKey()) != null
&& e.getValue() != null
@ -63,6 +63,7 @@ import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.stats.RowIngestionMeters;
@ -2194,7 +2195,8 @@ public class KafkaIndexTaskTest
);
final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory(
taskStorage,
taskActionToolbox
taskActionToolbox,
new TaskAuditLogConfig(false)
);
final SegmentHandoffNotifierFactory handoffNotifierFactory = dataSource -> new SegmentHandoffNotifier()
{

@ -31,14 +31,21 @@ public class LocalTaskActionClient implements TaskActionClient
private final Task task;
private final TaskStorage storage;
private final TaskActionToolbox toolbox;
private final TaskAuditLogConfig auditLogConfig;

private static final EmittingLogger log = new EmittingLogger(LocalTaskActionClient.class);

public LocalTaskActionClient(Task task, TaskStorage storage, TaskActionToolbox toolbox)
public LocalTaskActionClient(
Task task,
TaskStorage storage,
TaskActionToolbox toolbox,
TaskAuditLogConfig auditLogConfig
)
{
this.task = task;
this.storage = storage;
this.toolbox = toolbox;
this.auditLogConfig = auditLogConfig;
}

@Override
@ -46,7 +53,7 @@ public class LocalTaskActionClient implements TaskActionClient
{
log.info("Performing action for task[%s]: %s", task.getId(), taskAction);

if (taskAction.isAudited()) {
if (auditLogConfig.isEnabled() && taskAction.isAudited()) {
// Add audit log
try {
final long auditLogStartTime = System.currentTimeMillis();

@ -29,17 +29,19 @@ public class LocalTaskActionClientFactory implements TaskActionClientFactory
{
private final TaskStorage storage;
private final TaskActionToolbox toolbox;
private final TaskAuditLogConfig auditLogConfig;

@Inject
public LocalTaskActionClientFactory(TaskStorage storage, TaskActionToolbox toolbox)
public LocalTaskActionClientFactory(TaskStorage storage, TaskActionToolbox toolbox, TaskAuditLogConfig auditLogConfig)
{
this.storage = storage;
this.toolbox = toolbox;
this.auditLogConfig = auditLogConfig;
}

@Override
public TaskActionClient create(Task task)
{
return new LocalTaskActionClient(task, storage, toolbox);
return new LocalTaskActionClient(task, storage, toolbox, auditLogConfig);
}
}
@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.actions;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

/**
* The configuration for task audit logging.
* This class will be removed in future releases. See https://github.com/apache/incubator-druid/issues/5859.
*/
@Deprecated
public class TaskAuditLogConfig
{
@JsonProperty
private final boolean enabled;

@JsonCreator
public TaskAuditLogConfig(@JsonProperty("enabled") boolean enabled)
{
this.enabled = enabled;
}

@JsonProperty("enabled")
public boolean isEnabled()
{
return enabled;
}
}
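The new config class is bound under the `druid.indexer.auditlog` prefix later in this diff (see the `CliOverlord` and `CliPeon` hunks), so the deprecated audit log can be toggled with a runtime property. A minimal sketch, assuming the property name composes as the binding prefix plus the `enabled` field and assuming the conventional `conf/druid/_common/common.runtime.properties` location:

```bash
# Turn off task action audit logging. The full property name is inferred from the
# "druid.indexer.auditlog" binding and the "enabled" JSON field; the target file
# path follows the usual Druid distribution layout and may differ in your setup.
echo "druid.indexer.auditlog.enabled=false" >> conf/druid/_common/common.runtime.properties
```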
@ -373,6 +373,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
}
}

@Deprecated
@Override
public <T> void addAuditLog(Task task, TaskAction<T> taskAction)
{
@ -386,6 +387,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
}
}

@Deprecated
@Override
public List<TaskAction> getAuditLogs(String taskid)
{

@ -291,6 +291,7 @@ public class MetadataTaskStorage implements TaskStorage
);
}

@Deprecated
@Override
public <T> void addAuditLog(final Task task, final TaskAction<T> taskAction)
{
@ -301,6 +302,7 @@ public class MetadataTaskStorage implements TaskStorage
handler.addLog(task.getId(), taskAction);
}

@Deprecated
@Override
public List<TaskAction> getAuditLogs(final String taskId)
{
@ -25,6 +25,7 @@ import com.google.inject.Inject;
import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.curator.discovery.ServiceAnnouncer;
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.discovery.DruidLeaderSelector.Listener;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
@ -49,6 +50,8 @@ import java.util.concurrent.locks.ReentrantLock;
*/
public class TaskMaster
{
private static final EmittingLogger log = new EmittingLogger(TaskMaster.class);

private final DruidLeaderSelector overlordLeaderSelector;
private final DruidLeaderSelector.Listener leadershipListener;

@ -61,7 +64,12 @@ public class TaskMaster
private volatile TaskRunner taskRunner;
private volatile TaskQueue taskQueue;

private static final EmittingLogger log = new EmittingLogger(TaskMaster.class);
/**
* This flag indicates that all services has been started and should be true before calling
* {@link ServiceAnnouncer#announce}. This is set to false immediately once {@link Listener#stopBeingLeader()} is
* called.
*/
private volatile boolean initialized;

@Inject
public TaskMaster(
@ -127,6 +135,7 @@ public class TaskMaster
@Override
public void start()
{
initialized = true;
serviceAnnouncer.announce(node);
}

@ -153,6 +162,7 @@ public class TaskMaster
{
giant.lock();
try {
initialized = false;
final Lifecycle leaderLifecycle = leaderLifecycleRef.getAndSet(null);
if (leaderLifecycle != null) {
leaderLifecycle.stop();
@ -198,9 +208,12 @@ public class TaskMaster
}
}

/**
* Returns true if it's the leader and its all services have been properly initialized.
*/
public boolean isLeader()
{
return overlordLeaderSelector.isLeader();
return overlordLeaderSelector.isLeader() && initialized;
}

public String getCurrentLeader()
@ -210,7 +223,7 @@ public class TaskMaster

public Optional<TaskRunner> getTaskRunner()
{
if (overlordLeaderSelector.isLeader()) {
if (isLeader()) {
return Optional.of(taskRunner);
} else {
return Optional.absent();
@ -219,7 +232,7 @@ public class TaskMaster

public Optional<TaskQueue> getTaskQueue()
{
if (overlordLeaderSelector.isLeader()) {
if (isLeader()) {
return Optional.of(taskQueue);
} else {
return Optional.absent();
@ -228,7 +241,7 @@ public class TaskMaster

public Optional<TaskActionClient> getTaskActionClient(Task task)
{
if (overlordLeaderSelector.isLeader()) {
if (isLeader()) {
return Optional.of(taskActionClientFactory.create(task));
} else {
return Optional.absent();
@ -237,7 +250,7 @@ public class TaskMaster

public Optional<ScalingStats> getScalingStats()
{
if (overlordLeaderSelector.isLeader()) {
if (isLeader()) {
return taskRunner.getScalingStats();
} else {
return Optional.absent();
@ -246,7 +259,7 @@ public class TaskMaster

public Optional<SupervisorManager> getSupervisorManager()
{
if (overlordLeaderSelector.isLeader()) {
if (isLeader()) {
return Optional.of(supervisorManager);
} else {
return Optional.absent();
@ -106,6 +106,7 @@ public interface TaskStorage
*
* @param <T> task action return type
*/
@Deprecated
<T> void addAuditLog(Task task, TaskAction<T> taskAction);

/**
@ -114,6 +115,7 @@ public interface TaskStorage
* @param taskid task ID
* @return list of task actions
*/
@Deprecated
List<TaskAction> getAuditLogs(String taskid);

/**

@ -93,6 +93,7 @@ public class TaskStorageQueryAdapter
* @param taskid task ID
* @return set of segments created by the specified task
*/
@Deprecated
public Set<DataSegment> getInsertedSegments(final String taskid)
{
final Set<DataSegment> segments = Sets.newHashSet();

@ -307,6 +307,7 @@ public class OverlordResource
return Response.status(status).entity(response).build();
}

@Deprecated
@GET
@Path("/task/{taskid}/segments")
@Produces(MediaType.APPLICATION_JSON)
@ -62,6 +62,7 @@ import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.index.RealtimeAppenderatorIngestionSpec;
@ -1526,7 +1527,8 @@ public class AppenderatorDriverRealtimeIndexTaskTest
);
final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory(
taskStorage,
taskActionToolbox
taskActionToolbox,
new TaskAuditLogConfig(false)
);
IntervalChunkingQueryRunnerDecorator queryRunnerDecorator = new IntervalChunkingQueryRunnerDecorator(
null,

@ -26,6 +26,7 @@ import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.LocalTaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
import org.apache.druid.indexing.overlord.TaskLockbox;
@ -54,7 +55,7 @@ public abstract class IngestionTestBase

public TaskActionClient createActionClient(Task task)
{
return new LocalTaskActionClient(task, taskStorage, createTaskActionToolbox());
return new LocalTaskActionClient(task, taskStorage, createTaskActionToolbox(), new TaskAuditLogConfig(false));
}

public void prepareTaskForLocking(Task task) throws EntryExistsException

@ -55,6 +55,7 @@ import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
@ -988,7 +989,8 @@ public class RealtimeIndexTaskTest
);
final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory(
taskStorage,
taskActionToolbox
taskActionToolbox,
new TaskAuditLogConfig(false)
);
IntervalChunkingQueryRunnerDecorator queryRunnerDecorator = new IntervalChunkingQueryRunnerDecorator(
null,

@ -47,6 +47,7 @@ import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.NoopTask;
@ -222,7 +223,8 @@ public class IngestSegmentFirehoseFactoryTest
newMockEmitter(),
EasyMock.createMock(SupervisorManager.class),
new Counters()
)
),
new TaskAuditLogConfig(false)
);
SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class);
EasyMock.replay(notifierFactory);

@ -57,6 +57,7 @@ import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.SegmentInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
@ -538,7 +539,8 @@ public class TaskLifecycleTest
emitter,
EasyMock.createMock(SupervisorManager.class),
new Counters()
)
),
new TaskAuditLogConfig(true)
);
File tmpDir = temporaryFolder.newFolder();
taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null);
@ -43,8 +43,11 @@ Set the docker environment:
```
eval "$(docker-machine env integration)"
export DOCKER_IP=$(docker-machine ip integration)
export DOCKER_MACHINE_IP=$(docker-machine inspect integration | jq -r .Driver[\"HostOnlyCIDR\"])
```

The final command uses the `jq` tool to read the Driver->HostOnlyCIDR field from the `docker-machine inspect` output. If you don't wish to install `jq`, you will need to set DOCKER_MACHINE_IP manually.

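For instance, a manual setting might look like the sketch below; the address shown is hypothetical and should be replaced with the host-only address reported for your `integration` machine.

```bash
# Set DOCKER_MACHINE_IP by hand when jq is not available; use the HostOnlyCIDR
# address shown by `docker-machine inspect integration`.
export DOCKER_MACHINE_IP=192.168.99.1
```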
## Running tests

To run all the tests using docker and mvn run the following command:
@ -27,6 +27,7 @@ basicConstraints=CA:FALSE,pathlen:0
IP.1 = ${DOCKER_HOST_IP}
IP.2 = 127.0.0.1
IP.3 = 172.172.172.1
IP.4 = ${DOCKER_MACHINE_IP:=127.0.0.1}
DNS.1 = ${HOSTNAME}
DNS.2 = localhost
EOT

@ -27,6 +27,7 @@ basicConstraints=CA:FALSE,pathlen:0
IP.1 = ${DOCKER_HOST_IP}
IP.2 = 127.0.0.1
IP.3 = 172.172.172.1
IP.4 = ${DOCKER_MACHINE_IP:=127.0.0.1}
DNS.1 = ${HOSTNAME}
DNS.2 = localhost
EOT

@ -2,7 +2,6 @@

export DOCKER_HOST_IP=$(resolveip -s $HOSTNAME)


# Generate a client cert with an incorrect hostname for testing
cat <<EOT > invalid_hostname_csr.conf
[req]

@ -58,6 +58,7 @@ basicConstraints=CA:FALSE,pathlen:0
IP.1 = ${DOCKER_HOST_IP}
IP.2 = 127.0.0.1
IP.3 = 172.172.172.1
IP.4 = ${DOCKER_MACHINE_IP:=127.0.0.1}
DNS.1 = ${HOSTNAME}
DNS.2 = localhost
EOT

@ -28,6 +28,7 @@ basicConstraints=CA:FALSE,pathlen:0
IP.1 = ${DOCKER_HOST_IP}
IP.2 = 127.0.0.1
IP.3 = 172.172.172.1
IP.4 = ${DOCKER_MACHINE_IP:=127.0.0.1}
DNS.1 = ${HOSTNAME}
DNS.2 = localhost

@ -27,6 +27,7 @@ basicConstraints=CA:FALSE,pathlen:0
IP.1 = ${DOCKER_HOST_IP}
IP.2 = 127.0.0.1
IP.3 = 172.172.172.1
IP.4 = ${DOCKER_MACHINE_IP:=127.0.0.1}
DNS.1 = ${HOSTNAME}
DNS.2 = localhost
EOT

@ -58,6 +58,7 @@ basicConstraints=CA:FALSE,pathlen:0
IP.1 = ${DOCKER_HOST_IP}
IP.2 = 127.0.0.1
IP.3 = 172.172.172.1
IP.4 = ${DOCKER_MACHINE_IP:=127.0.0.1}
DNS.1 = ${HOSTNAME}
DNS.2 = localhost
EOT
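The `IP.4 = ${DOCKER_MACHINE_IP:=127.0.0.1}` entries added in these certificate configs rely on ordinary bash default-value expansion: when `DOCKER_MACHINE_IP` is unset or empty, `127.0.0.1` is both substituted and assigned. A small illustration:

```bash
# ":=" assigns and substitutes the default when the variable is unset or null.
unset DOCKER_MACHINE_IP
echo "IP.4 = ${DOCKER_MACHINE_IP:=127.0.0.1}"   # prints: IP.4 = 127.0.0.1
echo "$DOCKER_MACHINE_IP"                       # the variable is now 127.0.0.1
```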
@ -19,6 +19,7 @@

package org.apache.druid.java.util.common;

import io.netty.util.SuppressForbidden;
import org.joda.time.Chronology;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@ -40,6 +41,7 @@ public final class DateTimes
ISODateTimeFormat.dateTimeParser().withOffsetParsed()
);

@SuppressForbidden(reason = "DateTimeZone#forID")
public static DateTimeZone inferTzfromString(String tzId)
{
try {
@ -65,6 +67,7 @@ public final class DateTimes
this.innerFormatter = innerFormatter.withChronology(ISOChronology.getInstanceUTC());
}

@SuppressForbidden(reason = "DateTimeFormatter#parseDateTime")
public DateTime parse(final String instant)
{
return innerFormatter.parseDateTime(instant);

@ -71,8 +71,9 @@ public class TimestampParser
return DateTimes.of(ParserUtils.stripQuotes(input));
};
} else if ("posix".equalsIgnoreCase(format)
|| "millis".equalsIgnoreCase(format)
|| "nano".equalsIgnoreCase(format)) {
|| "millis".equalsIgnoreCase(format)
|| "micro".equalsIgnoreCase(format)
|| "nano".equalsIgnoreCase(format)) {
final Function<Number, DateTime> numericFun = createNumericTimestampParser(format);
return input -> {
Preconditions.checkArgument(!Strings.isNullOrEmpty(input), "null timestamp");
@ -104,6 +105,8 @@ public class TimestampParser
{
if ("posix".equalsIgnoreCase(format)) {
return input -> DateTimes.utc(TimeUnit.SECONDS.toMillis(input.longValue()));
} else if ("micro".equalsIgnoreCase(format)) {
return input -> DateTimes.utc(TimeUnit.MICROSECONDS.toMillis(input.longValue()));
} else if ("nano".equalsIgnoreCase(format)) {
return input -> DateTimes.utc(TimeUnit.NANOSECONDS.toMillis(input.longValue()));
} else if ("ruby".equalsIgnoreCase(format)) {
222 pom.xml
@ -116,6 +116,7 @@
<!-- Core extensions -->
<module>extensions-core/avro-extensions</module>
<module>extensions-core/datasketches</module>
<module>extensions-core/druid-bloom-filter</module>
<module>extensions-core/druid-kerberos</module>
<module>extensions-core/hdfs-storage</module>
<module>extensions-core/histogram</module>
@ -931,15 +932,12 @@
<signaturesFile>${session.executionRootDirectory}/codestyle/joda-time-forbidden-apis.txt</signaturesFile>
<signaturesFile>${session.executionRootDirectory}/codestyle/druid-forbidden-apis.txt</signaturesFile>
</signaturesFiles>
<excludes>
<exclude>org/apache/druid/java/util/common/DateTimes$UtcFormatter.class</exclude>
<exclude>org/apache/druid/java/util/common/DateTimes.class</exclude>
</excludes>
<suppressAnnotations><annotation>**.SuppressForbidden</annotation></suppressAnnotations>
</configuration>
<executions>
<execution>
<id>validate</id>
<phase>validate</phase>
<id>compile</id>
<phase>compile</phase>
<goals>
<goal>check</goal>
</goals>
@ -952,8 +950,8 @@
</configuration>
</execution>
<execution>
<id>testValidate</id>
<phase>validate</phase>
<id>testCompile</id>
<phase>test-compile</phase>
<goals>
<goal>testCheck</goal>
</goals>
@ -1016,104 +1014,6 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<version>0.12</version>
<executions>
<execution>
<phase>verify</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
<configuration>
<outputDirectory>${project.basedir}/rat</outputDirectory>
<licenses>
<license implementation="org.apache.rat.analysis.license.SimplePatternBasedLicense">
<licenseFamilyCategory>MIT</licenseFamilyCategory>
<licenseFamilyName>MIT JQyery</licenseFamilyName>
<notes></notes>
<patterns>
<pattern>Copyright 2012 jQuery Foundation and other contributors; Licensed MIT</pattern>
<pattern>jQuery Foundation, Inc. | jquery.org/license</pattern>
</patterns>
</license>
<license implementation="org.apache.rat.analysis.license.SimplePatternBasedLicense">
<licenseFamilyCategory>Underscore</licenseFamilyCategory>
<licenseFamilyName>Underscore</licenseFamilyName>
<notes></notes>
<patterns>
<pattern>Underscore is freely distributable under the MIT license</pattern>
</patterns>
</license>
<license implementation="org.apache.rat.analysis.license.SimplePatternBasedLicense">
<licenseFamilyCategory>Allan Jardine</licenseFamilyCategory>
<licenseFamilyName>Allan Jardine</licenseFamilyName>
<notes></notes>
<patterns>
<pattern>Copyright 2009 Allan Jardine. All Rights Reserved</pattern>
</patterns>
</license>
<license implementation="org.apache.rat.analysis.license.SimplePatternBasedLicense">
<licenseFamilyCategory>Allan Jardine</licenseFamilyCategory>
<licenseFamilyName>Allan Jardine</licenseFamilyName>
<notes></notes>
<patterns>
<pattern>Copyright 2009 Allan Jardine. All Rights Reserved</pattern>
<pattern>Copyright 2008-2011 Allan Jardine</pattern>
<pattern>GPL v2 or BSD 3 point style</pattern>
</patterns>
</license>
</licenses>

<licenseFamilies>
<licenseFamily implementation="org.apache.rat.license.SimpleLicenseFamily">
<familyName>MIT JQyery</familyName>
</licenseFamily>
<licenseFamily implementation="org.apache.rat.license.SimpleLicenseFamily">
<familyName>Underscore</familyName>
</licenseFamily>
<licenseFamily implementation="org.apache.rat.license.SimpleLicenseFamily">
<familyName>Allan Jardine</familyName>
</licenseFamily>
</licenseFamilies>
<excludes>
<!--DOCS-->
<exclude>**/*.md</exclude>
<exclude>publications/**</exclude>
<exclude>docs/**</exclude>
<!--CODE STYLE-->
<exclude>codestyle/*</exclude>
<exclude>eclipse.importorder</exclude>
<!--CODE RESOURCES-->
<exclude>**/javax.annotation.processing.Processor</exclude>
<exclude>**/org.apache.druid.initialization.DruidModule</exclude>
<exclude>**/org/apache/druid/math/expr/antlr/Expr.g4</exclude>
<exclude>**/dependency-reduced-pom.xml</exclude>
<!--BUILD and TESTS-->
<exclude>**/target/**</exclude>
<exclude>**/build/**</exclude>
<exclude>**/test/resources/**</exclude>
<exclude>**/src/test/avro/**</exclude>
<exclude>**/src/test/thrift/**</exclude>
<exclude>.travis.yml</exclude>
<!--DEV and IT-TESTS-->
<exclude>**/*.json</exclude>
<exclude>**/jvm.config</exclude>
<exclude>**/quickstart/protobuf/**</exclude>
<exclude>**/tutorial/conf/**</exclude>
<exclude>**/derby.log</exclude>
<exclude>**/docker/**</exclude>
<exclude>**/client_tls/**</exclude>
<!--IDE MODULE FILES-->
<exclude>**/*.iml</exclude>
<!--CRASH LOGS-->
<exclude>**/hs_err_pid*.log</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
<pluginManagement>
<plugins>
@ -1366,5 +1266,115 @@
</plugins>
</build>
</profile>
<profile>
<!-- Run Apache Rat license checks in a separate profile, because during local builds it doesn't skip files
that are not checked into Git -->
<id>rat</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<version>0.12</version>
<executions>
<execution>
<phase>verify</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
<configuration>
<outputDirectory>${project.basedir}/rat</outputDirectory>
<licenses>
<license implementation="org.apache.rat.analysis.license.SimplePatternBasedLicense">
<licenseFamilyCategory>MIT</licenseFamilyCategory>
<licenseFamilyName>MIT JQyery</licenseFamilyName>
<notes></notes>
<patterns>
<pattern>Copyright 2012 jQuery Foundation and other contributors; Licensed MIT</pattern>
<pattern>jQuery Foundation, Inc. | jquery.org/license</pattern>
</patterns>
</license>
<license implementation="org.apache.rat.analysis.license.SimplePatternBasedLicense">
<licenseFamilyCategory>Underscore</licenseFamilyCategory>
<licenseFamilyName>Underscore</licenseFamilyName>
<notes></notes>
<patterns>
<pattern>Underscore is freely distributable under the MIT license</pattern>
</patterns>
</license>
<license implementation="org.apache.rat.analysis.license.SimplePatternBasedLicense">
<licenseFamilyCategory>Allan Jardine</licenseFamilyCategory>
<licenseFamilyName>Allan Jardine</licenseFamilyName>
<notes></notes>
<patterns>
<pattern>Copyright 2009 Allan Jardine. All Rights Reserved</pattern>
</patterns>
</license>
<license implementation="org.apache.rat.analysis.license.SimplePatternBasedLicense">
<licenseFamilyCategory>Allan Jardine</licenseFamilyCategory>
<licenseFamilyName>Allan Jardine</licenseFamilyName>
<notes></notes>
<patterns>
<pattern>Copyright 2009 Allan Jardine. All Rights Reserved</pattern>
<pattern>Copyright 2008-2011 Allan Jardine</pattern>
<pattern>GPL v2 or BSD 3 point style</pattern>
</patterns>
</license>
</licenses>

<licenseFamilies>
<licenseFamily implementation="org.apache.rat.license.SimpleLicenseFamily">
<familyName>MIT JQyery</familyName>
</licenseFamily>
<licenseFamily implementation="org.apache.rat.license.SimpleLicenseFamily">
<familyName>Underscore</familyName>
</licenseFamily>
<licenseFamily implementation="org.apache.rat.license.SimpleLicenseFamily">
<familyName>Allan Jardine</familyName>
</licenseFamily>
</licenseFamilies>
<excludes>
<!--DOCS-->
<exclude>**/*.md</exclude>
<exclude>publications/**</exclude>
<exclude>docs/**</exclude>
<!--CODE STYLE-->
<exclude>codestyle/*</exclude>
<exclude>eclipse.importorder</exclude>
<!--CODE RESOURCES-->
<exclude>**/javax.annotation.processing.Processor</exclude>
<exclude>**/org.apache.druid.initialization.DruidModule</exclude>
<exclude>**/org/apache/druid/math/expr/antlr/Expr.g4</exclude>
<exclude>**/dependency-reduced-pom.xml</exclude>
<!--BUILD and TESTS-->
<exclude>**/target/**</exclude>
<exclude>**/build/**</exclude>
<exclude>**/test/resources/**</exclude>
<exclude>**/src/test/avro/**</exclude>
<exclude>**/src/test/thrift/**</exclude>
<exclude>.travis.yml</exclude>
<!--DEV and IT-TESTS-->
<exclude>**/*.json</exclude>
<exclude>**/jvm.config</exclude>
<exclude>**/quickstart/protobuf/**</exclude>
<exclude>**/tutorial/conf/**</exclude>
<exclude>**/derby.log</exclude>
<exclude>**/docker/**</exclude>
<exclude>**/client_tls/**</exclude>
<!--IDE MODULE FILES-->
<exclude>**/*.iml</exclude>
<!--CRASH LOGS-->
<exclude>**/hs_err_pid*.log</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
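Because the Rat plugin now lives in its own non-default `rat` profile bound to the `verify` phase, the license audit only runs when that profile is activated. A hedged sketch of an invocation, derived from the profile definition above (the `-DskipTests` flag is merely a convenience assumption, not part of this change):

```bash
# Run the Apache Rat license check by activating the dedicated profile.
mvn clean verify -Prat -DskipTests
```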
@ -24,12 +24,11 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.column.ValueType;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

@ -132,12 +131,10 @@ public class DefaultDimensionSpec implements DimensionSpec
@Override
public byte[] getCacheKey()
{
byte[] dimensionBytes = StringUtils.toUtf8(dimension);

return ByteBuffer.allocate(1 + dimensionBytes.length)
.put(CACHE_TYPE_ID)
.put(dimensionBytes)
.array();
return new CacheKeyBuilder(CACHE_TYPE_ID)
.appendString(dimension)
.appendString(outputType.toString())
.build();
}

@Override

@ -22,13 +22,11 @@ package org.apache.druid.query.dimension;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.column.ValueType;

import java.nio.ByteBuffer;

/**
*/
public class ExtractionDimensionSpec implements DimensionSpec
@ -114,14 +112,11 @@ public class ExtractionDimensionSpec implements DimensionSpec
@Override
public byte[] getCacheKey()
{
byte[] dimensionBytes = StringUtils.toUtf8(dimension);
byte[] dimExtractionFnBytes = extractionFn.getCacheKey();

return ByteBuffer.allocate(1 + dimensionBytes.length + dimExtractionFnBytes.length)
.put(CACHE_TYPE_ID)
.put(dimensionBytes)
.put(dimExtractionFnBytes)
.array();
return new CacheKeyBuilder(CACHE_TYPE_ID)
.appendString(dimension)
.appendCacheable(extractionFn)
.appendString(outputType.toString())
.build();
}

@Override

@ -52,8 +52,10 @@ public class DimFilterUtils
static final byte COLUMN_COMPARISON_CACHE_ID = 0xD;
static final byte EXPRESSION_CACHE_ID = 0xE;
static final byte TRUE_CACHE_ID = 0xF;
public static byte BLOOM_DIM_FILTER_CACHE_ID = 0x10;
public static final byte STRING_SEPARATOR = (byte) 0xFF;


static byte[] computeCacheKey(byte cacheIdKey, List<DimFilter> filters)
{
if (filters.size() == 1) {

@ -55,4 +55,12 @@ public class DefaultDimensionSpecTest
Assert.assertEquals(spec, other);
Assert.assertEquals(spec.hashCode(), other.hashCode());
}

@Test
public void testCacheKey()
{
final DimensionSpec spec = new DefaultDimensionSpec("foo", "foo", ValueType.FLOAT);
final byte[] expected = new byte[] {0, 7, 102, 111, 111, 7, 70, 76, 79, 65, 84};
Assert.assertArrayEquals(expected, spec.getCacheKey());
}
}

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.query.extraction.MatchingDimExtractionFn;
import org.apache.druid.query.extraction.RegexDimExtractionFn;
import org.apache.druid.query.extraction.StrlenExtractionFn;
import org.apache.druid.segment.column.ValueType;
import org.junit.Assert;
import org.junit.Test;
@ -144,4 +145,17 @@ public class ExtractionDimensionSpecTest
.getExtractionFn() instanceof MatchingDimExtractionFn
);
}

@Test
public void testCacheKey()
{
final ExtractionDimensionSpec dimensionSpec = new ExtractionDimensionSpec(
"foo",
"len",
ValueType.LONG,
StrlenExtractionFn.instance()
);
final byte[] expected = new byte[]{1, 7, 102, 111, 111, 9, 14, 7, 76, 79, 78, 71};
Assert.assertArrayEquals(expected, dimensionSpec.getCacheKey());
}
}

@ -82,7 +82,9 @@ public abstract class BaseFilterTest
{
private static final VirtualColumns VIRTUAL_COLUMNS = VirtualColumns.create(
ImmutableList.of(
new ExpressionVirtualColumn("expr", "1.0 + 0.1", ValueType.FLOAT, TestExprMacroTable.INSTANCE)
new ExpressionVirtualColumn("expr", "1.0 + 0.1", ValueType.FLOAT, TestExprMacroTable.INSTANCE),
new ExpressionVirtualColumn("exprDouble", "1.0 + 1.1", ValueType.DOUBLE, TestExprMacroTable.INSTANCE),
new ExpressionVirtualColumn("exprLong", "1 + 2", ValueType.LONG, TestExprMacroTable.INSTANCE)
)
);
@ -159,14 +159,6 @@
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlets</artifactId>
</dependency>
<dependency>
<groupId>com.ircclouds.irc</groupId>
<artifactId>irc-api</artifactId>
</dependency>
<dependency>
<groupId>com.maxmind.geoip2</groupId>
<artifactId>geoip2</artifactId>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>

@ -29,7 +29,6 @@ import org.apache.druid.segment.realtime.firehose.CombiningFirehoseFactory;
import org.apache.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import org.apache.druid.segment.realtime.firehose.FixedCountFirehoseFactory;
import org.apache.druid.segment.realtime.firehose.HttpFirehoseFactory;
import org.apache.druid.segment.realtime.firehose.IrcFirehoseFactory;
import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
import org.apache.druid.segment.realtime.firehose.SqlFirehoseFactory;
import org.apache.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
@ -54,7 +53,6 @@ public class FirehoseModule implements DruidModule
.registerSubtypes(
new NamedType(ClippedFirehoseFactory.class, "clipped"),
new NamedType(TimedShutoffFirehoseFactory.class, "timed"),
new NamedType(IrcFirehoseFactory.class, "irc"),
new NamedType(LocalFirehoseFactory.class, "local"),
new NamedType(HttpFirehoseFactory.class, "http"),
new NamedType(EventReceiverFirehoseFactory.class, "receiver"),

@ -47,7 +47,6 @@ import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.LocalDataStorageDruidModule;
import org.apache.druid.guice.MetadataConfigModule;
import org.apache.druid.guice.ModulesConfig;
import org.apache.druid.guice.ParsersModule;
import org.apache.druid.guice.ServerModule;
import org.apache.druid.guice.ServerViewModule;
import org.apache.druid.guice.StartupLoggingModule;
@ -394,7 +393,6 @@ public class Initialization
new CoordinatorDiscoveryModule(),
new LocalDataStorageDruidModule(),
new FirehoseModule(),
new ParsersModule(),
new JavaScriptModule(),
new AuthenticatorModule(),
new AuthenticatorMapperModule(),
@ -217,9 +217,13 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
tableName, getPayloadType(), getQuoteString()
),
StringUtils.format(
"CREATE INDEX idx_%1$s_datasource_used_end ON %1$s(dataSource, used, %2$send%2$s)",
"CREATE INDEX idx_%1$s_datasource_end ON %1$s(dataSource, %2$send%2$s)",
tableName,
getQuoteString()
),
StringUtils.format(
"CREATE INDEX idx_%1$s_datasource_sequence ON %1$s(dataSource, sequence_name)",
tableName
)
)
);
@ -264,14 +268,11 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
+ ")",
tableName, getPayloadType(), getQuoteString()
),
StringUtils.format("CREATE INDEX idx_%1$s_used ON %1$s(used)", tableName),
StringUtils.format(
"CREATE INDEX idx_%1$s_datasource_end ON %1$s(dataSource, %2$send%2$s)",
"CREATE INDEX idx_%1$s_datasource_used_end ON %1$s(dataSource, used, %2$send%2$s)",
tableName,
getQuoteString()
),
StringUtils.format(
"CREATE INDEX idx_%1$s_datasource_sequence ON %1$s(dataSource, sequence_name)",
tableName
)
)
);
@ -29,6 +29,10 @@ import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.client.DruidDataSource;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidDataSource;
@ -50,7 +54,6 @@ import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
@ -71,10 +74,6 @@ import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.server.lookup.cache.LookupCoordinatorManager;
import org.apache.druid.timeline.DataSegment;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
@ -103,6 +102,7 @@ public class DruidCoordinator
.reverse();

private static final EmittingLogger log = new EmittingLogger(DruidCoordinator.class);

private final Object lock = new Object();
private final DruidCoordinatorConfig config;
private final ZkPathsConfig zkPaths;
@ -119,14 +119,15 @@ public class DruidCoordinator
private final ServiceAnnouncer serviceAnnouncer;
private final DruidNode self;
private final Set<DruidCoordinatorHelper> indexingServiceHelpers;
private volatile boolean started = false;
private volatile SegmentReplicantLookup segmentReplicantLookup = null;
private final BalancerStrategyFactory factory;
private final LookupCoordinatorManager lookupCoordinatorManager;
private final DruidLeaderSelector coordLeaderSelector;

private final DruidCoordinatorSegmentCompactor segmentCompactor;

private volatile boolean started = false;
private volatile SegmentReplicantLookup segmentReplicantLookup = null;

@Inject
public DruidCoordinator(
DruidCoordinatorConfig config,
@ -533,6 +534,7 @@ public class DruidCoordinator

metadataSegmentManager.start();
metadataRuleManager.start();
lookupCoordinatorManager.start();
serviceAnnouncer.announce(self);
final int startingLeaderCounter = coordLeaderSelector.localTerm();

@ -580,8 +582,6 @@ public class DruidCoordinator
}
);
}

lookupCoordinatorManager.start();
}
}

@ -598,9 +598,9 @@ public class DruidCoordinator
loadManagementPeons.clear();

serviceAnnouncer.unannounce(self);
lookupCoordinatorManager.stop();
metadataRuleManager.stop();
metadataSegmentManager.stop();
lookupCoordinatorManager.stop();
}
}

@ -697,13 +697,15 @@ public class DruidCoordinator
ImmutableList.of(
new DruidCoordinatorSegmentInfoLoader(DruidCoordinator.this),
params -> {
// Display info about all historical servers
Iterable<ImmutableDruidServer> servers = FunctionalIterable
.create(serverInventoryView.getInventory())
List<ImmutableDruidServer> servers = serverInventoryView
.getInventory()
.stream()
.filter(DruidServer::segmentReplicatable)
.transform(DruidServer::toImmutableDruidServer);
.map(DruidServer::toImmutableDruidServer)
.collect(Collectors.toList());

if (log.isDebugEnabled()) {
// Display info about all historical servers
log.debug("Servers");
for (ImmutableDruidServer druidServer : servers) {
log.debug(" %s", druidServer);
@ -54,6 +54,7 @@ import org.apache.druid.guice.annotations.Json;
import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
@ -166,6 +167,7 @@ public class CliOverlord extends ServerRunnable
JsonConfigProvider.bind(binder, "druid.coordinator.asOverlord", CoordinatorOverlordServiceConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.queue", TaskQueueConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.auditlog", TaskAuditLogConfig.class);

binder.bind(TaskMaster.class).in(ManageLifecycle.class);

@ -63,6 +63,7 @@ import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.RemoteTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
@ -199,6 +200,7 @@ public class CliPeon extends GuiceRunnable
binder.bind(TaskToolboxFactory.class).in(LazySingleton.class);

JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.auditlog", TaskAuditLogConfig.class);
JsonConfigProvider.bind(binder, "druid.peon.taskActionClient.retry", RetryPolicyConfig.class);

configureTaskActionClient(binder);
@ -36,6 +36,7 @@ import com.google.inject.Module;
import com.google.inject.name.Names;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import io.netty.util.SuppressForbidden;
import org.apache.druid.collections.bitmap.BitmapFactory;
import org.apache.druid.collections.bitmap.ConciseBitmapFactory;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
@ -417,6 +418,7 @@ public class DumpSegment extends GuiceRunnable
return ImmutableList.copyOf(columnNames);
}

@SuppressForbidden(reason = "System#out")
private <T> T withOutputStream(Function<OutputStream, T> f) throws IOException
{
if (outputFileName == null) {

@ -23,6 +23,7 @@ import com.google.inject.Injector;
import io.airlift.airline.Cli;
import io.airlift.airline.Help;
import io.airlift.airline.ParseException;
import io.netty.util.SuppressForbidden;
import org.apache.druid.cli.validate.DruidJsonValidator;
import org.apache.druid.guice.ExtensionsConfig;
import org.apache.druid.guice.GuiceInjectors;
@ -45,6 +46,7 @@ public class Main
}

@SuppressWarnings("unchecked")
@SuppressForbidden(reason = "System#out")
public static void main(String[] args)
{
final Cli.CliBuilder<Runnable> builder = Cli.builder("druid");

@ -27,6 +27,7 @@ import com.google.common.collect.Sets;
import com.google.inject.Inject;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import io.netty.util.SuppressForbidden;
import org.apache.druid.guice.ExtensionsConfig;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.java.util.common.ISE;
@ -414,6 +415,7 @@ public class PullDependencies implements Runnable
log.info("Finish downloading extension [%s]", versionedArtifact);
}

@SuppressForbidden(reason = "System#out")
private DefaultTeslaAether getAetherClient()
{
/*

@ -20,6 +20,7 @@

package org.apache.druid.cli;

import io.airlift.airline.Command;
import io.netty.util.SuppressForbidden;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.initialization.Initialization;
import org.apache.druid.server.StatusResource;
@ -31,6 +32,7 @@ import org.apache.druid.server.StatusResource;
public class Version implements Runnable
{
@Override
@SuppressForbidden(reason = "System#out")
public void run()
{
System.out.println(new StatusResource.Status(Initialization.getLoadedImplementations(DruidModule.class)));

@ -32,6 +32,7 @@ import com.google.inject.Injector;
import com.google.inject.name.Names;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import io.netty.util.SuppressForbidden;
import org.apache.druid.cli.GuiceRunnable;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.StringInputRowParser;
@ -40,7 +41,6 @@ import org.apache.druid.guice.ExtensionsConfig;
import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.guice.IndexingServiceFirehoseModule;
import org.apache.druid.guice.LocalDataStorageDruidModule;
import org.apache.druid.guice.ParsersModule;
import org.apache.druid.guice.QueryRunnerFactoryModule;
import org.apache.druid.guice.QueryableModule;
import org.apache.druid.indexer.HadoopDruidIndexerConfig;
@ -69,6 +69,7 @@ import java.util.List;
name = "validator",
description = "Validates that a given Druid JSON object is correctly formatted"
)
@SuppressForbidden(reason = "System#out")
public class DruidJsonValidator extends GuiceRunnable
{
private static final Logger LOG = new Logger(DruidJsonValidator.class);
@ -128,8 +129,7 @@ public class DruidJsonValidator extends GuiceRunnable
new FirehoseModule(),
new IndexingHadoopModule(),
new IndexingServiceFirehoseModule(),
new LocalDataStorageDruidModule(),
new ParsersModule()
new LocalDataStorageDruidModule()
)
)
);