#!/usr/bin/env python2 # Separate the training set into a Training Valid and Test set import os import sys import importlib import cPickle import h5py import numpy import theano import data from data.hdf5 import TaxiDataset from error import hdist native_fields = { 'trip_id': 'S19', 'call_type': numpy.int8, 'origin_call': numpy.int32, 'origin_stand': numpy.int8, 'taxi_id': numpy.int16, 'timestamp': numpy.int32, 'day_type': numpy.int8, 'missing_data': numpy.bool, 'latitude': data.Polyline, 'longitude': data.Polyline, } all_fields = { 'path_len': numpy.int16, 'cluster': numpy.int16, 'destination_latitude': numpy.float32, 'destination_longitude': numpy.float32, 'travel_time': numpy.int32, } all_fields.update(native_fields) def cut_me_baby(train, cuts, excl={}): dset = {} cuts.sort() cut_id = 0 for i in xrange(data.train_size): if i%10000==0 and i!=0: print >> sys.stderr, 'cut: {:d} done'.format(i) if i in excl: continue time = train['timestamp'][i] latitude = train['latitude'][i] longitude = train['longitude'][i] if len(latitude) == 0: continue end_time = time + 15 * (len(latitude) - 1) while cuts[cut_id] < time: if cut_id >= len(cuts)-1: return dset cut_id += 1 if end_time < cuts[cut_id]: continue else: dset[i] = (cuts[cut_id] - time) / 15 + 1 return dset def make_tvt(test_cuts_name, valid_cuts_name, outpath): trainset = TaxiDataset('train') traindata = trainset.get_data(None, slice(0, trainset.num_examples)) idsort = traindata[trainset.sources.index('timestamp')].argsort() traindata = dict(zip(trainset.sources, (t[idsort] for t in traindata))) print >> sys.stderr, 'test cut begin' test_cuts = importlib.import_module('.%s' % test_cuts_name, 'data.cuts').cuts test = cut_me_baby(traindata, test_cuts) print >> sys.stderr, 'valid cut begin' valid_cuts = importlib.import_module('.%s' % valid_cuts_name, 'data.cuts').cuts valid = cut_me_baby(traindata, valid_cuts, test) test_size = len(test) valid_size = len(valid) train_size = data.train_size - test_size - valid_size print ' set | size | ratio' print ' ----- | ------- | -----' print ' train | {:>7d} | {:>5.3f}'.format(train_size, float(train_size)/data.train_size) print ' valid | {:>7d} | {:>5.3f}'.format(valid_size, float(valid_size)/data.train_size) print ' test | {:>7d} | {:>5.3f}'.format(test_size , float(test_size )/data.train_size) with open(os.path.join(data.path, 'arrival-clusters.pkl'), 'r') as f: clusters = cPickle.load(f) print >> sys.stderr, 'compiling cluster assignment function' latitude = theano.tensor.scalar('latitude') longitude = theano.tensor.scalar('longitude') coords = theano.tensor.stack(latitude, longitude).dimshuffle('x', 0) parent = theano.tensor.argmin(hdist(clusters, coords)) cluster = theano.function([latitude, longitude], parent) train_clients = set() print >> sys.stderr, 'preparing hdf5 data' hdata = {k: numpy.empty(shape=(data.train_size,), dtype=v) for k, v in all_fields.iteritems()} train_i = 0 valid_i = train_size test_i = train_size + valid_size print >> sys.stderr, 'write: begin' for idtraj in xrange(data.train_size): if idtraj%10000==0 and idtraj!=0: print >> sys.stderr, 'write: {:d} done'.format(idtraj) in_test = idtraj in test in_valid = not in_test and idtraj in valid in_train = not in_test and not in_valid if idtraj in test: i = test_i test_i += 1 elif idtraj in valid: i = valid_i valid_i += 1 else: train_clients.add(traindata['origin_call'][idtraj]) i = train_i train_i += 1 trajlen = len(traindata['latitude'][idtraj]) if trajlen == 0: hdata['destination_latitude'][i] = data.train_gps_mean[0] hdata['destination_longitude'][i] = data.train_gps_mean[1] else: hdata['destination_latitude'][i] = traindata['latitude'][idtraj][-1] hdata['destination_longitude'][i] = traindata['longitude'][idtraj][-1] hdata['travel_time'][i] = trajlen for field in native_fields: val = traindata[field][idtraj] if field in ['latitude', 'longitude']: if in_test: val = val[:test[idtraj]] elif in_valid: val = val[:valid[idtraj]] hdata[field][i] = val plen = len(hdata['latitude'][i]) hdata['path_len'][i] = plen hdata['cluster'][i] = -1 if plen==0 else cluster(hdata['latitude'][i][0], hdata['longitude'][i][0]) print >> sys.stderr, 'write: end' print >> sys.stderr, 'removing useless origin_call' for i in xrange(train_size, data.train_size): if hdata['origin_call'][i] not in train_clients: hdata['origin_call'][i] = 0 print >> sys.stderr, 'preparing split array' split_array = numpy.empty(len(all_fields)*3, dtype=numpy.dtype([ ('split', 'a', 64), ('source', 'a', 21), ('start', numpy.int64, 1), ('stop', numpy.int64, 1), ('indices', h5py.special_dtype(ref=h5py.Reference)), ('available', numpy.bool, 1), ('comment', 'a', 1)])) flen = len(all_fields) for i, field in enumerate(all_fields): split_array[i]['split'] = 'train'.encode('utf8') split_array[i+flen]['split'] = 'valid'.encode('utf8') split_array[i+2*flen]['split'] = 'test'.encode('utf8') split_array[i]['start'] = 0 split_array[i]['stop'] = train_size split_array[i+flen]['start'] = train_size split_array[i+flen]['stop'] = train_size + valid_size split_array[i+2*flen]['start'] = train_size + valid_size split_array[i+2*flen]['stop'] = train_size + valid_size + test_size for d in [0, flen, 2*flen]: split_array[i+d]['source'] = field.encode('utf8') split_array[:]['indices'] = None split_array[:]['available'] = True split_array[:]['comment'] = '.'.encode('utf8') print >> sys.stderr, 'writing hdf5 file' file = h5py.File(outpath, 'w') for k in all_fields.keys(): file.create_dataset(k, data=hdata[k], maxshape=(data.train_size,)) file.attrs['split'] = split_array file.flush() file.close() if __name__ == '__main__': if len(sys.argv) < 3 or len(sys.argv) > 4: print >> sys.stderr, 'Usage: %s test_cutfile valid_cutfile [outfile]' % sys.argv[0] sys.exit(1) outpath = os.path.join(data.path, 'tvt.hdf5') if len(sys.argv) < 4 else sys.argv[3] make_tvt(sys.argv[1], sys.argv[2], outpath)