[snee] r1366 committed - added changes to the channel model which now makes select * queries wo...

0 views
Skip to first unread message

sn...@googlecode.com

unread,
Mar 4, 2013, 12:34:39 PM (3/4/13)
to snee-...@googlegroups.com
Revision: 1366
Author: alan.bar...@gmail.com
Date: Mon Mar 4 09:34:19 2013
Log: Added changes to the channel model which now make select *
queries work correctly. What has not been tested is whether these changes
break aggregation queries or join queries,
so this version is not stable.
http://code.google.com/p/snee/source/detail?r=1366

Modified:

/branches/alan_2012_07_19_unreliableChannels/clients/reliable-channel-client/src/main/java/uk/ac/manchester/snee/client/RelibaleChannelClient.java

/branches/alan_2012_07_19_unreliableChannels/snee-core/src/main/java/uk/ac/manchester/cs/snee/compiler/costmodels/cardinalitymodel/CardinalityEstimatedCostModel.java

/branches/alan_2012_07_19_unreliableChannels/snee-core/src/main/java/uk/ac/manchester/cs/snee/compiler/costmodels/cardinalitymodel/CollectionOfPackets.java

/branches/alan_2012_07_19_unreliableChannels/snee-core/src/main/java/uk/ac/manchester/cs/snee/compiler/iot/IOT.java

/branches/alan_2012_07_19_unreliableChannels/snee-wsn-manager/src/main/java/uk/ac/manchester/cs/snee/manager/planner/costbenifitmodel/model/channel/ChannelModel.java

/branches/alan_2012_07_19_unreliableChannels/snee-wsn-manager/src/main/java/uk/ac/manchester/cs/snee/manager/planner/costbenifitmodel/model/channel/ChannelModelSite.java

=======================================
---
/branches/alan_2012_07_19_unreliableChannels/clients/reliable-channel-client/src/main/java/uk/ac/manchester/snee/client/RelibaleChannelClient.java
Sun Mar 3 14:33:24 2013
+++
/branches/alan_2012_07_19_unreliableChannels/clients/reliable-channel-client/src/main/java/uk/ac/manchester/snee/client/RelibaleChannelClient.java
Mon Mar 4 09:34:19 2013
@@ -40,7 +40,7 @@
{

private static String sep = System.getProperty("file.separator");
- private static int queryid =30;
+ private static int queryid =1;
protected static int testNo = 1;
protected static int kResil = 2;
private static int max = 90;
@@ -216,8 +216,8 @@
// {
//get query & schemas
// String currentQuery = queryIterator.next();
- String currentQuery = "SELECT RSTREAM AVG(anow.x) as qx FROM A[NOW]
anow ;";
- //String currentQuery = "SELECT RSTREAM anow.x as qx FROM A[NOW]
anow ;";
+ // String currentQuery = "SELECT RSTREAM AVG(anow.x) as qx FROM A[NOW]
anow ;";
+ String currentQuery = "SELECT RSTREAM anow.x as qx FROM A[NOW] anow ;";
// String currentQuery = "SELECT RSTREAM anow.x as qx FROM A[NOW]
anow,(SELECT bnow.x as sq1x FROM B[NOW] bnow, C[NOW] cnow WHERE
bnow.x=cnow.x) sq1 WHERE anow.x=sq1.sq1x;";
// String currentQuery = "SELECT RSTREAM sq1.sq1x as qx FROM (SELECT
anow.x as sq1x FROM A[NOW] anow, B[NOW] bnow WHERE anow.x=bnow.x) sq1,
(SELECT cnow.x as sq2x FROM C[NOW] cnow, D[NOW] dnow WHERE cnow.x=dnow.x)
sq2 WHERE sq1.sq1x=sq2.sq2x;";

=======================================
---
/branches/alan_2012_07_19_unreliableChannels/snee-core/src/main/java/uk/ac/manchester/cs/snee/compiler/costmodels/cardinalitymodel/CardinalityEstimatedCostModel.java
Fri Mar 1 10:09:26 2013
+++
/branches/alan_2012_07_19_unreliableChannels/snee-core/src/main/java/uk/ac/manchester/cs/snee/compiler/costmodels/cardinalitymodel/CardinalityEstimatedCostModel.java
Mon Mar 4 09:34:19 2013
@@ -326,7 +326,8 @@
return new CardinalityDataStructureChannel(new ArrayList<Window>());
String extent = operator.getExtent();
ArrayList<Window> outputWindows = new ArrayList<Window>();
- Iterator<Window> inputWindows
=inputs.getWindowsOfExtent(extent).iterator();
+
+ Iterator<Window> inputWindows =inputs.getWindowsOfExtent(operator,
extent).iterator();
while(inputWindows.hasNext())
{
Window input = inputWindows.next();
@@ -344,7 +345,7 @@
if(operator.isNodeDead())
return new CardinalityDataStructureChannel(new ArrayList<Window>());
String extent = operator.getExtent();
- return new
CardinalityDataStructureChannel(inputs.getWindowsOfExtent(extent));
+ return new
CardinalityDataStructureChannel(inputs.getWindowsOfExtent(operator,
extent));
}


@@ -371,7 +372,7 @@
{
//tuples are parittioned
CardinalityDataStructureChannel output =
- new CardinalityDataStructureChannel(
inputs.getWindowsOfExtent(extent));
+ new CardinalityDataStructureChannel(
inputs.getWindowsOfExtent(operator, extent));
return output;
}
else
@@ -388,7 +389,7 @@
if(operator.isNodeDead())
return new CardinalityDataStructureChannel(new ArrayList<Window>());
String extent =
operator.getSensornetOperator().getAttributes().get(1).toString();
- ArrayList<Window> inputWindows = inputs.getWindowsOfExtent(extent);
+ ArrayList<Window> inputWindows = inputs.getWindowsOfExtent(operator,
extent);
ArrayList<Window> outputWindows = new ArrayList<Window>();

for(int index = 0; index < inputWindows.size(); index++)
@@ -408,7 +409,7 @@
if(operator.isNodeDead())
return new CardinalityDataStructureChannel(new ArrayList<Window>());
String extent = operator.getExtent();
- return new
CardinalityDataStructureChannel(inputs.getWindowsOfExtent(extent));
+ return new
CardinalityDataStructureChannel(inputs.getWindowsOfExtent(operator,
extent));
}


@@ -421,7 +422,7 @@
return new CardinalityDataStructureChannel(new ArrayList<Window>());

String extent = operator.getExtent();
- ArrayList<Window> extentWindows = inputs.getWindowsOfExtent(extent);
+ ArrayList<Window> extentWindows = inputs.getWindowsOfExtent(operator,
extent);
ArrayList<Window> outputWindows = new ArrayList<Window>();

for(int index =1; index <= beta; index++ )
@@ -481,8 +482,8 @@
String extent1 = doneExtents.get(0);
String extent2 = doneExtents.get(1);

- ArrayList<Window> windowsOfExtent1 =
inputs.getWindowsOfExtent(extent1);
- ArrayList<Window> windowsOfExtent2 =
inputs.getWindowsOfExtent(extent2);
+ ArrayList<Window> windowsOfExtent1 =
inputs.getWindowsOfExtent(operator, extent1);
+ ArrayList<Window> windowsOfExtent2 =
inputs.getWindowsOfExtent(operator, extent2);

if(windowsOfExtent1 == null || windowsOfExtent2 == null)
return new CardinalityDataStructureChannel(new ArrayList<Window>());
=======================================
---
/branches/alan_2012_07_19_unreliableChannels/snee-core/src/main/java/uk/ac/manchester/cs/snee/compiler/costmodels/cardinalitymodel/CollectionOfPackets.java
Fri Mar 1 10:09:26 2013
+++
/branches/alan_2012_07_19_unreliableChannels/snee-core/src/main/java/uk/ac/manchester/cs/snee/compiler/costmodels/cardinalitymodel/CollectionOfPackets.java
Mon Mar 4 09:34:19 2013
@@ -3,15 +3,18 @@
import java.util.ArrayList;
import java.util.Iterator;

+import uk.ac.manchester.cs.snee.common.graph.Node;
import uk.ac.manchester.cs.snee.compiler.costmodels.HashMapList;
+import uk.ac.manchester.cs.snee.compiler.iot.InstanceExchangePart;
+import uk.ac.manchester.cs.snee.compiler.iot.InstanceOperator;


public class CollectionOfPackets
{
- private HashMapList<String, Window> windows = new HashMapList<String,
Window>();
+ private HashMapList<InstanceOperator, Window> windows = new
HashMapList<InstanceOperator, Window>();


- public CollectionOfPackets(HashMapList<String, Window> packets)
+ public CollectionOfPackets(HashMapList<InstanceOperator, Window> packets)
{
this.windows = packets;
}
@@ -20,7 +23,7 @@
{
}

- public void updateCollection(String extent, final ArrayList<Window>
cWindows)
+ public void updateCollection(InstanceOperator extent, final
ArrayList<Window> cWindows)
{
ArrayList<Window> extentWindows = this.windows.get(extent);
Iterator<Window> newWindows = cWindows.iterator();
@@ -35,9 +38,43 @@
}
}

- public ArrayList<Window> getWindowsOfExtent(String extent)
+ public void updateCollection(InstanceOperator extent, final
ArrayList<Window> cWindows,
+ HashMapList<InstanceOperator, Window>
windows)
{
- return this.windows.get(extent);
+ ArrayList<Window> extentWindows = windows.get(extent);
+ Iterator<Window> newWindows = cWindows.iterator();
+ while(newWindows.hasNext())
+ {
+ Window newWindow = newWindows.next();
+ Window oldWindow = getWindow(newWindow.getWindowID(), extentWindows);
+ Window addedWindow = new Window(oldWindow.getTuples() +
newWindow.getTuples(), newWindow.getWindowID());
+ if(windows.keySet().contains(extent))
+ windows.remove(extent, oldWindow);
+ windows.add(extent, addedWindow);
+ }
+ }
+
+ public ArrayList<Window> getWindowsOfExtent(InstanceOperator operator,
String extent)
+ {
+ HashMapList<InstanceOperator, Window> allInputWindows = new
HashMapList<InstanceOperator, Window>();
+ Iterator<Node> inputIterator = operator.getInputsList().iterator();
+ while(inputIterator.hasNext())
+ {
+ InstanceOperator op = (InstanceOperator) inputIterator.next();
+ String opExtent = "";
+ if(op instanceof InstanceExchangePart)
+ {
+ InstanceExchangePart exOp = (InstanceExchangePart) op;
+ opExtent = exOp.getExtent();
+ }
+ else
+ {
+ opExtent = op.getExtent();
+ }
+ if(extent.equals(opExtent))
+ this.updateCollection(operator,this.windows.get(op),
allInputWindows);
+ }
+ return allInputWindows.get(operator);
}

public ArrayList<Window> createAcquirePacket(Long beta)
@@ -62,7 +99,7 @@
return new Window(0,index);
}

- public void removeExtent(String extentName)
+ public void removeExtent(InstanceOperator extentName)
{
this.windows.remove(extentName);
}
@@ -79,7 +116,7 @@
return tuples;
}

- public int determineNoTuplesFromWindows(String extent)
+ public int determineNoTuplesFromWindows(InstanceOperator extent)
{
Iterator<Window> windowIterator =this.windows.get(extent).iterator();
int tuples =0;
@@ -93,12 +130,12 @@

public void clear()
{
- this.windows = new HashMapList<String, Window>();
+ this.windows = new HashMapList<InstanceOperator, Window>();
}

public ArrayList<Window> returnWindowsForTuples(int startCountFrom,
int numberOftuples,
- String extent)
+ InstanceOperator extent)
{
int counter =0;
int tuplesCounted = 0;
@@ -141,6 +178,11 @@
}
return doneWindows;
}
+
+ public ArrayList<Window> getWindowsOfExtent(InstanceOperator op)
+ {
+ return this.windows.get(op);
+ }



=======================================
---
/branches/alan_2012_07_19_unreliableChannels/snee-core/src/main/java/uk/ac/manchester/cs/snee/compiler/iot/IOT.java
Fri Mar 1 10:09:26 2013
+++
/branches/alan_2012_07_19_unreliableChannels/snee-core/src/main/java/uk/ac/manchester/cs/snee/compiler/iot/IOT.java
Mon Mar 4 09:34:19 2013
@@ -18,6 +18,7 @@
import uk.ac.manchester.cs.snee.compiler.OptimizationException;
import uk.ac.manchester.cs.snee.compiler.costmodels.HashMapList;
import uk.ac.manchester.cs.snee.compiler.queryplan.DAF;
+import uk.ac.manchester.cs.snee.compiler.queryplan.ExchangePartType;
import uk.ac.manchester.cs.snee.compiler.queryplan.PAF;
import uk.ac.manchester.cs.snee.compiler.queryplan.RT;
import uk.ac.manchester.cs.snee.compiler.queryplan.SNEEAlgebraicForm;
@@ -25,6 +26,7 @@
import uk.ac.manchester.cs.snee.metadata.schema.SchemaMetadataException;
import uk.ac.manchester.cs.snee.metadata.source.sensornet.Site;
import
uk.ac.manchester.cs.snee.operators.sensornet.SensornetAcquireOperator;
+import
uk.ac.manchester.cs.snee.operators.sensornet.SensornetDeliverOperator;
import
uk.ac.manchester.cs.snee.operators.sensornet.SensornetExchangeOperator;
import uk.ac.manchester.cs.snee.operators.sensornet.SensornetOperator;
import uk.ac.manchester.cs.snee.operators.sensornet.SensornetOperatorImpl;
@@ -1041,9 +1043,29 @@
{
if(this.getOpInstances(site, TraversalOrder.PRE_ORDER, true).size() ==
0)
return this.getOpInstances(site, TraversalOrder.PRE_ORDER, true);
- InstanceOperator root = this.getOpInstances(site,
TraversalOrder.PRE_ORDER, true).get(0);
+ ArrayList<InstanceOperator> siteOperatorList
=this.getOpInstances(site, TraversalOrder.PRE_ORDER, true);
+ Iterator<InstanceOperator> operatorIterator =
siteOperatorList.iterator();
final ArrayList<InstanceOperator> operatorList = new
ArrayList<InstanceOperator>();
- this.doTransvesalIteratorSpeical(root, site, operatorList);
+ while(operatorIterator.hasNext())
+ {
+ boolean rootOp = false;
+ InstanceOperator op = operatorIterator.next();
+ if(op instanceof InstanceExchangePart)
+ {
+ InstanceExchangePart exOp = (InstanceExchangePart) op;
+ if(exOp.getComponentType().equals(ExchangePartType.PRODUCER) &&
+ !exOp.getNext().getSite().getID().equals(exOp.getSite().getID()))
+ rootOp = true;
+ if(exOp.getComponentType().equals(ExchangePartType.RELAY))
+ rootOp = true;
+ }
+ if(op.getSensornetOperator() instanceof SensornetDeliverOperator)
+ rootOp = true;
+ if(rootOp)
+ {
+ this.doTransvesalIteratorSpeical(op, site, operatorList);
+ }
+ }
return operatorList;
}

@@ -1052,7 +1074,6 @@
ArrayList<InstanceOperator>
operatorList)
{
ArrayList<InstanceOperator> nonInportantOperators = new
ArrayList<InstanceOperator>();
- ArrayList<InstanceOperator> inportantOperators = new
ArrayList<InstanceOperator>();
String currentSiteID = instanceOperator.getSite().getID();
String lookingSiteID = site.getID();
if (currentSiteID.equals(lookingSiteID))
=======================================
---
/branches/alan_2012_07_19_unreliableChannels/snee-wsn-manager/src/main/java/uk/ac/manchester/cs/snee/manager/planner/costbenifitmodel/model/channel/ChannelModel.java
Fri Mar 1 10:09:26 2013
+++
/branches/alan_2012_07_19_unreliableChannels/snee-wsn-manager/src/main/java/uk/ac/manchester/cs/snee/manager/planner/costbenifitmodel/model/channel/ChannelModel.java
Mon Mar 4 09:34:19 2013
@@ -676,7 +676,7 @@
rootOp = opToCheck;
}
ChannelModelSite site =
channelModel.get(Integer.parseInt(rootSite.getID()));
- return
CollectionOfPackets.determineNoTuplesFromWindows(site.getTransmitableWindows().getWindowsOfExtent(rootOp.getExtent()));
+ return
CollectionOfPackets.determineNoTuplesFromWindows(site.getTransmitableWindows().getWindowsOfExtent(rootOp));
}

public void clearModel()
=======================================
---
/branches/alan_2012_07_19_unreliableChannels/snee-wsn-manager/src/main/java/uk/ac/manchester/cs/snee/manager/planner/costbenifitmodel/model/channel/ChannelModelSite.java
Sun Mar 3 14:33:24 2013
+++
/branches/alan_2012_07_19_unreliableChannels/snee-wsn-manager/src/main/java/uk/ac/manchester/cs/snee/manager/planner/costbenifitmodel/model/channel/ChannelModelSite.java
Mon Mar 4 09:34:19 2013
@@ -125,8 +125,6 @@
{
ArrayList<Boolean> packets = arrivedPackets.get(source);
arrivedPackets.remove(source);
- if(packetID == 0 || packetID > packets.size() )
- System.out.println();
packets.set(packetID -1, true);
arrivedPackets.put(source, packets);
if(!needToListenTo.contains(source))
@@ -158,8 +156,6 @@
{
String EquivNode = equivNodesIterator.next();
ArrayList<Boolean> equivPackets = arrivedPackets.get(EquivNode);
- if(equivPackets == null)
- System.out.println();
if(equivPackets.get(counter))
found = true;
}
@@ -275,8 +271,6 @@
public void recivedSiblingPacket(String string, int packetID)
{
ArrayList<Boolean> packetsFromSibling = arrivedPackets.get(string);
- if(packetID == 0)
- System.out.println();
packetsFromSibling.set(packetID -1, true);
arrivedPackets.remove(string);
arrivedPackets.put(string, packetsFromSibling);
@@ -379,20 +373,22 @@
rPacketCount = 0;
}
boolean isleaf =
exOp.getSourceFrag().containsOperatorType(SensornetAcquireOperator.class);
- int outputPackets = this.tupleToPacketConversion(tuples,
previousOp, exOp);
+ int outputPackets = this.tupleToPacketConversion(tuples,
previousOp, exOp, cPacketCount);
addToPacketCounts(outputPackets, cPacketCount, isleaf,
packetIds);

currentPacketCount.remove(exOp.getSite().getID());

currentPacketCount.put(this.overlayNetwork.getClusterHeadFor(exOp.getSite().getID()),
cPacketCount + outputPackets);
exOp.setExtent(preivousOutputExtent);
exOp.setTupleValueForExtent(previousOpOutput);
-
this.transmitableWindows.updateCollection(preivousOutputExtent,
tuples.getWindowsOfExtent(preivousOutputExtent));
+ this.transmitableWindows.updateCollection(exOp,
tuples.getWindowsOfExtent(exOp, exOp.getExtent()));

}
if(exOp.getComponentType().equals(ExchangePartType.PRODUCER) &&

exOp.getNext().getSite().getID().equals(exOp.getSite().getID()))
{

exOp.setExtent(exOp.getSourceFrag().getRootOperator().getExtent());
+ ArrayList<Window> preOpWindows =
tuples.getWindowsOfExtent(previousOp);
+ tuples.updateCollection(exOp, preOpWindows);
}
/*if the exchange is a relay, then the packets have been
recieved by this site already,
and need to be assessed*/
@@ -419,8 +415,8 @@
ArrayList<Window> windows =

packetToTupleConversion(transmittablePacketsCount, exOp, exOp.getPrevious(),
currentUsedRecievedPacketCount);
- tuples.updateCollection(exOp.getExtent(), windows);
- int outputPackets = this.tupleToPacketConversion(tuples,
preExOp, exOp);
+ tuples.updateCollection(preExOp, windows);
+ int outputPackets = this.tupleToPacketConversion(tuples,
preExOp, exOp, cPacketCount);
addToPacketCounts(outputPackets, cPacketCount, isleaf,
packetIds);
currentPacketCount.remove(exOp.getSite().getID());

currentPacketCount.put(this.overlayNetwork.getClusterHeadFor(exOp.getSite().getID()),
cPacketCount + outputPackets);
@@ -429,8 +425,9 @@

currentUsedRecievedPacketCount.put(this.overlayNetwork.getClusterHeadFor(preExOp.getSite().getID()),
rPacketCount + outputPackets);
int noOfTuples =
CollectionOfPackets.determineNoTuplesFromWindows(outputWindows);
exOp.setTupleValueForExtent(noOfTuples);
- this.tupleToPacketConversion(tuples, preExOp, exOp);
-
this.transmitableWindows.updateCollection(preivousOutputExtent,
tuples.getWindowsOfExtent(preivousOutputExtent));
+ this.tupleToPacketConversion(tuples, preExOp, exOp,
cPacketCount);
+ this.transmitableWindows.removeExtent(exOp);
+ this.transmitableWindows.updateCollection(exOp,
tuples.getWindowsOfExtent(exOp, exOp.getExtent()));

}
if(exOp.getComponentType().equals(ExchangePartType.CONSUMER) &&
@@ -454,8 +451,8 @@
ArrayList<Window> windows =
packetToTupleConversion(transmittablePacketsCount, exOp,
exOp.getPrevious(),
currentUsedRecievedPacketCount);
- tuples.updateCollection(exOp.getExtent(), windows);
- this.tupleToPacketConversion(tuples, exOp.getPrevious(), exOp);
+ tuples.updateCollection(exOp, windows);
+ this.tupleToPacketConversion(tuples, exOp.getPrevious(), exOp,
cPacketCount);

currentUsedRecievedPacketCount.remove(preExOp.getSite().getID());
currentUsedRecievedPacketCount.put(
this.overlayNetwork.getClusterHeadFor(
@@ -464,8 +461,10 @@
if(exOp.getComponentType().equals(ExchangePartType.CONSUMER) &&
previousOp != null )
{
+ ArrayList<Window> preOpWindows =
tuples.getWindowsOfExtent(previousOp);
previousOp = null;
exOp.setExtent(exOp.getPrevious().getExtent());
+ tuples.updateCollection(exOp, preOpWindows);
}
}
else // is some sort of in-network query operator
@@ -477,7 +476,7 @@
preivousOutputExtent = attributes.get(1).toString();
previousOp = op;
op.setExtent(preivousOutputExtent);
- tuples.updateCollection(preivousOutputExtent,
tuples.createAcquirePacket(beta));
+ tuples.updateCollection(op, tuples.createAcquirePacket(beta));
}
// if some operator then place though cardinality model
if(!(op.getSensornetOperator() instanceof
SensornetAcquireOperator) &&
@@ -499,9 +498,9 @@
extent = preOp.getExtent();
}
CardinalityDataStructureChannel outputs = cardModel.model(op,
tuples, beta);
- tuples.removeExtent(preivousOutputExtent);
- tuples.updateCollection(extent, outputs.getWindows());
- this.tupleToPacketConversion(tuples, preOp, op);
+ tuples.removeExtent(op);
+ tuples.updateCollection(op, outputs.getWindows());
+ this.tupleToPacketConversion(tuples, preOp, op, 0);
}
//if delivery operator calculate packets transmitted for system
to determine tuples.
if(op.getSensornetOperator() instanceof SensornetDeliverOperator)
@@ -518,12 +517,12 @@
}
CardinalityDataStructureChannel outputs = cardModel.model(op,
tuples, beta);
ArrayList<Window> outputWindows = outputs.getWindows();
- tuples.removeExtent(op.getExtent());
- tuples.updateCollection(op.getExtent(), outputWindows);
- previousOpOutput = this.tupleToPacketConversion(tuples, op,
op);
+ tuples.removeExtent(op);
+ tuples.updateCollection(op, outputWindows);
+ previousOpOutput = this.tupleToPacketConversion(tuples, op,
op, 0);
for(int index = 0; index < previousOpOutput; index++)
packetIds.add(index);
- this.transmitableWindows.updateCollection(op.getExtent(),
tuples.getWindowsOfExtent(op.getExtent()));
+ this.transmitableWindows.updateCollection(op,
tuples.getWindowsOfExtent(op, op.getExtent()));
}
}
}
@@ -561,7 +560,7 @@
if(!doneExtents.contains(currentExtent))
{
doneExtents.add(currentExtent);
- tuples.removeExtent(currentExtent);
+ tuples.removeExtent(cOp);
}
}
return doneExtents;
@@ -837,35 +836,33 @@
* converts a number of tuples into packets
* @param tuples
* @param op
+ * @param cPacketCount
* @param exOp2
* @return
* @throws SchemaMetadataException
* @throws TypeMappingException
*/
private int tupleToPacketConversion(CollectionOfPackets tuples,
- InstanceOperator op,
InstanceOperator mainOp)
+ InstanceOperator op,
InstanceOperator mainOp, Integer cPacketCount)
throws SchemaMetadataException, TypeMappingException
{
int tupleSize = 0;
- String extent = null;
if(ChannelModelSite.reliableChannelQEP)
{
if(op instanceof InstanceExchangePart)
{
InstanceExchangePart exOp = (InstanceExchangePart) op;
tupleSize =
exOp.getSourceFrag().getRootOperator().getSensornetOperator().getPhysicalTupleSize();
- extent = exOp.getExtent();
}
else
{
tupleSize = op.getSensornetOperator().getPhysicalTupleSize();
- extent = op.getExtent();
}
int maxMessagePayloadSize = costs.getMaxMessagePayloadSize();
int payloadOverhead = costs.getPayloadOverhead();
int numTuplesPerMessage = (int) Math.floor(maxMessagePayloadSize -
payloadOverhead) / (tupleSize);
- int totalTuples = tuples.determineNoTuplesFromWindows(extent);
- int pacekts = totalTuples / numTuplesPerMessage;
+ int totalTuples = tuples.determineNoTuplesFromWindows(op);
+ int pacekts = (totalTuples / numTuplesPerMessage);

if(totalTuples % numTuplesPerMessage == 0)
{
@@ -885,12 +882,10 @@
{
InstanceExchangePart exOp = (InstanceExchangePart) op;
tupleSize =
exOp.getSourceFrag().getRootOperator().getSensornetOperator().getPhysicalTupleSize();
- extent = exOp.getExtent();
}
else
{
tupleSize = op.getSensornetOperator().getPhysicalTupleSize();
- extent = op.getExtent();
}
int maxMessagePayloadSize = costs.getMaxMessagePayloadSize();
int payloadOverhead = 0;
@@ -899,7 +894,7 @@
else
payloadOverhead = costs.getPayloadOverhead();
int numTuplesPerMessage = (int) Math.floor(maxMessagePayloadSize -
payloadOverhead) / (tupleSize);
- int totalTuples = tuples.determineNoTuplesFromWindows(extent);
+ int totalTuples = tuples.determineNoTuplesFromWindows(op);
Double frac = new Double(totalTuples) / new
Double(numTuplesPerMessage);
Double packetsD = Math.ceil(frac);
int pacekts = packetsD.intValue();
@@ -1000,7 +995,7 @@
CollectionOfPackets completeRecievedWindows = new
CollectionOfPackets();
if(noPackets == 0)
{
- return completeRecievedWindows.getWindowsOfExtent(op.getExtent());
+ return completeRecievedWindows.getWindowsOfExtent(op,
op.getExtent());
}
else
{
@@ -1040,12 +1035,12 @@
if(receivedPacketBoolIterator.next())
{
Iterator<Window> receivedWindows =
- inputWindows.returnWindowsForTuples(tupleCount,
numTuplesPerMessage,op.getExtent()).iterator();
+ inputWindows.returnWindowsForTuples(tupleCount,
numTuplesPerMessage,op).iterator();
while(receivedWindows.hasNext())
{
ArrayList<Window> update = new ArrayList<Window>();
update.add(receivedWindows.next());
- completeRecievedWindows.updateCollection(op.getExtent(),
update);
+ completeRecievedWindows.updateCollection(op, update);
}
}
else
@@ -1054,7 +1049,7 @@
}
countedPacket++;
}
- return completeRecievedWindows.getWindowsOfExtent(op.getExtent());
+ return completeRecievedWindows.getWindowsOfExtent(op,
op.getExtent());
}
else
{
@@ -1092,7 +1087,10 @@
usedUpPackets = usedPackets.get(preOp.getSite().getID());
int removeCoutner = 0;
while(removeCoutner < usedUpPackets)
+ {
receivedPacketBoolIterator.next();
+ removeCoutner++;
+ }
int countedPacket = 0;
int tupleCount = 0;
while(receivedPacketBoolIterator.hasNext() && countedPacket <
noPackets)
@@ -1100,18 +1098,18 @@
if(receivedPacketBoolIterator.next())
{
Iterator<Window> receivedWindows =
- inputWindows.returnWindowsForTuples(tupleCount,
numTuplesPerMessage, extent).iterator();
+ inputWindows.returnWindowsForTuples(tupleCount,
numTuplesPerMessage, preOp).iterator();
while(receivedWindows.hasNext())
{
ArrayList<Window> update = new ArrayList<Window>();
update.add(receivedWindows.next());
- completeRecievedWindows.updateCollection(extent, update);
+ completeRecievedWindows.updateCollection(op, update);
}
}
tupleCount += numTuplesPerMessage;
countedPacket++;
}
- return completeRecievedWindows.getWindowsOfExtent(extent);
+ return completeRecievedWindows.getWindowsOfExtent(op);
}
}
}
@@ -1294,8 +1292,6 @@
ArrayList<Integer> packetIDs = new ArrayList<Integer>();
String key = this.overlayNetwork.getClusterHeadFor(siteID);
ArrayList<Boolean> packets = arrivedPackets.get(key);
- if(packets == null)
- System.out.println();
Iterator<Boolean> packetIterator = packets.iterator();
int counter = 0;
while(packetIterator.hasNext())
Reply all
Reply to author
Forward
0 new messages