ARTEMIS-1444 Support Messages > JournalBufferSize in all Protocols
This commit is contained in:
parent
88e1fdc789
commit
988c91557d
|
@ -120,6 +120,11 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getBufferSize() {
|
||||||
|
return dbDriver.getMaxSize();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void start() {
|
public synchronized void start() {
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -127,6 +127,16 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
||||||
public void flush() throws Exception {
|
public void flush() throws Exception {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The max size record that can be stored in the journal
|
||||||
|
*
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public long getMaxRecordSize() {
|
||||||
|
return sqlProvider.getMaxBlobSize();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void createSchema() throws SQLException {
|
protected void createSchema() throws SQLException {
|
||||||
createTable(sqlProvider.getCreateJournalTableSQL());
|
createTable(sqlProvider.getCreateJournalTableSQL());
|
||||||
|
|
|
@ -101,4 +101,6 @@ public interface SequentialFileFactory {
|
||||||
SequentialFileFactory setDatasync(boolean enabled);
|
SequentialFileFactory setDatasync(boolean enabled);
|
||||||
|
|
||||||
boolean isDatasync();
|
boolean isDatasync();
|
||||||
|
|
||||||
|
long getBufferSize();
|
||||||
}
|
}
|
||||||
|
|
|
@ -269,6 +269,11 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getBufferSize() {
|
||||||
|
return bufferSize;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The same callback is used for Runnable executor.
|
* The same callback is used for Runnable executor.
|
||||||
* This way we can save some memory over the pool.
|
* This way we can save some memory over the pool.
|
||||||
|
|
|
@ -41,6 +41,7 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
|
||||||
private boolean bufferPooling;
|
private boolean bufferPooling;
|
||||||
//pools only the biggest one -> optimized for the common case
|
//pools only the biggest one -> optimized for the common case
|
||||||
private final ThreadLocal<ByteBuffer> bytesPool;
|
private final ThreadLocal<ByteBuffer> bytesPool;
|
||||||
|
private final int bufferSize;
|
||||||
|
|
||||||
private MappedSequentialFileFactory(File directory,
|
private MappedSequentialFileFactory(File directory,
|
||||||
int capacity,
|
int capacity,
|
||||||
|
@ -57,6 +58,7 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
|
||||||
} else {
|
} else {
|
||||||
timedBuffer = null;
|
timedBuffer = null;
|
||||||
}
|
}
|
||||||
|
this.bufferSize = bufferSize;
|
||||||
this.bufferPooling = true;
|
this.bufferPooling = true;
|
||||||
this.bytesPool = new ThreadLocal<>();
|
this.bytesPool = new ThreadLocal<>();
|
||||||
}
|
}
|
||||||
|
@ -105,6 +107,11 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
|
||||||
return useDataSync;
|
return useDataSync;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getBufferSize() {
|
||||||
|
return bufferSize;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getMaxIO() {
|
public int getMaxIO() {
|
||||||
return 1;
|
return 1;
|
||||||
|
|
|
@ -203,4 +203,9 @@ public final class NIOSequentialFileFactory extends AbstractSequentialFileFactor
|
||||||
return bytes;
|
return bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getBufferSize() {
|
||||||
|
return bufferSize;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -279,4 +279,10 @@ public interface Journal extends ActiveMQComponent {
|
||||||
* It will make sure there are no more pending operations on the Executors.
|
* It will make sure there are no more pending operations on the Executors.
|
||||||
* */
|
* */
|
||||||
void flush() throws Exception;
|
void flush() throws Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The max size record that can be stored in the journal
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
long getMaxRecordSize();
|
||||||
}
|
}
|
||||||
|
|
|
@ -103,6 +103,16 @@ public final class FileWrapperJournal extends JournalBase {
|
||||||
public void flush() throws Exception {
|
public void flush() throws Exception {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The max size record that can be stored in the journal
|
||||||
|
*
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public long getMaxRecordSize() {
|
||||||
|
return journal.getMaxRecordSize();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Write the record to the current file.
|
* Write the record to the current file.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -2200,6 +2200,16 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
||||||
flushExecutor(compactorExecutor);
|
flushExecutor(compactorExecutor);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The max size record that can be stored in the journal
|
||||||
|
*
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public long getMaxRecordSize() {
|
||||||
|
return Math.min(getFileSize(), fileFactory.getBufferSize());
|
||||||
|
}
|
||||||
|
|
||||||
private void flushExecutor(Executor executor) throws InterruptedException {
|
private void flushExecutor(Executor executor) throws InterruptedException {
|
||||||
|
|
||||||
if (executor != null) {
|
if (executor != null) {
|
||||||
|
|
|
@ -548,4 +548,9 @@ public class ReplicatedJournal implements Journal {
|
||||||
public void replicationSyncFinished() {
|
public void replicationSyncFinished() {
|
||||||
throw new UnsupportedOperationException("should never get called");
|
throw new UnsupportedOperationException("should never get called");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getMaxRecordSize() {
|
||||||
|
return localJournal.getMaxRecordSize();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,13 +16,12 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.core.server;
|
package org.apache.activemq.artemis.core.server;
|
||||||
|
|
||||||
|
import javax.json.JsonArrayBuilder;
|
||||||
|
import javax.transaction.xa.Xid;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import javax.json.JsonArrayBuilder;
|
|
||||||
import javax.transaction.xa.Xid;
|
|
||||||
|
|
||||||
import org.apache.activemq.artemis.Closeable;
|
import org.apache.activemq.artemis.Closeable;
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
import org.apache.activemq.artemis.api.core.Pair;
|
import org.apache.activemq.artemis.api.core.Pair;
|
||||||
|
|
|
@ -16,8 +16,10 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.core.server.impl;
|
package org.apache.activemq.artemis.core.server.impl;
|
||||||
|
|
||||||
import static org.apache.activemq.artemis.api.core.JsonUtil.nullSafe;
|
import javax.json.JsonArrayBuilder;
|
||||||
|
import javax.json.JsonObjectBuilder;
|
||||||
|
import javax.transaction.xa.XAException;
|
||||||
|
import javax.transaction.xa.Xid;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -29,16 +31,12 @@ import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import javax.json.JsonArrayBuilder;
|
|
||||||
import javax.json.JsonObjectBuilder;
|
|
||||||
import javax.transaction.xa.XAException;
|
|
||||||
import javax.transaction.xa.Xid;
|
|
||||||
|
|
||||||
import org.apache.activemq.artemis.Closeable;
|
import org.apache.activemq.artemis.Closeable;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
|
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
|
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
|
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
|
||||||
|
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
import org.apache.activemq.artemis.api.core.Pair;
|
import org.apache.activemq.artemis.api.core.Pair;
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
|
@ -69,6 +67,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
import org.apache.activemq.artemis.core.server.AddressQueryResult;
|
import org.apache.activemq.artemis.core.server.AddressQueryResult;
|
||||||
import org.apache.activemq.artemis.core.server.BindingQueryResult;
|
import org.apache.activemq.artemis.core.server.BindingQueryResult;
|
||||||
|
import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
||||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.server.QueueQueryResult;
|
import org.apache.activemq.artemis.core.server.QueueQueryResult;
|
||||||
|
@ -93,6 +92,8 @@ import org.apache.activemq.artemis.utils.PrefixUtil;
|
||||||
import org.apache.activemq.artemis.utils.collections.TypedProperties;
|
import org.apache.activemq.artemis.utils.collections.TypedProperties;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
|
import static org.apache.activemq.artemis.api.core.JsonUtil.nullSafe;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Server side Session implementation
|
* Server side Session implementation
|
||||||
*/
|
*/
|
||||||
|
@ -1309,12 +1310,32 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||||
return send(getCurrentTransaction(), message, direct, noAutoCreateQueue);
|
return send(getCurrentTransaction(), message, direct, noAutoCreateQueue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private LargeServerMessage messageToLargeMessage(Message message) throws Exception {
|
||||||
|
ICoreMessage coreMessage = message.toCore();
|
||||||
|
LargeServerMessage lsm = getStorageManager().createLargeMessage(storageManager.generateID(), coreMessage);
|
||||||
|
|
||||||
|
byte[] body = coreMessage.getReadOnlyBodyBuffer().toByteBuffer().array();
|
||||||
|
lsm.addBytes(body);
|
||||||
|
lsm.releaseResources();
|
||||||
|
lsm.putLongProperty(Message.HDR_LARGE_BODY_SIZE, body.length);
|
||||||
|
return lsm;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized RoutingStatus send(Transaction tx,
|
public synchronized RoutingStatus send(Transaction tx,
|
||||||
final Message message,
|
Message msg,
|
||||||
final boolean direct,
|
final boolean direct,
|
||||||
boolean noAutoCreateQueue) throws Exception {
|
boolean noAutoCreateQueue) throws Exception {
|
||||||
|
|
||||||
|
final Message message;
|
||||||
|
if ((msg.getEncodeSize() > storageManager.getMessageJournal().getMaxRecordSize()) && !msg.isLargeMessage()) {
|
||||||
|
message = messageToLargeMessage(msg);
|
||||||
|
} else {
|
||||||
|
message = msg;
|
||||||
|
}
|
||||||
|
|
||||||
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeSend(this, tx, message, direct, noAutoCreateQueue) : null);
|
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeSend(this, tx, message, direct, noAutoCreateQueue) : null);
|
||||||
|
|
||||||
// If the protocol doesn't support flow control, we have no choice other than fail the communication
|
// If the protocol doesn't support flow control, we have no choice other than fail the communication
|
||||||
|
|
|
@ -0,0 +1,63 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.openwire;
|
||||||
|
|
||||||
|
import javax.jms.BytesMessage;
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.DeliveryMode;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Queue;
|
||||||
|
import javax.jms.Session;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class OpenWireLargeMessageTest extends BasicOpenWireTest {
|
||||||
|
|
||||||
|
public OpenWireLargeMessageTest() {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
public SimpleString lmAddress = new SimpleString("LargeMessageAddress");
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
this.realStore = true;
|
||||||
|
super.setUp();
|
||||||
|
server.createQueue(lmAddress, RoutingType.ANYCAST, lmAddress, null, true, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSendLargeMessage() throws Exception {
|
||||||
|
try (Connection connection = factory.createConnection()) {
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Queue queue = session.createQueue(lmAddress.toString());
|
||||||
|
MessageProducer producer = session.createProducer(queue);
|
||||||
|
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||||
|
|
||||||
|
// Create 100Mb Message
|
||||||
|
int size = 1024 * 1024 * 10;
|
||||||
|
byte[] bytes = new byte[size];
|
||||||
|
BytesMessage message = session.createBytesMessage();
|
||||||
|
message.writeBytes(bytes);
|
||||||
|
producer.send(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
|
@ -712,6 +713,11 @@ public final class ReplicationTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getMaxRecordSize() {
|
||||||
|
return ActiveMQDefaultConfiguration.getDefaultJournalBufferSizeAio();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void appendCommitRecord(final long txID, final boolean sync) throws Exception {
|
public void appendCommitRecord(final long txID, final boolean sync) throws Exception {
|
||||||
|
|
||||||
|
|
|
@ -24,6 +24,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.ArtemisConstants;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||||
|
@ -69,6 +70,11 @@ public class FakeSequentialFileFactory implements SequentialFileFactory {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getBufferSize() {
|
||||||
|
return ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getMaxIO() {
|
public int getMaxIO() {
|
||||||
return 1;
|
return 1;
|
||||||
|
|
Loading…
Reference in New Issue