From 8d026f6a9eba0bc3905b945543f0e88a19c5f5cc Mon Sep 17 00:00:00 2001 From: Leonard Kugis Date: Fri, 31 Mar 2023 02:50:27 +0200 Subject: Predictor: Moved out of main script --- predictor.py | 104 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 predictor.py (limited to 'predictor.py') diff --git a/predictor.py b/predictor.py new file mode 100644 index 0000000..8a886a7 --- /dev/null +++ b/predictor.py @@ -0,0 +1,104 @@ +import logging +import os +import cv2 +import numpy as np +from util import * + +class Predictor(object): + + class Backend(object): + + def __init__(self): + raise NotImplementedError() + + def predict(self, img, top=10): + raise NotImplementedError() + + class BackendTensorflow(Backend): + + MODEL_DIMENSIONS = 224 + + def __init__(self, top=10, detail=True, detail_factor=4): + logger = logging.getLogger(__name__) + logger.debug("Initializing Tensorflow/Keras backend ...") + from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input, decode_predictions + from tensorflow.keras.preprocessing import image + from tensorflow.keras.models import Model + self.__model = ResNet50(weights="imagenet") + self.__top = top + self.__detail = detail + self.__detail_factor = detail_factor + + def __predict(self, img): + logger = logging.getLogger(__name__) + logger.debug("Predicting image part ...") + from tensorflow.keras.applications.resnet50 import preprocess_input, decode_predictions + array = np.expand_dims(img, axis=0) + array = preprocess_input(array) + predictions = self.__model.predict(array) + classes = decode_predictions(predictions, top=self.__top) + logger.debug("Predicted raw image classes: {}".format(classes[0])) + return set([(name, prob) for _, name, prob in classes[0]]) + + def __predict_partial(self, tags, img, x, y, rot): + logger = logging.getLogger(__name__) + logger.debug("Predicting detail image at x={}, y={}, rot={}".format(x, y, rot)) + if rot is None: + tmp = img[x:(x+self.MODEL_DIMENSIONS), y:(y+self.MODEL_DIMENSIONS)] + else: + tmp = cv2.rotate(img[x:(x+self.MODEL_DIMENSIONS), y:(y+self.MODEL_DIMENSIONS)], rot) + tags.update(self.__predict(tmp)) + + def predict(self, img): + logger = logging.getLogger(__name__) + logger.debug("Predicting raw image ...") + ret = self.__predict(cv2.resize(img.copy(), dsize=(self.MODEL_DIMENSIONS, self.MODEL_DIMENSIONS), interpolation=cv2.INTER_AREA)) + + if self.__detail: + logger.debug("Predicting detail image ...") + tmp = set() + pool = ThreadPool(max(1, os.cpu_count() - 2), 10000) + if img.shape[0] > img.shape[1]: + detail = image_resize(img.copy(), height=(self.__detail_factor * self.MODEL_DIMENSIONS)) + else: + detail = image_resize(img.copy(), width=(self.__detail_factor * self.MODEL_DIMENSIONS)) + for x in range(0, detail.shape[0], int(self.MODEL_DIMENSIONS/2)): + for y in range(0, detail.shape[1], int(self.MODEL_DIMENSIONS/2)): + pool.add_task(self.__predict_partial, ret, detail, x, y, None) + pool.add_task(self.__predict_partial, ret, detail, x, y, cv2.ROTATE_90_CLOCKWISE) + pool.add_task(self.__predict_partial, ret, detail, x, y, cv2.ROTATE_180) + pool.add_task(self.__predict_partial, ret, detail, x, y, cv2.ROTATE_90_COUNTERCLOCKWISE) + pool.wait_completion() + + ret = [tag[0] for tag in sorted(ret, key=lambda tag: tag[1], reverse=True)] + ret = set(list(dict.fromkeys(ret))[0:self.__top]) + return ret + + class BackendTorch(Backend): + + def __init__(self, top=10): + logger = logging.getLogger(__name__) + logger.debug("Initializing Torch backend ...") + import torch + from torchvision.models import resnet50, ResNet50_Weights + self.__weights = ResNet50_Weights.DEFAULT + self.__model = resnet50(weights=self.__weights) + self.__model.eval() + self.__preprocess = self.__weights.transforms() + self.__top = top + + def predict(self, img): + import torch + from PIL import Image + batch = self.__preprocess(Image.fromarray(img)).unsqueeze(0) + prediction = self.__model(batch).squeeze(0).softmax(0) + classes = torch.topk(prediction.flatten(), self.__top).indices + #return set([(weights.meta["categories"][clazz], prediction[clazz].item()) for clazz in classes]) + return set([self.__weights.meta["categories"][clazz] for clazz in classes]) + + def __init__(self, backend): + self.__backend = backend + + def predict(self, img): + return self.__backend.predict(img) + -- cgit v1.2.1