hadoop流水账之HBase,Spark和在Spark上操作HBase


Overview

话说最近搜索相关工作没啥改进空间,所以正好抽出时间来学习一下hadoop的上层工具,这边用的挺多的,但是之前在某搜索引擎大厂基本都是在用hadoop streaming,对于Hadoop上的上层工具基本上没用过,也不了解,而且2年了hadoop也有了很多发展,和之前有了很多变化,所以磨刀不误砍柴功(强行打酱油),还是需要研究研究的。

背景设计

没有目标的学习都是耍流氓,所以得给自己设计一个虚拟的目标。这里场景假设为将抓取下来的网页存入到HBase中,然后再通过Spark对网页进行处理,解析出有用的字段。所以主要学习目标如下:

  1. 将之前抓的存在文件系统中的网页存入HBase
  2. 对HBase的基本API操作
  3. Spark上处理HBase内的网页

好吧,其实这也是之前挖过的坑Naive-Qie系列,坑还没填呢,就推到重构了,2333333

集群搭建

这部分有空再写一篇总结一下心得吧,直接用的小胖推荐的Cloudera Manager,虽然和Red Hat一样,用着免费服务收费的模式,但是界面还挺不错的,自动模式也省心,虽然踩了不少坑。这类Hadoop全家桶的工具挺多的,这边好像用的是Apache Ambari。哪个更好用就不知道了,不过开源项目,你懂的。

HBase建库

关于HBase的设计,目前还没有仔细了解,就纯粹当成NoSQL去用,不考虑性能。这里RowKey用url的md5,但其实这是不对的,虽然可以把数据散列开,但是最大的问题是没办法利用key是有序的特点,进行范围查询,所以有空会研究一下HBase表设计的最佳实践,再设计一个实用的网页库。包括两个列族,meta用于存储url等信息,data用于存储网页内容。用HBase的shell建表:

hbase(main):004:0> create 'pagedb', 'meta', 'data'

网页入库

之前抓了1000多某电商网站的网页,正好全部丢到HBase里去,省得重新抓了,也省略了抓取部分(反正就是个perl脚本)。正好最近刚填完了Golang学习笔记的坑,HBase还有thrift接口,正好Scala才开始复习,就打算用Go了。虽然thrift之前没用过,但是github上有封装好的goh。大致了解了一下,HBase的thrift接口目前有v1和v2两个版本,v2是在v1上重构的,提供了更简洁的接口,然而,goh还是基于v1的,所以反正我懒得研究thrift了,先用v1吧就。废话不多说,先上代码,这里就是把go当成脚本写了,写的丑别怪我,毕竟之前0行Go的工程经验:

package main
import (
	"fmt"
	"bufio"
	"os"
	"strings"
	"crypto/md5"
	"io"
	"io/ioutil"
	"github.com/sdming/goh"
	"github.com/sdming/goh/Hbase"
)
func main() {
	if len(os.Args) != 2 {
		fmt.Println("need filename")
		return
	}
	address := "datanode2:9090"
	client, err := goh.NewTcpClient(address, goh.TBinaryProtocol, false)
	if err != nil {
		fmt.Println(err)
		return
	}
	if err = client.Open(); err != nil {
		fmt.Println(err)
		return
	}
	defer client.Close()
	//读入一个url\t/path/to/html的文件
	f, err := os.Open(os.Args[1])
	if err != nil {
		fmt.Println(err)
		return
	}
	defer f.Close()
	table := "pagedb"
	reader := bufio.NewReader(f)
	for {
		line, err := reader.ReadString('\n')
		if err == io.EOF {
			break
		}
		if err != nil {
			continue
		}
		vs := strings.Split(strings.Trim(line, "\n"), "\t")
		bv, err := ioutil.ReadFile(vs[1])
		if err != nil {
			fmt.Println(err)
			continue
		}
		key := fmt.Sprintf("%x", md5.Sum([]byte(vs[0])))
		mutations := make([]*Hbase.Mutation, 2)
		mutations[0] = goh.NewMutation("meta:url", []byte(vs[0]))
		mutations[1] = goh.NewMutation("data:content", bv)
		fmt.Println(client.MutateRow(table, []byte(key), mutations, nil))
	}
}

Go其实还是非常简单实用的,这部分花的时间最少,因为不涉及什么依赖,目标也简单,Go写起来还比较舒服的,小研究了一下就写出来了。

SBT

yet another build tools…话说JVM圈的构建工具真是太多了,从大学开始接触过的开源构建工具就有ant, maven, gradle, ivy, sbt,就这还不算他们的不同版本。sbt(跟我一起念shi bian tai)是scala主推的构建工具,既然决定要学习一下Scala,那么sbt肯定不能放过,工具还是很简单的,但是感觉主要还是编译用,打包功能还得靠插件,配置上就是scala程序,感觉还是挺蛋疼的,不过也比maven的xml舒服一点。
一个典型的SBT项目组织结构

project目录下的plugins.sbt是配置插件用的,我这里主要是配置一个sbt-assembly插件,虽然并没有用上,不过也写着吧。

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.0")

HBase基本操作

build.sbt

name := "hbase test"
version := "1.0"
scalaVersion := "2.10.4"
javacOptions ++= Seq("-source", "1.6", "-target", "1.6")
libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-common" % "2.6.0",
  "org.apache.hbase" % "hbase-common" % "1.0.0",
  "org.apache.hbase" % "hbase-client" % "1.0.0"
)
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

主程序

package com.odinliu.test
import org.apache.hadoop.hbase.{HBaseConfiguration,TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory,HBaseAdmin,HTable,Put,Get,Scan,Result,ResultScanner}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.fs.Path
import scala.util.matching.Regex
object TestHbase {
  def printRowKey(rs: ResultScanner) {
    val result = rs.next()
    if (result != null) {
      println(Bytes.toString(result.getRow()))
      //println(Bytes.toString(result.getValue(Bytes.toBytes("meta"), Bytes.toBytes("title"))))
      printRowKey(rs)
    }
  }
  def main(args: Array[String]) {
    val conf = HBaseConfiguration.create()
    val connection = ConnectionFactory.createConnection(conf)
    val admin = connection.getAdmin()
    val listtables = admin.listTables()
    listtables.foreach(println)
    val table = connection.getTable(TableName.valueOf("pagedb"))
    val rs = table.getScanner(new Scan())
    printRowKey(rs)
    rs.close()
    val result = table.get(new Get(Bytes.toBytes("fbc9fdefb5e1391c34abd4da2c88a13f")))
    val value = Bytes.toString(result.getValue(Bytes.toBytes("data"), Bytes.toBytes("content")))
    println(value)
    table.close()
  }
}

其实Scala也是一门挺强大的语言,就是语法糖太多,太甜,但是好处就是写起来比较随意,当成Java来写倒是也不会出什么问题。因为读写HBase是本地的,直接sbt clean run就可以运行了,还是挺方便的。唯一需要注意的是各个依赖版本的问题,最好和CDH的各个版本一致。HBase这里没出啥问题,不过由于HBase新的API和之前不大一样,网上很多范例都没办法用,特别是HBase自己的手册也是用的老版本,真是无语。

Spark基本操作

build.sbt

name := "spark with hbase"
version := "1.0"
scalaVersion := "2.10.4"
javacOptions ++= Seq("-source", "1.6", "-target", "1.6")
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.3.0" % "provided",
  "org.apache.hadoop" % "hadoop-common" % "2.6.0" % "provided",
  "org.apache.hbase" % "hbase-common" % "1.0.0" % "provided",
  "org.apache.hbase" % "hbase-server" % "1.0.0" % "provided",
  "org.apache.htrace" % "htrace-core" % "3.1.0-incubating",
  "org.apache.hbase" % "hbase-client" % "1.0.0" % "provided"
)
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

这里这些包都是编译用的,提交到Spark集群上是用CDH的包,所以版本一定要注意,我用的CDH-5.4.8-1.cdh5.4.8.p0.4版本相关版本是这样的,如果不注意版本是要踩坑的。

主程序

package com.odinliu.test
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.hadoop.hbase.{HBaseConfiguration,TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory,HBaseAdmin,HTable,Put,Get,Scan,Result,ResultScanner}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat}
import org.apache.hadoop.hbase.mapred.{TableOutputFormat}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{JobConf}
import scala.util.matching.Regex
object SparkHbase {
  private val titleRegex = new Regex("""<title>([^<]+)</title>""", "title")
  def saveTitle(result: Result) = {
    val key = Bytes.toString(result.getRow)
    val title = parseTitle(Bytes.toString(result.getValue(Bytes.toBytes("data"), Bytes.toBytes("content"))))
    println("rowkey["+key+"] title["+title+"]")
    val p = new Put(Bytes.toBytes(key))
    p.addColumn(Bytes.toBytes("meta"), Bytes.toBytes("title"), Bytes.toBytes(title))
    (new ImmutableBytesWritable, p)
  }
  def parseTitle(page: String) = titleRegex.findFirstMatchIn(page).get.group("title")
  def main(args: Array[String]) {
    val sconf = new SparkConf().setAppName("HBase With Spark")
    val sc = new SparkContext(sconf)
    val hconf = HBaseConfiguration.create()
    hconf.set("hbase.zookeeper.property.clientPort", "2181")
    hconf.set("hbase.zookeeper.quorum", "datanode1,datanode2,datanode0")
    hconf.set(TableInputFormat.INPUT_TABLE, "pagedb")
    val jobConf = new JobConf(hconf, this.getClass)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "pagedb")
    val dbRDD = sc.newAPIHadoopRDD(hconf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])
    val count = dbRDD.count()
    println("pagedb count: " + count)
    dbRDD.cache()
    //处理网页
    val save = dbRDD.map {
      case (_, result) => saveTitle(result)
    }
    save.saveAsHadoopDataset(jobConf)
  }
}

这里主程序解析网页直接用正则了,因为主要是试验一下在Spark里读写HBase,所以用正则意思意思,正常还是要用xpath。提交任务:

[odin@DannyGreen tmp]$ sudo -u spark spark-submit --class "com.odinliu.test.SparkHbase" /tmp/spark-with-hbase_2.10-1.0.jar

那些踩过的坑

spark用户提交无法读取用户目录

这个比较诡异,有的机器有,有的没有,但是在我写码的机器就不行,只能把包拷贝到/tmp/下,并到那个目录下面执行spark-submit。

缺少org.apache.htrace

ERROR TableInputFormat: java.io.IOException: java.lang.reflect.InvocationTargetException
Caused by: java.lang.NoClassDefFoundError: org/apache/htrace/Trace
Caused by: java.lang.ClassNotFoundException: org.apache.htrace.Trace

CDH的Spark的classpath中引入的是/opt/cloudera/parcels/CDH/jars/htrace-core-3.0.4.jar,而这个版本的时候,htrace还是org.htrace,而3.1.0的时候已经贡献给Apache了,改叫org.apache.htrace了。虽然Spark里也有3.1.0的jar包,但是并没有引入,我没找到Cloudera Manager在哪里配这个,只能手动修改作业提交机器的/etc/alternatives/spark-conf/classpath.txt,在最后添上/opt/cloudera/parcels/CDH-5.4.8-1.cdh5.4.8.p0.4/jars/htrace-core-3.1.0-incubating.jar即可。虽然说spark-submit可以通过--jars指定提交哪些jar包,但是我试了一下,不能用,而且把3.1.0的jar包assemble到我的任务jar里也不行,就只能用这个土办法了,不好的地方就是重新deploy client configuration后可能需要再修改一下。

Scala版本和集群不对

Exception in thread “main” java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;

一开始我用系统的Scala 2.11版本做编译,但是提交到集群上会报这个错,原来是CDH的Scala版本是2.10,所以会出问题,把build.sbt的Scala版本改一下就行了。

后记

总的来说,在Spark上操作HBase还是挺方便的,但是网上很多文章的API版本都比较旧,在新版本集群里程序根本跑不了。而且JVM的classpath简直是蛋疼,各种依赖版本不对,虽然这东西和linux的LD_LIBRARY_PATH是一个东西,但是感觉完全不是那么回事啊,能编译过的,各种运行时异常,都是因为JVM上加载的类和编译的不是一个。另外CDH虽然安装挺方便的,但是很多东西不知道在哪改,但反正“又不是不能用”。

Reference


文章作者: Odin
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Odin !
  目录