YARN-10210. Add a RMFailoverProxyProvider that does DNS resolution on failover.
This commit is contained in:
parent
3d5ade1839
commit
50f7f6dfd1
@ -0,0 +1,272 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.client;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.retry.FailoverProxyProvider;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.client.api.YarnClient;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.InvocationHandler;
|
||||
import java.lang.reflect.Proxy;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Mockito.any;
|
||||
import static org.mockito.Mockito.eq;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* Unit tests for {@link DefaultNoHARMFailoverProxyProvider} and
|
||||
* {@link AutoRefreshNoHARMFailoverProxyProvider}.
|
||||
*/
|
||||
public class TestNoHaRMFailoverProxyProvider {
|
||||
|
||||
// Default port of yarn RM
|
||||
private static final int RM1_PORT = 8032;
|
||||
private static final int RM2_PORT = 8031;
|
||||
|
||||
private static final int NUMNODEMANAGERS = 1;
|
||||
private Configuration conf;
|
||||
|
||||
private class TestProxy extends Proxy implements Closeable {
|
||||
protected TestProxy(InvocationHandler h) {
|
||||
super(h);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException, YarnException {
|
||||
conf = new YarnConfiguration();
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the proxy generated by {@link DefaultNoHAFailoverProxyProvider}
|
||||
* will connect to RM.
|
||||
*/
|
||||
@Test
|
||||
public void testRestartedRM() throws Exception {
|
||||
MiniYARNCluster cluster =
|
||||
new MiniYARNCluster("testRestartedRMNegative", NUMNODEMANAGERS, 1, 1);
|
||||
YarnClient rmClient = YarnClient.createYarnClient();
|
||||
try {
|
||||
cluster.init(conf);
|
||||
cluster.start();
|
||||
final Configuration yarnConf = cluster.getConfig();
|
||||
rmClient = YarnClient.createYarnClient();
|
||||
rmClient.init(yarnConf);
|
||||
rmClient.start();
|
||||
List <NodeReport> nodeReports = rmClient.getNodeReports();
|
||||
assertEquals(
|
||||
"The proxy didn't get expected number of node reports",
|
||||
NUMNODEMANAGERS, nodeReports.size());
|
||||
} finally {
|
||||
if (rmClient != null) {
|
||||
rmClient.stop();
|
||||
}
|
||||
cluster.stop();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the proxy generated by
|
||||
* {@link AutoRefreshNoHARMFailoverProxyProvider} will connect to RM.
|
||||
*/
|
||||
@Test
|
||||
public void testConnectingToRM() throws Exception {
|
||||
conf.setClass(YarnConfiguration.CLIENT_FAILOVER_NO_HA_PROXY_PROVIDER,
|
||||
AutoRefreshNoHARMFailoverProxyProvider.class,
|
||||
RMFailoverProxyProvider.class);
|
||||
MiniYARNCluster cluster =
|
||||
new MiniYARNCluster("testRestartedRMNegative", NUMNODEMANAGERS, 1, 1);
|
||||
YarnClient rmClient = null;
|
||||
try {
|
||||
cluster.init(conf);
|
||||
cluster.start();
|
||||
final Configuration yarnConf = cluster.getConfig();
|
||||
rmClient = YarnClient.createYarnClient();
|
||||
rmClient.init(yarnConf);
|
||||
rmClient.start();
|
||||
List <NodeReport> nodeReports = rmClient.getNodeReports();
|
||||
assertEquals(
|
||||
"The proxy didn't get expected number of node reports",
|
||||
NUMNODEMANAGERS, nodeReports.size());
|
||||
} finally {
|
||||
if (rmClient != null) {
|
||||
rmClient.stop();
|
||||
}
|
||||
cluster.stop();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that the {@link DefaultNoHARMFailoverProxyProvider}
|
||||
* will generate different proxies after RM IP changed
|
||||
* and {@link DefaultNoHARMFailoverProxyProvider#performFailover(Object)}
|
||||
* get called.
|
||||
*/
|
||||
@Test
|
||||
public void testDefaultFPPGetOneProxy() throws Exception {
|
||||
// Create a proxy and mock a RMProxy
|
||||
Proxy mockProxy1 = new TestProxy((proxy, method, args) -> null);
|
||||
Class protocol = ApplicationClientProtocol.class;
|
||||
RMProxy mockRMProxy = mock(RMProxy.class);
|
||||
DefaultNoHARMFailoverProxyProvider <RMProxy> fpp =
|
||||
new DefaultNoHARMFailoverProxyProvider<RMProxy>();
|
||||
|
||||
InetSocketAddress mockAdd1 = new InetSocketAddress(RM1_PORT);
|
||||
|
||||
// Mock RMProxy methods
|
||||
when(mockRMProxy.getRMAddress(any(YarnConfiguration.class),
|
||||
any(Class.class))).thenReturn(mockAdd1);
|
||||
when(mockRMProxy.getProxy(any(YarnConfiguration.class),
|
||||
any(Class.class), eq(mockAdd1))).thenReturn(mockProxy1);
|
||||
|
||||
// Initialize failover proxy provider and get proxy from it.
|
||||
fpp.init(conf, mockRMProxy, protocol);
|
||||
FailoverProxyProvider.ProxyInfo<RMProxy> actualProxy1 = fpp.getProxy();
|
||||
assertEquals(
|
||||
"AutoRefreshRMFailoverProxyProvider doesn't generate " +
|
||||
"expected proxy",
|
||||
mockProxy1, actualProxy1.proxy);
|
||||
|
||||
// Invoke fpp.getProxy() multiple times and
|
||||
// validate the returned proxy is always mockProxy1
|
||||
actualProxy1 = fpp.getProxy();
|
||||
assertEquals(
|
||||
"AutoRefreshRMFailoverProxyProvider doesn't generate " +
|
||||
"expected proxy",
|
||||
mockProxy1, actualProxy1.proxy);
|
||||
actualProxy1 = fpp.getProxy();
|
||||
assertEquals(
|
||||
"AutoRefreshRMFailoverProxyProvider doesn't generate " +
|
||||
"expected proxy",
|
||||
mockProxy1, actualProxy1.proxy);
|
||||
|
||||
// verify that mockRMProxy.getProxy() is invoked once only.
|
||||
verify(mockRMProxy, times(1))
|
||||
.getProxy(any(YarnConfiguration.class), any(Class.class),
|
||||
eq(mockAdd1));
|
||||
|
||||
// Perform Failover and get proxy again from failover proxy provider
|
||||
fpp.performFailover(actualProxy1.proxy);
|
||||
FailoverProxyProvider.ProxyInfo<RMProxy> actualProxy2 = fpp.getProxy();
|
||||
assertEquals("AutoRefreshRMFailoverProxyProvider " +
|
||||
"doesn't generate expected proxy after failover",
|
||||
mockProxy1, actualProxy2.proxy);
|
||||
|
||||
// verify that mockRMProxy.getProxy() didn't get invoked again after
|
||||
// performFailover()
|
||||
verify(mockRMProxy, times(1))
|
||||
.getProxy(any(YarnConfiguration.class), any(Class.class),
|
||||
eq(mockAdd1));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that the {@link AutoRefreshNoHARMFailoverProxyProvider}
|
||||
* will generate different proxies after RM IP changed
|
||||
* and {@link AutoRefreshNoHARMFailoverProxyProvider#performFailover(Object)}
|
||||
* get called.
|
||||
*/
|
||||
@Test
|
||||
public void testAutoRefreshIPChange() throws Exception {
|
||||
conf.setClass(YarnConfiguration.CLIENT_FAILOVER_NO_HA_PROXY_PROVIDER,
|
||||
AutoRefreshNoHARMFailoverProxyProvider.class,
|
||||
RMFailoverProxyProvider.class);
|
||||
|
||||
// Create two proxies and mock a RMProxy
|
||||
Proxy mockProxy1 = new TestProxy((proxy, method, args) -> null);
|
||||
Proxy mockProxy2 = new TestProxy((proxy, method, args) -> null);
|
||||
Class protocol = ApplicationClientProtocol.class;
|
||||
RMProxy mockRMProxy = mock(RMProxy.class);
|
||||
AutoRefreshNoHARMFailoverProxyProvider<RMProxy> fpp =
|
||||
new AutoRefreshNoHARMFailoverProxyProvider<RMProxy>();
|
||||
|
||||
// generate two address with different ports.
|
||||
InetSocketAddress mockAdd1 = new InetSocketAddress(RM1_PORT);
|
||||
InetSocketAddress mockAdd2 = new InetSocketAddress(RM2_PORT);
|
||||
|
||||
// Mock RMProxy methods
|
||||
when(mockRMProxy.getRMAddress(any(YarnConfiguration.class),
|
||||
any(Class.class))).thenReturn(mockAdd1);
|
||||
when(mockRMProxy.getProxy(any(YarnConfiguration.class),
|
||||
any(Class.class), eq(mockAdd1))).thenReturn(mockProxy1);
|
||||
|
||||
// Initialize proxy provider and get proxy from it.
|
||||
fpp.init(conf, mockRMProxy, protocol);
|
||||
FailoverProxyProvider.ProxyInfo <RMProxy> actualProxy1 = fpp.getProxy();
|
||||
assertEquals(
|
||||
"AutoRefreshRMFailoverProxyProvider doesn't generate " +
|
||||
"expected proxy",
|
||||
mockProxy1, actualProxy1.proxy);
|
||||
|
||||
// Invoke fpp.getProxy() multiple times and
|
||||
// validate the returned proxy is always mockProxy1
|
||||
actualProxy1 = fpp.getProxy();
|
||||
assertEquals(
|
||||
"AutoRefreshRMFailoverProxyProvider doesn't generate " +
|
||||
"expected proxy",
|
||||
mockProxy1, actualProxy1.proxy);
|
||||
actualProxy1 = fpp.getProxy();
|
||||
assertEquals(
|
||||
"AutoRefreshRMFailoverProxyProvider doesn't generate " +
|
||||
"expected proxy",
|
||||
mockProxy1, actualProxy1.proxy);
|
||||
|
||||
// verify that mockRMProxy.getProxy() is invoked once only.
|
||||
verify(mockRMProxy, times(1))
|
||||
.getProxy(any(YarnConfiguration.class), any(Class.class),
|
||||
eq(mockAdd1));
|
||||
|
||||
// Mock RMProxy methods to generate different proxy
|
||||
// based on different IP address.
|
||||
when(mockRMProxy.getRMAddress(
|
||||
any(YarnConfiguration.class),
|
||||
any(Class.class))).thenReturn(mockAdd2);
|
||||
when(mockRMProxy.getProxy(
|
||||
any(YarnConfiguration.class),
|
||||
any(Class.class), eq(mockAdd2))).thenReturn(mockProxy2);
|
||||
|
||||
// Perform Failover and get proxy again from failover proxy provider
|
||||
fpp.performFailover(actualProxy1.proxy);
|
||||
FailoverProxyProvider.ProxyInfo <RMProxy> actualProxy2 = fpp.getProxy();
|
||||
assertEquals("AutoRefreshNoHARMFailoverProxyProvider " +
|
||||
"doesn't generate expected proxy after failover",
|
||||
mockProxy2, actualProxy2.proxy);
|
||||
|
||||
// check the proxy is different with the one we created before.
|
||||
assertNotEquals("AutoRefreshNoHARMFailoverProxyProvider " +
|
||||
"shouldn't generate same proxy after failover",
|
||||
actualProxy1.proxy, actualProxy2.proxy);
|
||||
}
|
||||
}
|
@ -0,0 +1,307 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.client;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.retry.FailoverProxyProvider;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.InvocationHandler;
|
||||
import java.lang.reflect.Proxy;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Mockito.any;
|
||||
import static org.mockito.Mockito.eq;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* Unit tests for {@link ConfiguredRMFailoverProxyProvider} and
|
||||
* {@link AutoRefreshRMFailoverProxyProvider}.
|
||||
*/
|
||||
public class TestRMFailoverProxyProvider {
|
||||
|
||||
// Default port of yarn RM
|
||||
private static final int RM1_PORT = 8032;
|
||||
private static final int RM2_PORT = 8031;
|
||||
private static final int RM3_PORT = 8033;
|
||||
|
||||
private Configuration conf;
|
||||
|
||||
private class TestProxy extends Proxy implements Closeable {
|
||||
protected TestProxy(InvocationHandler h) {
|
||||
super(h);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException, YarnException {
|
||||
conf = new YarnConfiguration();
|
||||
conf.setClass(YarnConfiguration.CLIENT_FAILOVER_NO_HA_PROXY_PROVIDER,
|
||||
ConfiguredRMFailoverProxyProvider.class, RMFailoverProxyProvider.class);
|
||||
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that the {@link ConfiguredRMFailoverProxyProvider}
|
||||
* will loop through its set of proxies when
|
||||
* and {@link ConfiguredRMFailoverProxyProvider#performFailover(Object)}
|
||||
* gets called.
|
||||
*/
|
||||
@Test
|
||||
public void testFailoverChange() throws Exception {
|
||||
//Adjusting the YARN Conf
|
||||
conf.set(YarnConfiguration.RM_HA_IDS, "rm0, rm1");
|
||||
|
||||
// Create two proxies and mock a RMProxy
|
||||
Proxy mockProxy2 = new TestProxy((proxy, method, args) -> null);
|
||||
Proxy mockProxy1 = new TestProxy((proxy, method, args) -> null);
|
||||
|
||||
Class protocol = ApplicationClientProtocol.class;
|
||||
RMProxy mockRMProxy = mock(RMProxy.class);
|
||||
ConfiguredRMFailoverProxyProvider<RMProxy> fpp =
|
||||
new ConfiguredRMFailoverProxyProvider<RMProxy>();
|
||||
|
||||
// generate two address with different ports.
|
||||
// Default port of yarn RM
|
||||
InetSocketAddress mockAdd1 = new InetSocketAddress(RM1_PORT);
|
||||
InetSocketAddress mockAdd2 = new InetSocketAddress(RM2_PORT);
|
||||
|
||||
// Mock RMProxy methods
|
||||
when(mockRMProxy.getRMAddress(any(YarnConfiguration.class),
|
||||
any(Class.class))).thenReturn(mockAdd1);
|
||||
when(mockRMProxy.getProxy(any(YarnConfiguration.class),
|
||||
any(Class.class), eq(mockAdd1))).thenReturn(mockProxy1);
|
||||
|
||||
// Initialize failover proxy provider and get proxy from it.
|
||||
fpp.init(conf, mockRMProxy, protocol);
|
||||
FailoverProxyProvider.ProxyInfo<RMProxy> actualProxy1 = fpp.getProxy();
|
||||
assertEquals(
|
||||
"ConfiguredRMFailoverProxyProvider doesn't generate " +
|
||||
"expected proxy",
|
||||
mockProxy1, actualProxy1.proxy);
|
||||
|
||||
// Invoke fpp.getProxy() multiple times and
|
||||
// validate the returned proxy is always mockProxy1
|
||||
actualProxy1 = fpp.getProxy();
|
||||
assertEquals(
|
||||
"ConfiguredRMFailoverProxyProvider doesn't generate " +
|
||||
"expected proxy",
|
||||
mockProxy1, actualProxy1.proxy);
|
||||
actualProxy1 = fpp.getProxy();
|
||||
assertEquals(
|
||||
"ConfiguredRMFailoverProxyProvider doesn't generate " +
|
||||
"expected proxy",
|
||||
mockProxy1, actualProxy1.proxy);
|
||||
|
||||
// verify that mockRMProxy.getProxy() is invoked once only.
|
||||
verify(mockRMProxy, times(1))
|
||||
.getProxy(any(YarnConfiguration.class), any(Class.class),
|
||||
eq(mockAdd1));
|
||||
|
||||
// Mock RMProxy methods to generate different proxy
|
||||
// based on different IP address.
|
||||
when(mockRMProxy.getRMAddress(
|
||||
any(YarnConfiguration.class),
|
||||
any(Class.class))).thenReturn(mockAdd2);
|
||||
when(mockRMProxy.getProxy(
|
||||
any(YarnConfiguration.class),
|
||||
any(Class.class), eq(mockAdd2))).thenReturn(mockProxy2);
|
||||
|
||||
// Perform Failover and get proxy again from failover proxy provider
|
||||
fpp.performFailover(actualProxy1.proxy);
|
||||
FailoverProxyProvider.ProxyInfo <RMProxy> actualProxy2 = fpp.getProxy();
|
||||
assertEquals("ConfiguredRMFailoverProxyProvider " +
|
||||
"doesn't generate expected proxy after failover",
|
||||
mockProxy2, actualProxy2.proxy);
|
||||
|
||||
// check the proxy is different with the one we created before.
|
||||
assertNotEquals("ConfiguredRMFailoverProxyProvider " +
|
||||
"shouldn't generate same proxy after failover",
|
||||
actualProxy1.proxy, actualProxy2.proxy);
|
||||
|
||||
// verify that mockRMProxy.getProxy() has been one with each address
|
||||
verify(mockRMProxy, times(1))
|
||||
.getProxy(any(YarnConfiguration.class), any(Class.class),
|
||||
eq(mockAdd1));
|
||||
verify(mockRMProxy, times(1))
|
||||
.getProxy(any(YarnConfiguration.class), any(Class.class),
|
||||
eq(mockAdd2));
|
||||
|
||||
// Mock RMProxy methods to generate the first proxy again
|
||||
when(mockRMProxy.getRMAddress(
|
||||
any(YarnConfiguration.class),
|
||||
any(Class.class))).thenReturn(mockAdd1);
|
||||
when(mockRMProxy.getProxy(
|
||||
any(YarnConfiguration.class),
|
||||
any(Class.class), eq(mockAdd1))).thenReturn(mockProxy1);
|
||||
|
||||
// Perform Failover and get proxy again from failover proxy provider
|
||||
fpp.performFailover(actualProxy2.proxy);
|
||||
FailoverProxyProvider.ProxyInfo <RMProxy> actualProxy3 = fpp.getProxy();
|
||||
|
||||
// check the proxy is the same as the one we created before.
|
||||
assertEquals("ConfiguredRMFailoverProxyProvider " +
|
||||
"doesn't generate expected proxy after failover",
|
||||
mockProxy1, actualProxy3.proxy);
|
||||
|
||||
// verify that mockRMProxy.getProxy() has still only been invoked twice
|
||||
verify(mockRMProxy, times(1))
|
||||
.getProxy(any(YarnConfiguration.class), any(Class.class),
|
||||
eq(mockAdd1));
|
||||
verify(mockRMProxy, times(1))
|
||||
.getProxy(any(YarnConfiguration.class), any(Class.class),
|
||||
eq(mockAdd2));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that the {@link AutoRefreshRMFailoverProxyProvider}
|
||||
* will loop through its set of proxies when
|
||||
* and {@link AutoRefreshRMFailoverProxyProvider#performFailover(Object)}
|
||||
* gets called.
|
||||
*/
|
||||
@Test
|
||||
public void testAutoRefreshFailoverChange() throws Exception {
|
||||
conf.setClass(YarnConfiguration.CLIENT_FAILOVER_NO_HA_PROXY_PROVIDER,
|
||||
AutoRefreshRMFailoverProxyProvider.class,
|
||||
RMFailoverProxyProvider.class);
|
||||
|
||||
//Adjusting the YARN Conf
|
||||
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||
conf.set(YarnConfiguration.RM_HA_IDS, "rm0, rm1");
|
||||
|
||||
// Create three proxies and mock a RMProxy
|
||||
Proxy mockProxy1 = new TestProxy((proxy, method, args) -> null);
|
||||
Proxy mockProxy2 = new TestProxy((proxy, method, args) -> null);
|
||||
Proxy mockProxy3 = new TestProxy((proxy, method, args) -> null);
|
||||
Class protocol = ApplicationClientProtocol.class;
|
||||
RMProxy mockRMProxy = mock(RMProxy.class);
|
||||
AutoRefreshRMFailoverProxyProvider<RMProxy> fpp =
|
||||
new AutoRefreshRMFailoverProxyProvider<RMProxy>();
|
||||
|
||||
// generate three address with different ports.
|
||||
InetSocketAddress mockAdd1 = new InetSocketAddress(RM1_PORT);
|
||||
InetSocketAddress mockAdd2 = new InetSocketAddress(RM2_PORT);
|
||||
InetSocketAddress mockAdd3 = new InetSocketAddress(RM3_PORT);
|
||||
|
||||
|
||||
// Mock RMProxy methods
|
||||
when(mockRMProxy.getRMAddress(any(YarnConfiguration.class),
|
||||
any(Class.class))).thenReturn(mockAdd1);
|
||||
when(mockRMProxy.getProxy(any(YarnConfiguration.class),
|
||||
any(Class.class), eq(mockAdd1))).thenReturn(mockProxy1);
|
||||
|
||||
// Initialize failover proxy provider and get proxy from it.
|
||||
fpp.init(conf, mockRMProxy, protocol);
|
||||
FailoverProxyProvider.ProxyInfo <RMProxy> actualProxy1 = fpp.getProxy();
|
||||
assertEquals(
|
||||
"AutoRefreshRMFailoverProxyProvider doesn't generate " +
|
||||
"expected proxy",
|
||||
mockProxy1, actualProxy1.proxy);
|
||||
|
||||
// Invoke fpp.getProxy() multiple times and
|
||||
// validate the returned proxy is always mockProxy1
|
||||
actualProxy1 = fpp.getProxy();
|
||||
assertEquals(
|
||||
"AutoRefreshRMFailoverProxyProvider doesn't generate " +
|
||||
"expected proxy",
|
||||
mockProxy1, actualProxy1.proxy);
|
||||
actualProxy1 = fpp.getProxy();
|
||||
assertEquals(
|
||||
"AutoRefreshRMFailoverProxyProvider doesn't generate " +
|
||||
"expected proxy",
|
||||
mockProxy1, actualProxy1.proxy);
|
||||
|
||||
// verify that mockRMProxy.getProxy() is invoked once only.
|
||||
verify(mockRMProxy, times(1))
|
||||
.getProxy(any(YarnConfiguration.class), any(Class.class),
|
||||
eq(mockAdd1));
|
||||
|
||||
// Mock RMProxy methods to generate different proxy
|
||||
// based on different IP address.
|
||||
when(mockRMProxy.getRMAddress(
|
||||
any(YarnConfiguration.class),
|
||||
any(Class.class))).thenReturn(mockAdd2);
|
||||
when(mockRMProxy.getProxy(
|
||||
any(YarnConfiguration.class),
|
||||
any(Class.class), eq(mockAdd2))).thenReturn(mockProxy2);
|
||||
|
||||
// Perform Failover and get proxy again from failover proxy provider
|
||||
fpp.performFailover(actualProxy1.proxy);
|
||||
FailoverProxyProvider.ProxyInfo <RMProxy> actualProxy2 = fpp.getProxy();
|
||||
assertEquals("AutoRefreshRMFailoverProxyProvider " +
|
||||
"doesn't generate expected proxy after failover",
|
||||
mockProxy2, actualProxy2.proxy);
|
||||
|
||||
// check the proxy is different with the one we created before.
|
||||
assertNotEquals("AutoRefreshRMFailoverProxyProvider " +
|
||||
"shouldn't generate same proxy after failover",
|
||||
actualProxy1.proxy, actualProxy2.proxy);
|
||||
|
||||
// verify that mockRMProxy.getProxy() has been one with each address
|
||||
verify(mockRMProxy, times(1))
|
||||
.getProxy(any(YarnConfiguration.class), any(Class.class),
|
||||
eq(mockAdd1));
|
||||
verify(mockRMProxy, times(1))
|
||||
.getProxy(any(YarnConfiguration.class), any(Class.class),
|
||||
eq(mockAdd2));
|
||||
|
||||
// Mock RMProxy methods to generate a different address
|
||||
when(mockRMProxy.getRMAddress(
|
||||
any(YarnConfiguration.class),
|
||||
any(Class.class))).thenReturn(mockAdd3);
|
||||
when(mockRMProxy.getProxy(
|
||||
any(YarnConfiguration.class),
|
||||
any(Class.class), eq(mockAdd3))).thenReturn(mockProxy1);
|
||||
|
||||
// Perform Failover and get proxy again from failover proxy provider
|
||||
fpp.performFailover(actualProxy2.proxy);
|
||||
FailoverProxyProvider.ProxyInfo <RMProxy> actualProxy3 = fpp.getProxy();
|
||||
|
||||
// check the proxy is the same as the one we created before.
|
||||
assertEquals("ConfiguredRMFailoverProxyProvider " +
|
||||
"doesn't generate expected proxy after failover",
|
||||
mockProxy1, actualProxy3.proxy);
|
||||
|
||||
// verify that mockRMProxy.getProxy() is still only been invoked thrice
|
||||
verify(mockRMProxy, times(1))
|
||||
.getProxy(any(YarnConfiguration.class), any(Class.class),
|
||||
eq(mockAdd1));
|
||||
verify(mockRMProxy, times(1))
|
||||
.getProxy(any(YarnConfiguration.class), any(Class.class),
|
||||
eq(mockAdd2));
|
||||
verify(mockRMProxy, times(1))
|
||||
.getProxy(any(YarnConfiguration.class), any(Class.class),
|
||||
eq(mockAdd3));
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,83 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.client;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
/**
|
||||
* A subclass of {@link RMFailoverProxyProvider} which tries to
|
||||
* resolve the proxy DNS in the event of failover.
|
||||
* This provider doesn't support HA or Federation.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public class AutoRefreshNoHARMFailoverProxyProvider<T>
|
||||
extends DefaultNoHARMFailoverProxyProvider<T> {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(AutoRefreshNoHARMFailoverProxyProvider.class);
|
||||
|
||||
protected RMProxy<T> rmProxy;
|
||||
protected YarnConfiguration conf;
|
||||
|
||||
@Override
|
||||
public void init(Configuration configuration, RMProxy<T> rmProxy,
|
||||
Class<T> protocol) {
|
||||
this.rmProxy = rmProxy;
|
||||
this.protocol = protocol;
|
||||
this.conf = new YarnConfiguration(configuration);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized ProxyInfo<T> getProxy() {
|
||||
if (proxy == null) {
|
||||
proxy = getProxyInternal();
|
||||
}
|
||||
return new ProxyInfo<T>(proxy, null);
|
||||
}
|
||||
|
||||
protected T getProxyInternal() {
|
||||
try {
|
||||
final InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
|
||||
return rmProxy.getProxy(conf, protocol, rmAddress);
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Unable to create proxy to the ResourceManager",
|
||||
ioe.getMessage());
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop the current proxy when performFailover.
|
||||
* @param currentProxy
|
||||
*/
|
||||
@Override
|
||||
public synchronized void performFailover(T currentProxy) {
|
||||
RPC.stopProxy(proxy);
|
||||
proxy = null;
|
||||
}
|
||||
}
|
@ -0,0 +1,64 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.client;
|
||||
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.HashSet;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
|
||||
/**
|
||||
* A subclass of {@link RMFailoverProxyProvider} which tries to
|
||||
* resolve the proxy DNS in the event of failover.
|
||||
* This provider supports YARN Resourcemanager's HA mode.
|
||||
* This provider doesn't support Federation.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public class AutoRefreshRMFailoverProxyProvider<T>
|
||||
extends ConfiguredRMFailoverProxyProvider<T> {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(AutoRefreshRMFailoverProxyProvider.class);
|
||||
|
||||
@Override
|
||||
public synchronized void performFailover(T currentProxy) {
|
||||
RPC.stopProxy(currentProxy);
|
||||
|
||||
//clears out all keys that map to currentProxy
|
||||
Set<String> rmIds = new HashSet<>();
|
||||
for (Entry<String, T> entry : proxies.entrySet()) {
|
||||
T proxy = entry.getValue();
|
||||
if (proxy.equals(currentProxy)) {
|
||||
String rmId = entry.getKey();
|
||||
rmIds.add(rmId);
|
||||
}
|
||||
}
|
||||
for (String rmId : rmIds) {
|
||||
proxies.remove(rmId);
|
||||
}
|
||||
|
||||
super.performFailover(currentProxy);
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user