diff --git a/cmd/coord-server/config.go b/cmd/coord-server/config.go deleted file mode 100644 index e0106bfefc0fff75efce3228dadb68ddb033d8eb..0000000000000000000000000000000000000000 --- a/cmd/coord-server/config.go +++ /dev/null @@ -1,17 +0,0 @@ -package main - -import ( - "github.com/mkideal/cli" -) - -type ( - LoggerConfig struct { - Verbose []bool `cli:"v,verbose" usage:"increase the log verbosity"` - Quiet bool `cli:"q,quiet" usage:"decrease the log verbosity"` - } - - ServerConfig struct { - cli.Helper - LoggerConfig - } -) diff --git a/cmd/coord-server/connection.go b/cmd/coord-server/connection.go deleted file mode 100644 index 151d99960340ac5b98f3e7d26f9c67e061aa837c..0000000000000000000000000000000000000000 --- a/cmd/coord-server/connection.go +++ /dev/null @@ -1,146 +0,0 @@ -package main - -import ( - "bufio" - "encoding/json" - "io" - "strconv" - "sync" - - "gitlab.irstea.fr/guillaume.perreal/coord/lib/log" - "gitlab.irstea.fr/guillaume.perreal/coord/lib/protocol" - "gitlab.irstea.fr/guillaume.perreal/coord/lib/protocol/remote" -) - -type ( - Connection struct { - conn io.ReadWriteCloser - stream *protocol.Stream - ctl chan interface{} - done sync.WaitGroup - stop sync.Once - *log.Logger - closed bool - } -) - -func StartConnection(conn io.ReadWriteCloser, stream *protocol.Stream, logger *log.Logger) *Connection { - c := &Connection{conn: conn, stream: stream, ctl: make(chan interface{}, 5), Logger: logger} - c.done.Add(2) - go c.serve() - return c -} - -func (c *Connection) serve() { - serial := uint64(0) - subscriptions := make(map[remote.SubscriptionID]*protocol.Subscription) - - defer func() { - var errs Errors - for _, sub := range subscriptions { - errs.Append(sub.Close()) - } - if errs.HasError() { - c.Warningf("error while closing subscriptions: %s", errs) - } - - c.done.Done() - c.Debug("connection closed") - }() - - go func() { - scanner := bufio.NewScanner(c.conn) - - defer func() { - if !c.closed { - if err := scanner.Err(); err != nil { - c.Errorf("error reading from socket: %s", err) - } else { - c.Debug("client closed the socket") - } - } - - close(c.ctl) - c.done.Done() - }() - - c.Debug("waiting for requests") - for scanner.Scan() { - var request remote.Request - bytes := scanner.Bytes() - if err := json.Unmarshal(bytes, &request); err != nil { - c.ctl <- remote.Errorf(0,"cannot parse `%s`: %s", bytes, err) - } else { - c.Debugf("received request %v", request) - c.ctl <- request - } - } - }() - - for untypedCmd := range c.ctl { - switch cmd := untypedCmd.(type) { - - case remote.Request: - - if cmd.Record != nil { - if err := c.stream.Append(*cmd.Record); err == nil { - c.ctl <- remote.Recorded(cmd.ID) - } else { - c.ctl <- remote.Error(cmd.ID, err) - } - - } else if cmd.Subscribe != nil { - if sub, err := c.stream.Subscribe(cmd.Subscribe.String(), func(event protocol.Event) { - c.ctl <- remote.Notification(event) - }); err == nil { - serial += 1 - id := remote.SubscriptionID(strconv.FormatUint(serial, 10)) - subscriptions[id] = sub - c.ctl <- remote.Subscribed(cmd.ID, id) - c.Debugf("new subscribed: %s", id) - } else { - c.ctl <- remote.Error(cmd.ID, err) - } - - } else if cmd.Unsubscribe != nil { - if sub, exists := subscriptions[*cmd.Unsubscribe]; exists { - if err := sub.Close(); err == nil { - c.ctl <- remote.Unsubscribed(cmd.ID, *cmd.Unsubscribe) - } else { - c.ctl <- remote.Error(cmd.ID, err) - } - } else { - c.ctl <- remote.Errorf(cmd.ID, "unknown subscription: %s", sub) - } - - } else { - c.ctl <- remote.Errorf(cmd.ID, "cannot handle request: %v", cmd) - } - - case remote.Response: - if bytes, err := json.Marshal(cmd); err != nil { - c.Errorf("could not marshal response %#v: %s", cmd, err) - } else if _, err := c.conn.Write(append(bytes, '\n')); err != nil { - c.Errorf("could not send response `%s`: %s", bytes, err) - } else { - if cmd.Error != nil { - c.Noticef("sent error response %v", cmd) - } else { - c.Debugf("sent response %v", cmd) - } - } - - default: - c.Errorf("unknown connection command: %#v", cmd) - } - } -} - -func (c *Connection) Close() (err error) { - c.stop.Do(func() { - c.closed = true - err = c.conn.Close() - }) - c.done.Wait() - return -} diff --git a/cmd/coord-server/listener.go b/cmd/coord-server/listener.go deleted file mode 100644 index 78d3d4ea1e7383ed4b6c803d624218d5c2a3876f..0000000000000000000000000000000000000000 --- a/cmd/coord-server/listener.go +++ /dev/null @@ -1,68 +0,0 @@ -package main - -import ( - "gitlab.irstea.fr/guillaume.perreal/coord/lib/log" - "gitlab.irstea.fr/guillaume.perreal/coord/lib/protocol" - - "net" - "sync" -) - -type ( - Listener struct { - stream *protocol.Stream - listener net.Listener - done sync.WaitGroup - closed bool - *log.Logger - } -) - -func Listen(stream *protocol.Stream, listener net.Listener, logger *log.Logger) *Listener { - l := &Listener{stream: stream, listener: listener, Logger: logger} - l.done.Add(1) - go l.serve() - return l -} - -func (l *Listener) serve() { - var closers Closers - - defer func() { - l.Info("stopped accepting connections") - if err := closers.Close(); err != nil { - l.Warningf("error while closing connections: %s", err) - } - l.done.Done() - }() - - l.Info("accepting connections") - for { - netConn, err := l.listener.Accept() - if err != nil { - if netErr, ok := err.(net.Error); ok && !netErr.Temporary() { - if !l.closed { - l.Errorf("error while listening for new connections: %s", err) - } - return - } - l.Warningf("while listening for new connections: %s", err) - continue - } - - name := netConn.RemoteAddr().Network() + "://" + netConn.RemoteAddr().String() - l.Infof("accepted connection from %s", name) - connLogger := l.Logger.WithField("client", name) - conn := StartConnection(netConn, l.stream, connLogger) - closers.Append(conn) - } -} - -func (l *Listener) Close() error { - l.closed = true - if err := l.listener.Close(); err != nil { - return err - } - l.done.Wait() - return nil -} diff --git a/cmd/coord-server/main.go b/cmd/coord-server/main.go deleted file mode 100644 index 99fa82452f88377549426a3faa85e18544a5487b..0000000000000000000000000000000000000000 --- a/cmd/coord-server/main.go +++ /dev/null @@ -1,89 +0,0 @@ -package main - -import ( - "io" - "net" - "os" - - "github.com/mkideal/cli" - - "gitlab.irstea.fr/guillaume.perreal/coord/lib/log" - "gitlab.irstea.fr/guillaume.perreal/coord/lib/protocol" -) - -type ( - PseudoReadWriteCloser struct { - io.ReadCloser - io.WriteCloser - } -) - -func main() { - os.Exit(cli.Run(new(ServerConfig), func(ctx *cli.Context) error { - conf := ctx.Argv().(*ServerConfig) - - logger := setupLogger(conf.LoggerConfig) - - logger.Debugf("configuration: %+v", conf) - - var services Closers - - sh := StartSignalHandler(&services) - services.Append(sh) - - stream := protocol.StartStream() - services.Append(stream) - - for _, arg := range ctx.Args() { - var addr protocol.Addr - if err := addr.Decode(arg); err == nil { - name := addr.String() - listener, err := net.Listen(addr.Network(), addr.Addr.String()) - if err == nil { - if unixListener, isUnix := listener.(*net.UnixListener); isUnix { - unixListener.SetUnlinkOnClose(true) - } - l := Listen(stream, listener, logger.WithField("server", name)) - services.Append(l) - } else { - logger.Errorf("could not listen on %s: %s", name, err.Error()) - } - } else { - logger.Error(err.Error()) - } - } - - services.Wait() - - return nil - })) -} - -func setupLogger(conf LoggerConfig) *log.Logger { - level := log.NOTICE - if conf.Quiet { - level = log.ERROR - } else { - switch len(conf.Verbose) { - case 0: - level = log.NOTICE - case 1: - level = log.INFO - default: - level = log.DEBUG - } - } - log.DefaultLogger = log.NewLogger(log.NewLogWriter(level, os.Stderr, log.TextFormatter)) - - return log.DefaultLogger -} - -func (c PseudoReadWriteCloser) Close() error { - if err := c.ReadCloser.Close(); err != nil { - return err - } - if err := c.WriteCloser.Close(); err != nil { - return err - } - return nil -} diff --git a/cmd/coord-server/service.go b/cmd/coord-server/service.go deleted file mode 100644 index 9cca77dfbf94682ef141e7f03bc8054ce7da148e..0000000000000000000000000000000000000000 --- a/cmd/coord-server/service.go +++ /dev/null @@ -1,30 +0,0 @@ -package main - -import ( - "sync" -) - -type ( - serviceControl struct { - close bool - done *sync.WaitGroup - } -) - -func (s *serviceControl) started() { - s.done = new(sync.WaitGroup) - s.done.Add(1) -} - -func (s *serviceControl) stopped() { - s.done.Done() -} - -func (s *serviceControl) Stop() { - s.close = true - s.done.Wait() -} - -func (s *serviceControl) Complete() bool { - return s.close -} diff --git a/cmd/coord-server/signal.go b/cmd/coord-server/signal.go deleted file mode 100644 index 83abf49ccdeb41b339feac20de76e7c7b6852490..0000000000000000000000000000000000000000 --- a/cmd/coord-server/signal.go +++ /dev/null @@ -1,48 +0,0 @@ -package main - -import ( - "gitlab.irstea.fr/guillaume.perreal/coord/lib/log" - "io" - "os" - "os/signal" - "sync" - "syscall" -) - -type ( - SignalHandler struct { - signals chan os.Signal - target io.Closer - stop sync.Once - done sync.WaitGroup - } -) - -func StartSignalHandler(target io.Closer) *SignalHandler { - h := &SignalHandler{signals: make(chan os.Signal), target: target} - h.done.Add(1) - go h.serve() - return h -} - -func (h *SignalHandler) serve() { - defer func() { - h.done.Done() - _ = h.target.Close() - }() - - signal.Notify(h.signals, syscall.SIGTERM, syscall.SIGINT) - sig, ok := <- h.signals - if ok { - log.Criticalf("received signal '%s', shutting down", sig) - } -} - -func (h *SignalHandler) Close() (err error) { - h.stop.Do(func() { - signal.Stop(h.signals) - close(h.signals) - }) - h.done.Wait() - return -} diff --git a/cmd/coord-server/utils.go b/cmd/coord-server/utils.go deleted file mode 100644 index aee441a3b733a7973c8b6a33926024aa8915b9fa..0000000000000000000000000000000000000000 --- a/cmd/coord-server/utils.go +++ /dev/null @@ -1,74 +0,0 @@ -package main - -import ( - "fmt" - "io" - "net" - "strings" - "sync" -) - -type ( - Errors []error - - Closers struct { - closers []io.Closer - done sync.WaitGroup - stop sync.Once - } -) - -func (e *Errors) Append(err error) { - if err != nil { - *e = append(*e, err) - } -} - -func (e Errors) HasError() bool { - return e != nil -} - -func (e Errors) Error() string { - if !e.HasError() { - return "no errors" - } - var buf strings.Builder - for i, err := range e { - if i > 0 { - _, _ = buf.WriteString(", ") - } - _, _ = buf.WriteString(err.Error()) - } - return buf.String() -} - -func (e Errors) String() string { - return e.Error() -} - -func (c *Closers) Append(closer io.Closer) { - c.done.Add(1) - c.closers = append(c.closers, closer) -} - -func (c *Closers) Wait() { - c.done.Wait() -} - -func (c *Closers) Close() (err error) { - c.stop.Do(func() { - var errs Errors - for _, closer := range c.closers { - cl := closer - go func() { - defer c.done.Done() - errs.Append(cl.Close()) - }() - } - if errs.HasError() { - err = errs - } - }) - c.done.Wait() - return -} diff --git a/cmd/coord-server/utils_test.go b/cmd/coord-server/utils_test.go deleted file mode 100644 index 905f641b49e22e69fe69b1ad3faa3d386ba63296..0000000000000000000000000000000000000000 --- a/cmd/coord-server/utils_test.go +++ /dev/null @@ -1,45 +0,0 @@ -package main - -import ( - "errors" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestErrors_Zero(t *testing.T) { - var err Errors - - assert.Nil(t, err) - assert.False(t, err.HasError()) - assert.Equal(t,"no errors", err.Error()) -} - -func TestErrors_AppendNil(t *testing.T) { - var err Errors - err.Append(nil) - - assert.Nil(t, err) - assert.False(t, err.HasError()) - assert.Equal(t,"no errors", err.Error()) -} - -func TestErrors_OneError(t *testing.T) { - var err Errors - err.Append(errors.New("test")) - - assert.NotNil(t, err) - assert.True(t, err.HasError()) - assert.Equal(t,"test", err.Error()) -} - -func TestErrors_TwoErrors(t *testing.T) { - var err Errors - err.Append(errors.New("a")) - err.Append(errors.New("b")) - - assert.NotNil(t, err) - assert.True(t, err.HasError()) - assert.Equal(t,"a, b", err.Error()) -} - diff --git a/cmd/coord/main.go b/cmd/coord/main.go new file mode 100644 index 0000000000000000000000000000000000000000..dc59c294404deb3bf711e29fd411e95717fc859c --- /dev/null +++ b/cmd/coord/main.go @@ -0,0 +1,44 @@ +package coord + +import ( + "os" + "strconv" + "time" + + "github.com/mkideal/cli" + "github.com/nats-io/nats.go" + "github.com/nats-io/stan.go" + + "gitlab.irstea.fr/guillaume.perreal/coord/lib/log" +) + +type ( + Config struct { + ServerURL string `cli:"s,server" usage:"URL of the NATS server" dft:"$NATS_URL"` + ClusterID string `cli:"c,clusterID" usage:"Cluster ID of the NATS Streaming server" dft:"$NATS_CLUSTER_ID"` + cli.AutoHelper + } +) + +func main() { + config := &Config{ + ServerURL: nats.DefaultURL, + ClusterID: "https://gitlab.irstea.fr/guillaume.perreal/coord", + } + os.Exit(cli.Run(&config, Run)) +} + +func Run(ctx *cli.Context) error { + config := ctx.Argv().(*Config) + clientId := "coord." + strconv.FormatInt(time.Now().UnixNano()+int64(os.Getpid()), 36) + + conn, err := stan.Connect(config.ClusterID, clientId, stan.NatsURL(config.ServerURL)) + if err != nil { + return err + } + defer conn.Close() + + log.Debugf("connected to %s", conn.NatsConn().ConnectedUrl()) + + return nil +} diff --git a/go.mod b/go.mod index 427f423e16381dbb21054b467ec55217cb7234fb..1f28e814a5002bd63a0d93f890610dc7f658f447 100644 --- a/go.mod +++ b/go.mod @@ -4,13 +4,11 @@ go 1.12 require ( github.com/Bowery/prompt v0.0.0-20190419144237-972d0ceb96f5 // indirect - github.com/google/uuid v1.1.1 - github.com/goombaio/namegenerator v0.0.0-20181006234301-989e774b106e - github.com/joomcode/errorx v0.8.0 github.com/labstack/gommon v0.2.9 // indirect github.com/mkideal/cli v0.0.3 github.com/mkideal/pkg v0.0.0-20170503154153-3e188c9e7ecc // indirect - github.com/stretchr/testify v1.3.0 - github.com/thejerf/suture v3.0.2+incompatible + github.com/nats-io/nats-streaming-server v0.15.1 // indirect + github.com/nats-io/nats.go v1.8.1 + github.com/nats-io/stan.go v0.5.0 golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443 // indirect ) diff --git a/go.sum b/go.sum index 88e7e2cdf6d6966e465ba50c48bc447145f4ae06..420e7a1a8a786aee3667edd8c3576f4902fe1121 100644 --- a/go.sum +++ b/go.sum @@ -1,43 +1,104 @@ github.com/Bowery/prompt v0.0.0-20190419144237-972d0ceb96f5 h1:7tNlRGC3pUEPKS3DwgX5L0s+cBloaq/JBoi9ceN1MCM= github.com/Bowery/prompt v0.0.0-20190419144237-972d0ceb96f5/go.mod h1:4/6eNcqZ09BZ9wLK3tZOjBA1nDj+B0728nlX5YRlSmQ= +github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= +github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 h1:EFSB7Zo9Eg91v7MJPVsifUysc/wPdN+NOnVe6bWbdBM= +github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag= +github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= -github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/goombaio/namegenerator v0.0.0-20181006234301-989e774b106e h1:XmA6L9IPRdUr28a+SK/oMchGgQy159wvzXA5tJ7l+40= -github.com/goombaio/namegenerator v0.0.0-20181006234301-989e774b106e/go.mod h1:AFIo+02s+12CEg8Gzz9kzhCbmbq6JcKNrhHffCGA9z4= -github.com/joomcode/errorx v0.8.0 h1:GhAqPtcYuo1O7TOIbtzEIDzPGQ3SrKJ3tdjXNmUtDNo= -github.com/joomcode/errorx v0.8.0/go.mod h1:kgco15ekB6cs+4Xjzo7SPeXzx38PbJzBwbnu9qfVNHQ= +github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= +github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= +github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= +github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= +github.com/hashicorp/go-hclog v0.9.1 h1:9PZfAcVEvez4yhLH2TBU64/h/z4xlFI80cWXRrxuKuM= +github.com/hashicorp/go-hclog v0.9.1/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ= +github.com/hashicorp/go-immutable-radix v1.0.0 h1:AKDB1HM5PWEA7i4nhcpwOrO2byshxBjXVn/J/3+z5/0= +github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= +github.com/hashicorp/go-msgpack v0.5.5 h1:i9R9JSrqIz0QVLz3sz+i3YJdT7TTSLcfLLzJi9aZTuI= +github.com/hashicorp/go-msgpack v0.5.5/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= +github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= +github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCSnM= +github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo= +github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/raft v1.1.0 h1:qPMePEczgbkiQsqCsRfuHRqvDUO+zmAInDaD5ptXlq0= +github.com/hashicorp/raft v1.1.0/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7HVbyzqM= +github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/labstack/gommon v0.2.9 h1:heVeuAYtevIQVYkGj6A41dtfT91LrvFG220lavpWhrU= github.com/labstack/gommon v0.2.9/go.mod h1:E8ZTmW9vw5az5/ZyHWCp0Lw4OH2ecsaBP1C/NKavGG4= +github.com/lib/pq v1.1.1 h1:sJZmqHoEaY7f+NPP8pgLB/WxulyR3fewgCM2qaSlBb4= +github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx8mU= github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE= github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/mkideal/cli v0.0.3 h1:Y1OXyfTVI9eQ9RTiXq12h7q88y22Q9ZU4VI09ifz6lE= github.com/mkideal/cli v0.0.3/go.mod h1:HLuSls75T7LFlTgByGeuLwcvdUmmx/aUQxnnEKxoZzY= github.com/mkideal/pkg v0.0.0-20170503154153-3e188c9e7ecc h1:eyN9UWVX+CeeCQZPudCUAPc84xQYTjEu9MWNa2HuJrs= github.com/mkideal/pkg v0.0.0-20170503154153-3e188c9e7ecc/go.mod h1:DECgB56amjU/mmmsKuooNPQ1856HASOMC3D4ntSVU70= +github.com/nats-io/jwt v0.2.6 h1:eAyoYvGgGLXR2EpnsBUvi/FcFrBqN6YKFVbOoEfPN4k= +github.com/nats-io/jwt v0.2.6/go.mod h1:mQxQ0uHQ9FhEVPIcTSKwx2lqZEpXWWcCgA7R6NrWvvY= +github.com/nats-io/nats-server/v2 v2.0.0 h1:rbFV7gfUPErVdKImVMOlW8Qb1V22nlcpqup5cb9rYa8= +github.com/nats-io/nats-server/v2 v2.0.0/go.mod h1:RyVdsHHvY4B6c9pWG+uRLpZ0h0XsqiuKp2XCTurP5LI= +github.com/nats-io/nats-streaming-server v0.15.1 h1:NLQg18mp68e17v+RJpXyPdA7ZH4osFEZQzV3tdxT6/M= +github.com/nats-io/nats-streaming-server v0.15.1/go.mod h1:bJ1+2CS8MqvkGfr/NwnCF+Lw6aLnL3F5kenM8bZmdCw= +github.com/nats-io/nats.go v1.8.1 h1:6lF/f1/NN6kzUDBz6pyvQDEXO39jqXcWRLu/tKjtOUQ= +github.com/nats-io/nats.go v1.8.1/go.mod h1:BrFz9vVn0fU3AcH9Vn4Kd7W0NpJ651tD5omQ3M8LwxM= +github.com/nats-io/nkeys v0.0.2 h1:+qM7QpgXnvDDixitZtQUBDY9w/s9mu1ghS+JIbsrx6M= +github.com/nats-io/nkeys v0.0.2/go.mod h1:dab7URMsZm6Z/jp9Z5UGa87Uutgc2mVpXLC4B7TDb/4= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/nats-io/stan.go v0.4.5/go.mod h1:Ji7mK6gRZJSH1nc3ZJH6vi7zn/QnZhpR9Arm4iuzsUQ= +github.com/nats-io/stan.go v0.5.0 h1:ZaSPMb6jnDXsSlOACynJrUiB3Evleg3ZyyX+rnf3TlQ= +github.com/nats-io/stan.go v0.5.0/go.mod h1:dYqB+vMN3C2F9pT1FRQpg9eHbjPj6mP0yYuyBNuXHZE= +github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= +github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= +github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/prometheus/procfs v0.0.2 h1:6LJUbpNm42llc4HRCuvApCSWB/WfhuNo9K98Q9sNGfs= +github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/thejerf/suture v3.0.2+incompatible h1:GtMydYcnK4zBJ0KL6Lx9vLzl6Oozb65wh252FTBxrvM= -github.com/thejerf/suture v3.0.2+incompatible/go.mod h1:ibKwrVj+Uzf3XZdAiNWUouPaAbSoemxOHLmJmwheEMc= +github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= +go.etcd.io/bbolt v1.3.2 h1:Z/90sZLPOeCy2PwprqkFa25PdkusRzaj9P8zm/KNyvk= +go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= +golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443 h1:IcSOAf4PyMp3U3XbIEj1/xJ2BjNN2jWv7JoyOsMxXUU= golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190602015325-4c4f7f33c9ed h1:uPxWBzB3+mlnjy9W58qY1j/cjyFjutgw/Vhan2zLy/A= golang.org/x/sys v0.0.0-20190602015325-4c4f7f33c9ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +google.golang.org/appengine v1.6.0 h1:Tfd7cKwKbFRsI8RMAD3oqqw7JPFRrvFlOsfbgVkjOOw= +google.golang.org/appengine v1.6.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= diff --git a/lib/protocol/addr.go b/lib/protocol/addr.go deleted file mode 100644 index bdd0cd62a78dd57792a1825f2f7166bc0ea38e12..0000000000000000000000000000000000000000 --- a/lib/protocol/addr.go +++ /dev/null @@ -1,29 +0,0 @@ -package protocol - -import ( - "fmt" - "net" - "strings" -) - -type ( - Addr struct { - net.Addr - } -) - -func (a Addr) String() string { - return a.Addr.Network() + "://" + a.Addr.String() -} - -func (a *Addr) Decode(s string) (err error) { - if strings.HasPrefix(s, "tcp://") { - a.Addr, err = net.ResolveTCPAddr("tcp", s[6:]) - return - } - if strings.HasPrefix(s, "unix://") { - a.Addr, err = net.ResolveUnixAddr("unix", s[7:]) - return - } - return fmt.Errorf("cannot parse address: %s", s) -} diff --git a/lib/protocol/api.go b/lib/protocol/api.go deleted file mode 100644 index e09d93e8b06d0d0e1fa5f54ff66309c9849d6d3d..0000000000000000000000000000000000000000 --- a/lib/protocol/api.go +++ /dev/null @@ -1,26 +0,0 @@ -package protocol - -type ( - Event struct { - Topic TopicID - Payload []byte - } - - Server interface { - Record(Event) error - Subscribe(TopicID, Subscriber) (CloseFunc, error) - } - - CloseFunc func() error - - Subscriber interface { - HandleEvent(Event) error - } - - SubscriberFunc func(Event) error -) - -func (f SubscriberFunc) HandleEvent(event Event) error { - return f(event) -} - diff --git a/lib/protocol/client/client.go b/lib/protocol/client/client.go deleted file mode 100644 index 783b7640e0eb716d809ec990e94add01e94aed54..0000000000000000000000000000000000000000 --- a/lib/protocol/client/client.go +++ /dev/null @@ -1,110 +0,0 @@ -package client - -import ( - "bufio" - "encoding/json" - "io" - "net" - "sync" - - "gitlab.irstea.fr/guillaume.perreal/coord/lib/protocol/remote" -) - -type ( - Client struct { - conn io.ReadWriteCloser - ctl chan interface{} - stop sync.Once - done sync.WaitGroup - } - - requestCmd struct { - remote.Request - resultCh chan<- requestResult - } - - requestResult struct { - remote.Response - error - } -) - -func Dial(addr net.Addr) (*Client, error) { - conn, err := net.Dial(addr.Network(), addr.String()) - if err != nil { - return nil, err - } - return NewClient(conn), nil -} - -func NewClient(conn io.ReadWriteCloser) *Client { - c := &Client{conn: conn, ctl: make(chan interface{}, 5)} - c.done.Add(2) - go c.receive() - go c.loop() - return c -} - -func (c *Client) receive() { - defer func() { - c.done.Done() - }() - - scanner := bufio.NewScanner(c.conn) - - for scanner.Scan() { - var response remote.Response - if err := json.Unmarshal(scanner.Bytes(), &response); err == nil { - c.ctl <- response - } else { - - } - } -} - -func (c *Client) loop() { - defer func() { - c.done.Done() - }() - - pending := make(map[remote.RequestID]chan<- requestResult) - generator := remote.RequestIDGenerator(0) - - for untypedCmd := range c.ctl { - switch cmd := untypedCmd.(type) { - case requestCmd: - cmd.ID = generator.Next() - bytes, err := json.Marshal(cmd.Request) - if err == nil { - _, err = c.conn.Write(append(bytes, '\n')) - } - if err == nil { - pending[cmd.ID] = cmd.resultCh - } else { - cmd.resultCh <- requestResult{error: err} - close(cmd.resultCh) - } - - case remote.Response: - if cmd.Notification != nil { - } else if ch, found := pending[cmd.ID]; found { - ch <- requestResult{Response: cmd} - close(ch) - delete(pending, cmd.ID) - } - } - } -} - -func (c *Client) Close() error { - return nil -} - -func (c *Client) Request(request remote.Request) (remote.Response, error) { - ch := make(chan requestResult, 1) - c.ctl <- requestCmd{request, ch} - result := <-ch - return result.Response, result.error -} - - diff --git a/lib/protocol/remote/types.go b/lib/protocol/remote/types.go deleted file mode 100644 index c0df85f6ef56a772201bd0c6cc8bb9293aa8c3e6..0000000000000000000000000000000000000000 --- a/lib/protocol/remote/types.go +++ /dev/null @@ -1,125 +0,0 @@ -package remote - -import ( - "encoding/json" - "fmt" - - "gitlab.irstea.fr/guillaume.perreal/coord/lib/protocol" -) - -type ( - SubscriptionID string - - RequestID uint64 - - RequestIDGenerator uint64 - - Request struct { - ID RequestID `json:"id,omitempty"` - Record *protocol.Event `json:"record,omitempty"` - Subscribe *protocol.TopicID `json:"subscribe,omitempty"` - Unsubscribe *SubscriptionID `json:"unsubscribe,omitempty"` - } - - Response struct { - ID RequestID `json:"id,omitempty"` - Recorded bool `json:"recorded,omitempty"` - Subscribed *SubscriptionID `json:"subscribed,omitempty"` - Unsubscribed *SubscriptionID `json:"unsubscribed,omitempty"` - Notification *protocol.Event `json:"notification,omitempty"` - Error *string `json:"error,omitempty"` - } -) - -func (e protocol.Event) String() string { - return fmt.Sprintf("{topic=%q, payload=%q}", string(e.Topic), string(e.Payload)) -} - -var DefaultRequestIDGenerator = RequestIDGenerator(0) - -func (g *RequestIDGenerator) Next() RequestID { - *g += 1 - return RequestID(*g) -} - -func (s SubscriptionID) String() string { - return string(s) -} - -func (r Request) String() string { - if bytes, err := json.Marshal(r); err == nil { - return string(bytes) - } else { - return err.Error() - } -} - -func (r Request) GoString() string { - if r.Record != nil { - return fmt.Sprintf("protocol.Record(%v)", *r.Record) - } else if r.Subscribe != nil { - return fmt.Sprintf("protocol.Subscribe(%q)", *r.Subscribe) - } else if r.Unsubscribe != nil { - return fmt.Sprintf("protocol.Unsubscribe(%q)", *r.Unsubscribe) - } else { - return `errors.New("invalid request")` - } -} - -func Record(ev protocol.Event) Request { return Request{ID: DefaultRequestIDGenerator.Next(), Record: &ev} } -func Subscribe(topic protocol.TopicID) Request { - return Request{ID: DefaultRequestIDGenerator.Next(), Subscribe: &topic} -} -func Unsubscribe(id SubscriptionID) Request { - return Request{ID: DefaultRequestIDGenerator.Next(), Unsubscribe: &id} -} - -func (r Response) String() string { - if bytes, err := json.Marshal(r); err == nil { - return string(bytes) - } else { - return err.Error() - } -} - -func (r Response) GoString() string { - if r.Recorded { - return "protocol.Recorded()" - } else if r.Subscribed != nil { - return fmt.Sprintf("protocol.Subscribed(%q)", *r.Subscribed) - } else if r.Unsubscribed != nil { - return fmt.Sprintf("protocol.Unsubscribed(%q)", *r.Unsubscribed) - } else if r.Notification != nil { - return fmt.Sprintf("protocol.Notification(%v)", *r.Notification) - } else if r.Error != nil { - return fmt.Sprintf("protocol.Error(%v)", *r.Error) - } else { - return `errors.New("invalid response")` - } -} - -func Recorded(id RequestID) Response { - return Response{ID: id, Recorded: true} -} - -func Subscribed(id RequestID, sub SubscriptionID) Response { - return Response{ID: id, Subscribed: &sub} -} - -func Unsubscribed(id RequestID, sub SubscriptionID) Response { - return Response{ID: id, Unsubscribed: &sub} -} - -func Notification(ev protocol.Event) Response { - return Response{Notification: &ev} -} - -func Error(id RequestID, err error) Response { - message := err.Error() - return Response{ID: id, Error: &message} -} - -func Errorf(id RequestID, template string, values ...interface{}) Response { - message := fmt.Sprintf(template, values...) - return Response{ID: id, Error: &message} -} diff --git a/lib/protocol/stream.go b/lib/protocol/stream.go deleted file mode 100644 index 5aa033490281a4809dabc699f5d867d1799bd2fc..0000000000000000000000000000000000000000 --- a/lib/protocol/stream.go +++ /dev/null @@ -1,200 +0,0 @@ -package protocol - -import ( - "context" - "errors" - "sync" - - "gitlab.irstea.fr/guillaume.perreal/coord/lib/log" -) - -type ( - Stream struct { - ctl chan interface{} - ctx context.Context - stop context.CancelFunc - done sync.WaitGroup - - origin element - front *element - subscriptions map[string]*subscriptions - } - - subscriptions struct { - TopicMatcher - subscribers []Subscriber - } - - element struct { - next *element - Event - } - - recordCmd struct { - event Event - response chan<- struct{} - } - - subscribeCmd struct { - topic TopicID - subscriber Subscriber - response chan<- CloseFunc - } - - unsubscribeCmd struct { - key string - subscriber Subscriber - response chan<- struct{} - } -) - -var ( - ErrClosedStream = errors.New("closed stream") -) - -func StartStream() *Stream { - ctx, cancel := context.WithCancel(context.Background()) - s := &Stream{ctl: make(chan interface{}), ctx: ctx, stop: cancel, subscriptions: make(map[string]*subscriptions)} - go s.serve() - return s -} - -func (s *Stream) serve() { - s.done.Add(1) - defer s.done.Done() - - for { - select { - case <-s.ctx.Done(): - return - - case untypedCmd := <-s.ctl: - switch cmd := untypedCmd.(type) { - case recordCmd: - s.record(cmd.event) - close(cmd.response) - - case subscribeCmd: - cmd.response <- s.subscribe(cmd.topic, cmd.subscriber) - close(cmd.response) - - case unsubscribeCmd: - s.unsubscribe(cmd.key, cmd.subscriber) - close(cmd.response) - - default: - log.Errorf("unknown stream command: %#v", cmd) - } - } - } - -} - -func (s *Stream) record(event Event) { - s.front.next = &element{Event: event} - s.front = s.front.next - for _, subs := range s.subscriptions { - _ = subs.HandleEvent(event) - } -} - -func (s *Stream) subscribe(topic TopicID, subscriber Subscriber) CloseFunc { - key := string(topic) - entry, exists := s.subscriptions[key] - if !exists { - entry = &subscriptions{TopicMatcher: MakeMatcher(key)} - s.subscriptions[key] = entry - } - entry.register(subscriber) - - s.replay(entry.TopicMatcher, subscriber) - - return func() error { - response := make(chan struct{}) - if err := s.send(unsubscribeCmd{key, subscriber,response}); err != nil { - return err - } - return nil - } -} - -func (s *Stream) unsubscribe(key string, subscriber Subscriber) { - entry, exists := s.subscriptions[key] - if !exists { - return - } - entry.unregister(subscriber) - if len(entry.subscribers) == 0 { - delete(s.subscriptions, key) - } -} - -func (s *Stream) replay(matcher TopicMatcher, sub Subscriber) { - for cursor := s.origin.next; cursor != nil; cursor = cursor.next { - if matcher.Match(cursor.Topic) { - _ = sub.HandleEvent(cursor.Event) - } - } -} - -func (s *Stream) send(cmd interface{}) error { - select { - case <-s.ctx.Done(): - return ErrClosedStream - default: - s.ctl <- cmd - return nil - } -} - -func (s *Stream) Close() error { - select { - case <-s.ctx.Done(): - return ErrClosedStream - default: - s.stop() - s.done.Wait() - close(s.ctl) - return nil - } -} - -func (s *Stream) Record(event Event) error { - response := make(chan struct{}) - if err := s.send(recordCmd{event, response}); err != nil { - return err - } - <-response - return nil -} - -func (s *Stream) Subscribe(topic TopicID, subscriber Subscriber) (CloseFunc, error) { - response := make(chan CloseFunc) - if err := s.send(subscribeCmd{topic, subscriber, response}); err != nil { - return nil, err - } - return <-response, nil -} - -func (s *subscriptions) HandleEvent(event Event) error { - if s.Match(event.Topic) { - for _, sub := range s.subscribers { - _ = sub.HandleEvent(event) - } - } - return nil -} - -func (s *subscriptions) register(subscriber Subscriber) { - s.subscribers = append(s.subscribers, subscriber) -} - -func (s *subscriptions) unregister(subscriber Subscriber) { - for i, sub := range s.subscribers { - if sub == subscriber { - n := copy(s.subscribers[i:], s.subscribers[i+1:]) - s.subscribers = s.subscribers[:i+n] - return - } - } -} diff --git a/lib/protocol/stream_test.go b/lib/protocol/stream_test.go deleted file mode 100644 index ee0f4f57608fb40ce93cb17113eefaa96f95ee42..0000000000000000000000000000000000000000 --- a/lib/protocol/stream_test.go +++ /dev/null @@ -1,60 +0,0 @@ -package protocol - -import ( - "fmt" -) - -func ExampleBasic() { - stream := StartStream() - - _ = stream.Append(Event{Topic: "a"}) - _ = stream.Append(Event{Topic: "b"}) - - handler := func(event Event) { fmt.Println(event) } - sub, _ := stream.Subscribe("a*", handler) - - _ = stream.Append(Event{Topic: "a"}) - _ = stream.Append(Event{Topic: "b"}) - _ = stream.Append(Event{Topic: "amma"}) - _ = stream.Append(Event{Topic: "c"}) - - _ = sub.Close() - - _ = stream.Append(Event{Topic: "a"}) - - _ = stream.Close() - - // Output: - // {a <nil>} - // {a <nil>} - // {amma <nil>} -} - -func ExampleMulti() { - stream := StartStream() - - _ = stream.Append(Event{Topic: "a"}) - _ = stream.Append(Event{Topic: "c"}) - - topics := []string{"a", "b"} - subs := make([]*Subscription, len(topics)) - - for i, topic := range topics { - idx := i - handler := func(event Event) { fmt.Println(idx, event) } - subs[i], _ = stream.Subscribe(topic, handler) - } - - _ = stream.Append(Event{Topic: "a"}) - _ = stream.Append(Event{Topic: "b"}) - _ = stream.Append(Event{Topic: "c"}) - _ = stream.Append(Event{Topic: "b"}) - - _ = stream.Close() - - // Output: - // 0 {a <nil>} - // 0 {a <nil>} - // 1 {b <nil>} - // 1 {b <nil>} -} diff --git a/lib/protocol/topic.go b/lib/protocol/topic.go deleted file mode 100644 index f10f730d78d8a002c0bee5ee41eb0f0f2e671acd..0000000000000000000000000000000000000000 --- a/lib/protocol/topic.go +++ /dev/null @@ -1,43 +0,0 @@ -package protocol - -import ( - "fmt" - "path" - "strings" -) - -type ( - TopicID string - - TopicMatcher interface { - fmt.Stringer - Match(TopicID) bool - } - - globTopicMatcher string -) - -func (s TopicID) Match(other TopicID) bool { - return s == other} - -func (s TopicID) String() string { - return string(s) -} - -func (m globTopicMatcher) Match(other TopicID) bool { - ok, _ := path.Match(string(m), string(other)) - return ok -} - -func (m globTopicMatcher) String() string { - return string(m) -} - -func MakeMatcher(spec string) TopicMatcher { - if strings.ContainsAny(spec, "*?[]{}") { - if _, err := path.Match(spec, "FOOBAR"); err == nil{ - return globTopicMatcher(spec) - } - } - return TopicID(spec) -} diff --git a/lib/protocol/topic_test.go b/lib/protocol/topic_test.go deleted file mode 100644 index 5f2f209cf0887a9584a4b88ea5c54b7d3e3026e0..0000000000000000000000000000000000000000 --- a/lib/protocol/topic_test.go +++ /dev/null @@ -1,41 +0,0 @@ -package protocol - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestMatcher_Match(t *testing.T) { - type M map[TopicID]assert.BoolAssertionFunc - - testCases := []struct { - Pattern string - IDS M - }{ - {"a", M{"a": assert.True, "b": assert.False}}, - {"*", M{"a": assert.True, "b": assert.True, "a.b": assert.True}}, - {"a.*", M{"a": assert.False, "b": assert.False, "a.b": assert.True, "c.a.c": assert.False}}, - } - - for _, testCase := range testCases { - t.Run(string(testCase.Pattern), func(t *testing.T) { - matcher := MakeMatcher(testCase.Pattern) - for id, expected := range testCase.IDS { - t.Run(string(id), func(t *testing.T) { - expected(t, Match(id)) - }) - } - }) - } -} - -func TestMakeMatcher(t *testing.T) { - m1 := MakeMatcher("a") - m2 := MakeMatcher("a") - m3 := MakeMatcher("b") - - assert.Equal(t, m1, m2) - assert.NotEqual(t, m1, m3) - assert.NotEqual(t, m2, m3) -} diff --git a/lib/safe/goroutine.go b/lib/safe/goroutine.go deleted file mode 100644 index a1a9a8e6de10ea64194ceeb886f19193cd5feb0b..0000000000000000000000000000000000000000 --- a/lib/safe/goroutine.go +++ /dev/null @@ -1,35 +0,0 @@ -package safe - -import ( - "log" - "runtime/debug" -) - -// Go runs a goroutine with a default recovery function. -func Go(goroutine func()) { - GoWithRecovery(goroutine, defaultRecovery) -} - -// Go runs a goroutine with a custom recovery function. -func GoWithRecovery(goroutine func(), recovery func(err interface{})) { - go func() { - defer func() { - if rec := recover(); rec != nil { - recovery(rec) - } - }() - goroutine() - }() -} - -// GoWithStopChan runs a goroutine that can be stopped by closing a channel -func GoWithStopper(goroutine func(stop <-chan struct{})) (stopper *StopOnce) { - stopper = new(StopOnce) - Go(func() { goroutine(stopper.Stopped()) }) - return -} - -func defaultRecovery(err interface{}) { - log.Printf("Error in Go routine: %s", err) - log.Printf("Stack: %s", debug.Stack()) -} \ No newline at end of file diff --git a/lib/safe/goroutine_test.go b/lib/safe/goroutine_test.go deleted file mode 100644 index 60e4aa2399f7117c4b2c602a500c3f494d7b0b79..0000000000000000000000000000000000000000 --- a/lib/safe/goroutine_test.go +++ /dev/null @@ -1,81 +0,0 @@ -package safe_test - -import ( - "fmt" - "gitlab.irstea.fr/guillaume.perreal/coord/lib/safe" - "sync" - "testing" -) - -func ExampleGo() { - var wg sync.WaitGroup - - wg.Add(1) - - safe.Go(func() { - fmt.Println("ok") - wg.Done() - }) - - wg.Wait() - - // Output: - // ok -} - - -func TestGoPanic(t *testing.T) { - var wg sync.WaitGroup - - wg.Add(1) - - safe.Go(func() { - defer wg.Done() - panic("woo") - }) - - wg.Wait() -} - -func ExampleGoWithRecovery() { - var wg sync.WaitGroup - - wg.Add(1) - - safe.GoWithRecovery(func() { - defer wg.Done() - panic("woo") - }, func(err interface{}) { - fmt.Println(err) - }) - - wg.Wait() - - // Output: - // woo -} - - -func ExampleGoWithStopper() { - var wg sync.WaitGroup - wg.Add(2) - - stopper := safe.GoWithStopper(func(stop <-chan struct {}) { - defer wg.Done() - fmt.Println("working") - <- stop - fmt.Println("stopped") - }) - - go func() { - defer wg.Done() - stopper.Stop() - }() - - wg.Wait() - - // Output: - // working - // stopped -} - diff --git a/lib/safe/stopper.go b/lib/safe/stopper.go deleted file mode 100644 index a648389c0b963f820632e4a731a1a0da49c9d2b1..0000000000000000000000000000000000000000 --- a/lib/safe/stopper.go +++ /dev/null @@ -1,62 +0,0 @@ -package safe - -import ( - "io" - "sync" -) - -// Stopper is something you can stop -type Stopper interface { - Stop() -} - -// StopOnce helps stopping things once. -type StopOnce struct { - c chan struct{} - once sync.Once -} - -// Stopped returns a channel that is closed when StopOnce is stopped -func (s *StopOnce) Stopped() <-chan struct{} { - if s.c == nil { - s.c = make(chan struct{}) - } - return s.c -} - -// Stop implements Stopper -func (s *StopOnce) Stop() { - s.once.Do(func() { - _ = s.Stopped() - close(s.c) - }) -} - -// Close implements io.Closer -func (s *StopOnce) Close() error { - s.Stop() - return nil -} - -// OnStop registers a callback to be fired on stop. -func (s *StopOnce) OnStop(callback func()) { - Go(func() { - <-s.Stopped() - callback() - }) -} - -// BindStopper closes another Stopper on stop -func (s *StopOnce) BindStopper(other Stopper) { - s.OnStop(other.Stop) -} - -// BindCloser closes the given io.Closer on stop -func (s *StopOnce) BindCloser(c io.Closer) { - s.OnStop(func() { _ = c.Close() }) -} - -// BindChan closes the given channel on stop -func (s *StopOnce) BindChan(c chan interface{}) { - s.OnStop(func() { close(c) }) -} diff --git a/lib/safe/stopper_test.go b/lib/safe/stopper_test.go deleted file mode 100644 index 9a0f47558b3cfa124affc4ccb3b0950dc599e9ef..0000000000000000000000000000000000000000 --- a/lib/safe/stopper_test.go +++ /dev/null @@ -1,35 +0,0 @@ -package safe_test - -import ( - "fmt" - "gitlab.irstea.fr/guillaume.perreal/coord/lib/safe" - "sync" -) - -func ExampleStopOnce() { - var wg sync.WaitGroup - wg.Add(2) - - var s safe.StopOnce - - go func() { - defer wg.Done() - <- s.Stopped() - fmt.Println("Stopped()") - }() - - s.OnStop(func() { - defer wg.Done() - fmt.Println("Callback") - }) - - s.Stop() - _ = s.Close() - s.Stop() - - wg.Wait() - - // Unordered output: - // Stopped() - // Callback -}