-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathotherHttpServer2.go
616 lines (570 loc) · 14.6 KB
/
otherHttpServer2.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
package main
import (
"bufio"
"bytes"
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
"net/textproto"
"net/url"
"strconv"
"strings"
"sync"
"time"
"golang.org/x/net/http2"
)
func main() {
srv := NewServerEudore()
srv.SetHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
log.Println(r.Method, r.RequestURI)
w.Write([]byte("eudore server"))
}))
srv.ListenAndServe(":8088")
}
var (
crlf = []byte("\r\n")
colonSpace = []byte(": ")
constinueMsg = []byte("HTTP/1.1 100 Continue\r\n\r\n")
rwPool = sync.Pool{
New: func() interface{} {
return &Response{
request: Request{
Request: http.Request{
ProtoMajor: 1,
ProtoMinor: 1,
},
reader: bufio.NewReaderSize(nil, 2048),
},
writer: bufio.NewWriterSize(nil, 2048),
buf: make([]byte, 2048),
}
},
}
// ErrLineInvalid 定义http请求行无效的错误。
ErrLineInvalid = errors.New("request line is invalid")
)
// Server 定义http server。
type Server struct {
ReadTimeout time.Duration
WriteTimeout time.Duration
IdleTimeout time.Duration
ctx context.Context
mu sync.Mutex
wg sync.WaitGroup
listeners []net.Listener
proto string
// nextHandler protocol.HandlerConn
httpHandler http.Handler
serverHandler func(context.Context, net.Conn, http.Handler)
nextHandler func(context.Context, *tls.Conn, http.Handler)
Print func(...interface{}) `alias:"print"`
}
// NewServerEudore 方法创建一个server
func NewServerEudore() *Server {
return &Server{
ReadTimeout: 60 * time.Second,
WriteTimeout: 60 * time.Second,
IdleTimeout: 60 * time.Second,
ctx: context.Background(),
httpHandler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}),
serverHandler: httpHandlerr,
nextHandler: NewHTTP2Handler(),
proto: "h2",
}
}
// ListenAndServe 方法监听一个tcp连接,并启动服务。
func (srv *Server) ListenAndServe(addr string) error {
ln, err := net.Listen("tcp", addr)
if err != nil {
return err
}
return srv.Serve(ln)
}
// ListenAndServeTLS 方法监听一个tcp连接,并启动服务。
func (srv *Server) ListenAndServeTLS(addr, certFile, keyFile string) error {
ln, err := net.Listen("tcp", addr)
if err != nil {
return err
}
config := &tls.Config{
Certificates: make([]tls.Certificate, 1),
PreferServerCipherSuites: true,
}
if config.NextProtos == nil && len(srv.proto) > 0 {
config.NextProtos = []string{srv.proto}
}
config.Certificates[0], err = tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
return err
}
return srv.Serve(tls.NewListener(ln, config))
}
// Serve 方法服务处理监听
func (srv *Server) Serve(ln net.Listener) error {
srv.mu.Lock()
for _, i := range srv.listeners {
if i == ln {
return fmt.Errorf("ln is serve status")
}
}
srv.listeners = append(srv.listeners, ln)
srv.mu.Unlock()
for {
// 读取连接
c, err := ln.Accept()
// 错误连接丢弃
if err != nil {
// 等待重试
if ne, ok := err.(net.Error); ok && ne.Temporary() {
time.Sleep(5 * time.Millisecond)
continue
}
return err
}
// Handle new connections
// 处理新连接
go srv.newConnServe(c)
}
}
func (srv *Server) newConnServe(conn net.Conn) {
remoteAddr := conn.RemoteAddr().String()
ctx := context.WithValue(srv.ctx, http.LocalAddrContextKey, remoteAddr)
if tlsConn, ok := conn.(*tls.Conn); ok {
if err := tlsConn.Handshake(); err != nil {
srv.Print(fmt.Errorf("TLS handshake error from %s: %v", conn.RemoteAddr(), err))
return
}
if proto := tlsConn.ConnectionState().NegotiatedProtocol; validNPN(proto) && proto == srv.proto && srv.nextHandler != nil {
srv.nextHandler(ctx, tlsConn, srv.httpHandler)
return
}
}
srv.serverHandler(ctx, conn, srv.httpHandler)
}
// Shutdown 方法关闭Server
func (srv *Server) Shutdown(ctx context.Context) error {
var stop = make(chan error)
go func() {
stop <- srv.Close()
}()
for {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-stop:
return err
}
}
}
// Close 方法关闭Server
func (srv *Server) Close() (err error) {
srv.mu.Lock()
for _, ln := range srv.listeners {
if e := ln.Close(); e != nil && err == nil {
err = e
}
}
srv.listeners = nil
srv.mu.Unlock()
return err
}
// validNPN reports whether the proto is not a blacklisted Next
// Protocol Negotiation protocol. Empty and built-in protocol types
// are blacklisted and can't be overridden with alternate
// implementations.
func validNPN(proto string) bool {
switch proto {
case "", "http/1.1", "http/1.0":
return false
}
return true
}
// SetHandler 方法设置server的请求处理者
func (srv *Server) SetHandler(h http.Handler) {
srv.httpHandler = h
}
// SetDefaulteHandler 方法设置默认http处理函数。
func (srv *Server) SetServerHandler(h func(context.Context, net.Conn, http.Handler)) {
srv.serverHandler = h
}
// SetnextHandlerr 方法设置serve的tls处理函数。
func (srv *Server) SetnextHandlerr(proto string, h func(context.Context, *tls.Conn, http.Handler)) error {
switch proto {
case "h2":
srv.proto, srv.nextHandler = proto, h
return nil
}
return fmt.Errorf("tls nosuppered npn proto")
}
// httpHandlerr 函数处理http/1.1请求
func httpHandlerr(pctx context.Context, conn net.Conn, handler http.Handler) {
// Initialize the request object.
// 初始化请求对象。
resp := rwPool.Get().(*Response)
for {
// c.SetReadDeadline(time.Now().Add(h.ReadTimeout))
err := resp.request.Reset(conn)
if err != nil {
// handler error
if isNotCommonNetReadError(err) {
// h.Print("eudore http request read error: ", err)
}
break
}
resp.Reset(conn)
ctx, cancelCtx := context.WithCancel(pctx)
resp.cancel = cancelCtx
// 处理请求
// c.SetWriteDeadline(time.Now().Add(h.WriteTimeout))
handler.ServeHTTP(resp, resp.request.Request.WithContext(ctx))
if resp.ishjack {
return
}
resp.finalFlush()
if resp.request.Header.Get("Connection") != "keep-alive" {
break
}
// c.SetDeadline(time.Now().Add(h.IdleTimeout))
}
conn.Close()
rwPool.Put(resp)
}
// isNotCommonNetReadError 函数检查net读取错误是否未非通用错误。
func isNotCommonNetReadError(err error) bool {
if err == io.EOF {
return false
}
if neterr, ok := err.(net.Error); ok && neterr.Timeout() {
return false
}
if oe, ok := err.(*net.OpError); ok && oe.Op == "read" {
return false
}
return true
}
// NewHTTP2Handler 方法创建一个h2处理函数。
func NewHTTP2Handler() func(context.Context, *tls.Conn, http.Handler) {
h2svc := &http2.Server{}
return func(ctx context.Context, conn *tls.Conn, h http.Handler) {
h2svc.ServeConn(conn, &http2.ServeConnOpts{
Context: ctx,
Handler: h,
})
}
}
// Request 定义一个http请求。
type Request struct {
http.Request
conn net.Conn
// read body
reader *bufio.Reader
nextLength int64
expect bool
mu sync.Mutex
}
// Reset 方法重置请求对象
func (r *Request) Reset(conn net.Conn) error {
r.conn = conn
r.reader.Reset(conn)
r.Header = make(http.Header)
// Read the http request line.
// 读取http请求行。
line, err := r.readLine()
if err != nil {
return err
}
// 读取请求行,sawEOF作为临时变量
r.Method, r.RequestURI, r.Proto, err = parseRequestLine(line)
if err != nil {
return err
}
// 初始化path和uri参数。
r.URL, err = url.ParseRequestURI(r.RequestURI)
if err != nil {
return err
}
// 读取http headers
for {
// 读取一行内容。
line, err = r.readLine()
if err != nil || len(line) == 0 {
break
}
// 分割成header存储到请求中。
r.Header.Add(splitHeader(line))
}
r.Host = r.Header.Get("Host")
r.RemoteAddr = conn.RemoteAddr().String()
// 从header中读取请求body长度,如果无body直接长度为0,未处理分段传输。
lenstr := r.Header.Get("Content-Length")
if len(lenstr) > 0 {
r.ContentLength, err = strconv.ParseInt(lenstr, 10, 64)
if err != nil {
return err
}
r.expect = r.Header.Get("Expect") == "100-continue"
r.nextLength = r.ContentLength
r.Body = r
} else {
r.ContentLength = 0
r.Body = http.NoBody
}
return nil
}
// Read 方法读取数据,实现io.Reader。
func (r *Request) Read(p []byte) (int, error) {
r.mu.Lock()
if r.expect {
r.expect = false
r.conn.Write(constinueMsg)
}
if r.nextLength <= 0 {
return 0, io.EOF
}
if int64(len(p)) > r.nextLength {
p = p[0:r.nextLength]
}
n, err := r.reader.Read(p)
r.nextLength -= int64(n)
r.mu.Unlock()
return n, err
}
// Close 方法关闭read。
func (r *Request) Close() error {
r.mu.Lock()
io.CopyN(ioutil.Discard, r.reader, r.nextLength)
r.mu.Unlock()
return nil
}
// 从请求读取一行数据
func (r *Request) readLine() ([]byte, error) {
// r.closeDot()
var line []byte
for {
l, more, err := r.reader.ReadLine()
if err != nil {
return nil, err
}
// Avoid the copy if the first call produced a full line.
if line == nil && !more {
return l, nil
}
line = append(line, l...)
if !more {
break
}
}
return line, nil
}
// parseRequestLine parses "GET /foo HTTP/1.1" into its three parts.
func parseRequestLine(line []byte) (method, requestURI, proto string, err error) {
s1 := bytes.IndexByte(line, ' ')
s2 := bytes.IndexByte(line[s1+1:], ' ')
if s1 < 0 || s2 < 0 {
return method, requestURI, proto, ErrLineInvalid
}
s2 += s1 + 1
return string(line[:s1]), string(line[s1+1 : s2]), string(line[s2+1:]), nil
}
// 将header的键值切分
func splitHeader(line []byte) (string, string) {
i := bytes.Index(line, colonSpace)
if i != -1 {
return textproto.CanonicalMIMEHeaderKey(string(line[:i])), string(line[i+2:])
}
return "", ""
}
// TimeFormat 定义响应header写入Date的时间格式。
const TimeFormat = "Mon, 02 Jan 2006 15:04:05 GMT"
// Response 定义http响应对象
type Response struct {
request Request
writer *bufio.Writer
header http.Header
status int
size int
iswrite bool
chunked bool
ishjack bool
// buffer
buf []byte
n int
err error
//
cancel context.CancelFunc
}
// cancelConn 定义Conn在Close时执行Context cancel
type cancelConn struct {
net.Conn
cancel context.CancelFunc
}
// Reset 方法重置http响应状态
func (w *Response) Reset(conn net.Conn) {
w.writer.Reset(conn)
w.header = make(http.Header)
w.status = 200
w.size = 0
w.iswrite = false
w.chunked = false
w.ishjack = false
w.err = nil
w.n = 0
}
// Header 方法获得http响应header对象。
func (w *Response) Header() http.Header {
return w.header
}
// WriteHeader 方法写入状态码
func (w *Response) WriteHeader(codeCode int) {
w.status = codeCode
}
// Write 方法写入数据,如果写入数据长度小于缓冲,不会立刻返回,也不会写入状态行。
func (w *Response) Write(p []byte) (int, error) {
// 数据大于缓冲,发送数据
if w.n+len(p) > len(w.buf) {
// 写入数据
n, _ := w.writeDate(p, len(p))
// 更新数据长度
w.size += n
return n, w.err
}
// 数据小于缓存,保存
n := copy(w.buf[w.n:], p)
w.n += n
// 更新数据长度
w.size += n
return n, nil
}
// writeDate 方法写入数据并返回。
//
// 会先写入缓冲数据,然后将当前数据写入
//
// 提升分块效率,会将大小两块合并发送。
func (w *Response) writeDate(p []byte, length int) (n int, err error) {
// 写入状态行
w.writerResponseLine()
// 如果有写入错误,或者数据长度为0则返回。
if w.err != nil || (length+w.n) == 0 {
return 0, w.err
}
// 数据写入
if w.chunked {
// 分块写入
fmt.Fprintf(w.writer, "%x\r\n", length+w.n)
// 写入缓冲数据和当前数据
w.writer.Write(w.buf[0:w.n])
n, err = w.writer.Write(p)
// 分块结束
w.writer.Write([]byte{13, 10})
} else {
w.writer.Write(w.buf[0:w.n])
n, err = w.writer.Write(p)
}
w.n = 0
// 检测写入的长度
if n < length {
err = io.ErrShortWrite
}
w.err = err
return
}
// writerResponseLine 方法写入状态行
func (w *Response) writerResponseLine() {
// 已经写入则返回
if w.iswrite {
return
}
// 设置写入标志为true。
w.iswrite = true
// Write response line
// 写入响应行
fmt.Fprintf(w.writer, "%s %d %s\r\n", w.request.Proto, w.status, http.StatusText(w.status))
// Write headers
// 写入headers
for key, vals := range w.header {
for _, val := range vals {
fmt.Fprintf(w.writer, "%s: %s\r\n", key, val)
}
}
// 写入时间和Server
fmt.Fprintf(w.writer, "Date: %s\r\nServer: eudore\r\n", time.Now().UTC().Format(TimeFormat))
// 检测是否有写入长度,没有则进行分块传输。
// 未检测Content-Length值是否合法
w.chunked = len(w.header.Get("Content-Length")) == 0 && w.header.Get("Upgrade") == ""
if w.chunked {
fmt.Fprintf(w.writer, "Transfer-Encoding: chunked\r\n")
}
// Write header separator
// 写入header后分割符
w.writer.Write([]byte("\r\n"))
}
// Flush 方法数据写入
func (w *Response) Flush() {
// 将缓冲数据写入
w.writeDate(nil, 0)
w.n = 0
// 发送writer的全部数据
w.writer.Flush()
}
// finalFlush 方法请求结束时flush写入数据。
func (w *Response) finalFlush() (err error) {
// 如果没有写入状态行,并且没有指定内容长度。
// 设置内容长度为当前缓冲数据。
if !w.iswrite && len(w.header.Get("Content-Length")) == 0 {
w.header.Set("Content-Length", fmt.Sprint(w.n))
}
// 将缓冲数据写入
w.writeDate(nil, 0)
// 处理分段传输
if w.chunked {
// 处理Trailer header
tr := w.header.Get("Trailer")
if len(tr) == 0 {
// 没有Trailer,直接写入结束
w.writer.Write([]byte{0x30, 0x0d, 0x0a, 0x0d, 0x0a})
} else {
// 写入结尾
w.writer.Write([]byte{0x30, 0x0d, 0x0a})
// 写入Trailer的值
for _, k := range strings.Split(tr, ",") {
fmt.Fprintf(w.writer, "%s: %s\r\n", k, w.header.Get(k))
}
w.writer.Write([]byte{0x0d, 0x0a})
}
}
// 发送数据
err = w.writer.Flush()
w.cancel()
return
}
// Hijack 方法劫持http连接。
func (w *Response) Hijack() (net.Conn, *bufio.ReadWriter, error) {
w.ishjack = true
// w.request.conn.SetDeadline(time.Time{})
// return &cancelConn{w.request.conn, w.cancel}, nil
return w.request.conn, bufio.NewReadWriter(w.request.reader, w.writer), nil
}
// Push 方法http协议不支持push方法。
func (*Response) Push(string, *http.PushOptions) error {
return nil
}
// Status 方法返回当前状态码。
func (w *Response) Status() int {
return w.status
}
// Size 方法返回写入的数据长度。
func (w *Response) Size() int {
return w.size
}
// Close 方法在net.Conn关闭时,执行context cancel。
func (c *cancelConn) Close() (err error) {
err = c.Conn.Close()
c.cancel()
return
}