Essa é uma apresentação que fiz na iMasters em abril último, no evento 7Masters sobre python.
Não é um curso de python, nem nada próximo disso, mas apenas uma visão de que telecom é na verdade um grande TI, com aplicações que todos nós já conhecemos bem.
Era uma apresentação que deveria levar 7 minutos. Gastei 20.
UPDATE: 2021-12-23 troquei o arquivo embedded de flash pra um gif animado uma vez que nenhum browser sério ainda suporta swf.
Não é de hoje que me deparo com esses erros de unicode em python.
Em algumas funções que uso, principalmente as que lêem de timeline de redes sociais, tenho problemas com caracteres. Dessa vez eu estava fazendo um programa que lê a timeline da rede social da empresa, que é baseada num produto lixo da Microsoft, o ShamePoint, digo, SharePoint, que publica a API de mensagens no formato RSS. A função é pra gerar um gráfico de mensagens a cada 5 minutos para verificar o andamento da rede social (que passou do momento de hype). Então como o ShamePoint é limitado em suas funções, resolvi fazer um "tracking" de posts através de uma função com um hash MD5 da mensagem postada.
A idéia não é muito complexa, mas eis que fazendo o hash eu achei um:
helio@shibboleet:pynet$ ./mynet_tag_posts.py Getting posts Reading output and rss Traceback (most recent call last): File "./mynet_tag_posts.py", line 106, inmain() File "./mynet_tag_posts.py", line 103, in main RSS = RSS_parser(XML) File "./mynet_tag_posts.py", line 60, in RSS_parser key = md5sum(rss.summary_detail.value) File "./mynet_tag_posts.py", line 19, in md5sum m.update(msg) UnicodeEncodeError: 'ascii' codec can't encode character u'\xc5' in position 96: ordinal not in range(128)
E a função de hash MD5, a md5sum(), é super simples:
def md5sum(msg): m = hashlib.md5() m.update(msg) return m.hexdigest()
Só que mensagens com caracteres não ASCII, como em português ou suéco, simplesmente matam o processamento do MD5.
Em outros scripts eu contornei isso fazendo uma análise por caractere e substituindo todo aquele que tivesse valor maior que 128 por seu semelhante não acentuado na tabela ASCII. Trabalho tosco, mas funcional.
Dessa vez resolvi procurar como corrigir isso. E pra sempre. Procurei na documentação do python sobre tratamento de unicode.
E não é que achei a solução do jeito mais simples e pythonista possível? Basta forçar um atributo de formato de caractere no texto, como texto.encode(formato). No caso, mudei a função para essa abaixo, usando formato de caracteres "utf-8":
def md5sum(msg): m = hashlib.md5() msg = msg.encode('utf-8') m.update(msg) return m.hexdigest()
E o problema foi resolvido. Sem mais chororo de formatos iso-8859-1 e utf-8.
Ultimamente, monitorando meu encurtador eri.cx, notei que o mesmo estava recebendo muitos links sem nenhum clique. Olhando mais de perto pude ver que os mesmos eram relacionados à SPAM/vírus. Alguns robôs de Internet, ou mesmo máquinas zumbis, que adicionam esses links suspeitos.
Um dos agressores foi claramente o IP 213.5.66.88. Resolvi o problema adicionando-o em duas regras de firewall bem simples:
iptables -A INPUT -p tcp --dport 80 -s 213.5.66.88 -j LOG
iptables -A INPUT -p tcp --dport 80 -s 213.5.66.88 -j REJECT
Criei uma tabela chamada quarantine e migrei as entradas do banco para lá, manualmente. E também notifiquei os administradores do site sobre o ocorrido, enviando mail para abuse@site e security@site.
Achei que a solução era perfeita e que tinha resolvido meus problemas. Essa semana dei uma olhada novamente na base do encurtador. De 900 links inseridos, uns 350 eram desse tipo (malditos SPAMMERS!).
Chorei minhas mágoas no Twitter, bem no estilo "reclamar no Twitter muito", e recebi algumas sugestões para evitar esses ataques. Mesmo meu guru mental para idéias técno-copiadas, Eduardo Maçan, que administra o encurtador miud.in -- do qual chupinhei várias coisas, sugeriu uma verificação do encurtador via web através do refereer antes de encurtar qualquer link.
Pensei em fazer isso, mas tornaria impossível notificar os administradores dos sites afetados que suas máquinas estão comprometidas, e fora o problema de performance que poderia ocorrer ainda mais que o hosting que contratei tem somente 64 MB de RAM e CPU compartilhada num ambiente Xen. Então resolvi mudar a abordagem e simplesmente deixar as máquinas zumbis adicionarem as URLs "do mal". Mas uma vez por dia eu rodo um programa em Python que faz a busca de padrões desses vírus/scammers, move pra base do quarentine (assim ninguém clicará nos mesmos) mantendo as mesmas entradas, e comunica ao mails abuse@domínio e security@domínio sobre o ataque.
Então segue o script que fiz para esse fim.
#! /usr/bin/python
# -*- coding: utf-8 -*-
# $Id:$
# COPYRIGHT
#
# Copyright (c) 2011 by Helio A. L. Loureiro
#
# Permission to use, copy, modify, distribute, and sell this software
# for any purpose is hereby granted without fee, provided that the above
# copyright notice appear in all copies and that both that copyright
# notice and this permission notice appear in supporting documentation.
# No representations are made about the suitability of this software for
# any purpose. It is provided "as is" without express or implied
# warranty.
### main code
from os import popen
from re import sub, search
import smtplib
from time import ctime,strftime
mysqldb = "eri.cx"
mysqluser = "eri.cx"
mysqlpasswd = "xc.ire"
log_dir = "/var/log"
# Enviando mensagens
def Log(msg):
log_file = log_dir + "/report-sql-spam-" + \
strftime("%Y-%m") + ".log"
msg = msg.rstrip("\n")
msg = "[ " + ctime() + " ] " + msg + "\n"
log = open(log_file, "a")
log.write(msg)
log.close()
# Formatando o texto do mysql
def GenSQLCommand(sql):
cmd = "mysql -u %s -p\'%s\' %s << EOF" % (mysqluser, mysqlpasswd, mysqldb)
cmd = cmd + "\n" + sql + "\n" + "EOF\n"
return cmd
def RetrieveMySQL():
sql = """SELECT
yourls_url.keyword,
yourls_url.url,
yourls_url.title,
yourls_url.timestamp,
yourls_url.ip,
yourls_url.clicks
FROM yourls_url WHERE
yourls_url.title like '%dingo.ucsf.edu%' OR
yourls_url.title like '%cssa.grad.wustl.edu%' OR
yourls_url.title like '%wiki.usfca.edu%' OR
yourls_url.title like '%pantherfile.uwm.edu%' OR
yourls_url.title like '%csci.marietta.edu%' OR
yourls_url.title like '%webfiles.uci.edu%' OR
yourls_url.title like '%Ultram %' OR
yourls_url.title like '%Cialis%' OR
yourls_url.title like '%Tramadol%' OR
yourls_url.title like '%xanax%' OR
yourls_url.title like '%viagra%' ORDER BY yourls_url.keyword;
"""
cmd = GenSQLCommand(sql)
sql_result = popen(cmd).readlines()
sql_result = sql_result[1:]
return sql_result
def InsertQuarantine(db_lines):
sql = """INSERT INTO yourls_quarantine
(keyword, url, title,timestamp,ip,clicks) VALUES """
for e in db_lines:
sql = sql + "\n( "
e = e.rstrip("\n")
for d in e.split("\t"):
sql = sql + "\'" + d + "\', "
sql = sql[:-2] + " ),"
sql = sql[:-1]
cmd = GenSQLCommand(sql)
popen(cmd).read()
def DeleteEntries(keywords):
sql = """DELETE FROM yourls_url USING yourls_url WHERE """
sql = sql + "keyword = \"%s\"" % keywords[0]
for k in range(1, len(keywords)):
sql = sql + "OR keyword = \"%s\"" % keywords[k]
cmd = GenSQLCommand(sql)
popen(cmd).read()
def GetDomain(url):
url = sub("^http://", "", url)
url = sub("^https://", "", url)
url = sub("/.*", "", url)
return url
def SendMail(rcpts, msg):
fromaddr = "helio" + "@" + "domain.com"
mail = smtplib.SMTP('localhost')
mail.sendmail(fromaddr, rcpts, msg)
mail.quit()
def Main():
spam_list = RetrieveMySQL()
ATTACKED = {}
KEYWORDS = []
for spam in spam_list:
Log(sub("\t", ",", spam))
keyword, url, title, timestamp, ip, clicks = spam.split("\t")
domain = GetDomain(url)
KEYWORDS.append(keyword)
try:
ATTACKED[domain].append(spam)
except KeyError:
ATTACKED[domain] = [spam]
for poor_soul in ATTACKED:
rcpts = ["abuse@" + poor_soul,
"root@" + poor_soul,
"security@" + poor_soul]
msg = """From: Helio Loureiro <""" + "helio" + "@" + "domain.com" + """>
To: abuse@%s, root@%s, security@%s
Subject: Service abused from your domain
Hi,
I'm the sysadmin from domain eri.cx, a little poor shortener.
Recently I noticed some attacks (SPAM url shorteneds) coming from
your site.
Here follows the logs:
#timestamp|ip|title|url
""" % (poor_soul, poor_soul, poor_soul)
for spam in ATTACKED[poor_soul]:
keyword, url, title, timestamp, ip, clicks = spam.split("\t")
msg = msg + "%s|%s|%s|%s\n" % (timestamp, ip, title, url)
#print poor_soul, "=>", ATTACKED[poor_soul]
msg = msg + """
It seems your server was compromised. So please take actions accordingly.
This is an automatic mail report. My apologizes if it sent unwanted
information.
Best Regards,
Helio Loureiro
http://eri.cx
"""
#print msg
SendMail(rcpts, msg)
InsertQuarantine(spam_list)
DeleteEntries(KEYWORDS)
if __name__ == '__main__':
Log("== Starting... == ")
Main()
Log("## Finishing... ## ")
De tanto @eduardomacan e @ale_borba reclamarem dos meus posts repetitivos no twitter, aqui segue então mais uma dica com Python pra dar um refresco na repetitividade.
Tenho algumas máquina velhas na empresa, umas Sun Ultra 10, que uso pra compilar e testar código. Uma com Solaris 10 e outra com o 9 (ambas fazendo jus ao nome Slowlaris). Elas tinham IP fixo e ficavam num ambiente de laboratório, mas recentemente resolveram tentar *cobrar* o espaço físico e a energia elétrica, além da "administração do sistema". Resolvi então pegar ambas e as colocar próximas de mim, no ambiente de escritório.
O maior problema disso é que o ambiente disponibiliza IP somente por DHCP. Então como resolver a questão de saber qual o IP da máquina para fazer conexão via SSH, uma vez que o mesmo pode mudar a cada novo boot (e realmente muda por aqui)?
Como sempre, Python veio para salvar o dia. Uso o programa abaixo para enviar por mail qual o IP do servidor. É feito para Solaris, por isso algumas definições são específicas, como o uso da interface "hme0" e a localização do comando "ifconfig". Mas tudo pode ser adaptado facilmente para outra plataforma.
#! /usr/bin/python
import smtplib
from os import popen
from string import split
from re import search, sub
from socket import gethostname
from time import ctime
interface = "hme0"
ifconfig = "/usr/sbin/ifconfig"
mailserver = "mail.internal.domain.com"
def SendMail(rcpts, ip):
fromaddr = "helio@domain.com"
subject = "== Location update: %s (%s) == " % (ip, gethostname())
msg = "From: %s\nTo: " % fromaddr
for n in rcpts:
msg = msg + n + ","
msg = msg[:-1] + "\nSubject: %s" % subject
msg = msg + """
Location update from host %s: %s
%s
-- Powered by Python --
""" % (gethostname(), ip, ctime())
mail = smtplib.SMTP(mailserver)
#mail.set_debuglevel(1)
print "Sending:", msg
mail.sendmail(fromaddr, rcpts, msg)
mail.quit()
def GetIP():
for line in popen(ifconfig + " " + interface).readlines():
#print line
if search("inet", line):
line = line[:-1]
line = sub(".*inet ", "", line)
line = sub(" .*", "", line)
return line
def Main():
ip = GetIP()
rcpts = [ "helio@domain.com",
"billgates@msn.com",
"stallman@gnu.org"
]
SendMail(rcpts, ip)
if __name__ == '__main__':
Main()
Em seguida apenas adicionei o mesmo dentro do "/etc/init.d" e fiz o link em "/etc/rc3.d".
Após as partes 1 e 2 sobre como escrever uma aplicação com python-twitter, onde foi visto como instalar o pacote python-twitter ou via fonte ou via pacote Debian, e como registrar sua aplicação no Twitter, finalmente chegamos ao final: escrevendo a aplicação em python.
O objetivo de toda essa série de descrição de como utilizar uma aplicação de twitter-python era mostrar como fiz um script para automatizar o #FF, enviado às sexta-feiras.
Inicialmente escrevi o seguinte script que fazia exatamente o #FF para todos os meus seguidores:
#! /usr/bin/python
import twitter
# App python-tweeter
# https://dev.twitter.com/apps/815176
api = twitter.Api(
consumer_key = 'xxxxxxx',
consumer_secret = 'yyyyyyyy',
access_token_key = 'zzzzzzz',
access_token_secret = 'zzzzzzzz'
)
FOLLOWERS = []
users = api.GetFollowers()
for u in users:
sc_name = str(u.screen_name)
FOLLOWERS.append(['@' + sc_name])
SIZE = 140
MSG = "#FF"
i = 1
api.PostUpdate("Automated python-twitter-#FF mode=on")
for uid in FOLLOWERS:
print i, uid
if ( len(MSG + " " + uid) > SIZE):
print MSG
api.PostUpdate(MSG)
sleep(60)
MSG = "#FF " + uid
else:
MSG += " " + uid
i += 1
print MSG
api.PostUpdate(MSG)
Esse pequeno script mostrou uma limitação do uso do python-twitter 0.8.1: a lista de usuários era buscada somente com os último 100 seguidores. O demais não retornavam. Eu corrigi esse comportamento e lançei o "0.8.2", mas na verdade ainda não foi aprovado.
Basicamente o módulo twitter, ou classe, permite buscar todos os seguidores com o método GetFollowers(). Cada seguidor retornado volta como uma matriz (array) de seguidores, com seus parâmetros como atributo do objeto retornado. Então como exemplo um dos meus seguidores:
>>> print users[0]
{"description": "Expert in software development, photographer, open networker,
writer, handyman, old DJ+radiohost, idealist, creative, easily inspired,
very independent, spunky",
"favourites_count": 44,
"followers_count": 27800,
"friends_count": 30580,
"geo_enabled": true,
"id": 5377742,
"location": "Copenhagen, Denmark",
"name": "J\u00f8rgen Larsen",
"profile_background_color": "000000",
"profile_background_tile": true,
"profile_image_url":
"http://a0.twimg.com/profile_images/728474665/J_rgen_Larsen_org_nobackground_small_128x128_normal.png",
"profile_link_color": "000000",
"profile_sidebar_fill_color":
"http://a1.twimg.com/profile_background_images/78764489/background3.jpg",
"profile_text_color": "333333",
"protected": false,
"screen_name": "porcupinex",
"status": {"created_at": "Fri Apr 01 06:36:13 +0000 2011",
"favorited": false,
"id": 53707220474544128,
"source": "HootSuite",
"text": "RT @renejsum: RT @tinybuddha \"We must accept finite disappointment, but never lose infinite hope.\"
~Martin Luther King Jr. (Til @jesperdahl)",
"truncated": false},
"statuses_count": 1500,
"time_zone": "Copenhagen",
"url": "http://on.fb.me/porcupinex",
"utc_offset": 3600}
chamando cada user[] como follower (for follower in user:), é possível verificar seus atributos usando algo como follower.url, follow.screen_name e assim por diante. Em geral o que se vê no twitter é no formato @helioloureiro, que é uma referência ao atributo screen_name, esse sem o "@" originalmente.
Para publicar algo usando python-twitter, basta utilizar o método PostUpdate()
api.PostUpdate("Testando via twitter...")
Tudo isso já estaria de bom tamanho se não fossem duas coisas: o @ale_borba reclamou do flood de #FFs e realmente eu não queria mandar #FF para todo mundo, mas somente para quem realmente participa no Twitter. Como eu tenho a política de seguir de volta quem me segue, tem muitos "robôs" que eu gostaria de evitar. Então resolvi criar um grupo "amigos" e buscar somente os seguidores desse grupo:
Acabei escrevendo a busca de seguidores por grupo sem utlizar a API do python-twitter, com urllib2 e json:
lstname = 'Amigos'
AMIGOS = []
NOMES = {}
FOLLOWERS = {}
parameters = {}
parameters['cursor'] = -1
ic = 0
while (parameters['cursor'] != 0):
url = '%s/%s/%s/members.json' % (api.base_url, user,lstname)
json = api._FetchUrl(url, parameters=parameters)
data = simplejson.loads(json)
for u in data['users']:
AMIGOS.append(u)
parameters['cursor'] = int(data['next_cursor_str'])
if (ic == 10): break
ic += 1
Para evitar ainda um flood de mensagens (ou trollada), inclui ainda um randrange() pra criar uma aleatoriedade nos #FFs. Comecei com 50%, mas aumentei pra 33% (1/3). Também adicionei um belo sleep() para evitar o envio massivo de #FF, tornando um pouco mais suave as indicações.
O código final que utilizo atualmente é esse:
#! /usr/bin/python
import twitter
from time import sleep
from sys import exit
import simplejson
from random import randrange
# App python-tweeter
# https://dev.twitter.com/apps/815176
api = twitter.Api(
consumer_key = 'xxxxxx',
consumer_secret = 'yyyyyy',
access_token_key = 'zzzzzzz',
access_token_secret = 'kkkkkkkk'
)
user = 'helioloureiro'
lstname = 'Amigos'
AMIGOS = []
NOMES = {}
FOLLOWERS = {}
parameters = {}
parameters['cursor'] = -1
ic = 0
while (parameters['cursor'] != 0):
url = '%s/%s/%s/members.json' % (api.base_url, user,lstname)
json = api._FetchUrl(url, parameters=parameters)
data = simplejson.loads(json)
for u in data['users']:
AMIGOS.append(u)
parameters['cursor'] = int(data['next_cursor_str'])
if (ic == 10): break
ic += 1
for a in AMIGOS:
NOMES[a['screen_name']] = 1
users = api.GetFollowers()
for u in users:
status = ""
try:
if (NOMES[u.screen_name] == 1):
FOLLOWERS['@' + u.screen_name] = 1
status = "OK"
except:
status = "NOT"
print u.screen_name, status
SIZE = 140
MSG = "#FF"
i = 1
api.PostUpdates("Automated python-twitter-#FF mode=on")
for uid in FOLLOWERS.keys():
if not (randrange(0,2)):
print "Nao foi:", uid
continue
print i, uid
if ( len(MSG + " " + uid) > SIZE):
print MSG
api.PostUpdate(MSG)
sleep(60)
MSG = "#FF " + uid
else:
MSG += " " + uid
i += 1
sleep(60)
print MSG
api.PostUpdate(MSG)
api.PostUpdates("Python Twitter #rockz!!!")
api.PostUpdates("Automated python-twitter-#FF mode=off")
Com esse princípio de uso do python-twitter já criei várias aplicações, inclusive para re-enviar minhas publicações daqui e ainda utilizando o eri.cx para encurtar os link.