Covid Face Mask Webcam Classification : Custom CNN model with numpy(VGG6)
Covid Face Mask Webcam Classification : Custom CNN model with numpy(VGG6)
넘파이로 직접 구현해서 짜보는 마스크 분류기입니다. VGG net에 영감을 받아 커스터마이징한 VGG6 모델입니다.
전체 코드로 진행된 ipynb는 👉 Mask_Classification
Deep Learning이 어떤식으로 구현되는지 공부하기 위한 목표의 작품이였습니다. 개인적으로 진행하며 공부가 많이 되었지만, 직접적으로 numpy를 이용해 함수도 구현한 것을 사용하다보니 에러가 많이 나왔고, 해결하는데 굉장히 애를 먹었던 기억이 납니다 ㅠㅠ
또한 학습과정에서 학습이 제대로 되지 않는 문제를 발견하였고, 실제로 테스트를 해보니 마스크를 쓴 것과 쓰지 않은 것을 구분하지 못하는 결과를 맞이하였습니다…
예상하는 이유로는
- 학습데이터량 현저히 부족
- 데이터 라벨링을 원-핫 인코딩 형태로 진행했어야하는데 단순히 숫자로 매긴 점
으로 예측하고 있습니다. 시간이 되면 안되는 부분들을 찾아 제대로 작동하게 완성을 하고 싶네요 ㅜㅜ 일단 혹시라도 이 글을 참고하시는 분은 어떤식으로 CNN이 구현되는지 정도에 초점을 맞춰서 봐주시면 감사하겠습니다.
작품 발표날에는 급하게 torch를 사용한 버전으로 발표하였고, 이마저도 학습데이터가 부족해서인지 웹캠 코드를 처음 써봐서, 모델이 predict하는 시간보다 빠르게 웹캠 데이터가 들어가서인지 모르겠지만 정말 가끔가다 몇번씩만 작동되는 것을 확인하였습니다.
(발표날에 google이라 저장한 것을 사용한 것을 돌렸는지, torch라고 저장한 것을 사용한 것을 돌렸는지 잘 기억이 나지 않아서 두가지 코드를 모두 공유합니다. google이라 명시한 것은 급하기 구글링 한 것에서 따와서 그렇게 지었습니다ㅋㅋ)
필요한 모듈 import
import numpy as np
import torchvision
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import glob
import sys, os
sys.path.append(os.pardir) # 부모 디렉토리의 파일을 가져올 수 있도록 설정
import matplotlib.pyplot as plt
from matplotlib.pyplot import imshow
from collections import OrderedDict
import pickle
import torch
%matplotlib inline
import cv2
from imutils.video import VideoStream
print('GPU 사용 가능 여부: {}'.format(torch.cuda.is_available()))
device = "cuda" if torch.cuda.is_available() else "CPU"
디렉토리 설정
from pathlib import Path
folder = "Anaconda/Baram"
project_dir = "MaskClassificaion"
base_path = Path("/Users/yuchul/")
project_path = base_path / folder / project_dir
os.chdir(project_path)
for x in list(project_path.glob("*")):
if x.is_dir():
dir_name = str(x.relative_to(project_path))
os.rename(dir_name, dir_name.split(" ", 1)[0])
print(f"현재 디렉토리 위치: {os.getcwd()}")
current_path = Path().absolute()
data_path = current_path / "data_old"
print("현재 디렉토리 위치: {}".format(current_path))
if (data_path / "mask_cnn").exists():
print("이미 'data/mask_cnn' 폴더가 있습니다! 이어서 진행하세요~")
else: print("없습니다")
data_dir = './data_old/mask_cnn'
print(data_path)
하이퍼파라미터 설정
batch_size = 32
num_epochs = 30
learning_rate = 0.0001
데이터 불러오기 및 라벨링
class MaskNonMaskDataset(Dataset):
def __init__(self, data_dir, mode=None, transform=None, train_size=None):
self.all_data = sorted(glob.glob(os.path.join(data_dir, mode, '*', '*')))
#RGB 가 아닌 파일필터링
# self.all_data = [i for i in self.old_all_data if transform(Image.open(i)).shape[0] == 3]
self.mode = mode
self.transform = transform
self.train_size = train_size
def __getitem__(self, index):
data_path = self.all_data[index]
img = Image.open(data_path)
if self.transform != None:
img = self.transform(img)
if os.path.basename(data_path).startswith("Mask"):
label = 0
else:
label = 1
return img, label # [1, 0, 0] , [0, 1, 0], [0, 0, 1]
def __len__(self):
length = len(self.all_data)
return length
데이터 전처리
data_transforms = {
'train' : transforms.Compose([
transforms.RandomRotation(5),
transforms.RandomHorizontalFlip(),
transforms.RandomResizedCrop(224, scale=(0.96, 1.0), ratio=(0.95, 1.05)),
transforms.ToTensor(),
# transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
]),
'val' : transforms.Compose([
transforms.Resize([224, 224]),
transforms.ToTensor(),
# transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
}
# train_data = MaskNonMaskDataset(data_dir='./data/mask_cnn', mode='Train', transform=data_transforms['train'])
# val_data = MaskNonMaskDataset(data_dir='./data/mask_cnn', mode='Validation', transform=data_transforms['val'])
# test_data = MaskNonMaskDataset(data_dir='./data/mask_cnn', mode='Test', transform=data_transforms['val'])
# train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, drop_last=True)
# val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=True, drop_last=True)
# test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, drop_last=True)
넘파이로 구현하며 진행하다보니, Normalize할 때 에러가 나서, 해결하기가 어려워서 주석처리하고 진행했었습니다! ㅠㅠ
train_data = torchvision.datasets.ImageFolder(root='./data_old/mask_cnn/Train', transform=data_transforms['train'])
val_data = torchvision.datasets.ImageFolder(root='./data_old/mask_cnn/Validation', transform=data_transforms['val'])
test_data = torchvision.datasets.ImageFolder(root='./data_old/mask_cnn/Test', transform=data_transforms['val'])
add_data = torchvision.datasets.ImageFolder(root='./data_old/mask_cnn/add_data/Train_', transform=None)
부족한 훈련데이터를 나중에 추가하였지만, 그래도 부족한 데이터 양이였던 것 같습니다.
add_data = torchvision.datasets.ImageFolder(root='./data_old/mask_cnn/add_data/Train', transform=data_transforms['train'])
print(len(train_data))
print(len(val_data))
print(len(test_data))
x_train = []
t_train = []
아마 ipynb 를 보시면 train 데이터 개수가 18238개로 나와있을겁니다. 이는 발표후에 추가로 넣어서 개수가 늘어난 것이지 실제 학습시켰을 당시에는 아마 600~1000개 정도 였던 것 같습니다! 2달 정도 지나고 정리하려니까 기억이 가물가물하네요 ㅠㅠ
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, drop_last=True)
val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=True, drop_last=True)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, drop_last=True)
각종 필요 함수들 구현
ReLU 구현
class ReLU:
def __init__(self):
self.mask = None
def forward(self, x):
self.mask = (x <= 0)
out = x.copy()
out[self.mask] = 0
return out
def backward(self, dout):
dout[self.mask] = 0
dx = dout
return dx
grad 구하는 함수
def get_grad(f, x):
h = 1e-4 #0.0001
grad = np.zeros_like(x)
it = np.nditer(x, flag=['multi_index'], op_flags=['readwrite'])
while not it.finished:
idx = it.multi_index
tmp_val = x[idx]
x[idx] = float(tmp_val) + h
fxh1 = f(x) #f(x+h)
x[idx] = tmp_val - h
fxh2 = f(x) #f(x-h)
grad[idx] = (fxh1 - fxh2) / (2*h)
x[idx] = tmp_val # 값 복원
it.iternext()
return grad
Adam Optimizer 구현
class Adam:
def __init__(self, lr=0.001, beta1=0.9, beta2=0.999):
self.lr = lr
self.beta1 = beta1
self.beta2 = beta2
self.iter = 0
self.m = None
self.v = None
def update(self, params, grads):
if self.m is None:
self.m, self.v = {}, {}
for key, val in params.items():
self.m[key] = np.zeros_like(val)
self.v[key] = np.zeros_like(val)
self.iter += 1
lr_t = self.lr * np.sqrt(1.0 - self.beta2**self.iter) / (1.0 - self.beta1**self.iter)
for key in params.keys():
# print("계산 전 m[key]: ", self.m[key].shape)
# print("계산 전 grads[key]: ", grads[key].shape)
self.m[key] = self.beta1*self.m[key] + (1-self.beta1)*grads[key]
self.v[key] = self.beta2*self.v[key] + (1-self.beta2)*(grads[key]**2)
# print("key: ", key)
# print("m키값", self.m[key].shape)
# print("grads키값", grads[key].shape)
# print("v키값", self.v[key].shape)
# self.m[key] += (1 - self.beta1) * (grads[key] - self.m[key])
# self.v[key] += (1 - self.beta2) * (grads[key]**2 - self.v[key])
params[key] -= lr_t * self.m[key] / (np.sqrt(self.v[key]) + 1e-7)
# print("params키값", params[key].shape)
#unbias_m += (1 - self.beta1) * (grads[key] - self.m[key]) # correct bias
#unbisa_b += (1 - self.beta2) * (grads[key]*grads[key] - self.v[key]) # correct bias
#params[key] += self.lr * unbias_m / (np.sqrt(unbisa_b) + 1e-7)
중간 중간에 print는 디버깅할 때 사용했던 부분들이라 주석처리하였습니다.
FC_Layer 구현
class FC_Layer:
def __init__(self, W, b):
self.W = W
self.b = b
self.x = None
self.original_x_shape = None
#가중치와 편향 매개변수 미분
self.dW = None
self.db = None
def forward(self, x):
#텐서 대응
# print("FC입력", x.shape)
# print("가중치", self.W.shape)
self.original_x_shape = x.shape
x = x.reshape(x.shape[0], -1)
self.x = x
# print("FC reshape 입력", x.shape)
out = np.dot(self.x, self.W) + self.b
# print("FC출력", out.shape)
return out
def backward(self, dout):
# print("FC back입력", dout.shape)
dx = np.dot(dout, self.W.T) #.T는 Transpose
self.dW = np.dot(self.x.T, dout)
self.db = np.sum(dout, axis=0)
dx = dx.reshape(*self.original_x_shape) # 입력 데이터 모양 변경(텐서 대응)
# print("FC back출력", dx.shape)
return dx
Dropout 구현
class Dropout:
def __init__(self, dropout_ratio=0.5):
self.dropout_ratio = dropout_ratio
self.mask = None
def forward(self, x, train_flg=True):
if train_flg:
self.mask = np.random.rand(*x.shape) > self.dropout_ratio
# x모양에 맞춰서 랜덤한 행렬생성, 원소별 0~1 사이의 랜던한 값,
#그 값이 dropout_ratio보다 큰 값이 true, 작으면 false인 행렬을 mask에 저장
return x * self.mask
else:
return x * (1.0 - self.dropout_ratio)
def backward(self, dout):
return dout * self.mask
im2col & col2im
im2col & col2im 은 딥러닝 프레임워크로 간단히 컨볼루션연산을 하는 것이 아니라 numpy로 구현하기위해 평탄화하고 되돌리기위한 함수들입니다.
def im2col(input_data, filter_h, filter_w, stride=1, pad=0):
"""다수의 이미지를 입력받아 2차원 배열로 변환한다(평탄화).
Parameters
----------
input_data : 4차원 배열 형태의 입력 데이터(이미지 수, 채널 수, 높이, 너비)
filter_h : 필터의 높이
filter_w : 필터의 너비
stride : 스트라이드
pad : 패딩
Returns
-------
col : 2차원 배열
"""
N, C, H, W = input_data.shape
out_h = (H + 2*pad - filter_h)//stride + 1
out_w = (W + 2*pad - filter_w)//stride + 1
img = np.pad(input_data, [(0,0), (0,0), (pad, pad), (pad, pad)], 'constant')
col = np.zeros((N, C, filter_h, filter_w, out_h, out_w))
for y in range(filter_h):
y_max = y + stride*out_h
for x in range(filter_w):
x_max = x + stride*out_w
col[:, :, y, x, :, :] = img[:, :, y:y_max:stride, x:x_max:stride]
col = col.transpose(0, 4, 5, 1, 2, 3).reshape(N*out_h*out_w, -1)
return col
def col2im(col, input_shape, filter_h, filter_w, stride=1, pad=0):
"""(im2col과 반대) 2차원 배열을 입력받아 다수의 이미지 묶음으로 변환한다.
Parameters
----------
col : 2차원 배열(입력 데이터)
input_shape : 원래 이미지 데이터의 형상(예:(10, 1, 28, 28))
filter_h : 필터의 높이
filter_w : 필터의 너비
stride : 스트라이드
pad : 패딩
Returns
-------
img : 변환된 이미지들
"""
# print(pad)
N, C, H, W = input_shape
out_h = (H + 2*pad - filter_h)//stride + 1
out_w = (W + 2*pad - filter_w)//stride + 1
col = col.reshape(N, out_h, out_w, C, filter_h, filter_w).transpose(0, 3, 4, 5, 1, 2)
img = np.zeros((N, C, H + 2*pad + stride - 1, W + 2*pad + stride - 1))
for y in range(filter_h):
y_max = y + stride*out_h
for x in range(filter_w):
x_max = x + stride*out_w
img[:, :, y:y_max:stride, x:x_max:stride] += col[:, :, y, x, :, :]
return img[:, :, pad:H + pad, pad:W + pad]
Shuffle Dataset
def shuffle_dataset(x, t):
"""데이터셋을 뒤섞는다.
Parameters
----------
x : 훈련 데이터
t : 정답 레이블
Returns
-------
x, t : 뒤섞은 훈련 데이터와 정답 레이블
"""
permutation = np.random.permutation(x.shape[0])
x = x[permutation,:] if x.ndim == 2 else x[permutation,:,:,:]
t = t[permutation]
return x, t
BatchNormalization 구현
class BatchNormalization:
def __init__(self, gamma, beta, momentum=0.9, running_mean=None, running_var=None):
self.gamma = gamma
self.beta = beta
self.momentum = momentum
self.input_shape = None # 합성곱 계층은 4차원, 완전연결 계층은 2차원
# 시험할 때 사용할 평균과 분산
self.running_mean = running_mean
self.running_var = running_var
# backward 시에 사용할 중간 데이터
self.batch_size = None
self.xc = None
self.std = None
self.dgamma = None
self.dbeta = None
def forward(self, x, train_flg=True):
# print("BN 입력", x.shape)
self.input_shape = x.shape
if x.ndim != 2:
N, C, H, W = x.shape
x = x.reshape(N, -1)
out = self.__forward(x, train_flg)
# print("BN 출력", out.reshape(*self.input_shape).shape)
return out.reshape(*self.input_shape)
def __forward(self, x, train_flg):
if self.running_mean is None:
N, D = x.shape
self.running_mean = np.zeros(D)
self.running_var = np.zeros(D)
if train_flg:
mu = x.mean(axis=0)
xc = x - mu
var = np.mean(xc**2, axis=0)
std = np.sqrt(var + 10e-7)
xn = xc / std
self.batch_size = x.shape[0]
self.xc = xc
self.xn = xn
self.std = std
self.running_mean = self.momentum * self.running_mean + (1-self.momentum) * mu
self.running_var = self.momentum * self.running_var + (1-self.momentum) * var
else:
xc = x - self.running_mean
xn = xc / ((np.sqrt(self.running_var + 10e-7)))
out = self.gamma * xn + self.beta
return out
def backward(self, dout):
# print('BN back입력', dout.shape)
if dout.ndim != 2:
N, C, H, W = dout.shape
dout = dout.reshape(N, -1)
dx = self.__backward(dout)
# print('BN back출력 reshape전', dx.shape)
dx = dx.reshape(*self.input_shape)
# print('BN back출력 reshape후', dx.shape)
return dx
def __backward(self, dout):
dbeta = dout.sum(axis=0)
dgamma = np.sum(self.xn * dout, axis=0)
dxn = self.gamma * dout
dxc = dxn / self.std
dstd = -np.sum((dxn * self.xc) / (self.std * self.std), axis=0)
dvar = 0.5 * dstd / self.std
dxc += (2.0 / self.batch_size) * self.xc * dvar
dmu = np.sum(dxc, axis=0)
dx = dxc - dmu / self.batch_size
self.dgamma = dgamma
self.dbeta = dbeta
return dx
Convolution 구현
class Convolution:
def __init__(self, W, b, stride=None, pad=None):
self.W = W
self.b = b
self.stride = stride
self.pad = pad
# 중간 데이터(backward 시 사용)
self.x = None
self.col = None
self.col_W = None
# 가중치와 편향 매개변수의 기울기
self.dW = None
self.db = None
def forward(self, x):
# print("Con입력값=", x.shape)
# print("가중치", self.W.shape)
FN, C, FH, FW = self.W.shape
N, C, H, W = x.shape
out_h = 1 + int((H + 2*self.pad - FH) / self.stride)
out_w = 1 + int((W + 2*self.pad - FW) / self.stride)
# print("out_h=", out_h)
# print("out_w=", out_w)
col = im2col(x, FH, FW, self.stride, self.pad)
col_W = self.W.reshape(FN, -1).T
out = np.dot(col, col_W) + self.b
out = out.reshape(N, out_h, out_w, -1).transpose(0, 3, 1, 2)
self.x = x
self.col = col
self.col_W = col_W
# print("Con출력값", out.shape)
return out
def backward(self, dout):
# print("Con back입력", dout.shape)
FN, C, FH, FW = self.W.shape
dout = dout.transpose(0,2,3,1).reshape(-1, FN)
self.db = np.sum(dout, axis=0)
self.dW = np.dot(self.col.T, dout)
self.dW = self.dW.transpose(1, 0).reshape(FN, C, FH, FW)
dcol = np.dot(dout, self.col_W.T)
dx = col2im(dcol, self.x.shape, FH, FW, self.stride, self.pad)
# print("Con back출력", dx.shape)
return dx
Pooling 구현 (MaxPooling)
class Pooling:
def __init__(self, pool_h, pool_w, stride, pad=0):
self.pool_h = pool_h
self.pool_w = pool_w
self.stride = stride
self.pad = pad
self.x = None
self.arg_max = None
def forward(self, x):
# print("Pool 입력", x.shape)
N, C, H, W = x.shape
out_h = int(1 + (H - self.pool_h) / self.stride)
out_w = int(1 + (W - self.pool_w) / self.stride)
col = im2col(x, self.pool_h, self.pool_w, self.stride, self.pad)
col = col.reshape(-1, self.pool_h*self.pool_w)
arg_max = np.argmax(col, axis=1)
out = np.max(col, axis=1)
out = out.reshape(N, out_h, out_w, C).transpose(0, 3, 1, 2)
self.x = x
self.arg_max = arg_max
# print("Pool 출력", out.shape)
return out
def backward(self, dout):
# print("Pool back입력", dout.shape)
dout = dout.transpose(0, 2, 3, 1)
pool_size = self.pool_h * self.pool_w
dmax = np.zeros((dout.size, pool_size))
dmax[np.arange(self.arg_max.size), self.arg_max.flatten()] = dout.flatten()
dmax = dmax.reshape(dout.shape + (pool_size,))
dcol = dmax.reshape(dmax.shape[0] * dmax.shape[1] * dmax.shape[2], -1)
dx = col2im(dcol, self.x.shape, self.pool_h, self.pool_w, self.stride, self.pad)
# print("Pool back출력", dx.shape)
return dx
Softmax 구현
def softmax(x):
if x.ndim == 2:
x = x.T
x = x - np.max(x, axis=0)
y = np.exp(x) / np.sum(np.exp(x), axis=0)
return y.T
x = x - np.max(x) # 오버플로 대책
return np.exp(x) / np.sum(np.exp(x))
Cross Entropy Loss 구현
def cross_entropy_error(y, t):
if y.ndim == 1:
t = t.reshape(1, t.size)
y = y.reshape(1, y.size)
# 훈련 데이터가 원-핫 벡터라면 정답 레이블의 인덱스로 반환
if t.size == y.size:
t = t.argmax(axis=1)
batch_size = y.shape[0]
return -np.sum(np.log(y[np.arange(batch_size), t] + 1e-7)) / batch_size
위에 구현한 Softmax, CrossEntropyLoss로 Loss 구하기
class SoftmaxWithLoss:
def __init__(self):
self.loss = None #손실함수
self.y = None #softmax의 출력
self.t = None #정답 레이블(원-핫 인코딩 형태)
def forward(self, x, t):
# print("SoftmaxWithLoss의 t", t.shape)
self.t = t
self.y = softmax(x)
# print("Softmax(x) = y: ", self.y.shape)
self.loss = cross_entropy_error(self.y, self.t)
# print("크로스엔트로피에러함수", self.loss.shape)
# print("크로스엔트로피에러함수", self.loss)
return self.loss
def backward(self, dout=1):
batch_size = self.t.shape[0]
# print("SoftmaxWithLoss back의 batchsize", batch_size)
# print("SoftmaxWithLoss back의 y모양", self.y.shape)
# print("SoftmaxWithLoss back의 t모양", self.t.shape)
# print("SoftmaxWithLoss back의 y", self.y)
# print("SoftmaxWithLoss back의 t", self.t)
if self.t.size == self.y.size: #정답 레이블이 원-핫 인코딩 형태일 때
dx = (self.y - self.t) / batch_size
else:
dx = self.y.copy()
dx[np.arange(batch_size), self.t] -= 1
dx = dx / batch_size
# print("SoftmaxWithLoss back의 dx", dx.shape)
return dx
Network(custom model)
VGG6 구성은 다음과 같습니다. Conv-BN-Relu-Conv-BN-Relu-Pool 이 두번 반복됩니다. 그 후 FC_layer 2층을 통과하여 마스크 썼는지 안썼는지를 구분해야하므로 출력값은 2입니다.
# import numba
class VGG6:
def __init__(self, input_dim={3, 224, 224},
conv_param_1 = {'filter_num':16, 'filter_size':3, 'pad':1, 'stride':2},
conv_param_2 = {'filter_num':32, 'filter_size':3, 'pad':1, 'stride':1},
conv_param_3 = {'filter_num':32, 'filter_size':3, 'pad':1, 'stride':2},
conv_param_4 = {'filter_num':64, 'filter_size':3, 'pad':1, 'stride':2},
hidden_size=50, output_size=2):
self.first_flg = True
# ======= 가중치 초기화 =======
pre_node_nums = np.array([3*3*3, 16*3*3, 32*3*3, 32*3*3, 64*7*7, hidden_size])
weight_init_scales = np.sqrt(2.0 / pre_node_nums) # ReLU 사용할 때 권장 초깃값
self.params = {}
pre_channel_num = 3
for idx, conv_param in enumerate([conv_param_1, conv_param_2, conv_param_3, conv_param_4]):
self.params['W' + str(idx+1)] = weight_init_scales[idx] * np.random.randn(conv_param['filter_num'], pre_channel_num, conv_param['filter_size'], conv_param['filter_size'])
self.params['b' + str(idx+1)] = np.zeros(conv_param['filter_num'])
pre_channel_num = conv_param['filter_num']
self.params['gamma' + str(idx+1)] = 1.0
self.params['beta' + str(idx+1)] = 0.0
self.params['W5'] = weight_init_scales[4] * np.random.randn(64*7*7, hidden_size)
self.params['b5'] = np.zeros(hidden_size)
self.params['W6'] = weight_init_scales[5] * np.random.randn(hidden_size, output_size)
self.params['b6'] = np.zeros(output_size)
# ======= 계층 생성 =======
self.layers = []
self.layers.append(Convolution(self.params['W1'], self.params['b1'], conv_param_1['stride'], conv_param_1['pad'])) # 0
self.layers.append(BatchNormalization(self.params['gamma1'], self.params['beta1'])) #1
self.layers.append(ReLU())
self.layers.append(Convolution(self.params['W2'], self.params['b2'], conv_param_2['stride'], conv_param_2['pad'])) # 3
self.layers.append(BatchNormalization(self.params['gamma2'], self.params['beta2'])) #4
self.layers.append(ReLU())
self.layers.append(Pooling(pool_h=2, pool_w=2, stride=2))
self.layers.append(Convolution(self.params['W3'], self.params['b3'], conv_param_3['stride'], conv_param_3['pad'])) # 7
self.layers.append(BatchNormalization(self.params['gamma3'], self.params['beta3'])) #8
self.layers.append(ReLU())
self.layers.append(Convolution(self.params['W4'], self.params['b4'], conv_param_4['stride'], conv_param_4['pad'])) # 10
self.layers.append(BatchNormalization(self.params['gamma4'], self.params['beta4']))#11
self.layers.append(ReLU())
self.layers.append(Pooling(pool_h=2, pool_w=2, stride=2))
self.layers.append(FC_Layer(self.params['W5'], self.params['b5'])) # 14
# self.layers.append(ReLU())
# self.layers.append(Dropout(0.5))
self.layers.append(FC_Layer(self.params['W6'], self.params['b6'])) # 17
# self.layers.append(ReLU())
# self.layers.append(Dropout(0.5))
# self.layers.append(SoftmaxWithLoss())
self.last_layer = SoftmaxWithLoss()
def predict(self, x, train_flg=False, first_flg=None):
for layer in self.layers:
if isinstance(layer, Dropout):
if self.first_flg and isinstance(layer, BatchNormalization):
x = layer.__forward(x, train_flg)
else:
x = layer.forward(x, train_flg)
else:
x = layer.forward(x)
return x
def loss(self, x, t):
y = self.predict(x, train_flg=True, first_flg=self.first_flg)
self.first_flg = False
return self.last_layer.forward(y, t)
def accuracy(self, x, t, batch_size=32):
if t.ndim != 1 : t = np.argmax(t, axis=1)
acc = 0.0
for i in range(int(x.shape[0] / batch_size)):
tx = x[i*batch_size:(i+1)*batch_size]
tt = t[i*batch_size:(i+1)*batch_size]
y = self.predict(tx, train_flg=False)
y = np.argmax(y, axis=1)
acc += np.sum(y == tt)
return acc / x.shape[0]
# def accuracy(self, inputs, labels):
# # 0, 1 -> [1, 0], [0, 1]
# for i in range()
# if t.ndim != 1 : t = np.argmax(t, axis=1)
# acc = 0.0
# for i in range(int(x.shape[0] / batch_size)):
# tx = x[i*batch_size:(i+1)*batch_size]
# tt = t[i*batch_size:(i+1)*batch_size]
# y = self.predict(tx, train_flg=False)
# y = np.argmax(y, axis=1)
# acc += np.sum(y == tt)
# return acc / x.shape[0]
def gradient(self, x, t):
#forward
self.loss(x, t)
#backward
dout = 1
dout = self.last_layer.backward(dout)
tmp_layers = self.layers.copy()
tmp_layers.reverse()
for layer in tmp_layers:
dout = layer.backward(dout)
#결과 저장
grads = {}
idx = 0
for i, layer in enumerate(self.layers):
if isinstance(layer, Convolution) or isinstance(layer, FC_Layer):
grads['W' + str(idx+1)] = self.layers[i].dW
grads['b' + str(idx+1)] = self.layers[i].db
if(i==14):
idx += 1
elif isinstance(layer, BatchNormalization):
grads['gamma'+str(idx+1)] = self.layers[i].dgamma
grads['beta'+str(idx+1)] = self.layers[i].dbeta
idx += 1
return grads
# for i, layer_idx in enumerate((0, 1, 3, 4, 7, 8, 10, 11, 14, 17)):
# # print("len(self.layers):",len(self.layers))
# grads['W' + str(i+1)] = self.layers[layer_idx].dW
# grads['b' + str(i+1)] = self.layers[layer_idx].db
# grads['gamma'+str(i+1)] = self.layers[layer_idx].dgamma
# grads['beta'+str(i+1)] = self.layers[layer_idx].dbeta
# return grads
def save_params(self, file_name='params.pkl'):
params = {}
for key, val in self.params.items():
params[key] = val
with open(file_name, 'wb') as f:
pickle.dump(params, f)
print("training network params:", self.params.keys())
def load_params(self, file_name='epoch.pkl'):
with open(file_name, 'rb') as f:
params = pickle.load(f)
for key, val in params.items():
self.params[key] = val
# for i, layer_idx in enumerate((0, 1, 3, 4, 7, 8, 10, 11, 14, 17)):
# self.layers[layer_idx].W = self.params['W' + str(i+1)]
# self.layers[layer_idx].b = self.params['b' + str(i+1)]
idx = 0
for i, layer in enumerate(self.layers):
if isinstance(layer, Convolution) or isinstance(layer, FC_Layer):
self.layers[i].W = self.params['W' + str(idx+1)]
self.layers[i].b = self.params['b' + str(idx+1)]
if(i==14):
idx += 1
elif isinstance(layer, BatchNormalization):
self.layers[i].gamma = self.params['gamma'+str(idx+1)]
self.layers[i].beta = self.params['beta'+str(idx+1)]
idx += 1
print("Params is successfully loaded!:", self.params.keys())
network = VGG6()
Epoch당 Loss 그래프 그리기
def graph(data_list, title, color, save_path):
batch_num_list = [i for i in range(0, len(data_list))]
plt.figure(figsize=(20, 10))
plt.rc('font', size=25)
plt.plot(batch_num_list, data_list, color=color, marker='o', linestyle='solid')
plt.title(title)
plt.xlabel('Epoch')
title = plt.ylabel(title)
plt.savefig(save_path, dpi=600)
# plt.show()
plt.close()
Train 할 수 있게 하는 함수
class Trainer:
"""신경망 훈련을 대신 해주는 클래스"""
def __init__(self, network, x_train_loader, x_test_loader,
epochs=30, mini_batch_size=32,
optimizer='adagrad', optimizer_param={'lr':0.0001},
evaluate_sample_num_per_epoch=None, verbose=True):
self.network = network
# print(f"Network Spec: {self.network.keys()}")
self.verbose = verbose
self.train_loader = x_train_loader
self.test_loader = x_test_loader
# print(x_train[0][0][0][0])
# self.x_train = x_train
# self.t_train = t_train
# self.x_test = x_test
# self.t_test = t_test
# self.epochs = epochs
self.batch_size = mini_batch_size
self.evaluate_sample_num_per_epoch = evaluate_sample_num_per_epoch
# optimzer
optimizer_class_dict = {'adam':Adam}
self.optimizer = optimizer_class_dict[optimizer.lower()](**optimizer_param)
self.train_size = len(x_train) # x_train.shape[0]
self.iter_per_epoch = max(self.train_size / mini_batch_size, 1)
self.epochs = epochs
self.current_iter = 0
self.current_epoch = 0
self.train_loss_list = []
self.train_acc_list = []
self.test_acc_list = []
self.train_mode = True
def train_step(self):
# batch_mask = np.random.choice(self.train_size, self.batch_size).tolist()
# x_batch = []
# t_batch = []
# for idx in batch_mask:
# x_batch.append(self.x_train[idx])
# t_batch.append(self.t_train[idx])
dataloader = self.train_loader if self.train_mode else self.test_loader
name = "train" if self.train_mode else "evaluate"
for x_batch, t_batch in dataloader:
# print(x_batch.shape)
grads = self.network.gradient(x_batch, t_batch)
# for key in grads.keys():
# print(key, ":", grads[key].shape)
self.optimizer.update(self.network.params, grads)
loss = self.network.loss(x_batch, t_batch)
if self.train_mode:
self.train_loss_list.append(loss)
if self.verbose: print(f"\t{name} loss: {loss}")
if self.current_iter % self.iter_per_epoch == 0:
self.current_epoch += 1
x_train_sample, t_train_sample = x_batch, t_batch
train_acc = self.network.accuracy(x_train_sample, t_train_sample)
if self.train_mode:
self.train_acc_list.append(train_acc)
else:
self.test_acc_list.append(test_acc)
if self.verbose: print("=== epoch:" + str(self.current_epoch) + ", train acc:" + str(train_acc) + ", test acc:" + str(test_acc) + " ===")
self.current_iter += 1
def train(self, current_epochs):
for epoch in range(self.epochs):
self.train_step()
self.network.save_params(file_name=f"epoch_{epoch+current_epochs+1}.pkl")
print(f"model({epoch+1}/{self.epochs}) is saved!")
graph(self.train_loss_list, 'loss', 'red', f"epoch_{epoch+current_epochs+1}")
def test(self, test_data_length = len(test_data)):
hits = 0.0
acc = 0.0
for imgs, labels in self.test_loader:
hits += self.network.accuracy(imgs, labels)
acc = hist/test_data_length
if self.verbose:
print("=============== Final Test Accuracy ===============")
print("test acc:" + str(test_acc))
trainer = Trainer(network, x_train_loader = train_loader, x_test_loader = test_loader,
epochs=15, mini_batch_size=32,
optimizer='Adam', optimizer_param={'lr':0.0001},
evaluate_sample_num_per_epoch=3)
# trainer.network.load_params(file_name='epoch_1.pkl')
trainer.train(0)
network.save_params("VGG6.pkl")
print("Saved Newwork Parameters!")
network.load_params('epoch_15.pkl')
img = cv2.imread('/Users/yuchul/Anaconda/Baram/MaskClassificaion/data_old/mask_cnn/Train/Non_Mask/586_1.jpg')
# img = cv2.imread('/Users/yuchul/Anaconda/Baram/MaskClassificaion/data_old/mask_cnn/Train/Mask/0_0.jpg')
img_resize = cv2.resize(img, dsize=(224, 224), interpolation=cv2.INTER_LINEAR)
fix_img = cv2.cvtColor(img_resize, cv2.COLOR_BGR2RGB)
fimg = fix_img.transpose(2,0,1)
fixed_img = np.expand_dims(fimg, axis=0)
print(fixed_img.shape)
# print(fixed_img.label)
imshow(fix_img)
network.predict(fixed_img)
이 셀의 결과도 마찬가지로 제가 테스트할 때, 어떠한 사진을 넣어도 ipynb 파일을 보시면 볼 수 있는 예측값 $array([[-0.03602253, 0.05814829]])$ 이 나와서 아 뭔가 단단히 잘못되었구나를 알 수 있었습니다 ㅎ…;
Webcam
labels_dict={0:'Mask',1:'NoMask'}
color_dict={1:(0,0,255),0:(0,255,0)}
size = 4
webcam = cv2.VideoCapture(0) #Use camera 0
# We load the xml file
classifier = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')
while True:
(rval, im) = webcam.read()
im=cv2.flip(im,1,1) #Flip to act as a mirror
# Resize the image to speed up detection
mini = cv2.resize(im, (im.shape[1] // size, im.shape[0] // size))
# detect MultiScale / faces
faces = classifier.detectMultiScale(mini)
# Draw rectangles around each face
for f in faces:
(x, y, w, h) = [v * size for v in f] #Scale the shapesize backup
#Save just the rectangle faces in SubRecFaces
face_img = im[y:y+h, x:x+w]
resized=cv2.resize(face_img,(224,224))
normalized=resized/255.0
reshaped=np.reshape(normalized,(1,3,224,224))
# print(reshaped.shape)
# reshaped = np.vstack([reshaped])
# print(reshaped.shape)
result=network.predict(reshaped)
print(result)
label=np.argmax(result,axis=1)[0]
cv2.rectangle(im,(x,y),(x+w,y+h),color_dict[label],2)
cv2.rectangle(im,(x,y-40),(x+w,y),color_dict[label],-1)
cv2.putText(im, labels_dict[label], (x, y-10),cv2.FONT_HERSHEY_SIMPLEX,0.8,(255,255,255),2)
# Show the image
cv2.imshow('LIVE', im)
key = cv2.waitKey(100)
# if Esc key is press then break out of the loop
if key == 27: #The Esc key
break
webcam.release()
cv2.destroyAllWindows()
이렇게 구성해보았습니다! 개인적으로 아쉬움이 많이 남는 작품이라, 향후 틈틈히 디버깅을 해볼 생각입니다. 디버깅에 성공한다면 이 멘트 아래줄에 수정사항들을 적어보도록 하겠습니다!
끝!
Leave a comment