diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java index 6947b08aea0..e5941d5e1a0 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/LoadTestDataGenerator.java @@ -25,6 +25,14 @@ import java.util.Set; public abstract class LoadTestDataGenerator { protected final LoadTestKVGenerator kvGenerator; + // The mutate info column stores information + // about update done to this column family this row. + public final static byte[] MUTATE_INFO = "mutate_info".getBytes(); + + // The increment column always has a long value, + // which can be incremented later on during updates. + public final static byte[] INCREMENT = "increment".getBytes(); + /** * Initializes the object. * @param minValueSize minimum size of the value generated by diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java index 9ab62bab6dc..cee90e72842 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java @@ -113,6 +113,19 @@ public abstract class IngestIntegrationTestBase { Assert.fail(errorMsg); } + ret = loadTool.run(new String[] { + "-tn", tableName, + "-update", String.format("60:%d", writeThreads), + "-start_key", String.valueOf(startKey), + "-num_keys", String.valueOf(numKeys), + "-skip_init" + }); + if (0 != ret) { + String errorMsg = "Update failed with error code " + ret; + LOG.error(errorMsg); + Assert.fail(errorMsg); + } + ret = loadTool.run(new String[] { "-tn", tableName, "-read", "100:20", diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java index 516382e7af5..2b77ad5f588 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java @@ -24,8 +24,6 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicLong; -import junit.framework.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -44,6 +42,7 @@ import org.apache.hadoop.hbase.util.RegionSplitter; import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java index 7a21c632a78..1e6c254da79 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java @@ -24,7 +24,6 @@ import java.util.List; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.ParseException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import 
org.apache.hadoop.hbase.TableName; @@ -70,10 +69,14 @@ public class LoadTestTool extends AbstractHBaseTool { ":" + "[:<#threads=" + DEFAULT_NUM_THREADS + ">]"; - /** Usa\ge string for the read option */ + /** Usage string for the read option */ protected static final String OPT_USAGE_READ = "[:<#threads=" + DEFAULT_NUM_THREADS + ">]"; + /** Usage string for the update option */ + protected static final String OPT_USAGE_UPDATE = + "[:<#threads=" + DEFAULT_NUM_THREADS + ">]"; + protected static final String OPT_USAGE_BLOOM = "Bloom filter type, one of " + Arrays.toString(BloomType.values()); @@ -111,6 +114,8 @@ public class LoadTestTool extends AbstractHBaseTool { protected static final String OPT_SKIP_INIT = "skip_init"; protected static final String OPT_INIT_ONLY = "init_only"; private static final String NUM_TABLES = "num_tables"; + protected static final String OPT_BATCHUPDATE = "batchupdate"; + protected static final String OPT_UPDATE = "update"; protected static final long DEFAULT_START_KEY = 0; @@ -119,10 +124,11 @@ public class LoadTestTool extends AbstractHBaseTool { protected MultiThreadedWriter writerThreads = null; protected MultiThreadedReader readerThreads = null; + protected MultiThreadedUpdater updaterThreads = null; protected long startKey, endKey; - protected boolean isWrite, isRead; + protected boolean isWrite, isRead, isUpdate; // Column family options protected DataBlockEncoding dataBlockEncodingAlgo; @@ -136,6 +142,11 @@ public class LoadTestTool extends AbstractHBaseTool { protected int minColDataSize, maxColDataSize; protected boolean isMultiPut; + // Updater options + protected int numUpdaterThreads = DEFAULT_NUM_THREADS; + protected int updatePercent; + protected boolean isBatchUpdate; + // Reader options private int numReaderThreads = DEFAULT_NUM_THREADS; private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW; @@ -212,6 +223,7 @@ public class LoadTestTool extends AbstractHBaseTool { addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write"); addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD); addOptWithArg(OPT_READ, OPT_USAGE_READ); + addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE); addOptNoArg(OPT_INIT_ONLY, "Initialize the test table only, don't do any loading"); addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM); addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION); @@ -225,6 +237,8 @@ public class LoadTestTool extends AbstractHBaseTool { addOptNoArg(OPT_MULTIPUT, "Whether to use multi-puts as opposed to " + "separate puts for every column in a row"); + addOptNoArg(OPT_BATCHUPDATE, "Whether to use batch as opposed to " + + "separate updates for every column in a row"); addOptNoArg(OPT_ENCODE_IN_CACHE_ONLY, OPT_ENCODE_IN_CACHE_ONLY_USAGE); addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY); @@ -250,16 +264,17 @@ public class LoadTestTool extends AbstractHBaseTool { isWrite = cmd.hasOption(OPT_WRITE); isRead = cmd.hasOption(OPT_READ); + isUpdate = cmd.hasOption(OPT_UPDATE); isInitOnly = cmd.hasOption(OPT_INIT_ONLY); - if (!isWrite && !isRead && !isInitOnly) { + if (!isWrite && !isRead && !isUpdate && !isInitOnly) { throw new IllegalArgumentException("Either -" + OPT_WRITE + " or " + - "-" + OPT_READ + " has to be specified"); + "-" + OPT_UPDATE + "-" + OPT_READ + " has to be specified"); } - if (isInitOnly && (isRead || isWrite)) { + if (isInitOnly && (isRead || isWrite || isUpdate)) { throw new IllegalArgumentException(OPT_INIT_ONLY + " cannot be specified with" - + " either -" + OPT_WRITE + " or -" + OPT_READ); + + " either -" + OPT_WRITE + " or -" + OPT_UPDATE + " 
or -" + OPT_READ); } if (!isInitOnly) { @@ -303,6 +318,21 @@ public class LoadTestTool extends AbstractHBaseTool { + maxColDataSize); } + if (isUpdate) { + String[] mutateOpts = splitColonSeparated(OPT_UPDATE, 1, 2); + int colIndex = 0; + updatePercent = parseInt(mutateOpts[colIndex++], 0, 100); + if (colIndex < mutateOpts.length) { + numUpdaterThreads = getNumThreads(mutateOpts[colIndex++]); + } + + isBatchUpdate = cmd.hasOption(OPT_BATCHUPDATE); + + System.out.println("Batch updates: " + isBatchUpdate); + System.out.println("Percent of keys to update: " + updatePercent); + System.out.println("Updater threads: " + numUpdaterThreads); + } + if (isRead) { String[] readOpts = splitColonSeparated(OPT_READ, 1, 2); int colIndex = 0; @@ -390,16 +420,27 @@ public class LoadTestTool extends AbstractHBaseTool { writerThreads.setMultiPut(isMultiPut); } + if (isUpdate) { + updaterThreads = new MultiThreadedUpdater(dataGen, conf, tableName, updatePercent); + updaterThreads.setBatchUpdate(isBatchUpdate); + } + if (isRead) { readerThreads = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent); readerThreads.setMaxErrors(maxReadErrors); readerThreads.setKeyWindow(keyWindow); } - if (isRead && isWrite) { - LOG.info("Concurrent read/write workload: making readers aware of the " + - "write point"); - readerThreads.linkToWriter(writerThreads); + if (isUpdate && isWrite) { + LOG.info("Concurrent write/update workload: making updaters aware of the " + + "write point"); + updaterThreads.linkToWriter(writerThreads); + } + + if (isRead && (isUpdate || isWrite)) { + LOG.info("Concurrent write/read workload: making readers aware of the " + + "write point"); + readerThreads.linkToWriter(isUpdate ? updaterThreads : writerThreads); } if (isWrite) { @@ -407,6 +448,11 @@ public class LoadTestTool extends AbstractHBaseTool { writerThreads.start(startKey, endKey, numWriterThreads); } + if (isUpdate) { + System.out.println("Starting to mutate data..."); + updaterThreads.start(startKey, endKey, numUpdaterThreads); + } + if (isRead) { System.out.println("Starting to read data..."); readerThreads.start(startKey, endKey, numReaderThreads); @@ -416,6 +462,10 @@ public class LoadTestTool extends AbstractHBaseTool { writerThreads.waitForFinish(); } + if (isUpdate) { + updaterThreads.waitForFinish(); + } + if (isRead) { readerThreads.waitForFinish(); } @@ -424,13 +474,16 @@ public class LoadTestTool extends AbstractHBaseTool { if (isWrite) { success = success && writerThreads.getNumWriteFailures() == 0; } + if (isUpdate) { + success = success && updaterThreads.getNumWriteFailures() == 0; + } if (isRead) { success = success && readerThreads.getNumReadErrors() == 0 && readerThreads.getNumReadFailures() == 0; } - return success ? EXIT_SUCCESS : this.EXIT_FAILURE; + return success ? 
EXIT_SUCCESS : EXIT_FAILURE; } - + public static void main(String[] args) { new LoadTestTool().doStaticMain(args); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java index 7f04d22809a..56ad8c3fb68 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java @@ -16,8 +16,13 @@ */ package org.apache.hadoop.hbase.util; +import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT; +import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO; + import java.io.IOException; +import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.Map; import java.util.Random; import java.util.Set; @@ -27,12 +32,18 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator; import org.apache.hadoop.util.StringUtils; +import com.google.common.base.Preconditions; + /** * Common base class for reader and writer parts of multi-thread HBase load * test ({@link LoadTestTool}). @@ -300,7 +311,7 @@ public abstract class MultiThreadedAction { // See if we have any data at all. if (result.isEmpty()) { - LOG.error("No data returned for key = [" + rowKeyStr + "]"); + LOG.error("Error checking data for key [" + rowKeyStr + "], no data returned"); return false; } @@ -311,7 +322,8 @@ public abstract class MultiThreadedAction { // See if we have all the CFs. byte[][] expectedCfs = dataGenerator.getColumnFamilies(); if (verifyCfAndColumnIntegrity && (expectedCfs.length != result.getMap().size())) { - LOG.error("Bad family count for [" + rowKeyStr + "]: " + result.getMap().size()); + LOG.error("Error checking data for key [" + rowKeyStr + + "], bad family count: " + result.getMap().size()); return false; } @@ -320,34 +332,155 @@ public abstract class MultiThreadedAction { String cfStr = Bytes.toString(cf); Map columnValues = result.getFamilyMap(cf); if (columnValues == null) { - LOG.error("No data for family [" + cfStr + "] for [" + rowKeyStr + "]"); + LOG.error("Error checking data for key [" + rowKeyStr + + "], no data for family [" + cfStr + "]]"); return false; } - // See if we have correct columns. 
- if (verifyCfAndColumnIntegrity - && !dataGenerator.verify(result.getRow(), cf, columnValues.keySet())) { - String colsStr = ""; - for (byte[] col : columnValues.keySet()) { - if (colsStr.length() > 0) { - colsStr += ", "; - } - colsStr += "[" + Bytes.toString(col) + "]"; + + Map mutateInfo = null; + if (verifyCfAndColumnIntegrity || verifyValues) { + if (!columnValues.containsKey(MUTATE_INFO)) { + LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + + cfStr + "], column [" + Bytes.toString(MUTATE_INFO) + "]; value is not found"); + return false; } - LOG.error("Bad columns for family [" + cfStr + "] for [" + rowKeyStr + "]: " + colsStr); - return false; - } - // See if values check out. - if (verifyValues) { - for (Map.Entry kv : columnValues.entrySet()) { - if (!dataGenerator.verify(result.getRow(), cf, kv.getKey(), kv.getValue())) { + + long cfHash = Arrays.hashCode(cf); + // Verify deleted columns, and make up column counts if deleted + byte[] mutateInfoValue = columnValues.remove(MUTATE_INFO); + mutateInfo = parseMutateInfo(mutateInfoValue); + for (Map.Entry mutate: mutateInfo.entrySet()) { + if (mutate.getValue() == MutationType.DELETE) { + byte[] column = Bytes.toBytes(mutate.getKey()); + long columnHash = Arrays.hashCode(column); + long hashCode = cfHash + columnHash; + if (hashCode % 2 == 0) { + if (columnValues.containsKey(column)) { + LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + + cfStr + "], column [" + mutate.getKey() + "]; should be deleted"); + return false; + } + byte[] hashCodeBytes = Bytes.toBytes(hashCode); + columnValues.put(column, hashCodeBytes); + } + } + } + + // Verify increment + if (!columnValues.containsKey(INCREMENT)) { + LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + + cfStr + "], column [" + Bytes.toString(INCREMENT) + "]; value is not found"); + return false; + } + long currentValue = Bytes.toLong(columnValues.remove(INCREMENT)); + if (verifyValues) { + long amount = mutateInfo.isEmpty() ? 0 : cfHash; + long originalValue = Arrays.hashCode(result.getRow()); + long extra = currentValue - originalValue; + if (extra != 0 && (amount == 0 || extra % amount != 0)) { LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" - + cfStr + "], column [" + Bytes.toString(kv.getKey()) + "]; value of length " + - + kv.getValue().length); + + cfStr + "], column [increment], extra [" + extra + "], amount [" + amount + "]"); return false; } + if (amount != 0 && extra != amount) { + LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family [" + + cfStr + "], column [increment], incremented [" + (extra / amount) + "] times"); + } + } + + // See if we have correct columns. + if (verifyCfAndColumnIntegrity + && !dataGenerator.verify(result.getRow(), cf, columnValues.keySet())) { + String colsStr = ""; + for (byte[] col : columnValues.keySet()) { + if (colsStr.length() > 0) { + colsStr += ", "; + } + colsStr += "[" + Bytes.toString(col) + "]"; + } + LOG.error("Error checking data for key [" + rowKeyStr + + "], bad columns for family [" + cfStr + "]: " + colsStr); + return false; + } + // See if values check out. 
+ if (verifyValues) { + for (Map.Entry kv : columnValues.entrySet()) { + String column = Bytes.toString(kv.getKey()); + MutationType mutation = mutateInfo.get(column); + boolean verificationNeeded = true; + byte[] bytes = kv.getValue(); + if (mutation != null) { + boolean mutationVerified = true; + long columnHash = Arrays.hashCode(kv.getKey()); + long hashCode = cfHash + columnHash; + byte[] hashCodeBytes = Bytes.toBytes(hashCode); + if (mutation == MutationType.APPEND) { + int offset = bytes.length - hashCodeBytes.length; + mutationVerified = offset > 0 && Bytes.equals(hashCodeBytes, + 0, hashCodeBytes.length, bytes, offset, hashCodeBytes.length); + if (mutationVerified) { + int n = 1; + while (true) { + int newOffset = offset - hashCodeBytes.length; + if (newOffset < 0 || !Bytes.equals(hashCodeBytes, 0, + hashCodeBytes.length, bytes, newOffset, hashCodeBytes.length)) { + break; + } + offset = newOffset; + n++; + } + if (n > 1) { + LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family [" + + cfStr + "], column [" + column + "], appended [" + n + "] times"); + } + byte[] dest = new byte[offset]; + System.arraycopy(bytes, 0, dest, 0, offset); + bytes = dest; + } + } else if (hashCode % 2 == 0) { // checkAndPut + mutationVerified = Bytes.equals(bytes, hashCodeBytes); + verificationNeeded = false; + } + if (!mutationVerified) { + LOG.error("Error checking data for key [" + rowKeyStr + + "], mutation checking failed for column family [" + cfStr + "], column [" + + column + "]; mutation [" + mutation + "], hashCode [" + + hashCode + "], verificationNeeded [" + + verificationNeeded + "]"); + return false; + } + } // end of mutation checking + if (verificationNeeded && + !dataGenerator.verify(result.getRow(), cf, kv.getKey(), bytes)) { + LOG.error("Error checking data for key [" + rowKeyStr + "], column family [" + + cfStr + "], column [" + column + "], mutation [" + mutation + + "]; value of length " + bytes.length); + return false; + } + } } } } return true; } + + // Parse mutate info into a map of => + private Map parseMutateInfo(byte[] mutateInfo) { + Map mi = new HashMap(); + if (mutateInfo != null) { + String mutateInfoStr = Bytes.toString(mutateInfo); + String[] mutations = mutateInfoStr.split("#"); + for (String mutation: mutations) { + if (mutation.isEmpty()) continue; + Preconditions.checkArgument(mutation.contains(":"), + "Invalid mutation info " + mutation); + int p = mutation.indexOf(":"); + String column = mutation.substring(0, p); + MutationType type = MutationType.valueOf( + Integer.parseInt(mutation.substring(p+1))); + mi.put(column, type); + } + } + return mi; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java index d27bc62c8fc..a32b55af8ca 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java @@ -18,15 +18,15 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; import java.util.HashSet; -import java.util.Random; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.lang.math.RandomUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HRegionLocation; +import 
org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; @@ -41,7 +41,7 @@ public class MultiThreadedReader extends MultiThreadedAction private final double verifyPercent; private volatile boolean aborted; - private MultiThreadedWriter writer = null; + private MultiThreadedWriterBase writer = null; /** * The number of keys verified in a sequence. This will never be larger than @@ -77,9 +77,9 @@ public class MultiThreadedReader extends MultiThreadedAction this.verifyPercent = verifyPercent; } - public void linkToWriter(MultiThreadedWriter writer) { + public void linkToWriter(MultiThreadedWriterBase writer) { this.writer = writer; - writer.setTrackInsertedKeys(true); + writer.setTrackWroteKeys(true); } public void setMaxErrors(int maxErrors) { @@ -108,7 +108,6 @@ public class MultiThreadedReader extends MultiThreadedAction public class HBaseReaderThread extends Thread { private final int readerId; private final HTable table; - private final Random random = new Random(); /** The "current" key being read. Increases from startKey to endKey. */ private long curKey; @@ -182,13 +181,13 @@ public class MultiThreadedReader extends MultiThreadedAction * constraint. */ private long maxKeyWeCanRead() { - long insertedUpToKey = writer.insertedUpToKey(); + long insertedUpToKey = writer.wroteUpToKey(); if (insertedUpToKey >= endKey - 1) { // The writer has finished writing our range, so we can read any // key in the range. return endKey - 1; } - return Math.min(endKey - 1, writer.insertedUpToKey() - keyWindow); + return Math.min(endKey - 1, writer.wroteUpToKey() - keyWindow); } private long getNextKeyToRead() { @@ -217,7 +216,7 @@ public class MultiThreadedReader extends MultiThreadedAction // later. Set a flag to make sure that we don't count this key towards // the set of unique keys we have verified. readingRandomKey = true; - return startKey + Math.abs(random.nextLong()) + return startKey + Math.abs(RandomUtils.nextLong()) % (maxKeyToRead - startKey + 1); } @@ -239,7 +238,7 @@ public class MultiThreadedReader extends MultiThreadedAction if (verbose) { LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString); } - queryKey(get, random.nextInt(100) < verifyPercent); + queryKey(get, RandomUtils.nextInt(100) < verifyPercent); } catch (IOException e) { numReadFailures.addAndGet(1); LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "") @@ -279,7 +278,7 @@ public class MultiThreadedReader extends MultiThreadedAction numCols.addAndGet(cols); } else { if (writer != null) { - LOG.error("At the time of failure, writer inserted " + writer.numKeys.get() + " keys"); + LOG.error("At the time of failure, writer wrote " + writer.numKeys.get() + " keys"); } numErrorsAfterThis = numReadErrors.incrementAndGet(); } @@ -315,5 +314,4 @@ public class MultiThreadedReader extends MultiThreadedAction appendToStatus(sb, "READ ERRORS", numReadErrors.get()); return sb.toString(); } - } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdater.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdater.java new file mode 100644 index 00000000000..e7f281e42d3 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedUpdater.java @@ -0,0 +1,290 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util; + +import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT; +import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO; + +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.lang.math.RandomUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; +import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; +import org.apache.hadoop.util.StringUtils; + +import com.google.common.base.Preconditions; + +/** Creates multiple threads that write key/values into the */ +public class MultiThreadedUpdater extends MultiThreadedWriterBase { + private static final Log LOG = LogFactory.getLog(MultiThreadedUpdater.class); + + private Set updaters = new HashSet(); + + private MultiThreadedWriterBase writer = null; + private boolean isBatchUpdate = false; + private final double updatePercent; + + public MultiThreadedUpdater(LoadTestDataGenerator dataGen, Configuration conf, + TableName tableName, double updatePercent) { + super(dataGen, conf, tableName, "U"); + this.updatePercent = updatePercent; + } + + /** Use batch vs. 
separate updates for every column in a row */ + public void setBatchUpdate(boolean isBatchUpdate) { + this.isBatchUpdate = isBatchUpdate; + } + + public void linkToWriter(MultiThreadedWriterBase writer) { + this.writer = writer; + writer.setTrackWroteKeys(true); + } + + @Override + public void start(long startKey, long endKey, int numThreads) + throws IOException { + super.start(startKey, endKey, numThreads); + + if (verbose) { + LOG.debug("Updating keys [" + startKey + ", " + endKey + ")"); + } + + for (int i = 0; i < numThreads; ++i) { + HBaseUpdaterThread updater = new HBaseUpdaterThread(i); + updaters.add(updater); + } + + startThreads(updaters); + } + + private long getNextKeyToUpdate() { + if (writer == null) { + return nextKeyToWrite.getAndIncrement(); + } + synchronized (this) { + if (nextKeyToWrite.get() >= endKey) { + // Finished the whole key range + return endKey; + } + while (nextKeyToWrite.get() > writer.wroteUpToKey()) { + Threads.sleepWithoutInterrupt(100); + } + long k = nextKeyToWrite.getAndIncrement(); + if (writer.failedToWriteKey(k)) { + failedKeySet.add(k); + return getNextKeyToUpdate(); + } + return k; + } + } + + private class HBaseUpdaterThread extends Thread { + private final HTable table; + + public HBaseUpdaterThread(int updaterId) throws IOException { + setName(getClass().getSimpleName() + "_" + updaterId); + table = new HTable(conf, tableName); + } + + public void run() { + try { + long rowKeyBase; + StringBuilder buf = new StringBuilder(); + byte[][] columnFamilies = dataGenerator.getColumnFamilies(); + while ((rowKeyBase = getNextKeyToUpdate()) < endKey) { + if (RandomUtils.nextInt(100) < updatePercent) { + byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase); + Increment inc = new Increment(rowKey); + Append app = new Append(rowKey); + numKeys.addAndGet(1); + int columnCount = 0; + for (byte[] cf : columnFamilies) { + long cfHash = Arrays.hashCode(cf); + inc.addColumn(cf, INCREMENT, cfHash); + buf.setLength(0); // Clear the buffer + buf.append("#").append(Bytes.toString(INCREMENT)); + buf.append(":").append(MutationType.INCREMENT.getNumber()); + app.add(cf, MUTATE_INFO, Bytes.toBytes(buf.toString())); + ++columnCount; + if (!isBatchUpdate) { + mutate(table, inc, rowKeyBase); + numCols.addAndGet(1); + inc = new Increment(rowKey); + mutate(table, app, rowKeyBase); + numCols.addAndGet(1); + app = new Append(rowKey); + } + Result result = null; + try { + Get get = new Get(rowKey); + get.addFamily(cf); + result = table.get(get); + } catch (IOException ie) { + LOG.warn("Failed to get the row for key = [" + + rowKey + "], column family = [" + Bytes.toString(cf) + "]", ie); + } + Map columnValues = + result != null ? result.getFamilyMap(cf) : null; + if (columnValues == null) { + failedKeySet.add(rowKeyBase); + LOG.error("Failed to update the row with key = [" + + rowKey + "], since we could not get the original row"); + } + for (byte[] column : columnValues.keySet()) { + if (Bytes.equals(column, INCREMENT) + || Bytes.equals(column, MUTATE_INFO)) { + continue; + } + MutationType mt = MutationType.valueOf( + RandomUtils.nextInt(MutationType.values().length)); + long columnHash = Arrays.hashCode(column); + long hashCode = cfHash + columnHash; + byte[] hashCodeBytes = Bytes.toBytes(hashCode); + byte[] checkedValue = HConstants.EMPTY_BYTE_ARRAY; + if (hashCode % 2 == 0) { + KeyValue kv = result.getColumnLatest(cf, column); + checkedValue = kv != null ? 
kv.getValue() : null; + Preconditions.checkNotNull(checkedValue, + "Column value to be checked should not be null"); + } + buf.setLength(0); // Clear the buffer + buf.append("#").append(Bytes.toString(column)).append(":"); + ++columnCount; + switch (mt) { + case PUT: + Put put = new Put(rowKey); + put.add(cf, column, hashCodeBytes); + mutate(table, put, rowKeyBase, rowKey, cf, column, checkedValue); + buf.append(MutationType.PUT.getNumber()); + break; + case DELETE: + Delete delete = new Delete(rowKey); + // Delete all versions since a put + // could be called multiple times if CM is used + delete.deleteColumns(cf, column); + mutate(table, delete, rowKeyBase, rowKey, cf, column, checkedValue); + buf.append(MutationType.DELETE.getNumber()); + break; + default: + buf.append(MutationType.APPEND.getNumber()); + app.add(cf, column, hashCodeBytes); + } + app.add(cf, MUTATE_INFO, Bytes.toBytes(buf.toString())); + if (!isBatchUpdate) { + mutate(table, app, rowKeyBase); + numCols.addAndGet(1); + app = new Append(rowKey); + } + } + } + if (isBatchUpdate) { + if (verbose) { + LOG.debug("Preparing increment and append for key = [" + + rowKey + "], " + columnCount + " columns"); + } + mutate(table, inc, rowKeyBase); + mutate(table, app, rowKeyBase); + numCols.addAndGet(columnCount); + } + } + if (trackWroteKeys) { + wroteKeys.add(rowKeyBase); + } + } + } finally { + try { + table.close(); + } catch (IOException e) { + LOG.error("Error closing table", e); + } + numThreadsWorking.decrementAndGet(); + } + } + } + + @Override + public void waitForFinish() { + super.waitForFinish(); + System.out.println("Failed to update keys: " + failedKeySet.size()); + for (Long key : failedKeySet) { + System.out.println("Failed to update key: " + key); + } + } + + public void mutate(HTable table, Mutation m, long keyBase) { + mutate(table, m, keyBase, null, null, null, null); + } + + public void mutate(HTable table, Mutation m, + long keyBase, byte[] row, byte[] cf, byte[] q, byte[] v) { + long start = System.currentTimeMillis(); + try { + if (m instanceof Increment) { + table.increment((Increment)m); + } else if (m instanceof Append) { + table.append((Append)m); + } else if (m instanceof Put) { + table.checkAndPut(row, cf, q, v, (Put)m); + } else if (m instanceof Delete) { + table.checkAndDelete(row, cf, q, v, (Delete)m); + } else { + throw new IllegalArgumentException( + "unsupported mutation " + m.getClass().getSimpleName()); + } + totalOpTimeMs.addAndGet(System.currentTimeMillis() - start); + } catch (IOException e) { + failedKeySet.add(keyBase); + String exceptionInfo; + if (e instanceof RetriesExhaustedWithDetailsException) { + RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e; + exceptionInfo = aggEx.getExhaustiveDescription(); + } else { + StringWriter stackWriter = new StringWriter(); + PrintWriter pw = new PrintWriter(stackWriter); + e.printStackTrace(pw); + pw.flush(); + exceptionInfo = StringUtils.stringifyException(e); + } + LOG.error("Failed to mutate: " + keyBase + " after " + (System.currentTimeMillis() - start) + + "ms; region information: " + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: " + + exceptionInfo); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java index 24b63bd64cd..699e19c36eb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java @@ -18,72 +18,33 @@ package org.apache.hadoop.hbase.util; +import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT; +import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO; + import java.io.IOException; -import java.io.PrintWriter; -import java.io.StringWriter; +import java.util.Arrays; import java.util.HashSet; -import java.util.PriorityQueue; -import java.util.Queue; import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; /** Creates multiple threads that write key/values into the */ -public class MultiThreadedWriter extends MultiThreadedAction { +public class MultiThreadedWriter extends MultiThreadedWriterBase { private static final Log LOG = LogFactory.getLog(MultiThreadedWriter.class); private Set writers = new HashSet(); private boolean isMultiPut = false; - /** - * A temporary place to keep track of inserted keys. This is written to by - * all writers and is drained on a separate thread that populates - * {@link #insertedUpToKey}, the maximum key in the contiguous range of keys - * being inserted. This queue is supposed to stay small. - */ - private BlockingQueue insertedKeys = new ArrayBlockingQueue(10000); - - /** - * This is the current key to be inserted by any thread. Each thread does an - * atomic get and increment operation and inserts the current value. - */ - private AtomicLong nextKeyToInsert = new AtomicLong(); - - /** - * The highest key in the contiguous range of keys . - */ - private AtomicLong insertedUpToKey = new AtomicLong(); - - /** The sorted set of keys NOT inserted by the writers */ - private Set failedKeySet = new ConcurrentSkipListSet(); - - /** - * The total size of the temporary inserted key set that have not yet lined - * up in a our contiguous sequence starting from startKey. Supposed to stay - * small. - */ - private AtomicLong insertedKeyQueueSize = new AtomicLong(); - - /** Enable this if used in conjunction with a concurrent reader. 
*/ - private boolean trackInsertedKeys; - public MultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf, - TableName tableName) { + TableName tableName) { super(dataGen, conf, tableName, "W"); } @@ -101,19 +62,11 @@ public class MultiThreadedWriter extends MultiThreadedAction { LOG.debug("Inserting keys [" + startKey + ", " + endKey + ")"); } - nextKeyToInsert.set(startKey); - insertedUpToKey.set(startKey - 1); - for (int i = 0; i < numThreads; ++i) { HBaseWriterThread writer = new HBaseWriterThread(i); writers.add(writer); } - if (trackInsertedKeys) { - new Thread(new InsertedKeysTracker()).start(); - numThreadsWorking.incrementAndGet(); - } - startThreads(writers); } @@ -129,13 +82,12 @@ public class MultiThreadedWriter extends MultiThreadedAction { try { long rowKeyBase; byte[][] columnFamilies = dataGenerator.getColumnFamilies(); - while ((rowKeyBase = nextKeyToInsert.getAndIncrement()) < endKey) { + while ((rowKeyBase = nextKeyToWrite.getAndIncrement()) < endKey) { byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase); Put put = new Put(rowKey); numKeys.addAndGet(1); int columnCount = 0; for (byte[] cf : columnFamilies) { - String s; byte[][] columns = dataGenerator.generateColumnsForCf(rowKey, cf); for (byte[] column : columns) { byte[] value = dataGenerator.generateValue(rowKey, cf, column); @@ -147,6 +99,14 @@ public class MultiThreadedWriter extends MultiThreadedAction { put = new Put(rowKey); } } + long rowKeyHash = Arrays.hashCode(rowKey); + put.add(cf, MUTATE_INFO, HConstants.EMPTY_BYTE_ARRAY); + put.add(cf, INCREMENT, Bytes.toBytes(rowKeyHash)); + if (!isMultiPut) { + insert(table, put, rowKeyBase); + numCols.addAndGet(1); + put = new Put(rowKey); + } } if (isMultiPut) { if (verbose) { @@ -155,8 +115,8 @@ public class MultiThreadedWriter extends MultiThreadedAction { insert(table, put, rowKeyBase); numCols.addAndGet(columnCount); } - if (trackInsertedKeys) { - insertedKeys.add(rowKeyBase); + if (trackWroteKeys) { + wroteKeys.add(rowKeyBase); } } } finally { @@ -170,104 +130,6 @@ public class MultiThreadedWriter extends MultiThreadedAction { } } - public void insert(HTable table, Put put, long keyBase) { - long start = System.currentTimeMillis(); - try { - table.put(put); - totalOpTimeMs.addAndGet(System.currentTimeMillis() - start); - } catch (IOException e) { - failedKeySet.add(keyBase); - String exceptionInfo; - if (e instanceof RetriesExhaustedWithDetailsException) { - RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e; - exceptionInfo = aggEx.getExhaustiveDescription(); - } else { - StringWriter stackWriter = new StringWriter(); - PrintWriter pw = new PrintWriter(stackWriter); - e.printStackTrace(pw); - pw.flush(); - exceptionInfo = StringUtils.stringifyException(e); - } - LOG.error("Failed to insert: " + keyBase + " after " + (System.currentTimeMillis() - start) + - "ms; region information: " + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: " - + exceptionInfo); - } - } - - private String getRegionDebugInfoSafe(HTable table, byte[] rowKey) { - HRegionLocation cached = null, real = null; - try { - cached = table.getRegionLocation(rowKey, false); - real = table.getRegionLocation(rowKey, true); - } catch (Throwable t) { - // Cannot obtain region information for another catch block - too bad! 
- } - String result = "no information can be obtained"; - if (cached != null) { - result = "cached: " + cached.toString(); - } - if (real != null) { - if (real.equals(cached)) { - result += "; cache is up to date"; - } else { - result = (cached != null) ? (result + "; ") : ""; - result += "real: " + real.toString(); - } - } - return result; - } - - /** - * A thread that keeps track of the highest key in the contiguous range of - * inserted keys. - */ - private class InsertedKeysTracker implements Runnable { - - @Override - public void run() { - Thread.currentThread().setName(getClass().getSimpleName()); - try { - long expectedKey = startKey; - Queue sortedKeys = new PriorityQueue(); - while (expectedKey < endKey) { - // Block until a new element is available. - Long k; - try { - k = insertedKeys.poll(1, TimeUnit.SECONDS); - } catch (InterruptedException e) { - LOG.info("Inserted key tracker thread interrupted", e); - break; - } - if (k == null) { - continue; - } - if (k == expectedKey) { - // Skip the "sorted key" queue and consume this key. - insertedUpToKey.set(k); - ++expectedKey; - } else { - sortedKeys.add(k); - } - - // See if we have a sequence of contiguous keys lined up. - while (!sortedKeys.isEmpty() - && ((k = sortedKeys.peek()) == expectedKey)) { - sortedKeys.poll(); - insertedUpToKey.set(k); - ++expectedKey; - } - - insertedKeyQueueSize.set(insertedKeys.size() + sortedKeys.size()); - } - } catch (Exception ex) { - LOG.error("Error in inserted key tracker", ex); - } finally { - numThreadsWorking.decrementAndGet(); - } - } - - } - @Override public void waitForFinish() { super.waitForFinish(); @@ -276,37 +138,4 @@ public class MultiThreadedWriter extends MultiThreadedAction { System.out.println("Failed to write key: " + key); } } - - public int getNumWriteFailures() { - return failedKeySet.size(); - } - - /** - * The max key until which all keys have been inserted (successfully or not). - * @return the last key that we have inserted all keys up to (inclusive) - */ - public long insertedUpToKey() { - return insertedUpToKey.get(); - } - - public boolean failedToWriteKey(long k) { - return failedKeySet.contains(k); - } - - @Override - protected String progressInfo() { - StringBuilder sb = new StringBuilder(); - appendToStatus(sb, "insertedUpTo", insertedUpToKey.get()); - appendToStatus(sb, "insertedQSize", insertedKeyQueueSize.get()); - return sb.toString(); - } - - /** - * Used for a joint write/read workload. Enables tracking the last inserted - * key, which requires a blocking queue and a consumer thread. - * @param enable whether to enable tracking the last inserted key - */ - public void setTrackInsertedKeys(boolean enable) { - trackInsertedKeys = enable; - } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java new file mode 100644 index 00000000000..c4b8d2acb67 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java @@ -0,0 +1,228 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; +import org.apache.hadoop.util.StringUtils; + +/** Creates multiple threads that write key/values into the */ +public abstract class MultiThreadedWriterBase extends MultiThreadedAction { + private static final Log LOG = LogFactory.getLog(MultiThreadedWriterBase.class); + + /** + * A temporary place to keep track of inserted/updated keys. This is written to by + * all writers and is drained on a separate thread that populates + * {@link #wroteUpToKey}, the maximum key in the contiguous range of keys + * being inserted/updated. This queue is supposed to stay small. + */ + protected BlockingQueue wroteKeys = new ArrayBlockingQueue(10000); + + /** + * This is the current key to be inserted/updated by any thread. Each thread does an + * atomic get and increment operation and inserts the current value. + */ + protected AtomicLong nextKeyToWrite = new AtomicLong(); + + /** + * The highest key in the contiguous range of keys . + */ + protected AtomicLong wroteUpToKey = new AtomicLong(); + + /** The sorted set of keys NOT inserted/updated by the writers */ + protected Set failedKeySet = new ConcurrentSkipListSet(); + + /** + * The total size of the temporary inserted/updated key set that have not yet lined + * up in a our contiguous sequence starting from startKey. Supposed to stay + * small. + */ + protected AtomicLong wroteKeyQueueSize = new AtomicLong(); + + /** Enable this if used in conjunction with a concurrent reader. 
*/ + protected boolean trackWroteKeys; + + public MultiThreadedWriterBase(LoadTestDataGenerator dataGen, Configuration conf, + TableName tableName, String actionLetter) { + super(dataGen, conf, tableName, actionLetter); + } + + @Override + public void start(long startKey, long endKey, int numThreads) + throws IOException { + super.start(startKey, endKey, numThreads); + + nextKeyToWrite.set(startKey); + wroteUpToKey.set(startKey - 1); + + if (trackWroteKeys) { + new Thread(new WroteKeysTracker()).start(); + numThreadsWorking.incrementAndGet(); + } + } + + protected String getRegionDebugInfoSafe(HTable table, byte[] rowKey) { + HRegionLocation cached = null, real = null; + try { + cached = table.getRegionLocation(rowKey, false); + real = table.getRegionLocation(rowKey, true); + } catch (Throwable t) { + // Cannot obtain region information for another catch block - too bad! + } + String result = "no information can be obtained"; + if (cached != null) { + result = "cached: " + cached.toString(); + } + if (real != null) { + if (real.equals(cached)) { + result += "; cache is up to date"; + } else { + result = (cached != null) ? (result + "; ") : ""; + result += "real: " + real.toString(); + } + } + return result; + } + + /** + * A thread that keeps track of the highest key in the contiguous range of + * inserted/updated keys. + */ + private class WroteKeysTracker implements Runnable { + + @Override + public void run() { + Thread.currentThread().setName(getClass().getSimpleName()); + try { + long expectedKey = startKey; + Queue sortedKeys = new PriorityQueue(); + while (expectedKey < endKey) { + // Block until a new element is available. + Long k; + try { + k = wroteKeys.poll(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.info("Inserted key tracker thread interrupted", e); + break; + } + if (k == null) { + continue; + } + if (k == expectedKey) { + // Skip the "sorted key" queue and consume this key. + wroteUpToKey.set(k); + ++expectedKey; + } else { + sortedKeys.add(k); + } + + // See if we have a sequence of contiguous keys lined up. + while (!sortedKeys.isEmpty() + && ((k = sortedKeys.peek()) == expectedKey)) { + sortedKeys.poll(); + wroteUpToKey.set(k); + ++expectedKey; + } + + wroteKeyQueueSize.set(wroteKeys.size() + sortedKeys.size()); + } + } catch (Exception ex) { + LOG.error("Error in inserted/updaed key tracker", ex); + } finally { + numThreadsWorking.decrementAndGet(); + } + } + } + + public int getNumWriteFailures() { + return failedKeySet.size(); + } + + public void insert(HTable table, Put put, long keyBase) { + long start = System.currentTimeMillis(); + try { + table.put(put); + totalOpTimeMs.addAndGet(System.currentTimeMillis() - start); + } catch (IOException e) { + failedKeySet.add(keyBase); + String exceptionInfo; + if (e instanceof RetriesExhaustedWithDetailsException) { + RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e; + exceptionInfo = aggEx.getExhaustiveDescription(); + } else { + StringWriter stackWriter = new StringWriter(); + PrintWriter pw = new PrintWriter(stackWriter); + e.printStackTrace(pw); + pw.flush(); + exceptionInfo = StringUtils.stringifyException(e); + } + LOG.error("Failed to insert: " + keyBase + " after " + (System.currentTimeMillis() - start) + + "ms; region information: " + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: " + + exceptionInfo); + } + } + + /** + * The max key until which all keys have been inserted/updated (successfully or not). 
+ * @return the last key that we have inserted/updated all keys up to (inclusive) + */ + public long wroteUpToKey() { + return wroteUpToKey.get(); + } + + public boolean failedToWriteKey(long k) { + return failedKeySet.contains(k); + } + + @Override + protected String progressInfo() { + StringBuilder sb = new StringBuilder(); + appendToStatus(sb, "wroteUpTo", wroteUpToKey.get()); + appendToStatus(sb, "wroteQSize", wroteKeyQueueSize.get()); + return sb.toString(); + } + + /** + * Used for a joint write/read workload. Enables tracking the last inserted/updated + * key, which requires a blocking queue and a consumer thread. + * @param enable whether to enable tracking the last inserted/updated key + */ + public void setTrackWroteKeys(boolean enable) { + trackWroteKeys = enable; + } +}
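Note on the bookkeeping the verification above depends on: for every mutation it issues, MultiThreadedUpdater appends one "#<column>:<mutation type number>" entry to the MUTATE_INFO column of the affected family, and MultiThreadedAction.parseMutateInfo() later splits that string on "#" and then at the first ":" to rebuild a column-to-mutation map, which drives the delete/append/checkAndPut checks in verifyResultAgainstDataGenerator. The standalone sketch below mirrors only that encoding and parsing, outside of HBase; the class name MutateInfoFormat, the helper method names, and the plain int codes standing in for the ClientProtos.MutationProto.MutationType protobuf enum (and their exact numeric values) are illustrative assumptions, not part of this patch.

import java.util.HashMap;
import java.util.Map;

// Standalone sketch (not from the patch): mirrors the "#column:typeNumber"
// records that MultiThreadedUpdater appends to the MUTATE_INFO column and
// that MultiThreadedAction.parseMutateInfo() reads back during verification.
public class MutateInfoFormat {

  // Illustrative stand-ins for the MutationType protobuf numbers (assumed values).
  static final int APPEND = 0, INCREMENT = 1, PUT = 2, DELETE = 3;

  // Encode one mutation record the way the updater appends it to its buffer.
  static String encode(String column, int mutationType) {
    return "#" + column + ":" + mutationType;
  }

  // Decode a concatenation of records into a column -> type map, using the
  // same split-on-'#' / split-at-first-':' logic as parseMutateInfo().
  static Map<String, Integer> parse(String mutateInfo) {
    Map<String, Integer> mi = new HashMap<String, Integer>();
    if (mutateInfo == null) {
      return mi;
    }
    for (String mutation : mutateInfo.split("#")) {
      if (mutation.isEmpty()) continue; // leading '#' yields an empty token
      int p = mutation.indexOf(":");
      if (p < 0) {
        throw new IllegalArgumentException("Invalid mutation info " + mutation);
      }
      String column = mutation.substring(0, p);
      int type = Integer.parseInt(mutation.substring(p + 1));
      mi.put(column, type); // later records for the same column overwrite earlier ones
    }
    return mi;
  }

  public static void main(String[] args) {
    String encoded = encode("increment", INCREMENT) + encode("col00", DELETE);
    System.out.println(parse(encoded)); // e.g. {col00=3, increment=1} (map order may vary)
  }
}

Because checkAndPut and checkAndDelete in the updater are gated on the parity of (family hash + column hash), the verifier reconstructs the same hash to decide whether a deleted or overwritten column is expected; the sketch above only covers the string format, not that hash agreement.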