aboutsummaryrefslogtreecommitdiff
path: root/transformers.py
diff options
context:
space:
mode:
authorAlex Auvolat <alex.auvolat@ens.fr>2015-05-05 11:36:59 -0400
committerAlex Auvolat <alex.auvolat@ens.fr>2015-05-05 11:36:59 -0400
commitab1076e00d6a92120e46d4a0085911b4425a0d60 (patch)
tree83bf857eb61b2df332362b95600686f8d90f508c /transformers.py
parent5b496677ea1db59a6718e5c9b2958177c76cb25f (diff)
downloadtaxi-ab1076e00d6a92120e46d4a0085911b4425a0d60.tar.gz
taxi-ab1076e00d6a92120e46d4a0085911b4425a0d60.zip
Add date/time transformer and new model that uses it
Diffstat (limited to 'transformers.py')
-rw-r--r--transformers.py127
1 files changed, 56 insertions, 71 deletions
diff --git a/transformers.py b/transformers.py
index 876cee2..d6ed611 100644
--- a/transformers.py
+++ b/transformers.py
@@ -4,6 +4,8 @@ import theano
import random
import data
+import datetime
+
def at_least_k(k, v, pad_at_begin, is_longitude):
if len(v) == 0:
v = numpy.array([data.porto_center[1 if is_longitude else 0]], dtype=theano.config.floatX)
@@ -63,78 +65,61 @@ class TaxiGenerateSplits(Transformer):
return tuple(r + [dlat, dlon])
-
-class first_k(object):
- def __init__(self, k, id_latitude, id_longitude):
- self.k = k
- self.id_latitude = id_latitude
- self.id_longitude = id_longitude
- def __call__(self, data):
- return (numpy.array(at_least_k(self.k, data[self.id_latitude], False, False)[:self.k],
- dtype=theano.config.floatX),
- numpy.array(at_least_k(self.k, data[self.id_longitude], False, True)[:self.k],
- dtype=theano.config.floatX))
-def add_first_k(k, stream):
- id_latitude = stream.sources.index('latitude')
- id_longitude = stream.sources.index('longitude')
- return Mapping(stream, first_k(k, id_latitude, id_longitude), ('first_k_latitude', 'first_k_longitude'))
-
-class random_k(object):
- def __init__(self, k, id_latitude, id_longitude):
+class TaxiAddFirstK(Transformer):
+ def __init__(self, k, stream):
+ super(TaxiAddFirstK, self).__init__(stream)
+ self.sources = stream.sources + ('first_k_latitude', 'first_k_longitude')
+ self.id_latitude = stream.sources.index('latitude')
+ self.id_longitude = stream.sources.index('longitude')
self.k = k
- self.id_latitude = id_latitude
- self.id_longitude = id_longitude
- def __call__(self, x):
- lat = at_least_k(self.k, x[self.id_latitude], True, False)
- lon = at_least_k(self.k, x[self.id_longitude], True, True)
- loc = random.randrange(len(lat)-self.k+1)
- return (numpy.array(lat[loc:loc+self.k], dtype=theano.config.floatX),
- numpy.array(lon[loc:loc+self.k], dtype=theano.config.floatX))
-def add_random_k(k, stream):
- id_latitude = stream.sources.index('latitude')
- id_longitude = stream.sources.index('longitude')
- return Mapping(stream, random_k(k, id_latitude, id_longitude), ('last_k_latitude', 'last_k_longitude'))
-
-class last_k(object):
- def __init__(self, k, id_latitude, id_longitude):
+ def get_data(self, request=None):
+ if request is not None: raise ValueError
+ data = next(self.child_epoch_iterator)
+ first_k = (numpy.array(at_least_k(self.k, data[self.id_latitude], False, False)[:self.k],
+ dtype=theano.config.floatX),
+ numpy.array(at_least_k(self.k, data[self.id_longitude], False, True)[:self.k],
+ dtype=theano.config.floatX))
+ return data + first_k
+
+class TaxiAddLastK(Transformer):
+ def __init__(self, k, stream):
+ super(TaxiAddLastK, self).__init__(stream)
+ self.sources = stream.sources + ('last_k_latitude', 'last_k_longitude')
+ self.id_latitude = stream.sources.index('latitude')
+ self.id_longitude = stream.sources.index('longitude')
self.k = k
- self.id_latitude = id_latitude
- self.id_longitude = id_longitude
- def __call__(self, data):
- return (numpy.array(at_least_k(self.k, data[self.id_latitude], True, False)[-self.k:],
- dtype=theano.config.floatX),
- numpy.array(at_least_k(self.k, data[self.id_longitude], True, True)[-self.k:],
- dtype=theano.config.floatX))
-def add_last_k(k, stream):
- id_latitude = stream.sources.index('latitude')
- id_longitude = stream.sources.index('longitude')
- return Mapping(stream, last_k(k, id_latitude, id_longitude), ('last_k_latitude', 'last_k_longitude'))
-
-class destination(object):
- def __init__(self, id_latitude, id_longitude):
- self.id_latitude = id_latitude
- self.id_longitude = id_longitude
- def __call__(self, data):
- return (numpy.array(at_least_k(1, data[self.id_latitude], True, False)[-1],
+ def get_data(self, request=None):
+ if request is not None: raise ValueError
+ data = next(self.child_epoch_iterator)
+ last_k = (numpy.array(at_least_k(self.k, data[self.id_latitude], True, False)[-self.k:],
dtype=theano.config.floatX),
- numpy.array(at_least_k(1, data[self.id_longitude], True, True)[-1],
- dtype=theano.config.floatX))
-def add_destination(stream):
- id_latitude = stream.sources.index('latitude')
- id_longitude = stream.sources.index('longitude')
- return Mapping(stream, destination(id_latitude, id_longitude), ('destination_latitude', 'destination_longitude'))
-
-
-class trip_filter(object):
- def __init__(self, id_trip_id, exclude):
- self.id_trip_id = id_trip_id
- self.exclude = exclude
- def __call__(self, data):
- if data[self.id_trip_id] in self.exclude:
- return False
- else:
- return True
-def filter_out_trips(exclude_trips, stream):
- id_trip_id = stream.sources.index('trip_id')
- return Filter(stream, trip_filter(id_trip_id, exclude_trips))
+ numpy.array(at_least_k(self.k, data[self.id_longitude], True, True)[-self.k:],
+ dtype=theano.config.floatX))
+ return data + last_k
+
+class TaxiAddDateTime(Transformer):
+ def __init__(self, stream):
+ super(TaxiAddDateTime, self).__init__(stream)
+ self.sources = stream.sources + ('week_of_year', 'day_of_week', 'qhour_of_day')
+ self.id_timestamp = stream.sources.index('timestamp')
+ def get_data(self, request=None):
+ if request is not None: raise ValueError
+ data = next(self.child_epoch_iterator)
+ ts = data[self.id_timestamp]
+ date = datetime.datetime.utcfromtimestamp(ts)
+ info = (date.isocalendar()[1] - 1, date.weekday(), date.hour * 4 + date.minute / 15)
+ return data + info
+
+class TaxiExcludeTrips(Transformer):
+ def __init__(self, exclude_list, stream):
+ super(TaxiExcludeTrips, self).__init__(stream)
+ self.id_trip_id = stream.sources.index('trip_id')
+ self.exclude = {v: True for v in exclude_list}
+ def get_data(self, request=None):
+ if request is not None: raise ValueError
+ while True:
+ data = next(self.child_epoch_iterator)
+ if not data[self.id_trip_id] in self.exclude: break
+ return data
+