go语言后端批量入库 golang批量执行sql( 五 )


log.Errorf("Error uploading to S3: %s", err.Error())
}
case -w.quit:
// we have received a signal to stop
return
}
}
}()
}
// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
go func() {
w.quit - true
}()
}
我们已经修改了我们的web请求handler,用payload创建一个Job实例,然后发到JobQueue channel,以便于worker来获取 。
func payloadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Read the body into a string for json decoding
var content = PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
// let's create a job with the payload
work := Job{Payload: payload}
// Push the work onto the queue.
JobQueue - work
}
w.WriteHeader(http.StatusOK)
}
在web server初始化时 , 我们创建一个Dispatcher,然后调用Run()函数创建一个worker池子,然后开始监听JobQueue中的job 。
dispatcher := NewDispatcher(MaxWorker)
dispatcher.Run()
下面是dispatcher的实现代码:
type Dispatcher struct {
// A pool of workers channels that are registered with the dispatcher
WorkerPool chan chan Job
}
func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return Dispatcher{WorkerPool: pool}
}
func (d *Dispatcher) Run() {
// starting n number of workers
for i := 0; id.maxWorkers; i++ {
worker := NewWorker(d.pool)
worker.Start()
}
go d.dispatch()
}
func (d *Dispatcher) dispatch() {
for {
select {
case job := -JobQueue:
// a job request has been received
go func(job Job) {
// try to obtain a worker job channel that is available.
// this will block until a worker is idle
jobChannel := -d.WorkerPool
// dispatch the job to the worker job channel
jobChannel - job
}(job)
}
}
}
注意到,我们提供了初始化并加入到池子的worker的最大数量 。因为这个工程我们利用了Amazon Elasticbeanstalk带有的docker化的Go环境 , 所以我们常常会遵守12-factor方法论来配置我们的生成环境中的系统,我们从环境变了读取这些值 。这种方式,我们控制worker的数量和JobQueue的大?。晕颐强梢院芸斓母谋湔庑┲? ,而不需要重新部署集群 。
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue= os.Getenv("MAX_QUEUE")
)
直接结果
我们部署了之后 , 立马看到了延时降到微乎其微的数值,并未我们处理请求的能力提升很大 。
Elastic Load Balancers完全启动后,我们看到ElasticBeanstalk 应用服务于每分钟1百万请求 。通常情况下在上午时间有几个小时,流量峰值超过每分钟一百万次 。
我们一旦部署了新的代码 , 服务器的数量从100台大幅 下降到大约20台 。
我们合理配置了我们的集群和自动均衡配置之后,我们可以把服务器的数量降至4x EC2 c4.Large实例,并且Elastic Auto-Scaling设置为如果CPU达到5分钟的90%利用率 , 我们就会产生新的实例 。
总结
在我的书中,简单总是获胜 。我们可以使用多队列、后台worker、复杂的部署设计一个复杂的系统,但是我们决定利用Elasticbeanstalk 的auto-scaling的能力和Go语言开箱即用的特性简化并发 。