In the previous article, we discussed the Worker Pool pattern. Now we'll look at a related one: the pipeline concurrency pattern in Go.
The pipeline is a common pattern in continuous integration (CI) tools: GitHub Actions and Bitbucket Pipelines both use it. The pipeline pattern lets us split a task into sub-tasks (stages) and run them one after another, with the output of each stage feeding the next.
For example, a pipeline that runs the automated tests of a Go project might have three sub-tasks (or stages):
Install packages
Run linter
Run tests
This pattern is also useful outside CI, for example in a program that transforms complex data step by step.
Let's say we want to build a linter for JavaScript. Its stages might look like this:
no-dupe-else-if (disallow duplicate conditions in if-else-if chains)
no-debugger (disallow the debugger statement)
no-duplicate-imports (disallow duplicate module imports)
...
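To make the idea of stages concrete, here is a minimal sequential sketch in Go. Each stage is a function from source lines to source lines, and the stages run one after another. The rule implementations “removeDebugger” and “dedupeImports” are hypothetical, heavily simplified stand-ins for the no-debugger and no-duplicate-imports rules: a real linter parses the code instead of comparing lines, and reports problems rather than rewriting the source.

```go
package main

import (
	"fmt"
	"strings"
)

// stage is one step of the pipeline: it receives source lines and
// returns the transformed lines.
type stage func(lines []string) []string

// removeDebugger is a hypothetical, simplified stand-in for no-debugger:
// it drops every line that consists only of a debugger statement.
func removeDebugger(lines []string) []string {
	var out []string
	for _, l := range lines {
		t := strings.TrimSpace(l)
		if t == "debugger" || t == "debugger;" {
			continue
		}
		out = append(out, l)
	}
	return out
}

// dedupeImports is a hypothetical, simplified stand-in for
// no-duplicate-imports: it keeps only the first copy of each identical
// import line.
func dedupeImports(lines []string) []string {
	seen := make(map[string]bool)
	var out []string
	for _, l := range lines {
		t := strings.TrimSpace(l)
		if strings.HasPrefix(t, "import ") {
			if seen[t] {
				continue
			}
			seen[t] = true
		}
		out = append(out, l)
	}
	return out
}

// runPipeline applies the stages in order; each stage's output is the
// next stage's input.
func runPipeline(lines []string, stages ...stage) []string {
	for _, s := range stages {
		lines = s(lines)
	}
	return lines
}

func main() {
	src := []string{
		`import fs from "fs";`,
		`import fs from "fs";`,
		`debugger;`,
		`console.log("hi");`,
	}
	// Prints: import fs from "fs";  then  console.log("hi");
	for _, l := range runPipeline(src, dedupeImports, removeDebugger) {
		fmt.Println(l)
	}
}
```

In this version the stages still run strictly one after another on the whole input.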
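Pipelines also make it natural to use concurrency. If each stage runs in its own goroutine, the stages work at the same time on different files: while one stage checks a file, the previous stage can already process the next one. Instead of analyzing files strictly one by one, the program can keep several CPU cores busy.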
Now, let’s see how to use this pattern in Go. We’ll build a basic program that extracts a person's name from their email address.
Input:
bill.john@gmail.com
hanabi@hey.com
Output:
Bill John
Hanabi
The program needs at least two stages:
Take the email’s username.
The result would be “bill.john” and “hanabi”.
Convert the username to title case: replace dots with spaces and capitalize each word.
The result would be “Bill John” and “Hanabi”.
Let’s write a function for the first stage, which takes the username from each email:
```go
// takeUsername returns the part of each email before the "@".
func takeUsername(mails []string) []string {
	var res []string
	for _, mail := range mails {
		res = append(res, strings.Split(mail, "@")[0])
	}
	return res
}
```
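And a function for the second stage, which converts each username to title case. It uses “cases.Title” from the golang.org/x/text module:
```go
// toCapitalize replaces dots with spaces and title-cases each word,
// e.g. "bill.john" -> "Bill John".
func toCapitalize(str []string) []string {
	var res []string
	ca := cases.Title(language.English)
	for _, s := range str {
		formatted := strings.ReplaceAll(s, ".", " ")
		formatted = ca.String(formatted)
		res = append(res, formatted)
	}
	return res
}
```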
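The main function chains the two stages. Together with the two functions above and the imports they need, it forms a complete program (fetch the x/text dependency first with “go get golang.org/x/text”):
```go
package main

import (
	"fmt"
	"strings"

	"golang.org/x/text/cases"
	"golang.org/x/text/language"
)

func main() {
	emails := []string{"bill.john@gmail.com", "hanabi@hey.com"}
	names := takeUsername(emails)
	names = toCapitalize(names)
	for _, n := range names {
		fmt.Println(n)
	}
}
```
Output:
Bill John
Hanabi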
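Now, let’s make the program concurrent. The stages will communicate through channels.
First, we’ll write a function that sends all the emails into a channel. It starts a goroutine to do the sending, returns the channel immediately, and closes the channel once every email has been sent:
```go
// gen sends each email into a new channel from a separate goroutine
// and closes the channel when all emails have been sent.
func gen(emails ...string) chan string {
	out := make(chan string)
	go func() {
		for _, email := range emails {
			out <- email
		}
		close(out)
	}()
	return out
}
```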
Then, the first stage accepts a channel as its argument and returns a new channel that carries only the username part of each email:
```go
// takeUsername receives emails from mails and sends their usernames to out.
func takeUsername(mails chan string) chan string {
	out := make(chan string)
	go func() {
		for email := range mails {
			out <- strings.Split(email, "@")[0]
		}
		close(out)
	}()
	return out
}
```
“takeUsername” reads every email from the channel returned by “gen” and sends each username to a new channel. A “for ... range” loop over a channel ends only when that channel is closed, so closing “out” afterwards is what tells the next stage that no more data is coming.
The second stage has the same shape: it reads from the channel returned by “takeUsername” and sends each username, converted to title case, to a new channel:
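```go
// toCapitalize receives usernames from str and sends them in title case.
func toCapitalize(str chan string) chan string {
	out := make(chan string)
	ca := cases.Title(language.English) // used only by the goroutine below
	go func() {
		for username := range str {
			out <- ca.String(strings.ReplaceAll(username, ".", " "))
		}
		close(out)
	}()
	return out
}
```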
Finally, the main function connects the stages into a pipeline:
```go
func main() {
	emails := []string{"bill.john@gmail.com", "hanabi@hey.com"}
	c := gen(emails...)
	c = takeUsername(c)
	c = toCapitalize(c)
	// The loop ends when toCapitalize closes its output channel.
	for name := range c {
		fmt.Println(name)
	}
}
```
Output:
Bill John
Hanabi
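We can make the program more type-safe by using directional channel types. Writing “<-chan string” instead of “chan string” tells Go that the channel can only be received from: sending to it is a compilation error. The opposite direction, “chan<- string”, is send-only, so receiving from it does not compile. Inside each function, “out” is still a bidirectional “chan string”; Go converts it to “<-chan string” automatically when it is returned. With directional types, the signatures look like this: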
```go
func gen(emails ...string) <-chan string {
	// ...
}

func takeUsername(mails <-chan string) <-chan string {
	// ...
}

func toCapitalize(str <-chan string) <-chan string {
	// ...
}
```
Misusing a directional channel is now caught by the compiler:
```go
// Does not compile: str is receive-only.
func toCapitalize(str <-chan string) <-chan string {
	str <- "test" // invalid operation: cannot send to receive-only channel str (variable of type <-chan string)
	return nil
}

// Does not compile: str is send-only.
func printer(str chan<- string) {
	<-str // invalid operation: cannot receive from send-only channel str (variable of type chan<- string)
}
```
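Putting the directional-channel version together, a complete program might look like the sketch below. To keep it standard-library only, it swaps “cases.Title” for a hypothetical “titleCase” helper that upper-cases the first rune of each word; unlike cases.Title, it does not lower-case the remaining letters or apply language-specific rules.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// gen returns a receive-only channel, so callers cannot send into it.
func gen(emails ...string) <-chan string {
	out := make(chan string) // bidirectional inside the function
	go func() {
		defer close(out)
		for _, email := range emails {
			out <- email
		}
	}()
	return out // converted implicitly to <-chan string
}

func takeUsername(mails <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for email := range mails {
			out <- strings.Split(email, "@")[0]
		}
	}()
	return out
}

// titleCase is a hypothetical stdlib-only stand-in for cases.Title: it
// upper-cases the first rune of each space-separated word.
func titleCase(s string) string {
	words := strings.Fields(s) // never contains empty strings
	for i, w := range words {
		r := []rune(w)
		r[0] = unicode.ToUpper(r[0])
		words[i] = string(r)
	}
	return strings.Join(words, " ")
}

func toCapitalize(str <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for username := range str {
			out <- titleCase(strings.ReplaceAll(username, ".", " "))
		}
	}()
	return out
}

func main() {
	// Prints "Bill John" then "Hanabi".
	for name := range toCapitalize(takeUsername(gen("bill.john@gmail.com", "hanabi@hey.com"))) {
		fmt.Println(name)
	}
}
```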
That’s all for the pipeline pattern in Go. I hope you found it useful. Happy hacking, and see you in the next article.