reset-cluster command to clean up druid state stored on metadata and deep storage (#3670)

This commit is contained in:
Himanshu 2016-11-09 11:07:01 -06:00 committed by Parag Jain
parent 575aeb843a
commit b76b3f8d85
28 changed files with 509 additions and 14 deletions

View File

@ -21,9 +21,13 @@ package io.druid.segment.loading;
import io.druid.timeline.DataSegment;
import java.io.IOException;
/**
*/
public interface DataSegmentKiller
{
public void kill(DataSegment segments) throws SegmentLoadingException;
void kill(DataSegment segments) throws SegmentLoadingException;
void killAll() throws IOException;
}
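For extension authors, the expanded contract now has two methods: per-segment `kill` and bulk `killAll` (used by reset-cluster). A minimal sketch of an implementation is shown below; the class name, package, and flat-directory storage layout are hypothetical and are not part of this commit.

```java
package io.druid.example.loading; // hypothetical package, not part of this commit

import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import org.apache.commons.io.FileUtils;

import java.io.File;
import java.io.IOException;

/**
 * Hypothetical killer for segments stored as plain files under a single local
 * directory; it only illustrates the two-method contract.
 */
public class ExampleDataSegmentKiller implements DataSegmentKiller
{
  private final File storageDirectory;

  public ExampleDataSegmentKiller(File storageDirectory)
  {
    this.storageDirectory = storageDirectory;
  }

  @Override
  public void kill(DataSegment segment) throws SegmentLoadingException
  {
    // assumes the loadSpec carries an absolute "path" entry, as local deep storage does
    final Object path = segment.getLoadSpec().get("path");
    if (path == null) {
      throw new SegmentLoadingException("No path in loadSpec for segment[%s]", segment.getIdentifier());
    }
    try {
      FileUtils.forceDelete(new File(path.toString()));
    }
    catch (IOException e) {
      throw new SegmentLoadingException(e, "Couldn't kill segment[%s]", segment.getIdentifier());
    }
  }

  @Override
  public void killAll() throws IOException
  {
    // bulk path used by reset-cluster: wipe everything this deep storage manages
    FileUtils.deleteDirectory(storageDirectory);
  }
}
```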

View File

@ -42,4 +42,10 @@ public class NoopTaskLogs implements TaskLogs
{
log.info("Not pushing logs for task: %s", taskid);
}
@Override
public void killAll() throws IOException
{
log.info("Noop: No task logs are deleted.");
}
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.tasklogs;
import java.io.IOException;
/**
*/
public interface TaskLogKiller
{
void killAll() throws IOException;
}

View File

@ -19,6 +19,6 @@
package io.druid.tasklogs;
public interface TaskLogs extends TaskLogStreamer, TaskLogPusher
public interface TaskLogs extends TaskLogStreamer, TaskLogPusher, TaskLogKiller
{
}

View File

@ -53,4 +53,6 @@ public interface MetadataStorageConnector
void createAuditTable();
void createSupervisorsTable();
void deleteAllRecords(String tableName);
}

View File

@ -178,4 +178,19 @@ public class MetadataStorageTablesConfig
{
return supervisorTable;
}
public String getTasksTable()
{
return tasksTable;
}
public String getTaskLogTable()
{
return taskLogTable;
}
public String getTaskLockTable()
{
return taskLockTable;
}
}

View File

@ -0,0 +1,55 @@
---
layout: doc_page
---
# ResetCluster tool
The ResetCluster tool completely wipes out the Druid cluster state stored in metadata and deep storage. It is
intended for dev/test environments where you typically want to reset the cluster before running the test suite.

ResetCluster automatically discovers the necessary information from the Druid cluster configuration, so the Java
classpath used in the command must include all of the required Druid configuration files (an example with an
explicit classpath is shown after the commands below).

It can be run in one of the following ways:
```
java io.druid.cli.Main tools reset-cluster [--metadataStore] [--segmentFiles] [--taskLogs] [--hadoopWorkingPath]
```
or
```
java io.druid.cli.Main tools reset-cluster --all
```
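For example, on a typical installation where the common runtime properties live under `conf/druid/_common` and the Druid jars under `lib` (both paths are illustrative and depend on your layout), the classpath can be supplied explicitly:
```
java -classpath "conf/druid/_common:lib/*" io.druid.cli.Main tools reset-cluster --all
```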
# Further Description
Usage documentation can be printed by running the following command:
```
java io.druid.cli.Main help tools reset-cluster
```
```
NAME
druid tools reset-cluster - Clean up all persisted state from metadata
and deep storage.
SYNOPSIS
druid tools reset-cluster [--all] [--hadoopWorkingPath]
[--metadataStore] [--segmentFiles] [--taskLogs]
OPTIONS
--all
delete all state stored in metadata and deep storage
--hadoopWorkingPath
delete hadoopWorkingPath
--metadataStore
delete all records in metadata storage
--segmentFiles
delete all segment files from deep storage
--taskLogs
delete all tasklogs
```
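The individual flags can also be combined; for example, to delete only the metadata records and segment files while leaving task logs and the Hadoop working path untouched:
```
java io.druid.cli.Main tools reset-cluster --metadataStore --segmentFiles
```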

View File

@ -28,6 +28,7 @@ import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.Map;
@ -67,4 +68,10 @@ public class AzureDataSegmentKiller implements DataSegmentKiller
}
}
@Override
public void killAll() throws IOException
{
throw new UnsupportedOperationException("not implemented");
}
}

View File

@ -114,4 +114,10 @@ public class AzureTaskLogs implements TaskLogs {
private String getTaskLogKey(String taskid) {
return String.format("%s/%s/log", config.getPrefix(), taskid);
}
@Override
public void killAll() throws IOException
{
throw new UnsupportedOperationException("not implemented");
}
}

View File

@ -39,10 +39,13 @@ public class HdfsDataSegmentKiller implements DataSegmentKiller
private final Configuration config;
private final Path storageDirectory;
@Inject
public HdfsDataSegmentKiller(final Configuration config)
public HdfsDataSegmentKiller(final Configuration config, final HdfsDataSegmentPusherConfig pusherConfig)
{
this.config = config;
this.storageDirectory = new Path(pusherConfig.getStorageDirectory());
}
@Override
@ -88,6 +91,14 @@ public class HdfsDataSegmentKiller implements DataSegmentKiller
}
}
@Override
public void killAll() throws IOException
{
log.info("Deleting all segment files from hdfs dir [%s].", storageDirectory.toUri().toString());
final FileSystem fs = storageDirectory.getFileSystem(config);
fs.delete(storageDirectory, true);
}
private boolean safeNonRecursiveDelete(FileSystem fs, Path path)
{
try {

View File

@ -22,7 +22,6 @@ import com.google.common.base.Optional;
import com.google.common.io.ByteSource;
import com.google.common.io.ByteStreams;
import com.google.inject.Inject;
import io.druid.java.util.common.logger.Logger;
import io.druid.tasklogs.TaskLogs;
import org.apache.hadoop.conf.Configuration;
@ -115,6 +114,15 @@ public class HdfsTaskLogs implements TaskLogs
{
return path1 + (path1.endsWith(Path.SEPARATOR) ? "" : Path.SEPARATOR) + path2;
}
@Override
public void killAll() throws IOException
{
log.info("Deleting all task logs from hdfs dir [%s].", config.getDirectory());
Path taskLogDir = new Path(config.getDirectory());
FileSystem fs = taskLogDir.getFileSystem(hadoopConfig);
fs.delete(taskLogDir, true);
}
}

View File

@ -41,7 +41,17 @@ public class HdfsDataSegmentKillerTest
public void testKill() throws Exception
{
Configuration config = new Configuration();
HdfsDataSegmentKiller killer = new HdfsDataSegmentKiller(config);
HdfsDataSegmentKiller killer = new HdfsDataSegmentKiller(
config,
new HdfsDataSegmentPusherConfig()
{
@Override
public String getStorageDirectory()
{
return "/tmp";
}
}
);
FileSystem fs = FileSystem.get(config);
@ -99,7 +109,17 @@ public class HdfsDataSegmentKillerTest
public void testKillNonExistingSegment() throws Exception
{
Configuration config = new Configuration();
HdfsDataSegmentKiller killer = new HdfsDataSegmentKiller(config);
HdfsDataSegmentKiller killer = new HdfsDataSegmentKiller(
config,
new HdfsDataSegmentPusherConfig()
{
@Override
public String getStorageDirectory()
{
return "/tmp";
}
}
);
killer.kill(getSegmentWithPath(new Path("/xxx/", "index.zip").toString()));
}

View File

@ -20,7 +20,6 @@
package io.druid.storage.s3;
import com.google.inject.Inject;
import io.druid.java.util.common.MapUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.loading.DataSegmentKiller;
@ -29,6 +28,7 @@ import io.druid.timeline.DataSegment;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import java.io.IOException;
import java.util.Map;
/**
@ -69,4 +69,10 @@ public class S3DataSegmentKiller implements DataSegmentKiller
throw new SegmentLoadingException(e, "Couldn't kill segment[%s]: [%s]", segment.getIdentifier(), e);
}
}
@Override
public void killAll() throws IOException
{
throw new UnsupportedOperationException("not implemented");
}
}

View File

@ -138,4 +138,10 @@ public class S3TaskLogs implements TaskLogs
{
return String.format("%s/%s/log", config.getS3Prefix(), taskid);
}
@Override
public void killAll() throws IOException
{
throw new UnsupportedOperationException("not implemented");
}
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer;
import io.druid.java.util.common.logger.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
* Used by ResetCluster to delete the Hadoop Working Path.
*/
public class HadoopWorkingDirCleaner
{
private static final Logger log = new Logger(HadoopWorkingDirCleaner.class);
public static String runTask(String[] args) throws Exception
{
String workingPath = args[0];
log.info("Deleting indexing hadoop working path [%s].", workingPath);
Path p = new Path(workingPath);
FileSystem fs = p.getFileSystem(new Configuration());
fs.delete(p, true);
return null;
}
}

View File

@ -26,6 +26,7 @@ import com.google.inject.multibindings.MapBinder;
import io.druid.indexing.common.config.FileTaskLogsConfig;
import io.druid.indexing.common.tasklogs.FileTaskLogs;
import io.druid.tasklogs.NoopTaskLogs;
import io.druid.tasklogs.TaskLogKiller;
import io.druid.tasklogs.TaskLogPusher;
import io.druid.tasklogs.TaskLogs;
@ -46,5 +47,6 @@ public class IndexingServiceTaskLogsModule implements Module
binder.bind(FileTaskLogs.class).in(LazySingleton.class);
binder.bind(TaskLogPusher.class).to(TaskLogs.class);
binder.bind(TaskLogKiller.class).to(TaskLogs.class);
}
}

View File

@ -127,10 +127,16 @@ public abstract class HadoopTask extends AbstractTask
* @throws MalformedURLException from Initialization.getClassLoaderForExtension
*/
protected ClassLoader buildClassLoader(final TaskToolbox toolbox) throws MalformedURLException
{
return buildClassLoader(hadoopDependencyCoordinates, toolbox.getConfig().getDefaultHadoopCoordinates());
}
public static ClassLoader buildClassLoader(final List<String> hadoopDependencyCoordinates,
final List<String> defaultHadoopCoordinates) throws MalformedURLException
{
final List<String> finalHadoopDependencyCoordinates = hadoopDependencyCoordinates != null
? hadoopDependencyCoordinates
: toolbox.getConfig().getDefaultHadoopCoordinates();
: defaultHadoopCoordinates;
final List<URL> jobURLs = Lists.newArrayList(
Arrays.asList(((URLClassLoader) HadoopIndexTask.class.getClassLoader()).getURLs())

View File

@ -23,10 +23,10 @@ import com.google.common.base.Optional;
import com.google.common.io.ByteSource;
import com.google.common.io.Files;
import com.google.inject.Inject;
import io.druid.indexing.common.config.FileTaskLogsConfig;
import io.druid.java.util.common.logger.Logger;
import io.druid.tasklogs.TaskLogs;
import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
@ -82,4 +82,11 @@ public class FileTaskLogs implements TaskLogs
{
return new File(config.getDirectory(), String.format("%s.log", taskid));
}
@Override
public void killAll() throws IOException
{
log.info("Deleting all task logs from local dir [%s].", config.getDirectory().getAbsolutePath());
FileUtils.deleteDirectory(config.getDirectory());
}
}

View File

@ -239,6 +239,12 @@ public class IngestSegmentFirehoseFactoryTest
{
}
@Override
public void killAll() throws IOException
{
throw new UnsupportedOperationException("not implemented");
}
},
new DataSegmentMover()
{

View File

@ -91,6 +91,7 @@ import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.LocalDataSegmentKiller;
import io.druid.segment.loading.LocalDataSegmentPusherConfig;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.segment.loading.SegmentLoadingException;
@ -515,7 +516,7 @@ public class TaskLifecycleTest
tac,
emitter,
dataSegmentPusher,
new LocalDataSegmentKiller(),
new LocalDataSegmentKiller(new LocalDataSegmentPusherConfig()),
new DataSegmentMover()
{
@Override

View File

@ -25,6 +25,7 @@ import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import java.io.IOException;
import java.util.Set;
public class TestDataSegmentKiller implements DataSegmentKiller
@ -41,4 +42,10 @@ public class TestDataSegmentKiller implements DataSegmentKiller
{
return ImmutableSet.copyOf(killedSegments);
}
@Override
public void killAll() throws IOException
{
throw new UnsupportedOperationException("not implemented");
}
}

View File

@ -58,6 +58,10 @@ public class LocalDataStorageDruidModule implements DruidModule
binder, "druid.storage.type", Key.get(DataSegmentPusher.class), Key.get(LocalDataSegmentPusher.class)
);
PolyBind.createChoice(
binder, "druid.storage.type", Key.get(DataSegmentKiller.class), Key.get(LocalDataSegmentKiller.class)
);
PolyBind.createChoice(binder, "druid.storage.type", Key.get(DataSegmentFinder.class), null);
}

View File

@ -628,4 +628,31 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
createAuditTable(tablesConfigSupplier.get().getAuditTable());
}
}
public void deleteAllRecords(final String tableName)
{
try {
retryWithHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
if (tableExists(handle, tableName)) {
log.info("Deleting all records from table[%s]", tableName);
final Batch batch = handle.createBatch();
batch.add("DELETE FROM " + tableName);
batch.execute();
} else {
log.info("Table[%s] does not exist.", tableName);
}
return null;
}
}
);
}
catch (Exception e) {
log.warn(e, "Exception while deleting records from table[%s]", tableName);
}
}
}

View File

@ -19,6 +19,7 @@
package io.druid.segment.loading;
import com.google.inject.Inject;
import io.druid.java.util.common.MapUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.timeline.DataSegment;
@ -35,6 +36,13 @@ public class LocalDataSegmentKiller implements DataSegmentKiller
private static final String PATH_KEY = "path";
private final File storageDirectory;
@Inject
public LocalDataSegmentKiller(LocalDataSegmentPusherConfig config)
{
this.storageDirectory = config.getStorageDirectory();
}
@Override
public void kill(DataSegment segment) throws SegmentLoadingException
{
@ -66,6 +74,13 @@ public class LocalDataSegmentKiller implements DataSegmentKiller
}
}
@Override
public void killAll() throws IOException
{
log.info("Deleting all segment files from local dir [%s].", storageDirectory.getAbsolutePath());
FileUtils.deleteDirectory(storageDirectory);
}
private File getPath(DataSegment segment) throws SegmentLoadingException
{
return new File(MapUtils.getString(segment.getLoadSpec(), PATH_KEY));

View File

@ -20,10 +20,10 @@
package io.druid.segment.loading;
import com.google.inject.Inject;
import io.druid.java.util.common.MapUtils;
import io.druid.timeline.DataSegment;
import java.io.IOException;
import java.util.Map;
/**
@ -58,4 +58,9 @@ public class OmniDataSegmentKiller implements DataSegmentKiller
return loader;
}
@Override
public void killAll() throws IOException
{
throw new UnsupportedOperationException("not implemented");
}
}

View File

@ -41,7 +41,7 @@ public class LocalDataSegmentKillerTest
@Test
public void testKill() throws Exception
{
LocalDataSegmentKiller killer = new LocalDataSegmentKiller();
LocalDataSegmentKiller killer = new LocalDataSegmentKiller(new LocalDataSegmentPusherConfig());
// Create following segments and then delete them in this order and assert directory deletions
// /tmp/dataSource/interval1/v1/0/index.zip
@ -49,7 +49,7 @@ public class LocalDataSegmentKillerTest
// /tmp/dataSource/interval1/v2/0/index.zip
// /tmp/dataSource/interval2/v1/0/index.zip
File dataSourceDir = temporaryFolder.newFolder();
final File dataSourceDir = temporaryFolder.newFolder();
File interval1Dir = new File(dataSourceDir, "interval1");
File version11Dir = new File(interval1Dir, "v1");

View File

@ -73,7 +73,8 @@ public class Main
PullDependencies.class,
CreateTables.class,
InsertSegment.class,
DumpSegment.class
DumpSegment.class,
ResetCluster.class
);
builder.withGroup("index")

View File

@ -0,0 +1,195 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.cli;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import io.druid.guice.IndexingServiceTaskLogsModule;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.annotations.Self;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.HadoopTask;
import io.druid.java.util.common.logger.Logger;
import io.druid.metadata.MetadataStorageConnector;
import io.druid.metadata.MetadataStorageTablesConfig;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.server.DruidNode;
import io.druid.tasklogs.TaskLogKiller;
import java.util.List;
/**
* Cleans up all Druid state from metadata and deep storage.
*/
@Command(
name = "reset-cluster",
description = "Clean up all persisted state from metadata and deep storage."
)
public class ResetCluster extends GuiceRunnable
{
private static final Logger log = new Logger(ResetCluster.class);
@Option(name = "--all", description = "delete all state stored in metadata and deep storage")
private boolean all;
@Option(name = "--metadataStore", description = "delete all records in metadata storage")
private boolean metadataStore;
@Option(name = "--segmentFiles", description = "delete all segment files from deep storage")
private boolean segmentFiles;
@Option(name = "--taskLogs", description = "delete all tasklogs")
private boolean taskLogs;
@Option(name = "--hadoopWorkingPath", description = "delete hadoopWorkingPath")
private boolean hadoopWorkingPath;
public ResetCluster()
{
super(log);
}
@Override
protected List<? extends Module> getModules()
{
return ImmutableList.<Module>of(
new Module()
{
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bindInstance(
binder, Key.get(DruidNode.class, Self.class), new DruidNode("tools", "localhost", -1)
);
JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class);
}
},
new IndexingServiceTaskLogsModule()
);
}
@Override
public void run()
{
if (all) {
metadataStore = segmentFiles = taskLogs = hadoopWorkingPath = true;
}
final Injector injector = makeInjector();
if (metadataStore) {
resetMetadataStore(injector);
}
if (segmentFiles) {
deleteAllSegmentFiles(injector);
}
if (taskLogs) {
deleteAllTaskLogs(injector);
}
if (hadoopWorkingPath) {
deleteIndexerHadoopWorkingDir(injector);
}
}
private void resetMetadataStore(Injector injector)
{
log.info("===========================================================================");
log.info("Deleting all Records from Metadata Storage.");
log.info("===========================================================================");
MetadataStorageConnector connector = injector.getInstance(MetadataStorageConnector.class);
MetadataStorageTablesConfig tablesConfig = injector.getInstance(MetadataStorageTablesConfig.class);
String[] tables = new String[]{
tablesConfig.getDataSourceTable(),
tablesConfig.getPendingSegmentsTable(),
tablesConfig.getSegmentsTable(),
tablesConfig.getRulesTable(),
tablesConfig.getConfigTable(),
tablesConfig.getTasksTable(),
tablesConfig.getTaskLockTable(),
tablesConfig.getTaskLogTable(),
tablesConfig.getAuditTable(),
tablesConfig.getSupervisorTable()
};
for (String table : tables) {
connector.deleteAllRecords(table);
}
}
private void deleteAllSegmentFiles(Injector injector)
{
try {
log.info("===========================================================================");
log.info("Deleting all Segment Files.");
log.info("===========================================================================");
DataSegmentKiller segmentKiller = injector.getInstance(DataSegmentKiller.class);
segmentKiller.killAll();
} catch (Exception ex) {
log.error(ex, "Failed to clean up Segment Files.");
}
}
private void deleteAllTaskLogs(Injector injector)
{
try {
log.info("===========================================================================");
log.info("Deleting all TaskLogs.");
log.info("===========================================================================");
TaskLogKiller taskLogKiller = injector.getInstance(TaskLogKiller.class);
taskLogKiller.killAll();
} catch (Exception ex) {
log.error(ex, "Failed to clean up TaskLogs.");
}
}
private void deleteIndexerHadoopWorkingDir(Injector injector)
{
try {
log.info("===========================================================================");
log.info("Deleting hadoopWorkingPath.");
log.info("===========================================================================");
TaskConfig taskConfig = injector.getInstance(TaskConfig.class);
HadoopTask.invokeForeignLoader(
"io.druid.indexer.HadoopWorkingDirCleaner",
new String[]{
taskConfig.getHadoopWorkingPath()
},
HadoopTask.buildClassLoader(null, taskConfig.getDefaultHadoopCoordinates())
);
}
catch (Exception ex) {
log.error(ex, "Failed to clean up indexer hadoop working directory.");
}
}
}