diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java index ce12b4f0913..674ca704bce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java @@ -35,6 +35,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto; @@ -104,7 +105,8 @@ public class QuorumJournalManager implements JournalManager { private final AsyncLoggerSet loggers; - private int outputBufferCapacity = 512 * 1024; + private static final int OUTPUT_BUFFER_CAPACITY_DEFAULT = 512 * 1024; + private int outputBufferCapacity; private final URLConnectionFactory connectionFactory; /** Limit logging about input stream selection to every 5 seconds max. 
*/ @@ -189,6 +191,7 @@ public class QuorumJournalManager implements JournalManager { DFSConfigKeys.DFS_QJOURNAL_HTTP_READ_TIMEOUT_DEFAULT); this.connectionFactory = URLConnectionFactory .newDefaultURLConnectionFactory(connectTimeoutMs, readTimeoutMs, conf); + setOutputBufferCapacity(OUTPUT_BUFFER_CAPACITY_DEFAULT); } protected List createLoggers( @@ -447,6 +450,15 @@ public class QuorumJournalManager implements JournalManager { @Override public void setOutputBufferCapacity(int size) { + int ipcMaxDataLength = conf.getInt( + CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, + CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT); + if (size >= ipcMaxDataLength) { + throw new IllegalArgumentException("Attempted to use QJM output buffer " + + "capacity (" + size + ") greater than the IPC max data length (" + + CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH + " = " + + ipcMaxDataLength + "). This will cause journals to reject edits."); + } outputBufferCapacity = size; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java index 26f978a284e..a02c0f9f1b2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java @@ -80,6 +80,11 @@ class QuorumOutputStream extends EditLogOutputStream { buf.setReadyToFlush(); } + @Override + public boolean shouldForceSync() { + return buf.shouldForceSync(); + } + @Override protected void flushAndSync(boolean durable) throws IOException { int numReadyBytes = buf.countReadyBytes(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java index 0db17120606..7b3f6a09802 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java @@ -1802,7 +1802,8 @@ public class FSEditLog implements LogsPurgeable { * @return The constructed journal manager * @throws IllegalArgumentException if no class is configured for uri */ - private JournalManager createJournal(URI uri) { + @VisibleForTesting + JournalManager createJournal(URI uri) { Class clazz = getJournalClass(conf, uri.getScheme()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java index 837c7d9e7b3..28f07f8b372 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hdfs.qjournal.client; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.eq; @@ -29,9 +31,12 @@ import java.io.IOException; import java.net.URI; import java.util.List; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.hdfs.DFSConfigKeys; import 
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto; @@ -39,8 +44,12 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochR import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream; import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream; import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream; +import org.apache.hadoop.hdfs.server.namenode.FSEditLog; +import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil; +import org.apache.hadoop.hdfs.server.namenode.INode; import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.namenode.NNStorage; import org.apache.hadoop.test.GenericTestUtils; import org.slf4j.event.Level; import org.junit.Before; @@ -58,6 +67,8 @@ import com.google.protobuf.ByteString; import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeOp; import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.createTxnData; import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.verifyEdits; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; /** * True unit tests for QuorumJournalManager @@ -107,7 +118,7 @@ public class TestQuorumJournalManagerUnit { } private AsyncLogger mockLogger() { - return Mockito.mock(AsyncLogger.class); + return mock(AsyncLogger.class); } static Stubber futureReturns(V value) { @@ -204,7 +215,53 @@ public class TestQuorumJournalManagerUnit { anyLong(), eq(3L), eq(1), Mockito.any()); stm.flush(); } - + + @Test(expected = IllegalArgumentException.class) + public void testSetOutputBufferCapacityTooLarge() throws Exception { + qjm.setOutputBufferCapacity( + CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT + 1); + } + + // Regression test for HDFS-13977 + @Test + public void 
testFSEditLogAutoSyncToQuorumStream() throws Exception { + // Set the buffer capacity low to make it easy to fill it + qjm.setOutputBufferCapacity(512); + + // Set up mocks + NNStorage mockStorage = mock(NNStorage.class); + createLogSegment(); // sets up the mocks for startLogSegment + for (int logIdx = 0; logIdx < 3; logIdx++) { + futureReturns(null).when(spyLoggers.get(logIdx)) + .sendEdits(anyLong(), anyLong(), anyInt(), any()); + } + PermissionStatus permStat = PermissionStatus + .createImmutable("user", "group", FsPermission.getDefault()); + INode fakeInode = FSImageTestUtil.createEmptyInodeFile(1, "foo", + permStat, 1, 1, (short) 1, 1); + + // Create a fake FSEditLog using this QJM + String mockQjmEdits = "qjournal://mock/"; + conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, mockQjmEdits); + conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, mockQjmEdits); + FSEditLog editLog = FSImageTestUtil.createEditLogWithJournalManager( + conf, mockStorage, URI.create(mockQjmEdits), qjm); + + editLog.initJournalsForWrite(); + editLog.startLogSegment(1, false, + NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); + // Write enough edit ops that the output buffer capacity should fill and + // an auto-sync should be triggered + for (int i = 0; i < 12; i++) { + editLog.logMkDir("/fake/path", fakeInode); + } + + for (int i = 0; i < 3; i++) { + Mockito.verify(spyLoggers.get(i), times(1)) + .sendEdits(eq(1L), eq(1L), anyInt(), any()); + } + } + @Test public void testWriteEditsOneSlow() throws Exception { EditLogOutputStream stm = createLogSegment(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java index 883e43ca904..5d0bd543809 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java @@ -29,6 +29,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.RandomAccessFile; import java.net.URI; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -208,7 +209,23 @@ public abstract class FSImageTestUtil { editLog.initJournalsForWrite(); return editLog; } - + + public static INodeFile createEmptyInodeFile(long id, String name, + PermissionStatus permissions, long mtime, long atime, short replication, + long preferredBlockSize) { + return new INodeFile(id, name.getBytes(StandardCharsets.UTF_8), + permissions, mtime, atime, null, replication, preferredBlockSize); + } + + public static FSEditLog createEditLogWithJournalManager(Configuration conf, + NNStorage storage, URI editsUri, final JournalManager manager) { + return new FSEditLog(conf, storage, ImmutableList.of(editsUri)) { + @Override + protected JournalManager createJournal(URI uri) { + return manager; + } + }; + } /** * Create an aborted in-progress log in the given directory, containing