Threading UA

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
#!usr/bin/env python
#-*-encoding:UTF-8-*-
#==================<Імпортування необхідних модулів>====================
import urllib2
#Модуль для роботи з протоколом HTTP, високорівневий
import urllib
#Модуль для роботи з протоколом HTTP, більш низькорівневий ніж urllib2,
#фактично з нього необхідна одна функція - urllib.urlquote
import Queue
#Модуль, який представляє собою "Pool", фактично це список, в якому на
#потрібних місцях вставлені замки таким чином, щоб до нього одночасно
#міг звертатися лише один потік
import threading
#Модуль для работи з потоками, з нього знадобляться лише
#threading.active_count, threading.Thread, threading.Thread.start,
#threading.Rlock
import re
#Модуль для роботи з регулярними виразами, його використання виходить
#за межі статті
import time
#Модуль для роботи з часом, з нього потрібна лише функція sleep
queue = Queue.Queue()
#Обов`язкове присваювання, потрібно робити саме так (тобто імпортувати
#класс Queue з модуля Queue й ініціювати його)
#==================</Імпортування необхідних модулів>===================
#==============================<Налаштування>===========================
PROXY = "10.10.31.103:3128"
#Під час написання статті сиджу за проксі-сервером тому в статті торкаюся
#і цього питання, цим рядком обьявляєтся глобальна змінна PROXY, у якій
#знаходиться адреса проксі-сервера. Для роботи безпосередньо необхідно
#вказати значення None
HEADERS = {"User-Agent" : "Opera/9.64 (Windows NT 5.1; U; en) Presto/2.1.1",
"Accept" : "text/html, application/xml;q=0.9, application/xhtml+xml, image/ png, image/jpeg, image/gif, image/x-xbitmap, */*;q=0.1",
"Accept-Language" : "ru,uk-UA;q=0.9,uk;q=0.8,en;q=0.7",
"Accept-Charset" : "iso-8859-1, utf-8, utf-16, *;q=0.1",
"Accept-Encoding" : "identity, *;q=0",
"Connection" : "Keep-Alive"}
#Для того, щоб отримати сторінку www.google.com НЕОБХІДНО використовувати
#заголовки браузеру, вони представлені вище в асоціативному масиві HEADERS,
#відповідають реальним заголовкам браузеру Opera з маленькою модифікацією, ці
#заголовки означають що клієнт не може приймати zlib compressed data, тобто
#стислі дані - не хотів я морочитися ще і з разархивіроанієм сторінок, тим
#більш що не всі сайти їх стискують...
THREADS_COUNT = 10
#В принципі це все налаштування додатка, це-кількість потоків
DEEP = 30
#Це - значення, яке відповідає за глибину сторінок пошуку, які потрібно
#переглядати, фактично ж визначає собою кількість посилань, які будуть
#зібрані зпарсером.
ENCODING = "UTF-8"
#Кодування ваших файлів (необхідно для переводу даних з вашого текстового
#документу з запросами до гуглу у юнікод)
#==============================</Налаштування>==========================
LOCK = threading.RLock() #Ось тут те вперше і торкаємося модуля threading
#створюється обьект LOCK, який є классом threading.RLock з
#модулю threading, це -простий замок, який забороняє виконання
#декількома потоками ділянки коди який йде після виклику його методу
#acquire() Основною відмінністю threading.RLock від threading.Lock
#(теж класс з модуля threading) є те, що кожен потік може звертатися до
#об`єкту threading.RLock необмежену кількість разів, обьект
#threading.Lock може викликатися кожним потоком лише один раз.
#///////////////////////////////////////////////////////////////////////
def worker():
#Обьявленіє функції worker, вхідних аргументів немає
global queue
#Тут і надалі я буду обьявляти змінні з глобального простору
#імен в локальному для кращої читабельності коди, хоча в написанні
#софта таке робити дуже не рекомендую (!)
while True:
#Запуск нескінченного циклу, в якому відбуватиметься робота
try:
#Обробка помилок, блок try/except, коли обробиться помилка
#Queue.Empty це означає, що список завдань порожній, і потік повинен
#завершити свою роботу
target_link = queue.get_nowait()
#Ця строчка втілює собою здобуття завдання потоком з
#списку завдань queue
except Queue.Empty, error:
#Сам перехват помилки
return
#Завершення роботи функції
parsed_data = get_and_parse_page(target_link)
#Пізніше буде реалізована функція, яка отримуватиме сторінку
#і діставати з неї необхідні значення
if parsed_data != "ERROR":
#Перевірка на те, чи була отримана сторінка
write_to_file(parsed_data)
#Також буде реалізована функція для запису зібраних даних у файл
else:
queue.put(target_link)
#Якщо сторінка не була отримана, то додаємо її назад в queue
#///////////////////////////////////////////////////////////////////////
def write_to_file(parsed_data):
#Оголошення функції write_to_file, аргумент – масив даних для запису
global LOCK
global ENCODING
LOCK.acquire()
#"Накидання замку", наступна далі ділянка коду може виконуватися
#лише одним потоком в один і той же момент часу
with open("parsed_data.txt", "a") as out:
#Використовується with statement, відкривається файл parsed_data.txt з
#правами "a", що означає дозапис в кінець файлу, і присвоюється
#хендлеру на файл ім'я out (я так звик)
for site in parsed_data:
#Прохід циклом по всіх елементах parsed data, ім'я активного в
#даний момент елементу буде site
link, title = site[0], site[1]
#Привласнення змінним link і title значень з кортежу site
title = title.replace(u"<em>", "").replace(u"</em>", "").replace(u"<b>", "").replace(u"</b>", "")
#.replace -це заміна HTML-тегів, які проскакують в title
#і зовсім не потрібні
out.write(u"{link}|{title}\n".format(link=link, title=title).encode(ENCODING))
#Проходить сам запис у файл, використовується оператор
#форматування рядків .format, на відміну від % він підтримує
#іменовані аргументи, чим я і не забув скористатися
#таким чином у файл пишеться рядок вигляду: посилання на сайт |
#title сторінки\n -символ перенесення рядка(все це переводиться
#з юникода у cp1251)
LOCK.release()
# "Відмикання" замку, інакше жоден з наступних
#потоків не зможе працювати з цією ділянкою коду. По-хорошому, тут теж потрібно
#зробити обробку помилок, але це учбовий приклад, та і помилка там може
#виникнути (після добавки замку в цю ділянку коди) лише якщо в час
#роботи додатка виставити атрибут “лише читання” для даного користувача
#відносно файлу parsed_data.txt
#///////////////////////////////////////////////////////////////////////
def get_and_parse_page(target_link):
#Визначення функції, аргумент – посилання на сторінку
global PROXY
#Вказує на те, що в даній функції використовується змінна PROXY
#з глобального простору імен
global HEADERS
#Те ж саме і для змінної Headers
if PROXY is not None:
#Якщо значення PROXY не дорівнює None
proxy_handler = urllib2.ProxyHandler( { "http": "http://"+PROXY+"/" } )
#Створюється Проксі-хендлер з вказаним проксі
opener = urllib2.build_opener(proxy_handler)
#Далі створюється opener c створеним раніше Прокси-хендлером
urllib2.install_opener(opener)
#І нарешті він встановлюється, тепер немає необхідності в
#шаманствах, всі запити в яких використовуватиметься urllib2
#(в межах цієї функції прямуватимуть через вказаний раніше
#PROXY)
page_request = urllib2.Request(url=target_link, headers=HEADERS)
#Створюється обьект Request, який втілює собою Request
#instance, фактично це GET запит до сервера з вказаними
#параметрами, мені ж необхідно використовувати заголовки...
try:
#Обробка всіх можливих помилок, що виникають під час здобуття
#сторінки, це недобре, але краще ніж повна відсутність обробки
page = urllib2.urlopen(url=page_request).read().decode("UTF-8", "replace")
#Змінній page привласнюємо прочитане значення сторінки запиту
#переведене в unicode з кодування UTF-8 (кодування
#використовувана на www.google.com) (у Python 2.6 unicode -це
#окремий тип даних(!))
except Exception ,error:
#Саме перехоплення помилки і збереження її значення в змінну error
print str(error)
#Виведення помилки в консоль, перед тим перевівши її в строку
#(просто на всякий випадок)
return "ERROR"
#Повернення з функції в тому випадку, якщо під час роботи виникла помилка
harvested_data = re.findall(r'''\<li\ class\=g\>\<h3\ class\=r\>\<a\ href\=\"(.*?)".*?>(.*?)\<\/a\>\<\/h3\>''', page)
#Збір із сторінки пошуку посислань і title знайдених сторінок
#Очищення даних від результатів пошуку по картинкам, блогам і т.д. від гугла
for data in harvested_data:
#Для кожного елементу массиву harvested_data присвоюємо ім`я data
if data[0].startswith("/"):
#якщо нульовий елемент масиву data (посилання) починається з символу /
harvested_data.remove(data)
#Видаляємо його з массиву harvested_data
if ".google.com" in data[0]:
#якщо нульовий елемент масиву data (посилання) має в собі .google.com
harvested_data.remove(data)
#Також видаляємо його
return harvested_data
#Повертаємо зібрані значення з функції
#///////////////////////////////////////////////////////////////////////
def main():
#Оголошення функції, вхідних аргментов немає
print "STARTED"
#Вивід в консоль про початок процесу
global THREADS_COUNT
global DEEP
global ENCODING
#Оголошення про те що ці змінні використовуватимуться
#з глобального пространства імен
with open("requests.txt") as requests:
#Відкриваємо файл requests у якому знаходяться запити до пошукової системи
for request in requests:
#На даному файлхендлері доступний ітератор, тому можна
#пройтися по файлу циклом, без завантаження файл в оперативку, але це
#теж не важливо, я все одно його туди завантажу:)
request = request.translate(None, "\r\n").decode(ENCODING, "replace")
#Очищення запиту від символів кінця рядка а також переклад
#у юникод (з заміною конфліктних символів)
empty_link = "http://www.google.com/search?hl=ru&client=opera&rls=ru&hs=67v&q={request}&start={N}&sa=N"
#Це порожня адреса сторінки пошуку, відформатована
for i in xrange(0, DEEP, 10):
#Прохід ітератором по діапазону чисел від 0 до DEEP,
#який є максимальною глибиною пошуку з
#кроком в 10, тобто отримуємо з цього діапазону лише
#числа десятків, тобто 10, 20, 30 (як йде пошук в гугла)
queue.put(empty_link.format(request=request.encode("UTF-8"), N=i))
#Додавання в чергу кожного посилання, що згенеровано
#і переведення його у кодування utf-8(для гуглу)
for _ in xrange(THREADS_COUNT):
#Прохід циклом по діапазону чисел кількості потоків
thread_ = threading.Thread(target=worker)
#Створюється потік target-ім'я функції, яка являє собою
#ділянку коду, виконувану багатопоточно
thread_.start()
#Викликається метод start(), таким чином потік запускається
while threading.active_count() >1:
#До тих пір, поки кількість активних потоків більша 1 (значить
#запущені потоки продовжують роботу)
time.sleep(1)
#Основний потік засинає на 1 секунду
print "FINISHED"
#Вивід в консоль про завершення роботи додатка
#///////////////////////////////////////////////////////////////////////
if __name__ == "__main__":
main()