Thursday, April 2, 2009

Gzipped JMS message bodies

If you want to send large messages with JMS, it is probably a good idea to compress them, assuming they contain data that's compressible (particularly effective with XML payloads). This is a natural fit for JMS BytesMessages, and compression is easily done with GZIPInputStream and GZIPOutputStream.

When I first did this, I used ByteArrayInputStream/ByteArrayOutputStream as the stream feeding the GZIP stream, but the problem with that is that you have to copy the underlying byte array into or out of the message in its entirety. This represents an extra buffer copy that in principle is not necesary. BytesMessage provides byte buffer I/O. Unfortunately, it doesn't conform to the InputStream / OutputStream interface directly.

So the fix is to shim BytesMessages using what I call BytesMessageInputStream and BytesMessageOutputStream. Once you shim the BytesMessage to a stream, you simply use that stream in the constructor of a GZIP stream and you're ready to go. Once the data has been written to the GZIP stream and that stream closed, the message can be sent or acknowledged, depending on which direction you're going.


public class BytesMessageInputStream extends InputStream {
private final BytesMessage message;
public BytesMessageInputStream(BytesMessage bm) { this.message = bm; }
public boolean markSupported() { return false; }
public int read() throws IOException {
try {
return this.message.read();
}
catch(MessageEOFException ex) { return -1; }
catch(JMSException ex) { throw new IOException(ex); }
}
public int read(byte[] buf) throws IOException {
try {
return this.message.readBytes(buf);
}
catch(JMSException ex) { throw new IOException(ex); }
}
}
public class BytesMessageOutputStream extends OutputStream {
private final BytesMessage message;
public BytesMessageOutputStream(BytesMessage bm) { this.message = bm; }
public void write(byte b) throws IOException {
try {
this.message.writeByte(b);
}
catch(JMSException ex) { throw new IOException(ex); }
}
public void write(byte[] buf) throws IOException {
try {
this.message.writeBytes(buf);
}
catch(JMSException ex) { throw new IOException(ex); }
}
public void write(byte[] buf, int off, int len) throws IOException {
try {
this.message.writeBytes(buf, off, len);
}
catch(JMSException ex) { throw new IOException(ex); }
}
}

3 comments:

Fahd Shariff said...

BytesMessage doesn't have a read() method. So your BytesMessageInputStream class won't work.

Thanks

Nick said...

You're right. Substitute readByte() for read().

Akom said...

Using it, works, thanks! (had to change read() to readByte() and add a new method in BytesMessageOutputStream for JDK1.7:


@Override
public void write(int b) throws IOException {
try {
this.message.writeInt(b);
} catch (JMSException ex) {
throw new IOException(ex);
}

}