HBASE-27252 Clean up error-prone findings in hbase-it

Close #4662

Co-authored-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
Andrew Purtell 2022-08-20 23:29:20 +08:00 committed by Duo Zhang
parent 61cd63c7b6
commit 1004876bad
46 changed files with 240 additions and 214 deletions

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.chaos;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -224,10 +225,11 @@ public class ChaosAgent implements Watcher, Closeable, Runnable {
} catch (Exception e) { } catch (Exception e) {
break; break;
} }
zk.getData(path, false, getTaskForExecutionCallback, new String(data)); zk.getData(path, false, getTaskForExecutionCallback,
new String(data, StandardCharsets.UTF_8));
break; break;
case OK: case OK:
String cmd = new String(data); String cmd = new String(data, StandardCharsets.UTF_8);
LOG.info("Executing command : " + cmd); LOG.info("Executing command : " + cmd);
String status = ChaosConstants.TASK_COMPLETION_STRING; String status = ChaosConstants.TASK_COMPLETION_STRING;
try { try {
@ -368,7 +370,8 @@ public class ChaosAgent implements Watcher, Closeable, Runnable {
*/ */
public void setStatusOfTaskZNode(String taskZNode, String status) { public void setStatusOfTaskZNode(String taskZNode, String status) {
LOG.info("Setting status of Task ZNode: " + taskZNode + " status : " + status); LOG.info("Setting status of Task ZNode: " + taskZNode + " status : " + status);
zk.setData(taskZNode, status.getBytes(), -1, setStatusOfTaskZNodeCallback, null); zk.setData(taskZNode, status.getBytes(StandardCharsets.UTF_8), -1, setStatusOfTaskZNodeCallback,
null);
} }
/** /**

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase; package org.apache.hadoop.hbase;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -111,7 +112,7 @@ public class ChaosZKClient {
zk.create( zk.create(
CHAOS_AGENT_STATUS_ZNODE + ZNODE_PATH_SEPARATOR + taskObject.getTaskHostname() CHAOS_AGENT_STATUS_ZNODE + ZNODE_PATH_SEPARATOR + taskObject.getTaskHostname()
+ ZNODE_PATH_SEPARATOR + TASK_PREFIX, + ZNODE_PATH_SEPARATOR + TASK_PREFIX,
taskObject.getCommand().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, taskObject.getCommand().getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL, submitTaskCallback, taskObject); CreateMode.EPHEMERAL_SEQUENTIAL, submitTaskCallback, taskObject);
long start = EnvironmentEdgeManager.currentTime(); long start = EnvironmentEdgeManager.currentTime();
@ -189,7 +190,7 @@ public class ChaosZKClient {
case OK: case OK:
if (ctx != null) { if (ctx != null) {
String status = new String(data); String status = new String(data, StandardCharsets.UTF_8);
taskStatus = status; taskStatus = status;
switch (status) { switch (status) {
case TASK_COMPLETION_STRING: case TASK_COMPLETION_STRING:

View File

@ -40,7 +40,7 @@ interface ClusterManager extends Configurable {
HBASE_MASTER("master"), HBASE_MASTER("master"),
HBASE_REGIONSERVER("regionserver"); HBASE_REGIONSERVER("regionserver");
private String name; private final String name;
ServiceType(String name) { ServiceType(String name) {
this.name = name; this.name = name;

View File

@ -344,9 +344,9 @@ public class DistributedHBaseCluster extends HBaseClusterInterface {
// do a best effort restore // do a best effort restore
boolean success = true; boolean success = true;
success = restoreMasters(initial, current) & success; success = restoreMasters(initial, current) && success;
success = restoreRegionServers(initial, current) & success; success = restoreRegionServers(initial, current) && success;
success = restoreAdmin() & success; success = restoreAdmin() && success;
LOG.info("Restoring cluster - done"); LOG.info("Restoring cluster - done");
return success; return success;

View File

@ -280,22 +280,14 @@ public class HBaseClusterManager extends Configured implements ClusterManager {
*/ */
static class ZookeeperShellCommandProvider extends CommandProvider { static class ZookeeperShellCommandProvider extends CommandProvider {
private final String zookeeperHome; private final String zookeeperHome;
private final String confDir;
ZookeeperShellCommandProvider(Configuration conf) throws IOException { ZookeeperShellCommandProvider(Configuration conf) throws IOException {
zookeeperHome = zookeeperHome =
conf.get("hbase.it.clustermanager.zookeeper.home", System.getenv("ZOOBINDIR")); conf.get("hbase.it.clustermanager.zookeeper.home", System.getenv("ZOOBINDIR"));
String tmp =
conf.get("hbase.it.clustermanager.zookeeper.conf.dir", System.getenv("ZOOCFGDIR"));
if (zookeeperHome == null) { if (zookeeperHome == null) {
throw new IOException("ZooKeeper home configuration parameter i.e. " throw new IOException("ZooKeeper home configuration parameter i.e. "
+ "'hbase.it.clustermanager.zookeeper.home' is not configured properly."); + "'hbase.it.clustermanager.zookeeper.home' is not configured properly.");
} }
if (tmp != null) {
confDir = String.format("--config %s", tmp);
} else {
confDir = "";
}
} }
@Override @Override

View File

@ -244,14 +244,12 @@ public class IntegrationTestBackupRestore extends IntegrationTestBase {
private void runTestSingle(TableName table) throws IOException { private void runTestSingle(TableName table) throws IOException {
List<String> backupIds = new ArrayList<String>(); List<String> backupIds = new ArrayList<String>();
List<Integer> tableSizes = new ArrayList<Integer>();
try (Connection conn = util.getConnection(); Admin admin = conn.getAdmin(); try (Connection conn = util.getConnection(); Admin admin = conn.getAdmin();
BackupAdmin client = new BackupAdminImpl(conn);) { BackupAdmin client = new BackupAdminImpl(conn);) {
// #0- insert some data to table 'table' // #0- insert some data to table 'table'
loadData(table, rowsInIteration); loadData(table, rowsInIteration);
tableSizes.add(rowsInIteration);
// #1 - create full backup for table first // #1 - create full backup for table first
LOG.info("create full backup image for {}", table); LOG.info("create full backup image for {}", table);
@ -270,7 +268,6 @@ public class IntegrationTestBackupRestore extends IntegrationTestBase {
// Load data // Load data
loadData(table, rowsInIteration); loadData(table, rowsInIteration);
tableSizes.add(rowsInIteration * count);
// Do incremental backup // Do incremental backup
builder = new BackupRequest.Builder(); builder = new BackupRequest.Builder();
request = builder.withBackupType(BackupType.INCREMENTAL).withTableList(tables) request = builder.withBackupType(BackupType.INCREMENTAL).withTableList(tables)
@ -321,10 +318,7 @@ public class IntegrationTestBackupRestore extends IntegrationTestBase {
return arr; return arr;
} }
/** /** Returns status of backup */
* @param backupId pass backup ID to check status of
* @return status of backup
*/
protected boolean checkSucceeded(String backupId) throws IOException { protected boolean checkSucceeded(String backupId) throws IOException {
BackupInfo status = getBackupInfo(backupId); BackupInfo status = getBackupInfo(backupId);
if (status == null) { if (status == null) {
@ -428,9 +422,6 @@ public class IntegrationTestBackupRestore extends IntegrationTestBase {
.add(NUMBER_OF_TABLES_KEY, numTables).add(SLEEP_TIME_KEY, sleepTime).toString()); .add(NUMBER_OF_TABLES_KEY, numTables).add(SLEEP_TIME_KEY, sleepTime).toString());
} }
/**
* @param args argument list
*/
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create(); Configuration conf = HBaseConfiguration.create();
IntegrationTestingUtility.setUseDistributedCluster(conf); IntegrationTestingUtility.setUseDistributedCluster(conf);

View File

@ -36,6 +36,7 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets; import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
/** /**
@ -63,7 +64,6 @@ public class IntegrationTestIngest extends IntegrationTestBase {
// Log is being used in IntegrationTestIngestWithEncryption, hence it is protected // Log is being used in IntegrationTestIngestWithEncryption, hence it is protected
protected static final Logger LOG = LoggerFactory.getLogger(IntegrationTestIngest.class); protected static final Logger LOG = LoggerFactory.getLogger(IntegrationTestIngest.class);
protected IntegrationTestingUtility util;
protected HBaseClusterInterface cluster; protected HBaseClusterInterface cluster;
protected LoadTestTool loadTool; protected LoadTestTool loadTool;
@ -137,7 +137,7 @@ public class IntegrationTestIngest extends IntegrationTestBase {
families.add(Bytes.toString(family)); families.add(Bytes.toString(family));
} }
} else { } else {
for (String family : familiesString.split(",")) { for (String family : Splitter.on(',').split(familiesString)) {
families.add(family); families.add(family);
} }
} }
@ -168,8 +168,7 @@ public class IntegrationTestIngest extends IntegrationTestBase {
LOG.info("Intended run time: " + (runtime / 60000) + " min, left:" LOG.info("Intended run time: " + (runtime / 60000) + " min, left:"
+ ((runtime - (EnvironmentEdgeManager.currentTime() - start)) / 60000) + " min"); + ((runtime - (EnvironmentEdgeManager.currentTime() - start)) / 60000) + " min");
int ret = -1; int ret = loadTool.run(getArgsForLoadTestTool("-write",
ret = loadTool.run(getArgsForLoadTestTool("-write",
String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads), startKey, numKeys)); String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads), startKey, numKeys));
if (0 != ret) { if (0 != ret) {
String errorMsg = "Load failed with error code " + ret; String errorMsg = "Load failed with error code " + ret;

View File

@ -105,6 +105,7 @@ public class IntegrationTestIngestWithMOB extends IntegrationTestIngest {
} }
@Test @Test
@Override
public void testIngest() throws Exception { public void testIngest() throws Exception {
runIngestTest(JUNIT_RUN_TIME, 100, 10, 1024, 10, 20); runIngestTest(JUNIT_RUN_TIME, 100, 10, 1024, 10, 20);
} }

View File

@ -58,7 +58,9 @@ import org.slf4j.LoggerFactory;
public class IntegrationTestLazyCfLoading { public class IntegrationTestLazyCfLoading {
private static final TableName TABLE_NAME = private static final TableName TABLE_NAME =
TableName.valueOf(IntegrationTestLazyCfLoading.class.getSimpleName()); TableName.valueOf(IntegrationTestLazyCfLoading.class.getSimpleName());
@SuppressWarnings("InlineFormatString")
private static final String TIMEOUT_KEY = "hbase.%s.timeout"; private static final String TIMEOUT_KEY = "hbase.%s.timeout";
@SuppressWarnings("InlineFormatString")
private static final String ENCODING_KEY = "hbase.%s.datablock.encoding"; private static final String ENCODING_KEY = "hbase.%s.datablock.encoding";
/** A soft test timeout; duration of the test, as such, depends on number of keys to put. */ /** A soft test timeout; duration of the test, as such, depends on number of keys to put. */

View File

@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -226,7 +227,7 @@ public class IntegrationTestMobCompaction extends IntegrationTestBase {
} }
class MajorCompaction implements Runnable { static class MajorCompaction implements Runnable {
@Override @Override
public void run() { public void run() {
@ -242,7 +243,7 @@ public class IntegrationTestMobCompaction extends IntegrationTestBase {
} }
} }
class CleanMobAndArchive implements Runnable { static class CleanMobAndArchive implements Runnable {
@Override @Override
public void run() { public void run() {
@ -257,7 +258,7 @@ public class IntegrationTestMobCompaction extends IntegrationTestBase {
Thread.sleep(130000); Thread.sleep(130000);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); LOG.warn("Exception in CleanMobAndArchive", e);
} }
} }
} }
@ -288,7 +289,8 @@ public class IntegrationTestMobCompaction extends IntegrationTestBase {
try { try {
Thread.sleep(500); Thread.sleep(500);
} catch (InterruptedException ee) { } catch (InterruptedException ee) {
// Restore interrupt status
Thread.currentThread().interrupt();
} }
} }
if (i % 100000 == 0) { if (i % 100000 == 0) {
@ -323,7 +325,7 @@ public class IntegrationTestMobCompaction extends IntegrationTestBase {
Thread.sleep(1000); Thread.sleep(1000);
} }
getNumberOfMobFiles(conf, table.getName(), new String(fam)); getNumberOfMobFiles(conf, table.getName(), new String(fam, StandardCharsets.UTF_8));
LOG.info("Waiting for write thread to finish ..."); LOG.info("Waiting for write thread to finish ...");
writeData.join(); writeData.join();
// Cleanup again // Cleanup again

View File

@ -23,9 +23,9 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import com.codahale.metrics.Histogram; import com.codahale.metrics.Histogram;
import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.Set; import java.util.Set;
@ -104,7 +104,7 @@ public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase {
* Wraps the invocation of {@link PerformanceEvaluation} in a {@code Callable}. * Wraps the invocation of {@link PerformanceEvaluation} in a {@code Callable}.
*/ */
static class PerfEvalCallable implements Callable<TimingResult> { static class PerfEvalCallable implements Callable<TimingResult> {
private final Queue<String> argv = new LinkedList<>(); private final Queue<String> argv = new ArrayDeque<>();
private final Admin admin; private final Admin admin;
public PerfEvalCallable(Admin admin, String argv) { public PerfEvalCallable(Admin admin, String argv) {

View File

@ -175,7 +175,6 @@ public class IntegrationTestRegionReplicaReplication extends IntegrationTestInge
int verifyPercent = 100; int verifyPercent = 100;
int updatePercent = 20; int updatePercent = 20;
int ret = -1;
int regionReplicaId = int regionReplicaId =
conf.getInt(String.format("%s.%s", TEST_NAME, LoadTestTool.OPT_REGION_REPLICA_ID), 1); conf.getInt(String.format("%s.%s", TEST_NAME, LoadTestTool.OPT_REGION_REPLICA_ID), 1);
@ -191,7 +190,7 @@ public class IntegrationTestRegionReplicaReplication extends IntegrationTestInge
args.add("-" + LoadTestTool.OPT_REGION_REPLICA_ID); args.add("-" + LoadTestTool.OPT_REGION_REPLICA_ID);
args.add(String.valueOf(regionReplicaId)); args.add(String.valueOf(regionReplicaId));
ret = loadTool.run(args.toArray(new String[args.size()])); int ret = loadTool.run(args.toArray(new String[args.size()]));
if (0 != ret) { if (0 != ret) {
String errorMsg = "Load failed with error code " + ret; String errorMsg = "Load failed with error code " + ret;
LOG.error(errorMsg); LOG.error(errorMsg);

View File

@ -116,14 +116,13 @@ public class IntegrationTestingUtility extends HBaseTestingUtil {
} }
/** /**
* @return whether we are interacting with a distributed cluster as opposed to and in-process mini * Returns whether we are interacting with a distributed cluster as opposed to and in-process mini
* cluster or a local cluster. * cluster or a local cluster.
* @see IntegrationTestingUtility#setUseDistributedCluster(Configuration) * @see IntegrationTestingUtility#setUseDistributedCluster(Configuration)
*/ */
public boolean isDistributedCluster() { public boolean isDistributedCluster() {
Configuration conf = getConfiguration(); Configuration conf = getConfiguration();
boolean isDistributedCluster = false; boolean isDistributedCluster =
isDistributedCluster =
Boolean.parseBoolean(System.getProperty(IS_DISTRIBUTED_CLUSTER, "false")); Boolean.parseBoolean(System.getProperty(IS_DISTRIBUTED_CLUSTER, "false"));
if (!isDistributedCluster) { if (!isDistributedCluster) {
isDistributedCluster = conf.getBoolean(IS_DISTRIBUTED_CLUSTER, false); isDistributedCluster = conf.getBoolean(IS_DISTRIBUTED_CLUSTER, false);

View File

@ -47,7 +47,7 @@ public class IntegrationTestsDriver extends AbstractHBaseTool {
System.exit(ret); System.exit(ret);
} }
private class IntegrationTestFilter extends ClassTestFinder.TestClassFilter { private static class IntegrationTestFilter extends ClassTestFinder.TestClassFilter {
private Pattern testFilterRe = Pattern.compile(".*\\.IntegrationTest.*"); private Pattern testFilterRe = Pattern.compile(".*\\.IntegrationTest.*");
public IntegrationTestFilter() { public IntegrationTestFilter() {

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase; package org.apache.hadoop.hbase;
import java.io.IOException; import java.io.IOException;
import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Set; import java.util.Set;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -43,6 +44,8 @@ import org.junit.Assert;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
/** /**
@ -108,10 +111,12 @@ public class StripeCompactionsPerformanceEvaluation extends AbstractHBaseTool {
int minValueSize = 0, maxValueSize = 0; int minValueSize = 0, maxValueSize = 0;
String valueSize = cmd.getOptionValue(VALUE_SIZE_KEY, VALUE_SIZE_DEFAULT); String valueSize = cmd.getOptionValue(VALUE_SIZE_KEY, VALUE_SIZE_DEFAULT);
if (valueSize.contains(":")) { if (valueSize.contains(":")) {
String[] valueSizes = valueSize.split(":"); List<String> valueSizes = Splitter.on(':').splitToList(valueSize);
if (valueSize.length() != 2) throw new RuntimeException("Invalid value size: " + valueSize); if (valueSizes.size() != 2) {
minValueSize = Integer.parseInt(valueSizes[0]); throw new RuntimeException("Invalid value size: " + valueSize);
maxValueSize = Integer.parseInt(valueSizes[1]); }
minValueSize = Integer.parseInt(Iterables.get(valueSizes, 0));
maxValueSize = Integer.parseInt(Iterables.get(valueSizes, 1));
} else { } else {
minValueSize = maxValueSize = Integer.parseInt(valueSize); minValueSize = maxValueSize = Integer.parseInt(valueSize);
} }

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
@ -106,7 +107,8 @@ public class TestShellExecEndpointCoprocessor {
assertFalse("the response from a background task should have no stderr", resp.hasStderr()); assertFalse("the response from a background task should have no stderr", resp.hasStderr());
Waiter.waitFor(conn.getConfiguration(), 5_000, () -> testFile.length() > 0); Waiter.waitFor(conn.getConfiguration(), 5_000, () -> testFile.length() > 0);
final String content = new String(Files.readAllBytes(testFile.toPath())).trim(); final String content =
new String(Files.readAllBytes(testFile.toPath()), StandardCharsets.UTF_8).trim();
assertEquals("hello world", content); assertEquals("hello world", content);
} }

View File

@ -21,7 +21,6 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
@ -277,13 +276,13 @@ public abstract class Action {
protected void unbalanceRegions(ClusterMetrics clusterStatus, List<ServerName> fromServers, protected void unbalanceRegions(ClusterMetrics clusterStatus, List<ServerName> fromServers,
List<ServerName> toServers, double fractionOfRegions) throws Exception { List<ServerName> toServers, double fractionOfRegions) throws Exception {
List<byte[]> victimRegions = new LinkedList<>(); List<byte[]> victimRegions = new ArrayList<>();
for (Map.Entry<ServerName, ServerMetrics> entry : clusterStatus.getLiveServerMetrics() for (Map.Entry<ServerName, ServerMetrics> entry : clusterStatus.getLiveServerMetrics()
.entrySet()) { .entrySet()) {
ServerName sn = entry.getKey(); ServerName sn = entry.getKey();
ServerMetrics serverLoad = entry.getValue(); ServerMetrics serverLoad = entry.getValue();
// Ugh. // Ugh.
List<byte[]> regions = new LinkedList<>(serverLoad.getRegionMetrics().keySet()); List<byte[]> regions = new ArrayList<>(serverLoad.getRegionMetrics().keySet());
int victimRegionCount = (int) Math.ceil(fractionOfRegions * regions.size()); int victimRegionCount = (int) Math.ceil(fractionOfRegions * regions.size());
getLogger().debug("Removing {} regions from {}", victimRegionCount, sn); getLogger().debug("Removing {} regions from {}", victimRegionCount, sn);
Random rand = ThreadLocalRandom.current(); Random rand = ThreadLocalRandom.current();

View File

@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
*/ */
public class AddCPULoadAction extends SudoCommandAction { public class AddCPULoadAction extends SudoCommandAction {
private static final Logger LOG = LoggerFactory.getLogger(AddCPULoadAction.class); private static final Logger LOG = LoggerFactory.getLogger(AddCPULoadAction.class);
@SuppressWarnings("InlineFormatString")
private static final String CPU_LOAD_COMMAND = private static final String CPU_LOAD_COMMAND =
"seq 1 %s | xargs -I{} -n 1 -P %s timeout %s dd if=/dev/urandom of=/dev/null bs=1M " "seq 1 %s | xargs -I{} -n 1 -P %s timeout %s dd if=/dev/urandom of=/dev/null bs=1M "
+ "iflag=fullblock"; + "iflag=fullblock";
@ -51,6 +52,7 @@ public class AddCPULoadAction extends SudoCommandAction {
return LOG; return LOG;
} }
@Override
protected void localPerform() throws IOException { protected void localPerform() throws IOException {
getLogger().info("Starting to execute AddCPULoadAction"); getLogger().info("Starting to execute AddCPULoadAction");
ServerName server = PolicyBasedChaosMonkey.selectRandomItem(getCurrentServers()); ServerName server = PolicyBasedChaosMonkey.selectRandomItem(getCurrentServers());

View File

@ -28,16 +28,10 @@ import org.slf4j.LoggerFactory;
* Action that tries to adjust the bloom filter setting on all the columns of a table * Action that tries to adjust the bloom filter setting on all the columns of a table
*/ */
public class ChangeBloomFilterAction extends Action { public class ChangeBloomFilterAction extends Action {
private final long sleepTime;
private final TableName tableName;
private static final Logger LOG = LoggerFactory.getLogger(ChangeBloomFilterAction.class); private static final Logger LOG = LoggerFactory.getLogger(ChangeBloomFilterAction.class);
private final TableName tableName;
public ChangeBloomFilterAction(TableName tableName) { public ChangeBloomFilterAction(TableName tableName) {
this(-1, tableName);
}
public ChangeBloomFilterAction(int sleepTime, TableName tableName) {
this.sleepTime = sleepTime;
this.tableName = tableName; this.tableName = tableName;
} }

View File

@ -50,6 +50,7 @@ public class CorruptPacketsCommandAction extends TCCommandAction {
return LOG; return LOG;
} }
@Override
protected void localPerform() throws IOException { protected void localPerform() throws IOException {
getLogger().info("Starting to execute CorruptPacketsCommandAction"); getLogger().info("Starting to execute CorruptPacketsCommandAction");
ServerName server = PolicyBasedChaosMonkey.selectRandomItem(getCurrentServers()); ServerName server = PolicyBasedChaosMonkey.selectRandomItem(getCurrentServers());

View File

@ -50,6 +50,7 @@ public class DelayPacketsCommandAction extends TCCommandAction {
return LOG; return LOG;
} }
@Override
protected void localPerform() throws IOException { protected void localPerform() throws IOException {
getLogger().info("Starting to execute DelayPacketsCommandAction"); getLogger().info("Starting to execute DelayPacketsCommandAction");
ServerName server = PolicyBasedChaosMonkey.selectRandomItem(getCurrentServers()); ServerName server = PolicyBasedChaosMonkey.selectRandomItem(getCurrentServers());

View File

@ -50,6 +50,7 @@ public class DuplicatePacketsCommandAction extends TCCommandAction {
return LOG; return LOG;
} }
@Override
protected void localPerform() throws IOException { protected void localPerform() throws IOException {
getLogger().info("Starting to execute DuplicatePacketsCommandAction"); getLogger().info("Starting to execute DuplicatePacketsCommandAction");
ServerName server = PolicyBasedChaosMonkey.selectRandomItem(getCurrentServers()); ServerName server = PolicyBasedChaosMonkey.selectRandomItem(getCurrentServers());

View File

@ -54,6 +54,7 @@ public class FillDiskCommandAction extends SudoCommandAction {
return LOG; return LOG;
} }
@Override
protected void localPerform() throws IOException { protected void localPerform() throws IOException {
getLogger().info("Starting to execute FillDiskCommandAction"); getLogger().info("Starting to execute FillDiskCommandAction");
ServerName server = PolicyBasedChaosMonkey.selectRandomItem(getCurrentServers()); ServerName server = PolicyBasedChaosMonkey.selectRandomItem(getCurrentServers());

View File

@ -50,6 +50,7 @@ public class LosePacketsCommandAction extends TCCommandAction {
return LOG; return LOG;
} }
@Override
protected void localPerform() throws IOException { protected void localPerform() throws IOException {
getLogger().info("Starting to execute LosePacketsCommandAction"); getLogger().info("Starting to execute LosePacketsCommandAction");
ServerName server = PolicyBasedChaosMonkey.selectRandomItem(getCurrentServers()); ServerName server = PolicyBasedChaosMonkey.selectRandomItem(getCurrentServers());

View File

@ -54,6 +54,7 @@ public class ReorderPacketsCommandAction extends TCCommandAction {
return LOG; return LOG;
} }
@Override
protected void localPerform() throws IOException { protected void localPerform() throws IOException {
getLogger().info("Starting to execute ReorderPacketsCommandAction"); getLogger().info("Starting to execute ReorderPacketsCommandAction");
ServerName server = PolicyBasedChaosMonkey.selectRandomItem(getCurrentServers()); ServerName server = PolicyBasedChaosMonkey.selectRandomItem(getCurrentServers());

View File

@ -18,7 +18,7 @@
package org.apache.hadoop.hbase.chaos.actions; package org.apache.hadoop.hbase.chaos.actions;
import java.io.IOException; import java.io.IOException;
import java.util.LinkedList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey; import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
@ -56,7 +56,7 @@ public class RestartRandomDataNodeAction extends RestartActionBaseAction {
DistributedFileSystem fs = DistributedFileSystem fs =
(DistributedFileSystem) CommonFSUtils.getRootDir(getConf()).getFileSystem(getConf()); (DistributedFileSystem) CommonFSUtils.getRootDir(getConf()).getFileSystem(getConf());
DFSClient dfsClient = fs.getClient(); DFSClient dfsClient = fs.getClient();
List<ServerName> hosts = new LinkedList<>(); List<ServerName> hosts = new ArrayList<>();
for (DatanodeInfo dataNode : dfsClient.datanodeReport(HdfsConstants.DatanodeReportType.LIVE)) { for (DatanodeInfo dataNode : dfsClient.datanodeReport(HdfsConstants.DatanodeReportType.LIVE)) {
hosts.add(ServerName.valueOf(dataNode.getHostName(), -1, -1)); hosts.add(ServerName.valueOf(dataNode.getHostName(), -1, -1));
} }

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.chaos.actions; package org.apache.hadoop.hbase.chaos.actions;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.LinkedList; import java.util.LinkedList;
@ -66,11 +67,13 @@ public class RollingBatchRestartRsAction extends BatchRestartRsAction {
} }
@Override @Override
// deadServers is both list and queue here, a valid use case for LinkedList
@SuppressWarnings("JdkObsolete")
public void perform() throws Exception { public void perform() throws Exception {
getLogger().info("Performing action: Rolling batch restarting {}% of region servers", getLogger().info("Performing action: Rolling batch restarting {}% of region servers",
(int) (ratio * 100)); (int) (ratio * 100));
List<ServerName> selectedServers = selectServers(); List<ServerName> selectedServers = selectServers();
Queue<ServerName> serversToBeKilled = new LinkedList<>(selectedServers); Queue<ServerName> serversToBeKilled = new ArrayDeque<>(selectedServers);
LinkedList<ServerName> deadServers = new LinkedList<>(); LinkedList<ServerName> deadServers = new LinkedList<>();
Random rand = ThreadLocalRandom.current(); Random rand = ThreadLocalRandom.current();
// loop while there are servers to be killed or dead servers to be restarted // loop while there are servers to be killed or dead servers to be restarted

View File

@ -18,7 +18,7 @@
package org.apache.hadoop.hbase.chaos.actions; package org.apache.hadoop.hbase.chaos.actions;
import java.io.IOException; import java.io.IOException;
import java.util.LinkedList; import java.util.ArrayDeque;
import java.util.List; import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.Random; import java.util.Random;
@ -68,8 +68,8 @@ public class RollingBatchSuspendResumeRsAction extends Action {
getLogger().info("Performing action: Rolling batch restarting {}% of region servers", getLogger().info("Performing action: Rolling batch restarting {}% of region servers",
(int) (ratio * 100)); (int) (ratio * 100));
List<ServerName> selectedServers = selectServers(); List<ServerName> selectedServers = selectServers();
Queue<ServerName> serversToBeSuspended = new LinkedList<>(selectedServers); Queue<ServerName> serversToBeSuspended = new ArrayDeque<>(selectedServers);
Queue<ServerName> suspendedServers = new LinkedList<>(); Queue<ServerName> suspendedServers = new ArrayDeque<>();
Random rand = ThreadLocalRandom.current(); Random rand = ThreadLocalRandom.current();
// loop while there are servers to be suspended or suspended servers to be resumed // loop while there are servers to be suspended or suspended servers to be resumed
while ( while (

View File

@ -38,6 +38,7 @@ public class SplitAllRegionOfTableAction extends Action {
this.tableName = tableName; this.tableName = tableName;
} }
@Override
public void init(ActionContext context) throws IOException { public void init(ActionContext context) throws IOException {
super.init(context); super.init(context);
this.maxFullTableSplits = getConf().getInt(MAX_SPLIT_KEY, DEFAULT_MAX_SPLITS); this.maxFullTableSplits = getConf().getInt(MAX_SPLIT_KEY, DEFAULT_MAX_SPLITS);

View File

@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.chaos.actions;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
@ -66,7 +65,7 @@ public class UnbalanceKillAndRebalanceAction extends Action {
@Override @Override
public void perform() throws Exception { public void perform() throws Exception {
ClusterMetrics status = this.cluster.getClusterMetrics(); ClusterMetrics status = this.cluster.getClusterMetrics();
List<ServerName> victimServers = new LinkedList<>(status.getLiveServerMetrics().keySet()); List<ServerName> victimServers = new ArrayList<>(status.getLiveServerMetrics().keySet());
Set<ServerName> killedServers = new HashSet<>(); Set<ServerName> killedServers = new HashSet<>();
int liveCount = (int) Math.ceil(FRC_SERVERS_THAT_HOARD_AND_LIVE * victimServers.size()); int liveCount = (int) Math.ceil(FRC_SERVERS_THAT_HOARD_AND_LIVE * victimServers.size());
int deadCount = (int) Math.ceil(FRC_SERVERS_THAT_HOARD_AND_DIE * victimServers.size()); int deadCount = (int) Math.ceil(FRC_SERVERS_THAT_HOARD_AND_DIE * victimServers.size());

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.chaos.actions; package org.apache.hadoop.hbase.chaos.actions;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
@ -55,7 +54,7 @@ public class UnbalanceRegionsAction extends Action {
public void perform() throws Exception { public void perform() throws Exception {
getLogger().info("Unbalancing regions"); getLogger().info("Unbalancing regions");
ClusterMetrics status = this.cluster.getClusterMetrics(); ClusterMetrics status = this.cluster.getClusterMetrics();
List<ServerName> victimServers = new LinkedList<>(status.getLiveServerMetrics().keySet()); List<ServerName> victimServers = new ArrayList<>(status.getLiveServerMetrics().keySet());
int targetServerCount = (int) Math.ceil(fractionOfServers * victimServers.size()); int targetServerCount = (int) Math.ceil(fractionOfServers * victimServers.size());
List<ServerName> targetServers = new ArrayList<>(targetServerCount); List<ServerName> targetServers = new ArrayList<>(targetServerCount);
Random rand = ThreadLocalRandom.current(); Random rand = ThreadLocalRandom.current();

View File

@ -18,12 +18,16 @@
package org.apache.hadoop.hbase.chaos.factories; package org.apache.hadoop.hbase.chaos.factories;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.util.List;
import java.util.function.Function; import java.util.function.Function;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.chaos.actions.Action; import org.apache.hadoop.hbase.chaos.actions.Action;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
public class ConfigurableSlowDeterministicMonkeyFactory extends SlowDeterministicMonkeyFactory { public class ConfigurableSlowDeterministicMonkeyFactory extends SlowDeterministicMonkeyFactory {
private static final Logger LOG = private static final Logger LOG =
@ -32,6 +36,7 @@ public class ConfigurableSlowDeterministicMonkeyFactory extends SlowDeterministi
final static String HEAVY_ACTIONS = "heavy.actions"; final static String HEAVY_ACTIONS = "heavy.actions";
final static String TABLE_PARAM = "\\$table_name"; final static String TABLE_PARAM = "\\$table_name";
@SuppressWarnings("ImmutableEnumChecker")
public enum SupportedTypes { public enum SupportedTypes {
FLOAT(p -> Float.parseFloat(p)), FLOAT(p -> Float.parseFloat(p)),
LONG(p -> Long.parseLong(p)), LONG(p -> Long.parseLong(p)),
@ -56,12 +61,13 @@ public class ConfigurableSlowDeterministicMonkeyFactory extends SlowDeterministi
return super.getHeavyWeightedActions(); return super.getHeavyWeightedActions();
} else { } else {
try { try {
String[] actionClasses = actions.split(";"); List<String> actionClasses = Splitter.on(';').splitToList(actions);
Action[] heavyActions = new Action[actionClasses.length]; Action[] heavyActions = new Action[actionClasses.size()];
for (int i = 0; i < actionClasses.length; i++) { int i = 0;
heavyActions[i] = instantiateAction(actionClasses[i]); for (String action : actionClasses) {
heavyActions[i++] = instantiateAction(action);
} }
LOG.info("Created actions {}", heavyActions); LOG.info("Created actions {}", (Object[]) heavyActions); // non-varargs call to LOG#info
return heavyActions; return heavyActions;
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error trying to instantiate heavy actions. Returning null array.", e); LOG.error("Error trying to instantiate heavy actions. Returning null array.", e);
@ -72,10 +78,13 @@ public class ConfigurableSlowDeterministicMonkeyFactory extends SlowDeterministi
private Action instantiateAction(String actionString) throws Exception { private Action instantiateAction(String actionString) throws Exception {
final String packageName = "org.apache.hadoop.hbase.chaos.actions"; final String packageName = "org.apache.hadoop.hbase.chaos.actions";
String[] classAndParams = actionString.split("\\)")[0].split("\\("); Iterable<String> classAndParams =
String className = packageName + "." + classAndParams[0]; Splitter.on('(').split(Iterables.get(Splitter.on(')').split(actionString), 0));
String[] params = String className = packageName + "." + Iterables.get(classAndParams, 0);
classAndParams[1].replaceAll(TABLE_PARAM, tableName.getNameAsString()).split(","); String[] params = Splitter.on(',')
.splitToStream(
Iterables.get(classAndParams, 1).replaceAll(TABLE_PARAM, tableName.getNameAsString()))
.toArray(String[]::new);
LOG.info("About to instantiate action class: {}; With constructor params: {}", className, LOG.info("About to instantiate action class: {}; With constructor params: {}", className,
params); params);
Class<? extends Action> actionClass = (Class<? extends Action>) Class.forName(className); Class<? extends Action> actionClass = (Class<? extends Action>) Class.forName(className);

View File

@ -167,7 +167,7 @@ public class ChaosMonkeyRunner extends AbstractHBaseTool {
return Sets.newHashSet(familyName); return Sets.newHashSet(familyName);
} }
/* /**
* If caller wants to add config parameters from a file, the path to the conf file can be passed * If caller wants to add config parameters from a file, the path to the conf file can be passed
* like this: -c <path-to-conf>. The file is presumed to have the Configuration file xml format * like this: -c <path-to-conf>. The file is presumed to have the Configuration file xml format
* and is added as a new Resource to the current Configuration. Use this mechanism to set * and is added as a new Resource to the current Configuration. Use this mechanism to set

View File

@ -124,8 +124,8 @@ public class IntegrationTestRpcClient {
} }
void stopRandomServer() throws Exception { void stopRandomServer() throws Exception {
lock.writeLock().lock();
RpcServer rpcServer = null; RpcServer rpcServer = null;
lock.writeLock().lock();
try { try {
if (rpcServers.size() <= minServers) { if (rpcServers.size() <= minServers) {
return; return;
@ -243,6 +243,7 @@ public class IntegrationTestRpcClient {
} }
@Override @Override
@SuppressWarnings("AssertionFailureIgnored") // intended
public void run() { public void run() {
while (running.get()) { while (running.get()) {
boolean isBigPayload = ThreadLocalRandom.current().nextBoolean(); boolean isBigPayload = ThreadLocalRandom.current().nextBoolean();

View File

@ -365,7 +365,7 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
taskId = taskId + iteration * numMapTasks; taskId = taskId + iteration * numMapTasks;
numMapTasks = numMapTasks * numIterations; numMapTasks = numMapTasks * numIterations;
long chainId = Math.abs(ThreadLocalRandom.current().nextLong()); long chainId = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
chainId = chainId - (chainId % numMapTasks) + taskId; // ensure that chainId is unique per chainId = chainId - (chainId % numMapTasks) + taskId; // ensure that chainId is unique per
// task and across iterations // task and across iterations
LongWritable[] keys = new LongWritable[] { new LongWritable(chainId) }; LongWritable[] keys = new LongWritable[] { new LongWritable(chainId) };

View File

@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.IntegrationTestingUtility; import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
@ -60,6 +59,8 @@ import org.junit.rules.TestName;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
/** /**
* Validate ImportTsv + BulkLoadFiles on a distributed cluster. * Validate ImportTsv + BulkLoadFiles on a distributed cluster.
*/ */
@ -83,13 +84,13 @@ public class IntegrationTestImportTsv extends Configured implements Tool {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
{ {
byte[] family = Bytes.toBytes("d"); byte[] family = Bytes.toBytes("d");
for (String line : simple_tsv.split("\n")) { for (String line : Splitter.on('\n').split(simple_tsv)) {
String[] row = line.split("\t"); String[] row = line.split("\t");
byte[] key = Bytes.toBytes(row[0]); byte[] key = Bytes.toBytes(row[0]);
long ts = Long.parseLong(row[1]); long ts = Long.parseLong(row[1]);
byte[][] fields = { Bytes.toBytes(row[2]), Bytes.toBytes(row[3]) }; byte[][] fields = { Bytes.toBytes(row[2]), Bytes.toBytes(row[3]) };
add(new KeyValue(key, family, fields[0], ts, Type.Put, fields[0])); add(new KeyValue(key, family, fields[0], ts, KeyValue.Type.Put, fields[0]));
add(new KeyValue(key, family, fields[1], ts, Type.Put, fields[1])); add(new KeyValue(key, family, fields[1], ts, KeyValue.Type.Put, fields[1]));
} }
} }
}; };
@ -98,10 +99,12 @@ public class IntegrationTestImportTsv extends Configured implements Tool {
// JUnit/Maven or by main when run from the CLI. // JUnit/Maven or by main when run from the CLI.
protected static IntegrationTestingUtility util = null; protected static IntegrationTestingUtility util = null;
@Override
public Configuration getConf() { public Configuration getConf() {
return util.getConfiguration(); return util.getConfiguration();
} }
@Override
public void setConf(Configuration conf) { public void setConf(Configuration conf) {
LOG.debug("Ignoring setConf call."); LOG.debug("Ignoring setConf call.");
} }
@ -207,6 +210,7 @@ public class IntegrationTestImportTsv extends Configured implements Tool {
LOG.info("testGenerateAndLoad completed successfully."); LOG.info("testGenerateAndLoad completed successfully.");
} }
@Override
public int run(String[] args) throws Exception { public int run(String[] args) throws Exception {
if (args.length != 0) { if (args.length != 0) {
System.err.println(format("%s [genericOptions]", NAME)); System.err.println(format("%s [genericOptions]", NAME));

View File

@ -88,6 +88,7 @@ public class IntegrationTestTableMapReduceUtil implements Configurable, Tool {
return 0; return 0;
} }
@Override
public void setConf(Configuration conf) { public void setConf(Configuration conf) {
if (util != null) { if (util != null) {
throw new IllegalArgumentException( throw new IllegalArgumentException(

View File

@ -41,11 +41,11 @@ import org.slf4j.LoggerFactory;
* f2:(null) to be the the same as the row value. * f2:(null) to be the the same as the row value.
* *
* <pre> * <pre>
* aaa, f1: => aaa * aaa, f1: =&gt; aaa
* aaa, f2: => aaa * aaa, f2: =&gt; aaa
* aab, f1: => aab * aab, f1: =&gt; aab
* .... * ....
* zzz, f2: => zzz * zzz, f2: =&gt; zzz
* </pre> * </pre>
* *
* Then the test creates a snapshot from this table, and overrides the values in the original table * Then the test creates a snapshot from this table, and overrides the values in the original table
@ -90,8 +90,6 @@ public class IntegrationTestTableSnapshotInputFormat extends IntegrationTestBase
private static final byte[] MAPRED_START_ROW = Bytes.toBytes("aaa"); private static final byte[] MAPRED_START_ROW = Bytes.toBytes("aaa");
private static final byte[] MAPRED_END_ROW = Bytes.toBytes("zz{"); // 'z' + 1 => '{' private static final byte[] MAPRED_END_ROW = Bytes.toBytes("zz{"); // 'z' + 1 => '{'
private IntegrationTestingUtility util;
@Override @Override
public void setConf(Configuration conf) { public void setConf(Configuration conf) {
super.setConf(conf); super.setConf(conf);

View File

@ -115,6 +115,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets; import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser; import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter; import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
@ -289,7 +290,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
*/ */
public static final String BIG_FAMILY_VALUE_SIZE_KEY = "generator.big.family.value.size"; public static final String BIG_FAMILY_VALUE_SIZE_KEY = "generator.big.family.value.size";
public static enum Counts { public static enum GeneratorCounts {
SUCCESS, SUCCESS,
TERMINATING, TERMINATING,
UNDEFINED, UNDEFINED,
@ -448,6 +449,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
volatile boolean walkersStop; volatile boolean walkersStop;
int numWalkers; int numWalkers;
final Object flushedLoopsLock = new Object();
volatile List<Long> flushedLoops = new ArrayList<>(); volatile List<Long> flushedLoops = new ArrayList<>();
List<Thread> walkers = new ArrayList<>(); List<Thread> walkers = new ArrayList<>();
@ -550,9 +552,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
} }
private void addFlushed(byte[] rowKey) { private void addFlushed(byte[] rowKey) {
synchronized (flushedLoops) { synchronized (flushedLoopsLock) {
flushedLoops.add(Bytes.toLong(rowKey)); flushedLoops.add(Bytes.toLong(rowKey));
flushedLoops.notifyAll(); flushedLoopsLock.notifyAll();
} }
} }
@ -597,16 +599,12 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
} }
private void joinWalkers() { private void joinWalkers() {
walkersStop = true; synchronized (flushedLoopsLock) {
synchronized (flushedLoops) { walkersStop = true;
flushedLoops.notifyAll(); flushedLoopsLock.notifyAll();
} }
for (Thread walker : walkers) { for (Thread walker : walkers) {
try { Uninterruptibles.joinUninterruptibly(walker);
walker.join();
} catch (InterruptedException e) {
// no-op
}
} }
} }
@ -635,7 +633,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
try { try {
walkLoop(node); walkLoop(node);
} catch (IOException e) { } catch (IOException e) {
context.getCounter(Counts.IOEXCEPTION).increment(1l); context.getCounter(GeneratorCounts.IOEXCEPTION).increment(1l);
return; return;
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -651,9 +649,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
} }
private long selectLoop() throws InterruptedException { private long selectLoop() throws InterruptedException {
synchronized (flushedLoops) { synchronized (flushedLoopsLock) {
while (flushedLoops.isEmpty() && !walkersStop) { while (flushedLoops.isEmpty() && !walkersStop) {
flushedLoops.wait(); flushedLoopsLock.wait();
} }
if (walkersStop) { if (walkersStop) {
throw new InterruptedException(); throw new InterruptedException();
@ -691,19 +689,17 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
while (numQueries < maxQueries) { while (numQueries < maxQueries) {
numQueries++; numQueries++;
byte[] prev = node.prev; byte[] prev = node.prev;
long t1 = EnvironmentEdgeManager.currentTime();
node = getNode(prev, table, node); node = getNode(prev, table, node);
long t2 = EnvironmentEdgeManager.currentTime();
if (node == null) { if (node == null) {
LOG.error("ConcurrentWalker found UNDEFINED NODE: " + Bytes.toStringBinary(prev)); LOG.error("ConcurrentWalker found UNDEFINED NODE: " + Bytes.toStringBinary(prev));
context.getCounter(Counts.UNDEFINED).increment(1l); context.getCounter(GeneratorCounts.UNDEFINED).increment(1l);
} else if (node.prev.length == NO_KEY.length) { } else if (node.prev.length == NO_KEY.length) {
LOG.error( LOG.error(
"ConcurrentWalker found TERMINATING NODE: " + Bytes.toStringBinary(node.key)); "ConcurrentWalker found TERMINATING NODE: " + Bytes.toStringBinary(node.key));
context.getCounter(Counts.TERMINATING).increment(1l); context.getCounter(GeneratorCounts.TERMINATING).increment(1l);
} else { } else {
// Increment for successful walk // Increment for successful walk
context.getCounter(Counts.SUCCESS).increment(1l); context.getCounter(GeneratorCounts.SUCCESS).increment(1l);
} }
} }
table.close(); table.close();
@ -875,14 +871,17 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
} }
if ( if (
counters.findCounter(Counts.TERMINATING).getValue() > 0 counters.findCounter(GeneratorCounts.TERMINATING).getValue() > 0
|| counters.findCounter(Counts.UNDEFINED).getValue() > 0 || counters.findCounter(GeneratorCounts.UNDEFINED).getValue() > 0
|| counters.findCounter(Counts.IOEXCEPTION).getValue() > 0 || counters.findCounter(GeneratorCounts.IOEXCEPTION).getValue() > 0
) { ) {
LOG.error("Concurrent walker failed to verify during Generation phase"); LOG.error("Concurrent walker failed to verify during Generation phase");
LOG.error("TERMINATING nodes: " + counters.findCounter(Counts.TERMINATING).getValue()); LOG.error(
LOG.error("UNDEFINED nodes: " + counters.findCounter(Counts.UNDEFINED).getValue()); "TERMINATING nodes: " + counters.findCounter(GeneratorCounts.TERMINATING).getValue());
LOG.error("IOEXCEPTION nodes: " + counters.findCounter(Counts.IOEXCEPTION).getValue()); LOG.error(
"UNDEFINED nodes: " + counters.findCounter(GeneratorCounts.UNDEFINED).getValue());
LOG.error(
"IOEXCEPTION nodes: " + counters.findCounter(GeneratorCounts.IOEXCEPTION).getValue());
return false; return false;
} }
} catch (IOException e) { } catch (IOException e) {
@ -1033,16 +1032,17 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
LocatedFileStatus keyFileStatus = iterator.next(); LocatedFileStatus keyFileStatus = iterator.next();
// Skip "_SUCCESS" file. // Skip "_SUCCESS" file.
if (keyFileStatus.getPath().getName().startsWith("_")) continue; if (keyFileStatus.getPath().getName().startsWith("_")) continue;
result.addAll(readFileToSearch(conf, fs, keyFileStatus)); result.addAll(readFileToSearch(conf, keyFileStatus));
} }
} }
return result; return result;
} }
private static SortedSet<byte[]> readFileToSearch(final Configuration conf, final FileSystem fs, private static SortedSet<byte[]> readFileToSearch(final Configuration conf,
final LocatedFileStatus keyFileStatus) throws IOException, InterruptedException { final LocatedFileStatus keyFileStatus) throws IOException, InterruptedException {
SortedSet<byte[]> result = new TreeSet<>(Bytes.BYTES_COMPARATOR); SortedSet<byte[]> result = new TreeSet<>(Bytes.BYTES_COMPARATOR);
// Return entries that are flagged Counts.UNDEFINED in the value. Return the row. This is // Return entries that are flagged VerifyCounts.UNDEFINED in the value. Return the row. This
// is
// what is missing. // what is missing.
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID()); TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
try (SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader rr = try (SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader rr =
@ -1053,7 +1053,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
while (rr.nextKeyValue()) { while (rr.nextKeyValue()) {
rr.getCurrentKey(); rr.getCurrentKey();
BytesWritable bw = rr.getCurrentValue(); BytesWritable bw = rr.getCurrentValue();
if (Verify.VerifyReducer.whichType(bw.getBytes()) == Verify.Counts.UNDEFINED) { if (Verify.VerifyReducer.whichType(bw.getBytes()) == Verify.VerifyCounts.UNDEFINED) {
byte[] key = new byte[rr.getCurrentKey().getLength()]; byte[] key = new byte[rr.getCurrentKey().getLength()];
System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0, System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0,
rr.getCurrentKey().getLength()); rr.getCurrentKey().getLength());
@ -1114,7 +1114,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
* Don't change the order of these enums. Their ordinals are used as type flag when we emit * Don't change the order of these enums. Their ordinals are used as type flag when we emit
* problems found from the reducer. * problems found from the reducer.
*/ */
public static enum Counts { public static enum VerifyCounts {
UNREFERENCED, UNREFERENCED,
UNDEFINED, UNDEFINED,
REFERENCED, REFERENCED,
@ -1133,9 +1133,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
extends Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable> { extends Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable> {
private ArrayList<byte[]> refs = new ArrayList<>(); private ArrayList<byte[]> refs = new ArrayList<>();
private final BytesWritable UNREF = private final BytesWritable UNREF =
new BytesWritable(addPrefixFlag(Counts.UNREFERENCED.ordinal(), new byte[] {})); new BytesWritable(addPrefixFlag(VerifyCounts.UNREFERENCED.ordinal(), new byte[] {}));
private final BytesWritable LOSTFAM = private final BytesWritable LOSTFAM =
new BytesWritable(addPrefixFlag(Counts.LOST_FAMILIES.ordinal(), new byte[] {})); new BytesWritable(addPrefixFlag(VerifyCounts.LOST_FAMILIES.ordinal(), new byte[] {}));
private AtomicInteger rows = new AtomicInteger(0); private AtomicInteger rows = new AtomicInteger(0);
private Connection connection; private Connection connection;
@ -1177,9 +1177,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
* n * @return Type from the Counts enum of this row. Reads prefix added by * n * @return Type from the Counts enum of this row. Reads prefix added by
* {@link #addPrefixFlag(int, byte[])} * {@link #addPrefixFlag(int, byte[])}
*/ */
public static Counts whichType(final byte[] bs) { public static VerifyCounts whichType(final byte[] bs) {
int ordinal = Bytes.toShort(bs, 0, Bytes.SIZEOF_SHORT); int ordinal = Bytes.toShort(bs, 0, Bytes.SIZEOF_SHORT);
return Counts.values()[ordinal]; return VerifyCounts.values()[ordinal];
} }
/** /**
@ -1221,7 +1221,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
if (lostFamilies) { if (lostFamilies) {
String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength()); String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
LOG.error("LinkedListError: key=" + keyString + ", lost big or tiny families"); LOG.error("LinkedListError: key=" + keyString + ", lost big or tiny families");
context.getCounter(Counts.LOST_FAMILIES).increment(1); context.getCounter(VerifyCounts.LOST_FAMILIES).increment(1);
context.write(key, LOSTFAM); context.write(key, LOSTFAM);
} }
@ -1233,11 +1233,11 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
byte[] bs = refs.get(i); byte[] bs = refs.get(i);
int ordinal; int ordinal;
if (i <= 0) { if (i <= 0) {
ordinal = Counts.UNDEFINED.ordinal(); ordinal = VerifyCounts.UNDEFINED.ordinal();
context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs))); context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
context.getCounter(Counts.UNDEFINED).increment(1); context.getCounter(VerifyCounts.UNDEFINED).increment(1);
} else { } else {
ordinal = Counts.EXTRA_UNDEF_REFERENCES.ordinal(); ordinal = VerifyCounts.EXTRA_UNDEF_REFERENCES.ordinal();
context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs))); context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
} }
} }
@ -1252,7 +1252,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
} else if (defCount > 0 && refs.isEmpty()) { } else if (defCount > 0 && refs.isEmpty()) {
// node is defined but not referenced // node is defined but not referenced
context.write(key, UNREF); context.write(key, UNREF);
context.getCounter(Counts.UNREFERENCED).increment(1); context.getCounter(VerifyCounts.UNREFERENCED).increment(1);
if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) { if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength()); String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
context.getCounter("unref", keyString).increment(1); context.getCounter("unref", keyString).increment(1);
@ -1261,13 +1261,13 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
if (refs.size() > 1) { if (refs.size() > 1) {
// Skip first reference. // Skip first reference.
for (int i = 1; i < refs.size(); i++) { for (int i = 1; i < refs.size(); i++) {
context.write(key, context.write(key, new BytesWritable(
new BytesWritable(addPrefixFlag(Counts.EXTRAREFERENCES.ordinal(), refs.get(i)))); addPrefixFlag(VerifyCounts.EXTRAREFERENCES.ordinal(), refs.get(i))));
} }
context.getCounter(Counts.EXTRAREFERENCES).increment(refs.size() - 1); context.getCounter(VerifyCounts.EXTRAREFERENCES).increment(refs.size() - 1);
} }
// node is defined and referenced // node is defined and referenced
context.getCounter(Counts.REFERENCED).increment(1); context.getCounter(VerifyCounts.REFERENCED).increment(1);
} }
} }
@ -1275,6 +1275,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
* Dump out extra info around references if there are any. Helps debugging. * Dump out extra info around references if there are any. Helps debugging.
* @return StringBuilder filled with references if any. n * @return StringBuilder filled with references if any. n
*/ */
@SuppressWarnings("JavaUtilDate")
private StringBuilder dumpExtraInfoOnRefs(final BytesWritable key, final Context context, private StringBuilder dumpExtraInfoOnRefs(final BytesWritable key, final Context context,
final List<byte[]> refs) throws IOException { final List<byte[]> refs) throws IOException {
StringBuilder refsSb = null; StringBuilder refsSb = null;
@ -1429,8 +1430,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
* @return True if the values match what's expected, false otherwise * @return True if the values match what's expected, false otherwise
*/ */
protected boolean verifyExpectedValues(long expectedReferenced, Counters counters) { protected boolean verifyExpectedValues(long expectedReferenced, Counters counters) {
final Counter referenced = counters.findCounter(Counts.REFERENCED); final Counter referenced = counters.findCounter(VerifyCounts.REFERENCED);
final Counter unreferenced = counters.findCounter(Counts.UNREFERENCED); final Counter unreferenced = counters.findCounter(VerifyCounts.UNREFERENCED);
boolean success = true; boolean success = true;
if (expectedReferenced != referenced.getValue()) { if (expectedReferenced != referenced.getValue()) {
@ -1440,7 +1441,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
} }
if (unreferenced.getValue() > 0) { if (unreferenced.getValue() > 0) {
final Counter multiref = counters.findCounter(Counts.EXTRAREFERENCES); final Counter multiref = counters.findCounter(VerifyCounts.EXTRAREFERENCES);
boolean couldBeMultiRef = (multiref.getValue() == unreferenced.getValue()); boolean couldBeMultiRef = (multiref.getValue() == unreferenced.getValue());
LOG.error( LOG.error(
"Unreferenced nodes were not expected. Unreferenced count=" + unreferenced.getValue() "Unreferenced nodes were not expected. Unreferenced count=" + unreferenced.getValue()
@ -1457,8 +1458,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
* @return True if the "bad" counter objects are 0, false otherwise * @return True if the "bad" counter objects are 0, false otherwise
*/ */
protected boolean verifyUnexpectedValues(Counters counters) { protected boolean verifyUnexpectedValues(Counters counters) {
final Counter undefined = counters.findCounter(Counts.UNDEFINED); final Counter undefined = counters.findCounter(VerifyCounts.UNDEFINED);
final Counter lostfamilies = counters.findCounter(Counts.LOST_FAMILIES); final Counter lostfamilies = counters.findCounter(VerifyCounts.LOST_FAMILIES);
boolean success = true; boolean success = true;
if (undefined.getValue() > 0) { if (undefined.getValue() > 0) {
@ -1839,8 +1840,6 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
return node; return node;
} }
protected IntegrationTestingUtility util;
@Override @Override
public void setUpCluster() throws Exception { public void setUpCluster() throws Exception {
util = getTestingUtil(getConf()); util = getTestingUtil(getConf());

View File

@ -71,6 +71,7 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
/** /**
@ -204,7 +205,7 @@ public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestB
protected void persist(org.apache.hadoop.mapreduce.Mapper.Context output, long count, protected void persist(org.apache.hadoop.mapreduce.Mapper.Context output, long count,
byte[][] prev, byte[][] current, byte[] id) throws IOException { byte[][] prev, byte[][] current, byte[] id) throws IOException {
String visibilityExps = ""; String visibilityExps = "";
String[] split = labels.split(COMMA); String[] split = Splitter.on(COMMA).splitToStream(labels).toArray(String[]::new);
for (int i = 0; i < current.length; i++) { for (int i = 0; i < current.length; i++) {
for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) { for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) {
Put put = new Put(current[i]); Put put = new Put(current[i]);
@ -248,18 +249,16 @@ public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestB
} }
public int runCopier(String outputDir) throws Exception { public int runCopier(String outputDir) throws Exception {
Job job = null; Job job = new Job(getConf());
Scan scan = null;
job = new Job(getConf());
job.setJobName("Data copier"); job.setJobName("Data copier");
job.getConfiguration().setInt("INDEX", labelIndex); job.getConfiguration().setInt("INDEX", labelIndex);
job.getConfiguration().set("LABELS", labels); job.getConfiguration().set("LABELS", labels);
job.setJarByClass(getClass()); job.setJarByClass(getClass());
scan = new Scan(); Scan scan = new Scan();
scan.setCacheBlocks(false); scan.setCacheBlocks(false);
scan.setRaw(true); scan.setRaw(true);
String[] split = labels.split(COMMA); String[] split = Splitter.on(COMMA).splitToStream(labels).toArray(String[]::new);
scan.setAuthorizations( scan.setAuthorizations(
new Authorizations(split[this.labelIndex * 2], split[(this.labelIndex * 2) + 1])); new Authorizations(split[this.labelIndex * 2], split[(this.labelIndex * 2) + 1]));
if (delete) { if (delete) {
@ -424,7 +423,7 @@ public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestB
scan.addColumn(FAMILY_NAME, COLUMN_PREV); scan.addColumn(FAMILY_NAME, COLUMN_PREV);
scan.setCaching(10000); scan.setCaching(10000);
scan.setCacheBlocks(false); scan.setCacheBlocks(false);
String[] split = labels.split(COMMA); String[] split = Splitter.on(COMMA).splitToStream(labels).toArray(String[]::new);
scan.setAuthorizations( scan.setAuthorizations(
new Authorizations(split[this.labelIndex * 2], split[(this.labelIndex * 2) + 1])); new Authorizations(split[this.labelIndex * 2], split[(this.labelIndex * 2) + 1]));
@ -470,7 +469,6 @@ public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestB
static class VisibilityLoop extends Loop { static class VisibilityLoop extends Loop {
private static final int SLEEP_IN_MS = 5000; private static final int SLEEP_IN_MS = 5000;
private static final Logger LOG = LoggerFactory.getLogger(VisibilityLoop.class); private static final Logger LOG = LoggerFactory.getLogger(VisibilityLoop.class);
IntegrationTestBigLinkedListWithVisibility it;
@Override @Override
protected void runGenerator(int numMappers, long numNodes, String outputDir, Integer width, protected void runGenerator(int numMappers, long numNodes, String outputDir, Integer width,
@ -652,10 +650,6 @@ public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestB
@Override @Override
public int runTestFromCommandLine() throws Exception { public int runTestFromCommandLine() throws Exception {
Tool tool = null; return ToolRunner.run(getConf(), new VisibilityLoop(), otherArgs);
Loop loop = new VisibilityLoop();
loop.it = this;
tool = loop;
return ToolRunner.run(getConf(), tool, otherArgs);
} }
} }

View File

@ -26,6 +26,8 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.SortedSet; import java.util.SortedSet;
@ -85,6 +87,8 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets; import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
@ -441,34 +445,32 @@ public class IntegrationTestLoadAndVerify extends IntegrationTestBase {
} }
if (!fs.isDirectory(keysInputDir)) { if (!fs.isDirectory(keysInputDir)) {
FileStatus keyFileStatus = fs.getFileStatus(keysInputDir); FileStatus keyFileStatus = fs.getFileStatus(keysInputDir);
readFileToSearch(conf, fs, keyFileStatus, result); readFileToSearch(fs, keyFileStatus, result);
} else { } else {
RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false); RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false);
while (iterator.hasNext()) { while (iterator.hasNext()) {
LocatedFileStatus keyFileStatus = iterator.next(); LocatedFileStatus keyFileStatus = iterator.next();
// Skip "_SUCCESS" file. // Skip "_SUCCESS" file.
if (keyFileStatus.getPath().getName().startsWith("_")) continue; if (keyFileStatus.getPath().getName().startsWith("_")) continue;
readFileToSearch(conf, fs, keyFileStatus, result); readFileToSearch(fs, keyFileStatus, result);
} }
} }
return result; return result;
} }
private static SortedSet<byte[]> readFileToSearch(final Configuration conf, final FileSystem fs, private static SortedSet<byte[]> readFileToSearch(final FileSystem fs,
final FileStatus keyFileStatus, SortedSet<byte[]> result) final FileStatus keyFileStatus, SortedSet<byte[]> result)
throws IOException, InterruptedException { throws IOException, InterruptedException {
// verify uses file output format and writes <Text, Text>. We can read it as a text file // verify uses file output format and writes <Text, Text>. We can read it as a text file
try (InputStream in = fs.open(keyFileStatus.getPath()); try (InputStream in = fs.open(keyFileStatus.getPath()); BufferedReader reader =
BufferedReader reader = new BufferedReader(new InputStreamReader(in))) { new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
// extract out the key and return that missing as a missing key // extract out the key and return that missing as a missing key
String line; String line;
while ((line = reader.readLine()) != null) { while ((line = reader.readLine()) != null) {
if (line.isEmpty()) continue; if (line.isEmpty()) continue;
List<String> parts = Splitter.onPattern("\\s+").splitToList(line);
String[] parts = line.split("\\s+"); if (parts.size() >= 1) {
if (parts.length >= 1) { result.add(Bytes.toBytesBinary(Iterables.get(parts, 0)));
String key = parts[0];
result.add(Bytes.toBytesBinary(key));
} else { } else {
LOG.info("Cannot parse key from: " + line); LOG.info("Cannot parse key from: " + line);
} }
@ -477,7 +479,7 @@ public class IntegrationTestLoadAndVerify extends IntegrationTestBase {
return result; return result;
} }
private int doSearch(Configuration conf, String keysDir) throws Exception { private int doSearch(String keysDir) throws Exception {
Path inputDir = new Path(keysDir); Path inputDir = new Path(keysDir);
getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString()); getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString());
@ -618,7 +620,7 @@ public class IntegrationTestLoadAndVerify extends IntegrationTestBase {
} }
} }
if (doSearch) { if (doSearch) {
return doSearch(getConf(), keysDir); return doSearch(keysDir);
} }
return 0; return 0;
} }

View File

@ -98,6 +98,7 @@ import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
/** /**
@ -220,12 +221,11 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
LOG.error("Loader failed"); LOG.error("Loader failed");
return -1; return -1;
} }
res = runVerify(outputDir); return runVerify(outputDir);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Tool failed with exception", e); LOG.error("Tool failed with exception", e);
return -1; return -1;
} }
return 0;
} }
@Override @Override
@ -527,9 +527,9 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
try (FSDataInputStream is = fs.open(warcFileInput)) { try (FSDataInputStream is = fs.open(warcFileInput)) {
InputStreamReader reader; InputStreamReader reader;
if (warcFileInput.getName().toLowerCase().endsWith(".gz")) { if (warcFileInput.getName().toLowerCase().endsWith(".gz")) {
reader = new InputStreamReader(new GZIPInputStream(is)); reader = new InputStreamReader(new GZIPInputStream(is), StandardCharsets.UTF_8);
} else { } else {
reader = new InputStreamReader(is); reader = new InputStreamReader(is, StandardCharsets.UTF_8);
} }
try (BufferedReader br = new BufferedReader(reader)) { try (BufferedReader br = new BufferedReader(reader)) {
String line; String line;
@ -949,8 +949,9 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
// Reverse the components of the hostname // Reverse the components of the hostname
String reversedHost; String reversedHost;
if (uri.getHost() != null) { if (uri.getHost() != null) {
final String[] hostComponents =
Splitter.on('.').splitToStream(uri.getHost()).toArray(String[]::new);
final StringBuilder sb = new StringBuilder(); final StringBuilder sb = new StringBuilder();
final String[] hostComponents = uri.getHost().split("\\.");
for (int i = hostComponents.length - 1; i >= 0; i--) { for (int i = hostComponents.length - 1; i >= 0; i--) {
sb.append(hostComponents[i]); sb.append(hostComponents[i]);
if (i != 0) { if (i != 0) {

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.test;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
@ -43,6 +44,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Joiner; import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
/** /**
@ -109,7 +111,7 @@ public class IntegrationTestReplication extends IntegrationTestBigLinkedList {
* Wrapper around an HBase ClusterID allowing us to get admin connections and configurations for * Wrapper around an HBase ClusterID allowing us to get admin connections and configurations for
* it * it
*/ */
protected class ClusterID { protected static class ClusterID {
private final Configuration configuration; private final Configuration configuration;
private Connection connection = null; private Connection connection = null;
@ -121,10 +123,10 @@ public class IntegrationTestReplication extends IntegrationTestBigLinkedList {
*/ */
public ClusterID(Configuration base, String key) { public ClusterID(Configuration base, String key) {
configuration = new Configuration(base); configuration = new Configuration(base);
String[] parts = key.split(":"); Iterator<String> iter = Splitter.on(':').split(key).iterator();
configuration.set(HConstants.ZOOKEEPER_QUORUM, parts[0]); configuration.set(HConstants.ZOOKEEPER_QUORUM, iter.next());
configuration.set(HConstants.ZOOKEEPER_CLIENT_PORT, parts[1]); configuration.set(HConstants.ZOOKEEPER_CLIENT_PORT, iter.next());
configuration.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[2]); configuration.set(HConstants.ZOOKEEPER_ZNODE_PARENT, iter.next());
} }
@Override @Override
@ -150,8 +152,20 @@ public class IntegrationTestReplication extends IntegrationTestBigLinkedList {
this.connection = null; this.connection = null;
} }
public boolean equals(ClusterID other) { @Override
return this.toString().equalsIgnoreCase(other.toString()); public boolean equals(Object other) {
if (this == other) {
return true;
}
if (!(other instanceof ClusterID)) {
return false;
}
return toString().equalsIgnoreCase(other.toString());
}
@Override
public int hashCode() {
return toString().hashCode();
} }
} }
@ -306,7 +320,7 @@ public class IntegrationTestReplication extends IntegrationTestBigLinkedList {
if (!noReplicationSetup) { if (!noReplicationSetup) {
setupTablesAndReplication(); setupTablesAndReplication();
} }
int expectedNumNodes = 0; long expectedNumNodes = 0;
for (int i = 0; i < numIterations; i++) { for (int i = 0; i < numIterations; i++) {
LOG.info("Starting iteration = " + i); LOG.info("Starting iteration = " + i);

View File

@ -267,7 +267,8 @@ public class IntegrationTestTimeBoundedRequestsWithRegionReplicas extends Integr
try { try {
this.timeoutThread.join(); this.timeoutThread.join();
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); // Restore interrupt status
Thread.currentThread().interrupt();
} }
this.aborted = true; this.aborted = true;
super.waitForFinish(); super.waitForFinish();
@ -293,7 +294,7 @@ public class IntegrationTestTimeBoundedRequestsWithRegionReplicas extends Integr
return new TimeBoundedMultiThreadedReaderThread(readerId); return new TimeBoundedMultiThreadedReaderThread(readerId);
} }
private class TimeoutThread extends Thread { private static class TimeoutThread extends Thread {
long timeout; long timeout;
long reportInterval = 60000; long reportInterval = 60000;

View File

@ -59,6 +59,8 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
/** /**
@ -119,14 +121,16 @@ public class IntegrationTestWithCellVisibilityLoadAndVerify extends IntegrationT
conf.set("hbase.superuser", User.getCurrent().getName()); conf.set("hbase.superuser", User.getCurrent().getName());
conf.setBoolean("dfs.permissions", false); conf.setBoolean("dfs.permissions", false);
super.setUpCluster(); super.setUpCluster();
String[] users = userNames.split(","); List<String> users = Splitter.on(',').splitToList(userNames);
if (users.length != 2) { if (users.size() != 2) {
System.err.println(ERROR_STR); System.err.println(ERROR_STR);
throw new IOException(ERROR_STR); throw new IOException(ERROR_STR);
} }
System.out.println(userNames + " " + users[0] + " " + users[1]); String user1 = Iterables.get(users, 0);
USER1 = User.createUserForTesting(conf, users[0], new String[] {}); String user2 = Iterables.get(users, 1);
USER2 = User.createUserForTesting(conf, users[1], new String[] {}); System.out.println(userNames + " " + user1 + " " + user2);
USER1 = User.createUserForTesting(conf, user1, new String[] {});
USER2 = User.createUserForTesting(conf, user2, new String[] {});
addLabelsAndAuths(); addLabelsAndAuths();
} }

View File

@ -28,6 +28,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.IntegrationTestingUtility; import org.apache.hadoop.hbase.IntegrationTestingUtility;
@ -137,11 +138,9 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool {
ht.close(); ht.close();
ht = null; ht = null;
} catch (IOException e) { } catch (Exception e) {
e.printStackTrace();
span.addEvent("exception", span.addEvent("exception",
Attributes.of(AttributeKey.stringKey("exception"), e.getClass().getSimpleName())); Attributes.of(AttributeKey.stringKey("exception"), e.getClass().getSimpleName()));
} catch (Exception e) {
} finally { } finally {
span.end(); span.end();
if (rs != null) { if (rs != null) {
@ -162,36 +161,36 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool {
@Override @Override
public void run() { public void run() {
Table ht = null; Table ht = null;
try { try {
ht = util.getConnection().getTable(tableName); ht = util.getConnection().getTable(tableName);
long accum = 0;
for (int x = 0; x < 5; x++) {
Span span = TraceUtil.getGlobalTracer().spanBuilder("gets").startSpan();
try (Scope scope = span.makeCurrent()) {
long rk = rowKeyQueue.take();
Result r1 = ht.get(new Get(Bytes.toBytes(rk)));
if (r1 != null) {
accum |= Bytes.toLong(r1.getRow());
}
Result r2 = ht.get(new Get(Bytes.toBytes(rk)));
if (r2 != null) {
accum |= Bytes.toLong(r2.getRow());
}
span.addEvent("Accum = " + accum);
} catch (IOException | InterruptedException ie) {
// IGNORED
} finally {
span.end();
}
}
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); // IGNORED
} } finally {
if (ht != null) {
long accum = 0; IOUtils.closeQuietly(ht);
for (int x = 0; x < 5; x++) {
Span span = TraceUtil.getGlobalTracer().spanBuilder("gets").startSpan();
try (Scope scope = span.makeCurrent()) {
long rk = rowKeyQueue.take();
Result r1 = ht.get(new Get(Bytes.toBytes(rk)));
if (r1 != null) {
accum |= Bytes.toLong(r1.getRow());
}
Result r2 = ht.get(new Get(Bytes.toBytes(rk)));
if (r2 != null) {
accum |= Bytes.toLong(r2.getRow());
}
span.addEvent("Accum = " + accum);
} catch (IOException | InterruptedException ie) {
// IGNORED
} finally {
span.end();
} }
} }
} }
}; };
service.execute(runnable); service.execute(runnable);