HDDS-269. Refactor IdentifiableEventPayload to use a long ID. Contributed by Ajay Kumar.
This commit is contained in:
parent
cbf2026483
commit
e9c44ecfc6
@ -311,9 +311,9 @@ public void addCmdStatus(Long key, CommandStatus status) {
|
||||
* @param cmd - {@link SCMCommand}.
|
||||
*/
|
||||
public void addCmdStatus(SCMCommand cmd) {
|
||||
this.addCmdStatus(cmd.getCmdId(),
|
||||
this.addCmdStatus(cmd.getId(),
|
||||
CommandStatusBuilder.newBuilder()
|
||||
.setCmdId(cmd.getCmdId())
|
||||
.setCmdId(cmd.getId())
|
||||
.setStatus(Status.PENDING)
|
||||
.setType(cmd.getType())
|
||||
.build());
|
||||
|
@ -64,9 +64,9 @@ void handle(SCMCommand command, OzoneContainer container,
|
||||
*/
|
||||
default void updateCommandStatus(StateContext context, SCMCommand command,
|
||||
boolean cmdExecuted, Logger log) {
|
||||
if (!context.updateCommandStatus(command.getCmdId(), cmdExecuted)) {
|
||||
log.debug("{} with cmdId:{} not found.", command.getType(),
|
||||
command.getCmdId());
|
||||
if (!context.updateCommandStatus(command.getId(), cmdExecuted)) {
|
||||
log.debug("{} with Id:{} not found.", command.getType(),
|
||||
command.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -41,8 +41,8 @@ public CloseContainerCommand(long containerID,
|
||||
|
||||
// Should be called only for protobuf conversion
|
||||
private CloseContainerCommand(long containerID,
|
||||
HddsProtos.ReplicationType replicationType, long cmdId) {
|
||||
super(cmdId);
|
||||
HddsProtos.ReplicationType replicationType, long id) {
|
||||
super(id);
|
||||
this.containerID = containerID;
|
||||
this.replicationType = replicationType;
|
||||
}
|
||||
@ -70,7 +70,7 @@ public byte[] getProtoBufMessage() {
|
||||
public CloseContainerCommandProto getProto() {
|
||||
return CloseContainerCommandProto.newBuilder()
|
||||
.setContainerID(containerID)
|
||||
.setCmdId(getCmdId())
|
||||
.setCmdId(getId())
|
||||
.setReplicationType(replicationType).build();
|
||||
}
|
||||
|
||||
|
@ -42,8 +42,8 @@ public DeleteBlocksCommand(List<DeletedBlocksTransaction> blocks) {
|
||||
|
||||
// Should be called only for protobuf conversion
|
||||
private DeleteBlocksCommand(List<DeletedBlocksTransaction> blocks,
|
||||
long cmdId) {
|
||||
super(cmdId);
|
||||
long id) {
|
||||
super(id);
|
||||
this.blocksTobeDeleted = blocks;
|
||||
}
|
||||
|
||||
@ -69,7 +69,7 @@ public static DeleteBlocksCommand getFromProtobuf(
|
||||
|
||||
public DeleteBlocksCommandProto getProto() {
|
||||
return DeleteBlocksCommandProto.newBuilder()
|
||||
.setCmdId(getCmdId())
|
||||
.setCmdId(getId())
|
||||
.addAllDeletedBlocksTransactions(blocksTobeDeleted).build();
|
||||
}
|
||||
}
|
||||
|
@ -51,8 +51,8 @@ public ReplicateContainerCommand(long containerID,
|
||||
|
||||
// Should be called only for protobuf conversion
|
||||
public ReplicateContainerCommand(long containerID,
|
||||
List<DatanodeDetails> sourceDatanodes, long cmdId) {
|
||||
super(cmdId);
|
||||
List<DatanodeDetails> sourceDatanodes, long id) {
|
||||
super(id);
|
||||
this.containerID = containerID;
|
||||
this.sourceDatanodes = sourceDatanodes;
|
||||
}
|
||||
@ -69,7 +69,7 @@ public byte[] getProtoBufMessage() {
|
||||
|
||||
public ReplicateContainerCommandProto getProto() {
|
||||
Builder builder = ReplicateContainerCommandProto.newBuilder()
|
||||
.setCmdId(getCmdId())
|
||||
.setCmdId(getId())
|
||||
.setContainerID(containerID);
|
||||
for (DatanodeDetails dd : sourceDatanodes) {
|
||||
builder.addSources(dd.getProtoBufMessage());
|
||||
|
@ -55,7 +55,7 @@ public byte[] getProtoBufMessage() {
|
||||
* @return cmdId.
|
||||
*/
|
||||
@Override
|
||||
public long getCmdId() {
|
||||
public long getId() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -21,21 +21,23 @@
|
||||
import org.apache.hadoop.hdds.HddsIdFactory;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
|
||||
import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
|
||||
|
||||
/**
|
||||
* A class that acts as the base class to convert between Java and SCM
|
||||
* commands in protobuf format.
|
||||
* @param <T>
|
||||
*/
|
||||
public abstract class SCMCommand<T extends GeneratedMessage> {
|
||||
private long cmdId;
|
||||
public abstract class SCMCommand<T extends GeneratedMessage> implements
|
||||
IdentifiableEventPayload {
|
||||
private long id;
|
||||
|
||||
SCMCommand() {
|
||||
this.cmdId = HddsIdFactory.getLongId();
|
||||
this.id = HddsIdFactory.getLongId();
|
||||
}
|
||||
|
||||
SCMCommand(long cmdId) {
|
||||
this.cmdId = cmdId;
|
||||
SCMCommand(long id) {
|
||||
this.id = id;
|
||||
}
|
||||
/**
|
||||
* Returns the type of this command.
|
||||
@ -53,8 +55,8 @@ public abstract class SCMCommand<T extends GeneratedMessage> {
|
||||
* Gets the commandId of this object.
|
||||
* @return uuid.
|
||||
*/
|
||||
public long getCmdId() {
|
||||
return cmdId;
|
||||
public long getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -21,7 +21,6 @@
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
@ -61,22 +60,22 @@ public abstract class EventWatcher<TIMEOUT_PAYLOAD extends
|
||||
|
||||
private final Event<COMPLETION_PAYLOAD> completionEvent;
|
||||
|
||||
private final LeaseManager<UUID> leaseManager;
|
||||
private final LeaseManager<Long> leaseManager;
|
||||
|
||||
private final EventWatcherMetrics metrics;
|
||||
|
||||
private final String name;
|
||||
|
||||
protected final Map<UUID, TIMEOUT_PAYLOAD> trackedEventsByUUID =
|
||||
protected final Map<Long, TIMEOUT_PAYLOAD> trackedEventsByID =
|
||||
new ConcurrentHashMap<>();
|
||||
|
||||
protected final Set<TIMEOUT_PAYLOAD> trackedEvents = new HashSet<>();
|
||||
|
||||
private final Map<UUID, Long> startTrackingTimes = new HashedMap();
|
||||
private final Map<Long, Long> startTrackingTimes = new HashedMap();
|
||||
|
||||
public EventWatcher(String name, Event<TIMEOUT_PAYLOAD> startEvent,
|
||||
Event<COMPLETION_PAYLOAD> completionEvent,
|
||||
LeaseManager<UUID> leaseManager) {
|
||||
LeaseManager<Long> leaseManager) {
|
||||
this.startEvent = startEvent;
|
||||
this.completionEvent = completionEvent;
|
||||
this.leaseManager = leaseManager;
|
||||
@ -94,7 +93,7 @@ public EventWatcher(String name, Event<TIMEOUT_PAYLOAD> startEvent,
|
||||
|
||||
public EventWatcher(Event<TIMEOUT_PAYLOAD> startEvent,
|
||||
Event<COMPLETION_PAYLOAD> completionEvent,
|
||||
LeaseManager<UUID> leaseManager) {
|
||||
LeaseManager<Long> leaseManager) {
|
||||
this("", startEvent, completionEvent, leaseManager);
|
||||
}
|
||||
|
||||
@ -103,13 +102,13 @@ public void start(EventQueue queue) {
|
||||
queue.addHandler(startEvent, this::handleStartEvent);
|
||||
|
||||
queue.addHandler(completionEvent, (completionPayload, publisher) -> {
|
||||
UUID uuid = completionPayload.getUUID();
|
||||
long id = completionPayload.getId();
|
||||
try {
|
||||
handleCompletion(uuid, publisher);
|
||||
handleCompletion(id, publisher);
|
||||
} catch (LeaseNotFoundException e) {
|
||||
//It's already done. Too late, we already retried it.
|
||||
//Not a real problem.
|
||||
LOG.warn("Completion event without active lease. UUID={}", uuid);
|
||||
LOG.warn("Completion event without active lease. Id={}", id);
|
||||
}
|
||||
});
|
||||
|
||||
@ -120,13 +119,13 @@ public void start(EventQueue queue) {
|
||||
private synchronized void handleStartEvent(TIMEOUT_PAYLOAD payload,
|
||||
EventPublisher publisher) {
|
||||
metrics.incrementTrackedEvents();
|
||||
UUID identifier = payload.getUUID();
|
||||
long identifier = payload.getId();
|
||||
startTrackingTimes.put(identifier, System.currentTimeMillis());
|
||||
|
||||
trackedEventsByUUID.put(identifier, payload);
|
||||
trackedEventsByID.put(identifier, payload);
|
||||
trackedEvents.add(payload);
|
||||
try {
|
||||
Lease<UUID> lease = leaseManager.acquire(identifier);
|
||||
Lease<Long> lease = leaseManager.acquire(identifier);
|
||||
try {
|
||||
lease.registerCallBack(() -> {
|
||||
handleTimeout(publisher, identifier);
|
||||
@ -141,23 +140,23 @@ private synchronized void handleStartEvent(TIMEOUT_PAYLOAD payload,
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void handleCompletion(UUID uuid,
|
||||
private synchronized void handleCompletion(long id,
|
||||
EventPublisher publisher) throws LeaseNotFoundException {
|
||||
metrics.incrementCompletedEvents();
|
||||
leaseManager.release(uuid);
|
||||
TIMEOUT_PAYLOAD payload = trackedEventsByUUID.remove(uuid);
|
||||
leaseManager.release(id);
|
||||
TIMEOUT_PAYLOAD payload = trackedEventsByID.remove(id);
|
||||
trackedEvents.remove(payload);
|
||||
long originalTime = startTrackingTimes.remove(uuid);
|
||||
long originalTime = startTrackingTimes.remove(id);
|
||||
metrics.updateFinishingTime(System.currentTimeMillis() - originalTime);
|
||||
onFinished(publisher, payload);
|
||||
}
|
||||
|
||||
private synchronized void handleTimeout(EventPublisher publisher,
|
||||
UUID identifier) {
|
||||
long identifier) {
|
||||
metrics.incrementTimedOutEvents();
|
||||
TIMEOUT_PAYLOAD payload = trackedEventsByUUID.remove(identifier);
|
||||
TIMEOUT_PAYLOAD payload = trackedEventsByID.remove(identifier);
|
||||
trackedEvents.remove(payload);
|
||||
startTrackingTimes.remove(payload.getUUID());
|
||||
startTrackingTimes.remove(payload.getId());
|
||||
onTimeout(publisher, payload);
|
||||
}
|
||||
|
||||
@ -171,12 +170,12 @@ public synchronized boolean contains(TIMEOUT_PAYLOAD payload) {
|
||||
|
||||
public synchronized boolean remove(TIMEOUT_PAYLOAD payload) {
|
||||
try {
|
||||
leaseManager.release(payload.getUUID());
|
||||
leaseManager.release(payload.getId());
|
||||
} catch (LeaseNotFoundException e) {
|
||||
LOG.warn("Completion event without active lease. UUID={}",
|
||||
payload.getUUID());
|
||||
LOG.warn("Completion event without active lease. Id={}",
|
||||
payload.getId());
|
||||
}
|
||||
trackedEventsByUUID.remove(payload.getUUID());
|
||||
trackedEventsByID.remove(payload.getId());
|
||||
return trackedEvents.remove(payload);
|
||||
|
||||
}
|
||||
@ -187,7 +186,7 @@ public synchronized boolean remove(TIMEOUT_PAYLOAD payload) {
|
||||
|
||||
public List<TIMEOUT_PAYLOAD> getTimeoutEvents(
|
||||
Predicate<? super TIMEOUT_PAYLOAD> predicate) {
|
||||
return trackedEventsByUUID.values().stream().filter(predicate)
|
||||
return trackedEventsByID.values().stream().filter(predicate)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
|
@ -17,14 +17,12 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdds.server.events;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* Event with an additional unique identifier.
|
||||
*
|
||||
*/
|
||||
public interface IdentifiableEventPayload {
|
||||
|
||||
UUID getUUID();
|
||||
long getId();
|
||||
|
||||
}
|
||||
|
@ -19,15 +19,9 @@
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.hadoop.metrics2.MetricsSource;
|
||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
import org.apache.hadoop.hdds.HddsIdFactory;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.ozone.lease.LeaseManager;
|
||||
import org.apache.hadoop.test.MetricsAsserts;
|
||||
|
||||
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
@ -47,7 +41,7 @@ public class TestEventWatcher {
|
||||
private static final TypedEvent<ReplicationCompletedEvent>
|
||||
REPLICATION_COMPLETED = new TypedEvent<>(ReplicationCompletedEvent.class);
|
||||
|
||||
LeaseManager<UUID> leaseManager;
|
||||
LeaseManager<Long> leaseManager;
|
||||
|
||||
@Before
|
||||
public void startLeaseManager() {
|
||||
@ -77,21 +71,21 @@ public void testEventHandling() throws InterruptedException {
|
||||
|
||||
replicationWatcher.start(queue);
|
||||
|
||||
UUID uuid1 = UUID.randomUUID();
|
||||
UUID uuid2 = UUID.randomUUID();
|
||||
long id1 = HddsIdFactory.getLongId();
|
||||
long id2 = HddsIdFactory.getLongId();
|
||||
|
||||
queue.fireEvent(WATCH_UNDER_REPLICATED,
|
||||
new UnderreplicatedEvent(uuid1, "C1"));
|
||||
new UnderreplicatedEvent(id1, "C1"));
|
||||
|
||||
queue.fireEvent(WATCH_UNDER_REPLICATED,
|
||||
new UnderreplicatedEvent(uuid2, "C2"));
|
||||
new UnderreplicatedEvent(id2, "C2"));
|
||||
|
||||
Assert.assertEquals(0, underReplicatedEvents.getReceivedEvents().size());
|
||||
|
||||
Thread.sleep(1000);
|
||||
|
||||
queue.fireEvent(REPLICATION_COMPLETED,
|
||||
new ReplicationCompletedEvent(uuid1, "C2", "D1"));
|
||||
new ReplicationCompletedEvent(id1, "C2", "D1"));
|
||||
|
||||
Assert.assertEquals(0, underReplicatedEvents.getReceivedEvents().size());
|
||||
|
||||
@ -100,8 +94,8 @@ public void testEventHandling() throws InterruptedException {
|
||||
queue.processAll(1000L);
|
||||
|
||||
Assert.assertEquals(1, underReplicatedEvents.getReceivedEvents().size());
|
||||
Assert.assertEquals(uuid2,
|
||||
underReplicatedEvents.getReceivedEvents().get(0).UUID);
|
||||
Assert.assertEquals(id2,
|
||||
underReplicatedEvents.getReceivedEvents().get(0).id);
|
||||
|
||||
}
|
||||
|
||||
@ -121,15 +115,15 @@ public void testInprogressFilter() throws InterruptedException {
|
||||
replicationWatcher.start(queue);
|
||||
|
||||
UnderreplicatedEvent event1 =
|
||||
new UnderreplicatedEvent(UUID.randomUUID(), "C1");
|
||||
new UnderreplicatedEvent(HddsIdFactory.getLongId(), "C1");
|
||||
|
||||
queue.fireEvent(WATCH_UNDER_REPLICATED, event1);
|
||||
|
||||
queue.fireEvent(WATCH_UNDER_REPLICATED,
|
||||
new UnderreplicatedEvent(UUID.randomUUID(), "C2"));
|
||||
new UnderreplicatedEvent(HddsIdFactory.getLongId(), "C2"));
|
||||
|
||||
queue.fireEvent(WATCH_UNDER_REPLICATED,
|
||||
new UnderreplicatedEvent(UUID.randomUUID(), "C1"));
|
||||
new UnderreplicatedEvent(HddsIdFactory.getLongId(), "C1"));
|
||||
|
||||
queue.processAll(1000L);
|
||||
Thread.sleep(1000L);
|
||||
@ -166,13 +160,13 @@ public void testMetrics() throws InterruptedException {
|
||||
|
||||
//send 3 event to track 3 in-progress activity
|
||||
UnderreplicatedEvent event1 =
|
||||
new UnderreplicatedEvent(UUID.randomUUID(), "C1");
|
||||
new UnderreplicatedEvent(HddsIdFactory.getLongId(), "C1");
|
||||
|
||||
UnderreplicatedEvent event2 =
|
||||
new UnderreplicatedEvent(UUID.randomUUID(), "C2");
|
||||
new UnderreplicatedEvent(HddsIdFactory.getLongId(), "C2");
|
||||
|
||||
UnderreplicatedEvent event3 =
|
||||
new UnderreplicatedEvent(UUID.randomUUID(), "C1");
|
||||
new UnderreplicatedEvent(HddsIdFactory.getLongId(), "C1");
|
||||
|
||||
queue.fireEvent(WATCH_UNDER_REPLICATED, event1);
|
||||
|
||||
@ -182,11 +176,10 @@ public void testMetrics() throws InterruptedException {
|
||||
|
||||
//1st event is completed, don't need to track any more
|
||||
ReplicationCompletedEvent event1Completed =
|
||||
new ReplicationCompletedEvent(event1.UUID, "C1", "D1");
|
||||
new ReplicationCompletedEvent(event1.id, "C1", "D1");
|
||||
|
||||
queue.fireEvent(REPLICATION_COMPLETED, event1Completed);
|
||||
|
||||
|
||||
Thread.sleep(2200l);
|
||||
|
||||
//until now: 3 in-progress activities are tracked with three
|
||||
@ -218,7 +211,7 @@ private class CommandWatcherExample
|
||||
|
||||
public CommandWatcherExample(Event<UnderreplicatedEvent> startEvent,
|
||||
Event<ReplicationCompletedEvent> completionEvent,
|
||||
LeaseManager<UUID> leaseManager) {
|
||||
LeaseManager<Long> leaseManager) {
|
||||
super("TestCommandWatcher", startEvent, completionEvent, leaseManager);
|
||||
}
|
||||
|
||||
@ -243,21 +236,21 @@ public EventWatcherMetrics getMetrics() {
|
||||
private static class ReplicationCompletedEvent
|
||||
implements IdentifiableEventPayload {
|
||||
|
||||
private final UUID UUID;
|
||||
private final long id;
|
||||
|
||||
private final String containerId;
|
||||
|
||||
private final String datanodeId;
|
||||
|
||||
public ReplicationCompletedEvent(UUID UUID, String containerId,
|
||||
public ReplicationCompletedEvent(long id, String containerId,
|
||||
String datanodeId) {
|
||||
this.UUID = UUID;
|
||||
this.id = id;
|
||||
this.containerId = containerId;
|
||||
this.datanodeId = datanodeId;
|
||||
}
|
||||
|
||||
public UUID getUUID() {
|
||||
return UUID;
|
||||
public long getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -284,17 +277,17 @@ private static class UnderreplicatedEvent
|
||||
|
||||
implements IdentifiableEventPayload {
|
||||
|
||||
private final UUID UUID;
|
||||
private final long id;
|
||||
|
||||
private final String containerId;
|
||||
|
||||
public UnderreplicatedEvent(UUID UUID, String containerId) {
|
||||
public UnderreplicatedEvent(long id, String containerId) {
|
||||
this.containerId = containerId;
|
||||
this.UUID = UUID;
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public UUID getUUID() {
|
||||
return UUID;
|
||||
public long getId() {
|
||||
return id;
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user