HBASE-19841 Every HTU should be local until DFS starts

Mike Drob 2018-01-29 14:32:39 -06:00
parent 12a6af0de5
commit a165bd766a
10 changed files with 89 additions and 79 deletions

CommonFSUtils.java

@@ -394,7 +394,13 @@ public abstract class CommonFSUtils {
   public static FileSystem getWALFileSystem(final Configuration c) throws IOException {
     Path p = getWALRootDir(c);
-    return p.getFileSystem(c);
+    FileSystem fs = p.getFileSystem(c);
+    // hadoop-core does fs caching, so need to propagate this if set
+    String enforceStreamCapability = c.get(UNSAFE_STREAM_CAPABILITY_ENFORCE);
+    if (enforceStreamCapability != null) {
+      fs.getConf().set(UNSAFE_STREAM_CAPABILITY_ENFORCE, enforceStreamCapability);
+    }
+    return fs;
   }

   private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException {
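
The propagation above matters because Hadoop caches FileSystem instances per scheme, authority, and user, so a flag set on a fresh Configuration is invisible to an already-cached FileSystem. A minimal standalone sketch of that caching behavior (the demo class is hypothetical; the literal flag string is the value of UNSAFE_STREAM_CAPABILITY_ENFORCE):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class FsCacheDemo {
      public static void main(String[] args) throws Exception {
        Configuration c1 = new Configuration();
        FileSystem fs1 = new Path("file:///tmp").getFileSystem(c1);

        Configuration c2 = new Configuration();
        c2.set("hbase.unsafe.stream.capability.enforce", "false");
        FileSystem fs2 = new Path("file:///tmp").getFileSystem(c2);

        // Same cached instance, created from c1: the flag set on c2 is not
        // visible through it unless copied over, as getWALFileSystem now does.
        System.out.println(fs1 == fs2);                          // true
        System.out.println(fs2.getConf().get(
            "hbase.unsafe.stream.capability.enforce"));          // null
      }
    }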

HBaseCommonTestingUtility.java

@@ -63,11 +63,11 @@ public class HBaseCommonTestingUtility {
   protected Configuration conf;

   public HBaseCommonTestingUtility() {
-    this(HBaseConfiguration.create());
+    this(null);
   }

   public HBaseCommonTestingUtility(Configuration conf) {
-    this.conf = conf;
+    this.conf = (conf == null ? HBaseConfiguration.create() : conf);
   }

   /**
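
Moving the null check into the one-arg constructor makes the no-arg form a pure delegation; a quick sketch of the resulting behavior (the demo class name is hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class CtorDemo {
      public static void main(String[] args) {
        // Both forms end up backed by a freshly created HBaseConfiguration:
        HBaseCommonTestingUtility a = new HBaseCommonTestingUtility();
        HBaseCommonTestingUtility b = new HBaseCommonTestingUtility(null);
        // A caller-supplied configuration is used as-is:
        Configuration conf = HBaseConfiguration.create();
        HBaseCommonTestingUtility c = new HBaseCommonTestingUtility(conf);
        System.out.println(c.getConfiguration() == conf);                  // true
        System.out.println(a.getConfiguration() != b.getConfiguration());  // true, separate copies
      }
    }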

HBaseTestCase.java

@@ -72,7 +72,7 @@ public abstract class HBaseTestCase extends TestCase {
   protected final HBaseTestingUtility testUtil = new HBaseTestingUtility();

-  public volatile Configuration conf = HBaseConfiguration.create();
+  public volatile Configuration conf = testUtil.getConfiguration();
   public final FSTableDescriptors fsTableDescriptors;
   {
     try {
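
The field now aliases the utility's configuration rather than a second, unrelated Configuration, so the local rootdir the utility sets up in its constructor is what this test class sees. A sketch of the aliasing, as seen from inside a subclass (the key is hypothetical):

    // conf and testUtil.getConfiguration() are the same object now:
    conf.set("hbase.test.sample.key", "value");
    assert conf == testUtil.getConfiguration();
    assert "value".equals(testUtil.getConfiguration().get("hbase.test.sample.key"));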

HBaseTestingUtility.java

@@ -119,6 +119,7 @@ import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
 import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
@@ -213,8 +214,6 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
    */
   private volatile Connection connection;

-  private boolean localMode = false;
-
   /** Filesystem URI used for map-reduce mini-cluster setup */
   private static String FS_URI;
@@ -298,57 +297,82 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
   public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
       bloomAndCompressionCombinations();

+  /**
+   * <p>Create an HBaseTestingUtility using a default configuration.
+   *
+   * <p>Initially, all tmp files are written to a local test data directory.
+   * Once {@link #startMiniDFSCluster} is called, either directly or via
+   * {@link #startMiniCluster()}, tmp data will be written to the DFS directory instead.
+   *
+   * <p>Previously, there was a distinction between the type of utility returned by
+   * {@link #createLocalHTU()} and this constructor; this is no longer the case. All
+   * HBaseTestingUtility objects will behave as local until a DFS cluster is started,
+   * at which point they will switch to using mini DFS for storage.
+   */
   public HBaseTestingUtility() {
     this(HBaseConfiguration.create());
   }

-  public HBaseTestingUtility(Configuration conf) {
+  /**
+   * <p>Create an HBaseTestingUtility using a given configuration.
+   *
+   * <p>Initially, all tmp files are written to a local test data directory.
+   * Once {@link #startMiniDFSCluster} is called, either directly or via
+   * {@link #startMiniCluster()}, tmp data will be written to the DFS directory instead.
+   *
+   * <p>Previously, there was a distinction between the type of utility returned by
+   * {@link #createLocalHTU()} and this constructor; this is no longer the case. All
+   * HBaseTestingUtility objects will behave as local until a DFS cluster is started,
+   * at which point they will switch to using mini DFS for storage.
+   *
+   * @param conf The configuration to use for further operations
+   */
+  public HBaseTestingUtility(@Nullable Configuration conf) {
     super(conf);

     // a hbase checksum verification failure will cause unit tests to fail
     ChecksumUtil.generateExceptionForChecksumFailureForTest(true);

-    // prevent contention for ports if other hbase thread(s) already running
+    // if conf is provided, prevent contention for ports if other hbase thread(s) are running
     if (conf != null) {
       if (conf.getInt(HConstants.MASTER_INFO_PORT, HConstants.DEFAULT_MASTER_INFOPORT)
           == HConstants.DEFAULT_MASTER_INFOPORT) {
         conf.setInt(HConstants.MASTER_INFO_PORT, -1);
-        LOG.debug("Config property " + HConstants.MASTER_INFO_PORT + " changed to -1");
+        LOG.debug("Config property {} changed to -1", HConstants.MASTER_INFO_PORT);
       }
       if (conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT)
           == HConstants.DEFAULT_REGIONSERVER_PORT) {
         conf.setInt(HConstants.REGIONSERVER_PORT, -1);
-        LOG.debug("Config property " + HConstants.REGIONSERVER_PORT + " changed to -1");
+        LOG.debug("Config property {} changed to -1", HConstants.REGIONSERVER_PORT);
       }
     }
+
+    // Every cluster is a local cluster until we start DFS
+    // Note that conf could be null, but this.conf will not be
+    String dataTestDir = getDataTestDir().toString();
+    this.conf.set("fs.defaultFS","file:///");
+    this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir);
+    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
+    this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE,false);
   }

   /**
-   * Create an HBaseTestingUtility where all tmp files are written to the local test data dir.
-   * It is needed to properly base FSUtil.getRootDirs so that they drop temp files in the proper
-   * test dir. Use this when you aren't using an Mini HDFS cluster.
-   * @return HBaseTestingUtility that use local fs for temp files.
+   * @deprecated use {@link HBaseTestingUtility#HBaseTestingUtility()} instead
+   * @return a normal HBaseTestingUtility
    */
+  @Deprecated
   public static HBaseTestingUtility createLocalHTU() {
-    Configuration c = HBaseConfiguration.create();
-    return createLocalHTU(c);
+    return new HBaseTestingUtility();
   }

   /**
-   * Create an HBaseTestingUtility where all tmp files are written to the local test data dir.
-   * It is needed to properly base FSUtil.getRootDirs so that they drop temp files in the proper
-   * test dir. Use this when you aren't using an Mini HDFS cluster.
-   * @param c Configuration (will be modified)
-   * @return HBaseTestingUtility that use local fs for temp files.
+   * @deprecated use {@link HBaseTestingUtility#HBaseTestingUtility(Configuration)} instead
+   * @return a normal HBaseTestingUtility
    */
+  @Deprecated
   public static HBaseTestingUtility createLocalHTU(Configuration c) {
-    HBaseTestingUtility htu = new HBaseTestingUtility(c);
-    String dataTestDir = htu.getDataTestDir().toString();
-    htu.getConfiguration().set("fs.defaultFS","file:///");
-    htu.getConfiguration().set(HConstants.HBASE_DIR, "file://" + dataTestDir);
-    LOG.debug("Setting " + HConstants.HBASE_DIR + " to " + dataTestDir);
-    htu.localMode = true;
-    return htu;
+    return new HBaseTestingUtility(c);
   }

   /**
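
Taken together, these changes give every utility the lifecycle the commit title describes: local until DFS starts. A sketch of that lifecycle (the demo class name is hypothetical; the printed values follow from the constructor code above):

    import org.apache.hadoop.hbase.HBaseTestingUtility;
    import org.apache.hadoop.hbase.util.CommonFSUtils;

    public class LocalUntilDfsDemo {
      public static void main(String[] args) throws Exception {
        HBaseTestingUtility htu = new HBaseTestingUtility();
        // Fresh utility: rooted on the local filesystem, enforcement relaxed.
        System.out.println(htu.getConfiguration().get("fs.defaultFS"));   // file:///
        System.out.println(htu.getConfiguration().getBoolean(
            CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true));       // false

        htu.startMiniDFSCluster(1);
        // setFs() (below) has pointed fs.defaultFS at the mini cluster and
        // removed the enforcement override, so the check applies again.
        System.out.println(htu.getConfiguration().get("fs.defaultFS"));   // hdfs://...
        htu.shutdownMiniDFSCluster();
      }
    }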
@@ -611,6 +635,23 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
    * @return The mini dfs cluster created.
    */
   public MiniDFSCluster startMiniDFSCluster(int servers, final String hosts[])
+      throws Exception {
+    return startMiniDFSCluster(servers, null, hosts);
+  }
+
+  private void setFs() throws IOException {
+    if(this.dfsCluster == null){
+      LOG.info("Skipping setting fs because dfsCluster is null");
+      return;
+    }
+    FileSystem fs = this.dfsCluster.getFileSystem();
+    FSUtils.setFsDefault(this.conf, new Path(fs.getUri()));
+    // re-enable this check with dfs
+    conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE);
+  }
+
+  public MiniDFSCluster startMiniDFSCluster(int servers, final String racks[], String hosts[])
       throws Exception {
     createDirsAndSetProperties();
     EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);

@@ -624,7 +665,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
     TraceUtil.initTracer(conf);
     this.dfsCluster = new MiniDFSCluster(0, this.conf, servers, true, true,
-        true, null, null, hosts, null);
+        true, null, racks, hosts, null);

     // Set this just-started cluster as our filesystem.
     setFs();

@@ -634,34 +675,9 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
     //reset the test directory for test file system
     dataTestDirOnTestFS = null;
+    String dataTestDir = getDataTestDir().toString();
+    conf.set(HConstants.HBASE_DIR, dataTestDir);
+    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);

     return this.dfsCluster;
   }
-
-  private void setFs() throws IOException {
-    if(this.dfsCluster == null){
-      LOG.info("Skipping setting fs because dfsCluster is null");
-      return;
-    }
-    FileSystem fs = this.dfsCluster.getFileSystem();
-    FSUtils.setFsDefault(this.conf, new Path(fs.getUri()));
-  }
-
-  public MiniDFSCluster startMiniDFSCluster(int servers, final String racks[], String hosts[])
-      throws Exception {
-    createDirsAndSetProperties();
-
-    this.dfsCluster = new MiniDFSCluster(0, this.conf, servers, true, true,
-        true, null, racks, hosts, null);
-
-    // Set this just-started cluster as our filesystem.
-    FileSystem fs = this.dfsCluster.getFileSystem();
-    FSUtils.setFsDefault(this.conf, new Path(fs.getUri()));
-
-    // Wait for the cluster to be totally up
-    this.dfsCluster.waitClusterUp();
-
-    //reset the test directory for test file system
-    dataTestDirOnTestFS = null;
-
-    return this.dfsCluster;
-  }
@@ -957,7 +973,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
     // Bring up mini dfs cluster. This spews a bunch of warnings about missing
     // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
-    if(this.dfsCluster == null && !localMode) {
+    if(this.dfsCluster == null) {
       LOG.info("STARTING DFS");
       dfsCluster = startMiniDFSCluster(numDataNodes, dataNodeHosts);
     } else LOG.info("NOT STARTING DFS");
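
The relocated setFs() is also where the stream-capability override is retired: the constructor disables enforcement so local runs work, and once a real DFS is up the override is simply removed so the default applies again. In outline (a sketch of the flag's lifecycle, not the literal code):

    Configuration conf = new HBaseTestingUtility().getConfiguration();
    // Local phase: the constructor has set the override to "false".
    assert "false".equals(conf.get(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE));
    // DFS phase: setFs() drops the override so the default (enforce) applies;
    // getWALFileSystem(conf) then propagates whatever is set onto the cached fs.
    conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE);
    assert conf.get(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE) == null;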

TestRegionObserverScannerOpenHook.java

@@ -30,7 +30,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;

@@ -209,7 +208,8 @@ public class TestRegionObserverScannerOpenHook {
     byte[] A = Bytes.toBytes("A");
     byte[][] FAMILIES = new byte[][] { A };

-    Configuration conf = HBaseConfiguration.create();
+    // Use new HTU to not overlap with the DFS cluster started in #CompactionStacking
+    Configuration conf = new HBaseTestingUtility().getConfiguration();
     HRegion region = initHRegion(TABLE, getClass().getName(), conf, FAMILIES);
     RegionCoprocessorHost h = region.getCoprocessorHost();
     h.load(NoDataFromScan.class, Coprocessor.PRIORITY_HIGHEST, conf);

@@ -234,7 +234,8 @@ public class TestRegionObserverScannerOpenHook {
     byte[] A = Bytes.toBytes("A");
     byte[][] FAMILIES = new byte[][] { A };

-    Configuration conf = HBaseConfiguration.create();
+    // Use new HTU to not overlap with the DFS cluster started in #CompactionStacking
+    Configuration conf = new HBaseTestingUtility().getConfiguration();
     HRegion region = initHRegion(TABLE, getClass().getName(), conf, FAMILIES);
     RegionCoprocessorHost h = region.getCoprocessorHost();
     h.load(NoDataFromFlush.class, Coprocessor.PRIORITY_HIGHEST, conf);
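
The fresh utility works as an isolation mechanism because every instance randomizes its data test directory, so the configuration built here points at a local root disjoint from the DFS cluster started by the class-level utility. A quick illustration (assuming the usual randomized test-dir behavior of HBaseCommonTestingUtility):

    HBaseTestingUtility a = new HBaseTestingUtility();
    HBaseTestingUtility b = new HBaseTestingUtility();
    // Each instance roots hbase.rootdir under its own randomized test dir.
    assert !a.getDataTestDir().equals(b.getDataTestDir());
    assert !a.getConfiguration().get(HConstants.HBASE_DIR)
            .equals(b.getConfiguration().get(HConstants.HBASE_DIR));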

TestRegionObserverStacking.java

@@ -24,12 +24,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MockRegionServerServices;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;

@@ -146,7 +144,7 @@ public class TestRegionObserverStacking extends TestCase {
     byte[] A = Bytes.toBytes("A");
     byte[][] FAMILIES = new byte[][] { A } ;

-    Configuration conf = HBaseConfiguration.create();
+    Configuration conf = TEST_UTIL.getConfiguration();
     HRegion region = initHRegion(TABLE, getClass().getName(),
       conf, FAMILIES);
     RegionCoprocessorHost h = region.getCoprocessorHost();

TestLocalAsyncOutput.java

@@ -22,7 +22,7 @@ import java.util.concurrent.ExecutionException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.FSUtils;

@@ -47,7 +47,7 @@ public class TestLocalAsyncOutput {
   private static Class<? extends Channel> CHANNEL_CLASS = NioSocketChannel.class;

-  private static final HBaseCommonTestingUtility TEST_UTIL = new HBaseCommonTestingUtility();
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

   @AfterClass
   public static void tearDownAfterClass() throws IOException {

TestHStore.java

@@ -241,7 +241,7 @@ public class TestHStore {
   public void testFlushSizeSizing() throws Exception {
     LOG.info("Setting up a faulty file system that cannot write in " +
       this.name.getMethodName());
-    final Configuration conf = HBaseConfiguration.create();
+    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
     // Only retry once.
     conf.setInt("hbase.hstore.flush.retries.number", 1);
     User user = User.createUserForTesting(conf, this.name.getMethodName(),

@@ -668,7 +668,7 @@ public class TestHStore {
   public void testHandleErrorsInFlush() throws Exception {
     LOG.info("Setting up a faulty file system that cannot write");
-    final Configuration conf = HBaseConfiguration.create();
+    final Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
     User user = User.createUserForTesting(conf,
       "testhandleerrorsinflush", new String[]{"foo"});
     // Inject our faulty LocalFileSystem
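
HBaseConfiguration.create(Configuration) merges the given configuration into a brand-new one, so the faulty-filesystem settings these tests inject stay out of TEST_UTIL's shared conf while still inheriting its local rootdir. A sketch:

    Configuration shared = TEST_UTIL.getConfiguration();
    Configuration conf = HBaseConfiguration.create(shared);  // merged copy, not an alias
    conf.setInt("hbase.hstore.flush.retries.number", 1);
    // The shared configuration keeps its original value; the mutation is test-local.
    assert shared.getInt("hbase.hstore.flush.retries.number", 10) != 1;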

WALPerformanceEvaluation.java

@@ -197,8 +197,6 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
           latencyHistogram.update(System.nanoTime() - now);
         }
       }
-      long totalTime = (System.currentTimeMillis() - startTime);
-      logBenchmarkResult(Thread.currentThread().getName(), numIterations, totalTime);
     } catch (Exception e) {
       LOG.error(getClass().getSimpleName() + " Thread failed", e);
     }

hbase-site.xml (test resource)

@@ -158,13 +158,4 @@
     <name>hbase.hconnection.threads.keepalivetime</name>
     <value>3</value>
   </property>
-  <property>
-    <name>hbase.unsafe.stream.capability.enforce</name>
-    <value>false</value>
-    <description>
-      Controls whether HBase will check for stream capabilities (hflush/hsync).
-      Disable this if you intend to run on LocalFileSystem.
-      WARNING: Doing so may expose you to additional risk of data loss!
-    </description>
-  </property>
 </configuration>
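
This block can go because the testing utility now manages the setting programmatically, as shown in the HBaseTestingUtility changes above. The removed XML amounts to:

    // Programmatic equivalent of the removed <property> block, applied by the
    // HBaseTestingUtility constructor and undone by setFs() once DFS is running:
    conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);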