One of the latest projects I’ve been working on is a simple SMTP client written in Go. I’ve really enjoyed developing applications in Go since it provides concurrency primitives out-of-the-box. In this article, I’m going to describe a technique for sending any number of values on a channel without having the send block.
Before doing that, I’ll take a brief moment to explain the problem that this technique attempts to solve. Let’s pretend we have a string channel and we want to send a single value. We would write a line that resembles the following:
strChan <- "Hello, world!"
Without any further information about the context of this line or how the channel was created, it is impossible to know whether the send will block or not. If the channel is unbuffered (the default), then it will block until another goroutine receives the value. Let’s suppose the following line was used to create the channel:
strChan := make(chan string, 1)
We now know the channel has a capacity of 1, but that still doesn’t guarantee sending a value on the channel won’t block. If a value is already in the channel’s buffer, the send will indeed block until another goroutine receives a value from the channel. The problem can be delayed by increasing the buffer size, but as soon as the buffer is full, the next send will block.
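To make the buffer behavior concrete, here is a minimal sketch that inspects a channel with the built-in len and cap functions. The isFull helper is hypothetical and exists only for illustration; it is not part of the technique described in this article.

```go
package main

import "fmt"

// isFull is a hypothetical helper: it reports whether the channel's buffer
// currently has no free slot, meaning the next send would have to wait.
func isFull(ch chan string) bool {
	return len(ch) == cap(ch)
}

func main() {
	strChan := make(chan string, 1)
	fmt.Println(isFull(strChan)) // false: the buffer has one free slot

	strChan <- "Hello, world!" // does not block, the buffer had room
	fmt.Println(isFull(strChan)) // true: another send would block now

	<-strChan // a receive frees the slot again
	fmt.Println(isFull(strChan)) // false
}
```

Note that a check like this is only a snapshot. With several goroutines involved, the buffer can fill up between the check and the send, so it cannot be used to guarantee that a send will not block.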
One of the ways you can work around this problem is by using a select{} block:
select {
case strChan <- "value":
case otherChan <- "other value":
}
The example above blocks until either of the two send operations can proceed, rather than waiting exclusively on a single channel. It is also possible to combine send and receive operations in a select{} block, which will come in handy later when I describe my technique. For example, this can be used to implement a timeout for sending a value on a channel:
select {
case strChan <- "value":
case <-time.After(5 * time.Second):
}
The example above will abort the send operation if the send blocks for more than 5 seconds.
Sometimes, however, this isn’t enough. You may need to unconditionally send a list of values on a channel and be completely certain that nothing will block. For that specific scenario, I have developed NonBlockingChan. I will quickly run through the code and explain how it all works. First, the struct itself:
type NonBlockingChan struct {
    Send chan<- interface{}
    Recv <-chan interface{}
}
The goal is to let users of this type send values on the Send channel and never worry about the send blocking. Note that the values sent and received on the channel are of type interface{}. This means that although you can send a value of any type on the Send channel, you will need to use a type assertion to convert it back to the appropriate type when you receive the value from the Recv channel.
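Recovering the concrete type from an interface{} value can be sketched as follows. The describe function is a hypothetical helper for this example, and a plain buffered channel stands in for the Recv channel.

```go
package main

import "fmt"

// describe is a hypothetical helper that uses a type switch to recover the
// concrete type of a value that travelled through a chan interface{}.
func describe(v interface{}) string {
	switch t := v.(type) {
	case string:
		return "string: " + t
	case int:
		return fmt.Sprintf("int: %d", t)
	default:
		return fmt.Sprintf("unexpected type %T", t)
	}
}

func main() {
	ch := make(chan interface{}, 2) // stands in for the Recv channel
	ch <- "hello"
	ch <- 42
	close(ch)

	for v := range ch {
		fmt.Println(describe(v))
	}

	// The two-value form of a type assertion avoids a panic on a mismatch.
	var v interface{} = "hello"
	if _, ok := v.(int); !ok {
		fmt.Println("not an int")
	}
}
```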
All of the code that makes this work resides in the NewNonBlockingChan() function, so we’ll take a look at that next.
func NewNonBlockingChan() *NonBlockingChan {
    var (
        send = make(chan interface{})
        recv = make(chan interface{})
        n    = &NonBlockingChan{
            Send: send,
            Recv: recv,
        }
        values     = list.New()
        sendClosed = false
    )
The channels are created first and then assigned to the instance that will eventually be returned to the caller. Because the struct fields are directional (send-only and receive-only), the function can use the channels in either direction while the caller cannot use them in the wrong direction. A list from the container/list package is also created for buffering the values.
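The effect of the directional fields can be seen in isolation with a short sketch. The pair type and newPair function are hypothetical and only mirror the shape of the struct; unlike NonBlockingChan, they share a single buffered channel.

```go
package main

import "fmt"

// pair is a hypothetical type that exposes one bidirectional channel
// through a send-only field and a receive-only field.
type pair struct {
	Send chan<- interface{}
	Recv <-chan interface{}
}

// newPair keeps the bidirectional channel to itself and hands out only the
// restricted views of it.
func newPair() *pair {
	ch := make(chan interface{}, 1)
	return &pair{Send: ch, Recv: ch}
}

func main() {
	p := newPair()
	p.Send <- "ping"
	fmt.Println(<-p.Recv)

	// Neither of the following lines compiles, because the field types
	// forbid using the channel in the wrong direction:
	//   <-p.Send
	//   p.Recv <- "pong"
}
```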
Next, a goroutine is started to facilitate the sending, buffering, and receiving of values. On each iteration, the goroutine attempts one or both of the following operations:
- send a value from the list if it contains one or more values
- receive a new value if the Send channel was not closed
Since each of these operations should only be attempted when its condition is met, a select{} block with a fixed set of cases is not a natural fit. Instead, this implementation uses the Select() function from the reflect package, which ignores any case whose channel is left unset. The function expects a slice of reflect.SelectCase. To make things easier, constants can be used for the slice indices:
    go func() {
        const (
            incomingCase = iota
            outgoingCase
            numCases
        )
The main part of the goroutine consists of a loop that runs until there are no values remaining in the list and the Send channel is closed:
        for !sendClosed || values.Len() > 0 {
Next, the reflect.SelectCase slice is created and populated:
            cases := make([]reflect.SelectCase, numCases)
            cases[incomingCase].Dir = reflect.SelectRecv
            cases[outgoingCase].Dir = reflect.SelectSend
The incomingCase is only populated if the Send channel hasn’t already been closed:
            if !sendClosed {
                cases[incomingCase].Chan = reflect.ValueOf(send)
            }
The outgoingCase is only populated if there is at least one value in the list (in which case the first value is sent):
            if values.Len() > 0 {
                cases[outgoingCase].Chan = reflect.ValueOf(recv)
                cases[outgoingCase].Send = reflect.ValueOf(values.Front().Value)
            }
The slice of reflect.SelectCase can now be passed to reflect.Select():
            i, val, ok := reflect.Select(cases)
Three values are returned:
- the slice index of the select operation that succeeded
- the value received from the channel (if applicable)
- a bool indicating whether a value was received
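These return values can be observed in isolation with a small sketch. The pick function is a hypothetical helper written for this example. It builds one receive case and one send case whose Chan field is left as the zero Value, which reflect.Select ignores.

```go
package main

import (
	"fmt"
	"reflect"
)

// pick is a hypothetical helper: it runs reflect.Select over a receive case
// for in and a send case with no channel set, then returns the chosen index
// and whether an actual value was received.
func pick(in chan interface{}) (int, bool) {
	cases := []reflect.SelectCase{
		{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(in)},
		{Dir: reflect.SelectSend}, // zero Chan, so this case is ignored
	}
	chosen, _, ok := reflect.Select(cases)
	return chosen, ok
}

func main() {
	in := make(chan interface{}, 1)

	in <- "queued"
	fmt.Println(pick(in)) // 0 true: a value was received on case 0

	close(in)
	fmt.Println(pick(in)) // 0 false: case 0 fired because the channel is closed
}
```

In both calls the receive case is chosen. Only the bool distinguishes a real value from the zero value produced by a closed channel.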
A switch{} block can be used to process the result of Select() since constants were defined for the slice indices:
            switch i {
For the incomingCase, one of two things happened:
- a value was received (ok will be true)
- the Send channel was closed (ok will be false)
If a value was received, it is immediately added to the list. Otherwise, sendClosed is set to true.
            case incomingCase:
                if ok {
                    values.PushBack(val.Interface())
                } else {
                    sendClosed = true
                }
For the outgoingCase, the value that was sent on the Recv channel needs to be removed from the list:
            case outgoingCase:
                values.Remove(values.Front())
That concludes the body of the loop. The only thing that remains to be done in the goroutine is to close the Recv channel to indicate that all values have been received:
            }
        }
        close(recv)
    }()
The NewNonBlockingChan() function concludes with the new instance being returned to the caller:
    return n
}
And that’s it! Using this new type is fairly straightforward:
n := NewNonBlockingChan()
n.Send <- true // it doesn't block!
No matter how many values you send on the channel, a send only waits for the internal goroutine to pick the value up, never for a receiver on the Recv channel. The “channel” will continue to buffer values in its list until they are received, so memory use grows with the number of unreceived values. Closing the Send channel will cause the Recv channel to also be closed after the last value is received.
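For reference, here is the whole thing assembled into a single runnable program, together with a slightly longer usage example. This is a sketch put together from the fragments above, with the imports (container/list and reflect) added. The main function is only an illustration and sends all of its values before anything is received.

```go
package main

import (
	"container/list"
	"fmt"
	"reflect"
)

type NonBlockingChan struct {
	Send chan<- interface{}
	Recv <-chan interface{}
}

func NewNonBlockingChan() *NonBlockingChan {
	var (
		send       = make(chan interface{})
		recv       = make(chan interface{})
		n          = &NonBlockingChan{Send: send, Recv: recv}
		values     = list.New()
		sendClosed = false
	)
	go func() {
		const (
			incomingCase = iota
			outgoingCase
			numCases
		)
		// Run until Send is closed and every buffered value was delivered.
		for !sendClosed || values.Len() > 0 {
			cases := make([]reflect.SelectCase, numCases)
			cases[incomingCase].Dir = reflect.SelectRecv
			cases[outgoingCase].Dir = reflect.SelectSend
			// A case whose Chan is left unset is ignored by reflect.Select.
			if !sendClosed {
				cases[incomingCase].Chan = reflect.ValueOf(send)
			}
			if values.Len() > 0 {
				cases[outgoingCase].Chan = reflect.ValueOf(recv)
				cases[outgoingCase].Send = reflect.ValueOf(values.Front().Value)
			}
			i, val, ok := reflect.Select(cases)
			switch i {
			case incomingCase:
				if ok {
					values.PushBack(val.Interface())
				} else {
					sendClosed = true
				}
			case outgoingCase:
				values.Remove(values.Front())
			}
		}
		close(recv)
	}()
	return n
}

func main() {
	n := NewNonBlockingChan()

	// Nothing is receiving yet, but none of these sends gets stuck.
	for i := 0; i < 3; i++ {
		n.Send <- i
	}
	close(n.Send)

	// Values come out in the order they were sent; the loop ends when the
	// goroutine closes Recv after delivering the last value.
	for v := range n.Recv {
		fmt.Println(v.(int))
	}
}
```

One caveat with this sketch: sending a nil interface value is not supported, because reflect.ValueOf(nil) yields a zero Value and reflect.Select panics when a send case has no value to send.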