ARTEMIS-1663 Fixing Encoding on PageCountPendingImpl
The PageCountPendingImpl was increasing the encode size without using its full allocation. This was causing issues on replication as the encode is also used to determine the size of the packets. however the packets were not receive the full allocated data causing missing packets on the replication and test failures. This is fixing the issue
This commit is contained in:
parent
f329b5b3f5
commit
9b104930c2
|
@ -135,7 +135,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
|
||||||
// We have to make sure this is sync here
|
// We have to make sure this is sync here
|
||||||
// not syncing this to disk may cause the page files to be out of sync on pages.
|
// not syncing this to disk may cause the page files to be out of sync on pages.
|
||||||
// we can't afford the case where a page file is written without a record here
|
// we can't afford the case where a page file is written without a record here
|
||||||
long id = storage.storePendingCounter(this.subscriptionID, page.getPageId(), increment);
|
long id = storage.storePendingCounter(this.subscriptionID, page.getPageId());
|
||||||
pendingInfo = new PendingCounter(id, increment, size);
|
pendingInfo = new PendingCounter(id, increment, size);
|
||||||
pendingCounters.put((long) page.getPageId(), pendingInfo);
|
pendingCounters.put((long) page.getPageId(), pendingInfo);
|
||||||
} else {
|
} else {
|
||||||
|
@ -422,7 +422,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
|
||||||
/**
|
/**
|
||||||
* @param id
|
* @param id
|
||||||
* @param count
|
* @param count
|
||||||
* @param size
|
* @param persistentSize
|
||||||
*/
|
*/
|
||||||
PendingCounter(long id, int count, long persistentSize) {
|
PendingCounter(long id, int count, long persistentSize) {
|
||||||
super();
|
super();
|
||||||
|
|
|
@ -338,7 +338,7 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
|
||||||
*/
|
*/
|
||||||
long storePageCounter(long txID, long queueID, long value, long persistentSize) throws Exception;
|
long storePageCounter(long txID, long queueID, long value, long persistentSize) throws Exception;
|
||||||
|
|
||||||
long storePendingCounter(long queueID, long pageID, int inc) throws Exception;
|
long storePendingCounter(long queueID, long pageID) throws Exception;
|
||||||
|
|
||||||
void deleteIncrementRecord(long txID, long recordID) throws Exception;
|
void deleteIncrementRecord(long txID, long recordID) throws Exception;
|
||||||
|
|
||||||
|
|
|
@ -1388,11 +1388,11 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long storePendingCounter(final long queueID, final long pageID, final int inc) throws Exception {
|
public long storePendingCounter(final long queueID, final long pageID) throws Exception {
|
||||||
readLock();
|
readLock();
|
||||||
try {
|
try {
|
||||||
final long recordID = idGenerator.generateID();
|
final long recordID = idGenerator.generateID();
|
||||||
PageCountPendingImpl pendingInc = new PageCountPendingImpl(queueID, pageID, inc);
|
PageCountPendingImpl pendingInc = new PageCountPendingImpl(queueID, pageID);
|
||||||
// We must guarantee the record sync before we actually write on the page otherwise we may get out of sync
|
// We must guarantee the record sync before we actually write on the page otherwise we may get out of sync
|
||||||
// on the counter
|
// on the counter
|
||||||
messageJournal.appendAddRecord(recordID, JournalRecordIds.PAGE_CURSOR_PENDING_COUNTER, pendingInc, true);
|
messageJournal.appendAddRecord(recordID, JournalRecordIds.PAGE_CURSOR_PENDING_COUNTER, pendingInc, true);
|
||||||
|
|
|
@ -33,7 +33,7 @@ public class PageCountPendingImpl implements EncodingSupport, PageCountPending {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public PageCountPendingImpl(long queueID, long pageID, int inc) {
|
public PageCountPendingImpl(long queueID, long pageID) {
|
||||||
this.queueID = queueID;
|
this.queueID = queueID;
|
||||||
this.pageID = pageID;
|
this.pageID = pageID;
|
||||||
}
|
}
|
||||||
|
@ -65,7 +65,7 @@ public class PageCountPendingImpl implements EncodingSupport, PageCountPending {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getEncodeSize() {
|
public int getEncodeSize() {
|
||||||
return DataConstants.SIZE_LONG * 3;
|
return DataConstants.SIZE_LONG * 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -472,7 +472,7 @@ public class NullStorageManager implements StorageManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long storePendingCounter(long queueID, long pageID, int inc) throws Exception {
|
public long storePendingCounter(long queueID, long pageID) throws Exception {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -592,7 +592,7 @@ public class TransactionImplTest extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long storePendingCounter(long queueID, long pageID, int inc) throws Exception {
|
public long storePendingCounter(long queueID, long pageID) throws Exception {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -673,8 +673,8 @@ public class SendAckFailTest extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long storePendingCounter(long queueID, long pageID, int inc) throws Exception {
|
public long storePendingCounter(long queueID, long pageID) throws Exception {
|
||||||
return manager.storePendingCounter(queueID, pageID, inc);
|
return manager.storePendingCounter(queueID, pageID);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue