import java.io.IOException;
import java.util.StringTokenizer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.tuple.Fields;
import java.util.List;
import com.google.common.collect.Lists;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Sum;
import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.MemoryMapState;
import storm.trident.tuple.TridentTuple;
import storm.trident.spout.IBatchSpout;
import java.io.File;
import java.io.FileNotFoundException;
import java.util.Map;
import java.util.ArrayList;
import backtype.storm.task.TopologyContext;
import java.io.FileInputStream;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import storm.trident.operation.BaseFilter;
import java.io.FilenameFilter;
import java.lang.Integer;
import storm.trident.operation.CombinerAggregator;
import redis.clients.jedis.Jedis;
import java.util.Iterator;
import java.util.Set;
//id fake_sid adt aeid msgid regid
public class Storming {
// Redis client for host "localhost", created once when this class is initialized.
// MatchRedis.execute() queries the Redis set "filter" through this single shared instance.
// NOTE(review): a Jedis instance is documented as not safe for concurrent use —
// confirm MatchRedis never runs on more than one thread per JVM, or move to a JedisPool.
static Jedis jedis = new Jedis("localhost");
/**
 * Trident function that splits a semicolon-delimited line (input position 0)
 * and emits its first three columns as one tuple.
 *
 * <p>Each emitted column is wrapped in its own single-element {@link Values}
 * list instead of being emitted as a bare string. That shape is kept on
 * purpose: {@code MatchRedis} reads the third column through
 * {@code toString()} and strips the surrounding list brackets.
 *
 * <p>Lines that are {@code null} or contain fewer than three columns are
 * reported on standard error and skipped, so a single malformed record does
 * not abort processing with an {@link ArrayIndexOutOfBoundsException}.
 */
public static class Splitter extends BaseFunction{
    /** Number of leading ';'-separated columns that are emitted. */
    private static final int REQUIRED_COLUMNS = 3;

    /**
     * Splits the incoming line and emits its first three columns.
     *
     * @param tuple     input tuple; position 0 holds the raw line
     * @param collector receives one three-column tuple per well-formed line
     */
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector){
        String line = tuple.getString(0);
        if (line == null) {
            System.err.println("Splitter: skipping null line");
            return;
        }

        // Note: String.split drops trailing empty columns, so "a;b;" yields only two.
        String[] fields = line.split(";");
        if (fields.length < REQUIRED_COLUMNS) {
            System.err.println("Splitter: skipping line with fewer than "
                    + REQUIRED_COLUMNS + " ';'-separated columns: " + line);
            return;
        }

        List<Object> arr = new ArrayList<Object>(REQUIRED_COLUMNS);
        for (int i = 0; i < REQUIRED_COLUMNS; i++) {
            // One Values wrapper per column; see the class comment for why.
            arr.add(new Values(fields[i]));
        }
        collector.emit(arr);
    }
}
/**
 * Trident function that checks the words of a message against the Redis set
 * {@code "filter"} and emits a one-column tuple holding the string
 * {@code "true"} when no word matched, or {@code "false"} when at least one
 * word is a member of the set.
 *
 * <p>Only words longer than three characters are looked up. Words are obtained
 * by splitting the message text on single spaces.
 *
 * <p>NOTE(review): lookups go through the enclosing class's shared static
 * {@code jedis} connection; Jedis instances are not meant for concurrent use,
 * so confirm this function runs single-threaded per JVM.
 */
public static class MatchRedis extends BaseFunction{
    /**
     * Evaluates the message found at input position 2.
     *
     * @param tuple     input tuple; position 2 holds the message text, either as
     *                  a plain value or wrapped in a list
     * @param collector receives a single {@code "true"} / {@code "false"} value
     */
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector){
        Object rawText = tuple.getValues().get(2);

        // A missing message is treated as empty text rather than raising an NPE.
        String msg = (rawText == null) ? "" : rawText.toString();
        if (rawText instanceof List) {
            // A list renders as "[...]"; drop the brackets to get at the content.
            // A plain string is left untouched so its first/last characters survive.
            msg = msg.substring(1, msg.length() - 1);
        }

        boolean clean = true;
        for (String word : msg.split(" ")) {
            if (word.length() > 3 && jedis.sismember("filter", word)) {
                clean = false;
                // One hit decides the outcome; skip the remaining Redis round-trips.
                break;
            }
        }

        collector.emit(new Values(clean ? "true" : "false"));
    }
}
/**
 * Debugging stage that writes the string form of every incoming tuple to
 * standard output. It never emits anything to the collector.
 */
public static class Printer extends BaseFunction {
    /**
     * Prints the tuple.
     *
     * @param tuple     tuple to print
     * @param collector unused; nothing is emitted
     */
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        final String rendered = tuple.toString();
        System.out.println(rendered);
    }
}
/**
 * Trident batch spout that emits every line of a text file as a one-column tuple.
 *
 * <p>The file is opened in {@link #open} and read to the end by the first
 * {@link #emitBatch} call; later calls find the reader at end-of-file and emit
 * nothing. {@link #ack} is a no-op, so lines are not replayed.
 *
 * <p>Failures to open or read the file are reported on standard error instead
 * of being silently discarded; the spout then simply emits no (further) tuples.
 */
public static class Batcher implements IBatchSpout{
    /** File read when no explicit path is given to the constructor. */
    private static final String DEFAULT_PATH = "/home/vladimir/trident/msglist.csv";

    Fields fields;
    File msglist;
    // Created in open(); transient because an open stream cannot be serialized
    // together with the spout instance.
    transient BufferedReader reader;
    private final String path;

    /**
     * Creates a spout reading the default message list.
     *
     * @param fields output fields declared by this spout
     */
    public Batcher(Fields fields){
        this(fields, DEFAULT_PATH);
    }

    /**
     * Creates a spout reading the given file.
     *
     * @param fields output fields declared by this spout
     * @param path   path of the text file whose lines are emitted
     */
    public Batcher(Fields fields, String path){
        this.fields = fields;
        this.path = path;
    }

    /** @return the output fields passed to the constructor */
    @Override
    public Fields getOutputFields(){
        return fields;
    }

    /**
     * Opens the input file. If it cannot be opened, the problem is reported and
     * the reader stays {@code null}, which makes {@link #emitBatch} a no-op.
     *
     * @param conf    unused
     * @param context unused
     */
    @Override
    public void open(Map conf, TopologyContext context){
        msglist = new File(path);
        try{
            reader = new BufferedReader(new InputStreamReader(new FileInputStream(msglist)));
        }catch (FileNotFoundException e){
            reader = null;
            System.err.println("Batcher: cannot open " + msglist + ": " + e);
        }
    }

    /**
     * Emits one tuple per remaining line of the file.
     *
     * @param batchId   unused
     * @param collector receives one single-column tuple per line
     */
    @Override
    public void emitBatch(long batchId, TridentCollector collector){
        if (reader == null) {
            // open() failed (or close() already ran): nothing to read.
            return;
        }
        try{
            String line;
            while ((line = reader.readLine()) != null) {
                collector.emit(new Values(line));
            }
        }catch (IOException e){
            System.err.println("Batcher: error while reading " + msglist + ": " + e);
        }
    }

    /** Closes the input file, if it was opened. */
    @Override
    public void close(){
        if (reader == null) {
            return;
        }
        try{
            reader.close();
        }catch (IOException e){
            System.err.println("Batcher: error while closing " + msglist + ": " + e);
        }finally{
            reader = null;
        }
    }

    /** No-op: acknowledged batches need no bookkeeping here. */
    @Override
    public void ack(long batchId){}

    /** @return a configuration that caps this component's task parallelism at 1 */
    @Override
    public Map getComponentConfiguration(){
        Config conf = new Config();
        conf.setMaxTaskParallelism(1);
        return conf;
    }
}
/**
 * Assembles the Trident topology: a {@code Batcher} spout named "spout" feeds
 * lines into {@code Splitter}, whose three columns go through
 * {@code MatchRedis} (adding "spamflag") and finally into {@code Printer}.
 *
 * @param drpc not referenced by this method
 * @return the built Storm topology
 */
public static StormTopology buildTopology(LocalDRPC drpc) {
    final Fields rawLine = new Fields("line");
    final Fields parsed = new Fields("innum", "outnum", "text");
    final Fields flagged = new Fields("innum", "outnum", "text", "spamflag");

    final TridentTopology tridentTopology = new TridentTopology();
    tridentTopology
        .newStream("spout", new Batcher(rawLine))
        .each(rawLine, new Splitter(), parsed)
        .each(parsed, new MatchRedis(), new Fields("spamflag"))
        .each(flagged, new Printer(), new Fields("final"));

    return tridentTopology.build();
}
/**
 * Runs the "SpamFilter" topology on an in-process local cluster for 100
 * seconds and then shuts the cluster and the local DRPC server down.
 *
 * <p>Only the no-argument (local) mode is implemented; when arguments are
 * supplied nothing is submitted and a message is printed to standard error.
 *
 * @param args command-line arguments; must be empty to run the topology
 * @throws Exception if submitting the topology fails or the wait is interrupted
 */
public static void main(String[] args) throws Exception {
    Config conf = new Config();
    conf.setMaxSpoutPending(20);

    if (args.length != 0) {
        System.err.println("Storming: only local mode is implemented; run without arguments.");
        return;
    }

    final long runMillis = 100 * 1000L;

    LocalDRPC drpc = new LocalDRPC();
    try {
        LocalCluster cluster = new LocalCluster();
        try {
            cluster.submitTopology("SpamFilter", conf, buildTopology(drpc));
            // Give the topology time to work through the input before tearing down.
            Thread.sleep(runMillis);
        } finally {
            // Release the in-process cluster even if submission or the wait failed.
            cluster.shutdown();
        }
    } finally {
        drpc.shutdown();
    }
}
}