import os
import pathlib
import matplotlib.pyplot as plt
import numpy as np
# import seaborn as sns
import tensorflow as tf
from IPython.display import Audio
from IPython.display import display
print(tf.__version__)
2.8.0
# Establezca el valor semilla para la reproducibilidad del experimento.
seed = 42
tf.random.set_seed(seed)
np.random.seed(seed)
Para ahorrar tiempo con la carga de datos, trabajará con una versión más pequeña del conjunto de datos de comandos de voz. El conjunto de datos original consta de más de 105 000 archivos de audio en formato de archivo de audio WAV (Waveform) de personas que dicen 35 palabras diferentes. Estos datos fueron recopilados por Google y publicados bajo una licencia CC BY.
Descargue y extraiga el archivo mini_speech_commands.zip
que contiene los conjuntos de datos de
comandos de voz más pequeños con tf.keras.utils.get_file
:
DATASET_PATH = 'data/mini_speech_commands' # Ruta del dataset
data_dir = pathlib.Path(DATASET_PATH) # Creamos un objeto de la clase Path con la ruta del dataset
if not data_dir.exists():
tf.keras.utils.get_file(
'mini_speech_commands.zip',
'https://storage.googleapis.com/download.tensorflow.org/data/mini_speech_commands.zip',
extract=True,
cache_dir='.',
cache_subdir='data') # Descargamos el dataset
Los clips de audio del conjunto de datos se almacenan en ocho carpetas correspondientes a cada comando de voz:
no
, yes
, down
, go
, left
, up
,
right
y stop
:
commands = np.array(tf.io.gfile.listdir(str(data_dir))) # Obtenemos los comandos
commands = commands[commands != 'README.md'] # Eliminamos el archivo README.md
print('Commands:', commands) # Mostramos los comandos
Commands: ['down' 'go' 'left' 'no' 'right' 'stop' 'up' 'yes']
Extraiga los clips de audio en una lista llamada filenames
de archivo y mezcle:
filenames = tf.io.gfile.glob(str(data_dir) + '/*/*') # Lista de archivos de audio
filenames = tf.random.shuffle(filenames) # Barajamos los archivos de audio
num_samples = len(filenames) # Numero de archivos de audio
print('Num samples:', num_samples) # Mostramos el numero de archivos de audio
print('Number of samples per label:', len(tf.io.gfile.listdir(str(data_dir) + '/' + commands[0]))) # Mostramos el numero de archivos de audio por comando
print('Example file tensor:', filenames[0]) # Mostramos un ejemplo de archivo de audio
Num samples: 8000 Number of samples per label: 1000 Example file tensor: tf.Tensor(b'data\\mini_speech_commands\\yes\\b6ebe225_nohash_1.wav', shape=(), dtype=string)
Divida los nombres de filenames
en conjuntos de entrenamiento, validación y prueba utilizando una
proporción de 80:10:10, respectivamente:
train_files = filenames[:6400] # Archivos de audio de entrenamiento
val_files = filenames[6400: 6400 + 800] # Archivos de audio de validacion
test_files = filenames[-800:] # Archivos de audio de test
print('Train files:', len(train_files)) # Mostramos el numero de archivos de audio de entrenamiento
print('Val files:', len(val_files)) # Mostramos el numero de archivos de audio de validacion
print('Test files:', len(test_files)) # Mostramos el numero de archivos de audio de test
Train files: 6400 Val files: 800 Test files: 800
En esta sección, preprocesará el conjunto de datos, creando tensores decodificados para las formas de onda y las etiquetas correspondientes. Tenga en cuenta que:
La forma del tensor devuelto por tf.audio.decode_wav
es
[samples, channels]
, donde channels son 1 para mono o 2 para estéreo. El conjunto de datos de mini
comandos de voz solo contiene grabaciones mono.
test_file = tf.io.read_file(DATASET_PATH + '/down/0a9f9af7_nohash_0.wav') # Leemos el archivo de audio de test
test_audio, _ = tf.audio.decode_wav(contents=test_file) # Decodificamos el archivo de audio
print('Test audio shape:', test_audio.shape) # Mostramos el tamaño del audio de test
Test audio shape: (13654, 1)
Ahora, definamos una función que preprocesa los archivos de audio WAV sin procesar del conjunto de datos en tensores de audio:
def decode_audio(audio_binary):
audio, _ = tf.audio.decode_wav(contents=audio_binary) # Decodificamos el archivo de audio
return tf.squeeze(audio, axis=-1) # Eliminamos el eje de tamaño 1
Defina una función que cree etiquetas usando los directorios principales para cada archivo:
tf.RaggedTensor
s
(tensores con dimensiones irregulares, con sectores que pueden tener diferentes longitudes).def get_label(file_path):
parts = tf.strings.split(file_path, os.path.sep) # Partimos la ruta del archivo en partes
return parts[-2] # Obtenemos el comando
Defina otra función auxiliar, get_waveform_and_label
, que lo integre todo:
def get_waveform_and_label(file_path):
label = get_label(file_path) # Obtenemos el comando
audio_binary = tf.io.read_file(file_path) # Leemos el archivo de audio
waveform = decode_audio(audio_binary) # Decodificamos el archivo de audio
return waveform, label # Retornamos el audio y el comando
Cree el conjunto de entrenamiento para extraer los pares de etiquetas de audio:
tf.data.Dataset
con Dataset.from_tensor_slices
y Dataset.map
, usando get_waveform_and_label
definido anteriormente.Construirá los conjuntos de validación y prueba usando un procedimiento similar más adelante.
AUTOTUNE = tf.data.AUTOTUNE # Establecemos el valor AUTOTUNE
file_ds = tf.data.Dataset.from_tensor_slices(train_files) # Creamos un dataset con los archivos de audio de entrenamiento
waveform_ds = file_ds.map(get_waveform_and_label, num_parallel_calls=AUTOTUNE) # Creamos un dataset con los archivos de audio de entrenamiento
Tracemos algunas formas de onda de audio:
rows = 3 # Numero de filas
cols = 3 # Numero de columnas
n = rows * cols # Numero de elementos
fig, axes = plt.subplots(rows, cols, figsize=(12, 12)) # Creamos una figura con 3x3 subplots
for i, (auido, label) in enumerate(waveform_ds.take(n)): # Para cada audio y comando
r = i // cols # Fila
c = i % cols # Columna
ax = axes[r][c] # Obtenemos el subplot
ax.plot(auido.numpy()) # Graficamos el audio
ax.set_yticks(np.arange(-1.2, 1.2, 0.2)) # Establecemos los valores de los ejes y
label = label.numpy().decode('utf-8') # Obtenemos el comando
ax.set_title(label) # Establecemos el titulo
plt.show() # Mostramos la figura
Las formas de onda en el conjunto de datos se representan en el dominio del tiempo. A continuación, transformará las formas de onda de las señales de dominio de tiempo en señales de dominio de frecuencia de tiempo mediante el cálculo de la transformada de Fourier de tiempo corto (STFT) para convertir las formas de onda en espectrogramas , que muestran los cambios de frecuencia a lo largo del tiempo y pueden ser representados como imágenes 2D. Introducirá las imágenes del espectrograma en su red neuronal para entrenar el modelo.
Una transformada de Fourier ( tf.signal.fft
) convierte una
señal a las frecuencias de sus componentes, pero pierde toda la información de tiempo. En comparación, STFT ( tf.signal.stft
) divide la
señal en ventanas de tiempo y ejecuta una transformada de Fourier en cada ventana, preservando parte de la
información de tiempo y devolviendo un tensor 2D en el que puede ejecutar convoluciones estándar.
Cree una función de utilidad para convertir formas de onda en espectrogramas:
Las formas de onda deben tener la misma longitud, de modo que cuando las convierta en espectrogramas, los
resultados tengan dimensiones similares. Esto se puede hacer simplemente rellenando con ceros los clips de
audio que son más cortos que un segundo (usando tf.zeros
).
Al llamar a tf.signal.stft
, elija
los parámetros frame_length
y frame_step
modo que la "imagen" del espectrograma
generado sea casi cuadrada. Para obtener más información sobre la elección de los parámetros STFT, consulte
este video de Coursera
sobre el procesamiento de señales de audio y STFT.
La STFT produce una serie de números complejos que representan la magnitud y la fase. Sin embargo, en este
tutorial solo usará la magnitud, que puede derivar aplicando tf.abs
en la salida de tf.signal.stft
.
def get_spectrogram(waveform):
input_len = 16000 # Longitud de entrada
waveform = waveform[:input_len] # Obtenemos el audio de entrada
zero_padding = tf.zeros(
[16000] - tf.shape(waveform), dtype=waveform.dtype) # Agregamos ceros a la izquierda
waveform = tf.cast(waveform, dtype=tf.float32) # Convertimos el audio a float32
equal_length = tf.concat([waveform, zero_padding], axis=0) # Agregamos ceros a la derecha
spectrogram = tf.signal.stft(equal_length, frame_length=255, frame_step=128) # Obtenemos el espectrograma
spectrogram = tf.abs(spectrogram) # Obtenemos la magnitud del espectrograma
spectrogram = spectrogram[..., tf.newaxis] # Agregamos el eje de tamaño 1
return spectrogram # Retornamos el espectrograma
A continuación, comience a explorar los datos. Imprima las formas de la forma de onda tensionada de un ejemplo y el espectrograma correspondiente, y reproduzca el audio original:
for waveform, label in waveform_ds.take(1): # Para cada audio y comando
label = label.numpy().decode('utf-8') # Obtenemos el comando
spectrogram = get_spectrogram(waveform) # Obtenemos el espectrograma
print('Label:', label) # Mostramos el comando
print('Waveform shape:', waveform.shape) # Mostramos el tamaño del audio
print('Spectrogram shape:', spectrogram.shape) # Mostramos el tamaño del espectrograma
print('Audio playback:') # Mostramos el audio de reproduccion
display(Audio(waveform, rate=16000))
Label: yes Waveform shape: (16000,) Spectrogram shape: (124, 129, 1) Audio playback:
Ahora, defina una función para mostrar un espectrograma:
def plot_spectrogram(spectrogram, ax):
if len(spectrogram.shape) > 2:
assert len(spectrogram.shape) == 3
spectrogram = np.squeeze(spectrogram, axis=-1)
# Convert the frequencies to log scale and transpose, so that the time is
# represented on the x-axis (columns).
# Add an epsilon to avoid taking a log of zero.
log_spec = np.log(spectrogram.T + np.finfo(float).eps)
height = log_spec.shape[0]
width = log_spec.shape[1]
X = np.linspace(0, np.size(spectrogram), num=width, dtype=int)
Y = range(height)
ax.pcolormesh(X, Y, log_spec)
Trace la forma de onda del ejemplo a lo largo del tiempo y el espectrograma correspondiente (frecuencias a lo largo del tiempo):
fig, axes = plt.subplots(2, figsize=(12, 8)) # Creamos una figura con 2 subplots
timescale = np.arange(waveform.shape[0]) # Obtenemos los valores de tiempo
axes[0].plot(timescale, waveform.numpy()) # Graficamos el audio
axes[0].set_title('Waveform') # Establecemos el titulo
axes[0].set_xlim([0, 16000]) # Establecemos el limite de X
plot_spectrogram(spectrogram, axes[1]) # Graficamos el espectrograma
axes[1].set_title('Spectrogram') # Establecemos el titulo
plt.show() # Mostramos la figura
C:\Users\erik1\AppData\Local\Temp/ipykernel_9416/3600643975.py:13: MatplotlibDeprecationWarning: shading='flat' when X and Y have the same dimensions as C is deprecated since 3.3. Either specify the corners of the quadrilaterals with X and Y, or pass shading='auto', 'nearest' or 'gouraud', or set rcParams['pcolor.shading']. This will become an error two minor releases later. ax.pcolormesh(X, Y, log_spec)
Ahora, defina una función que transforme el conjunto de datos de forma de onda en espectrogramas y sus etiquetas correspondientes como ID de números enteros:
def get_spectrogram_and_label_id(audio, label):
spectrogram = get_spectrogram(audio) # Obtenemos el espectrograma
label_id = tf.argmax(label == commands) # Obtenemos el indice del comando
return spectrogram, label_id # Retornamos el espectrograma y el indice del comando
Asigne get_spectrogram_and_label_id
a través de los elementos del conjunto de datos con Dataset.map
:
spectrogram_ds = waveform_ds.map(
map_func=get_spectrogram_and_label_id,
num_parallel_calls=AUTOTUNE) # Creamos un dataset con los espectrogramas y los indices de los comandos
Examine los espectrogramas para ver diferentes ejemplos del conjunto de datos:
rows = 3 # Numero de filas
cols = 3 # Numero de columnas
n = rows*cols # Numero de elementos
fig, axes = plt.subplots(rows, cols, figsize=(10, 10)) # Creamos una figura con 3x3 subplots para mostrar los espectrogramas
for i, (spectrogram, label_id) in enumerate(spectrogram_ds.take(n)): # Para cada espectrograma y indice de comando
r = i // cols # Fila
c = i % cols # Columna
ax = axes[r][c] # Obtenemos el subplot
plot_spectrogram(spectrogram.numpy(), ax) # Graficamos el espectrograma
ax.set_title(commands[label_id.numpy()]) # Establecemos el comando
ax.axis('off') # Ocultamos los ejes
plt.show()
C:\Users\erik1\AppData\Local\Temp/ipykernel_9416/3600643975.py:13: MatplotlibDeprecationWarning: shading='flat' when X and Y have the same dimensions as C is deprecated since 3.3. Either specify the corners of the quadrilaterals with X and Y, or pass shading='auto', 'nearest' or 'gouraud', or set rcParams['pcolor.shading']. This will become an error two minor releases later. ax.pcolormesh(X, Y, log_spec)
Repita el preprocesamiento del conjunto de entrenamiento en los conjuntos de validación y prueba:
def preprocess_dataset(files): # Funcion para preprocesar los datos
files_ds = tf.data.Dataset.from_tensor_slices(files) # Creamos un dataset con los archivos
output_ds = files_ds.map( # Aplicamos la funcion get_spectrogram_and_label_id
map_func=get_waveform_and_label, # A la funcion get_waveform_and_label
num_parallel_calls=AUTOTUNE) # Aplicamos el autotune
output_ds = output_ds.map( # Aplicamos la funcion get_spectrogram_and_label_id
map_func=get_spectrogram_and_label_id, # A la funcion get_spectrogram_and_label_id
num_parallel_calls=AUTOTUNE) # Aplicamos el autotune
return output_ds
train_ds = spectrogram_ds # Creamos el dataset de entrenamiento
val_ds = preprocess_dataset(val_files) # Creamos el dataset de validacion
test_ds = preprocess_dataset(test_files) # Creamos el dataset de prueba
Agrupe los conjuntos de entrenamiento y validación para el entrenamiento del modelo:
batch_size = 64 # Tamaño del batch
train_ds = train_ds.batch(batch_size) # Aplicamos el batch
val_ds = val_ds.batch(batch_size) # Aplicamos el batch
Agregue las operaciones Dataset.cache
y Dataset.prefetch
para reducir la latencia de lectura mientras entrena el modelo:
train_ds = train_ds.cache().prefetch(AUTOTUNE) # Cacheamos el dataset
val_ds = val_ds.cache().prefetch(AUTOTUNE) # Cacheamos el dataset
Para el modelo, utilizará una red neuronal convolucional (CNN) simple, ya que ha transformado los archivos de audio en imágenes de espectrograma.
Su modelo tf.keras.Sequential
utilizará las siguientes capas de preprocesamiento de Keras:
tf.keras.layers.Resizing
: para reducir la muestra de la entrada y permitir que el modelo se entrene más rápido.tf.keras.layers.Normalization
: para normalizar cada píxel de la imagen en función de su media y desviación estándar.Para la capa de Normalization
, primero se necesitaría llamar a su método de adapt en los datos de
entrenamiento para calcular las estadísticas agregadas (es decir, la media y la desviación estándar).
for spectrogram, _ in spectrogram_ds.take(1): # Para cada espectrograma
input_shape = spectrogram.shape # Obtenemos el tamaño del espectrograma
print('Input shape:', input_shape) # Mostramos el tamaño del espectrograma
num_labels = len(commands) # Obtenemos el numero de comandos
norm_layer = tf.keras.layers.Normalization() # Creamos una capa de normalizacion
norm_layer.adapt(data=spectrogram_ds.map(map_func=lambda spec, label: spec)) # Adaptamos la capa de normalizacion con los espectrogramas
model = tf.keras.Sequential([ # Creamos un modelo Sequential
tf.keras.layers.InputLayer(input_shape=input_shape), # Añadimos la capa de entrada
tf.keras.layers.Resizing(32, 32), # Añadimos la capa de redimensionamiento a 32x32
norm_layer, # Añadimos la capa de normalizacion
tf.keras.layers.Conv2D(32, 3, activation='relu'), # Añadimos la capa de convolucion con 32 filtros y kernel 3x3
tf.keras.layers.Conv2D(64, 3, activation='relu'), # Añadimos la capa de convolucion con 64 filtros y kernel 3x3
tf.keras.layers.MaxPooling2D(), # Añadimos la capa de MaxPooling
tf.keras.layers.Dropout(0.25), # Añadimos la capa de Dropout con una probabilidad de 25% de ocultar una neurona
tf.keras.layers.Flatten(), # Añadimos la capa de Flatten para convertir la matriz a un vector
tf.keras.layers.Dense(128, activation='relu'), # Añadimos la capa de Dense con 128 neuronas y activacion ReLU
tf.keras.layers.Dropout(0.5), # Añadimos la capa de Dropout con una probabilidad de 50% de ocultar una neurona
tf.keras.layers.Dense(num_labels) # Añadimos la capa de Dense con num_labels
])
model.summary() # Mostramos el modelo
Input shape: (124, 129, 1) Model: "sequential" _________________________________________________________________ Layer (type) Output Shape Param # ================================================================= resizing (Resizing) (None, 32, 32, 1) 0 normalization (Normalizatio (None, 32, 32, 1) 3 n) conv2d (Conv2D) (None, 30, 30, 32) 320 conv2d_1 (Conv2D) (None, 28, 28, 64) 18496 max_pooling2d (MaxPooling2D (None, 14, 14, 64) 0 ) dropout (Dropout) (None, 14, 14, 64) 0 flatten (Flatten) (None, 12544) 0 dense (Dense) (None, 128) 1605760 dropout_1 (Dropout) (None, 128) 0 dense_1 (Dense) (None, 8) 1032 ================================================================= Total params: 1,625,611 Trainable params: 1,625,608 Non-trainable params: 3 _________________________________________________________________
Configure el modelo de Keras con el optimizador de Adam y la pérdida de entropía cruzada:
model.compile(
optimizer=tf.keras.optimizers.Adam(), # Optimizador Adam con un learning rate de 0.001
loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), # Loss SparseCategoricalCrossentropy
metrics=['accuracy'] # Metricas de precision y recall
)
EPOCHS = 10 # Numero de epocas
history = model.fit(
train_ds, # Dataset de entrenamiento
validation_data=val_ds, # Dataset de validacion
epochs=EPOCHS, # Numero de epocas
callbacks=tf.keras.callbacks.EarlyStopping(verbose=1, patience=2) # Callbacks EarlyStopping con una esperanza de 2 epocas
)
Epoch 1/10 100/100 [==============================] - 37s 358ms/step - loss: 1.7113 - accuracy: 0.3770 - val_loss: 1.2591 - val_accuracy: 0.5913 Epoch 2/10 100/100 [==============================] - 18s 185ms/step - loss: 1.1345 - accuracy: 0.6031 - val_loss: 0.9437 - val_accuracy: 0.6888 Epoch 3/10 100/100 [==============================] - 18s 177ms/step - loss: 0.8831 - accuracy: 0.6867 - val_loss: 0.8337 - val_accuracy: 0.7225 Epoch 4/10 100/100 [==============================] - 16s 159ms/step - loss: 0.7480 - accuracy: 0.7334 - val_loss: 0.7170 - val_accuracy: 0.7563 Epoch 5/10 100/100 [==============================] - 16s 160ms/step - loss: 0.6282 - accuracy: 0.7736 - val_loss: 0.6490 - val_accuracy: 0.7725 Epoch 6/10 100/100 [==============================] - 18s 183ms/step - loss: 0.5642 - accuracy: 0.7967 - val_loss: 0.5969 - val_accuracy: 0.7937 Epoch 7/10 100/100 [==============================] - 20s 205ms/step - loss: 0.4965 - accuracy: 0.8236 - val_loss: 0.5679 - val_accuracy: 0.7900 Epoch 8/10 100/100 [==============================] - 21s 208ms/step - loss: 0.4419 - accuracy: 0.8436 - val_loss: 0.5784 - val_accuracy: 0.7925 Epoch 9/10 100/100 [==============================] - 17s 166ms/step - loss: 0.4337 - accuracy: 0.8511 - val_loss: 0.5107 - val_accuracy: 0.8263 Epoch 10/10 100/100 [==============================] - 19s 190ms/step - loss: 0.3842 - accuracy: 0.8658 - val_loss: 0.4992 - val_accuracy: 0.8138
Tracemos las curvas de pérdida de entrenamiento y validación para verificar cómo ha mejorado su modelo durante el entrenamiento:
metrics = history.history # Obtenemos las metricas
plt.plot(history.epoch, metrics['loss'], metrics['val_loss']) # Graficamos las metricas
plt.legend(['training', 'validation']) # Mostramos las etiquetas
plt.show() # Mostramos el grafico
Ejecute el modelo en el conjunto de prueba y verifique el rendimiento del modelo:
test_audio = [] # Creamos una lista para almacenar los audios de prueba
test_labels = [] # Creamos una lista para almacenar los comandos de prueba
for auido, label in test_ds:
test_audio.append(auido.numpy()) # Añadimos el audio a la lista
test_labels.append(label.numpy()) # Añadimos el comando a la lista
test_audio = np.array(test_audio) # Convertimos la lista en un array
test_labels = np.array(test_labels) # Convertimos la lista en un array
y_pred = np.argmax(model.predict(test_audio), axis=1) # Predecimos los comandos
y_true = test_labels # Obtenemos los comandos reales
test_acc = sum(y_pred == y_true) / len(y_true) # Calculamos la precision
print(f'Test set accuracy: {test_acc:.0%}') # Mostramos la precision
Test set accuracy: 83%
Finalmente, verifique la salida de predicción del modelo utilizando un archivo de audio de entrada de alguien que dice "no". ¿Qué tan bien funciona tu modelo?
sample_file = data_dir/'no/01bb6a2a_nohash_0.wav' # Archivo de ejemplo
sample_ds = preprocess_dataset([str(sample_file)]) # Preprocesamos el dataset
for spectrogram, label in sample_ds.batch(1): # Para cada espectrograma
prediction = model(spectrogram) # Predecimos el comando
plt.bar(commands, tf.nn.softmax(prediction[0])) # Mostramos la prediccion
plt.title(f'Predictions for "{commands[label[0]]}"') # Mostramos el comando
plt.show()
Como sugiere el resultado, su modelo debería haber reconocido el comando de audio como "no".
# guardamos el modelo
model.save('model.h5')
display(Audio(str(sample_file))) # Mostramos el audio