HBASE-10935 support snapshot policy where flush memstore can be skipped to prevent production cluster freeze (Tianying Chang)
This commit is contained in:
parent
7f2f7c23a7
commit
86e5db5099
@ -2626,6 +2626,26 @@ public class HBaseAdmin implements Admin {
|
|||||||
SnapshotDescription.Type.FLUSH);
|
SnapshotDescription.Type.FLUSH);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create snapshot for the given table of given flush type.
|
||||||
|
* <p>
|
||||||
|
* Snapshots are considered unique based on <b>the name of the snapshot</b>. Attempts to take a
|
||||||
|
* snapshot with the same name (even a different type or with different parameters) will fail with
|
||||||
|
* a {@link SnapshotCreationException} indicating the duplicate naming.
|
||||||
|
* <p>
|
||||||
|
* Snapshot names follow the same naming constraints as tables in HBase.
|
||||||
|
* @param snapshotName name of the snapshot to be created
|
||||||
|
* @param tableName name of the table for which snapshot is created
|
||||||
|
* @param flushType type of snapshot to take, e.g. SKIPFLUSH to snapshot without flushing the memstore first
|
||||||
|
* @throws IOException if a remote or network exception occurs
|
||||||
|
* @throws SnapshotCreationException if snapshot creation failed
|
||||||
|
* @throws IllegalArgumentException if the snapshot request is formatted incorrectly
|
||||||
|
*/
|
||||||
|
public void snapshot(final byte[] snapshotName, final byte[] tableName,
|
||||||
|
final SnapshotDescription.Type flushType) throws
|
||||||
|
IOException, SnapshotCreationException, IllegalArgumentException {
|
||||||
|
snapshot(Bytes.toString(snapshotName), Bytes.toString(tableName), flushType);
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
public void snapshot(final String snapshotName,
|
public void snapshot(final String snapshotName,
|
||||||
* Create a timestamp consistent snapshot for the given table.
|
* Create a timestamp consistent snapshot for the given table.
|
||||||
|
@ -10476,6 +10476,10 @@ public final class HBaseProtos {
|
|||||||
* <code>FLUSH = 1;</code>
|
* <code>FLUSH = 1;</code>
|
||||||
*/
|
*/
|
||||||
FLUSH(1, 1),
|
FLUSH(1, 1),
|
||||||
|
/**
|
||||||
|
* <code>SKIPFLUSH = 2;</code>
|
||||||
|
*/
|
||||||
|
SKIPFLUSH(2, 2),
|
||||||
;
|
;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -10486,6 +10490,10 @@ public final class HBaseProtos {
|
|||||||
* <code>FLUSH = 1;</code>
|
* <code>FLUSH = 1;</code>
|
||||||
*/
|
*/
|
||||||
public static final int FLUSH_VALUE = 1;
|
public static final int FLUSH_VALUE = 1;
|
||||||
|
/**
|
||||||
|
* <code>SKIPFLUSH = 2;</code>
|
||||||
|
*/
|
||||||
|
public static final int SKIPFLUSH_VALUE = 2;
|
||||||
|
|
||||||
|
|
||||||
public final int getNumber() { return value; }
|
public final int getNumber() { return value; }
|
||||||
@ -10494,6 +10502,7 @@ public final class HBaseProtos {
|
|||||||
switch (value) {
|
switch (value) {
|
||||||
case 0: return DISABLED;
|
case 0: return DISABLED;
|
||||||
case 1: return FLUSH;
|
case 1: return FLUSH;
|
||||||
|
case 2: return SKIPFLUSH;
|
||||||
default: return null;
|
default: return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -16241,26 +16250,27 @@ public final class HBaseProtos {
|
|||||||
",\n\rNameBytesPair\022\014\n\004name\030\001 \002(\t\022\r\n\005value\030" +
|
",\n\rNameBytesPair\022\014\n\004name\030\001 \002(\t\022\r\n\005value\030" +
|
||||||
"\002 \001(\014\"/\n\016BytesBytesPair\022\r\n\005first\030\001 \002(\014\022\016" +
|
"\002 \001(\014\"/\n\016BytesBytesPair\022\r\n\005first\030\001 \002(\014\022\016" +
|
||||||
"\n\006second\030\002 \002(\014\",\n\rNameInt64Pair\022\014\n\004name\030" +
|
"\n\006second\030\002 \002(\014\",\n\rNameInt64Pair\022\014\n\004name\030" +
|
||||||
"\001 \001(\t\022\r\n\005value\030\002 \001(\003\"\256\001\n\023SnapshotDescrip" +
|
"\001 \001(\t\022\r\n\005value\030\002 \001(\003\"\275\001\n\023SnapshotDescrip" +
|
||||||
"tion\022\014\n\004name\030\001 \002(\t\022\r\n\005table\030\002 \001(\t\022\030\n\rcre" +
|
"tion\022\014\n\004name\030\001 \002(\t\022\r\n\005table\030\002 \001(\t\022\030\n\rcre" +
|
||||||
"ation_time\030\003 \001(\003:\0010\022.\n\004type\030\004 \001(\0162\031.Snap" +
|
"ation_time\030\003 \001(\003:\0010\022.\n\004type\030\004 \001(\0162\031.Snap" +
|
||||||
"shotDescription.Type:\005FLUSH\022\017\n\007version\030\005" +
|
"shotDescription.Type:\005FLUSH\022\017\n\007version\030\005" +
|
||||||
" \001(\005\"\037\n\004Type\022\014\n\010DISABLED\020\000\022\t\n\005FLUSH\020\001\"}\n",
|
" \001(\005\".\n\004Type\022\014\n\010DISABLED\020\000\022\t\n\005FLUSH\020\001\022\r\n",
|
||||||
"\024ProcedureDescription\022\021\n\tsignature\030\001 \002(\t" +
|
"\tSKIPFLUSH\020\002\"}\n\024ProcedureDescription\022\021\n\t" +
|
||||||
"\022\020\n\010instance\030\002 \001(\t\022\030\n\rcreation_time\030\003 \001(" +
|
"signature\030\001 \002(\t\022\020\n\010instance\030\002 \001(\t\022\030\n\rcre" +
|
||||||
"\003:\0010\022&\n\rconfiguration\030\004 \003(\0132\017.NameString" +
|
"ation_time\030\003 \001(\003:\0010\022&\n\rconfiguration\030\004 \003" +
|
||||||
"Pair\"\n\n\010EmptyMsg\"\033\n\007LongMsg\022\020\n\010long_msg\030" +
|
"(\0132\017.NameStringPair\"\n\n\010EmptyMsg\"\033\n\007LongM" +
|
||||||
"\001 \002(\003\"\037\n\tDoubleMsg\022\022\n\ndouble_msg\030\001 \002(\001\"\'" +
|
"sg\022\020\n\010long_msg\030\001 \002(\003\"\037\n\tDoubleMsg\022\022\n\ndou" +
|
||||||
"\n\rBigDecimalMsg\022\026\n\016bigdecimal_msg\030\001 \002(\014\"" +
|
"ble_msg\030\001 \002(\001\"\'\n\rBigDecimalMsg\022\026\n\016bigdec" +
|
||||||
"5\n\004UUID\022\026\n\016least_sig_bits\030\001 \002(\004\022\025\n\rmost_" +
|
"imal_msg\030\001 \002(\014\"5\n\004UUID\022\026\n\016least_sig_bits" +
|
||||||
"sig_bits\030\002 \002(\004\"K\n\023NamespaceDescriptor\022\014\n" +
|
"\030\001 \002(\004\022\025\n\rmost_sig_bits\030\002 \002(\004\"K\n\023Namespa" +
|
||||||
"\004name\030\001 \002(\014\022&\n\rconfiguration\030\002 \003(\0132\017.Nam" +
|
"ceDescriptor\022\014\n\004name\030\001 \002(\014\022&\n\rconfigurat" +
|
||||||
"eStringPair\"$\n\020RegionServerInfo\022\020\n\010infoP",
|
"ion\030\002 \003(\0132\017.NameStringPair\"$\n\020RegionServ",
|
||||||
"ort\030\001 \001(\005*r\n\013CompareType\022\010\n\004LESS\020\000\022\021\n\rLE" +
|
"erInfo\022\020\n\010infoPort\030\001 \001(\005*r\n\013CompareType\022" +
|
||||||
"SS_OR_EQUAL\020\001\022\t\n\005EQUAL\020\002\022\r\n\tNOT_EQUAL\020\003\022" +
|
"\010\n\004LESS\020\000\022\021\n\rLESS_OR_EQUAL\020\001\022\t\n\005EQUAL\020\002\022" +
|
||||||
"\024\n\020GREATER_OR_EQUAL\020\004\022\013\n\007GREATER\020\005\022\t\n\005NO" +
|
"\r\n\tNOT_EQUAL\020\003\022\024\n\020GREATER_OR_EQUAL\020\004\022\013\n\007" +
|
||||||
"_OP\020\006B>\n*org.apache.hadoop.hbase.protobu" +
|
"GREATER\020\005\022\t\n\005NO_OP\020\006B>\n*org.apache.hadoo" +
|
||||||
"f.generatedB\013HBaseProtosH\001\240\001\001"
|
"p.hbase.protobuf.generatedB\013HBaseProtosH" +
|
||||||
|
"\001\240\001\001"
|
||||||
};
|
};
|
||||||
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||||
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
||||||
|
@ -159,6 +159,7 @@ message SnapshotDescription {
|
|||||||
enum Type {
|
enum Type {
|
||||||
DISABLED = 0;
|
DISABLED = 0;
|
||||||
FLUSH = 1;
|
FLUSH = 1;
|
||||||
|
SKIPFLUSH = 2;
|
||||||
}
|
}
|
||||||
optional Type type = 4 [default = FLUSH];
|
optional Type type = 4 [default = FLUSH];
|
||||||
optional int32 version = 5;
|
optional int32 version = 5;
|
||||||
|
@ -48,6 +48,7 @@ public class FlushSnapshotSubprocedure extends Subprocedure {
|
|||||||
private final List<HRegion> regions;
|
private final List<HRegion> regions;
|
||||||
private final SnapshotDescription snapshot;
|
private final SnapshotDescription snapshot;
|
||||||
private final SnapshotSubprocedurePool taskManager;
|
private final SnapshotSubprocedurePool taskManager;
|
||||||
|
private boolean snapshotSkipFlush = false;
|
||||||
|
|
||||||
public FlushSnapshotSubprocedure(ProcedureMember member,
|
public FlushSnapshotSubprocedure(ProcedureMember member,
|
||||||
ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
|
ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
|
||||||
@ -55,6 +56,10 @@ public class FlushSnapshotSubprocedure extends Subprocedure {
|
|||||||
SnapshotSubprocedurePool taskManager) {
|
SnapshotSubprocedurePool taskManager) {
|
||||||
super(member, snapshot.getName(), errorListener, wakeFrequency, timeout);
|
super(member, snapshot.getName(), errorListener, wakeFrequency, timeout);
|
||||||
this.snapshot = snapshot;
|
this.snapshot = snapshot;
|
||||||
|
|
||||||
|
if (this.snapshot.getType() == SnapshotDescription.Type.SKIPFLUSH) {
|
||||||
|
snapshotSkipFlush = true;
|
||||||
|
}
|
||||||
this.regions = regions;
|
this.regions = regions;
|
||||||
this.taskManager = taskManager;
|
this.taskManager = taskManager;
|
||||||
}
|
}
|
||||||
@ -78,10 +83,25 @@ public class FlushSnapshotSubprocedure extends Subprocedure {
|
|||||||
LOG.debug("Starting region operation on " + region);
|
LOG.debug("Starting region operation on " + region);
|
||||||
region.startRegionOperation();
|
region.startRegionOperation();
|
||||||
try {
|
try {
|
||||||
LOG.debug("Flush Snapshotting region " + region.toString() + " started...");
|
if (snapshotSkipFlush) {
|
||||||
region.flushcache();
|
/*
|
||||||
|
* This takes an online snapshot without forcing a coordinated flush, to prevent a pause.
|
||||||
|
* The snapshot type is defined inside the snapshot description. FlushSnapshotSubprocedure
|
||||||
|
* should be renamed to distributedSnapshotSubprocedure, and the flush() behavior can be
|
||||||
|
* turned on/off based on the flush type.
|
||||||
|
* To minimize the code change, the class name is not changed.
|
||||||
|
*/
|
||||||
|
LOG.debug("take snapshot without flush memstore first");
|
||||||
|
} else {
|
||||||
|
LOG.debug("Flush Snapshotting region " + region.toString() + " started...");
|
||||||
|
region.flushcache();
|
||||||
|
}
|
||||||
region.addRegionToSnapshot(snapshot, monitor);
|
region.addRegionToSnapshot(snapshot, monitor);
|
||||||
LOG.debug("... Flush Snapshotting region " + region.toString() + " completed.");
|
if (snapshotSkipFlush) {
|
||||||
|
LOG.debug("... SkipFlush Snapshotting region " + region.toString() + " completed.");
|
||||||
|
} else {
|
||||||
|
LOG.debug("... Flush Snapshotting region " + region.toString() + " completed.");
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
LOG.debug("Closing region operation on " + region);
|
LOG.debug("Closing region operation on " + region);
|
||||||
region.closeRegionOperation();
|
region.closeRegionOperation();
|
||||||
|
@ -186,6 +186,19 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
|
|||||||
new SnapshotSubprocedurePool(rss.getServerName().toString(), conf);
|
new SnapshotSubprocedurePool(rss.getServerName().toString(), conf);
|
||||||
return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis,
|
return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis,
|
||||||
timeoutMillis, involvedRegions, snapshot, taskManager);
|
timeoutMillis, involvedRegions, snapshot, taskManager);
|
||||||
|
case SKIPFLUSH:
|
||||||
|
/*
|
||||||
|
* This takes an online snapshot without forcing a coordinated flush, to prevent a pause.
|
||||||
|
* The snapshot type is defined inside the snapshot description. FlushSnapshotSubprocedure
|
||||||
|
* should be renamed to distributedSnapshotSubprocedure, and the flush() behavior can be
|
||||||
|
* turned on/off based on the flush type.
|
||||||
|
* To minimize the code change, the class name is not changed.
|
||||||
|
*/
|
||||||
|
SnapshotSubprocedurePool taskManager2 =
|
||||||
|
new SnapshotSubprocedurePool(rss.getServerName().toString(), conf);
|
||||||
|
return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis,
|
||||||
|
timeoutMillis, involvedRegions, snapshot, taskManager2);
|
||||||
|
|
||||||
default:
|
default:
|
||||||
throw new UnsupportedOperationException("Unrecognized snapshot type:" + snapshot.getType());
|
throw new UnsupportedOperationException("Unrecognized snapshot type:" + snapshot.getType());
|
||||||
}
|
}
|
||||||
|
@ -175,6 +175,49 @@ public class TestFlushSnapshotFromClient {
|
|||||||
admin, fs);
|
admin, fs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test snapshotting a table that is online without flushing
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test(timeout=30000)
|
||||||
|
public void testSkipFlushTableSnapshot() throws Exception {
|
||||||
|
HBaseAdmin admin = UTIL.getHBaseAdmin();
|
||||||
|
// make sure we don't fail on listing snapshots
|
||||||
|
SnapshotTestingUtils.assertNoSnapshots(admin);
|
||||||
|
|
||||||
|
// put some stuff in the table
|
||||||
|
HTable table = new HTable(UTIL.getConfiguration(), TABLE_NAME);
|
||||||
|
UTIL.loadTable(table, TEST_FAM);
|
||||||
|
|
||||||
|
LOG.debug("FS state before snapshot:");
|
||||||
|
FSUtils.logFileSystemState(UTIL.getTestFileSystem(),
|
||||||
|
FSUtils.getRootDir(UTIL.getConfiguration()), LOG);
|
||||||
|
|
||||||
|
// take a snapshot of the enabled table
|
||||||
|
String snapshotString = "skipFlushTableSnapshot";
|
||||||
|
byte[] snapshot = Bytes.toBytes(snapshotString);
|
||||||
|
admin.snapshot(snapshotString, STRING_TABLE_NAME, SnapshotDescription.Type.SKIPFLUSH);
|
||||||
|
LOG.debug("Snapshot completed.");
|
||||||
|
|
||||||
|
// make sure we have the snapshot
|
||||||
|
List<SnapshotDescription> snapshots = SnapshotTestingUtils.assertOneSnapshotThatMatches(admin,
|
||||||
|
snapshot, TABLE_NAME);
|
||||||
|
|
||||||
|
// make sure it's a valid snapshot
|
||||||
|
FileSystem fs = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
|
||||||
|
Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
|
||||||
|
LOG.debug("FS state after snapshot:");
|
||||||
|
FSUtils.logFileSystemState(UTIL.getTestFileSystem(),
|
||||||
|
FSUtils.getRootDir(UTIL.getConfiguration()), LOG);
|
||||||
|
|
||||||
|
SnapshotTestingUtils.confirmSnapshotValid(snapshots.get(0), TABLE_NAME, TEST_FAM, rootDir,
|
||||||
|
admin, fs);
|
||||||
|
|
||||||
|
admin.deleteSnapshot(snapshot);
|
||||||
|
snapshots = admin.listSnapshots();
|
||||||
|
SnapshotTestingUtils.assertNoSnapshots(admin);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test simple flush snapshotting a table that is online
|
* Test simple flush snapshotting a table that is online
|
||||||
|
@ -61,6 +61,7 @@ module HBaseConstants
|
|||||||
ATTRIBUTES="ATTRIBUTES"
|
ATTRIBUTES="ATTRIBUTES"
|
||||||
VISIBILITY="VISIBILITY"
|
VISIBILITY="VISIBILITY"
|
||||||
AUTHORIZATIONS = "AUTHORIZATIONS"
|
AUTHORIZATIONS = "AUTHORIZATIONS"
|
||||||
|
SKIP_FLUSH = 'SKIP_FLUSH'
|
||||||
|
|
||||||
# Load constants from hbase java API
|
# Load constants from hbase java API
|
||||||
def self.promote_constants(constants)
|
def self.promote_constants(constants)
|
||||||
|
@ -22,6 +22,7 @@ java_import java.util.Arrays
|
|||||||
java_import org.apache.hadoop.hbase.util.Pair
|
java_import org.apache.hadoop.hbase.util.Pair
|
||||||
java_import org.apache.hadoop.hbase.util.RegionSplitter
|
java_import org.apache.hadoop.hbase.util.RegionSplitter
|
||||||
java_import org.apache.hadoop.hbase.util.Bytes
|
java_import org.apache.hadoop.hbase.util.Bytes
|
||||||
|
java_import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos::SnapshotDescription
|
||||||
|
|
||||||
# Wrapper for org.apache.hadoop.hbase.client.HBaseAdmin
|
# Wrapper for org.apache.hadoop.hbase.client.HBaseAdmin
|
||||||
|
|
||||||
@ -718,8 +719,18 @@ module Hbase
|
|||||||
|
|
||||||
#----------------------------------------------------------------------------------------------
|
#----------------------------------------------------------------------------------------------
|
||||||
# Take a snapshot of specified table
|
# Take a snapshot of specified table
|
||||||
def snapshot(table, snapshot_name)
|
def snapshot(table, snapshot_name, *args)
|
||||||
@admin.snapshot(snapshot_name.to_java_bytes, table.to_java_bytes)
|
if args.empty?
|
||||||
|
@admin.snapshot(snapshot_name.to_java_bytes, table.to_java_bytes)
|
||||||
|
else
|
||||||
|
args.each do |arg|
|
||||||
|
if arg[SKIP_FLUSH] == true
|
||||||
|
@admin.snapshot(snapshot_name.to_java_bytes, table.to_java_bytes, SnapshotDescription::Type::SKIPFLUSH)
|
||||||
|
else
|
||||||
|
@admin.snapshot(snapshot_name.to_java_bytes, table.to_java_bytes)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
#----------------------------------------------------------------------------------------------
|
#----------------------------------------------------------------------------------------------
|
||||||
|
@ -24,13 +24,13 @@ module Shell
|
|||||||
Take a snapshot of specified table. Examples:
|
Take a snapshot of specified table. Examples:
|
||||||
|
|
||||||
hbase> snapshot 'sourceTable', 'snapshotName'
|
hbase> snapshot 'sourceTable', 'snapshotName'
|
||||||
hbase> snapshot 'namespace:sourceTable', 'snapshotName'
|
hbase> snapshot 'namespace:sourceTable', 'snapshotName', {SKIP_FLUSH => true}
|
||||||
EOF
|
EOF
|
||||||
end
|
end
|
||||||
|
|
||||||
def command(table, snapshot_name)
|
def command(table, snapshot_name, *args)
|
||||||
format_simple_command do
|
format_simple_command do
|
||||||
admin.snapshot(table, snapshot_name)
|
admin.snapshot(table, snapshot_name, *args)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
Loading…
x
Reference in New Issue
Block a user