diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngest.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngest.java index c0c54b7879b..8495889a3a7 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngest.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngest.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.LoadTestTool; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.ToolRunner; import org.junit.Assert; import org.junit.Test; @@ -45,8 +46,8 @@ import com.google.common.collect.Sets; public class IntegrationTestIngest extends IntegrationTestBase { public static final char HIPHEN = '-'; private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster - private static final long DEFAULT_RUN_TIME = 20 * 60 * 1000; - private static final long JUNIT_RUN_TIME = 10 * 60 * 1000; + protected static final long DEFAULT_RUN_TIME = 20 * 60 * 1000; + protected static final long JUNIT_RUN_TIME = 10 * 60 * 1000; /** A soft limit on how long we should run */ protected static final String RUN_TIME_KEY = "hbase.%s.runtime"; @@ -66,6 +67,7 @@ public class IntegrationTestIngest extends IntegrationTestBase { protected LoadTestTool loadTool; protected String[] LOAD_TEST_TOOL_INIT_ARGS = { + LoadTestTool.OPT_COLUMN_FAMILIES, LoadTestTool.OPT_COMPRESSION, LoadTestTool.OPT_DATA_BLOCK_ENCODING, LoadTestTool.OPT_INMEMORY, @@ -78,7 +80,7 @@ public class IntegrationTestIngest extends IntegrationTestBase { public void setUpCluster() throws Exception { util = getTestingUtil(getConf()); LOG.debug("Initializing/checking cluster has " + SERVER_COUNT + " servers"); - util.initializeCluster(SERVER_COUNT); + util.initializeCluster(getMinServerCount()); LOG.debug("Done initializing/checking cluster"); cluster = util.getHBaseClusterInterface(); deleteTableIfNecessary(); @@ -89,6 +91,10 @@ public class IntegrationTestIngest extends IntegrationTestBase { initTable(); } + protected int getMinServerCount() { + return SERVER_COUNT; + } + protected void initTable() throws IOException { int ret = loadTool.run(getArgsForLoadTestToolInitTable()); Assert.assertEquals("Failed to initialize LoadTestTool", 0, ret); @@ -125,7 +131,22 @@ public class IntegrationTestIngest extends IntegrationTestBase { @Override protected Set getColumnFamilies() { - return Sets.newHashSet(Bytes.toString(LoadTestTool.COLUMN_FAMILY)); + Set families = Sets.newHashSet(); + String clazz = this.getClass().getSimpleName(); + // parse conf for getting the column famly names because LTT is not initialized yet. + String familiesString = getConf().get( + String.format("%s.%s", clazz, LoadTestTool.OPT_COLUMN_FAMILIES)); + if (familiesString == null) { + for (byte[] family : LoadTestTool.DEFAULT_COLUMN_FAMILIES) { + families.add(Bytes.toString(family)); + } + } else { + for (String family : familiesString.split(",")) { + families.add(family); + } + } + + return families; } private void deleteTableIfNecessary() throws IOException { @@ -206,6 +227,8 @@ public class IntegrationTestIngest extends IntegrationTestBase { List args = new ArrayList(); args.add("-tn"); args.add(getTablename().getNameAsString()); + args.add("-families"); + args.add(getColumnFamiliesAsString()); args.add(mode); args.add(modeSpecificArg); args.add("-start_key"); @@ -217,6 +240,10 @@ public class IntegrationTestIngest extends IntegrationTestBase { return args.toArray(new String[args.size()]); } + private String getColumnFamiliesAsString() { + return StringUtils.join(",", getColumnFamilies()); + } + /** Estimates a data size based on the cluster size */ protected long getNumKeys(long keysPerServer) throws IOException { diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestStripeCompactions.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestStripeCompactions.java index ebf159e3594..d64fbb07952 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestStripeCompactions.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestStripeCompactions.java @@ -20,11 +20,13 @@ package org.apache.hadoop.hbase; import java.io.IOException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.StoreEngine; import org.apache.hadoop.hbase.regionserver.StripeStoreEngine; import org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.util.LoadTestTool; +import org.apache.hadoop.util.ToolRunner; import org.junit.experimental.categories.Category; /** @@ -39,7 +41,14 @@ public class IntegrationTestIngestStripeCompactions extends IntegrationTestInges HTableDescriptor htd = new HTableDescriptor(getTablename()); htd.setConfiguration(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName()); htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "100"); - HColumnDescriptor hcd = new HColumnDescriptor(LoadTestTool.COLUMN_FAMILY); + HColumnDescriptor hcd = new HColumnDescriptor(LoadTestTool.DEFAULT_COLUMN_FAMILY); HBaseTestingUtility.createPreSplitLoadTestTable(util.getConfiguration(), htd, hcd); } + + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + IntegrationTestingUtility.setUseDistributedCluster(conf); + int ret = ToolRunner.run(conf, new IntegrationTestIngestStripeCompactions(), args); + System.exit(ret); + } } diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaReplication.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaReplication.java new file mode 100644 index 00000000000..30da5c01600 --- /dev/null +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaReplication.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.testclassification.IntegrationTests; +import org.apache.hadoop.hbase.util.ConstantDelayQueue; +import org.apache.hadoop.hbase.util.LoadTestTool; +import org.apache.hadoop.hbase.util.MultiThreadedUpdater; +import org.apache.hadoop.hbase.util.MultiThreadedWriter; +import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.ToolRunner; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Lists; + +/** + * Integration test for testing async wal replication to secondary region replicas. Sets up a table + * with given region replication (default 2), and uses LoadTestTool client writer, updater and + * reader threads for writes and reads and verification. It uses a delay queue with a given delay + * ("read_delay_ms", default 5000ms) between the writer/updater and reader threads to make the + * written items available to readers. This means that a reader will only start reading from a row + * written by the writer / updater after 5secs has passed. The reader thread performs the reads from + * the given region replica id (default 1) to perform the reads. Async wal replication has to finish + * with the replication of the edits before read_delay_ms to the given region replica id so that + * the read and verify will not fail. + * + * The job will run for at least given runtime (default 10min) by running a concurrent + * writer and reader workload followed by a concurrent updater and reader workload for + * num_keys_per_server. + *

+ * Example usage: + *

+ * hbase org.apache.hadoop.hbase.IntegrationTestRegionReplicaReplication
+ * -DIntegrationTestRegionReplicaReplication.num_keys_per_server=10000
+ * -Dhbase.IntegrationTestRegionReplicaReplication.runtime=600000
+ * -DIntegrationTestRegionReplicaReplication.read_delay_ms=5000
+ * -DIntegrationTestRegionReplicaReplication.region_replication=3
+ * -DIntegrationTestRegionReplicaReplication.region_replica_id=2
+ * -DIntegrationTestRegionReplicaReplication.num_read_threads=100
+ * -DIntegrationTestRegionReplicaReplication.num_write_threads=100
+ * 
+ */ +@Category(IntegrationTests.class) +public class IntegrationTestRegionReplicaReplication extends IntegrationTestIngest { + + private static final String TEST_NAME + = IntegrationTestRegionReplicaReplication.class.getSimpleName(); + + private static final String OPT_READ_DELAY_MS = "read_delay_ms"; + + private static final int DEFAULT_REGION_REPLICATION = 2; + private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster + private static final String[] DEFAULT_COLUMN_FAMILIES = new String[] {"f1", "f2", "f3"}; + + @Override + protected int getMinServerCount() { + return SERVER_COUNT; + } + + @Override + public void setConf(Configuration conf) { + conf.setIfUnset( + String.format("%s.%s", TEST_NAME, LoadTestTool.OPT_REGION_REPLICATION), + String.valueOf(DEFAULT_REGION_REPLICATION)); + + conf.setIfUnset( + String.format("%s.%s", TEST_NAME, LoadTestTool.OPT_COLUMN_FAMILIES), + StringUtils.join(",", DEFAULT_COLUMN_FAMILIES)); + + conf.setBoolean("hbase.table.sanity.checks", true); + + // enable async wal replication to region replicas for unit tests + conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true); + conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true); + + conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024L * 1024 * 4); // flush every 4 MB + conf.setInt("hbase.hstore.blockingStoreFiles", 100); + + super.setConf(conf); + } + + @Override + @Test + public void testIngest() throws Exception { + runIngestTest(JUNIT_RUN_TIME, 25000, 10, 1024, 10, 20); + } + + @Override + protected void startMonkey() throws Exception { + // TODO: disabled for now + } + + /** + * This extends MultiThreadedWriter to add a configurable delay to the keys written by the writer + * threads to become available to the MultiThradedReader threads. We add this delay because of + * the async nature of the wal replication to region replicas. + */ + public static class DelayingMultiThreadedWriter extends MultiThreadedWriter { + private long delayMs; + public DelayingMultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf, + TableName tableName) throws IOException { + super(dataGen, conf, tableName); + } + @Override + protected BlockingQueue createWriteKeysQueue(Configuration conf) { + this.delayMs = conf.getLong(String.format("%s.%s", + IntegrationTestRegionReplicaReplication.class.getSimpleName(), OPT_READ_DELAY_MS), 5000); + return new ConstantDelayQueue(TimeUnit.MILLISECONDS, delayMs); + } + } + + /** + * This extends MultiThreadedWriter to add a configurable delay to the keys written by the writer + * threads to become available to the MultiThradedReader threads. We add this delay because of + * the async nature of the wal replication to region replicas. + */ + public static class DelayingMultiThreadedUpdater extends MultiThreadedUpdater { + private long delayMs; + public DelayingMultiThreadedUpdater(LoadTestDataGenerator dataGen, Configuration conf, + TableName tableName, double updatePercent) throws IOException { + super(dataGen, conf, tableName, updatePercent); + } + @Override + protected BlockingQueue createWriteKeysQueue(Configuration conf) { + this.delayMs = conf.getLong(String.format("%s.%s", + IntegrationTestRegionReplicaReplication.class.getSimpleName(), OPT_READ_DELAY_MS), 5000); + return new ConstantDelayQueue(TimeUnit.MILLISECONDS, delayMs); + } + } + + @Override + protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey, + int recordSize, int writeThreads, int readThreads) throws Exception { + + LOG.info("Running ingest"); + LOG.info("Cluster size:" + util.getHBaseClusterInterface().getClusterStatus().getServersSize()); + + // sleep for some time so that the cache for disabled tables does not interfere. + Threads.sleep( + getConf().getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", + 5000) + 1000); + + long start = System.currentTimeMillis(); + String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName()); + long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime); + long startKey = 0; + + long numKeys = getNumKeys(keysPerServerPerIter); + while (System.currentTimeMillis() - start < 0.9 * runtime) { + LOG.info("Intended run time: " + (runtime/60000) + " min, left:" + + ((runtime - (System.currentTimeMillis() - start))/60000) + " min"); + + int verifyPercent = 100; + int updatePercent = 20; + int ret = -1; + int regionReplicaId = conf.getInt(String.format("%s.%s" + , TEST_NAME, LoadTestTool.OPT_REGION_REPLICA_ID), 1); + + // we will run writers and readers at the same time. + List args = Lists.newArrayList(getArgsForLoadTestTool("", "", startKey, numKeys)); + args.add("-write"); + args.add(String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads)); + args.add("-" + LoadTestTool.OPT_MULTIPUT); + args.add("-writer"); + args.add(DelayingMultiThreadedWriter.class.getName()); // inject writer class + args.add("-read"); + args.add(String.format("%d:%d", verifyPercent, readThreads)); + args.add("-" + LoadTestTool.OPT_REGION_REPLICA_ID); + args.add(String.valueOf(regionReplicaId)); + + ret = loadTool.run(args.toArray(new String[args.size()])); + if (0 != ret) { + String errorMsg = "Load failed with error code " + ret; + LOG.error(errorMsg); + Assert.fail(errorMsg); + } + + args = Lists.newArrayList(getArgsForLoadTestTool("", "", startKey, numKeys)); + args.add("-update"); + args.add(String.format("%s:%s:1", updatePercent, writeThreads)); + args.add("-updater"); + args.add(DelayingMultiThreadedUpdater.class.getName()); // inject updater class + args.add("-read"); + args.add(String.format("%d:%d", verifyPercent, readThreads)); + args.add("-" + LoadTestTool.OPT_REGION_REPLICA_ID); + args.add(String.valueOf(regionReplicaId)); + + ret = loadTool.run(args.toArray(new String[args.size()])); + if (0 != ret) { + String errorMsg = "Load failed with error code " + ret; + LOG.error(errorMsg); + Assert.fail(errorMsg); + } + startKey += numKeys; + } + } + + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + IntegrationTestingUtility.setUseDistributedCluster(conf); + int ret = ToolRunner.run(conf, new IntegrationTestRegionReplicaReplication(), args); + System.exit(ret); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index e5ad5908cb8..6268b7824d9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.apache.hadoop.util.StringUtils.humanReadableInt; import java.io.IOException; import java.lang.Thread.UncaughtExceptionHandler; import java.lang.management.ManagementFactory; @@ -105,9 +106,9 @@ class MemStoreFlusher implements FlushRequester { long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax(); float globalMemStorePercent = HeapMemorySizeUtil.getGlobalMemStorePercent(conf, true); this.globalMemStoreLimit = (long) (max * globalMemStorePercent); - this.globalMemStoreLimitLowMarkPercent = + this.globalMemStoreLimitLowMarkPercent = HeapMemorySizeUtil.getGlobalMemStoreLowerMark(conf, globalMemStorePercent); - this.globalMemStoreLimitLowMark = + this.globalMemStoreLimitLowMark = (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent); this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", @@ -178,7 +179,11 @@ class MemStoreFlusher implements FlushRequester { Preconditions.checkState(regionToFlush.memstoreSize.get() > 0); - LOG.info("Flush of region " + regionToFlush + " due to global heap pressure"); + LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. " + + "Total Memstore size=" + + humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize()) + + ", Region memstore size=" + + humanReadableInt(regionToFlush.memstoreSize.get())); flushedOne = flushRegion(regionToFlush, true, true); if (!flushedOne) { LOG.info("Excluding unflushable region " + regionToFlush + @@ -292,6 +297,7 @@ class MemStoreFlusher implements FlushRequester { getGlobalMemstoreSize() >= globalMemStoreLimitLowMark; } + @Override public void requestFlush(HRegion r, boolean forceFlushAllStores) { synchronized (regionsInQueue) { if (!regionsInQueue.containsKey(r)) { @@ -304,6 +310,7 @@ class MemStoreFlusher implements FlushRequester { } } + @Override public void requestDelayedFlush(HRegion r, long delay, boolean forceFlushAllStores) { synchronized (regionsInQueue) { if (!regionsInQueue.containsKey(r)) { @@ -591,6 +598,7 @@ class MemStoreFlusher implements FlushRequester { * Register a MemstoreFlushListener * @param listener */ + @Override public void registerFlushRequestListener(final FlushRequestListener listener) { this.flushRequestListeners.add(listener); } @@ -600,6 +608,7 @@ class MemStoreFlusher implements FlushRequester { * @param listener * @return true when passed listener is unregistered successfully. */ + @Override public boolean unregisterFlushRequestListener(final FlushRequestListener listener) { return this.flushRequestListeners.remove(listener); } @@ -608,9 +617,10 @@ class MemStoreFlusher implements FlushRequester { * Sets the global memstore limit to a new size. * @param globalMemStoreSize */ + @Override public void setGlobalMemstoreLimit(long globalMemStoreSize) { this.globalMemStoreLimit = globalMemStoreSize; - this.globalMemStoreLimitLowMark = + this.globalMemStoreLimitLowMark = (long) (this.globalMemStoreLimitLowMarkPercent * globalMemStoreSize); reclaimMemStoreMemory(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 21296a011e8..04c3d2d4b8d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -107,7 +107,7 @@ public class MetricsSource { * * @param delta the number filtered. */ - private void incrLogEditsFiltered(long delta) { + public void incrLogEditsFiltered(long delta) { singleSourceSource.incrLogEditsFiltered(delta); globalSourceSource.incrLogEditsFiltered(delta); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java index fc19603d815..b38a0e65ba0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java @@ -79,8 +79,8 @@ import com.google.common.cache.CacheBuilder; import com.google.protobuf.ServiceException; /** - * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint - * which receives the WAL edits from the WAL, and sends the edits to replicas + * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint + * which receives the WAL edits from the WAL, and sends the edits to replicas * of regions. */ @InterfaceAudience.Private @@ -232,6 +232,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { entryBuffers.appendEntry(entry); } outputSink.flush(); // make sure everything is flushed + ctx.getMetrics().incrLogEditsFiltered( + outputSink.getSkippedEditsCounter().getAndSet(0)); return true; } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -341,24 +343,58 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { List entries) throws IOException { if (disabledAndDroppedTables.getIfPresent(tableName) != null) { - sink.getSkippedEditsCounter().incrementAndGet(); + if (LOG.isTraceEnabled()) { + LOG.trace("Skipping " + entries.size() + " entries because table " + tableName + + " is cached as a disabled or dropped table"); + } + sink.getSkippedEditsCounter().addAndGet(entries.size()); return; } - // get the replicas of the primary region + // If the table is disabled or dropped, we should not replay the entries, and we can skip + // replaying them. However, we might not know whether the table is disabled until we + // invalidate the cache and check from meta RegionLocations locations = null; - try { - locations = getRegionLocations(connection, tableName, row, true, 0); + boolean useCache = true; + while (true) { + // get the replicas of the primary region + try { + locations = getRegionLocations(connection, tableName, row, useCache, 0); - if (locations == null) { - throw new HBaseIOException("Cannot locate locations for " - + tableName + ", row:" + Bytes.toStringBinary(row)); + if (locations == null) { + throw new HBaseIOException("Cannot locate locations for " + + tableName + ", row:" + Bytes.toStringBinary(row)); + } + } catch (TableNotFoundException e) { + if (LOG.isTraceEnabled()) { + LOG.trace("Skipping " + entries.size() + " entries because table " + tableName + + " is dropped. Adding table to cache."); + } + disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache. Value ignored + // skip this entry + sink.getSkippedEditsCounter().addAndGet(entries.size()); + return; } - } catch (TableNotFoundException e) { - disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache. Value ignored - // skip this entry - sink.getSkippedEditsCounter().addAndGet(entries.size()); - return; + + // check whether we should still replay this entry. If the regions are changed, or the + // entry is not coming from the primary region, filter it out. + HRegionLocation primaryLocation = locations.getDefaultRegionLocation(); + if (!Bytes.equals(primaryLocation.getRegionInfo().getEncodedNameAsBytes(), + encodedRegionName)) { + if (useCache) { + useCache = false; + continue; // this will retry location lookup + } + if (LOG.isTraceEnabled()) { + LOG.trace("Skipping " + entries.size() + " entries in table " + tableName + + " because located region region " + primaryLocation.getRegionInfo().getEncodedName() + + " is different than the original region " + Bytes.toStringBinary(encodedRegionName) + + " from WALEdit"); + } + sink.getSkippedEditsCounter().addAndGet(entries.size()); + return; + } + break; } if (locations.size() == 1) { @@ -366,17 +402,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { } ArrayList> tasks - = new ArrayList>(2); - - // check whether we should still replay this entry. If the regions are changed, or the - // entry is not coming form the primary region, filter it out. - HRegionLocation primaryLocation = locations.getDefaultRegionLocation(); - if (!Bytes.equals(primaryLocation.getRegionInfo().getEncodedNameAsBytes(), - encodedRegionName)) { - sink.getSkippedEditsCounter().addAndGet(entries.size()); - return; - } - + = new ArrayList>(locations.size() - 1); // All passed entries should belong to one region because it is coming from the EntryBuffers // split per region. But the regions might split and merge (unlike log recovery case). @@ -413,6 +439,10 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { // check whether the table is dropped or disabled which might cause // SocketTimeoutException, or RetriesExhaustedException or similar if we get IOE. if (cause instanceof TableNotFoundException || connection.isTableDisabled(tableName)) { + if (LOG.isTraceEnabled()) { + LOG.trace("Skipping " + entries.size() + " entries in table " + tableName + + " because received exception for dropped or disabled table", cause); + } disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later. if (!tasksCancelled) { sink.getSkippedEditsCounter().addAndGet(entries.size()); @@ -490,6 +520,12 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(), initialEncodedRegionName)) { skip = true; + if (LOG.isTraceEnabled()) { + LOG.trace("Skipping " + entries.size() + " entries in table " + tableName + + " because located region region " + location.getRegionInfo().getEncodedName() + + " is different than the original region " + + Bytes.toStringBinary(initialEncodedRegionName) + " from WALEdit"); + } return null; } @@ -504,7 +540,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { private ReplicateWALEntryResponse replayToServer(List entries, int timeout) throws IOException { if (entries.isEmpty() || skip) { - skippedEntries.incrementAndGet(); + skippedEntries.addAndGet(entries.size()); return ReplicateWALEntryResponse.newBuilder().build(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 714080fee68..794a3e155c6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -691,8 +691,10 @@ public class ReplicationSource extends Thread } replicateContext.setEntries(entries).setSize(currentSize); + long startTimeNs = System.nanoTime(); // send the edits to the endpoint. Will block until the edits are shipped and acknowledged boolean replicated = replicationEndpoint.replicate(replicateContext); + long endTimeNs = System.nanoTime(); if (!replicated) { continue; @@ -713,7 +715,8 @@ public class ReplicationSource extends Thread this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime()); if (LOG.isTraceEnabled()) { LOG.trace("Replicated " + this.totalReplicatedEdits + " entries in total, or " - + this.totalReplicatedOperations + " operations"); + + this.totalReplicatedOperations + " operations in " + + ((endTimeNs - startTimeNs)/1000000) + " ms"); } break; } catch (Exception ex) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java index 5ec13f4861d..cdef12f92e5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CompressionTest.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.io.compress.Compression; @@ -87,7 +88,7 @@ public class CompressionTest { return ; // already passed test, dont do it again. } else { // failed. - throw new IOException("Compression algorithm '" + algo.getName() + "'" + + throw new DoNotRetryIOException("Compression algorithm '" + algo.getName() + "'" + " previously failed test."); } } @@ -98,7 +99,7 @@ public class CompressionTest { compressionTestResults[algo.ordinal()] = true; // passes } catch (Throwable t) { compressionTestResults[algo.ordinal()] = false; // failure - throw new IOException(t); + throw new DoNotRetryIOException(t); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 1a377fcae9c..c1897cfac89 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -3571,6 +3571,29 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { return createPreSplitLoadTestTable(conf, desc, hcd, numRegionsPerServer); } + /** + * Creates a pre-split table for load testing. If the table already exists, + * logs a warning and continues. + * @return the number of regions the table was split into + */ + public static int createPreSplitLoadTestTable(Configuration conf, + TableName tableName, byte[][] columnFamilies, Algorithm compression, + DataBlockEncoding dataBlockEncoding, int numRegionsPerServer, int regionReplication, + Durability durability) + throws IOException { + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.setDurability(durability); + desc.setRegionReplication(regionReplication); + HColumnDescriptor[] hcds = new HColumnDescriptor[columnFamilies.length]; + for (int i = 0; i < columnFamilies.length; i++) { + HColumnDescriptor hcd = new HColumnDescriptor(columnFamilies[i]); + hcd.setDataBlockEncoding(dataBlockEncoding); + hcd.setCompressionType(compression); + hcds[i] = hcd; + } + return createPreSplitLoadTestTable(conf, desc, hcds, numRegionsPerServer); + } + /** * Creates a pre-split table for load testing. If the table already exists, * logs a warning and continues. @@ -3588,8 +3611,21 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { */ public static int createPreSplitLoadTestTable(Configuration conf, HTableDescriptor desc, HColumnDescriptor hcd, int numRegionsPerServer) throws IOException { - if (!desc.hasFamily(hcd.getName())) { - desc.addFamily(hcd); + return createPreSplitLoadTestTable(conf, desc, new HColumnDescriptor[] {hcd}, + numRegionsPerServer); + } + + /** + * Creates a pre-split table for load testing. If the table already exists, + * logs a warning and continues. + * @return the number of regions the table was split into + */ + public static int createPreSplitLoadTestTable(Configuration conf, + HTableDescriptor desc, HColumnDescriptor[] hcds, int numRegionsPerServer) throws IOException { + for (HColumnDescriptor hcd : hcds) { + if (!desc.hasFamily(hcd.getName())) { + desc.addFamily(hcd); + } } int totalNumberOfRegions = 0; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java index a191bdd79f2..23263012deb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java @@ -242,6 +242,7 @@ public class TestRegionReplicaReplicationEndpointNoMaster { ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class); when(context.getConfiguration()).thenReturn(HTU.getConfiguration()); + when(context.getMetrics()).thenReturn(mock(MetricsSource.class)); replicator.init(context); replicator.start(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/ConstantDelayQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/ConstantDelayQueue.java new file mode 100644 index 00000000000..73ce71adc65 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/ConstantDelayQueue.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util; + +import java.util.Collection; +import java.util.Iterator; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * A blocking queue implementation for adding a constant delay. Uses a DelayQueue as a backing store + * @param type of elements + */ +@InterfaceAudience.Private +public class ConstantDelayQueue implements BlockingQueue { + + private static final class DelayedElement implements Delayed { + T element; + long end; + public DelayedElement(T element, long delayMs) { + this.element = element; + this.end = EnvironmentEdgeManager.currentTime() + delayMs; + } + + @Override + public int compareTo(Delayed o) { + long cmp = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS); + return cmp == 0 ? 0 : ( cmp < 0 ? -1 : 1); + } + + @Override + public long getDelay(TimeUnit unit) { + return unit.convert(end - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + } + } + + private final long delayMs; + + // backing DelayQueue + private DelayQueue> queue = new DelayQueue>(); + + public ConstantDelayQueue(TimeUnit timeUnit, long delay) { + this.delayMs = TimeUnit.MILLISECONDS.convert(delay, timeUnit); + } + + @Override + public E remove() { + DelayedElement el = queue.remove(); + return el == null ? null : el.element; + } + + @Override + public E poll() { + DelayedElement el = queue.poll(); + return el == null ? null : el.element; + } + + @Override + public E element() { + DelayedElement el = queue.element(); + return el == null ? null : el.element; + } + + @Override + public E peek() { + DelayedElement el = queue.peek(); + return el == null ? null : el.element; + } + + @Override + public int size() { + return queue.size(); + } + + @Override + public boolean isEmpty() { + return queue.isEmpty(); + } + + @Override + public Iterator iterator() { + throw new UnsupportedOperationException(); // not implemented yet + } + + @Override + public Object[] toArray() { + throw new UnsupportedOperationException(); // not implemented yet + } + + @Override + public T[] toArray(T[] a) { + throw new UnsupportedOperationException(); // not implemented yet + } + + @Override + public boolean containsAll(Collection c) { + throw new UnsupportedOperationException(); // not implemented yet + } + + @Override + public boolean addAll(Collection c) { + throw new UnsupportedOperationException(); // not implemented yet + } + + @Override + public boolean removeAll(Collection c) { + throw new UnsupportedOperationException(); // not implemented yet + } + + @Override + public boolean retainAll(Collection c) { + throw new UnsupportedOperationException(); // not implemented yet + } + + @Override + public void clear() { + queue.clear(); + } + + @Override + public boolean add(E e) { + return queue.add(new DelayedElement(e, delayMs)); + } + + @Override + public boolean offer(E e) { + return queue.offer(new DelayedElement(e, delayMs)); + } + + @Override + public void put(E e) throws InterruptedException { + queue.put(new DelayedElement(e, delayMs)); + } + + @Override + public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { + return queue.offer(new DelayedElement(e, delayMs), timeout, unit); + } + + @Override + public E take() throws InterruptedException { + DelayedElement el = queue.take(); + return el == null ? null : el.element; + } + + @Override + public E poll(long timeout, TimeUnit unit) throws InterruptedException { + DelayedElement el = queue.poll(timeout, unit); + return el == null ? null : el.element; + } + + @Override + public int remainingCapacity() { + return queue.remainingCapacity(); + } + + @Override + public boolean remove(Object o) { + throw new UnsupportedOperationException(); // not implemented yet + } + + @Override + public boolean contains(Object o) { + throw new UnsupportedOperationException(); // not implemented yet + } + + @Override + public int drainTo(Collection c) { + throw new UnsupportedOperationException(); // not implemented yet + } + + @Override + public int drainTo(Collection c, int maxElements) { + throw new UnsupportedOperationException(); // not implemented yet + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java index 90e07b352f4..6d64bc62419 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java @@ -74,14 +74,17 @@ public class LoadTestTool extends AbstractHBaseTool { /** Table name for the test */ private TableName tableName; + /** Column families for the test */ + private byte[][] families; + /** Table name to use of not overridden on the command line */ protected static final String DEFAULT_TABLE_NAME = "cluster_test"; /** Column family used by the test */ - public static byte[] COLUMN_FAMILY = Bytes.toBytes("test_cf"); + public static byte[] DEFAULT_COLUMN_FAMILY = Bytes.toBytes("test_cf"); /** Column families used by the test */ - protected static final byte[][] COLUMN_FAMILIES = { COLUMN_FAMILY }; + public static final byte[][] DEFAULT_COLUMN_FAMILIES = { DEFAULT_COLUMN_FAMILY }; /** The default data size if not specified */ protected static final int DEFAULT_DATA_SIZE = 64; @@ -130,18 +133,25 @@ public class LoadTestTool extends AbstractHBaseTool { public static final String OPT_GENERATOR_USAGE = "The class which generates load for the tool." + " Any args for this class can be passed as colon separated after class name"; + public static final String OPT_WRITER = "writer"; + public static final String OPT_WRITER_USAGE = "The class for executing the write requests"; + + public static final String OPT_UPDATER = "updater"; + public static final String OPT_UPDATER_USAGE = "The class for executing the update requests"; + public static final String OPT_READER = "reader"; public static final String OPT_READER_USAGE = "The class for executing the read requests"; protected static final String OPT_KEY_WINDOW = "key_window"; protected static final String OPT_WRITE = "write"; protected static final String OPT_MAX_READ_ERRORS = "max_read_errors"; - protected static final String OPT_MULTIPUT = "multiput"; + public static final String OPT_MULTIPUT = "multiput"; public static final String OPT_MULTIGET = "multiget_batchsize"; protected static final String OPT_NUM_KEYS = "num_keys"; protected static final String OPT_READ = "read"; protected static final String OPT_START_KEY = "start_key"; public static final String OPT_TABLE_NAME = "tn"; + public static final String OPT_COLUMN_FAMILIES = "families"; protected static final String OPT_ZK_QUORUM = "zk"; protected static final String OPT_ZK_PARENT_NODE = "zk_root"; protected static final String OPT_SKIP_INIT = "skip_init"; @@ -245,6 +255,10 @@ public class LoadTestTool extends AbstractHBaseTool { return parseInt(numThreadsStr, 1, Short.MAX_VALUE); } + public byte[][] getColumnFamilies() { + return families; + } + /** * Apply column family options such as Bloom filters, compression, and data * block encoding. @@ -298,6 +312,7 @@ public class LoadTestTool extends AbstractHBaseTool { "without port numbers"); addOptWithArg(OPT_ZK_PARENT_NODE, "name of parent znode in zookeeper"); addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write"); + addOptWithArg(OPT_COLUMN_FAMILIES, "The name of the column families to use separated by comma"); addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD); addOptWithArg(OPT_READ, OPT_USAGE_READ); addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE); @@ -320,6 +335,8 @@ public class LoadTestTool extends AbstractHBaseTool { "separate updates for every column in a row"); addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY); addOptWithArg(OPT_GENERATOR, OPT_GENERATOR_USAGE); + addOptWithArg(OPT_WRITER, OPT_WRITER_USAGE); + addOptWithArg(OPT_UPDATER, OPT_UPDATER_USAGE); addOptWithArg(OPT_READER, OPT_READER_USAGE); addOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write"); @@ -352,6 +369,16 @@ public class LoadTestTool extends AbstractHBaseTool { tableName = TableName.valueOf(cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME)); + if (cmd.hasOption(OPT_COLUMN_FAMILIES)) { + String[] list = cmd.getOptionValue(OPT_COLUMN_FAMILIES).split(","); + families = new byte[list.length][]; + for (int i = 0; i < list.length; i++) { + families[i] = Bytes.toBytes(list[i]); + } + } else { + families = DEFAULT_COLUMN_FAMILIES; + } + isWrite = cmd.hasOption(OPT_WRITE); isRead = cmd.hasOption(OPT_READ); isUpdate = cmd.hasOption(OPT_UPDATE); @@ -503,9 +530,9 @@ public class LoadTestTool extends AbstractHBaseTool { } HBaseTestingUtility.createPreSplitLoadTestTable(conf, tableName, - COLUMN_FAMILY, compressAlgo, dataBlockEncodingAlgo, numRegionsPerServer, + getColumnFamilies(), compressAlgo, dataBlockEncodingAlgo, numRegionsPerServer, regionReplication, durability); - applyColumnFamilyOptions(tableName, COLUMN_FAMILIES); + applyColumnFamilyOptions(tableName, getColumnFamilies()); } @Override @@ -570,7 +597,7 @@ public class LoadTestTool extends AbstractHBaseTool { } else { // Default DataGenerator is MultiThreadedAction.DefaultDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(minColDataSize, maxColDataSize, - minColsPerKey, maxColsPerKey, COLUMN_FAMILY); + minColsPerKey, maxColsPerKey, families); } if (userOwner != null) { @@ -603,7 +630,14 @@ public class LoadTestTool extends AbstractHBaseTool { if (userOwner != null) { writerThreads = new MultiThreadedWriterWithACL(dataGen, conf, tableName, userOwner); } else { - writerThreads = new MultiThreadedWriter(dataGen, conf, tableName); + String writerClass = null; + if (cmd.hasOption(OPT_WRITER)) { + writerClass = cmd.getOptionValue(OPT_WRITER); + } else { + writerClass = MultiThreadedWriter.class.getCanonicalName(); + } + + writerThreads = getMultiThreadedWriterInstance(writerClass, dataGen); } writerThreads.setMultiPut(isMultiPut); } @@ -613,7 +647,13 @@ public class LoadTestTool extends AbstractHBaseTool { updaterThreads = new MultiThreadedUpdaterWithACL(dataGen, conf, tableName, updatePercent, userOwner, userNames); } else { - updaterThreads = new MultiThreadedUpdater(dataGen, conf, tableName, updatePercent); + String updaterClass = null; + if (cmd.hasOption(OPT_UPDATER)) { + updaterClass = cmd.getOptionValue(OPT_UPDATER); + } else { + updaterClass = MultiThreadedUpdater.class.getCanonicalName(); + } + updaterThreads = getMultiThreadedUpdaterInstance(updaterClass, dataGen); } updaterThreads.setBatchUpdate(isBatchUpdate); updaterThreads.setIgnoreNonceConflicts(ignoreConflicts); @@ -700,7 +740,32 @@ public class LoadTestTool extends AbstractHBaseTool { Constructor constructor = clazz.getConstructor(int.class, int.class, int.class, int.class, byte[][].class); return (LoadTestDataGenerator) constructor.newInstance(minColDataSize, maxColDataSize, - minColsPerKey, maxColsPerKey, COLUMN_FAMILIES); + minColsPerKey, maxColsPerKey, families); + } catch (Exception e) { + throw new IOException(e); + } + } + + private MultiThreadedWriter getMultiThreadedWriterInstance(String clazzName + , LoadTestDataGenerator dataGen) throws IOException { + try { + Class clazz = Class.forName(clazzName); + Constructor constructor = clazz.getConstructor( + LoadTestDataGenerator.class, Configuration.class, TableName.class); + return (MultiThreadedWriter) constructor.newInstance(dataGen, conf, tableName); + } catch (Exception e) { + throw new IOException(e); + } + } + + private MultiThreadedUpdater getMultiThreadedUpdaterInstance(String clazzName + , LoadTestDataGenerator dataGen) throws IOException { + try { + Class clazz = Class.forName(clazzName); + Constructor constructor = clazz.getConstructor( + LoadTestDataGenerator.class, Configuration.class, TableName.class, double.class); + return (MultiThreadedUpdater) constructor.newInstance( + dataGen, conf, tableName, updatePercent); } catch (Exception e) { throw new IOException(e); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java index 9eb0c930765..d4e6d805d98 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java @@ -46,7 +46,7 @@ public abstract class MultiThreadedWriterBase extends MultiThreadedAction { * {@link #wroteUpToKey}, the maximum key in the contiguous range of keys * being inserted/updated. This queue is supposed to stay small. */ - protected BlockingQueue wroteKeys = new ArrayBlockingQueue(10000); + protected BlockingQueue wroteKeys; /** * This is the current key to be inserted/updated by any thread. Each thread does an @@ -75,6 +75,11 @@ public abstract class MultiThreadedWriterBase extends MultiThreadedAction { public MultiThreadedWriterBase(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName, String actionLetter) throws IOException { super(dataGen, conf, tableName, actionLetter); + this.wroteKeys = createWriteKeysQueue(conf); + } + + protected BlockingQueue createWriteKeysQueue(Configuration conf) { + return new ArrayBlockingQueue(10000); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java index b0a17a91b99..6beb2e61655 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/RestartMetaTest.java @@ -26,7 +26,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; @@ -81,7 +80,8 @@ public class RestartMetaTest extends AbstractHBaseTool { // start the writers LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator( - minColDataSize, maxColDataSize, minColsPerKey, maxColsPerKey, LoadTestTool.COLUMN_FAMILY); + minColDataSize, maxColDataSize, minColsPerKey, maxColsPerKey, + LoadTestTool.DEFAULT_COLUMN_FAMILY); MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME); writer.setMultiPut(true); writer.start(startKey, endKey, numThreads); @@ -101,7 +101,7 @@ public class RestartMetaTest extends AbstractHBaseTool { // create tables if needed HBaseTestingUtility.createPreSplitLoadTestTable(conf, TABLE_NAME, - LoadTestTool.COLUMN_FAMILY, Compression.Algorithm.NONE, + LoadTestTool.DEFAULT_COLUMN_FAMILY, Compression.Algorithm.NONE, DataBlockEncoding.NONE); LOG.debug("Loading data....\n\n");