HBASE-21582 If call HBaseAdmin#snapshotAsync but forget call isSnapshotFinished, then SnapshotHFileCleaner will skip to run every time

This commit is contained in:
huzheng 2018-12-11 20:27:56 +08:00
parent 911b322e9f
commit 763e395d9e
3 changed files with 92 additions and 19 deletions

View File

@ -28,7 +28,11 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -91,6 +95,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringP
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription.Type;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* This class manages the procedure of taking and restoring snapshots. There is only one
@ -120,7 +126,9 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
* At this point, if the user asks for the snapshot/restore status, the result will be
* snapshot done if exists or failed if it doesn't exists.
*/
private static final int SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT = 60 * 1000;
public static final String HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS =
"hbase.snapshot.sentinels.cleanup.timeoutMillis";
public static final long SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT = 60 * 1000L;
/** Enable or disable snapshot support */
public static final String HBASE_SNAPSHOT_ENABLED = "hbase.snapshot.enabled";
@ -151,7 +159,11 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
// The map is always accessed and modified under the object lock using synchronized.
// snapshotTable() will insert an Handler in the table.
// isSnapshotDone() will remove the handler requested if the operation is finished.
private Map<TableName, SnapshotSentinel> snapshotHandlers = new ConcurrentHashMap<>();
private final Map<TableName, SnapshotSentinel> snapshotHandlers = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduleThreadPool =
Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
.setNameFormat("SnapshotHandlerChoreCleaner").setDaemon(true).build());
private ScheduledFuture<?> snapshotHandlerChoreCleanerTask;
// Restore map, with table name as key, procedure ID as value.
// The map is always accessed and modified under the object lock using synchronized.
@ -181,17 +193,21 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
* @param coordinator procedure coordinator instance. exposed for testing.
* @param pool HBase ExecutorServcie instance, exposed for testing.
*/
public SnapshotManager(final MasterServices master, final MetricsMaster metricsMaster,
ProcedureCoordinator coordinator, ExecutorService pool)
@VisibleForTesting
SnapshotManager(final MasterServices master, ProcedureCoordinator coordinator,
ExecutorService pool, int sentinelCleanInterval)
throws IOException, UnsupportedOperationException {
this.master = master;
this.rootDir = master.getMasterFileSystem().getRootDir();
checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
Configuration conf = master.getConfiguration();
checkSnapshotSupport(conf, master.getMasterFileSystem());
this.coordinator = coordinator;
this.executorService = pool;
resetTempDir();
snapshotHandlerChoreCleanerTask = this.scheduleThreadPool.scheduleAtFixedRate(
this::cleanupSentinels, sentinelCleanInterval, sentinelCleanInterval, TimeUnit.SECONDS);
}
/**
@ -274,7 +290,7 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
*
* @throws IOException if we can't reach the filesystem
*/
void resetTempDir() throws IOException {
private void resetTempDir() throws IOException {
// cleanup any existing snapshots.
Path tmpdir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir);
if (master.getMasterFileSystem().getFileSystem().exists(tmpdir)) {
@ -290,7 +306,7 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
* @throws SnapshotDoesNotExistException If the specified snapshot does not exist.
* @throws IOException For filesystem IOExceptions
*/
public void deleteSnapshot(SnapshotDescription snapshot) throws SnapshotDoesNotExistException, IOException {
public void deleteSnapshot(SnapshotDescription snapshot) throws IOException {
// check to see if it is completed
if (!isSnapshotCompleted(snapshot)) {
throw new SnapshotDoesNotExistException(ProtobufUtil.createSnapshotDesc(snapshot));
@ -930,7 +946,6 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
this.restoreTableToProcIdMap.remove(tableName);
return false;
}
}
/**
@ -985,14 +1000,15 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
*/
private synchronized void cleanupSentinels(final Map<TableName, SnapshotSentinel> sentinels) {
long currentTime = EnvironmentEdgeManager.currentTime();
Iterator<Map.Entry<TableName, SnapshotSentinel>> it =
sentinels.entrySet().iterator();
long sentinelsCleanupTimeoutMillis =
master.getConfiguration().getLong(HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS,
SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT);
Iterator<Map.Entry<TableName, SnapshotSentinel>> it = sentinels.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<TableName, SnapshotSentinel> entry = it.next();
SnapshotSentinel sentinel = entry.getValue();
if (sentinel.isFinished() &&
(currentTime - sentinel.getCompletionTimestamp()) > SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT)
{
if (sentinel.isFinished()
&& (currentTime - sentinel.getCompletionTimestamp()) > sentinelsCleanupTimeoutMillis) {
it.remove();
}
}
@ -1027,7 +1043,9 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
for (SnapshotSentinel snapshotHandler: this.snapshotHandlers.values()) {
snapshotHandler.cancel(why);
}
if (snapshotHandlerChoreCleanerTask != null) {
snapshotHandlerChoreCleanerTask.cancel(true);
}
try {
if (coordinator != null) {
coordinator.close();
@ -1162,6 +1180,8 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
this.executorService = master.getExecutorService();
resetTempDir();
snapshotHandlerChoreCleanerTask =
scheduleThreadPool.scheduleAtFixedRate(this::cleanupSentinels, 10, 10, TimeUnit.SECONDS);
}
@Override

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master.cleaner;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -25,6 +26,8 @@ import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -32,8 +35,12 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.SnapshotType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.HMaster;
@ -129,11 +136,11 @@ public class TestSnapshotFromMaster {
conf.set(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS, "");
// Enable snapshot
conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
conf.setLong(SnapshotManager.HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS, 3 * 1000L);
conf.setLong(SnapshotHFileCleaner.HFILE_CACHE_REFRESH_PERIOD_CONF_KEY, cacheRefreshPeriod);
conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
ConstantSizeRegionSplitPolicy.class.getName());
conf.setInt("hbase.hfile.compactions.cleaner.interval", 20 * 1000);
}
@Before
@ -419,4 +426,22 @@ public class TestSnapshotFromMaster {
builder.commit();
return builder.getSnapshotDescription();
}
@Test
public void testAsyncSnapshotWillNotBlockSnapshotHFileCleaner() throws Exception {
// Write some data
Table table = UTIL.getConnection().getTable(TABLE_NAME);
for (int i = 0; i < 10; i++) {
Put put = new Put(Bytes.toBytes(i)).addColumn(TEST_FAM, Bytes.toBytes("q"), Bytes.toBytes(i));
table.put(put);
}
String snapshotName = "testAsyncSnapshotWillNotBlockSnapshotHFileCleaner01";
UTIL.getAdmin().snapshotAsync(new org.apache.hadoop.hbase.client.SnapshotDescription(
snapshotName, TABLE_NAME, SnapshotType.FLUSH));
Waiter.waitFor(UTIL.getConfiguration(), 10 * 1000L, 200L,
() -> UTIL.getAdmin().listSnapshots(Pattern.compile(snapshotName)).size() == 1);
assertTrue(master.getSnapshotManager().isTakingAnySnapshot());
Thread.sleep(11 * 1000L);
assertFalse(master.getSnapshotManager().isTakingAnySnapshot());
}
}

View File

@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.MetricsMaster;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
@ -62,7 +61,6 @@ public class TestSnapshotManager {
public TestName name = new TestName();
MasterServices services = Mockito.mock(MasterServices.class);
MetricsMaster metrics = Mockito.mock(MetricsMaster.class);
ProcedureCoordinator coordinator = Mockito.mock(ProcedureCoordinator.class);
ExecutorService pool = Mockito.mock(ExecutorService.class);
MasterFileSystem mfs = Mockito.mock(MasterFileSystem.class);
@ -79,14 +77,44 @@ public class TestSnapshotManager {
return getNewManager(UTIL.getConfiguration());
}
private SnapshotManager getNewManager(final Configuration conf)
private SnapshotManager getNewManager(Configuration conf) throws IOException, KeeperException {
return getNewManager(conf, 1);
}
private SnapshotManager getNewManager(Configuration conf, int intervalSeconds)
throws IOException, KeeperException {
Mockito.reset(services);
Mockito.when(services.getConfiguration()).thenReturn(conf);
Mockito.when(services.getMasterFileSystem()).thenReturn(mfs);
Mockito.when(mfs.getFileSystem()).thenReturn(fs);
Mockito.when(mfs.getRootDir()).thenReturn(UTIL.getDataTestDir());
return new SnapshotManager(services, metrics, coordinator, pool);
return new SnapshotManager(services, coordinator, pool, intervalSeconds);
}
@Test
public void testCleanFinishedHandler() throws Exception {
TableName tableName = TableName.valueOf(name.getMethodName());
Configuration conf = UTIL.getConfiguration();
try {
conf.setLong(SnapshotManager.HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS, 5 * 1000L);
SnapshotManager manager = getNewManager(conf, 1);
TakeSnapshotHandler handler = Mockito.mock(TakeSnapshotHandler.class);
assertFalse("Manager is in process when there is no current handler",
manager.isTakingSnapshot(tableName));
manager.setSnapshotHandlerForTesting(tableName, handler);
Mockito.when(handler.isFinished()).thenReturn(false);
assertTrue(manager.isTakingAnySnapshot());
assertTrue("Manager isn't in process when handler is running",
manager.isTakingSnapshot(tableName));
Mockito.when(handler.isFinished()).thenReturn(true);
assertFalse("Manager is process when handler isn't running",
manager.isTakingSnapshot(tableName));
assertTrue(manager.isTakingAnySnapshot());
Thread.sleep(6 * 1000);
assertFalse(manager.isTakingAnySnapshot());
} finally {
conf.unset(SnapshotManager.HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS);
}
}
@Test