diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/StoreByteArrayInputStream.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/StoreByteArrayInputStream.java deleted file mode 100755 index 3c67f08a6c..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/StoreByteArrayInputStream.java +++ /dev/null @@ -1,299 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.kaha.impl.data; - -import java.io.DataInput; -import java.io.IOException; -import java.io.InputStream; -import java.io.UTFDataFormatException; -import org.apache.activemq.util.ByteSequence; -/** - * Optimized ByteArrayInputStream that can be used more than once - * - * @version $Revision: 1.1.1.1 $ - */ -public final class StoreByteArrayInputStream extends InputStream implements DataInput{ - private byte[] buf; - private int pos; - private int offset; - - /** - * Creates a StoreByteArrayInputStream. - * - * @param buf the input buffer. - */ - public StoreByteArrayInputStream(byte buf[]){ - this.buf=buf; - this.pos=0; - this.offset = 0; - } - - /** - * Creates a StoreByteArrayInputStream. - * - * @param sequence the input buffer. - */ - public StoreByteArrayInputStream(ByteSequence sequence){ - this.buf=sequence.getData(); - this.offset=this.pos=sequence.getOffset(); - } - - /** - * Creates WireByteArrayInputStream with a minmalist byte array - */ - public StoreByteArrayInputStream(){ - this(new byte[0]); - } - - /** - * - * @return the size - */ - public int size(){ - return pos-offset; - } - - /** - * @return the underlying data array - */ - public byte[] getRawData(){ - return buf; - } - - /** - * reset the StoreByteArrayInputStream to use an new byte array - * - * @param newBuff - */ - public void restart(byte[] newBuff){ - buf=newBuff; - pos=0; - } - - /** - * reset the StoreByteArrayInputStream to use an new ByteSequence - * @param sequence - * - */ - public void restart(ByteSequence sequence){ - this.buf=sequence.getData(); - this.pos=sequence.getOffset(); - } - - /** - * re-start the input stream - reusing the current buffer - * - * @param size - */ - public void restart(int size){ - if(buf==null||buf.lengthint in the - * range 0 to 255. If no byte is available because the end of the stream has been - * reached, the value -1 is returned. - *

- * This read method cannot block. - * - * @return the next byte of data, or -1 if the end of the stream has been reached. - */ - public int read(){ - return (poslen bytes of data into an array of bytes from this input stream. - * - * @param b the buffer into which the data is read. - * @param off the start offset of the data. - * @param len the maximum number of bytes read. - * @return the total number of bytes read into the buffer, or -1 if there is no more data because the - * end of the stream has been reached. - */ - public int read(byte b[],int off,int len){ - if(b==null){ - throw new NullPointerException(); - } - if(pos>=buf.length){ - return -1; - } - if(pos+len>buf.length){ - len=buf.length-pos; - } - if(len<=0){ - return 0; - } - System.arraycopy(buf,pos,b,off,len); - pos+=len; - return len; - } - - /** - * @return the number of bytes that can be read from the input stream without blocking. - */ - public int available(){ - return buf.length-pos; - } - - public void readFully(byte[] b){ - read(b,0,b.length); - } - - public void readFully(byte[] b,int off,int len){ - read(b,off,len); - } - - public int skipBytes(int n){ - if(pos+n>buf.length){ - n=buf.length-pos; - } - if(n<0){ - return 0; - } - pos+=n; - return n; - } - - public boolean readBoolean(){ - return read()!=0; - } - - public byte readByte(){ - return (byte) read(); - } - - public int readUnsignedByte(){ - return read(); - } - - public short readShort(){ - int ch1=read(); - int ch2=read(); - return (short) ((ch1<<8)+(ch2<<0)); - } - - public int readUnsignedShort(){ - int ch1=read(); - int ch2=read(); - return ((ch1<<8)+(ch2<<0)); - } - - public char readChar(){ - int ch1=read(); - int ch2=read(); - return (char) ((ch1<<8)+(ch2<<0)); - } - - public int readInt(){ - int ch1=read(); - int ch2=read(); - int ch3=read(); - int ch4=read(); - return ((ch1<<24)+(ch2<<16)+(ch3<<8)+(ch4<<0)); - } - - public long readLong(){ - return (((long) buf[pos++]<<56)+((long) (buf[pos++]&255)<<48)+((long) (buf[pos++]&255)<<40) - +((long) (buf[pos++]&255)<<32)+((long) (buf[pos++]&255)<<24)+((buf[pos++]&255)<<16) - +((buf[pos++]&255)<<8)+((buf[pos++]&255)<<0)); - } - - public float readFloat() throws IOException{ - return Float.intBitsToFloat(readInt()); - } - - public double readDouble() throws IOException{ - return Double.longBitsToDouble(readLong()); - } - - public String readLine(){ - int start=pos; - while(pos127) - break; - pos++; - characters[count++]=(char) c; - } - while(pos>4){ - case 0: - case 1: - case 2: - case 3: - case 4: - case 5: - case 6: - case 7: - pos++; - characters[count++]=(char) c; - break; - case 12: - case 13: - pos+=2; - if(pos>length) - throw new UTFDataFormatException("bad string"); - c2=(int) buf[pos-1]; - if((c2&0xC0)!=0x80) - throw new UTFDataFormatException("bad string"); - characters[count++]=(char) (((c&0x1F)<<6)|(c2&0x3F)); - break; - case 14: - pos+=3; - if(pos>length) - throw new UTFDataFormatException("bad string"); - c2=(int) buf[pos-2]; - c3=(int) buf[pos-1]; - if(((c2&0xC0)!=0x80)||((c3&0xC0)!=0x80)) - throw new UTFDataFormatException("bad string"); - characters[count++]=(char) (((c&0x0F)<<12)|((c2&0x3F)<<6)|((c3&0x3F)<<0)); - break; - default: - throw new UTFDataFormatException("bad string"); - } - } - return new String(characters,0,count); - } -} \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/StoreByteArrayOutputStream.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/StoreByteArrayOutputStream.java deleted file mode 100755 index dfc1f7d3f8..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/StoreByteArrayOutputStream.java +++ /dev/null @@ -1,245 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.kaha.impl.data; - -import java.io.DataOutput; -import java.io.IOException; -import java.io.OutputStream; -import java.io.UTFDataFormatException; -import org.apache.activemq.util.ByteSequence; -/** - * Optimized ByteArrayOutputStream - * - * @version $Revision: 1.1.1.1 $ - */ -public final class StoreByteArrayOutputStream extends OutputStream implements DataOutput{ - private byte buf[]; - private int pos; - - /** - * Creates a new byte array output stream, with a buffer capacity of the specified size, in bytes. - * - * @param size the initial size. - * @exception IllegalArgumentException if size is negative. - */ - public StoreByteArrayOutputStream(int size){ - if(size<0){ - throw new IllegalArgumentException("Invalid size: "+size); - } - buf=new byte[size]; - } - - /** - * Creates a new byte array output stream. - */ - public StoreByteArrayOutputStream(){ - this(16*1024); - } - - /** - * start using a fresh byte array - * - * @param size - */ - public void restart(int size){ - buf=new byte[size]; - pos=0; - } - - /** - * Get a ByteSequence from the stream - * @return the byte sequence - */ - public ByteSequence toByteSequence() { - return new ByteSequence(buf, 0, pos); - } - - /** - * Writes the specified byte to this byte array output stream. - * - * @param b the byte to be written. - */ - public void write(int b){ - int newcount=pos+1; - ensureEnoughBuffer(newcount); - buf[pos]=(byte) b; - pos=newcount; - } - - /** - * Writes len bytes from the specified byte array starting at offset off to this byte - * array output stream. - * - * @param b the data. - * @param off the start offset in the data. - * @param len the number of bytes to write. - */ - public void write(byte b[],int off,int len){ - if(len==0){ - return; - } - int newcount=pos+len; - ensureEnoughBuffer(newcount); - System.arraycopy(b,off,buf,pos,len); - pos=newcount; - } - - /** - * @return the underlying byte[] buffer - */ - public byte[] getData(){ - return buf; - } - - /** - * reset the output stream - */ - public void reset(){ - pos=0; - } - - /** - * Set the current position for writing - * - * @param offset - */ - public void position(int offset){ - ensureEnoughBuffer(offset); - pos=offset; - } - - public int size(){ - return pos; - } - - - - public void writeBoolean(boolean v){ - ensureEnoughBuffer(1); - buf[pos++]=(byte) (v?1:0); - } - - public void writeByte(int v){ - ensureEnoughBuffer(1); - buf[pos++]=(byte) (v>>>0); - } - - public void writeShort(int v){ - ensureEnoughBuffer(2); - buf[pos++]=(byte) (v>>>8); - buf[pos++]=(byte) (v>>>0); - } - - public void writeChar(int v){ - ensureEnoughBuffer(2); - buf[pos++]=(byte) (v>>>8); - buf[pos++]=(byte) (v>>>0); - } - - public void writeInt(int v){ - ensureEnoughBuffer(4); - buf[pos++]=(byte) (v>>>24); - buf[pos++]=(byte) (v>>>16); - buf[pos++]=(byte) (v>>>8); - buf[pos++]=(byte) (v>>>0); - } - - public void writeLong(long v){ - ensureEnoughBuffer(8); - buf[pos++]=(byte) (v>>>56); - buf[pos++]=(byte) (v>>>48); - buf[pos++]=(byte) (v>>>40); - buf[pos++]=(byte) (v>>>32); - buf[pos++]=(byte) (v>>>24); - buf[pos++]=(byte) (v>>>16); - buf[pos++]=(byte) (v>>>8); - buf[pos++]=(byte) (v>>>0); - } - - public void writeFloat(float v) throws IOException{ - writeInt(Float.floatToIntBits(v)); - } - - public void writeDouble(double v) throws IOException{ - writeLong(Double.doubleToLongBits(v)); - } - - public void writeBytes(String s){ - int length=s.length(); - for(int i=0;i>>8)&0xFF); - write((c>>>0)&0xFF); - } - } - - public void writeUTF(String str) throws IOException{ - int strlen=str.length(); - int encodedsize=0; - int c; - for(int i=0;i=0x0001)&&(c<=0x007F)){ - encodedsize++; - }else if(c>0x07FF){ - encodedsize+=3; - }else{ - encodedsize+=2; - } - } - if(encodedsize>65535) - throw new UTFDataFormatException("encoded string too long: "+encodedsize+" bytes"); - ensureEnoughBuffer(encodedsize+2); - writeShort(encodedsize); - int i=0; - for(i=0;i=0x0001)&&(c<=0x007F))) - break; - buf[pos++]=(byte) c; - } - for(;i=0x0001)&&(c<=0x007F)){ - buf[pos++]=(byte) c; - }else if(c>0x07FF){ - buf[pos++]=(byte) (0xE0|((c>>12)&0x0F)); - buf[pos++]=(byte) (0x80|((c>>6)&0x3F)); - buf[pos++]=(byte) (0x80|((c>>0)&0x3F)); - }else{ - buf[pos++]=(byte) (0xC0|((c>>6)&0x1F)); - buf[pos++]=(byte) (0x80|((c>>0)&0x3F)); - } - } - } - - private void ensureEnoughBuffer(int newcount){ - if(newcount>buf.length){ - byte newbuf[]=new byte[Math.max(buf.length<<1,newcount)]; - System.arraycopy(buf,0,newbuf,0,pos); - buf=newbuf; - } - } -} \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/StoreDataReader.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/StoreDataReader.java index 61c483ef06..9741093b40 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/StoreDataReader.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/StoreDataReader.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.RandomAccessFile; import org.apache.activemq.kaha.Marshaller; import org.apache.activemq.kaha.StoreLocation; +import org.apache.activemq.util.DataByteArrayInputStream; /** * Optimized Store reader * @@ -29,7 +30,7 @@ import org.apache.activemq.kaha.StoreLocation; final class StoreDataReader{ private DataManager dataManager; - private StoreByteArrayInputStream dataIn; + private DataByteArrayInputStream dataIn; /** * Construct a Store reader @@ -38,7 +39,7 @@ final class StoreDataReader{ */ StoreDataReader(DataManager fileManager){ this.dataManager=fileManager; - this.dataIn=new StoreByteArrayInputStream(); + this.dataIn=new DataByteArrayInputStream(); } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/StoreDataWriter.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/StoreDataWriter.java index a25a1852df..f9425bf7e0 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/StoreDataWriter.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/StoreDataWriter.java @@ -23,6 +23,7 @@ import java.io.RandomAccessFile; import org.apache.activemq.kaha.Marshaller; import org.apache.activemq.kaha.StoreLocation; +import org.apache.activemq.util.DataByteArrayOutputStream; /** * Optimized Store writer * @@ -30,7 +31,7 @@ import org.apache.activemq.kaha.StoreLocation; */ final class StoreDataWriter{ - private StoreByteArrayOutputStream buffer; + private DataByteArrayOutputStream buffer; private DataManager dataManager; @@ -41,7 +42,7 @@ final class StoreDataWriter{ */ StoreDataWriter(DataManager fileManager){ this.dataManager=fileManager; - this.buffer=new StoreByteArrayOutputStream(); + this.buffer=new DataByteArrayOutputStream(); } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexReader.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexReader.java index 96aa19d805..f12cc44e98 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexReader.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexReader.java @@ -20,7 +20,7 @@ package org.apache.activemq.kaha.impl.index; import java.io.IOException; import java.io.RandomAccessFile; -import org.apache.activemq.kaha.impl.data.StoreByteArrayInputStream; +import org.apache.activemq.util.DataByteArrayInputStream; /** * Optimized Store reader * @@ -28,7 +28,7 @@ import org.apache.activemq.kaha.impl.data.StoreByteArrayInputStream; */ class StoreIndexReader{ protected RandomAccessFile file; - protected StoreByteArrayInputStream dataIn; + protected DataByteArrayInputStream dataIn; protected byte[] buffer=new byte[IndexItem.INDEX_SIZE]; /** @@ -38,7 +38,7 @@ class StoreIndexReader{ */ StoreIndexReader(RandomAccessFile file){ this.file=file; - this.dataIn=new StoreByteArrayInputStream(); + this.dataIn=new DataByteArrayInputStream(); } protected IndexItem readItem(long offset) throws IOException{ diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexWriter.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexWriter.java index 7ccdce8378..94332050c2 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexWriter.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexWriter.java @@ -21,7 +21,7 @@ import java.io.IOException; import java.io.RandomAccessFile; import org.apache.activemq.kaha.impl.data.DataManager; -import org.apache.activemq.kaha.impl.data.StoreByteArrayOutputStream; +import org.apache.activemq.util.DataByteArrayOutputStream; /** * Optimized Store writer * @@ -29,7 +29,7 @@ import org.apache.activemq.kaha.impl.data.StoreByteArrayOutputStream; */ class StoreIndexWriter{ - protected final StoreByteArrayOutputStream dataOut = new StoreByteArrayOutputStream(); + protected final DataByteArrayOutputStream dataOut = new DataByteArrayOutputStream(); protected final RandomAccessFile file; protected final String name; protected final DataManager redoLog;