diff options
Diffstat (limited to 'ops/posix_mq.rs/src')
-rw-r--r-- | ops/posix_mq.rs/src/error.rs | 122 | ||||
-rw-r--r-- | ops/posix_mq.rs/src/lib.rs | 247 | ||||
-rw-r--r-- | ops/posix_mq.rs/src/tests.rs | 21 |
3 files changed, 390 insertions, 0 deletions
diff --git a/ops/posix_mq.rs/src/error.rs b/ops/posix_mq.rs/src/error.rs new file mode 100644 index 000000000000..bacd2aeb39e0 --- /dev/null +++ b/ops/posix_mq.rs/src/error.rs @@ -0,0 +1,122 @@ +use nix; +use std::{error, fmt, io, num}; + +/// This module implements a simple error type to match the errors that can be thrown from the C +/// functions as well as some extra errors resulting from internal validations. +/// +/// As this crate exposes an opinionated API to the POSIX queues certain errors have been +/// ignored: +/// +/// * ETIMEDOUT: The low-level timed functions are not exported and this error can not occur. +/// * EAGAIN: Non-blocking queue calls are not supported. +/// * EINVAL: Same reason as ETIMEDOUT +/// * EMSGSIZE: The message size is immutable after queue creation and this crate checks it. +/// * ENAMETOOLONG: This crate performs name validation +/// +/// If an unexpected error is encountered it will be wrapped appropriately and should be reported +/// as a bug on https://b.tvl.fyi + +#[derive(Debug)] +pub enum Error { + // These errors are raised inside of the library + InvalidQueueName(&'static str), + ValueReadingError(io::Error), + MessageSizeExceeded(), + MaximumMessageSizeExceeded(), + MaximumMessageCountExceeded(), + + // These errors match what is described in the man pages (from mq_overview(7) onwards). + PermissionDenied(), + InvalidQueueDescriptor(), + QueueCallInterrupted(), + QueueAlreadyExists(), + QueueNotFound(), + InsufficientMemory(), + InsufficientSpace(), + + // These two are (hopefully) unlikely in modern systems + ProcessFileDescriptorLimitReached(), + SystemFileDescriptorLimitReached(), + + // If an unhandled / unknown / unexpected error occurs this error will be used. + // In those cases bug reports would be welcome! + UnknownForeignError(nix::errno::Errno), + + // Some other unexpected / unknown error occured. This is probably an error from + // the nix crate. Bug reports also welcome for this! + UnknownInternalError(), +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + use Error::*; + f.write_str(match *self { + // This error contains more sensible description strings already + InvalidQueueName(e) => e, + ValueReadingError(_) => "error reading system configuration for message queues", + MessageSizeExceeded() => "message is larger than maximum size for specified queue", + MaximumMessageSizeExceeded() => "specified queue message size exceeds system maximum", + MaximumMessageCountExceeded() => "specified queue message count exceeds system maximum", + PermissionDenied() => "permission to the specified queue was denied", + InvalidQueueDescriptor() => "the internal queue descriptor was invalid", + QueueCallInterrupted() => "queue method interrupted by signal", + QueueAlreadyExists() => "the specified queue already exists", + QueueNotFound() => "the specified queue could not be found", + InsufficientMemory() => "insufficient memory to call queue method", + InsufficientSpace() => "insufficient space to call queue method", + ProcessFileDescriptorLimitReached() => { + "maximum number of process file descriptors reached" + } + SystemFileDescriptorLimitReached() => { + "maximum number of system file descriptors reached" + } + UnknownForeignError(_) => "unknown foreign error occured: please report a bug!", + UnknownInternalError() => "unknown internal error occured: please report a bug!", + }) + } +} + +impl error::Error for Error { + fn source(&self) -> Option<&(dyn error::Error + 'static)> { + match self { + Error::ValueReadingError(e) => Some(e), + Error::UnknownForeignError(e) => Some(e), + _ => None, + } + } +} + +/// This from implementation is used to translate errors from the lower-level +/// C-calls into sensible Rust errors. +impl From<nix::errno::Errno> for Error { + fn from(err: nix::Error) -> Self { + use nix::errno::Errno::*; + match err { + EACCES => Error::PermissionDenied(), + EBADF => Error::InvalidQueueDescriptor(), + EINTR => Error::QueueCallInterrupted(), + EEXIST => Error::QueueAlreadyExists(), + EMFILE => Error::ProcessFileDescriptorLimitReached(), + ENFILE => Error::SystemFileDescriptorLimitReached(), + ENOENT => Error::QueueNotFound(), + ENOMEM => Error::InsufficientMemory(), + ENOSPC => Error::InsufficientSpace(), + _ => Error::UnknownForeignError(err), + } + } +} + +// This implementation is used when reading system queue settings. +impl From<io::Error> for Error { + fn from(e: io::Error) -> Self { + Error::ValueReadingError(e) + } +} + +// This implementation is used when parsing system queue settings. The unknown error is returned +// here because the system is probably seriously broken if those files don't contain numbers. +impl From<num::ParseIntError> for Error { + fn from(_: num::ParseIntError) -> Self { + Error::UnknownInternalError() + } +} diff --git a/ops/posix_mq.rs/src/lib.rs b/ops/posix_mq.rs/src/lib.rs new file mode 100644 index 000000000000..ed35fb03be82 --- /dev/null +++ b/ops/posix_mq.rs/src/lib.rs @@ -0,0 +1,247 @@ +extern crate libc; +extern crate nix; + +use error::Error; +use libc::mqd_t; +use nix::mqueue; +use nix::sys::stat; +use std::ffi::CString; +use std::fs::File; +use std::io::Read; +use std::ops::Drop; +use std::string::ToString; + +pub mod error; + +#[cfg(test)] +mod tests; + +/// Wrapper type for queue names that performs basic validation of queue names before calling +/// out to C code. +#[derive(Debug, Clone, PartialEq)] +pub struct Name(CString); + +impl Name { + pub fn new<S: ToString>(s: S) -> Result<Self, Error> { + let string = s.to_string(); + + if !string.starts_with('/') { + return Err(Error::InvalidQueueName("Queue name must start with '/'")); + } + + // The C library has a special error return for this case, so I assume people must actually + // have tried just using '/' as a queue name. + if string.len() == 1 { + return Err(Error::InvalidQueueName( + "Queue name must be a slash followed by one or more characters", + )); + } + + if string.len() > 255 { + return Err(Error::InvalidQueueName( + "Queue name must not exceed 255 characters", + )); + } + + if string.matches('/').count() > 1 { + return Err(Error::InvalidQueueName( + "Queue name can not contain more than one slash", + )); + } + + // TODO: What error is being thrown away here? Is it possible? + Ok(Name(CString::new(string).unwrap())) + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct Message { + pub data: Vec<u8>, + pub priority: u32, +} + +/// Represents an open queue descriptor to a POSIX message queue. This carries information +/// about the queue's limitations (i.e. maximum message size and maximum message count). +#[derive(Debug)] +pub struct Queue { + name: Name, + + /// Internal file/queue descriptor. + queue_descriptor: mqd_t, + + /// Maximum number of pending messages in this queue. + max_pending: i64, + + /// Maximum size of this queue. + max_size: usize, +} + +impl Queue { + /// Creates a new queue and fails if it already exists. + /// By default the queue will be read/writable by the current user with no access for other + /// users. + /// Linux users can change this setting themselves by modifying the queue file in /dev/mqueue. + pub fn create(name: Name, max_pending: i64, max_size: i64) -> Result<Queue, Error> { + if max_pending > read_i64_from_file(MSG_MAX)? { + return Err(Error::MaximumMessageCountExceeded()); + } + + if max_size > read_i64_from_file(MSGSIZE_MAX)? { + return Err(Error::MaximumMessageSizeExceeded()); + } + + let oflags = { + let mut flags = mqueue::MQ_OFlag::empty(); + // Put queue in r/w mode + flags.toggle(mqueue::MQ_OFlag::O_RDWR); + // Enable queue creation + flags.toggle(mqueue::MQ_OFlag::O_CREAT); + // Fail if queue exists already + flags.toggle(mqueue::MQ_OFlag::O_EXCL); + flags + }; + + let attr = mqueue::MqAttr::new(0, max_pending, max_size, 0); + + let queue_descriptor = mqueue::mq_open(&name.0, oflags, default_mode(), Some(&attr))?; + + Ok(Queue { + name, + queue_descriptor, + max_pending, + max_size: max_size as usize, + }) + } + + /// Opens an existing queue. + pub fn open(name: Name) -> Result<Queue, Error> { + // No extra flags need to be constructed as the default is to open and fail if the + // queue does not exist yet - which is what we want here. + let oflags = mqueue::MQ_OFlag::O_RDWR; + let queue_descriptor = mqueue::mq_open(&name.0, oflags, default_mode(), None)?; + + let attr = mq_getattr(queue_descriptor)?; + + Ok(Queue { + name, + queue_descriptor, + max_pending: attr.mq_maxmsg, + max_size: attr.mq_msgsize as usize, + }) + } + + /// Opens an existing queue or creates a new queue with the OS default settings. + pub fn open_or_create(name: Name) -> Result<Queue, Error> { + let oflags = { + let mut flags = mqueue::MQ_OFlag::empty(); + // Put queue in r/w mode + flags.toggle(mqueue::MQ_OFlag::O_RDWR); + // Enable queue creation + flags.toggle(mqueue::MQ_OFlag::O_CREAT); + flags + }; + + let default_pending = read_i64_from_file(MSG_DEFAULT)?; + let default_size = read_i64_from_file(MSGSIZE_DEFAULT)?; + let attr = mqueue::MqAttr::new(0, default_pending, default_size, 0); + + let queue_descriptor = mqueue::mq_open(&name.0, oflags, default_mode(), Some(&attr))?; + + let actual_attr = mq_getattr(queue_descriptor)?; + + Ok(Queue { + name, + queue_descriptor, + max_pending: actual_attr.mq_maxmsg, + max_size: actual_attr.mq_msgsize as usize, + }) + } + + /// Delete a message queue from the system. This method will make the queue unavailable for + /// other processes after their current queue descriptors have been closed. + pub fn delete(self) -> Result<(), Error> { + mqueue::mq_unlink(&self.name.0)?; + drop(self); + Ok(()) + } + + /// Send a message to the message queue. + /// If the queue is full this call will block until a message has been consumed. + pub fn send(&self, msg: &Message) -> Result<(), Error> { + if msg.data.len() > self.max_size as usize { + return Err(Error::MessageSizeExceeded()); + } + + mqueue::mq_send(self.queue_descriptor, msg.data.as_ref(), msg.priority) + .map_err(|e| e.into()) + } + + /// Receive a message from the message queue. + /// If the queue is empty this call will block until a message arrives. + pub fn receive(&self) -> Result<Message, Error> { + let mut data: Vec<u8> = vec![0; self.max_size as usize]; + let mut priority: u32 = 0; + + let msg_size = mqueue::mq_receive(self.queue_descriptor, data.as_mut(), &mut priority)?; + + data.truncate(msg_size); + Ok(Message { data, priority }) + } + + pub fn max_pending(&self) -> i64 { + self.max_pending + } + + pub fn max_size(&self) -> usize { + self.max_size + } +} + +impl Drop for Queue { + fn drop(&mut self) { + // Attempt to close the queue descriptor and discard any possible errors. + // The only error thrown in the C-code is EINVAL, which would mean that the + // descriptor has already been closed. + mqueue::mq_close(self.queue_descriptor).ok(); + } +} + +// Creates the default queue mode (0600). +fn default_mode() -> stat::Mode { + let mut mode = stat::Mode::empty(); + mode.toggle(stat::Mode::S_IRUSR); + mode.toggle(stat::Mode::S_IWUSR); + mode +} + +/// This file defines the default number of maximum pending messages in a queue. +const MSG_DEFAULT: &'static str = "/proc/sys/fs/mqueue/msg_default"; + +/// This file defines the system maximum number of pending messages in a queue. +const MSG_MAX: &'static str = "/proc/sys/fs/mqueue/msg_max"; + +/// This file defines the default maximum size of messages in a queue. +const MSGSIZE_DEFAULT: &'static str = "/proc/sys/fs/mqueue/msgsize_default"; + +/// This file defines the system maximum size for messages in a queue. +const MSGSIZE_MAX: &'static str = "/proc/sys/fs/mqueue/msgsize_max"; + +/// This method is used in combination with the above constants to find system limits. +fn read_i64_from_file(name: &str) -> Result<i64, Error> { + let mut file = File::open(name.to_string())?; + let mut content = String::new(); + file.read_to_string(&mut content)?; + Ok(content.trim().parse()?) +} + +/// The mq_getattr implementation in the nix crate hides the maximum message size and count, which +/// is very impractical. +/// To work around it, this method calls the C-function directly. +fn mq_getattr(mqd: mqd_t) -> Result<libc::mq_attr, Error> { + use std::mem; + let mut attr = mem::MaybeUninit::<libc::mq_attr>::uninit(); + let res = unsafe { libc::mq_getattr(mqd, attr.as_mut_ptr()) }; + nix::errno::Errno::result(res) + .map(|_| unsafe { attr.assume_init() }) + .map_err(|e| e.into()) +} diff --git a/ops/posix_mq.rs/src/tests.rs b/ops/posix_mq.rs/src/tests.rs new file mode 100644 index 000000000000..1f4ea9a58da6 --- /dev/null +++ b/ops/posix_mq.rs/src/tests.rs @@ -0,0 +1,21 @@ +use super::*; + +#[test] +fn test_open_delete() { + // Simple test with default queue settings + let name = Name::new("/test-queue").unwrap(); + let queue = Queue::open_or_create(name).expect("Opening queue failed"); + + let message = Message { + data: "test-message".as_bytes().to_vec(), + priority: 0, + }; + + queue.send(&message).expect("message sending failed"); + + let result = queue.receive().expect("message receiving failed"); + + assert_eq!(message, result); + + queue.delete().expect("deleting queue failed"); +} |