Introduction:
The taxi business is one of the largest in New York City (Farber, 2008). Yellow taxi drivers work in shifts, which limits the number of taxicabs on the road at any given time. Shifts also give drivers a roughly equal share of customers, so that everyone earns about the same profit. For this reason the arrangement is well regarded by cab drivers and has remained in use. Two types of taxicab operate in New York City: the yellow taxi and the green taxi. The green taxi is also popular, but less so than the yellow taxi, so this project analyzes only yellow taxicabs. The main objective is to find when a cab is occupied and when it is not, which can be derived from the passenger's drop-off location. For example, if the next passenger is found 5 minutes after a drop-off in the Bronx, NY, but 40 minutes after a drop-off in Long Island, NY, then the Bronx is the better place to find the next customer quickly.
Data:
The data is available from January 2013 onward, in comma-separated values (CSV) format. The variables include latitude and longitude, together with temporal data such as date and time. The download is 2.5 GB compressed and expands to around 20 GB once unzipped, so running Spark over the full dataset at every iteration is not reasonable. For the analysis we therefore use only the January 2013 data. With one month of data the code runs faster, so we can see how the model behaves, make modifications, and rerun the process quickly. In other words, one month of data is used for testing and debugging the code and for judging how good the model is, and it appears sufficient to start the analysis.
METHOD:
We first set up the tools and libraries needed for the analysis. Setting up the environment involves connecting to Amazon Web Services EMR, uploading the data to an S3 bucket, installing and running Spark, and importing the data from the S3 bucket. Each step is explained below for a Mac system.
Connecting to Amazon Web Services EMR:
Connecting to Amazon Web Services EMR involves three steps: creating a key pair, creating an EMR cluster, and connecting to the cluster from the local terminal. We go through each step in detail.
To create the key pair, go to the key pair options on the home page. Click on new key pair and give it a new name. Download the key pair and note the location where it is saved.
To create an EMR cluster, log in to the Amazon Web Services console. Select the EMR option from the Analytics section, then click on create cluster. The quick options window opens. Under Software Configuration, choose Spark: Spark 2.0.1 on Hadoop 2.7.3 YARN with Ganglia 3.7.2 and Zeppelin 0.6.2. Under Hardware Configuration, set the number of instances to 4. Under Security and Access, select the key pair you created and click next. Go to advanced settings and check the master node and slave nodes.
To connect from the local machine, open the terminal. Restrict the permissions on the key file by typing the following command:
chmod 600 /keyname.pem
Then connect to the EMR cluster by typing:
ssh -i keyname.pem hadoop@ec2-35-160-198-2.us-west-2.compute.amazonaws.com
A prompt asks whether to continue connecting. Enter yes. When the connection succeeds, the EC2 banner is displayed along with a success message, as shown in Figure 1.
Uploading Data to the S3 Bucket:
To make the large dataset accessible, we upload it to an S3 bucket. S3 is available from the Storage and Content Delivery section of the console and serves as the storage location for the data used in the analysis. The upload itself is simple. Because the data is large, we upload a subset. The original data covers 2 years, but to test the code against a smaller chunk we uploaded only the January 2013 data to the S3 bucket. The S3 bucket after uploading is shown in Figure 2.
Running Spark:
After the data has been uploaded, we run Spark on the local system from the terminal. Open the terminal and run the command spark-shell. The command output is shown in Figure 3.
Running Spark from Databricks:
We also run the analysis in Databricks because its output is clearer and easier to understand. The same commands work in the terminal as well. If you are using Databricks, open the Databricks site and create a notebook with Scala as the language. If you are using the terminal, nothing further is needed, because spark-shell already lets you run Scala code. If you work directly in Databricks, you do not need to set up Spark in the terminal, because Databricks runs in the browser and provides its own Spark setup with the notebook.
Importing the Data from a S3 Bucket:
Connecting to the S3 bucket from Databricks requires a single command. The tricky part is knowing the difference between the S3, S3n and S3a URI schemes, which is illustrated in the table in Figure 4.
Figure 4. Difference between S3, S3n and S3a
The command to connect Databricks to the S3 bucket is as follows:
val data = sc.textFile("s3n://nyctaxidata123/nyc.pem")
Choosing the Libraries:
The code builds on a few existing libraries. Kryo is used for serialization, so the classes involved need to be serializable. Java libraries with Scala wrappers are the best solution to this problem. The data contains dates and times, and a beginner might choose Java's built-in temporal classes, but their disadvantage is that they are costly even for small operations. NScalaTime and JodaTime are therefore the better choice.
Reading the Temporal Data:
Temporal data consists of time-related fields such as date, time, year, quarter and decade. Time is always an important component of an analysis, because when it is present the analysis can be extended along a further dimension. The data is read using the NScalaTime and JodaTime libraries. The import and manipulation of the data is illustrated in Figure 5.
Figure 5. Workflow for importing the Temporal Data
Figure 6. Code used to import the temporal data
Figure 7. Code used to format the temporal data
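As a rough illustration of reading and formatting a trip timestamp, here is a minimal sketch. It uses the standard java.time classes in place of NScalaTime/JodaTime so that it runs without extra dependencies. The timestamp layout, the object name TemporalSketch and its helper names are assumptions made for this example and are not taken from the project code.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter

object TemporalSketch {
  // Assumed timestamp layout, e.g. "2013-01-01 15:11:48"; adjust to the real CSV.
  val formatter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Parse one raw CSV field into a date-time value.
  def parseTimestamp(raw: String): LocalDateTime =
    LocalDateTime.parse(raw.trim, formatter)

  // Whole minutes elapsed between two timestamps (truncated toward zero).
  def minutesBetween(start: LocalDateTime, end: LocalDateTime): Long =
    Duration.between(start, end).toMinutes
}
```

For instance, TemporalSketch.parseTimestamp could be applied to the pickup and drop-off columns of each record before any duration is computed.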
Reading the Geospatial Data:
There are two types of geospatial data: vector and raster. The data provided by the nyctaxi is vector data in GeoJSON format. The obstacle here is to make sure that the coordinates fall within a New York City borough. No existing library solves this problem directly, so the existing Esri Geometry API is adapted to suit it. The Esri Geometry API provides a GeometryEngine that is used to find relationships between geometries, such as whether a point lies inside a polygon.
Figure 8. Code used to load the geospatial data
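To make the containment check concrete, the sketch below tests whether a coordinate lies inside a borough outline. It uses a plain ray-casting test as a stand-in for the Esri Geometry API, so it is not the project's implementation. The types Point and Borough, the function names, and the idea of representing a borough as a single ring of vertices are all assumptions for illustration.

```scala
object BoroughLookupSketch {
  final case class Point(lon: Double, lat: Double)
  // Hypothetical simplification: one outer ring per borough, no holes or multi-polygons.
  final case class Borough(name: String, ring: Vector[Point])

  // Ray casting: count the polygon edges crossed by a horizontal ray starting at p.
  // An odd number of crossings means the point is inside.
  def contains(ring: Vector[Point], p: Point): Boolean = {
    var inside = false
    var j = ring.length - 1
    for (i <- ring.indices) {
      val a = ring(i)
      val b = ring(j)
      val crosses = (a.lat > p.lat) != (b.lat > p.lat) &&
        p.lon < (b.lon - a.lon) * (p.lat - a.lat) / (b.lat - a.lat) + a.lon
      if (crosses) inside = !inside
      j = i
    }
    inside
  }

  // Name of the first borough whose ring contains the point, if any.
  def boroughOf(boroughs: Seq[Borough], p: Point): Option[String] =
    boroughs.find(b => contains(b.ring, p)).map(_.name)
}
```

A result of None from boroughOf would mark a coordinate that falls outside every borough outline supplied.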
Handling noise in the data:
The data is so large that it is impractical to find invalid values by inspection. Instead, we handle exceptions in the code so that processing moves on to the next record when an entry is invalid. This is done by creating a function called safeParse() that wraps the parsing logic. The other option is to use try/catch to filter out invalid records. This method is feasible only when a small fraction of the data is invalid. Filtering and mapping are done together with the collect statement, which takes a partial function as an argument and keeps only the records that match it. Most of the invalid records have missing values, so they are easy to identify and remove.
For temporal data, the time at which a passenger starts the ride must always be earlier than the time at which he or she reaches the destination. If this condition does not hold, the record is invalid. Problems of this kind may arise from data quality issues introduced when the data is imported into the workspace. To detect them, we define a function called hours(), calculated as the difference between the drop-off time and the pickup time:
hours() = time when the destination was reached - time when the journey started
Records with negative values are removed, because the start time can never be later than the end time.
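The cleaning steps described above can be sketched roughly as follows. This is a minimal example on plain Scala collections rather than a Spark RDD. The input layout ("pickup,dropoff" in ISO-8601 form), the Trip class and the parseTrip helper are assumptions for illustration, and the project's real safeParse() may differ.

```scala
import java.time.{Duration, LocalDateTime}
import scala.util.{Failure, Success, Try}

object CleaningSketch {
  final case class Trip(pickup: LocalDateTime, dropoff: LocalDateTime)

  // Wrap a parser so that a failure is returned as a value instead of being thrown.
  def safeParse[S, T](parse: S => T): S => Either[(S, Throwable), T] =
    (input: S) =>
      Try(parse(input)) match {
        case Success(value) => Right(value)
        case Failure(error) => Left((input, error))
      }

  // Hypothetical line layout: "pickup,dropoff", e.g. "2013-01-01T10:00:00,2013-01-01T10:30:00".
  def parseTrip(line: String): Trip = {
    val fields = line.split(',')
    Trip(LocalDateTime.parse(fields(0)), LocalDateTime.parse(fields(1)))
  }

  // Trip length in hours: drop-off time minus pickup time (negative if the record is inverted).
  def hours(trip: Trip): Double =
    Duration.between(trip.pickup, trip.dropoff).toMillis / 3600000.0

  // Filter and map in one pass: keep records that parsed and have a non-negative duration.
  def clean(lines: Seq[String]): Seq[Trip] = {
    val safe = safeParse(parseTrip)
    lines.map(safe).collect { case Right(trip) if hours(trip) >= 0 => trip }
  }
}
```

Keeping the failures as Left values, rather than discarding them immediately, makes it possible to count or inspect the bad records before they are filtered out.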
Location Analysis:
The analysis concerns the five boroughs, so records with a destination outside these boroughs are treated as invalid, because a taxi is not expected to be used for a distance greater than two boroughs. This is achieved by converting the pickup latitude and longitude and the drop-off latitude and longitude into boroughs. A conditional statement then keeps only the records that fall within the five boroughs. Once the invalid records have been filtered out, the remaining data is ready for analysis.
Building sessions:
The naive approach to building sessions in Spark is to perform a groupBy on the identifier we want sessions for (here the driver ID or driver name) and then sort the events after the shuffle by timestamp. If there are only a few events per entity, this approach works reasonably well. However, it requires all the events for a given entity to be in memory at the same time, so it does not scale as the number of events per entity grows. We need a way of building sessions that does not require all the events for an entity to be held in memory at once for sorting.
Figure 11. Workflow showing the analysis of the NYC taxi data
The alternative is a secondary sort on a composite key made up of the identifier and a timestamp. In MapReduce, we can build sessions this way by sorting all the records on the composite key and then using a custom partitioner and grouping function to ensure that all the records for the same identifier appear in the same output partition. The approach takes the following inputs: the RDD of key-value pairs to operate on, a function that extracts the secondary key from a value for sorting, an optional splitting function that takes the sorted values for one key and splits them into multiple groups of records from the same driver, and the number of partitions in the resulting RDD. The secondary key here is the start time of the trip.
Fortunately, Spark also supports this secondary sort pattern through its repartitionAndSortWithinPartitions transformation. In the repo, we provide an implementation of a groupByKeyAndSortValues transformation that does exactly this. Since the workings of this functionality are largely orthogonal to the ideas covered in this section, we omit the details here.
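The idea behind the secondary sort can be illustrated on ordinary Scala collections. The sketch below is not the repo's groupByKeyAndSortValues and does not use Spark. It only shows the composite key (driver identifier, then pickup time) and a splitting step. The Trip fields, the maxGap parameter and the rule of starting a new session once the gap reaches maxGap are assumptions chosen for this example.

```scala
import java.time.{Duration, LocalDateTime}

object SessionSketch {
  final case class Trip(driverId: String, pickup: LocalDateTime, dropoff: LocalDateTime)

  // Explicit ordering so that LocalDateTime can take part in the composite sort key.
  implicit val byTime: Ordering[LocalDateTime] =
    Ordering.fromLessThan((a: LocalDateTime, b: LocalDateTime) => a.isBefore(b))

  // Walk the sorted trips once; start a new session when the driver changes
  // or when the gap since the previous drop-off reaches maxGap.
  def splitSessions(sorted: Seq[Trip], maxGap: Duration): Vector[Vector[Trip]] =
    sorted.foldLeft(Vector.empty[Vector[Trip]]) { (sessions, trip) =>
      sessions.lastOption match {
        case Some(current)
            if current.last.driverId == trip.driverId &&
              Duration.between(current.last.dropoff, trip.pickup).compareTo(maxGap) < 0 =>
          sessions.init :+ (current :+ trip)
        case _ =>
          sessions :+ Vector(trip)
      }
    }

  // Composite key: driver identifier first, pickup time as the secondary key.
  def buildSessions(trips: Seq[Trip], maxGap: Duration): Vector[Vector[Trip]] =
    splitSessions(trips.sortBy(t => (t.driverId, t.pickup)), maxGap)
}
```

In Spark the sort would happen within partitions, so that only one driver's current session needs to be held at a time. The in-memory sortBy here is purely for readability.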
Analysis:
Deciding the threshold for a taxi driver's wait time by location or borough is a significant challenge, because the choice of threshold affects the insights that can be derived. Splitting the data with different thresholds and repeating the analysis is therefore the better option. Running the analysis with a threshold of 4 hours, running it again with a threshold of 3 hours, and comparing the two gives a clearer picture. Rebuilding the sessions for every run is expensive, so we reuse the session data, in which each entity holds the records of one driver, for all operations. We store the session data in the Hadoop Distributed File System (HDFS) so that it can be loaded easily whenever it is needed. To find the time a driver takes to find the next passenger, we create a function called boroughDuration. The complete workflow of the analysis is shown in Figure 11.
boroughDuration = pickup time of the (N+1)th trip - drop-off time of the Nth trip
That is, we take the difference between a drop-off time and the next pickup time. We then aggregate the wait times of all drivers by location or borough, so that the output is the average wait time for each borough. We have to make sure that boroughDuration is not negative, because a negative value indicates that the drop-off of the previous trip and the pickup of the next trip were not paired correctly. After this filter is applied, the output is suitable for analysis. The negative values were studied with the help of Spark's StatCounter, but no pattern was found that would explain them. The final output is sorted in ascending order of wait time.
Figure 12. Code used to produce the output
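A minimal sketch of the wait-time calculation and the per-borough summary is given below, again on plain Scala collections instead of Spark. The Trip and Stats classes and the function names are assumptions for illustration. The standard deviation is computed as a population standard deviation, and the statistics are produced by hand here rather than with Spark's StatCounter.

```scala
import java.time.{Duration, LocalDateTime}

object WaitTimeSketch {
  final case class Trip(pickup: LocalDateTime, dropoff: LocalDateTime, dropoffBorough: String)
  final case class Stats(count: Int, mean: Double, stdev: Double, max: Double, min: Double)

  // For each pair of consecutive trips in one driver's session, emit the borough of the
  // earlier drop-off and the wait in seconds (next pickup minus previous drop-off).
  def boroughDurations(session: Seq[Trip]): Seq[(String, Long)] =
    session.sliding(2).collect {
      case Seq(prev, next) =>
        (prev.dropoffBorough, Duration.between(prev.dropoff, next.pickup).getSeconds)
    }.toVector

  // Drop negative waits, then summarize the remaining waits per borough.
  def summarize(waits: Seq[(String, Long)]): Map[String, Stats] =
    waits.filter(_._2 >= 0).groupBy(_._1).map { case (borough, rows) =>
      val xs = rows.map(_._2.toDouble)
      val mean = xs.sum / xs.size
      val variance = xs.map(x => (x - mean) * (x - mean)).sum / xs.size
      borough -> Stats(xs.size, mean, math.sqrt(variance), xs.max, xs.min)
    }
}
```

Sorting the resulting map entries by Stats.mean would give the boroughs in ascending order of average wait time.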
RESULTS:
The above code finds the time a taxi driver takes to get the next customer. The sorted output is shown in the table in Figure 13.
Figure 13. Final output in the terminal window
The output gives a lot of information about the wait time. The first element is the name of the borough to which the result applies. The second element is the number of records analyzed for this borough. The third element is the mean, that is, the average time it takes to get the next customer. The fourth element is the standard deviation, which measures how far individual wait times can be expected to deviate from that mean. The fifth element is the maximum wait time, the longest time a taxi driver took to get the next customer in that borough. The sixth element is the minimum wait time, the shortest time a taxi driver took to get the next customer in that borough. Manhattan has the lowest wait time, with an average of 10 min, and Staten Island has the worst wait time, at 45 min. Based on this insight, the results were used to fine drivers who refused passengers wanting to travel to these high wait time boroughs or locations. This information can support many further insights into the taxi business.