go语言后端批量入库 golang批量执行sql( 三 )


有趣的是 , 在过去的大约9年间,我参与的所有的web后端的开发通常是通过Ruby on Rails技术实现的 。不要错怪我 。我喜欢Ruby on Rails,并且我相信它是个令人惊讶的环境 。但是一段时间后,你会开始以ruby的方式开始思考和设计系统 , 你会忘记,如果你可以利用多线程、并行、快速执行和小内存开销,软件架构本来应该是多么高效和简单 。很多年期间,我是一个c/c++、Delphi和c#开发者,我刚开始意识到使用正确的工具可以把复杂的事情变得简单些 。
作为首席架构师,我不会很关心在互联网上的语言和框架战争 。我相信效率、生产力 。代码可维护性主要依赖于你如何把解决方案设计得很简单 。
问题
当工作在我们的匿名遥测和分析系统中,我们的目标是可以处理来自于百万级别的终端的大量的POST请求 。web处理服务可以接收包含了很多payload的集合的JSON数据,这些数据需要写入Amazon S3中 。接下来,map-reduce系统可以操作这些数据 。
按照习惯,我们会调研服务层级架构,涉及的软件如下:
Sidekiq
Resque
DelayedJob
Elasticbeanstalk Worker Tier
RabbitMQ
and so on…
搭建了2个不同的集群,一个提供web前端 , 另外一个提供后端处理,这样我们可以横向扩展后端服务的数量 。
但是,从刚开始,在 讨论阶段我们的团队就知道我们应该使用Go,因为我们看到这会潜在性地成为一个非常庞大( large traffic)的系统 。我已经使用了Go语言大约2年时间 , 我们开发了几个系统,但是很少会达到这样的负载(amount of load) 。
我们开始创建一些结构,定义从POST调用得到的web请求负载,还有一个上传到S3 budket的函数 。
type PayloadCollection struct {
WindowsVersionstring`json:"version"`
Tokenstring`json:"token"`
Payloads[]Payload `json:"data"`
【go语言后端批量入库 golang批量执行sql】}
type Payload struct {
// [redacted]
}
func (p *Payload) UploadToS3() error {
// the storageFolder method ensures that there are no name collision in
// case we get same timestamp in the key name
storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())
bucket := S3Bucket
b := new(bytes.Buffer)
encodeErr := json.NewEncoder(b).Encode(payload)
if encodeErr != nil {
return encodeErr
}
// Everything we post to the S3 bucket should be marked 'private'
var acl = s3.Private
var contentType = "application/octet-stream"
return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}
本地Go routines方法
刚开始,我们采用了一个非常本地化的POST处理实现,仅仅尝试把发到简单go routine的job并行化:
func payloadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Read the body into a string for json decoding
var content = PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
go payload.UploadToS3()// ----- DON'T DO THIS
}
w.WriteHeader(http.StatusOK)
}
对于中小负载 , 这会对大多数的人适用 , 但是大规模下,这个方案会很快被证明不是很好用 。我们期望的请求数,不在我们刚开始计划的数量级,当我们把第一个版本部署到生产环境上 。我们完全低估了流量 。