This closes #563

This commit is contained in:
jbertram 2016-06-06 16:41:13 -05:00
commit 50d83fb63d
18 changed files with 246 additions and 39 deletions

View File

@ -29,9 +29,21 @@ public interface OperationContext extends IOCompletion {
/** /**
* Execute the task when all IO operations are complete, * Execute the task when all IO operations are complete,
* Or execute it immediately if nothing is pending. * Or execute it immediately if nothing is pending.
* @param runnable the tas to be executed.
* @param storeOnly There are tasks that won't need to wait on replication or paging and will need to
* be completed as soon as the response from the journal is received. An example would be the
* DuplicateCache
*/
void executeOnCompletion(IOCallback runnable, boolean storeOnly);
/**
* Execute the task when all IO operations are complete,
* Or execute it immediately if nothing is pending.
* @param runnable the tas to be executed.
*/ */
void executeOnCompletion(IOCallback runnable); void executeOnCompletion(IOCallback runnable);
void replicationLineUp(); void replicationLineUp();
void replicationDone(); void replicationDone();

View File

@ -100,6 +100,8 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
void afterCompleteOperations(IOCallback run); void afterCompleteOperations(IOCallback run);
/** This is similar to afterComplete, however this only cares about the journal part. */
void afterStoreOperations(IOCallback run);
/** /**
* Block until the operations are done. * Block until the operations are done.
* Warning: Don't use it inside an ordered executor, otherwise the system may lock up * Warning: Don't use it inside an ordered executor, otherwise the system may lock up

View File

@ -292,6 +292,10 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
getContext().executeOnCompletion(run); getContext().executeOnCompletion(run);
} }
public void afterStoreOperations(IOCallback run) {
getContext().executeOnCompletion(run, true);
}
@Override @Override
public long generateID() { public long generateID() {
return idGenerator.generateID(); return idGenerator.generateID();
@ -1788,6 +1792,11 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
runnable.done(); runnable.done();
} }
@Override
public void executeOnCompletion(IOCallback runnable, boolean storeOnly) {
executeOnCompletion(runnable);
}
@Override @Override
public void replicationDone() { public void replicationDone() {
} }

View File

@ -34,6 +34,13 @@ final class DummyOperationContext implements OperationContext {
runnable.done(); runnable.done();
} }
@Override
public void executeOnCompletion(IOCallback runnable, boolean storeOnly) {
// There are no executeOnCompletion calls while using the DummyOperationContext
// However we keep the code here for correctness
runnable.done();
}
@Override @Override
public void replicationDone() { public void replicationDone() {
} }

View File

@ -61,8 +61,10 @@ import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.jboss.logging.Logger;
public class JournalStorageManager extends AbstractJournalStorageManager { public class JournalStorageManager extends AbstractJournalStorageManager {
private static final Logger logger = Logger.getLogger(JournalStorageManager.class);
private SequentialFileFactory journalFF; private SequentialFileFactory journalFF;
@ -569,6 +571,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
} }
} }
catch (Exception e) { catch (Exception e) {
logger.warn(e.getMessage(), e);
stopReplication(); stopReplication();
throw e; throw e;
} }
@ -681,6 +684,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
*/ */
@Override @Override
public void stopReplication() { public void stopReplication() {
logger.trace("stopReplication()");
storageManagerLock.writeLock().lock(); storageManagerLock.writeLock().lock();
try { try {
if (replicator == null) if (replicator == null)

View File

@ -72,6 +72,7 @@ public class OperationContextImpl implements OperationContext {
} }
private List<TaskHolder> tasks; private List<TaskHolder> tasks;
private List<TaskHolder> storeOnlyTasks;
private long minimalStore = Long.MAX_VALUE; private long minimalStore = Long.MAX_VALUE;
private long minimalReplicated = Long.MAX_VALUE; private long minimalReplicated = Long.MAX_VALUE;
@ -126,7 +127,12 @@ public class OperationContextImpl implements OperationContext {
} }
@Override @Override
public void executeOnCompletion(final IOCallback completion) { public void executeOnCompletion(IOCallback runnable) {
executeOnCompletion(runnable, false);
}
@Override
public void executeOnCompletion(final IOCallback completion, final boolean storeOnly) {
if (errorCode != -1) { if (errorCode != -1) {
completion.onError(errorCode, errorMessage); completion.onError(errorCode, errorMessage);
return; return;
@ -135,11 +141,18 @@ public class OperationContextImpl implements OperationContext {
boolean executeNow = false; boolean executeNow = false;
synchronized (this) { synchronized (this) {
if (tasks == null) { if (storeOnly) {
tasks = new LinkedList<>(); if (storeOnlyTasks == null) {
minimalReplicated = replicationLineUp.intValue(); storeOnlyTasks = new LinkedList<>();
minimalStore = storeLineUp.intValue(); }
minimalPage = pageLineUp.intValue(); }
else {
if (tasks == null) {
tasks = new LinkedList<>();
minimalReplicated = replicationLineUp.intValue();
minimalStore = storeLineUp.intValue();
minimalPage = pageLineUp.intValue();
}
} }
// On this case, we can just execute the context directly // On this case, we can just execute the context directly
@ -159,7 +172,12 @@ public class OperationContextImpl implements OperationContext {
} }
} }
else { else {
tasks.add(new TaskHolder(completion)); if (storeOnly) {
storeOnlyTasks.add(new TaskHolder(completion));
}
else {
tasks.add(new TaskHolder(completion));
}
} }
} }
@ -177,6 +195,20 @@ public class OperationContextImpl implements OperationContext {
} }
private void checkTasks() { private void checkTasks() {
if (storeOnlyTasks != null) {
Iterator<TaskHolder> iter = storeOnlyTasks.iterator();
while (iter.hasNext()) {
TaskHolder holder = iter.next();
if (stored >= holder.storeLined) {
// If set, we use an executor to avoid the server being single threaded
execute(holder.task);
iter.remove();
}
}
}
if (stored >= minimalStore && replicated >= minimalReplicated && paged >= minimalPage) { if (stored >= minimalStore && replicated >= minimalReplicated && paged >= minimalPage) {
Iterator<TaskHolder> iter = tasks.iterator(); Iterator<TaskHolder> iter = tasks.iterator();
while (iter.hasNext()) { while (iter.hasNext()) {

View File

@ -93,6 +93,11 @@ public class NullStorageManager implements StorageManager {
public void done() { public void done() {
} }
@Override
public void executeOnCompletion(IOCallback runnable, boolean storeOnly) {
runnable.done();
}
@Override @Override
public void storeLineUp() { public void storeLineUp() {
} }
@ -338,6 +343,11 @@ public class NullStorageManager implements StorageManager {
run.done(); run.done();
} }
@Override
public void afterStoreOperations(IOCallback run) {
run.done();
}
@Override @Override
public void waitOnOperations() throws Exception { public void waitOnOperations() throws Exception {
} }

View File

@ -226,7 +226,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
} }
// For a tx, it's important that the entry is not added to the cache until commit // For a tx, it's important that the entry is not added to the cache until commit
// since if the client fails then resends them tx we don't want it to get rejected // since if the client fails then resends them tx we don't want it to get rejected
tx.addOperation(new AddDuplicateIDOperation(duplID, recordID)); tx.afterStore(new AddDuplicateIDOperation(duplID, recordID));
} }
} }
} }

View File

@ -690,9 +690,17 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
} }
for (Object id : idsToRemove) { for (Object id : idsToRemove) {
RemotingConnection conn = getConnection(id); final RemotingConnection conn = getConnection(id);
if (conn != null) { if (conn != null) {
conn.fail(ActiveMQMessageBundle.BUNDLE.clientExited(conn.getRemoteAddress())); // In certain cases (replicationManager for instance) calling fail could take some time
// We can't pause the FailureCheckAndFlushThread as that would lead other clients to fail for
// missing pings
flushExecutor.execute(new Runnable() {
@Override
public void run() {
conn.fail(ActiveMQMessageBundle.BUNDLE.clientExited(conn.getRemoteAddress()));
}
});
removeConnection(id); removeConnection(id);
} }
} }

View File

@ -265,13 +265,14 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
} }
@Override @Override
public synchronized void stop() throws Exception { public void stop() throws Exception {
if (!started) { synchronized (this) {
return; if (!started) {
logger.trace("Stopping being ignored as it hasn't been started");
return;
}
} }
enabled = false;
// This is to avoid the write holding a lock while we are trying to close it // This is to avoid the write holding a lock while we are trying to close it
if (replicatingChannel != null) { if (replicatingChannel != null) {
replicatingChannel.close(); replicatingChannel.close();
@ -279,6 +280,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
} }
synchronized (replicationLock) { synchronized (replicationLock) {
enabled = false;
writable.set(true); writable.set(true);
replicationLock.notifyAll(); replicationLock.notifyAll();
clearReplicationTokens(); clearReplicationTokens();
@ -299,9 +301,12 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
* backup crashing). * backup crashing).
*/ */
public void clearReplicationTokens() { public void clearReplicationTokens() {
logger.trace("clearReplicationTokens initiating");
synchronized (replicationLock) { synchronized (replicationLock) {
logger.trace("clearReplicationTokens entered the lock");
while (!pendingTokens.isEmpty()) { while (!pendingTokens.isEmpty()) {
OperationContext ctx = pendingTokens.poll(); OperationContext ctx = pendingTokens.poll();
logger.trace("Calling ctx.replicationDone()");
try { try {
ctx.replicationDone(); ctx.replicationDone();
} }
@ -310,6 +315,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
} }
} }
} }
logger.trace("clearReplicationTokens finished");
} }
/** /**
@ -347,20 +353,8 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
synchronized (replicationLock) { synchronized (replicationLock) {
if (enabled) { if (enabled) {
pendingTokens.add(repliToken); pendingTokens.add(repliToken);
if (!replicatingChannel.getConnection().isWritable(this)) { if (!flowControl()) {
try { return repliToken;
writable.set(false);
//don't wait for ever as this may hang tests etc, we've probably been closed anyway
long now = System.currentTimeMillis();
long deadline = now + 5000;
while (!writable.get() && now < deadline) {
replicationLock.wait(deadline - now);
now = System.currentTimeMillis();
}
}
catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
} }
replicatingChannel.send(packet); replicatingChannel.send(packet);
} }
@ -379,6 +373,43 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
return repliToken; return repliToken;
} }
/** This was written as a refactoring of sendReplicatePacket.
* In case you refactor this in any way, this method must hold a lock on replication lock. .*/
private boolean flowControl() {
// synchronized (replicationLock) { -- I'm not adding this because the caller already has it
// future maintainers of this code please be aware that the intention here is hold the lock on replication lock
if (!replicatingChannel.getConnection().isWritable(this)) {
try {
logger.trace("flowControl waiting on writable");
writable.set(false);
//don't wait for ever as this may hang tests etc, we've probably been closed anyway
long now = System.currentTimeMillis();
long deadline = now + 5000;
while (!writable.get() && now < deadline) {
replicationLock.wait(deadline - now);
now = System.currentTimeMillis();
}
logger.trace("flow control done");
if (!writable.get()) {
ActiveMQServerLogger.LOGGER.slowReplicationResponse();
logger.tracef("There was no response from replication backup after %s seconds, server being stopped now", System.currentTimeMillis() - now);
try {
stop();
}
catch (Exception e) {
logger.warn(e.getMessage(), e);
}
return false;
}
}
catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
}
return true;
}
@Override @Override
public void readyForWriting() { public void readyForWriting() {
synchronized (replicationLock) { synchronized (replicationLock) {
@ -591,6 +622,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
sendReplicatePacket(new ReplicationStartSyncMessage(nodeID)); sendReplicatePacket(new ReplicationStartSyncMessage(nodeID));
try { try {
if (!synchronizationIsFinishedAcknowledgement.await(initialReplicationSyncTimeout)) { if (!synchronizationIsFinishedAcknowledgement.await(initialReplicationSyncTimeout)) {
logger.trace("sendSynchronizationDone wasn't finished in time");
throw ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout); throw ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout);
} }
} }
@ -598,6 +630,8 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
logger.debug(e); logger.debug(e);
} }
inSync = false; inSync = false;
logger.trace("sendSynchronizationDone finished");
} }
} }

View File

@ -1209,6 +1209,11 @@ public interface ActiveMQServerLogger extends BasicLogger {
@Message(id = 222206, value = "Connection limit of {0} reached. Refusing connection from {1}.", format = Message.Format.MESSAGE_FORMAT) @Message(id = 222206, value = "Connection limit of {0} reached. Refusing connection from {1}.", format = Message.Format.MESSAGE_FORMAT)
void connectionLimitReached(long connectionsAllowed, String address); void connectionLimitReached(long connectionsAllowed, String address);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222207, value = "The backup server is not responding promptly introducing latency beyond the limit. Replication server being disconnected now.",
format = Message.Format.MESSAGE_FORMAT)
void slowReplicationResponse();
@LogMessage(level = Logger.Level.ERROR) @LogMessage(level = Logger.Level.ERROR)
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT) @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
void initializationError(@Cause Throwable e); void initializationError(@Cause Throwable e);

View File

@ -67,6 +67,11 @@ public interface Transaction {
void addOperation(TransactionOperation sync); void addOperation(TransactionOperation sync);
/** This is an operation that will be called right after the storage is completed.
* addOperation could only happen after paging and replication, while these operations will just be
* about the storage*/
void afterStore(TransactionOperation sync);
List<TransactionOperation> getAllOperations(); List<TransactionOperation> getAllOperations();
boolean hasTimedOut(long currentTime, int defaultTimeout); boolean hasTimedOut(long currentTime, int defaultTimeout);

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.transaction.impl;
import javax.transaction.xa.Xid; import javax.transaction.xa.Xid;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -38,6 +39,8 @@ public class TransactionImpl implements Transaction {
private List<TransactionOperation> operations; private List<TransactionOperation> operations;
private List<TransactionOperation> storeOperations;
private static final int INITIAL_NUM_PROPERTIES = 10; private static final int INITIAL_NUM_PROPERTIES = 10;
private Object[] properties = new Object[TransactionImpl.INITIAL_NUM_PROPERTIES]; private Object[] properties = new Object[TransactionImpl.INITIAL_NUM_PROPERTIES];
@ -301,6 +304,24 @@ public class TransactionImpl implements Transaction {
} }
}); });
final List<TransactionOperation> storeOperationsToComplete = this.storeOperations;
this.storeOperations = null;
if (storeOperationsToComplete != null) {
storageManager.afterStoreOperations(new IOCallback() {
@Override
public void onError(final int errorCode, final String errorMessage) {
ActiveMQServerLogger.LOGGER.ioErrorOnTX(errorCode, errorMessage);
}
@Override
public void done() {
afterCommit(storeOperationsToComplete);
}
});
}
} }
} }
@ -365,6 +386,9 @@ public class TransactionImpl implements Transaction {
final List<TransactionOperation> operationsToComplete = this.operations; final List<TransactionOperation> operationsToComplete = this.operations;
this.operations = null; this.operations = null;
final List<TransactionOperation> storeOperationsToComplete = this.storeOperations;
this.storeOperations = null;
// We use the Callback even for non persistence // We use the Callback even for non persistence
// If we are using non-persistence with replication, the replication manager will have // If we are using non-persistence with replication, the replication manager will have
// to execute this runnable in the correct order // to execute this runnable in the correct order
@ -380,6 +404,21 @@ public class TransactionImpl implements Transaction {
afterRollback(operationsToComplete); afterRollback(operationsToComplete);
} }
}); });
if (storeOperationsToComplete != null) {
storageManager.afterStoreOperations(new IOCallback() {
@Override
public void onError(final int errorCode, final String errorMessage) {
ActiveMQServerLogger.LOGGER.ioErrorOnTX(errorCode, errorMessage);
}
@Override
public void done() {
afterRollback(storeOperationsToComplete);
}
});
}
} }
@Override @Override
@ -445,6 +484,15 @@ public class TransactionImpl implements Transaction {
operations.add(operation); operations.add(operation);
} }
@Override
public synchronized void afterStore(TransactionOperation sync) {
if (storeOperations == null) {
storeOperations = new LinkedList<>();
}
storeOperations.add(sync);
}
private int getOperationsCount() { private int getOperationsCount() {
checkCreateOperations(); checkCreateOperations();
@ -491,7 +539,7 @@ public class TransactionImpl implements Transaction {
private void checkCreateOperations() { private void checkCreateOperations() {
if (operations == null) { if (operations == null) {
operations = new ArrayList<>(); operations = new LinkedList<>();
} }
} }
@ -505,13 +553,13 @@ public class TransactionImpl implements Transaction {
} }
} }
private synchronized void afterRollback(List<TransactionOperation> oeprationsToComplete) { private synchronized void afterRollback(List<TransactionOperation> operationsToComplete) {
if (oeprationsToComplete != null) { if (operationsToComplete != null) {
for (TransactionOperation operation : oeprationsToComplete) { for (TransactionOperation operation : operationsToComplete) {
operation.afterRollback(this); operation.afterRollback(this);
} }
// Help out GC here // Help out GC here
oeprationsToComplete.clear(); operationsToComplete.clear();
} }
} }
@ -521,6 +569,11 @@ public class TransactionImpl implements Transaction {
operation.beforeCommit(this); operation.beforeCommit(this);
} }
} }
if (storeOperations != null) {
for (TransactionOperation operation : storeOperations) {
operation.beforeCommit(this);
}
}
} }
private synchronized void beforePrepare() throws Exception { private synchronized void beforePrepare() throws Exception {
@ -529,6 +582,11 @@ public class TransactionImpl implements Transaction {
operation.beforePrepare(this); operation.beforePrepare(this);
} }
} }
if (storeOperations != null) {
for (TransactionOperation operation : storeOperations) {
operation.beforePrepare(this);
}
}
} }
private synchronized void beforeRollback() throws Exception { private synchronized void beforeRollback() throws Exception {
@ -537,6 +595,11 @@ public class TransactionImpl implements Transaction {
operation.beforeRollback(this); operation.beforeRollback(this);
} }
} }
if (storeOperations != null) {
for (TransactionOperation operation : storeOperations) {
operation.beforeRollback(this);
}
}
} }
private synchronized void afterPrepare() { private synchronized void afterPrepare() {
@ -545,6 +608,11 @@ public class TransactionImpl implements Transaction {
operation.afterPrepare(this); operation.afterPrepare(this);
} }
} }
if (storeOperations != null) {
for (TransactionOperation operation : storeOperations) {
operation.afterPrepare(this);
}
}
} }
@Override @Override

View File

@ -252,6 +252,11 @@ public class TransactionImplTest extends ActiveMQTestBase {
run.done(); run.done();
} }
@Override
public void afterStoreOperations(IOCallback run) {
run.done();
}
@Override @Override
public boolean waitOnOperations(long timeout) throws Exception { public boolean waitOnOperations(long timeout) throws Exception {
return false; return false;

View File

@ -67,10 +67,6 @@ public class OrphanedConsumerTest extends ActiveMQTestBase {
* This static method is an entry point for the byteman rules on {@link #testOrphanedConsumers()} * This static method is an entry point for the byteman rules on {@link #testOrphanedConsumers()}
*/ */
public static void leavingCloseOnTestCountersWhileClosing() { public static void leavingCloseOnTestCountersWhileClosing() {
if (staticServer.getConnectionCount() == 0) {
verification = new AssertionError("The connection was closed before the consumers and sessions, this may cause issues on management leaving Orphaned Consumers!");
}
if (staticServer.getSessions().size() == 0) { if (staticServer.getSessions().size() == 0) {
verification = new AssertionError("The session was closed before the consumers, this may cause issues on management leaving Orphaned Consumers!"); verification = new AssertionError("The session was closed before the consumers, this may cause issues on management leaving Orphaned Consumers!");
} }

View File

@ -5730,7 +5730,12 @@ public class PagingTest extends ActiveMQTestBase {
@Override @Override
public void executeOnCompletion(IOCallback runnable) { public void executeOnCompletion(IOCallback runnable) {
runnable.done();
}
@Override
public void executeOnCompletion(IOCallback runnable, boolean storeOnly) {
runnable.done();
} }
} }
} }

View File

@ -90,7 +90,7 @@ public class DuplicateCacheTest extends StorageManagerTestBase {
public void onError(int errorCode, String errorMessage) { public void onError(int errorCode, String errorMessage) {
} }
}); }, true);
Assert.assertTrue(latch.await(1, TimeUnit.MINUTES)); Assert.assertTrue(latch.await(1, TimeUnit.MINUTES));

View File

@ -114,6 +114,11 @@ public class BindingsImplTest extends ActiveMQTestBase {
} }
@Override
public void afterStore(TransactionOperation sync) {
}
@Override @Override
public void addOperation(final TransactionOperation sync) { public void addOperation(final TransactionOperation sync) {