久久精品精选,精品九九视频,www久久只有这里有精品,亚洲熟女乱色综合一区
    分享

    OPCUA-Python

     車厘子V 2019-03-13

    pip install opcua

    Server

    from threading import Thread

    import copy

    import logging

    from datetime import datetime

    import time

    from math import sin

    import sys

    sys.path.insert(0, "..")

    try:

        from IPython import embed

    except ImportError:

        import code

        def embed():

            myvars = globals()

            myvars.update(locals())

            shell = code.InteractiveConsole(myvars)

            shell.interact()

    from opcua import ua, uamethod, Server

    class SubHandler(object):

        """

        Subscription Handler. To receive events from server for a subscription

        """

        def datachange_notification(self, node, val, data):

            print("Python: New data change event", node, val)

        def event_notification(self, event):

            print("Python: New event", event)

    # method to be exposed through server

    def func(parent, variant):

        ret = False

        if variant.Value % 2 == 0:

            ret = True

        return [ua.Variant(ret, ua.VariantType.Boolean)]

    # method to be exposed through server

    # uses a decorator to automatically convert to and from variants

    @uamethod

    def multiply(parent, x, y):

        print("multiply method call with parameters: ", x, y)

        return x * y

    class VarUpdater(Thread):

        def __init__(self, var):

            Thread.__init__(self)

            self._stopev = False

            self.var = var

        def stop(self):

            self._stopev = True

        def run(self):

            while not self._stopev:

                v = sin(time.time() / 10)

                self.var.set_value(v)

                time.sleep(0.1)

    if __name__ == "__main__":

        # optional: setup logging

        logging.basicConfig(level=logging.WARN)

        #logger = logging.getLogger("opcua.address_space")

        # logger.setLevel(logging.DEBUG)

        #logger = logging.getLogger("opcua.internal_server")

        # logger.setLevel(logging.DEBUG)

        #logger = logging.getLogger("opcua.binary_server_asyncio")

        # logger.setLevel(logging.DEBUG)

        #logger = logging.getLogger("opcua.uaprocessor")

        # logger.setLevel(logging.DEBUG)

        # now setup our server

        server = Server()

        #server.disable_clock()

        #server.set_endpoint("opc.tcp://localhost:4840/freeopcua/server/")

        server.set_endpoint("opc.tcp://0.0.0.0:4841/freeopcua/server/")

        server.set_server_name("FreeOpcUa Example Server")

        # set all possible endpoint policies for clients to connect through

        server.set_security_policy([

            ua.SecurityPolicyType.NoSecurity,

            # ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,

            # ua.SecurityPolicyType.Basic256Sha256_Sign

        ])

        # setup our own namespace

        uri = "http://examples.freeopcua."

        idx = server.register_namespace(uri)

        # create a new node type we can instantiate in our address space

        dev = server.nodes.base_object_type.add_object_type(0, "MyDevice")

        dev.add_variable(0, "sensor1", 1.0).set_modelling_rule(True)

        dev.add_property(0, "device_id", "0340").set_modelling_rule(True)

        ctrl = dev.add_object(0, "controller")

        ctrl.set_modelling_rule(True)

        ctrl.add_property(0, "state", "Idle").set_modelling_rule(True)

        # populating our address space

        # First a folder to organise our nodes

        myfolder = server.nodes.objects.add_folder(idx, "myEmptyFolder")

        # instanciate one instance of our device

        mydevice = server.nodes.objects.add_object(idx, "Device0001", dev)

        mydevice_var = mydevice.get_child(["0:controller", "0:state"])  # get proxy to our device state variable 

        # create directly some objects and variables

        myobj = server.nodes.objects.add_object(idx, "MyObject")

        myvar = myobj.add_variable(idx, "MyVariable", 6.7)

        mysin = myobj.add_variable(idx, "MySin", 0, ua.VariantType.Float)

        myvar.set_writable()    # Set MyVariable to be writable by clients

        mystringvar = myobj.add_variable(idx, "MyStringVariable", "Really nice string")

        mystringvar.set_writable()    # Set MyVariable to be writable by clients

        mydtvar = myobj.add_variable(idx, "MyDateTimeVar", datetime.utcnow())

        mydtvar.set_writable()    # Set MyVariable to be writable by clients

        myarrayvar = myobj.add_variable(idx, "myarrayvar", [6.7, 7.9])

        myarrayvar = myobj.add_variable(idx, "myStronglytTypedVariable", ua.Variant([], ua.VariantType.UInt32))

        myprop = myobj.add_property(idx, "myproperty", "I am a property")

        mymethod = myobj.add_method(idx, "mymethod", func, [ua.VariantType.Int64], [ua.VariantType.Boolean])

        multiply_node = myobj.add_method(idx, "multiply", multiply, [ua.VariantType.Int64, ua.VariantType.Int64], [ua.VariantType.Int64])

        # import some nodes from xml

        # server.import_xml("custom_nodes.xml")

        # creating a default event object

        # The event object automatically will have members for all events properties

        # you probably want to create a custom event type, see other examples

        myevgen = server.get_event_generator()

        myevgen.event.Severity = 300

        # starting!

        server.start()

        print("Available loggers are: ", logging.Logger.manager.loggerDict.keys())

        vup = VarUpdater(mysin)  # just  a stupide class update a variable

        vup.start()

        try:

            # enable following if you want to subscribe to nodes on server side

            #handler = SubHandler()

            #sub = server.create_subscription(500, handler)

            #handle = sub.subscribe_data_change(myvar)

            # trigger event, all subscribed clients wil receive it

            var = myarrayvar.get_value()  # return a ref to value in db server side! not a copy!

            var = copy.copy(var)  # WARNING: we need to copy before writting again otherwise no data change event will be generated

            var.append(9.3)

            myarrayvar.set_value(var)

            mydevice_var.set_value("Running")

            myevgen.trigger(message="This is BaseEvent")

            server.set_attribute_value(myvar.nodeid, ua.DataValue(9.9))  # Server side write method which is a but faster than using set_value

            embed()

        finally:

            vup.stop()

            server.stop()


    這個服務程序演示了opcua服務端,幾乎所有的功能,其中Event部分沒有不斷發送,所以僅供參考。

    下圖展示了server端的對象結構


    客戶端

    from IPython import embed

    from opcua import Client

    class SubHandler(object):

        def event_notification(self, event):

            print("Event:", event.EventId, event.Time, event.proper_random, event.Message.Text)

    def main_c():

        url = "opc.tcp://127.0.0.1:4841/freeopcua/server/"

        c = Client(url)

        try:

            c.connect()

            root = c.get_root_node()

            embed()

        except Exception as e:

            print("Client Exception:", e)

        finally:

            c.disconnect()

    if __name__ == "__main__":

        main_c()


    客戶端遍歷流程:

    獲取root下的Objects節點的所有子節點,一般類型都為Object,取一代表為A

    獲取A的NodeClass(一般為Object)和TypeDefinition,其中NodeId需要與路徑["0:Types","0:ObjectTypes","0:BaseObjectType"]下的類型對照(自定義類型也在內)。

    繼續以A為根,遍歷子節點,如果時Object繼續2,如果是Variable,NodeId對照["0:Types","0:VariableTypes","0:BaseVariableType"],可以判斷是變量(variable,63)還是屬性(property,65)。

    對于Method使用a_root.call_method(a, arg1)調用。對于Variable使用get_value/get_data_value()獲取存儲的值。如果是Object繼續2.

    對于Variable使用access_level獲取是否有寫權限,如果有set_value可以設置值。

    一個比較完美的遍歷客戶端:

    from opcua import Client, ua

    def brower_child(root):

        """

        遞歸調用遍歷,格式化不好做,有深度問題

        """

        name = root.get_node_class().name

        # print(name)

        if name == "Object":

            brower_obj(root)

            for c in root.get_children():

                print("  ", end='')

                brower_child(c)

        elif name == 'Variable':

            brower_var(root)

        else:

            brower_method(root)

    class CurState():

        def __init__(self, parent=None, p=None, d=0):

            self.parent = parent  # unused

            self.p = p

            self.d = d

    def brower_child2(root, max_d=-1, ignore=[]):

        """

        棧+循環遍歷,非常好用

        """

        stack = [CurState(None, root, 0)]

        while len(stack):

            cur = stack.pop()

            name = cur.p.get_node_class().name

            print(''.join(['  ' for i in range(cur.d)]), end="")

            if cur.p.get_browse_name().Name in ignore:

                continue

            if name == "Object":

                brower_obj(cur.p)

                if max_d > 0 and cur.d >= max_d:

                    continue

                for c in cur.p.get_children():

                    stack.append(CurState(cur.p, c, cur.d+1))

            elif name == 'Variable':

                brower_var(cur.p)

            else:

                brower_method(cur.p)

    def brower_obj(v):

        # print(v.get_browse_name())

        rw = 'R '

        bname = v.get_browse_name()

        print("*%2d:%-30s (%-2s, %-23s)" %

              (bname.NamespaceIndex, bname.Name, rw, "Object"))

    def brower_var(v):

        # print(v.get_browse_name())

        rw = 'R '

        if ua.AccessLevel.CurrentWrite in v.get_access_level():

            rw = "RW"

        bname = v.get_browse_name()

        tv = v.get_data_value().Value

        v_show = tv.Value

        if len(str(v_show)) > 1024:

            v_show = str(v_show[:56]) + "..."

        print("-%2d:%-30s (%-2s, %-23s) =>" %

              (bname.NamespaceIndex, bname.Name, rw, tv.VariantType), v_show)

    def brower_method(v):

        # print(v.get_description())

        rw = 'C '

        bname = v.get_browse_name()

        # args = []

        # for a in v.get_properties():

        #     dt = a.get_data_type().NodeIdType.name

        #     args.append(dt)

        print("@%2d:%-30s (%-2s, %-23s)" %

              (bname.NamespaceIndex, bname.Name, rw, "Method"))

    def main_c():

        url = "opc.tcp://127.0.0.1:4841/freeopcua/server/"

        c = Client(url)

        try:

            c.connect()

            root = c.get_root_node()

            print("\r\nBrower:")

            brower_child2(root.get_child(["0:Objects"]), -1, ["Server"])

        except Exception as e:

            print("Client Exception:", e)

        finally:

            c.disconnect()

    if __name__ == "__main__":

        main_c()

      本站是提供個人知識管理的網絡存儲空間,所有內容均由用戶發布,不代表本站觀點。請注意甄別內容中的聯系方式、誘導購買等信息,謹防詐騙。如發現有害或侵權內容,請點擊一鍵舉報。
      轉藏 分享 獻花(0

      0條評論

      發表

      請遵守用戶 評論公約

      類似文章 更多

      主站蜘蛛池模板: 亚洲欧美偷拍另类A∨| 草裙社区精品视频播放| 免费现黄频在线观看国产| 精品卡一卡二卡乱码高清| 亚洲人成无码WWW久久久 | 亚洲精品无码久久毛片| 日韩乱码人妻无码中文字幕视频| 日韩V欧美V中文在线 | 亚洲欧美中文字幕5发布| 国产高清一区二区不卡| 国产成人综合欧美精品久久| 欧美人与禽2o2o性论交| 西西人体44WWW高清大胆| 中文字幕人妻无码一夲道| 国产午夜成人无码免费看| 巨爆乳中文字幕爆乳区| 日韩免费无码一区二区三区| 婷婷久久香蕉五月综合加勒比 | 97视频精品全国免费观看| 秋霞鲁丝片成人无码| 公喝错春药让我高潮| 狠狠色丁香婷婷综合尤物| 99国精品午夜福利视频不卡99| 国产精品国产三级国产AV主播 | 国产波霸爆乳一区二区| 免费现黄频在线观看国产| 国产精品久久无码不卡黑寡妇| 成在人线AV无码免观看| 国产精品免费视频不卡| 高清国产MV视频在线观看| 国产成人8X人网站视频| 亚洲中文字幕无码中字| 精品人人妻人人澡人人爽人人| 五月天天天综合精品无码| 中文字幕制服国产精品| 全国最大的成人网站| 成人无码午夜在线观看| 翘臀少妇被扒开屁股日出水爆乳 | 亚洲人成网站18禁止无码| 成人久久免费网站| 亚洲AV无码成H人动漫无遮挡|