HDFS-13767. Add msync server implementation. Contributed by Chen Liang.

This commit is contained in:
Chen Liang 2018-08-13 10:30:06 -07:00
parent 6fe755f0c7
commit e2337bfc57
7 changed files with 114 additions and 58 deletions

View File

@ -60,6 +60,13 @@ public interface AlignmentContext {
* This is the intended server method call to implement to receive * This is the intended server method call to implement to receive
* client state info during RPC response header processing. * client state info during RPC response header processing.
* @param header The RPC request header. * @param header The RPC request header.
* @return state id of in the request header.
*/ */
void receiveRequestState(RpcRequestHeaderProto header); long receiveRequestState(RpcRequestHeaderProto header);
/**
* Returns the last seen state id of the alignment context instance.
* @return the value of the last seen state id.
*/
long getLastSeenStateId();
} }

View File

@ -705,6 +705,7 @@ public abstract class Server {
private boolean deferredResponse = false; private boolean deferredResponse = false;
private int priorityLevel; private int priorityLevel;
// the priority level assigned by scheduler, 0 by default // the priority level assigned by scheduler, 0 by default
private long clientStateId;
Call() { Call() {
this(RpcConstants.INVALID_CALL_ID, RpcConstants.INVALID_RETRY_COUNT, this(RpcConstants.INVALID_CALL_ID, RpcConstants.INVALID_RETRY_COUNT,
@ -735,6 +736,7 @@ public abstract class Server {
this.clientId = clientId; this.clientId = clientId;
this.traceScope = traceScope; this.traceScope = traceScope;
this.callerContext = callerContext; this.callerContext = callerContext;
this.clientStateId = Long.MIN_VALUE;
} }
@Override @Override
@ -812,6 +814,14 @@ public abstract class Server {
this.priorityLevel = priorityLevel; this.priorityLevel = priorityLevel;
} }
public long getClientStateId() {
return this.clientStateId;
}
public void setClientStateId(long stateId) {
this.clientStateId = stateId;
}
@InterfaceStability.Unstable @InterfaceStability.Unstable
public void deferResponse() { public void deferResponse() {
this.deferredResponse = true; this.deferredResponse = true;
@ -2500,11 +2510,6 @@ public abstract class Server {
} }
} }
if (alignmentContext != null) {
// Check incoming RPC request's state.
alignmentContext.receiveRequestState(header);
}
CallerContext callerContext = null; CallerContext callerContext = null;
if (header.hasCallerContext()) { if (header.hasCallerContext()) {
callerContext = callerContext =
@ -2521,6 +2526,10 @@ public abstract class Server {
// Save the priority level assignment by the scheduler // Save the priority level assignment by the scheduler
call.setPriorityLevel(callQueue.getPriorityLevel(call)); call.setPriorityLevel(callQueue.getPriorityLevel(call));
if(alignmentContext != null) {
long stateId = alignmentContext.receiveRequestState(header);
call.setClientStateId(stateId);
}
try { try {
internalQueueCall(call); internalQueueCall(call);
@ -2702,6 +2711,24 @@ public abstract class Server {
TraceScope traceScope = null; TraceScope traceScope = null;
try { try {
final Call call = callQueue.take(); // pop the queue; maybe blocked here final Call call = callQueue.take(); // pop the queue; maybe blocked here
if (alignmentContext != null && call.getClientStateId() >
alignmentContext.getLastSeenStateId()) {
/*
* The call processing should be postponed until the client call's
* state id is aligned (>=) with the server state id.
* NOTE:
* Inserting the call back to the queue can change the order of call
* execution comparing to their original placement into the queue.
* This is not a problem, because Hadoop RPC does not have any
* constraints on ordering the incoming rpc requests.
* In case of Observer, it handles only reads, which are
* commutative.
*/
//Re-queue the call and continue
internalQueueCall(call);
continue;
}
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(Thread.currentThread().getName() + ": " + call + " for RpcKind " + call.rpcKind); LOG.debug(Thread.currentThread().getName() + ": " + call + " for RpcKind " + call.rpcKind);
} }

View File

@ -39,7 +39,8 @@ public class ClientGSIContext implements AlignmentContext {
private final LongAccumulator lastSeenStateId = private final LongAccumulator lastSeenStateId =
new LongAccumulator(Math::max, Long.MIN_VALUE); new LongAccumulator(Math::max, Long.MIN_VALUE);
long getLastSeenStateId() { @Override
public long getLastSeenStateId() {
return lastSeenStateId.get(); return lastSeenStateId.get();
} }
@ -73,7 +74,8 @@ public class ClientGSIContext implements AlignmentContext {
* Client does not receive RPC requests therefore this does nothing. * Client does not receive RPC requests therefore this does nothing.
*/ */
@Override @Override
public void receiveRequestState(RpcRequestHeaderProto header) { public long receiveRequestState(RpcRequestHeaderProto header) {
// Do nothing. // Do nothing.
return 0;
} }
} }

View File

@ -159,7 +159,6 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Mkdirs
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCacheDirectiveRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCacheDirectiveRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MsyncRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MsyncRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MsyncResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.OpenFilesBatchResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.OpenFilesBatchResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto;

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ipc.AlignmentContext; import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
@ -46,7 +47,11 @@ class GlobalStateIdContext implements AlignmentContext {
*/ */
@Override @Override
public void updateResponseState(RpcResponseHeaderProto.Builder header) { public void updateResponseState(RpcResponseHeaderProto.Builder header) {
header.setStateId(namesystem.getLastWrittenTransactionId()); // Using getCorrectLastAppliedOrWrittenTxId will acquire the lock on
// FSEditLog. This is needed so that ANN will return the correct state id
// it currently has. But this may not be necessary for Observer, may want
// revisit for optimization. Same goes to receiveRequestState.
header.setStateId(getLastSeenStateId());
} }
/** /**
@ -71,13 +76,20 @@ class GlobalStateIdContext implements AlignmentContext {
* Server side implementation for processing state alignment info in requests. * Server side implementation for processing state alignment info in requests.
*/ */
@Override @Override
public void receiveRequestState(RpcRequestHeaderProto header) { public long receiveRequestState(RpcRequestHeaderProto header) {
long serverStateId = namesystem.getLastWrittenTransactionId(); long serverStateId =
namesystem.getFSImage().getCorrectLastAppliedOrWrittenTxId();
long clientStateId = header.getStateId(); long clientStateId = header.getStateId();
if (clientStateId > serverStateId) { if (clientStateId > serverStateId &&
HAServiceProtocol.HAServiceState.ACTIVE.equals(namesystem.getState())) {
FSNamesystem.LOG.warn("A client sent stateId: " + clientStateId + FSNamesystem.LOG.warn("A client sent stateId: " + clientStateId +
", but server state is: " + serverStateId); ", but server state is: " + serverStateId);
} }
return clientStateId;
} }
@Override
public long getLastSeenStateId() {
return namesystem.getFSImage().getCorrectLastAppliedOrWrittenTxId();
}
} }

View File

@ -280,42 +280,6 @@ public class TestStateAlignmentContextWithHA {
} }
} }
/**
* This test mocks an AlignmentContext to send stateIds greater than
* server's stateId in RPC requests.
*/
@Test
public void testClientSendsGreaterState() throws Exception {
ClientGSIContext alignmentContext = new ClientGSIContext();
ClientGSIContext spiedAlignContext = Mockito.spy(alignmentContext);
spy = spiedAlignContext;
try (DistributedFileSystem clearDfs =
(DistributedFileSystem) FileSystem.get(CONF)) {
// Make every client call have a stateId > server's stateId.
Mockito.doAnswer(a -> {
Object[] arguments = a.getArguments();
RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
(RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
try {
return a.callRealMethod();
} finally {
header.setStateId(Long.MAX_VALUE);
}
}).when(spiedAlignContext).updateRequestState(Mockito.any());
GenericTestUtils.LogCapturer logCapturer =
GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.LOG);
DFSTestUtil.writeFile(clearDfs, new Path("/testFile4"), "shv");
logCapturer.stopCapturing();
String output = logCapturer.getOutput();
assertThat(output, containsString("A client sent stateId: "));
}
}
/** /**
* This test checks if after a client writes we can see the state id in * This test checks if after a client writes we can see the state id in
* updated via the response. * updated via the response.

View File

@ -32,15 +32,21 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Proxy; import java.lang.reflect.Proxy;
import java.net.URI; import java.net.URI;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
// Main unit tests for ObserverNode // Main unit tests for ObserverNode
public class TestObserverNode { public class TestObserverNode {
private Configuration conf; private Configuration conf;
@ -58,7 +64,9 @@ public class TestObserverNode {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
conf = new Configuration(); conf = new Configuration();
setUpCluster(1); conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
conf.setTimeDuration(
DFS_HA_TAILEDITS_PERIOD_KEY, 100, TimeUnit.MILLISECONDS);
testPath = new Path("/test"); testPath = new Path("/test");
testPath2 = new Path("/test2"); testPath2 = new Path("/test2");
@ -74,18 +82,12 @@ public class TestObserverNode {
@Test @Test
public void testSimpleRead() throws Exception { public void testSimpleRead() throws Exception {
setUpCluster(1);
setObserverRead(true); setObserverRead(true);
dfs.mkdir(testPath, FsPermission.getDefault()); dfs.mkdir(testPath, FsPermission.getDefault());
assertSentTo(0); assertSentTo(0);
try {
dfs.getFileStatus(testPath);
fail("Should throw FileNotFoundException");
} catch (FileNotFoundException e) {
// Pass
}
rollEditLogAndTail(0); rollEditLogAndTail(0);
dfs.getFileStatus(testPath); dfs.getFileStatus(testPath);
assertSentTo(2); assertSentTo(2);
@ -96,6 +98,7 @@ public class TestObserverNode {
@Test @Test
public void testFailover() throws Exception { public void testFailover() throws Exception {
setUpCluster(1);
setObserverRead(false); setObserverRead(false);
dfs.mkdir(testPath, FsPermission.getDefault()); dfs.mkdir(testPath, FsPermission.getDefault());
@ -115,6 +118,7 @@ public class TestObserverNode {
@Test @Test
public void testDoubleFailover() throws Exception { public void testDoubleFailover() throws Exception {
setUpCluster(1);
setObserverRead(true); setObserverRead(true);
dfs.mkdir(testPath, FsPermission.getDefault()); dfs.mkdir(testPath, FsPermission.getDefault());
@ -180,6 +184,7 @@ public class TestObserverNode {
@Test @Test
public void testObserverShutdown() throws Exception { public void testObserverShutdown() throws Exception {
setUpCluster(1);
setObserverRead(true); setObserverRead(true);
dfs.mkdir(testPath, FsPermission.getDefault()); dfs.mkdir(testPath, FsPermission.getDefault());
@ -201,6 +206,7 @@ public class TestObserverNode {
@Test @Test
public void testObserverFailOverAndShutdown() throws Exception { public void testObserverFailOverAndShutdown() throws Exception {
setUpCluster(1);
// Test the case when there is a failover before ONN shutdown // Test the case when there is a failover before ONN shutdown
setObserverRead(true); setObserverRead(true);
@ -273,6 +279,7 @@ public class TestObserverNode {
@Test @Test
public void testBootstrap() throws Exception { public void testBootstrap() throws Exception {
setUpCluster(1);
for (URI u : dfsCluster.getNameDirs(2)) { for (URI u : dfsCluster.getNameDirs(2)) {
File dir = new File(u.getPath()); File dir = new File(u.getPath());
assertTrue(FileUtil.fullyDelete(dir)); assertTrue(FileUtil.fullyDelete(dir));
@ -284,6 +291,44 @@ public class TestObserverNode {
assertEquals(0, rc); assertEquals(0, rc);
} }
@Test
public void testMsyncSimple() throws Exception {
// disable fast path here because this test's assertions are based on the
// timing of explicitly called rollEditLogAndTail. Although this means this
// test takes some time to run
// TODO: revisit if there is a better way.
conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, false);
conf.setTimeDuration(DFS_HA_LOGROLL_PERIOD_KEY, 60, TimeUnit.SECONDS);
conf.setTimeDuration(
DFS_HA_TAILEDITS_PERIOD_KEY, 30, TimeUnit.SECONDS);
setUpCluster(1);
setObserverRead(true);
AtomicBoolean readSucceed = new AtomicBoolean(false);
dfs.mkdir(testPath, FsPermission.getDefault());
assertSentTo(0);
Thread reader = new Thread(() -> {
try {
// this read will block until roll and tail edits happen.
dfs.getFileStatus(testPath);
readSucceed.set(true);
} catch (IOException e) {
e.printStackTrace();
}
});
reader.start();
// the reader is still blocking, not succeeded yet.
assertFalse(readSucceed.get());
rollEditLogAndTail(0);
// wait a while for all the change to be done
Thread.sleep(100);
// the reader should have succeed.
assertTrue(readSucceed.get());
}
private void setUpCluster(int numObservers) throws Exception { private void setUpCluster(int numObservers) throws Exception {
qjmhaCluster = new MiniQJMHACluster.Builder(conf) qjmhaCluster = new MiniQJMHACluster.Builder(conf)
.setNumNameNodes(2 + numObservers) .setNumNameNodes(2 + numObservers)