diff options
Diffstat (limited to 'net/stomp_erl')
-rw-r--r-- | net/stomp_erl/.gitignore | 10 | ||||
-rw-r--r-- | net/stomp_erl/Makefile | 8 | ||||
-rw-r--r-- | net/stomp_erl/README.md | 78 | ||||
-rw-r--r-- | net/stomp_erl/include/stomp.hrl | 22 | ||||
-rw-r--r-- | net/stomp_erl/src/stomp.app.src | 7 | ||||
-rw-r--r-- | net/stomp_erl/src/stomp_app.erl | 11 | ||||
-rw-r--r-- | net/stomp_erl/src/stomp_sup.erl | 22 | ||||
-rw-r--r-- | net/stomp_erl/src/stomp_worker.erl | 193 |
8 files changed, 351 insertions, 0 deletions
diff --git a/net/stomp_erl/.gitignore b/net/stomp_erl/.gitignore new file mode 100644 index 000000000000..8e46d5a07f8f --- /dev/null +++ b/net/stomp_erl/.gitignore @@ -0,0 +1,10 @@ +.eunit +deps +*.o +*.beam +*.plt +erl_crash.dump +ebin +rel/example_project +.concrete/DEV_MODE +.rebar diff --git a/net/stomp_erl/Makefile b/net/stomp_erl/Makefile new file mode 100644 index 000000000000..b3bc54673d0b --- /dev/null +++ b/net/stomp_erl/Makefile @@ -0,0 +1,8 @@ +PROJECT = stomp +PROJECT_DESCRIPTION = STOMP client for Erlang +PROJECT_VERSION = 0.1.0 + +# Whitespace to be used when creating files from templates. +SP = 4 + +include erlang.mk diff --git a/net/stomp_erl/README.md b/net/stomp_erl/README.md new file mode 100644 index 000000000000..21c95a11a2e9 --- /dev/null +++ b/net/stomp_erl/README.md @@ -0,0 +1,78 @@ +STOMP on Erlang +=============== + +`stomp.erl` is a simple Erlang client for the [STOMP protocol][] in version 1.2. + +Currently only subscribing to queues is supported. + +It provides an application called `stomp` which takes configuration of the form: + +```erlang +[{stomp, #{host => "stomp-server.somedomain.sexy", % required + port => 61613, % optional + login => <<"someuser">>, % optional + passcode => <<"hunter2>>, % optional + }}]. +``` + +## Types + +The following types are used in `stomp.erl`, you can include them from +`stomp.hrl`: + +```erlang +%% Client ack modes, refer to the STOMP protocol documentation +-type ack_mode() :: client | client_individual | auto. + +%% Subscriptions are enumerated from 0 +-type sub_id() :: integer(). + +%% Message IDs (for acknowledgements) are simple strings. They are +%% extracted from the 'ack' field of the header in client or client-individual +%% mode, and from the 'message-id' field in auto mode. +-type message_id() :: binary(). + +%% A STOMP message as received from a queue subscription +-record(stomp_msg, { headers :: #{ binary() => binary() }, + body :: binary() }. +-type stomp_msg() :: #stomp_msg{}. +``` + +Once the application starts it will register a process under the name +`stomp_worker` and expose the following API: + +## Subscribing to a queue + +```erlang +%% Subscribe to a destination, receive the subscription ID +-spec subscribe(binary(), % Destination (e.g. <<"/queue/lizards">>) + ack_mode(), % Client-acknowledgement mode + -> {ok, sub_id()}. +``` + +This synchronous call subscribes to a message queue. The `stomp_worker` will +link itself to the caller and forward received messages as +`{msg, sub_id(), stomp_msg()}`. + +Depending on the acknowledgement mode specified on connecting, the subscriber +may have to acknowledge receival of messages. + +## Acknowledging messages + +```erlang +%% Acknowledge a message ID. +%% This is not required in auto mode. In client mode it will acknowledge the +%% received messages up to the ID specified. In client-individual mode every +%% single message has to be acknowledged. +-spec ack(sub_id(), message_id()) -> ok. + +%% Explicitly "unacknowledge" a message +-spec nack(sub_id(), message_id()) -> ok. +``` + +Both of these calls are asynchronous and will return immediately. Note that in +the case of the `stomp_worker` crashing before a message acknowledgement is +handled, the message *may* be delivered again. Your consumer needs to be able to +handle this. + +[STOMP protocol]: https://stomp.github.io/stomp-specification-1.2.html diff --git a/net/stomp_erl/include/stomp.hrl b/net/stomp_erl/include/stomp.hrl new file mode 100644 index 000000000000..30c933b56302 --- /dev/null +++ b/net/stomp_erl/include/stomp.hrl @@ -0,0 +1,22 @@ +%% Client ack modes, refer to the STOMP protocol documentation +-type ack_mode() :: client | client_individual | auto. + +%% Subscriptions are enumerated from 0 +-type sub_id() :: integer(). + +%% Message IDs (for acknowledgements) are simple strings. They are +%% extracted from the 'ack' field of the header in client or client-individual +%% mode, and from the 'message-id' field in auto mode. +-type message_id() :: binary(). + +%% A destination can be a queue, or something else. +%% Example: <<"/queue/lizards">> +-type destination() :: binary(). + +%% A STOMP message as received from a queue subscription +-record(stomp_msg, { headers :: #{ binary() => binary() }, + body :: binary() }). +-type stomp_msg() :: #stomp_msg{}. + +%% STOMP frame components +-type headers() :: #{binary() => binary()}. diff --git a/net/stomp_erl/src/stomp.app.src b/net/stomp_erl/src/stomp.app.src new file mode 100644 index 000000000000..baf0e271d1f0 --- /dev/null +++ b/net/stomp_erl/src/stomp.app.src @@ -0,0 +1,7 @@ +{application, stomp, [{description, "STOMP client for Erlang"}, + {vsn, "0.1.0"}, + {modules, [stomp_app, stomp_sup, stomp_worker]}, + {registered, [stomp_worker]}, + {env, []}, + {applications, [kernel, stdlib]}, + {mod, {stomp_app, []}}]}. diff --git a/net/stomp_erl/src/stomp_app.erl b/net/stomp_erl/src/stomp_app.erl new file mode 100644 index 000000000000..2ba3e69f992a --- /dev/null +++ b/net/stomp_erl/src/stomp_app.erl @@ -0,0 +1,11 @@ +-module(stomp_app). +-behaviour(application). + +-export([start/2]). +-export([stop/1]). + +start(_Type, _Args) -> + stomp_sup:start_link(). + +stop(_State) -> + ok. diff --git a/net/stomp_erl/src/stomp_sup.erl b/net/stomp_erl/src/stomp_sup.erl new file mode 100644 index 000000000000..3a298bc9bf8b --- /dev/null +++ b/net/stomp_erl/src/stomp_sup.erl @@ -0,0 +1,22 @@ +-module(stomp_sup). +-behaviour(supervisor). + +-export([start_link/0]). +-export([init/1]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + Procs = [stomp_spec()], + {ok, {{one_for_one, 1, 5}, Procs}}. + +%% Private + +stomp_spec() -> + #{id => stomp_proc, + start => {stomp_worker, start_link, []}, + restart => permanent, + shutdown => 5000, + type => worker, + module => [stomp_worker]}. diff --git a/net/stomp_erl/src/stomp_worker.erl b/net/stomp_erl/src/stomp_worker.erl new file mode 100644 index 000000000000..80981d37ab52 --- /dev/null +++ b/net/stomp_erl/src/stomp_worker.erl @@ -0,0 +1,193 @@ +-module(stomp_worker). +-behaviour(gen_server). + +%% API. +-export([start_link/0]). + +%% gen_server. +-export([init/1]). +-export([handle_call/3]). +-export([handle_cast/2]). +-export([handle_info/2]). +-export([terminate/2]). +-export([code_change/3]). + +%% Testing +-compile(export_all). + +-include("stomp.hrl"). + +%% State of a stomp_worker +-record(state, {connection :: port(), + next_sub :: sub_id(), + subscriptions :: #{ destination() => sub_id() }, + subscribers :: #{ destination() => pid() } + }). +-type state() :: #state{}. + +%% API implementation + +-spec start_link() -> {ok, pid()}. +start_link() -> + {ok, Pid} = gen_server:start_link(?MODULE, [], []), + register(?MODULE, Pid), + {ok, Pid}. + +%% gen_server implementation + +-spec init(any()) -> {ok, state()}. +init(_Args) -> + %% Fetch configuration from app config + {ok, Host} = application:get_env(stomp, host), + Port = application:get_env(stomp, port, 61613), + Login = application:get_env(stomp, login), + Pass = application:get_env(stomp, passcode), + + %% Catch exit signals from linked processes (subscribers dying) + process_flag(trap_exit, true), + + %% Establish connection + {ok, Conn} = connect(Host, Port, Login, Pass), + + {ok, #state{connection = Conn, + next_sub = 0, + subscriptions = #{}, + subscribers = #{}}}. + +%% Handle subscription calls +handle_call({subscribe, Dest, Ack}, From, State) -> + %% Subscribe to new destination + SubId = State#state.next_sub, + ok = subscribe(State#state.connection, SubId, Dest, Ack), + + %% Add subscription and subscriber to state + Subscriptions = maps:put(SubId, Dest, State#state.subscriptions), + Subscribers = maps:put(SubId, From, State#state.subscribers), + NextSub = SubId + 1, + NewState = State#state{subscriptions = Subscriptions, + subscribers = Subscribers, + next_sub = NextSub }, + + {reply, {ok, SubId}, NewState}; +handle_call(_Req, _From, State) -> + {reply, ignored, State}. + +handle_info({tcp, Conn, Frame}, State) when Conn =:= State#state.connection -> + handle_frame(Frame, State); +handle_info(_Msg, State) -> + {noreply, State}. + +%% Unused gen_server callbacks + +handle_cast(_Msg, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% Private functions + +-spec connect(list(), integer(), any(), any()) -> {ok, port()}. +connect(Host, Port, Login, Pass) -> + %% STOMP CONNECT frame + Connect = connect_frame(Host, Login, Pass), + + %% TODO: Configurable buffer size + %% Frames larger than the user-level buffer will be truncated, so it should + %% never be smaller than the largest expected messages. + {ok, Socket} = gen_tcp:connect(Host, Port, [binary, + {packet, line}, + {line_delimiter, $\0}, + {buffer, 262144}]), + + ok = gen_tcp:send(Socket, Connect), + {ok, Socket}. + +-spec subscribe(port(), sub_id(), destination(), ack_mode()) -> ok. +subscribe(Socket, Id, Queue, Ack) -> + {ok, SubscribeFrame} = subscribe_frame(Id, Queue, Ack), + gen_tcp:send(Socket, SubscribeFrame). + +%%% Parsing STOMP frames + +handle_frame(<<"MESSAGE", "\n", _Frame/binary>>, State) -> + {noreply, State}; +handle_frame(Frame, State) -> + io:format("Received unknown frame ~p", [Frame]), + {noreply, State}. + + +%% Parse out headers into a map +-spec parse_headers(binary()) -> headers(). +parse_headers(HeadersBin) -> + Headers = binary:split(HeadersBin, <<"\n">>, [global]), + ToPairs = fun(H, M) -> [K,V | []] = binary:split(H, <<":">>), + maps:put(K, V, M) + end, + {ok, lists:mapfoldl(ToPairs, #{}, Headers)}. + +%%% Making STOMP protocol frames + +%% Format a header +-spec format_header({binary(), binary()}) -> binary(). +format_header({Key, Val}) -> + <<Key/binary, ":", Val/binary, "\n">>. + +%% Build a single STOMP frame +-spec make_frame(binary(), + headers(), + binary()) + -> {ok, iolist()}. +make_frame(Command, HeaderMap, Body) -> + Headers = lists:map(fun format_header/1, maps:to_list(HeaderMap)), + Frame = [Command, <<"\n">>, Headers, <<"\n">>, Body, <<0>>], + {ok, Frame}. + +%%% Default frames + +-spec connect_frame(list(), any(), any()) -> iolist(). +connect_frame(Host, {ok, Login}, {ok, Pass}) -> + make_frame(<<"CONNECT">>, + #{<<"accept-version">> => <<"1.2">>, + <<"host">> => Host, + <<"login">> => Login, + <<"passcode">> => Pass, + <<"heart-beat">> => <<"0,5000">>}, + []); +connect_frame(Host, _Login, _Pass) -> + make_frame(<<"CONNECT">>, + #{<<"accept-version">> => <<"1.2">>, + <<"host">> => Host, + %% Expect a server heartbeat every 5 seconds, let the server + %% expect one every 10. We don't actually check this and just + %% echo server heartbeats. + %% TODO: For now the server is told not to expect replies due to + %% a weird behaviour. + <<"heart-beat">> => <<"0,5000">>}, + []). + + +-spec subscribe_frame(sub_id(), destination(), ack_mode()) -> iolist(). +subscribe_frame(Id, Queue, Ack) -> + make_frame(<<"SUBSCRIBE">>, + #{<<"id">> => integer_to_binary(Id), + <<"destination">> => Queue, + <<"ack">> => ack_mode_to_binary(Ack)}, + []). + +-spec ack_mode_to_binary(ack_mode()) -> binary(). +ack_mode_to_binary(AckMode) -> + case AckMode of + auto -> <<"auto">>; + client -> <<"client">>; + client_individual -> <<"client-individual">> + end. + +%% -spec ack_frame(binary()) -> iolist(). +%% ack_frame(MessageID) -> +%% make_frame(<<"ACK">>, +%% [{"id", MessageID}], +%% []). |