nc QueueSubscribe storage queryComplexFields storage func msg nats Msg

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
nc.QueueSubscribe("storage.queryComplexFields.*", "storage", func(msg *nats.Msg) {
request_id := msg.Subject[strings.LastIndex(msg.Subject, ".")+1:]
cLog := nfLog.WithFields(log.Fields{
"request_id": request_id,
})
if request_id != "IGNORED" {
cLog.Tracef("got queryComplexFields request, payload: %s", string(msg.Data))
}
var req QueryGetRequest
json.Unmarshal(msg.Data, &req)
res := GetResponse{}
// TODO : consider using special character deletion to protect from injections.
querystringselect := fmt.Sprintf("%s FROM %s t", strings.Join(req.Select[:],","), bucket.Name())
if len(req.Let) > 0 {
let := make([]string, 0, len(req.Let))
for k, v := range req.Let {
let = append(let, fmt.Sprintf("%s = %s", k, v))
}
querystringselect += " LET " + strings.Join(let[:],",")
}
querystringgroup := ""
if len(req.Group) > 0 {
querystringgroup += fmt.Sprintf(" GROUP BY %s", strings.Join(req.Group[:],","))
}
querystring := fmt.Sprintf("SELECT r.* FROM (SELECT %s WHERE REGEXP_LIKE(META(t).id, '%s.*')%s) r WHERE 1", querystringselect, req.Prefix, querystringgroup)
for k, v := range req.Fields {
vt := reflect.TypeOf(v).Kind()
if vt == reflect.String {
querystring += fmt.Sprintf(" AND %s IN ['%s']", k, strings.Join(strings.Split(v.(string), ","), "','"))
} else {
querystring += fmt.Sprintf(" AND %s = %v", k, v)
}
}
if len(req.Sort) > 0 {
querystring += fmt.Sprintf(" ORDER BY %s", strings.Join(req.Sort[:],","))
} else {
// Sort is required for "SortCount" field
querystring += fmt.Sprintf(" ORDER BY META(%s).id", bucket.Name())
}
if req.Offset > 0 {
querystring += fmt.Sprintf(" OFFSET %d", req.Offset)
}
if req.Limit > 0 {
querystring += fmt.Sprintf(" LIMIT %d", req.Limit)
}
querystring = strings.Replace(querystring, "<bucket>", bucket.Name(), -1)
if request_id != "IGNORED" {
cLog.Traceln("executing query: " + querystring)
}
query := gocb.NewN1qlQuery(querystring)
rows, err := bucket.ExecuteN1qlQuery(query, nil)
if err != nil {
res.Code = 1
res.What = err.Error()
if request_id != "IGNORED" {
cLog.Errorln(err.Error())
}
} else {
ret := struct {
Fields []interface{} `json:"fields"`
Total uint `json:"total"`
} {
[]interface{} {},
0,
}
var row interface{}
for rows.Next(&row) {
ret.Fields = append(ret.Fields, row)
}
ret.Total = rows.Metrics().SortCount
res.Data = ret
}
r, _ := json.Marshal(res)
nc.Publish(msg.Reply, r)
if request_id != "IGNORED" {
cLog.Tracef("queryComplexFields request handled, payload: %s", string(msg.Data))
}
})