diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
index 9400a2cd9d1..74924747e16 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
@@ -262,7 +262,10 @@ public class RpcClientImpl extends AbstractRpcClient {
         try {
           Connection.this.tracedWriteRequest(cts.call, cts.priority, cts.span);
         } catch (IOException e) {
-          LOG.warn("call write error for call #" + cts.call.id + ", message =" + e.getMessage());
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("call write error for call #" + cts.call.id
+                + ", message =" + e.getMessage());
+          }
           cts.call.setException(e);
           markClosed(e);
         }
@@ -1132,6 +1135,7 @@ public class RpcClientImpl extends AbstractRpcClient {
    * @throws InterruptedException
    * @throws IOException
    */
+  @Override
  protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, MethodDescriptor md,
      Message param, Message returnType, User ticket, InetSocketAddress addr)
      throws IOException, InterruptedException {
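The hunk above demotes the per-call write failure from WARN to DEBUG and wraps it in a level guard; the failure still reaches the caller through cts.call.setException(e). A minimal sketch of the guard pattern in a hypothetical class (commons-logging has no parameterized messages, so the guard is what keeps the concatenation off the hot write path):

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    class GuardedLoggingSketch {
      private static final Log LOG = LogFactory.getLog(GuardedLoggingSketch.class);

      void onWriteError(int callId, Exception e) {
        // Without the guard, the string concatenation below would run even
        // when debug output is disabled.
        if (LOG.isDebugEnabled()) {
          LOG.debug("call write error for call #" + callId + ", message=" + e.getMessage());
        }
      }
    }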
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
index 9ea6bd70c1d..32df146d7bc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
 import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
+import org.apache.hadoop.hbase.regionserver.handler.FinishRegionRecoveringHandler;
 import org.apache.hadoop.hbase.regionserver.handler.WALSplitterHandler;
 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
@@ -455,11 +456,8 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
       String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
       try {
         if (ZKUtil.checkExists(watcher, nodePath) == -1) {
-          HRegion r = recoveringRegions.remove(region);
-          if (r != null) {
-            r.setRecovering(false);
-          }
-          LOG.debug("Mark recovering region:" + region + " up.");
+          server.getExecutorService().submit(
+            new FinishRegionRecoveringHandler(server, region, nodePath));
         } else {
           // current check is a defensive(or redundant) mechanism to prevent us from
           // having stale recovering regions in our internal RS memory state while
@@ -583,7 +581,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
    * Next part is related to WALSplitterHandler
    */
   /**
-   * endTask() can fail and the only way to recover out of it is for the 
+   * endTask() can fail and the only way to recover out of it is for the
    * {@link org.apache.hadoop.hbase.master.SplitLogManager} to timeout the task node.
    * @param slt
    * @param ctr
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 6c9c31c7449..a75b956afc3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -269,6 +269,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    * replication.
    */
   protected volatile long lastReplayedOpenRegionSeqId = -1L;
+  protected volatile long lastReplayedCompactionSeqId = -1L;
 
   /**
    * Operation enum is used in {@link HRegion#startRegionOperation} to provide operation context for
@@ -1153,6 +1154,46 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    */
   public void setRecovering(boolean newState) {
     boolean wasRecovering = this.isRecovering;
+    // before we flip the recovering switch (enabling reads) we should write the region open
+    // event to WAL if needed
+    if (wal != null && getRegionServerServices() != null && !writestate.readOnly
+        && wasRecovering && !newState) {
+
+      // force a flush only if region replication is set up for this region. Otherwise no need.
+      boolean forceFlush = getTableDesc().getRegionReplication() > 1;
+
+      // force a flush first
+      MonitoredTask status = TaskMonitor.get().createStatus(
+        "Flushing region " + this + " because recovery is finished");
+      try {
+        if (forceFlush) {
+          internalFlushcache(status);
+        }
+
+        status.setStatus("Writing region open event marker to WAL because recovery is finished");
+        try {
+          long seqId = openSeqNum;
+          // obtain a new seqId because we possibly have writes and flushes on top of openSeqNum
+          if (wal != null) {
+            seqId = getNextSequenceId(wal);
+          }
+          writeRegionOpenMarker(wal, seqId);
+        } catch (IOException e) {
+          // We cannot rethrow this exception since we are being called from the zk thread. The
+          // region has already opened. In this case we log the error, but continue
+          LOG.warn(getRegionInfo().getEncodedName() + " : was not able to write region opening "
+              + "event to WAL, continuing", e);
+        }
+      } catch (IOException ioe) {
+        // Distributed log replay semantics do not necessarily require a flush, since the
+        // replayed data is already written again in the WAL. So a failed flush should be fine.
+        LOG.warn(getRegionInfo().getEncodedName() + " : was not able to flush "
+            + "the memstore, continuing", ioe);
+      } finally {
+        status.cleanup();
+      }
+    }
+
     this.isRecovering = newState;
     if (wasRecovering && !isRecovering) {
       // Call only when wal replay is over.
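With the hunk above, a region finishing distributed log replay writes a region open event marker before reads are enabled, flushing first only when the table has secondary replicas so they can pick the replayed edits up as store files. A condensed sketch of the added control flow as it would sit inside HRegion (method names are from the patch; the MonitoredTask bookkeeping and error handling are elided):

    // Runs only when wasRecovering && !newState on a writable region with a WAL.
    private void finishRecoverySketch(MonitoredTask status) throws IOException {
      if (getTableDesc().getRegionReplication() > 1) {
        internalFlushcache(status);          // make replayed edits visible as files
      }
      long seqId = getNextSequenceId(wal);   // fresh id: writes/flushes landed after openSeqNum
      writeRegionOpenMarker(wal, seqId);     // replicas treat this as the new open point
    }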
@@ -2380,7 +2421,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    * @return Next sequence number unassociated with any actual edit.
    * @throws IOException
    */
-  private long getNextSequenceId(final WAL wal) throws IOException {
+  @VisibleForTesting
+  protected long getNextSequenceId(final WAL wal) throws IOException {
     WALKey key = this.appendEmptyEdit(wal, null);
     return key.getSequenceId();
   }
@@ -4121,31 +4163,45 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     checkTargetRegion(compaction.getEncodedRegionName().toByteArray(),
       "Compaction marker from WAL ", compaction);
 
-    if (replaySeqId < lastReplayedOpenRegionSeqId) {
-      LOG.warn(getRegionInfo().getEncodedName() + " : "
-          + "Skipping replaying compaction event :" + TextFormat.shortDebugString(compaction)
-          + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
-          + " of " + lastReplayedOpenRegionSeqId);
-      return;
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(getRegionInfo().getEncodedName() + " : "
-          + "Replaying compaction marker " + TextFormat.shortDebugString(compaction));
-    }
-
-    startRegionOperation(Operation.REPLAY_EVENT);
-    try {
-      Store store = this.getStore(compaction.getFamilyName().toByteArray());
-      if (store == null) {
+    synchronized (writestate) {
+      if (replaySeqId < lastReplayedOpenRegionSeqId) {
         LOG.warn(getRegionInfo().getEncodedName() + " : "
-            + "Found Compaction WAL edit for deleted family:"
-            + Bytes.toString(compaction.getFamilyName().toByteArray()));
+            + "Skipping replaying compaction event :" + TextFormat.shortDebugString(compaction)
+            + " because its sequence id " + replaySeqId + " is smaller than this regions "
+            + "lastReplayedOpenRegionSeqId of " + lastReplayedOpenRegionSeqId);
         return;
       }
-      store.replayCompactionMarker(compaction, pickCompactionFiles, removeFiles);
-    } finally {
-      closeRegionOperation(Operation.REPLAY_EVENT);
+      if (replaySeqId < lastReplayedCompactionSeqId) {
+        LOG.warn(getRegionInfo().getEncodedName() + " : "
+            + "Skipping replaying compaction event :" + TextFormat.shortDebugString(compaction)
+            + " because its sequence id " + replaySeqId + " is smaller than this regions "
+            + "lastReplayedCompactionSeqId of " + lastReplayedCompactionSeqId);
+        return;
+      } else {
+        lastReplayedCompactionSeqId = replaySeqId;
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getRegionInfo().getEncodedName() + " : "
+            + "Replaying compaction marker " + TextFormat.shortDebugString(compaction)
+            + " with seqId=" + replaySeqId + " and lastReplayedOpenRegionSeqId="
+            + lastReplayedOpenRegionSeqId);
+      }
+
+      startRegionOperation(Operation.REPLAY_EVENT);
+      try {
+        Store store = this.getStore(compaction.getFamilyName().toByteArray());
+        if (store == null) {
+          LOG.warn(getRegionInfo().getEncodedName() + " : "
+              + "Found Compaction WAL edit for deleted family:"
+              + Bytes.toString(compaction.getFamilyName().toByteArray()));
+          return;
+        }
+        store.replayCompactionMarker(compaction, pickCompactionFiles, removeFiles);
+        logRegionFiles();
+      } finally {
+        closeRegionOperation(Operation.REPLAY_EVENT);
+      }
     }
   }
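The synchronized block above makes the two staleness checks and the update to lastReplayedCompactionSeqId atomic. Distilled to its core (field names from the patch, everything else stripped), the replay decision is:

    // Returns true if a compaction marker carrying replaySeqId should be replayed.
    boolean shouldReplayCompaction(long replaySeqId) {
      if (replaySeqId < lastReplayedOpenRegionSeqId) {
        return false; // stale: marker pre-dates the current open of the region
      }
      if (replaySeqId < lastReplayedCompactionSeqId) {
        return false; // stale: an equal-or-newer marker was already replayed
      }
      lastReplayedCompactionSeqId = replaySeqId;
      return true;
    }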
@@ -4184,6 +4240,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
             TextFormat.shortDebugString(flush));
         break;
       }
+
+      logRegionFiles();
     } finally {
       closeRegionOperation(Operation.REPLAY_EVENT);
     }
@@ -4644,6 +4702,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
           notifyAll(); // FindBugs NN_NAKED_NOTIFY
         }
       }
+      logRegionFiles();
     } finally {
       closeRegionOperation(Operation.REPLAY_EVENT);
     }
@@ -4849,6 +4908,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     }
   }
 
+  private void logRegionFiles() {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(getRegionInfo().getEncodedName() + " : Store files for region: ");
+      for (Store s : stores.values()) {
+        for (StoreFile sf : s.getStorefiles()) {
+          LOG.trace(getRegionInfo().getEncodedName() + " : " + sf);
+        }
+      }
+    }
+  }
+
   /** Checks whether the given regionName is either equal to our region, or that
    * the regionName is the primary region to our corresponding range for the secondary replica.
    */
@@ -4953,6 +5023,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         for (StoreFile storeFile: store.getStorefiles()) {
           storeFileNames.add(storeFile.getPath().toString());
         }
+
+        logRegionFiles();
       }
     }
     return storeFileNames;
@@ -6116,7 +6188,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     checkClassLoading();
     this.openSeqNum = initialize(reporter);
     this.setSequenceId(openSeqNum);
-    if (wal != null && getRegionServerServices() != null && !writestate.readOnly) {
+    if (wal != null && getRegionServerServices() != null && !writestate.readOnly
+        && !isRecovering) {
+      // Only write the region open event marker to WAL if (1) we are not read-only
+      // (2) dist log replay is off or we are not recovering. In case region is
+      // recovering, the open event will be written at setRecovering(false)
       writeRegionOpenMarker(wal, openSeqNum);
     }
     return this;
@@ -7211,7 +7287,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       ClassSize.OBJECT +
       ClassSize.ARRAY +
       45 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
-      (12 * Bytes.SIZEOF_LONG) +
+      (13 * Bytes.SIZEOF_LONG) +
       5 * Bytes.SIZEOF_BOOLEAN);
 
   // woefully out of date - currently missing:
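The last hunk updates HRegion's heap-size bookkeeping: every long field contributes Bytes.SIZEOF_LONG (8 bytes) to the fixed overhead, so adding lastReplayedCompactionSeqId moves the multiplier from 12 to 13. A worked sketch of the arithmetic (the ClassSize.align(...) wrapper is assumed from the surrounding code, which the hunk does not show):

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.hbase.util.ClassSize;

    class FixedOverheadSketch {
      // 13 * SIZEOF_LONG: was 12, +1 for the new volatile long lastReplayedCompactionSeqId
      static final long FIXED_OVERHEAD = ClassSize.align(
          ClassSize.OBJECT + ClassSize.ARRAY
          + 45 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT
          + 13 * Bytes.SIZEOF_LONG
          + 5 * Bytes.SIZEOF_BOOLEAN);
    }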
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/FinishRegionRecoveringHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/FinishRegionRecoveringHandler.java
new file mode 100644
index 00000000000..2ff3454fef6
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/FinishRegionRecoveringHandler.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.handler;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+
+public class FinishRegionRecoveringHandler extends EventHandler {
+  private static final Log LOG = LogFactory.getLog(FinishRegionRecoveringHandler.class);
+
+  protected final RegionServerServices rss;
+  protected final String regionName;
+  protected final String path;
+
+  public FinishRegionRecoveringHandler(RegionServerServices rss,
+      String regionName, String path) {
+    // we are using the open region handlers, since this operation is in the region open lifecycle
+    super(rss, EventType.M_RS_OPEN_REGION);
+    this.rss = rss;
+    this.regionName = regionName;
+    this.path = path;
+  }
+
+  @Override
+  public void process() throws IOException {
+    HRegion region = this.rss.getRecoveringRegions().remove(regionName);
+    if (region != null) {
+      region.setRecovering(false);
+      LOG.info(path + " deleted; " + regionName + " recovered.");
+    }
+  }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseWALEntryFilter.java
new file mode 100644
index 00000000000..42b3b7bc39b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseWALEntryFilter.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A base class for WALEntryFilter implementations. Protects against changes in the interface
+ * signature.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public abstract class BaseWALEntryFilter implements WALEntryFilter {
+}
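To illustrate the javadoc's claim: implementors extend the abstract base class instead of implementing WALEntryFilter directly, so a method added to the interface later only needs a default in BaseWALEntryFilter rather than breaking every filter. A hypothetical subclass, not part of this patch (assumes the 1.x WALKey.getTablename() accessor):

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.replication.BaseWALEntryFilter;
    import org.apache.hadoop.hbase.wal.WAL.Entry;

    // Drops every WAL entry belonging to one table; returning null skips the entry.
    public class DropTableWALEntryFilter extends BaseWALEntryFilter {
      private final TableName toDrop;

      public DropTableWALEntryFilter(TableName toDrop) {
        this.toDrop = toDrop;
      }

      @Override
      public Entry filter(Entry entry) {
        return entry.getKey().getTablename().equals(toDrop) ? null : entry;
      }
    }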
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index 348136fcd50..559b7bf6c18 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -46,9 +46,9 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionAdminServiceCallable;
 import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.RetryingCallable;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
@@ -64,6 +64,8 @@ import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
 import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
 import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
 import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter;
+import org.apache.hadoop.hbase.replication.BaseWALEntryFilter;
+import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -73,6 +75,7 @@ import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Lists;
 import com.google.protobuf.ServiceException;
 
 /**
@@ -104,6 +107,44 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
 
   private ExecutorService pool;
 
+  /**
+   * Skips the entries which have an original seqId. Only entries persisted via distributed log
+   * replay have their original seq Id fields set.
+   */
+  private class SkipReplayedEditsFilter extends BaseWALEntryFilter {
+    @Override
+    public Entry filter(Entry entry) {
+      // if orig seq id is set, skip replaying the entry
+      if (entry.getKey().getOrigLogSeqNum() > 0) {
+        return null;
+      }
+      return entry;
+    }
+  }
+
+  @Override
+  public WALEntryFilter getWALEntryfilter() {
+    WALEntryFilter superFilter = super.getWALEntryfilter();
+    WALEntryFilter skipReplayedEditsFilter = getSkipReplayedEditsFilter();
+
+    if (superFilter == null) {
+      return skipReplayedEditsFilter;
+    }
+
+    if (skipReplayedEditsFilter == null) {
+      return superFilter;
+    }
+
+    ArrayList<WALEntryFilter> filters = Lists.newArrayList();
+    filters.add(superFilter);
+    filters.add(skipReplayedEditsFilter);
+    return new ChainWALEntryFilter(filters);
+  }
+
+  protected WALEntryFilter getSkipReplayedEditsFilter() {
+    return new SkipReplayedEditsFilter();
+  }
+
   @Override
   public void init(Context context) throws IOException {
     super.init(context);
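getWALEntryfilter() above composes the inherited filter with the new skip-replayed-edits filter; ChainWALEntryFilter applies its filters in order and drops the entry as soon as one of them returns null, so an entry is replicated only if every filter passes it through. A small usage sketch (f1, f2 and the entry are placeholders):

    import java.util.Arrays;
    import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
    import org.apache.hadoop.hbase.replication.WALEntryFilter;
    import org.apache.hadoop.hbase.wal.WAL.Entry;

    class ChainFilterSketch {
      static Entry applyChain(WALEntryFilter f1, WALEntryFilter f2, Entry entry) {
        WALEntryFilter chain = new ChainWALEntryFilter(Arrays.asList(f1, f2));
        return chain.filter(entry); // null means some filter rejected the entry
      }
    }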
@@ -139,7 +180,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
   @Override
   protected void doStart() {
     try {
-      connection = (ClusterConnection) HConnectionManager.createConnection(ctx.getConfiguration());
+      connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
       this.pool = getDefaultThreadPool(conf);
       outputSink = new RegionReplicaOutputSink(controller, entryBuffers, connection, pool,
         numWriterThreads, operationTimeout);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java
index 720cedcee3c..f0d1e674b2c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java
@@ -350,7 +350,7 @@ public class WALPrettyPrinter {
     options.addOption("j", "json", false, "Output JSON");
     options.addOption("p", "printvals", false, "Print values");
     options.addOption("r", "region", true,
-        "Region to filter by. Pass region name; e.g. 'hbase:meta,,1'");
+        "Region to filter by. Pass encoded region name; e.g. '9192caead6a5a20acb4454ffbc79fa14'");
     options.addOption("s", "sequence", true,
         "Sequence to filter by. Pass sequence number.");
     options.addOption("w", "row", true, "Row to filter by. Pass row name.");
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoveringRegionWatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoveringRegionWatcher.java
index a07bd2f6c81..5fff9d22600 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoveringRegionWatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoveringRegionWatcher.java
@@ -21,8 +21,8 @@ package org.apache.hadoop.hbase.zookeeper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.handler.FinishRegionRecoveringHandler;
 import org.apache.zookeeper.KeeperException;
 
 /**
@@ -33,7 +33,7 @@ public class RecoveringRegionWatcher extends ZooKeeperListener {
   private static final Log LOG = LogFactory.getLog(RecoveringRegionWatcher.class);
 
   private HRegionServer server;
-  
+
   /**
    * Construct a ZooKeeper event listener.
    */
@@ -47,6 +47,7 @@ public class RecoveringRegionWatcher extends ZooKeeperListener {
    * Called when a node has been deleted
    * @param path full path of the deleted node
    */
+  @Override
   public void nodeDeleted(String path) {
     if (this.server.isStopped() || this.server.isStopping()) {
       return;
@@ -58,12 +59,8 @@ public class RecoveringRegionWatcher extends ZooKeeperListener {
     }
 
     String regionName = path.substring(parentPath.length() + 1);
-    HRegion region = this.server.getRecoveringRegions().remove(regionName);
-    if (region != null) {
-      region.setRecovering(false);
-    }
-
-    LOG.info(path + " deleted; " + regionName + " recovered.");
+    server.getExecutorService().submit(new FinishRegionRecoveringHandler(server, regionName, path));
   }
 
   @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index dfca4480818..785e669cf4e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -51,6 +51,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -131,6 +132,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion.RowLock;
 import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.handler.FinishRegionRecoveringHandler;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
@@ -167,6 +169,7 @@ import org.mockito.Mockito;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.protobuf.ByteString;
 
 /**
@@ -5804,6 +5807,109 @@ public class TestHRegion {
     }
   }
 
+  // Helper for test testOpenRegionWrittenToWALForLogReplay
+  static class HRegionWithSeqId extends HRegion {
+    public HRegionWithSeqId(final Path tableDir, final WAL wal, final FileSystem fs,
+        final Configuration confParam, final HRegionInfo regionInfo,
+        final HTableDescriptor htd, final RegionServerServices rsServices) {
+      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
+    }
+    @Override
+    protected long getNextSequenceId(WAL wal) throws IOException {
+      return 42;
+    }
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testOpenRegionWrittenToWALForLogReplay() throws Exception {
+    // similar to the above test but with distributed log replay
+    final ServerName serverName = ServerName.valueOf("testOpenRegionWrittenToWALForLogReplay",
+      100, 42);
+    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
+
+    HTableDescriptor htd
+        = new HTableDescriptor(TableName.valueOf("testOpenRegionWrittenToWALForLogReplay"));
+    htd.addFamily(new HColumnDescriptor(fam1));
+    htd.addFamily(new HColumnDescriptor(fam2));
+
+    HRegionInfo hri = new HRegionInfo(htd.getTableName(),
+      HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
+
+    // open the region w/o rss and wal and flush some files
+    HRegion region =
+        HBaseTestingUtility.createRegionAndWAL(hri, TEST_UTIL.getDataTestDir(),
+          TEST_UTIL.getConfiguration(), htd);
+    assertNotNull(region);
+
+    // create a file in fam1 for the region before opening in OpenRegionHandler
+    region.put(new Put(Bytes.toBytes("a")).add(fam1, fam1, fam1));
+    region.flushcache();
+    HBaseTestingUtility.closeRegionAndWAL(region);
+
+    ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
+
+    // capture append() calls
+    WAL wal = mock(WAL.class);
+    when(rss.getWAL((HRegionInfo) any())).thenReturn(wal);
+
+    // add the region to recovering regions
+    HashMap<String, HRegion> recoveringRegions = Maps.newHashMap();
+    recoveringRegions.put(region.getRegionInfo().getEncodedName(), null);
+    when(rss.getRecoveringRegions()).thenReturn(recoveringRegions);
+
+    try {
+      Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+      conf.set(HConstants.REGION_IMPL, HRegionWithSeqId.class.getName());
+      region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
+        conf, rss, null);
+
+      // verify that we have not appended region open event to WAL because this region is still
+      // recovering
+      verify(wal, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any()
+        , editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List)any());
+
+      // now put the region out of recovering state
+      new FinishRegionRecoveringHandler(rss, region.getRegionInfo().getEncodedName(), "/foo")
+        .prepare().process();
+
+      // now we should have put the entry
+      verify(wal, times(1)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any()
+        , editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List)any());
+
+      WALEdit edit = editCaptor.getValue();
+      assertNotNull(edit);
+      assertNotNull(edit.getCells());
+      assertEquals(1, edit.getCells().size());
+      RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0));
+      assertNotNull(desc);
+
+      LOG.info("RegionEventDescriptor from WAL: " + desc);
+
+      assertEquals(RegionEventDescriptor.EventType.REGION_OPEN, desc.getEventType());
+      assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getName()));
+      assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(),
+        hri.getEncodedNameAsBytes()));
+      assertTrue(desc.getLogSequenceNumber() > 0);
+      assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer()));
+      assertEquals(2, desc.getStoresCount());
+
+      StoreDescriptor store = desc.getStores(0);
+      assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1));
+      assertEquals(store.getStoreHomeDir(), Bytes.toString(fam1));
+      assertEquals(1, store.getStoreFileCount()); // 1 store file
+      assertFalse(store.getStoreFile(0).contains("/")); // ensure path is relative
+
+      store = desc.getStores(1);
+      assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2));
+      assertEquals(store.getStoreHomeDir(), Bytes.toString(fam2));
+      assertEquals(0, store.getStoreFileCount()); // no store files
+
+    } finally {
+      HBaseTestingUtility.closeRegionAndWAL(region);
+    }
+  }
+
   @Test
   @SuppressWarnings("unchecked")
   public void testCloseRegionWrittenToWAL() throws Exception {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java
index 75c6967ff45..43c124ccf09 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java
@@ -95,7 +95,7 @@ public class TestRegionReplicaFailover {
   @Parameters
   public static Collection<Object[]> getParameters() {
     Object[][] params =
-        new Boolean[][] { {false} }; // TODO: enable dist log replay testing after HBASE-13121
+        new Boolean[][] { {true}, {false} };
     return Arrays.asList(params);
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
index 9170d8d6777..9cbb0348243 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion;
 import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion;
+import static org.junit.Assert.*;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -54,9 +55,12 @@ import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext;
+import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint.RegionReplicaReplayCallable;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -158,7 +162,7 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
     }
   }
 
-  @Test
+  @Test (timeout = 240000)
   public void testReplayCallable() throws Exception {
     // tests replaying the edits to a secondary region replica using the Callable directly
     openRegion(HTU, rs0, hriSecondary);
@@ -198,7 +202,7 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
     }
   }
 
-  @Test
+  @Test (timeout = 240000)
   public void testReplayCallableWithRegionMove() throws Exception {
     // tests replaying the edits to a secondary region replica using the Callable directly while
     // the region is moved to another location. It tests handling of RME.
@@ -233,7 +237,7 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
     connection.close();
   }
 
-  @Test
+  @Test (timeout = 240000)
   public void testRegionReplicaReplicationEndpointReplicate() throws Exception {
     // tests replaying the edits to a secondary region replica using the RRRE.replicate()
     openRegion(HTU, rs0, hriSecondary);
@@ -263,4 +267,52 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
     connection.close();
   }
 
+  @Test (timeout = 240000)
+  public void testReplayedEditsAreSkipped() throws Exception {
+    openRegion(HTU, rs0, hriSecondary);
+    ClusterConnection connection =
+        (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration());
+    RegionReplicaReplicationEndpoint replicator = new RegionReplicaReplicationEndpoint();
+
+    ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
+    when(context.getConfiguration()).thenReturn(HTU.getConfiguration());
+    when(context.getMetrics()).thenReturn(mock(MetricsSource.class));
+
+    ReplicationPeer mockPeer = mock(ReplicationPeer.class);
+    when(mockPeer.getTableCFs()).thenReturn(null);
+    when(context.getReplicationPeer()).thenReturn(mockPeer);
+
+    replicator.init(context);
+    replicator.start();
+
+    // test the filter for the RE, not actual replication
+    WALEntryFilter filter = replicator.getWALEntryfilter();
+
+    // load some data to primary
+    HTU.loadNumericRows(table, f, 0, 1000);
+
+    Assert.assertEquals(1000, entries.size());
+    for (Entry e : entries) {
+      if (Integer.parseInt(Bytes.toString(e.getEdit().getCells().get(0).getValue())) % 2 == 0) {
+        e.getKey().setOrigLogSeqNum(1); // simulate dist log replay by setting orig seq id
+      }
+    }
+
+    long skipped = 0, replayed = 0;
+    for (Entry e : entries) {
+      if (filter.filter(e) == null) {
+        skipped++;
+      } else {
+        replayed++;
+      }
+    }
+
+    assertEquals(500, skipped);
+    assertEquals(500, replayed);
+
+    HTU.deleteNumericRows(table, f, 0, 1000);
+    closeRegion(HTU, rs0, hriSecondary);
+    connection.close();
+  }
+
 }