HBASE-19289 Add flag to disable stream capability enforcement
Signed-off-by: Josh Elser <elserj@apache.org>
This commit is contained in:
parent
d5aefbd2c7
commit
2c9ef8a471
|
@ -62,6 +62,9 @@ public abstract class CommonFSUtils {
|
||||||
/** Parameter name for HBase WAL directory */
|
/** Parameter name for HBase WAL directory */
|
||||||
public static final String HBASE_WAL_DIR = "hbase.wal.dir";
|
public static final String HBASE_WAL_DIR = "hbase.wal.dir";
|
||||||
|
|
||||||
|
/** Parameter to disable stream capability enforcement checks */
|
||||||
|
public static final String UNSAFE_STREAM_CAPABILITY_ENFORCE = "hbase.unsafe.stream.capability.enforce";
|
||||||
|
|
||||||
/** Full access permissions (starting point for a umask) */
|
/** Full access permissions (starting point for a umask) */
|
||||||
public static final String FULL_RWX_PERMISSIONS = "777";
|
public static final String FULL_RWX_PERMISSIONS = "777";
|
||||||
|
|
||||||
|
|
|
@ -131,6 +131,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
private final FileSystem fs;
|
private final FileSystem fs;
|
||||||
private final Path walDir;
|
private final Path walDir;
|
||||||
private final Path walArchiveDir;
|
private final Path walArchiveDir;
|
||||||
|
private final boolean enforceStreamCapability;
|
||||||
|
|
||||||
private final AtomicReference<Throwable> syncException = new AtomicReference<>();
|
private final AtomicReference<Throwable> syncException = new AtomicReference<>();
|
||||||
private final AtomicBoolean loading = new AtomicBoolean(true);
|
private final AtomicBoolean loading = new AtomicBoolean(true);
|
||||||
|
@ -205,6 +206,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
this.walDir = walDir;
|
this.walDir = walDir;
|
||||||
this.walArchiveDir = walArchiveDir;
|
this.walArchiveDir = walArchiveDir;
|
||||||
this.fs = walDir.getFileSystem(conf);
|
this.fs = walDir.getFileSystem(conf);
|
||||||
|
this.enforceStreamCapability = conf.getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true);
|
||||||
|
|
||||||
// Create the log directory for the procedure store
|
// Create the log directory for the procedure store
|
||||||
if (!fs.exists(walDir)) {
|
if (!fs.exists(walDir)) {
|
||||||
|
@ -1028,8 +1030,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
||||||
// ensure that we can provide the level of data safety we're configured
|
// ensure that we can provide the level of data safety we're configured
|
||||||
// to provide.
|
// to provide.
|
||||||
final String durability = useHsync ? "hsync" : "hflush";
|
final String durability = useHsync ? "hsync" : "hflush";
|
||||||
if (!(CommonFSUtils.hasCapability(newStream, durability))) {
|
if (enforceStreamCapability && !(CommonFSUtils.hasCapability(newStream, durability))) {
|
||||||
throw new IllegalStateException("The procedure WAL relies on the ability to " + durability +
|
throw new IllegalStateException("The procedure WAL relies on the ability to " + durability +
|
||||||
" for proper operation during component failures, but the underlying filesystem does " +
|
" for proper operation during component failures, but the underlying filesystem does " +
|
||||||
"not support doing so. Please check the config value of '" + USE_HSYNC_CONF_KEY +
|
"not support doing so. Please check the config value of '" + USE_HSYNC_CONF_KEY +
|
||||||
"' to set the desired level of robustness and ensure the config value of '" +
|
"' to set the desired level of robustness and ensure the config value of '" +
|
||||||
|
|
|
@ -32,4 +32,13 @@
|
||||||
procedure to have an owner
|
procedure to have an owner
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>hbase.unsafe.stream.capability.enforce</name>
|
||||||
|
<value>false</value>
|
||||||
|
<description>
|
||||||
|
Controls whether HBase will check for stream capabilities (hflush/hsync).
|
||||||
|
Disable this if you intend to run on LocalFileSystem.
|
||||||
|
WARNING: Doing so may expose you to additional risk of data loss!
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
</configuration>
|
</configuration>
|
||||||
|
|
|
@ -73,8 +73,9 @@ public final class AsyncFSOutputHelper {
|
||||||
// After we create the stream but before we attempt to use it at all
|
// After we create the stream but before we attempt to use it at all
|
||||||
// ensure that we can provide the level of data safety we're configured
|
// ensure that we can provide the level of data safety we're configured
|
||||||
// to provide.
|
// to provide.
|
||||||
if (!(CommonFSUtils.hasCapability(fsOut, "hflush") &&
|
if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true) &&
|
||||||
CommonFSUtils.hasCapability(fsOut, "hsync"))) {
|
!(CommonFSUtils.hasCapability(fsOut, "hflush") &&
|
||||||
|
CommonFSUtils.hasCapability(fsOut, "hsync"))) {
|
||||||
throw new CommonFSUtils.StreamLacksCapabilityException("hflush and hsync");
|
throw new CommonFSUtils.StreamLacksCapabilityException("hflush and hsync");
|
||||||
}
|
}
|
||||||
final ExecutorService flushExecutor =
|
final ExecutorService flushExecutor =
|
||||||
|
|
|
@ -93,7 +93,8 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
|
||||||
this.output = fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize,
|
this.output = fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize,
|
||||||
null);
|
null);
|
||||||
// TODO Be sure to add a check for hsync if this branch includes HBASE-19024
|
// TODO Be sure to add a check for hsync if this branch includes HBASE-19024
|
||||||
if (!(CommonFSUtils.hasCapability(output, "hflush"))) {
|
if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true) &&
|
||||||
|
!(CommonFSUtils.hasCapability(output, "hflush"))) {
|
||||||
throw new StreamLacksCapabilityException("hflush");
|
throw new StreamLacksCapabilityException("hflush");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -158,4 +158,13 @@
|
||||||
<name>hbase.hconnection.threads.keepalivetime</name>
|
<name>hbase.hconnection.threads.keepalivetime</name>
|
||||||
<value>3</value>
|
<value>3</value>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>hbase.unsafe.stream.capability.enforce</name>
|
||||||
|
<value>false</value>
|
||||||
|
<description>
|
||||||
|
Controls whether HBase will check for stream capabilities (hflush/hsync).
|
||||||
|
Disable this if you intend to run on LocalFileSystem.
|
||||||
|
WARNING: Doing so may expose you to additional risk of data loss!
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
</configuration>
|
</configuration>
|
||||||
|
|
Loading…
Reference in New Issue