从理论上说,全连接神经网络能够表达一切的多项式,只要深度(隐藏层)够深,全连接神经网络能适应所有的模型,但缺点是需要极大的计算量,而且在深度增加,激活函数的传递量会大大减弱。卷积神经网络是为了解决这个问题,他利用类似生物的局部兴奋逐层整合,形成高级的抽象的概念的过程,减少训练参数,但增加深度,增加对抽象概念的准确性。CNN 在图像领域使用较多。

本文使用 CNN 来对 MNIST ( http://yann.lecun.com/exdb/mnist/) 的手写数字图片集进行分类,预测出图片所对应的手写数字。

MNIST

基本概念

Local Receptive Fields 本地感受野

感受野代表的是在一个图像特征的观察窗口,就像是人的每个神经元只能在一个有限的范围内感受外界,超出这个范围的外界就需要其他的感受野来接收。如下图中的 Layer 1 上的绿色区域, 代表这个感受野为 3x3 像素的区域。这样区域的输入图像,组成的一连串框像素(特征),构成神经元,组成全连接层,最后形成网络,这就是 CNN 的基础。这个过程和人眼的视觉细胞的观察过程是类似的。

Receptive Fields

Convolutional Kernels 卷积核

每个感受野会在向下一层传递时,会应用一个非线性变换——卷积。机器视觉里的卷积和信号处理中的卷积有些类似,但更简单,只是单纯的将区域内的值相加。如上图中,Layer 1 到 Layer 2 的传递,就是将 Layer 1 中的特征相加,得到了 Layer 2 中的一个特征值。在一个特征感受野上应用的非现线性卷积变换,就是卷积核。卷积核上带有权重,会对感受野内的值进行加权计算,得到下一层的卷积特征。

Convolutional Kernels

图像中会常用多感受野的像素尺寸来标记,如 5x5, 7x7 都是很常见的卷积核,代表这个卷积核会应用在每一个 5x5 或 7x7 的感受野上。在整个图像应用卷积核时,会对画面按感受野大小进行滑窗,这个滑窗的行为会有步长,叫 stride size,比如 stride size 为 2 时,卷积核的计算之间会有一个像素的间隔。

卷积核又被称为 filter,而 filter 的数量决定我们能得到的结果数,使用时对同一个图像往往会应用多个不同的卷积核,这个数量就是卷积核的 channel 数。如我看一个风量画面,先用近视眼镜看看近景,再用远视眼镜看看远景,把两者的画面都传到下层,就是 2 个 channels。

一组卷积核就是组成了一个卷积层,比如一个 5x5x64 的卷积层,就是由 64 个以 5x5 为感受野窗口的卷积核组成的卷积层。而在整个网络中,我们要训练得到的就是这 64 个 5x5 的卷积核的权重,共 64x5x5=1600 个参数。

池化层

池化层是一个对参数进行降维的过程,比如一个 5x5 的最大池化层(Maxpool Layer),就是对每个特征的每 5x5 的窗口取最大值,传递给下一层。这样如果是一个 20x20x64 的输入,在传到下一层时,就只有 4x4x64 的大小了,大大减小了特征数量。

上文中我们讨论的卷积核、池化层都简化了第三个维度,也就是 depth。实际使用时,卷积核和池化层都会有 depth,只是在最后输入为 channel 后,得到的维度就是 channel 了。如输入的数据是 28x28x3 的,在 4x4x64 的卷积核计算后,不补齐的情况下,得到的就是 24x24x64 的特征。

Padding 补齐

由于卷积核的运算,输入的特征的尺寸一定是会变小的。实际使用中,我们会用 Padding 来补齐特征,让输出的特征的大小被控制,往往是补齐到和输入一致,这样就可以在 CNN 中多次重复某个结构多多次。如图,使用时是填 0,这样可以防止引入噪音。

padding

卷积神经网络

CNN

一个典型的卷积神经网络是由多个卷积层-池化层组成的结构,多次重复后,接一个全连接神经网络。实际构建时有许多的细节,但基本上是这样一个结构。卷积层的实际作用其实是一种特征变换的过程,全连接网络的作用则是真正的分类。有的多步网络也会把这两个过程分开,变换和学习做两个步骤。

识别 MNIST 手写数字图片

MNIST 数据集

MNIST [LeCun et al., 1998a] 是一个比较老的数据集,有 60,000 个手写数字样例和 10,000 个测试样例。这是 NIST 的一个子集,而后者有更大量的数据样例。每一张图片都已经二值化且合规到一样的大小(28*28)。MNIST 是很合适的图像算法上手数据集。

Google Colaboratory

Google Colaboratory (https://colab.research.google.com/) 是 Google 出品的在线版的 Jupyter Notebook,免费的分配计算资源,甚至还可以免费调用 GPU 和 TPU 来加速计算,还能在线协同,是很好的实验工具。本次识别训练,我就是使用的这个平台。

LeNet-5

LeNet-5

LeNet-5 是一个简单的 CNN 网络结构,由两个卷积层,中夹有两个池化层组成。最后有两个全连接层组成的全连接网络,输出结果。这是一个典型的 CNN 网络的组成。

Google 的教程中,用了一个三层卷积层的网络,后跟了一个局部连接层 Locally-connected layer 做为和全连接层的连接,这个方法常见在图像的特征在画面的局部区域,与其他的区域的关联性比较小的时候,如人脸检测。Google 的方法除了这一点,基本就是 LeNet-5 的类似实现。本次实验我只用了一个 2 层卷积层和 1 层全接连层的网络。

训练过程

导入 MNIST 数据集

由于我使用的是 Google 的 Colaboratory 服务,所以 MNIST 数据直接就使用 Google 提供的在线数据集,不再从 MNIST 的官方地址下载了。

%tensorflow_version 1.x
import tensorflow as tf
import numpy as np
import pandas as pd

tf.logging.set_verbosity(tf.logging.ERROR)
pd.options.display.max_rows = 10
pd.options.display.float_format = '{:.1f}'.format

# 载入 mnist 训练数据, 用 pandas 的 dataframe 读数据

mnist_dataframe = pd.read_csv(
  "https://download.mlcc.google.cn/mledu-datasets/mnist_train_small.csv",
  sep=",",
  header=None)

# 只使用前10000个数据做实验

mnist_dataframe = mnist_dataframe.head(10000)
mnist_dataframe = mnist_dataframe.reindex(np.random.permutation(mnist_dataframe.index))
# 展示一下数据

mnist_dataframe.head()

最后一个语句会打印一部分数据看看,如图

展示的数据

写一个方法从数据中提取标签和特征

def parse_labels_and_features(dataset):
  """从数据中提取标签和特征

  Args:
    dataset: Dataframe, 第一列是标签,余下的列为单色的像素数据

  Returns:
    A `tuple` `(labels, features)`:
      labels: A Pandas `Series`.
      features: A Pandas `DataFrame`.
  """
  labels = dataset[0]
  # DataFrame.loc index ranges are inclusive at both ends.

  features = dataset.loc[:,1:784]
  # 原值是 0~255,归一化到 0~1.

  features = features / 255

  return labels, features

然后用这个方法分别提取出训练集,验证集和测试集。其中因为 MNIST 没有提供验证集,所以直接从训练集中取一部分做验证集。用验证集的作用是调整模型参数,找到最合适的模型,防止进入局部最优。

# 将数据的前7500个做为训练集,将标签和数据拆分

training_targets, training_examples = parse_labels_and_features(mnist_dataframe[:7500])

# 拆出验证集的数据,用数据的后2500个

validation_targets, validation_examples = parse_labels_and_features(mnist_dataframe[7500:10000])

随机的取一个数据样本,看看他长什么样。MNIST 的图都是 28x28x1 的。

# 显示一个随机的数据

from matplotlib import pyplot as plt
from IPython import display

rand_example = np.random.choice(training_examples.index)
_, ax = plt.subplots()
ax.matshow(training_examples.loc[rand_example].values.reshape(28, 28))
ax.set_title("Label: %i" % training_targets.loc[rand_example])
ax.grid(False)

得到结果

一个展示数据的样子

用同样的方法下载 MNIST 的测试数据

# 载入 mnist 测试数据

test_dataframe = pd.read_csv(
  "https://download.mlcc.google.cn/mledu-datasets/mnist_test.csv",
  sep=",",
  header=None)

# 只使用前10000个数据做测试

test_dataframe = test_dataframe.head(5000)
test_dataframe = test_dataframe.reindex(np.random.permutation(test_dataframe.index))

# 提取测试用的特征和标签

testing_targets, testing_examples = parse_labels_and_features(test_dataframe)

设计卷积层

先设定一些基本的参数

# 参数设定

IMAGE_SIZE = 28
NUM_CHANNELS = 1
PIXEL_DEPTH = 255
NUM_LABELS = 10
SEED = 66478  # Set to None for random seed.

BATCH_SIZE = 64 # 训练集的每批次数据量大小

NUM_EPOCHS = 10
EVAL_BATCH_SIZE = 64 # 验证集和测试集的每批次数据量大小

EVAL_FREQUENCY = 100  #用验证集作评估的频率

TRAIN_SIZE = training_targets.shape[0]

然后按照 LeNet-5 的结构,设计卷积层

# 定义 train 用的 placeholder 来存放 input 数据

train_data_node = tf.placeholder(
    tf.float32,
    shape=(BATCH_SIZE, IMAGE_SIZE, IMAGE_SIZE, NUM_CHANNELS))
train_labels_node = tf.placeholder(tf.int64, shape=(BATCH_SIZE,))

# 评估用的 placeholder 来存放数据

eval_data = tf.placeholder(
    tf.float32,
    shape=(EVAL_BATCH_SIZE, IMAGE_SIZE, IMAGE_SIZE, NUM_CHANNELS))

# 定义网络的权重,2层卷积层,中有2池化层

# 第一个卷积层的权重, 5x5 的大小, depth 为 32.

conv1_weights = tf.Variable(
    tf.truncated_normal([5, 5, NUM_CHANNELS, 32],  
                        stddev=0.1,
                        seed=SEED, dtype=tf.float32))
# 加一个 bais,下同

conv1_biases = tf.Variable(tf.zeros([32], dtype=tf.float32))
# 第二个卷积层,5x5x64

conv2_weights = tf.Variable(tf.truncated_normal(
    [5, 5, 32, 64], stddev=0.1,
    seed=SEED, dtype=tf.float32))
conv2_biases = tf.Variable(tf.constant(0.1, shape=[64],
                           dtype=tf.float32))

# 定义全连接网络层,由于经过了两个 2x2 的池化层,

# 输入的特征为 (IMAGE_SIZE // 4 )^2 * 64

# 全连接网络为 7*7*64 => 512 => 10

# 第一层, depth 512.

fc1_weights = tf.Variable(
    tf.truncated_normal([IMAGE_SIZE // 4 * IMAGE_SIZE // 4 * 64, 512],
                        stddev=0.1,
                        seed=SEED,
                        dtype=tf.float32))
fc1_biases = tf.Variable(tf.constant(0.1, shape=[512],
                         dtype=tf.float32))
# 第二层,为输出层,depth 为 10

fc2_weights = tf.Variable(tf.truncated_normal([512, NUM_LABELS],
                                              stddev=0.1,
                                              seed=SEED,
                                              dtype=tf.float32))
fc2_biases = tf.Variable(tf.constant(
    0.1, shape=[NUM_LABELS], dtype=tf.float32))

组织卷积网络

开始定义 LeNet-5 网络,组织各层的结构,conv1 → maxPool1 → conv2 -> maxPool2 -> fc1 -> fc2

# 定义网络

def model(data, train=False):
  """The Model definition."""
  # 4维的strides用以匹配数据的格式 [image index, y, x, depth]

  # 每一层的 padding 为 SAME,即输入和输出的大小补全为一样

  ###

  # 第一层

  conv = tf.nn.conv2d(data,
                      conv1_weights,
                      strides=[1, 1, 1, 1],
                      padding='SAME')
  # 使用 relu 做激活函数

  relu = tf.nn.relu(tf.nn.bias_add(conv, conv1_biases))
  # 最大池化层,窗口为2,步长为2

  pool = tf.nn.max_pool(relu,
                        ksize=[1, 2, 2, 1],
                        strides=[1, 2, 2, 1],
                        padding='SAME')
  ###

  # 第二层

  conv = tf.nn.conv2d(pool,
                      conv2_weights,
                      strides=[1, 1, 1, 1],
                      padding='SAME')
  relu = tf.nn.relu(tf.nn.bias_add(conv, conv2_biases))
  pool = tf.nn.max_pool(relu,
                        ksize=[1, 2, 2, 1],
                        strides=[1, 2, 2, 1],
                        padding='SAME')

  # 将结果 reshape 后输入到全连接网络, [image index, x*y*64]

  pool_shape = pool.get_shape().as_list()
  reshape = tf.reshape(
      pool,
      [pool_shape[0], pool_shape[1] * pool_shape[2] * pool_shape[3]])

  # 一个全连接层

  hidden = tf.nn.relu(tf.matmul(reshape, fc1_weights) + fc1_biases)

  # 在训练时加入一个50%的dropout,防止过拟合

  if train:
    hidden = tf.nn.dropout(hidden, 0.5, seed=SEED)
  return tf.matmul(hidden, fc2_weights) + fc2_biases

构建预测和损失评估

# 训练计算,使用 logits + cross_entropy 做损失计算

logits = model(train_data_node, True)
loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(
    labels=train_labels_node, logits=logits))

# 使用 L2 regularization 正则化全连接网络参数

regularizers = (tf.nn.l2_loss(fc1_weights)
                + tf.nn.l2_loss(fc1_biases)
                + tf.nn.l2_loss(fc2_weights)
                + tf.nn.l2_loss(fc2_biases))

# Add the regularization term to the loss.

loss += 5e-4 * regularizers

# 优化器:设一个变量,每批数据会增加,以控制训练速度

batch = tf.Variable(0, dtype=tf.float32)

# 每个 epoch 后降低训练速度

learning_rate = tf.train.exponential_decay(
    0.01,                # Base learning rate.

    batch * BATCH_SIZE,  # Current index into the dataset.

    TRAIN_SIZE,          # Decay step.

    0.95,                # Decay rate.

    staircase=True)

# 使用 MomentumOptimizer 做为优化器

optimizer = tf.train.MomentumOptimizer(learning_rate,
                                       0.9).minimize(loss,
                                                     global_step=batch)

# 计算预测结果

train_prediction = tf.nn.softmax(logits)

# 计算余下的结果用于 validation 和 test

eval_prediction = tf.nn.softmax(model(eval_data))

其中,使用 L2 regularization 正则化的目的是为了防止过拟合,控制模型的复杂度。更详细的介绍看这里 https://www.zhihu.com/question/26485586

另外定义一下预测结果的 loss 的评估方式。利用 numpy 的便捷语法,实现很简洁

def error_rate(predictions, labels):
  """返回预测结果的误差率"""
  return 100.0 - (
      100.0 *
      np.sum(np.argmax(predictions, 1) == labels) /
      predictions.shape[0])

分批的计算

在计算的过程中,我们采用多个 batch 的方案来让网络分批训练。使用多个 batch 的目的是减少内存和 GPU 的使用,加速训练过程,适合数据量比较大的训练过程。多个 batch 组成一个 epoch,构成一次训练。

这里定义一个方法能分批的在验证集和测试集上计算

def eval_in_batches(data, sess):
  """分批计算网络,减少内存的使用"""
  size = data.shape[0]
  predictions = np.ndarray(shape=(size, NUM_LABELS),
                              dtype=np.float32)
  for begin in xrange(0, size, EVAL_BATCH_SIZE):
    end = begin + EVAL_BATCH_SIZE
    if end <= size:
      # 取一个 EVAL_BATCH_SIZE 的数据量,放入网络评估

      predictions[begin:end, :] = sess.run(
          eval_prediction,
          feed_dict={eval_data: data[begin:end, ...]})
    else:
      # 最后一批数据,放入余下的所有数据

      batch_predictions = sess.run(
          eval_prediction,
          feed_dict={eval_data: data[-EVAL_BATCH_SIZE:, ...]})
      predictions[begin:, :] = batch_predictions[begin - size:, :]
  return predictions

整合训练过程

from six.moves import xrange

# 把 dataframe 转成 numpy array

train_data = training_examples.to_numpy(np.float32).reshape((-1, 28, 28, 1))
train_labels = training_targets.to_numpy(np.float32)
validation_data = validation_examples.to_numpy(np.float32).reshape((-1, 28, 28, 1))
validation_labels = validation_targets.to_numpy(np.float32)
test_data = testing_examples.to_numpy(np.float32).reshape((-1, 28, 28, 1))
test_labels = testing_targets.to_numpy(np.float32)

# 开启 TF session,运行训练

with tf.Session() as sess:
  # 初始化 session

  tf.global_variables_initializer().run()

  # 执行多步循环

  for step in xrange(int(NUM_EPOCHS * TRAIN_SIZE) // BATCH_SIZE):
    # 计算当前 batch 的位置,进行运算

    offset = (step * BATCH_SIZE) % (TRAIN_SIZE - BATCH_SIZE)
    batch_data = train_data[offset:(offset + BATCH_SIZE), ...]
    batch_labels = train_labels[offset:(offset + BATCH_SIZE)]
    
    # 将这一批的数据进行训练

    feed_dict = {train_data_node: batch_data,
                 train_labels_node: batch_labels}
    sess.run(optimizer, feed_dict=feed_dict)

    # 每次训练到一定的程度,用验证集做一次评估,并打印一些信息

    if step % EVAL_FREQUENCY == 0:
      # 计算一下当前的具体 loss 值,指示现在的训练进度

      l, lr, predictions = sess.run([loss, learning_rate,
                                     train_prediction],
                                    feed_dict=feed_dict)

      print('Step %d (epoch %.2f)' %
            (step, float(step) * BATCH_SIZE / TRAIN_SIZE))
      print('Minibatch loss: %.3f, learning rate: %.6f' % (l, lr))
      print('Minibatch error: %.1f%%'
            % error_rate(predictions, batch_labels))
      # 计算一下验证集的错误率

      print('Validation error: %.1f%%' % error_rate(
          eval_in_batches(validation_data, sess), validation_labels))

  # 训练完成,打印模型在测试集上的结果

  test_error = error_rate(eval_in_batches(test_data, sess),
                          test_labels)
  print('Test error: %.1f%%' % test_error)

训练结果

由于这里只做了 10 个 epoch,最后的测试结果在错误率 3.1%,但比线性模型已经有非常大的提升,且只在 4 个 epoch 后就达到了 3.1% 的验证错误率。

训练结果输出

用得到的模型来识别手写数字

现在随便在测试集中找几个数据,用刚训练得到的模型,来识别类型,并比对结果。 我这里就是粗暴的,取了前 4 个结果来看。正式的方法应该是为单个 predict 动作再做一个 placeholde来做运算。

# 测试集数据看看

test_images = test_data[:EVAL_BATCH_SIZE, ...]

# 使用模型识别

test_predictions = sess.run(
eval_prediction,
feed_dict={eval_data: test_images})
predictions = np.argmax(test_predictions, 1)

# 显示4个结果

for i in range(4):
plt.imshow(np.reshape(test_images[i], [28, 28]), cmap='gray')
plt.show()
print("Model prediction:", predictions[i])

结果如下图,可以说是相对准确的识别出了手写数字

预测结果

总结

本文学习了 CNN 的基本结构,卷积层的组成,卷积核的基本算法等。然后仿照 LeNet-5 的结构,用 Google Colaboratory 对 MNIST 手写数据进行训练和验证,得到了不错的结果,并用这个模型识别了测试数据,验证模型的可用性。