HBASE-13832 Procedure v2: try to roll the master WAL on sync failure before aborting

Matteo Bertozzi 2015-07-09 08:34:42 -07:00
parent 87ee37ceb8
commit ae1f485ee8
8 changed files with 488 additions and 98 deletions

ProcedureStore.java

@@ -37,6 +37,11 @@ public interface ProcedureStore {
* The main process should register a listener and respond to the store events.
*/
public interface ProcedureStoreListener {
/**
* triggered when the store sync is completed.
*/
void postSync();
/**
* triggered when the store is not able to write out data.
* the main process should abort.

ProcedureStoreBase.java

@@ -56,6 +56,14 @@ public abstract class ProcedureStoreBase implements ProcedureStore {
return listeners.remove(listener);
}
protected void sendPostSyncSignal() {
if (!this.listeners.isEmpty()) {
for (ProcedureStoreListener listener : this.listeners) {
listener.postSync();
}
}
}
protected void sendAbortProcessSignal() {
if (!this.listeners.isEmpty()) {
for (ProcedureStoreListener listener : this.listeners) {

WALProcedureStore.java

@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.FileNotFoundException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.LinkedTransferQueue;
@@ -51,6 +52,9 @@ import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
import org.apache.hadoop.hbase.util.Threads;
import com.google.common.annotations.VisibleForTesting;
/**
* WAL implementation of the ProcedureStore.
@@ -64,7 +68,25 @@ public class WALProcedureStore extends ProcedureStoreBase {
void recoverFileLease(FileSystem fs, Path path) throws IOException;
}
private static final int MAX_RETRIES_BEFORE_ABORT = 3;
private static final String MAX_RETRIES_BEFORE_ROLL_CONF_KEY =
"hbase.procedure.store.wal.max.retries.before.roll";
private static final int DEFAULT_MAX_RETRIES_BEFORE_ROLL = 3;
private static final String WAIT_BEFORE_ROLL_CONF_KEY =
"hbase.procedure.store.wal.wait.before.roll";
private static final int DEFAULT_WAIT_BEFORE_ROLL = 500;
private static final String ROLL_RETRIES_CONF_KEY =
"hbase.procedure.store.wal.max.roll.retries";
private static final int DEFAULT_ROLL_RETRIES = 3;
private static final String MAX_SYNC_FAILURE_ROLL_CONF_KEY =
"hbase.procedure.store.wal.sync.failure.roll.max";
private static final int DEFAULT_MAX_SYNC_FAILURE_ROLL = 3;
private static final String PERIODIC_ROLL_CONF_KEY =
"hbase.procedure.store.wal.periodic.roll.msec";
private static final int DEFAULT_PERIODIC_ROLL = 60 * 60 * 1000; // 1h
private static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec";
private static final int DEFAULT_SYNC_WAIT_MSEC = 100;
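
The new knobs are plain Configuration properties; a sketch of tuning them programmatically (the values are arbitrary examples, the keys and defaults are the ones defined above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public final class ProcedureWalTuning {
  public static Configuration example() {
    Configuration conf = HBaseConfiguration.create();
    // sync attempts on the current log before a roll is tried
    conf.setInt("hbase.procedure.store.wal.max.retries.before.roll", 3);
    // milliseconds to sleep between failed roll attempts
    conf.setInt("hbase.procedure.store.wal.wait.before.roll", 500);
    // roll attempts before giving up and aborting
    conf.setInt("hbase.procedure.store.wal.max.roll.retries", 3);
    // rolls allowed because of sync failures before aborting
    conf.setInt("hbase.procedure.store.wal.sync.failure.roll.max", 3);
    // force a roll at least once per hour
    conf.setInt("hbase.procedure.store.wal.periodic.roll.msec", 60 * 60 * 1000);
    return conf;
  }
}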
@@ -88,16 +110,22 @@ public class WALProcedureStore extends ProcedureStoreBase {
private final Path logDir;
private AtomicBoolean inSync = new AtomicBoolean(false);
private AtomicReference<Throwable> syncException = new AtomicReference<>();
private LinkedTransferQueue<ByteSlot> slotsCache = null;
private Set<ProcedureWALFile> corruptedLogs = null;
private AtomicLong totalSynced = new AtomicLong(0);
private AtomicLong lastRollTs = new AtomicLong(0);
private FSDataOutputStream stream = null;
private long lastRollTs = 0;
private long flushLogId = 0;
private int slotIndex = 0;
private Thread syncThread;
private ByteSlot[] slots;
private int maxRetriesBeforeRoll;
private int maxSyncFailureRoll;
private int waitBeforeRoll;
private int rollRetries;
private int periodicRollMsec;
private long rollThreshold;
private boolean useHsync;
private int syncWaitMsec;
@@ -124,7 +152,13 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
// Tunings
maxRetriesBeforeRoll =
conf.getInt(MAX_RETRIES_BEFORE_ROLL_CONF_KEY, DEFAULT_MAX_RETRIES_BEFORE_ROLL);
maxSyncFailureRoll = conf.getInt(MAX_SYNC_FAILURE_ROLL_CONF_KEY, DEFAULT_MAX_SYNC_FAILURE_ROLL);
waitBeforeRoll = conf.getInt(WAIT_BEFORE_ROLL_CONF_KEY, DEFAULT_WAIT_BEFORE_ROLL);
rollRetries = conf.getInt(ROLL_RETRIES_CONF_KEY, DEFAULT_ROLL_RETRIES);
rollThreshold = conf.getLong(ROLL_THRESHOLD_CONF_KEY, DEFAULT_ROLL_THRESHOLD);
periodicRollMsec = conf.getInt(PERIODIC_ROLL_CONF_KEY, DEFAULT_PERIODIC_ROLL);
syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC);
useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC);
@@ -132,11 +166,11 @@ public class WALProcedureStore extends ProcedureStoreBase {
syncThread = new Thread("WALProcedureStoreSyncThread") {
@Override
public void run() {
while (isRunning()) {
try {
syncLoop();
} catch (IOException e) {
LOG.error("Got an exception from the sync-loop", e);
try {
syncLoop();
} catch (Throwable e) {
LOG.error("Got an exception from the sync-loop", e);
if (!isSyncAborted()) {
sendAbortProcessSignal();
}
}
@@ -155,6 +189,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
if (lock.tryLock()) {
try {
waitCond.signalAll();
syncCond.signalAll();
} finally {
lock.unlock();
}
@@ -310,6 +345,9 @@ public class WALProcedureStore extends ProcedureStoreBase {
// Update the store tracker
synchronized (storeTracker) {
storeTracker.insert(proc, subprocs);
if (logId == flushLogId) {
checkAndTryRoll();
}
}
}
@@ -342,6 +380,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
storeTracker.update(proc);
if (logId == flushLogId) {
removeOldLogs = storeTracker.isUpdated();
checkAndTryRoll();
}
}
@@ -377,8 +416,10 @@ public class WALProcedureStore extends ProcedureStoreBase {
synchronized (storeTracker) {
storeTracker.delete(procId);
if (logId == flushLogId) {
if (storeTracker.isEmpty() && totalSynced.get() > rollThreshold) {
removeOldLogs = rollWriterOrDie();
if (storeTracker.isEmpty() || storeTracker.isUpdated()) {
removeOldLogs = checkAndTryRoll();
} else {
checkAndTryRoll();
}
}
}
@@ -399,14 +440,23 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
private long pushData(final ByteSlot slot) {
assert isRunning() && !logs.isEmpty() : "recoverLease() must be called before inserting data";
long logId = -1;
if (!isRunning()) {
throw new RuntimeException("the store must be running before inserting data");
}
if (logs.isEmpty()) {
throw new RuntimeException("recoverLease() must be called before inserting data");
}
long logId = -1;
lock.lock();
try {
// Wait for the sync to be completed
while (true) {
if (inSync.get()) {
if (!isRunning()) {
throw new RuntimeException("store no longer running");
} else if (isSyncAborted()) {
throw new RuntimeException("sync aborted", syncException.get());
} else if (inSync.get()) {
syncCond.await();
} else if (slotIndex == slots.length) {
slotCond.signal();
@@ -434,72 +484,101 @@ public class WALProcedureStore extends ProcedureStoreBase {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
sendAbortProcessSignal();
throw new RuntimeException(e);
} finally {
lock.unlock();
if (isSyncAborted()) {
throw new RuntimeException("sync aborted", syncException.get());
}
}
return logId;
}
private void syncLoop() throws IOException {
private boolean isSyncAborted() {
return syncException.get() != null;
}
private void syncLoop() throws Throwable {
inSync.set(false);
while (isRunning()) {
lock.lock();
try {
// Wait until new data is available
if (slotIndex == 0) {
if (LOG.isTraceEnabled()) {
float rollTsSec = (System.currentTimeMillis() - lastRollTs) / 1000.0f;
LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)",
StringUtils.humanSize(totalSynced.get()),
StringUtils.humanSize(totalSynced.get() / rollTsSec)));
}
waitCond.await();
lock.lock();
try {
while (isRunning()) {
try {
// Wait until new data is available
if (slotIndex == 0) {
// no data.. probably a stop()
continue;
if (LOG.isTraceEnabled()) {
float rollTsSec = getMillisFromLastRoll() / 1000.0f;
LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)",
StringUtils.humanSize(totalSynced.get()),
StringUtils.humanSize(totalSynced.get() / rollTsSec)));
}
waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS);
if (slotIndex == 0) {
// no data.. probably a stop()
checkAndTryRoll();
continue;
}
}
}
// Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing
long syncWaitSt = System.currentTimeMillis();
if (slotIndex != slots.length) {
slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
}
long syncWaitMs = System.currentTimeMillis() - syncWaitSt;
if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) {
float rollSec = (System.currentTimeMillis() - lastRollTs) / 1000.0f;
LOG.trace("Sync wait " + StringUtils.humanTimeDiff(syncWaitMs) +
", slotIndex=" + slotIndex +
", totalSynced=" + StringUtils.humanSize(totalSynced.get()) +
" " + StringUtils.humanSize(totalSynced.get() / rollSec) + "/sec");
}
// Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing
long syncWaitSt = System.currentTimeMillis();
if (slotIndex != slots.length) {
slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
}
long syncWaitMs = System.currentTimeMillis() - syncWaitSt;
if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) {
float rollSec = getMillisFromLastRoll() / 1000.0f;
LOG.trace(String.format("Sync wait %s, slotIndex=%s, totalSynced=%s (%s/sec)",
StringUtils.humanTimeDiff(syncWaitMs), slotIndex,
StringUtils.humanSize(totalSynced.get()),
StringUtils.humanSize(totalSynced.get() / rollSec)));
}
inSync.set(true);
totalSynced.addAndGet(syncSlots());
slotIndex = 0;
inSync.set(false);
syncCond.signalAll();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
sendAbortProcessSignal();
} finally {
lock.unlock();
inSync.set(true);
totalSynced.addAndGet(syncSlots());
slotIndex = 0;
inSync.set(false);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
sendAbortProcessSignal();
syncException.compareAndSet(null, e);
throw e;
} catch (Throwable t) {
syncException.compareAndSet(null, t);
throw t;
} finally {
syncCond.signalAll();
}
}
} finally {
lock.unlock();
}
}
private long syncSlots() {
private long syncSlots() throws Throwable {
int retry = 0;
int logRolled = 0;
long totalSynced = 0;
do {
try {
totalSynced = syncSlots(stream, slots, 0, slotIndex);
break;
} catch (Throwable e) {
if (++retry == MAX_RETRIES_BEFORE_ABORT) {
LOG.error("Sync slot failed, abort.", e);
sendAbortProcessSignal();
if (++retry >= maxRetriesBeforeRoll) {
if (logRolled >= maxSyncFailureRoll) {
LOG.error("Sync slots after log roll failed, abort.", e);
sendAbortProcessSignal();
throw e;
}
if (!rollWriterOrDie()) {
throw e;
}
logRolled++;
retry = 0;
}
}
} while (isRunning());
@@ -520,6 +599,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
} else {
stream.hflush();
}
sendPostSyncSignal();
if (LOG.isTraceEnabled()) {
LOG.trace("Sync slots=" + count + '/' + slots.length +
@@ -528,14 +608,45 @@ public class WALProcedureStore extends ProcedureStoreBase {
return totalSynced;
}
private boolean rollWriterOrDie() {
try {
return rollWriter();
} catch (IOException e) {
LOG.warn("Unable to roll the log", e);
sendAbortProcessSignal();
return false;
@VisibleForTesting
public boolean rollWriterOrDie() {
for (int i = 1; i <= rollRetries; ++i) {
try {
if (rollWriter()) {
return true;
}
} catch (IOException e) {
LOG.warn("Unable to roll the log, attempt=" + i, e);
Threads.sleepWithoutInterrupt(waitBeforeRoll);
}
}
LOG.fatal("Unable to roll the log");
sendAbortProcessSignal();
throw new RuntimeException("unable to roll the log");
}
protected boolean checkAndTryRoll() {
if (!isRunning()) return false;
if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) {
try {
return rollWriter();
} catch (IOException e) {
LOG.warn("Unable to roll the log", e);
}
}
return false;
}
private long getMillisToNextPeriodicRoll() {
if (lastRollTs.get() > 0 && periodicRollMsec > 0) {
return periodicRollMsec - getMillisFromLastRoll();
}
return Long.MAX_VALUE;
}
private long getMillisFromLastRoll() {
return (System.currentTimeMillis() - lastRollTs.get());
}
protected boolean rollWriter() throws IOException {
@@ -573,7 +684,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
stream = newStream;
flushLogId = logId;
totalSynced.set(0);
lastRollTs = System.currentTimeMillis();
lastRollTs.set(System.currentTimeMillis());
logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos));
} finally {
lock.unlock();
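
Taken together, the sync path now escalates from plain retries, to log rolls, to a process abort. Below is a generic restatement of that control flow, assuming nothing but the thresholds introduced above; none of the names here are real WALProcedureStore API:

import java.util.concurrent.Callable;

public final class RetryThenRollThenAbort {
  /**
   * Sketch: retry the sync, roll the log once the retry budget is spent,
   * and only abort once the roll budget is spent too (mirrors syncSlots()).
   */
  public static <T> T run(Callable<T> sync, Callable<Boolean> roll, Runnable abort,
      int maxRetriesBeforeRoll, int maxSyncFailureRoll) throws Exception {
    int retry = 0;
    int logsRolled = 0;
    while (true) {
      try {
        return sync.call();
      } catch (Exception e) {
        if (++retry < maxRetriesBeforeRoll) {
          continue; // plain retry on the same log
        }
        if (logsRolled >= maxSyncFailureRoll || !roll.call()) {
          abort.run(); // corresponds to sendAbortProcessSignal()
          throw e;
        }
        logsRolled++;
        retry = 0; // fresh log, reset the retry budget
      }
    }
  }
}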

ProcedureTestingUtility.java

@@ -19,6 +19,8 @@
package org.apache.hadoop.hbase.procedure2;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -182,4 +184,34 @@ public class ProcedureTestingUtility {
assertTrue("expected abort exception, got "+ cause,
cause instanceof ProcedureAbortedException);
}
public static class TestProcedure extends Procedure<Void> {
public TestProcedure() {}
public TestProcedure(long procId, long parentId) {
setProcId(procId);
if (parentId > 0) {
setParentProcId(parentId);
}
}
public void addStackId(final int index) {
addStackIndex(index);
}
@Override
protected Procedure[] execute(Void env) { return null; }
@Override
protected void rollback(Void env) { }
@Override
protected boolean abort(Void env) { return false; }
@Override
protected void serializeStateData(final OutputStream stream) throws IOException { }
@Override
protected void deserializeStateData(final InputStream stream) throws IOException { }
}
}
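
For reference, the relocated TestProcedure is exercised by the store tests roughly like this (a trivial usage sketch; the store is assumed to be an already started, lease-recovered ProcedureStore):

import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;

public final class TestProcedureUsage {
  public static void exercise(ProcedureStore store) {
    TestProcedure proc = new TestProcedure(1, -1); // procId=1, no parent
    store.insert(proc, null);                      // no sub-procedures
    store.update(proc);
    store.delete(proc.getProcId());
  }
}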

TestWALProcedureStore.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
import org.apache.hadoop.hbase.procedure2.SequentialProcedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
@@ -355,36 +356,6 @@ public class TestWALProcedureStore {
});
}
public static class TestProcedure extends Procedure<Void> {
public TestProcedure() {}
public TestProcedure(long procId, long parentId) {
setProcId(procId);
if (parentId > 0) {
setParentProcId(parentId);
}
}
public void addStackId(final int index) {
addStackIndex(index);
}
@Override
protected Procedure[] execute(Void env) { return null; }
@Override
protected void rollback(Void env) { }
@Override
protected boolean abort(Void env) { return false; }
@Override
protected void serializeStateData(final OutputStream stream) throws IOException { }
@Override
protected void deserializeStateData(final InputStream stream) throws IOException { }
}
private void corruptLog(final FileStatus logFile, final long dropBytes)
throws IOException {
assertTrue(logFile.getLen() > dropBytes);

MasterProcedureEnv.java

@@ -74,6 +74,11 @@ public class MasterProcedureEnv {
this.master = master;
}
@Override
public void postSync() {
// no-op
}
@Override
public void abortProcess() {
master.abort("The Procedure Store lost the lease");

TestMasterFailoverWithProcedures.java

@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
@@ -31,10 +32,12 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.wal.TestWALProcedureStore.TestSequentialProcedure;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.CreateTableState;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteTableState;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DisableTableState;
@@ -45,7 +48,6 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -53,7 +55,6 @@ import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -64,6 +65,11 @@ public class TestMasterFailoverWithProcedures {
protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static void setupConf(Configuration conf) {
// don't waste time retrying with the roll, the test is already slow enough.
conf.setInt("hbase.procedure.store.wal.max.retries.before.roll", 1);
conf.setInt("hbase.procedure.store.wal.wait.before.roll", 0);
conf.setInt("hbase.procedure.store.wal.max.roll.retries", 1);
conf.setInt("hbase.procedure.store.wal.sync.failure.roll.max", 1);
}
@Before
@@ -94,6 +100,9 @@ public class TestMasterFailoverWithProcedures {
// Abort Latch for the master store
final CountDownLatch masterStoreAbort = new CountDownLatch(1);
masterStore.registerListener(new ProcedureStore.ProcedureStoreListener() {
@Override
public void postSync() {}
@Override
public void abortProcess() {
LOG.debug("Abort store of Master");
@@ -113,6 +122,9 @@ public class TestMasterFailoverWithProcedures {
// Abort Latch for the test store
final CountDownLatch backupStore3Abort = new CountDownLatch(1);
backupStore3.registerListener(new ProcedureStore.ProcedureStoreListener() {
@Override
public void postSync() {}
@Override
public void abortProcess() {
LOG.debug("Abort store of backupMaster3");
@@ -127,8 +139,13 @@ public class TestMasterFailoverWithProcedures {
HTableDescriptor htd = MasterProcedureTestingUtility.createHTD(TableName.valueOf("mtb"), "f");
HRegionInfo[] regions = ModifyRegionUtils.createHRegionInfos(htd, null);
LOG.debug("submit proc");
getMasterProcedureExecutor().submitProcedure(
new CreateTableProcedure(getMasterProcedureExecutor().getEnvironment(), htd, regions));
try {
getMasterProcedureExecutor().submitProcedure(
new CreateTableProcedure(getMasterProcedureExecutor().getEnvironment(), htd, regions));
fail("expected RuntimeException 'sync aborted'");
} catch (RuntimeException e) {
LOG.info("got " + e.getMessage());
}
LOG.debug("wait master store abort");
masterStoreAbort.await();
@@ -140,10 +157,52 @@ public class TestMasterFailoverWithProcedures {
// wait the store in here to abort (the test will fail due to timeout if it doesn't)
LOG.debug("wait the store to abort");
backupStore3.getStoreTracker().setDeleted(1, false);
backupStore3.delete(1);
try {
backupStore3.delete(1);
fail("expected RuntimeException 'sync aborted'");
} catch (RuntimeException e) {
LOG.info("got " + e.getMessage());
}
backupStore3Abort.await();
}
@Test(timeout=60000)
public void testWALfencingWithWALRolling() throws IOException {
final ProcedureStore procStore = getMasterProcedureExecutor().getStore();
assertTrue("expected WALStore for this test", procStore instanceof WALProcedureStore);
HMaster firstMaster = UTIL.getHBaseCluster().getMaster();
HMaster backupMaster3 = Mockito.mock(HMaster.class);
Mockito.doReturn(firstMaster.getConfiguration()).when(backupMaster3).getConfiguration();
Mockito.doReturn(true).when(backupMaster3).isActiveMaster();
final WALProcedureStore procStore2 = new WALProcedureStore(firstMaster.getConfiguration(),
firstMaster.getMasterFileSystem().getFileSystem(),
((WALProcedureStore)procStore).getLogDir(),
new MasterProcedureEnv.WALStoreLeaseRecovery(backupMaster3));
// start a second store which should fence the first one out
LOG.info("Starting new WALProcedureStore");
procStore2.start(1);
procStore2.recoverLease();
LOG.info("Inserting into second WALProcedureStore");
// insert something to the second store, then force a WAL roll
Procedure proc2 = new TestSequentialProcedure();
procStore2.insert(proc2, null);
procStore2.rollWriterOrDie();
LOG.info("Inserting into first WALProcedureStore");
// insert something to the first store
proc2 = new TestSequentialProcedure();
try {
procStore.insert(proc2, null);
fail("expected RuntimeException 'sync aborted'");
} catch (RuntimeException e) {
LOG.info("got " + e.getMessage());
}
}
// ==========================================================================
// Test Create Table
// ==========================================================================

TestWALProcedureStoreOnHDFS.java

@@ -0,0 +1,199 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.CreateTableState;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteTableState;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DisableTableState;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.EnableTableState;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.TruncateTableState;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@Category({MasterTests.class, LargeTests.class})
public class TestWALProcedureStoreOnHDFS {
private static final Log LOG = LogFactory.getLog(TestWALProcedureStoreOnHDFS.class);
protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private WALProcedureStore store;
private static void setupConf(Configuration conf) {
conf.setInt("dfs.replication", 3);
conf.setInt("dfs.namenode.replication.min", 3);
// increase the value for slow test-env
conf.setInt("hbase.procedure.store.wal.wait.before.roll", 1000);
conf.setInt("hbase.procedure.store.wal.max.roll.retries", 5);
conf.setInt("hbase.procedure.store.wal.sync.failure.roll.max", 5);
}
@Before
public void setup() throws Exception {
setupConf(UTIL.getConfiguration());
MiniDFSCluster dfs = UTIL.startMiniDFSCluster(3);
Path logDir = new Path(new Path(dfs.getFileSystem().getUri()), "/test-logs");
store = ProcedureTestingUtility.createWalStore(
UTIL.getConfiguration(), dfs.getFileSystem(), logDir);
store.registerListener(new ProcedureStore.ProcedureStoreListener() {
@Override
public void postSync() {}
@Override
public void abortProcess() {
LOG.fatal("Abort the Procedure Store");
store.stop(true);
}
});
store.start(8);
store.recoverLease();
}
@After
public void tearDown() throws Exception {
store.stop(false);
UTIL.getDFSCluster().getFileSystem().delete(store.getLogDir(), true);
try {
UTIL.shutdownMiniCluster();
} catch (Exception e) {
LOG.warn("failure shutting down cluster", e);
}
}
@Test(timeout=60000, expected=RuntimeException.class)
public void testWalAbortOnLowReplication() throws Exception {
assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
LOG.info("Stop DataNode");
UTIL.getDFSCluster().stopDataNode(0);
assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
store.insert(new TestProcedure(1, -1), null);
for (long i = 2; store.isRunning(); ++i) {
assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
store.insert(new TestProcedure(i, -1), null);
Thread.sleep(100);
}
assertFalse(store.isRunning());
fail("The store.insert() should throw an exeption");
}
@Test(timeout=60000)
public void testWalAbortOnLowReplicationWithQueuedWriters() throws Exception {
assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
store.registerListener(new ProcedureStore.ProcedureStoreListener() {
@Override
public void postSync() {
Threads.sleepWithoutInterrupt(2000);
}
@Override
public void abortProcess() {}
});
final AtomicInteger reCount = new AtomicInteger(0);
Thread[] thread = new Thread[store.getNumThreads() * 2 + 1];
for (int i = 0; i < thread.length; ++i) {
final long procId = i + 1;
thread[i] = new Thread() {
public void run() {
try {
LOG.debug("[S] INSERT " + procId);
store.insert(new TestProcedure(procId, -1), null);
LOG.debug("[E] INSERT " + procId);
} catch (RuntimeException e) {
reCount.incrementAndGet();
LOG.debug("[F] INSERT " + procId + ": " + e.getMessage());
}
}
};
thread[i].start();
}
Thread.sleep(1000);
LOG.info("Stop DataNode");
UTIL.getDFSCluster().stopDataNode(0);
assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
for (int i = 0; i < thread.length; ++i) {
thread[i].join();
}
assertFalse(store.isRunning());
assertTrue(reCount.toString(), reCount.get() >= store.getNumThreads() &&
reCount.get() < thread.length);
}
@Test(timeout=60000)
public void testWalRollOnLowReplication() throws Exception {
int dnCount = 0;
store.insert(new TestProcedure(1, -1), null);
UTIL.getDFSCluster().restartDataNode(dnCount);
for (long i = 2; i < 100; ++i) {
store.insert(new TestProcedure(i, -1), null);
Thread.sleep(100);
if ((i % 30) == 0) {
LOG.info("Restart Data Node");
UTIL.getDFSCluster().restartDataNode(++dnCount % 3);
}
}
assertTrue(store.isRunning());
}
}