Commit ee34e1d7 authored by Guillaume Perréal's avatar Guillaume Perréal
Browse files

Working logger with fields.

No related merge requests found
Showing with 204 additions and 87 deletions
+204 -87
...@@ -4,12 +4,12 @@ import ( ...@@ -4,12 +4,12 @@ import (
"bufio" "bufio"
"encoding/json" "encoding/json"
"io" "io"
"log"
"sync" "sync"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/joomcode/errorx" "github.com/joomcode/errorx"
"gitlab.irstea.fr/guillaume.perreal/coord/lib/log"
"gitlab.irstea.fr/guillaume.perreal/coord/lib/protocol" "gitlab.irstea.fr/guillaume.perreal/coord/lib/protocol"
"gitlab.irstea.fr/guillaume.perreal/coord/lib/safe" "gitlab.irstea.fr/guillaume.perreal/coord/lib/safe"
) )
...@@ -24,15 +24,18 @@ type Connection struct { ...@@ -24,15 +24,18 @@ type Connection struct {
closing bool closing bool
done sync.WaitGroup done sync.WaitGroup
*log.Logger
} }
func NewConnection(conn io.ReadWriteCloser, stream *Stream) *Connection { func NewConnection(conn io.ReadWriteCloser, stream *Stream, logger *log.Logger) *Connection {
c := &Connection{ c := &Connection{
StopOnce: new(safe.StopOnce), StopOnce: new(safe.StopOnce),
stream: stream, stream: stream,
conn: conn, conn: conn,
responses: make(chan protocol.Response, 1), responses: make(chan protocol.Response, 1),
subscriptions: make(map[protocol.SubscriptionID]*Subscription), subscriptions: make(map[protocol.SubscriptionID]*Subscription),
Logger: logger,
} }
c.OnStop(c.startClosing) c.OnStop(c.startClosing)
...@@ -49,7 +52,7 @@ func NewConnection(conn io.ReadWriteCloser, stream *Stream) *Connection { ...@@ -49,7 +52,7 @@ func NewConnection(conn io.ReadWriteCloser, stream *Stream) *Connection {
func (c *Connection) startClosing() { func (c *Connection) startClosing() {
if !c.closing { if !c.closing {
log.Println("closing connection") c.Debug("closing connection")
c.closing = true c.closing = true
} }
} }
...@@ -57,20 +60,30 @@ func (c *Connection) startClosing() { ...@@ -57,20 +60,30 @@ func (c *Connection) startClosing() {
func (c *Connection) cleanup() { func (c *Connection) cleanup() {
c.done.Wait() c.done.Wait()
close(c.responses) close(c.responses)
log.Println("connection closed") c.Info("connection closed")
} }
func (c *Connection) readRequests() { func (c *Connection) readRequests() {
defer c.startClosing() defer c.startClosing()
defer c.done.Done() defer c.done.Done()
c.Info("accepting requests")
scanner := bufio.NewScanner(c.conn) scanner := bufio.NewScanner(c.conn)
for !c.closing && scanner.Scan() { for !c.closing && scanner.Scan() {
var request protocol.Request var request protocol.Request
if err := json.Unmarshal(scanner.Bytes(), &request); err != nil {
data := scanner.Bytes()
if len(data) == 0 {
continue
}
if err := json.Unmarshal(data, &request); err != nil {
c.Debugf("invalid request: `%s`", data)
c.responses <- protocol.Error(errorx.Decorate(err, "invalid request")) c.responses <- protocol.Error(errorx.Decorate(err, "invalid request"))
continue continue
} }
c.Debugf("received request: %v", request)
response, err := c.handleRequest(request) response, err := c.handleRequest(request)
if err != nil { if err != nil {
response = protocol.Error(errorx.Decorate(err, "handling request")) response = protocol.Error(errorx.Decorate(err, "handling request"))
...@@ -80,7 +93,7 @@ func (c *Connection) readRequests() { ...@@ -80,7 +93,7 @@ func (c *Connection) readRequests() {
} }
if err := scanner.Err(); err != nil { if err := scanner.Err(); err != nil {
log.Println("error reading requests:", err) c.Errorf("error reading requests: %s", err)
} }
} }
...@@ -90,9 +103,11 @@ func (c *Connection) writeResponses() { ...@@ -90,9 +103,11 @@ func (c *Connection) writeResponses() {
for response := range c.responses { for response := range c.responses {
if data, err := json.Marshal(response); err != nil { if data, err := json.Marshal(response); err != nil {
log.Println("could not marshal response:", err) c.Errorf("could not marshal response: %s", err)
} else if _, err := c.conn.Write(append(data, '\n')); err != nil { } else if _, err := c.conn.Write(append(data, '\n')); err != nil {
log.Println("could not write response:", err) c.Errorf("could not write response: %s", err)
} else {
c.Debugf("sent response: `%s`", data)
} }
} }
} }
......
package main package main
import ( import (
"fmt"
"gitlab.irstea.fr/guillaume.perreal/coord/lib/log"
"io" "io"
"log"
"net" "net"
"os" "os"
"os/signal" "os/signal"
...@@ -22,10 +23,10 @@ func main() { ...@@ -22,10 +23,10 @@ func main() {
startConsoleConnection(stream) startConsoleConnection(stream)
if err := startUnixListener(stream); err != nil { if err := startUnixListener(stream); err != nil {
log.Println("could not start socket listener:", err) log.Errorf("could not start socket listener: %s", err)
} }
if err := startTCPListener(stream); err != nil { if err := startTCPListener(stream); err != nil {
log.Println("could not start TCP listener:", err) log.Errorf("could not start TCP listener: %s", err)
} }
signals := make(chan os.Signal, 1) signals := make(chan os.Signal, 1)
...@@ -33,17 +34,17 @@ func main() { ...@@ -33,17 +34,17 @@ func main() {
select { select {
case <-stop.Stopped(): case <-stop.Stopped():
log.Println("console connection closed")
case sig := <-signals: case sig := <-signals:
log.Printf("received signal '%s', exiting", sig) log.Criticalf("received signal '%s', exiting", sig)
} }
} }
func startConsoleConnection(stream *Stream) { func startConsoleConnection(stream *Stream) {
log.Println("listening to stdin") logger := log.DefaultLogger.WithField("server", "console")
NewConnection( NewConnection(
PseudoReadWriteCloser{os.Stdin, os.Stdout}, PseudoReadWriteCloser{os.Stdin, os.Stdout},
stream, stream,
logger,
) )
} }
...@@ -59,9 +60,8 @@ func startUnixListener(stream *Stream) error { ...@@ -59,9 +60,8 @@ func startUnixListener(stream *Stream) error {
} }
listener.SetUnlinkOnClose(true) listener.SetUnlinkOnClose(true)
log.Println("started unix listener on", addr) logger := log.DefaultLogger.WithField("server", fmt.Sprintf("%s://%s", addr.Net, addr.Name))
safe.Go(func() { runListener(stream, listener) }) safe.Go(func() { runListener(stream, listener, logger) })
return nil return nil
} }
...@@ -76,25 +76,28 @@ func startTCPListener(stream *Stream) error { ...@@ -76,25 +76,28 @@ func startTCPListener(stream *Stream) error {
return err return err
} }
log.Println("started TCP listener on", addr) logger := log.DefaultLogger.WithField("server", fmt.Sprintf("%s://%s:%d", addr.Network(), addr.IP, addr.Port))
safe.Go(func() { runListener(stream, listener) }) safe.Go(func() { runListener(stream, listener, logger) })
return nil return nil
} }
func runListener(stream *Stream, listener net.Listener) { func runListener(stream *Stream, listener net.Listener, logger *log.Logger) {
defer logger.Noticef("closed")
logger.Noticef("accepting connections")
for { for {
if subConn, err := waitNewConnection(listener); err == nil { if subConn, err := waitNewConnection(listener); err == nil {
log.Println("accepted connection from", subConn.RemoteAddr()) subLogger := logger.WithField("client", subConn.RemoteAddr())
NewConnection(subConn, stream) NewConnection(subConn, stream, subLogger)
} else if netErr, ok := err.(net.Error); ok { } else if netErr, ok := err.(net.Error); ok {
if netErr.Timeout() { if netErr.Timeout() {
continue continue
} }
log.Println(err)
if !netErr.Temporary() { if !netErr.Temporary() {
logger.Errorf("%s", err)
return return
} }
logger.Warningf("%s", err)
} }
} }
} }
......
...@@ -53,13 +53,15 @@ type Subscription struct { ...@@ -53,13 +53,15 @@ type Subscription struct {
*safe.StopOnce *safe.StopOnce
} }
var opener = protocol.Event{Topic: "READY"}
func (s *Stream) Subscribe(matcher protocol.TopicMatcher) *Subscription { func (s *Stream) Subscribe(matcher protocol.TopicMatcher) *Subscription {
events := make(chan protocol.Event, s.SubscriberChanSize) events := make(chan protocol.Event, s.SubscriberChanSize)
stopper := safe.GoWithStopper(func(stop <-chan struct{}) { stopper := safe.GoWithStopper(func(stop <-chan struct{}) {
defer close(events) defer close(events)
events <- protocol.Event{Topic: "READY"} events <- opener
cursor := &s.origin cursor := &s.origin
s.mu.RLock() s.mu.RLock()
......
package log package log
import "os" import (
"os"
)
var Default *Logger = WriterLogger(os.Stderr) // DefaultLogger is the package logger.
var DefaultLogger = &Logger{NewLogWriter(DEBUG, os.Stderr, TextFormatter)}
func Debug(message string) { Default.Debug(message) } // Debug logs a message at DEBUG level using the package logger.
func Info(message string) { Default.Info(message) } func Debug(message string) { DefaultLogger.Debug(message) }
func Notice(message string) { Default.Notice(message) } func Info(message string) { DefaultLogger.Info(message) }
func Warning(message string) { Default.Warning(message) } func Notice(message string) { DefaultLogger.Notice(message) }
func Error(message string) { Default.Error(message) } func Warning(message string) { DefaultLogger.Warning(message) }
func Critical(message string) { Default.Critical(message) } func Error(message string) { DefaultLogger.Error(message) }
func Critical(message string) { DefaultLogger.Critical(message) }
func Debugf(template string, args ...interface{}) { Default.Debugf(template, args...) } func Debugf(template string, args ...interface{}) { DefaultLogger.Debugf(template, args...) }
func Infof(template string, args ...interface{}) { Default.Infof(template, args...) } func Infof(template string, args ...interface{}) { DefaultLogger.Infof(template, args...) }
func Noticef(template string, args ...interface{}) { Default.Noticef(template, args...) } func Noticef(template string, args ...interface{}) { DefaultLogger.Noticef(template, args...) }
func Warningf(template string, args ...interface{}) { Default.Warningf(template, args...) } func Warningf(template string, args ...interface{}) { DefaultLogger.Warningf(template, args...) }
func Errorf(template string, args ...interface{}) { Default.Errorf(template, args...) } func Errorf(template string, args ...interface{}) { DefaultLogger.Errorf(template, args...) }
func Criticalf(template string, args ...interface{}) { Default.Criticalf(template, args...) } func Criticalf(template string, args ...interface{}) { DefaultLogger.Criticalf(template, args...) }
var ErrorHandler func(err error) = defaultErrorHandler // The errorHandler is used when something goes wrong in a logger.
var errorHandler = defaultErrorHandler
// SetErrorHandler changes the error handler.
func SetErrorHandler(new func(err error)) (old func(err error)) {
errorHandler, old = new, errorHandler
return
}
func defaultErrorHandler(err error) { func defaultErrorHandler(err error) {
_, _ = os.Stderr.WriteString("Error while logging: " + err.Error() + "\n") _, _ = os.Stderr.WriteString("Error while logging: " + err.Error() + "\n")
......
...@@ -2,20 +2,25 @@ package log ...@@ -2,20 +2,25 @@ package log
import ( import (
"fmt" "fmt"
"os"
"path/filepath"
"time" "time"
) )
type Entry struct { type Entry struct {
Timestamp time.Time time.Time
Level Level Category string
Message string Level
Fields *Fields Message string
*Fields
} }
func NewEntry(level Level, message string, fields *Fields) *Entry { var DefaultCategory = filepath.Base(os.Args[0])
return &Entry{time.Now(), level, message, fields}
func NewEntry(level Level, message string) *Entry {
return &Entry{time.Now(), DefaultCategory, level, message, nil}
} }
func NewEntryf(level Level, template string, args []interface{}, fields *Fields) *Entry { func NewEntryf(level Level, template string, args ...interface{}) *Entry {
return NewEntry(level, fmt.Sprintf(template, args...), fields) return NewEntry(level, fmt.Sprintf(template, args...))
} }
package log package log
// Fields is a linked list of (name, value) couple
type Fields struct { type Fields struct {
Name string name string
Value interface{} value interface{}
previous *Fields previous *Fields
} }
// With creates new Fields that include the given (name, value) couple.
func (f *Fields) With(name string, value interface{}) *Fields { func (f *Fields) With(name string, value interface{}) *Fields {
return &Fields{name, value, f} return &Fields{name, value, f}
} }
// ForEach applies the given function for each (name, value) couple, as long as it returns true.
func (f *Fields) ForEach(do func(string, interface{}) bool) { func (f *Fields) ForEach(do func(string, interface{}) bool) {
for i := f; i != nil && do(i.Name, i.Value); i = i.previous { for i := f; i != nil && do(i.name, i.value); i = i.previous {
} }
} }
func (f *Fields) IsEmpty() bool {
return f == nil
}
package log
import (
"fmt"
"strings"
"time"
)
// A Formatter formats an entry.
type Formatter interface {
Format(*Entry) ([]byte, error)
}
// A FormatterFunc implements Formatter using a single function.
type FormatterFunc func(*Entry) ([]byte, error)
// Format implements Formatter.
func (f FormatterFunc) Format(entry *Entry) ([]byte, error) { return f(entry) }
// TextFormatter formats an entry as one-line text.
var TextFormatter = FormatterFunc(func(entry *Entry) (output []byte, err error) {
var builder strings.Builder
if _, err = fmt.Fprintf(
&builder,
"%s %s [%s]: %s",
entry.Time.Format(time.RFC3339),
entry.Category,
entry.Level,
entry.Message,
); err != nil {
return
}
entry.Fields.ForEach(func(name string, value interface{}) bool {
_, err = fmt.Fprintf(&builder, " %s=%v", name, value)
return err == nil
})
if err != nil {
return
}
if err = builder.WriteByte('\n'); err == nil {
output = []byte(builder.String())
}
return
})
package log package log
// Level represents the severity of a log entry.
type Level byte type Level byte
const ( const (
......
package log package log
type logger interface { import "io"
// RawLogger is the building brick of the package.
type RawLogger interface {
Log(*Entry) Log(*Entry)
} }
type Logger struct { // RawLoggerFunc is a RawLogger implementation using a function.
logger type RawLoggerFunc func(*Entry)
Fields *Fields
}
func (l *Logger) WithField(name string, value interface{}) *Logger { // Log implements RawLogger
return &Logger{l, l.Fields.With(name, value)} func (f RawLoggerFunc) Log(entry *Entry) { f(entry) }
}
// Logger adds convenience methods to a RawLogger.
type Logger struct{ inner RawLogger }
func (l *Logger) Debug(message string) { l.Log(NewEntry(DEBUG, message, l.Fields)) } func (l *Logger) Log(entry *Entry) { l.inner.Log(entry) }
func (l *Logger) Info(message string) { l.Log(NewEntry(INFO, message, l.Fields)) }
func (l *Logger) Notice(message string) { l.Log(NewEntry(NOTICE, message, l.Fields)) } func (l *Logger) Debug(message string) { l.inner.Log(NewEntry(DEBUG, message)) }
func (l *Logger) Warning(message string) { l.Log(NewEntry(WARNING, message, l.Fields)) } func (l *Logger) Info(message string) { l.inner.Log(NewEntry(INFO, message)) }
func (l *Logger) Error(message string) { l.Log(NewEntry(ERROR, message, l.Fields)) } func (l *Logger) Notice(message string) { l.inner.Log(NewEntry(NOTICE, message)) }
func (l *Logger) Critical(message string) { l.Log(NewEntry(CRITICAL, message, l.Fields)) } func (l *Logger) Warning(message string) { l.inner.Log(NewEntry(WARNING, message)) }
func (l *Logger) Error(message string) { l.inner.Log(NewEntry(ERROR, message)) }
func (l *Logger) Critical(message string) { l.inner.Log(NewEntry(CRITICAL, message)) }
func (l *Logger) Debugf(template string, args ...interface{}) { func (l *Logger) Debugf(template string, args ...interface{}) {
l.Log(NewEntryf(DEBUG, template, args, l.Fields)) l.inner.Log(NewEntryf(DEBUG, template, args...))
} }
func (l *Logger) Infof(template string, args ...interface{}) { func (l *Logger) Infof(template string, args ...interface{}) {
l.Log(NewEntryf(INFO, template, args, l.Fields)) l.inner.Log(NewEntryf(INFO, template, args...))
} }
func (l *Logger) Noticef(template string, args ...interface{}) { func (l *Logger) Noticef(template string, args ...interface{}) {
l.Log(NewEntryf(NOTICE, template, args, l.Fields)) l.inner.Log(NewEntryf(NOTICE, template, args...))
} }
func (l *Logger) Warningf(template string, args ...interface{}) { func (l *Logger) Warningf(template string, args ...interface{}) {
l.Log(NewEntryf(WARNING, template, args, l.Fields)) l.inner.Log(NewEntryf(WARNING, template, args...))
} }
func (l *Logger) Errorf(template string, args ...interface{}) { func (l *Logger) Errorf(template string, args ...interface{}) {
l.Log(NewEntryf(ERROR, template, args, l.Fields)) l.inner.Log(NewEntryf(ERROR, template, args...))
} }
func (l *Logger) Criticalf(template string, args ...interface{}) { func (l *Logger) Criticalf(template string, args ...interface{}) {
l.Log(NewEntryf(CRITICAL, template, args, l.Fields)) l.inner.Log(NewEntryf(CRITICAL, template, args...))
}
func (l *Logger) WithField(name string, value interface{}) *Logger {
return &Logger{RawLoggerFunc(func(entry *Entry) {
entry.Fields = entry.Fields.With(name, value)
l.inner.Log(entry)
})}
}
func (l *Logger) WithCategory(category string) *Logger {
return &Logger{RawLoggerFunc(func(entry *Entry) {
entry.Category = category
l.inner.Log(entry)
})}
}
type LogWriter struct {
minLevel Level
writer io.Writer
formatter Formatter
}
func NewLogWriter(minLevel Level, writer io.Writer, formatter Formatter) *LogWriter {
return &LogWriter{minLevel, writer, formatter}
}
func (w *LogWriter) Log(entry *Entry) {
if entry.Level < w.minLevel {
return
}
if bytes, err := w.formatter.Format(entry); err != nil {
errorHandler(err)
} else if _, err := w.writer.Write(bytes); err != nil {
errorHandler(err)
}
} }
package log
import (
"io"
)
type writerLogger struct {
target io.Writer
}
func (l writerLogger) Log(entry *Entry) {
}
func WriterLogger(target io.WriteCloser) *Logger {
return &Logger{logger: writerLogger{target}}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment