1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
import logging
import os
import cv2
import numpy as np
from util import *
class Predictor(object):
class Backend(object):
def __init__(self):
raise NotImplementedError()
def predict(self, img, top=10):
raise NotImplementedError()
class BackendTensorflow(Backend):
MODEL_DIMENSIONS = 224
def __init__(self, top=10, detail=True, detail_factor=4):
logger = logging.getLogger(__name__)
logger.debug("Initializing Tensorflow/Keras backend ...")
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input, decode_predictions
from tensorflow.keras.preprocessing import image
from tensorflow.keras.models import Model
self.__model = ResNet50(weights="imagenet")
self.__top = top
self.__detail = detail
self.__detail_factor = detail_factor
def __predict(self, img):
logger = logging.getLogger(__name__)
logger.debug("Predicting image part ...")
from tensorflow.keras.applications.resnet50 import preprocess_input, decode_predictions
array = np.expand_dims(img, axis=0)
array = preprocess_input(array)
predictions = self.__model.predict(array)
classes = decode_predictions(predictions, top=self.__top)
logger.debug("Predicted raw image classes: {}".format(classes[0]))
return set([(name, prob) for _, name, prob in classes[0]])
def __predict_partial(self, tags, img, x, y, rot):
logger = logging.getLogger(__name__)
logger.debug("Predicting detail image at x={}, y={}, rot={}".format(x, y, rot))
if rot is None:
tmp = img[x:(x+self.MODEL_DIMENSIONS), y:(y+self.MODEL_DIMENSIONS)]
else:
tmp = cv2.rotate(img[x:(x+self.MODEL_DIMENSIONS), y:(y+self.MODEL_DIMENSIONS)], rot)
tags.update(self.__predict(tmp))
def predict(self, img):
logger = logging.getLogger(__name__)
logger.debug("Predicting raw image ...")
ret = self.__predict(cv2.resize(img.copy(), dsize=(self.MODEL_DIMENSIONS, self.MODEL_DIMENSIONS), interpolation=cv2.INTER_AREA))
if self.__detail:
logger.debug("Predicting detail image ...")
tmp = set()
pool = ThreadPool(max(1, os.cpu_count() - 2), 10000)
if img.shape[0] > img.shape[1]:
detail = image_resize(img.copy(), height=(self.__detail_factor * self.MODEL_DIMENSIONS))
else:
detail = image_resize(img.copy(), width=(self.__detail_factor * self.MODEL_DIMENSIONS))
for x in range(0, detail.shape[0], int(self.MODEL_DIMENSIONS/2)):
for y in range(0, detail.shape[1], int(self.MODEL_DIMENSIONS/2)):
pool.add_task(self.__predict_partial, ret, detail, x, y, None)
pool.add_task(self.__predict_partial, ret, detail, x, y, cv2.ROTATE_90_CLOCKWISE)
pool.add_task(self.__predict_partial, ret, detail, x, y, cv2.ROTATE_180)
pool.add_task(self.__predict_partial, ret, detail, x, y, cv2.ROTATE_90_COUNTERCLOCKWISE)
pool.wait_completion()
ret = [tag[0] for tag in sorted(ret, key=lambda tag: tag[1], reverse=True)]
ret = set(list(dict.fromkeys(ret))[0:self.__top])
return ret
class BackendTorch(Backend):
def __init__(self, top=10):
logger = logging.getLogger(__name__)
logger.debug("Initializing Torch backend ...")
import torch
from torchvision.models import resnet50, ResNet50_Weights
self.__weights = ResNet50_Weights.DEFAULT
self.__model = resnet50(weights=self.__weights)
self.__model.eval()
self.__preprocess = self.__weights.transforms()
self.__top = top
def predict(self, img):
import torch
from PIL import Image
batch = self.__preprocess(Image.fromarray(img)).unsqueeze(0)
prediction = self.__model(batch).squeeze(0).softmax(0)
classes = torch.topk(prediction.flatten(), self.__top).indices
#return set([(weights.meta["categories"][clazz], prediction[clazz].item()) for clazz in classes])
return set([self.__weights.meta["categories"][clazz] for clazz in classes])
def __init__(self, backend):
self.__backend = backend
def predict(self, img):
return self.__backend.predict(img)
|