Groovy ActiveMQ 5.8 Embedded Broker


/ Published in: Groovy
Save to your folder(s)

ActiveMQ 5.8 Groovy Embedded Broker Example - Laurence Toenjes - 5/14/2013
This example overcomes some limitations of the basic ActiveMQ embedded
broker examples I found online.

Some of the challenges were:
# Multiple instances on same machine and be able to use JMX.
# Running on a machine with less than 50G or 100G disk space
caused combinations of ActiveMQ errors or warnings.
# Groovy Grapes/Grab syntax to use that would work on pc and mac.

The broker in this example uses a nonpersistent store and
is multicast discoverable and should allow you to run multiple instances
of it (in separate processes of course) which is the reason for all the
code snips containing random port nums and random thread sleeps
to increase the odds of success of each new embedded broker process
to get a working set of port nums.


Copy this code and paste it in your HTML
  1. #!/usr/bin/env groovy
  2.  
  3. @Grapes([
  4. @Grab(group='org.apache.activemq', module='activemq-all', version='5.8.0', transitive=false)
  5. ])
  6.  
  7. // file: EmbeddedBroker.groovy
  8.  
  9. /*
  10. ActiveMQ 5.8 Groovy Embeded Broker Example - Laurence Toenjes - 5/14/2013
  11. This example overcomes some limitations of the basic ActiveMQ embedded
  12. brokers examples I found online
  13.  
  14. Some of the challenges were:
  15. # Multiple instances on same machine and be able to use JMX.
  16. # Running on a machine with less than 50G or 100G disk space
  17. caused combinations of ActiveMQ errors or warnings.
  18. # Groovy Grapes/Grab syntax to use that would work on pc and mac.
  19.  
  20. The broker in this example uses a nonpersistent store and
  21. is multicast discoverable and should allow you to run multiple instances
  22. of it (in separate processes of course) which is the reason for all the
  23. code snips containing random port nums and random thread sleeps
  24. to increase the odds of success of each new embedded broker process
  25. to get a working set of port nums.
  26. */
  27.  
  28. import javax.management.ObjectName;
  29. import org.apache.activemq.broker.BrokerService;
  30. import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
  31.  
  32. import org.apache.activemq.transport.discovery.multicast.MulticastDiscoveryAgentFactory;
  33. import org.apache.activemq.transport.discovery.multicast.MulticastDiscoveryAgent; // http://activemq.apache.org/maven/apidocs/src-html/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.html#line.54
  34. import org.apache.activemq.transport.discovery.DiscoveryListener;
  35.  
  36. public final class EmbeddedBroker {
  37.  
  38. static random = new java.util.Random();
  39.  
  40. static def calcPid = { java.lang.management.ManagementFactory.getRuntimeMXBean().getName().split('@')[0].toInteger() } ;
  41.  
  42. static Integer javaPid = calcPid();
  43. static String sJavaPid = java.lang.management.ManagementFactory.getRuntimeMXBean().getName();
  44.  
  45. static String C_DEFAULT_DISCOVERY_URI_STRING = MulticastDiscoveryAgent.DEFAULT_DISCOVERY_URI_STRING;
  46.  
  47. private EmbeddedBroker() {} // not used - from Java example
  48.  
  49. static def sockets // to pre-allocate ports for ActiveMQ
  50. static def ports // randomly generated list of free ports
  51. static def calcMqPort = { ports.last() }
  52.  
  53. //
  54. /*
  55. SET _JAVA_OPTIONS=-Dcom.sun.management.jmxremote ^
  56. -Dcom.sun.management.jmxremote.port=51001 ^
  57. -Dcom.sun.management.jmxremote.local.only=false ^
  58. -Dcom.sun.management.jmxremote.authenticate=false ^
  59. -DmyJmxConnectorPort=41001
  60. */
  61.  
  62. def tryCounter = 1000;
  63. sockets = [];
  64.  
  65. def base = 4000; // lowest port num to try
  66. def portRange = 5000;
  67.  
  68. def calcPorts = {
  69. ports = [];
  70. def rnd = { base + random.nextInt(portRange) }
  71. def p = rnd();
  72. ports = (0 ..< 3).collect { ((p + it) as Integer) }
  73. // lets make activemq port same as pid so easy to use jconsole
  74. ports[-1] = javaPid
  75. ports; // return
  76. }
  77.  
  78. calcPorts();
  79. while ( tryCounter-- >= 0 ) {
  80. try {
  81. Thread.sleep random.nextInt( 100 );
  82. // sockets = ports.collect { new Socket(it) }
  83. ports.each { itPort -> sockets << new ServerSocket(itPort) }
  84. assert sockets.size() == ports.size(); // need at least 3
  85. } catch(Exception ex) {
  86. if ( !(ex instanceof java.net.BindException) ) {
  87. System.err.println ex
  88. }
  89. sockets.findAll { it != null }.each { itSocketToClose ->
  90. try { itSocketToClose.close(); } catch(Exception ex2) {}
  91. }
  92. sockets.clear();
  93. calcPorts();
  94. Thread.sleep( random.nextInt( 200 ) + 500 );
  95. }
  96. }
  97. Thread.sleep random.nextInt( 200 );
  98. sockets.each { it.close() }
  99.  
  100. def sm = [:] // for system map props
  101. sm.'com.sun.management.jmxremote.port' = ports[0].toString()
  102. sm.'com.sun.management.jmxremote.local.only' = 'false'
  103. sm.'com.sun.management.jmxremote.authenticate' = 'false'
  104. sm.'myJmxConnectorPort' = ports[1].toString()
  105.  
  106. // ports[0] is for com.sun.management.jmxremote.port
  107. // ports[1] is for broker.getManagementContext().setConnectorPort
  108.  
  109. sm.keySet().each { key -> System.properties[ key ] = sm[key] }
  110.  
  111. BrokerService broker
  112. def brokerCreated = false;
  113.  
  114. tryCounter = 100;
  115. while( (!brokerCreated) && (tryCounter-- >= 0) ) {
  116. try {
  117. broker = createBroker();
  118. brokerCreated = true;
  119.  
  120. // run forever
  121. Object lock = new Object();
  122. synchronized (lock) {
  123. lock.wait();
  124. }
  125.  
  126. break; //
  127. } catch(Exception ex) {
  128. println "### Oops: ${ex}"
  129. }
  130. } // end while
  131. }
  132.  
  133. public static BrokerService createBroker() throws Exception {
  134. def gi = groovy.inspect.swingui.ObjectBrowser.&inspect;
  135.  
  136. BrokerService broker = new BrokerService();
  137. broker.persistent = false; // SET THIS FIRST!!! - setting on url did not work for me
  138. broker.setUseShutdownHook(true);
  139.  
  140. // Stop ActiveMQ 5.8 Errors or Warnings when running on machines with
  141. // less than 50G to 100G of diskspace
  142. Long HundredGig = 107374182400L
  143. File fileVisitor = broker.tmpDataDirectory.canonicalFile;
  144. while( !fileVisitor.exists() ) {
  145. fileVisitor = new File(fileVisitor, '..').canonicalFile
  146. }
  147. if ( fileVisitor.usableSpace < HundredGig ) {
  148. broker.systemUsage.tempUsage.limit = fileVisitor.usableSpace/2;
  149. broker.systemUsage.storeUsage.limit = fileVisitor.usableSpace/2;
  150. }
  151. broker.systemUsage.setSendFailIfNoSpace(false);
  152. broker.systemUsage.setSendFailIfNoSpaceExplicitySet(true);
  153.  
  154. // String theBrokerSuffix = sJavaPid.replace('@','_');
  155. broker.brokerName = 'broker1'
  156.  
  157. broker.setUseJmx(true);
  158.  
  159. // sometimes set in bat/sh starter
  160. Integer myJmxConnectorPort = System.properties.'myJmxConnectorPort'.toString().toInteger();
  161. broker.getManagementContext().setConnectorPort( myJmxConnectorPort );
  162.  
  163. // !!! for jmx usage
  164. broker.setBrokerObjectName(
  165. BrokerMBeanSupport.createBrokerObjectName(broker.getManagementContext().getJmxDomainName(), broker.brokerName)
  166. )
  167.  
  168. def conn = broker.addConnector("tcp://0.0.0.0:${calcMqPort()}"); // use 0.0.0.0 , makes discovery work better
  169. // conn.name += "_port_${javaPid}"
  170. // for discovery
  171. conn.discoveryUri = new URI( "${C_DEFAULT_DISCOVERY_URI_STRING}?useLocalHost=false".trim() ); // optional add ?
  172.  
  173. broker.start();
  174. }
  175. }

Report this snippet


Comments

RSS Icon Subscribe to comments

You need to login to post a comment.