"""Helpers for uploading objects to an Aliyun OSS bucket and signing URLs."""

import datetime
import logging

import oss2

from config import access_id as ACCESS_KEY_ID, access_key as ACCESS_KEY_SECRET, OSS_ENDPOINT, OSS_BUCKET

logger = logging.getLogger(__name__)

auth = oss2.Auth(ACCESS_KEY_ID, ACCESS_KEY_SECRET)
# Pass the bucket name here. CNAME means a custom domain name bound to the
# bucket; is_cname=False tells oss2 that OSS_ENDPOINT is not such a domain.
bucket = oss2.Bucket(auth, OSS_ENDPOINT, OSS_BUCKET, is_cname=False)

# Default lifetime of a signed download link: 60 minutes.
DEFAULT_URL_EXPIRATION_SECONDS = 60 * 60


def _attachment_headers(oss_file_name):
    """Build the Content-Disposition header dict for an OSS object key.

    The file name offered to the downloader is the part of the key after the
    last "/" (the whole key when it contains no "/").
    """
    file_name = oss_file_name.rpartition("/")[2]
    return {'Content-Disposition': f'attachment;filename="{file_name}"'}


def upload_bytes_to_oss(bytes, oss_file_name):
    """Upload in-memory content to OSS under the key ``oss_file_name``.

    Args:
        bytes: The content handed to ``bucket.put_object`` as the object body.
            (The name shadows the builtin; it is kept so existing keyword
            callers continue to work.)
        oss_file_name: Destination object key; its last path component is
            used as the attachment file name in Content-Disposition.
    """
    bucket.put_object(oss_file_name, bytes, headers=_attachment_headers(oss_file_name))


def upload_to_oss(local_file_path, oss_file_name):
    """Upload a local file to OSS under the key ``oss_file_name``.

    This function only uploads and returns nothing; call ``sign_url`` to
    obtain a time-limited download URL for the uploaded object.

    Args:
        local_file_path: Path of the local file, opened in binary mode.
        oss_file_name: Destination object key; its last path component is
            used as the attachment file name in Content-Disposition.
    """
    headers = _attachment_headers(oss_file_name)
    with open(local_file_path, 'rb') as f:
        bucket.put_object(oss_file_name, f, headers=headers)


def sign_url(oss_file_name, expiration_seconds=DEFAULT_URL_EXPIRATION_SECONDS):
    """Return a signed GET URL for ``oss_file_name`` that expires.

    Args:
        oss_file_name: Object key to sign.
        expiration_seconds: Lifetime of the link in seconds; defaults to
            60 minutes.

    Returns:
        The signed URL, with a leading ``http://`` scheme upgraded to
        ``https://``.
    """
    url = bucket.sign_url('GET', oss_file_name, expiration_seconds)
    # Upgrade only the scheme; a blanket str.replace would also rewrite any
    # "http:" that happened to appear later in the URL.
    insecure_scheme = "http://"
    if url.startswith(insecure_scheme):
        url = "https://" + url[len(insecure_scheme):]
    logger.info('Getting url for %s', oss_file_name)
    return url


# entry point
if __name__ == '__main__':
    local_file_path, oss_file_name = "/data/Downloads/pt-freqDomain.ppt", "Downloads/pt-freqDomain.ppt"
    upload_to_oss(local_file_path, oss_file_name)