HBASE-7681 Address some recent random test failures
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1439003 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a3f0aea354
commit
1cdaa710be
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hbase;
|
||||||
|
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
|
@ -61,26 +62,27 @@ public class TestNodeHealthCheckChore {
|
||||||
Configuration config = getConfForNodeHealthScript();
|
Configuration config = getConfForNodeHealthScript();
|
||||||
config.addResource(healthScriptFile.getName());
|
config.addResource(healthScriptFile.getName());
|
||||||
String location = healthScriptFile.getAbsolutePath();
|
String location = healthScriptFile.getAbsolutePath();
|
||||||
long timeout = config.getLong(HConstants.HEALTH_SCRIPT_TIMEOUT, 100);
|
long timeout = config.getLong(HConstants.HEALTH_SCRIPT_TIMEOUT, 200);
|
||||||
|
|
||||||
|
HealthChecker checker = new HealthChecker();
|
||||||
|
checker.init(location, timeout);
|
||||||
|
|
||||||
String normalScript = "echo \"I am all fine\"";
|
String normalScript = "echo \"I am all fine\"";
|
||||||
createScript(normalScript, true);
|
createScript(normalScript, true);
|
||||||
HealthChecker checker = new HealthChecker();
|
|
||||||
checker.init(location, timeout);
|
|
||||||
HealthReport report = checker.checkHealth();
|
HealthReport report = checker.checkHealth();
|
||||||
assertTrue(report.getStatus() == HealthCheckerExitStatus.SUCCESS);
|
assertEquals(HealthCheckerExitStatus.SUCCESS, report.getStatus());
|
||||||
LOG.info("Health Status:" + checker);
|
LOG.info("Health Status:" + checker);
|
||||||
|
|
||||||
String errorScript = "echo ERROR\n echo \"Server not healthy\"";
|
String errorScript = "echo ERROR\n echo \"Server not healthy\"";
|
||||||
createScript(errorScript, true);
|
createScript(errorScript, true);
|
||||||
report = checker.checkHealth();
|
report = checker.checkHealth();
|
||||||
assertTrue(report.getStatus() == HealthCheckerExitStatus.FAILED);
|
assertEquals(HealthCheckerExitStatus.FAILED, report.getStatus());
|
||||||
LOG.info("Health Status:" + report.getHealthReport());
|
LOG.info("Health Status:" + report.getHealthReport());
|
||||||
|
|
||||||
String timeOutScript = "sleep 4\n echo\"I am fine\"";
|
String timeOutScript = "sleep 4\n echo\"I am fine\"";
|
||||||
createScript(timeOutScript, true);
|
createScript(timeOutScript, true);
|
||||||
report = checker.checkHealth();
|
report = checker.checkHealth();
|
||||||
assertTrue(report.getStatus() == HealthCheckerExitStatus.TIMED_OUT);
|
assertEquals(HealthCheckerExitStatus.TIMED_OUT, report.getStatus());
|
||||||
LOG.info("Health Status:" + report.getHealthReport());
|
LOG.info("Health Status:" + report.getHealthReport());
|
||||||
|
|
||||||
healthScriptFile.delete();
|
healthScriptFile.delete();
|
||||||
|
@ -119,7 +121,7 @@ public class TestNodeHealthCheckChore {
|
||||||
conf.set(HConstants.HEALTH_SCRIPT_LOC,
|
conf.set(HConstants.HEALTH_SCRIPT_LOC,
|
||||||
healthScriptFile.getAbsolutePath());
|
healthScriptFile.getAbsolutePath());
|
||||||
conf.setLong(HConstants.HEALTH_FAILURE_THRESHOLD, 3);
|
conf.setLong(HConstants.HEALTH_FAILURE_THRESHOLD, 3);
|
||||||
conf.setLong(HConstants.HEALTH_SCRIPT_TIMEOUT, 100);
|
conf.setLong(HConstants.HEALTH_SCRIPT_TIMEOUT, 200);
|
||||||
return conf;
|
return conf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -859,120 +859,118 @@ public class TestAdmin {
|
||||||
assertFalse(admin.tableExists(tableName));
|
assertFalse(admin.tableExists(tableName));
|
||||||
final HTable table = TEST_UTIL.createTable(tableName, familyNames,
|
final HTable table = TEST_UTIL.createTable(tableName, familyNames,
|
||||||
numVersions, blockSize);
|
numVersions, blockSize);
|
||||||
try {
|
|
||||||
int rowCount = 0;
|
|
||||||
byte[] q = new byte[0];
|
|
||||||
|
|
||||||
// insert rows into column families. The number of rows that have values
|
int rowCount = 0;
|
||||||
// in a specific column family is decided by rowCounts[familyIndex]
|
byte[] q = new byte[0];
|
||||||
for (int index = 0; index < familyNames.length; index++) {
|
|
||||||
ArrayList<Put> puts = new ArrayList<Put>(rowCounts[index]);
|
|
||||||
for (int i = 0; i < rowCounts[index]; i++) {
|
|
||||||
byte[] k = Bytes.toBytes(i);
|
|
||||||
Put put = new Put(k);
|
|
||||||
put.add(familyNames[index], q, k);
|
|
||||||
puts.add(put);
|
|
||||||
}
|
|
||||||
table.put(puts);
|
|
||||||
|
|
||||||
if ( rowCount < rowCounts[index] ) {
|
// insert rows into column families. The number of rows that have values
|
||||||
rowCount = rowCounts[index];
|
// in a specific column family is decided by rowCounts[familyIndex]
|
||||||
}
|
for (int index = 0; index < familyNames.length; index++) {
|
||||||
|
ArrayList<Put> puts = new ArrayList<Put>(rowCounts[index]);
|
||||||
|
for (int i = 0; i < rowCounts[index]; i++) {
|
||||||
|
byte[] k = Bytes.toBytes(i);
|
||||||
|
Put put = new Put(k);
|
||||||
|
put.add(familyNames[index], q, k);
|
||||||
|
puts.add(put);
|
||||||
}
|
}
|
||||||
|
table.put(puts);
|
||||||
|
|
||||||
// get the initial layout (should just be one region)
|
if ( rowCount < rowCounts[index] ) {
|
||||||
Map<HRegionInfo, ServerName> m = table.getRegionLocations();
|
rowCount = rowCounts[index];
|
||||||
System.out.println("Initial regions (" + m.size() + "): " + m);
|
|
||||||
assertTrue(m.size() == 1);
|
|
||||||
|
|
||||||
// Verify row count
|
|
||||||
Scan scan = new Scan();
|
|
||||||
ResultScanner scanner = table.getScanner(scan);
|
|
||||||
int rows = 0;
|
|
||||||
for(@SuppressWarnings("unused") Result result : scanner) {
|
|
||||||
rows++;
|
|
||||||
}
|
}
|
||||||
scanner.close();
|
|
||||||
assertEquals(rowCount, rows);
|
|
||||||
|
|
||||||
// Have an outstanding scan going on to make sure we can scan over splits.
|
|
||||||
scan = new Scan();
|
|
||||||
scanner = table.getScanner(scan);
|
|
||||||
// Scan first row so we are into first region before split happens.
|
|
||||||
scanner.next();
|
|
||||||
|
|
||||||
final AtomicInteger count = new AtomicInteger(0);
|
|
||||||
Thread t = new Thread("CheckForSplit") {
|
|
||||||
public void run() {
|
|
||||||
for (int i = 0; i < 20; i++) {
|
|
||||||
try {
|
|
||||||
sleep(1000);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
// check again table = new HTable(conf, tableName);
|
|
||||||
Map<HRegionInfo, ServerName> regions = null;
|
|
||||||
try {
|
|
||||||
regions = table.getRegionLocations();
|
|
||||||
} catch (IOException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
if (regions == null) continue;
|
|
||||||
count.set(regions.size());
|
|
||||||
if (count.get() >= 2) break;
|
|
||||||
LOG.debug("Cycle waiting on split");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
t.start();
|
|
||||||
// Split the table
|
|
||||||
this.admin.split(tableName, splitPoint);
|
|
||||||
t.join();
|
|
||||||
|
|
||||||
// Verify row count
|
|
||||||
rows = 1; // We counted one row above.
|
|
||||||
for (@SuppressWarnings("unused") Result result : scanner) {
|
|
||||||
rows++;
|
|
||||||
if (rows > rowCount) {
|
|
||||||
scanner.close();
|
|
||||||
assertTrue("Scanned more than expected (" + rowCount + ")", false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
scanner.close();
|
|
||||||
assertEquals(rowCount, rows);
|
|
||||||
|
|
||||||
Map<HRegionInfo, ServerName> regions = null;
|
|
||||||
try {
|
|
||||||
regions = table.getRegionLocations();
|
|
||||||
} catch (IOException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
assertEquals(2, regions.size());
|
|
||||||
Set<HRegionInfo> hRegionInfos = regions.keySet();
|
|
||||||
HRegionInfo[] r = hRegionInfos.toArray(new HRegionInfo[hRegionInfos.size()]);
|
|
||||||
if (splitPoint != null) {
|
|
||||||
// make sure the split point matches our explicit configuration
|
|
||||||
assertEquals(Bytes.toString(splitPoint),
|
|
||||||
Bytes.toString(r[0].getEndKey()));
|
|
||||||
assertEquals(Bytes.toString(splitPoint),
|
|
||||||
Bytes.toString(r[1].getStartKey()));
|
|
||||||
LOG.debug("Properly split on " + Bytes.toString(splitPoint));
|
|
||||||
} else {
|
|
||||||
if (familyNames.length > 1) {
|
|
||||||
int splitKey = Bytes.toInt(r[0].getEndKey());
|
|
||||||
// check if splitKey is based on the largest column family
|
|
||||||
// in terms of it store size
|
|
||||||
int deltaForLargestFamily = Math.abs(rowCount/2 - splitKey);
|
|
||||||
for (int index = 0; index < familyNames.length; index++) {
|
|
||||||
int delta = Math.abs(rowCounts[index]/2 - splitKey);
|
|
||||||
assertTrue(delta >= deltaForLargestFamily);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
TEST_UTIL.deleteTable(tableName);
|
|
||||||
table.close();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// get the initial layout (should just be one region)
|
||||||
|
Map<HRegionInfo, ServerName> m = table.getRegionLocations();
|
||||||
|
System.out.println("Initial regions (" + m.size() + "): " + m);
|
||||||
|
assertTrue(m.size() == 1);
|
||||||
|
|
||||||
|
// Verify row count
|
||||||
|
Scan scan = new Scan();
|
||||||
|
ResultScanner scanner = table.getScanner(scan);
|
||||||
|
int rows = 0;
|
||||||
|
for(@SuppressWarnings("unused") Result result : scanner) {
|
||||||
|
rows++;
|
||||||
|
}
|
||||||
|
scanner.close();
|
||||||
|
assertEquals(rowCount, rows);
|
||||||
|
|
||||||
|
// Have an outstanding scan going on to make sure we can scan over splits.
|
||||||
|
scan = new Scan();
|
||||||
|
scanner = table.getScanner(scan);
|
||||||
|
// Scan first row so we are into first region before split happens.
|
||||||
|
scanner.next();
|
||||||
|
|
||||||
|
final AtomicInteger count = new AtomicInteger(0);
|
||||||
|
Thread t = new Thread("CheckForSplit") {
|
||||||
|
public void run() {
|
||||||
|
for (int i = 0; i < 20; i++) {
|
||||||
|
try {
|
||||||
|
sleep(1000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// check again table = new HTable(conf, tableName);
|
||||||
|
Map<HRegionInfo, ServerName> regions = null;
|
||||||
|
try {
|
||||||
|
regions = table.getRegionLocations();
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
if (regions == null) continue;
|
||||||
|
count.set(regions.size());
|
||||||
|
if (count.get() >= 2) break;
|
||||||
|
LOG.debug("Cycle waiting on split");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
t.start();
|
||||||
|
// Split the table
|
||||||
|
this.admin.split(tableName, splitPoint);
|
||||||
|
t.join();
|
||||||
|
|
||||||
|
// Verify row count
|
||||||
|
rows = 1; // We counted one row above.
|
||||||
|
for (@SuppressWarnings("unused") Result result : scanner) {
|
||||||
|
rows++;
|
||||||
|
if (rows > rowCount) {
|
||||||
|
scanner.close();
|
||||||
|
assertTrue("Scanned more than expected (" + rowCount + ")", false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
scanner.close();
|
||||||
|
assertEquals(rowCount, rows);
|
||||||
|
|
||||||
|
Map<HRegionInfo, ServerName> regions = null;
|
||||||
|
try {
|
||||||
|
regions = table.getRegionLocations();
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
assertEquals(2, regions.size());
|
||||||
|
Set<HRegionInfo> hRegionInfos = regions.keySet();
|
||||||
|
HRegionInfo[] r = hRegionInfos.toArray(new HRegionInfo[hRegionInfos.size()]);
|
||||||
|
if (splitPoint != null) {
|
||||||
|
// make sure the split point matches our explicit configuration
|
||||||
|
assertEquals(Bytes.toString(splitPoint),
|
||||||
|
Bytes.toString(r[0].getEndKey()));
|
||||||
|
assertEquals(Bytes.toString(splitPoint),
|
||||||
|
Bytes.toString(r[1].getStartKey()));
|
||||||
|
LOG.debug("Properly split on " + Bytes.toString(splitPoint));
|
||||||
|
} else {
|
||||||
|
if (familyNames.length > 1) {
|
||||||
|
int splitKey = Bytes.toInt(r[0].getEndKey());
|
||||||
|
// check if splitKey is based on the largest column family
|
||||||
|
// in terms of it store size
|
||||||
|
int deltaForLargestFamily = Math.abs(rowCount/2 - splitKey);
|
||||||
|
for (int index = 0; index < familyNames.length; index++) {
|
||||||
|
int delta = Math.abs(rowCounts[index]/2 - splitKey);
|
||||||
|
assertTrue(delta >= deltaForLargestFamily);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
TEST_UTIL.deleteTable(tableName);
|
||||||
|
table.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -106,8 +106,6 @@ public class SimpleRegionObserver extends BaseRegionObserver {
|
||||||
Leases leases = re.getRegionServerServices().getLeases();
|
Leases leases = re.getRegionServerServices().getLeases();
|
||||||
leases.createLease("x", 2000, null);
|
leases.createLease("x", 2000, null);
|
||||||
leases.cancelLease("x");
|
leases.cancelLease("x");
|
||||||
Integer lid = re.getRegion().getLock(null, Bytes.toBytes("some row"), true);
|
|
||||||
re.getRegion().releaseRowLock(lid);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -77,7 +77,7 @@ public class TestLruBlockCache {
|
||||||
int n = 0;
|
int n = 0;
|
||||||
while(cache.getEvictionCount() == 0) {
|
while(cache.getEvictionCount() == 0) {
|
||||||
Thread.sleep(200);
|
Thread.sleep(200);
|
||||||
assertTrue(n++ < 10);
|
assertTrue(n++ < 20);
|
||||||
}
|
}
|
||||||
System.out.println("Background Evictions run: " + cache.getEvictionCount());
|
System.out.println("Background Evictions run: " + cache.getEvictionCount());
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,8 @@ import java.io.UnsupportedEncodingException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hbase.*;
|
import org.apache.hadoop.hbase.*;
|
||||||
import org.apache.hadoop.mapreduce.Job;
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
@ -51,6 +53,7 @@ import static org.junit.Assert.*;
|
||||||
|
|
||||||
@Category(MediumTests.class)
|
@Category(MediumTests.class)
|
||||||
public class TestImportTsv {
|
public class TestImportTsv {
|
||||||
|
private static final Log LOG = LogFactory.getLog(TestImportTsv.class);
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testTsvParserSpecParsing() {
|
public void testTsvParserSpecParsing() {
|
||||||
|
@ -266,7 +269,6 @@ public class TestImportTsv {
|
||||||
args = opts.getRemainingArgs();
|
args = opts.getRemainingArgs();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
||||||
FileSystem fs = FileSystem.get(conf);
|
FileSystem fs = FileSystem.get(conf);
|
||||||
FSDataOutputStream op = fs.create(new Path(inputFile), true);
|
FSDataOutputStream op = fs.create(new Path(inputFile), true);
|
||||||
if (data == null) {
|
if (data == null) {
|
||||||
|
@ -280,8 +282,11 @@ public class TestImportTsv {
|
||||||
if (conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) {
|
if (conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) {
|
||||||
HTableDescriptor desc = new HTableDescriptor(TAB);
|
HTableDescriptor desc = new HTableDescriptor(TAB);
|
||||||
desc.addFamily(new HColumnDescriptor(FAM));
|
desc.addFamily(new HColumnDescriptor(FAM));
|
||||||
new HBaseAdmin(conf).createTable(desc);
|
HBaseAdmin admin = new HBaseAdmin(conf);
|
||||||
|
admin.createTable(desc);
|
||||||
|
admin.close();
|
||||||
} else { // set the hbaseAdmin as we are not going through main()
|
} else { // set the hbaseAdmin as we are not going through main()
|
||||||
|
LOG.info("set the hbaseAdmin");
|
||||||
ImportTsv.createHbaseAdmin(conf);
|
ImportTsv.createHbaseAdmin(conf);
|
||||||
}
|
}
|
||||||
Job job = ImportTsv.createSubmittableJob(conf, args);
|
Job job = ImportTsv.createSubmittableJob(conf, args);
|
||||||
|
@ -323,6 +328,7 @@ public class TestImportTsv {
|
||||||
// continue
|
// continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
table.close();
|
||||||
assertTrue(verified);
|
assertTrue(verified);
|
||||||
} finally {
|
} finally {
|
||||||
htu1.shutdownMiniMapReduceCluster();
|
htu1.shutdownMiniMapReduceCluster();
|
||||||
|
|
|
@ -125,7 +125,7 @@ public class TestSplitLogManager {
|
||||||
Mockito.when(sm.isServerOnline(Mockito.any(ServerName.class))).thenReturn(true);
|
Mockito.when(sm.isServerOnline(Mockito.any(ServerName.class))).thenReturn(true);
|
||||||
Mockito.when(master.getServerManager()).thenReturn(sm);
|
Mockito.when(master.getServerManager()).thenReturn(sm);
|
||||||
|
|
||||||
to = 4000;
|
to = 6000;
|
||||||
conf.setInt("hbase.splitlog.manager.timeout", to);
|
conf.setInt("hbase.splitlog.manager.timeout", to);
|
||||||
conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);
|
conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);
|
||||||
conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
|
conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
|
||||||
|
|
|
@ -140,7 +140,7 @@ public class TestSplitLogWorker {
|
||||||
SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), RS, neverEndingTask);
|
SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), RS, neverEndingTask);
|
||||||
slw.start();
|
slw.start();
|
||||||
try {
|
try {
|
||||||
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1000);
|
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1500);
|
||||||
byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS));
|
byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS));
|
||||||
SplitLogTask slt = SplitLogTask.parseFrom(bytes);
|
SplitLogTask slt = SplitLogTask.parseFrom(bytes);
|
||||||
assertTrue(slt.isOwned(RS));
|
assertTrue(slt.isOwned(RS));
|
||||||
|
@ -176,10 +176,10 @@ public class TestSplitLogWorker {
|
||||||
slw1.start();
|
slw1.start();
|
||||||
slw2.start();
|
slw2.start();
|
||||||
try {
|
try {
|
||||||
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1000);
|
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1500);
|
||||||
// Assert that either the tot_wkr_failed_to_grab_task_owned count was set of if
|
// Assert that either the tot_wkr_failed_to_grab_task_owned count was set of if
|
||||||
// not it, that we fell through to the next counter in line and it was set.
|
// not it, that we fell through to the next counter in line and it was set.
|
||||||
assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1, 1000, false) ||
|
assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1, 1500, false) ||
|
||||||
SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.get() == 1);
|
SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.get() == 1);
|
||||||
byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TRFT));
|
byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TRFT));
|
||||||
SplitLogTask slt = SplitLogTask.parseFrom(bytes);
|
SplitLogTask slt = SplitLogTask.parseFrom(bytes);
|
||||||
|
@ -207,14 +207,14 @@ public class TestSplitLogWorker {
|
||||||
new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE,
|
new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.PERSISTENT);
|
CreateMode.PERSISTENT);
|
||||||
|
|
||||||
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1000);
|
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1500);
|
||||||
assertEquals(1, slw.taskReadySeq);
|
assertEquals(1, slw.taskReadySeq);
|
||||||
byte [] bytes = ZKUtil.getData(zkw, PATH);
|
byte [] bytes = ZKUtil.getData(zkw, PATH);
|
||||||
SplitLogTask slt = SplitLogTask.parseFrom(bytes);
|
SplitLogTask slt = SplitLogTask.parseFrom(bytes);
|
||||||
assertTrue(slt.isOwned(SRV));
|
assertTrue(slt.isOwned(SRV));
|
||||||
slt = new SplitLogTask.Unassigned(MANAGER);
|
slt = new SplitLogTask.Unassigned(MANAGER);
|
||||||
ZKUtil.setData(zkw, PATH, slt.toByteArray());
|
ZKUtil.setData(zkw, PATH, slt.toByteArray());
|
||||||
waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, 1000);
|
waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, 1500);
|
||||||
} finally {
|
} finally {
|
||||||
stopSplitLogWorker(slw);
|
stopSplitLogWorker(slw);
|
||||||
}
|
}
|
||||||
|
@ -235,7 +235,7 @@ public class TestSplitLogWorker {
|
||||||
zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(),
|
zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(),
|
||||||
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
||||||
|
|
||||||
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1000);
|
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1500);
|
||||||
// now the worker is busy doing the above task
|
// now the worker is busy doing the above task
|
||||||
|
|
||||||
// create another task
|
// create another task
|
||||||
|
@ -247,9 +247,9 @@ public class TestSplitLogWorker {
|
||||||
final ServerName anotherWorker = new ServerName("another-worker,1,1");
|
final ServerName anotherWorker = new ServerName("another-worker,1,1");
|
||||||
SplitLogTask slt = new SplitLogTask.Owned(anotherWorker);
|
SplitLogTask slt = new SplitLogTask.Owned(anotherWorker);
|
||||||
ZKUtil.setData(zkw, PATH1, slt.toByteArray());
|
ZKUtil.setData(zkw, PATH1, slt.toByteArray());
|
||||||
waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, 1000);
|
waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, 1500);
|
||||||
|
|
||||||
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, 1000);
|
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, 1500);
|
||||||
assertEquals(2, slw.taskReadySeq);
|
assertEquals(2, slw.taskReadySeq);
|
||||||
byte [] bytes = ZKUtil.getData(zkw, PATH2);
|
byte [] bytes = ZKUtil.getData(zkw, PATH2);
|
||||||
slt = SplitLogTask.parseFrom(bytes);
|
slt = SplitLogTask.parseFrom(bytes);
|
||||||
|
@ -274,25 +274,25 @@ public class TestSplitLogWorker {
|
||||||
zkw.getRecoverableZooKeeper().create(task,slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
|
zkw.getRecoverableZooKeeper().create(task,slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.PERSISTENT);
|
CreateMode.PERSISTENT);
|
||||||
|
|
||||||
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1000);
|
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 1500);
|
||||||
// now the worker is busy doing the above task
|
// now the worker is busy doing the above task
|
||||||
|
|
||||||
// preempt the task, have it owned by another worker
|
// preempt the task, have it owned by another worker
|
||||||
ZKUtil.setData(zkw, task, slt.toByteArray());
|
ZKUtil.setData(zkw, task, slt.toByteArray());
|
||||||
waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, 1000);
|
waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, 1500);
|
||||||
|
|
||||||
// create a RESCAN node
|
// create a RESCAN node
|
||||||
String rescan = ZKSplitLog.getEncodedNodeName(zkw, "RESCAN");
|
String rescan = ZKSplitLog.getEncodedNodeName(zkw, "RESCAN");
|
||||||
rescan = zkw.getRecoverableZooKeeper().create(rescan, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
|
rescan = zkw.getRecoverableZooKeeper().create(rescan, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
|
||||||
CreateMode.PERSISTENT_SEQUENTIAL);
|
CreateMode.PERSISTENT_SEQUENTIAL);
|
||||||
|
|
||||||
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, 1000);
|
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, 1500);
|
||||||
// RESCAN node might not have been processed if the worker became busy
|
// RESCAN node might not have been processed if the worker became busy
|
||||||
// with the above task. preempt the task again so that now the RESCAN
|
// with the above task. preempt the task again so that now the RESCAN
|
||||||
// node is processed
|
// node is processed
|
||||||
ZKUtil.setData(zkw, task, slt.toByteArray());
|
ZKUtil.setData(zkw, task, slt.toByteArray());
|
||||||
waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 1, 2, 1000);
|
waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 1, 2, 1500);
|
||||||
waitForCounter(SplitLogCounters.tot_wkr_task_acquired_rescan, 0, 1, 1000);
|
waitForCounter(SplitLogCounters.tot_wkr_task_acquired_rescan, 0, 1, 1500);
|
||||||
|
|
||||||
List<String> nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode);
|
List<String> nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode);
|
||||||
LOG.debug(nodes);
|
LOG.debug(nodes);
|
||||||
|
|
|
@ -149,7 +149,7 @@ public class TestThriftServerCmdLine {
|
||||||
cmdLineThread.start();
|
cmdLineThread.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout=60 * 1000)
|
@Test(timeout=120 * 1000)
|
||||||
public void testRunThriftServer() throws Exception {
|
public void testRunThriftServer() throws Exception {
|
||||||
List<String> args = new ArrayList<String>();
|
List<String> args = new ArrayList<String>();
|
||||||
if (implType != null) {
|
if (implType != null) {
|
||||||
|
|
|
@ -58,7 +58,7 @@ public class TestMiniClusterLoadSequential {
|
||||||
protected static final byte[] CF = Bytes.toBytes("load_test_cf");
|
protected static final byte[] CF = Bytes.toBytes("load_test_cf");
|
||||||
protected static final int NUM_THREADS = 8;
|
protected static final int NUM_THREADS = 8;
|
||||||
protected static final int NUM_RS = 2;
|
protected static final int NUM_RS = 2;
|
||||||
protected static final int TIMEOUT_MS = 120000;
|
protected static final int TIMEOUT_MS = 180000;
|
||||||
protected static final HBaseTestingUtility TEST_UTIL =
|
protected static final HBaseTestingUtility TEST_UTIL =
|
||||||
new HBaseTestingUtility();
|
new HBaseTestingUtility();
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue