using System;
using System.IO;
using System.Net.Http;
using System.Threading.Tasks;
namespace HttpDownloader
{
class Program
{
static async Task Main(string[] args)
{
string url = "https://example.com/file.zip";
string savePath = @"C:\Downloads\downloaded_file.zip";
await DownloadFileAsync(url, savePath);
}
/// <summary>
/// 异步下载文件(先获取HEAD信息)
/// </summary>
/// <param name="url">下载链接</param>
/// <param name="savePath">保存路径</param>
public static async Task DownloadFileAsync(string url, string savePath)
{
try
{
using (HttpClient client = new HttpClient())
{
// 设置超时时间
client.Timeout = TimeSpan.FromMinutes(10);
// 添加请求头(可选)
client.DefaultRequestHeaders.Add("User-Agent",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36");
Console.WriteLine("开始下载文件...");
Console.WriteLine($"URL: {url}");
Console.WriteLine($"保存路径: {savePath}");
// 先发送HEAD请求获取文件信息
Console.WriteLine("正在获取文件信息...");
HttpResponseMessage headResponse = await client.SendAsync(
new HttpRequestMessage(HttpMethod.Head, url));
headResponse.EnsureSuccessStatusCode();
// 获取文件大小
long? contentLength = headResponse.Content.Headers.ContentLength;
if (contentLength.HasValue)
{
Console.WriteLine($"文件大小: {contentLength.Value / 1024 / 1024:F2} MB");
}
else
{
Console.WriteLine("无法获取文件大小");
}
// 获取文件类型
string contentType = headResponse.Content.Headers.ContentType?.MediaType ?? "未知";
Console.WriteLine($"文件类型: {contentType}");
// 确保目录存在
string directory = Path.GetDirectoryName(savePath);
if (!Directory.Exists(directory))
{
Directory.CreateDirectory(directory);
}
// 现在发送GET请求下载文件(使用流式下载)
Console.WriteLine("开始下载文件内容...");
using (HttpResponseMessage response = await client.GetAsync(url, HttpCompletionOption.ResponseHeadersRead))
{
response.EnsureSuccessStatusCode();
using (Stream contentStream = await response.Content.ReadAsStreamAsync())
using (FileStream fileStream = new FileStream(savePath, FileMode.Create, FileAccess.Write))
{
await contentStream.CopyToAsync(fileStream);
}
}
Console.WriteLine("文件下载完成!");
}
}
catch (HttpRequestException ex)
{
Console.WriteLine($"HTTP请求错误: {ex.Message}");
}
catch (TaskCanceledException ex)
{
Console.WriteLine($"请求超时: {ex.Message}");
}
catch (IOException ex)
{
Console.WriteLine($"文件操作错误: {ex.Message}");
}
catch (Exception ex)
{
Console.WriteLine($"发生未知错误: {ex.Message}");
}
}
/// <summary>
/// 带进度显示的下载方法
/// </summary>
/// <param name="url">下载链接</param>
/// <param name="savePath">保存路径</param>
public static async Task DownloadFileWithProgressAsync(string url, string savePath)
{
try
{
using (HttpClient client = new HttpClient())
{
client.Timeout = TimeSpan.FromMinutes(10);
client.DefaultRequestHeaders.Add("User-Agent",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36");
Console.WriteLine("开始下载文件...");
using (HttpResponseMessage response = await client.GetAsync(url, HttpCompletionOption.ResponseHeadersRead))
{
response.EnsureSuccessStatusCode();
long? contentLength = response.Content.Headers.ContentLength;
using (Stream contentStream = await response.Content.ReadAsStreamAsync())
using (FileStream fileStream = new FileStream(savePath, FileMode.Create, FileAccess.Write))
{
byte[] buffer = new byte[8192];
long totalRead = 0;
int bytesRead;
while ((bytesRead = await contentStream.ReadAsync(buffer, 0, buffer.Length)) > 0)
{
await fileStream.WriteAsync(buffer, 0, bytesRead);
totalRead += bytesRead;
if (contentLength.HasValue)
{
double percentage = (double)totalRead / contentLength.Value * 100;
Console.Write($"\r下载进度: {percentage:F2}% ({totalRead}/{contentLength.Value} bytes)");
}
}
}
}
Console.WriteLine("\n文件下载完成!");
}
}
catch (Exception ex)
{
Console.WriteLine($"\n下载失败: {ex.Message}");
}
}
}
}
import requests
import os
from urllib.parse import urlparse
from pathlib import Path
import time
def download_file(url, save_path=None, chunk_size=8192, timeout=600):
"""
下载文件
Args:
url (str): 下载链接
save_path (str): 保存路径,如果为None则自动生成
chunk_size (int): 数据块大小
timeout (int): 超时时间(秒)
Returns:
bool: 下载是否成功
"""
try:
# 如果没有指定保存路径,则从URL中提取文件名
if save_path is None:
parsed_url = urlparse(url)
filename = os.path.basename(parsed_url.path) or "downloaded_file"
save_path = os.path.join("downloads", filename)
# 确保目录存在
os.makedirs(os.path.dirname(save_path), exist_ok=True)
# 设置请求头
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
print(f"开始下载文件...")
print(f"URL: {url}")
print(f"保存路径: {save_path}")
# 发送HEAD请求获取文件信息
head_response = requests.head(url, headers=headers, timeout=30)
content_length = head_response.headers.get('content-length')
if content_length:
file_size = int(content_length)
print(f"文件大小: {file_size / 1024 / 1024:.2f} MB")
else:
file_size = None
print("无法获取文件大小")
# 下载文件
response = requests.get(url, headers=headers, stream=True, timeout=timeout)
response.raise_for_status()
downloaded = 0
start_time = time.time()
with open(save_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=chunk_size):
if chunk:
f.write(chunk)
downloaded += len(chunk)
# 显示进度
if file_size:
progress = (downloaded / file_size) * 100
elapsed_time = time.time() - start_time
if elapsed_time > 0:
speed = downloaded / elapsed_time / 1024 # KB/s
print(f"\r下载进度: {progress:.2f}% ({downloaded}/{file_size} bytes) 速度: {speed:.2f} KB/s", end='')
print(f"\n文件下载完成!")
print(f"保存位置: {os.path.abspath(save_path)}")
return True
except requests.exceptions.RequestException as e:
print(f"\n请求错误: {e}")
return False
except IOError as e:
print(f"\n文件操作错误: {e}")
return False
except Exception as e:
print(f"\n发生未知错误: {e}")
return False
def download_file_with_resume(url, save_path, chunk_size=8192, timeout=600):
"""
支持断点续传的下载函数
Args:
url (str): 下载链接
save_path (str): 保存路径
chunk_size (int): 数据块大小
timeout (int): 超时时间(秒)
Returns:
bool: 下载是否成功
"""
try:
# 确保目录存在
os.makedirs(os.path.dirname(save_path), exist_ok=True)
# 检查是否已经存在部分文件
first_byte = 0
if os.path.exists(save_path):
first_byte = os.path.getsize(save_path)
print(f"发现已存在文件,从第 {first_byte} 字节开始续传")
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Range': f'bytes={first_byte}-'
}
print(f"开始下载文件...")
print(f"URL: {url}")
print(f"保存路径: {save_path}")
response = requests.get(url, headers=headers, stream=True, timeout=timeout)
# 检查服务器是否支持范围请求
if response.status_code == 206: # Partial Content
print("服务器支持断点续传")
mode = 'ab' # 追加模式
elif response.status_code == 200: # OK
print("服务器不支持断点续传,重新下载")
first_byte = 0
mode = 'wb' # 写模式
else:
response.raise_for_status()
# 获取文件总大小
content_range = response.headers.get('content-range')
if content_range:
file_size = int(content_range.split('/')[-1])
else:
content_length = response.headers.get('content-length')
file_size = int(content_length) + first_byte if content_length else None
if file_size:
print(f"文件总大小: {file_size / 1024 / 1024:.2f} MB")
downloaded = first_byte
start_time = time.time()
with open(save_path, mode) as f:
for chunk in response.iter_content(chunk_size=chunk_size):
if chunk:
f.write(chunk)
downloaded += len(chunk)
# 显示进度
if file_size:
progress = (downloaded / file_size) * 100
elapsed_time = time.time() - start_time
if elapsed_time > 0:
speed = (downloaded - first_byte) / elapsed_time / 1024 # KB/s
print(f"\r下载进度: {progress:.2f}% ({downloaded}/{file_size} bytes) 速度: {speed:.2f} KB/s", end='')
print(f"\n文件下载完成!")
return True
except Exception as e:
print(f"\n下载失败: {e}")
return False
def main():
"""主函数示例"""
# 基本下载示例
url = "https://example.com/file.zip"
save_path = "downloads/example_file.zip"
# 方式1: 基本下载
success = download_file(url, save_path)
if success:
print("下载成功!")
else:
print("下载失败!")
# 方式2: 支持断点续传的下载
# success = download_file_with_resume(url, save_path)
if __name__ == "__main__":
main()