+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.exceptions;
+
+import java.io.IOException;
+
+/**
+ * Exception thrown by
+ * {@link org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB} when
+ * a read request is received by a non-leader OM node.
+ * <p>
+ * The exception records the id of the OM that rejected the request and, when
+ * one was supplied, the id of the OM suggested as the leader.
+ */
+public class NotLeaderException extends IOException {
+
+  private final String currentPeerId;
+  private final String leaderPeerId;
+
+  /**
+   * Creates an exception for the case where no leader can be suggested.
+   *
+   * @param currentPeerIdStr id of the OM that is not the leader
+   */
+  public NotLeaderException(String currentPeerIdStr) {
+    this(currentPeerIdStr, null);
+  }
+
+  /**
+   * Creates an exception that optionally names a suggested leader.
+   *
+   * @param currentPeerIdStr id of the OM that is not the leader
+   * @param suggestedLeaderPeerIdStr id of the suggested leader OM, or
+   *     {@code null} when the leader could not be determined
+   */
+  public NotLeaderException(String currentPeerIdStr,
+      String suggestedLeaderPeerIdStr) {
+    super(buildMessage(currentPeerIdStr, suggestedLeaderPeerIdStr));
+    this.currentPeerId = currentPeerIdStr;
+    this.leaderPeerId = suggestedLeaderPeerIdStr;
+  }
+
+  /**
+   * @return id of the OM that reported it is not the leader
+   */
+  public String getCurrentPeerId() {
+    return currentPeerId;
+  }
+
+  /**
+   * @return id of the suggested leader OM, or {@code null} if none was given
+   */
+  public String getSuggestedLeaderNodeId() {
+    return leaderPeerId;
+  }
+
+  /**
+   * Builds the exception message; a null leader yields the "could not
+   * determine" wording instead of printing "null" as the suggested leader.
+   */
+  private static String buildMessage(String currentPeerIdStr,
+      String suggestedLeaderPeerIdStr) {
+    String prefix = "OM " + currentPeerIdStr + " is not the leader. ";
+    if (suggestedLeaderPeerIdStr == null) {
+      return prefix + "Could not determine the leader node.";
+    }
+    return prefix + "Suggested leader is " + suggestedLeaderPeerIdStr;
+  }
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
index 5c1b39fc0a2..b4a48573685 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
@@ -226,8 +226,14 @@ public class OMFailoverProxyProvider implements
* not match the current leaderOMNodeId cached by the proxy provider.
*/
public void performFailoverIfRequired(String newLeaderOMNodeId) {
- if (updateLeaderOMNodeId(newLeaderOMNodeId)) {
- LOG.debug("Failing over OM proxy to nodeId: {}", newLeaderOMNodeId);
+ if (newLeaderOMNodeId == null) {
+ // No leader hint was supplied, so fall back to a plain failover.
+ LOG.debug("No suggested leader nodeId. Performing failover to next peer" +
+ " node");
+ performFailover(null);
+ } else if (updateLeaderOMNodeId(newLeaderOMNodeId)) {
+ LOG.debug("Failing over OM proxy to nodeId: {}", newLeaderOMNodeId);
}
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java
similarity index 77%
rename from hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java
rename to hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java
index 8e4582d6607..bc64d6c5a1f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java
@@ -15,7 +15,7 @@
* the License.
*/
-package org.apache.hadoop.ozone.om.ratis;
+package org.apache.hadoop.ozone.om.helpers;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.conf.Configuration;
@@ -25,8 +25,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
@@ -54,14 +52,15 @@ public final class OMRatisHelper {
/**
* Creates a new RaftClient object.
- * @param rpcType Replication Type
- * @param omId OM id of the client
- * @param group RaftGroup
+ *
+ * @param rpcType RPC type to set in the Raft client properties
+ * @param omId OM id of the client
+ * @param group RaftGroup
* @param retryPolicy Retry policy
+ * @param conf Configuration supplied by the caller
* @return RaftClient object
*/
- static RaftClient newRaftClient(RpcType rpcType, String omId, RaftGroup
- group, RetryPolicy retryPolicy, Configuration conf) {
+ public static RaftClient newRaftClient(RpcType rpcType, String omId, RaftGroup
+ group, RetryPolicy retryPolicy, Configuration conf) {
LOG.trace("newRaftClient: {}, leader={}, group={}", rpcType, omId, group);
final RaftProperties properties = new RaftProperties();
RaftConfigKeys.Rpc.setType(properties, rpcType);
@@ -85,36 +84,27 @@ public final class OMRatisHelper {
return RaftPeerId.valueOf(omId);
}
- static ByteString convertRequestToByteString(OMRequest request) {
+ /**
+ * Serializes an OMRequest into a ByteString.
+ *
+ * @param request the request to serialize
+ * @return a ByteString holding a copy of the request's byte-array form
+ */
+ public static ByteString convertRequestToByteString(OMRequest request) {
byte[] requestBytes = request.toByteArray();
return ByteString.copyFrom(requestBytes);
}
- static OMRequest convertByteStringToOMRequest(ByteString byteString)
+ /**
+ * Parses an OMRequest out of a ByteString.
+ *
+ * @param byteString the serialized request
+ * @return the OMRequest parsed from the ByteString's bytes
+ * @throws InvalidProtocolBufferException if the bytes cannot be parsed as
+ * an OMRequest
+ */
+ public static OMRequest convertByteStringToOMRequest(ByteString byteString)
throws InvalidProtocolBufferException {
byte[] bytes = byteString.toByteArray();
return OMRequest.parseFrom(bytes);
}
- static Message convertResponseToMessage(OMResponse response) {
+ /**
+ * Wraps the serialized form of an OMResponse in a Message.
+ *
+ * @param response the response to serialize
+ * @return a Message whose content is a copy of the response's bytes
+ */
+ public static Message convertResponseToMessage(OMResponse response) {
+ // Despite its name, requestBytes holds the serialized response.
byte[] requestBytes = response.toByteArray();
return Message.valueOf(ByteString.copyFrom(requestBytes));
}
- static OMResponse getOMResponseFromRaftClientReply(RaftClientReply reply)
- throws InvalidProtocolBufferException {
+ /**
+ * Extracts the OMResponse carried by a RaftClientReply.
+ * <p>
+ * The response is parsed from the reply's message content and rebuilt with
+ * its leaderOMNodeId set to the reply's replier id.
+ *
+ * @param reply the Raft client reply to read the response from
+ * @return the parsed OMResponse with leaderOMNodeId filled in
+ * @throws InvalidProtocolBufferException if the message content cannot be
+ * parsed as an OMResponse
+ */
+ public static OMResponse getOMResponseFromRaftClientReply(
+ RaftClientReply reply) throws InvalidProtocolBufferException {
byte[] bytes = reply.getMessage().getContent().toByteArray();
return OMResponse.newBuilder(OMResponse.parseFrom(bytes))
.setLeaderOMNodeId(reply.getReplierId())
.build();
}
-
- static OMResponse getErrorResponse(Type cmdType, Exception e) {
- return OMResponse.newBuilder()
- .setCmdType(cmdType)
- .setSuccess(false)
- .setMessage(e.getMessage())
- .setStatus(Status.INTERNAL_ERROR)
- .build();
- }
}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 63a656c9f96..c06efdc2c11 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.om.exceptions.NotLeaderException;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.KeyValueUtil;
@@ -195,29 +196,49 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
private OzoneManagerProtocolPB createRetryProxy(
OMFailoverProxyProvider failoverProxyProvider,
int maxRetries, int maxFailovers, int delayMillis, int maxDelayBase) {
+
RetryPolicy retryPolicyOnNetworkException = RetryPolicies
.failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
maxFailovers, maxRetries, delayMillis, maxDelayBase);
+
RetryPolicy retryPolicy = new RetryPolicy() {
@Override
public RetryAction shouldRetry(Exception exception, int retries,
int failovers, boolean isIdempotentOrAtMostOnce)
throws Exception {
- if (exception instanceof EOFException ||
- exception instanceof ServiceException) {
- if (retries < maxRetries && failovers < maxFailovers) {
- return RetryAction.FAILOVER_AND_RETRY;
+
+ // Decide how to react to a failed call: a ServiceException caused by a
+ // NotLeaderException is retried after moving the proxy provider to the
+ // suggested leader; any other ServiceException, or an EOFException, is
+ // retried after a failover; everything else is delegated to
+ // retryPolicyOnNetworkException. The first three paths go through
+ // getRetryAction, which applies the maxRetries/maxFailovers limits.
+ if (exception instanceof ServiceException) {
+ Throwable cause = exception.getCause();
+ // NOTE(review): this only matches when the cause itself is a
+ // NotLeaderException. If the RPC layer delivers server-side exceptions
+ // wrapped in another type, this branch is skipped and the generic
+ // failover path below is taken instead - confirm.
+ if (cause instanceof NotLeaderException) {
+ NotLeaderException notLeaderException = (NotLeaderException) cause;
+ // The proxy provider is redirected here, so a plain RETRY is requested
+ // rather than FAILOVER_AND_RETRY.
+ omFailoverProxyProvider.performFailoverIfRequired(
+ notLeaderException.getSuggestedLeaderNodeId());
+ return getRetryAction(RetryAction.RETRY, retries, failovers);
} else {
- FAILOVER_PROXY_PROVIDER_LOG.error("Failed to connect to OM. " +
- "Attempted {} retries and {} failovers", retries, failovers);
- return RetryAction.FAIL;
+ return getRetryAction(RetryAction.FAILOVER_AND_RETRY, retries,
+ failovers);
}
+ } else if (exception instanceof EOFException) {
+ return getRetryAction(RetryAction.FAILOVER_AND_RETRY, retries,
+ failovers);
} else {
return retryPolicyOnNetworkException.shouldRetry(
- exception, retries, failovers, isIdempotentOrAtMostOnce);
+ exception, retries, failovers, isIdempotentOrAtMostOnce);
+ }
+ }
+
+ /**
+ * Returns the requested action while both the retry and failover counts
+ * are below their limits; otherwise logs the failure and returns FAIL.
+ *
+ * @param fallbackAction action to return while attempts remain
+ * @param retries number of retries attempted so far
+ * @param failovers number of failovers attempted so far
+ * @return fallbackAction, or RetryAction.FAIL once a limit is reached
+ */
+ private RetryAction getRetryAction(RetryAction fallbackAction,
+ int retries, int failovers) {
+ boolean attemptsExhausted =
+ retries >= maxRetries || failovers >= maxFailovers;
+ if (attemptsExhausted) {
+ FAILOVER_PROXY_PROVIDER_LOG.error("Failed to connect to OM. " +
+ "Attempted {} retries and {} failovers", retries, failovers);
+ return RetryAction.FAIL;
+ } else {
+ return fallbackAction;
}
}
};
+
OzoneManagerProtocolPB proxy = (OzoneManagerProtocolPB) RetryProxy.create(
OzoneManagerProtocolPB.class, failoverProxyProvider, retryPolicy);
return proxy;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
index da8f8706501..86a83b78c8d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
@@ -52,8 +52,6 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
- .OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys
@@ -75,7 +73,7 @@ public class TestOzoneManagerHA {
public ExpectedException exception = ExpectedException.none();
@Rule
- public Timeout timeout = new Timeout(120_000);
+ public Timeout timeout = new Timeout(300_000);
/**
* Create a MiniDFSCluster for testing.
@@ -93,7 +91,6 @@ public class TestOzoneManagerHA {
conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
conf.setInt(OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY, 3);
conf.setInt(OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 3);
- conf.setInt(OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY, 50);
cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
.setClusterId(clusterId)
@@ -313,4 +310,41 @@ public class TestOzoneManagerHA {
"3 retries and 3 failovers"));
}
}
+
+  /**
+   * Verifies that a read request issued through a client whose proxy
+   * provider points at any OM ends up on the leader OM.
+   */
+  @Test
+  public void testReadRequest() throws Exception {
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    objectStore.createVolume(volumeName);
+
+    // The OM that the shared client's proxy provider points at after the
+    // volume creation is taken to be the current leader.
+    OMFailoverProxyProvider omFailoverProxyProvider =
+        objectStore.getClientProxy().getOMProxyProvider();
+    String currentLeaderNodeId = omFailoverProxyProvider
+        .getCurrentProxyOMNodeId();
+
+    // A read request from any proxy should failover to the current leader OM
+    for (int i = 0; i < numOfOMs; i++) {
+      OzoneManager ozoneManager = cluster.getOzoneManager(i);
+      String omHostName = ozoneManager.getOmRpcServerAddr().getHostName();
+      int rpcPort = ozoneManager.getOmRpcServerAddr().getPort();
+
+      // Get the ObjectStore and FailoverProxyProvider for OM at index i
+      final ObjectStore store = OzoneClientFactory.getRpcClient(
+          omHostName, rpcPort, conf).getObjectStore();
+      final OMFailoverProxyProvider proxyProvider =
+          store.getClientProxy().getOMProxyProvider();
+
+      // Point this store's own proxy provider at OM i, so that the read
+      // below starts from that node. Failing over the shared provider would
+      // not affect the proxy that "store" uses.
+      proxyProvider.performFailoverIfRequired(ozoneManager.getOMNodId());
+
+      // A read request should result in the proxyProvider failing over to
+      // leader node.
+      OzoneVolume volume = store.getVolume(volumeName);
+      Assert.assertEquals(volumeName, volume.getName());
+
+      Assert.assertEquals(currentLeaderNodeId,
+          proxyProvider.getCurrentProxyOMNodeId());
+    }
+  }
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index fc4ad01801f..326b12c0837 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -1236,8 +1236,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
ProtobufRpcEngine.class);
BlockingService omService = newReflectiveBlockingService(
- new OzoneManagerProtocolServerSideTranslatorPB(this, omRatisClient,
- isRatisEnabled));
+ new OzoneManagerProtocolServerSideTranslatorPB(this, omRatisServer,
+ omRatisClient, isRatisEnabled));
return startRpcServer(configuration, omNodeRpcAddr,
OzoneManagerProtocolPB.class, omService,
handlerCount);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
index 1b4c6347d9b..c9c48a4422c 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
@@ -30,6 +30,7 @@ import com.google.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
index e03293a8f14..a3cde3e9d52 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
@@ -27,8 +27,13 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
@@ -41,6 +46,11 @@ import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.netty.NettyConfigKeys;
+import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
+import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.GroupInfoReply;
+import org.apache.ratis.protocol.GroupInfoRequest;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
@@ -50,6 +60,7 @@ import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
@@ -69,7 +80,22 @@ public final class OzoneManagerRatisServer {
private final RaftGroupId raftGroupId;
private final RaftGroup raftGroup;
private final RaftPeerId raftPeerId;
+
private final OzoneManagerProtocol ozoneManager;
+ private final ClientId clientId = ClientId.randomId();
+
+ private final ScheduledExecutorService scheduledRoleChecker;
+ private long roleCheckInitialDelayMs = 1000; // 1 second default
+ private long roleCheckIntervalMs;
+ private ReentrantReadWriteLock roleCheckLock = new ReentrantReadWriteLock();
+ private Optional