HBASE-3130 [replication] ReplicationSource can't recover from session
expired on remote clusters (Chris Trezzo via JD)
   HBASE-4499  [replication] Source shouldn't update ZK if it didn't progress
               (Chris Trezzo via JD)


git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1177461 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jean-Daniel Cryans 2011-09-29 23:33:39 +00:00
parent e2da35f666
commit b2f6235668
6 changed files with 167 additions and 34 deletions

View File

@ -316,6 +316,8 @@ Release 0.92.0 - Unreleased
HBASE-4455 Rolling restart RSs scenario, -ROOT-, .META. regions are lost in
AssignmentManager (Ming Ma)
HBASE-4513 NOTICES.txt refers to Facebook for Thrift
HBASE-3130 [replication] ReplicationSource can't recover from session
expired on remote clusters (Chris Trezzo via JD)
TESTS
HBASE-4450 test for number of blocks read: to serve as baseline for expected
@ -558,6 +560,8 @@ Release 0.92.0 - Unreleased
(David Revell)
HBASE-4454 Add failsafe plugin to build and rename integration tests
(Jesse Yates)
HBASE-4499 [replication] Source shouldn't update ZK if it didn't progress
(Chris Trezzo via JD)
TASKS
HBASE-3559 Move report of split to master OFF the heartbeat channel

View File

@ -19,21 +19,25 @@
*/
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
/**
* This class acts as a wrapper for all the objects used to identify and
* communicate with remote peers. Everything needs to be created for objects
* of this class as it doesn't encapsulate any specific functionality e.g.
* it's a container class.
* communicate with remote peers and is responsible for answering to expired
* sessions and re-establishing the ZK connections.
*/
public class ReplicationPeer {
public class ReplicationPeer implements Abortable {
private static final Log LOG = LogFactory.getLog(ReplicationPeer.class);
private final String clusterKey;
private final String id;
@ -49,14 +53,13 @@ public class ReplicationPeer {
* @param conf configuration object to this peer
* @param key cluster key used to locate the peer
* @param id string representation of this peer's identifier
* @param zkw zookeeper connection to the peer
*/
public ReplicationPeer(Configuration conf, String key,
String id, ZooKeeperWatcher zkw) {
String id) throws IOException {
this.conf = conf;
this.clusterKey = key;
this.id = id;
this.zkw = zkw;
this.reloadZkWatcher();
}
/**
@ -116,4 +119,27 @@ public class ReplicationPeer {
public Configuration getConfiguration() {
return conf;
}
/**
 * Handles an abort request raised for this peer. The request is only
 * logged at WARN level, together with the peer's cluster key; nothing else
 * is done here.
 *
 * @param why description of the reason for the abort
 * @param e throwable associated with the abort; logged alongside the message
 */
@Override
public void abort(String why, Throwable e) {
  LOG.warn("The ReplicationPeer corresponding to peer " + clusterKey
      + " was aborted for the following reason(s): " + why, e);
}
/**
 * Replaces this peer's ZooKeeper watcher with a freshly created one. If a
 * watcher is already present it is closed first.
 *
 * @throws IOException if the new watcher cannot be created
 */
public void reloadZkWatcher() throws IOException {
  final ZooKeeperWatcher previous = zkw;
  if (previous != null) {
    previous.close();
  }
  final String description = "connection to cluster: " + id;
  // This peer is handed to the watcher as its Abortable.
  zkw = new ZooKeeperWatcher(conf, description, this);
}
/**
 * Reports whether this peer has been aborted.
 *
 * @return always {@code false}; {@link #abort(String, Throwable)} only logs
 *         the request
 */
@Override
public boolean isAborted() {
// Currently the replication peer is never "Aborted", we just log when the
// abort method is called.
return false;
}
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -43,6 +44,8 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
/**
* This class serves as a helper for all things related to zookeeper
@ -210,8 +213,7 @@ public class ReplicationZookeeper {
* @param peerClusterId (String) the cluster to interrogate
* @return addresses of all region servers
*/
public List<ServerName> getSlavesAddresses(String peerClusterId)
throws KeeperException {
public List<ServerName> getSlavesAddresses(String peerClusterId) {
if (this.peerClusters.size() == 0) {
return new ArrayList<ServerName>(0);
}
@ -219,7 +221,27 @@ public class ReplicationZookeeper {
if (peer == null) {
return new ArrayList<ServerName>(0);
}
peer.setRegionServers(fetchSlavesAddresses(peer.getZkw()));
List<ServerName> addresses;
try {
addresses = fetchSlavesAddresses(peer.getZkw());
} catch (KeeperException ke) {
if (ke instanceof ConnectionLossException
|| ke instanceof SessionExpiredException) {
LOG.warn(
"Lost the ZooKeeper connection for peer " + peer.getClusterKey(),
ke);
try {
peer.reloadZkWatcher();
} catch(IOException io) {
LOG.warn(
"Creation of ZookeeperWatcher failed for peer "
+ peer.getClusterKey(), io);
}
}
addresses = Collections.emptyList();
}
peer.setRegionServers(addresses);
return peer.getRegionServers();
}
@ -229,13 +251,9 @@ public class ReplicationZookeeper {
* @return list of region server addresses or an empty list if the slave
* is unavailable
*/
private List<ServerName> fetchSlavesAddresses(ZooKeeperWatcher zkw) {
try {
return listChildrenAndGetAsServerNames(zkw, zkw.rsZNode);
} catch (KeeperException e) {
LOG.warn("Cannot get peer's region server addresses", e);
return new ArrayList<ServerName>(0);
}
private List<ServerName> fetchSlavesAddresses(ZooKeeperWatcher zkw)
throws KeeperException {
return listChildrenAndGetAsServerNames(zkw, zkw.rsZNode);
}
/**
@ -318,10 +336,8 @@ public class ReplicationZookeeper {
return null;
}
ZooKeeperWatcher zkw = new ZooKeeperWatcher(otherConf,
"connection to cluster: " + peerId, this.abortable);
return new ReplicationPeer(otherConf, peerId,
otherClusterKey, zkw);
otherClusterKey);
}
/**

View File

@ -106,6 +106,8 @@ public class ReplicationSource extends Thread
private HLog.Reader reader;
// Current position in the log
private long position = 0;
// Last position in the log that we sent to ZooKeeper
private long lastLoggedPosition = -1;
// Path of the current log
private volatile Path currentPath;
private FileSystem fs;
@ -210,10 +212,9 @@ public class ReplicationSource extends Thread
/**
* Select a number of peers at random using the ratio. Minimum 1.
*/
private void chooseSinks() throws KeeperException {
private void chooseSinks() {
this.currentPeers.clear();
List<ServerName> addresses =
this.zkHelper.getSlavesAddresses(peerId);
List<ServerName> addresses = this.zkHelper.getSlavesAddresses(peerId);
Set<ServerName> setOfAddr = new HashSet<ServerName>();
int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
LOG.info("Getting " + nbPeers +
@ -349,8 +350,11 @@ public class ReplicationSource extends Thread
// wait a bit and retry.
// But if we need to stop, don't bother sleeping
if (this.isActive() && (gotIOE || currentNbEntries == 0)) {
this.manager.logPositionAndCleanOldLogs(this.currentPath,
this.peerClusterZnode, this.position, queueRecovered);
if (this.lastLoggedPosition != this.position) {
this.manager.logPositionAndCleanOldLogs(this.currentPath,
this.peerClusterZnode, this.position, queueRecovered);
this.lastLoggedPosition = this.position;
}
if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
sleepMultiplier++;
}
@ -435,8 +439,6 @@ public class ReplicationSource extends Thread
Thread.sleep(this.sleepForRetries);
} catch (InterruptedException e) {
LOG.error("Interrupted while trying to connect to sinks", e);
} catch (KeeperException e) {
LOG.error("Error talking to zookeeper, retrying", e);
}
}
}
@ -592,8 +594,11 @@ public class ReplicationSource extends Thread
HRegionInterface rrs = getRS();
LOG.debug("Replicating " + currentNbEntries);
rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries));
this.manager.logPositionAndCleanOldLogs(this.currentPath,
this.peerClusterZnode, this.position, queueRecovered);
if (this.lastLoggedPosition != this.position) {
this.manager.logPositionAndCleanOldLogs(this.currentPath,
this.peerClusterZnode, this.position, queueRecovered);
this.lastLoggedPosition = this.position;
}
this.totalReplicatedEdits += currentNbEntries;
this.metrics.shippedBatchesRate.inc(1);
this.metrics.shippedOpsRate.inc(
@ -627,10 +632,7 @@ public class ReplicationSource extends Thread
} while (this.isActive() && down );
} catch (InterruptedException e) {
LOG.debug("Interrupted while trying to contact the peer cluster");
} catch (KeeperException e) {
LOG.error("Error talking to zookeeper, retrying", e);
}
}
}
}

View File

@ -1240,7 +1240,12 @@ public class HBaseTestingUtility {
}
public void expireSession(ZooKeeperWatcher nodeZK, Server server)
throws Exception {
throws Exception {
expireSession(nodeZK, server, false);
}
public void expireSession(ZooKeeperWatcher nodeZK, Server server,
boolean checkStatus) throws Exception {
Configuration c = new Configuration(this.conf);
String quorumServers = ZKConfig.getZKQuorumServersString(c);
int sessionTimeout = 5 * 1000; // 5 seconds
@ -1257,9 +1262,12 @@ public class HBaseTestingUtility {
Thread.sleep(sleep);
new HTable(new Configuration(conf), HConstants.META_TABLE_NAME);
if (checkStatus) {
new HTable(new Configuration(conf), HConstants.META_TABLE_NAME);
}
}
/**
* Get the HBase cluster.
*

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
 * Checks that a {@link ReplicationPeer} can replace its ZooKeeper watcher
 * after the session behind the current one has been expired.
 */
public class TestReplicationPeer {

  private static final Log LOG = LogFactory.getLog(TestReplicationPeer.class);

  private static HBaseTestingUtility utility;
  private static Configuration conf;
  private static ReplicationPeer rp;

  /**
   * Starts a mini ZooKeeper cluster and creates the peer under test.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf = HBaseConfiguration.create();
    utility = new HBaseTestingUtility(conf);
    conf = utility.getConfiguration();
    utility.startMiniZKCluster();
    rp = new ReplicationPeer(conf, "clusterKey", "clusterId");
  }

  /**
   * Releases what {@link #setUpBeforeClass()} created: the peer's current
   * watcher and the mini ZooKeeper cluster. Without this the cluster keeps
   * running after the test class has finished.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    try {
      if (rp != null && rp.getZkw() != null) {
        rp.getZkw().close();
      }
    } finally {
      // Shut the cluster down even if closing the watcher failed.
      if (utility != null) {
        utility.shutdownMiniZKCluster();
      }
    }
  }

  /**
   * Expires the peer's session, confirms the old watcher reports the
   * expiration, then reloads the watcher and uses it again.
   */
  @Test(timeout=300000)
  public void testResetZooKeeperSession() throws Exception {
    ZooKeeperWatcher zkw = rp.getZkw();
    zkw.getRecoverableZooKeeper().exists("/1/2", false);

    LOG.info("Expiring ReplicationPeer ZooKeeper session.");
    utility.expireSession(zkw, null, false);

    try {
      LOG.info("Attempting to use expired ReplicationPeer ZooKeeper session.");
      // Trying to use the expired session to assert that it is indeed closed
      zkw.getRecoverableZooKeeper().exists("/1/2", false);
    } catch (SessionExpiredException k) {
      rp.reloadZkWatcher();

      zkw = rp.getZkw();

      // Try to use the connection again
      LOG.info("Attempting to use refreshed "
          + "ReplicationPeer ZooKeeper session.");
      zkw.getRecoverableZooKeeper().exists("/1/2", false);

      return;
    }

    // Reaching this point means the call on the old watcher did not fail.
    Assert.fail("ReplicationPeer ZooKeeper session was not properly expired.");
  }
}