Posted by ehedenst on 08/17/10


Tagged: twitter, erlang


Versions (?)

Erlang Twitter Streams


Published in: Other
 

URL: http://blog.jebu.net/2009/09/erlang-tap-to-the-twitter-stream/

  1. -module(twitter_stream).
  2. -author('[email protected]').
  3. %%
  4. %% Copyright (c) 2009, Jebu Ittiachen
  5. %% All rights reserved.
  6. %%
  7. %% Redistribution and use in source and binary forms, with or without modification, are
  8. %% permitted provided that the following conditions are met:
  9. %%
  10. %% 1. Redistributions of source code must retain the above copyright notice, this list of
  11. %% conditions and the following disclaimer.
  12. %%
  13. %% 2. Redistributions in binary form must reproduce the above copyright notice, this list
  14. %% of conditions and the following disclaimer in the documentation and/or other materials
  15. %% provided with the distribution.
  16. %%
  17. %% THIS SOFTWARE IS PROVIDED BY JEBU ITTIACHEN ``AS IS'' AND ANY EXPRESS OR IMPLIED
  18. %% WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
  19. %% FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL JEBU ITTIACHEN OR
  20. %% CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  21. %% CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
  22. %% SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
  23. %% ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
  24. %% NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
  25. %% ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  26. %%
  27. %% The views and conclusions contained in the software and documentation are those of the
  28. %% authors and should not be interpreted as representing official policies, either expressed
  29. %% or implied, of Jebu Ittiachen.
  30. %%
  31. %% API
  32. -export([fetch/1, fetch/3, process_data/1]).
  33.  
  %% Convenience entry point: run the 3-arg fetch in a freshly spawned
  %% process so the calling shell is left free.
  %% URL is expected to look like
  %%   http://user:password@stream.twitter.com/1/statuses/sample.json
  %% Defaults passed on: 5 connection attempts, 30 seconds between attempts.
  %% Returns the pid of the spawned process.
  fetch(URL) ->
      Args = [URL, 5, 30],
      erlang:spawn(twitter_stream, fetch, Args).
  38.  
  %% Connect to the stream at URL and keep consuming it, reconnecting when
  %% the stream ends, times out or fails.
  %% URL is expected to look like
  %%   http://user:password@stream.twitter.com/1/statuses/sample.json
  %% Retry - total number of connection attempts; every attempt, including
  %%         one whose stream ended normally, uses one up.
  %% Sleep - seconds to sleep before each reconnection attempt.
  %% Returns {error, Result, unauthorized} as soon as the server answers 401
  %% (retrying cannot fix bad credentials), otherwise {error, no_more_retry}
  %% once all attempts have been used.
  fetch(URL, Retry, Sleep) when Retry > 0 ->
      %% Issue the request asynchronously and have the body streamed back to
      %% this process as {http, ...} messages, consumed by receive_chunk/1.
      %% NOTE(review): the inets `http` client module was renamed `httpc` and
      %% the old name is not available in current OTP releases; there this
      %% call raises undef (now logged below) - switch to httpc on such a
      %% runtime.
      try http:request(get, {URL, []}, [], [{sync, false}, {stream, self}]) of
          {ok, RequestId} ->
              consume_stream(URL, Retry, Sleep, RequestId);
          Other ->
              error_logger:info_msg("Stream request failed ~p ~n", [Other]),
              retry_fetch(URL, Retry, Sleep)
      catch
          %% Log instead of silently swallowing, so a persistent failure
          %% (e.g. a missing client module) is visible while retrying.
          Class:Reason ->
              error_logger:info_msg("Stream request raised ~p:~p ~n",
                                    [Class, Reason]),
              retry_fetch(URL, Retry, Sleep)
      end;
  fetch(_, Retry, _) when Retry =< 0 ->
      error_logger:info_msg("No more retries done with processing fetch thread~n"),
      {error, no_more_retry}.

  %% Drain the stream of RequestId, then decide whether to reconnect.
  consume_stream(URL, Retry, Sleep, RequestId) ->
      case receive_chunk(RequestId) of
          {ok, _} ->
              %% stream broke normally; reconnect
              retry_fetch(URL, Retry, Sleep);
          {error, unauthorized, Result} ->
              %% NOTE(review): element order differs from receive_chunk/1's
              %% {error, unauthorized, _}; kept as-is for existing callers.
              {error, Result, unauthorized};
          {error, timeout} ->
              %% receive_chunk/1 may have given up after a quiet period while
              %% the request is still alive: abort it so its connection and
              %% late messages do not leak into the next attempt.
              abandon_request(RequestId),
              retry_fetch(URL, Retry, Sleep);
          {_, Reason} ->
              error_logger:info_msg("Got some Reason ~p ~n", [Reason]),
              retry_fetch(URL, Retry, Sleep)
      end.

  %% Sleep, then make the next connection attempt with one fewer retry left.
  retry_fetch(URL, Retry, Sleep) ->
      timer:sleep(Sleep * 1000),
      fetch(URL, Retry - 1, Sleep).

  %% Cancel RequestId and discard its messages already in the mailbox.
  %% receive_chunk/1 only matches the current request id and retries run in
  %% this same process, so leftovers would otherwise accumulate here.
  %% NOTE(review): cancellation is asynchronous; a message arriving after
  %% the flush can still slip through.
  abandon_request(RequestId) ->
      http:cancel_request(RequestId),
      flush_request(RequestId).

  %% Remove every {http, ...} message for RequestId that is currently queued.
  flush_request(RequestId) ->
      receive
          {http, {RequestId, _}} -> flush_request(RequestId);
          {http, {RequestId, _, _}} -> flush_request(RequestId)
      after 0 ->
          ok
      end.
  %
  % Tweet handler: receive_chunk/1 runs this in its own process for every
  % piece of body data the stream delivers. It only logs the data; presumably
  % you would do something useful with it here.
  % NOTE(review): Data is a raw streamed body part and is not guaranteed to
  % line up with exactly one tweet - confirm before parsing it as one.
  %
  process_data(Data) ->
      Format = "Received tweet ~p ~n",
      error_logger:info_msg(Format, [Data]),
      ok.
  84.  
  85. %%====================================================================
  86. %% Internal functions
  87. %%====================================================================
  %% Wait for messages belonging to the asynchronous request RequestId,
  %% looping for as long as body data keeps streaming in.
  %% Returns:
  %%   {ok, RequestId}                          - the stream ended normally
  %%   {error, timeout}                         - the request reported a
  %%                                              timeout, or nothing arrived
  %%                                              for 60 seconds
  %%   {error, unauthorized, {Status, Headers}} - the server answered 401
  %%   {error, Result}                          - any other complete result
  receive_chunk(RequestId) ->
      receive
          %% Body data: hand it to a separate process straight away and go
          %% back to reading the stream.
          {http, {RequestId, stream, Chunk}} ->
              spawn(twitter_stream, process_data, [Chunk]),
              receive_chunk(RequestId);
          {http, {RequestId, stream_start, Hdrs}} ->
              error_logger:info_msg("Streaming data start ~p ~n", [Hdrs]),
              receive_chunk(RequestId);
          {http, {RequestId, stream_end, Hdrs}} ->
              error_logger:info_msg("Streaming data end ~p ~n", [Hdrs]),
              {ok, RequestId};
          {http, {RequestId, {error, Why}}} when Why =:= etimedout; Why =:= timeout ->
              {error, timeout};
          {http, {RequestId, {{_, 401, _} = Status, Hdrs, _Body}}} ->
              {error, unauthorized, {Status, Hdrs}};
          {http, {RequestId, Response}} ->
              {error, Response}
      after 60000 ->
          {error, timeout}
      end.

Report this snippet  

You need to login to post a comment.