HDFS-11448. JN log segment syncing should support HA upgrade. Contributed by Hanisha Koneru.

This commit is contained in:
Arpit Agarwal 2017-05-04 15:57:44 -07:00
parent 54e2b9e876
commit 07761af357
4 changed files with 108 additions and 49 deletions

View File

@ -58,6 +58,8 @@ class JNStorage extends Storage {
private static final List<Pattern> PAXOS_DIR_PURGE_REGEXES =
ImmutableList.of(Pattern.compile("(\\d+)"));
private static final String STORAGE_EDITS_SYNC = "edits.sync";
/**
* @param conf Configuration object
* @param logDir the path to the directory in which data will be stored
@ -120,12 +122,29 @@ class JNStorage extends Storage {
return new File(sd.getCurrentDir(), name);
}
File getTemporaryEditsFile(long startTxId, long endTxId, long timestamp) {
return NNStorage.getTemporaryEditsFile(sd, startTxId, endTxId, timestamp);
File getCurrentDir() {
return sd.getCurrentDir();
}
/**
* Directory {@code edits.sync} temporarily holds the log segments
* downloaded through {@link JournalNodeSyncer} before they are moved to
* {@code current} directory.
*
* @return the directory path
*/
File getEditsSyncDir() {
return new File(sd.getRoot(), STORAGE_EDITS_SYNC);
}
File getTemporaryEditsFile(long startTxId, long endTxId) {
return new File(getEditsSyncDir(), String.format("%s_%019d-%019d",
NNStorage.NameNodeFile.EDITS.getName(), startTxId, endTxId));
}
File getFinalizedEditsFile(long startTxId, long endTxId) {
return NNStorage.getFinalizedEditsFile(sd, startTxId, endTxId);
return new File(sd.getCurrentDir(), String.format("%s_%019d-%019d",
NNStorage.NameNodeFile.EDITS.getName(), startTxId, endTxId));
}
/**

View File

@ -24,6 +24,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.security.PrivilegedExceptionAction;
import java.util.Iterator;
import java.util.List;
@ -1092,19 +1094,28 @@ public class Journal implements Closeable {
committedTxnId.set(startTxId - 1);
}
synchronized boolean renameTmpSegment(File tmpFile, File finalFile,
synchronized boolean moveTmpSegmentToCurrent(File tmpFile, File finalFile,
long endTxId) throws IOException {
final boolean success;
if (endTxId <= committedTxnId.get()) {
success = tmpFile.renameTo(finalFile);
if (!success) {
LOG.warn("Unable to rename edits file from " + tmpFile + " to " +
if (!finalFile.getParentFile().exists()) {
LOG.error(finalFile.getParentFile() + " doesn't exist. Aborting tmp " +
"segment move to current directory");
return false;
}
Files.move(tmpFile.toPath(), finalFile.toPath(),
StandardCopyOption.ATOMIC_MOVE);
if (finalFile.exists() && FileUtil.canRead(finalFile)) {
success = true;
} else {
success = false;
LOG.warn("Unable to move edits file from " + tmpFile + " to " +
finalFile);
}
} else {
success = false;
LOG.error("The endTxId of the temporary file is not less than the " +
"last committed transaction id. Aborting renaming to final file" +
"last committed transaction id. Aborting move to final file" +
finalFile);
}

View File

@ -41,7 +41,6 @@ import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -98,6 +97,11 @@ public class JournalNodeSyncer {
void stopSync() {
shouldSync = false;
// Delete the edits.sync directory
File editsSyncDir = journal.getStorage().getEditsSyncDir();
if (editsSyncDir.exists()) {
FileUtil.fullyDelete(editsSyncDir);
}
if (syncJournalDaemon != null) {
syncJournalDaemon.interrupt();
}
@ -112,6 +116,15 @@ public class JournalNodeSyncer {
}
}
private boolean createEditsSyncDir() {
File editsSyncDir = journal.getStorage().getEditsSyncDir();
if (editsSyncDir.exists()) {
LOG.info(editsSyncDir + " directory already exists.");
return true;
}
return editsSyncDir.mkdir();
}
private boolean getOtherJournalNodeProxies() {
List<InetSocketAddress> otherJournalNodes = getOtherJournalNodeAddrs();
if (otherJournalNodes == null || otherJournalNodes.isEmpty()) {
@ -135,35 +148,51 @@ public class JournalNodeSyncer {
}
private void startSyncJournalsDaemon() {
syncJournalDaemon = new Daemon(new Runnable() {
@Override
public void run() {
while(shouldSync) {
try {
if (!journal.isFormatted()) {
LOG.warn("Journal not formatted. Cannot sync.");
} else {
syncJournals();
}
Thread.sleep(journalSyncInterval);
} catch (Throwable t) {
if (!shouldSync) {
if (t instanceof InterruptedException) {
LOG.info("Stopping JournalNode Sync.");
} else {
LOG.warn("JournalNodeSyncer received an exception while " +
"shutting down.", t);
}
break;
} else {
if (t instanceof InterruptedException) {
LOG.warn("JournalNodeSyncer interrupted", t);
break;
}
}
LOG.error(
"JournalNodeSyncer daemon received Runtime exception. ", t);
syncJournalDaemon = new Daemon(() -> {
// Wait for journal to be formatted to create edits.sync directory
while(!journal.isFormatted()) {
try {
Thread.sleep(journalSyncInterval);
} catch (InterruptedException e) {
LOG.error("JournalNodeSyncer daemon received Runtime exception.", e);
Thread.currentThread().interrupt();
return;
}
}
if (!createEditsSyncDir()) {
LOG.error("Failed to create directory for downloading log " +
"segments: %s. Stopping Journal Node Sync.",
journal.getStorage().getEditsSyncDir());
return;
}
while(shouldSync) {
try {
if (!journal.isFormatted()) {
LOG.warn("Journal cannot sync. Not formatted.");
} else {
syncJournals();
}
Thread.sleep(journalSyncInterval);
} catch (Throwable t) {
if (!shouldSync) {
if (t instanceof InterruptedException) {
LOG.info("Stopping JournalNode Sync.");
Thread.currentThread().interrupt();
return;
} else {
LOG.warn("JournalNodeSyncer received an exception while " +
"shutting down.", t);
}
break;
} else {
if (t instanceof InterruptedException) {
LOG.warn("JournalNodeSyncer interrupted", t);
Thread.currentThread().interrupt();
return;
}
}
LOG.error(
"JournalNodeSyncer daemon received Runtime exception. ", t);
}
}
});
@ -335,8 +364,8 @@ public class JournalNodeSyncer {
/**
* Transfer an edit log from one journal node to another for sync-up.
*/
private boolean downloadMissingLogSegment(URL url, RemoteEditLog log) throws
IOException {
private boolean downloadMissingLogSegment(URL url, RemoteEditLog log)
throws IOException {
LOG.info("Downloading missing Edit Log from " + url + " to " + jnStorage
.getRoot());
@ -350,9 +379,10 @@ public class JournalNodeSyncer {
return true;
}
final long milliTime = Time.monotonicNow();
File tmpEditsFile = jnStorage.getTemporaryEditsFile(log.getStartTxId(), log
.getEndTxId(), milliTime);
// Download the log segment to current.tmp directory first.
File tmpEditsFile = jnStorage.getTemporaryEditsFile(
log.getStartTxId(), log.getEndTxId());
try {
Util.doGetUrl(url, ImmutableList.of(tmpEditsFile), jnStorage, false,
logSegmentTransferTimeout, throttler);
@ -367,14 +397,12 @@ public class JournalNodeSyncer {
LOG.info("Downloaded file " + tmpEditsFile.getName() + " of size " +
tmpEditsFile.length() + " bytes.");
LOG.debug("Renaming " + tmpEditsFile.getName() + " to "
+ finalEditsFile.getName());
boolean renameSuccess = journal.renameTmpSegment(tmpEditsFile,
final boolean moveSuccess = journal.moveTmpSegmentToCurrent(tmpEditsFile,
finalEditsFile, log.getEndTxId());
if (!renameSuccess) {
//If rename is not successful, delete the tmpFile
LOG.debug("Renaming unsuccessful. Deleting temporary file: "
+ tmpEditsFile);
if (!moveSuccess) {
// If move is not successful, delete the tmpFile
LOG.debug("Move to current directory unsuccessful. Deleting temporary " +
"file: " + tmpEditsFile);
if (!tmpEditsFile.delete()) {
LOG.warn("Deleting " + tmpEditsFile + " has failed");
}

View File

@ -57,6 +57,7 @@ public class TestJournalNodeSync {
@Before
public void setUpMiniCluster() throws IOException {
final Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, true);
conf.setLong(DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_KEY, 1000L);
qjmhaCluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(2)
.build();