HBASE-9645 Regionserver halt because of HLogs "Logic Error Snapshot seq id from earlier flush still present"

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1535288 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2013-10-24 06:49:47 +00:00
parent bccd10de11
commit 5102aec6c0
1 changed files with 47 additions and 30 deletions

View File

@ -872,7 +872,7 @@ public class HRegion implements HeapSize { // , Writable{
public MultiVersionConsistencyControl getMVCC() { public MultiVersionConsistencyControl getMVCC() {
return mvcc; return mvcc;
} }
/* /*
* Returns readpoint considering given IsolationLevel * Returns readpoint considering given IsolationLevel
*/ */
@ -1614,7 +1614,7 @@ public class HRegion implements HeapSize { // , Writable{
// Record latest flush time // Record latest flush time
this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis(); this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
// Update the last flushed sequence id for region // Update the last flushed sequence id for region
if (this.rsServices != null) { if (this.rsServices != null) {
completeSequenceId = flushSeqId; completeSequenceId = flushSeqId;
@ -2077,7 +2077,7 @@ public class HRegion implements HeapSize { // , Writable{
lastIndexExclusive++; lastIndexExclusive++;
continue; continue;
} }
// If we haven't got any rows in our batch, we should block to // If we haven't got any rows in our batch, we should block to
// get the next one. // get the next one.
boolean shouldBlock = numReadyToWrite == 0; boolean shouldBlock = numReadyToWrite == 0;
@ -2158,8 +2158,8 @@ public class HRegion implements HeapSize { // , Writable{
// calling the pre CP hook for batch mutation // calling the pre CP hook for batch mutation
if (!isInReplay && coprocessorHost != null) { if (!isInReplay && coprocessorHost != null) {
MiniBatchOperationInProgress<Mutation> miniBatchOp = MiniBatchOperationInProgress<Mutation> miniBatchOp =
new MiniBatchOperationInProgress<Mutation>(batchOp.operations, new MiniBatchOperationInProgress<Mutation>(batchOp.operations,
batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L; if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
} }
@ -2231,7 +2231,7 @@ public class HRegion implements HeapSize { // , Writable{
locked = false; locked = false;
} }
releaseRowLocks(acquiredRowLocks); releaseRowLocks(acquiredRowLocks);
// ------------------------- // -------------------------
// STEP 7. Sync wal. // STEP 7. Sync wal.
// ------------------------- // -------------------------
@ -2241,8 +2241,8 @@ public class HRegion implements HeapSize { // , Writable{
walSyncSuccessful = true; walSyncSuccessful = true;
// calling the post CP hook for batch mutation // calling the post CP hook for batch mutation
if (!isInReplay && coprocessorHost != null) { if (!isInReplay && coprocessorHost != null) {
MiniBatchOperationInProgress<Mutation> miniBatchOp = MiniBatchOperationInProgress<Mutation> miniBatchOp =
new MiniBatchOperationInProgress<Mutation>(batchOp.operations, new MiniBatchOperationInProgress<Mutation>(batchOp.operations,
batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
coprocessorHost.postBatchMutate(miniBatchOp); coprocessorHost.postBatchMutate(miniBatchOp);
} }
@ -3172,7 +3172,7 @@ public class HRegion implements HeapSize { // , Writable{
} }
} }
} }
// allocate new lock for this thread // allocate new lock for this thread
return rowLockContext.newLock(); return rowLockContext.newLock();
} finally { } finally {
@ -4641,7 +4641,7 @@ public class HRegion implements HeapSize { // , Writable{
Store store = stores.get(family.getKey()); Store store = stores.get(family.getKey());
List<Cell> kvs = new ArrayList<Cell>(family.getValue().size()); List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
Collections.sort(family.getValue(), store.getComparator()); Collections.sort(family.getValue(), store.getComparator());
// Get previous values for all columns in this family // Get previous values for all columns in this family
Get get = new Get(row); Get get = new Get(row);
@ -4650,10 +4650,10 @@ public class HRegion implements HeapSize { // , Writable{
get.addColumn(family.getKey(), kv.getQualifier()); get.addColumn(family.getKey(), kv.getQualifier());
} }
List<Cell> results = get(get, false); List<Cell> results = get(get, false);
// Iterate the input columns and update existing values if they were // Iterate the input columns and update existing values if they were
// found, otherwise add new column initialized to the append value // found, otherwise add new column initialized to the append value
// Avoid as much copying as possible. Every byte is copied at most // Avoid as much copying as possible. Every byte is copied at most
// once. // once.
// Would be nice if KeyValue had scatter/gather logic // Would be nice if KeyValue had scatter/gather logic
@ -4696,10 +4696,10 @@ public class HRegion implements HeapSize { // , Writable{
System.arraycopy(kv.getBuffer(), kv.getQualifierOffset(), System.arraycopy(kv.getBuffer(), kv.getQualifierOffset(),
newKV.getBuffer(), newKV.getQualifierOffset(), newKV.getBuffer(), newKV.getQualifierOffset(),
kv.getQualifierLength()); kv.getQualifierLength());
newKV.setMvccVersion(w.getWriteNumber()); newKV.setMvccVersion(w.getWriteNumber());
kvs.add(newKV); kvs.add(newKV);
// Append update to WAL // Append update to WAL
if (writeToWAL) { if (writeToWAL) {
if (walEdits == null) { if (walEdits == null) {
@ -4708,11 +4708,11 @@ public class HRegion implements HeapSize { // , Writable{
walEdits.add(newKV); walEdits.add(newKV);
} }
} }
//store the kvs to the temporary memstore before writing HLog //store the kvs to the temporary memstore before writing HLog
tempMemstore.put(store, kvs); tempMemstore.put(store, kvs);
} }
// Actually write to WAL now // Actually write to WAL now
if (writeToWAL) { if (writeToWAL) {
// Using default cluster id, as this can only happen in the orginating // Using default cluster id, as this can only happen in the orginating
@ -4724,7 +4724,7 @@ public class HRegion implements HeapSize { // , Writable{
} else { } else {
recordMutationWithoutWal(append.getFamilyCellMap()); recordMutationWithoutWal(append.getFamilyCellMap());
} }
//Actually write to Memstore now //Actually write to Memstore now
for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) { for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
Store store = entry.getKey(); Store store = entry.getKey();
@ -4816,7 +4816,7 @@ public class HRegion implements HeapSize { // , Writable{
Store store = stores.get(family.getKey()); Store store = stores.get(family.getKey());
List<Cell> kvs = new ArrayList<Cell>(family.getValue().size()); List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
// Get previous values for all columns in this family // Get previous values for all columns in this family
Get get = new Get(row); Get get = new Get(row);
for (Cell cell: family.getValue()) { for (Cell cell: family.getValue()) {
@ -4825,7 +4825,7 @@ public class HRegion implements HeapSize { // , Writable{
} }
get.setTimeRange(tr.getMin(), tr.getMax()); get.setTimeRange(tr.getMin(), tr.getMax());
List<Cell> results = get(get, false); List<Cell> results = get(get, false);
// Iterate the input columns and update existing values if they were // Iterate the input columns and update existing values if they were
// found, otherwise add new column initialized to the increment amount // found, otherwise add new column initialized to the increment amount
int idx = 0; int idx = 0;
@ -4842,13 +4842,13 @@ public class HRegion implements HeapSize { // , Writable{
} }
idx++; idx++;
} }
// Append new incremented KeyValue to list // Append new incremented KeyValue to list
KeyValue newKV = KeyValue newKV =
new KeyValue(row, family.getKey(), CellUtil.cloneQualifier(kv), now, Bytes.toBytes(amount)); new KeyValue(row, family.getKey(), CellUtil.cloneQualifier(kv), now, Bytes.toBytes(amount));
newKV.setMvccVersion(w.getWriteNumber()); newKV.setMvccVersion(w.getWriteNumber());
kvs.add(newKV); kvs.add(newKV);
// Prepare WAL updates // Prepare WAL updates
if (writeToWAL) { if (writeToWAL) {
if (walEdits == null) { if (walEdits == null) {
@ -4857,11 +4857,11 @@ public class HRegion implements HeapSize { // , Writable{
walEdits.add(newKV); walEdits.add(newKV);
} }
} }
//store the kvs to the temporary memstore before writing HLog //store the kvs to the temporary memstore before writing HLog
tempMemstore.put(store, kvs); tempMemstore.put(store, kvs);
} }
// Actually write to WAL now // Actually write to WAL now
if (writeToWAL) { if (writeToWAL) {
// Using default cluster id, as this can only happen in the orginating // Using default cluster id, as this can only happen in the orginating
@ -5546,7 +5546,7 @@ public class HRegion implements HeapSize { // , Writable{
*/ */
void failedBulkLoad(byte[] family, String srcPath) throws IOException; void failedBulkLoad(byte[] family, String srcPath) throws IOException;
} }
@VisibleForTesting class RowLockContext { @VisibleForTesting class RowLockContext {
private final HashedBytes row; private final HashedBytes row;
private final CountDownLatch latch = new CountDownLatch(1); private final CountDownLatch latch = new CountDownLatch(1);
@ -5557,16 +5557,16 @@ public class HRegion implements HeapSize { // , Writable{
this.row = row; this.row = row;
this.thread = Thread.currentThread(); this.thread = Thread.currentThread();
} }
boolean ownedByCurrentThread() { boolean ownedByCurrentThread() {
return thread == Thread.currentThread(); return thread == Thread.currentThread();
} }
RowLock newLock() { RowLock newLock() {
lockCount++; lockCount++;
return new RowLock(this); return new RowLock(this);
} }
void releaseLock() { void releaseLock() {
if (!ownedByCurrentThread()) { if (!ownedByCurrentThread()) {
throw new IllegalArgumentException("Lock held by thread: " + thread throw new IllegalArgumentException("Lock held by thread: " + thread
@ -5584,7 +5584,7 @@ public class HRegion implements HeapSize { // , Writable{
} }
} }
} }
/** /**
* Row lock held by a given thread. * Row lock held by a given thread.
* One thread may acquire multiple locks on the same row simultaneously. * One thread may acquire multiple locks on the same row simultaneously.
@ -5593,11 +5593,11 @@ public class HRegion implements HeapSize { // , Writable{
public class RowLock { public class RowLock {
@VisibleForTesting final RowLockContext context; @VisibleForTesting final RowLockContext context;
private boolean released = false; private boolean released = false;
@VisibleForTesting RowLock(RowLockContext context) { @VisibleForTesting RowLock(RowLockContext context) {
this.context = context; this.context = context;
} }
/** /**
* Release the given lock. If there are no remaining locks held by the current thread * Release the given lock. If there are no remaining locks held by the current thread
* then unlock the row and allow other threads to acquire the lock. * then unlock the row and allow other threads to acquire the lock.
@ -5610,4 +5610,21 @@ public class HRegion implements HeapSize { // , Writable{
} }
} }
} }
/**
* Lock the updates' readLock first, so that we could safely append logs in coprocessors.
* @throws RegionTooBusyException
* @throws InterruptedIOException
*/
public void updatesLock() throws RegionTooBusyException, InterruptedIOException {
lock(updatesLock.readLock());
}
/**
* Unlock the updates' readLock after appending logs in coprocessors.
* @throws InterruptedIOException
*/
public void updatesUnlock() throws InterruptedIOException {
updatesLock.readLock().unlock();
}
} }