python操作minio

前言

python版本:3.7

minio版本:5.0.5

1
2
3
4
5
6
7
# 使用pip安装
pip install minio

# 使用源码安装
git clone https://github.com/minio/minio-py
cd minio-py
python setup.py install

参考文档:

快速入门:https://docs.min.io/cn/python-client-quickstart-guide.html

完整文档:https://docs.min.io/cn/python-client-api-reference.html

API示例:https://github.com/minio/minio-py/tree/master/examples

初始化MinIO Client

1
2
3
4
5
6
7
from minio import Minio
from minio.error import ResponseError

minioClient = Minio(endpoint='play.min.io',
access_key='Q3AM3UQ867SPQQA43P2F',
secret_key='zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG',
secure=True)

操作存储桶

创建一个存储桶

make_bucket(bucket_name, location='us-east-1') -> None

1
2
3
4
5
6
7
8
9
10
11
12
13
14
try:
minioClient.make_bucket(bucket_name="mybucket", location="us-east-1")
except MaxRetryError as err:
print("connect minio server fail, %s" % err)
raise
except ResponseError as err:
print("minio server response fail, %s" % err)
raise
except (BucketAlreadyOwnedByYou, BucketAlreadyExists) as err:
pass
except InvalidBucketError as err:
print(err)
except Exception as err:
print(err)

列出所有的存储桶

list_buckets() -> list

1
2
3
4
5
6
7
8
9
10
11
12
try:
buckets = minioClient.list_buckets()
for bucket in buckets:
print(bucket.name, bucket.creation_date)
except MaxRetryError as err:
print("connect minio server fail, %s" % err)
raise
except ResponseError as err:
print("minio server response fail, %s" % err)
raise
except Exception as err:
print(err)

检查存储桶是否存在

bucket_exists(bucket_name) -> bool

1
2
3
4
5
6
7
8
9
10
11
12
13
try:
result = minioClient.bucket_exists(bucket_name="mybucket")
print(result)
except MaxRetryError as err:
print("connect minio server fail, %s" % err)
raise
except ResponseError as err:
print("minio server response fail, %s" % err)
raise
except InvalidBucketError as err:
print(err)
except Exception as err:
print(err)

删除存储桶

remove_bucket(bucket_name) -> None

1
2
3
4
5
6
7
8
9
10
11
12
13
14
try:
minioClient.remove_bucket(bucket_name="mybucket")
except MaxRetryError as err:
print("connect minio server fail, %s" % err)
raise
except ResponseError as err:
print("minio server response fail, %s" % err)
raise
except InvalidBucketError as err:
print(err)
except NoSuchBucket as err:
print(err)
except Exception as err:
print(err)

列出存储桶中所有对象

list_objects(bucket_name, prefix=None, recursive=False) -> Iterator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
try:
objects = minioClient.list_objects("mybucket")
for obj in objects:
print(obj.bucket_name, obj.object_name.encode('utf-8'), obj.is_dir,
obj.size, obj.etag, obj.last_modified, obj.content_type,
obj.metadata)
except MaxRetryError as err:
print("connect minio server fail, %s" % err)
raise
except ResponseError as err:
print("minio server response fail, %s" % err)
raise
except InvalidBucketError as err:
print(err)
except Exception as err:
print(err)

列出存储桶中未完整上传的对象

list_incomplete_uploads(bucket_name, prefix, recursive=False) -> Iterator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
try:
# List all object paths in bucket that begin with my-prefixname.
uploads = minioClient.list_incomplete_uploads('mybucket',
prefix='my-prefixname',
recursive=True)
for obj in uploads:
print(obj.bucket_name, obj.object_name, obj.upload_id, obj.size)
except MaxRetryError as err:
print("connect minio server fail, %s" % err)
raise
except ResponseError as err:
print("minio server response fail, %s" % err)
raise
except InvalidBucketError as err:
print(err)
except Exception as err:
print(err)

获取存储桶的当前策略

get_bucket_policy(bucket_name, prefix) -> Policy枚举

1
2
3
4
# Get current policy of all object paths in bucket that begin with my-prefixname.
policy = minioClient.get_bucket_policy('mybucket',
'my-prefixname')
print(policy)

给指定的存储桶设置存储桶策略

set_bucket_policy(bucket_name, prefix, policy) -> Policy枚举

1
2
3
4
# Set policy Policy.READ_ONLY to all object paths in bucket that begin with my-prefixname.
minioClient.set_bucket_policy('mybucket',
'my-prefixname',
Policy.READ_ONLY)

获取存储桶上的通知配置

get_bucket_notification(bucket_name) -> dict

1
2
3
4
# Get the notifications configuration for a bucket.
notification = minioClient.get_bucket_notification('mybucket')
# If no notification is present on the bucket:
# notification == {}

给存储桶设置通知配置

set_bucket_notification(bucket_name, notification) -> None

删除存储桶上配置的所有通知

remove_all_bucket_notification(bucket_name)

监听存储桶上的通知,可以额外提供前缀、后缀和事件类型来进行过滤

listen_bucket_notification(bucket_name, prefix, suffix, events)

1
2
3
4
5
6
7
8
# Listen for object created/removed/accessed events on objects whose names start with 'my-prefix/' and end with '.my-suffix'.
events = minioClient.listen_bucket_notification('my-bucket', 'my-prefix/',
'.my-suffix',
['s3:ObjectCreated:*',
's3:ObjectRemoved:*',
's3:ObjectAccessed:*'])
for event in events:
print(event)

操作对象

下载一个对象

get_object(bucket_name, object_name, request_headers=None)

1
2
3
4
5
6
7
8
# Get a full object.
try:
data = minioClient.get_object('mybucket', 'myobject')
with open('my-testfile', 'wb') as file_data:
for d in data.stream(32*1024):
file_data.write(d)
except ResponseError as err:
print(err)

下载一个对象的指定区间的字节数组

get_partial_object(bucket_name, object_name, offset=0, length=0, request_headers=None)

1
2
3
4
5
6
7
8
# Offset the download by 2 bytes and retrieve a total of 4 bytes.
try:
data = minioClient.get_partial_object('mybucket', 'myobject', 2, 4)
with open('my-testfile', 'wb') as file_data:
for d in data:
file_data.write(d)
except ResponseError as err:
print(err)

下载并将文件保存到本地

fget_object(bucket_name, object_name, file_path, request_headers=None)

1
2
3
4
5
# Get a full object and prints the original object stat information.
try:
print(minioClient.fget_object('mybucket', 'myobject', '/tmp/myobject'))
except ResponseError as err:
print(err)

拷贝对象存储服务上的源对象到一个新对象

copy_object(bucket_name, object_name, object_source, copy_conditions=None, metadata=None)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import time
from datetime import datetime
from minio import CopyConditions

copy_conditions = CopyConditions()
# Set modified condition, copy object modified since 2014 April.
t = (2014, 4, 0, 0, 0, 0, 0, 0, 0)
mod_since = datetime.utcfromtimestamp(time.mktime(t))
copy_conditions.set_modified_since(mod_since)

# Set unmodified condition, copy object unmodified since 2014 April.
copy_conditions.set_unmodified_since(mod_since)

# Set matching ETag condition, copy object which matches the following ETag.
copy_conditions.set_match_etag("31624deb84149d2f8ef9c385918b653a")

# Set matching ETag except condition, copy object which does not match the following ETag.
copy_conditions.set_match_etag_except("31624deb84149d2f8ef9c385918b653a")

# Set metadata
metadata = {"test-key": "test-data"}

try:
copy_result = minioClient.copy_object("mybucket", "myobject",
"/my-sourcebucketname/my-sourceobjectname",
copy_conditions,metadata=metadata)
print(copy_result)
except ResponseError as err:
print(err)

添加一个新的对象到对象存储服务

put_object(bucket_name, object_name, data, length, content_type='application/octet-stream', metadata=None)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import os
# Put a file with default content-type, upon success prints the etag identifier computed by server.
try:
with open('my-testfile', 'rb') as file_data:
file_stat = os.stat('my-testfile')
print(minioClient.put_object('mybucket', 'myobject',
file_data, file_stat.st_size))
except ResponseError as err:
print(err)

# Put a file with 'application/csv'.
try:
with open('my-testfile.csv', 'rb') as file_data:
file_stat = os.stat('my-testfile.csv')
minioClient.put_object('mybucket', 'myobject.csv', file_data,
file_stat.st_size, content_type='application/csv')
except ResponseError as err:
print(err)

通过文件上传到对象中

fput_object(bucket_name, object_name, file_path, content_type='application/octet-stream', metadata=None)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Put an object 'myobject' with contents from '/tmp/otherobject', upon success prints the etag identifier computed by server.
try:
print(minioClient.fput_object('mybucket', 'myobject', '/tmp/otherobject'))
except ResponseError as err:
print(err)

# Put on object 'myobject.csv' with contents from
# '/tmp/otherobject.csv' as 'application/csv'.
try:
print(minioClient.fput_object('mybucket', 'myobject.csv',
'/tmp/otherobject.csv',
content_type='application/csv'))
except ResponseError as err:
print(err)

获取对象的元数据

stat_object(bucket_name, object_name)

1
2
3
4
5
# Fetch stats on your object.
try:
print(minioClient.stat_object('mybucket', 'myobject'))
except ResponseError as err:
print(err)

删除一个对象

remove_object(bucket_name, object_name)

1
2
3
4
5
# Remove an object.
try:
minioClient.remove_object('mybucket', 'myobject')
except ResponseError as err:
print(err)

删除存储桶中的多个对象

remove_objects(bucket_name, objects_iter)

1
2
3
4
5
6
7
8
9
# Remove multiple objects in a single library call.
try:
objects_to_delete = ['myobject-1', 'myobject-2', 'myobject-3']
# force evaluation of the remove_objects() call by iterating over
# the returned value.
for del_err in minioClient.remove_objects('mybucket', objects_to_delete):
print("Deletion Error: {}".format(del_err))
except ResponseError as err:
print(err)

删除一个未完整上传的对象

remove_incomplete_upload(bucket_name, object_name)

1
2
3
4
5
# Remove an partially uploaded object.
try:
minioClient.remove_incomplete_upload('mybucket', 'myobject')
except ResponseError as err:
print(err)

Presigned操作

生成一个用于HTTP GET操作的presigned URL

presigned_get_object(bucket_name, object_name, expires=timedelta(days=7))

1
2
3
4
5
6
7
8
from datetime import timedelta

# presigned get object URL for object name, expires in 2 days.
try:
print(minioClient.presigned_get_object('mybucket', 'myobject', expires=timedelta(days=2)))
# Response error is still possible since internally presigned does get bucket location.
except ResponseError as err:
print(err)

生成一个用于HTTP PUT操作的presigned URL

presigned_put_object(bucket_name, object_name, expires=timedelta(days=7))

1
2
3
4
5
6
7
8
9
10
11
from datetime import timedelta

# presigned Put object URL for an object name, expires in 3 days.
try:
print(minioClient.presigned_put_object('mybucket',
'myobject',
expires=timedelta(days=3)))
# Response error is still possible since internally presigned does get
# bucket location.
except ResponseError as err:
print(err)

允许给POST操作的presigned URL设置策略条件

presigned_post_policy(PostPolicy)

-------------本文结束感谢您的阅读-------------