diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java index 3fabe48bdfc..de159abbce2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java @@ -19,15 +19,18 @@ package org.apache.ratis; import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.apache.ratis.client.RaftClient; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.apache.ratis.util.SizeInBytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,6 +116,8 @@ public interface RatisHelper { LOG.trace("newRaftClient: {}, leader={}, group={}", rpcType, leader, group); final RaftProperties properties = new RaftProperties(); RaftConfigKeys.Rpc.setType(properties, rpcType); + GrpcConfigKeys.setMessageSizeMax(properties, + SizeInBytes.valueOf(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)); return RaftClient.newBuilder() .setRaftGroup(group) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index c96cc5d0a07..569fb2333f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -26,17 +26,17 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.WriteChunkReq import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.Message; -import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto; -import org.apache.ratis.statemachine.BaseStateMachine; -import org.apache.ratis.statemachine.SimpleStateMachineStorage; +import org.apache.ratis.statemachine.impl.BaseStateMachine; +import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import org.apache.ratis.statemachine.StateMachineStorage; import org.apache.ratis.statemachine.TransactionContext; +import org.apache.ratis.statemachine.impl.TransactionContextImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.ratis.shaded.com.google.protobuf.ByteString; @@ -55,8 +55,7 @@ import java.util.concurrent.ConcurrentHashMap; * * Read only requests are classified in * {@link org.apache.hadoop.scm.XceiverClientRatis#isReadOnly} - * and these readonly requests are replied from the - * {@link #query(RaftClientRequest)} + * and these readonly requests are replied from the {@link #query(Message)}. * * The write requests can be divided into requests with user data * (WriteChunkRequest) and other request without user data. @@ -90,7 +89,7 @@ public class ContainerStateMachine extends BaseStateMachine { = new SimpleStateMachineStorage(); private final ContainerDispatcher dispatcher; private ThreadPoolExecutor writeChunkExecutor; - private final ConcurrentHashMap> + private final ConcurrentHashMap> writeChunkFutureMap; private final ConcurrentHashMap> createContainerFutureMap; @@ -171,7 +170,7 @@ public class ContainerStateMachine extends BaseStateMachine { .setData(request.getMessage().getContent()) .build(); } - return new TransactionContext(this, request, log); + return new TransactionContextImpl(this, request, log); } private ByteString getShadedByteString(ContainerCommandRequestProto proto) { @@ -191,34 +190,47 @@ public class ContainerStateMachine extends BaseStateMachine { return () -> ShadedProtoUtil.asShadedByteString(response.toByteArray()); } + private CompletableFuture handleWriteChunk( + ContainerCommandRequestProto requestProto, long entryIndex) { + final WriteChunkRequestProto write = requestProto.getWriteChunk(); + String containerName = write.getPipeline().getContainerName(); + CompletableFuture future = + createContainerFutureMap.get(containerName); + CompletableFuture writeChunkFuture; + if (future != null) { + writeChunkFuture = future.thenApplyAsync( + v -> runCommand(requestProto), writeChunkExecutor); + } else { + writeChunkFuture = CompletableFuture.supplyAsync( + () -> runCommand(requestProto), writeChunkExecutor); + } + writeChunkFutureMap.put(entryIndex, writeChunkFuture); + return writeChunkFuture; + } + + private CompletableFuture handleCreateContainer( + ContainerCommandRequestProto requestProto) { + String containerName = + requestProto.getCreateContainer().getContainerData().getName(); + createContainerFutureMap. + computeIfAbsent(containerName, k -> new CompletableFuture<>()); + return CompletableFuture.completedFuture(() -> ByteString.EMPTY); + } + @Override public CompletableFuture writeStateMachineData(LogEntryProto entry) { try { final ContainerCommandRequestProto requestProto = getRequestProto(entry.getSmLogEntry().getStateMachineData()); - if (requestProto.getCmdType() == ContainerProtos.Type.CreateContainer) { - String containerName = - requestProto.getCreateContainer().getContainerData().getName(); - createContainerFutureMap. - computeIfAbsent(containerName, k -> new CompletableFuture<>()); - return CompletableFuture.completedFuture(() -> ByteString.EMPTY); - } else { - final WriteChunkRequestProto write = requestProto.getWriteChunk(); - String containerName = write.getPipeline().getContainerName(); - CompletableFuture future = - createContainerFutureMap.get(containerName); - - CompletableFuture writeChunkFuture; - if (future != null) { - writeChunkFuture = future.thenApplyAsync( - v -> runCommand(requestProto), writeChunkExecutor); - } else { - writeChunkFuture = CompletableFuture.supplyAsync( - () -> runCommand(requestProto), writeChunkExecutor); - } - writeChunkFutureMap - .put(write.getChunkData().getChunkName(), writeChunkFuture); - return writeChunkFuture; + ContainerProtos.Type cmdType = requestProto.getCmdType(); + switch (cmdType) { + case CreateContainer: + return handleCreateContainer(requestProto); + case WriteChunk: + return handleWriteChunk(requestProto, entry.getIndex()); + default: + throw new IllegalStateException("Cmd Type:" + cmdType + + " should not have state machine data"); } } catch (IOException e) { return completeExceptionally(e); @@ -226,13 +238,11 @@ public class ContainerStateMachine extends BaseStateMachine { } @Override - public CompletableFuture query(RaftClientRequest request) { + public CompletableFuture query(Message request) { try { final ContainerCommandRequestProto requestProto = - getRequestProto(request.getMessage().getContent()); - RaftClientReply raftClientReply = - new RaftClientReply(request, runCommand(requestProto)); - return CompletableFuture.completedFuture(raftClientReply); + getRequestProto(request.getContent()); + return CompletableFuture.completedFuture(runCommand(requestProto)); } catch (IOException e) { return completeExceptionally(e); } @@ -243,19 +253,20 @@ public class ContainerStateMachine extends BaseStateMachine { try { ContainerCommandRequestProto requestProto = getRequestProto(trx.getSMLogEntry().getData()); + ContainerProtos.Type cmdType = requestProto.getCmdType(); - if (requestProto.getCmdType() == ContainerProtos.Type.WriteChunk) { + if (cmdType == ContainerProtos.Type.WriteChunk) { WriteChunkRequestProto write = requestProto.getWriteChunk(); // the data field has already been removed in start Transaction Preconditions.checkArgument(!write.hasData()); CompletableFuture stateMachineFuture = - writeChunkFutureMap.remove(write.getChunkData().getChunkName()); + writeChunkFutureMap.remove(trx.getLogEntry().getIndex()); return stateMachineFuture .thenComposeAsync(v -> CompletableFuture.completedFuture(runCommand(requestProto))); } else { Message message = runCommand(requestProto); - if (requestProto.getCmdType() == ContainerProtos.Type.CreateContainer) { + if (cmdType == ContainerProtos.Type.CreateContainer) { String containerName = requestProto.getCreateContainer().getContainerData().getName(); createContainerFutureMap.remove(containerName).complete(message); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/tools/TestDataValidate.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/tools/TestDataValidate.java new file mode 100644 index 00000000000..0ba1cdebedf --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/tools/TestDataValidate.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.tools; + +import org.apache.hadoop.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.MiniOzoneClassicCluster; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.util.ToolRunner; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.List; + +/** + * Tests Freon, with MiniOzoneCluster and validate data. + */ +public class TestDataValidate { + + private static MiniOzoneCluster cluster; + private static OzoneConfiguration conf; + + /** + * Create a MiniDFSCluster for testing. + *

+ * Ozone is made active by setting OZONE_ENABLED = true and + * OZONE_HANDLER_TYPE_KEY = "distributed" + * + * @throws IOException + */ + @BeforeClass + public static void init() throws Exception { + conf = new OzoneConfiguration(); + conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY, + OzoneConsts.OZONE_HANDLER_DISTRIBUTED); + cluster = new MiniOzoneClassicCluster.Builder(conf) + .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED) + .numDataNodes(5).build(); + } + + /** + * Shutdown MiniDFSCluster. + */ + @AfterClass + public static void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void ratisTestLargeKey() throws Exception { + List args = new ArrayList<>(); + args.add("-validateWrites"); + args.add("-numOfVolumes"); + args.add("1"); + args.add("-numOfBuckets"); + args.add("1"); + args.add("-numOfKeys"); + args.add("1"); + args.add("-ratis"); + args.add("3"); + args.add("-keySize"); + args.add("104857600"); + Freon freon = new Freon(conf); + int res = ToolRunner.run(conf, freon, + args.toArray(new String[0])); + Assert.assertEquals(1, freon.getNumberOfVolumesCreated()); + Assert.assertEquals(1, freon.getNumberOfBucketsCreated()); + Assert.assertEquals(1, freon.getNumberOfKeysAdded()); + Assert.assertEquals(0, freon.getUnsuccessfulValidationCount()); + Assert.assertEquals(0, res); + } + + @Test + public void standaloneTestLargeKey() throws Exception { + List args = new ArrayList<>(); + args.add("-validateWrites"); + args.add("-numOfVolumes"); + args.add("1"); + args.add("-numOfBuckets"); + args.add("1"); + args.add("-numOfKeys"); + args.add("1"); + args.add("-keySize"); + args.add("104857600"); + Freon freon = new Freon(conf); + int res = ToolRunner.run(conf, freon, + args.toArray(new String[0])); + Assert.assertEquals(1, freon.getNumberOfVolumesCreated()); + Assert.assertEquals(1, freon.getNumberOfBucketsCreated()); + Assert.assertEquals(1, freon.getNumberOfKeysAdded()); + Assert.assertEquals(0, freon.getUnsuccessfulValidationCount()); + Assert.assertEquals(0, res); + } + + @Test + public void validateWriteTest() throws Exception { + PrintStream originalStream = System.out; + ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + System.setOut(new PrintStream(outStream)); + List args = new ArrayList<>(); + args.add("-validateWrites"); + args.add("-numOfVolumes"); + args.add("2"); + args.add("-numOfBuckets"); + args.add("5"); + args.add("-numOfKeys"); + args.add("10"); + Freon freon = new Freon(conf); + int res = ToolRunner.run(conf, freon, + args.toArray(new String[0])); + Assert.assertEquals(0, res); + Assert.assertEquals(2, freon.getNumberOfVolumesCreated()); + Assert.assertEquals(10, freon.getNumberOfBucketsCreated()); + Assert.assertEquals(100, freon.getNumberOfKeysAdded()); + Assert.assertTrue(freon.getValidateWrites()); + Assert.assertNotEquals(0, freon.getTotalKeysValidated()); + Assert.assertNotEquals(0, freon.getSuccessfulValidationCount()); + Assert.assertEquals(0, freon.getUnsuccessfulValidationCount()); + System.setOut(originalStream); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/tools/TestFreon.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/tools/TestFreon.java index c356ea30f03..d002e9f26d5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/tools/TestFreon.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/tools/TestFreon.java @@ -29,9 +29,7 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.PrintStream; import java.util.ArrayList; import java.util.List; @@ -90,33 +88,6 @@ public class TestFreon { Assert.assertEquals(0, res); } - @Test - public void validateWriteTest() throws Exception { - PrintStream originalStream = System.out; - ByteArrayOutputStream outStream = new ByteArrayOutputStream(); - System.setOut(new PrintStream(outStream)); - List args = new ArrayList<>(); - args.add("-validateWrites"); - args.add("-numOfVolumes"); - args.add("2"); - args.add("-numOfBuckets"); - args.add("5"); - args.add("-numOfKeys"); - args.add("10"); - Freon freon = new Freon(conf); - int res = ToolRunner.run(conf, freon, - args.toArray(new String[0])); - Assert.assertEquals(0, res); - Assert.assertEquals(2, freon.getNumberOfVolumesCreated()); - Assert.assertEquals(10, freon.getNumberOfBucketsCreated()); - Assert.assertEquals(100, freon.getNumberOfKeysAdded()); - Assert.assertTrue(freon.getValidateWrites()); - Assert.assertNotEquals(0, freon.getTotalKeysValidated()); - Assert.assertNotEquals(0, freon.getSuccessfulValidationCount()); - Assert.assertEquals(0, freon.getUnsuccessfulValidationCount()); - System.setOut(originalStream); - } - @Test public void multiThread() throws Exception { List args = new ArrayList<>();