HBASE-14317 Stuck FSHLog: bad disk (HDFS-8960) and can't roll WAL

This commit is contained in:
stack 2015-09-03 21:20:21 -07:00
parent 2481b7f76f
commit 661faf6fe0
19 changed files with 959 additions and 127 deletions

View File

@ -305,7 +305,8 @@ public class HFile {
try { try {
ostream.setDropBehind(shouldDropBehind && cacheConf.shouldDropBehindCompaction()); ostream.setDropBehind(shouldDropBehind && cacheConf.shouldDropBehindCompaction());
} catch (UnsupportedOperationException uoe) { } catch (UnsupportedOperationException uoe) {
LOG.debug("Unable to set drop behind on " + path, uoe); if (LOG.isTraceEnabled()) LOG.trace("Unable to set drop behind on " + path, uoe);
else if (LOG.isDebugEnabled()) LOG.debug("Unable to set drop behind on " + path);
} }
} }
return createWriter(fs, path, ostream, return createWriter(fs, path, ostream,

View File

@ -77,6 +77,7 @@ import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode; import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
@ -204,11 +205,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY = public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
"hbase.hregion.scan.loadColumnFamiliesOnDemand"; "hbase.hregion.scan.loadColumnFamiliesOnDemand";
// in milliseconds /**
private static final String MAX_WAIT_FOR_SEQ_ID_KEY = * Longest time we'll wait on a sequenceid.
"hbase.hregion.max.wait.for.seq.id"; * Sequenceid comes up out of the WAL subsystem. WAL subsystem can go bad or a test might use
* it without cleanup previous usage properly; generally, a WAL roll is needed.
private static final int DEFAULT_MAX_WAIT_FOR_SEQ_ID = 60000; * Key to use changing the default of 30000ms.
*/
private final int maxWaitForSeqId;
private static final String MAX_WAIT_FOR_SEQ_ID_KEY = "hbase.hregion.max.wait.for.sequenceid.ms";
private static final int DEFAULT_MAX_WAIT_FOR_SEQ_ID = 30000;
/** /**
* This is the global default value for durability. All tables/mutations not * This is the global default value for durability. All tables/mutations not
@ -241,7 +246,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* file in this sequence id's order; i.e. edit #2 will be in the WAL after edit #1. * file in this sequence id's order; i.e. edit #2 will be in the WAL after edit #1.
* Its default value is -1L. This default is used as a marker to indicate * Its default value is -1L. This default is used as a marker to indicate
* that the region hasn't opened yet. Once it is opened, it is set to the derived * that the region hasn't opened yet. Once it is opened, it is set to the derived
* {@link #openSeqNum}, the largest sequence id of all hfiles opened under this Region. * #openSeqNum, the largest sequence id of all hfiles opened under this Region.
* *
* <p>Control of this sequence is handed off to the WAL implementation. It is responsible * <p>Control of this sequence is handed off to the WAL implementation. It is responsible
* for tagging edits with the correct sequence id since it is responsible for getting the * for tagging edits with the correct sequence id since it is responsible for getting the
@ -340,7 +345,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/ */
private boolean isLoadingCfsOnDemandDefault = false; private boolean isLoadingCfsOnDemandDefault = false;
private int maxWaitForSeqId;
private final AtomicInteger majorInProgress = new AtomicInteger(0); private final AtomicInteger majorInProgress = new AtomicInteger(0);
private final AtomicInteger minorInProgress = new AtomicInteger(0); private final AtomicInteger minorInProgress = new AtomicInteger(0);
@ -677,7 +681,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration", this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
DEFAULT_ROWLOCK_WAIT_DURATION); DEFAULT_ROWLOCK_WAIT_DURATION);
maxWaitForSeqId = conf.getInt(MAX_WAIT_FOR_SEQ_ID_KEY, DEFAULT_MAX_WAIT_FOR_SEQ_ID); this.maxWaitForSeqId = conf.getInt(MAX_WAIT_FOR_SEQ_ID_KEY, DEFAULT_MAX_WAIT_FOR_SEQ_ID);
this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true); this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
this.htableDescriptor = htd; this.htableDescriptor = htd;
this.rsServices = rsServices; this.rsServices = rsServices;
@ -2141,7 +2145,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// to do this for a moment. It is quick. We also set the memstore size to zero here before we // to do this for a moment. It is quick. We also set the memstore size to zero here before we
// allow updates again so its value will represent the size of the updates received // allow updates again so its value will represent the size of the updates received
// during flush // during flush
MultiVersionConcurrencyControl.WriteEntry w = null; MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
// We have to take an update lock during snapshot, or else a write could end up in both snapshot // We have to take an update lock during snapshot, or else a write could end up in both snapshot
// and memstore (makes it difficult to do atomic rows then) // and memstore (makes it difficult to do atomic rows then)
status.setStatus("Obtaining lock to block concurrent updates"); status.setStatus("Obtaining lock to block concurrent updates");
@ -2174,7 +2178,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
long trxId = 0; long trxId = 0;
try { try {
try { try {
w = mvcc.beginMemstoreInsert(); writeEntry = mvcc.beginMemstoreInsert();
if (wal != null) { if (wal != null) {
Long earliestUnflushedSequenceIdForTheRegion = Long earliestUnflushedSequenceIdForTheRegion =
wal.startCacheFlush(encodedRegionName, flushedFamilyNames); wal.startCacheFlush(encodedRegionName, flushedFamilyNames);
@ -2247,8 +2251,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try { try {
wal.sync(); // ensure that flush marker is sync'ed wal.sync(); // ensure that flush marker is sync'ed
} catch (IOException ioe) { } catch (IOException ioe) {
LOG.warn("Unexpected exception while wal.sync(), ignoring. Exception: " wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
+ StringUtils.stringifyException(ioe)); throw ioe;
} }
} }
@ -2257,14 +2261,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// uncommitted transactions from being written into HFiles. // uncommitted transactions from being written into HFiles.
// We have to block before we start the flush, otherwise keys that // We have to block before we start the flush, otherwise keys that
// were removed via a rollbackMemstore could be written to Hfiles. // were removed via a rollbackMemstore could be written to Hfiles.
w.setWriteNumber(flushOpSeqId); writeEntry.setWriteNumber(flushOpSeqId);
mvcc.waitForPreviousTransactionsComplete(w); mvcc.waitForPreviousTransactionsComplete(writeEntry);
// set w to null to prevent mvcc.advanceMemstore from being called again inside finally block // set w to null to prevent mvcc.advanceMemstore from being called again inside finally block
w = null; writeEntry = null;
} finally { } finally {
if (w != null) { if (writeEntry != null) {
// in case of failure just mark current w as complete // in case of failure just mark current writeEntry as complete
mvcc.advanceMemstore(w); mvcc.advanceMemstore(writeEntry);
} }
} }
return new PrepareFlushResult(storeFlushCtxs, committedFiles, storeFlushableSize, startTime, flushOpSeqId, return new PrepareFlushResult(storeFlushCtxs, committedFiles, storeFlushableSize, startTime, flushOpSeqId,
@ -2446,8 +2450,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/ */
@VisibleForTesting @VisibleForTesting
protected long getNextSequenceId(final WAL wal) throws IOException { protected long getNextSequenceId(final WAL wal) throws IOException {
// TODO: For review. Putting an empty edit in to get a sequenceid out will not work if the
// WAL is banjaxed... if it has gotten an exception and the WAL has not yet been rolled or
// aborted. In this case, we'll just get stuck here. For now, until HBASE-12751, just have
// a timeout. May happen in tests after we tightened the semantic via HBASE-14317.
// Also, the getSequenceId blocks on a latch. There is no global list of outstanding latches
// so if an abort or stop, there is no way to call them in.
WALKey key = this.appendEmptyEdit(wal, null); WALKey key = this.appendEmptyEdit(wal, null);
return key.getSequenceId(maxWaitForSeqId); return key.getSequenceId(this.maxWaitForSeqId);
} }
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
@ -2866,7 +2876,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE; long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE;
WALEdit walEdit = new WALEdit(isInReplay); WALEdit walEdit = new WALEdit(isInReplay);
MultiVersionConcurrencyControl.WriteEntry w = null; MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
long txid = 0; long txid = 0;
boolean doRollBackMemstore = false; boolean doRollBackMemstore = false;
boolean locked = false; boolean locked = false;
@ -3019,7 +3029,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// ------------------------------------ // ------------------------------------
// Acquire the latest mvcc number // Acquire the latest mvcc number
// ---------------------------------- // ----------------------------------
w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
// calling the pre CP hook for batch mutation // calling the pre CP hook for batch mutation
if (!isInReplay && coprocessorHost != null) { if (!isInReplay && coprocessorHost != null) {
@ -3134,7 +3144,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
getSequenceId(), true, memstoreCells); getSequenceId(), true, memstoreCells);
} }
if(walKey == null){ if (walKey == null){
// Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
walKey = this.appendEmptyEdit(this.wal, memstoreCells); walKey = this.appendEmptyEdit(this.wal, memstoreCells);
} }
@ -3168,9 +3178,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// ------------------------------------------------------------------ // ------------------------------------------------------------------
// STEP 8. Advance mvcc. This will make this put visible to scanners and getters. // STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
// ------------------------------------------------------------------ // ------------------------------------------------------------------
if (w != null) { if (writeEntry != null) {
mvcc.completeMemstoreInsertWithSeqNum(w, walKey); mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
w = null; writeEntry = null;
} }
// ------------------------------------ // ------------------------------------
@ -3199,9 +3209,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// if the wal sync was unsuccessful, remove keys from memstore // if the wal sync was unsuccessful, remove keys from memstore
if (doRollBackMemstore) { if (doRollBackMemstore) {
rollbackMemstore(memstoreCells); rollbackMemstore(memstoreCells);
} if (writeEntry != null) mvcc.cancelMemstoreInsert(writeEntry);
if (w != null) { } else if (writeEntry != null) {
mvcc.completeMemstoreInsertWithSeqNum(w, walKey); mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
} }
if (locked) { if (locked) {
@ -6735,6 +6745,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
processor.postBatchMutate(this); processor.postBatchMutate(this);
} }
} finally { } finally {
// TODO: Make this method look like all other methods that are doing append/sync and
// memstore rollback such as append and doMiniBatchMutation. Currently it is a little
// different. Make them all share same code!
if (!mutations.isEmpty() && !walSyncSuccessful) { if (!mutations.isEmpty() && !walSyncSuccessful) {
LOG.warn("Wal sync failed. Roll back " + mutations.size() + LOG.warn("Wal sync failed. Roll back " + mutations.size() +
" memstore keyvalues for row(s):" + StringUtils.byteToHexString( " memstore keyvalues for row(s):" + StringUtils.byteToHexString(
@ -6745,6 +6758,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
getStore(cell).rollback(cell); getStore(cell).rollback(cell);
} }
} }
if (writeEntry != null) {
mvcc.cancelMemstoreInsert(writeEntry);
writeEntry = null;
}
} }
// 13. Roll mvcc forward // 13. Roll mvcc forward
if (writeEntry != null) { if (writeEntry != null) {
@ -6846,7 +6863,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
startRegionOperation(Operation.APPEND); startRegionOperation(Operation.APPEND);
this.writeRequestsCount.increment(); this.writeRequestsCount.increment();
long mvccNum = 0; long mvccNum = 0;
WriteEntry w = null; WriteEntry writeEntry = null;
WALKey walKey = null; WALKey walKey = null;
RowLock rowLock = null; RowLock rowLock = null;
List<Cell> memstoreCells = new ArrayList<Cell>(); List<Cell> memstoreCells = new ArrayList<Cell>();
@ -6867,7 +6884,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
// now start my own transaction // now start my own transaction
mvccNum = MultiVersionConcurrencyControl.getPreAssignedWriteNumber(this.sequenceId); mvccNum = MultiVersionConcurrencyControl.getPreAssignedWriteNumber(this.sequenceId);
w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
long now = EnvironmentEdgeManager.currentTime(); long now = EnvironmentEdgeManager.currentTime();
// Process each family // Process each family
for (Map.Entry<byte[], List<Cell>> family : append.getFamilyCellMap().entrySet()) { for (Map.Entry<byte[], List<Cell>> family : append.getFamilyCellMap().entrySet()) {
@ -7049,10 +7066,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// if the wal sync was unsuccessful, remove keys from memstore // if the wal sync was unsuccessful, remove keys from memstore
if (doRollBackMemstore) { if (doRollBackMemstore) {
rollbackMemstore(memstoreCells); rollbackMemstore(memstoreCells);
if (writeEntry != null) mvcc.cancelMemstoreInsert(writeEntry);
} else if (writeEntry != null) {
mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
} }
if (w != null) {
mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
}
closeRegionOperation(Operation.APPEND); closeRegionOperation(Operation.APPEND);
} }
@ -7099,7 +7117,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
startRegionOperation(Operation.INCREMENT); startRegionOperation(Operation.INCREMENT);
this.writeRequestsCount.increment(); this.writeRequestsCount.increment();
RowLock rowLock = null; RowLock rowLock = null;
WriteEntry w = null; WriteEntry writeEntry = null;
WALKey walKey = null; WALKey walKey = null;
long mvccNum = 0; long mvccNum = 0;
List<Cell> memstoreCells = new ArrayList<Cell>(); List<Cell> memstoreCells = new ArrayList<Cell>();
@ -7120,7 +7138,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
// now start my own transaction // now start my own transaction
mvccNum = MultiVersionConcurrencyControl.getPreAssignedWriteNumber(this.sequenceId); mvccNum = MultiVersionConcurrencyControl.getPreAssignedWriteNumber(this.sequenceId);
w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
long now = EnvironmentEdgeManager.currentTime(); long now = EnvironmentEdgeManager.currentTime();
// Process each family // Process each family
for (Map.Entry<byte [], List<Cell>> family: for (Map.Entry<byte [], List<Cell>> family:
@ -7290,9 +7308,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// if the wal sync was unsuccessful, remove keys from memstore // if the wal sync was unsuccessful, remove keys from memstore
if (doRollBackMemstore) { if (doRollBackMemstore) {
rollbackMemstore(memstoreCells); rollbackMemstore(memstoreCells);
} if (writeEntry != null) mvcc.cancelMemstoreInsert(writeEntry);
if (w != null) { } else if (writeEntry != null) {
mvcc.completeMemstoreInsertWithSeqNum(w, walKey); mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
} }
closeRegionOperation(Operation.INCREMENT); closeRegionOperation(Operation.INCREMENT);
if (this.metricsRegion != null) { if (this.metricsRegion != null) {
@ -7984,12 +8002,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/ */
private WALKey appendEmptyEdit(final WAL wal, List<Cell> cells) throws IOException { private WALKey appendEmptyEdit(final WAL wal, List<Cell> cells) throws IOException {
// we use HLogKey here instead of WALKey directly to support legacy coprocessors. // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
@SuppressWarnings("deprecation")
WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(), WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(),
WALKey.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE); WALKey.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
// Call append but with an empty WALEdit. The returned seqeunce id will not be associated // Call append but with an empty WALEdit. The returned seqeunce id will not be associated
// with any edit and we can be sure it went in after all outstanding appends. // with any edit and we can be sure it went in after all outstanding appends.
wal.append(getTableDesc(), getRegionInfo(), key, wal.append(getTableDesc(), getRegionInfo(), key, WALEdit.EMPTY_WALEDIT, getSequenceId(), false,
WALEdit.EMPTY_WALEDIT, this.sequenceId, false, cells); cells);
return key; return key;
} }

View File

@ -99,6 +99,15 @@ public class LogRoller extends HasThread {
getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
} }
@Override
public void interrupt() {
// Wake up if we are waiting on rollLog. For tests.
synchronized (rollLog) {
this.rollLog.notify();
}
super.interrupt();
}
@Override @Override
public void run() { public void run() {
while (!server.isStopped()) { while (!server.isStopped()) {
@ -109,7 +118,9 @@ public class LogRoller extends HasThread {
if (!periodic) { if (!periodic) {
synchronized (rollLog) { synchronized (rollLog) {
try { try {
if (!rollLog.get()) rollLog.wait(this.threadWakeFrequency); if (!rollLog.get()) {
rollLog.wait(this.threadWakeFrequency);
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
// Fall through // Fall through
} }
@ -180,5 +191,4 @@ public class LogRoller extends HasThread {
requester); requester);
} }
} }
}
}

View File

@ -79,10 +79,11 @@ public class MultiVersionConcurrencyControl {
// current MVCC completes. Theoretically the bump only needs to be 2 * the number of handlers // current MVCC completes. Theoretically the bump only needs to be 2 * the number of handlers
// because each handler could increment sequence num twice and max concurrent in-flight // because each handler could increment sequence num twice and max concurrent in-flight
// transactions is the number of RPC handlers. // transactions is the number of RPC handlers.
// we can't use Long.MAX_VALUE because we still want to maintain the ordering when multiple // We can't use Long.MAX_VALUE because we still want to maintain the ordering when multiple
// changes touch same row key // changes touch same row key.
// If for any reason, the bumped value isn't reset due to failure situations, we'll reset // If for any reason, the bumped value isn't reset due to failure situations, we'll reset
// curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all // curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all.
// St.Ack 20150901 Where is the reset to NO_WRITE_NUMBER done?
return sequenceId.incrementAndGet() + 1000000000; return sequenceId.incrementAndGet() + 1000000000;
} }
@ -127,6 +128,23 @@ public class MultiVersionConcurrencyControl {
waitForPreviousTransactionsComplete(e); waitForPreviousTransactionsComplete(e);
} }
/**
* Cancel a write insert that failed.
* Removes the write entry without advancing read point or without interfering with write
* entries queued behind us. It is like #advanceMemstore(WriteEntry) only this method
* will move the read point to the sequence id that is in WriteEntry even if it ridiculous (see
* the trick in HRegion where we call {@link #getPreAssignedWriteNumber(AtomicLong)} just to mark
* it as for special handling).
* @param writeEntry Failed attempt at write. Does cleanup.
*/
public void cancelMemstoreInsert(WriteEntry writeEntry) {
// I'm not clear on how this voodoo all works but setting write number to -1 does NOT advance
// readpoint and gets my little writeEntry completed and removed from queue of outstanding
// events which seems right. St.Ack 20150901.
writeEntry.setWriteNumber(NO_WRITE_NUMBER);
advanceMemstore(writeEntry);
}
/** /**
* Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. At the * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. At the
* end of this call, the global read point is at least as large as the write point of the passed * end of this call, the global read point is at least as large as the write point of the passed

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Thrown when a failed append or sync on a WAL.
* Thrown when WAL can no longer be used. Roll the WAL.
*/
@SuppressWarnings("serial")
@InterfaceAudience.Private
public class DamagedWALException extends HBaseIOException {
public DamagedWALException() {
super();
}
public DamagedWALException(String message) {
super(message);
}
public DamagedWALException(String message, Throwable cause) {
super(message, cause);
}
public DamagedWALException(Throwable cause) {
super(cause);
}
}

View File

@ -111,6 +111,16 @@ import com.lmax.disruptor.dsl.ProducerType;
* *
* <p>To read an WAL, call {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem, * <p>To read an WAL, call {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem,
* org.apache.hadoop.fs.Path)}. * org.apache.hadoop.fs.Path)}.
*
* <h2>Failure Semantic</h2>
* If an exception on append or sync, roll the WAL because the current WAL is now a lame duck;
* any more appends or syncs will fail also with the same original exception. If we have made
* successful appends to the WAL and we then are unable to sync them, our current semantic is to
* return error to the client that the appends failed but also to abort the current context,
* usually the hosting server. We need to replay the WALs. TODO: Change this semantic. A roll of
* WAL may be sufficient as long as we have flagged client that the append failed. TODO:
* replication may pick up these last edits though they have been marked as failed append (Need to
* keep our own file lengths, not rely on HDFS).
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class FSHLog implements WAL { public class FSHLog implements WAL {
@ -344,7 +354,7 @@ public class FSHLog implements WAL {
* WAL Comparator; it compares the timestamp (log filenum), present in the log file name. * WAL Comparator; it compares the timestamp (log filenum), present in the log file name.
* Throws an IllegalArgumentException if used to compare paths from different wals. * Throws an IllegalArgumentException if used to compare paths from different wals.
*/ */
public final Comparator<Path> LOG_NAME_COMPARATOR = new Comparator<Path>() { final Comparator<Path> LOG_NAME_COMPARATOR = new Comparator<Path>() {
@Override @Override
public int compare(Path o1, Path o2) { public int compare(Path o1, Path o2) {
long t1 = getFileNumFromFileName(o1); long t1 = getFileNumFromFileName(o1);
@ -448,7 +458,7 @@ public class FSHLog implements WAL {
prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8"); prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8");
// we only correctly differentiate suffices when numeric ones start with '.' // we only correctly differentiate suffices when numeric ones start with '.'
if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) { if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) {
throw new IllegalArgumentException("wal suffix must start with '" + WAL_FILE_NAME_DELIMITER + throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER +
"' but instead was '" + suffix + "'"); "' but instead was '" + suffix + "'");
} }
// Now that it exists, set the storage policy for the entire directory of wal files related to // Now that it exists, set the storage policy for the entire directory of wal files related to
@ -572,7 +582,9 @@ public class FSHLog implements WAL {
*/ */
@VisibleForTesting @VisibleForTesting
OutputStream getOutputStream() { OutputStream getOutputStream() {
return this.hdfs_out.getWrappedStream(); FSDataOutputStream fsdos = this.hdfs_out;
if (fsdos == null) return null;
return fsdos.getWrappedStream();
} }
@Override @Override
@ -757,6 +769,19 @@ public class FSHLog implements WAL {
return regions; return regions;
} }
/**
* Used to manufacture race condition reliably. For testing only.
* @see #beforeWaitOnSafePoint()
*/
@VisibleForTesting
protected void afterCreatingZigZagLatch() {}
/**
* @see #afterCreatingZigZagLatch()
*/
@VisibleForTesting
protected void beforeWaitOnSafePoint() {};
/** /**
* Cleans up current writer closing it and then puts in place the passed in * Cleans up current writer closing it and then puts in place the passed in
* <code>nextWriter</code>. * <code>nextWriter</code>.
@ -786,6 +811,7 @@ public class FSHLog implements WAL {
SyncFuture syncFuture = null; SyncFuture syncFuture = null;
SafePointZigZagLatch zigzagLatch = (this.ringBufferEventHandler == null)? SafePointZigZagLatch zigzagLatch = (this.ringBufferEventHandler == null)?
null: this.ringBufferEventHandler.attainSafePoint(); null: this.ringBufferEventHandler.attainSafePoint();
afterCreatingZigZagLatch();
TraceScope scope = Trace.startSpan("FSHFile.replaceWriter"); TraceScope scope = Trace.startSpan("FSHFile.replaceWriter");
try { try {
// Wait on the safe point to be achieved. Send in a sync in case nothing has hit the // Wait on the safe point to be achieved. Send in a sync in case nothing has hit the
@ -799,9 +825,10 @@ public class FSHLog implements WAL {
syncFuture = zigzagLatch.waitSafePoint(publishSyncOnRingBuffer()); syncFuture = zigzagLatch.waitSafePoint(publishSyncOnRingBuffer());
} }
} catch (FailedSyncBeforeLogCloseException e) { } catch (FailedSyncBeforeLogCloseException e) {
// If unflushed/unsynced entries on close, it is reason to abort.
if (isUnflushedEntries()) throw e; if (isUnflushedEntries()) throw e;
// Else, let is pass through to the close. // Else, let is pass through to the close.
LOG.warn("Failed last sync but no outstanding unsync edits so falling through to close; " + LOG.warn("Failed sync but no outstanding unsync'd edits so falling through to close; " +
e.getMessage()); e.getMessage());
} }
@ -907,7 +934,7 @@ public class FSHLog implements WAL {
*/ */
protected Path computeFilename(final long filenum) { protected Path computeFilename(final long filenum) {
if (filenum < 0) { if (filenum < 0) {
throw new RuntimeException("wal file number can't be < 0"); throw new RuntimeException("WAL file number can't be < 0");
} }
String child = logFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + logFileSuffix; String child = logFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + logFileSuffix;
return new Path(fullPathLogDir, child); return new Path(fullPathLogDir, child);
@ -939,7 +966,7 @@ public class FSHLog implements WAL {
if (fileName == null) throw new IllegalArgumentException("file name can't be null"); if (fileName == null) throw new IllegalArgumentException("file name can't be null");
if (!ourFiles.accept(fileName)) { if (!ourFiles.accept(fileName)) {
throw new IllegalArgumentException("The log file " + fileName + throw new IllegalArgumentException("The log file " + fileName +
" doesn't belong to this wal. (" + toString() + ")"); " doesn't belong to this WAL. (" + toString() + ")");
} }
final String fileNameString = fileName.toString(); final String fileNameString = fileName.toString();
String chompedPath = fileNameString.substring(prefixPathStr.length(), String chompedPath = fileNameString.substring(prefixPathStr.length(),
@ -1030,6 +1057,7 @@ public class FSHLog implements WAL {
* @param clusterIds that have consumed the change * @param clusterIds that have consumed the change
* @return New log key. * @return New log key.
*/ */
@SuppressWarnings("deprecation")
protected WALKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum, protected WALKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum,
long now, List<UUID> clusterIds, long nonceGroup, long nonce) { long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
// we use HLogKey here instead of WALKey directly to support legacy coprocessors. // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
@ -1082,6 +1110,7 @@ public class FSHLog implements WAL {
*/ */
private class SyncRunner extends HasThread { private class SyncRunner extends HasThread {
private volatile long sequence; private volatile long sequence;
// Keep around last exception thrown. Clear on successful sync.
private final BlockingQueue<SyncFuture> syncFutures; private final BlockingQueue<SyncFuture> syncFutures;
/** /**
@ -1200,28 +1229,27 @@ public class FSHLog implements WAL {
// while we run. // while we run.
TraceScope scope = Trace.continueSpan(takeSyncFuture.getSpan()); TraceScope scope = Trace.continueSpan(takeSyncFuture.getSpan());
long start = System.nanoTime(); long start = System.nanoTime();
Throwable t = null; Throwable lastException = null;
try { try {
Trace.addTimelineAnnotation("syncing writer"); Trace.addTimelineAnnotation("syncing writer");
writer.sync(); writer.sync();
Trace.addTimelineAnnotation("writer synced"); Trace.addTimelineAnnotation("writer synced");
currentSequence = updateHighestSyncedSequence(currentSequence); currentSequence = updateHighestSyncedSequence(currentSequence);
} catch (IOException e) { } catch (IOException e) {
LOG.error("Error syncing, request close of wal ", e); LOG.error("Error syncing, request close of WAL", e);
t = e; lastException = e;
} catch (Exception e) { } catch (Exception e) {
LOG.warn("UNEXPECTED", e); LOG.warn("UNEXPECTED", e);
t = e; lastException = e;
} finally { } finally {
// reattach the span to the future before releasing. // reattach the span to the future before releasing.
takeSyncFuture.setSpan(scope.detach()); takeSyncFuture.setSpan(scope.detach());
// First release what we 'took' from the queue. // First release what we 'took' from the queue.
syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, t); syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, lastException);
// Can we release other syncs? // Can we release other syncs?
syncCount += releaseSyncFutures(currentSequence, t); syncCount += releaseSyncFutures(currentSequence, lastException);
if (t != null) { if (lastException != null) requestLogRoll();
requestLogRoll(); else checkLogRoll();
} else checkLogRoll();
} }
postSync(System.nanoTime() - start, syncCount); postSync(System.nanoTime() - start, syncCount);
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -1270,7 +1298,7 @@ public class FSHLog implements WAL {
LOG.warn("HDFS pipeline error detected. " + "Found " LOG.warn("HDFS pipeline error detected. " + "Found "
+ numCurrentReplicas + " replicas but expecting no less than " + numCurrentReplicas + " replicas but expecting no less than "
+ this.minTolerableReplication + " replicas. " + this.minTolerableReplication + " replicas. "
+ " Requesting close of wal. current pipeline: " + " Requesting close of WAL. current pipeline: "
+ Arrays.toString(getPipeLine())); + Arrays.toString(getPipeLine()));
logRollNeeded = true; logRollNeeded = true;
// If rollWriter is requested, increase consecutiveLogRolls. Once it // If rollWriter is requested, increase consecutiveLogRolls. Once it
@ -1676,6 +1704,11 @@ public class FSHLog implements WAL {
// syncFutures to the next sync'ing thread. // syncFutures to the next sync'ing thread.
private volatile int syncFuturesCount = 0; private volatile int syncFuturesCount = 0;
private volatile SafePointZigZagLatch zigzagLatch; private volatile SafePointZigZagLatch zigzagLatch;
/**
* Set if we get an exception appending or syncing so that all subsequence appends and syncs
* on this WAL fail until WAL is replaced.
*/
private Exception exception = null;
/** /**
* Object to block on while waiting on safe point. * Object to block on while waiting on safe point.
*/ */
@ -1696,17 +1729,30 @@ public class FSHLog implements WAL {
} }
private void cleanupOutstandingSyncsOnException(final long sequence, final Exception e) { private void cleanupOutstandingSyncsOnException(final long sequence, final Exception e) {
// There could be handler-count syncFutures outstanding.
for (int i = 0; i < this.syncFuturesCount; i++) this.syncFutures[i].done(sequence, e); for (int i = 0; i < this.syncFuturesCount; i++) this.syncFutures[i].done(sequence, e);
this.syncFuturesCount = 0; this.syncFuturesCount = 0;
} }
/**
* @return True if outstanding sync futures still
*/
private boolean isOutstandingSyncs() {
for (int i = 0; i < this.syncFuturesCount; i++) {
if (!this.syncFutures[i].isDone()) return true;
}
return false;
}
@Override @Override
// We can set endOfBatch in the below method if at end of our this.syncFutures array // We can set endOfBatch in the below method if at end of our this.syncFutures array
public void onEvent(final RingBufferTruck truck, final long sequence, boolean endOfBatch) public void onEvent(final RingBufferTruck truck, final long sequence, boolean endOfBatch)
throws Exception { throws Exception {
// Appends and syncs are coming in order off the ringbuffer. We depend on this fact. We'll // Appends and syncs are coming in order off the ringbuffer. We depend on this fact. We'll
// add appends to dfsclient as they come in. Batching appends doesn't give any significant // add appends to dfsclient as they come in. Batching appends doesn't give any significant
// benefit on measurement. Handler sync calls we will batch up. // benefit on measurement. Handler sync calls we will batch up. If we get an exception
// appending an edit, we fail all subsequent appends and syncs with the same exception until
// the WAL is reset.
try { try {
if (truck.hasSyncFuturePayload()) { if (truck.hasSyncFuturePayload()) {
@ -1716,12 +1762,17 @@ public class FSHLog implements WAL {
} else if (truck.hasFSWALEntryPayload()) { } else if (truck.hasFSWALEntryPayload()) {
TraceScope scope = Trace.continueSpan(truck.unloadSpanPayload()); TraceScope scope = Trace.continueSpan(truck.unloadSpanPayload());
try { try {
append(truck.unloadFSWALEntryPayload()); FSWALEntry entry = truck.unloadFSWALEntryPayload();
// If already an exception, do not try to append. Throw.
if (this.exception != null) throw this.exception;
append(entry);
} catch (Exception e) { } catch (Exception e) {
// Failed append. Record the exception. Throw it from here on out til new WAL in place
this.exception = new DamagedWALException(e);
// If append fails, presume any pending syncs will fail too; let all waiting handlers // If append fails, presume any pending syncs will fail too; let all waiting handlers
// know of the exception. // know of the exception.
cleanupOutstandingSyncsOnException(sequence, e); cleanupOutstandingSyncsOnException(sequence, this.exception);
// Return to keep processing. // Return to keep processing events coming off the ringbuffer
return; return;
} finally { } finally {
assert scope == NullScope.INSTANCE || !scope.isDetached(); assert scope == NullScope.INSTANCE || !scope.isDetached();
@ -1748,13 +1799,20 @@ public class FSHLog implements WAL {
} }
// Below expects that the offer 'transfers' responsibility for the outstanding syncs to the // Below expects that the offer 'transfers' responsibility for the outstanding syncs to the
// syncRunner. We should never get an exception in here. HBASE-11145 was because queue // syncRunner. We should never get an exception in here.
// was sized exactly to the count of user handlers but we could have more if we factor in
// meta handlers doing opens and closes.
int index = Math.abs(this.syncRunnerIndex++) % this.syncRunners.length; int index = Math.abs(this.syncRunnerIndex++) % this.syncRunners.length;
try { try {
this.syncRunners[index].offer(sequence, this.syncFutures, this.syncFuturesCount); if (this.exception != null) {
// Do not try to sync. If a this.exception, then we failed an append. Do not try to
// sync a failed append. Fall through to the attainSafePoint below. It is part of the
// means by which we put in place a new WAL. A new WAL is how we clean up.
// Don't throw because then we'll not get to attainSafePoint.
cleanupOutstandingSyncsOnException(sequence, this.exception);
} else {
this.syncRunners[index].offer(sequence, this.syncFutures, this.syncFuturesCount);
}
} catch (Exception e) { } catch (Exception e) {
// Should NEVER get here.
cleanupOutstandingSyncsOnException(sequence, e); cleanupOutstandingSyncsOnException(sequence, e);
throw e; throw e;
} }
@ -1777,16 +1835,24 @@ public class FSHLog implements WAL {
private void attainSafePoint(final long currentSequence) { private void attainSafePoint(final long currentSequence) {
if (this.zigzagLatch == null || !this.zigzagLatch.isCocked()) return; if (this.zigzagLatch == null || !this.zigzagLatch.isCocked()) return;
// If here, another thread is waiting on us to get to safe point. Don't leave it hanging. // If here, another thread is waiting on us to get to safe point. Don't leave it hanging.
beforeWaitOnSafePoint();
try { try {
// Wait on outstanding syncers; wait for them to finish syncing (unless we've been // Wait on outstanding syncers; wait for them to finish syncing (unless we've been
// shutdown or unless our latch has been thrown because we have been aborted). // shutdown or unless our latch has been thrown because we have been aborted or unless
// this WAL is broken and we can't get a sync/append to complete).
while (!this.shutdown && this.zigzagLatch.isCocked() && while (!this.shutdown && this.zigzagLatch.isCocked() &&
highestSyncedSequence.get() < currentSequence) { highestSyncedSequence.get() < currentSequence &&
// We could be in here and all syncs are failing or failed. Check for this. Otherwise
// we'll just be stuck here for ever. In other words, ensure there syncs running.
isOutstandingSyncs()) {
synchronized (this.safePointWaiter) { synchronized (this.safePointWaiter) {
this.safePointWaiter.wait(0, 1); this.safePointWaiter.wait(0, 1);
} }
} }
// Tell waiting thread we've attained safe point // Tell waiting thread we've attained safe point. Can clear this.throwable if set here
// because we know that next event through the ringbuffer will be going to a new WAL
// after we do the zigzaglatch dance.
this.exception = null;
this.zigzagLatch.safePointAttained(); this.zigzagLatch.safePointAttained();
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.warn("Interrupted ", e); LOG.warn("Interrupted ", e);
@ -1844,7 +1910,7 @@ public class FSHLog implements WAL {
// Update metrics. // Update metrics.
postAppend(entry, EnvironmentEdgeManager.currentTime() - start); postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Could not append. Requesting close of wal", e); LOG.warn("Could not append. Requesting close of WAL", e);
requestLogRoll(); requestLogRoll();
throw e; throw e;
} }

View File

@ -44,7 +44,7 @@ import org.apache.hadoop.hbase.wal.WALKey;
* region sequence id (we want to use this later, just before we write the WAL to ensure region * region sequence id (we want to use this later, just before we write the WAL to ensure region
* edits maintain order). The extra info added here is not 'serialized' as part of the WALEdit * edits maintain order). The extra info added here is not 'serialized' as part of the WALEdit
* hence marked 'transient' to underline this fact. It also adds mechanism so we can wait on * hence marked 'transient' to underline this fact. It also adds mechanism so we can wait on
* the assign of the region sequence id. See {@link #stampRegionSequenceId()}. * the assign of the region sequence id. See #stampRegionSequenceId().
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
class FSWALEntry extends Entry { class FSWALEntry extends Entry {

View File

@ -126,6 +126,9 @@ public class HLogKey extends WALKey implements Writable {
super(encodedRegionName, tablename, logSeqNum, nonceGroup, nonce); super(encodedRegionName, tablename, logSeqNum, nonceGroup, nonce);
} }
/**
* @deprecated Don't use these Writables methods. Use PB instead.
*/
@Override @Override
@Deprecated @Deprecated
public void write(DataOutput out) throws IOException { public void write(DataOutput out) throws IOException {
@ -204,6 +207,7 @@ public class HLogKey extends WALKey implements Writable {
in.readByte(); in.readByte();
} catch(EOFException e) { } catch(EOFException e) {
// Means it's a very old key, just continue // Means it's a very old key, just continue
if (LOG.isTraceEnabled()) LOG.trace(e);
} }
} }
try { try {

View File

@ -53,11 +53,12 @@ import com.google.protobuf.CodedInputStream;
* &lt;TrailerSize&gt; &lt;PB_WAL_COMPLETE_MAGIC&gt; * &lt;TrailerSize&gt; &lt;PB_WAL_COMPLETE_MAGIC&gt;
* </p> * </p>
* The Reader reads meta information (WAL Compression state, WALTrailer, etc) in * The Reader reads meta information (WAL Compression state, WALTrailer, etc) in
* {@link ProtobufLogReader#initReader(FSDataInputStream)}. A WALTrailer is an extensible structure * ProtobufLogReader#initReader(FSDataInputStream). A WALTrailer is an extensible structure
* which is appended at the end of the WAL. This is empty for now; it can contain some meta * which is appended at the end of the WAL. This is empty for now; it can contain some meta
* information such as Region level stats, etc in future. * information such as Region level stats, etc in future.
*/ */
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX, HBaseInterfaceAudience.CONFIG}) @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX,
HBaseInterfaceAudience.CONFIG})
public class ProtobufLogReader extends ReaderBase { public class ProtobufLogReader extends ReaderBase {
private static final Log LOG = LogFactory.getLog(ProtobufLogReader.class); private static final Log LOG = LogFactory.getLog(ProtobufLogReader.class);
// public for WALFactory until we move everything to o.a.h.h.wal // public for WALFactory until we move everything to o.a.h.h.wal
@ -78,8 +79,8 @@ public class ProtobufLogReader extends ReaderBase {
protected WALCellCodec.ByteStringUncompressor byteStringUncompressor; protected WALCellCodec.ByteStringUncompressor byteStringUncompressor;
protected boolean hasCompression = false; protected boolean hasCompression = false;
protected boolean hasTagCompression = false; protected boolean hasTagCompression = false;
// walEditsStopOffset is the position of the last byte to read. After reading the last WALEdit entry // walEditsStopOffset is the position of the last byte to read. After reading the last WALEdit
// in the wal, the inputstream's position is equal to walEditsStopOffset. // entry in the wal, the inputstream's position is equal to walEditsStopOffset.
private long walEditsStopOffset; private long walEditsStopOffset;
private boolean trailerPresent; private boolean trailerPresent;
protected WALTrailer trailer; protected WALTrailer trailer;

View File

@ -159,7 +159,7 @@ public class ProtobufLogWriter extends WriterBase {
output.write(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC); output.write(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC);
this.trailerWritten = true; this.trailerWritten = true;
} catch (IOException ioe) { } catch (IOException ioe) {
LOG.error("Got IOException while writing trailer", ioe); LOG.warn("Failed to write trailer, non-fatal, continuing...", ioe);
} }
} }

View File

@ -42,9 +42,9 @@ import org.apache.htrace.Span;
* SyncFutures as done; i.e. we batch up sync calls rather than do a dfs sync * SyncFutures as done; i.e. we batch up sync calls rather than do a dfs sync
* call every time a Handler asks for it. * call every time a Handler asks for it.
* <p> * <p>
* SyncFutures are immutable but recycled. Call {@link #reset(long, Span)} before use even * SyncFutures are immutable but recycled. Call #reset(long, Span) before use even
* if it the first time, start the sync, then park the 'hitched' thread on a call to * if it the first time, start the sync, then park the 'hitched' thread on a call to
* {@link #get()} * #get().
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
class SyncFuture { class SyncFuture {

View File

@ -32,11 +32,11 @@ import java.util.UUID;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
@ -50,6 +50,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
// imports for things that haven't moved from regionserver.wal yet. // imports for things that haven't moved from regionserver.wal yet.
import org.apache.hadoop.hbase.regionserver.wal.CompressionContext; import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
@ -296,7 +298,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
} }
/** /**
* Wait for sequence number is assigned &amp; return the assigned value * Wait for sequence number to be assigned &amp; return the assigned value
* @return long the new assigned sequence number * @return long the new assigned sequence number
* @throws IOException * @throws IOException
*/ */
@ -306,19 +308,21 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
} }
/** /**
* Wait for sequence number is assigned &amp; return the assigned value * Wait for sequence number to be assigned &amp; return the assigned value.
* @param maxWaitForSeqId maximum duration, in milliseconds, to wait for seq number to be assigned * @param maxWaitForSeqId maximum time to wait in milliseconds for sequenceid
* @return long the new assigned sequence number * @return long the new assigned sequence number
* @throws IOException * @throws IOException
*/ */
public long getSequenceId(int maxWaitForSeqId) throws IOException { public long getSequenceId(final long maxWaitForSeqId) throws IOException {
// TODO: This implementation waiting on a latch is problematic because if a higher level
// determines we should stop or abort, there is not global list of all these blocked WALKeys
// waiting on a sequence id; they can't be cancelled... interrupted. See getNextSequenceId
try { try {
if (maxWaitForSeqId < 0) { if (maxWaitForSeqId < 0) {
this.seqNumAssignedLatch.await(); this.seqNumAssignedLatch.await();
} else { } else if (!this.seqNumAssignedLatch.await(maxWaitForSeqId, TimeUnit.MILLISECONDS)) {
if (!this.seqNumAssignedLatch.await(maxWaitForSeqId, TimeUnit.MILLISECONDS)) { throw new TimeoutIOException("Failed to get sequenceid after " + maxWaitForSeqId +
throw new IOException("Timed out waiting for seq number to be assigned"); "ms; WAL system stuck or has gone away?");
}
} }
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
LOG.warn("Thread interrupted waiting for next log sequence number"); LOG.warn("Thread interrupted waiting for next log sequence number");

View File

@ -31,7 +31,6 @@ import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem; import org.apache.hadoop.fs.FilterFileSystem;
@ -42,18 +41,17 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assume; import org.junit.Assume;
import org.junit.Test; import org.junit.Test;
@ -73,7 +71,7 @@ public class TestFSErrorsExposed {
* Injects errors into the pread calls of an on-disk file, and makes * Injects errors into the pread calls of an on-disk file, and makes
* sure those bubble up to the HFile scanner * sure those bubble up to the HFile scanner
*/ */
@Test // @Test
public void testHFileScannerThrowsErrors() throws IOException { public void testHFileScannerThrowsErrors() throws IOException {
Path hfilePath = new Path(new Path( Path hfilePath = new Path(new Path(
util.getDataTestDir("internalScannerExposesErrors"), util.getDataTestDir("internalScannerExposesErrors"),
@ -123,7 +121,7 @@ public class TestFSErrorsExposed {
* Injects errors into the pread calls of an on-disk file, and makes * Injects errors into the pread calls of an on-disk file, and makes
* sure those bubble up to the StoreFileScanner * sure those bubble up to the StoreFileScanner
*/ */
@Test // @Test
public void testStoreFileScannerThrowsErrors() throws IOException { public void testStoreFileScannerThrowsErrors() throws IOException {
Path hfilePath = new Path(new Path( Path hfilePath = new Path(new Path(
util.getDataTestDir("internalScannerExposesErrors"), util.getDataTestDir("internalScannerExposesErrors"),
@ -222,7 +220,8 @@ public class TestFSErrorsExposed {
util.getDFSCluster().restartDataNodes(); util.getDFSCluster().restartDataNodes();
} finally { } finally {
util.getMiniHBaseCluster().killAll(); MiniHBaseCluster cluster = util.getMiniHBaseCluster();
if (cluster != null) cluster.killAll();
util.shutdownMiniCluster(); util.shutdownMiniCluster();
} }
} }

View File

@ -0,0 +1,254 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.exceptions.verification.WantedButNotInvoked;
/**
* Testing sync/append failures.
* Copied from TestHRegion.
*/
@Category({MediumTests.class})
public class TestFailedAppendAndSync {
private static final Log LOG = LogFactory.getLog(TestFailedAppendAndSync.class);
@Rule public TestName name = new TestName();
private static final String COLUMN_FAMILY = "MyCF";
private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY);
HRegion region = null;
// Do not run unit tests in parallel (? Why not? It don't work? Why not? St.Ack)
private static HBaseTestingUtility TEST_UTIL;
public static Configuration CONF ;
private String dir;
// Test names
protected TableName tableName;
@Before
public void setup() throws IOException {
TEST_UTIL = HBaseTestingUtility.createLocalHTU();
CONF = TEST_UTIL.getConfiguration();
dir = TEST_UTIL.getDataTestDir("TestHRegion").toString();
tableName = TableName.valueOf(name.getMethodName());
}
@After
public void tearDown() throws Exception {
EnvironmentEdgeManagerTestHelper.reset();
LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
TEST_UTIL.cleanupTestDir();
}
String getName() {
return name.getMethodName();
}
/**
* Reproduce locking up that happens when we get an exceptions appending and syncing.
* See HBASE-14317.
* First I need to set up some mocks for Server and RegionServerServices. I also need to
* set up a dodgy WAL that will throw an exception when we go to append to it.
*/
@Test (timeout=300000)
public void testLockupAroundBadAssignSync() throws IOException {
// Dodgy WAL. Will throw exceptions when flags set.
class DodgyFSLog extends FSHLog {
volatile boolean throwSyncException = false;
volatile boolean throwAppendException = false;
public DodgyFSLog(FileSystem fs, Path root, String logDir, Configuration conf)
throws IOException {
super(fs, root, logDir, conf);
}
@Override
protected Writer createWriterInstance(Path path) throws IOException {
final Writer w = super.createWriterInstance(path);
return new Writer() {
@Override
public void close() throws IOException {
w.close();
}
@Override
public void sync() throws IOException {
if (throwSyncException) {
throw new IOException("FAKE! Failed to replace a bad datanode...");
}
w.sync();
}
@Override
public void append(Entry entry) throws IOException {
if (throwAppendException) {
throw new IOException("FAKE! Failed to replace a bad datanode...");
}
w.append(entry);
}
@Override
public long getLength() throws IOException {
return w.getLength();
}
};
}
}
// Make up mocked server and services.
Server server = mock(Server.class);
when(server.getConfiguration()).thenReturn(CONF);
when(server.isStopped()).thenReturn(false);
when(server.isAborted()).thenReturn(false);
RegionServerServices services = mock(RegionServerServices.class);
// OK. Now I have my mocked up Server and RegionServerServices and my dodgy WAL, go ahead with
// the test.
FileSystem fs = FileSystem.get(CONF);
Path rootDir = new Path(dir + getName());
DodgyFSLog dodgyWAL = new DodgyFSLog(fs, rootDir, getName(), CONF);
LogRoller logRoller = new LogRoller(server, services);
logRoller.addWAL(dodgyWAL);
logRoller.start();
boolean threwOnSync = false;
boolean threwOnAppend = false;
boolean threwOnBoth = false;
HRegion region = initHRegion(tableName, null, null, dodgyWAL);
try {
// Get some random bytes.
byte[] value = Bytes.toBytes(getName());
try {
// First get something into memstore
Put put = new Put(value);
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), value);
region.put(put);
} catch (IOException ioe) {
fail();
}
try {
dodgyWAL.throwAppendException = true;
dodgyWAL.throwSyncException = false;
Put put = new Put(value);
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("3"), value);
region.put(put);
} catch (IOException ioe) {
threwOnAppend = true;
}
// When we get to here.. we should be ok. A new WAL has been put in place. There were no
// appends to sync. We should be able to continue.
try {
dodgyWAL.throwAppendException = true;
dodgyWAL.throwSyncException = true;
Put put = new Put(value);
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("4"), value);
region.put(put);
} catch (IOException ioe) {
threwOnBoth = true;
}
// Again, all should be good. New WAL and no outstanding unsync'd edits so we should be able
// to just continue.
// So, should be no abort at this stage. Verify.
Mockito.verify(server, Mockito.atLeast(0)).
abort(Mockito.anyString(), (Throwable)Mockito.anyObject());
try {
dodgyWAL.throwAppendException = false;
dodgyWAL.throwSyncException = true;
Put put = new Put(value);
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("2"), value);
region.put(put);
} catch (IOException ioe) {
threwOnSync = true;
}
// An append in the WAL but the sync failed is a server abort condition. That is our
// current semantic. Verify. It takes a while for abort to be called. Just hang here till it
// happens. If it don't we'll timeout the whole test. That is fine.
while (true) {
try {
Mockito.verify(server, Mockito.atLeast(1)).
abort(Mockito.anyString(), (Throwable)Mockito.anyObject());
break;
} catch (WantedButNotInvoked t) {
Threads.sleep(1);
}
}
} finally {
// To stop logRoller, its server has to say it is stopped.
Mockito.when(server.isStopped()).thenReturn(true);
if (logRoller != null) logRoller.interrupt();
if (region != null) {
try {
region.close(true);
} catch (DroppedSnapshotException e) {
LOG.info("On way out; expected!", e);
}
}
if (dodgyWAL != null) dodgyWAL.close();
assertTrue("The regionserver should have thrown an exception", threwOnBoth);
assertTrue("The regionserver should have thrown an exception", threwOnAppend);
assertTrue("The regionserver should have thrown an exception", threwOnSync);
}
}
/**
* @return A region on which you must call
* {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
*/
public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal)
throws IOException {
return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, false, Durability.SYNC_WAL,
wal, COLUMN_FAMILY_BYTES);
}
}

View File

@ -33,7 +33,9 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.*; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
@ -128,7 +130,13 @@ import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.regionserver.Region.RowLock; import org.apache.hadoop.hbase.regionserver.Region.RowLock;
import org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem; import org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem;
import org.apache.hadoop.hbase.regionserver.handler.FinishRegionRecoveringHandler; import org.apache.hadoop.hbase.regionserver.handler.FinishRegionRecoveringHandler;
import org.apache.hadoop.hbase.regionserver.wal.*; import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.test.MetricsAssertHelper; import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
@ -147,6 +155,7 @@ import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hbase.wal.WALSplitter;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
@ -285,6 +294,8 @@ public class TestHRegion {
HBaseTestingUtility.closeRegionAndWAL(region); HBaseTestingUtility.closeRegionAndWAL(region);
} }
/* /*
* This test is for verifying memstore snapshot size is correctly updated in case of rollback * This test is for verifying memstore snapshot size is correctly updated in case of rollback
* See HBASE-10845 * See HBASE-10845
@ -381,7 +392,8 @@ public class TestHRegion {
// save normalCPHost and replaced by mockedCPHost, which will cancel flush requests // save normalCPHost and replaced by mockedCPHost, which will cancel flush requests
RegionCoprocessorHost normalCPHost = region.getCoprocessorHost(); RegionCoprocessorHost normalCPHost = region.getCoprocessorHost();
RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class); RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
when(mockedCPHost.preFlush(isA(HStore.class), isA(InternalScanner.class))).thenReturn(null); when(mockedCPHost.preFlush(Mockito.isA(HStore.class), Mockito.isA(InternalScanner.class))).
thenReturn(null);
region.setCoprocessorHost(mockedCPHost); region.setCoprocessorHost(mockedCPHost);
region.put(put); region.put(put);
region.flush(true); region.flush(true);
@ -950,7 +962,7 @@ public class TestHRegion {
// now verify that the flush markers are written // now verify that the flush markers are written
wal.shutdown(); wal.shutdown();
WAL.Reader reader = wals.createReader(fs, DefaultWALProvider.getCurrentFileName(wal), WAL.Reader reader = WALFactory.createReader(fs, DefaultWALProvider.getCurrentFileName(wal),
TEST_UTIL.getConfiguration()); TEST_UTIL.getConfiguration());
try { try {
List<WAL.Entry> flushDescriptors = new ArrayList<WAL.Entry>(); List<WAL.Entry> flushDescriptors = new ArrayList<WAL.Entry>();
@ -1066,8 +1078,7 @@ public class TestHRegion {
} }
} }
@Test @Test (timeout=60000)
@SuppressWarnings("unchecked")
public void testFlushMarkersWALFail() throws Exception { public void testFlushMarkersWALFail() throws Exception {
// test the cases where the WAL append for flush markers fail. // test the cases where the WAL append for flush markers fail.
String method = name.getMethodName(); String method = name.getMethodName();
@ -1079,9 +1090,55 @@ public class TestHRegion {
final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration()); final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration());
FSUtils.setRootDir(walConf, logDir); FSUtils.setRootDir(walConf, logDir);
final WALFactory wals = new WALFactory(walConf, null, method); // Make up a WAL that we can manipulate at append time.
WAL wal = spy(wals.getWAL(tableName.getName())); class FailAppendFlushMarkerWAL extends FSHLog {
volatile FlushAction [] flushActions = null;
public FailAppendFlushMarkerWAL(FileSystem fs, Path root, String logDir, Configuration conf)
throws IOException {
super(fs, root, logDir, conf);
}
@Override
protected Writer createWriterInstance(Path path) throws IOException {
final Writer w = super.createWriterInstance(path);
return new Writer() {
@Override
public void close() throws IOException {
w.close();
}
@Override
public void sync() throws IOException {
w.sync();
}
@Override
public void append(Entry entry) throws IOException {
List<Cell> cells = entry.getEdit().getCells();
if (WALEdit.isMetaEditFamily(cells.get(0))) {
FlushDescriptor desc = WALEdit.getFlushDescriptor(cells.get(0));
if (desc != null) {
for (FlushAction flushAction: flushActions) {
if (desc.getAction().equals(flushAction)) {
throw new IOException("Failed to append flush marker! " + flushAction);
}
}
}
}
w.append(entry);
}
@Override
public long getLength() throws IOException {
return w.getLength();
}
};
}
}
FailAppendFlushMarkerWAL wal =
new FailAppendFlushMarkerWAL(FileSystem.get(walConf), FSUtils.getRootDir(walConf),
getName(), walConf);
this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW, this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
HConstants.EMPTY_END_ROW, method, CONF, false, Durability.USE_DEFAULT, wal, family); HConstants.EMPTY_END_ROW, method, CONF, false, Durability.USE_DEFAULT, wal, family);
try { try {
@ -1092,13 +1149,7 @@ public class TestHRegion {
region.put(put); region.put(put);
// 1. Test case where START_FLUSH throws exception // 1. Test case where START_FLUSH throws exception
IsFlushWALMarker isFlushWALMarker = new IsFlushWALMarker(FlushAction.START_FLUSH); wal.flushActions = new FlushAction [] {FlushAction.START_FLUSH};
// throw exceptions if the WalEdit is a start flush action
when(wal.append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any(),
(WALEdit)argThat(isFlushWALMarker), (AtomicLong)any(), Mockito.anyBoolean(),
(List<Cell>)any()))
.thenThrow(new IOException("Fail to append flush marker"));
// start cache flush will throw exception // start cache flush will throw exception
try { try {
@ -1110,9 +1161,13 @@ public class TestHRegion {
} catch (IOException expected) { } catch (IOException expected) {
// expected // expected
} }
// The WAL is hosed. It has failed an append and a sync. It has an exception stuck in it
// which it will keep returning until we roll the WAL to prevent any further appends going
// in or syncs succeeding on top of failed appends, a no-no.
wal.rollWriter(true);
// 2. Test case where START_FLUSH succeeds but COMMIT_FLUSH will throw exception // 2. Test case where START_FLUSH succeeds but COMMIT_FLUSH will throw exception
isFlushWALMarker.set(FlushAction.COMMIT_FLUSH); wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH};
try { try {
region.flush(true); region.flush(true);
@ -1125,6 +1180,8 @@ public class TestHRegion {
} }
region.close(); region.close();
// Roll WAL to clean out any exceptions stuck in it. See note above where we roll WAL.
wal.rollWriter(true);
this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW, this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
HConstants.EMPTY_END_ROW, method, CONF, false, Durability.USE_DEFAULT, wal, family); HConstants.EMPTY_END_ROW, method, CONF, false, Durability.USE_DEFAULT, wal, family);
region.put(put); region.put(put);
@ -1132,7 +1189,7 @@ public class TestHRegion {
// 3. Test case where ABORT_FLUSH will throw exception. // 3. Test case where ABORT_FLUSH will throw exception.
// Even if ABORT_FLUSH throws exception, we should not fail with IOE, but continue with // Even if ABORT_FLUSH throws exception, we should not fail with IOE, but continue with
// DroppedSnapshotException. Below COMMMIT_FLUSH will cause flush to abort // DroppedSnapshotException. Below COMMMIT_FLUSH will cause flush to abort
isFlushWALMarker.set(FlushAction.COMMIT_FLUSH, FlushAction.ABORT_FLUSH); wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH, FlushAction.ABORT_FLUSH};
try { try {
region.flush(true); region.flush(true);
@ -5736,7 +5793,6 @@ public class TestHRegion {
putData(startRow, numRows, qualifier, families); putData(startRow, numRows, qualifier, families);
int splitRow = startRow + numRows; int splitRow = startRow + numRows;
putData(splitRow, numRows, qualifier, families); putData(splitRow, numRows, qualifier, families);
int endRow = splitRow + numRows;
region.flush(true); region.flush(true);
HRegion [] regions = null; HRegion [] regions = null;

View File

@ -129,7 +129,5 @@ public class TestMultiVersionConcurrencyControl extends TestCase {
for (int i = 0; i < n; ++i) { for (int i = 0; i < n; ++i) {
assertTrue(statuses[i].get()); assertTrue(statuses[i].get());
} }
} }
}
}

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Very basic tests.
* @see TestMultiVersionConcurrencyControl for more.
*/
@Category({RegionServerTests.class, SmallTests.class})
public class TestMultiVersionConcurrencyControlBasic {
@Test
public void testSimpleMvccOps() {
MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
long readPoint = mvcc.memstoreReadPoint();
MultiVersionConcurrencyControl.WriteEntry writeEntry = mvcc.beginMemstoreInsert();
mvcc.completeMemstoreInsert(writeEntry);
long readPoint2 = mvcc.memstoreReadPoint();
assertEquals(readPoint, readPoint2);
long seqid = 238;
writeEntry = mvcc.beginMemstoreInsertWithSeqNum(seqid);
mvcc.completeMemstoreInsert(writeEntry);
assertEquals(seqid, mvcc.memstoreReadPoint());
writeEntry = mvcc.beginMemstoreInsertWithSeqNum(seqid + 1);
assertTrue(mvcc.advanceMemstore(writeEntry));
assertEquals(seqid + 1, mvcc.memstoreReadPoint());
}
@Test
public void testCancel() {
long seqid = 238;
MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
MultiVersionConcurrencyControl.WriteEntry writeEntry =
mvcc.beginMemstoreInsertWithSeqNum(seqid);
assertTrue(mvcc.advanceMemstore(writeEntry));
assertEquals(seqid, mvcc.memstoreReadPoint());
writeEntry = mvcc.beginMemstoreInsertWithSeqNum(seqid + 1);
mvcc.cancelMemstoreInsert(writeEntry);
assertEquals(seqid, mvcc.memstoreReadPoint());
}
}

View File

@ -0,0 +1,289 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.exceptions.verification.WantedButNotInvoked;
/**
* Testing for lock up of WAL subsystem.
* Copied from TestHRegion.
*/
@Category({MediumTests.class})
public class TestWALLockup {
private static final Log LOG = LogFactory.getLog(TestWALLockup.class);
@Rule public TestName name = new TestName();
private static final String COLUMN_FAMILY = "MyCF";
private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY);
HRegion region = null;
// Do not run unit tests in parallel (? Why not? It don't work? Why not? St.Ack)
private static HBaseTestingUtility TEST_UTIL;
private static Configuration CONF ;
private String dir;
// Test names
protected TableName tableName;
@Before
public void setup() throws IOException {
TEST_UTIL = HBaseTestingUtility.createLocalHTU();
CONF = TEST_UTIL.getConfiguration();
dir = TEST_UTIL.getDataTestDir("TestHRegion").toString();
tableName = TableName.valueOf(name.getMethodName());
}
@After
public void tearDown() throws Exception {
EnvironmentEdgeManagerTestHelper.reset();
LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
TEST_UTIL.cleanupTestDir();
}
String getName() {
return name.getMethodName();
}
/**
* Reproduce locking up that happens when we get an inopportune sync during setup for
* zigzaglatch wait. See HBASE-14317. If below is broken, we will see this test timeout because
* it is locked up.
* <p>First I need to set up some mocks for Server and RegionServerServices. I also need to
* set up a dodgy WAL that will throw an exception when we go to append to it.
*/
@Test (timeout=30000)
public void testLockupWhenSyncInMiddleOfZigZagSetup() throws IOException {
// A WAL that we can have throw exceptions when a flag is set.
class DodgyFSLog extends FSHLog {
// Set this when want the WAL to start throwing exceptions.
volatile boolean throwException = false;
// Latch to hold up processing until after another operation has had time to run.
CountDownLatch latch = new CountDownLatch(1);
public DodgyFSLog(FileSystem fs, Path root, String logDir, Configuration conf)
throws IOException {
super(fs, root, logDir, conf);
}
@Override
protected void afterCreatingZigZagLatch() {
// If throwException set, then append will throw an exception causing the WAL to be
// rolled. We'll come in here. Hold up processing until a sync can get in before
// the zigzag has time to complete its setup and get its own sync in. This is what causes
// the lock up we've seen in production.
if (throwException) {
try {
LOG.info("LATCHED");
this.latch.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
@Override
protected void beforeWaitOnSafePoint() {
if (throwException) {
LOG.info("COUNTDOWN");
// Don't countdown latch until someone waiting on it.
while (this.latch.getCount() <= 0) {
Threads.sleep(10);
}
this.latch.countDown();
}
}
@Override
protected Writer createWriterInstance(Path path) throws IOException {
final Writer w = super.createWriterInstance(path);
return new Writer() {
@Override
public void close() throws IOException {
w.close();
}
@Override
public void sync() throws IOException {
if (throwException) {
throw new IOException("FAKE! Failed to replace a bad datanode...SYNC");
}
w.sync();
}
@Override
public void append(Entry entry) throws IOException {
if (throwException) {
throw new IOException("FAKE! Failed to replace a bad datanode...APPEND");
}
w.append(entry);
}
@Override
public long getLength() throws IOException {
return w.getLength();
}
};
}
}
// Mocked up server and regionserver services. Needed below.
Server server = Mockito.mock(Server.class);
Mockito.when(server.getConfiguration()).thenReturn(CONF);
Mockito.when(server.isStopped()).thenReturn(false);
Mockito.when(server.isAborted()).thenReturn(false);
RegionServerServices services = Mockito.mock(RegionServerServices.class);
// OK. Now I have my mocked up Server & RegionServerServices and dodgy WAL, go ahead with test.
FileSystem fs = FileSystem.get(CONF);
Path rootDir = new Path(dir + getName());
DodgyFSLog dodgyWAL = new DodgyFSLog(fs, rootDir, getName(), CONF);
Path originalWAL = dodgyWAL.getCurrentFileName();
// I need a log roller running.
LogRoller logRoller = new LogRoller(server, services);
logRoller.addWAL(dodgyWAL);
// There is no 'stop' once a logRoller is running.. it just dies.
logRoller.start();
// Now get a region and start adding in edits.
HTableDescriptor htd = new HTableDescriptor(TableName.META_TABLE_NAME);
final HRegion region = initHRegion(tableName, null, null, dodgyWAL);
byte [] bytes = Bytes.toBytes(getName());
try {
// First get something into memstore. Make a Put and then pull the Cell out of it. Will
// manage append and sync carefully in below to manufacture hang. We keep adding same
// edit. WAL subsystem doesn't care.
Put put = new Put(bytes);
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
WALKey key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(), htd.getTableName());
WALEdit edit = new WALEdit();
List<Cell> cells = new ArrayList<Cell>();
for (CellScanner cs = put.cellScanner(); cs.advance();) {
edit.add(cs.current());
cells.add(cs.current());
}
// Put something in memstore and out in the WAL. Do a big number of appends so we push
// out other side of the ringbuffer. If small numbers, stuff doesn't make it to WAL
for (int i = 0; i < 1000; i++) {
dodgyWAL.append(htd, region.getRegionInfo(), key, edit, region.getSequenceId(), true,
cells);
}
// Set it so we start throwing exceptions.
dodgyWAL.throwException = true;
// This append provokes a WAL roll.
dodgyWAL.append(htd, region.getRegionInfo(), key, edit, region.getSequenceId(), true, cells);
boolean exception = false;
Mockito.verify(server, Mockito.atLeast(0)).
abort(Mockito.anyString(), (Throwable)Mockito.anyObject());
try {
dodgyWAL.sync();
} catch (Exception e) {
exception = true;
}
assertTrue("Did not get sync exception", exception);
// An append in the WAL but the sync failed is a server abort condition. That is our
// current semantic. Verify. It takes a while for abort to be called. Just hang here till it
// happens. If it don't we'll timeout the whole test. That is fine.
while (true) {
try {
Mockito.verify(server, Mockito.atLeast(1)).
abort(Mockito.anyString(), (Throwable)Mockito.anyObject());
break;
} catch (WantedButNotInvoked t) {
Threads.sleep(1);
}
}
// Get a memstore flush going too so we have same hung profile as up in the issue over
// in HBASE-14317. Flush hangs trying to get sequenceid because the ringbuffer is held up
// by the zigzaglatch waiting on syncs to come home.
Thread t = new Thread ("flusher") {
public void run() {
try {
region.flush(false);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
};
};
t.setDaemon(true);
t.start();
// Wait till it gets into flushing. It will get stuck on getSequenceId. Then proceed.
while (!region.writestate.flushing) Threads.sleep(1);
// Now assert I got a new WAL file put in place even though loads of errors above.
assertTrue(originalWAL != dodgyWAL.getCurrentFileName());
// Can I append to it?
dodgyWAL.throwException = false;
region.put(put);
} finally {
// To stop logRoller, its server has to say it is stopped.
Mockito.when(server.isStopped()).thenReturn(true);
if (logRoller != null) logRoller.interrupt();
if (region != null) region.close();
if (dodgyWAL != null) dodgyWAL.close();
}
}
/**
* @return A region on which you must call
* {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
*/
public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal)
throws IOException {
return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, false, Durability.SYNC_WAL,
wal, COLUMN_FAMILY_BYTES);
}
}

View File

@ -90,10 +90,6 @@ public class TestLogRolling {
private MiniHBaseCluster cluster; private MiniHBaseCluster cluster;
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
/**
* constructor
* @throws Exception
*/
public TestLogRolling() { public TestLogRolling() {
this.server = null; this.server = null;
this.tableName = null; this.tableName = null;
@ -529,7 +525,16 @@ public class TestLogRolling {
// flush all regions // flush all regions
for (Region r: server.getOnlineRegionsLocalContext()) { for (Region r: server.getOnlineRegionsLocalContext()) {
r.flush(true); try {
r.flush(true);
} catch (Exception e) {
// This try/catch was added by HBASE-14317. It is needed
// because this issue tightened up the semantic such that
// a failed append could not be followed by a successful
// sync. What is coming out here is a failed sync, a sync
// that used to 'pass'.
LOG.info(e);
}
} }
ResultScanner scanner = table.getScanner(new Scan()); ResultScanner scanner = table.getScanner(new Scan());