ARTEMIS-1891 use io executor to send replicate packet
After sending pages, the thread will hold the storage manager write lock and send synchronization finished packet(use the parent thread pool) to the backup node. At the same time, thread pool is full bcs they are waiting for the storage manager read lock to write the page or journal, leading to replication starting failure. Here we use io executor to send replicate packet to fix thread pool starvation problem.
This commit is contained in:
parent
fbcee58e8c
commit
06eb82cb14
|
@ -120,7 +120,7 @@ public final class ReplicationManager implements ActiveMQComponent {
|
||||||
|
|
||||||
private final Queue<OperationContext> pendingTokens = new ConcurrentLinkedQueue<>();
|
private final Queue<OperationContext> pendingTokens = new ConcurrentLinkedQueue<>();
|
||||||
|
|
||||||
private final ExecutorFactory executorFactory;
|
private final ExecutorFactory ioExecutorFactory;
|
||||||
|
|
||||||
private final Executor replicationStream;
|
private final Executor replicationStream;
|
||||||
|
|
||||||
|
@ -142,12 +142,12 @@ public final class ReplicationManager implements ActiveMQComponent {
|
||||||
public ReplicationManager(CoreRemotingConnection remotingConnection,
|
public ReplicationManager(CoreRemotingConnection remotingConnection,
|
||||||
final long timeout,
|
final long timeout,
|
||||||
final long initialReplicationSyncTimeout,
|
final long initialReplicationSyncTimeout,
|
||||||
final ExecutorFactory executorFactory) {
|
final ExecutorFactory ioExecutorFactory) {
|
||||||
this.executorFactory = executorFactory;
|
this.ioExecutorFactory = ioExecutorFactory;
|
||||||
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
|
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
|
||||||
this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
|
this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
|
||||||
this.remotingConnection = remotingConnection;
|
this.remotingConnection = remotingConnection;
|
||||||
this.replicationStream = executorFactory.getExecutor();
|
this.replicationStream = ioExecutorFactory.getExecutor();
|
||||||
this.timeout = timeout;
|
this.timeout = timeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -355,7 +355,7 @@ public final class ReplicationManager implements ActiveMQComponent {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
final OperationContext repliToken = OperationContextImpl.getContext(executorFactory);
|
final OperationContext repliToken = OperationContextImpl.getContext(ioExecutorFactory);
|
||||||
if (lineUp) {
|
if (lineUp) {
|
||||||
repliToken.replicationLineUp();
|
repliToken.replicationLineUp();
|
||||||
}
|
}
|
||||||
|
|
|
@ -169,7 +169,7 @@ public class SharedNothingLiveActivation extends LiveActivation {
|
||||||
ReplicationFailureListener listener = new ReplicationFailureListener();
|
ReplicationFailureListener listener = new ReplicationFailureListener();
|
||||||
rc.addCloseListener(listener);
|
rc.addCloseListener(listener);
|
||||||
rc.addFailureListener(listener);
|
rc.addFailureListener(listener);
|
||||||
replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(), replicatedPolicy.getInitialReplicationSyncTimeout(), activeMQServer.getExecutorFactory());
|
replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(), replicatedPolicy.getInitialReplicationSyncTimeout(), activeMQServer.getIOExecutorFactory());
|
||||||
replicationManager.start();
|
replicationManager.start();
|
||||||
Thread t = new Thread(new Runnable() {
|
Thread t = new Thread(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue