diff --git a/cmd/coord-server/connection.go b/cmd/coord-server/connection.go index a6eed4f741509f103ac5fc5bfd403612d06615aa..e52481562beca6845f736b58ad27c96ccd4ed8ac 100644 --- a/cmd/coord-server/connection.go +++ b/cmd/coord-server/connection.go @@ -4,12 +4,12 @@ import ( "bufio" "encoding/json" "io" - "log" "sync" "github.com/google/uuid" "github.com/joomcode/errorx" + "gitlab.irstea.fr/guillaume.perreal/coord/lib/log" "gitlab.irstea.fr/guillaume.perreal/coord/lib/protocol" "gitlab.irstea.fr/guillaume.perreal/coord/lib/safe" ) @@ -24,15 +24,18 @@ type Connection struct { closing bool done sync.WaitGroup + + *log.Logger } -func NewConnection(conn io.ReadWriteCloser, stream *Stream) *Connection { +func NewConnection(conn io.ReadWriteCloser, stream *Stream, logger *log.Logger) *Connection { c := &Connection{ StopOnce: new(safe.StopOnce), stream: stream, conn: conn, responses: make(chan protocol.Response, 1), subscriptions: make(map[protocol.SubscriptionID]*Subscription), + Logger: logger, } c.OnStop(c.startClosing) @@ -49,7 +52,7 @@ func NewConnection(conn io.ReadWriteCloser, stream *Stream) *Connection { func (c *Connection) startClosing() { if !c.closing { - log.Println("closing connection") + c.Debug("closing connection") c.closing = true } } @@ -57,20 +60,30 @@ func (c *Connection) startClosing() { func (c *Connection) cleanup() { c.done.Wait() close(c.responses) - log.Println("connection closed") + c.Info("connection closed") } func (c *Connection) readRequests() { defer c.startClosing() defer c.done.Done() + c.Info("accepting requests") scanner := bufio.NewScanner(c.conn) for !c.closing && scanner.Scan() { var request protocol.Request - if err := json.Unmarshal(scanner.Bytes(), &request); err != nil { + + data := scanner.Bytes() + if len(data) == 0 { + continue + } + + if err := json.Unmarshal(data, &request); err != nil { + c.Debugf("invalid request: `%s`", data) c.responses <- protocol.Error(errorx.Decorate(err, "invalid request")) continue } + c.Debugf("received request: %v", request) + response, err := c.handleRequest(request) if err != nil { response = protocol.Error(errorx.Decorate(err, "handling request")) @@ -80,7 +93,7 @@ func (c *Connection) readRequests() { } if err := scanner.Err(); err != nil { - log.Println("error reading requests:", err) + c.Errorf("error reading requests: %s", err) } } @@ -90,9 +103,11 @@ func (c *Connection) writeResponses() { for response := range c.responses { if data, err := json.Marshal(response); err != nil { - log.Println("could not marshal response:", err) + c.Errorf("could not marshal response: %s", err) } else if _, err := c.conn.Write(append(data, '\n')); err != nil { - log.Println("could not write response:", err) + c.Errorf("could not write response: %s", err) + } else { + c.Debugf("sent response: `%s`", data) } } } diff --git a/cmd/coord-server/main.go b/cmd/coord-server/main.go index 9dd03e1bf5bb0e47adaad8ec704a221dd88532dd..73b8afea5edb7734a158a5c1ad3707b24b2000ba 100644 --- a/cmd/coord-server/main.go +++ b/cmd/coord-server/main.go @@ -1,8 +1,9 @@ package main import ( + "fmt" + "gitlab.irstea.fr/guillaume.perreal/coord/lib/log" "io" - "log" "net" "os" "os/signal" @@ -22,10 +23,10 @@ func main() { startConsoleConnection(stream) if err := startUnixListener(stream); err != nil { - log.Println("could not start socket listener:", err) + log.Errorf("could not start socket listener: %s", err) } if err := startTCPListener(stream); err != nil { - log.Println("could not start TCP listener:", err) + log.Errorf("could not start TCP listener: %s", err) } signals := make(chan os.Signal, 1) @@ -33,17 +34,17 @@ func main() { select { case <-stop.Stopped(): - log.Println("console connection closed") case sig := <-signals: - log.Printf("received signal '%s', exiting", sig) + log.Criticalf("received signal '%s', exiting", sig) } } func startConsoleConnection(stream *Stream) { - log.Println("listening to stdin") + logger := log.DefaultLogger.WithField("server", "console") NewConnection( PseudoReadWriteCloser{os.Stdin, os.Stdout}, stream, + logger, ) } @@ -59,9 +60,8 @@ func startUnixListener(stream *Stream) error { } listener.SetUnlinkOnClose(true) - log.Println("started unix listener on", addr) - safe.Go(func() { runListener(stream, listener) }) - + logger := log.DefaultLogger.WithField("server", fmt.Sprintf("%s://%s", addr.Net, addr.Name)) + safe.Go(func() { runListener(stream, listener, logger) }) return nil } @@ -76,25 +76,28 @@ func startTCPListener(stream *Stream) error { return err } - log.Println("started TCP listener on", addr) - safe.Go(func() { runListener(stream, listener) }) - + logger := log.DefaultLogger.WithField("server", fmt.Sprintf("%s://%s:%d", addr.Network(), addr.IP, addr.Port)) + safe.Go(func() { runListener(stream, listener, logger) }) return nil } -func runListener(stream *Stream, listener net.Listener) { +func runListener(stream *Stream, listener net.Listener, logger *log.Logger) { + defer logger.Noticef("closed") + logger.Noticef("accepting connections") + for { if subConn, err := waitNewConnection(listener); err == nil { - log.Println("accepted connection from", subConn.RemoteAddr()) - NewConnection(subConn, stream) + subLogger := logger.WithField("client", subConn.RemoteAddr()) + NewConnection(subConn, stream, subLogger) } else if netErr, ok := err.(net.Error); ok { if netErr.Timeout() { continue } - log.Println(err) if !netErr.Temporary() { + logger.Errorf("%s", err) return } + logger.Warningf("%s", err) } } } diff --git a/cmd/coord-server/stream.go b/cmd/coord-server/stream.go index ccbaedacba1b8909965c00339f86f623cb3baadf..f84e49d78d0ae76117eea242af05b92d37537b22 100644 --- a/cmd/coord-server/stream.go +++ b/cmd/coord-server/stream.go @@ -53,13 +53,15 @@ type Subscription struct { *safe.StopOnce } +var opener = protocol.Event{Topic: "READY"} + func (s *Stream) Subscribe(matcher protocol.TopicMatcher) *Subscription { events := make(chan protocol.Event, s.SubscriberChanSize) stopper := safe.GoWithStopper(func(stop <-chan struct{}) { defer close(events) - events <- protocol.Event{Topic: "READY"} + events <- opener cursor := &s.origin s.mu.RLock() diff --git a/lib/log/default.go b/lib/log/default.go index beddd59aeb0bcda9058e47f9afe9c9dcfb93d431..1f762d2454f5dd11534eb516966a33d3b08816be 100644 --- a/lib/log/default.go +++ b/lib/log/default.go @@ -1,24 +1,35 @@ package log -import "os" +import ( + "os" +) -var Default *Logger = WriterLogger(os.Stderr) +// DefaultLogger is the package logger. +var DefaultLogger = &Logger{NewLogWriter(DEBUG, os.Stderr, TextFormatter)} -func Debug(message string) { Default.Debug(message) } -func Info(message string) { Default.Info(message) } -func Notice(message string) { Default.Notice(message) } -func Warning(message string) { Default.Warning(message) } -func Error(message string) { Default.Error(message) } -func Critical(message string) { Default.Critical(message) } +// Debug logs a message at DEBUG level using the package logger. +func Debug(message string) { DefaultLogger.Debug(message) } +func Info(message string) { DefaultLogger.Info(message) } +func Notice(message string) { DefaultLogger.Notice(message) } +func Warning(message string) { DefaultLogger.Warning(message) } +func Error(message string) { DefaultLogger.Error(message) } +func Critical(message string) { DefaultLogger.Critical(message) } -func Debugf(template string, args ...interface{}) { Default.Debugf(template, args...) } -func Infof(template string, args ...interface{}) { Default.Infof(template, args...) } -func Noticef(template string, args ...interface{}) { Default.Noticef(template, args...) } -func Warningf(template string, args ...interface{}) { Default.Warningf(template, args...) } -func Errorf(template string, args ...interface{}) { Default.Errorf(template, args...) } -func Criticalf(template string, args ...interface{}) { Default.Criticalf(template, args...) } +func Debugf(template string, args ...interface{}) { DefaultLogger.Debugf(template, args...) } +func Infof(template string, args ...interface{}) { DefaultLogger.Infof(template, args...) } +func Noticef(template string, args ...interface{}) { DefaultLogger.Noticef(template, args...) } +func Warningf(template string, args ...interface{}) { DefaultLogger.Warningf(template, args...) } +func Errorf(template string, args ...interface{}) { DefaultLogger.Errorf(template, args...) } +func Criticalf(template string, args ...interface{}) { DefaultLogger.Criticalf(template, args...) } -var ErrorHandler func(err error) = defaultErrorHandler +// The errorHandler is used when something goes wrong in a logger. +var errorHandler = defaultErrorHandler + +// SetErrorHandler changes the error handler. +func SetErrorHandler(new func(err error)) (old func(err error)) { + errorHandler, old = new, errorHandler + return +} func defaultErrorHandler(err error) { _, _ = os.Stderr.WriteString("Error while logging: " + err.Error() + "\n") diff --git a/lib/log/entry.go b/lib/log/entry.go index 37180acf4997052fffac16b1746dc04c744f1bd2..7ab776c0f3ca8ee65f90a9fc296ea2d217217a22 100644 --- a/lib/log/entry.go +++ b/lib/log/entry.go @@ -2,20 +2,25 @@ package log import ( "fmt" + "os" + "path/filepath" "time" ) type Entry struct { - Timestamp time.Time - Level Level - Message string - Fields *Fields + time.Time + Category string + Level + Message string + *Fields } -func NewEntry(level Level, message string, fields *Fields) *Entry { - return &Entry{time.Now(), level, message, fields} +var DefaultCategory = filepath.Base(os.Args[0]) + +func NewEntry(level Level, message string) *Entry { + return &Entry{time.Now(), DefaultCategory, level, message, nil} } -func NewEntryf(level Level, template string, args []interface{}, fields *Fields) *Entry { - return NewEntry(level, fmt.Sprintf(template, args...), fields) +func NewEntryf(level Level, template string, args ...interface{}) *Entry { + return NewEntry(level, fmt.Sprintf(template, args...)) } diff --git a/lib/log/fields.go b/lib/log/fields.go index ccd860f14ee06af6ba24827626b8787b03612652..d0f0eb3d9680ea752d43b3e4fe40669e93fbdcbb 100644 --- a/lib/log/fields.go +++ b/lib/log/fields.go @@ -1,16 +1,24 @@ package log +// Fields is a linked list of (name, value) couple type Fields struct { - Name string - Value interface{} + name string + value interface{} previous *Fields } +// With creates new Fields that include the given (name, value) couple. func (f *Fields) With(name string, value interface{}) *Fields { return &Fields{name, value, f} } +// ForEach applies the given function for each (name, value) couple, as long as it returns true. func (f *Fields) ForEach(do func(string, interface{}) bool) { - for i := f; i != nil && do(i.Name, i.Value); i = i.previous { + for i := f; i != nil && do(i.name, i.value); i = i.previous { } } + +func (f *Fields) IsEmpty() bool { + return f == nil +} + diff --git a/lib/log/formatter.go b/lib/log/formatter.go new file mode 100644 index 0000000000000000000000000000000000000000..f051b45c2ebd1b84dd075abb3b7d94039e4fc291 --- /dev/null +++ b/lib/log/formatter.go @@ -0,0 +1,48 @@ +package log + +import ( + "fmt" + "strings" + "time" +) + +// A Formatter formats an entry. +type Formatter interface { + Format(*Entry) ([]byte, error) +} + +// A FormatterFunc implements Formatter using a single function. +type FormatterFunc func(*Entry) ([]byte, error) + +// Format implements Formatter. +func (f FormatterFunc) Format(entry *Entry) ([]byte, error) { return f(entry) } + +// TextFormatter formats an entry as one-line text. +var TextFormatter = FormatterFunc(func(entry *Entry) (output []byte, err error) { + var builder strings.Builder + + if _, err = fmt.Fprintf( + &builder, + "%s %s [%s]: %s", + entry.Time.Format(time.RFC3339), + entry.Category, + entry.Level, + entry.Message, + ); err != nil { + return + } + + entry.Fields.ForEach(func(name string, value interface{}) bool { + _, err = fmt.Fprintf(&builder, " %s=%v", name, value) + return err == nil + }) + if err != nil { + return + } + + if err = builder.WriteByte('\n'); err == nil { + output = []byte(builder.String()) + } + + return +}) diff --git a/lib/log/level.go b/lib/log/level.go index b9c3b8c5513a93cda78e3f1757e3ab30a3d33378..65548f394b7a5a902bc771ab09037a64d8fc04ca 100644 --- a/lib/log/level.go +++ b/lib/log/level.go @@ -1,5 +1,6 @@ package log +// Level represents the severity of a log entry. type Level byte const ( diff --git a/lib/log/logger.go b/lib/log/logger.go index 7565fb5eb076434633a2501722a4fae0e9e5ae9d..40586d8412faf06965889906240f1e78b4a04730 100644 --- a/lib/log/logger.go +++ b/lib/log/logger.go @@ -1,45 +1,85 @@ package log -type logger interface { +import "io" + +// RawLogger is the building brick of the package. +type RawLogger interface { Log(*Entry) } -type Logger struct { - logger - Fields *Fields -} +// RawLoggerFunc is a RawLogger implementation using a function. +type RawLoggerFunc func(*Entry) -func (l *Logger) WithField(name string, value interface{}) *Logger { - return &Logger{l, l.Fields.With(name, value)} -} +// Log implements RawLogger +func (f RawLoggerFunc) Log(entry *Entry) { f(entry) } + +// Logger adds convenience methods to a RawLogger. +type Logger struct{ inner RawLogger } -func (l *Logger) Debug(message string) { l.Log(NewEntry(DEBUG, message, l.Fields)) } -func (l *Logger) Info(message string) { l.Log(NewEntry(INFO, message, l.Fields)) } -func (l *Logger) Notice(message string) { l.Log(NewEntry(NOTICE, message, l.Fields)) } -func (l *Logger) Warning(message string) { l.Log(NewEntry(WARNING, message, l.Fields)) } -func (l *Logger) Error(message string) { l.Log(NewEntry(ERROR, message, l.Fields)) } -func (l *Logger) Critical(message string) { l.Log(NewEntry(CRITICAL, message, l.Fields)) } +func (l *Logger) Log(entry *Entry) { l.inner.Log(entry) } + +func (l *Logger) Debug(message string) { l.inner.Log(NewEntry(DEBUG, message)) } +func (l *Logger) Info(message string) { l.inner.Log(NewEntry(INFO, message)) } +func (l *Logger) Notice(message string) { l.inner.Log(NewEntry(NOTICE, message)) } +func (l *Logger) Warning(message string) { l.inner.Log(NewEntry(WARNING, message)) } +func (l *Logger) Error(message string) { l.inner.Log(NewEntry(ERROR, message)) } +func (l *Logger) Critical(message string) { l.inner.Log(NewEntry(CRITICAL, message)) } func (l *Logger) Debugf(template string, args ...interface{}) { - l.Log(NewEntryf(DEBUG, template, args, l.Fields)) + l.inner.Log(NewEntryf(DEBUG, template, args...)) } func (l *Logger) Infof(template string, args ...interface{}) { - l.Log(NewEntryf(INFO, template, args, l.Fields)) + l.inner.Log(NewEntryf(INFO, template, args...)) } func (l *Logger) Noticef(template string, args ...interface{}) { - l.Log(NewEntryf(NOTICE, template, args, l.Fields)) + l.inner.Log(NewEntryf(NOTICE, template, args...)) } func (l *Logger) Warningf(template string, args ...interface{}) { - l.Log(NewEntryf(WARNING, template, args, l.Fields)) + l.inner.Log(NewEntryf(WARNING, template, args...)) } func (l *Logger) Errorf(template string, args ...interface{}) { - l.Log(NewEntryf(ERROR, template, args, l.Fields)) + l.inner.Log(NewEntryf(ERROR, template, args...)) } func (l *Logger) Criticalf(template string, args ...interface{}) { - l.Log(NewEntryf(CRITICAL, template, args, l.Fields)) + l.inner.Log(NewEntryf(CRITICAL, template, args...)) +} + +func (l *Logger) WithField(name string, value interface{}) *Logger { + return &Logger{RawLoggerFunc(func(entry *Entry) { + entry.Fields = entry.Fields.With(name, value) + l.inner.Log(entry) + })} +} + +func (l *Logger) WithCategory(category string) *Logger { + return &Logger{RawLoggerFunc(func(entry *Entry) { + entry.Category = category + l.inner.Log(entry) + })} +} + +type LogWriter struct { + minLevel Level + writer io.Writer + formatter Formatter +} + +func NewLogWriter(minLevel Level, writer io.Writer, formatter Formatter) *LogWriter { + return &LogWriter{minLevel, writer, formatter} +} + +func (w *LogWriter) Log(entry *Entry) { + if entry.Level < w.minLevel { + return + } + if bytes, err := w.formatter.Format(entry); err != nil { + errorHandler(err) + } else if _, err := w.writer.Write(bytes); err != nil { + errorHandler(err) + } } diff --git a/lib/log/writer.go b/lib/log/writer.go deleted file mode 100644 index bd4eadc50bd20c32e2020c4f243e58c53cb7f363..0000000000000000000000000000000000000000 --- a/lib/log/writer.go +++ /dev/null @@ -1,16 +0,0 @@ -package log - -import ( - "io" -) - -type writerLogger struct { - target io.Writer -} - -func (l writerLogger) Log(entry *Entry) { -} - -func WriterLogger(target io.WriteCloser) *Logger { - return &Logger{logger: writerLogger{target}} -}