This closes #2114
This commit is contained in:
commit
183ebcc5f8
|
@ -120,7 +120,7 @@ public final class ReplicationManager implements ActiveMQComponent {
|
||||||
|
|
||||||
private final Queue<OperationContext> pendingTokens = new ConcurrentLinkedQueue<>();
|
private final Queue<OperationContext> pendingTokens = new ConcurrentLinkedQueue<>();
|
||||||
|
|
||||||
private final ExecutorFactory executorFactory;
|
private final ExecutorFactory ioExecutorFactory;
|
||||||
|
|
||||||
private final Executor replicationStream;
|
private final Executor replicationStream;
|
||||||
|
|
||||||
|
@ -142,12 +142,12 @@ public final class ReplicationManager implements ActiveMQComponent {
|
||||||
public ReplicationManager(CoreRemotingConnection remotingConnection,
|
public ReplicationManager(CoreRemotingConnection remotingConnection,
|
||||||
final long timeout,
|
final long timeout,
|
||||||
final long initialReplicationSyncTimeout,
|
final long initialReplicationSyncTimeout,
|
||||||
final ExecutorFactory executorFactory) {
|
final ExecutorFactory ioExecutorFactory) {
|
||||||
this.executorFactory = executorFactory;
|
this.ioExecutorFactory = ioExecutorFactory;
|
||||||
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
|
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
|
||||||
this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
|
this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
|
||||||
this.remotingConnection = remotingConnection;
|
this.remotingConnection = remotingConnection;
|
||||||
this.replicationStream = executorFactory.getExecutor();
|
this.replicationStream = ioExecutorFactory.getExecutor();
|
||||||
this.timeout = timeout;
|
this.timeout = timeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -355,7 +355,7 @@ public final class ReplicationManager implements ActiveMQComponent {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
final OperationContext repliToken = OperationContextImpl.getContext(executorFactory);
|
final OperationContext repliToken = OperationContextImpl.getContext(ioExecutorFactory);
|
||||||
if (lineUp) {
|
if (lineUp) {
|
||||||
repliToken.replicationLineUp();
|
repliToken.replicationLineUp();
|
||||||
}
|
}
|
||||||
|
|
|
@ -169,7 +169,7 @@ public class SharedNothingLiveActivation extends LiveActivation {
|
||||||
ReplicationFailureListener listener = new ReplicationFailureListener();
|
ReplicationFailureListener listener = new ReplicationFailureListener();
|
||||||
rc.addCloseListener(listener);
|
rc.addCloseListener(listener);
|
||||||
rc.addFailureListener(listener);
|
rc.addFailureListener(listener);
|
||||||
replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(), replicatedPolicy.getInitialReplicationSyncTimeout(), activeMQServer.getExecutorFactory());
|
replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(), replicatedPolicy.getInitialReplicationSyncTimeout(), activeMQServer.getIOExecutorFactory());
|
||||||
replicationManager.start();
|
replicationManager.start();
|
||||||
Thread t = new Thread(new Runnable() {
|
Thread t = new Thread(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue