博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
golang实现任务分发处理
阅读量:6152 次
发布时间:2019-06-21

本文共 3920 字,大约阅读时间需要 13 分钟。

 

package mainimport (	"flag"	"fmt"	"os"	"log"	"net/http"	"io/ioutil"	"github.com/bitly/go-simplejson"	"encoding/csv"	"io"	"time"	"sync"	"net/url"	"strconv"	"errors")var (	concurrency int	timeout int	infile string	outfile string)var usage = `Usage:%s [options] 	Options are: 		-c concurrency Number of request to preform 		-t timeout Request timeout 		-i infile Input file 		-o outfile Output file`func main() {	flag.Usage = func() {		fmt.Fprintf(os.Stderr, usage, os.Args[0])	}	flag.IntVar(&concurrency, "c", 10, "")	flag.IntVar(&timeout, "t", 60, "")	flag.StringVar(&infile, "i", "", "")	flag.StringVar(&outfile, "o", "", "")	flag.Parse()	_, err := os.Stat(infile)	if err != nil {		log.Fatalln("error:", err)	}	f, err := os.Create(outfile)	if err != nil {		log.Fatalln("error:", err)	}	defer f.Close()	var lock sync.Mutex	w := &Worker{		concurrency:concurrency,		timeout:timeout,		infile:infile,		lw:&LockWriter{			m:lock,			writer:f,		},	}	w.Run()}type LockWriter struct {	m      sync.Mutex	writer io.Writer}func (lw LockWriter) write(b []byte) (n int, err error) {	lw.m.Lock()	defer lw.m.Unlock()	return lw.writer.Write(b)}type CarInfo struct {	Carno string	Ecode string	Vcode string}type Worker struct {	concurrency int	timeout     int	infile      string	jobs        chan *CarInfo	lw          *LockWriter}func (w *Worker) Run() {	var wg sync.WaitGroup	wg.Add(w.concurrency + 1) 	w.jobs = make(chan *CarInfo, w.concurrency)	go func() {		w.loadJobs()		wg.Done()	}()	//并发数	for i := 1; i <= w.concurrency; i++ {		go func(n int) {			w.doWork(n)			wg.Done()		}(i)	}	wg.Wait()}func (w *Worker) loadJobs() {	fin, err := os.Open(w.infile)	if err != nil {		log.Fatalln("file open failed,error:", err.Error())	}	defer fin.Close()	reader := csv.NewReader(fin)	for {		row, err := reader.Read()		if err != nil {			if err != io.EOF {				log.Println("file read error:", err)			}			break		}		time.Sleep(time.Duration(1) * time.Second)		w.jobs <- &CarInfo{Carno:row[3], Vcode:row[4], Ecode:row[5]}	}	close(w.jobs)}func (w *Worker) doWork(num int) {	total := 0	log.Println("worker:", num)	uriParams := url.Values{}	uriParams.Add("openUDID", "3122dcf3-3a2a-34e9-8da5-e9dde29579a4")	uriParams.Add("appid", "1")	uriParams.Add("cartype", "02")	uriParams.Add("os", "android")	uriParams.Add("appVersion", "6.6.6")	uriParams.Add("prefetch", "1")	uriParams.Add("reqfrom", "1")	uriParams.Add("secret", "UBjUFDL9kZSDUqivn4wb063QI4Es3mZhvWvT")	httpClient := &http.Client{		Timeout:time.Duration(w.timeout) * time.Second,	}	for carInfo := range w.jobs {		log.Println(carInfo.Carno, "|", carInfo.Vcode, "|", carInfo.Ecode)		uriParams.Set("carno", carInfo.Carno)		uriParams.Set("vcode", carInfo.Vcode)		uriParams.Set("ecode", carInfo.Ecode)		uri := fmt.Sprintf("http://xxx.cn/common_prefix?%s", uriParams.Encode())		fmt.Println(uri)		code, err := w.doRequest(httpClient, uri)		if err != nil {			log.Println("url request error:", err)		}		switch code {		case 203, 9999:			w.writeResult(carInfo)		}		total++	}	log.Println("worker[", num, "] total:", total)}func (w *Worker) doRequest(client *http.Client, uri string) (int, error) {	req, err := http.NewRequest("GET", uri, nil)	if err != nil {		log.Println("get new request failed!")		return -1, err	}	resp, err := client.Do(req)	if err != nil {		log.Println("error:", err)		return -1, err	}	defer resp.Body.Close()	result, err := ioutil.ReadAll(resp.Body)	if err != nil {		fmt.Println("read respose body failed")		return -1, err	}	log.Println("resonse:", string(result))	if resp.StatusCode != 200 {		log.Println("status code:", resp.StatusCode)		return -1, errors.New("stats code error:" + strconv.Itoa(resp.StatusCode))	}	js, err := simplejson.NewJson(result)	if err != nil {		return -1, err	}	code, err := js.Get("code").Int()	return code, nil}func (w *Worker) writeResult(carInfo *CarInfo) {	line := fmt.Sprintf("%s,%s,%s\n", carInfo.Carno, carInfo.Vcode, carInfo.Ecode)	w.lw.write([]byte(line))}

  

转载地址:http://akwfa.baihongyu.com/

你可能感兴趣的文章
openstack集群访问外部服务出现访问失败
查看>>
10款常用Java测试工具
查看>>
Codeforces Round #412 Div. 2 补题 D. Dynamic Problem Scoring
查看>>
牛客多校第六场C
查看>>
分享一下身边朋友自学android开发及找工作的那些事!【不足勿喷】
查看>>
显示hibernate的sql语句
查看>>
linux vue uwsgi nginx 部署路飞学城 安装 vue
查看>>
初学者如何学习运维?
查看>>
拒绝某个用户或组应用组策略
查看>>
大数据虚拟化实例:Tarball方式部署Hadoop发行版
查看>>
一个资深系统管理员的O2O实践(引子)
查看>>
kube-proxy配置 ipvs模式
查看>>
SCOM 2012系列⑥邮件通知下
查看>>
如何在Ubuntu 11.10上连接L2TP ***
查看>>
Windows Server 2016-Active Directory域服务概述
查看>>
Open-E DSS V7 应用系列之八 远程访问和管理员密码恢复
查看>>
华为boss力荐公司高层看的一篇文章,很长很经典
查看>>
Oracle中ORACLE_SID,INSTANCE_NAME,DB_NAME几个名词的区别
查看>>
解决WebForm项目在高版本IE下控件显示异常的问题
查看>>
SQL Server 2014 许可证(四)许可证的变化
查看>>