OpenRTM
1.0.0
|
00001 // -*- C++ -*- 00020 #ifndef RTC_PUBLISHERPERIODIC_H 00021 #define RTC_PUBLISHERPERIODIC_H 00022 00023 #include <coil/Task.h> 00024 #include <coil/Mutex.h> 00025 #include <coil/Condition.h> 00026 #include <coil/PeriodicTask.h> 00027 00028 #include <rtm/RTC.h> 00029 #include <rtm/PublisherBase.h> 00030 #include <rtm/CdrBufferBase.h> 00031 #include <rtm/SystemLogger.h> 00032 #include <rtm/ConnectorBase.h> 00033 #include <rtm/ConnectorListener.h> 00034 00035 namespace coil 00036 { 00037 class Properties; 00038 }; 00039 00040 namespace RTC 00041 { 00042 class InPortConsumer; 00064 class PublisherPeriodic 00065 : public PublisherBase 00066 { 00067 public: 00068 typedef coil::Mutex Mutex; 00069 typedef coil::Condition<Mutex> Condition; 00070 typedef coil::Guard<coil::Mutex> Guard; 00071 DATAPORTSTATUS_ENUM 00072 00082 PublisherPeriodic(void); 00083 00097 virtual ~PublisherPeriodic(void); 00098 00160 virtual ReturnCode init(coil::Properties& prop); 00161 00187 virtual ReturnCode setConsumer(InPortConsumer* consumer); 00188 00214 virtual ReturnCode setBuffer(CdrBufferBase* buffer); 00215 00249 virtual ReturnCode setListener(ConnectorInfo& info, 00250 ConnectorListeners* listeners); 00326 virtual ReturnCode write(const cdrMemoryStream& data, 00327 unsigned long sec, 00328 unsigned long usec); 00356 virtual bool isActive(); 00357 00383 virtual ReturnCode activate(); 00384 00410 virtual ReturnCode deactivate(); 00411 00425 virtual int svc(void); 00426 00427 protected: 00428 enum Policy 00429 { 00430 ALL, 00431 FIFO, 00432 SKIP, 00433 NEW 00434 }; 00435 00443 void setPushPolicy(const coil::Properties& prop); 00444 00452 bool createTask(const coil::Properties& prop); 00453 00457 ReturnCode pushAll(); 00458 00462 ReturnCode pushFifo(); 00463 00467 ReturnCode pushSkip(); 00468 00472 ReturnCode pushNew(); 00473 00529 ReturnCode convertReturn(BufferStatus::Enum status, 00530 const cdrMemoryStream& data); 00531 00532 00550 ReturnCode invokeListener(DataPortStatus::Enum status, 00551 const cdrMemoryStream& data); 00552 00562 inline void onBufferWrite(const cdrMemoryStream& data) 00563 { 00564 m_listeners-> 00565 connectorData_[ON_BUFFER_WRITE].notify(m_profile, data); 00566 } 00567 00577 inline void onBufferFull(const cdrMemoryStream& data) 00578 { 00579 m_listeners-> 00580 connectorData_[ON_BUFFER_FULL].notify(m_profile, data); 00581 } 00582 00592 inline void onBufferWriteTimeout(const cdrMemoryStream& data) 00593 { 00594 m_listeners-> 00595 connectorData_[ON_BUFFER_WRITE_TIMEOUT].notify(m_profile, data); 00596 } 00597 00607 inline void onBufferRead(const cdrMemoryStream& data) 00608 { 00609 m_listeners-> 00610 connectorData_[ON_BUFFER_READ].notify(m_profile, data); 00611 } 00612 00622 inline void onSend(const cdrMemoryStream& data) 00623 { 00624 m_listeners-> 00625 connectorData_[ON_SEND].notify(m_profile, data); 00626 } 00627 00637 inline void onReceived(const cdrMemoryStream& data) 00638 { 00639 m_listeners-> 00640 connectorData_[ON_RECEIVED].notify(m_profile, data); 00641 } 00642 00652 inline void onReceiverFull(const cdrMemoryStream& data) 00653 { 00654 m_listeners-> 00655 connectorData_[ON_RECEIVER_FULL].notify(m_profile, data); 00656 } 00657 00667 inline void onReceiverTimeout(const cdrMemoryStream& data) 00668 { 00669 m_listeners-> 00670 connectorData_[ON_RECEIVER_TIMEOUT].notify(m_profile, data); 00671 } 00672 00682 inline void onReceiverError(const cdrMemoryStream& data) 00683 { 00684 m_listeners-> 00685 connectorData_[ON_RECEIVER_ERROR].notify(m_profile, data); 00686 } 00687 00695 inline void onBufferEmpty() 00696 { 00697 m_listeners-> 00698 connector_[ON_BUFFER_EMPTY].notify(m_profile); 00699 } 00700 00708 inline void onSenderEmpty() 00709 { 00710 m_listeners-> 00711 connector_[ON_SENDER_EMPTY].notify(m_profile); 00712 } 00713 00721 inline void onSenderError() 00722 { 00723 m_listeners-> 00724 connector_[ON_SENDER_ERROR].notify(m_profile); 00725 } 00726 00727 00728 private: 00729 bool bufferIsEmpty() 00730 { 00731 if (m_buffer->empty() && !m_readback) 00732 { 00733 RTC_DEBUG(("buffer empty")); 00734 onBufferEmpty(); 00735 onSenderEmpty(); 00736 return true; 00737 } 00738 return false; 00739 } 00740 00741 Logger rtclog; 00742 InPortConsumer* m_consumer; 00743 CdrBufferBase* m_buffer; 00744 ConnectorInfo m_profile; 00745 coil::PeriodicTaskBase* m_task; 00746 ConnectorListeners* m_listeners; 00747 ReturnCode m_retcode; 00748 Mutex m_retmutex; 00749 Policy m_pushPolicy; 00750 int m_skipn; 00751 bool m_active; 00752 bool m_readback; 00753 int m_leftskip; 00754 }; 00755 }; // namespace RTC 00756 00757 extern "C" 00758 { 00759 void DLL_EXPORT PublisherPeriodicInit(); 00760 }; 00761 00762 #endif // RTC_PUBLISHERPERIODIC_H 00763