HBASE-6758 [replication] The replication-executor should make sure the file that it is replicating is closed before declaring success on that file (Devaraj Das via JD)


git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1399517 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jean-Daniel Cryans 2012-10-18 04:52:19 +00:00
parent f065232db0
commit d6d73eb286
5 changed files with 50 additions and 17 deletions

View File

@ -510,12 +510,6 @@ class FSHLog implements HLog, Syncable {
if (nextWriter instanceof SequenceFileLogWriter) {
nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream();
}
// Tell our listeners that a new log was created
if (!this.listeners.isEmpty()) {
for (WALActionsListener i : this.listeners) {
i.postLogRoll(oldPath, newPath);
}
}
synchronized (updateLock) {
// Clean up current writer.
@ -531,6 +525,13 @@ class FSHLog implements HLog, Syncable {
" for " + FSUtils.getPath(newPath));
this.numEntries.set(0);
}
// Tell our listeners that a new log was created
if (!this.listeners.isEmpty()) {
for (WALActionsListener i : this.listeners) {
i.postLogRoll(oldPath, newPath);
}
}
// Can we delete any of the old log files?
if (this.outputfiles.size() > 0) {
if (this.lastSeqWritten.isEmpty()) {

View File

@ -189,12 +189,12 @@ public class Replication implements WALActionsListener,
@Override
public void preLogRoll(Path oldPath, Path newPath) throws IOException {
// Not interested
getReplicationManager().preLogRoll(newPath);
}
@Override
public void postLogRoll(Path oldPath, Path newPath) throws IOException {
getReplicationManager().logRolled(newPath);
getReplicationManager().postLogRoll(newPath);
}
@Override

View File

@ -300,6 +300,18 @@ public class ReplicationSource extends Thread
}
continue;
}
boolean currentWALisBeingWrittenTo = false;
//For WAL files we own (rather than recovered), take a snapshot of whether the
//current WAL file (this.currentPath) is in use (for writing) NOW!
//Since the new WAL paths are enqueued only after the prev WAL file
//is 'closed', presence of an element in the queue means that
//the previous WAL file was closed, else the file is in use (currentPath)
//We take the snapshot now so that we are protected against races
//where a new file gets enqueued while the current file is being processed
//(and where we just finished reading the current file).
if (!this.queueRecovered && queue.size() == 0) {
currentWALisBeingWrittenTo = true;
}
// Open a reader on it
if (!openReader(sleepMultiplier)) {
// Reset the sleep multiplier, else it'd be reused for the next file
@ -318,7 +330,7 @@ public class ReplicationSource extends Thread
boolean gotIOE = false;
currentNbEntries = 0;
try {
if(readAllEntriesToReplicateOrNextFile()) {
if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo)) {
continue;
}
} catch (IOException ioe) {
@ -367,7 +379,7 @@ public class ReplicationSource extends Thread
if (this.isActive() && (gotIOE || currentNbEntries == 0)) {
if (this.lastLoggedPosition != this.position) {
this.manager.logPositionAndCleanOldLogs(this.currentPath,
this.peerClusterZnode, this.position, queueRecovered);
this.peerClusterZnode, this.position, queueRecovered, currentWALisBeingWrittenTo);
this.lastLoggedPosition = this.position;
}
if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
@ -376,7 +388,7 @@ public class ReplicationSource extends Thread
continue;
}
sleepMultiplier = 1;
shipEdits();
shipEdits(currentWALisBeingWrittenTo);
}
if (this.conn != null) {
@ -393,11 +405,13 @@ public class ReplicationSource extends Thread
/**
* Read all the entries from the current log files and retain those
* that need to be replicated. Else, process the end of the current file.
* @param currentWALisBeingWrittenTo is the current WAL being written to
* @return true if we got nothing and went to the next file, false if we got
* entries
* @throws IOException
*/
protected boolean readAllEntriesToReplicateOrNextFile() throws IOException{
protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo)
throws IOException{
long seenEntries = 0;
if (this.position != 0) {
this.reader.seek(this.position);
@ -447,6 +461,9 @@ public class ReplicationSource extends Thread
LOG.debug("currentNbOperations:" + currentNbOperations +
" and seenEntries:" + seenEntries +
" and size: " + (this.reader.getPosition() - startPosition));
if (currentWALisBeingWrittenTo) {
return false;
}
// If we didn't get anything and the queue has an object, it means we
// hit the end of the file for sure
return seenEntries == 0 && processEndOfFile();
@ -620,8 +637,10 @@ public class ReplicationSource extends Thread
/**
* Do the shipping logic
* @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
* written to when this method was called
*/
protected void shipEdits() {
protected void shipEdits(boolean currentWALisBeingWrittenTo) {
int sleepMultiplier = 1;
if (this.currentNbEntries == 0) {
LOG.warn("Was given 0 edits to ship");
@ -641,7 +660,7 @@ public class ReplicationSource extends Thread
Arrays.copyOf(this.entriesArray, currentNbEntries));
if (this.lastLoggedPosition != this.position) {
this.manager.logPositionAndCleanOldLogs(this.currentPath,
this.peerClusterZnode, this.position, queueRecovered);
this.peerClusterZnode, this.position, queueRecovered, currentWALisBeingWrittenTo);
this.lastLoggedPosition = this.position;
}
this.totalReplicatedEdits += currentNbEntries;

View File

@ -146,11 +146,16 @@ public class ReplicationSourceManager {
* @param id id of the peer cluster
* @param position current location in the log
* @param queueRecovered indicates if this queue comes from another region server
* @param holdLogInZK if true then the log is retained in ZK
*/
public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered) {
public void logPositionAndCleanOldLogs(Path log, String id, long position,
boolean queueRecovered, boolean holdLogInZK) {
String key = log.getName();
LOG.info("Going to report log #" + key + " for position " + position + " in " + log);
this.zkHelper.writeReplicationStatus(key, id, position);
if (holdLogInZK) {
return;
}
synchronized (this.hlogsById) {
SortedSet<String> hlogs = this.hlogsById.get(id);
if (!queueRecovered && hlogs.first() != key) {
@ -252,7 +257,7 @@ public class ReplicationSourceManager {
return this.sources;
}
void logRolled(Path newLog) throws IOException {
void preLogRoll(Path newLog) throws IOException {
if (!this.replicating.get()) {
LOG.warn("Replication stopped, won't add new log");
return;
@ -278,6 +283,14 @@ public class ReplicationSourceManager {
}
this.latestPath = newLog;
}
void postLogRoll(Path newLog) throws IOException {
if (!this.replicating.get()) {
LOG.warn("Replication stopped, won't add new log");
return;
}
// This only updates the sources we own, not the recovered ones
for (ReplicationSourceInterface source : this.sources) {
source.enqueueLog(newLog);

View File

@ -203,7 +203,7 @@ public class TestReplicationSourceManager {
hlog.rollWriter();
manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
"1", 0, false);
"1", 0, false, false);
HLogKey key = new HLogKey(hri.getRegionName(), test, seq++,
System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);