HDFS-13977. Override shouldForceSync in QuorumOutputStream to allow for proper auto-sync behavior. Contributed by Erik Krogen.

This commit is contained in:
Erik Krogen 2019-08-16 11:11:46 -07:00
parent 894e2300d6
commit d699022fce
5 changed files with 97 additions and 5 deletions

View File

@@ -35,6 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
@@ -104,7 +105,8 @@ public class QuorumJournalManager implements JournalManager {
private final AsyncLoggerSet loggers;
private int outputBufferCapacity = 512 * 1024;
private static final int OUTPUT_BUFFER_CAPACITY_DEFAULT = 512 * 1024;
private int outputBufferCapacity;
private final URLConnectionFactory connectionFactory;
/** Limit logging about input stream selection to every 5 seconds max. */
@@ -189,6 +191,7 @@ public class QuorumJournalManager implements JournalManager {
DFSConfigKeys.DFS_QJOURNAL_HTTP_READ_TIMEOUT_DEFAULT);
this.connectionFactory = URLConnectionFactory
.newDefaultURLConnectionFactory(connectTimeoutMs, readTimeoutMs, conf);
setOutputBufferCapacity(OUTPUT_BUFFER_CAPACITY_DEFAULT);
}
protected List<AsyncLogger> createLoggers(
@@ -447,6 +450,15 @@ public class QuorumJournalManager implements JournalManager {
/**
 * Set the in-memory buffer capacity used when batching edits to be sent to
 * the JournalNodes.
 *
 * @param size the requested buffer capacity in bytes
 * @throws IllegalArgumentException if {@code size} is greater than or equal
 *     to the configured IPC maximum data length, since a full buffer of that
 *     size could never be shipped in a single edits RPC
 */
@Override
public void setOutputBufferCapacity(int size) {
  int ipcMaxDataLength = conf.getInt(
      CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
      CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
  // Fail fast: an edits payload at or above the IPC max data length would be
  // rejected by the JournalNodes when the buffer is eventually flushed.
  // NOTE: the check is >=, so the message must say "greater than or equal".
  if (size >= ipcMaxDataLength) {
    throw new IllegalArgumentException("Attempted to use QJM output buffer "
        + "capacity (" + size + ") greater than or equal to the IPC max "
        + "data length (" + CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH
        + " = " + ipcMaxDataLength
        + "). This will cause journals to reject edits.");
  }
  outputBufferCapacity = size;
}

View File

@@ -80,6 +80,11 @@ class QuorumOutputStream extends EditLogOutputStream {
buf.setReadyToFlush();
}
/**
 * Whether an automatic sync should be forced on this output stream.
 * Delegates to the underlying edits buffer (added for HDFS-13977 so that
 * auto-sync behavior works correctly for quorum streams).
 *
 * @return true if the buffered edits should be synced now
 */
@Override
public boolean shouldForceSync() {
  final boolean bufferWantsSync = buf.shouldForceSync();
  return bufferWantsSync;
}
@Override
protected void flushAndSync(boolean durable) throws IOException {
int numReadyBytes = buf.countReadyBytes();

View File

@@ -1807,7 +1807,8 @@ public class FSEditLog implements LogsPurgeable {
* @return The constructed journal manager
* @throws IllegalArgumentException if no class is configured for uri
*/
private JournalManager createJournal(URI uri) {
@VisibleForTesting
JournalManager createJournal(URI uri) {
Class<? extends JournalManager> clazz
= getJournalClass(conf, uri.getScheme());

View File

@@ -19,6 +19,8 @@ package org.apache.hadoop.hdfs.qjournal.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.eq;
@@ -29,9 +31,12 @@ import java.io.IOException;
import java.net.URI;
import java.util.List;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
@@ -39,8 +44,12 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochR
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.test.GenericTestUtils;
import org.slf4j.event.Level;
import org.junit.Before;
@@ -58,6 +67,8 @@ import com.google.protobuf.ByteString;
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeOp;
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.createTxnData;
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.verifyEdits;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
/**
* True unit tests for QuorumJournalManager
@@ -107,7 +118,7 @@ public class TestQuorumJournalManagerUnit {
}
private AsyncLogger mockLogger() {
return Mockito.mock(AsyncLogger.class);
return mock(AsyncLogger.class);
}
static <V> Stubber futureReturns(V value) {
@@ -204,7 +215,53 @@ public class TestQuorumJournalManagerUnit {
anyLong(), eq(3L), eq(1), Mockito.<byte[]>any());
stm.flush();
}
// setOutputBufferCapacity must reject any size at or above the IPC maximum
// data length, since such a buffer could never be shipped in one edits RPC.
@Test(expected = IllegalArgumentException.class)
public void testSetOutputBufferCapacityTooLarge() throws Exception {
  final int oversizedCapacity =
      CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT + 1;
  qjm.setOutputBufferCapacity(oversizedCapacity);
}
// Regression test for HDFS-13977: verify that when an FSEditLog writes to a
// quorum output stream and the stream's buffer fills past its capacity, an
// automatic flush/sync is triggered and edits are actually sent to the
// (mocked) journal loggers.
@Test
public void testFSEditLogAutoSyncToQuorumStream() throws Exception {
// Set the buffer capacity low to make it easy to fill it
qjm.setOutputBufferCapacity(512);
// Set up mocks
NNStorage mockStorage = mock(NNStorage.class);
createLogSegment(); // sets up the mocks for startLogSegment
// Stub all three loggers so sendEdits completes successfully (returns null).
for (int logIdx = 0; logIdx < 3; logIdx++) {
futureReturns(null).when(spyLoggers.get(logIdx))
.sendEdits(anyLong(), anyLong(), anyInt(), any());
}
// Minimal fake inode to serve as the target of the mkdir edit ops below.
PermissionStatus permStat = PermissionStatus
.createImmutable("user", "group", FsPermission.getDefault());
INode fakeInode = FSImageTestUtil.createEmptyInodeFile(1, "foo",
permStat, 1, 1, (short) 1, 1);
// Create a fake FSEditLog using this QJM
String mockQjmEdits = "qjournal://mock/";
conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, mockQjmEdits);
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, mockQjmEdits);
FSEditLog editLog = FSImageTestUtil.createEditLogWithJournalManager(
conf, mockStorage, URI.create(mockQjmEdits), qjm);
editLog.initJournalsForWrite();
editLog.startLogSegment(1, false,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
// Write enough edit ops that the output buffer capacity should fill and
// an auto-sync should be triggered
for (int i = 0; i < 12; i++) {
editLog.logMkDir("/fake/path", fakeInode);
}
// Each of the three loggers should have received exactly one sendEdits call
// starting at txid 1 — proof that the auto-sync flushed the buffer.
for (int i = 0; i < 3; i++) {
Mockito.verify(spyLoggers.get(i), times(1))
.sendEdits(eq(1L), eq(1L), anyInt(), any());
}
}
@Test
public void testWriteEditsOneSlow() throws Exception {
EditLogOutputStream stm = createLogSegment();

View File

@@ -30,6 +30,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -208,7 +209,23 @@ public abstract class FSImageTestUtil {
editLog.initJournalsForWrite();
return editLog;
}
/**
 * Build an {@link INodeFile} with no blocks, suitable as a lightweight
 * fixture for tests that only need a valid inode.
 *
 * @param id inode id
 * @param name local name of the file (encoded as UTF-8 bytes)
 * @param permissions owner/group/mode for the inode
 * @param mtime modification time
 * @param atime access time
 * @param replication replication factor
 * @param preferredBlockSize preferred block size in bytes
 * @return a block-less INodeFile
 */
public static INodeFile createEmptyInodeFile(long id, String name,
    PermissionStatus permissions, long mtime, long atime, short replication,
    long preferredBlockSize) {
  final byte[] localName = name.getBytes(StandardCharsets.UTF_8);
  // A null block list makes this an empty (zero-block) file.
  return new INodeFile(id, localName, permissions, mtime, atime, null,
      replication, preferredBlockSize);
}
/**
 * Create an {@link FSEditLog} whose journal factory always returns the
 * supplied {@link JournalManager}, regardless of the edits URI. This lets
 * tests inject a mock or spy journal manager into an otherwise-real edit log.
 *
 * @param conf configuration for the edit log
 * @param storage NNStorage the edit log is associated with
 * @param editsUri the (single) edits directory URI to register
 * @param manager the journal manager to hand back from createJournal
 * @return an FSEditLog wired to the given journal manager
 */
public static FSEditLog createEditLogWithJournalManager(Configuration conf,
    NNStorage storage, URI editsUri, final JournalManager manager) {
  final FSEditLog editLogWithInjectedJournal =
      new FSEditLog(conf, storage, ImmutableList.of(editsUri)) {
        @Override
        protected JournalManager createJournal(URI uri) {
          // Ignore the URI entirely; always return the injected manager.
          return manager;
        }
      };
  return editLogWithInjectedJournal;
}
/**
* Create an aborted in-progress log in the given directory, containing