Configurability of recency threshold

This commit is contained in:
Gian Merlino 2013-12-13 16:02:54 -08:00
parent 4a8140be81
commit 600dc7546f
6 changed files with 65 additions and 20 deletions

View File

@ -0,0 +1,27 @@
package io.druid.indexing.common.config;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Duration;
import org.joda.time.Period;
public class TaskStorageConfig
{
@JsonProperty
private final Duration recentlyFinishedThreshold;
@JsonCreator
public TaskStorageConfig(
@JsonProperty("recentlyFinishedThreshold") final Period recentlyFinishedThreshold
)
{
this.recentlyFinishedThreshold = recentlyFinishedThreshold == null
? new Period("PT24H").toStandardDuration()
: recentlyFinishedThreshold.toStandardDuration();
}
public Duration getRecentlyFinishedThreshold()
{
return recentlyFinishedThreshold;
}
}

View File

@ -40,6 +40,7 @@ import io.druid.db.DbTablesConfig;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.Task;
import org.joda.time.DateTime;
import org.joda.time.Period;
@ -64,17 +65,24 @@ public class DbTaskStorage implements TaskStorage
private final DbConnector dbConnector;
private final DbTablesConfig dbTables;
private final IDBI dbi;
private final TaskStorageConfig config;
private static final long RECENCY_THRESHOLD = new Period("PT24H").toStandardDuration().getMillis();
private static final EmittingLogger log = new EmittingLogger(DbTaskStorage.class);
@Inject
public DbTaskStorage(ObjectMapper jsonMapper, DbConnector dbConnector, DbTablesConfig dbTables, IDBI dbi)
public DbTaskStorage(
final ObjectMapper jsonMapper,
final DbConnector dbConnector,
final DbTablesConfig dbTables,
final IDBI dbi,
final TaskStorageConfig config
)
{
this.jsonMapper = jsonMapper;
this.dbConnector = dbConnector;
this.dbTables = dbTables;
this.dbi = dbi;
this.config = config;
}
@LifecycleStart
@ -275,7 +283,7 @@ public class DbTaskStorage implements TaskStorage
@Override
public List<TaskStatus> getRecentlyFinishedTaskStatuses()
{
final DateTime recent = new DateTime().minus(RECENCY_THRESHOLD);
final DateTime recent = new DateTime().minus(config.getRecentlyFinishedThreshold());
return retryingHandle(
new HandleCallback<List<TaskStatus>>()
{

View File

@ -28,15 +28,15 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.Task;
import org.joda.time.DateTime;
import org.joda.time.Period;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
@ -47,14 +47,21 @@ import java.util.concurrent.locks.ReentrantLock;
*/
public class HeapMemoryTaskStorage implements TaskStorage
{
private final TaskStorageConfig config;
private final ReentrantLock giant = new ReentrantLock();
private final Map<String, TaskStuff> tasks = Maps.newHashMap();
private final Multimap<String, TaskLock> taskLocks = HashMultimap.create();
private final Multimap<String, TaskAction> taskActions = ArrayListMultimap.create();
private static final long RECENCY_THRESHOLD = new Period("PT24H").toStandardDuration().getMillis();
private static final Logger log = new Logger(HeapMemoryTaskStorage.class);
@Inject
public HeapMemoryTaskStorage(TaskStorageConfig config)
{
this.config = config;
}
@Override
public void insert(Task task, TaskStatus status)
{
@ -158,7 +165,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
try {
final List<TaskStatus> returns = Lists.newArrayList();
final long recent = System.currentTimeMillis() - RECENCY_THRESHOLD;
final long recent = System.currentTimeMillis() - config.getRecentlyFinishedThreshold().getMillis();
final Ordering<TaskStuff> createdDateDesc = new Ordering<TaskStuff>()
{
@Override

View File

@ -368,20 +368,13 @@ public class OverlordResource
@Produces("application/json")
public Response getScalingState()
{
if (!taskMaster.getResourceManagementScheduler().isPresent()) {
// Don't use asLeaderWith, since we want to return 200 instead of 503 when missing an autoscaler.
final Optional<ResourceManagementScheduler> rms = taskMaster.getResourceManagementScheduler();
if (rms.isPresent()) {
return Response.ok(rms.get().getStats()).build();
} else {
return Response.ok().build();
}
return asLeaderWith(
taskMaster.getResourceManagementScheduler(),
new Function<ResourceManagementScheduler, Response>()
{
@Override
public Response apply(ResourceManagementScheduler resourceManagementScheduler)
{
return Response.ok(resourceManagementScheduler.getStats()).build();
}
}
);
}
@GET

View File

@ -54,6 +54,7 @@ import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.AbstractFixedIntervalTask;
import io.druid.indexing.common.task.IndexTask;
import io.druid.indexing.common.task.KillTask;
@ -75,7 +76,9 @@ import io.druid.timeline.DataSegment;
import org.apache.commons.io.FileUtils;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -120,7 +123,11 @@ public class TaskLifecycleTest
"{\"startDelay\":\"PT0S\", \"restartDelay\":\"PT1S\"}",
TaskQueueConfig.class
);
ts = new HeapMemoryTaskStorage();
ts = new HeapMemoryTaskStorage(
new TaskStorageConfig(new Period("PT24H"))
{
}
);
tsqa = new TaskStorageQueryAdapter(ts);
tl = new TaskLockbox(ts);
mdc = newMockMDC();

View File

@ -45,6 +45,7 @@ import io.druid.indexing.common.actions.LocalTaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.index.ChatHandlerProvider;
import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer;
import io.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer;
@ -154,6 +155,8 @@ public class CliOverlord extends ServerRunnable
private void configureTaskStorage(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.indexer.storage", TaskStorageConfig.class);
PolyBind.createChoice(
binder, "druid.indexer.storage.type", Key.get(TaskStorage.class), Key.get(HeapMemoryTaskStorage.class)
);