diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index b74731fe0a2..3e752cd8484 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -3339,7 +3339,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi WALEdit walEdit = null; MultiVersionConcurrencyControl.WriteEntry writeEntry = null; long txid = 0; - boolean doRollBackMemstore = false; + boolean walSyncSuccess = true; boolean locked = false; int cellCount = 0; /** Keep track of the locks we hold so we can release them in finally clause */ @@ -3705,7 +3705,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (updateSeqId) { updateSequenceId(familyMaps[i].values(), mvccNum); } - doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote addedSize += applyFamilyMapToMemstore(familyMaps[i]); } @@ -3721,11 +3720,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // ------------------------- // STEP 7. Sync wal. // ------------------------- + walSyncSuccess = false; if (txid != 0) { syncOrDefer(txid, durability); } + walSyncSuccess = true; - doRollBackMemstore = false; // update memstore size this.addAndGetGlobalMemstoreSize(addedSize); @@ -3776,14 +3776,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi success = true; return addedSize; + } catch (Throwable t) { + // WAL sync failed. Aborting to avoid a mismatch between the memstore, WAL, + // and any replicated clusters. + if (!walSyncSuccess) { + rsServices.abort("WAL sync failed, aborting to preserve WAL as source of truth", t); + } + // Rethrow the exception. + throw t; } finally { - // if the wal sync was unsuccessful, remove keys from memstore - if (doRollBackMemstore) { - for (int j = 0; j < familyMaps.length; j++) { - for(List cells:familyMaps[j].values()) { - rollbackMemstore(cells); - } - } + // if the wal sync was unsuccessful, complete the mvcc + if (!walSyncSuccess) { if (writeEntry != null) mvcc.complete(writeEntry); } else { if (writeEntry != null) { @@ -4255,33 +4258,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi Store store = getStore(family); size += store.add(cells); } - return size; } - /** - * Remove all the keys listed in the map from the memstore. This method is - * called when a Put/Delete has updated memstore but subsequently fails to update - * the wal. This method is then invoked to rollback the memstore. - */ - private void rollbackMemstore(List memstoreCells) { - rollbackMemstore(null, memstoreCells); - } - - private void rollbackMemstore(final Store defaultStore, List memstoreCells) { - int kvsRolledback = 0; - for (Cell cell : memstoreCells) { - Store store = defaultStore; - if (store == null) { - byte[] family = CellUtil.cloneFamily(cell); - store = getStore(family); - } - store.rollback(cell); - kvsRolledback++; - } - LOG.debug("rollbackMemstore rolled back " + kvsRolledback); - } - @Override public void checkFamilies(Collection families) throws NoSuchColumnFamilyException { for (byte[] family : families) { @@ -7964,7 +7943,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.writeRequestsCount.increment(); RowLock rowLock = null; WALKey walKey = null; - boolean doRollBackMemstore = false; + boolean walSyncSuccess = true; try { rowLock = getRowLockInternal(row); assert rowLock != null; @@ -8092,7 +8071,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // Actually write to Memstore now - doRollBackMemstore = !tempMemstore.isEmpty(); for (Map.Entry> entry : tempMemstore.entrySet()) { Store store = entry.getKey(); if (store.getFamily().getMaxVersions() == 1) { @@ -8120,27 +8098,30 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi rowLock = null; } // sync the transaction log outside the rowlock + walSyncSuccess = false; if(txid != 0){ syncOrDefer(txid, durability); } + walSyncSuccess = true; if (rsServices != null && rsServices.getMetrics() != null) { rsServices.getMetrics().updateWriteQueryMeter(this.htableDescriptor. getTableName()); } - doRollBackMemstore = false; + } catch (Throwable t) { + // WAL sync failed. Aborting to avoid a mismatch between the memstore, WAL, + // and any replicated clusters. + if (!walSyncSuccess) { + rsServices.abort("WAL sync failed, aborting to preserve WAL as source of truth", t); + } + // Rethrow the exception. + throw t; } finally { if (rowLock != null) { rowLock.release(); } // if the wal sync was unsuccessful, remove keys from memstore WriteEntry we = walKey != null? walKey.getWriteEntry(): null; - if (doRollBackMemstore) { - for (Map.Entry> entry: tempMemstore.entrySet()) { - rollbackMemstore(entry.getKey(), entry.getValue()); - } - for (Map.Entry> entry: removedCellsForMemStore.entrySet()) { - entry.getKey().add(entry.getValue()); - } + if (!walSyncSuccess) { if (we != null) { mvcc.complete(we); } @@ -8270,7 +8251,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private Result doIncrement(Increment increment, long nonceGroup, long nonce) throws IOException { RowLock rowLock = null; WALKey walKey = null; - boolean doRollBackMemstore = false; + boolean walSyncSuccess = true; long accumulatedResultSize = 0; List allKVs = new ArrayList(increment.size()); Map> removedCellsForMemStore = new HashMap<>(); @@ -8351,7 +8332,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // Now write to memstore, a family at a time. - doRollBackMemstore = !forMemStore.isEmpty(); for (Map.Entry> entry: forMemStore.entrySet()) { Store store = entry.getKey(); List results = entry.getValue(); @@ -8375,24 +8355,27 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi rowLock.release(); rowLock = null; } + walSyncSuccess = false; // sync the transaction log outside the rowlock if(txid != 0) { syncOrDefer(txid, effectiveDurability); } - doRollBackMemstore = false; + walSyncSuccess = true; + } catch (Throwable t) { + // WAL sync failed. Aborting to avoid a mismatch between the memstore, WAL, + // and any replicated clusters. + if (!walSyncSuccess) { + rsServices.abort("WAL sync failed, aborting to preserve WAL as source of truth", t); + } + // Rethrow the exception. + throw t; } finally { if (rowLock != null) { rowLock.release(); } // if the wal sync was unsuccessful, remove keys from memstore WriteEntry we = walKey != null ? walKey.getWriteEntry() : null; - if (doRollBackMemstore) { - for (Map.Entry> entry: forMemStore.entrySet()) { - rollbackMemstore(entry.getKey(), entry.getValue()); - } - for (Map.Entry> entry: removedCellsForMemStore.entrySet()) { - entry.getKey().add(entry.getValue()); - } + if (!walSyncSuccess) { if (we != null) { mvcc.complete(we); } @@ -9342,4 +9325,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public RegionSplitPolicy getSplitPolicy() { return this.splitPolicy; } + + public void setRegionServerServices(RegionServerServices services) { + this.rsServices = services; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRollbackFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRollbackFromClient.java deleted file mode 100644 index f91adf42656..00000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRollbackFromClient.java +++ /dev/null @@ -1,361 +0,0 @@ -/** - * Copyright The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; -import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.wal.DefaultWALProvider; -import org.apache.hadoop.hbase.wal.WAL; -import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALKey; -import org.junit.AfterClass; -import org.junit.Assert; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; - -@Category(SmallTests.class) -public class TestRollbackFromClient { - @Rule - public TestName name = new TestName(); - private final static HBaseTestingUtility TEST_UTIL - = new HBaseTestingUtility(); - private static final byte[] FAMILY = Bytes.toBytes("testFamily"); - private static final int SLAVES = 3; - private static final byte[] ROW = Bytes.toBytes("testRow"); - private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier"); - private static final byte[] QUALIFIER_V2 = Bytes.toBytes("testQualifierV2"); - private static final byte[] VALUE = Bytes.toBytes("testValue"); - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); - TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, FailedDefaultWALProvider.class.getName()); - TEST_UTIL.startMiniCluster(SLAVES); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniCluster(); - } - - @Test - public void testAppendRollback() throws IOException { - Updater updateForEmptyTable = new Updater() { - @Override - public int updateData(Table table, byte[] family) { - try { - Append append = new Append(ROW); - append.add(FAMILY, QUALIFIER, VALUE); - append.add(FAMILY, QUALIFIER_V2, VALUE); - FailedHLog.SHOULD_FAIL.set(true); - table.append(append); - } catch (IOException e) { - // It should fail because the WAL fail also - } finally { - FailedHLog.SHOULD_FAIL.set(false); - } - return 0; - } - }; - testRollback(updateForEmptyTable, 1, null); - testRollback(updateForEmptyTable, 2, null); - - final Append preAppend = new Append(ROW); - preAppend.add(FAMILY, QUALIFIER, VALUE); - Cell initCell = preAppend.getCellList(FAMILY).get(0); - Updater updateForNonEmptyTable = new Updater() { - @Override - public int updateData(Table table, byte[] family) throws IOException { - table.append(preAppend); - try { - Append append = new Append(ROW); - append.add(FAMILY, QUALIFIER, VALUE); - append.add(FAMILY, QUALIFIER_V2, VALUE); - FailedHLog.SHOULD_FAIL.set(true); - table.append(append); - Assert.fail("It should fail because the WAL sync is failed"); - } catch (IOException e) { - } finally { - FailedHLog.SHOULD_FAIL.set(false); - } - return 1; - } - }; - testRollback(updateForNonEmptyTable, 1, initCell); - testRollback(updateForNonEmptyTable, 2, initCell); - } - - @Test - public void testIncrementRollback() throws IOException { - Updater updateForEmptyTable = new Updater() { - @Override - public int updateData(Table table, byte[] family) { - try { - Increment inc = new Increment(ROW); - inc.addColumn(FAMILY, QUALIFIER, 1); - inc.addColumn(FAMILY, QUALIFIER_V2, 2); - FailedHLog.SHOULD_FAIL.set(true); - table.increment(inc); - } catch (IOException e) { - // It should fail because the WAL fail also - } finally { - FailedHLog.SHOULD_FAIL.set(false); - } - return 0; - } - }; - testRollback(updateForEmptyTable, 1, null); - testRollback(updateForEmptyTable, 2, null); - - final Increment preIncrement = new Increment(ROW); - preIncrement.addColumn(FAMILY, QUALIFIER, 1); - Cell initCell = preIncrement.getCellList(FAMILY).get(0); - Updater updateForNonEmptyTable = new Updater() { - @Override - public int updateData(Table table, byte[] family) throws IOException { - table.increment(preIncrement); - try { - Increment inc = new Increment(ROW); - inc.addColumn(FAMILY, QUALIFIER, 1); - inc.addColumn(FAMILY, QUALIFIER_V2, 2); - FailedHLog.SHOULD_FAIL.set(true); - table.increment(inc); - Assert.fail("It should fail because the WAL sync is failed"); - } catch (IOException e) { - } finally { - FailedHLog.SHOULD_FAIL.set(false); - } - return 1; - } - }; - testRollback(updateForNonEmptyTable, 1, initCell); - testRollback(updateForNonEmptyTable, 2, initCell); - } - - @Test - public void testPutRollback() throws IOException { - Updater updateForEmptyTable = new Updater() { - @Override - public int updateData(Table table, byte[] family) { - try { - Put put = new Put(ROW); - put.addColumn(FAMILY, QUALIFIER, VALUE); - FailedHLog.SHOULD_FAIL.set(true); - table.put(put); - Assert.fail("It should fail because the WAL sync is failed"); - } catch (IOException e) { - } finally { - FailedHLog.SHOULD_FAIL.set(false); - } - return 0; - } - }; - testRollback(updateForEmptyTable, 1, null); - testRollback(updateForEmptyTable, 2, null); - - final Put prePut = new Put(ROW); - prePut.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("aaaaaaaaaaaaaaaaaaaaaa")); - Cell preCell = prePut.getCellList(FAMILY).get(0); - Updater updateForNonEmptyTable = new Updater() { - @Override - public int updateData(Table table, byte[] family) throws IOException { - table.put(prePut); - try { - Put put = new Put(ROW); - put.addColumn(FAMILY, QUALIFIER, VALUE); - FailedHLog.SHOULD_FAIL.set(true); - table.put(put); - Assert.fail("It should fail because the WAL sync is failed"); - } catch (IOException e) { - } finally { - FailedHLog.SHOULD_FAIL.set(false); - } - return 1; - } - }; - testRollback(updateForNonEmptyTable, 1, preCell); - testRollback(updateForNonEmptyTable, 2, preCell); - } - - private void testRollback(Updater updater, int versions, Cell initCell) throws IOException { - TableName tableName = TableName.valueOf(this.name.getMethodName()); - HTableDescriptor desc = new HTableDescriptor(tableName); - HColumnDescriptor col = new HColumnDescriptor(FAMILY); - col.setMaxVersions(versions); - desc.addFamily(col); - TEST_UTIL.getHBaseAdmin().createTable(desc); - int expected; - List cells; - try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); - Table table = conn.getTable(tableName)) { - expected = updater.updateData(table, FAMILY); - cells = getAllCells(table); - } - TEST_UTIL.getHBaseAdmin().disableTable(tableName); - TEST_UTIL.getHBaseAdmin().deleteTable(tableName); - assertEquals(expected, cells.size()); - if (initCell != null && cells.isEmpty()) { - Cell cell = cells.get(0); - assertTrue("row isn't matched", CellUtil.matchingRow(initCell, cell)); - assertTrue("column isn't matched", CellUtil.matchingColumn(initCell, cell)); - assertTrue("qualifier isn't matched", CellUtil.matchingQualifier(initCell, cell)); - assertTrue("value isn't matched", CellUtil.matchingValue(initCell, cell)); - } - } - - interface Updater { - int updateData(Table table, byte[] family) throws IOException; - } - - private static List getAllCells(Table table) throws IOException { - List cells = new ArrayList<>(); - try (ResultScanner scanner = table.getScanner(new Scan())) { - for (Result r : scanner) { - cells.addAll(r.listCells()); - } - return cells; - } - } - - public static class FailedDefaultWALProvider extends DefaultWALProvider { - @Override - public WAL getWAL(final byte[] identifier, byte[] namespace) throws IOException { - WAL wal = super.getWAL(identifier, namespace); - return new FailedHLog(wal); - } - } - - public static class FailedHLog implements WAL { - private static final AtomicBoolean SHOULD_FAIL = new AtomicBoolean(false); - private final WAL delegation; - FailedHLog(final WAL delegation) { - this.delegation = delegation; - } - @Override - public void registerWALActionsListener(WALActionsListener listener) { - delegation.registerWALActionsListener(listener); - } - - @Override - public boolean unregisterWALActionsListener(WALActionsListener listener) { - return delegation.unregisterWALActionsListener(listener); - } - - @Override - public byte[][] rollWriter() throws FailedLogCloseException, IOException { - return delegation.rollWriter(); - } - - @Override - public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException { - return delegation.rollWriter(force); - } - - @Override - public void shutdown() throws IOException { - delegation.shutdown(); - } - - @Override - public void close() throws IOException { - delegation.close(); - } - - @Override - public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) throws IOException { - return delegation.append(htd, info, key, edits, inMemstore); - } - - @Override - public void sync() throws IOException { - delegation.sync(); - } - - @Override - public void sync(long txid) throws IOException { - sync(txid, false); - } - - @Override - public void sync(boolean forceSync) throws IOException { - delegation.sync(forceSync); - } - - @Override - public void sync(long txid, boolean forceSync) throws IOException { - if (SHOULD_FAIL.get()) { - throw new IOException("[TESTING] we need the failure!!!"); - } - delegation.sync(txid, forceSync); - } - - @Override - public Long startCacheFlush(byte[] encodedRegionName, Set families) { - return delegation.startCacheFlush(encodedRegionName, families); - } - - @Override - public void completeCacheFlush(byte[] encodedRegionName) { - delegation.completeCacheFlush(encodedRegionName); - } - - @Override - public void abortCacheFlush(byte[] encodedRegionName) { - delegation.abortCacheFlush(encodedRegionName); - } - - @Override - public WALCoprocessorHost getCoprocessorHost() { - return delegation.getCoprocessorHost(); - } - - @Override - public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) { - return delegation.getEarliestMemstoreSeqNum(encodedRegionName); - } - - @Override - public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName) { - return delegation.getEarliestMemstoreSeqNum(encodedRegionName, familyName); - } - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java index 863d514078c..19691de473f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java @@ -177,6 +177,7 @@ public class TestFailedAppendAndSync { boolean threwOnBoth = false; HRegion region = initHRegion(tableName, null, null, CONF, dodgyWAL); + region.setRegionServerServices(services); try { // Get some random bytes. byte[] value = Bytes.toBytes(getName()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 338a07d31d2..8dab647481a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -38,6 +38,8 @@ import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -162,7 +164,6 @@ import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import org.apache.hadoop.hbase.util.PairOfSameType; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.DefaultWALProvider; -import org.apache.hadoop.hbase.wal.FaultyFSLog; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; @@ -178,6 +179,7 @@ import org.junit.experimental.categories.Category; import org.junit.rules.TestName; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; +import org.mockito.Matchers; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -311,57 +313,6 @@ public class TestHRegion { region = null; } - /* - * This test is for verifying memstore snapshot size is correctly updated in case of rollback - * See HBASE-10845 - */ - @Test (timeout=60000) - public void testMemstoreSnapshotSize() throws IOException { - class MyFaultyFSLog extends FaultyFSLog { - StoreFlushContext storeFlushCtx; - public MyFaultyFSLog(FileSystem fs, Path rootDir, String logName, Configuration conf) - throws IOException { - super(fs, rootDir, logName, conf); - } - - void setStoreFlushCtx(StoreFlushContext storeFlushCtx) { - this.storeFlushCtx = storeFlushCtx; - } - - @Override - public void sync(long txid, boolean forceSync) throws IOException { - storeFlushCtx.prepare(); - super.sync(txid, forceSync); - } - } - - FileSystem fs = FileSystem.get(CONF); - Path rootDir = new Path(dir + "testMemstoreSnapshotSize"); - MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize", CONF); - region = initHRegion(tableName, null, null, name.getMethodName(), - CONF, false, Durability.SYNC_WAL, faultyLog, COLUMN_FAMILY_BYTES); - - Store store = region.getStore(COLUMN_FAMILY_BYTES); - // Get some random bytes. - byte [] value = Bytes.toBytes(name.getMethodName()); - faultyLog.setStoreFlushCtx(store.createFlushContext(12345)); - - Put put = new Put(value); - put.add(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value); - faultyLog.setFailureType(FaultyFSLog.FailureType.SYNC); - - boolean threwIOE = false; - try { - region.put(put); - } catch (IOException ioe) { - threwIOE = true; - } finally { - assertTrue("The regionserver should have thrown an exception", threwIOE); - } - long sz = store.getFlushableSize(); - assertTrue("flushable size should be zero, but it is " + sz, sz == 0); - } - /** * Test for HBASE-14229: Flushing canceled by coprocessor still leads to memstoreSize set down */ @@ -551,6 +502,11 @@ public class TestHRegion { try { // Initialize region region = initHRegion(tableName, name.getMethodName(), conf, COLUMN_FAMILY_BYTES); + RegionServerServices services = mock(RegionServerServices.class); + doNothing().when(services).abort(anyString(), Matchers.any()); + doReturn(ServerName.valueOf("fake-server", 0, 0L)). when(services).getServerName(); + region.setRegionServerServices(services); + long size = region.getMemstoreSize(); Assert.assertEquals(0, size); // Put one item into memstore. Measure the size of one item in memstore. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogTimedOutSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogTimedOutSync.java new file mode 100644 index 00000000000..5554a685ef7 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogTimedOutSync.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import java.io.IOException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.exceptions.TimeoutIOException; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.mockito.Mockito; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +/* + Testing RS abort in case if sync fails/times out. + */ +@Category({MediumTests.class, RegionServerTests.class}) +public class TestFSHLogTimedOutSync { + private static final Log LOG = LogFactory.getLog(TestFSHLogTimedOutSync.class); + + @Rule public TestName name = new TestName(); + + private static final String COLUMN_FAMILY = "MyCF"; + private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY); + private static final String COLUMN_QUALIFIER = "MyCQ"; + private static final byte [] COLUMN_QUALIFIER_BYTES = Bytes.toBytes(COLUMN_QUALIFIER); + private static HBaseTestingUtility TEST_UTIL; + private static Configuration CONF ; + private String dir; + + // Test names + protected TableName tableName; + + @Before + public void setup() throws IOException { + TEST_UTIL = HBaseTestingUtility.createLocalHTU(); + CONF = TEST_UTIL.getConfiguration(); + // Disable block cache. + CONF.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f); + dir = TEST_UTIL.getDataTestDir("TestHRegion").toString(); + tableName = TableName.valueOf(name.getMethodName()); + } + + @After + public void tearDown() throws Exception { + LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir()); + TEST_UTIL.cleanupTestDir(); + } + + // Test that RS aborts in case of put, append and increment when sync fails or times out. + @Test(timeout=30000) + public void testRSAbortWhenSyncTimedOut() throws IOException { + // Dodgy WAL. Will throw exceptions when flags set. + class DodgyFSLog extends FSHLog { + volatile boolean throwSyncException = false; + + public DodgyFSLog(FileSystem fs, Path root, String logDir, Configuration conf) + throws IOException { + super(fs, root, logDir, conf); + } + + @Override + public void sync(long txid) throws IOException { + super.sync(txid); + if (throwSyncException) { + throw new TimeoutIOException("Exception"); + } + } + + @Override + public void sync(long txid, boolean force) throws IOException { + super.sync(txid, force); + if (throwSyncException) { + throw new TimeoutIOException("Exception"); + } + } + } + + // Make up mocked server and services. + RegionServerServices services = mock(RegionServerServices.class); + FileSystem fs = FileSystem.get(CONF); + Path rootDir = new Path(dir + getName()); + DodgyFSLog dodgyWAL = new DodgyFSLog(fs, rootDir, getName(), CONF); + HRegion region = initHRegion(tableName, null, null, CONF, dodgyWAL); + region.setRegionServerServices(services); + // Get some random bytes. + byte[] row = Bytes.toBytes(getName()); + byte[] value = Bytes.toBytes(getName()); + // Test Put operation + try { + dodgyWAL.throwSyncException = true; + Put put = new Put(row); + put.addColumn(COLUMN_FAMILY_BYTES, COLUMN_QUALIFIER_BYTES, value); + region.put(put); + fail(); + } catch (IOException ioe) { + assertTrue(ioe instanceof TimeoutIOException); + } + // Verify that RS aborts + Mockito.verify(services, Mockito.times(1)). + abort(Mockito.anyString(), Mockito.anyObject()); + + // Test Append operation + try { + dodgyWAL.throwSyncException = true; + Append a = new Append(row); + a.setReturnResults(false); + a.add(COLUMN_FAMILY_BYTES, COLUMN_QUALIFIER_BYTES, value); + region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE); + fail(); + } catch (IOException ioe) { + assertTrue(ioe instanceof TimeoutIOException); + } + // Verify that RS aborts + Mockito.verify(services, Mockito.times(2)). + abort(Mockito.anyString(), Mockito.anyObject()); + + // Test Increment operation + try { + dodgyWAL.throwSyncException = true; + final Increment inc = new Increment(row); + inc.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("qual2"), 1); + region.increment(inc, HConstants.NO_NONCE, HConstants.NO_NONCE); + fail(); + } catch (IOException ioe) { + assertTrue(ioe instanceof TimeoutIOException); + } + // Verify that RS aborts + Mockito.verify(services, Mockito.times(3)). + abort(Mockito.anyString(), Mockito.anyObject()); + } + + String getName() { + return name.getMethodName(); + } + + /** + * @return A region on which you must call + * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done. + */ + public HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, + Configuration conf, WAL wal) throws IOException { + return TEST_UTIL.createLocalHRegion(tableName.getName(), startKey, stopKey, + getName(), conf, false, Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES); + } +}