From 43baa036aeb025bcbed1aca19837b072f2c14a6a Mon Sep 17 00:00:00 2001 From: Anu Engineer Date: Wed, 13 Jun 2018 09:50:10 -0700 Subject: [PATCH] HDDS-109. Add reconnect logic for XceiverClientGrpc. Contributed by Lokesh Jain. --- .../hadoop/hdds/scm/XceiverClientGrpc.java | 23 +++++++++++++++ .../ozone/scm/TestXceiverClientManager.java | 6 ++-- .../hadoop/ozone/web/client/TestKeys.java | 29 +++++++++++++++---- 3 files changed, 48 insertions(+), 10 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index c78702486fb..92df46e492f 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -55,6 +55,7 @@ public class XceiverClientGrpc extends XceiverClientSpi { private XceiverClientMetrics metrics; private ManagedChannel channel; private final Semaphore semaphore; + private boolean closed = false; /** * Constructs a client that can communicate with the Container framework on @@ -105,6 +106,7 @@ public class XceiverClientGrpc extends XceiverClientSpi { @Override public void close() { + closed = true; channel.shutdownNow(); try { channel.awaitTermination(60, TimeUnit.MINUTES); @@ -153,6 +155,14 @@ public class XceiverClientGrpc extends XceiverClientSpi { public CompletableFuture sendCommandAsync(ContainerCommandRequestProto request) throws IOException, ExecutionException, InterruptedException { + if(closed){ + throw new IOException("This channel is not connected."); + } + + if(channel == null || !isConnected()) { + reconnect(); + } + final CompletableFuture replyFuture = new CompletableFuture<>(); semaphore.acquire(); @@ -192,6 +202,19 @@ public class XceiverClientGrpc extends XceiverClientSpi { return replyFuture; } + private void reconnect() throws IOException { + try { + connect(); + } catch (Exception e) { + LOG.error("Error while connecting: ", e); + throw new IOException(e); + } + + if (channel == null || !isConnected()) { + throw new IOException("This channel is not connected."); + } + } + /** * Create a pipeline. * diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java index 478cf690c7b..56f3c7a0fa0 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java @@ -163,8 +163,7 @@ public class TestXceiverClientManager { // and any container operations should fail clientManager.releaseClient(client1); - String expectedMessage = shouldUseGrpc ? "Channel shutdown invoked" : - "This channel is not connected."; + String expectedMessage = "This channel is not connected."; try { ContainerProtocolCalls.createContainer(client1, container1.getContainerID(), traceID1); @@ -213,8 +212,7 @@ public class TestXceiverClientManager { // Any container operation should now fail String traceID2 = "trace" + RandomStringUtils.randomNumeric(4); - String expectedMessage = shouldUseGrpc ? "Channel shutdown invoked" : - "This channel is not connected."; + String expectedMessage = "This channel is not connected."; try { ContainerProtocolCalls.createContainer(client1, container1.getContainerID(), traceID2); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java index 28a138ec527..b86c57721c8 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java @@ -29,6 +29,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneAcl; import org.apache.hadoop.ozone.OzoneConfigKeys; @@ -62,12 +63,14 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; import org.apache.log4j.Level; import org.apache.log4j.Logger; -import org.junit.AfterClass; +import org.junit.After; import org.junit.Assert; -import org.junit.BeforeClass; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.File; import java.io.FileInputStream; @@ -77,6 +80,7 @@ import java.io.IOException; import java.net.URISyntaxException; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Collection; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -93,6 +97,7 @@ import static org.junit.Assert.fail; /** * Test Ozone Key Lifecycle. */ +@RunWith(Parameterized.class) public class TestKeys { /** * Set the timeout for every test. @@ -107,19 +112,31 @@ public class TestKeys { private static long currentTime; private static ReplicationFactor replicationFactor = ReplicationFactor.ONE; private static ReplicationType replicationType = ReplicationType.STAND_ALONE; + private static boolean shouldUseGrpc; + + @Parameterized.Parameters + public static Collection withGrpc() { + return Arrays.asList(new Object[][] {{false}, {true}}); + } + + public TestKeys(boolean useGrpc) { + shouldUseGrpc = useGrpc; + } /** * Create a MiniDFSCluster for testing. * * @throws IOException */ - @BeforeClass - public static void init() throws Exception { + @Before + public void init() throws Exception { conf = new OzoneConfiguration(); // Set short block deleting service interval to speed up deletions. conf.setTimeDuration(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 1000, TimeUnit.MILLISECONDS); + conf.setBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY, + shouldUseGrpc); path = GenericTestUtils.getTempPath(TestKeys.class.getSimpleName()); Logger.getLogger("log4j.logger.org.apache.http").setLevel(Level.DEBUG); @@ -133,8 +150,8 @@ public class TestKeys { /** * shutdown MiniDFSCluster. */ - @AfterClass - public static void shutdown() { + @After + public void shutdown() { if (ozoneCluster != null) { ozoneCluster.shutdown(); }