about summary refs log tree commit diff
path: root/ops/posix_mq.rs/src
diff options
context:
space:
mode:
Diffstat (limited to 'ops/posix_mq.rs/src')
-rw-r--r--ops/posix_mq.rs/src/error.rs130
-rw-r--r--ops/posix_mq.rs/src/lib.rs269
-rw-r--r--ops/posix_mq.rs/src/tests.rs22
3 files changed, 421 insertions, 0 deletions
diff --git a/ops/posix_mq.rs/src/error.rs b/ops/posix_mq.rs/src/error.rs
new file mode 100644
index 000000000000..1ef585c01efb
--- /dev/null
+++ b/ops/posix_mq.rs/src/error.rs
@@ -0,0 +1,130 @@
+use nix;
+use std::error;
+use std::fmt;
+use std::io;
+use std::num;
+
+/// This module implements a simple error type to match the errors that can be thrown from the C
+/// functions as well as some extra errors resulting from internal validations.
+///
+/// As this crate exposes an opinionated API to the POSIX queues certain errors have been
+/// ignored:
+///
+/// * ETIMEDOUT: The low-level timed functions are not exported and this error can not occur.
+/// * EAGAIN: Non-blocking queue calls are not supported.
+/// * EINVAL: Same reason as ETIMEDOUT
+/// * EMSGSIZE: The message size is immutable after queue creation and this crate checks it.
+/// * ENAMETOOLONG: This crate performs name validation
+///
+/// If an unexpected error is encountered it will be wrapped appropriately and should be reported
+/// as a bug on https://github.com/aprilabank/posix_mq.rs
+
+#[derive(Debug)]
+pub enum Error {
+    // These errors are raised inside of the library
+    InvalidQueueName(&'static str),
+    ValueReadingError(io::Error),
+    MessageSizeExceeded(),
+    MaximumMessageSizeExceeded(),
+    MaximumMessageCountExceeded(),
+
+    // These errors match what is described in the man pages (from mq_overview(7) onwards).
+    PermissionDenied(),
+    InvalidQueueDescriptor(),
+    QueueCallInterrupted(),
+    QueueAlreadyExists(),
+    QueueNotFound(),
+    InsufficientMemory(),
+    InsufficientSpace(),
+
+    // These two are (hopefully) unlikely in modern systems
+    ProcessFileDescriptorLimitReached(),
+    SystemFileDescriptorLimitReached(),
+
+    // If an unhandled / unknown / unexpected error occurs this error will be used.
+    // In those cases bug reports would be welcome!
+    UnknownForeignError(nix::errno::Errno),
+
+    // Some other unexpected / unknown error occured. This is probably an error from
+    // the nix crate. Bug reports also welcome for this!
+    UnknownInternalError(Option<nix::Error>),
+}
+
+impl error::Error for Error {
+    fn description(&self) -> &str {
+        use Error::*;
+        match *self {
+            // This error contains more sensible description strings already
+            InvalidQueueName(e) => e,
+            ValueReadingError(_) => "error reading system configuration for message queues",
+            MessageSizeExceeded() => "message is larger than maximum size for specified queue",
+            MaximumMessageSizeExceeded() => "specified queue message size exceeds system maximum",
+            MaximumMessageCountExceeded() => "specified queue message count exceeds system maximum",
+            PermissionDenied() => "permission to the specified queue was denied",
+            InvalidQueueDescriptor() => "the internal queue descriptor was invalid",
+            QueueCallInterrupted() => "queue method interrupted by signal",
+            QueueAlreadyExists() => "the specified queue already exists",
+            QueueNotFound() => "the specified queue could not be found",
+            InsufficientMemory() => "insufficient memory to call queue method",
+            InsufficientSpace() => "insufficient space to call queue method",
+            ProcessFileDescriptorLimitReached() =>
+                "maximum number of process file descriptors reached",
+            SystemFileDescriptorLimitReached() =>
+                "maximum number of system file descriptors reached",
+            UnknownForeignError(_) => "unknown foreign error occured: please report a bug!",
+            UnknownInternalError(_) => "unknown internal error occured: please report a bug!",
+        }
+    }
+}
+
+impl fmt::Display for Error {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        // Explicitly import this to gain access to Error::description()
+        use std::error::Error;
+        f.write_str(self.description())
+    }
+}
+
+/// This from implementation is used to translate errors from the lower-level
+/// C-calls into sensible Rust errors.
+impl From<nix::Error> for Error {
+    fn from(e: nix::Error) -> Self {
+        match e {
+            nix::Error::Sys(e) => match_errno(e),
+            _ => Error::UnknownInternalError(Some(e)),
+        }
+    }
+}
+
+// This implementation is used when reading system queue settings.
+impl From<io::Error> for Error {
+    fn from(e: io::Error) -> Self {
+        Error::ValueReadingError(e)
+    }
+}
+
+// This implementation is used when parsing system queue settings. The unknown error is returned
+// here because the system is probably seriously broken if those files don't contain numbers.
+impl From<num::ParseIntError> for Error {
+    fn from(_: num::ParseIntError) -> Self {
+        Error::UnknownInternalError(None)
+    }
+}
+
+
+fn match_errno(err: nix::errno::Errno) -> Error {
+    use nix::errno::Errno::*;
+
+    match err {
+        EACCES => Error::PermissionDenied(),
+        EBADF  => Error::InvalidQueueDescriptor(),
+        EINTR  => Error::QueueCallInterrupted(),
+        EEXIST => Error::QueueAlreadyExists(),
+        EMFILE => Error::ProcessFileDescriptorLimitReached(),
+        ENFILE => Error::SystemFileDescriptorLimitReached(),
+        ENOENT => Error::QueueNotFound(),
+        ENOMEM => Error::InsufficientMemory(),
+        ENOSPC => Error::InsufficientSpace(),
+        _      => Error::UnknownForeignError(err),
+    }
+}
diff --git a/ops/posix_mq.rs/src/lib.rs b/ops/posix_mq.rs/src/lib.rs
new file mode 100644
index 000000000000..057601eccfbd
--- /dev/null
+++ b/ops/posix_mq.rs/src/lib.rs
@@ -0,0 +1,269 @@
+extern crate nix;
+extern crate libc;
+
+use error::Error;
+use libc::mqd_t;
+use nix::mqueue;
+use nix::sys::stat;
+use std::ffi::CString;
+use std::fs::File;
+use std::io::Read;
+use std::string::ToString;
+use std::ops::Drop;
+
+pub mod error;
+
+#[cfg(test)]
+mod tests;
+
+/// Wrapper type for queue names that performs basic validation of queue names before calling
+/// out to C code.
+#[derive(Debug, Clone, PartialEq)]
+pub struct Name(CString);
+
+impl Name {
+    pub fn new<S: ToString>(s: S) -> Result<Self, Error> {
+        let string = s.to_string();
+
+        if !string.starts_with('/') {
+            return Err(Error::InvalidQueueName("Queue name must start with '/'"));
+        }
+
+        // The C library has a special error return for this case, so I assume people must actually
+        // have tried just using '/' as a queue name.
+        if string.len() == 1 {
+            return Err(Error::InvalidQueueName(
+                "Queue name must be a slash followed by one or more characters"
+            ));
+        }
+
+        if string.len() > 255 {
+            return Err(Error::InvalidQueueName("Queue name must not exceed 255 characters"));
+        }
+
+        if string.matches('/').count() > 1 {
+            return Err(Error::InvalidQueueName("Queue name can not contain more than one slash"));
+        }
+
+        // TODO: What error is being thrown away here? Is it possible?
+        Ok(Name(CString::new(string).unwrap()))
+    }
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub struct Message {
+    pub data: Vec<u8>,
+    pub priority: u32,
+}
+
+/// Represents an open queue descriptor to a POSIX message queue. This carries information
+/// about the queue's limitations (i.e. maximum message size and maximum message count).
+#[derive(Debug)]
+pub struct Queue {
+    name: Name,
+
+    /// Internal file/queue descriptor.
+    queue_descriptor: mqd_t,
+
+    /// Maximum number of pending messages in this queue.
+    max_pending: i64,
+
+    /// Maximum size of this queue.
+    max_size: usize,
+}
+
+impl Queue {
+    /// Creates a new queue and fails if it already exists.
+    /// By default the queue will be read/writable by the current user with no access for other
+    /// users.
+    /// Linux users can change this setting themselves by modifying the queue file in /dev/mqueue.
+    pub fn create(name: Name, max_pending: i64, max_size: i64) -> Result<Queue, Error> {
+        if max_pending > read_i64_from_file(MSG_MAX)? {
+            return Err(Error::MaximumMessageCountExceeded());
+        }
+
+        if max_size > read_i64_from_file(MSGSIZE_MAX)? {
+            return Err(Error::MaximumMessageSizeExceeded());
+        }
+
+        let oflags = {
+            let mut flags = mqueue::MQ_OFlag::empty();
+            // Put queue in r/w mode
+            flags.toggle(mqueue::MQ_OFlag::O_RDWR);
+            // Enable queue creation
+            flags.toggle(mqueue::MQ_OFlag::O_CREAT);
+            // Fail if queue exists already
+            flags.toggle(mqueue::MQ_OFlag::O_EXCL);
+            flags
+        };
+
+        let attr = mqueue::MqAttr::new(
+            0, max_pending, max_size, 0
+        );
+
+        let queue_descriptor = mqueue::mq_open(
+            &name.0,
+            oflags,
+            default_mode(),
+            Some(&attr),
+        )?;
+
+        Ok(Queue {
+            name,
+            queue_descriptor,
+            max_pending,
+            max_size: max_size as usize,
+        })
+    }
+
+    /// Opens an existing queue.
+    pub fn open(name: Name) -> Result<Queue, Error> {
+        // No extra flags need to be constructed as the default is to open and fail if the
+        // queue does not exist yet - which is what we want here.
+        let oflags = mqueue::MQ_OFlag::O_RDWR;
+        let queue_descriptor = mqueue::mq_open(
+            &name.0,
+            oflags,
+            default_mode(),
+            None,
+        )?;
+
+        let attr = mq_getattr(queue_descriptor)?;
+
+        Ok(Queue {
+            name,
+            queue_descriptor,
+            max_pending: attr.mq_maxmsg,
+            max_size: attr.mq_msgsize as usize,
+        })
+    }
+
+    /// Opens an existing queue or creates a new queue with the OS default settings.
+    pub fn open_or_create(name: Name) -> Result<Queue, Error> {
+        let oflags = {
+            let mut flags = mqueue::MQ_OFlag::empty();
+            // Put queue in r/w mode
+            flags.toggle(mqueue::MQ_OFlag::O_RDWR);
+            // Enable queue creation
+            flags.toggle(mqueue::MQ_OFlag::O_CREAT);
+            flags
+        };
+
+        let default_pending = read_i64_from_file(MSG_DEFAULT)?;
+        let default_size = read_i64_from_file(MSGSIZE_DEFAULT)?;
+        let attr = mqueue::MqAttr::new(
+            0, default_pending, default_size, 0
+        );
+
+        let queue_descriptor = mqueue::mq_open(
+            &name.0,
+            oflags,
+            default_mode(),
+            Some(&attr),
+        )?;
+
+        let actual_attr = mq_getattr(queue_descriptor)?;
+
+        Ok(Queue {
+            name,
+            queue_descriptor,
+            max_pending: actual_attr.mq_maxmsg,
+            max_size: actual_attr.mq_msgsize as usize,
+        })
+    }
+
+    /// Delete a message queue from the system. This method will make the queue unavailable for
+    /// other processes after their current queue descriptors have been closed.
+    pub fn delete(self) -> Result<(), Error> {
+        mqueue::mq_unlink(&self.name.0)?;
+        drop(self);
+        Ok(())
+    }
+
+    /// Send a message to the message queue.
+    /// If the queue is full this call will block until a message has been consumed.
+    pub fn send(&self, msg: &Message) -> Result<(), Error> {
+        if msg.data.len() > self.max_size as usize {
+            return Err(Error::MessageSizeExceeded());
+        }
+
+        mqueue::mq_send(
+            self.queue_descriptor,
+            msg.data.as_ref(),
+            msg.priority,
+        ).map_err(|e| e.into())
+    }
+
+    /// Receive a message from the message queue.
+    /// If the queue is empty this call will block until a message arrives.
+    pub fn receive(&self) -> Result<Message, Error> {
+        let mut data: Vec<u8> = vec![0; self.max_size as usize];
+        let mut priority: u32 = 0;
+
+        let msg_size = mqueue::mq_receive(
+            self.queue_descriptor,
+            data.as_mut(),
+            &mut priority,
+        )?;
+
+        data.truncate(msg_size);
+        Ok(Message { data, priority })
+    }
+
+    pub fn max_pending(&self) -> i64 {
+        self.max_pending
+    }
+
+    pub fn max_size(&self) -> usize {
+        self.max_size
+    }
+}
+
+impl Drop for Queue {
+    fn drop(&mut self) {
+        // Attempt to close the queue descriptor and discard any possible errors.
+        // The only error thrown in the C-code is EINVAL, which would mean that the
+        // descriptor has already been closed.
+        mqueue::mq_close(self.queue_descriptor).ok();
+    }
+}
+
+// Creates the default queue mode (0600).
+fn default_mode() -> stat::Mode {
+    let mut mode = stat::Mode::empty();
+    mode.toggle(stat::Mode::S_IRUSR);
+    mode.toggle(stat::Mode::S_IWUSR);
+    mode
+}
+
+/// This file defines the default number of maximum pending messages in a queue.
+const MSG_DEFAULT: &'static str = "/proc/sys/fs/mqueue/msg_default";
+
+/// This file defines the system maximum number of pending messages in a queue.
+const MSG_MAX: &'static str = "/proc/sys/fs/mqueue/msg_max";
+
+/// This file defines the default maximum size of messages in a queue.
+const MSGSIZE_DEFAULT: &'static str = "/proc/sys/fs/mqueue/msgsize_default";
+
+/// This file defines the system maximum size for messages in a queue.
+const MSGSIZE_MAX: &'static str = "/proc/sys/fs/mqueue/msgsize_max";
+
+/// This method is used in combination with the above constants to find system limits.
+fn read_i64_from_file(name: &str) -> Result<i64, Error> {
+    let mut file = File::open(name.to_string())?;
+    let mut content = String::new();
+    file.read_to_string(&mut content)?;
+    Ok(content.trim().parse()?)
+}
+
+/// The mq_getattr implementation in the nix crate hides the maximum message size and count, which
+/// is very impractical.
+/// To work around it, this method calls the C-function directly.
+fn mq_getattr(mqd: mqd_t) -> Result<libc::mq_attr, Error> {
+    use std::mem;
+    let mut attr = unsafe { mem::uninitialized::<libc::mq_attr>() };
+    let res = unsafe { libc::mq_getattr(mqd, &mut attr) };
+    nix::errno::Errno::result(res)
+        .map(|_| attr)
+        .map_err(|e| e.into())
+}
diff --git a/ops/posix_mq.rs/src/tests.rs b/ops/posix_mq.rs/src/tests.rs
new file mode 100644
index 000000000000..7a08876aeacd
--- /dev/null
+++ b/ops/posix_mq.rs/src/tests.rs
@@ -0,0 +1,22 @@
+use super::*;
+
+#[test]
+fn test_open_delete() {
+    // Simple test with default queue settings
+    let name = Name::new("/test-queue").unwrap();
+    let queue = Queue::open_or_create(name)
+        .expect("Opening queue failed");
+
+    let message = Message {
+        data: "test-message".as_bytes().to_vec(),
+        priority: 0,
+    };
+
+    queue.send(&message).expect("message sending failed");
+
+    let result = queue.receive().expect("message receiving failed");
+
+    assert_eq!(message, result);
+
+    queue.delete().expect("deleting queue failed");
+}