From 35aa7aae3a0d269d809416f6ff24599517f5b44b Mon Sep 17 00:00:00 2001 From: Josh Elser Date: Sun, 13 Aug 2017 20:55:58 -0400 Subject: [PATCH] HBASE-14135 Merge backup images (Vladimir Rodionov) --- .../hadoop/hbase/backup/BackupAdmin.java | 20 +- .../hadoop/hbase/backup/BackupDriver.java | 2 + .../hadoop/hbase/backup/BackupInfo.java | 5 + .../hadoop/hbase/backup/BackupMergeJob.java | 40 +++ .../hbase/backup/BackupRestoreFactory.java | 20 +- .../hbase/backup/HBackupFileSystem.java | 57 +-- .../hbase/backup/impl/BackupAdminImpl.java | 211 ++++++++--- .../hbase/backup/impl/BackupCommands.java | 163 +++++++-- .../hbase/backup/impl/BackupManager.java | 21 +- .../hbase/backup/impl/BackupManifest.java | 24 +- .../hbase/backup/impl/BackupSystemTable.java | 314 ++++++++++------ .../backup/impl/RestoreTablesClient.java | 32 +- .../mapreduce/MapReduceBackupMergeJob.java | 321 +++++++++++++++++ ...ob.java => MapReduceHFileSplitterJob.java} | 12 +- .../backup/mapreduce/MapReduceRestoreJob.java | 84 ++--- .../hadoop/hbase/backup/util/BackupUtils.java | 93 +++-- ...estIncrementalBackupMergeWithFailures.java | 336 ++++++++++++++++++ .../backup/TestRepairAfterFailedDelete.java | 2 +- 18 files changed, 1398 insertions(+), 359 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupMergeJob.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java rename hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/{HFileSplitterJob.java => MapReduceHFileSplitterJob.java} (94%) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java index 6f642a443b8..9dc63826c02 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java @@ -38,8 +38,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; public interface BackupAdmin extends Closeable { /** - * Backup given list of tables fully. This is a synchronous operation. - * It returns backup id on success or throw exception on failure. + * Backup given list of tables fully. This is a synchronous operation. It returns backup id on + * success or throw exception on failure. * @param userRequest BackupRequest instance * @return the backup Id */ @@ -61,15 +61,23 @@ public interface BackupAdmin extends Closeable { */ BackupInfo getBackupInfo(String backupId) throws IOException; - /** * Delete backup image command - * @param backupIds backup id list + * @param backupIds array of backup ids * @return total number of deleted sessions * @throws IOException exception */ int deleteBackups(String[] backupIds) throws IOException; + /** + * Merge backup images command + * @param backupIds array of backup ids of images to be merged + * The resulting backup image will have the same backup id as the most + * recent image from a list of images to be merged + * @throws IOException exception + */ + void mergeBackups(String[] backupIds) throws IOException; + /** * Show backup history command * @param n last n backup sessions @@ -113,7 +121,7 @@ public interface BackupAdmin extends Closeable { /** * Add tables to backup set command * @param name name of backup set. - * @param tables list of tables to be added to this set. 
+ * @param tables array of tables to be added to this set. * @throws IOException exception */ void addToBackupSet(String name, TableName[] tables) throws IOException; @@ -121,7 +129,7 @@ public interface BackupAdmin extends Closeable { /** * Remove tables from backup set * @param name name of backup set. - * @param tables list of tables to be removed from this set. + * @param tables array of tables to be removed from this set. * @throws IOException exception */ void removeFromBackupSet(String name, TableName[] tables) throws IOException; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java index e2cdb2f4cb0..9dd85317e22 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java @@ -111,6 +111,8 @@ public class BackupDriver extends AbstractHBaseTool { type = BackupCommand.SET; } else if (BackupCommand.REPAIR.name().equalsIgnoreCase(cmd)) { type = BackupCommand.REPAIR; + } else if (BackupCommand.MERGE.name().equalsIgnoreCase(cmd)) { + type = BackupCommand.MERGE; } else { System.out.println("Unsupported command for backup: " + cmd); printToolUsage(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java index f6a1fe4eb37..1765bf35684 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java @@ -433,6 +433,11 @@ public class BackupInfo implements Comparable { } } + @Override + public String toString() { + return backupId; + } + public byte[] toByteArray() throws IOException { return toProtosBackupInfo().toByteArray(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupMergeJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupMergeJob.java new file mode 100644 index 00000000000..136782f4270 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupMergeJob.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.backup; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Backup merge operation job interface. 
Concrete implementation is provided by backup provider, see + * {@link BackupRestoreFactory} + */ + +@InterfaceAudience.Private +public interface BackupMergeJob extends Configurable { + + /** + * Run backup merge operation + * @param backupIds backup image ids + * @throws IOException + */ + void run(String[] backupIds) throws IOException; +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java index 6d8967a06e6..d72c88432fc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.backup; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob; +import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupMergeJob; import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreJob; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.util.ReflectionUtils; @@ -32,6 +33,7 @@ public final class BackupRestoreFactory { public final static String HBASE_INCR_RESTORE_IMPL_CLASS = "hbase.incremental.restore.class"; public final static String HBASE_BACKUP_COPY_IMPL_CLASS = "hbase.backup.copy.class"; + public final static String HBASE_BACKUP_MERGE_IMPL_CLASS = "hbase.backup.merge.class"; private BackupRestoreFactory() { throw new AssertionError("Instantiating utility class..."); @@ -40,7 +42,7 @@ public final class BackupRestoreFactory { /** * Gets backup restore job * @param conf configuration - * @return backup restore task instance + * @return backup restore job instance */ public static RestoreJob getRestoreJob(Configuration conf) { Class cls = @@ -53,7 +55,7 @@ public final class BackupRestoreFactory { /** * Gets backup copy job * @param conf configuration - * @return backup copy task + * @return backup copy job instance */ public static BackupCopyJob getBackupCopyJob(Configuration conf) { Class cls = @@ -63,4 +65,18 @@ public final class BackupRestoreFactory { service.setConf(conf); return service; } + + /** + * Gets backup merge job + * @param conf configuration + * @return backup merge job instance + */ + public static BackupMergeJob getBackupMergeJob(Configuration conf) { + Class cls = + conf.getClass(HBASE_BACKUP_MERGE_IMPL_CLASS, MapReduceBackupMergeJob.class, + BackupMergeJob.class); + BackupMergeJob service = ReflectionUtils.newInstance(cls, conf); + service.setConf(conf); + return service; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java index 46044db61eb..1c43e8884fc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java @@ -49,8 +49,8 @@ public class HBackupFileSystem { /** * Given the backup root dir, backup id and the table name, return the backup image location, * which is also where the backup manifest file is. 
return value look like: - * "hdfs://backup.hbase.org:9000/user/biadmin/backup/backup_1396650096738/default/t1_dn/", - * where "hdfs://backup.hbase.org:9000/user/biadmin/backup" is a backup root directory + * "hdfs://backup.hbase.org:9000/user/biadmin/backup/backup_1396650096738/default/t1_dn/", where + * "hdfs://backup.hbase.org:9000/user/biadmin/backup" is a backup root directory * @param backupRootDir backup root directory * @param backupId backup id * @param tableName table name @@ -63,18 +63,26 @@ public class HBackupFileSystem { + Path.SEPARATOR; } + public static String getTableBackupDataDir(String backupRootDir, String backupId, + TableName tableName) { + return getTableBackupDir(backupRootDir, backupId, tableName) + Path.SEPARATOR + "data"; + } + + public static Path getBackupPath(String backupRootDir, String backupId) { + return new Path(backupRootDir + Path.SEPARATOR + backupId); + } + /** * Given the backup root dir, backup id and the table name, return the backup image location, * which is also where the backup manifest file is. return value look like: - * "hdfs://backup.hbase.org:9000/user/biadmin/backup/backup_1396650096738/default/t1_dn/", - * where "hdfs://backup.hbase.org:9000/user/biadmin/backup" is a backup root directory + * "hdfs://backup.hbase.org:9000/user/biadmin/backup/backup_1396650096738/default/t1_dn/", where + * "hdfs://backup.hbase.org:9000/user/biadmin/backup" is a backup root directory * @param backupRootPath backup root path * @param tableName table name * @param backupId backup Id * @return backupPath for the particular table */ - public static Path getTableBackupPath(TableName tableName, - Path backupRootPath, String backupId) { + public static Path getTableBackupPath(TableName tableName, Path backupRootPath, String backupId) { return new Path(getTableBackupDir(backupRootPath.toString(), backupId, tableName)); } @@ -94,33 +102,30 @@ public class HBackupFileSystem { return new Path(getLogBackupDir(backupRootDir, backupId)); } - private static Path getManifestPath(TableName tableName, Configuration conf, Path backupRootPath, - String backupId) throws IOException { - Path manifestPath = - new Path(getTableBackupPath(tableName, backupRootPath, backupId), - BackupManifest.MANIFEST_FILE_NAME); + // TODO we do not keep WAL files anymore + // Move manifest file to other place + private static Path getManifestPath(Configuration conf, Path backupRootPath, String backupId) + throws IOException { + Path manifestPath = null; FileSystem fs = backupRootPath.getFileSystem(conf); + manifestPath = + new Path(getBackupPath(backupRootPath.toString(), backupId) + Path.SEPARATOR + + BackupManifest.MANIFEST_FILE_NAME); if (!fs.exists(manifestPath)) { - // check log dir for incremental backup case - manifestPath = - new Path(getLogBackupDir(backupRootPath.toString(), backupId) + Path.SEPARATOR - + BackupManifest.MANIFEST_FILE_NAME); - if (!fs.exists(manifestPath)) { - String errorMsg = - "Could not find backup manifest " + BackupManifest.MANIFEST_FILE_NAME + " for " - + backupId + ". File " + manifestPath + " does not exists. Did " + backupId - + " correspond to previously taken backup ?"; - throw new IOException(errorMsg); - } + String errorMsg = + "Could not find backup manifest " + BackupManifest.MANIFEST_FILE_NAME + " for " + + backupId + ". File " + manifestPath + " does not exists. 
Did " + backupId + + " correspond to previously taken backup ?"; + throw new IOException(errorMsg); } return manifestPath; } - public static BackupManifest getManifest(TableName tableName, Configuration conf, - Path backupRootPath, String backupId) throws IOException { + public static BackupManifest + getManifest(Configuration conf, Path backupRootPath, String backupId) throws IOException { BackupManifest manifest = - new BackupManifest(conf, getManifestPath(tableName, conf, backupRootPath, backupId)); + new BackupManifest(conf, getManifestPath(conf, backupRootPath, backupId)); return manifest; } @@ -134,7 +139,7 @@ public class HBackupFileSystem { TableName[] tableArray, Configuration conf, Path backupRootPath, String backupId) throws IOException { for (TableName tableName : tableArray) { - BackupManifest manifest = getManifest(tableName, conf, backupRootPath, backupId); + BackupManifest manifest = getManifest(conf, backupRootPath, backupId); backupManifestMap.put(tableName, manifest); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java index 6e35d927409..99fb06c37be 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.backup.impl; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -36,8 +37,10 @@ import org.apache.hadoop.hbase.backup.BackupAdmin; import org.apache.hadoop.hbase.backup.BackupClientFactory; import org.apache.hadoop.hbase.backup.BackupInfo; import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; +import org.apache.hadoop.hbase.backup.BackupMergeJob; import org.apache.hadoop.hbase.backup.BackupRequest; import org.apache.hadoop.hbase.backup.BackupRestoreConstants; +import org.apache.hadoop.hbase.backup.BackupRestoreFactory; import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.HBackupFileSystem; import org.apache.hadoop.hbase.backup.RestoreRequest; @@ -46,9 +49,8 @@ import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; - import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @InterfaceAudience.Private public class BackupAdminImpl implements BackupAdmin { @@ -65,12 +67,8 @@ public class BackupAdminImpl implements BackupAdmin { @Override public void close() throws IOException { - if (conn != null) { - conn.close(); - } } - @Override public BackupInfo getBackupInfo(String backupId) throws IOException { BackupInfo backupInfo = null; @@ -105,12 +103,12 @@ public class BackupAdminImpl implements BackupAdmin { // is running by using startBackupSession API // If there is an active session in progress, exception will be thrown try { - sysTable.startBackupSession(); + sysTable.startBackupExclusiveOperation(); deleteSessionStarted = true; } catch (IOException e) { LOG.warn("You can not run delete command while active backup session is in progress. 
\n" + "If there is no active backup session running, run backup repair utility to restore \n" - +"backup system integrity."); + + "backup system integrity."); return -1; } @@ -126,7 +124,7 @@ public class BackupAdminImpl implements BackupAdmin { sysTable.startDeleteOperation(backupIds); // Step 4: Snapshot backup system table if (!BackupSystemTable.snapshotExists(conn)) { - BackupSystemTable.snapshot(conn); + BackupSystemTable.snapshot(conn); } else { LOG.warn("Backup system table snapshot exists"); } @@ -154,13 +152,13 @@ public class BackupAdminImpl implements BackupAdmin { // Fail delete operation // Step 1 if (snapshotDone) { - if(BackupSystemTable.snapshotExists(conn)) { + if (BackupSystemTable.snapshotExists(conn)) { BackupSystemTable.restoreFromSnapshot(conn); // delete snapshot BackupSystemTable.deleteSnapshot(conn); // We still have record with unfinished delete operation - LOG.error("Delete operation failed, please run backup repair utility to restore "+ - "backup system integrity", e); + LOG.error("Delete operation failed, please run backup repair utility to restore " + + "backup system integrity", e); throw e; } else { LOG.warn("Delete operation succeeded, there were some errors: ", e); @@ -169,7 +167,7 @@ public class BackupAdminImpl implements BackupAdmin { } finally { if (deleteSessionStarted) { - sysTable.finishBackupSession(); + sysTable.finishBackupExclusiveOperation(); } } } @@ -206,17 +204,17 @@ public class BackupAdminImpl implements BackupAdmin { /** * Delete single backup and all related backups
* Algorithm:
- * Backup type: FULL or INCREMENTAL
- * Is this last backup session for table T: YES or NO
- * For every table T from table list 'tables':
- * if(FULL, YES) deletes only physical data (PD)
- * if(FULL, NO), deletes PD, scans all newer backups and removes T from backupInfo,
- * until we either reach the most recent backup for T in the system or FULL backup
- * which includes T
- * if(INCREMENTAL, YES) deletes only physical data (PD)
- * if(INCREMENTAL, NO) deletes physical data and for table T scans all backup images between last
- * FULL backup, which is older than the backup being deleted and the next FULL backup (if exists)
- * or last one for a particular table T and removes T from list of backup tables.
+ * Backup type: FULL or INCREMENTAL
+ * Is this last backup session for table T: YES or NO
+ * For every table T from table list 'tables':
+ * if(FULL, YES) deletes only physical data (PD)
+ * if(FULL, NO), deletes PD, scans all newer backups and removes T from backupInfo,
+ * until we either reach the most recent backup for T in the system or FULL backup
+ * which includes T
+ * if(INCREMENTAL, YES) deletes only physical data (PD) if(INCREMENTAL, NO) deletes physical data
+ * and for table T scans all backup images between last
+ * FULL backup, which is older than the backup being deleted and the next FULL backup (if exists)
+ * or last one for a particular table T and removes T from list of backup tables. * @param backupId backup id * @param sysTable backup system table * @return total number of deleted backup images @@ -285,8 +283,9 @@ public class BackupAdminImpl implements BackupAdmin { return totalDeleted; } - private void removeTableFromBackupImage(BackupInfo info, TableName tn, - BackupSystemTable sysTable) throws IOException { + private void + removeTableFromBackupImage(BackupInfo info, TableName tn, BackupSystemTable sysTable) + throws IOException { List tables = info.getTableNames(); LOG.debug("Remove " + tn + " from " + info.getBackupId() + " tables=" + info.getTableListAsString()); @@ -485,7 +484,7 @@ public class BackupAdminImpl implements BackupAdmin { private String[] toStringArray(TableName[] list) { String[] arr = new String[list.length]; - for(int i=0; i < list.length; i++) { + for (int i = 0; i < list.length; i++) { arr[i] = list[i].toString(); } return arr; @@ -521,7 +520,7 @@ public class BackupAdminImpl implements BackupAdmin { String targetRootDir = request.getTargetRootDir(); List tableList = request.getTableList(); - String backupId =BackupRestoreConstants.BACKUPID_PREFIX + EnvironmentEdgeManager.currentTime(); + String backupId = BackupRestoreConstants.BACKUPID_PREFIX + EnvironmentEdgeManager.currentTime(); if (type == BackupType.INCREMENTAL) { Set incrTableSet = null; try (BackupSystemTable table = new BackupSystemTable(conn)) { @@ -529,19 +528,20 @@ public class BackupAdminImpl implements BackupAdmin { } if (incrTableSet.isEmpty()) { - String msg = "Incremental backup table set contains no tables. " - + "You need to run full backup first " + - (tableList != null ? "on "+StringUtils.join(tableList, ","): ""); + String msg = + "Incremental backup table set contains no tables. " + + "You need to run full backup first " + + (tableList != null ? "on " + StringUtils.join(tableList, ",") : ""); throw new IOException(msg); } - if(tableList != null) { + if (tableList != null) { tableList.removeAll(incrTableSet); if (!tableList.isEmpty()) { String extraTables = StringUtils.join(tableList, ","); - String msg = "Some tables (" + extraTables + ") haven't gone through full backup. "+ - "Perform full backup on " + extraTables + " first, " - + "then retry the command"; + String msg = + "Some tables (" + extraTables + ") haven't gone through full backup. " + + "Perform full backup on " + extraTables + " first, " + "then retry the command"; throw new IOException(msg); } } @@ -584,14 +584,13 @@ public class BackupAdminImpl implements BackupAdmin { // update table list BackupRequest.Builder builder = new BackupRequest.Builder(); - request = builder.withBackupType(request.getBackupType()). - withTableList(tableList). - withTargetRootDir(request.getTargetRootDir()). - withBackupSetName(request.getBackupSetName()). - withTotalTasks(request.getTotalTasks()). 
- withBandwidthPerTasks((int)request.getBandwidth()).build(); + request = + builder.withBackupType(request.getBackupType()).withTableList(tableList) + .withTargetRootDir(request.getTargetRootDir()) + .withBackupSetName(request.getBackupSetName()).withTotalTasks(request.getTotalTasks()) + .withBandwidthPerTasks((int) request.getBandwidth()).build(); - TableBackupClient client =null; + TableBackupClient client = null; try { client = BackupClientFactory.create(conn, backupId, request); } catch (IOException e) { @@ -613,4 +612,132 @@ public class BackupAdminImpl implements BackupAdmin { return tableList; } + @Override + public void mergeBackups(String[] backupIds) throws IOException { + try (final BackupSystemTable sysTable = new BackupSystemTable(conn);) { + checkIfValidForMerge(backupIds, sysTable); + BackupMergeJob job = BackupRestoreFactory.getBackupMergeJob(conn.getConfiguration()); + job.run(backupIds); + } + } + + /** + * Verifies that backup images are valid for merge. + * + *
<ul>
+ * <li>All backups MUST be in the same destination
+ * <li>No FULL backups are allowed - only INCREMENTAL
+ * <li>All backups must be in COMPLETE state
+ * <li>No holes in backup list are allowed
+ * </ul>
+ * <p>
+ * @param backupIds list of backup ids + * @param table backup system table + * @throws IOException + */ + private void checkIfValidForMerge(String[] backupIds, BackupSystemTable table) throws IOException { + String backupRoot = null; + + final Set allTables = new HashSet(); + final Set allBackups = new HashSet(); + long minTime = Long.MAX_VALUE, maxTime = Long.MIN_VALUE; + for (String backupId : backupIds) { + BackupInfo bInfo = table.readBackupInfo(backupId); + if (bInfo == null) { + String msg = "Backup session " + backupId + " not found"; + throw new IOException(msg); + } + if (backupRoot == null) { + backupRoot = bInfo.getBackupRootDir(); + } else if (!bInfo.getBackupRootDir().equals(backupRoot)) { + throw new IOException("Found different backup destinations in a list of a backup sessions \n" + + "1. " + backupRoot + "\n" + "2. " + bInfo.getBackupRootDir()); + } + if (bInfo.getType() == BackupType.FULL) { + throw new IOException("FULL backup image can not be merged for: \n" + bInfo); + } + + if (bInfo.getState() != BackupState.COMPLETE) { + throw new IOException("Backup image " + backupId + + " can not be merged becuase of its state: " + bInfo.getState()); + } + allBackups.add(backupId); + allTables.addAll(bInfo.getTableNames()); + long time = bInfo.getStartTs(); + if (time < minTime) { + minTime = time; + } + if (time > maxTime) { + maxTime = time; + } + } + + + final long startRangeTime = minTime; + final long endRangeTime = maxTime; + final String backupDest = backupRoot; + // Check we have no 'holes' in backup id list + // Filter 1 : backupRoot + // Filter 2 : time range filter + // Filter 3 : table filter + + BackupInfo.Filter destinationFilter = new BackupInfo.Filter() { + + @Override + public boolean apply(BackupInfo info) { + return info.getBackupRootDir().equals(backupDest); + } + }; + + BackupInfo.Filter timeRangeFilter = new BackupInfo.Filter() { + + @Override + public boolean apply(BackupInfo info) { + long time = info.getStartTs(); + return time >= startRangeTime && time <= endRangeTime ; + } + }; + + BackupInfo.Filter tableFilter = new BackupInfo.Filter() { + + @Override + public boolean apply(BackupInfo info) { + List tables = info.getTableNames(); + return !Collections.disjoint(allTables, tables); + } + }; + + BackupInfo.Filter typeFilter = new BackupInfo.Filter() { + + @Override + public boolean apply(BackupInfo info) { + return info.getType() == BackupType.INCREMENTAL; + } + }; + + BackupInfo.Filter stateFilter = new BackupInfo.Filter() { + @Override + public boolean apply(BackupInfo info) { + return info.getState() == BackupState.COMPLETE; + } + }; + + List allInfos = + table.getBackupHistory( -1, destinationFilter, + timeRangeFilter, tableFilter, typeFilter, stateFilter); + if (allInfos.size() != allBackups.size()) { + // Yes we have at least one hole in backup image sequence + List missingIds = new ArrayList(); + for(BackupInfo info: allInfos) { + if(allBackups.contains(info.getBackupId())) { + continue; + } + missingIds.add(info.getBackupId()); + } + String errMsg = + "Sequence of backup ids has 'holes'. 
The following backup images must be added:" + + org.apache.hadoop.util.StringUtils.join(",", missingIds); + throw new IOException(errMsg); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java index aa15fbaf5cc..650ba2ed9c1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java @@ -59,16 +59,15 @@ import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; - import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** * General backup commands, options and usage messages */ @InterfaceAudience.Private -public final class BackupCommands { +public final class BackupCommands { public final static String INCORRECT_USAGE = "Incorrect usage"; @@ -79,7 +78,8 @@ public final class BackupCommands { + " history show history of all successful backups\n" + " progress show the progress of the latest backup request\n" + " set backup set management\n" - + " repair repair backup system table" + + " repair repair backup system table\n" + + " merge merge backup images\n" + "Run \'hbase backup COMMAND -h\' to see help message for each command\n"; public static final String CREATE_CMD_USAGE = @@ -109,17 +109,20 @@ public final class BackupCommands { public static final String SET_CMD_USAGE = "Usage: hbase backup set COMMAND [name] [tables]\n" + " name Backup set name\n" - + " tables Comma separated list of tables.\n" - + "COMMAND is one of:\n" + " add add tables to a set, create a set if needed\n" + + " tables Comma separated list of tables.\n" + "COMMAND is one of:\n" + + " add add tables to a set, create a set if needed\n" + " remove remove tables from a set\n" + " list list all backup sets in the system\n" + " describe describe set\n" + " delete delete backup set\n"; + public static final String MERGE_CMD_USAGE = "Usage: hbase backup merge [backup_ids]\n" + + " backup_ids Comma separated list of backup image ids.\n"; public static final String USAGE_FOOTER = ""; public static abstract class Command extends Configured { CommandLine cmdline; Connection conn; + Command(Configuration conf) { if (conf == null) { conf = HBaseConfiguration.create(); @@ -140,7 +143,7 @@ public final class BackupCommands { try (BackupSystemTable table = new BackupSystemTable(conn);) { List sessions = table.getBackupInfos(BackupState.RUNNING); - if(sessions.size() > 0) { + if (sessions.size() > 0) { System.err.println("Found backup session in a RUNNING state: "); System.err.println(sessions.get(0)); System.err.println("This may indicate that a previous session has failed abnormally."); @@ -154,11 +157,19 @@ public final class BackupCommands { try (BackupSystemTable table = new BackupSystemTable(conn);) { String[] ids = table.getListOfBackupIdsFromDeleteOperation(); - if(ids !=null && ids.length > 0) { - System.err.println("Found failed backup delete coommand. "); + if (ids != null && ids.length > 0) { + System.err.println("Found failed backup DELETE coommand. 
"); System.err.println("Backup system recovery is required."); - throw new IOException("Failed backup delete found, aborted command execution"); + throw new IOException("Failed backup DELETE found, aborted command execution"); } + + ids = table.getListOfBackupIdsFromMergeOperation(); + if (ids != null && ids.length > 0) { + System.err.println("Found failed backup MERGE coommand. "); + System.err.println("Backup system recovery is required."); + throw new IOException("Failed backup MERGE found, aborted command execution"); + } + } } } @@ -178,10 +189,10 @@ public final class BackupCommands { protected boolean requiresNoActiveSession() { return false; } + /** - * Command requires consistent state of a backup system - * Backup system may become inconsistent because of an abnormal - * termination of a backup session or delete command + * Command requires consistent state of a backup system Backup system may become inconsistent + * because of an abnormal termination of a backup session or delete command * @return true, if yes */ protected boolean requiresConsistentState() { @@ -220,6 +231,9 @@ public final class BackupCommands { case REPAIR: cmd = new RepairCommand(conf, cmdline); break; + case MERGE: + cmd = new MergeCommand(conf, cmdline); + break; case HELP: default: cmd = new HelpCommand(conf, cmdline); @@ -257,7 +271,7 @@ public final class BackupCommands { throw new IOException(INCORRECT_USAGE); } String[] args = cmdline.getArgs(); - if (args.length !=3) { + if (args.length != 3) { printUsage(); throw new IOException(INCORRECT_USAGE); } @@ -274,7 +288,6 @@ public final class BackupCommands { throw new IOException(INCORRECT_USAGE); } - String tables = null; // Check if we have both: backup set and list of tables @@ -310,14 +323,14 @@ public final class BackupCommands { try (BackupAdminImpl admin = new BackupAdminImpl(conn);) { - BackupRequest.Builder builder = new BackupRequest.Builder(); - BackupRequest request = builder.withBackupType(BackupType.valueOf(args[1].toUpperCase())) - .withTableList(tables != null ? - Lists.newArrayList(BackupUtils.parseTableNames(tables)) : null) - .withTargetRootDir(args[2]) - .withTotalTasks(workers) - .withBandwidthPerTasks(bandwidth) - .withBackupSetName(setName).build(); + BackupRequest.Builder builder = new BackupRequest.Builder(); + BackupRequest request = + builder + .withBackupType(BackupType.valueOf(args[1].toUpperCase())) + .withTableList( + tables != null ? Lists.newArrayList(BackupUtils.parseTableNames(tables)) : null) + .withTargetRootDir(args[2]).withTotalTasks(workers) + .withBandwidthPerTasks(bandwidth).withBackupSetName(setName).build(); String backupId = admin.backupTables(request); System.out.println("Backup session " + backupId + " finished. Status: SUCCESS"); } catch (IOException e) { @@ -544,7 +557,8 @@ public final class BackupCommands { int deleted = admin.deleteBackups(backupIds); System.out.println("Deleted " + deleted + " backups. Total requested: " + args.length); } catch (IOException e) { - System.err.println("Delete command FAILED. Please run backup repair tool to restore backup system integrity"); + System.err + .println("Delete command FAILED. Please run backup repair tool to restore backup system integrity"); throw e; } @@ -584,8 +598,9 @@ public final class BackupCommands { if (list.size() == 0) { // No failed sessions found System.out.println("REPAIR status: no failed sessions found." 
- +" Checking failed delete backup operation ..."); + + " Checking failed delete backup operation ..."); repairFailedBackupDeletionIfAny(conn, sysTable); + repairFailedBackupMergeIfAny(conn, sysTable); return; } backupInfo = list.get(0); @@ -606,32 +621,55 @@ public final class BackupCommands { // If backup session is updated to FAILED state - means we // processed recovery already. sysTable.updateBackupInfo(backupInfo); - sysTable.finishBackupSession(); - System.out.println("REPAIR status: finished repair failed session:\n "+ backupInfo); + sysTable.finishBackupExclusiveOperation(); + System.out.println("REPAIR status: finished repair failed session:\n " + backupInfo); } } private void repairFailedBackupDeletionIfAny(Connection conn, BackupSystemTable sysTable) - throws IOException - { + throws IOException { String[] backupIds = sysTable.getListOfBackupIdsFromDeleteOperation(); - if (backupIds == null ||backupIds.length == 0) { - System.out.println("No failed backup delete operation found"); + if (backupIds == null || backupIds.length == 0) { + System.out.println("No failed backup DELETE operation found"); // Delete backup table snapshot if exists BackupSystemTable.deleteSnapshot(conn); return; } - System.out.println("Found failed delete operation for: " + StringUtils.join(backupIds)); - System.out.println("Running delete again ..."); + System.out.println("Found failed DELETE operation for: " + StringUtils.join(backupIds)); + System.out.println("Running DELETE again ..."); // Restore table from snapshot BackupSystemTable.restoreFromSnapshot(conn); // Finish previous failed session - sysTable.finishBackupSession(); - try(BackupAdmin admin = new BackupAdminImpl(conn);) { + sysTable.finishBackupExclusiveOperation(); + try (BackupAdmin admin = new BackupAdminImpl(conn);) { admin.deleteBackups(backupIds); } - System.out.println("Delete operation finished OK: "+ StringUtils.join(backupIds)); + System.out.println("DELETE operation finished OK: " + StringUtils.join(backupIds)); + + } + + private void repairFailedBackupMergeIfAny(Connection conn, BackupSystemTable sysTable) + throws IOException { + String[] backupIds = sysTable.getListOfBackupIdsFromMergeOperation(); + if (backupIds == null || backupIds.length == 0) { + System.out.println("No failed backup MERGE operation found"); + // Delete backup table snapshot if exists + BackupSystemTable.deleteSnapshot(conn); + return; + } + System.out.println("Found failed MERGE operation for: " + StringUtils.join(backupIds)); + System.out.println("Running MERGE again ..."); + // Restore table from snapshot + BackupSystemTable.restoreFromSnapshot(conn); + // Unlock backupo system + sysTable.finishBackupExclusiveOperation(); + // Finish previous failed session + sysTable.finishMergeOperation(); + try (BackupAdmin admin = new BackupAdminImpl(conn);) { + admin.mergeBackups(backupIds); + } + System.out.println("MERGE operation finished OK: " + StringUtils.join(backupIds)); } @@ -641,6 +679,56 @@ public final class BackupCommands { } } + private static class MergeCommand extends Command { + + MergeCommand(Configuration conf, CommandLine cmdline) { + super(conf); + this.cmdline = cmdline; + } + + @Override + protected boolean requiresNoActiveSession() { + return true; + } + + @Override + protected boolean requiresConsistentState() { + return true; + } + + @Override + public void execute() throws IOException { + super.execute(); + + String[] args = cmdline == null ? 
null : cmdline.getArgs(); + if (args == null || (args.length != 2)) { + System.err.println("ERROR: wrong number of arguments: " + + (args == null ? null : args.length)); + printUsage(); + throw new IOException(INCORRECT_USAGE); + } + + String[] backupIds = args[1].split(","); + if (backupIds.length < 2) { + String msg = "ERROR: can not merge a single backup image. "+ + "Number of images must be greater than 1."; + System.err.println(msg); + throw new IOException(msg); + + } + Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create(); + try (final Connection conn = ConnectionFactory.createConnection(conf); + final BackupAdminImpl admin = new BackupAdminImpl(conn);) { + admin.mergeBackups(backupIds); + } + } + + @Override + protected void printUsage() { + System.out.println(MERGE_CMD_USAGE); + } + } + // TODO Cancel command private static class CancelCommand extends Command { @@ -672,7 +760,6 @@ public final class BackupCommands { @Override public void execute() throws IOException { - int n = parseHistoryLength(); final TableName tableName = getTableName(); final String setName = getTableSetName(); @@ -883,7 +970,7 @@ public final class BackupCommands { private TableName[] toTableNames(String[] tables) { TableName[] arr = new TableName[tables.length]; - for (int i=0; i < tables.length; i++) { + for (int i = 0; i < tables.length; i++) { arr[i] = TableName.valueOf(tables[i]); } return arr; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java index bf80506d35a..8fe5eaf3494 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java @@ -115,8 +115,8 @@ public class BackupManager implements Closeable { } if (LOG.isDebugEnabled()) { - LOG.debug("Added log cleaner: " + cleanerClass +"\n" + - "Added master procedure manager: " + masterProcedureClass); + LOG.debug("Added log cleaner: " + cleanerClass + "\n" + "Added master procedure manager: " + + masterProcedureClass); } } @@ -185,9 +185,8 @@ public class BackupManager implements Closeable { * @return BackupInfo * @throws BackupException exception */ - public BackupInfo createBackupInfo(String backupId, BackupType type, - List tableList, String targetRootDir, int workers, long bandwidth) - throws BackupException { + public BackupInfo createBackupInfo(String backupId, BackupType type, List tableList, + String targetRootDir, int workers, long bandwidth) throws BackupException { if (targetRootDir == null) { throw new BackupException("Wrong backup request parameter: target backup root directory"); } @@ -313,7 +312,7 @@ public class BackupManager implements Closeable { } } else { Path logBackupPath = - HBackupFileSystem.getLogBackupPath(backup.getBackupRootDir(), backup.getBackupId()); + HBackupFileSystem.getBackupPath(backup.getBackupRootDir(), backup.getBackupId()); LOG.debug("Current backup has an incremental backup ancestor, " + "touching its image manifest in " + logBackupPath.toString() + " to construct the dependency."); @@ -371,7 +370,7 @@ public class BackupManager implements Closeable { * @throws IOException if active session already exists */ public void startBackupSession() throws IOException { - systemTable.startBackupSession(); + systemTable.startBackupExclusiveOperation(); } /** @@ -379,10 +378,9 @@ public class BackupManager implements Closeable { * @throws IOException if 
no active session */ public void finishBackupSession() throws IOException { - systemTable.finishBackupSession(); + systemTable.finishBackupExclusiveOperation(); } - /** * Read the last backup start code (timestamp) of last successful backup. Will return null if * there is no startcode stored in backup system table or the value is of length 0. These two @@ -413,7 +411,7 @@ public class BackupManager implements Closeable { } public Pair>>>>, List> - readBulkloadRows(List tableList) throws IOException { + readBulkloadRows(List tableList) throws IOException { return systemTable.readBulkloadRows(tableList); } @@ -448,8 +446,7 @@ public class BackupManager implements Closeable { */ public void writeRegionServerLogTimestamp(Set tables, HashMap newTimestamps) throws IOException { - systemTable.writeRegionServerLogTimestamp(tables, newTimestamps, - backupInfo.getBackupRootDir()); + systemTable.writeRegionServerLogTimestamp(tables, newTimestamps, backupInfo.getBackupRootDir()); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java index b8adac9fba8..7e3201efdbe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.BackupInfo; import org.apache.hadoop.hbase.backup.BackupType; +import org.apache.hadoop.hbase.backup.HBackupFileSystem; import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @@ -61,9 +62,8 @@ public class BackupManifest { public static final String MANIFEST_FILE_NAME = ".backup.manifest"; /** - * Backup image, the dependency graph is made up by series of backup images - * BackupImage contains all the relevant information to restore the backup and - * is used during restore operation + * Backup image, the dependency graph is made up by series of backup images BackupImage contains + * all the relevant information to restore the backup and is used during restore operation */ public static class BackupImage implements Comparable { @@ -294,6 +294,16 @@ public class BackupManifest { return this.ancestors; } + public void removeAncestors(List backupIds) { + List toRemove = new ArrayList(); + for (BackupImage im : this.ancestors) { + if (backupIds.contains(im.getBackupId())) { + toRemove.add(im); + } + } + this.ancestors.removeAll(toRemove); + } + private void addAncestor(BackupImage backupImage) { this.getAncestors().add(backupImage); } @@ -464,18 +474,16 @@ public class BackupManifest { } /** - * Persist the manifest file. + * TODO: fix it. Persist the manifest file. * @throws IOException IOException when storing the manifest file. */ public void store(Configuration conf) throws BackupException { byte[] data = backupImage.toProto().toByteArray(); // write the file, overwrite if already exist - String logBackupDir = - BackupUtils.getLogBackupDir(backupImage.getRootDir(), backupImage.getBackupId()); Path manifestFilePath = - new Path(new Path((tableBackupDir != null ? 
tableBackupDir : logBackupDir)), - MANIFEST_FILE_NAME); + new Path(HBackupFileSystem.getBackupPath(backupImage.getRootDir(), + backupImage.getBackupId()), MANIFEST_FILE_NAME); try (FSDataOutputStream out = manifestFilePath.getFileSystem(conf).create(manifestFilePath, true);) { out.write(data); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java index e5a3daace24..4dab046d524 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java @@ -82,6 +82,7 @@ import org.apache.hadoop.hbase.util.Pair; * value = backupId and full WAL file name *

*/ + @InterfaceAudience.Private public final class BackupSystemTable implements Closeable { private static final Log LOG = LogFactory.getLog(BackupSystemTable.class); @@ -118,7 +119,7 @@ public final class BackupSystemTable implements Closeable { private TableName tableName; /** - * Stores backup sessions (contexts) + * Stores backup sessions (contexts) */ final static byte[] SESSIONS_FAMILY = "session".getBytes(); /** @@ -127,11 +128,10 @@ public final class BackupSystemTable implements Closeable { final static byte[] META_FAMILY = "meta".getBytes(); final static byte[] BULK_LOAD_FAMILY = "bulk".getBytes(); /** - * Connection to HBase cluster, shared among all instances + * Connection to HBase cluster, shared among all instances */ private final Connection connection; - private final static String BACKUP_INFO_PREFIX = "session:"; private final static String START_CODE_ROW = "startcode:"; private final static byte[] ACTIVE_SESSION_ROW = "activesession:".getBytes(); @@ -147,6 +147,7 @@ public final class BackupSystemTable implements Closeable { private final static String BULK_LOAD_PREFIX = "bulk:"; private final static byte[] BULK_LOAD_PREFIX_BYTES = BULK_LOAD_PREFIX.getBytes(); private final static byte[] DELETE_OP_ROW = "delete_op_row".getBytes(); + private final static byte[] MERGE_OP_ROW = "merge_op_row".getBytes(); final static byte[] TBL_COL = Bytes.toBytes("tbl"); final static byte[] FAM_COL = Bytes.toBytes("fam"); @@ -160,7 +161,7 @@ public final class BackupSystemTable implements Closeable { private final static String SET_KEY_PREFIX = "backupset:"; // separator between BULK_LOAD_PREFIX and ordinals - protected final static String BLK_LD_DELIM = ":"; + protected final static String BLK_LD_DELIM = ":"; private final static byte[] EMPTY_VALUE = new byte[] {}; // Safe delimiter in a string @@ -187,19 +188,19 @@ public final class BackupSystemTable implements Closeable { } private void verifyNamespaceExists(Admin admin) throws IOException { - String namespaceName = tableName.getNamespaceAsString(); - NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build(); - NamespaceDescriptor[] list = admin.listNamespaceDescriptors(); - boolean exists = false; - for( NamespaceDescriptor nsd: list) { - if (nsd.getName().equals(ns.getName())) { - exists = true; - break; - } - } - if (!exists) { - admin.createNamespace(ns); + String namespaceName = tableName.getNamespaceAsString(); + NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build(); + NamespaceDescriptor[] list = admin.listNamespaceDescriptors(); + boolean exists = false; + for (NamespaceDescriptor nsd : list) { + if (nsd.getName().equals(ns.getName())) { + exists = true; + break; } + } + if (!exists) { + admin.createNamespace(ns); + } } private void waitForSystemTable(Admin admin) throws IOException { @@ -211,15 +212,13 @@ public final class BackupSystemTable implements Closeable { } catch (InterruptedException e) { } if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) { - throw new IOException("Failed to create backup system table after "+ TIMEOUT+"ms"); + throw new IOException("Failed to create backup system table after " + TIMEOUT + "ms"); } } LOG.debug("Backup table exists and available"); } - - @Override public void close() { // do nothing @@ -257,7 +256,7 @@ public final class BackupSystemTable implements Closeable { byte[] row = CellUtil.cloneRow(res.listCells().get(0)); for (Cell cell : res.listCells()) { if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, - 
BackupSystemTable.PATH_COL.length) == 0) { + BackupSystemTable.PATH_COL.length) == 0) { map.put(row, Bytes.toString(CellUtil.cloneValue(cell))); } } @@ -286,13 +285,13 @@ public final class BackupSystemTable implements Closeable { String path = null; for (Cell cell : res.listCells()) { if (CellComparator.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0, - BackupSystemTable.TBL_COL.length) == 0) { + BackupSystemTable.TBL_COL.length) == 0) { tbl = TableName.valueOf(CellUtil.cloneValue(cell)); } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0, - BackupSystemTable.FAM_COL.length) == 0) { + BackupSystemTable.FAM_COL.length) == 0) { fam = CellUtil.cloneValue(cell); } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, - BackupSystemTable.PATH_COL.length) == 0) { + BackupSystemTable.PATH_COL.length) == 0) { path = Bytes.toString(CellUtil.cloneValue(cell)); } } @@ -313,9 +312,10 @@ public final class BackupSystemTable implements Closeable { } files.add(new Path(path)); if (LOG.isDebugEnabled()) { - LOG.debug("found bulk loaded file : " + tbl + " " + Bytes.toString(fam) + " " + path); + LOG.debug("found bulk loaded file : " + tbl + " " + Bytes.toString(fam) + " " + path); } - }; + } + ; return mapForSrc; } } @@ -359,16 +359,16 @@ public final class BackupSystemTable implements Closeable { public void writePathsPostBulkLoad(TableName tabName, byte[] region, Map> finalPaths) throws IOException { if (LOG.isDebugEnabled()) { - LOG.debug("write bulk load descriptor to backup " + tabName + " with " + - finalPaths.size() + " entries"); + LOG.debug("write bulk load descriptor to backup " + tabName + " with " + finalPaths.size() + + " entries"); } try (Table table = connection.getTable(tableName)) { - List puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, - finalPaths); + List puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths); table.put(puts); LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName); } } + /* * For preCommitStoreFile() hook * @param tabName table name @@ -376,15 +376,15 @@ public final class BackupSystemTable implements Closeable { * @param family column family * @param pairs list of paths for hfiles */ - public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region, - final byte[] family, final List> pairs) throws IOException { + public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region, final byte[] family, + final List> pairs) throws IOException { if (LOG.isDebugEnabled()) { - LOG.debug("write bulk load descriptor to backup " + tabName + " with " + - pairs.size() + " entries"); + LOG.debug("write bulk load descriptor to backup " + tabName + " with " + pairs.size() + + " entries"); } try (Table table = connection.getTable(tableName)) { - List puts = BackupSystemTable.createPutForPreparedBulkload(tabName, region, - family, pairs); + List puts = + BackupSystemTable.createPutForPreparedBulkload(tabName, region, family, pairs); table.put(puts); LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName); } @@ -411,11 +411,11 @@ public final class BackupSystemTable implements Closeable { /* * Reads the rows from backup table recording bulk loaded hfiles * @param tableList list of table names - * @return The keys of the Map are table, region and column family. 
- * Value of the map reflects whether the hfile was recorded by preCommitStoreFile hook (true) + * @return The keys of the Map are table, region and column family. Value of the map reflects + * whether the hfile was recorded by preCommitStoreFile hook (true) */ public Pair>>>>, List> - readBulkloadRows(List tableList) throws IOException { + readBulkloadRows(List tableList) throws IOException { Map>>>> map = new HashMap<>(); List rows = new ArrayList<>(); for (TableName tTable : tableList) { @@ -437,13 +437,13 @@ public final class BackupSystemTable implements Closeable { String rowStr = Bytes.toString(row); region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr); if (CellComparator.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0, - BackupSystemTable.FAM_COL.length) == 0) { + BackupSystemTable.FAM_COL.length) == 0) { fam = Bytes.toString(CellUtil.cloneValue(cell)); } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, - BackupSystemTable.PATH_COL.length) == 0) { + BackupSystemTable.PATH_COL.length) == 0) { path = Bytes.toString(CellUtil.cloneValue(cell)); } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.STATE_COL, 0, - BackupSystemTable.STATE_COL.length) == 0) { + BackupSystemTable.STATE_COL.length) == 0) { byte[] state = CellUtil.cloneValue(cell); if (Bytes.equals(BackupSystemTable.BL_PREPARE, state)) { raw = true; @@ -484,12 +484,13 @@ public final class BackupSystemTable implements Closeable { Map> map = maps[idx]; TableName tn = sTableList.get(idx); if (map == null) continue; - for (Map.Entry> entry: map.entrySet()) { + for (Map.Entry> entry : map.entrySet()) { byte[] fam = entry.getKey(); List paths = entry.getValue(); for (Path p : paths) { - Put put = BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(), - backupId, ts, cnt++); + Put put = + BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(), backupId, ts, + cnt++); puts.add(put); } } @@ -564,18 +565,23 @@ public final class BackupSystemTable implements Closeable { } } - public void startBackupSession() throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("Start new backup session"); + /** + * Exclusive operations are: + * create, delete, merge + * @throws IOException + */ + public void startBackupExclusiveOperation() throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Start new backup exclusive operation"); } try (Table table = connection.getTable(tableName)) { Put put = createPutForStartBackupSession(); - //First try to put if row does not exist + // First try to put if row does not exist if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL, null, put)) { // Row exists, try to put if value == ACTIVE_SESSION_NO if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_NO, put)) { - throw new IOException("There is an active backup session"); + throw new IOException("There is an active backup exclusive operation"); } } } @@ -587,17 +593,15 @@ public final class BackupSystemTable implements Closeable { return put; } - public void finishBackupSession() throws IOException - { - if (LOG.isTraceEnabled()) { - LOG.trace("Stop backup session"); + public void finishBackupExclusiveOperation() throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Finish backup exclusive operation"); } try (Table table = connection.getTable(tableName)) { Put put = createPutForStopBackupSession(); - if(!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL, - 
ACTIVE_SESSION_YES, put)) - { - throw new IOException("There is no active backup session"); + if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL, + ACTIVE_SESSION_YES, put)) { + throw new IOException("There is no active backup exclusive operation"); } } } @@ -630,8 +634,7 @@ public final class BackupSystemTable implements Closeable { res.advance(); Cell cell = res.current(); byte[] row = CellUtil.cloneRow(cell); - String server = - getServerNameForReadRegionServerLastLogRollResult(row); + String server = getServerNameForReadRegionServerLastLogRollResult(row); byte[] data = CellUtil.cloneValue(cell); rsTimestampMap.put(server, Bytes.toLong(data)); } @@ -652,8 +655,7 @@ public final class BackupSystemTable implements Closeable { LOG.trace("write region server last roll log result to backup system table"); } try (Table table = connection.getTable(tableName)) { - Put put = - createPutForRegionServerLastLogRollResult(server, ts, backupRoot); + Put put = createPutForRegionServerLastLogRollResult(server, ts, backupRoot); table.put(put); } } @@ -685,14 +687,15 @@ public final class BackupSystemTable implements Closeable { /** * Get first n backup history records - * @param n number of records + * @param n number of records, if n== -1 - max number + * is ignored * @return list of records * @throws IOException */ public List getHistory(int n) throws IOException { List history = getBackupHistory(); - if (history.size() <= n) return history; + if (n == -1 || history.size() <= n) return history; List list = new ArrayList(); for (int i = 0; i < n; i++) { list.add(history.get(i)); @@ -703,7 +706,8 @@ public final class BackupSystemTable implements Closeable { /** * Get backup history records filtered by list of filters. - * @param n max number of records + * @param n max number of records, if n == -1 , then max number + * is ignored * @param filters list of filters * @return backup records * @throws IOException @@ -714,7 +718,7 @@ public final class BackupSystemTable implements Closeable { List history = getBackupHistory(); List result = new ArrayList(); for (BackupInfo bi : history) { - if (result.size() == n) break; + if (n >= 0 && result.size() == n) break; boolean passed = true; for (int i = 0; i < filters.length; i++) { if (!filters[i].apply(bi)) { @@ -852,9 +856,7 @@ public final class BackupSystemTable implements Closeable { List puts = new ArrayList(); for (TableName table : tables) { byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray(); - Put put = - createPutForWriteRegionServerLogTimestamp(table, smapData, - backupRoot); + Put put = createPutForWriteRegionServerLogTimestamp(table, smapData, backupRoot); puts.add(put); } try (Table table = connection.getTable(tableName)) { @@ -1018,8 +1020,7 @@ public final class BackupSystemTable implements Closeable { } } try (Table table = connection.getTable(tableName)) { - List puts = - createPutsForAddWALFiles(files, backupId, backupRoot); + List puts = createPutsForAddWALFiles(files, backupId, backupRoot); table.put(puts); } } @@ -1087,6 +1088,7 @@ public final class BackupSystemTable implements Closeable { * @param file name of a file to check * @return true, if deletable, false otherwise. 
* @throws IOException exception + * TODO: multiple backup destination support */ public boolean isWALFileDeletable(String file) throws IOException { if (LOG.isTraceEnabled()) { @@ -1271,12 +1273,12 @@ public final class BackupSystemTable implements Closeable { if (disjoint.length > 0 && disjoint.length != tables.length) { Put put = createPutForBackupSet(name, disjoint); table.put(put); - } else if(disjoint.length == tables.length) { + } else if (disjoint.length == tables.length) { LOG.warn("Backup set '" + name + "' does not contain tables [" + StringUtils.join(toRemove, " ") + "]"); } else { // disjoint.length == 0 and tables.length >0 - // Delete backup set - LOG.info("Backup set '"+name+"' is empty. Deleting."); + // Delete backup set + LOG.info("Backup set '" + name + "' is empty. Deleting."); deleteBackupSet(name); } } finally { @@ -1356,7 +1358,7 @@ public final class BackupSystemTable implements Closeable { } public static String getSnapshotName(Configuration conf) { - return "snapshot_"+getTableNameAsString(conf).replace(":", "_"); + return "snapshot_" + getTableNameAsString(conf).replace(":", "_"); } /** @@ -1589,17 +1591,16 @@ public final class BackupSystemTable implements Closeable { for (Path path : entry.getValue()) { String file = path.toString(); int lastSlash = file.lastIndexOf("/"); - String filename = file.substring(lastSlash+1); - Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, - Bytes.toString(region), BLK_LD_DELIM, filename)); + String filename = file.substring(lastSlash + 1); + Put put = + new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, + Bytes.toString(region), BLK_LD_DELIM, filename)); put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName()); put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey()); - put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, - file.getBytes()); + put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, file.getBytes()); put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_COMMIT); puts.add(put); - LOG.debug("writing done bulk path " + file + " for " + table + " " + - Bytes.toString(region)); + LOG.debug("writing done bulk path " + file + " for " + table + " " + Bytes.toString(region)); } } return puts; @@ -1607,19 +1608,16 @@ public final class BackupSystemTable implements Closeable { public static void snapshot(Connection conn) throws IOException { - try (Admin admin = conn.getAdmin();){ + try (Admin admin = conn.getAdmin();) { Configuration conf = conn.getConfiguration(); - admin.snapshot(BackupSystemTable.getSnapshotName(conf), - BackupSystemTable.getTableName(conf)); + admin.snapshot(BackupSystemTable.getSnapshotName(conf), BackupSystemTable.getTableName(conf)); } } - public static void restoreFromSnapshot(Connection conn) - throws IOException { + public static void restoreFromSnapshot(Connection conn) throws IOException { Configuration conf = conn.getConfiguration(); - LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + - " from snapshot"); + LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + " from snapshot"); try (Admin admin = conn.getAdmin();) { String snapshotName = BackupSystemTable.getSnapshotName(conf); if (snapshotExists(admin, snapshotName)) { @@ -1631,8 +1629,8 @@ public final class BackupSystemTable implements Closeable { // Snapshot does not exists, i.e completeBackup failed after // deleting backup system table snapshot // In this case we log WARN and proceed - LOG.warn("Could not restore backup system 
table. Snapshot " + snapshotName+ - " does not exists."); + LOG.warn("Could not restore backup system table. Snapshot " + snapshotName + + " does not exists."); } } } @@ -1640,7 +1638,7 @@ public final class BackupSystemTable implements Closeable { protected static boolean snapshotExists(Admin admin, String snapshotName) throws IOException { List list = admin.listSnapshots(); - for (SnapshotDescription desc: list) { + for (SnapshotDescription desc : list) { if (desc.getName().equals(snapshotName)) { return true; } @@ -1648,26 +1646,25 @@ public final class BackupSystemTable implements Closeable { return false; } - public static boolean snapshotExists (Connection conn) throws IOException { + public static boolean snapshotExists(Connection conn) throws IOException { return snapshotExists(conn.getAdmin(), getSnapshotName(conn.getConfiguration())); } - public static void deleteSnapshot(Connection conn) - throws IOException { + public static void deleteSnapshot(Connection conn) throws IOException { Configuration conf = conn.getConfiguration(); - LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + - " from the system"); + LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + " from the system"); try (Admin admin = conn.getAdmin();) { String snapshotName = BackupSystemTable.getSnapshotName(conf); if (snapshotExists(admin, snapshotName)) { admin.deleteSnapshot(snapshotName); LOG.debug("Done deleting backup system table snapshot"); } else { - LOG.error("Snapshot "+snapshotName+" does not exists"); + LOG.error("Snapshot " + snapshotName + " does not exists"); } } } + /* * Creates Put's for bulk load resulting from running LoadIncrementalHFiles */ @@ -1678,17 +1675,16 @@ public final class BackupSystemTable implements Closeable { Path path = pair.getSecond(); String file = path.toString(); int lastSlash = file.lastIndexOf("/"); - String filename = file.substring(lastSlash+1); - Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, - Bytes.toString(region), BLK_LD_DELIM, filename)); + String filename = file.substring(lastSlash + 1); + Put put = + new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, Bytes.toString(region), + BLK_LD_DELIM, filename)); put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName()); put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, family); - put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, - file.getBytes()); + put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, file.getBytes()); put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_PREPARE); puts.add(put); - LOG.debug("writing raw bulk path " + file + " for " + table + " " + - Bytes.toString(region)); + LOG.debug("writing raw bulk path " + file + " for " + table + " " + Bytes.toString(region)); } return puts; } @@ -1725,7 +1721,6 @@ public final class BackupSystemTable implements Closeable { return get; } - public void startDeleteOperation(String[] backupIdList) throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("Start delete operation for backups: " + StringUtils.join(backupIdList)); @@ -1765,6 +1760,96 @@ public final class BackupSystemTable implements Closeable { } } + private Put createPutForMergeOperation(String[] backupIdList) { + + byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ",")); + Put put = new Put(MERGE_OP_ROW); + put.addColumn(META_FAMILY, FAM_COL, value); + return put; + } + + public boolean isMergeInProgress() throws IOException { + Get get = new Get(MERGE_OP_ROW); + try (Table table = 
connection.getTable(tableName)) { + Result res = table.get(get); + if (res.isEmpty()) { + return false; + } + return true; + } + } + + private Put createPutForUpdateTablesForMerge(List tables) { + + byte[] value = Bytes.toBytes(StringUtils.join(tables, ",")); + Put put = new Put(MERGE_OP_ROW); + put.addColumn(META_FAMILY, PATH_COL, value); + return put; + } + + private Delete createDeleteForBackupMergeOperation() { + + Delete delete = new Delete(MERGE_OP_ROW); + delete.addFamily(META_FAMILY); + return delete; + } + + private Get createGetForMergeOperation() { + + Get get = new Get(MERGE_OP_ROW); + get.addFamily(META_FAMILY); + return get; + } + + public void startMergeOperation(String[] backupIdList) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Start merge operation for backups: " + StringUtils.join(backupIdList)); + } + Put put = createPutForMergeOperation(backupIdList); + try (Table table = connection.getTable(tableName)) { + table.put(put); + } + } + + public void updateProcessedTablesForMerge(List tables) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Update tables for merge : " + StringUtils.join(tables, ",")); + } + Put put = createPutForUpdateTablesForMerge(tables); + try (Table table = connection.getTable(tableName)) { + table.put(put); + } + } + + public void finishMergeOperation() throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Finsih merge operation for backup ids "); + } + Delete delete = createDeleteForBackupMergeOperation(); + try (Table table = connection.getTable(tableName)) { + table.delete(delete); + } + } + + public String[] getListOfBackupIdsFromMergeOperation() throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Get backup ids for merge operation"); + } + Get get = createGetForMergeOperation(); + try (Table table = connection.getTable(tableName)) { + Result res = table.get(get); + if (res.isEmpty()) { + return null; + } + Cell cell = res.listCells().get(0); + byte[] val = CellUtil.cloneValue(cell); + if (val.length == 0) { + return null; + } + return new String(val).split(","); + } + } + static Scan createScanForOrigBulkLoadedFiles(TableName table) throws IOException { Scan scan = new Scan(); byte[] startRow = rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM); @@ -1776,10 +1861,12 @@ public final class BackupSystemTable implements Closeable { scan.setMaxVersions(1); return scan; } + static String getTableNameFromOrigBulkLoadRow(String rowStr) { String[] parts = rowStr.split(BLK_LD_DELIM); return parts[1]; } + static String getRegionNameFromOrigBulkLoadRow(String rowStr) { // format is bulk : namespace : table : region : file String[] parts = rowStr.split(BLK_LD_DELIM); @@ -1791,6 +1878,7 @@ public final class BackupSystemTable implements Closeable { LOG.debug("bulk row string " + rowStr + " region " + parts[idx]); return parts[idx]; } + /* * Used to query bulk loaded hfiles which have been copied by incremental backup * @param backupId the backup Id. It can be null when querying for all tables @@ -1798,13 +1886,14 @@ public final class BackupSystemTable implements Closeable { */ static Scan createScanForBulkLoadedFiles(String backupId) throws IOException { Scan scan = new Scan(); - byte[] startRow = backupId == null ? BULK_LOAD_PREFIX_BYTES : - rowkey(BULK_LOAD_PREFIX, backupId+BLK_LD_DELIM); + byte[] startRow = + backupId == null ? 
BULK_LOAD_PREFIX_BYTES : rowkey(BULK_LOAD_PREFIX, backupId + + BLK_LD_DELIM); byte[] stopRow = Arrays.copyOf(startRow, startRow.length); stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); scan.setStartRow(startRow); scan.setStopRow(stopRow); - //scan.setTimeRange(lower, Long.MAX_VALUE); + // scan.setTimeRange(lower, Long.MAX_VALUE); scan.addFamily(BackupSystemTable.META_FAMILY); scan.setMaxVersions(1); return scan; @@ -1812,12 +1901,13 @@ public final class BackupSystemTable implements Closeable { static Put createPutForBulkLoadedFile(TableName tn, byte[] fam, String p, String backupId, long ts, int idx) { - Put put = new Put(rowkey(BULK_LOAD_PREFIX, backupId+BLK_LD_DELIM+ts+BLK_LD_DELIM+idx)); + Put put = new Put(rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM + ts + BLK_LD_DELIM + idx)); put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, tn.getName()); put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, fam); put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, p.getBytes()); return put; } + /** * Creates put list for list of WAL files * @param files list of WAL file paths @@ -1825,8 +1915,9 @@ public final class BackupSystemTable implements Closeable { * @return put list * @throws IOException exception */ - private List createPutsForAddWALFiles(List files, String backupId, - String backupRoot) throws IOException { + private List + createPutsForAddWALFiles(List files, String backupId, String backupRoot) + throws IOException { List puts = new ArrayList(); for (String file : files) { @@ -1957,5 +2048,4 @@ public final class BackupSystemTable implements Closeable { return sb.toString().getBytes(); } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java index 381e9b15dfa..ea7a7b8b93b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java @@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.HBackupFileSystem; import org.apache.hadoop.hbase.backup.RestoreRequest; import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; -import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreJob; +import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.backup.util.RestoreTool; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Admin; @@ -58,7 +58,6 @@ public class RestoreTablesClient { private Configuration conf; private Connection conn; private String backupId; - private String fullBackupId; private TableName[] sTableArray; private TableName[] tTableArray; private String targetRootDir; @@ -107,8 +106,7 @@ public class RestoreTablesClient { if (existTableList.size() > 0) { if (!isOverwrite) { - LOG.error("Existing table (" - + existTableList + LOG.error("Existing table (" + existTableList + ") found in the restore target, please add " + "\"-overwrite\" option in the command if you mean" + " to restore to these existing tables"); @@ -148,9 +146,8 @@ public class RestoreTablesClient { Path tableBackupPath = HBackupFileSystem.getTableBackupPath(sTable, backupRoot, backupId); String lastIncrBackupId = images.length == 1 ? 
null : images[images.length - 1].getBackupId(); // We need hFS only for full restore (see the code) - BackupManifest manifest = HBackupFileSystem.getManifest(sTable, conf, backupRoot, backupId); + BackupManifest manifest = HBackupFileSystem.getManifest(conf, backupRoot, backupId); if (manifest.getType() == BackupType.FULL) { - fullBackupId = manifest.getBackupImage().getBackupId(); LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from full" + " backup image " + tableBackupPath.toString()); restoreTool.fullRestoreTable(conn, tableBackupPath, sTable, tTable, truncateIfExists, @@ -169,8 +166,8 @@ public class RestoreTablesClient { // full backup path comes first for (int i = 1; i < images.length; i++) { BackupImage im = images[i]; - String fileBackupDir = HBackupFileSystem.getTableBackupDir(im.getRootDir(), - im.getBackupId(), sTable)+ Path.SEPARATOR+"data"; + String fileBackupDir = + HBackupFileSystem.getTableBackupDataDir(im.getRootDir(), im.getBackupId(), sTable); dirList.add(new Path(fileBackupDir)); } @@ -196,8 +193,10 @@ public class RestoreTablesClient { TreeSet restoreImageSet = new TreeSet(); boolean truncateIfExists = isOverwrite; Set backupIdSet = new HashSet<>(); + for (int i = 0; i < sTableArray.length; i++) { TableName table = sTableArray[i]; + BackupManifest manifest = backupManifestMap.get(table); // Get the image list of this backup for restore in time order from old // to new. @@ -213,11 +212,8 @@ public class RestoreTablesClient { if (restoreImageSet != null && !restoreImageSet.isEmpty()) { LOG.info("Restore includes the following image(s):"); for (BackupImage image : restoreImageSet) { - LOG.info("Backup: " - + image.getBackupId() - + " " - + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(), - table)); + LOG.info("Backup: " + image.getBackupId() + " " + + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(), table)); if (image.getType() == BackupType.INCREMENTAL) { backupIdSet.add(image.getBackupId()); LOG.debug("adding " + image.getBackupId() + " for bulk load"); @@ -232,13 +228,13 @@ public class RestoreTablesClient { Map>[] mapForSrc = table.readBulkLoadedFiles(id, sTableList); Map loaderResult; conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true); - LoadIncrementalHFiles loader = MapReduceRestoreJob.createLoader(conf); + LoadIncrementalHFiles loader = BackupUtils.createLoader(conf); for (int i = 0; i < sTableList.size(); i++) { if (mapForSrc[i] != null && !mapForSrc[i].isEmpty()) { loaderResult = loader.run(null, mapForSrc[i], tTableArray[i]); LOG.debug("bulk loading " + sTableList.get(i) + " to " + tTableArray[i]); if (loaderResult.isEmpty()) { - String msg = "Couldn't bulk load for " + sTableList.get(i) + " to " +tTableArray[i]; + String msg = "Couldn't bulk load for " + sTableList.get(i) + " to " + tTableArray[i]; LOG.error(msg); throw new IOException(msg); } @@ -253,7 +249,7 @@ public class RestoreTablesClient { if (backupId == null) { return 0; } - return Long.parseLong(backupId.substring(backupId.lastIndexOf("_")+1)); + return Long.parseLong(backupId.substring(backupId.lastIndexOf("_") + 1)); } static boolean withinRange(long a, long lower, long upper) { @@ -268,15 +264,15 @@ public class RestoreTablesClient { // case VALIDATION: // check the target tables checkTargetTables(tTableArray, isOverwrite); + // case RESTORE_IMAGES: HashMap backupManifestMap = new HashMap<>(); // check and load backup image manifest for the tables Path rootPath = new Path(targetRootDir); 
HBackupFileSystem.checkImageManifestExist(backupManifestMap, sTableArray, conf, rootPath, backupId); + restore(backupManifestMap, sTableArray, tTableArray, isOverwrite); } - - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java new file mode 100644 index 00000000000..00c5b839503 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java @@ -0,0 +1,321 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup.mapreduce; + +import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupInfo; +import org.apache.hadoop.hbase.backup.BackupMergeJob; +import org.apache.hadoop.hbase.backup.BackupRestoreConstants; +import org.apache.hadoop.hbase.backup.HBackupFileSystem; +import org.apache.hadoop.hbase.backup.impl.BackupManifest; +import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; +import org.apache.hadoop.hbase.backup.util.BackupUtils; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.util.Tool; + +/** + * MapReduce implementation of {@link BackupMergeJob} + * Must be initialized with configuration of a backup destination cluster + * + */ + +@InterfaceAudience.Private +public class MapReduceBackupMergeJob implements BackupMergeJob { + public static final Log LOG = LogFactory.getLog(MapReduceBackupMergeJob.class); + + protected Tool player; + protected Configuration conf; + + public MapReduceBackupMergeJob() { + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public void run(String[] backupIds) throws IOException { + String bulkOutputConfKey; + + // TODO : run player on remote cluster + player = new MapReduceHFileSplitterJob(); + bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY; + // Player reads all files in arbitrary directory structure and 
creates + // a Map task for each file + String bids = StringUtils.join(backupIds, ","); + + if (LOG.isDebugEnabled()) { + LOG.debug("Merge backup images " + bids); + } + + List> processedTableList = new ArrayList>(); + boolean finishedTables = false; + Connection conn = ConnectionFactory.createConnection(getConf()); + BackupSystemTable table = new BackupSystemTable(conn); + FileSystem fs = FileSystem.get(getConf()); + + try { + + // Get exclusive lock on backup system + table.startBackupExclusiveOperation(); + // Start merge operation + table.startMergeOperation(backupIds); + + // Select most recent backup id + String mergedBackupId = findMostRecentBackupId(backupIds); + + TableName[] tableNames = getTableNamesInBackupImages(backupIds); + String backupRoot = null; + + BackupInfo bInfo = table.readBackupInfo(backupIds[0]); + backupRoot = bInfo.getBackupRootDir(); + + for (int i = 0; i < tableNames.length; i++) { + + LOG.info("Merge backup images for " + tableNames[i]); + + // Find input directories for table + + Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds); + String dirs = StringUtils.join(dirPaths, ","); + Path bulkOutputPath = + BackupUtils.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(tableNames[i]), + getConf(), false); + // Delete content if exists + if (fs.exists(bulkOutputPath)) { + if (!fs.delete(bulkOutputPath, true)) { + LOG.warn("Can not delete: " + bulkOutputPath); + } + } + Configuration conf = getConf(); + conf.set(bulkOutputConfKey, bulkOutputPath.toString()); + String[] playerArgs = { dirs, tableNames[i].getNameAsString() }; + + int result = 0; + + player.setConf(getConf()); + result = player.run(playerArgs); + if (!succeeded(result)) { + throw new IOException("Can not merge backup images for " + dirs + + " (check Hadoop/MR and HBase logs). 
Player return code =" + result); + } + // Add to processed table list + processedTableList.add(new Pair(tableNames[i], bulkOutputPath)); + LOG.debug("Merge Job finished:" + result); + } + List tableList = toTableNameList(processedTableList); + table.updateProcessedTablesForMerge(tableList); + finishedTables = true; + + // Move data + for (Pair tn : processedTableList) { + moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId); + } + + // Delete old data and update manifest + List backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId); + deleteBackupImages(backupsToDelete, conn, fs, backupRoot); + updateBackupManifest(backupRoot, mergedBackupId, backupsToDelete); + // Finish merge session + table.finishMergeOperation(); + // Release lock + table.finishBackupExclusiveOperation(); + } catch (RuntimeException e) { + + throw e; + } catch (Exception e) { + LOG.error(e); + if (!finishedTables) { + // cleanup bulk directories and finish merge + // merge MUST be repeated (no need for repair) + cleanupBulkLoadDirs(fs, toPathList(processedTableList)); + table.finishMergeOperation(); + table.finishBackupExclusiveOperation(); + throw new IOException("Backup merge operation failed, you should try it again", e); + } else { + // backup repair must be run + throw new IOException( + "Backup merge operation failed, run backup repair tool to restore system's integrity", + e); + } + } finally { + table.close(); + conn.close(); + } + } + + protected List toPathList(List> processedTableList) { + ArrayList list = new ArrayList(); + for (Pair p : processedTableList) { + list.add(p.getSecond()); + } + return list; + } + + protected List toTableNameList(List> processedTableList) { + ArrayList list = new ArrayList(); + for (Pair p : processedTableList) { + list.add(p.getFirst()); + } + return list; + } + + protected void cleanupBulkLoadDirs(FileSystem fs, List pathList) throws IOException { + for (Path path : pathList) { + + if (!fs.delete(path, true)) { + LOG.warn("Can't delete " + path); + } + } + } + + protected void updateBackupManifest(String backupRoot, String mergedBackupId, + List backupsToDelete) throws IllegalArgumentException, IOException { + + BackupManifest manifest = + HBackupFileSystem.getManifest(conf, new Path(backupRoot), mergedBackupId); + manifest.getBackupImage().removeAncestors(backupsToDelete); + // save back + manifest.store(conf); + + } + + protected void deleteBackupImages(List backupIds, Connection conn, FileSystem fs, + String backupRoot) throws IOException { + + // Delete from backup system table + try (BackupSystemTable table = new BackupSystemTable(conn);) { + for (String backupId : backupIds) { + table.deleteBackupInfo(backupId); + } + } + + // Delete from file system + for (String backupId : backupIds) { + Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, backupId); + + if (!fs.delete(backupDirPath, true)) { + LOG.warn("Could not delete " + backupDirPath); + } + } + } + + protected List getBackupIdsToDelete(String[] backupIds, String mergedBackupId) { + List list = new ArrayList(); + for (String id : backupIds) { + if (id.equals(mergedBackupId)) { + continue; + } + list.add(id); + } + return list; + } + + protected void moveData(FileSystem fs, String backupRoot, Path bulkOutputPath, TableName tableName, + String mergedBackupId) throws IllegalArgumentException, IOException { + + Path dest = + new Path(HBackupFileSystem.getTableBackupDataDir(backupRoot, mergedBackupId, tableName)); + + // Delete all in dest + if (!fs.delete(dest, true)) { + 
throw new IOException("Could not delete " + dest); + } + + FileStatus[] fsts = fs.listStatus(bulkOutputPath); + for (FileStatus fst : fsts) { + if (fst.isDirectory()) { + fs.rename(fst.getPath().getParent(), dest); + } + } + + } + + protected String findMostRecentBackupId(String[] backupIds) { + long recentTimestamp = Long.MIN_VALUE; + for (String backupId : backupIds) { + long ts = Long.parseLong(backupId.split("_")[1]); + if (ts > recentTimestamp) { + recentTimestamp = ts; + } + } + return BackupRestoreConstants.BACKUPID_PREFIX + recentTimestamp; + } + + protected TableName[] getTableNamesInBackupImages(String[] backupIds) throws IOException { + + Set allSet = new HashSet(); + + try (Connection conn = ConnectionFactory.createConnection(conf); + BackupSystemTable table = new BackupSystemTable(conn);) { + for (String backupId : backupIds) { + BackupInfo bInfo = table.readBackupInfo(backupId); + + allSet.addAll(bInfo.getTableNames()); + } + } + + TableName[] ret = new TableName[allSet.size()]; + return allSet.toArray(ret); + } + + protected Path[] findInputDirectories(FileSystem fs, String backupRoot, TableName tableName, + String[] backupIds) throws IOException { + + List dirs = new ArrayList(); + + for (String backupId : backupIds) { + Path fileBackupDirPath = + new Path(HBackupFileSystem.getTableBackupDataDir(backupRoot, backupId, tableName)); + if (fs.exists(fileBackupDirPath)) { + dirs.add(fileBackupDirPath); + } else { + if (LOG.isTraceEnabled()) { + LOG.trace("File: " + fileBackupDirPath + " does not exist."); + } + } + } + Path[] ret = new Path[dirs.size()]; + return dirs.toArray(ret); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java similarity index 94% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java index ba1b65e8da3..49e8c757911 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java @@ -53,18 +53,18 @@ import org.apache.hadoop.util.ToolRunner; * for later bulk importing. 
*/ @InterfaceAudience.Private -public class HFileSplitterJob extends Configured implements Tool { - private static final Log LOG = LogFactory.getLog(HFileSplitterJob.class); +public class MapReduceHFileSplitterJob extends Configured implements Tool { + private static final Log LOG = LogFactory.getLog(MapReduceHFileSplitterJob.class); final static String NAME = "HFileSplitterJob"; public final static String BULK_OUTPUT_CONF_KEY = "hfile.bulk.output"; public final static String TABLES_KEY = "hfile.input.tables"; public final static String TABLE_MAP_KEY = "hfile.input.tablesmap"; private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; - public HFileSplitterJob() { + public MapReduceHFileSplitterJob() { } - protected HFileSplitterJob(final Configuration c) { + protected MapReduceHFileSplitterJob(final Configuration c) { super(c); } @@ -111,7 +111,7 @@ public class HFileSplitterJob extends Configured implements Tool { Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime())); - job.setJarByClass(HFileSplitterJob.class); + job.setJarByClass(MapReduceHFileSplitterJob.class); job.setInputFormatClass(HFileInputFormat.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY); @@ -164,7 +164,7 @@ public class HFileSplitterJob extends Configured implements Tool { * @throws Exception When running the job fails. */ public static void main(String[] args) throws Exception { - int ret = ToolRunner.run(new HFileSplitterJob(HBaseConfiguration.create()), args); + int ret = ToolRunner.run(new MapReduceHFileSplitterJob(HBaseConfiguration.create()), args); System.exit(ret); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java index 4161ca9ccac..1209e7c31b1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java @@ -17,31 +17,31 @@ */ package org.apache.hadoop.hbase.backup.mapreduce; +import static org.apache.hadoop.hbase.backup.util.BackupUtils.failed; +import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded; + import java.io.IOException; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.BackupRestoreConstants; import org.apache.hadoop.hbase.backup.RestoreJob; +import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; -import org.apache.hadoop.hbase.mapreduce.WALPlayer; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.util.Tool; + /** * MapReduce implementation of {@link RestoreJob} * - * For full backup restore, it runs {@link HFileSplitterJob} job and creates + * For backup restore, it runs {@link MapReduceHFileSplitterJob} job and creates * HFiles which are aligned with a region boundaries of a table being - * restored, for incremental backup restore it runs {@link WALPlayer} in - * bulk load mode 
(creates HFiles from WAL edits). + * restored. * * The resulting HFiles then are loaded using HBase bulk load tool * {@link LoadIncrementalHFiles} @@ -62,8 +62,8 @@ public class MapReduceRestoreJob implements RestoreJob { String bulkOutputConfKey; - player = new HFileSplitterJob(); - bulkOutputConfKey = HFileSplitterJob.BULK_OUTPUT_CONF_KEY; + player = new MapReduceHFileSplitterJob(); + bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY; // Player reads all files in arbitrary directory structure and creates // a Map task for each file String dirs = StringUtils.join(dirPaths, ","); @@ -71,8 +71,8 @@ public class MapReduceRestoreJob implements RestoreJob { if (LOG.isDebugEnabled()) { LOG.debug("Restore " + (fullBackupRestore ? "full" : "incremental") + " backup from directory " + dirs + " from hbase tables " - + StringUtils.join(tableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND) + - " to tables " + + StringUtils.join(tableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND) + + " to tables " + StringUtils.join(newTableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND)); } @@ -80,13 +80,16 @@ public class MapReduceRestoreJob implements RestoreJob { LOG.info("Restore " + tableNames[i] + " into " + newTableNames[i]); - Path bulkOutputPath = getBulkOutputDir(getFileNameCompatibleString(newTableNames[i])); + Path bulkOutputPath = + BackupUtils.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(newTableNames[i]), + getConf()); Configuration conf = getConf(); conf.set(bulkOutputConfKey, bulkOutputPath.toString()); String[] playerArgs = - { dirs, - fullBackupRestore? newTableNames[i].getNameAsString():tableNames[i].getNameAsString() - }; + { + dirs, + fullBackupRestore ? newTableNames[i].getNameAsString() : tableNames[i] + .getNameAsString() }; int result = 0; int loaderResult = 0; @@ -96,7 +99,7 @@ public class MapReduceRestoreJob implements RestoreJob { result = player.run(playerArgs); if (succeeded(result)) { // do bulk load - LoadIncrementalHFiles loader = createLoader(getConf()); + LoadIncrementalHFiles loader = BackupUtils.createLoader(getConf()); if (LOG.isDebugEnabled()) { LOG.debug("Restoring HFiles from directory " + bulkOutputPath); } @@ -113,60 +116,13 @@ public class MapReduceRestoreJob implements RestoreJob { } LOG.debug("Restore Job finished:" + result); } catch (Exception e) { + LOG.error(e); throw new IOException("Can not restore from backup directory " + dirs + " (check Hadoop and HBase logs) ", e); } - } } - private String getFileNameCompatibleString(TableName table) { - return table.getNamespaceAsString() + "-" + table.getQualifierAsString(); - } - - private boolean failed(int result) { - return result != 0; - } - - private boolean succeeded(int result) { - return result == 0; - } - - public static LoadIncrementalHFiles createLoader(Configuration config) throws IOException { - // set configuration for restore: - // LoadIncrementalHFile needs more time - // hbase.rpc.timeout 600000 - // calculates - Integer milliSecInHour = 3600000; - Configuration conf = new Configuration(config); - conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, milliSecInHour); - - // By default, it is 32 and loader will fail if # of files in any region exceed this - // limit. Bad for snapshot restore. 
- conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE); - conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes"); - LoadIncrementalHFiles loader = null; - try { - loader = new LoadIncrementalHFiles(conf); - } catch (Exception e) { - throw new IOException(e); - } - return loader; - } - - private Path getBulkOutputDir(String tableName) throws IOException { - Configuration conf = getConf(); - FileSystem fs = FileSystem.get(conf); - String tmp = - conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, - HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); - Path path = - new Path(tmp + Path.SEPARATOR + "bulk_output-" + tableName + "-" - + EnvironmentEdgeManager.currentTime()); - fs.deleteOnExit(path); - return path; - } - @Override public Configuration getConf() { return conf; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java index e32853d8092..ce77645e34d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java @@ -56,7 +56,9 @@ import org.apache.hadoop.hbase.backup.impl.BackupManifest; import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; @@ -68,14 +70,15 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; public final class BackupUtils { protected static final Log LOG = LogFactory.getLog(BackupUtils.class); public static final String LOGNAME_SEPARATOR = "."; + public static final int MILLISEC_IN_HOUR = 3600000; private BackupUtils() { throw new AssertionError("Instantiating utility class..."); } /** - * Loop through the RS log timestamp map for the tables, for each RS, find the min timestamp - * value for the RS among the tables. + * Loop through the RS log timestamp map for the tables, for each RS, find the min timestamp value + * for the RS among the tables. 
* @param rsLogTimestampMap timestamp map * @return the min timestamp of each RS */ @@ -114,16 +117,17 @@ public final class BackupUtils { } /** - * copy out Table RegionInfo into incremental backup image need to consider move this - * logic into HBackupFileSystem + * copy out Table RegionInfo into incremental backup image need to consider move this logic into + * HBackupFileSystem * @param conn connection * @param backupInfo backup info * @param conf configuration * @throws IOException exception * @throws InterruptedException exception */ - public static void copyTableRegionInfo(Connection conn, BackupInfo backupInfo, - Configuration conf) throws IOException, InterruptedException { + public static void + copyTableRegionInfo(Connection conn, BackupInfo backupInfo, Configuration conf) + throws IOException, InterruptedException { Path rootDir = FSUtils.getRootDir(conf); FileSystem fs = rootDir.getFileSystem(conf); @@ -152,10 +156,8 @@ public final class BackupUtils { LOG.debug("Starting to write region info for table " + table); for (HRegionInfo regionInfo : regions) { Path regionDir = - HRegion.getRegionDir(new Path(backupInfo.getTableBackupDir(table)), - regionInfo); - regionDir = - new Path(backupInfo.getTableBackupDir(table), regionDir.getName()); + HRegion.getRegionDir(new Path(backupInfo.getTableBackupDir(table)), regionInfo); + regionDir = new Path(backupInfo.getTableBackupDir(table), regionDir.getName()); writeRegioninfoOnFilesystem(conf, targetFs, regionDir, regionInfo); } LOG.debug("Finished writing region info for table " + table); @@ -301,7 +303,6 @@ public final class BackupUtils { return ret; } - /** * Check whether the backup path exist * @param backupStr backup @@ -431,8 +432,7 @@ public final class BackupUtils { * @param conf configuration * @throws IOException exception */ - private static void cleanupHLogDir(BackupInfo backupInfo, Configuration conf) - throws IOException { + private static void cleanupHLogDir(BackupInfo backupInfo, Configuration conf) throws IOException { String logDir = backupInfo.getHLogTargetDir(); if (logDir == null) { @@ -452,7 +452,6 @@ public final class BackupUtils { } } - private static void cleanupTargetDir(BackupInfo backupInfo, Configuration conf) { try { // clean up the data at target directory @@ -498,8 +497,8 @@ public final class BackupUtils { * @param tableName table name * @return backupPath String for the particular table */ - public static String getTableBackupDir(String backupRootDir, String backupId, - TableName tableName) { + public static String + getTableBackupDir(String backupRootDir, String backupId, TableName tableName) { return backupRootDir + Path.SEPARATOR + backupId + Path.SEPARATOR + tableName.getNamespaceAsString() + Path.SEPARATOR + tableName.getQualifierAsString() + Path.SEPARATOR; @@ -523,7 +522,6 @@ public final class BackupUtils { return list; } - /** * Calls fs.listStatus() and treats FileNotFoundException as non-fatal This accommodates * differences between hadoop versions, where hadoop 1 does not throw a FileNotFoundException, and @@ -655,19 +653,16 @@ public final class BackupUtils { * @param backupId backup id * @param check check only * @param fromTables table list from - * @param toTables table list to + * @param toTables table list to * @param isOverwrite overwrite data * @return request obkect */ public static RestoreRequest createRestoreRequest(String backupRootDir, String backupId, boolean check, TableName[] fromTables, TableName[] toTables, boolean isOverwrite) { RestoreRequest.Builder builder = new 
RestoreRequest.Builder(); - RestoreRequest request = builder.withBackupRootDir(backupRootDir) - .withBackupId(backupId) - .withCheck(check) - .withFromTables(fromTables) - .withToTables(toTables) - .withOvewrite(isOverwrite).build(); + RestoreRequest request = + builder.withBackupRootDir(backupRootDir).withBackupId(backupId).withCheck(check) + .withFromTables(fromTables).withToTables(toTables).withOvewrite(isOverwrite).build(); return request; } @@ -699,4 +694,54 @@ public final class BackupUtils { return isValid; } + public static Path getBulkOutputDir(String tableName, Configuration conf, boolean deleteOnExit) + throws IOException { + FileSystem fs = FileSystem.get(conf); + String tmp = + conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); + Path path = + new Path(tmp + Path.SEPARATOR + "bulk_output-" + tableName + "-" + + EnvironmentEdgeManager.currentTime()); + if (deleteOnExit) { + fs.deleteOnExit(path); + } + return path; + } + + public static Path getBulkOutputDir(String tableName, Configuration conf) throws IOException { + return getBulkOutputDir(tableName, conf, true); + } + + public static String getFileNameCompatibleString(TableName table) { + return table.getNamespaceAsString() + "-" + table.getQualifierAsString(); + } + + public static boolean failed(int result) { + return result != 0; + } + + public static boolean succeeded(int result) { + return result == 0; + } + + public static LoadIncrementalHFiles createLoader(Configuration config) throws IOException { + // set configuration for restore: + // LoadIncrementalHFile needs more time + // hbase.rpc.timeout 600000 + // calculates + Configuration conf = new Configuration(config); + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, MILLISEC_IN_HOUR); + + // By default, it is 32 and loader will fail if # of files in any region exceed this + // limit. Bad for snapshot restore. + conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE); + conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes"); + LoadIncrementalHFiles loader = null; + try { + loader = new LoadIncrementalHFiles(conf); + } catch (Exception e) { + throw new IOException(e); + } + return loader; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java new file mode 100644 index 00000000000..7011ed32a56 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java @@ -0,0 +1,336 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.backup; + +import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl; +import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; +import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupMergeJob; +import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob; +import org.apache.hadoop.hbase.backup.util.BackupUtils; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Lists; + +@Category(LargeTests.class) +public class TestIncrementalBackupMergeWithFailures extends TestBackupBase { + private static final Log LOG = LogFactory.getLog(TestIncrementalBackupMergeWithFailures.class); + + static enum FailurePhase { + PHASE1, PHASE2, PHASE3, PHASE4 + } + public final static String FAILURE_PHASE_KEY = "failurePhase"; + + static class BackupMergeJobWithFailures extends MapReduceBackupMergeJob { + + FailurePhase failurePhase; + + @Override + public void setConf(Configuration conf) { + super.setConf(conf); + String val = conf.get(FAILURE_PHASE_KEY); + if (val != null) { + failurePhase = FailurePhase.valueOf(val); + } else { + Assert.fail("Failure phase is not set"); + } + } + + + /** + * This is the exact copy of parent's run() with injections + * of different types of failures + */ + @Override + public void run(String[] backupIds) throws IOException { + String bulkOutputConfKey; + + // TODO : run player on remote cluster + player = new MapReduceHFileSplitterJob(); + bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY; + // Player reads all files in arbitrary directory structure and creates + // a Map task for each file + String bids = StringUtils.join(backupIds, ","); + + if (LOG.isDebugEnabled()) { + LOG.debug("Merge backup images " + bids); + } + + List> processedTableList = new ArrayList>(); + boolean finishedTables = false; + Connection conn = ConnectionFactory.createConnection(getConf()); + BackupSystemTable table = new BackupSystemTable(conn); + FileSystem fs = FileSystem.get(getConf()); + + try { + + // Start backup exclusive operation + table.startBackupExclusiveOperation(); + // Start merge operation + table.startMergeOperation(backupIds); + + // Select most recent backup id + String mergedBackupId = findMostRecentBackupId(backupIds); + + TableName[] tableNames = getTableNamesInBackupImages(backupIds); + String backupRoot = null; + + BackupInfo bInfo = table.readBackupInfo(backupIds[0]); + backupRoot = bInfo.getBackupRootDir(); + // PHASE 1 + checkFailure(FailurePhase.PHASE1); + + for (int i = 0; i < tableNames.length; i++) { + + 
LOG.info("Merge backup images for " + tableNames[i]); + + // Find input directories for table + + Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds); + String dirs = StringUtils.join(dirPaths, ","); + Path bulkOutputPath = + BackupUtils.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(tableNames[i]), + getConf(), false); + // Delete content if exists + if (fs.exists(bulkOutputPath)) { + if (!fs.delete(bulkOutputPath, true)) { + LOG.warn("Can not delete: " + bulkOutputPath); + } + } + Configuration conf = getConf(); + conf.set(bulkOutputConfKey, bulkOutputPath.toString()); + String[] playerArgs = { dirs, tableNames[i].getNameAsString() }; + + int result = 0; + // PHASE 2 + checkFailure(FailurePhase.PHASE2); + player.setConf(getConf()); + result = player.run(playerArgs); + if (succeeded(result)) { + // Add to processed table list + processedTableList.add(new Pair(tableNames[i], bulkOutputPath)); + } else { + throw new IOException("Can not merge backup images for " + dirs + + " (check Hadoop/MR and HBase logs). Player return code =" + result); + } + LOG.debug("Merge Job finished:" + result); + } + List tableList = toTableNameList(processedTableList); + // PHASE 3 + checkFailure(FailurePhase.PHASE3); + table.updateProcessedTablesForMerge(tableList); + finishedTables = true; + + // Move data + for (Pair tn : processedTableList) { + moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId); + } + // PHASE 4 + checkFailure(FailurePhase.PHASE4); + // Delete old data and update manifest + List backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId); + deleteBackupImages(backupsToDelete, conn, fs, backupRoot); + updateBackupManifest(backupRoot, mergedBackupId, backupsToDelete); + // Finish merge session + table.finishMergeOperation(); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + LOG.error(e); + if (!finishedTables) { + // cleanup bulk directories and finish merge + // merge MUST be repeated (no need for repair) + cleanupBulkLoadDirs(fs, toPathList(processedTableList)); + table.finishMergeOperation(); + table.finishBackupExclusiveOperation(); + throw new IOException("Backup merge operation failed, you should try it again", e); + } else { + // backup repair must be run + throw new IOException( + "Backup merge operation failed, run backup repair tool to restore system's integrity", + e); + } + } finally { + table.close(); + conn.close(); + } + + } + + private void checkFailure(FailurePhase phase) throws IOException { + if ( failurePhase != null && failurePhase == phase) { + throw new IOException (phase.toString()); + } + } + + } + + + @Test + public void TestIncBackupMergeRestore() throws Exception { + + int ADD_ROWS = 99; + // #1 - create full backup for all tables + LOG.info("create full backup image for all tables"); + + List tables = Lists.newArrayList(table1, table2); + // Set custom Merge Job implementation + conf1.setClass(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS, + BackupMergeJobWithFailures.class, BackupMergeJob.class); + + Connection conn = ConnectionFactory.createConnection(conf1); + + HBaseAdmin admin = null; + admin = (HBaseAdmin) conn.getAdmin(); + BackupAdminImpl client = new BackupAdminImpl(conn); + + BackupRequest request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR); + String backupIdFull = client.backupTables(request); + + assertTrue(checkSucceeded(backupIdFull)); + + // #2 - insert some data to table1 + HTable t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS); 
+ LOG.debug("writing " + ADD_ROWS + " rows to " + table1); + + Assert.assertEquals(TEST_UTIL.countRows(t1), NB_ROWS_IN_BATCH + ADD_ROWS); + t1.close(); + LOG.debug("written " + ADD_ROWS + " rows to " + table1); + + HTable t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS); + + Assert.assertEquals(TEST_UTIL.countRows(t2), NB_ROWS_IN_BATCH + ADD_ROWS); + t2.close(); + LOG.debug("written " + ADD_ROWS + " rows to " + table2); + + // #3 - incremental backup for multiple tables + tables = Lists.newArrayList(table1, table2); + request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR); + String backupIdIncMultiple = client.backupTables(request); + + assertTrue(checkSucceeded(backupIdIncMultiple)); + + t1 = insertIntoTable(conn, table1, famName, 2, ADD_ROWS); + t1.close(); + + t2 = insertIntoTable(conn, table2, famName, 2, ADD_ROWS); + t2.close(); + + // #3 - incremental backup for multiple tables + request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR); + String backupIdIncMultiple2 = client.backupTables(request); + assertTrue(checkSucceeded(backupIdIncMultiple2)); + + // #4 Merge backup images with failures + + for ( FailurePhase phase : FailurePhase.values()) { + Configuration conf = conn.getConfiguration(); + + conf.set(FAILURE_PHASE_KEY, phase.toString()); + + try (BackupAdmin bAdmin = new BackupAdminImpl(conn);) + { + String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 }; + bAdmin.mergeBackups(backups); + Assert.fail("Expected IOException"); + } catch (IOException e) { + BackupSystemTable table = new BackupSystemTable(conn); + if(phase.ordinal() < FailurePhase.PHASE4.ordinal()) { + // No need to repair: + // Both Merge and backup exclusive operations are finished + assertFalse(table.isMergeInProgress()); + try { + table.finishBackupExclusiveOperation(); + Assert.fail("IOException is expected"); + } catch(IOException ee) { + // Expected + } + } else { + // Repair is required + assertTrue(table.isMergeInProgress()); + try { + table.startBackupExclusiveOperation(); + Assert.fail("IOException is expected"); + } catch(IOException ee) { + // Expected - clean up before proceeding + table.finishMergeOperation(); + table.finishBackupExclusiveOperation(); + } + } + table.close(); + LOG.debug("Expected :"+ e.getMessage()); + } + } + + // Now merge w/o failures + Configuration conf = conn.getConfiguration(); + conf.unset(FAILURE_PHASE_KEY); + conf.unset(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS); + + try (BackupAdmin bAdmin = new BackupAdminImpl(conn);) { + String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 }; + bAdmin.mergeBackups(backups); + } + + // #6 - restore incremental backup for multiple tables, with overwrite + TableName[] tablesRestoreIncMultiple = new TableName[] { table1, table2 }; + TableName[] tablesMapIncMultiple = new TableName[] { table1_restore, table2_restore }; + client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple2, false, + tablesRestoreIncMultiple, tablesMapIncMultiple, true)); + + Table hTable = conn.getTable(table1_restore); + LOG.debug("After incremental restore: " + hTable.getTableDescriptor()); + LOG.debug("f1 has " + TEST_UTIL.countRows(hTable, famName) + " rows"); + Assert.assertEquals(TEST_UTIL.countRows(hTable, famName), NB_ROWS_IN_BATCH + 2 * ADD_ROWS); + + hTable.close(); + + hTable = conn.getTable(table2_restore); + Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH + 2 * ADD_ROWS); + hTable.close(); + + 
admin.close(); + conn.close(); + + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java index 9c476419de3..556521f7cb6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java @@ -74,7 +74,7 @@ public class TestRepairAfterFailedDelete extends TestBackupBase { admin.restoreSnapshot(snapshotName); admin.enableTable(BackupSystemTable.getTableName(conf1)); // Start backup session - table.startBackupSession(); + table.startBackupExclusiveOperation(); // Start delete operation table.startDeleteOperation(backupIds);
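
For reviewers, a minimal client-side sketch (not part of the patch) of how the merge operation added here is meant to be driven; it assumes a configured cluster and only uses API that this change introduces or touches (BackupAdmin#mergeBackups, BackupAdminImpl). The backup ids are placeholders in the backup_<timestamp> format that findMostRecentBackupId() expects:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.backup.BackupAdmin;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class MergeBackupsExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
        BackupAdmin admin = new BackupAdminImpl(conn)) {
      // Placeholder ids of incremental backup images under the same backup root.
      // The merged image keeps the id of the most recent input image.
      String[] backupIds = { "backup_1498432601553", "backup_1498432811530" };
      admin.mergeBackups(backupIds);
    }
  }
}

As the error handling in MapReduceBackupMergeJob.run() shows, merge holds the backup system's exclusive-operation lock for its duration; a failure before the processed-table list is recorded (phases 1-3 in TestIncrementalBackupMergeWithFailures) only requires re-running the merge, while a failure after the merged data has been moved (phase 4) leaves the merge marked in progress and requires the backup repair tool.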