Revision: 1364
Author: alan.bar...@gmail.com
Date: Fri Mar 1 10:49:08 2013
Log: quick fix for channel
http://code.google.com/p/snee/source/detail?r=1364
Modified:
/branches/alan_2012_07_19_unreliableChannels/clients/reliable-channel-client/src/main/java/uk/ac/manchester/snee/client/RelibaleChannelClient.java
/branches/alan_2012_07_19_unreliableChannels/snee-wsn-manager/src/main/java/uk/ac/manchester/cs/snee/manager/planner/costbenifitmodel/model/channel/ChannelModelSite.java
=======================================
--- /branches/alan_2012_07_19_unreliableChannels/clients/reliable-channel-client/src/main/java/uk/ac/manchester/snee/client/RelibaleChannelClient.java	Fri Mar 1 10:09:26 2013
+++ /branches/alan_2012_07_19_unreliableChannels/clients/reliable-channel-client/src/main/java/uk/ac/manchester/snee/client/RelibaleChannelClient.java	Fri Mar 1 10:49:08 2013
@@ -279,7 +279,7 @@
SNEEProperties.setSetting(SNEEPropertyNames.RUN_AVRORA_SIMULATOR, "FALSE");
SNEEProperties.setSetting(SNEEPropertyNames.WSN_MANAGER_INITILISE_FRAMEWORKS, "FALSE");
- control.addQuery(_query, _queryParams, new Long(0), new Double(0.0));
+ control.addQuery(_query, _queryParams, new Long(0), new Double(1.0));
getController().close();
if (logger.isDebugEnabled())
logger.debug("RETURN");
=======================================
--- /branches/alan_2012_07_19_unreliableChannels/snee-wsn-manager/src/main/java/uk/ac/manchester/cs/snee/manager/planner/costbenifitmodel/model/channel/ChannelModelSite.java	Fri Mar 1 10:09:26 2013
+++ /branches/alan_2012_07_19_unreliableChannels/snee-wsn-manager/src/main/java/uk/ac/manchester/cs/snee/manager/planner/costbenifitmodel/model/channel/ChannelModelSite.java	Fri Mar 1 10:49:08 2013
@@ -68,6 +68,7 @@
private ChannelModel model = null;
private ArrayList<Integer> packetIds = new ArrayList<Integer>();
private boolean usePacketIDs = false;
+ private boolean ranAlready = false;
/**
* constructor for a channel model site
@@ -124,7 +125,7 @@
{
ArrayList<Boolean> packets = arrivedPackets.get(source);
arrivedPackets.remove(source);
- if(packetID == 0)
+ if(packetID == 0 || packetID > packets.size() )
System.out.println();
packets.set(packetID -1, true);
arrivedPackets.put(source, packets);
@@ -343,183 +344,187 @@
public ArrayList<Integer> transmittablePackets(IOT IOT)
throws OptimizationException, SchemaMetadataException,
TypeMappingException
{
- Site site = IOT.getSiteFromID(siteID);
- ArrayList<InstanceOperator> operators = IOT.getOpInstancesInSpecialOrder(site);
- Iterator<InstanceOperator> operatorIterator = operators.iterator();
- HashMap<String, Integer> currentPacketCount = new HashMap<String, Integer>();
- int previousOpOutput = 0;
- HashMap<String, Integer> currentUsedRecievedPacketCount = new HashMap<String, Integer>();
- String preivousOutputExtent = null;
- InstanceOperator previousOp = null;
- CollectionOfPackets tuples = new CollectionOfPackets();
- while(operatorIterator.hasNext())
+ if(!ranAlready)
{
- InstanceOperator op = operatorIterator.next();
- if(op instanceof InstanceExchangePart)
+ ranAlready = true;
+ Site site = IOT.getSiteFromID(siteID);
+ ArrayList<InstanceOperator> operators = IOT.getOpInstancesInSpecialOrder(site);
+ Iterator<InstanceOperator> operatorIterator = operators.iterator();
+ HashMap<String, Integer> currentPacketCount = new HashMap<String, Integer>();
+ int previousOpOutput = 0;
+ HashMap<String, Integer> currentUsedRecievedPacketCount = new HashMap<String, Integer>();
+ String preivousOutputExtent = null;
+ InstanceOperator previousOp = null;
+ CollectionOfPackets tuples = new CollectionOfPackets();
+ while(operatorIterator.hasNext())
{
- /*if a producer exchange operator convert tuples to packets and
- * determine how many have been lost.
- */
- InstanceExchangePart exOp = (InstanceExchangePart) op;
- if(exOp.getComponentType().equals(ExchangePartType.PRODUCER) &&
- !exOp.getNext().getSite().getID().equals(exOp.getSite().getID()))
+ InstanceOperator op = operatorIterator.next();
+ if(op instanceof InstanceExchangePart)
{
- Integer cPacketCount =
currentPacketCount.get(exOp.getSite().getID());
- if(cPacketCount == null)
+ /*if a producer exchange operator convert tuples to packets and
+ * determine how many have been lost.
+ */
+ InstanceExchangePart exOp = (InstanceExchangePart) op;
+ if(exOp.getComponentType().equals(ExchangePartType.PRODUCER) &&
+ !exOp.getNext().getSite().getID().equals(exOp.getSite().getID()))
{
- cPacketCount = 0;
+ Integer cPacketCount =
currentPacketCount.get(exOp.getSite().getID());
+ if(cPacketCount == null)
+ {
+ cPacketCount = 0;
+ }
+ Integer rPacketCount =
currentUsedRecievedPacketCount.get(exOp.getSite().getID());
+ if(rPacketCount == null)
+ {
+ rPacketCount = 0;
+ }
+ boolean isleaf =
exOp.getSourceFrag().containsOperatorType(SensornetAcquireOperator.class);
+ int outputPackets = this.tupleToPacketConversion(tuples,
previousOp, exOp);
+ addToPacketCounts(outputPackets, cPacketCount, isleaf,
packetIds);
+
+ currentPacketCount.remove(exOp.getSite().getID());
+
currentPacketCount.put(this.overlayNetwork.getClusterHeadFor(exOp.getSite().getID()),
cPacketCount + outputPackets);
+ exOp.setExtent(preivousOutputExtent);
+ exOp.setTupleValueForExtent(previousOpOutput);
+
this.transmitableWindows.updateCollection(preivousOutputExtent,
tuples.getWindowsOfExtent(preivousOutputExtent));
+
}
- Integer rPacketCount =
currentUsedRecievedPacketCount.get(exOp.getSite().getID());
- if(rPacketCount == null)
+ if(exOp.getComponentType().equals(ExchangePartType.PRODUCER) &&
+
exOp.getNext().getSite().getID().equals(exOp.getSite().getID()))
{
- rPacketCount = 0;
+
exOp.setExtent(exOp.getSourceFrag().getRootOperator().getExtent());
}
- boolean isleaf =
exOp.getSourceFrag().containsOperatorType(SensornetAcquireOperator.class);
- int outputPackets = this.tupleToPacketConversion(tuples,
previousOp, exOp);
- addToPacketCounts(outputPackets, cPacketCount, isleaf,
packetIds);
-
- currentPacketCount.remove(exOp.getSite().getID());
-
currentPacketCount.put(this.overlayNetwork.getClusterHeadFor(exOp.getSite().getID()),
cPacketCount + outputPackets);
- exOp.setExtent(preivousOutputExtent);
- exOp.setTupleValueForExtent(previousOpOutput);
- this.transmitableWindows.updateCollection(preivousOutputExtent,
tuples.getWindowsOfExtent(preivousOutputExtent));
-
- }
- if(exOp.getComponentType().equals(ExchangePartType.PRODUCER) &&
- exOp.getNext().getSite().getID().equals(exOp.getSite().getID()))
- {
-
exOp.setExtent(exOp.getSourceFrag().getRootOperator().getExtent());
- }
- /*if the exchange is a relay, then the packets have been recieved by this site already,
- and need to be assessed*/
- if(exOp.getComponentType().equals(ExchangePartType.RELAY))
- {
- InstanceExchangePart preExOp = exOp.getPrevious();
- int maxTransmittablePacketCount =
exOp.getmaxPackets(IOT.getDAF(), beta, costs, !reliableChannelQEP);
- exOp.setExtent(preExOp.getExtent());
- //got max packets
- Integer rPacketCount =
currentUsedRecievedPacketCount.get(preExOp.getSite().getID());
- if(rPacketCount == null)
+ /*if the exchange is a relay, then the packets have been recieved by this site already,
+ and need to be assessed*/
+ if(exOp.getComponentType().equals(ExchangePartType.RELAY))
{
- rPacketCount = 0;
+ InstanceExchangePart preExOp = exOp.getPrevious();
+ int maxTransmittablePacketCount =
exOp.getmaxPackets(IOT.getDAF(), beta, costs, !reliableChannelQEP);
+ exOp.setExtent(preExOp.getExtent());
+ //got max packets
+ Integer rPacketCount =
currentUsedRecievedPacketCount.get(preExOp.getSite().getID());
+ if(rPacketCount == null)
+ {
+ rPacketCount = 0;
+ }
+ Integer cPacketCount =
currentPacketCount.get(exOp.getSite().getID());
+ if(cPacketCount == null)
+ cPacketCount = 0;
+ boolean isleaf =
exOp.getSourceFrag().containsOperatorType(SensornetAcquireOperator.class);
+ int transmittablePacketsCount = recievedPacketCount(IOT,
rPacketCount,
+
maxTransmittablePacketCount,
+
exOp.getPrevious().getSite().getID());
+ if(exOp.getExtent() == null)
+ exOp.setExtent(exOp.getPrevious().getExtent());
+ ArrayList<Window> windows =
+
packetToTupleConversion(transmittablePacketsCount, exOp, exOp.getPrevious(),
+ currentUsedRecievedPacketCount);
+ tuples.updateCollection(exOp.getExtent(), windows);
+ int outputPackets = this.tupleToPacketConversion(tuples,
preExOp, exOp);
+ addToPacketCounts(outputPackets, cPacketCount, isleaf,
packetIds);
+ currentPacketCount.remove(exOp.getSite().getID());
+
currentPacketCount.put(this.overlayNetwork.getClusterHeadFor(exOp.getSite().getID()),
cPacketCount + outputPackets);
+ ArrayList<Window> outputWindows =
packetToTupleConversion(outputPackets, exOp, preExOp,
currentUsedRecievedPacketCount);
+
currentUsedRecievedPacketCount.remove(preExOp.getSite().getID());
+
currentUsedRecievedPacketCount.put(this.overlayNetwork.getClusterHeadFor(preExOp.getSite().getID()),
rPacketCount + outputPackets);
+ int noOfTuples =
CollectionOfPackets.determineNoTuplesFromWindows(outputWindows);
+ exOp.setTupleValueForExtent(noOfTuples);
+ this.tupleToPacketConversion(tuples, preExOp, exOp);
+
this.transmitableWindows.updateCollection(preivousOutputExtent,
tuples.getWindowsOfExtent(preivousOutputExtent));
+
}
- Integer cPacketCount =
currentPacketCount.get(exOp.getSite().getID());
- if(cPacketCount == null)
- cPacketCount = 0;
- boolean isleaf =
exOp.getSourceFrag().containsOperatorType(SensornetAcquireOperator.class);
- int transmittablePacketsCount = recievedPacketCount(IOT,
rPacketCount,
-
maxTransmittablePacketCount,
-
exOp.getPrevious().getSite().getID());
- if(exOp.getExtent() == null)
- exOp.setExtent(exOp.getPrevious().getExtent());
- ArrayList<Window> windows =
-
packetToTupleConversion(transmittablePacketsCount, exOp, exOp.getPrevious(),
- currentUsedRecievedPacketCount);
- tuples.updateCollection(exOp.getExtent(), windows);
- int outputPackets = this.tupleToPacketConversion(tuples,
preExOp, exOp);
- addToPacketCounts(outputPackets, cPacketCount, isleaf,
packetIds);
- currentPacketCount.remove(exOp.getSite().getID());
-
currentPacketCount.put(this.overlayNetwork.getClusterHeadFor(exOp.getSite().getID()),
cPacketCount + outputPackets);
- ArrayList<Window> outputWindows =
packetToTupleConversion(outputPackets, exOp, preExOp,
currentUsedRecievedPacketCount);
- currentUsedRecievedPacketCount.remove(preExOp.getSite().getID());
-
currentUsedRecievedPacketCount.put(this.overlayNetwork.getClusterHeadFor(preExOp.getSite().getID()),
rPacketCount + outputPackets);
- int noOfTuples =
CollectionOfPackets.determineNoTuplesFromWindows(outputWindows);
- exOp.setTupleValueForExtent(noOfTuples);
- this.tupleToPacketConversion(tuples, preExOp, exOp);
- this.transmitableWindows.updateCollection(preivousOutputExtent,
tuples.getWindowsOfExtent(preivousOutputExtent));
-
- }
- if(exOp.getComponentType().equals(ExchangePartType.CONSUMER) &&
- previousOp == null )
- {
- int maxTransmittablePacketCount =
exOp.getmaxPackets(IOT.getDAF(), beta, costs, !reliableChannelQEP);
- Integer cPacketCount =
currentPacketCount.get(exOp.getPrevious().getSite().getID());
- if(cPacketCount == null)
- cPacketCount = 0;
- InstanceExchangePart preExOp =exOp.getPrevious();
- Integer rPacketCount =
currentUsedRecievedPacketCount.get(preExOp.getSite().getID());
- if(rPacketCount == null)
+ if(exOp.getComponentType().equals(ExchangePartType.CONSUMER) &&
+ previousOp == null )
{
- rPacketCount = 0;
+ int maxTransmittablePacketCount =
exOp.getmaxPackets(IOT.getDAF(), beta, costs, !reliableChannelQEP);
+ Integer cPacketCount =
currentPacketCount.get(exOp.getPrevious().getSite().getID());
+ if(cPacketCount == null)
+ cPacketCount = 0;
+ InstanceExchangePart preExOp =exOp.getPrevious();
+ Integer rPacketCount =
currentUsedRecievedPacketCount.get(preExOp.getSite().getID());
+ if(rPacketCount == null)
+ {
+ rPacketCount = 0;
+ }
+ int transmittablePacketsCount = recievedPacketCount(IOT,
rPacketCount,
+
maxTransmittablePacketCount,
+
exOp.getPrevious().getSite().getID());
+ if(exOp.getExtent() == null)
+ exOp.setExtent(exOp.getPrevious().getExtent());
+ ArrayList<Window> windows =
+ packetToTupleConversion(transmittablePacketsCount, exOp,
exOp.getPrevious(),
+ currentUsedRecievedPacketCount);
+ tuples.updateCollection(exOp.getExtent(), windows);
+ this.tupleToPacketConversion(tuples, exOp.getPrevious(), exOp);
+
currentUsedRecievedPacketCount.remove(preExOp.getSite().getID());
+ currentUsedRecievedPacketCount.put(
+ this.overlayNetwork.getClusterHeadFor(
+ preExOp.getSite().getID()), rPacketCount +
maxTransmittablePacketCount);
}
- int transmittablePacketsCount = recievedPacketCount(IOT,
rPacketCount,
-
maxTransmittablePacketCount,
-
exOp.getPrevious().getSite().getID());
- if(exOp.getExtent() == null)
+ if(exOp.getComponentType().equals(ExchangePartType.CONSUMER) &&
+ previousOp != null )
+ {
+ previousOp = null;
exOp.setExtent(exOp.getPrevious().getExtent());
- ArrayList<Window> windows =
- packetToTupleConversion(transmittablePacketsCount, exOp,
exOp.getPrevious(),
- currentUsedRecievedPacketCount);
- tuples.updateCollection(exOp.getExtent(), windows);
- this.tupleToPacketConversion(tuples, exOp.getPrevious(), exOp);
- currentUsedRecievedPacketCount.remove(preExOp.getSite().getID());
- currentUsedRecievedPacketCount.put(
- this.overlayNetwork.getClusterHeadFor(
- preExOp.getSite().getID()), rPacketCount +
maxTransmittablePacketCount);
+ }
}
- if(exOp.getComponentType().equals(ExchangePartType.CONSUMER) &&
- previousOp != null )
- {
- previousOp = null;
- exOp.setExtent(exOp.getPrevious().getExtent());
- }
- }
- else // is some sort of in-network query operator
- {
- //if acquire, then need to determine tuples
- if(op.getSensornetOperator() instanceof SensornetAcquireOperator)
+ else // is some sort of in-network query operator
{
- List<Attribute> attributes =
op.getSensornetOperator().getLogicalOperator().getAttributes();
- preivousOutputExtent = attributes.get(1).toString();
- previousOp = op;
- op.setExtent(preivousOutputExtent);
- tuples.updateCollection(preivousOutputExtent,
tuples.createAcquirePacket(beta));
- }
- // if some operator then place though cardinality model
- if(!(op.getSensornetOperator() instanceof
SensornetAcquireOperator) &&
- !(op.getSensornetOperator() instanceof
SensornetDeliverOperator))
- {
- previousOp = op;
- InstanceOperator preOp = (InstanceOperator) op.getInput(0);
- preivousOutputExtent = recordExtent(preOp);
- String extent = "";
- if(op.getSensornetOperator() instanceof
SensornetNestedLoopJoinOperator)
+ //if acquire, then need to determine tuples
+ if(op.getSensornetOperator() instanceof SensornetAcquireOperator)
{
- ArrayList<String> doneExtents = locateInputExtents(op, tuples);
- extent = calculateExtent(doneExtents);
- op.setExtent(extent);
+ List<Attribute> attributes =
op.getSensornetOperator().getLogicalOperator().getAttributes();
+ preivousOutputExtent = attributes.get(1).toString();
+ previousOp = op;
+ op.setExtent(preivousOutputExtent);
+ tuples.updateCollection(preivousOutputExtent,
tuples.createAcquirePacket(beta));
}
- else
+ // if some operator then place though cardinality model
+ if(!(op.getSensornetOperator() instanceof
SensornetAcquireOperator) &&
+ !(op.getSensornetOperator() instanceof
SensornetDeliverOperator))
{
- op.setExtent(preOp.getExtent());
- extent = preOp.getExtent();
+ previousOp = op;
+ InstanceOperator preOp = (InstanceOperator) op.getInput(0);
+ preivousOutputExtent = recordExtent(preOp);
+ String extent = "";
+ if(op.getSensornetOperator() instanceof
SensornetNestedLoopJoinOperator)
+ {
+ ArrayList<String> doneExtents = locateInputExtents(op,
tuples);
+ extent = calculateExtent(doneExtents);
+ op.setExtent(extent);
+ }
+ else
+ {
+ op.setExtent(preOp.getExtent());
+ extent = preOp.getExtent();
+ }
+ CardinalityDataStructureChannel outputs = cardModel.model(op,
tuples, beta);
+ tuples.removeExtent(preivousOutputExtent);
+ tuples.updateCollection(extent, outputs.getWindows());
+ this.tupleToPacketConversion(tuples, preOp, op);
}
- CardinalityDataStructureChannel outputs = cardModel.model(op,
tuples, beta);
- tuples.removeExtent(preivousOutputExtent);
- tuples.updateCollection(extent, outputs.getWindows());
- this.tupleToPacketConversion(tuples, preOp, op);
- }
- //if delivery operator calculate packets transmitted for system to determine tuples.
- if(op.getSensornetOperator() instanceof SensornetDeliverOperator)
- {
- InstanceOperator preOp = op.getInstanceInput(0);
- if(preOp instanceof InstanceExchangePart)
+ //if delivery operator calculate packets transmitted for system to determine tuples.
+ if(op.getSensornetOperator() instanceof SensornetDeliverOperator)
{
- InstanceExchangePart exPreOp = (InstanceExchangePart) preOp;
- op.setExtent(exPreOp.getExtent());
+ InstanceOperator preOp = op.getInstanceInput(0);
+ if(preOp instanceof InstanceExchangePart)
+ {
+ InstanceExchangePart exPreOp = (InstanceExchangePart) preOp;
+ op.setExtent(exPreOp.getExtent());
+ }
+ else
+ {
+ op.setExtent(preOp.getExtent());
+ }
+ CardinalityDataStructureChannel outputs = cardModel.model(op,
tuples, beta);
+ ArrayList<Window> outputWindows = outputs.getWindows();
+ tuples.removeExtent(op.getExtent());
+ tuples.updateCollection(op.getExtent(), outputWindows);
+ previousOpOutput = this.tupleToPacketConversion(tuples, op,
op);
+ for(int index = 0; index < previousOpOutput; index++)
+ packetIds.add(index);
+ this.transmitableWindows.updateCollection(op.getExtent(),
tuples.getWindowsOfExtent(op.getExtent()));
}
- else
- {
- op.setExtent(preOp.getExtent());
- }
- CardinalityDataStructureChannel outputs = cardModel.model(op,
tuples, beta);
- ArrayList<Window> outputWindows = outputs.getWindows();
- tuples.removeExtent(op.getExtent());
- tuples.updateCollection(op.getExtent(), outputWindows);
- previousOpOutput = this.tupleToPacketConversion(tuples, op, op);
- for(int index = 0; index < previousOpOutput; index++)
- packetIds.add(index);
- this.transmitableWindows.updateCollection(op.getExtent(),
tuples.getWindowsOfExtent(op.getExtent()));
}
}
}
@@ -1267,6 +1272,7 @@
setupExpectedPackets();
this.noiseValuesExped = new HashMapList<String, NoiseDataStore>();
this.transmitableWindows.clear();
+ this.ranAlready = false;
}
/**