HBASE-21582 If HBaseAdmin#snapshotAsync is called but isSnapshotFinished is never called, SnapshotHFileCleaner will skip running every time
This commit is contained in:
parent
bd33a1ef96
commit
81dc223ae7
|
@ -27,7 +27,11 @@ import java.util.Iterator;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
|
@ -85,6 +89,9 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
|||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
/**
|
||||
* This class manages the procedure of taking and restoring snapshots. There is only one
|
||||
* SnapshotManager for the master.
|
||||
|
@ -113,7 +120,9 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
|
|||
* At this point, if the user asks for the snapshot/restore status, the result will be
|
||||
* snapshot done if it exists or failed if it doesn't exist.
|
||||
*/
|
||||
private static final int SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT = 60 * 1000;
|
||||
public static final String HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS =
|
||||
"hbase.snapshot.sentinels.cleanup.timeoutMillis";
|
||||
public static final long SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT = 60 * 1000L;
|
||||
|
||||
/** Enable or disable snapshot support */
|
||||
public static final String HBASE_SNAPSHOT_ENABLED = "hbase.snapshot.enabled";
|
||||
|
@ -144,8 +153,12 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
|
|||
// The map is always accessed and modified under the object lock using synchronized.
|
||||
// snapshotTable() will insert a Handler in the table.
|
||||
// isSnapshotDone() will remove the handler requested if the operation is finished.
|
||||
private Map<TableName, SnapshotSentinel> snapshotHandlers =
|
||||
private final Map<TableName, SnapshotSentinel> snapshotHandlers =
|
||||
new HashMap<TableName, SnapshotSentinel>();
|
||||
private final ScheduledExecutorService scheduleThreadPool =
|
||||
Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
|
||||
.setNameFormat("SnapshotHandlerChoreCleaner").setDaemon(true).build());
|
||||
private ScheduledFuture<?> snapshotHandlerChoreCleanerTask;
|
||||
|
||||
// Restore Sentinels map, with table name as key.
|
||||
// The map is always accessed and modified under the object lock using synchronized.
|
||||
|
@ -173,17 +186,29 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
|
|||
* @param coordinator procedure coordinator instance. exposed for testing.
|
||||
* @param pool HBase ExecutorService instance, exposed for testing.
|
||||
*/
|
||||
public SnapshotManager(final MasterServices master, final MetricsMaster metricsMaster,
|
||||
ProcedureCoordinator coordinator, ExecutorService pool)
|
||||
@VisibleForTesting
|
||||
SnapshotManager(final MasterServices master, ProcedureCoordinator coordinator,
|
||||
ExecutorService pool, int sentinelCleanInterval)
|
||||
throws IOException, UnsupportedOperationException {
|
||||
this.master = master;
|
||||
|
||||
this.rootDir = master.getMasterFileSystem().getRootDir();
|
||||
checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
|
||||
Configuration conf = master.getConfiguration();
|
||||
checkSnapshotSupport(conf, master.getMasterFileSystem());
|
||||
|
||||
this.coordinator = coordinator;
|
||||
this.executorService = pool;
|
||||
resetTempDir();
|
||||
initSnapshotHandlerChoreCleanerTask(sentinelCleanInterval);
|
||||
}
|
||||
|
||||
private void initSnapshotHandlerChoreCleanerTask(long sentinelCleanInterval) {
|
||||
snapshotHandlerChoreCleanerTask = this.scheduleThreadPool.scheduleAtFixedRate(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
cleanupSentinels();
|
||||
}
|
||||
}, sentinelCleanInterval, sentinelCleanInterval, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -261,7 +286,7 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
|
|||
*
|
||||
* @throws IOException if we can't reach the filesystem
|
||||
*/
|
||||
void resetTempDir() throws IOException {
|
||||
private void resetTempDir() throws IOException {
|
||||
// cleanup any existing snapshots.
|
||||
Path tmpdir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir,
|
||||
master.getConfiguration());
|
||||
|
@ -277,7 +302,7 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
|
|||
* @throws SnapshotDoesNotExistException If the specified snapshot does not exist.
|
||||
* @throws IOException For filesystem IOExceptions
|
||||
*/
|
||||
public void deleteSnapshot(SnapshotDescription snapshot) throws SnapshotDoesNotExistException, IOException {
|
||||
public void deleteSnapshot(SnapshotDescription snapshot) throws IOException {
|
||||
// check to see if it is completed
|
||||
if (!isSnapshotCompleted(snapshot)) {
|
||||
throw new SnapshotDoesNotExistException(snapshot);
|
||||
|
@ -1010,14 +1035,15 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
|
|||
*/
|
||||
private synchronized void cleanupSentinels(final Map<TableName, SnapshotSentinel> sentinels) {
|
||||
long currentTime = EnvironmentEdgeManager.currentTime();
|
||||
Iterator<Map.Entry<TableName, SnapshotSentinel>> it =
|
||||
sentinels.entrySet().iterator();
|
||||
long sentinelsCleanupTimeoutMillis =
|
||||
master.getConfiguration().getLong(HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS,
|
||||
SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT);
|
||||
Iterator<Map.Entry<TableName, SnapshotSentinel>> it = sentinels.entrySet().iterator();
|
||||
while (it.hasNext()) {
|
||||
Map.Entry<TableName, SnapshotSentinel> entry = it.next();
|
||||
SnapshotSentinel sentinel = entry.getValue();
|
||||
if (sentinel.isFinished() &&
|
||||
(currentTime - sentinel.getCompletionTimestamp()) > SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT)
|
||||
{
|
||||
if (sentinel.isFinished()
|
||||
&& (currentTime - sentinel.getCompletionTimestamp()) > sentinelsCleanupTimeoutMillis) {
|
||||
it.remove();
|
||||
}
|
||||
}
|
||||
|
@ -1037,7 +1063,9 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
|
|||
for (SnapshotSentinel snapshotHandler: this.snapshotHandlers.values()) {
|
||||
snapshotHandler.cancel(why);
|
||||
}
|
||||
|
||||
if (snapshotHandlerChoreCleanerTask != null) {
|
||||
snapshotHandlerChoreCleanerTask.cancel(true);
|
||||
}
|
||||
// pass the stop onto all the restore handlers
|
||||
for (SnapshotSentinel restoreHandler: this.restoreHandlers.values()) {
|
||||
restoreHandler.cancel(why);
|
||||
|
@ -1176,6 +1204,7 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
|
|||
this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
|
||||
this.executorService = master.getExecutorService();
|
||||
resetTempDir();
|
||||
initSnapshotHandlerChoreCleanerTask(10);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.hadoop.hbase.master.cleaner;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
|
@ -25,6 +26,7 @@ import java.io.IOException;
|
|||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -34,6 +36,9 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.Waiter.Predicate;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
|
@ -121,11 +126,11 @@ public class TestSnapshotFromMaster {
|
|||
conf.set(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS, "");
|
||||
// Enable snapshot
|
||||
conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
|
||||
conf.setLong(SnapshotManager.HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS, 3 * 1000L);
|
||||
conf.setLong(SnapshotHFileCleaner.HFILE_CACHE_REFRESH_PERIOD_CONF_KEY, cacheRefreshPeriod);
|
||||
conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
|
||||
ConstantSizeRegionSplitPolicy.class.getName());
|
||||
conf.setInt("hbase.hfile.compactions.cleaner.interval", 20 * 1000);
|
||||
|
||||
}
|
||||
|
||||
@Before
|
||||
|
@ -406,4 +411,27 @@ public class TestSnapshotFromMaster {
|
|||
/** Invokes {@code chore()} directly on the HFile cleaner of the test cluster's master. */
private static void ensureHFileCleanersRun() {
|
||||
UTIL.getHBaseCluster().getMaster().getHFileCleaner().chore();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAsyncSnapshotWillNotBlockSnapshotHFileCleaner() throws Exception {
|
||||
// Write some data
|
||||
Table table = UTIL.getConnection().getTable(TABLE_NAME);
|
||||
for (int i = 0; i < 10; i++) {
|
||||
Put put = new Put(Bytes.toBytes(i)).addColumn(TEST_FAM, Bytes.toBytes("q"), Bytes.toBytes(i));
|
||||
table.put(put);
|
||||
}
|
||||
final String snapshotName = "testAsyncSnapshotWillNotBlockSnapshotHFileCleaner01";
|
||||
SnapshotDescription snapshot = SnapshotDescription.newBuilder().setName(snapshotName)
|
||||
.setTable(TABLE_NAME.getNameAsString()).setType(SnapshotDescription.Type.FLUSH).build();
|
||||
UTIL.getHBaseAdmin().takeSnapshotAsync(snapshot);
|
||||
UTIL.waitFor(10 * 1000L, 200L, new Predicate<Exception>() {
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
return UTIL.getHBaseAdmin().listSnapshots(Pattern.compile(snapshotName)).size() == 1;
|
||||
}
|
||||
});
|
||||
assertTrue(master.getSnapshotManager().isTakingAnySnapshot());
|
||||
Thread.sleep(11 * 1000L);
|
||||
assertFalse(master.getSnapshotManager().isTakingAnySnapshot());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.testclassification.SmallTests;
|
|||
import org.apache.hadoop.hbase.executor.ExecutorService;
|
||||
import org.apache.hadoop.hbase.master.MasterFileSystem;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.MetricsMaster;
|
||||
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
|
||||
import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
|
||||
import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
|
||||
|
@ -50,7 +49,6 @@ public class TestSnapshotManager {
|
|||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
MasterServices services = Mockito.mock(MasterServices.class);
|
||||
MetricsMaster metrics = Mockito.mock(MetricsMaster.class);
|
||||
ProcedureCoordinator coordinator = Mockito.mock(ProcedureCoordinator.class);
|
||||
ExecutorService pool = Mockito.mock(ExecutorService.class);
|
||||
MasterFileSystem mfs = Mockito.mock(MasterFileSystem.class);
|
||||
|
@ -67,14 +65,44 @@ public class TestSnapshotManager {
|
|||
return getNewManager(UTIL.getConfiguration());
|
||||
}
|
||||
|
||||
private SnapshotManager getNewManager(final Configuration conf)
|
||||
/**
 * Convenience overload that delegates to {@code getNewManager(conf, 1)}, i.e. uses an
 * interval of 1 for the {@code intervalSeconds} argument.
 *
 * @param conf configuration handed to the two-argument overload
 * @return the manager created by the two-argument overload
 */
private SnapshotManager getNewManager(Configuration conf) throws IOException, KeeperException {
|
||||
return getNewManager(conf, 1);
|
||||
}
|
||||
|
||||
private SnapshotManager getNewManager(Configuration conf, int intervalSeconds)
|
||||
throws IOException, KeeperException {
|
||||
Mockito.reset(services);
|
||||
Mockito.when(services.getConfiguration()).thenReturn(conf);
|
||||
Mockito.when(services.getMasterFileSystem()).thenReturn(mfs);
|
||||
Mockito.when(mfs.getFileSystem()).thenReturn(fs);
|
||||
Mockito.when(mfs.getRootDir()).thenReturn(UTIL.getDataTestDir());
|
||||
return new SnapshotManager(services, metrics, coordinator, pool);
|
||||
return new SnapshotManager(services, coordinator, pool, intervalSeconds);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCleanFinishedHandler() throws Exception {
|
||||
TableName tableName = TableName.valueOf("testCleanFinishedHandler");
|
||||
Configuration conf = UTIL.getConfiguration();
|
||||
try {
|
||||
conf.setLong(SnapshotManager.HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS, 5 * 1000L);
|
||||
SnapshotManager manager = getNewManager(conf, 1);
|
||||
TakeSnapshotHandler handler = Mockito.mock(TakeSnapshotHandler.class);
|
||||
assertFalse("Manager is in process when there is no current handler",
|
||||
manager.isTakingSnapshot(tableName));
|
||||
manager.setSnapshotHandlerForTesting(tableName, handler);
|
||||
Mockito.when(handler.isFinished()).thenReturn(false);
|
||||
assertTrue(manager.isTakingAnySnapshot());
|
||||
assertTrue("Manager isn't in process when handler is running",
|
||||
manager.isTakingSnapshot(tableName));
|
||||
Mockito.when(handler.isFinished()).thenReturn(true);
|
||||
assertFalse("Manager is process when handler isn't running",
|
||||
manager.isTakingSnapshot(tableName));
|
||||
assertTrue(manager.isTakingAnySnapshot());
|
||||
Thread.sleep(6 * 1000);
|
||||
assertFalse(manager.isTakingAnySnapshot());
|
||||
} finally {
|
||||
conf.unset(SnapshotManager.HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue