Implementing queue with put and take operations in OMNeT++


Miguel Martins

Oct 29, 2012, 5:47:57 PM
to omn...@googlegroups.com
Hello,
I'm trying to incorporate queuing functionality in one of my projects. It appears, however, that the queuing examples that come with OMNeT++ don't quite implement the functionality I want: in particular, I'd like to have a queue with both put and take operations. "queueinglib" only seems to implement the former.

Here's what I've done so far:
/* FILE: Queue.cpp */

#include "Queue.h"
#include "QueueMessage_m.h"
#include <omnetpp.h>

void Queue::initialize() {

}

void Queue::handleMessage(cMessage* msg) {
    QueueMessage* message = check_and_cast<QueueMessage*>(msg);
    if (message->getOperationType() == REQUEST_PUT) {
        queue.insert(message);
    } else if (message->getOperationType() == REQUEST_TAKE) {
        if (!queue.isEmpty()) {
            QueueMessage* reply = check_and_cast<QueueMessage*>(queue.pop());
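            // send() needs one of this module's own gates; getSenderGate() belongs to the sender,
            // so reply through the output half of the gate the request arrived on
            cGate* reply_gate = gateHalf(msg->getArrivalGate()->getBaseName(), cGate::OUTPUT, msg->getArrivalGate()->getIndex());
            send(reply, reply_gate);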
        } else {
            //Wait for queue to not be empty... Somehow...
        }
    }
}

The corresponding header file is as follows:
#ifndef QUEUE_H_
#define QUEUE_H_

#include <omnetpp.h>

class Queue : public cSimpleModule {
private:
    cQueue queue;
    virtual void initialize();
    virtual void handleMessage(cMessage* msg);
};

Define_Module(Queue); // conventionally placed in Queue.cpp rather than in the header

#endif /* QUEUE_H_ */

Here's my logic behind it: whenever a module wants to put a message on the queue, it sends a message to the Queue module with an operation type of "REQUEST_PUT". Likewise, whenever a module wants to take a message from the queue, it sends a message with an operation type of "REQUEST_TAKE", which would cause the Queue module to take a message from its internal queue and send it to whoever performed the take operation.

The question is... How do I properly implement this:
if (!queue.isEmpty()) {
            QueueMessage* reply = check_and_cast<QueueMessage*>(queue.pop());
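            // send() needs one of this module's own gates; getSenderGate() belongs to the sender,
            // so reply through the output half of the gate the request arrived on
            cGate* reply_gate = gateHalf(msg->getArrivalGate()->getBaseName(), cGate::OUTPUT, msg->getArrivalGate()->getIndex());
            send(reply, reply_gate);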
        } else {
            //Wait for queue to not be empty... Somehow... <-- This bit right here.
        }

That is, to put it roughly, what do I do when the queue is actually empty? How would I go about making it somehow "wait" until there's a message on the queue?

Thank you in advance.


Miguel Martins

Oct 29, 2012, 5:50:53 PM
to omn...@googlegroups.com
Here's some additional info: I've tried to model a simple scenario where there is a client that puts values in the queue, and a server which takes them out and prints them. Here's the code for both:

/* FILE: Client.cpp */

#include "Client.h"
#include "QueueMessage_m.h"
#include <omnetpp.h>

void Client::initialize() {
    init_message = new cMessage("init");
    scheduleAt(0,init_message);
}

void Client::handleMessage(cMessage* msg) {
    if (msg == init_message) {
        ev << "Client initialized!\n";
        send_to_queue(1,2);
    }
}

void Client::send_to_queue(int first,int second) {
    QueueMessage* message = new QueueMessage();
    message->setOperationType(REQUEST_PUT);
    message->setFirstValue(first);
    message->setSecondValue(second);

    send(message, "gate$o");
}

And the server:

/* FILE: Server.cpp */

#include "Server.h"
#include "QueueMessage_m.h"
#include <omnetpp.h>

void Server::initialize() {
    init_message = new cMessage("init");
    scheduleAt(0,init_message);
}

void Server::handleMessage(cMessage* msg) {
    if (msg == init_message) {
        ev << "Server initialized!\n";
        QueueMessage* message = new QueueMessage();
        message->setOperationType(REQUEST_TAKE);
        send(message, "gate$o");
    } else {
        ev << "Server got a message other than the init message!\n";
        QueueMessage* message = check_and_cast<QueueMessage*>(msg);
        ev.printf("Server got firstValue=%d and secondValue=%d\n", message->getFirstValue(), message->getSecondValue());
        delete message; // the receiving module owns the message and must free it
    }
}

And the message format for the queue:

enum OperationType {
    NULL_OPERATION = 0;
    REQUEST_PUT = 1;
    REQUEST_TAKE = 2;
};

message QueueMessage {
    int operationType @enum(OperationType);
    int firstValue;
    int secondValue;
}


Rudolf Hornig

Oct 30, 2012, 5:47:26 AM
to omn...@googlegroups.com
For the question: if there is nothing in the queue, you can either:

- send back an "error" packet indicating that nothing can be returned from an empty queue, and do nothing else
- set an internal bool field signalling that a request is pending, and when the next packet arrives, send it out immediately instead of queuing it

Which one you choose depends on the semantics you want between the two classes. Either way you will run into some issues, such as what happens if the server requests 2, 3, or n packets in a row while the queue is empty, so you should probably implement the pending flag as a counter of outstanding requests. Eventually you should also implement a PEEK operation, so that the server does not request a packet when the queue is empty.
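The counter-of-outstanding-requests idea can be sketched outside OMNeT++ in plain C++. This is only a rough model under stated assumptions: there is a single consumer, items are plain ints, and the names `TakeQueue`, `put`, `take`, `empty`, and `outstanding` are hypothetical (none of them is OMNeT++ or queueinglib API). In a real module, the places where this sketch reports a delivery would be calls to send().

```cpp
#include <cstddef>
#include <deque>

// Hypothetical stand-alone model of a queue with one consumer.
// Nothing here is OMNeT++ API; an item is just an int.
class TakeQueue {
public:
    TakeQueue() : outstanding_(0) {}

    // PUT: if a take request is outstanding, hand the item straight to the
    // consumer via 'delivered' and return true; otherwise store it and
    // return false.
    bool put(int item, int& delivered) {
        if (outstanding_ > 0) {
            --outstanding_;
            delivered = item;   // delivered immediately, never queued
            return true;
        }
        items_.push_back(item);
        return false;
    }

    // TAKE: return the oldest item via 'out', or record the request
    // (and return false) if the queue is empty.
    bool take(int& out) {
        if (items_.empty()) {
            ++outstanding_;
            return false;
        }
        out = items_.front();
        items_.pop_front();
        return true;
    }

    // PEEK-style check, so the consumer can avoid requesting from an
    // empty queue in the first place.
    bool empty() const { return items_.empty(); }

    std::size_t outstanding() const { return outstanding_; }

private:
    std::deque<int> items_;
    std::size_t outstanding_;
};
```

With this model, a take on an empty queue returns false and raises the counter to 1; the next put then returns true and passes its item straight through. A plain counter is enough only because there is a single consumer; with several consumers the queue would have to remember who asked.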

BUT: I suggest taking a look at the PassiveQueue / Server modules in queueinglib, because they do exactly what you want (and a bit more).
Rudolf

Miguel Martins

Nov 5, 2012, 2:21:53 PM
to omn...@googlegroups.com
Your answer gave me an idea: what if I had a second, internal queue, for pending requests? The following seems to solve my problem (at least in the example I gave):

#include "Queue.h"
#include "QueueMessage_m.h"
#include <omnetpp.h>

void Queue::initialize() {

}

void Queue::handleMessage(cMessage* msg) {
    QueueMessage* message = check_and_cast<QueueMessage*>(msg);

    if (message->getOperationType() == REQUEST_PUT) {
        queue.insert(message);
    } else if (message->getOperationType() == REQUEST_TAKE) {
        if (!queue.isEmpty()) {
            QueueMessage* reply = check_and_cast<QueueMessage*>(queue.pop());
            cGate* reply_gate = gateHalf(msg->getArrivalGate()->getBaseName(),cGate::OUTPUT,msg->getArrivalGate()->getIndex());
            send(reply, reply_gate);
            delete msg; // the TAKE request has been served and is no longer needed
        } else {
            requests.insert(msg);
        }
    }
    //Do we have requests? Serve them!
    cQueue temporary_queue;
    ev << "Queue has " << requests.length() << " pending requests to serve.\n";
    while (!requests.isEmpty()) {
        QueueMessage* request = check_and_cast<QueueMessage*>(requests.pop());
        if (request->getOperationType() == REQUEST_TAKE) {
            //If queue is not empty, serve this request now
            if (!queue.isEmpty()) {
                QueueMessage* reply = check_and_cast<QueueMessage*>(queue.pop());
                cGate* reply_gate = gateHalf(request->getArrivalGate()->getBaseName(),cGate::OUTPUT,request->getArrivalGate()->getIndex());
                send(reply, reply_gate);
                delete request; // this pending request has now been served
            }
            //If not, put it back, in the temporary queue
            else {
                temporary_queue.insert(request);
            }
        }
    }
    //Copy from temporary queue
    while (!temporary_queue.isEmpty()) {
        requests.insert(temporary_queue.pop());
    }
}
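One observation about the approach above: once a message has been handled, the stored items and the pending requests cannot both be non-empty, so the matching step can be written as a single loop without a temporary queue. Here is a minimal stand-alone sketch of that idea in plain C++. It is not OMNeT++ code: `Broker` and `Delivery` are hypothetical names, requesters are identified by an int instead of a gate, and items are ints instead of messages.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <utility>
#include <vector>

// A delivery pairs a requester id with the item it receives.
typedef std::pair<int, int> Delivery;   // (requesterId, item)

class Broker {
public:
    // PUT: store the item, then match it with a waiting request if any.
    std::vector<Delivery> put(int item) {
        items_.push_back(item);
        return match();
    }

    // TAKE: record the request, then match it with a stored item if any.
    std::vector<Delivery> take(int requesterId) {
        requests_.push_back(requesterId);
        return match();
    }

    std::size_t pendingRequests() const { return requests_.size(); }
    std::size_t storedItems() const { return items_.size(); }

private:
    // Pair the oldest request with the oldest item until one side runs out.
    std::vector<Delivery> match() {
        std::vector<Delivery> out;
        while (!items_.empty() && !requests_.empty()) {
            out.push_back(Delivery(requests_.front(), items_.front()));
            requests_.pop_front();
            items_.pop_front();
        }
        // Invariant: at most one of the two queues is non-empty here.
        assert(items_.empty() || requests_.empty());
        return out;
    }

    std::deque<int> items_;
    std::deque<int> requests_;
};
```

In a module, each returned `Delivery` would correspond to one send() on the requester's gate, followed by deleting the served request message.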

Rudolf Hornig

Nov 5, 2012, 4:41:57 PM
to omn...@googlegroups.com
Yes, you also have to queue the requests from the server, so that they are served in the correct order.

The PassiveQueue / Server modules in the queueinglib example have slightly different semantics, and they are a bit more general:

- Several PassiveQueues can be attached to a single Server. Whenever the server finishes a job, it can check all queues to see whether they are empty (before sending an actual request). If more than one queue has a message, an installable algorithm decides which one will be queried. This way you can implement round-robin, priority-based, and other serving strategies.

The same is true in the opposite direction: a single PassiveQueue can be attached to several servers. Whenever a job arrives at a passive queue, it looks for a connected idle server. If more than one is idle, an installable algorithm again decides which one should process the newly arrived job.
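To illustrate what an "installable algorithm" could look like, here is a small strategy-interface sketch in plain C++. It does not reproduce queueinglib's actual classes: `SelectionStrategy`, `PriorityStrategy`, and `RoundRobinStrategy` are hypothetical names, and each queue is represented only by its current length.

```cpp
#include <cstddef>
#include <vector>

// Picks which non-empty queue a server should query next.
// Returns -1 when every queue is empty.
class SelectionStrategy {
public:
    virtual ~SelectionStrategy() {}
    virtual int select(const std::vector<std::size_t>& lengths) = 0;
};

// Always prefers the lowest-indexed non-empty queue.
class PriorityStrategy : public SelectionStrategy {
public:
    int select(const std::vector<std::size_t>& lengths) {
        for (std::size_t i = 0; i < lengths.size(); ++i) {
            if (lengths[i] > 0) return static_cast<int>(i);
        }
        return -1;
    }
};

// Starts searching just after the queue that was chosen last time.
class RoundRobinStrategy : public SelectionStrategy {
public:
    RoundRobinStrategy() : next_(0) {}

    int select(const std::vector<std::size_t>& lengths) {
        const std::size_t n = lengths.size();
        for (std::size_t step = 0; step < n; ++step) {
            const std::size_t i = (next_ + step) % n;
            if (lengths[i] > 0) {
                next_ = (i + 1) % n;   // resume after this queue next time
                return static_cast<int>(i);
            }
        }
        return -1;
    }

private:
    std::size_t next_;
};
```

A server holding a `SelectionStrategy*` can switch serving policies without changing its own code. The same shape would work in the opposite direction, with a queue choosing among idle servers.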

Take a look at how it works in the Terminal example.
Rudolf