
Saturday, October 17, 2020

Youtube Data Analysis with Apache Spark

Big Data is a technology that has spread through the tech world like wildfire. To understand the concept, let's build a simple ETL application that performs live analysis of YouTube data using the YouTube Data API.


Project Goal: 
We will build a small project that extracts data from YouTube with the YouTube Data API and finds the most popular and trending videos in the world.


Apache Spark: 
Spark is a very popular analysis tool in the big data era. Apache Spark is a widely used alternative to Hadoop: Hadoop works on the MapReduce data processing model and can take a long time for computation, while Spark works on RDDs (resilient distributed datasets) and can process in-memory workloads up to 100x faster than Hadoop MapReduce.


YouTube Data API: 
The YouTube Data API is provided by Google so that developers can easily build applications that use YouTube features.


To use the YouTube Data API you need to create a developer key.

Step 1: Go to the link below, create a project, and then create an API key.

https://console.developers.google.com/projectselector/apis/credentials?supportedpurview=project

Go to Credentials > Create Credentials > API key, and an API key will be generated.

Note: this API key gives you limited access to YouTube data, subject to a daily quota.

Step 2: Go to Dashboard > Enable APIs and Services, choose YouTube Data API v3, and enable it by accepting the terms and conditions.

Now you will be able to use the YouTube Data API to request and extract data from YouTube.

 

Preconfigured tools required:

  1. Apache Spark on the local machine: to run the Spark Streaming analysis
  2. Jupyter Notebook: Python coding environment for PySpark
  3. Hive: SQL data warehouse for Spark SQL and for loading the data into tables
  4. MySQL: metastore for Hive
  5. Apache Hadoop HDFS: file store for our data
  6. Kafka: message broker handling the topics consumed by Spark Streaming
  7. ZooKeeper: manages the Kafka brokers and topics

To access any Google API with Python, we need the google-api-python-client package. To install the required packages in your Anaconda environment, run the commands below.

            pip install --upgrade google-api-python-client # to install google client

            pip install kafka-python # to install the kafka python module (SimpleProducer and KafkaClient are available in versions before 2.0)


Project Schematics:

  1. Get video details from YouTube with Python and filter them.
  2. Send the JSON data to Spark using the kafka module of Python.
  3. Receive the JSON data via ZooKeeper using PySpark Streaming.
  4. Create a PySpark streaming DataFrame from the JSON data.
  5. Run the analysis on it to find the result.
  6. Store the final output in a Hive table using Spark SQL.
So, let’s begin...!!!

YouTube Data Extraction: 

Here we are going to write Python code that requests data from YouTube with the googleapiclient module, filters the received data, and then sends it to the Kafka broker on port 9092.

First, we need to import the required modules in our Python code.

from pandas import DataFrame as df

from googleapiclient.discovery import build

from googleapiclient.errors import HttpError

from kafka import SimpleProducer, KafkaClient

 

Now define the required parameters for the YouTube Data API object.

DEVELOPER_KEY = 'developer key'

YOUTUBE_API_SERVICE_NAME = 'youtube'

YOUTUBE_API_VERSION = 'v3'

Create a Google API client object to access YouTube.

youtube = build(YOUTUBE_API_SERVICE_NAME, YOUTUBE_API_VERSION,developerKey=DEVELOPER_KEY)

Now that we have built our client object, we can start making requests.

The YouTube API has many methods for extracting data, such as search, videos, and channels.

 

1. youtube.search() returns a JSON search result with video and channel details matching the input parameters or keyword you searched for, such as q='python'.

2. youtube.videos() returns a JSON result with video details according to the input parameters.

3. youtube.channels() returns a JSON result with channel details according to the input parameters.
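As an illustration, here is a minimal sketch of the search() method; the query 'python', the part fields, and the result handling below are only example choices, not from the original post.

search_result = youtube.search().list(q='python', part='id,snippet', type='video', maxResults=5).execute()

for item in search_result['items']:
    # each search item carries the video id and a snippet with the title
    print(item['id']['videoId'], item['snippet']['title'])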

 

We are using youtube.videos() to extract the mostPopular chart of videos for different regions and categories.
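The snippet below uses country, categarycode and categary, which are not defined elsewhere in this post; for illustration, they could be set like this (example values only):

country = 'IN'                  # ISO 3166-1 alpha-2 region code
categarycode = '1'              # YouTube video category id (1 = Film & Animation)
categary = 'Film & Animation'   # human-readable category name stored with each record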

 

search_response = youtube.videos().list(part='id,snippet,statistics', chart='mostPopular',
                                        regionCode=country, videoCategoryId=categarycode, maxResults=50).execute()

 

The search result is a JSON document, so we need to filter it and convert it into a proper format.

 

stream_videos = []

for i in search_response['items']:
    # keep the fields we care about, then merge in the statistics block (views, likes, comments, ...)
    temp_res = dict(v_id=i['id'], v_title=i['snippet']['title'], ch_id=i['snippet']['channelId'], ch_title=i['snippet']['channelTitle'], country=country, categary_name=categary, publised_date=i['snippet']['publishedAt'])
    temp_res.update(i['statistics'])
    stream_videos.append(temp_res)

print(stream_videos)

 

The output will be like below.

 

{'v_id': 'orkPrGSAETs', 'v_title': 'The Vision of Bharat | Mahesh Babu | Siva Koratala | DVV Entertainment | Bharat Ane Nenu Teaser', 'ch_id': 'UCumU_6FNxfHXTmeeFCYz6Yw', 'ch_title': 'DVV Entertainments', 'country': 'IN', 'categary_name': 'Film & Animation', 'publised_date': '2018-03-06T12:32:50.000Z', 'viewCount': '12981881', 'likeCount': '487734', 'dislikeCount': '32135', 'favoriteCount': '0', 'commentCount': '28179'}, {'v_id': 'ZG1Su0QwPYs', 'v_title': 'Rangamma Mangamma Lyrical Video Song || Rangasthalam Songs || Ram Charan, Samantha, Devi Sri Prasad', 'ch_id': 'UCnJjcn5FrgrOEp5_N45ZLEQ', 'ch_title': 'T-Series Telugu', 'country': 'IN', 'categary_name': 'Film & Animation', 'publised_date': '2018-03-08T12:30:07.000Z', 'viewCount': '9537656', 'likeCount': '173534', 'dislikeCount': '8979', 'favoriteCount': '0', 'commentCount': '12980'}, {'v_id': 'ivmmk3Ud_Xg', 'v_title': 'Family Time With Kapil Sharma Coming Soon On Sony Television', 'ch_id': 'UCpEhnqL0y41EpW2TvWAHD7Q', 'ch_title': 'SET India', 'country': 'IN', 'categary_name': 'Film & Animation', 'publised_date': '2018-03-10T14:42:51.000Z', 'viewCount': '5080376', 'likeCount': '98383', 'dislikeCount': '5040', 'favoriteCount': '0', 'commentCount': '6180'}


Now we want to send this data to Spark Streaming for analysis, so we use the kafka Python module.

 

topic = 'youtube'                      # kafka topic which will be read by spark
kafka = KafkaClient('localhost:9092')  # kafka broker address the producer connects to
producer = SimpleProducer(kafka)

 

This function sends the JSON data to Spark Streaming through the Kafka topic:


def kafkaSend(data):
    try:
        # publish the JSON string to the 'youtube' topic
        producer.send_messages('youtube', data.encode('utf-8'))
        return True
    except BaseException as e:
        print("Error on_data: %s" % str(e))
    return True
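The post does not show how kafkaSend is called; a minimal usage sketch is to serialize each filtered record and publish it as its own message (json.dumps and the loop below are an assumption, not part of the original code):

import json

for record in stream_videos:
    # one JSON object per Kafka message, so Spark can parse each record independently
    kafkaSend(json.dumps(record))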

 

Spark Streaming: 

Spark Streaming is a submodule of Spark in which streaming data is analyzed and results are produced at a regular time interval, called the batch interval.

Spark Streaming is used for live streaming data analysis, for example in a recommendation system like YouTube's, where YouTube listens to your current activity and, on that basis, recommends new videos you may like.


This is a simple example of streaming data analysis; there are many other uses for it.

Imports required in our code:

from __future__ import print_function

from pyspark.sql import SparkSession

from pyspark import SparkContext,SparkConf

from pyspark.streaming import StreamingContext

from pyspark.streaming.kafka import KafkaUtils

 

Parameters for PySpark:

Here we define the ZooKeeper address and the topic which Spark will listen to.

zkQuorum = "localhost:2181"
topic = "youtube"

 

Create the entry points for Spark, Hive, and streaming.

sc = SparkContext("local[2]", appName="youtube")   # local[2]: at least two threads, one for the receiver and one for processing
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
ssc = StreamingContext(sc, 3)                      # 3-second batch interval

 

Define a function which will analyze each DStream RDD and store it in the Hive table.

 

def function_to_split_rows(records):
    if records.count() != 0:
        # parse the JSON strings in this batch into a DataFrame
        spark_dataframe = spark.read.json(records)
        spark_dataframe.show(2)
        # append the batch to the Hive table
        spark_dataframe.write.insertInto('default.youtubemostPopular', overwrite=False)
        print("its done")
    else:
        print("Empty RDD")

 

Here the data is stored in the youtubemostPopular table in the Hive database.
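The pieces above still need to be wired together; the original post stops here, so the following is a minimal sketch under a few assumptions: the Hive table default.youtubemostPopular already exists with columns matching the JSON fields, and the consumer group name 'youtube-consumer' is arbitrary.

# create a receiver-based DStream that reads the 'youtube' topic through ZooKeeper
kafka_stream = KafkaUtils.createStream(ssc, zkQuorum, 'youtube-consumer', {topic: 1})

# each Kafka message is a (key, value) pair; keep only the JSON value
json_lines = kafka_stream.map(lambda msg: msg[1])

# run the analysis/storage function on every batch
json_lines.foreachRDD(function_to_split_rows)

ssc.start()
ssc.awaitTermination()

After the streaming context starts, each 3-second batch of video records pulled from Kafka is parsed, previewed, and appended to the Hive table.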

 
