An error occurred while loading the file. Please try again.
-
Pierre-Antoine Rouby authoredd3288069
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
import os.path as op
from typing import List, Tuple, Union
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from mpl_toolkits.mplot3d import Axes3D
from spatio_temporal_dataset.coordinates.utils import get_index_without_spatio_temporal_index_suffix
from spatio_temporal_dataset.slicer.abstract_slicer import AbstractSlicer, df_sliced
from spatio_temporal_dataset.slicer.spatial_slicer import SpatialSlicer
from spatio_temporal_dataset.slicer.spatio_temporal_slicer import SpatioTemporalSlicer
from spatio_temporal_dataset.slicer.split import s_split_from_df, ind_train_from_s_split, Split
from spatio_temporal_dataset.slicer.temporal_slicer import TemporalSlicer
class AbstractCoordinates(object):
"""
Main attribute of the class is the DataFrame df_all_coordinates
Index are coordinates index
Columns are the value of each coordinates
So far, the train_split_ratio is the same between the spatial part of the data, and the temporal part
"""
# Spatial columns
COORDINATE_X = 'coord_x'
COORDINATE_Y = 'coord_y'
COORDINATE_Z = 'coord_z'
COORDINATE_SPATIAL_NAMES = [COORDINATE_X, COORDINATE_Y, COORDINATE_Z]
SPATIAL_SPLIT = 'spatial_split'
# Temporal columns
COORDINATE_T = 'coord_t'
TEMPORAL_SPLIT = 'temporal_split'
# Coordinates columns
COORDINATES_NAMES = COORDINATE_SPATIAL_NAMES + [COORDINATE_T]
def __init__(self, df: pd.DataFrame, slicer_class: type, s_split_spatial: pd.Series = None,
s_split_temporal: pd.Series = None):
# Extract df_all_coordinates from df
coordinate_columns = [c for c in df.columns if c in self.COORDINATES_NAMES]
assert len(coordinate_columns) > 0
# Sort coordinates according to a specified order
sorted_coordinates_columns = [c for c in self.COORDINATES_NAMES if c in coordinate_columns]
self.df_all_coordinates = df.loc[:, sorted_coordinates_columns].copy() # type: pd.DataFrame
# Check the data type of the coordinate columns
accepted_dtypes = ['float64', 'int64']
assert len(self.df_all_coordinates.select_dtypes(include=accepted_dtypes).columns) == len(coordinate_columns), \
'coordinates columns dtypes should belong to {}'.format(accepted_dtypes)
# Slicing attributes
self.s_split_spatial = s_split_spatial # type: pd.Series
self.s_split_temporal = s_split_temporal # type: pd.Series
self.slicer = None # type: Union[None, AbstractSlicer]
# Load the slicer
if slicer_class is TemporalSlicer:
self.slicer = TemporalSlicer(self.ind_train_temporal)
elif slicer_class is SpatialSlicer:
self.slicer = SpatialSlicer(self.ind_train_spatial)
elif slicer_class is SpatioTemporalSlicer:
self.slicer = SpatioTemporalSlicer(self.ind_train_spatial, self.ind_train_temporal)
else:
raise ValueError("Unknown slicer_class: {}".format(slicer_class))
# ClassMethod constructor
@classmethod
def from_df(cls, df: pd.DataFrame):
# Extract the split if they are specified
s_split_spatial = df[cls.SPATIAL_SPLIT].copy() if cls.SPATIAL_SPLIT in df.columns else None
s_split_temporal = df[cls.TEMPORAL_SPLIT].copy() if cls.TEMPORAL_SPLIT in df.columns else None
# Infer the slicer class
if s_split_temporal is None and s_split_spatial is None:
raise ValueError('Both split are unspecified')
elif s_split_temporal is not None and s_split_spatial is None:
slicer_class = TemporalSlicer
elif s_split_temporal is None and s_split_spatial is not None:
slicer_class = SpatialSlicer
else:
slicer_class = SpatioTemporalSlicer
return cls(df=df, slicer_class=slicer_class, s_split_spatial=s_split_spatial, s_split_temporal=s_split_temporal)
@classmethod
def from_df_and_slicer(cls, df: pd.DataFrame, slicer_class: type, train_split_ratio: float = None):
# All the index should be unique
assert len(set(df.index)) == len(df), 'df indices are not unique'
# Create a spatial split
s_split_spatial = s_split_from_df(df, cls.COORDINATE_X, cls.SPATIAL_SPLIT, train_split_ratio, True)
# Create a temporal split
s_split_temporal = s_split_from_df(df, cls.COORDINATE_T, cls.TEMPORAL_SPLIT, train_split_ratio, False)
return cls(df=df, slicer_class=slicer_class, s_split_spatial=s_split_spatial, s_split_temporal=s_split_temporal)
@classmethod
def from_csv(cls, csv_path: str = None):
assert csv_path is not None
assert op.exists(csv_path)
df = pd.read_csv(csv_path)
# Index correspond to the first column
index_column_name = df.columns[0]
assert index_column_name not in cls.COORDINATE_SPATIAL_NAMES
df.set_index(index_column_name, inplace=True)
return cls.from_df(df)
@property
def index(self) -> pd.Index:
return self.df_all_coordinates.index
@property
def df_merged(self) -> pd.DataFrame:
# Merged DataFrame of df_coord with s_split
return self.df_all_coordinates.join(self.df_split)
# Split
def df_coordinates(self, split: Split = Split.all) -> pd.DataFrame:
return df_sliced(df=self.df_all_coordinates, split=split, slicer=self.slicer)
def coordinates_values(self, split: Split = Split.all) -> np.ndarray:
return self.df_coordinates(split).values
def coordinates_index(self, split: Split = Split.all) -> pd.Index:
return self.df_coordinates(split).index
@property
def ind_train_spatial(self) -> pd.Series:
return ind_train_from_s_split(s_split=self.s_split_spatial)
@property
def ind_train_temporal(self) -> pd.Series:
return ind_train_from_s_split(s_split=self.s_split_temporal)
@property
def df_split(self) -> pd.DataFrame:
split_name_to_s_split = {
self.SPATIAL_SPLIT: self.s_split_spatial,
self.TEMPORAL_SPLIT: self.s_split_temporal,
}
# Delete None s_split from the dictionary
split_name_to_s_split = {k: v for k, v in split_name_to_s_split.items() if v is not None}
# Create df_split from dict
return pd.DataFrame.from_dict(split_name_to_s_split)
@property
def coordinates_names(self) -> List[str]:
return self.coordinates_spatial_names + self.coordinates_temporal_names
@property
def nb_coordinates(self) -> int:
return len(self.coordinates_names)
@property
def coordinates_dims(self) -> List[int]:
return list(range(self.nb_coordinates))
# Spatial attributes
@property
def coordinates_spatial_names(self) -> List[str]:
return [name for name in self.COORDINATE_SPATIAL_NAMES if name in self.df_all_coordinates.columns]
@property
def nb_coordinates_spatial(self) -> int:
return len(self.coordinates_spatial_names)
@property
def has_spatial_coordinates(self) -> bool:
return self.nb_coordinates_spatial > 0
def df_spatial_coordinates(self, split: Split = Split.all) -> pd.DataFrame:
if self.nb_coordinates_spatial == 0:
return pd.DataFrame()
else:
return self.df_coordinates(split).loc[:, self.coordinates_spatial_names].drop_duplicates()
def spatial_index(self, split: Split = Split.all) -> pd.Index:
df_spatial = self.df_spatial_coordinates(split)
if self.has_spatio_temporal_coordinates:
# Remove the spatio temporal index suffix
return get_index_without_spatio_temporal_index_suffix(df_spatial)
else:
return df_spatial.index
# Temporal attributes
@property
def coordinates_temporal_names(self) -> List[str]:
return [self.COORDINATE_T] if self.COORDINATE_T in self.df_all_coordinates else []
@property
def nb_coordinates_temporal(self) -> int:
return len(self.coordinates_temporal_names)
@property
def has_temporal_coordinates(self) -> bool:
return self.nb_coordinates_temporal > 0
def df_temporal_coordinates(self, split: Split = Split.all) -> pd.DataFrame:
if self.nb_coordinates_temporal == 0:
return pd.DataFrame()
else:
return self.df_coordinates(split).loc[:, self.coordinates_temporal_names].drop_duplicates()
@property
def nb_steps(self, split: Split = Split.all):
return len(self.df_temporal_coordinates(split))
def df_temporal_range(self, split: Split = Split.all) -> Tuple[int, int]:
df_temporal_coordinates = self.df_temporal_coordinates(split)
return int(df_temporal_coordinates.min()), int(df_temporal_coordinates.max()),
@property
def idx_temporal_coordinates(self):
return self.coordinates_names.index(self.COORDINATE_T)
# Spatio temporal attributes
@property
def has_spatio_temporal_coordinates(self) -> bool:
return self.has_spatial_coordinates and self.has_temporal_coordinates
def spatio_temporal_shape(self, split: Split.all) -> Tuple[int, int]:
return len(self.df_spatial_coordinates(split)), len(self.df_temporal_coordinates(split))
def ind_of_df_all_coordinates(self, coordinate_name, value):
return self.df_all_coordinates.loc[:, coordinate_name] == value
# Visualization
@property
def x_coordinates(self) -> np.ndarray:
return self.df_all_coordinates[self.COORDINATE_X].values.copy()
@property
def y_coordinates(self) -> np.ndarray:
return self.df_all_coordinates[self.COORDINATE_Y].values.copy()
@property
def z_coordinates(self) -> np.ndarray:
return self.df_all_coordinates[self.COORDINATE_Z].values.copy()
@property
def t_coordinates(self) -> np.ndarray:
return self.df_all_coordinates[self.COORDINATE_T].values.copy()
def visualize(self):
if self.nb_coordinates_spatial == 1:
self.visualization_1D()
elif self.nb_coordinates_spatial == 2:
self.visualization_2D()
else:
self.visualization_3D()
def visualization_1D(self):
assert self.nb_coordinates_spatial >= 1
x = self.x_coordinates
y = np.zeros(len(x))
plt.scatter(x, y)
plt.show()
def visualization_2D(self):
assert self.nb_coordinates_spatial >= 2
plt.scatter(self.x_coordinates, self.y_coordinates)
plt.show()
def visualization_3D(self):
assert self.nb_coordinates_spatial == 3
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d') # type: Axes3D
ax.scatter(self.x_coordinates, self.y_coordinates, self.z_coordinates, marker='^')
plt.show()
# Magic Methods
def __len__(self):
return len(self.df_all_coordinates)
def __mul__(self, other: float):
self.df_all_coordinates *= other
return self
def __rmul__(self, other):
return self * other
def __eq__(self, other):
return self.df_merged.equals(other.df_merged)
def __str__(self):
return self.df_all_coordinates.__str__()