Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions homeworks/14_channels/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# # Домашнее задание №14

**В домашнем задании необходимо реализовать worker pool.**

**Worker Pool** — это шаблон проектирования, используемый для управления группой рабочих потоков (горутин), которые одновременно обрабатывают входящие задачи.

У нашей реализации будет несколько особенностей:

+ во время добавления задания в пул, если пул заданий уже заполнен, нужно вернуть ошибку, а не блокировать добавляющую горутину.
+ метод завершения должен не только завершать все горутины и дожидаться их заверешения, но и дожидаться пока горутины не выполнят все задачи из пула.

API для worker pool будет выглядеть следующим образом:

```go
type WorkerPool struct { ... }

func NewWorkerPool(workersNumber int) *WorkerPool // создать пул с заданным количеством горутин
func (wp *WorkerPool) AddTask(task func()) error // добавить задачу в пул
func (wp *WorkerPool) Shutdown() // завершить выполнение и приём задач в пул
```

Для выполнения домашнего задания подготовлен шаблон кода и основные тесты, которые помогут проверить корректность реализации. Шаблон доступен по [ссылке](https://github.com/Balun-courses/deep_go/blob/master/homework/channels/homework_test.go).
64 changes: 64 additions & 0 deletions homeworks/14_channels/homework.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package homework14

import (
"errors"
"sync"
)

// go test -v homework_test.go

var ErrPoolIsFull error = errors.New("pool is full")
var ErrPoolIsClosed error = errors.New("pool is closed")

type WorkerPool struct {
tasks chan func()
wg sync.WaitGroup
shutdownOnce sync.Once
}

func NewWorkerPool(workersNumber int) *WorkerPool {
wp := WorkerPool{
tasks: make(chan func(), workersNumber),
wg: sync.WaitGroup{},
shutdownOnce: sync.Once{},
}

wp.wg.Add(workersNumber)
for range workersNumber {
go func() {
defer wp.wg.Done()
for task := range wp.tasks {
task()
}
}()
}

return &wp
}

// Return an error if the pool is full
func (wp *WorkerPool) AddTask(task func()) (err error) {
defer func() {
if r := recover(); r != nil {
err = ErrPoolIsClosed
}

}()

select {
case wp.tasks <- task:
return nil
default:
return ErrPoolIsFull
}
}

// Shutdown all workers and wait for all
// tasks in the pool to complete
func (wp *WorkerPool) Shutdown() {
wp.shutdownOnce.Do(func() {
close(wp.tasks)
wp.wg.Wait()
})

}
36 changes: 36 additions & 0 deletions homeworks/14_channels/homework_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package homework14

import (
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestWorkerPool(t *testing.T) {
var counter atomic.Int32
task := func() {
time.Sleep(time.Millisecond * 500)
counter.Add(1)
}

pool := NewWorkerPool(2)
_ = pool.AddTask(task)
_ = pool.AddTask(task)
err := pool.AddTask(task)
assert.ErrorIs(t, err, ErrPoolIsFull)

time.Sleep(time.Millisecond * 600)
assert.Equal(t, int32(2), counter.Load())

_ = pool.AddTask(task)
_ = pool.AddTask(task)
_ = pool.AddTask(task)
pool.Shutdown() // wait tasks

err = pool.AddTask(task)
assert.ErrorIs(t, err, ErrPoolIsClosed)

assert.Equal(t, int32(5), counter.Load())
}