import sys
import random
import re
import operator
import os
import json
# Any non-zero integer passed as the first argument enables debug output
if len(sys.argv) > 1:
    DEBUG = int(sys.argv[1])
else:
    DEBUG = 0


class SpamFilter:
    """A naive Bayesian spam filter with optional auto-learning."""

    def __init__(self, options=None):
        self.ham = {}
        self.spam = {}
        self.common = {}
        self.svm_features = []
        self.email_data = []
        # Options for naive Bayesian
        self.cutoff = 0.75            # classify as spam above this probability
        self.count_threshold = 20     # minimum combined spam+ham count for a token
        self.size_threshold = 6       # minimum token length
        self.unique_threshold = 3     # minimum distinct characters in a token
        self.prob_threshold = 0.1     # minimum distance from 0.5 to be significant
        self.lower = False
        self.prob_spam = 0.45         # prior probability P(S)
        self.num_tokens = 15          # number of most significant tokens to combine
        # Auto-learning options
        self.auto_learn = True
        self.taoplus = 0.98           # re-train as exemplar spam above this probability
        self.taominus = 0.05          # re-train as exemplar ham below this probability
        # Stats
        self.exemplar_spam = 0
        self.exemplar_ham = 0
        self.false_positives = 0
        self.true_positives = 0
        self.false_negatives = 0
        self.true_negatives = 0

    def _get_tokens(self, content):
        """Treat dollar signs, apostrophes and dashes as part of words;
        everything else as a token separator."""
        # '-' sits last in the character class so it is matched literally
        tokens = re.findall(r"[\w$'-]+", content)
        tokens = [t.lower() for t in tokens]
        # Drop tokens that are too short or too repetitive to be informative
        tokens = [t for t in tokens
                  if len(t) >= self.size_threshold
                  and len(set(t)) >= self.unique_threshold]
        return tokens
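
    # Tokenization sketch (illustrative input only): with the default thresholds,
    #   _get_tokens("Win $100000 now - click here-today!")
    # keeps "$100000" (7 chars, 3 distinct) and "here-today" (dash retained),
    # while "win", "now" and "click" fall below size_threshold and are dropped.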

    def _parse_email(self, table, content):
        """ Parse email, adding tokens to appropriate dictionary """
        tokens = self._get_tokens(content)
        for t in tokens:
            if t not in table:
                table[t] = 0
            table[t] += 1
            if t not in self.common:
                self.common[t] = 0
            self.common[t] += 1

    def train_data(self, training_percent=33.0):
        """Randomly split the loaded emails into training and development sets,
        then build the spam and ham token tables from the training set."""
        N = len(self.email_data)
        training_indices = set(random.sample(range(N), int(N * training_percent / 100.0)))
        self.training = []
        self.development = []
        for i in range(N):
            if i in training_indices:
                self.training.append(self.email_data[i])
            else:
                self.development.append(self.email_data[i])
        for email in self.training:
            if email['isSpam']:
                # Add words to spam dictionary
                self._parse_email(self.spam, email['message'])
            else:
                # Add words to ham dictionary
                self._parse_email(self.ham, email['message'])

    def _classify_bayes_test(self, content):
        """
        When auto-learning is enabled, re-train while classifying on exemplar
        spam (> taoplus) or exemplar ham (< taominus) emails.
        """
        return self._classify_bayes(content, auto_learn=self.auto_learn)

    def _debug_content(self, content, pspam, plist):
        if DEBUG:
            print("%f: \n%s" % (pspam, "\n".join(map(str, plist[:self.num_tokens]))))
            print("Test: P(SPAM|email) = %f" % pspam)
            ans = input("View email (y/[N])? ")
            if ans in ('y', 'Y'):
                print(content)
            input('ok?')

    def _classify_bayes(self, content, auto_learn=False):
        """
        Calculates the probability an email is spam from the top N most
        significant words of its content:

        P(S|w1,..,wN) = P(w1|S)*...*P(wN|S) /
            [ P(w1|S)*...*P(wN|S) + (1-P(w1|S))*...*(1-P(wN|S)) ]
        """
        tokens = self._get_tokens(content)
        plist = []
        for t in set(tokens):
            pword = self.calculate_probability_spam(t)
            if pword:
                plist.append(pword)
        # Sort by most significant words (likely spam or ham)
        plist.sort(key=lambda p: abs(0.5 - p[1]), reverse=True)
        numerator = 1.0
        denominator_left = 1.0
        denominator_right = 1.0
        for i in range(min(self.num_tokens, len(plist))):
            numerator *= plist[i][1]
            denominator_left *= plist[i][1]
            denominator_right *= 1.0 - plist[i][1]
        pspam = numerator / (denominator_left + denominator_right)
        if auto_learn:
            # Auto-learn ('iterative') on very likely spam or very likely ham
            if pspam > self.taoplus:
                # Add words to spam dictionary
                self._parse_email(self.spam, content)
                self.exemplar_spam += 1
                self._debug_content(content, pspam, plist)
            elif pspam < self.taominus:
                # Add words to ham dictionary
                self._parse_email(self.ham, content)
                self.exemplar_ham += 1
                self._debug_content(content, pspam, plist)
        return 1 if pspam > self.cutoff else 0
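
    # A worked sketch of the combination rule above, with illustrative numbers:
    # for two tokens scoring P = 0.9 and P = 0.8,
    #   pspam = (0.9 * 0.8) / (0.9 * 0.8 + 0.1 * 0.2) = 0.72 / 0.74 ~ 0.973,
    # so several strong spam indicators agreeing pushes the score toward 1.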

    def calculate_probability_spam(self, token):
        """
        Calculate the Bayesian probability of spam given a token, P(S|w),
        for tokens meeting the minimum criteria:

        P(S|w) = P(w|S)*P(S) / [ P(w|S)*P(S) + P(w|H)*P(H) ]
        """
        if len(token) >= self.size_threshold and len(set(token)) >= self.unique_threshold:
            # Smooth unseen tokens with a count of 1 to avoid zero probabilities
            b = self.spam.get(token, 1)
            g = self.ham.get(token, 1)
            if b + g > self.count_threshold:
                numerator = b * 1.0 / len(self.spam) * self.prob_spam
                denominator = b * 1.0 / len(self.spam) * self.prob_spam + \
                    g * 1.0 / len(self.ham) * (1. - self.prob_spam)
                pspam_word = numerator / denominator
                if abs(0.5 - pspam_word) > self.prob_threshold:
                    return (token, pspam_word, b + g)
        return None
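
    # Illustrative numbers only: with b = 30, g = 2, equally sized spam and ham
    # tables and prob_spam = 0.45,
    #   P(S|w) = 30*0.45 / (30*0.45 + 2*0.55) = 13.5 / 14.6 ~ 0.92,
    # comfortably outside the 0.5 +/- prob_threshold band, so the token
    # counts as significant.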

    def print_strong_words(self):
        """ Prints the most significant words, indicating spam or not spam """
        strong_words = []
        for token in self.common:
            probability_spam = self.calculate_probability_spam(token)
            if probability_spam:
                strong_words.append(probability_spam)
        strong_words.sort(key=operator.itemgetter(1))
        print("\nTop words most likely to be spam:")
        for s in strong_words[:-11:-1]:
            print("%s = %f, %d occurrences" % (s[0], s[1], s[2]))
        print("\nTop words least likely to be spam:")
        for s in strong_words[:10]:
            print("%s = %f, %d occurrences" % (s[0], s[1], s[2]))

    def print_stats(self):
        self.num_spam = [i['isSpam'] for i in self.email_data].count(1)
        print("Read %d emails," % len(self.email_data), end=' ')
        print("{:.2%} spam".format(self.num_spam * 1.0 / len(self.email_data)))
        self.print_strong_words()
        print("\nVariables:")
        print("tao- = %f" % self.taominus)
        print("tao+ = %f" % self.taoplus)
        print("Prob(Spam) = %f" % self.prob_spam)
        print("Spam cutoff = %f" % self.cutoff)
        print("Count minimum = %d" % self.count_threshold)
        print("\n########################################################")
        print("Naive Bayes:")
        print("%d unique tokens" % len(self.common))
        # print("Training error: ", self._error(self.training, self._classify_bayes))
        # print("Development error: ", self._error(self.development,
        #                                          self._classify_bayes_test))
        print("Auto-learning on %d spam and %d ham" % (self.exemplar_spam, self.exemplar_ham))
        total = (self.false_positives + self.true_positives +
                 self.false_negatives + self.true_negatives)
        if total != 0:
            print("False positives = %.2f%%" % (self.false_positives * 100.0 / total))
            print("False negatives = %.2f%%" % (self.false_negatives * 100.0 / total))
        self.false_positives = self.true_positives = 0
        self.false_negatives = self.true_negatives = 0

    def read_train(self, train_dir):
        """Load every JSON email file from train_dir into self.email_data."""
        for f in os.listdir(train_dir):
            with open(os.path.join(train_dir, f), 'r') as fo:
                mail = json.load(fo)
                # Append the file's data to the combined list of emails
                self.email_data.append(mail)
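
    # Each JSON file is assumed (from the fields read here and in train_data)
    # to look like: {"isSpam": 1, "message": "full text of the email"}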


def main():
    sf = SpamFilter()
    # Read the folder of JSON files
    sf.read_train('jsondata')
    sf.train_data(training_percent=50)
    sf.print_stats()


if __name__ == "__main__":
    main()
    sys.exit()
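
# Usage sketch (the script filename is hypothetical; a 'jsondata' folder of
# JSON emails is expected alongside the script):
#   python spamfilter.py       # train on half the data and print stats
#   python spamfilter.py 1     # also enable the DEBUG prompts used by
#                              # _debug_content during auto-learning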