notify.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. package rss
  2. import (
  3. "context"
  4. "net/http"
  5. "sync"
  6. "time"
  7. log "github.com/go-pkgz/lgr"
  8. "github.com/mmcdole/gofeed"
  9. "github.com/pkg/errors"
  10. )
  11. // Notify on RSS change
  12. type Notify struct {
  13. Feed string
  14. Duration time.Duration
  15. Timeout time.Duration
  16. once sync.Once
  17. ctx context.Context
  18. cancel context.CancelFunc
  19. }
  20. // Event from RSS
  21. type Event struct {
  22. ChanTitle string
  23. Title string
  24. Link string
  25. Text string
  26. GUID string
  27. Item gofeed.Item
  28. Feed gofeed.Feed
  29. }
  30. // Go starts notifier and returns events channel
  31. func (n *Notify) Go(ctx context.Context) <-chan Event {
  32. log.Printf("[INFO] start notifier for %s, every %s", n.Feed, n.Duration)
  33. n.once.Do(func() { n.ctx, n.cancel = context.WithCancel(ctx) })
  34. ch := make(chan Event)
  35. // wait for duration, can be terminated by ctx
  36. waitOrCancel := func(ctx context.Context) bool {
  37. select {
  38. case <-ctx.Done():
  39. return false
  40. case <-time.After(n.Duration):
  41. return true
  42. }
  43. }
  44. go func() {
  45. defer func() {
  46. close(ch)
  47. n.cancel()
  48. }()
  49. fp := gofeed.NewParser()
  50. fp.Client = &http.Client{Timeout: n.Timeout}
  51. log.Printf("[DEBUG] notifier uses http timeout %v", n.Timeout)
  52. lastGUID := ""
  53. for {
  54. feedData, err := fp.ParseURL(n.Feed)
  55. if err != nil {
  56. log.Printf("[WARN] failed to fetch/parse url from %s, %v", n.Feed, err)
  57. if !waitOrCancel(n.ctx) {
  58. return
  59. }
  60. continue
  61. }
  62. event, err := n.feedEvent(feedData)
  63. if lastGUID != event.GUID && err == nil {
  64. if lastGUID != "" { // don't notify on initial change
  65. log.Printf("[INFO] new event %s - %s", event.GUID, event.Title)
  66. ch <- event
  67. } else {
  68. log.Printf("[INFO] ignore first event %s - %s", event.GUID, event.Title)
  69. }
  70. lastGUID = event.GUID
  71. }
  72. if !waitOrCancel(n.ctx) {
  73. log.Print("[WARN] notifier canceled")
  74. return
  75. }
  76. }
  77. }()
  78. return ch
  79. }
  80. // Shutdown notifier
  81. func (n *Notify) Shutdown() {
  82. log.Print("[DEBUG] shutdown initiated")
  83. n.cancel()
  84. <-n.ctx.Done()
  85. }
  86. // feedEvent gets latest item from rss feed
  87. func (n *Notify) feedEvent(feed *gofeed.Feed) (e Event, err error) {
  88. if len(feed.Items) == 0 {
  89. return e, errors.New("no items in rss feed")
  90. }
  91. if feed.Items[0].GUID == "" {
  92. return e, errors.Errorf("no guid for rss entry %+v", feed.Items[0])
  93. }
  94. e.ChanTitle = feed.Title
  95. e.Item = *feed.Items[0]
  96. e.Feed = *feed
  97. e.Title = feed.Items[0].Title
  98. e.Link = feed.Items[0].Link
  99. e.Text = feed.Items[0].Description
  100. e.GUID = feed.Items[0].GUID
  101. return e, nil
  102. }