HBASE-21582 If HBaseAdmin#snapshotAsync is called but isSnapshotFinished is never called, SnapshotHFileCleaner will skip running every time
This commit is contained in:
parent
b8405bfea4
commit
9b139ca011
|
@ -28,7 +28,11 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
|
@ -91,6 +95,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringP
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription.Type;
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
/**
|
||||
* This class manages the procedure of taking and restoring snapshots. There is only one
|
||||
|
@ -120,7 +126,9 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
|
|||
* At this point, if the user asks for the snapshot/restore status, the result will be
|
||||
* snapshot done if it exists, or failed if it doesn't exist.
|
||||
*/
|
||||
private static final int SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT = 60 * 1000;
|
||||
public static final String HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS =
|
||||
"hbase.snapshot.sentinels.cleanup.timeoutMillis";
|
||||
public static final long SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT = 60 * 1000L;
|
||||
|
||||
/** Enable or disable snapshot support */
|
||||
public static final String HBASE_SNAPSHOT_ENABLED = "hbase.snapshot.enabled";
|
||||
|
@ -151,7 +159,11 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
|
|||
// The map is always accessed and modified under the object lock using synchronized.
|
||||
// snapshotTable() will insert a handler into the map.
|
||||
// isSnapshotDone() will remove the handler requested if the operation is finished.
|
||||
private Map<TableName, SnapshotSentinel> snapshotHandlers = new ConcurrentHashMap<>();
|
||||
private final Map<TableName, SnapshotSentinel> snapshotHandlers = new ConcurrentHashMap<>();
|
||||
private final ScheduledExecutorService scheduleThreadPool =
|
||||
Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
|
||||
.setNameFormat("SnapshotHandlerChoreCleaner").setDaemon(true).build());
|
||||
private ScheduledFuture<?> snapshotHandlerChoreCleanerTask;
|
||||
|
||||
// Restore map, with table name as key, procedure ID as value.
|
||||
// The map is always accessed and modified under the object lock using synchronized.
|
||||
|
@ -181,17 +193,21 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
|
|||
* @param coordinator procedure coordinator instance. exposed for testing.
|
||||
* @param pool HBase ExecutorService instance, exposed for testing.
|
||||
*/
|
||||
public SnapshotManager(final MasterServices master, final MetricsMaster metricsMaster,
|
||||
ProcedureCoordinator coordinator, ExecutorService pool)
|
||||
@VisibleForTesting
|
||||
SnapshotManager(final MasterServices master, ProcedureCoordinator coordinator,
|
||||
ExecutorService pool, int sentinelCleanInterval)
|
||||
throws IOException, UnsupportedOperationException {
|
||||
this.master = master;
|
||||
|
||||
this.rootDir = master.getMasterFileSystem().getRootDir();
|
||||
checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
|
||||
Configuration conf = master.getConfiguration();
|
||||
checkSnapshotSupport(conf, master.getMasterFileSystem());
|
||||
|
||||
this.coordinator = coordinator;
|
||||
this.executorService = pool;
|
||||
resetTempDir();
|
||||
snapshotHandlerChoreCleanerTask = this.scheduleThreadPool.scheduleAtFixedRate(
|
||||
this::cleanupSentinels, sentinelCleanInterval, sentinelCleanInterval, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -274,7 +290,7 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
|
|||
*
|
||||
* @throws IOException if we can't reach the filesystem
|
||||
*/
|
||||
void resetTempDir() throws IOException {
|
||||
private void resetTempDir() throws IOException {
|
||||
// cleanup any existing snapshots.
|
||||
Path tmpdir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir,
|
||||
master.getConfiguration());
|
||||
|
@ -290,7 +306,7 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
|
|||
* @throws SnapshotDoesNotExistException If the specified snapshot does not exist.
|
||||
* @throws IOException For filesystem IOExceptions
|
||||
*/
|
||||
public void deleteSnapshot(SnapshotDescription snapshot) throws SnapshotDoesNotExistException, IOException {
|
||||
public void deleteSnapshot(SnapshotDescription snapshot) throws IOException {
|
||||
// check to see if it is completed
|
||||
if (!isSnapshotCompleted(snapshot)) {
|
||||
throw new SnapshotDoesNotExistException(ProtobufUtil.createSnapshotDesc(snapshot));
|
||||
|
@ -934,7 +950,6 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
|
|||
this.restoreTableToProcIdMap.remove(tableName);
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -989,14 +1004,15 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
|
|||
*/
|
||||
private synchronized void cleanupSentinels(final Map<TableName, SnapshotSentinel> sentinels) {
|
||||
long currentTime = EnvironmentEdgeManager.currentTime();
|
||||
Iterator<Map.Entry<TableName, SnapshotSentinel>> it =
|
||||
sentinels.entrySet().iterator();
|
||||
long sentinelsCleanupTimeoutMillis =
|
||||
master.getConfiguration().getLong(HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS,
|
||||
SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT);
|
||||
Iterator<Map.Entry<TableName, SnapshotSentinel>> it = sentinels.entrySet().iterator();
|
||||
while (it.hasNext()) {
|
||||
Map.Entry<TableName, SnapshotSentinel> entry = it.next();
|
||||
SnapshotSentinel sentinel = entry.getValue();
|
||||
if (sentinel.isFinished() &&
|
||||
(currentTime - sentinel.getCompletionTimestamp()) > SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT)
|
||||
{
|
||||
if (sentinel.isFinished()
|
||||
&& (currentTime - sentinel.getCompletionTimestamp()) > sentinelsCleanupTimeoutMillis) {
|
||||
it.remove();
|
||||
}
|
||||
}
|
||||
|
@ -1031,7 +1047,9 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
|
|||
for (SnapshotSentinel snapshotHandler: this.snapshotHandlers.values()) {
|
||||
snapshotHandler.cancel(why);
|
||||
}
|
||||
|
||||
if (snapshotHandlerChoreCleanerTask != null) {
|
||||
snapshotHandlerChoreCleanerTask.cancel(true);
|
||||
}
|
||||
try {
|
||||
if (coordinator != null) {
|
||||
coordinator.close();
|
||||
|
@ -1166,6 +1184,8 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
|
|||
this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
|
||||
this.executorService = master.getExecutorService();
|
||||
resetTempDir();
|
||||
snapshotHandlerChoreCleanerTask =
|
||||
scheduleThreadPool.scheduleAtFixedRate(this::cleanupSentinels, 10, 10, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.hadoop.hbase.master.cleaner;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
|
@ -25,6 +26,8 @@ import java.io.IOException;
|
|||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -32,8 +35,12 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
|
|||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.SnapshotType;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
|
@ -129,11 +136,11 @@ public class TestSnapshotFromMaster {
|
|||
conf.set(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS, "");
|
||||
// Enable snapshot
|
||||
conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
|
||||
conf.setLong(SnapshotManager.HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS, 3 * 1000L);
|
||||
conf.setLong(SnapshotHFileCleaner.HFILE_CACHE_REFRESH_PERIOD_CONF_KEY, cacheRefreshPeriod);
|
||||
conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
|
||||
ConstantSizeRegionSplitPolicy.class.getName());
|
||||
conf.setInt("hbase.hfile.compactions.cleaner.interval", 20 * 1000);
|
||||
|
||||
}
|
||||
|
||||
@Before
|
||||
|
@ -419,4 +426,22 @@ public class TestSnapshotFromMaster {
|
|||
builder.commit();
|
||||
return builder.getSnapshotDescription();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAsyncSnapshotWillNotBlockSnapshotHFileCleaner() throws Exception {
|
||||
// Write some data
|
||||
Table table = UTIL.getConnection().getTable(TABLE_NAME);
|
||||
for (int i = 0; i < 10; i++) {
|
||||
Put put = new Put(Bytes.toBytes(i)).addColumn(TEST_FAM, Bytes.toBytes("q"), Bytes.toBytes(i));
|
||||
table.put(put);
|
||||
}
|
||||
String snapshotName = "testAsyncSnapshotWillNotBlockSnapshotHFileCleaner01";
|
||||
UTIL.getAdmin().snapshotAsync(new org.apache.hadoop.hbase.client.SnapshotDescription(
|
||||
snapshotName, TABLE_NAME, SnapshotType.FLUSH));
|
||||
Waiter.waitFor(UTIL.getConfiguration(), 10 * 1000L, 200L,
|
||||
() -> UTIL.getAdmin().listSnapshots(Pattern.compile(snapshotName)).size() == 1);
|
||||
assertTrue(master.getSnapshotManager().isTakingAnySnapshot());
|
||||
Thread.sleep(11 * 1000L);
|
||||
assertFalse(master.getSnapshotManager().isTakingAnySnapshot());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.TableName;
|
|||
import org.apache.hadoop.hbase.executor.ExecutorService;
|
||||
import org.apache.hadoop.hbase.master.MasterFileSystem;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.MetricsMaster;
|
||||
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
|
||||
import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
|
||||
import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
|
||||
|
@ -62,7 +61,6 @@ public class TestSnapshotManager {
|
|||
public TestName name = new TestName();
|
||||
|
||||
MasterServices services = Mockito.mock(MasterServices.class);
|
||||
MetricsMaster metrics = Mockito.mock(MetricsMaster.class);
|
||||
ProcedureCoordinator coordinator = Mockito.mock(ProcedureCoordinator.class);
|
||||
ExecutorService pool = Mockito.mock(ExecutorService.class);
|
||||
MasterFileSystem mfs = Mockito.mock(MasterFileSystem.class);
|
||||
|
@ -79,14 +77,44 @@ public class TestSnapshotManager {
|
|||
return getNewManager(UTIL.getConfiguration());
|
||||
}
|
||||
|
||||
private SnapshotManager getNewManager(final Configuration conf)
|
||||
private SnapshotManager getNewManager(Configuration conf) throws IOException, KeeperException {
|
||||
return getNewManager(conf, 1);
|
||||
}
|
||||
|
||||
private SnapshotManager getNewManager(Configuration conf, int intervalSeconds)
|
||||
throws IOException, KeeperException {
|
||||
Mockito.reset(services);
|
||||
Mockito.when(services.getConfiguration()).thenReturn(conf);
|
||||
Mockito.when(services.getMasterFileSystem()).thenReturn(mfs);
|
||||
Mockito.when(mfs.getFileSystem()).thenReturn(fs);
|
||||
Mockito.when(mfs.getRootDir()).thenReturn(UTIL.getDataTestDir());
|
||||
return new SnapshotManager(services, metrics, coordinator, pool);
|
||||
return new SnapshotManager(services, coordinator, pool, intervalSeconds);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCleanFinishedHandler() throws Exception {
|
||||
TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
Configuration conf = UTIL.getConfiguration();
|
||||
try {
|
||||
conf.setLong(SnapshotManager.HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS, 5 * 1000L);
|
||||
SnapshotManager manager = getNewManager(conf, 1);
|
||||
TakeSnapshotHandler handler = Mockito.mock(TakeSnapshotHandler.class);
|
||||
assertFalse("Manager is in process when there is no current handler",
|
||||
manager.isTakingSnapshot(tableName));
|
||||
manager.setSnapshotHandlerForTesting(tableName, handler);
|
||||
Mockito.when(handler.isFinished()).thenReturn(false);
|
||||
assertTrue(manager.isTakingAnySnapshot());
|
||||
assertTrue("Manager isn't in process when handler is running",
|
||||
manager.isTakingSnapshot(tableName));
|
||||
Mockito.when(handler.isFinished()).thenReturn(true);
|
||||
assertFalse("Manager is process when handler isn't running",
|
||||
manager.isTakingSnapshot(tableName));
|
||||
assertTrue(manager.isTakingAnySnapshot());
|
||||
Thread.sleep(6 * 1000);
|
||||
assertFalse(manager.isTakingAnySnapshot());
|
||||
} finally {
|
||||
conf.unset(SnapshotManager.HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue