content/night/other/16-2018-09-06-queue-worker.md
在OpenFaaS中同步调用函数时,将会连接到网关,直到函数成功返回才会关闭连接。同步调用是阻塞的。
/function/<function_name>异步函数会有一些差异:
/async-function/<function_name>docker service logs -f func_queue-worker
如果需要获得异步函数的结果,有两个方法:
github.com/nats-io/go-nats-streaming
github.com/nats-io/go-nats
github.com/openfaas/faas
go-nats和go-nats-streaming是nats和nats-streaming的go版本的客户端。
faas这个依赖其实是只用到了queue包下面的types.go文件。这个文件是定义了异步请求的Request结构体和一个CanQueueRequests接口。如下所示:
package queue
import "net/url"
import "net/http"
// Request for asynchronous processing
type Request struct {
Header http.Header
Body []byte
Method string
QueryString string
Function string
CallbackURL *url.URL `json:"CallbackUrl"`
}
// CanQueueRequests can take on asynchronous requests
type CanQueueRequests interface {
Queue(req *Request) error
}
从这里我们就可以明白作者的设计思路,只要是实现了这个CanQueueRequests接口,就可以作为一个queue-worker。
接口的实现类NatsQueue是在handler包里。它的属性都是nats中常用到的,包括clientId,clusterId,url,连接,主题等,如下所示:
// NatsQueue queue for work
type NatsQueue struct {
nc stan.Conn // nats的连接
ClientID string // nats的clientId
ClusterID string // nats的clusterId
NATSURL string // nats的URL
Topic string // 主题
}
它的queue方法也很简单,主要做了两件事儿:
// Queue request for processing
func (q *NatsQueue) Queue(req *queue.Request) error {
var err error
fmt.Printf("NatsQueue - submitting request: %s.\n", req.Function)
out, err := json.Marshal(req)
if err != nil {
log.Println(err)
}
err = q.nc.Publish(q.Topic, out)
return err
}
go语言没有构造方法,所以NatsQueue还用于创建NatsQueue的实例的方法,这里就成为工厂方法。这个工厂方法主要就是从配置文件中读取环境变量的值,然后创建一个nats的连接,相当于给NatsQueue的对象的每个属性进行赋值。
func CreateNatsQueue(address string, port int, clientConfig NatsConfig) (*NatsQueue, error) {
queue1 := NatsQueue{}
var err error
natsURL := fmt.Sprintf("nats://%s:%d", address, port)
log.Printf("Opening connection to %s\n", natsURL)
clientID := clientConfig.GetClientID()
clusterID := "faas-cluster"
nc, err := stan.Connect(clusterID, clientID, stan.NatsURL(natsURL))
queue1.nc = nc
return &queue1, err
}
这个CreateNatsQueue方法是Gateway项目中进行调用,我们可以在Gateway项目的main.go中找到,如果Gateway的配置开启了异步函数支持,就会调用该方法,创建一个NatsQueue对象,然后把函数放到队列中,这里就不深入讲解:
if config.UseNATS() {
log.Println("Async enabled: Using NATS Streaming.")
natsQueue, queueErr := natsHandler.CreateNatsQueue(*config.NATSAddress, *config.NATSPort, natsHandler.DefaultNatsConfig{})
if queueErr != nil {
log.Fatalln(queueErr)
}
faasHandlers.QueuedProxy = handlers.MakeQueuedProxy(metricsOptions, true, natsQueue)
faasHandlers.AsyncReport = handlers.MakeAsyncReport(metricsOptions)
}
到这里,我相信读者也了解到,Gateway其实就是一个发布者,将异步请求扔到队列里。接下来肯定要有一个订阅者将请求消费处理。
我们都知道,nats streaming的订阅者订阅到消息之后,会把消息扔给一个回调函数去处理。queue-worker的订阅者实现也是这样,它的实现并不复杂,所有逻辑都在main.go的中。
我们先看回调函数mcb都做了什么:
http.StatusServiceUnavailable,并分别处理CallbackURL是否存在的情况。当然在这个callback中会根据一些环境变量的存在,选择是否打印日志出来。
mcb := func(msg *stan.Msg) {
i++
printMsg(msg, i)
started := time.Now()
req := queue.Request{}
unmarshalErr := json.Unmarshal(msg.Data, &req)
if unmarshalErr != nil {
log.Printf("Unmarshal error: %s with data %s", unmarshalErr, msg.Data)
return
}
fmt.Printf("Request for %s.\n", req.Function)
if config.DebugPrintBody {
fmt.Println(string(req.Body))
}
queryString := ""
if len(req.QueryString) > 0 {
queryString = fmt.Sprintf("?%s", strings.TrimLeft(req.QueryString, "?"))
}
functionURL := fmt.Sprintf("http://%s%s:8080/%s", req.Function, config.FunctionSuffix, queryString)
request, err := http.NewRequest(http.MethodPost, functionURL, bytes.NewReader(req.Body))
defer request.Body.Close()
copyHeaders(request.Header, &req.Header)
res, err := client.Do(request)
var status int
var functionResult []byte
if err != nil {
status = http.StatusServiceUnavailable
log.Println(err)
timeTaken := time.Since(started).Seconds()
if req.CallbackURL != nil {
log.Printf("Callback to: %s\n", req.CallbackURL.String())
resultStatusCode, resultErr := postResult(&client, res, functionResult, req.CallbackURL.String())
if resultErr != nil {
log.Println(resultErr)
} else {
log.Printf("Posted result: %d", resultStatusCode)
}
}
statusCode, reportErr := postReport(&client, req.Function, status, timeTaken, config.GatewayAddress)
if reportErr != nil {
log.Println(reportErr)
} else {
log.Printf("Posting report - %d\n", statusCode)
}
return
}
if res.Body != nil {
defer res.Body.Close()
resData, err := ioutil.ReadAll(res.Body)
functionResult = resData
if err != nil {
log.Println(err)
}
if config.WriteDebug {
fmt.Println(string(functionResult))
} else {
fmt.Printf("Wrote %d Bytes\n", len(string(functionResult)))
}
}
timeTaken := time.Since(started).Seconds()
fmt.Println(res.Status)
if req.CallbackURL != nil {
log.Printf("Callback to: %s\n", req.CallbackURL.String())
resultStatusCode, resultErr := postResult(&client, res, functionResult, req.CallbackURL.String())
if resultErr != nil {
log.Println(resultErr)
} else {
log.Printf("Posted result: %d", resultStatusCode)
}
}
statusCode, reportErr := postReport(&client, req.Function, res.StatusCode, timeTaken, config.GatewayAddress)
if reportErr != nil {
log.Println(reportErr)
} else {
log.Printf("Posting report - %d\n", statusCode)
}
}
postResult函数是用来处理callbackURL存在的情况,在这个函数中将结果,以post请求调用callbackURL发送出去。
postReport函数用来处理callbackURL不存在的情况,这里是将结果发到Gateway网关的"http://" + gatewayAddress + ":8088/system/async-report"中,我们之后就可以从这个url里查询异步函数的执行结果了。
本文主要分析了NATS Streaming版本的queue worker的实现,通过分析源码我们可以看到OpenFaaS在架构的设计很有考究,充分的考虑到了可扩展性,通过定义接口规范,使得开发者很容易实现自定义。