HBASE-19469 Review Of BackupSystemTable (BELUGA BEHR)

This commit is contained in:
tedyu 2017-12-09 09:38:00 -08:00
parent c98bab51de
commit 82e278be55
2 changed files with 90 additions and 160 deletions

View File

@ -21,6 +21,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@ -28,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.TreeMap;
import java.util.TreeSet;
@ -62,6 +64,7 @@ import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.util.ArrayUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
@ -530,9 +533,8 @@ public final class BackupSystemTable implements Closeable {
* @throws IOException exception
*/
public String readBackupStartCode(String backupRoot) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("read backup start code from backup system table");
}
LOG.trace("read backup start code from backup system table");
try (Table table = connection.getTable(tableName)) {
Get get = createGetForStartCode(backupRoot);
Result res = table.get(get);
@ -570,9 +572,8 @@ public final class BackupSystemTable implements Closeable {
* @throws IOException
*/
public void startBackupExclusiveOperation() throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Start new backup exclusive operation");
}
LOG.debug("Start new backup exclusive operation");
try (Table table = connection.getTable(tableName)) {
Put put = createPutForStartBackupSession();
// First try to put if row does not exist
@ -593,9 +594,8 @@ public final class BackupSystemTable implements Closeable {
}
public void finishBackupExclusiveOperation() throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Finish backup exclusive operation");
}
LOG.debug("Finish backup exclusive operation");
try (Table table = connection.getTable(tableName)) {
Put put = createPutForStopBackupSession();
if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL,
@ -619,9 +619,7 @@ public final class BackupSystemTable implements Closeable {
*/
public HashMap<String, Long> readRegionServerLastLogRollResult(String backupRoot)
throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("read region server last roll log result to backup system table");
}
LOG.trace("read region server last roll log result to backup system table");
Scan scan = createScanForReadRegionServerLastLogRollResult(backupRoot);
@ -650,9 +648,8 @@ public final class BackupSystemTable implements Closeable {
*/
public void writeRegionServerLastLogRollResult(String server, Long ts, String backupRoot)
throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("write region server last roll log result to backup system table");
}
LOG.trace("write region server last roll log result to backup system table");
try (Table table = connection.getTable(tableName)) {
Put put = createPutForRegionServerLastLogRollResult(server, ts, backupRoot);
table.put(put);
@ -666,12 +663,10 @@ public final class BackupSystemTable implements Closeable {
* @throws IOException exception
*/
public ArrayList<BackupInfo> getBackupHistory(boolean onlyCompleted) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("get backup history from backup system table");
}
ArrayList<BackupInfo> list;
LOG.trace("get backup history from backup system table");
BackupState state = onlyCompleted ? BackupState.COMPLETE : BackupState.ANY;
list = getBackupInfos(state);
ArrayList<BackupInfo> list = getBackupInfos(state);
return BackupUtils.sortHistoryListDesc(list);
}
@ -692,15 +687,11 @@ public final class BackupSystemTable implements Closeable {
* @throws IOException
*/
public List<BackupInfo> getHistory(int n) throws IOException {
List<BackupInfo> history = getBackupHistory();
if (n == -1 || history.size() <= n) return history;
List<BackupInfo> list = new ArrayList<BackupInfo>();
for (int i = 0; i < n; i++) {
list.add(history.get(i));
if (n == -1 || history.size() <= n) {
return history;
}
return list;
return Collections.unmodifiableList(history.subList(0, n));
}
/**
@ -742,10 +733,11 @@ public final class BackupSystemTable implements Closeable {
Set<TableName> names = new HashSet<>();
List<BackupInfo> infos = getBackupHistory(true);
for (BackupInfo info : infos) {
if (info.getType() != type) continue;
names.addAll(info.getTableNames());
if (info.getType() == type) {
names.addAll(info.getTableNames());
}
}
return new ArrayList(names);
return new ArrayList<>(names);
}
/**
@ -815,9 +807,7 @@ public final class BackupSystemTable implements Closeable {
* @throws IOException exception
*/
public ArrayList<BackupInfo> getBackupInfos(BackupState state) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("get backup infos from backup system table");
}
LOG.trace("get backup infos from backup system table");
Scan scan = createScanForBackupHistory();
ArrayList<BackupInfo> list = new ArrayList<BackupInfo>();
@ -946,9 +936,8 @@ public final class BackupSystemTable implements Closeable {
* @throws IOException exception
*/
public Set<TableName> getIncrementalBackupTableSet(String backupRoot) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("get incremental backup table set from backup system table");
}
LOG.trace("get incremental backup table set from backup system table");
TreeSet<TableName> set = new TreeSet<>();
try (Table table = connection.getTable(tableName)) {
@ -977,9 +966,9 @@ public final class BackupSystemTable implements Closeable {
if (LOG.isTraceEnabled()) {
LOG.trace("Add incremental backup table set to backup system table. ROOT=" + backupRoot
+ " tables [" + StringUtils.join(tables, " ") + "]");
for (TableName table : tables) {
LOG.debug(table);
}
}
if (LOG.isDebugEnabled()) {
tables.forEach(table -> LOG.debug(table));
}
try (Table table = connection.getTable(tableName)) {
Put put = createPutForIncrBackupTableSet(tables, backupRoot);
@ -1014,9 +1003,9 @@ public final class BackupSystemTable implements Closeable {
if (LOG.isTraceEnabled()) {
LOG.trace("add WAL files to backup system table: " + backupId + " " + backupRoot + " files ["
+ StringUtils.join(files, ",") + "]");
for (String f : files) {
LOG.debug("add :" + f);
}
}
if (LOG.isDebugEnabled()) {
files.forEach(file -> LOG.debug("add :" + file));
}
try (Table table = connection.getTable(tableName)) {
List<Put> puts = createPutsForAddWALFiles(files, backupId, backupRoot);
@ -1030,9 +1019,8 @@ public final class BackupSystemTable implements Closeable {
* @throws IOException exception
*/
public Iterator<WALItem> getWALFilesIterator(String backupRoot) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("get WAL files from backup system table");
}
LOG.trace("get WAL files from backup system table");
final Table table = connection.getTable(tableName);
Scan scan = createScanForGetWALs(backupRoot);
final ResultScanner scanner = table.getScanner(scan);
@ -1096,10 +1084,7 @@ public final class BackupSystemTable implements Closeable {
try (Table table = connection.getTable(tableName)) {
Get get = createGetForCheckWALFile(file);
Result res = table.get(get);
if (res.isEmpty()) {
return false;
}
return true;
return (!res.isEmpty());
}
}
@ -1110,9 +1095,8 @@ public final class BackupSystemTable implements Closeable {
* @throws IOException exception
*/
public boolean hasBackupSessions() throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Has backup sessions from backup system table");
}
LOG.trace("Has backup sessions from backup system table");
boolean result = false;
Scan scan = createScanForBackupHistory();
scan.setCaching(1);
@ -1135,9 +1119,8 @@ public final class BackupSystemTable implements Closeable {
* @throws IOException
*/
public List<String> listBackupSets() throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace(" Backup set list");
}
LOG.trace("Backup set list");
List<String> list = new ArrayList<String>();
Table table = null;
ResultScanner scanner = null;
@ -1180,7 +1163,8 @@ public final class BackupSystemTable implements Closeable {
if (res.isEmpty()) return null;
res.advance();
String[] tables = cellValueToBackupSet(res.current());
return toList(tables);
return Arrays.asList(tables).stream().map(item -> TableName.valueOf(item)).
collect(Collectors.toList());
} finally {
if (table != null) {
table.close();
@ -1188,14 +1172,6 @@ public final class BackupSystemTable implements Closeable {
}
}
private List<TableName> toList(String[] tables) {
List<TableName> list = new ArrayList<TableName>(tables.length);
for (String name : tables) {
list.add(TableName.valueOf(name));
}
return list;
}
/**
* Add backup set (list of tables)
* @param name set name
@ -1206,10 +1182,8 @@ public final class BackupSystemTable implements Closeable {
if (LOG.isTraceEnabled()) {
LOG.trace("Backup set add: " + name + " tables [" + StringUtils.join(newTables, " ") + "]");
}
Table table = null;
String[] union = null;
try {
table = connection.getTable(tableName);
try (Table table = connection.getTable(tableName)) {
Get get = createGetForBackupSet(name);
Result res = table.get(get);
if (res.isEmpty()) {
@ -1221,28 +1195,9 @@ public final class BackupSystemTable implements Closeable {
}
Put put = createPutForBackupSet(name, union);
table.put(put);
} finally {
if (table != null) {
table.close();
}
}
}
private String[] merge(String[] tables, String[] newTables) {
List<String> list = new ArrayList<String>();
// Add all from tables
for (String t : tables) {
list.add(t);
}
for (String nt : newTables) {
if (list.contains(nt)) continue;
list.add(nt);
}
String[] arr = new String[list.size()];
list.toArray(arr);
return arr;
}
/**
* Remove tables from backup set (list of tables)
* @param name set name
@ -1254,11 +1209,9 @@ public final class BackupSystemTable implements Closeable {
LOG.trace(" Backup set remove from : " + name + " tables [" + StringUtils.join(toRemove, " ")
+ "]");
}
Table table = null;
String[] disjoint = null;
String[] tables = null;
try {
table = connection.getTable(tableName);
try (Table table = connection.getTable(tableName)) {
Get get = createGetForBackupSet(name);
Result res = table.get(get);
if (res.isEmpty()) {
@ -1280,27 +1233,19 @@ public final class BackupSystemTable implements Closeable {
LOG.info("Backup set '" + name + "' is empty. Deleting.");
deleteBackupSet(name);
}
} finally {
if (table != null) {
table.close();
}
}
}
private String[] disjoin(String[] tables, String[] toRemove) {
List<String> list = new ArrayList<String>();
// Add all from tables
for (String t : tables) {
list.add(t);
}
for (String nt : toRemove) {
if (list.contains(nt)) {
list.remove(nt);
}
}
String[] arr = new String[list.size()];
list.toArray(arr);
return arr;
private String[] merge(String[] existingTables, String[] newTables) {
Set<String> tables = new HashSet<>(Arrays.asList(existingTables));
tables.addAll(Arrays.asList(newTables));
return tables.toArray(new String[0]);
}
private String[] disjoin(String[] existingTables, String[] toRemove) {
Set<String> tables = new HashSet<>(Arrays.asList(existingTables));
Arrays.asList(toRemove).forEach(table -> tables.remove(table));
return tables.toArray(new String[0]);
}
/**
@ -1312,15 +1257,9 @@ public final class BackupSystemTable implements Closeable {
if (LOG.isTraceEnabled()) {
LOG.trace(" Backup set delete: " + name);
}
Table table = null;
try {
table = connection.getTable(tableName);
try (Table table = connection.getTable(tableName)) {
Delete del = createDeleteForBackupSet(name);
table.delete(del);
} finally {
if (table != null) {
table.close();
}
}
}
@ -1606,7 +1545,6 @@ public final class BackupSystemTable implements Closeable {
}
public static void snapshot(Connection conn) throws IOException {
try (Admin admin = conn.getAdmin();) {
Configuration conf = conn.getConfiguration();
admin.snapshot(BackupSystemTable.getSnapshotName(conf), BackupSystemTable.getTableName(conf));
@ -1614,7 +1552,6 @@ public final class BackupSystemTable implements Closeable {
}
public static void restoreFromSnapshot(Connection conn) throws IOException {
Configuration conf = conn.getConfiguration();
LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + " from snapshot");
try (Admin admin = conn.getAdmin();) {
@ -1635,7 +1572,6 @@ public final class BackupSystemTable implements Closeable {
}
protected static boolean snapshotExists(Admin admin, String snapshotName) throws IOException {
List<SnapshotDescription> list = admin.listSnapshots();
for (SnapshotDescription desc : list) {
if (desc.getName().equals(snapshotName)) {
@ -1650,7 +1586,6 @@ public final class BackupSystemTable implements Closeable {
}
public static void deleteSnapshot(Connection conn) throws IOException {
Configuration conf = conn.getConfiguration();
LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + " from the system");
try (Admin admin = conn.getAdmin();) {
@ -1669,7 +1604,7 @@ public final class BackupSystemTable implements Closeable {
*/
static List<Put> createPutForPreparedBulkload(TableName table, byte[] region,
final byte[] family, final List<Pair<Path, Path>> pairs) {
List<Put> puts = new ArrayList<>();
List<Put> puts = new ArrayList<>(pairs.size());
for (Pair<Path, Path> pair : pairs) {
Path path = pair.getSecond();
String file = path.toString();
@ -1689,7 +1624,7 @@ public final class BackupSystemTable implements Closeable {
}
public static List<Delete> createDeleteForOrigBulkLoad(List<TableName> lst) {
List<Delete> lstDels = new ArrayList<>();
List<Delete> lstDels = new ArrayList<>(lst.size());
for (TableName table : lst) {
Delete del = new Delete(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM));
del.addFamily(BackupSystemTable.META_FAMILY);
@ -1699,7 +1634,6 @@ public final class BackupSystemTable implements Closeable {
}
private Put createPutForDeleteOperation(String[] backupIdList) {
byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ","));
Put put = new Put(DELETE_OP_ROW);
put.addColumn(META_FAMILY, FAM_COL, value);
@ -1707,14 +1641,12 @@ public final class BackupSystemTable implements Closeable {
}
private Delete createDeleteForBackupDeleteOperation() {
Delete delete = new Delete(DELETE_OP_ROW);
delete.addFamily(META_FAMILY);
return delete;
}
private Get createGetForDeleteOperation() {
Get get = new Get(DELETE_OP_ROW);
get.addFamily(META_FAMILY);
return get;
@ -1731,9 +1663,8 @@ public final class BackupSystemTable implements Closeable {
}
public void finishDeleteOperation() throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Finsih delete operation for backup ids ");
}
LOG.trace("Finsih delete operation for backup ids");
Delete delete = createDeleteForBackupDeleteOperation();
try (Table table = connection.getTable(tableName)) {
table.delete(delete);
@ -1741,9 +1672,8 @@ public final class BackupSystemTable implements Closeable {
}
public String[] getListOfBackupIdsFromDeleteOperation() throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Get delete operation for backup ids ");
}
LOG.trace("Get delete operation for backup ids");
Get get = createGetForDeleteOperation();
try (Table table = connection.getTable(tableName)) {
Result res = table.get(get);
@ -1760,7 +1690,6 @@ public final class BackupSystemTable implements Closeable {
}
private Put createPutForMergeOperation(String[] backupIdList) {
byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ","));
Put put = new Put(MERGE_OP_ROW);
put.addColumn(META_FAMILY, FAM_COL, value);
@ -1771,15 +1700,11 @@ public final class BackupSystemTable implements Closeable {
Get get = new Get(MERGE_OP_ROW);
try (Table table = connection.getTable(tableName)) {
Result res = table.get(get);
if (res.isEmpty()) {
return false;
}
return true;
return (!res.isEmpty());
}
}
private Put createPutForUpdateTablesForMerge(List<TableName> tables) {
byte[] value = Bytes.toBytes(StringUtils.join(tables, ","));
Put put = new Put(MERGE_OP_ROW);
put.addColumn(META_FAMILY, PATH_COL, value);
@ -1787,14 +1712,12 @@ public final class BackupSystemTable implements Closeable {
}
private Delete createDeleteForBackupMergeOperation() {
Delete delete = new Delete(MERGE_OP_ROW);
delete.addFamily(META_FAMILY);
return delete;
}
private Get createGetForMergeOperation() {
Get get = new Get(MERGE_OP_ROW);
get.addFamily(META_FAMILY);
return get;
@ -1821,9 +1744,8 @@ public final class BackupSystemTable implements Closeable {
}
public void finishMergeOperation() throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Finsih merge operation for backup ids ");
}
LOG.trace("Finish merge operation for backup ids");
Delete delete = createDeleteForBackupMergeOperation();
try (Table table = connection.getTable(tableName)) {
table.delete(delete);
@ -1831,9 +1753,8 @@ public final class BackupSystemTable implements Closeable {
}
public String[] getListOfBackupIdsFromMergeOperation() throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Get backup ids for merge operation");
}
LOG.trace("Get backup ids for merge operation");
Get get = createGetForMergeOperation();
try (Table table = connection.getTable(tableName)) {
Result res = table.get(get);
@ -1917,8 +1838,7 @@ public final class BackupSystemTable implements Closeable {
private List<Put>
createPutsForAddWALFiles(List<String> files, String backupId, String backupRoot)
throws IOException {
List<Put> puts = new ArrayList<Put>();
List<Put> puts = new ArrayList<Put>(files.size());
for (String file : files) {
Put put = new Put(rowkey(WALS_PREFIX, BackupUtils.getUniqueWALFileNamePart(file)));
put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("backupId"),
@ -2021,11 +1941,10 @@ public final class BackupSystemTable implements Closeable {
*/
private String[] cellValueToBackupSet(Cell current) throws IOException {
byte[] data = CellUtil.cloneValue(current);
if (data != null && data.length > 0) {
if (!ArrayUtils.isEmpty(data)) {
return Bytes.toString(data).split(",");
} else {
return new String[0];
}
return new String[0];
}
/**

View File

@ -29,6 +29,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@ -369,11 +370,14 @@ public class TestBackupSystemTable {
String[] addTables = new String[] { "table4", "table5", "table6" };
table.addToBackupSet(setName, addTables);
Set<String> expectedTables = new HashSet<>(Arrays.asList("table1", "table2", "table3",
"table4", "table5", "table6"));
List<TableName> tnames = table.describeBackupSet(setName);
assertTrue(tnames != null);
assertTrue(tnames.size() == tables.length + addTables.length);
for (int i = 0; i < tnames.size(); i++) {
assertTrue(tnames.get(i).getNameAsString().equals("table" + (i + 1)));
assertTrue(tnames.size() == expectedTables.size());
for (TableName tableName : tnames) {
assertTrue(expectedTables.remove(tableName.getNameAsString()));
}
cleanBackupTable();
}
@ -389,11 +393,14 @@ public class TestBackupSystemTable {
String[] addTables = new String[] { "table3", "table4", "table5", "table6" };
table.addToBackupSet(setName, addTables);
Set<String> expectedTables = new HashSet<>(Arrays.asList("table1", "table2", "table3",
"table4", "table5", "table6"));
List<TableName> tnames = table.describeBackupSet(setName);
assertTrue(tnames != null);
assertTrue(tnames.size() == tables.length + addTables.length - 1);
for (int i = 0; i < tnames.size(); i++) {
assertTrue(tnames.get(i).getNameAsString().equals("table" + (i + 1)));
assertTrue(tnames.size() == expectedTables.size());
for (TableName tableName : tnames) {
assertTrue(expectedTables.remove(tableName.getNameAsString()));
}
cleanBackupTable();
}
@ -409,11 +416,13 @@ public class TestBackupSystemTable {
String[] removeTables = new String[] { "table4", "table5", "table6" };
table.removeFromBackupSet(setName, removeTables);
Set<String> expectedTables = new HashSet<>(Arrays.asList("table1", "table2", "table3"));
List<TableName> tnames = table.describeBackupSet(setName);
assertTrue(tnames != null);
assertTrue(tnames.size() == tables.length - 1);
for (int i = 0; i < tnames.size(); i++) {
assertTrue(tnames.get(i).getNameAsString().equals("table" + (i + 1)));
assertTrue(tnames.size() == expectedTables.size());
for (TableName tableName : tnames) {
assertTrue(expectedTables.remove(tableName.getNameAsString()));
}
cleanBackupTable();
}
@ -429,11 +438,13 @@ public class TestBackupSystemTable {
String[] removeTables = new String[] { "table4", "table3" };
table.removeFromBackupSet(setName, removeTables);
Set<String> expectedTables = new HashSet<>(Arrays.asList("table1", "table2"));
List<TableName> tnames = table.describeBackupSet(setName);
assertTrue(tnames != null);
assertTrue(tnames.size() == tables.length - 2);
for (int i = 0; i < tnames.size(); i++) {
assertTrue(tnames.get(i).getNameAsString().equals("table" + (i + 1)));
assertTrue(tnames.size() == expectedTables.size());
for (TableName tableName : tnames) {
assertTrue(expectedTables.remove(tableName.getNameAsString()));
}
cleanBackupTable();
}