HBASE-7321 Simple flush online snapshot
git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-7290@1445830 13f79535-47bb-0310-9956-ffa450edef68
parent 82df28fffe
commit 10969660fa
@@ -101,6 +101,7 @@ import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Exec;
import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
@@ -117,6 +118,7 @@ import org.apache.hadoop.hbase.ipc.RpcCallContext;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -124,6 +126,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.snapshot.TakeSnapshotUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -2528,6 +2531,72 @@ public class HRegion implements HeapSize { // , Writable{
    }
  }

  /**
   * Complete taking the snapshot on the region. Writes the region info and adds references to the
   * working snapshot directory.
   *
   * TODO for api consistency, consider adding another version with no {@link ForeignExceptionSnare}
   * arg. (In the future other cancellable HRegion methods could eventually add a
   * {@link ForeignExceptionSnare}, or we could do something fancier).
   *
   * @param desc snapshot being completed
   * @param exnSnare ForeignExceptionSnare that captures external exceptions in case we need to
   *   bail out. This is allowed to be null and will just be ignored in that case.
   * @throws IOException if there is an external or internal error causing the snapshot to fail
   */
  public void addRegionToSnapshot(SnapshotDescription desc,
      ForeignExceptionSnare exnSnare) throws IOException {
    // This should be "fast" since we don't rewrite store files but instead
    // back up the store files by creating a reference
    Path rootDir = FSUtils.getRootDir(this.rsServices.getConfiguration());
    Path snapshotRegionDir = TakeSnapshotUtils.getRegionSnapshotDirectory(desc, rootDir,
      regionInfo.getEncodedName());

    // 1. dump region meta info into the snapshot directory
    LOG.debug("Storing region-info for snapshot.");
    checkRegioninfoOnFilesystem(snapshotRegionDir);

    // 2. iterate through all the stores in the region
    LOG.debug("Creating references for hfiles");

    // This ensures that we have an atomic view of the directory as long as we have < ls limit
    // (batch size of the files in a directory) on the namenode. Otherwise, we get back the files in
    // batches and may miss files being added/deleted. This could be more robust (iteratively
    // checking to see if we have all the files until we are sure), but the limit is currently 1000
    // files/batch, far more than the number of store files under a single column family.
    for (Store store : stores.values()) {
      // 2.1. build the snapshot reference directory for the store
      Path dstStoreDir = TakeSnapshotUtils.getStoreSnapshotDirectory(snapshotRegionDir,
        Bytes.toString(store.getFamily().getName()));
      List<StoreFile> storeFiles = store.getStorefiles();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Adding snapshot references for " + storeFiles + " hfiles");
      }

      // 2.2. iterate through all the store's files and create "references".
      int sz = storeFiles.size();
      for (int i = 0; i < sz; i++) {
        if (exnSnare != null) {
          exnSnare.rethrowException();
        }
        Path file = storeFiles.get(i).getPath();
        // create "reference" to this store file. It is intentionally an empty file -- all
        // necessary information is captured by its fs location and filename. This allows us to
        // only figure out what needs to be done via a single nn operation (instead of having to
        // open and read the files as well).
        LOG.debug("Creating reference for file (" + (i+1) + "/" + sz + ") : " + file);
        Path referenceFile = new Path(dstStoreDir, file.getName());
        boolean success = fs.createNewFile(referenceFile);
        if (!success) {
          throw new IOException("Failed to create reference file:" + referenceFile);
        }
      }
    }
  }

  /**
   * Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP} with the provided current
   * timestamp.
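
The reference-file scheme in the comments above costs one namenode operation per store file. As a minimal standalone sketch of the idea, assuming only a Hadoop FileSystem handle (the class and method names below are hypothetical illustrations, not part of this commit):

// Sketch: an empty marker file whose name matches the source hfile. The fs location and
// filename alone identify the referenced store file, so no hfile data is read or copied.
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class ReferenceFileSketch {
  /** Create an empty "reference" marker for srcHFile under dstStoreDir. */
  static Path demoReferenceFile(FileSystem fs, Path dstStoreDir, Path srcHFile)
      throws IOException {
    Path ref = new Path(dstStoreDir, srcHFile.getName());
    // createNewFile is atomic: it returns false if the file already exists
    if (!fs.createNewFile(ref)) {
      throw new IOException("Failed to create reference file: " + ref);
    }
    return ref;
  }
}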
@@ -0,0 +1,143 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.snapshot;

import java.util.List;
import java.util.concurrent.Callable;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.procedure.ProcedureMember;
import org.apache.hadoop.hbase.procedure.Subprocedure;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager.SnapshotSubprocedurePool;

/**
 * This online snapshot implementation uses the distributed procedure framework to force a
 * store flush and then records the hfiles. Its enter stage does nothing. Its leave stage then
 * flushes the memstore, builds the region server's snapshot manifest from its hfiles list, and
 * copies .regioninfos into the snapshot working directory. On the master side, there is an atomic
 * rename of the working dir into the proper snapshot directory.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class FlushSnapshotSubprocedure extends Subprocedure {
  private static final Log LOG = LogFactory.getLog(FlushSnapshotSubprocedure.class);

  private final List<HRegion> regions;
  private final SnapshotDescription snapshot;
  private final SnapshotSubprocedurePool taskManager;

  public FlushSnapshotSubprocedure(ProcedureMember member,
      ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
      List<HRegion> regions, SnapshotDescription snapshot,
      SnapshotSubprocedurePool taskManager) {
    super(member, snapshot.getName(), errorListener, wakeFrequency, timeout);
    this.snapshot = snapshot;
    this.regions = regions;
    this.taskManager = taskManager;
  }

  /**
   * Callable for adding files to the snapshot manifest working dir. Ready for multithreading.
   */
  private class RegionSnapshotTask implements Callable<Void> {
    HRegion region;
    RegionSnapshotTask(HRegion region) {
      this.region = region;
    }

    @Override
    public Void call() throws Exception {
      // Taking the region read lock prevents the individual region from being closed while a
      // snapshot is in progress. This is helpful but not sufficient for preventing races with
      // snapshots that involve multiple regions and regionservers. It is still possible to have
      // an interleaving such that globally regions are missing, so we still need the verification
      // step.
      region.startRegionOperation();
      try {
        LOG.debug("Flush Snapshotting region " + region.toString() + " started...");
        region.flushcache();
        region.addRegionToSnapshot(snapshot, monitor);
        LOG.debug("... Flush Snapshotting region " + region.toString() + " completed.");
      } finally {
        region.closeRegionOperation();
      }
      return null;
    }
  }

  private void flushSnapshot() throws ForeignException {
    if (regions.isEmpty()) {
      // No regions on this RS, we are basically done.
      return;
    }

    monitor.rethrowException();

    // Add all hfiles already existing in region.
    for (HRegion region : regions) {
      // submit one task per region to parallelize by region.
      taskManager.submitTask(new RegionSnapshotTask(region));
      monitor.rethrowException();
    }

    // wait for everything to complete.
    LOG.debug("Flush Snapshot Tasks submitted for " + regions.size() + " regions");
    taskManager.waitForOutstandingTasks();
  }

  /**
   * Do nothing; the core of the snapshot is executed in the {@link #insideBarrier} step.
   */
  @Override
  public void acquireBarrier() throws ForeignException {
    // NO OP
  }

  /**
   * Do a flush snapshot of every region on this rs from the target table.
   */
  @Override
  public void insideBarrier() throws ForeignException {
    flushSnapshot();
  }

  /**
   * Cancel threads if they haven't finished.
   */
  @Override
  public void cleanup(Exception e) {
    LOG.info("Aborting all flush online snapshot subprocedure task threads for '"
        + snapshot.getName() + "' due to error", e);
    taskManager.cancelTasks();
  }

  /**
   * Hooray!
   */
  public void releaseBarrier() {
    // NO OP
  }

}
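
The SnapshotSubprocedurePool used above follows a submit-then-wait pattern: one task per region so regions flush in parallel, then a single join point that surfaces the first failure. A rough self-contained analogue in plain java.util.concurrent (names are hypothetical; the real pool's API differs):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class TaskPoolSketch {
  static void runAll(List<Callable<Void>> regionTasks) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      List<Future<Void>> futures = new ArrayList<Future<Void>>();
      for (Callable<Void> task : regionTasks) {
        futures.add(pool.submit(task));  // one task per region, as in flushSnapshot()
      }
      for (Future<Void> f : futures) {
        f.get();  // analogue of waitForOutstandingTasks(); rethrows the first task failure
      }
    } finally {
      pool.shutdownNow();  // analogue of cancelTasks() when something goes wrong
    }
  }
}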
@@ -95,7 +95,6 @@ public class RegionServerSnapshotManager {
  private final RegionServerServices rss;
  private final ProcedureMemberRpcs memberRpcs;
  private final ProcedureMember member;
  private final long wakeMillis;
  private final SnapshotSubprocedurePool taskManager;

  /**
@@ -111,7 +110,6 @@
    this.memberRpcs = controller;
    this.member = cohortMember;
    // read in the snapshot request configuration properties
    wakeMillis = conf.getLong(SNAPSHOT_REQUEST_WAKE_MILLIS_KEY, SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT);
    taskManager = new SnapshotSubprocedurePool(parent, conf);
  }

@@ -132,7 +130,7 @@

    // read in the snapshot request configuration properties
    Configuration conf = rss.getConfiguration();
    wakeMillis = conf.getLong(SNAPSHOT_REQUEST_WAKE_MILLIS_KEY, SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT);
    long wakeMillis = conf.getLong(SNAPSHOT_REQUEST_WAKE_MILLIS_KEY, SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT);
    long keepAlive = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
    int opThreads = conf.getInt(SNAPSHOT_REQUEST_THREADS_KEY, SNAPSHOT_REQUEST_THREADS_DEFAULT);

@@ -207,7 +205,14 @@

    LOG.debug("Launching subprocedure for snapshot " + snapshot.getName() + " from table " + snapshot.getTable());
    ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher();
    Configuration conf = rss.getConfiguration();
    long timeoutMillis = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
    long wakeMillis = conf.getLong(SNAPSHOT_REQUEST_WAKE_MILLIS_KEY, SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT);

    switch (snapshot.getType()) {
    case FLUSH:
      return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis,
          timeoutMillis, involvedRegions, snapshot, taskManager);
    default:
      throw new UnsupportedOperationException("Unrecognized snapshot type:" + snapshot.getType());
    }
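
The switch in buildSubprocedure acts as a small factory keyed on the snapshot type, failing fast on anything it does not recognize. Reduced to a self-contained shape (the enum and interface below are hypothetical stand-ins for SnapshotDescription.Type and Subprocedure, not the real classes):

class SubprocedureFactorySketch {
  enum Type { FLUSH }
  interface Task { void run() throws Exception; }

  static Task build(Type type, final Runnable flushWork) {
    switch (type) {
    case FLUSH:
      // new snapshot flavors would slot in here as additional cases
      return new Task() {
        public void run() { flushWork.run(); }
      };
    default:
      throw new UnsupportedOperationException("Unrecognized snapshot type:" + type);
    }
  }
}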
@@ -0,0 +1,306 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.snapshot;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HBaseFsck;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Test creating/using/deleting snapshots from the client
 * <p>
 * This is an end-to-end test for the snapshot utility
 *
 * TODO This is essentially a clone of TestSnapshotFromClient. This is worth refactoring
 * because there will be a few more flavors of snapshots that need to run these tests.
 */
@Category(LargeTests.class)
public class TestFlushSnapshotFromClient {
  private static final Log LOG = LogFactory.getLog(TestFlushSnapshotFromClient.class);
  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
  private static final int NUM_RS = 2;
  private static final String STRING_TABLE_NAME = "test";
  private static final byte[] TEST_FAM = Bytes.toBytes("fam");
  private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME);

  /**
   * Setup the config for the cluster
   * @throws Exception on failure
   */
  @BeforeClass
  public static void setupCluster() throws Exception {
    setupConf(UTIL.getConfiguration());
    UTIL.startMiniCluster(NUM_RS);
  }

  private static void setupConf(Configuration conf) {
    // disable the ui
    conf.setInt("hbase.regionserver.info.port", -1);
    // change the flush size to a small amount, regulating number of store files
    conf.setInt("hbase.hregion.memstore.flush.size", 25000);
    // so make sure we get a compaction when doing a load, but keep around some
    // files in the store
    conf.setInt("hbase.hstore.compaction.min", 10);
    conf.setInt("hbase.hstore.compactionThreshold", 10);
    // block writes if we get to 12 store files
    conf.setInt("hbase.hstore.blockingStoreFiles", 12);
    // drop the number of attempts for the hbase admin
    conf.setInt("hbase.client.retries.number", 1);
    // Enable snapshot
    conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
  }

  @Before
  public void setup() throws Exception {
    UTIL.createTable(TABLE_NAME, TEST_FAM);
  }

  @After
  public void tearDown() throws Exception {
    UTIL.deleteTable(TABLE_NAME);
    // and cleanup the archive directory
    try {
      UTIL.getTestFileSystem().delete(new Path(UTIL.getDefaultRootDirPath(), ".archive"), true);
    } catch (IOException e) {
      LOG.warn("Failure to delete archive directory", e);
    }
  }

  @AfterClass
  public static void cleanupTest() throws Exception {
    try {
      UTIL.shutdownMiniCluster();
    } catch (Exception e) {
      LOG.warn("failure shutting down cluster", e);
    }
  }

  /**
   * Test simple flush snapshotting a table that is online
   * @throws Exception
   */
  @Test
  public void testFlushTableSnapshot() throws Exception {
    HBaseAdmin admin = UTIL.getHBaseAdmin();
    // make sure we don't fail on listing snapshots
    SnapshotTestingUtils.assertNoSnapshots(admin);

    // put some stuff in the table
    HTable table = new HTable(UTIL.getConfiguration(), TABLE_NAME);
    UTIL.loadTable(table, TEST_FAM);

    // get the name of all the regionservers hosting the snapshotted table
    Set<String> snapshotServers = new HashSet<String>();
    List<RegionServerThread> servers = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads();
    for (RegionServerThread server : servers) {
      if (server.getRegionServer().getOnlineRegions(TABLE_NAME).size() > 0) {
        snapshotServers.add(server.getRegionServer().getServerName().toString());
      }
    }

    LOG.debug("FS state before snapshot:");
    FSUtils.logFileSystemState(UTIL.getTestFileSystem(),
      FSUtils.getRootDir(UTIL.getConfiguration()), LOG);

    // take a snapshot of the enabled table
    String snapshotString = "offlineTableSnapshot";
    byte[] snapshot = Bytes.toBytes(snapshotString);
    admin.snapshot(snapshotString, STRING_TABLE_NAME, SnapshotDescription.Type.FLUSH);
    LOG.debug("Snapshot completed.");

    // make sure we have the snapshot
    List<SnapshotDescription> snapshots = SnapshotTestingUtils.assertOneSnapshotThatMatches(admin,
      snapshot, TABLE_NAME);

    // make sure it's a valid snapshot
    FileSystem fs = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
    Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
    LOG.debug("FS state after snapshot:");
    FSUtils.logFileSystemState(UTIL.getTestFileSystem(),
      FSUtils.getRootDir(UTIL.getConfiguration()), LOG);

    SnapshotTestingUtils.confirmSnapshotValid(snapshots.get(0), TABLE_NAME, TEST_FAM, rootDir,
      admin, fs, false, new Path(rootDir, HConstants.HREGION_LOGDIR_NAME), snapshotServers);

    admin.deleteSnapshot(snapshot);
    snapshots = admin.listSnapshots();
    SnapshotTestingUtils.assertNoSnapshots(admin);
  }

  @Test
  public void testSnapshotFailsOnNonExistantTable() throws Exception {
    HBaseAdmin admin = UTIL.getHBaseAdmin();
    // make sure we don't fail on listing snapshots
    SnapshotTestingUtils.assertNoSnapshots(admin);
    String tableName = "_not_a_table";

    // make sure the table doesn't exist
    boolean fail = false;
    do {
      try {
        admin.getTableDescriptor(Bytes.toBytes(tableName));
        fail = true;
        LOG.error("Table:" + tableName + " already exists, checking a new name");
        tableName = tableName + "!";
      } catch (TableNotFoundException e) {
        fail = false;
      }
    } while (fail);

    // snapshot the non-existent table
    try {
      admin.snapshot("fail", tableName, SnapshotDescription.Type.FLUSH);
      fail("Snapshot succeeded even though there is no table.");
    } catch (SnapshotCreationException e) {
      LOG.info("Correctly failed to snapshot a non-existent table:" + e.getMessage());
    }
  }

  @Test(timeout = 15000)
  public void testAsyncFlushSnapshot() throws Exception {
    HBaseAdmin admin = UTIL.getHBaseAdmin();
    SnapshotDescription snapshot = SnapshotDescription.newBuilder().setName("asyncSnapshot")
        .setTable(STRING_TABLE_NAME).setType(SnapshotDescription.Type.FLUSH).build();

    // take the snapshot async
    admin.takeSnapshotAsync(snapshot);

    // constantly loop, looking for the snapshot to complete
    HMaster master = UTIL.getMiniHBaseCluster().getMaster();
    SnapshotTestingUtils.waitForSnapshotToComplete(master, snapshot, 200);
    LOG.info(" === Async Snapshot Completed ===");
    HBaseFsck.debugLsr(UTIL.getHBaseCluster().getConfiguration(), FSUtils.getRootDir(UTIL.getConfiguration()));
    // make sure we get the snapshot
    SnapshotTestingUtils.assertOneSnapshotThatMatches(admin, snapshot);

    // test that we can delete the snapshot
    admin.deleteSnapshot(snapshot.getName());
    LOG.info(" === Async Snapshot Deleted ===");
    HBaseFsck.debugLsr(UTIL.getHBaseCluster().getConfiguration(), FSUtils.getRootDir(UTIL.getConfiguration()));
    // make sure we don't have any snapshots
    SnapshotTestingUtils.assertNoSnapshots(admin);
    LOG.info(" === Async Snapshot Test Completed ===");
  }

  /**
   * Basic end-to-end test of simple-flush-based snapshots
   */
  @Test
  public void testFlushCreateListDestroy() throws Exception {
    LOG.debug("------- Starting Snapshot test -------------");
    HBaseAdmin admin = UTIL.getHBaseAdmin();
    // make sure we don't fail on listing snapshots
    SnapshotTestingUtils.assertNoSnapshots(admin);
    // load the table so we have some data
    UTIL.loadTable(new HTable(UTIL.getConfiguration(), TABLE_NAME), TEST_FAM);
    // and wait until everything stabilizes
    HRegionServer rs = UTIL.getRSForFirstRegionInTable(TABLE_NAME);
    List<HRegion> onlineRegions = rs.getOnlineRegions(TABLE_NAME);
    for (HRegion region : onlineRegions) {
      region.waitForFlushesAndCompactions();
    }
    String snapshotName = "flushSnapshotCreateListDestroy";
    // test creating the snapshot
    admin.snapshot(snapshotName, STRING_TABLE_NAME, SnapshotDescription.Type.FLUSH);
    logFSTree(new Path(UTIL.getConfiguration().get(HConstants.HBASE_DIR)));

    // make sure we only have 1 matching snapshot
    List<SnapshotDescription> snapshots = SnapshotTestingUtils.assertOneSnapshotThatMatches(admin,
      snapshotName, STRING_TABLE_NAME);

    // check the directory structure
    FileSystem fs = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
    Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshots.get(0), rootDir);
    assertTrue(fs.exists(snapshotDir));
    HBaseFsck.debugLsr(UTIL.getHBaseCluster().getConfiguration(), snapshotDir);
    Path snapshotinfo = new Path(snapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE);
    assertTrue(fs.exists(snapshotinfo));

    // check the table info
    HTableDescriptor desc = FSTableDescriptors.getTableDescriptor(fs, rootDir, TABLE_NAME);
    HTableDescriptor snapshotDesc = FSTableDescriptors.getTableDescriptor(fs,
      SnapshotDescriptionUtils.getSnapshotsDir(rootDir), Bytes.toBytes(snapshotName));
    assertEquals(desc, snapshotDesc);

    // check the region snapshot for all the regions
    List<HRegionInfo> regions = admin.getTableRegions(TABLE_NAME);
    for (HRegionInfo info : regions) {
      String regionName = info.getEncodedName();
      Path regionDir = new Path(snapshotDir, regionName);
      HRegionInfo snapshotRegionInfo = HRegion.loadDotRegionInfoFileContent(fs, regionDir);
      assertEquals(info, snapshotRegionInfo);
      // check to make sure we have the family
      Path familyDir = new Path(regionDir, Bytes.toString(TEST_FAM));
      assertTrue(fs.exists(familyDir));
      // make sure we have some file references
      assertTrue(fs.listStatus(familyDir).length > 0);
    }

    // test that we can delete the snapshot
    admin.deleteSnapshot(snapshotName);
    HBaseFsck.debugLsr(UTIL.getHBaseCluster().getConfiguration(), FSUtils.getRootDir(UTIL.getConfiguration()));

    // make sure we don't have any snapshots
    SnapshotTestingUtils.assertNoSnapshots(admin);
    LOG.debug("------- Flush-Snapshot Create List Destroy-------------");
  }

  private void logFSTree(Path root) throws IOException {
    FSUtils.logFileSystemState(UTIL.getDFSCluster().getFileSystem(), root, LOG);
  }
}
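
testAsyncFlushSnapshot above relies on SnapshotTestingUtils.waitForSnapshotToComplete to poll the master until the snapshot is done. The general shape of such a wait helper, as a hedged sketch (the names and the interval handling are illustrative; the real utility queries the master's snapshot state):

import java.util.concurrent.Callable;

class PollSketch {
  /** Re-check a condition on an interval until it holds or the deadline passes. */
  static void waitUntil(Callable<Boolean> done, long sleepMillis, long timeoutMillis)
      throws Exception {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (!done.call()) {
      if (System.currentTimeMillis() > deadline) {
        throw new RuntimeException("Timed out waiting for condition");
      }
      Thread.sleep(sleepMillis);
    }
  }
}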
@@ -0,0 +1,258 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.snapshot;

import static org.junit.Assert.assertEquals;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Test clone/restore snapshots from the client
 *
 * TODO This is essentially a clone of TestRestoreSnapshotFromClient. This is worth refactoring
 * because there will be a few more flavors of snapshots that need to run these tests.
 */
@Category(LargeTests.class)
public class TestRestoreFlushSnapshotFromClient {
  final Log LOG = LogFactory.getLog(getClass());

  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private final byte[] FAMILY = Bytes.toBytes("cf");

  private byte[] snapshotName0;
  private byte[] snapshotName1;
  private byte[] snapshotName2;
  private int snapshot0Rows;
  private int snapshot1Rows;
  private byte[] tableName;
  private HBaseAdmin admin;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
    TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
    TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6);
    TEST_UTIL.getConfiguration().setBoolean(
        "hbase.master.enabletable.roundrobin", true);

    // Enable snapshot
    TEST_UTIL.getConfiguration().setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);

    TEST_UTIL.startMiniCluster(3);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Initialize the tests with a table filled with some data
   * and two snapshots (snapshotName0, snapshotName1) of different states.
   * The tableName, snapshotNames and the number of rows in the snapshot are initialized.
   */
  @Before
  public void setup() throws Exception {
    this.admin = TEST_UTIL.getHBaseAdmin();

    long tid = System.currentTimeMillis();
    tableName = Bytes.toBytes("testtb-" + tid);
    snapshotName0 = Bytes.toBytes("snaptb0-" + tid);
    snapshotName1 = Bytes.toBytes("snaptb1-" + tid);
    snapshotName2 = Bytes.toBytes("snaptb2-" + tid);

    // create Table and disable it
    createTable(tableName, FAMILY);
    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
    loadData(table, 500, FAMILY);
    snapshot0Rows = TEST_UTIL.countRows(table);
    LOG.info("=== before snapshot with 500 rows");
    logFSTree(FSUtils.getRootDir(TEST_UTIL.getConfiguration()));

    // take a snapshot
    admin.snapshot(Bytes.toString(snapshotName0), Bytes.toString(tableName), SnapshotDescription.Type.FLUSH);

    LOG.info("=== after snapshot with 500 rows");
    logFSTree(FSUtils.getRootDir(TEST_UTIL.getConfiguration()));

    // insert more data
    loadData(table, 500, FAMILY);
    snapshot1Rows = TEST_UTIL.countRows(table);
    LOG.info("=== before snapshot with 1000 rows");
    logFSTree(FSUtils.getRootDir(TEST_UTIL.getConfiguration()));

    // take a snapshot of the updated table
    admin.snapshot(Bytes.toString(snapshotName1), Bytes.toString(tableName), SnapshotDescription.Type.FLUSH);
    LOG.info("=== after snapshot with 1000 rows");
    logFSTree(FSUtils.getRootDir(TEST_UTIL.getConfiguration()));
  }

  @After
  public void tearDown() throws Exception {
    admin.disableTable(tableName);
    admin.deleteTable(tableName);
    admin.deleteSnapshot(snapshotName0);
    admin.deleteSnapshot(snapshotName1);
  }

  @Test
  public void testTakeFlushSnapshot() throws IOException {
    // taking happens in setup.
  }

  @Test
  public void testRestoreSnapshot() throws IOException {
    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
    assertEquals(snapshot1Rows, TEST_UTIL.countRows(table));

    // Restore from snapshot-0
    admin.disableTable(tableName);
    admin.restoreSnapshot(snapshotName0);
    logFSTree(FSUtils.getRootDir(TEST_UTIL.getConfiguration()));
    admin.enableTable(tableName);
    table = new HTable(TEST_UTIL.getConfiguration(), tableName);

    LOG.info("=== after restore with 500 row snapshot");
    logFSTree(FSUtils.getRootDir(TEST_UTIL.getConfiguration()));

    assertEquals(snapshot0Rows, TEST_UTIL.countRows(table));

    // Restore from snapshot-1
    admin.disableTable(tableName);
    admin.restoreSnapshot(snapshotName1);
    admin.enableTable(tableName);
    table = new HTable(TEST_UTIL.getConfiguration(), tableName);
    assertEquals(snapshot1Rows, TEST_UTIL.countRows(table));
  }

  @Test(expected=SnapshotDoesNotExistException.class)
  public void testCloneNonExistentSnapshot() throws IOException, InterruptedException {
    String snapshotName = "random-snapshot-" + System.currentTimeMillis();
    String tableName = "random-table-" + System.currentTimeMillis();
    admin.cloneSnapshot(snapshotName, tableName);
  }

  @Test
  public void testCloneSnapshot() throws IOException, InterruptedException {
    byte[] clonedTableName = Bytes.toBytes("clonedtb-" + System.currentTimeMillis());
    testCloneSnapshot(clonedTableName, snapshotName0, snapshot0Rows);
    testCloneSnapshot(clonedTableName, snapshotName1, snapshot1Rows);
  }

  private void testCloneSnapshot(final byte[] tableName, final byte[] snapshotName,
      int snapshotRows) throws IOException, InterruptedException {
    // create a new table from snapshot
    admin.cloneSnapshot(snapshotName, tableName);
    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
    assertEquals(snapshotRows, TEST_UTIL.countRows(table));

    admin.disableTable(tableName);
    admin.deleteTable(tableName);
  }

  @Test
  public void testRestoreSnapshotOfCloned() throws IOException, InterruptedException {
    byte[] clonedTableName = Bytes.toBytes("clonedtb-" + System.currentTimeMillis());
    admin.cloneSnapshot(snapshotName0, clonedTableName);
    HTable table = new HTable(TEST_UTIL.getConfiguration(), clonedTableName);
    assertEquals(snapshot0Rows, TEST_UTIL.countRows(table));
    admin.snapshot(Bytes.toString(snapshotName2), Bytes.toString(clonedTableName), SnapshotDescription.Type.FLUSH);
    admin.disableTable(clonedTableName);
    admin.deleteTable(clonedTableName);

    admin.cloneSnapshot(snapshotName2, clonedTableName);
    table = new HTable(TEST_UTIL.getConfiguration(), clonedTableName);
    assertEquals(snapshot0Rows, TEST_UTIL.countRows(table));
    admin.disableTable(clonedTableName);
    admin.deleteTable(clonedTableName);
  }

  // ==========================================================================
  //  Helpers
  // ==========================================================================
  private void createTable(final byte[] tableName, final byte[]... families) throws IOException {
    HTableDescriptor htd = new HTableDescriptor(tableName);
    for (byte[] family: families) {
      HColumnDescriptor hcd = new HColumnDescriptor(family);
      htd.addFamily(hcd);
    }
    byte[][] splitKeys = new byte[16][];
    byte[] hex = Bytes.toBytes("0123456789abcdef");
    for (int i = 0; i < 16; ++i) {
      splitKeys[i] = new byte[] { hex[i] };
    }
    admin.createTable(htd, splitKeys);
  }

  public void loadData(final HTable table, int rows, byte[]... families) throws IOException {
    byte[] qualifier = Bytes.toBytes("q");
    table.setAutoFlush(false);
    while (rows-- > 0) {
      byte[] value = Bytes.add(Bytes.toBytes(System.currentTimeMillis()), Bytes.toBytes(rows));
      byte[] key = Bytes.toBytes(MD5Hash.getMD5AsHex(value));
      Put put = new Put(key);
      put.setWriteToWAL(false);
      for (byte[] family: families) {
        put.add(family, qualifier, value);
      }
      table.put(put);
    }
    table.flushCommits();
  }

  private void logFSTree(Path root) throws IOException {
    LOG.debug("Current file system:");
    logFSTree(root, "|-");
  }

  private void logFSTree(Path root, String prefix) throws IOException {
    for (FileStatus file : TEST_UTIL.getDFSCluster().getFileSystem().listStatus(root)) {
      if (file.isDir()) {
        LOG.debug(prefix + file.getPath().getName() + "/");
        logFSTree(file.getPath(), prefix + "---");
      } else {
        LOG.debug(prefix + file.getPath().getName() + "\tsz=" + file.getLen());
      }
    }
  }
}