HDDS-2048: State check during container state transition in datanode should be lock protected (#1375)
This commit is contained in:
parent
bc2d3a71d6
commit
c3beeb7761
|
@ -82,7 +82,9 @@
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class to perform KeyValue Container operations.
|
* Class to perform KeyValue Container operations. Any modifications to
|
||||||
|
* KeyValueContainer object should ideally be done via api exposed in
|
||||||
|
* KeyValueHandler class.
|
||||||
*/
|
*/
|
||||||
public class KeyValueContainer implements Container<KeyValueContainerData> {
|
public class KeyValueContainer implements Container<KeyValueContainerData> {
|
||||||
|
|
||||||
|
@ -554,6 +556,8 @@ public boolean hasReadLock() {
|
||||||
* Acquire write lock.
|
* Acquire write lock.
|
||||||
*/
|
*/
|
||||||
public void writeLock() {
|
public void writeLock() {
|
||||||
|
// TODO: The lock for KeyValueContainer object should not be exposed
|
||||||
|
// publicly.
|
||||||
this.lock.writeLock().lock();
|
this.lock.writeLock().lock();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -881,75 +881,97 @@ public void exportContainer(final Container container,
|
||||||
@Override
|
@Override
|
||||||
public void markContainerForClose(Container container)
|
public void markContainerForClose(Container container)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
// Move the container to CLOSING state only if it's OPEN
|
container.writeLock();
|
||||||
if (container.getContainerState() == State.OPEN) {
|
try {
|
||||||
container.markContainerForClose();
|
// Move the container to CLOSING state only if it's OPEN
|
||||||
sendICR(container);
|
if (container.getContainerState() == State.OPEN) {
|
||||||
|
container.markContainerForClose();
|
||||||
|
sendICR(container);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
container.writeUnlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void markContainerUnhealthy(Container container)
|
public void markContainerUnhealthy(Container container)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (container.getContainerState() != State.UNHEALTHY) {
|
container.writeLock();
|
||||||
try {
|
try {
|
||||||
container.markContainerUnhealthy();
|
if (container.getContainerState() != State.UNHEALTHY) {
|
||||||
} catch (IOException ex) {
|
try {
|
||||||
// explicitly catch IOException here since the this operation
|
container.markContainerUnhealthy();
|
||||||
// will fail if the Rocksdb metadata is corrupted.
|
} catch (IOException ex) {
|
||||||
long id = container.getContainerData().getContainerID();
|
// explicitly catch IOException here since the this operation
|
||||||
LOG.warn("Unexpected error while marking container "
|
// will fail if the Rocksdb metadata is corrupted.
|
||||||
+id+ " as unhealthy", ex);
|
long id = container.getContainerData().getContainerID();
|
||||||
} finally {
|
LOG.warn("Unexpected error while marking container " + id
|
||||||
sendICR(container);
|
+ " as unhealthy", ex);
|
||||||
|
} finally {
|
||||||
|
sendICR(container);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
} finally {
|
||||||
|
container.writeUnlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void quasiCloseContainer(Container container)
|
public void quasiCloseContainer(Container container)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
final State state = container.getContainerState();
|
container.writeLock();
|
||||||
// Quasi close call is idempotent.
|
try {
|
||||||
if (state == State.QUASI_CLOSED) {
|
final State state = container.getContainerState();
|
||||||
return;
|
// Quasi close call is idempotent.
|
||||||
|
if (state == State.QUASI_CLOSED) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// The container has to be in CLOSING state.
|
||||||
|
if (state != State.CLOSING) {
|
||||||
|
ContainerProtos.Result error =
|
||||||
|
state == State.INVALID ? INVALID_CONTAINER_STATE :
|
||||||
|
CONTAINER_INTERNAL_ERROR;
|
||||||
|
throw new StorageContainerException(
|
||||||
|
"Cannot quasi close container #" + container.getContainerData()
|
||||||
|
.getContainerID() + " while in " + state + " state.", error);
|
||||||
|
}
|
||||||
|
container.quasiClose();
|
||||||
|
sendICR(container);
|
||||||
|
} finally {
|
||||||
|
container.writeUnlock();
|
||||||
}
|
}
|
||||||
// The container has to be in CLOSING state.
|
|
||||||
if (state != State.CLOSING) {
|
|
||||||
ContainerProtos.Result error = state == State.INVALID ?
|
|
||||||
INVALID_CONTAINER_STATE : CONTAINER_INTERNAL_ERROR;
|
|
||||||
throw new StorageContainerException("Cannot quasi close container #" +
|
|
||||||
container.getContainerData().getContainerID() + " while in " +
|
|
||||||
state + " state.", error);
|
|
||||||
}
|
|
||||||
container.quasiClose();
|
|
||||||
sendICR(container);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void closeContainer(Container container)
|
public void closeContainer(Container container)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
final State state = container.getContainerState();
|
container.writeLock();
|
||||||
// Close call is idempotent.
|
try {
|
||||||
if (state == State.CLOSED) {
|
final State state = container.getContainerState();
|
||||||
return;
|
// Close call is idempotent.
|
||||||
|
if (state == State.CLOSED) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (state == State.UNHEALTHY) {
|
||||||
|
throw new StorageContainerException(
|
||||||
|
"Cannot close container #" + container.getContainerData()
|
||||||
|
.getContainerID() + " while in " + state + " state.",
|
||||||
|
ContainerProtos.Result.CONTAINER_UNHEALTHY);
|
||||||
|
}
|
||||||
|
// The container has to be either in CLOSING or in QUASI_CLOSED state.
|
||||||
|
if (state != State.CLOSING && state != State.QUASI_CLOSED) {
|
||||||
|
ContainerProtos.Result error =
|
||||||
|
state == State.INVALID ? INVALID_CONTAINER_STATE :
|
||||||
|
CONTAINER_INTERNAL_ERROR;
|
||||||
|
throw new StorageContainerException(
|
||||||
|
"Cannot close container #" + container.getContainerData()
|
||||||
|
.getContainerID() + " while in " + state + " state.", error);
|
||||||
|
}
|
||||||
|
container.close();
|
||||||
|
sendICR(container);
|
||||||
|
} finally {
|
||||||
|
container.writeUnlock();
|
||||||
}
|
}
|
||||||
if (state == State.UNHEALTHY) {
|
|
||||||
throw new StorageContainerException(
|
|
||||||
"Cannot close container #" + container.getContainerData()
|
|
||||||
.getContainerID() + " while in " + state + " state.",
|
|
||||||
ContainerProtos.Result.CONTAINER_UNHEALTHY);
|
|
||||||
}
|
|
||||||
// The container has to be either in CLOSING or in QUASI_CLOSED state.
|
|
||||||
if (state != State.CLOSING && state != State.QUASI_CLOSED) {
|
|
||||||
ContainerProtos.Result error = state == State.INVALID ?
|
|
||||||
INVALID_CONTAINER_STATE : CONTAINER_INTERNAL_ERROR;
|
|
||||||
throw new StorageContainerException("Cannot close container #" +
|
|
||||||
container.getContainerData().getContainerID() + " while in " +
|
|
||||||
state + " state.", error);
|
|
||||||
}
|
|
||||||
container.close();
|
|
||||||
sendICR(container);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -258,21 +258,25 @@ public List<BlockData> listBlock(Container container, long startLocalID, int
|
||||||
Preconditions.checkArgument(count > 0,
|
Preconditions.checkArgument(count > 0,
|
||||||
"Count must be a positive number.");
|
"Count must be a positive number.");
|
||||||
container.readLock();
|
container.readLock();
|
||||||
List<BlockData> result = null;
|
try {
|
||||||
KeyValueContainerData cData = (KeyValueContainerData) container
|
List<BlockData> result = null;
|
||||||
.getContainerData();
|
KeyValueContainerData cData =
|
||||||
try(ReferenceCountedDB db = BlockUtils.getDB(cData, config)) {
|
(KeyValueContainerData) container.getContainerData();
|
||||||
result = new ArrayList<>();
|
try (ReferenceCountedDB db = BlockUtils.getDB(cData, config)) {
|
||||||
byte[] startKeyInBytes = Longs.toByteArray(startLocalID);
|
result = new ArrayList<>();
|
||||||
List<Map.Entry<byte[], byte[]>> range =
|
byte[] startKeyInBytes = Longs.toByteArray(startLocalID);
|
||||||
db.getStore().getSequentialRangeKVs(startKeyInBytes, count,
|
List<Map.Entry<byte[], byte[]>> range = db.getStore()
|
||||||
MetadataKeyFilters.getNormalKeyFilter());
|
.getSequentialRangeKVs(startKeyInBytes, count,
|
||||||
for (Map.Entry<byte[], byte[]> entry : range) {
|
MetadataKeyFilters.getNormalKeyFilter());
|
||||||
BlockData value = BlockUtils.getBlockData(entry.getValue());
|
for (Map.Entry<byte[], byte[]> entry : range) {
|
||||||
BlockData data = new BlockData(value.getBlockID());
|
BlockData value = BlockUtils.getBlockData(entry.getValue());
|
||||||
result.add(data);
|
BlockData data = new BlockData(value.getBlockID());
|
||||||
|
result.add(data);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
return result;
|
} finally {
|
||||||
|
container.readUnlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue