ReducerClass4 - old

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package org.dit;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
/**
 * Reducer that combines, for a single {@code viuserid} key, the rows flagged as
 * "table 1" (carrying a pair of syncs) with the rows not flagged as such
 * (carrying a user agent and hostname), and writes the result as HCatalog records
 * with the columns {@code viuserid}, {@code syncs}, {@code user_agent}, {@code hostname}.
 *
 * <p>The combination behaves like a full outer join within the key:
 * <ul>
 *   <li>both sides present: one record per (table-1 row, table-2 row) pair;</li>
 *   <li>only table-1 rows: one record per row with {@code user_agent} and
 *       {@code hostname} set to {@code null};</li>
 *   <li>only table-2 rows: one record per row with {@code syncs} set to {@code null}.</li>
 * </ul>
 */
public class ReducerClass4 extends Reducer<Text, ColumnsNew, WritableComparable, HCatRecord> {

    /** Number of columns in every emitted record. */
    private static final int FIELD_COUNT = 4;

    /** Output schema; built on first use and then shared by all reduce calls of this instance. */
    private HCatSchema schema;

    /** Immutable snapshot of the two fields read from a table-2 row. */
    private static final class AgentRow {
        private final String userAgent;
        private final String hostname;

        private AgentRow(String userAgent, String hostname) {
            this.userAgent = userAgent;
            this.hostname = hostname;
        }
    }

    /**
     * Joins the table-1 and table-2 rows received for {@code key} and writes the
     * resulting records with a {@code null} output key.
     *
     * @param key     the join key; its string form is written to the {@code viuserid} column
     * @param value   all rows grouped under {@code key}
     * @param context the task context used to emit records
     * @throws IOException          if building the schema, populating a record or writing fails
     * @throws InterruptedException if the write is interrupted
     */
    @Override
    protected void reduce(Text key, Iterable<ColumnsNew> value, Context context)
            throws IOException, InterruptedException {
        if (schema == null) {
            // The schema does not depend on the key, so build it only once.
            schema = buildOutputSchema();
        }

        // Hadoop reuses the same value instance while iterating, so keeping references
        // to `line` would leave every buffered entry pointing at the last row's data.
        // Copy the fields we need into immutable Strings right away instead.
        List<List<String>> syncRows = new ArrayList<List<String>>();
        List<AgentRow> agentRows = new ArrayList<AgentRow>();
        for (ColumnsNew line : value) {
            if (line.getTbl1().get()) {
                List<String> syncs = new ArrayList<String>(2);
                syncs.add(line.getSyncs().getFirst().toString());
                syncs.add(line.getSyncs().getSecond().toString());
                syncRows.add(syncs);
            } else {
                agentRows.add(new AgentRow(
                        line.getUser_agent().toString(),
                        line.getHostname().toString()));
            }
        }

        String viuserid = key.toString();
        // A single record is refilled before each write, as in the original code.
        HCatRecord record = new DefaultHCatRecord(FIELD_COUNT);

        if (syncRows.isEmpty()) {
            // Only table-2 rows (or no rows at all): no syncs to attach.
            for (AgentRow agent : agentRows) {
                emit(context, record, viuserid, null, agent.userAgent, agent.hostname);
            }
        } else if (agentRows.isEmpty()) {
            // Only table-1 rows: no user agent / hostname to attach.
            for (List<String> syncs : syncRows) {
                emit(context, record, viuserid, syncs, null, null);
            }
        } else {
            // Both sides present: emit the cross product.
            for (List<String> syncs : syncRows) {
                for (AgentRow agent : agentRows) {
                    emit(context, record, viuserid, syncs, agent.userAgent, agent.hostname);
                }
            }
        }
    }

    /** Fills {@code record} with the given column values and writes it with a null key. */
    private void emit(Context context, HCatRecord record, String viuserid,
            List<String> syncs, String userAgent, String hostname)
            throws IOException, InterruptedException {
        record.setString("viuserid", schema, viuserid);
        record.setList("syncs", schema, syncs);
        record.setString("user_agent", schema, userAgent);
        record.setString("hostname", schema, hostname);
        context.write(null, record);
    }

    /**
     * Builds the four-column output schema.
     *
     * <p>The {@code syncs} column is an ARRAY of strings. The three-argument
     * {@code HCatFieldSchema} constructor is understood to accept primitive types
     * only, so the array column is built with the constructor that takes a
     * sub-schema describing its element type.
     */
    private static HCatSchema buildOutputSchema() throws IOException {
        // Element schema of the array: a single unnamed STRING field.
        List<HCatFieldSchema> element = new ArrayList<HCatFieldSchema>(1);
        element.add(new HCatFieldSchema(null, HCatFieldSchema.Type.STRING, ""));
        HCatSchema elementSchema = new HCatSchema(element);

        List<HCatFieldSchema> columns = new ArrayList<HCatFieldSchema>(FIELD_COUNT);
        columns.add(new HCatFieldSchema("viuserid", HCatFieldSchema.Type.STRING, ""));
        columns.add(new HCatFieldSchema("syncs", HCatFieldSchema.Type.ARRAY, elementSchema, ""));
        columns.add(new HCatFieldSchema("user_agent", HCatFieldSchema.Type.STRING, ""));
        columns.add(new HCatFieldSchema("hostname", HCatFieldSchema.Type.STRING, ""));
        return new HCatSchema(columns);
    }
}