Python高效办公:如何利用问卷星进行批量图片格式收集

图片[1]-Python高效办公:如何利用问卷星进行批量图片格式收集-云村Study

相信不少人都为突如其来的大批量图片收集烦恼过,尤其是收集来的图片还有命名格式要求时,手动逐个下载、改名非常繁琐。在 Python 的世界里,你只需要轻松地调用 pandas 库读取问卷星导出的表格,再配合 requests 下载图片,就能实现批量下载并按规则自动命名。

1、读表拿列名

import pandas as pd

# Load the exported spreadsheet (for a CSV file use pd.read_csv("your.csv") instead).
df = pd.read_excel("你的表.xlsx")

# Column labels, e.g. to fill a drop-down in the UI.
columns = [*df.columns]

2、行取值

def get_val(row: pd.Series, col: str) -> str:
    """Return the cell of ``row`` under ``col`` as stripped text.

    A missing column or a NaN/None cell yields the empty string.
    """
    cell = row.get(col, "")
    if pd.isna(cell):
        return ""
    return str(cell).strip()

3、下载单个文件

def download_one(idx: int, url: str, row: pd.Series):
    """Download ``url`` into ``self.output_dir`` and return the path written.

    This is an excerpt of a nested helper: ``self`` and ``session`` are not
    parameters here and must be provided by the enclosing scope.

    Args:
        idx: Row label, used as ``row_{idx}`` when the URL has no file name.
        url: Address of the file to download.
        row: Table row used to build the file name from the naming template.

    Returns:
        The path of the file that was written.

    Raises:
        Whatever ``session.get`` / ``raise_for_status`` / ``open`` raise;
        only the preliminary HEAD request is best-effort.
    """
    # 1) Base file name: naming template if one is configured, else the URL's file name.
    if self.tokens:
        base = self.build_filename_for_row(row)
    else:
        path = urlsplit(url).path
        base = sanitize_filename(os.path.splitext(os.path.basename(path))[0] or f"row_{idx}")

    # 2) Best-effort HEAD request to learn the content type.
    headers = {}
    try:
        r_head = session.head(url, timeout=10, allow_redirects=True)
        headers = r_head.headers or {}
    except Exception:
        # A failed HEAD must not abort the download, but unlike a bare
        # ``except:`` this still lets KeyboardInterrupt/SystemExit through.
        pass

    # 3) The actual download, streamed to disk in chunks.
    with session.get(url, timeout=30, stream=True) as r:
        r.raise_for_status()
        # Prefer the GET response headers; fall back to the HEAD ones.
        ext = guess_ext_from_url_or_headers(url, r.headers or headers)
        final_path = ensure_unique_path(self.output_dir, base, ext)
        with open(final_path, "wb") as f:
            for chunk in r.iter_content(chunk_size=8192):
                if chunk:  # skip keep-alive chunks
                    f.write(chunk)
    return final_path

最终呈现代码

# -*- coding: utf-8 -*-
"""
表格批量下载器(GUI)
- 导入 Excel/CSV
- 选择下载字段(URL 列)
- 构建命名模板:字段 + 自定义文本 + 全局分隔符
- 批量下载并命名:如 姓名-反-身份证号码
依赖:pandas, openpyxl, requests
安装:pip install pandas openpyxl requests
BY:蜗牛村长
"""

import mimetypes
import os
import queue
import re
import threading
import tkinter as tk
from concurrent.futures import ThreadPoolExecutor, as_completed
from tkinter import filedialog, messagebox, ttk
from tkinter.scrolledtext import ScrolledText
from urllib.parse import unquote, urlparse, urlsplit

import pandas as pd
import requests

# ================== 工具函数 ==================

ILLEGAL_CHARS = r'[\\/:*?"<>|\r\n\t]'
# ASCII control characters (NUL, DEL, ...). The ones that count as whitespace
# are already collapsed to a space before this pattern is applied.
_CONTROL_CHARS = r'[\x00-\x1f\x7f]'

def sanitize_filename(name: str, max_len: int = 140) -> str:
    """Turn arbitrary text into a safe file name (without extension).

    Args:
        name: Raw name; ``None`` is treated as the empty string and any other
            value is converted with ``str()``.
        max_len: Maximum number of characters kept.

    Returns:
        The cleaned name, or ``"unnamed"`` when nothing usable is left.
    """
    if name is None:
        name = ""
    name = str(name).strip()
    # Characters that are illegal in Windows file names become underscores.
    name = re.sub(ILLEGAL_CHARS, "_", name)
    # Collapse runs of whitespace into a single space.
    name = re.sub(r'\s+', ' ', name).strip()
    # Remaining control characters (e.g. NUL) would make open() raise
    # "embedded null byte", so neutralise them as well.
    name = re.sub(_CONTROL_CHARS, "_", name)
    # Windows does not allow a trailing dot or space.
    name = name.rstrip(" .")
    # Enforce the length limit, then tidy the new tail.
    if len(name) > max_len:
        name = name[:max_len].rstrip(" ._")
    # Fallback for an empty result.
    return name or "unnamed"

def guess_ext_from_url_or_headers(url: str, headers: dict) -> str:
    """Guess a file extension from the URL path, then from ``Content-Type``.

    Args:
        url: Download URL; a short alphanumeric suffix of its path wins.
        headers: Response headers. May be ``None`` or empty; the
            ``Content-Type`` key is matched case-insensitively so a plain
            ``dict`` with lower-case keys works too.

    Returns:
        A lower-case extension including the leading dot; ``".jpg"`` when
        nothing can be inferred.
    """
    # 1) URL path
    try:
        ext = os.path.splitext(urlsplit(url).path)[1]
        # Reject long or odd "extensions" that are really glued-on parameters.
        if 1 <= len(ext) <= 6 and re.match(r'^\.[A-Za-z0-9]+$', ext):
            return ext.lower()
    except (ValueError, TypeError, AttributeError):
        # Malformed or non-text URL: fall through to the headers.
        pass
    # 2) Content-Type response header
    ctype = ""
    for key, value in (headers or {}).items():
        if str(key).lower() == "content-type":
            # Drop parameters such as "; charset=...".
            ctype = str(value or "").split(";")[0].strip().lower()
            break
    if ctype:
        ext = mimetypes.guess_extension(ctype)
        if ext:
            # Some tables map JPEG to ".jpe"; ".jpg" is the common spelling.
            return ".jpg" if ext in (".jpe",) else ext
    # 3) Default
    return ".jpg"

def ensure_unique_path(dirpath: str, filename_no_ext: str, ext: str) -> str:
    """Return a path inside ``dirpath`` that does not exist yet.

    ``filename_no_ext + ext`` is tried first; when it is taken, ``(1)``,
    ``(2)``, ... is appended to the stem until a free name is found.
    """
    path = os.path.join(dirpath, filename_no_ext + ext)
    counter = 0
    while os.path.exists(path):
        counter += 1
        path = os.path.join(dirpath, f"{filename_no_ext}({counter}){ext}")
    return path

# ================== GUI 应用 ==================

class DownloaderApp(tk.Tk):
    """Tk window that downloads the URLs of one table column and names the files.

    Workflow: load an Excel/CSV table, pick the column holding the URLs,
    optionally build a naming template out of column values and fixed text
    joined by a separator, then download everything with a thread pool.

    Threading: downloads run in a background thread that never touches a
    widget or Tk variable. Everything it needs is read on the main thread
    before it starts, and every UI update is handed back through
    ``_ui_queue``, which the main thread polls in ``_drain_ui_queue``.
    """

    # Placeholder shown in the custom-text entry until it first gains focus.
    _TEXT_PLACEHOLDER = "自定义文本(如:反)"
    # Interval (ms) at which the main thread runs UI work queued by the worker.
    _UI_POLL_MS = 100
    # Accepted range for the number of download threads (matches the spinbox).
    _MIN_THREADS = 1
    _MAX_THREADS = 64

    def __init__(self):
        super().__init__()
        self.title("表格批量下载器(字段命名模板)————BY:蜗牛村长")
        self.geometry("980x680")
        self.minsize(900, 600)

        self.df: pd.DataFrame | None = None
        self.filepath: str | None = None
        # Naming template: [{'type': 'field', 'value': <column label>} |
        #                   {'type': 'text', 'value': <fixed text>}]
        self.tokens: list[dict] = []
        self.output_dir: str = os.path.abspath("downloads")
        self.thread_count = tk.IntVar(value=8)

        # (callable, args) pairs queued by the worker for the main thread.
        self._ui_queue: queue.Queue = queue.Queue()
        # Serialises "pick a free path + create the file" across workers.
        self._path_lock = threading.Lock()
        # True while a download batch is running.
        self._downloading = False

        self._build_ui()
        self.after(self._UI_POLL_MS, self._drain_ui_queue)

    # ---------- UI ----------
    def _build_ui(self):
        """Create and lay out all widgets."""
        main = ttk.Frame(self)
        main.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)

        # Top bar: table file, output directory, thread count.
        top = ttk.Frame(main)
        top.pack(fill=tk.X)

        ttk.Button(top, text="打开表格 (Excel/CSV)", command=self.open_table).pack(side=tk.LEFT)
        self.lbl_file = ttk.Label(top, text="未选择文件")
        self.lbl_file.pack(side=tk.LEFT, padx=8)

        ttk.Button(top, text="保存到...", command=self.choose_output_dir).pack(side=tk.LEFT, padx=16)
        self.lbl_out = ttk.Label(top, text=f"保存目录:{self.output_dir}")
        self.lbl_out.pack(side=tk.LEFT, padx=8)

        ttk.Label(top, text="线程数:").pack(side=tk.LEFT, padx=12)
        ttk.Spinbox(
            top, from_=self._MIN_THREADS, to=self._MAX_THREADS,
            textvariable=self.thread_count, width=5,
        ).pack(side=tk.LEFT)

        # Middle: columns (left), template builder (centre), download settings (right).
        middle = ttk.Panedwindow(main, orient=tk.HORIZONTAL)
        middle.pack(fill=tk.BOTH, expand=True, pady=10)

        # Left: list of table columns.
        frm_left = ttk.Labelframe(middle, text="表字段 / 列名")
        middle.add(frm_left, weight=1)

        self.lst_columns = tk.Listbox(frm_left, selectmode=tk.SINGLE, exportselection=False)
        self.lst_columns.pack(fill=tk.BOTH, expand=True, padx=8, pady=8)

        # Centre: naming-template builder.
        frm_mid = ttk.Labelframe(middle, text="命名模板构建")
        middle.add(frm_mid, weight=1)

        row1 = ttk.Frame(frm_mid)
        row1.pack(fill=tk.X, padx=8, pady=(8, 4))
        ttk.Label(row1, text="分隔符:").pack(side=tk.LEFT)
        self.entry_sep = ttk.Entry(row1, width=6)
        self.entry_sep.insert(0, "-")
        self.entry_sep.pack(side=tk.LEFT, padx=6)

        ttk.Button(row1, text="添加【选中字段】", command=self.add_selected_field_token).pack(side=tk.LEFT, padx=6)

        self.entry_text = ttk.Entry(row1, width=16)
        self.entry_text.insert(0, self._TEXT_PLACEHOLDER)
        self.entry_text.bind(
            "<FocusIn>",
            lambda e: self._clear_placeholder(self.entry_text, self._TEXT_PLACEHOLDER),
        )
        self.entry_text.pack(side=tk.LEFT, padx=6)
        ttk.Button(row1, text="添加【自定义文本】", command=self.add_text_token).pack(side=tk.LEFT)

        row2 = ttk.Frame(frm_mid)
        row2.pack(fill=tk.BOTH, expand=True, padx=8, pady=4)

        self.lst_tokens = tk.Listbox(row2, selectmode=tk.SINGLE, exportselection=False)
        self.lst_tokens.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)

        btns = ttk.Frame(row2)
        btns.pack(side=tk.LEFT, fill=tk.Y, padx=6)
        ttk.Button(btns, text="上移", command=self.move_token_up).pack(fill=tk.X, pady=3)
        ttk.Button(btns, text="下移", command=self.move_token_down).pack(fill=tk.X, pady=3)
        ttk.Button(btns, text="删除", command=self.remove_token).pack(fill=tk.X, pady=3)
        ttk.Button(btns, text="清空", command=self.clear_tokens).pack(fill=tk.X, pady=3)

        row3 = ttk.Frame(frm_mid)
        row3.pack(fill=tk.X, padx=8, pady=(4, 8))
        ttk.Button(row3, text="预览文件名(取首行)", command=self.preview_filename).pack(side=tk.LEFT)
        self.lbl_preview = ttk.Label(row3, text="预览:")
        self.lbl_preview.pack(side=tk.LEFT, padx=10)

        # Right: URL column and download controls.
        frm_right = ttk.Labelframe(middle, text="下载设置")
        middle.add(frm_right, weight=1)

        ttk.Label(frm_right, text="选择【下载字段】(URL列)").pack(anchor="w", padx=8, pady=(8, 2))
        self.cmb_url_field = ttk.Combobox(frm_right, state="readonly")
        self.cmb_url_field.pack(fill=tk.X, padx=8)

        # Kept as an attribute so it can be disabled while a batch is running.
        self.btn_start = ttk.Button(frm_right, text="开始下载", command=self.start_download)
        self.btn_start.pack(fill=tk.X, padx=8, pady=(12, 4))
        self.progress = ttk.Progressbar(frm_right, orient=tk.HORIZONTAL, mode="determinate")
        self.progress.pack(fill=tk.X, padx=8, pady=(0, 6))
        self.lbl_progress = ttk.Label(frm_right, text="进度:0/0")
        self.lbl_progress.pack(anchor="w", padx=8)

        # Bottom: log and data preview.
        bottom = ttk.Panedwindow(main, orient=tk.VERTICAL)
        bottom.pack(fill=tk.BOTH, expand=True)

        frm_log = ttk.Labelframe(bottom, text="日志")
        bottom.add(frm_log, weight=1)
        self.txt_log = ScrolledText(frm_log, height=8)
        self.txt_log.pack(fill=tk.BOTH, expand=True, padx=8, pady=8)

        frm_preview = ttk.Labelframe(bottom, text="数据预览(前 30 行)")
        bottom.add(frm_preview, weight=1)
        self.tree = ttk.Treeview(frm_preview, show="headings")
        self.tree.pack(fill=tk.BOTH, expand=True, padx=8, pady=8)

    # ---------- main-thread dispatch ----------
    def _post(self, func, *args):
        """Queue ``func(*args)`` for execution on the Tk main thread.

        Only touches the queue, so it may be called from any thread.
        """
        self._ui_queue.put((func, args))

    def _drain_ui_queue(self):
        """Run all queued UI callables, then reschedule itself (main thread)."""
        try:
            while True:
                func, args = self._ui_queue.get_nowait()
                func(*args)
        except queue.Empty:
            pass
        finally:
            # Keep polling even if one callable raised.
            self.after(self._UI_POLL_MS, self._drain_ui_queue)

    # ---------- events & logic ----------
    def _clear_placeholder(self, entry: ttk.Entry, placeholder: str):
        """Empty ``entry`` if it still shows ``placeholder``."""
        if entry.get() == placeholder:
            entry.delete(0, tk.END)

    def log(self, msg: str):
        """Append ``msg`` to the log pane and scroll to it (main thread only)."""
        self.txt_log.insert(tk.END, msg + "\n")
        self.txt_log.see(tk.END)
        self.update_idletasks()

    @staticmethod
    def _read_table(path: str) -> "pd.DataFrame | None":
        """Read an Excel/CSV file; return ``None`` for an unsupported extension.

        Cells are read as text (``dtype=str``) because they end up in file
        names: a numeric column containing blanks would otherwise be read as
        float and an ID such as 123 would be named "123.0".
        """
        lower = path.lower()
        if lower.endswith((".xlsx", ".xls")):
            return pd.read_excel(path, dtype=str)
        if lower.endswith(".csv"):
            try:
                return pd.read_csv(path, encoding="utf-8", engine="python", dtype=str)
            except UnicodeDecodeError:
                # CSV files saved by Chinese-locale Excel are usually GBK;
                # gb18030 is a superset of it.
                return pd.read_csv(path, encoding="gb18030", engine="python", dtype=str)
        return None

    def open_table(self):
        """Ask for a table file, load it and refresh the column list and preview."""
        path = filedialog.askopenfilename(
            title="选择表格文件",
            # One pattern per tuple item; "a;b" in a single string only works on Windows.
            filetypes=[("Excel 文件", ("*.xlsx", "*.xls")), ("CSV 文件", "*.csv"), ("所有文件", "*.*")]
        )
        if not path:
            return
        try:
            df = self._read_table(path)
        except Exception as e:
            # GUI boundary: whatever the parser raised is shown to the user.
            messagebox.showerror("读取失败", f"无法读取文件:\n{e}")
            return
        if df is None:
            messagebox.showerror("错误", "不支持的文件类型")
            return

        self.filepath = path
        self.df = df
        self.lbl_file.config(text=os.path.basename(path))
        self.populate_columns()
        self.populate_preview()
        self.log(f"已加载:{path},共 {len(df)} 行,{len(df.columns)} 列。")

    def populate_columns(self):
        """Fill the column listbox and the URL-column combobox from ``self.df``.

        Both widgets list the columns in DataFrame order, so a widget index
        maps directly onto ``self.df.columns``.
        """
        labels = [str(c) for c in self.df.columns]
        self.lst_columns.delete(0, tk.END)
        self.cmb_url_field["values"] = labels
        for label in labels:
            self.lst_columns.insert(tk.END, label)
        if labels:
            self.lst_columns.select_set(0)
            self.cmb_url_field.current(0)

    def populate_preview(self):
        """Show the first 30 rows of ``self.df`` in the preview tree."""
        # Reset the previous table's headings and rows.
        for col in self.tree["columns"]:
            self.tree.heading(col, text="")
        self.tree.delete(*self.tree.get_children())
        # Configure the columns.
        cols = [str(c) for c in self.df.columns]
        self.tree["columns"] = cols
        for c in cols:
            self.tree.heading(c, text=c)
            self.tree.column(c, width=120, anchor="w")
        # Insert the first 30 rows.
        for _, row in self.df.head(30).iterrows():
            vals = [str(row.get(c, "")) for c in self.df.columns]
            self.tree.insert("", tk.END, values=vals)

    def choose_output_dir(self):
        """Ask for the directory downloads are saved to."""
        dirpath = filedialog.askdirectory(title="选择保存目录")
        if dirpath:
            self.output_dir = dirpath
            self.lbl_out.config(text=f"保存目录:{self.output_dir}")

    # ----- template token operations -----
    def add_selected_field_token(self):
        """Append the column selected in the left list to the naming template."""
        sel = self.lst_columns.curselection()
        if not sel or self.df is None:
            messagebox.showwarning("提示", "请先在左侧列表选中一个字段/列名")
            return
        # Store the real column label, not the listbox text: labels need not
        # be strings and ``row.get("123")`` would miss a column named 123.
        col = self.df.columns[sel[0]]
        self.tokens.append({"type": "field", "value": col})
        self.refresh_token_listbox()

    def add_text_token(self):
        """Append the text typed in the custom-text entry to the naming template."""
        txt = self.entry_text.get().strip()
        if not txt or txt == self._TEXT_PLACEHOLDER:
            messagebox.showwarning("提示", "请输入自定义文本")
            return
        self.tokens.append({"type": "text", "value": txt})
        self.entry_text.delete(0, tk.END)
        self.refresh_token_listbox()

    def refresh_token_listbox(self):
        """Redraw the template list from ``self.tokens``."""
        self.lst_tokens.delete(0, tk.END)
        for t in self.tokens:
            prefix = "[字段]" if t["type"] == "field" else "[文本]"
            self.lst_tokens.insert(tk.END, f"{prefix} {t['value']}")

    def move_token_up(self):
        """Move the selected template token one position up."""
        idxs = self.lst_tokens.curselection()
        if not idxs:
            return
        i = idxs[0]
        if i == 0:
            return
        self.tokens[i-1], self.tokens[i] = self.tokens[i], self.tokens[i-1]
        self.refresh_token_listbox()
        self.lst_tokens.select_set(i-1)

    def move_token_down(self):
        """Move the selected template token one position down."""
        idxs = self.lst_tokens.curselection()
        if not idxs:
            return
        i = idxs[0]
        if i >= len(self.tokens) - 1:
            return
        self.tokens[i+1], self.tokens[i] = self.tokens[i], self.tokens[i+1]
        self.refresh_token_listbox()
        self.lst_tokens.select_set(i+1)

    def remove_token(self):
        """Delete the selected template token."""
        idxs = self.lst_tokens.curselection()
        if not idxs:
            return
        self.tokens.pop(idxs[0])
        self.refresh_token_listbox()

    def clear_tokens(self):
        """Remove every token from the naming template."""
        self.tokens.clear()
        self.refresh_token_listbox()

    # ----- file-name generation -----
    def build_filename_for_row(self, row: pd.Series) -> str:
        """Build the sanitized base file name (no extension) for ``row``.

        Field tokens contribute the row's cell text, text tokens their fixed
        text; empty parts are skipped and the rest is joined with the
        separator from the separator entry. Reads a widget, so call it on
        the main thread.
        """
        sep = self.entry_sep.get()
        parts = []
        for t in self.tokens:
            if t["type"] == "field":
                val = row.get(t["value"], "")
                parts.append("" if pd.isna(val) else str(val).strip())
            else:
                parts.append(t["value"])
        joined = sep.join(p for p in parts if p is not None and str(p) != "")
        return sanitize_filename(joined)

    def preview_filename(self):
        """Show the file name the template produces for the first row."""
        if self.df is None or self.df.empty:
            messagebox.showwarning("提示", "请先加载表格文件")
            return
        name = self.build_filename_for_row(self.df.iloc[0])
        self.lbl_preview.config(text=f"预览:{name}")
        self.log(f"[预览] 首行文件名:{name}")

    def _base_name_for_task(self, idx, url: str, row: pd.Series) -> str:
        """Return the base file name for one download.

        Uses the naming template when one is set, otherwise the file name in
        the URL path, otherwise ``row_{idx}``.
        """
        if self.tokens:
            return self.build_filename_for_row(row)
        try:
            path = urlsplit(url).path
        except ValueError:
            # Malformed URL: name by row; the download itself will report the error.
            path = ""
        return sanitize_filename(os.path.splitext(os.path.basename(path))[0] or f"row_{idx}")

    # ----- download -----
    def _selected_url_column(self):
        """Return the label of the column chosen in the combobox, or ``None``."""
        if self.df is None:
            return None
        i = self.cmb_url_field.current()
        if i < 0 or i >= len(self.df.columns):
            return None
        return self.df.columns[i]

    def _read_thread_count(self):
        """Return the thread count clamped to the allowed range.

        Returns ``None`` when the spinbox does not hold an integer.
        """
        try:
            n = self.thread_count.get()
        except tk.TclError:
            return None
        return max(self._MIN_THREADS, min(self._MAX_THREADS, n))

    def start_download(self):
        """Validate the settings, collect the tasks and start the background batch."""
        if self._downloading:
            return
        if self.df is None or self.df.empty:
            messagebox.showwarning("提示", "请先加载表格文件")
            return
        url_field = self._selected_url_column()
        if url_field is None:
            messagebox.showwarning("提示", "请选择【下载字段】")
            return
        if not self.tokens:
            # Without a template the URL's own file name is used.
            if not messagebox.askyesno("确认", "未设置命名模板,将使用 URL 的文件名作为基础名,是否继续?"):
                return
        max_workers = self._read_thread_count()
        if max_workers is None:
            messagebox.showwarning("提示", "线程数必须是整数")
            return

        try:
            os.makedirs(self.output_dir, exist_ok=True)
        except OSError as e:
            messagebox.showerror("错误", f"无法创建保存目录:\n{e}")
            return

        # Collect the rows that actually hold a URL.
        tasks = []
        for idx, row in self.df.iterrows():
            url = row.get(url_field, "")
            if pd.isna(url) or not str(url).strip():
                continue
            tasks.append((idx, str(url).strip(), row))

        total = len(tasks)
        if total == 0:
            messagebox.showwarning("提示", f"字段「{url_field}」中没有可用的 URL。")
            return

        # File names depend on widgets, so they are built here on the main thread.
        base_names = [self._base_name_for_task(idx, url, row) for idx, url, row in tasks]

        self.progress["maximum"] = total
        self.progress["value"] = 0
        self.lbl_progress.config(text=f"进度:0/{total}")
        self.txt_log.delete("1.0", tk.END)

        # Block a second batch until this one has finished.
        self._downloading = True
        self.btn_start.config(state=tk.DISABLED)

        self.log(f"开始下载:共 {total} 个文件。保存目录:{self.output_dir}")
        threading.Thread(
            target=self._run_downloads,
            args=(tasks, max_workers, base_names),
            daemon=True,
        ).start()

    def _set_progress(self, done: int, total: int):
        """Show ``done``/``total`` on the progress bar and label (main thread).

        The value is assigned directly: ``Progressbar.step()`` wraps back to
        0 when it reaches ``maximum``, which made a finished batch read 0.
        """
        self.progress["value"] = done
        self.lbl_progress.config(text=f"进度:{done}/{total}")

    def _finish_downloads(self, ok: int, fail: int):
        """Log the summary and allow a new batch to start (main thread)."""
        self.log(f"完成:成功 {ok},失败 {fail}。")
        self._downloading = False
        self.btn_start.config(state=tk.NORMAL)

    def _run_downloads(self, tasks: list[tuple[int, str, pd.Series]],
                       max_workers: "int | None" = None,
                       base_names: "list[str] | None" = None):
        """Download every task with a thread pool and report the results.

        Meant to run in a background thread; UI updates go through ``_post``.

        Args:
            tasks: ``(row label, url, row)`` triples.
            max_workers: Pool size. When omitted it is read from the spinbox
                here, which is only safe on the main thread.
            base_names: Base file name for each task, parallel to ``tasks``.
                When omitted the names are built here, which reads widgets
                and is only safe on the main thread.
        """
        if max_workers is None:
            max_workers = self._read_thread_count() or self._MIN_THREADS
        if base_names is None:
            base_names = [self._base_name_for_task(idx, url, row) for idx, url, row in tasks]
        # Snapshot: the user may pick another directory while the batch runs.
        output_dir = self.output_dir
        total = len(tasks)
        ok, fail = 0, 0

        session = requests.Session()
        session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
        })

        def download_one(url: str, base: str):
            """Download one file; return ``(True, path)`` or ``(False, error text)``."""
            final_path = None
            try:
                with session.get(url, timeout=30, stream=True) as r:
                    r.raise_for_status()
                    resp_headers = r.headers
                    if "Content-Type" not in resp_headers:
                        # Only ask HEAD when GET did not say what it returned.
                        try:
                            resp_headers = session.head(url, timeout=10, allow_redirects=True).headers
                        except requests.RequestException:
                            pass
                    ext = guess_ext_from_url_or_headers(url, resp_headers)
                    # Pick the name and create the file atomically with respect
                    # to the other workers, so two rows with the same base
                    # name cannot overwrite each other.
                    with self._path_lock:
                        candidate = ensure_unique_path(output_dir, base, ext)
                        f = open(candidate, "wb")
                    final_path = candidate
                    with f:
                        for chunk in r.iter_content(chunk_size=8192):
                            if chunk:  # skip keep-alive chunks
                                f.write(chunk)
                return True, final_path
            except Exception as e:
                # Per-task boundary: one bad URL must not stop the batch.
                if final_path is not None:
                    # Do not leave a truncated file behind.
                    try:
                        os.remove(final_path)
                    except OSError:
                        pass
                return False, str(e)

        try:
            with ThreadPoolExecutor(max_workers=max_workers) as ex:
                future_map = {
                    ex.submit(download_one, url, base): (idx, url)
                    for (idx, url, _row), base in zip(tasks, base_names)
                }
                for done, fut in enumerate(as_completed(future_map), start=1):
                    self._post(self._set_progress, done, total)

                    success, info = fut.result()
                    if success:
                        ok += 1
                        self._post(self.log, f"[OK] {info}")
                    else:
                        fail += 1
                        idx, url = future_map[fut]
                        self._post(self.log, f"[FAIL] 行 {idx} | {url} | {info}")
        finally:
            session.close()
            # Always re-enable the UI, even if the batch died unexpectedly.
            self._post(self._finish_downloads, ok, fail)

# ================== 运行 ==================

if __name__ == "__main__":
    # Build the main window and hand control to Tk's event loop.
    DownloaderApp().mainloop()
© 版权声明
THE END
喜欢就支持一下吧
点赞7 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容