Source code for mlapi.images
import io
import os
import uuid
from mimetypes import guess_extension, guess_type
import json
import logging
import base64
from werkzeug.datastructures import FileStorage
from mlapi.shellColors import ShellColors
[docs]class ImageStorage(object):
'''Contains methods for on-server images processing.
Attributes
----------
_storage_path: str
Path to a folder where the iages will be saved.
_uuidgen: uuid.uuid4, any function returning str
Function returning unique strings. \`uuid.uuid4\` by default
fopen: io.open, function opening files
Function that opens files. \`io.open\` by default
Methods
-------
save
'''
_CHUNK_SIZE_BYTES = 4096
DBGT = ShellColors.YELLOW + "[ImageStorage] " + ShellColors.ENDC
# Note the use of dependency injection for standard library
# methods. We'll use these later to avoid monkey-patching.
def __init__(self, storage_path, uuidgen=uuid.uuid4, fopen=io.open):
self._storage_path = storage_path
self._uuidgen = uuidgen
self._fopen = fopen
[docs] def save(self, image_stream, image_content_type):
'''Saves given image stream. Sets the extension basing on Content-Type.
Parameters
----------
image_stream: str
Stream of bytes, Base64 coded image, FileStorage
image_content_type: str
Type of image sent e.g "image/jpeg", "application/json" etc.
'''
if image_stream:
logging.info(self.DBGT + "Loading {} type image".format(image_content_type))
try:
ext = guess_extension(image_content_type)
except:
logging.error("There was a problem while guessing image's extension.")
if image_content_type == "application/json" or image_content_type == "application/x-www-form-urlencoded":
base64_split = image_stream['image'].split(',')
logging.debug("Got header: {}".format(base64_split[0]))
# Splitting for getting string in "image/jpeg" format.
ext = guess_extension(base64_split[0].split(';')[0].split(":")[1])
if ext==".jpe":
ext=".jpg"
name = '{uuid}{ext}'.format(uuid=self._uuidgen(), ext=ext)
image_path = os.path.join(self._storage_path, name)
logging.info(self.DBGT + "Image's extension: {}".format(ext))
with self._fopen(image_path, 'wb') as image_file:
if image_content_type == "application/json" or image_content_type == "application/x-www-form-urlencoded":
try:
### Decoding part without the "Data:image/jpeg;base64," header in the beginning.
b = base64.b64decode(base64_split[1])
# logging.debug(self.DBGT + "Decoded image: {}".format(b))
except:
logging.error(self.DBGT + "Error while reading Base64 image from client.")
image_file.write(b)
else:
if type(image_stream) == FileStorage:
image_stream = image_stream.stream
while True:
chunk = image_stream.read(self._CHUNK_SIZE_BYTES)
if not chunk:
break
image_file.write(chunk)
logging.debug("Saved file to {}".format(image_path))
return image_path
else:
logging.error("The posted file is empty")