fix: concurrent map access for mutexes for in-flight messages #319
alfrunes wants to merge 4 commits into mendersoftware:master
This commit replaces the map with a chan to better handle in-flight messages while listening to concurrent events. This also solves the concurrency issue where the application crashes due to concurrent map writes.
Signed-off-by: Alf-Rune Siqveland <alf.rune@northern.tech>
| listen net.Listener | ||
| remoteHost string | ||
| remotePort uint16 | ||
| mutexAck map[string]*sync.Mutex |
There is also one in https://github.com/mendersoftware/mender-cli/pull/303/files#diff-e1a621ea821c78f5bf64ce3fe74d6dbfc3211e7cba95be68139c627cc989813aR98
Is this PR inspired by #303?
merlin-northern left a comment
I do not have enough to block it. It looks good; however, it is hard to read, and the handler passing makes things a bit trickier to understand. I was expecting a quick fix for the concurrent map access; this one is somewhere between that and an architectural redesign. Lots of good work.
| msgChan <- m | ||
| // wait for the ack to be received before processing more data | ||
| select { | ||
| case ackChan <- struct{}{}: |
Do I read this correctly, that we now ack every message and wait for the ack after each message? I have some doubts about whether that is the correct way of doing things. I know it is preexisting, but I have doubts.
Yes, it has always been a peer-to-peer stop and wait and was previously achieved with the mutex dance 👯
I noticed that in the previous implementation, despite using a map of mutexes, there was only ever a single entry in the map, so I replaced it with a chan primitive.
There is an epic of improving the protocol to move the transmission control to the only faulty link (NATS), but that will come with a separate change of the protocol. We cannot just change this on the client side or we risk connections hanging due to lost packets.
I totally trust you, as this is quite hard to review with large portions of code moved. I did open both parts of the code side by side and compared them, but you know.
| fmt.Fprintf(os.Stderr, | ||
| "error accepting new connection on socket %s: %s\n", | ||
| p.listen.Addr(), err.Error(), | ||
| ) | ||
| fmt.Fprintf(os.Stderr, | ||
| "closing listening socket %s\n", | ||
| p.listen.Addr(), | ||
| ) |
There are new errors that were not reported in this form before. Could we please mention that explicitly, so there is a plain changelog entry for it?
Maybe we should be consistent about using log (or slog) instead of printing to standard error?
I made some improvements based on your feedback. I'm not sure how much I want to refactor as part of this task, as there is also work planned for improving port-forwarding altogether.
merlin-northern left a comment
You always produce something that improves the situation! Could we please create a follow-up task to add a test case for "massive concurrent port forwarding with mender-cli"? Let me know if you want me to create it.
| c.recvChanMu.RLock() | ||
| if recvChan, ok := c.recvChans[connectionID]; ok { | ||
| recvChan <- m | ||
| } | ||
| c.recvChanMu.RUnlock() |
Isn't it better like this?
| c.recvChanMu.RLock() | |
| recvChan, ok := c.recvChans[connectionID] | |
| c.recvChanMu.RUnlock() | |
| if ok { | |
| recvChan <- m | |
| } | |
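To make the trade-off in this suggestion concrete, here is a small self-contained sketch of the "look up under the read lock, send after unlocking" pattern. The `router` type, its fields, and `dispatch` are hypothetical stand-ins for the command type in the diff, and string messages replace `*ws.ProtoMsg` so the example runs on its own.

```go
package main

import (
	"fmt"
	"sync"
)

// router is a hypothetical stand-in for the type that owns recvChans.
type router struct {
	mu    sync.RWMutex
	chans map[string]chan string
}

// dispatch copies the channel reference while holding the read lock,
// releases the lock, and only then performs the (possibly blocking) send.
// It reports whether a channel was registered for id.
func (r *router) dispatch(id, msg string) bool {
	r.mu.RLock()
	ch, ok := r.chans[id]
	r.mu.RUnlock()
	if !ok {
		return false
	}
	ch <- msg // may block on a slow receiver, but no lock is held here
	return true
}

func main() {
	r := &router{chans: map[string]chan string{
		"conn-1": make(chan string, 1),
	}}
	fmt.Println(r.dispatch("conn-1", "hello"))
	fmt.Println(r.dispatch("missing", "hello"))
	fmt.Println(<-r.chans["conn-1"])
}
```

Sending outside the lock means a slow receiver cannot hold up a writer waiting to register or unregister a connection. The cost is that the channel may be unregistered between the lookup and the send; if the unregistering side also closes the channel, the send would panic, so this pattern assumes channels are not closed while a dispatch can still be in progress.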
| func (c *PortForwardCmd) registerRecvChan(connectionID string, recvChan chan<- *ws.ProtoMsg) { | ||
| c.recvChanMu.Lock() | ||
| defer c.recvChanMu.Unlock() | ||
| c.recvChans[connectionID] = recvChan |
What if (assuming that is even plausible here) the map already contains an entry for the connectionID?