// Storming.java: skeleton Storm Trident topology (Batcher spout -> Splitter -> MatchRedis -> Printer).
import java.io.IOException;
import java.util.StringTokenizer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.tuple.Fields;
import java.util.List;
import com.google.common.collect.Lists;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Sum;
import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.MemoryMapState;
import storm.trident.tuple.TridentTuple;
import storm.trident.spout.IBatchSpout;
import java.io.File;
import java.io.FileNotFoundException;
import java.util.Map;
import backtype.storm.task.TopologyContext;
import java.io.FileInputStream;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import storm.trident.operation.BaseFilter;
import java.io.FilenameFilter;
import java.lang.Integer;
import storm.trident.operation.CombinerAggregator;
//id fake_sid adt aeid msgid regid
/**
 * Skeleton of a Storm Trident topology submitted under the name "SpamFilter".
 *
 * <p>The pipeline wired up by {@link #buildTopology(LocalDRPC)} is:
 * {@link Batcher} (field {@code line}) -> {@link Splitter} -> {@link MatchRedis}
 * -> {@link Printer}. All three functions are currently empty stubs, so nothing
 * is emitted past the first stage yet.
 */
public class Storming {

    /** How many one-second sleeps the local cluster is kept alive for in {@link #main}. */
    private static final int LOCAL_RUN_SECONDS = 100;

    /**
     * Stub function applied to the {@code line} field; the topology declares its
     * output fields as {@code innum}, {@code outnum} and {@code text}.
     *
     * <p>NOTE(review): presumably meant to split a raw line into those three
     * fields, but that is not implemented here. Confirm intended behavior.
     */
    public static class Splitter extends BaseFunction {
        /**
         * Does nothing: no tuple is emitted to {@code collector}.
         *
         * @param tuple     the input tuple (unused)
         * @param collector the collector results would be emitted to (unused)
         */
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            // Intentionally empty: not implemented yet.
        }
    }

    /**
     * Stub function applied to {@code innum}, {@code outnum} and {@code text}; the
     * topology declares one additional output field, {@code spamflag}.
     *
     * <p>NOTE(review): the name suggests a Redis lookup, but no such code exists
     * in this class. Confirm intended behavior.
     */
    public static class MatchRedis extends BaseFunction {
        /**
         * Does nothing: no tuple is emitted to {@code collector}.
         *
         * @param tuple     the input tuple (unused)
         * @param collector the collector results would be emitted to (unused)
         */
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            // Intentionally empty: not implemented yet.
        }
    }

    /**
     * Stub function applied to the four upstream fields; the topology declares its
     * output field as {@code final}.
     *
     * <p>NOTE(review): presumably meant to print each tuple. Not implemented.
     */
    public static class Printer extends BaseFunction {
        /**
         * Does nothing: no tuple is emitted to {@code collector}.
         *
         * @param tuple     the input tuple (unused)
         * @param collector the collector results would be emitted to (unused)
         */
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            // Intentionally empty: not implemented yet.
        }
    }

    /**
     * Batch spout stub. It reports the output fields it was constructed with but
     * does not emit any tuples yet.
     */
    public static class Batcher implements IBatchSpout {

        /** Output fields reported by {@link #getOutputFields()}. */
        private final Fields fields;

        /**
         * @param fields the output fields this spout declares
         */
        public Batcher(Fields fields) {
            this.fields = fields;
        }

        /** @return the fields passed to the constructor */
        @Override
        public Fields getOutputFields() {
            return fields;
        }

        /**
         * No-op: this stub acquires no resources.
         *
         * @param conf    the configuration map (unused)
         * @param context the topology context (unused)
         */
        @Override
        public void open(Map conf, TopologyContext context) {
        }

        /**
         * Prints a trace line and returns without emitting any tuples.
         *
         * <p>The method must return so the caller can proceed; an unbounded loop
         * here would never hand control back and would flood stdout.
         *
         * @param batchId   id of the batch being requested (unused)
         * @param collector the collector tuples would be emitted to (unused)
         */
        @Override
        public void emitBatch(long batchId, TridentCollector collector) {
            System.out.println("in emitbatch");
        }

        /** No-op: nothing to release. */
        @Override
        public void close() {
        }

        /**
         * No-op: batches are not tracked by this stub.
         *
         * @param batchId id of the acknowledged batch (unused)
         */
        @Override
        public void ack(long batchId) {
        }

        /**
         * @return a fresh {@link Config} with max task parallelism set to 1
         */
        @Override
        public Map getComponentConfiguration() {
            Config conf = new Config();
            conf.setMaxTaskParallelism(1);
            return conf;
        }
    }

    /**
     * Builds the Trident topology: a {@link Batcher} spout named {@code "spout"}
     * declaring field {@code line}, followed by {@link Splitter},
     * {@link MatchRedis} and {@link Printer} applied in that order.
     *
     * @param drpc a local DRPC handle; currently not used by the topology
     * @return the built Storm topology
     */
    public static StormTopology buildTopology(LocalDRPC drpc) {
        TridentTopology topology = new TridentTopology();
        topology.newStream("spout", new Batcher(new Fields("line")))
                .each(new Fields("line"),
                        new Splitter(),
                        new Fields("innum", "outnum", "text"))
                .each(new Fields("innum", "outnum", "text"),
                        new MatchRedis(),
                        new Fields("innum", "outnum", "text", "spamflag"))
                .each(new Fields("innum", "outnum", "text", "spamflag"),
                        new Printer(),
                        new Fields("final"));
        return topology.build();
    }

    /**
     * Runs the topology on an in-process cluster for {@value #LOCAL_RUN_SECONDS}
     * seconds when invoked with no arguments, then shuts the cluster down.
     *
     * <p>NOTE(review): when arguments are supplied nothing is submitted. The
     * unused {@code StormSubmitter} import suggests a remote-submit branch was
     * planned. Confirm before adding one.
     *
     * @param args command-line arguments; local mode runs only when empty
     * @throws Exception if submission fails or the waiting thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setMaxSpoutPending(20);
        if (args.length != 0) {
            return;
        }

        LocalDRPC drpc = new LocalDRPC();
        try {
            LocalCluster cluster = new LocalCluster();
            try {
                cluster.submitTopology("SpamFilter", conf, buildTopology(drpc));
                for (int i = 0; i < LOCAL_RUN_SECONDS; i++) {
                    Thread.sleep(1000);
                }
            } finally {
                // Release the in-process cluster even if submission or the wait fails.
                cluster.shutdown();
            }
        } finally {
            drpc.shutdown();
        }
    }
}