`

转 spark简单实例

阅读更多

1、准备文件

1
wget http://statweb.stanford.edu/~tibs/ElemStatLearn/datasets/spam.data

 

2、加载文件

1
scala> val inFile = sc.textFile("/home/scipio/spam.data")

 

  输出

1
2
3
14/06/28 12:15:34 INFO MemoryStore: ensureFreeSpace(32880) called with curMem=65736, maxMem=311387750
14/06/28 12:15:34 INFO MemoryStore: Block broadcast_2 stored as values to memory (estimated size 32.1 KB, free 296.9 MB)
inFile: org.apache.spark.rdd.RDD[String] = MappedRDD[7] at textFile at <console>:12

 

3、显示一行

1
scala> inFile.first()

 

  输出

1
2
3
4
5
6
7
8
9
10
14/06/28 12:15:39 INFO FileInputFormat: Total input paths to process : 1
14/06/28 12:15:39 INFO SparkContext: Starting job: first at <console>:15
14/06/28 12:15:39 INFO DAGScheduler: Got job 0 (first at <console>:15) with 1 output partitions (allowLocal=true)
14/06/28 12:15:39 INFO DAGScheduler: Final stage: Stage 0(first at <console>:15)
14/06/28 12:15:39 INFO DAGScheduler: Parents of final stage: List()
14/06/28 12:15:39 INFO DAGScheduler: Missing parents: List()
14/06/28 12:15:39 INFO DAGScheduler: Computing the requested partition locally
14/06/28 12:15:39 INFO HadoopRDD: Input split: file:/home/scipio/spam.data:0+349170
14/06/28 12:15:39 INFO SparkContext: Job finished: first at <console>:15, took 0.532360118 s
res2: String = 0 0.64 0.64 0 0.32 0 0 0 0 0 0 0.64 0 0 0 0.32 0 1.29 1.93 0 0.96 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0.778 0 0 3.756 61 278 1

 

 

4、函数运用

 (1)map
1
2
3
4
5
6
7
8
9
10
11
12
13
scala> val nums = inFile.map(x=>x.split(' ').map(_.toDouble))
nums: org.apache.spark.rdd.RDD[Array[Double]] = MappedRDD[8] at map at <console>:14
 
scala> nums.first()
14/06/28 12:19:07 INFO SparkContext: Starting job: first at <console>:17
14/06/28 12:19:07 INFO DAGScheduler: Got job 1 (first at <console>:17) with 1 output partitions (allowLocal=true)
14/06/28 12:19:07 INFO DAGScheduler: Final stage: Stage 1(first at <console>:17)
14/06/28 12:19:07 INFO DAGScheduler: Parents of final stage: List()
14/06/28 12:19:07 INFO DAGScheduler: Missing parents: List()
14/06/28 12:19:07 INFO DAGScheduler: Computing the requested partition locally
14/06/28 12:19:07 INFO HadoopRDD: Input split: file:/home/scipio/spam.data:0+349170
14/06/28 12:19:07 INFO SparkContext: Job finished: first at <console>:17, took 0.011412903 s
res3: Array[Double] = Array(0.00.640.640.00.320.00.00.00.00.00.00.640.00.00.00.320.01.291.930.00.960.00.00.00.00.00.00.00.00.00.00.00.00.00.00.00.00.00.00.00.00.00.00.00.00.00.00.00.00.00.00.7780.00.03.75661.0278.01.0)

 

 (2)collecct
1
2
3
4
5
6
7
8
9
scala> val rdd = sc.parallelize(List(1,2,3,4,5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:12
 
scala> val mapRdd = rdd.map(2*_)
mapRdd: org.apache.spark.rdd.RDD[Int] = MappedRDD[10] at map at <console>:14
 
scala> mapRdd.collect
14/06/28 12:24:45 INFO SparkContext: Job finished: collect at <console>:17, took 1.789249751 s
res4: Array[Int] = Array(246810)

 

 (3)filter
1
2
3
4
5
6
scala> val filterRdd = sc.parallelize(List(1,2,3,4,5)).map(_*2).filter(_>5)
filterRdd: org.apache.spark.rdd.RDD[Int] = FilteredRDD[13] at filter at <console>:12
 
scala> filterRdd.collect
14/06/28 12:27:45 INFO SparkContext: Job finished: collect at <console>:15, took 0.056086178 s
res5: Array[Int] = Array(6810)

 

 (4)flatMap
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
scala> val rdd = sc.textFile("/home/scipio/README.md")
14/06/28 12:31:55 INFO MemoryStore: ensureFreeSpace(32880) called with curMem=98616, maxMem=311387750
14/06/28 12:31:55 INFO MemoryStore: Block broadcast_3 stored as values to memory (estimated size 32.1 KB, free 296.8 MB)
rdd: org.apache.spark.rdd.RDD[String] = MappedRDD[15] at textFile at <console>:12
 
scala> rdd.count
14/06/28 12:32:50 INFO SparkContext: Job finished: count at <console>:15, took 0.341167662 s
res6: Long = 127
 
scala> rdd.cache
res7: rdd.type = MappedRDD[15] at textFile at <console>:12
 
scala> rdd.count
14/06/28 12:33:00 INFO SparkContext: Job finished: count at <console>:15, took 0.32015745 s
res8: Long = 127
 
scala> val wordCount = rdd.flatMap(_.split(' ')).map(x=>(x,1)).reduceByKey(_+_)
wordCount: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[20] at reduceByKey at <console>:14
 
scala> wordCount.collect
 
res9: Array[(String, Int)] = Array((means,1), (under,2), (this,4), (Because,1), (Python,2), (agree,1), (cluster.,1), (its,1), (YARN,,3), (have,2), (pre-built,1), (MRv1,,1), (locally.,1), (locally,2), (changed,1), (several,1), (only,1), (sc.parallelize(1,1), (This,2), (basic,1), (first,1), (requests,1), (documentation,1), (Configuration,1), (MapReduce,2), (without,1), (setting,1), ("yarn-client",1), ([params]`.,1), (any,2), (application,1), (prefer,1), (SparkPi,2), (<http://spark.apache.org/>,1), (version,3), (file,1), (documentation,,1), (test,1), (MASTER,1), (entry,1), (example,3), (are,2), (systems.,1), (params,1), (scala>,1), (<artifactId>hadoop-client</artifactId>,1), (refer,1), (configure,1), (Interactive,2), (artifact,1), (can,7), (file's,1), (build,3), (when,2), (2.0.X,,1), (Apac...
 
scala> wordCount.saveAsTextFile("/home/scipio/wordCountResult.txt")

 

 (5)union
1
2
3
4
5
6
7
8
9
10
11
12
scala> val rdd = sc.parallelize(List(('a',1),('a',2)))
rdd: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:12
 
scala> val rdd2 = sc.parallelize(List(('b',1),('b',2)))
rdd2: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[11] at parallelize at <console>:12
 
scala> rdd union rdd2
res3: org.apache.spark.rdd.RDD[(Char, Int)] = UnionRDD[12] at union at <console>:17
 
scala> res3.collect
 
res4: Array[(Char, Int)] = Array((a,1), (a,2), (b,1), (b,2))

 

 (6) join
1
2
3
4
5
6
7
8
9
10
11
12
scala> val rdd1 = sc.parallelize(List(('a',1),('a',2),('b',3),('b',4)))
rdd1: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:12
 
scala> val rdd2 = sc.parallelize(List(('a',5),('a',6),('b',7),('b',8)))
rdd2: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[11] at parallelize at <console>:12
 
scala> rdd1 join rdd2
res1: org.apache.spark.rdd.RDD[(Char, (Int, Int))] = FlatMappedValuesRDD[14] at join at <console>:17
 
res1.collect
 
res2: Array[(Char, (Int, Int))] = Array((b,(3,7)), (b,(3,8)), (b,(4,7)), (b,(4,8)), (a,(1,5)), (a,(1,6)), (a,(2,5)), (a,(2,6)))

 

 (7)lookup
1
2
3
val rdd1 = sc.parallelize(List(('a',1),('a',2),('b',3),('b',4)))
rdd1.lookup('a')
res3: Seq[Int] = WrappedArray(12)

 

 (8)groupByKey
1
2
3
4
5
val wc = sc.textFile("/home/scipio/README.md").flatMap(_.split(' ')).map((_,1)).groupByKey
wc.collect
 
14/06/28 12:56:14 INFO SparkContext: Job finished: collect at <console>:15, took 2.933392093 s
res0: Array[(String, Iterable[Int])] = Array((means,ArrayBuffer(1)), (under,ArrayBuffer(11)), (this,ArrayBuffer(1111)), (Because,ArrayBuffer(1)), (Python,ArrayBuffer(11)), (agree,ArrayBuffer(1)), (cluster.,ArrayBuffer(1)), (its,ArrayBuffer(1)), (YARN,,ArrayBuffer(111)), (have,ArrayBuffer(11)), (pre-built,ArrayBuffer(1)), (MRv1,,ArrayBuffer(1)), (locally.,ArrayBuffer(1)), (locally,ArrayBuffer(11)), (changed,ArrayBuffer(1)), (sc.parallelize(1,ArrayBuffer(1)), (only,ArrayBuffer(1)), (several,ArrayBuffer(1)), (This,ArrayBuffer(11)), (basic,ArrayBuffer(1)), (first,ArrayBuffer(1)), (documentation,ArrayBuffer(1)), (Configuration,ArrayBuffer(1)), (MapReduce,ArrayBuffer(11)), (requests,ArrayBuffer(1)), (without,ArrayBuffer(1)), ("yarn-client",ArrayBuffer(1)), ([params]`.,Ar...

 

 (9)sortByKey
1
2
3
4
val rdd = sc.textFile("/home/scipio/README.md")
val wordcount = rdd.flatMap(_.split(' ')).map((_,1)).reduceByKey(_+_)
val wcsort = wordcount.map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1))
wcsort.saveAsTextFile("/home/scipio/sort.txt")

 

 升序的话,sortByKey(true)

http://my.oschina.net/scipio/blog/284957#OSC_h5_11

http://bit1129.iteye.com/blog/2171799

http://bit1129.iteye.com/blog/2171811

分享到:
评论

相关推荐

    Spark开发实例(编程实践)

    Spark 的交互式脚本是一种学习 API 的简单途径,也是分析数据集交互的有力工具。Spark 包含多种运行模式,可使用单机模式,也可以使用分布式模式。为简单起见,本节采用单机模式运行 Spark。 无论采用哪种模式,只要...

    hadoop scala spark 例子项目,运行了单机wordcount

    hadoop scala spark 例子项目,运行了单机wordcount

    hadoop+spark机器学习实例

    HadoopSparkExampler,Hadoop+Spark大数据巨量分析演示代码

    spark机器学习简单实例文档

    这是当时自己写的简单的spark机器学习部分方法的基本实现,是文档格式,里面有详细解释和代码

    spark自己编写的例子程序

    非常好用,自己测试过,非常好用,自己测试过,非常好用,自己测试过

    spark-scala-maven实例

    基于spark的scala maven实例项目两个简单的统计实例,适合初学者了解。 /** * 第一步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息, * 例如说通过setMaster来设置程序要链接的Spark集群的...

    spark及stream任务简单实现框架及使用实例

    spark及stream任务实现框架及使用实例,结果存入mysql数据库,包含了一套最简单的实现框架,方便添加各种简单的任务

    Python3:Python+spark编程实战

    本文来自csdn,本文简单介绍了Python+spark的配置运行及实例介绍,希望对您的学习有所启迪。0.1配置可参考:0.2有关spark说明:spark不兼容Python3.6安装注意版本可下载:anaconda4.21.1数据student.txt1.2代码1.3...

    SparkExample:Spark相关的一些例子,代码详细注释并赋予相关代码解析

    基于spark大数据平台写的一些简单例子 关于spark平台的Streaming、sql、sparkMllib、spark写的一些简单的demo程序 这边可以直接将项目下载下来导入到eclipse或则Intellij IDE中,每一个程序都能单独执行,里面给出了...

    spark源码之scala基础语法demo

    1:面向对象,可以定义class,通过new调用实例对象使用。 2:兼容java,在scala中可以直接调用java方法。 2:函数式编程,柯里化函数,匿名函数,高阶函数等。 3:代码行简单。 4:支持并发控制,Actor Model机制 5...

    hadoop+spark分布式集群搭建及spark程序示例.doc

    hadoop+spark分布式集群搭建及spark程序示例,例子程序为用二项逻辑斯蒂回归进行二分类分析和一个简单的求平均的程序,两种不同的运行方式

    大数据学习笔记

    17.2 简单消费者实例 63 17.3 消费者群例子 65 第18章 KAFKA与SPARK集成 67 18.1 Kafka与spark集成 67 18.2 SparkConf API 67 18.3 StreamingContext API 67 18.4 KafkaUtils API 67 18.5 建立脚本 69 18.6 编译/...

    Spark javaweb框架

    Spark是一个可像Node.js的Express那样快速Web框架,它虽然和大数据处理框架Spark同名,但是它是基于Java的,受Ruby的Sinatra框架鼓舞,用于Java的...本例是可运行至tomcat等应用服务器上的简单例子,纯java开发,为源码

    spark rdd转dataframe 写入mysql的实例讲解

    spark在离线批处理或者实时计算中都可以将rdd转成dataframe进而通过简单的sql命令对数据进行操作,对于熟悉sql的人来说在转换和过滤过程很方便,甚至可以有更高层次的应用,比如在实时这一块,传入kafka的topic名称...

    Spark自定义累加器的使用实例详解

    下面是一个简单的使用示例,在这个例子中我们在过滤掉RDD中奇数的同时进行计数,最后计算剩下整数的和。 val sparkConf = new SparkConf().setAppName(Test).setMaster(local[2]) val sc = new SparkContext...

    SparkOnHBase:SparkOnHBase

    这是一个用于Spark的HBase的简单可重用库 功能性 当前功能支持以下功能 批量放置 bulkDelete bulkIncrement bulkGet bulkCheckAndPut bulkCheckAndDelete foreachPartition(带有HConnection) mapPartition...

    Python的Spark:Python的Spark基础(使用PySpark),代码示例

    用Python火花Apache Spark 是技术领域中最热门的新趋势之一。 它是实现大数据与机器学习结合的成果的最大潜力框架。 它运行速度快(由于在内存中进行操作,因此比传统的快100倍,提供健壮...那样简单。我们大多数基于P

    spark-jetty-server:Apache Spark 的食谱和示例

    Spark 码头服务器这是一个简单的示例,演示如何在以纱线客户端模式运行的 Jetty Web 服务器中嵌入SparkContext 。 这被证明是非常重要的,因为了解 Spark 类路径是如何构建的对于完成这项工作是非常必要的。 到目前...

    spark_codebase:带有基线单元测试的 Spark 核心、流、sql、mllib 示例和应用程序的集合

    Spark 流用例一个简单的火花流用例,用于执行 Apache 日志分析,它可以从 Kafka 和 Kinesis 读取数据,执行一些分析并将结果保存到 cassandra。 测试 Spark 核心、流媒体和 SQL API 的 ScalaTest 规范特征 用于测试...

Global site tag (gtag.js) - Google Analytics