aboutsummaryrefslogblamecommitdiff
path: root/data.py
blob: f1236a565b1c1c750af96740544298d22ba1f253 (plain) (tree)
1
2
3
4
5
6
7
8
9
               
             
           
            



                                      
             
 


                                         
                                                 
 
                                                                                   
 

                                                                                           
 














































                                  

                                 


                                               
                                                          

                                        
                            

                                                                   

                                  


                                        






                                           

                           
                          

                           





                                           
                                           




                                            

                                    

                                           


                                             
                                   



                                                     
                                       
 















                                                                                 
 





                                     

                                                                            
 
                                              




                                                      




                                               
                                               
import ast, csv
import socket
import fuel
import numpy
from enum import Enum
from fuel.datasets import Dataset
from fuel.streams import DataStream
from fuel.iterator import DataIterator
import theano

if socket.gethostname() == "adeb.laptop":
    DATA_PATH = "/Users/adeb/data/taxi"
else:
    DATA_PATH="/data/lisatmp3/auvolat/taxikaggle"

client_ids = {int(x): y+1 for y, x in enumerate(open(DATA_PATH+"/client_ids.txt"))}

porto_center = numpy.array([[ -8.61612, 41.1573]], dtype=theano.config.floatX)
data_std = numpy.sqrt(numpy.array([[ 0.00333233, 0.00549598]], dtype=theano.config.floatX))

class CallType(Enum):
    CENTRAL = 0
    STAND = 1
    STREET = 2

    @classmethod
    def from_data(cls, val):
        if val=='A':
            return cls.CENTRAL
        elif val=='B':
            return cls.STAND
        elif val=='C':
            return cls.STREET

    @classmethod
    def to_data(cls, val):
        if val==cls.CENTRAL:
            return 'A'
        elif val==cls.STAND:
            return 'B'
        elif val==cls.STREET:
            return 'C'

class DayType(Enum):
    NORMAL = 0
    HOLIDAY = 1
    HOLIDAY_EVE = 2

    @classmethod
    def from_data(cls, val):
        if val=='A':
            return cls.NORMAL
        elif val=='B':
            return cls.HOLIDAY
        elif val=='C':
            return cls.HOLIDAY_EVE

    @classmethod
    def to_data(cls, val):
        if val==cls.NORMAL:
            return 'A'
        elif val==cls.HOLIDAY:
            return 'B'
        elif val==cls.HOLIDAY_EVE:
            return 'C'

class TaxiData(Dataset):
    example_iteration_scheme=None

    class State:
        __slots__ = ('file', 'index', 'reader')

    def __init__(self, pathes, columns, has_header=False):
        if not isinstance(pathes, list):
            pathes=[pathes]
        assert len(pathes)>0
        self.columns=columns
        self.provides_sources = tuple(map(lambda x: x[0], columns))
        self.pathes=pathes
        self.has_header=has_header
        super(TaxiData, self).__init__()

    def open(self):
        state=self.State()
        state.file=open(self.pathes[0])
        state.index=0
        state.reader=csv.reader(state.file)
        if self.has_header:
            state.reader.next()
        return state

    def close(self, state):
        state.file.close()

    def reset(self, state):
        if state.index==0:
            state.file.seek(0)
        else:
            state.index=0
            state.file.close()
            state.file=open(self.pathes[0])
        state.reader=csv.reader(state.file)
        return state

    def get_data(self, state, request=None):
        if request is not None:
            raise ValueError
        try:
            line=state.reader.next()
        except (ValueError, StopIteration):
            # print state.index
            state.file.close()
            state.index+=1
            if state.index>=len(self.pathes):
                raise StopIteration
            state.file=open(self.pathes[state.index])
            state.reader=csv.reader(state.file)
            if self.has_header:
                state.reader.next()
            return self.get_data(state)

        values = []
        for idx, (_, constructor) in enumerate(self.columns):
            values.append(constructor(line[idx]))
        return tuple(values)

taxi_columns = [
    ("trip_id", lambda x: x),
    ("call_type", CallType.from_data),
    ("origin_call", lambda x: 0 if x == '' or x == 'NA' else client_ids[int(x)]),
    ("origin_stand", lambda x: 0 if x == '' or x == 'NA' else int(x)),
    ("taxi_id", int),
    ("timestamp", int),
    ("day_type", DayType.from_data),
    ("missing_data", lambda x: x[0] == 'T'),
    ("polyline", lambda x: map(tuple, ast.literal_eval(x))),
]

taxi_columns_valid = taxi_columns + [
    ("destination_x", float),
    ("destination_y", float),
    ("time", int),
]

train_files=["%s/split/train-%02d.csv" % (DATA_PATH, i) for i in range(100)]
valid_files=["%s/split/valid.csv" % (DATA_PATH,)]

train_data=TaxiData(train_files, taxi_columns)

valid_data = TaxiData(valid_files, taxi_columns_valid)

# for the moment - will be changed later
test_data = valid_data

def train_it():
    return DataIterator(DataStream(train_data))

def test_it():
    return DataIterator(DataStream(valid_data))