HBASE-23899 [Flakey Test] Stabilizations and Debug
A miscellaney. Add extra logging to help w/ debug to a bunch of tests. Fix some issues particular where we ran into mismatched filesystem complaint. Some modernizations, removal of unnecessary deletes (especially after seeing tests fail in table delete), and cleanup. Recategorized one tests because it starts four clusters in the one JVM from medium to large. Finally, zk standalone server won't come on occasion; added debug and thread dumping to help figure why ( manifests as test failing in startup saying master didn't launch). hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java Fixes occasional mismatched filesystems where the difference is file:// vs file:/// or we pick up hdfs schema when it a local fs test. Had to do this vetting of how we do make qualified on a Path in a few places, not just here as a few tests failed with this same issue. Code in here is used by a lot of tests that each in turn suffered this mismatch. Refactor for clarity hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotV1NoCluster.java Unused import. hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java This test fails if tmp dir is not where it expects because tries to make rootdir there. Give it a rootdir under test data dir. hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java This change is probably useless. I think the issue is actually a problem addressed later where our test for zk server being up gets stuck and never times out. hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSplitOrMergeStatus.java Move off deprecated APIs. hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java Log when we fail balance check for DEBUG Currently just says 'false' hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSplitWALProcedure.java NPEs on way out if setup failed. hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java Add logging when assert fails to help w/ DEBUG hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerAbortTimeout.java Don't bother removing stuff on teardown. All gets thrown away anyways. Saw a few hangs in here in the teardown where hdfs was down before expected messing up shutdown. hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java Add timeout on socket; was seeing check for zk server getting stuck and never timing out (test time out in startup) hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotWithTemporaryDirectory.java Write to test data dir instead. Be careful about how we make qualified paths. hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java Remove snowflake configs. hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java Add a hacky pause. Tried adding barriers but didn't work. Needs deep dive. hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java Remove code copied from zk and use zk methods directly instead. A general problem is that zk cluster doesn't come up occasionally but no clue why. Add thread dumping and state check.
This commit is contained in:
parent
00ef6c624a
commit
6777e2c2d1
|
@ -70,13 +70,6 @@ public abstract class TestTableInputFormatScanBase {
|
|||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
// test intermittently fails under hadoop2 (2.0.2-alpha) if shortcircuit-read (scr) is on.
|
||||
// this turns it off for this test. TODO: Figure out why scr breaks recovery.
|
||||
System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
|
||||
|
||||
// switch TIF to log at DEBUG level
|
||||
TEST_UTIL.enableDebug(TableInputFormat.class);
|
||||
TEST_UTIL.enableDebug(TableInputFormatBase.class);
|
||||
// start mini hbase cluster
|
||||
TEST_UTIL.startMiniCluster(3);
|
||||
// create and fill table
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -21,9 +21,7 @@ import static org.apache.hadoop.util.ToolRunner.run;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
|
@ -55,7 +53,6 @@ import org.junit.experimental.categories.Category;
|
|||
import org.junit.rules.TestName;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
|
||||
|
||||
|
@ -90,12 +87,6 @@ public class TestExportSnapshot {
|
|||
// If a single node has enough failures (default 3), resource manager will blacklist it.
|
||||
// With only 2 nodes and tests injecting faults, we don't want that.
|
||||
conf.setInt("mapreduce.job.maxtaskfailures.per.tracker", 100);
|
||||
/*
|
||||
conf.setInt("hbase.client.pause", 250);
|
||||
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
|
||||
conf.setBoolean("hbase.master.enabletable.roundrobin", true);
|
||||
conf.setInt("mapreduce.map.maxattempts", 10);
|
||||
*/
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
|
@ -211,36 +202,38 @@ public class TestExportSnapshot {
|
|||
*/
|
||||
protected static void testExportFileSystemState(final Configuration conf, final TableName tableName,
|
||||
final String snapshotName, final String targetName, final int filesExpected,
|
||||
final Path sourceDir, Path copyDir, final boolean overwrite,
|
||||
final Path srcDir, Path rawTgtDir, final boolean overwrite,
|
||||
final RegionPredicate bypassregionPredicate, boolean success) throws Exception {
|
||||
URI hdfsUri = FileSystem.get(conf).getUri();
|
||||
FileSystem fs = FileSystem.get(copyDir.toUri(), conf);
|
||||
LOG.info("DEBUG FS {} {} {}, hdfsUri={}", fs, copyDir, copyDir.toUri(), hdfsUri);
|
||||
copyDir = copyDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
|
||||
FileSystem tgtFs = rawTgtDir.getFileSystem(conf);
|
||||
FileSystem srcFs = srcDir.getFileSystem(conf);
|
||||
Path tgtDir = rawTgtDir.makeQualified(tgtFs.getUri(), tgtFs.getWorkingDirectory());
|
||||
LOG.info("tgtFsUri={}, tgtDir={}, rawTgtDir={}, srcFsUri={}, srcDir={}",
|
||||
tgtFs.getUri(), tgtDir, rawTgtDir, srcFs.getUri(), srcDir);
|
||||
List<String> opts = new ArrayList<>();
|
||||
opts.add("--snapshot");
|
||||
opts.add(snapshotName);
|
||||
opts.add("--copy-to");
|
||||
opts.add(copyDir.toString());
|
||||
if (!targetName.equals(snapshotName)) {
|
||||
opts.add("--target");
|
||||
opts.add(targetName);
|
||||
}
|
||||
if (overwrite) opts.add("--overwrite");
|
||||
if (overwrite) {
|
||||
opts.add("--overwrite");
|
||||
}
|
||||
|
||||
// Export Snapshot
|
||||
int res = run(conf, new ExportSnapshot(), opts.toArray(new String[opts.size()]));
|
||||
assertEquals("success " + success + ", res=" + res, success ? 0 : 1, res);
|
||||
if (!success) {
|
||||
final Path targetDir = new Path(HConstants.SNAPSHOT_DIR_NAME, targetName);
|
||||
assertFalse(copyDir.toString() + " " + targetDir.toString(),
|
||||
fs.exists(new Path(copyDir, targetDir)));
|
||||
assertFalse(tgtDir.toString() + " " + targetDir.toString(),
|
||||
tgtFs.exists(new Path(tgtDir, targetDir)));
|
||||
return;
|
||||
}
|
||||
LOG.info("Exported snapshot");
|
||||
|
||||
// Verify File-System state
|
||||
FileStatus[] rootFiles = fs.listStatus(copyDir);
|
||||
FileStatus[] rootFiles = tgtFs.listStatus(tgtDir);
|
||||
assertEquals(filesExpected > 0 ? 2 : 1, rootFiles.length);
|
||||
for (FileStatus fileStatus: rootFiles) {
|
||||
String name = fileStatus.getPath().getName();
|
||||
|
@ -251,11 +244,10 @@ public class TestExportSnapshot {
|
|||
LOG.info("Verified filesystem state");
|
||||
|
||||
// Compare the snapshot metadata and verify the hfiles
|
||||
final FileSystem hdfs = FileSystem.get(hdfsUri, conf);
|
||||
final Path snapshotDir = new Path(HConstants.SNAPSHOT_DIR_NAME, snapshotName);
|
||||
final Path targetDir = new Path(HConstants.SNAPSHOT_DIR_NAME, targetName);
|
||||
verifySnapshotDir(hdfs, new Path(sourceDir, snapshotDir), fs, new Path(copyDir, targetDir));
|
||||
Set<String> snapshotFiles = verifySnapshot(conf, fs, copyDir, tableName,
|
||||
verifySnapshotDir(srcFs, new Path(srcDir, snapshotDir), tgtFs, new Path(tgtDir, targetDir));
|
||||
Set<String> snapshotFiles = verifySnapshot(conf, tgtFs, tgtDir, tableName,
|
||||
targetName, bypassregionPredicate);
|
||||
assertEquals(filesExpected, snapshotFiles.size());
|
||||
}
|
||||
|
@ -266,8 +258,6 @@ public class TestExportSnapshot {
|
|||
@Test
|
||||
public void testExportRetry() throws Exception {
|
||||
Path copyDir = getLocalDestinationDir();
|
||||
FileSystem fs = FileSystem.get(copyDir.toUri(), new Configuration());
|
||||
copyDir = copyDir.makeQualified(fs);
|
||||
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
|
||||
conf.setBoolean(ExportSnapshot.Testing.CONF_TEST_FAILURE, true);
|
||||
conf.setInt(ExportSnapshot.Testing.CONF_TEST_FAILURE_COUNT, 2);
|
||||
|
@ -321,14 +311,13 @@ public class TestExportSnapshot {
|
|||
@Override
|
||||
public void storeFile(final RegionInfo regionInfo, final String family,
|
||||
final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
|
||||
if (bypassregionPredicate != null && bypassregionPredicate.evaluate(regionInfo))
|
||||
if (bypassregionPredicate != null && bypassregionPredicate.evaluate(regionInfo)) {
|
||||
return;
|
||||
}
|
||||
|
||||
String hfile = storeFile.getName();
|
||||
snapshotFiles.add(hfile);
|
||||
if (storeFile.hasReference()) {
|
||||
// Nothing to do here, we have already the reference embedded
|
||||
} else {
|
||||
if (!storeFile.hasReference()) {
|
||||
verifyNonEmptyFile(new Path(exportedArchive,
|
||||
new Path(FSUtils.getTableDir(new Path("./"), tableName),
|
||||
new Path(regionInfo.getEncodedName(), new Path(family, hfile)))));
|
||||
|
@ -339,7 +328,7 @@ public class TestExportSnapshot {
|
|||
assertTrue(path + " should exists", fs.exists(path));
|
||||
assertTrue(path + " should not be empty", fs.getFileStatus(path).getLen() > 0);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
// Verify Snapshot description
|
||||
SnapshotDescription desc = SnapshotDescriptionUtils.readSnapshotInfo(fs, exportedSnapshot);
|
||||
|
@ -352,7 +341,7 @@ public class TestExportSnapshot {
|
|||
throws IOException {
|
||||
Set<String> files = new HashSet<>();
|
||||
LOG.debug("List files in {} in root {} at {}", fs, root, dir);
|
||||
int rootPrefix = root.makeQualified(fs.getUri(), root).toString().length();
|
||||
int rootPrefix = root.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString().length();
|
||||
FileStatus[] list = FSUtils.listStatus(fs, dir);
|
||||
if (list != null) {
|
||||
for (FileStatus fstat: list) {
|
||||
|
@ -376,8 +365,13 @@ public class TestExportSnapshot {
|
|||
|
||||
private Path getLocalDestinationDir() {
|
||||
Path path = TEST_UTIL.getDataTestDir("local-export-" + System.currentTimeMillis());
|
||||
LOG.info("Local export destination path: " + path);
|
||||
return path;
|
||||
try {
|
||||
FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration());
|
||||
LOG.info("Local export destination path: " + path);
|
||||
return path.makeQualified(fs.getUri(), fs.getWorkingDirectory());
|
||||
} catch (IOException ioe) {
|
||||
throw new RuntimeException(ioe);
|
||||
}
|
||||
}
|
||||
|
||||
private static void removeExportDir(final Path path) throws IOException {
|
||||
|
|
|
@ -19,9 +19,7 @@ package org.apache.hadoop.hbase.snapshot;
|
|||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocalFileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -55,21 +53,24 @@ public class TestExportSnapshotV1NoCluster {
|
|||
|
||||
private HBaseCommonTestingUtility testUtil = new HBaseCommonTestingUtility();
|
||||
private Path testDir;
|
||||
private FileSystem fs;
|
||||
|
||||
@Before
|
||||
public void setUpBefore() throws Exception {
|
||||
this.testDir = setup(this.testUtil);
|
||||
// Make sure testDir is on LocalFileSystem
|
||||
this.fs = FileSystem.getLocal(this.testUtil.getConfiguration());
|
||||
this.testDir = setup(fs, this.testUtil);
|
||||
LOG.info("fs={}, fsuri={}, fswd={}, testDir={}", this.fs, this.fs.getUri(),
|
||||
this.fs.getWorkingDirectory(), this.testDir);
|
||||
assertTrue("FileSystem '" + fs + "' is not local", fs instanceof LocalFileSystem);
|
||||
}
|
||||
|
||||
/**
|
||||
* Setup for test. Returns path to test data dir.
|
||||
* Setup for test. Returns path to test data dir. Sets configuration into the passed
|
||||
* hctu.getConfiguration.
|
||||
*/
|
||||
static Path setup(HBaseCommonTestingUtility hctu) throws IOException {
|
||||
// Make sure testDir is on LocalFileSystem
|
||||
Path testDir =
|
||||
hctu.getDataTestDir().makeQualified(URI.create("file:///"), new Path("/"));
|
||||
FileSystem fs = testDir.getFileSystem(hctu.getConfiguration());
|
||||
assertTrue("FileSystem '" + fs + "' is not local", fs instanceof LocalFileSystem);
|
||||
static Path setup(FileSystem fs, HBaseCommonTestingUtility hctu) throws IOException {
|
||||
Path testDir = hctu.getDataTestDir().makeQualified(fs.getUri(), fs.getWorkingDirectory());
|
||||
hctu.getConfiguration().setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
|
||||
hctu.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
|
||||
hctu.getConfiguration().setInt("hbase.client.pause", 250);
|
||||
|
@ -77,7 +78,7 @@ public class TestExportSnapshotV1NoCluster {
|
|||
hctu.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", true);
|
||||
hctu.getConfiguration().setInt("mapreduce.map.maxattempts", 10);
|
||||
hctu.getConfiguration().set(HConstants.HBASE_DIR, testDir.toString());
|
||||
return testDir;
|
||||
return testDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -86,18 +87,19 @@ public class TestExportSnapshotV1NoCluster {
|
|||
@Test
|
||||
public void testSnapshotWithRefsExportFileSystemState() throws Exception {
|
||||
final SnapshotMock snapshotMock = new SnapshotMock(testUtil.getConfiguration(),
|
||||
testDir.getFileSystem(testUtil.getConfiguration()), testDir);
|
||||
this.fs, testDir);
|
||||
final SnapshotMock.SnapshotBuilder builder = snapshotMock.createSnapshotV1("tableWithRefsV1",
|
||||
"tableWithRefsV1");
|
||||
testSnapshotWithRefsExportFileSystemState(builder, testUtil, testDir);
|
||||
testSnapshotWithRefsExportFileSystemState(this.fs, builder, testUtil, testDir);
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates a couple of regions for the specified SnapshotMock,
|
||||
* and then it will run the export and verification.
|
||||
*/
|
||||
static void testSnapshotWithRefsExportFileSystemState(SnapshotMock.SnapshotBuilder builder,
|
||||
HBaseCommonTestingUtility testUtil, Path testDir) throws Exception {
|
||||
static void testSnapshotWithRefsExportFileSystemState(FileSystem fs,
|
||||
SnapshotMock.SnapshotBuilder builder, HBaseCommonTestingUtility testUtil, Path testDir)
|
||||
throws Exception {
|
||||
Path[] r1Files = builder.addRegion();
|
||||
Path[] r2Files = builder.addRegion();
|
||||
builder.commit();
|
||||
|
@ -106,14 +108,16 @@ public class TestExportSnapshotV1NoCluster {
|
|||
TableName tableName = builder.getTableDescriptor().getTableName();
|
||||
TestExportSnapshot.testExportFileSystemState(testUtil.getConfiguration(),
|
||||
tableName, snapshotName, snapshotName, snapshotFilesCount,
|
||||
testDir, getDestinationDir(testUtil, testDir), false, null, true);
|
||||
testDir, getDestinationDir(fs, testUtil, testDir), false, null, true);
|
||||
}
|
||||
|
||||
static Path getDestinationDir(HBaseCommonTestingUtility hctu, Path testDir) throws IOException {
|
||||
FileSystem fs = FileSystem.get(hctu.getConfiguration());
|
||||
static Path getDestinationDir(FileSystem fs, HBaseCommonTestingUtility hctu, Path testDir)
|
||||
throws IOException {
|
||||
Path path = new Path(new Path(testDir, "export-test"),
|
||||
"export-" + System.currentTimeMillis()).makeQualified(fs.getUri(), fs.getWorkingDirectory());
|
||||
LOG.info("HDFS export destination path: " + path);
|
||||
"export-" + System.currentTimeMillis()).makeQualified(fs.getUri(),
|
||||
fs.getWorkingDirectory());
|
||||
LOG.info("Export destination={}, fs={}, fsurl={}, fswd={}, testDir={}", path, fs, fs.getUri(),
|
||||
fs.getWorkingDirectory(), testDir);
|
||||
return path;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,7 +16,9 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.snapshot;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocalFileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
|
||||
|
@ -44,10 +46,15 @@ public class TestExportSnapshotV2NoCluster {
|
|||
|
||||
private HBaseCommonTestingUtility testUtil = new HBaseCommonTestingUtility();
|
||||
private Path testDir;
|
||||
private FileSystem fs;
|
||||
|
||||
@Before
|
||||
public void before() throws Exception {
|
||||
this.testDir = TestExportSnapshotV1NoCluster.setup(this.testUtil);
|
||||
// Make sure testDir is on LocalFileSystem
|
||||
this.fs = FileSystem.getLocal(this.testUtil.getConfiguration());
|
||||
this.testDir = TestExportSnapshotV1NoCluster.setup(this.fs, this.testUtil);
|
||||
LOG.info("fs={}, testDir={}", this.fs, this.testDir);
|
||||
assertTrue("FileSystem '" + fs + "' is not local", fs instanceof LocalFileSystem);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -56,7 +63,7 @@ public class TestExportSnapshotV2NoCluster {
|
|||
testDir.getFileSystem(testUtil.getConfiguration()), testDir);
|
||||
final SnapshotMock.SnapshotBuilder builder = snapshotMock.createSnapshotV2("tableWithRefsV2",
|
||||
"tableWithRefsV2");
|
||||
TestExportSnapshotV1NoCluster.testSnapshotWithRefsExportFileSystemState(builder, this.testUtil,
|
||||
this.testDir);
|
||||
TestExportSnapshotV1NoCluster.testSnapshotWithRefsExportFileSystemState(this.fs, builder,
|
||||
this.testUtil, this.testDir);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -17,11 +17,10 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.snapshot;
|
||||
|
||||
import java.io.File;
|
||||
import java.nio.file.Paths;
|
||||
import java.io.IOException;
|
||||
import java.util.UUID;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
|
@ -37,9 +36,6 @@ public class TestExportSnapshotWithTemporaryDirectory extends TestExportSnapshot
|
|||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestExportSnapshotWithTemporaryDirectory.class);
|
||||
|
||||
protected static String TEMP_DIR = Paths.get("").toAbsolutePath().toString() + Path.SEPARATOR
|
||||
+ UUID.randomUUID().toString();
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
setUpBaseConf(TEST_UTIL.getConfiguration());
|
||||
|
@ -50,11 +46,18 @@ public class TestExportSnapshotWithTemporaryDirectory extends TestExportSnapshot
|
|||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
TestExportSnapshot.tearDownAfterClass();
|
||||
FileUtils.deleteDirectory(new File(TEMP_DIR));
|
||||
}
|
||||
|
||||
public static void setUpBaseConf(Configuration conf) {
|
||||
Path tmpDir = null;
|
||||
try {
|
||||
FileSystem localFs = FileSystem.getLocal(conf);
|
||||
tmpDir = TEST_UTIL.getDataTestDir(UUID.randomUUID().toString()).
|
||||
makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
|
||||
} catch (IOException ioe) {
|
||||
throw new RuntimeException(ioe);
|
||||
}
|
||||
TestExportSnapshot.setUpBaseConf(conf);
|
||||
conf.set(SnapshotDescriptionUtils.SNAPSHOT_WORKING_DIR, "file://" + new Path(TEMP_DIR, ".tmpdir").toUri());
|
||||
conf.set(SnapshotDescriptionUtils.SNAPSHOT_WORKING_DIR, tmpDir.toUri().toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileSystem;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
|
||||
|
@ -85,7 +86,9 @@ public class TestWALProcedureStore {
|
|||
public void setUp() throws IOException {
|
||||
htu = new HBaseCommonTestingUtility();
|
||||
testDir = htu.getDataTestDir();
|
||||
htu.getConfiguration().set(HConstants.HBASE_DIR, testDir.toString());
|
||||
fs = testDir.getFileSystem(htu.getConfiguration());
|
||||
htu.getConfiguration().set(HConstants.HBASE_DIR, testDir.toString());
|
||||
assertTrue(testDir.depth() > 1);
|
||||
|
||||
setupConfig(htu.getConfiguration());
|
||||
|
|
|
@ -216,6 +216,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
|
|||
this.scheduleThreadPool.scheduleAtFixedRate(
|
||||
new ReplicationStatisticsTask(this.replicationSink, this.replicationManager),
|
||||
statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
|
||||
LOG.info("{} started", this.server.toString());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -23,6 +23,7 @@ import javax.crypto.SecretKey;
|
|||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.time.Instant;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -36,9 +37,9 @@ import org.apache.hadoop.io.WritableUtils;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class AuthenticationKey implements Writable {
|
||||
private int id;
|
||||
private long expirationDate;
|
||||
private SecretKey secret;
|
||||
private volatile int id;
|
||||
private volatile long expirationDate;
|
||||
private volatile SecretKey secret;
|
||||
|
||||
public AuthenticationKey() {
|
||||
// for Writable
|
||||
|
@ -90,10 +91,11 @@ public class AuthenticationKey implements Writable {
|
|||
@Override
|
||||
public String toString() {
|
||||
StringBuilder buf = new StringBuilder();
|
||||
buf.append("AuthenticationKey[ ")
|
||||
buf.append("AuthenticationKey[")
|
||||
.append("id=").append(id)
|
||||
.append(", expiration=").append(expirationDate)
|
||||
.append(" ]");
|
||||
.append(", expiration=").append(Instant.ofEpochMilli(this.expirationDate))
|
||||
.append(", obj=").append(super.toString())
|
||||
.append("]");
|
||||
return buf.toString();
|
||||
}
|
||||
|
||||
|
|
|
@ -190,15 +190,11 @@ public class AuthenticationTokenSecretManager
|
|||
public synchronized void addKey(AuthenticationKey key) throws IOException {
|
||||
// ignore zk changes when running as master
|
||||
if (leaderElector.isMaster()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Running as master, ignoring new key "+key.getKeyId());
|
||||
}
|
||||
LOG.debug("Running as master, ignoring new key {}", key);
|
||||
return;
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Adding key "+key.getKeyId());
|
||||
}
|
||||
LOG.debug("Adding key {}", key.getKeyId());
|
||||
|
||||
allKeys.put(key.getKeyId(), key);
|
||||
if (currentKey == null || key.getKeyId() > currentKey.getKeyId()) {
|
||||
|
@ -213,14 +209,12 @@ public class AuthenticationTokenSecretManager
|
|||
synchronized boolean removeKey(Integer keyId) {
|
||||
// ignore zk changes when running as master
|
||||
if (leaderElector.isMaster()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Running as master, ignoring removed key "+keyId);
|
||||
}
|
||||
LOG.debug("Running as master, ignoring removed keyid={}", keyId);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Removing key "+keyId);
|
||||
LOG.debug("Removing keyid={}", keyId);
|
||||
}
|
||||
|
||||
allKeys.remove(keyId);
|
||||
|
@ -246,9 +240,7 @@ public class AuthenticationTokenSecretManager
|
|||
while (iter.hasNext()) {
|
||||
AuthenticationKey key = iter.next();
|
||||
if (key.getExpiration() < now) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Removing expired key "+key.getKeyId());
|
||||
}
|
||||
LOG.debug("Removing expired key {}", key);
|
||||
iter.remove();
|
||||
zkWatcher.removeKeyFromZK(key);
|
||||
}
|
||||
|
|
|
@ -91,6 +91,7 @@ public class ZKSecretWatcher extends ZKListener {
|
|||
try {
|
||||
Integer id = Integer.valueOf(keyId);
|
||||
secretManager.removeKey(id);
|
||||
LOG.info("Node deleted id={}", id);
|
||||
} catch (NumberFormatException nfe) {
|
||||
LOG.error("Invalid znode name for key ID '"+keyId+"'", nfe);
|
||||
}
|
||||
|
|
|
@ -79,6 +79,7 @@ public class TestZooKeeper {
|
|||
conf.setInt(HConstants.ZK_SESSION_TIMEOUT, 1000);
|
||||
conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, MockLoadBalancer.class,
|
||||
LoadBalancer.class);
|
||||
TEST_UTIL.startMiniDFSCluster(2);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -76,8 +76,8 @@ public class TestAlwaysSetScannerId {
|
|||
for (int i = 0; i < COUNT; i++) {
|
||||
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
|
||||
}
|
||||
HRI = table.getRegionLocator().getAllRegionLocations().get(0).getRegion();
|
||||
}
|
||||
HRI = UTIL.getAdmin().getRegions(TABLE_NAME).get(0);
|
||||
CONN =
|
||||
(AsyncConnectionImpl) ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
|
||||
STUB = CONN.getRegionServerStub(UTIL.getHBaseCluster().getRegionServer(0).getServerName());
|
||||
|
|
|
@ -127,12 +127,17 @@ public class TestSeparateClientZKCluster {
|
|||
// switch active master
|
||||
HMaster master = cluster.getMaster();
|
||||
master.stopMaster();
|
||||
LOG.info("Stopped master {}", master.getServerName());
|
||||
while (!master.isShutDown()) {
|
||||
Thread.sleep(200);
|
||||
}
|
||||
LOG.info("Shutdown master {}", master.getServerName());
|
||||
while (cluster.getMaster() == null || !cluster.getMaster().isInitialized()) {
|
||||
LOG.info("Get master {}", cluster.getMaster() == null? "null":
|
||||
cluster.getMaster().getServerName());
|
||||
Thread.sleep(200);
|
||||
}
|
||||
LOG.info("Got master {}", cluster.getMaster().getServerName());
|
||||
// confirm client access still works
|
||||
Assert.assertTrue(admin.balance(false));
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -28,6 +28,7 @@ import java.util.concurrent.ExecutionException;
|
|||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
|
@ -86,18 +87,16 @@ public class TestSplitOrMergeStatus {
|
|||
|
||||
Admin admin = TEST_UTIL.getAdmin();
|
||||
initSwitchStatus(admin);
|
||||
boolean result = admin.splitSwitch(false, false);
|
||||
assertTrue(result);
|
||||
assertTrue(admin.splitSwitch(false, false));
|
||||
try {
|
||||
admin.split(t.getName());
|
||||
fail();
|
||||
} catch (IOException e) {
|
||||
// expected
|
||||
fail("Shouldn't get here");
|
||||
} catch (DoNotRetryIOException dnioe) {
|
||||
// Expected
|
||||
}
|
||||
int count = admin.getRegions(tableName).size();
|
||||
assertTrue(originalCount == count);
|
||||
result = admin.splitSwitch(true, false);
|
||||
assertFalse(result);
|
||||
assertFalse(admin.splitSwitch(true, false));
|
||||
admin.split(t.getName());
|
||||
while ((count = admin.getRegions(tableName).size()) == originalCount) {
|
||||
Threads.sleep(1);
|
||||
|
|
|
@ -91,10 +91,11 @@ public class TestBucketCacheRefCnt {
|
|||
public void testBlockInRAMCache() throws IOException {
|
||||
cache = create(1, 1000);
|
||||
disableWriter();
|
||||
final String prefix = "testBlockInRamCache";
|
||||
try {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
HFileBlock blk = createBlock(i, 1020);
|
||||
BlockCacheKey key = createKey("testHFile-00", i);
|
||||
BlockCacheKey key = createKey(prefix, i);
|
||||
assertEquals(1, blk.refCnt());
|
||||
cache.cacheBlock(key, blk);
|
||||
assertEquals(i + 1, cache.getBlockCount());
|
||||
|
@ -113,7 +114,7 @@ public class TestBucketCacheRefCnt {
|
|||
}
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
BlockCacheKey key = createKey("testHFile-00", i);
|
||||
BlockCacheKey key = createKey(prefix, i);
|
||||
Cacheable blk = cache.getBlock(key, false, false, false);
|
||||
assertEquals(3, blk.refCnt());
|
||||
assertFalse(blk.release());
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -101,7 +101,8 @@ public class TestMasterAbortWhileMergingTable {
|
|||
.getMiniHBaseCluster().getMaster().isInitialized());
|
||||
UTIL.waitFor(30000, () -> UTIL.getMiniHBaseCluster().getMaster()
|
||||
.getMasterProcedureExecutor().isFinished(procID));
|
||||
Assert.assertTrue("Found region RIT, that's impossible!",
|
||||
Assert.assertTrue("Found region RIT, that's impossible! " +
|
||||
UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionsInTransition(),
|
||||
UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager()
|
||||
.getRegionsInTransition().size() == 0);
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -234,8 +234,12 @@ public class BalancerTestBase {
|
|||
int max = numRegions % numServers == 0 ? min : min + 1;
|
||||
|
||||
for (ServerAndLoad server : servers) {
|
||||
if (server.getLoad() < 0 || server.getLoad() > max + tablenum/2 + 1 || server.getLoad() < min - tablenum/2 - 1)
|
||||
if (server.getLoad() < 0 || server.getLoad() > max + tablenum/2 + 1 ||
|
||||
server.getLoad() < min - tablenum/2 - 1) {
|
||||
LOG.warn("server={}, load={}, max={}, tablenum={}, min={}",
|
||||
server.getServerName(), server.getLoad(), max, tablenum, min);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -304,16 +304,20 @@ public class TestSnapshotFromMaster {
|
|||
master.getMasterRpcServices().getCompletedSnapshots(null, request);
|
||||
assertEquals("Found unexpected number of snapshots", 0, response.getSnapshotsCount());
|
||||
|
||||
// NOTE: This is going to be flakey. Its timing based. For now made it more coarse
|
||||
// so more likely to pass though we have to hang around longer.
|
||||
|
||||
// write one snapshot to the fs
|
||||
createSnapshotWithTtl("snapshot_01", 1L);
|
||||
createSnapshotWithTtl("snapshot_02", 10L);
|
||||
createSnapshotWithTtl("snapshot_01", 5L);
|
||||
createSnapshotWithTtl("snapshot_02", 100L);
|
||||
|
||||
// check that we get one snapshot
|
||||
response = master.getMasterRpcServices().getCompletedSnapshots(null, request);
|
||||
assertEquals("Found unexpected number of snapshots", 2, response.getSnapshotsCount());
|
||||
|
||||
// check that 1 snapshot is auto cleaned after 1 sec of TTL expiration
|
||||
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
|
||||
// Check that 1 snapshot is auto cleaned after 5 sec of TTL expiration. Wait 10 seconds
|
||||
// just in case.
|
||||
Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
|
||||
response = master.getMasterRpcServices().getCompletedSnapshots(null, request);
|
||||
assertEquals("Found unexpected number of snapshots", 1, response.getSnapshotsCount());
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -19,12 +19,9 @@ package org.apache.hadoop.hbase.master.procedure;
|
|||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
|
@ -48,12 +45,6 @@ public class TestSCPBase {
|
|||
|
||||
protected void setupConf(Configuration conf) {
|
||||
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
|
||||
conf.set("hbase.balancer.tablesOnMaster", "none");
|
||||
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RS_COUNT);
|
||||
conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 3);
|
||||
conf.setBoolean("hbase.split.writer.creation.bounded", true);
|
||||
conf.setInt("hbase.regionserver.hlog.splitlog.writer.threads", 8);
|
||||
conf.setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK, true);
|
||||
}
|
||||
|
||||
@Before
|
||||
|
@ -66,7 +57,7 @@ public class TestSCPBase {
|
|||
}
|
||||
|
||||
protected void startMiniCluster() throws Exception {
|
||||
this.util.startMiniCluster(3);
|
||||
this.util.startMiniCluster(RS_COUNT);
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
|
@ -71,8 +71,10 @@ public class TestSplitWALProcedure {
|
|||
|
||||
@After
|
||||
public void teardown() throws Exception {
|
||||
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(
|
||||
master.getMasterProcedureExecutor(), false);
|
||||
if (this.master != null) {
|
||||
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(
|
||||
master.getMasterProcedureExecutor(), false);
|
||||
}
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
|
|
|
@ -366,7 +366,8 @@ public class TestCompaction {
|
|||
/**
|
||||
* Test no new Compaction requests are generated after calling stop compactions
|
||||
*/
|
||||
@Test public void testStopStartCompaction() throws IOException {
|
||||
@Test
|
||||
public void testStopStartCompaction() throws IOException {
|
||||
// setup a compact/split thread on a mock server
|
||||
HRegionServer mockServer = Mockito.mock(HRegionServer.class);
|
||||
Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
|
||||
|
@ -379,19 +380,21 @@ public class TestCompaction {
|
|||
createStoreFile(r);
|
||||
}
|
||||
thread.switchCompaction(false);
|
||||
thread
|
||||
.requestCompaction(r, store, "test", Store.PRIORITY_USER, CompactionLifeCycleTracker.DUMMY,
|
||||
null);
|
||||
thread.requestCompaction(r, store, "test", Store.PRIORITY_USER,
|
||||
CompactionLifeCycleTracker.DUMMY, null);
|
||||
assertEquals(false, thread.isCompactionsEnabled());
|
||||
assertEquals(0, thread.getLongCompactions().getActiveCount() + thread.getShortCompactions()
|
||||
.getActiveCount());
|
||||
int longCompactions = thread.getLongCompactions().getActiveCount();
|
||||
int shortCompactions = thread.getShortCompactions().getActiveCount();
|
||||
assertEquals("longCompactions=" + longCompactions + "," +
|
||||
"shortCompactions=" + shortCompactions, 0, longCompactions + shortCompactions);
|
||||
thread.switchCompaction(true);
|
||||
assertEquals(true, thread.isCompactionsEnabled());
|
||||
thread
|
||||
.requestCompaction(r, store, "test", Store.PRIORITY_USER, CompactionLifeCycleTracker.DUMMY,
|
||||
null);
|
||||
assertEquals(1, thread.getLongCompactions().getActiveCount() + thread.getShortCompactions()
|
||||
.getActiveCount());
|
||||
thread.requestCompaction(r, store, "test", Store.PRIORITY_USER,
|
||||
CompactionLifeCycleTracker.DUMMY, null);
|
||||
longCompactions = thread.getLongCompactions().getActiveCount();
|
||||
shortCompactions = thread.getShortCompactions().getActiveCount();
|
||||
assertEquals("longCompactions=" + longCompactions + "," +
|
||||
"shortCompactions=" + shortCompactions, 1, longCompactions + shortCompactions);
|
||||
}
|
||||
|
||||
@Test public void testInterruptingRunningCompactions() throws Exception {
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
|||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
|
||||
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -90,14 +89,6 @@ public class TestRegionServerAbortTimeout {
|
|||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
// Wait the SCP of abort rs to finish
|
||||
UTIL.waitFor(30000, () -> UTIL.getMiniHBaseCluster()
|
||||
.getMaster()
|
||||
.getProcedures()
|
||||
.stream()
|
||||
.anyMatch(p -> p instanceof ServerCrashProcedure && p.isFinished()));
|
||||
UTIL.getAdmin().disableTable(TABLE_NAME);
|
||||
UTIL.getAdmin().deleteTable(TABLE_NAME);
|
||||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
|
|
|
@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.client.Table;
|
|||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.regionserver.TestBulkLoadReplication;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
|
||||
|
@ -59,7 +59,8 @@ import org.slf4j.LoggerFactory;
|
|||
/**
|
||||
* Testcase for HBASE-23098
|
||||
*/
|
||||
@Category({ ReplicationTests.class, MediumTests.class })
|
||||
// LargeTest because spins up four clusters.
|
||||
@Category({ ReplicationTests.class, LargeTests.class })
|
||||
public final class TestNamespaceReplicationWithBulkLoadedData extends TestBulkLoadReplication {
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
|
@ -287,4 +288,4 @@ public final class TestNamespaceReplicationWithBulkLoadedData extends TestBulkLo
|
|||
Set<String> hfiles = replicationQueueStorage.getAllHFileRefs();
|
||||
assertTrue(hfiles.isEmpty());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -363,6 +363,9 @@ public class TestReplicationBase {
|
|||
if (htable1 != null) {
|
||||
htable1.close();
|
||||
}
|
||||
if (hbaseAdmin != null) {
|
||||
hbaseAdmin.close();
|
||||
}
|
||||
UTIL2.shutdownMiniCluster();
|
||||
UTIL1.shutdownMiniCluster();
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -19,7 +19,7 @@ package org.apache.hadoop.hbase.replication;
|
|||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import org.apache.hadoop.hbase.ClusterMetrics;
|
||||
|
@ -29,16 +29,21 @@ import org.apache.hadoop.hbase.ServerMetrics;
|
|||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@Category({ ReplicationTests.class, MediumTests.class })
|
||||
public class TestReplicationStatus extends TestReplicationBase {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStatus.class);
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
|
@ -55,29 +60,31 @@ public class TestReplicationStatus extends TestReplicationBase {
|
|||
@Test
|
||||
public void testReplicationStatus() throws Exception {
|
||||
Admin hbaseAdmin = UTIL1.getAdmin();
|
||||
// disable peer
|
||||
// disable peer <= WHY? I DON'T GET THIS DISABLE BUT TEST FAILS W/O IT.
|
||||
hbaseAdmin.disableReplicationPeer(PEER_ID2);
|
||||
|
||||
final byte[] qualName = Bytes.toBytes("q");
|
||||
Put p;
|
||||
|
||||
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
|
||||
p = new Put(Bytes.toBytes("row" + i));
|
||||
Put p = new Put(Bytes.toBytes("row" + i));
|
||||
p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
|
||||
htable1.put(p);
|
||||
}
|
||||
|
||||
LOG.info("AFTER PUTS");
|
||||
// TODO: Change this wait to a barrier. I tried waiting on replication stats to
|
||||
// change but sleeping in main thread seems to mess up background replication.
|
||||
// HACK! To address flakeyness.
|
||||
Threads.sleep(10000);
|
||||
ClusterMetrics metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
|
||||
|
||||
for (JVMClusterUtil.RegionServerThread thread : UTIL1.getHBaseCluster()
|
||||
.getRegionServerThreads()) {
|
||||
for (JVMClusterUtil.RegionServerThread thread : UTIL1.getHBaseCluster().
|
||||
getRegionServerThreads()) {
|
||||
ServerName server = thread.getRegionServer().getServerName();
|
||||
assertTrue("" + server, metrics.getLiveServerMetrics().containsKey(server));
|
||||
ServerMetrics sm = metrics.getLiveServerMetrics().get(server);
|
||||
List<ReplicationLoadSource> rLoadSourceList = sm.getReplicationLoadSourceList();
|
||||
ReplicationLoadSink rLoadSink = sm.getReplicationLoadSink();
|
||||
|
||||
// check SourceList only has one entry, because only has one peer
|
||||
assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() == 1));
|
||||
assertEquals("Failed to get ReplicationLoadSourceList " +
|
||||
rLoadSourceList + ", " + server,1, rLoadSourceList.size());
|
||||
assertEquals(PEER_ID2, rLoadSourceList.get(0).getPeerID());
|
||||
|
||||
// check Sink exist only as it is difficult to verify the value on the fly
|
||||
|
@ -88,14 +95,38 @@ public class TestReplicationStatus extends TestReplicationBase {
|
|||
}
|
||||
|
||||
// Stop rs1, then the queue of rs1 will be transfered to rs0
|
||||
UTIL1.getHBaseCluster().getRegionServer(1).stop("Stop RegionServer");
|
||||
Thread.sleep(10000);
|
||||
metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
|
||||
HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(1);
|
||||
hrs.stop("Stop RegionServer");
|
||||
while(!hrs.isShutDown()) {
|
||||
Threads.sleep(100);
|
||||
}
|
||||
// To be sure it dead and references cleaned up. TODO: Change this to a barrier.
|
||||
// I tried waiting on replication stats to change but sleeping in main thread
|
||||
// seems to mess up background replication.
|
||||
Threads.sleep(10000);
|
||||
ServerName server = UTIL1.getHBaseCluster().getRegionServer(0).getServerName();
|
||||
ServerMetrics sm = metrics.getLiveServerMetrics().get(server);
|
||||
List<ReplicationLoadSource> rLoadSourceList = sm.getReplicationLoadSourceList();
|
||||
// check SourceList still only has one entry
|
||||
assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() == 2));
|
||||
List<ReplicationLoadSource> rLoadSourceList = waitOnMetricsReport(1, server);
|
||||
// The remaining server should now have two queues -- the original and then the one that was
|
||||
// added because of failover. The original should still be PEER_ID2 though.
|
||||
assertEquals("Failed ReplicationLoadSourceList " + rLoadSourceList, 2, rLoadSourceList.size());
|
||||
assertEquals(PEER_ID2, rLoadSourceList.get(0).getPeerID());
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait until Master shows metrics counts for ReplicationLoadSourceList that are
|
||||
* greater than <code>greaterThan</code> for <code>serverName</code> before
|
||||
* returning. We want to avoid case where RS hasn't yet updated Master before
|
||||
* allowing test proceed.
|
||||
* @param greaterThan size of replicationLoadSourceList must be greater before we proceed
|
||||
*/
|
||||
private List<ReplicationLoadSource> waitOnMetricsReport(int greaterThan, ServerName serverName)
|
||||
throws IOException {
|
||||
ClusterMetrics metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
|
||||
List<ReplicationLoadSource> list =
|
||||
metrics.getLiveServerMetrics().get(serverName).getReplicationLoadSourceList();
|
||||
while(list.size() <= greaterThan) {
|
||||
Threads.sleep(1000);
|
||||
}
|
||||
return list;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.Abortable;
|
|||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SecurityTests;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
@ -145,30 +146,31 @@ public class TestZKSecretWatcher {
|
|||
KEY_MASTER.rollCurrentKey();
|
||||
AuthenticationKey key1 = KEY_MASTER.getCurrentKey();
|
||||
assertNotNull(key1);
|
||||
LOG.debug("Master current key: "+key1.getKeyId());
|
||||
LOG.debug("Master current key (key1) {}", key1);
|
||||
|
||||
// wait for slave to update
|
||||
Thread.sleep(1000);
|
||||
AuthenticationKey slaveCurrent = KEY_SLAVE.getCurrentKey();
|
||||
assertNotNull(slaveCurrent);
|
||||
assertEquals(key1, slaveCurrent);
|
||||
LOG.debug("Slave current key: "+slaveCurrent.getKeyId());
|
||||
LOG.debug("Slave current key (key1) {}", slaveCurrent);
|
||||
|
||||
// generate two more keys then expire the original
|
||||
KEY_MASTER.rollCurrentKey();
|
||||
AuthenticationKey key2 = KEY_MASTER.getCurrentKey();
|
||||
LOG.debug("Master new current key: "+key2.getKeyId());
|
||||
LOG.debug("Master new current key (key2) {}", key2);
|
||||
KEY_MASTER.rollCurrentKey();
|
||||
AuthenticationKey key3 = KEY_MASTER.getCurrentKey();
|
||||
LOG.debug("Master new current key: "+key3.getKeyId());
|
||||
LOG.debug("Master new current key (key3) {}", key3);
|
||||
|
||||
// force expire the original key
|
||||
key1.setExpiration(EnvironmentEdgeManager.currentTime() - 1000);
|
||||
key1.setExpiration(EnvironmentEdgeManager.currentTime() - 100000);
|
||||
KEY_MASTER.removeExpiredKeys();
|
||||
// verify removed from master
|
||||
assertNull(KEY_MASTER.getKey(key1.getKeyId()));
|
||||
|
||||
// wait for slave to catch up
|
||||
// Wait for slave to catch up. When remove hits KEY_SLAVE, we'll clear
|
||||
// the latch and will progress beyond the await.
|
||||
KEY_SLAVE.getLatch().await();
|
||||
// make sure the slave has both new keys
|
||||
AuthenticationKey slave2 = KEY_SLAVE.getKey(key2.getKeyId());
|
||||
|
@ -179,10 +181,13 @@ public class TestZKSecretWatcher {
|
|||
assertEquals(key3, slave3);
|
||||
slaveCurrent = KEY_SLAVE.getCurrentKey();
|
||||
assertEquals(key3, slaveCurrent);
|
||||
LOG.debug("Slave current key: "+slaveCurrent.getKeyId());
|
||||
LOG.debug("Slave current key (key3) {}", slaveCurrent);
|
||||
|
||||
// verify that the expired key has been removed
|
||||
assertNull(KEY_SLAVE.getKey(key1.getKeyId()));
|
||||
Waiter.waitFor(TEST_UTIL.getConfiguration(), 30000,
|
||||
() -> KEY_SLAVE.getKey(key1.getKeyId()) == null);
|
||||
assertNull("key1=" + KEY_SLAVE.getKey(key1.getKeyId()),
|
||||
KEY_SLAVE.getKey(key1.getKeyId()));
|
||||
|
||||
// bring up a new slave
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
|
|
|
@ -90,7 +90,7 @@ public class TestThrift2ServerCmdLine extends TestThriftServerCmdLine {
|
|||
client.createTable(tTableDescriptor, new ArrayList<>());
|
||||
tableCreated = true;
|
||||
}
|
||||
Assert.assertTrue(client.tableExists(tTableName));
|
||||
Assert.assertTrue("tableCreated " + tableCreated, client.tableExists(tTableName));
|
||||
} finally {
|
||||
sock.close();
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -188,15 +188,14 @@ public class TestThriftHBaseServiceHandler {
|
|||
|
||||
@BeforeClass
|
||||
public static void beforeClass() throws Exception {
|
||||
UTIL.getConfiguration().set("hbase.client.retries.number", "3");
|
||||
UTIL.startMiniCluster();
|
||||
Admin admin = UTIL.getAdmin();
|
||||
HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableAname));
|
||||
for (HColumnDescriptor family : families) {
|
||||
tableDescriptor.addFamily(family);
|
||||
}
|
||||
admin.createTable(tableDescriptor);
|
||||
admin.close();
|
||||
try (Admin admin = UTIL.getAdmin()) {
|
||||
admin.createTable(tableDescriptor);
|
||||
}
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -18,30 +17,28 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.zookeeper;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import static org.apache.zookeeper.client.FourLetterWordMain.send4LetterWord;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.io.OutputStream;
|
||||
import java.io.Reader;
|
||||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
import java.net.BindException;
|
||||
import java.net.ConnectException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.zookeeper.server.NIOServerCnxnFactory;
|
||||
import org.apache.zookeeper.server.ZooKeeperServer;
|
||||
import org.apache.zookeeper.server.persistence.FileTxnLog;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
|
@ -54,13 +51,16 @@ public class MiniZooKeeperCluster {
|
|||
private static final Logger LOG = LoggerFactory.getLogger(MiniZooKeeperCluster.class);
|
||||
|
||||
private static final int TICK_TIME = 2000;
|
||||
private static final int TIMEOUT = 1000;
|
||||
private static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
|
||||
private static final byte[] STATIC_BYTES = Bytes.toBytes("stat");
|
||||
private final int connectionTimeout;
|
||||
|
||||
private boolean started;
|
||||
|
||||
/** The default port. If zero, we use a random port. */
|
||||
/**
|
||||
* The default port. If zero, we use a random port.
|
||||
*/
|
||||
private int defaultClientPort = 0;
|
||||
|
||||
private final List<NIOServerCnxnFactory> standaloneServerFactoryList;
|
||||
|
@ -83,8 +83,8 @@ public class MiniZooKeeperCluster {
|
|||
zooKeeperServers = new ArrayList<>();
|
||||
clientPortList = new ArrayList<>();
|
||||
standaloneServerFactoryList = new ArrayList<>();
|
||||
connectionTimeout = configuration.getInt(HConstants.ZK_SESSION_TIMEOUT + ".localHBaseCluster",
|
||||
DEFAULT_CONNECTION_TIMEOUT);
|
||||
connectionTimeout = configuration
|
||||
.getInt(HConstants.ZK_SESSION_TIMEOUT + ".localHBaseCluster", DEFAULT_CONNECTION_TIMEOUT);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -101,8 +101,7 @@ public class MiniZooKeeperCluster {
|
|||
*
|
||||
* @return clientPortList the client port list
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public List<Integer> getClientPortList() {
|
||||
@VisibleForTesting public List<Integer> getClientPortList() {
|
||||
return clientPortList;
|
||||
}
|
||||
|
||||
|
@ -117,8 +116,7 @@ public class MiniZooKeeperCluster {
|
|||
|
||||
public void setDefaultClientPort(int clientPort) {
|
||||
if (clientPort <= 0) {
|
||||
throw new IllegalArgumentException("Invalid default ZK client port: "
|
||||
+ clientPort);
|
||||
throw new IllegalArgumentException("Invalid default ZK client port: " + clientPort);
|
||||
}
|
||||
this.defaultClientPort = clientPort;
|
||||
}
|
||||
|
@ -190,15 +188,15 @@ public class MiniZooKeeperCluster {
|
|||
}
|
||||
|
||||
/**
|
||||
* @param baseDir the base directory to use
|
||||
* @param baseDir the base directory to use
|
||||
* @param numZooKeeperServers the number of ZooKeeper servers
|
||||
* @return ClientPort server bound to, -1 if there was a binding problem and we couldn't pick
|
||||
* another port.
|
||||
* @throws IOException if an operation fails during the startup
|
||||
* another port.
|
||||
* @throws IOException if an operation fails during the startup
|
||||
* @throws InterruptedException if the startup fails
|
||||
*/
|
||||
public int startup(File baseDir, int numZooKeeperServers) throws IOException,
|
||||
InterruptedException {
|
||||
public int startup(File baseDir, int numZooKeeperServers)
|
||||
throws IOException, InterruptedException {
|
||||
if (numZooKeeperServers <= 0) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -230,21 +228,19 @@ public class MiniZooKeeperCluster {
|
|||
|
||||
ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
|
||||
// Setting {min,max}SessionTimeout defaults to be the same as in Zookeeper
|
||||
server.setMinSessionTimeout(configuration.getInt(
|
||||
"hbase.zookeeper.property.minSessionTimeout", -1));
|
||||
server.setMaxSessionTimeout(configuration.getInt(
|
||||
"hbase.zookeeper.property.maxSessionTimeout", -1));
|
||||
server.setMinSessionTimeout(configuration.getInt("hbase.zookeeper.property.minSessionTimeout",
|
||||
-1));
|
||||
server.setMaxSessionTimeout(configuration.getInt("hbase.zookeeper.property.maxSessionTimeout",
|
||||
-1));
|
||||
NIOServerCnxnFactory standaloneServerFactory;
|
||||
while (true) {
|
||||
try {
|
||||
standaloneServerFactory = new NIOServerCnxnFactory();
|
||||
standaloneServerFactory.configure(
|
||||
new InetSocketAddress(currentClientPort),
|
||||
standaloneServerFactory.configure(new InetSocketAddress(currentClientPort),
|
||||
configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS,
|
||||
HConstants.DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS));
|
||||
HConstants.DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS));
|
||||
} catch (BindException e) {
|
||||
LOG.debug("Failed binding ZK Server to client port: " +
|
||||
currentClientPort, e);
|
||||
LOG.debug("Failed binding ZK Server to client port: " + currentClientPort, e);
|
||||
// We're told to use some port but it's occupied, fail
|
||||
if (hasValidClientPortInList(i)) {
|
||||
return -1;
|
||||
|
@ -257,11 +253,16 @@ public class MiniZooKeeperCluster {
|
|||
break;
|
||||
}
|
||||
|
||||
// Start up this ZK server
|
||||
// Start up this ZK server. Dump its stats.
|
||||
standaloneServerFactory.startup(server);
|
||||
LOG.info("Started connectionTimeout={}, dir={}, {}", connectionTimeout, dir,
|
||||
getServerConfigurationOnOneLine(server));
|
||||
// Runs a 'stat' against the servers.
|
||||
if (!waitForServerUp(currentClientPort, connectionTimeout)) {
|
||||
throw new IOException("Waiting for startup of standalone server");
|
||||
Threads.printThreadInfo(System.out,
|
||||
"Why is zk standalone server not coming up?");
|
||||
throw new IOException("Waiting for startup of standalone server; " +
|
||||
"server isRunning=" + server.isRunning());
|
||||
}
|
||||
|
||||
// We have selected a port as a client port. Update clientPortList if necessary.
|
||||
|
@ -280,11 +281,28 @@ public class MiniZooKeeperCluster {
|
|||
activeZKServerIndex = 0;
|
||||
started = true;
|
||||
int clientPort = clientPortList.get(activeZKServerIndex);
|
||||
LOG.info("Started MiniZooKeeperCluster and ran successful 'stat' on client port={}",
|
||||
clientPort);
|
||||
LOG.info("Started MiniZooKeeperCluster and ran 'stat' on client port={}", clientPort);
|
||||
return clientPort;
|
||||
}
|
||||
|
||||
private String getServerConfigurationOnOneLine(ZooKeeperServer server) {
|
||||
StringWriter sw = new StringWriter();
|
||||
try (PrintWriter pw = new PrintWriter(sw) {
|
||||
@Override public void println(int x) {
|
||||
super.print(x);
|
||||
super.print(", ");
|
||||
}
|
||||
|
||||
@Override public void println(String x) {
|
||||
super.print(x);
|
||||
super.print(", ");
|
||||
}
|
||||
}) {
|
||||
server.dumpConf(pw);
|
||||
}
|
||||
return sw.toString();
|
||||
}
|
||||
|
||||
private void createDir(File dir) throws IOException {
|
||||
try {
|
||||
if (!dir.exists()) {
|
||||
|
@ -395,16 +413,12 @@ public class MiniZooKeeperCluster {
|
|||
LOG.info("Kill one backup ZK servers in the cluster on client port: {}", clientPort);
|
||||
}
|
||||
|
||||
// XXX: From o.a.zk.t.ClientBase
|
||||
// XXX: From o.a.zk.t.ClientBase. We just dropped the check for ssl/secure.
|
||||
private static boolean waitForServerDown(int port, long timeout) throws IOException {
|
||||
long start = System.currentTimeMillis();
|
||||
while (true) {
|
||||
try {
|
||||
try (Socket sock = new Socket("localhost", port)) {
|
||||
OutputStream outstream = sock.getOutputStream();
|
||||
outstream.write(STATIC_BYTES);
|
||||
outstream.flush();
|
||||
}
|
||||
send4LetterWord("localhost", port, "stat", (int)timeout);
|
||||
} catch (IOException e) {
|
||||
return true;
|
||||
}
|
||||
|
@ -413,7 +427,7 @@ public class MiniZooKeeperCluster {
|
|||
break;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(250);
|
||||
Thread.sleep(TIMEOUT);
|
||||
} catch (InterruptedException e) {
|
||||
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
|
||||
}
|
||||
|
@ -421,40 +435,31 @@ public class MiniZooKeeperCluster {
|
|||
return false;
|
||||
}
|
||||
|
||||
// XXX: From o.a.zk.t.ClientBase
|
||||
// XXX: From o.a.zk.t.ClientBase. Its in the test jar but we don't depend on zk test jar.
|
||||
// We remove the SSL/secure bit. Not used in here.
|
||||
private static boolean waitForServerUp(int port, long timeout) throws IOException {
|
||||
long start = System.currentTimeMillis();
|
||||
while (true) {
|
||||
try {
|
||||
Socket sock = new Socket("localhost", port);
|
||||
BufferedReader reader = null;
|
||||
try {
|
||||
OutputStream outstream = sock.getOutputStream();
|
||||
outstream.write(STATIC_BYTES);
|
||||
outstream.flush();
|
||||
|
||||
Reader isr = new InputStreamReader(sock.getInputStream());
|
||||
reader = new BufferedReader(isr);
|
||||
String line = reader.readLine();
|
||||
if (line != null && line.startsWith("Zookeeper version:")) {
|
||||
return true;
|
||||
}
|
||||
} finally {
|
||||
sock.close();
|
||||
if (reader != null) {
|
||||
reader.close();
|
||||
}
|
||||
String result = send4LetterWord("localhost", port, "stat", (int)timeout);
|
||||
if (result.startsWith("Zookeeper version:") && !result.contains("READ-ONLY")) {
|
||||
return true;
|
||||
} else {
|
||||
LOG.debug("Read {}", result);
|
||||
}
|
||||
} catch (ConnectException e) {
|
||||
// ignore as this is expected, do not log stacktrace
|
||||
LOG.info("localhost:{} not up: {}", port, e.toString());
|
||||
} catch (IOException e) {
|
||||
// ignore as this is expected
|
||||
LOG.info("server localhost:{} not up {}", port, e);
|
||||
LOG.info("localhost:{} not up", port, e);
|
||||
}
|
||||
|
||||
if (System.currentTimeMillis() > start + timeout) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(250);
|
||||
Thread.sleep(TIMEOUT);
|
||||
} catch (InterruptedException e) {
|
||||
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue