# -*- coding: utf-8 -*-
from __future__ import absolute_import

import feedparser
from itertools import imap
import re
import logging

from source import atomstream
from util.html import convert_entities, strip_tags

RE_LINK = re.compile(r'<a [^>]+>', re.I)
RE_EN = re.compile(r'[a-z]', re.I)
RE_UPPER_EN = re.compile(r'[A-Z]')


def en_ratio(content):
    # Share of latin letters (a-z, case-insensitive) in the text.
    en_count = len(RE_EN.findall(content))
    length = len(content)
    if length:
        return round(en_count / (1.0 * length), 2)
    else:
        return 0


def check_en_ratio(html, text):
    # Reject text where less than 70% of the characters are latin letters.
    ratio = en_ratio(text)
    if ratio < 0.7:
        #logging.debug('Low en ratio %s' % ratio)
        return False
    else:
        return True


def find_freq(text):
    # Map each word to its frequency as a percentage of all words.
    fmap = {}
    total_count = 0
    for word in text.split():
        total_count += 1
        count = fmap.get(word, 0)
        count += 1
        fmap[word] = count
    for x, y in fmap.iteritems():
        fmap[x] = round((100 * y) / (1.0 * total_count), 2)
    return fmap


def check_freq(html, text):
    # Reject text where a single word makes up more than half of all words.
    freq_map = find_freq(text)
    spam_words = filter(lambda x: x[1] > 50, freq_map.items())
    if spam_words:
        #logging.debug('Spam words: %s' % ', '.join('%s=%d%%' % x for x in spam_words))
        return False
    else:
        return True


def upper_ratio(content):
    # Share of uppercase latin letters in the text.
    #count = len(filter(lambda x: u'A' <= x <= u'Z', content))
    count = len(RE_UPPER_EN.findall(content))
    length = len(content)
    if not length:
        return 0
    else:
        return round(count / (1.0 * length), 2)


def check_upper_ratio(html, text):
    # Reject text with almost no uppercase letters.
    ratio = upper_ratio(text)
    if ratio < 0.001:
        #logging.debug('Low upper ratio %s' % ratio)
        return False
    else:
        return True


def check_length(html, text):
    # Reject text shorter than 500 characters.
    length = len(text)
    if length < 500:
        #logging.debug('Low length %s' % length)
        return False
    else:
        return True


def link_count(html):
    # Number of <a ...> tags in the raw HTML.
    return len(RE_LINK.findall(html))


def check_link_count(html, text):
    # Reject entries with more than 15 links.
    count = link_count(html)
    if count > 15:
        #logging.debug('High number of links %s' % count)
        return False
    else:
        return True


checkers = [check_length, check_link_count, check_upper_ratio, check_en_ratio, check_freq]


def fetch_items(limit=5):
    # Yield up to `limit` feed entries that pass all spam checks.
    count = 0
    bad_count = 0
    for feed in imap(feedparser.parse, atomstream.connect()):
        if feed.entries:
            entry = feed.entries[0]
            content = convert_entities(entry.content[0]['value'])
            stripped = strip_tags(content)
            # If any checker returns False, the entry is counted as spam.
            bad = any(not func(content, stripped) for func in checkers)
            if bad:
                bad_count += 1
            else:
                logging.debug('[ %s ] %s / %s' % (bad_count, entry.title, entry.link))
                bad_count = 0
                count += 1
                item = {'title': entry.title,
                        'link': entry.link,
                        'content': content}
                yield item
                if count >= limit:
                    return
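

# Illustrative usage sketch (not part of the original module): drive the
# generator from a simple command-line entry point. The logging setup and the
# limit value are assumptions; the real consumer of fetch_items() may differ.
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    for item in fetch_items(limit=5):
        logging.info(u'%s -> %s', item['title'], item['link'])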