This commit is contained in:
Martyn Taylor 2017-08-23 13:37:05 +01:00
commit d6aed25288
1 changed files with 13 additions and 21 deletions

View File

@ -350,34 +350,26 @@ public final class ReplicationManager implements ActiveMQComponent {
}
private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) {
if (!enabled)
if (!enabled) {
packet.release();
return null;
boolean runItNow = false;
}
final OperationContext repliToken = OperationContextImpl.getContext(executorFactory);
if (lineUp) {
repliToken.replicationLineUp();
}
if (enabled) {
replicationStream.execute(() -> {
if (enabled) {
pendingTokens.add(repliToken);
flowControl(packet.expectedEncodeSize());
replicatingChannel.send(packet);
}
});
} else {
// Already replicating channel failed, so just play the action now
runItNow = true;
packet.release();
}
// Execute outside lock
if (runItNow) {
repliToken.replicationDone();
}
replicationStream.execute(() -> {
if (enabled) {
pendingTokens.add(repliToken);
flowControl(packet.expectedEncodeSize());
replicatingChannel.send(packet);
} else {
packet.release();
repliToken.replicationDone();
}
});
return repliToken;
}