Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -671,19 +671,24 @@ public void parseMainConfig(final Element e, final Configuration config) throws
}
}

// Ensure AMQP federation configuration isn't picked up here by core federation
// configuration parsing, any AMQP federation configuration would generate empty
// Core federation configurations that would not create active federations.
NodeList federations = e.getElementsByTagName("federations");

for (int i = 0; i < federations.getLength(); i++) {
Element fedNode = (Element) federations.item(i);
parseFederationsConfiguration(fedNode, config);
}
Element federationElement = (Element) federations.item(i);

NodeList fedNodes = e.getElementsByTagName("federation");
// Handle attributes at the federations level and then add each nested federation.
parseFederationsConfiguration(federationElement, config);

for (int i = 0; i < fedNodes.getLength(); i++) {
Element fedNode = (Element) fedNodes.item(i);
for (int j = 0; j < federationElement.getChildNodes().getLength(); ++j) {
Node node = federationElement.getChildNodes().item(j);

parseFederationConfiguration(fedNode, config);
if (node.getNodeName().equalsIgnoreCase("federation")) {
parseFederationConfiguration((Element) node, config);
}
}
}

NodeList gaNodes = e.getElementsByTagName("grouping-handler");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
Expand Down Expand Up @@ -501,6 +503,196 @@ public void testDefaultBridgeProducerWindowSize() throws Exception {
assertEquals(ActiveMQDefaultConfiguration.getDefaultBridgeProducerWindowSize(), bconfig.getProducerWindowSize());
}

@Test
public void testCoreFederationIgnoresAMQPFederationConfigurations() throws Exception {
FileConfigurationParser parser = new FileConfigurationParser();
String middlePart = """
<broker-connections>
<amqp-connection uri="tcp://test1:111" name="test1" retry-interval="333" reconnect-attempts="33" user="testuser" password="testpassword">
<federation>
<local-queue-policy name="federation5" priority-adjustment="1" include-federated="false">
<include address-match="test" queue-match="tracking" />
</local-queue-policy>
</federation>
</amqp-connection>
<amqp-connection uri="tcp://federation" name="federation6" auto-start="false">
<federation>
<local-queue-policy name="lqp1" priority-adjustment="1" include-federated="false">
<include address-match="test" queue-match="tracking" />
<property key="amqpCredits" value="1"/>
<transformer-class-name>class-another</transformer-class-name>
</local-queue-policy>
<remote-queue-policy name="rqp1" priority-adjustment="-1" include-federated="true">
<include address-match="#" queue-match="tracking" />
<property key="amqpCredits" value="2"/>
<property key="amqpLowCredits" value="1"/>
</remote-queue-policy>
<local-address-policy name="lap1" auto-delete="false" auto-delete-delay="1" auto-delete-message-count="12" max-hops="2" enable-divert-bindings="true">
<include address-match="orders" />
<exclude address-match="all.#" />
<transformer-class-name>class-name</transformer-class-name>
</local-address-policy>
<local-queue-policy name="lqp2">
<include address-match="address" queue-match="theQueue" />
<transformer-class-name>class-another</transformer-class-name>
</local-queue-policy>
<remote-address-policy name="rap1" auto-delete="true" auto-delete-delay="2" auto-delete-message-count="42" max-hops="1" enable-divert-bindings="false">
<include address-match="support" />
<property key="amqpCredits" value="2"/>
<property key="amqpLowCredits" value="1"/>
<transformer>
<class-name>something</class-name>
<property key="key1" value="value1"/>
<property key="key2" value="value2"/>
</transformer>
</remote-address-policy>
<property key="amqpCredits" value="7"/>
<property key="amqpLowCredits" value="1"/>
</federation>
</amqp-connection>
</broker-connections>
<federations>
<federation name="federation1">
<upstream name="eu-west-1" user="westuser" password="32a10275cf4ab4e9">
<static-connectors>
<connector-ref>connector1</connector-ref>
</static-connectors>
<policy ref="policySetA"/>
</upstream>
<upstream name="eu-east-1" user="eastuser" password="32a10275cf4ab4e9">
<ha>true</ha>
<discovery-group-ref discovery-group-name="dg1"/>
<policy ref="policySetA"/>
</upstream>
<policy-set name="policySetA">
<policy ref="address-federation" />
<policy ref="queue-federation" />
</policy-set>
<queue-policy name="queue-federation" >
<exclude queue-match="the_queue" address-match="#" />
</queue-policy>
<address-policy name="address-federation" >
<include address-match="the_address" />
</address-policy>
</federation>
<federation name="federation2" user="globaluser" password="32a10275cf4ab4e9">
<upstream name="usa-west-1">
<static-connectors>
<connector-ref>connector1</connector-ref>
</static-connectors>
<policy ref="address-federation-usa"/>
</upstream>
<upstream name="usa-east-1" >
<ha>true</ha>
<discovery-group-ref discovery-group-name="dg1"/>
<policy ref="queue-federation-usa"/>
</upstream>
<queue-policy name="queue-federation-usa" >
<exclude queue-match="the_queue" address-match="#" />
</queue-policy>
<address-policy name="address-federation-usa" >
<include address-match="the_address" />
</address-policy>
</federation>
<federation name="federation3" user="globaluser" password="32a10275cf4ab4e9">
<upstream name="asia-1">
<static-connectors>
<connector-ref>connector1</connector-ref>
</static-connectors>
<policy ref="queue-federation-asia"/>
<policy ref="address-federation-asia"/>
</upstream>
<upstream name="asia-2" >
<ha>true</ha>
<discovery-group-ref discovery-group-name="dg1"/>
<policy ref="queue-federation-asia"/>
<policy ref="address-federation-asia"/>
</upstream>
<queue-policy name="queue-federation-asia" transformer-ref="federation-transformer-3" >
<exclude queue-match="the_queue" address-match="#" />
</queue-policy>
<address-policy name="address-federation-asia" transformer-ref="federation-transformer-3" >
<include address-match="the_address" />
</address-policy>
<transformer name="federation-transformer-3">
<class-name>org.foo.FederationTransformer3</class-name>
<property key="federationTransformerKey1" value="federationTransformerValue1"/>
<property key="federationTransformerKey2" value="federationTransformerValue2"/>
</transformer>
</federation>
<federation name="federation4" user="globaluser" password="32a10275cf4ab4e9">
<upstream name="asia-3">
<static-connectors>
<connector-ref>connector1</connector-ref>
</static-connectors>
<policy ref="queue-federation-asia"/>
<policy ref="address-federation-asia"/>
</upstream>
<downstream name="asia-4" >
<ha>true</ha>
<discovery-group-ref discovery-group-name="dg1"/>
<policy ref="queue-federation-asia"/>
<policy ref="address-federation-asia"/>
<upstream-connector-ref>connector1</upstream-connector-ref>
</downstream>
<queue-policy name="queue-federation-asia2" transformer-ref="federation-transformer-4" >
<exclude queue-match="the_queue" address-match="#" />
</queue-policy>
<address-policy name="address-federation-asia2" transformer-ref="federation-transformer-4" >
<include address-match="the_address" />
</address-policy>
<transformer name="federation-transformer-4">
<class-name>org.foo.FederationTransformer4</class-name>
<property key="federationTransformerKey1" value="federationTransformerValue1"/>
<property key="federationTransformerKey2" value="federationTransformerValue2"/>
</transformer>
</federation>
</federations>""";

String configStr = FIRST_PART + middlePart + LAST_PART;
ByteArrayInputStream input = new ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8));

Configuration config = parser.parseMainConfig(input);

assertEquals(4, config.getFederationConfigurations().size());

final Map<String, FederationConfiguration> federations =
config.getFederationConfigurations().stream()
.collect(Collectors.toMap(c -> c.getName(), Function.identity()));

assertTrue(federations.containsKey("federation1"));

final FederationConfiguration configuration1 = federations.get("federation1");

assertEquals(2, configuration1.getUpstreamConfigurations().size());
assertEquals(0, configuration1.getDownstreamConfigurations().size());
assertEquals(0, configuration1.getTransformerConfigurations().size());

assertTrue(federations.containsKey("federation2"));

final FederationConfiguration configuration2 = federations.get("federation2");

assertEquals(2, configuration2.getUpstreamConfigurations().size());
assertEquals(0, configuration2.getDownstreamConfigurations().size());
assertEquals(0, configuration2.getTransformerConfigurations().size());

assertTrue(federations.containsKey("federation3"));

final FederationConfiguration configuration3 = federations.get("federation3");

assertEquals(2, configuration3.getUpstreamConfigurations().size());
assertEquals(0, configuration3.getDownstreamConfigurations().size());
assertEquals(1, configuration3.getTransformerConfigurations().size());

assertTrue(federations.containsKey("federation4"));

final FederationConfiguration configuration4 = federations.get("federation4");

assertEquals(1, configuration4.getUpstreamConfigurations().size());
assertEquals(1, configuration4.getDownstreamConfigurations().size());
assertEquals(1, configuration4.getTransformerConfigurations().size());
}

@Test
public void testParsingOverflowPageSize() throws Exception {
testParsingOverFlow("""
Expand Down
Loading