本文由 简悦 SimpRead 转码, 原文地址 happybase.readthedocs.io
本用户指南探讨了 HappyBase API,并应为您提供足够的信息来帮助您入门。请注意,本用户指南旨在介绍 HappyBase,而不是一般的 HBase。读者应该已经对 HBase 及其数据模型有了基本的了解。
虽然用户指南确实涵盖了大多数功能,但它并不是一个完整的参考指南。有关 HappyBase API 的更多信息,请访问 API 文档。
建立连接 ¶
我们将从连接到 HBase 开始。创造新的Connection
实例:
import happybase
connection = happybase.Connection('somehost')
在某些设置中,Connection
类需要有关它将连接到的 HBase 版本以及要使用的 Thrift 传输的一些其他信息。如果您仍在使用 HBase 0.90.x,则需要设置 compat 参数,以确保 HappyBase 使用正确的有线协议。此外,如果您将 HBase 0.94 与非标准 Thrift 传输模式配合使用,请确保提供正确的传输参数。有关这些参数及其支持的值的详细信息,请参阅 Connection
类的 API 文档。
当Connection
创造被创建后,它会自动打开与 HBase Thrift 服务器的套接字连接。可以通过将 autoconnect 参数设置为 False,并使用 Connection.open()
手动打开连接来禁用此行为:
connection = happybase.Connection('somehost', autoconnect=False)
# before first use:
connection.open()
Connection
类提供与 HBase 交互的主入口点。例如,要列出可用的表,请使用 Connection.tables()
:
print(connection.tables())
Connection
类上的大多数其他方法都用于系统管理任务,如创建、删除、启用和禁用表。请参阅Connection
类的 API 文档,其中包含更多信息。本用户指南不涵盖这些任务,因为您很可能已经在使用 HBase shell 执行这些系统管理任务。
注:HappyBase 还具有一个连接池,本指南稍后将对此进行介绍。
使用表格 ¶
Table
类提供了用于检索和操作 HBase 中数据的主 API。在上面的示例中,我们已经使用 Connection.tables()
方法列出了所有可用的表。如果还没有创建任何表,您可以使用 Connection.create_table()
创建一个新的表:
connection.create_table(
'mytable',
{'cf1': dict(max_versions=10),
'cf2': dict(max_versions=1, block_cache_enabled=False),
'cf3': dict(), # use defaults
}
)
注:对于许多 HBase 管理任务,HBase shell 通常是更好的替代方案,因为与 HappyBase 使用的有限的 Thrift API 相比,该 shell 更强大。
下一步是获取要使用的Table
实例。只需调用 Connection.table()
,向其传递表名:
table = connection.table('mytable')
获取Table
实例_不会_导致往返 Thrift 服务器,这意味着应用程序代码可能会在Connection
实例需要新表时要求它提供新Table
,而不会产生负面的性能后果。副作用是没有进行任何检查以确保表存在,因为这将涉及往返。如果稍后在代码中尝试与不存在的表进行交互,则会出现错误。对于本指南,我们假设该表存在。
注:来自 Java HBase API 的 "重"HTable HBase 类位于 Thrift 连接的另一端,该类执行与区域服务器的实际通信。Python 端的
Table
实例和服务器端的 HTable 实例之间没有直接映射。
使用表 "命名空间"¶
如果单个 HBase 实例由多个应用程序共享,则不同应用程序使用的表名可能会发生冲突。这个问题的简单解决方案是将 "命名空间" 前缀添加到特定应用程序 "拥有" 的所有表的名称中,例如,myproject
项目中所有表的名称格式为myproject_XYZ
.
无需在每次将表名传递给 HappyBase 时添加此特定于应用程序的前缀,而是将 table_prefix 参数传递给 Connection
可以解决此问题。HappyBase 会将该前缀(和下划线)附加到该Connection
实例处理的每个表名之前。例如:
connection = happybase.Connection('somehost', table_prefix='myproject')
此时,Connection.tables()
不再包含其他 "命名空间" 中的表。HappyBase 只会返回带有前缀的表,并且在返回结果时也会透明地删除前缀,例如:myproject_
print(connection.tables()) # Table "myproject_XYZ" in HBase will be
# returned as simply "XYZ"
这也适用于采用表名的其他方法,例如 Connection.table()
:
table = connection.table('XYZ') # Operates on myproject_XYZ in HBase
最终结果是,表前缀在代码中仅指定一次,即在对 Connection
构造函数的调用中指定,并且在需要更改时只需要进行一次更改。
检索数据 ¶
HBase 数据模型是一个多维稀疏映射。HBase 中的表包含列系列,其列限定符包含值和时间戳。在大多数 HappyBase API 中,列系列和限定符名称被指定为单个字符串,例如 ,而不是两个单独的参数。虽然列族和限定符是 HBase 数据模型中的不同概念,但在与数据交互时,它们几乎总是一起使用,因此将它们视为单个字符串会使 API 变得更加简单。cf1:col1
检索行 ¶
Table
类提供了各种方法来从 HBase 中的表中检索数据。最基本的是 Table.row()
,它从表中检索单个行,并将其作为字典返回,将列映射到值:
row = table.row(b'row-key')
print(row[b'cf1:col1']) # prints the value of cf1:col1
Table.rows()
方法的工作方式与 Table.row() 类似
,但采用多个行键并将其作为(键、数据)元组返回:
rows = table.rows([b'row-key-1', b'row-key-2'])
for key, data in rows:
print(key, data)
如果您希望 Table.rows()
以字典或有序字典的形式返回结果,则必须自己执行此操作。不过,这很容易,因为返回值可以直接传递给字典构造函数。对于普通字典,顺序丢失:
rows_as_dict = dict(table.rows([b'row-key-1', b'row-key-2']))
... 而对于 a,顺序保持不变:OrderedDict
from collections import OrderedDict
rows_as_ordered_dict = OrderedDict(table.rows([b'row-key-1', b'row-key-2']))
进行更细粒度的选择 ¶
HBase 的数据模型允许更细粒度地选择要检索的数据。如果您事先知道需要哪些列,则可以通过将这些列显式指定给 Table.row() 和 Table.rows()
来提高性能。 columns 参数采用列名的列表(或元组):
row = table.row(b'row-key', columns=[b'cf1:col1', b'cf1:col2'])
print(row[b'cf1:col1'])
print(row[b'cf1:col2'])
列参数中的项也可能只是一个列系列,而不是同时提供列系列和列限定符,这意味着将检索该列系列中的所有列。例如,要获取列系列 cf1 中的所有列和值,请使用:
row = table.row(b'row-key', columns=[b'cf1'])
在 HBase 中,每个单元都有一个附加的时间戳。如果您不想使用存储在 HBase 中的最新版本的数据,则从数据库中检索数据的方法(例如 Table.row() )
都接受时间戳参数,该参数指定结果应限制为具有时间戳的值,直至指定时间戳:
row = table.row(b'row-key', timestamp=123456789)
默认情况下,HappyBase 在其返回的结果中不包含时间戳。在需要访问时间戳的应用程序中,只需将 include_timestamp 参数设置为 。现在,结果中的每个单元格都将作为(值,时间戳)元组返回,而不仅仅是一个值:True
row = table.row(b'row-key', columns=[b'cf1:col1'], include_timestamp=True)
value, timestamp = row[b'cf1:col1']
HBase 支持存储同一单元格的多个版本。这可以为每个列系列进行配置。若要检索给定行的列的所有版本,可以使用 Table.cells()。
此方法返回单元格的有序列表,最新版本排在最前面。versions 参数指定要返回的最大版本数。就像检索行的方法一样,include_timestamp 参数确定结果中是否包含时间戳。例:
values = table.cells(b'row-key', b'cf1:col1', versions=2)
for value in values:
print("Cell data: {}".format(value))
cells = table.cells(b'row-key', b'cf1:col1', versions=3, include_timestamp=True)
for value, timestamp in cells:
print("Cell data at {}: {}".format(timestamp, value))
请注意,结果包含的单元格数可能少于请求的单元格数。单元格可能只是具有较少的版本,或者您请求的版本可能比 HBase 为列系列保留的版本多。
扫描表中的行 ¶
除了检索已知行键的数据外,还可以使用表扫描程序有效地迭代 HBase 中的行,创造 d 使用 Table.scan()
。循环访问表中的所有行的基本扫描程序如下所示:
for key, data in table.scan():
print(key, data)
在实践中,执行上述示例中的完整表扫描的成本高得令人望而却步。可以通过多种方式限制扫描,以进行更具选择性的范围查询。一种方法是指定启动键和 / 或停止键。循环访问从行 aaa 到表末尾的所有行:
for key, data in table.scan(row_start=b'aaa'):
print(key, data)
要循环访问从表开头到行 xyz 的所有行,请使用:
for key, data in table.scan(row_stop=b'xyz'):
print(key, data)
要循环访问 aaa 行(包含)和 xyz(未包含)之间的所有行,请同时提供以下两项:
for key, data in table.scan(row_start=b'aaa', row_stop=b'xyz'):
print(key, data)
另一种方法是使用密钥前缀。例如,要循环访问以 abc 开头的所有行:
for key, data in table.scan(row_prefix=b'abc'):
print(key, data)
上面的扫描程序示例仅使用 row_start 、 row_stop 和 row_prefix 参数按行键限制结果,但扫描程序也可以将结果限制为某些列、列系列和时间戳,就像 Table.row()
和 Table.rows() 一
样。对于高级用户,可以将筛选器字符串作为筛选器参数传递。此外,可选的 limit 参数定义最多检索的数据量,batch_size 参数指定传输的块应有多大。 Table.scan()
API 文档提供了有关支持的扫描程序选项的详细信息。
操作数据 ¶
HBase 没有任何_数据类型_的概念; 所有行键、列名和列值都简单地视为原始字节字符串。
根据设计,HappyBase _不_执行任何自动字符串转换。这意味着在将数据传递到 HappyBase 之前,必须在应用程序中将数据转换为字节字符串,例如通过调用文本字符串(使用 Unicode),或者采用更高级的字符串序列化技术(如 )。有关此内容的更多详细信息,请查看 HBase 建模技术。请注意,HappyBase 使用的底层 Thrift 库将文本字符串自动编码为字节,但强烈建议不要依赖此 "功能",因为返回的数据不会自动解码,从而导致不对称,从而导致混淆行为。在应用程序代码中具有显式编码和解码步骤是正确的方法。s.encode('utf-8')``struct.pack()
在 HBase 中,所有变更要么存储数据,要么标记数据以进行删除; 没有就地更新或删除这样的事情。HappyBase 提供了执行单个插入或删除的方法,以及一个批量 API,用于一次性执行多个变更。
存储数据 ¶
要在表中存储单个数据单元格,我们可以使用 Table.put()
,它采用行键和要存储的数据。数据应该是将列名映射到值的字典:
table.put(b'row-key', {b'cf:col1': b'value1',
b'cf:col2': b'value2'})
如果要显式提供时间戳,请使用时间戳参数:
table.put(b'row-key', {b'cf:col1': b'value1'}, timestamp=123456789)
如果省略,HBase 默认为当前系统时间。
删除数据 ¶
Table.delete()
方法从表中删除数据。要删除完整的行,只需指定行键:
若要删除一个或多个列而不是完整行,还要指定 columns 参数:
table.delete(b'row-key', columns=[b'cf1:col1', b'cf1:col2'])
可选的时间戳参数将删除操作限制为最多达到指定时间戳的数据。
执行批量变更 ¶
Table.put()
和 Table.delete()
方法都会立即向 HBase Thrift 服务器发出命令。这意味着在存储或删除多个值时,使用这些方法不是很有效。聚合一堆命令并一次性将它们发送到服务器要高效得多。这正是 Batch 类
的内容,创造 d 使用 Table.batch()
,do。 Batch
实例具有 put 和 delete 方法,就像 Table
类一样,但更改使用 Batch.send()
在一次往返中发送到服务器:
b = table.batch()
b.put(b'row-key-1', {b'cf:col1': b'value1', b'cf:col2': b'value2'})
b.put(b'row-key-2', {b'cf:col2': b'value2', b'cf:col3': b'value3'})
b.put(b'row-key-3', {b'cf:col3': b'value3', b'cf:col4': b'value4'})
b.delete(b'row-key-4')
b.send()
注:在单个批处理中存储和删除同一行键的数据会导致不可预知的结果,因此不要这样做。
虽然 Batch
实例上的方法类似于 put()
和 delete()
方法,但它们不会为每个变更采用时间戳参数。相反,您可以为完整批处理指定单个时间戳参数:
b = table.batch(timestamp=123456789)
b.put(...)
b.delete(...)
b.send()
Batch
实例可以用作 上下文管理器 ,这与 Python 的 with
语句结合使用是最有用的。上面的示例可以简化为:
with table.batch() as b:
b.put(b'row-key-1', {b'cf:col1': b'value1', b'cf:col2': b'value2'})
b.put(b'row-key-2', {b'cf:col2': b'value2', b'cf:col3': b'value3'})
b.put(b'row-key-3', {b'cf:col3': b'value3', b'cf:col4': b'value4'})
b.delete(b'row-key-4')
如您所见,不再需要调用 Batch.send()
。 当with
代码块终止时,即使块中的某个位置出现错误,也会自动应用批处理,因此它的行为与子句基本相同。但是,某些应用程序需要事务行为,仅当未发生异常时才发送批处理。如果没有上下文管理器,这将如下所示:with``try/finally
b = table.batch()
try:
b.put(b'row-key-1', {b'cf:col1': b'value1', b'cf:col2': b'value2'})
b.put(b'row-key-2', {b'cf:col2': b'value2', b'cf:col3': b'value3'})
b.put(b'row-key-3', {b'cf:col3': b'value3', b'cf:col4': b'value4'})
b.delete(b'row-key-4')
raise ValueError("Something went wrong!")
except ValueError as e:
# error handling goes here; nothing will be sent to HBase
pass
else:
# no exceptions; send data
b.send()
使用块更容易获得相同的行为。 Table.batch()
的事务参数就是您所需要的:with
try:
with table.batch(transaction=True) as b:
b.put(b'row-key-1', {b'cf:col1': b'value1', b'cf:col2': b'value2'})
b.put(b'row-key-2', {b'cf:col2': b'value2', b'cf:col3': b'value3'})
b.put(b'row-key-3', {b'cf:col3': b'value3', b'cf:col4': b'value4'})
b.delete(b'row-key-4')
raise ValueError("Something went wrong!")
except ValueError:
# error handling goes here; nothing is sent to HBase
pass
# when no error occurred, the transaction succeeded
正如您可能已经想象的那样,Batch
会将所有变更保留在内存中,直到通过显式调用 Batch.send()
或块结束时发送批处理。这不适用于需要存储大量数据的应用程序,因为它可能导致批处理太大而无法在一次往返中发送,或者批处理使用太多内存。对于这些情况,可以指定 batch_size 参数。batch_size 充当阈值:Batch
实例在有多个挂起操作时自动发送所有挂起 batch_size 变更。例如,这将导致到服务器的三次往返(两个批次包含 1000 个单元格,一个批次具有剩余的 400 个单元格):with
with table.batch(batch_size=1000) as b:
for i in range(1200):
# this put() will result in two mutations (two cells)
b.put(b'row-%04d' % i, {
b'cf1:col1': b'v1',
b'cf1:col2': b'v2',
})
适当的 batch_size 非常特定于应用程序,因为它取决于数据大小,因此只需进行试验,了解不同的大小如何适用于您的特定用例。
使用原子计数器 ¶
Table.counter_inc()
和 Table.counter_dec()
方法允许原子增量和递减 8 字节宽的值,这些值被 HBase 解释为大端 64 位有符号整数。计数器在首次使用时自动初始化为 0。递增或递减计数器时,将返回修改后的值。例:
print(table.counter_inc(b'row-key', b'cf1:counter')) # prints 1
print(table.counter_inc(b'row-key', b'cf1:counter')) # prints 2
print(table.counter_inc(b'row-key', b'cf1:counter')) # prints 3
print(table.counter_dec(b'row-key', b'cf1:counter')) # prints 2
可选的 value 参数指定通过以下方式递增或递减的量:
print(table.counter_inc(b'row-key', b'cf1:counter', value=3)) # prints 5
虽然计数器通常与上面所示的递增和递减函数一起使用,但 Table.counter_get()
和 Table.counter_set()
方法可用于直接检索或设置计数器值:
print(table.counter_get(b'row-key', b'cf1:counter')) # prints 5
table.counter_set(b'row-key', b'cf1:counter', 12)
使用连接池 ¶
HappyBase 附带一个线程安全的连接池,允许多个线程共享和重用打开的连接。这在多线程服务器应用程序(例如使用 Apache 的 mod_wsgi 提供服务的 Web 应用程序)中最有用。当线程向池请求连接(使用 ConnectionPool.connection() )
时,将授予其租约,在此期间,线程对连接具有独占访问权限。使用连接完成线程后,它将连接返回到池,以便它可用于其他线程。
实例化池 ¶
该池由连接池
类提供。构造函数的 size 参数指定池中的连接数。其他参数将传递给Connection
构造函数:
pool = happybase.ConnectionPool(size=3, host='...', table_prefix='myproject')
实例化后,连接池将立即建立连接,以便立即检测到错误主机名等简单问题。对于剩余的连接,池会执行惰性操作:仅在需要时才会打开新连接。
获取连接 ¶
连接只能使用 Python 的上下文管理器协议获得,即使用语句中的代码块。这可确保连接在使用后实际返回到池。例:with
pool = happybase.ConnectionPool(size=3, host='...')
with pool.connection() as connection:
print(connection.tables())
警告
切勿在块结束后使用实例。即使变量仍在作用域内,连接可能已同时分配给另一个线程。connection``with
应尽快将连接返回到池中,以便其他线程可以使用它们。这意味着块中包含的代码量应保持在绝对最小值。在实践中,应用程序应该只加载块内的数据,并在块外处理数据:with``with``with
with pool.connection() as connection:
table = connection.table('table-name')
row = table.row(b'row-key')
process_data(row)
一个应用程序线程一次只能保存一个连接。当线程保持连接并再次请求连接时(例如,因为被调用的函数也从池中请求连接),将返回它已经持有的相同连接实例,因此这不需要应用程序进行任何协调。这意味着在以下示例中,对池的两个连接请求将返回完全相同的连接:
pool = happybase.ConnectionPool(size=3, host='...')
def do_something_else():
with pool.connection() as connection:
pass # use the connection here
with pool.connection() as connection:
# use the connection here, e.g.
print(connection.tables())
# call another function that uses a connection
do_something_else()
处理断开的连接 ¶
池会尝试检测断开的连接,并在连接返回到池时用新的连接替换这些连接。但是,连接池不会捕获引发的异常,也不会自动重试失败的操作。这意味着应用程序仍必须处理连接错误。
三. 今后的步骤
下一步是亲自尝试一下! API 文档可用作参考。