Merge remote-tracking branch 'origin/master' into new_balancer

This commit is contained in:
Nelson Ray 2013-01-03 09:10:48 -08:00
commit 5441662cb8
21 changed files with 142 additions and 48 deletions

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.1.24-SNAPSHOT</version>
<version>0.1.25-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -83,7 +83,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
}
},
toolChest.preMergeQueryDecoration(baseClient)
)
).withWaitMeasuredFromNow()
)
),
toolChest

View File

@ -0,0 +1,72 @@
package com.metamx.druid.query;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MetricsEmittingExecutorService extends AbstractExecutorService
{
private final ExecutorService base;
private final ServiceEmitter emitter;
private final ServiceMetricEvent.Builder metricBuilder;
public MetricsEmittingExecutorService(
ExecutorService base,
ServiceEmitter emitter,
ServiceMetricEvent.Builder metricBuilder
)
{
this.base = base;
this.emitter = emitter;
this.metricBuilder = metricBuilder;
}
@Override
public void shutdown()
{
base.shutdown();
}
@Override
public List<Runnable> shutdownNow()
{
return base.shutdownNow();
}
@Override
public boolean isShutdown()
{
return base.isShutdown();
}
@Override
public boolean isTerminated()
{
return base.isTerminated();
}
@Override
public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException
{
return base.awaitTermination(l, timeUnit);
}
@Override
public void execute(Runnable runnable)
{
emitMetrics();
base.execute(runnable);
}
private void emitMetrics()
{
if (base instanceof ThreadPoolExecutor) {
emitter.emit(metricBuilder.build("exec/backlog", ((ThreadPoolExecutor) base).getQueue().size()));
}
}
}

View File

@ -37,16 +37,33 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
private final ServiceEmitter emitter;
private final Function<Query<T>, ServiceMetricEvent.Builder> builderFn;
private final QueryRunner<T> queryRunner;
private final long creationTime;
public MetricsEmittingQueryRunner(
ServiceEmitter emitter,
Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
QueryRunner<T> queryRunner
)
{
this(emitter, builderFn, queryRunner, -1);
}
public MetricsEmittingQueryRunner(
ServiceEmitter emitter,
Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
QueryRunner<T> queryRunner,
long creationTime
)
{
this.emitter = emitter;
this.builderFn = builderFn;
this.queryRunner = queryRunner;
this.creationTime = creationTime;
}
public MetricsEmittingQueryRunner<T> withWaitMeasuredFromNow()
{
return new MetricsEmittingQueryRunner<T>(emitter, builderFn, queryRunner, System.currentTimeMillis());
}
@Override
@ -77,6 +94,10 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
long timeTaken = System.currentTimeMillis() - startTime;
emitter.emit(builder.build("query/time", timeTaken));
if(creationTime > 0) {
emitter.emit(builder.build("query/wait", startTime - creationTime));
}
}
return retVal;
@ -149,6 +170,10 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
long timeTaken = System.currentTimeMillis() - startTime;
emitter.emit(builder.build("query/time", timeTaken));
if(creationTime > 0) {
emitter.emit(builder.build("query/wait", startTime - creationTime));
}
yielder.close();
}
};

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.1.24-SNAPSHOT</version>
<version>0.1.25-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -181,7 +181,7 @@ public class QueryGranularityTest
);
}
//@Test
@Test
public void testPeriodDaylightSaving() throws Exception
{
final DateTimeZone tz = DateTimeZone.forID("America/Los_Angeles");

View File

@ -24,11 +24,11 @@
<artifactId>druid-services</artifactId>
<name>druid-services</name>
<description>druid-services</description>
<version>0.1.24-SNAPSHOT</version>
<version>0.1.25-SNAPSHOT</version>
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.1.24-SNAPSHOT</version>
<version>0.1.25-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.1.24-SNAPSHOT</version>
<version>0.1.25-SNAPSHOT</version>
</parent>
<modules>

View File

@ -9,7 +9,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid-examples</artifactId>
<version>0.1.24-SNAPSHOT</version>
<version>0.1.25-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -9,7 +9,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid-examples</artifactId>
<version>0.1.24-SNAPSHOT</version>
<version>0.1.25-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.1.24-SNAPSHOT</version>
<version>0.1.25-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.1.24-SNAPSHOT</version>
<version>0.1.25-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.1.24-SNAPSHOT</version>
<version>0.1.25-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -211,10 +211,10 @@ public class RemoteTaskRunner implements TaskRunner
} else {
Duration durSinceLastTerminate = new Duration(new DateTime(), lastTerminateTime);
if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration())) {
log.makeAlert(
"It has been %d millis since last scheduled termination but nodes remain",
durSinceLastTerminate.getMillis()
).emit();
log.makeAlert("Worker node termination taking too long")
.addData("millisSinceLastTerminate", durSinceLastTerminate.getMillis())
.addData("terminatingCount", currentlyTerminating.size())
.emit();
}
log.info(
@ -330,7 +330,9 @@ public class RemoteTaskRunner implements TaskRunner
log.info("Registering retry for failed task[%s]", task.getId());
if (retryPolicy.hasExceededRetryThreshold()) {
log.makeAlert("Task [%s] has failed[%d] times, giving up!", task.getId(), retryPolicy.getNumRetries())
log.makeAlert("Task exceeded maximum retry count")
.addData("task", task.getId())
.addData("retryCount", retryPolicy.getNumRetries())
.emit();
return;
}
@ -542,10 +544,10 @@ public class RemoteTaskRunner implements TaskRunner
} else {
Duration durSinceLastProvision = new Duration(new DateTime(), lastProvisionTime);
if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration())) {
log.makeAlert(
"It has been %d millis since last scheduled provision but nodes remain",
durSinceLastProvision.getMillis()
).emit();
log.makeAlert("Worker node provisioning taking too long")
.addData("millisSinceLastProvision", durSinceLastProvision.getMillis())
.addData("provisioningCount", currentlyProvisioning.size())
.emit();
}
log.info(

View File

@ -20,11 +20,9 @@
package com.metamx.druid.merger.coordinator.exec;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.task.Task;
@ -35,12 +33,9 @@ import com.metamx.druid.merger.coordinator.TaskQueue;
import com.metamx.druid.merger.coordinator.TaskRunner;
import com.metamx.druid.merger.coordinator.VersionedTaskWrapper;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.AlertEvent;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import java.util.concurrent.ExecutorService;
public class TaskConsumer implements Runnable
{
private final TaskQueue queue;
@ -242,17 +237,12 @@ public class TaskConsumer implements Runnable
emitter.emit(builder.build("indexer/segment/bytes", bytes));
if (status.isFailure()) {
emitter.emit(
new AlertEvent.Builder().build(
String.format("Failed to index: %s", task.getDataSource()),
ImmutableMap.<String, Object>builder()
.put("task", task.getId())
.put("type", task.getType().toString())
.put("dataSource", task.getDataSource())
.put("interval", task.getInterval())
.build()
)
);
log.makeAlert("Failed to index")
.addData("task", task.getId())
.addData("type", task.getType().toString())
.addData("dataSource", task.getDataSource())
.addData("interval", task.getInterval())
.emit();
}
log.info(

View File

@ -23,7 +23,7 @@
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
<version>0.1.24-SNAPSHOT</version>
<version>0.1.25-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
<scm>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.1.24-SNAPSHOT</version>
<version>0.1.25-SNAPSHOT</version>
</parent>
<properties>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.1.24-SNAPSHOT</version>
<version>0.1.25-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -336,7 +336,7 @@ public class ServerManager implements QuerySegmentWalker
adapter.getInterval().getStart(),
factory.createRunner(adapter)
)
),
).withWaitMeasuredFromNow(),
segmentSpec
);
}

View File

@ -40,9 +40,11 @@ import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.QueryableLoaderConfig;
import com.metamx.druid.loading.StorageAdapterLoader;
import com.metamx.druid.metrics.ServerMonitor;
import com.metamx.druid.query.MetricsEmittingExecutorService;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import com.metamx.metrics.Monitor;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.smile.SmileFactory;
@ -118,13 +120,16 @@ public class ComputeNode extends BaseServerNode<ComputeNode>
final List<Monitor> monitors = getMonitors();
final QueryRunnerFactoryConglomerate conglomerate = getConglomerate();
final ExecutorService executorService = ExecutorServices.create(
getLifecycle(),
getConfigFactory().buildWithReplacements(
ExecutorServiceConfig.class, ImmutableMap.of("base_path", "druid.processing")
)
final ExecutorService executorService = new MetricsEmittingExecutorService(
ExecutorServices.create(
getLifecycle(),
getConfigFactory().buildWithReplacements(
ExecutorServiceConfig.class, ImmutableMap.of("base_path", "druid.processing")
)
), emitter, new ServiceMetricEvent.Builder()
);
ServerManager serverManager = new ServerManager(adapterLoader, conglomerate, emitter, executorService);
final ServerManager serverManager = new ServerManager(adapterLoader, conglomerate, emitter, executorService);
final ZkCoordinator coordinator = new ZkCoordinator(
getJsonMapper(),

View File

@ -83,7 +83,7 @@ public class PeriodLoadRule extends LoadRule
@Override
public boolean appliesTo(DataSegment segment)
{
final Interval currInterval = new Interval(new DateTime().minus(period), period);
final Interval currInterval = new Interval(period, new DateTime());
return currInterval.overlaps(segment.getInterval());
}
}