streaming.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. package anaconda
  2. import (
  3. "bufio"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "net/url"
  9. "github.com/dustin/go-jsonpointer"
  10. )
  11. const (
  12. BaseUrlUserStream = "https://userstream.twitter.com/1.1"
  13. BaseUrlSiteStream = "https://sitestream.twitter.com/1.1"
  14. BaseUrlStream = "https://stream.twitter.com/1.1"
  15. )
  16. // messages
  17. type StatusDeletionNotice struct {
  18. Id int64 `json:"id"`
  19. IdStr string `json:"id_str"`
  20. UserId int64 `json:"user_id"`
  21. UserIdStr string `json:"user_id_str"`
  22. }
  23. type statusDeletionNotice struct {
  24. Delete *struct {
  25. Status *StatusDeletionNotice `json:"status"`
  26. } `json:"delete"`
  27. }
  28. type DirectMessageDeletionNotice struct {
  29. Id int64 `json:"id"`
  30. IdStr string `json:"id_str"`
  31. UserId int64 `json:"user_id"`
  32. UserIdStr string `json:"user_id_str"`
  33. }
  34. type directMessageDeletionNotice struct {
  35. Delete *struct {
  36. DirectMessage *DirectMessageDeletionNotice `json:"direct_message"`
  37. } `json:"delete"`
  38. }
  39. type LocationDeletionNotice struct {
  40. UserId int64 `json:"user_id"`
  41. UserIdStr string `json:"user_id_str"`
  42. UpToStatusId int64 `json:"up_to_status_id"`
  43. UpToStatusIdStr string `json:"up_to_status_id_str"`
  44. }
  45. type locationDeletionNotice struct {
  46. ScrubGeo *LocationDeletionNotice `json:"scrub_geo"`
  47. }
  48. type LimitNotice struct {
  49. Track int64 `json:"track"`
  50. }
  51. type limitNotice struct {
  52. Limit *LimitNotice `json:"limit"`
  53. }
  54. type StatusWithheldNotice struct {
  55. Id int64 `json:"id"`
  56. UserId int64 `json:"user_id"`
  57. WithheldInCountries []string `json:"withheld_in_countries"`
  58. }
  59. type statusWithheldNotice struct {
  60. StatusWithheld *StatusWithheldNotice `json:"status_withheld"`
  61. }
  62. type UserWithheldNotice struct {
  63. Id int64 `json:"id"`
  64. WithheldInCountries []string `json:"withheld_in_countries"`
  65. }
  66. type userWithheldNotice struct {
  67. UserWithheld *UserWithheldNotice `json:"user_withheld"`
  68. }
  69. type DisconnectMessage struct {
  70. Code int64 `json:"code"`
  71. StreamName string `json:"stream_name"`
  72. Reason string `json:"reason"`
  73. }
  74. type disconnectMessage struct {
  75. Disconnect *DisconnectMessage `json:"disconnect"`
  76. }
  77. type StallWarning struct {
  78. Code string `json:"code"`
  79. Message string `json:"message"`
  80. PercentFull int64 `json:"percent_full"`
  81. }
  82. type stallWarning struct {
  83. Warning *StallWarning `json:"warning"`
  84. }
  85. type FriendsList []int64
  86. type friendsList struct {
  87. Friends *FriendsList `json:"friends"`
  88. }
  89. type streamDirectMessage struct {
  90. DirectMessage *DirectMessage `json:"direct_message"`
  91. }
  92. type Event struct {
  93. Target *User `json:"target"`
  94. Source *User `json:"source"`
  95. Event string `json:"event"`
  96. CreatedAt string `json:"created_at"`
  97. }
  98. type EventList struct {
  99. Event
  100. TargetObject *List `json:"target_object"`
  101. }
  102. type EventTweet struct {
  103. Event
  104. TargetObject *Tweet `json:"target_object"`
  105. }
  106. type EventFollow struct {
  107. Event
  108. }
  109. type TooManyFollow struct {
  110. Warning *struct {
  111. Code string `json:"code"`
  112. Message string `json:"message"`
  113. UserId int64 `json:"user_id"`
  114. } `json:"warning"`
  115. }
// TODO: Site Streams messages are not decoded yet (they could not be tested).
  117. // Stream allows you to stream using one of the
  118. // PublicStream* or UserStream api methods
  119. //
  120. // A go loop is started an gives you an stream that sends interface{}
  121. // objects through it's chan C
  122. // Objects which you can cast into a tweet like this :
  123. // t, ok := o.(twitter.Tweet) // try casting into a tweet
  124. // if !ok {
  125. // log.Debug("Recieved non tweet message")
  126. // }
  127. //
  128. // If we can't stream the chan will be closed.
  129. // Otherwise the loop will connect and send streams in the chan.
  130. // It will also try to reconnect itself after an exponential backoff time
  131. // if the connection is lost
  132. // If twitter response is one of 420, 429 or 503 (meaning "wait a sec")
  133. // the loop retries to open the socket with a simple autogrowing backoff.
  134. //
  135. // When finished streaming call stream.Stop() to initiate termination process.
  136. //
  137. type Stream struct {
  138. api TwitterApi
  139. C chan interface{}
  140. run bool
  141. }
  142. func (s *Stream) listen(response http.Response) {
  143. if response.Body != nil {
  144. defer response.Body.Close()
  145. }
  146. s.api.Log.Notice("Listening to twitter socket")
  147. defer s.api.Log.Notice("twitter socket closed, leaving loop")
  148. scanner := bufio.NewScanner(response.Body)
  149. for scanner.Scan() && s.run {
  150. j := scanner.Bytes()
  151. if len(j) == 0 {
  152. s.api.Log.Debug("Empty bytes... Moving along")
  153. } else {
  154. s.C <- jsonToKnownType(j)
  155. }
  156. }
  157. }
  158. func jsonToKnownType(j []byte) interface{} {
  159. // TODO: DRY
  160. if o := new(Tweet); jsonAsStruct(j, "/source", &o) {
  161. return *o
  162. } else if o := new(statusDeletionNotice); jsonAsStruct(j, "/delete/status", &o) {
  163. return *o.Delete.Status
  164. } else if o := new(directMessageDeletionNotice); jsonAsStruct(j, "/delete/direct_message", &o) {
  165. return *o.Delete.DirectMessage
  166. } else if o := new(locationDeletionNotice); jsonAsStruct(j, "/scrub_geo", &o) {
  167. return *o.ScrubGeo
  168. } else if o := new(limitNotice); jsonAsStruct(j, "/limit", &o) {
  169. return *o.Limit
  170. } else if o := new(statusWithheldNotice); jsonAsStruct(j, "/status_withheld", &o) {
  171. return *o.StatusWithheld
  172. } else if o := new(userWithheldNotice); jsonAsStruct(j, "/user_withheld", &o) {
  173. return *o.UserWithheld
  174. } else if o := new(disconnectMessage); jsonAsStruct(j, "/disconnect", &o) {
  175. return *o.Disconnect
  176. } else if o := new(stallWarning); jsonAsStruct(j, "/warning", &o) {
  177. return *o.Warning
  178. } else if o := new(friendsList); jsonAsStruct(j, "/friends", &o) {
  179. return *o.Friends
  180. } else if o := new(streamDirectMessage); jsonAsStruct(j, "/direct_message", &o) {
  181. return *o.DirectMessage
  182. } else if o := new(EventTweet); jsonAsStruct(j, "/target_object/source", &o) {
  183. return *o
  184. } else if o := new(EventList); jsonAsStruct(j, "/target_object/slug", &o) {
  185. return *o
  186. } else if o := new(Event); jsonAsStruct(j, "/target_object", &o) {
  187. return *o
  188. } else if o := new(EventFollow); jsonAsStruct(j, "/event", &o) {
  189. return *o
  190. } else {
  191. return nil
  192. }
  193. }
  194. func (s *Stream) requestStream(urlStr string, v url.Values, method int) (resp *http.Response, err error) {
  195. switch method {
  196. case _GET:
  197. return s.api.oauthClient.Get(s.api.HttpClient, s.api.Credentials, urlStr, v)
  198. case _POST:
  199. return s.api.oauthClient.Post(s.api.HttpClient, s.api.Credentials, urlStr, v)
  200. default:
  201. }
  202. return nil, fmt.Errorf("HTTP method not yet supported")
  203. }
  204. func (s *Stream) loop(urlStr string, v url.Values, method int) {
  205. defer s.api.Log.Debug("Leaving request stream loop")
  206. defer close(s.C)
  207. rlb := NewHTTP420ErrBackoff()
  208. for s.run {
  209. resp, err := s.requestStream(urlStr, v, method)
  210. if err != nil {
  211. if err == io.EOF {
  212. // Sometimes twitter closes the stream
  213. // right away with EOF as of a rate limit
  214. resp.StatusCode = 420
  215. } else {
  216. s.api.Log.Criticalf("Cannot request stream : %s", err)
  217. return
  218. }
  219. }
  220. s.api.Log.Debugf("Response status=%s code=%d", resp.Status, resp.StatusCode)
  221. switch resp.StatusCode {
  222. case 200, 304:
  223. s.listen(*resp)
  224. rlb.Reset()
  225. case 420, 429, 503:
  226. s.api.Log.Noticef("Twitter streaming: backing off as got : %+s", resp.Status)
  227. rlb.BackOff()
  228. case 400, 401, 403, 404, 406, 410, 422, 500, 502, 504:
  229. s.api.Log.Criticalf("Twitter streaming: leaving after an irremediable error: %+s", resp.Status)
  230. return
  231. default:
  232. s.api.Log.Notice("Received unknown status: %+s", resp.StatusCode)
  233. }
  234. }
  235. }
  236. func (s *Stream) Stop() {
  237. s.run = false
  238. }
  239. func (s *Stream) start(urlStr string, v url.Values, method int) {
  240. s.run = true
  241. go s.loop(urlStr, v, method)
  242. }
  243. func (a TwitterApi) newStream(urlStr string, v url.Values, method int) *Stream {
  244. stream := Stream{
  245. api: a,
  246. C: make(chan interface{}),
  247. }
  248. stream.start(urlStr, v, method)
  249. return &stream
  250. }
  251. func (a TwitterApi) UserStream(v url.Values) (stream *Stream) {
  252. return a.newStream(BaseUrlUserStream+"/user.json", v, _GET)
  253. }
  254. func (a TwitterApi) PublicStreamSample(v url.Values) (stream *Stream) {
  255. return a.newStream(BaseUrlStream+"/statuses/sample.json", v, _GET)
  256. }
  257. // XXX: To use this API authority is requied. but I dont have this. I cant test.
  258. func (a TwitterApi) PublicStreamFirehose(v url.Values) (stream *Stream) {
  259. return a.newStream(BaseUrlStream+"/statuses/firehose.json", v, _GET)
  260. }
  261. // XXX: PublicStream(Track|Follow|Locations) func is needed?
  262. func (a TwitterApi) PublicStreamFilter(v url.Values) (stream *Stream) {
  263. return a.newStream(BaseUrlStream+"/statuses/filter.json", v, _POST)
  264. }
  265. // XXX: To use this API authority is requied. but I dont have this. I cant test.
  266. func (a TwitterApi) SiteStream(v url.Values) (stream *Stream) {
  267. return a.newStream(BaseUrlSiteStream+"/site.json", v, _GET)
  268. }
  269. func jsonAsStruct(j []byte, path string, obj interface{}) (res bool) {
  270. if v, _ := jsonpointer.Find(j, path); v == nil {
  271. return false
  272. }
  273. err := json.Unmarshal(j, obj)
  274. return err == nil
  275. }