// trident
import java.io.IOException;
import java.util.StringTokenizer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.tuple.Fields;
import java.util.List;
import com.google.common.collect.Lists;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Sum;
import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.MemoryMapState;
import storm.trident.tuple.TridentTuple;
import storm.trident.spout.IBatchSpout;
import java.io.File;
import java.io.FileNotFoundException;
import java.util.Map;
import java.util.ArrayList;
import backtype.storm.task.TopologyContext;
import java.io.FileInputStream;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import storm.trident.operation.BaseFilter;
import java.io.FilenameFilter;
import java.lang.Integer;
import storm.trident.operation.CombinerAggregator;
import redis.clients.jedis.Jedis;
import java.util.Iterator;
import java.util.Set;
//id fake_sid adt aeid msgid regid
public class Storming {
static Jedis jedis = new Jedis("localhost");
/**
 * Trident function that splits one semicolon-separated line into its first
 * three columns.
 *
 * <p>Each column is emitted wrapped in its own single-element {@link Values}
 * list (not as a bare string). That shape is kept on purpose: MatchRedis in
 * this file reads the third field via {@code toString()} and strips the first
 * and last character of the result.
 */
public static class Splitter extends BaseFunction {
    /** Number of leading columns taken from every input line. */
    private static final int COLUMN_COUNT = 3;

    /**
     * Splits field 0 of {@code tuple} on ';' and emits the first three columns.
     *
     * <p>A {@code null} line, or a line that yields fewer than three columns,
     * is skipped without emitting anything. Note that {@code String.split}
     * drops trailing empty columns, so {@code "a;b;"} counts as two columns.
     *
     * @param tuple     input tuple whose field 0 is the raw line
     * @param collector receives one three-element tuple per well-formed line
     */
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String line = tuple.getString(0);
        if (line == null) {
            return;
        }
        String[] columns = line.split(";");
        if (columns.length < COLUMN_COUNT) {
            // Malformed record: indexing columns[1]/columns[2] would throw
            // ArrayIndexOutOfBoundsException and fail the whole batch.
            return;
        }
        List<Object> row = new ArrayList<Object>(COLUMN_COUNT);
        for (int i = 0; i < COLUMN_COUNT; i++) {
            row.add(new Values(columns[i]));
        }
        collector.emit(row);
    }
}
/**
 * Trident function that checks the words of a message against the Redis set
 * {@code "filter"} and emits a single string flag.
 *
 * <p>Emits {@code "true"} when no checked word is a member of the set and
 * {@code "false"} as soon as one is. Lookups go through the enclosing class's
 * static {@code jedis} client.
 * NOTE(review): that client is one shared connection; confirm this is safe
 * under the parallelism the topology runs with.
 */
public static class MatchRedis extends BaseFunction {
    /** Words of this length or shorter are never looked up. */
    private static final int MAX_SKIPPED_WORD_LENGTH = 3;

    /**
     * Reads field 2 of {@code tuple}, splits it on single spaces and looks up
     * every word longer than three characters in the Redis set.
     *
     * @param tuple     input tuple; field 2 carries the message text
     * @param collector receives exactly one value: {@code "true"} or {@code "false"}
     */
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String msg = tuple.getValues().get(2).toString();
        // Splitter emits the text wrapped in a one-element list, so toString()
        // yields "[text]"; drop the first and last character to remove the brackets.
        String[] words = msg.substring(1, msg.length() - 1).split(" ");

        boolean noMatch = true;
        for (String word : words) {
            if (word.length() > MAX_SKIPPED_WORD_LENGTH && jedis.sismember("filter", word)) {
                noMatch = false;
                // One hit decides the result; skip the remaining Redis round trips.
                break;
            }
        }
        collector.emit(new Values(noMatch ? "true" : "false"));
    }
}
/**
 * Debugging sink: writes the string form of every incoming tuple to standard
 * output. It never emits anything to the collector.
 */
public static class Printer extends BaseFunction {
    /**
     * Prints {@code tuple.toString()} followed by a line break.
     *
     * @param tuple     tuple to print
     * @param collector unused
     */
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        final String rendered = tuple.toString();
        System.out.println(rendered);
    }
}
/**
 * Batch spout that reads a fixed CSV file line by line and emits each line as
 * a one-field tuple.
 *
 * <p>The first call to {@link #emitBatch} after {@link #open} drains the
 * reader to end of file, so later batches emit nothing.
 */
public static class Batcher implements IBatchSpout {
    /** Output field names, as passed to the constructor. */
    Fields fields;
    /** Input file; assigned in {@link #open}. */
    File msglist;
    /** Reader over {@link #msglist}; {@code null} before open and after close. */
    BufferedReader reader;
    /** Most recently read line; {@code null} once end of file is reached. */
    String str;

    /**
     * @param fields output fields reported by {@link #getOutputFields}
     */
    public Batcher(Fields fields) {
        this.fields = fields;
    }

    /** @return the fields given to the constructor */
    @Override
    public Fields getOutputFields() {
        return fields;
    }

    /**
     * Opens the input file for reading.
     *
     * @throws IllegalStateException if the file cannot be opened; previously
     *         this failure was swallowed and the spout silently emitted nothing
     */
    @Override
    public void open(Map conf, TopologyContext context) {
        msglist = new File("/home/vladimir/trident/msglist.csv");
        try {
            reader = new BufferedReader(new InputStreamReader(new FileInputStream(msglist)));
        } catch (FileNotFoundException e) {
            throw new IllegalStateException("Cannot open message list " + msglist, e);
        }
    }

    /**
     * Emits every remaining line of the file as a single-value tuple.
     *
     * @param batchId   unused
     * @param collector receives one tuple per line
     * @throws IllegalStateException if reading the file fails
     */
    @Override
    public void emitBatch(long batchId, TridentCollector collector) {
        if (reader == null) {
            // Not opened, or already closed: nothing to emit.
            return;
        }
        try {
            while ((str = reader.readLine()) != null) {
                collector.emit(new Values(str));
            }
        } catch (IOException e) {
            throw new IllegalStateException("Failed reading message list " + msglist, e);
        }
    }

    /** Closes the underlying reader, releasing the file handle. */
    @Override
    public void close() {
        if (reader == null) {
            return;
        }
        try {
            reader.close();
        } catch (IOException e) {
            // The spout is going away; report the failure instead of aborting shutdown.
            System.err.println("Failed to close message list " + msglist + ": " + e);
        } finally {
            reader = null;
        }
    }

    /** No-op: this spout keeps no per-batch state to release. */
    @Override
    public void ack(long batchId) {
    }

    /** @return a configuration that caps this component's task parallelism at 1 */
    @Override
    public Map getComponentConfiguration() {
        Config conf = new Config();
        conf.setMaxTaskParallelism(1);
        return conf;
    }
}
/**
 * Wires the processing chain: Batcher spout, then Splitter, then MatchRedis,
 * then Printer.
 *
 * @param drpc accepted for signature compatibility; not referenced by this method
 * @return the built topology
 */
public static StormTopology buildTopology(LocalDRPC drpc) {
    // Field sets consumed and produced by each stage.
    final Fields rawLine = new Fields("line");
    final Fields parsedColumns = new Fields("innum", "outnum", "text");
    final Fields verdict = new Fields("spamflag");
    final Fields printerInput = new Fields("innum", "outnum", "text", "spamflag");
    final Fields printerOutput = new Fields("final");

    final TridentTopology topology = new TridentTopology();
    topology.newStream("spout", new Batcher(new Fields("line")))
            .each(rawLine, new Splitter(), parsedColumns)
            .each(parsedColumns, new MatchRedis(), verdict)
            .each(printerInput, new Printer(), printerOutput);
    return topology.build();
}
/**
 * Runs the "SpamFilter" topology on an in-process cluster for about 100
 * seconds, then shuts the cluster down.
 *
 * <p>When any command-line argument is supplied nothing is submitted.
 *
 * @param args command-line arguments; the topology runs only when empty
 * @throws Exception if submission fails or the wait is interrupted
 */
public static void main(String[] args) throws Exception {
    Config conf = new Config();
    conf.setMaxSpoutPending(20);
    if (args.length == 0) {
        LocalDRPC drpc = new LocalDRPC();
        try {
            LocalCluster cluster = new LocalCluster();
            try {
                cluster.submitTopology("SpamFilter", conf, buildTopology(drpc));
                for (int i = 0; i < 100; i++) {
                    Thread.sleep(1000);
                }
            } finally {
                // Stop the in-process cluster so its threads do not outlive the run.
                cluster.shutdown();
            }
        } finally {
            drpc.shutdown();
        }
    }
}
}