diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java index ca319f62dac..f44acfdce9a 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java @@ -268,4 +268,7 @@ public final class OzoneConsts { Metadata.Key.of(OZONE_BLOCK_TOKEN, ASCII_STRING_MARSHALLER); public static final Metadata.Key USER_METADATA_KEY = Metadata.Key.of(OZONE_USER, ASCII_STRING_MARSHALLER); + + // Default OMServiceID for OM Ratis servers to use as RaftGroupId + public static final String OM_SERVICE_ID_DEFAULT = "om-service-value"; } diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 9c655e51fb3..819062c2d33 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -454,6 +454,48 @@ and DataNode. + + ozone.om.service.ids + + OM, HA + + Comma-separated list of OM service Ids. + + If not set, the default value of "om-service-value" is assigned as the + OM service ID. + + + + ozone.om.nodes.EXAMPLEOMSERVICEID + + OM, HA + + Comma-separated list of OM node Ids for a given OM service ID (eg. + EXAMPLEOMSERVICEID). The OM service ID should be the value (one of the + values if there are multiple) set for the parameter ozone.om.service.ids. + + Unique identifiers for each OM Node, delimited by commas. This will be + used by OzoneManagers in HA setup to determine all the OzoneManagers + belonging to the same OMservice in the cluster. For example, if you + used “omService1” as the OM service ID previously, and you wanted to + use “om1”, “om2” and "om3" as the individual IDs of the OzoneManagers, + you would configure a property ozone.om.nodes.omService1, and its value + "om1,om2,om3". + + + + ozone.om.node.id + + OM, HA + + The ID of this OM node. If the OM node ID is not configured it + is determined automatically by matching the local node's address + with the configured address. + + If node ID is not deterministic from the configuration, then it is set + to the OmId from the OM version file. + + ozone.om.address @@ -1452,15 +1494,6 @@ - - ozone.om.ratis.random.port - false - OZONE, OM, RATIS, DEBUG - Allocates a random free port for OM's Ratis server. This is - used only while running unit tests. - - - ozone.om.ratis.rpc.type GRPC @@ -1543,28 +1576,39 @@ The timeout duration for OM Ratis client request. - - ozone.om.ratis.client.request.max.retries - 180 - OZONE, OM, RATIS, MANAGEMENT - Number of retries for OM client request. - - - ozone.om.ratis.client.request.retry.interval - 100ms - OZONE, OM, RATIS, MANAGEMENT - Interval between successive retries for a OM client request. - - + + ozone.om.ratis.client.request.max.retries + 180 + OZONE, OM, RATIS, MANAGEMENT + Number of retries for OM client request. + + + ozone.om.ratis.client.request.retry.interval + 100ms + OZONE, OM, RATIS, MANAGEMENT + Interval between successive retries for a OM client request. + + + + + ozone.om.leader.election.minimum.timeout.duration + 1s + OZONE, OM, RATIS, MANAGEMENT + The minimum timeout duration for OM ratis leader election. + Default is 1s. + + + + + ozone.om.ratis.server.failure.timeout.duration + 120s + OZONE, OM, RATIS, MANAGEMENT + The timeout duration for ratis server failure detection, + once the threshold has reached, the ratis state machine will be informed + about the failure in the ratis ring. + + - - ozone.om.leader.election.minimum.timeout.duration - 1s - OZONE, OM, RATIS, MANAGEMENT - The minimum timeout duration for OM ratis leader election. - Default is 1s. - - ozone.acl.authorizer.class diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java index ef0e88c1ec3..82b9f6afad7 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java @@ -17,12 +17,15 @@ package org.apache.hadoop.ozone; +import com.google.common.base.Joiner; import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; +import java.util.Collection; +import java.util.Collections; import java.util.Optional; import org.apache.commons.lang3.RandomStringUtils; @@ -38,7 +41,9 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_BIND_HOST_DEFAULT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_BIND_PORT_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_NODES_KEY; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_PORT_DEFAULT; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -191,4 +196,64 @@ public final class OmUtils { "This could possibly indicate a faulty JRE"); } } + + /** + * Add non empty and non null suffix to a key. + */ + private static String addSuffix(String key, String suffix) { + if (suffix == null || suffix.isEmpty()) { + return key; + } + assert !suffix.startsWith(".") : + "suffix '" + suffix + "' should not already have '.' prepended."; + return key + "." + suffix; + } + + /** + * Concatenate list of suffix strings '.' separated. + */ + private static String concatSuffixes(String... suffixes) { + if (suffixes == null) { + return null; + } + return Joiner.on(".").skipNulls().join(suffixes); + } + + /** + * Return configuration key of format key.suffix1.suffix2...suffixN. + */ + public static String addKeySuffixes(String key, String... suffixes) { + String keySuffix = concatSuffixes(suffixes); + return addSuffix(key, keySuffix); + } + + /** + * Match input address to local address. + * Return true if it matches, false otherwsie. + */ + public static boolean isAddressLocal(InetSocketAddress addr) { + return NetUtils.isLocalAddress(addr.getAddress()); + } + + /** + * Get a collection of all omNodeIds for the given omServiceId. + */ + public static Collection getOMNodeIds(Configuration conf, + String omServiceId) { + String key = addSuffix(OZONE_OM_NODES_KEY, omServiceId); + return conf.getTrimmedStringCollection(key); + } + + /** + * @return coll if it is non-null and non-empty. Otherwise, + * returns a list with a single null value. + */ + public static Collection emptyAsSingletonNull(Collection + coll) { + if (coll == null || coll.isEmpty()) { + return Collections.singletonList(null); + } else { + return coll; + } + } } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OzoneIllegalArgumentException.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OzoneIllegalArgumentException.java new file mode 100644 index 00000000000..e732dc22f32 --- /dev/null +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OzoneIllegalArgumentException.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Indicates that a method has been passed illegal or invalid argument. This + * exception is thrown instead of IllegalArgumentException to differentiate the + * exception thrown in Hadoop implementation from the one thrown in JDK. + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class OzoneIllegalArgumentException extends IllegalArgumentException { + private static final long serialVersionUID = 1L; + + /** + * Constructs exception with the specified detail message. + * @param message detailed message. + */ + public OzoneIllegalArgumentException(final String message) { + super(message); + } +} \ No newline at end of file diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java index 867a3e160f9..ba6211c8664 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java @@ -40,6 +40,13 @@ public final class OMConfigKeys { "ozone.om.handler.count.key"; public static final int OZONE_OM_HANDLER_COUNT_DEFAULT = 20; + public static final String OZONE_OM_SERVICE_IDS_KEY = + "ozone.om.service.ids"; + public static final String OZONE_OM_NODES_KEY = + "ozone.om.nodes"; + public static final String OZONE_OM_NODE_ID_KEY = + "ozone.om.node.id"; + public static final String OZONE_OM_ADDRESS_KEY = "ozone.om.address"; public static final String OZONE_OM_BIND_HOST_DEFAULT = @@ -101,11 +108,6 @@ public final class OMConfigKeys { = "ozone.om.ratis.port"; public static final int OZONE_OM_RATIS_PORT_DEFAULT = 9872; - // When set to true, allocate a random free port for ozone ratis server - public static final String OZONE_OM_RATIS_RANDOM_PORT_KEY = - "ozone.om.ratis.random.port"; - public static final boolean OZONE_OM_RATIS_RANDOM_PORT_KEY_DEFAULT - = false; public static final String OZONE_OM_RATIS_RPC_TYPE_KEY = "ozone.om.ratis.rpc.type"; public static final String OZONE_OM_RATIS_RPC_TYPE_DEFAULT @@ -175,6 +177,11 @@ public final class OMConfigKeys { public static final TimeDuration OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT = TimeDuration.valueOf(1, TimeUnit.SECONDS); + public static final String OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_KEY + = "ozone.om.ratis.server.failure.timeout.duration"; + public static final TimeDuration + OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT + = TimeDuration.valueOf(120, TimeUnit.SECONDS); public static final String OZONE_OM_KERBEROS_KEYTAB_FILE_KEY = "ozone.om." + "kerberos.keytab.file"; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java index b348b50c4d6..7b65c4689cd 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java @@ -51,6 +51,17 @@ public interface MiniOzoneCluster { return new MiniOzoneClusterImpl.Builder(conf); } + /** + * Returns the Builder to construct MiniOzoneHACluster. + * + * @param conf OzoneConfiguration + * + * @return MiniOzoneCluster builder + */ + static Builder newHABuilder(OzoneConfiguration conf) { + return new MiniOzoneHAClusterImpl.Builder(conf); + } + /** * Returns the configuration object associated with the MiniOzoneCluster. * @@ -223,6 +234,8 @@ public interface MiniOzoneCluster { protected final String path; protected String clusterId; + protected String omServiceId; + protected int numOfOMs; protected Optional enableTrace = Optional.of(false); protected Optional hbInterval = Optional.empty(); @@ -416,6 +429,16 @@ public interface MiniOzoneCluster { return this; } + public Builder setNumOfOzoneManagers(int numOMs) { + this.numOfOMs = numOMs; + return this; + } + + public Builder setOMServiceId(String serviceId) { + this.omServiceId = serviceId; + return this; + } + /** * Constructs and returns MiniOzoneCluster. * diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index ab322a2227c..cc48116a5c3 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -83,14 +83,14 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys * StorageContainerManager and multiple DataNodes. */ @InterfaceAudience.Private -public final class MiniOzoneClusterImpl implements MiniOzoneCluster { +public class MiniOzoneClusterImpl implements MiniOzoneCluster { private static final Logger LOG = LoggerFactory.getLogger(MiniOzoneClusterImpl.class); private final OzoneConfiguration conf; private StorageContainerManager scm; - private final OzoneManager ozoneManager; + private OzoneManager ozoneManager; private final List hddsDatanodes; // Timeout for the cluster to be ready @@ -101,7 +101,7 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster { * * @throws IOException if there is an I/O error */ - private MiniOzoneClusterImpl(OzoneConfiguration conf, + MiniOzoneClusterImpl(OzoneConfiguration conf, OzoneManager ozoneManager, StorageContainerManager scm, List hddsDatanodes) { @@ -111,6 +111,20 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster { this.hddsDatanodes = hddsDatanodes; } + /** + * Creates a new MiniOzoneCluster without the OzoneManager. This is used by + * {@link MiniOzoneHAClusterImpl} for starting multiple OzoneManagers. + * @param conf + * @param scm + * @param hddsDatanodes + */ + MiniOzoneClusterImpl(OzoneConfiguration conf, StorageContainerManager scm, + List hddsDatanodes) { + this.conf = conf; + this.scm = scm; + this.hddsDatanodes = hddsDatanodes; + } + public OzoneConfiguration getConf() { return conf; } @@ -394,11 +408,11 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster { } /** - * Initializes the configureation required for starting MiniOzoneCluster. + * Initializes the configuration required for starting MiniOzoneCluster. * * @throws IOException */ - private void initializeConfiguration() throws IOException { + void initializeConfiguration() throws IOException { conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, ozoneEnabled); Path metaDir = Paths.get(path, "ozone-meta"); Files.createDirectories(metaDir); @@ -434,7 +448,7 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster { * * @throws IOException */ - private StorageContainerManager createSCM() + StorageContainerManager createSCM() throws IOException, AuthenticationException { configureSCM(); SCMStorageConfig scmStore = new SCMStorageConfig(conf); @@ -455,7 +469,7 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster { scmStore.initialize(); } - private void initializeOmStorage(OMStorage omStorage) throws IOException{ + void initializeOmStorage(OMStorage omStorage) throws IOException{ if (omStorage.getState() == StorageState.INITIALIZED) { return; } @@ -487,7 +501,7 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster { * * @throws IOException */ - private List createHddsDatanodes( + List createHddsDatanodes( StorageContainerManager scm) throws IOException { configureHddsDatanodes(); String scmAddress = scm.getDatanodeRpcAddress().getHostString() + diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java new file mode 100644 index 00000000000..a1ef1f6424a --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.ozone.om.OMConfigKeys; +import org.apache.hadoop.ozone.om.OMStorage; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.security.authentication.client.AuthenticationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.BindException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS; + +/** + * MiniOzoneHAClusterImpl creates a complete in-process Ozone cluster + * with OM HA suitable for running tests. The cluster consists of a set of + * OzoneManagers, StorageContainerManager and multiple DataNodes. + */ +public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl { + + private static final Logger LOG = + LoggerFactory.getLogger(MiniOzoneHAClusterImpl.class); + + private List ozoneManagers; + + private static final Random RANDOM = new Random(); + private static final int RATIS_LEADER_ELECTION_TIMEOUT = 1000; // 1 seconds + + public static final int NODE_FAILURE_TIMEOUT = 2000; // 2 seconds + + /** + * Creates a new MiniOzoneCluster with OM HA. + * + * @throws IOException if there is an I/O error + */ + + private MiniOzoneHAClusterImpl( + OzoneConfiguration conf, + List omList, + StorageContainerManager scm, + List hddsDatanodes) { + super(conf, scm, hddsDatanodes); + this.ozoneManagers = omList; + } + + /** + * Returns the first OzoneManager from the list. + * @return + */ + @Override + public OzoneManager getOzoneManager() { + return this.ozoneManagers.get(0); + } + + public OzoneManager getOzoneManager(int index) { + return this.ozoneManagers.get(index); + } + + @Override + public void restartOzoneManager() throws IOException { + for (OzoneManager ozoneManager : ozoneManagers) { + ozoneManager.stop(); + ozoneManager.restart(); + } + } + + @Override + public void stop() { + for (OzoneManager ozoneManager : ozoneManagers) { + if (ozoneManager != null) { + LOG.info("Stopping the OzoneManager " + ozoneManager.getOMNodId()); + ozoneManager.stop(); + ozoneManager.join(); + } + } + super.stop(); + } + + public void stopOzoneManager(int index) { + ozoneManagers.get(index).stop(); + } + + /** + * Builder for configuring the MiniOzoneCluster to run. + */ + public static class Builder extends MiniOzoneClusterImpl.Builder { + + private final String nodeIdBaseStr = "omNode-"; + + /** + * Creates a new Builder. + * + * @param conf configuration + */ + public Builder(OzoneConfiguration conf) { + super(conf); + } + + @Override + public MiniOzoneCluster build() throws IOException { + DefaultMetricsSystem.setMiniClusterMode(true); + initializeConfiguration(); + StorageContainerManager scm; + List omList; + try { + scm = createSCM(); + scm.start(); + omList = createOMService(); + } catch (AuthenticationException ex) { + throw new IOException("Unable to build MiniOzoneCluster. ", ex); + } + + final List hddsDatanodes = createHddsDatanodes(scm); + MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(conf, omList, + scm, hddsDatanodes); + if (startDataNodes) { + cluster.startHddsDatanodes(); + } + return cluster; + } + + /** + * Initialize OM configurations. + * @throws IOException + */ + @Override + void initializeConfiguration() throws IOException { + super.initializeConfiguration(); + conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, true); + conf.setInt(OMConfigKeys.OZONE_OM_HANDLER_COUNT_KEY, numOfOmHandlers); + conf.setTimeDuration( + OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY, + RATIS_LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS); + conf.setTimeDuration( + OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_KEY, + NODE_FAILURE_TIMEOUT, TimeUnit.MILLISECONDS); + conf.setInt(OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, + 10); + } + + /** + * Start OM service with multiple OMs. + * @return list of OzoneManagers + * @throws IOException + * @throws AuthenticationException + */ + private List createOMService() throws IOException, + AuthenticationException { + + List omList = new ArrayList<>(numOfOMs); + + int retryCount = 0; + int basePort = 10000; + + while (true) { + try { + basePort = 10000 + RANDOM.nextInt(1000) * 4; + initHAConfig(basePort); + + for (int i = 1; i<= numOfOMs; i++) { + // Set nodeId + conf.set(OMConfigKeys.OZONE_OM_NODE_ID_KEY, nodeIdBaseStr + i); + + // Set metadata/DB dir base path + String metaDirPath = path + "/" + nodeIdBaseStr + i; + conf.set(OZONE_METADATA_DIRS, metaDirPath); + OMStorage omStore = new OMStorage(conf); + initializeOmStorage(omStore); + + // Set HTTP address to the rpc port + 2 + int httpPort = basePort + (6*i) - 4; + conf.set(OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY, + "127.0.0.1:" + httpPort); + + OzoneManager om = OzoneManager.createOm(null, conf); + om.setCertClient(certClient); + omList.add(om); + + om.start(); + LOG.info("Started OzoneManager RPC server at " + + om.getOmRpcServerAddr()); + } + + // Set default OM address to point to the first OM. Clients would + // try connecting to this address by default + conf.set(OMConfigKeys.OZONE_OM_ADDRESS_KEY, + NetUtils.getHostPortString(omList.get(0).getOmRpcServerAddr())); + + break; + } catch (BindException e) { + for (OzoneManager om : omList) { + om.stop(); + om.join(); + LOG.info("Stopping OzoneManager server at " + + om.getOmRpcServerAddr()); + } + omList.clear(); + ++retryCount; + LOG.info("MiniOzoneHACluster port conflicts, retried " + + retryCount + " times"); + } + } + return omList; + } + + /** + * Initialize HA related configurations. + */ + private void initHAConfig(int basePort) throws IOException { + // Set configurations required for starting OM HA service + conf.set(OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY, omServiceId); + String omNodesKey = OmUtils.addKeySuffixes( + OMConfigKeys.OZONE_OM_NODES_KEY, omServiceId); + StringBuilder omNodesKeyValue = new StringBuilder(); + + int port = basePort; + + for (int i = 1; i <= numOfOMs; i++, port+=6) { + String omNodeId = nodeIdBaseStr + i; + omNodesKeyValue.append(",").append(omNodeId); + String omAddrKey = OmUtils.addKeySuffixes( + OMConfigKeys.OZONE_OM_ADDRESS_KEY, omServiceId, omNodeId); + String omRatisPortKey = OmUtils.addKeySuffixes( + OMConfigKeys.OZONE_OM_RATIS_PORT_KEY, omServiceId, omNodeId); + + conf.set(omAddrKey, "127.0.0.1:" + port); + // Reserve port+2 for OMs HTTP server + conf.setInt(omRatisPortKey, port + 4); + } + + conf.set(omNodesKey, omNodesKeyValue.substring(1)); + } + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java index 0839292ac80..30f074925c7 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java @@ -38,6 +38,7 @@ public class TestOzoneConfigurationFields extends TestConfigurationFieldsBase { errorIfMissingConfigProps = true; errorIfMissingXmlProps = true; xmlPropsToSkipCompare.add("hadoop.tags.custom"); + xmlPropsToSkipCompare.add("ozone.om.nodes.EXAMPLEOMSERVICEID"); addPropertiesNotInXml(); } @@ -45,5 +46,6 @@ public class TestOzoneConfigurationFields extends TestConfigurationFieldsBase { configurationPropsToSkipCompare.add(HddsConfigKeys.HDDS_KEY_ALGORITHM); configurationPropsToSkipCompare.add(HddsConfigKeys.HDDS_SECURITY_PROVIDER); configurationPropsToSkipCompare.add(HddsConfigKeys.HDDS_GRPC_TLS_TEST_CERT); + configurationPropsToSkipCompare.add(OMConfigKeys.OZONE_OM_NODES_KEY); } } \ No newline at end of file diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java index 140f91c3179..eb4421abbe2 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java @@ -1381,7 +1381,7 @@ public class TestOzoneManager { * set to true. */ @Test - public void testRatsiServerOnOmInitialization() throws IOException { + public void testRatisServerOnOMInitialization() throws IOException { // OM Ratis server should not be started when OZONE_OM_RATIS_ENABLE_KEY // is not set to true Assert.assertNull("OM Ratis server started though OM Ratis is disabled.", diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerConfiguration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerConfiguration.java new file mode 100644 index 00000000000..4c422e6b76f --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerConfiguration.java @@ -0,0 +1,290 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.om; + +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OmUtils; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneIllegalArgumentException; +import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.util.LifeCycle; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +/** + * Tests OM related configurations. + */ +public class TestOzoneManagerConfiguration { + + private OzoneConfiguration conf; + private MiniOzoneCluster cluster; + private String omId; + private String clusterId; + private String scmId; + private OzoneManager om; + private OzoneManagerRatisServer omRatisServer; + + private static final long LEADER_ELECTION_TIMEOUT = 500L; + + @Before + public void init() throws IOException { + conf = new OzoneConfiguration(); + omId = UUID.randomUUID().toString(); + clusterId = UUID.randomUUID().toString(); + scmId = UUID.randomUUID().toString(); + final String path = GenericTestUtils.getTempPath(omId); + Path metaDirPath = Paths.get(path, "om-meta"); + conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, true); + conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, metaDirPath.toString()); + conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0"); + conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, true); + conf.setTimeDuration( + OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY, + LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS); + + OMStorage omStore = new OMStorage(conf); + omStore.setClusterId("testClusterId"); + omStore.setScmId("testScmId"); + // writes the version file properties + omStore.initialize(); + } + + @After + public void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + private void startCluster() throws Exception { + cluster = MiniOzoneCluster.newBuilder(conf) + .setClusterId(clusterId) + .setScmId(scmId) + .setOmId(omId) + .build(); + cluster.waitForClusterToBeReady(); + } + + /** + * Test a single node OM service (default setting for MiniOzoneCluster). + * @throws Exception + */ + @Test + public void testSingleNodeOMservice() throws Exception { + // Default settings of MiniOzoneCluster start a sinle node OM service. + startCluster(); + om = cluster.getOzoneManager(); + omRatisServer = om.getOmRatisServer(); + + Assert.assertEquals(LifeCycle.State.RUNNING, om.getOmRatisServerState()); + // OM's Ratis server should have only 1 peer (itself) in its RaftGroup + Collection peers = omRatisServer.getRaftGroup().getPeers(); + Assert.assertEquals(1, peers.size()); + + // The RaftPeer id should match the configured omId + RaftPeer raftPeer = peers.toArray(new RaftPeer[1])[0]; + Assert.assertEquals(omId, raftPeer.getId().toString()); + } + + /** + * Test configurating an OM service with three OM nodes. + * @throws Exception + */ + @Test + public void testThreeNodeOMservice() throws Exception { + // Set the configuration for 3 node OM service. Set one node's rpc + // address to localhost. OM will parse all configurations and find the + // nodeId representing the localhost + + final String omServiceId = "om-service-test1"; + final String omNode1Id = "omNode1"; + final String omNode2Id = "omNode2"; + final String omNode3Id = "omNode3"; + + String omNodesKeyValue = omNode1Id + "," + omNode2Id + "," + omNode3Id; + String omNodesKey = OmUtils.addKeySuffixes( + OMConfigKeys.OZONE_OM_NODES_KEY, omServiceId); + + String omNode1RpcAddrKey = getOMAddrKeyWithSuffix(omServiceId, omNode1Id); + String omNode2RpcAddrKey = getOMAddrKeyWithSuffix(omServiceId, omNode2Id); + String omNode3RpcAddrKey = getOMAddrKeyWithSuffix(omServiceId, omNode3Id); + + String omNode3RatisPortKey = OmUtils.addKeySuffixes( + OMConfigKeys.OZONE_OM_RATIS_PORT_KEY, omServiceId, omNode3Id); + + conf.set(OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY, omServiceId); + conf.set(omNodesKey, omNodesKeyValue); + + // Set node2 to localhost and the other two nodes to dummy addresses + conf.set(omNode1RpcAddrKey, "123.0.0.123:9862"); + conf.set(omNode2RpcAddrKey, "0.0.0.0:9862"); + conf.set(omNode3RpcAddrKey, "124.0.0.124:9862"); + + conf.setInt(omNode3RatisPortKey, 9898); + + startCluster(); + om = cluster.getOzoneManager(); + omRatisServer = om.getOmRatisServer(); + + Assert.assertEquals(LifeCycle.State.RUNNING, om.getOmRatisServerState()); + + // OM's Ratis server should have 3 peers in its RaftGroup + Collection peers = omRatisServer.getRaftGroup().getPeers(); + Assert.assertEquals(3, peers.size()); + + // Ratis server RaftPeerId should match with omNode2 ID as node2 is the + // localhost + Assert.assertEquals(omNode2Id, omRatisServer.getRaftPeerId().toString()); + + // Verify peer details + for (RaftPeer peer : peers) { + String expectedPeerAddress = null; + switch (peer.getId().toString()) { + case omNode1Id : + // Ratis port is not set for node1. So it should take the default port + expectedPeerAddress = "123.0.0.123:" + + OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT; + break; + case omNode2Id : + expectedPeerAddress = "0.0.0.0:"+ + OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT; + break; + case omNode3Id : + // Ratis port is not set for node3. So it should take the default port + expectedPeerAddress = "124.0.0.124:9898"; + break; + default : Assert.fail("Unrecognized RaftPeerId"); + } + Assert.assertEquals(expectedPeerAddress, peer.getAddress()); + } + } + + /** + * Test a wrong configuration for OM HA. A configuration with none of the + * OM addresses matching the local address should throw an error. + * @throws Exception + */ + @Test + public void testWrongConfiguration() throws Exception { + String omServiceId = "om-service-test1"; + + String omNode1Id = "omNode1"; + String omNode2Id = "omNode2"; + String omNode3Id = "omNode3"; + String omNodesKeyValue = omNode1Id + "," + omNode2Id + "," + omNode3Id; + String omNodesKey = OmUtils.addKeySuffixes( + OMConfigKeys.OZONE_OM_NODES_KEY, omServiceId); + + String omNode1RpcAddrKey = getOMAddrKeyWithSuffix(omServiceId, omNode1Id); + String omNode2RpcAddrKey = getOMAddrKeyWithSuffix(omServiceId, omNode2Id); + String omNode3RpcAddrKey = getOMAddrKeyWithSuffix(omServiceId, omNode3Id); + + conf.set(OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY, omServiceId); + conf.set(omNodesKey, omNodesKeyValue); + + // Set node2 to localhost and the other two nodes to dummy addresses + conf.set(omNode1RpcAddrKey, "123.0.0.123:9862"); + conf.set(omNode2RpcAddrKey, "125.0.0.2:9862"); + conf.set(omNode3RpcAddrKey, "124.0.0.124:9862"); + + try { + startCluster(); + Assert.fail("Wrong Configuration. OM initialization should have failed."); + } catch (OzoneIllegalArgumentException e) { + GenericTestUtils.assertExceptionContains("Configuration has no " + + OMConfigKeys.OZONE_OM_ADDRESS_KEY + " address that matches local " + + "node's address.", e); + } + } + + /** + * Test multiple OM service configuration. + */ + @Test + public void testMultipleOMServiceIds() throws Exception { + // Set up OZONE_OM_SERVICES_KEY with 2 service Ids. + String om1ServiceId = "om-service-test1"; + String om2ServiceId = "om-service-test2"; + String omServices = om1ServiceId + "," + om2ServiceId; + conf.set(OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY, omServices); + + String omNode1Id = "omNode1"; + String omNode2Id = "omNode2"; + String omNode3Id = "omNode3"; + String omNodesKeyValue = omNode1Id + "," + omNode2Id + "," + omNode3Id; + + // Set the node Ids for the 2 services. The nodeIds need to be + // distinch within one service. The ids can overlap between + // different services. + String om1NodesKey = OmUtils.addKeySuffixes( + OMConfigKeys.OZONE_OM_NODES_KEY, om1ServiceId); + String om2NodesKey = OmUtils.addKeySuffixes( + OMConfigKeys.OZONE_OM_NODES_KEY, om2ServiceId); + conf.set(om1NodesKey, omNodesKeyValue); + conf.set(om2NodesKey, omNodesKeyValue); + + // Set the RPC addresses for all 6 OMs (3 for each service). Only one + // node out of these must have the localhost address. + conf.set(getOMAddrKeyWithSuffix(om1ServiceId, omNode1Id), + "122.0.0.123:9862"); + conf.set(getOMAddrKeyWithSuffix(om1ServiceId, omNode2Id), + "123.0.0.124:9862"); + conf.set(getOMAddrKeyWithSuffix(om1ServiceId, omNode3Id), + "124.0.0.125:9862"); + conf.set(getOMAddrKeyWithSuffix(om2ServiceId, omNode1Id), + "125.0.0.126:9862"); + conf.set(getOMAddrKeyWithSuffix(om2ServiceId, omNode2Id), + "0.0.0.0:9862"); + conf.set(getOMAddrKeyWithSuffix(om2ServiceId, omNode3Id), + "126.0.0.127:9862"); + + startCluster(); + om = cluster.getOzoneManager(); + omRatisServer = om.getOmRatisServer(); + + Assert.assertEquals(LifeCycle.State.RUNNING, om.getOmRatisServerState()); + + // OM's Ratis server should have 3 peers in its RaftGroup + Collection peers = omRatisServer.getRaftGroup().getPeers(); + Assert.assertEquals(3, peers.size()); + + // Verify that the serviceId and nodeId match the node with the localhost + // address - om-service-test2 and omNode2 + Assert.assertEquals(om2ServiceId, om.getOMServiceId()); + Assert.assertEquals(omNode2Id, omRatisServer.getRaftPeerId().toString()); + } + + private String getOMAddrKeyWithSuffix(String serviceId, String nodeId) { + return OmUtils.addKeySuffixes(OMConfigKeys.OZONE_OM_ADDRESS_KEY, + serviceId, nodeId); + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java new file mode 100644 index 00000000000..be932a29321 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.om; + + +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler; +import org.apache.hadoop.ozone.*; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.web.handlers.UserArgs; +import org.apache.hadoop.ozone.web.handlers.VolumeArgs; +import org.apache.hadoop.ozone.web.interfaces.StorageHandler; +import org.apache.hadoop.ozone.web.response.VolumeInfo; +import org.apache.hadoop.ozone.web.utils.OzoneUtils; +import org.junit.*; +import org.junit.rules.ExpectedException; +import org.junit.rules.Timeout; + +import java.io.IOException; +import java.util.*; + +import static org.apache.hadoop.ozone.MiniOzoneHAClusterImpl + .NODE_FAILURE_TIMEOUT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS; + +/** + * Test Ozone Manager operation in distributed handler scenario. + */ +public class TestOzoneManagerHA { + + private MiniOzoneHAClusterImpl cluster = null; + private StorageHandler storageHandler; + private UserArgs userArgs; + private OzoneConfiguration conf; + private String clusterId; + private String scmId; + private int numOfOMs = 3; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @Rule + public Timeout timeout = new Timeout(60_000); + + /** + * Create a MiniDFSCluster for testing. + *

+ * Ozone is made active by setting OZONE_ENABLED = true + * + * @throws IOException + */ + @Before + public void init() throws Exception { + conf = new OzoneConfiguration(); + clusterId = UUID.randomUUID().toString(); + scmId = UUID.randomUUID().toString(); + conf.setBoolean(OZONE_ACL_ENABLED, true); + conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2); + + cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf) + .setClusterId(clusterId) + .setScmId(scmId) + .setOMServiceId("om-service-test1") + .setNumOfOzoneManagers(numOfOMs) + .build(); + cluster.waitForClusterToBeReady(); + storageHandler = new ObjectStoreHandler(conf).getStorageHandler(); + userArgs = new UserArgs(null, OzoneUtils.getRequestID(), + null, null, null, null); + } + + /** + * Shutdown MiniDFSCluster. + */ + @After + public void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + /** + * Test a client request when all OM nodes are running. The request should + * succeed. + * @throws Exception + */ + @Test + public void testAllOMNodesRunning() throws Exception { + testCreateVolume(true); + } + + /** + * Test client request succeeds even if one OM is down. + */ + @Test + public void testOneOMNodeDown() throws Exception { + cluster.stopOzoneManager(1); + Thread.sleep(NODE_FAILURE_TIMEOUT * 2); + + testCreateVolume(true); + } + + /** + * Test client request fails when 2 OMs are down. + */ + @Test + public void testTwoOMNodesDown() throws Exception { + cluster.stopOzoneManager(1); + cluster.stopOzoneManager(2); + Thread.sleep(NODE_FAILURE_TIMEOUT * 2); + + testCreateVolume(false); + } + + /** + * Create a volume and test its attribute. + */ + private void testCreateVolume(boolean checkSuccess) throws Exception { + String userName = "user" + RandomStringUtils.randomNumeric(5); + String adminName = "admin" + RandomStringUtils.randomNumeric(5); + String volumeName = "volume" + RandomStringUtils.randomNumeric(5); + + VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs); + createVolumeArgs.setUserName(userName); + createVolumeArgs.setAdminName(adminName); + + storageHandler.createVolume(createVolumeArgs); + + VolumeArgs getVolumeArgs = new VolumeArgs(volumeName, userArgs); + VolumeInfo retVolumeinfo = storageHandler.getVolumeInfo(getVolumeArgs); + + if (checkSuccess) { + Assert.assertTrue(retVolumeinfo.getVolumeName().equals(volumeName)); + Assert.assertTrue(retVolumeinfo.getOwner().getName().equals(userName)); + } else { + // Verify that the request failed + Assert.assertTrue(retVolumeinfo.getVolumeName().isEmpty()); + } + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMNodeDetails.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMNodeDetails.java new file mode 100644 index 00000000000..caa76741845 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMNodeDetails.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.om; + +import org.apache.hadoop.net.NetUtils; + +import java.net.InetAddress; +import java.net.InetSocketAddress; + +/** + * This class stores OM node details. + */ +public final class OMNodeDetails { + private String omServiceId; + private String omNodeId; + private InetSocketAddress rpcAddress; + private int rpcPort; + private int ratisPort; + + /** + * Constructs OMNodeDetails object. + */ + private OMNodeDetails(String serviceId, String nodeId, + InetSocketAddress rpcAddr, int rpcPort, int ratisPort) { + this.omServiceId = serviceId; + this.omNodeId = nodeId; + this.rpcAddress = rpcAddr; + this.rpcPort = rpcPort; + this.ratisPort = ratisPort; + } + + /** + * Builder class for OMNodeDetails. + */ + public static class Builder { + private String omServiceId; + private String omNodeId; + private InetSocketAddress rpcAddress; + private int rpcPort; + private int ratisPort; + + public Builder setRpcAddress(InetSocketAddress rpcAddr) { + this.rpcAddress = rpcAddr; + this.rpcPort = rpcAddress.getPort(); + return this; + } + + public Builder setRatisPort(int port) { + this.ratisPort = port; + return this; + } + + public Builder setOMServiceId(String serviceId) { + this.omServiceId = serviceId; + return this; + } + + public Builder setOMNodeId(String nodeId) { + this.omNodeId = nodeId; + return this; + } + + public OMNodeDetails build() { + return new OMNodeDetails(omServiceId, omNodeId, rpcAddress, rpcPort, + ratisPort); + } + } + + public String getOMServiceId() { + return omServiceId; + } + + public String getOMNodeId() { + return omNodeId; + } + + public InetSocketAddress getRpcAddress() { + return rpcAddress; + } + + public InetAddress getAddress() { + return rpcAddress.getAddress(); + } + + public int getRatisPort() { + return ratisPort; + } + + public String getRpcAddressString() { + return NetUtils.getHostPortString(rpcAddress); + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index 13bfd986245..2243fbdda75 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -25,8 +25,10 @@ import com.google.common.base.Preconditions; import com.google.protobuf.BlockingService; import java.security.KeyPair; +import java.util.Collection; import java.util.Objects; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -50,6 +52,7 @@ import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.ozone.OzoneIllegalArgumentException; import org.apache.hadoop.ozone.OzoneSecurityUtil; import org.apache.hadoop.ozone.om.helpers.S3SecretValue; import org.apache.hadoop.ozone.security.OzoneSecurityException; @@ -144,7 +147,6 @@ import static org.apache.hadoop.hdds.HddsUtils.isHddsEnabled; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY; import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress; import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithFixedSleep; -import static org.apache.hadoop.ozone.OmUtils.getOmAddress; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED_DEFAULT; @@ -161,6 +163,12 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys .OZONE_OM_METRICS_SAVE_INTERVAL; import static org.apache.hadoop.ozone.om.OMConfigKeys .OZONE_OM_METRICS_SAVE_INTERVAL_DEFAULT; + +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_NODE_ID_KEY; +import static org.apache.hadoop.ozone.om.OMConfigKeys + .OZONE_OM_RATIS_PORT_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_PORT_KEY; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_AUTH_METHOD; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER; import static org.apache.hadoop.ozone.protocol.proto @@ -200,6 +208,9 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl private RPC.Server omRpcServer; private InetSocketAddress omRpcAddress; private String omId; + private OMNodeDetails omNodeDetails; + private List peerNodes; + private boolean isRatisEnabled; private OzoneManagerRatisServer omRatisServer; private OzoneManagerRatisClient omRatisClient; private final OMMetadataManager metadataManager; @@ -229,7 +240,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl private final S3SecretManager s3SecretManager; private volatile boolean isOmRpcServerRunning = false; - private OzoneManager(OzoneConfiguration conf) throws IOException { + private OzoneManager(OzoneConfiguration conf) throws IOException, + AuthenticationException { super(OzoneVersionInfo.OZONE_VERSION_INFO); Preconditions.checkNotNull(conf); configuration = conf; @@ -240,6 +252,14 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl ResultCodes.OM_NOT_INITIALIZED); } + // Load HA related configurations + loadOMHAConfigs(configuration); + + // Authenticate KSM if security is enabled + if (securityEnabled) { + loginOMUser(configuration); + } + if (!testSecureOmFlag || !isOzoneSecurityEnabled()) { scmContainerClient = getScmContainerClient(configuration); // verifies that the SCM info in the OM Version file is correct. @@ -256,12 +276,15 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl scmBlockClient = null; } - RPC.setProtocolEngine(configuration, OzoneManagerProtocolPB.class, ProtobufRpcEngine.class); + startRatisServer(); + startRatisClient(); + + InetSocketAddress omNodeRpcAddr = omNodeDetails.getRpcAddress(); + omRpcAddressTxt = new Text(omNodeDetails.getRpcAddressString()); - omRpcAddressTxt = new Text(OmUtils.getOmRpcAddress(configuration)); secConfig = new SecurityConfig(configuration); if (secConfig.isBlockTokenEnabled()) { blockTokenMgr = createBlockTokenSecretManager(configuration); @@ -269,7 +292,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl if(secConfig.isSecurityEnabled()){ delegationTokenMgr = createDelegationTokenSecretManager(configuration); } - InetSocketAddress omNodeRpcAddr = getOmAddress(configuration); + omRpcServer = getRpcServer(conf); omRpcAddress = updateRPCListenAddress(configuration, OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer); @@ -297,7 +320,157 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl accessAuthorizer = null; } omMetaDir = OmUtils.getOmDbDir(configuration); + } + /** + * Inspects and loads OM node configurations. + * + * If {@link OMConfigKeys#OZONE_OM_SERVICE_IDS_KEY} is configured with + * multiple ids and/ or if {@link OMConfigKeys#OZONE_OM_NODE_ID_KEY} is not + * specifically configured , this method determines the omServiceId + * and omNodeId by matching the node's address with the configured + * addresses. When a match is found, it sets the omServicId and omNodeId from + * the corresponding configuration key. This method also finds the OM peers + * nodes belonging to the same OM service. + * + * @param conf + */ + private void loadOMHAConfigs(Configuration conf) { + InetSocketAddress localRpcAddress = null; + String localOMServiceId = null; + String localOMNodeId = null; + int localRatisPort = 0; + Collection omServiceIds = conf.getTrimmedStringCollection( + OZONE_OM_SERVICE_IDS_KEY); + + String knownOMNodeId = conf.get(OZONE_OM_NODE_ID_KEY); + int found = 0; + boolean isOMAddressSet = false; + + for (String serviceId : OmUtils.emptyAsSingletonNull(omServiceIds)) { + Collection omNodeIds = OmUtils.getOMNodeIds(conf, serviceId); + + List peerNodesList = new ArrayList<>(); + boolean isPeer = false; + for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) { + if (knownOMNodeId != null && !knownOMNodeId.equals(nodeId)) { + isPeer = true; + } else { + isPeer = false; + } + String rpcAddrKey = OmUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY, + serviceId, nodeId); + String rpcAddrStr = conf.get(rpcAddrKey); + if (rpcAddrStr == null) { + continue; + } + + // If OM address is set for any node id, we will not fallback to the + // default + isOMAddressSet = true; + + String ratisPortKey = OmUtils.addKeySuffixes(OZONE_OM_RATIS_PORT_KEY, + serviceId, nodeId); + int ratisPort = conf.getInt(ratisPortKey, OZONE_OM_RATIS_PORT_DEFAULT); + + InetSocketAddress addr = null; + try { + addr = NetUtils.createSocketAddr(rpcAddrStr); + } catch (Exception e) { + LOG.warn("Exception in creating socket address " + addr, e); + continue; + } + if (!addr.isUnresolved()) { + if (!isPeer && OmUtils.isAddressLocal(addr)) { + localRpcAddress = addr; + localOMServiceId = serviceId; + localOMNodeId = nodeId; + localRatisPort = ratisPort; + found++; + } else { + // This OMNode belongs to same OM service as the current OMNode. + // Add it to peerNodes list. + OMNodeDetails peerNodeInfo = new OMNodeDetails.Builder() + .setOMServiceId(serviceId) + .setOMNodeId(nodeId) + .setRpcAddress(addr) + .setRatisPort(ratisPort) + .build(); + peerNodesList.add(peerNodeInfo); + } + } + } + if (found == 1) { + LOG.debug("Found one matching OM address with service ID: {} and node" + + " ID: {}", localOMServiceId, localOMNodeId); + + setOMNodeDetails(localOMServiceId, localOMNodeId, localRpcAddress, + localRatisPort); + this.peerNodes = peerNodesList; + + LOG.info("Found matching OM address with OMServiceId: {}, " + + "OMNodeId: {}, RPC Address: {} and Ratis port: {}", + localOMServiceId, localOMNodeId, + NetUtils.getHostPortString(localRpcAddress), localRatisPort); + return; + } else if (found > 1) { + String msg = "Configuration has multiple " + OZONE_OM_ADDRESS_KEY + + " addresses that match local node's address. Please configure the" + + " system with " + OZONE_OM_SERVICE_IDS_KEY + " and " + + OZONE_OM_ADDRESS_KEY; + throw new OzoneIllegalArgumentException(msg); + } + } + + if (!isOMAddressSet) { + // No OM address is set. Fallback to default + InetSocketAddress omAddress = OmUtils.getOmAddress(conf); + int ratisPort = conf.getInt(OZONE_OM_RATIS_PORT_KEY, + OZONE_OM_RATIS_PORT_DEFAULT); + + LOG.info("Configuration either no {} set. Falling back to the default " + + "OM address {}", OZONE_OM_ADDRESS_KEY, omAddress); + + setOMNodeDetails(null, null, omAddress, ratisPort); + + } else { + String msg = "Configuration has no " + OZONE_OM_ADDRESS_KEY + " " + + "address that matches local node's address. Please configure the " + + "system with " + OZONE_OM_ADDRESS_KEY; + LOG.info(msg); + throw new OzoneIllegalArgumentException(msg); + } + } + + /** + * Builds and sets OMNodeDetails object. + */ + private void setOMNodeDetails(String serviceId, String nodeId, + InetSocketAddress rpcAddress, int ratisPort) { + + if (serviceId == null) { + // If no serviceId is set, take the default serviceID om-service + serviceId = OzoneConsts.OM_SERVICE_ID_DEFAULT; + LOG.info("OM Service ID is not set. Setting it to the default ID: {}", + serviceId); + } + if (nodeId == null) { + // If no nodeId is set, take the omId from omStorage as the nodeID + nodeId = omId; + LOG.info("OM Node ID is not set. Setting it to the OmStorage's " + + "OmID: {}", nodeId); + } + + this.omNodeDetails = new OMNodeDetails.Builder() + .setOMServiceId(serviceId) + .setOMNodeId(nodeId) + .setRpcAddress(rpcAddress) + .setRatisPort(ratisPort) + .build(); + + // Set this nodes OZONE_OM_ADDRESS_KEY to the discovered address. + configuration.set(OZONE_OM_ADDRESS_KEY, + NetUtils.getHostPortString(rpcAddress)); } /** @@ -479,7 +652,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl * @param conf * @throws IOException, AuthenticationException */ - private static void loginOMUser(OzoneConfiguration conf) + private void loginOMUser(OzoneConfiguration conf) throws IOException, AuthenticationException { if (SecurityUtil.getAuthenticationMethod(conf).equals( @@ -491,7 +664,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl UserGroupInformation.setConfiguration(conf); - InetSocketAddress socAddr = getOmAddress(conf); + InetSocketAddress socAddr = OmUtils.getOmAddress(conf); SecurityUtil.login(conf, OZONE_OM_KERBEROS_KEYTAB_FILE_KEY, OZONE_OM_KERBEROS_PRINCIPAL_KEY, socAddr.getHostName()); } else { @@ -660,10 +833,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl } securityEnabled = OzoneSecurityUtil.isSecurityEnabled(conf); - // Authenticate KSM if security is enabled - if (securityEnabled) { - loginOMUser(conf); - } + switch (startOpt) { case INIT: if (printBanner) { @@ -792,6 +962,16 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl return omStorage; } + @VisibleForTesting + public OzoneManagerRatisServer getOmRatisServer() { + return omRatisServer; + } + + @VisibleForTesting + public InetSocketAddress getOmRpcServerAddr() { + return omRpcAddress; + } + @VisibleForTesting public LifeCycle.State getOmRatisServerState() { if (omRatisServer == null) { @@ -866,7 +1046,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl LOG.info(buildRpcServerStartMessage("OzoneManager RPC server", omRpcAddress)); - DefaultMetricsSystem.initialize("OzoneManager"); metadataManager.start(configuration); @@ -894,6 +1073,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl omRpcServer = getRpcServer(configuration); omRpcServer.start(); isOmRpcServerRunning = true; + + startRatisServer(); + startRatisClient(); + try { httpServer = new OzoneManagerHttpServer(configuration, this); httpServer.start(); @@ -919,28 +1102,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl return omRpcServer; } - InetSocketAddress omNodeRpcAddr = getOmAddress(configuration); - // This is a temporary check. Once fully implemented, all OM state change - // should go through Ratis - either standalone (for non-HA) or replicated - // (for HA). - boolean omRatisEnabled = configuration.getBoolean( - OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, - OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT); - if (omRatisEnabled) { - omRatisServer = OzoneManagerRatisServer.newOMRatisServer(this, omId, - omNodeRpcAddr.getAddress(), configuration); - omRatisServer.start(); - - LOG.info("OzoneManager Ratis server started at port {}", - omRatisServer.getServerPort()); - - omRatisClient = OzoneManagerRatisClient.newOzoneManagerRatisClient( - omId, omRatisServer.getRaftGroup(), configuration); - omRatisClient.connect(); - } else { - omRatisServer = null; - omRatisClient = null; - } + InetSocketAddress omNodeRpcAddr = OmUtils.getOmAddress(configuration); final int handlerCount = conf.getInt(OZONE_OM_HANDLER_COUNT_KEY, OZONE_OM_HANDLER_COUNT_DEFAULT); @@ -949,12 +1111,58 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl BlockingService omService = newReflectiveBlockingService( new OzoneManagerProtocolServerSideTranslatorPB(this, omRatisClient, - omRatisEnabled)); + isRatisEnabled)); return startRpcServer(configuration, omNodeRpcAddr, OzoneManagerProtocolPB.class, omService, handlerCount); } + /** + * Creates an instance of ratis server. + */ + private void startRatisServer() throws IOException { + // This is a temporary check. Once fully implemented, all OM state change + // should go through Ratis - be it standalone (for non-HA) or replicated + // (for HA). + isRatisEnabled = configuration.getBoolean( + OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, + OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT); + if (isRatisEnabled) { + if (omRatisServer == null) { + omRatisServer = OzoneManagerRatisServer.newOMRatisServer( + configuration, this, omNodeDetails, peerNodes); + } + omRatisServer.start(); + + LOG.info("OzoneManager Ratis server started at port {}", + omRatisServer.getServerPort()); + } else { + omRatisServer = null; + } + } + + /** + * Creates an instance of ratis client. + */ + private void startRatisClient() throws IOException { + // This is a temporary check. Once fully implemented, all OM state change + // should go through Ratis - be it standalone (for non-HA) or replicated + // (for HA). + isRatisEnabled = configuration.getBoolean( + OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, + OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT); + if (isRatisEnabled) { + if (omRatisClient == null) { + omRatisClient = OzoneManagerRatisClient.newOzoneManagerRatisClient( + omNodeDetails.getOMNodeId(), omRatisServer.getRaftGroup(), + configuration); + } + omRatisClient.connect(); + } else { + omRatisClient = null; + } + } + /** * Stop service. */ @@ -970,6 +1178,9 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl if (omRatisServer != null) { omRatisServer.stop(); } + if (omRatisClient != null) { + omRatisClient.close(); + } isOmRpcServerRunning = false; keyManager.stop(); stopSecretManager(); @@ -2188,4 +2399,12 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl public static void setTestSecureOmFlag(boolean testSecureOmFlag) { OzoneManager.testSecureOmFlag = testSecureOmFlag; } + + public String getOMNodId() { + return omNodeDetails.getOMNodeId(); + } + + public String getOMServiceId() { + return omNodeDetails.getOMServiceId(); + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java index c18437c8bc7..9e1cafc30c9 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java @@ -19,12 +19,10 @@ package org.apache.hadoop.ozone.om.ratis; import java.io.Closeable; import java.io.IOException; -import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import com.google.protobuf.InvalidProtocolBufferException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ozone.OmUtils; @@ -57,7 +55,7 @@ public final class OzoneManagerRatisClient implements Closeable { private final RaftGroup raftGroup; private final String omID; private final RpcType rpcType; - private final AtomicReference client = new AtomicReference<>(); + private RaftClient raftClient; private final RetryPolicy retryPolicy; private final Configuration conf; @@ -101,32 +99,21 @@ public final class OzoneManagerRatisClient implements Closeable { // maxOutstandingRequests so as to set the upper bound on max no of async // requests to be handled by raft client - if (!client.compareAndSet(null, OMRatisHelper.newRaftClient( - rpcType, omID, raftGroup, retryPolicy, conf))) { - throw new IllegalStateException("Client is already connected."); - } + raftClient = OMRatisHelper.newRaftClient(rpcType, omID, raftGroup, + retryPolicy, conf); } @Override public void close() { - final RaftClient c = client.getAndSet(null); - if (c != null) { - closeRaftClient(c); + if (raftClient != null) { + try { + raftClient.close(); + } catch (IOException e) { + throw new IllegalStateException(e); + } } } - private void closeRaftClient(RaftClient raftClient) { - try { - raftClient.close(); - } catch (IOException e) { - throw new IllegalStateException(e); - } - } - - private RaftClient getClient() { - return Objects.requireNonNull(client.get(), "client is null"); - } - /** * Sends a given request to server and gets the reply back. * @param request Request @@ -188,7 +175,7 @@ public final class OzoneManagerRatisClient implements Closeable { boolean isReadOnlyRequest = OmUtils.isReadOnly(request); ByteString byteString = OMRatisHelper.convertRequestToByteString(request); LOG.debug("sendOMRequestAsync {} {}", isReadOnlyRequest, request); - return isReadOnlyRequest ? getClient().sendReadOnlyAsync(() -> byteString) : - getClient().sendAsync(() -> byteString); + return isReadOnlyRequest ? raftClient.sendReadOnlyAsync(() -> byteString) : + raftClient.sendAsync(() -> byteString); } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java index d49d5e6a3b3..be4bf5976e0 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java @@ -22,19 +22,17 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import java.io.File; import java.io.IOException; -import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.net.SocketAddress; +import java.util.ArrayList; import java.util.Collections; -import java.util.Objects; +import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.scm.HddsServerUtil; import org.apache.hadoop.ozone.om.OMConfigKeys; +import org.apache.hadoop.ozone.om.OMNodeDetails; import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; import org.apache.ratis.RaftConfigKeys; import org.apache.ratis.client.RaftClientConfigKeys; @@ -50,6 +48,7 @@ import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.statemachine.impl.BaseStateMachine; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.util.LifeCycle; import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.util.TimeDuration; @@ -71,28 +70,37 @@ public final class OzoneManagerRatisServer { private final RaftPeerId raftPeerId; private final OzoneManagerProtocol ozoneManager; - private static final AtomicLong CALL_ID_COUNTER = new AtomicLong(); - - private static long nextCallId() { - return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE; - } - - private OzoneManagerRatisServer(OzoneManagerProtocol om, String omId, - InetAddress addr, int port, Configuration conf) throws IOException { - Objects.requireNonNull(omId, "omId is null"); + /** + * Returns an OM Ratis server. + * @param conf configuration + * @param om the OM instance starting the ratis server + * @param raftGroupIdStr raft group id string + * @param localRaftPeerId raft peer id of this Ratis server + * @param addr address of the ratis server + * @param raftPeers peer nodes in the raft ring + * @throws IOException + */ + private OzoneManagerRatisServer(Configuration conf, OzoneManagerProtocol om, + String raftGroupIdStr, RaftPeerId localRaftPeerId, + InetSocketAddress addr, List raftPeers) + throws IOException { this.ozoneManager = om; - this.port = port; - this.omRatisAddress = new InetSocketAddress(addr.getHostAddress(), port); + this.omRatisAddress = addr; + this.port = addr.getPort(); RaftProperties serverProperties = newRaftProperties(conf); - // TODO: When implementing replicated OM ratis servers, RaftGroupID - // should be the same across all the OMs. Add all the OM servers as Raft - // Peers. - this.raftGroupId = RaftGroupId.randomId(); - this.raftPeerId = RaftPeerId.getRaftPeerId(omId); + this.raftPeerId = localRaftPeerId; + this.raftGroupId = RaftGroupId.valueOf( + ByteString.copyFromUtf8(raftGroupIdStr)); + this.raftGroup = RaftGroup.valueOf(raftGroupId, raftPeers); + + StringBuilder raftPeersStr = new StringBuilder(); + for (RaftPeer peer : raftPeers) { + raftPeersStr.append(", ").append(peer.getAddress()); + } + LOG.info("Instantiating OM Ratis server with GroupID: {} and " + + "Raft Peers: {}", raftGroupIdStr, raftPeersStr.toString().substring(2)); - RaftPeer raftPeer = new RaftPeer(raftPeerId, omRatisAddress); - this.raftGroup = RaftGroup.valueOf(raftGroupId, raftPeer); this.server = RaftServer.newBuilder() .setServerId(this.raftPeerId) .setGroup(this.raftGroup) @@ -101,31 +109,42 @@ public final class OzoneManagerRatisServer { .build(); } + /** + * Creates an instance of OzoneManagerRatisServer. + */ public static OzoneManagerRatisServer newOMRatisServer( - OzoneManagerProtocol om, String omId, InetAddress omAddress, - Configuration ozoneConf) throws IOException { - int localPort = ozoneConf.getInt( - OMConfigKeys.OZONE_OM_RATIS_PORT_KEY, - OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT); + Configuration ozoneConf, OzoneManagerProtocol om, + OMNodeDetails omNodeDetails, List peerNodes) + throws IOException { - // Get an available port on current node and - // use that as the container port - if (ozoneConf.getBoolean( - OMConfigKeys.OZONE_OM_RATIS_RANDOM_PORT_KEY, - OMConfigKeys.OZONE_OM_RATIS_RANDOM_PORT_KEY_DEFAULT)) { - try (ServerSocket socket = new ServerSocket()) { - socket.setReuseAddress(true); - SocketAddress address = new InetSocketAddress(0); - socket.bind(address); - localPort = socket.getLocalPort(); - LOG.info("Found a free port for the OM Ratis server : {}", localPort); - } catch (IOException e) { - LOG.error("Unable find a random free port for the server, " - + "fallback to use default port {}", localPort, e); - } + // RaftGroupId is the omServiceId + String omServiceId = omNodeDetails.getOMServiceId(); + + String omNodeId = omNodeDetails.getOMNodeId(); + RaftPeerId localRaftPeerId = RaftPeerId.getRaftPeerId(omNodeId); + + InetSocketAddress ratisAddr = new InetSocketAddress( + omNodeDetails.getAddress(), omNodeDetails.getRatisPort()); + + RaftPeer localRaftPeer = new RaftPeer(localRaftPeerId, ratisAddr); + + List raftPeers = new ArrayList<>(); + // Add this Ratis server to the Ratis ring + raftPeers.add(localRaftPeer); + + for (OMNodeDetails peerInfo : peerNodes) { + String peerNodeId = peerInfo.getOMNodeId(); + InetSocketAddress peerRatisAddr = new InetSocketAddress( + peerInfo.getAddress(), peerInfo.getRatisPort()); + RaftPeerId raftPeerId = RaftPeerId.valueOf(peerNodeId); + RaftPeer raftPeer = new RaftPeer(raftPeerId, peerRatisAddr); + + // Add other OM nodes belonging to the same OM service to the Ratis ring + raftPeers.add(raftPeer); } - return new OzoneManagerRatisServer(om, omId, omAddress, localPort, - ozoneConf); + + return new OzoneManagerRatisServer(ozoneConf, om, omServiceId, + localRaftPeerId, ratisAddr, raftPeers); } public RaftGroup getRaftGroup() { @@ -139,6 +158,10 @@ public final class OzoneManagerRatisServer { return new OzoneManagerStateMachine(ozoneManager); } + /** + * Start the Ratis server. + * @throws IOException + */ public void start() throws IOException { LOG.info("Starting {} {} at port {}", getClass().getSimpleName(), server.getId(), port); @@ -266,11 +289,6 @@ public final class OzoneManagerRatisServer { // TODO: set max write buffer size - /** - * TODO: set following ratis leader election related configs when - * replicated ratis server is implemented. - * 1. node failure timeout - */ // Set the ratis leader election timeout TimeUnit leaderElectionMinTimeoutUnit = OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT @@ -288,6 +306,20 @@ public final class OzoneManagerRatisServer { RaftServerConfigKeys.Rpc.setTimeoutMax(properties, TimeDuration.valueOf(leaderElectionMaxTimeout, TimeUnit.MILLISECONDS)); + TimeUnit nodeFailureTimeoutUnit = + OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT + .getUnit(); + long nodeFailureTimeoutDuration = conf.getTimeDuration( + OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_KEY, + OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT + .getDuration(), nodeFailureTimeoutUnit); + final TimeDuration nodeFailureTimeout = TimeDuration.valueOf( + nodeFailureTimeoutDuration, nodeFailureTimeoutUnit); + RaftServerConfigKeys.setLeaderElectionTimeout(properties, + nodeFailureTimeout); + RaftServerConfigKeys.Rpc.setSlownessTimeout(properties, + nodeFailureTimeout); + /** * TODO: when ratis snapshots are implemented, set snapshot threshold and * queue size. @@ -305,6 +337,11 @@ public final class OzoneManagerRatisServer { return server.getLifeCycleState(); } + @VisibleForTesting + public RaftPeerId getRaftPeerId() { + return this.raftPeerId; + } + /** * Get the local directory where ratis logs will be stored. */ diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java index 1f285db38bb..ffa66805509 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java @@ -19,14 +19,19 @@ package org.apache.hadoop.ozone.om.ratis; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Collections; import java.util.UUID; import java.util.concurrent.TimeUnit; + import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.OmUtils; +import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.om.OMConfigKeys; +import org.apache.hadoop.ozone.om.OMNodeDetails; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos @@ -64,8 +69,20 @@ public class TestOzoneManagerRatisServer { conf.setTimeDuration( OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY, LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS); - omRatisServer = OzoneManagerRatisServer.newOMRatisServer(null, omID, - InetAddress.getLocalHost(), conf); + int ratisPort = conf.getInt( + OMConfigKeys.OZONE_OM_RATIS_PORT_KEY, + OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT); + InetSocketAddress rpcAddress = new InetSocketAddress( + InetAddress.getLocalHost(), 0); + OMNodeDetails omNodeDetails = new OMNodeDetails.Builder() + .setRpcAddress(rpcAddress) + .setRatisPort(ratisPort) + .setOMNodeId(omID) + .setOMServiceId(OzoneConsts.OM_SERVICE_ID_DEFAULT) + .build(); + // Starts a single node Ratis server + omRatisServer = OzoneManagerRatisServer.newOMRatisServer(conf, null, + omNodeDetails, Collections.emptyList()); omRatisServer.start(); omRatisClient = OzoneManagerRatisClient.newOzoneManagerRatisClient(omID, omRatisServer.getRaftGroup(), conf); @@ -94,8 +111,6 @@ public class TestOzoneManagerRatisServer { /** * Submit any request to OM Ratis server and check that the dummy response * message is received. - * TODO: Once state machine is implemented, submitting a request to Ratis - * server should result in a valid response. */ @Test public void testSubmitRatisRequest() throws Exception {