diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
index 8f40f6b8824..7226caee7cc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
@@ -58,6 +58,8 @@ class JNStorage extends Storage {
   private static final List<Pattern> PAXOS_DIR_PURGE_REGEXES =
       ImmutableList.of(Pattern.compile("(\\d+)"));
 
+  private static final String STORAGE_EDITS_SYNC = "edits.sync";
+
   /**
    * @param conf Configuration object
    * @param logDir the path to the directory in which data will be stored
@@ -120,12 +122,29 @@ class JNStorage extends Storage {
     return new File(sd.getCurrentDir(), name);
   }
 
-  File getTemporaryEditsFile(long startTxId, long endTxId, long timestamp) {
-    return NNStorage.getTemporaryEditsFile(sd, startTxId, endTxId, timestamp);
+  File getCurrentDir() {
+    return sd.getCurrentDir();
+  }
+
+  /**
+   * Directory {@code edits.sync} temporarily holds the log segments
+   * downloaded through {@link JournalNodeSyncer} before they are moved to
+   * {@code current} directory.
+   *
+   * @return the directory path
+   */
+  File getEditsSyncDir() {
+    return new File(sd.getRoot(), STORAGE_EDITS_SYNC);
+  }
+
+  File getTemporaryEditsFile(long startTxId, long endTxId) {
+    return new File(getEditsSyncDir(), String.format("%s_%019d-%019d",
+        NNStorage.NameNodeFile.EDITS.getName(), startTxId, endTxId));
   }
 
   File getFinalizedEditsFile(long startTxId, long endTxId) {
-    return NNStorage.getFinalizedEditsFile(sd, startTxId, endTxId);
+    return new File(sd.getCurrentDir(), String.format("%s_%019d-%019d",
+        NNStorage.NameNodeFile.EDITS.getName(), startTxId, endTxId));
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
index ca213736d03..0041d5eda7b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStreamWriter;
 import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
 import java.security.PrivilegedExceptionAction;
 import java.util.Iterator;
 import java.util.List;
@@ -1092,19 +1094,28 @@ public class Journal implements Closeable {
     committedTxnId.set(startTxId - 1);
   }
 
-  synchronized boolean renameTmpSegment(File tmpFile, File finalFile,
+  synchronized boolean moveTmpSegmentToCurrent(File tmpFile, File finalFile,
       long endTxId) throws IOException {
     final boolean success;
     if (endTxId <= committedTxnId.get()) {
-      success = tmpFile.renameTo(finalFile);
-      if (!success) {
-        LOG.warn("Unable to rename edits file from " + tmpFile + " to " +
+      if (!finalFile.getParentFile().exists()) {
+        LOG.error(finalFile.getParentFile() + " doesn't exist. Aborting tmp " +
+            "segment move to current directory");
+        return false;
+      }
+      Files.move(tmpFile.toPath(), finalFile.toPath(),
+          StandardCopyOption.ATOMIC_MOVE);
+      if (finalFile.exists() && FileUtil.canRead(finalFile)) {
+        success = true;
+      } else {
+        success = false;
+        LOG.warn("Unable to move edits file from " + tmpFile + " to " +
             finalFile);
       }
     } else {
       success = false;
       LOG.error("The endTxId of the temporary file is not less than the " +
-          "last committed transaction id. Aborting renaming to final file" +
+          "last committed transaction id. Aborting move to final file" +
           finalFile);
     }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
index f195c00b57c..788c5dee51d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -98,6 +97,11 @@ public class JournalNodeSyncer {
 
   void stopSync() {
     shouldSync = false;
+    // Delete the edits.sync directory
+    File editsSyncDir = journal.getStorage().getEditsSyncDir();
+    if (editsSyncDir.exists()) {
+      FileUtil.fullyDelete(editsSyncDir);
+    }
     if (syncJournalDaemon != null) {
       syncJournalDaemon.interrupt();
     }
@@ -112,6 +116,15 @@ public class JournalNodeSyncer {
     }
   }
 
+  private boolean createEditsSyncDir() {
+    File editsSyncDir = journal.getStorage().getEditsSyncDir();
+    if (editsSyncDir.exists()) {
+      LOG.info(editsSyncDir + " directory already exists.");
+      return true;
+    }
+    return editsSyncDir.mkdir();
+  }
+
   private boolean getOtherJournalNodeProxies() {
     List<InetSocketAddress> otherJournalNodes = getOtherJournalNodeAddrs();
     if (otherJournalNodes == null || otherJournalNodes.isEmpty()) {
@@ -135,35 +148,51 @@ public class JournalNodeSyncer {
   }
 
   private void startSyncJournalsDaemon() {
-    syncJournalDaemon = new Daemon(new Runnable() {
-      @Override
-      public void run() {
-        while(shouldSync) {
-          try {
-            if (!journal.isFormatted()) {
-              LOG.warn("Journal not formatted. Cannot sync.");
-            } else {
-              syncJournals();
-            }
-            Thread.sleep(journalSyncInterval);
-          } catch (Throwable t) {
-            if (!shouldSync) {
-              if (t instanceof InterruptedException) {
-                LOG.info("Stopping JournalNode Sync.");
-              } else {
-                LOG.warn("JournalNodeSyncer received an exception while " +
-                    "shutting down.", t);
-              }
-              break;
-            } else {
-              if (t instanceof InterruptedException) {
-                LOG.warn("JournalNodeSyncer interrupted", t);
-                break;
-              }
-            }
-            LOG.error(
-                "JournalNodeSyncer daemon received Runtime exception. ", t);
+    syncJournalDaemon = new Daemon(() -> {
+      // Wait for journal to be formatted to create edits.sync directory
+      while(!journal.isFormatted()) {
+        try {
+          Thread.sleep(journalSyncInterval);
+        } catch (InterruptedException e) {
+          LOG.error("JournalNodeSyncer daemon received Runtime exception.", e);
+          Thread.currentThread().interrupt();
+          return;
+        }
+      }
+      if (!createEditsSyncDir()) {
+        LOG.error("Failed to create directory for downloading log " +
+            "segments: {}. Stopping Journal Node Sync.",
+            journal.getStorage().getEditsSyncDir());
+        return;
+      }
+      while(shouldSync) {
+        try {
+          if (!journal.isFormatted()) {
+            LOG.warn("Journal cannot sync. Not formatted.");
+          } else {
+            syncJournals();
           }
+          Thread.sleep(journalSyncInterval);
+        } catch (Throwable t) {
+          if (!shouldSync) {
+            if (t instanceof InterruptedException) {
+              LOG.info("Stopping JournalNode Sync.");
+              Thread.currentThread().interrupt();
+              return;
+            } else {
+              LOG.warn("JournalNodeSyncer received an exception while " +
+                  "shutting down.", t);
+            }
+            break;
+          } else {
+            if (t instanceof InterruptedException) {
+              LOG.warn("JournalNodeSyncer interrupted", t);
+              Thread.currentThread().interrupt();
+              return;
+            }
+          }
+          LOG.error(
+              "JournalNodeSyncer daemon received Runtime exception. ", t);
         }
       }
     });
@@ -335,8 +364,8 @@ public class JournalNodeSyncer {
   /**
    * Transfer an edit log from one journal node to another for sync-up.
    */
-  private boolean downloadMissingLogSegment(URL url, RemoteEditLog log) throws
-      IOException {
+  private boolean downloadMissingLogSegment(URL url, RemoteEditLog log)
+      throws IOException {
     LOG.info("Downloading missing Edit Log from " + url + " to " + jnStorage
         .getRoot());
 
@@ -350,9 +379,10 @@ public class JournalNodeSyncer {
       return true;
     }
 
-    final long milliTime = Time.monotonicNow();
-    File tmpEditsFile = jnStorage.getTemporaryEditsFile(log.getStartTxId(), log
-        .getEndTxId(), milliTime);
+    // Download the log segment to the edits.sync directory first.
+    File tmpEditsFile = jnStorage.getTemporaryEditsFile(
+        log.getStartTxId(), log.getEndTxId());
+
     try {
       Util.doGetUrl(url, ImmutableList.of(tmpEditsFile), jnStorage, false,
           logSegmentTransferTimeout, throttler);
@@ -367,14 +397,12 @@ public class JournalNodeSyncer {
     LOG.info("Downloaded file " + tmpEditsFile.getName() + " of size " +
         tmpEditsFile.length() + " bytes.");
 
-    LOG.debug("Renaming " + tmpEditsFile.getName() + " to "
-        + finalEditsFile.getName());
-    boolean renameSuccess = journal.renameTmpSegment(tmpEditsFile,
+    final boolean moveSuccess = journal.moveTmpSegmentToCurrent(tmpEditsFile,
         finalEditsFile, log.getEndTxId());
-    if (!renameSuccess) {
-      //If rename is not successful, delete the tmpFile
-      LOG.debug("Renaming unsuccessful. Deleting temporary file: "
-          + tmpEditsFile);
+    if (!moveSuccess) {
+      // If move is not successful, delete the tmpFile
+      LOG.debug("Move to current directory unsuccessful. Deleting temporary " +
+          "file: " + tmpEditsFile);
       if (!tmpEditsFile.delete()) {
         LOG.warn("Deleting " + tmpEditsFile + " has failed");
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java
index 5375b028719..8415a6f54e9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java
@@ -57,6 +57,7 @@ public class TestJournalNodeSync {
   @Before
   public void setUpMiniCluster() throws IOException {
     final Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, true);
     conf.setLong(DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_KEY, 1000L);
     qjmhaCluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(2)
         .build();