Revert "HDDS-1610. applyTransaction failure should not be lost on restart. Contributed by Shashikant Banerjee."
This reverts commit 62445021d5
as it has unintended changes in DirectoryWithSnapshotFeature class..
This commit is contained in:
parent
8ab7020e64
commit
ce8eb1283a
|
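What the revert undoes, in brief: HDDS-1610 made ContainerStateMachine remember a failed applyTransaction in an isStateMachineHealthy flag and refuse to take further snapshots, so a failure could not be silently lost across a datanode restart. This revert removes that guard together with the matching pipeline-close plumbing and regression test. Below is a condensed sketch of the removed guard, reconstructed from the takeSnapshot() hunk that follows; it is not the complete method (gid, LOG and the snapshot-writing code live in the surrounding class):

    // Sketch only: condensed from the deletion in the first diff below.
    public long takeSnapshot() throws IOException {
      TermIndex ti = getLastAppliedTermIndex();
      if (!isStateMachineHealthy.get()) {
        String msg = "Failed to take snapshot for " + gid
            + " as the stateMachine is unhealthy."
            + " The last applied index is at " + ti;
        LOG.error(msg);
        // Thrown into Ratis' stateMachineUpdater, which shuts the server
        // down rather than snapshotting past the failed transaction.
        throw new StateMachineException(msg);
      }
      // ... write and fsync the snapshot file for ti, as before ...
      return ti.getIndex();
    }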
ContainerStateMachine.java:

@@ -34,7 +34,6 @@ import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
 import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.protocol.StateMachineException;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.server.protocol.TermIndex;
@@ -84,7 +83,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
@@ -149,7 +147,6 @@ public class ContainerStateMachine extends BaseStateMachine {
   private final Cache<Long, ByteString> stateMachineDataCache;
   private final boolean isBlockTokenEnabled;
   private final TokenVerifier tokenVerifier;
-  private final AtomicBoolean isStateMachineHealthy;
 
   private final Semaphore applyTransactionSemaphore;
   /**
@@ -187,7 +184,6 @@ public class ContainerStateMachine extends BaseStateMachine {
         ScmConfigKeys.
             DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS_DEFAULT);
     applyTransactionSemaphore = new Semaphore(maxPendingApplyTransactions);
-    isStateMachineHealthy = new AtomicBoolean(true);
     this.executors = new ExecutorService[numContainerOpExecutors];
     for (int i = 0; i < numContainerOpExecutors; i++) {
       final int index = i;
@@ -269,14 +265,6 @@ public class ContainerStateMachine extends BaseStateMachine {
   public long takeSnapshot() throws IOException {
     TermIndex ti = getLastAppliedTermIndex();
     long startTime = Time.monotonicNow();
-    if (!isStateMachineHealthy.get()) {
-      String msg =
-          "Failed to take snapshot " + " for " + gid + " as the stateMachine"
-              + " is unhealthy. The last applied index is at " + ti;
-      StateMachineException sme = new StateMachineException(msg);
-      LOG.error(msg);
-      throw sme;
-    }
     if (ti != null && ti.getIndex() != RaftLog.INVALID_LOG_INDEX) {
       final File snapshotFile =
           storage.getSnapshotFile(ti.getTerm(), ti.getIndex());
@@ -287,12 +275,12 @@ public class ContainerStateMachine extends BaseStateMachine {
         // make sure the snapshot file is synced
         fos.getFD().sync();
       } catch (IOException ioe) {
-        LOG.error("{}: Failed to write snapshot at:{} file {}", gid, ti,
+        LOG.info("{}: Failed to write snapshot at:{} file {}", gid, ti,
             snapshotFile);
         throw ioe;
       }
-      LOG.info("{}: Finished taking a snapshot at:{} file:{} time:{}", gid, ti,
-          snapshotFile, (Time.monotonicNow() - startTime));
+      LOG.info("{}: Finished taking a snapshot at:{} file:{} time:{}",
+          gid, ti, snapshotFile, (Time.monotonicNow() - startTime));
       return ti.getIndex();
     }
     return -1;
@@ -397,12 +385,17 @@ public class ContainerStateMachine extends BaseStateMachine {
     return response;
   }
 
-  private ContainerCommandResponseProto runCommand(
+  private ContainerCommandResponseProto runCommandGetResponse(
       ContainerCommandRequestProto requestProto,
       DispatcherContext context) {
     return dispatchCommand(requestProto, context);
   }
 
+  private Message runCommand(ContainerCommandRequestProto requestProto,
+      DispatcherContext context) {
+    return runCommandGetResponse(requestProto, context)::toByteString;
+  }
+
   private ExecutorService getCommandExecutor(
       ContainerCommandRequestProto requestProto) {
     int executorId = (int)(requestProto.getContainerID() % executors.length);
@@ -432,7 +425,7 @@ public class ContainerStateMachine extends BaseStateMachine {
     // thread.
     CompletableFuture<ContainerCommandResponseProto> writeChunkFuture =
         CompletableFuture.supplyAsync(() ->
-            runCommand(requestProto, context), chunkExecutor);
+            runCommandGetResponse(requestProto, context), chunkExecutor);
 
     CompletableFuture<Message> raftFuture = new CompletableFuture<>();
 
@@ -509,8 +502,7 @@ public class ContainerStateMachine extends BaseStateMachine {
       metrics.incNumQueryStateMachineOps();
       final ContainerCommandRequestProto requestProto =
           getContainerCommandRequestProto(request.getContent());
-      return CompletableFuture
-          .completedFuture(runCommand(requestProto, null)::toByteString);
+      return CompletableFuture.completedFuture(runCommand(requestProto, null));
     } catch (IOException e) {
       metrics.incNumQueryStateMachineFails();
       return completeExceptionally(e);
@@ -682,58 +674,30 @@ public class ContainerStateMachine extends BaseStateMachine {
       if (cmdType == Type.WriteChunk || cmdType ==Type.PutSmallFile) {
         builder.setCreateContainerSet(createContainerSet);
       }
-      CompletableFuture<Message> applyTransactionFuture =
-          new CompletableFuture<>();
       // Ensure the command gets executed in a separate thread than
       // stateMachineUpdater thread which is calling applyTransaction here.
-      CompletableFuture<ContainerCommandResponseProto> future =
-          CompletableFuture.supplyAsync(
-              () -> runCommand(requestProto, builder.build()),
+      CompletableFuture<Message> future = CompletableFuture
+          .supplyAsync(() -> runCommand(requestProto, builder.build()),
               getCommandExecutor(requestProto));
-      future.thenApply(r -> {
+
+      future.thenAccept(m -> {
         if (trx.getServerRole() == RaftPeerRole.LEADER) {
           long startTime = (long) trx.getStateMachineContext();
           metrics.incPipelineLatency(cmdType,
              Time.monotonicNowNanos() - startTime);
        }
-        if (r.getResult() != ContainerProtos.Result.SUCCESS) {
-          StorageContainerException sce =
-              new StorageContainerException(r.getMessage(), r.getResult());
-          LOG.error(
-              "gid {} : ApplyTransaction failed. cmd {} logIndex {} msg : "
-                  + "{} Container Result: {}", gid, r.getCmdType(), index,
-              r.getMessage(), r.getResult());
-          metrics.incNumApplyTransactionsFails();
-          // Since the applyTransaction now is completed exceptionally,
-          // before any further snapshot is taken , the exception will be
-          // caught in stateMachineUpdater in Ratis and ratis server will
-          // shutdown.
-          applyTransactionFuture.completeExceptionally(sce);
-          isStateMachineHealthy.compareAndSet(true, false);
-          ratisServer.handleApplyTransactionFailure(gid, trx.getServerRole());
-        } else {
-          LOG.debug(
-              "gid {} : ApplyTransaction completed. cmd {} logIndex {} msg : "
-                  + "{} Container Result: {}", gid, r.getCmdType(), index,
-              r.getMessage(), r.getResult());
-          applyTransactionFuture.complete(r::toByteString);
-          if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile) {
-            metrics.incNumBytesCommittedCount(
-                requestProto.getWriteChunk().getChunkData().getLen());
-          }
-          // add the entry to the applyTransactionCompletionMap only if the
-          // stateMachine is healthy i.e, there has been no applyTransaction
-          // failures before.
-          if (isStateMachineHealthy.get()) {
-            final Long previous = applyTransactionCompletionMap
+
+        final Long previous =
+            applyTransactionCompletionMap
                 .put(index, trx.getLogEntry().getTerm());
         Preconditions.checkState(previous == null);
-            updateLastApplied();
-          }
+        if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile) {
+          metrics.incNumBytesCommittedCount(
+              requestProto.getWriteChunk().getChunkData().getLen());
         }
-        return applyTransactionFuture;
+        updateLastApplied();
       }).whenComplete((r, t) -> applyTransactionSemaphore.release());
-      return applyTransactionFuture;
+      return future;
     } catch (IOException | InterruptedException e) {
       metrics.incNumApplyTransactionsFails();
       return completeExceptionally(e);
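The last hunk above is the heart of the revert: with HDDS-1610, applyTransaction inspected the command result and completed the returned future exceptionally on a non-SUCCESS result, so Ratis would stop advancing the applied index instead of recording a failed transaction as applied; after the revert, the response is wrapped and returned without inspection. A small self-contained Java sketch of that completeExceptionally pattern, using hypothetical names and a stand-in result value rather than the Ozone classes:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ApplyTransactionSketch {
      public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<String> applyTransactionFuture = new CompletableFuture<>();
        executor.execute(() -> {
          boolean success = false;       // simulate a failed container command
          String result = "container is unhealthy";
          if (!success) {
            // HDDS-1610's approach: complete exceptionally so the caller
            // (Ratis' stateMachineUpdater) observes the failure and never
            // records the transaction as applied.
            applyTransactionFuture.completeExceptionally(
                new IllegalStateException(result));
          } else {
            applyTransactionFuture.complete(result);
          }
        });
        String outcome = applyTransactionFuture
            .handle((v, t) -> t == null ? "applied: " + v
                : "applyTransaction failed: " + t.getMessage())
            .join();
        System.out.println(outcome);
        executor.shutdown();
      }
    }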
XceiverServerRatis.java:

@@ -609,15 +609,6 @@ public final class XceiverServerRatis extends XceiverServer {
     handlePipelineFailure(groupId, roleInfoProto);
   }
 
-  void handleApplyTransactionFailure(RaftGroupId groupId,
-      RaftProtos.RaftPeerRole role) {
-    UUID dnId = RatisHelper.toDatanodeId(getServer().getId());
-    String msg =
-        "Ratis Transaction failure in datanode " + dnId + " with role " + role
-            + " .Triggering pipeline close action.";
-    triggerPipelineClose(groupId, msg,
-        ClosePipelineInfo.Reason.STATEMACHINE_TRANSACTION_FAILED, true);
-  }
   /**
    * The fact that the snapshot contents cannot be used to actually catch up
    * the follower, it is the reason to initiate close pipeline and
StorageContainerDatanodeProtocol.proto:

@@ -214,7 +214,6 @@ message ClosePipelineInfo {
   enum Reason {
     PIPELINE_FAILED = 1;
     PIPELINE_LOG_FAILED = 2;
-    STATEMACHINE_TRANSACTION_FAILED = 3;
   }
   required PipelineID pipelineID = 1;
   optional Reason reason = 3;
DirectoryWithSnapshotFeature.java (the unintended change named in the commit message: HDDS-1610 had accidentally commented out this snapshot-cleanup call, and the revert restores it):

@@ -742,8 +742,8 @@ public class DirectoryWithSnapshotFeature implements INode.Feature {
       if (currentINode.isLastReference()) {
         // if this is the last reference, the created list can be
         // destroyed.
-        // priorDiff.getChildrenDiff().destroyCreatedList(
-        //     reclaimContext, currentINode);
+        priorDiff.getChildrenDiff().destroyCreatedList(
+            reclaimContext, currentINode);
       } else {
         // we only check the node originally in prior's created list
         for (INode cNode : priorDiff.diff.getCreatedUnmodifiable()) {
TestContainerStateMachineFailures.java:

@@ -22,32 +22,23 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
-import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
-import org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.ratis.protocol.RaftRetryFailureException;
-import org.apache.ratis.protocol.StateMachineException;
-import org.apache.ratis.server.storage.FileInfo;
-import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -55,7 +46,6 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -64,8 +54,7 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.
     HDDS_COMMAND_STATUS_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.HddsConfigKeys.
     HDDS_CONTAINER_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
-    ContainerDataProto.State.UNHEALTHY;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
     HDDS_SCM_WATCHER_TIMEOUT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
@@ -88,7 +77,7 @@ public class TestContainerStateMachineFailures {
   private static String volumeName;
   private static String bucketName;
   private static String path;
-  private static XceiverClientManager xceiverClientManager;
+  private static int chunkSize;
 
   /**
   * Create a MiniDFSCluster for testing.
@@ -112,11 +101,6 @@ public class TestContainerStateMachineFailures {
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
     conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 10,
         TimeUnit.SECONDS);
-    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
-    conf.setTimeDuration(
-        OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY,
-        1, TimeUnit.SECONDS);
-    conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1);
     conf.setQuietMode(false);
     cluster =
         MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).setHbInterval(200)
@@ -125,7 +109,6 @@ public class TestContainerStateMachineFailures {
     //the easiest way to create an open container is creating a key
     client = OzoneClientFactory.getClient(conf);
     objectStore = client.getObjectStore();
-    xceiverClientManager = new XceiverClientManager(conf);
     volumeName = "testcontainerstatemachinefailures";
     bucketName = volumeName;
     objectStore.createVolume(volumeName);
@@ -149,10 +132,19 @@ public class TestContainerStateMachineFailures {
             .createKey("ratis", 1024, ReplicationType.RATIS,
                 ReplicationFactor.ONE, new HashMap<>());
     byte[] testData = "ratis".getBytes();
+    long written = 0;
     // First write and flush creates a container in the datanode
     key.write(testData);
+    written += testData.length;
     key.flush();
     key.write(testData);
+    written += testData.length;
+
+    //get the name of a valid container
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName).
+        setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName("ratis")
+        .build();
     KeyOutputStream groupOutputStream =
         (KeyOutputStream) key.getOutputStream();
     List<OmKeyLocationInfo> locationInfoList =
@@ -165,14 +157,7 @@ public class TestContainerStateMachineFailures {
             .getContainer().getContainerSet()
             .getContainer(omKeyLocationInfo.getContainerID()).getContainerData()
             .getContainerPath()));
-    try {
-      // there is only 1 datanode in the pipeline, the pipeline will be closed
-      // and allocation to new pipeline will fail as there is no other dn in
-      // the cluster
-      key.close();
-    } catch(IOException ioe) {
-      Assert.assertTrue(ioe instanceof OMException);
-    }
+    key.close();
     long containerID = omKeyLocationInfo.getContainerID();
 
     // Make sure the container is marked unhealthy
@@ -194,6 +179,22 @@ public class TestContainerStateMachineFailures {
         .getDatanodeStateMachine().getContainer();
     Assert
         .assertNull(ozoneContainer.getContainerSet().getContainer(containerID));
+
+    OzoneKeyDetails keyDetails = objectStore.getVolume(volumeName)
+        .getBucket(bucketName).getKey("ratis");
+
+    /**
+     * Ensure length of data stored in key is equal to number of bytes written.
+     */
+    Assert.assertTrue("Number of bytes stored in the key is not equal " +
+        "to number of bytes written.", keyDetails.getDataSize() == written);
+
+    /**
+     * Pending data from the second write should get written to a new container
+     * during key.close() because the first container is UNHEALTHY by that time
+     */
+    Assert.assertTrue("Expect Key to be stored in 2 separate containers",
+        keyDetails.getOzoneKeyLocations().size() == 2);
   }
 
   @Test
@@ -206,6 +207,12 @@ public class TestContainerStateMachineFailures {
     key.write("ratis".getBytes());
     key.flush();
     key.write("ratis".getBytes());
+
+    //get the name of a valid container
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName).
+        setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName("ratis")
+        .build();
     KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
     List<OmKeyLocationInfo> locationInfoList =
         groupOutputStream.getLocationInfoList();
@@ -221,14 +228,8 @@ public class TestContainerStateMachineFailures {
         (KeyValueContainerData) containerData;
     // delete the container db file
     FileUtil.fullyDelete(new File(keyValueContainerData.getChunksPath()));
-    try {
-      // there is only 1 datanode in the pipeline, the pipeline will be closed
-      // and allocation to new pipeline will fail as there is no other dn in
-      // the cluster
-      key.close();
-    } catch(IOException ioe) {
-      Assert.assertTrue(ioe instanceof OMException);
-    }
+
+    key.close();
 
     long containerID = omKeyLocationInfo.getContainerID();
 
@@ -269,83 +270,4 @@ public class TestContainerStateMachineFailures {
     Assert.assertEquals(ContainerProtos.Result.CONTAINER_UNHEALTHY,
         dispatcher.dispatch(request.build(), null).getResult());
   }
-
-  @Test
-  public void testApplyTransactionFailure() throws Exception {
-    OzoneOutputStream key =
-        objectStore.getVolume(volumeName).getBucket(bucketName)
-            .createKey("ratis", 1024, ReplicationType.RATIS,
-                ReplicationFactor.ONE, new HashMap<>());
-    // First write and flush creates a container in the datanode
-    key.write("ratis".getBytes());
-    key.flush();
-    key.write("ratis".getBytes());
-    KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
-    List<OmKeyLocationInfo> locationInfoList =
-        groupOutputStream.getLocationInfoList();
-    Assert.assertEquals(1, locationInfoList.size());
-    OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
-    ContainerData containerData =
-        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
-            .getContainer().getContainerSet()
-            .getContainer(omKeyLocationInfo.getContainerID())
-            .getContainerData();
-    Assert.assertTrue(containerData instanceof KeyValueContainerData);
-    KeyValueContainerData keyValueContainerData =
-        (KeyValueContainerData) containerData;
-    key.close();
-    ContainerStateMachine stateMachine =
-        (ContainerStateMachine) ContainerTestHelper.getStateMachine(cluster);
-    SimpleStateMachineStorage storage =
-        (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
-    Path parentPath = storage.findLatestSnapshot().getFile().getPath();
-    // Since the snapshot threshold is set to 1, since there are
-    // applyTransactions, we should see snapshots
-    Assert.assertTrue(parentPath.getParent().toFile().listFiles().length > 0);
-    FileInfo snapshot = storage.findLatestSnapshot().getFile();
-    Assert.assertNotNull(snapshot);
-    long containerID = omKeyLocationInfo.getContainerID();
-    // delete the container db file
-    FileUtil.fullyDelete(new File(keyValueContainerData.getContainerPath()));
-    Pipeline pipeline = cluster.getStorageContainerLocationClient()
-        .getContainerWithPipeline(containerID).getPipeline();
-    XceiverClientSpi xceiverClient =
-        xceiverClientManager.acquireClient(pipeline);
-    ContainerProtos.ContainerCommandRequestProto.Builder request =
-        ContainerProtos.ContainerCommandRequestProto.newBuilder();
-    request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
-    request.setCmdType(ContainerProtos.Type.CloseContainer);
-    request.setContainerID(containerID);
-    request.setCloseContainer(
-        ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
-    // close container transaction will fail over Ratis and will initiate
-    // a pipeline close action
-
-    // Since the applyTransaction failure is propagated to Ratis,
-    // stateMachineUpdater will it exception while taking the next snapshot
-    // and should shutdown the RaftServerImpl. The client request will fail
-    // with RaftRetryFailureException.
-    try {
-      xceiverClient.sendCommand(request.build());
-      Assert.fail("Expected exception not thrown");
-    } catch (IOException e) {
-      Assert.assertTrue(HddsClientUtils
-          .checkForException(e) instanceof RaftRetryFailureException);
-    }
-    // Make sure the container is marked unhealthy
-    Assert.assertTrue(
-        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
-            .getContainer().getContainerSet().getContainer(containerID)
-            .getContainerState()
-            == ContainerProtos.ContainerDataProto.State.UNHEALTHY);
-    try {
-      // try to take a new snapshot, ideally it should just fail
-      stateMachine.takeSnapshot();
-    } catch (IOException ioe) {
-      Assert.assertTrue(ioe instanceof StateMachineException);
-    }
-    // Make sure the latest snapshot is same as the previous one
-    FileInfo latestSnapshot = storage.findLatestSnapshot().getFile();
-    Assert.assertTrue(snapshot.getPath().equals(latestSnapshot.getPath()));
-  }
 }
ContainerTestHelper.java:

@@ -73,10 +73,6 @@ import org.apache.hadoop.security.token.Token;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.RaftServerProxy;
-import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.junit.Assert;
 import org.slf4j.Logger;
@@ -870,16 +866,4 @@ public final class ContainerTestHelper {
       index++;
     }
   }
-
-  public static StateMachine getStateMachine(MiniOzoneCluster cluster)
-      throws Exception {
-    XceiverServerSpi server =
-        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine().
-        getContainer().getWriteChannel();
-    RaftServerProxy proxy =
-        (RaftServerProxy) (((XceiverServerRatis) server).getServer());
-    RaftGroupId groupId = proxy.getGroupIds().iterator().next();
-    RaftServerImpl impl = proxy.getImpl(groupId);
-    return impl.getStateMachine();
-  }
 }
TestFreonWithDatanodeFastRestart.java:

@@ -22,7 +22,13 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.common.transport
+    .server.XceiverServerSpi;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis
+    .XceiverServerRatis;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
@@ -121,6 +127,13 @@ public class TestFreonWithDatanodeFastRestart {
   }
 
   private StateMachine getStateMachine() throws Exception {
-    return ContainerTestHelper.getStateMachine(cluster);
+    XceiverServerSpi server =
+        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine().
+        getContainer().getWriteChannel();
+    RaftServerProxy proxy =
+        (RaftServerProxy)(((XceiverServerRatis)server).getServer());
+    RaftGroupId groupId = proxy.getGroupIds().iterator().next();
+    RaftServerImpl impl = proxy.getImpl(groupId);
+    return impl.getStateMachine();
   }
 }