diff --git a/Dream2/.classpath b/Dream2/.classpath
index 03be8b6..7eb4300 100644
--- a/Dream2/.classpath
+++ b/Dream2/.classpath
@@ -14,6 +14,7 @@
+
diff --git a/Dream2/dream.properties b/Dream2/dream.properties
index 1625048..365f84f 100755
--- a/Dream2/dream.properties
+++ b/Dream2/dream.properties
@@ -1,4 +1,5 @@
serverAddr localhost
serverPort 9000
hostName local
-consistencyType single_glitch_free
\ No newline at end of file
+consistencyType single_glitch_free
+enableEvaluation true
\ No newline at end of file
diff --git a/Dream2/eval/ec2_scripts/distribute.sh b/Dream2/eval/ec2_scripts/distribute.sh
new file mode 100644
index 0000000..e09e750
--- /dev/null
+++ b/Dream2/eval/ec2_scripts/distribute.sh
@@ -0,0 +1,9 @@
+#!/bin/bash
+
+source nodes.sh
+
+for i in "${nodes[@]}"
+do
+ echo 'Sending files to '$i
+ scp -r $1 $i:
+done
diff --git a/Dream2/eval/ec2_scripts/gather.sh b/Dream2/eval/ec2_scripts/gather.sh
new file mode 100644
index 0000000..96a6449
--- /dev/null
+++ b/Dream2/eval/ec2_scripts/gather.sh
@@ -0,0 +1,9 @@
+#!/bin/bash
+
+source nodes.sh
+
+for i in "${nodes[@]}"
+do
+ echo 'Gathering files from '$i
+ scp $i:*.txt ./results/
+done
diff --git a/Dream2/eval/ec2_scripts/killall.sh b/Dream2/eval/ec2_scripts/killall.sh
new file mode 100644
index 0000000..bb41a2f
--- /dev/null
+++ b/Dream2/eval/ec2_scripts/killall.sh
@@ -0,0 +1,9 @@
+#!/bin/bash
+
+source nodes.sh
+
+for i in "${nodes[@]}"
+do
+ echo 'Killing java processes in '$i
+ ssh $i killall -9 java
+done
diff --git a/Dream2/eval/ec2_scripts/nodes.sh b/Dream2/eval/ec2_scripts/nodes.sh
new file mode 100644
index 0000000..8c76607
--- /dev/null
+++ b/Dream2/eval/ec2_scripts/nodes.sh
@@ -0,0 +1,18 @@
+#!/bin/bash
+
+broker1=ip-172-31-43-217.eu-central-1.compute.internal
+broker2=ip-172-31-47-82.eu-central-1.compute.internal
+broker3=ip-172-31-41-23.eu-central-1.compute.internal
+broker4=ip-172-31-32-245.eu-central-1.compute.internal
+broker5=ip-172-31-34-178.eu-central-1.compute.internal
+broker6=ip-172-31-39-44.eu-central-1.compute.internal
+broker7=ip-172-31-44-11.eu-central-1.compute.internal
+broker8=ip-172-31-47-133.eu-central-1.compute.internal
+broker9=ip-172-31-44-19.eu-central-1.compute.internal
+broker10=ip-172-31-42-224.eu-central-1.compute.internal
+
+lock_manager=ip-172-31-39-95.eu-central-1.compute.internal
+
+client=ip-172-31-43-235.eu-central-1.compute.internal
+
+nodes=($broker1 $broker2 $broker3 $broker4 $broker5 $broker6 $broker7 $broker8 $broker9 $broker10 $lock_manager $client)
diff --git a/Dream2/eval/ec2_scripts/start_local.sh b/Dream2/eval/ec2_scripts/start_local.sh
new file mode 100644
index 0000000..9286310
--- /dev/null
+++ b/Dream2/eval/ec2_scripts/start_local.sh
@@ -0,0 +1,13 @@
+java -cp DREAM.jar dream.eval.StartLockManager lock-manager 9999 &
+sleep 1
+java -cp DREAM.jar dream.eval.StartServer server1 9001 &
+sleep 1
+java -cp DREAM.jar dream.eval.StartServer server2 9002 reds-tcp:localhost:9001 &
+sleep 1
+java -cp DREAM.jar dream.eval.EvalVarClient reds-tcp:localhost:9001 reds-tcp:localhost:9999 client1 v1 10000 500 &
+sleep 1
+java -cp DREAM.jar dream.eval.EvalVarClient reds-tcp:localhost:9002 reds-tcp:localhost:9999 client2 v2 10000 500 &
+sleep 1
+java -cp DREAM.jar dream.eval.EvalSignalClient reds-tcp:localhost:9001 reds-tcp:localhost:9999 client3 v3 v1@client1:v2@client2 &
+sleep 1
+java -cp DREAM.jar dream.eval.EvalSignalClient reds-tcp:localhost:9002 reds-tcp:localhost:9999 client4 v4 v3@client3 &
diff --git a/Dream2/eval/ec2_scripts/start_remote.sh b/Dream2/eval/ec2_scripts/start_remote.sh
new file mode 100644
index 0000000..8471757
--- /dev/null
+++ b/Dream2/eval/ec2_scripts/start_remote.sh
@@ -0,0 +1,156 @@
+#!/bin/bash
+
+source nodes.sh
+
+#j=java
+j=./jdk1.8.0_144/jre/bin/java
+
+reds_lock_manager='reds-tcp:'$lock_manager':9999'
+reds_broker1='reds-tcp:'$broker1':9000'
+reds_broker2='reds-tcp:'$broker2':9000'
+reds_broker3='reds-tcp:'$broker3':9000'
+reds_broker4='reds-tcp:'$broker4':9000'
+reds_broker5='reds-tcp:'$broker5':9000'
+reds_broker6='reds-tcp:'$broker6':9000'
+reds_broker7='reds-tcp:'$broker7':9000'
+reds_broker8='reds-tcp:'$broker8':9000'
+reds_broker9='reds-tcp:'$broker9':9000'
+reds_broker10='reds-tcp:'$broker10':9000'
+
+ssh $lock_manager $j -cp DREAM.jar dream.eval.StartLockManager lock-manager 9999 &
+sleep 1
+
+ssh $broker1 $j -cp DREAM.jar dream.eval.StartServer broker1 9000 &
+sleep 1
+ssh $broker2 $j -cp DREAM.jar dream.eval.StartServer broker2 9000 $reds_broker1 &
+sleep 1
+ssh $broker3 $j -cp DREAM.jar dream.eval.StartServer broker3 9000 $reds_broker1 &
+sleep 1
+ssh $broker4 $j -cp DREAM.jar dream.eval.StartServer broker4 9000 $reds_broker1 &
+sleep 1
+ssh $broker5 $j -cp DREAM.jar dream.eval.StartServer broker5 9000 $reds_broker1 &
+sleep 1
+ssh $broker6 $j -cp DREAM.jar dream.eval.StartServer broker6 9000 $reds_broker1 &
+sleep 1
+ssh $broker7 $j -cp DREAM.jar dream.eval.StartServer broker7 9000 $reds_broker3 &
+sleep 1
+ssh $broker8 $j -cp DREAM.jar dream.eval.StartServer broker8 9000 $reds_broker3 &
+sleep 1
+ssh $broker9 $j -cp DREAM.jar dream.eval.StartServer broker9 9000 $reds_broker2 &
+sleep 1
+ssh $broker10 $j -cp DREAM.jar dream.eval.StartServer broker10 9000 $reds_broker2 &
+sleep 1
+
+###
+
+ssh $client $j -cp DREAM.jar dream.eval.EvalVarClient $reds_broker4 $reds_lock_manager v1 v1 80000 2000 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalVarClient $reds_broker5 $reds_lock_manager v2 v2 80000 2000 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalVarClient $reds_broker6 $reds_lock_manager v3 v3 80000 2000 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalVarClient $reds_broker7 $reds_lock_manager v4 v4 80000 2000 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalVarClient $reds_broker7 $reds_lock_manager v5 v5 80000 2000 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalVarClient $reds_broker4 $reds_lock_manager v6 v6 80000 2000 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalVarClient $reds_broker5 $reds_lock_manager v7 v7 80000 2000 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalVarClient $reds_broker6 $reds_lock_manager v8 v8 80000 2000 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalVarClient $reds_broker7 $reds_lock_manager v9 v9 80000 2000 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalVarClient $reds_broker7 $reds_lock_manager v10 v10 80000 2000 &
+sleep 1
+
+###
+
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker6 $reds_lock_manager a11 a11 v1@v1:v6@v6 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker7 $reds_lock_manager a12 a12 v1@v1 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker8 $reds_lock_manager a21 a21 a11@a11:a12@a12 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker9 $reds_lock_manager a22 a22 a12@a12 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker9 $reds_lock_manager a31 a31 a21@a21:a22@a22 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker8 $reds_lock_manager a32 a32 a22@a22 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker10 $reds_lock_manager a41 a41 a31@a31:a32@a32 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker7 $reds_lock_manager a42 a42 a32@a32 &
+sleep 1
+
+###
+
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker6 $reds_lock_manager b11 b11 v2@v2:v7@v7 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker5 $reds_lock_manager b12 b12 v2@v2 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker4 $reds_lock_manager b21 b21 a12@a12:b11@b11:b12@b12 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker4 $reds_lock_manager b22 b22 b12@b12 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker4 $reds_lock_manager b31 b31 b21@b21:b22@b22 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker5 $reds_lock_manager b32 b32 b22@b22 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker6 $reds_lock_manager b41 b41 b31@b31:b32@b32 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker7 $reds_lock_manager b42 b42 b32@b32 &
+
+###
+
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker5 $reds_lock_manager c11 c11 v3@v3:v8@v8 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker7 $reds_lock_manager c12 c12 v3@v3 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker7 $reds_lock_manager c21 c21 b12@b12:c11@c11:c12@c12 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker4 $reds_lock_manager c22 c22 c12@c12 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker4 $reds_lock_manager c31 c31 c21@c21:c22@c22 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker6 $reds_lock_manager c32 c32 c22@c22 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker10 $reds_lock_manager c41 c41 c31@c31:c32@c32 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker9 $reds_lock_manager c42 c42 c32@c32 &
+
+###
+
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker8 $reds_lock_manager d11 d11 v4@v4:v9@v9 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker8 $reds_lock_manager d12 d12 v4@v4 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker9 $reds_lock_manager d21 d21 c12@c12:d11@d11:d12@d12 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker9 $reds_lock_manager d22 d22 d12@d12 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker10 $reds_lock_manager d31 d31 d21@d21:d22@d22 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker6 $reds_lock_manager d32 d32 d22@d22 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker8 $reds_lock_manager d41 d41 d31@d31:d32@d32 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker9 $reds_lock_manager d42 d42 d32@d32 &
+
+###
+
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker8 $reds_lock_manager e11 e11 v5@v5:v10@v10 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker8 $reds_lock_manager e12 e12 v5@v5 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker9 $reds_lock_manager e21 e21 d12@d12:e11@e11:e12@e12 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker9 $reds_lock_manager e22 e22 e12@e12 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker10 $reds_lock_manager e31 e31 e21@e21:e22@e22 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker6 $reds_lock_manager e32 e32 e22@e22 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker8 $reds_lock_manager e41 e41 e31@e31:e32@e32 &
+sleep 1
+ssh $client $j -cp DREAM.jar dream.eval.EvalSignalClient $reds_broker9 $reds_lock_manager e42 e42 e32@e32 &
diff --git a/Dream2/eval/process_scripts/process_results.py b/Dream2/eval/process_scripts/process_results.py
new file mode 100644
index 0000000..61457b6
--- /dev/null
+++ b/Dream2/eval/process_scripts/process_results.py
@@ -0,0 +1,98 @@
+from math import sqrt
+import os
+import scipy.stats
+
+results_dir = "./results/"
+running_time = 300 * 2
+
+def extractList(predicate):
+ global results_dir
+ lst = []
+ for file in filter(predicate, os.listdir(results_dir)):
+ if file.endswith(".txt"):
+ f = open(results_dir + "/" + file, "r")
+ lines = f.readlines()
+ for i in range(0, len(lines)):
+ lst.append(float(lines[i]))
+ f.close()
+
+ return lst
+
+
+def sumTraffic(predicate, column):
+ global results_dir
+ trafficMap = {}
+ for file in filter(predicate, os.listdir(results_dir)):
+ if file.endswith(".txt"):
+ f = open(results_dir + "/" + file, "r")
+ lines = f.readlines()
+ for i in range(0, len(lines)):
+ line = lines[i]
+ pkt = line.split()[0]
+ val = float(line.split()[column])
+ if pkt in trafficMap:
+ val = val + trafficMap[pkt]
+ trafficMap[pkt] = val
+ f.close()
+
+ return trafficMap
+
+
+def sumTrafficPkts(predicate):
+ return sumTraffic(predicate, 1)
+
+
+def sumTrafficKBS(predicate):
+ return sumTraffic(predicate, 2)
+
+
+def confidenceDelay(lst):
+ sum = 0.0
+ sumSquare = 0.0
+ count = 0
+
+ for val in lst:
+ sum = sum + val
+ sumSquare = sumSquare+(val**2)
+ count = count+1
+
+ mean = sum/count
+ stdDev = sqrt(abs(sumSquare/count-(mean**2))*count/(count-1))
+ confidence = (-scipy.stats.t.ppf(0.05,count-1).sum()*stdDev)/sqrt(count)
+
+ return confidence
+
+
+def printDelay(protocol):
+ print(protocol)
+ for level in range(1,5):
+ lst = extractList(lambda x: x[1] == str(level) and "delay" in x and protocol in x)
+ avg = sum(lst)/len(lst)
+ confidence = confidenceDelay(lst)
+ print(str(level) + " " + str(avg) + " " + str(confidence))
+ lst = extractList(lambda x: "delay" in x and protocol in x)
+ avg = sum(lst)/len(lst)
+ confidence = confidenceDelay(lst)
+ print("Total " + str(avg) + " " + str(confidence))
+
+
+def printTraffic(protocol):
+ global running_time
+ print(protocol)
+ pktsMap = sumTrafficPkts(lambda x: "traffic" in x and protocol in x)
+ kbsMap = sumTrafficKBS(lambda x: "traffic" in x and protocol in x)
+ for (k, v) in pktsMap.items():
+ print(k + "\t" + str(v) + " pkts\t" + str(kbsMap[k]/(running_time*1000)) + " KB/s")
+
+
+print("*** Delay ***")
+printDelay("Causal")
+printDelay("Single_source_glitch_free")
+printDelay("Complete_glitch_free")
+printDelay("Atomic")
+
+print("*** Traffic ***")
+printTraffic("Causal")
+printTraffic("Single_source_glitch_free")
+printTraffic("Complete_glitch_free")
+printTraffic("Atomic")
diff --git a/Dream2/eval/results/results.tgz b/Dream2/eval/results/results.tgz
new file mode 100644
index 0000000..3333228
Binary files /dev/null and b/Dream2/eval/results/results.tgz differ
diff --git a/Dream2/eval/results/results_summary.txt b/Dream2/eval/results/results_summary.txt
new file mode 100644
index 0000000..27fbdaa
--- /dev/null
+++ b/Dream2/eval/results/results_summary.txt
@@ -0,0 +1,48 @@
+*** Delay ***
+Causal
+1 13.6975947064 0.345167393789
+2 36.2754037053 0.4300273317
+3 57.9256247251 0.497485644096
+4 89.0463249322 0.477204840466
+Total 56.7127674806 0.349006684908
+Single_source_glitch_free
+1 10.7324043987 0.109840628571
+2 45.1498443837 0.290770992685
+3 65.2101957994 0.444436697569
+4 89.8191998031 0.473445153253
+Total 55.0853281736 0.365671139301
+Complete_glitch_free
+1 27.249077988 0.765836304213
+2 56.6824640183 0.544100893748
+3 76.188045226 0.655165600347
+4 92.2011379937 0.767327098272
+Total 65.0915597017 0.430133655038
+Atomic
+1 40.6880773415 0.683120096158
+2 71.9482150787 0.547210917915
+3 86.8404346433 0.589093498436
+4 102.820055687 0.61606537902
+Total 77.5325273668 0.390049951696
+*** Traffic ***
+Causal
+__DREAM_PUBLICATION_PACKET_SUBJECT 123300.0 pkts 113.0 KB/s
+__DREAM_ADVERTISEMENT_PACKET_SUBJECT 2950.0 pkts 3.60608 KB/s
+__DREAM_SUBSCRIPTION_PACKET_SUBJECT 290.0 pkts 0.27338 KB/s
+Single_source_glitch_free
+__DREAM_PUBLICATION_PACKET_SUBJECT 102000.0 pkts 93.475 KB/s
+__DREAM_ADVERTISEMENT_PACKET_SUBJECT 2950.0 pkts 3.60608 KB/s
+__DREAM_SUBSCRIPTION_PACKET_SUBJECT 290.0 pkts 0.27338 KB/s
+Complete_glitch_free
+__DREAM_LOCK_REQUEST_PACKET_SUBJECT 3000.0 pkts 3.49 KB/s
+__DREAM_LOCK_GRANT_PACKET_SUBJECT 3000.0 pkts 0.845 KB/s
+__DREAM_PUBLICATION_PACKET_SUBJECT 102000.0 pkts 103.35 KB/s
+__DREAM_LOCK_RELEASE_PACKET_SUBJECT 15600.0 pkts 4.446 KB/s
+__DREAM_ADVERTISEMENT_PACKET_SUBJECT 2950.0 pkts 3.60608 KB/s
+__DREAM_SUBSCRIPTION_PACKET_SUBJECT 290.0 pkts 0.27338 KB/s
+Atomic
+__DREAM_LOCK_REQUEST_PACKET_SUBJECT 8365.0 pkts 9.21751666667 KB/s
+__DREAM_LOCK_GRANT_PACKET_SUBJECT 8365.0 pkts 2.35614166667 KB/s
+__DREAM_PUBLICATION_PACKET_SUBJECT 102000.0 pkts 97.44 KB/s
+__DREAM_LOCK_RELEASE_PACKET_SUBJECT 11064.0 pkts 3.15324 KB/s
+__DREAM_ADVERTISEMENT_PACKET_SUBJECT 2950.0 pkts 3.60608 KB/s
+__DREAM_SUBSCRIPTION_PACKET_SUBJECT 290.0 pkts 0.27338 KB/s
\ No newline at end of file
diff --git a/Dream2/eval/topology/topology.graffle b/Dream2/eval/topology/topology.graffle
new file mode 100644
index 0000000..1c4d727
Binary files /dev/null and b/Dream2/eval/topology/topology.graffle differ
diff --git a/Dream2/eval/topology/topology.pdf b/Dream2/eval/topology/topology.pdf
new file mode 100644
index 0000000..0ca475e
Binary files /dev/null and b/Dream2/eval/topology/topology.pdf differ
diff --git a/Dream2/src/eval/java/dream/eval/EvalSignalClient.java b/Dream2/src/eval/java/dream/eval/EvalSignalClient.java
new file mode 100755
index 0000000..3f6ffc2
--- /dev/null
+++ b/Dream2/src/eval/java/dream/eval/EvalSignalClient.java
@@ -0,0 +1,104 @@
+package dream.eval;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.logging.Logger;
+
+import dream.client.DreamClient;
+import dream.client.LockToken;
+import dream.client.RemoteVar;
+import dream.client.Signal;
+import dream.client.UpdateProducer;
+import dream.common.ConsistencyType;
+import dream.common.Consts;
+
+public class EvalSignalClient {
+
+ public static void main(String args[]) {
+ if (args.length < 5) {
+ System.out.println(
+ "Usage: EvalSignalClient [:]+");
+ System.exit(0);
+ }
+
+ final String serverAddr = args[0];
+ final String lockMgrAddr = args[1];
+ final String hostName = args[2];
+ final String signalName = args[3];
+ final String deps = args[4];
+
+ Consts.serverAddr = serverAddr;
+ Consts.lockManagerAddr = lockMgrAddr;
+
+ Consts.setHostName(hostName);
+
+ final DreamClient client = DreamClient.instance;
+ client.connect();
+
+ // TODO ...
+ try {
+ Thread.sleep(60000);
+ } catch (InterruptedException e1) {
+ e1.printStackTrace();
+ }
+
+ StringTokenizer tokenizer = new StringTokenizer(deps, ":");
+ final List relevantRemoteVars = new ArrayList<>();
+
+ while (tokenizer.hasMoreTokens()) {
+ relevantRemoteVars.add(tokenizer.nextToken());
+ }
+
+ // Wait until all remote vars have been notified
+ while (!client.listVariables().containsAll(relevantRemoteVars)) {
+ try {
+ Thread.sleep(500);
+ } catch (final InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ // Create RemoteVar objects
+ final List> remoteVars = new ArrayList<>();
+ for (String remoteVar : relevantRemoteVars) {
+ remoteVars.add(new RemoteVar(remoteVar.split("@")[1], remoteVar.split("@")[0]));
+ }
+
+ @SuppressWarnings("unchecked")
+ final Signal signal = new Signal(signalName, () -> {
+ int result = 0;
+ for (UpdateProducer> remoteVar : remoteVars) {
+ result += ((RemoteVar) remoteVar).get();
+ }
+ return result;
+ }, 1, remoteVars);
+
+ final Logger logger = Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
+ signal.change().addHandler((oldVal, val) -> logger.fine("Signal: " + val));
+
+ if (Consts.consistencyType == ConsistencyType.ATOMIC) {
+ final Set readLock = new HashSet<>();
+ readLock.add(signalName + "@" + hostName);
+ new Thread(() -> {
+ while (true) {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ LockToken token = client.readLock(readLock);
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ client.unlock(token);
+ }
+ }).start();
+ }
+
+ }
+}
diff --git a/Dream2/src/eval/java/dream/eval/EvalVarClient.java b/Dream2/src/eval/java/dream/eval/EvalVarClient.java
new file mode 100755
index 0000000..ac1e05f
--- /dev/null
+++ b/Dream2/src/eval/java/dream/eval/EvalVarClient.java
@@ -0,0 +1,64 @@
+package dream.eval;
+
+import java.util.Random;
+import java.util.logging.Logger;
+
+import dream.client.DreamClient;
+import dream.client.Var;
+import dream.common.Consts;
+
+public class EvalVarClient {
+ private static final int numChanges = 300;
+
+ public static void main(String args[]) {
+ if (args.length < 6) {
+ System.out.println(
+ "Usage: EvalVarClient ");
+ System.exit(1);
+ }
+
+ final String serverAddr = args[0];
+ final String lockMgrAddr = args[1];
+ final String hostName = args[2];
+ final String varName = args[3];
+ final int startTime = Integer.parseInt(args[4]);
+ final int sleepTime = Integer.parseInt(args[5]);
+
+ Consts.serverAddr = serverAddr;
+ Consts.lockManagerAddr = lockMgrAddr;
+
+ Consts.setHostName(hostName);
+
+ final DreamClient client = DreamClient.instance;
+ client.connect();
+
+ // TODO ...
+ try {
+ Thread.sleep(80000);
+ } catch (InterruptedException e1) {
+ e1.printStackTrace();
+ }
+
+ final Var remoteInt = new Var(varName, 1);
+ final Random random = new Random();
+
+ try {
+ Thread.sleep(startTime);
+ } catch (final InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ for (int i = 0; i < numChanges; ++i) {
+ remoteInt.set(random.nextInt(1000));
+ try {
+ Thread.sleep(sleepTime);
+ } catch (final InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ final Logger logger = Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
+ logger.info(hostName + " finished sending updates");
+ }
+
+}
diff --git a/Dream2/src/eval/java/dream/eval/StartLockManager.java b/Dream2/src/eval/java/dream/eval/StartLockManager.java
new file mode 100644
index 0000000..9f6c1a2
--- /dev/null
+++ b/Dream2/src/eval/java/dream/eval/StartLockManager.java
@@ -0,0 +1,69 @@
+package dream.eval;
+
+import java.io.IOException;
+import java.util.logging.Logger;
+
+import dream.common.Consts;
+import dream.common.packets.discovery.LockManagerHelloPacket;
+import dream.common.packets.locking.LockReleasePacket;
+import dream.common.packets.locking.LockRequestPacket;
+import dream.locking.LockManagerForwarder;
+import polimi.reds.NodeDescriptor;
+import polimi.reds.broker.overlay.GenericOverlay;
+import polimi.reds.broker.overlay.NeighborhoodChangeListener;
+import polimi.reds.broker.overlay.NotRunningException;
+import polimi.reds.broker.overlay.Overlay;
+import polimi.reds.broker.overlay.SimpleTopologyManager;
+import polimi.reds.broker.overlay.TCPTransport;
+import polimi.reds.broker.overlay.TopologyManager;
+import polimi.reds.broker.overlay.Transport;
+import polimi.reds.broker.routing.GenericRouter;
+
+public class StartLockManager implements NeighborhoodChangeListener {
+ private final Logger logger = Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
+ private final Overlay overlay;
+
+ public static void main(String args[]) {
+ if (args.length < 2) {
+ System.out.println("Usage: StartLockManager ");
+ System.exit(0);
+ }
+ Consts.setHostName(args[0]);
+ Consts.lockManagerPort = Integer.parseInt(args[1]);
+ new StartLockManager();
+ }
+
+ private StartLockManager() {
+ logger.info("Starting lock manager");
+ final Transport tr = new TCPTransport(Consts.lockManagerPort);
+ final TopologyManager tm = new SimpleTopologyManager();
+ overlay = new GenericOverlay(tm, tr);
+ final GenericRouter router = new GenericRouter(overlay);
+ final LockManagerForwarder forwarder = new LockManagerForwarder();
+ router.setPacketForwarder(LockRequestPacket.subject, forwarder);
+ router.setPacketForwarder(LockReleasePacket.subject, forwarder);
+ overlay.addNeighborhoodChangeListener(this);
+ overlay.start();
+ logger.info("Lock manager started");
+ }
+
+ @Override
+ public void notifyNeighborAdded(NodeDescriptor sender) {
+ try {
+ overlay.send(LockManagerHelloPacket.subject, new LockManagerHelloPacket(), sender);
+ } catch (IOException | NotRunningException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void notifyNeighborDead(NodeDescriptor sender) {
+ // Nothing to do
+ }
+
+ @Override
+ public void notifyNeighborRemoved(NodeDescriptor sender) {
+ // Nothing to do
+ }
+
+}
diff --git a/Dream2/src/eval/java/dream/eval/StartServer.java b/Dream2/src/eval/java/dream/eval/StartServer.java
new file mode 100755
index 0000000..39ce953
--- /dev/null
+++ b/Dream2/src/eval/java/dream/eval/StartServer.java
@@ -0,0 +1,84 @@
+package dream.eval;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.MalformedURLException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.logging.Logger;
+
+import dream.common.Consts;
+import dream.common.packets.AdvertisementPacket;
+import dream.common.packets.EventPacket;
+import dream.common.packets.SubscriptionPacket;
+import dream.common.packets.discovery.ServerHelloPacket;
+import dream.server.ServerEventForwarder;
+import polimi.reds.NodeDescriptor;
+import polimi.reds.broker.overlay.GenericOverlay;
+import polimi.reds.broker.overlay.NeighborhoodChangeListener;
+import polimi.reds.broker.overlay.NotRunningException;
+import polimi.reds.broker.overlay.Overlay;
+import polimi.reds.broker.overlay.SimpleTopologyManager;
+import polimi.reds.broker.overlay.TCPTransport;
+import polimi.reds.broker.overlay.TopologyManager;
+import polimi.reds.broker.overlay.Transport;
+import polimi.reds.broker.routing.GenericRouter;
+
+public class StartServer implements NeighborhoodChangeListener {
+ private final Logger logger = Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
+ private Overlay overlay = null;
+
+ public static void main(String[] args) {
+ if (args.length < 2) {
+ System.out.println("Usage: StartServer [reds-tcp:address:port]*");
+ System.exit(0);
+ }
+ Consts.setHostName(args[0]);
+ int port = Integer.parseInt(args[1]);
+ List neighbors = Arrays.asList(args);
+ new StartServer(port, neighbors.subList(2, neighbors.size()));
+ }
+
+ private StartServer(final int port, final List neighbors) {
+ logger.info("Starting server");
+ final Transport tr = new TCPTransport(port);
+ final TopologyManager tm = new SimpleTopologyManager();
+ overlay = new GenericOverlay(tm, tr);
+ final GenericRouter router = new GenericRouter(overlay);
+ final ServerEventForwarder forwarder = new ServerEventForwarder(overlay);
+ overlay.addNeighborhoodChangeListener(forwarder);
+ router.setPacketForwarder(EventPacket.subject, forwarder);
+ router.setPacketForwarder(SubscriptionPacket.subject, forwarder);
+ router.setPacketForwarder(AdvertisementPacket.subject, forwarder);
+ overlay.addNeighborhoodChangeListener(this);
+ overlay.start();
+ logger.info("Server started");
+ neighbors.forEach(n -> {
+ try {
+ overlay.addNeighbor(n);
+ } catch (ConnectException | MalformedURLException | NotRunningException e) {
+ e.printStackTrace();
+ }
+ });
+ }
+
+ @Override
+ public void notifyNeighborAdded(NodeDescriptor sender) {
+ try {
+ overlay.send(ServerHelloPacket.subject, new ServerHelloPacket(), sender);
+ } catch (IOException | NotRunningException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void notifyNeighborDead(NodeDescriptor sender) {
+ // Nothing to do
+ }
+
+ @Override
+ public void notifyNeighborRemoved(NodeDescriptor sender) {
+ // Nothing to do
+ }
+
+}
diff --git a/Dream2/src/eval/java/dream/eval/utils/EvalUtils.java b/Dream2/src/eval/java/dream/eval/utils/EvalUtils.java
new file mode 100644
index 0000000..45f4dbb
--- /dev/null
+++ b/Dream2/src/eval/java/dream/eval/utils/EvalUtils.java
@@ -0,0 +1,58 @@
+package dream.eval.utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Map;
+
+import dream.common.Consts;
+
+public class EvalUtils {
+
+ public static final void updateTraffic(Serializable pkt, String subject, final Map trafficPkts,
+ final Map trafficBytes) {
+ long currentPkts = trafficPkts.containsKey(subject) ? trafficPkts.get(subject) : 0;
+ trafficPkts.put(subject, currentPkts + 1);
+ long currentBytes = trafficBytes.containsKey(subject) ? trafficBytes.get(subject) : 0;
+ trafficBytes.put(subject, currentBytes + sizeof(pkt));
+ }
+
+ private static final int sizeof(Serializable pkt) {
+ ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
+ try {
+ ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteOutputStream);
+ objectOutputStream.writeObject(pkt);
+ objectOutputStream.flush();
+ objectOutputStream.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return byteOutputStream.toByteArray().length;
+ }
+
+ public static final void saveTrafficToFile(final Map trafficPkts,
+ final Map trafficBytes) {
+ try {
+ FileWriter writer = new FileWriter(Consts.trafficMeasurementFile, false);
+ for (String subject : trafficPkts.keySet()) {
+ writer.write(subject + "\t" + trafficPkts.get(subject) + "\t" + trafficBytes.get(subject) + "\n");
+ }
+ writer.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public static void saveDelayToFile(int count, double sum) {
+ try {
+ FileWriter writer = new FileWriter(Consts.delayMeasurementFile, true);
+ writer.write(sum / count + "\n");
+ writer.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/Dream2/src/examples/java/dream/examples/biginer/ConsumerApp.java b/Dream2/src/examples/java/dream/examples/biginer/ConsumerApp.java
index e490ae4..1c89ec3 100644
--- a/Dream2/src/examples/java/dream/examples/biginer/ConsumerApp.java
+++ b/Dream2/src/examples/java/dream/examples/biginer/ConsumerApp.java
@@ -11,7 +11,7 @@ public class ConsumerApp {
public ConsumerApp() throws Exception {
// App will be running on host different from the producer
- Consts.hostName = "Host2";
+ Consts.setHostName("Host2");
// Register a Subscription
RemoteVar rv = new RemoteVar("Host1", "exVar");
@@ -20,7 +20,7 @@ public ConsumerApp() throws Exception {
// trigger appropriate action
Signal s = new Signal("s", () -> {
return rv.get() + "ABC";
- } , rv);
+ }, "", rv);
// Register a handler which will be executed upon receiving the signal
s.change().addHandler((oldVal, val) -> System.out.println("Signal1: " + val));
diff --git a/Dream2/src/examples/java/dream/examples/biginer/ProducerApp.java b/Dream2/src/examples/java/dream/examples/biginer/ProducerApp.java
index 16f743d..079868b 100644
--- a/Dream2/src/examples/java/dream/examples/biginer/ProducerApp.java
+++ b/Dream2/src/examples/java/dream/examples/biginer/ProducerApp.java
@@ -10,7 +10,7 @@ public class ProducerApp {
public ProducerApp() throws Exception {
// Mention the host(node) which is producing the value
- Consts.hostName = "Host1";
+ Consts.setHostName("Host1");
// myVar is created and registered as exVar for remote consumption.
// exVar is initialized to AAA
diff --git a/Dream2/src/examples/java/dream/examples/chat/core/Chat.java b/Dream2/src/examples/java/dream/examples/chat/core/Chat.java
index c80227e..20468bf 100644
--- a/Dream2/src/examples/java/dream/examples/chat/core/Chat.java
+++ b/Dream2/src/examples/java/dream/examples/chat/core/Chat.java
@@ -43,7 +43,7 @@ public Chat(String username, int window_x, int window_y) {
return new ArrayList();
else
return registeredClients.get();
- }, registeredClients);
+ }, new ArrayList(), registeredClients);
onlineList.change().addHandler((o, n) -> {
if (n.contains("toServerVar@" + username) && gui == null)
setup();
@@ -95,7 +95,7 @@ private void setup() {
return remote.get();
else
return "";
- }, remote);
+ }, "", remote);
fromServer.change().addHandler((oldValue, newValue) -> receivedServerMessage(newValue));
logger.fine("Setup: Starting GUI");
@@ -170,7 +170,7 @@ private void createConnection(int roomNumber, String roomName, String clientName
return r.get();
else
return "";
- }, r);
+ }, "", r);
s.change().addHandler((oldValue, newValue) -> receivedChatMessage(roomNumber, clientName, newValue));
}
diff --git a/Dream2/src/examples/java/dream/examples/chat/core/ChatServer.java b/Dream2/src/examples/java/dream/examples/chat/core/ChatServer.java
index 18a61f4..902b4cd 100644
--- a/Dream2/src/examples/java/dream/examples/chat/core/ChatServer.java
+++ b/Dream2/src/examples/java/dream/examples/chat/core/ChatServer.java
@@ -83,7 +83,7 @@ private void createNewSessionFor(String clientName, String clientVar) {
return var.get();
else
return "";
- }, var);
+ }, "", var);
sig.change().addHandler((oldValue, newValue) -> receivedMessage(clientName, newValue));
// add client as registered
diff --git a/Dream2/src/examples/java/dream/examples/financial/FinancialApp.java b/Dream2/src/examples/java/dream/examples/financial/FinancialApp.java
index 77823f6..4e96902 100644
--- a/Dream2/src/examples/java/dream/examples/financial/FinancialApp.java
+++ b/Dream2/src/examples/java/dream/examples/financial/FinancialApp.java
@@ -36,9 +36,9 @@ public void start() {
f2 = new RemoteVar<>("Model2", "f2");
f3 = new RemoteVar<>("Model3", "f3");
- f1Signal = new Signal<>("f1Signal", () -> f1.get(), f1);
- f2Signal = new Signal<>("f2Signal", () -> f2.get(), f2);
- f3Signal = new Signal<>("f3Signal", () -> f3.get(), f3);
+ f1Signal = new Signal<>("f1Signal", () -> f1.get(), 1, f1);
+ f2Signal = new Signal<>("f2Signal", () -> f2.get(), 1, f2);
+ f3Signal = new Signal<>("f3Signal", () -> f3.get(), 1, f3);
f1Signal.change().addHandler(this);
f2Signal.change().addHandler(this);
diff --git a/Dream2/src/examples/java/dream/examples/financial/Model1.java b/Dream2/src/examples/java/dream/examples/financial/Model1.java
index 3d7e9c9..b4045d2 100644
--- a/Dream2/src/examples/java/dream/examples/financial/Model1.java
+++ b/Dream2/src/examples/java/dream/examples/financial/Model1.java
@@ -27,7 +27,7 @@ public void start() {
} else {
return marketIndex.get() * 2 + stockOpts.get();
}
- } , marketIndex, stockOpts);
+ }, 1, marketIndex, stockOpts);
f1.change().addHandler((oldVal, newVal) -> System.out.println("New value for f1: " + newVal));
}
diff --git a/Dream2/src/examples/java/dream/examples/financial/Model2.java b/Dream2/src/examples/java/dream/examples/financial/Model2.java
index a63825b..d23fb75 100644
--- a/Dream2/src/examples/java/dream/examples/financial/Model2.java
+++ b/Dream2/src/examples/java/dream/examples/financial/Model2.java
@@ -27,7 +27,7 @@ public void start() {
} else {
return marketIndex.get() + stockOpts.get() * 2;
}
- } , marketIndex, stockOpts);
+ }, 1, marketIndex, stockOpts);
f2.change().addHandler((oldVal, newVal) -> System.out.println("New value for f2: " + newVal));
}
diff --git a/Dream2/src/examples/java/dream/examples/financial/Model3.java b/Dream2/src/examples/java/dream/examples/financial/Model3.java
index f85bdce..8683335 100644
--- a/Dream2/src/examples/java/dream/examples/financial/Model3.java
+++ b/Dream2/src/examples/java/dream/examples/financial/Model3.java
@@ -28,7 +28,7 @@ public void start() {
} else {
return marketIndex.get() + news.get();
}
- } , marketIndex, news);
+ }, 1, marketIndex, news);
f3.change().addHandler((oldVal, newVal) -> System.out.println("New value for f3: " + newVal));
}
diff --git a/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/CompleteGlitchFreeFormServer.java b/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/CompleteGlitchFreeFormServer.java
index df02d5b..095c2c9 100644
--- a/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/CompleteGlitchFreeFormServer.java
+++ b/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/CompleteGlitchFreeFormServer.java
@@ -13,35 +13,35 @@ protected void createDependencies() {
return working_hours.get() > required_hours.get().getFirst();
else
return false;
- }, working_hours, required_hours);
+ }, true, working_hours, required_hours);
final Signal maximumHours = new Signal<>(MaximumHours, () -> {
if (working_hours.get() != null && required_hours.get() != null)
return working_hours.get() < required_hours.get().getSecond();
else
return false;
- }, working_hours, required_hours);
+ }, true, working_hours, required_hours);
final Signal minimumEuroPerHour = new Signal<>(MinimumEuroPerHour, () -> {
if (euro_per_hour.get() != null)
return euro_per_hour.get() > 10;
else
return false;
- }, euro_per_hour);
+ }, true, euro_per_hour);
new Signal<>(SettingsOkay, () -> {
if (minimumHours.get() != null && maximumHours.get() != null && minimumEuroPerHour.get() != null)
return minimumHours.get() && maximumHours.get() && minimumEuroPerHour.get();
else
return false;
- }, minimumHours, maximumHours, minimumEuroPerHour);
+ }, true, minimumHours, maximumHours, minimumEuroPerHour);
new Signal<>(Salary, () -> {
if (working_hours.get() != null && euro_per_hour.get() != null)
return working_hours.get() * euro_per_hour.get();
else
return 0.0;
- }, working_hours, euro_per_hour);
+ }, true, working_hours, euro_per_hour);
logger.fine("Finished building Dependencies");
}
diff --git a/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/FormClient.java b/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/FormClient.java
index efe026a..b8914e8 100644
--- a/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/FormClient.java
+++ b/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/FormClient.java
@@ -43,14 +43,14 @@ protected void start() {
return salary.get();
else
return 0.0;
- }, salary);
+ }, 0.0, salary);
remoteSettings = new Signal<>("remoteSettings", () -> {
if (settings.get() != null)
return settings.get();
else
return false;
- }, settings);
+ }, false, settings);
gui.setText("Salary: ");
gui.setColor(Color.red);
diff --git a/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/FormServer.java b/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/FormServer.java
index 3789a9a..075bf85 100644
--- a/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/FormServer.java
+++ b/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/FormServer.java
@@ -61,35 +61,35 @@ protected void createDependencies() {
return working_hours.get() > required_hours.get().getFirst();
else
return false;
- }, working_hours, required_hours);
+ }, false, working_hours, required_hours);
final Signal maximumHours = new Signal<>(MaximumHours, () -> {
if (working_hours.get() != null && required_hours.get() != null)
return working_hours.get() < required_hours.get().getSecond();
else
return false;
- }, working_hours, required_hours);
+ }, false, working_hours, required_hours);
final Signal minimumEuroPerHour = new Signal<>(MinimumEuroPerHour, () -> {
if (euro_per_hour.get() != null)
return euro_per_hour.get() > 10;
else
return false;
- }, euro_per_hour);
+ }, false, euro_per_hour);
new Signal<>(SettingsOkay, () -> {
if (minimumHours.get() != null && maximumHours.get() != null && minimumEuroPerHour.get() != null)
return minimumHours.get() && maximumHours.get() && minimumEuroPerHour.get();
else
return false;
- }, minimumHours, maximumHours, minimumEuroPerHour);
+ }, false, minimumHours, maximumHours, minimumEuroPerHour);
new Signal<>(Salary, () -> {
if (working_hours.get() != null && euro_per_hour.get() != null)
return working_hours.get() * euro_per_hour.get();
else
return 0.0;
- }, working_hours, euro_per_hour);
+ }, 0.0, working_hours, euro_per_hour);
logger.fine("Finished building Dependencies");
}
diff --git a/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/LockClient.java b/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/LockClient.java
index 7b88796..dc68b9a 100644
--- a/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/LockClient.java
+++ b/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/LockClient.java
@@ -26,7 +26,7 @@ public LockClient(String name) {
return new ArrayList>();
else
return registeredClients.get();
- }, registeredClients);
+ }, new ArrayList>(), registeredClients);
s.change().addHandler((o, n) -> {
if (n.contains(new Pair<>(this.getHostName(), LockManager.VAR_requestLock)) && setup == false)
lockSetup();
@@ -42,7 +42,7 @@ private void lockSetup() {
RemoteVar lock = new RemoteVar<>(LockManager.NAME, LockManager.VAR_lock);
Signal sLock = new Signal<>("lock", () -> {
return lock.get();
- }, lock);
+ }, null, lock);
sLock.change().addHandler((oldValue, newValue) -> {
newValue.forEach((var, client) -> {
if (client.equals(getHostName()))
diff --git a/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/LockManager.java b/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/LockManager.java
index d925fd0..73fc517 100644
--- a/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/LockManager.java
+++ b/Dream2/src/examples/java/dream/examples/form/complete_glitchfree/LockManager.java
@@ -60,7 +60,7 @@ private void createNewSessionFor(Variable x) {
return rv.get();
else
return null;
- }, rv);
+ }, null, rv);
s.change().addHandler((oldValue, newValue) -> {
if (newValue.isLockRequest()) {
// client requesting a lock
diff --git a/Dream2/src/examples/java/dream/examples/form/core/FormClient.java b/Dream2/src/examples/java/dream/examples/form/core/FormClient.java
index dd77f4d..3f2a831 100644
--- a/Dream2/src/examples/java/dream/examples/form/core/FormClient.java
+++ b/Dream2/src/examples/java/dream/examples/form/core/FormClient.java
@@ -44,14 +44,14 @@ protected void start() {
return salary.get();
else
return 0.0;
- }, salary);
+ }, 0.0, salary);
remoteSettings = new Signal<>("remoteSettings", () -> {
if (settings.get() != null)
return settings.get();
else
return false;
- }, settings);
+ }, false, settings);
gui.setText("Salary: ");
gui.setColor(Color.red);
diff --git a/Dream2/src/examples/java/dream/examples/form/simple/FormServer.java b/Dream2/src/examples/java/dream/examples/form/simple/FormServer.java
index 3a56c02..851e5b3 100644
--- a/Dream2/src/examples/java/dream/examples/form/simple/FormServer.java
+++ b/Dream2/src/examples/java/dream/examples/form/simple/FormServer.java
@@ -48,35 +48,35 @@ protected void createDependencies() {
return working_hours.get() > 10;
else
return false;
- }, working_hours);
+ }, false, working_hours);
final Signal maximumHours = new Signal<>(MaximumHours, () -> {
if (working_hours.get() != null)
return working_hours.get() < 60;
else
return false;
- }, working_hours);
+ }, false, working_hours);
final Signal minimumEuroPerHour = new Signal<>(MinimumEuroPerHour, () -> {
if (euro_per_hour.get() != null)
return euro_per_hour.get() > 10;
else
return false;
- }, euro_per_hour);
+ }, false, euro_per_hour);
new Signal<>(SettingsOkay, () -> {
if (minimumHours.get() != null && maximumHours.get() != null && minimumEuroPerHour.get() != null)
return minimumHours.get() && maximumHours.get() && minimumEuroPerHour.get();
else
return false;
- }, minimumHours, maximumHours, minimumEuroPerHour);
+ }, false, minimumHours, maximumHours, minimumEuroPerHour);
new Signal<>(Salary, () -> {
if (working_hours.get() != null && euro_per_hour.get() != null)
return working_hours.get() * euro_per_hour.get();
else
return 0.0;
- }, working_hours, euro_per_hour);
+ }, 0.0, working_hours, euro_per_hour);
logger.fine("Finished building Dependencies");
}
diff --git a/Dream2/src/examples/java/dream/examples/form/simple/GlitchFreeFormServer.java b/Dream2/src/examples/java/dream/examples/form/simple/GlitchFreeFormServer.java
index d98caab..3edcb2c 100644
--- a/Dream2/src/examples/java/dream/examples/form/simple/GlitchFreeFormServer.java
+++ b/Dream2/src/examples/java/dream/examples/form/simple/GlitchFreeFormServer.java
@@ -23,15 +23,15 @@ protected void createDependencies() {
minimumEuroPerHour = new Signal<>(MinimumEuroPerHour, () -> {
return euro_per_hour.get() > 10;
- }, euro_per_hour);
+ }, false, euro_per_hour);
final Signal minimumHours = new Signal<>(MinimumHours, () -> {
return working_hours.get() > 10;
- }, working_hours);
+ }, false, working_hours);
final Signal maximumHours = new Signal<>(MaximumHours, () -> {
return working_hours.get() < 60;
- }, working_hours);
+ }, false, working_hours);
minimumEuroPerHour.change().addHandler((o, n) -> updateSettingsOkay());
@@ -50,7 +50,7 @@ protected void createDependencies() {
return working_hours.get() * euro_per_hour.get();
else
return 0.0;
- }, working_hours, euro_per_hour);
+ }, 0.0, working_hours, euro_per_hour);
logger.fine("Finished building Dependencies");
}
diff --git a/Dream2/src/examples/java/dream/examples/local/Example1.java b/Dream2/src/examples/java/dream/examples/local/Example1.java
index 03f0632..6cd4c7a 100755
--- a/Dream2/src/examples/java/dream/examples/local/Example1.java
+++ b/Dream2/src/examples/java/dream/examples/local/Example1.java
@@ -13,12 +13,12 @@ public static void main(String args[]) {
final Var varString2 = new Var<>("varString2", "");
final Signal signalInt = new Signal("signalInt",
- () -> 10 - 2 + (varInt.get() * 2 + varInt.get()) / 2, varInt);
+ () -> 10 - 2 + (varInt.get() * 2 + varInt.get()) / 2, 1, varInt);
final Signal signalDouble = new Signal("signalDouble",
- () -> varDouble.get() + varDouble.get() * 2, varDouble);
- final Signal signalBool = new Signal("signalBool", () -> !varBool.get(), varBool);
+ () -> varDouble.get() + varDouble.get() * 2, 1.0, varDouble);
+ final Signal signalBool = new Signal("signalBool", () -> !varBool.get(), false, varBool);
final Signal signalString = new Signal("signalString",
- () -> varString1.get() + varString2.get(), varString1, varString2);
+ () -> varString1.get() + varString2.get(), "", varString1, varString2);
signalInt.change()
.addHandler((oldVal, val) -> System.out.println("signalInt: " + val + " (correct value: 158)"));
diff --git a/Dream2/src/examples/java/dream/examples/local/ExampleFilter.java b/Dream2/src/examples/java/dream/examples/local/ExampleFilter.java
index 22292e2..9e0bf74 100644
--- a/Dream2/src/examples/java/dream/examples/local/ExampleFilter.java
+++ b/Dream2/src/examples/java/dream/examples/local/ExampleFilter.java
@@ -7,9 +7,9 @@ public class ExampleFilter {
public static void main(String args[]) {
final Var varInt = new Var<>("varInt", 1);
- final Signal signalInt = new Signal<>("signalInt", () -> varInt.get() + 1,
+ final Signal signalInt = new Signal<>("signalInt", () -> varInt.get() + 1, 1,
varInt.filter(val -> val > 10));
- final Signal signalInt2 = new Signal<>("signalInt2", () -> signalInt.get() + 1,
+ final Signal signalInt2 = new Signal<>("signalInt2", () -> signalInt.get() + 1, 1,
signalInt.filter(val -> val > 20));
signalInt.change().addHandler((oldVal, val) -> System.out.println("SignalInt: " + val));
diff --git a/Dream2/src/examples/java/dream/examples/local/ExampleGlitch.java b/Dream2/src/examples/java/dream/examples/local/ExampleGlitch.java
index 70fe13b..3aa0d27 100755
--- a/Dream2/src/examples/java/dream/examples/local/ExampleGlitch.java
+++ b/Dream2/src/examples/java/dream/examples/local/ExampleGlitch.java
@@ -13,10 +13,10 @@ public static void main(String args[]) {
public void launch() {
final Var var = new Var<>("var", 1.0);
- final Signal mid1 = new Signal<>("mid1", () -> var.get() * 2, var);
- final Signal mid2 = new Signal<>("mid2", () -> var.get() * 3, var);
+ final Signal mid1 = new Signal<>("mid1", () -> var.get() * 2, 1.0, var);
+ final Signal mid2 = new Signal<>("mid2", () -> var.get() * 3, 1.0, var);
- final Signal finalResult = new Signal<>("final", () -> mid1.get() + mid2.get(), mid1, mid2);
+ final Signal finalResult = new Signal<>("final", () -> mid1.get() + mid2.get(), 1.0, mid1, mid2);
finalResult.change().addHandler((oldVal, val) -> System.out.println(val));
try {
diff --git a/Dream2/src/examples/java/dream/examples/local/ExampleList.java b/Dream2/src/examples/java/dream/examples/local/ExampleList.java
index 4e8aa67..def5917 100755
--- a/Dream2/src/examples/java/dream/examples/local/ExampleList.java
+++ b/Dream2/src/examples/java/dream/examples/local/ExampleList.java
@@ -9,7 +9,8 @@ public class ExampleList {
public static void main(String args[]) {
final Var> varList = new Var<>("varList", new ArrayList());
- final Signal signalInt = new Signal("signalInt", () -> 1000 + varList.get().size(), varList);
+ final Signal signalInt = new Signal("signalInt", () -> 1000 + varList.get().size(), 1,
+ varList);
signalInt.change().addHandler((oldVal, val) -> System.out.println(val));
diff --git a/Dream2/src/examples/java/dream/examples/remote/RemoteSignalExample.java b/Dream2/src/examples/java/dream/examples/remote/RemoteSignalExample.java
index 9d3eeee..a8e6bdd 100755
--- a/Dream2/src/examples/java/dream/examples/remote/RemoteSignalExample.java
+++ b/Dream2/src/examples/java/dream/examples/remote/RemoteSignalExample.java
@@ -12,7 +12,7 @@
public class RemoteSignalExample {
public static void main(String args[]) {
- Consts.hostName = "Signal";
+ Consts.setHostName("Signal");
final DreamClient client = DreamClient.instance;
client.connect();
@@ -36,12 +36,12 @@ public static void main(String args[]) {
final RemoteVar> remoteList = new RemoteVar<>("Remote", "remoteList");
final Signal signal1 = new Signal("signal1",
- () -> remoteInt.get() + remoteString1.get().length(), remoteInt, remoteString1);
- final Signal signal2 = new Signal("signal2", () -> remoteInt.get(), remoteInt);
+ () -> remoteInt.get() + remoteString1.get().length(), 1, remoteInt, remoteString1);
+ final Signal signal2 = new Signal("signal2", () -> remoteInt.get(), 1, remoteInt);
final Signal signal3 = new Signal("signal3", () -> remoteString1.get() + remoteString2.get(),
- remoteString1, remoteString2);
+ "", remoteString1, remoteString2);
final Signal signal4 = new Signal("signal4",
- () -> remoteString1.get().length() + remoteList.get().size(), remoteString1, remoteList);
+ () -> remoteString1.get().length() + remoteList.get().size(), 1, remoteString1, remoteList);
signal1.change().addHandler((oldVal, val) -> System.out.println("Signal1: " + val));
signal2.change().addHandler((oldVal, val) -> System.out.println("Signal2: " + val));
diff --git a/Dream2/src/examples/java/dream/examples/remote/RemoteVarExample.java b/Dream2/src/examples/java/dream/examples/remote/RemoteVarExample.java
index 2facf23..80b0ca7 100755
--- a/Dream2/src/examples/java/dream/examples/remote/RemoteVarExample.java
+++ b/Dream2/src/examples/java/dream/examples/remote/RemoteVarExample.java
@@ -9,7 +9,7 @@
public class RemoteVarExample {
public static void main(String args[]) {
- Consts.hostName = "Remote";
+ Consts.setHostName("Remote");
final Var remoteInt = new Var("remoteInt", 1);
final Var remoteString1 = new Var("remoteString1", "a");
final Var remoteString2 = new Var("remoteString2", "b");
diff --git a/Dream2/src/examples/java/dream/examples/scrumBoard/atomic/LockClient.java b/Dream2/src/examples/java/dream/examples/scrumBoard/atomic/LockClient.java
index 7984933..e8333e4 100644
--- a/Dream2/src/examples/java/dream/examples/scrumBoard/atomic/LockClient.java
+++ b/Dream2/src/examples/java/dream/examples/scrumBoard/atomic/LockClient.java
@@ -24,7 +24,7 @@ public LockClient(String name) {
return new ArrayList>();
else
return registeredClients.get();
- }, registeredClients);
+ }, new ArrayList>(), registeredClients);
s.change().addHandler((o, n) -> {
if (n.contains(new Pair<>(this.getHostName(), LockManager.VAR_requestLock)) && setup == false)
lockSetup();
@@ -39,7 +39,7 @@ private void lockSetup() {
RemoteVar lock = new RemoteVar<>(LockManager.NAME, LockManager.VAR_lock);
Signal sLock = new Signal<>("lock", () -> {
return lock.get();
- }, lock);
+ }, "", lock);
sLock.change().addHandler((oldValue, newValue) -> {
if (newValue.equals(getHostName()))
hasLock = true;
diff --git a/Dream2/src/examples/java/dream/examples/scrumBoard/atomic/LockManager.java b/Dream2/src/examples/java/dream/examples/scrumBoard/atomic/LockManager.java
index 835ec14..b807c25 100644
--- a/Dream2/src/examples/java/dream/examples/scrumBoard/atomic/LockManager.java
+++ b/Dream2/src/examples/java/dream/examples/scrumBoard/atomic/LockManager.java
@@ -57,7 +57,7 @@ private void createNewSessionFor(Pair x) {
return rv.get();
else
return false;
- }, rv);
+ }, false, rv);
s.change().addHandler((oldValue, newValue) -> {
if (newValue) {
// client requesting a lock
diff --git a/Dream2/src/examples/java/dream/examples/scrumBoard/core/Monitor.java b/Dream2/src/examples/java/dream/examples/scrumBoard/core/Monitor.java
index eb8f12c..92b621b 100644
--- a/Dream2/src/examples/java/dream/examples/scrumBoard/core/Monitor.java
+++ b/Dream2/src/examples/java/dream/examples/scrumBoard/core/Monitor.java
@@ -36,12 +36,12 @@ public Monitor() {
devs = new RemoteVar(Server.NAME, Server.VAR_developers);
sigDevs = new Signal("sigDevs", () -> {
return devs.get();
- }, devs);
+ }, "", devs);
tasks = new RemoteVar(Server.NAME, Server.VAR_tasks);
sigTasks = new Signal("sigTests", () -> {
return tasks.get();
- }, tasks);
+ }, "", tasks);
sigDevs.change().addHandler((oldVa, newVal) -> {
System.out.println("newVal devs:" + newVal);
diff --git a/Dream2/src/examples/java/dream/examples/scrumBoard/core/Server.java b/Dream2/src/examples/java/dream/examples/scrumBoard/core/Server.java
index 7db4274..25c190c 100644
--- a/Dream2/src/examples/java/dream/examples/scrumBoard/core/Server.java
+++ b/Dream2/src/examples/java/dream/examples/scrumBoard/core/Server.java
@@ -14,8 +14,8 @@
/**
* Holds a list of tasks and a list of developers each indicated by a simple
- * integer. Searches for new clients (TaskCreater) and registers to their
- * "task creation channels"
+ * integer. Searches for new clients (TaskCreater) and registers to their "task
+ * creation channels"
*
* @author Min Yang
* @author Tobias Becker
@@ -78,7 +78,7 @@ private void initDependencies() {
temp.addAll(rv2.get());
Collections.sort(temp);
return temp.stream().map(a -> a.getDevString()).collect(Collectors.joining(":"));
- }, rv1, rv2);
+ }, "", rv1, rv2);
Signal tasks = new Signal(VAR_tasks, () -> {
LinkedList temp = new LinkedList<>();
@@ -88,7 +88,7 @@ private void initDependencies() {
temp.addAll(rv2.get());
Collections.sort(temp);
return temp.stream().map(a -> a.getTaskString()).collect(Collectors.joining(":"));
- }, rv1, rv2);
+ }, "", rv1, rv2);
}
diff --git a/Dream2/src/examples/java/dream/examples/tasks/DeligatProcess.java b/Dream2/src/examples/java/dream/examples/tasks/DeligatProcess.java
index ddab317..db3fc94 100644
--- a/Dream2/src/examples/java/dream/examples/tasks/DeligatProcess.java
+++ b/Dream2/src/examples/java/dream/examples/tasks/DeligatProcess.java
@@ -49,13 +49,13 @@ public DeligatProcess() {
}
private void init() {
- Consts.hostName = "Host2";
+ Consts.setHostName("Host2");
RemoteVar rv = new RemoteVar("Host1", "TASK");
Var myVar = new Var("TASK_ASSIGNED", null);
Signal s = new Signal("s", () -> {
return rv.get();
- } , rv);
+ }, new Message(), rv);
// Register a handler which will be executed upon receiving the signal
s.change().addHandler((oldVal, val) -> {
diff --git a/Dream2/src/examples/java/dream/examples/tasks/MasterProcess.java b/Dream2/src/examples/java/dream/examples/tasks/MasterProcess.java
index d6b3c98..c9e8232 100644
--- a/Dream2/src/examples/java/dream/examples/tasks/MasterProcess.java
+++ b/Dream2/src/examples/java/dream/examples/tasks/MasterProcess.java
@@ -14,7 +14,7 @@
public class MasterProcess {
private void init() {
- Consts.hostName = "Host1";
+ Consts.setHostName("Host1");
VectorClock vectorClock = new VectorClock("p1");
Var initTask = new Var("TASK", null);
diff --git a/Dream2/src/examples/java/dream/examples/tasks/WorkerHelper.java b/Dream2/src/examples/java/dream/examples/tasks/WorkerHelper.java
index 3d6d2de..003cdb0 100644
--- a/Dream2/src/examples/java/dream/examples/tasks/WorkerHelper.java
+++ b/Dream2/src/examples/java/dream/examples/tasks/WorkerHelper.java
@@ -63,7 +63,7 @@ public void isEvent(String val) {
public void run() {
localClock = new VectorClockHelper("p3", this);
- Consts.hostName = "Host3";
+ Consts.setHostName("Host3");
Thread t = new Thread(localClock);
t.start();
RemoteVar task = new RemoteVar("Host1", "TASK2");
@@ -71,10 +71,10 @@ public void run() {
Signal signalFromMaster = new Signal("s", () -> {
return task.get();
- } , task);
+ }, new Message(), task);
Signal signalFromDeligator = new Signal("s1", () -> {
return taskDeligated.get();
- } , taskDeligated);
+ }, new Message(), taskDeligated);
// Register a handler which will be executed upon receiving the signal
// from master process
diff --git a/Dream2/src/examples/java/dream/examples/tasks/inconsistent/DeligatProcess.java b/Dream2/src/examples/java/dream/examples/tasks/inconsistent/DeligatProcess.java
index a2c7610..1e951ce 100644
--- a/Dream2/src/examples/java/dream/examples/tasks/inconsistent/DeligatProcess.java
+++ b/Dream2/src/examples/java/dream/examples/tasks/inconsistent/DeligatProcess.java
@@ -49,13 +49,13 @@ public DeligatProcess() {
}
private void init() {
- Consts.hostName = "Host2";
+ Consts.setHostName("Host2");
RemoteVar rv = new RemoteVar("Host1", "TASK");
Var myVar = new Var("TASK_ASSIGNED", null);
Signal s = new Signal("s", () -> {
return rv.get();
- } , rv);
+ }, new Message(), rv);
// Register a handler which will be executed upon receiving the signal
s.change().addHandler((oldVal, val) -> {
diff --git a/Dream2/src/examples/java/dream/examples/tasks/inconsistent/MasterProcess.java b/Dream2/src/examples/java/dream/examples/tasks/inconsistent/MasterProcess.java
index 3988855..2c35ef6 100644
--- a/Dream2/src/examples/java/dream/examples/tasks/inconsistent/MasterProcess.java
+++ b/Dream2/src/examples/java/dream/examples/tasks/inconsistent/MasterProcess.java
@@ -14,7 +14,7 @@
public class MasterProcess {
private void init() {
- Consts.hostName = "Host1";
+ Consts.setHostName("Host1");
VectorClock vectorClock = new VectorClock("p1");
Var initTask = new Var("TASK", null);
diff --git a/Dream2/src/examples/java/dream/examples/tasks/inconsistent/WorkerHelper.java b/Dream2/src/examples/java/dream/examples/tasks/inconsistent/WorkerHelper.java
index b78a7a2..7b67332 100644
--- a/Dream2/src/examples/java/dream/examples/tasks/inconsistent/WorkerHelper.java
+++ b/Dream2/src/examples/java/dream/examples/tasks/inconsistent/WorkerHelper.java
@@ -55,18 +55,17 @@ public void isEvent(String val) {
}
public void run() {
-
- Consts.hostName = "Host3";
+ Consts.setHostName("Host3");
RemoteVar task = new RemoteVar("Host1", "TASK2");
RemoteVar taskDeligated = new RemoteVar("Host2", "TASK_ASSIGNED");
Signal signalFromMaster = new Signal("s", () -> {
return task.get();
- } , task);
+ }, new Message(), task);
Signal signalFromDeligator = new Signal("s1", () -> {
return taskDeligated.get();
- } , taskDeligated);
+ }, new Message(), taskDeligated);
// Register a handler which will be executed upon receiving the signal
// from master process
diff --git a/Dream2/src/examples/java/dream/examples/util/Client.java b/Dream2/src/examples/java/dream/examples/util/Client.java
index 4dfcb4d..53b42b4 100644
--- a/Dream2/src/examples/java/dream/examples/util/Client.java
+++ b/Dream2/src/examples/java/dream/examples/util/Client.java
@@ -38,7 +38,7 @@ public Client(String name) {
startDream();
// set hostName
- Consts.hostName = name;
+ Consts.setHostName(name);
// connect to the dependency graph
DreamClient.instance.connect();
@@ -131,7 +131,7 @@ private static final void startLockManager() throws InterruptedException {
}
public String getHostName() {
- return Consts.hostName;
+ return Consts.getHostName();
}
private LinkedList lockQueue = new LinkedList<>();
diff --git a/Dream2/src/main/java/dream/client/ClientEventForwarder.java b/Dream2/src/main/java/dream/client/ClientEventForwarder.java
index 8ab0cfd..3bc7eea 100755
--- a/Dream2/src/main/java/dream/client/ClientEventForwarder.java
+++ b/Dream2/src/main/java/dream/client/ClientEventForwarder.java
@@ -28,6 +28,7 @@
import dream.common.utils.FinalNodesDetector;
import dream.common.utils.InterSourceDependencyDetector;
import dream.common.utils.IntraSourceDependencyDetector;
+import dream.eval.utils.EvalUtils;
import polimi.reds.NodeDescriptor;
import polimi.reds.broker.routing.Outbox;
import polimi.reds.broker.routing.PacketForwarder;
@@ -49,6 +50,9 @@ class ClientEventForwarder implements PacketForwarder {
: new CompleteGlitchFreeDependencyDetector();
private final FinalNodesDetector finalNodesDetector = new FinalNodesDetector();
+ private final Map trafficPkts = new HashMap<>();
+ private final Map trafficBytes = new HashMap<>();
+
// Lock applicants waiting for a grant
private final Map lockApplicants = new HashMap<>();
@@ -85,6 +89,10 @@ private ClientEventForwarder() {
public synchronized Collection forwardPacket(String subject, NodeDescriptor sender,
Serializable packet, Collection neighbors, Outbox outbox) {
final Collection result = new ArrayList();
+ if (Consts.enableEvaluation) {
+ EvalUtils.updateTraffic(packet, subject, trafficPkts, trafficBytes);
+ EvalUtils.saveTrafficToFile(trafficPkts, trafficBytes);
+ }
if (subject.equals(AdvertisementPacket.subject)) {
assert packet instanceof AdvertisementPacket;
logger.finer("Received an advertisement packet " + packet);
@@ -107,7 +115,7 @@ public synchronized Collection forwardPacket(String subject, Nod
return result;
}
- final synchronized void sendEvent(UUID id, Event> ev, String initialVar) {
+ final synchronized void sendEvent(UUID id, Event> ev, String initialVar, long timestamp) {
logger.finer("Sending an event " + ev);
Set lockReleaseNodes;
switch (Consts.consistencyType) {
@@ -122,7 +130,7 @@ final synchronized void sendEvent(UUID id, Event> ev, String initialVar) {
}
if (subTable.needsToDeliverToServer(ev)) {
- connectionManager.sendEvent(id, ev, initialVar, lockReleaseNodes);
+ connectionManager.sendEvent(id, ev, initialVar, lockReleaseNodes, timestamp);
}
}
@@ -253,12 +261,16 @@ final synchronized void removeSubscription(Subscriber subscriber, Subscription
}
}
+ final synchronized boolean isConnected() {
+ return connectionManager.isConnected();
+ }
+
private final boolean needToSendToServer(Subscription> sub) {
return !isLocal(sub);
}
private final boolean isLocal(Subscription> sub) {
- return sub.getHostId().equals(Consts.hostName);
+ return sub.getHostId().equals(Consts.getHostName());
}
private final void processEventFromServer(EventPacket evPkt) {
@@ -272,26 +284,26 @@ private final void processEventFromServer(EventPacket evPkt) {
}
private final void processAdvertisementFromServer(AdvertisementPacket advPkt) {
+ final Set> subs = advPkt.getSubscriptions();
+ switch (advPkt.getAdvType()) {
+ case ADV:
+ if (subs.isEmpty()) {
+ dependencyGraph.processAdv(advPkt.getAdvertisement());
+ } else {
+ dependencyGraph.processAdv(advPkt.getAdvertisement(), subs);
+ }
+ break;
+ case UNADV:
+ if (subs.isEmpty()) {
+ dependencyGraph.processUnAdv(advPkt.getAdvertisement());
+ } else {
+ dependencyGraph.processUnAdv(advPkt.getAdvertisement(), subs);
+ }
+ break;
+ }
if (Consts.consistencyType == ConsistencyType.SINGLE_SOURCE_GLITCH_FREE || //
Consts.consistencyType == ConsistencyType.COMPLETE_GLITCH_FREE || //
Consts.consistencyType == ConsistencyType.ATOMIC) {
- final Set> subs = advPkt.getSubscriptions();
- switch (advPkt.getAdvType()) {
- case ADV:
- if (subs.isEmpty()) {
- dependencyGraph.processAdv(advPkt.getAdvertisement());
- } else {
- dependencyGraph.processAdv(advPkt.getAdvertisement(), subs);
- }
- break;
- case UNADV:
- if (subs.isEmpty()) {
- dependencyGraph.processUnAdv(advPkt.getAdvertisement());
- } else {
- dependencyGraph.processUnAdv(advPkt.getAdvertisement(), subs);
- }
- break;
- }
updateDetectors();
}
}
diff --git a/Dream2/src/main/java/dream/client/ConnectionManager.java b/Dream2/src/main/java/dream/client/ConnectionManager.java
index a237bb4..9206d1b 100755
--- a/Dream2/src/main/java/dream/client/ConnectionManager.java
+++ b/Dream2/src/main/java/dream/client/ConnectionManager.java
@@ -78,9 +78,9 @@ final NodeDescriptor getNodeDescriptor() {
return overlay.getNodeDescriptor();
}
- final void sendEvent(UUID id, Event extends Serializable> event, String initialVar,
- Set lockReleaseNodes) {
- final EventPacket pkt = new EventPacket(event, id, initialVar);
+ final void sendEvent(UUID id, Event extends Serializable> event, String initialVar, Set lockReleaseNodes,
+ long timestamp) {
+ final EventPacket pkt = new EventPacket(event, id, initialVar, timestamp);
pkt.setLockReleaseNodes(lockReleaseNodes);
sendToServer(EventPacket.subject, pkt);
}
@@ -177,6 +177,13 @@ public final Collection forwardPacket(String subject, NodeDescri
return new ArrayList<>();
}
+ final boolean isConnected() {
+ return server != null && //
+ (lockManager != null || //
+ (Consts.consistencyType != ConsistencyType.COMPLETE_GLITCH_FREE && //
+ Consts.consistencyType != ConsistencyType.ATOMIC));
+ }
+
private class PacketSubjectPair {
private final String subject;
private final Serializable packet;
diff --git a/Dream2/src/main/java/dream/client/DreamClient.java b/Dream2/src/main/java/dream/client/DreamClient.java
index fe8536f..673a1d3 100644
--- a/Dream2/src/main/java/dream/client/DreamClient.java
+++ b/Dream2/src/main/java/dream/client/DreamClient.java
@@ -54,6 +54,10 @@ public final void unlock(LockToken token) {
}
}
+ public final boolean isConnected() {
+ return eventForwarder.isConnected();
+ }
+
private class Lock {
private boolean unlocked = false;
diff --git a/Dream2/src/main/java/dream/client/RemoteVar.java b/Dream2/src/main/java/dream/client/RemoteVar.java
index 6e87851..e1f89ea 100755
--- a/Dream2/src/main/java/dream/client/RemoteVar.java
+++ b/Dream2/src/main/java/dream/client/RemoteVar.java
@@ -42,7 +42,7 @@ public RemoteVar(String host, String object, List> cons
}
public RemoteVar(String object, List> constraints) {
- this(Consts.hostName, object, constraints);
+ this(Consts.getHostName(), object, constraints);
}
public RemoteVar(String host, String object) {
@@ -50,7 +50,7 @@ public RemoteVar(String host, String object) {
}
public RemoteVar(String object) {
- this(Consts.hostName, object);
+ this(Consts.getHostName(), object);
}
public final synchronized T get() {
@@ -85,9 +85,10 @@ private void processNextEvent() {
private final void sendEventPacketToListeners(EventPacket evPkt) {
if (!consumers.isEmpty()) {
final Set satConsumers = //
- consumers.entrySet().stream().filter(e -> e.getValue().stream().allMatch(constr -> constr.test(val)))//
- .map(e -> e.getKey())//
- .collect(Collectors.toSet());
+ consumers.entrySet().stream()
+ .filter(e -> e.getValue().stream().allMatch(constr -> constr.test(val)))//
+ .map(e -> e.getKey())//
+ .collect(Collectors.toSet());
pendingAcks = satConsumers.size();
satConsumers.forEach(c -> c.updateFromProducer(evPkt, this));
diff --git a/Dream2/src/main/java/dream/client/Signal.java b/Dream2/src/main/java/dream/client/Signal.java
index 02f7618..c023a28 100755
--- a/Dream2/src/main/java/dream/client/Signal.java
+++ b/Dream2/src/main/java/dream/client/Signal.java
@@ -21,6 +21,7 @@
import dream.common.packets.content.Advertisement;
import dream.common.packets.content.Event;
import dream.common.packets.content.Subscription;
+import dream.eval.utils.EvalUtils;
public class Signal implements TimeChangingValue, UpdateProducer, UpdateConsumer {
@@ -41,12 +42,17 @@ public class Signal implements TimeChangingValue, Upd
private T val;
+ // Variables for delay measurement
+ int updateCount = 0;
+ double sumOfDelays = 0;
+
private final Logger logger = Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
@SuppressWarnings({ "unchecked", "rawtypes" })
- public Signal(String object, Supplier evaluation, UpdateProducer>... prods) {
- this.host = Consts.hostName;
+ public Signal(String object, Supplier evaluation, T defaultVal, UpdateProducer>... prods) {
+ this.host = Consts.getHostName();
this.object = object;
+ this.val = defaultVal;
this.evaluation = evaluation;
final Set> subs = new HashSet<>();
@@ -56,13 +62,33 @@ public Signal(String object, Supplier evaluation, UpdateProducer>... prods)
}
clientEventForwarder = ClientEventForwarder.get();
- clientEventForwarder.advertise(new Advertisement(Consts.hostName, object), subs, true);
+ clientEventForwarder.advertise(new Advertisement(Consts.getHostName(), object), subs, true);
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public Signal(String object, Supplier evaluation, T defaultVal, List> prods) {
+ this.host = Consts.getHostName();
+ this.object = object;
+ this.val = defaultVal;
+ this.evaluation = evaluation;
+
+ final Set> subs = new HashSet<>();
+ for (final UpdateProducer prod : prods) {
+ prod.registerUpdateConsumer(this, prod.getConstraints());
+ subs.add(new Subscription(prod.getHost(), prod.getObject(), prod.getConstraints()));
+ }
+
+ clientEventForwarder = ClientEventForwarder.get();
+ clientEventForwarder.advertise(new Advertisement(Consts.getHostName(), object), subs, true);
}
private final synchronized void processNextUpdate() {
if (pendingAcks == 0) {
// Notify that the previous update has finished
if (!waitingProducers.isEmpty()) {
+ // Set> cache = new
+ // HashSet<>(waitingProducers);
+ // cache.forEach(prod -> prod.notifyUpdateFinished());
waitingProducers.forEach(prod -> prod.notifyUpdateFinished());
waitingProducers.clear();
}
@@ -75,7 +101,8 @@ private final synchronized void processNextUpdate() {
private final void processUpdate(EventProducerPair update) {
logger.finest("processTask method invoked with " + update);
- final List pairs = queueManager.processEventPacket(update, object + "@" + Consts.hostName);
+ final List pairs = queueManager.processEventPacket(update,
+ object + "@" + Consts.getHostName());
logger.finest("The queueManager returned the following pairs " + pairs);
if (!pairs.isEmpty()) {
@@ -83,29 +110,48 @@ private final void processUpdate(EventProducerPair update) {
// Extract information from any of the packets
final EventPacket anyPkt = pairs.stream().findAny().get().getEventPacket();
+ // Extract the oldest timestamp
+ long timestamp = pairs.stream().min((a, b) -> {
+ final long aTime = a.getEventPacket().getGenerationTime();
+ final long bTime = b.getEventPacket().getGenerationTime();
+ if (aTime == bTime) {
+ return 0;
+ } else if (aTime < bTime) {
+ return -1;
+ } else {
+ return 1;
+ }
+ }).get().getEventPacket().getGenerationTime();
+
// Compute the new value
try {
val = evaluate();
+ if (Consts.enableEvaluation) {
+ updateCount++;
+ double delay = (System.nanoTime() - timestamp) / 1000000.0;
+ sumOfDelays += delay;
+ EvalUtils.saveDelayToFile(updateCount, sumOfDelays);
+ }
logger.finest("New value computed for the reactive object: " + val);
} catch (final Exception e) {
- logger.log(Level.INFO,
- "Exception during the evaluation of the expression. Acknowledging the producers, releasing the locks, and returning.",
- e);
- pairs.forEach(pair -> pair.getUpdateProducer().notifyUpdateFinished());
- // Release locks, if needed
- if ((Consts.consistencyType == ConsistencyType.COMPLETE_GLITCH_FREE || //
- Consts.consistencyType == ConsistencyType.ATOMIC) && //
- anyPkt.getLockReleaseNodes().contains(object + "@" + host)) {
- clientEventForwarder.sendLockRelease(anyPkt.getId());
- }
- return;
+ logger.log(Level.INFO, "Exception during the evaluation of the expression.", e);
+ // pairs.forEach(pair ->
+ // pair.getUpdateProducer().notifyUpdateFinished());
+ // // Release locks, if needed
+ // if ((Consts.consistencyType ==
+ // ConsistencyType.COMPLETE_GLITCH_FREE || //
+ // Consts.consistencyType == ConsistencyType.ATOMIC) && //
+ // anyPkt.getLockReleaseNodes().contains(object + "@" + host)) {
+ // clientEventForwarder.sendLockRelease(anyPkt.getId());
+ // }
+ // return;
}
// Notify local and remote dependent objects
logger.finest("Sending event to dependent objects.");
- final Event event = new Event(Consts.hostName, object, val);
+ final Event event = new Event(Consts.getHostName(), object, val);
// Notify remote subscribers
- clientEventForwarder.sendEvent(anyPkt.getId(), event, anyPkt.getSource());
+ clientEventForwarder.sendEvent(anyPkt.getId(), event, anyPkt.getSource(), timestamp);
final Set satConsumers = //
consumers.entrySet().stream()
@@ -115,7 +161,7 @@ private final void processUpdate(EventProducerPair update) {
// Notify local subscribers
if (!satConsumers.isEmpty()) {
pairs.forEach(pair -> waitingProducers.add(pair.getUpdateProducer()));
- final EventPacket newEvPkt = new EventPacket(event, anyPkt.getId(), anyPkt.getSource());
+ final EventPacket newEvPkt = new EventPacket(event, anyPkt.getId(), anyPkt.getSource(), timestamp);
newEvPkt.setLockReleaseNodes(anyPkt.getLockReleaseNodes());
pendingAcks = satConsumers.size();
satConsumers.forEach(c -> c.updateFromProducer(newEvPkt, this));
@@ -157,9 +203,9 @@ public UpdateProducer filter(SerializablePredicate constraint) {
public final synchronized void updateFromProducer(EventPacket packet, UpdateProducer> producer) {
final EventProducerPair pair = new EventProducerPair(packet, producer);
eventQueue.add(pair);
- logger.finest("Method update called for event " + pair + ". Added to the queue1.");
+ logger.finest("Method update called for event " + pair + ". Added to the queue.");
if (eventQueue.size() == 1) {
- logger.finest("The element is the only one in the queue1. Let's process it.");
+ logger.finest("The element is the only one in the queue. Let's process it.");
processNextUpdate();
}
}
diff --git a/Dream2/src/main/java/dream/client/Var.java b/Dream2/src/main/java/dream/client/Var.java
index 7f840d7..9d9a691 100755
--- a/Dream2/src/main/java/dream/client/Var.java
+++ b/Dream2/src/main/java/dream/client/Var.java
@@ -29,29 +29,31 @@ public class Var implements UpdateProducer, LockAppli
private final List> constraints = new ArrayList<>();
private final Map>> consumers = new HashMap<>();
- private final Queue