From 1f2ff96e6480a62089fcac35154a956c218ed678 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=89tienne=20Simon?= Date: Tue, 5 May 2015 21:55:13 -0400 Subject: Clean data module and generalize use of hdf5. --- config/dest_simple_mlp_2_cs.py | 6 +- config/dest_simple_mlp_2_cswdt.py | 6 +- config/dest_simple_mlp_2_noembed.py | 2 + config/dest_simple_mlp_tgtcls_0_cs.py | 8 +- config/dest_simple_mlp_tgtcls_1_cs.py | 8 +- config/dest_simple_mlp_tgtcls_1_cswdt.py | 8 +- config/dest_simple_mlp_tgtcls_1_cswdtx.py | 8 +- convert_data.py | 125 ------------------- data.py | 196 ------------------------------ data/__init__.py | 31 +++++ data/csv.py | 107 ++++++++++++++++ data/csv_to_hdf5.py | 127 +++++++++++++++++++ data/cuts/__init__.py | 0 data/cuts/test_times_0.py | 8 ++ data/hdf5.py | 61 ++++++++++ data/init_valid.py | 61 ++++++++++ data/make_valid_cut.py | 72 +++++++++++ data/transformers.py | 127 +++++++++++++++++++ make_valid.py | 37 ------ make_valid_cut.py | 40 ------ model/dest_simple_mlp.py | 10 +- model/dest_simple_mlp_tgtcls.py | 8 +- train.py | 51 ++++---- transformers.py | 125 ------------------- 24 files changed, 654 insertions(+), 578 deletions(-) delete mode 100755 convert_data.py delete mode 100644 data.py create mode 100644 data/__init__.py create mode 100644 data/csv.py create mode 100755 data/csv_to_hdf5.py create mode 100644 data/cuts/__init__.py create mode 100644 data/cuts/test_times_0.py create mode 100644 data/hdf5.py create mode 100755 data/init_valid.py create mode 100755 data/make_valid_cut.py create mode 100644 data/transformers.py delete mode 100644 make_valid.py delete mode 100644 make_valid_cut.py mode change 100644 => 100755 train.py delete mode 100644 transformers.py diff --git a/config/dest_simple_mlp_2_cs.py b/config/dest_simple_mlp_2_cs.py index 2cec78d..0dd2704 100644 --- a/config/dest_simple_mlp_2_cs.py +++ b/config/dest_simple_mlp_2_cs.py @@ -8,8 +8,8 @@ n_end_pts = 5 n_valid = 1000 dim_embeddings = [ - ('origin_call', data.n_train_clients+1, 10), - ('origin_stand', data.n_stands+1, 10) + ('origin_call', data.origin_call_train_size, 10), + ('origin_stand', data.stands_size, 10) ] dim_input = n_begin_end_pts * 2 * 2 + sum(x for (_, _, x) in dim_embeddings) @@ -19,3 +19,5 @@ dim_output = 2 learning_rate = 0.0001 momentum = 0.99 batch_size = 32 + +valid_set = 'cuts/test_times_0' diff --git a/config/dest_simple_mlp_2_cswdt.py b/config/dest_simple_mlp_2_cswdt.py index f6ddf34..1011488 100644 --- a/config/dest_simple_mlp_2_cswdt.py +++ b/config/dest_simple_mlp_2_cswdt.py @@ -8,8 +8,8 @@ n_end_pts = 5 n_valid = 1000 dim_embeddings = [ - ('origin_call', data.n_train_clients+1, 10), - ('origin_stand', data.n_stands+1, 10), + ('origin_call', data.origin_call_train_size, 10), + ('origin_stand', data.stands_size, 10), ('week_of_year', 52, 10), ('day_of_week', 7, 10), ('qhour_of_day', 24 * 4, 10), @@ -23,3 +23,5 @@ dim_output = 2 learning_rate = 0.0001 momentum = 0.99 batch_size = 32 + +valid_set = 'cuts/test_times_0' diff --git a/config/dest_simple_mlp_2_noembed.py b/config/dest_simple_mlp_2_noembed.py index 3832146..3cddcb9 100644 --- a/config/dest_simple_mlp_2_noembed.py +++ b/config/dest_simple_mlp_2_noembed.py @@ -16,3 +16,5 @@ dim_output = 2 learning_rate = 0.0001 momentum = 0.99 batch_size = 32 + +valid_set = 'cuts/test_times_0' diff --git a/config/dest_simple_mlp_tgtcls_0_cs.py b/config/dest_simple_mlp_tgtcls_0_cs.py index a8a5a0e..031cd12 100644 --- a/config/dest_simple_mlp_tgtcls_0_cs.py +++ b/config/dest_simple_mlp_tgtcls_0_cs.py @@ -9,11 +9,11 @@ n_end_pts = 5 n_valid = 1000 -with open(data.DATA_PATH + "/arrival-clusters.pkl") as f: tgtcls = cPickle.load(f) +with open("%s/arrival-clusters.pkl" % data.path) as f: tgtcls = cPickle.load(f) dim_embeddings = [ - ('origin_call', data.n_train_clients+1, 10), - ('origin_stand', data.n_stands+1, 10) + ('origin_call', data.origin_call_train_size, 10), + ('origin_stand', data.stands_size, 10) ] dim_input = n_begin_end_pts * 2 * 2 + sum(x for (_, _, x) in dim_embeddings) @@ -23,3 +23,5 @@ dim_output = tgtcls.shape[0] learning_rate = 0.0001 momentum = 0.99 batch_size = 32 + +valid_set = 'cuts/test_times_0' diff --git a/config/dest_simple_mlp_tgtcls_1_cs.py b/config/dest_simple_mlp_tgtcls_1_cs.py index 8136f10..48d9fa0 100644 --- a/config/dest_simple_mlp_tgtcls_1_cs.py +++ b/config/dest_simple_mlp_tgtcls_1_cs.py @@ -9,11 +9,11 @@ n_end_pts = 5 n_valid = 1000 -with open(data.DATA_PATH + "/arrival-clusters.pkl") as f: tgtcls = cPickle.load(f) +with open("%s/arrival-clusters.pkl" % data.path) as f: tgtcls = cPickle.load(f) dim_embeddings = [ - ('origin_call', data.n_train_clients+1, 10), - ('origin_stand', data.n_stands+1, 10) + ('origin_call', data.origin_call_train_size, 10), + ('origin_stand', data.stands_size, 10) ] dim_input = n_begin_end_pts * 2 * 2 + sum(x for (_, _, x) in dim_embeddings) @@ -23,3 +23,5 @@ dim_output = tgtcls.shape[0] learning_rate = 0.0001 momentum = 0.99 batch_size = 32 + +valid_set = 'cuts/test_times_0' diff --git a/config/dest_simple_mlp_tgtcls_1_cswdt.py b/config/dest_simple_mlp_tgtcls_1_cswdt.py index af7b2a3..6aa2a03 100644 --- a/config/dest_simple_mlp_tgtcls_1_cswdt.py +++ b/config/dest_simple_mlp_tgtcls_1_cswdt.py @@ -9,11 +9,11 @@ n_end_pts = 5 n_valid = 1000 -with open(data.DATA_PATH + "/arrival-clusters.pkl") as f: tgtcls = cPickle.load(f) +with open("%s/arrival-clusters.pkl" % data.path) as f: tgtcls = cPickle.load(f) dim_embeddings = [ - ('origin_call', data.n_train_clients+1, 10), - ('origin_stand', data.n_stands+1, 10), + ('origin_call', data.origin_call_train_size, 10), + ('origin_stand', data.stands_size, 10), ('week_of_year', 52, 10), ('day_of_week', 7, 10), ('qhour_of_day', 24 * 4, 10), @@ -27,3 +27,5 @@ dim_output = tgtcls.shape[0] learning_rate = 0.0001 momentum = 0.99 batch_size = 32 + +valid_set = 'cuts/test_times_0' diff --git a/config/dest_simple_mlp_tgtcls_1_cswdtx.py b/config/dest_simple_mlp_tgtcls_1_cswdtx.py index b9832df..7918242 100644 --- a/config/dest_simple_mlp_tgtcls_1_cswdtx.py +++ b/config/dest_simple_mlp_tgtcls_1_cswdtx.py @@ -9,11 +9,11 @@ n_end_pts = 5 n_valid = 1000 -with open(data.DATA_PATH + "/arrival-clusters.pkl") as f: tgtcls = cPickle.load(f) +with open("%s/arrival-clusters.pkl" % data.path) as f: tgtcls = cPickle.load(f) dim_embeddings = [ - ('origin_call', data.n_train_clients+1, 10), - ('origin_stand', data.n_stands+1, 10), + ('origin_call', data.origin_call_train_size, 10), + ('origin_stand', data.stands_size, 10), ('week_of_year', 52, 10), ('day_of_week', 7, 10), ('qhour_of_day', 24 * 4, 10), @@ -28,3 +28,5 @@ dim_output = tgtcls.shape[0] learning_rate = 0.0001 momentum = 0.99 batch_size = 32 + +valid_set = 'cuts/test_times_0' diff --git a/convert_data.py b/convert_data.py deleted file mode 100755 index ca66786..0000000 --- a/convert_data.py +++ /dev/null @@ -1,125 +0,0 @@ -#!/usr/bin/env python -import os, h5py, csv, sys, numpy, theano, ast -from fuel.converters.base import fill_hdf5_file - -test_size = 320 # `wc -l test.csv` - 1 # Minus 1 to ignore the header -train_size = 1710670 # `wc -l train.csv` - 1 - -stands_size = 63 # `wc -l metaData_taxistandsID_name_GPSlocation.csv` - 1 -taxi_id_size = 448 # `cut -d, -f 5 train.csv test.csv | sort -u | wc -l` - 1 -origin_call_size = 57124 # `cut -d, -f 3 train.csv test.csv | sort -u | wc -l` - 3 # Minus 3 to ignore "NA", "" and the header - -Polyline = h5py.special_dtype(vlen=theano.config.floatX) - -taxi_id_dict = {} -origin_call_dict = {0: 0} - -def get_unique_taxi_id(val): - if val in taxi_id_dict: - return taxi_id_dict[val] - else: - taxi_id_dict[val] = len(taxi_id_dict) - return len(taxi_id_dict) - 1 - -def get_unique_origin_call(val): - if val in origin_call_dict: - return origin_call_dict[val] - else: - origin_call_dict[val] = len(origin_call_dict) - return len(origin_call_dict) - 1 - -def read_stands(input_directory, h5file): - stands_name = numpy.empty(shape=(stands_size+1,), dtype=('a', 24)) - stands_latitude = numpy.empty(shape=(stands_size+1,), dtype=theano.config.floatX) - stands_longitude = numpy.empty(shape=(stands_size+1,), dtype=theano.config.floatX) - stands_name[0] = 'None' - stands_latitude[0] = stands_longitude[0] = 0 - with open(os.path.join(input_directory, 'metaData_taxistandsID_name_GPSlocation.csv'), 'r') as f: - reader = csv.reader(f) - reader.next() # header - for line in reader: - id = int(line[0]) - stands_name[id] = line[1] - stands_latitude[id] = float(line[2]) - stands_longitude[id] = float(line[3]) - return (('stands', 'stands_name', stands_name), - ('stands', 'stands_latitude', stands_latitude), - ('stands', 'stands_longitude', stands_longitude)) - -def read_taxis(input_directory, h5file, dataset): - print >> sys.stderr, 'read %s: begin' % dataset - size=globals()['%s_size'%dataset] - trip_id = numpy.empty(shape=(size,), dtype='S19') - call_type = numpy.empty(shape=(size,), dtype=numpy.uint8) - origin_call = numpy.empty(shape=(size,), dtype=numpy.uint32) - origin_stand = numpy.empty(shape=(size,), dtype=numpy.uint8) - taxi_id = numpy.empty(shape=(size,), dtype=numpy.uint16) - timestamp = numpy.empty(shape=(size,), dtype=numpy.uint32) - day_type = numpy.empty(shape=(size,), dtype=numpy.uint8) - missing_data = numpy.empty(shape=(size,), dtype=numpy.bool) - latitude = numpy.empty(shape=(size,), dtype=Polyline) - longitude = numpy.empty(shape=(size,), dtype=Polyline) - with open(os.path.join(input_directory, '%s.csv'%dataset), 'r') as f: - reader = csv.reader(f) - reader.next() # header - id=0 - for line in reader: - if id%10000==0 and id!=0: - print >> sys.stderr, 'read %s: %d done' % (dataset, id) - trip_id[id] = line[0] - call_type[id] = ord(line[1][0]) - ord('A') - origin_call[id] = 0 if line[2]=='NA' or line[2]=='' else get_unique_origin_call(int(line[2])) - origin_stand[id] = 0 if line[3]=='NA' or line[3]=='' else int(line[3]) - taxi_id[id] = get_unique_taxi_id(int(line[4])) - timestamp[id] = int(line[5]) - day_type[id] = ord(line[6][0]) - ord('A') - missing_data[id] = line[7][0] == 'T' - polyline = ast.literal_eval(line[8]) - latitude[id] = numpy.array([point[1] for point in polyline], dtype=theano.config.floatX) - longitude[id] = numpy.array([point[0] for point in polyline], dtype=theano.config.floatX) - id+=1 - splits = () - print >> sys.stderr, 'read %s: writing' % dataset - for name in ['trip_id', 'call_type', 'origin_call', 'origin_stand', 'taxi_id', 'timestamp', 'day_type', 'missing_data', 'latitude', 'longitude']: - splits += ((dataset, name, locals()[name]),) - print >> sys.stderr, 'read %s: end' % dataset - return splits - -def unique(h5file): - unique_taxi_id = numpy.empty(shape=(taxi_id_size,), dtype=numpy.uint32) - assert len(taxi_id_dict) == taxi_id_size - for k, v in taxi_id_dict.items(): - unique_taxi_id[v] = k - - unique_origin_call = numpy.empty(shape=(origin_call_size+1,), dtype=numpy.uint32) - assert len(origin_call_dict) == origin_call_size+1 - for k, v in origin_call_dict.items(): - unique_origin_call[v] = k - - return (('unique_taxi_id', 'unique_taxi_id', unique_taxi_id), - ('unique_origin_call', 'unique_origin_call', unique_origin_call)) - -def convert(input_directory, save_path): - h5file = h5py.File(save_path, 'w') - split = () - split += read_stands(input_directory, h5file) - split += read_taxis(input_directory, h5file, 'train') - print 'First origin_call not present in training set: ', len(origin_call_dict) - split += read_taxis(input_directory, h5file, 'test') - split += unique(h5file) - - fill_hdf5_file(h5file, split) - - for name in ['stands_name', 'stands_latitude', 'stands_longitude', 'unique_taxi_id', 'unique_origin_call']: - h5file[name].dims[0].label = 'index' - for name in ['trip_id', 'call_type', 'origin_call', 'origin_stand', 'taxi_id', 'timestamp', 'day_type', 'missing_data', 'latitude', 'longitude']: - h5file[name].dims[0].label = 'batch' - - h5file.flush() - h5file.close() - -if __name__ == '__main__': - if len(sys.argv) != 3: - print >> sys.stderr, 'Usage: %s download_dir output_file' % sys.argv[0] - sys.exit(1) - convert(sys.argv[1], sys.argv[2]) diff --git a/data.py b/data.py deleted file mode 100644 index 42ebe1c..0000000 --- a/data.py +++ /dev/null @@ -1,196 +0,0 @@ -import ast, csv -import socket -import fuel -import numpy -import h5py -from enum import Enum -from fuel.datasets import Dataset -from fuel.streams import DataStream -from fuel.iterator import DataIterator -import theano - -if socket.gethostname() == "adeb.laptop": - DATA_PATH = "/Users/adeb/data/taxi" -else: - DATA_PATH="/data/lisatmp3/auvolat/taxikaggle" - -H5DATA_PATH = '/data/lisatmp3/simonet/taxi/data.hdf5' - -porto_center = numpy.array([41.1573, -8.61612], dtype=theano.config.floatX) -data_std = numpy.sqrt(numpy.array([0.00549598, 0.00333233], dtype=theano.config.floatX)) - -n_clients = 57124 -n_train_clients = 57105 -n_stands = 63 - -dataset_size = 1710670 - -# ---- Read client IDs and create reverse dictionnary - -def make_client_ids(): - f = h5py.File(H5DATA_PATH, "r") - l = f['unique_origin_call'] - r = {l[i]: i for i in range(l.shape[0])} - return r - -client_ids = make_client_ids() - -def get_client_id(n): - if n in client_ids and client_ids[n] <= n_train_clients: - return client_ids[n] - else: - return 0 - -# ---- Read taxi IDs and create reverse dictionnary - -def make_taxi_ids(): - f = h5py.File(H5DATA_PATH, "r") - l = f['unique_taxi_id'] - r = {l[i]: i for i in range(l.shape[0])} - return r - -taxi_ids = make_taxi_ids() - -# ---- Enum types - -class CallType(Enum): - CENTRAL = 0 - STAND = 1 - STREET = 2 - - @classmethod - def from_data(cls, val): - if val=='A': - return cls.CENTRAL - elif val=='B': - return cls.STAND - elif val=='C': - return cls.STREET - - @classmethod - def to_data(cls, val): - if val==cls.CENTRAL: - return 'A' - elif val==cls.STAND: - return 'B' - elif val==cls.STREET: - return 'C' - -class DayType(Enum): - NORMAL = 0 - HOLIDAY = 1 - HOLIDAY_EVE = 2 - - @classmethod - def from_data(cls, val): - if val=='A': - return cls.NORMAL - elif val=='B': - return cls.HOLIDAY - elif val=='C': - return cls.HOLIDAY_EVE - - @classmethod - def to_data(cls, val): - if val==cls.NORMAL: - return 'A' - elif val==cls.HOLIDAY: - return 'B' - elif val==cls.HOLIDAY_EVE: - return 'C' - -class TaxiData(Dataset): - example_iteration_scheme=None - - class State: - __slots__ = ('file', 'index', 'reader') - - def __init__(self, pathes, columns, has_header=False): - if not isinstance(pathes, list): - pathes=[pathes] - assert len(pathes)>0 - self.columns=columns - self.provides_sources = tuple(map(lambda x: x[0], columns)) - self.pathes=pathes - self.has_header=has_header - super(TaxiData, self).__init__() - - def open(self): - state=self.State() - state.file=open(self.pathes[0]) - state.index=0 - state.reader=csv.reader(state.file) - if self.has_header: - state.reader.next() - return state - - def close(self, state): - state.file.close() - - def reset(self, state): - if state.index==0: - state.file.seek(0) - else: - state.index=0 - state.file.close() - state.file=open(self.pathes[0]) - state.reader=csv.reader(state.file) - return state - - def get_data(self, state, request=None): - if request is not None: - raise ValueError - try: - line=state.reader.next() - except (ValueError, StopIteration): - # print state.index - state.file.close() - state.index+=1 - if state.index>=len(self.pathes): - raise StopIteration - state.file=open(self.pathes[state.index]) - state.reader=csv.reader(state.file) - if self.has_header: - state.reader.next() - return self.get_data(state) - - values = [] - for _, constructor in self.columns: - values.append(constructor(line)) - return tuple(values) - -taxi_columns = [ - ("trip_id", lambda l: l[0]), - ("call_type", lambda l: CallType.from_data(l[1])), - ("origin_call", lambda l: 0 if l[2] == '' or l[2] == 'NA' else get_client_id(int(l[2]))), - ("origin_stand", lambda l: 0 if l[3] == '' or l[3] == 'NA' else int(l[3])), - ("taxi_id", lambda l: taxi_ids[int(l[4])]), - ("timestamp", lambda l: int(l[5])), - ("day_type", lambda l: ord(l[6])-ord('A')), - ("missing_data", lambda l: l[7][0] == 'T'), - ("polyline", lambda l: map(tuple, ast.literal_eval(l[8]))), - ("longitude", lambda l: map(lambda p: p[0], ast.literal_eval(l[8]))), - ("latitude", lambda l: map(lambda p: p[1], ast.literal_eval(l[8]))), -] - -taxi_columns_valid = taxi_columns + [ - ("destination_longitude", lambda l: numpy.float32(float(l[9]))), - ("destination_latitude", lambda l: numpy.float32(float(l[10]))), - ("time", lambda l: int(l[11])), -] - -valid_files=["%s/valid2-cut.csv" % (DATA_PATH,)] -test_file="%s/test.csv" % (DATA_PATH,) - -valid_data = TaxiData(valid_files, taxi_columns_valid) -test_data = TaxiData(test_file, taxi_columns, has_header=True) - -valid_trips = [l for l in open(DATA_PATH + "/valid2-cut-ids.txt")] - -def train_it(): - return DataIterator(DataStream(train_data)) - -def test_it(): - return DataIterator(DataStream(valid_data)) - - diff --git a/data/__init__.py b/data/__init__.py new file mode 100644 index 0000000..1278e0b --- /dev/null +++ b/data/__init__.py @@ -0,0 +1,31 @@ +import os + +import h5py +import numpy +import theano + + +path = os.environ.get('TAXI_PATH', '/data/lisatmp3/auvolat/taxikaggle') +Polyline = h5py.special_dtype(vlen=theano.config.floatX) + + +# `wc -l test.csv` - 1 # Minus 1 to ignore the header +test_size = 320 + +# `wc -l train.csv` - 1 +train_size = 1710670 + +# `wc -l metaData_taxistandsID_name_GPSlocation.csv` +stands_size = 64 # include 0 ("no origin_stands") + +# `cut -d, -f 5 train.csv test.csv | sort -u | wc -l` - 1 +taxi_id_size = 448 + +# `cut -d, -f 3 train.csv test.csv | sort -u | wc -l` - 2 +origin_call_size = 57125 # include 0 ("no origin_call") + +# As printed by csv_to_hdf5.py +origin_call_train_size = 57106 + +train_gps_mean = numpy.array([41.1573, -8.61612], dtype=theano.config.floatX) +train_gps_std = numpy.sqrt(numpy.array([0.00549598, 0.00333233], dtype=theano.config.floatX)) diff --git a/data/csv.py b/data/csv.py new file mode 100644 index 0000000..b6fe5b1 --- /dev/null +++ b/data/csv.py @@ -0,0 +1,107 @@ +import ast +import csv +import numpy + +from fuel.datasets import Dataset +from fuel.streams import DataStream +from fuel.iterator import DataIterator + +import data +from data.hdf5 import origin_call_normalize, taxi_id_normalize + + +class TaxiData(Dataset): + example_iteration_scheme=None + + class State: + __slots__ = ('file', 'index', 'reader') + + def __init__(self, pathes, columns, has_header=False): + if not isinstance(pathes, list): + pathes=[pathes] + assert len(pathes)>0 + self.columns=columns + self.provides_sources = tuple(map(lambda x: x[0], columns)) + self.pathes=pathes + self.has_header=has_header + super(TaxiData, self).__init__() + + def open(self): + state=self.State() + state.file=open(self.pathes[0]) + state.index=0 + state.reader=csv.reader(state.file) + if self.has_header: + state.reader.next() + return state + + def close(self, state): + state.file.close() + + def reset(self, state): + if state.index==0: + state.file.seek(0) + else: + state.index=0 + state.file.close() + state.file=open(self.pathes[0]) + state.reader=csv.reader(state.file) + return state + + def get_data(self, state, request=None): + if request is not None: + raise ValueError + try: + line=state.reader.next() + except (ValueError, StopIteration): + # print state.index + state.file.close() + state.index+=1 + if state.index>=len(self.pathes): + raise StopIteration + state.file=open(self.pathes[state.index]) + state.reader=csv.reader(state.file) + if self.has_header: + state.reader.next() + return self.get_data(state) + + values = [] + for _, constructor in self.columns: + values.append(constructor(line)) + return tuple(values) + +taxi_columns = [ + ("trip_id", lambda l: l[0]), + ("call_type", lambda l: ord(l[1])-ord('A')), + ("origin_call", lambda l: 0 if l[2] == '' or l[2] == 'NA' else origin_call_normalize(int(l[2]))), + ("origin_stand", lambda l: 0 if l[3] == '' or l[3] == 'NA' else int(l[3])), + ("taxi_id", lambda l: taxi_id_normalize(int(l[4]))), + ("timestamp", lambda l: int(l[5])), + ("day_type", lambda l: ord(l[6])-ord('A')), + ("missing_data", lambda l: l[7][0] == 'T'), + ("polyline", lambda l: map(tuple, ast.literal_eval(l[8]))), + ("longitude", lambda l: map(lambda p: p[0], ast.literal_eval(l[8]))), + ("latitude", lambda l: map(lambda p: p[1], ast.literal_eval(l[8]))), +] + +taxi_columns_valid = taxi_columns + [ + ("destination_longitude", lambda l: numpy.float32(float(l[9]))), + ("destination_latitude", lambda l: numpy.float32(float(l[10]))), + ("time", lambda l: int(l[11])), +] + +train_file="%s/train.csv" % data.path +valid_file="%s/valid2-cut.csv" % data.path +test_file="%s/test.csv" % data.path + +train_data=TaxiData(train_file, taxi_columns, has_header=True) +valid_data = TaxiData(valid_file, taxi_columns_valid) +test_data = TaxiData(test_file, taxi_columns, has_header=True) + +valid_trips = [l for l in open("%s/valid2-cut-ids.txt" % data.path)] + +def train_it(): + return DataIterator(DataStream(train_data)) + +def test_it(): + return DataIterator(DataStream(valid_data)) diff --git a/data/csv_to_hdf5.py b/data/csv_to_hdf5.py new file mode 100755 index 0000000..17217f3 --- /dev/null +++ b/data/csv_to_hdf5.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python + +import ast +import csv +import os +import sys + +import h5py +import numpy +import theano +from fuel.converters.base import fill_hdf5_file + +import data + + +taxi_id_dict = {} +origin_call_dict = {0: 0} + +def get_unique_taxi_id(val): + if val in taxi_id_dict: + return taxi_id_dict[val] + else: + taxi_id_dict[val] = len(taxi_id_dict) + return len(taxi_id_dict) - 1 + +def get_unique_origin_call(val): + if val in origin_call_dict: + return origin_call_dict[val] + else: + origin_call_dict[val] = len(origin_call_dict) + return len(origin_call_dict) - 1 + +def read_stands(input_directory, h5file): + stands_name = numpy.empty(shape=(data.stands_size,), dtype=('a', 24)) + stands_latitude = numpy.empty(shape=(data.stands_size,), dtype=theano.config.floatX) + stands_longitude = numpy.empty(shape=(data.stands_size,), dtype=theano.config.floatX) + stands_name[0] = 'None' + stands_latitude[0] = stands_longitude[0] = 0 + with open(os.path.join(input_directory, 'metaData_taxistandsID_name_GPSlocation.csv'), 'r') as f: + reader = csv.reader(f) + reader.next() # header + for line in reader: + id = int(line[0]) + stands_name[id] = line[1] + stands_latitude[id] = float(line[2]) + stands_longitude[id] = float(line[3]) + return (('stands', 'stands_name', stands_name), + ('stands', 'stands_latitude', stands_latitude), + ('stands', 'stands_longitude', stands_longitude)) + +def read_taxis(input_directory, h5file, dataset): + print >> sys.stderr, 'read %s: begin' % dataset + size=getattr(data, '%s_size'%dataset) + trip_id = numpy.empty(shape=(size,), dtype='S19') + call_type = numpy.empty(shape=(size,), dtype=numpy.uint8) + origin_call = numpy.empty(shape=(size,), dtype=numpy.uint32) + origin_stand = numpy.empty(shape=(size,), dtype=numpy.uint8) + taxi_id = numpy.empty(shape=(size,), dtype=numpy.uint16) + timestamp = numpy.empty(shape=(size,), dtype=numpy.uint32) + day_type = numpy.empty(shape=(size,), dtype=numpy.uint8) + missing_data = numpy.empty(shape=(size,), dtype=numpy.bool) + latitude = numpy.empty(shape=(size,), dtype=data.Polyline) + longitude = numpy.empty(shape=(size,), dtype=data.Polyline) + with open(os.path.join(input_directory, '%s.csv'%dataset), 'r') as f: + reader = csv.reader(f) + reader.next() # header + id=0 + for line in reader: + if id%10000==0 and id!=0: + print >> sys.stderr, 'read %s: %d done' % (dataset, id) + trip_id[id] = line[0] + call_type[id] = ord(line[1][0]) - ord('A') + origin_call[id] = 0 if line[2]=='NA' or line[2]=='' else get_unique_origin_call(int(line[2])) + origin_stand[id] = 0 if line[3]=='NA' or line[3]=='' else int(line[3]) + taxi_id[id] = get_unique_taxi_id(int(line[4])) + timestamp[id] = int(line[5]) + day_type[id] = ord(line[6][0]) - ord('A') + missing_data[id] = line[7][0] == 'T' + polyline = ast.literal_eval(line[8]) + latitude[id] = numpy.array([point[1] for point in polyline], dtype=theano.config.floatX) + longitude[id] = numpy.array([point[0] for point in polyline], dtype=theano.config.floatX) + id+=1 + splits = () + print >> sys.stderr, 'read %s: writing' % dataset + for name in ['trip_id', 'call_type', 'origin_call', 'origin_stand', 'taxi_id', 'timestamp', 'day_type', 'missing_data', 'latitude', 'longitude']: + splits += ((dataset, name, locals()[name]),) + print >> sys.stderr, 'read %s: end' % dataset + return splits + +def unique(h5file): + unique_taxi_id = numpy.empty(shape=(data.taxi_id_size,), dtype=numpy.uint32) + assert len(taxi_id_dict) == data.taxi_id_size + for k, v in taxi_id_dict.items(): + unique_taxi_id[v] = k + + unique_origin_call = numpy.empty(shape=(data.origin_call_size,), dtype=numpy.uint32) + assert len(origin_call_dict) == data.origin_call_size + for k, v in origin_call_dict.items(): + unique_origin_call[v] = k + + return (('unique_taxi_id', 'unique_taxi_id', unique_taxi_id), + ('unique_origin_call', 'unique_origin_call', unique_origin_call)) + +def convert(input_directory, save_path): + h5file = h5py.File(save_path, 'w') + split = () + split += read_stands(input_directory, h5file) + split += read_taxis(input_directory, h5file, 'train') + print 'First origin_call not present in training set: ', len(origin_call_dict) + split += read_taxis(input_directory, h5file, 'test') + split += unique(h5file) + + fill_hdf5_file(h5file, split) + + for name in ['stands_name', 'stands_latitude', 'stands_longitude', 'unique_taxi_id', 'unique_origin_call']: + h5file[name].dims[0].label = 'index' + for name in ['trip_id', 'call_type', 'origin_call', 'origin_stand', 'taxi_id', 'timestamp', 'day_type', 'missing_data', 'latitude', 'longitude']: + h5file[name].dims[0].label = 'batch' + + h5file.flush() + h5file.close() + +if __name__ == '__main__': + if len(sys.argv) != 3: + print >> sys.stderr, 'Usage: %s download_dir output_file' % sys.argv[0] + sys.exit(1) + convert(sys.argv[1], sys.argv[2]) diff --git a/data/cuts/__init__.py b/data/cuts/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/data/cuts/test_times_0.py b/data/cuts/test_times_0.py new file mode 100644 index 0000000..b590072 --- /dev/null +++ b/data/cuts/test_times_0.py @@ -0,0 +1,8 @@ +# Cuts of the test set minus 1 year +cuts = [ + 1376503200, # 2013-08-14 18:00 + 1380616200, # 2013-10-01 08:30 + 1381167900, # 2013-10-07 17:45 + 1383364800, # 2013-11-02 04:00 + 1387722600 # 2013-12-22 14:30 +] diff --git a/data/hdf5.py b/data/hdf5.py new file mode 100644 index 0000000..d848023 --- /dev/null +++ b/data/hdf5.py @@ -0,0 +1,61 @@ +import os + +import h5py +from fuel.datasets import H5PYDataset +from fuel.iterator import DataIterator +from fuel.schemes import SequentialExampleScheme +from fuel.streams import DataStream + +import data + + +class TaxiDataset(H5PYDataset): + def __init__(self, which_set, filename='data.hdf5', **kwargs): + self.filename = filename + kwargs.setdefault('load_in_memory', True) + super(TaxiDataset, self).__init__(self.data_path, which_set, **kwargs) + + @property + def data_path(self): + return os.path.join(data.path, self.filename) + +class TaxiStream(DataStream): + def __init__(self, which_set, filename='data.hdf5', iteration_scheme=None, **kwargs): + dataset = TaxiDataset(which_set, filename, **kwargs) + if iteration_scheme is None: + iteration_scheme = SequentialExampleScheme(dataset.num_examples) + super(TaxiStream, self).__init__(dataset, iteration_scheme=iteration_scheme) + +_origin_calls = None +_reverse_origin_calls = None + +def origin_call_unnormalize(x): + if _origin_calls is None: + _origin_calls = h5py.File(os.path.join(data.path, 'data.hdf5'), 'r')['unique_origin_call'] + return _origin_calls[x] + +def origin_call_normalize(x): + if _reverse_origin_calls is None: + origin_call_unnormalize(0) + _reverse_origin_calls = { _origin_calls[i]: i for i in range(_origin_calls.shape[0]) } + return _reverse_origin_calls[x] + +_taxi_ids = None +_reverse_taxi_ids = None + +def taxi_id_unnormalize(x): + if _taxi_ids is None: + _taxi_ids = h5py.File(os.path.join(data.path, 'data.hdf5'), 'r')['unique_taxi_id'] + return _taxi_ids[x] + +def taxi_id_normalize(x): + if _reverse_taxi_ids is None: + taxi_id_unnormalize(0) + _reverse_taxi_ids = { _taxi_ids[i]: i for i in range(_taxi_ids.shape[0]) } + return _reverse_taxi_ids[x] + +def taxi_it(which_set, filename='data.hdf5', sub=None, as_dict=True): + dataset = TaxiDataset(which_set, filename) + if sub is None: + sub = xrange(dataset.num_examples) + return DataIterator(DataStream(dataset), iter(sub), as_dict) diff --git a/data/init_valid.py b/data/init_valid.py new file mode 100755 index 0000000..14a854c --- /dev/null +++ b/data/init_valid.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python +# Initialize the valid hdf5 + +import os +import sys + +import h5py +import numpy +import theano + +import data + + +_fields = { + 'trip_id': 'S19', + 'call_type': numpy.uint8, + 'origin_call': numpy.uint32, + 'origin_stand': numpy.uint8, + 'taxi_id': numpy.uint16, + 'timestamp': numpy.uint32, + 'day_type': numpy.uint8, + 'missing_data': numpy.bool, + 'latitude': data.Polyline, + 'longitude': data.Polyline, + 'destination_latitude': theano.config.floatX, + 'destination_longitude': theano.config.floatX, + 'travel_time': numpy.uint32, +} + + +def init_valid(path): + h5file = h5py.File(path, 'w') + + for k, v in _fields.items(): + h5file.create_dataset(k, (0,), dtype=v, maxshape=(None,)) + + split_array = numpy.empty(len(_fields), dtype=numpy.dtype([ + ('split', 'a', 64), + ('source', 'a', 21), + ('start', numpy.int64, 1), + ('stop', numpy.int64, 1), + ('available', numpy.bool, 1), + ('comment', 'a', 1)])) + + split_array[:]['split'] = 'dummy'.encode('utf8') + for (i, k) in enumerate(_fields.keys()): + split_array[i] = k.encode('utf8') + split_array[:]['start'] = 0 + split_array[:]['stop'] = 0 + split_array[:]['available'] = False + split_array[:]['comment'] = '.'.encode('utf8') + h5file.attrs['split'] = split_array + + h5file.flush() + h5file.close() + +if __name__ == '__main__': + if len(sys.argv) > 2: + print >> sys.stderr, 'Usage: %s [file]' % sys.argv[0] + sys.exit(1) + init_valid(sys.argv[1] if len(sys.argv) == 2 else os.path.join(data.path, 'valid.hdf5')) diff --git a/data/make_valid_cut.py b/data/make_valid_cut.py new file mode 100755 index 0000000..d5be083 --- /dev/null +++ b/data/make_valid_cut.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python +# Make a valid dataset by cutting the training set at specified timestamps + +import os +import sys +import importlib + +import h5py +import numpy + +import data +from data.hdf5 import taxi_it + + +_fields = ['trip_id', 'call_type', 'origin_call', 'origin_stand', 'taxi_id', 'timestamp', 'day_type', 'missing_data', 'latitude', 'longitude', 'destination_latitude', 'destination_longitude', 'travel_time'] + +def make_valid(cutfile, outpath): + cuts = importlib.import_module('.%s' % cutfile, 'data.cuts').cuts + + valid = [] + + for line in taxi_it('train'): + time = line['timestamp'] + latitude = line['latitude'] + longitude = line['longitude'] + + if len(latitude) == 0: + continue + + for ts in cuts: + if time <= ts and time + 15 * (len(latitude) - 1) >= ts: + # keep it + n = (ts - time) / 15 + 1 + line.update({ + 'latitude': latitude[:n], + 'longitude': longitude[:n], + 'destination_latitude': latitude[-1], + 'destination_longitude': longitude[-1], + 'travel_time': 15 * (len(latitude)-1) + }) + valid.append(line) + + file = h5py.File(outpath, 'a') + clen = file['trip_id'].shape[0] + alen = len(valid) + for field in _fields: + dset = file[field] + dset.resize((clen + alen,)) + for i in xrange(alen): + dset[clen + i] = valid[i][field] + + splits = file.attrs['split'] + slen = splits.shape[0] + splits = numpy.resize(splits, (slen+len(_fields),)) + for (i, field) in enumerate(_fields): + splits[slen+i]['split'] = ('cuts/%s' % cutfile).encode('utf8') + splits[slen+i]['source'] = field.encode('utf8') + splits[slen+i]['start'] = clen + splits[slen+i]['stop'] = alen + splits[slen+i]['available'] = True + splits[slen+i]['comment'] = '.' + file.attrs['split'] = splits + + file.flush() + file.close() + +if __name__ == '__main__': + if len(sys.argv) < 2 or len(sys.argv) > 3: + print >> sys.stderr, 'Usage: %s cutfile [outfile]' % sys.argv[0] + sys.exit(1) + outpath = os.path.join(data.path, 'valid.hdf5') if len(sys.argv) < 3 else sys.argv[2] + make_valid(sys.argv[1], outpath) diff --git a/data/transformers.py b/data/transformers.py new file mode 100644 index 0000000..1cc4834 --- /dev/null +++ b/data/transformers.py @@ -0,0 +1,127 @@ +import datetime +import random + +import numpy +import theano +from fuel.transformers import Transformer + +import data + + +def at_least_k(k, v, pad_at_begin, is_longitude): + if len(v) == 0: + v = numpy.array([data.porto_center[1 if is_longitude else 0]], dtype=theano.config.floatX) + if len(v) < k: + if pad_at_begin: + v = numpy.concatenate((numpy.full((k - len(v),), v[0]), v)) + else: + v = numpy.concatenate((v, numpy.full((k - len(v),), v[-1]))) + return v + + +class Select(Transformer): + def __init__(self, data_stream, sources): + super(Select, self).__init__(data_stream) + self.ids = [data_stream.sources.index(source) for source in sources] + self.sources=sources + + def get_data(self, request=None): + if request is not None: + raise ValueError + data=next(self.child_epoch_iterator) + return [data[id] for id in self.ids] + +class TaxiGenerateSplits(Transformer): + def __init__(self, data_stream, max_splits=-1): + super(TaxiGenerateSplits, self).__init__(data_stream) + self.sources = data_stream.sources + ('destination_latitude', 'destination_longitude', 'time') + self.max_splits = max_splits + self.data = None + self.splits = [] + self.isplit = 0 + self.id_latitude = data_stream.sources.index('latitude') + self.id_longitude = data_stream.sources.index('longitude') + + def get_data(self, request=None): + if request is not None: + raise ValueError + while self.isplit >= len(self.splits): + self.data = next(self.child_epoch_iterator) + self.splits = range(len(self.data[self.id_longitude])) + random.shuffle(self.splits) + if self.max_splits != -1 and len(self.splits) > self.max_splits: + self.splits = self.splits[:self.max_splits] + self.isplit = 0 + + i = self.isplit + self.isplit += 1 + n = self.splits[i]+1 + + r = list(self.data) + + r[self.id_latitude] = numpy.array(r[self.id_latitude][:n], dtype=theano.config.floatX) + r[self.id_longitude] = numpy.array(r[self.id_longitude][:n], dtype=theano.config.floatX) + + dlat = numpy.float32(self.data[self.id_latitude][-1]) + dlon = numpy.float32(self.data[self.id_longitude][-1]) + + return tuple(r + [dlat, dlon, 15 * (len(self.data[self.id_longitude]) - 1)]) + +class TaxiAddFirstK(Transformer): + def __init__(self, k, stream): + super(TaxiAddFirstK, self).__init__(stream) + self.sources = stream.sources + ('first_k_latitude', 'first_k_longitude') + self.id_latitude = stream.sources.index('latitude') + self.id_longitude = stream.sources.index('longitude') + self.k = k + def get_data(self, request=None): + if request is not None: raise ValueError + data = next(self.child_epoch_iterator) + first_k = (numpy.array(at_least_k(self.k, data[self.id_latitude], False, False)[:self.k], + dtype=theano.config.floatX), + numpy.array(at_least_k(self.k, data[self.id_longitude], False, True)[:self.k], + dtype=theano.config.floatX)) + return data + first_k + +class TaxiAddLastK(Transformer): + def __init__(self, k, stream): + super(TaxiAddLastK, self).__init__(stream) + self.sources = stream.sources + ('last_k_latitude', 'last_k_longitude') + self.id_latitude = stream.sources.index('latitude') + self.id_longitude = stream.sources.index('longitude') + self.k = k + def get_data(self, request=None): + if request is not None: raise ValueError + data = next(self.child_epoch_iterator) + last_k = (numpy.array(at_least_k(self.k, data[self.id_latitude], True, False)[-self.k:], + dtype=theano.config.floatX), + numpy.array(at_least_k(self.k, data[self.id_longitude], True, True)[-self.k:], + dtype=theano.config.floatX)) + return data + last_k + +class TaxiAddDateTime(Transformer): + def __init__(self, stream): + super(TaxiAddDateTime, self).__init__(stream) + self.sources = stream.sources + ('week_of_year', 'day_of_week', 'qhour_of_day') + self.id_timestamp = stream.sources.index('timestamp') + def get_data(self, request=None): + if request is not None: raise ValueError + data = next(self.child_epoch_iterator) + ts = data[self.id_timestamp] + date = datetime.datetime.utcfromtimestamp(ts) + yearweek = date.isocalendar()[1] - 1 + info = ((51 if yearweek == 52 else yearweek), date.weekday(), date.hour * 4 + date.minute / 15) + return data + info + +class TaxiExcludeTrips(Transformer): + def __init__(self, exclude_list, stream): + super(TaxiExcludeTrips, self).__init__(stream) + self.id_trip_id = stream.sources.index('trip_id') + self.exclude = {v: True for v in exclude_list} + def get_data(self, request=None): + if request is not None: raise ValueError + while True: + data = next(self.child_epoch_iterator) + if not data[self.id_trip_id] in self.exclude: break + return data + diff --git a/make_valid.py b/make_valid.py deleted file mode 100644 index d5e147d..0000000 --- a/make_valid.py +++ /dev/null @@ -1,37 +0,0 @@ -# Takes valid-full.csv which is a subset of the lines of train.csv, formatted in the -# exact same way -# Outputs valid.csv which contains the polylines cut at an arbitrary location, and three -# new columns containing the destination point and the length in seconds of the original polyline -# (see contest definition for the time taken by a taxi along a polyline) - -import random -import csv -import ast - -with open("valid-full.csv") as f: - vlines = [l for l in csv.reader(f)] - -def make_valid_item(l): - polyline = ast.literal_eval(l[-1]) - last = polyline[-1] - cut_idx = random.randrange(len(polyline)+1) - cut = polyline[:cut_idx] - return l[:-1] + [ - cut.__str__(), - last[0], - last[1], - 15 * (len(polyline)-1), - ] - -vlines = map(make_valid_item, filter(lambda l: (len(ast.literal_eval(l[-1])) > 0), vlines)) - -with open("valid.csv", "w") as f: - wr = csv.writer(f) - for r in vlines: - wr.writerow(r) - -with open("valid-solution.csv", "w") as f: - wr = csv.writer(f) - wr.writerow(["TRIP_ID", "LATITUDE", "LONGITUDE"]) - for r in vlines: - wr.writerow([r[0], r[-2], r[-3]]) diff --git a/make_valid_cut.py b/make_valid_cut.py deleted file mode 100644 index 2698af8..0000000 --- a/make_valid_cut.py +++ /dev/null @@ -1,40 +0,0 @@ -# Cuts the training dataset at the following timestamps : - -cuts = [ - 1376503200, - 1380616200, - 1381167900, - 1383364800, - 1387722600, -] - -import random -import csv -import ast - -f = open("train.csv") -fr = csv.reader(f) -_skip_header = fr.next() -g = open("cutvalid.csv", "w") -gw = csv.writer(g) - -for l in fr: - polyline = ast.literal_eval(l[-1]) - if len(polyline) == 0: continue - time = int(l[5]) - for ts in cuts: - if time <= ts and time + 15 * (len(polyline) - 1) >= ts: - # keep it - n = (ts - time) / 15 + 1 - cut = polyline[:n] - row = l[:-1] + [ - cut.__str__(), - polyline[-1][0], - polyline[-1][1], - 15 * (len(polyline)-1) - ] - print row - gw.writerow(row) - -f.close() -g.close() diff --git a/model/dest_simple_mlp.py b/model/dest_simple_mlp.py index 896f219..f422f11 100644 --- a/model/dest_simple_mlp.py +++ b/model/dest_simple_mlp.py @@ -11,11 +11,11 @@ import error class Model(object): def __init__(self, config): # The input and the targets - x_firstk_latitude = (tensor.matrix('first_k_latitude') - data.porto_center[0]) / data.data_std[0] - x_firstk_longitude = (tensor.matrix('first_k_longitude') - data.porto_center[1]) / data.data_std[1] + x_firstk_latitude = (tensor.matrix('first_k_latitude') - data.train_gps_mean[0]) / data.train_gps_std[0] + x_firstk_longitude = (tensor.matrix('first_k_longitude') - data.train_gps_mean[1]) / data.train_gps_std[1] - x_lastk_latitude = (tensor.matrix('last_k_latitude') - data.porto_center[0]) / data.data_std[0] - x_lastk_longitude = (tensor.matrix('last_k_longitude') - data.porto_center[1]) / data.data_std[1] + x_lastk_latitude = (tensor.matrix('last_k_latitude') - data.train_gps_mean[0]) / data.train_gps_std[0] + x_lastk_longitude = (tensor.matrix('last_k_longitude') - data.train_gps_mean[1]) / data.train_gps_std[1] input_list = [x_firstk_latitude, x_firstk_longitude, x_lastk_latitude, x_lastk_longitude] embed_tables = [] @@ -43,7 +43,7 @@ class Model(object): # Normalize & Center # outputs = theano.printing.Print("normal_outputs")(outputs) - outputs = data.data_std * outputs + data.porto_center + outputs = data.train_gps_std * outputs + data.train_gps_mean # outputs = theano.printing.Print("outputs")(outputs) # y = theano.printing.Print("y")(y) diff --git a/model/dest_simple_mlp_tgtcls.py b/model/dest_simple_mlp_tgtcls.py index d8fdeb3..a7b6f9b 100644 --- a/model/dest_simple_mlp_tgtcls.py +++ b/model/dest_simple_mlp_tgtcls.py @@ -14,11 +14,11 @@ import error class Model(object): def __init__(self, config): # The input and the targets - x_firstk_latitude = (tensor.matrix('first_k_latitude') - data.porto_center[0]) / data.data_std[0] - x_firstk_longitude = (tensor.matrix('first_k_longitude') - data.porto_center[1]) / data.data_std[1] + x_firstk_latitude = (tensor.matrix('first_k_latitude') - data.train_gps_mean[0]) / data.train_gps_std[0] + x_firstk_longitude = (tensor.matrix('first_k_longitude') - data.train_gps_mean[1]) / data.train_gps_std[1] - x_lastk_latitude = (tensor.matrix('last_k_latitude') - data.porto_center[0]) / data.data_std[0] - x_lastk_longitude = (tensor.matrix('last_k_longitude') - data.porto_center[1]) / data.data_std[1] + x_lastk_latitude = (tensor.matrix('last_k_latitude') - data.train_gps_mean[0]) / data.train_gps_std[0] + x_lastk_longitude = (tensor.matrix('last_k_longitude') - data.train_gps_mean[1]) / data.train_gps_std[1] input_list = [x_firstk_latitude, x_firstk_longitude, x_lastk_latitude, x_lastk_longitude] embed_tables = [] diff --git a/train.py b/train.py old mode 100644 new mode 100755 index 4cbd526..9e915ed --- a/train.py +++ b/train.py @@ -1,36 +1,26 @@ -import logging -import os +#!/usr/bin/env python + import sys +import logging import importlib -from argparse import ArgumentParser import csv -import numpy - -import theano -from theano import printing -from theano import tensor -from theano.ifelse import ifelse - -from blocks.filter import VariableFilter - from blocks.model import Model -from fuel.datasets.hdf5 import H5PYDataset from fuel.transformers import Batch from fuel.streams import DataStream -from fuel.schemes import ConstantScheme, SequentialExampleScheme, ShuffledExampleScheme +from fuel.schemes import ConstantScheme, ShuffledExampleScheme -from blocks.algorithms import GradientDescent, Scale, AdaDelta, Momentum +from blocks.algorithms import GradientDescent, AdaDelta, Momentum from blocks.graph import ComputationGraph from blocks.main_loop import MainLoop from blocks.extensions import Printing, FinishAfter from blocks.extensions.saveload import Dump, LoadFromDump, Checkpoint from blocks.extensions.monitoring import DataStreamMonitoring -import data -import transformers +from data import transformers +from data.hdf5 import TaxiDataset, TaxiStream import apply_model if __name__ == "__main__": @@ -38,18 +28,18 @@ if __name__ == "__main__": print >> sys.stderr, 'Usage: %s config' % sys.argv[0] sys.exit(1) model_name = sys.argv[1] - config = importlib.import_module(model_name) + config = importlib.import_module('.%s' % model_name, 'config') +def compile_valid_trip_ids(): + valid = TaxiDataset(config.valid_set, 'valid.hdf5', sources=('trip_id',)) + ids = valid.get_data(None, slice(0, valid.num_examples)) + return set(ids[0]) -def setup_train_stream(req_vars): - # Load the training and test data - train = H5PYDataset(data.H5DATA_PATH, - which_set='train', - subset=slice(0, data.dataset_size), - load_in_memory=True) - train = DataStream(train, iteration_scheme=ShuffledExampleScheme(data.dataset_size)) +def setup_train_stream(req_vars, valid_trips_ids): + train = TaxiDataset('train') + train = DataStream(train, iteration_scheme=ShuffledExampleScheme(train.num_examples)) - train = transformers.TaxiExcludeTrips(data.valid_trips, train) + train = transformers.TaxiExcludeTrips(valid_trips_ids, train) train = transformers.TaxiGenerateSplits(train, max_splits=100) train = transformers.TaxiAddDateTime(train) @@ -62,7 +52,7 @@ def setup_train_stream(req_vars): return train_stream def setup_valid_stream(req_vars): - valid = DataStream(data.valid_data) + valid = TaxiStream(config.valid_set, 'valid.hdf5') valid = transformers.TaxiAddDateTime(valid) valid = transformers.TaxiAddFirstK(config.n_begin_end_pts, valid) @@ -74,7 +64,7 @@ def setup_valid_stream(req_vars): return valid_stream def setup_test_stream(req_vars): - test = DataStream(data.test_data) + test = TaxiStream('test') test = transformers.TaxiAddDateTime(test) test = transformers.TaxiAddFirstK(config.n_begin_end_pts, test) @@ -95,12 +85,13 @@ def main(): req_vars = model.require_inputs + model.pred_vars req_vars_test = model.require_inputs + [ 'trip_id' ] - train_stream = setup_train_stream(req_vars) + valid_trips_ids = compile_valid_trip_ids() + train_stream = setup_train_stream(req_vars, valid_trips_ids) valid_stream = setup_valid_stream(req_vars) # Training cg = ComputationGraph(cost) - params = cg.parameters # VariableFilter(bricks=[Linear])(cg.parameters) + params = cg.parameters algorithm = GradientDescent( cost=cost, # step_rule=AdaDelta(decay_rate=0.5), diff --git a/transformers.py b/transformers.py deleted file mode 100644 index 73e3868..0000000 --- a/transformers.py +++ /dev/null @@ -1,125 +0,0 @@ -from fuel.transformers import Transformer, Filter, Mapping -import numpy -import theano -import random -import data - -import datetime - -def at_least_k(k, v, pad_at_begin, is_longitude): - if len(v) == 0: - v = numpy.array([data.porto_center[1 if is_longitude else 0]], dtype=theano.config.floatX) - if len(v) < k: - if pad_at_begin: - v = numpy.concatenate((numpy.full((k - len(v),), v[0]), v)) - else: - v = numpy.concatenate((v, numpy.full((k - len(v),), v[-1]))) - return v - - -class Select(Transformer): - def __init__(self, data_stream, sources): - super(Select, self).__init__(data_stream) - self.ids = [data_stream.sources.index(source) for source in sources] - self.sources=sources - - def get_data(self, request=None): - if request is not None: - raise ValueError - data=next(self.child_epoch_iterator) - return [data[id] for id in self.ids] - -class TaxiGenerateSplits(Transformer): - def __init__(self, data_stream, max_splits=-1): - super(TaxiGenerateSplits, self).__init__(data_stream) - self.sources = data_stream.sources + ('destination_latitude', 'destination_longitude', 'time') - self.max_splits = max_splits - self.data = None - self.splits = [] - self.isplit = 0 - self.id_latitude = data_stream.sources.index('latitude') - self.id_longitude = data_stream.sources.index('longitude') - - def get_data(self, request=None): - if request is not None: - raise ValueError - while self.isplit >= len(self.splits): - self.data = next(self.child_epoch_iterator) - self.splits = range(len(self.data[self.id_longitude])) - random.shuffle(self.splits) - if self.max_splits != -1 and len(self.splits) > self.max_splits: - self.splits = self.splits[:self.max_splits] - self.isplit = 0 - - i = self.isplit - self.isplit += 1 - n = self.splits[i]+1 - - r = list(self.data) - - r[self.id_latitude] = numpy.array(r[self.id_latitude][:n], dtype=theano.config.floatX) - r[self.id_longitude] = numpy.array(r[self.id_longitude][:n], dtype=theano.config.floatX) - - dlat = numpy.float32(self.data[self.id_latitude][-1]) - dlon = numpy.float32(self.data[self.id_longitude][-1]) - - return tuple(r + [dlat, dlon, 15 * (len(self.data[self.id_longitude]) - 1)]) - -class TaxiAddFirstK(Transformer): - def __init__(self, k, stream): - super(TaxiAddFirstK, self).__init__(stream) - self.sources = stream.sources + ('first_k_latitude', 'first_k_longitude') - self.id_latitude = stream.sources.index('latitude') - self.id_longitude = stream.sources.index('longitude') - self.k = k - def get_data(self, request=None): - if request is not None: raise ValueError - data = next(self.child_epoch_iterator) - first_k = (numpy.array(at_least_k(self.k, data[self.id_latitude], False, False)[:self.k], - dtype=theano.config.floatX), - numpy.array(at_least_k(self.k, data[self.id_longitude], False, True)[:self.k], - dtype=theano.config.floatX)) - return data + first_k - -class TaxiAddLastK(Transformer): - def __init__(self, k, stream): - super(TaxiAddLastK, self).__init__(stream) - self.sources = stream.sources + ('last_k_latitude', 'last_k_longitude') - self.id_latitude = stream.sources.index('latitude') - self.id_longitude = stream.sources.index('longitude') - self.k = k - def get_data(self, request=None): - if request is not None: raise ValueError - data = next(self.child_epoch_iterator) - last_k = (numpy.array(at_least_k(self.k, data[self.id_latitude], True, False)[-self.k:], - dtype=theano.config.floatX), - numpy.array(at_least_k(self.k, data[self.id_longitude], True, True)[-self.k:], - dtype=theano.config.floatX)) - return data + last_k - -class TaxiAddDateTime(Transformer): - def __init__(self, stream): - super(TaxiAddDateTime, self).__init__(stream) - self.sources = stream.sources + ('week_of_year', 'day_of_week', 'qhour_of_day') - self.id_timestamp = stream.sources.index('timestamp') - def get_data(self, request=None): - if request is not None: raise ValueError - data = next(self.child_epoch_iterator) - ts = data[self.id_timestamp] - date = datetime.datetime.utcfromtimestamp(ts) - yearweek = date.isocalendar()[1] - 1 - info = ((51 if yearweek == 52 else yearweek), date.weekday(), date.hour * 4 + date.minute / 15) - return data + info - -class TaxiExcludeTrips(Transformer): - def __init__(self, exclude_list, stream): - super(TaxiExcludeTrips, self).__init__(stream) - self.id_trip_id = stream.sources.index('trip_id') - self.exclude = {v: True for v in exclude_list} - def get_data(self, request=None): - if request is not None: raise ValueError - while True: - data = next(self.child_epoch_iterator) - if not data[self.id_trip_id] in self.exclude: break - return data - -- cgit v1.2.3