HBASE-1233 Transactional fixes: Overly conservative scan read-set, potential CME

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@749716 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2009-03-03 19:51:58 +00:00
parent 08693f66a8
commit 1e3b2548b2
5 changed files with 157 additions and 62 deletions

View File

@ -29,6 +29,8 @@ Release 0.20.0 - Unreleased
HBASE-1217 add new compression and hfile blocksize to HColumnDescriptor
HBASE-859 HStoreKey needs a reworking
HBASE-1211 NPE in retries exhausted exception
HBASE-1233 Transactional fixes: Overly conservative scan read-set,
potential CME (Clint Morgan via Stack)
IMPROVEMENTS
HBASE-1089 Add count of regions on filesystem to master UI; add percentage

View File

@ -42,18 +42,20 @@ TransactionalRegionServer. This is done by setting
<i>hbase.regionserver.impl </i> to
<i>org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer</i>
<p>
The read set claimed by a transactional scanner is determined from the start and
end keys which the scanner is opened with.
<h3> Known Issues </h3>
Recovery in the face of hregion server failure
is not fully implemented. Thus, you cannot rely on the transactional
properties in the face of node failure.
<p> In order to avoid phantom reads on scanners, scanners currently
claim a <i>write set</i> for all rows in every region which they scan
through. This means that if transaction A writes to a region that
transaction B is scanning, then there is a conflict (only one
transaction can be committed). This will occur even if the scanner
never went over the row that was written.
</body>
</html>

View File

@ -88,6 +88,14 @@ public class RowFilterSet implements RowFilterInterface {
return operator;
}
/**
 * Returns the set of filters composed by this filter set.
 *
 * @return the underlying filter set
 */
public Set<RowFilterInterface> getFilters() {
  return this.filters;
}
/** Add a filter.
*
* @param filter

View File

@ -31,6 +31,11 @@ import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.filter.RowFilterSet;
import org.apache.hadoop.hbase.filter.StopRowFilter;
import org.apache.hadoop.hbase.filter.WhileMatchRowFilter;
import org.apache.hadoop.hbase.io.BatchOperation;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
@ -58,40 +63,69 @@ class TransactionState {
ABORTED
}
/**
* Simple container of the range of the scanners we've opened. Used to check
* for conflicting writes.
*/
private class ScanRange {
private byte[] startRow;
private byte[] endRow;
public ScanRange(byte[] startRow, byte[] endRow) {
this.startRow = startRow;
this.endRow = endRow;
}
/**
* Check if this scan range contains the given key.
*
* @param rowKey
* @return
*/
public boolean contains(byte[] rowKey) {
if (startRow != null && Bytes.compareTo(rowKey, startRow) < 0) {
return false;
}
if (endRow != null && Bytes.compareTo(endRow, rowKey) < 0) {
return false;
}
return true;
}
}
private final HRegionInfo regionInfo;
private final long hLogStartSequenceId;
private final long transactionId;
private Status status;
private SortedSet<byte[]> readSet = new TreeSet<byte[]>(
Bytes.BYTES_COMPARATOR);
private List<BatchUpdate> writeSet = new LinkedList<BatchUpdate>();
private List<ScanRange> scanSet = new LinkedList<ScanRange>();
private Set<TransactionState> transactionsToCheck = new HashSet<TransactionState>();
private int startSequenceNumber;
private Integer sequenceNumber;
boolean hasScan = false;
//TODO: Why don't these methods and the class itself use default access?
// They are only referenced from within this package.
public TransactionState(final long transactionId,
final long rLogStartSequenceId) {
TransactionState(final long transactionId, final long rLogStartSequenceId,
HRegionInfo regionInfo) {
this.transactionId = transactionId;
this.hLogStartSequenceId = rLogStartSequenceId;
this.regionInfo = regionInfo;
this.status = Status.PENDING;
}
public void addRead(final byte[] rowKey) {
/**
 * Record that this transaction read the given row.
 *
 * @param rowKey row that was read
 */
void addRead(final byte[] rowKey) {
  this.readSet.add(rowKey);
}
public Set<byte[]> getReadSet() {
/** @return rows read by this transaction */
Set<byte[]> getReadSet() {
  return this.readSet;
}
public void addWrite(final BatchUpdate write) {
/**
 * Record a pending write for this transaction.
 *
 * @param write batch update to apply on commit
 */
void addWrite(final BatchUpdate write) {
  this.writeSet.add(write);
}
public List<BatchUpdate> getWriteSet() {
/** @return pending writes made by this transaction */
List<BatchUpdate> getWriteSet() {
  return this.writeSet;
}
@ -103,8 +137,8 @@ class TransactionState {
* @param timestamp
* @return
*/
public Map<byte[], Cell> localGetFull(final byte[] row,
final Set<byte[]> columns, final long timestamp) {
Map<byte[], Cell> localGetFull(final byte[] row, final Set<byte[]> columns,
final long timestamp) {
Map<byte[], Cell> results = new TreeMap<byte[], Cell>(
Bytes.BYTES_COMPARATOR); // Must use the Bytes Comparator because
for (BatchUpdate b : writeSet) {
@ -133,8 +167,7 @@ class TransactionState {
* @param timestamp
* @return
*/
public Cell[] localGet(final byte[] row, final byte[] column,
final long timestamp) {
Cell[] localGet(final byte[] row, final byte[] column, final long timestamp) {
ArrayList<Cell> results = new ArrayList<Cell>();
// Go in reverse order to put newest updates first in list
@ -158,11 +191,11 @@ class TransactionState {
.toArray(new Cell[results.size()]);
}
public void addTransactionToCheck(final TransactionState transaction) {
/**
 * Register a transaction whose writes must be checked against this
 * transaction's reads/scans before commit.
 *
 * @param transaction transaction to check for conflicts
 */
void addTransactionToCheck(final TransactionState transaction) {
  this.transactionsToCheck.add(transaction);
}
public boolean hasConflict() {
boolean hasConflict() {
for (TransactionState transactionState : transactionsToCheck) {
if (hasConflict(transactionState)) {
return true;
@ -177,17 +210,21 @@ class TransactionState {
}
for (BatchUpdate otherUpdate : checkAgainst.getWriteSet()) {
if (this.hasScan) {
LOG.info("Transaction" + this.toString()
+ " has a scan read. Meanwile a write occured. "
+ "Conservitivly reporting conflict");
if (this.getReadSet().contains(otherUpdate.getRow())) {
LOG.debug("Transaction [" + this.toString()
+ "] has read which conflicts with [" + checkAgainst.toString()
+ "]: region [" + regionInfo.getRegionNameAsString() + "], row["
+ Bytes.toString(otherUpdate.getRow()) + "]");
return true;
}
if (this.getReadSet().contains(otherUpdate.getRow())) {
LOG.trace("Transaction " + this.toString() + " conflicts with "
+ checkAgainst.toString());
return true;
for (ScanRange scanRange : this.scanSet) {
if (scanRange.contains(otherUpdate.getRow())) {
LOG.debug("Transaction [" + this.toString()
+ "] has scan which conflicts with [" + checkAgainst.toString()
+ "]: region [" + regionInfo.getRegionNameAsString() + "], row["
+ Bytes.toString(otherUpdate.getRow()) + "]");
return true;
}
}
}
return false;
@ -198,7 +235,7 @@ class TransactionState {
*
* @return Return the status.
*/
public Status getStatus() {
/** Current status of this transaction. */
Status getStatus() {
  return this.status;
}
@ -207,7 +244,7 @@ class TransactionState {
*
* @param status The status to set.
*/
public void setStatus(final Status status) {
/** Update this transaction's status. */
void setStatus(final Status status) {
  this.status = status;
}
@ -216,7 +253,7 @@ class TransactionState {
*
* @return Return the startSequenceNumber.
*/
public int getStartSequenceNumber() {
/** Sequence number at which this transaction started. */
int getStartSequenceNumber() {
  return this.startSequenceNumber;
}
@ -225,7 +262,7 @@ class TransactionState {
*
* @param startSequenceNumber.
*/
public void setStartSequenceNumber(final int startSequenceNumber) {
/** Set the sequence number at which this transaction started. */
void setStartSequenceNumber(final int startSequenceNumber) {
  this.startSequenceNumber = startSequenceNumber;
}
@ -234,7 +271,7 @@ class TransactionState {
*
* @return Return the sequenceNumber.
*/
public Integer getSequenceNumber() {
/** The sequence number (boxed; presumably unset until assigned — confirm). */
Integer getSequenceNumber() {
  return this.sequenceNumber;
}
@ -243,7 +280,7 @@ class TransactionState {
*
* @param sequenceNumber The sequenceNumber to set.
*/
public void setSequenceNumber(final Integer sequenceNumber) {
/** Set the sequence number. */
void setSequenceNumber(final Integer sequenceNumber) {
  this.sequenceNumber = sequenceNumber;
}
@ -256,6 +293,8 @@ class TransactionState {
result.append(status.name());
result.append(" read Size: ");
result.append(readSet.size());
result.append(" scan Size: ");
result.append(scanSet.size());
result.append(" write Size: ");
result.append(writeSet.size());
result.append(" startSQ: ");
@ -274,7 +313,7 @@ class TransactionState {
*
* @return Return the transactionId.
*/
public long getTransactionId() {
/** Identifier of this transaction. */
long getTransactionId() {
  return this.transactionId;
}
@ -283,17 +322,41 @@ class TransactionState {
*
* @return Return the startSequenceId.
*/
public long getHLogStartSequenceId() {
/** HLog sequence id captured when this transaction began. */
long getHLogStartSequenceId() {
  return this.hLogStartSequenceId;
}
/**
* Set the hasScan.
*
* @param hasScan The hasScan to set.
*/
public void setHasScan(final boolean hasScan) {
this.hasScan = hasScan;
/**
 * Record the row range claimed by a newly opened scanner so that later
 * writes by other transactions can be detected as conflicts.
 *
 * @param firstRow scanner start row; null means unbounded below
 * @param filter scanner filter, used to derive an end row when possible
 */
void addScan(byte[] firstRow, RowFilterInterface filter) {
  ScanRange scanRange = new ScanRange(firstRow, getEndRow(filter));
  // Guard so the String.format work is skipped when trace logging is off.
  if (LOG.isTraceEnabled()) {
    // Fixed typo in log message: "transcaction" -> "transaction".
    LOG.trace(String.format(
        "Adding scan for transaction [%s], from [%s] to [%s]", transactionId,
        scanRange.startRow == null ? "null" : Bytes
            .toString(scanRange.startRow), scanRange.endRow == null ? "null"
            : Bytes.toString(scanRange.endRow)));
  }
  scanSet.add(scanRange);
}
/**
 * Best-effort derivation of a scan's end row from its filter. Recognizes a
 * WhileMatchRowFilter wrapping a StopRowFilter, and recurses into
 * MUST_PASS_ALL filter sets. Returns null when no bound can be determined
 * (the scan range is then unbounded above).
 *
 * @param filter scanner filter to inspect
 * @return derived end row, or null if none could be determined
 */
private byte[] getEndRow(RowFilterInterface filter) {
  if (filter instanceof WhileMatchRowFilter) {
    RowFilterInterface inner = ((WhileMatchRowFilter) filter)
        .getInternalFilter();
    if (inner instanceof StopRowFilter) {
      return ((StopRowFilter) inner).getStopRowKey();
    }
    return null;
  }
  if (filter instanceof RowFilterSet) {
    RowFilterSet filterSet = (RowFilterSet) filter;
    if (filterSet.getOperator().equals(RowFilterSet.Operator.MUST_PASS_ALL)) {
      // Any conjunct's bound is a valid bound for the whole conjunction.
      for (RowFilterInterface subFilter : filterSet.getFilters()) {
        byte[] bound = getEndRow(subFilter);
        if (bound != null) {
          return bound;
        }
      }
    }
  }
  return null;
}
}

View File

@ -180,14 +180,16 @@ public class TransactionalRegion extends HRegion {
alias.setStatus(Status.ABORTED);
retireTransaction(alias);
}
LOG.error("Existing trasaction with id ["+key+"] in region ["+super.getRegionInfo().getRegionNameAsString()+"]");
throw new IOException("Already exiting transaction id: " + key);
}
TransactionState state = new TransactionState(transactionId, super.getLog()
.getSequenceNumber());
.getSequenceNumber(), super.getRegionInfo());
// Order is important here
for (TransactionState commitPending : commitPendingTransactions) {
// Order is important here ...
List<TransactionState> commitPendingCopy = new LinkedList<TransactionState>(commitPendingTransactions);
for (TransactionState commitPending : commitPendingCopy) {
state.addTransactionToCheck(commitPending);
}
state.setStartSequenceNumber(nextSequenceId.get());
@ -196,6 +198,7 @@ public class TransactionalRegion extends HRegion {
try {
transactionLeases.createLease(key, new TransactionLeaseListener(key));
} catch (LeaseStillHeldException e) {
LOG.error("Lease still held for ["+key+"] in region ["+super.getRegionInfo().getRegionNameAsString()+"]");
throw new RuntimeException(e);
}
LOG.debug("Begining transaction " + key + " in region "
@ -337,6 +340,8 @@ public class TransactionalRegion extends HRegion {
/**
 * Open a scanner on behalf of the given transaction. The scan's row range
 * is registered on the transaction state first so that concurrent writes
 * can be checked for conflicts at commit time.
 */
public InternalScanner getScanner(final long transactionId,
    final byte[][] cols, final byte[] firstRow, final long timestamp,
    final RowFilterInterface filter) throws IOException {
  // Register the claimed range before opening the underlying scanner.
  TransactionState state = getTransactionState(transactionId);
  state.addScan(firstRow, filter);
  InternalScanner wrapped = super.getScanner(cols, firstRow, timestamp,
      filter);
  return new ScannerWrapper(transactionId, wrapped);
}
@ -578,14 +583,31 @@ public class TransactionalRegion extends HRegion {
numRemoved++;
}
if (numRemoved > 0) {
LOG.debug("Removed " + numRemoved
+ " commited transactions with sequence lower than "
+ minStartSeqNumber + ". Still have "
+ commitedTransactionsBySequenceNumber.size() + " left");
} else if (commitedTransactionsBySequenceNumber.size() > 0) {
LOG.debug("Could not remove any transactions, and still have "
+ commitedTransactionsBySequenceNumber.size() + " left");
if (LOG.isDebugEnabled()) {
StringBuilder debugMessage = new StringBuilder();
if (numRemoved > 0) {
debugMessage.append("Removed ").append(numRemoved).append(
" commited transactions");
if (minStartSeqNumber == Integer.MAX_VALUE) {
debugMessage.append("with any sequence number");
} else {
debugMessage.append("with sequence lower than ").append(
minStartSeqNumber).append(".");
}
if (!commitedTransactionsBySequenceNumber.isEmpty()) {
debugMessage.append(" Still have ").append(
commitedTransactionsBySequenceNumber.size()).append(" left.");
} else {
debugMessage.append("None left.");
}
LOG.debug(debugMessage.toString());
} else if (commitedTransactionsBySequenceNumber.size() > 0) {
debugMessage.append(
"Could not remove any transactions, and still have ").append(
commitedTransactionsBySequenceNumber.size()).append(" left");
LOG.debug(debugMessage.toString());
}
}
}
@ -647,9 +669,11 @@ public class TransactionalRegion extends HRegion {
/**
* @param transactionId
* @param scanner
* @throws UnknownTransactionException
*/
public ScannerWrapper(final long transactionId,
final InternalScanner scanner) {
final InternalScanner scanner) throws UnknownTransactionException {
this.transactionId = transactionId;
this.scanner = scanner;
}
@ -670,10 +694,6 @@ public class TransactionalRegion extends HRegion {
final SortedMap<byte[], Cell> results) throws IOException {
boolean result = scanner.next(key, results);
TransactionState state = getTransactionState(transactionId);
state.setHasScan(true);
// FIXME, not using row, just claiming read over the whole region. We are
// being very conservative on scans to avoid phantom reads.
state.addRead(key.getRow());
if (result) {
Map<byte[], Cell> localWrites = state.localGetFull(key.getRow(), null,