# Environment setup: headless matplotlib, BigDL, and a local SparkContext.
import matplotlib
matplotlib.use('Agg')  # headless backend; must be selected before pyplot is imported
# NOTE(review): the original had '%pylab inline' here, which is IPython-only
# magic and a SyntaxError in a plain .py file. The explicit numpy import below
# provides the `np` name that %pylab used to inject.
import numpy as np
import datetime as dt
from os import listdir
from os.path import join, basename
from bigdl.nn.layer import *
from bigdl.nn.criterion import *
from bigdl.optim.optimizer import *
from bigdl.util.common import *
from bigdl.dataset import mnist
#from utils import get_mnist
from matplotlib.pyplot import imshow
import matplotlib.pyplot as plt
from pyspark import SparkContext

# Local 4-core Spark context with 20 GB of driver memory
# (create_spark_conf comes from bigdl.util.common).
sc=SparkContext.getOrCreate(conf=create_spark_conf().setMaster("local[4]").set("spark.driver.memory","20g"))
# NOTE(review): scipy.misc.imread was removed in SciPy >= 1.2 — verify the
# installed version, or switch to imageio.imread.
from scipy import misc
init_engine()#build file structure and associated labels
def read_local_path(folder, has_label=True):
    """Walk *folder*'s immediate subdirectories and pair each file with a label.

    Each subdirectory is treated as one class; the label is the 1-based index
    of the subdirectory in sorted order (BigDL expects labels starting at 1).

    :param folder: root directory containing one subdirectory per class
    :param has_label: when True, build the (path, label) list; when False an
        empty list is returned (behavior kept from the original — presumably
        the unlabeled case was never implemented; TODO confirm)
    :return: list of (file_path, label) tuples
    """
    image_paths = []
    if has_label:
        # enumerate(..., start=1) replaces dirs.index(d) + 1: identical labels
        # but O(1) per directory instead of an O(n) list scan.
        for label, d in enumerate(sorted(listdir(folder)), start=1):
            class_dir = join(folder, d)
            for f in listdir(class_dir):
                image_paths.append((join(class_dir, f), label))
    return image_paths
#building RDD from Training data set
def read_local_with_name(sc, folder, normalize=255.0, has_label=True):
    """Build an RDD of (image_array, label, filename) triples from *folder*.

    :param sc: SparkContext used to parallelize the image-path list
    :param folder: root directory with one subdirectory per class
    :param normalize: divisor applied to pixel values (255.0 scales to [0, 1])
    :param has_label: forwarded to read_local_path
    :return: RDD of (float32 ndarray, ndarray label, base filename)
    """
    # read directory, create image paths list
    image_paths = read_local_path(folder, has_label)
    image_paths_rdd = sc.parallelize(image_paths)
    print(image_paths_rdd)
    # Pipeline: load image -> resize to 128x128 -> mask to byte range and scale.
    # NOTE(review): scipy.misc.imread is removed in modern SciPy — confirm version.
    # NOTE(review): Resize is presumably bigdl's transformer (star-imported above);
    # verify it accepts a raw ndarray when called this way.
    # (x & 0xff) keeps the low byte of each pixel; on uint8 data it is a no-op
    # before the division by `normalize`.
    features_label_name_rdd = image_paths_rdd.map(lambda path_label: (misc.imread(path_label[0]), np.array(path_label[1]), basename(path_label[0]))) \
        .map(lambda img_label_name:
             (Resize(128, 128)(img_label_name[0]), img_label_name[1], img_label_name[2])) \
        .map(lambda features_label_name:
             (((features_label_name[0] & 0xff) / normalize).astype("float32"), features_label_name[1], features_label_name[2]))
    return features_label_name_rdd
# Build the training RDD and sanity-check the path/label list.
imFolder = "/home/spark/dataset/ct_scans_Train"
localPath = read_local_path(imFolder)
print(len(localPath)) # total number of (path, label) pairs found
# Spot-check one randomly chosen path and its associated label.
rndmItm = np.random.choice(range(len(localPath)))
print("PATH..... {} \nLABEL.... {}".format(localPath[rndmItm][0],localPath[rndmItm][1]))
Train_RDD = read_local_with_name(sc, imFolder, normalize=255.0, has_label=True)
#build file structure and associated labels (test set)
def read_local_path_T(folder_T, has_label=True):
    """Pair each test-set file with its 1-based class label.

    This was a line-for-line copy of read_local_path; it now delegates so the
    labeling logic lives in exactly one place. Signature and behavior are
    unchanged.

    :param folder_T: root directory containing one subdirectory per class
    :param has_label: when False an empty list is returned (as before)
    :return: list of (file_path, label) tuples
    """
    return read_local_path(folder_T, has_label)
#building RDD from Testing Dataset
def read_local_with_name_T(sc, folder_T, normalize=255.0, has_label=True):
    """Build the test-set RDD of (image_array, label, filename) triples.

    This was a line-for-line duplicate of read_local_with_name; it now
    delegates so the loading pipeline is defined once. Signature and behavior
    are unchanged.

    :param sc: SparkContext used to parallelize the image-path list
    :param folder_T: test-set root directory, one subdirectory per class
    :param normalize: divisor applied to pixel values
    :param has_label: forwarded to the path reader
    :return: RDD of (float32 ndarray, ndarray label, base filename)
    """
    return read_local_with_name(sc, folder_T, normalize, has_label)
# Build the test-set RDD and sanity-check the path/label list.
imFolder_T = "/home/spark/dataset/ct_scans_Test/"
localPath_T = read_local_path_T(imFolder_T)
print(len(localPath_T)) # total number of (path, label) pairs found
# Spot-check one randomly chosen path and its associated label.
rndmItm_T = np.random.choice(range(len(localPath_T)))
print("PATH..... {} \nLABEL.... {}".format(localPath_T[rndmItm_T][0],localPath_T[rndmItm_T][1]))
Test_RDD = read_local_with_name_T(sc, imFolder_T, normalize=255.0, has_label=True)
# Create a model
def build_model(class_num):
    """Build a 4-block CNN for 128x128 single-channel images.

    :param class_num: number of output units (2 here, paired with Sigmoid)
    :return: a BigDL Sequential model

    NOTE(review): the original reshaped the input to [1, 28, 28] (MNIST size)
    while the data pipeline resizes every image to 128x128 — a guaranteed
    size-mismatch at runtime — and the convolution arguments were scrambled
    (e.g. a 5x2 kernel on conv1, stride passed where kernel height belongs).
    Corrected to 5x5 kernels, stride 1, padding 2 ("same" padding), so the
    four 2x2 poolings take 128 -> 64 -> 32 -> 16 -> 8 and the flattened
    feature size is 8*8*256. Assumes grayscale input (1 channel) — confirm
    against what misc.imread actually returns for these CT scans.
    """
    model = Sequential()
    model.add(Reshape([1, 128, 128]))
    # SpatialConvolution(nIn, nOut, kW, kH, sW, sH, padW, padH)
    model.add(SpatialConvolution(1, 32, 5, 5, 1, 1, 2, 2).set_name('conv1'))
    model.add(ReLU())
    model.add(SpatialMaxPooling(2, 2, 2, 2).set_name('pool1'))  # 128 -> 64
    model.add(SpatialConvolution(32, 64, 5, 5, 1, 1, 2, 2).set_name('conv2'))
    model.add(ReLU())
    model.add(SpatialMaxPooling(2, 2, 2, 2).set_name('pool2'))  # 64 -> 32
    model.add(SpatialConvolution(64, 128, 5, 5, 1, 1, 2, 2).set_name('conv3'))
    model.add(ReLU())
    model.add(SpatialMaxPooling(2, 2, 2, 2).set_name('pool3'))  # 32 -> 16
    model.add(SpatialConvolution(128, 256, 5, 5, 1, 1, 2, 2).set_name('conv4'))
    model.add(ReLU())
    model.add(SpatialMaxPooling(2, 2, 2, 2).set_name('pool4'))  # 16 -> 8
    model.add(Reshape([8 * 8 * 256]))
    model.add(Dropout(0.5))
    model.add(Linear(8 * 8 * 256, 1000).set_name('fc1'))
    model.add(ReLU())
    model.add(Linear(1000, class_num).set_name('score'))
    # NOTE(review): Sigmoid + BCECriterion needs targets in [0, 1]; the labels
    # built by read_local_path are 1-based (1, 2). For 1-based class labels,
    # LogSoftMax + ClassNLLCriterion is the usual BigDL pairing — verify.
    model.add(Sigmoid())
    return model
# Instantiate the 2-class model and configure the distributed BigDL optimizer.
model_Train = build_model(2)
optimizer = Optimizer(
    model= model_Train,
    training_rdd=Train_RDD,
    # NOTE(review): BCECriterion expects targets in [0, 1], but the labels
    # produced by read_local_path are 1 and 2. For 1-based class labels,
    # ClassNLLCriterion (with a LogSoftMax output) is the usual choice — verify.
    criterion=BCECriterion(),
    # NOTE(review): learningrate=0.4 is very large for Adam (typical ~1e-3);
    # training is likely to diverge at this setting.
    optim_method=Adam(learningrate=0.4,learningrate_decay=0.0, beta1=0.9, beta2=0.999, epsilon=1e-8, bigdl_type="float"),
    end_trigger=MaxEpoch(7),
    batch_size=2048)
# Set the validation logic: top-1 accuracy on the test RDD at every epoch.
optimizer.set_validation(
    batch_size=2048,
    val_rdd=Test_RDD,
    trigger=EveryEpoch(),
    val_method=[Top1Accuracy()]
)
# Timestamped run name so summary logs from different runs do not collide.
app_name='-cNNModel-'+dt.datetime.now().strftime("%Y%m%d-%H%M%S")
train_summary = TrainSummary(log_dir='/tmp/bigdl_summaries',
                             app_name=app_name)
# Record parameter summaries only every 50 iterations (they are costly to log).
train_summary.set_summary_trigger("Parameters", SeveralIteration(50))
val_summary = ValidationSummary(log_dir='/tmp/bigdl_summaries',
                                app_name=app_name)
optimizer.set_train_summary(train_summary)
optimizer.set_val_summary(val_summary)
print("saving logs to ",app_name)
# Boot training process (blocks until the MaxEpoch(7) trigger fires).
trained_model = optimizer.optimize()
print("Optimization Done.")
This is the code I used. Could you give me some suggestions on what I am doing wrong and how I can solve this problem? Also, can I use Analytics Zoo to train a CNN model on my own dataset?
Best regards,
Vendaim