HBASE-19568: Restore of HBase table using incremental backup doesn't restore rows from an earlier incremental backup
Signed-off-by: Josh Elser <elserj@apache.org>
commit a5601c8eac (parent 057e80c163)
@@ -271,7 +271,7 @@ public class BackupAdminImpl implements BackupAdmin {
         LOG.debug(numDeleted + " bulk loaded files out of " + map.size() + " were deleted");
       }
       if (success) {
-        sysTable.deleteBulkLoadedFiles(map);
+        sysTable.deleteBulkLoadedRows(new ArrayList<byte[]>(map.keySet()));
       }
 
       sysTable.deleteBackupInfo(backupInfo.getBackupId());
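Note: a minimal sketch of the cleanup path this hunk switches to, assuming a caller living in the same impl package (readBulkLoadedFiles is package-private in this patch); the wrapper class and method below are illustrative, only the BackupSystemTable calls come from the patch itself.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.client.Connection;

// Hypothetical helper, for illustration only: bulk-load metadata for a deleted backup
// is now removed by row key from the dedicated bulk-load system table.
public class BulkLoadMetaCleanupSketch {
  static void cleanup(Connection conn, String backupId) throws IOException {
    try (BackupSystemTable sysTable = new BackupSystemTable(conn)) {
      // row key -> path of the bulk loaded HFile recorded for this backup id
      Map<byte[], String> map = sysTable.readBulkLoadedFiles(backupId);
      sysTable.deleteBulkLoadedRows(new ArrayList<byte[]>(map.keySet()));
    }
  }
}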
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupInfo;
 import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
+import org.apache.hadoop.hbase.backup.BackupObserver;
 import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
 import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.HBackupFileSystem;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
 import org.apache.hadoop.hbase.backup.regionserver.LogRollRegionServerProcedureManager;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.procedure.ProcedureManagerHost;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -140,10 +142,14 @@ public class BackupManager implements Closeable {
       conf.set(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY, classes + ","
           + regionProcedureClass);
     }
+    String coproc = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
+    String regionObserverClass = BackupObserver.class.getName();
+    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, (coproc == null ? "" : coproc + ",") +
+        regionObserverClass);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Added region procedure manager: " + regionProcedureClass);
+      LOG.debug("Added region procedure manager: " + regionProcedureClass +
+          ". Added region observer: " + regionObserverClass);
     }
 
   }
 
   public static boolean isBackupEnabled(Configuration conf) {
@@ -415,13 +421,8 @@ public class BackupManager implements Closeable {
     return systemTable.readBulkloadRows(tableList);
   }
 
-  public void removeBulkLoadedRows(List<TableName> lst, List<byte[]> rows) throws IOException {
-    systemTable.removeBulkLoadedRows(lst, rows);
-  }
-
-  public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps)
-      throws IOException {
-    systemTable.writeBulkLoadedFiles(sTableList, maps, backupInfo.getBackupId());
+  public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
+    systemTable.deleteBulkLoadedRows(rows);
   }
 
   /**
@@ -42,8 +42,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -53,6 +51,8 @@ import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
 import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -62,6 +62,8 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.SnapshotDescription;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
@@ -122,7 +124,21 @@ public final class BackupSystemTable implements Closeable {
 
   }
 
+  /**
+   * Backup system table (main) name
+   */
   private TableName tableName;
+
+  /**
+   * Backup System table name for bulk loaded files.
+   * We keep all bulk loaded file references in a separate table
+   * because we have to isolate general backup operations: create, merge etc
+   * from activity of RegionObserver, which controls process of a bulk loading
+   * {@link org.apache.hadoop.hbase.backup.BackupObserver}
+   */
+
+  private TableName bulkLoadTableName;
+
   /**
    * Stores backup sessions (contexts)
    */
@ -174,20 +190,29 @@ public final class BackupSystemTable implements Closeable {
|
||||||
|
|
||||||
public BackupSystemTable(Connection conn) throws IOException {
|
public BackupSystemTable(Connection conn) throws IOException {
|
||||||
this.connection = conn;
|
this.connection = conn;
|
||||||
tableName = BackupSystemTable.getTableName(conn.getConfiguration());
|
Configuration conf = this.connection.getConfiguration();
|
||||||
|
tableName = BackupSystemTable.getTableName(conf);
|
||||||
|
bulkLoadTableName = BackupSystemTable.getTableNameForBulkLoadedData(conf);
|
||||||
checkSystemTable();
|
checkSystemTable();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkSystemTable() throws IOException {
|
private void checkSystemTable() throws IOException {
|
||||||
try (Admin admin = connection.getAdmin()) {
|
try (Admin admin = connection.getAdmin()) {
|
||||||
verifyNamespaceExists(admin);
|
verifyNamespaceExists(admin);
|
||||||
|
Configuration conf = connection.getConfiguration();
|
||||||
if (!admin.tableExists(tableName)) {
|
if (!admin.tableExists(tableName)) {
|
||||||
HTableDescriptor backupHTD =
|
TableDescriptor backupHTD =
|
||||||
BackupSystemTable.getSystemTableDescriptor(connection.getConfiguration());
|
BackupSystemTable.getSystemTableDescriptor(conf);
|
||||||
admin.createTable(backupHTD);
|
admin.createTable(backupHTD);
|
||||||
}
|
}
|
||||||
waitForSystemTable(admin);
|
if (!admin.tableExists(bulkLoadTableName)) {
|
||||||
|
TableDescriptor blHTD =
|
||||||
|
BackupSystemTable.getSystemTableForBulkLoadedDataDescriptor(conf);
|
||||||
|
admin.createTable(blHTD);
|
||||||
|
}
|
||||||
|
waitForSystemTable(admin, tableName);
|
||||||
|
waitForSystemTable(admin, bulkLoadTableName);
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -207,7 +232,7 @@ public final class BackupSystemTable implements Closeable {
     }
   }
 
-  private void waitForSystemTable(Admin admin) throws IOException {
+  private void waitForSystemTable(Admin admin, TableName tableName) throws IOException {
     long TIMEOUT = 60000;
     long startTime = EnvironmentEdgeManager.currentTime();
     while (!admin.tableExists(tableName) || !admin.isTableAvailable(tableName)) {
@@ -216,10 +241,11 @@ public final class BackupSystemTable implements Closeable {
       } catch (InterruptedException e) {
       }
       if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) {
-        throw new IOException("Failed to create backup system table after " + TIMEOUT + "ms");
+        throw new IOException("Failed to create backup system table "+
+          tableName +" after " + TIMEOUT + "ms");
       }
     }
-    LOG.debug("Backup table exists and available");
+    LOG.debug("Backup table "+tableName+" exists and available");
 
   }
 
@@ -251,7 +277,7 @@ public final class BackupSystemTable implements Closeable {
    */
   Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException {
     Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
-    try (Table table = connection.getTable(tableName);
+    try (Table table = connection.getTable(bulkLoadTableName);
         ResultScanner scanner = table.getScanner(scan)) {
       Result res = null;
       Map<byte[], String> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
@@ -279,7 +305,7 @@ public final class BackupSystemTable implements Closeable {
       throws IOException {
     Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
     Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList == null ? 1 : sTableList.size()];
-    try (Table table = connection.getTable(tableName);
+    try (Table table = connection.getTable(bulkLoadTableName);
         ResultScanner scanner = table.getScanner(scan)) {
       Result res = null;
       while ((res = scanner.next()) != null) {
@@ -324,18 +350,6 @@ public final class BackupSystemTable implements Closeable {
     }
   }
 
-  /*
-   * @param map Map of row keys to path of bulk loaded hfile
-   */
-  void deleteBulkLoadedFiles(Map<byte[], String> map) throws IOException {
-    try (Table table = connection.getTable(tableName)) {
-      List<Delete> dels = new ArrayList<>();
-      for (byte[] row : map.keySet()) {
-        dels.add(new Delete(row).addFamily(BackupSystemTable.META_FAMILY));
-      }
-      table.delete(dels);
-    }
-  }
 
   /**
    * Deletes backup status from backup system table table
@@ -366,7 +380,7 @@ public final class BackupSystemTable implements Closeable {
       LOG.debug("write bulk load descriptor to backup " + tabName + " with " + finalPaths.size()
           + " entries");
     }
-    try (Table table = connection.getTable(tableName)) {
+    try (Table table = connection.getTable(bulkLoadTableName)) {
       List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths);
       table.put(puts);
       LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
@@ -386,7 +400,7 @@ public final class BackupSystemTable implements Closeable {
       LOG.debug("write bulk load descriptor to backup " + tabName + " with " + pairs.size()
           + " entries");
     }
-    try (Table table = connection.getTable(tableName)) {
+    try (Table table = connection.getTable(bulkLoadTableName)) {
       List<Put> puts =
           BackupSystemTable.createPutForPreparedBulkload(tabName, region, family, pairs);
       table.put(puts);
@@ -399,8 +413,8 @@ public final class BackupSystemTable implements Closeable {
    * @param lst list of table names
    * @param rows the rows to be deleted
    */
-  public void removeBulkLoadedRows(List<TableName> lst, List<byte[]> rows) throws IOException {
-    try (Table table = connection.getTable(tableName)) {
+  public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
+    try (Table table = connection.getTable(bulkLoadTableName)) {
       List<Delete> lstDels = new ArrayList<>();
       for (byte[] row : rows) {
         Delete del = new Delete(row);
@@ -408,7 +422,7 @@ public final class BackupSystemTable implements Closeable {
         LOG.debug("orig deleting the row: " + Bytes.toString(row));
       }
       table.delete(lstDels);
-      LOG.debug("deleted " + rows.size() + " original bulkload rows for " + lst.size() + " tables");
+      LOG.debug("deleted " + rows.size() + " original bulkload rows");
     }
   }
 
@@ -425,7 +439,7 @@ public final class BackupSystemTable implements Closeable {
     for (TableName tTable : tableList) {
       Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(tTable);
       Map<String, Map<String, List<Pair<String, Boolean>>>> tblMap = map.get(tTable);
-      try (Table table = connection.getTable(tableName);
+      try (Table table = connection.getTable(bulkLoadTableName);
          ResultScanner scanner = table.getScanner(scan)) {
         Result res = null;
         while ((res = scanner.next()) != null) {
@@ -480,7 +494,7 @@ public final class BackupSystemTable implements Closeable {
    */
   public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps,
       String backupId) throws IOException {
-    try (Table table = connection.getTable(tableName)) {
+    try (Table table = connection.getTable(bulkLoadTableName)) {
       long ts = EnvironmentEdgeManager.currentTime();
       int cnt = 0;
       List<Put> puts = new ArrayList<>();
@ -1311,21 +1325,28 @@ public final class BackupSystemTable implements Closeable {
|
||||||
* Get backup system table descriptor
|
* Get backup system table descriptor
|
||||||
* @return table's descriptor
|
* @return table's descriptor
|
||||||
*/
|
*/
|
||||||
public static HTableDescriptor getSystemTableDescriptor(Configuration conf) {
|
public static TableDescriptor getSystemTableDescriptor(Configuration conf) {
|
||||||
|
|
||||||
HTableDescriptor tableDesc = new HTableDescriptor(getTableName(conf));
|
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(getTableName(conf));
|
||||||
HColumnDescriptor colSessionsDesc = new HColumnDescriptor(SESSIONS_FAMILY);
|
|
||||||
colSessionsDesc.setMaxVersions(1);
|
ColumnFamilyDescriptorBuilder colBuilder =
|
||||||
// Time to keep backup sessions (secs)
|
ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY);
|
||||||
|
|
||||||
|
colBuilder.setMaxVersions(1);
|
||||||
Configuration config = HBaseConfiguration.create();
|
Configuration config = HBaseConfiguration.create();
|
||||||
int ttl =
|
int ttl =
|
||||||
config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY,
|
config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY,
|
||||||
BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
|
BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
|
||||||
colSessionsDesc.setTimeToLive(ttl);
|
colBuilder.setTimeToLive(ttl);
|
||||||
tableDesc.addFamily(colSessionsDesc);
|
|
||||||
HColumnDescriptor colMetaDesc = new HColumnDescriptor(META_FAMILY);
|
ColumnFamilyDescriptor colSessionsDesc = colBuilder.build();
|
||||||
tableDesc.addFamily(colMetaDesc);
|
builder.addColumnFamily(colSessionsDesc);
|
||||||
return tableDesc;
|
|
||||||
|
colBuilder =
|
||||||
|
ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY);
|
||||||
|
colBuilder.setTimeToLive(ttl);
|
||||||
|
builder.addColumnFamily(colBuilder.build());
|
||||||
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static TableName getTableName(Configuration conf) {
|
public static TableName getTableName(Configuration conf) {
|
||||||
|
@ -1343,6 +1364,38 @@ public final class BackupSystemTable implements Closeable {
|
||||||
return "snapshot_" + getTableNameAsString(conf).replace(":", "_");
|
return "snapshot_" + getTableNameAsString(conf).replace(":", "_");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get backup system table descriptor
|
||||||
|
* @return table's descriptor
|
||||||
|
*/
|
||||||
|
public static TableDescriptor getSystemTableForBulkLoadedDataDescriptor(Configuration conf) {
|
||||||
|
|
||||||
|
TableDescriptorBuilder builder =
|
||||||
|
TableDescriptorBuilder.newBuilder(getTableNameForBulkLoadedData(conf));
|
||||||
|
|
||||||
|
ColumnFamilyDescriptorBuilder colBuilder =
|
||||||
|
ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY);
|
||||||
|
colBuilder.setMaxVersions(1);
|
||||||
|
Configuration config = HBaseConfiguration.create();
|
||||||
|
int ttl =
|
||||||
|
config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY,
|
||||||
|
BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
|
||||||
|
colBuilder.setTimeToLive(ttl);
|
||||||
|
ColumnFamilyDescriptor colSessionsDesc = colBuilder.build();
|
||||||
|
builder.addColumnFamily(colSessionsDesc);
|
||||||
|
colBuilder =
|
||||||
|
ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY);
|
||||||
|
colBuilder.setTimeToLive(ttl);
|
||||||
|
builder.addColumnFamily(colBuilder.build());
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static TableName getTableNameForBulkLoadedData(Configuration conf) {
|
||||||
|
String name =
|
||||||
|
conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY,
|
||||||
|
BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT) + "_bulk";
|
||||||
|
return TableName.valueOf(name);
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* Creates Put operation for a given backup info object
|
* Creates Put operation for a given backup info object
|
||||||
* @param context backup info
|
* @param context backup info
|
||||||
|
|
|
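A small, hedged illustration of the naming scheme introduced above: both system tables derive from the same configuration key, with the bulk-load table getting a "_bulk" suffix. The accessor names come from this patch; the concrete default name mentioned in the comment is an assumption, not something this hunk states.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;

// Illustration only: with default settings this is expected to print something like
// "backup:system / backup:system_bulk" (the default base name is assumed here).
public class ShowBackupTableNames {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    TableName main = BackupSystemTable.getTableName(conf);
    TableName bulk = BackupSystemTable.getTableNameForBulkLoadedData(conf);
    System.out.println(main + " / " + bulk);
  }
}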
@ -72,7 +72,6 @@ public class IncrementalTableBackupClient extends TableBackupClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected List<String> filterMissingFiles(List<String> incrBackupFileList) throws IOException {
|
protected List<String> filterMissingFiles(List<String> incrBackupFileList) throws IOException {
|
||||||
FileSystem fs = FileSystem.get(conf);
|
|
||||||
List<String> list = new ArrayList<String>();
|
List<String> list = new ArrayList<String>();
|
||||||
for (String file : incrBackupFileList) {
|
for (String file : incrBackupFileList) {
|
||||||
Path p = new Path(file);
|
Path p = new Path(file);
|
||||||
|
@@ -110,6 +109,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
    * @param sTableList list of tables to be backed up
    * @return map of table to List of files
    */
+  @SuppressWarnings("unchecked")
   protected Map<byte[], List<Path>>[] handleBulkLoad(List<TableName> sTableList) throws IOException {
     Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList.size()];
     List<String> activeFiles = new ArrayList<String>();
@@ -117,7 +117,6 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> pair =
         backupManager.readBulkloadRows(sTableList);
     Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = pair.getFirst();
-    FileSystem fs = FileSystem.get(conf);
     FileSystem tgtFs;
     try {
       tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
@@ -126,6 +125,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     }
     Path rootdir = FSUtils.getRootDir(conf);
     Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);
+
     for (Map.Entry<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> tblEntry :
         map.entrySet()) {
       TableName srcTable = tblEntry.getKey();
@ -192,26 +192,47 @@ public class IncrementalTableBackupClient extends TableBackupClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
copyBulkLoadedFiles(activeFiles, archiveFiles);
|
copyBulkLoadedFiles(activeFiles, archiveFiles);
|
||||||
|
backupManager.deleteBulkLoadedRows(pair.getSecond());
|
||||||
backupManager.writeBulkLoadedFiles(sTableList, mapForSrc);
|
|
||||||
backupManager.removeBulkLoadedRows(sTableList, pair.getSecond());
|
|
||||||
return mapForSrc;
|
return mapForSrc;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void copyBulkLoadedFiles(List<String> activeFiles, List<String> archiveFiles)
|
private void copyBulkLoadedFiles(List<String> activeFiles, List<String> archiveFiles)
|
||||||
throws IOException
|
throws IOException {
|
||||||
{
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Enable special mode of BackupDistCp
|
// Enable special mode of BackupDistCp
|
||||||
conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 5);
|
conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 5);
|
||||||
// Copy active files
|
// Copy active files
|
||||||
String tgtDest = backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId();
|
String tgtDest = backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId();
|
||||||
if (activeFiles.size() > 0) {
|
int attempt = 1;
|
||||||
|
while (activeFiles.size() > 0) {
|
||||||
|
LOG.info("Copy "+ activeFiles.size() +
|
||||||
|
" active bulk loaded files. Attempt ="+ (attempt++));
|
||||||
String[] toCopy = new String[activeFiles.size()];
|
String[] toCopy = new String[activeFiles.size()];
|
||||||
activeFiles.toArray(toCopy);
|
activeFiles.toArray(toCopy);
|
||||||
incrementalCopyHFiles(toCopy, tgtDest);
|
// Active file can be archived during copy operation,
|
||||||
|
// we need to handle this properly
|
||||||
|
try {
|
||||||
|
incrementalCopyHFiles(toCopy, tgtDest);
|
||||||
|
break;
|
||||||
|
} catch (IOException e) {
|
||||||
|
// Check if some files got archived
|
||||||
|
// Update active and archived lists
|
||||||
|
// When file is being moved from active to archive
|
||||||
|
// directory, the number of active files decreases
|
||||||
|
|
||||||
|
int numOfActive = activeFiles.size();
|
||||||
|
updateFileLists(activeFiles, archiveFiles);
|
||||||
|
if (activeFiles.size() < numOfActive) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// if not - throw exception
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
// If incremental copy will fail for archived files
|
||||||
|
// we will have partially loaded files in backup destination (only files from active data
|
||||||
|
// directory). It is OK, because the backup will marked as FAILED and data will be cleaned up
|
||||||
if (archiveFiles.size() > 0) {
|
if (archiveFiles.size() > 0) {
|
||||||
String[] toCopy = new String[archiveFiles.size()];
|
String[] toCopy = new String[archiveFiles.size()];
|
||||||
archiveFiles.toArray(toCopy);
|
archiveFiles.toArray(toCopy);
|
||||||
|
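The retry loop above tolerates HFiles being moved from the active data directory to the archive while the copy is running. A stand-alone sketch of the same pattern follows, with the copy step abstracted behind a hypothetical interface; only the FileSystem calls are real Hadoop API, everything else is illustrative.

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ArchiveAwareCopySketch {
  /** A single copy attempt over the given source paths (e.g. one DistCp run). */
  public interface CopyAttempt {
    void run(List<String> files) throws IOException;
  }

  /**
   * Retry the copy while files keep disappearing from the active set: a source
   * HFile may be archived concurrently, which fails the copy but is not a real
   * error as long as the active set keeps shrinking between attempts.
   */
  public static void copyWithRetry(FileSystem fs, List<String> activeFiles,
      List<String> archiveFiles, CopyAttempt copy) throws IOException {
    while (!activeFiles.isEmpty()) {
      try {
        copy.run(activeFiles);
        return; // all remaining active files copied
      } catch (IOException e) {
        int before = activeFiles.size();
        for (Iterator<String> it = activeFiles.iterator(); it.hasNext();) {
          String p = it.next();
          if (!fs.exists(new Path(p))) { // vanished from the active dir: archived
            it.remove();
            archiveFiles.add(p);
          }
        }
        if (activeFiles.size() == before) {
          throw e; // nothing was archived, so this is a genuine failure
        }
      }
    }
  }
}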
@ -224,6 +245,25 @@ public class IncrementalTableBackupClient extends TableBackupClient {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void updateFileLists(List<String> activeFiles, List<String> archiveFiles)
|
||||||
|
throws IOException {
|
||||||
|
List<String> newlyArchived = new ArrayList<String>();
|
||||||
|
|
||||||
|
for (String spath : activeFiles) {
|
||||||
|
if (!fs.exists(new Path(spath))) {
|
||||||
|
newlyArchived.add(spath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (newlyArchived.size() > 0) {
|
||||||
|
activeFiles.removeAll(newlyArchived);
|
||||||
|
archiveFiles.addAll(newlyArchived);
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.debug(newlyArchived.size() + " files have been archived.");
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void execute() throws IOException {
|
public void execute() throws IOException {
|
||||||
|
|
||||||
|
@ -322,7 +362,6 @@ public class IncrementalTableBackupClient extends TableBackupClient {
|
||||||
protected void deleteBulkLoadDirectory() throws IOException {
|
protected void deleteBulkLoadDirectory() throws IOException {
|
||||||
// delete original bulk load directory on method exit
|
// delete original bulk load directory on method exit
|
||||||
Path path = getBulkOutputDir();
|
Path path = getBulkOutputDir();
|
||||||
FileSystem fs = FileSystem.get(conf);
|
|
||||||
boolean result = fs.delete(path, true);
|
boolean result = fs.delete(path, true);
|
||||||
if (!result) {
|
if (!result) {
|
||||||
LOG.warn("Could not delete " + path);
|
LOG.warn("Could not delete " + path);
|
||||||
|
|
|
@ -21,33 +21,31 @@ package org.apache.hadoop.hbase.backup.impl;
|
||||||
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;
|
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.RemoteIterator;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.backup.BackupType;
|
import org.apache.hadoop.hbase.backup.BackupType;
|
||||||
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
|
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
|
||||||
import org.apache.hadoop.hbase.backup.RestoreRequest;
|
import org.apache.hadoop.hbase.backup.RestoreRequest;
|
||||||
import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
|
import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
|
||||||
import org.apache.hadoop.hbase.backup.util.BackupUtils;
|
|
||||||
import org.apache.hadoop.hbase.backup.util.RestoreTool;
|
import org.apache.hadoop.hbase.backup.util.RestoreTool;
|
||||||
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
|
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.hbase.client.Admin;
|
|
||||||
import org.apache.hadoop.hbase.client.Connection;
|
|
||||||
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
|
|
||||||
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Restore table implementation
|
* Restore table implementation
|
||||||
|
@ -171,8 +169,10 @@ public class RestoreTablesClient {
|
||||||
for (int i = 1; i < images.length; i++) {
|
for (int i = 1; i < images.length; i++) {
|
||||||
BackupImage im = images[i];
|
BackupImage im = images[i];
|
||||||
String fileBackupDir =
|
String fileBackupDir =
|
||||||
HBackupFileSystem.getTableBackupDataDir(im.getRootDir(), im.getBackupId(), sTable);
|
HBackupFileSystem.getTableBackupDir(im.getRootDir(), im.getBackupId(), sTable);
|
||||||
dirList.add(new Path(fileBackupDir));
|
List<Path> list = getFilesRecursively(fileBackupDir);
|
||||||
|
dirList.addAll(list);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
String dirs = StringUtils.join(dirList, ",");
|
String dirs = StringUtils.join(dirList, ",");
|
||||||
|
@ -185,6 +185,20 @@ public class RestoreTablesClient {
|
||||||
LOG.info(sTable + " has been successfully restored to " + tTable);
|
LOG.info(sTable + " has been successfully restored to " + tTable);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private List<Path> getFilesRecursively(String fileBackupDir)
|
||||||
|
throws IllegalArgumentException, IOException {
|
||||||
|
FileSystem fs = FileSystem.get((new Path(fileBackupDir)).toUri(), new Configuration());
|
||||||
|
List<Path> list = new ArrayList<Path>();
|
||||||
|
RemoteIterator<LocatedFileStatus> it = fs.listFiles(new Path(fileBackupDir), true);
|
||||||
|
while (it.hasNext()) {
|
||||||
|
Path p = it.next().getPath();
|
||||||
|
if (HFile.isHFileFormat(fs, p)) {
|
||||||
|
list.add(p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return list;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Restore operation. Stage 2: resolved Backup Image dependency
|
* Restore operation. Stage 2: resolved Backup Image dependency
|
||||||
* @param backupManifestMap : tableName, Manifest
|
* @param backupManifestMap : tableName, Manifest
|
||||||
|
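getFilesRecursively feeds individual HFile paths, rather than whole image directories, to the restore job, which is what lets rows bulk loaded before an earlier incremental backup come back. Below is a self-contained sketch of the same walk; the listFiles/HFile.isHFileFormat calls are the ones used above, while the class and method names here are illustrative.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.io.hfile.HFile;

// Sketch: collect every HFile under a backup image directory, skipping manifests
// and other non-HFile artifacts that also live under the image root.
public class HFileCollectorSketch {
  public static List<Path> collectHFiles(String dir, Configuration conf) throws IOException {
    Path root = new Path(dir);
    FileSystem fs = root.getFileSystem(conf);
    List<Path> hfiles = new ArrayList<>();
    RemoteIterator<LocatedFileStatus> it = fs.listFiles(root, true); // recursive walk
    while (it.hasNext()) {
      Path p = it.next().getPath();
      if (HFile.isHFileFormat(fs, p)) {
        hfiles.add(p);
      }
    }
    return hfiles;
  }
}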
@ -226,27 +240,6 @@ public class RestoreTablesClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
try (BackupSystemTable table = new BackupSystemTable(conn)) {
|
|
||||||
List<TableName> sTableList = Arrays.asList(sTableArray);
|
|
||||||
for (String id : backupIdSet) {
|
|
||||||
LOG.debug("restoring bulk load for " + id);
|
|
||||||
Map<byte[], List<Path>>[] mapForSrc = table.readBulkLoadedFiles(id, sTableList);
|
|
||||||
Map<LoadQueueItem, ByteBuffer> loaderResult;
|
|
||||||
conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
|
|
||||||
LoadIncrementalHFiles loader = BackupUtils.createLoader(conf);
|
|
||||||
for (int i = 0; i < sTableList.size(); i++) {
|
|
||||||
if (mapForSrc[i] != null && !mapForSrc[i].isEmpty()) {
|
|
||||||
loaderResult = loader.run(mapForSrc[i], tTableArray[i]);
|
|
||||||
LOG.debug("bulk loading " + sTableList.get(i) + " to " + tTableArray[i]);
|
|
||||||
if (loaderResult.isEmpty()) {
|
|
||||||
String msg = "Couldn't bulk load for " + sTableList.get(i) + " to " + tTableArray[i];
|
|
||||||
LOG.error(msg);
|
|
||||||
throw new IOException(msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
LOG.debug("restoreStage finished");
|
LOG.debug("restoreStage finished");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -69,6 +69,7 @@ public abstract class TableBackupClient {
|
||||||
|
|
||||||
protected BackupManager backupManager;
|
protected BackupManager backupManager;
|
||||||
protected BackupInfo backupInfo;
|
protected BackupInfo backupInfo;
|
||||||
|
protected FileSystem fs;
|
||||||
|
|
||||||
public TableBackupClient() {
|
public TableBackupClient() {
|
||||||
}
|
}
|
||||||
@@ -90,6 +91,7 @@ public abstract class TableBackupClient {
     this.tableList = request.getTableList();
     this.conn = conn;
     this.conf = conn.getConfiguration();
+    this.fs = FSUtils.getCurrentFileSystem(conf);
     backupInfo =
         backupManager.createBackupInfo(backupId, request.getBackupType(), tableList,
           request.getTargetRootDir(), request.getTotalTasks(), request.getBandwidth());
@ -258,22 +260,21 @@ public abstract class TableBackupClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void cleanupAndRestoreBackupSystem (Connection conn, BackupInfo backupInfo,
|
public static void cleanupAndRestoreBackupSystem(Connection conn, BackupInfo backupInfo,
|
||||||
Configuration conf) throws IOException
|
Configuration conf) throws IOException {
|
||||||
{
|
|
||||||
BackupType type = backupInfo.getType();
|
BackupType type = backupInfo.getType();
|
||||||
// if full backup, then delete HBase snapshots if there already are snapshots taken
|
// if full backup, then delete HBase snapshots if there already are snapshots taken
|
||||||
// and also clean up export snapshot log files if exist
|
// and also clean up export snapshot log files if exist
|
||||||
if (type == BackupType.FULL) {
|
if (type == BackupType.FULL) {
|
||||||
deleteSnapshots(conn, backupInfo, conf);
|
deleteSnapshots(conn, backupInfo, conf);
|
||||||
cleanupExportSnapshotLog(conf);
|
cleanupExportSnapshotLog(conf);
|
||||||
}
|
}
|
||||||
BackupSystemTable.restoreFromSnapshot(conn);
|
BackupSystemTable.restoreFromSnapshot(conn);
|
||||||
BackupSystemTable.deleteSnapshot(conn);
|
BackupSystemTable.deleteSnapshot(conn);
|
||||||
// clean up the uncompleted data at target directory if the ongoing backup has already entered
|
// clean up the uncompleted data at target directory if the ongoing backup has already entered
|
||||||
// the copy phase
|
// the copy phase
|
||||||
// For incremental backup, DistCp logs will be cleaned with the targetDir.
|
// For incremental backup, DistCp logs will be cleaned with the targetDir.
|
||||||
cleanupTargetDir(backupInfo, conf);
|
cleanupTargetDir(backupInfo, conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -355,7 +356,6 @@ public abstract class TableBackupClient {
|
||||||
*/
|
*/
|
||||||
protected void cleanupDistCpLog(BackupInfo backupInfo, Configuration conf) throws IOException {
|
protected void cleanupDistCpLog(BackupInfo backupInfo, Configuration conf) throws IOException {
|
||||||
Path rootPath = new Path(backupInfo.getHLogTargetDir()).getParent();
|
Path rootPath = new Path(backupInfo.getHLogTargetDir()).getParent();
|
||||||
FileSystem fs = FileSystem.get(rootPath.toUri(), conf);
|
|
||||||
FileStatus[] files = FSUtils.listStatus(fs, rootPath);
|
FileStatus[] files = FSUtils.listStatus(fs, rootPath);
|
||||||
if (files == null) {
|
if (files == null) {
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -57,7 +57,6 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||||
import org.apache.hadoop.hbase.client.HTable;
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
|
||||||
import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
|
import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
|
||||||
import org.apache.hadoop.hbase.security.UserProvider;
|
import org.apache.hadoop.hbase.security.UserProvider;
|
||||||
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
|
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
|
||||||
|
@ -297,9 +296,6 @@ public class TestBackupBase {
|
||||||
// setup configuration
|
// setup configuration
|
||||||
SecureTestUtil.enableSecurity(TEST_UTIL.getConfiguration());
|
SecureTestUtil.enableSecurity(TEST_UTIL.getConfiguration());
|
||||||
}
|
}
|
||||||
String coproc = conf1.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
|
|
||||||
conf1.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, (coproc == null ? "" : coproc + ",") +
|
|
||||||
BackupObserver.class.getName());
|
|
||||||
conf1.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
|
conf1.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
|
||||||
BackupManager.decorateMasterConfiguration(conf1);
|
BackupManager.decorateMasterConfiguration(conf1);
|
||||||
BackupManager.decorateRegionServerConfiguration(conf1);
|
BackupManager.decorateRegionServerConfiguration(conf1);
|
||||||
|
|
|
@ -113,18 +113,32 @@ public class TestIncrementalBackupWithBulkLoad extends TestBackupBase {
|
||||||
request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
|
request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
|
||||||
String backupIdIncMultiple = client.backupTables(request);
|
String backupIdIncMultiple = client.backupTables(request);
|
||||||
assertTrue(checkSucceeded(backupIdIncMultiple));
|
assertTrue(checkSucceeded(backupIdIncMultiple));
|
||||||
|
// #4 bulk load again
|
||||||
|
LOG.debug("bulk loading into " + testName);
|
||||||
|
int actual1 = TestLoadIncrementalHFiles.loadHFiles(testName, table1Desc, TEST_UTIL, famName,
|
||||||
|
qualName, false, null,
|
||||||
|
new byte[][][] { new byte[][] { Bytes.toBytes("ppp"), Bytes.toBytes("qqq") },
|
||||||
|
new byte[][] { Bytes.toBytes("rrr"), Bytes.toBytes("sss") }, },
|
||||||
|
true, false, true, NB_ROWS_IN_BATCH * 2 + actual, NB_ROWS2);
|
||||||
|
|
||||||
|
// #5 - incremental backup for table1
|
||||||
|
tables = Lists.newArrayList(table1);
|
||||||
|
request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
|
||||||
|
String backupIdIncMultiple1 = client.backupTables(request);
|
||||||
|
assertTrue(checkSucceeded(backupIdIncMultiple1));
|
||||||
|
// Delete all data in table1
|
||||||
|
TEST_UTIL.deleteTableData(table1);
|
||||||
// #5.1 - check tables for full restore */
|
// #5.1 - check tables for full restore */
|
||||||
HBaseAdmin hAdmin = TEST_UTIL.getHBaseAdmin();
|
HBaseAdmin hAdmin = TEST_UTIL.getHBaseAdmin();
|
||||||
|
|
||||||
// #6 - restore incremental backup for table1
|
// #6 - restore incremental backup for table1
|
||||||
TableName[] tablesRestoreIncMultiple = new TableName[] { table1 };
|
TableName[] tablesRestoreIncMultiple = new TableName[] { table1 };
|
||||||
TableName[] tablesMapIncMultiple = new TableName[] { table1_restore };
|
//TableName[] tablesMapIncMultiple = new TableName[] { table1_restore };
|
||||||
client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple,
|
client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple1,
|
||||||
false, tablesRestoreIncMultiple, tablesMapIncMultiple, true));
|
false, tablesRestoreIncMultiple, tablesRestoreIncMultiple, true));
|
||||||
|
|
||||||
HTable hTable = (HTable) conn.getTable(table1_restore);
|
HTable hTable = (HTable) conn.getTable(table1);
|
||||||
Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH * 2+actual);
|
Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH * 2 + actual + actual1);
|
||||||
request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR);
|
request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR);
|
||||||
|
|
||||||
backupIdFull = client.backupTables(request);
|
backupIdFull = client.backupTables(request);
|
||||||
|
|