1. TensorFlow 数据读取
  2. 1. 数据读取
    1. 1.1. 数据文件格式
      1. 1.1.1. npy 和npz 格式
      2. 1.1.2. pkl 格式
      3. 1.1.3. hdf 格式
      4. 1.1.4. TFRecord
  3. 2. 数据转换
  4. 3. 读出数据

TensorFlow 数据读取

1. 数据读取

在提高运算能力的同时,更高效地处理数据I/O 也是提高整体性能非常重要的一个方面。
TensorFlow 官方给出了三种数据加载的方式:

  1. 用 Python 代码为TensorFlow 供给数据;
  2. 在构建计算图的开始部分,利用管道从文件中读取数据;
  3. 预先加载数据,用常量或者变最将数据保持在内存中(仅适用于小数据)。

对于体积较小的数据,直接预先加载全部样本到内存,然后分batch 输入网络训练是最方便的选择。

而对于几百GB 甚至TB 级别的大数据而言,加载到内存就不太现实了,
一方面对内存的消耗巨大还不一定放得下;
另一方面分块读取的话,频繁I/O也会造成执行效率大打折扣。

1.1. 数据文件格式

对于可以直接加载到内存或显存中的比较少量的数据(一般为lGB 以内的级别),数据读入一般在1分钟内就可以完成,也不会耗尽内存或显存,
因此不会对整个训练过程造成瓶颈。

对于较小量级的数据而言.除了csv 格式以外,还有一些高性能的格式文件类型可以选择,例如 npy 和 npz 格式、pkl 格式和hdf 格式:

1.1.1. npy 和npz 格式

  • NumPy 库作为高性能计算最常用的库, 自然有处理数据 I/O 所需要的方法。
  • numpy.save 方法就可以将数组存储为扩展名为 “.npy” 的二进制文件。
  • 用numpy.load()读出时,它可以自动处理元素的类型和数组维度等信息。
  • npz 文件是存储多个数组数据的文件格式,其内部实际是将多个npy 文件归档。

1.1.2. pkl 格式

  • pickle 是 Python 内置的数据序列化和反序列化模块,通过该模块可以将Python对象持久化成pkl 格式的文件。
  • cPickle 是与pickle 在功能和用法上几乎相同的包,但由于是使用 C 语言编写的,所以在性能上比pickle 高出1000 倍。
    一般若是使用pkl格式,则用cPickle 库进行操作。

1.1.3. hdf 格式

  • HDF ( Hierarchical Data File)是美国国家高级计算应用中心( National Center for Supercomputing Application , NCSA )
    为了满足各种领域研究需求而研制的一种能效存储和分发科学数据的新型数据格式,HDF5 是其系列中最新,也是目前最常用的-种格式。
  • HDF 文件是通用的自描述的数据文件格式,可以跨平台高效读写。

1.1.4. TFRecord

  • 对于大数据, TensorFlow 推荐使用自家的 TFRecord 文件。
  • TFRecord 文件同样是以二进制进行存储数据的,适合以串行的方式读取大批量数据。
  • TFRecord 内部的格式虽然略为复杂不易理解,但是它能更好地利用内存,更方便地复制和移动,更符合 tensorflow 执行引擎的处理方式。
  • 关于大数据的多线程写入操作、关键函数解析、变长序列样本 SequenceExample 的处理和序列样本 batch 包的构建等相关函数后续解释

2. 数据转换

普通的数据很容易转换成TFRecord 格式的文件。只需要写一个小程序,将每一条样本组装成 protocol buffer 定义的Example 结构的对象,序列化成字符串,
再由tf.python_io.TFRecordWriter 导入文件即可。

Titanic 示例如下:

#! -*- coding:utf-8 -*-
import pandas as pd
import tensorflow as tf

# convert train.csv to train.tfrecords
def transform_to_tfrecord():
    data = pd.read_csv('data/train.csv')
    tfrecord_file = 'train.tfrecords'

    def int_feature(value):
        return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

    def float_feature(value):
        return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

    writer = tf.python_io.TFRecordWriter(tfrecord_file)
    for i in range(len(data)):
        features = tf.train.Features(feature={
            'Age': float_feature(data['Age'][i]),
            'Survived': int_feature(data['Survived'][i]),
            'Pclass': int_feature(data['Pclass'][i]),
            'Parch': int_feature(data['Parch'][i]),
            'SibSp': int_feature(data['SibSp'][i]),
            'Sex': int_feature(1 if data['Sex'][i] == 'male' else 0),
            'Fare': float_feature(data['Fare'][i])
        })
        example = tf.train.Example(features=features)
        writer.write(example.SerializeToString())
    writer.close()

if __name__ == '__main__':
    transform_to_tfrecord()

3. 读出数据

从TFRecord 文件中读出数据,使用TFRecordReader。
TFRecordReader 是一个算子,因此TensorFlow 能够记住 tfrecords 文件读取的位置,并且始终能返回下一条记录。

tf.train.string_input_producer 方法用于定义TFRecord 文件作为模型结构的输入部分。该函数输入文件名列表在Session 运行时产生文件路径字符串循环队列。

根据产生的文件名,TfRecordReader.read 方法打开文件,

reader = tf.TFRecordReader()
filename_queue = tf.train.string_input_producer(
                      train_files,
                      num_epochs=num_epochs)
_, serialized_example = reader.read(filename_queue)

再由 tf.parse_single_example 方法解析成一条可用的数据。
tf.train.shuffle_batch可以设置内存读取样木的上限与上限训练batch 批次的大小等参数,用于定义产生随机生成的 batch 训练数据包。

featuresdict = tf.parse_single_example(
    serialized_example,
    features={
        'Survived': tf.FixedLenFeature([], tf.int64),
        'Pclass': tf.FixedLenFeature([], tf.int64),
        'Parch': tf.FixedLenFeature([], tf.int64),
        'SibSp': tf.FixedLenFeature([], tf.int64),
        'Sex': tf.FixedLenFeature([], tf.int64),
        'Age': tf.FixedLenFeature([], tf.float32),
        'Fare': tf.FixedLenFeature([], tf.float32)})

# decode features to same format of float32
labels = featuresdict.pop('Survived')
features = [tf.cast(value, tf.float32)
            for value in featuresdict.values()]

在 Session 的运行中, tf.train.shuffle_batch 函数生成 batch 数据包的过程是作为线程独立运行的。

数据输入线程的挂起和运行时机由 batch 数据的生成的函数控制。

本例中的 tf.train.shuffle_batch 函数指定内存保存样本数据的上限capacity 和下限 min_after_dequeue 。
当内存中保存的样本数量大于上限capacity 时,数据输入线程挂起。
反之,当样木数量小于 min_after_ dequeue 时,训练程序挂起。

# get data with shuffle batch and return
features, labels = tf.train.shuffle_batch(
    [features, labels],
    batch_size=batch_size,
    num_threads=num_threads,
    capacity=min_after_dequeue + 3 * batch_size,
    min_after_dequeue=min_after_dequeue)

函数start_queue_runners 开启对应运行会话Session 的所有线程队列并返回线程句柄。
Coordinator 类对象负责实现数据输入线程的同步。
当string_input_producer 函数产生无限循环队列时,应取消数据输入与训练程序的线程同步。

从TFRecord读入数据的完整程序如下:

#!/usr/bin/env python
# coding=utf-8

import tensorflow as tf
def read_and_decode(train_files, num_threads=2, num_epochs=100,
                    batch_size=10, min_after_dequeue=10):
    # read data from trainFile with TFRecord format
    reader = tf.TFRecordReader()
    filename_queue = tf.train.string_input_producer(
        train_files,
        num_epochs=num_epochs)
    _, serialized_example = reader.read(filename_queue)
    featuresdict = tf.parse_single_example(
        serialized_example,
        features={
            'Survived': tf.FixedLenFeature([], tf.int64),
            'Pclass': tf.FixedLenFeature([], tf.int64),
            'Parch': tf.FixedLenFeature([], tf.int64),
            'SibSp': tf.FixedLenFeature([], tf.int64),
            'Sex': tf.FixedLenFeature([], tf.int64),
            'Age': tf.FixedLenFeature([], tf.float32),
            'Fare': tf.FixedLenFeature([], tf.float32)})

    # decode features to same format of float32
    labels = featuresdict.pop('Survived')
    features = [tf.cast(value, tf.float32)
                for value in featuresdict.values()]

    # get data with shuffle batch and return
    features, labels = tf.train.shuffle_batch(
        [features, labels],
        batch_size=batch_size,
        num_threads=num_threads,
        capacity=min_after_dequeue + 3 * batch_size,
        min_after_dequeue=min_after_dequeue)
    return features, labels


def train_with_queuerunner():
    x, y = read_and_decode(['train.tfrecords'])

    with tf.Session() as sess:
        tf.group(tf.global_variables_initializer(),
                 tf.local_variables_initializer()).run()

        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)

        try:
            step = 0
            while not coord.should_stop():
                # Run training steps or whatever
                features, lables = sess.run([x, y])
                if step % 100 == 0:
                    print('step %d:' % step, lables)
                step += 1
        except tf.errors.OutOfRangeError:
            print('Done training -- epoch limit reached')
        finally:
            # When done, ask the threads to stop.
            coord.request_stop()
        # Wait for threads to finish.
        coord.join(threads)


if __name__ == '__main__':
    train_with_queuerunner()


技术交流学习,请加QQ微信:631531977
目录