How to use SBT with Eclipse scala IDE

Scala is most used language in big data programming.Programmer from java background has used an eclipse IDE for programming. so eclipse also have there scala-ide (http://scala-ide.org/). As eclipse supports more of the maven as a build engine in-order to use SBT here are some tricks.

Those who are not aware about sbt (Simple build tool). SBT is a default build engine tool for scala.

i will show  you  an example of Apache spark. I will try to import a code in eclipse  using sbteclipse plugin.

Lets get started, how to use SBT with Eclipse scala IDE

I have created a shell script which will create default project structure of SCALA language, it will also create a build.sbt file which is require for SBT build engine

#!/bin/sh
mkdir -p src/{main,test}/{java,resources,scala}
mkdir lib project target

# create an initial build.sbt file
echo 'name := "SubscriptionExpiry"
version := "1.0"
scalaVersion := "2.11.8"' > build.sbt

you can also run a shell script on windows machine using CYGWIN (https://www.cygwin.com/)

below script will create you a project structure.

now installing a SBT Eclipse plugin in sbt (https://github.com/typesafehub/sbteclipse).

Go to C:\Users\[username]\.sbt\0.13\plugins\

create a plugins.sbt file. open it with notepad, go to link (https://github.com/typesafehub/sbteclipse),
copy line

addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "5.0.1")
paste it to the plugins.sbt. save it.

open your command prompt
now go to project folder

cd c:\Users\[username]\workspace\[projectname]
sbt reload

note that your internet should be on this time, it will download a plugin.

now open plugins.sbt and add your require dependencies

name := "SubscriptionExpiry"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.0"
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0-RC1"

open the command prompt


cd c:\Users\[username]\workspace\[projectname]
sbt eclipse

it will download all your dependencies.

for open you scala-ide (Eclipse) and import a project…

that it’s you are done with it

Apache Spark : ERROR : socket.gaierror: [Errno -2] Name or service not known

Getting Error “socket.gaierror: [Errno -2] Name or service not known”  while executing “./bin/pyspark”


[root@ip-10-0-0-28 spark]# ./bin/pyspark
Python 2.6.6 (r266:84292, Aug 18 2016, 15:13:37)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-17)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
/root/spark/spark/python/pyspark/sql/context.py:487: DeprecationWarning: HiveContext is deprecated in Spark 2.0.0. Please use SparkSession.builder.enableHiveSupport().getOrCreate() instead.
DeprecationWarning)
16/12/06 13:58:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Traceback (most recent call last):
File "/root/spark/spark/python/pyspark/shell.py", line 43, in
spark = SparkSession.builder\
File "/root/spark/spark/python/pyspark/sql/session.py", line 169, in getOrCreate
sc = SparkContext.getOrCreate(sparkConf)
File "/root/spark/spark/python/pyspark/context.py", line 294, in getOrCreate
SparkContext(conf=conf or SparkConf())
File "/root/spark/spark/python/pyspark/context.py", line 115, in __init__
conf, jsc, profiler_cls)
File "/root/spark/spark/python/pyspark/context.py", line 174, in _do_init
self._accumulatorServer = accumulators._start_update_server()
File "/root/spark/spark/python/pyspark/accumulators.py", line 259, in _start_update_server
server = AccumulatorServer(("localhost", 0), _UpdateRequestHandler)
File "/usr/lib64/python2.6/SocketServer.py", line 412, in __init__
self.server_bind()
File "/usr/lib64/python2.6/SocketServer.py", line 423, in server_bind
self.socket.bind(self.server_address)
File "", line 1, in bind
socket.gaierror: [Errno -2] Name or service not known

Then,

Just go to “/etc/hosts” of the server and add line
127.0.0.1 localhost

From the logs it looks like pyspark is unable to understand host localhost.Please check your /etc/hosts file , if localhost is not available , add an entry it should resolve this issue.

e.g:

[Ip] [Hostname] localhost

In case you are not able to change host entry of the server edit /python/pyspark/accumulators.py line number 269 as below

server = AccumulatorServer((“[server host name from hosts file]”, 0), _UpdateRequestHandler)

Apache Spark Components

spark
Spark Core
Spark Core contains the basic functionality of Spark, including components for task scheduling, memory management, fault recovery, interacting with storage systems,
and more. Spark Core is also home to the API that defines resilient distributed datasets (RDDs), which are Spark’s main programming abstraction. RDDs represent a
collection of items distributed across many compute nodes that can be manipulated in parallel. Spark Core provides many APIs for building and manipulating these
collections.

Spark SQL
Spark SQL is Spark’s package for working with structured data. It allows querying data via SQL as well as the Apache Hive variant of SQL—called the Hive Query Language
(HQL)—and it supports many sources of data, including Hive tables, Parquet, and JSON. Beyond providing a SQL interface to Spark, Spark SQL allows developers
to intermix SQL queries with the programmatic data manipulations supported by RDDs in Python, Java, and Scala, all within a single application, thus combining SQL
with complex analytics. This tight integration with the rich computing environment provided by Spark makes Spark SQL unlike any other open source data warehouse
tool. Spark SQL was added to Spark in version 1.0.Shark was an older SQL-on-Spark project out of the University of California, Berkeley,that modified Apache Hive to run on Spark. It has now been replaced by Spark SQL to provide better integration with the Spark engine and language APIs.

Spark Streaming
Spark Streaming is a Spark component that enables processing of live streams of data.Examples of data streams include logfiles generated by production web servers, or
queues of messages containing status updates posted by users of a web service. Spark Streaming provides an API for manipulating data streams that closely matches the
Spark Core’s RDD API, making it easy for programmers to learn the project and move between applications that manipulate data stored in memory, on disk, or arriving
in real time. Underneath its API, Spark Streaming was designed to provide the same degree of fault tolerance, throughput, and scalability as Spark Core.

MLlib
Spark comes with a library containing common machine learning (ML) functionality,called MLlib. MLlib provides multiple types of machine learning algorithms, including
classification, regression, clustering, and collaborative filtering, as well as supporting functionality such as model evaluation and data import. It also provides
some lower-level ML primitives, including a generic gradient descent optimization algorithm. All of these methods are designed to scale out across a cluster.

GraphX
GraphX is a library for manipulating graphs (e.g., a social network’s friend graph) and performing graph-parallel computations. Like Spark Streaming and Spark SQL,
GraphX extends the Spark RDD API, allowing us to create a directed graph with arbitrary properties attached to each vertex and edge. GraphX also provides various operators
for manipulating graphs (e.g., subgraph and mapVertices) and a library of common graph algorithms (e.g., PageRank and triangle counting).

Cluster Managers
Under the hood, Spark is designed to efficiently scale up from one to many thousands of compute nodes. To achieve this while maximizing flexibility, Spark can run over a
variety of cluster managers, including Hadoop YARN, Apache Mesos, and a simple cluster manager included in Spark itself called the Standalone Scheduler. If you are
just installing Spark on an empty set of machines, the Standalone Scheduler provides an easy way to get started; if you already have a Hadoop YARN or Mesos cluster,
however, Spark’s support for these cluster managers allows your applications to also run on them.

Maven : Copy Dependencies in Target Folder

How do I get my project’s runtime dependencies copied into the target/lib folder?

Copy below XML code into pom.xml

<project>
  ...
  <profiles>
    <profile>
      <id>qa</id>
      <build>
        <plugins>
          <plugin>
            <artifactId>maven-dependency-plugin</artifactId>
            <executions>
              <execution>
                <phase>install</phase>
                <goals>
                  <goal>copy-dependencies</goal>
                </goals>
                <configuration>
                  <outputDirectory>${project.build.directory}/lib</outputDirectory>
                </configuration>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    </profile>
  </profiles>
</project>

Run mvn install command on shell
maven

Now Run copy denpendencies command
mvn install dependency:copy-dependencies

copydependencies

That’s it you re done!!

YACassandraPDO : How to fetch UUID column from cassandra table

It is observer that when we fetch the UUID field using YACassandraPDO driver in PHP it fetches the garbage. for example

select dateof(uuidfield) as theTimeStamp from table;

output will be :

array(1) {
[0]=>
array(2) {
["theTimeStamp"]=>
string(8) "�;e$��"
[0]=>
string(8) "�;e$��"
}
}

Expected:

array(1) {
[0]=>
array(2) {
["theTimeStamp"]=>
string(8) "2012-12-04 10:00:00+0100"
[0]=>
string(8) "2012-12-04 10:00:00+0100"
}
}

For timestamp, pdo returning the hexodecimal string. You can use below function to convert it back to date string.


function getDateStringFromHex($str) {
$date = unpack('H*', $str);
$time = hexdec($date[1]) / 1000;
$dateStr = date('Y-m-d H:i:s', $time);
return $dateStr;
}

Definition Storm Data Model and Topology

The Storm data model

The basic unit of data that can be processed by a Storm application is called a tuple.Each tuple consists of a predefined list of fields. The value of each field can be a byte,char, integer, long, float, double, Boolean, or byte array. Storm also provides an API to define your own data types, which can be serialized as fields in a tuple.

A tuple is dynamically typed, that is, you just need to define the names of the fields in a tuple and not their data type. The choice of dynamic typing helps to simplify the API and makes it easy to use. Also, since a processing unit in Storm can process multiple types of tuples, it’s not practical to declare field types.

Each of the fields in a tuple can be accessed by its name getValueByField(String) or its positional index getValue(int) in the tuple. Tuples also provide convenient methods such as getIntegerByField(String) that save you from typecasting the objects. For example, if you have a Fraction(numerator, denominator) tuple,representing fractional numbers, then you can get the value of the numerator by
either using getIntegerByField(“numerator”) or getInteger(0).

Definition of a Storm topology
In Storm terminology, a topology is an abstraction that defines the graph of the computation. You create a Storm topology and deploy it on a Storm cluster to process the data. A topology can be represented by a direct acyclic graph, where each node does some kind of processing and forwards it to the next node(s) in the flow.

The following is a sample Storm topology:

2

The following are the components of a Storm topology:

Stream: The key abstraction in Storm is that of a stream. A stream is an unbounded sequence of tuples that can be processed in parallel by Storm.Each stream can be processed by a single or multiple types of bolts. Each stream in a Storm application is given an ID and the bolts can produce and consume tuples from these streams on the basis of their ID. Each stream also has an associated schema for the tuples that will flow through it.

Spout: A spout is the source of tuples in a Storm topology. It is responsible for reading or listening to data from an external source, for example,by reading from a logfile or listening for new messages in a queue and publishing them—emitting, in Storm terminology—into streams. A spout can emit multiple streams, each of different schemas. For example, it can read 10-field records from a logfile and emit them as different streams of 7-tuples and 4-tuples each. The backtype.storm.spout.ISpout interface is the interface used to define spouts. If you are writing your topology in Java, then you should use backtype.storm.topology.IRichSpout as it declares methods to use the TopologyBuilder API. Whenever a spout emits a tuple, Storm tracks all the tuples generated while processing this tuple, and when the execution of all the tuples in the graph of this source tuple is complete, it will send back an acknowledgement to the spout. This tracking happens only if a message ID was provided while emitting the tuple. If null was used as message ID, this tracking will not happen.

A tuple-processing timeout can also be defined for a topology, and if a tuple is not processed within the specified timeout, a fail message will be sent back to the spout. Again, this will happen only if you define a message ID. A small performance gain can be extracted out of Storm at the risk of some data loss by disabling the message acknowledgements, which can be done by skipping the message ID while emitting tuples.

The important methods of spout are:

nextTuple(): This method is called by Storm to get the next tuple from the input source. Inside this method, you will have the logic of reading data from the external sources and emitting them to an instance of backtype.storm.spout.ISpoutOutputCollector.The schema for streams can be declared by using the declareStream method of backtype.storm.topology.OutputFieldsDeclarer. If a spout wants to emit data to more than one stream, it can declare multiple streams using the declareStream method and specify a stream ID while emitting the tuple. If there are no more tuples to emit at the moment, this method would not be blocked. Also, if this method does not emit a tuple, then Storm will wait for 1 millisecond before calling it again. This waiting time can be configured using the topology.sleep.spout.wait.strategy.time.ms setting.

ack(Object msgId): This method is invoked by Storm when the tuple with the given message ID is completely processed by the topology. At this point, the user should mark the message as processed and do the required cleaning up such as removing the message from the message queue so that it does not get processed again.

fail(Object msgId): This method is invoked by Storm when it identifies that the tuple with the given message ID has not been processed successfully or has timed out of the configured interval. In such scenarios, the user should do the required processing so that the messages can be emitted again by the nextTuple method.A common way to do this is to put the message back in the incoming message queue.

open(): This method is called only once—when the spout is initialized.If it is required to connect to an external source for the input data,define the logic to connect to the external source in the open method, and then keep fetching the data from this external source in the nextTuple method to emit it further.Another point to note while writing your spout is that none of the methods should be blocking, as Storm calls all the methods in the same thread. Every spout has an internal buffer to keep track of the status of the tuples emitted so far. The spout will keep the tuples in this buffer until they are either acknowledged or failed, calling the ack or fail method respectively. Storm will call the nextTuple method only when this buffer is not full.

Bolt: A bolt is the processing powerhouse of a Storm topology and is responsible for transforming a stream. Ideally, each bolt in the topology should be doing a simple transformation of the tuples, and many such bolts can coordinate with each other to exhibit a complex transformation.

The backtype.storm.task.IBolt interface is preferably used to define bolts, and if a topology is written in Java, you should use the backtype.storm.topology.IRichBolt interface. A bolt can subscribe to multiple streams of other components—either spouts or other bolts—in the topology and similarly can emit output to multiple streams. Output streams can be declared using the declareStream method of backtype.storm.topology.OutputFieldsDeclarer.

The important methods of a bolt are:

execute(Tuple input): This method is executed for each tuple that comes through the subscribed input streams. In this method,you can do whatever processing is required for the tuple and then produce the output either in the form of emitting more tuples to the declared output streams or other things such as persisting the results in a database.

You are not required to process the tuple as soon as this method is called, and the tuples can be held until required. For example, while joining two streams, when a tuple arrives, you can hold it until its counterpart also comes, and then you can emit the joined tuple.The metadata associated with the tuple can be retrieved by the various methods defined in the Tuple interface. If a message ID is associated with a tuple, the execute method must publish an ack or fail event using OutputCollector for the bolt or else Storm will not know whether the tuple was processed successfully or not. The backtype.storm.topology.IBasicBolt interface is a convenient interface that sends an acknowledgement automatically after the completion of the execute method. In the case that a fail event is to be sent, this method should throw backtype.storm.topology.FailedException.

prepare(Map stormConf, TopologyContext context,OutputCollector collector): A bolt can be executed by multiple workers in a Storm topology. The instance of a bolt is created on the client machine and then serialized and submitted to Nimbus. When Nimbus creates the worker instances for the topology, it sends this serialized bolt to the workers. The work will desterilize the bolt and call the prepare method. In this method, you should make sure the bolt is properly configured to execute tuples now. Any state that you want to maintain can be stored as instance variables for the bolt that can be serialized/deserialized later.

Storm Components

A Storm cluster follows a master-slave model where the master and slave processes are coordinated through ZooKeeper. The following are the components of a Storm cluster.

Nimbus
The Nimbus node is the master in a Storm cluster. It is responsible for distributing the application code across various worker nodes, assigning tasks to different machines, monitoring tasks for any failures, and restarting them as and when required.Nimbus is stateless and stores all of its data in ZooKeeper. There is a single Nimbus node in a Storm cluster. It is designed to be fail-fast, so when Nimbus dies, it can be restarted without having any effects on the already running tasks on the worker nodes. This is unlike Hadoop, where if the JobTracker dies, all the running jobs are left in an inconsistent state and need to be executed again.

Supervisor nodes

Supervisor nodes are the worker nodes in a Storm cluster. Each supervisor node runs a supervisor daemon that is responsible for creating, starting, and stopping worker processes to execute the tasks assigned to that node. Like Nimbus, a supervisor daemon is also fail-fast and stores all of its state in ZooKeeper so that it can be restarted
without any state loss. A single supervisor daemon normally handles multiple worker processes running on that machine.

The ZooKeeper cluster
In any distributed application, various processes need to coordinate with each other and share some configuration information. ZooKeeper is an application that provides all these services in a reliable manner. Being a distributed application, Storm also uses a ZooKeeper cluster to coordinate various processes. All of the states associated with
the cluster and the various tasks submitted to the Storm are stored in ZooKeeper. Nimbus and supervisor nodes do not communicate directly with each other but through ZooKeeper. As all data is stored in ZooKeeper, both Nimbus and the supervisor daemons can be killed abruptly without adversely affecting the cluster.

1

 

 

Apache Storm Introduction

Apache Storm has emerged as the platform of choice for the industry leaders to develop such distributed, real-time, data processing platforms.

Stream processing: Storm is used to process a stream of data and update a variety of databases in real time. This processing occurs in real time and the processing speed needs to match the input data speed.

Continuous computation: Storm can do continuous computation on data streams and stream the results into clients in real time. This might require processing each message as it comes or creating small batches over a little time. An example of continuous computation is streaming trending topics on Twitter into browsers.

Distributed RPC: Storm can parallelize an intense query so that you can compute it in real time.

Real-time analytics: Storm can analyze and respond to data that comes from different data sources as they happen in real time.

Features of Storm

Fast: Storm has been reported to process up to 1 million tuples per second per node.

Horizontally scalable: Being fast is a necessary feature to build a high volume/velocity data processing platform, but a single-node will have an upper limit on the number of events that it can process per second. A node represents a single machine in your setup that execute Storm applications. Storm, being a distributed platform, allows you to add more nodes to your Storm cluster and increase the processing capacity of your application. Also, it is linearly scalable, which means that you can double the processing capacity by doubling the nodes.

Fault tolerant: Units of work are executed by worker processes in a Storm cluster. When a worker dies, Storm will restart that worker, and if the node on which the worker is running dies, Storm will restart that worker on some other node in the cluster.

Guaranteed data processing: Storm provides strong guarantees that each message passed on to it to process will be processed at least once. In the event of failures, Storm will replay the lost tuples. Also, it can be configured so that each message will be processed only once.

Easy to operate: Storm is simple to deploy and manage. Once the cluster is deployed, it requires little maintenance.

Programming language agnostic: Even though the Storm platform runs on Java Virtual Machine, the applications that run over it can be written in any programming language that can read and write to standard input and output streams.