def insert_into_queue self table_name key data Вставка данных HBase че

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def insert_into_queue(self, table_name, key, data):
"""
Вставка данных в HBase через очередь HotQueue
:param table_name: string - имя таблицы
:param key: string - ключ строки
:param data: dict - словарь данных
:return: result: boolean - результат вставки
:raise: Exception
"""
if not config.QUEUE_ENABLED:
return False
# Убедимся, что соединение с очередью установлено
self.ensure_queue_connection()
# Подготовим данные ко вставке
task = {
'data': {},
'table': table_name,
'row_key': str(key),
}
for k, v in data.items():
k = 'info:%s' % k
v = "" if v is None else "%s" % v
task["data"][k] = v
try:
# Вставим запись в очередь задач
self.queue.put(task)
# Все хорошо
return True
except Exception, e:
request.logger.warning("[%s] Could not insert task into Queue. "
"Data is %s, error is %s" % (self.__class__.__name__, task, e)
)
return False
def ensure_queue_connection(self):
"""
Убедимся, что существует соединение с очередью
:return: queue: HotQueue - инстанс диспетчера очереди
"""
try:
self.queue
except:
if config.REDIS_UNIX_SOCKET:
self.queue = HotQueue(
config.QUEUE_NAME,
unix_socket_path=config.REDIS_UNIX_SOCKET
)
else:
self.queue = HotQueue(
config.QUEUE_NAME,
host=config.REDIS_HOST,
port=config.REDIS_PORT,
db=0
)
return self.queue