利用微信公众账号的开发者模式实现了利用微信控制yeelight灯。
本地跑这个python脚本就行,就是一个简单的SSDP协议。
公众账号接收on或者off的指令,写到一个api.json的文件。
```python
# -*- coding: utf-8 -*-
import socket
import struct
import select
import sys
import time
import json
import urllib2
# Multicast group and UDP port the discovery request is sent to.
SSDP_ADDR = "239.255.255.250"
SSDP_PORT = 1982

# Discovery request datagram; every line is CRLF-terminated and the request
# ends with an empty line.
# NOTE(review): the Yeelight inter-operation spec appears to list the search
# target as "wifi_bulb" (without the "ssdp:" prefix) - confirm against the
# bulb firmware before changing it.
MS = (
    'M-SEARCH * HTTP/1.1\r\n'
    'HOST: %s:%d\r\n'
    'MAN: "ssdp:discover"\r\n'
    'ST: ssdp:wifi_bulb\r\n'
    '\r\n'
) % (SSDP_ADDR, SSDP_PORT)

# URL of the JSON document that carries the desired lamp state under "status".
JSON_ADDRESS = "http://www.example.com/api.json"
class SSDPClient(object):
    """Discover a Yeelight bulb over SSDP and mirror a remote on/off flag to it.

    ``start()`` multicasts the module-level ``MS`` search request, waits for
    the first reply that carries a usable ``Location`` header and hands the
    advertised address to ``operate()``, which polls ``JSON_ADDRESS`` and
    forwards the published status to the bulb as ``set_power`` commands.

    The class relies on the module-level ``urllib2`` import and on the
    ``MS``, ``SSDP_ADDR``, ``SSDP_PORT`` and ``JSON_ADDRESS`` constants.
    """

    SEARCH_TIMEOUT = 5    # seconds to wait for a reply before re-sending M-SEARCH
    POLL_COUNT = 1000     # number of status polls performed by operate()
    POLL_INTERVAL = 1     # seconds slept between two polls

    def __init__(self, local_ip='192.168.10.110', local_port=50012):
        """Create the UDP discovery socket and bind it to a local address.

        Args:
            local_ip: Local interface address to bind to.
            local_port: Local UDP port to bind to.

        Exits the process with status 1 if the socket cannot be created or
        bound.
        """
        try:
            self.__s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        except socket.error as exc:
            print(self._socket_error_text(exc))
            sys.exit(1)
        try:
            self.__s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.__s.bind((local_ip, local_port))
        except socket.error as exc:
            self.__s.close()
            print(self._socket_error_text(exc))
            sys.exit(1)

    @staticmethod
    def _socket_error_text(exc):
        """Format a socket error the way the script reports it on stdout."""
        # errno/strerror are used instead of indexing the exception so that
        # single-argument socket errors do not raise while being reported.
        return 'Failed to create socket. Error code: %s , Error message : %s' % (
            exc.errno, exc.strerror)

    @staticmethod
    def _parse_response(data):
        """Extract ``(ip, port, bulb_id)`` from a discovery reply.

        Args:
            data: Reply text made of CRLF-separated ``name: value`` lines.

        Returns:
            A tuple of three strings. ``ip`` and ``port`` come from a
            ``Location: scheme://ip:port`` header and are both ``""`` when
            that header is missing or malformed. ``bulb_id`` is the text
            after ``id:`` exactly as received (leading blank included), or
            ``""`` when absent.
        """
        ip = port = bulb_id = ""
        for line in data.split('\r\n'):
            name, colon, value = line.partition(':')
            if not colon:
                continue  # status line or blank line, not a header
            if name == "Location":
                address = value.strip().rpartition('//')[2]
                host, sep, port_text = address.rpartition(':')
                if sep and host and port_text.isdigit():
                    ip, port = host, port_text
            elif name == "id":
                bulb_id = value
        return ip, port, bulb_id

    @staticmethod
    def _build_command(status, message_id):
        """Return the CRLF-terminated ``set_power`` command for *status*.

        The message is produced with ``json.dumps`` so that the remotely
        supplied status is always escaped correctly.
        """
        payload = {
            "id": message_id,
            "method": "set_power",
            "params": [status, "smooth", 100],
        }
        return json.dumps(payload, sort_keys=True) + '\r\n'

    @staticmethod
    def _fetch_status():
        """Fetch the desired power state from ``JSON_ADDRESS``.

        Returns:
            ``"on"`` or ``"off"``, or ``None`` when the document cannot be
            fetched, cannot be parsed, or carries any other status value.
        """
        try:
            response = urllib2.urlopen(JSON_ADDRESS)
            try:
                body = response.read()
            finally:
                response.close()
        except Exception as exc:
            # Deliberately broad: any fetch failure must only skip this poll,
            # never abort the control loop.
            print(exc)
            return None
        try:
            status = json.loads(body)['status']
        except (ValueError, KeyError, TypeError) as exc:
            print('Invalid status document: %s' % (exc,))
            return None
        if status not in ('on', 'off'):
            print('Ignoring unsupported status: %r' % (status,))
            return None
        return status

    def start(self):
        """Search for a bulb, then run ``operate()`` against the first one found.

        The search request is re-sent whenever no reply arrives within
        ``SEARCH_TIMEOUT`` seconds. Replies without a usable ``Location``
        header are ignored. The discovery socket is closed on exit.
        """
        try:
            self.__send_search()
            while True:
                readable, _, _ = select.select(
                    [self.__s], [], [], self.SEARCH_TIMEOUT)
                if not readable:  # timeout
                    self.__send_search()
                    continue
                data, _addr = self.__s.recvfrom(2048)
                ip, port, bulb_id = self._parse_response(data)
                if not ip:
                    # Not a usable bulb advertisement; keep listening.
                    continue
                print("IP:" + ip)
                print("PORT:" + port)
                if bulb_id:
                    print("ID" + bulb_id)
                self.operate(ip, port, bulb_id)
                break
        finally:
            self.__s.close()

    def __send_search(self):
        """Send the discovery request to the multicast group."""
        print("Sending M-SEARCH...")
        self.__s.sendto(MS, (SSDP_ADDR, SSDP_PORT))

    def operate(self, ip, port, id):
        """Connect to the bulb and mirror the remote status for a while.

        Args:
            ip: Bulb address taken from the discovery reply.
            port: Bulb control port; anything ``int()`` accepts.
            id: Bulb identifier from the discovery reply (currently unused).

        Polls ``JSON_ADDRESS`` ``POLL_COUNT`` times, ``POLL_INTERVAL`` seconds
        apart, sending one ``set_power`` command per successful poll and
        printing the bulb's reply. A poll that yields no valid status is
        skipped. Exits the process with status 1 if the TCP socket cannot be
        created or a send/receive fails; connection errors propagate.
        """
        print('=' * 30)
        port = int(port)
        # A dedicated TCP socket is used so the discovery socket owned by
        # start() is not overwritten and leaked.
        try:
            control = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        except socket.error as exc:
            print(self._socket_error_text(exc))
            sys.exit(1)
        print('Socket Created')
        try:
            control.connect((ip, port))
            print("connected")
            for message_id in range(1, self.POLL_COUNT + 1):
                # Fetch the desired state from the remote endpoint.
                status = self._fetch_status()
                if status is not None:
                    print(status)
                    msg = self._build_command(status, message_id)
                    print(msg)
                    try:
                        control.sendall(msg)
                        reply = control.recv(4096)
                        print(reply)
                    except socket.error:
                        print('Send failed')
                        sys.exit(1)
                time.sleep(self.POLL_INTERVAL)
            print("finished")
        finally:
            control.close()
# Script entry point: discover a bulb and start mirroring the remote status.
if __name__ == "__main__":
    client = SSDPClient()
    client.start()
```