# Source code for storj

# -*- coding: utf-8 -*-
"""Storj package."""

import io


from abc import ABCMeta
from hashlib import sha256

from ecdsa import SigningKey, SECP256k1

from .api import ecdsa_to_hex
from .configuration import read_config
from .http import Client
from .metadata import __version__
from .model import Bucket, File, Token


def get_client():
    """Build a Storj HTTP client from the stored configuration.

    The values returned by ``read_config()`` are unpacked positionally
    into the ``Client`` constructor.

    Returns:
        (:py:class:`storj.http.Client`): Storj HTTP client.
    """
    settings = read_config()
    return Client(*settings)
def generate_new_key_pair():
    """Create a fresh ECDSA key pair on the SECP256k1 curve.

    The signing key is generated with SHA-256 as its hash function and
    the verifying key is derived from it.

    Returns:
        tuple(:py:class:`ecdsa.keys.SigningKey`,
        :py:class:`ecdsa.keys.VerifyingKey`): key pair (private, public).
    """
    signing_key = SigningKey.generate(curve=SECP256k1, hashfunc=sha256)
    verifying_key = signing_key.get_verifying_key()
    return signing_key, verifying_key
class BucketManager(ABCMeta):
    """Static helpers to list, fetch, create and delete buckets.

    Every operation obtains its own client through ``get_client()``.
    """

    @staticmethod
    def all():
        """List buckets.

        Returns:
            list(:py:class:`storj.model.Bucket`): one bucket per entry
            returned by the client's ``bucket_list()``.
        """
        client = get_client()
        return [Bucket(entry) for entry in client.bucket_list()]

    @staticmethod
    def get(bucket_id):
        """Fetch a single bucket.

        Args:
            bucket_id (int): bucket unique identifier.

        Returns:
            (:py:class:`storj.model.Bucket`): bucket built from the
            client's ``bucket_get()`` response.
        """
        client = get_client()
        return Bucket(client.bucket_get(bucket_id=bucket_id))

    @staticmethod
    def create(name, storage_limit=None, transfer_limit=None):
        """Create bucket.

        Args:
            name (str): forwarded to the client as ``name``.
            storage_limit (): forwarded to the client as ``storage``.
            transfer_limit (): forwarded to the client as ``transfer``.

        Returns:
            (:py:class:`storj.model.Bucket`): bucket built from the
            client's ``bucket_create()`` response.
        """
        client = get_client()
        created = client.bucket_create(
            name=name,
            storage=storage_limit,
            transfer=transfer_limit,
        )
        return Bucket(created)

    @staticmethod
    def delete(bucket_id):
        """Remove bucket.

        Args:
            bucket_id (int): bucket unique identifier.
        """
        client = get_client()
        client.bucket_delete(bucket_id=bucket_id)
class BucketKeyManager:
    """Manage the public keys authorized on one bucket.

    Each mutating method updates the locally held key collection and then
    sends the complete collection through the client's
    ``bucket_set_keys()``.

    Attributes:
        bucket (): bucket whose ``id`` is sent with every key update.
    """

    def __init__(self, bucket, authorized_public_keys):
        self._authorized_public_keys = authorized_public_keys
        self.bucket = bucket

    def all(self):
        """Return the locally held collection of authorized public keys.

        The internal object itself is returned, not a copy.
        """
        return self._authorized_public_keys

    def add(self, key):
        """Authorize a key and send the updated collection to the client.

        Args:
            key: a ``str`` is used as is; anything else is converted
                with ``ecdsa_to_hex`` first.
        """
        hex_key = key if isinstance(key, str) else ecdsa_to_hex(key)
        self._authorized_public_keys.append(hex_key)

        client = get_client()
        client.bucket_set_keys(
            bucket_id=self.bucket.id,
            keys=self._authorized_public_keys,
        )

    def clear(self):
        """Drop every authorized key, locally and through the client."""
        # Rebind instead of emptying in place, so the collection handed
        # to __init__ is left untouched.
        self._authorized_public_keys = []

        client = get_client()
        client.bucket_set_keys(bucket_id=self.bucket.id, keys=[])

    def remove(self, key):
        """Revoke a key and send the updated collection to the client.

        Args:
            key: a ``str`` is used as is; anything else is converted
                with ``ecdsa_to_hex`` first.

        Raises:
            ValueError: from ``list.remove`` when the key is not
                currently held (assuming the collection is a list).
        """
        hex_key = key if isinstance(key, str) else ecdsa_to_hex(key)
        self._authorized_public_keys.remove(hex_key)

        client = get_client()
        client.bucket_set_keys(
            bucket_id=self.bucket.id,
            keys=self._authorized_public_keys,
        )
class FileManager:
    """Manage the files stored in one bucket.

    Attributes:
        bucket_id (): identifier of the bucket used by ``all``,
            ``download`` and ``upload``.
    """

    def __init__(self, bucket_id):
        self.bucket_id = bucket_id

    def _upload(self, file, frame):
        """Hand an open file object to the client's ``file_upload()``.

        Args:
            file: file-like object to upload.
            frame: passed through unchanged to the client.
        """
        get_client().file_upload(
            bucket_id=self.bucket_id, file=file, frame=frame)

    def all(self):
        """List the files of this bucket.

        Returns:
            list(:py:class:`storj.model.File`): one file per entry
            returned by the client's ``file_list()``.
        """
        files_json = get_client().file_list(bucket_id=self.bucket_id)
        return [File(payload) for payload in files_json]

    def delete(self, bucket_id, file_id):
        """Remove a file from a bucket.

        Args:
            bucket_id (): identifier of the bucket holding the file. The
                explicit argument is used, not ``self.bucket_id``.
            file_id (): identifier of the file to remove.
        """
        # The client method is bound already; passing ``self`` as well
        # (as the previous code did) shifted every argument by one.
        get_client().file_remove(bucket_id, file_id)

    def download(self, file_id):
        """Download a file from this bucket.

        Args:
            file_id (): identifier of the file to download.

        Returns:
            Whatever the client's ``file_download()`` returns; it is not
            inspected here.
        """
        # Previously referenced the undefined names ``bucket_id`` and
        # ``file_hash`` and therefore always raised NameError.
        return get_client().file_download(self.bucket_id, file_id)

    def upload(self, file, frame):
        """Upload a file to this bucket.

        Args:
            file (str or file-like): a ``str`` is treated as a path and
                opened in binary mode for the duration of the upload; any
                other object is uploaded as is and left open.
            frame: passed through unchanged to the client.
        """
        # Support path strings as well as file-like objects
        if isinstance(file, str):
            with io.open(file, mode='rb') as handle:
                self._upload(handle, frame)
        else:
            self._upload(file, frame)
class TokenManager:
    """Bucket token manager.

    Attributes:
        bucket_id (int): bucket unique identifier.
    """

    def __init__(self, bucket_id):
        self.bucket_id = bucket_id

    def create(self, operation):
        """Creates a token.

        Args:
            operation (str): operation (PUSH or PULL), case-insensitive.

        Returns:
            (:py:class:`storj.model.Token`): token built from the client's
            ``token_create()`` response.

        Raises:
            AssertionError: if the operation is neither PUSH nor PULL.
        """
        operation = operation.upper()

        # Raise explicitly instead of using an ``assert`` statement:
        # asserts are stripped under ``python -O``, which would let an
        # invalid operation reach the client. The exception type is kept
        # so existing callers see the same error.
        if operation not in ('PUSH', 'PULL'):
            raise AssertionError(
                "operation must be 'PUSH' or 'PULL', got %r" % (operation,))

        token_json = get_client().token_create(
            bucket_id=self.bucket_id, operation=operation)
        return Token(token_json)
class UserKeyManager(ABCMeta):
    """Static helpers to list, register and delete the user's public keys.

    Every operation obtains its own client through ``get_client()``.
    """

    @staticmethod
    def all():
        """List the registered keys.

        Returns:
            list: the ``'key'`` entry of every item returned by the
            client's ``key_get()``.
        """
        client = get_client()
        return [entry['key'] for entry in client.key_get()]

    @staticmethod
    def add(key):
        """Register a key.

        Args:
            key: a ``str`` is used as is; anything else is converted
                with ``ecdsa_to_hex`` first.
        """
        hex_key = key if isinstance(key, str) else ecdsa_to_hex(key)
        get_client().key_register(hex_key)

    @staticmethod
    def clear():
        """Delete every registered key, one ``remove`` call per key."""
        registered = UserKeyManager.all()
        for registered_key in registered:
            UserKeyManager.remove(registered_key)

    @staticmethod
    def remove(key):
        """Delete a key.

        Args:
            key: a ``str`` is used as is; anything else is converted
                with ``ecdsa_to_hex`` first.
        """
        hex_key = key if isinstance(key, str) else ecdsa_to_hex(key)
        get_client().key_delete(hex_key)