HBASE-15935 Set up a concurrent walker that walks flushed circular linked lists as a Loop Mode
Signed-off-by: Elliott Clark <eclark@apache.org>
This commit is contained in:
parent
17edca6346
commit
cb118c8de6
|
@ -168,8 +168,9 @@ import com.google.common.collect.Sets;
|
||||||
*
|
*
|
||||||
* Below is a description of the Java programs
|
* Below is a description of the Java programs
|
||||||
*
|
*
|
||||||
* Generator - A map only job that generates data. As stated previously,·
|
* Generator - A map only job that generates data. As stated previously,·its best to generate data
|
||||||
* its best to generate data in multiples of 25M.
|
* in multiples of 25M. An option is also available to allow concurrent walkers to select and walk
|
||||||
|
* random flushed loops during this phase.
|
||||||
*
|
*
|
||||||
* Verify - A map reduce job that looks for holes. Look at the counts after running. REFERENCED and
|
* Verify - A map reduce job that looks for holes. Look at the counts after running. REFERENCED and
|
||||||
* UNREFERENCED are· ok, any UNDEFINED counts are bad. Do not run at the· same
|
* UNREFERENCED are· ok, any UNDEFINED counts are bad. Do not run at the· same
|
||||||
|
@ -182,6 +183,11 @@ import com.google.common.collect.Sets;
|
||||||
* Delete - A standalone program that deletes a single node
|
* Delete - A standalone program that deletes a single node
|
||||||
*
|
*
|
||||||
* This class can be run as a unit test, as an integration test, or from the command line
|
* This class can be run as a unit test, as an integration test, or from the command line
|
||||||
|
*
|
||||||
|
* ex:
|
||||||
|
* ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
|
||||||
|
* loop 2 1 100000 /temp 1 1000 50 1 0
|
||||||
|
*
|
||||||
*/
|
*/
|
||||||
@Category(IntegrationTests.class)
|
@Category(IntegrationTests.class)
|
||||||
public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
||||||
|
@ -217,6 +223,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
||||||
private static final String GENERATOR_WRAP_KEY
|
private static final String GENERATOR_WRAP_KEY
|
||||||
= "IntegrationTestBigLinkedList.generator.wrap";
|
= "IntegrationTestBigLinkedList.generator.wrap";
|
||||||
|
|
||||||
|
private static final String CONCURRENT_WALKER_KEY
|
||||||
|
= "IntegrationTestBigLinkedList.generator.concurrentwalkers";
|
||||||
|
|
||||||
protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster
|
protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster
|
||||||
|
|
||||||
private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters
|
private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters
|
||||||
|
@ -225,6 +234,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
||||||
private static final int WRAP_DEFAULT = 25;
|
private static final int WRAP_DEFAULT = 25;
|
||||||
private static final int ROWKEY_LENGTH = 16;
|
private static final int ROWKEY_LENGTH = 16;
|
||||||
|
|
||||||
|
private static final int CONCURRENT_WALKER_DEFAULT = 0;
|
||||||
|
|
||||||
protected String toRun;
|
protected String toRun;
|
||||||
protected String[] otherArgs;
|
protected String[] otherArgs;
|
||||||
|
|
||||||
|
@ -256,6 +267,18 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
||||||
public static final String MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY =
|
public static final String MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY =
|
||||||
"generator.multiple.columnfamilies";
|
"generator.multiple.columnfamilies";
|
||||||
|
|
||||||
|
/**
 * Job counters incremented by the concurrent walkers while the Generator is running.
 */
public enum Counts {
  /** A followed link resolved to a node that itself has a predecessor. */
  SUCCESS,
  /** A followed link resolved to a node whose prev has the same length as NO_KEY. */
  TERMINATING,
  /** A followed link resolved to no node at all. */
  UNDEFINED,
  /** Walking a loop was aborted by an IOException. */
  IOEXCEPTION
}
|
||||||
|
|
||||||
|
public static final String USAGE = "Usage : " + Generator.class.getSimpleName() +
|
||||||
|
" <num mappers> <num nodes per map> <tmp output dir> [<width> <wrap multiplier>" +
|
||||||
|
" <num walker threads>] \n" +
|
||||||
|
"where <num nodes per map> should be a multiple of width*wrap multiplier, 25M by default \n" +
|
||||||
|
"walkers will verify random flushed loop during Generation.";
|
||||||
|
|
||||||
|
public Job job;
|
||||||
|
|
||||||
static class GeneratorInputFormat extends InputFormat<BytesWritable,NullWritable> {
|
static class GeneratorInputFormat extends InputFormat<BytesWritable,NullWritable> {
|
||||||
static class GeneratorInputSplit extends InputSplit implements Writable {
|
static class GeneratorInputSplit extends InputSplit implements Writable {
|
||||||
@Override
|
@Override
|
||||||
|
@ -371,6 +394,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
||||||
* |___________________________|
|
* |___________________________|
|
||||||
* </pre>
|
* </pre>
|
||||||
*/
|
*/
|
||||||
|
|
||||||
static class GeneratorMapper
|
static class GeneratorMapper
|
||||||
extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
|
extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
|
||||||
|
|
||||||
|
@ -388,6 +412,12 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
||||||
boolean multipleUnevenColumnFamilies;
|
boolean multipleUnevenColumnFamilies;
|
||||||
byte[] tinyValue = new byte[] { 't' };
|
byte[] tinyValue = new byte[] { 't' };
|
||||||
byte[] bigValue = null;
|
byte[] bigValue = null;
|
||||||
|
Configuration conf;
|
||||||
|
|
||||||
|
volatile boolean walkersStop;
|
||||||
|
int numWalkers;
|
||||||
|
volatile List<Long> flushedLoops = new ArrayList<>();
|
||||||
|
List<Thread> walkers = new ArrayList<>();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void setup(Context context) throws IOException, InterruptedException {
|
protected void setup(Context context) throws IOException, InterruptedException {
|
||||||
|
@ -404,6 +434,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
||||||
this.wrap = this.numNodes;
|
this.wrap = this.numNodes;
|
||||||
}
|
}
|
||||||
this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration());
|
this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration());
|
||||||
|
this.numWalkers = context.getConfiguration().getInt(CONCURRENT_WALKER_KEY, CONCURRENT_WALKER_DEFAULT);
|
||||||
|
this.walkersStop = false;
|
||||||
|
this.conf = context.getConfiguration();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void instantiateHTable() throws IOException {
|
protected void instantiateHTable() throws IOException {
|
||||||
|
@ -414,6 +447,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void cleanup(Context context) throws IOException ,InterruptedException {
|
protected void cleanup(Context context) throws IOException ,InterruptedException {
|
||||||
|
joinWalkers();
|
||||||
mutator.close();
|
mutator.close();
|
||||||
connection.close();
|
connection.close();
|
||||||
}
|
}
|
||||||
|
@ -442,9 +476,15 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
||||||
// this block of code turns the 1 million linked list of length 25 into one giant
|
// this block of code turns the 1 million linked list of length 25 into one giant
|
||||||
//circular linked list of 25 million
|
//circular linked list of 25 million
|
||||||
circularLeftShift(first);
|
circularLeftShift(first);
|
||||||
|
|
||||||
persist(output, -1, prev, first, null);
|
persist(output, -1, prev, first, null);
|
||||||
|
// At this point the entire loop has been flushed so we can add one of its nodes to the
|
||||||
|
// concurrent walker
|
||||||
|
if (numWalkers > 0) {
|
||||||
|
addFlushed(key.getBytes());
|
||||||
|
if (walkers.isEmpty()) {
|
||||||
|
startWalkers(numWalkers, conf, output);
|
||||||
|
}
|
||||||
|
}
|
||||||
first = null;
|
first = null;
|
||||||
prev = null;
|
prev = null;
|
||||||
}
|
}
|
||||||
|
@ -457,6 +497,13 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
||||||
first[first.length - 1] = ez;
|
first[first.length - 1] = ez;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void addFlushed(byte[] rowKey) {
|
||||||
|
synchronized (flushedLoops) {
|
||||||
|
flushedLoops.add(Bytes.toLong(rowKey));
|
||||||
|
flushedLoops.notifyAll();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
protected void persist(Context output, long count, byte[][] prev, byte[][] current, byte[] id)
|
protected void persist(Context output, long count, byte[][] prev, byte[][] current, byte[] id)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
for (int i = 0; i < current.length; i++) {
|
for (int i = 0; i < current.length; i++) {
|
||||||
|
@ -488,27 +535,153 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
||||||
output.progress();
|
output.progress();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
mutator.flush();
|
mutator.flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void startWalkers(int numWalkers, Configuration conf, Context context) {
|
||||||
|
LOG.info("Starting " + numWalkers + " concurrent walkers");
|
||||||
|
for (int i = 0; i < numWalkers; i++) {
|
||||||
|
Thread walker = new Thread(new ContinuousConcurrentWalker(conf, context));
|
||||||
|
walker.start();
|
||||||
|
walkers.add(walker);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void joinWalkers() {
|
||||||
|
walkersStop = true;
|
||||||
|
synchronized (flushedLoops) {
|
||||||
|
flushedLoops.notifyAll();
|
||||||
|
}
|
||||||
|
for (Thread walker : walkers) {
|
||||||
|
try {
|
||||||
|
walker.join();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// no-op
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Randomly selects and walks a random flushed loop concurrently with the Generator Mapper by
|
||||||
|
* spawning ConcurrentWalker's with specified StartNodes. These ConcurrentWalker's are
|
||||||
|
* configured to only log erroneous nodes.
|
||||||
|
*/
|
||||||
|
|
||||||
|
public class ContinuousConcurrentWalker implements Runnable {
|
||||||
|
|
||||||
|
ConcurrentWalker walker;
|
||||||
|
Configuration conf;
|
||||||
|
Context context;
|
||||||
|
Random rand;
|
||||||
|
|
||||||
|
public ContinuousConcurrentWalker(Configuration conf, Context context) {
|
||||||
|
this.conf = conf;
|
||||||
|
this.context = context;
|
||||||
|
rand = new Random();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
while (!walkersStop) {
|
||||||
|
try {
|
||||||
|
long node = selectLoop();
|
||||||
|
try {
|
||||||
|
walkLoop(node);
|
||||||
|
} catch (IOException e) {
|
||||||
|
context.getCounter(Counts.IOEXCEPTION).increment(1l);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void walkLoop(long node) throws IOException {
|
||||||
|
walker = new ConcurrentWalker(context);
|
||||||
|
walker.setConf(conf);
|
||||||
|
walker.run(node, wrap);
|
||||||
|
}
|
||||||
|
|
||||||
|
private long selectLoop () throws InterruptedException{
|
||||||
|
synchronized (flushedLoops) {
|
||||||
|
while (flushedLoops.isEmpty() && !walkersStop) {
|
||||||
|
flushedLoops.wait();
|
||||||
|
}
|
||||||
|
if (walkersStop) {
|
||||||
|
throw new InterruptedException();
|
||||||
|
}
|
||||||
|
return flushedLoops.get(rand.nextInt(flushedLoops.size()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class ConcurrentWalker extends WalkerBase {
|
||||||
|
|
||||||
|
Context context;
|
||||||
|
|
||||||
|
public ConcurrentWalker(Context context) {this.context = context;}
|
||||||
|
|
||||||
|
public void run(long startKeyIn, long maxQueriesIn) throws IOException {
|
||||||
|
|
||||||
|
long maxQueries = maxQueriesIn > 0 ? maxQueriesIn : Long.MAX_VALUE;
|
||||||
|
byte[] startKey = Bytes.toBytes(startKeyIn);
|
||||||
|
|
||||||
|
Connection connection = ConnectionFactory.createConnection(getConf());
|
||||||
|
Table table = connection.getTable(getTableName(getConf()));
|
||||||
|
long numQueries = 0;
|
||||||
|
// If isSpecificStart is set, only walk one list from that particular node.
|
||||||
|
// Note that in case of circular (or P-shaped) list it will walk forever, as is
|
||||||
|
// the case in normal run without startKey.
|
||||||
|
|
||||||
|
CINode node = findStartNode(table, startKey);
|
||||||
|
if (node == null) {
|
||||||
|
LOG.error("Start node not found: " + Bytes.toStringBinary(startKey));
|
||||||
|
throw new IOException("Start node not found: " + startKeyIn);
|
||||||
|
}
|
||||||
|
while (numQueries < maxQueries) {
|
||||||
|
numQueries++;
|
||||||
|
byte[] prev = node.prev;
|
||||||
|
long t1 = System.currentTimeMillis();
|
||||||
|
node = getNode(prev, table, node);
|
||||||
|
long t2 = System.currentTimeMillis();
|
||||||
|
if (node == null) {
|
||||||
|
LOG.error("ConcurrentWalker found UNDEFINED NODE: " + Bytes.toStringBinary(prev));
|
||||||
|
context.getCounter(Counts.UNDEFINED).increment(1l);
|
||||||
|
} else if (node.prev.length == NO_KEY.length) {
|
||||||
|
LOG.error("ConcurrentWalker found TERMINATING NODE: " +
|
||||||
|
Bytes.toStringBinary(node.key));
|
||||||
|
context.getCounter(Counts.TERMINATING).increment(1l);
|
||||||
|
} else {
|
||||||
|
// Increment for successful walk
|
||||||
|
context.getCounter(Counts.SUCCESS).increment(1l);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
table.close();
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int run(String[] args) throws Exception {
|
public int run(String[] args) throws Exception {
|
||||||
if (args.length < 3) {
|
if (args.length < 3) {
|
||||||
System.out.println("Usage : " + Generator.class.getSimpleName() +
|
System.err.println(USAGE);
|
||||||
" <num mappers> <num nodes per map> <tmp output dir> [<width> <wrap multiplier>]");
|
return 1;
|
||||||
System.out.println(" where <num nodes per map> should be a multiple of " +
|
|
||||||
" width*wrap multiplier, 25M by default");
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
try {
|
||||||
int numMappers = Integer.parseInt(args[0]);
|
int numMappers = Integer.parseInt(args[0]);
|
||||||
long numNodes = Long.parseLong(args[1]);
|
long numNodes = Long.parseLong(args[1]);
|
||||||
Path tmpOutput = new Path(args[2]);
|
Path tmpOutput = new Path(args[2]);
|
||||||
Integer width = (args.length < 4) ? null : Integer.parseInt(args[3]);
|
Integer width = (args.length < 4) ? null : Integer.parseInt(args[3]);
|
||||||
Integer wrapMuplitplier = (args.length < 5) ? null : Integer.parseInt(args[4]);
|
Integer wrapMultiplier = (args.length < 5) ? null : Integer.parseInt(args[4]);
|
||||||
return run(numMappers, numNodes, tmpOutput, width, wrapMuplitplier);
|
Integer numWalkers = (args.length < 6) ? null : Integer.parseInt(args[5]);
|
||||||
|
return run(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers);
|
||||||
|
} catch (NumberFormatException e) {
|
||||||
|
System.err.println("Parsing generator arguments failed: " + e.getMessage());
|
||||||
|
System.err.println(USAGE);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void createSchema() throws IOException {
|
protected void createSchema() throws IOException {
|
||||||
|
@ -562,7 +735,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput,
|
public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput,
|
||||||
Integer width, Integer wrapMuplitplier) throws Exception {
|
Integer width, Integer wrapMultiplier, Integer numWalkers)
|
||||||
|
throws Exception {
|
||||||
LOG.info("Running RandomInputGenerator with numMappers=" + numMappers
|
LOG.info("Running RandomInputGenerator with numMappers=" + numMappers
|
||||||
+ ", numNodes=" + numNodes);
|
+ ", numNodes=" + numNodes);
|
||||||
Job job = Job.getInstance(getConf());
|
Job job = Job.getInstance(getConf());
|
||||||
|
@ -575,7 +749,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
||||||
job.setOutputKeyClass(BytesWritable.class);
|
job.setOutputKeyClass(BytesWritable.class);
|
||||||
job.setOutputValueClass(NullWritable.class);
|
job.setOutputValueClass(NullWritable.class);
|
||||||
|
|
||||||
setJobConf(job, numMappers, numNodes, width, wrapMuplitplier);
|
setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers);
|
||||||
|
|
||||||
job.setMapperClass(Mapper.class); //identity mapper
|
job.setMapperClass(Mapper.class); //identity mapper
|
||||||
|
|
||||||
|
@ -588,10 +762,11 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public int runGenerator(int numMappers, long numNodes, Path tmpOutput,
|
public int runGenerator(int numMappers, long numNodes, Path tmpOutput,
|
||||||
Integer width, Integer wrapMuplitplier) throws Exception {
|
Integer width, Integer wrapMultiplier, Integer numWalkers)
|
||||||
|
throws Exception {
|
||||||
LOG.info("Running Generator with numMappers=" + numMappers +", numNodes=" + numNodes);
|
LOG.info("Running Generator with numMappers=" + numMappers +", numNodes=" + numNodes);
|
||||||
createSchema();
|
createSchema();
|
||||||
Job job = Job.getInstance(getConf());
|
job = Job.getInstance(getConf());
|
||||||
|
|
||||||
job.setJobName("Link Generator");
|
job.setJobName("Link Generator");
|
||||||
job.setNumReduceTasks(0);
|
job.setNumReduceTasks(0);
|
||||||
|
@ -602,7 +777,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
||||||
job.setOutputKeyClass(NullWritable.class);
|
job.setOutputKeyClass(NullWritable.class);
|
||||||
job.setOutputValueClass(NullWritable.class);
|
job.setOutputValueClass(NullWritable.class);
|
||||||
|
|
||||||
setJobConf(job, numMappers, numNodes, width, wrapMuplitplier);
|
setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers);
|
||||||
|
|
||||||
setMapperForGenerator(job);
|
setMapperForGenerator(job);
|
||||||
|
|
||||||
|
@ -629,12 +804,34 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public int run(int numMappers, long numNodes, Path tmpOutput,
|
public int run(int numMappers, long numNodes, Path tmpOutput,
|
||||||
Integer width, Integer wrapMuplitplier) throws Exception {
|
Integer width, Integer wrapMultiplier, Integer numWalkers)
|
||||||
int ret = runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMuplitplier);
|
throws Exception {
|
||||||
|
int ret = runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier,
|
||||||
|
numWalkers);
|
||||||
if (ret > 0) {
|
if (ret > 0) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
return runGenerator(numMappers, numNodes, tmpOutput, width, wrapMuplitplier);
|
return runGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean verify() {
|
||||||
|
try {
|
||||||
|
Counters counters = job.getCounters();
|
||||||
|
|
||||||
|
if (counters.findCounter(Counts.TERMINATING).getValue() > 0 ||
|
||||||
|
counters.findCounter(Counts.UNDEFINED).getValue() > 0 ||
|
||||||
|
counters.findCounter(Counts.IOEXCEPTION).getValue() > 0) {
|
||||||
|
LOG.error("Concurrent walker failed to verify during Generation phase");
|
||||||
|
LOG.error("TERMINATING nodes: " + counters.findCounter(Counts.TERMINATING).getValue());
|
||||||
|
LOG.error("UNDEFINED nodes: " + counters.findCounter(Counts.UNDEFINED).getValue());
|
||||||
|
LOG.error("IOEXCEPTION nodes: " + counters.findCounter(Counts.IOEXCEPTION).getValue());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.info("Generator verification could not find counter");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1240,21 +1437,33 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
||||||
static class Loop extends Configured implements Tool {
|
static class Loop extends Configured implements Tool {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(Loop.class);
|
private static final Log LOG = LogFactory.getLog(Loop.class);
|
||||||
|
private static final String USAGE = "Usage: Loop <num iterations> <num mappers> " +
|
||||||
|
"<num nodes per mapper> <output dir> <num reducers> [<width> <wrap multiplier>" +
|
||||||
|
" <num walker threads>] \n" +
|
||||||
|
"where <num nodes per map> should be a multiple of width*wrap multiplier, 25M by default \n" +
|
||||||
|
"walkers will select and verify random flushed loop during Generation.";
|
||||||
|
|
||||||
IntegrationTestBigLinkedList it;
|
IntegrationTestBigLinkedList it;
|
||||||
|
|
||||||
protected void runGenerator(int numMappers, long numNodes,
|
protected void runGenerator(int numMappers, long numNodes,
|
||||||
String outputDir, Integer width, Integer wrapMuplitplier) throws Exception {
|
String outputDir, Integer width, Integer wrapMultiplier, Integer numWalkers)
|
||||||
|
throws Exception {
|
||||||
Path outputPath = new Path(outputDir);
|
Path outputPath = new Path(outputDir);
|
||||||
UUID uuid = UUID.randomUUID(); //create a random UUID.
|
UUID uuid = UUID.randomUUID(); //create a random UUID.
|
||||||
Path generatorOutput = new Path(outputPath, uuid.toString());
|
Path generatorOutput = new Path(outputPath, uuid.toString());
|
||||||
|
|
||||||
Generator generator = new Generator();
|
Generator generator = new Generator();
|
||||||
generator.setConf(getConf());
|
generator.setConf(getConf());
|
||||||
int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMuplitplier);
|
int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier,
|
||||||
|
numWalkers);
|
||||||
if (retCode > 0) {
|
if (retCode > 0) {
|
||||||
throw new RuntimeException("Generator failed with return code: " + retCode);
|
throw new RuntimeException("Generator failed with return code: " + retCode);
|
||||||
}
|
}
|
||||||
|
if (numWalkers > 0) {
|
||||||
|
if (!generator.verify()) {
|
||||||
|
throw new RuntimeException("Generator.verify failed");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void runVerify(String outputDir,
|
protected void runVerify(String outputDir,
|
||||||
|
@ -1273,41 +1482,43 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
||||||
if (!verify.verify(expectedNumNodes)) {
|
if (!verify.verify(expectedNumNodes)) {
|
||||||
throw new RuntimeException("Verify.verify failed");
|
throw new RuntimeException("Verify.verify failed");
|
||||||
}
|
}
|
||||||
|
LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes);
|
||||||
LOG.info("Verify finished with succees. Total nodes=" + expectedNumNodes);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int run(String[] args) throws Exception {
|
public int run(String[] args) throws Exception {
|
||||||
if (args.length < 5) {
|
if (args.length < 5) {
|
||||||
System.err.println("Usage: Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers> [<width> <wrap multiplier>]");
|
System.err.println(USAGE);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
LOG.info("Running Loop with args:" + Arrays.deepToString(args));
|
try {
|
||||||
|
|
||||||
int numIterations = Integer.parseInt(args[0]);
|
int numIterations = Integer.parseInt(args[0]);
|
||||||
int numMappers = Integer.parseInt(args[1]);
|
int numMappers = Integer.parseInt(args[1]);
|
||||||
long numNodes = Long.parseLong(args[2]);
|
long numNodes = Long.parseLong(args[2]);
|
||||||
String outputDir = args[3];
|
String outputDir = args[3];
|
||||||
int numReducers = Integer.parseInt(args[4]);
|
int numReducers = Integer.parseInt(args[4]);
|
||||||
Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
|
Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
|
||||||
Integer wrapMuplitplier = (args.length < 7) ? null : Integer.parseInt(args[6]);
|
Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]);
|
||||||
|
Integer numWalkers = (args.length < 8) ? 0 : Integer.parseInt(args[7]);
|
||||||
|
|
||||||
long expectedNumNodes = 0;
|
long expectedNumNodes = 0;
|
||||||
|
|
||||||
if (numIterations < 0) {
|
if (numIterations < 0) {
|
||||||
numIterations = Integer.MAX_VALUE; //run indefinitely (kind of)
|
numIterations = Integer.MAX_VALUE; //run indefinitely (kind of)
|
||||||
}
|
}
|
||||||
|
LOG.info("Running Loop with args:" + Arrays.deepToString(args));
|
||||||
for (int i = 0; i < numIterations; i++) {
|
for (int i = 0; i < numIterations; i++) {
|
||||||
LOG.info("Starting iteration = " + i);
|
LOG.info("Starting iteration = " + i);
|
||||||
runGenerator(numMappers, numNodes, outputDir, width, wrapMuplitplier);
|
runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, numWalkers);
|
||||||
expectedNumNodes += numMappers * numNodes;
|
expectedNumNodes += numMappers * numNodes;
|
||||||
|
|
||||||
runVerify(outputDir, numReducers, expectedNumNodes);
|
runVerify(outputDir, numReducers, expectedNumNodes);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
} catch (NumberFormatException e) {
|
||||||
|
System.err.println("Parsing loop arguments failed: " + e.getMessage());
|
||||||
|
System.err.println(USAGE);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1399,13 +1610,47 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
abstract static class WalkerBase extends Configured{
|
||||||
|
protected static CINode findStartNode(Table table, byte[] startKey) throws IOException {
|
||||||
|
Scan scan = new Scan();
|
||||||
|
scan.setStartRow(startKey);
|
||||||
|
scan.setBatch(1);
|
||||||
|
scan.addColumn(FAMILY_NAME, COLUMN_PREV);
|
||||||
|
|
||||||
|
long t1 = System.currentTimeMillis();
|
||||||
|
ResultScanner scanner = table.getScanner(scan);
|
||||||
|
Result result = scanner.next();
|
||||||
|
long t2 = System.currentTimeMillis();
|
||||||
|
scanner.close();
|
||||||
|
|
||||||
|
if ( result != null) {
|
||||||
|
CINode node = getCINode(result, new CINode());
|
||||||
|
System.out.printf("FSR %d %s\n", t2 - t1, Bytes.toStringBinary(node.key));
|
||||||
|
return node;
|
||||||
|
}
|
||||||
|
|
||||||
|
System.out.println("FSR " + (t2 - t1));
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
protected CINode getNode(byte[] row, Table table, CINode node) throws IOException {
|
||||||
|
Get get = new Get(row);
|
||||||
|
get.addColumn(FAMILY_NAME, COLUMN_PREV);
|
||||||
|
Result result = table.get(get);
|
||||||
|
return getCINode(result, node);
|
||||||
|
}
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* A stand alone program that follows a linked list created by {@link Generator} and prints
|
* A stand alone program that follows a linked list created by {@link Generator} and prints
|
||||||
* timing info.
|
* timing info.
|
||||||
*/
|
*/
|
||||||
private static class Walker extends Configured implements Tool {
|
private static class Walker extends WalkerBase implements Tool {
|
||||||
|
|
||||||
|
public Walker(){}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int run(String[] args) throws IOException {
|
public int run(String[] args) throws IOException {
|
||||||
|
|
||||||
Options options = new Options();
|
Options options = new Options();
|
||||||
options.addOption("n", "num", true, "number of queries");
|
options.addOption("n", "num", true, "number of queries");
|
||||||
options.addOption("s", "start", true, "key to start at, binary string");
|
options.addOption("s", "start", true, "key to start at, binary string");
|
||||||
|
@ -1432,6 +1677,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
||||||
}
|
}
|
||||||
Random rand = new SecureRandom();
|
Random rand = new SecureRandom();
|
||||||
boolean isSpecificStart = cmd.hasOption('s');
|
boolean isSpecificStart = cmd.hasOption('s');
|
||||||
|
|
||||||
byte[] startKey = isSpecificStart ? Bytes.toBytesBinary(cmd.getOptionValue('s')) : null;
|
byte[] startKey = isSpecificStart ? Bytes.toBytesBinary(cmd.getOptionValue('s')) : null;
|
||||||
int logEvery = cmd.hasOption('l') ? Integer.parseInt(cmd.getOptionValue('l')) : 1;
|
int logEvery = cmd.hasOption('l') ? Integer.parseInt(cmd.getOptionValue('l')) : 1;
|
||||||
|
|
||||||
|
@ -1451,12 +1697,13 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
||||||
System.err.printf("Start node not found: %s \n", Bytes.toStringBinary(startKey));
|
System.err.printf("Start node not found: %s \n", Bytes.toStringBinary(startKey));
|
||||||
}
|
}
|
||||||
numQueries++;
|
numQueries++;
|
||||||
while (node != null && node.prev.length != NO_KEY.length && numQueries < maxQueries) {
|
while (node != null && node.prev.length != NO_KEY.length &&
|
||||||
|
numQueries < maxQueries) {
|
||||||
byte[] prev = node.prev;
|
byte[] prev = node.prev;
|
||||||
long t1 = System.currentTimeMillis();
|
long t1 = System.currentTimeMillis();
|
||||||
node = getNode(prev, table, node);
|
node = getNode(prev, table, node);
|
||||||
long t2 = System.currentTimeMillis();
|
long t2 = System.currentTimeMillis();
|
||||||
if (numQueries % logEvery == 0) {
|
if (logEvery > 0 && numQueries % logEvery == 0) {
|
||||||
System.out.printf("CQ %d: %d %s \n", numQueries, t2 - t1, Bytes.toStringBinary(prev));
|
System.out.printf("CQ %d: %d %s \n", numQueries, t2 - t1, Bytes.toStringBinary(prev));
|
||||||
}
|
}
|
||||||
numQueries++;
|
numQueries++;
|
||||||
|
@ -1467,41 +1714,10 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
table.close();
|
table.close();
|
||||||
connection.close();
|
connection.close();
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static CINode findStartNode(Table table, byte[] startKey) throws IOException {
|
|
||||||
Scan scan = new Scan();
|
|
||||||
scan.setStartRow(startKey);
|
|
||||||
scan.setBatch(1);
|
|
||||||
scan.addColumn(FAMILY_NAME, COLUMN_PREV);
|
|
||||||
|
|
||||||
long t1 = System.currentTimeMillis();
|
|
||||||
ResultScanner scanner = table.getScanner(scan);
|
|
||||||
Result result = scanner.next();
|
|
||||||
long t2 = System.currentTimeMillis();
|
|
||||||
scanner.close();
|
|
||||||
|
|
||||||
if ( result != null) {
|
|
||||||
CINode node = getCINode(result, new CINode());
|
|
||||||
System.out.printf("FSR %d %s\n", t2 - t1, Bytes.toStringBinary(node.key));
|
|
||||||
return node;
|
|
||||||
}
|
|
||||||
|
|
||||||
System.out.println("FSR " + (t2 - t1));
|
|
||||||
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
private CINode getNode(byte[] row, Table table, CINode node) throws IOException {
|
|
||||||
Get get = new Get(row);
|
|
||||||
get.addColumn(FAMILY_NAME, COLUMN_PREV);
|
|
||||||
Result result = table.get(get);
|
|
||||||
return getCINode(result, node);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class Clean extends Configured implements Tool {
|
private static class Clean extends Configured implements Tool {
|
||||||
|
@ -1691,7 +1907,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void setJobConf(Job job, int numMappers, long numNodes,
|
private static void setJobConf(Job job, int numMappers, long numNodes,
|
||||||
Integer width, Integer wrapMultiplier) {
|
Integer width, Integer wrapMultiplier, Integer numWalkers) {
|
||||||
job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers);
|
job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers);
|
||||||
job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes);
|
job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes);
|
||||||
if (width != null) {
|
if (width != null) {
|
||||||
|
@ -1700,6 +1916,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
|
||||||
if (wrapMultiplier != null) {
|
if (wrapMultiplier != null) {
|
||||||
job.getConfiguration().setInt(GENERATOR_WRAP_KEY, wrapMultiplier);
|
job.getConfiguration().setInt(GENERATOR_WRAP_KEY, wrapMultiplier);
|
||||||
}
|
}
|
||||||
|
if (numWalkers != null) {
|
||||||
|
job.getConfiguration().setInt(CONCURRENT_WALKER_KEY, numWalkers);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void setJobScannerConf(Job job) {
|
public static void setJobScannerConf(Job job) {
|
||||||
|
|
|
@ -480,21 +480,22 @@ public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestB
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void runGenerator(int numMappers, long numNodes, String outputDir, Integer width,
|
protected void runGenerator(int numMappers, long numNodes, String outputDir, Integer width,
|
||||||
Integer wrapMuplitplier) throws Exception {
|
Integer wrapMultiplier, Integer numWalkers) throws Exception {
|
||||||
Path outputPath = new Path(outputDir);
|
Path outputPath = new Path(outputDir);
|
||||||
UUID uuid = UUID.randomUUID(); // create a random UUID.
|
UUID uuid = UUID.randomUUID(); // create a random UUID.
|
||||||
Path generatorOutput = new Path(outputPath, uuid.toString());
|
Path generatorOutput = new Path(outputPath, uuid.toString());
|
||||||
|
|
||||||
Generator generator = new VisibilityGenerator();
|
Generator generator = new VisibilityGenerator();
|
||||||
generator.setConf(getConf());
|
generator.setConf(getConf());
|
||||||
int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMuplitplier);
|
int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier,
|
||||||
|
numWalkers);
|
||||||
if (retCode > 0) {
|
if (retCode > 0) {
|
||||||
throw new RuntimeException("Generator failed with return code: " + retCode);
|
throw new RuntimeException("Generator failed with return code: " + retCode);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void runDelete(int numMappers, long numNodes, String outputDir, Integer width,
|
protected void runDelete(int numMappers, long numNodes, String outputDir, Integer width,
|
||||||
Integer wrapMuplitplier, int tableIndex) throws Exception {
|
Integer wrapMultiplier, int tableIndex) throws Exception {
|
||||||
LOG.info("Running copier on table "+IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex));
|
LOG.info("Running copier on table "+IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex));
|
||||||
Copier copier = new Copier(
|
Copier copier = new Copier(
|
||||||
IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex), tableIndex, true);
|
IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex), tableIndex, true);
|
||||||
|
@ -595,8 +596,7 @@ public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestB
|
||||||
String outputDir = args[3];
|
String outputDir = args[3];
|
||||||
int numReducers = Integer.parseInt(args[4]);
|
int numReducers = Integer.parseInt(args[4]);
|
||||||
Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
|
Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
|
||||||
Integer wrapMuplitplier = (args.length < 7) ? null : Integer.parseInt(args[6]);
|
Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]);
|
||||||
|
|
||||||
long expectedNumNodes = 0;
|
long expectedNumNodes = 0;
|
||||||
|
|
||||||
if (numIterations < 0) {
|
if (numIterations < 0) {
|
||||||
|
@ -606,7 +606,8 @@ public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestB
|
||||||
for (int i = 0; i < numIterations; i++) {
|
for (int i = 0; i < numIterations; i++) {
|
||||||
LOG.info("Starting iteration = " + i);
|
LOG.info("Starting iteration = " + i);
|
||||||
LOG.info("Generating data");
|
LOG.info("Generating data");
|
||||||
runGenerator(numMappers, numNodes, outputDir, width, wrapMuplitplier);
|
// By default run no concurrent walkers for test with visibility
|
||||||
|
runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, 0);
|
||||||
expectedNumNodes += numMappers * numNodes;
|
expectedNumNodes += numMappers * numNodes;
|
||||||
// Copying wont work because expressions are not returned back to the
|
// Copying wont work because expressions are not returned back to the
|
||||||
// client
|
// client
|
||||||
|
@ -619,7 +620,7 @@ public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestB
|
||||||
sleep(SLEEP_IN_MS);
|
sleep(SLEEP_IN_MS);
|
||||||
for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) {
|
for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) {
|
||||||
LOG.info("Deleting data on table with index: "+j);
|
LOG.info("Deleting data on table with index: "+j);
|
||||||
runDelete(numMappers, numNodes, outputDir, width, wrapMuplitplier, j);
|
runDelete(numMappers, numNodes, outputDir, width, wrapMultiplier, j);
|
||||||
sleep(SLEEP_IN_MS);
|
sleep(SLEEP_IN_MS);
|
||||||
LOG.info("Verifying common table after deleting");
|
LOG.info("Verifying common table after deleting");
|
||||||
runVerify(outputDir, numReducers, expectedNumNodes, j);
|
runVerify(outputDir, numReducers, expectedNumNodes, j);
|
||||||
|
|
|
@ -267,7 +267,8 @@ public class IntegrationTestReplication extends IntegrationTestBigLinkedList {
|
||||||
Generator generator = new Generator();
|
Generator generator = new Generator();
|
||||||
generator.setConf(source.getConfiguration());
|
generator.setConf(source.getConfiguration());
|
||||||
|
|
||||||
int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier);
|
// Disable concurrent walkers for IntegrationTestReplication
|
||||||
|
int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier, 0);
|
||||||
if (retCode > 0) {
|
if (retCode > 0) {
|
||||||
throw new RuntimeException("Generator failed with return code: " + retCode);
|
throw new RuntimeException("Generator failed with return code: " + retCode);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue