about summary refs log tree commit diff
path: root/net/stomp_erl
diff options
context:
space:
mode:
Diffstat (limited to 'net/stomp_erl')
-rw-r--r--net/stomp_erl/.gitignore10
-rw-r--r--net/stomp_erl/Makefile8
-rw-r--r--net/stomp_erl/README.md78
-rw-r--r--net/stomp_erl/include/stomp.hrl22
-rw-r--r--net/stomp_erl/src/stomp.app.src7
-rw-r--r--net/stomp_erl/src/stomp_app.erl11
-rw-r--r--net/stomp_erl/src/stomp_sup.erl22
-rw-r--r--net/stomp_erl/src/stomp_worker.erl193
8 files changed, 351 insertions, 0 deletions
diff --git a/net/stomp_erl/.gitignore b/net/stomp_erl/.gitignore
new file mode 100644
index 000000000000..8e46d5a07f8f
--- /dev/null
+++ b/net/stomp_erl/.gitignore
@@ -0,0 +1,10 @@
+.eunit
+deps
+*.o
+*.beam
+*.plt
+erl_crash.dump
+ebin
+rel/example_project
+.concrete/DEV_MODE
+.rebar
diff --git a/net/stomp_erl/Makefile b/net/stomp_erl/Makefile
new file mode 100644
index 000000000000..b3bc54673d0b
--- /dev/null
+++ b/net/stomp_erl/Makefile
@@ -0,0 +1,8 @@
+PROJECT = stomp
+PROJECT_DESCRIPTION = STOMP client for Erlang
+PROJECT_VERSION = 0.1.0
+
+# Whitespace to be used when creating files from templates.
+SP = 4
+
+include erlang.mk
diff --git a/net/stomp_erl/README.md b/net/stomp_erl/README.md
new file mode 100644
index 000000000000..21c95a11a2e9
--- /dev/null
+++ b/net/stomp_erl/README.md
@@ -0,0 +1,78 @@
+STOMP on Erlang
+===============
+
+`stomp.erl` is a simple Erlang client for the [STOMP protocol][] in version 1.2.
+
+Currently only subscribing to queues is supported.
+
+It provides an application called `stomp` which takes configuration of the form:
+
+```erlang
+[{stomp, #{host     => "stomp-server.somedomain.sexy", % required
+           port     => 61613,                          % optional
+           login    => <<"someuser">>,                 % optional
+           passcode => <<"hunter2>>,                   % optional
+ }}].
+```
+
+## Types
+
+The following types are used in `stomp.erl`, you can include them from
+`stomp.hrl`:
+
+```erlang
+%% Client ack modes, refer to the STOMP protocol documentation
+-type ack_mode() :: client | client_individual | auto.
+
+%% Subscriptions are enumerated from 0
+-type sub_id() :: integer().
+
+%% Message IDs (for acknowledgements) are simple strings. They are
+%% extracted from the 'ack' field of the header in client or client-individual
+%% mode, and from the 'message-id' field in auto mode.
+-type message_id() :: binary().
+
+%% A STOMP message as received from a queue subscription
+-record(stomp_msg, { headers :: #{ binary() => binary() },
+                     body    :: binary() }.
+-type stomp_msg() :: #stomp_msg{}.
+```
+
+Once the application starts it will register a process under the name
+`stomp_worker` and expose the following API:
+
+## Subscribing to a queue
+
+```erlang
+%% Subscribe to a destination, receive the subscription ID
+-spec subscribe(binary(),   % Destination (e.g. <<"/queue/lizards">>)
+                ack_mode(), % Client-acknowledgement mode
+                -> {ok, sub_id()}.
+```
+
+This synchronous call subscribes to a message queue. The `stomp_worker` will
+link itself to the caller and forward received messages as
+`{msg, sub_id(), stomp_msg()}`.
+
+Depending on the acknowledgement mode specified on connecting, the subscriber
+may have to acknowledge receival of messages.
+
+## Acknowledging messages
+
+```erlang
+%% Acknowledge a message ID.
+%% This is not required in auto mode. In client mode it will acknowledge the
+%% received messages up to the ID specified. In client-individual mode every
+%% single message has to be acknowledged.
+-spec ack(sub_id(), message_id()) -> ok.
+
+%% Explicitly "unacknowledge" a message
+-spec nack(sub_id(), message_id()) -> ok.
+```
+
+Both of these calls are asynchronous and will return immediately. Note that in
+the case of the `stomp_worker` crashing before a message acknowledgement is
+handled, the message *may* be delivered again. Your consumer needs to be able to
+handle this.
+
+[STOMP protocol]: https://stomp.github.io/stomp-specification-1.2.html
diff --git a/net/stomp_erl/include/stomp.hrl b/net/stomp_erl/include/stomp.hrl
new file mode 100644
index 000000000000..30c933b56302
--- /dev/null
+++ b/net/stomp_erl/include/stomp.hrl
@@ -0,0 +1,22 @@
+%% Client ack modes, refer to the STOMP protocol documentation
+-type ack_mode() :: client | client_individual | auto.
+
+%% Subscriptions are enumerated from 0
+-type sub_id() :: integer().
+
+%% Message IDs (for acknowledgements) are simple strings. They are
+%% extracted from the 'ack' field of the header in client or client-individual
+%% mode, and from the 'message-id' field in auto mode.
+-type message_id() :: binary().
+
+%% A destination can be a queue, or something else.
+%% Example: <<"/queue/lizards">>
+-type destination() :: binary().
+
+%% A STOMP message as received from a queue subscription
+-record(stomp_msg, { headers :: #{ binary() => binary() },
+                     body    :: binary() }).
+-type stomp_msg() :: #stomp_msg{}.
+
+%% STOMP frame components
+-type headers() :: #{binary() => binary()}.
diff --git a/net/stomp_erl/src/stomp.app.src b/net/stomp_erl/src/stomp.app.src
new file mode 100644
index 000000000000..baf0e271d1f0
--- /dev/null
+++ b/net/stomp_erl/src/stomp.app.src
@@ -0,0 +1,7 @@
+{application, stomp, [{description, "STOMP client for Erlang"},
+                      {vsn, "0.1.0"},
+                      {modules, [stomp_app, stomp_sup, stomp_worker]},
+                      {registered, [stomp_worker]},
+                      {env, []},
+                      {applications, [kernel, stdlib]},
+                      {mod, {stomp_app, []}}]}.
diff --git a/net/stomp_erl/src/stomp_app.erl b/net/stomp_erl/src/stomp_app.erl
new file mode 100644
index 000000000000..2ba3e69f992a
--- /dev/null
+++ b/net/stomp_erl/src/stomp_app.erl
@@ -0,0 +1,11 @@
+-module(stomp_app).
+-behaviour(application).
+
+-export([start/2]).
+-export([stop/1]).
+
+start(_Type, _Args) ->
+    stomp_sup:start_link().
+
+stop(_State) ->
+    ok.
diff --git a/net/stomp_erl/src/stomp_sup.erl b/net/stomp_erl/src/stomp_sup.erl
new file mode 100644
index 000000000000..3a298bc9bf8b
--- /dev/null
+++ b/net/stomp_erl/src/stomp_sup.erl
@@ -0,0 +1,22 @@
+-module(stomp_sup).
+-behaviour(supervisor).
+
+-export([start_link/0]).
+-export([init/1]).
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->
+    Procs = [stomp_spec()],
+    {ok, {{one_for_one, 1, 5}, Procs}}.
+
+%% Private
+
+stomp_spec() ->
+    #{id       => stomp_proc,
+      start    => {stomp_worker, start_link, []},
+      restart  => permanent,
+      shutdown => 5000,
+      type     => worker,
+      module   => [stomp_worker]}.
diff --git a/net/stomp_erl/src/stomp_worker.erl b/net/stomp_erl/src/stomp_worker.erl
new file mode 100644
index 000000000000..80981d37ab52
--- /dev/null
+++ b/net/stomp_erl/src/stomp_worker.erl
@@ -0,0 +1,193 @@
+-module(stomp_worker).
+-behaviour(gen_server).
+
+%% API.
+-export([start_link/0]).
+
+%% gen_server.
+-export([init/1]).
+-export([handle_call/3]).
+-export([handle_cast/2]).
+-export([handle_info/2]).
+-export([terminate/2]).
+-export([code_change/3]).
+
+%% Testing
+-compile(export_all).
+
+-include("stomp.hrl").
+
+%% State of a stomp_worker
+-record(state, {connection    :: port(),
+                next_sub      :: sub_id(),
+                subscriptions :: #{ destination() => sub_id() },
+                subscribers   :: #{ destination() => pid() }
+               }).
+-type state() :: #state{}.
+
+%% API implementation
+
+-spec start_link() -> {ok, pid()}.
+start_link() ->
+    {ok, Pid} = gen_server:start_link(?MODULE, [], []),
+    register(?MODULE, Pid),
+    {ok, Pid}.
+
+%% gen_server implementation
+
+-spec init(any()) -> {ok, state()}.
+init(_Args) ->
+    %% Fetch configuration from app config
+    {ok, Host} = application:get_env(stomp, host),
+    Port = application:get_env(stomp, port, 61613),
+    Login = application:get_env(stomp, login),
+    Pass = application:get_env(stomp, passcode),
+
+    %% Catch exit signals from linked processes (subscribers dying)
+    process_flag(trap_exit, true),
+
+    %% Establish connection
+    {ok, Conn} = connect(Host, Port, Login, Pass),
+
+    {ok, #state{connection = Conn,
+                next_sub   = 0,
+                subscriptions = #{},
+                subscribers = #{}}}.
+
+%% Handle subscription calls
+handle_call({subscribe, Dest, Ack}, From, State) ->
+    %% Subscribe to new destination
+    SubId = State#state.next_sub,
+    ok = subscribe(State#state.connection, SubId, Dest, Ack),
+
+    %% Add subscription and subscriber to state
+    Subscriptions = maps:put(SubId, Dest, State#state.subscriptions),
+    Subscribers = maps:put(SubId, From, State#state.subscribers),
+    NextSub = SubId + 1,
+    NewState = State#state{subscriptions = Subscriptions,
+                           subscribers = Subscribers,
+                           next_sub = NextSub },
+
+    {reply, {ok, SubId}, NewState};
+handle_call(_Req, _From, State) ->
+    {reply, ignored, State}.
+
+handle_info({tcp, Conn, Frame}, State) when Conn =:= State#state.connection ->
+    handle_frame(Frame, State);
+handle_info(_Msg, State) ->
+    {noreply, State}.
+
+%% Unused gen_server callbacks
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%% Private functions
+
+-spec connect(list(), integer(), any(), any()) -> {ok, port()}.
+connect(Host, Port, Login, Pass) ->
+    %% STOMP CONNECT frame
+    Connect = connect_frame(Host, Login, Pass),
+
+    %% TODO: Configurable buffer size
+    %% Frames larger than the user-level buffer will be truncated, so it should
+    %% never be smaller than the largest expected messages.
+    {ok, Socket} = gen_tcp:connect(Host, Port, [binary,
+                                                {packet, line},
+                                                {line_delimiter, $\0},
+                                                {buffer, 262144}]),
+
+    ok = gen_tcp:send(Socket, Connect),
+    {ok, Socket}.
+
+-spec subscribe(port(), sub_id(), destination(), ack_mode()) -> ok.
+subscribe(Socket, Id, Queue, Ack) ->
+    {ok, SubscribeFrame} = subscribe_frame(Id, Queue, Ack),
+    gen_tcp:send(Socket, SubscribeFrame).
+
+%%% Parsing STOMP frames
+
+handle_frame(<<"MESSAGE", "\n", _Frame/binary>>, State) ->
+    {noreply, State};
+handle_frame(Frame, State) ->
+    io:format("Received unknown frame ~p", [Frame]),
+    {noreply, State}.
+
+
+%% Parse out headers into a map
+-spec parse_headers(binary()) -> headers().
+parse_headers(HeadersBin) ->
+    Headers = binary:split(HeadersBin, <<"\n">>, [global]),
+    ToPairs = fun(H, M) -> [K,V | []] = binary:split(H, <<":">>),
+                           maps:put(K, V, M)
+              end,
+    {ok, lists:mapfoldl(ToPairs, #{}, Headers)}.
+
+%%% Making STOMP protocol frames
+
+%% Format a header
+-spec format_header({binary(), binary()}) -> binary().
+format_header({Key, Val}) ->
+    <<Key/binary, ":", Val/binary, "\n">>.
+
+%% Build a single STOMP frame
+-spec make_frame(binary(),
+                 headers(),
+                 binary())
+                -> {ok, iolist()}.
+make_frame(Command, HeaderMap, Body) ->
+    Headers = lists:map(fun format_header/1, maps:to_list(HeaderMap)),
+    Frame = [Command, <<"\n">>, Headers, <<"\n">>, Body, <<0>>],
+    {ok, Frame}.
+
+%%% Default frames
+
+-spec connect_frame(list(), any(), any()) -> iolist().
+connect_frame(Host, {ok, Login}, {ok, Pass}) ->
+    make_frame(<<"CONNECT">>,
+               #{<<"accept-version">> => <<"1.2">>,
+                 <<"host">>           => Host,
+                 <<"login">>          => Login,
+                 <<"passcode">>       => Pass,
+                 <<"heart-beat">>     => <<"0,5000">>},
+               []);
+connect_frame(Host, _Login, _Pass) ->
+    make_frame(<<"CONNECT">>,
+               #{<<"accept-version">> => <<"1.2">>,
+                 <<"host">>           => Host,
+                 %% Expect a server heartbeat every 5 seconds, let the server
+                 %% expect one every 10. We don't actually check this and just
+                 %% echo server heartbeats.
+                 %% TODO: For now the server is told not to expect replies due to
+                 %% a weird behaviour.
+                 <<"heart-beat">>     => <<"0,5000">>},
+               []).
+
+
+-spec subscribe_frame(sub_id(), destination(), ack_mode()) -> iolist().
+subscribe_frame(Id, Queue, Ack) ->
+    make_frame(<<"SUBSCRIBE">>,
+               #{<<"id">>          => integer_to_binary(Id),
+                 <<"destination">> => Queue,
+                 <<"ack">>         => ack_mode_to_binary(Ack)},
+               []).
+
+-spec ack_mode_to_binary(ack_mode()) -> binary().
+ack_mode_to_binary(AckMode) ->
+    case AckMode of
+        auto              -> <<"auto">>;
+        client            -> <<"client">>;
+        client_individual -> <<"client-individual">>
+    end.
+
+%% -spec ack_frame(binary()) -> iolist().
+%% ack_frame(MessageID) ->
+%%     make_frame(<<"ACK">>,
+%%                [{"id", MessageID}],
+%%                []).