文章目录
  1. 1. Overview
  2. 2. 背景设计
  3. 3. 集群搭建
  4. 4. HBase建库
  5. 5. 网页入库
  6. 6. SBT
  7. 7. HBase基本操作
    1. 7.1. build.sbt
    2. 7.2. 主程序
  8. 8. Spark基本操作
    1. 8.1. build.sbt
    2. 8.2. 主程序
    3. 8.3. 那些踩过的坑
      1. 8.3.1. spark用户提交无法读取用户目录
      2. 8.3.2. 缺少org.apache.htrace
      3. 8.3.3. Scala版本和集群不对
  9. 9. 后记
  10. 10. Reference

Overview

话说最近搜索相关工作没啥改进空间,所以正好抽出时间来学习一下hadoop的上层工具,这边用的挺多的,但是之前在某搜索引擎大厂基本都是在用hadoop streaming,对于Hadoop上的上层工具基本上没用过,也不了解,而且2年了hadoop也有了很多发展,和之前有了很多变化,所以磨刀不误砍柴功(强行打酱油),还是需要研究研究的。

背景设计

没有目标的学习都是耍流氓,所以得给自己设计一个虚拟的目标。这里场景假设为将抓取下来的网页存入到HBase中,然后再通过Spark对网页进行处理,解析出有用的字段。所以主要学习目标如下:

  1. 将之前抓的存在文件系统中的网页存入HBase
  2. 对HBase的基本API操作
  3. Spark上处理HBase内的网页

好吧,其实这也是之前挖过的坑Naive-Qie系列,坑还没填呢,就推到重构了,2333333

集群搭建

这部分有空再写一篇总结一下心得吧,直接用的小胖推荐的Cloudera Manager,虽然和Red Hat一样,用着免费服务收费的模式,但是界面还挺不错的,自动模式也省心,虽然踩了不少坑。这类Hadoop全家桶的工具挺多的,这边好像用的是Apache Ambari。哪个更好用就不知道了,不过开源项目,你懂的。

HBase建库

关于HBase的设计,目前还没有仔细了解,就纯粹当成NoSQL去用,不考虑性能。这里RowKey用url的md5,但其实这是不对的,虽然可以把数据散列开,但是最大的问题是没办法利用key是有序的特点,进行范围查询,所以有空会研究一下HBase表设计的最佳实践,再设计一个实用的网页库。包括两个列族,meta用于存储url等信息,data用于存储网页内容。用HBase的shell建表:

1
hbase(main):004:0> create 'pagedb', 'meta', 'data'

网页入库

之前抓了1000多某电商网站的网页,正好全部丢到HBase里去,省得重新抓了,也省略了抓取部分(反正就是个perl脚本)。正好最近刚填完了Golang学习笔记的坑,HBase还有thrift接口,正好Scala才开始复习,就打算用Go了。虽然thrift之前没用过,但是github上有封装好的goh。大致了解了一下,HBase的thrift接口目前有v1和v2两个版本,v2是在v1上重构的,提供了更简洁的接口,然而,goh还是基于v1的,所以反正我懒得研究thrift了,先用v1吧就。废话不多说,先上代码,这里就是把go当成脚本写了,写的丑别怪我,毕竟之前0行Go的工程经验:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package main
import (
"fmt"
"bufio"
"os"
"strings"
"crypto/md5"
"io"
"io/ioutil"
"github.com/sdming/goh"
"github.com/sdming/goh/Hbase"
)
func main() {
if len(os.Args) != 2 {
fmt.Println("need filename")
return
}
address := "datanode2:9090"
client, err := goh.NewTcpClient(address, goh.TBinaryProtocol, false)
if err != nil {
fmt.Println(err)
return
}
if err = client.Open(); err != nil {
fmt.Println(err)
return
}
defer client.Close()
//读入一个url\t/path/to/html的文件
f, err := os.Open(os.Args[1])
if err != nil {
fmt.Println(err)
return
}
defer f.Close()
table := "pagedb"
reader := bufio.NewReader(f)
for {
line, err := reader.ReadString('\n')
if err == io.EOF {
break
}
if err != nil {
continue
}
vs := strings.Split(strings.Trim(line, "\n"), "\t")
bv, err := ioutil.ReadFile(vs[1])
if err != nil {
fmt.Println(err)
continue
}
key := fmt.Sprintf("%x", md5.Sum([]byte(vs[0])))
mutations := make([]*Hbase.Mutation, 2)
mutations[0] = goh.NewMutation("meta:url", []byte(vs[0]))
mutations[1] = goh.NewMutation("data:content", bv)
fmt.Println(client.MutateRow(table, []byte(key), mutations, nil))
}
}

Go其实还是非常简单实用的,这部分花的时间最少,因为不涉及什么依赖,目标也简单,Go写起来还比较舒服的,小研究了一下就写出来了。

SBT

yet another build tools…话说JVM圈的构建工具真是太多了,从大学开始接触过的开源构建工具就有ant, maven, gradle, ivy, sbt,就这还不算他们的不同版本。sbt(跟我一起念shi bian tai)是scala主推的构建工具,既然决定要学习一下Scala,那么sbt肯定不能放过,工具还是很简单的,但是感觉主要还是编译用,打包功能还得靠插件,配置上就是scala程序,感觉还是挺蛋疼的,不过也比maven的xml舒服一点。
一个典型的SBT项目组织结构

project目录下的plugins.sbt是配置插件用的,我这里主要是配置一个sbt-assembly插件,虽然并没有用上,不过也写着吧。

1
2
// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.0")

HBase基本操作

build.sbt

1
2
3
4
5
6
7
8
9
10
name := "hbase test"
version := "1.0"
scalaVersion := "2.10.4"
javacOptions ++= Seq("-source", "1.6", "-target", "1.6")
libraryDependencies ++= Seq(
"org.apache.hadoop" % "hadoop-common" % "2.6.0",
"org.apache.hbase" % "hbase-common" % "1.0.0",
"org.apache.hbase" % "hbase-client" % "1.0.0"
)
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

主程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.odinliu.test
import org.apache.hadoop.hbase.{HBaseConfiguration,TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory,HBaseAdmin,HTable,Put,Get,Scan,Result,ResultScanner}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.fs.Path
import scala.util.matching.Regex
object TestHbase {
def printRowKey(rs: ResultScanner) {
val result = rs.next()
if (result != null) {
println(Bytes.toString(result.getRow()))
//println(Bytes.toString(result.getValue(Bytes.toBytes("meta"), Bytes.toBytes("title"))))
printRowKey(rs)
}
}
def main(args: Array[String]) {
val conf = HBaseConfiguration.create()
val connection = ConnectionFactory.createConnection(conf)
val admin = connection.getAdmin()
val listtables = admin.listTables()
listtables.foreach(println)
val table = connection.getTable(TableName.valueOf("pagedb"))
val rs = table.getScanner(new Scan())
printRowKey(rs)
rs.close()
val result = table.get(new Get(Bytes.toBytes("fbc9fdefb5e1391c34abd4da2c88a13f")))
val value = Bytes.toString(result.getValue(Bytes.toBytes("data"), Bytes.toBytes("content")))
println(value)
table.close()
}
}

其实Scala也是一门挺强大的语言,就是语法糖太多,太甜,但是好处就是写起来比较随意,当成Java来写倒是也不会出什么问题。因为读写HBase是本地的,直接sbt clean run就可以运行了,还是挺方便的。唯一需要注意的是各个依赖版本的问题,最好和CDH的各个版本一致。HBase这里没出啥问题,不过由于HBase新的API和之前不大一样,网上很多范例都没办法用,特别是HBase自己的手册也是用的老版本,真是无语。

Spark基本操作

build.sbt

1
2
3
4
5
6
7
8
9
10
11
12
13
name := "spark with hbase"
version := "1.0"
scalaVersion := "2.10.4"
javacOptions ++= Seq("-source", "1.6", "-target", "1.6")
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.3.0" % "provided",
"org.apache.hadoop" % "hadoop-common" % "2.6.0" % "provided",
"org.apache.hbase" % "hbase-common" % "1.0.0" % "provided",
"org.apache.hbase" % "hbase-server" % "1.0.0" % "provided",
"org.apache.htrace" % "htrace-core" % "3.1.0-incubating",
"org.apache.hbase" % "hbase-client" % "1.0.0" % "provided"
)
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

这里这些包都是编译用的,提交到Spark集群上是用CDH的包,所以版本一定要注意,我用的CDH-5.4.8-1.cdh5.4.8.p0.4版本相关版本是这样的,如果不注意版本是要踩坑的。

主程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.odinliu.test
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.hadoop.hbase.{HBaseConfiguration,TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory,HBaseAdmin,HTable,Put,Get,Scan,Result,ResultScanner}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat}
import org.apache.hadoop.hbase.mapred.{TableOutputFormat}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{JobConf}
import scala.util.matching.Regex
object SparkHbase {
private val titleRegex = new Regex("""<title>([^<]+)</title>""", "title")
def saveTitle(result: Result) = {
val key = Bytes.toString(result.getRow)
val title = parseTitle(Bytes.toString(result.getValue(Bytes.toBytes("data"), Bytes.toBytes("content"))))
println("rowkey["+key+"] title["+title+"]")
val p = new Put(Bytes.toBytes(key))
p.addColumn(Bytes.toBytes("meta"), Bytes.toBytes("title"), Bytes.toBytes(title))
(new ImmutableBytesWritable, p)
}
def parseTitle(page: String) = titleRegex.findFirstMatchIn(page).get.group("title")
def main(args: Array[String]) {
val sconf = new SparkConf().setAppName("HBase With Spark")
val sc = new SparkContext(sconf)
val hconf = HBaseConfiguration.create()
hconf.set("hbase.zookeeper.property.clientPort", "2181")
hconf.set("hbase.zookeeper.quorum", "datanode1,datanode2,datanode0")
hconf.set(TableInputFormat.INPUT_TABLE, "pagedb")
val jobConf = new JobConf(hconf, this.getClass)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "pagedb")
val dbRDD = sc.newAPIHadoopRDD(hconf, classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result])
val count = dbRDD.count()
println("pagedb count: " + count)
dbRDD.cache()
//处理网页
val save = dbRDD.map {
case (_, result) => saveTitle(result)
}
save.saveAsHadoopDataset(jobConf)
}
}

这里主程序解析网页直接用正则了,因为主要是试验一下在Spark里读写HBase,所以用正则意思意思,正常还是要用xpath。提交任务:

1
[odin@DannyGreen tmp]$ sudo -u spark spark-submit --class "com.odinliu.test.SparkHbase" /tmp/spark-with-hbase_2.10-1.0.jar

那些踩过的坑

spark用户提交无法读取用户目录

这个比较诡异,有的机器有,有的没有,但是在我写码的机器就不行,只能把包拷贝到/tmp/下,并到那个目录下面执行spark-submit。

缺少org.apache.htrace

ERROR TableInputFormat: java.io.IOException: java.lang.reflect.InvocationTargetException
Caused by: java.lang.NoClassDefFoundError: org/apache/htrace/Trace
Caused by: java.lang.ClassNotFoundException: org.apache.htrace.Trace

CDH的Spark的classpath中引入的是/opt/cloudera/parcels/CDH/jars/htrace-core-3.0.4.jar,而这个版本的时候,htrace还是org.htrace,而3.1.0的时候已经贡献给Apache了,改叫org.apache.htrace了。虽然Spark里也有3.1.0的jar包,但是并没有引入,我没找到Cloudera Manager在哪里配这个,只能手动修改作业提交机器的/etc/alternatives/spark-conf/classpath.txt,在最后添上/opt/cloudera/parcels/CDH-5.4.8-1.cdh5.4.8.p0.4/jars/htrace-core-3.1.0-incubating.jar即可。虽然说spark-submit可以通过--jars指定提交哪些jar包,但是我试了一下,不能用,而且把3.1.0的jar包assemble到我的任务jar里也不行,就只能用这个土办法了,不好的地方就是重新deploy client configuration后可能需要再修改一下。

Scala版本和集群不对

Exception in thread “main” java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;

一开始我用系统的Scala 2.11版本做编译,但是提交到集群上会报这个错,原来是CDH的Scala版本是2.10,所以会出问题,把build.sbt的Scala版本改一下就行了。

后记

总的来说,在Spark上操作HBase还是挺方便的,但是网上很多文章的API版本都比较旧,在新版本集群里程序根本跑不了。而且JVM的classpath简直是蛋疼,各种依赖版本不对,虽然这东西和linux的LD_LIBRARY_PATH是一个东西,但是感觉完全不是那么回事啊,能编译过的,各种运行时异常,都是因为JVM上加载的类和编译的不是一个。另外CDH虽然安装挺方便的,但是很多东西不知道在哪改,但反正“又不是不能用”。

Reference

文章目录
  1. 1. Overview
  2. 2. 背景设计
  3. 3. 集群搭建
  4. 4. HBase建库
  5. 5. 网页入库
  6. 6. SBT
  7. 7. HBase基本操作
    1. 7.1. build.sbt
    2. 7.2. 主程序
  8. 8. Spark基本操作
    1. 8.1. build.sbt
    2. 8.2. 主程序
    3. 8.3. 那些踩过的坑
      1. 8.3.1. spark用户提交无法读取用户目录
      2. 8.3.2. 缺少org.apache.htrace
      3. 8.3.3. Scala版本和集群不对
  9. 9. 后记
  10. 10. Reference