diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 57c0a163702..57f4cba5c06 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -152,11 +152,12 @@ class ReplicationSourceWALReader extends Thread {
             addBatchToShippingQueue(batch);
           }
         }
-      } catch (IOException e) { // stream related
+      } catch (WALEntryFilterRetryableException | IOException e) { // stream related
         if (handleEofException(e, batch)) {
           sleepMultiplier = 1;
         } else {
-          LOG.warn("Failed to read stream of replication entries", e);
+          LOG.warn("Failed to read stream of replication entries "
+            + "or replication filter is recovering", e);
           if (sleepMultiplier < maxRetriesMultiplier) {
             sleepMultiplier++;
           }
@@ -281,7 +282,7 @@ class ReplicationSourceWALReader extends Thread {
    * logs from replication queue
    * @return true only the IOE can be handled
    */
-  private boolean handleEofException(IOException e, WALEntryBatch batch)
+  private boolean handleEofException(Exception e, WALEntryBatch batch)
       throws InterruptedException {
     PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
     // Dump the log even if logQueue size is 1 if the source is from recovered Source
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryFilterRetryableException.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryFilterRetryableException.java
new file mode 100644
index 00000000000..f93f8b058b2
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryFilterRetryableException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * This exception should be thrown from any WAL entry filter when the filter is
+ * expected to recover from the failure and wants replication to back off and retry
+ * until it does. There is special handling in the replication WAL reader to catch
+ * this exception and retry.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public class WALEntryFilterRetryableException extends RuntimeException {
+  private static final long serialVersionUID = 1L;
+
+  public WALEntryFilterRetryableException(String m, Throwable t) {
+    super(m, t);
+  }
+
+  public WALEntryFilterRetryableException(String m) {
+    super(m);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index ae9bb676843..b7d2a084862 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -121,6 +121,7 @@ public class TestWALEntryStream {
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL = new HBaseTestingUtility();
     CONF = TEST_UTIL.getConfiguration();
+    CONF.setLong("replication.source.sleepforretries", 10);
 
     TEST_UTIL.startMiniDFSCluster(3);
     cluster = TEST_UTIL.getDFSCluster();
@@ -413,6 +414,17 @@ public class TestWALEntryStream {
     return reader;
   }
 
+  private ReplicationSourceWALReader createReaderWithBadReplicationFilter(int numFailures,
+      Configuration conf) {
+    ReplicationSource source = mockReplicationSource(false, conf);
+    when(source.isPeerEnabled()).thenReturn(true);
+    ReplicationSourceWALReader reader =
+        new ReplicationSourceWALReader(fs, conf, logQueue, 0,
+            getIntermittentFailingFilter(numFailures), source, fakeWalGroupId);
+    reader.start();
+    return reader;
+  }
+
   @Test
   public void testReplicationSourceWALReader() throws Exception {
     appendEntriesToLogAndSync(3);
@@ -445,6 +457,36 @@ public class TestWALEntryStream {
     assertEquals("foo", getRow(entryBatch.getWalEntries().get(0)));
   }
 
+  @Test
+  public void testReplicationSourceWALReaderWithFailingFilter() throws Exception {
+    appendEntriesToLogAndSync(3);
+    // get ending position
+    long position;
+    try (WALEntryStream entryStream =
+        new WALEntryStream(logQueue, CONF, 0, log, null,
+            new MetricsSource("1"), fakeWalGroupId)) {
+      entryStream.next();
+      entryStream.next();
+      entryStream.next();
+      position = entryStream.getPosition();
+    }
+
+    // start up a reader
+    Path walPath = getQueue().peek();
+    int numFailuresInFilter = 5;
+    ReplicationSourceWALReader reader = createReaderWithBadReplicationFilter(
+        numFailuresInFilter, CONF);
+    WALEntryBatch entryBatch = reader.take();
+    assertEquals(numFailuresInFilter, FailingWALEntryFilter.numFailures());
+
+    // should've batched up our entries
+    assertNotNull(entryBatch);
+    assertEquals(3, entryBatch.getWalEntries().size());
+    assertEquals(position, entryBatch.getLastWalPosition());
+    assertEquals(walPath, entryBatch.getLastWalPath());
+    assertEquals(3, entryBatch.getNbRowKeys());
+  }
+
   @Test
   public void testReplicationSourceWALReaderRecovered() throws Exception {
     appendEntriesToLogAndSync(10);
@@ -636,6 +678,32 @@ public class TestWALEntryStream {
     };
   }
 
+  private WALEntryFilter getIntermittentFailingFilter(int numFailuresInFilter) {
+    return new FailingWALEntryFilter(numFailuresInFilter);
+  }
+
+  public static class FailingWALEntryFilter implements WALEntryFilter {
+    private int numFailures = 0;
+    private static int countFailures = 0;
+
+    public FailingWALEntryFilter(int numFailuresInFilter) {
+      numFailures = numFailuresInFilter;
+    }
+
+    @Override
+    public Entry filter(Entry entry) {
+      if (countFailures == numFailures) {
+        return entry;
+      }
+      countFailures = countFailures + 1;
+      throw new WALEntryFilterRetryableException("failing filter");
+    }
+
+    public static int numFailures() {
+      return countFailures;
+    }
+  }
+
   class PathWatcher implements WALActionsListener {
 
     Path currentPath;
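
For context only, not part of the patch: a minimal sketch of how a custom replication WALEntryFilter could use the new exception so that ReplicationSourceWALReader backs off and retries the same batch instead of failing or skipping entries. The DependencyBackedWALEntryFilter and DependencyHealthChecker names below are illustrative placeholders, not HBase APIs.

package org.apache.hadoop.hbase.replication.regionserver;

import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.wal.WAL.Entry;

/**
 * Illustrative filter: when a downstream dependency it consults is temporarily
 * unavailable, it throws WALEntryFilterRetryableException so the WAL reader
 * retries later rather than dropping or mis-filtering entries.
 */
public class DependencyBackedWALEntryFilter implements WALEntryFilter {

  /** Hypothetical health-check abstraction, only for this sketch. */
  public interface DependencyHealthChecker {
    boolean isAvailable();
  }

  private final DependencyHealthChecker checker;

  public DependencyBackedWALEntryFilter(DependencyHealthChecker checker) {
    this.checker = checker;
  }

  @Override
  public Entry filter(Entry entry) {
    if (!checker.isAvailable()) {
      // Recoverable condition: ask ReplicationSourceWALReader to back off and retry.
      throw new WALEntryFilterRetryableException("dependency unavailable, retry later");
    }
    // Dependency is healthy: pass the entry through unchanged.
    return entry;
  }
}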