Initial commit
[bloat] / mastodon / streaming.go
1 package mastodon
2
3 import (
4         "bufio"
5         "context"
6         "encoding/json"
7         "io"
8         "net/http"
9         "net/url"
10         "path"
11         "strings"
12 )
13
14 // UpdateEvent is struct for passing status event to app.
15 type UpdateEvent struct {
16         Status *Status `json:"status"`
17 }
18
19 func (e *UpdateEvent) event() {}
20
21 // NotificationEvent is struct for passing notification event to app.
22 type NotificationEvent struct {
23         Notification *Notification `json:"notification"`
24 }
25
26 func (e *NotificationEvent) event() {}
27
28 // DeleteEvent is struct for passing deletion event to app.
29 type DeleteEvent struct{ ID string }
30
31 func (e *DeleteEvent) event() {}
32
33 // ErrorEvent is struct for passing errors to app.
34 type ErrorEvent struct{ err error }
35
36 func (e *ErrorEvent) event()        {}
37 func (e *ErrorEvent) Error() string { return e.err.Error() }
38
39 // Event is interface passing events to app.
40 type Event interface {
41         event()
42 }
43
44 func handleReader(q chan Event, r io.Reader) error {
45         var name string
46         s := bufio.NewScanner(r)
47         for s.Scan() {
48                 line := s.Text()
49                 token := strings.SplitN(line, ":", 2)
50                 if len(token) != 2 {
51                         continue
52                 }
53                 switch strings.TrimSpace(token[0]) {
54                 case "event":
55                         name = strings.TrimSpace(token[1])
56                 case "data":
57                         var err error
58                         switch name {
59                         case "update":
60                                 var status Status
61                                 err = json.Unmarshal([]byte(token[1]), &status)
62                                 if err == nil {
63                                         q <- &UpdateEvent{&status}
64                                 }
65                         case "notification":
66                                 var notification Notification
67                                 err = json.Unmarshal([]byte(token[1]), &notification)
68                                 if err == nil {
69                                         q <- &NotificationEvent{&notification}
70                                 }
71                         case "delete":
72                                 q <- &DeleteEvent{ID: string(strings.TrimSpace(token[1]))}
73                         }
74                         if err != nil {
75                                 q <- &ErrorEvent{err}
76                         }
77                 }
78         }
79         return s.Err()
80 }
81
82 func (c *Client) streaming(ctx context.Context, p string, params url.Values) (chan Event, error) {
83         u, err := url.Parse(c.config.Server)
84         if err != nil {
85                 return nil, err
86         }
87         u.Path = path.Join(u.Path, "/api/v1/streaming", p)
88         u.RawQuery = params.Encode()
89
90         req, err := http.NewRequest(http.MethodGet, u.String(), nil)
91         if err != nil {
92                 return nil, err
93         }
94         req = req.WithContext(ctx)
95         req.Header.Set("Authorization", "Bearer "+c.config.AccessToken)
96
97         q := make(chan Event)
98         go func() {
99                 defer close(q)
100                 for {
101                         select {
102                         case <-ctx.Done():
103                                 return
104                         default:
105                         }
106
107                         c.doStreaming(req, q)
108                 }
109         }()
110         return q, nil
111 }
112
113 func (c *Client) doStreaming(req *http.Request, q chan Event) {
114         resp, err := c.Do(req)
115         if err != nil {
116                 q <- &ErrorEvent{err}
117                 return
118         }
119         defer resp.Body.Close()
120
121         if resp.StatusCode != http.StatusOK {
122                 q <- &ErrorEvent{parseAPIError("bad request", resp)}
123                 return
124         }
125
126         err = handleReader(q, resp.Body)
127         if err != nil {
128                 q <- &ErrorEvent{err}
129         }
130 }
131
132 // StreamingUser return channel to read events on home.
133 func (c *Client) StreamingUser(ctx context.Context) (chan Event, error) {
134         return c.streaming(ctx, "user", nil)
135 }
136
137 // StreamingPublic return channel to read events on public.
138 func (c *Client) StreamingPublic(ctx context.Context, isLocal bool) (chan Event, error) {
139         p := "public"
140         if isLocal {
141                 p = path.Join(p, "local")
142         }
143
144         return c.streaming(ctx, p, nil)
145 }
146
147 // StreamingHashtag return channel to read events on tagged timeline.
148 func (c *Client) StreamingHashtag(ctx context.Context, tag string, isLocal bool) (chan Event, error) {
149         params := url.Values{}
150         params.Set("tag", tag)
151
152         p := "hashtag"
153         if isLocal {
154                 p = path.Join(p, "local")
155         }
156
157         return c.streaming(ctx, p, params)
158 }
159
160 // StreamingList return channel to read events on a list.
161 func (c *Client) StreamingList(ctx context.Context, id string) (chan Event, error) {
162         params := url.Values{}
163         params.Set("list", string(id))
164
165         return c.streaming(ctx, "list", params)
166 }