forked from tinode/chat
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhdl_longpoll.go
217 lines (186 loc) · 5.8 KB
/
hdl_longpoll.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
/******************************************************************************
*
* Description :
*
* Handler of long polling clients. See also hdl_websock.go for web sockets and
* hdl_grpc.go for gRPC
*
*****************************************************************************/
package main
import (
"encoding/json"
"errors"
"io/ioutil"
"net/http"
"time"
"github.com/tinode/chat/server/logs"
)
func (sess *Session) sendMessageLp(wrt http.ResponseWriter, msg interface{}) bool {
if len(sess.send) > sendQueueLimit {
logs.Err.Println("longPoll: outbound queue limit exceeded", sess.sid)
return false
}
statsInc("OutgoingMessagesLongpollTotal", 1)
if err := lpWrite(wrt, msg); err != nil {
logs.Err.Println("longPoll: writeOnce failed", sess.sid, err)
return false
}
return true
}
func (sess *Session) writeOnce(wrt http.ResponseWriter, req *http.Request) {
for {
select {
case msg, ok := <-sess.send:
if !ok {
return
}
switch v := msg.(type) {
case *ServerComMessage: // single unserialized message
w := sess.serializeAndUpdateStats(v)
if !sess.sendMessageLp(wrt, w) {
return
}
default: // serialized message
if !sess.sendMessageLp(wrt, v) {
return
}
}
return
case <-sess.bkgTimer.C:
if sess.background {
sess.background = false
sess.onBackgroundTimer()
}
case msg := <-sess.stop:
// Request to close the session. Make it unavailable.
globals.sessionStore.Delete(sess)
// Don't care if lpWrite fails.
if msg != nil {
lpWrite(wrt, msg)
}
return
case topic := <-sess.detach:
// Request to detach the session from a topic.
sess.delSub(topic)
// No 'return' statement here: continue waiting
case <-time.After(pingPeriod):
// just write an empty packet on timeout
if _, err := wrt.Write([]byte{}); err != nil {
logs.Err.Println("longPoll: writeOnce: timout", sess.sid, err)
}
return
case <-req.Context().Done():
// HTTP request canceled or connection lost.
return
}
}
}
func lpWrite(wrt http.ResponseWriter, msg interface{}) error {
// This will panic if msg is not []byte. This is intentional.
wrt.Write(msg.([]byte))
return nil
}
func (sess *Session) readOnce(wrt http.ResponseWriter, req *http.Request) (int, error) {
if req.ContentLength > globals.maxMessageSize {
return http.StatusExpectationFailed, errors.New("request too large")
}
req.Body = http.MaxBytesReader(wrt, req.Body, globals.maxMessageSize)
raw, err := ioutil.ReadAll(req.Body)
if err == nil {
// Locking-unlocking is needed because the client may issue multiple requests in parallel.
// Should not affect performance
sess.lock.Lock()
statsInc("IncomingMessagesLongpollTotal", 1)
sess.dispatchRaw(raw)
sess.lock.Unlock()
return 0, nil
}
return 0, err
}
// serveLongPoll handles long poll connections when WebSocket is not available
// Connection could be without sid or with sid:
// - if sid is empty, create session, expect a login in the same request, respond and close
// - if sid is not empty and there is an initialized session, payload is optional
// - if no payload, perform long poll
// - if payload exists, process it and close
// - if sid is not empty but there is no session, report an error
func serveLongPoll(wrt http.ResponseWriter, req *http.Request) {
now := time.Now().UTC().Round(time.Millisecond)
// Use the lowest common denominator - this is a legacy handler after all (otherwise would use application/json)
wrt.Header().Set("Content-Type", "text/plain")
if globals.tlsStrictMaxAge != "" {
wrt.Header().Set("Strict-Transport-Security", "max-age"+globals.tlsStrictMaxAge)
}
enc := json.NewEncoder(wrt)
if isValid, _ := checkAPIKey(getAPIKey(req)); !isValid {
wrt.WriteHeader(http.StatusForbidden)
enc.Encode(ErrAPIKeyRequired(now))
return
}
// TODO(gene): should it be configurable?
// Currently any domain is allowed to get data from the chat server
wrt.Header().Set("Access-Control-Allow-Origin", "*")
// Ensure the response is not cached
if req.ProtoAtLeast(1, 1) {
wrt.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate") // HTTP 1.1
} else {
wrt.Header().Set("Pragma", "no-cache") // HTTP 1.0
}
wrt.Header().Set("Expires", "0") // Proxies
// TODO(gene): respond differently to valious HTTP methods
// Get session id
sid := req.FormValue("sid")
var sess *Session
if sid == "" {
// New session
var count int
sess, count = globals.sessionStore.NewSession(wrt, "")
sess.remoteAddr = getRemoteAddr(req)
logs.Info.Println("longPoll: session started", sess.sid, sess.remoteAddr, count)
wrt.WriteHeader(http.StatusCreated)
pkt := NoErrCreated(req.FormValue("id"), "", now)
pkt.Ctrl.Params = map[string]string{
"sid": sess.sid,
}
enc.Encode(pkt)
return
}
// Existing session
sess = globals.sessionStore.Get(sid)
if sess == nil {
logs.Warn.Println("longPoll: invalid or expired session id", sid)
wrt.WriteHeader(http.StatusForbidden)
enc.Encode(ErrSessionNotFound(now))
return
}
if addr := getRemoteAddr(req); sess.remoteAddr != addr {
sess.remoteAddr = addr
logs.Warn.Println("longPoll: remote address changed", sid, addr)
}
if req.ContentLength != 0 {
// Read payload and send it for processing.
if code, err := sess.readOnce(wrt, req); err != nil {
logs.Warn.Println("longPoll: readOnce failed", sess.sid, err)
// Failed to read request, report an error, if possible
if code != 0 {
wrt.WriteHeader(code)
} else {
wrt.WriteHeader(http.StatusBadRequest)
}
enc.Encode(ErrMalformed(req.FormValue("id"), "", now))
}
return
}
sess.writeOnce(wrt, req)
}
// Obtain IP address of the client.
func getRemoteAddr(req *http.Request) string {
var addr string
if globals.useXForwardedFor {
addr = req.Header.Get("X-Forwarded-For")
}
if addr != "" {
return addr
}
return req.RemoteAddr
}