Passing Messages from the shoulders of Apache:
A demonstration of distributed image processing


Part 2: Camera Capture Process in C++ with Avro, Qpid, and OpenCV
By Matthew Stevenson (July 25, 2011)
  1. Overview - Distributed messaging with Avro and Qpid
  2. Camera Capture Process
  3. Face Detection Process
  4. Video Display (C++)
  5. Video Display (Java)
  6. Running the Processes
  7. Complete Code
We will need to do a few things:
Connect to a broker and create a message sender:
Connection myConnection("localhost:5672", "{username:admin, password:admin}");  
myConnection.open();
Session mySession = myConnection.createSession();
Sender mySender = mySession.createSender("example.VideoTopic; {create: always, node: {type: topic}}");

Connect to a camera:
CvCapture* myCapture = cvCreateCameraCapture(-1);

Capture a frame:
IplImage* img = cvQueryFrame(myCapture);

I usually convert the image to grayscale and resize it immediately after capture. This reduces the amount of data each message carries, which helps performance, but it is not required: the code handles color images as well as grayscale.
IplImage* myConvertColorHeader = cvCreateImage(cvSize(640,480),IPL_DEPTH_8U,1);
IplImage* myResizeHeader = cvCreateImage(cvSize(320,240),IPL_DEPTH_8U,1);
cvConvertImage(img,myConvertColorHeader);
cvResize(myConvertColorHeader,myResizeHeader);
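To put a rough number on the performance gain, the sketch below compares raw pixel payload sizes before and after the conversion. It assumes the camera delivers a 640x480, three-channel, 8-bit frame with no row padding; rawImageBytes is a hypothetical helper written for this illustration, not part of OpenCV.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper: number of bytes in an unpadded 8-bit image buffer.
std::size_t rawImageBytes(std::size_t width, std::size_t height, std::size_t channels) {
    assert(channels > 0);
    return width * height * channels;
}

// Assumed camera frame: 640x480 with 3 channels.
std::size_t colorFrameBytes() { return rawImageBytes(640, 480, 3); }

// Frame after grayscale conversion and resize: 320x240 with 1 channel.
std::size_t grayFrameBytes() { return rawImageBytes(320, 240, 1); }
```

Under those assumptions each message carries 76,800 bytes of pixel data instead of 921,600, a twelvefold reduction.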

Now we need an Avro definition in JSON for a serializable image:

{
    "name": "PortableImage", "type": "record",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "width", "type": "int"},
        {"name": "height", "type": "int"},
        {"name": "nChannels", "type": "int"},
        {"name": "widthStep", "type": "int"},
        {"name": "data", "type": "bytes"}
    ]
}

And we'll use Avro to generate code for C++ and Java.
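For orientation, Avro's binary encoding writes a record as its fields in schema order, each int as a zig-zag variable-length integer, and bytes as a length followed by the raw bytes. The sketch below is a hand-written illustration of the int case only. It is not the generated code or Avro's own implementation, and encodeAvroInt is a hypothetical name.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Illustrative zig-zag, variable-length encoding of a 32-bit int,
// following the scheme described in the Avro specification.
std::vector<uint8_t> encodeAvroInt(int32_t value) {
    // Zig-zag step: interleave negative and positive values so that
    // numbers close to zero map to small unsigned numbers.
    const uint32_t sign = (value < 0) ? 0xFFFFFFFFu : 0u;
    uint32_t n = (static_cast<uint32_t>(value) << 1) ^ sign;

    // Variable-length step: emit 7 bits per byte, low bits first,
    // setting the high bit on every byte except the last.
    std::vector<uint8_t> out;
    while (n >= 0x80) {
        out.push_back(static_cast<uint8_t>((n & 0x7F) | 0x80));
        n >>= 7;
    }
    out.push_back(static_cast<uint8_t>(n));

    assert(out.size() <= 5);  // a 32-bit value never needs more than 5 bytes
    return out;
}
```

With this encoding a width of 320 occupies two bytes (0x80 0x05), so the five integer fields add only a handful of bytes on top of the pixel data.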
Next we need to map the OpenCV IplImage to our PortableImage format.
PortableImage* packImage(IplImage *img){
    // The caller owns the returned object and must delete it.
    PortableImage* pi = new PortableImage();
    pi->width = img->width;
    pi->height = img->height;
    pi->nChannels = img->nChannels;
    pi->widthStep = img->widthStep;

    // imageSize is height * widthStep, so any row padding is copied too.
    const uint8_t* begin = reinterpret_cast<const uint8_t*>(img->imageData);
    pi->data.assign(begin, begin + img->imageSize);
    return pi;
}
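The schema carries widthStep as well as width because OpenCV may pad each row of an image, so a receiver has to step through the data by widthStep bytes per row. The sketch below shows that indexing on a hand-written stand-in for the generated type. PortableImageSketch, sampleAt, and makePaddedExample are hypothetical names, and the stand-in assumes the generated struct has the same fields as the schema.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hand-written stand-in for the Avro-generated PortableImage (assumption).
struct PortableImageSketch {
    int32_t id;
    int32_t width;
    int32_t height;
    int32_t nChannels;
    int32_t widthStep;          // bytes per row, including any padding
    std::vector<uint8_t> data;  // height * widthStep bytes
};

// Read one 8-bit sample at column x, row y, channel c.
uint8_t sampleAt(const PortableImageSketch& img, int x, int y, int c) {
    assert(x >= 0 && x < img.width);
    assert(y >= 0 && y < img.height);
    assert(c >= 0 && c < img.nChannels);
    // Rows start every widthStep bytes, not every width * nChannels bytes.
    const std::size_t index = static_cast<std::size_t>(y) * img.widthStep
                            + static_cast<std::size_t>(x) * img.nChannels + c;
    return img.data[index];
}

// Build a tiny 3x2 grayscale image whose rows are padded to 4 bytes.
PortableImageSketch makePaddedExample() {
    PortableImageSketch img;
    img.id = 0;
    img.width = 3;
    img.height = 2;
    img.nChannels = 1;
    img.widthStep = 4;
    const uint8_t raw[] = {10, 11, 12, 0,   // row 0 plus one padding byte
                           20, 21, 22, 0};  // row 1 plus one padding byte
    img.data.assign(raw, raw + sizeof(raw));
    return img;
}
```

In the example, the first pixel of the second row sits at offset 4 rather than offset 3, which is why indexing by width alone would read a padding byte.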

Now we need to serialize the PortableImage and pack it into a Qpid message.
This step presents a small difficulty. Avro serializes data directly to an Avro OutputStream.
Avro defines a FileOutputStream and a MemoryOutputStream; however, only the definition of the abstract OutputStream is available through the API, and the implementations are hidden in source files.
We need to access the serialized bytes directly, but we cannot do this since the MemoryOutputStream is hidden from us.
As a workaround, I made an identical copy of the MemoryOutputStream and renamed it AvroQpidOutputStream.
Both classes hold the serialized data, as a list of fixed-size chunks, in a public field:
std::vector<uint8_t*> data_;

I also defined the global function:
std::auto_ptr<AvroQpidOutputStream> avroqpidOutputStream(size_t chunkSize = 4 * 1024);

which mimics how MemoryOutputStreams are created.
Now we will serialize the PortableImage and then copy the data into a Qpid message. (Ideally the AvroQpidOutputStream would serialize directly into a Qpid message.)
This template function will create a Qpid message from any generated Avro type:
template<typename T> Message packMessage(T *t){
    // Serialize t into the in-memory stream with Avro's binary encoder.
    auto_ptr<AvroQpidOutputStream> os = avroqpidOutputStream();
    EncoderPtr e = binaryEncoder();
    e->init(*os);

    avro::encode(*e, *t);
    e->flush();

    // Copy the chunked stream into one contiguous buffer. Only the first
    // byteCount() bytes are valid; the last chunk may be partly filled.
    size_t count = os->byteCount();
    std::string data;
    data.reserve(count);
    for (std::vector<uint8_t*>::const_iterator it = os->data_.begin(); it != os->data_.end() && data.size()<count; ++it) {
        const uint8_t* chunk = *it;
        size_t size = os->chunkSize_;
        for(size_t j=0; j<size && data.size()<count; j++){
            data.push_back(static_cast<char>(chunk[j]));
        }
    }
    Message message;
    message.setContent(data.data(), data.size());
    return message;
}
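The copy step in packMessage can be looked at in isolation from Avro and Qpid. The sketch below flattens a list of fixed-size chunks into one contiguous string, taking only the first byteCount bytes because the last chunk is usually only partly filled. flattenChunks is a hypothetical helper, and its parameters stand in for the data_, chunkSize_, and byteCount() members described above.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Copy the first byteCount bytes out of a list of fixed-size chunks.
// Every chunk is assumed to be chunkSize bytes long.
std::string flattenChunks(const std::vector<uint8_t*>& chunks,
                          std::size_t chunkSize, std::size_t byteCount) {
    assert(chunkSize > 0);
    assert(byteCount <= chunks.size() * chunkSize);

    std::string out;
    out.reserve(byteCount);
    std::size_t remaining = byteCount;
    for (std::size_t i = 0; i < chunks.size() && remaining > 0; ++i) {
        // Take a whole chunk, or just the valid prefix of the last one.
        const std::size_t n = std::min(chunkSize, remaining);
        out.append(reinterpret_cast<const char*>(chunks[i]), n);
        remaining -= n;
    }
    return out;
}
```

Appending a chunk at a time avoids the byte-by-byte inner loop, and returning a std::string removes the manual new[] and delete[] pair. The result could then be passed to the message with setContent(out.data(), out.size()), the same overload the original code uses.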

This will capture, convert to grayscale, resize, and send an image:
IplImage* img = cvQueryFrame(myCapture);
cvConvertImage(img,myConvertColorHeader);
cvResize(myConvertColorHeader,myResizeHeader);
PortableImage* pimg = packImage(myResizeHeader);
Message message = packMessage<PortableImage>(pimg);
mySender.send(message);
delete pimg;  // packImage allocates with new; free it once the message is built

Now our message is sitting in a topic, waiting to be fetched by the Image Processor and Video Display.
