HBASE-21256 Improve IntegrationTestBigLinkedList for testing huge data
Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Andrew Purtell <apurtell@apache.org>
parent a074269ee0
commit 2b1716fd8e
org/apache/hadoop/hbase/util/Random64.java (new file)
@@ -0,0 +1,149 @@
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.util;

import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * An instance of this class is used to generate a stream of pseudorandom numbers.
 * The class uses a 64-bit seed, which is modified using a linear congruential formula.
 *
 * See https://en.wikipedia.org/wiki/Linear_congruential_generator
 */
@InterfaceAudience.Private
public class Random64 {

  private static final long multiplier = 6364136223846793005L;
  private static final long addend = 1442695040888963407L;

  private static final AtomicLong seedUniquifier = new AtomicLong(8682522807148012L);

  private long seed;

  /**
   * Copy from {@link Random#seedUniquifier()}
   */
  private static long seedUniquifier() {
    for (;;) {
      long current = seedUniquifier.get();
      long next = current * 181783497276652981L;
      if (seedUniquifier.compareAndSet(current, next)) {
        return next;
      }
    }
  }

  public Random64() {
    this(seedUniquifier() ^ System.nanoTime());
  }

  public Random64(long seed) {
    this.seed = seed;
  }

  public long nextLong() {
    return next64(64);
  }

  public void nextBytes(byte[] bytes) {
    for (int i = 0, len = bytes.length; i < len;) {
      // We treat the seed as an unsigned long, therefore we use '>>>' instead of '>>'.
      for (long rnd = nextLong(), n = Math.min(len - i, Long.SIZE / Byte.SIZE);
           n-- > 0; rnd >>>= Byte.SIZE) {
        bytes[i++] = (byte) rnd;
      }
    }
  }

  private long next64(int bits) {
    seed = seed * multiplier + addend;
    return seed >>> (64 - bits);
  }

  /**
   * Random64 is a pseudorandom (LCG) algorithm, so two instances seeded with the same value
   * produce the same sequence. This main method tests how many nextLong() calls it takes
   * before a previously generated value comes back, i.e. before the sequence repeats.
   *
   * We do not need to save every generated number (that would be far too large). Instead we
   * save one value every 100000 nextLong() calls. If the generator ever returns to a saved
   * value, we will detect it within the following 100000 calls.
   */
  public static void main(String[] args) {
    long defaultTotalTestCnt = 1000000000000L; // 1 trillion

    if (args.length == 1) {
      defaultTotalTestCnt = Long.parseLong(args[0]);
    }

    Preconditions.checkArgument(defaultTotalTestCnt > 0, "totalTestCnt <= 0");

    final int precision = 100000;
    final long totalTestCnt = defaultTotalTestCnt + precision;
    final int reportPeriod = 100 * precision;
    final long startTime = System.currentTimeMillis();

    System.out.println("Do collision test, totalTestCnt=" + totalTestCnt);

    Random64 rand = new Random64();
    Set<Long> longSet = new HashSet<>();

    for (long cnt = 1; cnt <= totalTestCnt; cnt++) {
      final long randLong = rand.nextLong();

      if (longSet.contains(randLong)) {
        System.err.println("Conflict! count=" + cnt);
        System.exit(1);
      }

      if (cnt % precision == 0) {
        if (!longSet.add(randLong)) {
          System.err.println("Conflict! count=" + cnt);
          System.exit(1);
        }

        if (cnt % reportPeriod == 0) {
          long cost = System.currentTimeMillis() - startTime;
          long remainingMs = (long) (1.0 * (totalTestCnt - cnt) * cost / cnt);
          System.out.println(
            String.format(
              "Progress: %.3f%%, remaining %d minutes",
              100.0 * cnt / totalTestCnt, remainingMs / 60000
            )
          );
        }
      }
    }

    System.out.println("No collision!");
  }

}
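Why a dedicated 64-bit generator matters at this scale (a reading of HBASE-21256, not text from the commit): java.util.Random keeps only 48 bits of state, and its own documentation notes that nextLong() cannot return every possible long, while any source of independent uniform 64-bit keys makes duplicates likely by the birthday bound once the total row count reaches the billions. Random64's multiplier and addend satisfy the Hull-Dobell full-period conditions for modulus 2^64 (odd addend, multiplier congruent to 1 mod 4), so a single instance does not repeat a value within 2^64 calls. A small standalone sketch of the birthday estimate:

// Standalone sketch (not part of the patch): the expected number of duplicate key pairs when n
// row keys are drawn as independent uniform 64-bit values is roughly n^2 / (2 * 2^64). A
// full-period 64-bit LCG such as Random64 yields zero duplicates from one instance for n <= 2^64.
public class KeyCollisionEstimate {
  public static void main(String[] args) {
    final double keySpace = Math.pow(2, 64);
    final long[] rowCounts = {25_000_000L, 1_000_000_000L, 1_000_000_000_000L};
    for (long n : rowCounts) {
      double expectedDuplicatePairs = (double) n * n / (2 * keySpace);
      System.out.printf("%,d keys -> ~%.6f expected duplicate pairs%n", n, expectedDuplicatePairs);
    }
  }
}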
org/apache/hadoop/hbase/chaos/actions/Action.java
@@ -21,9 +21,11 @@ package org.apache.hadoop.hbase.chaos.actions;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
@@ -139,16 +141,13 @@ public class Action {
       return new ServerName [] {};
     }
     ServerName master = clusterStatus.getMasterName();
-    if (master == null || !regionServers.contains(master)) {
-      return regionServers.toArray(new ServerName[count]);
-    }
-    if (count == 1) {
-      return new ServerName [] {};
-    }
+    Set<ServerName> masters = new HashSet<ServerName>();
+    masters.add(master);
+    masters.addAll(clusterStatus.getBackupMasterNames());
     ArrayList<ServerName> tmp = new ArrayList<>(count);
     tmp.addAll(regionServers);
-    tmp.remove(master);
-    return tmp.toArray(new ServerName[count-1]);
+    tmp.removeAll(masters);
+    return tmp.toArray(new ServerName[tmp.size()]);
   }
 
   protected void killMaster(ServerName server) throws IOException {
org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.test;
 
 import java.io.DataInput;
@@ -58,6 +57,7 @@ import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.BufferedMutator;
 import org.apache.hadoop.hbase.client.BufferedMutatorParams;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionConfiguration;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -79,6 +79,7 @@ import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
 import org.apache.hadoop.hbase.util.AbstractHBaseTool;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Random64;
 import org.apache.hadoop.hbase.util.RegionSplitter;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKey;
@@ -113,6 +114,8 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
@@ -267,6 +270,15 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
   public static final String MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY =
       "generator.multiple.columnfamilies";
 
+  /**
+   * Set this configuration if you want to scale up the size of test data quickly.
+   * <p>
+   * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
+   * -Dgenerator.big.family.value.size=1024 generator 1 10 output
+   */
+  public static final String BIG_FAMILY_VALUE_SIZE_KEY = "generator.big.family.value.size";
+
+
   public static enum Counts {
     SUCCESS, TERMINATING, UNDEFINED, IOEXCEPTION
   }
@@ -300,7 +312,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
   static class GeneratorRecordReader extends RecordReader<BytesWritable,NullWritable> {
     private long count;
     private long numNodes;
-    private Random rand;
+    private Random64 rand;
 
     @Override
     public void close() throws IOException {
@@ -327,8 +339,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
     public void initialize(InputSplit arg0, TaskAttemptContext context)
         throws IOException, InterruptedException {
       numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, 25000000);
-      // Use SecureRandom to avoid issue described in HBASE-13382.
-      rand = new SecureRandom();
+      // Use Random64 to avoid issue described in HBASE-21256.
+      rand = new Random64();
     }
 
     @Override
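One caveat worth noting about the swap above (my observation, not something the patch states): Random64 guards its static seedUniquifier with an AtomicLong, but the per-instance seed is a plain long updated without synchronization, so an instance must not be shared across threads. The generator respects this by creating one Random64 per record reader. A minimal usage sketch of the two methods the test relies on (assumes hbase-common on the classpath):

// Minimal usage sketch, one Random64 per thread/reader; the values below are illustrative.
import org.apache.hadoop.hbase.util.Random64;

public class Random64UsageSketch {
  public static void main(String[] args) {
    Random64 rand = new Random64();                  // seeded from seedUniquifier() ^ nanoTime()
    byte[] rowKey = new byte[Long.SIZE / Byte.SIZE]; // an 8-byte key buffer
    rand.nextBytes(rowKey);                          // fill the buffer with pseudorandom bytes
    long nodeId = rand.nextLong();                   // or draw a raw 64-bit value directly
    System.out.println("sample node id: " + nodeId);
  }
}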
@@ -437,6 +449,36 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
       this.numWalkers = context.getConfiguration().getInt(CONCURRENT_WALKER_KEY, CONCURRENT_WALKER_DEFAULT);
       this.walkersStop = false;
       this.conf = context.getConfiguration();
+
+      if (multipleUnevenColumnFamilies) {
+        int n = context.getConfiguration().getInt(BIG_FAMILY_VALUE_SIZE_KEY, 256);
+        int limit = context.getConfiguration().getInt(
+          ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
+          ConnectionConfiguration.MAX_KEYVALUE_SIZE_DEFAULT);
+
+        Preconditions.checkArgument(
+          n <= limit,
+          "%s(%s) > %s(%s)",
+          BIG_FAMILY_VALUE_SIZE_KEY, n, ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, limit);
+
+        bigValue = new byte[n];
+        ThreadLocalRandom.current().nextBytes(bigValue);
+        LOG.info("Create a bigValue with " + n + " bytes.");
+      }
+
+      Preconditions.checkArgument(
+        numNodes > 0,
+        "numNodes(%s) <= 0",
+        numNodes);
+      Preconditions.checkArgument(
+        numNodes % width == 0,
+        "numNodes(%s) mod width(%s) != 0",
+        numNodes, width);
+      Preconditions.checkArgument(
+        numNodes % wrap == 0,
+        "numNodes(%s) mod wrap(%s) != 0",
+        numNodes, wrap
+      );
     }
 
     protected void instantiateHTable() throws IOException {
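The new checks above make the generator's shape requirements explicit: a big-family value larger than the client's maximum keyvalue size is rejected in setup() rather than failing later on every Put, and numNodes must be positive and divisible by both width and wrap so that the generated list closes into complete circles. A plain-Java restatement of the divisibility checks with illustrative values (only the 25000000 rows-per-map default appears in this diff; the width and wrap figures below are assumptions for the example):

// Standalone sketch of the same sanity checks, without the HBase/Guava dependencies.
public class GeneratorArgCheckSketch {
  public static void main(String[] args) {
    long numNodes = 25_000_000L; // matches the GENERATOR_NUM_ROWS_PER_MAP_KEY default above
    long width = 1_000_000L;     // assumed batch width
    long wrap = 25_000_000L;     // assumed circle length in nodes
    if (numNodes <= 0) {
      throw new IllegalArgumentException("numNodes(" + numNodes + ") <= 0");
    }
    if (numNodes % width != 0) {
      throw new IllegalArgumentException("numNodes(" + numNodes + ") mod width(" + width + ") != 0");
    }
    if (numNodes % wrap != 0) {
      throw new IllegalArgumentException("numNodes(" + numNodes + ") mod wrap(" + wrap + ") != 0");
    }
    System.out.println("numNodes/width/wrap combination is valid");
  }
}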
@@ -457,9 +499,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
         current[i] = new byte[key.getLength()];
         System.arraycopy(key.getBytes(), 0, current[i], 0, key.getLength());
         if (++i == current.length) {
-          LOG.info("Persisting current.length=" + current.length + ", count=" + count + ", id=" +
-            Bytes.toStringBinary(id) + ", current=" + Bytes.toStringBinary(current[0]) +
-            ", i=" + i);
+          LOG.debug("Persisting current.length={}, count={}, id={}, current={}, i={}",
+            current.length, count, Bytes.toStringBinary(id), Bytes.toStringBinary(current[0]), i);
           persist(output, count, prev, current, id);
           i = 0;
 
@@ -526,11 +567,6 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
         if (this.multipleUnevenColumnFamilies) {
           // Use any column name.
           put.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME, this.tinyValue);
-          // If we've not allocated bigValue, do it now. Reuse same value each time.
-          if (this.bigValue == null) {
-            this.bigValue = new byte[current[i].length * 10];
-            ThreadLocalRandom.current().nextBytes(this.bigValue);
-          }
           // Use any column name.
           put.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME, this.bigValue);
         }
@@ -759,6 +795,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 
       FileOutputFormat.setOutputPath(job, tmpOutput);
       job.setOutputFormatClass(SequenceFileOutputFormat.class);
+      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Random64.class);
 
       boolean success = jobCompletion(job);
 
@@ -1155,13 +1192,14 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
 
       // TODO check for more than one def, should not happen
       StringBuilder refsSb = null;
-      String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
       if (defCount == 0 || refs.size() != 1) {
+        String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
         refsSb = dumpExtraInfoOnRefs(key, context, refs);
         LOG.error("LinkedListError: key=" + keyString + ", reference(s)=" +
           (refsSb != null? refsSb.toString(): ""));
       }
       if (lostFamilies) {
+        String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
         LOG.error("LinkedListError: key=" + keyString + ", lost big or tiny families");
         context.getCounter(Counts.LOST_FAMILIES).increment(1);
         context.write(key, LOSTFAM);
@@ -1188,6 +1226,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
           // was added which can help a little debugging. This info is only available in mapper
           // output -- the 'Linked List error Key...' log message above. What we emit here is
           // useless for debugging.
+          String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
           context.getCounter("undef", keyString).increment(1);
         }
       } else if (defCount > 0 && refs.isEmpty()) {
@@ -1195,6 +1234,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
         context.write(key, UNREF);
         context.getCounter(Counts.UNREFERENCED).increment(1);
         if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
+          String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
           context.getCounter("unref", keyString).increment(1);
         }
       } else {