build: upgrade to go 1.17 and dependencies

This commit is contained in:
2021-09-01 21:01:28 +02:00
parent de6f39bddc
commit 8b5888ee1b
321 changed files with 41511 additions and 9444 deletions

View File

@ -37,17 +37,11 @@ type route struct {
// and returns a boolean of the outcome
func match(route []string, topic []string) bool {
if len(route) == 0 {
if len(topic) == 0 {
return true
}
return false
return len(topic) == 0
}
if len(topic) == 0 {
if route[0] == "#" {
return true
}
return false
return route[0] == "#"
}
if route[0] == "#" {
@ -87,15 +81,13 @@ type router struct {
routes *list.List
defaultHandler MessageHandler
messages chan *packets.PublishPacket
stop chan bool
}
// newRouter returns a new instance of a Router and channel which can be used to tell the Router
// to stop
func newRouter() (*router, chan bool) {
router := &router{routes: list.New(), messages: make(chan *packets.PublishPacket), stop: make(chan bool)}
stop := router.stop
return router, stop
func newRouter() *router {
router := &router{routes: list.New(), messages: make(chan *packets.PublishPacket)}
return router
}
// addRoute takes a topic string and MessageHandler callback. It looks in the current list of
@ -105,7 +97,7 @@ func (r *router) addRoute(topic string, callback MessageHandler) {
r.Lock()
defer r.Unlock()
for e := r.routes.Front(); e != nil; e = e.Next() {
if e.Value.(*route).match(topic) {
if e.Value.(*route).topic == topic {
r := e.Value.(*route)
r.callback = callback
return
@ -120,7 +112,7 @@ func (r *router) deleteRoute(topic string) {
r.Lock()
defer r.Unlock()
for e := r.routes.Front(); e != nil; e = e.Next() {
if e.Value.(*route).match(topic) {
if e.Value.(*route).topic == topic {
r.routes.Remove(e)
return
}
@ -139,49 +131,99 @@ func (r *router) setDefaultHandler(handler MessageHandler) {
// takes messages off the channel, matches them against the internal route list and calls the
// associated callback (or the defaultHandler, if one exists and no other route matched). If
// anything is sent down the stop channel the function will end.
func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) {
go func() {
for {
select {
case message := <-messages:
sent := false
r.RLock()
m := messageFromPublish(message, client.ackFunc(message))
handlers := []MessageHandler{}
for e := r.routes.Front(); e != nil; e = e.Next() {
if e.Value.(*route).match(message.TopicName) {
if order {
handlers = append(handlers, e.Value.(*route).callback)
} else {
hd := e.Value.(*route).callback
go func() {
hd(client, m)
m.Ack()
}()
func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) <-chan *PacketAndToken {
var wg sync.WaitGroup
ackOutChan := make(chan *PacketAndToken) // Channel returned to caller; closed when messages channel closed
var ackInChan chan *PacketAndToken // ACKs generated by ackFunc get put onto this channel
stopAckCopy := make(chan struct{}) // Closure requests stop of go routine copying ackInChan to ackOutChan
ackCopyStopped := make(chan struct{}) // Closure indicates that it is safe to close ackOutChan
goRoutinesDone := make(chan struct{}) // closed on wg.Done()
if order {
ackInChan = ackOutChan // When order = true no go routines are used so safe to use one channel and close when done
} else {
// When order = false ACK messages are sent in go routines so ackInChan cannot be closed until all goroutines done
ackInChan = make(chan *PacketAndToken)
go func() { // go routine to copy from ackInChan to ackOutChan until stopped
for {
select {
case a := <-ackInChan:
ackOutChan <- a
case <-stopAckCopy:
close(ackCopyStopped) // Signal main go routine that it is safe to close ackOutChan
for {
select {
case <-ackInChan: // drain ackInChan to ensure all goRoutines can complete cleanly (ACK dropped)
DEBUG.Println(ROU, "matchAndDispatch received acknowledgment after processing stopped (ACK dropped).")
case <-goRoutinesDone:
close(ackInChan) // Nothing further should be sent (a panic is probably better than silent failure)
DEBUG.Println(ROU, "matchAndDispatch order=false copy goroutine exiting.")
return
}
sent = true
}
}
if !sent && r.defaultHandler != nil {
}
}()
}
go func() { // Main go routine handling inbound messages
for message := range messages {
// DEBUG.Println(ROU, "matchAndDispatch received message")
sent := false
r.RLock()
m := messageFromPublish(message, ackFunc(ackInChan, client.persist, message))
var handlers []MessageHandler
for e := r.routes.Front(); e != nil; e = e.Next() {
if e.Value.(*route).match(message.TopicName) {
if order {
handlers = append(handlers, e.Value.(*route).callback)
} else {
hd := e.Value.(*route).callback
wg.Add(1)
go func() {
hd(client, m)
m.Ack()
wg.Done()
}()
}
sent = true
}
}
if !sent {
if r.defaultHandler != nil {
if order {
handlers = append(handlers, r.defaultHandler)
} else {
wg.Add(1)
go func() {
r.defaultHandler(client, m)
m.Ack()
wg.Done()
}()
}
} else {
DEBUG.Println(ROU, "matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.")
}
r.RUnlock()
for _, handler := range handlers {
func() {
handler(client, m)
m.Ack()
}()
}
case <-r.stop:
return
}
r.RUnlock()
for _, handler := range handlers {
handler(client, m)
m.Ack()
}
// DEBUG.Println(ROU, "matchAndDispatch handled message")
}
if order {
close(ackOutChan)
} else { // Ensure that nothing further will be written to ackOutChan before closing it
close(stopAckCopy)
<-ackCopyStopped
close(ackOutChan)
go func() {
wg.Wait() // Note: If this remains running then the user has handlers that are not returning
close(goRoutinesDone)
}()
}
DEBUG.Println(ROU, "matchAndDispatch exiting")
}()
return ackOutChan
}