diff options
author | Vincent Ambo <tazjin@gmail.com> | 2017-10-15T23·08+0200 |
---|---|---|
committer | Vincent Ambo <tazjin@gmail.com> | 2017-10-15T23·08+0200 |
commit | 7dc6144e3fe53d611a8a83ad78fed1dc37c785e6 (patch) | |
tree | d451d4a76f60141393fcf6a6551880c1b52204e4 /src/lib.rs | |
parent | 1f1a74108e74a50c39eeac37bb3a91cb49c1d35d (diff) |
feat: Implement high-level POSIX message queue API
Implements a high-level API on top of POSIX message queues (mq_overview(7)). This API can be used to perform local RPC between processes that need to exchange messages *fast* (or *easy*) with priority ordering. The methods are mostly documented but there are still two corner cases that need to be looked at and a lot of tests missing.
Diffstat (limited to 'src/lib.rs')
-rw-r--r-- | src/lib.rs | 261 |
1 files changed, 257 insertions, 4 deletions
diff --git a/src/lib.rs b/src/lib.rs index 31e1bb209f98..d84d84526995 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,260 @@ +extern crate nix; +extern crate libc; + +use nix::mqueue; +use nix::sys::stat; +use std::ffi::CString; +use libc::mqd_t; +use error::Error; +use std::string::ToString; +use std::fs::File; +use std::io::Read; + +mod error; + #[cfg(test)] -mod tests { - #[test] - fn it_works() { - assert_eq!(2 + 2, 4); +mod tests; + +/* +TODO: + +* what happens if permissions change after FD was opened? +* drop dependency on nix crate? + +*/ + +/// Wrapper type for queue names that performs basic validation of queue names before calling +/// out to C code. +pub struct Name(CString); + +impl Name { + pub fn new<S: ToString>(s: S) -> Result<Self, Error> { + let string = s.to_string(); + + if !string.starts_with('/') { + return Err(Error::InvalidQueueName("Queue name must not start with '/'")) + } + + // The C library has a special error return for this case, so I assume people must actually + // have tried just using '/' as a queue name. + if string.len() == 1 { + return Err(Error::InvalidQueueName( + "Queue name must be a slash followed by one or more characters" + )) + } + + if string.len() > 255 { + return Err(Error::InvalidQueueName("Queue name must not exceed 255 characters")) + } + + if string.matches('/').count() > 1 { + return Err(Error::InvalidQueueName("Queue name can not contain more than one slash")) + } + + // TODO: What error is being thrown away here? Is it possible? + Ok(Name(CString::new(string).unwrap())) } } + +#[derive(Debug)] +pub struct Message { + pub data: Vec<u8>, + pub priority: u32, +} + +/// Represents an open queue descriptor to a POSIX message queue. This carries information +/// about the queue's limitations (i.e. maximum message size and maximum message count). +pub struct Queue { + name: Name, + + /// Internal file/queue descriptor. + queue_descriptor: mqd_t, + + /// Maximum number of pending messages in this queue. + max_pending: i64, + + /// Maximum size of this queue. + max_size: usize, +} + +impl Queue { + /// Creates a new queue and fails if it already exists. + /// By default the queue will be read/writable by the current user with no access for other + /// users. + /// Linux users can change this setting themselves by modifying the queue file in /dev/mqueue. + pub fn create(name: Name, max_pending: i64, max_size: i64) -> Result<Queue, Error> { + if max_pending > read_i64_from_file(MSG_MAX)? { + return Err(Error::MaximumMessageCountExceeded()) + } + + if max_size > read_i64_from_file(MSGSIZE_MAX)? { + return Err(Error::MaximumMessageSizeExceeded()) + } + + let oflags = { + let mut flags = mqueue::MQ_OFlag::empty(); + // Put queue in r/w mode + flags.toggle(mqueue::O_RDWR); + // Enable queue creation + flags.toggle(mqueue::O_CREAT); + // Fail if queue exists already + flags.toggle(mqueue::O_EXCL); + flags + }; + + let attr = mqueue::MqAttr::new( + 0, max_pending, max_size, 0 + ); + + let queue_descriptor = mqueue::mq_open( + &name.0, + oflags, + default_mode(), + Some(&attr), + )?; + + Ok(Queue { + name, + queue_descriptor, + max_pending, + max_size: max_size as usize, + }) + } + + /// Opens an existing queue. + pub fn open(name: Name) -> Result<Queue, Error> { + // No extra flags need to be constructed as the default is to open and fail if the + // queue does not exist yet - which is what we want here. + let oflags = mqueue::O_RDWR; + let queue_descriptor = mqueue::mq_open( + &name.0, + oflags, + default_mode(), + None, + )?; + + let attr = mq_getattr(queue_descriptor)?; + + Ok(Queue{ + name, + queue_descriptor, + max_pending: attr.mq_maxmsg, + max_size: attr.mq_msgsize as usize, + }) + } + + /// Opens an existing queue or creates a new queue with the OS default settings. + pub fn open_or_create(name: Name) -> Result<Queue, Error> { + let oflags = { + let mut flags = mqueue::MQ_OFlag::empty(); + // Put queue in r/w mode + flags.toggle(mqueue::O_RDWR); + // Enable queue creation + flags.toggle(mqueue::O_CREAT); + flags + }; + + let default_pending = read_i64_from_file(MSG_DEFAULT)?; + let default_size = read_i64_from_file(MSGSIZE_DEFAULT)?; + let attr = mqueue::MqAttr::new( + 0, default_pending, default_size, 0 + ); + + let queue_descriptor = mqueue::mq_open( + &name.0, + oflags, + default_mode(), + Some(&attr), + )?; + + let actual_attr = mq_getattr(queue_descriptor)?; + + Ok(Queue { + name, + queue_descriptor, + max_pending: actual_attr.mq_maxmsg, + max_size: actual_attr.mq_msgsize as usize, + }) + } + + /// Delete a message queue from the system. This method will make the queue unavailable for + /// other processes, too! + pub fn delete(self) -> Result<(), Error> { + mqueue::mq_unlink(&self.name.0)?; + Ok(()) + } + + pub fn send(self, msg: Message) -> Result<(), Error> { + if msg.data.len() > self.max_size as usize { + return Err(Error::MessageSizeExceeded()) + } + + mqueue::mq_send( + self.queue_descriptor.clone(), + msg.data.as_ref(), + msg.priority, + ).map_err(|e| e.into()) + } + + pub fn receive(self) -> Result<Message, Error> { + let mut data: Vec<u8> = vec![0; self.max_size as usize]; + let mut priority: u32 = 0; + + let msg_size = mqueue::mq_receive( + self.queue_descriptor.clone(), + data.as_mut(), + &mut priority, + )?; + + data.truncate(msg_size); + Ok(Message { data, priority }) + } + + pub fn max_pending(&self) -> i64 { + self.max_pending + } + + pub fn max_size(&self) -> usize { + self.max_size + } +} + +// Creates the default queue mode (0600). +fn default_mode() -> stat::Mode { + let mut mode = stat::Mode::empty(); + mode.toggle(stat::S_IRUSR); + mode.toggle(stat::S_IWUSR); + mode +} + +/// This file defines the default number of maximum pending messages in a queue. +const MSG_DEFAULT: &'static str = "/proc/sys/fs/mqueue/msg_default"; + +/// This file defines the system maximum number of pending messages in a queue. +const MSG_MAX: &'static str = "/proc/sys/fs/mqueue/msg_max"; + +/// This file defines the default maximum size of messages in a queue. +const MSGSIZE_DEFAULT: &'static str = "/proc/sys/fs/mqueue/msgsize_default"; + +/// This file defines the system maximum size for messages in a queue. +const MSGSIZE_MAX: &'static str = "/proc/sys/fs/mqueue/msgsize_max"; + +/// This method is used in combination with the above constants to find system limits. +fn read_i64_from_file(name: &str) -> Result<i64, Error> { + let mut file = File::open(name.to_string())?; + let mut content = String::new(); + file.read_to_string(&mut content)?; + Ok(content.parse()?) +} + +/// The mq_getattr implementation in the nix crate hides the maximum message size and count, which +/// is very impractical. +/// To work around it, this method calls the C-function directly. +fn mq_getattr(mqd: mqd_t) -> Result<libc::mq_attr, Error> { + use std::mem; + let mut attr = unsafe { mem::uninitialized::<libc::mq_attr>() }; + let res = unsafe { libc::mq_getattr(mqd, &mut attr) }; + nix::Errno::result(res) + .map(|_| attr) + .map_err(|e| e.into()) +} |