![图片[1]-Python高效办公:如何利用问卷星进行批量图片格式收集-云村Study](https://yuncunoss.wokewu.cn/wp-content/uploads/2025/09/20250921120339757.png)
相信不少人都为突然需要收集大量图片而烦恼,尤其是收集来的图片还要按指定格式统一命名,手动逐个修改非常繁琐。而在 Python 的世界里,只需用 pandas 读取表格、再配合 requests 下载文件,就能轻松实现批量下载与自动命名。
1、读表拿列名
import pandas as pd
# Read the table from an Excel / CSV file
df = pd.read_excel("你的表.xlsx") # or: pd.read_csv("your.csv")
columns = list(df.columns) # column names, used to fill the UI drop-down
2、行取值
def get_val(row: pd.Series, col: str) -> str:
    """Return the cell of ``row`` at column ``col`` as a stripped string.

    Args:
        row: One table row.
        col: Label of the column to read.

    Returns:
        The cell converted with ``str()`` and stripped of surrounding
        whitespace, or ``""`` when the column is absent or the cell is
        NaN/None.
    """
    v = row.get(col, "")
    return "" if pd.isna(v) else str(v).strip()
3、下载单个文件
def download_one(idx: int, url: str, row: pd.Series):
    """Download ``url`` into ``self.output_dir`` and return the saved path.

    NOTE: this is an excerpt. ``self`` and ``session`` are not defined here
    and must be provided by the enclosing scope.

    Args:
        idx: Row index, used only for the fallback name ``row_<idx>``.
        url: Address of the file to download.
        row: Table row the URL came from; feeds the naming template.

    Returns:
        The path the file was written to.

    Raises:
        Whatever ``session.get`` / ``raise_for_status`` / ``open`` raise;
        only the preliminary HEAD request is best-effort.
    """
    # 1) Build the base file name (without extension)
    if self.tokens:
        base = self.build_filename_for_row(row)
    else:
        path = urlsplit(url).path
        base = sanitize_filename(os.path.splitext(os.path.basename(path))[0] or f"row_{idx}")
    # 2) Try a HEAD request first to learn the content type
    headers = {}
    try:
        r_head = session.head(url, timeout=10, allow_redirects=True)
        headers = r_head.headers or {}
    except Exception:
        # HEAD is optional; a bare ``except:`` would also swallow
        # KeyboardInterrupt/SystemExit, so only Exception is ignored.
        pass
    # 3) The actual download
    with session.get(url, timeout=30, stream=True) as r:
        r.raise_for_status()
        ext = guess_ext_from_url_or_headers(url, r.headers or headers)
        final_path = ensure_unique_path(self.output_dir, base, ext)
        with open(final_path, "wb") as f:
            for chunk in r.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)
    return final_path
最终呈现代码
# -*- coding: utf-8 -*-
"""
表格批量下载器(GUI)
- 导入 Excel/CSV
- 选择下载字段(URL 列)
- 构建命名模板:字段 + 自定义文本 + 全局分隔符
- 批量下载并命名:如 姓名-反-身份证号码
依赖:pandas, openpyxl, requests
安装:pip install pandas openpyxl requests
BY:蜗牛村长
"""
import mimetypes
import os
import queue
import re
import threading
import tkinter as tk
from concurrent.futures import ThreadPoolExecutor, as_completed
from tkinter import ttk, filedialog, messagebox
from tkinter.scrolledtext import ScrolledText
from urllib.parse import urlsplit, unquote, urlparse

import pandas as pd
import requests
# ================== 工具函数 ==================
# Characters replaced by "_" in file names: path separators, the characters
# Windows forbids, and CR / LF / TAB.
ILLEGAL_CHARS = r'[\\/:*?"<>|\r\n\t]'


def sanitize_filename(name: str, max_len: int = 140) -> str:
    """Clean ``name`` so it can be used as a file name (without extension).

    Args:
        name: Raw name; ``None`` is treated as an empty string and any other
            value is converted with ``str()``.
        max_len: Maximum number of characters kept.

    Returns:
        The cleaned name, or ``"unnamed"`` when nothing usable is left.
    """
    if name is None:
        name = ""
    name = str(name).strip()
    name = re.sub(ILLEGAL_CHARS, "_", name)
    # Collapse runs of whitespace into a single space
    name = re.sub(r'\s+', ' ', name).strip()
    # Windows does not allow a trailing dot or space
    name = name.rstrip(" .")
    # Enforce the length limit, then tidy the new tail
    if len(name) > max_len:
        name = name[:max_len].rstrip(" ._")
    # Fallback for an empty result
    return name or "unnamed"
def guess_ext_from_url_or_headers(url: str, headers: dict) -> str:
    """Infer a file extension from the URL path or the response headers.

    The URL path is tried first, then the ``Content-Type`` header; if neither
    yields an extension, ``".jpg"`` is returned.

    Args:
        url: The download URL.
        headers: Response headers. The ``Content-Type`` key is matched
            case-insensitively, so a plain ``dict`` works as well as a
            case-insensitive mapping. ``None`` is treated as empty.

    Returns:
        An extension including the leading dot. URL-derived extensions are
        lower-cased.
    """
    # 1) URL path
    try:
        path = urlsplit(url).path
    except (ValueError, TypeError, AttributeError):
        # Malformed or non-string URL: fall through to the headers
        path = ""
    ext = os.path.splitext(path)[1]
    # Reject odd "extensions" that are really long parameter tails
    if 1 <= len(ext) <= 6 and re.match(r'^\.[A-Za-z0-9]+$', ext):
        return ext.lower()

    # 2) Content-Type response header (parameters such as charset dropped)
    ctype = ""
    for key, value in (headers or {}).items():
        if str(key).lower() == "content-type":
            ctype = str(value).split(";")[0].strip().lower()
            break
    if ctype:
        ext = mimetypes.guess_extension(ctype)
        if ext:
            # Some types map to .jpe; .jpg is the more common spelling
            return ".jpg" if ext in (".jpe",) else ext

    # 3) Default
    return ".jpg"
def ensure_unique_path(dirpath: str, filename_no_ext: str, ext: str) -> str:
    """Return a path in ``dirpath`` that does not exist yet.

    ``<name><ext>`` is tried first, then ``<name>(1)<ext>``,
    ``<name>(2)<ext>`` and so on.

    Args:
        dirpath: Target directory.
        filename_no_ext: Base file name without extension.
        ext: Extension including the leading dot.

    Returns:
        The first candidate for which ``os.path.exists`` is false. The file
        is not created, so concurrent callers must serialise this call
        together with the file creation to avoid picking the same path.
    """
    candidate = os.path.join(dirpath, filename_no_ext + ext)
    if not os.path.exists(candidate):
        return candidate
    i = 1
    while True:
        cand = os.path.join(dirpath, f"{filename_no_ext}({i}){ext}")
        if not os.path.exists(cand):
            return cand
        i += 1
# ================== GUI 应用 ==================
class DownloaderApp(tk.Tk):
def __init__(self):
super().__init__()
self.title("表格批量下载器(字段命名模板)————BY:蜗牛村长")
self.geometry("980x680")
self.minsize(900, 600)
self.df: pd.DataFrame | None = None
self.filepath: str | None = None
self.tokens: list[dict] = [] # [{'type':'field','value':'列名'}|{'type':'text','value':'自定义文本'}]
self.output_dir: str = os.path.abspath("downloads")
self.thread_count = tk.IntVar(value=8)
self._build_ui()
# ---------- UI ----------
def _build_ui(self):
main = ttk.Frame(self)
main.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
# 顶部:文件加载 & 保存目录
top = ttk.Frame(main)
top.pack(fill=tk.X)
ttk.Button(top, text="打开表格 (Excel/CSV)", command=self.open_table).pack(side=tk.LEFT)
self.lbl_file = ttk.Label(top, text="未选择文件")
self.lbl_file.pack(side=tk.LEFT, padx=8)
ttk.Button(top, text="保存到...", command=self.choose_output_dir).pack(side=tk.LEFT, padx=16)
self.lbl_out = ttk.Label(top, text=f"保存目录:{self.output_dir}")
self.lbl_out.pack(side=tk.LEFT, padx=8)
ttk.Label(top, text="线程数:").pack(side=tk.LEFT, padx=12)
ttk.Spinbox(top, from_=1, to=64, textvariable=self.thread_count, width=5).pack(side=tk.LEFT)
# 中部:左列=字段/列名;中列=模板构建;右列=下载字段/预览
middle = ttk.Panedwindow(main, orient=tk.HORIZONTAL)
middle.pack(fill=tk.BOTH, expand=True, pady=10)
# 左:字段列表
frm_left = ttk.Labelframe(middle, text="表字段 / 列名")
middle.add(frm_left, weight=1)
self.lst_columns = tk.Listbox(frm_left, selectmode=tk.SINGLE, exportselection=False)
self.lst_columns.pack(fill=tk.BOTH, expand=True, padx=8, pady=8)
# 中:模板构建
frm_mid = ttk.Labelframe(middle, text="命名模板构建")
middle.add(frm_mid, weight=1)
row1 = ttk.Frame(frm_mid)
row1.pack(fill=tk.X, padx=8, pady=(8, 4))
ttk.Label(row1, text="分隔符:").pack(side=tk.LEFT)
self.entry_sep = ttk.Entry(row1, width=6)
self.entry_sep.insert(0, "-")
self.entry_sep.pack(side=tk.LEFT, padx=6)
ttk.Button(row1, text="添加【选中字段】", command=self.add_selected_field_token).pack(side=tk.LEFT, padx=6)
self.entry_text = ttk.Entry(row1, width=16)
self.entry_text.insert(0, "自定义文本(如:反)")
self.entry_text.bind("<FocusIn>", lambda e: self._clear_placeholder(self.entry_text, "自定义文本(如:反)"))
self.entry_text.pack(side=tk.LEFT, padx=6)
ttk.Button(row1, text="添加【自定义文本】", command=self.add_text_token).pack(side=tk.LEFT)
row2 = ttk.Frame(frm_mid)
row2.pack(fill=tk.BOTH, expand=True, padx=8, pady=4)
self.lst_tokens = tk.Listbox(row2, selectmode=tk.SINGLE, exportselection=False)
self.lst_tokens.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
btns = ttk.Frame(row2)
btns.pack(side=tk.LEFT, fill=tk.Y, padx=6)
ttk.Button(btns, text="上移", command=self.move_token_up).pack(fill=tk.X, pady=3)
ttk.Button(btns, text="下移", command=self.move_token_down).pack(fill=tk.X, pady=3)
ttk.Button(btns, text="删除", command=self.remove_token).pack(fill=tk.X, pady=3)
ttk.Button(btns, text="清空", command=self.clear_tokens).pack(fill=tk.X, pady=3)
row3 = ttk.Frame(frm_mid)
row3.pack(fill=tk.X, padx=8, pady=(4, 8))
ttk.Button(row3, text="预览文件名(取首行)", command=self.preview_filename).pack(side=tk.LEFT)
self.lbl_preview = ttk.Label(row3, text="预览:")
self.lbl_preview.pack(side=tk.LEFT, padx=10)
# 右:下载字段 & 控制
frm_right = ttk.Labelframe(middle, text="下载设置")
middle.add(frm_right, weight=1)
ttk.Label(frm_right, text="选择【下载字段】(URL列)").pack(anchor="w", padx=8, pady=(8, 2))
self.cmb_url_field = ttk.Combobox(frm_right, state="readonly")
self.cmb_url_field.pack(fill=tk.X, padx=8)
ttk.Button(frm_right, text="开始下载", command=self.start_download).pack(fill=tk.X, padx=8, pady=(12, 4))
self.progress = ttk.Progressbar(frm_right, orient=tk.HORIZONTAL, mode="determinate")
self.progress.pack(fill=tk.X, padx=8, pady=(0, 6))
self.lbl_progress = ttk.Label(frm_right, text="进度:0/0")
self.lbl_progress.pack(anchor="w", padx=8)
# 下:日志 + 数据预览
bottom = ttk.Panedwindow(main, orient=tk.VERTICAL)
bottom.pack(fill=tk.BOTH, expand=True)
frm_log = ttk.Labelframe(bottom, text="日志")
bottom.add(frm_log, weight=1)
self.txt_log = ScrolledText(frm_log, height=8)
self.txt_log.pack(fill=tk.BOTH, expand=True, padx=8, pady=8)
frm_preview = ttk.Labelframe(bottom, text="数据预览(前 30 行)")
bottom.add(frm_preview, weight=1)
self.tree = ttk.Treeview(frm_preview, show="headings")
self.tree.pack(fill=tk.BOTH, expand=True, padx=8, pady=8)
# ---------- 事件 & 逻辑 ----------
def _clear_placeholder(self, entry: ttk.Entry, placeholder: str):
if entry.get() == placeholder:
entry.delete(0, tk.END)
def log(self, msg: str):
self.txt_log.insert(tk.END, msg + "\n")
self.txt_log.see(tk.END)
self.update_idletasks()
def open_table(self):
path = filedialog.askopenfilename(
title="选择表格文件",
filetypes=[("Excel 文件", "*.xlsx;*.xls"), ("CSV 文件", "*.csv"), ("所有文件", "*.*")]
)
if not path:
return
try:
if path.lower().endswith((".xlsx", ".xls")):
df = pd.read_excel(path)
elif path.lower().endswith(".csv"):
df = pd.read_csv(path, encoding="utf-8", engine="python")
else:
messagebox.showerror("错误", "不支持的文件类型")
return
except Exception as e:
messagebox.showerror("读取失败", f"无法读取文件:\n{e}")
return
self.filepath = path
self.df = df
self.lbl_file.config(text=os.path.basename(path))
self.populate_columns()
self.populate_preview()
self.log(f"已加载:{path},共 {len(df)} 行,{len(df.columns)} 列。")
def populate_columns(self):
self.lst_columns.delete(0, tk.END)
self.cmb_url_field["values"] = list(self.df.columns)
for c in self.df.columns:
self.lst_columns.insert(tk.END, str(c))
if len(self.df.columns) > 0:
self.lst_columns.select_set(0)
self.cmb_url_field.current(0)
def populate_preview(self):
# 清空 tree
for col in self.tree["columns"]:
self.tree.heading(col, text="")
self.tree.delete(*self.tree.get_children())
# 设置列
cols = [str(c) for c in self.df.columns]
self.tree["columns"] = cols
for c in cols:
self.tree.heading(c, text=c)
self.tree.column(c, width=120, anchor="w")
# 插入前 30 行
for _, row in self.df.head(30).iterrows():
vals = [str(row.get(c, "")) for c in self.df.columns]
self.tree.insert("", tk.END, values=vals)
def choose_output_dir(self):
dirpath = filedialog.askdirectory(title="选择保存目录")
if dirpath:
self.output_dir = dirpath
self.lbl_out.config(text=f"保存目录:{self.output_dir}")
# ----- 模板 token 操作 -----
def add_selected_field_token(self):
sel = self.lst_columns.curselection()
if not sel:
messagebox.showwarning("提示", "请先在左侧列表选中一个字段/列名")
return
col = self.lst_columns.get(sel[0])
self.tokens.append({"type": "field", "value": col})
self.refresh_token_listbox()
def add_text_token(self):
txt = self.entry_text.get().strip()
if not txt or txt == "自定义文本(如:反)":
messagebox.showwarning("提示", "请输入自定义文本")
return
self.tokens.append({"type": "text", "value": txt})
self.entry_text.delete(0, tk.END)
self.refresh_token_listbox()
def refresh_token_listbox(self):
self.lst_tokens.delete(0, tk.END)
for t in self.tokens:
prefix = "[字段]" if t["type"] == "field" else "[文本]"
self.lst_tokens.insert(tk.END, f"{prefix} {t['value']}")
def move_token_up(self):
idxs = self.lst_tokens.curselection()
if not idxs:
return
i = idxs[0]
if i == 0:
return
self.tokens[i-1], self.tokens[i] = self.tokens[i], self.tokens[i-1]
self.refresh_token_listbox()
self.lst_tokens.select_set(i-1)
def move_token_down(self):
idxs = self.lst_tokens.curselection()
if not idxs:
return
i = idxs[0]
if i >= len(self.tokens) - 1:
return
self.tokens[i+1], self.tokens[i] = self.tokens[i], self.tokens[i+1]
self.refresh_token_listbox()
self.lst_tokens.select_set(i+1)
def remove_token(self):
idxs = self.lst_tokens.curselection()
if not idxs:
return
i = idxs[0]
self.tokens.pop(i)
self.refresh_token_listbox()
def clear_tokens(self):
self.tokens.clear()
self.refresh_token_listbox()
# ----- 生成文件名 -----
def build_filename_for_row(self, row: pd.Series) -> str:
sep = self.entry_sep.get()
parts = []
for t in self.tokens:
if t["type"] == "field":
val = row.get(t["value"], "")
parts.append("" if pd.isna(val) else str(val).strip())
else:
parts.append(t["value"])
joined = sep.join([p for p in parts if p is not None and str(p) != ""])
return sanitize_filename(joined)
def preview_filename(self):
if self.df is None or self.df.empty:
messagebox.showwarning("提示", "请先加载表格文件")
return
name = self.build_filename_for_row(self.df.iloc[0])
self.lbl_preview.config(text=f"预览:{name}")
self.log(f"[预览] 首行文件名:{name}")
# ----- 下载 -----
def start_download(self):
if self.df is None or self.df.empty:
messagebox.showwarning("提示", "请先加载表格文件")
return
url_field = self.cmb_url_field.get()
if not url_field:
messagebox.showwarning("提示", "请选择【下载字段】")
return
if not any(self.tokens):
# 如果未设置模板,则默认用 URL 文件名
if not messagebox.askyesno("确认", "未设置命名模板,将使用 URL 的文件名作为基础名,是否继续?"):
return
os.makedirs(self.output_dir, exist_ok=True)
# 准备任务
tasks = []
for idx, row in self.df.iterrows():
url = row.get(url_field, "")
if pd.isna(url) or not str(url).strip():
continue
tasks.append((idx, str(url).strip(), row))
total = len(tasks)
if total == 0:
messagebox.showwarning("提示", f"字段「{url_field}」中没有可用的 URL。")
return
self.progress["maximum"] = total
self.progress["value"] = 0
self.lbl_progress.config(text=f"进度:0/{total}")
self.txt_log.delete("1.0", tk.END)
self.log(f"开始下载:共 {total} 个文件。保存目录:{self.output_dir}")
threading.Thread(target=self._run_downloads, args=(tasks,), daemon=True).start()
def _run_downloads(self, tasks: list[tuple[int, str, pd.Series]]):
ok, fail = 0, 0
session = requests.Session()
session.headers.update({
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
})
def download_one(idx: int, url: str, row: pd.Series):
# 构建基础文件名
if self.tokens:
base = self.build_filename_for_row(row)
else:
# 未设置模板:用 URL 路径名(不含扩展)
path = urlsplit(url).path
base = sanitize_filename(os.path.splitext(os.path.basename(path))[0] or f"row_{idx}")
# 先 HEAD 获取类型(不强求)
headers = {}
try:
r_head = session.head(url, timeout=10, allow_redirects=True)
headers = r_head.headers or {}
except Exception:
pass
# GET
try:
with session.get(url, timeout=30, stream=True) as r:
r.raise_for_status()
ext = guess_ext_from_url_or_headers(url, r.headers or headers)
# 最终路径(处理重名)
final_path = ensure_unique_path(self.output_dir, base, ext)
with open(final_path, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
return True, final_path
except Exception as e:
return False, str(e)
with ThreadPoolExecutor(max_workers=self.thread_count.get()) as ex:
future_map = {ex.submit(download_one, idx, url, row): (idx, url) for idx, url, row in tasks}
for fut in as_completed(future_map):
self.progress.step(1)
done = int(self.progress["value"])
total = int(self.progress["maximum"])
self.lbl_progress.config(text=f"进度:{done}/{total}")
success, info = fut.result()
if success:
ok += 1
self.log(f"[OK] {info}")
else:
fail += 1
idx, url = future_map[fut]
self.log(f"[FAIL] 行 {idx} | {url} | {info}")
self.log(f"完成:成功 {ok},失败 {fail}。")
# ================== 运行 ==================
if __name__ == "__main__":
    # Create the main window and enter the Tk event loop
    app = DownloaderApp()
    app.mainloop()
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END
暂无评论内容