上面的方案在很多地方很不好 。没有办法控制我们产生的go routine的数量 。由于我们收到了每分钟1百万的POST请求,这段代码很快就崩溃了 。
再次尝试
我们需要找一个不同的方式 。自开始我们就讨论过,我们需要保持请求处理程序的生命周期很短,并且进程在后台产生 。当然,这是你在Ruby on Rails的世界里必须要做的事情 , 否则你会阻塞在所有可用的工作 web处理器上,不管你是使用puma、unicore还是passenger(我们不要讨论JRuby这个话题) 。然后我们需要利用常用的处理方案来做这些,比如Resque、 Sidekiq、 SQS等 。这个列表会继续保留 , 因为有很多的方案可以实现这些 。
所以,第二次迭代,我们创建了一个缓冲channel , 我们可以把job排队 , 然后把它们上传到S3 。因为我们可以控制我们队列中的item最大值,我们有大量的内存来排列job,我们认为只要把job在channel里面缓冲就可以了 。
var Queue chan Payload
func init() {
Queue = make(chan Payload, MAX_QUEUE)
}
func payloadHandler(w http.ResponseWriter, r *http.Request) {
...
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
Queue - payload
}
...
}
接下来 , 我们再从队列中取job,然后处理它们 。我们使用类似于下面的代码:
func StartProcessor() {
for {
select {
case job := -Queue:
job.payload.UploadToS3()// -- STILL NOT GOOD
}
}
}
说实话,我不知道我们在想什么 。这肯定是一个满是Red-Bulls的夜晚 。这个方法不会带来什么改善,我们用了一个 有缺陷的缓冲队列并发,仅仅是把问题推迟了 。我们的同步处理器同时仅仅会上传一个数据到S3,因为来到的请求远远大于单核处理器上传到S3的能力,我们的带缓冲channel很快达到了它的极限,然后阻塞了请求处理逻辑的queue更多item的能力 。
我们仅仅避免了问题,同时开始了我们的系统挂掉的倒计时 。当部署了这个有缺陷的版本后,我们的延时保持在每分钟以常量增长 。
最好的解决方案
我们讨论过在使用用Go channel时利用一种常用的模式,来创建一个二级channel系统,一个来queue job,另外一个来控制使用多少个worker来并发操作JobQueue 。
想法是 , 以一个恒定速率并行上传到S3,既不会导致机器崩溃也不好产生S3的连接错误 。这样我们选择了创建一个Job/Worker模式 。对于那些熟悉Java、C#等语言的开发者,可以把这种模式想象成利用channel以golang的方式来实现了一个worker线程池,作为一种替代 。
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue= os.Getenv("MAX_QUEUE")
)
// Job represents the job to be run
type Job struct {
Payload Payload
}
// A buffered channel that we can send work requests on.
var JobQueue chan Job
// Worker represents the worker that executes the job
type Worker struct {
WorkerPoolchan chan Job
JobChannelchan Job
quitchan bool
}
func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit:make(chan bool)}
}
// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool - w.JobChannel
select {
case job := -w.JobChannel:
// we have received a work request.
if err := job.Payload.UploadToS3(); err != nil {
- Redis批量查询 redis批量rpop
- 将数据保存到文件中c语言 将数据保存到mongodb
- 对于mongodb中更新操作,如何采用“批量更新方式” 在mongodb中如何更新数据
- 罗布人村天气 mysql语言具有的功能
- redis编程语言 redis对应c语言
- mysql 批处理 mysql有批量查询吗
- mysql批量执行sql文件 mysql批量执行更新
- redis一般和什么语言一起开发 和redis类似的编程
- mongodb采用什么语言 mongodb的设计采用什么
- c连接mysql数据库 c连接mysql报错
