JCLOUDS-528: Ability to control interfaces to test for SocketFinder

This commit is contained in:
ashley-taylor 2014-04-09 14:17:38 +12:00 committed by Ignasi Barrera
parent 1667c86998
commit 5cc4659bce
3 changed files with 81 additions and 15 deletions

View File

@ -108,4 +108,9 @@ public interface ComputeServiceProperties {
*/
public static final String OS_VERSION_MAP_JSON = "jclouds.compute.os-version-map-json";
/**
* can be set to either PRIVATE, PUBLIC, ALL (default) controls what interfaces will be used when scanning for an open connection
*/
public static final String SOCKET_FINDER_ALLOWED_INTERFACES = "jclouds.compute.socket-finder-allowed-interfaces";
}

View File

@ -19,13 +19,12 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Predicates.or;
import static com.google.common.base.Throwables.propagate;
import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Iterables.size;
import static com.google.common.util.concurrent.Atomics.newReference;
import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
import static java.lang.String.format;
import static org.jclouds.Constants.PROPERTY_USER_THREADS;
import static org.jclouds.compute.config.ComputeServiceProperties.TIMEOUT_NODE_RUNNING;
import static org.jclouds.compute.config.ComputeServiceProperties.SOCKET_FINDER_ALLOWED_INTERFACES;
import static org.jclouds.util.Predicates2.retry;
import java.util.NoSuchElementException;
@ -64,6 +63,10 @@ public class ConcurrentOpenSocketFinder implements OpenSocketFinder {
private final Predicate<AtomicReference<NodeMetadata>> nodeRunning;
private final ListeningExecutorService userExecutor;
@Inject(optional = true)
@Named(SOCKET_FINDER_ALLOWED_INTERFACES)
private AllowedInterfaces allowedInterfaces = AllowedInterfaces.ALL;
@Inject
@VisibleForTesting
ConcurrentOpenSocketFinder(SocketOpen socketTester,
@ -76,7 +79,7 @@ public class ConcurrentOpenSocketFinder implements OpenSocketFinder {
@Override
public HostAndPort findOpenSocketOnNode(NodeMetadata node, final int port, long timeout, TimeUnit timeUnits) {
ImmutableSet<HostAndPort> sockets = checkNodeHasIps(node).transform(new Function<String, HostAndPort>() {
ImmutableSet<HostAndPort> sockets = checkNodeHasIps(node, allowedInterfaces).transform(new Function<String, HostAndPort>() {
@Override
public HostAndPort apply(String from) {
@ -175,10 +178,18 @@ public class ConcurrentOpenSocketFinder implements OpenSocketFinder {
};
}
private static FluentIterable<String> checkNodeHasIps(NodeMetadata node) {
FluentIterable<String> ips = FluentIterable.from(concat(node.getPublicAddresses(), node.getPrivateAddresses()));
checkState(size(ips) > 0, "node does not have IP addresses configured: " + node);
return ips;
@VisibleForTesting
static FluentIterable<String> checkNodeHasIps(NodeMetadata node, AllowedInterfaces allowedInterfaces) {
ImmutableSet.Builder<String> ipsBuilder = ImmutableSet.builder();
if (allowedInterfaces.scanPublic) {
ipsBuilder.addAll(node.getPublicAddresses());
}
if (allowedInterfaces.scanPrivate) {
ipsBuilder.addAll(node.getPrivateAddresses());
}
ImmutableSet<String> ips = ipsBuilder.build();
checkState(!ips.isEmpty(), "node does not have IP addresses configured: %s", node);
return FluentIterable.from(ips);
}
private static void blockOn(Iterable<ListenableFuture<?>> immutableList) {
@ -191,4 +202,21 @@ public class ConcurrentOpenSocketFinder implements OpenSocketFinder {
throw propagate(e);
}
}
@VisibleForTesting
enum AllowedInterfaces {
ALL(true, true),
PUBLIC(true, false),
PRIVATE(false, true);
private final boolean scanPublic;
private final boolean scanPrivate;
private AllowedInterfaces(boolean scanPublic, boolean scanPrivate) {
this.scanPublic = scanPublic;
this.scanPrivate = scanPrivate;
}
}
}

View File

@ -26,6 +26,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.jclouds.compute.domain.NodeMetadata.Status.RUNNING;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.fail;
import java.util.Map;
@ -36,6 +37,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.jclouds.compute.domain.NodeMetadata;
import org.jclouds.compute.domain.NodeMetadataBuilder;
import org.jclouds.compute.util.ConcurrentOpenSocketFinder.AllowedInterfaces;
import org.jclouds.predicates.SocketOpen;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@ -43,6 +45,7 @@ import org.testng.annotations.Test;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.net.HostAndPort;
@ -57,10 +60,13 @@ public class ConcurrentOpenSocketFinderTest {
private static final long SLOW_GRACE = 700;
private static final long EARLY_GRACE = 10;
private static final String PUBLIC_IP = "1.2.3.4";
private static final String PRIVATE_IP = "1.2.3.5";
private final NodeMetadata node = new NodeMetadataBuilder().id("myid")
.status(RUNNING)
.publicAddresses(ImmutableSet.of("1.2.3.4"))
.privateAddresses(ImmutableSet.of("1.2.3.5")).build();
.publicAddresses(ImmutableSet.of(PUBLIC_IP))
.privateAddresses(ImmutableSet.of(PRIVATE_IP)).build();
private final SocketOpen socketAlwaysClosed = new SocketOpen() {
@Override
@ -68,7 +74,12 @@ public class ConcurrentOpenSocketFinderTest {
return false;
}
};
private final SocketOpen socketAlwaysOpen = new SocketOpen() {
@Override
public boolean apply(HostAndPort input) {
return true;
}
};
private final Predicate<AtomicReference<NodeMetadata>> nodeRunning = alwaysTrue();
private final Predicate<AtomicReference<NodeMetadata>> nodeNotRunning = alwaysFalse();
@ -110,27 +121,27 @@ public class ConcurrentOpenSocketFinderTest {
SocketOpen secondSocketOpen = new SocketOpen() {
@Override
public boolean apply(HostAndPort input) {
return HostAndPort.fromParts("1.2.3.5", 22).equals(input);
return HostAndPort.fromParts(PRIVATE_IP, 22).equals(input);
}
};
OpenSocketFinder finder = new ConcurrentOpenSocketFinder(secondSocketOpen, nodeRunning, userExecutor);
HostAndPort result = finder.findOpenSocketOnNode(node, 22, 2000, MILLISECONDS);
assertEquals(result, HostAndPort.fromParts("1.2.3.5", 22));
assertEquals(result, HostAndPort.fromParts(PRIVATE_IP, 22));
}
@Test
public void testChecksSocketsConcurrently() throws Exception {
ControllableSocketOpen socketTester = new ControllableSocketOpen(ImmutableMap.of(
HostAndPort.fromParts("1.2.3.4", 22), new SlowCallable<Boolean>(true, 1500),
HostAndPort.fromParts("1.2.3.5", 22), new SlowCallable<Boolean>(true, 1000)));
HostAndPort.fromParts(PUBLIC_IP, 22), new SlowCallable<Boolean>(true, 1500),
HostAndPort.fromParts(PRIVATE_IP, 22), new SlowCallable<Boolean>(true, 1000)));
OpenSocketFinder finder = new ConcurrentOpenSocketFinder(socketTester, nodeRunning, userExecutor);
HostAndPort result = finder.findOpenSocketOnNode(node, 22, 2000, MILLISECONDS);
assertEquals(result, HostAndPort.fromParts("1.2.3.5", 22));
assertEquals(result, HostAndPort.fromParts(PRIVATE_IP, 22));
}
@Test
@ -164,6 +175,28 @@ public class ConcurrentOpenSocketFinderTest {
}
}
@Test
public void testSocketFinderAllowedInterfacesAll() throws Exception {
FluentIterable<String> ips = ConcurrentOpenSocketFinder.checkNodeHasIps(node, AllowedInterfaces.ALL);
assertTrue(ips.contains(PUBLIC_IP));
assertTrue(ips.contains(PRIVATE_IP));
}
@Test
public void testSocketFinderAllowedInterfacesPrivate() throws Exception {
FluentIterable<String> ips = ConcurrentOpenSocketFinder.checkNodeHasIps(node, AllowedInterfaces.PRIVATE);
assertFalse(ips.contains(PUBLIC_IP));
assertTrue(ips.contains(PRIVATE_IP));
}
@Test
public void testSocketFinderAllowedInterfacesPublic() throws Exception {
FluentIterable<String> ips = ConcurrentOpenSocketFinder.checkNodeHasIps(node, AllowedInterfaces.PUBLIC);
assertTrue(ips.contains(PUBLIC_IP));
assertFalse(ips.contains(PRIVATE_IP));
}
private static class SlowCallable<T> implements Callable<T> {
private final T result;
private final long delay;