diff options
Diffstat (limited to 'ops/mq_cli/src/main.rs')
-rw-r--r-- | ops/mq_cli/src/main.rs | 223 |
1 files changed, 223 insertions, 0 deletions
diff --git a/ops/mq_cli/src/main.rs b/ops/mq_cli/src/main.rs new file mode 100644 index 000000000000..55ff0064295d --- /dev/null +++ b/ops/mq_cli/src/main.rs @@ -0,0 +1,223 @@ +extern crate clap; +extern crate posix_mq; +extern crate libc; +extern crate nix; + +use clap::{App, SubCommand, Arg, ArgMatches, AppSettings}; +use posix_mq::{Name, Queue, Message}; +use std::fs::{read_dir, File}; +use std::io::{self, Read, Write}; +use std::process::exit; + +fn run_ls() { + let mqueues = read_dir("/dev/mqueue") + .expect("Could not read message queues"); + + for queue in mqueues { + let path = queue.unwrap().path(); + let status = { + let mut file = File::open(&path) + .expect("Could not open queue file"); + + let mut content = String::new(); + file.read_to_string(&mut content).expect("Could not read queue file"); + + content + }; + + let queue_name = path.components().last().unwrap() + .as_os_str() + .to_string_lossy(); + + println!("/{}: {}", queue_name, status) + }; +} + +fn run_inspect(queue_name: &str) { + let name = Name::new(queue_name).expect("Invalid queue name"); + let queue = Queue::open(name).expect("Could not open queue"); + + println!("Queue {}:\n", queue_name); + println!("Max. message size: {} bytes", queue.max_size()); + println!("Max. # of pending messages: {}", queue.max_pending()); +} + +fn run_create(cmd: &ArgMatches) { + if let Some(rlimit) = cmd.value_of("rlimit") { + set_rlimit(rlimit.parse().expect("Invalid rlimit value")); + } + + let name = Name::new(cmd.value_of("queue").unwrap()) + .expect("Invalid queue name"); + + let max_pending: i64 = cmd.value_of("max-pending").unwrap().parse().unwrap(); + let max_size: i64 = cmd.value_of("max-size").unwrap().parse().unwrap(); + + let queue = Queue::create(name, max_pending, max_size * 1024); + + match queue { + Ok(_) => println!("Queue created successfully"), + Err(e) => { + writeln!(io::stderr(), "Could not create queue: {}", e).ok(); + exit(1); + }, + }; +} + +fn run_receive(queue_name: &str) { + let name = Name::new(queue_name).expect("Invalid queue name"); + let queue = Queue::open(name).expect("Could not open queue"); + + let message = match queue.receive() { + Ok(msg) => msg, + Err(e) => { + writeln!(io::stderr(), "Failed to receive message: {}", e).ok(); + exit(1); + } + }; + + // Attempt to write the message out as a string, but write out raw bytes if it turns out to not + // be UTF-8 encoded data. + match String::from_utf8(message.data.clone()) { + Ok(string) => println!("{}", string), + Err(_) => { + writeln!(io::stderr(), "Message not UTF-8 encoded!").ok(); + io::stdout().write(message.data.as_ref()).ok(); + } + }; +} + +fn run_send(queue_name: &str, content: &str) { + let name = Name::new(queue_name).expect("Invalid queue name"); + let queue = Queue::open(name).expect("Could not open queue"); + + let message = Message { + data: content.as_bytes().to_vec(), + priority: 0, + }; + + match queue.send(&message) { + Ok(_) => (), + Err(e) => { + writeln!(io::stderr(), "Could not send message: {}", e).ok(); + exit(1); + } + } +} + +fn run_rlimit() { + let mut rlimit = libc::rlimit { + rlim_cur: 0, + rlim_max: 0, + }; + + let mut errno = 0; + unsafe { + let res = libc::getrlimit(libc::RLIMIT_MSGQUEUE, &mut rlimit); + if res != 0 { + errno = nix::errno::errno(); + } + }; + + if errno != 0 { + writeln!(io::stderr(), "Could not get message queue rlimit: {}", errno).ok(); + } else { + println!("Message queue rlimit:"); + println!("Current limit: {}", rlimit.rlim_cur); + println!("Maximum limit: {}", rlimit.rlim_max); + } +} + +fn set_rlimit(new_limit: u64) { + let rlimit = libc::rlimit { + rlim_cur: new_limit, + rlim_max: new_limit, + }; + + let mut errno: i32 = 0; + unsafe { + let res = libc::setrlimit(libc::RLIMIT_MSGQUEUE, &rlimit); + if res != 0 { + errno = nix::errno::errno(); + } + } + + match errno { + 0 => println!("Set RLIMIT_MSGQUEUE hard limit to {}", new_limit), + _ => { + // Not mapping these error codes to messages for now, the user can + // look up the meaning in setrlimit(2). + panic!("Could not set hard limit: {}", errno); + } + }; +} + +fn main() { + let ls = SubCommand::with_name("ls").about("list message queues"); + + let queue_arg = Arg::with_name("queue").required(true).takes_value(true); + + let rlimit_arg = Arg::with_name("rlimit") + .help("RLIMIT_MSGQUEUE to set for this command") + .long("rlimit") + .takes_value(true); + + let inspect = SubCommand::with_name("inspect") + .about("inspect details about a queue") + .arg(&queue_arg); + + let create = SubCommand::with_name("create") + .about("Create a new queue") + .arg(&queue_arg) + .arg(&rlimit_arg) + .arg(Arg::with_name("max-size") + .help("maximum message size (in kB)") + .long("max-size") + .required(true) + .takes_value(true)) + .arg(Arg::with_name("max-pending") + .help("maximum # of pending messages") + .long("max-pending") + .required(true) + .takes_value(true)); + + let receive = SubCommand::with_name("receive") + .about("Receive a message from a queue") + .arg(&queue_arg); + + let send = SubCommand::with_name("send") + .about("Send a message to a queue") + .arg(&queue_arg) + .arg(Arg::with_name("message") + .help("the message to send") + .required(true)); + + let rlimit = SubCommand::with_name("rlimit") + .about("Get the message queue rlimit") + .setting(AppSettings::SubcommandRequiredElseHelp); + + let matches = App::new("mq") + .setting(AppSettings::SubcommandRequiredElseHelp) + .version("1.0.0") + .about("Administrate and inspect POSIX message queues") + .subcommand(ls) + .subcommand(inspect) + .subcommand(create) + .subcommand(receive) + .subcommand(send) + .subcommand(rlimit) + .get_matches(); + + match matches.subcommand() { + ("ls", _) => run_ls(), + ("inspect", Some(cmd)) => run_inspect(cmd.value_of("queue").unwrap()), + ("create", Some(cmd)) => run_create(cmd), + ("receive", Some(cmd)) => run_receive(cmd.value_of("queue").unwrap()), + ("send", Some(cmd)) => run_send( + cmd.value_of("queue").unwrap(), + cmd.value_of("message").unwrap() + ), + ("rlimit", _) => run_rlimit(), + _ => unimplemented!(), + } +} |