用户指南 — HappyBase 1.2.0 文档


本文由 简悦 SimpRead 转码, 原文地址 happybase.readthedocs.io

HappyBase

本用户指南探讨了 HappyBase API,并应为您提供足够的信息来帮助您入门。请注意,本用户指南旨在介绍 HappyBase,而不是一般的 HBase。读者应该已经对 HBase 及其数据模型有了基本的了解。

虽然用户指南确实涵盖了大多数功能,但它并不是一个完整的参考指南。有关 HappyBase API 的更多信息,请访问 API 文档

建立连接

我们将从连接到 HBase 开始。创造新的Connection实例:

import happybase

connection = happybase.Connection('somehost')

在某些设置中,Connection类需要有关它将连接到的 HBase 版本以及要使用的 Thrift 传输的一些其他信息。如果您仍在使用 HBase 0.90.x,则需要设置 compat 参数,以确保 HappyBase 使用正确的有线协议。此外,如果您将 HBase 0.94 与非标准 Thrift 传输模式配合使用,请确保提供正确的传输参数。有关这些参数及其支持的值的详细信息,请参阅 Connection类的 API 文档。

Connection创造被创建后,它会自动打开与 HBase Thrift 服务器的套接字连接。可以通过将 autoconnect 参数设置为 False,并使用 Connection.open()手动打开连接来禁用此行为:

connection = happybase.Connection('somehost', autoconnect=False)

# before first use:
connection.open()

Connection类提供与 HBase 交互的主入口点。例如,要列出可用的表,请使用 Connection.tables()

print(connection.tables())

Connection类上的大多数其他方法都用于系统管理任务,如创建、删除、启用和禁用表。请参阅Connection类的 API 文档,其中包含更多信息。本用户指南不涵盖这些任务,因为您很可能已经在使用 HBase shell 执行这些系统管理任务。

注:HappyBase 还具有一个连接池,本指南稍后将对此进行介绍。

使用表格

Table类提供了用于检索和操作 HBase 中数据的主 API。在上面的示例中,我们已经使用 Connection.tables()方法列出了所有可用的表。如果还没有创建任何表,您可以使用 Connection.create_table()创建一个新的表:

connection.create_table(
    'mytable',
    {'cf1': dict(max_versions=10),
     'cf2': dict(max_versions=1, block_cache_enabled=False),
     'cf3': dict(),  # use defaults
    }
)

注:对于许多 HBase 管理任务,HBase shell 通常是更好的替代方案,因为与 HappyBase 使用的有限的 Thrift API 相比,该 shell 更强大。

下一步是获取要使用的Table实例。只需调用 Connection.table(),向其传递表名:

table = connection.table('mytable')

获取Table实例_不会_导致往返 Thrift 服务器,这意味着应用程序代码可能会在Connection实例需要新表时要求它提供新Table,而不会产生负面的性能后果。副作用是没有进行任何检查以确保表存在,因为这将涉及往返。如果稍后在代码中尝试与不存在的表进行交互,则会出现错误。对于本指南,我们假设该表存在。

注:来自 Java HBase API 的 "重"HTable HBase 类位于 Thrift 连接的另一端,该类执行与区域服务器的实际通信。Python 端的Table实例和服务器端的 HTable 实例之间没有直接映射。

使用表 "命名空间"

如果单个 HBase 实例由多个应用程序共享,则不同应用程序使用的表名可能会发生冲突。这个问题的简单解决方案是将 "命名空间" 前缀添加到特定应用程序 "拥有" 的所有表的名称中,例如,myproject项目中所有表的名称格式为myproject_XYZ.

无需在每次将表名传递给 HappyBase 时添加此特定于应用程序的前缀,而是将 table_prefix 参数传递给 Connection可以解决此问题。HappyBase 会将该前缀(和下划线)附加到该Connection实例处理的每个表名之前。例如:

connection = happybase.Connection('somehost', table_prefix='myproject')

此时,Connection.tables()不再包含其他 "命名空间" 中的表。HappyBase 只会返回带有前缀的表,并且在返回结果时也会透明地删除前缀,例如:myproject_

print(connection.tables())  # Table "myproject_XYZ" in HBase will be
                            # returned as simply "XYZ"

这也适用于采用表名的其他方法,例如 Connection.table()

table = connection.table('XYZ')  # Operates on myproject_XYZ in HBase

最终结果是,表前缀在代码中仅指定一次,即在对 Connection构造函数的调用中指定,并且在需要更改时只需要进行一次更改。

检索数据

HBase 数据模型是一个多维稀疏映射。HBase 中的表包含列系列,其列限定符包含值和时间戳。在大多数 HappyBase API 中,列系列和限定符名称被指定为单个字符串,例如 ,而不是两个单独的参数。虽然列族和限定符是 HBase 数据模型中的不同概念,但在与数据交互时,它们几乎总是一起使用,因此将它们视为单个字符串会使 API 变得更加简单。cf1:col1

检索行

Table类提供了各种方法来从 HBase 中的表中检索数据。最基本的是 Table.row(),它从表中检索单个行,并将其作为字典返回,将列映射到值:

row = table.row(b'row-key')
print(row[b'cf1:col1'])   # prints the value of cf1:col1

Table.rows()方法的工作方式与 Table.row() 类似,但采用多个行键并将其作为(键、数据)元组返回:

rows = table.rows([b'row-key-1', b'row-key-2'])
for key, data in rows:
    print(key, data)

如果您希望 Table.rows()以字典或有序字典的形式返回结果,则必须自己执行此操作。不过,这很容易,因为返回值可以直接传递给字典构造函数。对于普通字典,顺序丢失:

rows_as_dict = dict(table.rows([b'row-key-1', b'row-key-2']))

... 而对于 a,顺序保持不变:OrderedDict

from collections import OrderedDict
rows_as_ordered_dict = OrderedDict(table.rows([b'row-key-1', b'row-key-2']))

进行更细粒度的选择

HBase 的数据模型允许更细粒度地选择要检索的数据。如果您事先知道需要哪些列,则可以通过将这些列显式指定给 Table.row() 和 Table.rows()来提高性能。 columns 参数采用列名的列表(或元组):

row = table.row(b'row-key', columns=[b'cf1:col1', b'cf1:col2'])
print(row[b'cf1:col1'])
print(row[b'cf1:col2'])

列参数中的项也可能只是一个列系列,而不是同时提供列系列和列限定符,这意味着将检索该列系列中的所有列。例如,要获取列系列 cf1 中的所有列和值,请使用:

row = table.row(b'row-key', columns=[b'cf1'])

在 HBase 中,每个单元都有一个附加的时间戳。如果您不想使用存储在 HBase 中的最新版本的数据,则从数据库中检索数据的方法(例如 Table.row() )都接受时间戳参数,该参数指定结果应限制为具有时间戳的值,直至指定时间戳:

row = table.row(b'row-key', timestamp=123456789)

默认情况下,HappyBase 在其返回的结果中不包含时间戳。在需要访问时间戳的应用程序中,只需将 include_timestamp 参数设置为 。现在,结果中的每个单元格都将作为(值,时间戳)元组返回,而不仅仅是一个值:True

row = table.row(b'row-key', columns=[b'cf1:col1'], include_timestamp=True)
value, timestamp = row[b'cf1:col1']

HBase 支持存储同一单元格的多个版本。这可以为每个列系列进行配置。若要检索给定行的列的所有版本,可以使用 Table.cells()。 此方法返回单元格的有序列表,最新版本排在最前面。versions 参数指定要返回的最大版本数。就像检索行的方法一样,include_timestamp 参数确定结果中是否包含时间戳。例:

values = table.cells(b'row-key', b'cf1:col1', versions=2)
for value in values:
    print("Cell data: {}".format(value))

cells = table.cells(b'row-key', b'cf1:col1', versions=3, include_timestamp=True)
for value, timestamp in cells:
    print("Cell data at {}: {}".format(timestamp, value))

请注意,结果包含的单元格数可能少于请求的单元格数。单元格可能只是具有较少的版本,或者您请求的版本可能比 HBase 为列系列保留的版本多。

扫描表中的行

除了检索已知行键的数据外,还可以使用表扫描程序有效地迭代 HBase 中的行,创造 d 使用 Table.scan()。循环访问表中的所有行的基本扫描程序如下所示:

for key, data in table.scan():
    print(key, data)

在实践中,执行上述示例中的完整表扫描的成本高得令人望而却步。可以通过多种方式限制扫描,以进行更具选择性的范围查询。一种方法是指定启动键和 / 或停止键。循环访问从行 aaa 到表末尾的所有行:

for key, data in table.scan(row_start=b'aaa'):
    print(key, data)

要循环访问从表开头到行 xyz 的所有行,请使用:

for key, data in table.scan(row_stop=b'xyz'):
    print(key, data)

要循环访问 aaa 行(包含)和 xyz(未包含)之间的所有行,请同时提供以下两项:

for key, data in table.scan(row_start=b'aaa', row_stop=b'xyz'):
    print(key, data)

另一种方法是使用密钥前缀。例如,要循环访问以 abc 开头的所有行:

for key, data in table.scan(row_prefix=b'abc'):
    print(key, data)

上面的扫描程序示例仅使用 row_start 、 row_stop 和 row_prefix 参数按行键限制结果,但扫描程序也可以将结果限制为某些列、列系列和时间戳,就像 Table.row()Table.rows() 一样。对于高级用户,可以将筛选器字符串作为筛选器参数传递。此外,可选的 limit 参数定义最多检索的数据量,batch_size 参数指定传输的块应有多大。 Table.scan() API 文档提供了有关支持的扫描程序选项的详细信息。

操作数据

HBase 没有任何_数据类型_的概念; 所有行键、列名和列值都简单地视为原始字节字符串。

根据设计,HappyBase _不_执行任何自动字符串转换。这意味着在将数据传递到 HappyBase 之前,必须在应用程序中将数据转换为字节字符串,例如通过调用文本字符串(使用 Unicode),或者采用更高级的字符串序列化技术(如 )。有关此内容的更多详细信息,请查看 HBase 建模技术。请注意,HappyBase 使用的底层 Thrift 库将文本字符串自动编码为字节,但强烈建议不要依赖此 "功能",因为返回的数据不会自动解码,从而导致不对称,从而导致混淆行为。在应用程序代码中具有显式编码和解码步骤是正确的方法。s.encode('utf-8')``struct.pack()

在 HBase 中,所有变更要么存储数据,要么标记数据以进行删除; 没有就地更新或删除这样的事情。HappyBase 提供了执行单个插入或删除的方法,以及一个批量 API,用于一次性执行多个变更。

存储数据

要在表中存储单个数据单元格,我们可以使用 Table.put(),它采用行键和要存储的数据。数据应该是将列名映射到值的字典:

table.put(b'row-key', {b'cf:col1': b'value1',
                       b'cf:col2': b'value2'})

如果要显式提供时间戳,请使用时间戳参数:

table.put(b'row-key', {b'cf:col1': b'value1'}, timestamp=123456789)

如果省略,HBase 默认为当前系统时间。

删除数据

Table.delete()方法从表中删除数据。要删除完整的行,只需指定行键:

若要删除一个或多个列而不是完整行,还要指定 columns 参数:

table.delete(b'row-key', columns=[b'cf1:col1', b'cf1:col2'])

可选的时间戳参数将删除操作限制为最多达到指定时间戳的数据。

执行批量变更

Table.put()Table.delete()方法都会立即向 HBase Thrift 服务器发出命令。这意味着在存储或删除多个值时,使用这些方法不是很有效。聚合一堆命令并一次性将它们发送到服务器要高效得多。这正是 Batch 类的内容,创造 d 使用 Table.batch(),do。 Batch实例具有 put 和 delete 方法,就像 Table类一样,但更改使用 Batch.send()在一次往返中发送到服务器:

b = table.batch()
b.put(b'row-key-1', {b'cf:col1': b'value1', b'cf:col2': b'value2'})
b.put(b'row-key-2', {b'cf:col2': b'value2', b'cf:col3': b'value3'})
b.put(b'row-key-3', {b'cf:col3': b'value3', b'cf:col4': b'value4'})
b.delete(b'row-key-4')
b.send()

注:在单个批处理中存储和删除同一行键的数据会导致不可预知的结果,因此不要这样做。

虽然 Batch实例上的方法类似于 put()delete()方法,但它们不会为每个变更采用时间戳参数。相反,您可以为完整批处理指定单个时间戳参数:

b = table.batch(timestamp=123456789)
b.put(...)
b.delete(...)
b.send()

Batch 实例可以用作 上下文管理器 ,这与 Python 的 with 语句结合使用是最有用的。上面的示例可以简化为:

with table.batch() as b:
    b.put(b'row-key-1', {b'cf:col1': b'value1', b'cf:col2': b'value2'})
    b.put(b'row-key-2', {b'cf:col2': b'value2', b'cf:col3': b'value3'})
    b.put(b'row-key-3', {b'cf:col3': b'value3', b'cf:col4': b'value4'})
    b.delete(b'row-key-4')

如您所见,不再需要调用 Batch.send()。 当with代码块终止时,即使块中的某个位置出现错误,也会自动应用批处理,因此它的行为与子句基本相同。但是,某些应用程序需要事务行为,仅当未发生异常时才发送批处理。如果没有上下文管理器,这将如下所示:with``try/finally

b = table.batch()
try:
    b.put(b'row-key-1', {b'cf:col1': b'value1', b'cf:col2': b'value2'})
    b.put(b'row-key-2', {b'cf:col2': b'value2', b'cf:col3': b'value3'})
    b.put(b'row-key-3', {b'cf:col3': b'value3', b'cf:col4': b'value4'})
    b.delete(b'row-key-4')
    raise ValueError("Something went wrong!")
except ValueError as e:
    # error handling goes here; nothing will be sent to HBase
    pass
else:
    # no exceptions; send data
    b.send()

使用块更容易获得相同的行为。 Table.batch()的事务参数就是您所需要的:with

try:
    with table.batch(transaction=True) as b:
        b.put(b'row-key-1', {b'cf:col1': b'value1', b'cf:col2': b'value2'})
        b.put(b'row-key-2', {b'cf:col2': b'value2', b'cf:col3': b'value3'})
        b.put(b'row-key-3', {b'cf:col3': b'value3', b'cf:col4': b'value4'})
        b.delete(b'row-key-4')
        raise ValueError("Something went wrong!")
except ValueError:
    # error handling goes here; nothing is sent to HBase
    pass

# when no error occurred, the transaction succeeded

正如您可能已经想象的那样,Batch会将所有变更保留在内存中,直到通过显式调用 Batch.send()或块结束时发送批处理。这不适用于需要存储大量数据的应用程序,因为它可能导致批处理太大而无法在一次往返中发送,或者批处理使用太多内存。对于这些情况,可以指定 batch_size 参数。batch_size 充当阈值:Batch实例在有多个挂起操作时自动发送所有挂起 batch_size 变更。例如,这将导致到服务器的三次往返(两个批次包含 1000 个单元格,一个批次具有剩余的 400 个单元格):with

with table.batch(batch_size=1000) as b:
    for i in range(1200):
        # this put() will result in two mutations (two cells)
        b.put(b'row-%04d' % i, {
            b'cf1:col1': b'v1',
            b'cf1:col2': b'v2',
        })

适当的 batch_size 非常特定于应用程序,因为它取决于数据大小,因此只需进行试验,了解不同的大小如何适用于您的特定用例。

使用原子计数器

Table.counter_inc()Table.counter_dec()方法允许原子增量和递减 8 字节宽的值,这些值被 HBase 解释为大端 64 位有符号整数。计数器在首次使用时自动初始化为 0。递增或递减计数器时,将返回修改后的值。例:

print(table.counter_inc(b'row-key', b'cf1:counter'))  # prints 1
print(table.counter_inc(b'row-key', b'cf1:counter'))  # prints 2
print(table.counter_inc(b'row-key', b'cf1:counter'))  # prints 3

print(table.counter_dec(b'row-key', b'cf1:counter'))  # prints 2

可选的 value 参数指定通过以下方式递增或递减的量:

print(table.counter_inc(b'row-key', b'cf1:counter', value=3))  # prints 5

虽然计数器通常与上面所示的递增和递减函数一起使用,但 Table.counter_get()Table.counter_set()方法可用于直接检索或设置计数器值:

print(table.counter_get(b'row-key', b'cf1:counter'))  # prints 5

table.counter_set(b'row-key', b'cf1:counter', 12)

使用连接池

HappyBase 附带一个线程安全的连接池,允许多个线程共享和重用打开的连接。这在多线程服务器应用程序(例如使用 Apache 的 mod_wsgi 提供服务的 Web 应用程序)中最有用。当线程向池请求连接(使用 ConnectionPool.connection() )时,将授予其租约,在此期间,线程对连接具有独占访问权限。使用连接完成线程后,它将连接返回到池,以便它可用于其他线程。

实例化池

该池由连接池类提供。构造函数的 size 参数指定池中的连接数。其他参数将传递给Connection构造函数:

pool = happybase.ConnectionPool(size=3, host='...', table_prefix='myproject')

实例化后,连接池将立即建立连接,以便立即检测到错误主机名等简单问题。对于剩余的连接,池会执行惰性操作:仅在需要时才会打开新连接。

获取连接

连接只能使用 Python 的上下文管理器协议获得,即使用语句中的代码块。这可确保连接在使用后实际返回到池。例:with

pool = happybase.ConnectionPool(size=3, host='...')

with pool.connection() as connection:
    print(connection.tables())

警告

切勿在块结束后使用实例。即使变量仍在作用域内,连接可能已同时分配给另一个线程。connection``with

应尽快将连接返回到池中,以便其他线程可以使用它们。这意味着块中包含的代码量应保持在绝对最小值。在实践中,应用程序应该只加载块内的数据,并在块外处理数据:with``with``with

with pool.connection() as connection:
    table = connection.table('table-name')
    row = table.row(b'row-key')

process_data(row)

一个应用程序线程一次只能保存一个连接。当线程保持连接并再次请求连接时(例如,因为被调用的函数也从池中请求连接),将返回它已经持有的相同连接实例,因此这不需要应用程序进行任何协调。这意味着在以下示例中,对池的两个连接请求将返回完全相同的连接:

pool = happybase.ConnectionPool(size=3, host='...')

def do_something_else():
    with pool.connection() as connection:
        pass  # use the connection here

with pool.connection() as connection:
    # use the connection here, e.g.
    print(connection.tables())

    # call another function that uses a connection
    do_something_else()

处理断开的连接

池会尝试检测断开的连接,并在连接返回到池时用新的连接替换这些连接。但是,连接池不会捕获引发的异常,也不会自动重试失败的操作。这意味着应用程序仍必须处理连接错误。

三. 今后的步骤

下一步是亲自尝试一下! API 文档可用作参考。


幻翼 2022年1月7日 10:52 收藏文档