香蕉加速器官网正版
Python3 异步并发下载删除zip包中指定文件然后上传压缩包到阿里云oss

Python3 异步并发下载删除zip包中指定文件然后上传压缩包到阿里云oss

近期文章:推荐最好的fomepay虚拟信用卡,解决ChatGPT Plus Telegram会员无法支付的问题等。

对于生产项目,4w个压缩包的跨项目迁移需要下载后删除压缩包中以html、txt、url结尾的文件,上传到oss。不过在下载之前,需要对文件列表中的地址进行裁剪,最后与域名进行拼接。使用shell命令zip -d去下载并删除压缩包中的文件。这里注意,上传的时候需要使用文件列表中的地址来上传,但是上传到oss的时候需要去掉第一个/。使用lstrip(‘/’)删除,上传时不会报错电报TG。

我是自学的python,水平有限。总感觉自己写的不太理想,但是想要的功能是可以实现的,以后会慢慢完善的。

Python3异步并发下载脚本

导入aiohttp

导入异步

导入操作系统

导入压缩文件

导入子流程

进口2

导入日志记录

#指定文件列表文件

file_list=’zip.txt’

#指定数据保存目录

DATA_DIR=’/数据/’

BASE_URL=’下载域名’

#指定最大并发数

最大并发下载数=8

#上传成功后是否删除本地文件

DELETE_LOCAL_FILE_AFTER_UPLOAD=True #False

#几秒后重试以及最大重试次数

重试延迟=5

最大重试次数=3

#指定下载超时时间

超时=300

#配置下载成功和失败记录文件

SUCCESS_FILE=os.path.join(DATA_DIR, ‘success_log.txt’)

FAILURE_FILE=os.path.join(DATA_DIR, ‘failure_log.txt’)

# 配置oss

OSS_ACCESS_KEY_ID=”

OSS_ACCESS_KEY_SECRET=”

OSS_ENDPOINT=”

OSS_BUCKET_NAME=”

#配置日志文件

logging.basicConfig(filename=’download_upload.log’, level=logging.INFO)

异步def extract_content_from_url(url):

#urldo 切割

parts=url.split(‘/’, 4)

first_part=’/’ + ‘/’.join(parts[1:4])

内容=部分[4]

返回first_part,内容

异步def download_from_ku(url, 会话, 信号量, success_file, failure_file, retries=MAX_RETRIES):

第一部分,内容=等待extract_content_from_url(url)

#构建完整的下载地址

full_url=BASE_URL + 内容

data_dir=DATA_DIR + os.path.dirname(内容)

os.makedirs(data_dir,exist_ok=True)

local_filename=os.path.join(data_dir, os.path.basename(内容))

如果os.path.exists(local_filename):

logging.info(f’文件已存在: {local_filename}’)

返回本地文件名

重试时0:

尝试:

与信号量异步,session.get(full_url, timeout=TIMEOUT) 作为response:

以open(local_filename, ‘wb’) 作为file:

块=等待响应.content.read(8192)

如果不是chunk:

文件.write(块)

print(f’下载成功: {first_part}/{content}’)

logging.info(f’下载成功: {first_part}/{content}’)

打开(SUCCESS_FILE,’a’)作为success_log:

success_log.write(f’下载成功: {first_part}/{content}n’)

返回本地文件名Telegram会员

除了asyncio.TimeoutError:

logging.warning(f’下载超时,跳过下载{first_part}/{content}’)

打开(FAILURE_FILE,’a’)作为failure_log:

failure_log.write(f’下载超时: {first_part}/{content}n’)

返回无

除了异常e:

print(f’下载错误{内容}: {e}’)

logging.error(f’下载错误{content}: {e}’)

打开(FAILURE_FILE,’a’)作为failure_log:

failure_log.write(f’下载错误{内容}: {e}n’)

重试-=1

如果重试==0:

logging.error(f’达到最大重试次数并放弃下载{content}’)

否则:

logging.info(f’重试下载{content},剩余次数: {retries}’)

等待asyncio.sleep(RETRY_DELAY)

异步def remove_files_from_zip(zip_file, *files_to_remove):

#删除压缩包中指定类型的文件,调用zip -d

data_dir=DATA_DIR + os.path.dirname(zip_file)

zip_file_path=os.path.join(data_dir, zip_file)

命令=[‘zip’, ‘-d’, zip_file_path] + list(files_to_remove)

尝试:

子进程.运行(命令)

logging.info(f’已从: {zip_file_path} 中删除文件’)

除了subprocess.CalledProcessError 为e:

logging.error(f’从{zip_file_path}: {e} 删除文件时出错’)

异步def upload_to_oss(local_path, oss_key, 重试=MAX_RETRIES):

# 上传文件到OSS

auth=oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)

桶=oss2.Bucket(auth, OSS_ENDPOINT, OSS_BUCKET_NAME)

重试时0:

尝试:

以open(local_path, ‘rb’) 作为file:

Bucket.put_object(oss_key, 文件)

oss_url=f’https://{OSS_BUCKET_NAME}.{OSS_ENDPOINT}/{oss_key}’

print(f’上传成功: {oss_url}’)

logging.info(f’上传成功: {oss_url}’)

如果DELETE_LOCAL_FILE_AFTER_UPLOAD:

os.remove(本地路径)

除了oss2.exceptions.OssError 为e:

print(f’上传失败: {e}’)

logging.error(f’上传失败: {e}’)

重试-=1

如果重试==0:

logging.error(f’已达到最大重试次数,放弃上传{local_path}’)

否则:

logging.info(f’重试上传{local_path},剩余次数: {retries}’)

等待asyncio.sleep(RETRY_DELAY)

异步def download_remove_upload_task(url, 会话, 信号量):

第一部分,内容=等待extract_content_from_url(url)

如果不满足:

local_filename=等待download_from_ku(url, 会话, 信号量, SUCCESS_FILE, FAILURE_FILE)

如果本地文件名:

等待remove_files_from_zip(local_filename, ‘*.html’, ‘*.txt’, ‘*.url’)

第一部分,内容=等待extract_content_from_url(url)

oss_key=(first_part.lstrip(‘/’) + ‘/’ + os.path.dirname(内容)+ ‘/’ + os.path.basename(local_filename)

等待upload_to_oss(local_filename, oss_key)

除了异常e:

print(f’处理{url} 时出现错误: {e}’)

logging.error(f’处理{url} 时出现错误: {e}’)

异步def main():

使用open(file_list, ‘r’) 作为file:

信号量=asyncio.Semaphore(MAX_CONCURRENT_DOWNLOADS)

连接器=aiohttp.TCPConnector(限制=MAX_CONCURRENT_DOWNLOADS)

与aiohttp.ClientSession(connector=connector) 异步作为session:

#创建电报TG下载任务列表

任务=[

download_remove_upload_task(url.strip().strip(”’), 会话, 信号量)

对于文件中的url

]

#同时执行下载和删除上传任务

等待asyncio.gather(*tasks)

如果__name__==’__main__’:

asyncio.get_event_loop().run_until_complete(main())

pip3安装依赖aiohttp asyncio oss2子进程

pip3 安装aiohttp

pip3安装异步

pip3安装oss2

pip3 安装子进程

python3脚本内容简单备注

正确的下载地址是去掉第四个/前面的。使用split 剪切路径的第一部分和第二部分并将其保存到变量中。

异步def extract_content_from_url(url):

#urldo 切割

parts=url.split(‘/’, 4)

first_part=’/’ + ‘/’.join(parts[1:4])

内容=部分[4]

返回first_part,内容

Python3 异步并发下载删除zip包中指定文件然后上传压缩包到阿里云oss - 第1张 文件列表Python3 异步并发下载删除zip包中指定文件然后上传压缩包到阿里云oss - 第2张下载日志构建下载删除上传任务

Python3 异步并发下载删除zip包中指定文件然后上传压缩包到阿里云oss - 第3张 最后在main函数中使用asyncio.gather(*tasks)启动任务,扔到后台运行一整夜。有些大文件下载超时,如果失败,再次运行就结束了。

python官方中文文档