HBASE-25848: Add flexibility to backup replication in case replication filter throws an exception (#3283)
* HBASE-25848: Add flexibility to backup replication in case replication filter throws an exception
This commit is contained in:
parent
7c24ed4f45
commit
15e861169f
|
@ -152,11 +152,12 @@ class ReplicationSourceWALReader extends Thread {
|
||||||
addBatchToShippingQueue(batch);
|
addBatchToShippingQueue(batch);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (IOException e) { // stream related
|
} catch (WALEntryFilterRetryableException | IOException e) { // stream related
|
||||||
if (handleEofException(e, batch)) {
|
if (handleEofException(e, batch)) {
|
||||||
sleepMultiplier = 1;
|
sleepMultiplier = 1;
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("Failed to read stream of replication entries", e);
|
LOG.warn("Failed to read stream of replication entries "
|
||||||
|
+ "or replication filter is recovering", e);
|
||||||
if (sleepMultiplier < maxRetriesMultiplier) {
|
if (sleepMultiplier < maxRetriesMultiplier) {
|
||||||
sleepMultiplier++;
|
sleepMultiplier++;
|
||||||
}
|
}
|
||||||
|
@ -281,7 +282,7 @@ class ReplicationSourceWALReader extends Thread {
|
||||||
* logs from replication queue
|
* logs from replication queue
|
||||||
* @return true only the IOE can be handled
|
* @return true only the IOE can be handled
|
||||||
*/
|
*/
|
||||||
private boolean handleEofException(IOException e, WALEntryBatch batch)
|
private boolean handleEofException(Exception e, WALEntryBatch batch)
|
||||||
throws InterruptedException {
|
throws InterruptedException {
|
||||||
PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
|
PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
|
||||||
// Dump the log even if logQueue size is 1 if the source is from recovered Source
|
// Dump the log even if logQueue size is 1 if the source is from recovered Source
|
||||||
|
|
|
@ -0,0 +1,40 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.replication.regionserver;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This exception should be thrown from any wal filter when the filter is expected
|
||||||
|
* to recover from the failures and it wants the replication to backup till it fails.
|
||||||
|
* There is special handling in replication wal reader to catch this exception and
|
||||||
|
* retry.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
|
||||||
|
public class WALEntryFilterRetryableException extends RuntimeException {
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
|
public WALEntryFilterRetryableException(String m, Throwable t) {
|
||||||
|
super(m, t);
|
||||||
|
}
|
||||||
|
|
||||||
|
public WALEntryFilterRetryableException(String m) {
|
||||||
|
super(m);
|
||||||
|
}
|
||||||
|
}
|
|
@ -121,6 +121,7 @@ public class TestWALEntryStream {
|
||||||
public static void setUpBeforeClass() throws Exception {
|
public static void setUpBeforeClass() throws Exception {
|
||||||
TEST_UTIL = new HBaseTestingUtility();
|
TEST_UTIL = new HBaseTestingUtility();
|
||||||
CONF = TEST_UTIL.getConfiguration();
|
CONF = TEST_UTIL.getConfiguration();
|
||||||
|
CONF.setLong("replication.source.sleepforretries", 10);
|
||||||
TEST_UTIL.startMiniDFSCluster(3);
|
TEST_UTIL.startMiniDFSCluster(3);
|
||||||
|
|
||||||
cluster = TEST_UTIL.getDFSCluster();
|
cluster = TEST_UTIL.getDFSCluster();
|
||||||
|
@ -413,6 +414,17 @@ public class TestWALEntryStream {
|
||||||
return reader;
|
return reader;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ReplicationSourceWALReader createReaderWithBadReplicationFilter(int numFailures,
|
||||||
|
Configuration conf) {
|
||||||
|
ReplicationSource source = mockReplicationSource(false, conf);
|
||||||
|
when(source.isPeerEnabled()).thenReturn(true);
|
||||||
|
ReplicationSourceWALReader reader =
|
||||||
|
new ReplicationSourceWALReader(fs, conf, logQueue, 0,
|
||||||
|
getIntermittentFailingFilter(numFailures), source, fakeWalGroupId);
|
||||||
|
reader.start();
|
||||||
|
return reader;
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testReplicationSourceWALReader() throws Exception {
|
public void testReplicationSourceWALReader() throws Exception {
|
||||||
appendEntriesToLogAndSync(3);
|
appendEntriesToLogAndSync(3);
|
||||||
|
@ -445,6 +457,36 @@ public class TestWALEntryStream {
|
||||||
assertEquals("foo", getRow(entryBatch.getWalEntries().get(0)));
|
assertEquals("foo", getRow(entryBatch.getWalEntries().get(0)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReplicationSourceWALReaderWithFailingFilter() throws Exception {
|
||||||
|
appendEntriesToLogAndSync(3);
|
||||||
|
// get ending position
|
||||||
|
long position;
|
||||||
|
try (WALEntryStream entryStream =
|
||||||
|
new WALEntryStream(logQueue, CONF, 0, log, null,
|
||||||
|
new MetricsSource("1"), fakeWalGroupId)) {
|
||||||
|
entryStream.next();
|
||||||
|
entryStream.next();
|
||||||
|
entryStream.next();
|
||||||
|
position = entryStream.getPosition();
|
||||||
|
}
|
||||||
|
|
||||||
|
// start up a reader
|
||||||
|
Path walPath = getQueue().peek();
|
||||||
|
int numFailuresInFilter = 5;
|
||||||
|
ReplicationSourceWALReader reader = createReaderWithBadReplicationFilter(
|
||||||
|
numFailuresInFilter, CONF);
|
||||||
|
WALEntryBatch entryBatch = reader.take();
|
||||||
|
assertEquals(numFailuresInFilter, FailingWALEntryFilter.numFailures());
|
||||||
|
|
||||||
|
// should've batched up our entries
|
||||||
|
assertNotNull(entryBatch);
|
||||||
|
assertEquals(3, entryBatch.getWalEntries().size());
|
||||||
|
assertEquals(position, entryBatch.getLastWalPosition());
|
||||||
|
assertEquals(walPath, entryBatch.getLastWalPath());
|
||||||
|
assertEquals(3, entryBatch.getNbRowKeys());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testReplicationSourceWALReaderRecovered() throws Exception {
|
public void testReplicationSourceWALReaderRecovered() throws Exception {
|
||||||
appendEntriesToLogAndSync(10);
|
appendEntriesToLogAndSync(10);
|
||||||
|
@ -636,6 +678,32 @@ public class TestWALEntryStream {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private WALEntryFilter getIntermittentFailingFilter(int numFailuresInFilter) {
|
||||||
|
return new FailingWALEntryFilter(numFailuresInFilter);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class FailingWALEntryFilter implements WALEntryFilter {
|
||||||
|
private int numFailures = 0;
|
||||||
|
private static int countFailures = 0;
|
||||||
|
|
||||||
|
public FailingWALEntryFilter(int numFailuresInFilter) {
|
||||||
|
numFailures = numFailuresInFilter;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Entry filter(Entry entry) {
|
||||||
|
if (countFailures == numFailures) {
|
||||||
|
return entry;
|
||||||
|
}
|
||||||
|
countFailures = countFailures + 1;
|
||||||
|
throw new WALEntryFilterRetryableException("failing filter");
|
||||||
|
}
|
||||||
|
|
||||||
|
public static int numFailures(){
|
||||||
|
return countFailures;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
class PathWatcher implements WALActionsListener {
|
class PathWatcher implements WALActionsListener {
|
||||||
|
|
||||||
Path currentPath;
|
Path currentPath;
|
||||||
|
|
Loading…
Reference in New Issue