diff --git a/cmd/coord/command.go b/cmd/coord/command.go new file mode 100644 index 0000000000000000000000000000000000000000..0328f47efbbe4d45da7f453ee8aac24ae8a1bbd0 --- /dev/null +++ b/cmd/coord/command.go @@ -0,0 +1,94 @@ +package main + +import ( + "context" + "errors" + "os" + "os/exec" + "os/signal" + "strings" + "syscall" + + "gitlab.irstea.fr/guillaume.perreal/coord/lib/log" +) + +type ( + Command struct { + *exec.Cmd + ctx context.Context + cancel func() + } +) + +func NewCommand(command []string) (*Command, error) { + if len(command) < 1 { + return nil, errors.New("a command is required is required") + } + + cmdName := command[0] + arguments := command[1:] + + cmdPath, err := exec.LookPath(cmdName) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithCancel(context.Background()) + + cmd := exec.CommandContext(ctx, cmdPath, arguments...) + cmd.Stdin, cmd.Stdout, cmd.Stderr = os.Stdin, os.Stdout, os.Stderr + + return &Command{cmd, ctx, cancel}, nil +} + +func (c *Command) String() string { + return strings.Join(c.Args, " ") +} + +func (c *Command) Start() error { + err := c.Cmd.Start() + if err != nil { + return err + } + log.Noticef("started command `%s`, PID: %d", c, c.Process.Pid) + + go c.forwardSignals() + + return nil +} + +func (c *Command) Wait() error { + defer func() { + c.cancel() + log.Noticef("command terminated: %s", c.Cmd.ProcessState) + }() + return c.Cmd.Wait() +} + +func (c *Command) forwardSignals() { + signals := make(chan os.Signal) + signal.Notify(signals) + + defer func() { + signal.Stop(signals) + close(signals) + log.Debug("stopped forwarding signals") + }() + + log.Debug("starting forwarding signals") + for { + select { + case <-c.ctx.Done(): + return + case sig := <-signals: + if sig == syscall.SIGCHLD { + continue + } + if err := c.Cmd.Process.Signal(sig); err != nil { + log.Warningf("could not forward signal %s: %s", sig, err) + } else { + log.Infof("forwarded signal %s", sig) + } + } + } +} diff --git a/cmd/coord/conn.go b/cmd/coord/conn.go new file mode 100644 index 0000000000000000000000000000000000000000..51e724fd3521cf7a9b8d0c7b7d0f2154de774183 --- /dev/null +++ b/cmd/coord/conn.go @@ -0,0 +1,34 @@ +package main + +import ( + "encoding/json" + + "github.com/nats-io/stan.go" + + "gitlab.irstea.fr/guillaume.perreal/coord/lib/log" +) + +type ( + Conn struct { + stan.Conn + Subject string + } +) + +func NewConn(clusterID string, clientId string, url string, subject string) (*Conn, error) { + conn, err := stan.Connect(clusterID, clientId, stan.NatsURL(url)) + if err != nil { + return nil, err + } + log.Debugf("connected to %s as %s", conn.NatsConn().ConnectedUrl(), clientId) + + return &Conn{conn, subject}, nil +} + +func (c *Conn) Send(message interface{}) error { + bytes, err := json.Marshal(message); + if err != nil { + return err + } + return c.Conn.Publish(c.Subject, bytes) +} diff --git a/cmd/coord/coord b/cmd/coord/coord new file mode 100755 index 0000000000000000000000000000000000000000..c88cf1aa64deabc7f772e2bdc3e18da624fbbbd4 Binary files /dev/null and b/cmd/coord/coord differ diff --git a/cmd/coord/main.go b/cmd/coord/main.go index dc59c294404deb3bf711e29fd411e95717fc859c..fe058b9581c91f5bbdbce00a20e4cfea21b5fd93 100644 --- a/cmd/coord/main.go +++ b/cmd/coord/main.go @@ -1,9 +1,12 @@ -package coord +package main import ( + "crypto/sha1" + "encoding/json" + "fmt" "os" - "strconv" - "time" + "strings" + "sync" "github.com/mkideal/cli" "github.com/nats-io/nats.go" @@ -14,14 +17,33 @@ import ( type ( Config struct { - ServerURL string `cli:"s,server" usage:"URL of the NATS server" dft:"$NATS_URL"` - ClusterID string `cli:"c,clusterID" usage:"Cluster ID of the NATS Streaming server" dft:"$NATS_CLUSTER_ID"` - cli.AutoHelper + ServerURL string `cli:"s,server" usage:"URL of the NATS server" dft:"$NATS_URL"` + ClusterID string `cli:"c,clusterID" usage:"Cluster ID of the NATS Streaming server" dft:"$NATS_CLUSTER_ID"` + Subject string `cli:"S,subject" usage:"Reporting subject"` + Wait []string `cli:"w,wait" usage:"wait for a success event on given channel before starting the command"` + Durable string `cli:"n,name" usage:"persistent client name for event replay"` + cli.Helper + } + + StartedEvent struct { + Command []string + PID int + } + + TerminatedEvent struct { + PID int + Success bool + ExitCode int + Comment string + } + + ErrorEvent struct { + Error error } ) func main() { - config := &Config{ + config := Config{ ServerURL: nats.DefaultURL, ClusterID: "https://gitlab.irstea.fr/guillaume.perreal/coord", } @@ -29,16 +51,96 @@ func main() { } func Run(ctx *cli.Context) error { + cmd, err := NewCommand(ctx.Args()) + if err != nil { + return err + } + + hostname, err := os.Hostname() + if err != nil { + return err + } + + hash := sha1.New() + hash.Write([]byte(strings.Join(cmd.Args, " "))) + clientId := fmt.Sprintf("coord-%x", hash.Sum([]byte(hostname))) + config := ctx.Argv().(*Config) - clientId := "coord." + strconv.FormatInt(time.Now().UnixNano()+int64(os.Getpid()), 36) + if config.Subject == "" { + config.Subject = fmt.Sprintf("coord.cmd.%x", hash.Sum(nil)) + } - conn, err := stan.Connect(config.ClusterID, clientId, stan.NatsURL(config.ServerURL)) + conn, err := NewConn(config.ClusterID, clientId, config.ServerURL, config.Subject) if err != nil { return err } defer conn.Close() - log.Debugf("connected to %s", conn.NatsConn().ConnectedUrl()) + var opts []stan.SubscriptionOption + if config.Durable == "" { + opts = append(opts, stan.DurableName(clientId)) + } else { + opts = append(opts, stan.DurableName(config.Durable)) + } + + before := sync.WaitGroup{} + for _, channel := range config.Wait { + if sub, err := waitFor(conn, channel, &before, opts); err != nil { + return err + } else { + defer sub.Close() + } + } + + before.Wait() + + err = cmd.Start() + if err != nil { + _ = conn.Send(ErrorEvent{err}) + } else { + _ = conn.Send(StartedEvent{cmd.Args, cmd.Process.Pid}) + } + + err = cmd.Wait() + if err != nil { + _ = conn.Send(ErrorEvent{err}) + } else { + state := cmd.ProcessState + _ = conn.Send(TerminatedEvent{state.Pid(), state.Success(), state.ExitCode(), state.String()}) + } return nil } + +func waitFor(conn *Conn, channel string, wg *sync.WaitGroup, opts []stan.SubscriptionOption) (sub stan.Subscription, err error) { + var once sync.Once + + sub, err = conn.Subscribe(channel, func(msg *stan.Msg) { + log.Debugf("received message %v", msg) + var event struct { + *StartedEvent + *ErrorEvent + *TerminatedEvent + } + if err := json.Unmarshal(msg.Data, &event); err != nil { + log.Debugf("could not unmarshal to terminated event: %s", msg) + return + } + if event.TerminatedEvent == nil { + return + } + log.Debugf("%s terminated: %v", channel, event) + if event.Success { + once.Do(func() { + log.Debugf("%s succeeded", channel) + wg.Done() + }) + } + }, opts...) + + if err == nil { + wg.Add(1) + } + + return +} diff --git a/go.mod b/go.mod index 1f28e814a5002bd63a0d93f890610dc7f658f447..6fd5c6bea82290a55182b487d61ba115a9b88606 100644 --- a/go.mod +++ b/go.mod @@ -4,11 +4,20 @@ go 1.12 require ( github.com/Bowery/prompt v0.0.0-20190419144237-972d0ceb96f5 // indirect + github.com/hashicorp/go-hclog v0.9.2 // indirect + github.com/hashicorp/go-immutable-radix v1.1.0 // indirect + github.com/hashicorp/go-uuid v1.0.1 // indirect + github.com/hashicorp/golang-lru v0.5.1 // indirect github.com/labstack/gommon v0.2.9 // indirect github.com/mkideal/cli v0.0.3 github.com/mkideal/pkg v0.0.0-20170503154153-3e188c9e7ecc // indirect + github.com/nats-io/jwt v0.2.8 // indirect github.com/nats-io/nats-streaming-server v0.15.1 // indirect github.com/nats-io/nats.go v1.8.1 github.com/nats-io/stan.go v0.5.0 - golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443 // indirect + github.com/prometheus/procfs v0.0.3 // indirect + go.etcd.io/bbolt v1.3.3 // indirect + golang.org/x/crypto v0.0.0-20190621222207-cc06ce4a13d4 // indirect + golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0 // indirect + google.golang.org/appengine v1.6.1 // indirect ) diff --git a/go.sum b/go.sum index 420e7a1a8a786aee3667edd8c3576f4902fe1121..9edff036e95e92da1a9ba87363fdbd191f4c4d60 100644 --- a/go.sum +++ b/go.sum @@ -17,18 +17,27 @@ github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zV github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-hclog v0.9.1 h1:9PZfAcVEvez4yhLH2TBU64/h/z4xlFI80cWXRrxuKuM= github.com/hashicorp/go-hclog v0.9.1/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ= +github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI= +github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ= github.com/hashicorp/go-immutable-radix v1.0.0 h1:AKDB1HM5PWEA7i4nhcpwOrO2byshxBjXVn/J/3+z5/0= github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= +github.com/hashicorp/go-immutable-radix v1.1.0 h1:vN9wG1D6KG6YHRTWr8512cxGOVgTMEfgEdSj/hr8MPc= +github.com/hashicorp/go-immutable-radix v1.1.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= github.com/hashicorp/go-msgpack v0.5.5 h1:i9R9JSrqIz0QVLz3sz+i3YJdT7TTSLcfLLzJi9aZTuI= github.com/hashicorp/go-msgpack v0.5.5/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCSnM= github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE= +github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU= +github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/raft v1.1.0 h1:qPMePEczgbkiQsqCsRfuHRqvDUO+zmAInDaD5ptXlq0= github.com/hashicorp/raft v1.1.0/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7HVbyzqM= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= @@ -48,6 +57,8 @@ github.com/mkideal/pkg v0.0.0-20170503154153-3e188c9e7ecc h1:eyN9UWVX+CeeCQZPudC github.com/mkideal/pkg v0.0.0-20170503154153-3e188c9e7ecc/go.mod h1:DECgB56amjU/mmmsKuooNPQ1856HASOMC3D4ntSVU70= github.com/nats-io/jwt v0.2.6 h1:eAyoYvGgGLXR2EpnsBUvi/FcFrBqN6YKFVbOoEfPN4k= github.com/nats-io/jwt v0.2.6/go.mod h1:mQxQ0uHQ9FhEVPIcTSKwx2lqZEpXWWcCgA7R6NrWvvY= +github.com/nats-io/jwt v0.2.8 h1:PXr0mRjPCPX4cXsdfHcsqoplrNXnKOD+g2yHoh9qy1I= +github.com/nats-io/jwt v0.2.8/go.mod h1:mQxQ0uHQ9FhEVPIcTSKwx2lqZEpXWWcCgA7R6NrWvvY= github.com/nats-io/nats-server/v2 v2.0.0 h1:rbFV7gfUPErVdKImVMOlW8Qb1V22nlcpqup5cb9rYa8= github.com/nats-io/nats-server/v2 v2.0.0/go.mod h1:RyVdsHHvY4B6c9pWG+uRLpZ0h0XsqiuKp2XCTurP5LI= github.com/nats-io/nats-streaming-server v0.15.1 h1:NLQg18mp68e17v+RJpXyPdA7ZH4osFEZQzV3tdxT6/M= @@ -72,6 +83,8 @@ github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7q github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2 h1:6LJUbpNm42llc4HRCuvApCSWB/WfhuNo9K98Q9sNGfs= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= +github.com/prometheus/procfs v0.0.3 h1:CTwfnzjQ+8dS6MhHHu4YswVAD99sL2wjPqP+VkURmKE= +github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ= github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= @@ -83,22 +96,36 @@ github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyC github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= go.etcd.io/bbolt v1.3.2 h1:Z/90sZLPOeCy2PwprqkFa25PdkusRzaj9P8zm/KNyvk= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= +go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk= +go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443 h1:IcSOAf4PyMp3U3XbIEj1/xJ2BjNN2jWv7JoyOsMxXUU= -golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190621222207-cc06ce4a13d4 h1:ydJNl0ENAG67pFbB+9tfhiL2pYqLhfoaZFw/cjLhY4A= +golang.org/x/crypto v0.0.0-20190621222207-cc06ce4a13d4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190602015325-4c4f7f33c9ed h1:uPxWBzB3+mlnjy9W58qY1j/cjyFjutgw/Vhan2zLy/A= golang.org/x/sys v0.0.0-20190602015325-4c4f7f33c9ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0 h1:HyfiK1WMnHj5FXFXatD+Qs1A/xC2Run6RzeW1SyHxpc= +golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= google.golang.org/appengine v1.6.0 h1:Tfd7cKwKbFRsI8RMAD3oqqw7JPFRrvFlOsfbgVkjOOw= google.golang.org/appengine v1.6.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/appengine v1.6.1 h1:QzqyMA1tlu6CgqCDUtU9V+ZKhLFT2dkJuANu5QaxI3I= +google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0=