I am attempting to build Keras models that use BERT as a component of the overall model architecture. Although my models run fine on GPU, I run into errors on TPU. Specifically, I get:
>NotImplementedError: TPUStrategy.run(fn, ...) does not support pure eager execution. please make sure the function passed into `strategy.run` is a `tf.function` or `strategy.run` is called inside a `tf.function` if eager behavior is enabled.
Even though I don't believe I have any Python functions in my code.
Here is my best attempt to minimize an example, with the code in a colab notebook
And here is the code pasted below
```
# Notebook setup cell. Lines starting with `%` or `!` are notebook magics /
# shell commands rather than plain Python statements.
%tensorflow_version 2.x
# Install the `transformers` package.
!pip install transformers --q
# Log in with gcloud, then authenticate this Colab session.
!gcloud auth login
'''NEED TO RUN THIS CELL TWICE TO AVOID ERROR'''
from google.colab import auth
auth.authenticate_user()
# Project id that gcloud is configured to use below.
project_id = 'machinelearning-264918'
!gcloud config set project {project_id}
# Nightly build of TensorFlow Addons (imported as `tfa` further down).
!pip install tfa-nightly
import tensorflow_addons as tfa
from transformers import TFBertModel, AutoModel, TFRobertaModel
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import (Dense,
Dropout)
import tensorflow_addons as tfa
import numpy as np
import os
from copy import deepcopy
from time import time
# TensorFlow's logger and the tf.data autotune setting reused further down.
logger = tf.get_logger()
autotune = tf.data.experimental.AUTOTUNE

# Try to set up a TPU first: resolve the cluster, connect to it, initialise
# the TPU system and wrap it in a TPUStrategy. If any of these steps raises,
# fall back to a single-GPU strategy instead.
try:
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
    tf.config.experimental_connect_to_cluster(tpu)
    tf.tpu.experimental.initialize_tpu_system(tpu)
    strategy = tf.distribute.experimental.TPUStrategy(tpu)
    print('strategy.num_replicas_in_sync', strategy.num_replicas_in_sync)
    logger.info(
        'Running with TPUStrategy on TPU {} with {} cores '.format(
            tpu.cluster_spec().as_dict()['worker'],
            strategy.num_replicas_in_sync,
        )
    )
    # Global batch size: 16 examples for every replica in sync.
    batch_size = 16 * strategy.num_replicas_in_sync
except Exception:
    # Best-effort fallback; the reason for the TPU failure is not reported.
    strategy = tf.distribute.OneDeviceStrategy(device='/gpu:0')
    logger.warning('Failed initializing TPU! Running on GPU')
    batch_size = 16
class Dora_A(tf.keras.Model):
    """Keras model that compares queries and passages with one shared encoder.

    Both inputs are run through the same pretrained RoBERTa network
    ('allenai/biomed_roberta_base') and the first element of each output is
    combined with a matrix product.
    """

    def __init__(self, **kwargs):
        """Forward ``kwargs`` to ``tf.keras.Model`` and load the encoder."""
        super().__init__(**kwargs)
        # from_pt=True: the checkpoint is loaded from PyTorch weights.
        self.bioRoberta = TFRobertaModel.from_pretrained(
            'allenai/biomed_roberta_base', from_pt=True)

    def call(self, inputIds):
        """Return the query-by-passage product.

        ``inputIds`` is unpacked into two items: the query ids and the
        passage ids.
        """
        query_ids, passage_ids = inputIds
        query_repr = self.bioRoberta(query_ids)[0]
        passage_repr = self.bioRoberta(passage_ids)[0]
        # NOTE(review): element [0] is presumably the per-token hidden states
        # (batch, seq, hidden), which would make this product
        # (batch, seq, seq) rather than (batch, batch) -- confirm this is the
        # shape the loss expects.
        return tf.linalg.matmul(
            query_repr, passage_repr, transpose_b=True, name='mm')
@tf.function
def loss_fn(_, probs):
    """In-batch categorical cross-entropy against identity targets.

    * Row ``i`` treats column ``i`` as its positive; all other elements of
      the batch act as its negatives.
    * The number of rows is read from the tensor at run time, because each
      TPU core only receives 1/8 of the global batch.
    * The first argument is ignored. It exists so the signature matches the
      loss signature Keras expects (the dataset is expected to supply dummy
      labels); the real targets are built inside this function.
    * ``probs`` carries logits, for better numerical stability.
    """
    n_rows = tf.shape(probs)[0]
    targets = tf.eye(num_rows=n_rows, num_columns=n_rows)
    return tf.losses.categorical_crossentropy(
        targets, probs, from_logits=True)
# Token ids placed before (CLS) and after (SEP) every sequence.
CLS_inputID = tf.constant([0])
SEP_inputID = tf.constant([2])


def _parse_example(example_proto):
    """Turn one serialized example into a (query_ids, passage_ids) pair.

    Reads the 'BioRoberta_IDs' feature, converts it to a dense int32 tensor,
    takes the first 510 ids for both outputs and surrounds each with the CLS
    and SEP ids.

    NOTE(review): Keras usually interprets a 2-tuple dataset element as
    (inputs, targets) -- confirm the model really receives both tensors as
    its input.
    """
    feature_spec = {
        'bioRoberta_SentenceIndex': tf.io.VarLenFeature(dtype=tf.int64),
        'BioRoberta_IDs': tf.io.VarLenFeature(dtype=tf.int64),
    }
    parsed = tf.io.parse_single_example(example_proto, feature_spec)
    token_ids = tf.cast(
        tf.sparse.to_dense(parsed['BioRoberta_IDs']), dtype=tf.int32)

    # Both pieces are the same window: 510 ids starting at position 0.
    # NOTE(review): a fixed-size slice presumably fails for records with
    # fewer than 510 ids -- confirm such records cannot occur or are meant
    # to be dropped.
    query_piece = tf.slice(token_ids, [0], [510])
    passage_piece = tf.slice(token_ids, [0], [510])

    # Add the special tokens so each sequence is proper input for the model.
    query_input = tf.concat(
        [CLS_inputID, query_piece, SEP_inputID], axis=0)
    passage_input = tf.concat(
        [CLS_inputID, passage_piece, SEP_inputID], axis=0)
    return query_input, passage_input
# Storage locations: everything lives below one bucket, with a per-config
# model directory and a timestamped TensorBoard log directory inside it.
config_name = 'model_a'
base_dir = 'gs://a-dora-semantic-scholar'
model_dir = os.path.join(base_dir, config_name)
tensorboard_dir = os.path.join(model_dir, 'logs_' + str(time()))

# The validation pattern is currently identical to the training pattern.
tfrecords_pattern_train = os.path.join(base_dir, 'VersionA_00022*')
tfrecords_pattern_val = os.path.join(base_dir, 'VersionA_00022*')

if 'COLAB_TPU_ADDR' in os.environ:
    print('Setting tf.data objects')

# Input pipeline: read the matching TFRecord files, parse, shuffle, pad to a
# batch, prefetch, and finally skip elements that raise errors.
with strategy.scope():
    filenames = tf.io.gfile.glob(tfrecords_pattern_train)
    train_dataset = (
        tf.data.TFRecordDataset(filenames, num_parallel_reads=autotune)
        .map(_parse_example, num_parallel_calls=autotune)
        .shuffle(130_000, seed=1000, reshuffle_each_iteration=True)
        # Both id tensors are padded with the value 1.
        # NOTE(review): no drop_remainder is given, so the last batch may be
        # smaller -- confirm this is acceptable on TPU.
        .padded_batch(batch_size, padding_values=(1, 1))
        .prefetch(autotune)
        .apply(tf.data.experimental.ignore_errors())
    )
# The model is created and compiled inside the distribution strategy's scope.
with strategy.scope():
    # NOTE(review): dynamic=True marks the model as eager-only, so Keras
    # presumably does not wrap its train step in a tf.function. That is the
    # likely trigger for the "TPUStrategy.run(fn, ...) does not support pure
    # eager execution" error reported for model.fit -- try constructing
    # Dora_A() without this argument.
    model = Dora_A(dynamic=True)
    # Freeze the first layer of the model.
    model.layers[0].trainable = False
    model.compile(
        loss=loss_fn,
        optimizer=tfa.optimizers.AdamW(
            weight_decay=1e-5,
            learning_rate=1e-5,
            epsilon=1e-06,
        ),
    )

model.fit(train_dataset)
```
And here's a Google Drive link to the sample data file
This is the full error output
```
---------------------------------------------------------------------------
NotImplementedError Traceback (most recent call last)
<ipython-input-12-50bee5f74f82> in <module>()
----> 1 model.fit(train_dataset)
4 frames
/usr/local/lib/python3.6/dist-packages/tensorflow/python/keras/engine/training.py in _method_wrapper(self, *args, **kwargs)
64 def _method_wrapper(self, *args, **kwargs):
65 if not self._in_multi_worker_mode(): # pylint: disable=protected-access
---> 66 return method(self, *args, **kwargs)
67
68 # Running inside `run_distribute_coordinator` already.
/usr/local/lib/python3.6/dist-packages/tensorflow/python/keras/engine/training.py in fit(self, x, y, batch_size, epochs, verbose, callbacks, validation_split, validation_data, shuffle, class_weight, sample_weight, initial_epoch, steps_per_epoch, validation_steps, validation_batch_size, validation_freq, max_queue_size, workers, use_multiprocessing)
846 batch_size=batch_size):
847 callbacks.on_train_batch_begin(step)
--> 848 tmp_logs = train_function(iterator)
849 # Catch OutOfRangeError for Datasets of unknown size.
850 # This blocks until the batch has finished executing.
/usr/local/lib/python3.6/dist-packages/tensorflow/python/keras/engine/training.py in train_function(iterator)
570 data = next(iterator)
571 outputs = self.distribute_strategy.run(
--> 572 self.train_step, args=(data,))
573 outputs = reduce_per_replica(
574 outputs, self.distribute_strategy, reduction='first')
/usr/local/lib/python3.6/dist-packages/tensorflow/python/distribute/tpu_strategy.py in run(self, fn, args, kwargs, options)
166 def run(self, fn, args=(), kwargs=None, options=None):
167 """See base class."""
--> 168 validate_run_function(fn)
169
170 # Note: the target function is converted to graph even when in Eager mode,
/usr/local/lib/python3.6/dist-packages/tensorflow/python/distribute/tpu_strategy.py in validate_run_function(fn)
104 and not (callable(fn) and isinstance(fn.__call__, def_function.Function)):
105 raise NotImplementedError(
--> 106 "TPUStrategy.run(fn, ...) does not support pure eager "
107 "execution. please make sure the function passed into "
108 "`strategy.run` is a `tf.function` or "
NotImplementedError: TPUStrategy.run(fn, ...) does not support pure eager execution. please make sure the function passed into `strategy.run` is a `tf.function` or `strategy.run` is called inside a `tf.function` if eager behavior is enabled.
```
I also tried decorating the `call` method with `@tf.function`, so that my model class looked like this:
```
class Dora_A(tf.keras.Model):
    """Variant of the model whose ``call`` is wrapped in ``tf.function``.

    Both inputs are run through the same pretrained RoBERTa network
    ('allenai/biomed_roberta_base') and the first element of each output is
    combined with a matrix product.
    """

    def __init__(self, **kwargs):
        """Forward ``kwargs`` to ``tf.keras.Model`` and load the encoder."""
        super().__init__(**kwargs)
        # from_pt=True: the checkpoint is loaded from PyTorch weights.
        self.bioRoberta = TFRobertaModel.from_pretrained(
            'allenai/biomed_roberta_base', from_pt=True)

    @tf.function
    def call(self, inputIds):
        """Return the query-by-passage product.

        ``inputIds`` is unpacked into two items: the query ids and the
        passage ids.
        """
        query_ids, passage_ids = inputIds
        query_repr = self.bioRoberta(query_ids)[0]
        passage_repr = self.bioRoberta(passage_ids)[0]
        return tf.linalg.matmul(
            query_repr, passage_repr, transpose_b=True, name='mm')
```
But I got the same error message.
I also tried decorating my tf.data parse function
```
# Token ids placed before (CLS) and after (SEP) every sequence.
CLS_inputID = tf.constant([0])
SEP_inputID = tf.constant([2])


@tf.function
def _parse_example(example_proto):
    """Turn one serialized example into a (query_ids, passage_ids) pair.

    Variant of the parse function wrapped in ``tf.function``. Reads the
    'BioRoberta_IDs' feature, converts it to a dense int32 tensor, takes the
    first 510 ids for both outputs and surrounds each with the CLS and SEP
    ids.
    """
    feature_spec = {
        'bioRoberta_SentenceIndex': tf.io.VarLenFeature(dtype=tf.int64),
        'BioRoberta_IDs': tf.io.VarLenFeature(dtype=tf.int64),
    }
    parsed = tf.io.parse_single_example(example_proto, feature_spec)
    token_ids = tf.cast(
        tf.sparse.to_dense(parsed['BioRoberta_IDs']), dtype=tf.int32)

    # Both pieces are the same window: 510 ids starting at position 0.
    query_piece = tf.slice(token_ids, [0], [510])
    passage_piece = tf.slice(token_ids, [0], [510])

    # Add the special tokens so each sequence is proper input for the model.
    query_input = tf.concat(
        [CLS_inputID, query_piece, SEP_inputID], axis=0)
    passage_input = tf.concat(
        [CLS_inputID, passage_piece, SEP_inputID], axis=0)
    return query_input, passage_input
```