Training multiple networks on a multi-GPU system using tensorflow and multiprocessing
I am trying to train multiple CNNs on separate GPUs, like one net per GPU. As I want all the networks to train simultaneously, I am using multiprocessing to start the training at the same time.
I have been able to run it with the below given codes, however, the behaviour of the program is not deterministic. Sometimes, it throws errors 'can't pickle SwigpyObject' or 'can't pickle _thread.rlock object'. Also, somtimes it would get stuck on queue.get() or at self.map_async(func, iterable, chunksize).get(). I looked it up and found people suggesting to use pathos. I tried that but even then the behaviour is not repeatable.
I use tensorflow's dataset and estimator APIs. Also, I use Queue and Dict from multiprocessing manager (also tried multiprocess) to keep a track of available GPU and they are shared between processes.
This is my CNN code
import numpy as np
import os
import tensorflow as tf
import sys
class DNN():
def __init__(self,epochs = 5, batch_size = 64, learning_rate = 0.001, data_dir = None, verbose = False):
self._epochs = epochs
self._batch_size = batch_size
self._learning_rate = learning_rate
self._data_dir = data_dir
if verbose:
tf.logging.set_verbosity(tf.logging.INFO)
else:
tf.logging.set_verbosity(tf.logging.ERROR)
self._image_shape = [300,300,3]
def _build_net(self,inp):
with tf.name_scope('net'):
x = tf.layers.conv2d(inp, 3, 8, padding = 'same', name = 'conv1')
x = tf.layers.max_pooling2d(x, 3, strides = 2, name = 'pool1')
x = tf.layers.conv2d(x, 5, 8, padding = 'same', name = 'conv2')
x = tf.layers.max_pooling2d(x, 3, strides = 2, name = 'pool2')
x = tf.layers.conv2d(x, 5, 16, padding = 'same', name = 'conv3')
x = tf.layers.max_pooling2d(x, 3, strides = 2, name = 'pool3')
x = tf.layers.flatten(x)
x = tf.layers.dense(x, 50, name = 'dense1')
x = tf.layers.dense(x, self.number_of_classes, name='output')
return x
def _loss_fn(self,logits,labels,mode):
if mode == tf.estimator.ModeKeys.TRAIN:
loss_name = 'train_loss'
elif mode == tf.estimator.ModeKeys.EVAL:
loss_name = 'eval_loss'
with tf.name_scope(loss_name):
loss = tf.reduce_mean(tf.losses.softmax_cross_entropy(labels,logits))
return loss
def _opt_fn(self,loss):
with tf.name_scope('optimizer'):
opt = tf.train.AdamOptimizer(learning_rate=self._learning_rate).minimize(loss,global_step=tf.train.get_global_step())
return opt
def _acc(self,predictions,labels):
with tf.name_scope('acc'):
acc = tf.reduce_mean(tf.cast(tf.equal(predictions, tf.argmax(labels,-1)), tf.float32))
return acc
def _inp_fn(self,dataset,mode):
def input_parser(img_path, label):
# convert the label to one-hot encoding
one_hot = tf.one_hot(label, self.number_of_classes)
one_hot = tf.reshape(one_hot,[self.number_of_classes])
# read the img from file
img_file = tf.read_file(img_path)
img_decoded = tf.image.decode_jpeg(img_file, channels=self._image_shape[-1])
img_decoded = tf.image.resize_images(img_decoded,self._image_shape[:2])
img_decoded = tf.cast(img_decoded, tf.float32)
img_decoded = tf.reshape(img_decoded,self._image_shape)
return img_decoded, one_hot
dataset = tf.data.Dataset.from_tensor_slices(dataset)
if mode == tf.estimator.ModeKeys.TRAIN:
dataset = dataset.apply(tf.contrib.data.shuffle_and_repeat(self._batch_size,count = self._epochs, seed = tf.set_random_seed(123)))
dataset = dataset.apply(tf.contrib.data.map_and_batch(map_func = input_parser, batch_size = self._batch_size, num_parallel_batches = 4))
dataset = dataset.prefetch(1)
iterator = tf.data.Iterator.from_structure(dataset.output_types,dataset.output_shapes)
elif mode == tf.estimator.ModeKeys.PREDICT or tf.estimator.ModeKeys.EVAL:
dataset = dataset.apply(tf.contrib.data.shuffle_and_repeat(self._batch_size,count = 1, seed = tf.set_random_seed(123)))
dataset = dataset.apply(tf.contrib.data.map_and_batch(map_func = input_parser, batch_size = self._batch_size, num_parallel_batches = 4))
dataset = dataset.prefetch(1)
iterator = dataset.make_one_shot_iterator()
return iterator.get_next()
def _model_fn(self,features,labels,mode,params):
logits = self._build_net(features)
loss = self._loss_fn(logits,labels,mode)
predictions = {"classes": tf.argmax(input = tf.nn.softmax(logits),axis = -1),
"probabilities": tf.nn.softmax(logits,name = "softmax")
}
acc_op = self._acc(predictions = predictions["classes"], labels = labels)
if mode == tf.estimator.ModeKeys.TRAIN:
opt = self._opt_fn(loss)
tf.summary.scalar('train_accuracy', acc_op)
return tf.estimator.EstimatorSpec(mode = mode, loss = loss, train_op = opt)
elif mode == tf.estimator.ModeKeys.PREDICT:
return tf.estimator.EstimatorSpec(mode = mode, predictions = predictions)
elif mode == tf.estimator.ModeKeys.EVAL:
eval_acc = tf.metrics.accuracy(labels=tf.argmax(labels,-1),predictions=predictions["classes"])
eval_metric_ops = {"val_accuracy": eval_acc}
tf.summary.scalar('val_accuracy',acc_op)
return tf.estimator.EstimatorSpec(mode = mode, loss=loss, eval_metric_ops=eval_metric_ops)
def train(self, train_data, val_data, queue, dict):
x_train, y_train = train_data
x_eval, y_eval = val_data
run_config = tf.estimator.RunConfig(model_dir = os.path.join('logs','model_basic_cnn'), tf_random_seed = tf.set_random_seed(123), keep_checkpoint_max = 1, save_checkpoints_steps = 1000, save_checkpoints_secs = None)
self.clf = tf.estimator.Estimator(model_fn = self._model_fn, config=run_config)
if not os.path.isdir(self.clf.eval_dir()):
os.makedirs(self.clf.eval_dir())
gpu_id = queue.get()
os.environ('CUDA_VISIBLE_DEVICES') = gpu_id
with tf.device('gpu:'+ gpu_id):
out = tf.estimator.train_and_evaluate(
self.clf,
train_spec = tf.estimator.TrainSpec(input_fn = lambda: self._inp_fn((x_train,y_train),mode=tf.estimator.ModeKeys.TRAIN)),
eval_spec = tf.estimator.EvalSpec(input_fn = lambda: self._inp_fn((x_eval,y_eval),mode=tf.estimator.ModeKeys.EVAL))
)
print(f"Validation accuracy: {out[0]['accuracy']:.4f}")
queue.put(gpu_id)
return out[0]['accuracy']
The code for calling multiprocessing is as follows:
#from multiprocessing import Pool, Manager
from pathos.multiprocessing import ProcessPool as Pool
from multiprocess import Manager
import dill
def train_pp(dnn, dict, man, pool):
queue = man.Queue(dnn.NUM_GPUS)
[queue.put(i) for i in range(dnn.NUM_GPUS)]
all_acc = pool.map(dnn.train,[[(X_train,y_train),(X_eval,y_eval),queue,dict] for args])
#queue.join()
pool.close()
pool.join()
pool.clear()
pool.restart()
return all_acc, dict
if __name__ == "__main__":
dnn = DNN()
pool = Pool(processes=dnn.NUM_GPUS)
man = Manager()
dict = man.dict()
net_acc, dict = train_pp(dnn,dict,man,pool)
What is the best way to simultaneously train multiple networks?
What is the problem with multiprocessing that it shows random behaviour and how can it be solved?
PS: An extra, kind of off the topic question: how should the data sharing be done as all the networks are being trained on same data? Currently, I believe multiprocessing is making multiple copies of the DNN code and so each GPU has its own iterator. Is it the best way? How to have data on CPU and shared with all the processes without interlocks?
python tensorflow multiprocessing python-multiprocessing pathos
add a comment |
I am trying to train multiple CNNs on separate GPUs, like one net per GPU. As I want all the networks to train simultaneously, I am using multiprocessing to start the training at the same time.
I have been able to run it with the below given codes, however, the behaviour of the program is not deterministic. Sometimes, it throws errors 'can't pickle SwigpyObject' or 'can't pickle _thread.rlock object'. Also, somtimes it would get stuck on queue.get() or at self.map_async(func, iterable, chunksize).get(). I looked it up and found people suggesting to use pathos. I tried that but even then the behaviour is not repeatable.
I use tensorflow's dataset and estimator APIs. Also, I use Queue and Dict from multiprocessing manager (also tried multiprocess) to keep a track of available GPU and they are shared between processes.
This is my CNN code
import numpy as np
import os
import tensorflow as tf
import sys
class DNN():
def __init__(self,epochs = 5, batch_size = 64, learning_rate = 0.001, data_dir = None, verbose = False):
self._epochs = epochs
self._batch_size = batch_size
self._learning_rate = learning_rate
self._data_dir = data_dir
if verbose:
tf.logging.set_verbosity(tf.logging.INFO)
else:
tf.logging.set_verbosity(tf.logging.ERROR)
self._image_shape = [300,300,3]
def _build_net(self,inp):
with tf.name_scope('net'):
x = tf.layers.conv2d(inp, 3, 8, padding = 'same', name = 'conv1')
x = tf.layers.max_pooling2d(x, 3, strides = 2, name = 'pool1')
x = tf.layers.conv2d(x, 5, 8, padding = 'same', name = 'conv2')
x = tf.layers.max_pooling2d(x, 3, strides = 2, name = 'pool2')
x = tf.layers.conv2d(x, 5, 16, padding = 'same', name = 'conv3')
x = tf.layers.max_pooling2d(x, 3, strides = 2, name = 'pool3')
x = tf.layers.flatten(x)
x = tf.layers.dense(x, 50, name = 'dense1')
x = tf.layers.dense(x, self.number_of_classes, name='output')
return x
def _loss_fn(self,logits,labels,mode):
if mode == tf.estimator.ModeKeys.TRAIN:
loss_name = 'train_loss'
elif mode == tf.estimator.ModeKeys.EVAL:
loss_name = 'eval_loss'
with tf.name_scope(loss_name):
loss = tf.reduce_mean(tf.losses.softmax_cross_entropy(labels,logits))
return loss
def _opt_fn(self,loss):
with tf.name_scope('optimizer'):
opt = tf.train.AdamOptimizer(learning_rate=self._learning_rate).minimize(loss,global_step=tf.train.get_global_step())
return opt
def _acc(self,predictions,labels):
with tf.name_scope('acc'):
acc = tf.reduce_mean(tf.cast(tf.equal(predictions, tf.argmax(labels,-1)), tf.float32))
return acc
def _inp_fn(self,dataset,mode):
def input_parser(img_path, label):
# convert the label to one-hot encoding
one_hot = tf.one_hot(label, self.number_of_classes)
one_hot = tf.reshape(one_hot,[self.number_of_classes])
# read the img from file
img_file = tf.read_file(img_path)
img_decoded = tf.image.decode_jpeg(img_file, channels=self._image_shape[-1])
img_decoded = tf.image.resize_images(img_decoded,self._image_shape[:2])
img_decoded = tf.cast(img_decoded, tf.float32)
img_decoded = tf.reshape(img_decoded,self._image_shape)
return img_decoded, one_hot
dataset = tf.data.Dataset.from_tensor_slices(dataset)
if mode == tf.estimator.ModeKeys.TRAIN:
dataset = dataset.apply(tf.contrib.data.shuffle_and_repeat(self._batch_size,count = self._epochs, seed = tf.set_random_seed(123)))
dataset = dataset.apply(tf.contrib.data.map_and_batch(map_func = input_parser, batch_size = self._batch_size, num_parallel_batches = 4))
dataset = dataset.prefetch(1)
iterator = tf.data.Iterator.from_structure(dataset.output_types,dataset.output_shapes)
elif mode == tf.estimator.ModeKeys.PREDICT or tf.estimator.ModeKeys.EVAL:
dataset = dataset.apply(tf.contrib.data.shuffle_and_repeat(self._batch_size,count = 1, seed = tf.set_random_seed(123)))
dataset = dataset.apply(tf.contrib.data.map_and_batch(map_func = input_parser, batch_size = self._batch_size, num_parallel_batches = 4))
dataset = dataset.prefetch(1)
iterator = dataset.make_one_shot_iterator()
return iterator.get_next()
def _model_fn(self,features,labels,mode,params):
logits = self._build_net(features)
loss = self._loss_fn(logits,labels,mode)
predictions = {"classes": tf.argmax(input = tf.nn.softmax(logits),axis = -1),
"probabilities": tf.nn.softmax(logits,name = "softmax")
}
acc_op = self._acc(predictions = predictions["classes"], labels = labels)
if mode == tf.estimator.ModeKeys.TRAIN:
opt = self._opt_fn(loss)
tf.summary.scalar('train_accuracy', acc_op)
return tf.estimator.EstimatorSpec(mode = mode, loss = loss, train_op = opt)
elif mode == tf.estimator.ModeKeys.PREDICT:
return tf.estimator.EstimatorSpec(mode = mode, predictions = predictions)
elif mode == tf.estimator.ModeKeys.EVAL:
eval_acc = tf.metrics.accuracy(labels=tf.argmax(labels,-1),predictions=predictions["classes"])
eval_metric_ops = {"val_accuracy": eval_acc}
tf.summary.scalar('val_accuracy',acc_op)
return tf.estimator.EstimatorSpec(mode = mode, loss=loss, eval_metric_ops=eval_metric_ops)
def train(self, train_data, val_data, queue, dict):
x_train, y_train = train_data
x_eval, y_eval = val_data
run_config = tf.estimator.RunConfig(model_dir = os.path.join('logs','model_basic_cnn'), tf_random_seed = tf.set_random_seed(123), keep_checkpoint_max = 1, save_checkpoints_steps = 1000, save_checkpoints_secs = None)
self.clf = tf.estimator.Estimator(model_fn = self._model_fn, config=run_config)
if not os.path.isdir(self.clf.eval_dir()):
os.makedirs(self.clf.eval_dir())
gpu_id = queue.get()
os.environ('CUDA_VISIBLE_DEVICES') = gpu_id
with tf.device('gpu:'+ gpu_id):
out = tf.estimator.train_and_evaluate(
self.clf,
train_spec = tf.estimator.TrainSpec(input_fn = lambda: self._inp_fn((x_train,y_train),mode=tf.estimator.ModeKeys.TRAIN)),
eval_spec = tf.estimator.EvalSpec(input_fn = lambda: self._inp_fn((x_eval,y_eval),mode=tf.estimator.ModeKeys.EVAL))
)
print(f"Validation accuracy: {out[0]['accuracy']:.4f}")
queue.put(gpu_id)
return out[0]['accuracy']
The code for calling multiprocessing is as follows:
#from multiprocessing import Pool, Manager
from pathos.multiprocessing import ProcessPool as Pool
from multiprocess import Manager
import dill
def train_pp(dnn, dict, man, pool):
queue = man.Queue(dnn.NUM_GPUS)
[queue.put(i) for i in range(dnn.NUM_GPUS)]
all_acc = pool.map(dnn.train,[[(X_train,y_train),(X_eval,y_eval),queue,dict] for args])
#queue.join()
pool.close()
pool.join()
pool.clear()
pool.restart()
return all_acc, dict
if __name__ == "__main__":
dnn = DNN()
pool = Pool(processes=dnn.NUM_GPUS)
man = Manager()
dict = man.dict()
net_acc, dict = train_pp(dnn,dict,man,pool)
What is the best way to simultaneously train multiple networks?
What is the problem with multiprocessing that it shows random behaviour and how can it be solved?
PS: An extra, kind of off the topic question: how should the data sharing be done as all the networks are being trained on same data? Currently, I believe multiprocessing is making multiple copies of the DNN code and so each GPU has its own iterator. Is it the best way? How to have data on CPU and shared with all the processes without interlocks?
python tensorflow multiprocessing python-multiprocessing pathos
add a comment |
I am trying to train multiple CNNs on separate GPUs, like one net per GPU. As I want all the networks to train simultaneously, I am using multiprocessing to start the training at the same time.
I have been able to run it with the below given codes, however, the behaviour of the program is not deterministic. Sometimes, it throws errors 'can't pickle SwigpyObject' or 'can't pickle _thread.rlock object'. Also, somtimes it would get stuck on queue.get() or at self.map_async(func, iterable, chunksize).get(). I looked it up and found people suggesting to use pathos. I tried that but even then the behaviour is not repeatable.
I use tensorflow's dataset and estimator APIs. Also, I use Queue and Dict from multiprocessing manager (also tried multiprocess) to keep a track of available GPU and they are shared between processes.
This is my CNN code
import numpy as np
import os
import tensorflow as tf
import sys
class DNN():
def __init__(self,epochs = 5, batch_size = 64, learning_rate = 0.001, data_dir = None, verbose = False):
self._epochs = epochs
self._batch_size = batch_size
self._learning_rate = learning_rate
self._data_dir = data_dir
if verbose:
tf.logging.set_verbosity(tf.logging.INFO)
else:
tf.logging.set_verbosity(tf.logging.ERROR)
self._image_shape = [300,300,3]
def _build_net(self,inp):
with tf.name_scope('net'):
x = tf.layers.conv2d(inp, 3, 8, padding = 'same', name = 'conv1')
x = tf.layers.max_pooling2d(x, 3, strides = 2, name = 'pool1')
x = tf.layers.conv2d(x, 5, 8, padding = 'same', name = 'conv2')
x = tf.layers.max_pooling2d(x, 3, strides = 2, name = 'pool2')
x = tf.layers.conv2d(x, 5, 16, padding = 'same', name = 'conv3')
x = tf.layers.max_pooling2d(x, 3, strides = 2, name = 'pool3')
x = tf.layers.flatten(x)
x = tf.layers.dense(x, 50, name = 'dense1')
x = tf.layers.dense(x, self.number_of_classes, name='output')
return x
def _loss_fn(self,logits,labels,mode):
if mode == tf.estimator.ModeKeys.TRAIN:
loss_name = 'train_loss'
elif mode == tf.estimator.ModeKeys.EVAL:
loss_name = 'eval_loss'
with tf.name_scope(loss_name):
loss = tf.reduce_mean(tf.losses.softmax_cross_entropy(labels,logits))
return loss
def _opt_fn(self,loss):
with tf.name_scope('optimizer'):
opt = tf.train.AdamOptimizer(learning_rate=self._learning_rate).minimize(loss,global_step=tf.train.get_global_step())
return opt
def _acc(self,predictions,labels):
with tf.name_scope('acc'):
acc = tf.reduce_mean(tf.cast(tf.equal(predictions, tf.argmax(labels,-1)), tf.float32))
return acc
def _inp_fn(self,dataset,mode):
def input_parser(img_path, label):
# convert the label to one-hot encoding
one_hot = tf.one_hot(label, self.number_of_classes)
one_hot = tf.reshape(one_hot,[self.number_of_classes])
# read the img from file
img_file = tf.read_file(img_path)
img_decoded = tf.image.decode_jpeg(img_file, channels=self._image_shape[-1])
img_decoded = tf.image.resize_images(img_decoded,self._image_shape[:2])
img_decoded = tf.cast(img_decoded, tf.float32)
img_decoded = tf.reshape(img_decoded,self._image_shape)
return img_decoded, one_hot
dataset = tf.data.Dataset.from_tensor_slices(dataset)
if mode == tf.estimator.ModeKeys.TRAIN:
dataset = dataset.apply(tf.contrib.data.shuffle_and_repeat(self._batch_size,count = self._epochs, seed = tf.set_random_seed(123)))
dataset = dataset.apply(tf.contrib.data.map_and_batch(map_func = input_parser, batch_size = self._batch_size, num_parallel_batches = 4))
dataset = dataset.prefetch(1)
iterator = tf.data.Iterator.from_structure(dataset.output_types,dataset.output_shapes)
elif mode == tf.estimator.ModeKeys.PREDICT or tf.estimator.ModeKeys.EVAL:
dataset = dataset.apply(tf.contrib.data.shuffle_and_repeat(self._batch_size,count = 1, seed = tf.set_random_seed(123)))
dataset = dataset.apply(tf.contrib.data.map_and_batch(map_func = input_parser, batch_size = self._batch_size, num_parallel_batches = 4))
dataset = dataset.prefetch(1)
iterator = dataset.make_one_shot_iterator()
return iterator.get_next()
def _model_fn(self,features,labels,mode,params):
logits = self._build_net(features)
loss = self._loss_fn(logits,labels,mode)
predictions = {"classes": tf.argmax(input = tf.nn.softmax(logits),axis = -1),
"probabilities": tf.nn.softmax(logits,name = "softmax")
}
acc_op = self._acc(predictions = predictions["classes"], labels = labels)
if mode == tf.estimator.ModeKeys.TRAIN:
opt = self._opt_fn(loss)
tf.summary.scalar('train_accuracy', acc_op)
return tf.estimator.EstimatorSpec(mode = mode, loss = loss, train_op = opt)
elif mode == tf.estimator.ModeKeys.PREDICT:
return tf.estimator.EstimatorSpec(mode = mode, predictions = predictions)
elif mode == tf.estimator.ModeKeys.EVAL:
eval_acc = tf.metrics.accuracy(labels=tf.argmax(labels,-1),predictions=predictions["classes"])
eval_metric_ops = {"val_accuracy": eval_acc}
tf.summary.scalar('val_accuracy',acc_op)
return tf.estimator.EstimatorSpec(mode = mode, loss=loss, eval_metric_ops=eval_metric_ops)
def train(self, train_data, val_data, queue, dict):
x_train, y_train = train_data
x_eval, y_eval = val_data
run_config = tf.estimator.RunConfig(model_dir = os.path.join('logs','model_basic_cnn'), tf_random_seed = tf.set_random_seed(123), keep_checkpoint_max = 1, save_checkpoints_steps = 1000, save_checkpoints_secs = None)
self.clf = tf.estimator.Estimator(model_fn = self._model_fn, config=run_config)
if not os.path.isdir(self.clf.eval_dir()):
os.makedirs(self.clf.eval_dir())
gpu_id = queue.get()
os.environ('CUDA_VISIBLE_DEVICES') = gpu_id
with tf.device('gpu:'+ gpu_id):
out = tf.estimator.train_and_evaluate(
self.clf,
train_spec = tf.estimator.TrainSpec(input_fn = lambda: self._inp_fn((x_train,y_train),mode=tf.estimator.ModeKeys.TRAIN)),
eval_spec = tf.estimator.EvalSpec(input_fn = lambda: self._inp_fn((x_eval,y_eval),mode=tf.estimator.ModeKeys.EVAL))
)
print(f"Validation accuracy: {out[0]['accuracy']:.4f}")
queue.put(gpu_id)
return out[0]['accuracy']
The code for calling multiprocessing is as follows:
#from multiprocessing import Pool, Manager
from pathos.multiprocessing import ProcessPool as Pool
from multiprocess import Manager
import dill
def train_pp(dnn, dict, man, pool):
queue = man.Queue(dnn.NUM_GPUS)
[queue.put(i) for i in range(dnn.NUM_GPUS)]
all_acc = pool.map(dnn.train,[[(X_train,y_train),(X_eval,y_eval),queue,dict] for args])
#queue.join()
pool.close()
pool.join()
pool.clear()
pool.restart()
return all_acc, dict
if __name__ == "__main__":
dnn = DNN()
pool = Pool(processes=dnn.NUM_GPUS)
man = Manager()
dict = man.dict()
net_acc, dict = train_pp(dnn,dict,man,pool)
What is the best way to simultaneously train multiple networks?
What is the problem with multiprocessing that it shows random behaviour and how can it be solved?
PS: An extra, kind of off the topic question: how should the data sharing be done as all the networks are being trained on same data? Currently, I believe multiprocessing is making multiple copies of the DNN code and so each GPU has its own iterator. Is it the best way? How to have data on CPU and shared with all the processes without interlocks?
python tensorflow multiprocessing python-multiprocessing pathos
I am trying to train multiple CNNs on separate GPUs, like one net per GPU. As I want all the networks to train simultaneously, I am using multiprocessing to start the training at the same time.
I have been able to run it with the below given codes, however, the behaviour of the program is not deterministic. Sometimes, it throws errors 'can't pickle SwigpyObject' or 'can't pickle _thread.rlock object'. Also, somtimes it would get stuck on queue.get() or at self.map_async(func, iterable, chunksize).get(). I looked it up and found people suggesting to use pathos. I tried that but even then the behaviour is not repeatable.
I use tensorflow's dataset and estimator APIs. Also, I use Queue and Dict from multiprocessing manager (also tried multiprocess) to keep a track of available GPU and they are shared between processes.
This is my CNN code
import numpy as np
import os
import tensorflow as tf
import sys
class DNN():
def __init__(self,epochs = 5, batch_size = 64, learning_rate = 0.001, data_dir = None, verbose = False):
self._epochs = epochs
self._batch_size = batch_size
self._learning_rate = learning_rate
self._data_dir = data_dir
if verbose:
tf.logging.set_verbosity(tf.logging.INFO)
else:
tf.logging.set_verbosity(tf.logging.ERROR)
self._image_shape = [300,300,3]
def _build_net(self,inp):
with tf.name_scope('net'):
x = tf.layers.conv2d(inp, 3, 8, padding = 'same', name = 'conv1')
x = tf.layers.max_pooling2d(x, 3, strides = 2, name = 'pool1')
x = tf.layers.conv2d(x, 5, 8, padding = 'same', name = 'conv2')
x = tf.layers.max_pooling2d(x, 3, strides = 2, name = 'pool2')
x = tf.layers.conv2d(x, 5, 16, padding = 'same', name = 'conv3')
x = tf.layers.max_pooling2d(x, 3, strides = 2, name = 'pool3')
x = tf.layers.flatten(x)
x = tf.layers.dense(x, 50, name = 'dense1')
x = tf.layers.dense(x, self.number_of_classes, name='output')
return x
def _loss_fn(self,logits,labels,mode):
if mode == tf.estimator.ModeKeys.TRAIN:
loss_name = 'train_loss'
elif mode == tf.estimator.ModeKeys.EVAL:
loss_name = 'eval_loss'
with tf.name_scope(loss_name):
loss = tf.reduce_mean(tf.losses.softmax_cross_entropy(labels,logits))
return loss
def _opt_fn(self,loss):
with tf.name_scope('optimizer'):
opt = tf.train.AdamOptimizer(learning_rate=self._learning_rate).minimize(loss,global_step=tf.train.get_global_step())
return opt
def _acc(self,predictions,labels):
with tf.name_scope('acc'):
acc = tf.reduce_mean(tf.cast(tf.equal(predictions, tf.argmax(labels,-1)), tf.float32))
return acc
def _inp_fn(self,dataset,mode):
def input_parser(img_path, label):
# convert the label to one-hot encoding
one_hot = tf.one_hot(label, self.number_of_classes)
one_hot = tf.reshape(one_hot,[self.number_of_classes])
# read the img from file
img_file = tf.read_file(img_path)
img_decoded = tf.image.decode_jpeg(img_file, channels=self._image_shape[-1])
img_decoded = tf.image.resize_images(img_decoded,self._image_shape[:2])
img_decoded = tf.cast(img_decoded, tf.float32)
img_decoded = tf.reshape(img_decoded,self._image_shape)
return img_decoded, one_hot
dataset = tf.data.Dataset.from_tensor_slices(dataset)
if mode == tf.estimator.ModeKeys.TRAIN:
dataset = dataset.apply(tf.contrib.data.shuffle_and_repeat(self._batch_size,count = self._epochs, seed = tf.set_random_seed(123)))
dataset = dataset.apply(tf.contrib.data.map_and_batch(map_func = input_parser, batch_size = self._batch_size, num_parallel_batches = 4))
dataset = dataset.prefetch(1)
iterator = tf.data.Iterator.from_structure(dataset.output_types,dataset.output_shapes)
elif mode == tf.estimator.ModeKeys.PREDICT or tf.estimator.ModeKeys.EVAL:
dataset = dataset.apply(tf.contrib.data.shuffle_and_repeat(self._batch_size,count = 1, seed = tf.set_random_seed(123)))
dataset = dataset.apply(tf.contrib.data.map_and_batch(map_func = input_parser, batch_size = self._batch_size, num_parallel_batches = 4))
dataset = dataset.prefetch(1)
iterator = dataset.make_one_shot_iterator()
return iterator.get_next()
def _model_fn(self,features,labels,mode,params):
logits = self._build_net(features)
loss = self._loss_fn(logits,labels,mode)
predictions = {"classes": tf.argmax(input = tf.nn.softmax(logits),axis = -1),
"probabilities": tf.nn.softmax(logits,name = "softmax")
}
acc_op = self._acc(predictions = predictions["classes"], labels = labels)
if mode == tf.estimator.ModeKeys.TRAIN:
opt = self._opt_fn(loss)
tf.summary.scalar('train_accuracy', acc_op)
return tf.estimator.EstimatorSpec(mode = mode, loss = loss, train_op = opt)
elif mode == tf.estimator.ModeKeys.PREDICT:
return tf.estimator.EstimatorSpec(mode = mode, predictions = predictions)
elif mode == tf.estimator.ModeKeys.EVAL:
eval_acc = tf.metrics.accuracy(labels=tf.argmax(labels,-1),predictions=predictions["classes"])
eval_metric_ops = {"val_accuracy": eval_acc}
tf.summary.scalar('val_accuracy',acc_op)
return tf.estimator.EstimatorSpec(mode = mode, loss=loss, eval_metric_ops=eval_metric_ops)
def train(self, train_data, val_data, queue, dict):
x_train, y_train = train_data
x_eval, y_eval = val_data
run_config = tf.estimator.RunConfig(model_dir = os.path.join('logs','model_basic_cnn'), tf_random_seed = tf.set_random_seed(123), keep_checkpoint_max = 1, save_checkpoints_steps = 1000, save_checkpoints_secs = None)
self.clf = tf.estimator.Estimator(model_fn = self._model_fn, config=run_config)
if not os.path.isdir(self.clf.eval_dir()):
os.makedirs(self.clf.eval_dir())
gpu_id = queue.get()
os.environ('CUDA_VISIBLE_DEVICES') = gpu_id
with tf.device('gpu:'+ gpu_id):
out = tf.estimator.train_and_evaluate(
self.clf,
train_spec = tf.estimator.TrainSpec(input_fn = lambda: self._inp_fn((x_train,y_train),mode=tf.estimator.ModeKeys.TRAIN)),
eval_spec = tf.estimator.EvalSpec(input_fn = lambda: self._inp_fn((x_eval,y_eval),mode=tf.estimator.ModeKeys.EVAL))
)
print(f"Validation accuracy: {out[0]['accuracy']:.4f}")
queue.put(gpu_id)
return out[0]['accuracy']
The code for calling multiprocessing is as follows:
#from multiprocessing import Pool, Manager
from pathos.multiprocessing import ProcessPool as Pool
from multiprocess import Manager
import dill
def train_pp(dnn, dict, man, pool):
queue = man.Queue(dnn.NUM_GPUS)
[queue.put(i) for i in range(dnn.NUM_GPUS)]
all_acc = pool.map(dnn.train,[[(X_train,y_train),(X_eval,y_eval),queue,dict] for args])
#queue.join()
pool.close()
pool.join()
pool.clear()
pool.restart()
return all_acc, dict
if __name__ == "__main__":
dnn = DNN()
pool = Pool(processes=dnn.NUM_GPUS)
man = Manager()
dict = man.dict()
net_acc, dict = train_pp(dnn,dict,man,pool)
What is the best way to simultaneously train multiple networks?
What is the problem with multiprocessing that it shows random behaviour and how can it be solved?
PS: An extra, kind of off the topic question: how should the data sharing be done as all the networks are being trained on same data? Currently, I believe multiprocessing is making multiple copies of the DNN code and so each GPU has its own iterator. Is it the best way? How to have data on CPU and shared with all the processes without interlocks?
python tensorflow multiprocessing python-multiprocessing pathos
python tensorflow multiprocessing python-multiprocessing pathos
asked Nov 26 '18 at 3:26
AnandAnand
12
12
add a comment |
add a comment |
0
active
oldest
votes
Your Answer
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53474419%2ftraining-multiple-networks-on-a-multi-gpu-system-using-tensorflow-and-multiproce%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
0
active
oldest
votes
0
active
oldest
votes
active
oldest
votes
active
oldest
votes
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53474419%2ftraining-multiple-networks-on-a-multi-gpu-system-using-tensorflow-and-multiproce%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown