From b2f62356685f34e56796fde6c0c0f97c1c1413c5 Mon Sep 17 00:00:00 2001 From: Jean-Daniel Cryans Date: Thu, 29 Sep 2011 23:33:39 +0000 Subject: [PATCH] HBASE-3130 [replication] ReplicationSource can't recover from session expired on remote clusters (Chris Trezzo via JD) HBASE-4499 [replication] Source shouldn't update ZK if it didn't progress (Chris Trezzo via JD) git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1177461 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 4 + .../hbase/replication/ReplicationPeer.java | 40 ++++++++-- .../replication/ReplicationZookeeper.java | 42 ++++++---- .../regionserver/ReplicationSource.java | 26 ++++--- .../hadoop/hbase/HBaseTestingUtility.java | 12 ++- .../replication/TestReplicationPeer.java | 77 +++++++++++++++++++ 6 files changed, 167 insertions(+), 34 deletions(-) create mode 100644 src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeer.java diff --git a/CHANGES.txt b/CHANGES.txt index c59fc9eaa16..3ecedec52c4 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -316,6 +316,8 @@ Release 0.92.0 - Unreleased HBASE-4455 Rolling restart RSs scenario, -ROOT-, .META. regions are lost in AssignmentManager (Ming Ma) HBASE-4513 NOTICES.txt refers to Facebook for Thrift + HBASE-3130 [replication] ReplicationSource can't recover from session + expired on remote clusters (Chris Trezzo via JD) TESTS HBASE-4450 test for number of blocks read: to serve as baseline for expected @@ -558,6 +560,8 @@ Release 0.92.0 - Unreleased (David Revell) HBASE-4454 Add failsafe plugin to build and rename integration tests (Jesse Yates) + HBASE-4499 [replication] Source shouldn't update ZK if it didn't progress + (Chris Trezzo via JD) TASKS HBASE-3559 Move report of split to master OFF the heartbeat channel diff --git a/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java index 6de47e63243..6495207bf7e 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java @@ -19,21 +19,25 @@ */ package org.apache.hadoop.hbase.replication; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; /** * This class acts as a wrapper for all the objects used to identify and - * communicate with remote peers. Everything needs to be created for objects - * of this class as it doesn't encapsulate any specific functionality e.g. - * it's a container class. + * communicate with remote peers and is responsible for answering to expired + * sessions and re-establishing the ZK connections. */ -public class ReplicationPeer { +public class ReplicationPeer implements Abortable { + private static final Log LOG = LogFactory.getLog(ReplicationPeer.class); private final String clusterKey; private final String id; @@ -49,14 +53,13 @@ public class ReplicationPeer { * @param conf configuration object to this peer * @param key cluster key used to locate the peer * @param id string representation of this peer's identifier - * @param zkw zookeeper connection to the peer */ public ReplicationPeer(Configuration conf, String key, - String id, ZooKeeperWatcher zkw) { + String id) throws IOException { this.conf = conf; this.clusterKey = key; this.id = id; - this.zkw = zkw; + this.reloadZkWatcher(); } /** @@ -116,4 +119,27 @@ public class ReplicationPeer { public Configuration getConfiguration() { return conf; } + + @Override + public void abort(String why, Throwable e) { + LOG.warn("The ReplicationPeer coresponding to peer " + clusterKey + + " was aborted for the following reason(s):" + why, e); + } + + /** + * Closes the current ZKW (if not null) and creates a new one + * @throws IOException If anything goes wrong connecting + */ + public void reloadZkWatcher() throws IOException { + if (zkw != null) zkw.close(); + zkw = new ZooKeeperWatcher(conf, + "connection to cluster: " + id, this); + } + + @Override + public boolean isAborted() { + // Currently the replication peer is never "Aborted", we just log when the + // abort method is called. + return false; + } } diff --git a/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java b/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java index 6d20ebe2b6c..a57bc11d5e4 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.replication; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -43,6 +44,8 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.ConnectionLossException; +import org.apache.zookeeper.KeeperException.SessionExpiredException; /** * This class serves as a helper for all things related to zookeeper @@ -210,8 +213,7 @@ public class ReplicationZookeeper { * @param peerClusterId (byte) the cluster to interrogate * @return addresses of all region servers */ - public List getSlavesAddresses(String peerClusterId) - throws KeeperException { + public List getSlavesAddresses(String peerClusterId) { if (this.peerClusters.size() == 0) { return new ArrayList(0); } @@ -219,7 +221,27 @@ public class ReplicationZookeeper { if (peer == null) { return new ArrayList(0); } - peer.setRegionServers(fetchSlavesAddresses(peer.getZkw())); + + List addresses; + try { + addresses = fetchSlavesAddresses(peer.getZkw()); + } catch (KeeperException ke) { + if (ke instanceof ConnectionLossException + || ke instanceof SessionExpiredException) { + LOG.warn( + "Lost the ZooKeeper connection for peer " + peer.getClusterKey(), + ke); + try { + peer.reloadZkWatcher(); + } catch(IOException io) { + LOG.warn( + "Creation of ZookeeperWatcher failed for peer " + + peer.getClusterKey(), io); + } + } + addresses = Collections.emptyList(); + } + peer.setRegionServers(addresses); return peer.getRegionServers(); } @@ -229,13 +251,9 @@ public class ReplicationZookeeper { * @return list of region server addresses or an empty list if the slave * is unavailable */ - private List fetchSlavesAddresses(ZooKeeperWatcher zkw) { - try { - return listChildrenAndGetAsServerNames(zkw, zkw.rsZNode); - } catch (KeeperException e) { - LOG.warn("Cannot get peer's region server addresses", e); - return new ArrayList(0); - } + private List fetchSlavesAddresses(ZooKeeperWatcher zkw) + throws KeeperException { + return listChildrenAndGetAsServerNames(zkw, zkw.rsZNode); } /** @@ -318,10 +336,8 @@ public class ReplicationZookeeper { return null; } - ZooKeeperWatcher zkw = new ZooKeeperWatcher(otherConf, - "connection to cluster: " + peerId, this.abortable); return new ReplicationPeer(otherConf, peerId, - otherClusterKey, zkw); + otherClusterKey); } /** diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 8ecef1f5b6b..5fd467df932 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -106,6 +106,8 @@ public class ReplicationSource extends Thread private HLog.Reader reader; // Current position in the log private long position = 0; + // Last position in the log that we sent to ZooKeeper + private long lastLoggedPosition = -1; // Path of the current log private volatile Path currentPath; private FileSystem fs; @@ -210,10 +212,9 @@ public class ReplicationSource extends Thread /** * Select a number of peers at random using the ratio. Mininum 1. */ - private void chooseSinks() throws KeeperException { + private void chooseSinks() { this.currentPeers.clear(); - List addresses = - this.zkHelper.getSlavesAddresses(peerId); + List addresses = this.zkHelper.getSlavesAddresses(peerId); Set setOfAddr = new HashSet(); int nbPeers = (int) (Math.ceil(addresses.size() * ratio)); LOG.info("Getting " + nbPeers + @@ -349,8 +350,11 @@ public class ReplicationSource extends Thread // wait a bit and retry. // But if we need to stop, don't bother sleeping if (this.isActive() && (gotIOE || currentNbEntries == 0)) { - this.manager.logPositionAndCleanOldLogs(this.currentPath, - this.peerClusterZnode, this.position, queueRecovered); + if (this.lastLoggedPosition != this.position) { + this.manager.logPositionAndCleanOldLogs(this.currentPath, + this.peerClusterZnode, this.position, queueRecovered); + this.lastLoggedPosition = this.position; + } if (sleepForRetries("Nothing to replicate", sleepMultiplier)) { sleepMultiplier++; } @@ -435,8 +439,6 @@ public class ReplicationSource extends Thread Thread.sleep(this.sleepForRetries); } catch (InterruptedException e) { LOG.error("Interrupted while trying to connect to sinks", e); - } catch (KeeperException e) { - LOG.error("Error talking to zookeeper, retrying", e); } } } @@ -592,8 +594,11 @@ public class ReplicationSource extends Thread HRegionInterface rrs = getRS(); LOG.debug("Replicating " + currentNbEntries); rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries)); - this.manager.logPositionAndCleanOldLogs(this.currentPath, - this.peerClusterZnode, this.position, queueRecovered); + if (this.lastLoggedPosition != this.position) { + this.manager.logPositionAndCleanOldLogs(this.currentPath, + this.peerClusterZnode, this.position, queueRecovered); + this.lastLoggedPosition = this.position; + } this.totalReplicatedEdits += currentNbEntries; this.metrics.shippedBatchesRate.inc(1); this.metrics.shippedOpsRate.inc( @@ -627,10 +632,7 @@ public class ReplicationSource extends Thread } while (this.isActive() && down ); } catch (InterruptedException e) { LOG.debug("Interrupted while trying to contact the peer cluster"); - } catch (KeeperException e) { - LOG.error("Error talking to zookeeper, retrying", e); } - } } } diff --git a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 5edb3819276..a82884661cf 100644 --- a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -1240,7 +1240,12 @@ public class HBaseTestingUtility { } public void expireSession(ZooKeeperWatcher nodeZK, Server server) - throws Exception { + throws Exception { + expireSession(nodeZK, server, false); + } + + public void expireSession(ZooKeeperWatcher nodeZK, Server server, + boolean checkStatus) throws Exception { Configuration c = new Configuration(this.conf); String quorumServers = ZKConfig.getZKQuorumServersString(c); int sessionTimeout = 5 * 1000; // 5 seconds @@ -1257,9 +1262,12 @@ public class HBaseTestingUtility { Thread.sleep(sleep); - new HTable(new Configuration(conf), HConstants.META_TABLE_NAME); + if (checkStatus) { + new HTable(new Configuration(conf), HConstants.META_TABLE_NAME); + } } + /** * Get the HBase cluster. * diff --git a/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeer.java b/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeer.java new file mode 100644 index 00000000000..16e44ab48e4 --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeer.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException.SessionExpiredException; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestReplicationPeer { + + private static final Log LOG = LogFactory.getLog(TestReplicationPeer.class); + private static HBaseTestingUtility utility; + private static Configuration conf; + private static ReplicationPeer rp; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf = HBaseConfiguration.create(); + utility = new HBaseTestingUtility(conf); + conf = utility.getConfiguration(); + utility.startMiniZKCluster(); + + rp = new ReplicationPeer(conf, "clusterKey", "clusterId"); + } + + @Test(timeout=300000) + public void testResetZooKeeperSession() throws Exception { + ZooKeeperWatcher zkw = rp.getZkw(); + zkw.getRecoverableZooKeeper().exists("/1/2", false); + + LOG.info("Expiring ReplicationPeer ZooKeeper session."); + utility.expireSession(zkw, null, false); + + try { + LOG.info("Attempting to use expired ReplicationPeer ZooKeeper session."); + // Trying to use the expired session to assert that it is indeed closed + zkw.getRecoverableZooKeeper().exists("/1/2", false); + } catch (SessionExpiredException k) { + rp.reloadZkWatcher(); + + zkw = rp.getZkw(); + + // Try to use the connection again + LOG.info("Attempting to use refreshed " + + "ReplicationPeer ZooKeeper session."); + zkw.getRecoverableZooKeeper().exists("/1/2", false); + + return; + } + + Assert.fail("ReplicationPeer ZooKeeper session was not properly expired."); + } + +}