diff options
author | Alex Auvolat <alex.auvolat@ens.fr> | 2015-05-05 11:36:59 -0400 |
---|---|---|
committer | Alex Auvolat <alex.auvolat@ens.fr> | 2015-05-05 11:36:59 -0400 |
commit | ab1076e00d6a92120e46d4a0085911b4425a0d60 (patch) | |
tree | 83bf857eb61b2df332362b95600686f8d90f508c /transformers.py | |
parent | 5b496677ea1db59a6718e5c9b2958177c76cb25f (diff) | |
download | taxi-ab1076e00d6a92120e46d4a0085911b4425a0d60.tar.gz taxi-ab1076e00d6a92120e46d4a0085911b4425a0d60.zip |
Add date/time transformer and new model that uses it
Diffstat (limited to 'transformers.py')
-rw-r--r-- | transformers.py | 127 |
1 files changed, 56 insertions, 71 deletions
diff --git a/transformers.py b/transformers.py index 876cee2..d6ed611 100644 --- a/transformers.py +++ b/transformers.py @@ -4,6 +4,8 @@ import theano import random import data +import datetime + def at_least_k(k, v, pad_at_begin, is_longitude): if len(v) == 0: v = numpy.array([data.porto_center[1 if is_longitude else 0]], dtype=theano.config.floatX) @@ -63,78 +65,61 @@ class TaxiGenerateSplits(Transformer): return tuple(r + [dlat, dlon]) - -class first_k(object): - def __init__(self, k, id_latitude, id_longitude): - self.k = k - self.id_latitude = id_latitude - self.id_longitude = id_longitude - def __call__(self, data): - return (numpy.array(at_least_k(self.k, data[self.id_latitude], False, False)[:self.k], - dtype=theano.config.floatX), - numpy.array(at_least_k(self.k, data[self.id_longitude], False, True)[:self.k], - dtype=theano.config.floatX)) -def add_first_k(k, stream): - id_latitude = stream.sources.index('latitude') - id_longitude = stream.sources.index('longitude') - return Mapping(stream, first_k(k, id_latitude, id_longitude), ('first_k_latitude', 'first_k_longitude')) - -class random_k(object): - def __init__(self, k, id_latitude, id_longitude): +class TaxiAddFirstK(Transformer): + def __init__(self, k, stream): + super(TaxiAddFirstK, self).__init__(stream) + self.sources = stream.sources + ('first_k_latitude', 'first_k_longitude') + self.id_latitude = stream.sources.index('latitude') + self.id_longitude = stream.sources.index('longitude') self.k = k - self.id_latitude = id_latitude - self.id_longitude = id_longitude - def __call__(self, x): - lat = at_least_k(self.k, x[self.id_latitude], True, False) - lon = at_least_k(self.k, x[self.id_longitude], True, True) - loc = random.randrange(len(lat)-self.k+1) - return (numpy.array(lat[loc:loc+self.k], dtype=theano.config.floatX), - numpy.array(lon[loc:loc+self.k], dtype=theano.config.floatX)) -def add_random_k(k, stream): - id_latitude = stream.sources.index('latitude') - id_longitude = stream.sources.index('longitude') - return Mapping(stream, random_k(k, id_latitude, id_longitude), ('last_k_latitude', 'last_k_longitude')) - -class last_k(object): - def __init__(self, k, id_latitude, id_longitude): + def get_data(self, request=None): + if request is not None: raise ValueError + data = next(self.child_epoch_iterator) + first_k = (numpy.array(at_least_k(self.k, data[self.id_latitude], False, False)[:self.k], + dtype=theano.config.floatX), + numpy.array(at_least_k(self.k, data[self.id_longitude], False, True)[:self.k], + dtype=theano.config.floatX)) + return data + first_k + +class TaxiAddLastK(Transformer): + def __init__(self, k, stream): + super(TaxiAddLastK, self).__init__(stream) + self.sources = stream.sources + ('last_k_latitude', 'last_k_longitude') + self.id_latitude = stream.sources.index('latitude') + self.id_longitude = stream.sources.index('longitude') self.k = k - self.id_latitude = id_latitude - self.id_longitude = id_longitude - def __call__(self, data): - return (numpy.array(at_least_k(self.k, data[self.id_latitude], True, False)[-self.k:], - dtype=theano.config.floatX), - numpy.array(at_least_k(self.k, data[self.id_longitude], True, True)[-self.k:], - dtype=theano.config.floatX)) -def add_last_k(k, stream): - id_latitude = stream.sources.index('latitude') - id_longitude = stream.sources.index('longitude') - return Mapping(stream, last_k(k, id_latitude, id_longitude), ('last_k_latitude', 'last_k_longitude')) - -class destination(object): - def __init__(self, id_latitude, id_longitude): - self.id_latitude = id_latitude - self.id_longitude = id_longitude - def __call__(self, data): - return (numpy.array(at_least_k(1, data[self.id_latitude], True, False)[-1], + def get_data(self, request=None): + if request is not None: raise ValueError + data = next(self.child_epoch_iterator) + last_k = (numpy.array(at_least_k(self.k, data[self.id_latitude], True, False)[-self.k:], dtype=theano.config.floatX), - numpy.array(at_least_k(1, data[self.id_longitude], True, True)[-1], - dtype=theano.config.floatX)) -def add_destination(stream): - id_latitude = stream.sources.index('latitude') - id_longitude = stream.sources.index('longitude') - return Mapping(stream, destination(id_latitude, id_longitude), ('destination_latitude', 'destination_longitude')) - - -class trip_filter(object): - def __init__(self, id_trip_id, exclude): - self.id_trip_id = id_trip_id - self.exclude = exclude - def __call__(self, data): - if data[self.id_trip_id] in self.exclude: - return False - else: - return True -def filter_out_trips(exclude_trips, stream): - id_trip_id = stream.sources.index('trip_id') - return Filter(stream, trip_filter(id_trip_id, exclude_trips)) + numpy.array(at_least_k(self.k, data[self.id_longitude], True, True)[-self.k:], + dtype=theano.config.floatX)) + return data + last_k + +class TaxiAddDateTime(Transformer): + def __init__(self, stream): + super(TaxiAddDateTime, self).__init__(stream) + self.sources = stream.sources + ('week_of_year', 'day_of_week', 'qhour_of_day') + self.id_timestamp = stream.sources.index('timestamp') + def get_data(self, request=None): + if request is not None: raise ValueError + data = next(self.child_epoch_iterator) + ts = data[self.id_timestamp] + date = datetime.datetime.utcfromtimestamp(ts) + info = (date.isocalendar()[1] - 1, date.weekday(), date.hour * 4 + date.minute / 15) + return data + info + +class TaxiExcludeTrips(Transformer): + def __init__(self, exclude_list, stream): + super(TaxiExcludeTrips, self).__init__(stream) + self.id_trip_id = stream.sources.index('trip_id') + self.exclude = {v: True for v in exclude_list} + def get_data(self, request=None): + if request is not None: raise ValueError + while True: + data = next(self.child_epoch_iterator) + if not data[self.id_trip_id] in self.exclude: break + return data + |