HDFS-11597. Ozone: Add Ratis management API. Contributed by Tsz Wo Nicholas Sze.

This commit is contained in:
Anu Engineer 2017-06-01 15:36:58 -07:00 committed by Owen O'Malley
parent a8c0976fb4
commit 4893692972
5 changed files with 428 additions and 21 deletions

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.scm.ratis;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration;
import java.io.IOException;
import java.util.List;
/**
 * Management API for Ratis clusters: creation, shutdown, and membership
 * queries/updates, all addressed by a cluster id.
 */
public interface RatisManager {
  /**
   * Build a {@link RatisManager} from the given configuration.
   *
   * The Ratis RPC type is read from
   * {@link OzoneConfigKeys#DFS_CONTAINER_RATIS_RPC_TYPE_KEY}, falling back to
   * {@link OzoneConfigKeys#DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT}.
   *
   * @param conf the configuration to read the RPC type from.
   * @return a new manager instance.
   */
  static RatisManager newRatisManager(OzoneConfiguration conf) {
    return new RatisManagerImpl(conf.get(
        OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
        OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT));
  }

  /**
   * Create a new Ratis cluster with the given clusterId and datanodes.
   *
   * @param clusterId the id of the cluster to create.
   * @param datanodes the initial members of the cluster.
   * @throws IOException if the cluster cannot be created.
   */
  void createRatisCluster(String clusterId, List<DatanodeID> datanodes)
      throws IOException;

  /**
   * Close the Ratis cluster with the given clusterId.
   *
   * @param clusterId the id of the cluster to close.
   * @throws IOException if the cluster cannot be closed.
   */
  void closeRatisCluster(String clusterId) throws IOException;

  /**
   * @param clusterId the id of the cluster to query.
   * @return the datanode list of the Ratis cluster with the given clusterId.
   * @throws IOException if the datanode list cannot be obtained.
   */
  List<DatanodeID> getDatanodes(String clusterId) throws IOException;

  /**
   * Update the datanode list of the Ratis cluster with the given clusterId.
   *
   * @param clusterId the id of the cluster to update.
   * @param newDatanodes the new members of the cluster.
   * @throws IOException if the membership cannot be updated.
   */
  void updateDatanodes(String clusterId, List<DatanodeID> newDatanodes)
      throws IOException;
}

View File

@ -0,0 +1,194 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.scm.ratis;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.ratis.RatisHelper;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.util.CheckedRunnable;
import org.apache.ratis.util.CheckedSupplier;
import org.apache.ratis.util.LifeCycle;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* Implementation of {@link RatisManager}.
*/
public class RatisManagerImpl implements RatisManager {
static final RaftPeer[] EMPTY_RARTPEER_ARRAY = {};
static final class RatisCluster {
private final String clusterId;
private final LifeCycle state;
private List<DatanodeID> datanodes;
private RatisCluster(String clusterId, List<DatanodeID> datanodes) {
this.clusterId = clusterId;
this.state = new LifeCycle(toString());
this.datanodes = Collections.unmodifiableList(new ArrayList<>(datanodes));
}
synchronized List<DatanodeID> getDatanodes() {
return datanodes;
}
synchronized void setDatanodes(
CheckedSupplier<List<DatanodeID>, IOException> update)
throws IOException {
state.assertCurrentState(LifeCycle.State.RUNNING);
datanodes = Collections.unmodifiableList(update.get());
}
synchronized void init(CheckedRunnable<IOException> init)
throws IOException {
state.startAndTransition(() -> init.run());
}
synchronized void close(CheckedRunnable<IOException> close)
throws IOException {
state.checkStateAndClose(() -> close.run());
}
@Override
public String toString() {
return getClass().getSimpleName() + ":" + clusterId;
}
}
static final class RatisInfo {
private final RaftPeer peer;
private RatisInfo(DatanodeID datanode) {
this.peer = RatisHelper.toRaftPeer(datanode);
}
RaftPeer getPeer() {
return peer;
}
}
private final RpcType rpcType;
private final Map<String, RatisCluster> clusters = new ConcurrentHashMap<>();
private final Map<DatanodeID, RatisInfo> infos = new ConcurrentHashMap<>();
RatisManagerImpl(String rpc) {
rpcType = SupportedRpcType.valueOfIgnoreCase(rpc);
}
private RaftPeer getRaftPeer(DatanodeID datanode) {
return infos.computeIfAbsent(datanode, RatisInfo::new).getPeer();
}
@Override
public void createRatisCluster(String clusterId, List<DatanodeID> datanodes)
throws IOException {
final RatisCluster cluster = new RatisCluster(clusterId, datanodes);
final RatisCluster returned = clusters.putIfAbsent(clusterId, cluster);
if (returned != null) {
throw new IOException("Cluster " + clusterId + " already exists.");
}
final RaftPeer[] newPeers = datanodes.stream().map(this::getRaftPeer)
.toArray(RaftPeer[]::new);
cluster.init(() -> reinitialize(datanodes, newPeers));
}
private void reinitialize(List<DatanodeID> datanodes, RaftPeer[] newPeers)
throws IOException {
if (datanodes.isEmpty()) {
return;
}
IOException exception = null;
for (DatanodeID d : datanodes) {
try {
reinitialize(d, newPeers);
} catch (IOException ioe) {
if (exception == null) {
exception = new IOException(
"Failed to reinitialize some of the RaftPeer(s)", ioe);
} else {
exception.addSuppressed(ioe);
}
}
}
if (exception != null) {
throw exception;
}
}
private void reinitialize(DatanodeID datanode, RaftPeer[] newPeers)
throws IOException {
final RaftPeer p = getRaftPeer(datanode);
try (RaftClient client = RatisHelper.newRaftClient(rpcType, p)) {
client.reinitialize(newPeers, p.getId());
} catch (IOException ioe) {
throw new IOException("Failed to reinitialize RaftPeer " + p
+ "(datanode=" + datanode + ")", ioe);
}
}
@Override
public void closeRatisCluster(String clusterId) throws IOException {
final RatisCluster c = clusters.get(clusterId);
if (c == null) {
throw new IOException("Cluster " + clusterId + " not found.");
}
c.close(() -> reinitialize(c.getDatanodes(), EMPTY_RARTPEER_ARRAY));
}
@Override
public List<DatanodeID> getDatanodes(String clusterId) throws IOException {
return clusters.get(clusterId).getDatanodes();
}
@Override
public void updateDatanodes(String clusterId, List<DatanodeID> newDNs)
throws IOException {
final RatisCluster c = clusters.get(clusterId);
c.setDatanodes(() -> {
final List<DatanodeID> oldDNs = c.getDatanodes();
final RaftPeer[] newPeers = newDNs.stream().map(this::getRaftPeer)
.toArray(RaftPeer[]::new);
try (RaftClient client = newRaftClient(oldDNs)) {
client.setConfiguration(newPeers);
}
final List<DatanodeID> notInOld = newDNs.stream().filter(oldDNs::contains)
.collect(Collectors.toList());
reinitialize(notInOld, newPeers);
return newDNs;
});
}
private RaftClient newRaftClient(List<DatanodeID> datanodes)
throws IOException {
final List<RaftPeer> peers = datanodes.stream().map(this::getRaftPeer)
.collect(Collectors.toList());
return RatisHelper.newRaftClient(rpcType, peers.get(0).getId(), peers);
}
}

View File

@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.scm.ratis;
/**
* This package contains classes related to Apache Ratis for SCM.
*/

View File

@ -18,23 +18,23 @@
package org.apache.hadoop.ozone.container.ozoneimpl;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.RatisTestHelper;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.scm.ratis.RatisManager;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.ratis.RatisHelper;
import org.apache.hadoop.scm.XceiverClientRatis;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.util.CheckedBiConsumer;
import org.apache.ratis.util.CollectionUtils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
@ -93,25 +93,29 @@ public class TestOzoneContainerRatis {
.setHandlerType(OzoneConsts.OZONE_HANDLER_LOCAL)
.numDataNodes(numNodes)
.build();
cluster.waitOzoneReady();
final String containerName = OzoneUtils.getRequestID();
final List<DataNode> datanodes = cluster.getDataNodes();
final Pipeline pipeline = ContainerTestHelper.createPipeline(containerName,
CollectionUtils.as(datanodes, DataNode::getDatanodeId));
LOG.info("pipeline=" + pipeline);
// Create Ratis cluster
final RaftPeer[] peers = RatisHelper.toRaftPeerArray(pipeline);
for(RaftPeer p : peers) {
final RaftClient client = RatisHelper.newRaftClient(rpc, p);
client.reinitialize(peers, p.getId());
}
LOG.info("reinitialize done");
final XceiverClientSpi client = XceiverClientRatis.newXceiverClientRatis(
pipeline, conf);
try {
cluster.waitOzoneReady();
final String containerName = OzoneUtils.getRequestID();
final List<DataNode> datanodes = cluster.getDataNodes();
final Pipeline pipeline = ContainerTestHelper.createPipeline(
containerName,
CollectionUtils.as(datanodes, DataNode::getDatanodeId));
LOG.info("pipeline=" + pipeline);
// Create Ratis cluster
final String ratisId = "ratis1";
final RatisManager manager = RatisManager.newRatisManager(conf);
manager.createRatisCluster(ratisId, pipeline.getMachines());
LOG.info("Created RatisCluster " + ratisId);
// check Ratis cluster members
final List<DatanodeID> dns = manager.getDatanodes(ratisId);
Assert.assertEquals(pipeline.getMachines(), dns);
// run test
final XceiverClientSpi client = XceiverClientRatis.newXceiverClientRatis(
pipeline, conf);
test.accept(containerName, client);
} finally {
cluster.shutdown();

View File

@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.ozoneimpl;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.RatisTestHelper;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.scm.ratis.RatisManager;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
/**
 * Tests {@link RatisManager} against a {@link MiniOzoneCluster}: creating
 * Ratis clusters, closing them and updating their datanode lists.
 */
public class TestRatisManager {
  private static final Logger LOG = LoggerFactory.getLogger(
      TestRatisManager.class);

  /** @return a new configuration with the local storage root of this test. */
  static OzoneConfiguration newOzoneConfiguration() {
    final OzoneConfiguration conf = new OzoneConfiguration();
    ContainerTestHelper.setOzoneLocalStorageRoot(
        TestRatisManager.class, conf);
    return conf;
  }

  /** Set the timeout for every test. */
  @Rule
  public Timeout testTimeout = new Timeout(200_000);

  @Test
  public void testTestRatisManagerGrpc() throws Exception {
    runTestRatisManager(SupportedRpcType.GRPC);
  }

  @Test
  public void testTestRatisManagerNetty() throws Exception {
    runTestRatisManager(SupportedRpcType.NETTY);
  }

  /**
   * Start a five-datanode cluster, split the datanodes into three Ratis
   * clusters, close all but one randomly chosen cluster and then grow the
   * chosen one to contain every datanode.
   */
  private static void runTestRatisManager(RpcType rpc) throws Exception {
    LOG.info("runTestRatisManager, rpc=" + rpc);

    // create Ozone clusters
    final OzoneConfiguration conf = newOzoneConfiguration();
    RatisTestHelper.initRatisConf(rpc, conf);
    final MiniOzoneCluster cluster = new MiniOzoneCluster.Builder(conf)
        .setHandlerType(OzoneConsts.OZONE_HANDLER_LOCAL)
        .numDataNodes(5)
        .build();
    try {
      cluster.waitOzoneReady();

      final List<DatanodeID> allIds = cluster.getDataNodes().stream()
          .map(DataNode::getDatanodeId)
          .collect(Collectors.toList());

      final RatisManager manager = RatisManager.newRatisManager(conf);

      // Exclusive end index of each cluster's slice of allIds; each slice
      // starts where the previous one ended.
      final int[] idIndex = {3, 4, 5};
      int begin = 0;
      for (int i = 0; i < idIndex.length; i++) {
        final int end = idIndex[i];
        final List<DatanodeID> subIds = allIds.subList(begin, end);
        begin = end;

        // Create Ratis cluster
        final String ratisId = "ratis" + i;
        manager.createRatisCluster(ratisId, subIds);
        LOG.info("Created RatisCluster " + ratisId);

        // check Ratis cluster members
        Assert.assertEquals(subIds, manager.getDatanodes(ratisId));
      }

      // randomly close two of the clusters
      final int chosen = ThreadLocalRandom.current().nextInt(idIndex.length);
      LOG.info("chosen = " + chosen);

      for (int i = 0; i < idIndex.length; i++) {
        if (i == chosen) {
          continue;
        }
        manager.closeRatisCluster("ratis" + i);
      }

      // update datanodes
      final String ratisId = "ratis" + chosen;
      manager.updateDatanodes(ratisId, allIds);

      // check Ratis cluster members
      Assert.assertEquals(allIds, manager.getDatanodes(ratisId));
    } finally {
      cluster.shutdown();
    }
  }
}