HBASE-2286 [Transactional Contrib] Correctly handle or avoid cases where
writes occur in same millisecond (Clint Morgan via J-D) git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@931727 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8b9247cdca
commit
0b788acf5e
|
@ -486,6 +486,8 @@ Release 0.21.0 - Unreleased
|
||||||
HBASE-2252 Mapping a very big table kills region servers
|
HBASE-2252 Mapping a very big table kills region servers
|
||||||
HBASE-2412 [stargate] PerformanceEvaluation
|
HBASE-2412 [stargate] PerformanceEvaluation
|
||||||
HBASE-2419 Remove from RS logs the fat NotServingRegionException stack
|
HBASE-2419 Remove from RS logs the fat NotServingRegionException stack
|
||||||
|
HBASE-2286 [Transactional Contrib] Correctly handle or avoid cases where
|
||||||
|
writes occur in same millisecond (Clint Morgan via J-D)
|
||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
HBASE-1961 HBase EC2 scripts
|
HBASE-1961 HBase EC2 scripts
|
||||||
|
|
|
@ -181,6 +181,9 @@ public class IndexedTable extends TransactionalTable {
|
||||||
Result row = indexResult[i];
|
Result row = indexResult[i];
|
||||||
|
|
||||||
byte[] baseRow = row.getValue(INDEX_COL_FAMILY_NAME, INDEX_BASE_ROW);
|
byte[] baseRow = row.getValue(INDEX_COL_FAMILY_NAME, INDEX_BASE_ROW);
|
||||||
|
if (baseRow == null) {
|
||||||
|
throw new IllegalStateException("Missing base row for indexed row: ["+Bytes.toString(row.getRow())+"]");
|
||||||
|
}
|
||||||
LOG.debug("next index row [" + Bytes.toString(row.getRow())
|
LOG.debug("next index row [" + Bytes.toString(row.getRow())
|
||||||
+ "] -> base row [" + Bytes.toString(baseRow) + "]");
|
+ "] -> base row [" + Bytes.toString(baseRow) + "]");
|
||||||
Result baseResult = null;
|
Result baseResult = null;
|
||||||
|
@ -209,7 +212,10 @@ public class IndexedTable extends TransactionalTable {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (baseResult != null) {
|
if (baseResult != null) {
|
||||||
results.addAll(baseResult.list());
|
List<KeyValue> list = baseResult.list();
|
||||||
|
if (list != null) {
|
||||||
|
results.addAll(list);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
result[i] = new Result(results);
|
result[i] = new Result(results);
|
||||||
|
|
|
@ -147,11 +147,30 @@ class IndexedRegion extends TransactionalRegion {
|
||||||
SortedMap<byte[], byte[]> oldColumnValues = convertToValueMap(oldResult);
|
SortedMap<byte[], byte[]> oldColumnValues = convertToValueMap(oldResult);
|
||||||
|
|
||||||
for (IndexSpecification indexSpec : indexesToUpdate) {
|
for (IndexSpecification indexSpec : indexesToUpdate) {
|
||||||
removeOldIndexEntry(indexSpec, put.getRow(), oldColumnValues);
|
updateIndex(indexSpec, put, newColumnValues, oldColumnValues);
|
||||||
updateIndex(indexSpec, put.getRow(), newColumnValues);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FIXME: This call takes place in an RPC, and requires an RPC. This makes for
|
||||||
|
// a likely deadlock if the number of RPCs we are trying to serve is >= the
|
||||||
|
// number of handler threads.
|
||||||
|
private void updateIndex(IndexSpecification indexSpec, Put put,
|
||||||
|
NavigableMap<byte[], byte[]> newColumnValues,
|
||||||
|
SortedMap<byte[], byte[]> oldColumnValues) throws IOException {
|
||||||
|
Delete indexDelete = makeDeleteToRemoveOldIndexEntry(indexSpec, put.getRow(), oldColumnValues);
|
||||||
|
Put indexPut = makeIndexUpdate(indexSpec, put.getRow(), newColumnValues);
|
||||||
|
|
||||||
|
HTable indexTable = getIndexTable(indexSpec);
|
||||||
|
if (indexDelete != null && !Bytes.equals(indexDelete.getRow(), indexPut.getRow())) {
|
||||||
|
// Only do the delete if the row changed. This way we save the put after delete issues in HBASE-2256
|
||||||
|
LOG.debug("Deleting old index row ["+Bytes.toString(indexDelete.getRow())+"]. New row is ["+Bytes.toString(indexPut.getRow())+"].");
|
||||||
|
indexTable.delete(indexDelete);
|
||||||
|
} else if (indexDelete != null){
|
||||||
|
LOG.debug("Skipping deleting index row ["+Bytes.toString(indexDelete.getRow())+"] because it has not changed.");
|
||||||
|
}
|
||||||
|
indexTable.put(indexPut);
|
||||||
|
}
|
||||||
|
|
||||||
/** Return the columns needed for the update. */
|
/** Return the columns needed for the update. */
|
||||||
private NavigableSet<byte[]> getColumnsForIndexes(Collection<IndexSpecification> indexes) {
|
private NavigableSet<byte[]> getColumnsForIndexes(Collection<IndexSpecification> indexes) {
|
||||||
NavigableSet<byte[]> neededColumns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
|
NavigableSet<byte[]> neededColumns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
|
||||||
|
@ -163,7 +182,7 @@ class IndexedRegion extends TransactionalRegion {
|
||||||
return neededColumns;
|
return neededColumns;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void removeOldIndexEntry(IndexSpecification indexSpec, byte[] row,
|
private Delete makeDeleteToRemoveOldIndexEntry(IndexSpecification indexSpec, byte[] row,
|
||||||
SortedMap<byte[], byte[]> oldColumnValues) throws IOException {
|
SortedMap<byte[], byte[]> oldColumnValues) throws IOException {
|
||||||
for (byte[] indexedCol : indexSpec.getIndexedColumns()) {
|
for (byte[] indexedCol : indexSpec.getIndexedColumns()) {
|
||||||
if (!oldColumnValues.containsKey(indexedCol)) {
|
if (!oldColumnValues.containsKey(indexedCol)) {
|
||||||
|
@ -171,7 +190,7 @@ class IndexedRegion extends TransactionalRegion {
|
||||||
+ "] not trying to remove old entry for row ["
|
+ "] not trying to remove old entry for row ["
|
||||||
+ Bytes.toString(row) + "] because col ["
|
+ Bytes.toString(row) + "] because col ["
|
||||||
+ Bytes.toString(indexedCol) + "] is missing");
|
+ Bytes.toString(indexedCol) + "] is missing");
|
||||||
return;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -179,7 +198,7 @@ class IndexedRegion extends TransactionalRegion {
|
||||||
oldColumnValues);
|
oldColumnValues);
|
||||||
LOG.debug("Index [" + indexSpec.getIndexId() + "] removing old entry ["
|
LOG.debug("Index [" + indexSpec.getIndexId() + "] removing old entry ["
|
||||||
+ Bytes.toString(oldIndexRow) + "]");
|
+ Bytes.toString(oldIndexRow) + "]");
|
||||||
getIndexTable(indexSpec).delete(new Delete(oldIndexRow));
|
return new Delete(oldIndexRow);
|
||||||
}
|
}
|
||||||
|
|
||||||
private NavigableMap<byte[], byte[]> getColumnsFromPut(Put put) {
|
private NavigableMap<byte[], byte[]> getColumnsFromPut(Put put) {
|
||||||
|
@ -212,23 +231,21 @@ class IndexedRegion extends TransactionalRegion {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// FIXME: This call takes place in an RPC, and requires an RPC. This makes for
|
private Put makeIndexUpdate(IndexSpecification indexSpec, byte[] row,
|
||||||
// a likely deadlock if the number of RPCs we are trying to serve is >= the
|
|
||||||
// number of handler threads.
|
|
||||||
private void updateIndex(IndexSpecification indexSpec, byte[] row,
|
|
||||||
SortedMap<byte[], byte[]> columnValues) throws IOException {
|
SortedMap<byte[], byte[]> columnValues) throws IOException {
|
||||||
Put indexUpdate = IndexMaintenanceUtils.createIndexUpdate(indexSpec, row, columnValues);
|
Put indexUpdate = IndexMaintenanceUtils.createIndexUpdate(indexSpec, row, columnValues);
|
||||||
getIndexTable(indexSpec).put(indexUpdate);
|
|
||||||
LOG.debug("Index [" + indexSpec.getIndexId() + "] adding new entry ["
|
LOG.debug("Index [" + indexSpec.getIndexId() + "] adding new entry ["
|
||||||
+ Bytes.toString(indexUpdate.getRow()) + "] for row ["
|
+ Bytes.toString(indexUpdate.getRow()) + "] for row ["
|
||||||
+ Bytes.toString(row) + "]");
|
+ Bytes.toString(row) + "]");
|
||||||
|
return indexUpdate;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FIXME we can be smarter about this and avoid the base gets and index maintenance in many cases.
|
||||||
@Override
|
@Override
|
||||||
public void delete(Delete delete, final Integer lockid, boolean writeToWAL)
|
public void delete(Delete delete, final Integer lockid, boolean writeToWAL)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
// First remove the existing indexes.
|
// First look at the current (to be the old) state.
|
||||||
|
SortedMap<byte[], byte[]> oldColumnValues = null;
|
||||||
if (!getIndexes().isEmpty()) {
|
if (!getIndexes().isEmpty()) {
|
||||||
// Need all columns
|
// Need all columns
|
||||||
NavigableSet<byte[]> neededColumns = getColumnsForIndexes(getIndexes());
|
NavigableSet<byte[]> neededColumns = getColumnsForIndexes(getIndexes());
|
||||||
|
@ -244,12 +261,7 @@ class IndexedRegion extends TransactionalRegion {
|
||||||
}
|
}
|
||||||
|
|
||||||
Result oldRow = super.get(get, lockid);
|
Result oldRow = super.get(get, lockid);
|
||||||
SortedMap<byte[], byte[]> oldColumnValues = convertToValueMap(oldRow);
|
oldColumnValues = convertToValueMap(oldRow);
|
||||||
|
|
||||||
|
|
||||||
for (IndexSpecification indexSpec : getIndexes()) {
|
|
||||||
removeOldIndexEntry(indexSpec, delete.getRow(), oldColumnValues);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
super.delete(delete, lockid, writeToWAL);
|
super.delete(delete, lockid, writeToWAL);
|
||||||
|
@ -259,13 +271,57 @@ class IndexedRegion extends TransactionalRegion {
|
||||||
|
|
||||||
// Rebuild index if there is still a version visible.
|
// Rebuild index if there is still a version visible.
|
||||||
Result currentRow = super.get(get, lockid);
|
Result currentRow = super.get(get, lockid);
|
||||||
if (!currentRow.isEmpty()) {
|
SortedMap<byte[], byte[]> currentColumnValues = convertToValueMap(currentRow);
|
||||||
SortedMap<byte[], byte[]> currentColumnValues = convertToValueMap(currentRow);
|
for (IndexSpecification indexSpec : getIndexes()) {
|
||||||
for (IndexSpecification indexSpec : getIndexes()) {
|
Delete indexDelete = null;
|
||||||
if (IndexMaintenanceUtils.doesApplyToIndex(indexSpec, currentColumnValues)) {
|
if (IndexMaintenanceUtils.doesApplyToIndex(indexSpec, oldColumnValues)) {
|
||||||
updateIndex(indexSpec, delete.getRow(), currentColumnValues);
|
indexDelete = makeDeleteToRemoveOldIndexEntry(indexSpec, delete
|
||||||
|
.getRow(), oldColumnValues);
|
||||||
|
}
|
||||||
|
Put indexPut = null;
|
||||||
|
if (IndexMaintenanceUtils.doesApplyToIndex(indexSpec,
|
||||||
|
currentColumnValues)) {
|
||||||
|
indexPut = makeIndexUpdate(indexSpec, delete.getRow(),
|
||||||
|
currentColumnValues);
|
||||||
|
}
|
||||||
|
if (indexPut == null && indexDelete == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
HTable indexTable = getIndexTable(indexSpec);
|
||||||
|
if (indexDelete != null
|
||||||
|
&& (indexPut == null || !Bytes.equals(indexDelete.getRow(),
|
||||||
|
indexPut.getRow()))) {
|
||||||
|
// Only do the delete if the row changed. This way we save the put
|
||||||
|
// after delete issues in HBASE-2256
|
||||||
|
LOG.debug("Deleting old index row ["
|
||||||
|
+ Bytes.toString(indexDelete.getRow()) + "].");
|
||||||
|
indexTable.delete(indexDelete);
|
||||||
|
} else if (indexDelete != null) {
|
||||||
|
LOG.debug("Skipping deleting index row ["
|
||||||
|
+ Bytes.toString(indexDelete.getRow())
|
||||||
|
+ "] because it has not changed.");
|
||||||
|
|
||||||
|
for (byte [] indexCol : indexSpec.getAdditionalColumns()) {
|
||||||
|
byte[][] parsed = KeyValue.parseColumn(indexCol);
|
||||||
|
List<KeyValue> famDeletes = delete.getFamilyMap().get(parsed[0]);
|
||||||
|
if (famDeletes != null) {
|
||||||
|
for (KeyValue kv : famDeletes) {
|
||||||
|
if (Bytes.equals(parsed[0], kv.getFamily()) && Bytes.equals(parsed[1], kv.getQualifier())) {
|
||||||
|
LOG.debug("Need to delete this specific column: "+Bytes.toString(indexCol));
|
||||||
|
Delete columnDelete = new Delete(indexDelete.getRow());
|
||||||
|
columnDelete.deleteColumns(parsed[0],parsed[1]);
|
||||||
|
indexTable.delete(columnDelete);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (indexPut != null) {
|
||||||
|
getIndexTable(indexSpec).put(indexPut);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,7 +21,9 @@ package org.apache.hadoop.hbase.regionserver.transactional;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
@ -138,17 +140,23 @@ class TransactionState {
|
||||||
}
|
}
|
||||||
|
|
||||||
void addWrite(final Put write) {
|
void addWrite(final Put write) {
|
||||||
byte [] now = Bytes.toBytes(System.currentTimeMillis());
|
updateLatestTimestamp(write.getFamilyMap().values());
|
||||||
// HAVE to manually set the KV timestamps
|
|
||||||
for (List<KeyValue> kvs : write.getFamilyMap().values()) {
|
|
||||||
for (KeyValue kv : kvs) {
|
|
||||||
kv.updateLatestStamp(now);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
puts.add(write);
|
puts.add(write);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//FIXME REVIEW not sure about this. Needed for log recovery? but broke other tests.
|
||||||
|
private void updateLatestTimestamp(Collection<List<KeyValue>> kvsCollection) {
|
||||||
|
byte [] now = Bytes.toBytes(System.currentTimeMillis());
|
||||||
|
// HAVE to manually set the KV timestamps
|
||||||
|
for (List<KeyValue> kvs : kvsCollection) {
|
||||||
|
for (KeyValue kv : kvs) {
|
||||||
|
if (kv.isLatestTimestamp()) {
|
||||||
|
kv.updateLatestStamp(now);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
boolean hasWrite() {
|
boolean hasWrite() {
|
||||||
return puts.size() > 0 || deletes.size() > 0;
|
return puts.size() > 0 || deletes.size() > 0;
|
||||||
}
|
}
|
||||||
|
@ -371,13 +379,6 @@ class TransactionState {
|
||||||
return deletes;
|
return deletes;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Set deleteSet.
|
|
||||||
* @param deleteSet the deleteSet to set
|
|
||||||
*/
|
|
||||||
void setDeleteSet(List<Delete> deleteSet) {
|
|
||||||
this.deletes = deleteSet;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Get a scanner to go through the puts from this transaction. Used to weave together the local trx puts with the global state.
|
/** Get a scanner to go through the puts from this transaction. Used to weave together the local trx puts with the global state.
|
||||||
*
|
*
|
||||||
* @return scanner
|
* @return scanner
|
||||||
|
@ -393,20 +394,54 @@ class TransactionState {
|
||||||
*/
|
*/
|
||||||
private class PutScanner implements KeyValueScanner, InternalScanner {
|
private class PutScanner implements KeyValueScanner, InternalScanner {
|
||||||
|
|
||||||
private NavigableSet<KeyValue> kvSet;
|
private List<KeyValue> kvList;
|
||||||
private Iterator<KeyValue> iterator;
|
private Iterator<KeyValue> iterator;
|
||||||
private boolean didHasNext = false;
|
private boolean didHasNext = false;
|
||||||
private KeyValue next = null;
|
private KeyValue next = null;
|
||||||
|
|
||||||
|
|
||||||
PutScanner() {
|
PutScanner() {
|
||||||
kvSet = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
|
kvList = new ArrayList<KeyValue>();
|
||||||
for (Put put : puts) {
|
for (Put put : puts) {
|
||||||
for (List<KeyValue> putKVs : put.getFamilyMap().values()) {
|
for (List<KeyValue> putKVs : put.getFamilyMap().values()) {
|
||||||
kvSet.addAll(putKVs);
|
kvList.addAll(putKVs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
iterator = kvSet.iterator();
|
Collections.sort(kvList, new Comparator<KeyValue>() {
|
||||||
|
|
||||||
|
/** We want to honor the order of the puts in the case where multiple have the same timestamp.
|
||||||
|
*
|
||||||
|
* @param o1
|
||||||
|
* @param o2
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public int compare(KeyValue o1, KeyValue o2) {
|
||||||
|
int result = KeyValue.COMPARATOR.compare(o1, o2);
|
||||||
|
if (result != 0) {
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
if (o1 == o2) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
int put1Number = getPutNumber(o1);
|
||||||
|
int put2Number = getPutNumber(o2);
|
||||||
|
return put2Number - put1Number;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
iterator = kvList.iterator();
|
||||||
|
}
|
||||||
|
|
||||||
|
private int getPutNumber(KeyValue kv) {
|
||||||
|
for (int i=0; i < puts.size(); i++) {
|
||||||
|
for (List<KeyValue> putKVs : puts.get(i).getFamilyMap().values()) {
|
||||||
|
for (KeyValue putKV : putKVs)
|
||||||
|
if (putKV == kv) {
|
||||||
|
return i;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw new IllegalStateException("Can not fine put KV in puts");
|
||||||
}
|
}
|
||||||
|
|
||||||
public void close() {
|
public void close() {
|
||||||
|
@ -424,8 +459,18 @@ class TransactionState {
|
||||||
return next;
|
return next;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void iteratorFrom(KeyValue key) {
|
||||||
|
iterator = kvList.iterator();
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
KeyValue next = iterator.next();
|
||||||
|
if (KeyValue.COMPARATOR.compare(next, key) >= 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public boolean seek(KeyValue key) {
|
public boolean seek(KeyValue key) {
|
||||||
iterator = kvSet.headSet(key).iterator();
|
iteratorFrom(key);
|
||||||
|
|
||||||
getNext();
|
getNext();
|
||||||
return next != null;
|
return next != null;
|
||||||
|
|
|
@ -127,6 +127,25 @@ public class TestTransactions extends HBaseClusterTestCase {
|
||||||
Assert.assertEquals(newValue, Bytes.toInt(row1_A.value()));
|
Assert.assertEquals(newValue, Bytes.toInt(row1_A.value()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testGetAfterPutPut() throws IOException {
|
||||||
|
TransactionState transactionState = transactionManager.beginTransaction();
|
||||||
|
|
||||||
|
int originalValue = Bytes.toInt(table.get(transactionState,
|
||||||
|
new Get(ROW1).addColumn(FAMILY, QUAL_A)).value());
|
||||||
|
int newValue = originalValue + 1;
|
||||||
|
|
||||||
|
table.put(transactionState, new Put(ROW1).add(FAMILY, QUAL_A, Bytes
|
||||||
|
.toBytes(newValue)));
|
||||||
|
|
||||||
|
newValue = newValue + 1;
|
||||||
|
|
||||||
|
table.put(transactionState, new Put(ROW1).add(FAMILY, QUAL_A, Bytes
|
||||||
|
.toBytes(newValue)));
|
||||||
|
|
||||||
|
Result row1_A = table.get(transactionState, new Get(ROW1).addColumn(FAMILY, QUAL_A));
|
||||||
|
Assert.assertEquals(newValue, Bytes.toInt(row1_A.value()));
|
||||||
|
}
|
||||||
|
|
||||||
public void testScanAfterUpdatePut() throws IOException {
|
public void testScanAfterUpdatePut() throws IOException {
|
||||||
TransactionState transactionState = transactionManager.beginTransaction();
|
TransactionState transactionState = transactionManager.beginTransaction();
|
||||||
|
|
||||||
|
@ -176,11 +195,6 @@ public class TestTransactions extends HBaseClusterTestCase {
|
||||||
int row2Value = 199;
|
int row2Value = 199;
|
||||||
table.put(transactionState, new Put(ROW2).add(FAMILY, QUAL_A, Bytes
|
table.put(transactionState, new Put(ROW2).add(FAMILY, QUAL_A, Bytes
|
||||||
.toBytes(row2Value)));
|
.toBytes(row2Value)));
|
||||||
try {
|
|
||||||
Thread.sleep(500);
|
|
||||||
} catch (InterruptedException ex) {
|
|
||||||
// just ignore
|
|
||||||
}
|
|
||||||
|
|
||||||
row2Value = 299;
|
row2Value = 299;
|
||||||
table.put(transactionState, new Put(ROW2).add(FAMILY, QUAL_A, Bytes
|
table.put(transactionState, new Put(ROW2).add(FAMILY, QUAL_A, Bytes
|
||||||
|
@ -197,8 +211,17 @@ public class TestTransactions extends HBaseClusterTestCase {
|
||||||
Assert.assertNotNull(result);
|
Assert.assertNotNull(result);
|
||||||
Assert.assertEquals(Bytes.toString(ROW2), Bytes.toString(result.getRow()));
|
Assert.assertEquals(Bytes.toString(ROW2), Bytes.toString(result.getRow()));
|
||||||
Assert.assertEquals(row2Value, Bytes.toInt(result.value()));
|
Assert.assertEquals(row2Value, Bytes.toInt(result.value()));
|
||||||
|
|
||||||
|
// TODO commit and verifty that we see second put.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testPutPutScanOverAndOver() throws IOException {
|
||||||
|
// Do this test many times to try and hit two puts in the same millisecond
|
||||||
|
for (int i=0 ; i < 100; i++) {
|
||||||
|
testPutPutScan();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Read from ROW1,COL_A and put it in ROW2_COLA and ROW3_COLA
|
// Read from ROW1,COL_A and put it in ROW2_COLA and ROW3_COLA
|
||||||
private TransactionState makeTransaction1() throws IOException {
|
private TransactionState makeTransaction1() throws IOException {
|
||||||
TransactionState transactionState = transactionManager.beginTransaction();
|
TransactionState transactionState = transactionManager.beginTransaction();
|
||||||
|
|
Loading…
Reference in New Issue