HDFS-13924. [SBN read] Handle BlockMissingException when reading from observer. Contributed by Chao Sun.

This commit is contained in:
Chao Sun 2018-10-23 22:36:23 -07:00 committed by Chen Liang
parent 37dfd294e2
commit 40d5cc15b2
5 changed files with 128 additions and 1 deletions

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import org.apache.hadoop.classification.InterfaceStability;
import java.io.IOException;
/**
 * Thrown by a remote ObserverNode to indicate that the operation has failed
 * and that the client should retry on the active NameNode directly, instead
 * of retrying on other ObserverNodes.
 */
@InterfaceStability.Evolving
public class ObserverRetryOnActiveException extends IOException {
  // Kept private: the serial version only concerns this class and should not
  // be visible to other classes in the package.
  private static final long serialVersionUID = 1L;

  /**
   * Creates the exception with the given detail message.
   *
   * @param msg detail message describing why the request must be retried
   *            on the active NameNode
   */
  public ObserverRetryOnActiveException(String msg) {
    super(msg);
  }
}

View File

@ -38,7 +38,9 @@ import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -263,6 +265,16 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
throw ite.getCause();
}
Exception e = (Exception) ite.getCause();
if (e instanceof RemoteException) {
RemoteException re = (RemoteException) e;
Exception unwrapped = re.unwrapRemoteException(
ObserverRetryOnActiveException.class);
if (unwrapped instanceof ObserverRetryOnActiveException) {
LOG.info("Encountered ObserverRetryOnActiveException from {}." +
" Retry active namenode directly.", current.proxyInfo);
break;
}
}
RetryAction retryInfo = observerRetryPolicy.shouldRetry(e, 0, 0,
method.isAnnotationPresent(Idempotent.class)
|| method.isAnnotationPresent(AtMostOnce.class));

View File

@ -273,6 +273,7 @@ import org.apache.hadoop.hdfs.web.JsonUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.RetryCache;
import org.apache.hadoop.ipc.Server;
@ -1855,6 +1856,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
}
}
} else if (haEnabled && haContext != null &&
haContext.getState().getServiceState() == OBSERVER) {
for (LocatedBlock b : res.blocks.getLocatedBlocks()) {
if (b.getLocations() == null || b.getLocations().length == 0) {
throw new ObserverRetryOnActiveException("Zero blocklocations for "
+ srcArg);
}
}
}
} catch (AccessControlException e) {
logAuditEvent(false, operationName, srcArg);

View File

@ -274,7 +274,7 @@ public class TestObserverNode {
*/
@Test
public void testObserverNodeSafeModeWithBlockLocations() throws Exception {
// Create a new file - the request should go to active.
dfs.create(testPath, (short)1).close();
assertSentTo(0);
@ -309,12 +309,51 @@ public class TestObserverNode {
dfs.open(testPath).close();
assertSentTo(0);
Mockito.reset(bmSpy);
// Remove safe mode on observer, request should still go to it.
dfsCluster.getFileSystem(2).setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
dfs.open(testPath).close();
assertSentTo(2);
}
/**
 * Verifies that a read whose block locations come back empty from the
 * observer (NameNode index 2) ends up being served by NameNode index 0.
 */
@Test
public void testObserverNodeBlockMissingRetry() throws Exception {
  setObserverRead(true);

  // Create a new file - the request is expected to be sent to NameNode 0.
  dfs.create(testPath, (short)1).close();
  assertSentTo(0);

  dfsCluster.rollEditLogAndTail(0);

  // Spy on the block manager of NameNode 2 so that it hands out a fake block
  // without any datanode location.
  BlockManager bmSpy = NameNodeAdapter
      .spyOnBlockManager(dfsCluster.getNameNode(2));
  final DatanodeInfo[] empty = {};
  Answer<LocatedBlocks> ans = new Answer<LocatedBlocks>() {
    @Override
    public LocatedBlocks answer(InvocationOnMock invocationOnMock)
        throws Throwable {
      List<LocatedBlock> fakeBlocks = new ArrayList<>();
      // The only block carries an empty location array, so the observer
      // sees zero block locations and the client is expected to retry
      // elsewhere.
      ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L));
      LocatedBlock fakeBlock = new LocatedBlock(b, empty);
      fakeBlocks.add(fakeBlock);
      return new LocatedBlocks(0, false, fakeBlocks, null, true, null);
    }
  };
  Mockito.doAnswer(ans).when(bmSpy).createLocatedBlocks(
      (BlockInfoContiguous[])any(),
      anyLong(), anyBoolean(), anyLong(), anyLong(), anyBoolean(),
      anyBoolean(), (FileEncryptionInfo)any());

  // Close the stream right away, as the other open() calls in this test
  // class do, so the test does not leak an open input stream.
  dfs.open(testPath).close();
  assertSentTo(0);

  Mockito.reset(bmSpy);
}
private void assertSentTo(int nnIdx) throws IOException {
assertTrue("Request was not sent to the expected namenode " + nnIdx,
HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.UserGroupInformation;
@ -271,6 +272,26 @@ public class TestObserverReadProxyProvider {
assertHandledBy(1);
}
/**
 * Checks how reads are routed when an observer asks for a retry on the
 * active: with three NameNodes (index 0 active, 1 and 2 observers), a read
 * is handled by index 0 while index 1 has its retry-active flag set, and by
 * index 1 again once the flag is cleared.
 */
@Test
public void testObserverRetriableException() throws Exception {
setupProxyProvider(3);
namenodeAnswers[0].setActiveState();
namenodeAnswers[1].setObserverState();
namenodeAnswers[2].setObserverState();
// Set the first observer to throw "ObserverRetryOnActiveException" so that
// the request should skip the second observer and be served by the active.
namenodeAnswers[1].setRetryActive(true);
doRead();
assertHandledBy(0);
// With the flag cleared, the first observer is expected to serve the read
// again.
namenodeAnswers[1].setRetryActive(false);
doRead();
assertHandledBy(1);
}
/**
 * Convenience overload: delegates to {@code doRead(...)} with the proxy
 * object obtained from {@code proxyProvider.getProxy()}.
 */
private void doRead() throws Exception {
doRead(proxyProvider.getProxy().proxy);
}
@ -302,6 +323,8 @@ public class TestObserverReadProxyProvider {
private static class NameNodeAnswer {
private volatile boolean unreachable = false;
private volatile boolean retryActive = false;
// Standby state by default
private volatile boolean allowWrites = false;
private volatile boolean allowReads = false;
@ -332,6 +355,12 @@ public class TestObserverReadProxyProvider {
if (unreachable) {
throw new IOException("Unavailable");
}
if (retryActive) {
throw new RemoteException(
ObserverRetryOnActiveException.class.getCanonicalName(),
"Try active!"
);
}
switch (invocationOnMock.getMethod().getName()) {
case "reportBadBlocks":
if (!allowWrites) {
@ -371,6 +400,9 @@ public class TestObserverReadProxyProvider {
allowWrites = false;
}
/**
 * Sets the {@code retryActive} flag. Where this flag is checked in this
 * file, a true value causes a {@code RemoteException} naming
 * {@code ObserverRetryOnActiveException} to be thrown.
 *
 * @param shouldRetryActive new value of the flag
 */
void setRetryActive(boolean shouldRetryActive) {
retryActive = shouldRetryActive;
}
}
}