HBASE-25583: Handle the running replication source gracefully with replication nodes deleted (#2960)

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Sandeep Pal 2021-02-19 10:15:45 -08:00 committed by GitHub
parent 2d26c94ef0
commit f9a91488b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 594 additions and 196 deletions

View File

@ -59,7 +59,7 @@ public interface ReplicationQueues {
* @param queueId a String that identifies the queue. * @param queueId a String that identifies the queue.
* @param filename name of the WAL * @param filename name of the WAL
*/ */
void removeLog(String queueId, String filename); void removeLog(String queueId, String filename) throws ReplicationSourceWithoutPeerException;
/** /**
* Set the current position for a specific WAL in a given queue. * Set the current position for a specific WAL in a given queue.

View File

@ -25,19 +25,19 @@ import java.util.TreeSet;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
/** /**
@ -132,17 +132,44 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
} }
@Override @Override
public void removeLog(String queueId, String filename) { public void removeLog(String queueId, String filename)
throws ReplicationSourceWithoutPeerException {
try { try {
String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId); try {
znode = ZKUtil.joinZNode(znode, filename); String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
ZKUtil.deleteNode(this.zookeeper, znode); znode = ZKUtil.joinZNode(znode, filename);
} catch (KeeperException e) { ZKUtil.deleteNode(this.zookeeper, znode);
this.abortable.abort("Failed to remove wal from queue (queueId=" + queueId + ", filename=" } catch (KeeperException.NoNodeException e) {
+ filename + ")", e); // in case of no node exception we should not crash the region server
// but instead check if the replication peer has been removed.
// If so, we can throw here so that the source can terminate itself.
// This situation can occur when the replication peer znodes has been
// removed but the sources not terminated due to any miss from zk node delete watcher.
if (!doesPeerExist(queueId)) {
LOG.warn("Replication peer " + queueId + " has been removed", e);
throw new ReplicationSourceWithoutPeerException(
"Znodes for peer has been delete while a source is still active", e);
} else {
throw e;
}
}
} catch (KeeperException ke) {
this.abortable.abort(
"Failed to remove wal from queue (queueId=" + queueId + ", filename=" + filename + ")", ke);
} }
} }
private boolean doesPeerExist(String queueId) throws KeeperException {
String peerId = queueId;
if (peerId.contains("-")) {
// queueId will be in the form peerId + "-" + rsZNode.
// A peerId will not have "-" in its name, see HBASE-11394
peerId = queueId.split("-")[0];
}
return peerExists(peerId);
}
@Override @Override
public void setLogPosition(String queueId, String filename, long position) { public void setLogPosition(String queueId, String filename, long position) {
try { try {
@ -426,8 +453,9 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
// add delete op for peer // add delete op for peer
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode)); listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
if (LOG.isTraceEnabled()) if (LOG.isTraceEnabled()) {
LOG.trace(" The multi list size is: " + listOfOps.size()); LOG.trace(" The multi list size is: " + listOfOps.size());
}
} }
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false); ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
@ -506,7 +534,7 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
} }
/** /**
* @param lockOwner * @param lockOwner lock owner
* @return Serialized protobuf of <code>lockOwner</code> with pb magic prefix prepended suitable * @return Serialized protobuf of <code>lockOwner</code> with pb magic prefix prepended suitable
* for use as content of an replication lock during region server fail over. * for use as content of an replication lock during region server fail over.
*/ */

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* This exception is thrown when the replication source is running with no
* corresponding peer. This can due to race condition between PeersWatcher
* zk listerner and source trying to remove the queues, or if zk listener for
* delete node was never invoked for any reason. See HBASE-25583
*/
@InterfaceAudience.Private
public class ReplicationSourceWithoutPeerException extends ReplicationException {
private static final long serialVersionUID = 1L;
public ReplicationSourceWithoutPeerException(String m, Throwable t) {
super(m, t);
}
public ReplicationSourceWithoutPeerException(String m) {
super(m);
}
}

View File

@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationSourceWithoutPeerException;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter; import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
@ -158,7 +159,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
* @param clusterId unique UUID for the cluster * @param clusterId unique UUID for the cluster
* @param replicationEndpoint the replication endpoint implementation * @param replicationEndpoint the replication endpoint implementation
* @param metrics metrics for replication source * @param metrics metrics for replication source
* @throws IOException * @throws IOException IO Exception
*/ */
@Override @Override
public void init(final Configuration conf, final FileSystem fs, public void init(final Configuration conf, final FileSystem fs,
@ -441,7 +442,9 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
@Override @Override
public Path getCurrentPath() { public Path getCurrentPath() {
for (ReplicationSourceShipperThread worker : workerThreads.values()) { for (ReplicationSourceShipperThread worker : workerThreads.values()) {
if (worker.getCurrentPath() != null) return worker.getCurrentPath(); if (worker.getCurrentPath() != null) {
return worker.getCurrentPath();
}
} }
return null; return null;
} }
@ -460,7 +463,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
return 0; return 0;
} }
private boolean isSourceActive() { public boolean isSourceActive() {
return !this.stopper.isStopped() && this.sourceRunning; return !this.stopper.isStopped() && this.sourceRunning;
} }
@ -792,9 +795,13 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
} }
private void updateLogPosition(long lastReadPosition) { private void updateLogPosition(long lastReadPosition) {
manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode, lastReadPosition, try {
this.replicationQueueInfo.isQueueRecovered(), false); manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode, lastReadPosition,
lastLoggedPosition = lastReadPosition; this.replicationQueueInfo.isQueueRecovered(), false);
lastLoggedPosition = lastReadPosition;
} catch (ReplicationSourceWithoutPeerException re) {
source.terminate("Replication peer is removed and source should terminate", re);
}
} }
public void startup() { public void startup() {
@ -976,7 +983,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
/** /**
* Set the worker state * Set the worker state
* @param state * @param state the state of the wal reader
*/ */
public void setWorkerState(WorkerState state) { public void setWorkerState(WorkerState state) {
this.state = state; this.state = state;

View File

@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationSourceWithoutPeerException;
import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.apache.hadoop.hbase.wal.DefaultWALProvider;
@ -125,14 +126,14 @@ public class ReplicationSourceManager implements ReplicationListener {
/** /**
* Creates a replication manager and sets the watch on all the other registered region servers * Creates a replication manager and sets the watch on all the other registered region servers
* @param replicationQueues the interface for manipulating replication queues * @param replicationQueues the interface for manipulating replication queues
* @param replicationPeers * @param replicationPeers the replication peers maintenance class
* @param replicationTracker * @param replicationTracker the replication tracker to track the states
* @param conf the configuration to use * @param conf the configuration to use
* @param server the server for this region server * @param server the server for this region server
* @param fs the file system to use * @param fs the file system to use
* @param logDir the directory that contains all wal directories of live RSs * @param logDir the directory that contains all wal directories of live RSs
* @param oldLogDir the directory where old logs are archived * @param oldLogDir the directory where old logs are archived
* @param clusterId * @param clusterId the cluster id of the source cluster
*/ */
public ReplicationSourceManager(final ReplicationQueues replicationQueues, public ReplicationSourceManager(final ReplicationQueues replicationQueues,
final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker, final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
@ -181,14 +182,14 @@ public class ReplicationSourceManager implements ReplicationListener {
* wal it belongs to and will log, for this region server, the current * wal it belongs to and will log, for this region server, the current
* position. It will also clean old logs from the queue. * position. It will also clean old logs from the queue.
* @param log Path to the log currently being replicated from * @param log Path to the log currently being replicated from
* replication status in zookeeper. It will also delete older entries. * replication status in zookeeper. It will also delete older entries.
* @param id id of the peer cluster * @param id id of the peer cluster
* @param position current location in the log * @param position current location in the log
* @param queueRecovered indicates if this queue comes from another region server * @param queueRecovered indicates if this queue comes from another region server
* @param holdLogInZK if true then the log is retained in ZK * @param holdLogInZK if true then the log is retained in ZK
*/ */
public synchronized void logPositionAndCleanOldLogs(Path log, String id, long position, public synchronized void logPositionAndCleanOldLogs(Path log, String id, long position,
boolean queueRecovered, boolean holdLogInZK) { boolean queueRecovered, boolean holdLogInZK) throws ReplicationSourceWithoutPeerException {
String fileName = log.getName(); String fileName = log.getName();
this.replicationQueues.setLogPosition(id, fileName, position); this.replicationQueues.setLogPosition(id, fileName, position);
if (holdLogInZK) { if (holdLogInZK) {
@ -204,7 +205,8 @@ public class ReplicationSourceManager implements ReplicationListener {
* @param id id of the peer cluster * @param id id of the peer cluster
* @param queueRecovered Whether this is a recovered queue * @param queueRecovered Whether this is a recovered queue
*/ */
public void cleanOldLogs(String key, String id, boolean queueRecovered) { public void cleanOldLogs(String key, String id, boolean queueRecovered)
throws ReplicationSourceWithoutPeerException {
String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(key); String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(key);
if (queueRecovered) { if (queueRecovered) {
Map<String, SortedSet<String>> walsForPeer = walsByIdRecoveredQueues.get(id); Map<String, SortedSet<String>> walsForPeer = walsByIdRecoveredQueues.get(id);
@ -222,9 +224,10 @@ public class ReplicationSourceManager implements ReplicationListener {
} }
} }
} }
} }
private void cleanOldLogs(SortedSet<String> wals, String key, String id) { private void cleanOldLogs(SortedSet<String> wals, String key, String id)
throws ReplicationSourceWithoutPeerException {
SortedSet<String> walSet = wals.headSet(key); SortedSet<String> walSet = wals.headSet(key);
LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet); LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
for (String wal : walSet) { for (String wal : walSet) {
@ -267,7 +270,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* need to enqueue the latest log of each wal group and do replication * need to enqueue the latest log of each wal group and do replication
* @param id the id of the peer cluster * @param id the id of the peer cluster
* @return the source that was created * @return the source that was created
* @throws IOException * @throws IOException IO Exception
*/ */
protected ReplicationSourceInterface addSource(String id) throws IOException, protected ReplicationSourceInterface addSource(String id) throws IOException,
ReplicationException { ReplicationException {
@ -365,7 +368,7 @@ public class ReplicationSourceManager implements ReplicationListener {
/** /**
* Get the normal source for a given peer * Get the normal source for a given peer
* @param peerId * @param peerId the replication peer Id
* @return the normal source for the give peer if it exists, otherwise null. * @return the normal source for the give peer if it exists, otherwise null.
*/ */
public ReplicationSourceInterface getSource(String peerId) { public ReplicationSourceInterface getSource(String peerId) {
@ -402,7 +405,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* Check and enqueue the given log to the correct source. If there's still no source for the * Check and enqueue the given log to the correct source. If there's still no source for the
* group to which the given log belongs, create one * group to which the given log belongs, create one
* @param logPath the log path to check and enqueue * @param logPath the log path to check and enqueue
* @throws IOException * @throws IOException IO Exception
*/ */
private void recordLog(Path logPath) throws IOException { private void recordLog(Path logPath) throws IOException {
String logName = logPath.getName(); String logName = logPath.getName();
@ -467,7 +470,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* @param server the server object for this region server * @param server the server object for this region server
* @param peerId the id of the peer cluster * @param peerId the id of the peer cluster
* @return the created source * @return the created source
* @throws IOException * @throws IOException IO Exception
*/ */
protected ReplicationSourceInterface getReplicationSource(final Configuration conf, protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
final FileSystem fs, final ReplicationSourceManager manager, final FileSystem fs, final ReplicationSourceManager manager,
@ -523,7 +526,8 @@ public class ReplicationSourceManager implements ReplicationListener {
clusterId, replicationEndpoint, metrics); clusterId, replicationEndpoint, metrics);
// init replication endpoint // init replication endpoint
replicationEndpoint.init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), replicationEndpoint.init(new ReplicationEndpoint.Context(
conf, replicationPeer.getConfiguration(),
fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors, server)); fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors, server));
return src; return src;
@ -535,7 +539,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* znodes and finally will delete the old znodes. * znodes and finally will delete the old znodes.
* *
* It creates one old source for any type of source of the old rs. * It creates one old source for any type of source of the old rs.
* @param rsZnode * @param rsZnode znode for region server from where to transfer the queues
*/ */
private void transferQueues(String rsZnode) { private void transferQueues(String rsZnode) {
NodeFailoverWorker transfer = NodeFailoverWorker transfer =
@ -664,7 +668,7 @@ public class ReplicationSourceManager implements ReplicationListener {
private final UUID clusterId; private final UUID clusterId;
/** /**
* @param rsZnode * @param rsZnode znode for dead region server
*/ */
public NodeFailoverWorker(String rsZnode) { public NodeFailoverWorker(String rsZnode) {
this(rsZnode, replicationQueues, replicationPeers, ReplicationSourceManager.this.clusterId); this(rsZnode, replicationQueues, replicationPeers, ReplicationSourceManager.this.clusterId);
@ -820,7 +824,9 @@ public class ReplicationSourceManager implements ReplicationListener {
* Get the ReplicationPeers used by this ReplicationSourceManager * Get the ReplicationPeers used by this ReplicationSourceManager
* @return the ReplicationPeers used by this ReplicationSourceManager * @return the ReplicationPeers used by this ReplicationSourceManager
*/ */
public ReplicationPeers getReplicationPeers() {return this.replicationPeers;} public ReplicationPeers getReplicationPeers() {
return this.replicationPeers;
}
/** /**
* Get a string representation of all the sources' metrics * Get a string representation of all the sources' metrics

View File

@ -44,6 +44,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
String peerClusterId; String peerClusterId;
Path currentPath; Path currentPath;
MetricsSource metrics; MetricsSource metrics;
public static final String fakeExceptionMessage = "Fake Exception";
@Override @Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,

View File

@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
public class ReplicationSourceDummyWithNoTermination extends ReplicationSourceDummy {
@Override
public void terminate(String reason) {
// This is to block the zk listener to close the queues
// to simulate the znodes getting deleted without zk listener getting invoked
throw new RuntimeException(fakeExceptionMessage);
}
}

View File

@ -28,6 +28,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -104,7 +105,7 @@ public class TestReplicationSource {
private static Configuration conf = TEST_UTIL.getConfiguration(); private static Configuration conf = TEST_UTIL.getConfiguration();
/** /**
* @throws java.lang.Exception * @throws java.lang.Exception exception
*/ */
@BeforeClass @BeforeClass
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
@ -112,9 +113,13 @@ public class TestReplicationSource {
FS = TEST_UTIL.getDFSCluster().getFileSystem(); FS = TEST_UTIL.getDFSCluster().getFileSystem();
Path rootDir = TEST_UTIL.createRootDir(); Path rootDir = TEST_UTIL.createRootDir();
oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
if (FS.exists(oldLogDir)) FS.delete(oldLogDir, true); if (FS.exists(oldLogDir)) {
FS.delete(oldLogDir, true);
}
logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME); logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
if (FS.exists(logDir)) FS.delete(logDir, true); if (FS.exists(logDir)) {
FS.delete(logDir, true);
}
} }
@Before @Before
@ -154,7 +159,7 @@ public class TestReplicationSource {
* Sanity check that we can move logs around while we are reading * Sanity check that we can move logs around while we are reading
* from them. Should this test fail, ReplicationSource would have a hard * from them. Should this test fail, ReplicationSource would have a hard
* time reading logs that are being archived. * time reading logs that are being archived.
* @throws Exception * @throws Exception exception
*/ */
@Test @Test
public void testLogMoving() throws Exception{ public void testLogMoving() throws Exception{
@ -277,6 +282,14 @@ public class TestReplicationSource {
when(manager.getTotalBufferUsed()).thenReturn(totalBufferUsed); when(manager.getTotalBufferUsed()).thenReturn(totalBufferUsed);
} }
// source manager throws the exception while cleaning logs
private void setReplicationSourceWithoutPeerException()
throws ReplicationSourceWithoutPeerException {
doThrow(new ReplicationSourceWithoutPeerException("No peer")).when(manager)
.logPositionAndCleanOldLogs(Mockito.<Path>anyObject(), Mockito.anyString(),
Mockito.anyLong(), Mockito.anyBoolean(), Mockito.anyBoolean());
}
ReplicationSource createReplicationSourceWithMocks(ReplicationEndpoint endpoint) ReplicationSource createReplicationSourceWithMocks(ReplicationEndpoint endpoint)
throws IOException { throws IOException {
final ReplicationSource source = new ReplicationSource(); final ReplicationSource source = new ReplicationSource();
@ -470,6 +483,65 @@ public class TestReplicationSource {
assertThat(positionCaptor.getValue(), is(pos)); assertThat(positionCaptor.getValue(), is(pos));
} }
/**
* There can be a scenario of replication peer removed but the replication source
* still running since termination of source depends upon zk listener and there
* can a rare scenario where zk listener might not get invoked or get delayed.
* In that case, replication source manager will throw since it won't be able
* to remove the znode while removing the log. We should terminate the source
* in that case. See HBASE-25583
* @throws Exception any exception
*/
@Test
public void testReplicationSourceTerminationWhenNoZnodeForPeerAndQueues() throws Exception {
Mocks mocks = new Mocks();
mocks.setReplicationSourceWithoutPeerException();
// set table cfs to filter all cells out
final TableName replicatedTable = TableName.valueOf("replicated_table");
final Map<TableName, List<String>> cfs =
Collections.singletonMap(replicatedTable, Collections.<String>emptyList());
when(mocks.peer.getTableCFs()).thenReturn(cfs);
// Append 3 entries in a log
final Path log1 = new Path(logDir, "log.1");
WALProvider.Writer writer1 = WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration());
appendEntries(writer1, 3);
// Replication end point with no filter
final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() {
@Override
public WALEntryFilter getWALEntryfilter() {
return null;
}
};
final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
source.run();
source.enqueueLog(log1);
// Wait for source to replicate
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() {
return endpoint.replicateCount.get() == 1;
}
});
// Wait for all the entries to get replicated
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() {
return endpoint.lastEntries.size() == 3;
}
});
// After that the source should be terminated
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() {
// wait until reader read all cells
return !source.isSourceActive();
}
});
}
/** /**
* Tests that recovered queues are preserved on a regionserver shutdown. * Tests that recovered queues are preserved on a regionserver shutdown.
* See HBASE-18192 * See HBASE-18192

View File

@ -18,7 +18,11 @@
package org.apache.hadoop.hbase.replication; package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.*; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -53,7 +57,6 @@ public abstract class TestReplicationStateBasic {
protected static String KEY_TWO; protected static String KEY_TWO;
// For testing when we try to replicate to ourself // For testing when we try to replicate to ourself
protected String OUR_ID = "3";
protected String OUR_KEY; protected String OUR_KEY;
protected static int zkTimeoutCount; protected static int zkTimeoutCount;
@ -119,7 +122,6 @@ public abstract class TestReplicationStateBasic {
// 3 replicators should exist // 3 replicators should exist
assertEquals(3, rq1.getListOfReplicators().size()); assertEquals(3, rq1.getListOfReplicators().size());
rq1.removeQueue("bogus"); rq1.removeQueue("bogus");
rq1.removeLog("bogus", "bogus");
rq1.removeAllQueues(); rq1.removeAllQueues();
assertNull(rq1.getAllQueues()); assertNull(rq1.getAllQueues());
assertEquals(0, rq1.getLogPosition("bogus", "bogus")); assertEquals(0, rq1.getLogPosition("bogus", "bogus"));
@ -166,6 +168,19 @@ public abstract class TestReplicationStateBasic {
assertEquals(0, rq2.getListOfReplicators().size()); assertEquals(0, rq2.getListOfReplicators().size());
} }
@Test
public void testLogRemovalWithNoZnode() throws ReplicationException {
rq1.init(server1);
Exception expectedException = null;
try {
rq1.removeLog("bogus", "bogus");
} catch (ReplicationException e) {
expectedException = e;
}
assertTrue(expectedException instanceof ReplicationSourceWithoutPeerException);
}
@Test @Test
public void testInvalidClusterKeys() throws ReplicationException, KeeperException { public void testInvalidClusterKeys() throws ReplicationException, KeeperException {
rp.init(); rp.init();

View File

@ -31,10 +31,10 @@ import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZKConfig;

View File

@ -0,0 +1,150 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import static org.apache.hadoop.hbase.replication.ReplicationSourceDummy.fakeExceptionMessage;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummyWithNoTermination;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.replication.regionserver.helper.DummyServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TestName;
public abstract class TestReplicationSourceBase {
private static final Log LOG =
LogFactory.getLog(TestReplicationSourceBase.class);
protected static Configuration conf;
protected static HBaseTestingUtility utility;
protected static Replication replication;
protected static ReplicationSourceManager manager;
protected static ZooKeeperWatcher zkw;
protected static HTableDescriptor htd;
protected static HRegionInfo hri;
protected static final byte[] r1 = Bytes.toBytes("r1");
protected static final byte[] r2 = Bytes.toBytes("r2");
protected static final byte[] f1 = Bytes.toBytes("f1");
protected static final byte[] f2 = Bytes.toBytes("f2");
protected static final TableName test = TableName.valueOf("test");
protected static final String slaveId = "1";
protected static FileSystem fs;
protected static Path oldLogDir;
protected static Path logDir;
protected static DummyServer server;
@BeforeClass public static void setUpBeforeClass() throws Exception {
conf = HBaseConfiguration.create();
conf.set("replication.replicationsource.implementation",
ReplicationSourceDummyWithNoTermination.class.getCanonicalName());
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
conf.setLong("replication.sleep.before.failover", 2000);
conf.setInt("replication.source.maxretriesmultiplier", 10);
utility = new HBaseTestingUtility(conf);
utility.startMiniZKCluster();
zkw = new ZooKeeperWatcher(conf, "test", null);
ZKUtil.createWithParents(zkw, "/hbase/replication");
ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
ZKUtil.setData(zkw, "/hbase/replication/peers/1", Bytes.toBytes(
conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
+ ":/1"));
ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
ZKUtil.createWithParents(zkw, "/hbase/replication/state");
ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
ZKClusterId.setClusterId(zkw, new ClusterId());
FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
fs = FileSystem.get(conf);
oldLogDir = new Path(utility.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME);
logDir = new Path(utility.getDataTestDir(), HConstants.HREGION_LOGDIR_NAME);
server = new DummyServer(conf, "example.hostname.com", zkw);
replication = new Replication(server, fs, logDir, oldLogDir);
manager = replication.getReplicationManager();
manager.addSource(slaveId);
htd = new HTableDescriptor(test);
HColumnDescriptor col = new HColumnDescriptor(f1);
col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
htd.addFamily(col);
col = new HColumnDescriptor(f2);
col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
htd.addFamily(col);
hri = new HRegionInfo(htd.getTableName(), r1, r2);
}
@AfterClass public static void tearDownAfterClass() throws Exception {
try {
manager.join();
} catch (RuntimeException re) {
if (re.getMessage().equals(fakeExceptionMessage)) {
LOG.info("It is fine");
}
}
utility.shutdownMiniCluster();
}
@Rule public TestName testName = new TestName();
private void cleanLogDir() throws IOException {
fs.delete(logDir, true);
fs.delete(oldLogDir, true);
}
@Before public void setUp() throws Exception {
LOG.info("Start " + testName.getMethodName());
cleanLogDir();
}
@After public void tearDown() throws Exception {
LOG.info("End " + testName.getMethodName());
cleanLogDir();
}
}

View File

@ -47,9 +47,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClusterId; import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor;
@ -58,11 +56,9 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
@ -78,6 +74,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
import org.apache.hadoop.hbase.replication.regionserver.helper.DummyServer;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -86,61 +83,25 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
@Category(MediumTests.class) @Category(MediumTests.class)
public class TestReplicationSourceManager { public class TestReplicationSourceManager extends TestReplicationSourceBase {
private static final Log LOG = private static final Log LOG =
LogFactory.getLog(TestReplicationSourceManager.class); LogFactory.getLog(TestReplicationSourceManager.class);
private static Configuration conf;
private static HBaseTestingUtility utility;
private static Replication replication;
private static ReplicationSourceManager manager;
private static ZooKeeperWatcher zkw;
private static HTableDescriptor htd;
private static HRegionInfo hri;
private static final byte[] r1 = Bytes.toBytes("r1");
private static final byte[] r2 = Bytes.toBytes("r2");
private static final byte[] f1 = Bytes.toBytes("f1");
private static final byte[] f2 = Bytes.toBytes("f2");
private static final TableName test = private static final TableName test =
TableName.valueOf("test"); TableName.valueOf("test");
private static final String slaveId = "1"; private static final String slaveId = "1";
private static FileSystem fs;
private static Path oldLogDir;
private static Path logDir;
private static CountDownLatch latch; private static CountDownLatch latch;
private static List<String> files = new ArrayList<>();
private static List<String> files = new ArrayList<String>();
@BeforeClass @BeforeClass
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
@ -174,7 +135,8 @@ public class TestReplicationSourceManager {
HConstants.HREGION_OLDLOGDIR_NAME); HConstants.HREGION_OLDLOGDIR_NAME);
logDir = new Path(utility.getDataTestDir(), logDir = new Path(utility.getDataTestDir(),
HConstants.HREGION_LOGDIR_NAME); HConstants.HREGION_LOGDIR_NAME);
replication = new Replication(new DummyServer(), fs, logDir, oldLogDir); server = new DummyServer(conf, "example.hostname.com", zkw);
replication = new Replication(server, fs, logDir, oldLogDir);
manager = replication.getReplicationManager(); manager = replication.getReplicationManager();
manager.addSource(slaveId); manager.addSource(slaveId);
@ -196,26 +158,6 @@ public class TestReplicationSourceManager {
utility.shutdownMiniCluster(); utility.shutdownMiniCluster();
} }
@Rule
public TestName testName = new TestName();
private void cleanLogDir() throws IOException {
fs.delete(logDir, true);
fs.delete(oldLogDir, true);
}
@Before
public void setUp() throws Exception {
LOG.info("Start " + testName.getMethodName());
cleanLogDir();
}
@After
public void tearDown() throws Exception {
LOG.info("End " + testName.getMethodName());
cleanLogDir();
}
@Test @Test
public void testLogRoll() throws Exception { public void testLogRoll() throws Exception {
long baseline = 1000; long baseline = 1000;
@ -288,7 +230,7 @@ public class TestReplicationSourceManager {
@Test @Test
public void testClaimQueues() throws Exception { public void testClaimQueues() throws Exception {
conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
final Server server = new DummyServer("hostname0.example.org"); final Server server = new DummyServer(conf, "hostname0.example.org", zkw);
ReplicationQueues rq = ReplicationQueues rq =
ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(), ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
server); server);
@ -300,9 +242,9 @@ public class TestReplicationSourceManager {
rq.addLog("1", file); rq.addLog("1", file);
} }
// create 3 DummyServers // create 3 DummyServers
Server s1 = new DummyServer("dummyserver1.example.org"); Server s1 = new DummyServer(conf, "dummyserver1.example.org", zkw);
Server s2 = new DummyServer("dummyserver2.example.org"); Server s2 = new DummyServer(conf, "dummyserver2.example.org", zkw);
Server s3 = new DummyServer("dummyserver3.example.org"); Server s3 = new DummyServer(conf, "dummyserver3.example.org", zkw);
// create 3 DummyNodeFailoverWorkers // create 3 DummyNodeFailoverWorkers
DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker( DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(
@ -329,7 +271,7 @@ public class TestReplicationSourceManager {
@Test @Test
public void testCleanupFailoverQueues() throws Exception { public void testCleanupFailoverQueues() throws Exception {
final Server server = new DummyServer("hostname1.example.org"); final Server server = new DummyServer(conf, "hostname1.example.org", zkw);
ReplicationQueues rq = ReplicationQueues rq =
ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(), ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
server); server);
@ -344,7 +286,7 @@ public class TestReplicationSourceManager {
for (String file : files) { for (String file : files) {
rq.addLog("1", file); rq.addLog("1", file);
} }
Server s1 = new DummyServer("dummyserver1.example.org"); Server s1 = new DummyServer(conf, "dummyserver1.example.org", zkw);
ReplicationQueues rq1 = ReplicationQueues rq1 =
ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1); ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
rq1.init(s1.getServerName().toString()); rq1.init(s1.getServerName().toString());
@ -368,7 +310,7 @@ public class TestReplicationSourceManager {
public void testNodeFailoverDeadServerParsing() throws Exception { public void testNodeFailoverDeadServerParsing() throws Exception {
LOG.debug("testNodeFailoverDeadServerParsing"); LOG.debug("testNodeFailoverDeadServerParsing");
conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com"); final Server server = new DummyServer(conf, "ec2-54-234-230-108.compute-1.amazonaws.com", zkw);
ReplicationQueues repQueues = ReplicationQueues repQueues =
ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server); ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server);
repQueues.init(server.getServerName().toString()); repQueues.init(server.getServerName().toString());
@ -380,9 +322,9 @@ public class TestReplicationSourceManager {
} }
// create 3 DummyServers // create 3 DummyServers
Server s1 = new DummyServer("ip-10-8-101-114.ec2.internal"); Server s1 = new DummyServer(conf, "ip-10-8-101-114.ec2.internal", zkw);
Server s2 = new DummyServer("ec2-107-20-52-47.compute-1.amazonaws.com"); Server s2 = new DummyServer(conf, "ec2-107-20-52-47.compute-1.amazonaws.com", zkw);
Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com"); Server s3 = new DummyServer(conf, "ec2-23-20-187-167.compute-1.amazonaws.com", zkw);
// simulate three servers fail sequentially // simulate three servers fail sequentially
ReplicationQueues rq1 = ReplicationQueues rq1 =
@ -423,7 +365,7 @@ public class TestReplicationSourceManager {
LOG.debug("testFailoverDeadServerCversionChange"); LOG.debug("testFailoverDeadServerCversionChange");
conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
final Server s0 = new DummyServer("cversion-change0.example.org"); final Server s0 = new DummyServer(conf, "cversion-change0.example.org", zkw);
ReplicationQueues repQueues = ReplicationQueues repQueues =
ReplicationFactory.getReplicationQueues(s0.getZooKeeper(), conf, s0); ReplicationFactory.getReplicationQueues(s0.getZooKeeper(), conf, s0);
repQueues.init(s0.getServerName().toString()); repQueues.init(s0.getServerName().toString());
@ -434,7 +376,7 @@ public class TestReplicationSourceManager {
repQueues.addLog("1", file); repQueues.addLog("1", file);
} }
// simulate queue transfer // simulate queue transfer
Server s1 = new DummyServer("cversion-change1.example.org"); Server s1 = new DummyServer(conf, "cversion-change1.example.org", zkw);
ReplicationQueues rq1 = ReplicationQueues rq1 =
ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1); ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
rq1.init(s1.getServerName().toString()); rq1.init(s1.getServerName().toString());
@ -459,7 +401,7 @@ public class TestReplicationSourceManager {
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RU_INVOKE_RUN", @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RU_INVOKE_RUN",
justification="Intended") justification="Intended")
public void testCleanupUnknownPeerZNode() throws Exception { public void testCleanupUnknownPeerZNode() throws Exception {
final Server server = new DummyServer("hostname2.example.org"); final Server server = new DummyServer(conf, "hostname2.example.org", zkw);
ReplicationQueues rq = ReplicationQueues rq =
ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(), ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
server); server);
@ -521,16 +463,15 @@ public class TestReplicationSourceManager {
* corresponding ReplicationSourceInterface correctly cleans up the corresponding * corresponding ReplicationSourceInterface correctly cleans up the corresponding
* replication queue and ReplicationPeer. * replication queue and ReplicationPeer.
* See HBASE-16096. * See HBASE-16096.
* @throws Exception * @throws Exception exception
*/ */
@Test @Test
public void testPeerRemovalCleanup() throws Exception{ public void testPeerRemovalCleanup() throws Exception {
String replicationSourceImplName = conf.get("replication.replicationsource.implementation"); String replicationSourceImplName = conf.get("replication.replicationsource.implementation");
final String peerId = "FakePeer"; final String peerId = "FakePeer";
final ReplicationPeerConfig peerConfig = final ReplicationPeerConfig peerConfig =
new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase"); new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase");
try { try {
DummyServer server = new DummyServer();
final ReplicationQueues rq = final ReplicationQueues rq =
ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(), ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
server); server);
@ -600,10 +541,10 @@ public class TestReplicationSourceManager {
/** /**
* Add a peer and wait for it to initialize * Add a peer and wait for it to initialize
* @param peerId * @param peerId the replication peer Id
* @param peerConfig * @param peerConfig the replication peer config
* @param waitForSource Whether to wait for replication source to initialize * @param waitForSource Whether to wait for replication source to initialize
* @throws Exception * @throws Exception exception
*/ */
private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig, private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
final boolean waitForSource) throws Exception { final boolean waitForSource) throws Exception {
@ -622,8 +563,8 @@ public class TestReplicationSourceManager {
/** /**
* Remove a peer and wait for it to get cleaned up * Remove a peer and wait for it to get cleaned up
* @param peerId * @param peerId the replication peer Id
* @throws Exception * @throws Exception exception
*/ */
private void removePeerAndWait(final String peerId) throws Exception { private void removePeerAndWait(final String peerId) throws Exception {
final ReplicationPeers rp = manager.getReplicationPeers(); final ReplicationPeers rp = manager.getReplicationPeers();
@ -639,7 +580,6 @@ public class TestReplicationSourceManager {
}); });
} }
private WALEdit getBulkLoadWALEdit() { private WALEdit getBulkLoadWALEdit() {
// 1. Create store files for the families // 1. Create store files for the families
Map<byte[], List<Path>> storeFiles = new HashMap<>(1); Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
@ -742,70 +682,4 @@ public class TestReplicationSourceManager {
throw new IOException("Failing deliberately"); throw new IOException("Failing deliberately");
} }
} }
static class DummyServer implements Server {
String hostname;
DummyServer() {
hostname = "hostname.example.org";
}
DummyServer(String hostname) {
this.hostname = hostname;
}
@Override
public Configuration getConfiguration() {
return conf;
}
@Override
public ZooKeeperWatcher getZooKeeper() {
return zkw;
}
@Override
public CoordinatedStateManager getCoordinatedStateManager() {
return null;
}
@Override
public ClusterConnection getConnection() {
return null;
}
@Override
public MetaTableLocator getMetaTableLocator() {
return null;
}
@Override
public ServerName getServerName() {
return ServerName.valueOf(hostname, 1234, 1L);
}
@Override
public void abort(String why, Throwable e) {
// To change body of implemented methods use File | Settings | File Templates.
}
@Override
public boolean isAborted() {
return false;
}
@Override
public void stop(String why) {
// To change body of implemented methods use File | Settings | File Templates.
}
@Override
public boolean isStopped() {
return false; // To change body of implemented methods use File | Settings | File Templates.
}
@Override
public ChoreService getChoreService() {
return null;
}
}
} }

View File

@ -0,0 +1,85 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationSourceWithoutPeerException;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
public class TestReplicationSourceWithoutReplicationZnodes extends TestReplicationSourceBase {
/**
* When the peer is removed, hbase remove the peer znodes and there is zk watcher
* which terminates the replication sources. In case of zk watcher not getting invoked
* or a race condition between source deleting the log znode and zk watcher
* terminating the source, we might get the NoNode exception. In that case, the right
* thing is to terminate the replication source.
* @throws Exception throws exception
*/
@Test
public void testReplicationSourceRunningWithoutPeerZnodes() throws Exception {
MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
KeyValue kv = new KeyValue(r1, f1, r1);
WALEdit edit = new WALEdit();
edit.add(kv);
List<WALActionsListener> listeners = new ArrayList<>();
listeners.add(replication);
final WALFactory wals = new WALFactory(utility.getConfiguration(), listeners,
URLEncoder.encode("regionserver:60020", "UTF8"));
final WAL wal = wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace());
manager.init();
final long txid = wal.append(htd, hri,
new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc),
edit, true);
wal.sync(txid);
wal.rollWriter();
ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/peers/1");
ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/rs/"+ server.getServerName() + "/1");
ReplicationException exceptionThrown = null;
try {
manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
"1", 0, false, false);
} catch (ReplicationException e) {
exceptionThrown = e;
}
Assert.assertTrue(exceptionThrown instanceof ReplicationSourceWithoutPeerException);
}
}

View File

@ -0,0 +1,95 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver.helper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
public class DummyServer implements Server {
Configuration conf;
String hostname;
ZooKeeperWatcher zkw;
public DummyServer(Configuration conf, String hostname, ZooKeeperWatcher zkw) {
this.conf = conf;
this.hostname = hostname;
this.zkw = zkw;
}
@Override
public Configuration getConfiguration() {
return conf;
}
@Override
public ZooKeeperWatcher getZooKeeper() {
return zkw;
}
@Override
public CoordinatedStateManager getCoordinatedStateManager() {
return null;
}
@Override
public ClusterConnection getConnection() {
return null;
}
@Override
public MetaTableLocator getMetaTableLocator() {
return null;
}
@Override
public ServerName getServerName() {
return ServerName.valueOf(hostname, 1234, 1L);
}
@Override
public void abort(String why, Throwable e) {
// To change body of implemented methods use File | Settings | File Templates.
}
@Override
public boolean isAborted() {
return false;
}
@Override
public void stop(String why) {
// To change body of implemented methods use File | Settings | File Templates.
}
@Override
public boolean isStopped() {
return false; // To change body of implemented methods use File | Settings | File Templates.
}
@Override
public ChoreService getChoreService() {
return null;
}
}