extract number of errors and pod directory

This commit is contained in:
Cyrille Nofficial 2022-09-07 20:09:49 +02:00
parent 95f86d69f4
commit 620cef3310
6 changed files with 174 additions and 5 deletions

5
README.md Normal file
View File

@ -0,0 +1,5 @@
# pod-cleaner
Watches a syslog file and generates alerts on orphaned pod error messages.
See https://github.com/kubernetes/kubernetes/issues/105536

View File

@ -1 +1,82 @@
package cleaner package cleaner
import (
"fmt"
"github.com/cyrilix/pod-cleaner/pkg/monitor"
"go.uber.org/zap"
"regexp"
"strconv"
"sync"
)
var (
// defaultRegex matches the kubelet log record that reports an orphaned pod
// whose volumes directory could not be removed. It exposes two named groups:
// "podId" (digits, lowercase hex letters and dashes) and "numErrs" (decimal
// digits). The dots in the pattern are not escaped, so each matches any
// character.
defaultRegex = regexp.MustCompile("There were many similar errors. Turn up verbosity to see them.\" err=\\\"orphaned pod \\\\\"(?P<podId>[0-9a-f\\-]+)\\\\\" found, but error not a directory occurred when trying to remove the volumes dir\" numErrs=(?P<numErrs>[0-9]+)")
)
// NewPodWatcher builds a PodWatcher for logfile. The watcher uses a monitor
// created with default settings, filters records with defaultRegex and owns a
// fresh cancel channel.
func NewPodWatcher(logfile string) *PodWatcher {
	watcher := new(PodWatcher)
	watcher.m = monitor.New()
	watcher.rex = defaultRegex
	watcher.logfile = logfile
	watcher.cancel = make(chan interface{})
	return watcher
}
// PodWatcher watches a log file for orphaned pod error records and keeps the
// values extracted from the most recently processed record.
type PodWatcher struct {
// m tails logfile and forwards the lines matching rex.
m monitor.Monitor
// rex filters log lines and provides the "podId" and "numErrs" groups.
rex *regexp.Regexp
// logfile is the path of the file handed to the monitor.
logfile string
// cancel is closed by Watch once the monitor returns; it ends the Watch loop.
cancel chan interface{}
// muPodError guards numErrors and podIdErrors.
muPodError sync.RWMutex
// numErrors is the "numErrs" value of the last processed record.
numErrors int
// podIdErrors is the "podId" value of the last processed record.
podIdErrors string
}
// GetNumErrors returns the error count stored by the last processed log
// record. The value is read under the read lock.
func (w *PodWatcher) GetNumErrors() int {
	w.muPodError.RLock()
	count := w.numErrors
	w.muPodError.RUnlock()
	return count
}
// GetPodDirInError returns the kubelet directory of the pod recorded by the
// last processed log record, or an empty string when no pod id is stored.
func (w *PodWatcher) GetPodDirInError() string {
	w.muPodError.RLock()
	podID := w.podIdErrors
	w.muPodError.RUnlock()

	if podID != "" {
		return fmt.Sprintf("/var/lib/kubelet/pods/%s", podID)
	}
	return ""
}
// Watch starts the monitor on the configured log file in a separate goroutine
// and processes every matching line it delivers. It blocks until the cancel
// channel is closed, which the goroutine does once the monitor returns
// without error. A monitor error is reported through zap's Panicf.
func (w *PodWatcher) Watch() {
	matches := make(chan string)

	go func() {
		if err := w.m.Watch(w.logfile, w.rex, matches); err != nil {
			zap.S().Panicf("unable to init watching on file '%s', %v", w.logfile, err)
		}
		// The monitor is done: release the processing loop below.
		close(w.cancel)
	}()

	for {
		select {
		case <-w.cancel:
			return
		case record := <-matches:
			w.processLogRecord(record)
		}
	}
}
// processLogRecord extracts the pod id and the number of errors from line and
// stores them as the watcher's current state.
//
// The regular expression is evaluated once, outside the lock. A line that
// does not match, a pattern missing the "numErrs" or "podId" group, or a
// non-numeric error count is logged and leaves the stored state untouched
// instead of panicking on an out-of-range index.
func (w *PodWatcher) processLogRecord(line string) {
	match := w.rex.FindStringSubmatch(line)
	if match == nil {
		zap.S().Errorf("line '%v' doesn't match pattern '%v'", line, w.rex)
		return
	}

	// SubexpIndex returns -1 when the pattern has no group with that name.
	numIdx := w.rex.SubexpIndex("numErrs")
	podIdx := w.rex.SubexpIndex("podId")
	if numIdx < 0 || podIdx < 0 {
		zap.S().Errorf("pattern '%v' must define 'numErrs' and 'podId' groups", w.rex)
		return
	}

	numErrors, err := strconv.Atoi(match[numIdx])
	if err != nil {
		zap.S().Errorf("unable to extract numErrors value from line '%v': %v", line, err)
		return
	}

	w.muPodError.Lock()
	defer w.muPodError.Unlock()
	w.numErrors = numErrors
	w.podIdErrors = match[podIdx]
}

View File

@ -1 +1,50 @@
package cleaner package cleaner
import (
"github.com/cyrilix/pod-cleaner/pkg/monitor"
"testing"
)
// TestPodWatcher_Watch runs the watcher once over fixture log files (no
// follow, no reopen) and checks the error count and pod directory it reports.
func TestPodWatcher_Watch(t *testing.T) {
	cases := []struct {
		name         string
		logfile      string
		wantPodDir   string
		wantNumError int
	}{
		{
			name:         "one error in file",
			logfile:      "test_data/one-error.log",
			wantPodDir:   "/var/lib/kubelet/pods/1d4bfc07-3469-4eaa-992f-6d23c17f3aee",
			wantNumError: 1,
		},
		{
			name:         "no error",
			logfile:      "test_data/no-error.log",
			wantPodDir:   "",
			wantNumError: 0,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			watcher := NewPodWatcher(tc.logfile)
			// Read the fixture to its end and stop instead of tailing forever.
			watcher.m = monitor.New(monitor.WithTailFollow(false), monitor.WithReOpen(false))
			watcher.Watch()

			if got := watcher.GetNumErrors(); got != tc.wantNumError {
				t.Errorf("PodWatcher_Watch(), bad numErrors found: %v, want %v", got, tc.wantNumError)
			}
			if got := watcher.GetPodDirInError(); got != tc.wantPodDir {
				t.Errorf("PodWatcher_Watch(), bad pod directory found: %v, want %v", got, tc.wantPodDir)
			}
		})
	}
}

View File

@ -0,0 +1,3 @@
egreg
abcffe
erfer

View File

@ -0,0 +1,4 @@
egreg
Oct 07 12:46:32 k8s-master2-staging kubelet[7310]: E1007 12:46:32.359145 7310 kubelet_volumes.go:245] "There were many similar errors. Turn up verbosity to see them." err="orphaned pod \"1d4bfc07-3469-4eaa-992f-6d23c17f3aee\" found, but error not a directory occurred when trying to remove the volumes dir" numErrs=1
abcffe
erfer

View File

@ -7,18 +7,42 @@ import (
"regexp" "regexp"
) )
// Option customizes a LogMonitor; New applies every given Option to the
// monitor it builds.
type Option func(monitor *LogMonitor)
// WithTailFollow returns an Option that sets the monitor's follow flag, i.e.
// whether it keeps looking for new lines (tail -f).
func WithTailFollow(follow bool) Option {
	return func(l *LogMonitor) {
		l.follow = follow
	}
}
// WithReOpen returns an Option that sets the monitor's reopen flag, i.e.
// whether it reopens recreated files (tail -F).
func WithReOpen(reopen bool) Option {
	return func(l *LogMonitor) {
		l.reopen = reopen
	}
}
type Monitor interface { type Monitor interface {
Close() error Close() error
Watch(filename string, filter *regexp.Regexp, lineChan chan<- string) error Watch(filename string, filter *regexp.Regexp, lineChan chan<- string) error
} }
func New() *LogMonitor { func New(opts ...Option) *LogMonitor {
return &LogMonitor{ l := &LogMonitor{
follow: true,
reopen: true,
cancel: make(chan interface{}), cancel: make(chan interface{}),
} }
for _, opt := range opts {
opt(l)
}
return l
} }
type LogMonitor struct { type LogMonitor struct {
follow bool
reopen bool
cancel chan interface{} cancel chan interface{}
} }
@ -28,7 +52,7 @@ func (l *LogMonitor) Close() error {
} }
func (l *LogMonitor) Watch(filename string, filter *regexp.Regexp, lineChan chan<- string) error { func (l *LogMonitor) Watch(filename string, filter *regexp.Regexp, lineChan chan<- string) error {
t, err := tail.TailFile(filename, tail.Config{Follow: true, ReOpen: true, Logger: tail.DiscardingLogger}) t, err := tail.TailFile(filename, tail.Config{Follow: l.follow, ReOpen: l.reopen, Logger: tail.DiscardingLogger})
if err != nil { if err != nil {
return fmt.Errorf("unable to begin to tail file '%s': %w", filename, err) return fmt.Errorf("unable to begin to tail file '%s': %w", filename, err)
} }
@ -36,11 +60,15 @@ func (l *LogMonitor) Watch(filename string, filter *regexp.Regexp, lineChan chan
for { for {
select { select {
case line := <-t.Lines: case line := <-t.Lines:
if line == nil {
// EOF
t.Cleanup()
return nil
}
if line.Err != nil { if line.Err != nil {
zap.S().Errorf("tail error: %v", err) zap.S().Errorf("tail error: %v", err)
continue continue
} }
zap.S().Infof("new line: '%s'", line.Text)
if filter.MatchString(line.Text) { if filter.MatchString(line.Text) {
lineChan <- line.Text lineChan <- line.Text
} }
@ -50,5 +78,4 @@ func (l *LogMonitor) Watch(filename string, filter *regexp.Regexp, lineChan chan
return nil return nil
} }
} }
return nil
} }