package main
import (
"bytes"
"encoding/json"
"fmt"
"github.com/go-redis/redis"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"os"
"log"
)
// OrderedGetUrl builds a GET URL whose query parameters keep the order in
// which they were added (unlike url.Values, which sorts keys on Encode).
type OrderedGetUrl struct {
	// BaseURL is written verbatim in front of the query string; it is not
	// inspected, so it should not already contain a "?".
	BaseURL string
	// Params holds the query parameters in insertion order. Add appends one
	// single-entry map per call.
	Params []map[string]string
}

// Add appends the key/value pair as the last query parameter.
func (o *OrderedGetUrl) Add(key string, value string) {
	o.Params = append(o.Params, map[string]string{key: value})
}

// BuildUrl returns BaseURL followed by the query string made from Params.
//
// Both keys and values are query-escaped. The separator is chosen per pair
// ("?" before the first pair, "&" before every later one), so empty maps
// contribute nothing and a map holding several entries still produces
// well-formed output. Entries inside one multi-entry map are emitted in Go's
// unspecified map iteration order; only the order between maps is guaranteed.
func (o *OrderedGetUrl) BuildUrl() string {
	var buf bytes.Buffer
	buf.WriteString(o.BaseURL)
	sep := byte('?')
	for _, param := range o.Params {
		for k, v := range param {
			buf.WriteByte(sep)
			buf.WriteString(url.QueryEscape(k))
			buf.WriteByte('=')
			buf.WriteString(url.QueryEscape(v))
			sep = '&'
		}
	}
	return buf.String()
}
// JsonData is the JSON document that worker decodes from the payload part
// (the third ":"-separated field) of a queue item.
type JsonData struct {
// Pview is parsed by worker as the URL that will be requested.
Pview string `json:"pview"`
// Ua is sent as the User-Agent header of that request.
Ua string `json:"ua"`
// IP is added to the request URL as the "ip" query parameter.
IP string `json:"ip"`
}
// Package-level state shared by main, the worker goroutines and the
// panic-recovery helpers.
var (
	// queue is an unbuffered channel carrying items popped from Redis to the
	// dispatch loop in main.
	queue = make(chan string)

	// limiter is used as a counting semaphore: one value is sent before an
	// item is fetched and one is received when its processing ends. It stays
	// nil until main allocates it with the configured capacity.
	limiter chan bool

	// errLogger writes to stderr with an "Error: " prefix and no
	// timestamp/flags.
	errLogger = log.New(os.Stderr, "Error: ", 0)
)
// main reads its configuration from the environment, connects to Redis and
// runs the fetch/dispatch pipeline forever.
//
// Environment:
//   - REDIS_HOST: address passed to the Redis client (required).
//   - WORKERS:    maximum number of items in flight, used as the capacity of
//     limiter (required, must be a positive integer).
//
// A background goroutine acquires a limiter slot, blocks on BLPOP of the
// "pview" list and forwards the popped value to queue; the main goroutine
// starts one worker goroutine per queued value. The worker releases the slot
// when it finishes.
func main() {
	host := os.Getenv("REDIS_HOST")
	if host == "" {
		errLogger.Fatalln("Plase set REDIS_HOST environment")
	}

	workersEnv := os.Getenv("WORKERS")
	if workersEnv == "" {
		errLogger.Fatalln("Plase set WORKERS environment")
	}
	// Reject non-numeric and non-positive values up front: a capacity of 0
	// would make the very first limiter send block forever, and a negative
	// capacity makes make() panic.
	workers, err := strconv.Atoi(workersEnv)
	if err != nil || workers < 1 {
		errLogger.Fatalln("WORKERS must be a positive integer, got:", workersEnv)
	}
	limiter = make(chan bool, workers)

	fmt.Println("Host is: ", host)
	client := redis.NewClient(&redis.Options{
		Addr:     host,
		Password: "", // no password set
		DB:       0,  // use default DB
	})

	go func() {
		for {
			// Each iteration runs in its own function so the deferred
			// recovery fires per item instead of ending the loop.
			func() {
				defer recoverFromMainPanic()
				// Take a slot before fetching so no more than `workers`
				// items are ever popped and in flight at once.
				limiter <- true
				result, err := client.BLPop(0, "pview").Result()
				if err != nil {
					// Back off, give the slot back and try again.
					time.Sleep(3 * time.Second)
					<-limiter
					errLogger.Println("Wait", err)
					return
				}
				// result[0] is the list name, result[1] the popped value.
				queue <- result[1]
			}()
		}
	}()

	for key := range queue {
		go worker(key)
	}
}
// recoverFromMainPanic is meant to be deferred by the fetch loop in main.
// When a panic is in flight it stops the panic, gives back one limiter slot
// and logs the panic value; otherwise it does nothing.
func recoverFromMainPanic() {
	cause := recover()
	if cause == nil {
		return
	}
	<-limiter
	errLogger.Println("recovered from Main Panic", cause)
}
// recoverFromPanic is meant to be deferred directly. When a panic is in
// flight it stops the panic and logs the panic value; otherwise it does
// nothing.
func recoverFromPanic() {
	cause := recover()
	if cause == nil {
		return
	}
	errLogger.Println("recovered from ", cause)
}
// worker processes one queue item and releases one limiter slot when done.
//
// item has the form "auctionId:price:payload", where payload is a JSON
// document decoded into JsonData (it may itself contain ':' characters, so
// the split is limited to three parts). The worker takes the payload's pview
// URL, sets the query parameters q=auctionId, ecpm=price and ip=<payload ip>,
// issues a GET with the payload's user agent, and prints a timestamp, the
// final URL and the response status.
//
// Any failure concerning a single item (malformed item, bad JSON, bad URL,
// request error) is logged and the item is dropped; it never terminates the
// process.
func worker(item string) {
	// Deferred calls run last-in-first-out: a panic is recovered first and
	// the limiter slot is released afterwards, in every exit path.
	defer func() {
		<-limiter
	}()
	defer recoverFromPanic()

	parts := strings.SplitN(item, ":", 3)
	if len(parts) != 3 {
		errLogger.Println("Malformed item, expected auctionId:price:payload: ", item)
		return
	}
	auctionId, price, payload := parts[0], parts[1], parts[2]

	var m JsonData
	if err := json.Unmarshal([]byte(payload), &m); err != nil {
		errLogger.Println("Unmarshaling error: ", err.Error())
		errLogger.Println(parts)
		return
	}

	parsedUrl, err := url.Parse(m.Pview)
	if err != nil {
		// One bad URL must not bring down the whole process.
		errLogger.Println("Invalid pview URL: ", err.Error())
		return
	}
	query := parsedUrl.Query()
	query.Set("q", auctionId)
	query.Set("ecpm", price)
	query.Set("ip", m.IP)
	parsedUrl.RawQuery = query.Encode()

	client := http.Client{Timeout: time.Second * 10}
	req, err := http.NewRequest("GET", parsedUrl.String(), nil)
	if err != nil {
		// req is nil here; continuing would dereference it.
		errLogger.Println(err.Error())
		return
	}
	req.Header.Add("User-Agent", m.Ua)

	resp, err := client.Do(req)
	if err != nil {
		errLogger.Println(err.Error())
		return
	}
	defer resp.Body.Close()

	// In Go's reference time "02" is the day of month ("01" is the month
	// number).
	const layout = "2006 Jan 02 15:04:05 (MST)"
	fmt.Println(time.Now().Format(layout), parsedUrl.String(), resp.Status)
}