Training multiple networks on a multi-GPU system using tensorflow and multiprocessing












0















I am trying to train multiple CNNs on separate GPUs, like one net per GPU. As I want all the networks to train simultaneously, I am using multiprocessing to start the training at the same time.



I have been able to run it with the below given codes, however, the behaviour of the program is not deterministic. Sometimes, it throws errors 'can't pickle SwigpyObject' or 'can't pickle _thread.rlock object'. Also, somtimes it would get stuck on queue.get() or at self.map_async(func, iterable, chunksize).get(). I looked it up and found people suggesting to use pathos. I tried that but even then the behaviour is not repeatable.



I use tensorflow's dataset and estimator APIs. Also, I use Queue and Dict from multiprocessing manager (also tried multiprocess) to keep a track of available GPU and they are shared between processes.



This is my CNN code



import numpy as np
import os
import tensorflow as tf
import sys

class DNN():

def __init__(self,epochs = 5, batch_size = 64, learning_rate = 0.001, data_dir = None, verbose = False):
self._epochs = epochs
self._batch_size = batch_size
self._learning_rate = learning_rate
self._data_dir = data_dir

if verbose:
tf.logging.set_verbosity(tf.logging.INFO)
else:
tf.logging.set_verbosity(tf.logging.ERROR)

self._image_shape = [300,300,3]

def _build_net(self,inp):
with tf.name_scope('net'):
x = tf.layers.conv2d(inp, 3, 8, padding = 'same', name = 'conv1')
x = tf.layers.max_pooling2d(x, 3, strides = 2, name = 'pool1')
x = tf.layers.conv2d(x, 5, 8, padding = 'same', name = 'conv2')
x = tf.layers.max_pooling2d(x, 3, strides = 2, name = 'pool2')
x = tf.layers.conv2d(x, 5, 16, padding = 'same', name = 'conv3')
x = tf.layers.max_pooling2d(x, 3, strides = 2, name = 'pool3')
x = tf.layers.flatten(x)
x = tf.layers.dense(x, 50, name = 'dense1')
x = tf.layers.dense(x, self.number_of_classes, name='output')
return x

def _loss_fn(self,logits,labels,mode):
if mode == tf.estimator.ModeKeys.TRAIN:
loss_name = 'train_loss'
elif mode == tf.estimator.ModeKeys.EVAL:
loss_name = 'eval_loss'
with tf.name_scope(loss_name):
loss = tf.reduce_mean(tf.losses.softmax_cross_entropy(labels,logits))
return loss

def _opt_fn(self,loss):
with tf.name_scope('optimizer'):
opt = tf.train.AdamOptimizer(learning_rate=self._learning_rate).minimize(loss,global_step=tf.train.get_global_step())
return opt

def _acc(self,predictions,labels):
with tf.name_scope('acc'):
acc = tf.reduce_mean(tf.cast(tf.equal(predictions, tf.argmax(labels,-1)), tf.float32))
return acc

def _inp_fn(self,dataset,mode):

def input_parser(img_path, label):

# convert the label to one-hot encoding
one_hot = tf.one_hot(label, self.number_of_classes)
one_hot = tf.reshape(one_hot,[self.number_of_classes])

# read the img from file
img_file = tf.read_file(img_path)
img_decoded = tf.image.decode_jpeg(img_file, channels=self._image_shape[-1])
img_decoded = tf.image.resize_images(img_decoded,self._image_shape[:2])
img_decoded = tf.cast(img_decoded, tf.float32)
img_decoded = tf.reshape(img_decoded,self._image_shape)
return img_decoded, one_hot

dataset = tf.data.Dataset.from_tensor_slices(dataset)

if mode == tf.estimator.ModeKeys.TRAIN:
dataset = dataset.apply(tf.contrib.data.shuffle_and_repeat(self._batch_size,count = self._epochs, seed = tf.set_random_seed(123)))
dataset = dataset.apply(tf.contrib.data.map_and_batch(map_func = input_parser, batch_size = self._batch_size, num_parallel_batches = 4))
dataset = dataset.prefetch(1)
iterator = tf.data.Iterator.from_structure(dataset.output_types,dataset.output_shapes)

elif mode == tf.estimator.ModeKeys.PREDICT or tf.estimator.ModeKeys.EVAL:
dataset = dataset.apply(tf.contrib.data.shuffle_and_repeat(self._batch_size,count = 1, seed = tf.set_random_seed(123)))
dataset = dataset.apply(tf.contrib.data.map_and_batch(map_func = input_parser, batch_size = self._batch_size, num_parallel_batches = 4))
dataset = dataset.prefetch(1)

iterator = dataset.make_one_shot_iterator()
return iterator.get_next()

def _model_fn(self,features,labels,mode,params):
logits = self._build_net(features)
loss = self._loss_fn(logits,labels,mode)
predictions = {"classes": tf.argmax(input = tf.nn.softmax(logits),axis = -1),
"probabilities": tf.nn.softmax(logits,name = "softmax")
}
acc_op = self._acc(predictions = predictions["classes"], labels = labels)
if mode == tf.estimator.ModeKeys.TRAIN:
opt = self._opt_fn(loss)
tf.summary.scalar('train_accuracy', acc_op)
return tf.estimator.EstimatorSpec(mode = mode, loss = loss, train_op = opt)

elif mode == tf.estimator.ModeKeys.PREDICT:
return tf.estimator.EstimatorSpec(mode = mode, predictions = predictions)

elif mode == tf.estimator.ModeKeys.EVAL:
eval_acc = tf.metrics.accuracy(labels=tf.argmax(labels,-1),predictions=predictions["classes"])
eval_metric_ops = {"val_accuracy": eval_acc}
tf.summary.scalar('val_accuracy',acc_op)
return tf.estimator.EstimatorSpec(mode = mode, loss=loss, eval_metric_ops=eval_metric_ops)

def train(self, train_data, val_data, queue, dict):
x_train, y_train = train_data
x_eval, y_eval = val_data

run_config = tf.estimator.RunConfig(model_dir = os.path.join('logs','model_basic_cnn'), tf_random_seed = tf.set_random_seed(123), keep_checkpoint_max = 1, save_checkpoints_steps = 1000, save_checkpoints_secs = None)
self.clf = tf.estimator.Estimator(model_fn = self._model_fn, config=run_config)

if not os.path.isdir(self.clf.eval_dir()):
os.makedirs(self.clf.eval_dir())

gpu_id = queue.get()
os.environ('CUDA_VISIBLE_DEVICES') = gpu_id
with tf.device('gpu:'+ gpu_id):
out = tf.estimator.train_and_evaluate(
self.clf,
train_spec = tf.estimator.TrainSpec(input_fn = lambda: self._inp_fn((x_train,y_train),mode=tf.estimator.ModeKeys.TRAIN)),
eval_spec = tf.estimator.EvalSpec(input_fn = lambda: self._inp_fn((x_eval,y_eval),mode=tf.estimator.ModeKeys.EVAL))
)
print(f"Validation accuracy: {out[0]['accuracy']:.4f}")
queue.put(gpu_id)
return out[0]['accuracy']


The code for calling multiprocessing is as follows:



#from multiprocessing import Pool, Manager
from pathos.multiprocessing import ProcessPool as Pool
from multiprocess import Manager
import dill

def train_pp(dnn, dict, man, pool):
queue = man.Queue(dnn.NUM_GPUS)
[queue.put(i) for i in range(dnn.NUM_GPUS)]
all_acc = pool.map(dnn.train,[[(X_train,y_train),(X_eval,y_eval),queue,dict] for args])
#queue.join()
pool.close()
pool.join()
pool.clear()
pool.restart()
return all_acc, dict

if __name__ == "__main__":
dnn = DNN()
pool = Pool(processes=dnn.NUM_GPUS)
man = Manager()
dict = man.dict()
net_acc, dict = train_pp(dnn,dict,man,pool)


What is the best way to simultaneously train multiple networks?
What is the problem with multiprocessing that it shows random behaviour and how can it be solved?



PS: An extra, kind of off the topic question: how should the data sharing be done as all the networks are being trained on same data? Currently, I believe multiprocessing is making multiple copies of the DNN code and so each GPU has its own iterator. Is it the best way? How to have data on CPU and shared with all the processes without interlocks?










share|improve this question



























    0















    I am trying to train multiple CNNs on separate GPUs, like one net per GPU. As I want all the networks to train simultaneously, I am using multiprocessing to start the training at the same time.



    I have been able to run it with the below given codes, however, the behaviour of the program is not deterministic. Sometimes, it throws errors 'can't pickle SwigpyObject' or 'can't pickle _thread.rlock object'. Also, somtimes it would get stuck on queue.get() or at self.map_async(func, iterable, chunksize).get(). I looked it up and found people suggesting to use pathos. I tried that but even then the behaviour is not repeatable.



    I use tensorflow's dataset and estimator APIs. Also, I use Queue and Dict from multiprocessing manager (also tried multiprocess) to keep a track of available GPU and they are shared between processes.



    This is my CNN code



    import numpy as np
    import os
    import tensorflow as tf
    import sys

    class DNN():

    def __init__(self,epochs = 5, batch_size = 64, learning_rate = 0.001, data_dir = None, verbose = False):
    self._epochs = epochs
    self._batch_size = batch_size
    self._learning_rate = learning_rate
    self._data_dir = data_dir

    if verbose:
    tf.logging.set_verbosity(tf.logging.INFO)
    else:
    tf.logging.set_verbosity(tf.logging.ERROR)

    self._image_shape = [300,300,3]

    def _build_net(self,inp):
    with tf.name_scope('net'):
    x = tf.layers.conv2d(inp, 3, 8, padding = 'same', name = 'conv1')
    x = tf.layers.max_pooling2d(x, 3, strides = 2, name = 'pool1')
    x = tf.layers.conv2d(x, 5, 8, padding = 'same', name = 'conv2')
    x = tf.layers.max_pooling2d(x, 3, strides = 2, name = 'pool2')
    x = tf.layers.conv2d(x, 5, 16, padding = 'same', name = 'conv3')
    x = tf.layers.max_pooling2d(x, 3, strides = 2, name = 'pool3')
    x = tf.layers.flatten(x)
    x = tf.layers.dense(x, 50, name = 'dense1')
    x = tf.layers.dense(x, self.number_of_classes, name='output')
    return x

    def _loss_fn(self,logits,labels,mode):
    if mode == tf.estimator.ModeKeys.TRAIN:
    loss_name = 'train_loss'
    elif mode == tf.estimator.ModeKeys.EVAL:
    loss_name = 'eval_loss'
    with tf.name_scope(loss_name):
    loss = tf.reduce_mean(tf.losses.softmax_cross_entropy(labels,logits))
    return loss

    def _opt_fn(self,loss):
    with tf.name_scope('optimizer'):
    opt = tf.train.AdamOptimizer(learning_rate=self._learning_rate).minimize(loss,global_step=tf.train.get_global_step())
    return opt

    def _acc(self,predictions,labels):
    with tf.name_scope('acc'):
    acc = tf.reduce_mean(tf.cast(tf.equal(predictions, tf.argmax(labels,-1)), tf.float32))
    return acc

    def _inp_fn(self,dataset,mode):

    def input_parser(img_path, label):

    # convert the label to one-hot encoding
    one_hot = tf.one_hot(label, self.number_of_classes)
    one_hot = tf.reshape(one_hot,[self.number_of_classes])

    # read the img from file
    img_file = tf.read_file(img_path)
    img_decoded = tf.image.decode_jpeg(img_file, channels=self._image_shape[-1])
    img_decoded = tf.image.resize_images(img_decoded,self._image_shape[:2])
    img_decoded = tf.cast(img_decoded, tf.float32)
    img_decoded = tf.reshape(img_decoded,self._image_shape)
    return img_decoded, one_hot

    dataset = tf.data.Dataset.from_tensor_slices(dataset)

    if mode == tf.estimator.ModeKeys.TRAIN:
    dataset = dataset.apply(tf.contrib.data.shuffle_and_repeat(self._batch_size,count = self._epochs, seed = tf.set_random_seed(123)))
    dataset = dataset.apply(tf.contrib.data.map_and_batch(map_func = input_parser, batch_size = self._batch_size, num_parallel_batches = 4))
    dataset = dataset.prefetch(1)
    iterator = tf.data.Iterator.from_structure(dataset.output_types,dataset.output_shapes)

    elif mode == tf.estimator.ModeKeys.PREDICT or tf.estimator.ModeKeys.EVAL:
    dataset = dataset.apply(tf.contrib.data.shuffle_and_repeat(self._batch_size,count = 1, seed = tf.set_random_seed(123)))
    dataset = dataset.apply(tf.contrib.data.map_and_batch(map_func = input_parser, batch_size = self._batch_size, num_parallel_batches = 4))
    dataset = dataset.prefetch(1)

    iterator = dataset.make_one_shot_iterator()
    return iterator.get_next()

    def _model_fn(self,features,labels,mode,params):
    logits = self._build_net(features)
    loss = self._loss_fn(logits,labels,mode)
    predictions = {"classes": tf.argmax(input = tf.nn.softmax(logits),axis = -1),
    "probabilities": tf.nn.softmax(logits,name = "softmax")
    }
    acc_op = self._acc(predictions = predictions["classes"], labels = labels)
    if mode == tf.estimator.ModeKeys.TRAIN:
    opt = self._opt_fn(loss)
    tf.summary.scalar('train_accuracy', acc_op)
    return tf.estimator.EstimatorSpec(mode = mode, loss = loss, train_op = opt)

    elif mode == tf.estimator.ModeKeys.PREDICT:
    return tf.estimator.EstimatorSpec(mode = mode, predictions = predictions)

    elif mode == tf.estimator.ModeKeys.EVAL:
    eval_acc = tf.metrics.accuracy(labels=tf.argmax(labels,-1),predictions=predictions["classes"])
    eval_metric_ops = {"val_accuracy": eval_acc}
    tf.summary.scalar('val_accuracy',acc_op)
    return tf.estimator.EstimatorSpec(mode = mode, loss=loss, eval_metric_ops=eval_metric_ops)

    def train(self, train_data, val_data, queue, dict):
    x_train, y_train = train_data
    x_eval, y_eval = val_data

    run_config = tf.estimator.RunConfig(model_dir = os.path.join('logs','model_basic_cnn'), tf_random_seed = tf.set_random_seed(123), keep_checkpoint_max = 1, save_checkpoints_steps = 1000, save_checkpoints_secs = None)
    self.clf = tf.estimator.Estimator(model_fn = self._model_fn, config=run_config)

    if not os.path.isdir(self.clf.eval_dir()):
    os.makedirs(self.clf.eval_dir())

    gpu_id = queue.get()
    os.environ('CUDA_VISIBLE_DEVICES') = gpu_id
    with tf.device('gpu:'+ gpu_id):
    out = tf.estimator.train_and_evaluate(
    self.clf,
    train_spec = tf.estimator.TrainSpec(input_fn = lambda: self._inp_fn((x_train,y_train),mode=tf.estimator.ModeKeys.TRAIN)),
    eval_spec = tf.estimator.EvalSpec(input_fn = lambda: self._inp_fn((x_eval,y_eval),mode=tf.estimator.ModeKeys.EVAL))
    )
    print(f"Validation accuracy: {out[0]['accuracy']:.4f}")
    queue.put(gpu_id)
    return out[0]['accuracy']


    The code for calling multiprocessing is as follows:



    #from multiprocessing import Pool, Manager
    from pathos.multiprocessing import ProcessPool as Pool
    from multiprocess import Manager
    import dill

    def train_pp(dnn, dict, man, pool):
    queue = man.Queue(dnn.NUM_GPUS)
    [queue.put(i) for i in range(dnn.NUM_GPUS)]
    all_acc = pool.map(dnn.train,[[(X_train,y_train),(X_eval,y_eval),queue,dict] for args])
    #queue.join()
    pool.close()
    pool.join()
    pool.clear()
    pool.restart()
    return all_acc, dict

    if __name__ == "__main__":
    dnn = DNN()
    pool = Pool(processes=dnn.NUM_GPUS)
    man = Manager()
    dict = man.dict()
    net_acc, dict = train_pp(dnn,dict,man,pool)


    What is the best way to simultaneously train multiple networks?
    What is the problem with multiprocessing that it shows random behaviour and how can it be solved?



    PS: An extra, kind of off the topic question: how should the data sharing be done as all the networks are being trained on same data? Currently, I believe multiprocessing is making multiple copies of the DNN code and so each GPU has its own iterator. Is it the best way? How to have data on CPU and shared with all the processes without interlocks?










    share|improve this question

























      0












      0








      0








      I am trying to train multiple CNNs on separate GPUs, like one net per GPU. As I want all the networks to train simultaneously, I am using multiprocessing to start the training at the same time.



      I have been able to run it with the below given codes, however, the behaviour of the program is not deterministic. Sometimes, it throws errors 'can't pickle SwigpyObject' or 'can't pickle _thread.rlock object'. Also, somtimes it would get stuck on queue.get() or at self.map_async(func, iterable, chunksize).get(). I looked it up and found people suggesting to use pathos. I tried that but even then the behaviour is not repeatable.



      I use tensorflow's dataset and estimator APIs. Also, I use Queue and Dict from multiprocessing manager (also tried multiprocess) to keep a track of available GPU and they are shared between processes.



      This is my CNN code



      import numpy as np
      import os
      import tensorflow as tf
      import sys

      class DNN():

      def __init__(self,epochs = 5, batch_size = 64, learning_rate = 0.001, data_dir = None, verbose = False):
      self._epochs = epochs
      self._batch_size = batch_size
      self._learning_rate = learning_rate
      self._data_dir = data_dir

      if verbose:
      tf.logging.set_verbosity(tf.logging.INFO)
      else:
      tf.logging.set_verbosity(tf.logging.ERROR)

      self._image_shape = [300,300,3]

      def _build_net(self,inp):
      with tf.name_scope('net'):
      x = tf.layers.conv2d(inp, 3, 8, padding = 'same', name = 'conv1')
      x = tf.layers.max_pooling2d(x, 3, strides = 2, name = 'pool1')
      x = tf.layers.conv2d(x, 5, 8, padding = 'same', name = 'conv2')
      x = tf.layers.max_pooling2d(x, 3, strides = 2, name = 'pool2')
      x = tf.layers.conv2d(x, 5, 16, padding = 'same', name = 'conv3')
      x = tf.layers.max_pooling2d(x, 3, strides = 2, name = 'pool3')
      x = tf.layers.flatten(x)
      x = tf.layers.dense(x, 50, name = 'dense1')
      x = tf.layers.dense(x, self.number_of_classes, name='output')
      return x

      def _loss_fn(self,logits,labels,mode):
      if mode == tf.estimator.ModeKeys.TRAIN:
      loss_name = 'train_loss'
      elif mode == tf.estimator.ModeKeys.EVAL:
      loss_name = 'eval_loss'
      with tf.name_scope(loss_name):
      loss = tf.reduce_mean(tf.losses.softmax_cross_entropy(labels,logits))
      return loss

      def _opt_fn(self,loss):
      with tf.name_scope('optimizer'):
      opt = tf.train.AdamOptimizer(learning_rate=self._learning_rate).minimize(loss,global_step=tf.train.get_global_step())
      return opt

      def _acc(self,predictions,labels):
      with tf.name_scope('acc'):
      acc = tf.reduce_mean(tf.cast(tf.equal(predictions, tf.argmax(labels,-1)), tf.float32))
      return acc

      def _inp_fn(self,dataset,mode):

      def input_parser(img_path, label):

      # convert the label to one-hot encoding
      one_hot = tf.one_hot(label, self.number_of_classes)
      one_hot = tf.reshape(one_hot,[self.number_of_classes])

      # read the img from file
      img_file = tf.read_file(img_path)
      img_decoded = tf.image.decode_jpeg(img_file, channels=self._image_shape[-1])
      img_decoded = tf.image.resize_images(img_decoded,self._image_shape[:2])
      img_decoded = tf.cast(img_decoded, tf.float32)
      img_decoded = tf.reshape(img_decoded,self._image_shape)
      return img_decoded, one_hot

      dataset = tf.data.Dataset.from_tensor_slices(dataset)

      if mode == tf.estimator.ModeKeys.TRAIN:
      dataset = dataset.apply(tf.contrib.data.shuffle_and_repeat(self._batch_size,count = self._epochs, seed = tf.set_random_seed(123)))
      dataset = dataset.apply(tf.contrib.data.map_and_batch(map_func = input_parser, batch_size = self._batch_size, num_parallel_batches = 4))
      dataset = dataset.prefetch(1)
      iterator = tf.data.Iterator.from_structure(dataset.output_types,dataset.output_shapes)

      elif mode == tf.estimator.ModeKeys.PREDICT or tf.estimator.ModeKeys.EVAL:
      dataset = dataset.apply(tf.contrib.data.shuffle_and_repeat(self._batch_size,count = 1, seed = tf.set_random_seed(123)))
      dataset = dataset.apply(tf.contrib.data.map_and_batch(map_func = input_parser, batch_size = self._batch_size, num_parallel_batches = 4))
      dataset = dataset.prefetch(1)

      iterator = dataset.make_one_shot_iterator()
      return iterator.get_next()

      def _model_fn(self,features,labels,mode,params):
      logits = self._build_net(features)
      loss = self._loss_fn(logits,labels,mode)
      predictions = {"classes": tf.argmax(input = tf.nn.softmax(logits),axis = -1),
      "probabilities": tf.nn.softmax(logits,name = "softmax")
      }
      acc_op = self._acc(predictions = predictions["classes"], labels = labels)
      if mode == tf.estimator.ModeKeys.TRAIN:
      opt = self._opt_fn(loss)
      tf.summary.scalar('train_accuracy', acc_op)
      return tf.estimator.EstimatorSpec(mode = mode, loss = loss, train_op = opt)

      elif mode == tf.estimator.ModeKeys.PREDICT:
      return tf.estimator.EstimatorSpec(mode = mode, predictions = predictions)

      elif mode == tf.estimator.ModeKeys.EVAL:
      eval_acc = tf.metrics.accuracy(labels=tf.argmax(labels,-1),predictions=predictions["classes"])
      eval_metric_ops = {"val_accuracy": eval_acc}
      tf.summary.scalar('val_accuracy',acc_op)
      return tf.estimator.EstimatorSpec(mode = mode, loss=loss, eval_metric_ops=eval_metric_ops)

      def train(self, train_data, val_data, queue, dict):
      x_train, y_train = train_data
      x_eval, y_eval = val_data

      run_config = tf.estimator.RunConfig(model_dir = os.path.join('logs','model_basic_cnn'), tf_random_seed = tf.set_random_seed(123), keep_checkpoint_max = 1, save_checkpoints_steps = 1000, save_checkpoints_secs = None)
      self.clf = tf.estimator.Estimator(model_fn = self._model_fn, config=run_config)

      if not os.path.isdir(self.clf.eval_dir()):
      os.makedirs(self.clf.eval_dir())

      gpu_id = queue.get()
      os.environ('CUDA_VISIBLE_DEVICES') = gpu_id
      with tf.device('gpu:'+ gpu_id):
      out = tf.estimator.train_and_evaluate(
      self.clf,
      train_spec = tf.estimator.TrainSpec(input_fn = lambda: self._inp_fn((x_train,y_train),mode=tf.estimator.ModeKeys.TRAIN)),
      eval_spec = tf.estimator.EvalSpec(input_fn = lambda: self._inp_fn((x_eval,y_eval),mode=tf.estimator.ModeKeys.EVAL))
      )
      print(f"Validation accuracy: {out[0]['accuracy']:.4f}")
      queue.put(gpu_id)
      return out[0]['accuracy']


      The code for calling multiprocessing is as follows:



      #from multiprocessing import Pool, Manager
      from pathos.multiprocessing import ProcessPool as Pool
      from multiprocess import Manager
      import dill

      def train_pp(dnn, dict, man, pool):
      queue = man.Queue(dnn.NUM_GPUS)
      [queue.put(i) for i in range(dnn.NUM_GPUS)]
      all_acc = pool.map(dnn.train,[[(X_train,y_train),(X_eval,y_eval),queue,dict] for args])
      #queue.join()
      pool.close()
      pool.join()
      pool.clear()
      pool.restart()
      return all_acc, dict

      if __name__ == "__main__":
      dnn = DNN()
      pool = Pool(processes=dnn.NUM_GPUS)
      man = Manager()
      dict = man.dict()
      net_acc, dict = train_pp(dnn,dict,man,pool)


      What is the best way to simultaneously train multiple networks?
      What is the problem with multiprocessing that it shows random behaviour and how can it be solved?



      PS: An extra, kind of off the topic question: how should the data sharing be done as all the networks are being trained on same data? Currently, I believe multiprocessing is making multiple copies of the DNN code and so each GPU has its own iterator. Is it the best way? How to have data on CPU and shared with all the processes without interlocks?










      share|improve this question














      I am trying to train multiple CNNs on separate GPUs, like one net per GPU. As I want all the networks to train simultaneously, I am using multiprocessing to start the training at the same time.



      I have been able to run it with the below given codes, however, the behaviour of the program is not deterministic. Sometimes, it throws errors 'can't pickle SwigpyObject' or 'can't pickle _thread.rlock object'. Also, somtimes it would get stuck on queue.get() or at self.map_async(func, iterable, chunksize).get(). I looked it up and found people suggesting to use pathos. I tried that but even then the behaviour is not repeatable.



      I use tensorflow's dataset and estimator APIs. Also, I use Queue and Dict from multiprocessing manager (also tried multiprocess) to keep a track of available GPU and they are shared between processes.



      This is my CNN code



      import numpy as np
      import os
      import tensorflow as tf
      import sys

      class DNN():

      def __init__(self,epochs = 5, batch_size = 64, learning_rate = 0.001, data_dir = None, verbose = False):
      self._epochs = epochs
      self._batch_size = batch_size
      self._learning_rate = learning_rate
      self._data_dir = data_dir

      if verbose:
      tf.logging.set_verbosity(tf.logging.INFO)
      else:
      tf.logging.set_verbosity(tf.logging.ERROR)

      self._image_shape = [300,300,3]

      def _build_net(self,inp):
      with tf.name_scope('net'):
      x = tf.layers.conv2d(inp, 3, 8, padding = 'same', name = 'conv1')
      x = tf.layers.max_pooling2d(x, 3, strides = 2, name = 'pool1')
      x = tf.layers.conv2d(x, 5, 8, padding = 'same', name = 'conv2')
      x = tf.layers.max_pooling2d(x, 3, strides = 2, name = 'pool2')
      x = tf.layers.conv2d(x, 5, 16, padding = 'same', name = 'conv3')
      x = tf.layers.max_pooling2d(x, 3, strides = 2, name = 'pool3')
      x = tf.layers.flatten(x)
      x = tf.layers.dense(x, 50, name = 'dense1')
      x = tf.layers.dense(x, self.number_of_classes, name='output')
      return x

      def _loss_fn(self,logits,labels,mode):
      if mode == tf.estimator.ModeKeys.TRAIN:
      loss_name = 'train_loss'
      elif mode == tf.estimator.ModeKeys.EVAL:
      loss_name = 'eval_loss'
      with tf.name_scope(loss_name):
      loss = tf.reduce_mean(tf.losses.softmax_cross_entropy(labels,logits))
      return loss

      def _opt_fn(self,loss):
      with tf.name_scope('optimizer'):
      opt = tf.train.AdamOptimizer(learning_rate=self._learning_rate).minimize(loss,global_step=tf.train.get_global_step())
      return opt

      def _acc(self,predictions,labels):
      with tf.name_scope('acc'):
      acc = tf.reduce_mean(tf.cast(tf.equal(predictions, tf.argmax(labels,-1)), tf.float32))
      return acc

      def _inp_fn(self,dataset,mode):

      def input_parser(img_path, label):

      # convert the label to one-hot encoding
      one_hot = tf.one_hot(label, self.number_of_classes)
      one_hot = tf.reshape(one_hot,[self.number_of_classes])

      # read the img from file
      img_file = tf.read_file(img_path)
      img_decoded = tf.image.decode_jpeg(img_file, channels=self._image_shape[-1])
      img_decoded = tf.image.resize_images(img_decoded,self._image_shape[:2])
      img_decoded = tf.cast(img_decoded, tf.float32)
      img_decoded = tf.reshape(img_decoded,self._image_shape)
      return img_decoded, one_hot

      dataset = tf.data.Dataset.from_tensor_slices(dataset)

      if mode == tf.estimator.ModeKeys.TRAIN:
      dataset = dataset.apply(tf.contrib.data.shuffle_and_repeat(self._batch_size,count = self._epochs, seed = tf.set_random_seed(123)))
      dataset = dataset.apply(tf.contrib.data.map_and_batch(map_func = input_parser, batch_size = self._batch_size, num_parallel_batches = 4))
      dataset = dataset.prefetch(1)
      iterator = tf.data.Iterator.from_structure(dataset.output_types,dataset.output_shapes)

      elif mode == tf.estimator.ModeKeys.PREDICT or tf.estimator.ModeKeys.EVAL:
      dataset = dataset.apply(tf.contrib.data.shuffle_and_repeat(self._batch_size,count = 1, seed = tf.set_random_seed(123)))
      dataset = dataset.apply(tf.contrib.data.map_and_batch(map_func = input_parser, batch_size = self._batch_size, num_parallel_batches = 4))
      dataset = dataset.prefetch(1)

      iterator = dataset.make_one_shot_iterator()
      return iterator.get_next()

      def _model_fn(self,features,labels,mode,params):
      logits = self._build_net(features)
      loss = self._loss_fn(logits,labels,mode)
      predictions = {"classes": tf.argmax(input = tf.nn.softmax(logits),axis = -1),
      "probabilities": tf.nn.softmax(logits,name = "softmax")
      }
      acc_op = self._acc(predictions = predictions["classes"], labels = labels)
      if mode == tf.estimator.ModeKeys.TRAIN:
      opt = self._opt_fn(loss)
      tf.summary.scalar('train_accuracy', acc_op)
      return tf.estimator.EstimatorSpec(mode = mode, loss = loss, train_op = opt)

      elif mode == tf.estimator.ModeKeys.PREDICT:
      return tf.estimator.EstimatorSpec(mode = mode, predictions = predictions)

      elif mode == tf.estimator.ModeKeys.EVAL:
      eval_acc = tf.metrics.accuracy(labels=tf.argmax(labels,-1),predictions=predictions["classes"])
      eval_metric_ops = {"val_accuracy": eval_acc}
      tf.summary.scalar('val_accuracy',acc_op)
      return tf.estimator.EstimatorSpec(mode = mode, loss=loss, eval_metric_ops=eval_metric_ops)

      def train(self, train_data, val_data, queue, dict):
      x_train, y_train = train_data
      x_eval, y_eval = val_data

      run_config = tf.estimator.RunConfig(model_dir = os.path.join('logs','model_basic_cnn'), tf_random_seed = tf.set_random_seed(123), keep_checkpoint_max = 1, save_checkpoints_steps = 1000, save_checkpoints_secs = None)
      self.clf = tf.estimator.Estimator(model_fn = self._model_fn, config=run_config)

      if not os.path.isdir(self.clf.eval_dir()):
      os.makedirs(self.clf.eval_dir())

      gpu_id = queue.get()
      os.environ('CUDA_VISIBLE_DEVICES') = gpu_id
      with tf.device('gpu:'+ gpu_id):
      out = tf.estimator.train_and_evaluate(
      self.clf,
      train_spec = tf.estimator.TrainSpec(input_fn = lambda: self._inp_fn((x_train,y_train),mode=tf.estimator.ModeKeys.TRAIN)),
      eval_spec = tf.estimator.EvalSpec(input_fn = lambda: self._inp_fn((x_eval,y_eval),mode=tf.estimator.ModeKeys.EVAL))
      )
      print(f"Validation accuracy: {out[0]['accuracy']:.4f}")
      queue.put(gpu_id)
      return out[0]['accuracy']


      The code for calling multiprocessing is as follows:



      #from multiprocessing import Pool, Manager
      from pathos.multiprocessing import ProcessPool as Pool
      from multiprocess import Manager
      import dill

      def train_pp(dnn, dict, man, pool):
      queue = man.Queue(dnn.NUM_GPUS)
      [queue.put(i) for i in range(dnn.NUM_GPUS)]
      all_acc = pool.map(dnn.train,[[(X_train,y_train),(X_eval,y_eval),queue,dict] for args])
      #queue.join()
      pool.close()
      pool.join()
      pool.clear()
      pool.restart()
      return all_acc, dict

      if __name__ == "__main__":
      dnn = DNN()
      pool = Pool(processes=dnn.NUM_GPUS)
      man = Manager()
      dict = man.dict()
      net_acc, dict = train_pp(dnn,dict,man,pool)


      What is the best way to simultaneously train multiple networks?
      What is the problem with multiprocessing that it shows random behaviour and how can it be solved?



      PS: An extra, kind of off the topic question: how should the data sharing be done as all the networks are being trained on same data? Currently, I believe multiprocessing is making multiple copies of the DNN code and so each GPU has its own iterator. Is it the best way? How to have data on CPU and shared with all the processes without interlocks?







      python tensorflow multiprocessing python-multiprocessing pathos






      share|improve this question













      share|improve this question











      share|improve this question




      share|improve this question










      asked Nov 26 '18 at 3:26









      AnandAnand

      12




      12
























          0






          active

          oldest

          votes











          Your Answer






          StackExchange.ifUsing("editor", function () {
          StackExchange.using("externalEditor", function () {
          StackExchange.using("snippets", function () {
          StackExchange.snippets.init();
          });
          });
          }, "code-snippets");

          StackExchange.ready(function() {
          var channelOptions = {
          tags: "".split(" "),
          id: "1"
          };
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function() {
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled) {
          StackExchange.using("snippets", function() {
          createEditor();
          });
          }
          else {
          createEditor();
          }
          });

          function createEditor() {
          StackExchange.prepareEditor({
          heartbeatType: 'answer',
          autoActivateHeartbeat: false,
          convertImagesToLinks: true,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: 10,
          bindNavPrevention: true,
          postfix: "",
          imageUploader: {
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          },
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          });


          }
          });














          draft saved

          draft discarded


















          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53474419%2ftraining-multiple-networks-on-a-multi-gpu-system-using-tensorflow-and-multiproce%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown

























          0






          active

          oldest

          votes








          0






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes
















          draft saved

          draft discarded




















































          Thanks for contributing an answer to Stack Overflow!


          • Please be sure to answer the question. Provide details and share your research!

          But avoid



          • Asking for help, clarification, or responding to other answers.

          • Making statements based on opinion; back them up with references or personal experience.


          To learn more, see our tips on writing great answers.




          draft saved


          draft discarded














          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53474419%2ftraining-multiple-networks-on-a-multi-gpu-system-using-tensorflow-and-multiproce%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown





















































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown

































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown







          Popular posts from this blog

          Ottavio Pratesi

          Tricia Helfer

          15 giugno