// // mutexlock.h -- mutex and semaphore wrapper. // // Developed for QNX, but may work on other Posix-compliant systems. // // Maintains much of the interface of the GNU Common C++ library, // but is all-inline and removes all the GNU macro dreck. // // License: LGPL // // John Nagle // Animats // January 2003 // #ifndef MUTEXLOCK_H #define MUTEXLOCK_H #include #include #include #include #include #include "errnoexception.h" namespace ost { using namespace std; // // class Mutex -- classic mutex // // This just encapsulates Posix mutexes. // // These mutexes are recursive; you can lock the same // object twice from the same thread. But these mutexes // do not have P and V type counters; see "Semaphore" // below for that. // Based on interface defined by David Sugar // // constants // const pthread_mutex_t initialmutexstate = PTHREAD_RMUTEX_INITIALIZER; // initial state of mutex // class Mutex { private: pthread_mutex_t m_mutex; // Posix mutex object private: void error_check(int stat) // report Posix error if any { if (stat == EOK) return; // if OK, done throw(errno_error(stat)); // otherwise fails and throws input error number } Mutex& operator=(Mutex& m) { throw(errno_error(EINVAL)); } // private/unreachable - unassignable Mutex(Mutex& m) { throw(errno_error(EINVAL)); } // private/unreachable - uncopyable public: Mutex() // constructor (makes recursive mutex) { pthread_mutexattr_t attr; // construct attributes of mutex error_check(pthread_mutexattr_init(&attr)); // initialize to default attributes error_check(pthread_mutexattr_setrecursive(&attr, PTHREAD_RECURSIVE_ENABLE)); // make it a recursive mutex error_check(pthread_mutex_init(&m_mutex,&attr)); // initialze the mutex error_check(pthread_mutexattr_destroy(&attr)); // release the attribute object } virtual ~Mutex() // destructor (throws if any threads waiting) { error_check(pthread_mutex_destroy(&m_mutex)); } void enterMutex(void) // locks the mutex for the current thread { error_check(pthread_mutex_lock(&m_mutex)); } bool tryEnterMutex(void) // like EnterMutex, but returns false if already locked. { int stat = pthread_mutex_trylock(&m_mutex); // try to lock if (stat == EOK) return(true); // got lock if (stat == EBUSY) return(false); // busy throw(errno_error(stat)); // error, throw } void leaveMutex(void) // obvious unlock function { error_check(pthread_mutex_unlock(&m_mutex)); } // unlock }; // // class MutexLock -- scope-based lock for mutex // // Locks when constructed, unlocks when destructed. // Exception-safe. // // This should be used whenever possible, in preference // to locking and unlocking mutexes directly. // // Usage: // { MutexLock lock(mutex); // ... // locked region // } // unlocks at scope exit. // class MutexLock { private: Mutex& m_mutex; // reference to associated mutex public: // Constructor - locks MutexLock( Mutex& mutex ) : m_mutex( mutex ) { m_mutex.enterMutex(); } // Destructor - unlocks ~MutexLock() // non-virtual destructor - do not subclass { m_mutex.leaveMutex(); } }; // // class Semaphore -- Djkystra-type P and V primitives. // // These allow usage counts > 1, unlike mutexes. // // The initial count can be specified, and is zero by default. // Each "wait" decrements the count, blocking until the count is > 0. // Each "post" increments the count. "Post" never blocks. // // Semaphores created with "shared=true" can be used in // shared memory. // // "wait" can be given a timeout value, in seconds, as a floating point number. // class Semaphore { private: sem_t m_semaphore; // the semaphore private: virtual void error_check(int stat) // report Posix error if any { if (stat == EOK) return; // if OK, done throw(errno_error(stat)); // otherwise fails and throws errno } private: Semaphore(Semaphore& dummy) {} // make class uncopyable Semaphore& operator=(Semaphore& dummy) { return(*this);} // make class uncopyable public: // Constructor. Semaphore(size_t resource = 0, bool shared=false) { error_check(sem_init(&m_semaphore,shared,resource)); } // Destructor virtual ~Semaphore() { sem_destroy(&m_semaphore); } // wait -- wait for counter > 0 void wait() { error_check(sem_wait(&m_semaphore)); } // trywait -- lock if counter > 0, return false otherwise. bool trywait() { int stat = sem_trywait(&m_semaphore); // try to lock return(stat == 0); // success if stat==0 } // timedwaitns -- wait no longer than specified number of nanoseconds bool timedwaitns(uint64_t ns) { struct timespec tm; // don't wait past this time clock_gettime(CLOCK_REALTIME, &tm); // get time now uint64_t timens(timespec2nsec(&tm)); // convert now to nanoseconds timens += ns; // compute end of wait nsec2timespec(&tm,timens); // convert back to time structure int stat = sem_timedwait(&m_semaphore,&tm);// wait, no longer than timelimit if (stat == 0) return(true); // normal case - locked // We have to check errno to find out whether this is simply a timout // or a hard error. Under QNX, errno is thread-local, so this is safe. if (errno == ETIMEDOUT) return(false); // timeout case - not locked throw(errno_error(errno)); // error case - throw } // wait -- with timeout argument (seconds) bool wait(double secs) { return(timedwaitns(uint64_t(secs*1.0e9))); } // wait no more than secs // post -- increment counter, unblocking anyone waiting. void post() { error_check(sem_post(&m_semaphore)); } }; // // atomic_incmod_value -- add to value, modular, as atomic operation // // This is subtle. // inline unsigned atomic_incmod_value(volatile unsigned* loc, unsigned mod) { unsigned oldval = atomic_add_value(loc, 1); // add 1, return value, atomic operation if (oldval >= mod) // if overflow { if (oldval == mod) // if exactly at overflow { atomic_sub(loc,mod); }// must reduce by one cycle oldval %= mod; // must reduce result } return(oldval); } // // Bounded buffer package // // Reminiscent of John Walker's bounded buffer package in FANG, circa 1972. // // This template class implements a fixed-size bounded buffer. // The buffer stores N objects of type T, for implementing // producer-consumer relationships. // "put" inserts, blocking if the buffer is full. // "get" removes, blocking if the buffer is empty. // // Bounded buffers of size 1 work, and are quite useful for // simple event synchronization. // A size of 0, however, is not useful. // template class BoundedBuffer { private: Semaphore m_lock_head; // lock for output end Semaphore m_lock_tail; // lock for input end size_t m_getpos; // next avail for read size_t m_putpos; // next avail for write T m_data[N]; // the data private: size_t next(size_t& pos) // advance to next item, returning current { if (N > 1) // for N=1, no arithmetic is required. { return(atomic_incmod_value(&pos,N)); // next pos } else { // size 1, no computation required return(0); } } public: BoundedBuffer() : // constructor - create an empty object m_lock_head(0), // no slots are full m_lock_tail(N), // N slots available m_getpos(0), // start at 0 m_putpos(0) // start at 0 {} void get(T& item) // get one from queue - blocks { m_lock_head.wait(); // wait for an item item = m_data[next(m_getpos)]; // item available, get it m_lock_tail.post(); // allow another put } void put(T item) // put item - blocks { m_lock_tail.wait(); // wait for space m_data[next(m_putpos)] = item; // put item in slot m_lock_head.post(); // allow another get } bool tryget(T& item) // get one from queue - non-blocking { bool locked = m_lock_head.trywait(); // wait for an item if (!locked) return(false); // didn't get one item = m_data[next(m_getpos)]; // item available, get it m_lock_tail.post(); // allow another put return(true); // got one } bool tryput(T item) // put item - non-blocking { bool locked = m_lock_tail.trywait(); // wait for space if (!locked) return(false); // didn't get one m_data[next(m_putpos)] = item; // put item in slot m_lock_head.post(); // allow another get return(true); // put one } bool get(T& item, double secs) // get one from queue - with timeout on block { bool locked = m_lock_head.wait(secs); // wait for an item if (!locked) return(false); // didn't get one item = m_data[next(m_getpos)]; // item available, get it m_lock_tail.post(); // allow another put return(true); // got one } bool put(T item, double secs) // put item - with timeout on block { bool locked = m_lock_tail.wait(secs); // wait for space if (!locked) return(false); // didn't get one m_data[next(m_putpos)] = item; // put item in slot m_lock.head.post(); // allow another get return(true); // put one } bool peek(T& item) // look at top of queue, but do not remove item { bool locked = m_lock_head.trywait(); // try to lock if (!locked) return(false); // nothing to return item = m_data[m_getpos]; // item available, get it m_lock.head.post(); // unlock return(true); // peeked successfully } unsigned int size() const // size of buffer { return(N); } // return fixed size }; }; // end namespace #endif // MUTEXLOCK_H