192 lines
4.4 KiB
Python
192 lines
4.4 KiB
Python
|
|
|
|
import exif
|
|
from exif import Image as ExifImage
|
|
|
|
|
|
from PIL import Image as PilImage
|
|
|
|
|
|
import os
|
|
import multiprocessing
|
|
import threading
|
|
|
|
|
|
class ApplyImageOrientation:
|
|
|
|
def __init__(self, input_folder, output_folder):
|
|
|
|
self.ensure_folder(output_folder)
|
|
|
|
assert os.path.isdir(input_folder), "Invalid input folder: %s" % input_folder
|
|
assert os.path.isdir(output_folder), "Invalid output folder: %s" % output_folder
|
|
|
|
self.__input_folder = os.path.abspath(input_folder)
|
|
self.__output_folder = os.path.abspath(output_folder)
|
|
|
|
self.__valid_image_extensions = [
|
|
"jpeg", "jpg",
|
|
"png",
|
|
"tiff"
|
|
]
|
|
|
|
def run(self, threads_count=None, jpeg=False):
|
|
|
|
images_paths = self.get_input_images()
|
|
|
|
lock = threading.RLock()
|
|
threads = []
|
|
if threads_count is None:
|
|
threads_count = multiprocessing.cpu_count()
|
|
|
|
print("Applying orientation using %s threads" % (threads_count,))
|
|
for i in range(threads_count):
|
|
t = threading.Thread(
|
|
target=self._apply_orientation_thread,
|
|
kwargs={
|
|
"lock": lock,
|
|
"images_paths": images_paths,
|
|
"jpeg": jpeg
|
|
}
|
|
)
|
|
threads.append(t)
|
|
t.start()
|
|
|
|
try:
|
|
for t in threads:
|
|
t.join()
|
|
except KeyboardInterrupt:
|
|
with RAIILock(lock):
|
|
print("Detected keyboard interrupt; Aborting!")
|
|
images_paths.clear()
|
|
|
|
def _apply_orientation_thread(self, lock: threading.RLock, images_paths: list, jpeg: bool):
|
|
|
|
while True:
|
|
|
|
with RAIILock(lock) as raii_lock:
|
|
|
|
if len(images_paths) == 0:
|
|
break
|
|
|
|
image_path = images_paths.pop()
|
|
common_path = os.path.commonpath([self.__input_folder, image_path])
|
|
print("Rotating: %s (%s remaining)" % (image_path[len(common_path) + 1:], len(images_paths)))
|
|
|
|
raii_lock.release()
|
|
|
|
self.apply_orientation(
|
|
image_path=image_path,
|
|
jpeg=jpeg
|
|
)
|
|
|
|
@staticmethod
|
|
def ensure_folder(folder_path):
|
|
|
|
os.makedirs(folder_path, exist_ok=True)
|
|
|
|
def get_input_images(self):
|
|
|
|
return self.get_directory_images(self.__input_folder)
|
|
|
|
def get_directory_images(self, path):
|
|
|
|
images_files_paths = []
|
|
|
|
for current_dir, subdirs, files in os.walk(path):
|
|
|
|
for file_name in files:
|
|
|
|
file_path = os.path.join(current_dir, file_name)
|
|
file_basename, file_extension = os.path.splitext(file_path)
|
|
if file_extension:
|
|
file_extension = file_extension[1:]
|
|
file_extension = file_extension.lower()
|
|
if file_extension in self.__valid_image_extensions:
|
|
images_files_paths.append(file_path)
|
|
|
|
return images_files_paths
|
|
|
|
def apply_orientation(self, image_path, jpeg=False):
|
|
|
|
image_basename = os.path.basename(image_path)
|
|
|
|
with open(image_path, "rb") as f:
|
|
|
|
exif_image = ExifImage(f)
|
|
orientation = exif_image.orientation
|
|
|
|
pil_image = PilImage.open(image_path)
|
|
|
|
rotation_degrees = 0
|
|
if orientation == exif.Orientation.LEFT_BOTTOM:
|
|
rotation_degrees = 90
|
|
elif orientation == exif.Orientation.BOTTOM_RIGHT:
|
|
rotation_degrees = 180
|
|
elif orientation == exif.Orientation.RIGHT_TOP:
|
|
rotation_degrees = 270
|
|
|
|
if rotation_degrees != 0:
|
|
pil_image_rotated = pil_image.rotate(rotation_degrees, expand=True)
|
|
else:
|
|
pil_image_rotated = pil_image
|
|
|
|
# print(image_basename, pil_image.size, "==>", orientation, "==>", rotation_degrees)
|
|
|
|
image_rotated_file_path = self.make_image_output_path(image_path=image_path, jpeg=jpeg)
|
|
self.ensure_folder(os.path.dirname(image_rotated_file_path))
|
|
pil_image_rotated.save(image_rotated_file_path, None if jpeg else "png")
|
|
|
|
def get_image_relative_path(self, image_path):
|
|
|
|
image_path_relative = image_path[len(self.__input_folder):]
|
|
|
|
return image_path_relative
|
|
|
|
def make_image_output_path(self, image_path, jpeg=False):
|
|
|
|
image_path_relative = self.get_image_relative_path(image_path=image_path)
|
|
|
|
image_output_path = self.__output_folder + image_path_relative
|
|
|
|
if not jpeg:
|
|
path_name, extension = os.path.splitext(image_output_path)
|
|
image_output_path = path_name + ".png"
|
|
|
|
return image_output_path
|
|
|
|
|
|
class RAIILock:
|
|
|
|
def __init__(self, lock: threading.RLock):
|
|
|
|
self.__lock = lock
|
|
|
|
def __enter__(self):
|
|
|
|
self.acquire()
|
|
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
|
|
if self.__lock:
|
|
|
|
self.release()
|
|
self.__lock = None
|
|
|
|
def acquire(self):
|
|
|
|
if self.__lock:
|
|
|
|
self.__lock.acquire()
|
|
|
|
def release(self):
|
|
|
|
if self.__lock:
|
|
|
|
try:
|
|
self.__lock.release()
|
|
except RuntimeError:
|
|
pass
|