HBASE-11842 Integration test for async wal replication to secondary regions

Enis Soztutar 2015-02-22 21:29:12 -08:00
parent 7792dee0c3
commit 21b366afe1
14 changed files with 674 additions and 54 deletions

IntegrationTestIngest.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LoadTestTool;
import org.apache.hadoop.hbase.util.Threads;
+ import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Assert;
import org.junit.Test;
@@ -45,8 +46,8 @@ import com.google.common.collect.Sets;
public class IntegrationTestIngest extends IntegrationTestBase {
public static final char HIPHEN = '-';
private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster
- private static final long DEFAULT_RUN_TIME = 20 * 60 * 1000;
+ protected static final long DEFAULT_RUN_TIME = 20 * 60 * 1000;
- private static final long JUNIT_RUN_TIME = 10 * 60 * 1000;
+ protected static final long JUNIT_RUN_TIME = 10 * 60 * 1000;
/** A soft limit on how long we should run */
protected static final String RUN_TIME_KEY = "hbase.%s.runtime";
@@ -66,6 +67,7 @@ public class IntegrationTestIngest extends IntegrationTestBase {
protected LoadTestTool loadTool;
protected String[] LOAD_TEST_TOOL_INIT_ARGS = {
+ LoadTestTool.OPT_COLUMN_FAMILIES,
LoadTestTool.OPT_COMPRESSION,
LoadTestTool.OPT_DATA_BLOCK_ENCODING,
LoadTestTool.OPT_INMEMORY,
@@ -78,7 +80,7 @@ public class IntegrationTestIngest extends IntegrationTestBase {
public void setUpCluster() throws Exception {
util = getTestingUtil(getConf());
LOG.debug("Initializing/checking cluster has " + SERVER_COUNT + " servers");
- util.initializeCluster(SERVER_COUNT);
+ util.initializeCluster(getMinServerCount());
LOG.debug("Done initializing/checking cluster");
cluster = util.getHBaseClusterInterface();
deleteTableIfNecessary();
@@ -89,6 +91,10 @@ public class IntegrationTestIngest extends IntegrationTestBase {
initTable();
}
+ protected int getMinServerCount() {
+ return SERVER_COUNT;
+ }
protected void initTable() throws IOException {
int ret = loadTool.run(getArgsForLoadTestToolInitTable());
Assert.assertEquals("Failed to initialize LoadTestTool", 0, ret);
@@ -125,7 +131,22 @@ public class IntegrationTestIngest extends IntegrationTestBase {
@Override
protected Set<String> getColumnFamilies() {
- return Sets.newHashSet(Bytes.toString(LoadTestTool.COLUMN_FAMILY));
+ Set<String> families = Sets.newHashSet();
+ String clazz = this.getClass().getSimpleName();
+ // parse conf for getting the column family names because LTT is not initialized yet.
+ String familiesString = getConf().get(
+ String.format("%s.%s", clazz, LoadTestTool.OPT_COLUMN_FAMILIES));
+ if (familiesString == null) {
+ for (byte[] family : LoadTestTool.DEFAULT_COLUMN_FAMILIES) {
+ families.add(Bytes.toString(family));
+ }
+ } else {
+ for (String family : familiesString.split(",")) {
+ families.add(family);
+ }
+ }
+ return families;
}
private void deleteTableIfNecessary() throws IOException {
@@ -206,6 +227,8 @@ public class IntegrationTestIngest extends IntegrationTestBase {
List<String> args = new ArrayList<String>();
args.add("-tn");
args.add(getTablename().getNameAsString());
+ args.add("-families");
+ args.add(getColumnFamiliesAsString());
args.add(mode);
args.add(modeSpecificArg);
args.add("-start_key");
@@ -217,6 +240,10 @@ public class IntegrationTestIngest extends IntegrationTestBase {
return args.toArray(new String[args.size()]);
}
+ private String getColumnFamiliesAsString() {
+ return StringUtils.join(",", getColumnFamilies());
+ }
/** Estimates a data size based on the cluster size */
protected long getNumKeys(long keysPerServer)
throws IOException {
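The new getColumnFamilies() above reads the family list from the configuration because LoadTestTool is not initialized at that point. For illustration only (not part of the commit), a minimal sketch of how a caller could override the families, assuming the key is built from the test's simple class name plus LoadTestTool.OPT_COLUMN_FAMILIES ("families"); the family names here are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class FamiliesOverrideSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Same key shape as getConf().get(String.format("%s.%s", clazz, OPT_COLUMN_FAMILIES))
    conf.set("IntegrationTestIngest.families", "cf1,cf2"); // hypothetical family names
    // With this set, getColumnFamilies() returns {"cf1", "cf2"};
    // without it, it falls back to LoadTestTool.DEFAULT_COLUMN_FAMILIES ("test_cf").
  }
}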

IntegrationTestIngestStripeCompactions.java

@@ -20,11 +20,13 @@ package org.apache.hadoop.hbase;
import java.io.IOException;
+ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.LoadTestTool;
+ import org.apache.hadoop.util.ToolRunner;
import org.junit.experimental.categories.Category;
/**
@@ -39,7 +41,14 @@ public class IntegrationTestIngestStripeCompactions extends IntegrationTestIngest {
HTableDescriptor htd = new HTableDescriptor(getTablename());
htd.setConfiguration(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName());
htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "100");
- HColumnDescriptor hcd = new HColumnDescriptor(LoadTestTool.COLUMN_FAMILY);
+ HColumnDescriptor hcd = new HColumnDescriptor(LoadTestTool.DEFAULT_COLUMN_FAMILY);
HBaseTestingUtility.createPreSplitLoadTestTable(util.getConfiguration(), htd, hcd);
}
+ public static void main(String[] args) throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ IntegrationTestingUtility.setUseDistributedCluster(conf);
+ int ret = ToolRunner.run(conf, new IntegrationTestIngestStripeCompactions(), args);
+ System.exit(ret);
+ }
}

IntegrationTestRegionReplicaReplication.java (new file)

@@ -0,0 +1,231 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.ConstantDelayQueue;
import org.apache.hadoop.hbase.util.LoadTestTool;
import org.apache.hadoop.hbase.util.MultiThreadedUpdater;
import org.apache.hadoop.hbase.util.MultiThreadedWriter;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.collect.Lists;
/**
* Integration test for async WAL replication to secondary region replicas. Sets up a table
* with the given region replication (default 2), and uses LoadTestTool client writer, updater
* and reader threads for the writes, updates, reads and verification. It uses a delay queue
* with a given delay ("read_delay_ms", default 5000ms) between the writer/updater and reader
* threads, so that a reader only starts reading a row after the configured delay (5 secs by
* default) has passed since it was written or updated. The reader threads perform their reads
* against the given region replica id (default 1). Async WAL replication has to ship the edits
* to the given region replica within read_delay_ms, otherwise the read-and-verify step fails.
*
* The job runs for <b>at least</b> the given runtime (default 10min) by running a concurrent
* writer and reader workload followed by a concurrent updater and reader workload for
* num_keys_per_server.
*<p>
* Example usage:
* <pre>
* hbase org.apache.hadoop.hbase.IntegrationTestRegionReplicaReplication
* -DIntegrationTestRegionReplicaReplication.num_keys_per_server=10000
* -Dhbase.IntegrationTestRegionReplicaReplication.runtime=600000
* -DIntegrationTestRegionReplicaReplication.read_delay_ms=5000
* -DIntegrationTestRegionReplicaReplication.region_replication=3
* -DIntegrationTestRegionReplicaReplication.region_replica_id=2
* -DIntegrationTestRegionReplicaReplication.num_read_threads=100
* -DIntegrationTestRegionReplicaReplication.num_write_threads=100
* </pre>
*/
@Category(IntegrationTests.class)
public class IntegrationTestRegionReplicaReplication extends IntegrationTestIngest {
private static final String TEST_NAME
= IntegrationTestRegionReplicaReplication.class.getSimpleName();
private static final String OPT_READ_DELAY_MS = "read_delay_ms";
private static final int DEFAULT_REGION_REPLICATION = 2;
private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster
private static final String[] DEFAULT_COLUMN_FAMILIES = new String[] {"f1", "f2", "f3"};
@Override
protected int getMinServerCount() {
return SERVER_COUNT;
}
@Override
public void setConf(Configuration conf) {
conf.setIfUnset(
String.format("%s.%s", TEST_NAME, LoadTestTool.OPT_REGION_REPLICATION),
String.valueOf(DEFAULT_REGION_REPLICATION));
conf.setIfUnset(
String.format("%s.%s", TEST_NAME, LoadTestTool.OPT_COLUMN_FAMILIES),
StringUtils.join(",", DEFAULT_COLUMN_FAMILIES));
conf.setBoolean("hbase.table.sanity.checks", true);
// enable async wal replication to region replicas for unit tests
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024L * 1024 * 4); // flush every 4 MB
conf.setInt("hbase.hstore.blockingStoreFiles", 100);
super.setConf(conf);
}
@Override
@Test
public void testIngest() throws Exception {
runIngestTest(JUNIT_RUN_TIME, 25000, 10, 1024, 10, 20);
}
@Override
protected void startMonkey() throws Exception {
// TODO: disabled for now
}
/**
* This extends MultiThreadedWriter to add a configurable delay before keys written by the
* writer threads become available to the MultiThreadedReader threads. We add this delay
* because of the async nature of the WAL replication to region replicas.
*/
public static class DelayingMultiThreadedWriter extends MultiThreadedWriter {
private long delayMs;
public DelayingMultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf,
TableName tableName) throws IOException {
super(dataGen, conf, tableName);
}
@Override
protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) {
this.delayMs = conf.getLong(String.format("%s.%s",
IntegrationTestRegionReplicaReplication.class.getSimpleName(), OPT_READ_DELAY_MS), 5000);
return new ConstantDelayQueue<Long>(TimeUnit.MILLISECONDS, delayMs);
}
}
/**
* This extends MultiThreadedUpdater to add a configurable delay before keys updated by the
* updater threads become available to the MultiThreadedReader threads. We add this delay
* because of the async nature of the WAL replication to region replicas.
*/
public static class DelayingMultiThreadedUpdater extends MultiThreadedUpdater {
private long delayMs;
public DelayingMultiThreadedUpdater(LoadTestDataGenerator dataGen, Configuration conf,
TableName tableName, double updatePercent) throws IOException {
super(dataGen, conf, tableName, updatePercent);
}
@Override
protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) {
this.delayMs = conf.getLong(String.format("%s.%s",
IntegrationTestRegionReplicaReplication.class.getSimpleName(), OPT_READ_DELAY_MS), 5000);
return new ConstantDelayQueue<Long>(TimeUnit.MILLISECONDS, delayMs);
}
}
@Override
protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey,
int recordSize, int writeThreads, int readThreads) throws Exception {
LOG.info("Running ingest");
LOG.info("Cluster size:" + util.getHBaseClusterInterface().getClusterStatus().getServersSize());
// sleep for some time so that the cache for disabled tables does not interfere.
Threads.sleep(
getConf().getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs",
5000) + 1000);
long start = System.currentTimeMillis();
String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime);
long startKey = 0;
long numKeys = getNumKeys(keysPerServerPerIter);
while (System.currentTimeMillis() - start < 0.9 * runtime) {
LOG.info("Intended run time: " + (runtime/60000) + " min, left:" +
((runtime - (System.currentTimeMillis() - start))/60000) + " min");
int verifyPercent = 100;
int updatePercent = 20;
int ret = -1;
int regionReplicaId = conf.getInt(String.format("%s.%s"
, TEST_NAME, LoadTestTool.OPT_REGION_REPLICA_ID), 1);
// we will run writers and readers at the same time.
List<String> args = Lists.newArrayList(getArgsForLoadTestTool("", "", startKey, numKeys));
args.add("-write");
args.add(String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads));
args.add("-" + LoadTestTool.OPT_MULTIPUT);
args.add("-writer");
args.add(DelayingMultiThreadedWriter.class.getName()); // inject writer class
args.add("-read");
args.add(String.format("%d:%d", verifyPercent, readThreads));
args.add("-" + LoadTestTool.OPT_REGION_REPLICA_ID);
args.add(String.valueOf(regionReplicaId));
ret = loadTool.run(args.toArray(new String[args.size()]));
if (0 != ret) {
String errorMsg = "Load failed with error code " + ret;
LOG.error(errorMsg);
Assert.fail(errorMsg);
}
args = Lists.newArrayList(getArgsForLoadTestTool("", "", startKey, numKeys));
args.add("-update");
args.add(String.format("%s:%s:1", updatePercent, writeThreads));
args.add("-updater");
args.add(DelayingMultiThreadedUpdater.class.getName()); // inject updater class
args.add("-read");
args.add(String.format("%d:%d", verifyPercent, readThreads));
args.add("-" + LoadTestTool.OPT_REGION_REPLICA_ID);
args.add(String.valueOf(regionReplicaId));
ret = loadTool.run(args.toArray(new String[args.size()]));
if (0 != ret) {
String errorMsg = "Load failed with error code " + ret;
LOG.error(errorMsg);
Assert.fail(errorMsg);
}
startKey += numKeys;
}
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
IntegrationTestingUtility.setUseDistributedCluster(conf);
int ret = ToolRunner.run(conf, new IntegrationTestRegionReplicaReplication(), args);
System.exit(ret);
}
}
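The delay applied by the Delaying* classes above is read from a test-scoped configuration key. A small sketch of the equivalent lookup and queue construction (illustrative only; the key name follows the String.format("%s.%s", ...) pattern used in createWriteKeysQueue, and 5000 ms is the documented default):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.ConstantDelayQueue;

public class ReadDelaySketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // <test simple name>.read_delay_ms, defaulting to 5000 ms
    long delayMs = conf.getLong("IntegrationTestRegionReplicaReplication.read_delay_ms", 5000);
    BlockingQueue<Long> writeKeys = new ConstantDelayQueue<Long>(TimeUnit.MILLISECONDS, delayMs);
    System.out.println("written keys become visible to readers after " + delayMs + " ms");
  }
}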

MemStoreFlusher.java

@@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
+ import static org.apache.hadoop.util.StringUtils.humanReadableInt;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.management.ManagementFactory;
@@ -178,7 +179,11 @@ class MemStoreFlusher implements FlushRequester {
Preconditions.checkState(regionToFlush.memstoreSize.get() > 0);
- LOG.info("Flush of region " + regionToFlush + " due to global heap pressure");
+ LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. "
+ + "Total Memstore size="
+ + humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize())
+ + ", Region memstore size="
+ + humanReadableInt(regionToFlush.memstoreSize.get()));
flushedOne = flushRegion(regionToFlush, true, true);
if (!flushedOne) {
LOG.info("Excluding unflushable region " + regionToFlush +
@@ -292,6 +297,7 @@ class MemStoreFlusher implements FlushRequester {
getGlobalMemstoreSize() >= globalMemStoreLimitLowMark;
}
+ @Override
public void requestFlush(HRegion r, boolean forceFlushAllStores) {
synchronized (regionsInQueue) {
if (!regionsInQueue.containsKey(r)) {
@@ -304,6 +310,7 @@ class MemStoreFlusher implements FlushRequester {
}
}
+ @Override
public void requestDelayedFlush(HRegion r, long delay, boolean forceFlushAllStores) {
synchronized (regionsInQueue) {
if (!regionsInQueue.containsKey(r)) {
@@ -591,6 +598,7 @@ class MemStoreFlusher implements FlushRequester {
* Register a MemstoreFlushListener
* @param listener
*/
+ @Override
public void registerFlushRequestListener(final FlushRequestListener listener) {
this.flushRequestListeners.add(listener);
}
@@ -600,6 +608,7 @@ class MemStoreFlusher implements FlushRequester {
* @param listener
* @return true when passed listener is unregistered successfully.
*/
+ @Override
public boolean unregisterFlushRequestListener(final FlushRequestListener listener) {
return this.flushRequestListeners.remove(listener);
}
@@ -608,6 +617,7 @@ class MemStoreFlusher implements FlushRequester {
* Sets the global memstore limit to a new size.
* @param globalMemStoreSize
*/
+ @Override
public void setGlobalMemstoreLimit(long globalMemStoreSize) {
this.globalMemStoreLimit = globalMemStoreSize;
this.globalMemStoreLimitLowMark =

MetricsSource.java

@@ -107,7 +107,7 @@ public class MetricsSource {
*
* @param delta the number filtered.
*/
- private void incrLogEditsFiltered(long delta) {
+ public void incrLogEditsFiltered(long delta) {
singleSourceSource.incrLogEditsFiltered(delta);
globalSourceSource.incrLogEditsFiltered(delta);
}

RegionReplicaReplicationEndpoint.java

@@ -232,6 +232,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
entryBuffers.appendEntry(entry);
}
outputSink.flush(); // make sure everything is flushed
+ ctx.getMetrics().incrLogEditsFiltered(
+ outputSink.getSkippedEditsCounter().getAndSet(0));
return true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -341,24 +343,58 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
List<Entry> entries) throws IOException {
if (disabledAndDroppedTables.getIfPresent(tableName) != null) {
- sink.getSkippedEditsCounter().incrementAndGet();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Skipping " + entries.size() + " entries because table " + tableName
+ + " is cached as a disabled or dropped table");
+ }
+ sink.getSkippedEditsCounter().addAndGet(entries.size());
return;
}
- // get the replicas of the primary region
+ // If the table is disabled or dropped, we should not replay the entries, and we can skip
+ // replaying them. However, we might not know whether the table is disabled until we
+ // invalidate the cache and check from meta
RegionLocations locations = null;
- try {
- locations = getRegionLocations(connection, tableName, row, true, 0);
+ boolean useCache = true;
+ while (true) {
+ // get the replicas of the primary region
+ try {
+ locations = getRegionLocations(connection, tableName, row, useCache, 0);
if (locations == null) {
throw new HBaseIOException("Cannot locate locations for "
+ tableName + ", row:" + Bytes.toStringBinary(row));
}
+ } catch (TableNotFoundException e) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Skipping " + entries.size() + " entries because table " + tableName
+ + " is dropped. Adding table to cache.");
+ }
+ disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache. Value ignored
+ // skip this entry
+ sink.getSkippedEditsCounter().addAndGet(entries.size());
+ return;
+ }
- } catch (TableNotFoundException e) {
- disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache. Value ignored
- // skip this entry
- sink.getSkippedEditsCounter().addAndGet(entries.size());
- return;
+ // check whether we should still replay this entry. If the regions are changed, or the
+ // entry is not coming from the primary region, filter it out.
+ HRegionLocation primaryLocation = locations.getDefaultRegionLocation();
+ if (!Bytes.equals(primaryLocation.getRegionInfo().getEncodedNameAsBytes(),
+ encodedRegionName)) {
+ if (useCache) {
+ useCache = false;
+ continue; // this will retry location lookup
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
+ + " because located region " + primaryLocation.getRegionInfo().getEncodedName()
+ + " is different than the original region " + Bytes.toStringBinary(encodedRegionName)
+ + " from WALEdit");
+ }
+ sink.getSkippedEditsCounter().addAndGet(entries.size());
+ return;
+ }
+ break;
}
if (locations.size() == 1) {
@@ -366,17 +402,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
}
ArrayList<Future<ReplicateWALEntryResponse>> tasks
- = new ArrayList<Future<ReplicateWALEntryResponse>>(2);
+ = new ArrayList<Future<ReplicateWALEntryResponse>>(locations.size() - 1);
- // check whether we should still replay this entry. If the regions are changed, or the
- // entry is not coming form the primary region, filter it out.
- HRegionLocation primaryLocation = locations.getDefaultRegionLocation();
- if (!Bytes.equals(primaryLocation.getRegionInfo().getEncodedNameAsBytes(),
- encodedRegionName)) {
- sink.getSkippedEditsCounter().addAndGet(entries.size());
- return;
- }
// All passed entries should belong to one region because it is coming from the EntryBuffers
// split per region. But the regions might split and merge (unlike log recovery case).
@@ -413,6 +439,10 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
// check whether the table is dropped or disabled which might cause
// SocketTimeoutException, or RetriesExhaustedException or similar if we get IOE.
if (cause instanceof TableNotFoundException || connection.isTableDisabled(tableName)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
+ + " because received exception for dropped or disabled table", cause);
+ }
disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later.
if (!tasksCancelled) {
sink.getSkippedEditsCounter().addAndGet(entries.size());
@@ -490,6 +520,12 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(),
initialEncodedRegionName)) {
skip = true;
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
+ + " because located region " + location.getRegionInfo().getEncodedName()
+ + " is different than the original region "
+ + Bytes.toStringBinary(initialEncodedRegionName) + " from WALEdit");
+ }
return null;
}
@@ -504,7 +540,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
private ReplicateWALEntryResponse replayToServer(List<Entry> entries, int timeout)
throws IOException {
if (entries.isEmpty() || skip) {
- skippedEntries.incrementAndGet();
+ skippedEntries.addAndGet(entries.size());
return ReplicateWALEntryResponse.newBuilder().build();
}
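The while (true) / useCache loop above is a "retry once without the cache" pattern: trust the cached region locations first, and only if the located primary does not match the WAL entry's encoded region name, redo the lookup with the cache bypassed before deciding to skip the entries. A stripped-down sketch of that control flow (helper names here are hypothetical, not the HBase API):

public class RetryWithoutCacheSketch {
  // Hypothetical stand-in for the location lookup plus encoded-region-name comparison.
  static boolean locatedPrimaryMatchesWalEntry(boolean useCache) {
    return !useCache; // pretend only a fresh lookup matches
  }

  static boolean shouldReplay() {
    boolean useCache = true;
    while (true) {
      if (locatedPrimaryMatchesWalEntry(useCache)) {
        return true;            // safe to replay the edits
      }
      if (useCache) {
        useCache = false;       // retry once, bypassing the cache
        continue;
      }
      return false;             // region really changed; skip (and count) the edits
    }
  }

  public static void main(String[] args) {
    System.out.println("replay? " + shouldReplay());
  }
}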

ReplicationSource.java

@@ -691,8 +691,10 @@ public class ReplicationSource extends Thread
}
replicateContext.setEntries(entries).setSize(currentSize);
+ long startTimeNs = System.nanoTime();
// send the edits to the endpoint. Will block until the edits are shipped and acknowledged
boolean replicated = replicationEndpoint.replicate(replicateContext);
+ long endTimeNs = System.nanoTime();
if (!replicated) {
continue;
@@ -713,7 +715,8 @@ public class ReplicationSource extends Thread
this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
if (LOG.isTraceEnabled()) {
LOG.trace("Replicated " + this.totalReplicatedEdits + " entries in total, or "
- + this.totalReplicatedOperations + " operations");
+ + this.totalReplicatedOperations + " operations in " +
+ ((endTimeNs - startTimeNs)/1000000) + " ms");
}
break;
} catch (Exception ex) {

CompressionTest.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
+ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.io.compress.Compression;
@@ -87,7 +88,7 @@ public class CompressionTest {
return ; // already passed test, dont do it again.
} else {
// failed.
- throw new IOException("Compression algorithm '" + algo.getName() + "'" +
+ throw new DoNotRetryIOException("Compression algorithm '" + algo.getName() + "'" +
" previously failed test.");
}
}
@@ -98,7 +99,7 @@ public class CompressionTest {
compressionTestResults[algo.ordinal()] = true; // passes
} catch (Throwable t) {
compressionTestResults[algo.ordinal()] = false; // failure
- throw new IOException(t);
+ throw new DoNotRetryIOException(t);
}
}

HBaseTestingUtility.java

@@ -3571,6 +3571,29 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
return createPreSplitLoadTestTable(conf, desc, hcd, numRegionsPerServer);
}
+ /**
+ * Creates a pre-split table for load testing. If the table already exists,
+ * logs a warning and continues.
+ * @return the number of regions the table was split into
+ */
+ public static int createPreSplitLoadTestTable(Configuration conf,
+ TableName tableName, byte[][] columnFamilies, Algorithm compression,
+ DataBlockEncoding dataBlockEncoding, int numRegionsPerServer, int regionReplication,
+ Durability durability)
+ throws IOException {
+ HTableDescriptor desc = new HTableDescriptor(tableName);
+ desc.setDurability(durability);
+ desc.setRegionReplication(regionReplication);
+ HColumnDescriptor[] hcds = new HColumnDescriptor[columnFamilies.length];
+ for (int i = 0; i < columnFamilies.length; i++) {
+ HColumnDescriptor hcd = new HColumnDescriptor(columnFamilies[i]);
+ hcd.setDataBlockEncoding(dataBlockEncoding);
+ hcd.setCompressionType(compression);
+ hcds[i] = hcd;
+ }
+ return createPreSplitLoadTestTable(conf, desc, hcds, numRegionsPerServer);
+ }
/**
* Creates a pre-split table for load testing. If the table already exists,
* logs a warning and continues.
@@ -3588,8 +3611,21 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
*/
public static int createPreSplitLoadTestTable(Configuration conf,
HTableDescriptor desc, HColumnDescriptor hcd, int numRegionsPerServer) throws IOException {
- if (!desc.hasFamily(hcd.getName())) {
- desc.addFamily(hcd);
+ return createPreSplitLoadTestTable(conf, desc, new HColumnDescriptor[] {hcd},
+ numRegionsPerServer);
+ }
+ /**
+ * Creates a pre-split table for load testing. If the table already exists,
+ * logs a warning and continues.
+ * @return the number of regions the table was split into
+ */
+ public static int createPreSplitLoadTestTable(Configuration conf,
+ HTableDescriptor desc, HColumnDescriptor[] hcds, int numRegionsPerServer) throws IOException {
+ for (HColumnDescriptor hcd : hcds) {
+ if (!desc.hasFamily(hcd.getName())) {
+ desc.addFamily(hcd);
+ }
}
int totalNumberOfRegions = 0;
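For illustration (not part of the change), the new multi-family overload above might be called as follows, assuming the usual HBase imports and a Configuration conf in scope; the table name, families and tuning values are placeholders:

byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3") };
int regions = HBaseTestingUtility.createPreSplitLoadTestTable(conf,
    TableName.valueOf("replica_load_test"),   // hypothetical table name
    families,
    Compression.Algorithm.NONE,
    DataBlockEncoding.NONE,
    5,                                        // numRegionsPerServer
    3,                                        // regionReplication
    Durability.ASYNC_WAL);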

TestRegionReplicaReplicationEndpointNoMaster.java

@@ -242,6 +242,7 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
when(context.getConfiguration()).thenReturn(HTU.getConfiguration());
+ when(context.getMetrics()).thenReturn(mock(MetricsSource.class));
replicator.init(context);
replicator.start();

ConstantDelayQueue.java (new file)

@@ -0,0 +1,196 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* A blocking queue implementation that adds a constant delay to inserted elements. Uses a DelayQueue as the backing store.
* @param <E> type of elements
*/
@InterfaceAudience.Private
public class ConstantDelayQueue<E> implements BlockingQueue<E> {
private static final class DelayedElement<T> implements Delayed {
T element;
long end;
public DelayedElement(T element, long delayMs) {
this.element = element;
this.end = EnvironmentEdgeManager.currentTime() + delayMs;
}
@Override
public int compareTo(Delayed o) {
long cmp = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
return cmp == 0 ? 0 : ( cmp < 0 ? -1 : 1);
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(end - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
}
private final long delayMs;
// backing DelayQueue
private DelayQueue<DelayedElement<E>> queue = new DelayQueue<DelayedElement<E>>();
public ConstantDelayQueue(TimeUnit timeUnit, long delay) {
this.delayMs = TimeUnit.MILLISECONDS.convert(delay, timeUnit);
}
@Override
public E remove() {
DelayedElement<E> el = queue.remove();
return el == null ? null : el.element;
}
@Override
public E poll() {
DelayedElement<E> el = queue.poll();
return el == null ? null : el.element;
}
@Override
public E element() {
DelayedElement<E> el = queue.element();
return el == null ? null : el.element;
}
@Override
public E peek() {
DelayedElement<E> el = queue.peek();
return el == null ? null : el.element;
}
@Override
public int size() {
return queue.size();
}
@Override
public boolean isEmpty() {
return queue.isEmpty();
}
@Override
public Iterator<E> iterator() {
throw new UnsupportedOperationException(); // not implemented yet
}
@Override
public Object[] toArray() {
throw new UnsupportedOperationException(); // not implemented yet
}
@Override
public <T> T[] toArray(T[] a) {
throw new UnsupportedOperationException(); // not implemented yet
}
@Override
public boolean containsAll(Collection<?> c) {
throw new UnsupportedOperationException(); // not implemented yet
}
@Override
public boolean addAll(Collection<? extends E> c) {
throw new UnsupportedOperationException(); // not implemented yet
}
@Override
public boolean removeAll(Collection<?> c) {
throw new UnsupportedOperationException(); // not implemented yet
}
@Override
public boolean retainAll(Collection<?> c) {
throw new UnsupportedOperationException(); // not implemented yet
}
@Override
public void clear() {
queue.clear();
}
@Override
public boolean add(E e) {
return queue.add(new DelayedElement<E>(e, delayMs));
}
@Override
public boolean offer(E e) {
return queue.offer(new DelayedElement<E>(e, delayMs));
}
@Override
public void put(E e) throws InterruptedException {
queue.put(new DelayedElement<E>(e, delayMs));
}
@Override
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
return queue.offer(new DelayedElement<E>(e, delayMs), timeout, unit);
}
@Override
public E take() throws InterruptedException {
DelayedElement<E> el = queue.take();
return el == null ? null : el.element;
}
@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
DelayedElement<E> el = queue.poll(timeout, unit);
return el == null ? null : el.element;
}
@Override
public int remainingCapacity() {
return queue.remainingCapacity();
}
@Override
public boolean remove(Object o) {
throw new UnsupportedOperationException(); // not implemented yet
}
@Override
public boolean contains(Object o) {
throw new UnsupportedOperationException(); // not implemented yet
}
@Override
public int drainTo(Collection<? super E> c) {
throw new UnsupportedOperationException(); // not implemented yet
}
@Override
public int drainTo(Collection<? super E> c, int maxElements) {
throw new UnsupportedOperationException(); // not implemented yet
}
}
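A small usage sketch of ConstantDelayQueue (illustrative only; the 5-second delay mirrors the read_delay_ms default used by the integration test above):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.util.ConstantDelayQueue;

public class ConstantDelayQueueSketch {
  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<Long> keys = new ConstantDelayQueue<Long>(TimeUnit.MILLISECONDS, 5000);
    keys.add(42L);              // enqueued immediately...
    Long key = keys.take();     // ...but take() only returns it after roughly 5 seconds
    System.out.println("key " + key + " became visible");
  }
}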

LoadTestTool.java

@@ -74,14 +74,17 @@ public class LoadTestTool extends AbstractHBaseTool {
/** Table name for the test */
private TableName tableName;
+ /** Column families for the test */
+ private byte[][] families;
/** Table name to use of not overridden on the command line */
protected static final String DEFAULT_TABLE_NAME = "cluster_test";
/** Column family used by the test */
- public static byte[] COLUMN_FAMILY = Bytes.toBytes("test_cf");
+ public static byte[] DEFAULT_COLUMN_FAMILY = Bytes.toBytes("test_cf");
/** Column families used by the test */
- protected static final byte[][] COLUMN_FAMILIES = { COLUMN_FAMILY };
+ public static final byte[][] DEFAULT_COLUMN_FAMILIES = { DEFAULT_COLUMN_FAMILY };
/** The default data size if not specified */
protected static final int DEFAULT_DATA_SIZE = 64;
@@ -130,18 +133,25 @@ public class LoadTestTool extends AbstractHBaseTool {
public static final String OPT_GENERATOR_USAGE = "The class which generates load for the tool."
+ " Any args for this class can be passed as colon separated after class name";
+ public static final String OPT_WRITER = "writer";
+ public static final String OPT_WRITER_USAGE = "The class for executing the write requests";
+ public static final String OPT_UPDATER = "updater";
+ public static final String OPT_UPDATER_USAGE = "The class for executing the update requests";
public static final String OPT_READER = "reader";
public static final String OPT_READER_USAGE = "The class for executing the read requests";
protected static final String OPT_KEY_WINDOW = "key_window";
protected static final String OPT_WRITE = "write";
protected static final String OPT_MAX_READ_ERRORS = "max_read_errors";
- protected static final String OPT_MULTIPUT = "multiput";
+ public static final String OPT_MULTIPUT = "multiput";
public static final String OPT_MULTIGET = "multiget_batchsize";
protected static final String OPT_NUM_KEYS = "num_keys";
protected static final String OPT_READ = "read";
protected static final String OPT_START_KEY = "start_key";
public static final String OPT_TABLE_NAME = "tn";
+ public static final String OPT_COLUMN_FAMILIES = "families";
protected static final String OPT_ZK_QUORUM = "zk";
protected static final String OPT_ZK_PARENT_NODE = "zk_root";
protected static final String OPT_SKIP_INIT = "skip_init";
@@ -245,6 +255,10 @@ public class LoadTestTool extends AbstractHBaseTool {
return parseInt(numThreadsStr, 1, Short.MAX_VALUE);
}
+ public byte[][] getColumnFamilies() {
+ return families;
+ }
/**
* Apply column family options such as Bloom filters, compression, and data
* block encoding.
@@ -298,6 +312,7 @@ public class LoadTestTool extends AbstractHBaseTool {
"without port numbers");
addOptWithArg(OPT_ZK_PARENT_NODE, "name of parent znode in zookeeper");
addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write");
+ addOptWithArg(OPT_COLUMN_FAMILIES, "The name of the column families to use separated by comma");
addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD);
addOptWithArg(OPT_READ, OPT_USAGE_READ);
addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE);
@@ -320,6 +335,8 @@ public class LoadTestTool extends AbstractHBaseTool {
"separate updates for every column in a row");
addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY);
addOptWithArg(OPT_GENERATOR, OPT_GENERATOR_USAGE);
+ addOptWithArg(OPT_WRITER, OPT_WRITER_USAGE);
+ addOptWithArg(OPT_UPDATER, OPT_UPDATER_USAGE);
addOptWithArg(OPT_READER, OPT_READER_USAGE);
addOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write");
@@ -352,6 +369,16 @@ public class LoadTestTool extends AbstractHBaseTool {
tableName = TableName.valueOf(cmd.getOptionValue(OPT_TABLE_NAME,
DEFAULT_TABLE_NAME));
+ if (cmd.hasOption(OPT_COLUMN_FAMILIES)) {
+ String[] list = cmd.getOptionValue(OPT_COLUMN_FAMILIES).split(",");
+ families = new byte[list.length][];
+ for (int i = 0; i < list.length; i++) {
+ families[i] = Bytes.toBytes(list[i]);
+ }
+ } else {
+ families = DEFAULT_COLUMN_FAMILIES;
+ }
isWrite = cmd.hasOption(OPT_WRITE);
isRead = cmd.hasOption(OPT_READ);
isUpdate = cmd.hasOption(OPT_UPDATE);
@@ -503,9 +530,9 @@ public class LoadTestTool extends AbstractHBaseTool {
}
HBaseTestingUtility.createPreSplitLoadTestTable(conf, tableName,
- COLUMN_FAMILY, compressAlgo, dataBlockEncodingAlgo, numRegionsPerServer,
+ getColumnFamilies(), compressAlgo, dataBlockEncodingAlgo, numRegionsPerServer,
regionReplication, durability);
- applyColumnFamilyOptions(tableName, COLUMN_FAMILIES);
+ applyColumnFamilyOptions(tableName, getColumnFamilies());
}
@Override
@@ -570,7 +597,7 @@ public class LoadTestTool extends AbstractHBaseTool {
} else {
// Default DataGenerator is MultiThreadedAction.DefaultDataGenerator
dataGen = new MultiThreadedAction.DefaultDataGenerator(minColDataSize, maxColDataSize,
- minColsPerKey, maxColsPerKey, COLUMN_FAMILY);
+ minColsPerKey, maxColsPerKey, families);
}
if (userOwner != null) {
@@ -603,7 +630,14 @@ public class LoadTestTool extends AbstractHBaseTool {
if (userOwner != null) {
writerThreads = new MultiThreadedWriterWithACL(dataGen, conf, tableName, userOwner);
} else {
- writerThreads = new MultiThreadedWriter(dataGen, conf, tableName);
+ String writerClass = null;
+ if (cmd.hasOption(OPT_WRITER)) {
+ writerClass = cmd.getOptionValue(OPT_WRITER);
+ } else {
+ writerClass = MultiThreadedWriter.class.getCanonicalName();
+ }
+ writerThreads = getMultiThreadedWriterInstance(writerClass, dataGen);
}
writerThreads.setMultiPut(isMultiPut);
}
@@ -613,7 +647,13 @@ public class LoadTestTool extends AbstractHBaseTool {
updaterThreads = new MultiThreadedUpdaterWithACL(dataGen, conf, tableName, updatePercent,
userOwner, userNames);
} else {
- updaterThreads = new MultiThreadedUpdater(dataGen, conf, tableName, updatePercent);
+ String updaterClass = null;
+ if (cmd.hasOption(OPT_UPDATER)) {
+ updaterClass = cmd.getOptionValue(OPT_UPDATER);
+ } else {
+ updaterClass = MultiThreadedUpdater.class.getCanonicalName();
+ }
+ updaterThreads = getMultiThreadedUpdaterInstance(updaterClass, dataGen);
}
updaterThreads.setBatchUpdate(isBatchUpdate);
updaterThreads.setIgnoreNonceConflicts(ignoreConflicts);
@@ -700,7 +740,32 @@ public class LoadTestTool extends AbstractHBaseTool {
Constructor<?> constructor = clazz.getConstructor(int.class, int.class, int.class, int.class,
byte[][].class);
return (LoadTestDataGenerator) constructor.newInstance(minColDataSize, maxColDataSize,
- minColsPerKey, maxColsPerKey, COLUMN_FAMILIES);
+ minColsPerKey, maxColsPerKey, families);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ private MultiThreadedWriter getMultiThreadedWriterInstance(String clazzName
+ , LoadTestDataGenerator dataGen) throws IOException {
+ try {
+ Class<?> clazz = Class.forName(clazzName);
+ Constructor<?> constructor = clazz.getConstructor(
+ LoadTestDataGenerator.class, Configuration.class, TableName.class);
+ return (MultiThreadedWriter) constructor.newInstance(dataGen, conf, tableName);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ private MultiThreadedUpdater getMultiThreadedUpdaterInstance(String clazzName
+ , LoadTestDataGenerator dataGen) throws IOException {
+ try {
+ Class<?> clazz = Class.forName(clazzName);
+ Constructor<?> constructor = clazz.getConstructor(
+ LoadTestDataGenerator.class, Configuration.class, TableName.class, double.class);
+ return (MultiThreadedUpdater) constructor.newInstance(
+ dataGen, conf, tableName, updatePercent);
} catch (Exception e) {
throw new IOException(e);
}
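Putting the new LoadTestTool options together, an invocation that exercises -families, -multiput and the pluggable -writer might look like the sketch below (values are placeholders; the writer class is the DelayingMultiThreadedWriter added by this commit, and it must be on the classpath):

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.LoadTestTool;
import org.apache.hadoop.util.ToolRunner;

public class LoadTestToolSketch {
  public static void main(String[] args) throws Exception {
    String[] ltArgs = new String[] {
        "-tn", "cluster_test",
        "-families", "f1,f2",            // new: explicit column families
        "-write", "10:1024:10",          // colsPerKey:dataSize:writerThreads
        "-multiput",
        "-writer",                       // new: pluggable writer implementation
        "org.apache.hadoop.hbase.IntegrationTestRegionReplicaReplication$DelayingMultiThreadedWriter",
        "-num_keys", "100000",
        "-start_key", "0"
    };
    int ret = ToolRunner.run(HBaseConfiguration.create(), new LoadTestTool(), ltArgs);
    System.exit(ret);
  }
}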

MultiThreadedWriterBase.java

@@ -46,7 +46,7 @@ public abstract class MultiThreadedWriterBase extends MultiThreadedAction {
* {@link #wroteUpToKey}, the maximum key in the contiguous range of keys
* being inserted/updated. This queue is supposed to stay small.
*/
- protected BlockingQueue<Long> wroteKeys = new ArrayBlockingQueue<Long>(10000);
+ protected BlockingQueue<Long> wroteKeys;
/**
* This is the current key to be inserted/updated by any thread. Each thread does an
@@ -75,6 +75,11 @@ public abstract class MultiThreadedWriterBase extends MultiThreadedAction {
public MultiThreadedWriterBase(LoadTestDataGenerator dataGen, Configuration conf,
TableName tableName, String actionLetter) throws IOException {
super(dataGen, conf, tableName, actionLetter);
+ this.wroteKeys = createWriteKeysQueue(conf);
+ }
+ protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) {
+ return new ArrayBlockingQueue<Long>(10000);
}
@Override
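The new createWriteKeysQueue() hook is what lets the DelayingMultiThreadedWriter/Updater in this commit swap the default ArrayBlockingQueue for a ConstantDelayQueue. As a sketch, any other subclass could plug in its own queue the same way (the class name and configuration key below are hypothetical):

import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.MultiThreadedWriter;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;

public class BoundedKeysWriter extends MultiThreadedWriter {
  public BoundedKeysWriter(LoadTestDataGenerator dataGen, Configuration conf,
      TableName tableName) throws IOException {
    super(dataGen, conf, tableName);
  }

  @Override
  protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) {
    // e.g. a smaller, configurable bound instead of the default 10000
    return new ArrayBlockingQueue<Long>(conf.getInt("bounded.writer.key.queue.size", 1000));
  }
}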

RestartMetaTest.java

@@ -26,7 +26,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
- import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
@@ -81,7 +80,8 @@ public class RestartMetaTest extends AbstractHBaseTool {
// start the writers
LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(
- minColDataSize, maxColDataSize, minColsPerKey, maxColsPerKey, LoadTestTool.COLUMN_FAMILY);
+ minColDataSize, maxColDataSize, minColsPerKey, maxColsPerKey,
+ LoadTestTool.DEFAULT_COLUMN_FAMILY);
MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
writer.setMultiPut(true);
writer.start(startKey, endKey, numThreads);
@@ -101,7 +101,7 @@ public class RestartMetaTest extends AbstractHBaseTool {
// create tables if needed
HBaseTestingUtility.createPreSplitLoadTestTable(conf, TABLE_NAME,
- LoadTestTool.COLUMN_FAMILY, Compression.Algorithm.NONE,
+ LoadTestTool.DEFAULT_COLUMN_FAMILY, Compression.Algorithm.NONE,
DataBlockEncoding.NONE);
LOG.debug("Loading data....\n\n");