Issue #1835 reentrant lock in AbstractConnector.

This commit is contained in:
Greg Wilkins 2017-09-22 12:45:14 +10:00
parent 7768a781be
commit 0fa8c565bd
1 changed files with 78 additions and 108 deletions

View File

@ -364,7 +364,6 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
}
}
@Override
public ConnectionFactory getConnectionFactory(String protocol)
{
@ -388,108 +387,105 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
public void addConnectionFactory(ConnectionFactory factory)
{
try (Locker.Lock lock = _locker.lock())
if (isRunning())
throw new IllegalStateException(getState());
Set<ConnectionFactory> to_remove = new HashSet<>();
for (String key:factory.getProtocols())
{
Set<ConnectionFactory> to_remove = new HashSet<>();
for (String key:factory.getProtocols())
key=StringUtil.asciiToLowerCase(key);
ConnectionFactory old=_factories.remove(key);
if (old!=null)
{
key=StringUtil.asciiToLowerCase(key);
ConnectionFactory old=_factories.remove(key);
if (old!=null)
{
if (old.getProtocol().equals(_defaultProtocol))
_defaultProtocol=null;
to_remove.add(old);
}
_factories.put(key, factory);
if (old.getProtocol().equals(_defaultProtocol))
_defaultProtocol=null;
to_remove.add(old);
}
// keep factories still referenced
for (ConnectionFactory f : _factories.values())
to_remove.remove(f);
// remove old factories
for (ConnectionFactory old: to_remove)
{
removeBean(old);
if (LOG.isDebugEnabled())
LOG.debug("{} removed {}", this, old);
}
// add new Bean
addBean(factory);
if (_defaultProtocol==null)
_defaultProtocol=factory.getProtocol();
if (LOG.isDebugEnabled())
LOG.debug("{} added {}", this, factory);
_factories.put(key, factory);
}
// keep factories still referenced
for (ConnectionFactory f : _factories.values())
to_remove.remove(f);
// remove old factories
for (ConnectionFactory old: to_remove)
{
removeBean(old);
if (LOG.isDebugEnabled())
LOG.debug("{} removed {}", this, old);
}
// add new Bean
addBean(factory);
if (_defaultProtocol==null)
_defaultProtocol=factory.getProtocol();
if (LOG.isDebugEnabled())
LOG.debug("{} added {}", this, factory);
}
public void addFirstConnectionFactory(ConnectionFactory factory)
{
try (Locker.Lock lock = _locker.lock())
{
List<ConnectionFactory> existings = new ArrayList<>(_factories.values());
_factories.clear();
addConnectionFactory(factory);
for (ConnectionFactory existing : existings)
addConnectionFactory(existing);
_defaultProtocol = factory.getProtocol();
}
if (isRunning())
throw new IllegalStateException(getState());
List<ConnectionFactory> existings = new ArrayList<>(_factories.values());
_factories.clear();
addConnectionFactory(factory);
for (ConnectionFactory existing : existings)
addConnectionFactory(existing);
_defaultProtocol = factory.getProtocol();
}
public void addIfAbsentConnectionFactory(ConnectionFactory factory)
{
try (Locker.Lock lock = _locker.lock())
if (isRunning())
throw new IllegalStateException(getState());
String key=StringUtil.asciiToLowerCase(factory.getProtocol());
if (_factories.containsKey(key))
{
String key=StringUtil.asciiToLowerCase(factory.getProtocol());
if (_factories.containsKey(key))
{
if (LOG.isDebugEnabled())
LOG.debug("{} addIfAbsent ignored {}", this, factory);
}
else
{
_factories.put(key, factory);
addBean(factory);
if (_defaultProtocol==null)
_defaultProtocol=factory.getProtocol();
if (LOG.isDebugEnabled())
LOG.debug("{} addIfAbsent added {}", this, factory);
}
if (LOG.isDebugEnabled())
LOG.debug("{} addIfAbsent ignored {}", this, factory);
}
else
{
_factories.put(key, factory);
addBean(factory);
if (_defaultProtocol==null)
_defaultProtocol=factory.getProtocol();
if (LOG.isDebugEnabled())
LOG.debug("{} addIfAbsent added {}", this, factory);
}
}
public ConnectionFactory removeConnectionFactory(String protocol)
{
try (Locker.Lock lock = _locker.lock())
{
ConnectionFactory factory= _factories.remove(StringUtil.asciiToLowerCase(protocol));
removeBean(factory);
return factory;
}
if (isRunning())
throw new IllegalStateException(getState());
ConnectionFactory factory= _factories.remove(StringUtil.asciiToLowerCase(protocol));
removeBean(factory);
return factory;
}
@Override
public Collection<ConnectionFactory> getConnectionFactories()
{
try (Locker.Lock lock = _locker.lock())
{
return _factories.values();
}
return _factories.values();
}
public void setConnectionFactories(Collection<ConnectionFactory> factories)
{
try (Locker.Lock lock = _locker.lock())
{
List<ConnectionFactory> existing = new ArrayList<>(_factories.values());
for (ConnectionFactory factory: existing)
removeConnectionFactory(factory.getProtocol());
for (ConnectionFactory factory: factories)
if (factory!=null)
addConnectionFactory(factory);
}
if (isRunning())
throw new IllegalStateException(getState());
List<ConnectionFactory> existing = new ArrayList<>(_factories.values());
for (ConnectionFactory factory: existing)
removeConnectionFactory(factory.getProtocol());
for (ConnectionFactory factory: factories)
if (factory!=null)
addConnectionFactory(factory);
}
@ManagedAttribute("The priority delta to apply to acceptor threads")
@ -521,18 +517,15 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
@ManagedAttribute("Protocols supported by this connector")
public List<String> getProtocols()
{
synchronized (_factories)
{
return new ArrayList<>(_factories.keySet());
}
return new ArrayList<>(_factories.keySet());
}
public void clearConnectionFactories()
{
synchronized (_factories)
{
_factories.clear();
}
if (isRunning())
throw new IllegalStateException(getState());
_factories.clear();
}
@ManagedAttribute("This connector's default protocol")
@ -616,10 +609,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
if (_acceptorPriorityDelta!=0)
thread.setPriority(Math.max(Thread.MIN_PRIORITY,Math.min(Thread.MAX_PRIORITY,priority+_acceptorPriorityDelta)));
synchronized (AbstractConnector.this)
{
_acceptors[_id] = thread;
}
_acceptors[_id] = thread;
try
{
@ -676,26 +666,6 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
}
// protected void connectionOpened(Connection connection)
// {
// _stats.connectionOpened();
// connection.onOpen();
// }
//
// protected void connectionClosed(Connection connection)
// {
// connection.onClose();
// long duration = System.currentTimeMillis() - connection.getEndPoint().getCreatedTimeStamp();
// _stats.connectionClosed(duration, connection.getMessagesIn(), connection.getMessagesOut());
// }
//
// public void connectionUpgraded(Connection oldConnection, Connection newConnection)
// {
// oldConnection.onClose();
// _stats.connectionUpgraded(oldConnection.getMessagesIn(), oldConnection.getMessagesOut());
// newConnection.onOpen();
// }
@Override
public Collection<EndPoint> getConnectedEndPoints()
{