HBASE-26195 Abort RS if wal sync fails or times out

This commit is contained in:
Rushabh Shah 2021-09-08 15:15:56 -04:00 committed by GitHub
parent 08ff55df71
commit d309276121
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 234 additions and 467 deletions

View File

@ -3339,7 +3339,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
WALEdit walEdit = null;
MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
long txid = 0;
boolean doRollBackMemstore = false;
boolean walSyncSuccess = true;
boolean locked = false;
int cellCount = 0;
/** Keep track of the locks we hold so we can release them in finally clause */
@ -3705,7 +3705,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (updateSeqId) {
updateSequenceId(familyMaps[i].values(), mvccNum);
}
doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
addedSize += applyFamilyMapToMemstore(familyMaps[i]);
}
@ -3721,11 +3720,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// -------------------------
// STEP 7. Sync wal.
// -------------------------
walSyncSuccess = false;
if (txid != 0) {
syncOrDefer(txid, durability);
}
walSyncSuccess = true;
doRollBackMemstore = false;
// update memstore size
this.addAndGetGlobalMemstoreSize(addedSize);
@ -3776,14 +3776,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
success = true;
return addedSize;
} catch (Throwable t) {
// WAL sync failed. Aborting to avoid a mismatch between the memstore, WAL,
// and any replicated clusters.
if (!walSyncSuccess) {
rsServices.abort("WAL sync failed, aborting to preserve WAL as source of truth", t);
}
// Rethrow the exception.
throw t;
} finally {
// if the wal sync was unsuccessful, remove keys from memstore
if (doRollBackMemstore) {
for (int j = 0; j < familyMaps.length; j++) {
for(List<Cell> cells:familyMaps[j].values()) {
rollbackMemstore(cells);
}
}
// if the wal sync was unsuccessful, complete the mvcc
if (!walSyncSuccess) {
if (writeEntry != null) mvcc.complete(writeEntry);
} else {
if (writeEntry != null) {
@ -4255,33 +4258,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
Store store = getStore(family);
size += store.add(cells);
}
return size;
}
/**
* Remove all the keys listed in the map from the memstore. This method is
* called when a Put/Delete has updated memstore but subsequently fails to update
* the wal. This method is then invoked to rollback the memstore.
*/
private void rollbackMemstore(List<Cell> memstoreCells) {
rollbackMemstore(null, memstoreCells);
}
private void rollbackMemstore(final Store defaultStore, List<Cell> memstoreCells) {
int kvsRolledback = 0;
for (Cell cell : memstoreCells) {
Store store = defaultStore;
if (store == null) {
byte[] family = CellUtil.cloneFamily(cell);
store = getStore(family);
}
store.rollback(cell);
kvsRolledback++;
}
LOG.debug("rollbackMemstore rolled back " + kvsRolledback);
}
@Override
public void checkFamilies(Collection<byte[]> families) throws NoSuchColumnFamilyException {
for (byte[] family : families) {
@ -7964,7 +7943,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.writeRequestsCount.increment();
RowLock rowLock = null;
WALKey walKey = null;
boolean doRollBackMemstore = false;
boolean walSyncSuccess = true;
try {
rowLock = getRowLockInternal(row);
assert rowLock != null;
@ -8092,7 +8071,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// Actually write to Memstore now
doRollBackMemstore = !tempMemstore.isEmpty();
for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
Store store = entry.getKey();
if (store.getFamily().getMaxVersions() == 1) {
@ -8120,27 +8098,30 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
rowLock = null;
}
// sync the transaction log outside the rowlock
walSyncSuccess = false;
if(txid != 0){
syncOrDefer(txid, durability);
}
walSyncSuccess = true;
if (rsServices != null && rsServices.getMetrics() != null) {
rsServices.getMetrics().updateWriteQueryMeter(this.htableDescriptor.
getTableName());
}
doRollBackMemstore = false;
} catch (Throwable t) {
// WAL sync failed. Aborting to avoid a mismatch between the memstore, WAL,
// and any replicated clusters.
if (!walSyncSuccess) {
rsServices.abort("WAL sync failed, aborting to preserve WAL as source of truth", t);
}
// Rethrow the exception.
throw t;
} finally {
if (rowLock != null) {
rowLock.release();
}
// if the wal sync was unsuccessful, remove keys from memstore
WriteEntry we = walKey != null? walKey.getWriteEntry(): null;
if (doRollBackMemstore) {
for (Map.Entry<Store, List<Cell>> entry: tempMemstore.entrySet()) {
rollbackMemstore(entry.getKey(), entry.getValue());
}
for (Map.Entry<Store, List<Cell>> entry: removedCellsForMemStore.entrySet()) {
entry.getKey().add(entry.getValue());
}
if (!walSyncSuccess) {
if (we != null) {
mvcc.complete(we);
}
@ -8270,7 +8251,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private Result doIncrement(Increment increment, long nonceGroup, long nonce) throws IOException {
RowLock rowLock = null;
WALKey walKey = null;
boolean doRollBackMemstore = false;
boolean walSyncSuccess = true;
long accumulatedResultSize = 0;
List<Cell> allKVs = new ArrayList<Cell>(increment.size());
Map<Store, List<Cell>> removedCellsForMemStore = new HashMap<>();
@ -8351,7 +8332,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// Now write to memstore, a family at a time.
doRollBackMemstore = !forMemStore.isEmpty();
for (Map.Entry<Store, List<Cell>> entry: forMemStore.entrySet()) {
Store store = entry.getKey();
List<Cell> results = entry.getValue();
@ -8375,24 +8355,27 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
rowLock.release();
rowLock = null;
}
walSyncSuccess = false;
// sync the transaction log outside the rowlock
if(txid != 0) {
syncOrDefer(txid, effectiveDurability);
}
doRollBackMemstore = false;
walSyncSuccess = true;
} catch (Throwable t) {
// WAL sync failed. Aborting to avoid a mismatch between the memstore, WAL,
// and any replicated clusters.
if (!walSyncSuccess) {
rsServices.abort("WAL sync failed, aborting to preserve WAL as source of truth", t);
}
// Rethrow the exception.
throw t;
} finally {
if (rowLock != null) {
rowLock.release();
}
// if the wal sync was unsuccessful, remove keys from memstore
WriteEntry we = walKey != null ? walKey.getWriteEntry() : null;
if (doRollBackMemstore) {
for (Map.Entry<Store, List<Cell>> entry: forMemStore.entrySet()) {
rollbackMemstore(entry.getKey(), entry.getValue());
}
for (Map.Entry<Store, List<Cell>> entry: removedCellsForMemStore.entrySet()) {
entry.getKey().add(entry.getValue());
}
if (!walSyncSuccess) {
if (we != null) {
mvcc.complete(we);
}
@ -9342,4 +9325,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public RegionSplitPolicy getSplitPolicy() {
return this.splitPolicy;
}
public void setRegionServerServices(RegionServerServices services) {
this.rsServices = services;
}
}

View File

@ -1,361 +0,0 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.junit.AfterClass;
import org.junit.Assert;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
@Category(SmallTests.class)
public class TestRollbackFromClient {
@Rule
public TestName name = new TestName();
private final static HBaseTestingUtility TEST_UTIL
= new HBaseTestingUtility();
private static final byte[] FAMILY = Bytes.toBytes("testFamily");
private static final int SLAVES = 3;
private static final byte[] ROW = Bytes.toBytes("testRow");
private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
private static final byte[] QUALIFIER_V2 = Bytes.toBytes("testQualifierV2");
private static final byte[] VALUE = Bytes.toBytes("testValue");
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, FailedDefaultWALProvider.class.getName());
TEST_UTIL.startMiniCluster(SLAVES);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testAppendRollback() throws IOException {
Updater updateForEmptyTable = new Updater() {
@Override
public int updateData(Table table, byte[] family) {
try {
Append append = new Append(ROW);
append.add(FAMILY, QUALIFIER, VALUE);
append.add(FAMILY, QUALIFIER_V2, VALUE);
FailedHLog.SHOULD_FAIL.set(true);
table.append(append);
} catch (IOException e) {
// It should fail because the WAL fail also
} finally {
FailedHLog.SHOULD_FAIL.set(false);
}
return 0;
}
};
testRollback(updateForEmptyTable, 1, null);
testRollback(updateForEmptyTable, 2, null);
final Append preAppend = new Append(ROW);
preAppend.add(FAMILY, QUALIFIER, VALUE);
Cell initCell = preAppend.getCellList(FAMILY).get(0);
Updater updateForNonEmptyTable = new Updater() {
@Override
public int updateData(Table table, byte[] family) throws IOException {
table.append(preAppend);
try {
Append append = new Append(ROW);
append.add(FAMILY, QUALIFIER, VALUE);
append.add(FAMILY, QUALIFIER_V2, VALUE);
FailedHLog.SHOULD_FAIL.set(true);
table.append(append);
Assert.fail("It should fail because the WAL sync is failed");
} catch (IOException e) {
} finally {
FailedHLog.SHOULD_FAIL.set(false);
}
return 1;
}
};
testRollback(updateForNonEmptyTable, 1, initCell);
testRollback(updateForNonEmptyTable, 2, initCell);
}
@Test
public void testIncrementRollback() throws IOException {
Updater updateForEmptyTable = new Updater() {
@Override
public int updateData(Table table, byte[] family) {
try {
Increment inc = new Increment(ROW);
inc.addColumn(FAMILY, QUALIFIER, 1);
inc.addColumn(FAMILY, QUALIFIER_V2, 2);
FailedHLog.SHOULD_FAIL.set(true);
table.increment(inc);
} catch (IOException e) {
// It should fail because the WAL fail also
} finally {
FailedHLog.SHOULD_FAIL.set(false);
}
return 0;
}
};
testRollback(updateForEmptyTable, 1, null);
testRollback(updateForEmptyTable, 2, null);
final Increment preIncrement = new Increment(ROW);
preIncrement.addColumn(FAMILY, QUALIFIER, 1);
Cell initCell = preIncrement.getCellList(FAMILY).get(0);
Updater updateForNonEmptyTable = new Updater() {
@Override
public int updateData(Table table, byte[] family) throws IOException {
table.increment(preIncrement);
try {
Increment inc = new Increment(ROW);
inc.addColumn(FAMILY, QUALIFIER, 1);
inc.addColumn(FAMILY, QUALIFIER_V2, 2);
FailedHLog.SHOULD_FAIL.set(true);
table.increment(inc);
Assert.fail("It should fail because the WAL sync is failed");
} catch (IOException e) {
} finally {
FailedHLog.SHOULD_FAIL.set(false);
}
return 1;
}
};
testRollback(updateForNonEmptyTable, 1, initCell);
testRollback(updateForNonEmptyTable, 2, initCell);
}
@Test
public void testPutRollback() throws IOException {
Updater updateForEmptyTable = new Updater() {
@Override
public int updateData(Table table, byte[] family) {
try {
Put put = new Put(ROW);
put.addColumn(FAMILY, QUALIFIER, VALUE);
FailedHLog.SHOULD_FAIL.set(true);
table.put(put);
Assert.fail("It should fail because the WAL sync is failed");
} catch (IOException e) {
} finally {
FailedHLog.SHOULD_FAIL.set(false);
}
return 0;
}
};
testRollback(updateForEmptyTable, 1, null);
testRollback(updateForEmptyTable, 2, null);
final Put prePut = new Put(ROW);
prePut.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("aaaaaaaaaaaaaaaaaaaaaa"));
Cell preCell = prePut.getCellList(FAMILY).get(0);
Updater updateForNonEmptyTable = new Updater() {
@Override
public int updateData(Table table, byte[] family) throws IOException {
table.put(prePut);
try {
Put put = new Put(ROW);
put.addColumn(FAMILY, QUALIFIER, VALUE);
FailedHLog.SHOULD_FAIL.set(true);
table.put(put);
Assert.fail("It should fail because the WAL sync is failed");
} catch (IOException e) {
} finally {
FailedHLog.SHOULD_FAIL.set(false);
}
return 1;
}
};
testRollback(updateForNonEmptyTable, 1, preCell);
testRollback(updateForNonEmptyTable, 2, preCell);
}
private void testRollback(Updater updater, int versions, Cell initCell) throws IOException {
TableName tableName = TableName.valueOf(this.name.getMethodName());
HTableDescriptor desc = new HTableDescriptor(tableName);
HColumnDescriptor col = new HColumnDescriptor(FAMILY);
col.setMaxVersions(versions);
desc.addFamily(col);
TEST_UTIL.getHBaseAdmin().createTable(desc);
int expected;
List<Cell> cells;
try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
Table table = conn.getTable(tableName)) {
expected = updater.updateData(table, FAMILY);
cells = getAllCells(table);
}
TEST_UTIL.getHBaseAdmin().disableTable(tableName);
TEST_UTIL.getHBaseAdmin().deleteTable(tableName);
assertEquals(expected, cells.size());
if (initCell != null && cells.isEmpty()) {
Cell cell = cells.get(0);
assertTrue("row isn't matched", CellUtil.matchingRow(initCell, cell));
assertTrue("column isn't matched", CellUtil.matchingColumn(initCell, cell));
assertTrue("qualifier isn't matched", CellUtil.matchingQualifier(initCell, cell));
assertTrue("value isn't matched", CellUtil.matchingValue(initCell, cell));
}
}
interface Updater {
int updateData(Table table, byte[] family) throws IOException;
}
private static List<Cell> getAllCells(Table table) throws IOException {
List<Cell> cells = new ArrayList<>();
try (ResultScanner scanner = table.getScanner(new Scan())) {
for (Result r : scanner) {
cells.addAll(r.listCells());
}
return cells;
}
}
public static class FailedDefaultWALProvider extends DefaultWALProvider {
@Override
public WAL getWAL(final byte[] identifier, byte[] namespace) throws IOException {
WAL wal = super.getWAL(identifier, namespace);
return new FailedHLog(wal);
}
}
public static class FailedHLog implements WAL {
private static final AtomicBoolean SHOULD_FAIL = new AtomicBoolean(false);
private final WAL delegation;
FailedHLog(final WAL delegation) {
this.delegation = delegation;
}
@Override
public void registerWALActionsListener(WALActionsListener listener) {
delegation.registerWALActionsListener(listener);
}
@Override
public boolean unregisterWALActionsListener(WALActionsListener listener) {
return delegation.unregisterWALActionsListener(listener);
}
@Override
public byte[][] rollWriter() throws FailedLogCloseException, IOException {
return delegation.rollWriter();
}
@Override
public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
return delegation.rollWriter(force);
}
@Override
public void shutdown() throws IOException {
delegation.shutdown();
}
@Override
public void close() throws IOException {
delegation.close();
}
@Override
public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) throws IOException {
return delegation.append(htd, info, key, edits, inMemstore);
}
@Override
public void sync() throws IOException {
delegation.sync();
}
@Override
public void sync(long txid) throws IOException {
sync(txid, false);
}
@Override
public void sync(boolean forceSync) throws IOException {
delegation.sync(forceSync);
}
@Override
public void sync(long txid, boolean forceSync) throws IOException {
if (SHOULD_FAIL.get()) {
throw new IOException("[TESTING] we need the failure!!!");
}
delegation.sync(txid, forceSync);
}
@Override
public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) {
return delegation.startCacheFlush(encodedRegionName, families);
}
@Override
public void completeCacheFlush(byte[] encodedRegionName) {
delegation.completeCacheFlush(encodedRegionName);
}
@Override
public void abortCacheFlush(byte[] encodedRegionName) {
delegation.abortCacheFlush(encodedRegionName);
}
@Override
public WALCoprocessorHost getCoprocessorHost() {
return delegation.getCoprocessorHost();
}
@Override
public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
return delegation.getEarliestMemstoreSeqNum(encodedRegionName);
}
@Override
public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
return delegation.getEarliestMemstoreSeqNum(encodedRegionName, familyName);
}
}
}

View File

@ -177,6 +177,7 @@ public class TestFailedAppendAndSync {
boolean threwOnBoth = false;
HRegion region = initHRegion(tableName, null, null, CONF, dodgyWAL);
region.setRegionServerServices(services);
try {
// Get some random bytes.
byte[] value = Bytes.toBytes(getName());

View File

@ -38,6 +38,8 @@ import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@ -162,7 +164,6 @@ import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.FaultyFSLog;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
@ -178,6 +179,7 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@ -311,57 +313,6 @@ public class TestHRegion {
region = null;
}
/*
* This test is for verifying memstore snapshot size is correctly updated in case of rollback
* See HBASE-10845
*/
@Test (timeout=60000)
public void testMemstoreSnapshotSize() throws IOException {
class MyFaultyFSLog extends FaultyFSLog {
StoreFlushContext storeFlushCtx;
public MyFaultyFSLog(FileSystem fs, Path rootDir, String logName, Configuration conf)
throws IOException {
super(fs, rootDir, logName, conf);
}
void setStoreFlushCtx(StoreFlushContext storeFlushCtx) {
this.storeFlushCtx = storeFlushCtx;
}
@Override
public void sync(long txid, boolean forceSync) throws IOException {
storeFlushCtx.prepare();
super.sync(txid, forceSync);
}
}
FileSystem fs = FileSystem.get(CONF);
Path rootDir = new Path(dir + "testMemstoreSnapshotSize");
MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize", CONF);
region = initHRegion(tableName, null, null, name.getMethodName(),
CONF, false, Durability.SYNC_WAL, faultyLog, COLUMN_FAMILY_BYTES);
Store store = region.getStore(COLUMN_FAMILY_BYTES);
// Get some random bytes.
byte [] value = Bytes.toBytes(name.getMethodName());
faultyLog.setStoreFlushCtx(store.createFlushContext(12345));
Put put = new Put(value);
put.add(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
faultyLog.setFailureType(FaultyFSLog.FailureType.SYNC);
boolean threwIOE = false;
try {
region.put(put);
} catch (IOException ioe) {
threwIOE = true;
} finally {
assertTrue("The regionserver should have thrown an exception", threwIOE);
}
long sz = store.getFlushableSize();
assertTrue("flushable size should be zero, but it is " + sz, sz == 0);
}
/**
* Test for HBASE-14229: Flushing canceled by coprocessor still leads to memstoreSize set down
*/
@ -551,6 +502,11 @@ public class TestHRegion {
try {
// Initialize region
region = initHRegion(tableName, name.getMethodName(), conf, COLUMN_FAMILY_BYTES);
RegionServerServices services = mock(RegionServerServices.class);
doNothing().when(services).abort(anyString(), Matchers.<Throwable>any());
doReturn(ServerName.valueOf("fake-server", 0, 0L)). when(services).getServerName();
region.setRegionServerServices(services);
long size = region.getMemstoreSize();
Assert.assertEquals(0, size);
// Put one item into memstore. Measure the size of one item in memstore.

View File

@ -0,0 +1,184 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
/*
Testing RS abort in case if sync fails/times out.
*/
@Category({MediumTests.class, RegionServerTests.class})
public class TestFSHLogTimedOutSync {
private static final Log LOG = LogFactory.getLog(TestFSHLogTimedOutSync.class);
@Rule public TestName name = new TestName();
private static final String COLUMN_FAMILY = "MyCF";
private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY);
private static final String COLUMN_QUALIFIER = "MyCQ";
private static final byte [] COLUMN_QUALIFIER_BYTES = Bytes.toBytes(COLUMN_QUALIFIER);
private static HBaseTestingUtility TEST_UTIL;
private static Configuration CONF ;
private String dir;
// Test names
protected TableName tableName;
@Before
public void setup() throws IOException {
TEST_UTIL = HBaseTestingUtility.createLocalHTU();
CONF = TEST_UTIL.getConfiguration();
// Disable block cache.
CONF.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
dir = TEST_UTIL.getDataTestDir("TestHRegion").toString();
tableName = TableName.valueOf(name.getMethodName());
}
@After
public void tearDown() throws Exception {
LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
TEST_UTIL.cleanupTestDir();
}
// Test that RS aborts in case of put, append and increment when sync fails or times out.
@Test(timeout=30000)
public void testRSAbortWhenSyncTimedOut() throws IOException {
// Dodgy WAL. Will throw exceptions when flags set.
class DodgyFSLog extends FSHLog {
volatile boolean throwSyncException = false;
public DodgyFSLog(FileSystem fs, Path root, String logDir, Configuration conf)
throws IOException {
super(fs, root, logDir, conf);
}
@Override
public void sync(long txid) throws IOException {
super.sync(txid);
if (throwSyncException) {
throw new TimeoutIOException("Exception");
}
}
@Override
public void sync(long txid, boolean force) throws IOException {
super.sync(txid, force);
if (throwSyncException) {
throw new TimeoutIOException("Exception");
}
}
}
// Make up mocked server and services.
RegionServerServices services = mock(RegionServerServices.class);
FileSystem fs = FileSystem.get(CONF);
Path rootDir = new Path(dir + getName());
DodgyFSLog dodgyWAL = new DodgyFSLog(fs, rootDir, getName(), CONF);
HRegion region = initHRegion(tableName, null, null, CONF, dodgyWAL);
region.setRegionServerServices(services);
// Get some random bytes.
byte[] row = Bytes.toBytes(getName());
byte[] value = Bytes.toBytes(getName());
// Test Put operation
try {
dodgyWAL.throwSyncException = true;
Put put = new Put(row);
put.addColumn(COLUMN_FAMILY_BYTES, COLUMN_QUALIFIER_BYTES, value);
region.put(put);
fail();
} catch (IOException ioe) {
assertTrue(ioe instanceof TimeoutIOException);
}
// Verify that RS aborts
Mockito.verify(services, Mockito.times(1)).
abort(Mockito.anyString(), Mockito.<Throwable>anyObject());
// Test Append operation
try {
dodgyWAL.throwSyncException = true;
Append a = new Append(row);
a.setReturnResults(false);
a.add(COLUMN_FAMILY_BYTES, COLUMN_QUALIFIER_BYTES, value);
region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
fail();
} catch (IOException ioe) {
assertTrue(ioe instanceof TimeoutIOException);
}
// Verify that RS aborts
Mockito.verify(services, Mockito.times(2)).
abort(Mockito.anyString(), Mockito.<Throwable>anyObject());
// Test Increment operation
try {
dodgyWAL.throwSyncException = true;
final Increment inc = new Increment(row);
inc.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("qual2"), 1);
region.increment(inc, HConstants.NO_NONCE, HConstants.NO_NONCE);
fail();
} catch (IOException ioe) {
assertTrue(ioe instanceof TimeoutIOException);
}
// Verify that RS aborts
Mockito.verify(services, Mockito.times(3)).
abort(Mockito.anyString(), Mockito.<Throwable>anyObject());
}
String getName() {
return name.getMethodName();
}
/**
* @return A region on which you must call
* {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
*/
public HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
Configuration conf, WAL wal) throws IOException {
return TEST_UTIL.createLocalHRegion(tableName.getName(), startKey, stopKey,
getName(), conf, false, Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES);
}
}