OpenRTM
1.0.0
|
00001 // -*- C++ -*- 00020 #ifndef RTC_RINGBUFFER_H 00021 #define RTC_RINGBUFFER_H 00022 00023 #include <vector> 00024 #include <algorithm> 00025 #include <iostream> 00026 00027 #include <coil/TimeValue.h> 00028 #include <coil/Mutex.h> 00029 #include <coil/Guard.h> 00030 #include <coil/Condition.h> 00031 #include <coil/stringutil.h> 00032 00033 #include <rtm/BufferBase.h> 00034 #include <rtm/BufferStatus.h> 00035 00036 #define RINGBUFFER_DEFAULT_LENGTH 8 00037 00051 namespace RTC 00052 { 00088 template <class DataType> 00089 class RingBuffer 00090 : public BufferBase<DataType> 00091 { 00092 public: 00093 BUFFERSTATUS_ENUM 00094 typedef coil::Guard<coil::Mutex> Guard; 00118 RingBuffer(long int length = RINGBUFFER_DEFAULT_LENGTH) 00119 : m_overwrite(true), m_readback(true), 00120 m_timedwrite(false), m_timedread(false), 00121 m_wtimeout(1, 0), m_rtimeout(1, 0), 00122 m_length(length), 00123 m_wpos(0), m_rpos(0), m_fillcount(0), m_wcount(0), 00124 m_buffer(m_length) 00125 { 00126 this->reset(); 00127 } 00128 00144 virtual ~RingBuffer(void) 00145 { 00146 } 00147 00187 virtual void init(const coil::Properties& prop) 00188 { 00189 initLength(prop); 00190 initWritePolicy(prop); 00191 initReadPolicy(prop); 00192 } 00193 00214 virtual size_t length(void) const 00215 { 00216 Guard guard(m_posmutex); 00217 return m_length; 00218 } 00219 00242 virtual ReturnCode length(size_t n) 00243 { 00244 m_buffer.resize(n); 00245 m_length = n; 00246 this->reset(); 00247 return ::RTC::BufferStatus::BUFFER_OK; //BUFFER_OK; 00248 } 00249 00272 virtual ReturnCode reset() 00273 { 00274 Guard guard(m_posmutex); 00275 m_fillcount = 0; 00276 m_wcount = 0; 00277 m_wpos = 0; 00278 m_rpos = 0; 00279 return ::RTC::BufferStatus::BUFFER_OK; 00280 } 00281 00282 00283 00284 //---------------------------------------------------------------------- 00305 virtual DataType* wptr(long int n = 0) 00306 { 00307 Guard guard(m_posmutex); 00308 return &m_buffer[(m_wpos + n + m_length) % m_length]; 00309 } 00310 00334 virtual ReturnCode advanceWptr(long int n = 1) 00335 { 00336 // n > 0 : 00337 // n satisfies n <= writable elements 00338 // n <= m_length - m_fillcout 00339 // n < 0 : -n = n' 00340 // n satisfies n'<= readable elements 00341 // n'<= m_fillcount 00342 // n >= - m_fillcount 00343 Guard guard(m_posmutex); 00344 if ((n > 0 && n > static_cast<long int>(m_length - m_fillcount)) || 00345 (n < 0 && n < static_cast<long int>(-m_fillcount))) 00346 { 00347 return ::RTC::BufferStatus::PRECONDITION_NOT_MET; 00348 } 00349 00350 m_wpos = (m_wpos + n + m_length) % m_length; 00351 m_fillcount += n; 00352 m_wcount += n; 00353 return ::RTC::BufferStatus::BUFFER_OK; 00354 } 00382 virtual ReturnCode put(const DataType& value) 00383 { 00384 Guard guard(m_posmutex); 00385 m_buffer[m_wpos] = value; 00386 return ::RTC::BufferStatus::BUFFER_OK; 00387 } 00388 00430 virtual ReturnCode write(const DataType& value, 00431 long int sec = -1, long int nsec = 0) 00432 { 00433 { 00434 Guard guard(m_full.mutex); 00435 00436 if (full()) 00437 { 00438 00439 bool timedwrite(m_timedwrite); 00440 bool overwrite(m_overwrite); 00441 00442 if (!(sec < 0)) // if second arg is set -> block mode 00443 { 00444 timedwrite = true; 00445 overwrite = false; 00446 } 00447 00448 if (overwrite && !timedwrite) // "overwrite" mode 00449 { 00450 advanceRptr(); 00451 } 00452 else if (!overwrite && !timedwrite) // "do_nothing" mode 00453 { 00454 return ::RTC::BufferStatus::BUFFER_FULL; 00455 } 00456 else if (!overwrite && timedwrite) // "block" mode 00457 { 00458 if (sec < 0) 00459 { 00460 sec = m_wtimeout.sec(); 00461 nsec = m_wtimeout.usec() * 1000; 00462 } 00463 // true: signaled, false: timeout 00464 if (!m_full.cond.wait(sec, nsec)) 00465 { 00466 return ::RTC::BufferStatus::TIMEOUT; 00467 } 00468 } 00469 else // unknown condition 00470 { 00471 return ::RTC::BufferStatus::PRECONDITION_NOT_MET; 00472 } 00473 } 00474 } 00475 00476 put(value); 00477 00478 { 00479 Guard eguard(m_empty.mutex); 00480 if (empty()) 00481 { 00482 // Guard eguard(m_empty.mutex); 00483 advanceWptr(1); 00484 m_empty.cond.signal(); 00485 } 00486 else 00487 { 00488 advanceWptr(1); 00489 } 00490 } 00491 return ::RTC::BufferStatus::BUFFER_OK; 00492 } 00493 00515 virtual size_t writable() const 00516 { 00517 Guard guard(m_posmutex); 00518 return m_length - m_fillcount; 00519 } 00520 00540 virtual bool full(void) const 00541 { 00542 Guard guard(m_posmutex); 00543 return m_length == m_fillcount; 00544 } 00545 00546 //---------------------------------------------------------------------- 00567 virtual DataType* rptr(long int n = 0) 00568 { 00569 Guard guard(m_posmutex); 00570 return &(m_buffer[(m_rpos + n + m_length) % m_length]); 00571 } 00572 00594 virtual ReturnCode advanceRptr(long int n = 1) 00595 { 00596 // n > 0 : 00597 // n satisfies n <= readable elements 00598 // n <= m_fillcout 00599 // n < 0 : -n = n' 00600 // n satisfies n'<= m_length - m_fillcount 00601 // n >= m_fillcount - m_length 00602 Guard guard(m_posmutex); 00603 if ((n > 0 && n > static_cast<long int>(m_fillcount)) || 00604 (n < 0 && n < static_cast<long int>(m_fillcount - m_length))) 00605 { 00606 return ::RTC::BufferStatus::PRECONDITION_NOT_MET; 00607 } 00608 00609 m_rpos = (m_rpos + n + m_length) % m_length; 00610 m_fillcount -= n; 00611 return ::RTC::BufferStatus::BUFFER_OK; 00612 } 00613 00638 virtual ReturnCode get(DataType& value) 00639 { 00640 Guard gaurd(m_posmutex); 00641 value = m_buffer[m_rpos]; 00642 return ::RTC::BufferStatus::BUFFER_OK; 00643 } 00644 00645 00663 virtual DataType& get() 00664 { 00665 Guard gaurd(m_posmutex); 00666 return m_buffer[m_rpos]; 00667 } 00668 00669 00711 virtual ReturnCode read(DataType& value, 00712 long int sec = -1, long int nsec = 0) 00713 { 00714 { 00715 Guard gaurd(m_empty.mutex); 00716 00717 if (empty()) 00718 { 00719 bool timedread(m_timedread); 00720 bool readback(m_readback); 00721 00722 if (!(sec < 0)) // if second arg is set -> block mode 00723 { 00724 timedread = true; 00725 readback = false; 00726 sec = m_rtimeout.sec(); 00727 nsec = m_rtimeout.usec() * 1000; 00728 } 00729 00730 if (readback && !timedread) // "readback" mode 00731 { 00732 if (!(m_wcount > 0)) 00733 { 00734 return ::RTC::BufferStatus::BUFFER_EMPTY; 00735 } 00736 advanceRptr(-1); 00737 } 00738 else if (!readback && !timedread) // "do_nothing" mode 00739 { 00740 return ::RTC::BufferStatus::BUFFER_EMPTY; 00741 } 00742 else if (!readback && timedread) // "block" mode 00743 { 00744 if (sec < 0) 00745 { 00746 sec = m_rtimeout.sec(); 00747 nsec = m_rtimeout.usec() * 1000; 00748 } 00749 // true: signaled, false: timeout 00750 if (!m_empty.cond.wait(sec, nsec)) 00751 { 00752 return ::RTC::BufferStatus::TIMEOUT; 00753 } 00754 } 00755 else // unknown condition 00756 { 00757 return ::RTC::BufferStatus::PRECONDITION_NOT_MET; 00758 } 00759 } 00760 } 00761 00762 get(value); 00763 00764 { 00765 Guard fguard(m_full.mutex); 00766 if (full()) 00767 { 00768 // Guard fguard(m_full.mutex); 00769 advanceRptr(1); 00770 m_full.cond.signal(); 00771 } 00772 else 00773 { 00774 advanceRptr(1); 00775 } 00776 } 00777 return ::RTC::BufferStatus::BUFFER_OK; 00778 } 00779 00804 virtual size_t readable() const 00805 { 00806 Guard guard(m_posmutex); 00807 return m_fillcount; 00808 } 00809 00829 virtual bool empty(void) const 00830 { 00831 Guard guard(m_posmutex); 00832 return m_fillcount == 0; 00833 } 00834 00835 private: 00836 inline void initLength(const coil::Properties& prop) 00837 { 00838 if (!prop["length"].empty()) 00839 { 00840 size_t n; 00841 if (coil::stringTo(n, prop["length"].c_str())) 00842 { 00843 if (n > 0) 00844 { 00845 this->length(n); 00846 } 00847 } 00848 } 00849 } 00850 00851 inline void initWritePolicy(const coil::Properties& prop) 00852 { 00853 std::string policy(prop["write.full_policy"]); 00854 coil::normalize(policy); 00855 if (policy == "overwrite") 00856 { 00857 m_overwrite = true; 00858 m_timedwrite = false; 00859 } 00860 else if (policy == "do_nothing") 00861 { 00862 m_overwrite = false; 00863 m_timedwrite = false; 00864 } 00865 else if (policy == "block") 00866 { 00867 m_overwrite = false; 00868 m_timedwrite = true; 00869 00870 double tm; 00871 if (coil::stringTo(tm, prop["write.timeout"].c_str())) 00872 { 00873 if (!(tm < 0)) 00874 { 00875 m_wtimeout = tm; 00876 } 00877 } 00878 } 00879 } 00880 00881 inline void initReadPolicy(const coil::Properties& prop) 00882 { 00883 std::string policy(prop["read.empty_policy"]); 00884 if (policy == "readback") 00885 { 00886 m_readback = true; 00887 m_timedread = false; 00888 } 00889 else if (policy == "do_nothing") 00890 { 00891 m_readback = false; 00892 m_timedread = false; 00893 } 00894 else if (policy == "block") 00895 { 00896 m_readback = false; 00897 m_timedread = true; 00898 double tm; 00899 if (coil::stringTo(tm, prop["read.timeout"].c_str())) 00900 { 00901 m_rtimeout = tm; 00902 } 00903 } 00904 } 00905 00906 private: 00914 bool m_overwrite; 00915 00923 bool m_readback; 00924 00932 bool m_timedwrite; 00940 bool m_timedread; 00941 00949 coil::TimeValue m_wtimeout; 00950 00958 coil::TimeValue m_rtimeout; 00959 00967 size_t m_length; 00968 00976 size_t m_wpos; 00977 00985 size_t m_rpos; 00986 00994 size_t m_fillcount; 00995 01003 size_t m_wcount; 01004 01012 std::vector<DataType> m_buffer; 01013 01021 struct condition 01022 { 01023 condition() : cond(mutex) {} 01024 coil::Condition<coil::Mutex> cond; 01025 coil::Mutex mutex; 01026 }; 01027 01035 mutable coil::Mutex m_posmutex; 01036 01044 condition m_empty; 01045 01053 condition m_full; 01054 }; 01055 }; // namespace RTC 01056 01057 #endif // RTC_RINGBUFFER_H