import sys
import random
import re
import operator
import os
import json
# Debug flag taken from the first CLI argument; absent or non-numeric means off.
# BUGFIX: the original stored the raw argv string, so `script.py 0` produced the
# truthy string "0" and debug prompts fired anyway.
if len(sys.argv) > 1:
    try:
        DEBUG = int(sys.argv[1])
    except ValueError:
        DEBUG = 0
else:
    DEBUG = 0
class SpamFilter:
    """A naive Bayesian spam filter with optional auto-learning.

    Word-frequency tables are built from labelled emails (dicts with
    ``'isSpam'`` and ``'message'`` keys); new messages are classified with
    the classic combined token probability:

        P(S|w1..wN) = prod P(S|wi) / [prod P(S|wi) + prod (1 - P(S|wi))]
    """

    def __init__(self, options=None):
        # `options` is accepted for interface compatibility but currently unused.
        self.ham = {}            # token -> count seen in ham training emails
        self.spam = {}           # token -> count seen in spam training emails
        self.common = {}         # token -> count across all parsed emails
        self.svm_features = []
        self.email_data = []     # list of {'isSpam': 0/1, 'message': str} dicts
        # Options for naive Bayesian
        self.cutoff = 0.75           # classify as spam when P(S|email) > cutoff
        self.count_threshold = 20    # min combined spam+ham count for a token to vote
        self.size_threshold = 6      # min token length
        self.unique_threshold = 3    # min number of distinct characters in a token
        self.prob_threshold = 0.1    # token must deviate from neutral 0.5 by more than this
        self.lower = False           # NOTE(review): unused — tokens are always lowercased
        self.prob_spam = 0.45        # prior P(S)
        self.num_tokens = 15         # number of most significant tokens combined
        # Auto-learning options
        self.auto_learn = True
        self.taoplus = 0.98          # retrain as exemplar spam when P(S|email) > taoplus
        self.taominus = 0.05         # retrain as exemplar ham when P(S|email) < taominus
        # Stats
        self.exemplar_spam = 0
        self.exemplar_ham = 0
        self.false_positives = 0
        self.true_positives = 0
        self.false_negatives = 0
        self.true_negatives = 0

    def _get_tokens(self, content):
        """Tokenize `content`, treating dollar signs, apostrophes and dashes
        as part of words and everything else as a separator.

        Tokens shorter than `size_threshold` or with fewer than
        `unique_threshold` distinct characters are dropped.
        """
        # BUGFIX: the original pattern [\w$-\'] contained the character range
        # $-' (which also matched '%' and '&'); '-' must sit last in the class.
        tokens = [t.lower() for t in re.findall(r"[\w$'-]+", content)]
        # BUGFIX: the original looped with `del t`, which only unbinds the
        # loop variable and never removed anything from the list.
        return [t for t in tokens
                if len(t) >= self.size_threshold
                and len(set(t)) >= self.unique_threshold]

    def _parse_email(self, table, content):
        """Parse an email, adding token counts to `table` and `self.common`."""
        for t in self._get_tokens(content):
            table[t] = table.get(t, 0) + 1
            self.common[t] = self.common.get(t, 0) + 1

    def train_data(self, training_percent=33.0):
        """Randomly split `email_data` into training/development sets and
        build the spam/ham token tables from the training portion."""
        N = len(self.email_data)
        training_indices = set(random.sample(list(range(N)), int(N * training_percent / 100.0)))
        self.training = []
        self.development = []
        for i in range(N):
            if i in training_indices:
                self.training.append(self.email_data[i])
            else:
                self.development.append(self.email_data[i])
        for email in self.training:
            if email['isSpam']:
                # Add words to spam dictionary
                self._parse_email(self.spam, email['message'])
            else:
                # Add words to ham dictionary
                self._parse_email(self.ham, email['message'])

    def _classify_bayes_test(self, content):
        """Classify `content`; when auto-learning is enabled, exemplar spam
        (> taoplus) or ham (< taominus) emails are folded back into the
        training tables."""
        # BUGFIX: the original returned None whenever auto_learn was disabled.
        return self._classify_bayes(content, auto_learn=self.auto_learn)

    def _debug_content(self, content, pspam, plist):
        """When DEBUG is set, show the scored tokens and optionally the email."""
        if DEBUG:
            print("%f: \n%s" % (pspam, "\n".join(map(str, plist[:self.num_tokens]))))
            print("Test: P(SPAM|email) = %f" % pspam)
            ans = input("View email (y/[N])? ")
            if ans in ('y', 'Y'):
                print(content)
                input('ok?')

    def _classify_bayes(self, content, auto_learn=False):
        """Return 1 (spam) or 0 (ham) for `content`.

        Combines the top `num_tokens` most significant token probabilities:
        P(S|w1,w2,..wN) = P(w1|S)*P(w2|S)*...P(wN|S) /
            [ P(w1|S)*...*P(wN|S) + (1-P(w1|S))*...*(1-P(wN|S)) ]
        With no qualifying tokens the result is the neutral 0.5 (=> ham).
        """
        plist = []
        for t in set(self._get_tokens(content)):
            pword = self.calculate_probability_spam(t)
            if pword:
                plist.append(pword)
        # Most significant tokens first (furthest from the neutral 0.5).
        plist.sort(key=lambda p: abs(0.5 - p[1]), reverse=True)
        prod_spam = 1.0
        prod_ham = 1.0
        for _, p, _ in plist[:self.num_tokens]:
            prod_spam *= p
            prod_ham *= 1.0 - p
        pspam = prod_spam / (prod_spam + prod_ham)
        if auto_learn:
            # Auto-learn ('iterative') if very likely spam or very likely ham
            if pspam > self.taoplus:
                # Add words to spam dictionary
                self._parse_email(self.spam, content)
                self.exemplar_spam += 1
                self._debug_content(content, pspam, plist)
            elif pspam < self.taominus:
                # Add words to ham dictionary
                self._parse_email(self.ham, content)
                self.exemplar_ham += 1
                self._debug_content(content, pspam, plist)
        return 1 if pspam > self.cutoff else 0

    def calculate_probability_spam(self, token):
        """Return (token, P(S|token), combined_count) when the token meets
        the minimum criteria, else None.

        P(S|w) = P(w|S)*P(S) / [ P(w|S)*P(S) + P(w|H)*P(H) ]
        """
        if len(token) < self.size_threshold or len(set(token)) <= self.unique_threshold:
            return None
        # BUGFIX: guard against empty tables — the original divided by
        # len(self.spam)/len(self.ham) and could raise ZeroDivisionError.
        if not self.spam or not self.ham:
            return None
        b = self.spam.get(token, 1)   # unseen tokens get a pseudo-count of 1
        g = self.ham.get(token, 1)
        if b + g <= self.count_threshold:
            return None
        p_w_spam = b * 1.0 / len(self.spam)
        p_w_ham = g * 1.0 / len(self.ham)
        numerator = p_w_spam * self.prob_spam
        denominator = numerator + p_w_ham * (1.0 - self.prob_spam)
        pspam_word = numerator / denominator
        if abs(0.5 - pspam_word) > self.prob_threshold:
            return (token, pspam_word, b + g)
        return None

    def print_strong_words(self):
        """Print the most significant words, indicating spam or not spam."""
        strong_words = []
        for token in self.common:
            probability_spam = self.calculate_probability_spam(token)
            if probability_spam:
                strong_words.append(probability_spam)
        strong_words.sort(key=operator.itemgetter(1))
        print("\nTop words most likely to be spam:")
        for s in strong_words[:-10:-1]:
            print("%s = %f, %d occurrences" % (s[0], s[1], s[2]))
        print("\nTop words least likely to be spam:")
        for s in strong_words[:10]:
            # BUGFIX: "occurences" typo, now consistent with the line above.
            print("%s = %f, %d occurrences" % (s[0], s[1], s[2]))

    def print_stats(self):
        """Print dataset summary, strong words, parameters and error rates,
        then reset the confusion-matrix counters."""
        self.num_spam = [i['isSpam'] for i in self.email_data].count(1)
        print("Read %d emails," % len(self.email_data), end=' ')
        print("{:.2%} spam ".format(self.num_spam * 1.0 / len(self.email_data)))
        self.print_strong_words()
        print("\nVariables:")
        print("tao- = %f" % self.taominus)
        print("tao+ = %f" % self.taoplus)
        print("Prob(Spam) = %f" % self.prob_spam)
        print("Spam cutoff = %f" % self.cutoff)
        print("Count minimum = %d" % self.count_threshold)
        print("\n########################################################")
        print("Naive Bayes:")
        print("%d unique tokens" % len(self.common))
        print("Auto-learning on %d spam and %d ham" % (self.exemplar_spam, self.exemplar_ham))
        # NOTE(review): the denominator counts only negatives — looks like it
        # should be the total number of classified emails; confirm intent.
        summary = self.false_negatives + self.true_negatives
        if summary != 0:
            # BUGFIX: the values are fractions; scale by 100 for %% display.
            print("False positives = %.2f%%" % (100.0 * self.false_positives / summary))
            print("False negatives = %.2f%%" % (100.0 * self.false_negatives / summary))
        self.false_positives = self.true_positives = self.false_negatives = self.true_negatives = 0

    def read_train(self, train_dir):
        """Load every JSON email file in `train_dir` into `email_data`."""
        for f in os.listdir(train_dir):
            with open(os.path.join(train_dir, f), 'r') as fo:
                mail = json.load(fo)
            # Append this file's email record to the shared list.
            self.email_data.append(mail)
def main(data_dir='jsondata', training_percent=50):
    """Train the spam filter on JSON email files in `data_dir` and print stats.

    Parameters (backward-compatible defaults match the original behavior):
        data_dir: directory of JSON email files, one email per file.
        training_percent: share of emails used for training (0-100).
    """
    sf = SpamFilter()
    # Read the folder of JSON email files.
    sf.read_train(data_dir)
    sf.train_data(training_percent=training_percent)
    sf.print_stats()


if __name__ == "__main__":
    main()
    sys.exit()