HBASE-18398: Snapshot operation fails with FileNotFoundException

This commit is contained in:
Ashu Pachauri 2017-08-07 18:10:33 -07:00
parent d5f34adcdb
commit ded0842caf
6 changed files with 293 additions and 22 deletions

View File

@ -7922,7 +7922,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
case DELETE:
case BATCH_MUTATE:
case COMPACT_REGION:
// when a region is in recovering state, no read, split or merge is allowed
case SNAPSHOT:
// when a region is in recovering state, no read, split, merge or snapshot is allowed
if (isRecovering() && (this.disallowWritesInRecovering ||
(op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) {
throw new RegionInRecoveryException(getRegionInfo().getRegionNameAsString() +
@ -7946,6 +7947,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
lock.readLock().unlock();
throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closed");
}
// The unit for snapshot is a region. So, all stores for this region must be
// prepared for snapshot operation before proceeding.
if (op == Operation.SNAPSHOT) {
for (Store store : stores.values()) {
if (store instanceof HStore) {
((HStore)store).preSnapshotOperation();
}
}
}
try {
if (coprocessorHost != null) {
coprocessorHost.postStartRegionOperation(op);
@ -7961,12 +7971,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
closeRegionOperation(Operation.ANY);
}
/**
* Closes the lock. This needs to be called in the finally block corresponding
* to the try block of {@link #startRegionOperation(Operation)}
* @throws IOException
*/
@Override
public void closeRegionOperation(Operation operation) throws IOException {
if (operation == Operation.SNAPSHOT) {
for (Store store: stores.values()) {
if (store instanceof HStore) {
((HStore)store).postSnapshotOperation();
}
}
}
lock.readLock().unlock();
if (coprocessorHost != null) {
coprocessorHost.postCloseRegionOperation(operation);

View File

@ -2486,6 +2486,22 @@ public class HStore implements Store {
return getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID;
}
/**
* Sets the store up for a region level snapshot operation.
* @see #postSnapshotOperation()
*/
public void preSnapshotOperation() {
archiveLock.lock();
}
/**
* Perform tasks needed after the completion of snapshot operation.
* @see #preSnapshotOperation()
*/
public void postSnapshotOperation() {
archiveLock.unlock();
}
@Override
public synchronized void closeAndArchiveCompactedFiles() throws IOException {
// ensure other threads do not attempt to archive the same files on close()

View File

@ -247,7 +247,7 @@ public interface Region extends ConfigurationObserver {
*/
enum Operation {
ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE,
REPLAY_BATCH_MUTATE, COMPACT_REGION, REPLAY_EVENT
REPLAY_BATCH_MUTATE, COMPACT_REGION, REPLAY_EVENT, SNAPSHOT
}
/**
@ -277,6 +277,13 @@ public interface Region extends ConfigurationObserver {
*/
void closeRegionOperation() throws IOException;
/**
* Closes the region operation lock. This needs to be called in the finally block corresponding
* to the try block of {@link #startRegionOperation(Operation)}
* @throws IOException
*/
void closeRegionOperation(Operation op) throws IOException;
// Row write locks
/**

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.procedure.Subprocedure;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager.SnapshotSubprocedurePool;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
@ -74,10 +75,18 @@ public class FlushSnapshotSubprocedure extends Subprocedure {
/**
* Callable for adding files to snapshot manifest working dir. Ready for multithreading.
*/
private class RegionSnapshotTask implements Callable<Void> {
Region region;
RegionSnapshotTask(Region region) {
public static class RegionSnapshotTask implements Callable<Void> {
private Region region;
private boolean skipFlush;
private ForeignExceptionDispatcher monitor;
private SnapshotDescription snapshotDesc;
public RegionSnapshotTask(Region region, SnapshotDescription snapshotDesc,
boolean skipFlush, ForeignExceptionDispatcher monitor) {
this.region = region;
this.skipFlush = skipFlush;
this.monitor = monitor;
this.snapshotDesc = snapshotDesc;
}
@Override
@ -87,10 +96,10 @@ public class FlushSnapshotSubprocedure extends Subprocedure {
// snapshots that involve multiple regions and regionservers. It is still possible to have
// an interleaving such that globally regions are missing, so we still need the verification
// step.
LOG.debug("Starting region operation on " + region);
region.startRegionOperation();
LOG.debug("Starting snapshot operation on " + region);
region.startRegionOperation(Operation.SNAPSHOT);
try {
if (snapshotSkipFlush) {
if (skipFlush) {
/*
* This is to take an online-snapshot without force a coordinated flush to prevent pause
* The snapshot type is defined inside the snapshot description. FlushSnapshotSubprocedure
@ -123,15 +132,15 @@ public class FlushSnapshotSubprocedure extends Subprocedure {
throw new IOException("Unable to complete flush after " + MAX_RETRIES + " attempts");
}
}
((HRegion)region).addRegionToSnapshot(snapshot, monitor);
if (snapshotSkipFlush) {
((HRegion)region).addRegionToSnapshot(snapshotDesc, monitor);
if (skipFlush) {
LOG.debug("... SkipFlush Snapshotting region " + region.toString() + " completed.");
} else {
LOG.debug("... Flush Snapshotting region " + region.toString() + " completed.");
}
} finally {
LOG.debug("Closing region operation on " + region);
region.closeRegionOperation();
LOG.debug("Closing snapshot operation on " + region);
region.closeRegionOperation(Operation.SNAPSHOT);
}
return null;
}
@ -155,7 +164,7 @@ public class FlushSnapshotSubprocedure extends Subprocedure {
// Add all hfiles already existing in region.
for (Region region : regions) {
// submit one task per region for parallelize by region.
taskManager.submitTask(new RegionSnapshotTask(region));
taskManager.submitTask(new RegionSnapshotTask(region, snapshot, snapshotSkipFlush, monitor));
monitor.rethrowException();
}

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@ -112,6 +113,7 @@ public final class SnapshotManifest {
final Path workingDir, final SnapshotDescription desc,
final ForeignExceptionSnare monitor) {
return new SnapshotManifest(conf, fs, workingDir, desc, monitor);
}
/**
@ -162,9 +164,15 @@ public final class SnapshotManifest {
}
public void addMobRegion(HRegionInfo regionInfo) throws IOException {
// 0. Get the ManifestBuilder/RegionVisitor
// Get the ManifestBuilder/RegionVisitor
RegionVisitor visitor = createRegionVisitor(desc);
// Visit the region and add it to the manifest
addMobRegion(regionInfo, visitor);
}
@VisibleForTesting
protected void addMobRegion(HRegionInfo regionInfo, RegionVisitor visitor) throws IOException {
// 1. dump region meta info into the snapshot directory
LOG.debug("Storing mob region '" + regionInfo + "' region-info for snapshot.");
Object regionData = visitor.regionOpen(regionInfo);
@ -203,9 +211,15 @@ public final class SnapshotManifest {
* This is used by the "online snapshot" when the table is enabled.
*/
public void addRegion(final HRegion region) throws IOException {
// 0. Get the ManifestBuilder/RegionVisitor
// Get the ManifestBuilder/RegionVisitor
RegionVisitor visitor = createRegionVisitor(desc);
// Visit the region and add it to the manifest
addRegion(region, visitor);
}
@VisibleForTesting
protected void addRegion(final HRegion region, RegionVisitor visitor) throws IOException {
// 1. dump region meta info into the snapshot directory
LOG.debug("Storing '" + region + "' region-info for snapshot.");
Object regionData = visitor.regionOpen(region.getRegionInfo());
@ -216,7 +230,8 @@ public final class SnapshotManifest {
for (Store store : region.getStores()) {
// 2.1. build the snapshot reference for the store
Object familyData = visitor.familyOpen(regionData, store.getColumnFamilyDescriptor().getName());
Object familyData = visitor.familyOpen(regionData,
store.getColumnFamilyDescriptor().getName());
monitor.rethrowException();
List<StoreFile> storeFiles = new ArrayList<>(store.getStorefiles());
@ -243,9 +258,16 @@ public final class SnapshotManifest {
* This is used by the "offline snapshot" when the table is disabled.
*/
public void addRegion(final Path tableDir, final HRegionInfo regionInfo) throws IOException {
// 0. Get the ManifestBuilder/RegionVisitor
// Get the ManifestBuilder/RegionVisitor
RegionVisitor visitor = createRegionVisitor(desc);
// Visit the region and add it to the manifest
addRegion(tableDir, regionInfo, visitor);
}
@VisibleForTesting
protected void addRegion(final Path tableDir, final HRegionInfo regionInfo, RegionVisitor visitor)
throws IOException {
boolean isMobRegion = MobUtils.isMobRegionInfo(regionInfo);
try {
Path baseDir = tableDir;

View File

@ -0,0 +1,204 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.snapshot;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.snapshot.FlushSnapshotSubprocedure;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
/**
* Testing the region snapshot task on a cluster.
* @see org.apache.hadoop.hbase.regionserver.snapshot.FlushSnapshotSubprocedure.RegionSnapshotTask
*/
@Category({MediumTests.class, RegionServerTests.class})
public class TestRegionSnapshotTask {
private final Log LOG = LogFactory.getLog(getClass());
private static HBaseTestingUtility TEST_UTIL;
private static Configuration conf;
private static FileSystem fs;
private static Path rootDir;
@BeforeClass
public static void setupBeforeClass() throws Exception {
TEST_UTIL = new HBaseTestingUtility();
conf = TEST_UTIL.getConfiguration();
// Try to frequently clean up compacted files
conf.setInt("hbase.hfile.compaction.discharger.interval", 1000);
conf.setInt("hbase.master.hfilecleaner.ttl", 1000);
TEST_UTIL.startMiniCluster(1);
TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
TEST_UTIL.waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
rootDir = FSUtils.getRootDir(conf);
fs = TEST_UTIL.getTestFileSystem();
}
@AfterClass
public static void tearDown() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
/**
* Tests adding a region to the snapshot manifest while compactions are running on the region.
* The idea is to slow down the process of adding a store file to the manifest while
* triggering compactions on the region, allowing the store files to be marked for archival while
* snapshot operation is running.
* This test checks for the correct behavior in such a case that the compacted files should
* not be moved around if a snapshot operation is in progress.
* See HBASE-18398
*/
@Test(timeout = 30000)
public void testAddRegionWithCompactions() throws Exception {
final TableName tableName = TableName.valueOf("test_table");
Table table = setupTable(tableName);
List<HRegion> hRegions = TEST_UTIL.getHBaseCluster().getRegions(tableName);
final SnapshotProtos.SnapshotDescription snapshot =
SnapshotProtos.SnapshotDescription.newBuilder()
.setTable(tableName.getNameAsString())
.setType(SnapshotProtos.SnapshotDescription.Type.FLUSH)
.setName("test_table_snapshot")
.setVersion(SnapshotManifestV2.DESCRIPTOR_VERSION)
.build();
ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(snapshot.getName());
final HRegion region = spy(hRegions.get(0));
Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
final SnapshotManifest manifest =
SnapshotManifest.create(conf, fs, workingDir, snapshot, monitor);
manifest.addTableDescriptor(table.getTableDescriptor());
if (!fs.exists(workingDir)) {
fs.mkdirs(workingDir);
}
assertTrue(fs.exists(workingDir));
SnapshotDescriptionUtils.writeSnapshotInfo(snapshot, workingDir, fs);
doAnswer(__ -> {
addRegionToSnapshot(snapshot, region, manifest);
return null;
}).when(region).addRegionToSnapshot(snapshot, monitor);
FlushSnapshotSubprocedure.RegionSnapshotTask snapshotTask =
new FlushSnapshotSubprocedure.RegionSnapshotTask(region, snapshot, true, monitor);
ExecutorService executor = Executors.newFixedThreadPool(1);
Future f = executor.submit(snapshotTask);
// Trigger major compaction and wait for snaphot operation to finish
LOG.info("Starting major compaction");
region.compact(true);
LOG.info("Finished major compaction");
f.get();
// Consolidate region manifests into a single snapshot manifest
manifest.consolidate();
// Make sure that the region manifest exists, which means the snapshot operation succeeded
assertNotNull(manifest.getRegionManifests());
// Sanity check, there should be only one region
assertEquals(1, manifest.getRegionManifests().size());
// Make sure that no files went missing after the snapshot operation
SnapshotReferenceUtil.verifySnapshot(conf, fs, manifest);
}
private void addRegionToSnapshot(SnapshotProtos.SnapshotDescription snapshot,
HRegion region, SnapshotManifest manifest) throws Exception {
LOG.info("Adding region to snapshot: " + region.getRegionInfo().getRegionNameAsString());
Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
SnapshotManifest.RegionVisitor visitor = createRegionVisitorWithDelay(snapshot, workingDir);
manifest.addRegion(region, visitor);
LOG.info("Added the region to snapshot: " + region.getRegionInfo().getRegionNameAsString());
}
private SnapshotManifest.RegionVisitor createRegionVisitorWithDelay(
SnapshotProtos.SnapshotDescription desc, Path workingDir) {
return new SnapshotManifestV2.ManifestBuilder(conf, fs, workingDir) {
@Override
public void storeFile(final SnapshotProtos.SnapshotRegionManifest.Builder region,
final SnapshotProtos.SnapshotRegionManifest.FamilyFiles.Builder family,
final StoreFileInfo storeFile) throws IOException {
try {
LOG.debug("Introducing delay before adding store file to manifest");
Thread.sleep(2000);
} catch (InterruptedException ex) {
LOG.error("Interrupted due to error: " + ex);
}
super.storeFile(region, family, storeFile);
}
};
}
private Table setupTable(TableName tableName) throws Exception {
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
// Flush many files, but do not compact immediately
// Make sure that the region does not split
builder
.setMemStoreFlushSize(5000)
.setRegionSplitPolicyClassName(ConstantSizeRegionSplitPolicy.class.getName())
.setMaxFileSize(100 * 1024 * 1024)
.setConfiguration("hbase.hstore.compactionThreshold", "250");
TableDescriptor td = builder.build();
byte[] fam = Bytes.toBytes("fam");
Table table = TEST_UTIL.createTable(td, new byte[][] {fam},
TEST_UTIL.getConfiguration());
TEST_UTIL.loadTable(table, fam);
return table;
}
}