diff options
-rw-r--r-- | include/stomp.hrl | 3 | ||||
-rw-r--r-- | src/stomp_worker.erl | 26 |
2 files changed, 27 insertions, 2 deletions
diff --git a/include/stomp.hrl b/include/stomp.hrl index 8ac197f78e01..30c933b56302 100644 --- a/include/stomp.hrl +++ b/include/stomp.hrl @@ -17,3 +17,6 @@ -record(stomp_msg, { headers :: #{ binary() => binary() }, body :: binary() }). -type stomp_msg() :: #stomp_msg{}. + +%% STOMP frame components +-type headers() :: #{binary() => binary()}. diff --git a/src/stomp_worker.erl b/src/stomp_worker.erl index bbdda73eac5e..b4604981836d 100644 --- a/src/stomp_worker.erl +++ b/src/stomp_worker.erl @@ -69,11 +69,13 @@ handle_call({subscribe, Dest, Ack}, From, State) -> handle_call(_Req, _From, State) -> {reply, ignored, State}. -%% Unused gen_server callbacks - +handle_info({tcp, Conn, Frame}, State#state{connection = Conn}) -> + handle_frame(Frame, State); handle_info(_Msg, State) -> {noreply, State}. +%% Unused gen_server callbacks + handle_cast(_Msg, State) -> {noreply, State}. @@ -106,6 +108,26 @@ subscribe(Socket, Id, Queue, Ack) -> {ok, SubscribeFrame} = subscribe_frame(Id, Queue, Ack), gen_tcp:send(Socket, SubscribeFrame). +%%% Parsing STOMP frames + +handle_frame(<<"MESSAGE", "\n", Frame/binary>>, + #state{subscribers = Subscribers, + subscriptions = Subscriptions}) -> + + {noreply, State}; +handle_frame(Frame, State) -> + io:format("Received unknown frame ~p", [Frame]), + {noreply, State}. + + +%% Parse out headers into a map +-spec parse_headers(binary()) -> headers(). +parse_headers(HeadersBin) -> + Headers = binary:split(HeadersBin, <<"\n">>, [global]), + ToPairs = fun(H, M) -> [K,V | []] = binary:split(H, <<":">>), + maps:put(K, V, M) + end, + {ok, lists:mapfoldl(ToPairs, #{}, Headers)}. %%% Making STOMP protocol frames |