public class RowKeyRenameImporter extends TableMapper ImmutableBytesWr

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
public class RowKeyRenameImporter extends TableMapper<ImmutableBytesWritable, Mutation> {
private static final Log LOG = LogFactory.getLog(RowKeyRenameImporter.class);
public final static String WAL_DURABILITY = "import.wal.durability";
public final static String ROWKEY_RENAME_IMPL = "row.key.rename";
private List<UUID> clusterIds;
private Durability durability;
private RowKeyRename rowkeyRenameImpl;
/**
* @param row The current table row key.
* @param value The columns.
* @param context The current context.
* @throws IOException When something is broken with the data.
*/
@Override
public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException {
try {
writeResult(row, value, context);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void writeResult(ImmutableBytesWritable key, Result result, Context context)
throws IOException, InterruptedException {
Put put = null;
if (LOG.isTraceEnabled()) {
LOG.trace("Considering the row." + Bytes.toString(key.get(), key.getOffset(), key.getLength()));
}
processKV(key, result, context, put);
}
protected void processKV(ImmutableBytesWritable key, Result result, Context context, Put put)
throws IOException, InterruptedException {
LOG.info("Renaming the row " + key.toString());
ImmutableBytesWritable renameRowKey = rowkeyRenameImpl.rowKeyRename(key);
for (Cell kv : result.rawCells()) {
if (put == null) {
put = new Put(renameRowKey.get());
}
Cell renamedKV = convertKv(kv, renameRowKey);
addPutToKv(put, renamedKV);
if (put != null) {
if (durability != null) {
put.setDurability(durability);
}
put.setClusterIds(clusterIds);
context.write(key, put);
}
}
}
// helper: create a new KeyValue based on renaming of row Key
private static Cell convertKv(Cell kv, ImmutableBytesWritable renameRowKey) {
byte[] newCfName = CellUtil.cloneFamily(kv);
kv = new KeyValue(renameRowKey.get(), // row buffer
renameRowKey.getOffset(), // row offset
renameRowKey.getLength(), // row length
newCfName, // CF buffer
0, // CF offset
kv.getFamilyLength(), // CF length
kv.getQualifierArray(), // qualifier buffer
kv.getQualifierOffset(), // qualifier offset
kv.getQualifierLength(), // qualifier length
kv.getTimestamp(), // timestamp
KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type
kv.getValueArray(), // value buffer
kv.getValueOffset(), // value offset
kv.getValueLength()); // value length
return kv;
}
protected void addPutToKv(Put put, Cell kv) throws IOException {
put.add(kv);
}