Google Groups no longer supports new Usenet posts or subscriptions. Historical content remains viewable.
Dismiss

OOP and threads

0 views
Skip to first unread message

Kristoffer Persson

unread,
Nov 24, 2004, 4:56:02 AM11/24/04
to
Hi guys!

I came from realtime programming when I started to create Windows apps. One
app I made had a lot of threads in it and they communicated by sending
messages through PostThreadMessage. That's how things were done in the
realtime environment I was used to, but it's not very object oriented, is
it...

So, I thought I could make it more OO by turning each thread into an object,
which encapsulates the functionality. The object would communicate with its
owner in an OO fashion through properties, methods and events. The thread is
still needed though. I thought if the object created the thread internally,
the object's owner would not have to care about it.

The problem would be the object's message queue. The thread's message queue
would be the only one reading messages in its message queue, but if I don't
create a private message queue inside the object, the object's owner would
also receive messages from the thread, possibly deleting them before the
object gets a chance to read it.

So, to a few questions:

1. Am I on the right track?
2. How do I create the object's private message queue?
3. How do I place a message there from the object's private thread?

- Kristoffer -


Chris Uzdavinis

unread,
Nov 24, 2004, 8:13:54 AM11/24/04
to
"Kristoffer Persson" <no@email> writes:

> So, I thought I could make it more OO by turning each thread into an object,
> which encapsulates the functionality.

Sure that can be done, as evidenced by BCB's TThread class, as well as
dozens of other libraries that do approximately the same thing.

> The object would communicate with its owner in an OO fashion through
> properties, methods and events. The thread is still needed though. I
> thought if the object created the thread internally, the object's
> owner would not have to care about it.

I'd be extremely careful with such a design, and personally would not
do that. You still have to have all communication in a thread-safe
way, and with inheritance it could be very difficult if the base
class is intended to run in one thread and the derived thread in
another. It's just too easy to accidently call a function directly
or to access data that protocol would not allow.

Writing threads are hard enough without extra pitfalls built-in to the
design. (If an object is to represent a thread's context, then the
whole object should. We don't want to have to designate only certain
parts of the object to be safe to use...)

> The problem would be the object's message queue. The thread's
> message queue would be the only one reading messages in its message
> queue,

That doesn't make any sense. Do you mean that the thread is the only
reader from its message queue?

> but if I don't create a private message queue inside the object, the
> object's owner would also receive messages from the thread, possibly
> deleting them before the object gets a chance to read it.

I'm not familiar with the base class you're referring to, so the
message dispatching is unclear.

I don't like the idea of a derived class being a seperate thread from
its base, with communication through a queue. They should be two
seperate and unrelated classes that communicate through a message queue.

> So, to a few questions:
>
> 1. Am I on the right track?

Yes, but don't re-invent the wheel. Thread object facades already
exist, consider using something already proven in the field.
Personally, I like the ACE library and the ACE_Task. Writing correct
multi-threaded code is much harder than it ever appears, and clean
design is essential.

> 2. How do I create the object's private message queue?
> 3. How do I place a message there from the object's private thread?

Usually, the object holds a private message queue, and you have a
public "putq" or "enqueue" member function that other threads call
(which pushes the message into the queue.) The object holding the
queue is usually waiting on a condition variable for the queue to
become non-empty, and then it dequeues an element and processes it.

If you have multiple consumers, the logic is a little more tricky, but
for a single consumer per queue, it's simple:

enqueue: (producer thread)
acquire lock
if queue is full, wait until it's not
insert into queue
if a consumers is blocked, signal the consumer to wake up and go
release lock

dequeue: (consumer thread)
acquire lock
if queue is empty, wait until it's not
remove next message, store in local var
if queue is now empty enough and a producer is blocking
for a not-too-full queue, then signal the producer to wake up.
release lock
return messsage


The message queue object is the central piece to this whole thing. I
recently posted some code to implement one, using some helper classes
in ACE, but it may be useful to show it again, as I've changed it
slightly. Below is the code I'm using, though I've stripped out some
other things that are specific to my use. Even if you don't know ACE,
hopefully you'll be able to get the ideas from it, or at least find it
useful. (Or, maybe you'll find a bug, in which case I'd like to know!
:)

#ifndef Msg_Queue_hpp_
#define Msg_Queue_hpp_

#include "ace/Synch_T.h"
#include "ace/Intrusive_List.h"
#include "ace/Basic_Stats.h"

// Simple messsage queue with a high-water-mark. If it
// fills to this level, the producer blocks until the
// queue is emptied to the low-water-mark. NOTE:
// high water mark is much higher than lower mark,
// so when the queue fills up, it must become much more
// empty before we unblock the producer. Otherwise they
// may thrash, blocking and unblocking too often.
//
// NOTE: hwm is measuring number of messages, not size in bytes.
//
//
// A message queue of T objects, stored by pointer, which must be
// compatible with the ACE_Intrusive_List. Most simply, if T inherits
// from ACE_Intrusive_List_Node, it's compatible.
//
template <class T>
class Msg_Queue
{
public:
Msg_Queue(size_t hwm = 1000000, size_t lwm = 10000);
~Msg_Queue();

void push(T * node);
T * pop();
size_t msg_queue_length() const;

private:
void operator=(Msg_Queue const &); // not implemented
Msg_Queue(Msg_Queue const &); // not implemented

typedef ACE_Guard<ACE_Thread_Mutex> Guard;

private:
ACE_Intrusive_List<T> messages_;
size_t msg_queue_length_;
size_t high_water_mark_;
size_t low_water_mark_;
size_t producers_blocked_;
size_t consumers_blocked_;
ACE_Thread_Mutex lock_;
ACE_Condition<ACE_Thread_Mutex> msg_exists_cv_;
ACE_Condition<ACE_Thread_Mutex> empty_enough_cv_;
};


#include "Msg_Queue.ipp"
#endif // Msg_Queue_hpp_

////////////

#ifndef Msg_Queue_ipp_
#define Msg_Queue_ipp_


#define TEMPLATE_ARGS template <class T>
#define MSG_QUEUE Msg_Queue<T>


TEMPLATE_ARGS
MSG_QUEUE::
Msg_Queue(size_t hwm, size_t lwm)
: messages_()
, msg_queue_length_(0)
, high_water_mark_(hwm)
, low_water_mark_(lwm)
, producers_blocked_(0)
, consumers_blocked_(0)
, lock_()
, msg_exists_cv_(lock_)
, empty_enough_cv_(lock_)
{
}


TEMPLATE_ARGS
MSG_QUEUE::
~Msg_Queue()
{
}


TEMPLATE_ARGS
size_t MSG_QUEUE::
msg_queue_length() const
{
Guard guard(lock_);
return msg_queue_length_;
}


TEMPLATE_ARGS
void MSG_QUEUE::
push(T * node)
{
Guard guard(lock_);

if (msg_queue_length_ > high_water_mark_)
{
++producers_blocked_;
while (msg_queue_length_ >= low_water_mark_)
{
empty_enough_cv_.wait();
}
--producers_blocked_;
}

messages_.push_front(node);
++msg_queue_length_;
if (consumers_blocked_ > 0)
{
msg_exists_cv_.signal();
}
}


TEMPLATE_ARGS
T * MSG_QUEUE::
pop()
{
Guard guard(lock_);
if (messages_.empty())
{
++consumers_blocked_;
do
{
msg_exists_cv_.wait();
} while (messages_.empty());
--consumers_blocked_;
}

--msg_queue_length_;
if ( (producers_blocked_ > 0)
&& (msg_queue_length_ <= low_water_mark_))
{
empty_enough_cv_.signal();
}
return messages_.pop_back();
}

#undef MSG_QUEUE
#undef TEMPLATE_ARGS

#endif // Msg_Queue_ipp_

I've had great success with the above code, where the messages
themselves implement the Command pattern.


--
Chris (TeamB);

0 new messages