HBASE-1670 transactions / indexing fixes: trx deletes not handled, index scan can't specify stopRow

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@797262 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2009-07-23 23:46:00 +00:00
parent 7ddaccdd61
commit 69f8d4e935
16 changed files with 320 additions and 121 deletions
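
For orientation before the per-file diffs, here is a minimal client-side sketch of the transactional-delete behaviour this change enables. It is not code from the patch: the configuration, table, row, and column names are made up, and the delete(TransactionState, Delete) overload on TransactionalTable is assumed to mirror the put(TransactionState, Put) overload exercised in the TestTransactions changes further down.

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.transactional.CommitUnsuccessfulException;
import org.apache.hadoop.hbase.client.transactional.TransactionManager;
import org.apache.hadoop.hbase.client.transactional.TransactionState;
import org.apache.hadoop.hbase.client.transactional.TransactionalTable;
import org.apache.hadoop.hbase.util.Bytes;

public class TrxDeleteSketch {
  public static void main(String[] args) throws IOException,
      CommitUnsuccessfulException {
    HBaseConfiguration conf = new HBaseConfiguration();
    TransactionManager txManager = new TransactionManager(conf);
    // Hypothetical table "mytable" with a column family "family".
    TransactionalTable table = new TransactionalTable(conf, Bytes.toBytes("mytable"));

    TransactionState tx = txManager.beginTransaction();
    // Buffer a put and a delete in the same transaction. With this change the
    // delete is carried in the transaction state, logged, and replayed at commit.
    table.put(tx, new Put(Bytes.toBytes("row1")).add(Bytes.toBytes("family"),
        Bytes.toBytes("qual"), Bytes.toBytes(1)));
    table.delete(tx, new Delete(Bytes.toBytes("row2"))); // assumed overload
    txManager.tryCommit(tx);
  }
}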

View File

@@ -282,6 +282,8 @@ Release 0.20.0 - Unreleased
               client package
    HBASE-1680 FilterList writable only works for HBaseObjectWritable
               defined types (Clint Morgan via Stack and Jon Gray)
+   HBASE-1670 transactions / indexing fixes: trx deletes not handled, index
+              scan can't specify stopRow (Clint Morgan via Stack)

  IMPROVEMENTS
    HBASE-1089 Add count of regions on filesystem to master UI; add percentage

View File

@@ -30,7 +30,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HStoreKey;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
@@ -39,14 +38,12 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.transactional.TransactionalTable;
 import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.io.Cell;
-import org.apache.hadoop.hbase.io.HbaseMapWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /** HTable extended with indexed support. */
 public class IndexedTable extends TransactionalTable {
 
-  // FIXME, these belong elsewhere
+  // TODO move these schema constants elsewhere
   public static final byte[] INDEX_COL_FAMILY_NAME = Bytes.toBytes("__INDEX__");
   public static final byte[] INDEX_COL_FAMILY = Bytes.add(
       INDEX_COL_FAMILY_NAME, new byte[] { HStoreKey.COLUMN_FAMILY_DELIMITER });
@@ -79,6 +76,7 @@ public class IndexedTable extends TransactionalTable {
    *
    * @param indexId the id of the index to use
    * @param indexStartRow (created from the IndexKeyGenerator)
+   * @param indexStopRow (created from the IndexKeyGenerator)
    * @param indexColumns in the index table
    * @param indexFilter filter to run on the index'ed table. This can only use
    *          columns that have been added to the index.
@@ -87,7 +85,7 @@ public class IndexedTable extends TransactionalTable {
    * @throws IOException
    * @throws IndexNotFoundException
    */
-  public ResultScanner getIndexedScanner(String indexId, final byte[] indexStartRow,
+  public ResultScanner getIndexedScanner(String indexId, final byte[] indexStartRow, final byte[] indexStopRow,
       byte[][] indexColumns, final Filter indexFilter,
       final byte[][] baseColumns) throws IOException, IndexNotFoundException {
     IndexSpecification indexSpec = this.indexedTableDescriptor.getIndex(indexId);
@@ -114,9 +112,15 @@ public class IndexedTable extends TransactionalTable {
       allIndexColumns[allColumns.length] = INDEX_BASE_ROW_COLUMN;
     }
 
-    Scan indexScan = new Scan(indexStartRow);
-    //indexScan.setFilter(filter); // FIXME
+    Scan indexScan = new Scan();
+    indexScan.setFilter(indexFilter);
     indexScan.addColumns(allIndexColumns);
+    if (indexStartRow != null) {
+      indexScan.setStartRow(indexStartRow);
+    }
+    if (indexStopRow != null) {
+      indexScan.setStopRow(indexStopRow);
+    }
     ResultScanner indexScanner = indexTable.getScanner(indexScan);
 
     return new ScannerWrapper(indexScanner, baseColumns);
@@ -173,8 +177,6 @@ public class IndexedTable extends TransactionalTable {
       byte[] baseRow = row.getValue(INDEX_BASE_ROW_COLUMN);
       LOG.debug("next index row [" + Bytes.toString(row.getRow())
           + "] -> base row [" + Bytes.toString(baseRow) + "]");
-      HbaseMapWritable<byte[], Cell> colValues =
-        new HbaseMapWritable<byte[], Cell>();
       Result baseResult = null;
      if (columns != null && columns.length > 0) {
        LOG.debug("Going to base table for remaining columns");

View File

@@ -28,11 +28,8 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Writable;
 
 public class IndexedTableDescriptor {

View File

@@ -65,7 +65,9 @@ public class JtaXAResource implements XAResource {
     } catch (CommitUnsuccessfulException e) {
       throw new XAException(XAException.XA_RBROLLBACK);
     } catch (IOException e) {
-      throw new XAException(XAException.XA_RBPROTO); // FIXME correct code?
+      XAException xae = new XAException(XAException.XAER_RMERR);
+      xae.initCause(e);
+      throw xae;
     } finally {
       threadLocalTransactionState.remove();
     }
@@ -85,7 +87,9 @@ public class JtaXAResource implements XAResource {
       try {
         transactionManager.abort(state);
       } catch (IOException e) {
-        throw new RuntimeException(e); // FIXME, should be an XAException?
+        XAException xae = new XAException(XAException.XAER_RMERR);
+        xae.initCause(e);
+        throw xae;
       }
     }
   }
@@ -108,9 +112,13 @@ public class JtaXAResource implements XAResource {
     try {
       status = this.transactionManager.prepareCommit(state);
     } catch (CommitUnsuccessfulException e) {
-      throw new XAException(XAException.XA_HEURRB); // FIXME correct code?
+      XAException xae = new XAException(XAException.XA_HEURRB);
+      xae.initCause(e);
+      throw xae;
     } catch (IOException e) {
-      throw new XAException(XAException.XA_RBPROTO); // FIXME correct code?
+      XAException xae = new XAException(XAException.XAER_RMERR);
+      xae.initCause(e);
+      throw xae;
     }
 
     switch (status) {
@@ -119,7 +127,7 @@ public class JtaXAResource implements XAResource {
     case TransactionalRegionInterface.COMMIT_OK_READ_ONLY:
       return XAResource.XA_RDONLY;
     default:
-      throw new XAException(XAException.XA_RBPROTO); // FIXME correct code?
+      throw new XAException(XAException.XA_RBPROTO);
     }
   }

View File

@@ -36,6 +36,10 @@ import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
  *
  */
 public class TransactionManager {
+  static {
+    TransactionalRPC.initialize();
+  }
+
   static final Log LOG = LogFactory.getLog(TransactionManager.class);
 
   private final HConnection connection;
@@ -123,7 +127,6 @@ public class TransactionManager {
       LOG.debug("Commit of transaction [" + transactionState.getTransactionId()
           + "] was unsucsessful", e);
       // This happens on a NSRE that is triggered by a split
-      // FIXME, but then abort fails
       try {
         abort(transactionState);
       } catch (Exception abortException) {
@@ -177,7 +180,6 @@ public class TransactionManager {
       LOG.debug("Commit of transaction [" + transactionState.getTransactionId()
           + "] was unsucsessful", e);
       // This happens on a NSRE that is triggered by a split
-      // FIXME, but then abort fails
       try {
         abort(transactionState);
       } catch (Exception abortException) {

View File

@@ -42,9 +42,8 @@ import org.apache.hadoop.hbase.util.Bytes;
  */
 public class TransactionalTable extends HTable {
 
-  private static final byte RPC_CODE = 100;
   static {
-    HBaseRPC.addToMap(TransactionalRegionInterface.class, RPC_CODE);
+    TransactionalRPC.initialize();
   }
 
   /**

View File

@@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.HStoreKey;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification;
 import org.apache.hadoop.hbase.client.tableindexed.IndexedTable;
-import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**

View File

@@ -31,7 +31,6 @@ import java.util.NavigableSet;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
-import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -49,7 +48,6 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification;
 import org.apache.hadoop.hbase.client.tableindexed.IndexedTableDescriptor;
-import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HLog;
 import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegion;
@@ -148,24 +146,6 @@ class IndexedRegion extends TransactionalRegion {
     }
   }
 
-  private void updateIndexes(Delete delete) {
-    // FIXME
-    // Handle delete batch updates. Go back and get the next older values
-    // for (BatchOperation op : batchUpdate) {
-    // if (!op.isPut()) {
-    // Cell current = oldColumnCells.get(op.getColumn());
-    // if (current != null) {
-    // // TODO: Fix this profligacy!!! St.Ack
-    // Cell [] older = Cell.createSingleCellArray(super.get(batchUpdate.getRow(),
-    // op.getColumn(), current.getTimestamp(), 1));
-    // if (older != null && older.length > 0) {
-    // newColumnValues.put(op.getColumn(), older[0].getValue());
-    // }
-    // }
-    // }
-    // }
-  }
-
   /** Return the columns needed for the update. */
   private NavigableSet<byte[]> getColumnsForIndexes(Collection<IndexSpecification> indexes) {
     NavigableSet<byte[]> neededColumns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);

View File

@@ -70,7 +70,7 @@ class THLog extends HLog {
   public void writeDeleteToLog(HRegionInfo regionInfo, final long transactionId, final Delete delete)
       throws IOException {
-    // FIXME
+    this.append(regionInfo, delete, transactionId);
   }
 
   public void writeCommitToLog(HRegionInfo regionInfo, final long transactionId) throws IOException {
@@ -112,7 +112,7 @@ class THLog extends HLog {
   public void append(HRegionInfo regionInfo, Put update, long transactionId)
       throws IOException {
-    long commitTime = System.currentTimeMillis(); // FIXME ?
+    long commitTime = System.currentTimeMillis();
 
     THLogKey key = new THLogKey(regionInfo.getRegionName(), regionInfo
         .getTableDesc().getName(), -1, commitTime, THLogKey.TrxOp.OP,
@@ -123,6 +123,30 @@ class THLog extends HLog {
     }
   }
 
+  /**
+   * Write a transactional delete to the log.
+   *
+   * @param regionInfo
+   * @param delete
+   * @param transactionId
+   * @throws IOException
+   */
+  public void append(HRegionInfo regionInfo, Delete delete, long transactionId)
+      throws IOException {
+    long commitTime = System.currentTimeMillis();
+
+    THLogKey key = new THLogKey(regionInfo.getRegionName(), regionInfo
+        .getTableDesc().getName(), -1, commitTime, THLogKey.TrxOp.OP,
+        transactionId);
+
+    for (KeyValue value : convertToKeyValues(delete)) {
+      super.append(regionInfo, key, value);
+    }
+  }
+
   private List<KeyValue> convertToKeyValues(Put update) {
     List<KeyValue> edits = new ArrayList<KeyValue>();
 
@@ -133,4 +157,15 @@ class THLog extends HLog {
     }
     return edits;
   }
+
+  private List<KeyValue> convertToKeyValues(Delete delete) {
+    List<KeyValue> edits = new ArrayList<KeyValue>();
+
+    for (List<KeyValue> kvs : delete.getFamilyMap().values()) {
+      for (KeyValue kv : kvs) {
+        edits.add(kv);
+      }
+    }
+    return edits;
+  }
 }

View File

@@ -19,7 +19,9 @@
  */
 package org.apache.hadoop.hbase.regionserver.transactional;
 
+import java.io.IOException;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.NavigableSet;
@@ -30,6 +32,7 @@ import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
@@ -37,10 +40,14 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
- * Holds the state of a transaction.
+ * Holds the state of a transaction. This includes a buffer of all writes, a
+ * record of all reads / scans, and information about which other transactions
+ * we need to check against.
  */
 class TransactionState {
 
@@ -70,8 +77,8 @@ class TransactionState {
     protected byte[] endRow;
 
     public ScanRange(byte[] startRow, byte[] endRow) {
-      this.startRow = startRow;
-      this.endRow = endRow;
+      this.startRow = startRow == HConstants.EMPTY_START_ROW ? null : startRow;
+      this.endRow = endRow == HConstants.EMPTY_END_ROW ? null : endRow;
     }
 
     /**
@@ -104,8 +111,9 @@ class TransactionState {
   private Status status;
   private SortedSet<byte[]> readSet = new TreeSet<byte[]>(
       Bytes.BYTES_COMPARATOR);
-  private List<Put> writeSet = new LinkedList<Put>();
-  private List<ScanRange> scanSet = new LinkedList<ScanRange>();
+  private List<Put> puts = new LinkedList<Put>();
+  private List<ScanRange> scans = new LinkedList<ScanRange>();
+  private List<Delete> deletes = new LinkedList<Delete>();
   private Set<TransactionState> transactionsToCheck = new HashSet<TransactionState>();
   private int startSequenceNumber;
   private Integer sequenceNumber;
@@ -128,15 +136,19 @@ class TransactionState {
   }
 
   void addWrite(final Put write) {
-    writeSet.add(write);
+    puts.add(write);
   }
 
-  List<Put> getWriteSet() {
-    return writeSet;
+  boolean hasWrite() {
+    return puts.size() > 0 || deletes.size() > 0;
+  }
+
+  List<Put> getPuts() {
+    return puts;
   }
 
   void addDelete(final Delete delete) {
-    throw new UnsupportedOperationException("NYI");
+    deletes.add(delete);
   }
 
   /**
@@ -149,9 +161,11 @@ class TransactionState {
    */
   Result localGet(Get get) {
 
+    // TODO take deletes into account as well
+
     List<KeyValue> localKVs = new LinkedList<KeyValue>();
 
-    for (Put put : writeSet) {
+    for (Put put : puts) {
       if (!Bytes.equals(get.getRow(), put.getRow())) {
         continue;
       }
@@ -203,7 +217,7 @@ class TransactionState {
       return false; // Cannot conflict with aborted transactions
     }
 
-    for (Put otherUpdate : checkAgainst.getWriteSet()) {
+    for (Put otherUpdate : checkAgainst.getPuts()) {
       if (this.getReadSet().contains(otherUpdate.getRow())) {
         LOG.debug("Transaction [" + this.toString()
             + "] has read which conflicts with [" + checkAgainst.toString()
@@ -211,7 +225,7 @@ class TransactionState {
             + Bytes.toString(otherUpdate.getRow()) + "]");
         return true;
       }
-      for (ScanRange scanRange : this.scanSet) {
+      for (ScanRange scanRange : this.scans) {
         if (scanRange.contains(otherUpdate.getRow())) {
           LOG.debug("Transaction [" + this.toString()
               + "] has scan which conflicts with [" + checkAgainst.toString()
@@ -289,9 +303,9 @@ class TransactionState {
     result.append(" read Size: ");
     result.append(readSet.size());
     result.append(" scan Size: ");
-    result.append(scanSet.size());
+    result.append(scans.size());
     result.append(" write Size: ");
-    result.append(writeSet.size());
+    result.append(puts.size());
     result.append(" startSQ: ");
     result.append(startSequenceNumber);
     if (sequenceNumber != null) {
@@ -328,7 +342,7 @@ class TransactionState {
         scanRange.startRow == null ? "null" : Bytes
             .toString(scanRange.startRow), scanRange.endRow == null ? "null"
             : Bytes.toString(scanRange.endRow)));
-    scanSet.add(scanRange);
+    scans.add(scanRange);
   }
 
   int getCommitPendingWaits() {
@@ -338,4 +352,106 @@ class TransactionState {
   void incrementCommitPendingWaits() {
     this.commitPendingWaits++;
   }
+
+  /** Get deleteSet.
+   * @return deleteSet
+   */
+  List<Delete> getDeleteSet() {
+    return deletes;
+  }
+
+  /** Set deleteSet.
+   * @param deleteSet the deleteSet to set
+   */
+  void setDeleteSet(List<Delete> deleteSet) {
+    this.deletes = deleteSet;
+  }
+
+  /** Get a scanner to go through the puts from this transaction. Used to weave
+   * together the local trx puts with the global state.
+   *
+   * @return scanner
+   */
+  KeyValueScanner getScanner() {
+    return new PutScanner();
+  }
+
+  /** Scanner of the puts that occur during this transaction.
+   *
+   * @author clint.morgan
+   */
+  private class PutScanner implements KeyValueScanner, InternalScanner {
+
+    private NavigableSet<KeyValue> kvSet;
+    private Iterator<KeyValue> iterator;
+    private boolean didHasNext = false;
+    private KeyValue next = null;
+
+    PutScanner() {
+      kvSet = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
+      for (Put put : puts) {
+        for (List<KeyValue> putKVs : put.getFamilyMap().values()) {
+          kvSet.addAll(putKVs);
+        }
+      }
+      iterator = kvSet.iterator();
+    }
+
+    public void close() {
+      // Nothing to close
+    }
+
+    public KeyValue next() {
+      getNext();
+      didHasNext = false;
+      return next;
+    }
+
+    public KeyValue peek() {
+      getNext();
+      return next;
+    }
+
+    public boolean seek(KeyValue key) {
+      iterator = kvSet.headSet(key).iterator();
+      getNext();
+      return next != null;
+    }
+
+    private KeyValue getNext() {
+      if (didHasNext) {
+        return next;
+      }
+      didHasNext = true;
+      if (iterator.hasNext()) {
+        next = iterator.next();
+      } else {
+        next = null;
+      }
+      return next;
+    }
+
+    public boolean next(List<KeyValue> results) throws IOException {
+      KeyValue peek = this.peek();
+      if (peek == null) {
+        return false;
+      }
+      byte [] row = peek.getRow();
+      results.add(peek);
+      while (true) {
+        if (this.peek() == null) {
+          break;
+        }
+        if (!Bytes.equals(row, this.peek().getRow())) {
+          break;
+        }
+        results.add(this.next());
+      }
+      return true;
+    }
+  }
 }
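
For orientation, the optimistic-conflict rule that the reworked TransactionState encodes (the puts of a committing transaction are checked against this transaction's read set and scan ranges) can be sketched as below. This is illustrative pseudocode only, not part of the patch: getScanSet() is a hypothetical accessor standing in for the package-private scans field used above.

// Mirrors the shape of TransactionState.hasConflict(...) shown above.
static boolean conflictsWith(TransactionState me, TransactionState committing) {
  for (Put otherUpdate : committing.getPuts()) {
    // We read a row that the committing transaction wrote.
    if (me.getReadSet().contains(otherUpdate.getRow())) {
      return true;
    }
    // We scanned over a row that the committing transaction wrote.
    for (TransactionState.ScanRange range : me.getScanSet()) { // hypothetical accessor
      if (range.contains(otherUpdate.getRow())) {
        return true;
      }
    }
  }
  return false;
}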

View File

@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HLog;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.transactional.TransactionState.Status;
 import org.apache.hadoop.util.Progressable;
@@ -201,13 +202,12 @@ public class TransactionalRegion extends HRegion {
     TransactionState state = new TransactionState(transactionId, super.getLog()
         .getSequenceNumber(), super.getRegionInfo());
 
-    // Order is important here ...
-    List<TransactionState> commitPendingCopy = new LinkedList<TransactionState>(
+    state.setStartSequenceNumber(nextSequenceId.get());
+    List<TransactionState> commitPendingCopy = new ArrayList<TransactionState>(
         commitPendingTransactions);
     for (TransactionState commitPending : commitPendingCopy) {
       state.addTransactionToCheck(commitPending);
     }
-    state.setStartSequenceNumber(nextSequenceId.get());
 
     synchronized (transactionsById) {
       transactionsById.put(key, state);
@@ -271,7 +271,9 @@ public class TransactionalRegion extends HRegion {
     TransactionState state = getTransactionState(transactionId);
     state.addScan(scan);
-    return new ScannerWrapper(transactionId, super.getScanner(scan));
+    List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(1);
+    scanners.add(state.getScanner());
+    return super.getScanner(scan, scanners);
   }
 
   /**
@@ -310,7 +312,6 @@ public class TransactionalRegion extends HRegion {
   /**
    * Add a delete to the transaction. Does not get applied until commit process.
-   * FIXME, not sure about this approach
    *
    * @param transactionId
    * @param delete
@@ -350,7 +351,7 @@ public class TransactionalRegion extends HRegion {
         + ". Voting for commit");
 
     // If there are writes we must keep record off the transaction
-    if (state.getWriteSet().size() > 0) {
+    if (state.hasWrite()) {
       // Order is important
       state.setStatus(Status.COMMIT_PENDING);
       commitPendingTransactions.add(state);
@@ -403,20 +404,19 @@ public class TransactionalRegion extends HRegion {
    * @throws IOException
    */
   public void commit(final long transactionId) throws IOException {
-    // Not checking closing...
     TransactionState state;
     try {
       state = getTransactionState(transactionId);
     } catch (UnknownTransactionException e) {
       LOG.fatal("Asked to commit unknown transaction: " + transactionId
           + " in region " + super.getRegionInfo().getRegionNameAsString());
-      // FIXME Write to the transaction log that this transaction was corrupted
+      // TODO. Anything to handle here?
       throw e;
     }
 
     if (!state.getStatus().equals(Status.COMMIT_PENDING)) {
       LOG.fatal("Asked to commit a non pending transaction");
-      // FIXME Write to the transaction log that this transaction was corrupted
+      // TODO. Anything to handle here?
       throw new IOException("commit failure");
     }
@@ -461,17 +461,21 @@ public class TransactionalRegion extends HRegion {
     this.hlog.writeCommitToLog(super.getRegionInfo(), state.getTransactionId());
 
-    for (Put update : state.getWriteSet()) {
+    for (Put update : state.getPuts()) {
       this.put(update, false); // Don't need to WAL these
-      // FIME, maybe should be walled so we don't need to look so far back.
+    }
+    for (Delete delete : state.getDeleteSet()) {
+      this.delete(delete, null, false);
     }
 
     state.setStatus(Status.COMMITED);
-    if (state.getWriteSet().size() > 0
+    if (state.hasWrite()
         && !commitPendingTransactions.remove(state)) {
       LOG
           .fatal("Commiting a non-query transaction that is not in commitPendingTransactions");
-      throw new IOException("commit failure"); // FIXME, how to handle?
+      // Something has gone really wrong.
+      throw new IOException("commit failure");
     }
 
     retireTransaction(state);
   }
@@ -480,11 +484,11 @@ public class TransactionalRegion extends HRegion {
   public List<StoreFile> close(boolean abort) throws IOException {
     prepareToClose();
     if (!commitPendingTransactions.isEmpty()) {
-      // FIXME, better way to handle?
       LOG.warn("Closing transactional region ["
           + getRegionInfo().getRegionNameAsString() + "], but still have ["
           + commitPendingTransactions.size()
-          + "] transactions that are pending commit");
+          + "] transactions that are pending commit.");
+      // TODO resolve from the Global Trx Log.
     }
     return super.close(abort);
   }
@@ -495,7 +499,7 @@ public class TransactionalRegion extends HRegion {
   }
 
   boolean closing = false;
+  private static final int CLOSE_WAIT_ON_COMMIT_PENDING = 1000;
 
   /**
    * Get ready to close.
    *
@@ -511,10 +515,10 @@ public class TransactionalRegion extends HRegion {
           + commitPendingTransactions.size()
           + "] transactions that are pending commit. Sleeping");
       for (TransactionState s : commitPendingTransactions) {
-        LOG.info(s.toString());
+        LOG.info("commit pending: " + s.toString());
       }
       try {
-        Thread.sleep(200);
+        Thread.sleep(CLOSE_WAIT_ON_COMMIT_PENDING);
       } catch (InterruptedException e) {
         throw new RuntimeException(e);
       }
@@ -700,33 +704,4 @@ public class TransactionalRegion extends HRegion {
       }
     }
   }
-
-  /** Wrapper which keeps track of rows returned by scanner. */
-  private class ScannerWrapper implements InternalScanner {
-    private long transactionId;
-    private InternalScanner scanner;
-
-    /**
-     * @param transactionId
-     * @param scanner
-     * @throws UnknownTransactionException
-     */
-    public ScannerWrapper(final long transactionId,
-        final InternalScanner scanner) throws UnknownTransactionException {
-      this.transactionId = transactionId;
-      this.scanner = scanner;
-    }
-
-    public void close() throws IOException {
-      scanner.close();
-    }
-
-    public boolean next(List<KeyValue> results) throws IOException {
-      boolean result = scanner.next(results);
-      TransactionState state = getTransactionState(transactionId);
-      // FIXME need to weave in new stuff from this transaction too.
-      return result;
-    }
-  }
 }

View File

@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.transactional.TransactionalRPC;
 import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
 import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
 import org.apache.hadoop.hbase.regionserver.HLog;
@@ -53,6 +54,11 @@ import org.apache.hadoop.util.Progressable;
  */
 public class TransactionalRegionServer extends HRegionServer implements
     TransactionalRegionInterface {
+
+  static {
+    TransactionalRPC.initialize();
+  }
+
   private static final String LEASE_TIME = "hbase.transaction.leasetime";
   private static final int DEFAULT_LEASE_TIME = 60 * 1000;
   private static final int LEASE_CHECK_FREQUENCY = 1000;

View File

@@ -73,8 +73,7 @@ public class TestIndexedTable extends HBaseClusterTestCase {
     IndexedTableDescriptor indexDesc = new IndexedTableDescriptor(desc);
     // Create a new index that does lexicographic ordering on COL_A
-    IndexSpecification colAIndex = new IndexSpecification(INDEX_COL_A,
-        COL_A);
+    IndexSpecification colAIndex = new IndexSpecification(INDEX_COL_A, COL_A);
     indexDesc.addIndex(colAIndex);
 
     admin = new IndexedTableAdmin(conf);
@@ -99,9 +98,10 @@ public class TestIndexedTable extends HBaseClusterTestCase {
     assertRowsInOrder(NUM_ROWS);
   }
 
-  private void assertRowsInOrder(int numRowsExpected) throws IndexNotFoundException, IOException {
-    ResultScanner scanner = table.getIndexedScanner(INDEX_COL_A,
-        HConstants.EMPTY_START_ROW, null, null, null);
+  private void assertRowsInOrder(int numRowsExpected)
+      throws IndexNotFoundException, IOException {
+    ResultScanner scanner = table.getIndexedScanner(INDEX_COL_A, null, null,
+        null, null, null);
     int numRows = 0;
     byte[] lastColA = null;
     for (Result rowResult : scanner) {

View File

@@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.client.transactional;
 
 import java.io.IOException;
 
+import junit.framework.Assert;
+
 import org.apache.hadoop.hbase.HBaseClusterTestCase;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
@@ -29,6 +31,8 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
 import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -95,7 +99,7 @@ public class TestTransactions extends HBaseClusterTestCase {
     transactionManager.tryCommit(transactionState2);
   }
 
-  public void TestTwoTransactionsWithConflict() throws IOException,
+  public void testTwoTransactionsWithConflict() throws IOException,
       CommitUnsuccessfulException {
     TransactionState transactionState1 = makeTransaction1();
     TransactionState transactionState2 = makeTransaction2();
@@ -110,14 +114,73 @@ public class TestTransactions extends HBaseClusterTestCase {
     }
   }
 
+  public void testGetAfterPut() throws IOException {
+    TransactionState transactionState = transactionManager.beginTransaction();
+
+    int originalValue = Bytes.toInt(table.get(transactionState,
+        new Get(ROW1).addColumn(COL_A)).value());
+    int newValue = originalValue + 1;
+
+    table.put(transactionState, new Put(ROW1).add(FAMILY, QUAL_A, Bytes
+        .toBytes(newValue)));
+
+    Result row1_A = table.get(transactionState, new Get(ROW1).addColumn(COL_A));
+    Assert.assertEquals(newValue, Bytes.toInt(row1_A.value()));
+  }
+
+  public void testScanAfterUpdatePut() throws IOException {
+    TransactionState transactionState = transactionManager.beginTransaction();
+
+    int originalValue = Bytes.toInt(table.get(transactionState,
+        new Get(ROW1).addColumn(COL_A)).value());
+    int newValue = originalValue + 1;
+
+    table.put(transactionState, new Put(ROW1).add(FAMILY, QUAL_A, Bytes
+        .toBytes(newValue)));
+
+    ResultScanner scanner = table.getScanner(transactionState, new Scan()
+        .addFamily(FAMILY));
+    Result result = scanner.next();
+    Assert.assertNotNull(result);
+    Assert.assertEquals(Bytes.toString(ROW1), Bytes.toString(result.getRow()));
+    Assert.assertEquals(newValue, Bytes.toInt(result.value()));
+
+    result = scanner.next();
+    Assert.assertNull(result);
+  }
+
+  public void testScanAfterNewPut() throws IOException {
+    TransactionState transactionState = transactionManager.beginTransaction();
+
+    int row2Value = 199;
+    table.put(transactionState, new Put(ROW2).add(FAMILY, QUAL_A, Bytes
+        .toBytes(row2Value)));
+
+    ResultScanner scanner = table.getScanner(transactionState, new Scan()
+        .addFamily(FAMILY));
+    Result result = scanner.next();
+    Assert.assertNotNull(result);
+    Assert.assertEquals(Bytes.toString(ROW1), Bytes.toString(result.getRow()));
+
+    result = scanner.next();
+    Assert.assertNotNull(result);
+    Assert.assertEquals(Bytes.toString(ROW2), Bytes.toString(result.getRow()));
+    Assert.assertEquals(row2Value, Bytes.toInt(result.value()));
+  }
+
   // Read from ROW1,COL_A and put it in ROW2_COLA and ROW3_COLA
   private TransactionState makeTransaction1() throws IOException {
     TransactionState transactionState = transactionManager.beginTransaction();
 
     Result row1_A = table.get(transactionState, new Get(ROW1).addColumn(COL_A));
 
-    table.put(new Put(ROW2).add(FAMILY, QUAL_A, row1_A.getValue(COL_A)));
-    table.put(new Put(ROW3).add(FAMILY, QUAL_A, row1_A.getValue(COL_A)));
+    table.put(transactionState, new Put(ROW2).add(FAMILY, QUAL_A, row1_A
+        .getValue(COL_A)));
+    table.put(transactionState, new Put(ROW3).add(FAMILY, QUAL_A, row1_A
+        .getValue(COL_A)));
 
     return transactionState;
   }
@@ -130,7 +193,8 @@ public class TestTransactions extends HBaseClusterTestCase {
     int value = Bytes.toInt(row1_A.getValue(COL_A));
 
-    table.put(new Put(ROW1).add(FAMILY, QUAL_A, Bytes.toBytes(value + 1)));
+    table.put(transactionState, new Put(ROW1).add(FAMILY, QUAL_A, Bytes
+        .toBytes(value + 1)));
 
     return transactionState;
   }

View File

@@ -620,6 +620,9 @@ public class KeyValue implements Writable, HeapSize {
   //---------------------------------------------------------------------------
 
   public String toString() {
+    if (this.bytes == null || this.bytes.length == 0) {
+      return "empty";
+    }
     return keyToString(this.bytes, this.offset + ROW_OFFSET, getKeyLength()) +
       "/vlen=" + getValueLength();
   }

View File

@@ -1063,6 +1063,10 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
    */
   public InternalScanner getScanner(Scan scan)
   throws IOException {
+    return getScanner(scan, null);
+  }
+
+  protected InternalScanner getScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
     newScannerLock.readLock().lock();
     try {
       if (this.closed.get()) {
@@ -1078,7 +1082,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
           scan.addFamily(family);
         }
       }
-      return new RegionScanner(scan);
+      return new RegionScanner(scan, additionalScanners);
 
     } finally {
       newScannerLock.readLock().unlock();
@@ -1678,7 +1682,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
     private final KeyValueHeap storeHeap;
     private final byte [] stopRow;
 
-    RegionScanner(Scan scan) {
+    RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners) {
       if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
         this.stopRow = null;
       } else {
@@ -1686,6 +1690,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
       }
 
       List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
+      if (additionalScanners != null) {
+        scanners.addAll(additionalScanners);
+      }
       for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
           scan.getFamilyMap().entrySet()) {
         Store store = stores.get(entry.getKey());
@@ -1695,6 +1702,10 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
           new KeyValueHeap(scanners.toArray(new KeyValueScanner[0]), comparator);
     }
 
+    RegionScanner(Scan scan) {
+      this(scan, null);
+    }
+
     /**
      * Get the next row of results from this region.
      * @param results list to append results to