HBASE-19441: Implement retry logic around starting exclusive backup operation
Signed-off-by: tedyu <yuzhihong@gmail.com>
This commit is contained in:
parent
8ab7b20f48
commit
91075276e7
|
@ -1,4 +1,5 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -15,7 +16,6 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.backup.impl;
|
||||
|
||||
import java.io.Closeable;
|
||||
|
@ -47,18 +47,22 @@ import org.apache.hadoop.hbase.client.Connection;
|
|||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.procedure.ProcedureManagerHost;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* Handles backup requests, creates backup info records in backup system table to
|
||||
* keep track of backup sessions, dispatches backup request.
|
||||
* Handles backup requests, creates backup info records in backup system table to keep track of
|
||||
* backup sessions, dispatches backup request.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class BackupManager implements Closeable {
|
||||
// in seconds
|
||||
public final static String BACKUP_EXCLUSIVE_OPERATION_TIMEOUT_SECONDS_KEY =
|
||||
"hbase.backup.exclusive.op.timeout.seconds";
|
||||
// In seconds
|
||||
private final static int DEFAULT_BACKUP_EXCLUSIVE_OPERATION_TIMEOUT = 3600;
|
||||
private static final Logger LOG = LoggerFactory.getLogger(BackupManager.class);
|
||||
|
||||
protected Configuration conf = null;
|
||||
|
@ -112,8 +116,8 @@ public class BackupManager implements Closeable {
|
|||
if (classes == null) {
|
||||
conf.set(ProcedureManagerHost.MASTER_PROCEDURE_CONF_KEY, masterProcedureClass);
|
||||
} else if (!classes.contains(masterProcedureClass)) {
|
||||
conf.set(ProcedureManagerHost.MASTER_PROCEDURE_CONF_KEY, classes + ","
|
||||
+ masterProcedureClass);
|
||||
conf.set(ProcedureManagerHost.MASTER_PROCEDURE_CONF_KEY,
|
||||
classes + "," + masterProcedureClass);
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
@ -138,16 +142,16 @@ public class BackupManager implements Closeable {
|
|||
if (classes == null) {
|
||||
conf.set(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY, regionProcedureClass);
|
||||
} else if (!classes.contains(regionProcedureClass)) {
|
||||
conf.set(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY, classes + ","
|
||||
+ regionProcedureClass);
|
||||
conf.set(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY,
|
||||
classes + "," + regionProcedureClass);
|
||||
}
|
||||
String coproc = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
|
||||
String regionObserverClass = BackupObserver.class.getName();
|
||||
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, (coproc == null ? "" : coproc + ",") +
|
||||
regionObserverClass);
|
||||
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
|
||||
(coproc == null ? "" : coproc + ",") + regionObserverClass);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Added region procedure manager: " + regionProcedureClass +
|
||||
". Added region observer: " + regionObserverClass);
|
||||
LOG.debug("Added region procedure manager: " + regionProcedureClass
|
||||
+ ". Added region observer: " + regionObserverClass);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -223,9 +227,8 @@ public class BackupManager implements Closeable {
|
|||
}
|
||||
|
||||
// there are one or more tables in the table list
|
||||
backupInfo =
|
||||
new BackupInfo(backupId, type, tableList.toArray(new TableName[tableList.size()]),
|
||||
targetRootDir);
|
||||
backupInfo = new BackupInfo(backupId, type, tableList.toArray(new TableName[tableList.size()]),
|
||||
targetRootDir);
|
||||
backupInfo.setBandwidth(bandwidth);
|
||||
backupInfo.setWorkers(workers);
|
||||
return backupInfo;
|
||||
|
@ -254,7 +257,7 @@ public class BackupManager implements Closeable {
|
|||
String ongoingBackupId = this.getOngoingBackupId();
|
||||
if (ongoingBackupId != null) {
|
||||
LOG.info("There is a ongoing backup " + ongoingBackupId
|
||||
+ ". Can not launch new backup until no ongoing backup remains.");
|
||||
+ ". Can not launch new backup until no ongoing backup remains.");
|
||||
throw new BackupException("There is ongoing backup.");
|
||||
}
|
||||
}
|
||||
|
@ -269,7 +272,7 @@ public class BackupManager implements Closeable {
|
|||
* @return The ancestors for the current backup
|
||||
* @throws IOException exception
|
||||
*/
|
||||
public ArrayList<BackupImage> getAncestors(BackupInfo backupInfo) throws IOException {
|
||||
public ArrayList<BackupImage> getAncestors(BackupInfo backupInfo) throws IOException {
|
||||
LOG.debug("Getting the direct ancestors of the current backup " + backupInfo.getBackupId());
|
||||
|
||||
ArrayList<BackupImage> ancestors = new ArrayList<>();
|
||||
|
@ -286,10 +289,9 @@ public class BackupManager implements Closeable {
|
|||
|
||||
BackupImage.Builder builder = BackupImage.newBuilder();
|
||||
|
||||
BackupImage image =
|
||||
builder.withBackupId(backup.getBackupId()).withType(backup.getType())
|
||||
.withRootDir(backup.getBackupRootDir()).withTableList(backup.getTableNames())
|
||||
.withStartTime(backup.getStartTs()).withCompleteTime(backup.getCompleteTs()).build();
|
||||
BackupImage image = builder.withBackupId(backup.getBackupId()).withType(backup.getType())
|
||||
.withRootDir(backup.getBackupRootDir()).withTableList(backup.getTableNames())
|
||||
.withStartTime(backup.getStartTs()).withCompleteTime(backup.getCompleteTs()).build();
|
||||
|
||||
// add the full backup image as an ancestor until the last incremental backup
|
||||
if (backup.getType().equals(BackupType.FULL)) {
|
||||
|
@ -319,9 +321,9 @@ public class BackupManager implements Closeable {
|
|||
BackupImage lastIncrImage = lastIncrImgManifest.getBackupImage();
|
||||
ancestors.add(lastIncrImage);
|
||||
|
||||
LOG.debug("Last dependent incremental backup image: " + "{BackupID="
|
||||
+ lastIncrImage.getBackupId() + "," + "BackupDir=" + lastIncrImage.getRootDir()
|
||||
+ "}");
|
||||
LOG.debug(
|
||||
"Last dependent incremental backup image: " + "{BackupID=" + lastIncrImage.getBackupId()
|
||||
+ "," + "BackupDir=" + lastIncrImage.getRootDir() + "}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -369,7 +371,36 @@ public class BackupManager implements Closeable {
|
|||
* @throws IOException if active session already exists
|
||||
*/
|
||||
public void startBackupSession() throws IOException {
|
||||
systemTable.startBackupExclusiveOperation();
|
||||
long startTime = System.currentTimeMillis();
|
||||
long timeout = conf.getInt(BACKUP_EXCLUSIVE_OPERATION_TIMEOUT_SECONDS_KEY,
|
||||
DEFAULT_BACKUP_EXCLUSIVE_OPERATION_TIMEOUT) * 1000L;
|
||||
long lastWarningOutputTime = 0;
|
||||
while (System.currentTimeMillis() - startTime < timeout) {
|
||||
try {
|
||||
systemTable.startBackupExclusiveOperation();
|
||||
return;
|
||||
} catch (IOException e) {
|
||||
if (e instanceof ExclusiveOperationException) {
|
||||
// sleep, then repeat
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e1) {
|
||||
// Restore the interrupted status
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
if (lastWarningOutputTime == 0
|
||||
|| (System.currentTimeMillis() - lastWarningOutputTime) > 60000) {
|
||||
lastWarningOutputTime = System.currentTimeMillis();
|
||||
LOG.warn("Waiting to acquire backup exclusive lock for "
|
||||
+ (lastWarningOutputTime - startTime) / 1000 + "s");
|
||||
}
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
throw new IOException(
|
||||
"Failed to acquire backup system table exclusive lock after " + timeout / 1000 + "s");
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -410,7 +441,7 @@ public class BackupManager implements Closeable {
|
|||
}
|
||||
|
||||
public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
|
||||
readBulkloadRows(List<TableName> tableList) throws IOException {
|
||||
readBulkloadRows(List<TableName> tableList) throws IOException {
|
||||
return systemTable.readBulkloadRows(tableList);
|
||||
}
|
||||
|
||||
|
@ -483,7 +514,6 @@ public class BackupManager implements Closeable {
|
|||
|
||||
/**
|
||||
* Get WAL files iterator.
|
||||
*
|
||||
* @return WAL files iterator from backup system table
|
||||
* @throws IOException if getting the WAL files iterator fails
|
||||
*/
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -64,6 +65,8 @@ import org.apache.hadoop.hbase.client.SnapshotDescription;
|
|||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
|
@ -71,26 +74,25 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
||||
|
||||
/**
|
||||
* This class provides API to access backup system table<br>
|
||||
*
|
||||
* Backup system table schema:<br>
|
||||
* <p><ul>
|
||||
* <p>
|
||||
* <ul>
|
||||
* <li>1. Backup sessions rowkey= "session:"+backupId; value =serialized BackupInfo</li>
|
||||
* <li>2. Backup start code rowkey = "startcode:"+backupRoot; value = startcode</li>
|
||||
* <li>3. Incremental backup set rowkey="incrbackupset:"+backupRoot; value=[list of tables]</li>
|
||||
* <li>4. Table-RS-timestamp map rowkey="trslm:"+backupRoot+table_name;
|
||||
* value = map[RS-> last WAL timestamp]</li>
|
||||
* <li>4. Table-RS-timestamp map rowkey="trslm:"+backupRoot+table_name; value = map[RS-> last WAL
|
||||
* timestamp]</li>
|
||||
* <li>5. RS - WAL ts map rowkey="rslogts:"+backupRoot +server; value = last WAL timestamp</li>
|
||||
* <li>6. WALs recorded rowkey="wals:"+WAL unique file name;
|
||||
* value = backupId and full WAL file name</li>
|
||||
* </ul></p>
|
||||
* <li>6. WALs recorded rowkey="wals:"+WAL unique file name; value = backupId and full WAL file
|
||||
* name</li>
|
||||
* </ul>
|
||||
* </p>
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public final class BackupSystemTable implements Closeable {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(BackupSystemTable.class);
|
||||
|
||||
static class WALItem {
|
||||
|
@ -128,10 +130,9 @@ public final class BackupSystemTable implements Closeable {
|
|||
private TableName tableName;
|
||||
|
||||
/**
|
||||
* Backup System table name for bulk loaded files.
|
||||
* We keep all bulk loaded file references in a separate table
|
||||
* because we have to isolate general backup operations: create, merge etc
|
||||
* from activity of RegionObserver, which controls process of a bulk loading
|
||||
* Backup System table name for bulk loaded files. We keep all bulk loaded file references in a
|
||||
* separate table because we have to isolate general backup operations: create, merge etc from
|
||||
* activity of RegionObserver, which controls process of a bulk loading
|
||||
* {@link org.apache.hadoop.hbase.backup.BackupObserver}
|
||||
*/
|
||||
private TableName bulkLoadTableName;
|
||||
|
@ -198,13 +199,11 @@ public final class BackupSystemTable implements Closeable {
|
|||
verifyNamespaceExists(admin);
|
||||
Configuration conf = connection.getConfiguration();
|
||||
if (!admin.tableExists(tableName)) {
|
||||
TableDescriptor backupHTD =
|
||||
BackupSystemTable.getSystemTableDescriptor(conf);
|
||||
TableDescriptor backupHTD = BackupSystemTable.getSystemTableDescriptor(conf);
|
||||
admin.createTable(backupHTD);
|
||||
}
|
||||
if (!admin.tableExists(bulkLoadTableName)) {
|
||||
TableDescriptor blHTD =
|
||||
BackupSystemTable.getSystemTableForBulkLoadedDataDescriptor(conf);
|
||||
TableDescriptor blHTD = BackupSystemTable.getSystemTableForBulkLoadedDataDescriptor(conf);
|
||||
admin.createTable(blHTD);
|
||||
}
|
||||
waitForSystemTable(admin, tableName);
|
||||
|
@ -237,11 +236,11 @@ public final class BackupSystemTable implements Closeable {
|
|||
} catch (InterruptedException e) {
|
||||
}
|
||||
if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) {
|
||||
throw new IOException("Failed to create backup system table "+
|
||||
tableName +" after " + TIMEOUT + "ms");
|
||||
throw new IOException(
|
||||
"Failed to create backup system table " + tableName + " after " + TIMEOUT + "ms");
|
||||
}
|
||||
}
|
||||
LOG.debug("Backup table "+tableName+" exists and available");
|
||||
LOG.debug("Backup table " + tableName + " exists and available");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -257,7 +256,7 @@ public final class BackupSystemTable implements Closeable {
|
|||
public void updateBackupInfo(BackupInfo info) throws IOException {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("update backup status in backup system table for: " + info.getBackupId()
|
||||
+ " set status=" + info.getState());
|
||||
+ " set status=" + info.getState());
|
||||
}
|
||||
try (Table table = connection.getTable(tableName)) {
|
||||
Put put = createPutForBackupInfo(info);
|
||||
|
@ -344,7 +343,6 @@ public final class BackupSystemTable implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Deletes backup status from backup system table table
|
||||
* @param backupId backup id
|
||||
|
@ -370,7 +368,7 @@ public final class BackupSystemTable implements Closeable {
|
|||
Map<byte[], List<Path>> finalPaths) throws IOException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("write bulk load descriptor to backup " + tabName + " with " + finalPaths.size()
|
||||
+ " entries");
|
||||
+ " entries");
|
||||
}
|
||||
try (Table table = connection.getTable(bulkLoadTableName)) {
|
||||
List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths);
|
||||
|
@ -389,8 +387,8 @@ public final class BackupSystemTable implements Closeable {
|
|||
public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region, final byte[] family,
|
||||
final List<Pair<Path, Path>> pairs) throws IOException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("write bulk load descriptor to backup " + tabName + " with " + pairs.size()
|
||||
+ " entries");
|
||||
LOG.debug(
|
||||
"write bulk load descriptor to backup " + tabName + " with " + pairs.size() + " entries");
|
||||
}
|
||||
try (Table table = connection.getTable(bulkLoadTableName)) {
|
||||
List<Put> puts =
|
||||
|
@ -425,7 +423,8 @@ public final class BackupSystemTable implements Closeable {
|
|||
* whether the hfile was recorded by preCommitStoreFile hook (true)
|
||||
*/
|
||||
public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
|
||||
readBulkloadRows(List<TableName> tableList) throws IOException {
|
||||
readBulkloadRows(List<TableName> tableList) throws IOException {
|
||||
|
||||
Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = new HashMap<>();
|
||||
List<byte[]> rows = new ArrayList<>();
|
||||
for (TableName tTable : tableList) {
|
||||
|
@ -504,9 +503,8 @@ public final class BackupSystemTable implements Closeable {
|
|||
byte[] fam = entry.getKey();
|
||||
List<Path> paths = entry.getValue();
|
||||
for (Path p : paths) {
|
||||
Put put =
|
||||
BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(), backupId, ts,
|
||||
cnt++);
|
||||
Put put = BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(), backupId,
|
||||
ts, cnt++);
|
||||
puts.add(put);
|
||||
}
|
||||
}
|
||||
|
@ -580,10 +578,9 @@ public final class BackupSystemTable implements Closeable {
|
|||
}
|
||||
|
||||
/**
|
||||
* Exclusive operations are:
|
||||
* create, delete, merge
|
||||
* Exclusive operations are: create, delete, merge
|
||||
* @throws IOException if a table operation fails or an active backup exclusive operation is
|
||||
* already underway
|
||||
* already underway
|
||||
*/
|
||||
public void startBackupExclusiveOperation() throws IOException {
|
||||
LOG.debug("Start new backup exclusive operation");
|
||||
|
@ -596,7 +593,7 @@ public final class BackupSystemTable implements Closeable {
|
|||
// Row exists, try to put if value == ACTIVE_SESSION_NO
|
||||
if (!table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
|
||||
.ifEquals(ACTIVE_SESSION_NO).thenPut(put)) {
|
||||
throw new IOException("There is an active backup exclusive operation");
|
||||
throw new ExclusiveOperationException();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -696,8 +693,7 @@ public final class BackupSystemTable implements Closeable {
|
|||
|
||||
/**
|
||||
* Get first n backup history records
|
||||
* @param n number of records, if n== -1 - max number
|
||||
* is ignored
|
||||
* @param n number of records, if n== -1 - max number is ignored
|
||||
* @return list of records
|
||||
* @throws IOException if getting the backup history fails
|
||||
*/
|
||||
|
@ -711,8 +707,7 @@ public final class BackupSystemTable implements Closeable {
|
|||
|
||||
/**
|
||||
* Get backup history records filtered by list of filters.
|
||||
* @param n max number of records, if n == -1 , then max number
|
||||
* is ignored
|
||||
* @param n max number of records, if n == -1 , then max number is ignored
|
||||
* @param filters list of filters
|
||||
* @return backup records
|
||||
* @throws IOException if getting the backup history fails
|
||||
|
@ -917,8 +912,8 @@ public final class BackupSystemTable implements Closeable {
|
|||
Map<String, Long> map) {
|
||||
BackupProtos.TableServerTimestamp.Builder tstBuilder =
|
||||
BackupProtos.TableServerTimestamp.newBuilder();
|
||||
tstBuilder.setTableName(org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil
|
||||
.toProtoTableName(table));
|
||||
tstBuilder
|
||||
.setTableName(org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toProtoTableName(table));
|
||||
|
||||
for (Entry<String, Long> entry : map.entrySet()) {
|
||||
BackupProtos.ServerTimestamp.Builder builder = BackupProtos.ServerTimestamp.newBuilder();
|
||||
|
@ -934,8 +929,9 @@ public final class BackupSystemTable implements Closeable {
|
|||
return tstBuilder.build();
|
||||
}
|
||||
|
||||
private HashMap<String, Long> fromTableServerTimestampProto(
|
||||
BackupProtos.TableServerTimestamp proto) {
|
||||
private HashMap<String, Long>
|
||||
fromTableServerTimestampProto(BackupProtos.TableServerTimestamp proto) {
|
||||
|
||||
HashMap<String, Long> map = new HashMap<>();
|
||||
List<BackupProtos.ServerTimestamp> list = proto.getServerTimestampList();
|
||||
for (BackupProtos.ServerTimestamp st : list) {
|
||||
|
@ -982,7 +978,7 @@ public final class BackupSystemTable implements Closeable {
|
|||
throws IOException {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Add incremental backup table set to backup system table. ROOT=" + backupRoot
|
||||
+ " tables [" + StringUtils.join(tables, " ") + "]");
|
||||
+ " tables [" + StringUtils.join(tables, " ") + "]");
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
tables.forEach(table -> LOG.debug(Objects.toString(table)));
|
||||
|
@ -1106,12 +1102,12 @@ public final class BackupSystemTable implements Closeable {
|
|||
/**
|
||||
* Check if WAL file is eligible for deletion using multi-get
|
||||
* @param files names of a file to check
|
||||
* @return map of results
|
||||
* (key: FileStatus object. value: true if the file is deletable, false otherwise)
|
||||
* @return map of results (key: FileStatus object. value: true if the file is deletable, false
|
||||
* otherwise)
|
||||
* @throws IOException exception
|
||||
*/
|
||||
public Map<FileStatus, Boolean> areWALFilesDeletable(Iterable<FileStatus> files)
|
||||
throws IOException {
|
||||
throws IOException {
|
||||
final int BUF_SIZE = 100;
|
||||
|
||||
Map<FileStatus, Boolean> ret = new HashMap<>();
|
||||
|
@ -1223,8 +1219,8 @@ public final class BackupSystemTable implements Closeable {
|
|||
|
||||
res.advance();
|
||||
String[] tables = cellValueToBackupSet(res.current());
|
||||
return Arrays.asList(tables).stream().map(item -> TableName.valueOf(item)).
|
||||
collect(Collectors.toList());
|
||||
return Arrays.asList(tables).stream().map(item -> TableName.valueOf(item))
|
||||
.collect(Collectors.toList());
|
||||
} finally {
|
||||
if (table != null) {
|
||||
table.close();
|
||||
|
@ -1266,8 +1262,8 @@ public final class BackupSystemTable implements Closeable {
|
|||
*/
|
||||
public void removeFromBackupSet(String name, String[] toRemove) throws IOException {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(" Backup set remove from : " + name + " tables [" + StringUtils.join(toRemove, " ")
|
||||
+ "]");
|
||||
LOG.trace(
|
||||
" Backup set remove from : " + name + " tables [" + StringUtils.join(toRemove, " ") + "]");
|
||||
}
|
||||
String[] disjoint;
|
||||
String[] tables;
|
||||
|
@ -1336,23 +1332,21 @@ public final class BackupSystemTable implements Closeable {
|
|||
colBuilder.setMaxVersions(1);
|
||||
Configuration config = HBaseConfiguration.create();
|
||||
int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY,
|
||||
BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
|
||||
BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
|
||||
colBuilder.setTimeToLive(ttl);
|
||||
|
||||
ColumnFamilyDescriptor colSessionsDesc = colBuilder.build();
|
||||
builder.setColumnFamily(colSessionsDesc);
|
||||
|
||||
colBuilder =
|
||||
ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY);
|
||||
colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY);
|
||||
colBuilder.setTimeToLive(ttl);
|
||||
builder.setColumnFamily(colBuilder.build());
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public static TableName getTableName(Configuration conf) {
|
||||
String name =
|
||||
conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY,
|
||||
BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT);
|
||||
String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY,
|
||||
BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT);
|
||||
return TableName.valueOf(name);
|
||||
}
|
||||
|
||||
|
@ -1377,12 +1371,11 @@ public final class BackupSystemTable implements Closeable {
|
|||
colBuilder.setMaxVersions(1);
|
||||
Configuration config = HBaseConfiguration.create();
|
||||
int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY,
|
||||
BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
|
||||
BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
|
||||
colBuilder.setTimeToLive(ttl);
|
||||
ColumnFamilyDescriptor colSessionsDesc = colBuilder.build();
|
||||
builder.setColumnFamily(colSessionsDesc);
|
||||
colBuilder =
|
||||
ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY);
|
||||
colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY);
|
||||
colBuilder.setTimeToLive(ttl);
|
||||
builder.setColumnFamily(colBuilder.build());
|
||||
return builder.build();
|
||||
|
@ -1390,9 +1383,10 @@ public final class BackupSystemTable implements Closeable {
|
|||
|
||||
public static TableName getTableNameForBulkLoadedData(Configuration conf) {
|
||||
String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY,
|
||||
BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT) + "_bulk";
|
||||
BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT) + "_bulk";
|
||||
return TableName.valueOf(name);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates Put operation for a given backup info object
|
||||
* @param context backup info
|
||||
|
@ -1622,16 +1616,15 @@ public final class BackupSystemTable implements Closeable {
|
|||
String file = path.toString();
|
||||
int lastSlash = file.lastIndexOf("/");
|
||||
String filename = file.substring(lastSlash + 1);
|
||||
Put put =
|
||||
new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
|
||||
Bytes.toString(region), BLK_LD_DELIM, filename));
|
||||
Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
|
||||
Bytes.toString(region), BLK_LD_DELIM, filename));
|
||||
put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
|
||||
put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey());
|
||||
put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, file.getBytes());
|
||||
put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_COMMIT);
|
||||
puts.add(put);
|
||||
LOG.debug("writing done bulk path " + file + " for " + table + " "
|
||||
+ Bytes.toString(region));
|
||||
LOG.debug(
|
||||
"writing done bulk path " + file + " for " + table + " " + Bytes.toString(region));
|
||||
}
|
||||
}
|
||||
return puts;
|
||||
|
@ -1658,8 +1651,8 @@ public final class BackupSystemTable implements Closeable {
|
|||
// Snapshot does not exists, i.e completeBackup failed after
|
||||
// deleting backup system table snapshot
|
||||
// In this case we log WARN and proceed
|
||||
LOG.warn("Could not restore backup system table. Snapshot " + snapshotName
|
||||
+ " does not exists.");
|
||||
LOG.warn(
|
||||
"Could not restore backup system table. Snapshot " + snapshotName + " does not exists.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1695,17 +1688,16 @@ public final class BackupSystemTable implements Closeable {
|
|||
/*
|
||||
* Creates Put's for bulk load resulting from running LoadIncrementalHFiles
|
||||
*/
|
||||
static List<Put> createPutForPreparedBulkload(TableName table, byte[] region,
|
||||
final byte[] family, final List<Pair<Path, Path>> pairs) {
|
||||
static List<Put> createPutForPreparedBulkload(TableName table, byte[] region, final byte[] family,
|
||||
final List<Pair<Path, Path>> pairs) {
|
||||
List<Put> puts = new ArrayList<>(pairs.size());
|
||||
for (Pair<Path, Path> pair : pairs) {
|
||||
Path path = pair.getSecond();
|
||||
String file = path.toString();
|
||||
int lastSlash = file.lastIndexOf("/");
|
||||
String filename = file.substring(lastSlash + 1);
|
||||
Put put =
|
||||
new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, Bytes.toString(region),
|
||||
BLK_LD_DELIM, filename));
|
||||
Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
|
||||
Bytes.toString(region), BLK_LD_DELIM, filename));
|
||||
put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
|
||||
put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, family);
|
||||
put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, file.getBytes());
|
||||
|
@ -1899,9 +1891,8 @@ public final class BackupSystemTable implements Closeable {
|
|||
*/
|
||||
static Scan createScanForBulkLoadedFiles(String backupId) {
|
||||
Scan scan = new Scan();
|
||||
byte[] startRow =
|
||||
backupId == null ? BULK_LOAD_PREFIX_BYTES : rowkey(BULK_LOAD_PREFIX, backupId
|
||||
+ BLK_LD_DELIM);
|
||||
byte[] startRow = backupId == null ? BULK_LOAD_PREFIX_BYTES
|
||||
: rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM);
|
||||
byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
|
||||
stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
|
||||
scan.setStartRow(startRow);
|
||||
|
@ -1927,7 +1918,7 @@ public final class BackupSystemTable implements Closeable {
|
|||
* @return put list
|
||||
*/
|
||||
private List<Put> createPutsForAddWALFiles(List<String> files, String backupId,
|
||||
String backupRoot) {
|
||||
String backupRoot) {
|
||||
List<Put> puts = new ArrayList<>(files.size());
|
||||
for (String file : files) {
|
||||
Put put = new Put(rowkey(WALS_PREFIX, BackupUtils.getUniqueWALFileNamePart(file)));
|
||||
|
@ -1935,7 +1926,7 @@ public final class BackupSystemTable implements Closeable {
|
|||
Bytes.toBytes(backupId));
|
||||
put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("file"), Bytes.toBytes(file));
|
||||
put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("root"),
|
||||
Bytes.toBytes(backupRoot));
|
||||
Bytes.toBytes(backupRoot));
|
||||
puts.add(put);
|
||||
}
|
||||
return puts;
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.backup.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@SuppressWarnings("serial")
|
||||
public class ExclusiveOperationException extends IOException {
|
||||
|
||||
public ExclusiveOperationException() {
|
||||
super();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,137 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.backup;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicLongArray;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||
import org.apache.hadoop.hbase.backup.impl.BackupManager;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
|
||||
|
||||
@Category(MediumTests.class)
|
||||
public class TestBackupManager {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestBackupManager.class);
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestBackupManager.class);
|
||||
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
protected static Configuration conf = UTIL.getConfiguration();
|
||||
protected static MiniHBaseCluster cluster;
|
||||
protected static Connection conn;
|
||||
protected BackupManager backupManager;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
|
||||
BackupManager.decorateMasterConfiguration(conf);
|
||||
BackupManager.decorateRegionServerConfiguration(conf);
|
||||
cluster = UTIL.startMiniCluster();
|
||||
conn = UTIL.getConnection();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws IOException {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
public void before() throws IOException {
|
||||
backupManager = new BackupManager(conn, conn.getConfiguration());
|
||||
}
|
||||
|
||||
@After
|
||||
public void after() {
|
||||
backupManager.close();
|
||||
}
|
||||
|
||||
AtomicLongArray startTimes = new AtomicLongArray(2);
|
||||
AtomicLongArray stopTimes = new AtomicLongArray(2);
|
||||
|
||||
@Test
|
||||
public void testStartBackupExclusiveOperation() {
|
||||
|
||||
long sleepTime = 2000;
|
||||
Runnable r = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
backupManager.startBackupSession();
|
||||
boolean result = startTimes.compareAndSet(0, 0, System.currentTimeMillis());
|
||||
if (!result) {
|
||||
result = startTimes.compareAndSet(1, 0, System.currentTimeMillis());
|
||||
if (!result) {
|
||||
throw new IOException("PANIC! Unreachable code");
|
||||
}
|
||||
}
|
||||
Thread.sleep(sleepTime);
|
||||
result = stopTimes.compareAndSet(0, 0, System.currentTimeMillis());
|
||||
if (!result) {
|
||||
result = stopTimes.compareAndSet(1, 0, System.currentTimeMillis());
|
||||
if (!result) {
|
||||
throw new IOException("PANIC! Unreachable code");
|
||||
}
|
||||
}
|
||||
backupManager.finishBackupSession();
|
||||
} catch (IOException | InterruptedException e) {
|
||||
fail("Unexpected exception: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Thread[] workers = new Thread[2];
|
||||
for (int i = 0; i < workers.length; i++) {
|
||||
workers[i] = new Thread(r);
|
||||
workers[i].start();
|
||||
}
|
||||
|
||||
for (int i = 0; i < workers.length; i++) {
|
||||
Uninterruptibles.joinUninterruptibly(workers[i]);
|
||||
}
|
||||
LOG.info("Diff start time=" + (startTimes.get(1) - startTimes.get(0)) + "ms");
|
||||
LOG.info("Diff finish time=" + (stopTimes.get(1) - stopTimes.get(0)) + "ms");
|
||||
assertTrue(startTimes.get(1) - startTimes.get(0) >= sleepTime);
|
||||
assertTrue(stopTimes.get(1) - stopTimes.get(0) >= sleepTime);
|
||||
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue