HDFS-13977. Override shouldForceSync in QuorumOutputStream to allow for proper auto-sync behavior. Contributed by Erik Krogen.

(cherry picked from d699022fce)
(cherry picked from 2408c2491f)
This commit is contained in:
Erik Krogen 2019-08-16 11:11:46 -07:00 committed by Erik Krogen
parent 0839cf1d4e
commit 9dc921f5e5
5 changed files with 97 additions and 5 deletions

View File

@ -35,6 +35,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
@ -104,7 +105,8 @@ public class QuorumJournalManager implements JournalManager {
private final AsyncLoggerSet loggers; private final AsyncLoggerSet loggers;
private int outputBufferCapacity = 512 * 1024; private static final int OUTPUT_BUFFER_CAPACITY_DEFAULT = 512 * 1024;
private int outputBufferCapacity;
private final URLConnectionFactory connectionFactory; private final URLConnectionFactory connectionFactory;
/** Limit logging about input stream selection to every 5 seconds max. */ /** Limit logging about input stream selection to every 5 seconds max. */
@ -189,6 +191,7 @@ public class QuorumJournalManager implements JournalManager {
DFSConfigKeys.DFS_QJOURNAL_HTTP_READ_TIMEOUT_DEFAULT); DFSConfigKeys.DFS_QJOURNAL_HTTP_READ_TIMEOUT_DEFAULT);
this.connectionFactory = URLConnectionFactory this.connectionFactory = URLConnectionFactory
.newDefaultURLConnectionFactory(connectTimeoutMs, readTimeoutMs, conf); .newDefaultURLConnectionFactory(connectTimeoutMs, readTimeoutMs, conf);
setOutputBufferCapacity(OUTPUT_BUFFER_CAPACITY_DEFAULT);
} }
protected List<AsyncLogger> createLoggers( protected List<AsyncLogger> createLoggers(
@ -447,6 +450,15 @@ public class QuorumJournalManager implements JournalManager {
@Override @Override
public void setOutputBufferCapacity(int size) { public void setOutputBufferCapacity(int size) {
int ipcMaxDataLength = conf.getInt(
CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
if (size >= ipcMaxDataLength) {
throw new IllegalArgumentException("Attempted to use QJM output buffer "
+ "capacity (" + size + ") greater than the IPC max data length ("
+ CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH + " = "
+ ipcMaxDataLength + "). This will cause journals to reject edits.");
}
outputBufferCapacity = size; outputBufferCapacity = size;
} }

View File

@ -80,6 +80,11 @@ class QuorumOutputStream extends EditLogOutputStream {
buf.setReadyToFlush(); buf.setReadyToFlush();
} }
@Override
public boolean shouldForceSync() {
return buf.shouldForceSync();
}
@Override @Override
protected void flushAndSync(boolean durable) throws IOException { protected void flushAndSync(boolean durable) throws IOException {
int numReadyBytes = buf.countReadyBytes(); int numReadyBytes = buf.countReadyBytes();

View File

@ -1801,7 +1801,8 @@ public class FSEditLog implements LogsPurgeable {
* @return The constructed journal manager * @return The constructed journal manager
* @throws IllegalArgumentException if no class is configured for uri * @throws IllegalArgumentException if no class is configured for uri
*/ */
private JournalManager createJournal(URI uri) { @VisibleForTesting
JournalManager createJournal(URI uri) {
Class<? extends JournalManager> clazz Class<? extends JournalManager> clazz
= getJournalClass(conf, uri.getScheme()); = getJournalClass(conf, uri.getScheme());

View File

@ -19,6 +19,8 @@ package org.apache.hadoop.hdfs.qjournal.client;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
@ -28,9 +30,12 @@ import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.List; import java.util.List;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.junit.Assert; import org.junit.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
@ -38,8 +43,12 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochR
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream; import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream; import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream; import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion; import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.junit.Before; import org.junit.Before;
@ -57,6 +66,8 @@ import com.google.protobuf.ByteString;
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeOp; import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeOp;
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.createTxnData; import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.createTxnData;
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.verifyEdits; import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.verifyEdits;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
/** /**
* True unit tests for QuorumJournalManager * True unit tests for QuorumJournalManager
@ -105,7 +116,7 @@ public class TestQuorumJournalManagerUnit {
} }
private AsyncLogger mockLogger() { private AsyncLogger mockLogger() {
return Mockito.mock(AsyncLogger.class); return mock(AsyncLogger.class);
} }
static <V> Stubber futureReturns(V value) { static <V> Stubber futureReturns(V value) {
@ -203,6 +214,52 @@ public class TestQuorumJournalManagerUnit {
stm.flush(); stm.flush();
} }
@Test(expected = IllegalArgumentException.class)
public void testSetOutputBufferCapacityTooLarge() throws Exception {
qjm.setOutputBufferCapacity(
CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT + 1);
}
// Regression test for HDFS-13977
@Test
public void testFSEditLogAutoSyncToQuorumStream() throws Exception {
// Set the buffer capacity low to make it easy to fill it
qjm.setOutputBufferCapacity(512);
// Set up mocks
NNStorage mockStorage = mock(NNStorage.class);
createLogSegment(); // sets up to the mocks for startLogSegment
for (int logIdx = 0; logIdx < 3; logIdx++) {
futureReturns(null).when(spyLoggers.get(logIdx))
.sendEdits(anyLong(), anyLong(), anyInt(), any());
}
PermissionStatus permStat = PermissionStatus
.createImmutable("user", "group", FsPermission.getDefault());
INode fakeInode = FSImageTestUtil.createEmptyInodeFile(1, "foo",
permStat, 1, 1, (short) 1, 1);
// Create a fake FSEditLog using this QJM
String mockQjmEdits = "qjournal://mock/";
conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, mockQjmEdits);
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, mockQjmEdits);
FSEditLog editLog = FSImageTestUtil.createEditLogWithJournalManager(
conf, mockStorage, URI.create(mockQjmEdits), qjm);
editLog.initJournalsForWrite();
editLog.startLogSegment(1, false,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
// Write enough edit ops that the output buffer capacity should fill and
// an auto-sync should be triggered
for (int i = 0; i < 12; i++) {
editLog.logMkDir("/fake/path", fakeInode);
}
for (int i = 0; i < 3; i++) {
Mockito.verify(spyLoggers.get(i), times(1))
.sendEdits(eq(1L), eq(1L), anyInt(), any());
}
}
@Test @Test
public void testWriteEditsOneSlow() throws Exception { public void testWriteEditsOneSlow() throws Exception {
EditLogOutputStream stm = createLogSegment(); EditLogOutputStream stm = createLogSegment();

View File

@ -29,6 +29,7 @@ import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.net.URI; import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -208,6 +209,22 @@ public abstract class FSImageTestUtil {
return editLog; return editLog;
} }
public static INodeFile createEmptyInodeFile(long id, String name,
PermissionStatus permissions, long mtime, long atime, short replication,
long preferredBlockSize) {
return new INodeFile(id, name.getBytes(StandardCharsets.UTF_8),
permissions, mtime, atime, null, replication, preferredBlockSize);
}
public static FSEditLog createEditLogWithJournalManager(Configuration conf,
NNStorage storage, URI editsUri, final JournalManager manager) {
return new FSEditLog(conf, storage, ImmutableList.of(editsUri)) {
@Override
protected JournalManager createJournal(URI uri) {
return manager;
}
};
}
/** /**
* Create an aborted in-progress log in the given directory, containing * Create an aborted in-progress log in the given directory, containing