Merge branch 'master' into druid-0.7.x

Conflicts:
	processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java
fjy 2014-10-02 18:12:10 -07:00
commit c7b4d5b7b4
45 changed files with 926 additions and 427 deletions

View File

@ -42,10 +42,10 @@ public class VMUtils
throw new UnsupportedOperationException("VM.maxDirectMemory doesn't exist, cannot do memory check.", e);
}
catch (InvocationTargetException e) {
throw new RuntimeException("static method shouldn't throw this", e);
throw new UnsupportedOperationException("static method shouldn't throw this", e);
}
catch (IllegalAccessException e) {
throw new RuntimeException("public method, shouldn't throw this", e);
throw new UnsupportedOperationException("public method, shouldn't throw this", e);
}
}
}

View File

@ -40,6 +40,36 @@ See [Examples](Examples.html). This firehose creates a stream of random numbers.
This firehose ingests events from a defined RabbitMQ queue.
#### LocalFirehose
This firehose can be used to read data from files on local disk.
It is useful for POCs that need to ingest data from disk.
A sample local firehose spec is shown below:
```json
{
  "type" : "local",
  "filter" : "*.csv",
  "parser" : {
    "timestampSpec": {
      "column": "mytimestamp",
      "format": "yyyy-MM-dd HH:mm:ss"
    },
    "data": {
      "format": "csv",
      "columns": [...],
      "dimensions": [...]
    }
  }
}
```
|property|description|required?|
|--------|-----------|---------|
|type|This should be "local".|yes|
|filter|A wildcard filter for files. See [here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter.html) for more information.|yes|
|data|A data spec similar to what is used for batch ingestion.|yes|
#### IngestSegmentFirehose
This firehose can be used to read data from existing Druid segments.
@ -63,11 +93,6 @@ A sample ingest firehose spec is shown below -
|metrics|The list of metrics to select. If left empty, no metrics are returned. If left null or not defined, all metrics are selected.|no|
|filter| See [Filters](Filters.html)|yes|
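The sample spec itself is elided from this hunk; purely as an illustration (the field names below are assumed from the property table and common Druid conventions, not taken from this diff), an ingestSegment firehose spec might look roughly like:
```json
{
  "type" : "ingestSegment",
  "dataSource" : "wikipedia",
  "interval" : "2013-01-01/2013-01-02",
  "filter" : { "type": "selector", "dimension": "page", "value": "Druid" }
}
```
Leaving `metrics` undefined selects all metrics, per the table row above.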
Parsing Data
------------

View File

@ -42,6 +42,15 @@ You can check `<BROKER_IP>:<PORT>/druid/v2/datasources/<YOUR_DATASOURCE>?interva
You can use the IngestSegmentFirehose with the index task to re-ingest existing Druid segments under a new schema, changing the segment's name, dimensions, metrics, rollup, and so on.
See [Firehose](Firehose.html) for more details on IngestSegmentFirehose.
## How can I change the granularity of existing data in Druid?
In many situations you may want to lower the granularity of older data. For example, data older than one month might be kept at hourly granularity while newer data is kept at minute granularity.
To do this, use the IngestSegmentFirehose and run an indexer task. The IngestSegmentFirehose lets you read existing segments out of Druid, re-aggregate them, and feed them back in. It also lets you filter the data in those segments during re-ingestion, so rows you want to delete can simply be filtered out.
Typically this is run as a recurring batch job that feeds in, say, a day's worth of older data at a time and re-aggregates it, as sketched below.
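As a rough sketch of such a job (the exact index task fields vary across Druid versions, and the values below are illustrative assumptions rather than content from this commit), the task wraps an ingestSegment firehose over the older interval and raises the index granularity to hour:
```json
{
  "type" : "index",
  "dataSource" : "wikipedia",
  "granularitySpec" : { "type": "uniform", "gran": "DAY", "intervals": ["2013-01-01/2013-02-01"] },
  "indexGranularity" : "hour",
  "aggregators" : [
    { "type": "count", "name": "rows" },
    { "type": "longSum", "name": "val", "fieldName": "val" }
  ],
  "firehose" : {
    "type" : "ingestSegment",
    "dataSource" : "wikipedia",
    "interval" : "2013-01-01/2013-02-01"
  }
}
```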
## More information
Getting data into Druid can definitely be difficult for first-time users. Please don't hesitate to ask questions in our IRC channel or on our [google groups page](https://groups.google.com/forum/#!forum/druid-development).

View File

@ -19,12 +19,13 @@
package io.druid.storage.hdfs.tasklog;
import com.google.common.base.Optional;
import com.google.common.io.ByteStreams;
import com.google.common.io.InputSupplier;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.tasklogs.TaskLogs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
@ -34,71 +35,77 @@ import java.io.IOException;
import java.io.InputStream;
/**
* Indexer hdfs task logs, to support storing hdfs tasks to hdfs
*
* Created by Frank Ren on 6/20/14.
* Indexer hdfs task logs, to support storing hdfs tasks to hdfs.
*/
public class HdfsTaskLogs implements TaskLogs
{
private static final Logger log = new Logger(HdfsTaskLogs.class);
private static final Logger log = new Logger(HdfsTaskLogs.class);
private final HdfsTaskLogsConfig config;
private final HdfsTaskLogsConfig config;
@Inject
public HdfsTaskLogs(HdfsTaskLogsConfig config)
{
this.config = config;
@Inject
public HdfsTaskLogs(HdfsTaskLogsConfig config)
{
this.config = config;
}
@Override
public void pushTaskLog(String taskId, File logFile) throws IOException
{
final Path path = getTaskLogFileFromId(taskId);
log.info("Writing task log to: %s", path);
Configuration conf = new Configuration();
final FileSystem fs = FileSystem.get(conf);
FileUtil.copy(logFile, fs, path, false, conf);
log.info("Wrote task log to: %s", path);
}
@Override
public Optional<InputSupplier<InputStream>> streamTaskLog(final String taskId, final long offset) throws IOException
{
final Path path = getTaskLogFileFromId(taskId);
final FileSystem fs = FileSystem.get(new Configuration());
if (fs.exists(path)) {
return Optional.<InputSupplier<InputStream>>of(
new InputSupplier<InputStream>()
{
@Override
public InputStream getInput() throws IOException
{
log.info("Reading task log from: %s", path);
final long seekPos;
if (offset < 0) {
final FileStatus stat = fs.getFileStatus(path);
seekPos = Math.max(0, stat.getLen() + offset);
} else {
seekPos = offset;
}
final FSDataInputStream inputStream = fs.open(path);
inputStream.seek(seekPos);
log.info("Read task log from: %s (seek = %,d)", path, seekPos);
return inputStream;
}
}
);
} else {
return Optional.absent();
}
}
@Override
public void pushTaskLog(String taskId, File logFile) throws IOException
{
final Path path = getTaskLogFileFromId(taskId);
log.info("writing task log to: %s", path);
Configuration conf = new Configuration();
final FileSystem fs = FileSystem.get(conf);
FileUtil.copy(logFile, fs, path, false, conf);
log.info("wrote task log to: %s", path);
}
/**
* Due to https://issues.apache.org/jira/browse/HDFS-13 ":" are not allowed in
* path names. So we format paths differently for HDFS.
*/
private Path getTaskLogFileFromId(String taskId)
{
return new Path(mergePaths(config.getDirectory(), taskId.replaceAll(":", "_")));
}
@Override
public Optional<InputSupplier<InputStream>> streamTaskLog(final String taskId, final long offset) throws IOException
{
final Path path = getTaskLogFileFromId(taskId);
final FileSystem fs = FileSystem.get(new Configuration());
if (fs.exists(path)) {
return Optional.<InputSupplier<InputStream>>of(
new InputSupplier<InputStream>() {
@Override
public InputStream getInput() throws IOException
{
log.info("reading task log from: %s", path);
final InputStream inputStream = fs.open(path);
ByteStreams.skipFully(inputStream, offset);
log.info("read task log from: %s", path);
return inputStream;
}
}
);
} else {
return Optional.absent();
}
}
/**
* Due to https://issues.apache.org/jira/browse/HDFS-13 ":" are not allowed in
* path names. So we format paths differently for HDFS.
*/
private Path getTaskLogFileFromId(String taskId)
{
return new Path(mergePaths(config.getDirectory(), taskId.replaceAll(":", "_")));
}
// Path.mergePaths does not exist in some Hadoop versions
private static String mergePaths(String path1, String path2)
{
return path1 + (path1.endsWith(Path.SEPARATOR) ? "" : Path.SEPARATOR) + path2;
}
// Path.mergePaths does not exist in some Hadoop versions
private static String mergePaths(String path1, String path2)
{
return path1 + (path1.endsWith(Path.SEPARATOR) ? "" : Path.SEPARATOR) + path2;
}
}

View File

@ -23,19 +23,23 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.NotNull;
/**
* Indexer hdfs task logs configuration
*
* Created by Frank Ren on 6/20/14.
* Indexer hdfs task logs configuration.
*/
public class HdfsTaskLogsConfig
{
@JsonProperty
@NotNull
private String directory;
public String getDirectory()
{
return directory;
}
@JsonProperty
@NotNull
private String directory;
public HdfsTaskLogsConfig(String directory)
{
this.directory = directory;
}
public String getDirectory()
{
return directory;
}
}

View File

@ -0,0 +1,41 @@
package io.druid.indexing.common.tasklogs;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import io.druid.storage.hdfs.tasklog.HdfsTaskLogs;
import io.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig;
import io.druid.tasklogs.TaskLogs;
import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.util.Map;
public class HdfsTaskLogsTest
{
@Test
public void testSimple() throws Exception
{
final File tmpDir = Files.createTempDir();
try {
final File logDir = new File(tmpDir, "logs");
final File logFile = new File(tmpDir, "log");
Files.write("blah", logFile, Charsets.UTF_8);
final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()));
taskLogs.pushTaskLog("foo", logFile);
final Map<Long, String> expected = ImmutableMap.of(0L, "blah", 1L, "lah", -2L, "ah", -5L, "blah");
for (Map.Entry<Long, String> entry : expected.entrySet()) {
final byte[] bytes = ByteStreams.toByteArray(taskLogs.streamTaskLog("foo", entry.getKey()).get().getInput());
final String string = new String(bytes);
Assert.assertEquals(String.format("Read with offset %,d", entry.getKey()), string, entry.getValue());
}
}
finally {
FileUtils.deleteDirectory(tmpDir);
}
}
}

View File

@ -463,7 +463,7 @@ public class IndexGeneratorJob implements Jobby
} else if (outputFS instanceof DistributedFileSystem) {
loadSpec = ImmutableMap.<String, Object>of(
"type", "hdfs",
"path", indexOutURI.getPath()
"path", indexOutURI.toString()
);
} else {
throw new ISE("Unknown file system[%s]", outputFS.getClass());

View File

@ -37,13 +37,13 @@ public class IndexingServiceTaskLogsModule implements Module
public void configure(Binder binder)
{
PolyBind.createChoice(binder, "druid.indexer.logs.type", Key.get(TaskLogs.class), Key.get(FileTaskLogs.class));
JsonConfigProvider.bind(binder, "druid.indexer.logs", FileTaskLogsConfig.class);
final MapBinder<String, TaskLogs> taskLogBinder = Binders.taskLogsBinder(binder);
taskLogBinder.addBinding("noop").to(NoopTaskLogs.class).in(LazySingleton.class);
taskLogBinder.addBinding("file").to(FileTaskLogs.class).in(LazySingleton.class);
binder.bind(NoopTaskLogs.class).in(LazySingleton.class);
binder.bind(FileTaskLogs.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.indexer.logs", FileTaskLogsConfig.class);
binder.bind(TaskLogPusher.class).to(TaskLogs.class);
}

View File

@ -30,6 +30,15 @@ public class FileTaskLogsConfig
@NotNull
private File directory = new File("log");
public FileTaskLogsConfig()
{
}
public FileTaskLogsConfig(File directory)
{
this.directory = directory;
}
public File getDirectory()
{
return directory;

View File

@ -20,7 +20,6 @@
package io.druid.indexing.common.tasklogs;
import com.google.common.base.Optional;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.google.common.io.InputSupplier;
import com.google.inject.Inject;
@ -29,7 +28,6 @@ import io.druid.indexing.common.config.FileTaskLogsConfig;
import io.druid.tasklogs.TaskLogs;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
@ -69,9 +67,7 @@ public class FileTaskLogs implements TaskLogs
@Override
public InputStream getInput() throws IOException
{
final InputStream inputStream = new FileInputStream(file);
ByteStreams.skipFully(inputStream, offset);
return inputStream;
return LogUtils.streamFile(file, offset);
}
}
);

View File

@ -0,0 +1,30 @@
package io.druid.indexing.common.tasklogs;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.channels.Channels;
public class LogUtils
{
/**
* Open a stream to a file.
*
* @param file the file to stream
* @param offset If zero, stream the entire log. If positive, read from this byte position onwards. If negative,
* read this many bytes from the end of the file.
*
* @return an InputStream over the requested portion of the file
*/
public static InputStream streamFile(final File file, final long offset) throws IOException
{
final RandomAccessFile raf = new RandomAccessFile(file, "r");
final long rafLength = raf.length();
if (offset > 0) {
raf.seek(offset);
} else if (offset < 0 && offset < rafLength) {
raf.seek(Math.max(0, rafLength + offset));
}
return Channels.newInputStream(raf.getChannel());
}
}

View File

@ -44,6 +44,7 @@ import io.druid.guice.annotations.Self;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.tasklogs.LogUtils;
import io.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
import io.druid.indexing.worker.config.WorkerConfig;
import io.druid.server.DruidNode;
@ -391,41 +392,12 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
@Override
public InputStream getInput() throws IOException
{
final RandomAccessFile raf = new RandomAccessFile(processHolder.logFile, "r");
final long rafLength = raf.length();
if (offset > 0) {
raf.seek(offset);
} else if (offset < 0 && offset < rafLength) {
raf.seek(Math.max(0, rafLength + offset));
}
return Channels.newInputStream(raf.getChannel());
return LogUtils.streamFile(processHolder.logFile, offset);
}
}
);
}
private int findUnusedPort()
{
synchronized (tasks) {
int port = config.getStartPort();
int maxPortSoFar = -1;
for (ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) {
if (taskWorkItem.processHolder != null) {
if (taskWorkItem.processHolder.port > maxPortSoFar) {
maxPortSoFar = taskWorkItem.processHolder.port;
}
if (taskWorkItem.processHolder.port == port) {
port = maxPortSoFar + 1;
}
}
}
return port;
}
}
private static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem
{
private volatile boolean shutdown = false;

View File

@ -0,0 +1,69 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord;
import com.google.common.collect.ImmutableSet;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.worker.Worker;
import java.util.Set;
/**
* A snapshot of a {@link io.druid.indexing.overlord.ZkWorker}
*/
public class ImmutableZkWorker
{
private final Worker worker;
private final int currCapacityUsed;
private final Set<String> availabilityGroups;
public ImmutableZkWorker(Worker worker, int currCapacityUsed, Set<String> availabilityGroups)
{
this.worker = worker;
this.currCapacityUsed = currCapacityUsed;
this.availabilityGroups = ImmutableSet.copyOf(availabilityGroups);
}
public Worker getWorker()
{
return worker;
}
public int getCurrCapacityUsed()
{
return currCapacityUsed;
}
public Set<String> getAvailabilityGroups()
{
return availabilityGroups;
}
public boolean isValidVersion(String minVersion)
{
return worker.getVersion().compareTo(minVersion) >= 0;
}
public boolean canRunTask(Task task)
{
return (worker.getCapacity() - getCurrCapacityUsed() >= task.getTaskResource().getRequiredCapacity()
&& !getAvailabilityGroups().contains(task.getTaskResource().getAvailabilityGroup()));
}
}

View File

@ -25,14 +25,13 @@ import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.collect.Maps;
import com.google.common.io.InputSupplier;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@ -49,7 +48,7 @@ import io.druid.curator.cache.PathChildrenCacheFactory;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.WorkerSetupData;
import io.druid.indexing.overlord.setup.WorkerSelectStrategy;
import io.druid.indexing.worker.TaskAnnouncement;
import io.druid.indexing.worker.Worker;
import io.druid.server.initialization.ZkPathsConfig;
@ -70,10 +69,8 @@ import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -88,13 +85,13 @@ import java.util.concurrent.TimeUnit;
* creating ephemeral nodes in ZK that workers must remove. Workers announce the statuses of the tasks they are running.
* Once a task completes, it is up to the RTR to remove the task status and run any necessary cleanup.
* The RemoteTaskRunner is event driven and updates state according to ephemeral node changes in ZK.
*
* <p/>
* The RemoteTaskRunner will assign tasks to a node until the node hits capacity. At that point, task assignment will
* fail. The RemoteTaskRunner depends on another component to create additional worker resources.
* For example, {@link io.druid.indexing.overlord.scaling.ResourceManagementScheduler} can take care of these duties.
*
* <p/>
* If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will fail any tasks associated with the worker.
*
* <p/>
* The RemoteTaskRunner uses ZK for job management and assignment and http for IPC messages.
*/
public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
@ -109,8 +106,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
private final CuratorFramework cf;
private final PathChildrenCacheFactory pathChildrenCacheFactory;
private final PathChildrenCache workerPathCache;
private final Supplier<WorkerSetupData> workerSetupData;
private final HttpClient httpClient;
private final WorkerSelectStrategy strategy;
// all workers that exist in ZK
private final ConcurrentMap<String, ZkWorker> zkWorkers = new ConcurrentHashMap<>();
@ -135,8 +132,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
ZkPathsConfig zkPaths,
CuratorFramework cf,
PathChildrenCacheFactory pathChildrenCacheFactory,
Supplier<WorkerSetupData> workerSetupData,
HttpClient httpClient
HttpClient httpClient,
WorkerSelectStrategy strategy
)
{
this.jsonMapper = jsonMapper;
@ -145,8 +142,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
this.cf = cf;
this.pathChildrenCacheFactory = pathChildrenCacheFactory;
this.workerPathCache = pathChildrenCacheFactory.make(cf, zkPaths.getIndexerAnnouncementPath());
this.workerSetupData = workerSetupData;
this.httpClient = httpClient;
this.strategy = strategy;
}
@LifecycleStart
@ -524,11 +521,30 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
return true;
} else {
// Nothing running this task, announce it in ZK for a worker to run it
ZkWorker zkWorker = findWorkerForTask(task);
if (zkWorker != null) {
final Optional<ImmutableZkWorker> immutableZkWorker = strategy.findWorkerForTask(
ImmutableMap.copyOf(
Maps.transformEntries(
zkWorkers,
new Maps.EntryTransformer<String, ZkWorker, ImmutableZkWorker>()
{
@Override
public ImmutableZkWorker transformEntry(
String key, ZkWorker value
)
{
return value.toImmutable();
}
}
)
),
task
);
if (immutableZkWorker.isPresent()) {
final ZkWorker zkWorker = zkWorkers.get(immutableZkWorker.get().getWorker().getHost());
announceTask(task, zkWorker, taskRunnerWorkItem);
return true;
} else {
log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values());
return false;
}
}
@ -789,37 +805,6 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
}
}
private ZkWorker findWorkerForTask(final Task task)
{
TreeSet<ZkWorker> sortedWorkers = Sets.newTreeSet(
new Comparator<ZkWorker>()
{
@Override
public int compare(
ZkWorker zkWorker, ZkWorker zkWorker2
)
{
int retVal = Ints.compare(zkWorker2.getCurrCapacityUsed(), zkWorker.getCurrCapacityUsed());
if (retVal == 0) {
retVal = zkWorker.getWorker().getHost().compareTo(zkWorker2.getWorker().getHost());
}
return retVal;
}
}
);
sortedWorkers.addAll(zkWorkers.values());
final String minWorkerVer = config.getMinWorkerVersion();
for (ZkWorker zkWorker : sortedWorkers) {
if (zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)) {
return zkWorker;
}
}
log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values());
return null;
}
private void taskComplete(
RemoteTaskRunnerWorkItem taskRunnerWorkItem,
ZkWorker zkWorker,

View File

@ -20,26 +20,25 @@
package io.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import com.metamx.http.client.HttpClient;
import io.druid.curator.cache.SimplePathChildrenCacheFactory;
import io.druid.guice.annotations.Global;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.WorkerSetupData;
import io.druid.indexing.overlord.setup.WorkerSelectStrategy;
import io.druid.server.initialization.ZkPathsConfig;
import org.apache.curator.framework.CuratorFramework;
/**
*/
*/
public class RemoteTaskRunnerFactory implements TaskRunnerFactory
{
private final CuratorFramework curator;
private final RemoteTaskRunnerConfig remoteTaskRunnerConfig;
private final ZkPathsConfig zkPaths;
private final ObjectMapper jsonMapper;
private final Supplier<WorkerSetupData> setupDataWatch;
private final HttpClient httpClient;
private final WorkerSelectStrategy strategy;
@Inject
public RemoteTaskRunnerFactory(
@ -47,15 +46,16 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
final RemoteTaskRunnerConfig remoteTaskRunnerConfig,
final ZkPathsConfig zkPaths,
final ObjectMapper jsonMapper,
final Supplier<WorkerSetupData> setupDataWatch,
@Global final HttpClient httpClient
) {
@Global final HttpClient httpClient,
final WorkerSelectStrategy strategy
)
{
this.curator = curator;
this.remoteTaskRunnerConfig = remoteTaskRunnerConfig;
this.zkPaths = zkPaths;
this.jsonMapper = jsonMapper;
this.setupDataWatch = setupDataWatch;
this.httpClient = httpClient;
this.strategy = strategy;
}
@Override
@ -70,8 +70,8 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
.Builder()
.withCompressed(remoteTaskRunnerConfig.isCompressZnodes())
.build(),
setupDataWatch,
httpClient
httpClient,
strategy
);
}
}

View File

@ -158,6 +158,11 @@ public class ZkWorker implements Closeable
lastCompletedTaskTime.getAndSet(completedTaskTime);
}
public ImmutableZkWorker toImmutable()
{
return new ImmutableZkWorker(worker, getCurrCapacityUsed(), getAvailabilityGroups());
}
@Override
public void close() throws IOException
{

View File

@ -0,0 +1,79 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.setup;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.google.inject.Inject;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableZkWorker;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import java.util.Comparator;
import java.util.TreeSet;
/**
*/
public class FillCapacityWorkerSelectStrategy implements WorkerSelectStrategy
{
private final RemoteTaskRunnerConfig config;
@Inject
public FillCapacityWorkerSelectStrategy(RemoteTaskRunnerConfig config)
{
this.config = config;
}
public Optional<ImmutableZkWorker> findWorkerForTask(
final ImmutableMap<String, ImmutableZkWorker> zkWorkers,
final Task task
)
{
TreeSet<ImmutableZkWorker> sortedWorkers = Sets.newTreeSet(
new Comparator<ImmutableZkWorker>()
{
@Override
public int compare(
ImmutableZkWorker zkWorker, ImmutableZkWorker zkWorker2
)
{
int retVal = Ints.compare(zkWorker2.getCurrCapacityUsed(), zkWorker.getCurrCapacityUsed());
if (retVal == 0) {
retVal = zkWorker.getWorker().getHost().compareTo(zkWorker2.getWorker().getHost());
}
return retVal;
}
}
);
sortedWorkers.addAll(zkWorkers.values());
final String minWorkerVer = config.getMinWorkerVersion();
for (ImmutableZkWorker zkWorker : sortedWorkers) {
if (zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)) {
return Optional.of(zkWorker);
}
}
return Optional.absent();
}
}

View File

@ -0,0 +1,53 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.setup;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableZkWorker;
import io.druid.indexing.overlord.ZkWorker;
import java.util.Map;
/**
* The {@link io.druid.indexing.overlord.RemoteTaskRunner} uses this class to select a worker to assign tasks to.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = FillCapacityWorkerSelectStrategy.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "fillCapacity", value = FillCapacityWorkerSelectStrategy.class)
})
public interface WorkerSelectStrategy
{
/**
* Customizable logic for selecting a worker to run a task.
*
* @param zkWorkers An immutable map of workers to choose from.
* @param task The task to assign.
*
* @return A {@link io.druid.indexing.overlord.ImmutableZkWorker} to run the task if one is available.
*/
public Optional<ImmutableZkWorker> findWorkerForTask(
final ImmutableMap<String, ImmutableZkWorker> zkWorkers,
final Task task
);
}

View File

@ -0,0 +1,40 @@
package io.druid.indexing.common.tasklogs;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import io.druid.indexing.common.config.FileTaskLogsConfig;
import io.druid.tasklogs.TaskLogs;
import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.util.Map;
public class FileTaskLogsTest
{
@Test
public void testSimple() throws Exception
{
final File tmpDir = Files.createTempDir();
try {
final File logDir = new File(tmpDir, "logs");
final File logFile = new File(tmpDir, "log");
Files.write("blah", logFile, Charsets.UTF_8);
final TaskLogs taskLogs = new FileTaskLogs(new FileTaskLogsConfig(logDir));
taskLogs.pushTaskLog("foo", logFile);
final Map<Long, String> expected = ImmutableMap.of(0L, "blah", 1L, "lah", -2L, "ah", -5L, "blah");
for (Map.Entry<Long, String> entry : expected.entrySet()) {
final byte[] bytes = ByteStreams.toByteArray(taskLogs.streamTaskLog("foo", entry.getKey()).get().getInput());
final String string = new String(bytes);
Assert.assertEquals(String.format("Read with offset %,d", entry.getKey()), string, entry.getValue());
}
}
finally {
FileUtils.deleteDirectory(tmpDir);
}
}
}

View File

@ -39,6 +39,8 @@ import io.druid.indexing.common.TestRealtimeTask;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.TaskResource;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.FillCapacityWorkerSelectStrategy;
import io.druid.indexing.overlord.setup.WorkerSetupData;
import io.druid.indexing.worker.TaskAnnouncement;
import io.druid.indexing.worker.Worker;
@ -367,9 +369,10 @@ public class RemoteTaskRunnerTest
private void makeRemoteTaskRunner() throws Exception
{
RemoteTaskRunnerConfig config = new TestRemoteTaskRunnerConfig();
remoteTaskRunner = new RemoteTaskRunner(
jsonMapper,
new TestRemoteTaskRunnerConfig(),
config,
new ZkPathsConfig()
{
@Override
@ -380,8 +383,8 @@ public class RemoteTaskRunnerTest
},
cf,
new SimplePathChildrenCacheFactory.Builder().build(),
DSuppliers.of(new AtomicReference<WorkerSetupData>(new WorkerSetupData(0, 1, null, null, null))),
null
null,
new FillCapacityWorkerSelectStrategy(config)
);
remoteTaskRunner.start();

View File

@ -324,22 +324,22 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>9.2.2.v20140723</version>
<version>9.2.3.v20140905</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>9.2.2.v20140723</version>
<version>9.2.3.v20140905</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlets</artifactId>
<version>9.2.2.v20140723</version>
<version>9.2.3.v20140905</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-proxy</artifactId>
<version>9.2.2.v20140723</version>
<version>9.2.3.v20140905</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>

View File

@ -19,14 +19,10 @@
package io.druid.query;
import com.google.common.base.Joiner;
import java.util.List;
public class DataSourceUtil
{
public static final Joiner COMMA_JOIN = Joiner.on(",");
public static String getMetricName(DataSource dataSource)
{
final List<String> names = dataSource.getNames();

View File

@ -0,0 +1,71 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.emitter.service.ServiceMetricEvent;
import org.joda.time.Interval;
/**
*/
public class QueryMetricUtil
{
public static <T> ServiceMetricEvent.Builder makeQueryTimeMetric(Query<T> query)
{
return new ServiceMetricEvent.Builder()
.setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
.setUser4(query.getType())
.setUser5(
Lists.transform(
query.getIntervals(),
new Function<Interval, String>()
{
@Override
public String apply(Interval input)
{
return input.toString();
}
}
).toArray(new String[query.getIntervals().size()])
)
.setUser6(String.valueOf(query.hasFilters()))
.setUser9(query.getDuration().toPeriod().toStandardMinutes().toString());
}
public static <T> ServiceMetricEvent.Builder makeRequestTimeMetric(
final ObjectMapper jsonMapper, final Query<T> query, final String remoteAddr
) throws JsonProcessingException
{
return makeQueryTimeMetric(query)
.setUser3(
jsonMapper.writeValueAsString(
query.getContext() == null
? ImmutableMap.of()
: query.getContext()
)
)
.setUser7(remoteAddr)
.setUser8(query.getId());
}
}

View File

@ -22,7 +22,6 @@ package io.druid.query.groupby;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
@ -45,11 +44,11 @@ import io.druid.granularity.QueryGranularity;
import io.druid.guice.annotations.Global;
import io.druid.query.CacheStrategy;
import io.druid.query.DataSource;
import io.druid.query.DataSourceUtil;
import io.druid.query.IntervalChunkingQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryCacheHelper;
import io.druid.query.QueryDataSource;
import io.druid.query.QueryMetricUtil;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.SubqueryQueryRunner;
@ -61,8 +60,6 @@ import io.druid.query.filter.DimFilter;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Minutes;
import java.nio.ByteBuffer;
import java.util.Iterator;
@ -230,19 +227,9 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(GroupByQuery query)
{
int numMinutes = 0;
for (Interval interval : query.getIntervals()) {
numMinutes += Minutes.minutesIn(interval).getMinutes();
}
return new ServiceMetricEvent.Builder()
.setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
.setUser3(String.format("%,d dims", query.getDimensions().size()))
.setUser4("groupBy")
.setUser5(Joiner.on(",").join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
.setUser7(String.format("%,d aggs", query.getAggregatorSpecs().size()))
.setUser9(Minutes.minutes(numMinutes).toString());
return QueryMetricUtil.makeQueryTimeMetric(query)
.setUser3(String.format("%,d dims", query.getDimensions().size()))
.setUser7(String.format("%,d aggs", query.getAggregatorSpecs().size()));
}
@Override

View File

@ -22,12 +22,10 @@ package io.druid.query.metadata;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.Sequence;
@ -36,9 +34,8 @@ import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.OrderedMergeSequence;
import io.druid.common.utils.JodaUtils;
import io.druid.query.CacheStrategy;
import io.druid.query.DataSourceUtil;
import io.druid.query.Query;
import io.druid.query.QueryConfig;
import io.druid.query.QueryMetricUtil;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.ResultMergeQueryRunner;
@ -47,7 +44,6 @@ import io.druid.query.metadata.metadata.ColumnAnalysis;
import io.druid.query.metadata.metadata.SegmentAnalysis;
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
import org.joda.time.Interval;
import org.joda.time.Minutes;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
@ -62,14 +58,6 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
};
private static final byte[] SEGMENT_METADATA_CACHE_PREFIX = new byte[]{0x4};
private final QueryConfig config;
@Inject
public SegmentMetadataQueryQueryToolChest(QueryConfig config)
{
this.config = config;
}
@Override
public QueryRunner<SegmentAnalysis> mergeResults(final QueryRunner<SegmentAnalysis> runner)
{
@ -158,17 +146,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(SegmentMetadataQuery query)
{
int numMinutes = 0;
for (Interval interval : query.getIntervals()) {
numMinutes += Minutes.minutesIn(interval).getMinutes();
}
return new ServiceMetricEvent.Builder()
.setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
.setUser4(query.getType())
.setUser5(Joiner.on(",").join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
.setUser9(Minutes.minutes(numMinutes).toString());
return QueryMetricUtil.makeQueryTimeMetric(query);
}
@Override
@ -195,9 +173,9 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
{
byte[] includerBytes = query.getToInclude().getCacheKey();
return ByteBuffer.allocate(1 + includerBytes.length)
.put(SEGMENT_METADATA_CACHE_PREFIX)
.put(includerBytes)
.array();
.put(SEGMENT_METADATA_CACHE_PREFIX)
.put(includerBytes)
.array();
}
@Override

View File

@ -23,7 +23,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
@ -39,9 +38,9 @@ import com.metamx.common.guava.nary.BinaryFn;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.OrderedMergeSequence;
import io.druid.query.CacheStrategy;
import io.druid.query.DataSourceUtil;
import io.druid.query.IntervalChunkingQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryMetricUtil;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
@ -53,8 +52,6 @@ import io.druid.query.search.search.SearchHit;
import io.druid.query.search.search.SearchQuery;
import io.druid.query.search.search.SearchQueryConfig;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Minutes;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
@ -67,7 +64,6 @@ import java.util.Set;
public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResultValue>, SearchQuery>
{
private static final byte SEARCH_QUERY = 0x2;
private static final Joiner COMMA_JOIN = Joiner.on(",");
private static final TypeReference<Result<SearchResultValue>> TYPE_REFERENCE = new TypeReference<Result<SearchResultValue>>()
{
};
@ -124,17 +120,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(SearchQuery query)
{
int numMinutes = 0;
for (Interval interval : query.getIntervals()) {
numMinutes += Minutes.minutesIn(interval).getMinutes();
}
return new ServiceMetricEvent.Builder()
.setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
.setUser4("search")
.setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
.setUser9(Minutes.minutes(numMinutes).toString());
return QueryMetricUtil.makeQueryTimeMetric(query);
}
@Override
@ -181,7 +167,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
final ByteBuffer queryCacheKey = ByteBuffer
.allocate(
1 + 4 + granularityBytes.length + filterBytes.length +
querySpecBytes.length + dimensionsBytesSize
querySpecBytes.length + dimensionsBytesSize
)
.put(SEARCH_QUERY)
.put(Ints.toByteArray(query.getLimit()))

View File

@ -165,13 +165,16 @@ public class SearchQueryRunner implements QueryRunner<Result<SearchResultValue>>
while (!cursor.isDone()) {
for (Map.Entry<String, DimensionSelector> entry : dimSelectors.entrySet()) {
final DimensionSelector selector = entry.getValue();
final IndexedInts vals = selector.getRow();
for (int i = 0; i < vals.size(); ++i) {
final String dimVal = selector.lookupName(vals.get(i));
if (searchQuerySpec.accept(dimVal)) {
set.add(new SearchHit(entry.getKey(), dimVal));
if (set.size() >= limit) {
return set;
if (selector != null) {
final IndexedInts vals = selector.getRow();
for (int i = 0; i < vals.size(); ++i) {
final String dimVal = selector.lookupName(vals.get(i));
if (searchQuerySpec.accept(dimVal)) {
set.add(new SearchHit(entry.getKey(), dimVal));
if (set.size() >= limit) {
return set;
}
}
}
}

View File

@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Joiner;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
@ -35,10 +34,10 @@ import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.OrderedMergeSequence;
import io.druid.granularity.QueryGranularity;
import io.druid.query.CacheStrategy;
import io.druid.query.DataSourceUtil;
import io.druid.query.IntervalChunkingQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryConfig;
import io.druid.query.QueryMetricUtil;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
@ -47,8 +46,6 @@ import io.druid.query.ResultMergeQueryRunner;
import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.query.filter.DimFilter;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Minutes;
import java.nio.ByteBuffer;
import java.util.Arrays;
@ -62,7 +59,6 @@ import java.util.Set;
public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResultValue>, SelectQuery>
{
private static final byte SELECT_QUERY = 0x13;
private static final Joiner COMMA_JOIN = Joiner.on(",");
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE =
new TypeReference<Object>()
{
@ -126,17 +122,7 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(SelectQuery query)
{
int numMinutes = 0;
for (Interval interval : query.getIntervals()) {
numMinutes += Minutes.minutesIn(interval).getMinutes();
}
return new ServiceMetricEvent.Builder()
.setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
.setUser4("Select")
.setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
.setUser9(Minutes.minutes(numMinutes).toString());
return QueryMetricUtil.makeQueryTimeMetric(query);
}
@Override
@ -261,13 +247,13 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
new SelectResultValue(
(Map<String, Integer>) jsonMapper.convertValue(
resultIter.next(), new TypeReference<Map<String, Integer>>()
{
}
{
}
),
(List<EventHolder>) jsonMapper.convertValue(
resultIter.next(), new TypeReference<List<EventHolder>>()
{
}
{
}
)
)
);

View File

@ -21,7 +21,6 @@ package io.druid.query.timeseries;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
@ -33,11 +32,11 @@ import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.OrderedMergeSequence;
import io.druid.granularity.QueryGranularity;
import io.druid.query.CacheStrategy;
import io.druid.query.DataSourceUtil;
import io.druid.query.IntervalChunkingQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryCacheHelper;
import io.druid.query.QueryConfig;
import io.druid.query.QueryMetricUtil;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
@ -48,8 +47,6 @@ import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.filter.DimFilter;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Minutes;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
@ -62,7 +59,6 @@ import java.util.Map;
public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<TimeseriesResultValue>, TimeseriesQuery>
{
private static final byte TIMESERIES_QUERY = 0x0;
private static final Joiner COMMA_JOIN = Joiner.on(",");
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE =
new TypeReference<Object>()
{
@ -124,18 +120,8 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(TimeseriesQuery query)
{
int numMinutes = 0;
for (Interval interval : query.getIntervals()) {
numMinutes += Minutes.minutesIn(interval).getMinutes();
}
return new ServiceMetricEvent.Builder()
.setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
.setUser4("timeseries")
.setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
.setUser7(String.format("%,d aggs", query.getAggregatorSpecs().size()))
.setUser9(Minutes.minutes(numMinutes).toString());
return QueryMetricUtil.makeQueryTimeMetric(query)
.setUser7(String.format("%,d aggs", query.getAggregatorSpecs().size()));
}
@Override

View File

@ -21,7 +21,6 @@ package io.druid.query.topn;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -37,10 +36,10 @@ import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.OrderedMergeSequence;
import io.druid.granularity.QueryGranularity;
import io.druid.query.CacheStrategy;
import io.druid.query.DataSourceUtil;
import io.druid.query.IntervalChunkingQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryCacheHelper;
import io.druid.query.QueryMetricUtil;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
@ -52,8 +51,6 @@ import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.filter.DimFilter;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Minutes;
import java.nio.ByteBuffer;
import java.util.Iterator;
@ -65,7 +62,6 @@ import java.util.Map;
public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultValue>, TopNQuery>
{
private static final byte TOPN_QUERY = 0x1;
private static final Joiner COMMA_JOIN = Joiner.on(",");
private static final TypeReference<Result<TopNResultValue>> TYPE_REFERENCE = new TypeReference<Result<TopNResultValue>>()
{
};
@ -139,18 +135,15 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(TopNQuery query)
{
int numMinutes = 0;
for (Interval interval : query.getIntervals()) {
numMinutes += Minutes.minutesIn(interval).getMinutes();
}
return new ServiceMetricEvent.Builder()
.setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
.setUser4(String.format("topN/%s/%s", query.getThreshold(), query.getDimensionSpec().getDimension()))
.setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
.setUser7(String.format("%,d aggs", query.getAggregatorSpecs().size()))
.setUser9(Minutes.minutes(numMinutes).toString());
return QueryMetricUtil.makeQueryTimeMetric(query)
.setUser4(
String.format(
"topN/%s/%s",
query.getThreshold(),
query.getDimensionSpec().getDimension()
)
)
.setUser7(String.format("%,d aggs", query.getAggregatorSpecs().size()));
}
@Override

View File

@ -49,26 +49,17 @@ public class SpatialDimensionRowTransformer implements Function<InputRow, InputR
private static final Joiner JOINER = Joiner.on(",");
private static final Splitter SPLITTER = Splitter.on(",");
private final List<SpatialDimensionSchema> spatialDimensions;
private final Set<String> spatialDimNames;
private final Map<String, SpatialDimensionSchema> spatialDimensionMap;
private final Set<String> spatialPartialDimNames;
public SpatialDimensionRowTransformer(List<SpatialDimensionSchema> spatialDimensions)
{
this.spatialDimensions = spatialDimensions;
this.spatialDimNames = Sets.newHashSet(
Lists.transform(
spatialDimensions,
new Function<SpatialDimensionSchema, String>()
{
@Override
public String apply(SpatialDimensionSchema input)
{
return input.getDimName();
}
}
)
);
this.spatialDimensionMap = Maps.newHashMap();
for (SpatialDimensionSchema spatialDimension : spatialDimensions) {
if (this.spatialDimensionMap.put(spatialDimension.getDimName(), spatialDimension) != null) {
throw new ISE("Duplicate spatial dimension names found! Check your schema yo!");
}
}
this.spatialPartialDimNames = Sets.newHashSet(
Iterables.concat(
Lists.transform(
@ -111,7 +102,7 @@ public class SpatialDimensionRowTransformer implements Function<InputRow, InputR
@Override
public boolean apply(String input)
{
return !spatialDimNames.contains(input) && !spatialPartialDimNames.contains(input);
return !spatialDimensionMap.containsKey(input) && !spatialPartialDimNames.contains(input);
}
}
)
@ -174,32 +165,32 @@ public class SpatialDimensionRowTransformer implements Function<InputRow, InputR
}
};
if (!spatialPartialDimNames.isEmpty()) {
for (SpatialDimensionSchema spatialDimension : spatialDimensions) {
List<String> spatialDimVals = Lists.newArrayList();
for (Map.Entry<String, SpatialDimensionSchema> entry : spatialDimensionMap.entrySet()) {
final String spatialDimName = entry.getKey();
final SpatialDimensionSchema spatialDim = entry.getValue();
for (String partialSpatialDim : spatialDimension.getDims()) {
List<String> dimVals = row.getDimension(partialSpatialDim);
if (isSpatialDimValsValid(dimVals)) {
spatialDimVals.addAll(dimVals);
}
}
if (spatialDimVals.size() == spatialPartialDimNames.size()) {
spatialLookup.put(spatialDimension.getDimName(), Arrays.asList(JOINER.join(spatialDimVals)));
finalDims.add(spatialDimension.getDimName());
}
}
} else {
for (String spatialDimName : spatialDimNames) {
List<String> dimVals = row.getDimension(spatialDimName);
List<String> dimVals = row.getDimension(spatialDimName);
if (dimVals != null && !dimVals.isEmpty()) {
if (dimVals.size() != 1) {
throw new ISE("Cannot have a spatial dimension value with size[%d]", dimVals.size());
throw new ISE("Spatial dimension value must be in an array!");
}
if (isJoinedSpatialDimValValid(dimVals.get(0))) {
spatialLookup.put(spatialDimName, dimVals);
finalDims.add(spatialDimName);
}
} else {
List<String> spatialDimVals = Lists.newArrayList();
for (String dim : spatialDim.getDims()) {
List<String> partialDimVals = row.getDimension(dim);
if (isSpatialDimValsValid(partialDimVals)) {
spatialDimVals.addAll(partialDimVals);
}
}
if (spatialDimVals.size() == spatialDim.getDims().size()) {
spatialLookup.put(spatialDimName, Arrays.asList(JOINER.join(spatialDimVals)));
finalDims.add(spatialDimName);
}
}
}

View File

@ -99,7 +99,7 @@ public class SegmentAnalyzerTest
{
final QueryRunner runner = QueryRunnerTestHelper.makeQueryRunner(
(QueryRunnerFactory) new SegmentMetadataQueryRunnerFactory(
new SegmentMetadataQueryQueryToolChest(new QueryConfig()),
new SegmentMetadataQueryQueryToolChest(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
), index
);

View File

@ -49,7 +49,7 @@ public class SegmentMetadataQueryTest
@SuppressWarnings("unchecked")
private final QueryRunner runner = makeQueryRunner(
new SegmentMetadataQueryRunnerFactory(
new SegmentMetadataQueryQueryToolChest(new QueryConfig()),
new SegmentMetadataQueryQueryToolChest(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER)
);
private ObjectMapper mapper = new DefaultObjectMapper();

View File

@ -116,10 +116,10 @@ public class SearchQueryRunnerTest
Map<String, Set<String>> expectedResults = new HashMap<String, Set<String>>();
expectedResults.put(
QueryRunnerTestHelper.qualityDimension, new HashSet<String>(
Arrays.asList(
"automotive", "mezzanine", "travel", "health", "entertainment"
Arrays.asList(
"automotive", "mezzanine", "travel", "health", "entertainment"
)
)
)
);
checkSearchQuery(
@ -367,6 +367,24 @@ public class SearchQueryRunnerTest
);
}
@Test
public void testSearchNonExistingDimension()
{
Map<String, Set<String>> expectedResults = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
checkSearchQuery(
Druids.newSearchQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
.intervals(QueryRunnerTestHelper.fullOnInterval)
.dimensions("does_not_exist")
.query("a")
.build(),
expectedResults
);
}
private void checkSearchQuery(SearchQuery searchQuery, Map<String, Set<String>> expectedResults)
{
HashMap<String,List> context = new HashMap<String, List>();

View File

@ -22,6 +22,7 @@ package io.druid.segment.filter;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.metamx.collections.spatial.search.RadiusBound;
import com.metamx.collections.spatial.search.RectangularBound;
import io.druid.data.input.MapBasedInputRow;
@ -46,7 +47,7 @@ import io.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import io.druid.query.timeseries.TimeseriesResultValue;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMaker;
import io.druid.segment.IndexMerger;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.Segment;
@ -63,7 +64,6 @@ import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Random;
@ -73,17 +73,13 @@ import java.util.Random;
public class SpatialFilterTest
{
private static Interval DATA_INTERVAL = new Interval("2013-01-01/2013-01-07");
private static AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{
new CountAggregatorFactory("rows"),
new LongSumAggregatorFactory("val", "val")
};
private static List<String> DIMS = Lists.newArrayList("dim", "lat", "long");
private final Segment segment;
public SpatialFilterTest(Segment segment)
{
this.segment = segment;
}
private static List<String> DIMS = Lists.newArrayList("dim", "lat", "long", "lat2", "long2");
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
@ -120,7 +116,12 @@ public class SpatialFilterTest
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
),
new SpatialDimensionSchema(
"spatialIsRad",
Arrays.asList("lat2", "long2")
)
)
)
).build(),
@ -217,6 +218,18 @@ public class SpatialFilterTest
)
)
);
theIndex.add(
new MapBasedInputRow(
new DateTime("2013-01-05").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-05").toString(),
"lat2", 0.0f,
"long2", 0.0f,
"val", 13l
)
)
);
// Add a bunch of random points
Random rand = new Random();
@ -247,7 +260,7 @@ public class SpatialFilterTest
tmpFile.mkdirs();
tmpFile.deleteOnExit();
IndexMaker.persist(theIndex, tmpFile);
IndexMerger.persist(theIndex, tmpFile);
return IndexIO.loadIndex(tmpFile);
}
@ -266,7 +279,12 @@ public class SpatialFilterTest
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
),
new SpatialDimensionSchema(
"spatialIsRad",
Arrays.asList("lat2", "long2")
)
)
)
).build(),
@ -285,10 +303,14 @@ public class SpatialFilterTest
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
),
new SpatialDimensionSchema(
"spatialIsRad",
Arrays.asList("lat2", "long2")
)
)
)
).build(),
TestQueryRunners.pool,
false
@ -305,10 +327,14 @@ public class SpatialFilterTest
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
),
new SpatialDimensionSchema(
"spatialIsRad",
Arrays.asList("lat2", "long2")
)
)
)
).build(),
TestQueryRunners.pool,
false
@ -405,6 +431,18 @@ public class SpatialFilterTest
)
)
);
second.add(
new MapBasedInputRow(
new DateTime("2013-01-05").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-05").toString(),
"lat2", 0.0f,
"long2", 0.0f,
"val", 13l
)
)
);
// Add a bunch of random points
Random rand = new Random();
@ -442,12 +480,12 @@ public class SpatialFilterTest
mergedFile.mkdirs();
mergedFile.deleteOnExit();
IndexMaker.persist(first, DATA_INTERVAL, firstFile);
IndexMaker.persist(second, DATA_INTERVAL, secondFile);
IndexMaker.persist(third, DATA_INTERVAL, thirdFile);
IndexMerger.persist(first, DATA_INTERVAL, firstFile);
IndexMerger.persist(second, DATA_INTERVAL, secondFile);
IndexMerger.persist(third, DATA_INTERVAL, thirdFile);
QueryableIndex mergedRealtime = IndexIO.loadIndex(
IndexMaker.mergeQueryableIndex(
IndexMerger.mergeQueryableIndex(
Arrays.asList(IndexIO.loadIndex(firstFile), IndexIO.loadIndex(secondFile), IndexIO.loadIndex(thirdFile)),
METRIC_AGGS,
mergedFile
@ -461,6 +499,13 @@ public class SpatialFilterTest
}
}
private final Segment segment;
public SpatialFilterTest(Segment segment)
{
this.segment = segment;
}
@Test
public void testSpatialQuery()
{
@ -475,7 +520,7 @@ public class SpatialFilterTest
)
)
.aggregators(
Arrays.asList(
Arrays.<AggregatorFactory>asList(
new CountAggregatorFactory("rows"),
new LongSumAggregatorFactory("val", "val")
)
@ -483,7 +528,7 @@ public class SpatialFilterTest
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
new Result<>(
new Result<TimeseriesResultValue>(
new DateTime("2013-01-01T00:00:00.000Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()
@ -504,8 +549,60 @@ public class SpatialFilterTest
factory.createRunner(segment),
factory.getToolchest()
);
HashMap<String,Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
TestHelper.assertExpectedResults(expectedResults, runner.run(query, Maps.newHashMap()));
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
@Test
public void testSpatialQueryWithOtherSpatialDim()
{
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.granularity(QueryGranularity.ALL)
.intervals(Arrays.asList(new Interval("2013-01-01/2013-01-07")))
.filters(
new SpatialDimFilter(
"spatialIsRad",
new RadiusBound(new float[]{0.0f, 0.0f}, 5)
)
)
.aggregators(
Arrays.<AggregatorFactory>asList(
new CountAggregatorFactory("rows"),
new LongSumAggregatorFactory("val", "val")
)
)
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
new Result<TimeseriesResultValue>(
new DateTime("2013-01-01T00:00:00.000Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()
.put("rows", 1L)
.put("val", 13l)
.build()
)
)
);
try {
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
QueryRunner runner = new FinalizeResultsQueryRunner(
factory.createRunner(segment),
factory.getToolchest()
);
TestHelper.assertExpectedResults(expectedResults, runner.run(query, Maps.newHashMap()));
}
catch (Exception e) {
throw Throwables.propagate(e);
@ -526,7 +623,7 @@ public class SpatialFilterTest
)
)
.aggregators(
Arrays.asList(
Arrays.<AggregatorFactory>asList(
new CountAggregatorFactory("rows"),
new LongSumAggregatorFactory("val", "val")
)
@ -534,7 +631,7 @@ public class SpatialFilterTest
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
new Result<>(
new Result<TimeseriesResultValue>(
new DateTime("2013-01-01T00:00:00.000Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()
@ -543,7 +640,7 @@ public class SpatialFilterTest
.build()
)
),
new Result<>(
new Result<TimeseriesResultValue>(
new DateTime("2013-01-02T00:00:00.000Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()
@ -552,7 +649,7 @@ public class SpatialFilterTest
.build()
)
),
new Result<>(
new Result<TimeseriesResultValue>(
new DateTime("2013-01-03T00:00:00.000Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()
@ -561,7 +658,7 @@ public class SpatialFilterTest
.build()
)
),
new Result<>(
new Result<TimeseriesResultValue>(
new DateTime("2013-01-04T00:00:00.000Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()
@ -570,7 +667,7 @@ public class SpatialFilterTest
.build()
)
),
new Result<>(
new Result<TimeseriesResultValue>(
new DateTime("2013-01-05T00:00:00.000Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()
@ -591,8 +688,8 @@ public class SpatialFilterTest
factory.createRunner(segment),
factory.getToolchest()
);
HashMap<String,Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
TestHelper.assertExpectedResults(expectedResults, runner.run(query, Maps.newHashMap()));
}
catch (Exception e) {
throw Throwables.propagate(e);

View File

@ -97,9 +97,6 @@ public class DruidProcessingModule implements Module
} catch(UnsupportedOperationException e) {
log.info(e.getMessage());
}
catch(RuntimeException e) {
log.warn(e, e.getMessage());
}
return new OffheapBufferPool(config.intermediateComputeSizeBytes());
}

View File

@ -24,17 +24,19 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.impl.FileIteratingFirehose;
import io.druid.data.input.impl.StringInputRowParser;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.apache.commons.io.filefilter.TrueFileFilter;
import org.apache.commons.io.filefilter.WildcardFileFilter;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
@ -42,6 +44,8 @@ import java.util.LinkedList;
*/
public class LocalFirehoseFactory implements FirehoseFactory<StringInputRowParser>
{
private static final EmittingLogger log = new EmittingLogger(LocalFirehoseFactory.class);
private final File baseDir;
private final String filter;
private final StringInputRowParser parser;
@ -79,26 +83,22 @@ public class LocalFirehoseFactory implements FirehoseFactory<StringInputRowParse
@Override
public Firehose connect(StringInputRowParser firehoseParser) throws IOException
{
File[] foundFiles = baseDir.listFiles(
new FilenameFilter()
{
@Override
public boolean accept(File file, String name)
{
return name.contains(filter);
}
}
log.info("Searching for all [%s] in [%s]", filter, baseDir.getAbsoluteFile());
Collection<File> foundFiles = FileUtils.listFiles(
baseDir.getAbsoluteFile(),
new WildcardFileFilter(filter),
TrueFileFilter.INSTANCE
);
if (foundFiles == null || foundFiles.length == 0) {
if (foundFiles == null || foundFiles.isEmpty()) {
throw new ISE("Found no files to ingest! Check your schema.");
}
final LinkedList<File> files = Lists.<File>newLinkedList(
Arrays.asList(foundFiles)
final LinkedList<File> files = Lists.newLinkedList(
foundFiles
);
return new FileIteratingFirehose(
new Iterator<LineIterator>()
{
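
The rewritten `connect()` swaps the hand-rolled `FilenameFilter` for Apache Commons IO, which also descends into subdirectories. A minimal standalone sketch of the same listing pattern, assuming a base directory and a `*.csv` wildcard that are purely illustrative and not taken from the patch:

```java
import java.io.File;
import java.util.Collection;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.TrueFileFilter;
import org.apache.commons.io.filefilter.WildcardFileFilter;

public class WildcardListingSketch
{
  public static void main(String[] args)
  {
    // Recursively collect every file under the base directory that matches the
    // wildcard, mirroring what LocalFirehoseFactory.connect() now does.
    Collection<File> found = FileUtils.listFiles(
        new File("/tmp/data"),            // illustrative base directory
        new WildcardFileFilter("*.csv"),  // the "filter" property, e.g. *.csv
        TrueFileFilter.INSTANCE           // recurse into all subdirectories
    );

    for (File f : found) {
      System.out.println(f.getAbsolutePath());
    }
  }
}
```

One behavioral consequence worth noting: `WildcardFileFilter` matches glob-style patterns, whereas the old `name.contains(filter)` check treated the filter as a plain substring.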

View File

@ -23,11 +23,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.query.DataSourceUtil;
import io.druid.query.Query;
import io.druid.query.QueryMetricUtil;
import io.druid.server.log.RequestLogger;
import io.druid.server.router.QueryHostFinder;
import io.druid.server.router.Router;
@ -261,22 +260,8 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet
final long requestTime = System.currentTimeMillis() - start;
try {
emitter.emit(
new ServiceMetricEvent.Builder()
.setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
.setUser3(
jsonMapper.writeValueAsString(
query.getContext() == null
? ImmutableMap.of()
: query.getContext()
)
)
.setUser4(query.getType())
.setUser5(DataSourceUtil.COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
.setUser7(req.getRemoteAddr())
.setUser8(query.getId())
.setUser9(query.getDuration().toPeriod().toStandardMinutes().toString())
.build("request/time", requestTime)
QueryMetricUtil.makeRequestTimeMetric(jsonMapper, query, req.getRemoteAddr())
.build("request/time", requestTime)
);
requestLogger.log(

View File

@ -23,7 +23,6 @@ import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.MapMaker;
@ -35,12 +34,11 @@ import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.query.DataSourceUtil;
import io.druid.query.Query;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QueryMetricUtil;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.RetryQueryRunner;
import io.druid.server.initialization.ServerConfig;
@ -71,7 +69,6 @@ import java.util.UUID;
public class QueryResource
{
private static final EmittingLogger log = new EmittingLogger(QueryResource.class);
private static final Joiner COMMA_JOIN = Joiner.on(",");
public static final String APPLICATION_SMILE = "application/smile";
public static final String APPLICATION_JSON = "application/json";
@ -182,22 +179,8 @@ public class QueryResource
long requestTime = System.currentTimeMillis() - start;
emitter.emit(
new ServiceMetricEvent.Builder()
.setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
.setUser3(
jsonMapper.writeValueAsString(
query.getContext() == null
? ImmutableMap.of()
: query.getContext()
)
)
.setUser4(query.getType())
.setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
.setUser7(req.getRemoteAddr())
.setUser8(queryId)
.setUser9(query.getDuration().toPeriod().toStandardMinutes().toString())
.build("request/time", requestTime)
QueryMetricUtil.makeRequestTimeMetric(jsonMapper, query, req.getRemoteAddr())
.build("request/time", requestTime)
);
requestLogger.log(
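
Both servlets now build the request-time metric through a shared `QueryMetricUtil.makeRequestTimeMetric(jsonMapper, query, remoteAddr)` helper instead of repeating the builder chain. The helper's body is not part of this diff; the sketch below is only a hypothetical reconstruction that re-packages the `ServiceMetricEvent.Builder` calls removed above:

```java
// Hypothetical reconstruction -- the real QueryMetricUtil body is not shown in this diff.
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.query.DataSourceUtil;
import io.druid.query.Query;

public class QueryMetricUtilSketch
{
  public static ServiceMetricEvent.Builder makeRequestTimeMetric(
      ObjectMapper jsonMapper,
      Query<?> query,
      String remoteAddr
  ) throws JsonProcessingException
  {
    // The same user dimensions the two servlets used to set inline.
    return new ServiceMetricEvent.Builder()
        .setUser2(DataSourceUtil.getMetricName(query.getDataSource()))
        .setUser3(
            jsonMapper.writeValueAsString(
                query.getContext() == null ? ImmutableMap.of() : query.getContext()
            )
        )
        .setUser4(query.getType())
        .setUser5(DataSourceUtil.COMMA_JOIN.join(query.getIntervals()))
        .setUser6(String.valueOf(query.hasFilters()))
        .setUser7(remoteAddr)
        .setUser8(query.getId())
        .setUser9(query.getDuration().toPeriod().toStandardMinutes().toString());
  }
}
```

The call sites then finish with `.build("request/time", requestTime)` on the returned builder, as both hunks show.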

View File

@ -33,15 +33,21 @@ import java.util.concurrent.ConcurrentHashMap;
public class ReplicationThrottler
{
private static final EmittingLogger log = new EmittingLogger(ReplicationThrottler.class);
private final int maxReplicants;
private final int maxLifetime;
private final Map<String, Boolean> replicatingLookup = Maps.newHashMap();
private final Map<String, Boolean> terminatingLookup = Maps.newHashMap();
private final ReplicatorSegmentHolder currentlyReplicating = new ReplicatorSegmentHolder();
private final ReplicatorSegmentHolder currentlyTerminating = new ReplicatorSegmentHolder();
private volatile int maxReplicants;
private volatile int maxLifetime;
public ReplicationThrottler(int maxReplicants, int maxLifetime)
{
updateParams(maxReplicants, maxLifetime);
}
public void updateParams(int maxReplicants, int maxLifetime)
{
this.maxReplicants = maxReplicants;
this.maxLifetime = maxLifetime;
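
`maxReplicants` and `maxLifetime` change from final fields to volatile fields so the coordinator can push new limits between runs without recreating the throttler; `DruidCoordinatorRuleRunner` (next hunk) is the writer, calling `updateParams()` from the latest `CoordinatorDynamicConfig` at the start of each run. A minimal sketch of the same pattern, with illustrative names only:

```java
public class AdjustableThrottle
{
  // volatile: writes from the config-update path become visible to reader
  // threads without any additional locking
  private volatile int maxInFlight;

  public AdjustableThrottle(int maxInFlight)
  {
    updateParams(maxInFlight);
  }

  public void updateParams(int maxInFlight)
  {
    this.maxInFlight = maxInFlight;
  }

  public boolean canStartAnother(int currentlyInFlight)
  {
    // always compares against the most recently published limit
    return currentlyInFlight < maxInFlight;
  }
}
```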

View File

@ -62,6 +62,11 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
replicatorThrottler.updateParams(
coordinator.getDynamicConfigs().getReplicationThrottleLimit(),
coordinator.getDynamicConfigs().getReplicantLifetime()
);
CoordinatorStats stats = new CoordinatorStats();
DruidCluster cluster = params.getDruidCluster();

View File

@ -158,6 +158,9 @@ public class JettyServerModule extends JerseyServletModule
ServerConnector connector = new ServerConnector(server);
connector.setPort(node.getPort());
connector.setIdleTimeout(Ints.checkedCast(config.getMaxIdleTime().toStandardDuration().getMillis()));
// Workaround for Jetty half-open connection issues during failovers, as suggested in
// https://bugs.eclipse.org/bugs/show_bug.cgi?id=435322#c66
connector.setAcceptorPriorityDelta(-1);
server.setConnectors(new Connector[]{connector});
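
For context, `setAcceptorPriorityDelta(-1)` lowers the thread priority of the connector's acceptor threads relative to the rest of the server's thread pool, which is the mitigation suggested in the bug report referenced above. A minimal embedded-Jetty sketch of the same configuration, with the port and idle timeout chosen only for illustration:

```java
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;

public class LowPriorityAcceptorSketch
{
  public static void main(String[] args) throws Exception
  {
    Server server = new Server();

    ServerConnector connector = new ServerConnector(server);
    connector.setPort(8080);            // illustrative port
    connector.setIdleTimeout(300000L);  // illustrative idle timeout in millis
    // Run acceptor threads at lower priority than the selector/worker threads,
    // the workaround referenced in the Jetty bug report above.
    connector.setAcceptorPriorityDelta(-1);

    server.setConnectors(new Connector[]{connector});
    server.start();
    server.join();
  }
}
```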

View File

@ -0,0 +1,48 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.guice;
import com.google.inject.ProvisionException;
import io.druid.query.DruidProcessingConfig;
import org.junit.Test;
public class DruidProcessingModuleTest
{
@Test(expected=ProvisionException.class)
public void testMemoryCheckThrowsException() {
DruidProcessingModule module = new DruidProcessingModule();
module.getIntermediateResultsPool(new DruidProcessingConfig()
{
@Override
public String getFormatString()
{
return "test";
}
@Override
public int intermediateComputeSizeBytes()
{
return Integer.MAX_VALUE;
}
});
}
}

View File

@ -93,6 +93,7 @@ public class DruidCoordinatorRuleRunnerTest
@After
public void tearDown() throws Exception
{
EasyMock.verify(coordinator);
EasyMock.verify(databaseRuleManager);
}
@ -107,6 +108,7 @@ public class DruidCoordinatorRuleRunnerTest
@Test
public void testRunThreeTiersOneReplicant() throws Exception
{
mockCoordinator();
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
@ -206,6 +208,7 @@ public class DruidCoordinatorRuleRunnerTest
@Test
public void testRunTwoTiersTwoReplicants() throws Exception
{
mockCoordinator();
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
@ -298,6 +301,7 @@ public class DruidCoordinatorRuleRunnerTest
@Test
public void testRunTwoTiersWithExistingSegments() throws Exception
{
mockCoordinator();
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
@ -379,6 +383,7 @@ public class DruidCoordinatorRuleRunnerTest
@Test
public void testRunTwoTiersTierDoesNotExist() throws Exception
{
mockCoordinator();
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
@ -437,6 +442,7 @@ public class DruidCoordinatorRuleRunnerTest
@Test
public void testRunRuleDoesNotExist() throws Exception
{
mockCoordinator();
emitter.emit(EasyMock.<ServiceEventBuilder>anyObject());
EasyMock.expectLastCall().times(availableSegments.size());
EasyMock.replay(emitter);
@ -492,6 +498,11 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(
new CoordinatorDynamicConfig(
0, 0, 0, 0, 1, 24, 0, false
)
).anyTimes();
coordinator.removeSegment(EasyMock.<DataSegment>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.replay(coordinator);
@ -552,6 +563,7 @@ public class DruidCoordinatorRuleRunnerTest
@Test
public void testDropTooManyInSameTier() throws Exception
{
mockCoordinator();
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
@ -629,6 +641,7 @@ public class DruidCoordinatorRuleRunnerTest
@Test
public void testDropTooManyInDifferentTiers() throws Exception
{
mockCoordinator();
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
@ -712,6 +725,7 @@ public class DruidCoordinatorRuleRunnerTest
@Test
public void testDontDropInDifferentTiers() throws Exception
{
mockCoordinator();
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
@ -791,6 +805,7 @@ public class DruidCoordinatorRuleRunnerTest
@Test
public void testDropServerActuallyServesSegment() throws Exception
{
mockCoordinator();
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T01:00:00.000Z"), null, 0, "normal")
@ -889,6 +904,7 @@ public class DruidCoordinatorRuleRunnerTest
@Test
public void testReplicantThrottle() throws Exception
{
mockCoordinator();
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
@ -989,6 +1005,14 @@ public class DruidCoordinatorRuleRunnerTest
@Test
public void testReplicantThrottleAcrossTiers() throws Exception
{
EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(
new CoordinatorDynamicConfig(
0, 0, 0, 0, 1, 7, 0, false
)
).atLeastOnce();
coordinator.removeSegment(EasyMock.<DataSegment>anyObject());
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(coordinator);
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
@ -1070,6 +1094,7 @@ public class DruidCoordinatorRuleRunnerTest
@Test
public void testDropReplicantThrottle() throws Exception
{
mockCoordinator();
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
@ -1155,4 +1180,16 @@ public class DruidCoordinatorRuleRunnerTest
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 24);
EasyMock.verify(mockPeon);
}
private void mockCoordinator()
{
EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(
new CoordinatorDynamicConfig(
0, 0, 0, 0, 1, 24, 0, false
)
).anyTimes();
coordinator.removeSegment(EasyMock.<DataSegment>anyObject());
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(coordinator);
}
}

View File

@ -71,6 +71,8 @@ import io.druid.indexing.overlord.scaling.ResourceManagementSchedulerFactoryImpl
import io.druid.indexing.overlord.scaling.ResourceManagementStrategy;
import io.druid.indexing.overlord.scaling.SimpleResourceManagementConfig;
import io.druid.indexing.overlord.scaling.SimpleResourceManagementStrategy;
import io.druid.indexing.overlord.setup.FillCapacityWorkerSelectStrategy;
import io.druid.indexing.overlord.setup.WorkerSelectStrategy;
import io.druid.indexing.overlord.setup.WorkerSetupData;
import io.druid.indexing.worker.config.WorkerConfig;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
@ -200,6 +202,20 @@ public class CliOverlord extends ServerRunnable
biddy.addBinding("remote").to(RemoteTaskRunnerFactory.class).in(LazySingleton.class);
binder.bind(RemoteTaskRunnerFactory.class).in(LazySingleton.class);
PolyBind.createChoice(
binder,
"druid.indexer.runner.workerSelectStrategy.type",
Key.get(WorkerSelectStrategy.class),
Key.get(FillCapacityWorkerSelectStrategy.class)
);
final MapBinder<String, WorkerSelectStrategy> stratBinder = PolyBind.optionBinder(
binder,
Key.get(WorkerSelectStrategy.class)
);
stratBinder.addBinding("fillCapacity").to(FillCapacityWorkerSelectStrategy.class);
binder.bind(FillCapacityWorkerSelectStrategy.class).in(LazySingleton.class);
}
private void configureAutoscale(Binder binder)