Reusando o código que escrevi pra tirar snapshots durante a PyConSe e publicar automaticamente no Twitter, escrevi um pequeno aplicativo pra raspberypi com Python pra pegar o mesmo tipo de imagem, mas da minha janela, e ir acompanhando a evolução do tempo ao longo do dia e do ano. Essa é a imagem que ilustra o início do post.
Acho que será legal fazer uma animação das imagens mostrando o sol que brilha até quase 11 da noite, o inverno que escurece às 2 da tarde, e a neve chegando. E tudo postando no Twitter.
As ferramentas são as mais simples possível: um raspberrypi conectado com um dongle wifi e uma webcam USB creative (que aliás uso pra participar dos hangouts). E sempre Python pra fazer tudo.
Descobri que o Forecast.IO fornece uma API com JSON pra buscar a previsão do tempo atual e até 10 dias, com permissão de 1000 queries por dia de forma gratuita. Perfeito pro meu pequeno projeto. O mais difícil foi fazer a conversão da temperatura de Farenheit pra Celsius (meus dias de vestibulando já se foram faz muito tempo), mas pedi ajuda à Internet pra isso. Fiz uma pequena função que retorna os dados que quero em forma de um array.
import requests import json import time """ Um monte de código por aqui [...] """" def get_content(): timestamp = time.strftime("Date: %Y-%m-%d %H:%M", time.localtime()) msg = [] msg.append("Stockholm") msg.append(timestamp) url = "https://api.forecast.io/forecast/%s/%s" % (wth_key, wth_loc) req = requests.get(url) jdata = json.loads(req.text) summary = jdata["currently"]["summary"] temp = jdata["currently"]["temperature"] temp = Far2Celsius(temp) msg.append(u"Temperature: %s°C" % temp) msg.append("Summary: %s" %summary) return msg
A primeira coisa que precisei alterar foi a adição de textos à imagem. Tendo a informação vinda do Forecast.IO, eu precisava modificar a imagem pra que ela aparecesse. No início eu usei uma fonte de cor branca, mas logo percebi que preto ficava com um contraste melhor. Mas quando chegar o inverno, época em que os dias são realmente muito curtos por aqui, vou precisar pensar numa forma pra trocar para branco. Mas no momento usei as bibliotecas do PIL que manipulam imagem em Python.
import Image import ImageFont, ImageDraw, ImageOps IMGSIZE = (1280, 720) BLACK = (0, 0, 0) WHITE = (255, 255, 255) """ Um monte de código por aqui [...] """" def WeatherScreenshot(): msg = get_content() if not msg: msg = "Just another shot at %s" % \ time.strftime("%H:%M", time.localtime()) if msg: msg_body = "\n".join(msg[1:]) im = Image.open(filename) # just get truetype fonts on package ttf-mscorefonts-installer try: f_top = ImageFont.truetype(font="Arial", size=60) except TypeError: # older versions hasn't font and require full path arialpath = "/usr/share/fonts/truetype/msttcorefonts/Arial.ttf" f_top = ImageFont.truetype(arialpath, size=60) try: f_body = ImageFont.truetype(font="Arial", size=20) except TypeError: # older versions hasn't font and require full path arialpath = "/usr/share/fonts/truetype/msttcorefonts/Arial.ttf" f_body = ImageFont.truetype(arialpath, size=20) txt = Image.new('L', IMGSIZE) d = ImageDraw.Draw(txt) d.text( (10, 10), msg[0], font=f_top, fill=255) position = 80 for m in msg[1:]: d.text( (10, position), m, font=f_body, fill=255) position += 20 w = txt.rotate(0, expand=1) im.paste(ImageOps.colorize(w, BLACK, BLACK), (0,0), w) im.save(filename)
descobri que a versão de raspbian que estou usando, baseado em Debian Wheezy, tem uma API um pouco diferente e pode precisar que a fonte com o path completo seja passada no argumento.
Outra alteração foi mudar a chamada pra webcam capturar a imagem que era uma função mas modifiquei pra uma thread. Assim o tempo fica consistente. Do contrário ao invés de mostrar 12:00 apareceria algo como 12:03 (o tempo pra adquirir a imagem).
import threading def WeatherScreenshot(): th = threading.Thread(target=GetPhoto) th.start() msg = get_content() th.join()
E já que mencionei a imagem, esse foi o maior problema até agora. Descobri que não existe uma forma muito confiável de inicializar a webcam. Às vezes ela adquiri a imagem de forma bonitinha, às vezes fica super exposta, outras vezes sub.
E não tem nada que dê um feedback sobre a qualidade. Li vários artigos com dicas de uso com pygame, que é a forma que uso, e com opencv também, mas todas com o mesmo princípio. Basicamente fazem um start() no framework da webcam, que inicializa a webcam, adquirem um número de imagens aleatórios (alguns dizem 30) e esperam pelo melhor ao capturar a imagem. Nada que retorne um indicador de qualidade. Nada.
DISCARDFRAMES = 2 * 30 def GetPhoto(): filename = None pygame.init() pygame.camera.init() elif os.path.exists("/dev/video0"): device = "/dev/video0" if not device: print "Not webcam found. Aborting..." sys.exit(1) # you can get your camera resolution by command "uvcdynctrl -f" cam = pygame.camera.Camera(device, IMGSIZE) cam.start() time.sleep(3) counter = 10 while counter: if cam.query_image(): break time.sleep(1) counter -= 1 # idea from https://codeplasma.com/2012/12/03/getting-webcam-images-with-python-and-opencv-2-for-real-this-time/ # get a set of pictures to be discarded and adjust camera for x in xrange(DISCARDFRAMES): while not cam.query_image(): time.sleep(1) image = cam.get_image() image = cam.get_image()
Basicamente um método de tentativa e erro. Por isso que iniciei a chamada à webcam como thread. Como as webcams USB tem CPU própria, não tem - até onde pesquisei - uma API confiável pra verificar se o balanço de branco normalizou antes de capturar a imagem. Só retornam a própria imagem. Tosco.
Então resolvi fazer um outro script como módulo, que basicamente mapeia toda a imagem em seu tamanho e cria um dicionário do tipo "COR: quantas vezes". Descobri que valores RGB (pega o valor de R + G + B, soma e divide por 3 pra ter a média) acima de 235 já indicam super exposição. Não só isso. Como eu conto a quantidade que aquele valor RGB aparece, sempre que um valor sobressai acima de 15% do total, já indica uma imagem ruim. Não é um dos melhores métodos científicos, mas tem funcionando bem (verifiquei nas imagens já adquiridas e salvas). Os tempos de aquisição de imagem mudaram de até 1 minuto pra em torno de 10 minutos. Mas por enquanto com qualidade muito melhor.
import Image def brightness(filename): """ source: http://stackoverflow.com/questions/6442118/python-measuring-pixel-brightness """ img = Image.open(filename) #Convert the image te RGB if it is a .gif for example img = img.convert ('RGB') RANK = {} #coordinates of the pixel X_i,Y_i = 0,0 (X_f, Y_f) = img.size #Get RGB for i in xrange(X_i, X_f): for j in xrange(Y_i, Y_f): #print "i:", i,",j:", j pixelRGB = img.getpixel((i,j)) R,G,B = pixelRGB br = sum([R,G,B])/ 3 ## 0 is dark (black) and 255 is bright (white) if RANK.has_key(br): RANK[br] += 1 else: RANK[br] = 1 color_order = [] pic_size = X_f * Y_f print "Picture size:", pic_size for k in sorted(RANK, key=RANK.get, reverse=True): amount = RANK[k] # if low than 15%, ignore if amount < (.15 * pic_size): continue print k, "=>", RANK[k] color_order.append(k) if color_order: print color_order return -1 return 0
O código todo está disponível no meu github.
https://github.com/helioloureiro/snapshot-twitter
E provavelmente devo lançar um gif animado posteriormente com o decorrer do clima ao longo do ano.
Tenho alguns problemas como concorrência no caso de tentar adquirir uma imagem ao mesmo tempo que a crontab tentar fazer isso (implementei uma API em REST pra isso, mas não é algo pra publicar :). Devo implementar algum tipo de lock usando /tmp, mas algo simples.
E agora no verão, com sol até quase 11 horas da noite, tenho também um pequeno problema de negação de serviço que às vezes acontece.
Ainda não descobri um módulo em Python pra mitigar isso :)
Meu novo queridinho de programação é um raspberrypi. Tenho feito coisas interessantes com ele usando Python. E em breve teremos a PyCon Sweden acontecendo por aqui. Então resolvi criar um robôzinho de twitter pra postar snapshots da apresentação. Pretendo colocar minha webcam externa nele (uso pros hangouts) e deixar ele pegando as imagens da conferência e postando.
No script a mensagem é estática, mas eu pretendo alterar para algo que pegue uma lista com horários, nomes e temas pra deixar tudo junto na postagem. Vai ficar interessante.
O código parcialmente feito, que veio o obamawatch, é esse aqui:
#! /usr/bin/python -u # -*- coding: utf-8 -*- """ Based in: http://stackoverflow.com/questions/15870619/python-webcam-http-streaming-and-image-capture """ SAVEDIR = "/tmp" import pygame import pygame.camera import time import sys import os import twitter import ConfigParser configuration = "/home/helio/.twitterc" def TweetPhoto(): """ """ print "Pygame init" pygame.init() print "Camera init" pygame.camera.init() # you can get your camera resolution by command "uvcdynctrl -f" cam = pygame.camera.Camera("/dev/video1", (1280, 720)) print "Camera start" cam.start() time.sleep(1) print "Getting image" image = cam.get_image() time.sleep(1) print "Camera stop" cam.stop() timestamp = time.strftime("%Y-%m-%d_%H%M%S", time.localtime()) year = time.strftime("%Y", time.localtime()) filename = "%s/%s.jpg" % (SAVEDIR, timestamp) print "Saving file %s" % filename pygame.image.save(image, filename) cfg = ConfigParser.ConfigParser() print "Reading configuration: %s" % configuration if not os.path.exists(configuration): print "Failed to find configuration file %s" % configuration sys.exit(1) cfg.read(configuration) cons_key = cfg.get("TWITTER", "CONS_KEY") cons_sec = cfg.get("TWITTER", "CONS_SEC") acc_key = cfg.get("TWITTER", "ACC_KEY") acc_sec = cfg.get("TWITTER", "ACC_SEC") print "Autenticating in Twitter" # App python-tweeter # https://dev.twitter.com/apps/815176 tw = twitter.Api( consumer_key = cons_key, consumer_secret = cons_sec, access_token_key = acc_key, access_token_secret = acc_sec ) print "Posting..." tw.PostMedia(status = "Testing python twitter and PostMedia() for #pyconse timestamp=%s" % timestamp, media = filename) print "Removing media file %s" % filename os.unlink(filename) if __name__ == '__main__': try: TweetPhoto() except KeyboardInterrupt: sys.exit(0)
Acabei usando /dev/video1 pois estava testando a webcam no laptop, que já tem outra webcam interna e quando rebootei com a câmera externa, acabou jogando a dele pra esse device.
Outra melhoria que implementei foi a de mover os tokens de autenticação pra um arquivo externo e ler via ConfigParser(). Assim fica mais limpo o código e possível de enviar pro github (e sem mandar suas chaves privadas junto :).
Vou descrever aqui mais uma dica de uso que um processo ou ferramenta. Como faz vários anos que programo em python, em certo ponto achei razoável adicionar uma variável e parâmetro pra debug. Então todos meu programas em python em geral tem uma estrutura mais ou menos assim:
#! /usr/bin/python
def debug(msg):
print "DEBUG: %s" % msg
class MinhaClasse:
código
código
código
if __name__ == '__main__':
o = MinhaClasse()
o.main()
Então o que faço em geral é ter uma função debug(), mesmo que use classe e orientação à objetos, pra facilitar a chamada. Porque eu uso a função? Se eu usar como método dentro da classe, tem de chamar toda vez como self.debug(). Como não vejo muita vantagem nisso, prefiro definir sempre como função no topo do código.
Mas esse é um exemplo pra mostrar o princípio. O que uso é um pouco mais elaborado que isso. Vamos melhorar esse código pra entender melhor criando alguns métodos como __init__() e main().
#! /usr/bin/python
import getopt
DEBUG = False
def debug(msg):
if DEBUG:
print "DEBUG: %s" % msg
class MinhaClasse:
def __init__(self):
debug("Construtor da classe")
def fazalgo(self):
debug("Fazendo algo")
def main(self):
debug("Chamando main")
self.fazalgo()
if __name__ == '__main__':
try: opts, args = getopt.getopt(sys.argv[1:], "d") for opt, arg in opts: if opt == "-d": DEBUG = True debug("DEBUG ENABLED") except getopt.GetoptError: pass
if os.environ.has_key("DEBUG"): DEBUG = True
o = MinhaClasse()
o.main()
Primeiramente eu adicione algum tipo de verificação de opção. Pode ser com getopt como argparse. Como é uma opção simples, pra verificar o parâmetro "-d", de debug ativo, usei getopt. Em seguida usei uma variável global DEBUG, que fica como padrão em False, ou seja, desligado.
Quando faço a chamada na parte de baixo, onde __name__ é '__main__', verifico a opção via flag ou via variável de shell. Isso quer dizer que se eu usar o script de 2 formas, terei debug ativado:
> ./meuscript.py -d
ou
> env DEBUG=1 ./meuscript.py
a segunda forma ajuda no caso de ter um sistema mais complexo e um shell script chamar seu programa. Daí se vários scripts verificarem as variáveis de shell pra buscar por DEBUG (ou $DEBUG), fica fácil ativar/desativar.
E assim até hoje eu debugo meus programas. Claro que fiz alguns aperfeiçoamentos como uma função que imprime o nome do método que está rodando.
def __funcname__(depth=0): """ To print function names like __func__ in C. """ return "__function__: " + sys._getframe(depth + 1).f_code.co_name + "()"
Assim, dentro de um método, posso usar debug da seguinte forma:
class MinhaClasse:
def __init__(self):
debug(__funcname__)
E isso ajuda ao imprimir o nome da função corrente, da mesma forma que se usa a macro__FUNCTION__ em C. Essa dica eu achei recentement no StackOverflow:
http://stackoverflow.com/questions/5067604/determine-function-name-from-within-that-function-without-using-traceback
E por último, e acabei com o tempo refinando meu debug(). Ao invés de de somente aceitar string, eu fiz um seletor de tipo pra imprimir qualquer variável, inclusive dicionários no formato json pra ficar mais fácil ler.
#! /usr/bin/python import json import getopt DEBUG = False def debug(msg): """ Debug helper """ if DEBUG: if type(msg) == type("abc"): # it is ok None elif type(msg) == type({}): msg = "%s" % json.dumps(msg, indent=4) elif type(msg) == type([]): msg = "[ %s ]" % ", ".join(msg) msg = "DEBUG(%s): %s" % (__file__, msg) print msg syslog.syslog(syslog.LOG_DEBUG, msg) def __funcname__(depth=0): """ To print function names like __func__ in C. """ return "__function__: " + sys._getframe(depth + 1).f_code.co_name + "()" class MinhaClasse: def __init__(self): debug(__funcname__) debug("Construtor da classe") self.nome = "Helio" self.sobrenome = "Loureiro" debug("Nome: %s" % self.nome) debug("Sobrenome: %s" % self.sobrenome) self.dados = {} def fazalgo(self): debug(__funcname__) debug("Fazendo algo") for k, v in self.dados.items(): debug("%s => %s" % (k, v) ) def main(self): debug(__funcname__) self.dados = { "Nome" : self.nome, "Sobrenome" : self.sobrenome } debug(self.dados) self.fazalgo() if __name__ == '__main__': try: opts, args = getopt.getopt(sys.argv[1:], "d") for opt, arg in opts: if opt == "-d": DEBUG = True debug("DEBUG ENABLED") except getopt.GetoptError: pass if os.environ.has_key("DEBUG"): DEBUG = True o = MinhaClasse() o.main()
Como escrevi anteriormente, não é um padrão fazer isso. Existem módulos que ajudam a debuggar de forma até mais profunda. Eu gosto de escrever minhas mensagens de debug pra filtrar melhor as mensagens e poder ver o que realmente importa. Então fica aqui a dica.
No nosso ambiente de desenvolvimento utilizamos Jenkins pra Continuous Integration, ou seja, a cada "commit" de código, compilar e testar a versão de uma forma automatizada. Cada um pode conectar no servidor Jenkins e visualizar o status do build, mas deixamos um display aberto para que todos possam ver como as coisas andam (e não deixar um release quebrado parado pra sempre).
Nada melhor que usar um raspberrypi pra essa atividade, já que é um dispositivo pequeno e com baixo consumo de energia. Aliás é o treco pendurando pra baixo da TV. O único incoveniente é que pra mostrar mais de um servidor Jenkins, as opções atuais são pra utilizar algum código javascript que faz com que a tela troque de URL.
Essas soluções funcionam muito bem em PCs x86_32 e x86_64, mas em um raspberrypi... as limitações de CPU são grandes. Como os browsers que suportam isso são chrome/chromium e firefox, o efeito indesejado é essa "lentidão" em renderizar a página, como mostra a imagem.
Outro problema é que seu um dos servidores Jenkins estiver fora do ar, esses javascripts "quebram", não fazendo a transição pra url seguinte.
Pra completar o problema, decidimos mudar de chromium pra epyphany, pois o mesmo usa muito menos memória que o chromium, e menos CPU. Mas nem tudo é perfeito: o epyphany não suporta script pra trocar entre várias URLs como o chrome.
Durante essas trocas browsers e experimentos, troquei umas mensagens com o Gustavo Noronha, vulgo Kov, que é um dos mantenedores do epyphany, perguntando sobre as possibilidades do mesmo. Ele disse que epyphany não suportava esse tipo de plugin, mas que eu poderia escrever minha própria aplicação em webkit. Então...
Demorou. Acho que trocamos essas mensagens, via twitter, lá pro meio de outubro. Já é quase Natal. Mas finalmente escrevi alguma coisa.
Comecei com um pequeno script baseado num código em C++ com Qt:
#include #include int main(int argc, char **argv){ QApplication app(argc, argv); QWebView wv; QWebPage *page = wv.page(); QWebSettings *settings = page->settings(); settings->setAttribute(QWebSettings::JavascriptEnabled, true); settings->setAttribute(QWebSettings::PluginsEnabled, true); wv.load(QUrl("http://www.youtube.com")); wv.show(); return app.exec();
Acabei trocando o script simples pra uma classe em python, e usando threads. O que era simples ficou um pouco mais... vamos chamar de "refinado". Mas está funcionando. E com menos memória e CPU, que era o objetivo inicial.
Quem quiser dar uma brincada ou mesmo usar, o código está disponível no GitHub:
Como sempre acontece, foram publicados os vídeos do FISL15. Eu, como bom nerd, baixei todos eles usando wget.
wget -nH -np -r --mirror http://hemingway.softwarelivre.org/fisl15/high/
Eu comecei baixando sem a opção "--mirror", mas como são 18 GB de vídeos, não consegui terminar no mesmo dia. E pra não sobrescrever, acabei usando esse parâmetro pra baixar somente os vídeos faltantes ou que estavam pela metade.
No fim acabei com um diretório com vídeos como esses:
sala40t-high-201405071002.ogv
sala40t-high-201405071059.ogv
sala40t-high-201405071200.ogv
sala40t-high-201405071309.ogv
sala40t-high-201405071400.ogv
sala40t-high-201405071505.ogv
sala40t-high-201405071559.ogv
sala40t-high-201405071704.ogv
sala40t-high-201405081002.ogv
sala40t-high-201405081059.ogv
sala40t-high-201405081201.ogv
sala40t-high-201405081302.ogv
E agora? Quem é quem? Olhar um por um na grade palestras do FISL15?
Então novamente usei python pra salvar o dia. É um código bem simples que faz análise do HTML das grades, por dia, e cria um link do arquivo em outro diretório, TODOS, com formato "Título - Autor.ogv". Pra facilitar.
#! /usr/bin/python
# -*- coding: utf-8 -*-
"""
System to get titles and authors from FISL15 presentations,
and match them to video files, already downloaded.
To download all videos (18 GB):
wget -nH -np -r --mirror http://hemingway.softwarelivre.org/fisl15/high/
Presentations grid:
http://papers.softwarelivre.org/papers_ng/public/new_grid?day=9
LICENSE:
"THE BEER-WARE LICENSE" (Revision 42):
Helio Loureiro wrote this file. As long as you retain this notice you
can do whatever you want with this stuff. If we meet some day, and you think
this stuff is worth it, you can buy me a beer in return.
Helio Loureiro"
This email address is being protected from spambots. You need JavaScript enabled to view it.
"""
from BeautifulSoup import BeautifulSoup
import urllib2
import re
import os
import sys
URL="http://papers.softwarelivre.org/papers_ng/public/new_grid?day="
DAYS = [7, 8, 9, 10]
# My directory to find videos. Probably you need to fix it.
TARGETDIR = "%s/Videos/FISL15" % os.environ.get('HOME')
if not os.path.exists(TARGETDIR + "/TODOS"):
os.mkdir(TARGETDIR + "/TODOS")
for day in DAYS:
page = urllib2.urlopen("%s%d" %(URL, day))
soup = BeautifulSoup(page.read())
for html in soup.findAll('div', "slot-list"):
for d in html.findAll('div'):
a = d.find('div', "author")
t = d.find('div', "title")
l = d.find('a')
# if empty info, move on
if not a or not t or not l:
continue
# wordlist clean up
author = re.sub("\n", "", a.string)
author = re.sub(" ","", author)
title = re.sub("\n", "", t.string)
title = re.sub(" ", "", title)
title = re.sub("/", "", title) #avoiding directory issues
link = l.get('href')
# since wget kept the directory structure, it is easy
dirvideo = re.sub("http://hemingway.softwarelivre.org", TARGETDIR, link)
# is the video over there?
status = False
if os.path.exists(dirvideo):
status = True
# False here could trigger urllib2 to download video
print author, ",",
print title, ",",
print link, ",",
print status
if status:
videoname = "%s/TODOS/%s - %s.ogv" % (TARGETDIR, title, author)
if os.path.exists(videoname):
continue
try:
os.link(dirvideo, videoname)
except:
# Added this because titles w/ strings "/" where causing issues,
# so I had to check. GNU/Linux was the problem (Stallman's fault).
print "Failed to link %s to %s" % (dirvideo, videoname)
sys.exit(1)
print "Created: %s" % videoname
Em algum momento eu devo publicar o mesmo no GitHub. Sob a BWL.
Publiquei o código aqui: https://github.com/helioloureiro/FISL15_video_downloader-
Tá! Eu sei que tem um tracinho a mais ali no final do link, mas eu fiz errado e agora fica assim mesmo.