-1
我從自定義MIB中創建了我的第一個Python SNMP代理。返回值VarBinds pysnmp
它支持SNMP GET和SET請求,但它返回由我預先確定的值。
如何讓我的函數返回的varbinds成爲用戶通過SNMP SET提供的值?
代碼:
from pysnmp.entity import engine, config
from pysnmp import debug
from pysnmp.entity.rfc3413 import cmdrsp, context, ntforg
from pysnmp.carrier.asynsock.dgram import udp
from pysnmp.proto.rfc1902 import OctetString
from pysnmp.smi import builder
import threading
import collections
import time
#can be useful
debug.setLogger(debug.Debug('all'))
MibObject = collections.namedtuple('MibObject', ['mibName',
'objectType', 'valueFunc'])
class Mib(object):
"""Stores the data we want to serve.
"""
def __init__(self):
self._lock = threading.RLock()
self._system_channel = 0
self._system_programmed = 0
def getSystemModel(self):
return "Teste 1 Ok"
def getTransportStream(self):
return "Teste 2 Ok"
def getSystemProgrammedPower(self):
with self._lock:
return self._system_programmed
def setSystemProgrammedPower(self, value):
with self._lock:
self._system_programmed = value
def getSystemChannel(self):
with self._lock:
return self._system_channel
def setSystemChannel(self, value):
with self._lock:
self._system_channel = value
def createVariable(SuperClass, getValue, *args):
"""This is going to create a instance variable that we can export.
getValue is a function to call to retreive the value of the scalar
"""
class Var(SuperClass):
def readGet(self, name, *args):
return name, self.syntax.clone(getValue())
return Var(*args)
class SNMPAgent(object):
"""Implements an Agent that serves the custom MIB and
can send a trap.
"""
def __init__(self, mibObjects):
"""
mibObjects - a list of MibObject tuples that this agent
will serve
"""
#each SNMP-based application has an engine
self._snmpEngine = engine.SnmpEngine()
#open a UDP socket to listen for snmp requests
config.addSocketTransport(
self._snmpEngine,
udp.domainName,
udp.UdpTransport().openServerMode(('127.0.0.1', 161))
)
# SNMPv3/USM setup
config.addV1System(self._snmpEngine, 'test-agent', 'public')
# user: usr-sha-none, auth: SHA, priv NONE
config.addV3User(
self._snmpEngine, 'test-user',
config.usmHMACMD5AuthProtocol, 'authkey1',
config.usmDESPrivProtocol, 'privkey1'
)
# Allow full MIB access for each user at VACM
config.addContext(self._snmpEngine, '')
config.addRwUser(self._snmpEngine, 1, 'test-agent', 'noAuthNoPriv', (1,3,6)) # v1
config.addRwUser(self._snmpEngine, 2, 'test-agent', 'noAuthNoPriv', (1,3,6)) # v2c
config.addRwUser(self._snmpEngine, 3, 'test-user', 'authPriv', (1,3,6)) # v3
#each app has one or more contexts
self._snmpContext = context.SnmpContext(self._snmpEngine)
#the builder is used to load mibs. tell it to look in the
#current directory for our new MIB. We'll also use it to
#export our symbols later
mibBuilder = self._snmpContext.getMibInstrum().getMibBuilder()
mibSources = mibBuilder.getMibSources() + (builder.DirMibSource('.'),)
mibBuilder.setMibSources(*mibSources)
#our variables will subclass this since we only have scalar types
#can't load this type directly, need to import it
MibScalarInstance, = mibBuilder.importSymbols('SNMPv2-SMI',
'MibScalarInstance')
#export our custom mib
for mibObject in mibObjects:
nextVar, = mibBuilder.importSymbols(mibObject.mibName,
mibObject.objectType)
instance = createVariable(MibScalarInstance,
mibObject.valueFunc,
nextVar.name, (0,),
nextVar.syntax)
#need to export as <var name>Instance
instanceDict = {str(nextVar.name)+"Instance":instance}
mibBuilder.exportSymbols(mibObject.mibName,
**instanceDict)
# tell pysnmp to respotd to get, set, getnext, and getbulk
cmdrsp.GetCommandResponder(self._snmpEngine, self._snmpContext)
cmdrsp.NextCommandResponder(self._snmpEngine, self._snmpContext)
cmdrsp.BulkCommandResponder(self._snmpEngine, self._snmpContext)
cmdrsp.SetCommandResponder(self._snmpEngine, self._snmpContext)
def setTrapReceiver(self, host, community):
"""Send traps to the host using community string community
"""
config.addV1System(self._snmpEngine, 'nms-area', community)
config.addVacmUser(self._snmpEngine, 2, 'nms-area', 'noAuthNoPriv',
notifySubTree=(1,3,6,1,4,1))
config.addTargetParams(self._snmpEngine,
'nms-creds', 'nms-area', 'noAuthNoPriv', 1)
config.addTargetAddr(self._snmpEngine, 'my-nms', udp.domainName,
(host, 162), 'nms-creds',
tagList='all-my-managers')
#set last parameter to 'notification' to have it send
#informs rather than unacknowledged traps
config.addNotificationTarget(
self._snmpEngine, 'test-notification', 'my-filter',
'all-my-managers', 'trap')
def sendTrap(self):
print "Sending trap"
ntfOrg = ntforg.NotificationOriginator(self._snmpContext)
errorIndication = ntfOrg.sendNotification(
self._snmpEngine,
'test-notification',
('LINEARISDBLQ-MIB', 'systemCurrentAlarmTrap'),
())
def serve_forever(self):
print "Starting agent"
self._snmpEngine.transportDispatcher.jobStarted(1)
try:
self._snmpEngine.transportDispatcher.runDispatcher()
except:
self._snmpEngine.transportDispatcher.closeDispatcher()
raise
class Worker(threading.Thread):
"""Just to demonstrate updating the MIB
and sending traps
"""
def __init__(self, agent, mib):
threading.Thread.__init__(self)
self._agent = agent
self._mib = mib
self.setDaemon(True)
def run(self):
while True:
time.sleep(3)
self._mib.setSystemChannel(mib.getSystemChannel()+1)
self._agent.sendTrap()
if __name__ == '__main__':
mib = Mib()
objects = [MibObject('LINEARISDBLQ-MIB', 'systemModel', mib.getSystemModel),
MibObject('LINEARISDBLQ-MIB', 'systemChannel', mib.getSystemChannel),
MibObject('LINEARISDBLQ-MIB', 'transportStream', mib.getTransportStream),
MibObject('LINEARISDBLQ-MIB', 'systemProgrammedPower', mib.getSystemProgrammedPower)]
agent = SNMPAgent(objects)
agent.setTrapReceiver('127.0.0.1', 'traps')
Worker(agent, mib).start()
try:
agent.serve_forever()
except KeyboardInterrupt:
print "Shutting down"
要回答這個問題需要經歷很多代碼。如果不在這個問題上花費不成比例的時間,就很難給出正確的答案;大多數人寧願轉向另一個問題。如果將問題歸結爲可能再現問題的最小樣本,那麼您得到的答案的數量,質量和清晰度也會提高。編輯後的問題不必與整個代碼做同樣的事情,只需要重現需要幫助的一個方面。 – Veedrac 2014-10-01 18:14:11