Image Host Crawler
Not much to say here. I went back to scraping the 举个栗子 API at t.alcy.cc and wrote a new script:
```python
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urljoin, urlparse

import requests
from bs4 import BeautifulSoup


class ImageDownloader:
    def __init__(self, max_workers=10):
        self.base_url = "https://t.alcy.cc/img/"
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                          "AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/120.0.0.0 Safari/537.36"
        }
        self.max_workers = max_workers
        self.downloaded_count = 0
        self.total_count = 0
        self.lock = threading.Lock()

    def get_categories(self):
        """Fetch all categories from the index page."""
        print(f"Visiting index page: {self.base_url}")
        try:
            response = requests.get(self.base_url, headers=self.headers)
            response.raise_for_status()
        except Exception as e:
            print(f"Failed to load index page: {e}")
            return []

        soup = BeautifulSoup(response.text, 'html.parser')
        categories = []

        # Category links are rendered as "name (count)", so filter on the parentheses.
        for a in soup.find_all('a', href=True):
            if '(' in a.text and ')' in a.text:
                category_name = a.text.split('(')[0].strip()
                category_url = urljoin(self.base_url, a['href'])
                categories.append({'name': category_name, 'url': category_url})

        return categories

    def get_image_urls(self, category_url):
        """Collect every image URL on a category page."""
        try:
            response = requests.get(category_url, headers=self.headers)
            response.raise_for_status()
        except Exception as e:
            print(f"Failed to load category page: {e}")
            return []

        soup = BeautifulSoup(response.text, 'html.parser')
        img_urls = []

        # Prefer data-src (lazy loading) over src, and skip inline data: URIs.
        for img in soup.find_all('img'):
            src = img.get('data-src') or img.get('src')
            if src and not src.startswith('data:'):
                img_urls.append(urljoin(self.base_url, src))

        # Deduplicate before returning.
        return list(set(img_urls))

    def download_single_image(self, img_url, save_path):
        """Download one image; skip it if the file already exists."""
        # Check for an existing file *before* downloading, so reruns are cheap.
        if os.path.exists(save_path):
            with self.lock:
                self.downloaded_count += 1
            return True, "exists"

        try:
            response = requests.get(img_url, headers=self.headers, timeout=15)
            response.raise_for_status()

            with open(save_path, 'wb') as f:
                f.write(response.content)

            with self.lock:
                self.downloaded_count += 1

            return True, "ok"
        except Exception as e:
            return False, str(e)

    def download_category_images(self, category, img_urls):
        """Download all images of one category with a thread pool."""
        category_name = category['name']

        # One directory per category.
        if not os.path.exists(category_name):
            os.makedirs(category_name)
            print(f"\nCreated directory: {category_name}")

        print(f"Category [{category_name}] has {len(img_urls)} images")

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = []

            for i, img_url in enumerate(img_urls):
                # Derive a file name from the URL path; fall back to an index.
                filename = os.path.basename(urlparse(img_url).path)
                if not filename:
                    filename = f"image_{i}.jpg"

                save_path = os.path.join(category_name, filename)
                future = executor.submit(self.download_single_image, img_url, save_path)
                futures.append((future, filename, i + 1))

            # Report progress as results come back.
            completed = 0
            for future, filename, idx in futures:
                try:
                    success, message = future.result(timeout=20)
                    completed += 1

                    if success and message == "ok":
                        status = "✓"
                    elif success and message == "exists":
                        status = "↻"
                    else:
                        status = "✗"

                    progress = f"[{idx}/{len(img_urls)}] {status} {filename}"
                    print(f"\r[{category_name}] {completed}/{len(img_urls)} | {progress[:50]}", end="")
                except Exception:
                    completed += 1
                    print(f"\r[{category_name}] {completed}/{len(img_urls)} | failed: {filename[:30]}...", end="")

        print(f"\nCategory [{category_name}] done")

    def run(self):
        print("=" * 60)
        print("Fetching image categories...")
        print("=" * 60)

        categories = self.get_categories()
        if not categories:
            print("No categories found.")
            return

        print(f"Found {len(categories)} categories: {[c['name'] for c in categories]}")

        # Collect the image URLs of every category first.
        all_img_urls = []
        category_data = []

        for cat in categories:
            print(f"Listing images for category [{cat['name']}]...", end="")
            img_urls = self.get_image_urls(cat['url'])
            all_img_urls.extend(img_urls)
            category_data.append({'category': cat, 'img_urls': img_urls})
            print(f" found {len(img_urls)} images")

        self.total_count = len(all_img_urls)
        print(f"\n{self.total_count} images found in total")
        print("=" * 60)

        # Download category by category and time the whole run.
        start_time = time.time()

        for data in category_data:
            if data['img_urls']:
                self.download_category_images(data['category'], data['img_urls'])

        total_time = time.time() - start_time

        print("\n" + "=" * 60)
        print("All downloads finished")
        print("=" * 60)
        print(f"Total images: {self.total_count}")
        print(f"Downloaded or already present: {self.downloaded_count}")
        print(f"Elapsed time: {total_time:.2f}s")
        if total_time > 0:
            print(f"Average speed: {self.downloaded_count / total_time:.2f} images/s")
        print("=" * 60)


def main():
    # Thread count; tune this to your network and machine.
    max_workers = 20

    downloader = ImageDownloader(max_workers=max_workers)
    downloader.run()


if __name__ == "__main__":
    main()
```
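One thing the script leaves on the table: every call to `requests.get` opens a fresh TCP/TLS connection, which adds up over thousands of images. A common refinement is to give each pool worker its own `requests.Session` via `threading.local()`, so connections stay alive and get reused. The sketch below is not part of the original script; `session_for_thread` is a hypothetical helper and the `User-Agent` shown is a placeholder.

```python
import threading

import requests

_thread_local = threading.local()


def session_for_thread() -> requests.Session:
    """Return a requests.Session private to the calling thread.

    A Session keeps the underlying connection alive, so each thread-pool
    worker reuses one connection instead of reconnecting per image.
    NOTE: hypothetical helper, not part of the script above.
    """
    if not hasattr(_thread_local, "session"):
        session = requests.Session()
        session.headers.update({"User-Agent": "Mozilla/5.0"})  # placeholder UA
        _thread_local.session = session
    return _thread_local.session


# Inside download_single_image you would then swap requests.get for:
#     response = session_for_thread().get(img_url, timeout=15)
```

Since `ThreadPoolExecutor` reuses its worker threads, this creates at most `max_workers` sessions for the whole run.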









