HBASE-6864 Online snapshots scaffolding
git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-7290@1445828 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c9b3e9b2da
commit
82df28fffe
|
@ -11318,7 +11318,7 @@ public final class HBaseProtos {
|
|||
boolean hasCreationTime();
|
||||
long getCreationTime();
|
||||
|
||||
// optional .SnapshotDescription.Type type = 4 [default = TIMESTAMP];
|
||||
// optional .SnapshotDescription.Type type = 4 [default = FLUSH];
|
||||
boolean hasType();
|
||||
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type getType();
|
||||
|
||||
|
@ -11357,13 +11357,11 @@ public final class HBaseProtos {
|
|||
public enum Type
|
||||
implements com.google.protobuf.ProtocolMessageEnum {
|
||||
DISABLED(0, 0),
|
||||
TIMESTAMP(1, 1),
|
||||
GLOBAL(2, 2),
|
||||
FLUSH(1, 1),
|
||||
;
|
||||
|
||||
public static final int DISABLED_VALUE = 0;
|
||||
public static final int TIMESTAMP_VALUE = 1;
|
||||
public static final int GLOBAL_VALUE = 2;
|
||||
public static final int FLUSH_VALUE = 1;
|
||||
|
||||
|
||||
public final int getNumber() { return value; }
|
||||
|
@ -11371,8 +11369,7 @@ public final class HBaseProtos {
|
|||
public static Type valueOf(int value) {
|
||||
switch (value) {
|
||||
case 0: return DISABLED;
|
||||
case 1: return TIMESTAMP;
|
||||
case 2: return GLOBAL;
|
||||
case 1: return FLUSH;
|
||||
default: return null;
|
||||
}
|
||||
}
|
||||
|
@ -11403,7 +11400,7 @@ public final class HBaseProtos {
|
|||
}
|
||||
|
||||
private static final Type[] VALUES = {
|
||||
DISABLED, TIMESTAMP, GLOBAL,
|
||||
DISABLED, FLUSH,
|
||||
};
|
||||
|
||||
public static Type valueOf(
|
||||
|
@ -11501,7 +11498,7 @@ public final class HBaseProtos {
|
|||
return creationTime_;
|
||||
}
|
||||
|
||||
// optional .SnapshotDescription.Type type = 4 [default = TIMESTAMP];
|
||||
// optional .SnapshotDescription.Type type = 4 [default = FLUSH];
|
||||
public static final int TYPE_FIELD_NUMBER = 4;
|
||||
private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type type_;
|
||||
public boolean hasType() {
|
||||
|
@ -11525,7 +11522,7 @@ public final class HBaseProtos {
|
|||
name_ = "";
|
||||
table_ = "";
|
||||
creationTime_ = 0L;
|
||||
type_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type.TIMESTAMP;
|
||||
type_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type.FLUSH;
|
||||
version_ = 0;
|
||||
}
|
||||
private byte memoizedIsInitialized = -1;
|
||||
|
@ -11787,7 +11784,7 @@ public final class HBaseProtos {
|
|||
bitField0_ = (bitField0_ & ~0x00000002);
|
||||
creationTime_ = 0L;
|
||||
bitField0_ = (bitField0_ & ~0x00000004);
|
||||
type_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type.TIMESTAMP;
|
||||
type_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type.FLUSH;
|
||||
bitField0_ = (bitField0_ & ~0x00000008);
|
||||
version_ = 0;
|
||||
bitField0_ = (bitField0_ & ~0x00000010);
|
||||
|
@ -12045,8 +12042,8 @@ public final class HBaseProtos {
|
|||
return this;
|
||||
}
|
||||
|
||||
// optional .SnapshotDescription.Type type = 4 [default = TIMESTAMP];
|
||||
private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type type_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type.TIMESTAMP;
|
||||
// optional .SnapshotDescription.Type type = 4 [default = FLUSH];
|
||||
private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type type_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type.FLUSH;
|
||||
public boolean hasType() {
|
||||
return ((bitField0_ & 0x00000008) == 0x00000008);
|
||||
}
|
||||
|
@ -12064,7 +12061,7 @@ public final class HBaseProtos {
|
|||
}
|
||||
public Builder clearType() {
|
||||
bitField0_ = (bitField0_ & ~0x00000008);
|
||||
type_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type.TIMESTAMP;
|
||||
type_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type.FLUSH;
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
|
@ -12243,19 +12240,19 @@ public final class HBaseProtos {
|
|||
"value\030\002 \002(\t\",\n\rNameBytesPair\022\014\n\004name\030\001 \002" +
|
||||
"(\t\022\r\n\005value\030\002 \001(\014\"/\n\016BytesBytesPair\022\r\n\005f" +
|
||||
"irst\030\001 \002(\014\022\016\n\006second\030\002 \002(\014\",\n\rNameInt64P" +
|
||||
"air\022\014\n\004name\030\001 \001(\t\022\r\n\005value\030\002 \001(\003\"\301\001\n\023Sna" +
|
||||
"air\022\014\n\004name\030\001 \001(\t\022\r\n\005value\030\002 \001(\003\"\255\001\n\023Sna" +
|
||||
"pshotDescription\022\014\n\004name\030\001 \002(\t\022\r\n\005table\030" +
|
||||
"\002 \001(\t\022\027\n\014creationTime\030\003 \001(\003:\0010\0222\n\004type\030\004" +
|
||||
" \001(\0162\031.SnapshotDescription.Type:\tTIMESTA" +
|
||||
"MP\022\017\n\007version\030\005 \001(\005\"/\n\004Type\022\014\n\010DISABLED\020" +
|
||||
"\000\022\r\n\tTIMESTAMP\020\001\022\n\n\006GLOBAL\020\002*r\n\013CompareT" +
|
||||
"ype\022\010\n\004LESS\020\000\022\021\n\rLESS_OR_EQUAL\020\001\022\t\n\005EQUA",
|
||||
"L\020\002\022\r\n\tNOT_EQUAL\020\003\022\024\n\020GREATER_OR_EQUAL\020\004" +
|
||||
"\022\013\n\007GREATER\020\005\022\t\n\005NO_OP\020\006*_\n\007KeyType\022\013\n\007M" +
|
||||
"INIMUM\020\000\022\007\n\003PUT\020\004\022\n\n\006DELETE\020\010\022\021\n\rDELETE_" +
|
||||
"COLUMN\020\014\022\021\n\rDELETE_FAMILY\020\016\022\014\n\007MAXIMUM\020\377" +
|
||||
"\001B>\n*org.apache.hadoop.hbase.protobuf.ge" +
|
||||
"neratedB\013HBaseProtosH\001\240\001\001"
|
||||
"\002 \001(\t\022\027\n\014creationTime\030\003 \001(\003:\0010\022.\n\004type\030\004" +
|
||||
" \001(\0162\031.SnapshotDescription.Type:\005FLUSH\022\017" +
|
||||
"\n\007version\030\005 \001(\005\"\037\n\004Type\022\014\n\010DISABLED\020\000\022\t\n" +
|
||||
"\005FLUSH\020\001*r\n\013CompareType\022\010\n\004LESS\020\000\022\021\n\rLES" +
|
||||
"S_OR_EQUAL\020\001\022\t\n\005EQUAL\020\002\022\r\n\tNOT_EQUAL\020\003\022\024",
|
||||
"\n\020GREATER_OR_EQUAL\020\004\022\013\n\007GREATER\020\005\022\t\n\005NO_" +
|
||||
"OP\020\006*_\n\007KeyType\022\013\n\007MINIMUM\020\000\022\007\n\003PUT\020\004\022\n\n" +
|
||||
"\006DELETE\020\010\022\021\n\rDELETE_COLUMN\020\014\022\021\n\rDELETE_F" +
|
||||
"AMILY\020\016\022\014\n\007MAXIMUM\020\377\001B>\n*org.apache.hado" +
|
||||
"op.hbase.protobuf.generatedB\013HBaseProtos" +
|
||||
"H\001\240\001\001"
|
||||
};
|
||||
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
||||
|
|
|
@ -278,9 +278,8 @@ message SnapshotDescription {
|
|||
optional int64 creationTime = 3 [default = 0];
|
||||
enum Type {
|
||||
DISABLED = 0;
|
||||
TIMESTAMP = 1;
|
||||
GLOBAL = 2;
|
||||
FLUSH = 1;
|
||||
}
|
||||
optional Type type = 4 [default = TIMESTAMP];
|
||||
optional Type type = 4 [default = FLUSH];
|
||||
optional int32 version = 5;
|
||||
}
|
||||
|
|
|
@ -2133,7 +2133,7 @@ public class HBaseAdmin implements Abortable, Closeable {
|
|||
*/
|
||||
public void snapshot(final String snapshotName, final String tableName) throws IOException,
|
||||
SnapshotCreationException, IllegalArgumentException {
|
||||
snapshot(snapshotName, tableName, SnapshotDescription.Type.TIMESTAMP);
|
||||
snapshot(snapshotName, tableName, SnapshotDescription.Type.FLUSH);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,97 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.snapshot;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.errorhandling.ForeignException;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.procedure.Procedure;
|
||||
import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
|
||||
import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
/**
|
||||
* Handle the master side of taking a snapshot of an online table, regardless of snapshot type.
|
||||
* Uses a {@link Procedure} to run the snapshot across all the involved regions.
|
||||
* @see ProcedureCoordinator
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class EnabledTableSnapshotHandler extends TakeSnapshotHandler {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(EnabledTableSnapshotHandler.class);
|
||||
private final ProcedureCoordinator coordinator;
|
||||
|
||||
public EnabledTableSnapshotHandler(SnapshotDescription snapshot, MasterServices master,
|
||||
SnapshotManager manager) throws IOException {
|
||||
super(snapshot, master);
|
||||
this.coordinator = manager.getCoordinator();
|
||||
}
|
||||
|
||||
// TODO consider switching over to using regionnames, rather than server names. This would allow
|
||||
// regions to migrate during a snapshot, and then be involved when they are ready. Still want to
|
||||
// enforce a snapshot time constraints, but lets us be potentially a bit more robust.
|
||||
|
||||
/**
|
||||
* This method kicks off a snapshot procedure. Other than that it hangs around for various
|
||||
* phases to complete.
|
||||
*/
|
||||
@Override
|
||||
protected void snapshotRegions(List<Pair<HRegionInfo, ServerName>> regions)
|
||||
throws HBaseSnapshotException {
|
||||
Set<String> regionServers = new HashSet<String>(regions.size());
|
||||
for (Pair<HRegionInfo, ServerName> region : regions) {
|
||||
regionServers.add(region.getSecond().toString());
|
||||
}
|
||||
|
||||
// start the snapshot on the RS
|
||||
Procedure proc = coordinator.startProcedure(this.monitor, this.snapshot.getName(),
|
||||
this.snapshot.toByteArray(), Lists.newArrayList(regionServers));
|
||||
if (proc == null) {
|
||||
String msg = "Failed to submit distribute procedure for snapshot '"
|
||||
+ snapshot.getName() + "'";
|
||||
LOG.error(msg);
|
||||
throw new HBaseSnapshotException(msg);
|
||||
}
|
||||
|
||||
try {
|
||||
// wait for the snapshot to complete. A timer thread is kicked off that should cancel this
|
||||
// if it takes too long.
|
||||
proc.waitForCompleted();
|
||||
LOG.info("Done waiting - snapshot finished!");
|
||||
} catch (InterruptedException e) {
|
||||
ForeignException ee =
|
||||
new ForeignException("Interrupted while waiting for snapshot to finish", e);
|
||||
monitor.receive(ee);
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (ForeignException e) {
|
||||
monitor.receive(e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -48,10 +48,16 @@ import org.apache.hadoop.hbase.util.HFileArchiveUtil;
|
|||
/**
|
||||
* General snapshot verification on the master.
|
||||
* <p>
|
||||
* This is a light-weight verification mechanism for all the files in a snapshot. It doesn't attempt
|
||||
* to verify that the files are exact copies (that would be paramount to taking the snapshot
|
||||
* again!), but instead just attempts to ensure that the files match the expected files and are the
|
||||
* same length.
|
||||
* This is a light-weight verification mechanism for all the files in a snapshot. It doesn't
|
||||
* attempt to verify that the files are exact copies (that would be paramount to taking the
|
||||
* snapshot again!), but instead just attempts to ensure that the files match the expected
|
||||
* files and are the same length.
|
||||
* <p>
|
||||
* Taking an online snapshots can race against other operations and this is an last line of
|
||||
* defense. For example, if meta changes between when snapshots are taken not all regions of a
|
||||
* table may be present. This can be caused by a region split (daughters present on this scan,
|
||||
* but snapshot took parent), or move (snapshots only checks lists of region servers, a move could
|
||||
* have caused a region to be skipped or done twice).
|
||||
* <p>
|
||||
* Current snapshot files checked:
|
||||
* <ol>
|
||||
|
@ -65,11 +71,9 @@ import org.apache.hadoop.hbase.util.HFileArchiveUtil;
|
|||
* <li>All the hfiles are present (either in .archive directory in the region)</li>
|
||||
* <li>All recovered.edits files are present (by name) and have the correct file size</li>
|
||||
* </ul>
|
||||
* <li>HLogs for each server running the snapshot have been referenced
|
||||
* <ul>
|
||||
* <li>Only checked for {@link Type#GLOBAL} snapshots</li>
|
||||
* </ul>
|
||||
* </li>
|
||||
* <li>HLogs for each server running the snapshot have been referenced. (In the design for
|
||||
* in the {@link Type#GLOBAL} or {@link Type#LOGROLL} online snapshots</li>
|
||||
* </ol>
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
|
@ -112,22 +116,6 @@ public final class MasterSnapshotVerifier {
|
|||
|
||||
// check that each region is valid
|
||||
verifyRegions(snapshotDir);
|
||||
|
||||
// check that the hlogs, if they exist, are valid
|
||||
if (shouldCheckLogs(snapshot.getType())) {
|
||||
verifyLogs(snapshotDir, snapshotServers);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check to see if the snapshot should verify the logs directory based on the type of the logs.
|
||||
* @param type type of snapshot being taken
|
||||
* @return <tt>true</tt> if the logs directory should be verified, <tt>false</tt> otherwise
|
||||
*/
|
||||
private boolean shouldCheckLogs(Type type) {
|
||||
// This is better handled in the Type enum via type, but since its PB based, this is the
|
||||
// simplest way to handle it
|
||||
return type.equals(Type.GLOBAL);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -151,7 +139,7 @@ public final class MasterSnapshotVerifier {
|
|||
}
|
||||
|
||||
/**
|
||||
* Check that all the regions in the the snapshot are valid
|
||||
* Check that all the regions in the the snapshot are valid, and accounted for.
|
||||
* @param snapshotDir snapshot directory to check
|
||||
* @throws IOException if we can't reach .META. or read the files from the FS
|
||||
*/
|
||||
|
@ -177,6 +165,7 @@ public final class MasterSnapshotVerifier {
|
|||
// make sure we have region in the snapshot
|
||||
Path regionDir = new Path(snapshotDir, region.getEncodedName());
|
||||
if (!fs.exists(regionDir)) {
|
||||
// could happen due to a move or split race.
|
||||
throw new CorruptedSnapshotException("No region directory found for region:" + region,
|
||||
snapshot);
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.util.Iterator;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -50,8 +51,9 @@ import org.apache.hadoop.hbase.master.MasterServices;
|
|||
import org.apache.hadoop.hbase.master.SnapshotSentinel;
|
||||
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
|
||||
import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
|
||||
import org.apache.hadoop.hbase.master.snapshot.SnapshotHFileCleaner;
|
||||
import org.apache.hadoop.hbase.master.snapshot.SnapshotLogCleaner;
|
||||
import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
|
||||
import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
|
||||
import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type;
|
||||
import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
|
||||
|
@ -66,6 +68,7 @@ import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
|
|||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSTableDescriptors;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
/**
|
||||
* This class manages the procedure of taking and restoring snapshots. There is only one
|
||||
|
@ -100,7 +103,7 @@ public class SnapshotManager implements Stoppable {
|
|||
* Conf key for # of ms elapsed before injecting a snapshot timeout error when waiting for
|
||||
* completion.
|
||||
*/
|
||||
public static final String SNAPSHOT_TIMEMOUT_MILLIS_KEY = "hbase.snapshot.master.timeoutMillis";
|
||||
public static final String SNAPSHOT_TIMEOUT_MILLIS_KEY = "hbase.snapshot.master.timeoutMillis";
|
||||
|
||||
/** Name of the operation to use in the controller */
|
||||
public static final String ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION = "online-snapshot";
|
||||
|
@ -113,6 +116,7 @@ public class SnapshotManager implements Stoppable {
|
|||
private boolean stopped;
|
||||
private final long wakeFrequency;
|
||||
private final MasterServices master; // Needed by TableEventHandlers
|
||||
private final ProcedureCoordinator coordinator;
|
||||
|
||||
// Is snapshot feature enabled?
|
||||
private boolean isSnapshotSupported = false;
|
||||
|
@ -132,18 +136,44 @@ public class SnapshotManager implements Stoppable {
|
|||
/**
|
||||
* Construct a snapshot manager.
|
||||
* @param master
|
||||
* @param comms
|
||||
*/
|
||||
public SnapshotManager(final MasterServices master) throws IOException, UnsupportedOperationException {
|
||||
public SnapshotManager(final MasterServices master) throws KeeperException, IOException,
|
||||
UnsupportedOperationException {
|
||||
this.master = master;
|
||||
checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
|
||||
|
||||
// get the configuration for the coordinator
|
||||
Configuration conf = master.getConfiguration();
|
||||
this.wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
|
||||
long keepAliveTime = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
|
||||
|
||||
// setup the default procedure coordinator
|
||||
String name = master.getServerName().toString();
|
||||
ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, keepAliveTime, opThreads, wakeFrequency);
|
||||
ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs(
|
||||
master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name);
|
||||
this.coordinator = new ProcedureCoordinator(comms, tpool);
|
||||
this.rootDir = master.getMasterFileSystem().getRootDir();
|
||||
this.executorService = master.getExecutorService();
|
||||
resetTempDir();
|
||||
}
|
||||
|
||||
/**
|
||||
* Fully specify all necessary components of a snapshot manager. Exposed for testing.
|
||||
* @param master services for the master where the manager is running
|
||||
* @param coordinator procedure coordinator instance. exposed for testing.
|
||||
* @param pool HBase ExecutorServcie instance, exposed for testing.
|
||||
*/
|
||||
public SnapshotManager(final MasterServices master, ProcedureCoordinator coordinator, ExecutorService pool)
|
||||
throws IOException, UnsupportedOperationException {
|
||||
this.master = master;
|
||||
checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
|
||||
|
||||
this.wakeFrequency = master.getConfiguration().getInt(SNAPSHOT_WAKE_MILLIS_KEY,
|
||||
SNAPSHOT_WAKE_MILLIS_DEFAULT);
|
||||
this.coordinator = coordinator;
|
||||
this.rootDir = master.getMasterFileSystem().getRootDir();
|
||||
this.executorService = pool;
|
||||
resetTempDir();
|
||||
}
|
||||
|
||||
|
@ -368,6 +398,38 @@ public class SnapshotManager implements Stoppable {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Take a snapshot of a enabled table.
|
||||
* <p>
|
||||
* The thread limitation on the executorService's thread pool for snapshots ensures the
|
||||
* snapshot won't be started if there is another snapshot already running. Does
|
||||
* <b>not</b> check to see if another snapshot of the same name already exists.
|
||||
* @param snapshot description of the snapshot to take.
|
||||
* @throws HBaseSnapshotException if the snapshot could not be started
|
||||
*/
|
||||
private synchronized void snapshotEnabledTable(SnapshotDescription snapshot)
|
||||
throws HBaseSnapshotException {
|
||||
TakeSnapshotHandler handler;
|
||||
try {
|
||||
handler = new EnabledTableSnapshotHandler(snapshot, master, this);
|
||||
this.executorService.submit(handler);
|
||||
this.handler = handler;
|
||||
} catch (IOException e) {
|
||||
// cleanup the working directory by trying to delete it from the fs.
|
||||
Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
|
||||
try {
|
||||
if (this.master.getMasterFileSystem().getFileSystem().delete(workingDir, true)) {
|
||||
LOG.warn("Couldn't delete working directory (" + workingDir + " for snapshot:"
|
||||
+ snapshot);
|
||||
}
|
||||
} catch (IOException e1) {
|
||||
LOG.warn("Couldn't delete working directory (" + workingDir + " for snapshot:" + snapshot);
|
||||
}
|
||||
// fail the snapshot
|
||||
throw new SnapshotCreationException("Could not build snapshot handler", e, snapshot);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Take a snapshot based on the enabled/disabled state of the table.
|
||||
*
|
||||
|
@ -418,7 +480,8 @@ public class SnapshotManager implements Stoppable {
|
|||
AssignmentManager assignmentMgr = master.getAssignmentManager();
|
||||
if (assignmentMgr.getZKTable().isEnabledTable(snapshot.getTable())) {
|
||||
LOG.debug("Table enabled, starting distributed snapshot.");
|
||||
throw new UnsupportedOperationException("Snapshots of enabled tables is not yet supported");
|
||||
snapshotEnabledTable(snapshot);
|
||||
LOG.debug("Started snapshot: " + snapshot);
|
||||
}
|
||||
// For disabled table, snapshot is created by the master
|
||||
else if (assignmentMgr.getZKTable().isDisabledTable(snapshot.getTable())) {
|
||||
|
@ -442,7 +505,8 @@ public class SnapshotManager implements Stoppable {
|
|||
/**
|
||||
* Take a snapshot of a disabled table.
|
||||
* <p>
|
||||
* Ensures the snapshot won't be started if there is another snapshot already running. Does
|
||||
* The thread limitation on the executorService's thread pool for snapshots ensures the
|
||||
* snapshot won't be started if there is another snapshot already running. Does
|
||||
* <b>not</b> check to see if another snapshot of the same name already exists.
|
||||
* @param snapshot description of the snapshot to take. Modified to be {@link Type#DISABLED}.
|
||||
* @throws HBaseSnapshotException if the snapshot could not be started
|
||||
|
@ -456,8 +520,8 @@ public class SnapshotManager implements Stoppable {
|
|||
DisabledTableSnapshotHandler handler;
|
||||
try {
|
||||
handler = new DisabledTableSnapshotHandler(snapshot, this.master);
|
||||
this.handler = handler;
|
||||
this.executorService.submit(handler);
|
||||
this.handler = handler;
|
||||
} catch (IOException e) {
|
||||
// cleanup the working directory by trying to delete it from the fs.
|
||||
Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
|
||||
|
@ -486,6 +550,13 @@ public class SnapshotManager implements Stoppable {
|
|||
this.handler = handler;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return distributed commit coordinator for all running snapshots
|
||||
*/
|
||||
ProcedureCoordinator getCoordinator() {
|
||||
return coordinator;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check to see if the snapshot is one of the currently completed snapshots
|
||||
* @param expected snapshot to check
|
||||
|
|
|
@ -145,6 +145,7 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
|
|||
} catch (Exception e) {
|
||||
LOG.error("Got exception taking snapshot", e);
|
||||
String reason = "Failed due to exception:" + e.getMessage();
|
||||
LOG.error("Got exception taking snapshot", e);
|
||||
ForeignException ee = new ForeignException(reason, e);
|
||||
monitor.receive(ee);
|
||||
// need to mark this completed to close off and allow cleanup to happen.
|
||||
|
|
|
@ -52,7 +52,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|||
|
||||
import javax.management.ObjectName;
|
||||
|
||||
import com.google.protobuf.Message;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
@ -101,8 +100,8 @@ import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
|
|||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorService;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
|
||||
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
|
||||
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
|
||||
|
@ -148,6 +147,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequ
|
|||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
|
||||
|
@ -186,9 +187,10 @@ import org.apache.hadoop.hbase.regionserver.handler.CloseRootHandler;
|
|||
import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
|
||||
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
|
||||
import org.apache.hadoop.hbase.regionserver.handler.OpenRootHandler;
|
||||
import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -219,12 +221,10 @@ import org.cliffc.high_scale_lib.Counter;
|
|||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.Message;
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
|
||||
import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
|
||||
|
||||
/**
|
||||
* HRegionServer makes a set of HRegions available to clients. It checks in with
|
||||
* the HMaster. There are many HRegionServers in a single HBase deployment.
|
||||
|
@ -435,6 +435,9 @@ public class HRegionServer implements ClientProtocol,
|
|||
|
||||
private RegionServerCoprocessorHost rsHost;
|
||||
|
||||
/** Handle all the snapshot requests to this server */
|
||||
RegionServerSnapshotManager snapshotManager;
|
||||
|
||||
/**
|
||||
* Starts a HRegionServer at the default location
|
||||
*
|
||||
|
@ -779,6 +782,13 @@ public class HRegionServer implements ClientProtocol,
|
|||
} catch (KeeperException e) {
|
||||
this.abort("Failed to retrieve Cluster ID",e);
|
||||
}
|
||||
|
||||
// watch for snapshots
|
||||
try {
|
||||
this.snapshotManager = new RegionServerSnapshotManager(this);
|
||||
} catch (KeeperException e) {
|
||||
this.abort("Failed to reach zk cluster when creating snapshot handler.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -856,6 +866,9 @@ public class HRegionServer implements ClientProtocol,
|
|||
}
|
||||
}
|
||||
|
||||
// start the snapshot handler, since the server is ready to run
|
||||
this.snapshotManager.start();
|
||||
|
||||
// We registered with the Master. Go into run mode.
|
||||
long lastMsg = 0;
|
||||
long oldRequestCount = -1;
|
||||
|
@ -933,6 +946,12 @@ public class HRegionServer implements ClientProtocol,
|
|||
if (this.compactionChecker != null)
|
||||
this.compactionChecker.interrupt();
|
||||
|
||||
try {
|
||||
if (snapshotManager != null) snapshotManager.stop(this.abortRequested);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to close snapshot handler cleanly", e);
|
||||
}
|
||||
|
||||
if (this.killed) {
|
||||
// Just skip out w/o closing regions. Used when testing.
|
||||
} else if (abortRequested) {
|
||||
|
@ -949,6 +968,13 @@ public class HRegionServer implements ClientProtocol,
|
|||
// handlers are stuck waiting on meta or root.
|
||||
if (this.catalogTracker != null) this.catalogTracker.stop();
|
||||
|
||||
// stop the snapshot handler, forcefully killing all running tasks
|
||||
try {
|
||||
if (snapshotManager != null) snapshotManager.stop(this.abortRequested || this.killed);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to close snapshot handler cleanly", e);
|
||||
}
|
||||
|
||||
// Closing the compactSplit thread before closing meta regions
|
||||
if (!this.killed && containsMetaTableRegions()) {
|
||||
if (!abortRequested || this.fsOk) {
|
||||
|
|
|
@ -0,0 +1,375 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver.snapshot;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorCompletionService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.DaemonThreadFactory;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.errorhandling.ForeignException;
|
||||
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
|
||||
import org.apache.hadoop.hbase.master.snapshot.MasterSnapshotVerifier;
|
||||
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
|
||||
import org.apache.hadoop.hbase.procedure.ProcedureMember;
|
||||
import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
|
||||
import org.apache.hadoop.hbase.procedure.Subprocedure;
|
||||
import org.apache.hadoop.hbase.procedure.SubprocedureFactory;
|
||||
import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
|
||||
/**
|
||||
* This manager class handles the work dealing with snapshots for a {@link HRegionServer}.
|
||||
* <p>
|
||||
* This provides the mechanism necessary to kick off a online snapshot specific
|
||||
* {@link Subprocedure} that is responsible for the regions being served by this region server.
|
||||
* If any failures occur with the subprocedure, the RegionSeverSnapshotManager's subprocedure
|
||||
* handler, {@link ProcedureMember}, notifies the master's ProcedureCoordinator to abort all
|
||||
* others.
|
||||
* <p>
|
||||
* On startup, requires {@link #start()} to be called.
|
||||
* <p>
|
||||
* On shutdown, requires {@link #close()} to be called
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public class RegionServerSnapshotManager {
|
||||
private static final Log LOG = LogFactory.getLog(RegionServerSnapshotManager.class);
|
||||
|
||||
/** Maximum number of concurrent snapshot region tasks that can run concurrently */
|
||||
private static final String CONCURENT_SNAPSHOT_TASKS_KEY = "hbase.snapshot.region.concurrentTasks";
|
||||
private static final int DEFAULT_CONCURRENT_SNAPSHOT_TASKS = 3;
|
||||
|
||||
/** Conf key for number of request threads to start snapshots on regionservers */
|
||||
public static final String SNAPSHOT_REQUEST_THREADS_KEY = "hbase.snapshot.region.pool.threads";
|
||||
/** # of threads for snapshotting regions on the rs. */
|
||||
public static final int SNAPSHOT_REQUEST_THREADS_DEFAULT = 10;
|
||||
|
||||
/** Conf key for max time to keep threads in snapshot request pool waiting */
|
||||
public static final String SNAPSHOT_TIMEOUT_MILLIS_KEY = "hbase.snapshot.region.timeout";
|
||||
/** Keep threads alive in request pool for max of 60 seconds */
|
||||
public static final long SNAPSHOT_TIMEOUT_MILLIS_DEFAULT = 60000;
|
||||
|
||||
/** Conf key for millis between checks to see if snapshot completed or if there are errors*/
|
||||
public static final String SNAPSHOT_REQUEST_WAKE_MILLIS_KEY = "hbase.snapshot.region.wakefrequency";
|
||||
/** Default amount of time to check for errors while regions finish snapshotting */
|
||||
private static final long SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT = 500;
|
||||
|
||||
private final RegionServerServices rss;
|
||||
private final ProcedureMemberRpcs memberRpcs;
|
||||
private final ProcedureMember member;
|
||||
private final long wakeMillis;
|
||||
private final SnapshotSubprocedurePool taskManager;
|
||||
|
||||
/**
|
||||
* Exposed for testing.
|
||||
* @param conf
|
||||
* @param parent parent running the snapshot handler
|
||||
* @param controller use a custom snapshot controller
|
||||
* @param cohortMember use a custom cohort member
|
||||
*/
|
||||
RegionServerSnapshotManager(Configuration conf, HRegionServer parent,
|
||||
ProcedureMemberRpcs controller, ProcedureMember cohortMember) {
|
||||
this.rss = parent;
|
||||
this.memberRpcs = controller;
|
||||
this.member = cohortMember;
|
||||
// read in the snapshot request configuration properties
|
||||
wakeMillis = conf.getLong(SNAPSHOT_REQUEST_WAKE_MILLIS_KEY, SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT);
|
||||
taskManager = new SnapshotSubprocedurePool(parent, conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a default snapshot handler - uses a zookeeper based cohort controller.
|
||||
* @param conf configuration to use for extracting information like thread pool properties and
|
||||
* frequency to check for errors (wake frequency).
|
||||
* @param rss region server running the handler
|
||||
* @throws KeeperException if the zookeeper cluster cannot be reached
|
||||
*/
|
||||
public RegionServerSnapshotManager(RegionServerServices rss)
|
||||
throws KeeperException {
|
||||
this.rss = rss;
|
||||
ZooKeeperWatcher zkw = rss.getZooKeeper();
|
||||
String nodeName = rss.getServerName().toString();
|
||||
this.memberRpcs = new ZKProcedureMemberRpcs(zkw,
|
||||
SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, nodeName);
|
||||
|
||||
// read in the snapshot request configuration properties
|
||||
Configuration conf = rss.getConfiguration();
|
||||
wakeMillis = conf.getLong(SNAPSHOT_REQUEST_WAKE_MILLIS_KEY, SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT);
|
||||
long keepAlive = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
|
||||
int opThreads = conf.getInt(SNAPSHOT_REQUEST_THREADS_KEY, SNAPSHOT_REQUEST_THREADS_DEFAULT);
|
||||
|
||||
// create the actual snapshot procedure member
|
||||
ThreadPoolExecutor pool = ProcedureMember.defaultPool(wakeMillis, keepAlive, opThreads, nodeName);
|
||||
this.member = new ProcedureMember(memberRpcs, pool, new SnapshotSubprocedureBuilder());
|
||||
|
||||
// setup the task manager to run all the snapshots tasks
|
||||
taskManager = new SnapshotSubprocedurePool(rss, conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Start accepting snapshot requests.
|
||||
*/
|
||||
public void start() {
|
||||
this.memberRpcs.start(member);
|
||||
}
|
||||
|
||||
/**
|
||||
* Close <tt>this</tt> and all running snapshot tasks
|
||||
* @param force forcefully stop all running tasks
|
||||
* @throws IOException
|
||||
*/
|
||||
public void stop(boolean force) throws IOException {
|
||||
String mode = force ? "abruptly" : "gracefully";
|
||||
LOG.info("Stopping RegionServerSnapshotManager " + mode + ".");
|
||||
|
||||
if (force) {
|
||||
this.taskManager.stop();
|
||||
} else {
|
||||
this.taskManager.shutdown();
|
||||
}
|
||||
|
||||
try {
|
||||
this.member.close();
|
||||
} finally {
|
||||
this.memberRpcs.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If in a running state, creates the specified subprocedure for handling an online snapshot.
|
||||
*
|
||||
* Because this gets the local list of regions to snapshot and not the set the master had,
|
||||
* there is a possibility of a race where regions may be missed. This detected by the master in
|
||||
* the snapshot verification step.
|
||||
*
|
||||
* @param snapshot
|
||||
* @return Subprocedure to submit to the ProcedureMemeber.
|
||||
*/
|
||||
public Subprocedure buildSubprocedure(SnapshotDescription snapshot) {
|
||||
|
||||
// don't run a snapshot if the parent is stop(ping)
|
||||
if (rss.isStopping() || rss.isStopped()) {
|
||||
throw new IllegalStateException("Can't start snapshot on RS: " + rss.getServerName()
|
||||
+ ", because stopping/stopped!");
|
||||
}
|
||||
|
||||
// check to see if this server is hosting any regions for the snapshots
|
||||
// check to see if we have regions for the snapshot
|
||||
List<HRegion> involvedRegions;
|
||||
try {
|
||||
involvedRegions = getRegionsToSnapshot(snapshot);
|
||||
} catch (IOException e1) {
|
||||
throw new IllegalStateException("Failed to figure out if we should handle a snapshot - "
|
||||
+ "something has gone awry with the online regions.", e1);
|
||||
}
|
||||
|
||||
// We need to run the subprocedure even if we have no relevant regions. The coordinator
|
||||
// expects participation in the procedure and without sending message the snapshot attempt
|
||||
// will hang and fail.
|
||||
|
||||
LOG.debug("Launching subprocedure for snapshot " + snapshot.getName() + " from table " + snapshot.getTable());
|
||||
ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher();
|
||||
switch (snapshot.getType()) {
|
||||
default:
|
||||
throw new UnsupportedOperationException("Unrecognized snapshot type:" + snapshot.getType());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine if the snapshot should be handled on this server
|
||||
*
|
||||
* NOTE: This is racy -- the master expects a list of regionservers, but the regions get the
|
||||
* regions. This means if a region moves somewhere between the calls we'll miss some regions.
|
||||
* For example, a region move during a snapshot could result in a region to be skipped or done
|
||||
* twice. This is manageable because the {@link MasterSnapshotVerifier} will double check the
|
||||
* region lists after the online portion of the snapshot completes and will explicitly fail the
|
||||
* snapshot.
|
||||
*
|
||||
* @param snapshot
|
||||
* @return the list of online regions. Empty list is returned if no regions are responsible for
|
||||
* the given snapshot.
|
||||
* @throws IOException
|
||||
*/
|
||||
private List<HRegion> getRegionsToSnapshot(SnapshotDescription snapshot) throws IOException {
|
||||
byte[] table = Bytes.toBytes(snapshot.getTable());
|
||||
return rss.getOnlineRegions(table);
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the actual snapshot runner that will do all the 'hard' work
|
||||
*/
|
||||
public class SnapshotSubprocedureBuilder implements SubprocedureFactory {
|
||||
|
||||
@Override
|
||||
public Subprocedure buildSubprocedure(String name, byte[] data) {
|
||||
try {
|
||||
// unwrap the snapshot information
|
||||
SnapshotDescription snapshot = SnapshotDescription.parseFrom(data);
|
||||
return RegionServerSnapshotManager.this.buildSubprocedure(snapshot);
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
throw new IllegalArgumentException("Could not read snapshot information from request.");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* We use the SnapshotSubprocedurePool, a class specific thread pool instead of
|
||||
* {@link org.apache.hadoop.hbase.executor.ExecutorService}.
|
||||
*
|
||||
* It uses a {@link java.util.concurrent.ExecutorCompletionService} which provides queuing of
|
||||
* completed tasks which lets us efficiently cancel pending tasks upon the earliest operation
|
||||
* failures.
|
||||
*
|
||||
* HBase's ExecutorService (different from {@link java.util.concurrent.ExecutorService}) isn't
|
||||
* really built for coordinated tasks where multiple threads as part of one larger task. In
|
||||
* RS's the HBase Executor services are only used for open and close and not other threadpooled
|
||||
* operations such as compactions and replication sinks.
|
||||
*/
|
||||
static class SnapshotSubprocedurePool {
|
||||
private final ExecutorCompletionService<Void> taskPool;
|
||||
private final ThreadPoolExecutor executor;
|
||||
private volatile boolean stopped;
|
||||
private final List<Future<Void>> futures = new ArrayList<Future<Void>>();
|
||||
private final String name;
|
||||
|
||||
SnapshotSubprocedurePool(Server parent, Configuration conf) {
|
||||
// configure the executor service
|
||||
long keepAlive = conf.getLong(
|
||||
RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_KEY,
|
||||
RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
|
||||
int threads = conf.getInt(CONCURENT_SNAPSHOT_TASKS_KEY, DEFAULT_CONCURRENT_SNAPSHOT_TASKS);
|
||||
this.name = parent.getServerName().toString();
|
||||
executor = new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.MILLISECONDS,
|
||||
new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory("rs("
|
||||
+ name + ")-snapshot-pool"));
|
||||
taskPool = new ExecutorCompletionService<Void>(executor);
|
||||
}
|
||||
|
||||
/**
|
||||
* Submit a task to the pool.
|
||||
*
|
||||
* NOTE: all must be submitted before you can safely {@link #waitForOutstandingTasks()}. This
|
||||
* version does not support issuing tasks from multiple concurrnty table snapshots requests.
|
||||
*/
|
||||
void submitTask(final Callable<Void> task) {
|
||||
Future<Void> f = this.taskPool.submit(task);
|
||||
futures.add(f);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}.
|
||||
* This *must* be called to after all tasks are submitted via submitTask.
|
||||
*
|
||||
* TODO: to support multiple concurrent snapshots this code needs to be rewritten. The common
|
||||
* pool of futures not being thread safe is part of the problem.
|
||||
*
|
||||
* @return <tt>true</tt> on success, <tt>false</tt> otherwise
|
||||
* @throws SnapshotCreationException if the snapshot failed while we were waiting
|
||||
*/
|
||||
boolean waitForOutstandingTasks() throws ForeignException {
|
||||
LOG.debug("Waiting for local region snapshots to finish.");
|
||||
|
||||
try {
|
||||
// Using the completion service to process the futures that finish first first.
|
||||
int sz = futures.size();
|
||||
for (int i = 0; i < sz; i++) {
|
||||
Future<Void> f = taskPool.take();
|
||||
f.get();
|
||||
if (!futures.remove(f)) {
|
||||
LOG.warn("unexpected future");
|
||||
}
|
||||
LOG.debug("Completed " + (i+1) + "/" + sz + " local region snapshots.");
|
||||
}
|
||||
LOG.debug("Completed " + sz + " local region snapshots.");
|
||||
return true;
|
||||
} catch (InterruptedException e) {
|
||||
// TODO this isn't completely right and needs to be revisited.
|
||||
LOG.warn("Got InterruptedException in SnapshotSubprocedurePool", e);
|
||||
if (stopped) {
|
||||
throw new ForeignException("SnapshotSubprocedurePool", e);
|
||||
}
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (ExecutionException e) {
|
||||
if (e.getCause() instanceof ForeignException) {
|
||||
throw (ForeignException)e.getCause();
|
||||
}
|
||||
throw new ForeignException(name, e.getCause());
|
||||
} finally {
|
||||
// close off remaining tasks
|
||||
for (Future<Void> f: futures) {
|
||||
if (!f.isDone()){
|
||||
LOG.warn("cancelling region task");
|
||||
f.cancel(true);
|
||||
}
|
||||
}
|
||||
futures.clear();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* This attempts to cancel out all pending and in progress tasks (interruptions issues)
|
||||
*/
|
||||
void cancelTasks() {
|
||||
for (Future<Void> f: futures) {
|
||||
f.cancel(true);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This politely shutsdown the thread pool. Call when gracefully exiting a region server.
|
||||
*/
|
||||
void shutdown() {
|
||||
this.executor.shutdown();
|
||||
}
|
||||
|
||||
/**
|
||||
* This abruptly shutsdown the thread pool. Call when exiting a region server.
|
||||
*/
|
||||
void stop() {
|
||||
if (this.stopped) return;
|
||||
|
||||
this.stopped = true;
|
||||
this.executor.shutdownNow();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -18,7 +18,6 @@
|
|||
package org.apache.hadoop.hbase.snapshot;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.FileNotFoundException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -111,31 +110,15 @@ public class SnapshotDescriptionUtils {
|
|||
|
||||
/** Temporary directory under the snapshot directory to store in-progress snapshots */
|
||||
public static final String SNAPSHOT_TMP_DIR_NAME = ".tmp";
|
||||
|
||||
// snapshot operation values
|
||||
/** Default value if no start time is specified */
|
||||
public static final long NO_SNAPSHOT_START_TIME_SPECIFIED = 0;
|
||||
|
||||
public static final String MASTER_WAIT_TIME_GLOBAL_SNAPSHOT = "hbase.snapshot.global.master.timeout";
|
||||
public static final String REGION_WAIT_TIME_GLOBAL_SNAPSHOT = "hbase.snapshot.global.region.timeout";
|
||||
public static final String MASTER_WAIT_TIME_TIMESTAMP_SNAPSHOT = "hbase.snapshot.timestamp.master.timeout";
|
||||
public static final String REGION_WAIT_TIME_TIMESTAMP_SNAPSHOT = "hbase.snapshot.timestamp.region.timeout";
|
||||
public static final String MASTER_WAIT_TIME_DISABLED_SNAPSHOT = "hbase.snapshot.disabled.master.timeout";
|
||||
|
||||
/** Default timeout of 60 sec for a snapshot timeout on a region */
|
||||
public static final long DEFAULT_REGION_SNAPSHOT_TIMEOUT = 60000;
|
||||
public static final String MASTER_SNAPSHOT_TIMEOUT_MILLIS = "hbase.snapshot.master.timeout.millis";
|
||||
|
||||
/** By default, wait 60 seconds for a snapshot to complete */
|
||||
public static final long DEFAULT_MAX_WAIT_TIME = 60000;
|
||||
|
||||
/**
|
||||
* Conf key for amount of time the in the future a timestamp snapshot should be taken (ms).
|
||||
* Defaults to {@value SnapshotDescriptionUtils#DEFAULT_TIMESTAMP_SNAPSHOT_SPLIT_IN_FUTURE}
|
||||
*/
|
||||
public static final String TIMESTAMP_SNAPSHOT_SPLIT_POINT_ADDITION = "hbase.snapshot.timestamp.master.splittime";
|
||||
/** Start 2 seconds in the future, if no start time given */
|
||||
public static final long DEFAULT_TIMESTAMP_SNAPSHOT_SPLIT_IN_FUTURE = 2000;
|
||||
|
||||
private SnapshotDescriptionUtils() {
|
||||
// private constructor for utility class
|
||||
}
|
||||
|
@ -165,35 +148,9 @@ public class SnapshotDescriptionUtils {
|
|||
long defaultMaxWaitTime) {
|
||||
String confKey;
|
||||
switch (type) {
|
||||
case GLOBAL:
|
||||
confKey = MASTER_WAIT_TIME_GLOBAL_SNAPSHOT;
|
||||
break;
|
||||
case TIMESTAMP:
|
||||
|
||||
confKey = MASTER_WAIT_TIME_TIMESTAMP_SNAPSHOT;
|
||||
case DISABLED:
|
||||
default:
|
||||
confKey = MASTER_WAIT_TIME_DISABLED_SNAPSHOT;
|
||||
}
|
||||
return conf.getLong(confKey, defaultMaxWaitTime);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param conf {@link Configuration} from which to check for the timeout
|
||||
* @param type type of snapshot being taken
|
||||
* @param defaultMaxWaitTime Default amount of time to wait, if none is in the configuration
|
||||
* @return the max amount of time the region should wait for a snapshot to complete
|
||||
*/
|
||||
public static long getMaxRegionTimeout(Configuration conf, SnapshotDescription.Type type,
|
||||
long defaultMaxWaitTime) {
|
||||
String confKey;
|
||||
switch (type) {
|
||||
case GLOBAL:
|
||||
confKey = REGION_WAIT_TIME_GLOBAL_SNAPSHOT;
|
||||
break;
|
||||
case TIMESTAMP:
|
||||
default:
|
||||
confKey = REGION_WAIT_TIME_TIMESTAMP_SNAPSHOT;
|
||||
confKey = MASTER_SNAPSHOT_TIMEOUT_MILLIS;
|
||||
}
|
||||
return conf.getLong(confKey, defaultMaxWaitTime);
|
||||
}
|
||||
|
@ -299,13 +256,6 @@ public class SnapshotDescriptionUtils {
|
|||
long time = snapshot.getCreationTime();
|
||||
if (time == SnapshotDescriptionUtils.NO_SNAPSHOT_START_TIME_SPECIFIED) {
|
||||
time = EnvironmentEdgeManager.currentTimeMillis();
|
||||
if (snapshot.getType().equals(SnapshotDescription.Type.TIMESTAMP)) {
|
||||
long increment = conf.getLong(
|
||||
SnapshotDescriptionUtils.TIMESTAMP_SNAPSHOT_SPLIT_POINT_ADDITION,
|
||||
SnapshotDescriptionUtils.DEFAULT_TIMESTAMP_SNAPSHOT_SPLIT_IN_FUTURE);
|
||||
LOG.debug("Setting timestamp snapshot in future by " + increment + " ms.");
|
||||
time += increment;
|
||||
}
|
||||
LOG.debug("Creation time not specified, setting to:" + time + " (current time:"
|
||||
+ EnvironmentEdgeManager.currentTimeMillis() + ").");
|
||||
SnapshotDescription.Builder builder = snapshot.toBuilder();
|
||||
|
|
|
@ -113,7 +113,6 @@ public class TakeSnapshotUtils {
|
|||
* @param monitor monitor to notify when the snapshot life expires
|
||||
* @return the timer to use update to signal the start and end of the snapshot
|
||||
*/
|
||||
@SuppressWarnings("rawtypes")
|
||||
public static TimeoutExceptionInjector getMasterTimerAndBindToMonitor(SnapshotDescription snapshot,
|
||||
Configuration conf, ForeignExceptionListener monitor) {
|
||||
long maxTime = SnapshotDescriptionUtils.getMaxMasterTimeout(conf, snapshot.getType(),
|
||||
|
|
|
@ -34,8 +34,7 @@ import org.apache.hadoop.hbase.master.MasterFileSystem;
|
|||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
|
||||
import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
|
||||
import org.apache.hadoop.hbase.master.snapshot.SnapshotHFileCleaner;
|
||||
import org.apache.hadoop.hbase.master.snapshot.SnapshotLogCleaner;
|
||||
import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.Test;
|
||||
|
@ -50,6 +49,7 @@ public class TestSnapshotManager {
|
|||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
MasterServices services = Mockito.mock(MasterServices.class);
|
||||
ProcedureCoordinator coordinator = Mockito.mock(ProcedureCoordinator.class);
|
||||
ExecutorService pool = Mockito.mock(ExecutorService.class);
|
||||
MasterFileSystem mfs = Mockito.mock(MasterFileSystem.class);
|
||||
FileSystem fs;
|
||||
|
@ -71,7 +71,7 @@ public class TestSnapshotManager {
|
|||
Mockito.when(services.getMasterFileSystem()).thenReturn(mfs);
|
||||
Mockito.when(mfs.getFileSystem()).thenReturn(fs);
|
||||
Mockito.when(mfs.getRootDir()).thenReturn(UTIL.getDataTestDir());
|
||||
return new SnapshotManager(services);
|
||||
return new SnapshotManager(services, coordinator, pool);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -70,43 +70,8 @@ public class TestSnapshotDescriptionUtils {
|
|||
private static final Log LOG = LogFactory.getLog(TestSnapshotDescriptionUtils.class);
|
||||
|
||||
@Test
|
||||
public void testValidateDescriptor() {
|
||||
EnvironmentEdge edge = new EnvironmentEdge() {
|
||||
@Override
|
||||
public long currentTimeMillis() {
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
EnvironmentEdgeManagerTestHelper.injectEdge(edge);
|
||||
|
||||
// check a basic snapshot description
|
||||
SnapshotDescription.Builder builder = SnapshotDescription.newBuilder();
|
||||
builder.setName("snapshot");
|
||||
builder.setTable("table");
|
||||
|
||||
// check that time is to an amount in the future
|
||||
public void testValidateMissingTableName() {
|
||||
Configuration conf = new Configuration(false);
|
||||
conf.setLong(SnapshotDescriptionUtils.TIMESTAMP_SNAPSHOT_SPLIT_POINT_ADDITION, 1);
|
||||
SnapshotDescription desc = SnapshotDescriptionUtils.validate(builder.build(), conf);
|
||||
assertEquals("Description creation time wasn't set correctly", 1, desc.getCreationTime());
|
||||
|
||||
// test a global snapshot
|
||||
edge = new EnvironmentEdge() {
|
||||
@Override
|
||||
public long currentTimeMillis() {
|
||||
return 2;
|
||||
}
|
||||
};
|
||||
EnvironmentEdgeManagerTestHelper.injectEdge(edge);
|
||||
builder.setType(Type.GLOBAL);
|
||||
desc = SnapshotDescriptionUtils.validate(builder.build(), conf);
|
||||
assertEquals("Description creation time wasn't set correctly", 2, desc.getCreationTime());
|
||||
|
||||
// test that we don't override a given value
|
||||
builder.setCreationTime(10);
|
||||
desc = SnapshotDescriptionUtils.validate(builder.build(), conf);
|
||||
assertEquals("Description creation time wasn't set correctly", 10, desc.getCreationTime());
|
||||
|
||||
try {
|
||||
SnapshotDescriptionUtils.validate(SnapshotDescription.newBuilder().setName("fail").build(),
|
||||
conf);
|
||||
|
|
Loading…
Reference in New Issue