HBASE-13121 Async wal replication for region replicas and dist log replay does not work together

This commit is contained in:
Enis Soztutar 2015-03-09 15:49:09 -07:00
parent 61cc8e0de1
commit 5025d3aa91
11 changed files with 404 additions and 46 deletions

View File

@ -262,7 +262,10 @@ public class RpcClientImpl extends AbstractRpcClient {
try {
Connection.this.tracedWriteRequest(cts.call, cts.priority, cts.span);
} catch (IOException e) {
LOG.warn("call write error for call #" + cts.call.id + ", message =" + e.getMessage());
if (LOG.isDebugEnabled()) {
LOG.debug("call write error for call #" + cts.call.id
+ ", message =" + e.getMessage());
}
cts.call.setException(e);
markClosed(e);
}
@ -1132,6 +1135,7 @@ public class RpcClientImpl extends AbstractRpcClient {
* @throws InterruptedException
* @throws IOException
*/
@Override
protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, MethodDescriptor md,
Message param, Message returnType, User ticket, InetSocketAddress addr)
throws IOException, InterruptedException {

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
import org.apache.hadoop.hbase.regionserver.handler.FinishRegionRecoveringHandler;
import org.apache.hadoop.hbase.regionserver.handler.WALSplitterHandler;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.util.CancelableProgressable;
@ -455,11 +456,8 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
try {
if (ZKUtil.checkExists(watcher, nodePath) == -1) {
HRegion r = recoveringRegions.remove(region);
if (r != null) {
r.setRecovering(false);
}
LOG.debug("Mark recovering region:" + region + " up.");
server.getExecutorService().submit(
new FinishRegionRecoveringHandler(server, region, nodePath));
} else {
// current check is a defensive(or redundant) mechanism to prevent us from
// having stale recovering regions in our internal RS memory state while

View File

@ -269,6 +269,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* replication.
*/
protected volatile long lastReplayedOpenRegionSeqId = -1L;
protected volatile long lastReplayedCompactionSeqId = -1L;
/**
* Operation enum is used in {@link HRegion#startRegionOperation} to provide operation context for
@ -1153,6 +1154,46 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
*/
public void setRecovering(boolean newState) {
boolean wasRecovering = this.isRecovering;
// before we flip the recovering switch (enabling reads) we should write the region open
// event to WAL if needed
if (wal != null && getRegionServerServices() != null && !writestate.readOnly
&& wasRecovering && !newState) {
// force a flush only if region replication is set up for this region. Otherwise no need.
boolean forceFlush = getTableDesc().getRegionReplication() > 1;
// force a flush first
MonitoredTask status = TaskMonitor.get().createStatus(
"Flushing region " + this + " because recovery is finished");
try {
if (forceFlush) {
internalFlushcache(status);
}
status.setStatus("Writing region open event marker to WAL because recovery is finished");
try {
long seqId = openSeqNum;
// obtain a new seqId because we possibly have writes and flushes on top of openSeqNum
if (wal != null) {
seqId = getNextSequenceId(wal);
}
writeRegionOpenMarker(wal, seqId);
} catch (IOException e) {
// We cannot rethrow this exception since we are being called from the zk thread. The
// region has already opened. In this case we log the error, but continue
LOG.warn(getRegionInfo().getEncodedName() + " : was not able to write region opening "
+ "event to WAL, continueing", e);
}
} catch (IOException ioe) {
// Distributed log replay semantics does not necessarily require a flush, since the replayed
// data is already written again in the WAL. So failed flush should be fine.
LOG.warn(getRegionInfo().getEncodedName() + " : was not able to flush "
+ "event to WAL, continueing", ioe);
} finally {
status.cleanup();
}
}
this.isRecovering = newState;
if (wasRecovering && !isRecovering) {
// Call only when wal replay is over.
@ -2380,7 +2421,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @return Next sequence number unassociated with any actual edit.
* @throws IOException
*/
private long getNextSequenceId(final WAL wal) throws IOException {
@VisibleForTesting
protected long getNextSequenceId(final WAL wal) throws IOException {
WALKey key = this.appendEmptyEdit(wal, null);
return key.getSequenceId();
}
@ -4121,17 +4163,29 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
checkTargetRegion(compaction.getEncodedRegionName().toByteArray(),
"Compaction marker from WAL ", compaction);
synchronized (writestate) {
if (replaySeqId < lastReplayedOpenRegionSeqId) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Skipping replaying compaction event :" + TextFormat.shortDebugString(compaction)
+ " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
+ " of " + lastReplayedOpenRegionSeqId);
+ " because its sequence id " + replaySeqId + " is smaller than this regions "
+ "lastReplayedOpenRegionSeqId of " + lastReplayedOpenRegionSeqId);
return;
}
if (replaySeqId < lastReplayedCompactionSeqId) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Skipping replaying compaction event :" + TextFormat.shortDebugString(compaction)
+ " because its sequence id " + replaySeqId + " is smaller than this regions "
+ "lastReplayedCompactionSeqId of " + lastReplayedCompactionSeqId);
return;
} else {
lastReplayedCompactionSeqId = replaySeqId;
}
if (LOG.isDebugEnabled()) {
LOG.debug(getRegionInfo().getEncodedName() + " : "
+ "Replaying compaction marker " + TextFormat.shortDebugString(compaction));
+ "Replaying compaction marker " + TextFormat.shortDebugString(compaction)
+ " with seqId=" + replaySeqId + " and lastReplayedOpenRegionSeqId="
+ lastReplayedOpenRegionSeqId);
}
startRegionOperation(Operation.REPLAY_EVENT);
@ -4144,10 +4198,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
return;
}
store.replayCompactionMarker(compaction, pickCompactionFiles, removeFiles);
logRegionFiles();
} finally {
closeRegionOperation(Operation.REPLAY_EVENT);
}
}
}
void replayWALFlushMarker(FlushDescriptor flush, long replaySeqId) throws IOException {
checkTargetRegion(flush.getEncodedRegionName().toByteArray(),
@ -4184,6 +4240,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
TextFormat.shortDebugString(flush));
break;
}
logRegionFiles();
} finally {
closeRegionOperation(Operation.REPLAY_EVENT);
}
@ -4644,6 +4702,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
notifyAll(); // FindBugs NN_NAKED_NOTIFY
}
}
logRegionFiles();
} finally {
closeRegionOperation(Operation.REPLAY_EVENT);
}
@ -4849,6 +4908,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
}
private void logRegionFiles() {
if (LOG.isTraceEnabled()) {
LOG.trace(getRegionInfo().getEncodedName() + " : Store files for region: ");
for (Store s : stores.values()) {
for (StoreFile sf : s.getStorefiles()) {
LOG.trace(getRegionInfo().getEncodedName() + " : " + sf);
}
}
}
}
/** Checks whether the given regionName is either equal to our region, or that
* the regionName is the primary region to our corresponding range for the secondary replica.
*/
@ -4953,6 +5023,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
for (StoreFile storeFile: store.getStorefiles()) {
storeFileNames.add(storeFile.getPath().toString());
}
logRegionFiles();
}
}
return storeFileNames;
@ -6116,7 +6188,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
checkClassLoading();
this.openSeqNum = initialize(reporter);
this.setSequenceId(openSeqNum);
if (wal != null && getRegionServerServices() != null && !writestate.readOnly) {
if (wal != null && getRegionServerServices() != null && !writestate.readOnly
&& !isRecovering) {
// Only write the region open event marker to WAL if (1) we are not read-only
// (2) dist log replay is off or we are not recovering. In case region is
// recovering, the open event will be written at setRecovering(false)
writeRegionOpenMarker(wal, openSeqNum);
}
return this;
@ -7211,7 +7287,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
ClassSize.OBJECT +
ClassSize.ARRAY +
45 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
(12 * Bytes.SIZEOF_LONG) +
(13 * Bytes.SIZEOF_LONG) +
5 * Bytes.SIZEOF_BOOLEAN);
// woefully out of date - currently missing:

View File

@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.handler;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
public class FinishRegionRecoveringHandler extends EventHandler {
private static final Log LOG = LogFactory.getLog(FinishRegionRecoveringHandler.class);
protected final RegionServerServices rss;
protected final String regionName;
protected final String path;
public FinishRegionRecoveringHandler(RegionServerServices rss,
String regionName, String path) {
// we are using the open region handlers, since this operation is in the region open lifecycle
super(rss, EventType.M_RS_OPEN_REGION);
this.rss = rss;
this.regionName = regionName;
this.path = path;
}
@Override
public void process() throws IOException {
HRegion region = this.rss.getRecoveringRegions().remove(regionName);
if (region != null) {
region.setRecovering(false);
LOG.info(path + " deleted; " + regionName + " recovered.");
}
}
}

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* A base class WALEntryFilter implementations. Protects against changes in the interface signature.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public abstract class BaseWALEntryFilter implements WALEntryFilter {
}

View File

@ -46,9 +46,9 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionAdminServiceCallable;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.RetryingCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
@ -64,6 +64,8 @@ import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter;
import org.apache.hadoop.hbase.replication.BaseWALEntryFilter;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
@ -73,6 +75,7 @@ import org.apache.hadoop.util.StringUtils;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.protobuf.ServiceException;
/**
@ -104,6 +107,44 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
private ExecutorService pool;
/**
* Skips the entries which has original seqId. Only entries persisted via distributed log replay
* have their original seq Id fields set.
*/
private class SkipReplayedEditsFilter extends BaseWALEntryFilter {
@Override
public Entry filter(Entry entry) {
// if orig seq id is set, skip replaying the entry
if (entry.getKey().getOrigLogSeqNum() > 0) {
return null;
}
return entry;
}
}
@Override
public WALEntryFilter getWALEntryfilter() {
WALEntryFilter superFilter = super.getWALEntryfilter();
WALEntryFilter skipReplayedEditsFilter = getSkipReplayedEditsFilter();
if (superFilter == null) {
return skipReplayedEditsFilter;
}
if (skipReplayedEditsFilter == null) {
return superFilter;
}
ArrayList<WALEntryFilter> filters = Lists.newArrayList();
filters.add(superFilter);
filters.add(skipReplayedEditsFilter);
return new ChainWALEntryFilter(filters);
}
protected WALEntryFilter getSkipReplayedEditsFilter() {
return new SkipReplayedEditsFilter();
}
@Override
public void init(Context context) throws IOException {
super.init(context);
@ -139,7 +180,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
@Override
protected void doStart() {
try {
connection = (ClusterConnection) HConnectionManager.createConnection(ctx.getConfiguration());
connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
this.pool = getDefaultThreadPool(conf);
outputSink = new RegionReplicaOutputSink(controller, entryBuffers, connection, pool,
numWriterThreads, operationTimeout);

View File

@ -350,7 +350,7 @@ public class WALPrettyPrinter {
options.addOption("j", "json", false, "Output JSON");
options.addOption("p", "printvals", false, "Print values");
options.addOption("r", "region", true,
"Region to filter by. Pass region name; e.g. 'hbase:meta,,1'");
"Region to filter by. Pass encoded region name; e.g. '9192caead6a5a20acb4454ffbc79fa14'");
options.addOption("s", "sequence", true,
"Sequence to filter by. Pass sequence number.");
options.addOption("w", "row", true, "Row to filter by. Pass row name.");

View File

@ -21,8 +21,8 @@ package org.apache.hadoop.hbase.zookeeper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.handler.FinishRegionRecoveringHandler;
import org.apache.zookeeper.KeeperException;
/**
@ -47,6 +47,7 @@ public class RecoveringRegionWatcher extends ZooKeeperListener {
* Called when a node has been deleted
* @param path full path of the deleted node
*/
@Override
public void nodeDeleted(String path) {
if (this.server.isStopped() || this.server.isStopping()) {
return;
@ -58,12 +59,8 @@ public class RecoveringRegionWatcher extends ZooKeeperListener {
}
String regionName = path.substring(parentPath.length() + 1);
HRegion region = this.server.getRecoveringRegions().remove(regionName);
if (region != null) {
region.setRecovering(false);
}
LOG.info(path + " deleted; " + regionName + " recovered.");
server.getExecutorService().submit(new FinishRegionRecoveringHandler(server, regionName, path));
}
@Override

View File

@ -51,6 +51,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@ -131,6 +132,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion.RowLock;
import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
import org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.handler.FinishRegionRecoveringHandler;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
@ -167,6 +169,7 @@ import org.mockito.Mockito;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
/**
@ -5804,6 +5807,109 @@ public class TestHRegion {
}
}
// Helper for test testOpenRegionWrittenToWALForLogReplay
static class HRegionWithSeqId extends HRegion {
public HRegionWithSeqId(final Path tableDir, final WAL wal, final FileSystem fs,
final Configuration confParam, final HRegionInfo regionInfo,
final HTableDescriptor htd, final RegionServerServices rsServices) {
super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
}
@Override
protected long getNextSequenceId(WAL wal) throws IOException {
return 42;
}
}
@Test
@SuppressWarnings("unchecked")
public void testOpenRegionWrittenToWALForLogReplay() throws Exception {
// similar to the above test but with distributed log replay
final ServerName serverName = ServerName.valueOf("testOpenRegionWrittenToWALForLogReplay",
100, 42);
final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
HTableDescriptor htd
= new HTableDescriptor(TableName.valueOf("testOpenRegionWrittenToWALForLogReplay"));
htd.addFamily(new HColumnDescriptor(fam1));
htd.addFamily(new HColumnDescriptor(fam2));
HRegionInfo hri = new HRegionInfo(htd.getTableName(),
HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
// open the region w/o rss and wal and flush some files
HRegion region =
HBaseTestingUtility.createRegionAndWAL(hri, TEST_UTIL.getDataTestDir(), TEST_UTIL
.getConfiguration(), htd);
assertNotNull(region);
// create a file in fam1 for the region before opening in OpenRegionHandler
region.put(new Put(Bytes.toBytes("a")).add(fam1, fam1, fam1));
region.flushcache();
HBaseTestingUtility.closeRegionAndWAL(region);
ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
// capture append() calls
WAL wal = mock(WAL.class);
when(rss.getWAL((HRegionInfo) any())).thenReturn(wal);
// add the region to recovering regions
HashMap<String, HRegion> recoveringRegions = Maps.newHashMap();
recoveringRegions.put(region.getRegionInfo().getEncodedName(), null);
when(rss.getRecoveringRegions()).thenReturn(recoveringRegions);
try {
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.set(HConstants.REGION_IMPL, HRegionWithSeqId.class.getName());
region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
conf, rss, null);
// verify that we have not appended region open event to WAL because this region is still
// recovering
verify(wal, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any()
, editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List<Cell>)any());
// not put the region out of recovering state
new FinishRegionRecoveringHandler(rss, region.getRegionInfo().getEncodedName(), "/foo")
.prepare().process();
// now we should have put the entry
verify(wal, times(1)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any()
, editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List<Cell>)any());
WALEdit edit = editCaptor.getValue();
assertNotNull(edit);
assertNotNull(edit.getCells());
assertEquals(1, edit.getCells().size());
RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0));
assertNotNull(desc);
LOG.info("RegionEventDescriptor from WAL: " + desc);
assertEquals(RegionEventDescriptor.EventType.REGION_OPEN, desc.getEventType());
assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getName()));
assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(),
hri.getEncodedNameAsBytes()));
assertTrue(desc.getLogSequenceNumber() > 0);
assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer()));
assertEquals(2, desc.getStoresCount());
StoreDescriptor store = desc.getStores(0);
assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1));
assertEquals(store.getStoreHomeDir(), Bytes.toString(fam1));
assertEquals(1, store.getStoreFileCount()); // 1store file
assertFalse(store.getStoreFile(0).contains("/")); // ensure path is relative
store = desc.getStores(1);
assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2));
assertEquals(store.getStoreHomeDir(), Bytes.toString(fam2));
assertEquals(0, store.getStoreFileCount()); // no store files
} finally {
HBaseTestingUtility.closeRegionAndWAL(region);
}
}
@Test
@SuppressWarnings("unchecked")
public void testCloseRegionWrittenToWAL() throws Exception {

View File

@ -95,7 +95,7 @@ public class TestRegionReplicaFailover {
@Parameters
public static Collection<Object[]> getParameters() {
Object[][] params =
new Boolean[][] { {false} }; // TODO: enable dist log replay testing after HBASE-13121
new Boolean[][] { {true}, {false} };
return Arrays.asList(params);
}

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion;
import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -54,9 +55,12 @@ import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint.RegionReplicaReplayCallable;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.junit.After;
import org.junit.AfterClass;
@ -158,7 +162,7 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
}
}
@Test
@Test (timeout = 240000)
public void testReplayCallable() throws Exception {
// tests replaying the edits to a secondary region replica using the Callable directly
openRegion(HTU, rs0, hriSecondary);
@ -198,7 +202,7 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
}
}
@Test
@Test (timeout = 240000)
public void testReplayCallableWithRegionMove() throws Exception {
// tests replaying the edits to a secondary region replica using the Callable directly while
// the region is moved to another location.It tests handling of RME.
@ -233,7 +237,7 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
connection.close();
}
@Test
@Test (timeout = 240000)
public void testRegionReplicaReplicationEndpointReplicate() throws Exception {
// tests replaying the edits to a secondary region replica using the RRRE.replicate()
openRegion(HTU, rs0, hriSecondary);
@ -263,4 +267,52 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
connection.close();
}
@Test (timeout = 240000)
public void testReplayedEditsAreSkipped() throws Exception {
openRegion(HTU, rs0, hriSecondary);
ClusterConnection connection =
(ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration());
RegionReplicaReplicationEndpoint replicator = new RegionReplicaReplicationEndpoint();
ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
when(context.getConfiguration()).thenReturn(HTU.getConfiguration());
when(context.getMetrics()).thenReturn(mock(MetricsSource.class));
ReplicationPeer mockPeer = mock(ReplicationPeer.class);
when(mockPeer.getTableCFs()).thenReturn(null);
when(context.getReplicationPeer()).thenReturn(mockPeer);
replicator.init(context);
replicator.start();
// test the filter for the RE, not actual replication
WALEntryFilter filter = replicator.getWALEntryfilter();
//load some data to primary
HTU.loadNumericRows(table, f, 0, 1000);
Assert.assertEquals(1000, entries.size());
for (Entry e: entries) {
if (Integer.parseInt(Bytes.toString(e.getEdit().getCells().get(0).getValue())) % 2 == 0) {
e.getKey().setOrigLogSeqNum(1); // simulate dist log replay by setting orig seq id
}
}
long skipped = 0, replayed = 0;
for (Entry e : entries) {
if (filter.filter(e) == null) {
skipped++;
} else {
replayed++;
}
}
assertEquals(500, skipped);
assertEquals(500, replayed);
HTU.deleteNumericRows(table, f, 0, 1000);
closeRegion(HTU, rs0, hriSecondary);
connection.close();
}
}