diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index ca4d7dd0657..94560821af1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -35,7 +35,6 @@ import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -81,7 +80,7 @@ import org.apache.zookeeper.KeeperException; public class ReplicationSource extends Thread implements ReplicationSourceInterface { - private static final Log LOG = LogFactory.getLog(ReplicationSource.class); + public static final Log LOG = LogFactory.getLog(ReplicationSource.class); // Queue of logs to process private PriorityBlockingQueue queue; // container of entries to replicate @@ -121,6 +120,8 @@ public class ReplicationSource extends Thread private UUID peerClusterId; // total number of edits we replicated private long totalReplicatedEdits = 0; + // total number of edits we replicated + private long totalReplicatedOperations = 0; // The znode we currently play with private String peerClusterZnode; // Maximum number of retries before taking bold actions @@ -206,7 +207,7 @@ public class ReplicationSource extends Thread List addresses = this.zkHelper.getSlavesAddresses(this.peerId); Set setOfAddr = new HashSet(); int nbPeers = (int) (Math.ceil(addresses.size() * ratio)); - LOG.info("Getting " + nbPeers + + LOG.debug("Getting " + nbPeers + " rs from peer cluster # " + this.peerId); for (int i = 0; i < nbPeers; i++) { ServerName sn; @@ -255,6 +256,10 @@ public class ReplicationSource extends Thread try { this.repLogReader.setPosition(this.zkHelper.getHLogRepPosition( this.peerClusterZnode, this.queue.peek().getName())); + if (LOG.isTraceEnabled()) { + LOG.trace("Recovered queue started with log " + this.queue.peek() + + " at position " + this.repLogReader.getPosition()); + } } catch (KeeperException e) { this.terminate("Couldn't get the position of this recovered queue " + this.peerClusterZnode, e); @@ -282,10 +287,6 @@ public class ReplicationSource extends Thread sleepMultiplier++; } continue; - } else if (oldPath != null && !oldPath.getName().equals(getCurrentPath().getName())) { - this.manager.cleanOldLogs(getCurrentPath().getName(), - this.peerId, - this.replicationQueueInfo.isQueueRecovered()); } boolean currentWALisBeingWrittenTo = false; //For WAL files we own (rather than recovered), take a snapshot of whether the @@ -402,6 +403,10 @@ public class ReplicationSource extends Thread protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo) throws IOException{ long seenEntries = 0; + if (LOG.isTraceEnabled()) { + LOG.trace("Seeking in " + this.currentPath + " at position " + + this.repLogReader.getPosition()); + } this.repLogReader.seek(); HLog.Entry entry = this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries); @@ -475,6 +480,14 @@ public class ReplicationSource extends Thread if (this.currentPath == null) { this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS); this.metrics.setSizeOfLogQueue(queue.size()); + if (this.currentPath != null) { + this.manager.cleanOldLogs(this.currentPath.getName(), + this.peerId, + this.replicationQueueInfo.isQueueRecovered()); + if (LOG.isTraceEnabled()) { + LOG.trace("New log: " + this.currentPath); + } + } } } catch (InterruptedException e) { LOG.warn("Interrupted while reading edits", e); @@ -491,6 +504,9 @@ public class ReplicationSource extends Thread protected boolean openReader(int sleepMultiplier) { try { try { + if (LOG.isTraceEnabled()) { + LOG.trace("Opening log " + this.currentPath); + } this.reader = repLogReader.openReader(this.currentPath); } catch (FileNotFoundException fnfe) { if (this.replicationQueueInfo.isQueueRecovered()) { @@ -643,6 +659,9 @@ public class ReplicationSource extends Thread } try { AdminService.BlockingInterface rrs = getRS(); + if (LOG.isTraceEnabled()) { + LOG.trace("Replicating " + this.currentNbEntries + " entries"); + } ReplicationProtbufUtil.replicateWALEntry(rrs, Arrays.copyOf(this.entriesArray, currentNbEntries)); if (this.lastLoggedPosition != this.repLogReader.getPosition()) { @@ -652,9 +671,14 @@ public class ReplicationSource extends Thread this.lastLoggedPosition = this.repLogReader.getPosition(); } this.totalReplicatedEdits += currentNbEntries; + this.totalReplicatedOperations += currentNbOperations; this.metrics.shipBatch(this.currentNbOperations); this.metrics.setAgeOfLastShippedOp( this.entriesArray[currentNbEntries-1].getKey().getWriteTime()); + if (LOG.isTraceEnabled()) { + LOG.trace("Replicated " + this.totalReplicatedEdits + " entries in total, or " + + this.totalReplicatedOperations + " operations"); + } break; } catch (IOException ioe) { @@ -724,6 +748,15 @@ public class ReplicationSource extends Thread */ protected boolean processEndOfFile() { if (this.queue.size() != 0) { + if (LOG.isTraceEnabled()) { + String filesize = "N/A"; + try { + FileStatus stat = this.fs.getFileStatus(this.currentPath); + filesize = stat.getLen()+""; + } catch (IOException ex) {} + LOG.trace("Reached the end of a log, stats: " + getStats() + + ", and the length of the file is " + filesize); + } this.currentPath = null; this.repLogReader.finishCurrentFile(); this.reader = null; @@ -851,13 +884,7 @@ public class ReplicationSource extends Thread @Override public String getStats() { - String position = "N/A"; - try { - if (this.reader != null) { - position = this.reader.getPosition()+""; - } - } catch (IOException ioe) { - } + long position = this.repLogReader.getPosition(); return "Total replicated edits: " + totalReplicatedEdits + ", currently replicating from: " + this.currentPath + " at position: " + position; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java index 9c0b3b9f05e..d4df5bfb2c6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java @@ -21,12 +21,15 @@ package org.apache.hadoop.hbase.replication; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.LargeTests; import org.apache.hadoop.hbase.exceptions.UnknownScannerException; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource; +import org.apache.log4j.Level; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -35,6 +38,10 @@ import static org.junit.Assert.fail; @Category(LargeTests.class) public class TestReplicationKillRS extends TestReplicationBase { + { + ((Log4JLogger) ReplicationSource.LOG).getLogger().setLevel(Level.ALL); + } + private static final Log LOG = LogFactory.getLog(TestReplicationKillRS.class); /**