HBase-0.94.4+bulkload 海量数据载库神器

06 Jun 2013

bulkload 是以 Mapreduce 的方式将制定格式文件装载到 hbase 中,采用线程池,对于海量数据装载入 hbase 非常有用

bulkload 可以导入已存在的 hbase 表中(更新数据),若存在和已有表中相同 rowkey 的记录,会更新表中此 rowkey 记录的数据

应用环境

将 Mysql 中一个5000万条数据的 tableA 插入到一个已存在的 hbase tableB 中,两个表在逻辑结构上一致,具体字段有出入

平台环境

  • Hadoop-1.0.4
  • HBase-0.94.4

Step1 配置

因为 bulkload 是运行 hadoop 程序,不会自动查找 hbase 的 conf 路径,因此找不到 hbase 的环境变量。需要做如下配置:

hadoop-env.sh

配置 $HADOOP_HOME/conf/hadoop-env.sh 文件,修改 HADOOP_CLASSPATH:

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$HBASE_HOME/conf:$HBASE_HOME/hbase-0.94.4.jar:$HBASE_HOME/hbase-0.94.4-tests.jar:$HBASE_HOME/lib/guava-11.0.2.jar:$HBASE_HOME/lib/zookeeper-3.4.5.jar

core-site.xml

配置 $HADOOP_HOME/conf/core-site.xml,将 zookeeper 的配置写入 core-site.xml 中,内容与 $HADOOP_HOME/conf/hbase-site.xml 中保持一致:

<property>
    <name>hbase.zookeeper.quorum</name>
    <!--value值按照$HADOOP_HOME/conf/hbase-site.xml中配置的写 -->
    <value>hadoop-namenode,hadoop-datanode1,hadoop-datanode2</value>
</property>

重启 hadoop 和 hbase

Step2 准备文件

文件格式是每条记录一行,默认以 \t 制表符隔开,例如生成文件 ~/user 如下:

100000001	张三	18
100000002	李四	22
100000003	王五	24

Step3 上传文件

将文件 user 上传至 HDFS 上,目录为 /bulkload

$ hadoop fs -put ~/user /bulkload

Step4 将文件包装成 HFile

$ hadoop jar $HBASE_HOME/hbase-0.94.4.jar  importtsv \
> -Dimporttsv.separator=";" \  #指定输入文件的分隔符为;,分隔符只能为单字节
> -Dimporttsv.bulk.output=/output \   # 输出hfile到/output,/output必须不存在
> -Dimporttsv.columns=HBASE_ROW_KEY,cf:USER_NAME,cf:AGE \ #源文件的第一列为rowkey,第二列为cf:USER_NAMEM,第三列为cf:AGE
> user /bulkload   #导入hbase的user表中,输入文件存放在/bulkload

下面可以分为两个步骤,也可以一次完成

Style1 两步走

按照上面指定 Dimporttsv.bulk.output=/output,此时只是将 HFile 输出到了 HDFS 上的 /output 下。还需要将 HFile 转移到对应的 region 中,这一步只是 mv,所以相当快

# 将/output下的HFile刷入user表中,若user表不存在则会被新建
$hadoop jar $HBASE_HOME/hbase-0.94.4.jar completebulkload /output user

Style2 一步走

可以在上面的命令中不指定 Dimporttsv.bulk.output=/output,则会一步将数据导入到 user 表中

Problem1

若你指定的分隔符 Dimporttsv.separator 为多字节,则会报错,默认分隔符是 \t:

java.lang.IllegalArgumentException: TsvParser only supports single-byte separators
        at com.google.common.base.Preconditions.checkArgument(Preconditions.java:88)
        at org.apache.hadoop.hbase.mapreduce.ImportTsv$TsvParser.<init>(ImportTsv.java:88)
        at org.apache.hadoop.hbase.mapreduce.ImportTsv$TsvImporter.setup(ImportTsv.java:218)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:142)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:621)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
        at org.apache.hadoop.mapred.Child.main(Child.java:170)

Problem2

如果你像我一样使用的是 hbase-0.94.* 版本,在 Style1 中执行 completebulkload 命令时会报错,这是 hbase-0.94.* 的一个 bug:

java.util.concurrent.ExecutionException: java.lang.IllegalStateException: The value of the hbase.metrics.showTableName conf option has not been specified in SchemaMetrics
        at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:252)
        at java.util.concurrent.FutureTask.get(FutureTask.java:111)
        at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.groupOrSplitPhase(LoadIncrementalHFiles.java:333)
        at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.doBulkLoad(LoadIncrementalHFiles.java:232)
        at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.run(LoadIncrementalHFiles.java:699)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
        at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.main(LoadIncrementalHFiles.java:704)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:601)
        at org.apache.hadoop.util.ProgramDriver$ProgramDescription.invoke(ProgramDriver.java:68)
        at org.apache.hadoop.util.ProgramDriver.driver(ProgramDriver.java:139)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:601)
        at org.apache.hadoop.hbase.mapreduce.Driver.main(Driver.java:51)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:601)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:156)

解决方案1: 采用 Style2,这样会跳过这个 bug

解决方案2: 更新 hbase

$ cd $HBASE_HOME/
# 下载 HBASE-4802.patch
$ wget https://issues.apache.org/jira/secure/attachment/12503953/HBASE-4802.patch
$ patch -p0 < HBASE-4802.patch
# 用maven重新进行编译
$ mvn package -Dmaven.test.skip.exec=true

然后再执行 completebulkload 命令通过^^

>>利用BulkLoad向Hbase插入数据过程详解
>>hbase bulk-load
>>bulk-load装载hdfs数据到hbase小结
>>使用HBASE的BULK LOAD


Hadoop-1.0.4+Eclipse+Windows 远程连接

27 May 2013

Hadoop 部署在 Linux 下,在 Windows 下用 Eclipse 远程连接 Hadoop 进行开发

平台环境

目录结构

  • eclipse目录 F:\eclipse
  • eclipse工程目录 F:\workspace
  • hadoop安装目录 F:\workspace\hadoop-1.0.4

Step1 配置 Map/Reduce Locations

  1. 设置 Host 为要连接服务器的 IP 地址
  2. 设置 Advanced parameters
    • 修改 hadoop.tmp.dir 为本地 tmp 目录,"/" 为 hadoop 安装目录的盘符(比如 /hadooptmp-1.0.4 即 F:\hadooptemp-1.0.4)
    • 修改 dfs.replication 为连接服务器中 hdfs-site.xml 配置的值

切换到 Map/Reduce 视图,查看是否已成功连接上服务器^^

当运行示例 WordCount 时会遇到两个问题

Problem1

org.apache.hadoop.security.AccessControlException: Permission denied: user=Chxt, access=WRITE, inode="hadoop":hadoop:supergroup:rwxr-xr-x

因为 Eclipse 使用 hadoop 插件提交作业时,会默认以 DrWho 身份去将作业写入 hdfs 文件系统中,对应 Windows 下即登录用户(此处我的登录用户为 Chxt), 根据 rwxr-xr-x 知 DrWho 用户对 hadoop 目录并没有写入权限,所以导致异常的发生。

解决方案1: 到服务器上修改 hadoop 的配置文件 conf/hdfs-core.xml,设置 dfs.permissions 为 false

<property>
    <name>dfs.permissions</name>
    <value>false</value>
    <description>
      If "true", enable permission checking in HDFS.
      If "false", permission checking is turned off,
      but all other behavior is unchanged.
      Switching from one parameter value to the other does not change the mode,
      owner or group of files or directories.
    </description>
</property>

解决方案2: 修改 hadoop 目录的权限,执行 hadoop fs -chmod 777 /user/hadoop

重启 Hadoop 服务^^

Problem2

13/05/27 17:02:30 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
13/05/27 17:02:30 ERROR security.UserGroupInformation: PriviledgedActionException as:Chxt cause:java.io.IOException: Failed to set permissions of path: \home\hadoop\hadooptmp-1.0.4\mapred\staging\Chxt-4140978\.staging to 0700
Exception in thread "main" java.io.IOException: Failed to set permissions of path: \home\hadoop\hadooptmp-1.0.4\mapred\staging\Chxt-4140978\.staging to 0700

这个是 Windows 下文件权限问题,在 Linux 下可以正常运行,不存在这样的问题。

解决方案: 注释掉 hadoop-1.0.4/src/core/org/apache/hadoop/fs/FileUtil.java 里面 checkReturnValue 函数的内容(有些粗暴,在 Window 下可以不用检查)。重新编译 Hadoop 源码,重命名 hadoop-core-1.0.4-SNAPSHOT.jar 替换掉 hadoop-1.0.4 根目录下的 hadoop-core-1.0.4.jar 即可。

至此可以正常远程连接 hadoop 服务器并运行 Map/Reduce 程序了^^

>>Hadoop学习笔记(九):如何在windows上使用eclipse远程连接hadoop进行程序开发


Hadoop-1.0.4+Windows 源码编译

26 May 2013

编译 Hadoop 源代码是每个接触 Hadoop 人的心愿吧

平台环境

目录结构

  • eclipse目录 F:\eclipse
  • eclipse工程目录 F:\workspace
  • hadoop安装目录 F:\workspace\hadoop-1.0.4

Step1 导入工程

经验证,直接导入 hadoop-1.0.4.tar.gz 工程会有很多不必要的麻烦,推荐从 SVN 上导入

资源库 URL: http://svn.apache.org/repos/asf/hadoop/common/tags,选择 release-1.0.4 检出为项目 release-1.0.4

Step2 设置 eclipse 中 hadoop 安装目录

Windows—>Preferences—>Hadoop Map/Reduce

  • Hadoop installation directory: F:\workspace\hadoop-1.0.4

Step3 build.xml

因为 Windows 无法在 cmd 中运行 shell 脚本(安装 cygwin 也没用),所以需要做以下注释

<!— 1.注释掉下面,否则报错误 "package-info.java:5: 未结束的字符串字面值" —>
<exec executable="sh">
  <arg line="src/saveVersion.sh ${version} ${build.dir}"/>
</exec>

<!— 2.注释掉所有带 "autoreconf" 的 <exec> 标签,因为 Windows 下无法执行 autoreconf 命令(在 cygwin 中安装了 autoconf, automake, libtool也没用)"—>

Step4 Gridmix.java

修改 release-1.0.4/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java:

将两处 Enum<? extends T> 改为 Enum<?>

Step5 Ant 编译

  1. 右键 MapReduceTools 工程,Properties—>Builders—>New—>Ant Builder
  2. New_Builder—>Edit:
    • Name (见名知意): hadoop-1.0.4_Builder
    • Main—>Buildfile (build.xml的位置): F:\workspace\release-1.0.4\build.xml
    • Targets—>Manual Build: jar
  3. Project—>Build Project

编译成功会输出 BUILD SUCCESSFUL ^^

在 F:\workspace\release-1.0.4\build 文件夹下会生成三个开发 jar 包:

  • hadoop-client-1.0.4-SNAPSHOT.jar
  • hadoop-core-1.0.4-SNAPSHOT.jar
  • hadoop-minicluster-1.0.4-SNAPSHOT.jar

去掉"-SNAPSHOT"即可替换 F:\workspace\hadoop-1.0.4 下的同名 jar 包


Hadoop-eclipse-plgin 自己编译这事儿不难

23 May 2013

源代码中自带的 Hadoop-eclipse 插件无法正常使用,需要自己进行编译。网上的教程让我有太多的眼泪,还是自己写一个吧,哈里路亚

平台环境

  • Win7
  • Eclipse Helios(include Ant-1.7)
  • Hadoop-1.0.4

目录结构

  • eclipse目录 F:\eclipse
  • eclipse工程目录 F:\workspace

Step1 导入 Hadoop-eclipse 插件工程

获取 hadoop-1.0.4.tar.gz

解压缩到 Eclipse workspace 目录下

在 Eclipse 中选择 File—>Import—>Existing Projects into Workspace 导入已有项目,路径为: F:\workspace\hadoop-1.0.4\src\contrib\eclipse-plugin,其默认的项目名称是 MapReduceTools

Step2 导入所需 jar 包

在项目 MapReduceTools 中新建 lib 目录,将 hadoop-1.0.4 下的 hadoop-core-1.0.4.jar(重命名为 hadoop.core.jar),及其 lib 目录下的 commons-cli-1.2.jar、commons-lang-2.4.jar、commons-configuration-1.6.jar、jackson-mapper-asl-1.8.8.jar、jackson-core-asl-1.8.8.jar、commons-httpclient-3.0.1.jar 拷贝到该目录。

Step3 build-contrib.xml

将 F:\workspace\hadoop-1.0.4\src\contrib\build-contrib.xml(与 eclipse-plugin 包在同一级目录)拷贝到 MapReduceTools 项目目录中。

修改 build-contrib.xml,注意路径用 "/"

<!— 修改 hadoop 存放目录—>
<property name="hadoop.root" location="F:/workspace/hadoop-1.0.4"/>
<!— 添加 eclipse 存放目录—>
<property name="eclipse.home" location="F:/eclipse" />
<!— 添加 hadoop 版本—>
<property name="version" value="1.0.4"/>

Step4 build.xml

<!— 1.修改 build-contrib.xml 的位置,去掉"../"—>
<import file="build-contrib.xml"/>

<!— 2.添加如下,否则提示"软件包 org.apache.hadoop.fs 不存在"—>
<path id="hadoop-jars">
  <fileset dir="${hadoop.root}/">
    <include name="hadoop-*.jar"/>
  </fileset>
</path>

<path id="classpath">
  <pathelement location="${build.classes}"/>
  <pathelement location="${hadoop.root}/build/classes"/>
  <path refid="eclipse-sdk-jars"/>
  <path refid="hadoop-jars"/>
</path>

<!— 3.修改并添加 jar 包拷贝—>
<target name="jar" depends="compile" unless="skip.contrib">
  <mkdir dir="${build.dir}/lib"/>
  <copy file="${hadoop.root}/hadoop-core-${version}.jar" tofile="${build.dir}/lib/hadoop-core.jar" verbose=""true"/>
  <copy file="${hadoop.root}/lib/commons-cli-1.2.jar" todir="${build.dir}/lib" verbose="true"/>
  <copy file="${hadoop.root}/lib/commons-lang-2.4.jar" todir="${build.dir}/lib" verbose="true"/>
  <copy file="${hadoop.root}/lib/commons-configuration-1.6.jar" todir="${build.dir}/lib" verbose="true"/>
  <copy file="${hadoop.root}/lib/jackson-mapper-asl-1.8.8.jar" todir="${build.dir}/lib" verbose="true"/>
  <copy file="${hadoop.root}/lib/jackson-core-asl-1.8.8.jar" todir="${build.dir}/lib" verbose="true"/>
  <copy file="${hadoop.root}/lib/commons-httpclient-3.0.1.jar" todir="${build.dir}/lib" verbose="true"/>
  <jar jarfile="${build.dir}/hadoop-${name}-${version}.jar" manifest="${root}/META-INF/MANIFEST.MF">
    <fileset dir="${build.dir}" includes="classes/ lib/"/>
    <fileset dir="${root}" includes="resources/ plugin.xml"/>
  </jar>
</target>

Step5 MANIFEST.MF

修改 META-INF/MANIFEST.MF

Bundle-ClassPath: classes/,
  lib/hadoop-core.jar ,
  lib/commons-cli-1.2.jar,
  lib/commons-httpclient-3.0.1.jar,
  lib/jackson-core-asl-1.8.8.jar,
  lib/jackson-mapper-asl-1.8.8.jar,
  lib/commons-configuration-1.6.jar,
  lib/commons-lang-2.4.jar

Step6 Ant 编译

  1. 右键 MapReduceTools 工程,Properties—>Builders—>New—>Ant Builder
  2. New_Builder—>Edit:
    • Name (见名知意): plugin_Builder
    • Buildfile (build.xml的位置): F:\workspace\hadoop-1.0.4\src\contrib\eclipse-plugin\build.xml
  3. Project—>Build Project

编译成功会输出 BUILD SUCCESSFUL ^^

Step7 安装插件

编译结果在 F:\workspace\hadoop-1.0.4\build\contrib\eclipse-plugin 目录下

将 hadoop-eclipse-plugin-1.0.4.jar 放入 F:\eclipse\plugins 目录下,重启 eclipse

Step7 配置插件

  1. 打开 map/Reduce Locations
  2. 右键 New Hadoop Location…,输入 Location Name(见名知意),修改 Map/Reduce Master 和 DFS Master,User name 设置为你启动 hadoop 的名称。
  3. 设置 Advanced parameters
    • 修改 hadoop.tmp.dir 为你 hadoop 集群中设置的目录
    • 修改 dfs.replication 为你 hdfs-site.xml 文件中配置的值
  4. 重启 eclipse

HBase 性能调优-程序设计与开发

16 May 2013

Part1 表的设计

p1-1 Pre-Creating Regions

默认情况下,在创建 HBase 表的时候会自动创建一个 region 分区,当导入数据的时候,所有的 HBase 客户端都向这一个 region 写数据,直到这个 region 足够大了才进行切分。一种可以加快批量写入速度的方法是通过预先创建一些空的 regions,这样当数据写入 HBase 时,会按照 region 分区情况,在集群内做数据的负载均衡。

有关预分区,详情参见 Table Creation: Pre-Creating Regions,下面是一个例子:

public static boolean createTable(HBaseAdmin admin, HTableDescriptor table, byte[][] splits) throws IOException {
    try {
        admin.createTable(table, splits);
        return true;
    } catch (TableExistsException e) {
        logger.info("table " + table.getNameAsString() + " already exists");
        // the table already exists...
        return false;  
    }
}

public static byte[][] getHexSplits(String startKey, String endKey, int numRegions) {
    byte[][] splits = new byte[numRegions-1][];
    BigInteger lowestKey = new BigInteger(startKey, 16);
    BigInteger highestKey = new BigInteger(endKey, 16);
    BigInteger range = highestKey.subtract(lowestKey);
    BigInteger regionIncrement = range.divide(BigInteger.valueOf(numRegions));
    lowestKey = lowestKey.add(regionIncrement);
    for(int i=0; i < numRegions-1;i++) {
        BigInteger key = lowestKey.add(regionIncrement.multiply(BigInteger.valueOf(i)));
        byte[] b = String.format("%016x", key).getBytes();
        splits[i] = b;
    }
    return splits;
}

p1-2 Row Key

HBase 中 row key 用来检索表中的记录,支持以下三种方式:

  • 通过单个 row key 访问:即按照某个 row key 键值进行 get 操作;
  • 通过 row key 的 range 进行 scan:即通过设置 startRowKey 和 endRowKey,在这个范围内进行扫描;
  • 全表扫描:即直接扫描整张表中所有行记录。
    在 HBase 中,row key 可以是任意字符串,最大长度 64KB,实际应用中一般为 10~100bytes,存为 byte[] 字节数组, 一般设计成定长的

row key 是按照 字典序 存储,因此设计 row key 时,要充分利用这个排序特点,将经常一起读取的数据存储到一块,将最近可能会被访问的数据放在一块。

举个例子:如果最近写入 HBase 表中的数据是最可能被访问的,可以考虑将时间戳作为 row key 的一部分,由于是字典序排序,所以可以使用 Long.MAX_VALUE – timestamp 作为 row key,这样能保证新写入的数据在读取时可以被快速命中。

p1-3 Column Family

不要在一张表里定义太多的 column family 。目前 Hbase 并不能很好的处理超过2~3个 column family 的表。因为某个 column family 在 flush 的时候,它邻近的 column family 也会因关联效应被触发 flush,最终导致系统产生更多的 I/O。感兴趣的同学可以对自己的 HBase 集群进行实际测试,从得到的测试结果数据验证一下。

p1-4 In Memory

创建表的时候,可以通过 HColumnDescriptor.setInMemory(true) 将表放到 RegionServer 的缓存中,保证在读取的时候被 cache 命中。

p1-5 Max Version

创建表的时候,可以通过 HColumnDescriptor.setMaxVersions(int maxVersions) 设置表中数据的最大版本,如果只需要保存最新版本的数据,那么可以设置 setMaxVersions(1)。

p1-6 Time To Live

创建表的时候,可以通过 HColumnDescriptor.setTimeToLive(int timeToLive) 设置表中数据的存储生命期,过期数据将自动被删除,例如如果只需要存储最近两天的数据,那么可以设置 setTimeToLive(2 * 24 * 60 * 60)。

p1-7 Compact & Split

在 HBase 中,数据在更新时首先写入 WAL日志(HLog)和内存(MemStore)中,MemStore 中的数据是排序的,当 MemStore 累计到一定阈值时,就会创建一个新的 MemStore,并且将老的 MemStore 添加到 flush 队列,由单独的线程 flush 到磁盘上,成为一个 StoreFile。于此同时, 系统会在 zookeeper 中记录一个 redo point,表示这个时刻之前的变更已经持久化了(minor compact)。

StoreFile 是只读的,一旦创建后就不可以再修改。因此 Hbase 的更新其实是不断追加的操作。当一个 Store 中的 StoreFile 达到一定的阈值后,就会进行一次合并(major compact),将对同一个 key 的修改合并到一起,形成一个大的 StoreFile,当 StoreFile 的大小达到一定阈值后,又会对 StoreFile 进行分割(split),等分为两个 StoreFile。

由于对表的更新是不断追加的,处理读请求时,需要访问 Store 中全部的 StoreFile 和 MemStore,将它们按照 row key 进行合并,由于 StoreFile 和 MemStore 都是经过排序的,并且 StoreFile 带有内存中索引,通常合并过程还是比较快的。

实际应用中,可以考虑必要时手动进行 major compact,将同一个 row key 的修改进行合并形成一个大的 StoreFile。同时,可以将 StoreFile 设置大些,减少 split 的发生。

Part2 写表操作

p2-1 多 HTable 并发写

创建多个 HTable 客户端用于写操作,提高写数据的吞吐量,一个例子:

static final Configuration conf = HBaseConfiguration.create();
static final String table_log_name = "user_log";
wTableLog = new HTable[tableN];
for (int i = 0; i < tableN; i++) {
    wTableLog[i] = new HTable(conf, table_log_name);
    wTableLog[i].setWriteBufferSize(5 * 1024 * 1024); //5MB
    wTableLog[i].setAutoFlush(false);
}

p2-2 HTable 参数设置

p2-2-1 Auto Flush

通过调用 HTable.setAutoFlush(false) 方法可以将 HTable 写客户端的自动 flush 关闭,这样可以批量写入数据到 HBase,而不是有一条 put 就执行一次更新,只有当 put 填满客户端写缓存时,才实际向 HBase 服务端发起写请求。默认情况下 auto flush 是开启的。

p2-2-2 Write Buffer

通过调用 HTable.setWriteBufferSize(writeBufferSize) 方法可以设置 HTable 客户端的写 buffer 大小,如果新设置的 buffer 小于当前写 buffer 中的数据时,buffer 将会被 flush 到服务端。其中,writeBufferSize 的单位是 byte 字节数,可以根据实际写入数据量的多少来设置该值。

p2-2-3 WAL Flag

在 HBase 中,客户端向集群中的 RegionServer 提交数据时(Put/Delete操作),首先会先写 WAL(Write Ahead Log)日志(即 HLog,一个 RegionServer 上的所有 Region 共享一个 HLog),只有当 WAL 日志写成功后,再接着写 MemStore,然后客户端被通知提交数据成功;如果写 WAL 日志失败,客户端则被通知提交失败。这样做的好处是可以做到 RegionServer 宕机后的数据恢复。

因此,对于相对不太重要的数据,可以在 Put/Delete 操作时,通过调用 Put.setWriteToWAL(false) 或 Delete.setWriteToWAL(false) 函数,放弃写 WAL 日志,从而提高数据写入的性能。

谨慎选择关闭 WAL 日志,因为这样的话,一旦 RegionServer 宕机,Put/Delete 的数据将会无法根据 WAL 日志进行恢复。

p2-3 批量写

通过调用 HTable.put(Put) 方法可以将一个指定的 row key 记录写入 HBase,同样 HBase 提供了另一个方法:通过调用 HTable.put(List) 方法可以将指定的 row key 列表,批量写入多行记录,这样做的好处是批量执行,只需要一次网络 I/O 开销,这对于对数据实时性要求高,网络传输 RTT 高的情景下可能带来明显的性能提升。

p2-4 多线程并发写

在客户端开启多个 HTable 写线程,每个写线程负责一个 HTable 对象的 flush 操作,这样结合定时 flush 和写 buffer(writeBufferSize),可以既保证在数据量小的时候,数据可以在较短时间内被 flush(如1秒内),同时又保证在数据量大的时候,写 buffer 一满就及时进行 flush。下面给个具体的例子:

for (int i = 0; i < threadN; i++) {
    Thread th = new Thread() {
        public void run() {
            while (true) {
                try {
                    sleep(1000); //1 second
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (wTableLog[i]) {
                    try {
                        wTableLog[i].flushCommits();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    };
    th.setDaemon(true);
    th.start();
}

Part3 读表操作

p3-1 多 HTable 并发读

创建多个 HTable 客户端用于读操作,提高读数据的吞吐量,一个例子:

static final Configuration conf = HBaseConfiguration.create();
static final String table_log_name = user_log;
rTableLog = new HTable[tableN];
for (int i = 0; i < tableN; i++) {
    rTableLog[i] = new HTable(conf, table_log_name);
    rTableLog[i].setScannerCaching(50);
}

p3-2 HTable 参数设置

p3-2-1 Scanner Caching

hbase.client.scanner.caching 配置项可以设置 HBase scanner 一次从服务端抓取的数据条数,默认情况下一次一条。通过将其设置成一个合理的值,可以减少 scan 过程中 next() 的时间开销,代价是 scanner 需要通过客户端的内存来维持这些被 cache 的行记录。有三个地方可以进行配置(优先级递增):

  1. 在 HBase 的 conf 配置文件中进行配置;
  2. 通过调用 HTable.setScannerCaching(int scannerCaching) 进行配置;
  3. 通过调用 Scan.setCaching(int caching) 进行配置。

p3-2-2 Scan Attribute Selection

scan 时指定需要的 Column Family,可以减少网络传输数据量,否则默认 scan 操作会返回整行所有 Column Family 的数据。

p3-2-3 Close ResultScanner

通过 scan 取完数据后,记得要关闭 ResultScanner,否则 RegionServer 可能会出现问题(对应的 Server 资源无法释放)。

p3-3 批量读

通过调用 HTable.get(Get) 方法可以根据一个指定的 row key 获取一行记录,同样 HBase 提供了另一个方法:通过调用 HTable.get(List) 方法可以根据一个指定的 row key 列表,批量获取多行记录,这样做的好处是批量执行,只需要一次网络 I/O 开销,这对于对数据实时性要求高而且网络传输 RTT 高的情景下可能带来明显的性能提升。

p3-4 多线程并发读

在客户端开启多个 HTable 读线程,每个读线程负责通过 HTable 对象进行 get 操作。下面是一个多线程并发读取 HBase,获取店铺一天内各分钟 PV 值的例子:

public class DataReaderServer {
    //获取店铺一天内各分钟PV值的入口函数
    public static ConcurrentHashMap<String, String> getUnitMinutePV(long uid, long startStamp, long endStamp) {
        long min = startStamp;
        int count = (int)((endStamp - startStamp) / (60*1000));
        List<String> lst = new ArrayList<String>();
        for (int i = 0; i <= count; i++) {
            min = startStamp + i * 60 * 1000;
            lst.add(uid + "_" + min);
        }
        return parallelBatchMinutePV(lst);
    }
      
    //多线程并发查询,获取分钟PV值
    private static ConcurrentHashMap<String, String> parallelBatchMinutePV(List<String> lstKeys) {
        ConcurrentHashMap<String, String> hashRet = new ConcurrentHashMap<String, String>();
        int parallel = 3;
        List<List<String>> lstBatchKeys  = null;
        if (lstKeys.size() < parallel) {
            lstBatchKeys  = new ArrayList<List<String>>(1);
            lstBatchKeys.add(lstKeys);
        } else {
            lstBatchKeys  = new ArrayList<List<String>>(parallel);
            for(int i = 0; i < parallel; i++){
                List<String> lst = new ArrayList<String>();
                lstBatchKeys.add(lst);
            }

            for(int i = 0 ; i < lstKeys.size(); i++) {
                lstBatchKeys.get(i%parallel).add(lstKeys.get(i));
            }
        }
        
        List<Future< ConcurrentHashMap<String, String> >> futures = new ArrayList<Future< ConcurrentHashMap<String, String> >>(5);
        
        ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
        builder.setNameFormat("ParallelBatchQuery");
        ThreadFactory factory = builder.build();
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(lstBatchKeys.size(), factory);
        
        for(List<String> keys : lstBatchKeys) {
            Callable< ConcurrentHashMap<String, String> > callable = new BatchMinutePVCallable(keys);
            FutureTask< ConcurrentHashMap<String, String> > future = (FutureTask< ConcurrentHashMap<String, String> >) executor.submit(callable);
            futures.add(future);
        }
        executor.shutdown();
        
        // Wait for all the tasks to finish
        try {
            boolean stillRunning = !executor.awaitTermination(5000000, TimeUnit.MILLISECONDS);
            if (stillRunning) {
                try {
                    executor.shutdownNow();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        } catch (InterruptedException e) {
            try {
                Thread.currentThread().interrupt();
            } catch (Exception e1) {
                e1.printStackTrace();
            }
        }
        
        // Look for any exception
        for (Future f : futures) {
            try {
                if(f.get() != null) {
                    hashRet.putAll((ConcurrentHashMap<String, String>)f.get());
                }
            } catch (InterruptedException e) {
                try {
                    Thread.currentThread().interrupt();
                } catch (Exception e1) {
                    e1.printStackTrace();
                }
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }    
        return hashRet;
    }
    
    //一个线程批量查询,获取分钟PV值
    protected static ConcurrentHashMap<String, String> getBatchMinutePV(List<String> lstKeys) {
        ConcurrentHashMap<String, String> hashRet = null;
        List<Get> lstGet = new ArrayList<Get>();
        String[] splitValue = null;
        for (String s : lstKeys) {
            splitValue = s.split("_");
            long uid = Long.parseLong(splitValue[0]);
            long min = Long.parseLong(splitValue[1]);
            byte[] key = new byte[16];
            Bytes.putLong(key, 0, uid);
            Bytes.putLong(key, 8, min);
            Get g = new Get(key);
            g.addFamily(fp);
            lstGet.add(g);
        }
        Result[] res = null;
        try {
            res = tableMinutePV[rand.nextInt(tableN)].get(lstGet);
        } catch (IOException e1) {
            logger.error("tableMinutePV exception, e=" + e1.getStackTrace());
        }

        if (res != null && res.length > 0) {
            hashRet = new ConcurrentHashMap<String, String>(res.length);
            for (Result re : res) {
                if (re != null && !re.isEmpty()) {
                    try {
                        byte[] key = re.getRow();
                        byte[] value = re.getValue(fp, cp);
                        if (key != null && value != null) {
                            hashRet.put(String.valueOf(Bytes.toLong(key,
                                    Bytes.SIZEOF_LONG)), String.valueOf(Bytes
                                    .toLong(value)));
                        }
                    } catch (Exception e2) {
                        logger.error(e2.getStackTrace());
                    }
                }
            }
        }
        return hashRet;
    }
}

//调用接口类,实现 Callable 接口
class BatchMinutePVCallable implements Callable<ConcurrentHashMap<String, String>> {
     private List<String> keys;

     public BatchMinutePVCallable(List<String> lstKeys) {
         this.keys = lstKeys;
     }

     public ConcurrentHashMap<String, String> call() throws Exception {
         return DataReadServer.getBatchMinutePV(keys);
     }
}

p3-5 缓存查询结果

对于频繁查询 HBase 的应用场景,可以考虑在应用程序中做缓存,当有新的查询请求时,首先在缓存中查找,如果存在则直接返回,不再查询 HBase;否则对 HBase 发起读请求查询,然后在应用程序中将查询结果缓存起来。至于缓存的替换策略,可以考虑 LRU 等常用的策略。

p3-6 Blockcache

HBase 上 Regionserver 的内存分为两个部分,一部分作为 Memstore,主要用来写;另外一部分作为 BlockCache,主要用于读。

  • 写请求会先写入 Memstore,Regionserver 会给每个 region 提供一个 Memstore,当 Memstore 满64MB以后,会启动 flush 刷新到磁盘。当 Memstore 的总大小超过限制时(heapsize * hbase.regionserver.global.memstore.upperLimit * 0.9),会强行启动 flush 进程,从最大的 Memstore 开始 flush 直到低于限制。
  • 读请求先到 Memstore 中查数据,查不到就到 BlockCache 中查,再查不到就会到磁盘上读,并把读的结果放入 BlockCache。由于 BlockCache 采用的是 LRU 策略,因此 BlockCache 达到上限(heapsize * hfile.block.cache.size * 0.85)后,会启动淘汰机制,淘汰掉最老的一批数据。

一个 Regionserver 上有一个 BlockCache 和 N 个 Memstore,它们的大小之和不能大于等于 heapsize * 0.8,否则 HBase 不能启动。默认 BlockCache 为 0.2,而 Memstore 为 0.4。对于注重读响应时间的系统,可以将 BlockCache 设大些,比如设置 BlockCache=0.4,Memstore=0.39,以加大缓存的命中率。

有关 BlockCache 机制,请参考这里: HBase 的 Block cacheHBase 的 blockcache 机制hbase 中的缓存的计算与使用

Part4 数据计算

p4-1 服务端计算

Coprocessor 运行于 HBase RegionServer 服务端,各个 Regions 保持对与其相关的 coprocessor 实现类的引用,coprocessor 类可以通过 RegionServer 上 classpath 中的本地 jar 或 HDFS 的 classloader 进行加载。

目前,已提供有几种 coprocessor:

  • Coprocessor: 提供对于 region 管理的钩子,例如 region 的 open/close/split/flush/compact 等;
  • RegionObserver: 提供用于从客户端监控表相关操作的钩子,例如表的 get/put/scan/delete 等;
  • Endpoint: 提供可以在 region 上执行任意函数的命令触发器。一个使用例子是 RegionServer 端的列聚合,这里有代码示例

p4-2 写端计算

p4-2-1 计数

HBase 本身可以看作是一个可以水平扩展的 Key-Value 存储系统,但是其本身的计算能力有限(Coprocessor 可以提供一定的服务端计算),因此使用 HBase 时,往往需要从写端或者读端进行计算,然后将最终的计算结果返回给调用者。举两个简单的例子:

  • PV 计算:通过在 HBase 写端内存中,累加计数,维护 PV 值的更新,同时为了做到持久化,定期(如1秒)将 PV 计算结果同步到 HBase 中,这样查询端最多会有1秒钟的延迟,能看到秒级延迟的 PV 结果。
  • 分钟 PV 计算:与上面提到的 PV 计算方法相结合,每分钟将当前的累计 PV 值,按照 rowkey + minute 作为新的 rowkey 写入 HBase 中,然后在查询端通过 scan 得到当天各个分钟以前的累计 PV 值,然后顺次将前后两分钟的累计 PV 值相减,就得到了当前一分钟内的 PV 值,从而最终也就得到当天各个分钟内的 PV 值。

p4-2-2 去重

对于 UV 的计算,就是个去重计算的例子。分两种情况:

  • 如果内存可以容纳,那么可以在 Hash 表中维护所有已经存在的 UV 标识,每当新来一个标识时,通过快速查找 Hash 确定是否是一个新的 UV,若是则 UV 值加1,否则 UV 值不变。另外,为了做到持久化或提供给查询接口使用,可以定期(如1秒)将 UV 计算结果同步到 HBase 中。
  • 如果内存不能容纳,可以考虑采用 Bloom Filter 来实现,从而尽可能的减少内存的占用情况。除了 UV 的计算外,判断 URL 是否存在也是个典型的应用场景。

p4-3 读端计算

如果对于响应时间要求比较苛刻的情况(如单次 http 请求要在毫秒级时间内返回),个人觉得读端不宜做过多复杂的计算逻辑,尽量做到读端功能单一化: 即从 HBase RegionServer 读到数据(scan 或 get 方式)后,按照数据格式进行简单的拼接,直接返回给前端使用。当然,如果对于响应时间要求一般,或者业务特点需要,也可以在读端进行一些计算逻辑。

Part5 总结

作为一个 Key-Value 存储系统,HBase 并不是万能的,它有自己独特的地方。因此,基于它来做应用时,我们往往需要从多方面进行优化改进(表设计、读表操作、写表操作、数据计算等),有时甚至还需要从系统级对 HBase 进行配置调优,更甚至可以对 HBase 本身进行优化。这属于不同的层次范畴。

总之,概括来讲,对系统进行优化时,首先定位到影响你的程序运行性能的瓶颈之处,然后有的放矢进行针对行的优化。如果优化后满足你的期望,那么就可以停止优化;否则继续寻找新的瓶颈之处,开始新的优化,直到满足性能要求。

>>HBase性能优化方法总结(一):表的设计
>>HBase性能优化方法总结(二):写表操作
>>HBase性能优化方法总结(三):读表操作
>>HBase性能优化方法总结(四):数据计算


Next Page >>
Fork me on GitHub