hbase-9390: coprocessors observers are not called during a recovery with the new log replay algorithm - 1

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1523172 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
jeffreyz 2013-09-14 01:44:53 +00:00
parent ed509f750c
commit 41f0f6f9d1
7 changed files with 127 additions and 22 deletions

View File

@ -2206,7 +2206,8 @@ public class HRegion implements HeapSize { // , Writable{
Mutation mutation = batchOp.operations[firstIndex];
if (walEdit.size() > 0) {
txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
walEdit, mutation.getClusterIds(), now, this.htableDescriptor);
walEdit, mutation.getClusterIds(), now, this.htableDescriptor,
this.getCoprocessorHost());
}
// -------------------------------
@ -4531,9 +4532,9 @@ public class HRegion implements HeapSize { // , Writable{
long txid = 0;
// 7. Append no sync
if (!walEdit.isEmpty()) {
txid = this.log.appendNoSync(this.getRegionInfo(),
this.htableDescriptor.getTableName(), walEdit,
processor.getClusterIds(), now, this.htableDescriptor);
txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
walEdit, processor.getClusterIds(), now, this.htableDescriptor,
this.getCoprocessorHost());
}
// 8. Release region lock
if (locked) {
@ -4760,7 +4761,7 @@ public class HRegion implements HeapSize { // , Writable{
// as a Put.
txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
walEdits, new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(),
this.htableDescriptor);
this.htableDescriptor, this.getCoprocessorHost());
} else {
recordMutationWithoutWal(append.getFamilyCellMap());
}
@ -4908,7 +4909,7 @@ public class HRegion implements HeapSize { // , Writable{
// as a Put.
txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
walEdits, new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(),
this.htableDescriptor);
this.htableDescriptor, this.getCoprocessorHost());
} else {
recordMutationWithoutWal(increment.getFamilyCellMap());
}

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.DrainBarrier;
@ -840,7 +841,7 @@ class FSHLog implements HLog, Syncable {
@Override
public void append(HRegionInfo info, TableName tableName, WALEdit edits,
final long now, HTableDescriptor htd, boolean isInMemstore) throws IOException {
append(info, tableName, edits, new ArrayList<UUID>(), now, htd, true, isInMemstore);
append(info, tableName, edits, new ArrayList<UUID>(), now, htd, true, isInMemstore, null);
}
/**
@ -870,9 +871,9 @@ class FSHLog implements HLog, Syncable {
* @throws IOException
*/
@SuppressWarnings("deprecation")
private long append(HRegionInfo info, TableName tableName, WALEdit edits,
List<UUID> clusterIds, final long now, HTableDescriptor htd, boolean doSync,
boolean isInMemstore)
private long append(HRegionInfo info, TableName tableName, WALEdit edits, List<UUID> clusterIds,
final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore,
RegionCoprocessorHost regionCoproHost)
throws IOException {
if (edits.isEmpty()) return this.unflushedEntries.get();
if (this.closed) {
@ -893,7 +894,7 @@ class FSHLog implements HLog, Syncable {
byte [] encodedRegionName = info.getEncodedNameAsBytes();
if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterIds);
doWrite(info, logKey, edits, htd);
doWrite(info, logKey, edits, htd, regionCoproHost);
this.numEntries.incrementAndGet();
txid = this.unflushedEntries.incrementAndGet();
if (htd.isDeferredLogFlush()) {
@ -916,9 +917,10 @@ class FSHLog implements HLog, Syncable {
@Override
public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
List<UUID> clusterIds, final long now, HTableDescriptor htd)
List<UUID> clusterIds, final long now, HTableDescriptor htd,
RegionCoprocessorHost regionCoproHost)
throws IOException {
return append(info, tableName, edits, clusterIds, now, htd, false, true);
return append(info, tableName, edits, clusterIds, now, htd, false, true, regionCoproHost);
}
/**
@ -1203,7 +1205,7 @@ class FSHLog implements HLog, Syncable {
// TODO: Remove info. Unused.
protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit,
HTableDescriptor htd)
HTableDescriptor htd, RegionCoprocessorHost regionCoproHost)
throws IOException {
if (!this.enabled) {
return;
@ -1220,12 +1222,18 @@ class FSHLog implements HLog, Syncable {
if (logEdit.isReplay()) {
// set replication scope null so that this won't be replicated
logKey.setScopes(null);
if(regionCoproHost != null) {
regionCoproHost.preWALRestore(info, logKey, logEdit);
}
}
// write to our buffer for the Hlog file.
logSyncer.append(new FSHLog.Entry(logKey, logEdit));
}
long took = EnvironmentEdgeManager.currentTimeMillis() - now;
coprocessorHost.postWALWrite(info, logKey, logEdit);
if(logEdit.isReplay() && regionCoproHost != null ) {
regionCoproHost.postWALRestore(info, logKey, logEdit);
}
long len = 0;
for (KeyValue kv : logEdit.getKeyValues()) {
len += kv.getLength();

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.io.Writable;
@ -300,7 +301,8 @@ public interface HLog {
* @throws IOException
*/
public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
List<UUID> clusterIds, final long now, HTableDescriptor htd) throws IOException;
List<UUID> clusterIds, final long now, HTableDescriptor htd,
RegionCoprocessorHost regionCoproHost) throws IOException;
// TODO: Do we need all these versions of sync?
void hsync() throws IOException;

View File

@ -28,6 +28,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.ImmutableList;
@ -36,6 +37,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
@ -54,6 +56,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@ -103,6 +106,15 @@ public class SimpleRegionObserver extends BaseRegionObserver {
final AtomicInteger ctPostBulkLoadHFile = new AtomicInteger(0);
final AtomicInteger ctPreBatchMutate = new AtomicInteger(0);
final AtomicInteger ctPostBatchMutate = new AtomicInteger(0);
final AtomicInteger ctPreWALRestore = new AtomicInteger(0);
final AtomicInteger ctPostWALRestore = new AtomicInteger(0);
final AtomicBoolean throwOnPostFlush = new AtomicBoolean(false);
public void setThrowOnPostFlush(Boolean val){
throwOnPostFlush.set(val);
}
@Override
public void start(CoprocessorEnvironment e) throws IOException {
@ -144,7 +156,7 @@ public class SimpleRegionObserver extends BaseRegionObserver {
@Override
public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, InternalScanner scanner) {
Store store, InternalScanner scanner) throws IOException {
ctPreFlush.incrementAndGet();
return scanner;
}
@ -158,8 +170,11 @@ public class SimpleRegionObserver extends BaseRegionObserver {
@Override
public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, StoreFile resultFile) {
Store store, StoreFile resultFile) throws IOException {
ctPostFlush.incrementAndGet();
if (throwOnPostFlush.get()){
throw new IOException("throwOnPostFlush is true in postFlush");
}
}
public boolean wasFlushed() {
@ -502,6 +517,19 @@ public class SimpleRegionObserver extends BaseRegionObserver {
return hasLoaded;
}
@Override
public void preWALRestore(ObserverContext<RegionCoprocessorEnvironment> env, HRegionInfo info,
HLogKey logKey, WALEdit logEdit) throws IOException {
ctPreWALRestore.incrementAndGet();
}
@Override
public void postWALRestore(ObserverContext<RegionCoprocessorEnvironment> env,
HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException {
ctPostWALRestore.incrementAndGet();
}
public boolean hadPreGet() {
return ctPreGet.get() > 0;
}
@ -666,4 +694,12 @@ public class SimpleRegionObserver extends BaseRegionObserver {
public int getCtPostIncrement() {
return ctPostIncrement.get();
}
public int getCtPreWALRestore() {
return ctPreWALRestore.get();
}
public int getCtPostWALRestore() {
return ctPostWALRestore.get();
}
}

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
@ -69,6 +70,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -92,6 +94,7 @@ public class TestRegionObserverInterface {
public static void setupBeforeClass() throws Exception {
// set configure to indicate which cp should be loaded
Configuration conf = util.getConfiguration();
conf.setBoolean("hbase.master.distributed.log.replay", true);
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
"org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver");
@ -487,11 +490,66 @@ public class TestRegionObserverInterface {
table.close();
}
@Test
public void testRecovery() throws Exception {
LOG.info(TestRegionObserverInterface.class.getName()+".testRecovery");
TableName tableName = TEST_TABLE;
HTable table = util.createTable(tableName, new byte[][] {A, B, C});
JVMClusterUtil.RegionServerThread rs1 = cluster.startRegionServer();
ServerName sn2 = rs1.getRegionServer().getServerName();
String regEN = table.getRegionLocations().firstEntry().getKey().getEncodedName();
util.getHBaseAdmin().move(regEN.getBytes(), sn2.getServerName().getBytes());
while (!sn2.equals(table.getRegionLocations().firstEntry().getValue() )){
Thread.sleep(100);
}
Put put = new Put(ROW);
put.add(A, A, A);
put.add(B, B, B);
put.add(C, C, C);
table.put(put);
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
"hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"},
TEST_TABLE,
new Boolean[] {false, false, true, true, true, true, false}
);
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"getCtPreWALRestore", "getCtPostWALRestore", "getCtPrePut", "getCtPostPut"},
TEST_TABLE,
new Integer[] {0, 0, 1, 1});
cluster.killRegionServer(rs1.getRegionServer().getServerName());
Threads.sleep(20000); // just to be sure that the kill has fully started.
util.waitUntilAllRegionsAssigned(tableName);
verifyMethodResult(SimpleRegionObserver.class,
new String[]{"getCtPreWALRestore", "getCtPostWALRestore"},
TEST_TABLE,
new Integer[]{1, 1});
verifyMethodResult(SimpleRegionObserver.class,
new String[]{"getCtPrePut", "getCtPostPut"},
TEST_TABLE,
new Integer[]{0, 0});
util.deleteTable(tableName);
table.close();
}
// check each region whether the coprocessor upcalls are called or not.
private void verifyMethodResult(Class c, String methodName[], TableName tableName,
private void verifyMethodResult(Class<?> c, String methodName[], TableName tableName,
Object value[]) throws IOException {
try {
for (JVMClusterUtil.RegionServerThread t : cluster.getRegionServerThreads()) {
if (!t.isAlive() || t.getRegionServer().isAborted() || t.getRegionServer().isStopping()){
continue;
}
for (HRegionInfo r : ProtobufUtil.getOnlineRegions(t.getRegionServer())) {
if (!r.getTable().equals(tableName)) {
continue;

View File

@ -3849,8 +3849,8 @@ public class TestHRegion extends HBaseTestCase {
//verify append called or not
verify(log, expectAppend ? times(1) : never())
.appendNoSync((HRegionInfo)any(), eq(tableName),
(WALEdit)any(), (List<UUID>)any(), anyLong(), (HTableDescriptor)any());
.appendNoSync((HRegionInfo)any(), eq(tableName), (WALEdit)any(), (List<UUID>)any(),
anyLong(), (HTableDescriptor)any(), (RegionCoprocessorHost)any());
//verify sync called or not
if (expectSync || expectSyncFromLogSyncer) {

View File

@ -104,7 +104,7 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool
HRegionInfo hri = region.getRegionInfo();
if (this.noSync) {
hlog.appendNoSync(hri, hri.getTable(), walEdit,
new ArrayList<UUID>(), now, htd);
new ArrayList<UUID>(), now, htd, null);
} else {
hlog.append(hri, hri.getTable(), walEdit, now, htd);
}
@ -191,7 +191,7 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool
LOG.info("Rolling after " + appends + " edits");
rollWriter();
}
super.doWrite(info, logKey, logEdit, htd);
super.doWrite(info, logKey, logEdit, htd, null);
};
};
hlog.rollWriter();