HDDS-1779. TestWatchForCommit tests are flaky.Contributed by Shashikant Banerjee. (#1071)
This commit is contained in:
parent
79f6118dcc
commit
8ef2365ffd
|
@ -0,0 +1,156 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.client.rpc;
|
||||
|
||||
import org.apache.hadoop.conf.StorageUnit;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.scm.XceiverClientManager;
|
||||
import org.apache.hadoop.hdds.scm.XceiverClientRatis;
|
||||
import org.apache.hadoop.hdds.scm.XceiverClientReply;
|
||||
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
||||
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
|
||||
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.client.ObjectStore;
|
||||
import org.apache.hadoop.ozone.client.OzoneClient;
|
||||
import org.apache.hadoop.ozone.client.OzoneClientFactory;
|
||||
import org.apache.hadoop.ozone.container.ContainerTestHelper;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
|
||||
|
||||
/**
|
||||
* This class tests the 2 way commit in Ratis.
|
||||
*/
|
||||
public class Test2WayCommitInRatis {
|
||||
|
||||
private MiniOzoneCluster cluster;
|
||||
private OzoneClient client;
|
||||
private ObjectStore objectStore;
|
||||
private String volumeName;
|
||||
private String bucketName;
|
||||
private int chunkSize;
|
||||
private int flushSize;
|
||||
private int maxFlushSize;
|
||||
private int blockSize;
|
||||
private StorageContainerLocationProtocolClientSideTranslatorPB
|
||||
storageContainerLocationClient;
|
||||
private static String containerOwner = "OZONE";
|
||||
|
||||
/**
|
||||
* Create a MiniDFSCluster for testing.
|
||||
* <p>
|
||||
* Ozone is made active by setting OZONE_ENABLED = true
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
private void startCluster(OzoneConfiguration conf) throws Exception {
|
||||
chunkSize = 100;
|
||||
flushSize = 2 * chunkSize;
|
||||
maxFlushSize = 2 * flushSize;
|
||||
blockSize = 2 * maxFlushSize;
|
||||
|
||||
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
|
||||
conf.setTimeDuration(
|
||||
OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY,
|
||||
1, TimeUnit.SECONDS);
|
||||
|
||||
conf.setQuietMode(false);
|
||||
cluster = MiniOzoneCluster.newBuilder(conf)
|
||||
.setNumDatanodes(7)
|
||||
.setBlockSize(blockSize)
|
||||
.setChunkSize(chunkSize)
|
||||
.setStreamBufferFlushSize(flushSize)
|
||||
.setStreamBufferMaxSize(maxFlushSize)
|
||||
.setStreamBufferSizeUnit(StorageUnit.BYTES)
|
||||
.build();
|
||||
cluster.waitForClusterToBeReady();
|
||||
//the easiest way to create an open container is creating a key
|
||||
client = OzoneClientFactory.getClient(conf);
|
||||
objectStore = client.getObjectStore();
|
||||
volumeName = "watchforcommithandlingtest";
|
||||
bucketName = volumeName;
|
||||
objectStore.createVolume(volumeName);
|
||||
objectStore.getVolume(volumeName).createBucket(bucketName);
|
||||
storageContainerLocationClient = cluster
|
||||
.getStorageContainerLocationClient();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Shutdown MiniDFSCluster.
|
||||
*/
|
||||
private void shutdown() {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void test2WayCommitForRetryfailure() throws Exception {
|
||||
OzoneConfiguration conf = new OzoneConfiguration();
|
||||
conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
|
||||
TimeUnit.SECONDS);
|
||||
conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
|
||||
startCluster(conf);
|
||||
GenericTestUtils.LogCapturer logCapturer =
|
||||
GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
|
||||
XceiverClientManager clientManager = new XceiverClientManager(conf);
|
||||
|
||||
ContainerWithPipeline container1 = storageContainerLocationClient
|
||||
.allocateContainer(HddsProtos.ReplicationType.RATIS,
|
||||
HddsProtos.ReplicationFactor.THREE, containerOwner);
|
||||
XceiverClientSpi xceiverClient = clientManager
|
||||
.acquireClient(container1.getPipeline());
|
||||
Assert.assertEquals(1, xceiverClient.getRefcount());
|
||||
Assert.assertEquals(container1.getPipeline(),
|
||||
xceiverClient.getPipeline());
|
||||
Pipeline pipeline = xceiverClient.getPipeline();
|
||||
XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient;
|
||||
XceiverClientReply reply = xceiverClient.sendCommandAsync(
|
||||
ContainerTestHelper.getCreateContainerRequest(
|
||||
container1.getContainerInfo().getContainerID(),
|
||||
xceiverClient.getPipeline()));
|
||||
reply.getResponse().get();
|
||||
Assert.assertEquals(3, ratisClient.getCommitInfoMap().size());
|
||||
cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
|
||||
reply = xceiverClient.sendCommandAsync(ContainerTestHelper
|
||||
.getCloseContainer(pipeline,
|
||||
container1.getContainerInfo().getContainerID()));
|
||||
reply.getResponse().get();
|
||||
xceiverClient.watchForCommit(reply.getLogIndex(), 20000);
|
||||
|
||||
// commitInfo Map will be reduced to 2 here
|
||||
Assert.assertEquals(2, ratisClient.getCommitInfoMap().size());
|
||||
clientManager.releaseClient(xceiverClient, false);
|
||||
Assert.assertTrue(logCapturer.getOutput().contains("3 way commit failed"));
|
||||
Assert
|
||||
.assertTrue(logCapturer.getOutput().contains("Committed by majority"));
|
||||
logCapturer.stopCapturing();
|
||||
shutdown();
|
||||
}
|
||||
}
|
|
@ -46,7 +46,9 @@ import java.io.IOException;
|
|||
import java.io.OutputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
|
@ -135,7 +137,10 @@ public class TestWatchForCommit {
|
|||
OzoneConfiguration conf = new OzoneConfiguration();
|
||||
conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
|
||||
TimeUnit.SECONDS);
|
||||
conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 5);
|
||||
conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
|
||||
conf.setTimeDuration(
|
||||
OzoneConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
|
||||
1, TimeUnit.SECONDS);
|
||||
startCluster(conf);
|
||||
XceiverClientMetrics metrics =
|
||||
XceiverClientManager.getXceiverClientMetrics();
|
||||
|
@ -178,31 +183,24 @@ public class TestWatchForCommit {
|
|||
.getOutputStream();
|
||||
Assert.assertTrue(stream instanceof BlockOutputStream);
|
||||
BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
|
||||
|
||||
// we have just written data more than flush Size(2 chunks), at this time
|
||||
// buffer pool will have 3 buffers allocated worth of chunk size
|
||||
|
||||
Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
|
||||
// writtenDataLength as well flushedDataLength will be updated here
|
||||
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
|
||||
|
||||
Assert.assertEquals(maxFlushSize,
|
||||
blockOutputStream.getTotalDataFlushedLength());
|
||||
|
||||
// since data equals to maxBufferSize is written, this will be a blocking
|
||||
// call and hence will wait for atleast flushSize worth of data to get
|
||||
// acked by all servers right here
|
||||
Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
|
||||
|
||||
// watchForCommit will clean up atleast one entry from the map where each
|
||||
// entry corresponds to flushSize worth of data
|
||||
Assert.assertTrue(
|
||||
blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1);
|
||||
|
||||
// Now do a flush. This will flush the data and update the flush length and
|
||||
// the map.
|
||||
key.flush();
|
||||
|
||||
Assert.assertEquals(pendingWriteChunkCount,
|
||||
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
|
||||
Assert.assertEquals(pendingPutBlockCount,
|
||||
|
@ -213,19 +211,15 @@ public class TestWatchForCommit {
|
|||
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
|
||||
Assert.assertEquals(totalOpCount + 8,
|
||||
metrics.getTotalOpCount());
|
||||
|
||||
// Since the data in the buffer is already flushed, flush here will have
|
||||
// no impact on the counters and data structures
|
||||
|
||||
Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
|
||||
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
|
||||
|
||||
Assert.assertEquals(dataLength,
|
||||
blockOutputStream.getTotalDataFlushedLength());
|
||||
// flush will make sure one more entry gets updated in the map
|
||||
Assert.assertTrue(
|
||||
blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
|
||||
|
||||
XceiverClientRatis raftClient =
|
||||
(XceiverClientRatis) blockOutputStream.getXceiverClient();
|
||||
Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
|
||||
|
@ -235,11 +229,9 @@ public class TestWatchForCommit {
|
|||
// again write data with more than max buffer limit. This will call
|
||||
// watchForCommit again. Since the commit will happen 2 way, the
|
||||
// commitInfoMap will get updated for servers which are alive
|
||||
|
||||
// 4 writeChunks = maxFlushSize + 2 putBlocks will be discarded here
|
||||
// once exception is hit
|
||||
key.write(data1);
|
||||
|
||||
// As a part of handling the exception, 4 failed writeChunks will be
|
||||
// rewritten plus one partial chunk plus two putBlocks for flushSize
|
||||
// and one flush for partial chunk
|
||||
|
@ -282,7 +274,7 @@ public class TestWatchForCommit {
|
|||
OzoneConfiguration conf = new OzoneConfiguration();
|
||||
conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3,
|
||||
TimeUnit.SECONDS);
|
||||
conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
|
||||
conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
|
||||
startCluster(conf);
|
||||
XceiverClientManager clientManager = new XceiverClientManager(conf);
|
||||
ContainerWithPipeline container1 = storageContainerLocationClient
|
||||
|
@ -303,8 +295,11 @@ public class TestWatchForCommit {
|
|||
cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
|
||||
cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
|
||||
try {
|
||||
// just watch for a lo index which in not updated in the commitInfo Map
|
||||
xceiverClient.watchForCommit(index + 1, 3000);
|
||||
// just watch for a log index which in not updated in the commitInfo Map
|
||||
// as well as there is no logIndex generate in Ratis.
|
||||
// The basic idea here is just to test if its throws an exception.
|
||||
xceiverClient
|
||||
.watchForCommit(index + new Random().nextInt(100) + 10, 3000);
|
||||
Assert.fail("expected exception not thrown");
|
||||
} catch (Exception e) {
|
||||
Assert.assertTrue(
|
||||
|
@ -321,7 +316,7 @@ public class TestWatchForCommit {
|
|||
OzoneConfiguration conf = new OzoneConfiguration();
|
||||
conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT,
|
||||
100, TimeUnit.SECONDS);
|
||||
conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
|
||||
conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
|
||||
startCluster(conf);
|
||||
XceiverClientManager clientManager = new XceiverClientManager(conf);
|
||||
ContainerWithPipeline container1 = storageContainerLocationClient
|
||||
|
@ -343,67 +338,30 @@ public class TestWatchForCommit {
|
|||
cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
|
||||
// again write data with more than max buffer limit. This wi
|
||||
try {
|
||||
// just watch for a lo index which in not updated in the commitInfo Map
|
||||
xceiverClient.watchForCommit(index + 1, 20000);
|
||||
// just watch for a log index which in not updated in the commitInfo Map
|
||||
// as well as there is no logIndex generate in Ratis.
|
||||
// The basic idea here is just to test if its throws an exception.
|
||||
xceiverClient
|
||||
.watchForCommit(index + new Random().nextInt(100) + 10, 20000);
|
||||
Assert.fail("expected exception not thrown");
|
||||
} catch (Exception e) {
|
||||
Assert.assertTrue(HddsClientUtils
|
||||
.checkForException(e) instanceof RaftRetryFailureException);
|
||||
Assert.assertTrue(e instanceof ExecutionException);
|
||||
// since the timeout value is quite long, the watch request will either
|
||||
// fail with NotReplicated exceptio, RetryFailureException or
|
||||
// RuntimeException
|
||||
Assert.assertFalse(HddsClientUtils
|
||||
.checkForException(e) instanceof TimeoutException);
|
||||
}
|
||||
clientManager.releaseClient(xceiverClient, false);
|
||||
shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test2WayCommitForRetryfailure() throws Exception {
|
||||
OzoneConfiguration conf = new OzoneConfiguration();
|
||||
conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
|
||||
TimeUnit.SECONDS);
|
||||
conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 8);
|
||||
startCluster(conf);
|
||||
GenericTestUtils.LogCapturer logCapturer =
|
||||
GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
|
||||
XceiverClientManager clientManager = new XceiverClientManager(conf);
|
||||
|
||||
ContainerWithPipeline container1 = storageContainerLocationClient
|
||||
.allocateContainer(HddsProtos.ReplicationType.RATIS,
|
||||
HddsProtos.ReplicationFactor.THREE, containerOwner);
|
||||
XceiverClientSpi xceiverClient = clientManager
|
||||
.acquireClient(container1.getPipeline());
|
||||
Assert.assertEquals(1, xceiverClient.getRefcount());
|
||||
Assert.assertEquals(container1.getPipeline(),
|
||||
xceiverClient.getPipeline());
|
||||
Pipeline pipeline = xceiverClient.getPipeline();
|
||||
XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient;
|
||||
XceiverClientReply reply = xceiverClient.sendCommandAsync(
|
||||
ContainerTestHelper.getCreateContainerRequest(
|
||||
container1.getContainerInfo().getContainerID(),
|
||||
xceiverClient.getPipeline()));
|
||||
reply.getResponse().get();
|
||||
Assert.assertEquals(3, ratisClient.getCommitInfoMap().size());
|
||||
cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
|
||||
reply = xceiverClient.sendCommandAsync(ContainerTestHelper
|
||||
.getCloseContainer(pipeline,
|
||||
container1.getContainerInfo().getContainerID()));
|
||||
reply.getResponse().get();
|
||||
xceiverClient.watchForCommit(reply.getLogIndex(), 20000);
|
||||
|
||||
// commitInfo Map will be reduced to 2 here
|
||||
Assert.assertEquals(2, ratisClient.getCommitInfoMap().size());
|
||||
clientManager.releaseClient(xceiverClient, false);
|
||||
Assert.assertTrue(logCapturer.getOutput().contains("3 way commit failed"));
|
||||
Assert
|
||||
.assertTrue(logCapturer.getOutput().contains("Committed by majority"));
|
||||
logCapturer.stopCapturing();
|
||||
shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test2WayCommitForTimeoutException() throws Exception {
|
||||
OzoneConfiguration conf = new OzoneConfiguration();
|
||||
conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3,
|
||||
TimeUnit.SECONDS);
|
||||
conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
|
||||
conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
|
||||
startCluster(conf);
|
||||
GenericTestUtils.LogCapturer logCapturer =
|
||||
GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
|
||||
|
@ -477,8 +435,12 @@ public class TestWatchForCommit {
|
|||
pipelineList.add(pipeline);
|
||||
ContainerTestHelper.waitForPipelineClose(pipelineList, cluster);
|
||||
try {
|
||||
// just watch for a lo index which in not updated in the commitInfo Map
|
||||
xceiverClient.watchForCommit(reply.getLogIndex() + 1, 20000);
|
||||
// just watch for a log index which in not updated in the commitInfo Map
|
||||
// as well as there is no logIndex generate in Ratis.
|
||||
// The basic idea here is just to test if its throws an exception.
|
||||
xceiverClient
|
||||
.watchForCommit(reply.getLogIndex() + new Random().nextInt(100) + 10,
|
||||
20000);
|
||||
Assert.fail("Expected exception not thrown");
|
||||
} catch(Exception e) {
|
||||
Assert.assertTrue(HddsClientUtils
|
||||
|
|
Loading…
Reference in New Issue