diff --git a/api/src/main/java/io/druid/tasklogs/NoopTaskLogs.java b/api/src/main/java/io/druid/tasklogs/NoopTaskLogs.java index 4eaf72d14cf..f82bf1d7b58 100644 --- a/api/src/main/java/io/druid/tasklogs/NoopTaskLogs.java +++ b/api/src/main/java/io/druid/tasklogs/NoopTaskLogs.java @@ -48,4 +48,10 @@ public class NoopTaskLogs implements TaskLogs { log.info("Noop: No task logs are deleted."); } + + @Override + public void killOlderThan(long timestamp) throws IOException + { + log.info("Noop: No task logs are deleted."); + } } diff --git a/api/src/main/java/io/druid/tasklogs/TaskLogKiller.java b/api/src/main/java/io/druid/tasklogs/TaskLogKiller.java index 39a879a0ccd..5296ea29a8f 100644 --- a/api/src/main/java/io/druid/tasklogs/TaskLogKiller.java +++ b/api/src/main/java/io/druid/tasklogs/TaskLogKiller.java @@ -26,4 +26,5 @@ import java.io.IOException; public interface TaskLogKiller { void killAll() throws IOException; + void killOlderThan(long timestamp) throws IOException; } diff --git a/docs/content/configuration/indexing-service.md b/docs/content/configuration/indexing-service.md index 6eca7bc1f50..348b6b16796 100644 --- a/docs/content/configuration/indexing-service.md +++ b/docs/content/configuration/indexing-service.md @@ -25,6 +25,16 @@ If you are running the indexing service in remote mode, the task logs must be st |--------|-----------|-------| |`druid.indexer.logs.type`|Choices:noop, s3, azure, google, hdfs, file. Where to store task logs|file| +You can also configure the Overlord to automatically retain the task logs only for last x milliseconds by configuring following additional properties. +Caution: Automatic log file deletion typically works based on log file modification timestamp on the backing store, so large clock skews between druid nodes and backing store nodes might result in un-intended behavior. + +|Property|Description|Default| +|--------|-----------|-------| +|`druid.indexer.logs.kill.enabled`|Boolean value for whether to enable deletion of old task logs. |false| +|`druid.indexer.logs.kill.durationToRetain`| Required if kill is enabled. In milliseconds, task logs to be retained created in last x milliseconds. |None| +|`druid.indexer.logs.kill.initialDelay`| Optional. Number of milliseconds after overlord start when first auto kill is run. |random value less than 300000 (5 mins)| +|`druid.indexer.logs.kill.delay`|Optional. Number of milliseconds of delay between successive executions of auto kill run. |21600000 (6 hours)| + ##### File Task Logs Store task logs in the local filesystem. diff --git a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java index 5ea9a324299..1ec0c2c7b67 100644 --- a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java +++ b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java @@ -120,4 +120,10 @@ public class AzureTaskLogs implements TaskLogs { { throw new UnsupportedOperationException("not implemented"); } + + @Override + public void killOlderThan(long timestamp) throws IOException + { + throw new UnsupportedOperationException("not implemented"); + } } diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleTaskLogs.java b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleTaskLogs.java index 90cf4738150..08cf00d82f0 100644 --- a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleTaskLogs.java +++ b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleTaskLogs.java @@ -106,4 +106,10 @@ public class GoogleTaskLogs implements TaskLogs { { throw new UnsupportedOperationException("not implemented"); } + + @Override + public void killOlderThan(long timestamp) throws IOException + { + throw new UnsupportedOperationException("not implemented"); + } } diff --git a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java index 811cb012de2..f438b664367 100644 --- a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java +++ b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java @@ -28,7 +28,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import java.io.File; import java.io.FileInputStream; @@ -123,6 +125,35 @@ public class HdfsTaskLogs implements TaskLogs FileSystem fs = taskLogDir.getFileSystem(hadoopConfig); fs.delete(taskLogDir, true); } + + @Override + public void killOlderThan(long timestamp) throws IOException + { + Path taskLogDir = new Path(config.getDirectory()); + FileSystem fs = taskLogDir.getFileSystem(hadoopConfig); + if (fs.exists(taskLogDir)) { + + if (!fs.isDirectory(taskLogDir)) { + throw new IOException(String.format("taskLogDir [%s] must be a directory.", taskLogDir)); + } + + RemoteIterator iter = fs.listLocatedStatus(taskLogDir); + while (iter.hasNext()) { + LocatedFileStatus file = iter.next(); + if (file.getModificationTime() < timestamp) { + Path p = file.getPath(); + log.info("Deleting hdfs task log [%s].", p.toUri().toString()); + fs.delete(p, true); + } + + if (Thread.currentThread().isInterrupted()) { + throw new IOException( + new InterruptedException("Thread interrupted. Couldn't delete all tasklogs.") + ); + } + } + } + } } diff --git a/extensions-core/hdfs-storage/src/test/java/io/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java b/extensions-core/hdfs-storage/src/test/java/io/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java index c3b253096a3..a90164ee67e 100644 --- a/extensions-core/hdfs-storage/src/test/java/io/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java +++ b/extensions-core/hdfs-storage/src/test/java/io/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java @@ -27,6 +27,8 @@ import io.druid.storage.hdfs.tasklog.HdfsTaskLogs; import io.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig; import io.druid.tasklogs.TaskLogs; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -53,7 +55,7 @@ public class HdfsTaskLogsTest final Map expected = ImmutableMap.of(0L, "blah", 1L, "lah", -2L, "ah", -5L, "blah"); for (Map.Entry entry : expected.entrySet()) { - final String string = readLog(taskLogs, entry.getKey()); + final String string = readLog(taskLogs, "foo", entry.getKey()); Assert.assertEquals(String.format("Read with offset %,d", entry.getKey()), string, entry.getValue()); } } @@ -68,17 +70,52 @@ public class HdfsTaskLogsTest Files.write("blah", logFile, Charsets.UTF_8); taskLogs.pushTaskLog("foo", logFile); - Assert.assertEquals("blah", readLog(taskLogs, 0)); + Assert.assertEquals("blah", readLog(taskLogs, "foo", 0)); Files.write("blah blah", logFile, Charsets.UTF_8); taskLogs.pushTaskLog("foo", logFile); - Assert.assertEquals("blah blah", readLog(taskLogs, 0)); + Assert.assertEquals("blah blah", readLog(taskLogs, "foo", 0)); } - private String readLog(TaskLogs taskLogs, long offset) throws IOException + @Test + public void testKill() throws Exception + { + final File tmpDir = tempFolder.newFolder(); + final File logDir = new File(tmpDir, "logs"); + final File logFile = new File(tmpDir, "log"); + + final Path logDirPath = new Path(logDir.toString()); + FileSystem fs = new Path(logDir.toString()).getFileSystem(new Configuration()); + + final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration()); + + Files.write("log1content", logFile, Charsets.UTF_8); + taskLogs.pushTaskLog("log1", logFile); + Assert.assertEquals("log1content", readLog(taskLogs, "log1", 0)); + + //File modification timestamp is only maintained to seconds resolution, so artificial delay + //is necessary to separate 2 file creations by a timestamp that would result in only one + //of them getting deleted + Thread.sleep(1500); + long time = (System.currentTimeMillis()/1000)*1000; + Assert.assertTrue(fs.getFileStatus(new Path(logDirPath, "log1")).getModificationTime() < time); + + Files.write("log2content", logFile, Charsets.UTF_8); + taskLogs.pushTaskLog("log2", logFile); + Assert.assertEquals("log2content", readLog(taskLogs, "log2", 0)); + Assert.assertTrue(fs.getFileStatus(new Path(logDirPath, "log2")).getModificationTime() >= time); + + taskLogs.killOlderThan(time); + + Assert.assertFalse(taskLogs.streamTaskLog("log1", 0).isPresent()); + Assert.assertEquals("log2content", readLog(taskLogs, "log2", 0)); + + } + + private String readLog(TaskLogs taskLogs, String logFile, long offset) throws IOException { return new String( - ByteStreams.toByteArray(taskLogs.streamTaskLog("foo", offset).get().openStream()), + ByteStreams.toByteArray(taskLogs.streamTaskLog(logFile, offset).get().openStream()), Charsets.UTF_8 ); } diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TaskLogs.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TaskLogs.java index 7f854e86f30..0a3cdcebe0f 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TaskLogs.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TaskLogs.java @@ -144,4 +144,10 @@ public class S3TaskLogs implements TaskLogs { throw new UnsupportedOperationException("not implemented"); } + + @Override + public void killOlderThan(long timestamp) throws IOException + { + throw new UnsupportedOperationException("not implemented"); + } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java b/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java index 4b24e6a09ee..9e36fdca818 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java @@ -29,6 +29,7 @@ import io.druid.tasklogs.TaskLogs; import org.apache.commons.io.FileUtils; import java.io.File; +import java.io.FileFilter; import java.io.IOException; import java.io.InputStream; @@ -89,4 +90,38 @@ public class FileTaskLogs implements TaskLogs log.info("Deleting all task logs from local dir [%s].", config.getDirectory().getAbsolutePath()); FileUtils.deleteDirectory(config.getDirectory()); } + + @Override + public void killOlderThan(final long timestamp) throws IOException + { + File taskLogDir = config.getDirectory(); + if (taskLogDir.exists()) { + + if (!taskLogDir.isDirectory()) { + throw new IOException(String.format("taskLogDir [%s] must be a directory.", taskLogDir)); + } + + File[] files = taskLogDir.listFiles( + new FileFilter() + { + @Override + public boolean accept(File f) + { + return f.lastModified() < timestamp; + } + } + ); + + for (File file : files) { + log.info("Deleting local task log [%s].", file.getAbsolutePath()); + FileUtils.forceDelete(file); + + if (Thread.currentThread().isInterrupted()) { + throw new IOException( + new InterruptedException("Thread interrupted. Couldn't delete all tasklogs.") + ); + } + } + } + } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java index 253b2c9e463..93ced507882 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java @@ -31,6 +31,7 @@ import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.autoscaling.ScalingStats; import io.druid.indexing.overlord.config.TaskQueueConfig; +import io.druid.indexing.overlord.helpers.OverlordHelperManager; import io.druid.indexing.overlord.supervisor.SupervisorManager; import io.druid.java.util.common.lifecycle.Lifecycle; import io.druid.java.util.common.lifecycle.LifecycleStart; @@ -78,7 +79,8 @@ public class TaskMaster final CuratorFramework curator, final ServiceAnnouncer serviceAnnouncer, final ServiceEmitter emitter, - final SupervisorManager supervisorManager + final SupervisorManager supervisorManager, + final OverlordHelperManager overlordHelperManager ) { this.supervisorManager = supervisorManager; @@ -120,6 +122,7 @@ public class TaskMaster leaderLifecycle.addManagedInstance(taskRunner); leaderLifecycle.addManagedInstance(taskQueue); leaderLifecycle.addManagedInstance(supervisorManager); + leaderLifecycle.addManagedInstance(overlordHelperManager); leaderLifecycle.addHandler( new Lifecycle.Handler() diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/OverlordHelper.java b/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/OverlordHelper.java new file mode 100644 index 00000000000..0b5cbf91922 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/OverlordHelper.java @@ -0,0 +1,30 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexing.overlord.helpers; + +import java.util.concurrent.ScheduledExecutorService; + +/** + */ +public interface OverlordHelper +{ + boolean isEnabled(); + void schedule(ScheduledExecutorService exec); +} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/OverlordHelperManager.java b/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/OverlordHelperManager.java new file mode 100644 index 00000000000..ee95def43c2 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/OverlordHelperManager.java @@ -0,0 +1,91 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexing.overlord.helpers; + +import com.google.inject.Inject; +import io.druid.java.util.common.concurrent.ScheduledExecutorFactory; +import io.druid.java.util.common.lifecycle.LifecycleStart; +import io.druid.java.util.common.lifecycle.LifecycleStop; +import io.druid.java.util.common.logger.Logger; + +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; + +/** + */ +public class OverlordHelperManager +{ + private static final Logger log = new Logger(OverlordHelperManager.class); + + private final ScheduledExecutorFactory execFactory; + private final Set helpers; + + private volatile ScheduledExecutorService exec; + private final Object startStopLock = new Object(); + private volatile boolean started = false; + + @Inject + public OverlordHelperManager( + ScheduledExecutorFactory scheduledExecutorFactory, + Set helpers) + { + this.execFactory = scheduledExecutorFactory; + this.helpers = helpers; + } + + @LifecycleStart + public void start() + { + synchronized (startStopLock) { + if (!started) { + log.info("OverlordHelperManager is starting."); + + for (OverlordHelper helper : helpers) { + if (helper.isEnabled()) { + if (exec == null) { + exec = execFactory.create(1, "Overlord-Helper-Manager-Exec--%d"); + } + helper.schedule(exec); + } + } + started = true; + + log.info("OverlordHelperManager is started."); + } + } + } + + @LifecycleStop + public void stop() + { + synchronized (startStopLock) { + if (started) { + log.info("OverlordHelperManager is stopping."); + if (exec != null) { + exec.shutdownNow(); + exec = null; + } + started = false; + + log.info("OverlordHelperManager is stopped."); + } + } + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleaner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleaner.java new file mode 100644 index 00000000000..d413d6ae46f --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleaner.java @@ -0,0 +1,79 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexing.overlord.helpers; + +import com.google.inject.Inject; +import io.druid.java.util.common.concurrent.ScheduledExecutors; +import io.druid.java.util.common.logger.Logger; +import io.druid.tasklogs.TaskLogKiller; +import org.joda.time.Duration; + +import java.util.concurrent.ScheduledExecutorService; + +/** + */ +public class TaskLogAutoCleaner implements OverlordHelper +{ + private static final Logger log = new Logger(TaskLogAutoCleaner.class); + + private final TaskLogKiller taskLogKiller; + private final TaskLogAutoCleanerConfig config; + + @Inject + public TaskLogAutoCleaner( + TaskLogKiller taskLogKiller, + TaskLogAutoCleanerConfig config + ) + { + this.taskLogKiller = taskLogKiller; + this.config = config; + } + + @Override + public boolean isEnabled() + { + return config.isEnabled(); + } + + @Override + public void schedule(ScheduledExecutorService exec) + { + log.info("Scheduling TaskLogAutoCleaner with config [%s].", config.toString()); + + ScheduledExecutors.scheduleWithFixedDelay( + exec, + Duration.millis(config.getInitialDelay()), + Duration.millis(config.getDelay()), + new Runnable() + { + @Override + public void run() + { + try { + taskLogKiller.killOlderThan(System.currentTimeMillis() - config.getDurationToRetain()); + } + catch (Exception ex) { + log.error(ex, "Failed to clean-up the task logs"); + } + } + } + ); + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleanerConfig.java b/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleanerConfig.java new file mode 100644 index 00000000000..b5907e9778c --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleanerConfig.java @@ -0,0 +1,95 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexing.overlord.helpers; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; + +import java.util.Random; + +/** + */ +public class TaskLogAutoCleanerConfig +{ + @JsonProperty + private final boolean enabled; + + @JsonProperty + private final long initialDelay; + + @JsonProperty + private final long delay; + + @JsonProperty + private final long durationToRetain; + + @JsonCreator + public TaskLogAutoCleanerConfig( + @JsonProperty("enabled") boolean enabled, + @JsonProperty("initialDelay") Long initialDelay, + @JsonProperty("delay") Long delay, + @JsonProperty("durationToRetain") Long durationToRetain + ){ + if (enabled) { + Preconditions.checkNotNull(durationToRetain, "durationToRetain must be provided."); + } + + this.enabled = enabled; + this.initialDelay = initialDelay == null ? 60000 + new Random().nextInt(4*60000) : initialDelay.longValue(); + this.delay = delay == null ? 6*60*60*1000 : delay.longValue(); + this.durationToRetain = durationToRetain == null ? Long.MAX_VALUE : durationToRetain.longValue(); + + Preconditions.checkArgument(this.initialDelay > 0, "initialDelay must be > 0."); + Preconditions.checkArgument(this.delay > 0, "delay must be > 0."); + Preconditions.checkArgument(this.durationToRetain > 0, "durationToRetain must be > 0."); + } + + public boolean isEnabled() + { + return this.enabled; + } + + public long getInitialDelay() + { + return initialDelay; + } + + public long getDelay() + { + return delay; + } + + public long getDurationToRetain() + { + return durationToRetain; + } + + @Override + public String toString() + { + return "TaskLogAutoCleanerConfig{" + + "enabled=" + enabled + + ", initialDelay=" + initialDelay + + ", delay=" + delay + + ", durationToRetain=" + durationToRetain + + '}'; + } +} diff --git a/indexing-service/src/test/java/io/druid/indexing/common/tasklogs/FileTaskLogsTest.java b/indexing-service/src/test/java/io/druid/indexing/common/tasklogs/FileTaskLogsTest.java index 79415c07310..64c97666ffa 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/tasklogs/FileTaskLogsTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/tasklogs/FileTaskLogsTest.java @@ -86,4 +86,43 @@ public class FileTaskLogsTest expectedException.expectMessage("Unable to create task log dir"); taskLogs.pushTaskLog("foo", logFile); } + + @Test + public void testKill() throws Exception + { + final File tmpDir = temporaryFolder.newFolder(); + final File logDir = new File(tmpDir, "logs"); + final File logFile = new File(tmpDir, "log"); + final TaskLogs taskLogs = new FileTaskLogs(new FileTaskLogsConfig(logDir)); + + Files.write("log1content", logFile, Charsets.UTF_8); + taskLogs.pushTaskLog("log1", logFile); + Assert.assertEquals("log1content", readLog(taskLogs, "log1", 0)); + + //File modification timestamp is only maintained to seconds resolution, so artificial delay + //is necessary to separate 2 file creations by a timestamp that would result in only one + //of them getting deleted + Thread.sleep(1500); + long time = (System.currentTimeMillis()/1000)*1000; + Assert.assertTrue(new File(logDir, "log1.log").lastModified() < time); + + Files.write("log2content", logFile, Charsets.UTF_8); + taskLogs.pushTaskLog("log2", logFile); + Assert.assertEquals("log2content", readLog(taskLogs, "log2", 0)); + Assert.assertTrue(new File(logDir, "log2.log").lastModified() >= time); + + taskLogs.killOlderThan(time); + + Assert.assertFalse(taskLogs.streamTaskLog("log1", 0).isPresent()); + Assert.assertEquals("log2content", readLog(taskLogs, "log2", 0)); + + } + + private String readLog(TaskLogs taskLogs, String logFile, long offset) throws IOException + { + return new String( + ByteStreams.toByteArray(taskLogs.streamTaskLog(logFile, offset).get().openStream()), + Charsets.UTF_8 + ); + } } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleanerConfigTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleanerConfigTest.java new file mode 100644 index 00000000000..1a5b0e54c23 --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleanerConfigTest.java @@ -0,0 +1,79 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexing.overlord.helpers; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.TestUtil; +import org.junit.Assert; +import org.junit.Test; + +/** + */ +public class TaskLogAutoCleanerConfigTest +{ + @Test + public void testSerde() throws Exception + { + String json = "{\n" + + " \"enabled\": true,\n" + + " \"initialDelay\": 10,\n" + + " \"delay\": 40,\n" + + " \"durationToRetain\": 30\n" + + "}"; + + ObjectMapper mapper = TestUtil.MAPPER; + + TaskLogAutoCleanerConfig config = mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + json, + TaskLogAutoCleanerConfig.class + ) + ), TaskLogAutoCleanerConfig.class + ); + + Assert.assertTrue(config.isEnabled()); + Assert.assertEquals(10, config.getInitialDelay()); + Assert.assertEquals(40, config.getDelay()); + Assert.assertEquals(30, config.getDurationToRetain()); + } + + @Test + public void testSerdeWithDefaults() throws Exception + { + String json = "{}"; + + ObjectMapper mapper = TestUtil.MAPPER; + + TaskLogAutoCleanerConfig config = mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + json, + TaskLogAutoCleanerConfig.class + ) + ), TaskLogAutoCleanerConfig.class + ); + + Assert.assertFalse(config.isEnabled()); + Assert.assertTrue(config.getInitialDelay() >= 60000 && config.getInitialDelay() <= 300000); + Assert.assertEquals(6*60*60*1000, config.getDelay()); + Assert.assertEquals(Long.MAX_VALUE, config.getDurationToRetain()); + } +} diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java index e39d07886ae..7ac6c94a16d 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java @@ -48,6 +48,7 @@ import io.druid.indexing.overlord.TaskStorage; import io.druid.indexing.overlord.TaskStorageQueryAdapter; import io.druid.indexing.overlord.autoscaling.ScalingStats; import io.druid.indexing.overlord.config.TaskQueueConfig; +import io.druid.indexing.overlord.helpers.OverlordHelperManager; import io.druid.indexing.overlord.supervisor.SupervisorManager; import io.druid.java.util.common.Pair; import io.druid.java.util.common.guava.CloseQuietly; @@ -183,7 +184,8 @@ public class OverlordTest } }, serviceEmitter, - supervisorManager + supervisorManager, + EasyMock.createNiceMock(OverlordHelperManager.class) ); EmittingLogger.registerEmitter(serviceEmitter); } diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java index 0c37f4e34bb..39aa998bf0a 100644 --- a/services/src/main/java/io/druid/cli/CliOverlord.java +++ b/services/src/main/java/io/druid/cli/CliOverlord.java @@ -26,10 +26,10 @@ import com.google.inject.Key; import com.google.inject.Module; import com.google.inject.TypeLiteral; import com.google.inject.multibindings.MapBinder; +import com.google.inject.multibindings.Multibinder; import com.google.inject.name.Names; import com.google.inject.servlet.GuiceFilter; import com.google.inject.util.Providers; - import io.airlift.airline.Command; import io.druid.audit.AuditManager; import io.druid.client.indexing.IndexingServiceSelectorConfig; @@ -67,6 +67,9 @@ import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy; import io.druid.indexing.overlord.autoscaling.SimpleWorkerResourceManagementConfig; import io.druid.indexing.overlord.autoscaling.SimpleWorkerResourceManagementStrategy; import io.druid.indexing.overlord.config.TaskQueueConfig; +import io.druid.indexing.overlord.helpers.OverlordHelper; +import io.druid.indexing.overlord.helpers.TaskLogAutoCleaner; +import io.druid.indexing.overlord.helpers.TaskLogAutoCleanerConfig; import io.druid.indexing.overlord.http.OverlordRedirectInfo; import io.druid.indexing.overlord.http.OverlordResource; import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; @@ -149,6 +152,7 @@ public class CliOverlord extends ServerRunnable configureTaskStorage(binder); configureAutoscale(binder); configureRunners(binder); + configureOverlordHelpers(binder); binder.bind(AuditManager.class) .toProvider(AuditManagerProvider.class) @@ -232,6 +236,14 @@ public class CliOverlord extends ServerRunnable biddy.addBinding("pendingTaskBased").to(PendingTaskBasedWorkerResourceManagementStrategy.class); } + + private void configureOverlordHelpers(Binder binder) + { + JsonConfigProvider.bind(binder, "druid.indexer.logs.kill", TaskLogAutoCleanerConfig.class); + Multibinder.newSetBinder(binder, OverlordHelper.class) + .addBinding() + .to(TaskLogAutoCleaner.class); + } }, new IndexingServiceFirehoseModule(), new IndexingServiceTaskLogsModule()