aboutsummaryrefslogtreecommitdiff
path: root/data/csv.py
diff options
context:
space:
mode:
authorÉtienne Simon <esimon@esimon.eu>2015-05-05 21:55:13 -0400
committerÉtienne Simon <esimon@esimon.eu>2015-05-05 22:05:21 -0400
commit1f2ff96e6480a62089fcac35154a956c218ed678 (patch)
treed0bb7a2a6d7ba6ae512a2ce3729b1ccbdc21c822 /data/csv.py
parent54613c1f9cf510ca7a71d6619418f2247515aec6 (diff)
downloadtaxi-1f2ff96e6480a62089fcac35154a956c218ed678.tar.gz
taxi-1f2ff96e6480a62089fcac35154a956c218ed678.zip
Clean data module and generalize use of hdf5.
Diffstat (limited to 'data/csv.py')
-rw-r--r--data/csv.py107
1 files changed, 107 insertions, 0 deletions
diff --git a/data/csv.py b/data/csv.py
new file mode 100644
index 0000000..b6fe5b1
--- /dev/null
+++ b/data/csv.py
@@ -0,0 +1,107 @@
+import ast
+import csv
+import numpy
+
+from fuel.datasets import Dataset
+from fuel.streams import DataStream
+from fuel.iterator import DataIterator
+
+import data
+from data.hdf5 import origin_call_normalize, taxi_id_normalize
+
+
+class TaxiData(Dataset):
+ example_iteration_scheme=None
+
+ class State:
+ __slots__ = ('file', 'index', 'reader')
+
+ def __init__(self, pathes, columns, has_header=False):
+ if not isinstance(pathes, list):
+ pathes=[pathes]
+ assert len(pathes)>0
+ self.columns=columns
+ self.provides_sources = tuple(map(lambda x: x[0], columns))
+ self.pathes=pathes
+ self.has_header=has_header
+ super(TaxiData, self).__init__()
+
+ def open(self):
+ state=self.State()
+ state.file=open(self.pathes[0])
+ state.index=0
+ state.reader=csv.reader(state.file)
+ if self.has_header:
+ state.reader.next()
+ return state
+
+ def close(self, state):
+ state.file.close()
+
+ def reset(self, state):
+ if state.index==0:
+ state.file.seek(0)
+ else:
+ state.index=0
+ state.file.close()
+ state.file=open(self.pathes[0])
+ state.reader=csv.reader(state.file)
+ return state
+
+ def get_data(self, state, request=None):
+ if request is not None:
+ raise ValueError
+ try:
+ line=state.reader.next()
+ except (ValueError, StopIteration):
+ # print state.index
+ state.file.close()
+ state.index+=1
+ if state.index>=len(self.pathes):
+ raise StopIteration
+ state.file=open(self.pathes[state.index])
+ state.reader=csv.reader(state.file)
+ if self.has_header:
+ state.reader.next()
+ return self.get_data(state)
+
+ values = []
+ for _, constructor in self.columns:
+ values.append(constructor(line))
+ return tuple(values)
+
+taxi_columns = [
+ ("trip_id", lambda l: l[0]),
+ ("call_type", lambda l: ord(l[1])-ord('A')),
+ ("origin_call", lambda l: 0 if l[2] == '' or l[2] == 'NA' else origin_call_normalize(int(l[2]))),
+ ("origin_stand", lambda l: 0 if l[3] == '' or l[3] == 'NA' else int(l[3])),
+ ("taxi_id", lambda l: taxi_id_normalize(int(l[4]))),
+ ("timestamp", lambda l: int(l[5])),
+ ("day_type", lambda l: ord(l[6])-ord('A')),
+ ("missing_data", lambda l: l[7][0] == 'T'),
+ ("polyline", lambda l: map(tuple, ast.literal_eval(l[8]))),
+ ("longitude", lambda l: map(lambda p: p[0], ast.literal_eval(l[8]))),
+ ("latitude", lambda l: map(lambda p: p[1], ast.literal_eval(l[8]))),
+]
+
+taxi_columns_valid = taxi_columns + [
+ ("destination_longitude", lambda l: numpy.float32(float(l[9]))),
+ ("destination_latitude", lambda l: numpy.float32(float(l[10]))),
+ ("time", lambda l: int(l[11])),
+]
+
+train_file="%s/train.csv" % data.path
+valid_file="%s/valid2-cut.csv" % data.path
+test_file="%s/test.csv" % data.path
+
+train_data=TaxiData(train_file, taxi_columns, has_header=True)
+valid_data = TaxiData(valid_file, taxi_columns_valid)
+test_data = TaxiData(test_file, taxi_columns, has_header=True)
+
+valid_trips = [l for l in open("%s/valid2-cut-ids.txt" % data.path)]
+
+def train_it():
+ return DataIterator(DataStream(train_data))
+
+def test_it():
+ return DataIterator(DataStream(valid_data))