about summary refs log tree commit diff
path: root/ops/posix_mq.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ops/posix_mq.rs')
-rw-r--r--ops/posix_mq.rs/.gitignore3
-rw-r--r--ops/posix_mq.rs/CODE_OF_CONDUCT.md20
-rw-r--r--ops/posix_mq.rs/Cargo.lock63
-rw-r--r--ops/posix_mq.rs/Cargo.toml12
-rw-r--r--ops/posix_mq.rs/LICENSE21
-rw-r--r--ops/posix_mq.rs/README.md44
-rw-r--r--ops/posix_mq.rs/default.nix3
-rw-r--r--ops/posix_mq.rs/src/error.rs122
-rw-r--r--ops/posix_mq.rs/src/lib.rs247
-rw-r--r--ops/posix_mq.rs/src/tests.rs21
10 files changed, 556 insertions, 0 deletions
diff --git a/ops/posix_mq.rs/.gitignore b/ops/posix_mq.rs/.gitignore
new file mode 100644
index 000000000000..e5b6fdb28e32
--- /dev/null
+++ b/ops/posix_mq.rs/.gitignore
@@ -0,0 +1,3 @@
+/target/
+**/*.rs.bk
+.idea/
diff --git a/ops/posix_mq.rs/CODE_OF_CONDUCT.md b/ops/posix_mq.rs/CODE_OF_CONDUCT.md
new file mode 100644
index 000000000000..c4013ac13ebc
--- /dev/null
+++ b/ops/posix_mq.rs/CODE_OF_CONDUCT.md
@@ -0,0 +1,20 @@
+A SERMON ON ETHICS AND LOVE
+===========================
+
+One day Mal-2 asked the messenger spirit Saint Gulik to approach the Goddess and request Her presence for some desperate advice. Shortly afterwards the radio came on by itself, and an ethereal female Voice said **YES?**
+
+"O! Eris! Blessed Mother of Man! Queen of Chaos! Daughter of Discord! Concubine of Confusion! O! Exquisite Lady, I beseech You to lift a heavy burden from my heart!"
+
+**WHAT BOTHERS YOU, MAL? YOU DON'T SOUND WELL.**
+
+"I am filled with fear and tormented with terrible visions of pain. Everywhere people are hurting one another, the planet is rampant with injustices, whole societies plunder groups of their own people, mothers imprison sons, children perish while brothers war. O, woe."
+
+**WHAT IS THE MATTER WITH THAT, IF IT IS WHAT YOU WANT TO DO?**
+
+"But nobody Wants it! Everybody hates it."
+
+**OH. WELL, THEN *STOP*.**
+
+At which moment She turned herself into an aspirin commercial and left The Polyfather stranded alone with his species.
+
+SINISTER DEXTER HAS A BROKEN SPIROMETER.
diff --git a/ops/posix_mq.rs/Cargo.lock b/ops/posix_mq.rs/Cargo.lock
new file mode 100644
index 000000000000..dc344613d052
--- /dev/null
+++ b/ops/posix_mq.rs/Cargo.lock
@@ -0,0 +1,63 @@
+# This file is automatically @generated by Cargo.
+# It is not intended for manual editing.
+version = 3
+
+[[package]]
+name = "autocfg"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
+
+[[package]]
+name = "bitflags"
+version = "1.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
+
+[[package]]
+name = "cc"
+version = "1.0.50"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "95e28fa049fda1c330bcf9d723be7663a899c4679724b34c81e9f5a326aab8cd"
+
+[[package]]
+name = "cfg-if"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
+
+[[package]]
+name = "libc"
+version = "0.2.117"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e74d72e0f9b65b5b4ca49a346af3976df0f9c61d550727f349ecd559f251a26c"
+
+[[package]]
+name = "memoffset"
+version = "0.6.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce"
+dependencies = [
+ "autocfg",
+]
+
+[[package]]
+name = "nix"
+version = "0.23.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9f866317acbd3a240710c63f065ffb1e4fd466259045ccb504130b7f668f35c6"
+dependencies = [
+ "bitflags",
+ "cc",
+ "cfg-if",
+ "libc",
+ "memoffset",
+]
+
+[[package]]
+name = "posix_mq"
+version = "3771.0.0"
+dependencies = [
+ "libc",
+ "nix",
+]
diff --git a/ops/posix_mq.rs/Cargo.toml b/ops/posix_mq.rs/Cargo.toml
new file mode 100644
index 000000000000..8390b80b86f0
--- /dev/null
+++ b/ops/posix_mq.rs/Cargo.toml
@@ -0,0 +1,12 @@
+[package]
+name = "posix_mq"
+version = "3771.0.0"
+authors = ["Vincent Ambo <tazjin@tvl.su>"]
+description = "(Higher-level) Rust bindings to POSIX message queues"
+license = "MIT"
+homepage = "https://cs.tvl.fyi/depot/-/tree/ops/posix_mq.rs"
+repository = "https://code.tvl.fyi/depot.git:/ops/posix_mq.rs.git"
+
+[dependencies]
+nix = "0.23"
+libc = "0.2"
diff --git a/ops/posix_mq.rs/LICENSE b/ops/posix_mq.rs/LICENSE
new file mode 100644
index 000000000000..2389546b1383
--- /dev/null
+++ b/ops/posix_mq.rs/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2017-2020 Vincent Ambo
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/ops/posix_mq.rs/README.md b/ops/posix_mq.rs/README.md
new file mode 100644
index 000000000000..800d2221e492
--- /dev/null
+++ b/ops/posix_mq.rs/README.md
@@ -0,0 +1,44 @@
+posix_mq
+========
+
+[![crates.io](https://img.shields.io/crates/v/posix_mq.svg)](https://crates.io/crates/posix_mq)
+
+This is a simple, relatively high-level library for the POSIX [message queue API][]. It wraps the lower-level API in a
+simpler interface with more robust error handling.
+
+Check out this project's [sister library][] in Kotlin.
+
+Usage example:
+
+```rust
+// Values that need to undergo validation are wrapped in safe types:
+let name = Name::new("/test-queue").unwrap();
+
+// Queue creation with system defaults is simple:
+let queue = Queue::open_or_create(name).expect("Opening queue failed");
+
+// Sending a message:
+let message = Message {
+  data: "test-message".as_bytes().to_vec(),
+  priority: 0,
+};
+queue.send(&message).expect("message sending failed");
+
+// ... and receiving it!
+let result = queue.receive().expect("message receiving failed");
+```
+
+## Development
+
+Development happens in the [TVL
+monorepo](https://cs.tvl.fyi/depot/-/tree/ops/posix_mq.rs).
+
+Starting from version `3771.0.0`, the version numbers correspond to
+_revisions_ of the TVL repository, available as git refs (e.g.
+`refs/r/3771`).
+
+See the TVL documentation for more information about how to contribute
+to the codebase.
+
+[message queue API]: https://linux.die.net/man/7/mq_overview
+[sister library]: https://github.com/aprilabank/posix_mq.kt
diff --git a/ops/posix_mq.rs/default.nix b/ops/posix_mq.rs/default.nix
new file mode 100644
index 000000000000..6b0e32009a66
--- /dev/null
+++ b/ops/posix_mq.rs/default.nix
@@ -0,0 +1,3 @@
+{ depot, ... }:
+
+depot.third_party.naersk.buildPackage ./.
diff --git a/ops/posix_mq.rs/src/error.rs b/ops/posix_mq.rs/src/error.rs
new file mode 100644
index 000000000000..bacd2aeb39e0
--- /dev/null
+++ b/ops/posix_mq.rs/src/error.rs
@@ -0,0 +1,122 @@
+use nix;
+use std::{error, fmt, io, num};
+
+/// This module implements a simple error type to match the errors that can be thrown from the C
+/// functions as well as some extra errors resulting from internal validations.
+///
+/// As this crate exposes an opinionated API to the POSIX queues certain errors have been
+/// ignored:
+///
+/// * ETIMEDOUT: The low-level timed functions are not exported and this error can not occur.
+/// * EAGAIN: Non-blocking queue calls are not supported.
+/// * EINVAL: Same reason as ETIMEDOUT
+/// * EMSGSIZE: The message size is immutable after queue creation and this crate checks it.
+/// * ENAMETOOLONG: This crate performs name validation
+///
+/// If an unexpected error is encountered it will be wrapped appropriately and should be reported
+/// as a bug on https://b.tvl.fyi
+
+#[derive(Debug)]
+pub enum Error {
+    // These errors are raised inside of the library
+    InvalidQueueName(&'static str),
+    ValueReadingError(io::Error),
+    MessageSizeExceeded(),
+    MaximumMessageSizeExceeded(),
+    MaximumMessageCountExceeded(),
+
+    // These errors match what is described in the man pages (from mq_overview(7) onwards).
+    PermissionDenied(),
+    InvalidQueueDescriptor(),
+    QueueCallInterrupted(),
+    QueueAlreadyExists(),
+    QueueNotFound(),
+    InsufficientMemory(),
+    InsufficientSpace(),
+
+    // These two are (hopefully) unlikely in modern systems
+    ProcessFileDescriptorLimitReached(),
+    SystemFileDescriptorLimitReached(),
+
+    // If an unhandled / unknown / unexpected error occurs this error will be used.
+    // In those cases bug reports would be welcome!
+    UnknownForeignError(nix::errno::Errno),
+
+    // Some other unexpected / unknown error occured. This is probably an error from
+    // the nix crate. Bug reports also welcome for this!
+    UnknownInternalError(),
+}
+
+impl fmt::Display for Error {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        use Error::*;
+        f.write_str(match *self {
+            // This error contains more sensible description strings already
+            InvalidQueueName(e) => e,
+            ValueReadingError(_) => "error reading system configuration for message queues",
+            MessageSizeExceeded() => "message is larger than maximum size for specified queue",
+            MaximumMessageSizeExceeded() => "specified queue message size exceeds system maximum",
+            MaximumMessageCountExceeded() => "specified queue message count exceeds system maximum",
+            PermissionDenied() => "permission to the specified queue was denied",
+            InvalidQueueDescriptor() => "the internal queue descriptor was invalid",
+            QueueCallInterrupted() => "queue method interrupted by signal",
+            QueueAlreadyExists() => "the specified queue already exists",
+            QueueNotFound() => "the specified queue could not be found",
+            InsufficientMemory() => "insufficient memory to call queue method",
+            InsufficientSpace() => "insufficient space to call queue method",
+            ProcessFileDescriptorLimitReached() => {
+                "maximum number of process file descriptors reached"
+            }
+            SystemFileDescriptorLimitReached() => {
+                "maximum number of system file descriptors reached"
+            }
+            UnknownForeignError(_) => "unknown foreign error occured: please report a bug!",
+            UnknownInternalError() => "unknown internal error occured: please report a bug!",
+        })
+    }
+}
+
+impl error::Error for Error {
+    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
+        match self {
+            Error::ValueReadingError(e) => Some(e),
+            Error::UnknownForeignError(e) => Some(e),
+            _ => None,
+        }
+    }
+}
+
+/// This from implementation is used to translate errors from the lower-level
+/// C-calls into sensible Rust errors.
+impl From<nix::errno::Errno> for Error {
+    fn from(err: nix::Error) -> Self {
+        use nix::errno::Errno::*;
+        match err {
+            EACCES => Error::PermissionDenied(),
+            EBADF => Error::InvalidQueueDescriptor(),
+            EINTR => Error::QueueCallInterrupted(),
+            EEXIST => Error::QueueAlreadyExists(),
+            EMFILE => Error::ProcessFileDescriptorLimitReached(),
+            ENFILE => Error::SystemFileDescriptorLimitReached(),
+            ENOENT => Error::QueueNotFound(),
+            ENOMEM => Error::InsufficientMemory(),
+            ENOSPC => Error::InsufficientSpace(),
+            _ => Error::UnknownForeignError(err),
+        }
+    }
+}
+
+// This implementation is used when reading system queue settings.
+impl From<io::Error> for Error {
+    fn from(e: io::Error) -> Self {
+        Error::ValueReadingError(e)
+    }
+}
+
+// This implementation is used when parsing system queue settings. The unknown error is returned
+// here because the system is probably seriously broken if those files don't contain numbers.
+impl From<num::ParseIntError> for Error {
+    fn from(_: num::ParseIntError) -> Self {
+        Error::UnknownInternalError()
+    }
+}
diff --git a/ops/posix_mq.rs/src/lib.rs b/ops/posix_mq.rs/src/lib.rs
new file mode 100644
index 000000000000..ed35fb03be82
--- /dev/null
+++ b/ops/posix_mq.rs/src/lib.rs
@@ -0,0 +1,247 @@
+extern crate libc;
+extern crate nix;
+
+use error::Error;
+use libc::mqd_t;
+use nix::mqueue;
+use nix::sys::stat;
+use std::ffi::CString;
+use std::fs::File;
+use std::io::Read;
+use std::ops::Drop;
+use std::string::ToString;
+
+pub mod error;
+
+#[cfg(test)]
+mod tests;
+
+/// Wrapper type for queue names that performs basic validation of queue names before calling
+/// out to C code.
+#[derive(Debug, Clone, PartialEq)]
+pub struct Name(CString);
+
+impl Name {
+    pub fn new<S: ToString>(s: S) -> Result<Self, Error> {
+        let string = s.to_string();
+
+        if !string.starts_with('/') {
+            return Err(Error::InvalidQueueName("Queue name must start with '/'"));
+        }
+
+        // The C library has a special error return for this case, so I assume people must actually
+        // have tried just using '/' as a queue name.
+        if string.len() == 1 {
+            return Err(Error::InvalidQueueName(
+                "Queue name must be a slash followed by one or more characters",
+            ));
+        }
+
+        if string.len() > 255 {
+            return Err(Error::InvalidQueueName(
+                "Queue name must not exceed 255 characters",
+            ));
+        }
+
+        if string.matches('/').count() > 1 {
+            return Err(Error::InvalidQueueName(
+                "Queue name can not contain more than one slash",
+            ));
+        }
+
+        // TODO: What error is being thrown away here? Is it possible?
+        Ok(Name(CString::new(string).unwrap()))
+    }
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub struct Message {
+    pub data: Vec<u8>,
+    pub priority: u32,
+}
+
+/// Represents an open queue descriptor to a POSIX message queue. This carries information
+/// about the queue's limitations (i.e. maximum message size and maximum message count).
+#[derive(Debug)]
+pub struct Queue {
+    name: Name,
+
+    /// Internal file/queue descriptor.
+    queue_descriptor: mqd_t,
+
+    /// Maximum number of pending messages in this queue.
+    max_pending: i64,
+
+    /// Maximum size of this queue.
+    max_size: usize,
+}
+
+impl Queue {
+    /// Creates a new queue and fails if it already exists.
+    /// By default the queue will be read/writable by the current user with no access for other
+    /// users.
+    /// Linux users can change this setting themselves by modifying the queue file in /dev/mqueue.
+    pub fn create(name: Name, max_pending: i64, max_size: i64) -> Result<Queue, Error> {
+        if max_pending > read_i64_from_file(MSG_MAX)? {
+            return Err(Error::MaximumMessageCountExceeded());
+        }
+
+        if max_size > read_i64_from_file(MSGSIZE_MAX)? {
+            return Err(Error::MaximumMessageSizeExceeded());
+        }
+
+        let oflags = {
+            let mut flags = mqueue::MQ_OFlag::empty();
+            // Put queue in r/w mode
+            flags.toggle(mqueue::MQ_OFlag::O_RDWR);
+            // Enable queue creation
+            flags.toggle(mqueue::MQ_OFlag::O_CREAT);
+            // Fail if queue exists already
+            flags.toggle(mqueue::MQ_OFlag::O_EXCL);
+            flags
+        };
+
+        let attr = mqueue::MqAttr::new(0, max_pending, max_size, 0);
+
+        let queue_descriptor = mqueue::mq_open(&name.0, oflags, default_mode(), Some(&attr))?;
+
+        Ok(Queue {
+            name,
+            queue_descriptor,
+            max_pending,
+            max_size: max_size as usize,
+        })
+    }
+
+    /// Opens an existing queue.
+    pub fn open(name: Name) -> Result<Queue, Error> {
+        // No extra flags need to be constructed as the default is to open and fail if the
+        // queue does not exist yet - which is what we want here.
+        let oflags = mqueue::MQ_OFlag::O_RDWR;
+        let queue_descriptor = mqueue::mq_open(&name.0, oflags, default_mode(), None)?;
+
+        let attr = mq_getattr(queue_descriptor)?;
+
+        Ok(Queue {
+            name,
+            queue_descriptor,
+            max_pending: attr.mq_maxmsg,
+            max_size: attr.mq_msgsize as usize,
+        })
+    }
+
+    /// Opens an existing queue or creates a new queue with the OS default settings.
+    pub fn open_or_create(name: Name) -> Result<Queue, Error> {
+        let oflags = {
+            let mut flags = mqueue::MQ_OFlag::empty();
+            // Put queue in r/w mode
+            flags.toggle(mqueue::MQ_OFlag::O_RDWR);
+            // Enable queue creation
+            flags.toggle(mqueue::MQ_OFlag::O_CREAT);
+            flags
+        };
+
+        let default_pending = read_i64_from_file(MSG_DEFAULT)?;
+        let default_size = read_i64_from_file(MSGSIZE_DEFAULT)?;
+        let attr = mqueue::MqAttr::new(0, default_pending, default_size, 0);
+
+        let queue_descriptor = mqueue::mq_open(&name.0, oflags, default_mode(), Some(&attr))?;
+
+        let actual_attr = mq_getattr(queue_descriptor)?;
+
+        Ok(Queue {
+            name,
+            queue_descriptor,
+            max_pending: actual_attr.mq_maxmsg,
+            max_size: actual_attr.mq_msgsize as usize,
+        })
+    }
+
+    /// Delete a message queue from the system. This method will make the queue unavailable for
+    /// other processes after their current queue descriptors have been closed.
+    pub fn delete(self) -> Result<(), Error> {
+        mqueue::mq_unlink(&self.name.0)?;
+        drop(self);
+        Ok(())
+    }
+
+    /// Send a message to the message queue.
+    /// If the queue is full this call will block until a message has been consumed.
+    pub fn send(&self, msg: &Message) -> Result<(), Error> {
+        if msg.data.len() > self.max_size as usize {
+            return Err(Error::MessageSizeExceeded());
+        }
+
+        mqueue::mq_send(self.queue_descriptor, msg.data.as_ref(), msg.priority)
+            .map_err(|e| e.into())
+    }
+
+    /// Receive a message from the message queue.
+    /// If the queue is empty this call will block until a message arrives.
+    pub fn receive(&self) -> Result<Message, Error> {
+        let mut data: Vec<u8> = vec![0; self.max_size as usize];
+        let mut priority: u32 = 0;
+
+        let msg_size = mqueue::mq_receive(self.queue_descriptor, data.as_mut(), &mut priority)?;
+
+        data.truncate(msg_size);
+        Ok(Message { data, priority })
+    }
+
+    pub fn max_pending(&self) -> i64 {
+        self.max_pending
+    }
+
+    pub fn max_size(&self) -> usize {
+        self.max_size
+    }
+}
+
+impl Drop for Queue {
+    fn drop(&mut self) {
+        // Attempt to close the queue descriptor and discard any possible errors.
+        // The only error thrown in the C-code is EINVAL, which would mean that the
+        // descriptor has already been closed.
+        mqueue::mq_close(self.queue_descriptor).ok();
+    }
+}
+
+// Creates the default queue mode (0600).
+fn default_mode() -> stat::Mode {
+    let mut mode = stat::Mode::empty();
+    mode.toggle(stat::Mode::S_IRUSR);
+    mode.toggle(stat::Mode::S_IWUSR);
+    mode
+}
+
+/// This file defines the default number of maximum pending messages in a queue.
+const MSG_DEFAULT: &'static str = "/proc/sys/fs/mqueue/msg_default";
+
+/// This file defines the system maximum number of pending messages in a queue.
+const MSG_MAX: &'static str = "/proc/sys/fs/mqueue/msg_max";
+
+/// This file defines the default maximum size of messages in a queue.
+const MSGSIZE_DEFAULT: &'static str = "/proc/sys/fs/mqueue/msgsize_default";
+
+/// This file defines the system maximum size for messages in a queue.
+const MSGSIZE_MAX: &'static str = "/proc/sys/fs/mqueue/msgsize_max";
+
+/// This method is used in combination with the above constants to find system limits.
+fn read_i64_from_file(name: &str) -> Result<i64, Error> {
+    let mut file = File::open(name.to_string())?;
+    let mut content = String::new();
+    file.read_to_string(&mut content)?;
+    Ok(content.trim().parse()?)
+}
+
+/// The mq_getattr implementation in the nix crate hides the maximum message size and count, which
+/// is very impractical.
+/// To work around it, this method calls the C-function directly.
+fn mq_getattr(mqd: mqd_t) -> Result<libc::mq_attr, Error> {
+    use std::mem;
+    let mut attr = mem::MaybeUninit::<libc::mq_attr>::uninit();
+    let res = unsafe { libc::mq_getattr(mqd, attr.as_mut_ptr()) };
+    nix::errno::Errno::result(res)
+        .map(|_| unsafe { attr.assume_init() })
+        .map_err(|e| e.into())
+}
diff --git a/ops/posix_mq.rs/src/tests.rs b/ops/posix_mq.rs/src/tests.rs
new file mode 100644
index 000000000000..1f4ea9a58da6
--- /dev/null
+++ b/ops/posix_mq.rs/src/tests.rs
@@ -0,0 +1,21 @@
+use super::*;
+
+#[test]
+fn test_open_delete() {
+    // Simple test with default queue settings
+    let name = Name::new("/test-queue").unwrap();
+    let queue = Queue::open_or_create(name).expect("Opening queue failed");
+
+    let message = Message {
+        data: "test-message".as_bytes().to_vec(),
+        priority: 0,
+    };
+
+    queue.send(&message).expect("message sending failed");
+
+    let result = queue.receive().expect("message receiving failed");
+
+    assert_eq!(message, result);
+
+    queue.delete().expect("deleting queue failed");
+}