Passing Messages from the shoulders of Apache:
A demonstration of distributed image processing


Part 3: Face Detection Process in C++ with Avro, Qpid, and OpenCV
By Matthew Stevenson (July 25, 2011)
  1. Overview - Distributed messaging with Avro and Qpid
  2. Camera Capture Process
  3. Face Detection Process
  4. Video Display (C++)
  5. Video Display (Java)
  6. Running the Processes
  7. Complete Code
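This component will receive images from the video topic, detect faces in each image, and send the detected face regions to a results queue.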
First we need to connect to the broker.
This time we will create a receiver to get the images, and a sender to send the results.
Connection myConnection("localhost:5672", "{username:admin, password:admin}");
myConnection.open();
Session mySession = myConnection.createSession();
Receiver myImageReceiver = mySession.createReceiver("example.VideoTopic; {create: always, node: {type: topic}}");
Sender myResultSender = mySession.createSender("example.FaceQueue; {create: always, node: {type: queue}}");

We will need to pull an image from the VideoTopic. To do this, we fetch the message, deserialize the contents to a PortableImage, and convert to an IplImage.
Here is a template function for deserializing a Qpid message to an Avro type:
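template<typename T> T* unpackMessage(Message &message){
    // Wrap the raw message bytes in an Avro input stream and attach a binary decoder.
    DecoderPtr d = binaryDecoder();
    auto_ptr<InputStream> is = memoryInputStream((const uint8_t*)message.getContentPtr(), message.getContentSize());
    d->init(*is);
    // Hold the result in an auto_ptr so it is freed if decode() throws.
    auto_ptr<T> t(new T());
    try{
        decode(*d, *t);
    }catch(const avro::Exception &ex){
        return NULL;
    }
    // Success: hand ownership to the caller, who must delete the object.
    return t.release();
}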

This will convert a PortableImage into an IplImage:
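IplImage* unpackImage(PortableImage* pimg){
    // unpackMessage returns NULL on failure, so guard against it here.
    if(pimg == NULL){
        return NULL;
    }
    int width = pimg->width;
    int height = pimg->height;
    int channels = pimg->nChannels;
    int widthStep = pimg->widthStep;
    CvSize size = cvSize(width, height);
    IplImage* image = NULL;
    try{
        image = cvCreateImageHeader(size, IPL_DEPTH_8U, channels);
        cvInitImageHeader(image, size, IPL_DEPTH_8U, channels);
        // Each row occupies widthStep bytes, which may include padding.
        size_t dataSize = (size_t)widthStep*height;
        // Zero-initialize so any bytes missing from the message are defined.
        uint8_t* buf = new uint8_t[dataSize]();
        // Never copy more than the buffer holds, even if the message carries extra bytes.
        size_t count = pimg->data.size() < dataSize ? pimg->data.size() : dataSize;
        for(size_t i=0; i<count; i++){
            buf[i] = pimg->data[i];
        }
        // Point the header at buf; the caller owns buf and must delete[] it.
        cvSetData(image, buf, widthStep);
        image->imageDataOrigin = (char*)buf;
    }catch(...){
        // Covers cv::Exception and std::bad_alloc; release the header so it does not leak.
        if(image != NULL){
            cvReleaseImageHeader(&image);
        }
        return NULL;
    }
    return image;
}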
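
The widthStep field matters because image rows may be padded: each row occupies widthStep bytes, of which only width * nChannels carry pixel data. As a minimal sketch of that layout using only the standard library (the helpers `paddedWidthStep`, `packRows`, and `pixelAt`, and the 4-byte alignment, are hypothetical illustrations and not part of OpenCV or of this project's code):

```cpp
#include <cassert>
#include <cstddef>
#include <stdint.h>
#include <vector>

// Hypothetical helper: round a row length up to a multiple of `align` bytes.
// The 4-byte default is an assumption for illustration only.
inline int paddedWidthStep(int width, int channels, int align = 4){
    int rowBytes = width * channels;
    return ((rowBytes + align - 1) / align) * align;
}

// Hypothetical helper: copy tightly packed pixels into a buffer with padded rows.
inline std::vector<uint8_t> packRows(const std::vector<uint8_t> &tight,
                                     int width, int height, int channels, int widthStep){
    int rowBytes = width * channels;
    assert(widthStep >= rowBytes);
    assert(tight.size() == (std::size_t)rowBytes * height);
    std::vector<uint8_t> padded((std::size_t)widthStep * height, 0);
    for(int y = 0; y < height; ++y){
        for(int x = 0; x < rowBytes; ++x){
            padded[(std::size_t)y * widthStep + x] = tight[(std::size_t)y * rowBytes + x];
        }
    }
    return padded;
}

// Read one channel of one pixel from a padded buffer.
inline uint8_t pixelAt(const std::vector<uint8_t> &padded, int widthStep,
                       int channels, int x, int y, int c){
    return padded[(std::size_t)y * widthStep + (std::size_t)x * channels + c];
}
```

Indexing by widthStep rather than by width * nChannels is what keeps the receiving side in step with the sender's row layout, which is why the PortableImage carries widthStep explicitly.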

We can get our message and unpack the image (both helpers return NULL on failure, so check the results before using them):
Message imageMessage = myImageReceiver.fetch();
PortableImage* pimg = unpackMessage<PortableImage>(imageMessage);
IplImage* img = unpackImage(pimg);

Now we can detect faces:
CvMemStorage* myStorage = cvCreateMemStorage();
CvHaarClassifierCascade* myClassifier = (CvHaarClassifierCascade *)cvLoad("/path/to/haarcascade_frontalface_default.xml");
CvSeq* detectedFaces = cvHaarDetectObjects(img, myClassifier, myStorage, 1.2, 5, CV_HAAR_DO_CANNY_PRUNING, cvSize(40,40));
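
To get a feel for what the 1.2 scale factor and the cvSize(40,40) minimum size mean, here is a rough sketch that lists the square window sizes a detector would try. It assumes the detector grows the window geometrically from the cascade's trained window size and skips windows below the minimum. The `candidateWindowSizes` function and the 24-pixel base window are assumptions for illustration; OpenCV's actual loop may round or terminate differently.

```cpp
#include <cassert>
#include <cmath>
#include <vector>

// Hypothetical helper, not an OpenCV call: lists the square window sizes
// tried when scanning an image of the given dimensions.
inline std::vector<int> candidateWindowSizes(int baseWindow, double scaleFactor,
                                             int minSize, int imageWidth, int imageHeight){
    assert(imageWidth > 0 && imageHeight > 0);
    std::vector<int> sizes;
    if(baseWindow <= 0 || scaleFactor <= 1.0){
        return sizes;  // a factor of 1.0 or less would never terminate
    }
    int limit = imageWidth < imageHeight ? imageWidth : imageHeight;
    for(double factor = 1.0; ; factor *= scaleFactor){
        int window = (int)std::floor(baseWindow * factor + 0.5);  // round to nearest
        if(window > limit){
            break;      // the window no longer fits in the image
        }
        if(window < minSize){
            continue;   // too small: skipped, like faces under the minimum size
        }
        sizes.push_back(window);
    }
    return sizes;
}
```

A smaller scale factor produces more window sizes, which is slower but less likely to miss a face that falls between two scales; a larger minimum size skips the smallest windows entirely.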

After running this, detectedFaces will contain a sequence of face locations, each in the form of a CvRect.
These are the results we want to send, so we will need to convert them to an Avro type first.
A CvRect contains the x and y coordinates of the rectangle's top-left corner, a width, and a height. Since we may find multiple faces in an image, the schema holds an array of regions:

{
    "name": "ImageRegions", "type": "record",
    "fields": [
        {"name": "imageId", "type": "long"},
        {"name": "regions", "type": {"type":"array", "items":{
            "name": "ImageRegion", "type": "record",
            "fields": [
                {"name": "x", "type": "int"},
                {"name": "y", "type": "int"},
                {"name": "width", "type": "int"},
                {"name": "height", "type": "int"}
            ]
        }}}
    ]
}

(I included an imageId field so these regions can be associated with a PortableImage's id. The id is not used in this example.)
Once again we'll use Avro to generate the code for C++ and Java.
Next, we'll convert the detectedFaces sequence to an ImageRegions:
ImageRegions* packImageRegions(CvSeq *seq){
    ImageRegions* regions = new ImageRegions();  
    int count = (seq ? seq->total : 0);
    for(int i = 0; i < count; ++i){
        CvRect* r = (CvRect*)cvGetSeqElem(seq, i);
        ImageRegion ir;
        ir.x = r->x;
        ir.y = r->y;
        ir.height = r->height;
        ir.width = r->width;
        regions->regions.push_back(ir);
    }
    return regions;
}
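
Because each region stores the top-left corner of the rectangle rather than its center, a consumer that wants the center of a face has to derive it. A minimal standard-library sketch of that step follows; the `Region` struct is a stand-in that mirrors the ImageRegion fields above rather than the Avro-generated class, and `centerOf` and `largestRegion` are hypothetical helpers.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Stand-in for the generated ImageRegion record (same four int fields).
struct Region {
    int x;       // left edge
    int y;       // top edge
    int width;
    int height;
};

// Hypothetical helper: center of a region, using integer division.
inline std::pair<int, int> centerOf(const Region &r){
    assert(r.width >= 0 && r.height >= 0);
    return std::make_pair(r.x + r.width / 2, r.y + r.height / 2);
}

// Hypothetical helper: index of the largest region by area, or -1 if there are none.
inline int largestRegion(const std::vector<Region> &regions){
    int best = -1;
    long bestArea = -1;
    for(std::size_t i = 0; i < regions.size(); ++i){
        long area = (long)regions[i].width * regions[i].height;
        if(area > bestArea){
            bestArea = area;
            best = (int)i;
        }
    }
    return best;
}
```

Picking the largest region is one simple way for a display process to choose a single face when several are reported.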

Now we pack up and send off the results:
ImageRegions* regions = packImageRegions(detectedFaces);
Message resultsMessage = packMessage<ImageRegions>(regions);
myResultSender.send(resultsMessage);

And then clean up after ourselves:
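if(detectedFaces != NULL){
    cvClearSeq(detectedFaces);
}
cvClearMemStorage(myStorage);
// unpackImage allocated the pixel buffer as uint8_t[], so free it as such, then the header.
delete[] (uint8_t*)img->imageDataOrigin;
cvReleaseImageHeader(&img);
// The unpacked image and the packed regions were allocated with new.
delete pimg;
delete regions;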

And now we are ready to display the image and result.