目录

colly 自定义代理池

Abstract: colly是golang 下的一款爬虫框架。本文讲述了如何对colly 定制自己的代 理池和代理管理模块。

colly

colly 是一款golang 下的爬虫框架。相比于scrapy, 其继承了go 常见的代码逻辑清晰的优 点。同时,因为golang 优雅的协程设计,也让基于golang 的异步编程比python 简单清晰 得多。

当然,其也有很多缺点。在使用得过程中,我发现了不少坑,许多特性并不像文档中说得那 样可以简单直接地实现。并且在功能上其也缺少scrapy 那么广泛和丰富的扩展。

不过,对我来说,使用colly 的过程中可以很好地学习golang 网络编程以及爬虫相关的知 识,这些甚至比爬虫本身还要有用。golang 通常无法像python 那样通过一行调用实现你想 要的功能,但是其源码设计和规范都很优雅,大多数问题读一下源代码尝试自己去解决并花 不了多少时间。

这里我记录一下我是如何自定义colly 代理池的。

代理池和代理管理

在实际应用场景中,可能需要对大量代理进行轮循。同时,每个代理可以有各自不同的有效 时间,超过有效时间的代理就应该被丢弃。另外,可能还需要设置每个代理的使用频率,比 如每3秒使用一次。这些需求在colly 的官方教程中没有给出相应的解决方案。那么这个问 题应该如何解决呢?

官方示例

package main

import (
	"bytes"
	"log"

	"github.com/gocolly/colly"
	"github.com/gocolly/colly/proxy"
)

func main() {
	// Instantiate default collector
	c := colly.NewCollector(colly.AllowURLRevisit())

	// Rotate two socks5 proxies
	rp, err := proxy.RoundRobinProxySwitcher("socks5://127.0.0.1:1337", "socks5://127.0.0.1:1338")
	if err != nil {
		log.Fatal(err)
	}
	c.SetProxyFunc(rp)

	// Print the response
	c.OnResponse(func(r *colly.Response) {
		log.Printf("%s\n", bytes.Replace(r.Body, []byte("\n"), nil, -1))
	})

	// Fetch httpbin.org/ip five times
	for i := 0; i < 5; i++ {
		c.Visit("https://httpbin.org/ip")
	}
}

查看源代码

在察看官方文档后,发现代理相关的函数主要是这两个: RoundRobinProxySwitcherSetProxyFunc 如果翻看下源代码会发现其实逻辑很简单

type ProxyFunc func(*http.Request) (*url.URL, error)

func (c *Collector) SetProxyFunc(p ProxyFunc) {
	t, ok := c.backend.Client.Transport.(*http.Transport)
	if c.backend.Client.Transport != nil && ok {
		t.Proxy = p
	} else {
		c.backend.Client.Transport = &http.Transport{
			Proxy: p,
		}
	}
}
func (r *roundRobinSwitcher) GetProxy(pr *http.Request) (*url.URL, error) {
	index := atomic.AddUint32(&r.index, 1) - 1
	u := r.proxyURLs[index%uint32(len(r.proxyURLs))]

	ctx := context.WithValue(pr.Context(), colly.ProxyURLKey, u.String())
	*pr = *pr.WithContext(ctx)
	return u, nil
}

func RoundRobinProxySwitcher(ProxyURLs ...string) (colly.ProxyFunc, error) {
	if len(ProxyURLs) < 1 {
		return nil, colly.ErrEmptyProxyURL
	}
	urls := make([]*url.URL, len(ProxyURLs))
	for i, u := range ProxyURLs {
		parsedU, err := url.Parse(u)
		if err != nil {
			return nil, err
		}
		urls[i] = parsedU
	}
	return (&roundRobinSwitcher{urls, 0}).GetProxy, nil
}

首先 RoundRobinProxySwitcher 输入代理列表并返回一个 ProxyFunc 函数,这个函数 的功能是随机从代理列表中抽取字符串封装为 url.URL 格式。在爬虫运行的时候,每次 request 会调用这个 ProxyFunc 函数获得代理,然后封装到 http 请求中进行发送。

所以思路就很清楚了,要实现自己的代理池管理,只需要自己定义一个 ProxyFunc 函数, 在其中实现管理代理的逻辑就可以了。这里一个简单的方法是建一个redis 数据库,在 ProxyFunc 中通过redis 获取代理地址。因为redis 可以作为有序集合来使用,可以很方 便地解决代理去重、频率控制和过期控制的功能。

不过这里我没有使用redis, 而是用golang 简单实现了一个基于优先级队列的代理管理功能。

简单的优先级队列代理管理

我的代理使用规则为每次从全部代理中取出上一次调用时间最早的代理,判断其距离现在是 否已经达到要求的时间间隔。如果时间间隔满足设置的 delay 值,则代表代理可用,通 过 ProxyFunc 返还给 colly. 否则等待直到时间间隔满足要求。

首先实现一个优先级队列,用来对代理上次调用时间进行排序。优先级队列我就不讲了,代 码如下。其效果是在将代理通过 ProxyPool.Add 方法添加进队列后,运行 ProxyPool.GetProxy, 可用列表通过channel one 返回或在无可用代理时实现阻塞。这 样colly 就会在 request 时等待直到取得可用代理,相当于从外部实现了对colly 请求频 率的限制。

type Proxy struct {
	IP            string
	Port          int
	expireTime    time.Time
	lastCalledTime time.Time
}

type ProxyHeap []*Proxy
func (h ProxyHeap) Len() int           { return len(h) }
func (h ProxyHeap) Less(i, j int) bool { return h[i].lastCalledTime.Before(h[j].lastCalledTime) }
func (h ProxyHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *ProxyHeap) Push(x interface{}) {
	*h = append(*h, x.(*Proxy))
}
func (h *ProxyHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[0 : n-1]
	return x
}

type ProxyPool struct {
	proxies ProxyHeap
	delay   time.Duration
	mu      sync.Mutex
	one chan string
}

func NewProxyPool(delay time.Duration) *ProxyPool {
	ch := make(chan string, 0)
	return &ProxyPool{
		proxies: make(ProxyHeap, 0),
		delay:   delay,
		mu:      sync.Mutex{},
		one: 	 ch,
	}
}

func (p *ProxyPool) Add(proxy *Proxy) {
	p.mu.Lock()
	defer p.mu.Unlock()
	heap.Push(&p.proxies, proxy)
}

func (p *ProxyPool) GetProxy(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
			//log.Println("total proxies size is: ", p.proxies.Len())
			for p.proxies.Len() == 0 {
				return errors.New("No valid proxy")
			}
			for p.proxies.Len() > 0 {
				proxy := p.proxies[0]
				if time.Now().After(proxy.expireTime) {
					heap.Pop(&p.proxies)
				} else if time.Now().After(proxy.lastCalledTime.Add(p.delay)) {
					proxyURL := fmt.Sprintf("http://%s:%d", proxy.IP, proxy.Port)
					p.one <- proxyURL
					proxy.lastCalledTime = time.Now()
					heap.Pop(&p.proxies)
					heap.Push(&p.proxies, proxy)
					break
				} else {
					time.Sleep(proxy.lastCalledTime.Sub(time.Now()))
				}
			}
		}
	}
}

向 colly 中添加代理函数

接下来要创建一个 ProxyFunc 函数,让其通过 ProxyPool.GetProxy 过去代理,并将 其注册进 colly 实例中。

func (p *ProxyPool) ProxyFunc(pr *http.Request) (*url.URL, error) {
	log.Println(">>> test my proxyFunc: proxy func called once")
	proxy := <- p.one
	u, _ := url.Parse(proxy)
	log.Println("proxy url:>>>", u)
	if strings.Contains(u.String(), "noproxy") {
		return nil, nil
	}
	ctx := context.WithValue(pr.Context(), colly.ProxyURLKey, u.String())
	*pr = *pr.WithContext(ctx)
	return u, nil
}

func main() {
	ctx := context.Background()

	t := time.Now()
	pool := NewProxyPool(2 * time.Second)
	pool.Add(&Proxy{
		IP:            "127.0.0.1",
		Port:          7890,
		expireTime:    time.Now().Add(10 * time.Minute),
		lastCalledTime: time.Now().Add(1*time.Second),
	})
	pool.Add(&Proxy{
		IP:            "noproxy",
		Port:          1234,
		expireTime:    time.Now().Add(30 * time.Second),
		lastCalledTime: time.Now(),
	})
	pool.Add(&Proxy{
		IP:            "10.0.0.24",
		Port:          7890,
		expireTime:    time.Now().Add(30 * time.Second),
		lastCalledTime: time.Now(),
	})
	go func() {
		err := pool.GetProxy(ctx)
		if err != nil {
			fmt.Println("Error:", err)
		}
	}()



	url := "https://httpbin.org/ip"

	q, _ := queue.New(
		1, // Number of consumer threads
		&queue.InMemoryQueueStorage{MaxSize: 10000}, // Use default queue storage
	)

	c := colly.NewCollector()
	c.AllowURLRevisit = true
	c.SetProxyFunc(pool.ProxyFunc)

	c.OnResponse(func(r *colly.Response){
		log.Printf(">>> %+v\n",string(r.Body))
		log.Println(time.Now().Sub(t))
	})

	q.AddURL(url)
	q.AddURL(url)
	q.AddURL(url)
	q.AddURL(url)
	q.AddURL(url)
	q.Run(c)
}

colly 踩坑

之前描述的逻辑其实是对的,但是当我执行上述代码的时候,发现结果有问题。其结果如下

2023/03/25 15:26:36 >>> test my proxyFunc: proxy func called once
2023/03/25 15:26:38 proxy url:>>> http://noproxy:1234
2023/03/25 15:26:39 >>> {
  "origin": "114.x.x.x"
}

2023/03/25 15:26:39 2.875306769s
2023/03/25 15:26:39 >>> {
  "origin": "114.x.x.x"
}

2023/03/25 15:26:39 3.107933344s
2023/03/25 15:26:39 >>> {
  "origin": "114.x.x.x"
}

2023/03/25 15:26:39 3.323815899s
2023/03/25 15:26:40 >>> {
  "origin": "114.x.x.x"
}

2023/03/25 15:26:40 3.749706513s
2023/03/25 15:26:40 >>> {
  "origin": "114.x.x.x"
}

2023/03/25 15:26:40 4.24531686s

我们发现 ProxyFunc 其实只用了一次,代理IP 也都是同一个。我一开始白思不得其解。 于是就测试了官方给出例子中的代理示例。发现其也没有如他们描述的那样对代理进行轮循。

在测试中我发现,如果请求的网址是 http://httpbin.org/ip, 我的程序和官方例子都能 正常工作。但若请求 https, 就会出现上面描述的问题。

在翻看了github 上面许多代理相关的问题后,我意识到可能是由于 KeepAlives 参数默 认被打开了,导致colly 其实没有建立新的连接,自然也不会使用新的代理。。

要修复这个问题,只需要添加下面几行就可以了:

c.WithTransport(&http.Transport{
  DisableKeepAlives: true,
})

完整代码

package main

import (

	"container/heap"
	"fmt"
	"strings"
	"sync"
	"errors"
	"time"

	"log"
	"context"
	"net/url"
	"net/http"
	"github.com/gocolly/colly/v2"
	"github.com/gocolly/colly/v2/queue"
)

type Proxy struct {
	IP            string
	Port          int
	expireTime    time.Time
	lastCalledTime time.Time
}

type ProxyHeap []*Proxy

func (h ProxyHeap) Len() int           { return len(h) }
func (h ProxyHeap) Less(i, j int) bool { return h[i].lastCalledTime.Before(h[j].lastCalledTime) }
func (h ProxyHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *ProxyHeap) Push(x interface{}) {
	*h = append(*h, x.(*Proxy))
}

func (h *ProxyHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[0 : n-1]
	return x
}

type ProxyPool struct {
	proxies ProxyHeap
	delay   time.Duration
	mu      sync.Mutex
	one chan string
}

func NewProxyPool(delay time.Duration) *ProxyPool {
	ch := make(chan string, 0)
	return &ProxyPool{
		proxies: make(ProxyHeap, 0),
		delay:   delay,
		mu:      sync.Mutex{},
		one: 	 ch,
	}
}

func (p *ProxyPool) Add(proxy *Proxy) {
	p.mu.Lock()
	defer p.mu.Unlock()
	heap.Push(&p.proxies, proxy)
}

func (p *ProxyPool) GetProxy(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
			//log.Println("total proxies size is: ", p.proxies.Len())
			for p.proxies.Len() == 0 {
				return errors.New("No valid proxy")
			}
			for p.proxies.Len() > 0 {
				proxy := p.proxies[0]
				if time.Now().After(proxy.expireTime) {
					heap.Pop(&p.proxies)
				} else if time.Now().After(proxy.lastCalledTime.Add(p.delay)) {
					proxyURL := fmt.Sprintf("http://%s:%d", proxy.IP, proxy.Port)
					p.one <- proxyURL
					proxy.lastCalledTime = time.Now()
					heap.Pop(&p.proxies)
					heap.Push(&p.proxies, proxy)
					break
				} else {
					time.Sleep(proxy.lastCalledTime.Sub(time.Now()))
				}
			}
		}
	}
}


func (p *ProxyPool) ProxyFunc(pr *http.Request) (*url.URL, error) {
	log.Println(">>> test my proxyFunc: proxy func called once")
	proxy := <- p.one
	u, _ := url.Parse(proxy)
	log.Println("proxy url:>>>", u)
	if strings.Contains(u.String(), "noproxy") {
		return nil, nil
	}
	ctx := context.WithValue(pr.Context(), colly.ProxyURLKey, u.String())
	*pr = *pr.WithContext(ctx)
	return u, nil
}

func main() {
	ctx := context.Background()

	t := time.Now()
	pool := NewProxyPool(2 * time.Second)
	pool.Add(&Proxy{
		IP:            "127.0.0.1",
		Port:          7890,
		expireTime:    time.Now().Add(10 * time.Minute),
		lastCalledTime: time.Now().Add(1*time.Second),
	})
	pool.Add(&Proxy{
		IP:            "noproxy",
		Port:          1234,
		expireTime:    time.Now().Add(30 * time.Second),
		lastCalledTime: time.Now(),
	})
	pool.Add(&Proxy{
		IP:            "10.0.0.24",
		Port:          7890,
		expireTime:    time.Now().Add(30 * time.Second),
		lastCalledTime: time.Now(),
	})
	go func() {
		err := pool.GetProxy(ctx)
		if err != nil {
			fmt.Println("Error:", err)
		}
	}()



	url := "https://httpbin.org/ip"

	q, _ := queue.New(
		1, // Number of consumer threads
		&queue.InMemoryQueueStorage{MaxSize: 10000}, // Use default queue storage
	)

	// Instantiate default collector
	c := colly.NewCollector()

	c.AllowURLRevisit = true
	c.WithTransport(&http.Transport{
    		DisableKeepAlives: true,
	})

	c.SetProxyFunc(pool.ProxyFunc)

	c.OnResponse(func(r *colly.Response){
		log.Printf(">>> %+v\n",string(r.Body))
		log.Println(time.Now().Sub(t))
	})

	q.AddURL(url)
	q.AddURL(url)
	q.AddURL(url)
	q.AddURL(url)
	q.AddURL(url)

	q.Run(c)
}

结果如下

2023/03/25 15:35:11 >>> test my proxyFunc: proxy func called once
2023/03/25 15:35:13 proxy url:>>> http://noproxy:1234
2023/03/25 15:35:14 >>> {
  "origin": "114.x.x.x"
}

2023/03/25 15:35:14 2.914752166s
2023/03/25 15:35:14 >>> test my proxyFunc: proxy func called once
2023/03/25 15:35:14 proxy url:>>> http://10.0.0.24:7890
2023/03/25 15:35:16 >>> {
  "origin": "103.x.x.x"
}

2023/03/25 15:35:16 5.020741709s
2023/03/25 15:35:16 >>> test my proxyFunc: proxy func called once
2023/03/25 15:35:16 proxy url:>>> http://127.0.0.1:7890
2023/03/25 15:35:19 >>> {
  "origin": "194.x.x.x"
}

2023/03/25 15:35:19 7.666083742s
2023/03/25 15:35:19 >>> test my proxyFunc: proxy func called once
2023/03/25 15:35:19 proxy url:>>> http://noproxy:1234
2023/03/25 15:35:19 >>> {
  "origin": "114.x.x.x"
}

2023/03/25 15:35:19 8.522034392s
2023/03/25 15:35:19 >>> test my proxyFunc: proxy func called once
2023/03/25 15:35:19 proxy url:>>> http://10.0.0.24:7890
2023/03/25 15:35:21 >>> {
  "origin": "103.x.x.x"
}

2023/03/25 15:35:21 10.060854023s