-
Notifications
You must be signed in to change notification settings - Fork 0
/
config.go
71 lines (60 loc) · 1.54 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package stream
import (
"fmt"
"github.com/beyondstorage/go-storage/v4/types"
"github.com/panjf2000/ants/v2"
"golang.org/x/time/rate"
)
// Persist method names accepted in Config.PersistMethod.
const (
	// PersistMethodWrite is the "write" persist method.
	PersistMethodWrite = "write"

	// PersistMethodMultipart is the "multipart" persist method. It requires
	// the under storager to implement types.Multiparter.
	PersistMethodMultipart = "multipart"

	// PersistMethodAppend is the "append" persist method. It is declared
	// here, but NewWithConfig does not accept it yet.
	PersistMethodAppend = "append"
)
// Config carries the settings consumed by NewWithConfig when building a
// Stream.
type Config struct {
	// Upper is the storager used as the stream's upper layer.
	Upper types.Storager
	// Under is the storager used as the stream's under layer. When
	// PersistMethod is PersistMethodMultipart it must also implement
	// types.Multiparter.
	Under types.Storager

	// SpeedLimit for upper storager, unit is B/s.
	// Zero leaves the stream without a rate limiter.
	SpeedLimit int

	// PersistMethod names the persist strategy; use one of the
	// PersistMethod* constants.
	PersistMethod string
}
// New builds a Stream on top of the given upper and under storagers using
// the default configuration: the multipart persist method and no speed
// limit. It is shorthand for calling NewWithConfig with that Config.
func New(upper, under types.Storager) (s *Stream, err error) {
	defaults := &Config{
		Upper:         upper,
		Under:         under,
		PersistMethod: PersistMethodMultipart,
	}
	return NewWithConfig(defaults)
}
// NewWithConfig builds a Stream from cfg.
//
// The configuration is validated before any resource is created:
//   - cfg, cfg.Upper and cfg.Under must be non-nil;
//   - cfg.SpeedLimit must not be negative (zero disables rate limiting);
//   - cfg.PersistMethod must be PersistMethodMultipart or
//     PersistMethodWrite. For multipart, cfg.Under must implement
//     types.Multiparter.
//
// On success the returned Stream owns a worker pool of size 10, an
// unbuffered op channel and an error channel buffered to 10 entries.
// On failure the returned Stream is nil and err describes the problem.
func NewWithConfig(cfg *Config) (s *Stream, err error) {
	// Reject unusable configurations up front instead of panicking on a nil
	// dereference here or failing later when the stream is first used.
	if cfg == nil {
		return nil, fmt.Errorf("init stream: config is nil")
	}
	if cfg.Upper == nil {
		return nil, fmt.Errorf("init stream: upper storage is nil")
	}
	if cfg.Under == nil {
		return nil, fmt.Errorf("init stream: under storage is nil")
	}
	// SpeedLimit is used as both the rate and the burst of the limiter; a
	// negative burst can never satisfy a request, so refuse it here.
	if cfg.SpeedLimit < 0 {
		return nil, fmt.Errorf("init stream: speed limit must not be negative, got %d", cfg.SpeedLimit)
	}

	s = &Stream{
		method: cfg.PersistMethod,
		upper:  cfg.Upper,
		under:  cfg.Under,
	}

	// Validate persist method.
	switch cfg.PersistMethod {
	case PersistMethodMultipart:
		m, ok := cfg.Under.(types.Multiparter)
		if !ok {
			return nil, fmt.Errorf("under storage %s doesn't support persis method multipart", cfg.Under)
		}
		s.underMultipart = m
	// TODO: we will support appender later.
	case PersistMethodWrite:
		// No extra capability is required from the under storage.
	default:
		return nil, fmt.Errorf("not supported persis method: %v", cfg.PersistMethod)
	}

	if cfg.SpeedLimit != 0 {
		// Rate and burst are both SpeedLimit (B/s).
		s.limit = rate.NewLimiter(rate.Limit(cfg.SpeedLimit), cfg.SpeedLimit)
	}

	// FIXME: we will support setting workers later.
	s.p, err = ants.NewPool(10)
	if err != nil {
		return nil, fmt.Errorf("init stream: %w", err)
	}

	// No buffer channel for op.
	s.ch = make(chan op)
	// A sized buffer channel for error.
	s.errch = make(chan error, 10)
	return s, nil
}