HBASE-18398: Snapshot operation fails with FileNotFoundException
This commit is contained in:
parent
6f6f0a4849
commit
2f88bf6d40
|
@ -7922,7 +7922,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
case DELETE:
|
||||
case BATCH_MUTATE:
|
||||
case COMPACT_REGION:
|
||||
// when a region is in recovering state, no read, split or merge is allowed
|
||||
case SNAPSHOT:
|
||||
// when a region is in recovering state, no read, split, merge or snapshot is allowed
|
||||
if (isRecovering() && (this.disallowWritesInRecovering ||
|
||||
(op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) {
|
||||
throw new RegionInRecoveryException(getRegionInfo().getRegionNameAsString() +
|
||||
|
@ -7946,6 +7947,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
lock.readLock().unlock();
|
||||
throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closed");
|
||||
}
|
||||
// The unit for snapshot is a region. So, all stores for this region must be
|
||||
// prepared for snapshot operation before proceeding.
|
||||
if (op == Operation.SNAPSHOT) {
|
||||
for (Store store : stores.values()) {
|
||||
if (store instanceof HStore) {
|
||||
((HStore)store).preSnapshotOperation();
|
||||
}
|
||||
}
|
||||
}
|
||||
try {
|
||||
if (coprocessorHost != null) {
|
||||
coprocessorHost.postStartRegionOperation(op);
|
||||
|
@ -7961,12 +7971,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
closeRegionOperation(Operation.ANY);
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the lock. This needs to be called in the finally block corresponding
|
||||
* to the try block of {@link #startRegionOperation(Operation)}
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public void closeRegionOperation(Operation operation) throws IOException {
|
||||
if (operation == Operation.SNAPSHOT) {
|
||||
for (Store store: stores.values()) {
|
||||
if (store instanceof HStore) {
|
||||
((HStore)store).postSnapshotOperation();
|
||||
}
|
||||
}
|
||||
}
|
||||
lock.readLock().unlock();
|
||||
if (coprocessorHost != null) {
|
||||
coprocessorHost.postCloseRegionOperation(operation);
|
||||
|
|
|
@ -2486,6 +2486,22 @@ public class HStore implements Store {
|
|||
return getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the store up for a region level snapshot operation.
|
||||
* @see #postSnapshotOperation()
|
||||
*/
|
||||
public void preSnapshotOperation() {
|
||||
archiveLock.lock();
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform tasks needed after the completion of snapshot operation.
|
||||
* @see #preSnapshotOperation()
|
||||
*/
|
||||
public void postSnapshotOperation() {
|
||||
archiveLock.unlock();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void closeAndArchiveCompactedFiles() throws IOException {
|
||||
// ensure other threads do not attempt to archive the same files on close()
|
||||
|
|
|
@ -247,7 +247,7 @@ public interface Region extends ConfigurationObserver {
|
|||
*/
|
||||
enum Operation {
|
||||
ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE,
|
||||
REPLAY_BATCH_MUTATE, COMPACT_REGION, REPLAY_EVENT
|
||||
REPLAY_BATCH_MUTATE, COMPACT_REGION, REPLAY_EVENT, SNAPSHOT
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -277,6 +277,13 @@ public interface Region extends ConfigurationObserver {
|
|||
*/
|
||||
void closeRegionOperation() throws IOException;
|
||||
|
||||
/**
|
||||
* Closes the region operation lock. This needs to be called in the finally block corresponding
|
||||
* to the try block of {@link #startRegionOperation(Operation)}
|
||||
* @throws IOException
|
||||
*/
|
||||
void closeRegionOperation(Operation op) throws IOException;
|
||||
|
||||
// Row write locks
|
||||
|
||||
/**
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.procedure.Subprocedure;
|
|||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
|
||||
import org.apache.hadoop.hbase.regionserver.Region.Operation;
|
||||
import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager.SnapshotSubprocedurePool;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
|
||||
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
|
||||
|
@ -74,10 +75,18 @@ public class FlushSnapshotSubprocedure extends Subprocedure {
|
|||
/**
|
||||
* Callable for adding files to snapshot manifest working dir. Ready for multithreading.
|
||||
*/
|
||||
private class RegionSnapshotTask implements Callable<Void> {
|
||||
Region region;
|
||||
RegionSnapshotTask(Region region) {
|
||||
public static class RegionSnapshotTask implements Callable<Void> {
|
||||
private Region region;
|
||||
private boolean skipFlush;
|
||||
private ForeignExceptionDispatcher monitor;
|
||||
private SnapshotDescription snapshotDesc;
|
||||
|
||||
public RegionSnapshotTask(Region region, SnapshotDescription snapshotDesc,
|
||||
boolean skipFlush, ForeignExceptionDispatcher monitor) {
|
||||
this.region = region;
|
||||
this.skipFlush = skipFlush;
|
||||
this.monitor = monitor;
|
||||
this.snapshotDesc = snapshotDesc;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -87,10 +96,10 @@ public class FlushSnapshotSubprocedure extends Subprocedure {
|
|||
// snapshots that involve multiple regions and regionservers. It is still possible to have
|
||||
// an interleaving such that globally regions are missing, so we still need the verification
|
||||
// step.
|
||||
LOG.debug("Starting region operation on " + region);
|
||||
region.startRegionOperation();
|
||||
LOG.debug("Starting snapshot operation on " + region);
|
||||
region.startRegionOperation(Operation.SNAPSHOT);
|
||||
try {
|
||||
if (snapshotSkipFlush) {
|
||||
if (skipFlush) {
|
||||
/*
|
||||
* This is to take an online-snapshot without force a coordinated flush to prevent pause
|
||||
* The snapshot type is defined inside the snapshot description. FlushSnapshotSubprocedure
|
||||
|
@ -123,15 +132,15 @@ public class FlushSnapshotSubprocedure extends Subprocedure {
|
|||
throw new IOException("Unable to complete flush after " + MAX_RETRIES + " attempts");
|
||||
}
|
||||
}
|
||||
((HRegion)region).addRegionToSnapshot(snapshot, monitor);
|
||||
if (snapshotSkipFlush) {
|
||||
((HRegion)region).addRegionToSnapshot(snapshotDesc, monitor);
|
||||
if (skipFlush) {
|
||||
LOG.debug("... SkipFlush Snapshotting region " + region.toString() + " completed.");
|
||||
} else {
|
||||
LOG.debug("... Flush Snapshotting region " + region.toString() + " completed.");
|
||||
}
|
||||
} finally {
|
||||
LOG.debug("Closing region operation on " + region);
|
||||
region.closeRegionOperation();
|
||||
LOG.debug("Closing snapshot operation on " + region);
|
||||
region.closeRegionOperation(Operation.SNAPSHOT);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
@ -155,7 +164,7 @@ public class FlushSnapshotSubprocedure extends Subprocedure {
|
|||
// Add all hfiles already existing in region.
|
||||
for (Region region : regions) {
|
||||
// submit one task per region for parallelize by region.
|
||||
taskManager.submitTask(new RegionSnapshotTask(region));
|
||||
taskManager.submitTask(new RegionSnapshotTask(region, snapshot, snapshotSkipFlush, monitor));
|
||||
monitor.rethrowException();
|
||||
}
|
||||
|
||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
|
|||
import org.apache.hadoop.hbase.regionserver.Store;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
|
@ -112,6 +113,7 @@ public final class SnapshotManifest {
|
|||
final Path workingDir, final SnapshotDescription desc,
|
||||
final ForeignExceptionSnare monitor) {
|
||||
return new SnapshotManifest(conf, fs, workingDir, desc, monitor);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -162,9 +164,15 @@ public final class SnapshotManifest {
|
|||
}
|
||||
|
||||
public void addMobRegion(HRegionInfo regionInfo) throws IOException {
|
||||
// 0. Get the ManifestBuilder/RegionVisitor
|
||||
// Get the ManifestBuilder/RegionVisitor
|
||||
RegionVisitor visitor = createRegionVisitor(desc);
|
||||
|
||||
// Visit the region and add it to the manifest
|
||||
addMobRegion(regionInfo, visitor);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected void addMobRegion(HRegionInfo regionInfo, RegionVisitor visitor) throws IOException {
|
||||
// 1. dump region meta info into the snapshot directory
|
||||
LOG.debug("Storing mob region '" + regionInfo + "' region-info for snapshot.");
|
||||
Object regionData = visitor.regionOpen(regionInfo);
|
||||
|
@ -203,9 +211,15 @@ public final class SnapshotManifest {
|
|||
* This is used by the "online snapshot" when the table is enabled.
|
||||
*/
|
||||
public void addRegion(final HRegion region) throws IOException {
|
||||
// 0. Get the ManifestBuilder/RegionVisitor
|
||||
// Get the ManifestBuilder/RegionVisitor
|
||||
RegionVisitor visitor = createRegionVisitor(desc);
|
||||
|
||||
// Visit the region and add it to the manifest
|
||||
addRegion(region, visitor);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected void addRegion(final HRegion region, RegionVisitor visitor) throws IOException {
|
||||
// 1. dump region meta info into the snapshot directory
|
||||
LOG.debug("Storing '" + region + "' region-info for snapshot.");
|
||||
Object regionData = visitor.regionOpen(region.getRegionInfo());
|
||||
|
@ -216,7 +230,8 @@ public final class SnapshotManifest {
|
|||
|
||||
for (Store store : region.getStores()) {
|
||||
// 2.1. build the snapshot reference for the store
|
||||
Object familyData = visitor.familyOpen(regionData, store.getColumnFamilyDescriptor().getName());
|
||||
Object familyData = visitor.familyOpen(regionData,
|
||||
store.getColumnFamilyDescriptor().getName());
|
||||
monitor.rethrowException();
|
||||
|
||||
List<StoreFile> storeFiles = new ArrayList<>(store.getStorefiles());
|
||||
|
@ -243,9 +258,16 @@ public final class SnapshotManifest {
|
|||
* This is used by the "offline snapshot" when the table is disabled.
|
||||
*/
|
||||
public void addRegion(final Path tableDir, final HRegionInfo regionInfo) throws IOException {
|
||||
// 0. Get the ManifestBuilder/RegionVisitor
|
||||
// Get the ManifestBuilder/RegionVisitor
|
||||
RegionVisitor visitor = createRegionVisitor(desc);
|
||||
|
||||
// Visit the region and add it to the manifest
|
||||
addRegion(tableDir, regionInfo, visitor);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected void addRegion(final Path tableDir, final HRegionInfo regionInfo, RegionVisitor visitor)
|
||||
throws IOException {
|
||||
boolean isMobRegion = MobUtils.isMobRegionInfo(regionInfo);
|
||||
try {
|
||||
Path baseDir = tableDir;
|
||||
|
|
|
@ -0,0 +1,204 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.snapshot;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
|
||||
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
|
||||
import org.apache.hadoop.hbase.regionserver.snapshot.FlushSnapshotSubprocedure;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
||||
/**
|
||||
* Testing the region snapshot task on a cluster.
|
||||
* @see org.apache.hadoop.hbase.regionserver.snapshot.FlushSnapshotSubprocedure.RegionSnapshotTask
|
||||
*/
|
||||
@Category({MediumTests.class, RegionServerTests.class})
|
||||
public class TestRegionSnapshotTask {
|
||||
private final Log LOG = LogFactory.getLog(getClass());
|
||||
|
||||
private static HBaseTestingUtility TEST_UTIL;
|
||||
private static Configuration conf;
|
||||
private static FileSystem fs;
|
||||
private static Path rootDir;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupBeforeClass() throws Exception {
|
||||
TEST_UTIL = new HBaseTestingUtility();
|
||||
|
||||
conf = TEST_UTIL.getConfiguration();
|
||||
|
||||
// Try to frequently clean up compacted files
|
||||
conf.setInt("hbase.hfile.compaction.discharger.interval", 1000);
|
||||
conf.setInt("hbase.master.hfilecleaner.ttl", 1000);
|
||||
|
||||
TEST_UTIL.startMiniCluster(1);
|
||||
TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
|
||||
TEST_UTIL.waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
|
||||
|
||||
rootDir = FSUtils.getRootDir(conf);
|
||||
fs = TEST_UTIL.getTestFileSystem();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests adding a region to the snapshot manifest while compactions are running on the region.
|
||||
* The idea is to slow down the process of adding a store file to the manifest while
|
||||
* triggering compactions on the region, allowing the store files to be marked for archival while
|
||||
* snapshot operation is running.
|
||||
* This test checks for the correct behavior in such a case that the compacted files should
|
||||
* not be moved around if a snapshot operation is in progress.
|
||||
* See HBASE-18398
|
||||
*/
|
||||
@Test(timeout = 30000)
|
||||
public void testAddRegionWithCompactions() throws Exception {
|
||||
final TableName tableName = TableName.valueOf("test_table");
|
||||
Table table = setupTable(tableName);
|
||||
|
||||
List<HRegion> hRegions = TEST_UTIL.getHBaseCluster().getRegions(tableName);
|
||||
|
||||
final SnapshotProtos.SnapshotDescription snapshot =
|
||||
SnapshotProtos.SnapshotDescription.newBuilder()
|
||||
.setTable(tableName.getNameAsString())
|
||||
.setType(SnapshotProtos.SnapshotDescription.Type.FLUSH)
|
||||
.setName("test_table_snapshot")
|
||||
.setVersion(SnapshotManifestV2.DESCRIPTOR_VERSION)
|
||||
.build();
|
||||
ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(snapshot.getName());
|
||||
|
||||
final HRegion region = spy(hRegions.get(0));
|
||||
|
||||
Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
|
||||
final SnapshotManifest manifest =
|
||||
SnapshotManifest.create(conf, fs, workingDir, snapshot, monitor);
|
||||
manifest.addTableDescriptor(table.getTableDescriptor());
|
||||
|
||||
if (!fs.exists(workingDir)) {
|
||||
fs.mkdirs(workingDir);
|
||||
}
|
||||
assertTrue(fs.exists(workingDir));
|
||||
SnapshotDescriptionUtils.writeSnapshotInfo(snapshot, workingDir, fs);
|
||||
|
||||
doAnswer(__ -> {
|
||||
addRegionToSnapshot(snapshot, region, manifest);
|
||||
return null;
|
||||
}).when(region).addRegionToSnapshot(snapshot, monitor);
|
||||
|
||||
FlushSnapshotSubprocedure.RegionSnapshotTask snapshotTask =
|
||||
new FlushSnapshotSubprocedure.RegionSnapshotTask(region, snapshot, true, monitor);
|
||||
ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||
Future f = executor.submit(snapshotTask);
|
||||
|
||||
// Trigger major compaction and wait for snaphot operation to finish
|
||||
LOG.info("Starting major compaction");
|
||||
region.compact(true);
|
||||
LOG.info("Finished major compaction");
|
||||
f.get();
|
||||
|
||||
// Consolidate region manifests into a single snapshot manifest
|
||||
manifest.consolidate();
|
||||
|
||||
// Make sure that the region manifest exists, which means the snapshot operation succeeded
|
||||
assertNotNull(manifest.getRegionManifests());
|
||||
// Sanity check, there should be only one region
|
||||
assertEquals(1, manifest.getRegionManifests().size());
|
||||
|
||||
// Make sure that no files went missing after the snapshot operation
|
||||
SnapshotReferenceUtil.verifySnapshot(conf, fs, manifest);
|
||||
}
|
||||
|
||||
private void addRegionToSnapshot(SnapshotProtos.SnapshotDescription snapshot,
|
||||
HRegion region, SnapshotManifest manifest) throws Exception {
|
||||
LOG.info("Adding region to snapshot: " + region.getRegionInfo().getRegionNameAsString());
|
||||
Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
|
||||
SnapshotManifest.RegionVisitor visitor = createRegionVisitorWithDelay(snapshot, workingDir);
|
||||
manifest.addRegion(region, visitor);
|
||||
LOG.info("Added the region to snapshot: " + region.getRegionInfo().getRegionNameAsString());
|
||||
}
|
||||
|
||||
private SnapshotManifest.RegionVisitor createRegionVisitorWithDelay(
|
||||
SnapshotProtos.SnapshotDescription desc, Path workingDir) {
|
||||
return new SnapshotManifestV2.ManifestBuilder(conf, fs, workingDir) {
|
||||
@Override
|
||||
public void storeFile(final SnapshotProtos.SnapshotRegionManifest.Builder region,
|
||||
final SnapshotProtos.SnapshotRegionManifest.FamilyFiles.Builder family,
|
||||
final StoreFileInfo storeFile) throws IOException {
|
||||
try {
|
||||
LOG.debug("Introducing delay before adding store file to manifest");
|
||||
Thread.sleep(2000);
|
||||
} catch (InterruptedException ex) {
|
||||
LOG.error("Interrupted due to error: " + ex);
|
||||
}
|
||||
super.storeFile(region, family, storeFile);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private Table setupTable(TableName tableName) throws Exception {
|
||||
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
|
||||
// Flush many files, but do not compact immediately
|
||||
// Make sure that the region does not split
|
||||
builder
|
||||
.setMemStoreFlushSize(5000)
|
||||
.setRegionSplitPolicyClassName(ConstantSizeRegionSplitPolicy.class.getName())
|
||||
.setMaxFileSize(100 * 1024 * 1024)
|
||||
.setConfiguration("hbase.hstore.compactionThreshold", "250");
|
||||
|
||||
TableDescriptor td = builder.build();
|
||||
byte[] fam = Bytes.toBytes("fam");
|
||||
Table table = TEST_UTIL.createTable(td, new byte[][] {fam},
|
||||
TEST_UTIL.getConfiguration());
|
||||
TEST_UTIL.loadTable(table, fam);
|
||||
return table;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue