HBASE-6321 ReplicationSource dies reading the peer's id

HBASE-6647  [performance regression] appendNoSync/HBASE-4528 doesn't
            take deferred log flush into account


git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1379289 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jean-Daniel Cryans 2012-08-31 04:59:02 +00:00
parent 844f27c8f8
commit c4f7a53747
4 changed files with 90 additions and 35 deletions

View File

@ -2295,10 +2295,8 @@ public class HRegion implements HeapSize { // , Writable{
// ------------------------- // -------------------------
// STEP 7. Sync wal. // STEP 7. Sync wal.
// ------------------------- // -------------------------
if (walEdit.size() > 0 && if (walEdit.size() > 0) {
(this.regionInfo.isMetaRegion() || syncOrDefer(txid);
!this.htableDescriptor.isDeferredLogFlush())) {
this.log.sync(txid);
} }
walSyncSuccessful = true; walSyncSuccessful = true;
// ------------------------------------------------------------------ // ------------------------------------------------------------------
@ -4498,10 +4496,8 @@ public class HRegion implements HeapSize { // , Writable{
acquiredLocks = null; acquiredLocks = null;
} }
// 10. Sync edit log // 10. Sync edit log
if (txid != 0 && if (txid != 0) {
(this.regionInfo.isMetaRegion() || syncOrDefer(txid);
!this.htableDescriptor.isDeferredLogFlush())) {
this.log.sync(txid);
} }
walSyncSuccessful = true; walSyncSuccessful = true;
} }
@ -4750,7 +4746,7 @@ public class HRegion implements HeapSize { // , Writable{
releaseRowLock(lid); releaseRowLock(lid);
} }
if (writeToWAL) { if (writeToWAL) {
this.log.sync(txid); // sync the transaction log outside the rowlock syncOrDefer(txid); // sync the transaction log outside the rowlock
} }
} finally { } finally {
closeRegionOperation(); closeRegionOperation();
@ -4878,7 +4874,7 @@ public class HRegion implements HeapSize { // , Writable{
releaseRowLock(lid); releaseRowLock(lid);
} }
if (writeToWAL) { if (writeToWAL) {
this.log.sync(txid); // sync the transaction log outside the rowlock syncOrDefer(txid); // sync the transaction log outside the rowlock
} }
} finally { } finally {
closeRegionOperation(); closeRegionOperation();
@ -4976,7 +4972,7 @@ public class HRegion implements HeapSize { // , Writable{
releaseRowLock(lid); releaseRowLock(lid);
} }
if (writeToWAL) { if (writeToWAL) {
this.log.sync(txid); // sync the transaction log outside the rowlock syncOrDefer(txid); // sync the transaction log outside the rowlock
} }
} finally { } finally {
closeRegionOperation(); closeRegionOperation();
@ -5389,6 +5385,19 @@ public class HRegion implements HeapSize { // , Writable{
dataInMemoryWithoutWAL.addAndGet(putSize); dataInMemoryWithoutWAL.addAndGet(putSize);
} }
/**
 * Syncs the WAL up to the given transaction id unless the region's table
 * has opted into deferred log flush. Meta regions are always synced.
 * @param txid should sync up to which transaction
 * @throws IOException If anything goes wrong with DFS
 */
private void syncOrDefer(long txid) throws IOException {
  // Defer only when the table asks for it and this isn't a meta region.
  boolean deferred = this.htableDescriptor.isDeferredLogFlush()
      && !this.regionInfo.isMetaRegion();
  if (!deferred) {
    this.log.sync(txid);
  }
}
/** /**
* A mocked list implementation - discards all updates. * A mocked list implementation - discards all updates.
*/ */

View File

@ -1193,10 +1193,14 @@ public class HLog implements Syncable {
} }
/** /**
* This thread is responsible to call syncFs and buffer up the writers while * This class is responsible to hold the HLog's appended Entry list
* it happens. * and to sync them according to a configurable interval.
*
* Deferred log flushing works first by piggybacking on this process by
* simply not sync'ing the appended Entry. It can also be sync'd by other
* non-deferred log flushed entries outside of this thread.
*/ */
class LogSyncer extends HasThread { class LogSyncer extends HasThread {
private final long optionalFlushInterval; private final long optionalFlushInterval;
@ -1227,6 +1231,9 @@ public class HLog implements Syncable {
closeLogSyncer.wait(this.optionalFlushInterval); closeLogSyncer.wait(this.optionalFlushInterval);
} }
} }
// Calling sync since we waited or had unflushed entries.
// Entries appended but not sync'd are taken care of here AKA
// deferred log flush
sync(); sync();
} catch (IOException e) { } catch (IOException e) {
LOG.error("Error while syncing, requesting close of hlog ", e); LOG.error("Error while syncing, requesting close of hlog ", e);

View File

@ -30,6 +30,7 @@ import java.util.SortedMap;
import java.util.SortedSet; import java.util.SortedSet;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -43,7 +44,9 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
@ -253,19 +256,7 @@ public class ReplicationZookeeper implements Closeable{
try { try {
addresses = fetchSlavesAddresses(peer.getZkw()); addresses = fetchSlavesAddresses(peer.getZkw());
} catch (KeeperException ke) { } catch (KeeperException ke) {
if (ke instanceof ConnectionLossException reconnectPeer(ke, peer);
|| ke instanceof SessionExpiredException) {
LOG.warn(
"Lost the ZooKeeper connection for peer " + peer.getClusterKey(),
ke);
try {
peer.reloadZkWatcher();
} catch(IOException io) {
LOG.warn(
"Creation of ZookeeperWatcher failed for peer "
+ peer.getClusterKey(), io);
}
}
addresses = Collections.emptyList(); addresses = Collections.emptyList();
} }
peer.setRegionServers(addresses); peer.setRegionServers(addresses);
@ -976,6 +967,50 @@ public class ReplicationZookeeper implements Closeable{
return 0; return 0;
} }
/**
 * Returns the UUID of the provided peer id. Should a connection loss or session
 * expiration happen, the ZK handler will be reopened once and if it still doesn't
 * work then it will bail and return null.
 * @param peerId the peer's ID that will be converted into a UUID
 * @return a UUID or null if there's a ZK connection issue or the peer is unknown
 */
public UUID getPeerUUID(String peerId) {
  ReplicationPeer peer = getPeerClusters().get(peerId);
  if (peer == null) {
    // Guard against an NPE on peer.getZkw() when the peer id isn't registered;
    // callers already handle a null return by retrying.
    LOG.warn("Could not get the peer's UUID, unknown peer id: " + peerId);
    return null;
  }
  UUID peerUUID = null;
  try {
    peerUUID = getUUIDForCluster(peer.getZkw());
  } catch (KeeperException ke) {
    reconnectPeer(ke, peer);
  }
  return peerUUID;
}
/**
 * Get the UUID for the provided ZK watcher. Doesn't handle any ZK exceptions
 * @param zkw watcher connected to an ensemble
 * @return the UUID read from zookeeper
 * @throws KeeperException
 */
public UUID getUUIDForCluster(ZooKeeperWatcher zkw) throws KeeperException {
  String clusterId = ZKClusterId.readClusterIdZNode(zkw);
  return UUID.fromString(clusterId);
}
/**
 * Attempts to reopen the given peer's ZK watcher after a recoverable
 * connection-level failure; other KeeperExceptions are ignored.
 * @param ke the exception that triggered the reconnect attempt
 * @param peer the peer whose watcher should be reloaded
 */
private void reconnectPeer(KeeperException ke, ReplicationPeer peer) {
  // Only connection loss / session expiration warrant a reconnect.
  boolean recoverable = ke instanceof ConnectionLossException
      || ke instanceof SessionExpiredException;
  if (!recoverable) {
    return;
  }
  LOG.warn("Lost the ZooKeeper connection for peer "
      + peer.getClusterKey(), ke);
  try {
    peer.reloadZkWatcher();
  } catch (IOException io) {
    LOG.warn("Creation of ZookeeperWatcher failed for peer "
        + peer.getClusterKey(), io);
  }
}
public void registerRegionServerListener(ZooKeeperListener listener) { public void registerRegionServerListener(ZooKeeperListener listener) {
this.zookeeper.registerListener(listener); this.zookeeper.registerListener(listener);
} }

View File

@ -189,8 +189,7 @@ public class ReplicationSource extends Thread
this.metrics = new ReplicationSourceMetrics(peerClusterZnode); this.metrics = new ReplicationSourceMetrics(peerClusterZnode);
try { try {
this.clusterId = UUID.fromString(ZKClusterId.readClusterIdZNode(zkHelper this.clusterId = zkHelper.getUUIDForCluster(zkHelper.getZookeeperWatcher());
.getZookeeperWatcher()));
} catch (KeeperException ke) { } catch (KeeperException ke) {
throw new IOException("Could not read cluster id", ke); throw new IOException("Could not read cluster id", ke);
} }
@ -250,13 +249,19 @@ public class ReplicationSource extends Thread
metrics.clear(); metrics.clear();
return; return;
} }
int sleepMultiplier = 1;
// delay this until we are in an asynchronous thread // delay this until we are in an asynchronous thread
try { while (this.peerClusterId == null) {
this.peerClusterId = UUID.fromString(ZKClusterId this.peerClusterId = zkHelper.getPeerUUID(this.peerId);
.readClusterIdZNode(zkHelper.getPeerClusters().get(peerId).getZkw())); if (this.peerClusterId == null) {
} catch (KeeperException ke) { if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
this.terminate("Could not read peer's cluster id", ke); sleepMultiplier++;
}
}
} }
// resetting to 1 to reuse later
sleepMultiplier = 1;
LOG.info("Replicating "+clusterId + " -> " + peerClusterId); LOG.info("Replicating "+clusterId + " -> " + peerClusterId);
// If this is recovered, the queue is already full and the first log // If this is recovered, the queue is already full and the first log
@ -270,7 +275,6 @@ public class ReplicationSource extends Thread
peerClusterZnode, e); peerClusterZnode, e);
} }
} }
int sleepMultiplier = 1;
// Loop until we close down // Loop until we close down
while (isActive()) { while (isActive()) {
// Sleep until replication is enabled again // Sleep until replication is enabled again