基于后端和爬虫创建的代理ip池

搭建免费的代理ip池


需要解决的问题:

  1. 使用什么方式存储ip

    • 文件存储

      缺点: 打开文件修改文件操作较麻烦

    • mysql

      缺点: 查询速度较慢

    • mongodb

      缺点: 查询速度较慢. 没有查重功能

    • redis --> 使用redis存储最为合适

    所以 -> 数据结构采用redis中的zset有序集合

  2. 获取ip的网站

  3. 项目架构???

项目架构

  1. 获取api
  2. 筛选api
  3. 验证api的有效性
  4. 提供api

项目结构图

基于后端和爬虫创建的代理ip池

项目结构如下:

基于后端和爬虫创建的代理ip池

项目代码

code文件夹

redis_proxy.py

# -*- encoding:utf-8 -*- # @time: 2022/7/4 11:32 # @author: Maxs_hu """ 这里用来做redis中间商. 去控制redis和ip之间的调用关系 """ from redis import Redis import random   class RedisProxy:     def __init__(self):         # 连接到redis数据库         self.red = Redis(             host='localhost',             port=6379,             db=9,             password=123456,             decode_responses=True         )      # 1. 存储到redis中. 存储之前需要提前判断ip是否存在. 防止将已存在的ip的score抵掉     # 2. 需要校验所有的ip. 查询ip     # 3. 验证可用性. 可用分值拉满. 不可用扣分     # 4. 将可用的ip查出来返回给用户     #       先给满分的     #       再给有分的     #       都没有分. 就不给      def add_ip(self, ip):  # 外界调用并传入ip         # 判断ip在redis中是否存在         if not self.red.zscore('proxy_ip', ip):             self.red.zadd('proxy_ip', {ip: 10})             print('proxy_ip存储完毕', ip)         else:             print('存在重复', ip)      def get_all_proxy(self):         # 查询所有的ip功能         return self.red.zrange('proxy_ip', 0, -1)      def set_max_score(self, ip):         self.red.zadd('proxy_ip', {ip: 100})  # 注意是引号的格式      def deduct_score(self, ip):         # 先将分数查询出来         score = self.red.zscore('proxy_ip', ip)         # 如果有分值.那就扣一分         if score > 0:             self.red.zincrby('proxy_ip', -1, ip)         else:             # 如果分值已经扣的小于0了. 那么可以直接删除了             self.red.zrem('proxy_ip', ip)      def effect_ip(self):         # 先将ip通过分数筛选出来         ips = self.red.zrangebyscore('proxy_ip', 100, 100, 0, -1)         if ips:             return random.choice(ips)         else:  # 没有满分的             # 将九十分以上的筛选出来             ips = self.red.zrangebyscore('proxy_ip', 11, 99, 0, -1)             if ips:                 return random.choice(ips)             else:                 print('无可用ip')                 return None  

ip_collection.py

# -*- encoding:utf-8 -*- # @time: 2022/7/4 11:32 # @author: Maxs_hu """ 这里用来收集ip """ from redis_proxy import RedisProxy import requests from lxml import html from multiprocessing import Process import time import random   def get_kuai_ip(red):     url = "https://free.kuaidaili.com/free/intr/"     headers = {         "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"     }     resp = requests.get(url, headers=headers)     etree = html.etree     et = etree.HTML(resp.text)     trs = et.xpath('//table//tr')     for tr in trs:         ip = tr.xpath('./td[1]/text()')         port = tr.xpath('./td[2]/text()')         if not ip:  # 将不含有ip值的筛除             continue         proxy_ip = ip[0] + ":" + port[0]         red.add_ip(proxy_ip)   def get_unknown_ip(red):     url = "https://ip.jiangxianli.com/"     headers = {         "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"     }     resp = requests.get(url, headers=headers)     etree = html.etree     et = etree.HTML(resp.text)     trs = et.xpath('//table//tr')     for tr in trs:         ip = tr.xpath('./td[1]/text()')         port = tr.xpath('./td[2]/text()')         if not ip:  # 将不含有ip值的筛除             continue         proxy_ip = ip[0] + ":" + port[0]         red.add_ip(proxy_ip)   def get_happy_ip(red):     page = random.randint(1, 5)     url = f'http://www.kxdaili.com/dailiip/2/{page}.html'     headers = {         "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"     }     resp = requests.get(url, headers=headers)     etree = html.etree     et = etree.HTML(resp.text)     trs = et.xpath('//table//tr')     for tr in trs:         ip = tr.xpath('./td[1]/text()')         port = tr.xpath('./td[2]/text()')         if not ip:  # 将不含有ip值的筛除             continue         proxy_ip = ip[0] + ":" + port[0]         red.add_ip(proxy_ip)   def get_nima_ip(red):     url = 'http://www.nimadaili.com/'     headers = {         "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"     }     resp = requests.get(url, headers=headers)     etree = html.etree     et = etree.HTML(resp.text)     trs = et.xpath('//table//tr')     for tr in trs:         ip = tr.xpath('./td[1]/text()')  # 这里存在空值. 所以不能在后面加[0]         if not ip:             continue         red.add_ip(ip[0])   def get_89_ip(red):     page = random.randint(1, 26)     url = f'https://www.89ip.cn/index_{page}.html'     headers = {         "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"     }     resp = requests.get(url, headers=headers)     etree = html.etree     et = etree.HTML(resp.text)     trs = et.xpath('//table//tr')     for tr in trs:         ip = tr.xpath('./td[1]/text()')         if not ip:             continue         red.add_ip(ip[0].strip())   def main():     # 创建一个redis实例化对象     red = RedisProxy()     print("开始采集数据")     while 1:         try:             # 这里可以添加各种采集的网站             print('>>>开始收集快代理ip')             get_kuai_ip(red)  # 收集快代理             # get_unknown_ip(red)  # 收集ip             print(">>>开始收集开心代理ip")             get_happy_ip(red)  # 收集开心代理             print(">>>开始收集泥马代理ip")             # get_nima_ip(red)  # 收集泥马代理             print(">>>开始收集89代理ip")             get_89_ip(red)             time.sleep(60)         except Exception as e:             print('ip储存出错了', e)             time.sleep(60)   if __name__ == '__main__':     main()     # 创建一个子进程     # p = Process(target=main)     # p.start()  

ip_verify.py

# -*- encoding:utf-8 -*- # @time: 2022/7/4 11:34 # @author: Maxs_hu """ 这里用来验证ip的可用性: 使用携程发送请求增加效率 """ from redis_proxy import RedisProxy from multiprocessing import Process import asyncio import aiohttp import time   async def verify_ip(ip, red, sem):     timeout = aiohttp.ClientTimeout(total=10)  # 设置网页等待时间不超过十秒     try:         async with sem:             async with aiohttp.ClientSession() as session:                 async with session.get(url='http://www.baidu.com/',                                        proxy='http://'+ip,                                        timeout=timeout) as resp:                     page_source = await resp.text()                     if resp.status in [200, 302]:                         # 如果可用. 加分                         red.set_max_score(ip)                         print('验证没有问题. 分值拉满~', ip)                     else:                         # 如果不可用. 扣分                         red.deduct_score(ip)                         print('问题ip. 扣一分', ip)     except Exception as e:         print('出错了', e)         red.deduct_score(ip)         print('问题ip. 扣一分', ip)   async def task(red):     ips = red.get_all_proxy()     sem = asyncio.Semaphore(30)  # 设置每次三十的信号量     tasks = []     for ip in ips:         tasks.append(asyncio.create_task(verify_ip(ip, red, sem)))     if tasks:         await asyncio.wait(tasks)   def main():     red = RedisProxy()     time.sleep(5)  # 初始的等待时间. 等待采集到数据     print("开始验证可用性")     while 1:         try:             asyncio.run(task(red))             time.sleep(100)         except Exception as e:             print("ip_verify出错了", e)             time.sleep(100)   if __name__ == '__main__':     main()     # 创建一个子进程     # p = Process(target=main())     # p.start() 

ip_api.py

# -*- encoding:utf-8 -*- # @time: 2022/7/4 11:35 # @author: Maxs_hu  """ 这里用来提供给用户ip接口. 通过写后台服务器. 用户访问我们的服务器就可以得到可用的代理ip:    1. flask    2. sanic --> 今天使用这个要稍微简单一点 """ from redis_proxy import RedisProxy from sanic import Sanic, json from sanic_cors import CORS from multiprocessing import Process  # 创建一个app app = Sanic('ip')  # 随便给个名字 # 解决跨域问题 CORS(app) red = RedisProxy()   @app.route('maxs_hu_ip')  # 添加路由 def api(req):  # 第一个请求参数固定. 请求对象    ip = red.effect_ip()    return json({"ip": ip})   def main():    # 让sanic跑起来    app.run(host='127.0.0.1', port=1234)   if __name__ == '__main__':    main()    # p = Process(target=main())    # p.start() 

runner.py

# -*- encoding:utf-8 -*- # @time: 2022/7/5 17:36 # @author: Maxs_hu from ip_api import main as api_run from ip_collection import main as coll_run from ip_verify import main as veri_run from multiprocessing import Process   def main():     # 设置互不干扰的三个进程     p1 = Process(target=api_run)  # 只需要将目标函数的内存地址传过去即可     p2 = Process(target=coll_run)     p3 = Process(target=veri_run)      p1.start()     p2.start()     p3.start()   if __name__ == '__main__':     main() 

测试ip是否可用.py

# -*- encoding:utf-8 -*- # @time: 2022/7/5 18:15 # @author: Maxs_hu import requests   def get_proxy():     url = "http://127.0.0.1:1234/maxs_hu_ip"     resp = requests.get(url)     return resp.json()   def main():     url = 'http://mip.chinaz.com/?query=' + get_proxy()["ip"]     proxies = {         "http": 'http://' + get_proxy()["ip"],         "https": 'http://' + get_proxy()["ip"]  # 目前代理只支持http请求     }     headers = {         "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36",     }     resp = requests.get(url, proxies=proxies, headers=headers)     resp.encoding = 'utf-8'     print(resp.text)  # 物理位置   if __name__ == '__main__':     main() 

运行效果

项目运行截图:

基于后端和爬虫创建的代理ip池

redis储存截图:

基于后端和爬虫创建的代理ip池

总结

  1. 免费代理ip只支持http的网页操作. 并不好用. 如果有需求可以进行购买然后加入ip代理池
  2. 网页部署到自己的服务器上. 别人访问自己的服务器. 以后学了全栈可以加上登录. 和付费功能. 实现功能的进一步拓展
  3. 项目架构是生产者消费者模型. 三个模块同时运行. 每个模块为一个进程. 互不影响
  4. 代理设计细节有待处理. 但总体运行效果还可以. 遇到问题再修改
发表评论

评论已关闭。

相关文章