HBASE-9899 for idempotent operation dups, return the result instead of throwing conflict exception (Guanghao Zhang)

This commit is contained in:
stack 2016-08-04 12:40:19 -07:00
parent 550b937bcf
commit 975f0dd958
10 changed files with 332 additions and 40 deletions

View File

@ -858,6 +858,63 @@ public final class ProtobufUtil {
return increment; return increment;
} }
/**
* Convert a protocol buffer Mutate to a Get.
* @param proto the protocol buffer Mutate to convert.
* @param cellScanner
* @return the converted client get.
* @throws IOException
*/
public static Get toGet(final MutationProto proto, final CellScanner cellScanner)
throws IOException {
MutationType type = proto.getMutateType();
assert type == MutationType.INCREMENT || type == MutationType.APPEND : type.name();
byte[] row = proto.hasRow() ? proto.getRow().toByteArray() : null;
Get get = null;
int cellCount = proto.hasAssociatedCellCount() ? proto.getAssociatedCellCount() : 0;
if (cellCount > 0) {
// The proto has metadata only and the data is separate to be found in the cellScanner.
if (cellScanner == null) {
throw new DoNotRetryIOException("Cell count of " + cellCount + " but no cellScanner: "
+ TextFormat.shortDebugString(proto));
}
for (int i = 0; i < cellCount; i++) {
if (!cellScanner.advance()) {
throw new DoNotRetryIOException("Cell count of " + cellCount + " but at index " + i
+ " no cell returned: " + TextFormat.shortDebugString(proto));
}
Cell cell = cellScanner.current();
if (get == null) {
get = new Get(Bytes.copy(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
}
get.addColumn(
Bytes.copy(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()),
Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength()));
}
} else {
get = new Get(row);
for (ColumnValue column : proto.getColumnValueList()) {
byte[] family = column.getFamily().toByteArray();
for (QualifierValue qv : column.getQualifierValueList()) {
byte[] qualifier = qv.getQualifier().toByteArray();
if (!qv.hasValue()) {
throw new DoNotRetryIOException("Missing required field: qualifier value");
}
get.addColumn(family, qualifier);
}
}
}
if (proto.hasTimeRange()) {
TimeRange timeRange = protoToTimeRange(proto.getTimeRange());
get.setTimeRange(timeRange.getMin(), timeRange.getMax());
}
for (NameBytesPair attribute : proto.getAttributeList()) {
get.setAttribute(attribute.getName(), attribute.getValue().toByteArray());
}
return get;
}
/** /**
* Convert a client Scan to a protocol buffer Scan * Convert a client Scan to a protocol buffer Scan
* *

View File

@ -2617,6 +2617,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override @Override
public RegionScanner getScanner(Scan scan, List<KeyValueScanner> additionalScanners) public RegionScanner getScanner(Scan scan, List<KeyValueScanner> additionalScanners)
throws IOException { throws IOException {
return getScanner(scan, additionalScanners, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
private RegionScanner getScanner(Scan scan, List<KeyValueScanner> additionalScanners,
long nonceGroup, long nonce) throws IOException {
startRegionOperation(Operation.SCAN); startRegionOperation(Operation.SCAN);
try { try {
// Verify families are all valid // Verify families are all valid
@ -2630,7 +2635,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
checkFamily(family); checkFamily(family);
} }
} }
return instantiateRegionScanner(scan, additionalScanners); return instantiateRegionScanner(scan, additionalScanners, nonceGroup, nonce);
} finally { } finally {
closeRegionOperation(Operation.SCAN); closeRegionOperation(Operation.SCAN);
} }
@ -2638,13 +2643,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
protected RegionScanner instantiateRegionScanner(Scan scan, protected RegionScanner instantiateRegionScanner(Scan scan,
List<KeyValueScanner> additionalScanners) throws IOException { List<KeyValueScanner> additionalScanners) throws IOException {
return instantiateRegionScanner(scan, additionalScanners, HConstants.NO_NONCE,
HConstants.NO_NONCE);
}
protected RegionScanner instantiateRegionScanner(Scan scan,
List<KeyValueScanner> additionalScanners, long nonceGroup, long nonce) throws IOException {
if (scan.isReversed()) { if (scan.isReversed()) {
if (scan.getFilter() != null) { if (scan.getFilter() != null) {
scan.getFilter().setReversed(true); scan.getFilter().setReversed(true);
} }
return new ReversedRegionScannerImpl(scan, additionalScanners, this); return new ReversedRegionScannerImpl(scan, additionalScanners, this);
} }
return new RegionScannerImpl(scan, additionalScanners, this); return new RegionScannerImpl(scan, additionalScanners, this, nonceGroup, nonce);
} }
@Override @Override
@ -5592,6 +5603,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region) RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
throws IOException { throws IOException {
this(scan, additionalScanners, region, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region,
long nonceGroup, long nonce) throws IOException {
this.region = region; this.region = region;
this.maxResultSize = scan.getMaxResultSize(); this.maxResultSize = scan.getMaxResultSize();
if (scan.hasFilter()) { if (scan.hasFilter()) {
@ -5621,15 +5637,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// getSmallestReadPoint, before scannerReadPoints is updated. // getSmallestReadPoint, before scannerReadPoints is updated.
IsolationLevel isolationLevel = scan.getIsolationLevel(); IsolationLevel isolationLevel = scan.getIsolationLevel();
synchronized(scannerReadPoints) { synchronized(scannerReadPoints) {
this.readPt = getReadPoint(isolationLevel); if (nonce == HConstants.NO_NONCE || rsServices == null
|| rsServices.getNonceManager() == null) {
this.readPt = getReadPoint(isolationLevel);
} else {
this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce);
}
scannerReadPoints.put(this, this.readPt); scannerReadPoints.put(this, this.readPt);
} }
initializeScanners(scan, additionalScanners);
}
protected void initializeScanners(Scan scan, List<KeyValueScanner> additionalScanners)
throws IOException {
// Here we separate all scanners into two lists - scanner that provide data required // Here we separate all scanners into two lists - scanner that provide data required
// by the filter to operate (scanners list) and all others (joinedScanners list). // by the filter to operate (scanners list) and all others (joinedScanners list).
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(scan.getFamilyMap().size()); List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(scan.getFamilyMap().size());
List<KeyValueScanner> joinedScanners List<KeyValueScanner> joinedScanners =
= new ArrayList<KeyValueScanner>(scan.getFamilyMap().size()); new ArrayList<KeyValueScanner>(scan.getFamilyMap().size());
// Store all already instantiated scanners for exception handling // Store all already instantiated scanners for exception handling
List<KeyValueScanner> instantiatedScanners = new ArrayList<KeyValueScanner>(); List<KeyValueScanner> instantiatedScanners = new ArrayList<KeyValueScanner>();
// handle additionalScanners // handle additionalScanners
@ -6795,15 +6821,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale); return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
} }
void prepareGet(final Get get) throws IOException, NoSuchColumnFamilyException { void prepareGet(final Get get) throws IOException, NoSuchColumnFamilyException {
checkRow(get.getRow(), "Get"); checkRow(get.getRow(), "Get");
// Verify families are all valid // Verify families are all valid
if (get.hasFamilies()) { if (get.hasFamilies()) {
for (byte [] family: get.familySet()) { for (byte[] family : get.familySet()) {
checkFamily(family); checkFamily(family);
} }
} else { // Adding all families to scanner } else { // Adding all families to scanner
for (byte[] family: this.htableDescriptor.getFamiliesKeys()) { for (byte[] family : this.htableDescriptor.getFamiliesKeys()) {
get.addFamily(family); get.addFamily(family);
} }
} }
@ -6811,7 +6837,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override @Override
public List<Cell> get(Get get, boolean withCoprocessor) throws IOException { public List<Cell> get(Get get, boolean withCoprocessor) throws IOException {
return get(get, withCoprocessor, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
@Override
public List<Cell> get(Get get, boolean withCoprocessor, long nonceGroup, long nonce)
throws IOException {
List<Cell> results = new ArrayList<Cell>(); List<Cell> results = new ArrayList<Cell>();
// pre-get CP hook // pre-get CP hook
@ -6825,7 +6856,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
RegionScanner scanner = null; RegionScanner scanner = null;
try { try {
scanner = getScanner(scan); scanner = getScanner(scan, null, nonceGroup, nonce);
scanner.next(results); scanner.next(results);
} finally { } finally {
if (scanner != null) if (scanner != null)
@ -7168,6 +7199,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
applyToMemstore(e.getKey(), e.getValue(), true, false, sequenceId); applyToMemstore(e.getKey(), e.getValue(), true, false, sequenceId);
} }
mvcc.completeAndWait(writeEntry); mvcc.completeAndWait(writeEntry);
if (rsServices != null && rsServices.getNonceManager() != null) {
rsServices.getNonceManager().addMvccToOperationContext(nonceGroup, nonce,
writeEntry.getWriteNumber());
}
writeEntry = null; writeEntry = null;
} finally { } finally {
this.updatesLock.readLock().unlock(); this.updatesLock.readLock().unlock();

View File

@ -73,7 +73,6 @@ import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.MergeRegionException; import org.apache.hadoop.hbase.exceptions.MergeRegionException;
import org.apache.hadoop.hbase.exceptions.OperationConflictException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
@ -426,11 +425,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* Starts the nonce operation for a mutation, if needed. * Starts the nonce operation for a mutation, if needed.
* @param mutation Mutation. * @param mutation Mutation.
* @param nonceGroup Nonce group from the request. * @param nonceGroup Nonce group from the request.
* @returns Nonce used (can be NO_NONCE). * @returns whether to proceed this mutation.
*/ */
private long startNonceOperation(final MutationProto mutation, long nonceGroup) private boolean startNonceOperation(final MutationProto mutation, long nonceGroup)
throws IOException, OperationConflictException { throws IOException {
if (regionServer.nonceManager == null || !mutation.hasNonce()) return HConstants.NO_NONCE; if (regionServer.nonceManager == null || !mutation.hasNonce()) return true;
boolean canProceed = false; boolean canProceed = false;
try { try {
canProceed = regionServer.nonceManager.startOperation( canProceed = regionServer.nonceManager.startOperation(
@ -438,14 +437,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
throw new InterruptedIOException("Nonce start operation interrupted"); throw new InterruptedIOException("Nonce start operation interrupted");
} }
if (!canProceed) { return canProceed;
// TODO: instead, we could convert append/increment to get w/mvcc
String message = "The operation with nonce {" + nonceGroup + ", " + mutation.getNonce()
+ "} on row [" + Bytes.toString(mutation.getRow().toByteArray())
+ "] may have already completed";
throw new OperationConflictException(message);
}
return mutation.getNonce();
} }
/** /**
@ -614,23 +606,33 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* bypassed as indicated by RegionObserver, null otherwise * bypassed as indicated by RegionObserver, null otherwise
* @throws IOException * @throws IOException
*/ */
private Result append(final Region region, final OperationQuota quota, final MutationProto m, private Result append(final Region region, final OperationQuota quota,
final CellScanner cellScanner, long nonceGroup) throws IOException { final MutationProto mutation, final CellScanner cellScanner, long nonceGroup)
throws IOException {
long before = EnvironmentEdgeManager.currentTime(); long before = EnvironmentEdgeManager.currentTime();
Append append = ProtobufUtil.toAppend(m, cellScanner); Append append = ProtobufUtil.toAppend(mutation, cellScanner);
quota.addMutation(append); quota.addMutation(append);
Result r = null; Result r = null;
if (region.getCoprocessorHost() != null) { if (region.getCoprocessorHost() != null) {
r = region.getCoprocessorHost().preAppend(append); r = region.getCoprocessorHost().preAppend(append);
} }
if (r == null) { if (r == null) {
long nonce = startNonceOperation(m, nonceGroup); boolean canProceed = startNonceOperation(mutation, nonceGroup);
boolean success = false; boolean success = false;
try { try {
r = region.append(append, nonceGroup, nonce); if (canProceed) {
r = region.append(append, nonceGroup, mutation.getNonce());
} else {
// convert duplicate append to get
List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cellScanner), false,
nonceGroup, mutation.getNonce());
r = Result.create(results);
}
success = true; success = true;
} finally { } finally {
endNonceOperation(m, nonceGroup, success); if (canProceed) {
endNonceOperation(mutation, nonceGroup, success);
}
} }
if (region.getCoprocessorHost() != null) { if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postAppend(append, r); region.getCoprocessorHost().postAppend(append, r);
@ -662,13 +664,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
r = region.getCoprocessorHost().preIncrement(increment); r = region.getCoprocessorHost().preIncrement(increment);
} }
if (r == null) { if (r == null) {
long nonce = startNonceOperation(mutation, nonceGroup); boolean canProceed = startNonceOperation(mutation, nonceGroup);
boolean success = false; boolean success = false;
try { try {
r = region.increment(increment, nonceGroup, nonce); if (canProceed) {
r = region.increment(increment, nonceGroup, mutation.getNonce());
} else {
// convert duplicate increment to get
List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cells), false, nonceGroup,
mutation.getNonce());
r = Result.create(results);
}
success = true; success = true;
} finally { } finally {
endNonceOperation(mutation, nonceGroup, success); if (canProceed) {
endNonceOperation(mutation, nonceGroup, success);
}
} }
if (region.getCoprocessorHost() != null) { if (region.getCoprocessorHost() != null) {
r = region.getCoprocessorHost().postIncrement(increment, r); r = region.getCoprocessorHost().postIncrement(increment, r);

View File

@ -394,6 +394,17 @@ public interface Region extends ConfigurationObserver {
*/ */
List<Cell> get(Get get, boolean withCoprocessor) throws IOException; List<Cell> get(Get get, boolean withCoprocessor) throws IOException;
/**
* Do a get for duplicate non-idempotent operation.
* @param get query parameters.
* @param withCoprocessor
* @param nonceGroup Nonce group.
* @param nonce Nonce.
* @return list of cells resulting from the operation
* @throws IOException
*/
List<Cell> get(Get get, boolean withCoprocessor, long nonceGroup, long nonce) throws IOException;
/** /**
* Return an iterator that scans over the HRegion, returning the indicated * Return an iterator that scans over the HRegion, returning the indicated
* columns and rows specified by the {@link Scan}. * columns and rows specified by the {@link Scan}.

View File

@ -62,6 +62,8 @@ public class ServerNonceManager {
private static final long WAITING_BIT = 4; private static final long WAITING_BIT = 4;
private static final long ALL_FLAG_BITS = WAITING_BIT | STATE_BITS; private static final long ALL_FLAG_BITS = WAITING_BIT | STATE_BITS;
private static long mvcc;
@Override @Override
public String toString() { public String toString() {
return "[state " + getState() + ", hasWait " + hasWait() + ", activity " return "[state " + getState() + ", hasWait " + hasWait() + ", activity "
@ -98,6 +100,14 @@ public class ServerNonceManager {
return getActivityTime() < (minRelevantTime & (~0l >>> 3)); return getActivityTime() < (minRelevantTime & (~0l >>> 3));
} }
public void setMvcc(long mvcc) {
this.mvcc = mvcc;
}
public long getMvcc() {
return this.mvcc;
}
private long getActivityTime() { private long getActivityTime() {
return this.data >>> 3; return this.data >>> 3;
} }
@ -191,6 +201,39 @@ public class ServerNonceManager {
} }
} }
/**
* Store the write point in OperationContext when the operation succeed.
* @param group Nonce group.
* @param nonce Nonce.
* @param mvcc Write point of the succeed operation.
*/
public void addMvccToOperationContext(long group, long nonce, long mvcc) {
if (nonce == HConstants.NO_NONCE) {
return;
}
NonceKey nk = new NonceKey(group, nonce);
OperationContext result = nonces.get(nk);
assert result != null;
synchronized (result) {
result.setMvcc(mvcc);
}
}
/**
* Return the write point of the previous succeed operation.
* @param group Nonce group.
* @param nonce Nonce.
* @return write point of the previous succeed operation.
*/
public long getMvccFromOperationContext(long group, long nonce) {
if (nonce == HConstants.NO_NONCE) {
return Long.MAX_VALUE;
}
NonceKey nk = new NonceKey(group, nonce);
OperationContext result = nonces.get(nk);
return result == null ? Long.MAX_VALUE : result.getMvcc();
}
/** /**
* Reports the operation from WAL during replay. * Reports the operation from WAL during replay.
* @param group Nonce group. * @param group Nonce group.

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.RegionLocations;
@ -28,6 +29,10 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -150,4 +155,43 @@ public class HConnectionTestingUtility {
Mockito.spy(new ConnectionImplementation(conf, null, null)); Mockito.spy(new ConnectionImplementation(conf, null, null));
return connection; return connection;
} }
/**
* This coproceesor sleep 2s at first increment/append rpc call.
*/
public static class SleepAtFirstRpcCall extends BaseRegionObserver {
static final AtomicLong ct = new AtomicLong(0);
static final String SLEEP_TIME_CONF_KEY =
"hbase.coprocessor.SleepAtFirstRpcCall.sleepTime";
static final long DEFAULT_SLEEP_TIME = 2000;
static final AtomicLong sleepTime = new AtomicLong(DEFAULT_SLEEP_TIME);
public SleepAtFirstRpcCall() {
}
@Override
public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
RegionCoprocessorEnvironment env = c.getEnvironment();
Configuration conf = env.getConfiguration();
sleepTime.set(conf.getLong(SLEEP_TIME_CONF_KEY, DEFAULT_SLEEP_TIME));
}
@Override
public Result postIncrement(final ObserverContext<RegionCoprocessorEnvironment> e,
final Increment increment, final Result result) throws IOException {
if (ct.incrementAndGet() == 1) {
Threads.sleep(sleepTime.get());
}
return result;
}
@Override
public Result postAppend(final ObserverContext<RegionCoprocessorEnvironment> e,
final Append append, final Result result) throws IOException {
if (ct.incrementAndGet() == 1) {
Threads.sleep(sleepTime.get());
}
return result;
}
}
} }

View File

@ -31,6 +31,7 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -168,6 +169,50 @@ public class TestFromClientSide {
// Nothing to do. // Nothing to do.
} }
/**
* Test append result when there are duplicate rpc request.
*/
@Test
public void testDuplicateAppend() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testDuplicateAppend");
Map<String, String> kvs = new HashMap<String, String>();
kvs.put(HConnectionTestingUtility.SleepAtFirstRpcCall.SLEEP_TIME_CONF_KEY, "2000");
hdt.addCoprocessor(HConnectionTestingUtility.SleepAtFirstRpcCall.class.getName(), null, 1, kvs);
TEST_UTIL.createTable(hdt, new byte[][] { ROW }).close();
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
c.setInt(HConstants.HBASE_CLIENT_PAUSE, 50);
// Client will retry beacuse rpc timeout is small than the sleep time of first rpc call
c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1500);
Connection connection = ConnectionFactory.createConnection(c);
Table t = connection.getTable(TableName.valueOf("HCM-testDuplicateAppend"));
if (t instanceof HTable) {
HTable table = (HTable) t;
table.setOperationTimeout(3 * 1000);
try {
Append append = new Append(ROW);
append.add(TEST_UTIL.fam1, QUALIFIER, VALUE);
Result result = table.append(append);
// Verify expected result
Cell[] cells = result.rawCells();
assertEquals(1, cells.length);
assertKey(cells[0], ROW, TEST_UTIL.fam1, QUALIFIER, VALUE);
// Verify expected result again
Result readResult = table.get(new Get(ROW));
cells = readResult.rawCells();
assertEquals(1, cells.length);
assertKey(cells[0], ROW, TEST_UTIL.fam1, QUALIFIER, VALUE);
} finally {
table.close();
connection.close();
}
}
}
/** /**
* Basic client side validation of HBASE-4536 * Basic client side validation of HBASE-4536
*/ */

View File

@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -29,6 +31,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
@ -56,6 +60,7 @@ public class TestIncrementsFromClientSide {
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static byte [] ROW = Bytes.toBytes("testRow"); private static byte [] ROW = Bytes.toBytes("testRow");
private static byte [] FAMILY = Bytes.toBytes("testFamily"); private static byte [] FAMILY = Bytes.toBytes("testFamily");
private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
// This test depends on there being only one slave running at at a time. See the @Before // This test depends on there being only one slave running at at a time. See the @Before
// method where we do rolling restart. // method where we do rolling restart.
protected static int SLAVES = 1; protected static int SLAVES = 1;
@ -79,6 +84,49 @@ public class TestIncrementsFromClientSide {
TEST_UTIL.shutdownMiniCluster(); TEST_UTIL.shutdownMiniCluster();
} }
/**
* Test increment result when there are duplicate rpc request.
*/
@Test
public void testDuplicateIncrement() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testDuplicateIncrement");
Map<String, String> kvs = new HashMap<String, String>();
kvs.put(HConnectionTestingUtility.SleepAtFirstRpcCall.SLEEP_TIME_CONF_KEY, "2000");
hdt.addCoprocessor(HConnectionTestingUtility.SleepAtFirstRpcCall.class.getName(), null, 1, kvs);
TEST_UTIL.createTable(hdt, new byte[][] { ROW }).close();
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
c.setInt(HConstants.HBASE_CLIENT_PAUSE, 50);
// Client will retry beacuse rpc timeout is small than the sleep time of first rpc call
c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1500);
Connection connection = ConnectionFactory.createConnection(c);
Table t = connection.getTable(TableName.valueOf("HCM-testDuplicateIncrement"));
if (t instanceof HTable) {
HTable table = (HTable) t;
table.setOperationTimeout(3 * 1000);
try {
Increment inc = new Increment(ROW);
inc.addColumn(TEST_UTIL.fam1, QUALIFIER, 1);
Result result = table.increment(inc);
Cell [] cells = result.rawCells();
assertEquals(1, cells.length);
assertIncrementKey(cells[0], ROW, TEST_UTIL.fam1, QUALIFIER, 1);
// Verify expected result
Result readResult = table.get(new Get(ROW));
cells = readResult.rawCells();
assertEquals(1, cells.length);
assertIncrementKey(cells[0], ROW, TEST_UTIL.fam1, QUALIFIER, 1);
} finally {
table.close();
connection.close();
}
}
}
@Test @Test
public void testIncrementWithDeletes() throws Exception { public void testIncrementWithDeletes() throws Exception {
LOG.info("Starting " + this.name.getMethodName()); LOG.info("Starting " + this.name.getMethodName());

View File

@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.codec.KeyValueCodec; import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.exceptions.OperationConflictException;
import org.apache.hadoop.hbase.testclassification.FlakeyTests; import org.apache.hadoop.hbase.testclassification.FlakeyTests;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -524,16 +523,16 @@ public class TestMultiParallel {
Increment inc = new Increment(ONE_ROW); Increment inc = new Increment(ONE_ROW);
inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L); inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L);
table.increment(inc); table.increment(inc);
// duplicate increment
inc = new Increment(ONE_ROW); inc = new Increment(ONE_ROW);
inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L); inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L);
try { Result result = table.increment(inc);
table.increment(inc); validateResult(result, QUALIFIER, Bytes.toBytes(1L));
fail("Should have thrown an exception");
} catch (OperationConflictException ex) {
}
Get get = new Get(ONE_ROW); Get get = new Get(ONE_ROW);
get.addColumn(BYTES_FAMILY, QUALIFIER); get.addColumn(BYTES_FAMILY, QUALIFIER);
Result result = table.get(get); result = table.get(get);
validateResult(result, QUALIFIER, Bytes.toBytes(1L)); validateResult(result, QUALIFIER, Bytes.toBytes(1L));
// Now run a bunch of requests in parallel, exactly half should succeed. // Now run a bunch of requests in parallel, exactly half should succeed.
@ -561,7 +560,6 @@ public class TestMultiParallel {
} }
try { try {
table.increment(inc); table.increment(inc);
} catch (OperationConflictException ex) { // Some threads are expected to fail.
} catch (IOException ioEx) { } catch (IOException ioEx) {
fail("Not expected"); fail("Not expected");
} }

View File

@ -489,7 +489,7 @@ public class TestScannerHeartbeatMessages {
// Instantiate the custom heartbeat region scanners // Instantiate the custom heartbeat region scanners
@Override @Override
protected RegionScanner instantiateRegionScanner(Scan scan, protected RegionScanner instantiateRegionScanner(Scan scan,
List<KeyValueScanner> additionalScanners) throws IOException { List<KeyValueScanner> additionalScanners, long nonceGroup, long nonce) throws IOException {
if (scan.isReversed()) { if (scan.isReversed()) {
if (scan.getFilter() != null) { if (scan.getFilter() != null) {
scan.getFilter().setReversed(true); scan.getFilter().setReversed(true);