From 6c2b552b84386e2a24b401b0a8b60b9e7f57b141 Mon Sep 17 00:00:00 2001 From: Vladislav Byrgazov Date: Tue, 13 Jan 2026 19:27:29 +0500 Subject: [PATCH] homework: implemented worker pool Signed-off-by: Vladislav Byrgazov --- homeworks/14_channels/README.md | 22 +++++++++ homeworks/14_channels/homework.go | 64 ++++++++++++++++++++++++++ homeworks/14_channels/homework_test.go | 36 +++++++++++++++ 3 files changed, 122 insertions(+) create mode 100644 homeworks/14_channels/README.md create mode 100644 homeworks/14_channels/homework.go create mode 100644 homeworks/14_channels/homework_test.go diff --git a/homeworks/14_channels/README.md b/homeworks/14_channels/README.md new file mode 100644 index 0000000..4edc482 --- /dev/null +++ b/homeworks/14_channels/README.md @@ -0,0 +1,22 @@ +# # Домашнее задание №14 + +**В домашнем задании необходимо реализовать worker pool.** + +**Worker Pool** — это шаблон проектирования, используемый для управления группой рабочих потоков (горутин), которые одновременно обрабатывают входящие задачи. + +У нашей реализации будет несколько особенностей: + ++ во время добавления задания в пул, если пул заданий уже заполнен, нужно вернуть ошибку, а не блокировать добавляющую горутину. ++ метод завершения должен не только завершать все горутины и дожидаться их заверешения, но и дожидаться пока горутины не выполнят все задачи из пула. + +API для worker pool будет выглядеть следующим образом: + +```go +type WorkerPool struct { ... } + +func NewWorkerPool(workersNumber int) *WorkerPool // создать пул с заданным количеством горутин +func (wp *WorkerPool) AddTask(task func()) error // добавить задачу в пул +func (wp *WorkerPool) Shutdown() // завершить выполнение и приём задач в пул +``` + +Для выполнения домашнего задания подготовлен шаблон кода и основные тесты, которые помогут проверить корректность реализации. Шаблон доступен по [ссылке](https://github.com/Balun-courses/deep_go/blob/master/homework/channels/homework_test.go). diff --git a/homeworks/14_channels/homework.go b/homeworks/14_channels/homework.go new file mode 100644 index 0000000..3f00975 --- /dev/null +++ b/homeworks/14_channels/homework.go @@ -0,0 +1,64 @@ +package homework14 + +import ( + "errors" + "sync" +) + +// go test -v homework_test.go + +var ErrPoolIsFull error = errors.New("pool is full") +var ErrPoolIsClosed error = errors.New("pool is closed") + +type WorkerPool struct { + tasks chan func() + wg sync.WaitGroup + shutdownOnce sync.Once +} + +func NewWorkerPool(workersNumber int) *WorkerPool { + wp := WorkerPool{ + tasks: make(chan func(), workersNumber), + wg: sync.WaitGroup{}, + shutdownOnce: sync.Once{}, + } + + wp.wg.Add(workersNumber) + for range workersNumber { + go func() { + defer wp.wg.Done() + for task := range wp.tasks { + task() + } + }() + } + + return &wp +} + +// Return an error if the pool is full +func (wp *WorkerPool) AddTask(task func()) (err error) { + defer func() { + if r := recover(); r != nil { + err = ErrPoolIsClosed + } + + }() + + select { + case wp.tasks <- task: + return nil + default: + return ErrPoolIsFull + } +} + +// Shutdown all workers and wait for all +// tasks in the pool to complete +func (wp *WorkerPool) Shutdown() { + wp.shutdownOnce.Do(func() { + close(wp.tasks) + wp.wg.Wait() + }) + +} diff --git a/homeworks/14_channels/homework_test.go b/homeworks/14_channels/homework_test.go new file mode 100644 index 0000000..771fb68 --- /dev/null +++ b/homeworks/14_channels/homework_test.go @@ -0,0 +1,36 @@ +package homework14 + +import ( + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestWorkerPool(t *testing.T) { + var counter atomic.Int32 + task := func() { + time.Sleep(time.Millisecond * 500) + counter.Add(1) + } + + pool := NewWorkerPool(2) + _ = pool.AddTask(task) + _ = pool.AddTask(task) + err := pool.AddTask(task) + assert.ErrorIs(t, err, ErrPoolIsFull) + + time.Sleep(time.Millisecond * 600) + assert.Equal(t, int32(2), counter.Load()) + + _ = pool.AddTask(task) + _ = pool.AddTask(task) + _ = pool.AddTask(task) + pool.Shutdown() // wait tasks + + err = pool.AddTask(task) + assert.ErrorIs(t, err, ErrPoolIsClosed) + + assert.Equal(t, int32(5), counter.Load()) +}