Passing Messages from the shoulders of Apache:
A demonstration of distributed image processing


Part 5: Video Display Process in Java with Avro and Qpid
By Matthew Stevenson - miamg@dnikatt.h.reutroboknoshanind@obor.ttagmaiycrehpl.com  (July 25, 2011)
  1. Overview - Distributed messaging with Avro and Qpid
  2. Camera Capture Process
  3. Face Detection Process
  4. Video Display (C++)
  5. Video Display (Java)
  6. Running the Processes
  7. Complete Code
We are now ready to create a veiwer in Java.
Similar to the C++ display, this process will:
Start by connecting to the broker.
ConnectionFactory cf = new AMQConnectionFactory("amqp://admin:admin@clientid/test?brokerlist='tcp://localhost:5672'");
Connection myConnection = cf.createConnection();
Session mySession = myConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination myImageQueue = new AMQQueue(videoQueue);
Destination myRegionsQueue = new AMQQueue(regionsQueue);
MessageConsumer myImageConsumer = mySession.createConsumer(myImageQueue);
MessageConsumer myRegionsConsumer = mySession.createConsumer(myRegionsQueue);
myConnection.start();

We will need to follow the same steps for fetching an image.
First we need to get get our PortableImage from the Qpid message.
This generic method will deserialize the message contents to a class generated from an Avro schema
public static <T extends SpecificRecordBase>
        T unpackMessage(Class<T> c, BytesMessage message) throws Exception, IOException{
    long len = message.getBodyLength();
    byte[] data = new byte[(int)len];   //loss of data when len larger than max int
    int read = message.readBytes(data);
    ByteArrayInputStream in = new ByteArrayInputStream(data);
    DatumReader<T> reader = new SpecificDatumReader<T>(c);
    Decoder d = DecoderFactory.get().binaryDecoder(in, null);
    T t = reader.read(null, d);
    return t;
}

Although unused in this example, this will serialize an Avro type into a Qpid message:
public static <T extends SpecificRecordBase>
        void packMessage(Class<T> c, T t, BytesMessage message) throws JMSException, IOException{
    ByteArrayOutputStream out = new ByteArrayOutputStream();    
    DatumWriter<T> w = new SpecificDatumWriter<T>(c);
    Encoder e = EncoderFactory.get().binaryEncoder(out, null);
    w.write(t, e);
    e.flush();
    message.writeBytes(out.toByteArray(), 0, out.size());
}

This will create a BufferedImage from our PortableImage class:
(It will work with single-channel 8-bit grayscale or 3-channel 24-bit rgb images)
public static BufferedImage unpackImage(PortableImage pimg) {
    int w = pimg.width;
    int h = pimg.height;
    int wStep = pimg.widthStep;
    int c = pimg.nChannels;
    ByteBuffer data = pimg.data;
    BufferedImage bimg = new BufferedImage(pimg.width, pimg.height, BufferedImage.TYPE_INT_ARGB);
    for (int y = 0; y < h; y++) {
        for (int x = 0; x < w; x++) {
            int argb = getARGB(data, y*wStep + x*c, c);
            bimg.setRGB(x, y, argb);
        }
    }
    return bimg;
}

private static int getRGB(ByteBuffer data, int offset, int channels){
    int rgb = 255 << 24; //set alpha
    int val = 0;
    for(int i=0; i<3; i++){
        if(i<channels){
            val = Utils.unsign(data.get(offset+i));
        }
        int shift = (2-i)*8;
        rgb = rgb | (val << shift);
    }
    return rgb;
}

Using these methods we can get a usable image from our message:
Message imgMsg = myImageConsumer.receive();
PortableImage pimg = unpackMessage(PortableImage.class, (BytesMessage)imgMsg);
Image img = unpackImage(pimg);

Next we can fetch the results from our face detection and draw them on our image:
Message rgnsMsg = myRegionsConsumer.receive();
ImageRegions regions = unpackMessage(ImageRegions.class, (BytesMessage)rgnsMsg);
Graphics g = img.getGraphics();
g.setColor(Color.RED);
for(ImageRegion rgn : regions.regions){
    g.drawRect(rgn.x, rgn.y, rgn.width, rgn.height);
}

Lastly, we need a panel to display our image:
public class VideoPanel extends javax.swing.JPanel{
    private Image myImage;
   
    public VideoPanel() {...}
   
    public void drawImage(Image image){
        myImage = image;
        repaint();
    }
   
    @Override public void paint(Graphics g){
        g.drawImage(myImage, 0, 0, getWidth(), getHeight(), null);
    }
   
    ...
}

Finally, with an instance of this panel we can now draw our image:
vidPanel.drawImage(img);

  1. Overview - Distributed messaging with Avro and Qpid
  2. Camera Capture Process
  3. Face Detection Process
  4. Video Display (C++)
  5. Video Display (Java)
  6. Running the Processes
  7. Complete Code