Home | Trees | Indices | Help |
---|
|
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

# NOTE(review): this module was recovered from a flattened HTML source
# listing.  Indentation was lost and most ``class``/``def`` header lines were
# missing.  Headers, base classes and default argument values marked
# "NOTE(review)" below were restored from the call sites that survive in the
# listing; confirm each of them against the upstream file before relying on
# this copy.

import collections
import socket
import time
import threading

from proton import ConnectionException, Delivery, Endpoint, Handler, Link, LinkException, Message
from proton import ProtonException, Timeout, Url
from proton.reactor import Container
from proton.handlers import MessagingHandler, IncomingMessageHandler
from cproton import pn_reactor_collector, pn_collector_release


# NOTE(review): class header restored; name taken from the
# "exception in BlockingLink.__init__" comment in BlockingReceiver.__del__.
class BlockingLink(object):
    """Common base for the blocking sender and receiver wrappers.

    Holds the owning blocking connection and the wrapped link, and blocks in
    the constructor until the peer has responded to the link open.
    """

    def __init__(self, connection, link):
        """Wait until the remote end of ``link`` is no longer uninitialised.

        @param connection: the owning connection; must provide ``wait()`` and
            a ``closing`` attribute.
        @param link: the link being wrapped.
        @raise LinkDetached: if the peer has already closed the link.
        """
        self.connection = connection
        self.link = link
        self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_UNINIT),
                             msg="Opening link %s" % link.name)
        self._checkClosed()

    # NOTE(review): the default for ``timeout`` is not visible in the listing;
    # a default must exist because callers invoke this with no arguments.
    def _waitForClose(self, timeout=1):
        """Give the peer a short while to close the link, then check for it.

        @raise LinkDetached: via _checkClosed() if the peer closed the link.
        """
        try:
            self.connection.wait(lambda: self.link.state & Endpoint.REMOTE_CLOSED,
                                 timeout=timeout,
                                 msg="Opening link %s" % self.link.name)
        except Timeout:
            # Deliberately best effort: the peer not closing in time is fine.
            pass
        self._checkClosed()

    def _checkClosed(self):
        """Close our end if the peer has closed, and report the detach.

        @raise LinkDetached: if the peer closed the link while the connection
            is not itself being closed.
        """
        if self.link.state & Endpoint.REMOTE_CLOSED:
            self.link.close()
            if not self.connection.closing:
                raise LinkDetached(self.link)

    def close(self):
        """Close the link and block until the remote end is no longer active."""
        self.link.close()
        self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_ACTIVE),
                             msg="Closing link %s" % self.link.name)

    # Access to other link attributes.
    # NOTE(review): definition missing from the listing.  It is required
    # because SyncRequestResponse calls receiver.flow() and reads
    # receiver.remote_source on a BlockingReceiver.
    def __getattr__(self, name):
        return getattr(self.link, name)


# NOTE(review): SendException and _is_settled are referenced by
# BlockingSender.send but their definitions are missing from the listing.
# The versions below are minimal reconstructions; verify against upstream.
class SendException(ProtonException):
    """Raised by BlockingSender.send when a delivery ends in an error state."""

    def __init__(self, state):
        self.state = state


def _is_settled(delivery):
    """Return a true value once ``delivery`` needs no further waiting."""
    return delivery.settled or delivery.link.snd_settle_mode == Link.SND_SETTLED


class BlockingSender(BlockingLink):
    """Blocking wrapper around a sending link."""

    def __init__(self, connection, sender):
        """Open the sender and verify the peer accepted the requested target.

        @raise LinkException: if the remote target address differs from the
            one that was requested.
        """
        super(BlockingSender, self).__init__(connection, sender)
        if self.link.target and self.link.target.address and self.link.target.address != self.link.remote_target.address:
            # this may be followed by a detach, which may contain an error condition, so wait a little...
            self._waitForClose()
            # ...but close ourselves if peer does not
            self.link.close()
            raise LinkException("Failed to open sender %s, target does not match" % self.link.name)

    # NOTE(review): defaults restored; ``timeout=False`` is the value that
    # BlockingConnection.wait() maps to the connection-wide timeout.
    def send(self, msg, timeout=False, error_states=None):
        """Send ``msg`` and block until the delivery is settled.

        @param msg: the message to send.
        @param timeout: passed through to the connection's wait().
        @param error_states: remote delivery states that should raise; when
            None, REJECTED and RELEASED are treated as errors.
        @return: the delivery.
        @raise SendException: if the remote state is one of the error states.
        """
        delivery = self.link.send(msg)
        self.connection.wait(lambda: _is_settled(delivery), msg="Sending on sender %s" % self.link.name, timeout=timeout)
        if delivery.link.snd_settle_mode != Link.SND_SETTLED:
            delivery.settle()
        bad = error_states
        if bad is None:
            bad = [Delivery.REJECTED, Delivery.RELEASED]
        if delivery.remote_state in bad:
            raise SendException(delivery.remote_state)
        return delivery


# NOTE(review): base class and handler method names restored; the
# super().__init__(prefetch=..., auto_accept=False) call suggests
# MessagingHandler -- confirm.
class Fetcher(MessagingHandler):
    """Handler that queues incoming messages for a BlockingReceiver."""

    def __init__(self, connection, prefetch):
        super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False)
        self.connection = connection
        self.incoming = collections.deque()   # (message, delivery) pairs not yet popped
        self.unsettled = collections.deque()  # popped deliveries still awaiting settlement

    def on_message(self, event):
        self.incoming.append((event.message, event.delivery))
        self.connection.container.yield_()  # Wake up the wait() loop to handle the message.

    def on_link_error(self, event):
        if event.link.state & Endpoint.LOCAL_ACTIVE:
            event.link.close()
            if not self.connection.closing:
                raise LinkDetached(event.link)

    # NOTE(review): the listing omits its lines 106-109 here; their content
    # cannot be recovered from this view.

    @property
    def has_message(self):
        # NOTE(review): body missing from the listing; reconstructed from its
        # use as the wait() condition in BlockingReceiver.receive.
        return len(self.incoming)

    def pop(self):
        """Return the oldest queued message, remembering it if unsettled."""
        message, delivery = self.incoming.popleft()
        if not delivery.settled:
            self.unsettled.append(delivery)
        return message

    # NOTE(review): the listing omits its lines 120-126 here (nothing else in
    # this view consumes ``self.unsettled``); restore them from upstream.


class BlockingReceiver(BlockingLink):
    """Blocking wrapper around a receiving link."""

    # NOTE(review): the default for ``credit`` is not visible in the listing.
    def __init__(self, connection, receiver, fetcher, credit=1):
        """Open the receiver and verify the peer accepted the requested source.

        @param fetcher: the Fetcher collecting messages, or None when the
            caller supplied its own handler.
        @param credit: initial credit to issue; nothing is issued when falsy.
        @raise LinkException: if the remote source address differs from the
            one that was requested.
        """
        super(BlockingReceiver, self).__init__(connection, receiver)
        if self.link.source and self.link.source.address and self.link.source.address != self.link.remote_source.address:
            # this may be followed by a detach, which may contain an error condition, so wait a little...
            self._waitForClose()
            # ...but close ourselves if peer does not
            self.link.close()
            raise LinkException("Failed to open receiver %s, source does not match" % self.link.name)
        if credit:
            receiver.flow(credit)
        self.fetcher = fetcher
        self.container = connection.container

    def __del__(self):
        self.fetcher = None
        # The next line causes a core dump if the Proton-C reactor finalizes
        # first. The self.container reference prevents out of order reactor
        # finalization. It may not be set if exception in BlockingLink.__init__
        if hasattr(self, "container"):
            self.link.handler = None  # implicit call to reactor

    def receive(self, timeout=False):
        """Block until a message is available and return it.

        @param timeout: passed through to the connection's wait().
        @raise Exception: if this receiver was created with a user handler.
        """
        if not self.fetcher:
            raise Exception("Can't call receive on this receiver as a handler was provided")
        if not self.link.credit:
            self.link.flow(1)
        self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name, timeout=timeout)
        return self.fetcher.pop()

    # NOTE(review): the listing omits its lines 156-173 here; their content
    # cannot be recovered from this view.  Restore them from upstream.


# NOTE(review): base classes of the two exceptions below restored from the
# names imported at the top of the module; confirm.
class LinkDetached(LinkException):
    """Raised when the peer closes a link; ``condition`` names the cause, if any."""

    def __init__(self, link):
        self.link = link
        if link.is_sender:
            txt = "sender %s to %s closed" % (link.name, link.target.address)
        else:
            txt = "receiver %s from %s closed" % (link.name, link.source.address)
        if link.remote_condition:
            txt += " due to: %s" % link.remote_condition
            self.condition = link.remote_condition.name
        else:
            txt += " by peer"
            self.condition = None
        super(LinkDetached, self).__init__(txt)


class ConnectionClosed(ConnectionException):
    """Raised when the peer closes the connection; ``condition`` names the cause, if any."""

    def __init__(self, connection):
        self.connection = connection
        txt = "Connection %s closed" % connection.hostname
        if connection.remote_condition:
            txt += " due to: %s" % connection.remote_condition
            self.condition = connection.remote_condition.name
        else:
            txt += " by peer"
            self.condition = None
        super(ConnectionClosed, self).__init__(txt)


# NOTE(review): base class restored (the instance passes itself as
# ``handler=self`` to container.connect); confirm.
class BlockingConnection(Handler):
    """
    A synchronous style connection wrapper.

    This object's implementation uses OS resources.  To ensure they
    are released when the object is no longer in use, make sure that
    object operations are enclosed in a try block and that close() is
    always executed on exit.
    """

    def __init__(self, url, timeout=None, container=None, ssl_domain=None, heartbeat=None, **kwargs):
        """Connect to ``url`` and block until the peer responds to the open.

        @param timeout: default timeout for blocking calls; 60 when falsy.
        @param container: container to use; a new one is created when falsy.
        @param kwargs: passed through to the container's connect().
        """
        self.disconnected = False
        self.timeout = timeout or 60
        self.container = container or Container()
        self.container.timeout = self.timeout
        self.container.start()
        self.url = Url(url).defaults()
        self.conn = None
        self.closing = False
        failed = True
        try:
            self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False, heartbeat=heartbeat, **kwargs)
            self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
                      msg="Opening connection")
            failed = False
        finally:
            # Release the half-opened connection if the open did not complete.
            if failed and self.conn:
                self.close()

    # NOTE(review): header restored from the names used in the body; the
    # parameter order after ``address`` is not visible in the listing.
    def create_sender(self, address, handler=None, name=None, options=None):
        """Create and open a BlockingSender for ``address``."""
        return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler, options=options))

    def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None):
        """Create and open a BlockingReceiver for ``address``.

        When ``handler`` is given, messages go to it and receive() cannot be
        used on the result; otherwise a Fetcher is installed to queue them.
        """
        prefetch = credit
        if handler:
            fetcher = None
            if prefetch is None:
                prefetch = 1
        else:
            fetcher = Fetcher(self, credit)
        return BlockingReceiver(
            self, self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher, options=options), fetcher, credit=prefetch)

    def close(self):
        """Close the connection and release the container; repeat calls are no-ops."""
        # TODO: provide stronger interrupt protection on cleanup.  See PEP 419
        if self.closing:
            return
        self.closing = True
        self.container.errors = []
        try:
            if self.conn:
                self.conn.close()
                self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE),
                          msg="Closing connection")
        finally:
            # Guarded like the try body, so a missing connection cannot raise
            # AttributeError here and mask the original error.
            if self.conn:
                self.conn.free()
            # Nothing left to block on.  Allow reactor to clean up.
            self.run()
            self.conn = None
            self.container.global_handler = None  # break circular ref: container to cadapter.on_error
            pn_collector_release(pn_reactor_collector(self.container._impl))  # straggling event may keep reactor alive
            self.container = None

    # NOTE(review): definition missing from the listing but called by wait();
    # reconstructed -- verify against upstream.
    def _is_closed(self):
        return self.conn.state & (Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED)

    def run(self):
        """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """
        while self.container.process():
            pass
        self.container.stop()
        self.container.process()

    # NOTE(review): defaults restored from the ``timeout is False`` and
    # ``if msg`` tests in the body.
    def wait(self, condition, timeout=False, msg=None):
        """Call process until condition() is true

        @param condition: zero-argument callable polled between process() calls.
        @param timeout: seconds to wait; False selects self.timeout, None
            waits without a deadline.
        @param msg: extra text appended to the timeout error.
        @raise Timeout: if the deadline passes first.
        @raise ConnectionException: if the transport disconnected while the
            connection was not closed.
        """
        if timeout is False:
            timeout = self.timeout
        if timeout is None:
            while not condition() and not self.disconnected:
                self.container.process()
        else:
            container_timeout = self.container.timeout
            self.container.timeout = timeout
            try:
                deadline = time.time() + timeout
                while not condition() and not self.disconnected:
                    self.container.process()
                    if deadline < time.time():
                        txt = "Connection %s timed out" % self.url
                        if msg:
                            txt += ": " + msg
                        raise Timeout(txt)
            finally:
                # Always restore the container's own timeout.
                self.container.timeout = container_timeout
        if self.disconnected or self._is_closed():
            self.container.stop()
            self.conn.handler = None  # break cyclical reference
        if self.disconnected and not self._is_closed():
            raise ConnectionException(
                "Connection %s disconnected: %s" % (self.url, self.disconnected))

    # NOTE(review): the names of the four event handlers below are missing
    # from the listing and were restored by convention -- confirm.
    def on_link_remote_close(self, event):
        if event.link.state & Endpoint.LOCAL_ACTIVE:
            event.link.close()
            if not self.closing:
                raise LinkDetached(event.link)

    def on_connection_remote_close(self, event):
        if event.connection.state & Endpoint.LOCAL_ACTIVE:
            event.connection.close()
            if not self.closing:
                raise ConnectionClosed(event.connection)

    def on_transport_tail_closed(self, event):
        self.on_transport_closed(event)

    def on_transport_head_closed(self, event):
        self.on_transport_closed(event)

    # NOTE(review): definition missing from the listing; reconstructed from
    # how wait() reads ``self.disconnected`` -- verify against upstream.
    def on_transport_closed(self, event):
        self.disconnected = event.transport.condition or "unknown"


class AtomicCount(object):
    # NOTE(review): default values are not visible in the listing; defaults
    # must exist because SyncRequestResponse calls AtomicCount() bare.
    def __init__(self, start=0, step=1):
        """Thread-safe atomic counter. Start at start, increment by step."""
        self.count, self.step = start, step
        self.lock = threading.Lock()

    def next(self):
        """Get the next value"""
        # ``with`` releases the lock even if the increment raises.
        with self.lock:
            self.count += self.step
            return self.count


# NOTE(review): base class restored from the imported IncomingMessageHandler
# and the on_message handler below; confirm.
class SyncRequestResponse(IncomingMessageHandler):
    """
    Implementation of the synchronous request-response (aka RPC) pattern.
    @ivar address: Address for all requests, may be None.
    @ivar connection: Connection for requests and responses.
    """

    correlation_id = AtomicCount()

    def __init__(self, connection, address=None):
        """
        Send requests and receive responses. A single instance can send many requests
        to the same or different addresses.

        @param connection: A L{BlockingConnection}
        @param address: Address for all requests.
            If not specified, each request must have the address property set.
            Successive messages may have different addresses.
        """
        super(SyncRequestResponse, self).__init__()
        self.connection = connection
        self.address = address
        self.sender = self.connection.create_sender(self.address)
        # dynamic=true generates a unique address dynamically for this receiver.
        # credit=1 because we want to receive 1 response message initially.
        self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self)
        self.response = None

    def call(self, request):
        """
        Send a request message, wait for and return the response message.

        @param request: A L{proton.Message}. If L{self.address} is not set the
            request's own address must be set and will be used.
        @raise ValueError: if neither L{self.address} nor the request's
            address is set.
        """
        if not self.address and not request.address:
            raise ValueError("Request message has no address: %s" % request)
        request.reply_to = self.reply_to
        request.correlation_id = correlation_id = str(self.correlation_id.next())
        self.sender.send(request)

        def wakeup():
            return self.response and (self.response.correlation_id == correlation_id)

        self.connection.wait(wakeup, msg="Waiting for response")
        response = self.response
        self.response = None    # Ready for next response.
        self.receiver.flow(1)   # Set up credit for the next response.
        return response

    @property
    def reply_to(self):
        """Return the dynamic address of our receiver."""
        return self.receiver.remote_source.address

    def on_message(self, event):
        """Called when we receive a message for our receiver."""
        self.response = event.message
        self.connection.container.yield_()  # Wake up the wait() loop to handle the message.
Home | Trees | Indices | Help |
---|
Generated by Epydoc 3.0.1 on Tue Jun 5 10:49:20 2018 | http://epydoc.sourceforge.net |