mirror of https://github.com/apache/druid.git
overlord helpers framework and tasklog auto cleanup (#3677)
* overlord helpers framework and tasklog auto cleanup * review comment changes * further review comments addressed
This commit is contained in:
parent
6440ddcbca
commit
4ca3b7f1e4
|
@ -48,4 +48,10 @@ public class NoopTaskLogs implements TaskLogs
|
|||
{
|
||||
log.info("Noop: No task logs are deleted.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void killOlderThan(long timestamp) throws IOException
|
||||
{
|
||||
log.info("Noop: No task logs are deleted.");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,4 +26,5 @@ import java.io.IOException;
|
|||
public interface TaskLogKiller
|
||||
{
|
||||
void killAll() throws IOException;
|
||||
void killOlderThan(long timestamp) throws IOException;
|
||||
}
|
||||
|
|
|
@ -25,6 +25,16 @@ If you are running the indexing service in remote mode, the task logs must be st
|
|||
|--------|-----------|-------|
|
||||
|`druid.indexer.logs.type`|Choices:noop, s3, azure, google, hdfs, file. Where to store task logs|file|
|
||||
|
||||
You can also configure the Overlord to automatically retain the task logs only for last x milliseconds by configuring following additional properties.
|
||||
Caution: Automatic log file deletion typically works based on log file modification timestamp on the backing store, so large clock skews between druid nodes and backing store nodes might result in un-intended behavior.
|
||||
|
||||
|Property|Description|Default|
|
||||
|--------|-----------|-------|
|
||||
|`druid.indexer.logs.kill.enabled`|Boolean value for whether to enable deletion of old task logs. |false|
|
||||
|`druid.indexer.logs.kill.durationToRetain`| Required if kill is enabled. In milliseconds, task logs to be retained created in last x milliseconds. |None|
|
||||
|`druid.indexer.logs.kill.initialDelay`| Optional. Number of milliseconds after overlord start when first auto kill is run. |random value less than 300000 (5 mins)|
|
||||
|`druid.indexer.logs.kill.delay`|Optional. Number of milliseconds of delay between successive executions of auto kill run. |21600000 (6 hours)|
|
||||
|
||||
##### File Task Logs
|
||||
|
||||
Store task logs in the local filesystem.
|
||||
|
|
|
@ -120,4 +120,10 @@ public class AzureTaskLogs implements TaskLogs {
|
|||
{
|
||||
throw new UnsupportedOperationException("not implemented");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void killOlderThan(long timestamp) throws IOException
|
||||
{
|
||||
throw new UnsupportedOperationException("not implemented");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -106,4 +106,10 @@ public class GoogleTaskLogs implements TaskLogs {
|
|||
{
|
||||
throw new UnsupportedOperationException("not implemented");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void killOlderThan(long timestamp) throws IOException
|
||||
{
|
||||
throw new UnsupportedOperationException("not implemented");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,7 +28,9 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
|
@ -123,6 +125,35 @@ public class HdfsTaskLogs implements TaskLogs
|
|||
FileSystem fs = taskLogDir.getFileSystem(hadoopConfig);
|
||||
fs.delete(taskLogDir, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void killOlderThan(long timestamp) throws IOException
|
||||
{
|
||||
Path taskLogDir = new Path(config.getDirectory());
|
||||
FileSystem fs = taskLogDir.getFileSystem(hadoopConfig);
|
||||
if (fs.exists(taskLogDir)) {
|
||||
|
||||
if (!fs.isDirectory(taskLogDir)) {
|
||||
throw new IOException(String.format("taskLogDir [%s] must be a directory.", taskLogDir));
|
||||
}
|
||||
|
||||
RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(taskLogDir);
|
||||
while (iter.hasNext()) {
|
||||
LocatedFileStatus file = iter.next();
|
||||
if (file.getModificationTime() < timestamp) {
|
||||
Path p = file.getPath();
|
||||
log.info("Deleting hdfs task log [%s].", p.toUri().toString());
|
||||
fs.delete(p, true);
|
||||
}
|
||||
|
||||
if (Thread.currentThread().isInterrupted()) {
|
||||
throw new IOException(
|
||||
new InterruptedException("Thread interrupted. Couldn't delete all tasklogs.")
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -27,6 +27,8 @@ import io.druid.storage.hdfs.tasklog.HdfsTaskLogs;
|
|||
import io.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig;
|
||||
import io.druid.tasklogs.TaskLogs;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
@ -53,7 +55,7 @@ public class HdfsTaskLogsTest
|
|||
|
||||
final Map<Long, String> expected = ImmutableMap.of(0L, "blah", 1L, "lah", -2L, "ah", -5L, "blah");
|
||||
for (Map.Entry<Long, String> entry : expected.entrySet()) {
|
||||
final String string = readLog(taskLogs, entry.getKey());
|
||||
final String string = readLog(taskLogs, "foo", entry.getKey());
|
||||
Assert.assertEquals(String.format("Read with offset %,d", entry.getKey()), string, entry.getValue());
|
||||
}
|
||||
}
|
||||
|
@ -68,17 +70,52 @@ public class HdfsTaskLogsTest
|
|||
|
||||
Files.write("blah", logFile, Charsets.UTF_8);
|
||||
taskLogs.pushTaskLog("foo", logFile);
|
||||
Assert.assertEquals("blah", readLog(taskLogs, 0));
|
||||
Assert.assertEquals("blah", readLog(taskLogs, "foo", 0));
|
||||
|
||||
Files.write("blah blah", logFile, Charsets.UTF_8);
|
||||
taskLogs.pushTaskLog("foo", logFile);
|
||||
Assert.assertEquals("blah blah", readLog(taskLogs, 0));
|
||||
Assert.assertEquals("blah blah", readLog(taskLogs, "foo", 0));
|
||||
}
|
||||
|
||||
private String readLog(TaskLogs taskLogs, long offset) throws IOException
|
||||
@Test
|
||||
public void testKill() throws Exception
|
||||
{
|
||||
final File tmpDir = tempFolder.newFolder();
|
||||
final File logDir = new File(tmpDir, "logs");
|
||||
final File logFile = new File(tmpDir, "log");
|
||||
|
||||
final Path logDirPath = new Path(logDir.toString());
|
||||
FileSystem fs = new Path(logDir.toString()).getFileSystem(new Configuration());
|
||||
|
||||
final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration());
|
||||
|
||||
Files.write("log1content", logFile, Charsets.UTF_8);
|
||||
taskLogs.pushTaskLog("log1", logFile);
|
||||
Assert.assertEquals("log1content", readLog(taskLogs, "log1", 0));
|
||||
|
||||
//File modification timestamp is only maintained to seconds resolution, so artificial delay
|
||||
//is necessary to separate 2 file creations by a timestamp that would result in only one
|
||||
//of them getting deleted
|
||||
Thread.sleep(1500);
|
||||
long time = (System.currentTimeMillis()/1000)*1000;
|
||||
Assert.assertTrue(fs.getFileStatus(new Path(logDirPath, "log1")).getModificationTime() < time);
|
||||
|
||||
Files.write("log2content", logFile, Charsets.UTF_8);
|
||||
taskLogs.pushTaskLog("log2", logFile);
|
||||
Assert.assertEquals("log2content", readLog(taskLogs, "log2", 0));
|
||||
Assert.assertTrue(fs.getFileStatus(new Path(logDirPath, "log2")).getModificationTime() >= time);
|
||||
|
||||
taskLogs.killOlderThan(time);
|
||||
|
||||
Assert.assertFalse(taskLogs.streamTaskLog("log1", 0).isPresent());
|
||||
Assert.assertEquals("log2content", readLog(taskLogs, "log2", 0));
|
||||
|
||||
}
|
||||
|
||||
private String readLog(TaskLogs taskLogs, String logFile, long offset) throws IOException
|
||||
{
|
||||
return new String(
|
||||
ByteStreams.toByteArray(taskLogs.streamTaskLog("foo", offset).get().openStream()),
|
||||
ByteStreams.toByteArray(taskLogs.streamTaskLog(logFile, offset).get().openStream()),
|
||||
Charsets.UTF_8
|
||||
);
|
||||
}
|
||||
|
|
|
@ -144,4 +144,10 @@ public class S3TaskLogs implements TaskLogs
|
|||
{
|
||||
throw new UnsupportedOperationException("not implemented");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void killOlderThan(long timestamp) throws IOException
|
||||
{
|
||||
throw new UnsupportedOperationException("not implemented");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ import io.druid.tasklogs.TaskLogs;
|
|||
import org.apache.commons.io.FileUtils;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileFilter;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
|
@ -89,4 +90,38 @@ public class FileTaskLogs implements TaskLogs
|
|||
log.info("Deleting all task logs from local dir [%s].", config.getDirectory().getAbsolutePath());
|
||||
FileUtils.deleteDirectory(config.getDirectory());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void killOlderThan(final long timestamp) throws IOException
|
||||
{
|
||||
File taskLogDir = config.getDirectory();
|
||||
if (taskLogDir.exists()) {
|
||||
|
||||
if (!taskLogDir.isDirectory()) {
|
||||
throw new IOException(String.format("taskLogDir [%s] must be a directory.", taskLogDir));
|
||||
}
|
||||
|
||||
File[] files = taskLogDir.listFiles(
|
||||
new FileFilter()
|
||||
{
|
||||
@Override
|
||||
public boolean accept(File f)
|
||||
{
|
||||
return f.lastModified() < timestamp;
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
for (File file : files) {
|
||||
log.info("Deleting local task log [%s].", file.getAbsolutePath());
|
||||
FileUtils.forceDelete(file);
|
||||
|
||||
if (Thread.currentThread().isInterrupted()) {
|
||||
throw new IOException(
|
||||
new InterruptedException("Thread interrupted. Couldn't delete all tasklogs.")
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ import io.druid.indexing.common.actions.TaskActionClientFactory;
|
|||
import io.druid.indexing.common.task.Task;
|
||||
import io.druid.indexing.overlord.autoscaling.ScalingStats;
|
||||
import io.druid.indexing.overlord.config.TaskQueueConfig;
|
||||
import io.druid.indexing.overlord.helpers.OverlordHelperManager;
|
||||
import io.druid.indexing.overlord.supervisor.SupervisorManager;
|
||||
import io.druid.java.util.common.lifecycle.Lifecycle;
|
||||
import io.druid.java.util.common.lifecycle.LifecycleStart;
|
||||
|
@ -78,7 +79,8 @@ public class TaskMaster
|
|||
final CuratorFramework curator,
|
||||
final ServiceAnnouncer serviceAnnouncer,
|
||||
final ServiceEmitter emitter,
|
||||
final SupervisorManager supervisorManager
|
||||
final SupervisorManager supervisorManager,
|
||||
final OverlordHelperManager overlordHelperManager
|
||||
)
|
||||
{
|
||||
this.supervisorManager = supervisorManager;
|
||||
|
@ -120,6 +122,7 @@ public class TaskMaster
|
|||
leaderLifecycle.addManagedInstance(taskRunner);
|
||||
leaderLifecycle.addManagedInstance(taskQueue);
|
||||
leaderLifecycle.addManagedInstance(supervisorManager);
|
||||
leaderLifecycle.addManagedInstance(overlordHelperManager);
|
||||
|
||||
leaderLifecycle.addHandler(
|
||||
new Lifecycle.Handler()
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.indexing.overlord.helpers;
|
||||
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
/**
|
||||
*/
|
||||
public interface OverlordHelper
|
||||
{
|
||||
boolean isEnabled();
|
||||
void schedule(ScheduledExecutorService exec);
|
||||
}
|
|
@ -0,0 +1,91 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.indexing.overlord.helpers;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import io.druid.java.util.common.concurrent.ScheduledExecutorFactory;
|
||||
import io.druid.java.util.common.lifecycle.LifecycleStart;
|
||||
import io.druid.java.util.common.lifecycle.LifecycleStop;
|
||||
import io.druid.java.util.common.logger.Logger;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class OverlordHelperManager
|
||||
{
|
||||
private static final Logger log = new Logger(OverlordHelperManager.class);
|
||||
|
||||
private final ScheduledExecutorFactory execFactory;
|
||||
private final Set<OverlordHelper> helpers;
|
||||
|
||||
private volatile ScheduledExecutorService exec;
|
||||
private final Object startStopLock = new Object();
|
||||
private volatile boolean started = false;
|
||||
|
||||
@Inject
|
||||
public OverlordHelperManager(
|
||||
ScheduledExecutorFactory scheduledExecutorFactory,
|
||||
Set<OverlordHelper> helpers)
|
||||
{
|
||||
this.execFactory = scheduledExecutorFactory;
|
||||
this.helpers = helpers;
|
||||
}
|
||||
|
||||
@LifecycleStart
|
||||
public void start()
|
||||
{
|
||||
synchronized (startStopLock) {
|
||||
if (!started) {
|
||||
log.info("OverlordHelperManager is starting.");
|
||||
|
||||
for (OverlordHelper helper : helpers) {
|
||||
if (helper.isEnabled()) {
|
||||
if (exec == null) {
|
||||
exec = execFactory.create(1, "Overlord-Helper-Manager-Exec--%d");
|
||||
}
|
||||
helper.schedule(exec);
|
||||
}
|
||||
}
|
||||
started = true;
|
||||
|
||||
log.info("OverlordHelperManager is started.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@LifecycleStop
|
||||
public void stop()
|
||||
{
|
||||
synchronized (startStopLock) {
|
||||
if (started) {
|
||||
log.info("OverlordHelperManager is stopping.");
|
||||
if (exec != null) {
|
||||
exec.shutdownNow();
|
||||
exec = null;
|
||||
}
|
||||
started = false;
|
||||
|
||||
log.info("OverlordHelperManager is stopped.");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,79 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.indexing.overlord.helpers;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import io.druid.java.util.common.concurrent.ScheduledExecutors;
|
||||
import io.druid.java.util.common.logger.Logger;
|
||||
import io.druid.tasklogs.TaskLogKiller;
|
||||
import org.joda.time.Duration;
|
||||
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class TaskLogAutoCleaner implements OverlordHelper
|
||||
{
|
||||
private static final Logger log = new Logger(TaskLogAutoCleaner.class);
|
||||
|
||||
private final TaskLogKiller taskLogKiller;
|
||||
private final TaskLogAutoCleanerConfig config;
|
||||
|
||||
@Inject
|
||||
public TaskLogAutoCleaner(
|
||||
TaskLogKiller taskLogKiller,
|
||||
TaskLogAutoCleanerConfig config
|
||||
)
|
||||
{
|
||||
this.taskLogKiller = taskLogKiller;
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEnabled()
|
||||
{
|
||||
return config.isEnabled();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void schedule(ScheduledExecutorService exec)
|
||||
{
|
||||
log.info("Scheduling TaskLogAutoCleaner with config [%s].", config.toString());
|
||||
|
||||
ScheduledExecutors.scheduleWithFixedDelay(
|
||||
exec,
|
||||
Duration.millis(config.getInitialDelay()),
|
||||
Duration.millis(config.getDelay()),
|
||||
new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
try {
|
||||
taskLogKiller.killOlderThan(System.currentTimeMillis() - config.getDurationToRetain());
|
||||
}
|
||||
catch (Exception ex) {
|
||||
log.error(ex, "Failed to clean-up the task logs");
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,95 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.indexing.overlord.helpers;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import java.util.Random;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class TaskLogAutoCleanerConfig
|
||||
{
|
||||
@JsonProperty
|
||||
private final boolean enabled;
|
||||
|
||||
@JsonProperty
|
||||
private final long initialDelay;
|
||||
|
||||
@JsonProperty
|
||||
private final long delay;
|
||||
|
||||
@JsonProperty
|
||||
private final long durationToRetain;
|
||||
|
||||
@JsonCreator
|
||||
public TaskLogAutoCleanerConfig(
|
||||
@JsonProperty("enabled") boolean enabled,
|
||||
@JsonProperty("initialDelay") Long initialDelay,
|
||||
@JsonProperty("delay") Long delay,
|
||||
@JsonProperty("durationToRetain") Long durationToRetain
|
||||
){
|
||||
if (enabled) {
|
||||
Preconditions.checkNotNull(durationToRetain, "durationToRetain must be provided.");
|
||||
}
|
||||
|
||||
this.enabled = enabled;
|
||||
this.initialDelay = initialDelay == null ? 60000 + new Random().nextInt(4*60000) : initialDelay.longValue();
|
||||
this.delay = delay == null ? 6*60*60*1000 : delay.longValue();
|
||||
this.durationToRetain = durationToRetain == null ? Long.MAX_VALUE : durationToRetain.longValue();
|
||||
|
||||
Preconditions.checkArgument(this.initialDelay > 0, "initialDelay must be > 0.");
|
||||
Preconditions.checkArgument(this.delay > 0, "delay must be > 0.");
|
||||
Preconditions.checkArgument(this.durationToRetain > 0, "durationToRetain must be > 0.");
|
||||
}
|
||||
|
||||
public boolean isEnabled()
|
||||
{
|
||||
return this.enabled;
|
||||
}
|
||||
|
||||
public long getInitialDelay()
|
||||
{
|
||||
return initialDelay;
|
||||
}
|
||||
|
||||
public long getDelay()
|
||||
{
|
||||
return delay;
|
||||
}
|
||||
|
||||
public long getDurationToRetain()
|
||||
{
|
||||
return durationToRetain;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "TaskLogAutoCleanerConfig{" +
|
||||
"enabled=" + enabled +
|
||||
", initialDelay=" + initialDelay +
|
||||
", delay=" + delay +
|
||||
", durationToRetain=" + durationToRetain +
|
||||
'}';
|
||||
}
|
||||
}
|
|
@ -86,4 +86,43 @@ public class FileTaskLogsTest
|
|||
expectedException.expectMessage("Unable to create task log dir");
|
||||
taskLogs.pushTaskLog("foo", logFile);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKill() throws Exception
|
||||
{
|
||||
final File tmpDir = temporaryFolder.newFolder();
|
||||
final File logDir = new File(tmpDir, "logs");
|
||||
final File logFile = new File(tmpDir, "log");
|
||||
final TaskLogs taskLogs = new FileTaskLogs(new FileTaskLogsConfig(logDir));
|
||||
|
||||
Files.write("log1content", logFile, Charsets.UTF_8);
|
||||
taskLogs.pushTaskLog("log1", logFile);
|
||||
Assert.assertEquals("log1content", readLog(taskLogs, "log1", 0));
|
||||
|
||||
//File modification timestamp is only maintained to seconds resolution, so artificial delay
|
||||
//is necessary to separate 2 file creations by a timestamp that would result in only one
|
||||
//of them getting deleted
|
||||
Thread.sleep(1500);
|
||||
long time = (System.currentTimeMillis()/1000)*1000;
|
||||
Assert.assertTrue(new File(logDir, "log1.log").lastModified() < time);
|
||||
|
||||
Files.write("log2content", logFile, Charsets.UTF_8);
|
||||
taskLogs.pushTaskLog("log2", logFile);
|
||||
Assert.assertEquals("log2content", readLog(taskLogs, "log2", 0));
|
||||
Assert.assertTrue(new File(logDir, "log2.log").lastModified() >= time);
|
||||
|
||||
taskLogs.killOlderThan(time);
|
||||
|
||||
Assert.assertFalse(taskLogs.streamTaskLog("log1", 0).isPresent());
|
||||
Assert.assertEquals("log2content", readLog(taskLogs, "log2", 0));
|
||||
|
||||
}
|
||||
|
||||
private String readLog(TaskLogs taskLogs, String logFile, long offset) throws IOException
|
||||
{
|
||||
return new String(
|
||||
ByteStreams.toByteArray(taskLogs.streamTaskLog(logFile, offset).get().openStream()),
|
||||
Charsets.UTF_8
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,79 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.indexing.overlord.helpers;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.druid.TestUtil;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class TaskLogAutoCleanerConfigTest
|
||||
{
|
||||
@Test
|
||||
public void testSerde() throws Exception
|
||||
{
|
||||
String json = "{\n"
|
||||
+ " \"enabled\": true,\n"
|
||||
+ " \"initialDelay\": 10,\n"
|
||||
+ " \"delay\": 40,\n"
|
||||
+ " \"durationToRetain\": 30\n"
|
||||
+ "}";
|
||||
|
||||
ObjectMapper mapper = TestUtil.MAPPER;
|
||||
|
||||
TaskLogAutoCleanerConfig config = mapper.readValue(
|
||||
mapper.writeValueAsString(
|
||||
mapper.readValue(
|
||||
json,
|
||||
TaskLogAutoCleanerConfig.class
|
||||
)
|
||||
), TaskLogAutoCleanerConfig.class
|
||||
);
|
||||
|
||||
Assert.assertTrue(config.isEnabled());
|
||||
Assert.assertEquals(10, config.getInitialDelay());
|
||||
Assert.assertEquals(40, config.getDelay());
|
||||
Assert.assertEquals(30, config.getDurationToRetain());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSerdeWithDefaults() throws Exception
|
||||
{
|
||||
String json = "{}";
|
||||
|
||||
ObjectMapper mapper = TestUtil.MAPPER;
|
||||
|
||||
TaskLogAutoCleanerConfig config = mapper.readValue(
|
||||
mapper.writeValueAsString(
|
||||
mapper.readValue(
|
||||
json,
|
||||
TaskLogAutoCleanerConfig.class
|
||||
)
|
||||
), TaskLogAutoCleanerConfig.class
|
||||
);
|
||||
|
||||
Assert.assertFalse(config.isEnabled());
|
||||
Assert.assertTrue(config.getInitialDelay() >= 60000 && config.getInitialDelay() <= 300000);
|
||||
Assert.assertEquals(6*60*60*1000, config.getDelay());
|
||||
Assert.assertEquals(Long.MAX_VALUE, config.getDurationToRetain());
|
||||
}
|
||||
}
|
|
@ -48,6 +48,7 @@ import io.druid.indexing.overlord.TaskStorage;
|
|||
import io.druid.indexing.overlord.TaskStorageQueryAdapter;
|
||||
import io.druid.indexing.overlord.autoscaling.ScalingStats;
|
||||
import io.druid.indexing.overlord.config.TaskQueueConfig;
|
||||
import io.druid.indexing.overlord.helpers.OverlordHelperManager;
|
||||
import io.druid.indexing.overlord.supervisor.SupervisorManager;
|
||||
import io.druid.java.util.common.Pair;
|
||||
import io.druid.java.util.common.guava.CloseQuietly;
|
||||
|
@ -183,7 +184,8 @@ public class OverlordTest
|
|||
}
|
||||
},
|
||||
serviceEmitter,
|
||||
supervisorManager
|
||||
supervisorManager,
|
||||
EasyMock.createNiceMock(OverlordHelperManager.class)
|
||||
);
|
||||
EmittingLogger.registerEmitter(serviceEmitter);
|
||||
}
|
||||
|
|
|
@ -26,10 +26,10 @@ import com.google.inject.Key;
|
|||
import com.google.inject.Module;
|
||||
import com.google.inject.TypeLiteral;
|
||||
import com.google.inject.multibindings.MapBinder;
|
||||
import com.google.inject.multibindings.Multibinder;
|
||||
import com.google.inject.name.Names;
|
||||
import com.google.inject.servlet.GuiceFilter;
|
||||
import com.google.inject.util.Providers;
|
||||
|
||||
import io.airlift.airline.Command;
|
||||
import io.druid.audit.AuditManager;
|
||||
import io.druid.client.indexing.IndexingServiceSelectorConfig;
|
||||
|
@ -67,6 +67,9 @@ import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy;
|
|||
import io.druid.indexing.overlord.autoscaling.SimpleWorkerResourceManagementConfig;
|
||||
import io.druid.indexing.overlord.autoscaling.SimpleWorkerResourceManagementStrategy;
|
||||
import io.druid.indexing.overlord.config.TaskQueueConfig;
|
||||
import io.druid.indexing.overlord.helpers.OverlordHelper;
|
||||
import io.druid.indexing.overlord.helpers.TaskLogAutoCleaner;
|
||||
import io.druid.indexing.overlord.helpers.TaskLogAutoCleanerConfig;
|
||||
import io.druid.indexing.overlord.http.OverlordRedirectInfo;
|
||||
import io.druid.indexing.overlord.http.OverlordResource;
|
||||
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
|
||||
|
@ -149,6 +152,7 @@ public class CliOverlord extends ServerRunnable
|
|||
configureTaskStorage(binder);
|
||||
configureAutoscale(binder);
|
||||
configureRunners(binder);
|
||||
configureOverlordHelpers(binder);
|
||||
|
||||
binder.bind(AuditManager.class)
|
||||
.toProvider(AuditManagerProvider.class)
|
||||
|
@ -232,6 +236,14 @@ public class CliOverlord extends ServerRunnable
|
|||
biddy.addBinding("pendingTaskBased").to(PendingTaskBasedWorkerResourceManagementStrategy.class);
|
||||
|
||||
}
|
||||
|
||||
private void configureOverlordHelpers(Binder binder)
|
||||
{
|
||||
JsonConfigProvider.bind(binder, "druid.indexer.logs.kill", TaskLogAutoCleanerConfig.class);
|
||||
Multibinder.newSetBinder(binder, OverlordHelper.class)
|
||||
.addBinding()
|
||||
.to(TaskLogAutoCleaner.class);
|
||||
}
|
||||
},
|
||||
new IndexingServiceFirehoseModule(),
|
||||
new IndexingServiceTaskLogsModule()
|
||||
|
|
Loading…
Reference in New Issue