HBASE-9033 Add tracing to ReplicationSource and enable it in tests

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1506638 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2013-07-24 17:40:00 +00:00
parent 45472c51bb
commit e5a3842a12
2 changed files with 48 additions and 14 deletions

View File

@ -35,7 +35,6 @@ import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -81,7 +80,7 @@ import org.apache.zookeeper.KeeperException;
public class ReplicationSource extends Thread
implements ReplicationSourceInterface {
private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
public static final Log LOG = LogFactory.getLog(ReplicationSource.class);
// Queue of logs to process
private PriorityBlockingQueue<Path> queue;
// container of entries to replicate
@ -121,6 +120,8 @@ public class ReplicationSource extends Thread
private UUID peerClusterId;
// total number of edits we replicated
private long totalReplicatedEdits = 0;
// total number of edits we replicated
private long totalReplicatedOperations = 0;
// The znode we currently play with
private String peerClusterZnode;
// Maximum number of retries before taking bold actions
@ -206,7 +207,7 @@ public class ReplicationSource extends Thread
List<ServerName> addresses = this.zkHelper.getSlavesAddresses(this.peerId);
Set<ServerName> setOfAddr = new HashSet<ServerName>();
int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
LOG.info("Getting " + nbPeers +
LOG.debug("Getting " + nbPeers +
" rs from peer cluster # " + this.peerId);
for (int i = 0; i < nbPeers; i++) {
ServerName sn;
@ -255,6 +256,10 @@ public class ReplicationSource extends Thread
try {
this.repLogReader.setPosition(this.zkHelper.getHLogRepPosition(
this.peerClusterZnode, this.queue.peek().getName()));
if (LOG.isTraceEnabled()) {
LOG.trace("Recovered queue started with log " + this.queue.peek() +
" at position " + this.repLogReader.getPosition());
}
} catch (KeeperException e) {
this.terminate("Couldn't get the position of this recovered queue " +
this.peerClusterZnode, e);
@ -282,10 +287,6 @@ public class ReplicationSource extends Thread
sleepMultiplier++;
}
continue;
} else if (oldPath != null && !oldPath.getName().equals(getCurrentPath().getName())) {
this.manager.cleanOldLogs(getCurrentPath().getName(),
this.peerId,
this.replicationQueueInfo.isQueueRecovered());
}
boolean currentWALisBeingWrittenTo = false;
//For WAL files we own (rather than recovered), take a snapshot of whether the
@ -402,6 +403,10 @@ public class ReplicationSource extends Thread
protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo)
throws IOException{
long seenEntries = 0;
if (LOG.isTraceEnabled()) {
LOG.trace("Seeking in " + this.currentPath + " at position "
+ this.repLogReader.getPosition());
}
this.repLogReader.seek();
HLog.Entry entry =
this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries);
@ -475,6 +480,14 @@ public class ReplicationSource extends Thread
if (this.currentPath == null) {
this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
this.metrics.setSizeOfLogQueue(queue.size());
if (this.currentPath != null) {
this.manager.cleanOldLogs(this.currentPath.getName(),
this.peerId,
this.replicationQueueInfo.isQueueRecovered());
if (LOG.isTraceEnabled()) {
LOG.trace("New log: " + this.currentPath);
}
}
}
} catch (InterruptedException e) {
LOG.warn("Interrupted while reading edits", e);
@ -491,6 +504,9 @@ public class ReplicationSource extends Thread
protected boolean openReader(int sleepMultiplier) {
try {
try {
if (LOG.isTraceEnabled()) {
LOG.trace("Opening log " + this.currentPath);
}
this.reader = repLogReader.openReader(this.currentPath);
} catch (FileNotFoundException fnfe) {
if (this.replicationQueueInfo.isQueueRecovered()) {
@ -643,6 +659,9 @@ public class ReplicationSource extends Thread
}
try {
AdminService.BlockingInterface rrs = getRS();
if (LOG.isTraceEnabled()) {
LOG.trace("Replicating " + this.currentNbEntries + " entries");
}
ReplicationProtbufUtil.replicateWALEntry(rrs,
Arrays.copyOf(this.entriesArray, currentNbEntries));
if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
@ -652,9 +671,14 @@ public class ReplicationSource extends Thread
this.lastLoggedPosition = this.repLogReader.getPosition();
}
this.totalReplicatedEdits += currentNbEntries;
this.totalReplicatedOperations += currentNbOperations;
this.metrics.shipBatch(this.currentNbOperations);
this.metrics.setAgeOfLastShippedOp(
this.entriesArray[currentNbEntries-1].getKey().getWriteTime());
if (LOG.isTraceEnabled()) {
LOG.trace("Replicated " + this.totalReplicatedEdits + " entries in total, or "
+ this.totalReplicatedOperations + " operations");
}
break;
} catch (IOException ioe) {
@ -724,6 +748,15 @@ public class ReplicationSource extends Thread
*/
protected boolean processEndOfFile() {
if (this.queue.size() != 0) {
if (LOG.isTraceEnabled()) {
String filesize = "N/A";
try {
FileStatus stat = this.fs.getFileStatus(this.currentPath);
filesize = stat.getLen()+"";
} catch (IOException ex) {}
LOG.trace("Reached the end of a log, stats: " + getStats() +
", and the length of the file is " + filesize);
}
this.currentPath = null;
this.repLogReader.finishCurrentFile();
this.reader = null;
@ -851,13 +884,7 @@ public class ReplicationSource extends Thread
@Override
public String getStats() {
String position = "N/A";
try {
if (this.reader != null) {
position = this.reader.getPosition()+"";
}
} catch (IOException ioe) {
}
long position = this.repLogReader.getPosition();
return "Total replicated edits: " + totalReplicatedEdits +
", currently replicating from: " + this.currentPath +
" at position: " + position;

View File

@ -21,12 +21,15 @@ package org.apache.hadoop.hbase.replication;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.exceptions.UnknownScannerException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.log4j.Level;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -35,6 +38,10 @@ import static org.junit.Assert.fail;
@Category(LargeTests.class)
public class TestReplicationKillRS extends TestReplicationBase {
{
((Log4JLogger) ReplicationSource.LOG).getLogger().setLevel(Level.ALL);
}
private static final Log LOG = LogFactory.getLog(TestReplicationKillRS.class);
/**