diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index b3bf041223f..5ba0572eb5b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -858,6 +858,63 @@ public final class ProtobufUtil { return increment; } + /** + * Convert a protocol buffer Mutate to a Get. + * @param proto the protocol buffer Mutate to convert. + * @param cellScanner + * @return the converted client get. + * @throws IOException + */ + public static Get toGet(final MutationProto proto, final CellScanner cellScanner) + throws IOException { + MutationType type = proto.getMutateType(); + assert type == MutationType.INCREMENT || type == MutationType.APPEND : type.name(); + byte[] row = proto.hasRow() ? proto.getRow().toByteArray() : null; + Get get = null; + int cellCount = proto.hasAssociatedCellCount() ? proto.getAssociatedCellCount() : 0; + if (cellCount > 0) { + // The proto has metadata only and the data is separate to be found in the cellScanner. + if (cellScanner == null) { + throw new DoNotRetryIOException("Cell count of " + cellCount + " but no cellScanner: " + + TextFormat.shortDebugString(proto)); + } + for (int i = 0; i < cellCount; i++) { + if (!cellScanner.advance()) { + throw new DoNotRetryIOException("Cell count of " + cellCount + " but at index " + i + + " no cell returned: " + TextFormat.shortDebugString(proto)); + } + Cell cell = cellScanner.current(); + if (get == null) { + get = new Get(Bytes.copy(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())); + } + get.addColumn( + Bytes.copy(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()), + Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength())); + } + } else { + get = new Get(row); + for (ColumnValue column : proto.getColumnValueList()) { + byte[] family = column.getFamily().toByteArray(); + for (QualifierValue qv : column.getQualifierValueList()) { + byte[] qualifier = qv.getQualifier().toByteArray(); + if (!qv.hasValue()) { + throw new DoNotRetryIOException("Missing required field: qualifier value"); + } + get.addColumn(family, qualifier); + } + } + } + if (proto.hasTimeRange()) { + TimeRange timeRange = protoToTimeRange(proto.getTimeRange()); + get.setTimeRange(timeRange.getMin(), timeRange.getMax()); + } + for (NameBytesPair attribute : proto.getAttributeList()) { + get.setAttribute(attribute.getName(), attribute.getValue().toByteArray()); + } + return get; + } + /** * Convert a client Scan to a protocol buffer Scan * diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index b7950df146e..86c02eaf79f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -2617,6 +2617,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public RegionScanner getScanner(Scan scan, List additionalScanners) throws IOException { + return getScanner(scan, additionalScanners, HConstants.NO_NONCE, HConstants.NO_NONCE); + } + + private RegionScanner getScanner(Scan scan, List additionalScanners, + long nonceGroup, long nonce) throws IOException { startRegionOperation(Operation.SCAN); try { // Verify families are all valid @@ -2630,7 +2635,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi checkFamily(family); } } - return instantiateRegionScanner(scan, additionalScanners); + return instantiateRegionScanner(scan, additionalScanners, nonceGroup, nonce); } finally { closeRegionOperation(Operation.SCAN); } @@ -2638,13 +2643,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi protected RegionScanner instantiateRegionScanner(Scan scan, List additionalScanners) throws IOException { + return instantiateRegionScanner(scan, additionalScanners, HConstants.NO_NONCE, + HConstants.NO_NONCE); + } + + protected RegionScanner instantiateRegionScanner(Scan scan, + List additionalScanners, long nonceGroup, long nonce) throws IOException { if (scan.isReversed()) { if (scan.getFilter() != null) { scan.getFilter().setReversed(true); } return new ReversedRegionScannerImpl(scan, additionalScanners, this); } - return new RegionScannerImpl(scan, additionalScanners, this); + return new RegionScannerImpl(scan, additionalScanners, this, nonceGroup, nonce); } @Override @@ -5592,6 +5603,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi RegionScannerImpl(Scan scan, List additionalScanners, HRegion region) throws IOException { + this(scan, additionalScanners, region, HConstants.NO_NONCE, HConstants.NO_NONCE); + } + + RegionScannerImpl(Scan scan, List additionalScanners, HRegion region, + long nonceGroup, long nonce) throws IOException { this.region = region; this.maxResultSize = scan.getMaxResultSize(); if (scan.hasFilter()) { @@ -5621,15 +5637,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // getSmallestReadPoint, before scannerReadPoints is updated. IsolationLevel isolationLevel = scan.getIsolationLevel(); synchronized(scannerReadPoints) { - this.readPt = getReadPoint(isolationLevel); + if (nonce == HConstants.NO_NONCE || rsServices == null + || rsServices.getNonceManager() == null) { + this.readPt = getReadPoint(isolationLevel); + } else { + this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce); + } scannerReadPoints.put(this, this.readPt); } + initializeScanners(scan, additionalScanners); + } + + protected void initializeScanners(Scan scan, List additionalScanners) + throws IOException { // Here we separate all scanners into two lists - scanner that provide data required // by the filter to operate (scanners list) and all others (joinedScanners list). List scanners = new ArrayList(scan.getFamilyMap().size()); - List joinedScanners - = new ArrayList(scan.getFamilyMap().size()); + List joinedScanners = + new ArrayList(scan.getFamilyMap().size()); // Store all already instantiated scanners for exception handling List instantiatedScanners = new ArrayList(); // handle additionalScanners @@ -6795,15 +6821,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale); } - void prepareGet(final Get get) throws IOException, NoSuchColumnFamilyException { + void prepareGet(final Get get) throws IOException, NoSuchColumnFamilyException { checkRow(get.getRow(), "Get"); // Verify families are all valid if (get.hasFamilies()) { - for (byte [] family: get.familySet()) { + for (byte[] family : get.familySet()) { checkFamily(family); } } else { // Adding all families to scanner - for (byte[] family: this.htableDescriptor.getFamiliesKeys()) { + for (byte[] family : this.htableDescriptor.getFamiliesKeys()) { get.addFamily(family); } } @@ -6811,7 +6837,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public List get(Get get, boolean withCoprocessor) throws IOException { + return get(get, withCoprocessor, HConstants.NO_NONCE, HConstants.NO_NONCE); + } + @Override + public List get(Get get, boolean withCoprocessor, long nonceGroup, long nonce) + throws IOException { List results = new ArrayList(); // pre-get CP hook @@ -6825,7 +6856,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi RegionScanner scanner = null; try { - scanner = getScanner(scan); + scanner = getScanner(scan, null, nonceGroup, nonce); scanner.next(results); } finally { if (scanner != null) @@ -7168,6 +7199,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi applyToMemstore(e.getKey(), e.getValue(), true, false, sequenceId); } mvcc.completeAndWait(writeEntry); + if (rsServices != null && rsServices.getNonceManager() != null) { + rsServices.getNonceManager().addMvccToOperationContext(nonceGroup, nonce, + writeEntry.getWriteNumber()); + } writeEntry = null; } finally { this.updatesLock.readLock().unlock(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 9cfc5dfb70a..f9b78e11ffc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -73,7 +73,6 @@ import org.apache.hadoop.hbase.client.VersionInfoUtil; import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; import org.apache.hadoop.hbase.exceptions.MergeRegionException; -import org.apache.hadoop.hbase.exceptions.OperationConflictException; import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; @@ -426,11 +425,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * Starts the nonce operation for a mutation, if needed. * @param mutation Mutation. * @param nonceGroup Nonce group from the request. - * @returns Nonce used (can be NO_NONCE). + * @returns whether to proceed this mutation. */ - private long startNonceOperation(final MutationProto mutation, long nonceGroup) - throws IOException, OperationConflictException { - if (regionServer.nonceManager == null || !mutation.hasNonce()) return HConstants.NO_NONCE; + private boolean startNonceOperation(final MutationProto mutation, long nonceGroup) + throws IOException { + if (regionServer.nonceManager == null || !mutation.hasNonce()) return true; boolean canProceed = false; try { canProceed = regionServer.nonceManager.startOperation( @@ -438,14 +437,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } catch (InterruptedException ex) { throw new InterruptedIOException("Nonce start operation interrupted"); } - if (!canProceed) { - // TODO: instead, we could convert append/increment to get w/mvcc - String message = "The operation with nonce {" + nonceGroup + ", " + mutation.getNonce() - + "} on row [" + Bytes.toString(mutation.getRow().toByteArray()) - + "] may have already completed"; - throw new OperationConflictException(message); - } - return mutation.getNonce(); + return canProceed; } /** @@ -614,23 +606,33 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * bypassed as indicated by RegionObserver, null otherwise * @throws IOException */ - private Result append(final Region region, final OperationQuota quota, final MutationProto m, - final CellScanner cellScanner, long nonceGroup) throws IOException { + private Result append(final Region region, final OperationQuota quota, + final MutationProto mutation, final CellScanner cellScanner, long nonceGroup) + throws IOException { long before = EnvironmentEdgeManager.currentTime(); - Append append = ProtobufUtil.toAppend(m, cellScanner); + Append append = ProtobufUtil.toAppend(mutation, cellScanner); quota.addMutation(append); Result r = null; if (region.getCoprocessorHost() != null) { r = region.getCoprocessorHost().preAppend(append); } if (r == null) { - long nonce = startNonceOperation(m, nonceGroup); + boolean canProceed = startNonceOperation(mutation, nonceGroup); boolean success = false; try { - r = region.append(append, nonceGroup, nonce); + if (canProceed) { + r = region.append(append, nonceGroup, mutation.getNonce()); + } else { + // convert duplicate append to get + List results = region.get(ProtobufUtil.toGet(mutation, cellScanner), false, + nonceGroup, mutation.getNonce()); + r = Result.create(results); + } success = true; } finally { - endNonceOperation(m, nonceGroup, success); + if (canProceed) { + endNonceOperation(mutation, nonceGroup, success); + } } if (region.getCoprocessorHost() != null) { region.getCoprocessorHost().postAppend(append, r); @@ -662,13 +664,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler, r = region.getCoprocessorHost().preIncrement(increment); } if (r == null) { - long nonce = startNonceOperation(mutation, nonceGroup); + boolean canProceed = startNonceOperation(mutation, nonceGroup); boolean success = false; try { - r = region.increment(increment, nonceGroup, nonce); + if (canProceed) { + r = region.increment(increment, nonceGroup, mutation.getNonce()); + } else { + // convert duplicate increment to get + List results = region.get(ProtobufUtil.toGet(mutation, cells), false, nonceGroup, + mutation.getNonce()); + r = Result.create(results); + } success = true; } finally { - endNonceOperation(mutation, nonceGroup, success); + if (canProceed) { + endNonceOperation(mutation, nonceGroup, success); + } } if (region.getCoprocessorHost() != null) { r = region.getCoprocessorHost().postIncrement(increment, r); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 9b1f82a0564..efd68b841b4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -394,6 +394,17 @@ public interface Region extends ConfigurationObserver { */ List get(Get get, boolean withCoprocessor) throws IOException; + /** + * Do a get for duplicate non-idempotent operation. + * @param get query parameters. + * @param withCoprocessor + * @param nonceGroup Nonce group. + * @param nonce Nonce. + * @return list of cells resulting from the operation + * @throws IOException + */ + List get(Get get, boolean withCoprocessor, long nonceGroup, long nonce) throws IOException; + /** * Return an iterator that scans over the HRegion, returning the indicated * columns and rows specified by the {@link Scan}. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ServerNonceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ServerNonceManager.java index b2b656b1d81..459b69a04b6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ServerNonceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ServerNonceManager.java @@ -62,6 +62,8 @@ public class ServerNonceManager { private static final long WAITING_BIT = 4; private static final long ALL_FLAG_BITS = WAITING_BIT | STATE_BITS; + private static long mvcc; + @Override public String toString() { return "[state " + getState() + ", hasWait " + hasWait() + ", activity " @@ -98,6 +100,14 @@ public class ServerNonceManager { return getActivityTime() < (minRelevantTime & (~0l >>> 3)); } + public void setMvcc(long mvcc) { + this.mvcc = mvcc; + } + + public long getMvcc() { + return this.mvcc; + } + private long getActivityTime() { return this.data >>> 3; } @@ -191,6 +201,39 @@ public class ServerNonceManager { } } + /** + * Store the write point in OperationContext when the operation succeed. + * @param group Nonce group. + * @param nonce Nonce. + * @param mvcc Write point of the succeed operation. + */ + public void addMvccToOperationContext(long group, long nonce, long mvcc) { + if (nonce == HConstants.NO_NONCE) { + return; + } + NonceKey nk = new NonceKey(group, nonce); + OperationContext result = nonces.get(nk); + assert result != null; + synchronized (result) { + result.setMvcc(mvcc); + } + } + + /** + * Return the write point of the previous succeed operation. + * @param group Nonce group. + * @param nonce Nonce. + * @return write point of the previous succeed operation. + */ + public long getMvccFromOperationContext(long group, long nonce) { + if (nonce == HConstants.NO_NONCE) { + return Long.MAX_VALUE; + } + NonceKey nk = new NonceKey(group, nonce); + OperationContext result = nonces.get(nk); + return result == null ? Long.MAX_VALUE : result.getMvcc(); + } + /** * Reports the operation from WAL during replay. * @param group Nonce group. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java index c8ccd2ade1f..265e3c1c352 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.RegionLocations; @@ -28,6 +29,10 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.mockito.Mockito; @@ -150,4 +155,43 @@ public class HConnectionTestingUtility { Mockito.spy(new ConnectionImplementation(conf, null, null)); return connection; } + + /** + * This coproceesor sleep 2s at first increment/append rpc call. + */ + public static class SleepAtFirstRpcCall extends BaseRegionObserver { + static final AtomicLong ct = new AtomicLong(0); + static final String SLEEP_TIME_CONF_KEY = + "hbase.coprocessor.SleepAtFirstRpcCall.sleepTime"; + static final long DEFAULT_SLEEP_TIME = 2000; + static final AtomicLong sleepTime = new AtomicLong(DEFAULT_SLEEP_TIME); + + public SleepAtFirstRpcCall() { + } + + @Override + public void postOpen(ObserverContext c) { + RegionCoprocessorEnvironment env = c.getEnvironment(); + Configuration conf = env.getConfiguration(); + sleepTime.set(conf.getLong(SLEEP_TIME_CONF_KEY, DEFAULT_SLEEP_TIME)); + } + + @Override + public Result postIncrement(final ObserverContext e, + final Increment increment, final Result result) throws IOException { + if (ct.incrementAndGet() == 1) { + Threads.sleep(sleepTime.get()); + } + return result; + } + + @Override + public Result postAppend(final ObserverContext e, + final Append append, final Result result) throws IOException { + if (ct.incrementAndGet() == 1) { + Threads.sleep(sleepTime.get()); + } + return result; + } + } } \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index ca4b609814a..bc94b024b8f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -168,6 +169,50 @@ public class TestFromClientSide { // Nothing to do. } + /** + * Test append result when there are duplicate rpc request. + */ + @Test + public void testDuplicateAppend() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testDuplicateAppend"); + Map kvs = new HashMap(); + kvs.put(HConnectionTestingUtility.SleepAtFirstRpcCall.SLEEP_TIME_CONF_KEY, "2000"); + hdt.addCoprocessor(HConnectionTestingUtility.SleepAtFirstRpcCall.class.getName(), null, 1, kvs); + TEST_UTIL.createTable(hdt, new byte[][] { ROW }).close(); + + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + c.setInt(HConstants.HBASE_CLIENT_PAUSE, 50); + // Client will retry beacuse rpc timeout is small than the sleep time of first rpc call + c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1500); + + Connection connection = ConnectionFactory.createConnection(c); + Table t = connection.getTable(TableName.valueOf("HCM-testDuplicateAppend")); + if (t instanceof HTable) { + HTable table = (HTable) t; + table.setOperationTimeout(3 * 1000); + + try { + Append append = new Append(ROW); + append.add(TEST_UTIL.fam1, QUALIFIER, VALUE); + Result result = table.append(append); + + // Verify expected result + Cell[] cells = result.rawCells(); + assertEquals(1, cells.length); + assertKey(cells[0], ROW, TEST_UTIL.fam1, QUALIFIER, VALUE); + + // Verify expected result again + Result readResult = table.get(new Get(ROW)); + cells = readResult.rawCells(); + assertEquals(1, cells.length); + assertKey(cells[0], ROW, TEST_UTIL.fam1, QUALIFIER, VALUE); + } finally { + table.close(); + connection.close(); + } + } + } + /** * Basic client side validation of HBASE-4536 */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java index 6b4ee894661..3ddfef4308c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java @@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -29,6 +31,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; @@ -56,6 +60,7 @@ public class TestIncrementsFromClientSide { protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static byte [] ROW = Bytes.toBytes("testRow"); private static byte [] FAMILY = Bytes.toBytes("testFamily"); + private static byte [] QUALIFIER = Bytes.toBytes("testQualifier"); // This test depends on there being only one slave running at at a time. See the @Before // method where we do rolling restart. protected static int SLAVES = 1; @@ -79,6 +84,49 @@ public class TestIncrementsFromClientSide { TEST_UTIL.shutdownMiniCluster(); } + /** + * Test increment result when there are duplicate rpc request. + */ + @Test + public void testDuplicateIncrement() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testDuplicateIncrement"); + Map kvs = new HashMap(); + kvs.put(HConnectionTestingUtility.SleepAtFirstRpcCall.SLEEP_TIME_CONF_KEY, "2000"); + hdt.addCoprocessor(HConnectionTestingUtility.SleepAtFirstRpcCall.class.getName(), null, 1, kvs); + TEST_UTIL.createTable(hdt, new byte[][] { ROW }).close(); + + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + c.setInt(HConstants.HBASE_CLIENT_PAUSE, 50); + // Client will retry beacuse rpc timeout is small than the sleep time of first rpc call + c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1500); + + Connection connection = ConnectionFactory.createConnection(c); + Table t = connection.getTable(TableName.valueOf("HCM-testDuplicateIncrement")); + if (t instanceof HTable) { + HTable table = (HTable) t; + table.setOperationTimeout(3 * 1000); + + try { + Increment inc = new Increment(ROW); + inc.addColumn(TEST_UTIL.fam1, QUALIFIER, 1); + Result result = table.increment(inc); + + Cell [] cells = result.rawCells(); + assertEquals(1, cells.length); + assertIncrementKey(cells[0], ROW, TEST_UTIL.fam1, QUALIFIER, 1); + + // Verify expected result + Result readResult = table.get(new Get(ROW)); + cells = readResult.rawCells(); + assertEquals(1, cells.length); + assertIncrementKey(cells[0], ROW, TEST_UTIL.fam1, QUALIFIER, 1); + } finally { + table.close(); + connection.close(); + } + } + } + @Test public void testIncrementWithDeletes() throws Exception { LOG.info("Starting " + this.name.getMethodName()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java index 67ac51e85ae..b1ad172f410 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java @@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.codec.KeyValueCodec; -import org.apache.hadoop.hbase.exceptions.OperationConflictException; import org.apache.hadoop.hbase.testclassification.FlakeyTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; @@ -524,16 +523,16 @@ public class TestMultiParallel { Increment inc = new Increment(ONE_ROW); inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L); table.increment(inc); + + // duplicate increment inc = new Increment(ONE_ROW); inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L); - try { - table.increment(inc); - fail("Should have thrown an exception"); - } catch (OperationConflictException ex) { - } + Result result = table.increment(inc); + validateResult(result, QUALIFIER, Bytes.toBytes(1L)); + Get get = new Get(ONE_ROW); get.addColumn(BYTES_FAMILY, QUALIFIER); - Result result = table.get(get); + result = table.get(get); validateResult(result, QUALIFIER, Bytes.toBytes(1L)); // Now run a bunch of requests in parallel, exactly half should succeed. @@ -561,7 +560,6 @@ public class TestMultiParallel { } try { table.increment(inc); - } catch (OperationConflictException ex) { // Some threads are expected to fail. } catch (IOException ioEx) { fail("Not expected"); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java index 54bee942aa8..b906e848714 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java @@ -489,7 +489,7 @@ public class TestScannerHeartbeatMessages { // Instantiate the custom heartbeat region scanners @Override protected RegionScanner instantiateRegionScanner(Scan scan, - List additionalScanners) throws IOException { + List additionalScanners, long nonceGroup, long nonce) throws IOException { if (scan.isReversed()) { if (scan.getFilter() != null) { scan.getFilter().setReversed(true);