Thursday, April 4, 2013

ZeroMQ on Node.JS and Socket inspection

First of all, I'm sorry for the lack of updates. Lately I've been writing papers rather than coding, and for the last few days I was on vacation, so no computer either.
Anyway, in my project I'm using ZeroMQ, which is a very good socket library. I use it to let my workers communicate with each other.

Lately my main concern was message loss. Since I increase and decrease the number of workers, it may happen that a worker gets shut down while it is receiving, processing or sending a message.
My very first approach was to save the timestamp of the last received message and then wait for some time. If no new message arrives within that time (time_now - last_received_message > some_variable), I assume that no more messages will arrive and I shut the worker down. In addition, a flag tells me whether a message is being processed: it is set to true when a message is received and back to false when the message leaves the worker.
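The timestamp-plus-flag idea can be sketched as a small tracker object. This is only an illustration under my own assumptions: the names createIdleTracker, messageReceived, messageSent and canShutDown are made up for this example, and treating a worker that never received anything as safe to stop is a choice of this sketch, not a rule.

```javascript
// Hypothetical helper (names are illustrative, not from any library).
// Tracks when the last message arrived and whether one is still in progress.
function createIdleTracker(maxIdleMs) {
  var lastReceivedAt = null; // timestamp (ms) of the last received message
  var processing = false;    // true between receiving a message and sending it on

  return {
    // Call when a message arrives on the PULL socket.
    messageReceived: function (now) {
      lastReceivedAt = now;
      processing = true;
    },
    // Call when the processed message has left the worker.
    messageSent: function () {
      processing = false;
    },
    // True if the worker looks idle enough to be shut down.
    canShutDown: function (now) {
      if (processing) return false;            // a message is still inside the worker
      if (lastReceivedAt === null) return true; // assumption: never received anything
      return now - lastReceivedAt > maxIdleMs;  // note the order: now minus last
    }
  };
}
```

In a real worker, `now` would be `Date.now()`; passing it in as a parameter just makes the logic easy to test without waiting.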

The problem is that I cannot access the socket's queue directly to check what is inside and whether I have to wait some more time before shutting the worker down. Eventually I found out about the getsockopt() function and its return values.
Before showing the code, I have to say that this is not a final solution, nor necessarily the right way to do it. As for my sockets, I use PULL and PUSH. This means that getsockopt(ZMQ_EVENTS) can return only two valid values for each of them. For the PULL socket, which is read-only:

0 = nothing to read
1 = have something to read

For the PUSH socket, which is write-only, they are:
0 = can't write
2 = can write

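BUT: (getsockopt(ZMQ_EVENTS) & ZMQ_POLLOUT) > 0 does not mean there are no messages in the outgoing queue. It just means that the queue is not full and the socket is ready to accept more messages for sending. On the other hand, (getsockopt(ZMQ_EVENTS) & ZMQ_POLLIN) == 0 guarantees that the incoming queue is empty at the time of the call. Note the parentheses: in JavaScript the comparison operators bind more tightly than &, so without them the expression does not test the bit at all.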
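To make the bitmask reading concrete, here is a minimal sketch that decodes a ZMQ_EVENTS value. It works on plain integers so it runs without a socket; in the worker the value would come from getsockopt(zmq.ZMQ_EVENTS). The constants are the values from zmq.h, and describeEvents is a name I made up for this example.

```javascript
// Bit values as defined in zmq.h.
var ZMQ_POLLIN = 1;  // at least one message can be read without blocking
var ZMQ_POLLOUT = 2; // at least one message can be written without blocking

// Hypothetical helper: turns the ZMQ_EVENTS bitmask into two booleans.
function describeEvents(events) {
  return {
    canRead: (events & ZMQ_POLLIN) !== 0,
    canWrite: (events & ZMQ_POLLOUT) !== 0
  };
}

// Precedence pitfall: == is evaluated before &, so this is
// events & (ZMQ_POLLIN == 0), i.e. events & false, which is always 0.
function brokenCheck(events) {
  return events & ZMQ_POLLIN == 0;
}
```

So `describeEvents(1).canRead` is true for a PULL socket with pending input, while `brokenCheck` gives 0 for every input and therefore says nothing about the queue.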


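if(msg.command == 'kill'){
    setInterval(function(){
       var time_now = new Date().getTime();
       //POLLIN bit is 0 when nothing is waiting to be read; the parentheses are needed because == binds tighter than &
       var queue_empty = (receiver.getsockopt(zmq.ZMQ_EVENTS) & zmq.ZMQ_POLLIN) == 0;
       //if 10 seconds passed without receiving any message, or no message received at all (producer or useless worker),
       //or the incoming queue is empty and nothing is being processed
       if((time_now - last_message_received > 10000 && !execution_flag) || !last_message_received || (queue_empty && !execution_flag)){
            //kill
       }
    }, 1000);
}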

So basically I set up a timer that fires every second and checks three things: whether 10 seconds have passed since the last message while the worker is idle, whether no message was ever received, or whether the worker is idle AND the POLLIN bit is 0.
I still have to check this approach, but the given values for the bitmasks are correct.
If you have a better idea I'm open to suggestions. For now I think I will keep it this way.