/**
* @fileoverview
* @author Brandon Alexander - baalexander@gmail.com
*/
var EventEmitter2 = require('eventemitter2').EventEmitter2;
var Message = require('./Message');
/**
* Publish and/or subscribe to a topic in ROS.
*
* Emits the following events:
* * 'warning' - if there are any warning during the Topic creation
* * 'message' - the message data from rosbridge
*
* @constructor
* @param options - object with following keys:
* * ros - the ROSLIB.Ros connection handle
* * name - the topic name, like /cmd_vel
* * messageType - the message type, like 'std_msgs/String'
* * compression - the type of compression to use, like 'png'
* * throttle_rate - the rate (in ms in between messages) at which to throttle the topics
* * queue_size - the queue created at bridge side for re-publishing webtopics (defaults to 100)
* * latch - latch the topic when publishing
* * queue_length - the queue length at bridge side used when subscribing (defaults to 0, no queueing).
*/
function Topic(options) {
options = options || {};
this.ros = options.ros;
this.name = options.name;
this.messageType = options.messageType;
this.isAdvertised = false;
this.compression = options.compression || 'none';
this.throttle_rate = options.throttle_rate || 0;
this.latch = options.latch || false;
this.queue_size = options.queue_size || 100;
this.queue_length = options.queue_length || 0;
// Check for valid compression types
if (this.compression && this.compression !== 'png' &&
this.compression !== 'none') {
this.emit('warning', this.compression +
' compression is not supported. No compression will be used.');
}
// Check if throttle rate is negative
if (this.throttle_rate < 0) {
this.emit('warning', this.throttle_rate + ' is not allowed. Set to 0');
this.throttle_rate = 0;
}
var that = this;
this._messageCallback = function(data) {
that.emit('message', new Message(data));
};
}
Topic.prototype.__proto__ = EventEmitter2.prototype;
/**
* Every time a message is published for the given topic, the callback
* will be called with the message object.
*
* @param callback - function with the following params:
* * message - the published message
*/
Topic.prototype.subscribe = function(callback) {
if (typeof callback === 'function') {
this.on('message', callback);
}
if (this.subscribeId) { return; }
this.ros.on(this.name, this._messageCallback);
this.subscribeId = 'subscribe:' + this.name + ':' + (++this.ros.idCounter);
this.ros.callOnConnection({
op: 'subscribe',
id: this.subscribeId,
type: this.messageType,
topic: this.name,
compression: this.compression,
throttle_rate: this.throttle_rate,
queue_length: this.queue_length
});
};
/**
* Unregisters as a subscriber for the topic. Unsubscribing stop remove
* all subscribe callbacks. To remove a call back, you must explicitly
* pass the callback function in.
*
* @param callback - the optional callback to unregister, if
* * provided and other listeners are registered the topic won't
* * unsubscribe, just stop emitting to the passed listener
*/
Topic.prototype.unsubscribe = function(callback) {
if (callback) {
this.off('message', callback);
// If there is any other callbacks still subscribed don't unsubscribe
if (this.listeners('message').length) { return; }
}
if (!this.subscribeId) { return; }
// Note: Don't call this.removeAllListeners, allow client to handle that themselves
this.ros.off(this.name, this._messageCallback);
this.emit('unsubscribe');
this.ros.callOnConnection({
op: 'unsubscribe',
id: this.subscribeId,
topic: this.name
});
this.subscribeId = null;
};
/**
* Registers as a publisher for the topic.
*/
Topic.prototype.advertise = function() {
if (this.isAdvertised) {
return;
}
this.advertiseId = 'advertise:' + this.name + ':' + (++this.ros.idCounter);
this.ros.callOnConnection({
op: 'advertise',
id: this.advertiseId,
type: this.messageType,
topic: this.name,
latch: this.latch,
queue_size: this.queue_size
});
this.isAdvertised = true;
};
/**
* Unregisters as a publisher for the topic.
*/
Topic.prototype.unadvertise = function() {
if (!this.isAdvertised) {
return;
}
this.emit('unadvertise');
this.ros.callOnConnection({
op: 'unadvertise',
id: this.advertiseId,
topic: this.name
});
this.isAdvertised = false;
};
/**
* Publish the message.
*
* @param message - A ROSLIB.Message object.
*/
Topic.prototype.publish = function(message) {
if (!this.isAdvertised) {
this.advertise();
}
this.ros.idCounter++;
var call = {
op: 'publish',
id: 'publish:' + this.name + ':' + this.ros.idCounter,
topic: this.name,
msg: message,
latch: this.latch
};
this.ros.callOnConnection(call);
};
module.exports = Topic;