HBASE-19258 IntegrationTest for Backup and Restore
Signed-off-by: tedyu <yuzhihong@gmail.com>
parent ff5250ca2b
commit 64ef120808
BackupManager.java:

@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupHFileCleaner;
 import org.apache.hadoop.hbase.backup.BackupInfo;
 import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
 import org.apache.hadoop.hbase.backup.BackupObserver;
@@ -45,6 +46,7 @@ import org.apache.hadoop.hbase.backup.regionserver.LogRollRegionServerProcedureManager;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.procedure.ProcedureManagerHost;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -120,9 +122,13 @@ public class BackupManager implements Closeable {
         classes + "," + masterProcedureClass);
     }
+
+    plugins = conf.get(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
+    conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, (plugins == null ? "" : plugins + ",") +
+      BackupHFileCleaner.class.getName());
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Added log cleaner: " + cleanerClass + "\n" + "Added master procedure manager: "
-        + masterProcedureClass);
+      LOG.debug("Added log cleaner: {}. Added master procedure manager: {}. "
+        + "Added master hfile cleaner: {}", cleanerClass, masterProcedureClass,
+        BackupHFileCleaner.class.getName());
     }
   }
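The hunk above registers the new BackupHFileCleaner using HBase's comma-separated plugin-list convention. A minimal standalone sketch of that append pattern (the key string mirrors HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS; the printed check is illustrative, not part of the commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class CleanerPluginConfigExample {
  // Conf key mirrored from HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS.
  static final String KEY = "hbase.master.hfilecleaner.plugins";

  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    String plugins = conf.get(KEY);
    // Append rather than overwrite, so any previously configured cleaners
    // keep running alongside the backup cleaner.
    conf.set(KEY, (plugins == null ? "" : plugins + ",")
        + "org.apache.hadoop.hbase.backup.BackupHFileCleaner");
    System.out.println(conf.get(KEY));
  }
}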
@@ -150,8 +156,8 @@ public class BackupManager implements Closeable {
     conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
       (coproc == null ? "" : coproc + ",") + regionObserverClass);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Added region procedure manager: " + regionProcedureClass
-        + ". Added region observer: " + regionObserverClass);
+      LOG.debug("Added region procedure manager: {}. Added region observer: {}",
+        regionProcedureClass, regionObserverClass);
     }
   }
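A recurring change in this commit is the move from string concatenation to SLF4J parameterized logging. A short self-contained sketch of the difference (class and argument names are illustrative):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParameterizedLoggingExample {
  private static final Logger LOG =
      LoggerFactory.getLogger(ParameterizedLoggingExample.class);

  static void report(String procedureClass, String observerClass) {
    // Old style: the message string is concatenated even when DEBUG is off,
    // so callers wrap it in an isDebugEnabled() guard.
    if (LOG.isDebugEnabled()) {
      LOG.debug("Added region procedure manager: " + procedureClass);
    }
    // SLF4J parameterized style: {} placeholders are substituted only after
    // the level check passes, so the guard is optional for cheap arguments
    // (it still pays off when computing an argument is expensive).
    LOG.debug("Added region procedure manager: {}. Added region observer: {}",
        procedureClass, observerClass);
  }

  public static void main(String[] args) {
    report("LogRollRegionServerProcedureManager", "BackupObserver");
  }
}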
@@ -222,7 +228,7 @@ public class BackupManager implements Closeable {
       tableList.add(hTableDescriptor.getTableName());
     }

-    LOG.info("Full backup all the tables available in the cluster: " + tableList);
+    LOG.info("Full backup all the tables available in the cluster: {}", tableList);
   }
 }
@@ -256,9 +262,9 @@ public class BackupManager implements Closeable {
   public void initialize() throws IOException {
     String ongoingBackupId = this.getOngoingBackupId();
     if (ongoingBackupId != null) {
-      LOG.info("There is a ongoing backup " + ongoingBackupId
-        + ". Can not launch new backup until no ongoing backup remains.");
-      throw new BackupException("There is ongoing backup.");
+      LOG.info("There is an ongoing backup {}"
+        + ". Cannot launch new backup until no ongoing backup remains.", ongoingBackupId);
+      throw new BackupException("There is an ongoing backup session.");
     }
   }
@@ -273,7 +279,7 @@ public class BackupManager implements Closeable {
    * @throws IOException exception
    */
   public ArrayList<BackupImage> getAncestors(BackupInfo backupInfo) throws IOException {
-    LOG.debug("Getting the direct ancestors of the current backup " + backupInfo.getBackupId());
+    LOG.debug("Getting the direct ancestors of the current backup {}", backupInfo.getBackupId());

     ArrayList<BackupImage> ancestors = new ArrayList<>();
@@ -309,25 +315,25 @@ public class BackupManager implements Closeable {
       if (BackupManifest.canCoverImage(ancestors, image)) {
         LOG.debug("Met the backup boundary of the current table set:");
         for (BackupImage image1 : ancestors) {
-          LOG.debug("  BackupID=" + image1.getBackupId() + ", BackupDir=" + image1.getRootDir());
+          LOG.debug("  BackupID={}, BackupDir={}", image1.getBackupId(), image1.getRootDir());
         }
       } else {
         Path logBackupPath =
           HBackupFileSystem.getBackupPath(backup.getBackupRootDir(), backup.getBackupId());
         LOG.debug("Current backup has an incremental backup ancestor, "
-          + "touching its image manifest in " + logBackupPath.toString()
-          + " to construct the dependency.");
+          + "touching its image manifest in {}"
+          + " to construct the dependency.", logBackupPath.toString());
         BackupManifest lastIncrImgManifest = new BackupManifest(conf, logBackupPath);
         BackupImage lastIncrImage = lastIncrImgManifest.getBackupImage();
         ancestors.add(lastIncrImage);

         LOG.debug(
-          "Last dependent incremental backup image: " + "{BackupID=" + lastIncrImage.getBackupId()
-            + "," + "BackupDir=" + lastIncrImage.getRootDir() + "}");
+          "Last dependent incremental backup image: {BackupID={}, " +
+          "BackupDir={}}", lastIncrImage.getBackupId(), lastIncrImage.getRootDir());
       }
     }
   }
-  LOG.debug("Got " + ancestors.size() + " ancestors for the current backup.");
+  LOG.debug("Got {} ancestors for the current backup.", ancestors.size());
   return ancestors;
 }
@@ -391,8 +397,8 @@ public class BackupManager implements Closeable {
         if (lastWarningOutputTime == 0
             || (System.currentTimeMillis() - lastWarningOutputTime) > 60000) {
           lastWarningOutputTime = System.currentTimeMillis();
-          LOG.warn("Waiting to acquire backup exclusive lock for "
-            + (lastWarningOutputTime - startTime) / 1000 + "s");
+          LOG.warn("Waiting to acquire backup exclusive lock for {}s",
+            (lastWarningOutputTime - startTime) / 1000);
         }
       } else {
         throw e;
IntegrationTestBackupRestore.java:

@@ -21,6 +21,8 @@ package org.apache.hadoop.hbase;
 import static org.junit.Assert.assertTrue;

 import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
@@ -38,13 +40,22 @@ import org.apache.hadoop.hbase.backup.RestoreRequest;
 import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
 import org.apache.hadoop.hbase.backup.impl.BackupManager;
 import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.chaos.actions.RestartRandomRsExceptMetaAction;
+import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
+import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy;
+import org.apache.hadoop.hbase.chaos.policies.Policy;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -52,7 +63,6 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

 /**
  * An integration test to detect regressions in HBASE-7912. Create
@@ -65,18 +75,31 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 public class IntegrationTestBackupRestore extends IntegrationTestBase {
   private static final String CLASS_NAME = IntegrationTestBackupRestore.class.getSimpleName();
   protected static final Logger LOG = LoggerFactory.getLogger(IntegrationTestBackupRestore.class);
   protected static final TableName TABLE_NAME1 = TableName.valueOf(CLASS_NAME + ".table1");
   protected static final TableName TABLE_NAME2 = TableName.valueOf(CLASS_NAME + ".table2");
+  protected static final String NUMBER_OF_TABLES_KEY = "num_tables";
   protected static final String COLUMN_NAME = "f";
   protected static final String REGION_COUNT_KEY = "regions_per_rs";
   protected static final String REGIONSERVER_COUNT_KEY = "region_servers";
+  protected static final String ROWS_PER_ITERATION_KEY = "rows_in_iteration";
+  protected static final String NUM_ITERATIONS_KEY = "num_iterations";
   protected static final int DEFAULT_REGION_COUNT = 10;
-  protected static final int DEFAULT_REGIONSERVER_COUNT = 2;
+  protected static final int DEFAULT_REGIONSERVER_COUNT = 5;
+  protected static final int DEFAULT_NUMBER_OF_TABLES = 1;
+  protected static final int DEFAULT_NUM_ITERATIONS = 10;
+  protected static final int DEFAULT_ROWS_IN_ITERATION = 500000;
+  protected static final String SLEEP_TIME_KEY = "sleeptime";
+  // short default interval because tests don't run very long.
+  protected static final long SLEEP_TIME_DEFAULT = 50000L;
+
+  protected static int rowsInIteration;
   protected static int regionsCountPerServer;
   protected static int regionServerCount;
-  protected static final String NB_ROWS_IN_BATCH_KEY = "rows_in_batch";
-  protected static final int DEFAULT_NB_ROWS_IN_BATCH = 20000;
-  private static int rowsInBatch;
+
+  protected static int numIterations;
+  protected static int numTables;
+  protected static TableName[] tableNames;
+  protected long sleepTime;
+  protected static Object lock = new Object();

   private static String BACKUP_ROOT_DIR = "backupIT";

   @Override
@@ -87,24 +110,22 @@ public class IntegrationTestBackupRestore extends IntegrationTestBase {
     regionsCountPerServer = conf.getInt(REGION_COUNT_KEY, DEFAULT_REGION_COUNT);
     regionServerCount =
       conf.getInt(REGIONSERVER_COUNT_KEY, DEFAULT_REGIONSERVER_COUNT);
-    rowsInBatch = conf.getInt(NB_ROWS_IN_BATCH_KEY, DEFAULT_NB_ROWS_IN_BATCH);
+    rowsInIteration = conf.getInt(ROWS_PER_ITERATION_KEY, DEFAULT_ROWS_IN_ITERATION);
+    numIterations = conf.getInt(NUM_ITERATIONS_KEY, DEFAULT_NUM_ITERATIONS);
+    numTables = conf.getInt(NUMBER_OF_TABLES_KEY, DEFAULT_NUMBER_OF_TABLES);
+    sleepTime = conf.getLong(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT);
+    enableBackup(conf);
-    LOG.info(String.format("Initializing cluster with %d region servers.", regionServerCount));
+    LOG.info("Initializing cluster with {} region servers.", regionServerCount);
     util.initializeCluster(regionServerCount);
-    LOG.info("Cluster initialized");
-    util.deleteTableIfAny(TABLE_NAME1);
-    util.deleteTableIfAny(TABLE_NAME2);
-    LOG.info("Cluster ready");
+    LOG.info("Cluster initialized and ready");
   }

   @After
   public void tearDown() throws IOException {
     LOG.info("Cleaning up after test.");
     if(util.isDistributedCluster()) {
-      util.deleteTableIfAny(TABLE_NAME1);
-      LOG.info("Cleaning up after test. TABLE1 done");
-      util.deleteTableIfAny(TABLE_NAME2);
-      LOG.info("Cleaning up after test. TABLE2 done");
+      deleteTablesIfAny();
+      LOG.info("Cleaning up after test. Deleted tables");
       cleanUpBackupDir();
     }
     LOG.info("Restoring cluster.");
@@ -112,6 +133,30 @@ public class IntegrationTestBackupRestore extends IntegrationTestBase {
     LOG.info("Cluster restored.");
   }

+  @Override
+  public void setUpMonkey() throws Exception {
+    Policy p = new PeriodicRandomActionPolicy(sleepTime,
+      new RestartRandomRsExceptMetaAction(sleepTime));
+    this.monkey = new PolicyBasedChaosMonkey(util, p);
+    startMonkey();
+  }
+
+  private void deleteTablesIfAny() throws IOException {
+    for (TableName table : tableNames) {
+      util.deleteTableIfAny(table);
+    }
+  }
+
+  private void createTables() throws Exception {
+    tableNames = new TableName[numTables];
+    for (int i = 0; i < numTables; i++) {
+      tableNames[i] = TableName.valueOf(CLASS_NAME + ".table." + i);
+    }
+    for (TableName table : tableNames) {
+      createTable(table);
+    }
+  }
+
   private void enableBackup(Configuration conf) {
     // Enable backup
     conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
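A minimal sketch of the chaos wiring added in setUpMonkey() above, assuming the hbase-it chaos classes the commit imports; the helper method is illustrative, not part of the commit:

import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.chaos.actions.RestartRandomRsExceptMetaAction;
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy;
import org.apache.hadoop.hbase.chaos.policies.Policy;

public class ChaosWiringExample {
  static PolicyBasedChaosMonkey buildMonkey(IntegrationTestingUtility util, long periodMs) {
    // Every periodMs the policy runs one of its actions picked at random;
    // here the only action restarts a random region server, sparing the one
    // hosting hbase:meta so the cluster stays usable while backups run.
    Policy policy = new PeriodicRandomActionPolicy(periodMs,
        new RestartRandomRsExceptMetaAction(periodMs));
    return new PolicyBasedChaosMonkey(util, policy);
  }
}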
@@ -127,25 +172,53 @@ public class IntegrationTestBackupRestore extends IntegrationTestBase {
   @Test
   public void testBackupRestore() throws Exception {
     BACKUP_ROOT_DIR = util.getDataTestDirOnTestFS() + Path.SEPARATOR + BACKUP_ROOT_DIR;
-    createTable(TABLE_NAME1);
-    createTable(TABLE_NAME2);
-    runTest();
+    createTables();
+    runTestMulti();
   }

+  private void runTestMulti() throws IOException {
+    LOG.info("IT backup & restore started");
+    Thread[] workers = new Thread[numTables];
+    for (int i = 0; i < numTables; i++) {
+      final TableName table = tableNames[i];
+      Runnable r = new Runnable() {
+        @Override
+        public void run() {
+          try {
+            runTestSingle(table);
+          } catch (IOException e) {
+            LOG.error("Failed", e);
+            Assert.fail(e.getMessage());
+          }
+        }
+      };
+      workers[i] = new Thread(r);
+      workers[i].start();
+    }
+    // Wait for all workers to finish
+    for (Thread t : workers) {
+      Uninterruptibles.joinUninterruptibly(t);
+    }
+    LOG.info("IT backup & restore finished");
+  }
+
   private void createTable(TableName tableName) throws Exception {
     long startTime, endTime;
-    HTableDescriptor desc = new HTableDescriptor(tableName);
-    HColumnDescriptor[] columns =
-      new HColumnDescriptor[]{new HColumnDescriptor(COLUMN_NAME)};
-    LOG.info(String.format("Creating table %s with %d splits.", tableName,
-      regionsCountPerServer));
+
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
+
+    TableDescriptor desc = builder.build();
+    ColumnFamilyDescriptorBuilder cbuilder =
+      ColumnFamilyDescriptorBuilder.newBuilder(COLUMN_NAME.getBytes(Charset.defaultCharset()));
+    ColumnFamilyDescriptor[] columns = new ColumnFamilyDescriptor[] { cbuilder.build() };
+    LOG.info("Creating table {} with {} splits.", tableName,
+      regionsCountPerServer * regionServerCount);
     startTime = System.currentTimeMillis();
     HBaseTestingUtility.createPreSplitLoadTestTable(util.getConfiguration(), desc, columns,
       regionsCountPerServer);
     util.waitTableAvailable(tableName);
     endTime = System.currentTimeMillis();
-    LOG.info(String.format("Pre-split table created successfully in %dms.",
-      (endTime - startTime)));
+    LOG.info("Pre-split table created successfully in {}ms.", (endTime - startTime));
   }

   private void loadData(TableName table, int numRows) throws IOException {
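The createTable() rewrite above moves from the deprecated HTableDescriptor/HColumnDescriptor classes to the immutable builder API. A self-contained sketch of that API (table name is illustrative; setColumnFamily is the method on recent 2.x builders, while very early 2.0 betas used addColumnFamily):

import java.nio.charset.StandardCharsets;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class DescriptorBuilderExample {
  public static void main(String[] args) {
    TableName name = TableName.valueOf("example.table");
    // Immutable column family descriptor built from a byte[] family name.
    ColumnFamilyDescriptor family = ColumnFamilyDescriptorBuilder
        .newBuilder("f".getBytes(StandardCharsets.UTF_8))
        .build();
    // Immutable table descriptor; replaces new HTableDescriptor(tableName).
    TableDescriptor table = TableDescriptorBuilder.newBuilder(name)
        .setColumnFamily(family)
        .build();
    System.out.println(table.getTableName() + " families=" + table.getColumnFamilyCount());
  }
}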
@@ -157,77 +230,102 @@ public class IntegrationTestBackupRestore extends IntegrationTestBase {
     conn.getAdmin().flush(TableName.valueOf(table.getName()));
   }

-  private void runTest() throws IOException {
-    // Check if backup is enabled
-    if (!BackupManager.isBackupEnabled(getConf())) {
-      LOG.error(BackupRestoreConstants.ENABLE_BACKUP);
-      System.exit(EXIT_FAILURE);
-    }
+  private String backup(BackupRequest request, BackupAdmin client)
+      throws IOException {
+    String backupId = client.backupTables(request);
+    return backupId;
+  }

-    LOG.info(BackupRestoreConstants.VERIFY_BACKUP);
+  private void restore(RestoreRequest request, BackupAdmin client)
+      throws IOException {
+    client.restore(request);
+  }
+
+  private void merge(String[] backupIds, BackupAdmin client)
+      throws IOException {
+    client.mergeBackups(backupIds);
+  }
+
+  private void runTestSingle(TableName table) throws IOException {
+
+    List<String> backupIds = new ArrayList<String>();
+    List<Integer> tableSizes = new ArrayList<Integer>();

     try (Connection conn = util.getConnection();
-        Admin admin = conn.getAdmin();
-        BackupAdmin client = new BackupAdminImpl(conn)) {
-      // #0- insert some data to table TABLE_NAME1, TABLE_NAME2
-      loadData(TABLE_NAME1, rowsInBatch);
-      loadData(TABLE_NAME2, rowsInBatch);
-      // #1 - create full backup for all tables
-      LOG.info("create full backup image for all tables");
-      List<TableName> tables = Lists.newArrayList(TABLE_NAME1, TABLE_NAME2);
+        Admin admin = conn.getAdmin();
+        BackupAdmin client = new BackupAdminImpl(conn);) {
+
+      // #0- insert some data to table 'table'
+      loadData(table, rowsInIteration);
+      tableSizes.add(rowsInIteration);
+
+      // #1 - create full backup for table first
+      LOG.info("create full backup image for {}", table);
+      List<TableName> tables = Lists.newArrayList(table);
       BackupRequest.Builder builder = new BackupRequest.Builder();
-      BackupRequest request =
-        builder.withBackupType(BackupType.FULL).withTableList(tables)
-          .withTargetRootDir(BACKUP_ROOT_DIR).build();
-      String backupIdFull = client.backupTables(request);
+      BackupRequest request = builder.withBackupType(BackupType.FULL).withTableList(tables)
+        .withTargetRootDir(BACKUP_ROOT_DIR).build();
+
+      String backupIdFull = backup(request, client);
       assertTrue(checkSucceeded(backupIdFull));
-      // #2 - insert some data to table
-      loadData(TABLE_NAME1, rowsInBatch);
-      loadData(TABLE_NAME2, rowsInBatch);
-
-      try (HTable t1 = (HTable) conn.getTable(TABLE_NAME1)) {
-        Assert.assertEquals(util.countRows(t1), rowsInBatch * 2);
-      }
-      try (HTable t2 = (HTable) conn.getTable(TABLE_NAME2)) {
-        Assert.assertEquals(util.countRows(t2), rowsInBatch * 2);
-      }
-      // #3 - incremental backup for tables
-      tables = Lists.newArrayList(TABLE_NAME1, TABLE_NAME2);
-      builder = new BackupRequest.Builder();
-      request =
-        builder.withBackupType(BackupType.INCREMENTAL).withTableList(tables)
-          .withTargetRootDir(BACKUP_ROOT_DIR).build();
-      String backupIdIncMultiple = client.backupTables(request);
-      assertTrue(checkSucceeded(backupIdIncMultiple));
-      // #4 - restore full backup for all tables, without overwrite
-      TableName[] tablesRestoreFull = new TableName[] { TABLE_NAME1, TABLE_NAME2 };
-      client.restore(createRestoreRequest(BACKUP_ROOT_DIR, backupIdFull, false, tablesRestoreFull,
-        null, true));
-      // #5.1 - check tables for full restore
-      assertTrue(admin.tableExists(TABLE_NAME1));
-      assertTrue(admin.tableExists(TABLE_NAME2));
-      // #5.2 - checking row count of tables for full restore
-      HTable hTable = (HTable) conn.getTable(TABLE_NAME1);
-      Assert.assertEquals(util.countRows(hTable), rowsInBatch);
-      hTable.close();
-      hTable = (HTable) conn.getTable(TABLE_NAME2);
-      Assert.assertEquals(util.countRows(hTable), rowsInBatch);
-      hTable.close();
-      // #6 - restore incremental backup for multiple tables, with overwrite
-      TableName[] tablesRestoreIncMultiple = new TableName[] { TABLE_NAME1, TABLE_NAME2 };
-      client.restore(createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple, false,
-        tablesRestoreIncMultiple, null, true));
-      hTable = (HTable) conn.getTable(TABLE_NAME1);
-      Assert.assertEquals(util.countRows(hTable), rowsInBatch * 2);
-      hTable.close();
-      hTable = (HTable) conn.getTable(TABLE_NAME2);
-      Assert.assertEquals(util.countRows(hTable), rowsInBatch * 2);
+
+      backupIds.add(backupIdFull);
+      // Now continue with incremental backups
+      int count = 1;
+      while (count++ < numIterations) {
+        // Load data
+        loadData(table, rowsInIteration);
+        tableSizes.add(rowsInIteration * count);
+        // Do incremental backup
+        builder = new BackupRequest.Builder();
+        request = builder.withBackupType(BackupType.INCREMENTAL).withTableList(tables)
+          .withTargetRootDir(BACKUP_ROOT_DIR).build();
+        String backupId = backup(request, client);
+        assertTrue(checkSucceeded(backupId));
+        backupIds.add(backupId);
+
+        // Restore incremental backup for table, with overwrite for previous backup
+        String previousBackupId = backupIds.get(backupIds.size() - 2);
+        restoreVerifyTable(conn, client, table, previousBackupId, rowsInIteration * (count - 1));
+        // Restore incremental backup for table, with overwrite for last backup
+        restoreVerifyTable(conn, client, table, backupId, rowsInIteration * count);
+      }
+      // Now merge all incremental and restore
+      String[] incBackupIds = allIncremental(backupIds);
+      merge(incBackupIds, client);
+      // Restore last one
+      String backupId = incBackupIds[incBackupIds.length - 1];
+      // restore incremental backup for table, with overwrite
+      TableName[] tablesRestoreIncMultiple = new TableName[] { table };
+      restore(createRestoreRequest(BACKUP_ROOT_DIR, backupId, false, tablesRestoreIncMultiple, null,
+        true), client);
+      Table hTable = conn.getTable(table);
+      Assert.assertEquals(util.countRows(hTable), rowsInIteration * numIterations);
+      hTable.close();
+      LOG.info("{} loop {} finished.", Thread.currentThread().getName(), (count-1));
     }
   }
+
+  private void restoreVerifyTable(Connection conn, BackupAdmin client, TableName table,
+      String backupId, long expectedRows) throws IOException {
+    TableName[] tablesRestoreIncMultiple = new TableName[] { table };
+    restore(createRestoreRequest(BACKUP_ROOT_DIR, backupId, false,
+      tablesRestoreIncMultiple, null, true), client);
+    Table hTable = conn.getTable(table);
+    Assert.assertEquals(expectedRows, util.countRows(hTable));
+    hTable.close();
+  }
+
+  private String[] allIncremental(List<String> backupIds) {
+    int size = backupIds.size();
+    backupIds = backupIds.subList(1, size);
+    String[] arr = new String[size - 1];
+    backupIds.toArray(arr);
+    return arr;
+  }

   /**
    *
    * @param backupId pass backup ID to check status of
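The runTestSingle() loop above drives one table through the full lifecycle: a full backup, then incremental backups each iteration (verifying a restore of both the previous and the latest image), and finally a merge of all incrementals followed by a restore of the last merged id. A condensed, hedged sketch of that cycle against the BackupAdmin API used above (table and root-directory values are illustrative):

import java.util.List;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupAdmin;
import org.apache.hadoop.hbase.backup.BackupRequest;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

public class BackupCycleExample {
  // Condensed sketch of the test's backup lifecycle; rootDir is illustrative.
  static void backupCycle(Connection conn, TableName table, String rootDir)
      throws Exception {
    List<TableName> tables = Lists.newArrayList(table);
    try (BackupAdmin client = new BackupAdminImpl(conn)) {
      // One full backup first; incrementals are only valid on top of it.
      String full = client.backupTables(new BackupRequest.Builder()
          .withBackupType(BackupType.FULL).withTableList(tables)
          .withTargetRootDir(rootDir).build());
      // Two incremental backups capturing subsequent edits.
      String inc1 = client.backupTables(new BackupRequest.Builder()
          .withBackupType(BackupType.INCREMENTAL).withTableList(tables)
          .withTargetRootDir(rootDir).build());
      String inc2 = client.backupTables(new BackupRequest.Builder()
          .withBackupType(BackupType.INCREMENTAL).withTableList(tables)
          .withTargetRootDir(rootDir).build());
      // Merge the incremental chain; the test afterwards restores the last
      // id in the chain (incBackupIds[length - 1]).
      client.mergeBackups(new String[] { inc1, inc2 });
    }
  }
}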
@@ -260,18 +358,18 @@ public class IntegrationTestBackupRestore extends IntegrationTestBase {
       TableName[] fromTables, TableName[] toTables, boolean isOverwrite) {
     RestoreRequest.Builder builder = new RestoreRequest.Builder();
     return builder.withBackupRootDir(backupRootDir)
-        .withBackupId(backupId)
-        .withCheck(check)
-        .withFromTables(fromTables)
-        .withToTables(toTables)
-        .withOvewrite(isOverwrite).build();
+      .withBackupId(backupId)
+      .withCheck(check)
+      .withFromTables(fromTables)
+      .withToTables(toTables)
+      .withOvewrite(isOverwrite).build();
   }

   @Override
   public void setUpCluster() throws Exception {
     util = getTestingUtil(getConf());
     enableBackup(getConf());
-    LOG.debug("Initializing/checking cluster has " + regionServerCount + " servers");
+    LOG.debug("Initializing/checking cluster has {} servers", regionServerCount);
     util.initializeCluster(regionServerCount);
     LOG.debug("Done initializing/checking cluster");
   }
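A hedged usage sketch of the createRestoreRequest() helper above, with names taken from the surrounding test; note that withOvewrite is the RestoreRequest.Builder method's actual spelling, not a typo introduced here:

  void restoreInPlace(BackupAdmin client, TableName table, String backupId)
      throws IOException {
    RestoreRequest request = createRestoreRequest(
        BACKUP_ROOT_DIR,           // root dir holding all backup images
        backupId,                  // image to restore from
        false,                     // check: true would only verify the image
        new TableName[] { table }, // tables as named inside the backup
        null,                      // toTables: null restores under original names
        true);                     // overwrite existing table contents
    client.restore(request);
  }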
@@ -282,14 +380,12 @@ public class IntegrationTestBackupRestore extends IntegrationTestBase {
    */
   @Override
   public int runTestFromCommandLine() throws Exception {
-    // Check if backup is enabled
     // Check if backup is enabled
     if (!BackupManager.isBackupEnabled(getConf())) {
       System.err.println(BackupRestoreConstants.ENABLE_BACKUP);
       return -1;
     }
-
     System.out.println(BackupRestoreConstants.VERIFY_BACKUP);

     testBackupRestore();
     return 0;
   }
@@ -308,11 +404,16 @@ public class IntegrationTestBackupRestore extends IntegrationTestBase {

   @Override
   protected void addOptions() {
-    addOptWithArg(REGIONSERVER_COUNT_KEY, "Total number of region servers. Default: '"
-      + DEFAULT_REGIONSERVER_COUNT + "'");
+    addOptWithArg(REGIONSERVER_COUNT_KEY,
+      "Total number of region servers. Default: '" + DEFAULT_REGIONSERVER_COUNT + "'");
     addOptWithArg(REGION_COUNT_KEY, "Total number of regions. Default: " + DEFAULT_REGION_COUNT);
-    addOptWithArg(NB_ROWS_IN_BATCH_KEY, "Total number of data rows to be loaded (per table/batch."
-      + " Total number of batches=2). Default: " + DEFAULT_NB_ROWS_IN_BATCH);
+    addOptWithArg(ROWS_PER_ITERATION_KEY,
+      "Total number of data rows to be loaded during one iteration." + " Default: "
+        + DEFAULT_ROWS_IN_ITERATION);
+    addOptWithArg(NUM_ITERATIONS_KEY,
+      "Total number of iterations." + " Default: " + DEFAULT_NUM_ITERATIONS);
+    addOptWithArg(NUMBER_OF_TABLES_KEY,
+      "Total number of tables in the test." + " Default: " + DEFAULT_NUMBER_OF_TABLES);
   }
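These options are plain command-line flags parsed by the tool itself. A hedged sketch of launching the test with them via ToolRunner, mirroring the main() wiring typical of HBase integration tests; the option values are illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.IntegrationTestBackupRestore;
import org.apache.hadoop.util.ToolRunner;

public class LaunchExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Option names match the keys registered in addOptions().
    int status = ToolRunner.run(conf, new IntegrationTestBackupRestore(),
        new String[] { "-num_tables", "2", "-num_iterations", "3",
            "-rows_in_iteration", "100000" });
    System.exit(status);
  }
}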
@@ -325,13 +426,18 @@ public class IntegrationTestBackupRestore extends IntegrationTestBase {
     regionServerCount =
       Integer.parseInt(cmd.getOptionValue(REGIONSERVER_COUNT_KEY,
         Integer.toString(DEFAULT_REGIONSERVER_COUNT)));
-    rowsInBatch =
-      Integer.parseInt(cmd.getOptionValue(NB_ROWS_IN_BATCH_KEY,
-        Integer.toString(DEFAULT_NB_ROWS_IN_BATCH)));
+    rowsInIteration =
+      Integer.parseInt(cmd.getOptionValue(ROWS_PER_ITERATION_KEY,
+        Integer.toString(DEFAULT_ROWS_IN_ITERATION)));
+    numIterations = Integer.parseInt(cmd.getOptionValue(NUM_ITERATIONS_KEY,
+      Integer.toString(DEFAULT_NUM_ITERATIONS)));
+    numTables = Integer.parseInt(cmd.getOptionValue(NUMBER_OF_TABLES_KEY,
+      Integer.toString(DEFAULT_NUMBER_OF_TABLES)));

     LOG.info(MoreObjects.toStringHelper("Parsed Options").
       add(REGION_COUNT_KEY, regionsCountPerServer)
-      .add(REGIONSERVER_COUNT_KEY, regionServerCount).add(NB_ROWS_IN_BATCH_KEY, rowsInBatch)
-      .toString());
+      .add(REGIONSERVER_COUNT_KEY, regionServerCount).add(ROWS_PER_ITERATION_KEY, rowsInIteration)
+      .toString());
   }

   /**