Big Data is a Technology which is spread in the tech world like a wildfire. To understand the concept of big data and let's build a simple ELT application to bring youtube data live data analysis with youtube data API.
Project
Goal:
We will make a small project which
will extract data from youtube with youtube data API and find out most popular
and trending videos in the world.
Apache Spark:
Spark is a very popular big data analysis tool among the
big data technology era. Apache spark is the best substitute of Hadoop, as Hadoop
work on MapReduce data processing algorithms and take too much time for
computation while Spark works RDD (resilient distributed datasets) and compute
your data 1000x time faster than Hadoop MapReduce.
Youtube
date API:
Youtube Data API is provided by Google, for
developers to easily develop an application which can use features of youtube.
To use youtube data API you would need to create a developer key.
Step 1: Go to the link below and create a project, and create an API key
https://console.developers.google.com/projectselector/apis/credentials?supportedpurview=project
By going to credentials>create
credential> API key
And API key will be generated.
Note: this API key will give you limited access to youtube data with day limit.
Step 2: Goto dashboard>Enable API and services
And choose youtube data API 3 and enable it by accepting term and conditions.
Now you will be able to access youtube data
API to request and extract data from youtube.
Preconfigured tools required:
- Apache
spark on local machine: to make spark streaming analysis
- Jupyter
notebook: python coding environment for pyspark
- Hive:
SQL database for spark SQL and loading data in database.
- Mysql:
metastore for Hive.
- Apache
hadoop hdfs: file store for our data.
- Kafka:
handling topics for spark streaming.
- Zookeeper:
managing kafka topics.
To access google all API with python, we gonna need google-api-python-client. So to install this package in your anaconda execute below command.
pip install --upgrade
google-api-python-client # to install google client
pip install kafka # to install
kafka python module
Project Schematics:
- Get videos details data from youtube
with python and filter it.
- Send JSON data to spark by using kafka
module of python.
- Receiving JSON data from Zookeeper By using pyspark
streaming.
- Create a pyspark streaming dataframe of
the JSON data.
- Make analysis on it to find the result.
- Store the final output in the hive table using
So, let’s begin...!!!
YouTube data Extracting:
Here we are going to make a
python code for requesting for data from youtube with python googleapiclient
module and filtering received data and then send back to kafka broker port
9092.
we need to import required modules in our python code.
from pandas import
DataFrame as df
from
googleapiclient.discovery import build
from
googleapiclient.errors import HttpError
from kafka import
SimpleProducer, KafkaClient
Now define the required parameters for the YouTube Data API object.
DEVELOPER_KEY =
'developer key’
YOUTUBE_API_SERVICE_NAME
= 'youtube'
YOUTUBE_API_VERSION = 'v3'
Create
a class object for google client to access youtube.
youtube = build(YOUTUBE_API_SERVICE_NAME, YOUTUBE_API_VERSION,developerKey=DEVELOPER_KEY)
Now we have built our object so we can start pulling requests.
Youtube
API has many methods for extract data like search, channel, videos.
1.youtube.search()
will return a JSON of search result will give videos, channel details
according to input parameters or keyword, you searched for, like q=’python’
2. youtube.videos() will return a JSON of search result
will give videos details according to input parameters or keyword you searched
for.
3. youtube.channel() will return a JSON of search result
will give channel details according to input parameters or keyword you searched
for.
We
are using youtube.videos() to extract mostPopular videos in different charts.
search_response =
youtube.videos().list(part='id,snippet,statistics',chart='mostPopular',
regionCode= country, videoCategoryId= categarycode
,maxResults=50).execute()
This the search result will give you a JSON, so we need to filter and convert that in a
proper format.
stream_videos
= []
for i in search_response['items']:
temp_res = dict(v_id = i['id'],
v_title = i['snippet']['title'], ch_id= i['snippet']['channelId'], ch_title=
i['snippet']['channelTitle'], country= country, categary_name = categary,
publised_date = i['snippet']['publishedAt'])
temp_res.update(i['statistics'])
stream_videos.append(temp_res)
print(stream_video)
The output will be like below.
{'v_id':
'orkPrGSAETs', 'v_title': 'The Vision of Bharat | Mahesh Babu | Siva Koratala |
DVV Entertainment | Bharat Ane Nenu Teaser', 'ch_id':
'UCumU_6FNxfHXTmeeFCYz6Yw', 'ch_title': 'DVV Entertainments', 'country': 'IN',
'categary_name': 'Film & Animation', 'publised_date':
'2018-03-06T12:32:50.000Z', 'viewCount': '12981881', 'likeCount': '487734',
'dislikeCount': '32135', 'favoriteCount': '0', 'commentCount': '28179'},
{'v_id': 'ZG1Su0QwPYs', 'v_title': 'Rangamma Mangamma Lyrical Video Song ||
Rangasthalam Songs || Ram Charan, Samantha, Devi Sri Prasad', 'ch_id':
'UCnJjcn5FrgrOEp5_N45ZLEQ', 'ch_title': 'T-Series Telugu', 'country': 'IN',
'categary_name': 'Film & Animation', 'publised_date':
'2018-03-08T12:30:07.000Z', 'viewCount': '9537656', 'likeCount': '173534',
'dislikeCount': '8979', 'favoriteCount': '0', 'commentCount': '12980'},
{'v_id': 'ivmmk3Ud_Xg', 'v_title': 'Family Time With Kapil Sharma Coming Soon
On Sony Television', 'ch_id': 'UCpEhnqL0y41EpW2TvWAHD7Q', 'ch_title': 'SET
India', 'country': 'IN', 'categary_name': 'Film & Animation',
'publised_date': '2018-03-10T14:42:51.000Z', 'viewCount': '5080376', 'likeCount':
'98383', 'dislikeCount': '5040', 'favoriteCount': '0', 'commentCount': '6180'}
Now
we want to send this data to spark streaming for analysis to we have to use
kafka python module parameter now.
topic = 'youtube' #
kafka topic which will be read by spark
kafak= KafkaClient('localhost:9092') #kafka broker post
from where zookeeper will read
producer= SimpleProducer(kafak)
Function which will send this JSON data to spark streaming
def kafkaSend(data):
try:
#print(data)
producer.send_messages('youtube' ,
data.encode('utf-8'))
return True
except BaseException as e:
print("Error on_data: %s"
% str(e))
return True
Spark streaming:
spark streaming is a submodule of spark in which streaming data get analyzed and the result is achieved a time of the interval which is called the batch interval.
Spark
streaming is used in live streaming data for analysis like youtube recommendation a system, where youtube is listening to your current activities and on the bases
of that, it’s recommending new videos to you, which you may like.
This is a simple example of streaming data analysis example, there are so many uses of streaming data analysis.
Import
required in our code:
from
__future__ import print_function
from
pyspark.sql import SparkSession
from
pyspark import SparkContext,SparkConf
from
pyspark.streaming import StreamingContext
from
pyspark.streaming.kafka import KafkaUtils
Parameter
defining for pyspark:
here
we will e define zookeeper broker port and topic which spark will listen to.
zkQuorum =
"localhost:2181"
topic = "youtube"
Creating entry points for spark,
hive and streaming.
sc = SparkContext("local[2]",
appName="youtube")
spark =
SparkSession.builder.enableHiveSupport().getOrCreate()
ssc = StreamingContext(sc,3)
Defining
a function which will analyze and store Dstream’s RDD to Hive table.
def
function_to_split_rows(records):
if records.count() !=0:
spark_dataframe
= spark.read.json(records)
spark_dataframe.show(2)
spark_dataframe.write.insertInto('default.youtubemostPopular',
overwrite=False)
print("its done")
else:
print("Empty RDD")
Here data is being stored in youtubemostPopular table in hive database.