To read and write these data sources, first create a SparkSession:
import os
from pyspark.sql import SparkSession, Row
spark = SparkSession.builder \
    .appName("Demo data sources") \
    .getOrCreate()
For demonstration, we construct a DataFrame:
data = [Row(Country="France", Age=44, Salary=72000, Purchased="No"),
        Row(Country="Spain", Age=27, Salary=48000, Purchased="Yes"),
        Row(Country="Germany", Age=30, Salary=54000, Purchased="No"),
        Row(Country="Spain", Age=38, Salary=61000, Purchased="No"),
        Row(Country="Germany", Age=40, Salary=65000, Purchased="Yes"),
        Row(Country="France", Age=35, Salary=58000, Purchased="Yes"),
        Row(Country="Spain", Age=27, Salary=52000, Purchased="No"),
        Row(Country="France", Age=48, Salary=79000, Purchased="Yes"),
        Row(Country="Germany", Age=50, Salary=83000, Purchased="No"),
        Row(Country="France", Age=37, Salary=67000, Purchased="Yes")]
df = spark.createDataFrame(data)
df.show()
Output:
+---+-------+---------+------+
|Age|Country|Purchased|Salary|
+---+-------+---------+------+
| 44| France| No| 72000|
| 27| Spain| Yes| 48000|
| 30|Germany| No| 54000|
| 38| Spain| No| 61000|
| 40|Germany| Yes| 65000|
| 35| France| Yes| 58000|
| 27| Spain| No| 52000|
| 48| France| Yes| 79000|
| 50|Germany| No| 83000|
| 37| France| Yes| 67000|
+---+-------+---------+------+
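Save the DataFrame in csv, json, and parquet formats, and as a Hive table (mode='overwrite' replaces existing output; mode='ignore' silently skips the write if the target already exists):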
df.write.csv("data/demo.csv", mode='overwrite', header=True)
df.write.json("data/demo.json", mode = 'ignore')
df.write.parquet("data/demo.parquet", mode = 'ignore')
df.write.saveAsTable("demo")
Create a temporary view named customers; it lives only as long as the SparkSession:
df.createOrReplaceTempView("customers")
List the available tables (using Hive SQL):
spark.sql("show tables").show()
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default| demo| false|
| |customers| true|
+--------+---------+-----------+
Query the table with Hive SQL:
spark.sql("SELECT * FROM customers WHERE Salary > 60000").show()
+---+-------+---------+------+
|Age|Country|Purchased|Salary|
+---+-------+---------+------+
| 44| France| No| 72000|
| 38| Spain| No| 61000|
| 40|Germany| Yes| 65000|
| 48| France| Yes| 79000|
| 50|Germany| No| 83000|
| 37| France| Yes| 67000|
+---+-------+---------+------+
spark.sql("SELECT * FROM customers WHERE Country='France'").show()
+---+-------+---------+------+
|Age|Country|Purchased|Salary|
+---+-------+---------+------+
| 44| France| No| 72000|
| 35| France| Yes| 58000|
| 48| France| Yes| 79000|
| 37| France| Yes| 67000|
+---+-------+---------+------+
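Read a text file; each line is placed in the value column (here the CSV output written above is read back as plain text):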
df = spark.read.text("data/demo.csv")
df.show(3)
+--------------------+
| value|
+--------------------+
|Age,Country,Purch...|
| 27,Spain,No,52000|
| 48,France,Yes,79000|
+--------------------+
only showing top 3 rows
Read a csv file (a csv file may carry a header row with the column names, but the data types must be inferred, or you can specify them explicitly):
df = spark.read.csv("data/demo.csv", inferSchema = True, header = True)
df.show(3)
+---+-------+---------+------+
|Age|Country|Purchased|Salary|
+---+-------+---------+------+
| 27| Spain| No| 52000|
| 48| France| Yes| 79000|
| 50|Germany| No| 83000|
+---+-------+---------+------+
only showing top 3 rows
Read a json file (a json file contains the field names, but the data types must be inferred, or you can specify them explicitly):
df = spark.read.json("data/demo.json")
df.show(3)
+---+-------+---------+------+
|Age|Country|Purchased|Salary|
+---+-------+---------+------+
| 44| France| No| 72000|
| 27| Spain| Yes| 48000|
| 30|Germany| No| 54000|
+---+-------+---------+------+
only showing top 3 rows
Read a parquet file (a parquet file stores the schema metadata alongside the data):
df = spark.read.parquet("data/demo.parquet")
df.show(3)
+-------+---+------+---------+
|Country|Age|Salary|Purchased|
+-------+---+------+---------+
| France| 44| 72000| No|
| Spain| 27| 48000| Yes|
|Germany| 30| 54000| No|
+-------+---+------+---------+
only showing top 3 rows
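Thursday, January 24, 2019
Big Data Access: Reading and Writing File-System Data Sources with Spark
Common data files include csv, json, txt, and parquet. Spark has built-in support for reading and writing all of them.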
Wednesday, January 23, 2019
Big Data Access: Reading an Excel File from HDFS into a pandas DataFrame
Difficulty: pandas cannot read an Excel file directly from HDFS.
Solution: first read the Excel file's contents into an RDD, then bring the RDD contents back from the cluster to the local machine, and finally call pandas.read_excel().
Steps:
(1) Set up the pyspark environment
import os
import sys
spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))
# sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'))
(2) Create the Spark context
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName('Read Excel File on HDFS to Pandas Dataframe')
sc = SparkContext(conf=conf)
(3) Read the Excel file's contents into an RDD
rdd = sc.binaryFiles("/HomeProject/my-demo/demo.xlsx")
(4) Bring the RDD contents back from the cluster to the local machine
arr = rdd.collect()
(5) Call pandas.read_excel() to load the file contents into a pandas DataFrame
import io
import pandas as pd
# Each collected element is a (path, bytes) pair, so arr[0][1] is the file content
df = pd.read_excel(io.BytesIO(arr[0][1]), header=0)
df.head(5)
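When binaryFiles matches more than one file, indexing with arr[0] depends on the order in which the files come back. The sketch below picks the file by path suffix instead and wraps its bytes in a file-like buffer. It is an illustration only: open_collected_file and the sample paths are hypothetical, and sample_pairs stands in for the result of rdd.collect().

```python
import io


def open_collected_file(pairs, suffix=".xlsx"):
    """Return a BytesIO for the first (path, content) pair whose path ends with suffix."""
    for path, content in pairs:
        if path.lower().endswith(suffix):
            return io.BytesIO(bytes(content))
    raise FileNotFoundError("no collected file ends with " + suffix)


# Stand-in for rdd.collect(): made-up paths and byte payloads.
sample_pairs = [
    ("hdfs://host/demo/readme.txt", b"notes"),
    ("hdfs://host/demo/demo.xlsx", b"PK\x03\x04fake-workbook"),
]
buffer = open_collected_file(sample_pairs)
print(buffer.read(4))
```

The returned buffer can be passed to pd.read_excel in place of io.BytesIO(arr[0][1]).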
Saturday, January 12, 2019
Keras Source Code Analysis (10): Model
File: /keras/engine/training.py
Placing the Model class in training.py tells us that it is about training. From the definition below we see that it inherits from Network, so Model has the capabilities of Network plus training. What else does it provide?
The Model class has four main functional parts: compile, fit, evaluate, and predict. We walk through them one by one.
class Model(Network):
1. compile: model compilation. It configures the model for training, applying each argument that compile accepts to the model.
def compile(self, optimizer,
            loss=None,
            metrics=None,
            loss_weights=None,
            sample_weight_mode=None,
            weighted_metrics=None,
            target_tensors=None,
            **kwargs):
    self.optimizer = optimizers.get(optimizer)
    self.loss = loss or []
    self.metrics = metrics or []
    self.loss_weights = loss_weights
    self.sample_weight_mode = sample_weight_mode
    self.weighted_metrics = weighted_metrics
Handle the loss argument and prepare the loss functions. There are two cases:
(1) loss is a dict or list of loss functions, one per model output, so each output uses its own loss;
(2) loss is a single loss function (or its name); if the model has several outputs, all of them use the same loss function.
Either way, the value the model minimizes is the sum of all the individual losses.
    if isinstance(loss, dict):
        loss_functions = []
        for name in self.output_names:
            loss_functions.append(losses.get(loss.get(name)))
    elif isinstance(loss, list):
        loss_functions = [losses.get(l) for l in loss]
    else:
        loss_function = losses.get(loss)
        loss_functions = [loss_function for _ in range(len(self.outputs))]
    self.loss_functions = loss_functions
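The branching above can be tried outside Keras with plain values. The sketch below is a simplified illustration, not the Keras implementation: normalize_losses and total_loss are hypothetical helpers, loss names stand in for loss functions, and the per-output weights (defaulting to 1.0) are an assumption used to show the summed objective.

```python
def normalize_losses(loss, output_names):
    """Expand a loss spec (dict, list, or single value) to one entry per output."""
    if isinstance(loss, dict):
        return [loss.get(name) for name in output_names]
    if isinstance(loss, list):
        return list(loss)
    return [loss for _ in output_names]


def total_loss(per_output_losses, loss_weights=None):
    """Weighted sum of per-output loss values; weights default to 1.0."""
    if loss_weights is None:
        loss_weights = [1.0] * len(per_output_losses)
    return sum(w * v for w, v in zip(loss_weights, per_output_losses))


names = ["main", "aux"]
print(normalize_losses("mse", names))            # ['mse', 'mse']
print(normalize_losses({"main": "mse"}, names))  # ['mse', None]
print(total_loss([0.5, 2.0]))                    # 2.5
print(total_loss([0.5, 2.0], [1.0, 0.25]))       # 1.0
```

An output whose entry is None has no loss attached, which is the situation the skip lists in the next code segment keep track of.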
    weighted_losses = [
        weighted_masked_objective(fn) for fn in loss_functions]
    skip_target_indices = []
    skip_target_weighing_indices = []
    self._feed_outputs = []
    self._feed_output_names = []
    self._feed_output_shapes = []
    self._feed_loss_fns = []
    for i in range(len(weighted_losses)):
        if weighted_losses[i] is None:
            skip_target_indices.append(i)
            skip_target_weighing_indices.append(i)
Handle loss_weights, which weight the contribution of each model output's loss. The value the model minimizes is the sum of the per-output losses, each multiplied by its loss_weights coefficient.
    if loss_weights is None:
        loss_weights_list = [1. for _ in range(len(self.outputs))]
    elif isinstance(loss_weights, dict):
        loss_weights_list = []
        for name in self.output_names:
            loss_weights_list.append(loss_weights.get(name, 1.))
    elif isinstance(loss_weights, list):
        loss_weights_list = loss_weights
    else:
        raise TypeError('Could not interpret loss_weights argument: ')
Handle target_tensors and create the model's targets. If target_tensors is not None (the code below), externally supplied target tensors are used. It can be a single tensor (for a single-output model), a list of tensors, or a dict mapping output names to target tensors.
    self.targets = []
    self._feed_targets = []
    if target_tensors is not None:
        if isinstance(target_tensors, list):
            # body omitted in this excerpt
            ......
        elif isinstance(target_tensors, dict):
            tmp_target_tensors = []
            for name in self.output_names:
                tmp_target_tensors.append(target_tensors.get(name, None))
            target_tensors = tmp_target_tensors
        elif K.is_tensor(target_tensors):
            target_tensors = [target_tensors]
        else:
            raise TypeError('Expected `target_tensors` to be a tensor')
If target_tensors is None (the default), or one of its entries is None, Keras creates a placeholder for that target, which is fed with the target data during training.
    for i in range(len(self.outputs)):
        if i in skip_target_indices:
            self.targets.append(None)
        else:
            shape = K.int_shape(self.outputs[i])
            name = self.output_names[i]
            if target_tensors is not None:
                target = target_tensors[i]
            else:
                target = None
            if target is None or K.is_placeholder(target):
                if target is None:
                    target = K.placeholder(
                        ndim=len(shape),
                        name=name + '_target',
                        sparse=K.is_sparse(self.outputs[i]),
                        dtype=K.dtype(self.outputs[i]))
                self._feed_targets.append(target)
                self._feed_outputs.append(self.outputs[i])
                self._feed_output_names.append(name)
                self._feed_output_shapes.append(shape)
                self._feed_loss_fns.append(self.loss_functions[i])
            else:
                skip_target_weighing_indices.append(i)
            self.targets.append(target)
Handle sample_weight_mode. There are two cases:
(1) 'temporal': weight the samples per timestep (2D weights);
(2) None, the default: per-sample weights (1D).
If the model has several outputs, you can pass a dict or list of modes to use a different sample_weight_mode on each output.
    sample_weights = []
    sample_weight_modes = []
    if isinstance(sample_weight_mode, dict):
        for i, name in enumerate(self.output_names):
            if i in skip_target_weighing_indices:
                weight = None
                sample_weight_modes.append(None)
            else:
                if sample_weight_mode.get(name) == 'temporal':
                    weight = K.placeholder(ndim=2,
                                           name=name + '_sample_weights')
                    sample_weight_modes.append('temporal')
                else:
                    weight = K.placeholder(ndim=1,
                                           name=name + '_sample_weights')
                    sample_weight_modes.append(None)
            sample_weights.append(weight)
    elif isinstance(sample_weight_mode, list):
        for i in range(len(self.output_names)):
            if i in skip_target_weighing_indices:
                weight = None
                sample_weight_modes.append(None)
            else:
                mode = sample_weight_mode[i]
                name = self.output_names[i]
                if mode == 'temporal':
                    weight = K.placeholder(ndim=2,
                                           name=name + '_sample_weights')
                    sample_weight_modes.append('temporal')
                else:
                    weight = K.placeholder(ndim=1,
                                           name=name + '_sample_weights')
                    sample_weight_modes.append(None)
            sample_weights.append(weight)
    else:
        for i, name in enumerate(self.output_names):
            if i in skip_target_weighing_indices:
                sample_weight_modes.append(None)
                sample_weights.append(None)
            else:
                if sample_weight_mode == 'temporal':
                    sample_weights.append(
                        K.placeholder(ndim=2,
                                      name=name + '_sample_weights'))
                    sample_weight_modes.append('temporal')
                else:
                    sample_weights.append(
                        K.placeholder(ndim=1,
                                      name=name + '_sample_weights'))
                    sample_weight_modes.append(None)
    self.sample_weight_modes = sample_weight_modes
    self._feed_sample_weight_modes = []
    for i in range(len(self.outputs)):
        if i not in skip_target_weighing_indices:
            self._feed_sample_weight_modes.append(
                self.sample_weight_modes[i])
    self.metrics_names = ['loss']
    self.metrics_tensors = []
Compute the total loss: dot(output_loss, loss_weight) + self.losses
    total_loss = None
    with K.name_scope('loss'):
        for i in range(len(self.outputs)):
            if i in skip_target_indices:
                continue
            y_true = self.targets[i]
            y_pred = self.outputs[i]
            weighted_loss = weighted_losses[i]
            sample_weight = sample_weights[i]
            mask = masks[i]
            loss_weight = loss_weights_list[i]
            with K.name_scope(self.output_names[i] + '_loss'):
                output_loss = weighted_loss(y_true, y_pred,
                                            sample_weight, mask)
            if len(self.outputs) > 1:
                self.metrics_tensors.append(output_loss)
                self.metrics_names.append(self.output_names[i] + '_loss')
            if total_loss is None:
                total_loss = loss_weight * output_loss
            else:
                total_loss += loss_weight * output_loss
        if total_loss is None:
            if not self.losses:
                raise ValueError('The model cannot be compiled '
                                 'because it has no loss to optimize.')
            else:
                total_loss = 0.
        for loss_tensor in self.losses:
            total_loss += loss_tensor
Handle metrics, which specify the evaluation metrics used during training and testing. For a multi-output model you can give different metrics per output by passing a dict or a list, e.g. metrics = {'output_a': 'accuracy'}. Metric names can usually be written in full, such as accuracy or crossentropy, or abbreviated, such as acc or ce.
    nested_metrics = collect_metrics(metrics, self.output_names)
    nested_weighted_metrics = collect_metrics(weighted_metrics,
                                              self.output_names)
    self.metrics_updates = []
    self.stateful_metric_names = []
    self.stateful_metric_functions = []
    def handle_metrics(metrics, weights=None):
        metric_name_prefix = 'weighted_' if weights is not None else ''
        for metric in metrics:
            if metric in ('accuracy', 'acc', 'crossentropy', 'ce'):
                output_shape = K.int_shape(self.outputs[i])
                if (output_shape[-1] == 1 or
                        self.loss_functions[i] == losses.binary_crossentropy):
                    if metric in ('accuracy', 'acc'):
                        metric_fn = metrics_module.binary_accuracy
                    elif metric in ('crossentropy', 'ce'):
                        metric_fn = metrics_module.binary_crossentropy
                elif (self.loss_functions[i] ==
                      losses.sparse_categorical_crossentropy):
                    if metric in ('accuracy', 'acc'):
                        metric_fn = metrics_module.sparse_categorical_accuracy
                    elif metric in ('crossentropy', 'ce'):
                        metric_fn = (
                            metrics_module.sparse_categorical_crossentropy)
                else:
                    if metric in ('accuracy', 'acc'):
                        metric_fn = metrics_module.categorical_accuracy
                    elif metric in ('crossentropy', 'ce'):
                        metric_fn = metrics_module.categorical_crossentropy
                if metric in ('accuracy', 'acc'):
                    suffix = 'acc'
                elif metric in ('crossentropy', 'ce'):
                    suffix = 'ce'
                weighted_metric_fn = weighted_masked_objective(metric_fn)
                metric_name = metric_name_prefix + suffix
            else:
                metric_fn = metrics_module.get(metric)
                weighted_metric_fn = weighted_masked_objective(metric_fn)
                if hasattr(metric_fn, 'name'):
                    metric_name = metric_fn.name
                else:
                    metric_name = metric_fn.__name__
                metric_name = metric_name_prefix + metric_name
            with K.name_scope(metric_name):
                metric_result = weighted_metric_fn(y_true, y_pred,
                                                   weights=weights,
                                                   mask=masks[i])
            if len(self.output_names) > 1:
                metric_name = self.output_names[i] + '_' + metric_name
            j = 1
            base_metric_name = metric_name
            while metric_name in self.metrics_names:
                metric_name = base_metric_name + '_' + str(j)
                j += 1
            self.metrics_names.append(metric_name)
            self.metrics_tensors.append(metric_result)
            if isinstance(metric_fn, Layer) and metric_fn.stateful:
                self.stateful_metric_names.append(metric_name)
                self.stateful_metric_functions.append(metric_fn)
                self.metrics_updates += metric_fn.updates
    with K.name_scope('metrics'):
        for i in range(len(self.outputs)):
            if i in skip_target_indices:
                continue
            y_true = self.targets[i]
            y_pred = self.outputs[i]
            weights = sample_weights[i]
            output_metrics = nested_metrics[i]
            output_weighted_metrics = nested_weighted_metrics[i]
            handle_metrics(output_metrics)
            handle_metrics(output_weighted_metrics, weights=weights)
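Two ideas in handle_metrics are easy to miss in the full listing: a short name such as acc resolves to a different concrete metric depending on the output shape and the loss, and metric names are de-duplicated with a numeric suffix. The following is a rough sketch of both using plain strings; resolve_metric and unique_name are hypothetical helpers, and strings stand in for the Keras metric functions.

```python
def resolve_metric(metric, output_units, loss_name):
    """Map 'acc'/'accuracy'/'ce'/'crossentropy' to a concrete metric name."""
    kind = 'accuracy' if metric in ('accuracy', 'acc') else 'crossentropy'
    if output_units == 1 or loss_name == 'binary_crossentropy':
        return 'binary_' + kind
    if loss_name == 'sparse_categorical_crossentropy':
        return 'sparse_categorical_' + kind
    return 'categorical_' + kind


def unique_name(name, taken):
    """Append _1, _2, ... until the name is not already in `taken`."""
    candidate, j = name, 1
    while candidate in taken:
        candidate = name + '_' + str(j)
        j += 1
    return candidate


print(resolve_metric('acc', 1, 'mse'))                        # binary_accuracy
print(resolve_metric('acc', 10, 'categorical_crossentropy'))  # categorical_accuracy
print(unique_name('acc', ['loss', 'acc']))                    # acc_1
```

This is why the same metrics=['acc'] argument reports binary accuracy for a single sigmoid output and categorical accuracy for a softmax output.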
Prepare for gradients and state updates:
    self.total_loss = total_loss
    self.sample_weights = sample_weights
    self._feed_sample_weights = []
    for i in range(len(self.sample_weights)):
        if i not in skip_target_weighing_indices:
            self._feed_sample_weights.append(sample_weights[i])
To save time, the train, test, and predict functions are compiled lazily, on first use:
    self._function_kwargs = kwargs
    self.train_function = None
    self.test_function = None
    self.predict_function = None
    trainable_weights = self.trainable_weights
    self._collected_trainable_weights = trainable_weights
2. fit: model training. Among the fit arguments, x is the training data, y the labels, validation_split the fraction of the training data held out for validation, validation_data an explicit validation set, epochs the number of training epochs, and batch_size the batch size.
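def fit(self,
        x=None,
        y=None,
        batch_size=None,
        epochs=1,
        verbose=1,
        callbacks=None,
        validation_split=0.,
        validation_data=None,
        shuffle=True,
        class_weight=None,
        sample_weight=None,
        initial_epoch=0,
        steps_per_epoch=None,
        validation_steps=None,
        **kwargs):
Validate the user-supplied data and convert it to the standard format the model expects:
    x, y, sample_weights = self._standardize_user_data(
        x, y,
        sample_weight=sample_weight,
        class_weight=class_weight,
        batch_size=batch_size)
Handle the validation data. Two things are decided here:
(1) Whether to validate at all, recorded in do_validation. It defaults to False (no validation), but becomes True if validation_data, validation_split, or validation_steps is passed.
(2) Where the validation data comes from, matching the if branches below:
a) it is passed in directly through validation_data; otherwise
b) validation_split gives a fraction, and that share of the training data is split off as validation data; otherwise
c) validation_steps is given, normally together with steps_per_epoch; the validation data then comes from a source that yields batches, and this argument sets how many batches are drawn for validation.
The validation function takes a list of the form val_x + val_y + val_sample_weights, or val_x + val_y + val_sample_weights + [0.], where val_x is the validation data, val_y the validation labels, and val_sample_weights the sample weights. The trailing 0. is the Keras learning-phase flag (0 = test mode), appended only when the model behaves differently in training and testing.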
    do_validation = False
    if validation_data:
        do_validation = True
        if len(validation_data) == 2:
            val_x, val_y = validation_data
            val_sample_weight = None
        elif len(validation_data) == 3:
            val_x, val_y, val_sample_weight = validation_data
        else:
            raise ValueError('When passing validation_data, '
                             'it must contain 2 (x_val, y_val) '
                             'or 3 (x_val, y_val, val_sample_weights) '
                             'items, however it contains %d items' %
                             len(validation_data))
        val_x, val_y, val_sample_weights = self._standardize_user_data(
            val_x, val_y,
            sample_weight=val_sample_weight,
            batch_size=batch_size)
        if self._uses_dynamic_learning_phase():
            val_inputs = val_x + val_y + val_sample_weights + [0.]
        else:
            val_inputs = val_x + val_y + val_sample_weights
    elif validation_split and 0. < validation_split < 1.:
        if any(K.is_tensor(t) for t in x):
            raise ValueError(
                'If your data is in the form of symbolic tensors, '
                'you cannot use `validation_split`.')
        do_validation = True
        if hasattr(x[0], 'shape'):
            split_at = int(int(x[0].shape[0]) * (1. - validation_split))
        else:
            split_at = int(len(x[0]) * (1. - validation_split))
        x, val_x = (slice_arrays(x, 0, split_at),
                    slice_arrays(x, split_at))
        y, val_y = (slice_arrays(y, 0, split_at),
                    slice_arrays(y, split_at))
        sample_weights, val_sample_weights = (
            slice_arrays(sample_weights, 0, split_at),
            slice_arrays(sample_weights, split_at))
        if self._uses_dynamic_learning_phase():
            val_inputs = val_x + val_y + val_sample_weights + [0.]
        else:
            val_inputs = val_x + val_y + val_sample_weights
    elif validation_steps:
        do_validation = True
        if self._uses_dynamic_learning_phase():
            val_inputs = [0.]
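The validation_split branch computes split_at and slices from that index, so the validation share is always taken from the end of the data as it was passed in. A small sketch of the same arithmetic on a plain list follows; split_for_validation is a hypothetical helper, not a Keras function.

```python
def split_for_validation(samples, validation_split):
    """Hold out the last `validation_split` fraction of `samples`."""
    if not 0.0 < validation_split < 1.0:
        raise ValueError("validation_split must be strictly between 0 and 1")
    split_at = int(len(samples) * (1.0 - validation_split))
    return samples[:split_at], samples[split_at:]


train, val = split_for_validation(list(range(8)), 0.25)
print(train)  # [0, 1, 2, 3, 4, 5]
print(val)    # [6, 7]
```

Because the slice is positional, data that is sorted by class or by time should be shuffled beforehand if a representative validation set is wanted.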
Prepare the input arrays and the training function. The training function takes a list of the form x + y + sample_weights, or x + y + sample_weights + [1.], where x is the training data, y the labels, and sample_weights the sample weights. The trailing 1. is the Keras learning-phase flag (1 = training mode).
    if self._uses_dynamic_learning_phase():
        fit_inputs = x + y + sample_weights + [1.]
    else:
        fit_inputs = x + y + sample_weights
    self._make_train_function()
    fit_function = self.train_function
    out_labels = self.metrics_names
Prepare the validation function:
    if do_validation:
        self._make_test_function()
        val_function = self.test_function
        callback_metrics = copy.copy(out_labels) + [
            'val_' + n for n in out_labels]
    else:
        callback_metrics = copy.copy(out_labels)
        val_function = None
        val_inputs = []
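The two list manipulations above, flattening the inputs with an optional learning-phase flag and deriving the val_ metric names, can be reproduced with ordinary lists. This is only a sketch; build_inputs and callback_metric_names are hypothetical names, and strings stand in for arrays.

```python
def build_inputs(x, y, sample_weights, learning_phase=None):
    """Concatenate the input lists, appending the learning-phase flag if given."""
    inputs = x + y + sample_weights
    if learning_phase is not None:
        inputs = inputs + [learning_phase]
    return inputs


def callback_metric_names(out_labels, do_validation):
    """Metric names reported to callbacks, with 'val_' copies when validating."""
    names = list(out_labels)
    if do_validation:
        names += ['val_' + n for n in out_labels]
    return names


print(build_inputs(['x1'], ['y1'], ['w1'], 1.))       # ['x1', 'y1', 'w1', 1.0]
print(callback_metric_names(['loss', 'acc'], True))   # ['loss', 'acc', 'val_loss', 'val_acc']
```

The val_ names are the keys that appear in the training logs, which is why callbacks can monitor quantities such as val_loss.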
The training loop itself is implemented by training_arrays.fit_loop:
    return training_arrays.fit_loop(self, fit_function, fit_inputs,
                                    out_labels=out_labels,
                                    batch_size=batch_size,
                                    epochs=epochs,
                                    verbose=verbose,
                                    callbacks=callbacks,
                                    val_function=val_function,
                                    val_inputs=val_inputs,
                                    shuffle=shuffle,
                                    callback_metrics=callback_metrics,
                                    initial_epoch=initial_epoch,
                                    steps_per_epoch=steps_per_epoch,
                                    validation_steps=validation_steps)
3. evaluate: model evaluation. It evaluates the model in test mode, computing the loss and any other metrics batch by batch. Its code follows the same pattern as fit.
def evaluate(self, x=None, y=None,
             batch_size=None,
             verbose=1,
             sample_weight=None,
             steps=None):
Validate the user-supplied data and convert it to the standard format the model expects:
    x, y, sample_weights = self._standardize_user_data(
        x, y,
        sample_weight=sample_weight,
        batch_size=batch_size)
Prepare the input arrays and the test function for evaluation:
    if self._uses_dynamic_learning_phase():
        ins = x + y + sample_weights + [0.]
    else:
        ins = x + y + sample_weights
    self._make_test_function()
    f = self.test_function
The evaluation loop is implemented by training_arrays.test_loop:
    return training_arrays.test_loop(self, f, ins,
                                     batch_size=batch_size,
                                     verbose=verbose,
                                     steps=steps)
4. predict: prediction. It runs the model on the input data x and returns the corresponding predictions (a numpy array).
def predict(self, x,
            batch_size=None,
            verbose=0,
            steps=None):
Validate the user-supplied data and convert it to the standard format the model expects:
    x, _, _ = self._standardize_user_data(x)
    if self.stateful:
        if x[0].shape[0] > batch_size and x[0].shape[0] % batch_size != 0:
            raise ValueError('In a stateful network, '
                             'you should only pass inputs with '
                             'a number of samples that can be '
                             'divided by the batch size. Found: ' +
                             str(x[0].shape[0]) + ' samples. '
                             'Batch size: ' + str(batch_size) + '.')
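The stateful check above only rejects inputs that are larger than one batch and do not divide evenly into batches. A standalone sketch of that condition follows; check_stateful_batches is a hypothetical helper that takes the sample count directly instead of reading x[0].shape[0].

```python
def check_stateful_batches(num_samples, batch_size):
    """Raise if num_samples exceeds one batch and is not a multiple of batch_size."""
    if num_samples > batch_size and num_samples % batch_size != 0:
        raise ValueError('Found: ' + str(num_samples) + ' samples. '
                         'Batch size: ' + str(batch_size) + '.')
    return True


print(check_stateful_batches(64, 32))  # True: two full batches
print(check_stateful_batches(10, 32))  # True: fewer samples than one batch
```

A call such as check_stateful_batches(70, 32) raises, because the last batch would hold only 6 samples and the per-sample state carried across batches would no longer line up.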
Prepare the input arrays and the predict function:
    if self._uses_dynamic_learning_phase():
        ins = x + [0.]
    else:
        ins = x
    self._make_predict_function()
The prediction loop is implemented by training_arrays.predict_loop:
    f = self.predict_function
    return training_arrays.predict_loop(self, f, ins,
                                        batch_size=batch_size,
                                        verbose=verbose,
                                        steps=steps)
Friday, January 4, 2019
Keras Source Code Analysis (9): Network
File: /keras/engine/network.py
When we discussed the role of Node earlier, we said that Node objects connect layer to layer and input to output, eventually linking a multi-layer network into a directed acyclic graph (DAG). The Network object is what carries out this process.
class Network(Layer):
First, why does it inherit from Layer? There are several reasons:
(1) It builds a DAG that has entry and exit points, inputs and outputs, just like a Layer;
(2) The DAG is built in order to compute; to trigger that computation it must expose a function for outside callers, and that function is call, again just like a Layer;
(3) A model, being a Layer, can be nested inside other models.
The DAG is built when the Network object is instantiated.
def __init__(self, *args, **kwargs):
    if (len(args) == 2 or
        len(args) == 1 and 'outputs' in kwargs or
            'inputs' in kwargs and 'outputs' in kwargs):
        self._init_graph_network(*args, **kwargs)
    else:
        self._init_subclassed_network(**kwargs)
As shown above, the DAG is actually built by calling _init_graph_network. With the Layer-related code stripped away, its main logic is easy to follow. It in turn calls the module-level function _map_graph_network in the same file to obtain the key data structures nodes, nodes_by_depth, layers, and layers_by_depth, and stores them in the corresponding attributes of the object.
def _init_graph_network(self, inputs, outputs, name=None):
    ......
    self.inputs = to_list(inputs, allow_tuple=True)
    self.outputs = to_list(outputs, allow_tuple=True)
    ......
    nodes, nodes_by_depth, layers, layers_by_depth = _map_graph_network(
        self.inputs, self.outputs)
    self._network_nodes = nodes
    self._nodes_by_depth = nodes_by_depth
    self._layers = layers
    self._layers_by_depth = layers_by_depth
    ......
Now let's look at the code of _map_graph_network:
def _map_graph_network(inputs, outputs):
    # Initialize the data structures that hold the nodes and layers of the graph being built
    network_nodes = set()  # ids of all nodes relevant to the Network
    nodes_depths = {}  # dict {node: depth value}
    layers_depths = {}  # dict {layer: depth value}
    layer_indices = {}  # dict {layer: index in traversal}
    nodes_in_decreasing_depth = []
build_map below is a recursive inner function. Starting from the tensor passed in, it builds the DAG by recursing backwards.
During this recursion each relevant node is in one of three states:
(1) finished (finished_nodes)
(2) in progress (nodes_in_progress)
(3) about to be processed (layer._inbound_nodes[node_index])
All of these are passed to build_map as arguments.
    def build_map(tensor,
                  finished_nodes,
                  nodes_in_progress,
                  layer,
                  node_index,
                  tensor_index):
        # Get the node to process
        node = layer._inbound_nodes[node_index]
        # Check nodes_in_progress to prevent cycles
        if node in nodes_in_progress:
            raise ValueError('The tensor ' + str(tensor) + ' at layer "' +
                             layer.name + '" is part of a cycle.')
        # Avoid doing the same work twice
        if node in finished_nodes:
            return
        # Update the enclosing network_nodes set
        node_key = _make_node_key(layer.name, node_index)
        network_nodes.add(node_key)
        # Update the enclosing layer_indices dict
        if layer not in layer_indices:
            layer_indices[layer] = len(layer_indices)
        # Start processing this node
        nodes_in_progress.add(node)
        # Depth-first search over the tensors on the input side of the current node
        for i in range(len(node.inbound_layers)):
            x = node.input_tensors[i]
            layer = node.inbound_layers[i]
            node_index = node.node_indices[i]
            tensor_index = node.tensor_indices[i]
            build_map(x, finished_nodes, nodes_in_progress, layer,
                      node_index, tensor_index)
        # Done: add the current node to finished_nodes
        # and remove it from nodes_in_progress
        finished_nodes.add(node)
        nodes_in_progress.remove(node)
        # This is the key step: it records every node, in the order the
        # traversal finishes them, in the list nodes_in_decreasing_depth;
        # the later forward computation and error backpropagation rely on it
        nodes_in_decreasing_depth.append(node)
build_map ends here. We now get ready to call it: first initialize finished_nodes and nodes_in_progress, then loop over outputs and call build_map on each, building the DAG by backward recursion.
    finished_nodes = set()
    nodes_in_progress = set()
    for x in outputs:
        layer, node_index, tensor_index = x._keras_history
        build_map(x, finished_nodes, nodes_in_progress,
                  layer=layer,
                  node_index=node_index,
                  tensor_index=tensor_index)
The calls to build_map above produce three results:
(1) network_nodes: the set of all nodes;
(2) layer_indices: a layer -> index dict;
(3) nodes_in_decreasing_depth: the list of nodes in traversal order.
Result (1) is returned by _map_graph_network as is; (2) and (3) are still needed to build the node depths, the list of layers in decreasing depth, and the layer depths, that is, nodes_by_depth, layers, and layers_by_depth.
    # From nodes_in_decreasing_depth compute
    # the node-to-depth map nodes_depths (node -> depth)
    # and the layer-to-depth map layers_depths (layer -> depth)
    for node in reversed(nodes_in_decreasing_depth):
        depth = nodes_depths.setdefault(node, 0)
        previous_depth = layers_depths.get(node.outbound_layer, 0)
        depth = max(depth, previous_depth)
        layers_depths[node.outbound_layer] = depth
        nodes_depths[node] = depth
        for i in range(len(node.inbound_layers)):
            inbound_layer = node.inbound_layers[i]
            node_index = node.node_indices[i]
            inbound_node = inbound_layer._inbound_nodes[node_index]
            previous_depth = nodes_depths.get(inbound_node, 0)
            nodes_depths[inbound_node] = max(depth + 1, previous_depth)
    # Group nodes by depth (depth -> nodes) to get nodes_by_depth
    nodes_by_depth = {}
    for node, depth in nodes_depths.items():
        if depth not in nodes_by_depth:
            nodes_by_depth[depth] = []
        nodes_by_depth[depth].append(node)
    # Group layers by depth (depth -> layers) to get layers_by_depth
    layers_by_depth = {}
    for layer, depth in layers_depths.items():
        if depth not in layers_by_depth:
            layers_by_depth[depth] = []
        layers_by_depth[depth].append(layer)
    # Arrange all layers in order of decreasing depth
    depth_keys = list(layers_by_depth.keys())
    depth_keys.sort(reverse=True)
    layers = []
    for depth in depth_keys:
        layers_for_depth = layers_by_depth[depth]
        # Layers at the same depth keep their traversal order
        layers_for_depth.sort(key=lambda x: layer_indices[x])
        layers.extend(layers_for_depth)
    ......
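The traversal and depth assignment are easier to follow on a toy graph. The sketch below is a simplified illustration and not the Keras code: nodes are plain strings, inputs_of is a hypothetical dict mapping each node to the nodes that feed it, and layers and nodes are not distinguished.

```python
def map_graph(inputs_of, outputs):
    """Post-order DFS from the outputs, then assign depths (outputs have depth 0)."""
    finished, in_progress, order = set(), set(), []

    def visit(node):
        if node in in_progress:
            raise ValueError("node %r is part of a cycle" % node)
        if node in finished:
            return
        in_progress.add(node)
        for parent in inputs_of.get(node, []):
            visit(parent)
        in_progress.remove(node)
        finished.add(node)
        order.append(node)  # appended only after all of its inputs

    for out in outputs:
        visit(out)

    # Walk from the outputs back: each input sits at least one level deeper.
    depths = {}
    for node in reversed(order):
        depth = depths.setdefault(node, 0)
        for parent in inputs_of.get(node, []):
            depths[parent] = max(depth + 1, depths.get(parent, 0))

    by_depth = {}
    for node, depth in depths.items():
        by_depth.setdefault(depth, []).append(node)
    return order, depths, by_depth


# Hypothetical graph: input feeds dense_a and dense_b, which are merged by add.
toy = {
    "out": ["add"],
    "add": ["dense_a", "dense_b"],
    "dense_a": ["input"],
    "dense_b": ["input"],
}
order, depths, by_depth = map_graph(toy, ["out"])
print(order)   # ['input', 'dense_a', 'dense_b', 'add', 'out']
print(depths)  # {'out': 0, 'add': 1, 'dense_a': 2, 'dense_b': 2, 'input': 3}
```

The output node has depth 0 and the input has the largest depth, so visiting depths from largest to smallest runs the graph in forward order.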
Return the set of nodes, the nodes by depth, the list of layers in decreasing depth, and the layers by depth:
    return network_nodes, nodes_by_depth, layers, layers_by_depth
Network inherits from Layer, so computation is also done by the Network object's call method. To avoid repeating work, the Network object caches the result computed for a given pair of inputs and masks (self._output_tensor_cache): if the result has already been computed it is taken straight from the cache; otherwise the internal method self.run_internal_graph is called to compute it.
def call(self, inputs, mask=None):
    inputs = to_list(inputs)
    if mask is None:
        masks = [None for _ in range(len(inputs))]
    else:
        masks = to_list(mask)
    cache_key = object_list_uid(inputs)
    cache_key += '_' + object_list_uid(masks)
    if cache_key in self._output_tensor_cache:
        return self._output_tensor_cache[cache_key]
    else:
        output_tensors, _, _ = self.run_internal_graph(inputs, masks)
        return output_tensors
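The caching idea in call can be sketched with an ordinary dict keyed on object identity. The class below is a toy stand-in, not the Keras class: CachedGraph, _uid, and the runs counter are hypothetical, _uid only imitates the spirit of object_list_uid, and the toy stores the result inside call for brevity.

```python
class CachedGraph:
    """Toy stand-in for the caching in Network.call; not the Keras class."""

    def __init__(self, compute):
        self._compute = compute
        self._output_cache = {}
        self.runs = 0  # how many times the graph was actually evaluated

    @staticmethod
    def _uid(objects):
        # Identity-based key: the same objects give the same key.
        return ', '.join(str(id(obj)) for obj in objects)

    def call(self, inputs, masks=None):
        if masks is None:
            masks = [None for _ in inputs]
        key = self._uid(inputs) + '_' + self._uid(masks)
        if key not in self._output_cache:
            self.runs += 1
            self._output_cache[key] = self._compute(inputs)
        return self._output_cache[key]


graph = CachedGraph(lambda xs: [sum(x) for x in xs])
a = [1, 2, 3]
first = graph.call([a])
second = graph.call([a])
print(first, graph.runs)  # [6] 1
```

Because the key is built from object identities rather than values, an equal but distinct input object triggers a fresh computation.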
# Compute the network's output tensors from inputs.
def run_internal_graph(self, inputs, masks=None):
    if masks is None:
        masks = [None for _ in range(len(inputs))]
    # tensor_map holds every tensor and mask computed so far;
    # initialize it with the inputs and masks passed in
    tensor_map = {}
    for x, y, mask in zip(self.inputs, inputs, masks):
        tensor_map[str(id(x))] = (y, mask)
    # Traverse in order of decreasing depth
    depth_keys = list(self._nodes_by_depth.keys())
    depth_keys.sort(reverse=True)
    for depth in depth_keys:
        nodes = self._nodes_by_depth[depth]
        for node in nodes:
            layer = node.outbound_layer
            reference_input_tensors = node.input_tensors
            reference_output_tensors = node.output_tensors
            # Check whether every tensor in reference_input_tensors is in tensor_map,
            # i.e. already computed; collect the computed ones in computed_data
            computed_data = []  # List of tuples (input, mask).
            for x in reference_input_tensors:
                if str(id(x)) in tensor_map:
                    computed_data.append(tensor_map[str(id(x))])
            # The check compares the lengths of computed_data and reference_input_tensors
            if len(computed_data) == len(reference_input_tensors):
                # Equal lengths mean all input tensors of this layer are available, so layer.call can run
                with K.name_scope(layer.name):
                    if node.arguments:
                        kwargs = node.arguments
                    else:
                        kwargs = {}
                    if len(computed_data) == 1:
                        computed_tensor, computed_mask = computed_data[0]
                        if has_arg(layer.call, 'mask'):
                            if 'mask' not in kwargs:
                                kwargs['mask'] = computed_mask
                        output_tensors = to_list(
                            layer.call(computed_tensor, **kwargs))
                        output_masks = layer.compute_mask(computed_tensor,
                                                          computed_mask)
                        if output_masks is None:
                            output_masks = [None for _ in output_tensors]
                        else:
                            output_masks = to_list(output_masks)
                        computed_tensors = [computed_tensor]
                        computed_masks = [computed_mask]
                    else:
                        computed_tensors = [x[0] for x in computed_data]
                        computed_masks = [x[1] for x in computed_data]
                        if has_arg(layer.call, 'mask'):
                            if 'mask' not in kwargs:
                                kwargs['mask'] = computed_masks
                        output_tensors = to_list(
                            layer.call(computed_tensors, **kwargs))
                        output_masks = layer.compute_mask(computed_tensors,
                                                          computed_masks)
                        if output_masks is None:
                            output_masks = [None for _ in output_tensors]
                        else:
                            output_masks = to_list(output_masks)
                    if (hasattr(layer, 'activity_regularizer') and
                            layer.activity_regularizer is not None):
                        with K.name_scope('activity_regularizer'):
                            regularization_losses = [
                                layer.activity_regularizer(x)
                                for x in output_tensors]
                        layer.add_loss(regularization_losses,
                                       inputs=computed_tensors)
                    ......
                # Add the output_tensors and output_masks just computed to tensor_map
                for x, y, mask in zip(reference_output_tensors,
                                      output_tensors,
                                      output_masks):
                    tensor_map[str(id(x))] = (y, mask)
    # Extract output_tensors and output_masks from tensor_map,
    # and output_shapes from output_tensors
    output_tensors = []
    output_masks = []
    output_shapes = []
    for x in self.outputs:
        assert str(id(x)) in tensor_map, 'Could not compute output ' + str(x)
        tensor, mask = tensor_map[str(id(x))]
        if hasattr(tensor, '_keras_shape') and output_shapes is not None:
            shape = tensor._keras_shape
            output_shapes.append(shape)
        else:
            output_shapes = None
        output_tensors.append(tensor)
        output_masks.append(mask)
    return output_tensors, output_masks, output_shapes
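Stripped of masks, name scopes, and regularizers, run_internal_graph is a loop over depths that fills a lookup table of computed values. The sketch below shows that core on a toy graph. It is an assumption-laden simplification: run_graph is a hypothetical function, nodes are (function, input names, output name) tuples, and values are keyed by name instead of by str(id(tensor)).

```python
def run_graph(nodes_by_depth, graph_inputs, feed):
    """Evaluate nodes from the largest depth down to 0, as run_internal_graph does."""
    tensor_map = dict(zip(graph_inputs, feed))
    for depth in sorted(nodes_by_depth, reverse=True):
        for fn, input_names, output_name in nodes_by_depth[depth]:
            computed = [tensor_map[n] for n in input_names if n in tensor_map]
            # Run the node only when every one of its inputs is available.
            if len(computed) == len(input_names):
                tensor_map[output_name] = fn(*computed)
    return tensor_map


# Hypothetical graph: y = (x * 2) + (x + 1)
toy_nodes_by_depth = {
    2: [(lambda x: x * 2, ["x"], "double"), (lambda x: x + 1, ["x"], "inc")],
    1: [(lambda a, b: a + b, ["double", "inc"], "y")],
}
result = run_graph(toy_nodes_by_depth, ["x"], [5])
print(result["y"])  # 16
```

Processing the deepest nodes first guarantees that by the time a node is reached, the nodes feeding it have already written their results into the table.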