Skip to content

A customizable worker pool that runs tasks asynchronously on a set of workers. For simple use, you only need to define your task struct and implement its Do method. For advanced use, you can supply your own worker manager, worker, and task.

License

Notifications You must be signed in to change notification settings

xinst/workerpool

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

25 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

WorkerPool

License GoDoc Go Report Card Build Status Version Gitter

A customizable worker pool that runs tasks asynchronously on a set of workers.
For simple use, you only need to define your task struct and implement its Do method.

package main

import (
	"fmt"
	"math/rand"
	"time"
	"sync"

	"github.com/xinst/workerpool"
)

// SimpleTask is the example task handed to the pool. It carries its own
// sequence number and a pointer to the WaitGroup shared with the producer.
type SimpleTask struct {
	index int             // sequence number printed when the task finishes
	wg    *sync.WaitGroup // marked Done once per call to Do
}

// Do implements the workerpool.Task interface. It pauses for a random whole
// number of seconds in [0, 10), prints a completion line, and always returns
// nil. A result channel could be added to the struct if the caller needs to
// receive output from the task.
func (t *SimpleTask) Do() error {
	// Signal the WaitGroup on every exit path.
	defer t.wg.Done()

	pause := time.Duration(rand.Intn(10)) * time.Second
	time.Sleep(pause)

	fmt.Printf("%d SimpleTask task done\n", t.index)
	return nil
}

// simpleTest pushes ten SimpleTask values into a pool built with the
// library's default worker and blocks until every task has called Done.
func simpleTest() {
	const taskCount = 10

	// Default-worker pool: task queue capacity 100, 3 workers.
	pool := workerpool.NewWorkerPoolWithDefault(100, 3, false)
	pool.Start()

	var pending sync.WaitGroup
	pending.Add(taskCount)

	// Queue the tasks; each one shares the same WaitGroup.
	for i := 0; i < taskCount; i++ {
		pool.PushTask(&SimpleTask{index: i, wg: &pending})
	}

	pending.Wait()
	fmt.Println("all tasks have done")
}

// main runs the simple worker-pool example.
func main() {
	simpleTest()
}

For advanced use, you can supply your own worker manager, worker, and task:

package main

import (
	"fmt"
	"math/rand"
	"time"
	"sync"

	"github.com/xinst/workerpool"
)

// DownloadTask is the example task used with the custom worker. It carries
// its own sequence number and a pointer to the WaitGroup shared with the
// producer.
type DownloadTask struct {
	index int             // sequence number printed when the task finishes
	wg    *sync.WaitGroup // marked Done once per call to Do
}

// Do implements the workerpool.Task interface. It pauses for a random whole
// number of seconds in [0, 10), prints a completion line, and always returns
// nil. A result channel could be added to the struct if the caller needs to
// receive output from the task.
func (t *DownloadTask) Do() error {
	// Signal the WaitGroup on every exit path.
	defer t.wg.Done()

	pause := time.Duration(rand.Intn(10)) * time.Second
	time.Sleep(pause)

	fmt.Printf("%d Download task done\n", t.index)
	return nil
}

// DownloadWorker is a custom worker with no state of its own. It is the type
// returned by the worker factory in advanceTest.
type DownloadWorker struct {
}

// Work runs the given task on this worker by calling its Do method.
// Work has no return value, so an error from Do is printed here instead of
// being silently discarded.
func (dw *DownloadWorker) Work(task workerpool.Task) {
	//fmt.Printf("Download worker do %#v \n", task)
	if err := task.Do(); err != nil {
		fmt.Printf("download task failed: %v\n", err)
	}
}

// Close is called when the worker is destroyed. This worker holds no
// resources, so it always returns nil.
func (dw *DownloadWorker) Close() error {
	return nil
}

// advanceTest pushes ten DownloadTask values into a pool whose workers come
// from a caller-supplied factory, then blocks until every task has called
// Done.
func advanceTest() {
	const taskCount = 10

	// Factory handed to the worker manager; every worker is a DownloadWorker.
	newWorker := func() workerpool.Worker {
		return &DownloadWorker{}
	}
	manager := workerpool.NewWorkerMgr(3, false, newWorker)

	// Pool backed by the custom manager: task queue capacity 100, 3 workers.
	pool := workerpool.NewWorkerPool(100, manager)
	pool.Start()

	var pending sync.WaitGroup
	pending.Add(taskCount)

	// Queue the tasks; each one shares the same WaitGroup.
	for i := 0; i < taskCount; i++ {
		pool.PushTask(&DownloadTask{index: i, wg: &pending})
	}

	pending.Wait()
	fmt.Println("all tasks have done")
}

// main runs the custom worker-manager example.
func main() {
	advanceTest()
}

About

A customizable worker pool that runs tasks asynchronously on a set of workers. For simple use, you only need to define your task struct and implement its Do method. For advanced use, you can supply your own worker manager, worker, and task.

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages