Escrevi um artigo pro blog "ChurrOps on DevOps" sobre raspresenterpy aṕos uma conversa sobre o assunto no canal do Telegram do grupo "papo de sysadmin".
Eu achava que já tinha escrito sobre isso aqui mesmo, mas fiquei surpreso por não ter encontrado nenhuma referência.
raspresenterpy é um programa de display que alterna links pra ser usado pra mostrar mensagens ou telas de monitoramento como do Jenkins. Fiz até uma apresentação sobre o mesmo numa PyConSE. O texto que escrevi detalha sobre repositório e como usar o mesmo.
Ele encontra-se com suporte ao raspbian Jessie e python-qt4. Testei com python-qt5 e já verifiquei que não funciona. Então deixei no meu backlog pra corrigir e lançar uma versão mais recente.
https://churrops.io/2018/06/08/usando-python-e-raspberrypi-pra-mostrar-varias-telas/
Esses dias estava analisando um código que era parecido com shell script. Sua função era monitorar as interfaces de rede para, no caso de alteração (nova interface surgindo ou desaparecendo), registrar a mudança de objetos no sistema de dados do opensaf (objetos no IMM pra quem conhece).
Mas o código fazia algo parecido com um shell que a todo healthcheck do AMF (framework do opensaf que é muito semelhante ao systemd e roda determinado programa ou script de tempos em tempos) fazia uma busca com o comando "ip link addr list" e comparava com o que estava armazenado no IMM. Algo como:
def healthcheckCallback(self, invocation, compName, healthcheckKey): saAmfResponse(self.check_macs, invocation, eSaAisErrorT.SA_AIS_OK)
def check_macs(self):
macs = []
for line in self.get_link_list().split("\n"):
if not re.search("link/ether", line): continue
# [ 'link/ether', '52:54:00:c3:5d:88', 'brd', 'ff:ff:ff:ff:ff:ff']
mac.append(line.split()[1])
imm_obj = self.get_from_imm()
if imm_obj != macs:
self.update_imm(mac)
def get_link_list(self):
linux_command = "ip link list"
return self.run_shell(linux_command)
Essa é uma forma bastante simplificada pra tentar visualizar como tudo funciona. Eu propositalmente tirei comentários extras e deixei mais limpo apenas para poder comentar aqui.
Como a chamada pra buscar os dados junto ao IMM no OpenSAF tem muitas linhas, eu só deixei um get_from_imm() que retornará um array de mac registrados anteriormente. Se esse valor for diferente do coletado, então é chamado o método update_imm() com os macs que devem estar lá.
Funciona? Sim, funciona. Mas... se não houve nenhuma mudança nas interfaces de rede (como a subida de uma interface de VIP ou mesmo um container em docker), por quê eu preciso rodar o get_link_list()?
Entendeu qual foi meu ponto?
O código em si consiste em rodar o monitoramente separado numa thread. Toda vez que o código detecta uma mudança (na verdade o kernel sinaliza isso), ele altera uma variável que o programa lê durante o healthcheck. Algo como:
def check_macs(self):
if self.network_changed is False: return
Assim bem simples. Teve mudança? network_changed vira um True.
Linux tem mecanismos pra detectar uma mudanças na interfaces de rede. Por quê não usar? E foi o que fiz.
Criei um método chamado monitor_link() que é iniciado junto com programa no método initialize(), que é parte de como o AMF faz as chamadas de callback:
self.thread = threading.Thread(target=self.monitor_link, args=())
self.thread.start()
E como funciona o monitor_link()? Aqui tenho de pedir desculpas antecipadamente que enquanto o código utiliza menos CPU e memória que chamar um shell script, o tamanho e complexidade é bastante grande. No fim troquei 2 linhas de código por umas 35 linhas. Na verdade eu praticamente escrevi o código por trás do "ip link". Mas o resultado ficou independente desse comando e mesmo de utilizar um shell externo pra buscar o resultado.
A primeira coisa é criar um socket do tipo AF_NETLINK. Em seguida fazer um bind() num ID aleatório e monitorar com RTMGRP_LINK.
def monitor_link(self): # Create the netlink socket and bind to RTMGRP_LINK, s = socket.socket(socket.AF_NETLINK, socket.SOCK_RAW, socket.NETLINK_ROUTE) s.bind((os.getpid(), RTMGRP_LINK))
pra gerar o código aleatório que é um inteiro, usei os.getpid() pra usar o PID do próprio programa.
Em seguida é iniciado um loop com select() em cima do descritor do socket pra leitura. Quando aparecer algum dado, daí sim a informação é lida.
rlist, wlist, xlist = select.select([s.fileno()], [], [], 1)
O que vem a seguir são quebras da sequência de bits até chegar no ponto é possível ver o tipo de mensagem que chegou do select(). Se o tipo de mensagem for NOOP de algo nulo, apenas continue monitorando no select(). Se vier algum ERROR, pare o programa. Se vier mensagem e não for do tipo NEWLINK pra um link novo ou mudança de MAC, também continue aguardando no select().
if msg_type == NLMSG_NOOP: continue elif msg_type == NLMSG_ERROR: break elif msg_type != RTM_NEWLINK: continue
Por fim uma iteração nos dados pra buscar o tipo. Se o dado for do tipo IFLA_IFNAME, que é uma nova interface ou mudança de nome, ou IFLA_ADDRESS, que é MAC e endereço IP, muda a flag de network_changed pra True.
rta_type == IFLA_IFNAME or rta_type == IFLA_ADDRESS:
E é isso. O código completo segue abaixo.
def monitor_link(self): # Create the netlink socket and bind to RTMGRP_LINK, s = socket.socket(socket.AF_NETLINK, socket.SOCK_RAW, socket.NETLINK_ROUTE) s.bind((os.getpid(), RTMGRP_LINK)) while self.terminating is False: rlist, wlist, xlist = select.select([s.fileno()], [], [], 1)
if self.network_changed is True: continue if self.terminating is True: return
try: data = os.read(rlist[0], 65535) except: continue msg_len, msg_type, flags, seq, pid = struct.unpack("=LHHLL", data[:16]) if msg_type == NLMSG_NOOP: continue elif msg_type == NLMSG_ERROR: break elif msg_type != RTM_NEWLINK: continue data = data[16:] family, _, if_type, index, flags, change = struct.unpack("=BBHiII", data[:16]) remaining = msg_len - 32 data = data[16:] while remaining: rta_len, rta_type = struct.unpack("=HH", data[:4]) if rta_len < 4: break rta_data = data[4:rta_len] increment = (rta_len + 4 - 1) & ~(4 - 1) data = data[increment:] remaining -= increment if rta_type == IFLA_IFNAME or rta_type == IFLA_ADDRESS: self.network_changed = True
Encontrei essa implementação no Stack Overflow buscando informação do código em C. Foi uma grande ajuda e deixou meu programa muito mais coerente com o que eu realmente queria.
Ficou muito maior? Ficou. Mas também ficou muito mais 1337 :)
Mais:
[1] Exemplo de uso de python com AMF no OpenSAF: https://sourceforge.net/p/opensaf/staging/ci/default/tree/python/samples/amf_demo
[2] Projeto OpenSAF: https://sourceforge.net/projects/opensaf/
[3] Implementação original desse código: https://stackoverflow.com/questions/44299342/netlink-interface-listener-in-python
Estive trabalhando nos últimos tempos em publicar as palestras do FISL no Youtube. Por que fazer isso? Bom... primeiramente pra atender um interesse próprio que é assistir os vídeos na minha SmartTV, que é Smart, roda Linux, mas não suporta o formato OGV disponibilizado no site do FISL (software livre é sobre coçar sua coceira, lembram?). Publicando o conteúdo no Youtube continua OGV por trás, mas daí a TV aceita o format HMTL5 pra envio do stream do vídeo.
Mas isso não é tudo. Publicando no Youtube um público que não sabe da existência do FISL vai ter acesso aos vídeos. Tenho notado vários acessos de gente no exterior ao conteúdo em inglês.
E tudo é feito de forma automatizada. Como cada FISL usou uma forma distinta de publicação, tenho usando um mesmo script modificado a cada edição do evento. Eles estão armazenados na minha conta de Github.
https://github.com/helioloureiro/homemadescripts
No momento estão carregados os vídeos da última e penúltima edição do FISL, mas pretendo carregar o máximo possível dos eventos anteriores.
O outro motivo, além da disponibilização dos vídeos em outra plataforma (mesmo não sendo software livre), é ter um acesso permanente aos vídeos. No momento estão hospedados nas máquinas da ASL assim como um dia tivemos nossas listas de mails nos servidores da CIPSGA (Comitê de Incentivo à Produção de Software Gratuito e Alternativo). Não sei se a ASL está no mesmo nível da CIPSGA, que usava máquinas na SERPRO se não estou enganado, e por uma falha de disco perderam todos os dados, mas por precaução melhor fazer a maior quantidade de cópias possíveis dos vídeos.
Essa é a idéia no momento.
Nota: eu vi que alguns vídeos estão sem alguns caractéres. É alguma conversão de utf-8 que deu errado :(
Reusando o código que escrevi pra tirar snapshots durante a PyConSe e publicar automaticamente no Twitter, escrevi um pequeno aplicativo pra raspberypi com Python pra pegar o mesmo tipo de imagem, mas da minha janela, e ir acompanhando a evolução do tempo ao longo do dia e do ano. Essa é a imagem que ilustra o início do post.
Acho que será legal fazer uma animação das imagens mostrando o sol que brilha até quase 11 da noite, o inverno que escurece às 2 da tarde, e a neve chegando. E tudo postando no Twitter.
As ferramentas são as mais simples possível: um raspberrypi conectado com um dongle wifi e uma webcam USB creative (que aliás uso pra participar dos hangouts). E sempre Python pra fazer tudo.
Descobri que o Forecast.IO fornece uma API com JSON pra buscar a previsão do tempo atual e até 10 dias, com permissão de 1000 queries por dia de forma gratuita. Perfeito pro meu pequeno projeto. O mais difícil foi fazer a conversão da temperatura de Farenheit pra Celsius (meus dias de vestibulando já se foram faz muito tempo), mas pedi ajuda à Internet pra isso. Fiz uma pequena função que retorna os dados que quero em forma de um array.
import requests import json import time """ Um monte de código por aqui [...] """" def get_content(): timestamp = time.strftime("Date: %Y-%m-%d %H:%M", time.localtime()) msg = [] msg.append("Stockholm") msg.append(timestamp) url = "https://api.forecast.io/forecast/%s/%s" % (wth_key, wth_loc) req = requests.get(url) jdata = json.loads(req.text) summary = jdata["currently"]["summary"] temp = jdata["currently"]["temperature"] temp = Far2Celsius(temp) msg.append(u"Temperature: %s°C" % temp) msg.append("Summary: %s" %summary) return msg
A primeira coisa que precisei alterar foi a adição de textos à imagem. Tendo a informação vinda do Forecast.IO, eu precisava modificar a imagem pra que ela aparecesse. No início eu usei uma fonte de cor branca, mas logo percebi que preto ficava com um contraste melhor. Mas quando chegar o inverno, época em que os dias são realmente muito curtos por aqui, vou precisar pensar numa forma pra trocar para branco. Mas no momento usei as bibliotecas do PIL que manipulam imagem em Python.
import Image import ImageFont, ImageDraw, ImageOps IMGSIZE = (1280, 720) BLACK = (0, 0, 0) WHITE = (255, 255, 255) """ Um monte de código por aqui [...] """" def WeatherScreenshot(): msg = get_content() if not msg: msg = "Just another shot at %s" % \ time.strftime("%H:%M", time.localtime()) if msg: msg_body = "\n".join(msg[1:]) im = Image.open(filename) # just get truetype fonts on package ttf-mscorefonts-installer try: f_top = ImageFont.truetype(font="Arial", size=60) except TypeError: # older versions hasn't font and require full path arialpath = "/usr/share/fonts/truetype/msttcorefonts/Arial.ttf" f_top = ImageFont.truetype(arialpath, size=60) try: f_body = ImageFont.truetype(font="Arial", size=20) except TypeError: # older versions hasn't font and require full path arialpath = "/usr/share/fonts/truetype/msttcorefonts/Arial.ttf" f_body = ImageFont.truetype(arialpath, size=20) txt = Image.new('L', IMGSIZE) d = ImageDraw.Draw(txt) d.text( (10, 10), msg[0], font=f_top, fill=255) position = 80 for m in msg[1:]: d.text( (10, position), m, font=f_body, fill=255) position += 20 w = txt.rotate(0, expand=1) im.paste(ImageOps.colorize(w, BLACK, BLACK), (0,0), w) im.save(filename)
descobri que a versão de raspbian que estou usando, baseado em Debian Wheezy, tem uma API um pouco diferente e pode precisar que a fonte com o path completo seja passada no argumento.
Outra alteração foi mudar a chamada pra webcam capturar a imagem que era uma função mas modifiquei pra uma thread. Assim o tempo fica consistente. Do contrário ao invés de mostrar 12:00 apareceria algo como 12:03 (o tempo pra adquirir a imagem).
import threading def WeatherScreenshot(): th = threading.Thread(target=GetPhoto) th.start() msg = get_content() th.join()
E já que mencionei a imagem, esse foi o maior problema até agora. Descobri que não existe uma forma muito confiável de inicializar a webcam. Às vezes ela adquiri a imagem de forma bonitinha, às vezes fica super exposta, outras vezes sub.
E não tem nada que dê um feedback sobre a qualidade. Li vários artigos com dicas de uso com pygame, que é a forma que uso, e com opencv também, mas todas com o mesmo princípio. Basicamente fazem um start() no framework da webcam, que inicializa a webcam, adquirem um número de imagens aleatórios (alguns dizem 30) e esperam pelo melhor ao capturar a imagem. Nada que retorne um indicador de qualidade. Nada.
DISCARDFRAMES = 2 * 30 def GetPhoto(): filename = None pygame.init() pygame.camera.init() elif os.path.exists("/dev/video0"): device = "/dev/video0" if not device: print "Not webcam found. Aborting..." sys.exit(1) # you can get your camera resolution by command "uvcdynctrl -f" cam = pygame.camera.Camera(device, IMGSIZE) cam.start() time.sleep(3) counter = 10 while counter: if cam.query_image(): break time.sleep(1) counter -= 1 # idea from https://codeplasma.com/2012/12/03/getting-webcam-images-with-python-and-opencv-2-for-real-this-time/ # get a set of pictures to be discarded and adjust camera for x in xrange(DISCARDFRAMES): while not cam.query_image(): time.sleep(1) image = cam.get_image() image = cam.get_image()
Basicamente um método de tentativa e erro. Por isso que iniciei a chamada à webcam como thread. Como as webcams USB tem CPU própria, não tem - até onde pesquisei - uma API confiável pra verificar se o balanço de branco normalizou antes de capturar a imagem. Só retornam a própria imagem. Tosco.
Então resolvi fazer um outro script como módulo, que basicamente mapeia toda a imagem em seu tamanho e cria um dicionário do tipo "COR: quantas vezes". Descobri que valores RGB (pega o valor de R + G + B, soma e divide por 3 pra ter a média) acima de 235 já indicam super exposição. Não só isso. Como eu conto a quantidade que aquele valor RGB aparece, sempre que um valor sobressai acima de 15% do total, já indica uma imagem ruim. Não é um dos melhores métodos científicos, mas tem funcionando bem (verifiquei nas imagens já adquiridas e salvas). Os tempos de aquisição de imagem mudaram de até 1 minuto pra em torno de 10 minutos. Mas por enquanto com qualidade muito melhor.
import Image def brightness(filename): """ source: http://stackoverflow.com/questions/6442118/python-measuring-pixel-brightness """ img = Image.open(filename) #Convert the image te RGB if it is a .gif for example img = img.convert ('RGB') RANK = {} #coordinates of the pixel X_i,Y_i = 0,0 (X_f, Y_f) = img.size #Get RGB for i in xrange(X_i, X_f): for j in xrange(Y_i, Y_f): #print "i:", i,",j:", j pixelRGB = img.getpixel((i,j)) R,G,B = pixelRGB br = sum([R,G,B])/ 3 ## 0 is dark (black) and 255 is bright (white) if RANK.has_key(br): RANK[br] += 1 else: RANK[br] = 1 color_order = [] pic_size = X_f * Y_f print "Picture size:", pic_size for k in sorted(RANK, key=RANK.get, reverse=True): amount = RANK[k] # if low than 15%, ignore if amount < (.15 * pic_size): continue print k, "=>", RANK[k] color_order.append(k) if color_order: print color_order return -1 return 0
O código todo está disponível no meu github.
https://github.com/helioloureiro/snapshot-twitter
E provavelmente devo lançar um gif animado posteriormente com o decorrer do clima ao longo do ano.
Tenho alguns problemas como concorrência no caso de tentar adquirir uma imagem ao mesmo tempo que a crontab tentar fazer isso (implementei uma API em REST pra isso, mas não é algo pra publicar :). Devo implementar algum tipo de lock usando /tmp, mas algo simples.
E agora no verão, com sol até quase 11 horas da noite, tenho também um pequeno problema de negação de serviço que às vezes acontece.
Ainda não descobri um módulo em Python pra mitigar isso :)
Meu novo queridinho de programação é um raspberrypi. Tenho feito coisas interessantes com ele usando Python. E em breve teremos a PyCon Sweden acontecendo por aqui. Então resolvi criar um robôzinho de twitter pra postar snapshots da apresentação. Pretendo colocar minha webcam externa nele (uso pros hangouts) e deixar ele pegando as imagens da conferência e postando.
No script a mensagem é estática, mas eu pretendo alterar para algo que pegue uma lista com horários, nomes e temas pra deixar tudo junto na postagem. Vai ficar interessante.
O código parcialmente feito, que veio o obamawatch, é esse aqui:
#! /usr/bin/python -u # -*- coding: utf-8 -*- """ Based in: http://stackoverflow.com/questions/15870619/python-webcam-http-streaming-and-image-capture """ SAVEDIR = "/tmp" import pygame import pygame.camera import time import sys import os import twitter import ConfigParser configuration = "/home/helio/.twitterc" def TweetPhoto(): """ """ print "Pygame init" pygame.init() print "Camera init" pygame.camera.init() # you can get your camera resolution by command "uvcdynctrl -f" cam = pygame.camera.Camera("/dev/video1", (1280, 720)) print "Camera start" cam.start() time.sleep(1) print "Getting image" image = cam.get_image() time.sleep(1) print "Camera stop" cam.stop() timestamp = time.strftime("%Y-%m-%d_%H%M%S", time.localtime()) year = time.strftime("%Y", time.localtime()) filename = "%s/%s.jpg" % (SAVEDIR, timestamp) print "Saving file %s" % filename pygame.image.save(image, filename) cfg = ConfigParser.ConfigParser() print "Reading configuration: %s" % configuration if not os.path.exists(configuration): print "Failed to find configuration file %s" % configuration sys.exit(1) cfg.read(configuration) cons_key = cfg.get("TWITTER", "CONS_KEY") cons_sec = cfg.get("TWITTER", "CONS_SEC") acc_key = cfg.get("TWITTER", "ACC_KEY") acc_sec = cfg.get("TWITTER", "ACC_SEC") print "Autenticating in Twitter" # App python-tweeter # https://dev.twitter.com/apps/815176 tw = twitter.Api( consumer_key = cons_key, consumer_secret = cons_sec, access_token_key = acc_key, access_token_secret = acc_sec ) print "Posting..." tw.PostMedia(status = "Testing python twitter and PostMedia() for #pyconse timestamp=%s" % timestamp, media = filename) print "Removing media file %s" % filename os.unlink(filename) if __name__ == '__main__': try: TweetPhoto() except KeyboardInterrupt: sys.exit(0)
Acabei usando /dev/video1 pois estava testando a webcam no laptop, que já tem outra webcam interna e quando rebootei com a câmera externa, acabou jogando a dele pra esse device.
Outra melhoria que implementei foi a de mover os tokens de autenticação pra um arquivo externo e ler via ConfigParser(). Assim fica mais limpo o código e possível de enviar pro github (e sem mandar suas chaves privadas junto :).