Merge pull request #1063 from metamx/fix-dimension-exclusion

fix dimension exclusion
This commit is contained in:
Fangjin Yang 2015-01-28 06:10:41 +08:00
commit b78ef22829
8 changed files with 147 additions and 15 deletions

View File

@ -29,6 +29,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Injector;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
@ -164,12 +165,20 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
List<String> dims;
if (dimensions != null) {
dims = dimensions;
} else if (inputRowParser.getParseSpec().getDimensionsSpec().hasCustomDimensions()) {
dims = inputRowParser.getParseSpec().getDimensionsSpec().getDimensions();
} else {
Set<String> dimSet = new HashSet<>();
for (TimelineObjectHolder<String, DataSegment> timelineObjectHolder : timeLineSegments) {
dimSet.addAll(timelineObjectHolder.getObject().getChunk(0).getObject().getDimensions());
}
dims = Lists.newArrayList(dimSet);
dims = Lists.newArrayList(
Sets.difference(
dimSet,
inputRowParser.getParseSpec().getDimensionsSpec()
.getDimensionExclusions()
)
);
}
List<String> metricsList;

View File

@ -41,7 +41,7 @@ ADD lib/* /usr/local/druid/lib/
WORKDIR /
# Setup metadata store
RUN /etc/init.d/mysql start && echo "GRANT ALL ON druid.* TO 'druid'@'%' IDENTIFIED BY 'diurd'; CREATE database druid;" | mysql -u root && /etc/init.d/mysql stop
RUN /etc/init.d/mysql start && echo "GRANT ALL ON druid.* TO 'druid'@'%' IDENTIFIED BY 'diurd'; CREATE database druid DEFAULT CHARACTER SET utf8;" | mysql -u root && /etc/init.d/mysql stop
# Add sample data
RUN /etc/init.d/mysql start && java -Ddruid.metadata.storage.type=mysql -cp "/usr/local/druid/lib/*" io.druid.cli.Main tools metadata-init --connectURI="jdbc:mysql://localhost:3306/druid" --user=druid --password=diurd && /etc/init.d/mysql stop

View File

@ -43,7 +43,7 @@ docker run -d --name druid-historical -v $SHARED_DIR:/shared -v $DOCKERDIR/histo
docker run -d --name druid-middlemanager -p 8100:8100 -p 8101:8101 -p 8102:8102 -p 8103:8103 -p 8104:8104 -p 8105:8105 -v $RESOURCEDIR:/resources -v $SHARED_DIR:/shared -v $DOCKERDIR/middlemanager.conf:$SUPERVISORDIR/middlemanager.conf --link druid-zookeeper:druid-zookeeper --link druid-overlord:druid-overlord druid/cluster
# Start Broker
docker run -d --name druid-broker -v $SHARED_DIR:/shared -v $DOCKERDIR/broker.conf:$SUPERVISORDIR/broker.conf --link druid-zookeeper:druid-zookeeper --link druid-middlemanager:druid-middlemanager --link druid-historical:druid-historical druid/cluster
docker run -d --name druid-broker -p 8082:8082 -v $SHARED_DIR:/shared -v $DOCKERDIR/broker.conf:$SUPERVISORDIR/broker.conf --link druid-zookeeper:druid-zookeeper --link druid-middlemanager:druid-middlemanager --link druid-historical:druid-historical druid/cluster
# Start Router
docker run -d --name druid-router -p 8888:8888 -v $SHARED_DIR:/shared -v $DOCKERDIR/router.conf:$SUPERVISORDIR/router.conf --link druid-zookeeper:druid-zookeeper --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster

View File

@ -57,6 +57,12 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
return dockerIp+ ":8888";
}
@Override
public String getBrokerHost()
{
// Broker is exposed on host port 8082 by the cluster script
// (docker run -p 8082:8082 ... druid-broker).
return dockerIp + ":8082";
}
@Override
public String getMiddleManagerHost()
{

View File

@ -29,5 +29,7 @@ public interface IntegrationTestingConfig
// host:port of the router process under test
public String getRouterHost();
// host:port of the broker process under test
public String getBrokerHost();
// host:port of the middle manager process under test
public String getMiddleManagerHost();
}

View File

@ -0,0 +1,90 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.testing.clients;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.guice.annotations.Global;
import io.druid.testing.IntegrationTestingConfig;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import java.net.URL;
import java.util.List;
/**
 * Test client for the broker's ClientInfoResource HTTP endpoint.
 * Used by integration tests to inspect the dimensions a broker
 * reports for a datasource over a given interval.
 */
public class ClientInfoResourceTestClient
{
  private final ObjectMapper jsonMapper;
  private final HttpClient httpClient;
  private final String broker;
  private final StatusResponseHandler responseHandler;

  @Inject
  ClientInfoResourceTestClient(
      ObjectMapper jsonMapper,
      @Global HttpClient httpClient,
      IntegrationTestingConfig config
  )
  {
    this.jsonMapper = jsonMapper;
    this.httpClient = httpClient;
    // Broker host:port comes from the test environment config (e.g. DockerConfigProvider).
    this.broker = config.getBrokerHost();
    this.responseHandler = new StatusResponseHandler(Charsets.UTF_8);
  }

  /** Base URL of the broker's datasources resource. */
  private String getBrokerURL()
  {
    return String.format(
        "http://%s/druid/v2/datasources",
        broker
    );
  }

  /**
   * Queries the broker for the dimension names of {@code dataSource}
   * within {@code interval} (ISO-8601 interval string).
   *
   * @throws ISE (wrapped) when the broker responds with a non-200 status
   */
  public List<String> getDimensions(String dataSource, String interval)
  {
    try {
      final URL dimensionsUrl = new URL(
          String.format("%s/%s/dimensions?interval=%s", getBrokerURL(), dataSource, interval)
      );
      final StatusResponseHolder holder = httpClient.get(dimensionsUrl)
                                                    .go(responseHandler)
                                                    .get();
      if (!holder.getStatus().equals(HttpResponseStatus.OK)) {
        throw new ISE(
            "Error while querying[%s] status[%s] content[%s]",
            getBrokerURL(),
            holder.getStatus(),
            holder.getContent()
        );
      }
      final TypeReference<List<String>> listOfStrings = new TypeReference<List<String>>()
      {
      };
      return jsonMapper.readValue(holder.getContent(), listOfStrings);
    }
    catch (Exception e) {
      throw Throwables.propagate(e);
    }
  }
}

View File

@ -23,11 +23,14 @@ import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.testing.IntegrationTestingConfig;
import io.druid.testing.clients.ClientInfoResourceTestClient;
import io.druid.testing.guice.DruidTestModuleFactory;
import io.druid.testing.utils.RetryUtil;
import org.junit.Assert;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.util.List;
import java.util.concurrent.Callable;
@Guice(moduleFactory = DruidTestModuleFactory.class)
@ -37,17 +40,29 @@ public class ITIndexerTest extends AbstractIndexerTest
private static String INDEX_TASK = "/indexer/wikipedia_index_task.json";
private static String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
private static String INDEX_DATASOURCE = "wikipedia_index_test";
private static String REINDEX_TASK = "/indexer/wikipedia_reindex_task.json";
private static String REINDEX_DATASOURCE = "wikipedia_reindex_test";
@Inject
private IntegrationTestingConfig config;
@Inject
ClientInfoResourceTestClient clientInfoResourceTestClient;
@Test
public void testIndexData() throws Exception
{
loadData();
try {
queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
reIndexData();
queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
// verify excluded dimension is not reIndexed
final List<String> dimensions = clientInfoResourceTestClient.getDimensions(
REINDEX_DATASOURCE,
"2013-08-31T00:00:00.000Z/2013-09-10T00:00:00.000Z"
);
Assert.assertFalse("dimensions : " + dimensions, dimensions.contains("robot"));
}
catch (Exception e) {
e.printStackTrace();
@ -55,6 +70,8 @@ public class ITIndexerTest extends AbstractIndexerTest
}
finally {
unloadAndKillData(INDEX_DATASOURCE);
unloadAndKillData(REINDEX_DATASOURCE);
}
}
@ -77,5 +94,22 @@ public class ITIndexerTest extends AbstractIndexerTest
);
}
// Submits the reindex task (ingestSegment firehose over wikipedia_index_test)
// and blocks until its segments are loaded on the coordinator, so the
// dimension-exclusion assertion in testIndexData can query a ready datasource.
private void reIndexData() throws Exception
{
final String taskID = indexer.submitTask(getTaskAsString(REINDEX_TASK));
LOG.info("TaskID for loading index task %s", taskID);
// Wait for the indexing task itself to finish...
indexer.waitUntilTaskCompletes(taskID);
// ...then poll until the resulting segments are actually queryable.
RetryUtil.retryUntilTrue(
new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
return coordinator.areSegmentsLoaded(REINDEX_DATASOURCE);
}
}, "Segment Load"
);
}
}

View File

@ -2,12 +2,8 @@
"type": "index",
"spec": {
"dataSchema": {
"dataSource": "wikipedia_index_test",
"dataSource": "wikipedia_reindex_test",
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "added",
@ -37,10 +33,7 @@
"format": "iso"
},
"dimensionsSpec": {
"dimensions": [
"page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous",
"namespace", "continent", "country", "region", "city"
]
"dimensionExclusions" : ["robot", "continent"]
}
}
}
@ -50,8 +43,6 @@
"firehose": {
"type": "ingestSegment",
"dataSource": "wikipedia_index_test",
"dimensions": ["user", "nonexist"],
"metrics": ["added", "added2"],
"interval": "2013-08-31/2013-09-01"
}
},