From 82df28fffe7025db6fa20e06343d16f52f40e3f7 Mon Sep 17 00:00:00 2001 From: Jonathan Hsieh Date: Wed, 13 Feb 2013 18:43:44 +0000 Subject: [PATCH] HBASE-6864 Online snapshots scaffolding git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-7290@1445828 13f79535-47bb-0310-9956-ffa450edef68 --- .../hbase/protobuf/generated/HBaseProtos.java | 49 ++- hbase-protocol/src/main/protobuf/hbase.proto | 5 +- .../hadoop/hbase/client/HBaseAdmin.java | 2 +- .../snapshot/EnabledTableSnapshotHandler.java | 97 +++++ .../snapshot/MasterSnapshotVerifier.java | 41 +- .../master/snapshot/SnapshotManager.java | 87 +++- .../master/snapshot/TakeSnapshotHandler.java | 1 + .../hbase/regionserver/HRegionServer.java | 38 +- .../snapshot/RegionServerSnapshotManager.java | 375 ++++++++++++++++++ .../snapshot/SnapshotDescriptionUtils.java | 54 +-- .../hbase/snapshot/TakeSnapshotUtils.java | 1 - .../hbase/client/TestSnapshotFromClient.java | 10 +- .../master/snapshot/TestSnapshotManager.java | 6 +- .../TestSnapshotDescriptionUtils.java | 37 +- 14 files changed, 636 insertions(+), 167 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/EnabledTableSnapshotHandler.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HBaseProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HBaseProtos.java index 716d8f6b565..0e24b409977 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HBaseProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HBaseProtos.java @@ -11318,7 +11318,7 @@ public final class HBaseProtos { boolean hasCreationTime(); long getCreationTime(); - // optional .SnapshotDescription.Type type = 4 [default = TIMESTAMP]; + // optional .SnapshotDescription.Type type = 4 [default = FLUSH]; boolean hasType(); org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type getType(); @@ -11357,13 +11357,11 @@ public final class HBaseProtos { public enum Type implements com.google.protobuf.ProtocolMessageEnum { DISABLED(0, 0), - TIMESTAMP(1, 1), - GLOBAL(2, 2), + FLUSH(1, 1), ; public static final int DISABLED_VALUE = 0; - public static final int TIMESTAMP_VALUE = 1; - public static final int GLOBAL_VALUE = 2; + public static final int FLUSH_VALUE = 1; public final int getNumber() { return value; } @@ -11371,8 +11369,7 @@ public final class HBaseProtos { public static Type valueOf(int value) { switch (value) { case 0: return DISABLED; - case 1: return TIMESTAMP; - case 2: return GLOBAL; + case 1: return FLUSH; default: return null; } } @@ -11403,7 +11400,7 @@ public final class HBaseProtos { } private static final Type[] VALUES = { - DISABLED, TIMESTAMP, GLOBAL, + DISABLED, FLUSH, }; public static Type valueOf( @@ -11501,7 +11498,7 @@ public final class HBaseProtos { return creationTime_; } - // optional .SnapshotDescription.Type type = 4 [default = TIMESTAMP]; + // optional .SnapshotDescription.Type type = 4 [default = FLUSH]; public static final int TYPE_FIELD_NUMBER = 4; private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type type_; public boolean hasType() { @@ -11525,7 +11522,7 @@ public final class HBaseProtos { name_ = ""; table_ = ""; creationTime_ = 0L; - type_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type.TIMESTAMP; 
+ type_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type.FLUSH; version_ = 0; } private byte memoizedIsInitialized = -1; @@ -11787,7 +11784,7 @@ public final class HBaseProtos { bitField0_ = (bitField0_ & ~0x00000002); creationTime_ = 0L; bitField0_ = (bitField0_ & ~0x00000004); - type_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type.TIMESTAMP; + type_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type.FLUSH; bitField0_ = (bitField0_ & ~0x00000008); version_ = 0; bitField0_ = (bitField0_ & ~0x00000010); @@ -12045,8 +12042,8 @@ public final class HBaseProtos { return this; } - // optional .SnapshotDescription.Type type = 4 [default = TIMESTAMP]; - private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type type_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type.TIMESTAMP; + // optional .SnapshotDescription.Type type = 4 [default = FLUSH]; + private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type type_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type.FLUSH; public boolean hasType() { return ((bitField0_ & 0x00000008) == 0x00000008); } @@ -12064,7 +12061,7 @@ public final class HBaseProtos { } public Builder clearType() { bitField0_ = (bitField0_ & ~0x00000008); - type_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type.TIMESTAMP; + type_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type.FLUSH; onChanged(); return this; } @@ -12243,19 +12240,19 @@ public final class HBaseProtos { "value\030\002 \002(\t\",\n\rNameBytesPair\022\014\n\004name\030\001 \002" + "(\t\022\r\n\005value\030\002 \001(\014\"/\n\016BytesBytesPair\022\r\n\005f" + "irst\030\001 \002(\014\022\016\n\006second\030\002 \002(\014\",\n\rNameInt64P" + - "air\022\014\n\004name\030\001 \001(\t\022\r\n\005value\030\002 \001(\003\"\301\001\n\023Sna" + + "air\022\014\n\004name\030\001 \001(\t\022\r\n\005value\030\002 \001(\003\"\255\001\n\023Sna" + "pshotDescription\022\014\n\004name\030\001 \002(\t\022\r\n\005table\030" + - "\002 \001(\t\022\027\n\014creationTime\030\003 \001(\003:\0010\0222\n\004type\030\004" + - " \001(\0162\031.SnapshotDescription.Type:\tTIMESTA" + - "MP\022\017\n\007version\030\005 \001(\005\"/\n\004Type\022\014\n\010DISABLED\020" + - "\000\022\r\n\tTIMESTAMP\020\001\022\n\n\006GLOBAL\020\002*r\n\013CompareT" + - "ype\022\010\n\004LESS\020\000\022\021\n\rLESS_OR_EQUAL\020\001\022\t\n\005EQUA", - "L\020\002\022\r\n\tNOT_EQUAL\020\003\022\024\n\020GREATER_OR_EQUAL\020\004" + - "\022\013\n\007GREATER\020\005\022\t\n\005NO_OP\020\006*_\n\007KeyType\022\013\n\007M" + - "INIMUM\020\000\022\007\n\003PUT\020\004\022\n\n\006DELETE\020\010\022\021\n\rDELETE_" + - "COLUMN\020\014\022\021\n\rDELETE_FAMILY\020\016\022\014\n\007MAXIMUM\020\377" + - "\001B>\n*org.apache.hadoop.hbase.protobuf.ge" + - "neratedB\013HBaseProtosH\001\240\001\001" + "\002 \001(\t\022\027\n\014creationTime\030\003 \001(\003:\0010\022.\n\004type\030\004" + + " \001(\0162\031.SnapshotDescription.Type:\005FLUSH\022\017" + + "\n\007version\030\005 \001(\005\"\037\n\004Type\022\014\n\010DISABLED\020\000\022\t\n" + + "\005FLUSH\020\001*r\n\013CompareType\022\010\n\004LESS\020\000\022\021\n\rLES" + + "S_OR_EQUAL\020\001\022\t\n\005EQUAL\020\002\022\r\n\tNOT_EQUAL\020\003\022\024", + "\n\020GREATER_OR_EQUAL\020\004\022\013\n\007GREATER\020\005\022\t\n\005NO_" + + 
"OP\020\006*_\n\007KeyType\022\013\n\007MINIMUM\020\000\022\007\n\003PUT\020\004\022\n\n" + + "\006DELETE\020\010\022\021\n\rDELETE_COLUMN\020\014\022\021\n\rDELETE_F" + + "AMILY\020\016\022\014\n\007MAXIMUM\020\377\001B>\n*org.apache.hado" + + "op.hbase.protobuf.generatedB\013HBaseProtos" + + "H\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { diff --git a/hbase-protocol/src/main/protobuf/hbase.proto b/hbase-protocol/src/main/protobuf/hbase.proto index f0c86b0635e..ca2045b2220 100644 --- a/hbase-protocol/src/main/protobuf/hbase.proto +++ b/hbase-protocol/src/main/protobuf/hbase.proto @@ -278,9 +278,8 @@ message SnapshotDescription { optional int64 creationTime = 3 [default = 0]; enum Type { DISABLED = 0; - TIMESTAMP = 1; - GLOBAL = 2; + FLUSH = 1; } - optional Type type = 4 [default = TIMESTAMP]; + optional Type type = 4 [default = FLUSH]; optional int32 version = 5; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 1f24d5358ac..4a75813d0e1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -2133,7 +2133,7 @@ public class HBaseAdmin implements Abortable, Closeable { */ public void snapshot(final String snapshotName, final String tableName) throws IOException, SnapshotCreationException, IllegalArgumentException { - snapshot(snapshotName, tableName, SnapshotDescription.Type.TIMESTAMP); + snapshot(snapshotName, tableName, SnapshotDescription.Type.FLUSH); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/EnabledTableSnapshotHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/EnabledTableSnapshotHandler.java new file mode 100644 index 00000000000..f3476b4226c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/EnabledTableSnapshotHandler.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hbase.master.snapshot;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.procedure.Procedure;
+import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Handle the master side of taking a snapshot of an online table, regardless of snapshot type.
+ * Uses a {@link Procedure} to run the snapshot across all the involved regions.
+ * @see ProcedureCoordinator
+ */
+@InterfaceAudience.Private
+public class EnabledTableSnapshotHandler extends TakeSnapshotHandler {
+
+  private static final Log LOG = LogFactory.getLog(EnabledTableSnapshotHandler.class);
+  private final ProcedureCoordinator coordinator;
+
+  public EnabledTableSnapshotHandler(SnapshotDescription snapshot, MasterServices master,
+      SnapshotManager manager) throws IOException {
+    super(snapshot, master);
+    this.coordinator = manager.getCoordinator();
+  }
+
+  // TODO consider switching over to using region names, rather than server names. This would
+  // allow regions to migrate during a snapshot, and then be involved when they are ready. We
+  // still want to enforce snapshot time constraints, but this lets us potentially be a bit
+  // more robust.
+
+  /**
+   * This method kicks off the snapshot procedure and then waits for the various phases to
+   * complete.
+   */
+  @Override
+  protected void snapshotRegions(List<Pair<HRegionInfo, ServerName>> regions)
+      throws HBaseSnapshotException {
+    Set<String> regionServers = new HashSet<String>(regions.size());
+    for (Pair<HRegionInfo, ServerName> region : regions) {
+      regionServers.add(region.getSecond().toString());
+    }
+
+    // start the snapshot on the RS
+    Procedure proc = coordinator.startProcedure(this.monitor, this.snapshot.getName(),
+        this.snapshot.toByteArray(), Lists.newArrayList(regionServers));
+    if (proc == null) {
+      String msg = "Failed to submit distributed procedure for snapshot '"
+          + snapshot.getName() + "'";
+      LOG.error(msg);
+      throw new HBaseSnapshotException(msg);
+    }
+
+    try {
+      // wait for the snapshot to complete. A timer thread is kicked off that should cancel this
+      // if it takes too long.
+ proc.waitForCompleted(); + LOG.info("Done waiting - snapshot finished!"); + } catch (InterruptedException e) { + ForeignException ee = + new ForeignException("Interrupted while waiting for snapshot to finish", e); + monitor.receive(ee); + Thread.currentThread().interrupt(); + } catch (ForeignException e) { + monitor.receive(e); + } + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java index 9489110e933..8a5f5676193 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java @@ -48,10 +48,16 @@ import org.apache.hadoop.hbase.util.HFileArchiveUtil; /** * General snapshot verification on the master. *

- * This is a light-weight verification mechanism for all the files in a snapshot. It doesn't attempt
- * to verify that the files are exact copies (that would be paramount to taking the snapshot
- * again!), but instead just attempts to ensure that the files match the expected files and are the
- * same length.
+ * This is a light-weight verification mechanism for all the files in a snapshot. It doesn't
+ * attempt to verify that the files are exact copies (that would be tantamount to taking the
+ * snapshot again!), but instead just attempts to ensure that the files match the expected
+ * files and are the same length.
+ *

+ * Taking an online snapshot can race against other operations, and this is a last line of
+ * defense. For example, if .META. changes between when the region list is gathered and when
+ * the snapshot is taken, not all regions of a table may be present. This can be caused by a
+ * region split (daughters present on this scan, but the snapshot took the parent), or a move
+ * (the snapshot only checks lists of region servers, so a move could cause a region to be
+ * skipped or done twice).
+ *

* Current snapshot files checked: *

@@ -65,11 +71,9 @@ import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 * <li>All the hfiles are present (either in .archive directory in the region)</li>
 * <li>All recovered.edits files are present (by name) and have the correct file size</li>
- * <li>HLogs for each server running the snapshot have been referenced
- * <ul>
- * <li>Only checked for {@link Type#GLOBAL} snapshots</li>
- * </ul>
- * </li>
+ * <li>HLogs for each server running the snapshot have been referenced (in the design for
+ * the {@link Type#GLOBAL} or {@link Type#LOGROLL} online snapshots)</li>
+ *
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
@@ -112,22 +116,6 @@ public final class MasterSnapshotVerifier {
     // check that each region is valid
     verifyRegions(snapshotDir);
-
-    // check that the hlogs, if they exist, are valid
-    if (shouldCheckLogs(snapshot.getType())) {
-      verifyLogs(snapshotDir, snapshotServers);
-    }
-  }
-
-  /**
-   * Check to see if the snapshot should verify the logs directory based on the type of the logs.
-   * @param type type of snapshot being taken
-   * @return true if the logs directory should be verified, false otherwise
-   */
-  private boolean shouldCheckLogs(Type type) {
-    // This is better handled in the Type enum via type, but since its PB based, this is the
-    // simplest way to handle it
-    return type.equals(Type.GLOBAL);
   }

   /**
@@ -151,7 +139,7 @@ public final class MasterSnapshotVerifier {
   }

   /**
-   * Check that all the regions in the the snapshot are valid
+   * Check that all the regions in the snapshot are valid, and accounted for.
    * @param snapshotDir snapshot directory to check
    * @throws IOException if we can't reach .META. or read the files from the FS
    */
@@ -177,6 +165,7 @@ public final class MasterSnapshotVerifier {
       // make sure we have region in the snapshot
       Path regionDir = new Path(snapshotDir, region.getEncodedName());
       if (!fs.exists(regionDir)) {
+        // could happen due to a move or split race.
         throw new CorruptedSnapshotException("No region directory found for region:" + region,
           snapshot);
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
index 9a2f1229931..83e01f1d2d3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
@@ -27,6 +27,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ThreadPoolExecutor;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -50,8 +51,9 @@ import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.SnapshotSentinel;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
-import org.apache.hadoop.hbase.master.snapshot.SnapshotHFileCleaner;
-import org.apache.hadoop.hbase.master.snapshot.SnapshotLogCleaner;
+import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
+import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type;
 import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
@@ -66,6 +68,7 @@ import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.zookeeper.KeeperException;

 /**
  * This class manages the procedure of taking and restoring snapshots. There is only one
@@ -100,7 +103,7 @@ public class SnapshotManager implements Stoppable {
    * Conf key for # of ms elapsed before injecting a snapshot timeout error when waiting for
    * completion.
 */
-  public static final String SNAPSHOT_TIMEMOUT_MILLIS_KEY = "hbase.snapshot.master.timeoutMillis";
+  public static final String SNAPSHOT_TIMEOUT_MILLIS_KEY = "hbase.snapshot.master.timeoutMillis";

   /** Name of the operation to use in the controller */
   public static final String ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION = "online-snapshot";
@@ -113,6 +116,7 @@ public class SnapshotManager implements Stoppable {
   private boolean stopped;
   private final long wakeFrequency;
   private final MasterServices master;  // Needed by TableEventHandlers
+  private final ProcedureCoordinator coordinator;

   // Is snapshot feature enabled?
   private boolean isSnapshotSupported = false;
@@ -132,18 +136,44 @@ public class SnapshotManager implements Stoppable {
   /**
    * Construct a snapshot manager.
    * @param master
-   * @param comms
    */
-  public SnapshotManager(final MasterServices master) throws IOException, UnsupportedOperationException {
+  public SnapshotManager(final MasterServices master) throws KeeperException, IOException,
+      UnsupportedOperationException {
     this.master = master;
+    checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());

     // get the configuration for the coordinator
     Configuration conf = master.getConfiguration();
     this.wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
+    long keepAliveTime = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
+
+    // setup the default procedure coordinator
+    String name = master.getServerName().toString();
+    ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, keepAliveTime, opThreads, wakeFrequency);
+    ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs(
+        master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name);
+    this.coordinator = new ProcedureCoordinator(comms, tpool);
     this.rootDir = master.getMasterFileSystem().getRootDir();
     this.executorService = master.getExecutorService();
+    resetTempDir();
+  }

+  /**
+   * Fully specify all necessary components of a snapshot manager. Exposed for testing.
+   * @param master services for the master where the manager is running
+   * @param coordinator procedure coordinator instance. Exposed for testing.
+   * @param pool HBase ExecutorService instance, exposed for testing.
+   */
+  public SnapshotManager(final MasterServices master, ProcedureCoordinator coordinator, ExecutorService pool)
+      throws IOException, UnsupportedOperationException {
+    this.master = master;
     checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
+
+    this.wakeFrequency = master.getConfiguration().getInt(SNAPSHOT_WAKE_MILLIS_KEY,
+        SNAPSHOT_WAKE_MILLIS_DEFAULT);
+    this.coordinator = coordinator;
+    this.rootDir = master.getMasterFileSystem().getRootDir();
+    this.executorService = pool;
     resetTempDir();
   }
@@ -368,6 +398,38 @@ public class SnapshotManager implements Stoppable {
     }
   }

+  /**
+   * Take a snapshot of an enabled table.
+   *

+   * The thread limitation on the executorService's thread pool for snapshots ensures the
+   * snapshot won't be started if there is another snapshot already running. Does
+   * not check to see if another snapshot of the same name already exists.
+   * @param snapshot description of the snapshot to take.
+   * @throws HBaseSnapshotException if the snapshot could not be started
+   */
+  private synchronized void snapshotEnabledTable(SnapshotDescription snapshot)
+      throws HBaseSnapshotException {
+    TakeSnapshotHandler handler;
+    try {
+      handler = new EnabledTableSnapshotHandler(snapshot, master, this);
+      this.executorService.submit(handler);
+      this.handler = handler;
+    } catch (IOException e) {
+      // cleanup the working directory by trying to delete it from the fs.
+      Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
+      try {
+        if (!this.master.getMasterFileSystem().getFileSystem().delete(workingDir, true)) {
+          LOG.warn("Couldn't delete working directory (" + workingDir + ") for snapshot: "
+              + snapshot);
+        }
+      } catch (IOException e1) {
+        LOG.warn("Couldn't delete working directory (" + workingDir + ") for snapshot: " + snapshot);
+      }
+      // fail the snapshot
+      throw new SnapshotCreationException("Could not build snapshot handler", e, snapshot);
+    }
+  }

   /**
    * Take a snapshot based on the enabled/disabled state of the table.
   *
@@ -418,7 +480,8 @@ public class SnapshotManager implements Stoppable {
     AssignmentManager assignmentMgr = master.getAssignmentManager();
     if (assignmentMgr.getZKTable().isEnabledTable(snapshot.getTable())) {
       LOG.debug("Table enabled, starting distributed snapshot.");
-      throw new UnsupportedOperationException("Snapshots of enabled tables is not yet supported");
+      snapshotEnabledTable(snapshot);
+      LOG.debug("Started snapshot: " + snapshot);
     }
     // For disabled table, snapshot is created by the master
     else if (assignmentMgr.getZKTable().isDisabledTable(snapshot.getTable())) {
@@ -442,7 +505,8 @@ public class SnapshotManager implements Stoppable {
   /**
    * Take a snapshot of a disabled table.
    *

- * Ensures the snapshot won't be started if there is another snapshot already running. Does + * The thread limitation on the executorService's thread pool for snapshots ensures the + * snapshot won't be started if there is another snapshot already running. Does * not check to see if another snapshot of the same name already exists. * @param snapshot description of the snapshot to take. Modified to be {@link Type#DISABLED}. * @throws HBaseSnapshotException if the snapshot could not be started @@ -456,8 +520,8 @@ public class SnapshotManager implements Stoppable { DisabledTableSnapshotHandler handler; try { handler = new DisabledTableSnapshotHandler(snapshot, this.master); - this.handler = handler; this.executorService.submit(handler); + this.handler = handler; } catch (IOException e) { // cleanup the working directory by trying to delete it from the fs. Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir); @@ -486,6 +550,13 @@ public class SnapshotManager implements Stoppable { this.handler = handler; } + /** + * @return distributed commit coordinator for all running snapshots + */ + ProcedureCoordinator getCoordinator() { + return coordinator; + } + /** * Check to see if the snapshot is one of the currently completed snapshots * @param expected snapshot to check diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java index ac1d75fb4a5..69ffed03f71 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java @@ -145,6 +145,7 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh } catch (Exception e) { LOG.error("Got exception taking snapshot", e); String reason = "Failed due to exception:" + e.getMessage(); + LOG.error("Got exception taking snapshot", e); ForeignException ee = new ForeignException(reason, e); monitor.receive(ee); // need to mark this completed to close off and allow cleanup to happen. 
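For orientation, here is a minimal usage sketch of the call path the changes above open up. It is not part of this patch: the class and the snapshot/table names are placeholders, it assumes the 0.95-era client API in this branch, and at this scaffolding stage the region server side still rejects the subprocedure (see RegionServerSnapshotManager.buildSubprocedure below), so the path is wired but not yet functional end to end.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;

public class OnlineSnapshotSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);
    try {
      // The two-arg snapshot(name, table) overload now defaults to Type.FLUSH
      // (previously TIMESTAMP), so an enabled table routes to
      // SnapshotManager.snapshotEnabledTable(...) on the master instead of
      // throwing UnsupportedOperationException. Explicit form shown here:
      admin.snapshot("demo_snapshot", "demo_table", SnapshotDescription.Type.FLUSH);
    } finally {
      admin.close();
    }
  }
}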
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 4f000f60891..62c908e7961 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -52,7 +52,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.management.ObjectName; -import com.google.protobuf.Message; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -101,8 +100,8 @@ import org.apache.hadoop.hbase.client.coprocessor.ExecResult; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType; -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.ByteArrayComparable; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; @@ -148,6 +147,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest; @@ -186,9 +187,10 @@ import org.apache.hadoop.hbase.regionserver.handler.CloseRootHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenRootHandler; +import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager; import org.apache.hadoop.hbase.regionserver.wal.HLog; -import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; import org.apache.hadoop.hbase.regionserver.wal.HLogFactory; +import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; @@ -219,12 +221,10 @@ import org.cliffc.high_scale_lib.Counter; import com.google.common.base.Function; import com.google.protobuf.ByteString; +import com.google.protobuf.Message; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; -import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest; -import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse; - /** * HRegionServer makes a set of HRegions available to clients. It checks in with * the HMaster. There are many HRegionServers in a single HBase deployment. 
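The hunks below hook the new RegionServerSnapshotManager into the region server lifecycle. As a sketch of that wiring (a hypothetical harness, not part of the patch; it mirrors the construct-after-ZK, start-when-ready, stop-on-shutdown sequence the hunks add, with the region server's own error handling elided):

import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager;

public class SnapshotManagerLifecycleSketch {
  public static void run(HRegionServer server, boolean abortRequested) throws Exception {
    // construct once the ZooKeeper connection is available (registers ZK member rpcs)
    RegionServerSnapshotManager mgr = new RegionServerSnapshotManager(server);
    mgr.start();               // begin accepting snapshot subprocedures
    // ... server runs ...
    mgr.stop(abortRequested);  // forcefully kills in-flight tasks when aborting
  }
}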
@@ -435,6 +435,9 @@ public class HRegionServer implements ClientProtocol, private RegionServerCoprocessorHost rsHost; + /** Handle all the snapshot requests to this server */ + RegionServerSnapshotManager snapshotManager; + /** * Starts a HRegionServer at the default location * @@ -779,6 +782,13 @@ public class HRegionServer implements ClientProtocol, } catch (KeeperException e) { this.abort("Failed to retrieve Cluster ID",e); } + + // watch for snapshots + try { + this.snapshotManager = new RegionServerSnapshotManager(this); + } catch (KeeperException e) { + this.abort("Failed to reach zk cluster when creating snapshot handler."); + } } /** @@ -856,6 +866,9 @@ public class HRegionServer implements ClientProtocol, } } + // start the snapshot handler, since the server is ready to run + this.snapshotManager.start(); + // We registered with the Master. Go into run mode. long lastMsg = 0; long oldRequestCount = -1; @@ -933,6 +946,12 @@ public class HRegionServer implements ClientProtocol, if (this.compactionChecker != null) this.compactionChecker.interrupt(); + try { + if (snapshotManager != null) snapshotManager.stop(this.abortRequested); + } catch (IOException e) { + LOG.warn("Failed to close snapshot handler cleanly", e); + } + if (this.killed) { // Just skip out w/o closing regions. Used when testing. } else if (abortRequested) { @@ -949,6 +968,13 @@ public class HRegionServer implements ClientProtocol, // handlers are stuck waiting on meta or root. if (this.catalogTracker != null) this.catalogTracker.stop(); + // stop the snapshot handler, forcefully killing all running tasks + try { + if (snapshotManager != null) snapshotManager.stop(this.abortRequested || this.killed); + } catch (IOException e) { + LOG.warn("Failed to close snapshot handler cleanly", e); + } + // Closing the compactSplit thread before closing meta regions if (!this.killed && containsMetaTableRegions()) { if (!abortRequested || this.fsOk) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java new file mode 100644 index 00000000000..d1317c62a32 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java @@ -0,0 +1,375 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.snapshot; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DaemonThreadFactory; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.errorhandling.ForeignException; +import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; +import org.apache.hadoop.hbase.master.snapshot.MasterSnapshotVerifier; +import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; +import org.apache.hadoop.hbase.procedure.ProcedureMember; +import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs; +import org.apache.hadoop.hbase.procedure.Subprocedure; +import org.apache.hadoop.hbase.procedure.SubprocedureFactory; +import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +import com.google.protobuf.InvalidProtocolBufferException; + +/** + * This manager class handles the work dealing with snapshots for a {@link HRegionServer}. + *

+ * This provides the mechanism necessary to kick off an online snapshot specific
+ * {@link Subprocedure} that is responsible for the regions being served by this region server.
+ * If any failures occur with the subprocedure, the RegionServerSnapshotManager's subprocedure
+ * handler, {@link ProcedureMember}, notifies the master's ProcedureCoordinator to abort all
+ * others.
+ *

+ * On startup, requires {@link #start()} to be called. + *

+ * On shutdown, requires {@link #stop(boolean)} to be called
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class RegionServerSnapshotManager {
+  private static final Log LOG = LogFactory.getLog(RegionServerSnapshotManager.class);
+
+  /** Maximum number of snapshot region tasks that can run concurrently */
+  private static final String CONCURRENT_SNAPSHOT_TASKS_KEY = "hbase.snapshot.region.concurrentTasks";
+  private static final int DEFAULT_CONCURRENT_SNAPSHOT_TASKS = 3;
+
+  /** Conf key for number of request threads to start snapshots on regionservers */
+  public static final String SNAPSHOT_REQUEST_THREADS_KEY = "hbase.snapshot.region.pool.threads";
+  /** # of threads for snapshotting regions on the rs. */
+  public static final int SNAPSHOT_REQUEST_THREADS_DEFAULT = 10;
+
+  /** Conf key for max time to keep threads in snapshot request pool waiting */
+  public static final String SNAPSHOT_TIMEOUT_MILLIS_KEY = "hbase.snapshot.region.timeout";
+  /** Keep threads alive in request pool for max of 60 seconds */
+  public static final long SNAPSHOT_TIMEOUT_MILLIS_DEFAULT = 60000;
+
+  /** Conf key for millis between checks to see if snapshot completed or if there are errors */
+  public static final String SNAPSHOT_REQUEST_WAKE_MILLIS_KEY = "hbase.snapshot.region.wakefrequency";
+  /** Default amount of time to check for errors while regions finish snapshotting */
+  private static final long SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT = 500;
+
+  private final RegionServerServices rss;
+  private final ProcedureMemberRpcs memberRpcs;
+  private final ProcedureMember member;
+  private final long wakeMillis;
+  private final SnapshotSubprocedurePool taskManager;
+
+  /**
+   * Exposed for testing.
+   * @param conf configuration to read snapshot request properties from
+   * @param parent parent running the snapshot handler
+   * @param controller use a custom snapshot controller
+   * @param cohortMember use a custom cohort member
+   */
+  RegionServerSnapshotManager(Configuration conf, HRegionServer parent,
+      ProcedureMemberRpcs controller, ProcedureMember cohortMember) {
+    this.rss = parent;
+    this.memberRpcs = controller;
+    this.member = cohortMember;
+    // read in the snapshot request configuration properties
+    wakeMillis = conf.getLong(SNAPSHOT_REQUEST_WAKE_MILLIS_KEY, SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT);
+    taskManager = new SnapshotSubprocedurePool(parent, conf);
+  }
+
+  /**
+   * Create a default snapshot handler - uses a zookeeper based cohort controller. The region
+   * server's configuration supplies information like thread pool properties and the
+   * frequency to check for errors (wake frequency).
+   * @param rss region server running the handler
+   * @throws KeeperException if the zookeeper cluster cannot be reached
+   */
+  public RegionServerSnapshotManager(RegionServerServices rss)
+      throws KeeperException {
+    this.rss = rss;
+    ZooKeeperWatcher zkw = rss.getZooKeeper();
+    String nodeName = rss.getServerName().toString();
+    this.memberRpcs = new ZKProcedureMemberRpcs(zkw,
+        SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, nodeName);
+
+    // read in the snapshot request configuration properties
+    Configuration conf = rss.getConfiguration();
+    wakeMillis = conf.getLong(SNAPSHOT_REQUEST_WAKE_MILLIS_KEY, SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT);
+    long keepAlive = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
+    int opThreads = conf.getInt(SNAPSHOT_REQUEST_THREADS_KEY, SNAPSHOT_REQUEST_THREADS_DEFAULT);
+
+    // create the actual snapshot procedure member
+    ThreadPoolExecutor pool = ProcedureMember.defaultPool(wakeMillis, keepAlive, opThreads, nodeName);
+    this.member = new ProcedureMember(memberRpcs, pool, new SnapshotSubprocedureBuilder());
+
+    // setup the task manager to run all the snapshot tasks
+    taskManager = new SnapshotSubprocedurePool(rss, conf);
+  }
+
+  /**
+   * Start accepting snapshot requests.
+   */
+  public void start() {
+    this.memberRpcs.start(member);
+  }
+
+  /**
+   * Close this and all running snapshot tasks.
+   * @param force forcefully stop all running tasks
+   * @throws IOException
+   */
+  public void stop(boolean force) throws IOException {
+    String mode = force ? "abruptly" : "gracefully";
+    LOG.info("Stopping RegionServerSnapshotManager " + mode + ".");
+
+    if (force) {
+      this.taskManager.stop();
+    } else {
+      this.taskManager.shutdown();
+    }
+
+    try {
+      this.member.close();
+    } finally {
+      this.memberRpcs.close();
+    }
+  }
+
+  /**
+   * If in a running state, creates the specified subprocedure for handling an online snapshot.
+   *
+   * Because this gets the local list of regions to snapshot and not the set the master had,
+   * there is a possibility of a race where regions may be missed. This is detected by the master
+   * in the snapshot verification step.
+   *
+   * @param snapshot description of the snapshot to take
+   * @return Subprocedure to submit to the ProcedureMember.
+   */
+  public Subprocedure buildSubprocedure(SnapshotDescription snapshot) {
+
+    // don't run a snapshot if the parent is stop(ping)
+    if (rss.isStopping() || rss.isStopped()) {
+      throw new IllegalStateException("Can't start snapshot on RS: " + rss.getServerName()
+          + ", because stopping/stopped!");
+    }
+
+    // check to see if this server is hosting any regions for the snapshot
+    List<HRegion> involvedRegions;
+    try {
+      involvedRegions = getRegionsToSnapshot(snapshot);
+    } catch (IOException e1) {
+      throw new IllegalStateException("Failed to figure out if we should handle a snapshot - "
+          + "something has gone awry with the online regions.", e1);
+    }
+
+    // We need to run the subprocedure even if we have no relevant regions. The coordinator
+    // expects participation in the procedure, and without sending a message the snapshot
+    // attempt will hang and fail.
+ + LOG.debug("Launching subprocedure for snapshot " + snapshot.getName() + " from table " + snapshot.getTable()); + ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher(); + switch (snapshot.getType()) { + default: + throw new UnsupportedOperationException("Unrecognized snapshot type:" + snapshot.getType()); + } + } + + /** + * Determine if the snapshot should be handled on this server + * + * NOTE: This is racy -- the master expects a list of regionservers, but the regions get the + * regions. This means if a region moves somewhere between the calls we'll miss some regions. + * For example, a region move during a snapshot could result in a region to be skipped or done + * twice. This is manageable because the {@link MasterSnapshotVerifier} will double check the + * region lists after the online portion of the snapshot completes and will explicitly fail the + * snapshot. + * + * @param snapshot + * @return the list of online regions. Empty list is returned if no regions are responsible for + * the given snapshot. + * @throws IOException + */ + private List getRegionsToSnapshot(SnapshotDescription snapshot) throws IOException { + byte[] table = Bytes.toBytes(snapshot.getTable()); + return rss.getOnlineRegions(table); + } + + /** + * Build the actual snapshot runner that will do all the 'hard' work + */ + public class SnapshotSubprocedureBuilder implements SubprocedureFactory { + + @Override + public Subprocedure buildSubprocedure(String name, byte[] data) { + try { + // unwrap the snapshot information + SnapshotDescription snapshot = SnapshotDescription.parseFrom(data); + return RegionServerSnapshotManager.this.buildSubprocedure(snapshot); + } catch (InvalidProtocolBufferException e) { + throw new IllegalArgumentException("Could not read snapshot information from request."); + } + } + + } + + /** + * We use the SnapshotSubprocedurePool, a class specific thread pool instead of + * {@link org.apache.hadoop.hbase.executor.ExecutorService}. + * + * It uses a {@link java.util.concurrent.ExecutorCompletionService} which provides queuing of + * completed tasks which lets us efficiently cancel pending tasks upon the earliest operation + * failures. + * + * HBase's ExecutorService (different from {@link java.util.concurrent.ExecutorService}) isn't + * really built for coordinated tasks where multiple threads as part of one larger task. In + * RS's the HBase Executor services are only used for open and close and not other threadpooled + * operations such as compactions and replication sinks. + */ + static class SnapshotSubprocedurePool { + private final ExecutorCompletionService taskPool; + private final ThreadPoolExecutor executor; + private volatile boolean stopped; + private final List> futures = new ArrayList>(); + private final String name; + + SnapshotSubprocedurePool(Server parent, Configuration conf) { + // configure the executor service + long keepAlive = conf.getLong( + RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_KEY, + RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_DEFAULT); + int threads = conf.getInt(CONCURENT_SNAPSHOT_TASKS_KEY, DEFAULT_CONCURRENT_SNAPSHOT_TASKS); + this.name = parent.getServerName().toString(); + executor = new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), new DaemonThreadFactory("rs(" + + name + ")-snapshot-pool")); + taskPool = new ExecutorCompletionService(executor); + } + + /** + * Submit a task to the pool. 
+     *
+     * NOTE: all must be submitted before you can safely {@link #waitForOutstandingTasks()}.
+     * This version does not support issuing tasks from multiple concurrent table snapshot
+     * requests.
+     */
+    void submitTask(final Callable<Void> task) {
+      Future<Void> f = this.taskPool.submit(task);
+      futures.add(f);
+    }
+
+    /**
+     * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}.
+     * This *must* be called after all tasks are submitted via submitTask.
+     *
+     * TODO: to support multiple concurrent snapshots this code needs to be rewritten. The common
+     * pool of futures not being thread safe is part of the problem.
+     *
+     * @return true on success, false otherwise
+     * @throws ForeignException if the snapshot failed while we were waiting
+     */
+    boolean waitForOutstandingTasks() throws ForeignException {
+      LOG.debug("Waiting for local region snapshots to finish.");
+
+      try {
+        // Use the completion service to process the futures that finish first.
+        int sz = futures.size();
+        for (int i = 0; i < sz; i++) {
+          Future<Void> f = taskPool.take();
+          f.get();
+          if (!futures.remove(f)) {
+            LOG.warn("unexpected future");
+          }
+          LOG.debug("Completed " + (i+1) + "/" + sz + " local region snapshots.");
+        }
+        LOG.debug("Completed " + sz + " local region snapshots.");
+        return true;
+      } catch (InterruptedException e) {
+        // TODO this isn't completely right and needs to be revisited.
+        LOG.warn("Got InterruptedException in SnapshotSubprocedurePool", e);
+        if (stopped) {
+          throw new ForeignException("SnapshotSubprocedurePool", e);
+        }
+        Thread.currentThread().interrupt();
+      } catch (ExecutionException e) {
+        if (e.getCause() instanceof ForeignException) {
+          throw (ForeignException) e.getCause();
+        }
+        throw new ForeignException(name, e.getCause());
+      } finally {
+        // close off remaining tasks
+        for (Future<Void> f : futures) {
+          if (!f.isDone()) {
+            LOG.warn("cancelling region task");
+            f.cancel(true);
+          }
+        }
+        futures.clear();
+      }
+      return false;
+    }
+
+    /**
+     * This attempts to cancel out all pending and in progress tasks (by interruption).
+     */
+    void cancelTasks() {
+      for (Future<Void> f : futures) {
+        f.cancel(true);
+      }
+    }
+
+    /**
+     * This politely shuts down the thread pool. Call when gracefully exiting a region server.
+     */
+    void shutdown() {
+      this.executor.shutdown();
+    }
+
+    /**
+     * This abruptly shuts down the thread pool. Call when exiting a region server.
+ */ + void stop() { + if (this.stopped) return; + + this.stopped = true; + this.executor.shutdownNow(); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotDescriptionUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotDescriptionUtils.java index e42615bc749..7753c23a73b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotDescriptionUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotDescriptionUtils.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.snapshot; import java.io.IOException; -import java.io.FileNotFoundException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -111,31 +110,15 @@ public class SnapshotDescriptionUtils { /** Temporary directory under the snapshot directory to store in-progress snapshots */ public static final String SNAPSHOT_TMP_DIR_NAME = ".tmp"; - // snapshot operation values /** Default value if no start time is specified */ public static final long NO_SNAPSHOT_START_TIME_SPECIFIED = 0; - public static final String MASTER_WAIT_TIME_GLOBAL_SNAPSHOT = "hbase.snapshot.global.master.timeout"; - public static final String REGION_WAIT_TIME_GLOBAL_SNAPSHOT = "hbase.snapshot.global.region.timeout"; - public static final String MASTER_WAIT_TIME_TIMESTAMP_SNAPSHOT = "hbase.snapshot.timestamp.master.timeout"; - public static final String REGION_WAIT_TIME_TIMESTAMP_SNAPSHOT = "hbase.snapshot.timestamp.region.timeout"; - public static final String MASTER_WAIT_TIME_DISABLED_SNAPSHOT = "hbase.snapshot.disabled.master.timeout"; - - /** Default timeout of 60 sec for a snapshot timeout on a region */ - public static final long DEFAULT_REGION_SNAPSHOT_TIMEOUT = 60000; + public static final String MASTER_SNAPSHOT_TIMEOUT_MILLIS = "hbase.snapshot.master.timeout.millis"; /** By default, wait 60 seconds for a snapshot to complete */ public static final long DEFAULT_MAX_WAIT_TIME = 60000; - /** - * Conf key for amount of time the in the future a timestamp snapshot should be taken (ms). 
- * Defaults to {@value SnapshotDescriptionUtils#DEFAULT_TIMESTAMP_SNAPSHOT_SPLIT_IN_FUTURE} - */ - public static final String TIMESTAMP_SNAPSHOT_SPLIT_POINT_ADDITION = "hbase.snapshot.timestamp.master.splittime"; - /** Start 2 seconds in the future, if no start time given */ - public static final long DEFAULT_TIMESTAMP_SNAPSHOT_SPLIT_IN_FUTURE = 2000; - private SnapshotDescriptionUtils() { // private constructor for utility class } @@ -165,35 +148,9 @@ public class SnapshotDescriptionUtils { long defaultMaxWaitTime) { String confKey; switch (type) { - case GLOBAL: - confKey = MASTER_WAIT_TIME_GLOBAL_SNAPSHOT; - break; - case TIMESTAMP: - - confKey = MASTER_WAIT_TIME_TIMESTAMP_SNAPSHOT; case DISABLED: default: - confKey = MASTER_WAIT_TIME_DISABLED_SNAPSHOT; - } - return conf.getLong(confKey, defaultMaxWaitTime); - } - - /** - * @param conf {@link Configuration} from which to check for the timeout - * @param type type of snapshot being taken - * @param defaultMaxWaitTime Default amount of time to wait, if none is in the configuration - * @return the max amount of time the region should wait for a snapshot to complete - */ - public static long getMaxRegionTimeout(Configuration conf, SnapshotDescription.Type type, - long defaultMaxWaitTime) { - String confKey; - switch (type) { - case GLOBAL: - confKey = REGION_WAIT_TIME_GLOBAL_SNAPSHOT; - break; - case TIMESTAMP: - default: - confKey = REGION_WAIT_TIME_TIMESTAMP_SNAPSHOT; + confKey = MASTER_SNAPSHOT_TIMEOUT_MILLIS; } return conf.getLong(confKey, defaultMaxWaitTime); } @@ -299,13 +256,6 @@ public class SnapshotDescriptionUtils { long time = snapshot.getCreationTime(); if (time == SnapshotDescriptionUtils.NO_SNAPSHOT_START_TIME_SPECIFIED) { time = EnvironmentEdgeManager.currentTimeMillis(); - if (snapshot.getType().equals(SnapshotDescription.Type.TIMESTAMP)) { - long increment = conf.getLong( - SnapshotDescriptionUtils.TIMESTAMP_SNAPSHOT_SPLIT_POINT_ADDITION, - SnapshotDescriptionUtils.DEFAULT_TIMESTAMP_SNAPSHOT_SPLIT_IN_FUTURE); - LOG.debug("Setting timestamp snapshot in future by " + increment + " ms."); - time += increment; - } LOG.debug("Creation time not specified, setting to:" + time + " (current time:" + EnvironmentEdgeManager.currentTimeMillis() + ")."); SnapshotDescription.Builder builder = snapshot.toBuilder(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/TakeSnapshotUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/TakeSnapshotUtils.java index d8689b87b73..c3bad50bf90 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/TakeSnapshotUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/TakeSnapshotUtils.java @@ -113,7 +113,6 @@ public class TakeSnapshotUtils { * @param monitor monitor to notify when the snapshot life expires * @return the timer to use update to signal the start and end of the snapshot */ - @SuppressWarnings("rawtypes") public static TimeoutExceptionInjector getMasterTimerAndBindToMonitor(SnapshotDescription snapshot, Configuration conf, ForeignExceptionListener monitor) { long maxTime = SnapshotDescriptionUtils.getMaxMasterTimeout(conf, snapshot.getType(), diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromClient.java index 9baa5c17ab4..3de6b456039 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromClient.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromClient.java @@ -182,11 +182,11 @@ public class TestSnapshotFromClient { boolean fail = false; do { try { - admin.getTableDescriptor(Bytes.toBytes(tableName)); - fail = true; - LOG.error("Table:" + tableName + " already exists, checking a new name"); - tableName = tableName+"!"; - }catch(TableNotFoundException e) { + admin.getTableDescriptor(Bytes.toBytes(tableName)); + fail = true; + LOG.error("Table:" + tableName + " already exists, checking a new name"); + tableName = tableName+"!"; + } catch (TableNotFoundException e) { fail = false; } } while (fail); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotManager.java index fe6eb8abeeb..c84b6c03f51 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/snapshot/TestSnapshotManager.java @@ -34,8 +34,7 @@ import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner; -import org.apache.hadoop.hbase.master.snapshot.SnapshotHFileCleaner; -import org.apache.hadoop.hbase.master.snapshot.SnapshotLogCleaner; +import org.apache.hadoop.hbase.procedure.ProcedureCoordinator; import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; import org.apache.zookeeper.KeeperException; import org.junit.Test; @@ -50,6 +49,7 @@ public class TestSnapshotManager { private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); MasterServices services = Mockito.mock(MasterServices.class); + ProcedureCoordinator coordinator = Mockito.mock(ProcedureCoordinator.class); ExecutorService pool = Mockito.mock(ExecutorService.class); MasterFileSystem mfs = Mockito.mock(MasterFileSystem.class); FileSystem fs; @@ -71,7 +71,7 @@ public class TestSnapshotManager { Mockito.when(services.getMasterFileSystem()).thenReturn(mfs); Mockito.when(mfs.getFileSystem()).thenReturn(fs); Mockito.when(mfs.getRootDir()).thenReturn(UTIL.getDataTestDir()); - return new SnapshotManager(services); + return new SnapshotManager(services, coordinator, pool); } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestSnapshotDescriptionUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestSnapshotDescriptionUtils.java index c93739dba90..3c452f7eae9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestSnapshotDescriptionUtils.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestSnapshotDescriptionUtils.java @@ -70,43 +70,8 @@ public class TestSnapshotDescriptionUtils { private static final Log LOG = LogFactory.getLog(TestSnapshotDescriptionUtils.class); @Test - public void testValidateDescriptor() { - EnvironmentEdge edge = new EnvironmentEdge() { - @Override - public long currentTimeMillis() { - return 0; - } - }; - EnvironmentEdgeManagerTestHelper.injectEdge(edge); - - // check a basic snapshot description - SnapshotDescription.Builder builder = SnapshotDescription.newBuilder(); - builder.setName("snapshot"); - builder.setTable("table"); - - // check that time is to an amount in the future + public void testValidateMissingTableName() { Configuration conf = new Configuration(false); - 
conf.setLong(SnapshotDescriptionUtils.TIMESTAMP_SNAPSHOT_SPLIT_POINT_ADDITION, 1); - SnapshotDescription desc = SnapshotDescriptionUtils.validate(builder.build(), conf); - assertEquals("Description creation time wasn't set correctly", 1, desc.getCreationTime()); - - // test a global snapshot - edge = new EnvironmentEdge() { - @Override - public long currentTimeMillis() { - return 2; - } - }; - EnvironmentEdgeManagerTestHelper.injectEdge(edge); - builder.setType(Type.GLOBAL); - desc = SnapshotDescriptionUtils.validate(builder.build(), conf); - assertEquals("Description creation time wasn't set correctly", 2, desc.getCreationTime()); - - // test that we don't override a given value - builder.setCreationTime(10); - desc = SnapshotDescriptionUtils.validate(builder.build(), conf); - assertEquals("Description creation time wasn't set correctly", 10, desc.getCreationTime()); - try { SnapshotDescriptionUtils.validate(SnapshotDescription.newBuilder().setName("fail").build(), conf);