1234567891011121314151617181920212223242526272829303132333435363738394041424344 |
- import oss2, logging
- import datetime
- from config import access_id as ACCESS_KEY_ID, access_key as ACCESS_KEY_SECRET, OSS_ENDPOINT, OSS_BUCKET
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Authentication object built from the credentials imported from `config`.
auth = oss2.Auth(ACCESS_KEY_ID, ACCESS_KEY_SECRET)
# Fill in the bucket name here. CNAME refers to binding a custom domain to
# the storage bucket; is_cname=False is passed for this endpoint.
bucket = oss2.Bucket(auth, OSS_ENDPOINT, OSS_BUCKET, is_cname=False)
def upload_bytes_to_oss(bytes, oss_file_name):
    """Upload an in-memory payload to OSS under the given object key.

    The object is stored with a ``Content-Disposition: attachment`` header
    whose filename is the part of ``oss_file_name`` after the last ``/``
    (the whole key when it contains no ``/``).

    Args:
        bytes: Payload passed to ``bucket.put_object`` as the object body.
            The name shadows the builtin; it is kept because it is part of
            the function's signature.
        oss_file_name: Destination object key in the bucket.
    """
    # Everything after the final "/" is the name offered to the downloader.
    base_name = oss_file_name.rsplit("/", 1)[-1]
    upload_headers = {
        'Content-Disposition': 'attachment;filename="%s"' % base_name,
    }
    bucket.put_object(oss_file_name, bytes, headers=upload_headers)
def upload_to_oss(local_file_path, oss_file_name):
    """Upload a local file to OSS under the given object key.

    The file is opened in binary mode and the open file object is passed to
    ``bucket.put_object``. The object is stored with a
    ``Content-Disposition: attachment`` header whose filename is the part of
    ``oss_file_name`` after the last ``/`` (the whole key when it contains
    no ``/``).

    This function does not return a URL (it returns ``None``); call
    ``sign_url`` with the same key to obtain a time-limited download link.

    Args:
        local_file_path: Path of the local file to read.
        oss_file_name: Destination object key in the bucket.
    """
    # rpartition yields the full key as the tail when there is no "/".
    _, _, base_name = oss_file_name.rpartition("/")
    disposition_header = {
        'Content-Disposition': 'attachment;filename="%s"' % base_name,
    }

    with open(local_file_path, 'rb') as source:
        bucket.put_object(oss_file_name, source, headers=disposition_header)
def sign_url(oss_file_name, expiration_seconds=60 * 60):
    """Return a time-limited HTTPS download URL for an OSS object.

    Args:
        oss_file_name: Object key in the bucket to sign a GET request for.
        expiration_seconds: Validity period, in seconds, passed to
            ``bucket.sign_url``. Defaults to 60 minutes, the value that was
            previously hard-coded.

    Returns:
        The signed URL, with a leading ``http:`` scheme rewritten to
        ``https:``.
    """
    url = bucket.sign_url('GET', oss_file_name, expiration_seconds)
    # Upgrade only the scheme prefix; a blanket str.replace would also
    # rewrite any later "http:" occurrence inside the URL.
    if url.startswith("http:"):
        url = "https:" + url[len("http:"):]
    logger.info(f'Getting url for {oss_file_name}')
    return url
# Script entry point: upload one sample file when the module is run directly.
if __name__ == '__main__':
    local_file_path = "/data/Downloads/pt-freqDomain.ppt"
    oss_file_name = "Downloads/pt-freqDomain.ppt"
    upload_to_oss(local_file_path, oss_file_name)
|