From 7eacf5f8f1a85cbf351f781a826a56cdf3162a34 Mon Sep 17 00:00:00 2001
From: Konstantin V Shvachko
Date: Mon, 17 Sep 2018 18:25:27 -0700
Subject: [PATCH] HDFS-13778. [SBN read] TestStateAlignmentContextWithHA
 should use real ObserverReadProxyProvider instead of
 AlignmentContextProxyProvider. Contributed by Konstantin Shvachko and Plamen
 Jeliazkov.

Also fix a number of java7 incompatibilities from previous SbN read commits.
---
 .../qjournal/client/QuorumJournalManager.java |   2 +-
 .../server/namenode/GlobalStateIdContext.java |   8 +-
 .../hdfs/TestStateAlignmentContextWithHA.java | 189 ++++++------------
 .../client/TestQuorumJournalManager.java      |   3 +-
 .../server/namenode/ha/TestObserverNode.java  |  25 ++-
 .../ha/TestObserverReadProxyProvider.java     |   2 +-
 .../ha/TestStandbyInProgressTail.java         |   4 +-
 7 files changed, 82 insertions(+), 151 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
index df0b15680f3..99e5ffbd4d3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
@@ -568,7 +568,7 @@ private void selectRpcInputStreams(Collection<EditLogInputStream> streams,
     LOG.info("Selected loggers with >= " + maxAllowedTxns +
         " transactions starting from " + fromTxnId);
     PriorityQueue<EditLogInputStream> allStreams = new PriorityQueue<>(
-        JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
+        responseMap.size(), JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
     for (GetJournaledEditsResponseProto resp : responseMap.values()) {
       long endTxnId = fromTxnId - 1 +
           Math.min(maxAllowedTxns, resp.getTxnCount());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java
index ecb9fd36247..14423c065be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.lang.annotation.Annotation;
 import java.lang.reflect.Method;
 import java.util.HashSet;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -50,9 +51,10 @@ class GlobalStateIdContext implements AlignmentContext {
     // For now, only ClientProtocol methods can be coordinated, so only checking
     // against ClientProtocol.
     for (Method method : ClientProtocol.class.getDeclaredMethods()) {
-      if (method.isAnnotationPresent(ReadOnly.class) &&
-          method.getAnnotationsByType(ReadOnly.class)[0].isCoordinated()) {
-        coordinatedMethods.add(method.getName());
+      for (Annotation anno : method.getAnnotations()) {
+        if ((anno instanceof ReadOnly) && ((ReadOnly) anno).isCoordinated()) {
+          coordinatedMethods.add(method.getName());
+        }
       }
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java
index ba8a039fa9b..0596f3e1af1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java
@@ -18,28 +18,24 @@
 package org.apache.hadoop.hdfs;
 
-import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.not;
 import static org.junit.Assert.assertThat;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.ha.ClientHAProxyFactory;
-import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
-import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
-import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URI;
@@ -64,55 +60,31 @@
  * to the most recent alignment state of the server.
  */
 public class TestStateAlignmentContextWithHA {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestStateAlignmentContextWithHA.class.getName());
+
   private static final int NUMDATANODES = 1;
   private static final int NUMCLIENTS = 10;
-  private static final int NUMFILES = 300;
+  private static final int NUMFILES = 120;
   private static final Configuration CONF = new HdfsConfiguration();
-  private static final String NAMESERVICE = "nameservice";
   private static final List<ClientGSIContext> AC_LIST = new ArrayList<>();
 
   private static MiniDFSCluster cluster;
   private static List<Worker> clients;
-  private static ClientGSIContext spy;
 
   private DistributedFileSystem dfs;
   private int active = 0;
   private int standby = 1;
 
-  static class AlignmentContextProxyProvider<T>
-      extends ConfiguredFailoverProxyProvider<T> {
+  static class ORPPwithAlignmentContexts<T extends ClientProtocol>
+      extends ObserverReadProxyProvider<T> {
 
-    private ClientGSIContext alignmentContext;
-
-    public AlignmentContextProxyProvider(
+    public ORPPwithAlignmentContexts(
         Configuration conf, URI uri, Class<T> xface,
         HAProxyFactory<T> factory) throws IOException {
       super(conf, uri, xface, factory);
 
-      // Create and set AlignmentContext in HAProxyFactory.
-      // All proxies by factory will now have AlignmentContext assigned.
-      this.alignmentContext = (spy != null ?
-          spy : new ClientGSIContext());
-      ((ClientHAProxyFactory<T>) factory).setAlignmentContext(alignmentContext);
-
-      AC_LIST.add(alignmentContext);
-    }
-  }
-
-  static class SpyConfiguredContextProxyProvider<T>
-      extends ConfiguredFailoverProxyProvider<T> {
-
-    private ClientGSIContext alignmentContext;
-
-    public SpyConfiguredContextProxyProvider(
-        Configuration conf, URI uri, Class<T> xface,
-        HAProxyFactory<T> factory) throws IOException {
-      super(conf, uri, xface, factory);
-
-      // Create but DON'T set in HAProxyFactory.
-      this.alignmentContext = (spy != null ? spy : new ClientGSIContext());
-
-      AC_LIST.add(alignmentContext);
+      AC_LIST.add((ClientGSIContext) getAlignmentContext());
     }
   }
@@ -124,23 +96,21 @@ public static void startUpCluster() throws IOException {
     CONF.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
     CONF.setBoolean("fs.hdfs.impl.disable.cache", true);
 
-    MiniDFSNNTopology.NSConf nsConf = new MiniDFSNNTopology.NSConf(NAMESERVICE);
-    nsConf.addNN(new MiniDFSNNTopology.NNConf("nn1"));
-    nsConf.addNN(new MiniDFSNNTopology.NNConf("nn2"));
-
     cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(NUMDATANODES)
-        .nnTopology(MiniDFSNNTopology.simpleHATopology().addNameservice(nsConf))
+        .nnTopology(MiniDFSNNTopology.simpleHATopology(3))
         .build();
     cluster.waitActive();
     cluster.transitionToActive(0);
+    cluster.transitionToObserver(2);
+
+    String nameservice = HATestUtil.getLogicalHostname(cluster);
+    HATestUtil.setFailoverConfigurations(cluster, CONF, nameservice, 0);
+    CONF.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX +
+        "." + nameservice, ORPPwithAlignmentContexts.class.getName());
   }
 
   @Before
   public void before() throws IOException, URISyntaxException {
-    killWorkers();
-    HATestUtil.setFailoverConfigurations(cluster, CONF, NAMESERVICE, 0);
-    CONF.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX +
-        "." + NAMESERVICE, AlignmentContextProxyProvider.class.getName());
     dfs = (DistributedFileSystem) FileSystem.get(CONF);
   }
@@ -154,6 +124,7 @@ public static void shutDownCluster() throws IOException {
 
   @After
   public void after() throws IOException {
+    killWorkers();
     cluster.transitionToStandby(1);
     cluster.transitionToActive(0);
     active = 0;
@@ -163,26 +134,6 @@ public void after() throws IOException {
       dfs = null;
     }
     AC_LIST.clear();
-    spy = null;
-  }
-
-  /**
-   * This test checks if after a client writes we can see the state id in
-   * updated via the response.
-   */
-  @Test
-  public void testNoStateOnConfiguredProxyProvider() throws Exception {
-    Configuration confCopy = new Configuration(CONF);
-    confCopy.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX +
-        "." + NAMESERVICE, SpyConfiguredContextProxyProvider.class.getName());
-
-    try (DistributedFileSystem clearDfs =
-        (DistributedFileSystem) FileSystem.get(confCopy)) {
-      ClientGSIContext clientState = getContext(1);
-      assertThat(clientState.getLastSeenStateId(), is(Long.MIN_VALUE));
-      DFSTestUtil.writeFile(clearDfs, new Path("/testFileNoState"), "no_state");
-      assertThat(clientState.getLastSeenStateId(), is(Long.MIN_VALUE));
-    }
   }
 
   /**
@@ -236,51 +187,6 @@ public void testStateTransferOnFreshClient() throws Exception {
     }
   }
 
-  /**
-   * This test mocks an AlignmentContext and ensures that DFSClient
-   * writes its lastSeenStateId into RPC requests.
-   */
-  @Test
-  public void testClientSendsState() throws Exception {
-    ClientGSIContext alignmentContext = new ClientGSIContext();
-    ClientGSIContext spiedAlignContext = Mockito.spy(alignmentContext);
-    spy = spiedAlignContext;
-
-    try (DistributedFileSystem clearDfs =
-        (DistributedFileSystem) FileSystem.get(CONF)) {
-
-      // Collect RpcRequestHeaders for verification later.
-      final List<RpcHeaderProtos.RpcRequestHeaderProto.Builder> headers =
-          new ArrayList<>();
-      Mockito.doAnswer(new Answer<Object>() {
-        @Override
-        public Object answer(InvocationOnMock a) throws Throwable {
-          Object[] arguments = a.getArguments();
-          RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
-              (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
-          headers.add(header);
-          return a.callRealMethod();
-        }
-      }).when(spiedAlignContext).updateRequestState(Mockito.any());
-
-      DFSTestUtil.writeFile(clearDfs, new Path("/testFile4"), "shv");
-
-      // Ensure first header and last header have different state.
-      assertThat(headers.size() > 1, is(true));
-      assertThat(headers.get(0).getStateId(),
-          is(not(headers.get(headers.size() - 1))));
-
-      // Ensure collected RpcRequestHeaders are in increasing order.
-      long lastHeader = headers.get(0).getStateId();
-      for (RpcHeaderProtos.RpcRequestHeaderProto.Builder header :
-          headers.subList(1, headers.size())) {
-        long currentHeader = header.getStateId();
-        assertThat(currentHeader >= lastHeader, is(true));
-        lastHeader = header.getStateId();
-      }
-    }
-  }
-
   /**
    * This test checks if after a client writes we can see the state id in
    * updated via the response.
   */
@@ -316,14 +222,22 @@ public void testStateTransferOnWriteWithFailover() throws Exception {
 
   @Test(timeout=300000)
   public void testMultiClientStatesWithRandomFailovers() throws Exception {
-    // We want threads to run during failovers; assuming at minimum 4 cores,
-    // would like to see 2 clients competing against 2 NameNodes.
+    // First run, half the load, with one failover.
+    runClientsWithFailover(1, NUMCLIENTS/2, NUMFILES/2);
+    // Second half, with fail back.
+    runClientsWithFailover(NUMCLIENTS/2 + 1, NUMCLIENTS, NUMFILES/2);
+  }
+
+  private void runClientsWithFailover(int clientStartId,
+                                      int numClients,
+                                      int numFiles)
+      throws Exception {
     ExecutorService execService = Executors.newFixedThreadPool(2);
-    clients = new ArrayList<>(NUMCLIENTS);
-    for (int i = 1; i <= NUMCLIENTS; i++) {
+    clients = new ArrayList<>(numClients);
+    for (int i = clientStartId; i <= numClients; i++) {
       DistributedFileSystem haClient =
           (DistributedFileSystem) FileSystem.get(CONF);
-      clients.add(new Worker(haClient, NUMFILES, "/testFile3FO_", i));
+      clients.add(new Worker(haClient, numFiles, "/testFile3FO_", i));
     }
 
     // Execute workers in threadpool with random failovers.
@@ -331,15 +245,18 @@ public void testMultiClientStatesWithRandomFailovers() throws Exception {
     execService.shutdown();
 
     boolean finished = false;
+    failOver();
+
     while (!finished) {
-      failOver();
-      finished = execService.awaitTermination(1L, TimeUnit.SECONDS);
+      finished = execService.awaitTermination(20L, TimeUnit.SECONDS);
     }
 
     // Validation.
     for (Future<STATE> future : futures) {
       assertThat(future.get(), is(STATE.SUCCESS));
     }
+
+    clients.clear();
   }
 
   private ClientGSIContext getContext(int clientCreationIndex) {
@@ -347,7 +264,9 @@ private ClientGSIContext getContext(int clientCreationIndex) {
   }
 
   private void failOver() throws IOException {
+    LOG.info("Transitioning Active to Standby");
     cluster.transitionToStandby(active);
+    LOG.info("Transitioning Standby to Active");
     cluster.transitionToActive(standby);
     int tempActive = active;
     active = standby;
@@ -394,30 +313,36 @@ private class Worker implements Callable<STATE> {
 
     @Override
     public STATE call() {
+      int i = -1;
      try {
-        for (int i = 0; i < filesToMake; i++) {
-          long preClientStateFO =
-              getContext(nonce).getLastSeenStateId();
+        for (i = 0; i < filesToMake; i++) {
+          ClientGSIContext gsiContext = getContext(nonce);
+          long preClientStateFO = gsiContext.getLastSeenStateId();
 
           // Write using HA client.
-          Path path = new Path(filePath + nonce + i);
+          Path path = new Path(filePath + nonce + "_" + i);
           DFSTestUtil.writeFile(client, path, "erk");
 
-          long postClientStateFO =
-              getContext(nonce).getLastSeenStateId();
+          long postClientStateFO = gsiContext.getLastSeenStateId();
 
           // Write(s) should have increased state. Check for greater than.
-          if (postClientStateFO <= preClientStateFO) {
-            System.out.println("FAIL: Worker started with: " +
-                preClientStateFO + ", but finished with: " + postClientStateFO);
+          if (postClientStateFO < 0 || postClientStateFO <= preClientStateFO) {
+            LOG.error("FAIL: Worker started with: {} , but finished with: {}",
+                preClientStateFO, postClientStateFO);
             return STATE.FAIL;
           }
+
+          if(i % (NUMFILES/10) == 0) {
+            LOG.info("Worker {} created {} files", nonce, i);
+            LOG.info("LastSeenStateId = {}", postClientStateFO);
+          }
         }
 
-        client.close();
         return STATE.SUCCESS;
-      } catch (IOException e) {
-        System.out.println("ERROR: Worker failed with: " + e);
+      } catch (Exception e) {
+        LOG.error("ERROR: Worker failed with: ", e);
         return STATE.ERROR;
+      } finally {
+        LOG.info("Worker {} created {} files", nonce, i);
       }
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
index c88ec40edb7..6abcb32b80b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
@@ -1032,7 +1032,8 @@ public void testSelectViaRpcTwoDeadJNs() throws Exception {
     cluster.getJournalNode(1).stopAndJoin(0);
 
     try {
-      qjm.selectInputStreams(new ArrayList<>(), 1, true, false);
+      qjm.selectInputStreams(new ArrayList<EditLogInputStream>(), 1,
+          true, false);
       fail("");
     } catch (QuorumException qe) {
       GenericTestUtils.assertExceptionContains(
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
index 6f8375b8e6f..de10b006140 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
@@ -368,19 +368,22 @@ public void testUncoordinatedCall() throws Exception {
 
     // a status flag, initialized to 0, after reader finished, this will be
     // updated to 1, -1 on error
-    AtomicInteger readStatus = new AtomicInteger(0);
+    final AtomicInteger readStatus = new AtomicInteger(0);
 
     // create a separate thread to make a blocking read.
-    Thread reader = new Thread(() -> {
-      try {
-        // this read call will block until server state catches up. But due to
-        // configuration, this will take a very long time.
-        dfs.getClient().getFileInfo("/");
-        readStatus.set(1);
-        fail("Should have been interrupted before getting here.");
-      } catch (IOException e) {
-        e.printStackTrace();
-        readStatus.set(-1);
+    Thread reader = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          // this read call will block until server state catches up. But due to
+          // configuration, this will take a very long time.
+          dfs.getClient().getFileInfo("/");
+          readStatus.set(1);
+          fail("Should have been interrupted before getting here.");
+        } catch (IOException e) {
+          e.printStackTrace();
+          readStatus.set(-1);
+        }
       }
     });
     reader.start();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java
index 4d5bc13d60e..c04f138a259 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java
@@ -72,7 +72,7 @@ private void setupProxyProvider(int namenodeCount) throws Exception {
     namenodeAddrs = new String[namenodeCount];
     namenodeAnswers = new ClientProtocolAnswer[namenodeCount];
     ClientProtocol[] proxies = new ClientProtocol[namenodeCount];
-    Map<String, ClientProtocol> proxyMap = new HashMap<>();
+    final Map<String, ClientProtocol> proxyMap = new HashMap<>();
     for (int i = 0; i < namenodeCount; i++) {
       namenodeIDs[i] = "nn" + i;
       namenodeAddrs[i] = "namenode" + i + ".test:8020";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyInProgressTail.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyInProgressTail.java
index 75447497cdb..0f4ab5b3458 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyInProgressTail.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyInProgressTail.java
@@ -441,9 +441,9 @@ private static void mkdirs(NameNode nameNode, String... dirNames)
    * Wait up to 1 second until the given NameNode is aware of the existing of
    * all of the provided fileNames.
    */
-  private static void waitForFileInfo(NameNode standbyNN, String... fileNames)
+  private static void waitForFileInfo(final NameNode standbyNN, String... fileNames)
       throws Exception {
-    List<String> remainingFiles = Lists.newArrayList(fileNames);
+    final List<String> remainingFiles = Lists.newArrayList(fileNames);
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
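
Note on the "java7 incompatibilities" mentioned in the commit message: three
patterns recur in the hunks above. Method.getAnnotationsByType() is a Java 8
API, so it is replaced with a loop over getAnnotations() plus an instanceof
check; the PriorityQueue(Comparator) constructor is Java 8-only, so the
two-argument PriorityQueue(int, Comparator) overload (available since Java 5)
is used instead; and lambdas are rewritten as anonymous inner classes, which
also forces captured locals to be declared final, hence the added final
modifiers. The sketch below is a minimal, self-contained illustration of the
same three patterns, not part of the commit; the Tagged annotation and Proto
interface are hypothetical stand-ins for HDFS's @ReadOnly annotation and
ClientProtocol.

    // A minimal sketch (hypothetical names) of the three Java 7
    // compatibility patterns applied by this patch; compiles with -source 7.
    import java.lang.annotation.Annotation;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.reflect.Method;
    import java.util.Comparator;
    import java.util.HashSet;
    import java.util.PriorityQueue;
    import java.util.Set;

    public class Java7CompatSketch {

      @Retention(RetentionPolicy.RUNTIME)
      @interface Tagged {                  // stand-in for HDFS's @ReadOnly
        boolean coordinated() default false;
      }

      interface Proto {                    // stand-in for ClientProtocol
        @Tagged(coordinated = true)
        void read();
        void write();
      }

      public static void main(String[] args) {
        // Pattern 1: Method.getAnnotationsByType(Class) exists only since
        // Java 8; on Java 7, iterate getAnnotations() and use instanceof.
        Set<String> coordinated = new HashSet<>();
        for (Method method : Proto.class.getDeclaredMethods()) {
          for (Annotation anno : method.getAnnotations()) {
            if (anno instanceof Tagged && ((Tagged) anno).coordinated()) {
              coordinated.add(method.getName());
            }
          }
        }
        System.out.println(coordinated);   // prints [read]

        // Pattern 2: PriorityQueue(Comparator) is a Java 8 constructor; the
        // (initialCapacity, Comparator) overload has existed since Java 5.
        Comparator<String> byLength = new Comparator<String>() {
          @Override
          public int compare(String a, String b) {
            return Integer.compare(a.length(), b.length());
          }
        };
        PriorityQueue<String> queue = new PriorityQueue<>(16, byLength);
        queue.add("longer");
        queue.add("abc");

        // Pattern 3: no lambdas in Java 7; use an anonymous class, and
        // declare captured locals final (Java 7 has no "effectively final").
        final PriorityQueue<String> captured = queue;
        Thread reader = new Thread(new Runnable() {
          @Override
          public void run() {
            System.out.println(captured.peek());  // prints abc
          }
        });
        reader.start();
      }
    }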