ARTEMIS-4704 eliminate unnecessary variable in ReplicationManager
This commit is contained in:
parent
9f50aff46d
commit
da9695a5f6
|
@ -140,9 +140,7 @@ public final class ReplicationManager implements ActiveMQComponent {
|
|||
|
||||
private final Channel replicatingChannel;
|
||||
|
||||
private boolean started;
|
||||
|
||||
private volatile boolean enabled;
|
||||
private volatile boolean started;
|
||||
|
||||
private final Queue<OperationContext> pendingTokens = new ConcurrentLinkedQueue<>();
|
||||
|
||||
|
@ -224,13 +222,13 @@ public final class ReplicationManager implements ActiveMQComponent {
|
|||
final byte recordType,
|
||||
final Persister persister,
|
||||
final Object record) throws Exception {
|
||||
if (enabled) {
|
||||
if (started) {
|
||||
sendReplicatePacket(new ReplicationAddMessage(remotingConnection.isBeforeTwoEighteen(), journalID, operation, id, recordType, persister, record));
|
||||
}
|
||||
}
|
||||
|
||||
public void appendDeleteRecord(final byte journalID, final long id) throws Exception {
|
||||
if (enabled) {
|
||||
if (started) {
|
||||
sendReplicatePacket(new ReplicationDeleteMessage(journalID, id));
|
||||
}
|
||||
}
|
||||
|
@ -242,7 +240,7 @@ public final class ReplicationManager implements ActiveMQComponent {
|
|||
final byte recordType,
|
||||
final Persister persister,
|
||||
final Object record) throws Exception {
|
||||
if (enabled) {
|
||||
if (started) {
|
||||
sendReplicatePacket(new ReplicationAddTXMessage(remotingConnection.isBeforeTwoEighteen(), journalID, operation, txID, id, recordType, persister, record));
|
||||
}
|
||||
}
|
||||
|
@ -251,7 +249,7 @@ public final class ReplicationManager implements ActiveMQComponent {
|
|||
final long txID,
|
||||
boolean sync,
|
||||
final boolean lineUp) throws Exception {
|
||||
if (enabled) {
|
||||
if (started) {
|
||||
sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp);
|
||||
}
|
||||
}
|
||||
|
@ -260,13 +258,13 @@ public final class ReplicationManager implements ActiveMQComponent {
|
|||
final long txID,
|
||||
final long id,
|
||||
final EncodingSupport record) throws Exception {
|
||||
if (enabled) {
|
||||
if (started) {
|
||||
sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID, id, record));
|
||||
}
|
||||
}
|
||||
|
||||
public void appendDeleteRecordTransactional(final byte journalID, final long txID, final long id) throws Exception {
|
||||
if (enabled) {
|
||||
if (started) {
|
||||
sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID, id, NullEncoding.instance));
|
||||
}
|
||||
}
|
||||
|
@ -274,13 +272,13 @@ public final class ReplicationManager implements ActiveMQComponent {
|
|||
public void appendPrepareRecord(final byte journalID,
|
||||
final long txID,
|
||||
final EncodingSupport transactionData) throws Exception {
|
||||
if (enabled) {
|
||||
if (started) {
|
||||
sendReplicatePacket(new ReplicationPrepareMessage(journalID, txID, transactionData));
|
||||
}
|
||||
}
|
||||
|
||||
public void appendRollbackRecord(final byte journalID, final long txID) throws Exception {
|
||||
if (enabled) {
|
||||
if (started) {
|
||||
sendReplicatePacket(new ReplicationCommitMessage(journalID, true, txID));
|
||||
}
|
||||
}
|
||||
|
@ -290,45 +288,45 @@ public final class ReplicationManager implements ActiveMQComponent {
|
|||
* @param pageNumber
|
||||
*/
|
||||
public void pageClosed(final SimpleString storeName, final long pageNumber) {
|
||||
if (enabled) {
|
||||
if (started) {
|
||||
sendReplicatePacket(new ReplicationPageEventMessage(storeName, pageNumber, false, remotingConnection.isVersionUsingLongOnPageReplication()));
|
||||
}
|
||||
}
|
||||
|
||||
public void pageDeleted(final SimpleString storeName, final long pageNumber) {
|
||||
if (enabled) {
|
||||
if (started) {
|
||||
sendReplicatePacket(new ReplicationPageEventMessage(storeName, pageNumber, true, remotingConnection.isVersionUsingLongOnPageReplication()));
|
||||
}
|
||||
}
|
||||
|
||||
public void pageWrite(final PagedMessage message, final long pageNumber) {
|
||||
if (enabled) {
|
||||
if (started) {
|
||||
sendReplicatePacket(new ReplicationPageWriteMessage(message, pageNumber, remotingConnection.isVersionUsingLongOnPageReplication()));
|
||||
}
|
||||
}
|
||||
|
||||
public void largeMessageBegin(final long messageId) {
|
||||
if (enabled) {
|
||||
if (started) {
|
||||
sendReplicatePacket(new ReplicationLargeMessageBeginMessage(messageId));
|
||||
}
|
||||
}
|
||||
|
||||
//we pass in storageManager to generate ID only if enabled
|
||||
public void largeMessageDelete(final Long messageId, JournalStorageManager storageManager) {
|
||||
if (enabled) {
|
||||
if (started) {
|
||||
long pendingRecordID = storageManager.generateID();
|
||||
sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId, pendingRecordID, true));
|
||||
}
|
||||
}
|
||||
|
||||
public void largeMessageClosed(final Long messageId, JournalStorageManager storageManager) {
|
||||
if (enabled) {
|
||||
if (started) {
|
||||
sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId, -1, false));
|
||||
}
|
||||
}
|
||||
|
||||
public void largeMessageWrite(final long messageId, final byte[] body) {
|
||||
if (enabled) {
|
||||
if (started) {
|
||||
sendReplicatePacket(new ReplicationLargeMessageWriteMessage(messageId, body));
|
||||
}
|
||||
}
|
||||
|
@ -368,8 +366,6 @@ public final class ReplicationManager implements ActiveMQComponent {
|
|||
}
|
||||
|
||||
started = true;
|
||||
|
||||
enabled = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -402,8 +398,6 @@ public final class ReplicationManager implements ActiveMQComponent {
|
|||
slowReplicationChecker = null;
|
||||
}
|
||||
|
||||
enabled = false;
|
||||
|
||||
if (clearTokens) {
|
||||
clearReplicationTokens();
|
||||
}
|
||||
|
@ -462,7 +456,7 @@ public final class ReplicationManager implements ActiveMQComponent {
|
|||
}
|
||||
|
||||
private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp, ReusableLatch done) {
|
||||
if (!enabled) {
|
||||
if (!started) {
|
||||
packet.release();
|
||||
return null;
|
||||
}
|
||||
|
@ -474,7 +468,7 @@ public final class ReplicationManager implements ActiveMQComponent {
|
|||
final ReplicatePacketRequest request = new ReplicatePacketRequest(packet, repliToken, done);
|
||||
replicatePacketRequests.add(request);
|
||||
replicationStream.execute(() -> {
|
||||
if (enabled) {
|
||||
if (started) {
|
||||
sendReplicatedPackets(false);
|
||||
} else {
|
||||
releaseReplicatedPackets(replicatePacketRequests);
|
||||
|
@ -497,7 +491,7 @@ public final class ReplicationManager implements ActiveMQComponent {
|
|||
}
|
||||
|
||||
private void checkSlowReplication() {
|
||||
if (!enabled) {
|
||||
if (!started) {
|
||||
return;
|
||||
}
|
||||
assert checkEventLoop();
|
||||
|
@ -534,7 +528,7 @@ public final class ReplicationManager implements ActiveMQComponent {
|
|||
// We try to:
|
||||
// - save recursive calls of resume due to flushConnection
|
||||
// - saving flush pending writes *if* the OS hasn't notified that's writable again
|
||||
if (awaitingResume || isFlushing || !enabled) {
|
||||
if (awaitingResume || isFlushing || !started) {
|
||||
return;
|
||||
}
|
||||
if (replicatePacketRequests.isEmpty()) {
|
||||
|
@ -687,7 +681,7 @@ public final class ReplicationManager implements ActiveMQComponent {
|
|||
* @throws Exception
|
||||
*/
|
||||
public void syncJournalFile(JournalFile jf, AbstractJournalStorageManager.JournalContent content) throws Exception {
|
||||
if (!enabled) {
|
||||
if (!started) {
|
||||
return;
|
||||
}
|
||||
SequentialFile file = jf.getFile().cloneFile();
|
||||
|
@ -701,13 +695,13 @@ public final class ReplicationManager implements ActiveMQComponent {
|
|||
}
|
||||
|
||||
public void syncLargeMessageFile(SequentialFile file, long size, long id) throws Exception {
|
||||
if (enabled) {
|
||||
if (started) {
|
||||
sendLargeFile(null, null, id, file, size);
|
||||
}
|
||||
}
|
||||
|
||||
public void syncPages(SequentialFile file, long id, SimpleString queueName) throws Exception {
|
||||
if (enabled)
|
||||
if (started)
|
||||
sendLargeFile(null, queueName, id, file, Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
|
@ -726,7 +720,7 @@ public final class ReplicationManager implements ActiveMQComponent {
|
|||
final long id,
|
||||
SequentialFile file,
|
||||
long maxBytesToSend) throws Exception {
|
||||
if (!enabled)
|
||||
if (!started)
|
||||
return;
|
||||
if (!file.isOpen()) {
|
||||
file.open();
|
||||
|
@ -801,7 +795,7 @@ public final class ReplicationManager implements ActiveMQComponent {
|
|||
AbstractJournalStorageManager.JournalContent contentType,
|
||||
String nodeID,
|
||||
boolean allowsAutoFailBack) throws ActiveMQException {
|
||||
if (enabled)
|
||||
if (started)
|
||||
sendReplicatePacket(new ReplicationStartSyncMessage(remotingConnection.isBeforeTwoEighteen(), datafiles, contentType, nodeID, allowsAutoFailBack));
|
||||
}
|
||||
|
||||
|
@ -814,7 +808,7 @@ public final class ReplicationManager implements ActiveMQComponent {
|
|||
* @param nodeID
|
||||
*/
|
||||
public void sendSynchronizationDone(String nodeID, long initialReplicationSyncTimeout, IOCriticalErrorListener criticalErrorListener) throws ActiveMQReplicationTimeooutException {
|
||||
if (enabled) {
|
||||
if (started) {
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("sendSynchronizationDone ::{}, {}", nodeID, initialReplicationSyncTimeout);
|
||||
|
@ -865,7 +859,7 @@ public final class ReplicationManager implements ActiveMQComponent {
|
|||
ArrayList<Long> idsToSend;
|
||||
idsToSend = new ArrayList<>(largeMessages.keySet());
|
||||
|
||||
if (enabled)
|
||||
if (started)
|
||||
sendReplicatePacket(new ReplicationStartSyncMessage(remotingConnection.isBeforeTwoEighteen(), idsToSend));
|
||||
}
|
||||
|
||||
|
@ -878,9 +872,9 @@ public final class ReplicationManager implements ActiveMQComponent {
|
|||
* @return
|
||||
*/
|
||||
public OperationContext sendPrimaryIsStopping(final PrimaryStopping finalMessage) {
|
||||
logger.debug("PRIMARY IS STOPPING?!? message={} enabled={}", finalMessage, enabled);
|
||||
if (enabled) {
|
||||
logger.debug("PRIMARY IS STOPPING?!? message={} {}", finalMessage, enabled);
|
||||
logger.debug("PRIMARY IS STOPPING?!? message={} enabled={}", finalMessage, started);
|
||||
if (started) {
|
||||
logger.debug("PRIMARY IS STOPPING?!? message={} {}", finalMessage, started);
|
||||
return sendReplicatePacket(new ReplicationPrimaryIsStoppingMessage(finalMessage));
|
||||
}
|
||||
return null;
|
||||
|
|
Loading…
Reference in New Issue