Python中logging模块的用法实例,python根据文件大小打log日志
分类:多线程

python写日志封装类实例,python实例

本文实例呈报了python落成写日志封装类。分享给大家供我们参考。具体如下:

# encoding:utf-8
import sys
import logging
import time
def writeLog(message):
  logger=logging.getLogger()
  filename = time.strftime('%Y-%m-%d',time.localtime(time.time()))
  handler=logging.FileHandler("./log/"+filename+"error")
  logger.addHandler(handler)
  logger.setLevel(logging.NOTSET)
  logger.info(message)
if __name__ == '__main__':
  writeLog("hello")

将这段代码保存为 TLog,调用的时候先import TLog,然后TLog.writelog("jb51.net"卡塔尔国就能够

企望本文所述对大家的Python程序设计具备利于。

本文实例呈报了python实现写日志封装类。分享给大家供大家参谋。具体如下: # encoding:utf-8import sysimpor...

python完毕封装拿到virustotal扫描结果,pythonvirustotal

正文实例陈诉了python完结封装获得virustotal扫描结果的主意。分享给我们供大家参谋。具体方法如下:

import simplejson 
import urllib 
import urllib2 
import os, sys 
import logging 

try: 
  import sqlite3 
except ImportError: 
  sys.stderr.write("ERROR: Unable to locate Python SQLite3 module. "  
           "Please verify your installation. Exiting...n") 
  sys.exit(-1) 

MD5 = "5248f774d2ee0a10936d0b1dc89107f1" 
MD5 = "12fa5fb74201d9b6a14f63fbf9a81ff6" #do not have report on virustotal.com 


APIKEY = "xxxxxxxxxxxxxxxxxx"用自己的 

class VirusTotalDatabase: 
  """ 
  Database abstraction layer. 
  """ 
  def __init__(self, db_file): 
    log = logging.getLogger("Database.Init") 
    self.__dbfile = db_file 
    self._conn = None 
    self._cursor = None 

    # Check if SQLite database already exists. If it doesn't exist I invoke 
    # the generation procedure. 
    if not os.path.exists(self.__dbfile): 
      if self._generate(): 
        print("Generated database "%s" which didn't"  
             " exist before." % self.__dbfile) 
      else: 
        print("Unable to generate database") 

    # Once the database is generated of it already has been, I can 
    # initialize the connection. 
    try: 
      self._conn = sqlite3.connect(self.__dbfile) 
      self._cursor = self._conn.cursor() 
    except Exception, why: 
      print("Unable to connect to database "%s": %s." 
           % (self.__dbfile, why)) 

    log.debug("Connected to SQLite database "%s"." % self.__dbfile) 

  def _generate(self): 
    """ 
    Creates database structure in a SQLite file. 
    """ 
    if os.path.exists(self.__dbfile): 
      return False 

    db_dir = os.path.dirname(self.__dbfile) 
    if not os.path.exists(db_dir): 
      try: 
        os.makedirs(db_dir) 
      except (IOError, os.error), why: 
        print("Something went wrong while creating database "  
             "directory "%s": %s" % (db_dir, why)) 
        return False 

    conn = sqlite3.connect(self.__dbfile) 
    cursor = conn.cursor() 

    cursor.execute("CREATE TABLE virustotal (n"               
            " id INTEGER PRIMARY KEY,n"             
            " md5 TEXT NOT NULL,n"            
            " Kaspersky TEXT DEFAULT NULL,n"                
            " McAfee TEXT DEFAULT NULL,n"             
            " Symantec TEXT DEFAULT NULL,n"              
            " Norman TEXT DEFAULT NULL,n"              
            " Avast TEXT DEFAULT NULL,n"             
            " NOD32 TEXT DEFAULT NULL,n"          
            " BitDefender TEXT DEFAULT NULL,n"             
            " Microsoft TEXT DEFAULT NULL,n"             
            " Rising TEXT DEFAULT NULL,n"            
            " Panda TEXT DEFAULT NULLn"            
            ");") 
    print "create db:%s sucess" % self.__dbfile 

    return True 

  def _get_task_dict(self, row): 
    try: 
      task = {} 
      task["id"] = row[0] 
      task["md5"] = row[1] 
      task["Kaspersky"] = row[2] 
      task["McAfee"] = row[3] 
      task["Symantec"] = row[4] 
      task["Norman"] = row[5] 
      task["Avast"] = row[6] 
      task["NOD32"] = row[7] 
      task["BitDefender"] = row[8] 
      task["Microsoft"] = row[9] 
      task["Rising"] = row[10] 
      task["Panda"] = row[11] 
      return task 
    except Exception, why: 
      return None 

  def add_sample(self, md5, virus_dict): 
    """ 

    """ 
    task_id = None 

    if not self._cursor: 
      return None 
    if not md5 or md5 == "": 
      return None 

    Kaspersky = virus_dict.get("Kaspersky", None) 
    McAfee = virus_dict.get("McAfee", None) 
    Symantec = virus_dict.get("Symantec", None) 
    Norman = virus_dict.get("Norman", None) 
    Avast = virus_dict.get("Avast", None) 
    NOD32 = virus_dict.get("NOD32", None) 
    BitDefender = virus_dict.get("BitDefender", None) 
    Microsoft = virus_dict.get("Microsoft", None) 
    Rising = virus_dict.get("Rising", None) 
    Panda = virus_dict.get("Panda", None) 

    self._conn.text_factory = str 
    try: 
      self._cursor.execute("SELECT id FROM virustotal WHERE md5 = ?;", 
                 (md5,)) 
      sample_row = self._cursor.fetchone() 
    except sqlite3.OperationalError, why: 
      print "sqlite3 error:%sn" % str(why) 
      return False 

    if sample_row: 
      try: 
        sample_row = sample_row[0] 
        self._cursor.execute("UPDATE virustotal SET Kaspersky=?, McAfee=?, Symantec=?, Norman=?, Avast=?,  
                   NOD32=?, BitDefender=?, Microsoft=?, Rising=?, Panda=?  WHERE id = ?;", 
                   (Kaspersky, McAfee, Symantec, Norman, Avast, NOD32, BitDefender, Microsoft, 
                   Rising, Panda, sample_row)) 
        self._conn.commit() 
        task_id = sample_row 
      except sqlite3.OperationalError, why: 
        print("Unable to update database: %s." % why) 
        return False 
    else: #the sample not in the database 
      try: 
        self._cursor.execute("INSERT INTO virustotal "  
                   "(md5, Kaspersky, McAfee, Symantec, Norman, Avast, NOD32, BitDefender, 
                    Microsoft, Rising, Panda) "  
                   "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);", 
                   (md5, Kaspersky, McAfee, Symantec, Norman, Avast, NOD32, BitDefender, 
                    Microsoft, Rising, Panda)) 
        self._conn.commit() 
        task_id = self._cursor.lastrowid 
      except sqlite3.OperationalError, why: 
        print "why",str(why) 
        return None 
      print "add_to_db:%s, task_id:%s" % (str(self.__dbfile), str(task_id)) 
    return task_id 

  def get_sample(self): 
    """ 
    Gets a task from pending queue. 
    """ 
    log = logging.getLogger("Database.GetTask") 

    if not self._cursor: 
      log.error("Unable to acquire cursor.") 
      return None 

    # Select one item from the queue table with higher priority and older 
    # addition date which has not already been processed. 
    try:     
      self._cursor.execute("SELECT * FROM virustotal "  
                 #"WHERE lock = 0 "  
                 #"AND status = 0 "  
                 "ORDER BY id, added_on LIMIT 1;") 
    except sqlite3.OperationalError, why: 
      log.error("Unable to query database: %s." % why) 
      return None 

    sample_row = self._cursor.fetchone() 

    if sample_row: 
      return self._get_task_dict(sample_row) 
    else: 
      return None 

  def search_md5(self, md5): 
    """ 

    """ 
    if not self._cursor: 
      return None 

    if not md5 or len(md5) != 32: 
      return None 

    try: 
      self._cursor.execute("SELECT * FROM virustotal "  
                 "WHERE md5 = ? "  
                 #"AND status = 1 "  
                 "ORDER BY id DESC;", 
                 (md5,)) 
    except sqlite3.OperationalError, why: 
      return None 

    task_dict = {} 
    for row in self._cursor.fetchall(): 
      task_dict = self._get_task_dict(row) 
      #if task_dict: 
        #tasks.append(task_dict) 

    return task_dict 



class VirusTotal: 
  """""" 

  def __init__(self, md5): 
    """Constructor""" 
    self._virus_dict = {} 
    self._md5 = md5 
    self._db_file = r"./db/virustotal.db" 
    self.get_report_dict() 

  def repr(self): 
    return str(self._virus_dict) 

  def submit_md5(self, file_path): 
    import postfile                                      
    #submit the file 
    FILE_NAME = os.path.basename(file_path)  


    host = "www.virustotal.com"                                
    selector = "https://www.virustotal.com/vtapi/v2/file/scan"                 
    fields = [("apikey", APIKEY)] 
    file_to_send = open(file_path, "rb").read()                        
    files = [("file", FILE_NAME, file_to_send)]                        
    json = postfile.post_multipart(host, selector, fields, files)               
    print json 
    pass 

  def get_report_dict(self): 
    result_dict = {} 

    url = "https://www.virustotal.com/vtapi/v2/file/report" 
    parameters = {"resource": self._md5, 
            "apikey": APIKEY} 
    data = urllib.urlencode(parameters) 
    req = urllib2.Request(url, data) 
    response = urllib2.urlopen(req) 
    json = response.read() 

    response_dict = simplejson.loads(json) 
    if response_dict["response_code"]: #has result  
      scans_dict = response_dict.get("scans", {}) 
      for anti_virus_comany, virus_name in scans_dict.iteritems(): 
        if virus_name["detected"]: 
          result_dict.setdefault(anti_virus_comany, virus_name["result"]) 
    return result_dict 

  def write_to_db(self): 
    """""" 
    db = VirusTotalDatabase(self._db_file) 
    virus_dict = self.get_report_dict() 
    db.add_sample(self._md5, virus_dict) 

利用办法如下:

config = {'input':"inputMd5s"} 
fp = open(config['input'], "r") 
content = fp.readlines() 
MD5S = [] 
for md5 in ifilter(lambda x:len(x)>0, imap(string.strip, content)): 
  MD5S.append(md5)   
print "MD5S",MD5S 
fp.close() 


from getVirusTotalInfo import VirusTotal 
#得到扫描结果并写入数库 
for md5 in MD5S: 
  virus_total = VirusTotal(md5) 
  virus_total.write_to_db() 

期望本文所述对大家的Python程序设计有所帮忙。

python遵照文件大小打log日志,

正文实例陈述了python根据文件大小打log日志的措施,分享给大家供大家参照他事他说加以考查。具体方法如下:

import glob 
import logging 
import logging.handlers 
LOG_FILENAME='logging_rotatingfile_example.out' 
# Set up a specific logger with our desired output level 
my_logger = logging.getLogger('MyLogger') 
my_logger.setLevel(logging.DEBUG) 
# Add the log message handler to the logger 
handler = logging.handlers.RotatingFileHandler(LOG_FILENAME, 
            maxBytes=20, 
            backupCount=5, 
           ) 
my_logger.addHandler(handler) 
# Log some messages 
for i in range(20): 
 my_logger.debug('i = %d' % i) 
# See what files are created 
 logfiles = glob.glob('%s*' % LOG_FILENAME) 
 for filename in logfiles: 
  print filename 

该实例可实现循环打日志 ,第二个文本达到maxBytes大小后,就写入第2个文本。

可望本文所述对大家的Python程序设计有着帮助。

Python中logging模块的用法实例,pythonlogging

正文实例叙述了logging模块的用法实例,分享给大家供我们参谋。具体方法如下:

import logging 
import os 

log = logging.getLogger() 
formatter = logging.Formatter('[%(asctime)s] [%(name)s] %(levelname)s: %(message)s') 


stream_handler = logging.StreamHandler() 
file_handler = logging.FileHandler(os.path.join("c:\", "analysis.log")) 


file_handler.setFormatter(formatter) 
stream_handler.setFormatter(formatter) 


log.addHandler(file_handler) 
log.addHandler(stream_handler) 
log.setLevel(logging.DEBUG) 


log.warn("a warning %s " % "c:\") 

程序运行结果如下:

[2014-09-29 10:23:58,905] [root] WARNING: a warning c:

可望本文所述对我们的Python程序设计具备助于。

以下代码怎使用python封装

from ctypes import *
class fft_downloader_params_udp_s(Structure):
_fields_ = [
("serverIp",c_char*IP_BYTES_MAX),
("serverPort",c_char*SERVER_PORT_BYTES_MAX),
("inputFileName",c_char*FNAME_BYTES_MAX),
]
有不知道的,能够参照他事他说加以考察python灰帽子  

在python中模块是个什概念?能够用简易的例证表明?

一个模块正是叁个文书,导入文本就导入了相应的模块,能够使用模块里定义的类,函数,变量等新闻。假若模块很多,可以用包来管理,正是把公文放进叁个文本夹里,再增添二个__init__.py文件。  

怎把python封装成exe

简答:
由此实际测验,PyInstaller很好用。
不可能繁琐的安插,加上三个-F,就足以一贯生成单后生可畏的exe文件。
效用很爽。

py2exe,没用过,可是看看互连网教程,正是须求复杂配置和折磨的。
之所以说PyInstaller比py2exe,好用多了。

详解:
【记录】把Python代码的BlogsToWordPress打包成exe可实行文件

(此处不给贴地址,请自个儿用google搜标题,就可以找到帖子地址卡塔尔国  

本文实例叙述了python实现封装获得virustotal扫描结果的艺术。共享给大家供我们参谋。具体...

本文实例汇报了python依照文件大小打log日志的办法,分享给我们供我们参考。具体方法如下: import glob impo...

python中的logging模块,默许写的日记文件寄放在哪儿?

日志文件对象配置的时候,是要填日志文件的岗位的呦,笔者都是按种类必要放置的。
默认的从未有过研讨过。  

本文实例陈诉了logging模块的用法实例,分享给我们供大家参照他事他说加以考查。具体方法如下: import logging impo...

本文由10bet手机官网发布于多线程,转载请注明出处:Python中logging模块的用法实例,python根据文件大小打log日志

上一篇:Python获取linux主机ip的简单实现方法,python批量生成本地ip地址的方法 下一篇:没有了
猜你喜欢
热门排行
精彩图文