Passing Messages from the shoulders of Apache:
A demonstration of distributed image processing
By Matthew Stevenson (July 25, 2011)
We are now ready to create a viewer in Java.
Similar to the C++ display, this process will:
- Retrieve an image
- Retrieve face locations
- Display the image with locations
Start by connecting to the broker.
ConnectionFactory cf = new AMQConnectionFactory("amqp://admin:admin@clientid/test?brokerlist='tcp://localhost:5672'");
Connection myConnection = cf.createConnection();
Session mySession = myConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination myImageQueue = new AMQQueue(videoQueue);
Destination myRegionsQueue = new AMQQueue(regionsQueue);
MessageConsumer myImageConsumer = mySession.createConsumer(myImageQueue);
MessageConsumer myRegionsConsumer = mySession.createConsumer(myRegionsQueue);
myConnection.start();
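When the viewer is finished, the broker resources should be released. The following is a minimal cleanup sketch using the objects created above; in JMS, closing the connection also closes its sessions and consumers:
//release the consumers, session, and connection when the viewer exits
myImageConsumer.close();
myRegionsConsumer.close();
mySession.close();
myConnection.close();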
To fetch an image, we follow the same steps as in the C++ display.
First we need to get our PortableImage from the Qpid message.
This generic method will deserialize the message contents to a class generated from an Avro schema:
public static <T extends SpecificRecordBase> T unpackMessage(Class<T> c, BytesMessage message) throws JMSException, IOException {
    //copy the raw message body into a byte array
    long len = message.getBodyLength();
    byte[] data = new byte[(int)len]; //loss of data when len larger than max int
    int read = message.readBytes(data);
    //decode the Avro binary payload into an instance of the generated class
    ByteArrayInputStream in = new ByteArrayInputStream(data);
    DatumReader<T> reader = new SpecificDatumReader<T>(c);
    Decoder d = DecoderFactory.get().binaryDecoder(in, null);
    T t = reader.read(null, d);
    return t;
}
Although unused in this example, this will serialize an Avro type into a Qpid message:
public static <T extends SpecificRecordBase> void packMessage(Class<T> c, T t, BytesMessage message) throws JMSException, IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DatumWriter<T> w = new SpecificDatumWriter<T>(c);
    Encoder e = EncoderFactory.get().binaryEncoder(out, null);
    w.write(t, e);
    e.flush();
    message.writeBytes(out.toByteArray(), 0, out.size());
}
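As a sketch of how it might be used, a producer could serialize an Avro object and publish it to a queue like this; the resultsQueue name here is hypothetical and not part of the demonstration:
//hypothetical usage: serialize an ImageRegions instance and send it to a queue
Destination myResultsQueue = new AMQQueue(resultsQueue); //resultsQueue is a placeholder name
MessageProducer producer = mySession.createProducer(myResultsQueue);
BytesMessage outMsg = mySession.createBytesMessage();
packMessage(ImageRegions.class, regions, outMsg);
producer.send(outMsg);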
This will create a BufferedImage from our PortableImage class:
(It works with single-channel 8-bit grayscale or 3-channel 24-bit RGB images.)
public static BufferedImage unpackImage(PortableImage pimg) {
    int w = pimg.width;
    int h = pimg.height;
    int wStep = pimg.widthStep; //number of bytes per image row
    int c = pimg.nChannels;
    ByteBuffer data = pimg.data;
    BufferedImage bimg = new BufferedImage(pimg.width, pimg.height, BufferedImage.TYPE_INT_ARGB);
    for (int y = 0; y < h; y++) {
        for (int x = 0; x < w; x++) {
            int argb = getARGB(data, y*wStep + x*c, c);
            bimg.setRGB(x, y, argb);
        }
    }
    return bimg;
}
private static int getARGB(ByteBuffer data, int offset, int channels) {
    int argb = 255 << 24; //set alpha to fully opaque
    int val = 0;
    for (int i = 0; i < 3; i++) {
        if (i < channels) {
            val = Utils.unsign(data.get(offset + i));
        } //grayscale images reuse the single channel value for all three colors
        int shift = (2 - i) * 8;
        argb = argb | (val << shift);
    }
    return argb;
}
Using these methods we can get a usable image from our message:
Message imgMsg = myImageConsumer.receive();
PortableImage pimg = unpackMessage(PortableImage.class, (BytesMessage)imgMsg);
Image img = unpackImage(pimg);
Next we can fetch the results from our face detection and draw them on our image:
Message rgnsMsg = myRegionsConsumer.receive();
ImageRegions regions = unpackMessage(ImageRegions.class, (BytesMessage)rgnsMsg);
Graphics g = img.getGraphics();
g.setColor(Color.RED);
for (ImageRegion rgn : regions.regions) {
    g.drawRect(rgn.x, rgn.y, rgn.width, rgn.height);
}
Lastly, we need a panel to display our image:
public class VideoPanel extends javax.swing.JPanel {
    private Image myImage;

    public VideoPanel() {...}

    public void drawImage(Image image) {
        myImage = image;
        repaint();
    }

    @Override
    public void paint(Graphics g) {
        g.drawImage(myImage, 0, 0, getWidth(), getHeight(), null);
    }
    ...
}
Finally, with an instance of this panel we can now draw our image:
vidPanel.drawImage(img);
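To tie the pieces together, the panel can be hosted in a JFrame and the receive-and-draw steps above repeated for each frame. The window setup and loop below are a sketch of one possible arrangement rather than part of the listings above:
//sketch: host the VideoPanel in a window (frame title and size are assumptions)
JFrame frame = new JFrame("Distributed Face Detection Viewer");
VideoPanel vidPanel = new VideoPanel();
frame.add(vidPanel);
frame.setSize(640, 480);
frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
frame.setVisible(true);

//repeat the receive, unpack, and draw steps for each incoming frame
while (true) {
    PortableImage pimg = unpackMessage(PortableImage.class, (BytesMessage)myImageConsumer.receive());
    ImageRegions regions = unpackMessage(ImageRegions.class, (BytesMessage)myRegionsConsumer.receive());
    BufferedImage img = unpackImage(pimg);
    Graphics g = img.getGraphics();
    g.setColor(Color.RED);
    for (ImageRegion rgn : regions.regions) {
        g.drawRect(rgn.x, rgn.y, rgn.width, rgn.height);
    }
    vidPanel.drawImage(img);
}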