diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 5a064f7f203..e6830efaa9c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -2206,7 +2206,8 @@ public class HRegion implements HeapSize { // , Writable{ Mutation mutation = batchOp.operations[firstIndex]; if (walEdit.size() > 0) { txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(), - walEdit, mutation.getClusterIds(), now, this.htableDescriptor); + walEdit, mutation.getClusterIds(), now, this.htableDescriptor, + this.getCoprocessorHost()); } // ------------------------------- @@ -4531,9 +4532,9 @@ public class HRegion implements HeapSize { // , Writable{ long txid = 0; // 7. Append no sync if (!walEdit.isEmpty()) { - txid = this.log.appendNoSync(this.getRegionInfo(), - this.htableDescriptor.getTableName(), walEdit, - processor.getClusterIds(), now, this.htableDescriptor); + txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(), + walEdit, processor.getClusterIds(), now, this.htableDescriptor, + this.getCoprocessorHost()); } // 8. Release region lock if (locked) { @@ -4760,7 +4761,7 @@ public class HRegion implements HeapSize { // , Writable{ // as a Put. txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(), walEdits, new ArrayList(), EnvironmentEdgeManager.currentTimeMillis(), - this.htableDescriptor); + this.htableDescriptor, this.getCoprocessorHost()); } else { recordMutationWithoutWal(append.getFamilyCellMap()); } @@ -4908,7 +4909,7 @@ public class HRegion implements HeapSize { // , Writable{ // as a Put. txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(), walEdits, new ArrayList(), EnvironmentEdgeManager.currentTimeMillis(), - this.htableDescriptor); + this.htableDescriptor, this.getCoprocessorHost()); } else { recordMutationWithoutWal(increment.getFamilyCellMap()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index b29bb719867..180677e0289 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.DrainBarrier; @@ -840,7 +841,7 @@ class FSHLog implements HLog, Syncable { @Override public void append(HRegionInfo info, TableName tableName, WALEdit edits, final long now, HTableDescriptor htd, boolean isInMemstore) throws IOException { - append(info, tableName, edits, new ArrayList(), now, htd, true, isInMemstore); + append(info, tableName, edits, new ArrayList(), now, htd, true, isInMemstore, null); } /** @@ -870,9 +871,9 @@ class FSHLog implements HLog, Syncable { * @throws IOException */ @SuppressWarnings("deprecation") - private long append(HRegionInfo info, TableName tableName, WALEdit edits, - List clusterIds, final long now, HTableDescriptor htd, boolean doSync, - boolean isInMemstore) + private long append(HRegionInfo info, TableName tableName, WALEdit edits, List clusterIds, + final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore, + RegionCoprocessorHost regionCoproHost) throws IOException { if (edits.isEmpty()) return this.unflushedEntries.get(); if (this.closed) { @@ -893,7 +894,7 @@ class FSHLog implements HLog, Syncable { byte [] encodedRegionName = info.getEncodedNameAsBytes(); if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum); HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterIds); - doWrite(info, logKey, edits, htd); + doWrite(info, logKey, edits, htd, regionCoproHost); this.numEntries.incrementAndGet(); txid = this.unflushedEntries.incrementAndGet(); if (htd.isDeferredLogFlush()) { @@ -916,9 +917,10 @@ class FSHLog implements HLog, Syncable { @Override public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits, - List clusterIds, final long now, HTableDescriptor htd) + List clusterIds, final long now, HTableDescriptor htd, + RegionCoprocessorHost regionCoproHost) throws IOException { - return append(info, tableName, edits, clusterIds, now, htd, false, true); + return append(info, tableName, edits, clusterIds, now, htd, false, true, regionCoproHost); } /** @@ -1203,7 +1205,7 @@ class FSHLog implements HLog, Syncable { // TODO: Remove info. Unused. protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit, - HTableDescriptor htd) + HTableDescriptor htd, RegionCoprocessorHost regionCoproHost) throws IOException { if (!this.enabled) { return; @@ -1220,12 +1222,18 @@ class FSHLog implements HLog, Syncable { if (logEdit.isReplay()) { // set replication scope null so that this won't be replicated logKey.setScopes(null); + if(regionCoproHost != null) { + regionCoproHost.preWALRestore(info, logKey, logEdit); + } } // write to our buffer for the Hlog file. logSyncer.append(new FSHLog.Entry(logKey, logEdit)); } long took = EnvironmentEdgeManager.currentTimeMillis() - now; coprocessorHost.postWALWrite(info, logKey, logEdit); + if(logEdit.isReplay() && regionCoproHost != null ) { + regionCoproHost.postWALRestore(info, logKey, logEdit); + } long len = 0; for (KeyValue kv : logEdit.getKeyValues()) { len += kv.getLength(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 8528863edab..31b693e2a44 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer; +import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; import org.apache.hadoop.io.Writable; @@ -300,7 +301,8 @@ public interface HLog { * @throws IOException */ public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits, - List clusterIds, final long now, HTableDescriptor htd) throws IOException; + List clusterIds, final long now, HTableDescriptor htd, + RegionCoprocessorHost regionCoproHost) throws IOException; // TODO: Do we need all these versions of sync? void hsync() throws IOException; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java index 22ac63d27fb..edf1c37e8a1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; import java.util.NavigableSet; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.collect.ImmutableList; @@ -36,6 +37,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Mutation; @@ -54,6 +56,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; @@ -103,6 +106,15 @@ public class SimpleRegionObserver extends BaseRegionObserver { final AtomicInteger ctPostBulkLoadHFile = new AtomicInteger(0); final AtomicInteger ctPreBatchMutate = new AtomicInteger(0); final AtomicInteger ctPostBatchMutate = new AtomicInteger(0); + final AtomicInteger ctPreWALRestore = new AtomicInteger(0); + final AtomicInteger ctPostWALRestore = new AtomicInteger(0); + + + final AtomicBoolean throwOnPostFlush = new AtomicBoolean(false); + + public void setThrowOnPostFlush(Boolean val){ + throwOnPostFlush.set(val); + } @Override public void start(CoprocessorEnvironment e) throws IOException { @@ -144,7 +156,7 @@ public class SimpleRegionObserver extends BaseRegionObserver { @Override public InternalScanner preFlush(ObserverContext c, - Store store, InternalScanner scanner) { + Store store, InternalScanner scanner) throws IOException { ctPreFlush.incrementAndGet(); return scanner; } @@ -158,8 +170,11 @@ public class SimpleRegionObserver extends BaseRegionObserver { @Override public void postFlush(ObserverContext c, - Store store, StoreFile resultFile) { + Store store, StoreFile resultFile) throws IOException { ctPostFlush.incrementAndGet(); + if (throwOnPostFlush.get()){ + throw new IOException("throwOnPostFlush is true in postFlush"); + } } public boolean wasFlushed() { @@ -502,6 +517,19 @@ public class SimpleRegionObserver extends BaseRegionObserver { return hasLoaded; } + @Override + public void preWALRestore(ObserverContext env, HRegionInfo info, + HLogKey logKey, WALEdit logEdit) throws IOException { + ctPreWALRestore.incrementAndGet(); + } + + @Override + public void postWALRestore(ObserverContext env, + HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException { + ctPostWALRestore.incrementAndGet(); + } + + public boolean hadPreGet() { return ctPreGet.get() > 0; } @@ -666,4 +694,12 @@ public class SimpleRegionObserver extends BaseRegionObserver { public int getCtPostIncrement() { return ctPostIncrement.get(); } + + public int getCtPreWALRestore() { + return ctPreWALRestore.get(); + } + + public int getCtPostWALRestore() { + return ctPostWALRestore.get(); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java index b1f2c6ac947..6daa3896d71 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java @@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -69,6 +70,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.apache.hadoop.hbase.util.Threads; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -92,6 +94,7 @@ public class TestRegionObserverInterface { public static void setupBeforeClass() throws Exception { // set configure to indicate which cp should be loaded Configuration conf = util.getConfiguration(); + conf.setBoolean("hbase.master.distributed.log.replay", true); conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver"); @@ -487,11 +490,66 @@ public class TestRegionObserverInterface { table.close(); } + @Test + public void testRecovery() throws Exception { + LOG.info(TestRegionObserverInterface.class.getName()+".testRecovery"); + TableName tableName = TEST_TABLE; + + HTable table = util.createTable(tableName, new byte[][] {A, B, C}); + + JVMClusterUtil.RegionServerThread rs1 = cluster.startRegionServer(); + ServerName sn2 = rs1.getRegionServer().getServerName(); + String regEN = table.getRegionLocations().firstEntry().getKey().getEncodedName(); + + util.getHBaseAdmin().move(regEN.getBytes(), sn2.getServerName().getBytes()); + while (!sn2.equals(table.getRegionLocations().firstEntry().getValue() )){ + Thread.sleep(100); + } + + Put put = new Put(ROW); + put.add(A, A, A); + put.add(B, B, B); + put.add(C, C, C); + table.put(put); + + verifyMethodResult(SimpleRegionObserver.class, + new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", + "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"}, + TEST_TABLE, + new Boolean[] {false, false, true, true, true, true, false} + ); + + verifyMethodResult(SimpleRegionObserver.class, + new String[] {"getCtPreWALRestore", "getCtPostWALRestore", "getCtPrePut", "getCtPostPut"}, + TEST_TABLE, + new Integer[] {0, 0, 1, 1}); + + cluster.killRegionServer(rs1.getRegionServer().getServerName()); + Threads.sleep(20000); // just to be sure that the kill has fully started. + util.waitUntilAllRegionsAssigned(tableName); + + verifyMethodResult(SimpleRegionObserver.class, + new String[]{"getCtPreWALRestore", "getCtPostWALRestore"}, + TEST_TABLE, + new Integer[]{1, 1}); + + verifyMethodResult(SimpleRegionObserver.class, + new String[]{"getCtPrePut", "getCtPostPut"}, + TEST_TABLE, + new Integer[]{0, 0}); + + util.deleteTable(tableName); + table.close(); + } + // check each region whether the coprocessor upcalls are called or not. - private void verifyMethodResult(Class c, String methodName[], TableName tableName, + private void verifyMethodResult(Class c, String methodName[], TableName tableName, Object value[]) throws IOException { try { for (JVMClusterUtil.RegionServerThread t : cluster.getRegionServerThreads()) { + if (!t.isAlive() || t.getRegionServer().isAborted() || t.getRegionServer().isStopping()){ + continue; + } for (HRegionInfo r : ProtobufUtil.getOnlineRegions(t.getRegionServer())) { if (!r.getTable().equals(tableName)) { continue; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 941fa132d19..7afcf13e9f7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -3849,8 +3849,8 @@ public class TestHRegion extends HBaseTestCase { //verify append called or not verify(log, expectAppend ? times(1) : never()) - .appendNoSync((HRegionInfo)any(), eq(tableName), - (WALEdit)any(), (List)any(), anyLong(), (HTableDescriptor)any()); + .appendNoSync((HRegionInfo)any(), eq(tableName), (WALEdit)any(), (List)any(), + anyLong(), (HTableDescriptor)any(), (RegionCoprocessorHost)any()); //verify sync called or not if (expectSync || expectSyncFromLogSyncer) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java index f76a5bc8983..9c0704632ed 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java @@ -104,7 +104,7 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool HRegionInfo hri = region.getRegionInfo(); if (this.noSync) { hlog.appendNoSync(hri, hri.getTable(), walEdit, - new ArrayList(), now, htd); + new ArrayList(), now, htd, null); } else { hlog.append(hri, hri.getTable(), walEdit, now, htd); } @@ -191,7 +191,7 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool LOG.info("Rolling after " + appends + " edits"); rollWriter(); } - super.doWrite(info, logKey, logEdit, htd); + super.doWrite(info, logKey, logEdit, htd, null); }; }; hlog.rollWriter();