
fix: concurrent map access for mutexes for in-flight messages#319

Open
alfrunes wants to merge 4 commits into mendersoftware:master from alfrunes:MEN-9019

Conversation

Contributor

@alfrunes alfrunes commented Dec 8, 2025

This commit replaces the map with a chan to better handle in-flight messages while listening for concurrent events. This also solves the concurrency issue where the application crashes due to concurrent map writes.

Signed-off-by: Alf-Rune Siqveland <alf.rune@northern.tech>
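As a rough illustration of the approach the commit describes (all names here are hypothetical, not the actual mender-connect code): the sender hands each message to an unbuffered channel and blocks until an ack arrives, so no shared mutex map is touched concurrently.

```go
package main

import "fmt"

// message stands in for the protocol message type; the real code uses
// ws.ProtoMsg. The sender delivers one message at a time and waits for
// the ack before sending the next one (stop-and-wait).
type message struct{ id int }

func sender(msgChan chan<- message, ackChan <-chan struct{}, n int) {
	for i := 0; i < n; i++ {
		msgChan <- message{id: i} // hand off the message
		<-ackChan                 // block until the peer acks it
	}
	close(msgChan)
}

func receiver(msgChan <-chan message, ackChan chan<- struct{}) int {
	count := 0
	for range msgChan {
		count++
		ackChan <- struct{}{} // ack each message before the next is sent
	}
	return count
}

func main() {
	msgChan := make(chan message)
	ackChan := make(chan struct{})
	go sender(msgChan, ackChan, 3)
	fmt.Println(receiver(msgChan, ackChan)) // prints 3
}
```

The channel send/receive pair gives the same mutual exclusion the mutex map was approximating, but without any map writes to race on.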
@alfrunes alfrunes requested a review from a team as a code owner December 8, 2025 14:52
listen net.Listener
remoteHost string
remotePort uint16
mutexAck map[string]*sync.Mutex
Signed-off-by: Alf-Rune Siqveland <alf.rune@northern.tech>
Signed-off-by: Alf-Rune Siqveland <alf.rune@northern.tech>
Contributor

@merlin-northern merlin-northern left a comment


I don't have enough to block it. It looks good; however, it is hard to read, and the handler passing makes things a bit trickier to understand. I was expecting a quick fix for the concurrent map access; this one sits somewhere between that and an architectural redesign. Lots of good work.

msgChan <- m
// wait for the ack to be received before processing more data
select {
case ackChan <- struct{}{}:
Contributor


Do I read this correctly, that we now ack every message and wait for the ack on each message? I have some doubts whether that is the correct way of doing things. I know it is preexisting, but doubts I have.

Contributor Author


Yes, it has always been a peer-to-peer stop-and-wait, previously achieved with the mutex dance 👯
I noticed that in the previous implementation, despite using a map of mutexes, there is only ever a single entry in the map, so I replaced it with a chan primitive.
There is an epic for improving the protocol to move the transmission control to the only faulty link (NATS), but that will come with a separate change to the protocol. We cannot just change this on the client side, or we risk connections hanging due to lost packets.
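For illustration, the ack hand-off in the diff uses a select, which also keeps the sender responsive to shutdown instead of blocking forever. A minimal sketch of that pattern (hypothetical helper, not the PR's actual code):

```go
package main

import "fmt"

// deliverAck tries to deliver an ack, but aborts if done is closed,
// so a goroutine stuck in stop-and-wait can still be torn down.
func deliverAck(ackChan chan<- struct{}, done <-chan struct{}) bool {
	select {
	case ackChan <- struct{}{}:
		return true
	case <-done:
		return false
	}
}

func main() {
	ackChan := make(chan struct{}, 1)
	done := make(chan struct{})

	fmt.Println(deliverAck(ackChan, done)) // buffer slot free: true

	close(done)
	fmt.Println(deliverAck(ackChan, done)) // buffer full, done closed: false
}
```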

Contributor


I totally trust you here, as this is quite hard to review with large portions of code moved. I did open both versions of the code side by side and compared them, but you know how it is.

Comment on lines +75 to +82
fmt.Fprintf(os.Stderr,
"error accepting new connection on socket %s: %s\n",
p.listen.Addr(), err.Error(),
)
fmt.Fprintf(os.Stderr,
"closing listening socket %s\n",
p.listen.Addr(),
)
Contributor


There are new errors that have not been reported in this form before; could we please mention them explicitly so there is a plain changelog entry for this?

Contributor Author


Maybe we should be consistent about using log (or slog) instead of printing to standard error?

Signed-off-by: Alf-Rune Siqveland <alf.rune@northern.tech>
@alfrunes
Contributor Author

alfrunes commented Jan 8, 2026

I made some improvements based on your feedback. I'm not sure how much I want to refactor as part of this task, as there is also work planned for improving port-forwarding altogether.

Contributor

@merlin-northern merlin-northern left a comment


You always produce something that improves the situation! Could we please create a follow-up task for a test case for "massive concurrent port forwarding with mender-cli"? Let me know if you want me to create it.

Comment on lines +422 to +426
c.recvChanMu.RLock()
if recvChan, ok := c.recvChans[connectionID]; ok {
recvChan <- m
}
c.recvChanMu.RUnlock()
Contributor


Isn't it better like this:

Suggested change
c.recvChanMu.RLock()
if recvChan, ok := c.recvChans[connectionID]; ok {
recvChan <- m
}
c.recvChanMu.RUnlock()
c.recvChanMu.RLock()
recvChan, ok := c.recvChans[connectionID]
c.recvChanMu.RUnlock()
if ok {
	recvChan <- m
}

func (c *PortForwardCmd) registerRecvChan(connectionID string, recvChan chan<- *ws.ProtoMsg) {
c.recvChanMu.Lock()
defer c.recvChanMu.Unlock()
c.recvChans[connectionID] = recvChan
Contributor


What if (even if it may not be plausible here) the map already contains an entry for the connectionID?
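One way to make that case explicit (a hypothetical sketch, not the PR's code: `recvRegistry` and its methods are invented for illustration) is to fail loudly instead of silently overwriting an existing entry:

```go
package main

import (
	"fmt"
	"sync"
)

// recvRegistry guards a connectionID -> receive-channel map and rejects
// duplicate registrations instead of overwriting them.
type recvRegistry struct {
	mu    sync.Mutex
	chans map[string]chan<- string
}

func newRecvRegistry() *recvRegistry {
	return &recvRegistry{chans: make(map[string]chan<- string)}
}

func (r *recvRegistry) register(connectionID string, ch chan<- string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.chans[connectionID]; exists {
		return fmt.Errorf("connection %q already registered", connectionID)
	}
	r.chans[connectionID] = ch
	return nil
}

func main() {
	r := newRecvRegistry()
	ch := make(chan string, 1)
	fmt.Println(r.register("conn-1", ch)) // <nil>
	fmt.Println(r.register("conn-1", ch)) // connection "conn-1" already registered
}
```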
